import hashlib
import itertools
from pathlib import Path
from typing import Dict, List, Optional, Sequence
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import rasterio.windows
import requests
from shapely import box
from shapely.geometry import Point
import skmap.set_env # noqa: F401
import skmap_bindings as sb
from skmap import parallel
from skmap.catalog import DataCatalog, run_whales
from skmap.misc import ttprint
class _ParallelOverlay:
# Samples only the first band of every layer.
TILE_PLACEHOLDER = "{tile_id}"
def __init__(
self,
points_x: np.ndarray | Sequence[float],
points_y: np.ndarray | Sequence[float],
raster_files: Sequence[str | Path],
points_crs: str | None,
raster_tiles: gpd.GeoDataFrame | str | None = None,
tile_id_col: str = "tile_id",
n_threads: int = parallel.CPU_COUNT,
verbose: bool = True,
) -> None:
self.verbose: bool = verbose
self.default_tile_id: str = ""
self.tile_id_col: str = tile_id_col
if raster_tiles is not None:
if not isinstance(raster_tiles, gpd.GeoDataFrame):
if self.verbose:
ttprint(f"Reading {raster_tiles}")
raster_tiles: gpd.GeoDataFrame = gpd.read_file(raster_tiles)
self.default_tile_id: str = raster_tiles[self.tile_id_col].iloc[0]
self.raster_tiles: gpd.GeoDataFrame = raster_tiles
if points_crs is None:
with rasterio.open(
str(raster_files[0]).replace(
_ParallelOverlay.TILE_PLACEHOLDER, self.default_tile_id
)
) as src:
points_crs = src.crs
samples: gpd.GeoDataFrame = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(points_x, points_y), crs=points_crs
)
samples.reset_index(drop=True, inplace=True)
self.raster_files = raster_files
self.layers: pd.DataFrame = (
pd.DataFrame(
{
"name": [
str(Path(raster_file).with_suffix("").name)
for raster_file in raster_files
],
"path": self.raster_files,
}
)
.reset_index(drop=True)
.apply(
_ParallelOverlay._layer_metadata,
default_tile_id=self.default_tile_id,
axis=1,
)
)
self.query_pixels: dict[str, gpd.GeoDataFrame] = self._find_blocks(samples)
@staticmethod
def _layer_metadata(row, default_tile_id):
path = row["path"]
if _ParallelOverlay._is_tiled(path):
path = path.replace(_ParallelOverlay.TILE_PLACEHOLDER, default_tile_id)
src = rasterio.open(path)
row["nodata"] = src.nodata
_, window = next(src.block_windows(1))
row["block_height"] = window.height
row["block_width"] = window.width
key = "".join(
[
str(default_tile_id),
str(src.height),
str(src.width),
str(src.block_shapes[0]),
str(src.transform.to_gdal()),
]
)
src.close()
row["group"] = str(hashlib.md5(key.encode("utf-8")).hexdigest())
return row
@staticmethod
def _is_tiled(path: Path | str) -> bool:
return _ParallelOverlay.TILE_PLACEHOLDER in str(path)
@staticmethod
def _tiled_blocks(tile_path, tile_id):
tile_blocks = []
with rasterio.Env(
CPL_VSIL_CURL_ALLOWED_EXTENSIONS=".tif", GDAL_HTTP_VERSION="1.1"
):
src = rasterio.open(tile_path)
for (i, j), window in src.block_windows(1):
tile_blocks.append(
{
"tile_id": tile_id,
"window": window,
"geometry": box(
*rasterio.windows.bounds(window, src.transform)
),
}
)
src.close()
return tile_blocks
def _find_blocks_for_src(
self, path: str | Path, samples: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
gdf_blocks = []
if _ParallelOverlay._is_tiled(path):
samp_tiles = self.raster_tiles.sjoin(
samples.to_crs(self.raster_tiles.crs), how="inner"
)[self.tile_id_col].unique()
raster_tiles = self.raster_tiles[
self.raster_tiles[self.tile_id_col].isin(samp_tiles)
]
if self.verbose:
ttprint(
f"Retrieving block information for {raster_tiles.shape[0]} tiles."
)
for _, tile in raster_tiles.iterrows():
tile_id = tile[self.tile_id_col]
tile_path = str(path).replace(
_ParallelOverlay.TILE_PLACEHOLDER, tile_id
)
src: rasterio.io.DatasetReader = rasterio.open(tile_path)
# read the raster's specific blocks and insert their dimensions into gdf
# see https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#blocks
for (i, j), window in src.block_windows(1):
gdf_blocks.append(
{
"tile_id": tile_id,
"window": window,
"inv_transform": ~rasterio.windows.transform(
window, src.transform
),
"geometry": box(
*rasterio.windows.bounds(window, src.transform)
),
}
)
src_crs = src.crs
src.close()
else:
src: rasterio.io.DatasetReader = rasterio.open(path)
for (i, j), window in src.block_windows(1):
gdf_blocks.append(
{
"window": window,
"inv_transform": ~rasterio.windows.transform(
window, src.transform
),
"geometry": box(
*rasterio.windows.bounds(window, src.transform)
),
}
)
src_crs = src.crs
src.close()
# convert list-of-dicts to gdf
gdf_blocks = gpd.GeoDataFrame(gdf_blocks, crs=src_crs)
gdf_blocks.reset_index(drop=True, inplace=True)
assert gdf_blocks.crs is not None, f"The layer {path} has no CRS and needs fixing"
# Spatially join samples with the blocks so each sample receives the
# block_id of the block that contains it
gdf_blocks = (
samples.to_crs(gdf_blocks.crs)
.sjoin(gdf_blocks, how="inner")
.rename(columns={"index_right": "block_id"})
)
query_pixels: list[gpd.GeoDataFrame] = []
for ij, block in gdf_blocks.groupby("block_id"):
inv_block_transform = block["inv_transform"].iloc[0]
window = block["window"].iloc[0]
block["x"] = block.geometry.x
block["y"] = block.geometry.y
block["block_col_off"] = window.col_off
block["block_row_off"] = window.row_off
block["block_width"] = window.width
block["block_height"] = window.height
block.loc[:, "sample_col"], block.loc[:, "sample_row"] = (
inv_block_transform * (block["x"], block["y"])
)
block["sample_col"] = block["sample_col"].astype("int")
block["sample_row"] = block["sample_row"].astype("int")
block = block.drop(columns="geometry")
query_pixels.append(block)
assert query_pixels, f"query_pixels is empty for path: {path}"
res: gpd.GeoDataFrame = pd.concat(query_pixels).drop(
columns=["window", "inv_transform"]
)
if len(set(res.index)) < int(samples.shape[0] * 0.5):
print(
f"Less then 50% of points queryed for path: {path}, this is suspicious"
)
return res
def _find_blocks(self, samples: gpd.GeoDataFrame) -> dict[str, gpd.GeoDataFrame]:
if self.verbose:
ttprint(f"Scanning blocks of {len(self.raster_files)} layers")
query_pixels = {}
for _, layers in self.layers.groupby("group"):
group = layers.iloc[0]["group"]
path = layers.iloc[0]["path"]
if self.verbose:
ttprint(f"Finding query pixels for {group} ({layers.shape[0]} layers)")
query_pixels[group] = self._find_blocks_for_src(path, samples)
if self.verbose:
ttprint("End")
return query_pixels
class SpaceOverlay:
"""
Overlay a set of points over multiple raster files.
The retrieved pixel values are organized in columns
according to the filenames.
:param points: The path for vector file or ``geopandas.GeoDataFrame`` with
the points.
:param catalog: scikit-map data catalog.
:param raster_tiles: Tiling system as a ``geopandas.GeoDataFrame`` or the path
of a vector file, used to resolve the ``{tile_id}`` placeholder in the
raster paths.
:param tile_id_col: Name of the column holding the tile id.
:param n_threads: Number of CPU cores to be used in parallel. By default all cores
are used.
:param verbose: Use ``True`` to print the overlay progress.
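Examples
========
A minimal sketch; ``catalog`` is a prepared ``skmap.catalog.DataCatalog``
and the file paths are placeholders (the Parquet file must provide ``lon``
and ``lat`` columns):
>>> from skmap.overlay import SpaceOverlay
>>> ovr = SpaceOverlay("points.parquet", catalog, raster_tiles="tiles.gpkg")
>>> df = ovr.run(max_ram_mb=2048, out_file_name=None)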
"""
def __init__(
self,
points: gpd.GeoDataFrame | str | pd.DataFrame,
catalog: DataCatalog,
raster_tiles: Optional[gpd.GeoDataFrame | str] = None,
tile_id_col: str = "tile_id",
n_threads: int = parallel.CPU_COUNT,
verbose: bool = True,
) -> None:
self.verbose: bool = verbose
self.catalog: DataCatalog = catalog
self.layer_paths, self.layer_idxs, self.layer_names = self.catalog.get_paths()
if raster_tiles is not None:
if not isinstance(raster_tiles, gpd.GeoDataFrame):
if self.verbose:
ttprint(f"Reading {raster_tiles}")
raster_tiles = gpd.read_file(raster_tiles)
processed_urls = []
for url in self.layer_paths:
url = url.replace("/vsicurl/", "")
if "{tile_id}" in url:
url = url.replace("{tile_id}", raster_tiles[tile_id_col].iloc[0])
processed_urls.append(url)
urls_with_404 = []
for url in processed_urls:
try:
response = requests.head(url, timeout=10)
if response.status_code == 404:
urls_with_404.append(url)
except requests.RequestException as e:
print(f"Error checking URL {url}: {e}")
if self.verbose:
ttprint(
f"{len(urls_with_404)} out of {len(self.layer_paths)} URLs returning 404"
)
for url in urls_with_404:
print(url)
assert len(urls_with_404) == 0, "The listed URLs cannot be reached"
if not isinstance(points, gpd.GeoDataFrame):
if not isinstance(points, pd.DataFrame):
points = pd.read_parquet(points)
points["geometry"] = points.apply(
lambda row: Point(row["lon"], row["lat"]), axis=1
)
points = gpd.GeoDataFrame(points, geometry="geometry")
self.pts = points.reset_index(drop=True)
self.n_threads = n_threads
self.parallelOverlay = _ParallelOverlay(
self.pts.geometry.x.values,
self.pts.geometry.y.values,
self.layer_paths,
points_crs=self.pts.crs,
raster_tiles=raster_tiles,
tile_id_col=tile_id_col,
n_threads=self.n_threads,
verbose=self.verbose,
)
# Drop duplicates (from overlapping tiles), collect the union of missing points
# (outside the raster extent) so they can be dropped, and sort the data frames
keys = list(self.parallelOverlay.query_pixels.keys())
missing_indices = []
for key in keys:
self.parallelOverlay.query_pixels[key] = self.parallelOverlay.query_pixels[
key
][~self.parallelOverlay.query_pixels[key].index.duplicated(keep="first")]
missing_indices_key_set = set(self.pts.index) - set(
self.parallelOverlay.query_pixels[key].index
)
missing_indices += list(missing_indices_key_set)
for key in keys:
key_indices_to_drop = set(missing_indices) & set(
self.parallelOverlay.query_pixels[key].index
)
v = self.parallelOverlay.query_pixels[key]
v.drop(index=key_indices_to_drop, inplace=True)
v.sort_index(axis=0, ascending=True, inplace=True)
self.parallelOverlay.query_pixels[key] = v
print(
f"Dropping {len(set(missing_indices))} of {self.pts.shape[0]} points because they fall outside the raster extent"
)
self.pts = self.pts.drop(index=set(missing_indices)).sort_index(
axis=0, ascending=True
)
def run(
self,
max_ram_mb: int,
out_file_name: Path | None,
gdal_opts: Dict[str, str] = {
"GDAL_HTTP_VERSION": "1.0",
"CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
},
) -> pd.DataFrame:
"""
Execute the space overlay.
:param max_ram_mb: Maximum RAM that can be used for the overlay, expressed in MB.
:param out_file_name: Optional path of the Parquet file where the result is saved.
:param gdal_opts: Options for GDAL in a dictionary.
:returns: Data frame with the original columns plus the overlay result (one new
column per raster).
:rtype: pandas.DataFrame
"""
feats_names, _, feats_idx = self.catalog.get_unrolled_catalog()
# FIXME: this should be in a unit test
assert (self.catalog.data_size == len(self.catalog.get_feature_names())) & (
self.catalog.data_size == len(feats_names)
), (
"Catalog data size should coincide with the number of features, something went wrong"
)
self.ordered_feats_names: List[str] = [
s for _, s in sorted(zip(feats_idx, feats_names))
]
self.data_overlay = self.read_data(gdal_opts, max_ram_mb)
self.data_array = np.empty(
(self.catalog.data_size, self.data_overlay.shape[1]), dtype=np.float32
)
assert self.pts.shape[0] == self.data_overlay.shape[1], (
"Not matching size between input points and the overalied data"
)
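# Place the overlaid layers at their catalog indices; the remaining rows of
# data_array are expected to be filled by run_whales below.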
self.data_array[self.layer_idxs, :] = self.data_overlay[:, :]
self.pts = self.pts.reset_index(drop=True)
self.pts["lon"] = self.pts["geometry"].x
self.pts["lat"] = self.pts["geometry"].y
run_whales(
self.catalog,
self.data_array,
self.n_threads,
lat_info=self.pts["lat"].to_numpy(),
)
# FIXME: check that all the filled flags are True, or assert at this point
df = pd.DataFrame(self.data_array.T, columns=pd.Index(self.ordered_feats_names))
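# If the overlay output already provides a "lat" feature, drop the temporary
# "lat" column from pts so the concat below does not duplicate it.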
if "lat" in df:
self.pts = self.pts.drop(columns=["lat"])
self.pts_out = pd.concat([self.pts, df.reset_index(drop=True)], axis=1)
self.pts_out = self.pts_out.drop(columns=["geometry"])
if out_file_name is not None:
self.pts_out.to_parquet(out_file_name)
return self.pts_out
def read_data(self, gdal_opts: dict[str, str], max_ram_mb: int) -> np.ndarray:
"""read data in parallel across blocks, normally called by `self.run`
:param gdal_opts: additional options to pass to GDAL
:type gdal_opts: dict[str, str]
:param max_ram_mb: max RAM to use in MB
:type max_ram_mb: int
:return: return array ?with overlay applied?
:rtype: np.ndarray
"""
layers = self.parallelOverlay.layers
layers["layer_id"] = layers.index
query_pixels = self.parallelOverlay.query_pixels
keys = list(query_pixels.keys())
data_overlay = np.empty(
(layers.shape[0], query_pixels[keys[0]].shape[0]), dtype=np.float32
)
sb.fillArray(data_overlay, self.n_threads, np.nan)
for i, key in enumerate(keys):
key_layer_ids = [
int(layer_id) for layer_id in layers[layers["group"] == key]["layer_id"]
]
key_query_pixels = query_pixels[key]
assert key_query_pixels.shape[0] == query_pixels[keys[0]].shape[0], (
"Query pixel size is inconsistent between keys, something went wrong"
)
unique_blocks = key_query_pixels[
[
"block_id",
"block_col_off",
"block_row_off",
"block_height",
"block_width",
]
].drop_duplicates("block_id")
unique_blocks_ids = unique_blocks["block_id"].tolist()
unique_blocks_dict = unique_blocks.set_index("block_id")[
["block_row_off", "block_col_off", "block_height", "block_width"]
].to_dict("index")
layer_path_dict = layers.set_index("layer_id")["path"].to_dict()
if self.verbose:
ttprint(
f"Loading and sampling {len(set(key_layer_ids))} raster layers for group {key}"
)
# layer_nodata_dict = layers.set_index("layer_id")["nodata"].to_dict()
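# Build one (layer, block) read task per combination; all layers in the
# group share the same block grid, so the block ids are reusable.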
key_layer_ids_comb, unique_blocks_ids_comb = map(
list, zip(*itertools.product(key_layer_ids, unique_blocks_ids))
)
block_row_off_comb = [
unique_blocks_dict[ubid]["block_row_off"]
for ubid in unique_blocks_ids_comb
]
block_col_off_comb = [
unique_blocks_dict[ubid]["block_col_off"]
for ubid in unique_blocks_ids_comb
]
block_height_comb = [
unique_blocks_dict[ubid]["block_height"]
for ubid in unique_blocks_ids_comb
]
block_width_comb = [
unique_blocks_dict[ubid]["block_width"]
for ubid in unique_blocks_ids_comb
]
key_layer_paths_comb = [
layer_path_dict[ulid] for ulid in key_layer_ids_comb
]
# key_layer_nodatas_comb = [
# np.nan if layer_nodata_dict[ulid] is None else layer_nodata_dict[ulid]
# for ulid in key_layer_ids_comb
# ]
n_comb = len(key_layer_paths_comb)
bands_list = [1]
if "tile_id" in query_pixels[key].columns:
block_tile_id_dict = key_query_pixels.set_index("block_id")[
"tile_id"
].to_dict()
key_layer_paths_comb = [
key_layer_paths_comb[j].format(
tile_id=block_tile_id_dict[unique_blocks_ids_comb[j]]
)
for j in range(len(key_layer_paths_comb))
]
block_height = np.max(block_height_comb)
block_width = np.max(block_width_comb)
n_block_pix = block_height * block_width
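# RAM budget: subtract the float32 overlay array already allocated (4 bytes
# per value) from max_ram_mb, convert the remainder to bytes, and divide by
# the bytes needed per (layer, block) read (4 * n_block_pix).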
# Factor 2 is heuristic to keep margin for temporary variables
max_comb_chunk = int(
np.ceil(
(max_ram_mb - data_overlay.size * 4 / 1024 / 1024)
* 1024
* 1024
/ 4
/ n_block_pix
/ 2
)
)
assert max_comb_chunk > 1, (
"skmap-error 42: max_ram_mb too small, can not chunk"
)
for chunk_start in range(0, n_comb, max_comb_chunk):
key_layer_paths_chunk = key_layer_paths_comb[
chunk_start : chunk_start + max_comb_chunk
]
block_col_off_chunk = block_col_off_comb[
chunk_start : chunk_start + max_comb_chunk
]
block_row_off_chunk = block_row_off_comb[
chunk_start : chunk_start + max_comb_chunk
]
block_height_chunk = block_height_comb[
chunk_start : chunk_start + max_comb_chunk
]
block_width_chunk = block_width_comb[
chunk_start : chunk_start + max_comb_chunk
]
# key_layer_nodatas_comb[chunk_start : chunk_start + max_comb_chunk]
unique_blocks_ids_chunk = unique_blocks_ids_comb[
chunk_start : chunk_start + max_comb_chunk
]
key_layer_ids_chunk = key_layer_ids_comb[
chunk_start : chunk_start + max_comb_chunk
]
chunk_size = len(key_layer_paths_chunk)
data_array = np.empty((chunk_size, n_block_pix), dtype=np.float32)
perm_vec = range(chunk_size)
sb.readDataBlocks(
data_array,
self.n_threads,
key_layer_paths_chunk,
perm_vec,
block_col_off_chunk,
block_row_off_chunk,
block_width_chunk,
block_height_chunk,
bands_list,
gdal_opts,
None,
np.nan,
)
pix_block_ids = key_query_pixels["block_id"].tolist()
sample_rows = key_query_pixels["sample_row"].tolist()
sample_cols = key_query_pixels["sample_col"].tolist()
block_width_dict = unique_blocks.set_index("block_id")[
"block_width"
].to_dict()
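# Row-major flattened index of each sample within its block
# (row * block_width + col).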
pix_inblock_idxs = [
sample_rows[k] * block_width_dict[ubid] + sample_cols[k]
for k, ubid in enumerate(pix_block_ids)
]
sb.extractOverlay(
data_array,
self.n_threads,
pix_block_ids,
pix_inblock_idxs,
unique_blocks_ids_chunk,
key_layer_ids_chunk,
data_overlay,
)
return data_overlay
class SpaceTimeOverlay:
"""
Overlay a set of points over multiple rasters, taking the year information into account.
The retrieved pixel values are organized in columns according to the filenames.
:param points: The path for vector file or ``geopandas.GeoDataFrame`` with
the points.
:param col_date: Date column to retrieve the year information from.
:param catalog: scikit-map data catalog, with one group per year.
:param raster_tiles: Tiling system as a ``geopandas.GeoDataFrame`` or the path
of a vector file.
:param tile_id_col: Name of the column holding the tile id.
:param n_threads: Number of CPU cores to be used in parallel. By default all cores
are used.
:param verbose: Use ``True`` to print the overlay progress.
Examples
========
>>> from skmap.overlay import SpaceTimeOverlay
>>>
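>>> # A minimal sketch; ``catalog`` (a DataCatalog with one group per year),
>>> # ``tiles`` and the Parquet path are placeholders prepared beforehand:
>>> sto = SpaceTimeOverlay(
...     points="points.parquet",
...     col_date="year",
...     catalog=catalog,
...     raster_tiles=tiles,
... )
>>> df = sto.run(max_ram_mb=2048, out_file_name=None)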
"""
def __init__(
self,
points: gpd.GeoDataFrame | pd.DataFrame | str,
col_date: str,
catalog: DataCatalog,
raster_tiles: gpd.GeoDataFrame | str,
tile_id_col: str = "tile_id",
n_threads: int = parallel.CPU_COUNT,
verbose: bool = False,
) -> None:
if not isinstance(points, gpd.GeoDataFrame):
if not isinstance(points, pd.DataFrame):
points = pd.read_parquet(points)
points["geometry"] = points.apply(
lambda row: Point(row["lon"], row["lat"]), axis=1
)
points = gpd.GeoDataFrame(points, geometry="geometry")
self.pts = points.reset_index(drop=True)
self.n_threads = n_threads
self.col_date = col_date
self.overlay_objs: Dict[
str, SpaceOverlay
] = {} # FIXME: Dict[int, SpaceOverlay]?
self.year_catalogs = {}
self.verbose = verbose
self.catalog = catalog
self.pts[self.col_date] = self.pts[self.col_date].astype(int)
self.year_points = {}
for year in self.catalog.get_groups():
self.year_points[year] = self.pts[self.pts[self.col_date] == int(year)]
if len(self.year_points[year]) > 0:
year_catalog = catalog.copy()
year_catalog.query(
catalog.get_feature_names(), [year]
) # 'common' group is retrieved by default
self.year_catalogs[year] = year_catalog
if self.verbose:
ttprint(
f"Overlay {len(self.year_points[year])} points from {year} in {year_catalog.data_size} raster layers"
)
self.overlay_objs[year] = SpaceOverlay(
points=self.year_points[year],
catalog=self.year_catalogs[year],
raster_tiles=raster_tiles,
tile_id_col=tile_id_col,
n_threads=n_threads,
verbose=verbose,
)
else:
print(
f"No points to overlay for year {year}, removing it from the catalog"
)
del catalog.data[year]
def run(
self,
max_ram_mb: int,
out_file_name: Optional[Path],
gdal_opts: Dict[str, str] = {
"GDAL_HTTP_VERSION": "1.0",
"CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
},
) -> Optional[pd.DataFrame]:
"""
Execute the spacetime overlay. It removes the year part from the column names.
For example, the raster ``raster_20101202..20110320.tif`` results in the column
name ``raster_1202..0320``.
:param max_ram_mb: Maximum RAM that can be used for the overlay, expressed in MB.
:param out_file_name: Optional path of the Parquet file where the result is saved.
:param gdal_opts: Options for GDAL in a dictionary.
:returns: Data frame with the original columns plus the overlay result (one new
column per raster).
:rtype: pandas.DataFrame
"""
self.result: Optional[pd.DataFrame] = None
for year in self.catalog.get_groups():
if self.verbose:
ttprint(f"Running the overlay for {year}")
year_result = self.overlay_objs[year].run(max_ram_mb, None, gdal_opts)
if self.result is None:
self.result = year_result
else:
self.result: pd.DataFrame = pd.concat(
[self.result, year_result], ignore_index=True
)
if out_file_name is not None:
self.result.to_parquet(out_file_name)
return self.result