from __future__ import annotations
import logging
import math
from typing import Callable, List, Optional, Union
import fsspec
import numpy as np
import numpy.ma as ma
import pystac
from affine import Affine
from mapchete import Timer
from mapchete.io import copy, fiona_open, rasterio_open
from mapchete.io.raster import ReferencedRaster, read_raster, read_raster_window
from mapchete.path import MPath
from mapchete.protocols import GridProtocol
from mapchete.settings import IORetrySettings
from mapchete.types import Grid, NodataVal
from numpy.typing import DTypeLike
from pydantic import BaseModel
from rasterio.dtypes import dtype_ranges
from rasterio.enums import Resampling
from rasterio.features import rasterize
from rasterio.profiles import Profile
from rasterio.vrt import WarpedVRT
from retry import retry
from mapchete_eo.io.path import COMMON_RASTER_EXTENSIONS, asset_mpath, cached_path
from mapchete_eo.io.profiles import COGDeflateProfile
logger = logging.getLogger(__name__)
[docs]
class STACRasterBandProperties(BaseModel):
nodata: Optional[NodataVal] = None
data_type: Optional[str] = None
scale: float = 1.0
offset: float = 0.0
[docs]
@staticmethod
def from_asset(
asset: pystac.Asset,
nodataval: Optional[NodataVal] = None,
) -> STACRasterBandProperties:
if asset.extra_fields.get("raster:offset", {}):
properties = dict(
offset=asset.extra_fields.get("raster:offset"),
scale=asset.extra_fields.get("raster:scale"),
nodata=asset.extra_fields.get("nodata", nodataval),
)
else:
properties = asset.extra_fields.get("raster:bands", [{}])[0]
properties.update(
nodata=(
nodataval
if properties.get("nodata") is None
else properties.get("nodata")
),
)
return STACRasterBandProperties(
**properties,
)
[docs]
def asset_to_np_array(
item: pystac.Item,
asset: str,
indexes: Union[List[int], int] = 1,
grid: Optional[GridProtocol] = None,
resampling: Resampling = Resampling.nearest,
nodataval: NodataVal = None,
apply_offset: bool = True,
) -> ma.MaskedArray:
"""
Read grid window of STAC Items and merge into a 2D ma.MaskedArray.
This is the main read method which is one way or the other being called from everywhere
whenever a band is being read!
"""
# get path early to catch an eventual asset missing error early
path = asset_mpath(item, asset)
# find out asset details if raster:bands is available
band_properties = STACRasterBandProperties.from_asset(
item.assets[asset], nodataval=nodataval
)
logger.debug("reading asset %s and indexes %s ...", asset, indexes)
array = read_raster(
inp=path,
indexes=indexes,
grid=grid,
resampling=resampling.name,
dst_nodata=band_properties.nodata,
).array
if apply_offset and band_properties.offset:
logger.debug(
"apply offset %s and scale %s to asset %s",
band_properties.offset,
band_properties.scale,
asset,
)
data_type = band_properties.data_type or array.dtype
# determine value range for the target data_type
clip_min, clip_max = dtype_ranges[str(data_type)]
# increase minimum clip value to avoid collission with nodata value
if clip_min == band_properties.nodata:
clip_min += 1
array[~array.mask] = (
(
((array[~array.mask] * band_properties.scale) + band_properties.offset)
/ band_properties.scale
)
.round()
.clip(clip_min, clip_max)
.astype(data_type, copy=False)
.data
)
return array
[docs]
def get_assets(
item: pystac.Item,
assets: List[str],
dst_dir: MPath,
src_fs: fsspec.AbstractFileSystem = None,
overwrite: bool = False,
resolution: Union[None, float, int] = None,
convert_profile: Optional[Profile] = None,
item_href_in_dst_dir: bool = True,
ignore_if_exists: bool = False,
) -> pystac.Item:
"""
Copy or convert assets depending on settings.
Conversion is triggered if either resolution or convert_profile is provided.
"""
for asset in assets:
path = asset_mpath(item, asset, fs=src_fs)
# convert if possible
if should_be_converted(path, resolution=resolution, profile=convert_profile):
item = convert_asset(
item,
asset,
dst_dir,
src_fs=src_fs,
resolution=resolution,
overwrite=overwrite,
ignore_if_exists=ignore_if_exists,
profile=convert_profile or COGDeflateProfile(),
item_href_in_dst_dir=item_href_in_dst_dir,
)
continue
# copy
item = copy_asset(
item,
asset,
dst_dir,
overwrite=overwrite,
ignore_if_exists=ignore_if_exists,
item_href_in_dst_dir=item_href_in_dst_dir,
)
return item
[docs]
def copy_asset(
item: pystac.Item,
asset: str,
dst_dir: MPath,
src_fs: fsspec.AbstractFileSystem = None,
overwrite: bool = False,
item_href_in_dst_dir: bool = True,
ignore_if_exists: bool = False,
) -> pystac.Item:
"""Copy asset from one place to another."""
src_path = asset_mpath(item, asset, fs=src_fs)
output_path = dst_dir / src_path.name
# write relative path into asset.href if Item will be in the same directory
if item_href_in_dst_dir and not output_path.is_absolute(): # pragma: no cover
item.assets[asset].href = src_path.name
else:
item.assets[asset].href = str(output_path)
# TODO make this check optional
if output_path.exists():
if ignore_if_exists:
logger.debug("ignore existing asset %s", output_path)
return item
elif overwrite:
logger.debug("overwrite exsiting asset %s", output_path)
else:
raise IOError(f"{output_path} already exists")
else:
dst_dir.makedirs()
with Timer() as t:
logger.debug("copy asset %s to %s ...", src_path, dst_dir)
copy(
src_path,
output_path,
overwrite=overwrite,
)
logger.debug("copied asset '%s' in %s", asset, t)
return item
[docs]
def convert_asset(
item: pystac.Item,
asset: str,
dst_dir: MPath,
src_fs: fsspec.AbstractFileSystem = None,
overwrite: bool = False,
resolution: Union[None, float, int] = None,
profile: Optional[Profile] = None,
item_href_in_dst_dir: bool = True,
ignore_if_exists: bool = False,
) -> pystac.Item:
"""
Convert asset to a different format.
"""
src_path = asset_mpath(item, asset, fs=src_fs)
output_path = dst_dir / src_path.name
profile = profile or COGDeflateProfile()
# write relative path into asset.href if Item will be in the same directory
if item_href_in_dst_dir and not output_path.is_absolute(): # pragma: no cover
item.assets[asset].href = src_path.name
else:
item.assets[asset].href = str(output_path)
# TODO make this check optional
if output_path.exists():
if ignore_if_exists:
logger.debug("ignore existing asset %s", output_path)
return item
elif overwrite:
logger.debug("overwrite exsiting asset %s", output_path)
else:
raise IOError(f"{output_path} already exists")
else:
dst_dir.makedirs()
with Timer() as t:
convert_raster(src_path, output_path, resolution, profile)
logger.debug("converted asset '%s' in %s", asset, t)
return item
[docs]
@retry(logger=logger, **dict(IORetrySettings()))
def convert_raster(
src_path: MPath,
dst_path: MPath,
resolution: Union[None, float, int] = None,
profile: Optional[Profile] = None,
) -> None:
with rasterio_open(src_path, "r") as src:
meta = src.meta.copy()
if profile:
meta.update(**profile)
src_transform = src.transform
if resolution:
logger.debug(
"converting %s to %s using %sm resolution with profile %s ...",
src_path,
dst_path,
resolution,
profile,
)
src_res = src.transform[0]
dst_transform = Affine.from_gdal(
*(
src_transform[2],
resolution,
0.0,
src_transform[5],
0.0,
-resolution,
)
)
dst_width = int(math.ceil(src.width * (src_res / resolution)))
dst_height = int(math.ceil(src.height * (src_res / resolution)))
meta.update(
transform=dst_transform,
width=dst_width,
height=dst_height,
)
logger.debug("convert %s to %s with settings %s", src_path, dst_path, meta)
with rasterio_open(dst_path, "w", **meta) as dst:
with WarpedVRT(
src,
width=meta["width"],
height=meta["height"],
transform=meta["transform"],
) as warped:
dst.write(warped.read())
[docs]
@retry(logger=logger, **dict(IORetrySettings()))
def should_be_converted(
path: MPath,
resolution: Union[None, float, int] = None,
profile: Optional[Profile] = None,
) -> bool:
"""Decide whether a raster file should be converted or not."""
if path.endswith(tuple(COMMON_RASTER_EXTENSIONS)):
# see if it even pays off to convert based on resolution
if resolution is not None:
with rasterio_open(path) as src:
src_resolution = src.transform[0]
if src_resolution != resolution:
return True
# when profile is given, check if profile differs from remote file
elif profile is not None:
with rasterio_open(path) as src:
for key, value in profile.items():
if value == "COG":
# TODO check if file is really a valid cog
value = "GTiff"
if key in src.meta and src.meta[key] != value:
logger.debug(
"different value for %s required: %s should become %s",
key,
src.meta[key],
value,
)
return True
return False
return False
def _read_vector_mask(mask_path):
logger.debug("open %s with Fiona", mask_path)
with cached_path(mask_path) as cached:
try:
with fiona_open(cached) as src:
return list([dict(f) for f in src])
except ValueError as e:
# this happens if GML file is empty
for message in ["Null layer: ", "'hLayer' is NULL in 'OGR_L_GetName'"]:
if message in str(e):
return []
else: # pragma: no cover
raise
[docs]
@retry(logger=logger, **dict(IORetrySettings()))
def read_mask_as_raster(
path: MPath,
indexes: Optional[List[int]] = None,
dst_grid: Optional[GridProtocol] = None,
resampling: Resampling = Resampling.nearest,
rasterize_value_func: Callable = lambda feature: feature.get("id", 1),
rasterize_feature_filter: Callable = lambda feature: True,
dtype: Optional[DTypeLike] = None,
masked: bool = True,
cached_read: bool = False,
) -> ReferencedRaster:
"""
Read mask as array regardless of source data type (raster or vector).
"""
if dst_grid:
dst_grid = Grid.from_obj(dst_grid)
try:
with cached_path(path, active=cached_read) as path:
if path.suffix in COMMON_RASTER_EXTENSIONS:
if dst_grid:
array = read_raster_window(
path, grid=dst_grid, indexes=indexes, resampling=resampling
)
# sum up bands to 2D mask and keep dtype
array = array.sum(axis=0, dtype=array.dtype)
mask = ReferencedRaster(
data=array if masked else array.data,
transform=dst_grid.transform,
bounds=dst_grid.bounds,
crs=dst_grid.crs,
)
else:
with rasterio_open(path) as src:
mask = ReferencedRaster(
src.read(indexes, masked=masked).sum(
axis=0, dtype=src.dtypes[0]
),
transform=src.transform,
bounds=src.bounds,
crs=src.crs,
)
# make sure output has correct dtype
if dtype:
mask.data = mask.data.astype(dtype)
return mask
else:
if dst_grid:
features = [
feature
for feature in _read_vector_mask(path)
if rasterize_feature_filter(feature)
]
features_values = [
(feature["geometry"], rasterize_value_func(feature))
for feature in features
]
return ReferencedRaster(
data=(
rasterize(
features_values,
out_shape=dst_grid.shape,
transform=dst_grid.transform,
dtype=np.uint8,
).astype(dtype)
if features_values
else np.zeros(dst_grid.shape, dtype=dtype)
),
transform=dst_grid.transform,
crs=dst_grid.crs,
bounds=dst_grid.bounds,
)
else: # pragma: no cover
raise ValueError("out_shape and out_transform have to be provided.")
except Exception as exception: # pragma: no cover
# This is a hack because some tool using aiohttp does not raise a
# ClientResponseError directly but masks it as a generic Exception and thus
# preventing our retry mechanism to kick in.
if repr(exception).startswith('Exception("ClientResponseError'):
raise ConnectionError(repr(exception)).with_traceback(
exception.__traceback__
) from exception
raise