# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import warnings
from collections.abc import Hashable
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Protocol,
Tuple,
Union,
overload,
)
import cachetools
import numpy
from pyproj.crs import CRS as _CRS
from pyproj.enums import WktVersion
from pyproj.exceptions import CRSError
from pyproj.transformer import Transformer
from .types import XY, Unset
EPSG_UNSET = 0
class CRSLike(Protocol):
"""CRS Like object."""
def to_wkt(self, *args, **kw) -> str: ...
_crs_cache: Dict[Hashable, Tuple[_CRS, str, Optional[int]]] = {}
def _make_crs_key(crs_spec: Union[int, str, Hashable, CRSLike]) -> Hashable:
if isinstance(crs_spec, str):
normed_epsg = crs_spec.upper()
if normed_epsg.startswith("EPSG:"):
return normed_epsg
return crs_spec
if isinstance(crs_spec, int):
return f"EPSG:{crs_spec}"
if isinstance(crs_spec, Hashable):
return crs_spec
return crs_spec.to_wkt()
@cachetools.cached(_crs_cache, key=_make_crs_key)
def _make_crs(
crs_spec: Union[str, int, _CRS, CRSLike],
) -> Tuple[_CRS, str, Optional[int]]:
epsg = EPSG_UNSET
if isinstance(crs_spec, str):
crs = _CRS.from_user_input(crs_spec)
elif isinstance(crs_spec, int):
epsg = crs_spec
crs = _CRS.from_epsg(crs_spec)
elif isinstance(crs_spec, _CRS):
crs = crs_spec
else:
crs = _CRS.from_wkt(crs_spec.to_wkt())
crs_str = str(crs)
crs_str_u = crs_str.upper()
if crs_str_u.startswith("EPSG:"):
crs_str = crs_str_u
epsg = int(crs_str.split(":", 1)[1])
return (crs, crs_str, epsg)
def _make_crs_transform_key(from_crs, to_crs, always_xy):
return (id(from_crs), id(to_crs), always_xy)
@cachetools.cached({}, key=_make_crs_transform_key)
def _make_crs_transform(from_crs: _CRS, to_crs: _CRS, always_xy: bool) -> Transformer:
return Transformer.from_crs(from_crs, to_crs, always_xy=always_xy)
[docs]
class CRS:
"""
Wrapper around :py:class:`pyproj.crs.CRS` for backwards compatibility.
"""
DEFAULT_WKT_VERSION = WktVersion.WKT2_2019
"""Default version for WKT: WKT2_2019"""
__slots__ = ("_crs", "_epsg", "_str")
[docs]
def __init__(self, crs_spec: Any) -> None:
"""
Construct CRS object from *something*.
:param crs_spec:
String representation of a CRS, often an EPSG code like ``'EPSG:4326'``. Can also be any
object that implements ``.to_epsg()`` or ``.to_wkt()``.
:raises: :py:class:`pyproj.exceptions.CRSError`
"""
if isinstance(crs_spec, (str, int, _CRS)):
self._crs, self._str, self._epsg = _make_crs(crs_spec)
elif isinstance(crs_spec, CRS):
self._crs = crs_spec._crs
self._epsg = crs_spec._epsg
self._str = crs_spec._str
elif isinstance(crs_spec, dict):
self._crs, self._str, self._epsg = _make_crs(_CRS.from_dict(crs_spec))
elif hasattr(crs_spec, "to_wkt"):
self._crs, self._str, self._epsg = _make_crs(crs_spec)
else:
raise CRSError(f"Unexpected input encountered: {crs_spec}")
def __getstate__(self):
return {"crs_str": self._str}
def __setstate__(self, state) -> None:
self._crs, self._str, self._epsg = _make_crs(state["crs_str"])
[docs]
def to_wkt(self, pretty: bool = False, version: Optional[WktVersion] = None) -> str:
"""
Generate WKT representation of this CRS.
:param pretty: If ``True`` generate multi-line WKT.
:param version: Specify WKT version.
"""
if version is None:
version = self.DEFAULT_WKT_VERSION
return self._crs.to_wkt(pretty=pretty, version=version)
@property
def wkt(self) -> str:
"""WKT representation of this CRS."""
return self.to_wkt()
[docs]
def to_epsg(self) -> Optional[int]:
"""
EPSG Code of the CRS or ``None``.
"""
if self._epsg == EPSG_UNSET:
self._epsg = self._crs.to_epsg()
return self._epsg
@property
def epsg(self) -> Optional[int]:
"""
EPSG Code of the CRS or ``None``.
"""
return self.to_epsg()
@property
def semi_major_axis(self):
"""Semi-major axis of the ellipsoid."""
return self._crs.ellipsoid.semi_major_metre
@property
def semi_minor_axis(self):
"""Semi-minor axis of the ellipsoid."""
return self._crs.ellipsoid.semi_minor_metre
@property
def inverse_flattening(self):
"""Inverse flattening of the ellipsoid."""
return self._crs.ellipsoid.inverse_flattening
@property
def geographic(self) -> bool:
"""True if CRS is geographic."""
return self._crs.is_geographic
@property
def projected(self) -> bool:
"""True if CRS is projected."""
return self._crs.is_projected
@property
def dimensions(self) -> Tuple[str, str]:
"""
List of dimension names of the CRS.
The ordering of the names is intended to reflect the :py:class:`numpy.ndarray` axis order of the
loaded raster.
"""
if self.geographic:
return "latitude", "longitude"
if self.projected:
return "y", "x"
raise ValueError("Neither projected nor geographic") # pragma: no cover
@property
def units(self) -> Tuple[str, str]:
"""
List of dimension units of the CRS.
The ordering of the units is intended to reflect the :py:class:`numpy.ndarray` axis order of the
loaded raster.
"""
if self.geographic:
return "degrees_north", "degrees_east"
if self.projected:
_dir_renames = {"north": "y", "south": "y", "east": "x", "west": "x"}
units = {
_dir_renames.get(ax.direction, ax.direction): ax.unit_name
for ax in self._crs.axis_info
}
return units.get("y", ""), units.get("x", "")
raise ValueError("Neither projected nor geographic") # pragma: no cover
@property
def authority(self) -> Tuple[str, Union[str, int]]:
"""
Get ``(authority_name, code)`` tuple.
:returns: ``("", "")`` when not available
"""
if self._epsg:
return ("EPSG", self._epsg)
if (r := self._crs.to_authority()) is not None:
name, code = r
try:
return (name, int(code))
except ValueError: # pragma: nocover
return (name, code)
return ("", "")
def __str__(self) -> str:
return self._str
def __hash__(self) -> int:
return hash(self._crs)
def __repr__(self) -> str:
return f"CRS('{self._str}')"
def __eq__(self, other) -> bool:
if not isinstance(other, CRS):
try:
other = CRS(other)
except Exception: # pylint: disable=broad-except
return False
if self._crs is other._crs:
return True
if self._epsg and other._epsg:
return self._epsg == other._epsg
if self._str == other._str:
return True
return self._crs == other._crs
def __ne__(self, other) -> bool:
return not self == other
@property
def proj(self) -> _CRS:
"""Access :py:class:`pyproj.crs.CRS` object that this wraps."""
return self._crs
@property
def valid_region(self) -> Optional["geom.Geometry"]:
"""
Return valid region of this CRS.
:returns: Bounding box in Lon/Lat as a 4 point Polygon in EPSG:4326.
:returns: ``None`` if valid region is not defined for this CRS
"""
from . import geom # pylint: disable=import-outside-toplevel
aou = self._crs.area_of_use
if aou is None:
return None
return geom.box(aou.west, aou.south, aou.east, aou.north, "EPSG:4326")
@property
def crs_str(self) -> str:
"""DEPRECATED"""
warnings.warn(
"Please use `str(crs)` instead of `crs.crs_str`",
category=DeprecationWarning,
)
return self._str
def transformer(self, other: "SomeCRS", always_xy: bool = True) -> Transformer:
"""Construct :py:class:`pyproj.Transformer`.
Direction is from ``self`` to ``other``.
:param other:
Destination CRS
:param always_xy:
If true, the transform method will accept as input and return as output coordinates
using the traditional GIS order, that is longitude, latitude for geographic CRS and
easting, northing for most projected CRS.
"""
return _make_crs_transform(self.proj, norm_crs(other).proj, always_xy=always_xy)
def __dask_tokenize__(self):
return ("odc.geo.crs.CRS", str(self))
[docs]
@staticmethod
def utm(
x: Union[float, int, XY[float], "geom.Geometry", "geom.BoundingBox"],
y: Optional[float] = None,
/,
datum_name: str = "WGS 84",
) -> "CRS":
"""
Construct appropriate UTM CRS for a given point.
Uses CRS database query methods from :py:mod:`pyproj` to locate
appropriate UTM CRS.
:params datum_name: The name of the datum in the CRS name ('NAD27', 'NAD83', 'WGS 84', ...)
"""
# pylint: disable=import-outside-toplevel,no-name-in-module
from pyproj.database import query_utm_crs_info
from . import geom
if isinstance(x, geom.BoundingBox):
_bbox = x
elif isinstance(x, geom.Geometry):
if x.crs is not None:
_bbox = x.to_crs("epsg:4326").boundingbox
else:
# assume already in lon/lat
_bbox = x.boundingbox
elif isinstance(x, (float, int)):
if y is None:
y = 0.0
_bbox = geom.BoundingBox(x, y, x, y)
else:
x, y = x.xy
_bbox = geom.BoundingBox(x, y, x, y)
return _pick_best_crs(
_bbox.polygon,
[
CRS(f"{info.auth_name}:{info.code}")
for info in query_utm_crs_info(
datum_name=datum_name,
area_of_interest=_bbox.aoi,
)
],
)
[docs]
class CRSMismatchError(ValueError):
"""
CRS Mismatch Error.
Raised when geometry operation is attempted on geometries in different
coordinate references.
"""
SomeCRS = Union[str, int, CRS, _CRS, Dict[str, Any]]
MaybeCRS = Union[SomeCRS, Unset, None]
# fmt: off
@overload
def norm_crs(crs: SomeCRS) -> CRS: ...
@overload
def norm_crs(crs: SomeCRS, ctx: Any) -> CRS: ...
@overload
def norm_crs(crs: Union[None, Unset]) -> None: ...
@overload
def norm_crs(crs: Union[None, Unset], ctx: Any) -> None: ...
# fmt: on
[docs]
def norm_crs(crs: MaybeCRS, ctx=None) -> Optional[CRS]:
"""Normalise CRS representation."""
if isinstance(crs, CRS):
return crs
if crs is None or isinstance(crs, Unset):
return None
if isinstance(crs, str):
_txt = crs.lower()
if _txt.startswith("utm"):
assert ctx is not None
utm_crs = CRS.utm(ctx)
if _txt == "utm":
return utm_crs
utm_zone = utm_crs.proj.utm_zone
epsg = utm_crs.epsg
assert utm_zone is not None
assert epsg is not None
if _txt == "utm-n" and utm_zone.endswith("S"):
utm_crs = CRS(epsg - 100)
elif _txt == "utm-s" and utm_zone.endswith("N"):
utm_crs = CRS(epsg + 100)
return utm_crs
return CRS(crs)
[docs]
def norm_crs_or_error(crs: MaybeCRS, ctx=None) -> CRS:
"""Normalise CRS representation, raise error if input is ``None``."""
_crs = norm_crs(crs, ctx=ctx)
if _crs is None:
raise ValueError("Expect valid CRS")
return _crs
[docs]
def crs_units_per_degree(
crs: SomeCRS,
lon: Union[float, Tuple[float, float]],
lat: float = 0,
step: float = 0.1,
) -> float:
"""
Helper method for converting resolution between meters/degrees.
Compute number of CRS units per degree for a projected CRS at a given location
in lon/lat.
Location can be supplied as a tuple or as two arguments.
:param crs: CRS
:param lon: Either longitude or ``(lon, lat)`` tuple
:param lat: Latitude or ignored if ``lon`` was a tuple
:param step: Length of the segment in degrees used to estimate relative scale change
:returns: A floating number ``S`` such that ``S*degrees -> meters``
"""
from . import geom # pylint: disable=import-outside-toplevel
if isinstance(lon, tuple):
lon, lat = lon
lon2 = lon + step
if lon2 > 180:
lon2 = lon - step
ll = geom.line([(lon, lat), (lon2, lat)], "EPSG:4326")
xy = ll.to_crs(crs)
return xy.length / step
def _pick_best_crs(poly: "geom.Geometry", crs_candidates: List[CRS]) -> CRS:
# pylint: disable=import-outside-toplevel
from . import geom
def overlap_pct(crs: CRS) -> float:
crs_region = crs.valid_region
if crs_region is None:
return 1 # pragma: nocover
return (crs_region & poly).area / poly.area
if len(crs_candidates) < 1:
raise ValueError("No candidate CRSs found")
if len(crs_candidates) > 1 and poly.area > 1e-9:
if poly.crs is None:
poly = geom.Geometry(poly.geom, "epsg:4326")
crs_candidates = sorted(crs_candidates, key=overlap_pct, reverse=True)
return crs_candidates[0]
if TYPE_CHECKING:
from . import geom # pragma: no cover