# Copyright (c) 2025, TU Wien
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of TU Wien, Department of Geodesy and Geoinformation
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written
# permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
File search methods.
"""
import abc
import glob
import re
import warnings
from pathlib import Path
from datetime import timedelta
from datetime import datetime
from collections import defaultdict
import numpy as np
from tqdm import tqdm
from tqdm.dask import TqdmCallback
from dask.delayed import delayed
from dask.base import compute
[docs]
class FilenameTemplate:
"""
FilenameTemplate class.
"""
def __init__(self, root_path, fn_templ, sf_templ=None):
"""
Initialize root path, filename pattern and optional subfolder pattern.
Parameters
----------
root_path : str
Root path.
fn_templ : str
Filename (glob) pattern.
e.g. "{date}_*.{suffix}"
sf_templ : dict, optional
Subfolder pattern defined as dictionary (default: None).
Keys represent unique meta names of subfolders and values define
real folder names and/or (glob) pattern.
e.g. {"variable": "*", "tile": "EN012*"}
"""
self.root_path = Path(root_path)
self.fn_templ = fn_templ
self.sf_templ = sf_templ
@property
def template(self):
"""
Name property.
"""
if self.sf_templ is None:
filename = self.root_path / self.fn_templ
else:
filename = self.root_path.joinpath(*list(self.sf_templ.values()),
self.fn_templ)
return filename
[docs]
def build_filename(self, fn_fmt, sf_fmt=None):
"""
Create filename from format dictionary.
Parameters
----------
fn_fmt : dict
Filename format applied on filename pattern (fn_pattern).
e.g. fn_pattern = "{date}*.{suffix}"
with fn_format_dict = {"date": "20000101", "suffix": "nc"}
returns "20000101*.nc"
sf_fmt : dict of dicts
Format dictionary for subfolders. Each subfolder contains
a dictionary defining the format of the folder name.
e.g. sf_templ = {"years": {year}, "months": {month}}
with sf_format = {"years": {"year": "2000"},
"months": {"month": "02"}}
returns ["2000", "02"]
Returns
-------
filename : str
Filename with format_dict applied.
"""
fn = self.build_basename(fn_fmt)
if sf_fmt is None:
filename = self.root_path / fn
else:
sf = self.build_subfolder(sf_fmt)
filename = self.root_path.joinpath(*sf, fn)
return str(filename)
[docs]
def build_basename(self, fmt):
"""
Create file basename from format dictionary.
Parameters
----------
fmt : dict
Filename format applied on filename pattern (fn_pattern).
e.g. fn_pattern = "{date}*.{suffix}"
with fmt = {"date": "20000101", "suffix": "nc"}
returns "20000101*.nc"
Returns
-------
filename : str
Filename with format_dict applied.
"""
return self.fn_templ.format(**fmt)
[docs]
def build_subfolder(self, fmt):
"""
Create subfolder path from format dictionary.
Parameters
----------
fmt : dict of dicts
Format dictionary for subfolders. Each subfolder contains
a dictionary defining the format of the folder name.
e.g. sf_pattern = {"years": {year}, "months": {month}}
with format_dict = {"years": {"year": "2000"},
"months": {"month": "02"}}
returns ["2000", "02"]
Returns
-------
subfolder : list of str
Subfolder with format_dict applied.
"""
subfolder = []
if self.sf_templ is not None:
for name, v in self.sf_templ.items():
if fmt is not None:
if name in fmt:
subfolder.append(
self.sf_templ[name].format(**fmt[name]))
else:
subfolder.append(self.sf_templ[name])
else:
subfolder.append(self.sf_templ[name])
return subfolder
[docs]
class FileSearch:
"""
FileSearch class.
"""
def __init__(self, root_path, fn_pattern, sf_pattern=None):
"""
Initialize root path, filename pattern and optional subfolder pattern.
Parameters
----------
root_path : str
Root path.
fn_pattern : str
Filename (glob) pattern.
e.g. "{date}_*.{suffix}"
sf_pattern : dict, optional
Subfolder pattern defined as dictionary (default: None).
Keys represent unique meta names of subfolders and values define
real folder names and/or (glob) pattern.
e.g. {"variable": "*", "tile": "EN012*"}
"""
self.file_templ = FilenameTemplate(root_path, fn_pattern, sf_pattern)
[docs]
def search(self, fn_fmt, sf_fmt=None, recursive=False):
"""
Search filesystem for given pattern returning list.
Parameters
----------
fn_fmt : dict
Filename format dictionary.
sf_fmt : dict of dicts, optional
Format dictionary for subfolders (default: None).
recursive : bool, optional
If recursive is true, the pattern "**" will match any files and
zero or more directories, subdirectories and symbolic links to
directories (default: False).
Returns
-------
filenames : list of str
Return a possibly-empty list of path/file names that match.
"""
return glob.glob(
self.file_templ.build_filename(fn_fmt, sf_fmt),
recursive=recursive)
[docs]
def isearch(self, fn_fmt, sf_fmt=None, recursive=False):
"""
Search filesystem for given pattern returning iterator.
Parameters
----------
fn_fmt : dict
Filename format dictionary.
sf_fmt : dict of dicts, optional
Format dictionary for subfolders (default: None).
recursive : bool, optional
If recursive is true, the pattern "**" will match any files and
zero or more directories, subdirectories and symbolic links to
directories (default: False).
Returns
-------
filenames : iterator
Iterator which yields the same values as search() without
actually storing them all simultaneously.
"""
return glob.iglob(
self.file_templ.build_filename(fn_fmt, sf_fmt),
recursive=recursive)
[docs]
def create_search_func(self, func, recursive=False):
"""
Create custom search function returning it.
Parameters
----------
func : function
Search function with its own args/kwargs returning
a filename format dictionary and subfolder format dictionary
depending on the passed arguments.
recursive : bool, optional
If recursive is true, the pattern "**" will match any files and
zero or more directories, subdirectories and symbolic links to
directories (default: False).
Returns
-------
custom_search : function
Custom search function returning a possibly-empty list
of path/file names that match.
"""
def custom_search(*args, **kwargs):
fn_fmt, sf_fmt = func(*args, **kwargs)
return self.search(fn_fmt, sf_fmt, recursive=recursive)
return custom_search
[docs]
def create_isearch_func(self, func, recursive=False):
"""
Create custom search function returning it.
Parameters
----------
func : function
Search function with its own args/kwargs returning
a filename format dictionary and subfolder format dictionary
depending on the passed arguments.
recursive : bool, optional
If recursive is true, the pattern "**" will match any files and
zero or more directories, subdirectories and symbolic links to
directories (default: False).
Returns
-------
custom_search : function
Custom search function returning an iterator of path/file names
that match.
"""
def custom_search(*args, **kwargs):
fn_fmt, sf_fmt = func(*args, **kwargs)
return self.isearch(fn_fmt, sf_fmt, recursive=recursive)
return custom_search
[docs]
class MultiFileHandler(metaclass=abc.ABCMeta):
"""
MultiFileHandler class.
"""
def __init__(
self,
root_path,
cls,
fn_templ,
sf_templ=None,
cls_kwargs=None,
err=False,
cache_size=0,
):
"""
Initialize MultiFileHandler class.
Parameters
----------
root_path : str
Root path.
cls : class
Class reading/writing files.
fn_templ : str
Filename template (e.g. "{date}_ascat.nc").
sf_templ : dict, optional
Subfolder template defined as dictionary (default: None).
cls_kwargs : dict, optional
Class keyword arguments (default: None).
err : bool, optional
Set true if a file error should be re-raised instead of
reporting a warning.
Default: False
cache_size : int, optional
Number of files to keep in memory (default=0).
"""
self.root_path = root_path
self.cls = cls
self.ft = FilenameTemplate(root_path, fn_templ, sf_templ)
self.fid = None
self.err = err
self.cache_size = cache_size
if cache_size > 0:
self.cache = {}
if cls_kwargs is None:
self.cls_kwargs = {}
else:
self.cls_kwargs = cls_kwargs
self.fs = FileSearch(self.root_path, self.ft.fn_templ,
self.ft.sf_templ)
def __enter__(self):
"""
Context manager initialization.
"""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Exit the runtime context related to this object. The file will be
closed. The parameters describe the exception that caused the
context to be exited.
"""
self._close()
def _open(self, filename):
"""
Open file, i.e. create io class instance.
Parameters
----------
filename : str
Filename.
"""
self._close()
try:
self.fid = self.cls(filename, **self.cls_kwargs)
except IOError:
self.fid = None
if self.err:
raise
else:
warnings.warn(f"IOError: {filename}")
def _close(self):
"""
Try closing file.
"""
if self.fid is not None and hasattr(self.fid, "close"):
self.fid.close()
self.fid = None
@abc.abstractmethod
def _fmt(*args, **kwargs):
"""
Filename format and subfolder format used to read/write
individual files.
Returns
-------
fn_read_fmt : dict
Filename format.
sf_read_fmt : dict
Subfolder format.
fn_write_fmt : dict
Filename format.
sf_write_fmt : dict
Subfolder format.
"""
return
[docs]
def read(self, *fmt_args, fmt_kwargs=None, cls_kwargs=None):
"""
Read data.
Parameters
----------
fmt_args : tuple
Format arguments.
fmt_kwargs : dict, optional
Format keywords (Default: None).
cls_kwargs : dict, optional
Class keywords (Default: None).
Returns
-------
data : dict, numpy.ndarray
Data stored in file.
"""
if fmt_kwargs is None:
fmt_kwargs = {}
if cls_kwargs is None:
cls_kwargs = {}
fn_read_fmt, sf_read_fmt, _, _ = self._fmt(*fmt_args, **fmt_kwargs)
search_filename = self.ft.build_filename(fn_read_fmt, sf_read_fmt)
filename = glob.glob(search_filename)
self._open(filename)
data = None
data = self.fid.read(**cls_kwargs)
return data
[docs]
def write(self, data, *fmt_args, fmt_kwargs=None, cls_kwargs=None):
"""
Write data.
Parameters
----------
data : dict, numpy.ndarray
Data to write.
fmt_args : tuple
Format arguments.
fmt_kwargs : dict, optional
Format keywords (Default: None).
cls_kwargs : dict, optional
Class keywords (Default: None).
"""
if fmt_kwargs is None:
fmt_kwargs = {}
if cls_kwargs is None:
cls_kwargs = {}
_, _, fn_write_fmt, sf_write_fmt = self._fmt(*fmt_args, **fmt_kwargs)
filename = self.ft.build_filename(fn_write_fmt, sf_write_fmt)
self.write_file(data, filename, cls_kwargs=cls_kwargs)
[docs]
def read_file(self, filename, cls_kwargs=None):
"""
Read data for given filename.
Parameters
----------
filename : str
Filename.
"""
if self.cache_size > 0 and filename in self.cache:
return self.cache[filename]
self._open(filename)
data = self.fid.read(**cls_kwargs)
if self.cache_size > 0:
if len(self.cache) == self.cache_size:
del self.cache[next(iter(self.cache))]
self.cache[filename] = data
return data
[docs]
def write_file(self, data, filename, cls_kwargs=None):
"""
Write data for given filename.
Parameters
----------
filename : str
Filename.
"""
Path(filename).parent.mkdir(parents=True, exist_ok=True)
self._open(filename)
self.fid.write(data, **cls_kwargs)
[docs]
def search(
self,
fn_search_pattern,
sf_search_pattern=None,
custom_fn_templ=None,
custom_sf_templ=None,
):
"""
Search files for given root path and filename/folder pattern.
Returns
-------
filenames : list of str
Filenames.
"""
fn_templ = custom_fn_templ if custom_fn_templ else self.ft.fn_templ
sf_templ = custom_sf_templ if custom_sf_templ else self.ft.sf_templ
if custom_fn_templ or custom_sf_templ:
self.fs = FileSearch(self.root_path, fn_templ, sf_templ)
filenames = self.fs.search(fn_search_pattern, sf_search_pattern)
return sorted(filenames, reverse=True)
[docs]
class ChronFiles(MultiFileHandler):
"""
Managing chronological files with a date field in the filename.
"""
def __init__(
self,
root_path,
cls,
fn_templ,
sf_templ,
cls_kwargs=None,
err=True,
fn_read_fmt=None,
sf_read_fmt=None,
fn_write_fmt=None,
sf_write_fmt=None,
cache_size=0,
):
"""
Initialize ChronFiles class.
Parameters
----------
root_path : str
Root path.
cls : class
Class reading/writing files.
fn_templ : str
Filename template (e.g. "{date}_ascat.nc").
sf_templ : dict, optional
Subfolder template defined as dictionary (default: None).
cls_kwargs : dict, optional
Class keyword arguments (default: None).
err : bool, optional
Set true if a file error should be re-raised instead of
reporting a warning.
Default: False
fn_read_fmt : str or function, optional
Filename format for read operation.
sf_read_fmt : str or function, optional
Subfolder format for read operation.
fn_write_fmt : str or function, optional
Filename format for write operation.
sf_write_fmt : str or function, optional
Subfolder format for write operation.
cache_size : int, optional
Number of files to keep in memory (default=0).
"""
super().__init__(root_path, cls, fn_templ, sf_templ, cls_kwargs, err,
cache_size)
self.fn_read_fmt = fn_read_fmt
self.sf_read_fmt = sf_read_fmt
self.fn_write_fmt = fn_write_fmt
self.sf_write_fmt = sf_write_fmt
def _fmt(self, *fmt_args, **fmt_kwargs):
"""
Format filenames/filepaths.
"""
if callable(self.fn_read_fmt):
fn_read_fmt = self.fn_read_fmt(*fmt_args, **fmt_kwargs)
else:
fn_read_fmt = self.fn_read_fmt
if callable(self.sf_read_fmt):
sf_read_fmt = self.sf_read_fmt(*fmt_args, **fmt_kwargs)
else:
sf_read_fmt = self.sf_read_fmt
if callable(self.fn_write_fmt):
fn_write_fmt = self.fn_write_fmt(*fmt_args, **fmt_kwargs)
else:
fn_write_fmt = self.fn_write_fmt
if callable(self.sf_write_fmt):
sf_write_fmt = self.sf_write_fmt(*fmt_args, **fmt_kwargs)
else:
sf_write_fmt = self.sf_write_fmt
return fn_read_fmt, sf_read_fmt, fn_write_fmt, sf_write_fmt
def _parse_date(self, filename, date_field, date_field_fmt):
"""
Parse datetime from filename.
Parameters
----------
filename : str
Filename.
Returns
-------
timestamp : datetime
File timestamp.
"""
# escape special characters in the template string
escaped_template = re.escape(self.ft.fn_templ)
# replace escaped curly braces with capturing groups
pattern = re.sub(r'\\{(.*?)\\}', r'(?P<\1>.*?)', escaped_template)
match = re.match(pattern, Path(filename).name)
date_substring = match.group(date_field)
date = datetime.strptime(date_substring, date_field_fmt)
return date
def _merge_data(self, data, **kwargs):
"""
Merge datasets after reading period. Needs to be overwritten
by child class, otherwise data is returned as is.
Parameters
----------
data : list
Data.
**kwargs : dict
Additional keyword arguments to the fid's merge method.
Returns
-------
data : list
Merged data.
"""
return self.fid.merge(data, **kwargs)
[docs]
def search_date(self,
timestamp,
search_date_fmt="%Y%m%d*",
date_field="date",
date_field_fmt="%Y%m%d",
return_date=False,
**fmt_kwargs):
"""
Search files for given date.
Parameters
----------
timestamp : datetime
Search date.
search_date_fmt : str, optional
Search date string format used during file search (default: %Y%m%d*).
date_field : str, optional
Date field name (default: "date")
date_field_format : str, optional
Date field string format (default: %Y%m%d).
return_date : bool, optional
Return date parsed from filename (default: False).
Returns
-------
filenames : list of str
Filenames.
dates : list of datetime
Parsed date of filename (only returned if return_date=True).
"""
fn_read_fmt, sf_read_fmt, _, _ = self._fmt(timestamp, **fmt_kwargs)
fn_read_fmt[date_field] = timestamp.strftime(search_date_fmt)
fs = FileSearch(self.root_path, self.ft.fn_templ, self.ft.sf_templ)
def key_func(x): return self._parse_date(x, date_field, date_field_fmt)
filenames = sorted(fs.search(fn_read_fmt, sf_read_fmt), key=key_func)
if return_date:
dates = []
for filename in filenames:
dates.append(
self._parse_date(filename, date_field, date_field_fmt))
return filenames, dates
else:
return filenames
[docs]
def search_period(
self,
dt_start,
dt_end,
dt_delta=timedelta(days=1),
search_date_fmt="%Y%m%d*",
date_field="date",
date_field_fmt="%Y%m%d",
end_inclusive=True,
**fmt_kwargs
):
"""
Search files for time period.
Parameters
----------
dt_start : datetime
Start datetime.
dt_end : datetime
End datetime.
dt_delta : timedelta, optional
Time delta used to jump through search date.
search_fmt : str, optional
Search date string format used during file search (default: %Y%m%d*).
date_field : str, optional
Date field name (default: "date").
date_field_fmt : str, optional
Date field string format (default: %Y%m%d).
end_inclusive : bool, optional
Include files from a dt_delta length period beyond dt_end if True
(default: False).
Returns
-------
filenames : list of str
Filenames.
"""
filenames = []
dt_end = dt_end + dt_delta if end_inclusive else dt_end
for dt_cur in np.arange(dt_start, dt_end, dt_delta).astype(datetime):
files, dates = self.search_date(
dt_cur,
search_date_fmt=search_date_fmt,
date_field=date_field,
date_field_fmt=date_field_fmt,
return_date=True,
**fmt_kwargs,
)
for f, dt in zip(files, dates):
if f not in filenames and dt >= dt_start and dt < dt_end:
filenames.append(f)
return filenames
[docs]
def read_period(
self,
dt_start,
dt_end,
dt_delta=timedelta(days=1),
dt_buffer=timedelta(days=1),
search_date_fmt="%Y%m%d*",
date_field="date",
date_field_fmt="%Y%m%d",
end_inclusive=True,
fmt_kwargs={},
**kwargs,
):
"""
Read data for given interval.
Parameters
----------
dt_start : datetime
Start datetime.
dt_end : datetime
End datetime.
dt_delta : timedelta, optional
Time delta used to jump through search date.
dt_buffer : timedelta, optional
Search buffer used to find files which could possibly contain
data but would be left out because of dt_start.
search_date_fmt : str, optional
Search date string format used during file search (default: %Y%m%d*).
date_field : str, optional
Date field name (default: "date").
date_field_fmt : str, optional
Date field string format (default: %Y%m%d).
Returns
-------
data : dict, numpy.ndarray
Data stored in file.
"""
filenames = self.search_period(dt_start - dt_buffer, dt_end, dt_delta,
search_date_fmt, date_field,
date_field_fmt, end_inclusive, **fmt_kwargs)
data = []
for filename in filenames:
self._open(filename)
d = self.fid.read_period(dt_start, dt_end, **kwargs)
if d is not None:
data.append(d)
if data:
data = self._merge_data(data)
return data
[docs]
class Filenames:
"""
A class to handle operations on multiple filenames.
This class provides methods for reading from, writing to, and merging data from multiple files.
"""
def __init__(self, filenames):
"""
Initialize Filenames.
Parameters
----------
filenames : str, Path, or list
File path(s) to be handled.
"""
if isinstance(filenames, (str, Path)):
filenames = [filenames]
elif not isinstance(filenames, list):
raise ValueError("filenames must be a string or list of strings.")
self.filenames = [Path(f) for f in filenames]
self.cache = {}
def _read(self, filename, **kwargs):
"""
Read data from a single file.
This method should be implemented by subclasses.
Parameters
----------
filename : Path
The file to read from.
**kwargs : dict
Additional keyword arguments for reading.
Raises
------
NotImplementedError
If the method is not implemented in a subclass.
"""
raise NotImplementedError
def _merge(self, data, **kwargs):
"""
Merge multiple data objects.
This method should be implemented by subclasses.
Parameters
----------
data : list
List of data objects to merge.
**kwargs : dict
Additional keyword arguments for merging.
Raises
------
NotImplementedError
If the method is not implemented in a subclass.
"""
raise NotImplementedError
def _write(self, data, filename, **kwargs):
"""
Write data to a single file.
This method should be implemented by subclasses.
Parameters
----------
data : object
The data to write.
filename : Path
The file to write to.
**kwargs : dict
Additional keyword arguments for writing.
Raises
------
NotImplementedError
If the method is not implemented in a subclass.
"""
raise NotImplementedError
[docs]
def reprocess(self,
out_dir,
func,
parallel=False,
print_progress=False,
read_kwargs=None,
**write_kwargs):
"""
Reprocess data from all files through `func`, writing the results to `out_dir`.
Assumes that if any files have the same name, they should be merged.
Parameters
----------
out_dir : Path
Directory to write the output files. This will be prepended to the filenames.
func : function
The function to apply to the data before writing out.
parallel : bool, optional
Whether to process the data in parallel (default: False).
**kwargs : dict
Additional keyword arguments for writing.
"""
read_kwargs = read_kwargs or {}
if parallel:
read_ = delayed(self._read)
getattr_ = delayed(getattr)
func_ = delayed(func)
merge_ = delayed(self._merge)
else:
read_ = self._read
getattr_ = getattr
func_ = func
merge_ = self._merge
filenames = self.filenames
name_to_paths = defaultdict(list)
for path in filenames:
name_to_paths[path.name].append(path)
out_filenames = [out_dir / name for name in name_to_paths]
out_path_groups = list(name_to_paths.values())
if print_progress:
out_path_groups = tqdm(out_path_groups)
out_path_groups.set_description("Opening files...")
data = [merge_([func_(read_(f, **read_kwargs)) for f in paths])
for paths in out_path_groups]
self.filenames = out_filenames
self.write(data, parallel=parallel, print_progress=print_progress, **write_kwargs)
[docs]
def write(self, data, parallel=False, print_progress=False, **kwargs):
"""
Write data to file.
If there's only one filename in `self.filenames`, write provided data to that file.
If there is more than one filename, write each element of the provided data list
to the corresponding filename.
Parameters
----------
data : list of objects
The data to write. Should be a list with the same length as self.filenames,
where each element is the data to be written to the corresponding filename.
"""
if len(self.filenames) == 1 and not isinstance(data, list):
data = [data]
if len(data) == len(self.filenames):
if parallel:
write_ = delayed(self._write)
writers = [write_(d, f, **kwargs) for d, f in zip(data, self.filenames)]
if print_progress:
with TqdmCallback(desc="Writing cells to disk...", total=len(writers)):
compute(writers, scheduler="processes")
else:
compute(writers, scheduler="processes")
else:
if print_progress:
data = tqdm(data)
data.set_description("Writing cells to disk...")
for d, f in zip(data, self.filenames):
self._write(d, f, **kwargs)
else:
# Special case when the data object meant to be written to a single filename is a list
raise ValueError("Number of data objects must match number of filenames.")
return
[docs]
def read(self, parallel=False, closer_attr=None, **kwargs):
"""
Read all data from files.
Returns
-------
object
Merged data from all files.
"""
if parallel:
read_ = delayed(self._read)
getattr_ = delayed(getattr)
else:
read_ = self._read
getattr_ = getattr
data = [read_(f, **kwargs) for f in self.filenames]
if closer_attr is not None:
closers = [getattr_(d, closer_attr) for d in data if d is not None]
if parallel:
data = compute(data, scheduler="processes")[0]
if closer_attr is not None:
closers = compute(closers)[0]
data = self.merge(data)
if closer_attr is not None:
return data, closers
return data
[docs]
def iter_read(self, print_progress=False, **kwargs):
"""
Iterate over all files and yield data.
Yields
------
object
Data read from each file.
"""
if print_progress:
filenames = tqdm(self.filenames)
else:
filenames = self.filenames
size = 0
for filename in filenames:
if print_progress:
filenames.set_description(f"Opening {Path(filename).name}, total {size} bytes...")
data = self._read(filename, **kwargs)
size += self._nbytes(data)
yield data
[docs]
def iter_read_nbytes(self, max_nbytes, print_progress=False, **kwargs):
"""
Iterate over all files and yield data until the specified number of bytes is reached.
If `_read` returns dask objects, they are computed (in parallel) before merging the data.
"""
size = 0
data_list = []
for data in self.iter_read(print_progress, **kwargs):
data_size = self._nbytes(data)
size += data_size
if size > max_nbytes and size > data_size:
if print_progress:
print(f"Opened {size} bytes, reading and merging data...")
with TqdmCallback(desc="Reading data..."):
out_data = compute(*[d for d in data_list],
scheduler="processes")
print("Merging data...")
out_data = self.merge(out_data)
else:
out_data = self.merge(compute(*[d for d in data_list],
scheduler="processes"))
yield out_data
size = data_size
data_list = [data]
else:
data_list.append(data)
if data_list:
if print_progress:
print("All source files opened, reading and merging remaining data...")
yield self.merge(compute(*[d for d in data_list], scheduler="processes"))
@staticmethod
def _nbytes(data):
"""
Returns size of data object in bytes.
"""
raise NotImplementedError
[docs]
def merge(self, data):
"""
Merge data from multiple data objects.
Parameters
----------
data : list
List of data objects.
Returns
-------
object
Merged data, or None if the input list is empty.
"""
if len(data) > 1:
data = self._merge(data)
elif len(data) == 1:
data = data[0]
else:
data = None
return data
[docs]
def close(self):
"""
Close file(s).
This method can be overridden in subclasses if necessary.
"""
pass
@staticmethod
def _multi_file_closer(closers):
for closer in closers:
closer()
return
[docs]
class CsvFile(Filenames):
"""
Read and write single CSV file.
"""
def __init__(self, filename, mode="r"):
"""
Initialize Csv
Parameters
----------
filename : str
Filename
mode : str, optional
File opening mode.
"""
self.mode = mode
super().__init__(filename)
def _read(self, filename):
"""
Read data from CSV file.
Parameters
----------
filename : str
Filename.
"""
with open(filename) as fid:
header = fid.readline()
dtype = self.header2dtype(header)
data = np.loadtxt(fid, dtype)
return data
[docs]
def read_period(self, dt_start, dt_end):
"""
Read subset data from CSV file for given interval.
Parameters
----------
interval : (datetime, datetime)
Time interval to extract data.
Returns
-------
data : numpy.ndarray
Data.
"""
data = self.read()
subset = (data["date"] >= np.datetime64(dt_start)) & (
data["date"] <= np.datetime64(dt_end))
if np.sum(subset) > 0:
data = data[subset]
else:
data = None
return data
def _write(self, data, filename):
"""
Write data to CSV file.
Parameters
----------
data : numpy.ndarray
Data.
"""
header = data.dtype.__repr__()
np.savetxt(filename, data, fmt="%s", header=header)
@staticmethod
def _merge(data):
"""
Merge data.
Parameters
----------
data : list of numpy.ndarray
List of data.
Returns
-------
data : numpy.ndarray
Merged data.
"""
return np.hstack(data)
[docs]
class CsvFiles(ChronFiles):
"""
Write CSV files.
"""
def __init__(self, root_path):
"""
Initialize CvsFileRW.
Parameters
----------
root_path : str
Root path.
"""
fn_templ = "prefix_{date}_{now}_postfix.csv"
sf_templ = {"Y": "{year}", "M": "{month}"}
super().__init__(root_path, CsvFile, fn_templ, sf_templ=sf_templ)
def _fmt(self, timestamp):
"""
Definition of filename and subfolder format.
Parameters
----------
timestamp : datetime
Time stamp.
Returns
-------
fn_fmt : dict
Filename format.
sf_fmt : dict
Subfolder format.
"""
fn_read_fmt = {"date": timestamp.strftime("%Y%m%d_%H%M%S"), "now": "*"}
sf_read_fmt = {
"Y": {
"year": timestamp.strftime("%Y")
},
"M": {
"month": timestamp.strftime("%m")
},
}
fn_write_fmt = {
"date": timestamp.strftime("%Y%m%d_%H%M%S"),
"now": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
sf_write_fmt = sf_read_fmt
return fn_read_fmt, sf_read_fmt, fn_write_fmt, sf_write_fmt