Source code for skmap.overlay

import hashlib
import itertools
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import rasterio.windows
import requests
from shapely import box
from shapely.geometry import Point

import skmap.set_env  # noqa: F401
import skmap_bindings as sb
from skmap import parallel
from skmap.catalog import DataCatalog, run_whales
from skmap.misc import ttprint


class _ParallelOverlay:
    # sampling only first band in every layer

    TILE_PLACEHOLDER = "{tile_id}"

    def __init__(
        self,
        points_x: np.ndarray | Sequence[float],
        points_y: np.ndarray | Sequence[float],
        raster_files: Sequence[str | Path],
        points_crs: str | None,
        raster_tiles: gpd.GeoDataFrame | str | None = None,
        tile_id_col: str = "tile_id",
        n_threads: int = parallel.CPU_COUNT,
        verbose: bool = True,
    ) -> None:
        self.verbose: bool = verbose

        self.default_tile_id: str = ""
        self.tile_id_col: str = tile_id_col

        if raster_tiles is not None:
            if not isinstance(raster_tiles, gpd.GeoDataFrame):
                if self.verbose:
                    ttprint(f"Reading {raster_tiles}")
                raster_tiles: gpd.GeoDataFrame = gpd.read_file(raster_tiles)

            self.default_tile_id: str = raster_tiles[self.tile_id_col].iloc[0]

        self.raster_tiles: gpd.GeoDataFrame = raster_tiles

        if points_crs is None:
            points_crs = rasterio.open(
                str(raster_files[0]).replace(
                    _ParallelOverlay.TILE_PLACEHOLDER, self.default_tile_id
                )
            ).crs

        samples: gpd.GeoDataFrame = gpd.GeoDataFrame(
            geometry=gpd.points_from_xy(points_x, points_y), crs=points_crs
        )
        samples.reset_index(drop=True, inplace=True)

        self.raster_files = raster_files

        self.layers: pd.DataFrame = (
            pd.DataFrame(
                {
                    "name": [
                        str(Path(raster_file).with_suffix("").name)
                        for raster_file in raster_files
                    ],
                    "path": self.raster_files,
                }
            )
            .reset_index(drop=True)
            .apply(
                _ParallelOverlay._layer_metadata,
                default_tile_id=self.default_tile_id,
                axis=1,
            )
        )

        self.query_pixels: dict[str, gpd.GeoDataFrame] = self._find_blocks(samples)

    @staticmethod
    def _layer_metadata(row, default_tile_id):
        path = row["path"]
        if _ParallelOverlay._is_tiled(path):
            path = path.replace(_ParallelOverlay.TILE_PLACEHOLDER, default_tile_id)
        src = rasterio.open(path)

        row["nodata"] = src.nodata
        _, window = next(src.block_windows(1))
        row["block_height"] = window.height
        row["block_width"] = window.width

        key = "".join(
            [
                str(default_tile_id),
                str(src.height),
                str(src.width),
                str(src.block_shapes[0]),
                str(src.transform.to_gdal()),
            ]
        )
        row["group"] = str(hashlib.md5(key.encode("utf-8")).hexdigest())

        return row

    @staticmethod
    def _is_tiled(path: Path | str) -> bool:
        return _ParallelOverlay.TILE_PLACEHOLDER in str(path)

    @staticmethod
    def _tiled_blocks(tile_path, tile_id):
        tile_blocks = []

        with rasterio.Env(
            CPL_VSIL_CURL_ALLOWED_EXTENSIONS=".tif", GDAL_HTTP_VERSION="1.1"
        ):
            src = rasterio.open(tile_path)

            for (i, j), window in src.block_windows(1):
                tile_blocks.append(
                    {
                        "tile_id": tile_id,
                        "window": window,
                        "geometry": box(
                            *rasterio.windows.bounds(window, src.transform)
                        ),
                    }
                )

        return tile_blocks

    def _find_blocks_for_src(
        self, path: str | Path, samples: gpd.GeoDataFrame
    ) -> gpd.GeoDataFrame:
        gdf_blocks = []

        if _ParallelOverlay._is_tiled(path):
            samp_tiles = self.raster_tiles.sjoin(
                samples.to_crs(self.raster_tiles.crs), how="inner"
            )[self.tile_id_col].unique()
            raster_tiles = self.raster_tiles[
                self.raster_tiles[self.tile_id_col].isin(samp_tiles)
            ]
            if self.verbose:
                ttprint(
                    f"Retrieving block information for {raster_tiles.shape[0]} tiles."
                )

            for _, tile in raster_tiles.iterrows():
                tile_id = tile[self.tile_id_col]
                tile_path = str(path).replace(
                    _ParallelOverlay.TILE_PLACEHOLDER, tile_id
                )
                src: rasterio.io.DatasetReader = rasterio.open(tile_path)
                # read the raster's specific blocks and insert their dimensions into gdf
                # see https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#blocks
                for (i, j), window in src.block_windows(1):
                    gdf_blocks.append(
                        {
                            "tile_id": tile_id,
                            "window": window,
                            "inv_transform": ~rasterio.windows.transform(
                                window, src.transform
                            ),
                            "geometry": box(
                                *rasterio.windows.bounds(window, src.transform)
                            ),
                        }
                    )

        else:
            src: rasterio.io.DatasetReader = rasterio.open(path)

            for (i, j), window in src.block_windows(1):
                gdf_blocks.append(
                    {
                        "window": window,
                        "inv_transform": ~rasterio.windows.transform(
                            window, src.transform
                        ),
                        "geometry": box(
                            *rasterio.windows.bounds(window, src.transform)
                        ),
                    }
                )

        # convert list-of-dicts to gdf
        gdf_blocks = gpd.GeoDataFrame(gdf_blocks, crs=src.crs)
        gdf_blocks.reset_index(drop=True, inplace=True)

        src.close()

        assert gdf_blocks.crs is not None, f"The layer {path} has not crs, need fix"

        # join with samples
        gdf_blocks = (
            samples.to_crs(gdf_blocks.crs)
            .sjoin(gdf_blocks, how="inner")
            .rename(columns={"index_right": "block_id"})
        )

        query_pixels: Sequence[gpd.GeoDataFrame] = []

        for ij, block in gdf_blocks.groupby("block_id"):
            inv_block_transform = block["inv_transform"].iloc[0]
            window = block["window"].iloc[0]
            block["x"] = block.geometry.x
            block["y"] = block.geometry.y
            block["block_col_off"] = window.col_off
            block["block_row_off"] = window.row_off
            block["block_width"] = window.width
            block["block_height"] = window.height

            block.loc[:, "sample_col"], block.loc[:, "sample_row"] = (
                inv_block_transform * (block["x"], block["y"])
            )
            block["sample_col"] = block["sample_col"].astype("int")
            block["sample_row"] = block["sample_row"].astype("int")
            block = block.drop(columns="geometry")

            query_pixels.append(block)

        assert query_pixels, f"query_pixels is empty for path: {path}"
        res: gpd.GeoDataFrame = pd.concat(query_pixels).drop(
            columns=["window", "inv_transform"]
        )
        if len(set(res.index)) < int(samples.shape[0] * 0.5):
            print(
                f"Less then 50% of points queryed for path: {path}, this is suspicious"
            )

        return res

    def _find_blocks(self, samples: gpd.GeoDataFrame) -> dict[str, gpd.GeoDataFrame]:
        if self.verbose:
            ttprint(f"Scanning blocks of {len(self.raster_files)} layers")

        query_pixels = {}

        for _, layers in self.layers.groupby("group"):
            group = layers.iloc[0]["group"]
            path = layers.iloc[0]["path"]

            if self.verbose:
                ttprint(f"Finding query pixels for {group} ({layers.shape[0]} layers)")

            query_pixels[group] = self._find_blocks_for_src(path, samples)

        if self.verbose:
            ttprint("End")

        return query_pixels


