from functools import cached_property
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Type, Union
from pygeofilter.parsers.ecql import parse as parse_ecql
from pygeofilter.backends.native.evaluate import NativeEvaluator
from pydantic import BaseModel
from mapchete.path import MPath, MPathLike
from mapchete.types import Bounds
from pystac import Catalog, Item, CatalogType, Extent
from pystac.collection import Collection
from pystac.stac_io import DefaultStacIO
from pystac_client import CollectionClient
from pystac_client.stac_api_io import StacApiIO
from rasterio.profiles import Profile
from shapely.geometry.base import BaseGeometry
from mapchete_eo.io.assets import get_assets, get_metadata_assets
from mapchete_eo.product import blacklist_products
from mapchete_eo.settings import mapchete_eo_settings
from mapchete_eo.types import TimeRange
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class FSSpecStacIO(StacApiIO):
    """
    Custom STAC I/O which routes all reads and writes through ``MPath``.

    This allows pystac/pystac_client I/O operations on object storage, not only
    on the local filesystem.
    """

    def read_text(self, source: MPathLike, *args, **kwargs) -> str:
        """Return the text content of ``source``; extra arguments are ignored."""
        return MPath.from_inp(source).read_text()

    def write_text(self, dest: MPathLike, txt: str, *args, **kwargs) -> None:
        """Write ``txt`` to ``dest``, creating the parent directory if missing."""
        self._write(dest, txt)

    # TODO: investigate in pystac why this has to be a staticmethod
    @staticmethod
    def save_json(dest: MPathLike, json_dict: dict, *args, **kwargs) -> None:
        """Serialize ``json_dict`` as indented JSON and write it to ``dest``."""
        FSSpecStacIO._write(dest, json.dumps(json_dict, indent=2))

    @staticmethod
    def _write(dest: MPathLike, txt: str) -> None:
        """
        Write ``txt`` to ``dest``, creating the parent directory first if needed.

        The number of written characters is intentionally not returned, so that
        the public writers honour their ``-> None`` annotations.
        """
        path = MPath.from_inp(dest)
        if not path.parent.exists():
            path.parent.makedirs(exist_ok=True)
        with path.open("w") as dst:
            dst.write(txt)
