Source code for datacube.api.query

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Storage Query and Access API module
"""


import logging
import datetime
import collections
import math
import warnings
from typing import Optional, Union
import pandas

from pandas import to_datetime as pandas_to_datetime
import numpy as np
from ..index import extract_geom_from_query, strip_all_spatial_fields_from_query
from ..model import Range, Dataset
from ..utils.dates import normalise_dt, tz_aware
from odc.geo import Geometry
from odc.geo.geom import lonlat_bounds, mid_longitude

_LOG = logging.getLogger(__name__)


[docs] class GroupBy:
[docs] def __init__(self, group_by_func, dimension, units, sort_key=None, group_key=None): """ GroupBy Object :param group_by_func: Dataset -> group identifier :param dimension: dimension of the group key :param units: units of the group key :param sort_key: how to sort datasets in a group internally :param group_key: the coordinate value for a group list[Dataset] -> coord value """ self.group_by_func = group_by_func self.dimension = dimension self.units = units if sort_key is None: sort_key = group_by_func self.sort_key = sort_key if group_key is None: group_key = lambda datasets: group_by_func(datasets[0]) # noqa: E731 self.group_key = group_key
OTHER_KEYS = ('measurements', 'group_by', 'output_crs', 'resolution', 'set_nan', 'product', 'geopolygon', 'like', 'source_filter')
[docs] class Query: def __init__(self, index=None, product=None, geopolygon=None, like=None, **search_terms): """Parses search terms in preparation for querying the Data Cube Index. Create a :class:`Query` object by passing it a set of search terms as keyword arguments. >>> query = Query(product='ls5_nbar_albers', time=('2001-01-01', '2002-01-01')) Use by accessing :attr:`search_terms`: >>> query.search_terms['time'] # doctest: +NORMALIZE_WHITESPACE Range(begin=datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzutc()), \ end=datetime.datetime(2002, 1, 1, 23, 59, 59, 999999, tzinfo=tzutc())) By passing in an ``index``, the search parameters will be validated as existing on the ``product``, and a spatial search appropriate for the index driver can be extracted. Used by :meth:`datacube.Datacube.find_datasets` and :meth:`datacube.Datacube.load`. :param datacube.index.Index index: An optional `index` object, if checking of field names is desired. :param str product: name of product :type geopolygon: the spatial boundaries of the search, can be: odc.geo.geom.Geometry: A Geometry object Any string or JsonLike object that can be converted to a Geometry object. An iterable of either of the above; or None: no geopolygon defined (may be derived from like or lat/lon/x/y/crs search terms) :param xarray.Dataset like: spatio-temporal bounds of `like` are used for the search :param search_terms: * `measurements` - list of measurements to retrieve * `latitude`, `lat`, `y`, `longitude`, `lon`, `long`, `x` - tuples (min, max) bounding spatial dimensions * 'extra_dimension_name' (e.g. `z`) - tuples (min, max) bounding extra \ dimensions specified by name for 3D datasets. E.g. z=(10, 30). * `crs` - spatial coordinate reference system to interpret the spatial bounds * `group_by` - observation grouping method. One of `time`, `solar_day`. Default is `time` """ self.index = index self.product = product self.geopolygon = extract_geom_from_query(geopolygon=geopolygon, **search_terms) if 'source_filter' in search_terms and search_terms['source_filter'] is not None: self.source_filter = Query(**search_terms['source_filter']) else: self.source_filter = None search_terms = strip_all_spatial_fields_from_query(search_terms) remaining_keys = set(search_terms.keys()) - set(OTHER_KEYS) if self.index: # Retrieve known keys for extra dimensions known_dim_keys = set() if product is not None: datacube_products = index.products.search(product=product) else: datacube_products = index.products.get_all() for datacube_product in datacube_products: known_dim_keys.update(datacube_product.extra_dimensions.dims.keys()) remaining_keys -= known_dim_keys unknown_keys = remaining_keys - set(index.products.get_field_names()) # TODO: What about keys source filters, and what if the keys don't match up with this product... if unknown_keys: raise LookupError('Unknown arguments: ', unknown_keys) self.search = {} for key in remaining_keys: self.search.update(_values_to_search(**{key: search_terms[key]})) if like: assert self.geopolygon is None, "'like' with other spatial bounding parameters is not supported" self.geopolygon = getattr(like, 'extent', self.geopolygon) if 'time' not in self.search: time_coord = like.coords.get('time') if time_coord is not None: self.search['time'] = _time_to_search_dims( # convert from np.datetime64 to datetime.datetime (pandas_to_datetime(time_coord.values[0]).to_pydatetime(), pandas_to_datetime(time_coord.values[-1]).to_pydatetime()) ) @property def search_terms(self): """ Access the search terms as a dictionary. :type: dict """ kwargs = {} kwargs.update(self.search) if self.geopolygon: if self.index and self.index.supports_spatial_indexes: kwargs['geopolygon'] = self.geopolygon else: geo_bb = lonlat_bounds(self.geopolygon, resolution="auto") if math.isclose(geo_bb.bottom, geo_bb.top, abs_tol=1e-5): kwargs['lat'] = geo_bb.bottom else: kwargs['lat'] = Range(geo_bb.bottom, geo_bb.top) if math.isclose(geo_bb.left, geo_bb.right, abs_tol=1e-5): kwargs['lon'] = geo_bb.left else: kwargs['lon'] = Range(geo_bb.left, geo_bb.right) if self.product: kwargs['product'] = self.product if self.source_filter: kwargs['source_filter'] = self.source_filter.search_terms return kwargs def __repr__(self): return self.__str__() def __str__(self): return """Datacube Query: type = {type} search = {search} geopolygon = {geopolygon} """.format(type=self.product, search=self.search, geopolygon=self.geopolygon)
def _extract_time_from_ds(ds: Dataset) -> datetime.datetime: return normalise_dt(ds.center_time)
[docs] def query_group_by(group_by='time', **kwargs): """ Group by function for loading datasets :param group_by: group_by name, supported strings are - `time` (default) - `solar_day`, see :func:`datacube.api.query.solar_day` or pass a :class:`datacube.api.query.GroupBy` object. :return: :class:`datacube.api.query.GroupBy` :raises LookupError: when group_by string is not a valid dictionary key. """ if isinstance(group_by, GroupBy): return group_by if not isinstance(group_by, str): group_by = None time_grouper = GroupBy(group_by_func=_extract_time_from_ds, dimension='time', units='seconds since 1970-01-01 00:00:00') solar_day_grouper = GroupBy(group_by_func=solar_day, dimension='time', units='seconds since 1970-01-01 00:00:00', sort_key=_extract_time_from_ds, group_key=lambda datasets: _extract_time_from_ds(datasets[0])) group_by_map = { None: time_grouper, 'time': time_grouper, 'solar_day': solar_day_grouper } try: return group_by_map[group_by] except KeyError: raise LookupError( f'No group by function for {group_by}, valid options are: {group_by_map.keys()}', )
def _value_to_range(value): if isinstance(value, (str, float, int)): value = float(value) return value, value else: return float(value[0]), float(value[-1]) def _values_to_search(**kwargs): search = {} for key, value in kwargs.items(): if key.lower() in ('time', 't'): search['time'] = _time_to_search_dims(value) elif key not in ['latitude', 'lat', 'y'] + ['longitude', 'lon', 'x']: # If it's not a string, but is a sequence of length 2, then it's a Range if ( not isinstance(value, str) and isinstance(value, collections.abc.Sequence) and len(value) == 2 ): search[key] = Range(*value) # All other cases are default else: search[key] = value return search def _time_to_search_dims(time_range): with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) tr_start, tr_end = time_range, time_range if hasattr(time_range, '__iter__') and not isinstance(time_range, str): tmp = list(time_range) if len(tmp) > 2: raise ValueError("Please supply start and end date only for time query") tr_start, tr_end = tmp[0], tmp[-1] if isinstance(tr_start, (int, float)) or isinstance(tr_end, (int, float)): raise TypeError("Time dimension must be provided as a datetime or a string") if tr_start is None: start = datetime.datetime.fromtimestamp(0) elif not isinstance(tr_start, datetime.datetime): # convert to datetime.datetime if hasattr(tr_start, 'isoformat'): tr_start = tr_start.isoformat() start = pandas_to_datetime(tr_start).to_pydatetime() else: start = tr_start if tr_end is None: tr_end = datetime.datetime.now().strftime("%Y-%m-%d") # Attempt conversion to isoformat # allows pandas.Period to handle datetime objects if hasattr(tr_end, 'isoformat'): tr_end = tr_end.isoformat() # get end of period to ensure range is inclusive end = pandas.Period(tr_end).end_time.to_pydatetime() tr = Range(tz_aware(start), tz_aware(end)) if start == end: return tr[0] return tr def _convert_to_solar_time(utc, longitude): seconds_per_degree = 240 offset_seconds = int(longitude * seconds_per_degree) offset = datetime.timedelta(seconds=offset_seconds) return utc + offset def _ds_mid_longitude(dataset: Dataset) -> Optional[float]: m = dataset.metadata if hasattr(m, 'lon'): lon = m.lon return (lon.begin + lon.end)*0.5 return None
[docs] def solar_day(dataset: Dataset, longitude: Optional[float] = None) -> np.datetime64: """ Adjust Dataset timestamp for "local time" given location and convert to numpy. :param dataset: Dataset object from which to read time and location :param longitude: If supplied correct timestamp for this longitude, rather than mid-point of the Dataset's footprint """ utc = dataset.center_time.astimezone(datetime.timezone.utc) if longitude is None: _lon = _ds_mid_longitude(dataset) if _lon is None: raise ValueError('Cannot compute solar_day: dataset is missing spatial info') longitude = _lon solar_time = _convert_to_solar_time(utc, longitude) return np.datetime64(solar_time.date(), 'D')
def solar_offset(geom: Union[Geometry, Dataset], precision: str = 'h') -> datetime.timedelta: """ Given a geometry or a Dataset compute offset to add to UTC timestamp to get solar day right. This only work when geometry is "local enough". :param geom: Geometry with defined CRS :param precision: one of ``'h'`` or ``'s'``, defaults to hour precision """ if isinstance(geom, Geometry): lon = mid_longitude(geom) else: _lon = _ds_mid_longitude(geom) if _lon is None: raise ValueError('Cannot compute solar offset, dataset is missing spatial info') lon = _lon if precision == 'h': return datetime.timedelta(hours=int(round(lon*24/360))) # 240 == (24*60*60)/360 (seconds of a day per degree of longitude) return datetime.timedelta(seconds=int(lon*240))