Source code for odc.geo.warp

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Iterable, Optional, Tuple, Union

import numpy as np
import rasterio.warp
from affine import Affine

from .gcp import GCPGeoBox
from .geobox import GeoBox
from .types import wh_

# pylint: disable=invalid-name, too-many-arguments
Resampling = Union[str, int, rasterio.warp.Resampling]
Nodata = Optional[Union[int, float]]
_WRP_CRS = "epsg:3857"


def resampling_s2rio(name: str) -> rasterio.warp.Resampling:
    """
    Convert from string to rasterio.warp.Resampling enum, raises ValueError on bad input.
    """
    try:
        return getattr(rasterio.warp.Resampling, name.lower())
    except AttributeError:
        raise ValueError(f"Bad resampling parameter: {name}") from None


def is_resampling_nn(resampling: Resampling) -> bool:
    """
    :returns: True if resampling mode is nearest neighbour
    :returns: False otherwise
    """
    if isinstance(resampling, str):
        return resampling.lower() == "nearest"
    return resampling == rasterio.warp.Resampling.nearest


def warp_affine_rio(
    src: np.ndarray,
    dst: np.ndarray,
    A: Affine,
    resampling: Resampling,
    src_nodata: Nodata = None,
    dst_nodata: Nodata = None,
    **kwargs,
) -> np.ndarray:
    """
    Perform Affine warp using rasterio as backend library.

    :param        src: image as ndarray
    :param        dst: image as ndarray
    :param          A: Affine transform, maps from dst_coords to src_coords
    :param resampling: str|rasterio.warp.Resampling resampling strategy
    :param src_nodata: Value representing "no data" in the source image
    :param dst_nodata: Value to represent "no data" in the destination image

    :param     kwargs: any other args to pass to ``rasterio.warp.reproject``

    :returns: dst
    """
    assert src.ndim == dst.ndim
    assert src.ndim == 2
    sh, sw = src.shape
    dh, dw = dst.shape

    s_gbox = GeoBox(wh_(sw, sh), Affine.identity(), _WRP_CRS)
    d_gbox = GeoBox(wh_(dw, dh), A, _WRP_CRS)
    return _rio_reproject(
        src, dst, s_gbox, d_gbox, resampling, src_nodata, dst_nodata, **kwargs
    )


def warp_affine(
    src: np.ndarray,
    dst: np.ndarray,
    A: Affine,
    resampling: Resampling,
    src_nodata: Nodata = None,
    dst_nodata: Nodata = None,
    **kwargs,
) -> np.ndarray:
    """
    Perform Affine warp using best available backend (GDAL via rasterio is the only one so far).

    :param        src: image as ndarray
    :param        dst: image as ndarray
    :param          A: Affine transformm, maps from dst_coords to src_coords
    :param resampling: str resampling strategy
    :param src_nodata: Value representing "no data" in the source image
    :param dst_nodata: Value to represent "no data" in the destination image

    :param     kwargs: any other args to pass to implementation

    :returns: dst
    """
    return warp_affine_rio(
        src, dst, A, resampling, src_nodata=src_nodata, dst_nodata=dst_nodata, **kwargs
    )


[docs] def rio_reproject( src: np.ndarray, dst: np.ndarray, s_gbox: Union[GeoBox, GCPGeoBox], d_gbox: GeoBox, resampling: Resampling, src_nodata: Nodata = None, dst_nodata: Nodata = None, ydim: Optional[int] = None, **kwargs, ) -> np.ndarray: """ Perform reproject from ndarray->ndarray using rasterio as backend library. :param src: image as ndarray :param dst: image as ndarray :param s_gbox: GeoBox of source image :param d_gbox: GeoBox of destination image :param resampling: str|rasterio.warp.Resampling resampling strategy :param src_nodata: Value representing "no data" in the source image :param dst_nodata: Value to represent "no data" in the destination image :param ydim: Which dimension is y-axis, next one must be x :param kwargs: any other args to pass to ``rasterio.warp.reproject`` :returns: dst """ assert src.ndim == dst.ndim if dst_nodata is None: if dst.dtype.kind == "f": dst_nodata = np.nan if src.ndim == 2: return _rio_reproject( src, dst, s_gbox, d_gbox, resampling, src_nodata, dst_nodata, **kwargs ) if ydim is None: # Assume last two dimensions are Y/X ydim = src.ndim - 2 extra_dims = (*src.shape[:ydim], *src.shape[ydim + 2 :]) # Selects each 2d plane in [...]YX[B] array slices: Iterable[Any] = ( (*idx[:ydim], slice(None), slice(None), *idx[ydim:]) for idx in np.ndindex(*extra_dims) ) for roi in slices: _rio_reproject( src[roi], dst[roi], s_gbox, d_gbox, resampling, src_nodata, dst_nodata, **kwargs, ) return dst
def _rio_reproject( src: np.ndarray, dst: np.ndarray, s_gbox: Union[GCPGeoBox, GeoBox], d_gbox: GeoBox, resampling: Resampling, src_nodata: Nodata = None, dst_nodata: Nodata = None, **kwargs, ) -> np.ndarray: assert src.ndim == dst.ndim assert src.ndim == 2 if "XSCALE" not in kwargs and "YSCALE" not in kwargs: # Work around for issue in GDAL # https://github.com/OSGeo/gdal/issues/7750 # Force GDAL into performing bilinear,cubic,lanczos with raidius=1px. # GDAL is trying to be smart and pick sampling radius based on scale # change, but does it without consideration for rotational component # of the transform, leading to blury output in some situations # See also: # https://github.com/opendatacube/datacube-core/issues/1448 kwargs.update(XSCALE=1, YSCALE=1) dtype_remap = {"int8": "int16", "bool": "uint8"} def _alias_or_convert(arr: np.ndarray) -> Tuple[np.ndarray, bool]: if arr.dtype.name not in dtype_remap: return arr, False wk_dtype = dtype_remap[arr.dtype.name] if arr.dtype.name == "bool": F, T = (np.array(v, dtype=wk_dtype) for v in [0, 255]) return np.where(arr, T, F), True return arr.astype(wk_dtype), False if isinstance(resampling, str): resampling = resampling_s2rio(resampling) src_transform = None gcps = None if isinstance(s_gbox, GCPGeoBox): gcps = s_gbox.gcps() else: src_transform = s_gbox.transform # GDAL support for int8 is patchy, warp doesn't support it, so we need to convert to int16 src, src_is_bool = _alias_or_convert(src) _dst, _ = _alias_or_convert(dst) rasterio.warp.reproject( src, _dst, src_transform=src_transform, gcps=gcps, src_crs=str(s_gbox.crs), dst_transform=d_gbox.transform, dst_crs=str(d_gbox.crs), resampling=resampling, src_nodata=src_nodata, dst_nodata=dst_nodata, **kwargs, ) if dst is not _dst: # int8 workaround copy pixels back to int8 if src_is_bool: # undo [0, 1] to [0, 255] stretching of the src np.copyto(dst, _dst > 127, casting="unsafe") else: np.copyto(dst, _dst, casting="unsafe") return dst