[docs] class SpaceOverlay: """ Overlay a set of points over multiple raster files. The retrieved pixel values are organized in columns according to the filenames. :param points: The path for vector file or ``geopandas.GeoDataFrame`` with the points. :param catalog: scikit-map data catalog. :param n_threads: Number of CPU cores to be used in parallel. By default all cores are used. :param verbose: Use ``True`` to print the overlay progress. """ def __init__( self, points: gpd.GeoDataFrame | str | pd.DataFrame, catalog: DataCatalog, raster_tiles: Optional[gpd.GeoDataFrame | str] = None, tile_id_col: Union[str] = "tile_id", n_threads: int = parallel.CPU_COUNT, verbose: bool = True, ) -> None: self.verbose: bool = verbose self.catalog: DataCatalog = catalog self.layer_paths, self.layer_idxs, self.layer_names = self.catalog.get_paths() if raster_tiles is not None: if not isinstance(raster_tiles, gpd.GeoDataFrame): if self.verbose: ttprint(f"Reading {raster_tiles}") raster_tiles = gpd.read_file(raster_tiles) processed_urls = [] for url in self.layer_paths: url = url.replace("/vsicurl/", "") if "{tile_id}" in url: url = url.replace("{tile_id}", raster_tiles[tile_id_col].iloc[0]) processed_urls.append(url) urls_with_404 = [] for url in processed_urls: try: response = requests.head(url, timeout=10) if response.status_code == 404: urls_with_404.append(url) except requests.RequestException as e: print(f"Error checking URL {url}: {e}") if self.verbose: ttprint( f"{len(urls_with_404)} out of {len(self.layer_paths)} URLs returning 404" ) for url in urls_with_404: print(url) assert len(urls_with_404) == 0, "The listed URLs can not be reached" if not isinstance(points, gpd.GeoDataFrame): if not isinstance(points, pd.DataFrame): points = pd.read_parquet(points) points["geometry"] = points.apply( lambda row: Point(row["lon"], row["lat"]), axis=1 ) points = gpd.GeoDataFrame(points, geometry="geometry") self.pts = points.reset_index(drop=True) self.n_threads = n_threads self.parallelOverlay = _ParallelOverlay( self.pts.geometry.x.values, self.pts.geometry.y.values, self.layer_paths, points_crs=self.pts.crs, raster_tiles=raster_tiles, tile_id_col=tile_id_col, n_threads=self.n_threads, verbose=self.verbose, ) # Drop duplicates (from overlapping tiles), find union of missing points (out of extent) to drop them and sorting the dataframes keys = list(self.parallelOverlay.query_pixels.keys()) missing_indices = [] for key in keys: self.parallelOverlay.query_pixels[key] = self.parallelOverlay.query_pixels[ key ][~self.parallelOverlay.query_pixels[key].index.duplicated(keep="first")] missing_indices_key_set = set(self.pts.index) - set( self.parallelOverlay.query_pixels[key].index ) missing_indices += list(missing_indices_key_set) for key in keys: key_indices_to_drop = set(missing_indices) & set( self.parallelOverlay.query_pixels[key].index ) v = self.parallelOverlay.query_pixels[key] v.drop(index=key_indices_to_drop, inplace=True) v.sort_index(axis=0, ascending=True, inplace=True) self.parallelOverlay.query_pixels[key] = v print( f"Dropping {len(list(set(missing_indices)))} points out of {self.pts.shape[0]} because out of extent" ) self.pts = self.pts.drop(index=set(missing_indices)).sort_index( axis=0, ascending=True )
[docs] def run( self, max_ram_mb: int, out_file_name: Path | None, gdal_opts: Dict[str, str] = { "GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif", }, ) -> pd.DataFrame: """ Execute the space overlay. :param gdal_opts: Options for GDAL in a dictionary. :param max_ram_mb: Maximum RAM that can be used for the overlay expressed in MB. :returns: Data frame with the original columns plus the overlay result (one new column per raster). :rtype: geopandas.GeoDataFrame """ feats_names, _, feats_idx = self.catalog.get_unrolled_catalog() # FIXME: this should be in a unit test assert (self.catalog.data_size == len(self.catalog.get_feature_names())) & ( self.catalog.data_size == len(feats_names) ), ( "Catalog data size should coincide with the number of features, something went wrong" ) self.ordered_feats_names: List[str] = [ s for _, s in sorted(zip(feats_idx, feats_names)) ] self.data_overlay = self.read_data(gdal_opts, max_ram_mb) self.data_array = np.empty( (self.catalog.data_size, self.data_overlay.shape[1]), dtype=np.float32 ) assert self.pts.shape[0] == self.data_overlay.shape[1], ( "Not matching size between input points and the overalied data" ) self.data_array[self.layer_idxs, :] = self.data_overlay[:, :] self.pts = self.pts.reset_index(drop=True) self.pts["lon"] = self.pts["geometry"].x self.pts["lat"] = self.pts["geometry"].y run_whales( self.catalog, self.data_array, self.n_threads, lat_info=self.pts["lat"].to_numpy(), ) # @FIXME check that all the filled flags are True or assert at this point df = pd.DataFrame(self.data_array.T, columns=pd.Index(self.ordered_feats_names)) if "lat" in df: self.pts = self.pts.drop(columns=["lat"]) self.pts_out = pd.concat([self.pts, df.reset_index(drop=True)], axis=1) self.pts_out = self.pts_out.drop(columns=["geometry"]) if out_file_name is not None: self.pts_out.to_parquet(out_file_name) return self.pts_out
[docs] def read_data(self, gdal_opts: dict[str, str], max_ram_mb: int) -> np.ndarray: """read data in parallel across blocks, normally called by `self.run` :param gdal_opts: additional options to pass to GDAL :type gdal_opts: dict[str, str] :param max_ram_mb: max RAM to use in MB :type max_ram_mb: int :return: return array ?with overlay applied? :rtype: np.ndarray """ layers = self.parallelOverlay.layers layers["layer_id"] = layers.index query_pixels = self.parallelOverlay.query_pixels keys = list(query_pixels.keys()) data_overlay = np.empty( (layers.shape[0], query_pixels[keys[0]].shape[0]), dtype=np.float32 ) sb.fillArray(data_overlay, self.n_threads, np.nan) for i, key in enumerate(keys): key_layer_ids = [ int(l) for l in list(layers[layers["group"] == key]["layer_id"]) ] key_query_pixels = query_pixels[key] assert key_query_pixels.shape[0] == query_pixels[keys[0]].shape[0], ( "Query pixel size is inconsistent between keys, something went wrong" ) unique_blocks = key_query_pixels[ [ "block_id", "block_col_off", "block_row_off", "block_height", "block_width", ] ].drop_duplicates("block_id") unique_blocks_ids = unique_blocks["block_id"].tolist() unique_blocks_dict = unique_blocks.set_index("block_id")[ ["block_row_off", "block_col_off", "block_height", "block_width"] ].to_dict("index") layer_path_dict = layers.set_index("layer_id")["path"].to_dict() if self.verbose: ttprint( f"Loading and sampling {len(set(key_layer_ids))} raster layers for group {key}" ) # layer_nodata_dict = layers.set_index("layer_id")["nodata"].to_dict() key_layer_ids_comb, unique_blocks_ids_comb = map( list, zip(*itertools.product(key_layer_ids, unique_blocks_ids)) ) block_row_off_comb = [ unique_blocks_dict[ubid]["block_row_off"] for ubid in unique_blocks_ids_comb ] block_col_off_comb = [ unique_blocks_dict[ubid]["block_col_off"] for ubid in unique_blocks_ids_comb ] block_height_comb = [ unique_blocks_dict[ubid]["block_height"] for ubid in unique_blocks_ids_comb ] block_width_comb = [ unique_blocks_dict[ubid]["block_width"] for ubid in unique_blocks_ids_comb ] key_layer_paths_comb = [ layer_path_dict[ulid] for ulid in key_layer_ids_comb ] # key_layer_nodatas_comb = [ # np.nan if layer_nodata_dict[ulid] is None else layer_nodata_dict[ulid] # for ulid in key_layer_ids_comb # ] n_comb = len(key_layer_paths_comb) bands_list = [1] if "tile_id" in query_pixels[key].columns: block_tile_id_dict = key_query_pixels.set_index("block_id")[ "tile_id" ].to_dict() key_layer_paths_comb = [ key_layer_paths_comb[j].format( tile_id=block_tile_id_dict[unique_blocks_ids_comb[j]] ) for j in range(len(key_layer_paths_comb)) ] block_height = np.max(block_height_comb) block_width = np.max(block_width_comb) n_block_pix = block_height * block_width # Factor 2 is heuristic to keep margin for temporary variables max_comb_chunk = int( np.ceil( (max_ram_mb - data_overlay.size * 4 / 1024 / 1024) * 1024 * 1024 / 4 / n_block_pix / 2 ) ) assert max_comb_chunk > 1, ( "skmap-error 42: max_ram_mb too small, can not chunk" ) for chunk_start in range(0, n_comb, max_comb_chunk): key_layer_paths_chunk = key_layer_paths_comb[ chunk_start : chunk_start + max_comb_chunk ] block_col_off_chunk = block_col_off_comb[ chunk_start : chunk_start + max_comb_chunk ] block_row_off_chunk = block_row_off_comb[ chunk_start : chunk_start + max_comb_chunk ] block_height_chunk = block_height_comb[ chunk_start : chunk_start + max_comb_chunk ] block_width_chunk = block_width_comb[ chunk_start : chunk_start + max_comb_chunk ] # key_layer_nodatas_comb[chunk_start : chunk_start + max_comb_chunk] unique_blocks_ids_chunk = unique_blocks_ids_comb[ chunk_start : chunk_start + max_comb_chunk ] key_layer_ids_chunk = key_layer_ids_comb[ chunk_start : chunk_start + max_comb_chunk ] chunk_size = len(key_layer_paths_chunk) data_array = np.empty((chunk_size, n_block_pix), dtype=np.float32) perm_vec = range(chunk_size) sb.readDataBlocks( data_array, self.n_threads, key_layer_paths_chunk, perm_vec, block_col_off_chunk, block_row_off_chunk, block_width_chunk, block_height_chunk, bands_list, gdal_opts, None, np.nan, ) pix_block_ids = key_query_pixels["block_id"].tolist() sample_rows = key_query_pixels["sample_row"].tolist() sample_cols = key_query_pixels["sample_col"].tolist() block_width_dict = unique_blocks.set_index("block_id")[ "block_width" ].to_dict() pix_inblock_idxs = [ sample_rows[k] * block_width_dict[ubid] + sample_cols[k] for k, ubid in enumerate(pix_block_ids) ] sb.extractOverlay( data_array, self.n_threads, pix_block_ids, pix_inblock_idxs, unique_blocks_ids_chunk, key_layer_ids_chunk, data_overlay, ) return data_overlay
[docs] class SpaceTimeOverlay: """ Overlay a set of points over multiple raster considering the year information. The retrieved pixel values are organized in columns according to the filenames. :param points: The path for vector file or ``geopandas.GeoDataFrame`` with the points. :param col_date: Date column to retrieve the year information. :param catalog. :param n_threads: Number of CPU cores to be used in parallel. By default all cores are used. :param verbose: Use ``True`` to print the overlay progress. Examples ======== >>> from skmap.overlay import SpaceTimeOverlay >>> """ def __init__( self, points: gpd.GeoDataFrame | pd.DataFrame | str, col_date: str, catalog: DataCatalog, raster_tiles: gpd.GeoDataFrame | str, tile_id_col: str = "tile_id", n_threads: int = parallel.CPU_COUNT, verbose: bool = False, ) -> None: if not isinstance(points, gpd.GeoDataFrame): if not isinstance(points, pd.DataFrame): points = pd.read_parquet(points) points["geometry"] = points.apply( lambda row: Point(row["lon"], row["lat"]), axis=1 ) points = gpd.GeoDataFrame(points, geometry="geometry") self.pts = points.reset_index(drop=True) self.n_threads = n_threads self.col_date = col_date self.overlay_objs: Dict[ str, SpaceOverlay ] = {} # FIXME: Dict[int, SpaceOverlay]? self.year_catalogs = {} self.verbose = verbose self.catalog = catalog self.pts[self.col_date] = self.pts[self.col_date].astype(int) self.year_points = {} for year in self.catalog.get_groups(): self.year_points[year] = self.pts[self.pts[self.col_date] == int(year)] if len(self.year_points[year]) > 0: year_catalog = catalog.copy() year_catalog.query( catalog.get_feature_names(), [year] ) # 'common' group is retrieved by default self.year_catalogs[year] = year_catalog if self.verbose: ttprint( f"Overlay {len(self.year_points[year])} points from {year} in {year_catalog.data_size} raster layers" ) self.overlay_objs[year] = SpaceOverlay( points=self.year_points[year], catalog=self.year_catalogs[year], raster_tiles=raster_tiles, tile_id_col=tile_id_col, n_threads=n_threads, verbose=verbose, ) else: print( f"No points to overlay for year {year}, removing it from the catalog" ) del catalog.data[year]
[docs] def run( self, max_ram_mb: int, out_file_name: Optional[Path], gdal_opts: Dict[str, str] = { "GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif", }, ) -> Optional[pd.DataFrame]: """ Execute the spacetime overlay. It removes the year part from the column names. For example, the raster ``raster_20101202..20110320.tif`` results in the column name ``raster_1202..0320``. :returns: Data frame with the original columns plus the overlay result (one new column per raster). :rtype: geopandas.GeoDataFrame """ self.result: Optional[pd.DataFrame] = None for year in self.catalog.get_groups(): if self.verbose: ttprint(f"Running the overlay for {year}") year_result = self.overlay_objs[year].run(max_ram_mb, None, gdal_opts) if self.result is None: self.result = year_result else: self.result: pd.DataFrame = pd.concat( [self.result, year_result], ignore_index=True ) if out_file_name is not None: self.result.to_parquet(out_file_name) return self.result