Source code for skmap.tiled_data

# ruff: noqa: ANN001
# ruff: noqa: ANN201
# ruff: noqa: ANN202
# ruff: noqa: ANN204
# ruff: noqa: E722
import os
import random
import re
import subprocess
import warnings
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, Optional

import numpy as np
from osgeo import gdal

import skmap.set_env  # noqa: F401
import skmap_bindings as sb
from skmap.catalog import DataCatalog, run_whales
from skmap.misc import TimeTracker, sb_arr, sb_vec, ttprint

mask_aggregation_bash_script = """#!/bin/bash
    if [ -z "$1" ]; then
        echo "Usage: $0 <input_tif_file>"
        exit 1
    fi
    tif_file="$1"
    basename_tif=$(basename "$tif_file" .tif)
    output_file="comp_${basename_tif}.tif"

    if [ -f "$output_file" ]; then
        rm "$output_file"
    fi
    xmin=$(gdalinfo "$tif_file" | grep "Upper Left" | awk -F'[(), ]+' '{print $3}')
    ymax=$(gdalinfo "$tif_file" | grep "Upper Left" | awk -F'[(), ]+' '{print $4}')
    xmax=$(gdalinfo "$tif_file" | grep "Lower Right" | awk -F'[(), ]+' '{print $3}')
    ymin=$(gdalinfo "$tif_file" | grep "Lower Right" | awk -F'[(), ]+' '{print $4}')
    tmp_out=$(gdalinfo "$tif_file" | grep "Pixel Size" | awk -F'[(),]' '{print $2, $3}')
    read -r pixel_size_x pixel_size_y <<< "$tmp_out"
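    # gdalinfo reports the Y pixel size as negative for north-up rasters; flip the sign below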
    pixel_size_y=$(awk "BEGIN {print -1 * $pixel_size_y}")
    pixel_size_x_new=$(awk "BEGIN {print $pixel_size_x * <AGG_FACTOR>}")
    pixel_size_y_new=$(awk "BEGIN {print $pixel_size_y * <AGG_FACTOR>}")
    xmin_new=$(awk "BEGIN {print $xmin + 2 * $pixel_size_x}")
    ymin_new=$(awk "BEGIN {print $ymin + 2 * $pixel_size_y}")
    xmax_new=$(awk "BEGIN {print $xmax - 2 * $pixel_size_x}")
    ymax_new=$(awk "BEGIN {print $ymax - 2 * $pixel_size_y}")
    gdalwarp -q -te "$xmin_new" "$ymin_new" "$xmax_new" "$ymax_new" -tr "$pixel_size_x_new" "$pixel_size_y_new" -r max "$tif_file" "$output_file"
    """


def warp_tile(tile_file, mosaic_paths, n_pix, resample):
    """
    no docs

    .. deprecated:: 0.10.0
        Use GDAL VRTs
    """
    warp_data = sb_vec(n_pix)
    try:
        sb.warpTile(
            warp_data,
            1,
            {"GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif"},
            tile_file,
            mosaic_paths,
            resample,
        )
    except:
        sb.fillArray(warp_data, 1, 0.0)
        print(f"Mosaic {mosaic_paths} has no data in {tile_file}, filling with 0.0")
    return warp_data
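# Usage sketch: `TiledDataLoader` submits one `warp_tile` call per mosaic through a
# `ProcessPoolExecutor`; a minimal standalone call, with a hypothetical local warp
# template and remote mosaic path, would look like:
#
#   n_pix = 4004 * 4004
#   data = warp_tile(
#       "/tmp/tmp_mask_tile.tif",                    # hypothetical warp template
#       "/vsicurl/https://example.org/mosaic.tif",   # hypothetical mosaic path
#       n_pix,
#       "GRA_NearestNeighbour",
#   )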
def s3_list_files(s3_aliases, s3_prefix, tile_id, file_pattern=None):
    if len(s3_aliases) == 0:
        return []
    bash_cmd = f"mc find {s3_aliases[0]}/{s3_prefix}/{tile_id}"
    process = subprocess.Popen(
        bash_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
    )
    stdout, stderr = process.communicate()
    # stderr = stderr.decode('utf-8')
    # assert stderr == '', f"Error listing S3 `{s3_aliases[0]}{s3_prefix}/{tile_id}`. \nError: {stderr}"
    stdout = stdout.decode("utf-8")
    lines = stdout.splitlines()
    if file_pattern is not None:
        pattern = re.compile(file_pattern)
        lines = [line for line in lines if pattern.search(line)]
    return lines
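# Usage sketch: list exported GeoTIFFs for one tile and keep only 2020 files; the
# alias list comes from `s3_setup` below, the prefix and tile id are hypothetical:
#
#   files = s3_list_files(["g1"], "tmp/prod-v1", "029E_51N", file_pattern=r"2020.*\.tif$")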
#
def s3_setup(access_key, secret_key, gaia_addrs):
    s3_aliases = [f"g{i + 1}" for i, _ in enumerate(gaia_addrs)]
    commands = [
        f"mc alias set g{i + 1} {addr} {access_key} {secret_key} --api S3v4 > /dev/null"
        for i, addr in enumerate(gaia_addrs)
    ]
    for cmd in commands:
        subprocess.run(cmd, shell=True, capture_output=False, text=True, check=True)
    return s3_aliases
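# Usage sketch, with placeholder credentials and endpoints (never hard-code real keys):
#
#   s3_aliases = s3_setup(
#       "<ACCESS_KEY>",
#       "<SECRET_KEY>",
#       ["http://192.168.0.1:9000", "http://192.168.0.2:9000"],  # hypothetical endpoints
#   )
#   # -> ["g1", "g2"], registered via `mc alias set`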
#
class TiledData:
    """Base class for `TiledDataLoader` and `TiledDataExporter`"""

    def __init__(
        self,
        n_layers: int = None,
        n_pixels: int = None,
        tile_id: str = None,
        n_threads: int = os.cpu_count(),
    ) -> None:
        self.tile_id = tile_id
        self.n_threads = n_threads
        if (n_layers is not None) & (n_pixels is not None):
            self.array = sb_arr(n_layers, n_pixels)
        else:
            self.array = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.array = None

    def convert_nan_to_value(self, value) -> None:
        sb.maskNan(self.array, self.n_threads, range(self.array.shape[0]), value)

    def convert_nan_to_median(self) -> None:
        medians = sb_arr(self.array.shape[0], 1)
        sb.computePercentiles(
            self.array, self.n_threads, range(self.array.shape[1]), medians, [0], [50.0]
        )
        nan_indices = np.argwhere(np.isnan(medians[:, 0])).flatten()
        if len(nan_indices) > 0:
            for nan_idx in nan_indices:
                print(
                    f"scikit-map ERROR 101: index {nan_idx} has all NaN for tile {self.tile_id}"
                )
            # raise Exception("scikit-map ERROR 101")
        sb.maskNanRows(self.array, self.n_threads, range(self.array.shape[0]), medians)
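# Usage sketch: `TiledData` is a context manager that frees `self.array` on exit;
# the NaN helpers operate in place on the (n_layers, n_pixels) array:
#
#   with TiledData(n_layers=10, n_pixels=4000 * 4000, tile_id="029E_51N") as td:
#       td.convert_nan_to_value(0.0)   # or td.convert_nan_to_median()
#       ...                            # td.array is released on __exit__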
class TiledDataLoader(TiledData):
    """
    Loads tiled data in parallel based on a catalog file

    :param catalog: scikit-map `DataCatalog`
    :param mask_template_path: ``str`` with a ``{tile_id}`` placeholder
    :type mask_template_path: str
    :param spatial_aggregation: Spatial aggregation factor; must be an integer
        divisor of 4000, defaults to None
    :type spatial_aggregation: int, optional
    :param resampling_strategy: GDAL resampling algorithm used for mosaics,
        defaults to "GRA_NearestNeighbour"
    :type resampling_strategy: str, optional
    :param gdal_opts: GDAL configuration options, defaults to
        ``{"GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif"}``
    :type gdal_opts: Dict[str, str], optional
    :param n_threads: The number of threads to use for whales, defaults to os.cpu_count()
    :type n_threads: int, optional
    :param verbose: Whether to print output, defaults to True
    :type verbose: bool, optional
    :param n_threads_read: Number of threads for GDAL, defaults to n_threads
    :type n_threads_read: int, optional
    """

    def __init__(
        self,
        catalog: DataCatalog,
        mask_template_path: str,
        spatial_aggregation: int = None,
        resampling_strategy: str = "GRA_NearestNeighbour",
        gdal_opts: Dict[str, str] = {
            "GDAL_HTTP_VERSION": "1.0",
            "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
        },
        n_threads: int = os.cpu_count(),
        verbose: bool = True,
        n_threads_read=None,
    ) -> None:
        self.catalog = catalog
        self.mask_template_path = mask_template_path
        self.spatial_aggregation = spatial_aggregation
        self.resampling_strategy = resampling_strategy
        self.gdal_opts = gdal_opts
        self.n_threads = n_threads
        if n_threads_read is None:
            self.n_threads_read = self.n_threads
        else:
            self.n_threads_read = n_threads_read
        self.executor = ProcessPoolExecutor(max_workers=self.n_threads_read)
        self.mask_path = None
        self.array = None
        self.array_valid = None
        self.tile_id = None
        self.x_off = None
        self.y_off = None
        self.x_size = None
        self.y_size = None
        self.n_pixels = None
        self.mask = None
        self.verbose = verbose
        self.pixels_valid_idx = None
        self.n_pixels_valid = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        if os.path.exists(self.mask_path):
            os.remove(self.mask_path)
            ttprint(f"Temporary mask data {self.mask_path} has been deleted.")

    def load_tile_data(self, tile_id):
        self.tile_id = tile_id
        tmp_mask_path = self.mask_template_path.format(tile_id=tile_id)
        self.mask_path = (
            f"/tmp/tmp_mask_{tmp_mask_path.split('/')[-1].split('.')[0]}.tif"
        )
        subprocess.run(
            f"wget {tmp_mask_path} -q -O {self.mask_path}", shell=True, check=True
        )
        # @FIXME: this only works with our setting of Landsat data
        if self.spatial_aggregation:
            if self.verbose:
                warnings.warn(
                    "Spatial aggregation only works with 4004x4004 mask tiles"
                )
            assert 4000 % self.spatial_aggregation == 0, (
                "Aggregation factor must be an integer divisor of 4000"
            )
            mask_aggregation_bash_script_parsed = mask_aggregation_bash_script.replace(
                "<AGG_FACTOR>", str(self.spatial_aggregation)
            )
            with open("mask_aggregation_bash_script.sh", "w") as file:
                file.write(mask_aggregation_bash_script_parsed)
            subprocess.run(["chmod", "+x", "mask_aggregation_bash_script.sh"])
            input_file = self.mask_path
            subprocess.run(["./mask_aggregation_bash_script.sh", input_file])
            self.mask_path = f"comp_{input_file.split('/')[-1].split('.')[0]}.tif"

        # check for valid pixels
        gdal_img = gdal.Open(self.mask_path)
        self.x_size = gdal_img.RasterXSize
        self.y_size = gdal_img.RasterYSize
        gdal_img = None
        self.x_off, self.y_off = (0, 0)
        self.n_pixels = self.x_size * self.y_size

        # prepare mask
        with TimeTracker(f" Prepare mask for tile {self.tile_id}", False):
            self.mask = np.zeros((1, self.n_pixels), dtype=np.float32)
            sb.readData(
                self.mask,
                1,
                [self.mask_path],
                [0],
                self.x_off,
                self.y_off,
                self.x_size,
                self.y_size,
                [1],
                self.gdal_opts,
            )
            self.pixels_valid_idx = np.arange(0, self.n_pixels)[
                self.mask[0, :].astype(int).astype(bool)
            ]
            self.n_pixels_valid = int(np.sum(self.mask))
            if self.n_pixels_valid == 0:
                return self

        # read rasters
        self.array = sb_arr(self.catalog.data_size, self.n_pixels)
        with TimeTracker(
            f" Read rasters and compute whales for tile {self.tile_id}", False
        ):
            paths, paths_idx, _ = self.catalog.get_paths()
            tile_paths, tile_idxs, mosaic_paths, mosaic_idxs = [], [], [], []
            for path, idx in zip(paths, paths_idx):
                if "{tile_id}" in path:
                    tile_paths += [path.format(tile_id=self.tile_id)]
                    tile_idxs += [idx]
                else:
                    mosaic_paths += [path]
                    mosaic_idxs += [idx]

            # Reading resampled data
            if mosaic_paths:
                # @FIXME: we could copy the mask locally and then delete it to reduce the number of requests
                tile_template_paths = [self.mask_path for i in range(len(mosaic_paths))]
                futures = [
                    self.executor.submit(
                        warp_tile,
                        tile_template_paths[i],
                        mosaic_paths[i],
                        self.n_pixels,
                        self.resampling_strategy,
                    )
                    for i in range(len(mosaic_paths))
                ]
                for i, future in enumerate(futures):
                    self.array[mosaic_idxs[i], :] = future.result()

            # Reading tiled data
            if tile_paths:
                if self.spatial_aggregation:
                    tmp_data = sb_arr(len(tile_paths), 4000 * 4000)
                    sb.readData(
                        tmp_data,
                        self.n_threads_read,
                        tile_paths,
                        range(len(tile_paths)),
                        2,
                        2,
                        4000,
                        4000,
                        [1],
                        self.gdal_opts,
                        None,
                        np.nan,
                    )
                    arr_reshaped = tmp_data.reshape(len(tile_paths), 4000, 4000)
                    arr_aggregated = arr_reshaped.reshape(
                        len(tile_paths),
                        self.y_size,
                        self.spatial_aggregation,
                        self.x_size,
                        self.spatial_aggregation,
                    ).mean(axis=(2, 4))
                    arr_final = arr_aggregated.reshape(
                        len(tile_paths), self.x_size * self.y_size
                    )
                    self.array[tile_idxs, :] = arr_final[:, :]
                else:
                    sb.readData(
                        self.array,
                        self.n_threads_read,
                        tile_paths,
                        tile_idxs,
                        self.x_off,
                        self.y_off,
                        self.x_size,
                        self.y_size,
                        [1],
                        self.gdal_opts,
                        None,
                        np.nan,
                    )

            # Go whales, go!!
            lon_lat = sb_arr(2, self.n_pixels)
            sb.getLatLonArray(
                lon_lat,
                self.n_threads_read,
                self.gdal_opts,
                self.mask_path,
                self.x_off,
                self.y_off,
                self.x_size,
                self.y_size,
            )
            run_whales(self.catalog, self.array, self.n_threads, lon_lat[1, :])
        return self

    def convert_nan_to_median(self) -> None:
        medians = sb_arr(self.array.shape[0], 1)
        sb.computePercentiles(
            self.array, self.n_threads, range(self.array.shape[1]), medians, [0], [50.0]
        )
        nan_indices = np.argwhere(np.isnan(medians[:, 0])).flatten()
        if len(nan_indices) > 0:
            for nan_idx in nan_indices:
                group_name, feature_name = self.catalog.find_group_and_feature_by_index(
                    nan_idx
                )
                print(
                    f"scikit-map ERROR 101: index {nan_idx} corresponding to group {group_name} and feature {feature_name} has all NaN for tile {self.tile_id}"
                )
            # raise Exception("scikit-map ERROR 101")
        sb.maskNanRows(self.array, self.n_threads, range(self.array.shape[0]), medians)

    def filter_valid_pixels(self) -> None:
        self.array_valid = sb_arr(self.catalog.data_size, self.n_pixels_valid)
        sb.selArrayCols(
            self.array, self.n_threads, self.array_valid, self.pixels_valid_idx
        )

    def expand_valid_pixels(self, array_valid, array_expanded) -> None:
        sb.expandArrayCols(
            array_valid, self.n_threads, array_expanded, self.pixels_valid_idx
        )

    def get_pixels_valid_idx(self, n_groups):
        return np.concatenate(
            [self.pixels_valid_idx + self.n_pixels * i for i in range(n_groups)]
        ).tolist()

    def fill_otf_constant(self, otf_name, otf_const) -> None:
        otf_idx = self.catalog.get_otf_idx()
        assert otf_name in otf_idx
        otf_name_idx = otf_idx[otf_name]
        self.array[otf_name_idx] = otf_const
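# Usage sketch, assuming a `DataCatalog` built elsewhere and a hypothetical mask URL
# template; `filter_valid_pixels` compacts the array to the mask's valid pixels:
#
#   loader = TiledDataLoader(
#       catalog,
#       "https://example.org/masks/{tile_id}.tif",  # hypothetical template
#       n_threads=32,
#   )
#   loader.load_tile_data("029E_51N")
#   loader.filter_valid_pixels()
#   X = loader.array_valid  # shape: (catalog.data_size, n_pixels_valid)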
def get_percentiele_string(q):
    if int(q / 0.1) == float(q / 0.1):
        formatted_p = (
            "p0" if (q == 0) else ("p100" if (q == 1) else f"p{int(q / 0.1)}0")
        )
    else:
        formatted_p = (
            "p0" if (q == 0) else ("p100" if (q == 1) else str(q).replace("0.", "p"))
        )
    return formatted_p
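# Examples (quantile -> percentile label used in the output file names):
#
#   get_percentiele_string(0)     # -> "p0"
#   get_percentiele_string(0.05)  # -> "p05"
#   get_percentiele_string(0.5)   # -> "p50"
#   get_percentiele_string(1)     # -> "p100"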
class TiledDataExporter(TiledData):
    def __init__(
        self,
        n_pixels: int = None,
        tile_id: str = None,
        mode: str = None,
        spatial_res: str = None,
        s3_params=None,
        verbose=False,
        years=None,
        depths=None,
        legend=None,
        quantiles=None,
        save_hdf5=None,
        n_threads: int = os.cpu_count(),
    ) -> None:
        if s3_params:
            self.s3_aliases = s3_setup(
                s3_params["s3_access_key"],
                s3_params["s3_secret_key"],
                s3_params["s3_addresses"],
            )
            self.s3_prefix = s3_params["s3_prefix"]
        else:
            self.s3_aliases = None
            self.s3_prefix = None
        self.spatial_res = spatial_res
        self.n_pixels = n_pixels
        self.mode = mode
        self.verbose = verbose
        self.tile_id = tile_id
        self.n_threads = n_threads
        self.years = years
        self.depths = depths
        self.legend = legend
        self.quantiles = quantiles
        self.save_hdf5 = save_hdf5
        if self.mode == "depths_years_quantiles_textures":
            assert (
                (years is not None) & (depths is not None) & (quantiles is not None)
            ), "Need to provide years, depths, quantiles"
        elif self.mode == "depths_years_quantiles":
            assert (
                (years is not None) & (depths is not None) & (quantiles is not None)
            ), "Need to provide years, depths, quantiles"
        elif self.mode == "static_depths_quantiles":
            assert (depths is not None) & (quantiles is not None), (
                "Need to provide depths, quantiles"
            )
        elif self.mode == "static_quantiles":
            assert quantiles is not None, "Need to provide quantiles"
        elif self.mode == "depths_years":
            assert (years is not None) & (depths is not None), (
                "Need to provide years, depths"
            )
        elif self.mode == "depths_years_textures":
            assert (years is not None) & (depths is not None), (
                "Need to provide years, depths"
            )
        elif self.mode == "years":
            assert years is not None, "Need to provide years"
        elif self.mode == "probabilities":
            assert legend is not None, "Need to provide legend"
        elif (self.mode == "static") | (self.mode == "dominant_class"):
            pass
        else:
            raise Exception(
                "Available modes: dominant_class, probabilities, depths_years_quantiles_textures, depths_years_quantiles, depths_years, depths_years_textures, years, static_depths_quantiles, static_quantiles, static"
            )
        self.n_layers = len(self._get_out_names("", ""))
        if (self.n_layers is not None) & (self.n_pixels is not None):
            self.array = sb_arr(self.n_layers, n_pixels)
        else:
            self.array = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.array = None

    def _get_out_names(
        self, prefix: str, suffix: str, time_frame: Optional[str] = None
    ):
        if self.save_hdf5:
            return [f"{prefix.format(CLASS='CLASS')}_{suffix}"]
        if self.mode == "depths_years_quantiles_textures":
            return self._get_out_names_depths_years_quantiles_textures(prefix, suffix)
        elif self.mode == "depths_years_quantiles":
            return self._get_out_names_depths_years_quantiles(prefix, suffix)
        elif self.mode == "depths_years":
            return self._get_out_names_depths_years(prefix, suffix)
        elif self.mode == "depths_years_textures":
            return self._get_out_names_depths_years_textures(prefix, suffix)
        elif self.mode == "static_depths_quantiles":
            return self._get_out_names_static_depths_quantiles(
                prefix, suffix, time_frame
            )
        elif self.mode == "static_quantiles":
            return self._get_out_names_static_quantiles(prefix, suffix, time_frame)
        elif self.mode == "static":
            return self._get_out_names_static(prefix, suffix, time_frame)
        elif self.mode == "probabilities":
            return self._get_out_names_probabilities(prefix, suffix, time_frame)
        elif self.mode == "dominant_class":
            return self._get_out_names_dominant_class(prefix, suffix, time_frame)
        elif self.mode == "years":
            return self._get_out_names_years(prefix, suffix)
        else:
            raise Exception(
                "Available modes: dominant_class, probabilities, depths_years_quantiles_textures, depths_years_quantiles, depths_years, depths_years_textures, years, static_depths_quantiles, static_quantiles, static"
            )

    def _get_out_names_years(self, prefix, suffix):
        out_files = []
        for y in self.years:
            out_files.append(
                f"{prefix}_m_{self.spatial_res}_s_{y}0101_{y}1231_{suffix}"
            )
        return out_files

    def _get_out_names_probabilities(self, prefix, suffix, time_frame):
        out_files = [
            f"{prefix}_p_{self.spatial_res}_s_{time_frame}_{suffix}".format(
                CLASS=self.legend[str(i)]
            )
            for i in range(len(self.legend))
        ]
        return out_files

    def _get_out_names_dominant_class(self, prefix, suffix, time_frame):
        out_files = [f"{prefix}_c_{self.spatial_res}_s_{time_frame}_{suffix}"]
        return out_files

    def _get_out_names_static(self, prefix, suffix, time_frame):
        out_files = [f"{prefix}_m_{self.spatial_res}_s_{time_frame}_{suffix}"]
        return out_files

    def _get_out_names_static_quantiles(self, prefix, suffix, time_frame):
        out_files = []
        out_files.append(f"{prefix}_m_{self.spatial_res}_s_{time_frame}_{suffix}")
        for q in self.quantiles:
            formatted_p = get_percentiele_string(q)
            out_files.append(
                f"{prefix}_{formatted_p}_{self.spatial_res}_s_{time_frame}_{suffix}"
            )
        return out_files

    def _get_out_names_depths_years(self, prefix, suffix):
        out_files = []
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                out_files.append(
                    f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                )
        return out_files

    def _get_out_names_static_depths_quantiles(self, prefix, suffix, time_frame):
        out_files = []
        for d in range(len(self.depths) - 1):
            out_files.append(
                f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{time_frame}_{suffix}"
            )
            for q in self.quantiles:
                formatted_p = get_percentiele_string(q)
                out_files.append(
                    f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{time_frame}_{suffix}"
                )
        return out_files

    def _get_out_names_depths_years_quantiles(self, prefix, suffix):
        out_files = []
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                out_files.append(
                    f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                )
                for q in self.quantiles:
                    formatted_p = get_percentiele_string(q)
                    out_files.append(
                        f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                    )
        return out_files

    def _get_out_names_depths_years_quantiles_textures(self, prefixes, suffix):
        if prefixes == "":
            prefixes = ["", "", ""]
        # order: prefixes = [prefix_clay, prefix_sand, prefix_silt]
        out_files = []
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                for prefix in prefixes:
                    out_files.append(
                        f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                    )
                    for q in self.quantiles:
                        formatted_p = get_percentiele_string(q)
                        out_files.append(
                            f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                        )
        return out_files

    def _get_out_names_depths_years_textures(self, prefixes, suffix):
        if prefixes == "":
            prefixes = ["", "", ""]
        # order: prefixes = [prefix_clay, prefix_sand, prefix_silt]
        out_files = []
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                for prefix in prefixes:
                    out_files.append(
                        f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
                    )
        return out_files

    def check_all_exported(self, prefix, suffix, timeframe=None):
        assert (self.s3_aliases is not None) & (self.s3_prefix is not None), (
            "The check requires that S3 is properly set"
        )
        out_files = self._get_out_names(prefix, suffix, timeframe)
        files_in_s3 = s3_list_files(self.s3_aliases, self.s3_prefix, self.tile_id)
        basename_files_in_s3 = [os.path.basename(f) for f in files_in_s3]
        flag = True
        for s in out_files:
            if self.save_hdf5:
                if f"{s}.h5" not in basename_files_in_s3:
                    flag = False
                    if self.verbose:
                        print(f"Missing file {s}")
            else:
                if f"{s}.tif" not in basename_files_in_s3:
                    flag = False
                    if self.verbose:
                        print(f"Missing file {s}")
        return flag

    def derive_block_quantiles_and_mean(self, depths_trees_pred, expm1) -> None:
        assert self.mode == "depths_years_quantiles", (
            "Mode must be 'depths_years_quantiles'"
        )
        self.n_pixels = int(depths_trees_pred[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees = depths_trees_pred[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                trees_avg = sb_arr(n_trees, self.n_pixels)
                sb.blocksAverage(
                    trees_avg,
                    self.n_threads,
                    depths_trees_pred[d].array,
                    depths_trees_pred[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_t = sb_arr(self.n_pixels, n_trees)
                prop_mean = sb_vec(self.n_pixels)
                sb.transposeArray(trees_avg, self.n_threads, trees_avg_t)
                sb.nanMean(trees_avg_t, self.n_threads, prop_mean)
                if expm1:
                    np.expm1(prop_mean, out=prop_mean)
                    np.expm1(trees_avg_t, out=trees_avg_t)
                percentiles = [q * 100.0 for q in self.quantiles]
                offset_prop = d * (
                    (len(self.years) - 1) * (len(self.quantiles) + 1)
                ) + y * (len(self.quantiles) + 1)
                sb.computePercentiles(
                    trees_avg_t,
                    self.n_threads,
                    range(trees_avg_t.shape[1]),
                    array_t,
                    range(offset_prop + 1, offset_prop + 1 + len(percentiles)),
                    percentiles,
                )
                array_t[:, offset_prop] = prop_mean
        sb.transposeArray(array_t, self.n_threads, self.array)

    def derive_static_depths_quantiles_and_mean(self, depths_trees_pred, expm1) -> None:
        assert self.mode == "static_depths_quantiles", (
            "Mode must be 'static_depths_quantiles'"
        )
        self.n_pixels = int(depths_trees_pred[0].array.shape[1])
        self.array = sb_arr(self.n_layers, self.n_pixels)
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees = depths_trees_pred[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            trees_avg = sb_arr(n_trees, self.n_pixels)
            sb.elementwiseAverage(
                trees_avg,
                self.n_threads,
                depths_trees_pred[d].array,
                depths_trees_pred[d + 1].array,
            )
            trees_avg_t = sb_arr(self.n_pixels, n_trees)
            prop_mean = sb_vec(self.n_pixels)
            sb.transposeArray(trees_avg, self.n_threads, trees_avg_t)
            sb.nanMean(trees_avg_t, self.n_threads, prop_mean)
            if expm1:
                np.expm1(prop_mean, out=prop_mean)
                np.expm1(trees_avg_t, out=trees_avg_t)
            percentiles = [q * 100.0 for q in self.quantiles]
            offset_prop = d * (len(self.quantiles) + 1)
            sb.computePercentiles(
                trees_avg_t,
                self.n_threads,
                range(trees_avg_t.shape[1]),
                array_t,
                range(offset_prop + 1, offset_prop + 1 + len(percentiles)),
                percentiles,
            )
            array_t[:, offset_prop] = prop_mean
        sb.transposeArray(array_t, self.n_threads, self.array)

    def derive_block_quantiles_and_mean_textures(
        self, pred_depths_texture1, pred_depths_texture2, k=1.0, a=100.0
    ) -> None:
        assert self.mode == "depths_years_quantiles_textures", (
            "Mode must be 'depths_years_quantiles_textures'"
        )
        self.n_pixels = int(pred_depths_texture1[0].array.shape[1] / len(self.years))
        n_quant = len(self.quantiles)
        self.array = sb_arr(self.n_layers, self.n_pixels)
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees1 = pred_depths_texture1[0].array.shape[0]
        n_trees2 = pred_depths_texture2[0].array.shape[0]
        n_trees = min(n_trees1, n_trees2)
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                offset_clay = d * (len(self.years) - 1) * 3 * (n_quant + 1) + y * 3 * (
                    n_quant + 1
                )
                offset_sand = (
                    d * (len(self.years) - 1) * 3 * (n_quant + 1)
                    + y * 3 * (n_quant + 1)
                    + (n_quant + 1)
                )
                offset_silt = (
                    d * (len(self.years) - 1) * 3 * (n_quant + 1)
                    + y * 3 * (n_quant + 1)
                    + 2 * (n_quant + 1)
                )
                trees_avg_texture1 = sb_arr(n_trees1, self.n_pixels)
                trees_avg_texture2 = sb_arr(n_trees2, self.n_pixels)
                sb.blocksAverage(
                    trees_avg_texture1,
                    self.n_threads,
                    pred_depths_texture1[d].array,
                    pred_depths_texture1[d + 1].array,
                    self.n_pixels,
                    y,
                )
                sb.blocksAverage(
                    trees_avg_texture2,
                    self.n_threads,
                    pred_depths_texture2[d].array,
                    pred_depths_texture2[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_texture1_t = sb_arr(self.n_pixels, n_trees1)
                trees_avg_texture2_t = sb_arr(self.n_pixels, n_trees2)
                sb.transposeArray(
                    trees_avg_texture1, self.n_threads, trees_avg_texture1_t
                )
                sb.transposeArray(
                    trees_avg_texture2, self.n_threads, trees_avg_texture2_t
                )
                mean_texture1 = sb_arr(self.n_pixels, 1)
                mean_texture2 = sb_arr(self.n_pixels, 1)
                sb.nanMean(trees_avg_texture1_t, self.n_threads, mean_texture1)
                sb.nanMean(trees_avg_texture2_t, self.n_threads, mean_texture2)
                clay_mean = sb_arr(self.n_pixels, 1)
                sand_mean = sb_arr(self.n_pixels, 1)
                silt_mean = sb_arr(self.n_pixels, 1)
                sb.texturesBwTransform(
                    mean_texture1,
                    self.n_threads,
                    mean_texture2,
                    k,
                    a,
                    sand_mean,
                    silt_mean,
                    clay_mean,
                )
                clay_trees = sb_arr(self.n_pixels, n_trees)
                sand_trees = sb_arr(self.n_pixels, n_trees)
                silt_trees = sb_arr(self.n_pixels, n_trees)
                trees_avg_texture1_t_sel = sb_arr(self.n_pixels, n_trees)
                trees_avg_texture2_t_sel = sb_arr(self.n_pixels, n_trees)
                # Keep only as many trees as the smaller ensemble (first n_trees columns)
                sb.extractArrayCols(
                    trees_avg_texture1_t,
                    self.n_threads,
                    trees_avg_texture1_t_sel,
                    range(0, n_trees),
                )
                sb.extractArrayCols(
                    trees_avg_texture2_t,
                    self.n_threads,
                    trees_avg_texture2_t_sel,
                    range(0, n_trees),
                )
                sb.texturesBwTransform(
                    trees_avg_texture1_t_sel,
                    self.n_threads,
                    trees_avg_texture2_t_sel,
                    k,
                    a,
                    sand_trees,
                    silt_trees,
                    clay_trees,
                )
                percentiles = [q * 100.0 for q in self.quantiles]
                sb.computePercentiles(
                    clay_trees,
                    self.n_threads,
                    range(clay_trees.shape[1]),
                    array_t,
                    range(offset_clay + 1, offset_clay + 1 + len(percentiles)),
                    percentiles,
                )
                sb.computePercentiles(
                    sand_trees,
                    self.n_threads,
                    range(sand_trees.shape[1]),
                    array_t,
                    range(offset_sand + 1, offset_sand + 1 + len(percentiles)),
                    percentiles,
                )
                sb.computePercentiles(
                    silt_trees,
                    self.n_threads,
                    range(silt_trees.shape[1]),
                    array_t,
                    range(offset_silt + 1, offset_silt + 1 + len(percentiles)),
                    percentiles,
                )
                array_t[:, offset_clay] = clay_mean[:, 0]
                array_t[:, offset_sand] = sand_mean[:, 0]
                array_t[:, offset_silt] = silt_mean[:, 0]
        sb.transposeArray(array_t, self.n_threads, self.array)

    def derive_block_mean_textures(
        self, pred_depths_texture1, pred_depths_texture2, k=1.0, a=100.0
    ) -> None:
        assert self.mode == "depths_years_textures", (
            "Mode must be 'depths_years_textures'"
        )
        self.n_pixels = int(pred_depths_texture1[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees1 = pred_depths_texture1[0].array.shape[0]
        n_trees2 = pred_depths_texture2[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                offset_clay = d * (len(self.years) - 1) * 3 + y * 3
                offset_sand = d * (len(self.years) - 1) * 3 + y * 3 + 1
                offset_silt = d * (len(self.years) - 1) * 3 + y * 3 + 2
                trees_avg_texture1 = sb_arr(n_trees1, self.n_pixels)
                trees_avg_texture2 = sb_arr(n_trees2, self.n_pixels)
                sb.blocksAverage(
                    trees_avg_texture1,
                    self.n_threads,
                    pred_depths_texture1[d].array,
                    pred_depths_texture1[d + 1].array,
                    self.n_pixels,
                    y,
                )
                sb.blocksAverage(
                    trees_avg_texture2,
                    self.n_threads,
                    pred_depths_texture2[d].array,
                    pred_depths_texture2[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_texture1_t = sb_arr(self.n_pixels, n_trees1)
                trees_avg_texture2_t = sb_arr(self.n_pixels, n_trees2)
                mean_texture1 = sb_arr(self.n_pixels, 1)
                mean_texture2 = sb_arr(self.n_pixels, 1)
                sb.transposeArray(
                    trees_avg_texture1, self.n_threads, trees_avg_texture1_t
                )
                sb.transposeArray(
                    trees_avg_texture2, self.n_threads, trees_avg_texture2_t
                )
                sb.nanMean(trees_avg_texture1_t, self.n_threads, mean_texture1)
                sb.nanMean(trees_avg_texture2_t, self.n_threads, mean_texture2)
                clay_mean = sb_arr(self.n_pixels, 1)
                sand_mean = sb_arr(self.n_pixels, 1)
                silt_mean = sb_arr(self.n_pixels, 1)
                sb.texturesBwTransform(
                    mean_texture1,
                    self.n_threads,
                    mean_texture2,
                    k,
                    a,
                    sand_mean,
                    silt_mean,
                    clay_mean,
                )
                array_t[:, offset_clay] = clay_mean[:, 0]
                array_t[:, offset_sand] = sand_mean[:, 0]
                array_t[:, offset_silt] = silt_mean[:, 0]
        sb.transposeArray(array_t, self.n_threads, self.array)

    def derive_block_mean(self, depths_pred, expm1) -> None:
        assert self.mode == "depths_years", "Mode must be 'depths_years'"
        self.n_pixels = int(depths_pred[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                offset_prop = d * (len(self.years) - 1) + y
                sb.blocksAverageVecs(
                    self.array,
                    self.n_threads,
                    depths_pred[d].array,
                    depths_pred[d + 1].array,
                    self.n_pixels,
                    y,
                    offset_prop,
                )
        if expm1:
            np.expm1(self.array, out=self.array)

    def derive_dominant_class(self, probs) -> None:
        assert self.mode == "dominant_class", "Mode must be 'dominant_class'"
        self.array = sb_arr(1, probs.shape[1])
        legend_map = np.array(list(self.legend.keys()), dtype=int)
        self.array[0, :] = legend_map[
            np.argmax(probs[:, :], axis=0).astype(int)
        ].astype(np.float32)

    def fit_probabilities(self, in_probs, input_scaling, target_scaling):
        assert self.mode == "probabilities", "Mode must be 'probabilities'"
        n_classes = in_probs.shape[0]
        n_pix = in_probs.shape[1]
        # @FIXME: for now the best-N-classes feature is not generalized
        n_best_classes = 1
        self.array = sb_arr(n_classes, n_pix)
        array_t = sb_arr(n_pix, n_classes)
        in_probs_t = sb_arr(n_pix, n_classes)
        best_classes_t = sb_arr(n_pix, n_best_classes)
        sb.transposeArray(in_probs, self.n_threads, in_probs_t)
        sb.fitProbabilities(
            in_probs_t,
            self.n_threads,
            array_t,
            input_scaling,
            target_scaling,
            best_classes_t,
            n_best_classes,
        )
        sb.transposeArray(array_t, self.n_threads, self.array)
        legend_map = np.array(list(self.legend.keys()), dtype=int)
        return legend_map[best_classes_t.T.astype(int)].astype(np.float32)

    def export_files(
        self,
        prefix,
        suffix,
        nodata,
        template_file,
        save_type,
        valid_idx,
        write_folder: str = ".",
        scaling: float = 1,
        scaling_metadata: Optional[float] = None,
        gdal_opts: Dict[str, str] = {
            "GDAL_HTTP_VERSION": "1.0",
            "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
        },
        timeframe: int | None = None,
        n_threads_write: int | None = None,
    ) -> None:
        with TimeTracker(f" Prepare data to export for {self.tile_id}", False):
            if scaling_metadata is None:
                scaling_metadata = 1.0 / scaling
            if n_threads_write is None:
                n_threads_write = self.n_threads
            if self.tile_id is None:
                raise ValueError("Argument 'tile_id' cannot be None")
            out_files = self._get_out_names(prefix, suffix, timeframe)
            n_files = len(out_files)
            gdal_img = gdal.Open(template_file)
            x_size = gdal_img.RasterXSize
            y_size = gdal_img.RasterYSize
            n_pixels = x_size * y_size
            write_data = sb_arr(self.array.shape[0], n_pixels)
            write_data_t = sb_arr(n_pixels, self.array.shape[0])
            out_data_t = sb_arr(self.array.shape[1], self.array.shape[0])
            sb.transposeArray(self.array, self.n_threads, out_data_t)
            offset = (
                0.5
                if save_type in {"byte", "uint16", "int16", "uint32", "int32"}
                else 0.0
            )
            if (offset != 0) or (scaling != 1.0):
                sb.scaleAndOffset(out_data_t, self.n_threads, offset, scaling)
            sb.fillArray(write_data_t, self.n_threads, nodata)
            sb.expandArrayRows(out_data_t, self.n_threads, write_data_t, valid_idx)
            sb.transposeArray(write_data_t, self.n_threads, write_data)
            tile_dir = write_folder + f"/{self.tile_id}"
            os.makedirs(write_folder, exist_ok=True)
            os.makedirs(tile_dir, exist_ok=True)
            compress_cmd = f"gdal_translate -a_nodata {nodata} -a_scale {scaling_metadata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
        with TimeTracker(f" Exporting data for {self.tile_id}", False):
            if self.save_hdf5:
                import h5py

                s3_out = (
                    f"{random.choice(self.s3_aliases)}/{self.s3_prefix}/{self.tile_id}"
                )
                with h5py.File(f"{tile_dir}/{out_files[0]}.h5", "w") as f:
                    if save_type == "byte":
                        save_type = "uint8"
                    f.create_dataset(
                        "data", data=write_data, dtype=save_type, compression="lzf"
                    )
                subprocess.run(
                    f"mc cp {tile_dir}/{out_files[0]}.h5 {s3_out}/{out_files[0]}.h5",
                    shell=True,
                    check=True,
                )
                subprocess.run(
                    f"rm {tile_dir}/{out_files[0]}.h5", shell=True, check=True
                )
                ttprint(f"Export complete, check mc ls {s3_out}/{out_files[0]}")
            else:
                if self.s3_prefix:
                    s3_out = [
                        f"{random.choice(self.s3_aliases)}/{self.s3_prefix}/{self.tile_id}"
                        for _ in range(len(out_files))
                    ]
                    sb.writeData(
                        write_data,
                        n_threads_write,
                        gdal_opts,
                        [template_file for _ in range(n_files)],
                        tile_dir,
                        out_files,
                        range(n_files),
                        0,
                        0,
                        x_size,
                        y_size,
                        nodata,
                        save_type,
                        compress_cmd,
                        s3_out,
                    )
                    ttprint(f"Export complete, check mc ls {s3_out[0]}/{out_files[0]}")
                else:
                    sb.writeData(
                        write_data,
                        n_threads_write,
                        gdal_opts,
                        [template_file for _ in range(n_files)],
                        tile_dir,
                        out_files,
                        range(n_files),
                        0,
                        0,
                        x_size,
                        y_size,
                        nodata,
                        save_type,
                        compress_cmd,
                    )
                    ttprint(f"Export complete, check {tile_dir}")
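# Usage sketch for the "years" mode, with hypothetical prefix/suffix names;
# `_get_out_names` determines n_layers, and `export_files` writes one GeoTIFF
# per output name:
#
#   exporter = TiledDataExporter(
#       n_pixels=loader.n_pixels_valid,
#       tile_id="029E_51N",
#       mode="years",
#       spatial_res="30m",
#       years=[2020, 2021],
#   )
#   exporter.array[:] = predictions           # shape: (n_layers, n_pixels)
#   exporter.export_files(
#       "ndvi_glad.landsat.ard2",             # hypothetical prefix
#       "go_epsg.4326_v1",                    # hypothetical suffix
#       nodata=255,
#       template_file=loader.mask_path,
#       save_type="byte",
#       valid_idx=loader.pixels_valid_idx,
#   )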