Source code for mapchete_eo.cli.s2_cat_results

from datetime import datetime
from typing import Any, Literal, Optional

import click
import click_spinner

from shapely.geometry import mapping, MultiPolygon, Polygon, shape

from mapchete.cli.options import opt_bounds, opt_debug
from mapchete.io import fiona_open
from mapchete.path import MPath
from mapchete.types import Bounds

from mapchete_eo.cli import options_arguments
from mapchete_eo.io.products import Slice, products_to_slices
from mapchete_eo.platforms.sentinel2.product import S2Product
from mapchete_eo.platforms.sentinel2.source import Sentinel2Source
from mapchete_eo.sort import TargetDateSort
from mapchete_eo.types import TimeRange


@click.command()
@options_arguments.arg_dst_path
@options_arguments.opt_start_time
@options_arguments.opt_end_time
@opt_bounds
@options_arguments.opt_mgrs_tile
@options_arguments.opt_source
@click.option(
    "--format",
    type=click.Choice(["FlatGeobuf", "GeoJSON"]),
    help="Format of output file.",
)
@click.option("--by-slices", is_flag=True, help="Merge product to slices.")
@click.option(
    "--add-index", is_flag=True, help="Add unique indexes to products/slices."
)
@opt_debug
def s2_cat_results(
    dst_path: MPath,
    start_time: datetime,
    end_time: datetime,
    bounds: Optional[Bounds] = None,
    mgrs_tile: Optional[str] = None,
    source: Sentinel2Source = Sentinel2Source(collection="EarthSearch"),
    format: Literal["FlatGeobuf", "GeoJSON"] = "FlatGeobuf",
    by_slices: bool = False,
    add_index: bool = False,
    debug: bool = False,
):
    """Write a search result."""
    if any([start_time is None, end_time is None]):  # pragma: no cover
        raise click.ClickException("--start-time and --end-time are mandatory")
    if all([bounds is None, mgrs_tile is None]):  # pragma: no cover
        raise click.ClickException("--bounds or --mgrs-tile are required")
    slice_property_key = "s2:datastrip_id"
    with click_spinner.Spinner(disable=debug):
        catalog = source.get_catalog()
        slices = products_to_slices(
            [
                S2Product.from_stac_item(item)
                for item in catalog.search(
                    time=TimeRange(start=start_time, end=end_time),
                    bounds=bounds,
                    search_kwargs=dict(mgrs_tile=mgrs_tile),
                )
            ],
            group_by_property=slice_property_key if by_slices else None,
            sort=TargetDateSort(target_date=start_time),
        )
    if slices:
        schema = get_schema(by_slices=by_slices, add_index=add_index)
        with fiona_open(
            dst_path, mode="w", schema=schema, crs="EPSG:4326", format=format
        ) as dst:
            for index, _slice in enumerate(slices, start=1):
                # 2025-4 agreed to make outputs multipolygons
                # Convert the _slice.__geom_interface__ to Multipolygon if not the case

                # Ensure the result is always a MultiPolygon even if only single Polygon is returned
                # Else split features should come here as MultiPolygons
                slice_shape = shape(_slice.__geom_interface__)
                if isinstance(slice_shape, Polygon):
                    slice_multipolygon = mapping(MultiPolygon([slice_shape]))
                else:
                    slice_multipolygon = _slice.__geom_interface__

                out_feature = {
                    "geometry": slice_multipolygon,
                    "properties": {
                        key: get_value(_slice, key, index, slice_property_key)
                        for key in schema["properties"].keys()
                    },
                }
                dst.write(out_feature)
    else:
        click.echo("No results found.")


[docs] def get_schema( by_slices: bool, add_index: bool, geometry_type: str = "MultiPolygon" ) -> dict: if by_slices: properties = { "timestamp": "str", "slice_id": "str", } else: properties = { "eo:cloud_cover": "float", "timestamp": "str", "slice_id": "str", "product_id": "str", } if add_index: properties.update(index="int") return {"geometry": geometry_type, "properties": properties}
[docs] def get_value(_slice: Slice, key: str, index: int, slice_property_key: str) -> Any: if key == "index": return index elif key == "slice_id": return _slice.get_property(slice_property_key) elif key == "product_id": return _slice.products[0].item.id elif key == "timestamp": return _slice.datetime else: return _slice.get_property(key)