class CollectionSearcher(ABC):
    """
    Abstract bridge between a Source and a concrete catalog implementation.

    Subclasses supply ``client``, ``eo_bands`` and ``search``. The collection
    metadata properties ``id``, ``description`` and ``stac_extensions`` are read
    from ``client`` and cached per instance.
    """

    # pydantic model class instantiated (without arguments) by ``config``
    config_cls: Type[BaseModel]
    # collection name this searcher was constructed with
    collection: str
    # optional list of Item -> Item callables; where they are applied is not
    # visible here (presumably in subclasses — confirm)
    stac_item_modifiers: Optional[List[Callable[[Item], Item]]] = None
    # product blacklist, evaluated once when the class is created; an empty set
    # when the settings configure no blacklist
    blacklist: Set[str] = (
        set()
        if not mapchete_eo_settings.blacklist
        else blacklist_products(mapchete_eo_settings.blacklist)
    )

    def __init__(
        self,
        collection: str,
        stac_item_modifiers: Optional[List[Callable[[Item], Item]]] = None,
    ):
        self.stac_item_modifiers = stac_item_modifiers
        self.collection = collection

    # NOTE: abstractmethod is deliberately the *outer* decorator: it sets the
    # abstract flag on the cached_property object itself, since
    # functools.cached_property does not appear to forward __isabstractmethod__
    # from a wrapped function (verify before swapping the order).
    @abstractmethod
    @cached_property
    def client(self) -> CollectionClient:
        """STAC collection client backing this searcher."""
        ...

    @abstractmethod
    @cached_property
    def eo_bands(self) -> List[str]:
        """Names of the EO bands offered by the collection."""
        ...

    @property
    def config(self) -> BaseModel:
        """A new ``config_cls`` instance, created without arguments on every access."""
        model_cls = self.config_cls
        return model_cls()

    @cached_property
    def id(self) -> str:
        """Identifier as reported by ``client``."""
        return self.client.id

    @cached_property
    def description(self) -> str:
        """Description as reported by ``client``."""
        return self.client.description

    @cached_property
    def stac_extensions(self) -> List[str]:
        """STAC extension URIs as reported by ``client``."""
        return self.client.stac_extensions

    @abstractmethod
    def search(
        self,
        time: Optional[Union[TimeRange, List[TimeRange]]] = None,
        bounds: Optional[Bounds] = None,
        area: Optional[BaseGeometry] = None,
        query: Optional[str] = None,
        search_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Generator[Item, None, None]:
        """
        Yield the collection's items matching the given filters.

        Args:
            time: One time range or a list of time ranges.
            bounds: Spatial filter bounds.
            area: Spatial filter geometry.
            query: Optional filter expression.
            search_kwargs: Implementation-specific extra search arguments.
        """
        ...
class StaticCollectionWriterMixin(CollectionSearcher):
    """Mixin adding export of search results as a static STAC catalog."""

    def write_static_catalog(
        self,
        output_path: MPathLike,
        bounds: Optional[Bounds] = None,
        area: Optional[BaseGeometry] = None,
        time: Optional[Union[TimeRange, List[TimeRange]]] = None,
        search_kwargs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
        description: Optional[str] = None,
        assets: Optional[List[str]] = None,
        assets_dst_resolution: Union[None, float, int] = None,
        assets_convert_profile: Optional[Profile] = None,
        copy_metadata: bool = False,
        metadata_parser_classes: Optional[tuple] = None,
        overwrite: bool = False,
        stac_io: Optional[DefaultStacIO] = None,
        progress_callback: Optional[Callable] = None,
        query: Optional[str] = None,
    ) -> MPath:
        """
        Export a static STAC catalog from the search results.

        If ``output_path/catalog.json`` already exists, it is opened and the
        exported collection is attached to it. Any existing child with the same
        id as this collection is replaced. Otherwise a new self-contained
        catalog is created.

        Args:
            output_path: Destination directory for the static catalog.
            bounds: Spatial filter bounds.
            area: Spatial filter geometry.
            time: Temporal filter range (or list of ranges).
            search_kwargs: Additional search arguments.
            name: Catalog name, used as catalog id (defaults to the collection
                id); only used when a new catalog is created.
            description: Catalog description; only used for a new catalog.
            assets: List of assets to download.
            assets_dst_resolution: Sub-sampling resolution for assets.
            assets_convert_profile: Output profile for assets (e.g. for COG conversion).
            copy_metadata: Whether to copy sidecar metadata files.
            metadata_parser_classes: Custom parser classes for metadata.
            overwrite: Overwrite existing files.
            stac_io: STAC I/O used to read an existing catalog and to write the
                result. A new ``FSSpecStacIO`` is created when omitted.
            progress_callback: Optional function, called as
                ``progress_callback(n=..., total=...)`` after each item.
            query: Optional filter expression forwarded to ``search``.

        Returns:
            MPath: Path to the generated catalog.json.

        Raises:
            ValueError: If the search returns no items (a collection extent
                cannot be computed from zero items).
        """
        output_path = MPath.from_inp(output_path)
        assets = assets or []
        if stac_io is None:
            # one instance per call instead of a single instance shared by all
            # calls through a default argument evaluated at import time
            stac_io = FSSpecStacIO()

        catalog_json = output_path / "catalog.json"
        catalog = self._open_or_create_catalog(
            catalog_json, name=name, description=description, stac_io=stac_io
        )

        search_args: Dict[str, Any] = dict(
            time=time, bounds=bounds, area=area, search_kwargs=search_kwargs
        )
        if query is not None:
            # only forwarded when requested, so search implementations are
            # called with exactly the same arguments as before otherwise
            search_args["query"] = query
        src_items = list(self.search(**search_args))
        if not src_items:
            # fail early with a clear message instead of an opaque error from
            # computing the extent of an empty item list
            raise ValueError(
                f"no items found in collection {self.id} for the given search parameters"
            )

        # collect all items and download assets if required
        items: List[Item] = []
        total = len(src_items)
        for n, src_item in enumerate(src_items, 1):
            logger.debug("found item %s", src_item)
            items.append(
                self._localize_item(
                    src_item,
                    output_path / self.id / src_item.id,
                    assets=assets,
                    assets_dst_resolution=assets_dst_resolution,
                    assets_convert_profile=assets_convert_profile,
                    copy_metadata=copy_metadata,
                    metadata_parser_classes=metadata_parser_classes,
                    overwrite=overwrite,
                )
            )
            if progress_callback:
                progress_callback(n=n, total=total)

        out_collection = self._build_collection(items)

        # TODO: items of a previously exported collection with the same id are
        # not merged into the new collection.
        # Drop a same-id child left by a previous export; pystac's add_child does
        # not deduplicate, so the catalog would otherwise get a second child link
        # pointing at the same collection.json.
        catalog.remove_child(self.id)
        catalog.add_child(out_collection)

        logger.debug("write catalog to %s", output_path)
        catalog.normalize_hrefs(str(output_path))
        catalog.make_all_asset_hrefs_relative()
        catalog.save(dest_href=str(output_path), stac_io=stac_io)
        return catalog_json

    def _open_or_create_catalog(
        self,
        catalog_json: MPath,
        name: Optional[str],
        description: Optional[str],
        stac_io: DefaultStacIO,
    ) -> Catalog:
        """Open ``catalog_json`` if it exists, else create a new self-contained catalog."""
        if catalog_json.exists():
            logger.debug("open existing catalog %s", str(catalog_json))
            # read through stac_io so catalogs on object storage can be opened;
            # pystac's default I/O only handles local paths and HTTP URLs
            return Catalog.from_file(str(catalog_json), stac_io=stac_io)
        return Catalog(
            name or f"{self.id}",
            description or f"Static subset of {self.description}",
            stac_extensions=self.stac_extensions,
            href=str(catalog_json),
            catalog_type=CatalogType.SELF_CONTAINED,
        )

    @staticmethod
    def _localize_item(
        item: Item,
        item_dir: MPath,
        *,
        assets: List[str],
        assets_dst_resolution: Union[None, float, int],
        assets_convert_profile: Optional[Profile],
        copy_metadata: bool,
        metadata_parser_classes: Optional[tuple],
        overwrite: bool,
    ) -> Item:
        """Return a clone of ``item`` run through get_assets / get_metadata_assets into ``item_dir``."""
        item = item.clone()
        if assets:
            logger.debug("get assets %s", assets)
            item = get_assets(
                item,
                assets,
                item_dir,
                resolution=assets_dst_resolution,
                convert_profile=assets_convert_profile,
                overwrite=overwrite,
                ignore_if_exists=True,
            )
        if copy_metadata:
            item = get_metadata_assets(
                item,
                item_dir,
                metadata_parser_classes=metadata_parser_classes,
                resolution=assets_dst_resolution,
                convert_profile=assets_convert_profile,
                overwrite=overwrite,
            )
        # this has to be set to None, otherwise pystac will mess up the asset paths
        # after normalizing
        item.set_self_href(None)
        return item

    def _build_collection(self, items: List[Item]) -> Collection:
        """Create a self-contained collection of ``items`` with metadata copied from ``client``."""
        logger.debug("create new collection")
        out_collection = Collection(
            id=self.id,
            extent=Extent.from_items(items),
            description=self.description,
            title=self.client.title,
            stac_extensions=self.stac_extensions,
            license=self.client.license,
            keywords=self.client.keywords,
            providers=self.client.providers,
            summaries=self.client.summaries,
            extra_fields=self.client.extra_fields,
            catalog_type=CatalogType.SELF_CONTAINED,
        )
        for item in items:
            out_collection.add_item(item)
        out_collection.update_extent_from_items()
        return out_collection
def filter_items(
    items: Generator[Item, None, None],
    query: Optional[str] = None,
) -> Generator[Item, None, None]:
    """
    Lazily filter STAC items with an ECQL expression.

    The expression is parsed with pygeofilter and evaluated against each item's
    ``properties`` dict (not the Item object itself), for example to apply a
    cloud cover threshold. The filter field and value could be defined in the
    search configs and passed down to the individual search approaches.

    Args:
        items: Items to filter.
        query: ECQL filter expression. When None or empty, every item is passed
            through unchanged.

    Yields:
        The items whose properties satisfy ``query``.
    """
    if not query:
        yield from items
        return
    # compile the ECQL expression once into a predicate over property dicts
    predicate = NativeEvaluator(use_getattr=False).evaluate(parse_ecql(query))
    yield from (item for item in items if predicate(item.properties))