# Copyright (c) 2025, TU Wien
# All rights reserved.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import tempfile
from pathlib import Path
import numpy as np
import xarray as xr
from ascat.cf_array import indexed_to_contiguous
from ascat.cf_array import contiguous_to_indexed
dtype_to_nan = {
np.dtype("int8"): -2**7,
np.dtype("uint8"): 2**8 - 1,
np.dtype("int16"): -2**15,
np.dtype("uint16"): 2**16 - 1,
np.dtype("int32"): -2**31,
np.dtype("uint32"): 2**32 - 1,
np.dtype("int64"): -2**63,
np.dtype("uint64"): 2**64 - 1,
np.dtype("float32"): -2**31,
np.dtype("float64"): -2**31,
np.dtype("<M8[ns]"): 0,
np.dtype("<M8[s]"): 0,
}
[docs]
def verify_ortho_multi(ds: xr.Dataset, instance_dim: str,
element_dim: str) -> None:
"""
Verify dataset follows orthogonal multidimensional array CF definition.
Parameters
----------
ds : xarray.Dataset
Dataset to be verified.
instance_dim : str
Name of the instance dimension.
element_dim : str
Name of the element dimension.
Returns
-------
sample_dimension : str
Name of the sample dimension.
Raises
------
RuntimeError if verification fails.
"""
# check that instance dimension exists
if instance_dim not in ds.dims:
raise RuntimeError(f"Instance dimension is missing '{instance_dim}'")
# check that element dimension exists
if element_dim not in ds.dims:
raise RuntimeError(f"Element dimension is missing '{element_dim}'")
[docs]
def verify_contiguous_ragged(ds: xr.Dataset, count_var: str,
instance_dim: str) -> None:
"""
Verify dataset follows contiguous ragged array CF definition.
Parameters
----------
ds : xarray.Dataset
Dataset to be verified.
count_var : str
Name of the count variable.
Count variable contains the length of each time series feature. It is
identified by having an attribute with name 'sample_dimension' whose
value is name of the sample dimension. The count variable implicitly
partitions into individual instances all variables that have the
sample dimension.
Raises
------
RuntimeError if verification fails.
"""
# check that count variable exists
if count_var not in ds:
raise RuntimeError(f"Count variable is missing: {count_var}")
# check that count variable contains sample_dimension attribute
if "sample_dimension" not in ds[count_var].attrs:
raise RuntimeError(f"Count variable '{count_var}' has no "
"sample_dimension attribute")
# check that count variable has instance_dimension as single dimension
if ds[count_var].dims != (instance_dim,):
raise RuntimeError(f"Count variable '{count_var}' must have the "
f"instance dimension '{instance_dim}' as its "
"single dimension")
[docs]
def verify_indexed_ragged(ds: xr.Dataset, index_var: str,
sample_dim: str) -> None:
"""
Verify dataset follows indexed ragged array CF definition.
Parameters
----------
ds : xarray.Dataset
Dataset.
index_var : str
The index variable can be identified by having an attribute with
name of instance_dimension whose value is the instance dimension.
sample_dim : str
Name of the sample dimension.
Raises
------
RuntimeError if verification fails.
"""
# check that index variable exists
if index_var not in ds:
raise RuntimeError(f"Index variable is missing: {index_var}")
# check that index variable must have sample dimension as single dimension
if ds[index_var].dims != (sample_dim,):
raise RuntimeError(f"Index variable '{index_var}' must have the "
f"sample dimension '{sample_dim}' as its "
"single dimension")
# check that index variable has instance_dimension attribute
if "instance_dimension" not in ds[index_var].attrs:
raise RuntimeError(f"Index variable '{index_var}' has no "
"instance_dimension attribute")
[docs]
def verify_point_array(ds: xr.Dataset, sample_dim: str) -> None:
"""
Verify dataset follows the CF point data array convention.
Parameters
----------
ds : xarray.Dataset
Dataset to be verified.
sample_dim : str
Name of the sample dimension.
Raises
------
RuntimeError if verification fails.
"""
# check that the sample_dim exists
if sample_dim not in ds.dims:
raise RuntimeError(f"Sample dimension '{sample_dim}' is missing.")
# check all data and coordinate variables have only the sample_dim
for var in ds.variables:
dims = ds[var].dims
if ds[var].ndim > 0 and dims != (sample_dim,):
raise RuntimeError(
f"Variable '{var}' does not conform to point structure "
f"(dims: {dims}). All variables must use only the sample_dim ('{sample_dim}')."
)
[docs]
class PointData:
"""
Point data represent scattered locations and times with no implied
relationship among of coordinate positions, both data and coordinates must
share the same (sample) instance dimension.
"""
def __init__(self, ds: xr.Dataset, sample_dim: str):
"""
Initialize.
Parameters
----------
ds : xarray.Dataset
Dataset to be verified.
sample_dim : str
Name of the sample dimension. The sample dimension indicates the
number of instances (e.g. stations, locations).
"""
self.sample_dim = sample_dim
self._data = ds
self.validate()
[docs]
def validate(self):
"""Validate format."""
verify_point_array(self.ds, self.sample_dim)
@property
def ds(self):
return self._data
[docs]
def to_indexed(self, index_var: str = "obs", instance_dim: str = "loc"):
"""
Convert point data to indexed ragged array.
Parameters
----------
index_var : str
Name of the new index variable to be added.
instance_dim : str
Name of the instance dimension.
Returns
-------
indexed : IndexedRaggedArray
Indexed ragged array object.
"""
if instance_dim not in self.ds:
raise ValueError(
f"'{instance_dim}' must be a coordinate in the dataset")
instance_ids, inverse_index = np.unique(
self.ds[instance_dim], return_inverse=True)
# create index variable (mapping each sample to its instance)
index_data = xr.DataArray(
inverse_index,
dims=(self.sample_dim,),
attrs={"instance_dimension": instance_dim})
new_ds = self.ds.copy()
new_ds[index_var] = index_data
new_ds = new_ds.assign_coords({instance_dim: instance_ids})
return IndexedRaggedArray(
new_ds, index_var=index_var, sample_dim=self.sample_dim)
[docs]
def to_contiguous(self,
count_var: str = "row_size",
instance_dim: str = "loc"):
"""
Convert point data to contiguous ragged array.
Parameters
----------
count_var : str
Name of the new count variable to be added (default: 'row_size').
instance_dim : str
Name of the instance dimension (default: 'loc').
Returns
-------
contiguous : ContiguousRaggedArray
Contiguous ragged array object.
"""
if instance_dim not in self.ds:
raise ValueError(
f"'{instance_dim}' must be a coordinate in the dataset")
group = self.ds.groupby(self.ds[instance_dim])
row_sizes = group.groups
instance_ids = list(row_sizes.keys())
counts = np.array([len(row_sizes[i]) for i in instance_ids])
sort_index = np.argsort(self.ds[instance_dim].values)
sorted_ds = self.ds.isel({self.sample_dim: sort_index})
count_data = xr.DataArray(
counts,
dims=(instance_dim,),
attrs={"sample_dimension": self.sample_dim})
new_ds = sorted_ds.copy()
new_ds[count_var] = count_data
if instance_dim in new_ds.coords:
new_ds = new_ds.assign_coords(
{instance_dim: np.array(instance_ids)})
return ContiguousRaggedArray(
new_ds, count_var=count_var, instance_dim=instance_dim)
[docs]
class OrthoMultiArray:
"""
Orthogonal multidimensional array.
Attributes
----------
instance_dim : str
Name of the instance dimension.
element_dim : str
Element dimension name.
ds : xarray.Dataset
Orthomulti array dataset.
instance_variables : list
List of instance variables.
Methods
-------
sel_instance(i)
Read time series for given instance.
iter()
Yield time series for each instance.
"""
def __init__(self,
ds: xr.Dataset,
instance_dim: str = "loc",
element_dim: str = "time"):
"""
Initialize.
Parameters
----------
ds : xr.Dataset
Data stored in orthomulti array format.
instance_dim : str
Instance dimension name.
element_dim : str
Element dimension name.
"""
self.instance_dim = instance_dim
self.element_dim = element_dim
self._data = ds
self.validate()
# if instance_dim exists as a variable, it is assumed these are there
# instance identifiers used as keys/reading
if self.instance_dim in ds:
self._instances = ds[self.instance_dim].to_numpy()
self._instance_lookup = np.zeros(
self._instances.max() + 1,
dtype=np.int64) + self._instances.size
self._instance_lookup[self._instances] = np.arange(
self._instances.size)
else:
self._instances = np.arange(ds.sizes[instance_dim])
self._instance_lookup = np.arange(self._instances.size)
[docs]
def validate(self):
"""Validate format."""
verify_ortho_multi(self.ds, self.instance_dim, self.element_dim)
@property
def ds(self):
return self._data
[docs]
def sel_instance(self, instance_id: int):
"""Read time series"""
return self.ds.sel({self.instance_dim: instance_id})
def __iter__(self):
"""Iterator over time series"""
for instance in self.ds[self.instance_dim]:
yield self.sel_instance(instance)
[docs]
def iter(self):
"""Explicit iterator method"""
return self.__iter__()
[docs]
def to_point_data(self):
raise NotImplementedError(
"Conversion to point data not implemented yet.")
[docs]
def to_indexed(self):
raise NotImplementedError(
"Conversion to indexed ragged not implemented yet.")
[docs]
def to_contiguous(self):
raise NotImplementedError(
"Conversion to contiguous ragged not implemented yet.")
[docs]
def apply(self, func):
pass
[docs]
class ContiguousRaggedArray:
"""
Contiguous ragged array representation (CF convention).
In an contiguous ragged array representation, the dataset for all time
series are stored in a single 1D array. Additional variables or
dimensions provide the metadata needed to map these values back to their
respective time series.
The contiguous ragged array representation can be used only if the size
of each instance is known at the time that it is created. In this
representation the data for each instance will be contiguous on disk.
If the instance dimension exists as a variable, it is assumed that the
values represent the identifiers for each instance otherwise they are
count upwards from 0.
Attributes
----------
instance_dim : str
Name of the instance dimension.
sample_dim : str
Name of the sample dimension. The variable bearing the
sample_dimension attribute (i.e. count_var) must have the instance
dimension as its single dimension, and must have an integer type.
count_var : str
Name of the count variable. The count variable must be an integer
type and must have the instance dimension as its sole dimension.
The count variable are identifiable by the presence of an attribute,
sample_dimension, found on the count variable, which names the sample
dimension being counted.
ds : xarray.Dataset
Contiguous ragged array dataset.
instance_variables : list
List of instance variables.
instance_ids : list
List of instance ids.
Methods
-------
sel_instance(i)
Read time series for given instance.
iter()
Yield time series for each instance.
"""
def __init__(self,
ds: xr.Dataset,
count_var: str,
instance_dim: str,
instance_id_var: str = None):
"""
Initialize.
Parameters
----------
ds : xr.Dataset
Data stored in contiguous ragged array format.
count_var : str
Count variable name.
instance_dim : str
Instance dimension name.
instance_id_var: str, optional
Variable used as instance identifier (default: None).
"""
self.count_var = count_var
self.instance_dim = instance_dim
self._data = ds
self.validate()
self.sample_dim = ds[count_var].attrs["sample_dimension"]
self.instance_id_var = instance_id_var
self._lut = None
# cache row_size and instance_ids data
self._row_size = self.ds[self.count_var].to_numpy()
if self.instance_id_var is None:
self._instance_ids = self.ds[self.instance_dim].to_numpy()
else:
self._instance_ids = self.ds[self.instance_id_var].to_numpy()
self._set_instance_lut()
[docs]
def validate(self):
"""Validate format."""
verify_contiguous_ragged(self.ds, self.count_var, self.instance_dim)
def _set_instance_lut(self):
"""
Set instance lookup-table.
"""
self._instance_lookup = np.zeros(
self._instance_ids.max() + 1,
dtype=np.int64) + self._instance_ids.size
self._instance_lookup[self._instance_ids] = np.arange(
self._instance_ids.size)
self._lut = np.zeros(self._instance_ids.max() + 1, dtype=bool)
[docs]
@classmethod
def from_file(cls,
filename: str,
count_var: str,
instance_dim: str,
instance_id_var: str = None,
**kwargs):
"""
Load time series from file.
Parameters
----------
filename : str
Filename.
count_var : str
Count variable name.
instance_dim : str
Instance dimension name.
instance_id_var: str, optional
Variable used as instance identifier (default: None).
Returns
-------
data : ContiguousRaggedArray
ContiguousRaggedArray object loaded from a file.
"""
ds = xr.open_dataset(filename, **kwargs)
verify_contiguous_ragged(ds, count_var, instance_dim)
return cls(ds, count_var, instance_dim, instance_id_var)
@property
def ds(self):
"""
Dataset.
Returns
-------
ds : xr.Dataset
Contiguous ragged array dataset.
"""
return self._data
@property
def size(self) -> list:
"""
Number of instances.
Returns
-------
instance_ids : int
Number of instance.
"""
return self._instance_ids.size
@property
def instance_ids(self) -> list:
"""
Instance ids
Returns
-------
instance_ids : list of int
Instance ids.
"""
return self._instance_ids
@property
def instance_variables(self) -> list:
"""
Instance variables.
Returns
-------
instance_variables : list of str
Instance variables.
"""
return [
var for var in self.ds.variables
if (self.ds[var].dims == (self.sample_dim,)) and
(var != self.sample_dim)
]
[docs]
def get_instance_variables(self, include_dtype: bool = False) -> list:
"""
Instance variables.
Returns
-------
instance_variables : list of str
Instance variables.
"""
if include_dtype:
instance_variables = [
(var, self.ds[var].dtype)
for var in self.ds.variables
if (self.ds[var].dims == (self.sample_dim,)) and
(var != self.sample_dim)
]
else:
instance_variables = [
var for var in self.ds.variables
if (self.ds[var].dims == (self.sample_dim,)) and
(var != self.sample_dim)
]
return instance_variables
[docs]
def sel_instance(self, i: int):
"""Read time series"""
try:
idx = self._instance_lookup[i]
except IndexError:
data = None
else:
if idx == self._instance_ids.size:
data = None
else:
start = self._row_size[:idx].sum()
end = start + self._row_size[idx]
data = self.ds.isel({
self.sample_dim: slice(start, end),
self.instance_dim: idx
})
return data
[docs]
def sel_instances(self, i: np.ndarray) -> xr.Dataset:
"""
Read time series for given instance IDs using a LUT and preserve order.
Parameters
----------
i : np.ndarray
Array of instance IDs.
Returns
-------
ds : xr.Dataset
Dataset containing the selected instances in the correct order.
"""
i = np.asarray(i)
idx_array = self._instance_lookup[i]
# mark selected instances in lookup table
self._lut[idx_array] = True
# map each sample index to its instance index
obs = np.repeat(np.arange(len(self._row_size)), self._row_size)
data = self.ds.sel({
self.sample_dim: self._lut[obs],
self.instance_dim: idx_array,
})
# reset lookup table
self._lut[:] = False
# sort the selected data to match the order in i
# assuming instance_dim is coordinate-based and one-dimensional
_, order = np.unique(i, return_inverse=True)
data = data.isel({self.instance_dim: order})
data = data.assign_coords({self.instance_dim: i})
return data
def __iter__(self):
"""
Iterator over instances.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
for i in self.instance_ids:
yield self.sel_instance(i)
[docs]
def iter(self):
"""
Explicit iterator method.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
return self.__iter__()
[docs]
def to_indexed(self):
"""
Convert to indexed ragged array.
Returns
-------
data : IndexedRaggedArray
Indexed ragged array time series.
"""
ds = contiguous_to_indexed(self.ds, self.sample_dim, self.instance_dim,
self.count_var, self.sample_dim)
return IndexedRaggedArray(ds, self.sample_dim, self.sample_dim)
[docs]
def to_orthomulti(self):
"""
Convert to orthogonal multidimensional array.
Returns
-------
data : OrthoMultiArray
Orthogonal multidimensional array time series.
"""
# element length defined by longest time series
element_len = self._row_size.max()
instance_len = self.instance_ids.size
# compute matrix coordinates
x = np.arange(self._row_size.size).repeat(self._row_size)
y = vrange(np.zeros_like(self._row_size), self._row_size)
shape = (instance_len, element_len)
reshaped_ds = xr.Dataset(
{
var: ([self.instance_dim, self.sample_dim
], pad_to_2d(self.ds[var], x, y, shape)
) for var in self.instance_variables
},
coords={
self.instance_dim: self.instance_ids,
},
)
for var in self.instance_variables:
reshaped_ds[var].encoding["_FillValue"] = dtype_to_nan[
reshaped_ds[var].dtype]
return OrthoMultiArray(reshaped_ds, self.instance_dim, self.sample_dim)
[docs]
def to_point_data(self):
raise NotImplementedError(
"Conversion to point data not implemented yet.")
[docs]
def apply(self, func):
"""
Apply function on each instance.
"""
return self.ds.groupby(self.sample_dim).map(func)
[docs]
class IndexedRaggedArray:
"""
Indexed ragged array representation (CF convention).
In an indexed ragged array representation, the dataset is structured
to store variable-length data (e.g., time series with varying lengths)
compactly. To achieve this, auxiliary indexing variables that map the
flat array storage to meaningful groups (e.g. locations).
If the instance dimension exists as a variable, it is assumed that the
values represent the identfiers for each instance otherwise they counting
upwards from 0.
Attributes
----------
index_var : str
The indexed ragged array representation must contain an index
variable, which must be an integer type, and must have the sample
dimension as its single dimension.
The index variable can be identified by having an attribute
'instance_dimension' whose value is the instance dimension.
sample_dim : str
Name of the sample dimension. The sample dimension indicates the
number of instances (e.g. stations, locations).
instance_dim : str
The name of the instance dimension. The value is defined by
the 'instance_dimension' attribute, which must be present on the
index variable. All variables having the instance dimension are
instance variables, i.e. variables holding time series data.
ds : xarray.Dataset
Indexed ragged array dataset.
instance_variables : list
List of instance variables.
instance_ids : list
List of instance ids.
Methods
-------
sel_instance(i)
Read time series for given instance.
iter()
Yield time series for each instance.
"""
def __init__(self, ds: xr.Dataset, index_var: str, sample_dim: str):
"""
Initialize.
Parameters
----------
ds : xr.Dataset
Data in indexed ragged array structure.
index_var : str
Index variable name.
sample_dim : str
Sample dimension name.
"""
self.index_var = index_var
self.sample_dim = sample_dim
self._data = ds.set_coords(self.index_var).set_xindex(self.index_var)
self.validate()
self.instance_dim = ds[index_var].attrs["instance_dimension"]
self._instance_lut = None
self._lut = None
self._set_instance_lut()
[docs]
def validate(self):
"""Validate format."""
verify_indexed_ragged(self.ds, self.index_var, self.sample_dim)
def __repr__(self):
""""""
return self.ds.__repr__()
def _set_instance_lut(self):
"""
Set instance lookup-table.
"""
if self.instance_dim in self.ds:
instance_ids = self.ds[self.instance_dim].to_numpy()
self._instance_lut = np.zeros(
instance_ids.max() + 1, dtype=np.int64) - 1
self._instance_lut[instance_ids] = np.arange(instance_ids.size)
else:
instance_ids = np.unique(self.ds[self.index_var])
self._instance_lut = np.arange(instance_ids.size)
self._lut = np.zeros(instance_ids.max() + 1, dtype=bool)
[docs]
@classmethod
def from_file(cls, filename: str, index_var: str, sample_dim: str):
"""
Read data from file.
Parameters
----------
filename : str
Filename.
index_var : str
Index variable name.
sample_dim : str
Sample dimension name.
Returns
-------
data : IndexRaggedArray
IndexRaggedArray object loaded from a file.
"""
ds = xr.open_dataset(filename)
verify_indexed_ragged(ds, index_var, sample_dim)
return cls(ds, index_var, sample_dim)
[docs]
def save(self, filename: str):
"""
Write data to file.
Parameters
----------
filename : str
Filename.
"""
suffix = Path(filename).suffix
if suffix == ".nc":
self.ds.to_netcdf(filename)
elif suffix == ".zarr":
self.ds.to_zarr(filename)
else:
raise ValueError(f"Unknown file suffix '{suffix}' "
"(.nc and .zarr supported)")
@property
def ds(self) -> xr.Dataset:
"""
Dataset.
Returns
-------
ds : xr.Dataset
Indexed ragged array dataset.
"""
return self._data
@property
def size(self) -> list:
"""
Number of instances.
Returns
-------
instance_ids : int
Number of instance.
"""
return self.instance_ids.size
@property
def instance_ids(self) -> list:
"""
Instance ids.
Returns
-------
instance_ids : list of int
Instance ids.
"""
return self.ds[self.instance_dim].values
@property
def instance_variables(self) -> list:
"""
Instance variables.
Returns
-------
instance_variables : list of str
Instance variables.
"""
return [
var for var in self.ds.variables
if (self.ds[var].dims == (self.sample_dim,)) and
(var != self.sample_dim)
]
[docs]
def sel_instance(self, i: int) -> xr.Dataset:
"""
Read time series.
Parameters
----------
i : int
Instance identifier.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
data = self.ds.sel({
self.index_var: self._instance_lut[i],
self.instance_dim: i
})
# reset index variable or drop index variable (my preference)?
data[self.index_var] = (self.sample_dim,
np.zeros(data[self.index_var].size, dtype=int))
return data
[docs]
def sel_instances(self,
i: np.array,
ignore_missing: bool = True) -> xr.Dataset:
"""
Select multiple instances (time series).
Parameters
----------
i : numpy.array
Instance identifier.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
# check for duplicates in instance identifier?
i = np.asarray(i)
# check if any missing instance ids have been selected
if ignore_missing:
valid = np.where(self._instance_lut[i] != -1)[0]
if valid.size == 0:
raise ValueError("No valid instances selected")
else:
if np.any(self._instance_lut[np.asarray(i)] == -1):
raise ValueError("Missing instances selected")
else:
valid = np.ones(i.size, dtype=bool)
i = i[valid]
# initialize look-up table
self._lut[self._instance_lut[i]] = True
data = self.ds.sel(
{self.index_var: self._lut[self.ds[self.index_var]]})
# reset index variable
index = data[self.instance_dim][data[self.index_var]].to_numpy()
lut = np.zeros(index.max() + 1, dtype=np.int64)
# n_inst = self.ds.sel({self.instance_dim: i}).sizes[self.instance_dim]
n_inst = i.size
lut[i] = np.arange(n_inst)
data[self.index_var] = (self.sample_dim, lut[index])
data = data.sel({self.instance_dim: i})
# copy attributes
data[self.index_var].attrs = self.ds[self.index_var].attrs
# reset look-up table
self._lut[:] = False
return IndexedRaggedArray(data, self.index_var, self.sample_dim)
def __iter__(self) -> xr.Dataset:
"""
Iterator over instances.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
for i in self.instance_ids:
yield self.sel_instance(i)
[docs]
def iter(self) -> xr.Dataset:
"""
Explicit iterator method.
Returns
-------
ds : xr.Dataset
Time series for instance.
"""
return self.__iter__()
[docs]
def to_contiguous(self,
count_var: str = "row_size") -> ContiguousRaggedArray:
"""
Convert to contiguous ragged array.
Parameters
----------
count_var : str, optional
Count variable (default: "row_size").
Returns
-------
data : ContiguousRaggedArray
Contiguous ragged array time series.
"""
ds = indexed_to_contiguous(self.ds, self.sample_dim, self.instance_dim,
count_var, self.index_var)
return ContiguousRaggedArray(ds, self.instance_dim, self.instance_dim,
count_var)
[docs]
def to_orthomulti(self) -> OrthoMultiArray:
"""
Convert to orthogonal multidimensional array.
Returns
-------
data : OrthoMultiArray
Orthogonal multidimensional array time series.
"""
ds = self.ds.sortby([self.index_var])
_, row_size = np.unique(ds[self.index_var], return_counts=True)
# size defined by longest time series
element_len = row_size.max()
instance_len = ds.sizes[self.instance_dim]
# compute matrix coordinates
x = np.arange(row_size.size).repeat(row_size)
y = vrange(np.zeros_like(row_size), row_size)
shape = (instance_len, element_len)
reshaped_ds = xr.Dataset(
{
var: ([self.instance_dim, self.sample_dim],
pad_to_2d(ds[var], x, y, shape)) for var in ds.data_vars
},
coords={
self.instance_dim: ds[self.instance_dim],
},
)
return OrthoMultiArray(reshaped_ds, self.instance_dim, self.sample_dim)
[docs]
def to_point_data(self):
raise NotImplementedError(
"Conversion to point data not implemented yet.")
[docs]
def apply(self, func):
"""
Apply function on each instance.
"""
# return self.ds.groupby(self.sample_dim).apply(func)
return self.ds.groupby(self.sample_dim).map(func)
[docs]
def append(self, ds: xr.Dataset):
"""
Append indexed ragged array time series.
Parameters
----------
ds : xarray.Dataset
Indexed ragged array time series.
"""
verify_indexed_ragged(ds, self.index_var, self.sample_dim)
# use instance_id in index variable if instance dimension is a variable
if self.instance_dim in self.ds:
self.ds[self.index_var] = self.ds[self.instance_dim][self.ds[
self.index_var]]
if self.instance_dim in ds:
# assume this has been set or not?
ds = ds.set_coords(self.index_var).set_xindex(self.index_var)
ds[self.index_var] = (
self.sample_dim,
ds[self.instance_dim].values[ds[self.index_var]])
self._data = xr.combine_nested([self._data, ds], self.sample_dim)
if self.instance_dim in self.ds:
instance_ids = self.ds[self.instance_dim].to_numpy()
lut = np.zeros(instance_ids.max() + 1, dtype=np.int64)
lut[instance_ids] = np.arange(self.ds[self.instance_dim].size)
self.ds[self.index_var] = (self.sample_dim,
lut[self.ds[self.index_var]])
self._set_instance_lut()
[docs]
def vrange(starts, stops):
"""
Create concatenated ranges of integers for multiple start/stop values.
Parameters
----------
starts : numpy.ndarray
Starts for each range.
stops : numpy.ndarray
Stops for each range (same shape as starts).
Returns
-------
ranges : numpy.ndarray
Concatenated ranges.
Example
-------
>>> starts = [1, 3, 4, 6]
>>> stops = [1, 5, 7, 6]
>>> vrange(starts, stops)
array([3, 4, 4, 5, 6])
"""
if starts.shape != stops.shape:
raise ValueError("starts and stops must have the same shape")
stops = np.asarray(stops)
l = stops - starts # lengths of each range
return np.repeat(stops - l.cumsum(), l) + np.arange(l.sum())
[docs]
def pad_to_2d(var: xr.DataArray, x: np.array, y: np.array,
shape: tuple) -> np.array:
"""
Pad each time series
Parameters
----------
var : xarray.DataArray
1d array to be converted into 2d array.
x : np.array
Row indices.
y : np.array
Column indices.
shape : tuple
Array shape.
Returns
-------
padded : numpy.array
Padded 2d array.
"""
padded = np.full(shape, dtype_to_nan[var.dtype], dtype=var.dtype)
padded[x, y] = var.values
padded[np.isnan(padded)] = dtype_to_nan[var.dtype]
return padded
[docs]
def create_contiguous_ragged():
"""Create CF-compliant contiguous ragged array."""
count_var = "row_size"
sample_dim = "obs"
instance_dim = "loc"
row_size = np.array([5, 10, 2, 10])
n_obs = row_size.sum()
n_instances = row_size.size
attrs = {"sample_dimension": sample_dim}
data_vars = {
"precipitation": ((sample_dim,), 10 * np.random.randn(n_obs)),
"temperature": ((sample_dim,), 15 + 8 * np.random.randn(n_obs)),
count_var: ((instance_dim,), row_size, attrs)
}
location_id = np.random.choice(
np.arange(100), size=n_instances, replace=False)
coords = {
instance_dim: (instance_dim, np.arange(n_instances)),
"lon": ((instance_dim,), np.random.randn(n_instances)),
"lat": ((instance_dim,), np.random.randn(n_instances)),
"location_id": ((instance_dim,), location_id)
}
ds = xr.Dataset(data_vars=data_vars, coords=coords)
verify_contiguous_ragged(ds, count_var, instance_dim)
cr = ContiguousRaggedArray(ds, count_var, instance_dim)
tmp_dir = tempfile.mkdtemp()
filename = Path(tmp_dir) / "cont_ragged.nc"
print(filename)
ds.to_netcdf(filename)
[docs]
def create_indexed_ragged():
"""Create CF-compliant indexed ragged array."""
sample_dim = "obs"
instance_dim = "loc"
index_var = "locationIndex"
n_obs = 100
location_index = np.random.choice(np.arange(10), size=n_obs)
n_instances = np.unique(location_index).size
attrs = {"instance_dimension": instance_dim}
data_vars = {
"precipitation": ((sample_dim,), 10 * np.random.randn(n_obs)),
"temperature": ((sample_dim,), 15 + 8 * np.random.randn(n_obs)),
index_var: ((sample_dim), location_index, attrs),
}
location_id = np.random.choice(
np.arange(100), size=n_instances, replace=False)
coords = {
instance_dim: (instance_dim, np.arange(n_instances)),
"lon": ((instance_dim,), np.random.randn(n_instances)),
"lat": ((instance_dim,), np.random.randn(n_instances)),
"location_id": ((instance_dim,), location_id)
}
ds = xr.Dataset(data_vars=data_vars, coords=coords)
verify_indexed_ragged(ds, index_var, sample_dim)
ir_arr = IndexedRaggedArray(ds, index_var, sample_dim)
tmp_dir = tempfile.mkdtemp()
filename = Path(tmp_dir) / "ind_ragged.nc"
print(filename)
ds.to_netcdf(filename)
[docs]
def create_ortho_multi():
"""Create CF-compliant orthomulti array."""
instance_dim = "loc"
element_dim = "time"
n_instances = 10
n_obs = 10
data_vars = {
"precipitation": ((instance_dim, element_dim),
10 * np.random.randn(n_instances, n_obs)),
"temperature": ((instance_dim, element_dim),
15 + 8 * np.random.randn(n_instances, n_obs)),
}
coords = {
instance_dim: (instance_dim, np.arange(n_instances)),
"lon": ((instance_dim,), np.random.randn(n_instances)),
"lat": ((instance_dim,), np.random.randn(n_instances)),
}
ds = xr.Dataset(data_vars=data_vars, coords=coords)
verify_ortho_multi(ds, instance_dim, element_dim)
om_arr = OrthoMultiArray(ds, instance_dim, element_dim)
tmp_dir = tempfile.mkdtemp()
filename = Path(tmp_dir) / "om.nc"
print(filename)
ds.to_netcdf(filename)
[docs]
def create_point_data():
"""Create CF-compliant point data array."""
sample_dim = "obs"
n_obs = 100
time = np.arange("2024-01-01", "2024-04-10", dtype="datetime64[D]")[:n_obs]
lat = np.random.uniform(-90, 90, size=n_obs)
lon = np.random.uniform(-180, 180, size=n_obs)
data_vars = {
"precipitation": ((sample_dim,), 10 * np.random.randn(n_obs)),
"temperature": ((sample_dim,), 15 + 8 * np.random.randn(n_obs)),
}
coords = {
sample_dim: np.arange(n_obs),
"time": (sample_dim, time),
"lat": (sample_dim, lat),
"lon": (sample_dim, lon),
}
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds["lat"].attrs = {"standard_name": "latitude", "units": "degrees_north"}
ds["lon"].attrs = {"standard_name": "longitude", "units": "degrees_east"}
ds["time"].attrs = {"standard_name": "time"}
verify_point_array(ds, sample_dim)
tmp_dir = tempfile.mkdtemp()
filename = Path(tmp_dir) / "point_data.nc"
print(filename)
ds.to_netcdf(filename)