Source code for mapchete_eo.cli.s2_verify

import logging
from dataclasses import dataclass
from typing import List, Optional

import click
import numpy as np
import pystac
from mapchete.cli.options import opt_debug
from mapchete.io import copy
from mapchete.io.raster import read_raster_no_crs
from mapchete.path import MPath
from tqdm import tqdm

from mapchete_eo.array.color import outlier_pixels
from mapchete_eo.cli import options_arguments
from mapchete_eo.exceptions import AssetKeyError
from mapchete_eo.platforms.sentinel2.product import asset_mpath

logger = logging.getLogger(__name__)


@dataclass
class Report:
    """Outcome of verifying a single STAC item."""

    # the STAC item this report refers to
    item: pystac.Item
    # names of requested assets for which the item has no entry
    missing_asset_entries: List[str]
    # paths of assets which have an entry but were found not to exist
    missing_assets: List[MPath]
    # whether the thumbnail check flagged color artefacts
    color_artefacts: bool = False

    def product_broken(self) -> bool:
        """Return True if at least one of the recorded checks failed."""
        findings = (
            self.missing_asset_entries,
            self.missing_assets,
            self.color_artefacts,
        )
        return any(bool(finding) for finding in findings)
# CLI entry point: verifies every given STAC item and prints one "[ERROR]"
# line per finding via tqdm.write() so the progress bar is not garbled.
#
# NOTE: the function docstring is kept to a single line on purpose, because
# click uses it as the help text of the command.
@click.command()
@options_arguments.arg_stac_items
@options_arguments.opt_assets
@opt_debug
def s2_verify(
    stac_items: List[MPath],
    assets: Optional[List[str]] = None,
    asset_exists_check: bool = True,
    **_,
):
    """Verify Sentinel-2 products."""
    # None (or any other falsy value) means "no assets requested"; a None
    # default avoids sharing one mutable list between calls
    assets = assets or []
    for item_path in tqdm(stac_items):
        report = verify_item(
            pystac.Item.from_file(item_path),
            assets=assets,
            asset_exists_check=asset_exists_check,
        )
        item_id = report.item.id
        for asset in report.missing_asset_entries:
            tqdm.write(f"[ERROR] {item_id} has no asset named '{asset}'")
        # the report only stores the paths of missing assets, not their
        # names, so the message must not reference an asset name here
        for path in report.missing_assets:
            tqdm.write(f"[ERROR] {item_id} asset with path {str(path)} does not exist")
        if report.color_artefacts:
            tqdm.write(
                f"[ERROR] {item_id} thumbnail ({report.item.assets['thumbnail'].href}) indicates that there are some color artefacts"
            )
def verify_item(
    item: pystac.Item,
    assets: List[str],
    asset_exists_check: bool = False,
    check_thumbnail: bool = True,
    thumbnail_dir: Optional[MPath] = None,
) -> Report:
    """
    Verify a single STAC item and collect the findings in a Report.

    Parameters
    ----------
    item : pystac.Item
        Item to verify.
    assets : List[str]
        Names of the assets the item is expected to provide.
    asset_exists_check : bool
        If True, also check whether the path of every available asset exists.
    check_thumbnail : bool
        If True, read the "thumbnail" asset and test it for outlier pixels.
    thumbnail_dir : Optional[MPath]
        If given, the thumbnail is first copied to
        ``thumbnail_dir / "<item id>.jpg"`` and the check runs on that copy.

    Returns
    -------
    Report
        Missing asset entries, missing asset paths and the result of the
        thumbnail check.

    Raises
    ------
    KeyError
        If check_thumbnail is True and the item has no "thumbnail" asset.
    """
    missing_asset_entries = []
    missing_assets = []
    color_artefacts = False

    for asset in assets:
        logger.debug("verify asset %s is available", asset)
        if asset not in item.assets:
            # report every missing entry exactly once; there is no path to
            # check for an asset the item does not have
            missing_asset_entries.append(asset)
            continue
        if not asset_exists_check:
            continue
        try:
            path = asset_mpath(item=item, asset=asset)
        except AssetKeyError:
            missing_asset_entries.append(asset)
            continue
        logger.debug("check if asset %s (%s) exists", asset, str(path))
        if not path.exists():
            missing_assets.append(path)

    if check_thumbnail:
        thumbnail_href = MPath.from_inp(item.assets["thumbnail"].href)
        logger.debug("check thumbnail %s for artefacts ...", thumbnail_href)
        if thumbnail_dir:
            # build the file name first, so the join does not depend on how
            # "+" behaves on the joined path object
            thumbnail_path = thumbnail_dir / f"{item.id}.jpg"
            copy(thumbnail_href, thumbnail_path)
        else:
            thumbnail_path = thumbnail_href
        # read from thumbnail_path so that a copied thumbnail is actually used
        color_artefacts = outlier_pixels_detected(read_raster_no_crs(thumbnail_path))

    return Report(
        item,
        missing_asset_entries=missing_asset_entries,
        missing_assets=missing_assets,
        color_artefacts=color_artefacts,
    )
def outlier_pixels_detected(
    arr: np.ndarray,
    axis: int = 0,
    range_threshold: int = 100,
    allowed_error_percentage: float = 1,
) -> bool:
    """
    Checks whether number of outlier pixels is larger than allowed.

    An outlier pixel is a pixel, where the value range between bands exceeds
    the range_threshold.

    Parameters
    ----------
    arr : np.ndarray
        Array to check.
    axis : int
        Axis holding the bands; it is passed on to outlier_pixels() and is
        excluded when counting the pixels.
    range_threshold : int
        Threshold passed on to outlier_pixels().
    allowed_error_percentage : float
        Percentage of outlier pixels which is still tolerated.

    Returns
    -------
    bool
        True if the percentage of outlier pixels exceeds
        allowed_error_percentage. An array without any values has no pixels
        and therefore yields False.
    """
    # nothing to inspect; this also avoids dividing by zero below
    if arr.size == 0:
        return False

    # number of pixels: all values divided by the number of bands, so the
    # count stays correct for any band axis, not just axis 0
    pixels = arr.size // arr.shape[axis]
    outliers = outlier_pixels(arr, axis=axis, range_threshold=range_threshold).sum()
    outlier_percent = outliers / pixels * 100
    logger.debug(
        "%s (%s %%) suspicious pixels detected",
        outliers,
        round(outlier_percent, 2),
    )
    # convert the NumPy comparison result to the annotated builtin bool
    return bool(outlier_percent > allowed_error_percentage)