Source code for ascat.cf_array

# Copyright (c) 2025, TU Wien
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of TU Wien, Department of Geodesy and Geoinformation
#      nor the names of its contributors may be used to endorse or promote
#      products derived from this software without specific prior written
#      permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations

from typing import Union, Sequence

import numpy as np
import xarray as xr


[docs] def check_orthomulti_ts(ds): # Assumptions: # - two dimensions [DONE] # - single variable with only the sample dimension (e.g. time) [TODO] # - data variables have sample and instance dimension [TODO] # - data variables have ALL instance dimension coordinates listed as coordinates [TODO] if len(ds.dims) == 2: for v in ds.variables: if "cf_role" in ds[v].attrs and ds[v].attrs["cf_role"] == "timeseries_id": return True return False
[docs] def cf_array_type(ds): if ds.attrs.get("featureType") == "point": return "point" for v in ds.variables: if "instance_dimension" in ds[v].attrs: return "indexed" if "sample_dimension" in ds[v].attrs: return "contiguous" if check_orthomulti_ts(ds): return "orthomulti_ts" raise ValueError("Array type could not be determined.")
[docs] def cf_array_class(ds, array_type, **kwargs): if array_type == "point": return TimeseriesPointArray(ds, **kwargs) if array_type == "indexed": return RaggedArray(ds, **kwargs) if array_type == "contiguous": return RaggedArray(ds, **kwargs) if array_type == "orthomulti_ts": return OrthoMultiTimeseriesArray(ds, **kwargs) raise ValueError(f"Array type '{array_type}' not recognized." "Should be one of 'point', 'indexed', 'contiguous', 'orthomulti_ts'.")
[docs] def point_to_indexed( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, index_var: str = "locationIndex", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: coord_vars = coord_vars or [] instance_vars = instance_vars or [] instance_vars = [timeseries_id] + instance_vars _, unique_index_1d, instanceIndex = np.unique( ds[timeseries_id], return_index=True, return_inverse=True ) ds[index_var] = (sample_dim, instanceIndex, {"instance_dimension": instance_dim}) for var in instance_vars: if var in ds: ds = ds.assign( {var: (instance_dim, ds[var][unique_index_1d].data, ds[var].attrs)} ) if var in coord_vars: ds = ds.set_coords(var) ds = ds.assign_attrs({"featureType": "timeSeries"}) return ds
[docs] def point_to_contiguous( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: coord_vars = coord_vars or [] sort_vars = sort_vars or [] instance_vars = instance_vars or [] instance_vars = [timeseries_id] + instance_vars ds = ds.sortby([timeseries_id, *sort_vars]) _, unique_index_1d, row_size = np.unique( ds[timeseries_id], return_index=True, return_counts=True ) ds[count_var] = ("locations", row_size, {"sample_dimension": sample_dim}) for var in instance_vars: if var in ds: encoding = ds[var].encoding ds[var] = (instance_dim, ds[var][unique_index_1d].data, ds[var].attrs) ds[var].encoding = encoding if var in coord_vars: ds = ds.set_coords(var) ds = ds.assign_attrs({"featureType": "timeSeries"}) return ds
[docs] def contiguous_to_indexed( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, index_var: str, ) -> xr.Dataset: """ Convert a contiguous ragged array dataset to an indexed ragged array dataset. """ row_size = np.where(ds[count_var].data > 0, ds[count_var].data, 0) locationIndex = np.repeat(np.arange(row_size.size), row_size) ds = ds.assign( { index_var: ( sample_dim, locationIndex, {"instance_dimension": instance_dim}, ) } ).drop_vars([count_var]) # put locationIndex as first var ds = ds[[index_var] + [var for var in ds.variables if var != index_var]] return ds
[docs] def indexed_to_contiguous( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, index_var: str, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: """ Convert an indexed ragged array dataset to a contiguous ragged array dataset """ sort_vars = sort_vars or [] ds = ds.sortby([index_var, *sort_vars]) idxs, sizes = np.unique(ds[index_var], return_counts=True) row_size = np.zeros_like(ds[instance_dim].data) row_size[idxs] = sizes ds = ds.assign( {count_var: (instance_dim, row_size, {"sample_dimension": sample_dim})} ).drop_vars([index_var]) return ds
[docs] def indexed_to_point( ds: xr.Dataset, sample_dim: str, instance_dim: str, index_var: str ): instance_vars = [var for var in ds.variables if instance_dim in ds[var].dims] for instance_var in instance_vars: ds = ds.assign( { instance_var: ( sample_dim, ds[instance_var][ds[index_var]].data, ds[instance_var].attrs, ) } ) ds = ds.drop_vars([index_var]).assign_attrs({"featureType": "point"}) return ds
[docs] def contiguous_to_point( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, ): """Convert a contiguous ragged array dataset to a Point Array. Parameters ---------- ds : xarray.Dataset Dataset. sample_dim : str Name of the sample dimension. instance_dim : str Name of the instance dimension. count_var : str Name of the count variable. Returns ------- xarray.Dataset Dataset with only the time series variables. """ row_size = ds[count_var].values ds = ds.drop_vars(count_var) instance_vars = [var for var in ds.variables if instance_dim in ds[var].dims] for instance_var in instance_vars: ds = ds.assign( { instance_var: ( sample_dim, np.repeat(ds[instance_var].values, row_size), ds[instance_var].attrs, ) } ) ds = ds.assign_attrs({"featureType": "point"}) return ds
[docs] class CFDiscreteGeom: def __init__( self, xarray_obj: xr.Dataset, coord_vars: Union[Sequence[str], None] = None, instance_vars: Union[Sequence[str], None] = None, contiguous_sort_vars: Union[Sequence[str], None] = None, ): """ Parameters ---------- xarray_obj : xarray.Dataset Xarray dataset. coord_vars : Sequence[str], optional Coordinate variables, by default None. instance_vars : Sequence[str], optional Instance variables, by default None. contiguous_sort_vars : Sequence[str], optional Variables that each timeseries should be sorted by in contiguous ragged array format. """ self._data = xarray_obj self._coord_vars = coord_vars or [ "lon", "lat", "alt", "longitude", "latitude", "altitude", ] self._instance_vars = instance_vars or [ "lon", "lat", "alt", "longitude", "latitude", "altitude", "location_description", ] self._contiguous_sort_vars = contiguous_sort_vars or [ "time", ] self._ra_type = None self._sample_dimension = None self._instance_dimension = None self._count_var = None self._index_var = None self._timeseries_id = None self.array_type @property def array_type(): raise NotImplementedError
[docs] class PointArray(CFDiscreteGeom): pass
[docs] class TimeseriesPointArray(PointArray): """ Assumptions made beyond basic CF conventions: - cf_role="timeseries_id" is used to identify the timeseries ID variable for purposes of selecting instances and converting to ragged arrays. If you only have a single timeseries there's not much point in using this class. """ @property def array_type(self) -> str: if self._ra_type is None: if self._data.attrs["featureType"] == "point": self._ra_type = "point" self._sample_dimension = str(list(self._data.dims)[0]) else: raise ValueError( "Dataset is not a point array" "(should have featureType='point' in attributes)." ) return self._ra_type @property def timeseries_id(self): if self._timeseries_id is not None: return self._timeseries_id for v in self._data.variables: if cf_role := self._data[v].attrs.get("cf_role"): if cf_role == "timeseries_id": self._timeseries_id = v return self.timeseries_id raise ValueError( "Timeseries ID could not be determined from dataset attributes." )
[docs] def sel_instances( self, instance_vals: Union[Sequence[Union[int, str]], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, timeseries_id: str = "location_id", ): ds = self._data return self._select_instances( ds, self._sample_dimension, instance_vals, instance_lookup_vector, timeseries_id, )
[docs] def to_indexed_ragged( self, instance_dim: str = "locations", timeseries_id: str = "location_id", index_var: str = "locationIndex", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: return self._point_to_indexed( self._data, self._sample_dimension, instance_dim, timeseries_id, index_var, instance_vars or self._instance_vars, coord_vars or self._coord_vars, )
[docs] def to_contiguous_ragged( self, instance_dim: str = "locations", timeseries_id: str = "location_id", count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: return self._point_to_contiguous( self._data, self._sample_dimension, instance_dim, timeseries_id, count_var, instance_vars or self._instance_vars, coord_vars or self._coord_vars, sort_vars or self._contiguous_sort_vars, )
[docs] def to_orthomulti( self, instance_dim: str = "locations", timeseries_id: str = "location_id", count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, ): return self._point_to_orthomulti( self._data, self._sample_dimension, instance_dim, timeseries_id, count_var, instance_vars or self._instance_vars, coord_vars or self._coord_vars, sort_vars or self._contiguous_sort_vars, )
[docs] def resample_to_orthomulti( self, instance_dim: str = "locations", timeseries_id: str = "location_id", count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, vars_to_resample: Union[Sequence[str], None] = None, resample_method: callable = np.mean, resample_period: str = "1M", ): return self._resample_point_to_orthomulti( self._data, self._sample_dimension, instance_dim, timeseries_id, count_var, instance_vars or self._instance_vars, coord_vars or self._coord_vars, sort_vars or self._contiguous_sort_vars, vars_to_resample, resample_method, resample_period, )
[docs] def to_point_array(self): return self._data
[docs] def set_sample_dimension(self, sample_dim: str): if self._sample_dimension != sample_dim: self._data = self._data.rename_dims({self._sample_dimension: sample_dim}) self._sample_dimension = sample_dim return self._data
@staticmethod def _select_instances( ds: xr.Dataset, sample_dim: str, instance_vals: Union[Sequence[Union[int, str]], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, timeseries_id: str = "location_id", ) -> xr.Dataset: if not ds.chunks: ds = ds.chunk({sample_dim: -1}) if instance_vals is None: instance_vals = [] if instance_lookup_vector is not None: sample_idx = instance_lookup_vector[ds[timeseries_id]] return ds.sel({sample_dim: sample_idx}) sample_idx = np.isin(ds[timeseries_id], instance_vals) return ds.sel({sample_dim: sample_idx}) @staticmethod def _point_to_indexed( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, index_var: str = "locationIndex", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: return point_to_indexed( ds, sample_dim, instance_dim, timeseries_id, index_var, instance_vars, coord_vars, ) @staticmethod def _point_to_contiguous( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: return point_to_contiguous( ds, sample_dim, instance_dim, timeseries_id, count_var, instance_vars, coord_vars, sort_vars, ) @staticmethod def _point_to_orthomulti( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: """ At the moment, minimum resolution is 1D """ ds = ds.rename({sample_dim: "time"}).set_xindex("time") ds = ds.set_index(event=["time", timeseries_id]).unstack("event") for c in ds.coords: if "time" in ds[c].dims and c != "time": ds[c] = ds[c].max("time", keep_attrs=True) ds.attrs.pop("featureType") return ds @staticmethod def _resample_point_to_orthomulti( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, count_var: str = "row_size", instance_vars: Union[Sequence[str], None] = None, coord_vars: Union[Sequence[str], None] = None, sort_vars: Union[Sequence[str], None] = None, vars_to_resample: Union[Sequence[str], None] = None, resample_method: callable = np.mean, resample_period: str = "1M", ) -> xr.Dataset: """ At the moment, minimum resolution is 1D """ ds = ds.rename({sample_dim: "time"}).set_xindex("time") ds = ds.set_index(event=["time", timeseries_id]).unstack("event") ds = ds.resample(time=resample_period).apply(resample_method) ds.attrs.pop("featureType") return ds
[docs] class RaggedArray(CFDiscreteGeom): @property def array_type(self): if self._ra_type is not None: return self._ra_type ds = self._data for v in ds.variables: if "instance_dimension" in ds[v].attrs: self._ra_type = "indexed" self._index_var = v self._instance_dimension = ds[v].attrs["instance_dimension"] self._sample_dimension = str(ds[v].dims[0]) return self._ra_type if "sample_dimension" in ds[v].attrs: self._ra_type = "contiguous" self._count_var = v self._sample_dimension = ds[v].attrs["sample_dimension"] if len(ds[v].dims) > 0: self._instance_dimension = ds[v].dims[0] return self._ra_type raise ValueError("Ragged array type could not be determined.") @property def timeseries_id(self): if self._timeseries_id is not None: return self._timeseries_id for v in self._data.variables: if cf_role := self._data[v].attrs.get("cf_role"): if cf_role == "timeseries_id": self._timeseries_id = v return self.timeseries_id raise ValueError( "Timeseries ID could not be determined from dataset attributes." )
[docs] def to_indexed_ragged( self, index_var: str = "locationIndex" ) -> xr.Dataset: if self.array_type == "indexed": return self._data elif self.array_type == "contiguous": if self._index_var is None: self._index_var = index_var return self._contiguous_to_indexed( self._data, self._sample_dimension, self._instance_dimension, self._count_var, self._index_var, )
[docs] def to_contiguous_ragged( self, count_var: str = "row_size", sort_vars: Union[Sequence[str], None] = None ) -> xr.Dataset: if self.array_type == "contiguous": return self._data elif self.array_type == "indexed": if self._count_var is None: self._count_var = count_var return self._indexed_to_contiguous( self._data, self._sample_dimension, self._instance_dimension, self._count_var, self._index_var, sort_vars=sort_vars or self._contiguous_sort_vars, )
[docs] def to_point_array(self): if self.array_type == "indexed": return self._indexed_to_point( self._data, self._sample_dimension, self._instance_dimension, self._index_var, ) if self.array_type == "contiguous": return self._contiguous_to_point( self._data, self._sample_dimension, self._instance_dimension, self._count_var, )
[docs] def sel_instances( self, instance_vals: Union[Sequence[Union[int, str]], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, ) -> xr.Dataset: if self.array_type == "indexed": # convert to point array, select there, convert back\ ds = self.to_point_array() instances = ds.cf_geom.sel_instances( instance_vals=instance_vals, instance_lookup_vector=instance_lookup_vector, ) return instances.cf_geom.to_indexed_ragged(index_var=self._index_var) if self.array_type == "contiguous": return self._select_instances_contiguous( self._data, self._sample_dimension, self._instance_dimension, self.timeseries_id, self._count_var, instance_vals=instance_vals, instance_lookup_vector=instance_lookup_vector, )
[docs] def set_sample_dimension(self, sample_dim: str): if self._sample_dimension != sample_dim: self._data = self._data.rename_dims({self._sample_dimension: sample_dim}) if self.array_type == "contiguous": self._data[self._count_var].attrs["sample_dimension"] = sample_dim self._sample_dimension = sample_dim return self._data
@staticmethod def _select_instances_contiguous( ds: xr.Dataset, sample_dim: str, instance_dim: str, timeseries_id: str, count_var: str, instance_vals: Union[Sequence[int], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, ) -> xr.Dataset: if instance_vals is None: instance_vals = [] # For contiguous using the lookup vector would be slower, so if we get only that, # we'll just turn it into an instance_vals array. if len(instance_vals) == 0: if instance_lookup_vector is not None and sum(instance_lookup_vector) > 0: instance_vals = np.where(instance_lookup_vector)[0] def get_single_instance_idxs(ds, instance_val): instances_idx = np.where(ds[timeseries_id] == instance_val)[0] if len(instances_idx) == 0: return None instances_idx = int(instances_idx[0]) sample_start = int( ds[count_var].isel({instance_dim: slice(0, instances_idx)}).sum().values ) sample_end = int( sample_start + ds[count_var].isel({instance_dim: instances_idx}).values ) return sample_start, sample_end, instances_idx def select_single_instance(ds, sample_start, sample_end, instances_idx): return ds.isel( { sample_dim: slice(sample_start, sample_end), instance_dim: instances_idx, } ) def select_several_instances(ds, sample_starts, sample_ends, instances_idxs): sample_idxs = np.concatenate( [range(start, end) for start, end in zip(sample_starts, sample_ends) if end > start] ) return ds.isel({sample_dim: sample_idxs, instance_dim: np.array(instances_idxs)}) if len(instance_vals) == 1: if get_single_instance_idxs(ds, instance_vals[0]) is None: return None return select_single_instance(ds, *get_single_instance_idxs(ds, instance_vals[0])) else: instance_vals = np.unique(instance_vals) ds[count_var].load() ds[timeseries_id].load() results = [get_single_instance_idxs(ds, instance_val) for instance_val in instance_vals] results = [r for r in results if r is not None] if len(results) == 0: return None if not ds.chunks: ds = ds.chunk({sample_dim: -1}) return select_several_instances( ds, *zip(*results) ) @staticmethod def _contiguous_to_indexed( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, index_var: str, ) -> xr.Dataset: """ Convert a contiguous ragged array dataset to an indexed ragged array dataset. """ return contiguous_to_indexed(ds, sample_dim, instance_dim, count_var, index_var) @staticmethod def _indexed_to_contiguous( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, index_var: str, sort_vars: Union[Sequence[str], None] = None, ) -> xr.Dataset: """ Convert an indexed ragged array dataset to a contiguous ragged array dataset """ return indexed_to_contiguous( ds, sample_dim, instance_dim, count_var, index_var, sort_vars ) @staticmethod def _indexed_to_point( ds: xr.Dataset, sample_dim: str, instance_dim: str, index_var: str ): return indexed_to_point(ds, sample_dim, instance_dim, index_var) @staticmethod def _contiguous_to_point( ds: xr.Dataset, sample_dim: str, instance_dim: str, count_var: str, ): """Convert a ragged array dataset to a Point Array. Parameters ---------- ds : xarray.Dataset Dataset. Returns ------- xarray.Dataset Dataset with only the time series variables. """ return contiguous_to_point(ds, sample_dim, instance_dim, count_var)
[docs] class OrthoMultiTimeseriesArray(CFDiscreteGeom): @property def array_type(self): if check_orthomulti_ts(self._data): for v in self._data.variables: if "cf_role" in self._data[v].attrs and self._data[v].attrs["cf_role"] == "timeseries_id": self._timeseries_id = v self._instance_dimension = self._data[v].dims[0] break return "orthomulti_ts" else: raise ValueError("Dataset is not an orthomulti timeseries array.")
[docs] def sel_instances( self, instance_vals: Union[Sequence[Union[int, str]], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, ): """ Select requested timeseries instances from an orthomulti timeseries array dataset. Parameters ---------- instance_vals : Union[Sequence[Union[int, str]], np.ndarray], optional List of instance values to select, by default None instance_lookup_vector : Union[np.ndarray], optional Lookup vector for instance values, by default None """ return self._select_instances( self._data, self._instance_dimension, self._timeseries_id, instance_vals, instance_lookup_vector, )
[docs] def set_sample_dimension(self, sample_dim: str): if self._sample_dimension != sample_dim: self._data = self._data.rename_dims({self._sample_dimension: sample_dim}) self._sample_dimension = sample_dim return self._data
[docs] def to_raster(self, x_var, y_var): return self._data.reset_index(self._timeseries_id)\ .set_index({self._instance_dimension: [x_var, y_var]})\ .unstack(self._instance_dimension)
@staticmethod def _select_instances( ds: xr.Dataset, instance_dim: str, timeseries_id: str, instance_vals: Union[Sequence[Union[int, str]], np.ndarray, None] = None, instance_lookup_vector: Union[np.ndarray, None] = None, ) -> xr.Dataset: """ Selects requested instances from an orthomulti timeseries array dataset. Returns a dataset containing the requested instances. If instances are requested that are not in the dataset, no error will be thrown. """ if instance_lookup_vector is not None: instance_bool = instance_lookup_vector[ds[timeseries_id]] else: instance_bool = np.isin(ds[timeseries_id], instance_vals) return ds.sel({instance_dim: instance_bool})