# ruff: noqa: ANN001
# ruff: noqa: ANN201
# ruff: noqa: ANN202
# ruff: noqa: ANN204
# ruff: noqa: E722
import os
import random
import re
import subprocess
import warnings
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, Optional
import numpy as np
from osgeo import gdal
import skmap.set_env # noqa: F401
import skmap_bindings as sb
from skmap.catalog import DataCatalog, run_whales
from skmap.misc import TimeTracker, sb_arr, sb_vec, ttprint
# Bash template that spatially aggregates a mask GeoTIFF with `gdalwarp -r max`.
# The literal `<AGG_FACTOR>` placeholder is substituted at runtime (see
# TiledDataLoader.load_tile_data) with the integer aggregation factor.
# The script trims a 2-pixel border from each edge before resampling, and
# writes the result next to the CWD as `comp_<basename>.tif`.
# NOTE(review): assumes `gdalinfo`/`gdalwarp` are on PATH — confirm deployment.
mask_aggregation_bash_script = """#!/bin/bash
if [ -z "$1" ]; then
echo "Usage: $0 <input_tif_file>"
exit 1
fi
tif_file="$1"
basename_tif=$(basename "$tif_file" .tif)
output_file="comp_${basename_tif}.tif"
if [ -f "$output_file" ]; then
rm "$output_file"
fi
xmin=$(gdalinfo $tif_file | grep "Upper Left" | awk -F'[(), ]+' '{print $3}')
ymax=$(gdalinfo $tif_file | grep "Upper Left" | awk -F'[(), ]+' '{print $4}')
xmax=$(gdalinfo $tif_file | grep "Lower Right" | awk -F'[(), ]+' '{print $3}')
ymin=$(gdalinfo $tif_file | grep "Lower Right" | awk -F'[(), ]+' '{print $4}')
tmp_out=$(gdalinfo "$tif_file" | grep "Pixel Size" | awk -F'[(),]' '{print $2, $3}')
read -r pixel_size_x pixel_size_y <<< "$tmp_out"
pixel_size_y=$(awk "BEGIN {print -1 * $pixel_size_y}")
pixel_size_x_new=$(awk "BEGIN {print $pixel_size_x * <AGG_FACTOR>}")
pixel_size_y_new=$(awk "BEGIN {print $pixel_size_y * <AGG_FACTOR>}")
xmin_new=$(awk "BEGIN {print $xmin + 2 * $pixel_size_x}")
ymin_new=$(awk "BEGIN {print $ymin + 2 * $pixel_size_y}")
xmax_new=$(awk "BEGIN {print $xmax - 2 * $pixel_size_x}")
ymax_new=$(awk "BEGIN {print $ymax - 2 * $pixel_size_y}")
gdalwarp -q -te "$xmin_new" "$ymin_new" "$xmax_new" "$ymax_new" -tr "$pixel_size_x_new" "$pixel_size_y_new" -r max "$tif_file" "$output_file"
"""
def warp_tile(tile_file, mosaic_paths, n_pix, resample):
    """
    Warp a mosaic onto the grid of ``tile_file`` and return the pixel vector.

    :param tile_file: path of the raster that defines the target tile grid
    :param mosaic_paths: mosaic path(s) forwarded to ``sb.warpTile``
    :param n_pix: number of pixels of the output vector
    :param resample: GDAL resampling strategy (e.g. ``GRA_NearestNeighbour``)
    :return: skmap vector with the warped data, zero-filled when warping fails

    .. deprecated:: 0.10.0
        Use GDAL vrt's
    """
    warp_data = sb_vec(n_pix)
    try:
        sb.warpTile(
            warp_data,
            1,
            {"GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif"},
            tile_file,
            mosaic_paths,
            resample,
        )
    except Exception:
        # Best-effort: a mosaic without coverage over this tile is not fatal.
        # Was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.
        sb.fillArray(warp_data, 1, 0.0)
        print(f"Mosaic {mosaic_paths} has no data in {tile_file}, filling with 0.0")
    return warp_data
def s3_list_files(s3_aliases, s3_prefix, tile_id, file_pattern=None):
    """
    List the files stored under ``<alias>/<s3_prefix>/<tile_id>`` via ``mc find``.

    :param s3_aliases: list of `mc` alias names; only the first one is queried,
        an empty list short-circuits to an empty result
    :param s3_prefix: bucket/prefix below the alias
    :param tile_id: tile sub-folder to list
    :param file_pattern: optional regex; when given, only matching lines are kept
    :return: list of matching paths (possibly empty)
    """
    if not s3_aliases:
        return []
    bash_cmd = f"mc find {s3_aliases[0]}/{s3_prefix}/{tile_id}"
    # stderr is captured but deliberately ignored: a missing prefix is treated
    # as "no files" rather than an error.
    result = subprocess.run(
        bash_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
    )
    lines = result.stdout.decode("utf-8").splitlines()
    if file_pattern is not None:
        pattern = re.compile(file_pattern)
        lines = [line for line in lines if pattern.search(line)]
    return lines
#
def s3_setup(access_key, secret_key, gaia_addrs):
    """
    Register one `mc` alias (g1, g2, ...) per S3 gateway address.

    :param access_key: S3 access key
    :param secret_key: S3 secret key
    :param gaia_addrs: list of S3 endpoint URLs
    :return: list of the alias names that were configured
    :raises subprocess.CalledProcessError: if any ``mc alias set`` call fails
    """
    # (A dead `s3_aliases = []` assignment was removed — it was immediately
    # overwritten by the comprehension below.)
    s3_aliases = [f"g{i + 1}" for i, _ in enumerate(gaia_addrs)]
    commands = [
        f"mc alias set g{i + 1} {addr} {access_key} {secret_key} --api S3v4 > /dev/null"
        for i, addr in enumerate(gaia_addrs)
    ]
    for cmd in commands:
        subprocess.run(cmd, shell=True, capture_output=False, text=True, check=True)
    return s3_aliases
#
class TiledData:
    """Base class for `TiledDataLoader` and `TiledDataExporter`

    Holds a 2-D (layers x pixels) skmap array plus shared tile metadata, and
    offers in-place NaN imputation helpers backed by the skmap bindings.
    """

    def __init__(
        self,
        n_layers: Optional[int] = None,
        n_pixels: Optional[int] = None,
        tile_id: Optional[str] = None,
        n_threads: int = os.cpu_count(),
    ) -> None:
        self.tile_id = tile_id
        self.n_threads = n_threads
        # Only allocate the buffer when both dimensions are known.
        if (n_layers is not None) and (n_pixels is not None):
            self.array = sb_arr(n_layers, n_pixels)
        else:
            self.array = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the reference so the (potentially large) buffer can be freed.
        self.array = None

    def convert_nan_to_value(self, value) -> None:
        """Replace every NaN in ``self.array`` with ``value`` (in place)."""
        sb.maskNan(self.array, self.n_threads, range(self.array.shape[0]), value)

    def convert_nan_to_median(self) -> None:
        """Replace NaNs in each row with the row median (in place).

        Rows that are entirely NaN are reported on stdout (scikit-map ERROR 101)
        before the bindings mask them.
        """
        medians = sb_arr(self.array.shape[0], 1)
        sb.computePercentiles(
            self.array, self.n_threads, range(self.array.shape[1]), medians, [0], [50.0]
        )
        nan_indices = np.argwhere(np.isnan(medians[:, 0])).flatten()
        if len(nan_indices) > 0:
            for nan_idx in nan_indices:
                print(
                    f"scikit-map ERROR 101: index {nan_idx} has all NaN for tile {self.tile_id}"
                )
            # raise Exception("scikit-map ERROR 101")
        sb.maskNanRows(self.array, self.n_threads, range(self.array.shape[0]), medians)
class TiledDataLoader(TiledData):
    """
    Loads tiled data in parallel based on a catalog file

    :param catalog: scikit-map `DataCatalog`
    :param mask_template_path: ``str`` with a ``{tile_id}`` placeholder
    :type mask_template_path: str
    :param spatial_aggregation: spatial aggregation factor; must be an integer
        dividend of 4000, or None to disable aggregation, defaults to None
    :type spatial_aggregation: int, optional
    :param resampling_strategy: GDAL resampling strategy used when warping
        mosaics onto the tile grid, defaults to "GRA_NearestNeighbour"
    :type resampling_strategy: str, optional
    :param gdal_opts: GDAL configuration options, defaults to
        { "GDAL_HTTP_VERSION": "1.0", "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif", }
    :type gdal_opts: Dict[str, str], optional
    :param n_threads: The number of threads to use for whales, defaults to os.cpu_count()
    :type n_threads: int, optional
    :param verbose: Whether to print output, defaults to True
    :type verbose: bool, optional
    :param n_threads_read: Number of threads for GDAL, defaults to n_threads
    :type n_threads_read: int, optional
    """

    def __init__(
        self,
        catalog: DataCatalog,
        mask_template_path: str,
        spatial_aggregation: Optional[int] = None,
        resampling_strategy: str = "GRA_NearestNeighbour",
        gdal_opts: Optional[Dict[str, str]] = None,
        n_threads: int = os.cpu_count(),
        verbose: bool = True,
        n_threads_read=None,
    ) -> None:
        self.catalog = catalog
        self.mask_template_path = mask_template_path
        self.spatial_aggregation = spatial_aggregation
        self.resampling_strategy = resampling_strategy
        # None-sentinel instead of a shared mutable default argument.
        if gdal_opts is None:
            gdal_opts = {
                "GDAL_HTTP_VERSION": "1.0",
                "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
            }
        self.gdal_opts = gdal_opts
        self.n_threads = n_threads
        if n_threads_read is None:
            self.n_threads_read = self.n_threads
        else:
            self.n_threads_read = n_threads_read
        # Process pool used to warp mosaic layers in parallel (load_tile_data).
        self.executor = ProcessPoolExecutor(max_workers=self.n_threads_read)
        self.mask_path = None
        self.array = None
        self.array_valid = None
        self.tile_id = None
        self.x_off = None
        self.y_off = None
        self.x_size = None
        self.y_size = None
        self.n_pixels = None
        self.mask = None
        self.verbose = verbose
        self.pixels_valid_idx = None
        self.n_pixels_valid = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Guard: __exit__ may run before load_tile_data() ever set a mask path;
        # os.path.exists(None) would raise TypeError.
        if self.mask_path is not None and os.path.exists(self.mask_path):
            os.remove(self.mask_path)
            ttprint(f"Temporary mask data {self.mask_path} has been deleted.")

    def load_tile_data(self, tile_id):
        """
        Download the tile mask, read every catalog raster for the tile and run
        the "whales" on-the-fly features.

        :param tile_id: tile identifier substituted into ``mask_template_path``
        :return: ``self`` (for chaining); returns early if the mask is empty
        """
        self.tile_id = tile_id
        tmp_mask_path = self.mask_template_path.format(tile_id=tile_id)
        self.mask_path = (
            f"/tmp/tmp_mask_{tmp_mask_path.split('/')[-1].split('.')[0]}.tif"
        )
        subprocess.run(
            f"wget {tmp_mask_path} -q -O {self.mask_path}", shell=True, check=True
        )
        # @FIXME: this only work with our setting of Landsat data
        if self.spatial_aggregation:
            if self.verbose:
                warnings.warn(
                    "Spatial aggregation only works with 4004x4004 mask tiles"
                )
            assert 4000 % self.spatial_aggregation == 0, (
                "Aggregation factor must be a integer dividend of 4000"
            )
            mask_aggregation_bash_script_parsed = mask_aggregation_bash_script.replace(
                "<AGG_FACTOR>", str(self.spatial_aggregation)
            )
            with open("mask_aggregation_bash_script.sh", "w") as file:
                file.write(mask_aggregation_bash_script_parsed)
            # check=True: fail loudly instead of silently continuing with a
            # missing aggregated mask.
            subprocess.run(
                ["chmod", "+x", "mask_aggregation_bash_script.sh"], check=True
            )
            input_file = self.mask_path
            subprocess.run(
                ["./mask_aggregation_bash_script.sh", input_file], check=True
            )
            self.mask_path = f"comp_{input_file.split('/')[-1].split('.')[0]}.tif"
        # check for valid pixels
        gdal_img = gdal.Open(self.mask_path)
        self.x_size = gdal_img.RasterXSize
        self.y_size = gdal_img.RasterYSize
        gdal_img = None
        self.x_off, self.y_off = (0, 0)
        self.n_pixels = self.x_size * self.y_size
        # prepare mask
        with TimeTracker(f" Prepare mask for tile {self.tile_id}", False):
            self.mask = np.zeros((1, self.n_pixels), dtype=np.float32)
            sb.readData(
                self.mask,
                1,
                [self.mask_path],
                [0],
                self.x_off,
                self.y_off,
                self.x_size,
                self.y_size,
                [1],
                self.gdal_opts,
            )
            # Indices of pixels flagged valid (non-zero) in the mask band.
            self.pixels_valid_idx = np.arange(0, self.n_pixels)[
                self.mask[0, :].astype(int).astype(bool)
            ]
            self.n_pixels_valid = int(np.sum(self.mask))
        if self.n_pixels_valid == 0:
            return self
        # read rasters
        self.array = sb_arr(self.catalog.data_size, self.n_pixels)
        with TimeTracker(
            f" Read rasters and compute whales for tile {self.tile_id}", False
        ):
            paths, paths_idx, _ = self.catalog.get_paths()
            # Split catalog paths into per-tile rasters vs. global mosaics.
            tile_paths, tile_idxs, mosaic_paths, mosaic_idxs = [], [], [], []
            for path, idx in zip(paths, paths_idx):
                if "{tile_id}" in path:
                    tile_paths += [path.format(tile_id=self.tile_id)]
                    tile_idxs += [idx]
                else:
                    mosaic_paths += [path]
                    mosaic_idxs += [idx]
            # Reading resampled data
            if mosaic_paths:
                # @FIXME we could copy locally the mask and then delete to reduce number of requests
                tile_template_paths = [self.mask_path] * len(mosaic_paths)
                futures = [
                    self.executor.submit(
                        warp_tile,
                        tile_template_paths[i],
                        mosaic_paths[i],
                        self.n_pixels,
                        self.resampling_strategy,
                    )
                    for i in range(len(mosaic_paths))
                ]
                for i, future in enumerate(futures):
                    self.array[mosaic_idxs[i], :] = future.result()
            # Reading tiled data
            if tile_paths:
                if self.spatial_aggregation:
                    # Read the full-resolution 4000x4000 window (skipping the
                    # 2-pixel border) and block-average down to the mask grid.
                    tmp_data = sb_arr(len(tile_paths), 4000 * 4000)
                    sb.readData(
                        tmp_data,
                        self.n_threads_read,
                        tile_paths,
                        range(len(tile_paths)),
                        2,
                        2,
                        4000,
                        4000,
                        [1],
                        self.gdal_opts,
                        None,
                        np.nan,
                    )
                    arr_reshaped = tmp_data.reshape(len(tile_paths), 4000, 4000)
                    arr_aggregated = arr_reshaped.reshape(
                        len(tile_paths),
                        self.y_size,
                        self.spatial_aggregation,
                        self.x_size,
                        self.spatial_aggregation,
                    ).mean(axis=(2, 4))
                    arr_final = arr_aggregated.reshape(
                        len(tile_paths), self.x_size * self.y_size
                    )
                    self.array[tile_idxs, :] = arr_final[:, :]
                else:
                    sb.readData(
                        self.array,
                        self.n_threads_read,
                        tile_paths,
                        tile_idxs,
                        self.x_off,
                        self.y_off,
                        self.x_size,
                        self.y_size,
                        [1],
                        self.gdal_opts,
                        None,
                        np.nan,
                    )
            # Go whales, go!!
            lon_lat = sb_arr(2, self.n_pixels)
            sb.getLatLonArray(
                lon_lat,
                self.n_threads_read,
                self.gdal_opts,
                self.mask_path,
                self.x_off,
                self.y_off,
                self.x_size,
                self.y_size,
            )
            run_whales(self.catalog, self.array, self.n_threads, lon_lat[1, :])
        return self

    def convert_nan_to_median(self) -> None:
        """Replace NaNs per row with the row median, naming all-NaN features."""
        medians = sb_arr(self.array.shape[0], 1)
        sb.computePercentiles(
            self.array, self.n_threads, range(self.array.shape[1]), medians, [0], [50.0]
        )
        nan_indices = np.argwhere(np.isnan(medians[:, 0])).flatten()
        if len(nan_indices) > 0:
            for nan_idx in nan_indices:
                group_name, feature_name = self.catalog.find_group_and_feature_by_index(
                    nan_idx
                )
                print(
                    f"scikit-map ERROR 101: index {nan_idx} corresponding to group {group_name} and feature {feature_name} has all NaN for tile {self.tile_id}"
                )
            # raise Exception("scikit-map ERROR 101")
        sb.maskNanRows(self.array, self.n_threads, range(self.array.shape[0]), medians)

    def filter_valid_pixels(self) -> None:
        """Compact ``self.array`` into ``self.array_valid`` keeping only the
        mask-valid pixel columns."""
        self.array_valid = sb_arr(self.catalog.data_size, self.n_pixels_valid)
        sb.selArrayCols(
            self.array, self.n_threads, self.array_valid, self.pixels_valid_idx
        )

    def expand_valid_pixels(self, array_valid, array_expanded) -> None:
        """Scatter valid-pixel columns back into a full-tile array (in place)."""
        sb.expandArrayCols(
            array_valid, self.n_threads, array_expanded, self.pixels_valid_idx
        )

    def get_pixels_valid_idx(self, n_groups):
        """Return the valid-pixel indices replicated for ``n_groups`` stacked
        tiles, each group shifted by ``n_pixels``."""
        return np.concatenate(
            [self.pixels_valid_idx + self.n_pixels * i for i in range(n_groups)]
        ).tolist()

    def fill_otf_constant(self, otf_name, otf_const) -> None:
        """Fill the rows of the on-the-fly feature ``otf_name`` with a constant."""
        otf_idx = self.catalog.get_otf_idx()
        assert otf_name in otf_idx
        otf_name_idx = otf_idx[otf_name]
        self.array[otf_name_idx] = otf_const
def get_percentiele_string(q):
    """
    Format a quantile ``q`` in [0, 1] as a percentile tag (``p50``, ``p975``, ...).

    Exact tenths render as two digits (0.5 -> ``p50``); any other quantile keeps
    its decimal digits (0.25 -> ``p25``, 0.05 -> ``p05``). Rounding is applied
    before the integer check because binary floating point makes e.g.
    ``0.3 / 0.1 == 2.9999...``, which previously misclassified exact tenths
    (0.3 produced ``p3`` instead of ``p30``). The function name's typo is kept
    for API compatibility.
    """
    if q == 0:
        return "p0"
    if q == 1:
        return "p100"
    # Detect exact tenths robustly against floating-point noise.
    tenths = round(q * 10, 9)
    if tenths == int(tenths):
        return f"p{int(tenths)}0"
    return str(q).replace("0.", "p")
class TiledDataExporter(TiledData):
def __init__(
self,
n_pixels: int = None,
tile_id: str = None,
mode: str = None,
spatial_res: str = None,
s3_params=None,
verbose=False,
years=None,
depths=None,
legend=None,
quantiles=None,
save_hdf5=None,
n_threads: int = os.cpu_count(),
) -> None:
if s3_params:
self.s3_aliases = s3_setup(
s3_params["s3_access_key"],
s3_params["s3_secret_key"],
s3_params["s3_addresses"],
)
self.s3_prefix = s3_params["s3_prefix"]
else:
self.s3_aliases = None
self.s3_prefix = None
self.spatial_res = spatial_res
self.n_pixels = n_pixels
self.mode = mode
self.verbose = verbose
self.tile_id = tile_id
self.n_threads = n_threads
self.years = years
self.depths = depths
self.legend = legend
self.quantiles = quantiles
self.save_hdf5 = save_hdf5
if self.mode == "depths_years_quantiles_textures":
assert (
(years is not None) & (depths is not None) & (quantiles is not None)
), "Need to provide years, depths, quantiles"
elif self.mode == "depths_years_quantiles":
assert (
(years is not None) & (depths is not None) & (quantiles is not None)
), "Need to provide years, depths, quantiles"
elif self.mode == "static_depths_quantiles":
assert (depths is not None) & (quantiles is not None), (
"Need to provide depths, quantiles"
)
elif self.mode == "static_quantiles":
assert quantiles is not None, "Need to provide quantiles"
elif self.mode == "depths_years":
assert (years is not None) & (depths is not None), (
"Need to provide years, depths"
)
elif self.mode == "depths_years_textures":
assert (years is not None) & (depths is not None), (
"Need to provide years, depths"
)
elif self.mode == "years":
assert years is not None, "Need to provide years"
elif self.mode == "probabilities":
assert legend is not None, "Need to provide legend"
elif (self.mode == "static") | (self.mode == "dominant_class"):
pass
else:
raise Exception(
"Available modes: dominant_class, probabilities, depths_years_quantiles_textures, depths_years_quantiles, depths_years, depths_years_textures, years, static_depths_quantiles, static_quantiles, static"
)
self.n_layers = len(self._get_out_names("", ""))
if (self.n_layers is not None) & (self.n_pixels is not None):
self.array = sb_arr(self.n_layers, n_pixels)
else:
self.array = None
    def __enter__(self):
        """Enter the context manager, returning the exporter itself."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release the export buffer so its memory can be reclaimed.
        self.array = None
def _get_out_names(
self, prefix: str, suffix: str, time_frame: Optional[str] = None
):
if self.save_hdf5:
return [f"{prefix.format(CLASS='CLASS')}_{suffix}"]
if self.mode == "depths_years_quantiles_textures":
return self._get_out_names_depths_years_quantiles_textures(prefix, suffix)
if self.mode == "depths_years_quantiles":
return self._get_out_names_depths_years_quantiles(prefix, suffix)
elif self.mode == "depths_years":
return self._get_out_names_depths_years(prefix, suffix)
elif self.mode == "depths_years_textures":
return self._get_out_names_depths_years_textures(prefix, suffix)
elif self.mode == "static_depths_quantiles":
return self._get_out_names_static_depths_quantiles(
prefix, suffix, time_frame
)
elif self.mode == "static_quantiles":
return self._get_out_names_static_quantiles(prefix, suffix, time_frame)
elif self.mode == "static":
return self._get_out_names_static(prefix, suffix, time_frame)
elif self.mode == "probabilities":
return self._get_out_names_probabilities(prefix, suffix, time_frame)
elif self.mode == "dominant_class":
return self._get_out_names_dominant_class(prefix, suffix, time_frame)
elif self.mode == "years":
return self._get_out_names_years(prefix, suffix)
else:
raise Exception(
"Available modes: dominant_class, probabilities, depths_years_quantiles_textures, depths_years_quantiles, depths_years, depths_years_textures, years, static_depths_quantiles, static_quantiles, static"
)
def _get_out_names_years(self, prefix, suffix):
out_files = []
for y in self.years:
out_files.append(
f"{prefix}_m_{self.spatial_res}_s_{y}0101_{y}1231_{suffix}"
)
return out_files
def _get_out_names_probabilities(self, prefix, suffix, time_frame):
out_files = [
f"{prefix}_p_{self.spatial_res}_s_{time_frame}_{suffix}".format(
CLASS=self.legend[str(i)]
)
for i in range(len(self.legend))
]
return out_files
def _get_out_names_dominant_class(self, prefix, suffix, time_frame):
out_files = [f"{prefix}_c_{self.spatial_res}_s_{time_frame}_{suffix}"]
return out_files
def _get_out_names_static(self, prefix, suffix, time_frame):
out_files = [f"{prefix}_m_{self.spatial_res}_s_{time_frame}_{suffix}"]
return out_files
def _get_out_names_static_quantiles(self, prefix, suffix, time_frame):
out_files = []
out_files.append(f"{prefix}_m_{self.spatial_res}_s_{time_frame}_{suffix}")
for q in self.quantiles:
formatted_p = get_percentiele_string(q)
out_files.append(
f"{prefix}_{formatted_p}_{self.spatial_res}_s_{time_frame}_{suffix}"
)
return out_files
def _get_out_names_depths_years(self, prefix, suffix):
out_files = []
for d in range(len(self.depths) - 1):
for y in range(len(self.years) - 1):
out_files.append(
f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
return out_files
def _get_out_names_static_depths_quantiles(self, prefix, suffix, time_frame):
out_files = []
for d in range(len(self.depths) - 1):
out_files.append(
f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{time_frame}_{suffix}"
)
for q in self.quantiles:
formatted_p = get_percentiele_string(q)
out_files.append(
f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{time_frame}_{suffix}"
)
return out_files
def _get_out_names_depths_years_quantiles(self, prefix, suffix):
out_files = []
for d in range(len(self.depths) - 1):
for y in range(len(self.years) - 1):
out_files.append(
f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
for q in self.quantiles:
formatted_p = get_percentiele_string(q)
out_files.append(
f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
return out_files
def _get_out_names_depths_years_quantiles_textures(self, prefixes, suffix):
if prefixes == "":
prefixes = ["", "", ""]
# order: prefixes = [prefix_clay, prefix_sand, prefix_silt]
out_files = []
for d in range(len(self.depths) - 1):
for y in range(len(self.years) - 1):
for prefix in prefixes:
out_files.append(
f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
for q in self.quantiles:
formatted_p = get_percentiele_string(q)
out_files.append(
f"{prefix}_{formatted_p}_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
return out_files
def _get_out_names_depths_years_textures(self, prefixes, suffix):
if prefixes == "":
prefixes = ["", "", ""]
# order: prefixes = [prefix_clay, prefix_sand, prefix_silt]
out_files = []
for d in range(len(self.depths) - 1):
for y in range(len(self.years) - 1):
for prefix in prefixes:
out_files.append(
f"{prefix}_m_{self.spatial_res}_b{self.depths[d]}cm..{self.depths[d + 1]}cm_{self.years[y]}0101_{self.years[y + 1]}1231_{suffix}"
)
return out_files
def check_all_exported(self, prefix, suffix, timeframe=None):
assert (self.s3_aliases is not None) & (self.s3_prefix is not None), (
"The check requires that S3 is properly set"
)
out_files = self._get_out_names(prefix, suffix, timeframe)
files_in_s3 = s3_list_files(self.s3_aliases, self.s3_prefix, self.tile_id)
basename_files_in_s3 = [os.path.basename(f) for f in files_in_s3]
flag = True
for s in out_files:
if self.save_hdf5:
if f"{s}.h5" not in basename_files_in_s3:
flag = False
if self.verbose:
print(f"Missing file {s}")
else:
if f"{s}.tif" not in basename_files_in_s3:
flag = False
if self.verbose:
print(f"Missing file {s}")
return flag
    def derive_block_quantiles_and_mean(self, depths_trees_pred, expm1) -> None:
        """
        Derive per-pixel ensemble mean and quantiles for every depth interval
        and year block, writing the result into ``self.array``.

        :param depths_trees_pred: per-depth predictions; each item exposes an
            ``array`` of shape (n_trees, n_pixels * n_years) — assumed, confirm
            against the prediction pipeline. Consecutive depths are averaged
            pairwise into depth intervals.
        :param expm1: when True, back-transform predictions with ``expm1``
            before computing statistics (log1p-scaled targets).
        """
        assert self.mode == "depths_years_quantiles", (
            "Mode must be 'depths_years_quantiles'"
        )
        self.n_pixels = int(depths_trees_pred[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        # Work transposed (pixels x layers) because computePercentiles fills columns.
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees = depths_trees_pred[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                # Average the two bounding depths for year block `y`.
                trees_avg = sb_arr(n_trees, self.n_pixels)
                sb.blocksAverage(
                    trees_avg,
                    self.n_threads,
                    depths_trees_pred[d].array,
                    depths_trees_pred[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_t = sb_arr(self.n_pixels, n_trees)
                prop_mean = sb_vec(self.n_pixels)
                sb.transposeArray(trees_avg, self.n_threads, trees_avg_t)
                sb.nanMean(trees_avg_t, self.n_threads, prop_mean)
                if expm1:
                    np.expm1(prop_mean, out=prop_mean)
                    np.expm1(trees_avg_t, out=trees_avg_t)
                percentiles = [q * 100.0 for q in self.quantiles]
                # Column offset of this (depth, year) block: mean column first,
                # followed by one column per quantile.
                offset_prop = d * (
                    (len(self.years) - 1) * (len(self.quantiles) + 1)
                ) + y * (len(self.quantiles) + 1)
                sb.computePercentiles(
                    trees_avg_t,
                    self.n_threads,
                    range(trees_avg_t.shape[1]),
                    array_t,
                    range(offset_prop + 1, offset_prop + 1 + len(percentiles)),
                    percentiles,
                )
                array_t[:, offset_prop] = prop_mean
        sb.transposeArray(array_t, self.n_threads, self.array)
    def derive_static_depths_quantiles_and_mean(self, depths_trees_pred, expm1) -> None:
        """
        Derive per-pixel ensemble mean and quantiles for every depth interval
        of a static (no time dimension) prediction.

        :param depths_trees_pred: per-depth predictions; each item exposes an
            ``array`` of shape (n_trees, n_pixels) — assumed, confirm against
            the prediction pipeline. Consecutive depths are averaged pairwise.
        :param expm1: when True, back-transform predictions with ``expm1``
            before computing statistics (log1p-scaled targets).
        """
        assert self.mode == "static_depths_quantiles", (
            "Mode must be static_depths_quantiles"
        )
        self.n_pixels = int(depths_trees_pred[0].array.shape[1])
        self.array = sb_arr(self.n_layers, self.n_pixels)
        # Work transposed (pixels x layers) because computePercentiles fills columns.
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees = depths_trees_pred[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            # Average the two bounding depths of interval `d`.
            trees_avg = sb_arr(n_trees, self.n_pixels)
            sb.elementwiseAverage(
                trees_avg,
                self.n_threads,
                depths_trees_pred[d].array,
                depths_trees_pred[d + 1].array,
            )
            trees_avg_t = sb_arr(self.n_pixels, n_trees)
            prop_mean = sb_vec(self.n_pixels)
            sb.transposeArray(trees_avg, self.n_threads, trees_avg_t)
            sb.nanMean(trees_avg_t, self.n_threads, prop_mean)
            if expm1:
                np.expm1(prop_mean, out=prop_mean)
                np.expm1(trees_avg_t, out=trees_avg_t)
            percentiles = [q * 100.0 for q in self.quantiles]
            # Mean column first, then one column per quantile.
            offset_prop = d * (len(self.quantiles) + 1)
            sb.computePercentiles(
                trees_avg_t,
                self.n_threads,
                range(trees_avg_t.shape[1]),
                array_t,
                range(offset_prop + 1, offset_prop + 1 + len(percentiles)),
                percentiles,
            )
            array_t[:, offset_prop] = prop_mean
        sb.transposeArray(array_t, self.n_threads, self.array)
    def derive_block_quantiles_and_mean_textures(
        self, pred_depths_texture1, pred_depths_texture2, k=1.0, a=100.0
    ) -> None:
        """
        Derive mean and quantiles of clay/sand/silt fractions for every depth
        interval and year block, writing the result into ``self.array``.

        The two transformed texture predictions are converted back to
        sand/silt/clay fractions with the bindings' ``texturesBwTransform``.

        :param pred_depths_texture1: per-depth predictions of the first
            transformed texture variable; each item exposes an ``array`` of
            shape (n_trees, n_pixels * n_years) — assumed, confirm upstream.
        :param pred_depths_texture2: per-depth predictions of the second
            transformed texture variable (same layout).
        :param k: back-transform parameter passed to ``texturesBwTransform``.
        :param a: back-transform parameter passed to ``texturesBwTransform``.
        """
        assert self.mode == "depths_years_quantiles_textures", (
            "Mode must be 'depths_years_quantiles_textures'"
        )
        self.n_pixels = int(pred_depths_texture1[0].array.shape[1] / len(self.years))
        n_quant = len(self.quantiles)
        self.array = sb_arr(self.n_layers, self.n_pixels)
        # Work transposed (pixels x layers) because computePercentiles fills columns.
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees1 = pred_depths_texture1[0].array.shape[0]
        n_trees2 = pred_depths_texture2[0].array.shape[0]
        # The two ensembles may have different sizes; use the common subset.
        n_trees = min(n_trees1, n_trees2)
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                # Per (depth, year) block: 3 textures, each mean + n_quant quantiles.
                offset_clay = d * (len(self.years) - 1) * 3 * (n_quant + 1) + y * 3 * (
                    n_quant + 1
                )
                offset_sand = (
                    d * (len(self.years) - 1) * 3 * (n_quant + 1)
                    + y * 3 * (n_quant + 1)
                    + (n_quant + 1)
                )
                offset_silt = (
                    d * (len(self.years) - 1) * 3 * (n_quant + 1)
                    + y * 3 * (n_quant + 1)
                    + 2 * (n_quant + 1)
                )
                # Average the two bounding depths for year block `y`.
                trees_avg_texture1 = sb_arr(n_trees1, self.n_pixels)
                trees_avg_texture2 = sb_arr(n_trees2, self.n_pixels)
                sb.blocksAverage(
                    trees_avg_texture1,
                    self.n_threads,
                    pred_depths_texture1[d].array,
                    pred_depths_texture1[d + 1].array,
                    self.n_pixels,
                    y,
                )
                sb.blocksAverage(
                    trees_avg_texture2,
                    self.n_threads,
                    pred_depths_texture2[d].array,
                    pred_depths_texture2[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_texture1_t = sb_arr(self.n_pixels, n_trees1)
                trees_avg_texture2_t = sb_arr(self.n_pixels, n_trees2)
                sb.transposeArray(
                    trees_avg_texture1, self.n_threads, trees_avg_texture1_t
                )
                sb.transposeArray(
                    trees_avg_texture2, self.n_threads, trees_avg_texture2_t
                )
                mean_texture1 = sb_arr(self.n_pixels, 1)
                mean_texture2 = sb_arr(self.n_pixels, 1)
                sb.nanMean(trees_avg_texture1_t, self.n_threads, mean_texture1)
                sb.nanMean(trees_avg_texture2_t, self.n_threads, mean_texture2)
                # Back-transform the ensemble means into texture fractions.
                clay_mean = sb_arr(self.n_pixels, 1)
                sand_mean = sb_arr(self.n_pixels, 1)
                silt_mean = sb_arr(self.n_pixels, 1)
                sb.texturesBwTransform(
                    mean_texture1,
                    self.n_threads,
                    mean_texture2,
                    k,
                    a,
                    sand_mean,
                    silt_mean,
                    clay_mean,
                )
                clay_trees = sb_arr(self.n_pixels, n_trees)
                sand_trees = sb_arr(self.n_pixels, n_trees)
                silt_trees = sb_arr(self.n_pixels, n_trees)
                trees_avg_texture1_t_sel = sb_arr(self.n_pixels, n_trees)
                trees_avg_texture2_t_sel = sb_arr(self.n_pixels, n_trees)
                # Randomly as many trees as the minimum number of trees
                # NOTE(review): despite the comment above, the FIRST n_trees
                # columns are taken deterministically (range(0, n_trees)) —
                # confirm whether random sampling was intended.
                sb.extractArrayCols(
                    trees_avg_texture1_t,
                    self.n_threads,
                    trees_avg_texture1_t_sel,
                    range(0, n_trees),
                )
                sb.extractArrayCols(
                    trees_avg_texture2_t,
                    self.n_threads,
                    trees_avg_texture2_t_sel,
                    range(0, n_trees),
                )
                # Back-transform each tree's prediction to get per-tree fractions.
                sb.texturesBwTransform(
                    trees_avg_texture1_t_sel,
                    self.n_threads,
                    trees_avg_texture2_t_sel,
                    k,
                    a,
                    sand_trees,
                    silt_trees,
                    clay_trees,
                )
                percentiles = [q * 100.0 for q in self.quantiles]
                sb.computePercentiles(
                    clay_trees,
                    self.n_threads,
                    range(clay_trees.shape[1]),
                    array_t,
                    range(offset_clay + 1, offset_clay + 1 + len(percentiles)),
                    percentiles,
                )
                sb.computePercentiles(
                    sand_trees,
                    self.n_threads,
                    range(sand_trees.shape[1]),
                    array_t,
                    range(offset_sand + 1, offset_sand + 1 + len(percentiles)),
                    percentiles,
                )
                sb.computePercentiles(
                    silt_trees,
                    self.n_threads,
                    range(silt_trees.shape[1]),
                    array_t,
                    range(offset_silt + 1, offset_silt + 1 + len(percentiles)),
                    percentiles,
                )
                array_t[:, offset_clay] = clay_mean[:, 0]
                array_t[:, offset_sand] = sand_mean[:, 0]
                array_t[:, offset_silt] = silt_mean[:, 0]
        sb.transposeArray(array_t, self.n_threads, self.array)
    def derive_block_mean_textures(
        self, pred_depths_texture1, pred_depths_texture2, k=1.0, a=100.0
    ) -> None:
        """
        Derive the ensemble-mean clay/sand/silt fractions for every depth
        interval and year block, writing the result into ``self.array``.

        :param pred_depths_texture1: per-depth predictions of the first
            transformed texture variable; each item exposes an ``array`` of
            shape (n_trees, n_pixels * n_years) — assumed, confirm upstream.
        :param pred_depths_texture2: per-depth predictions of the second
            transformed texture variable (same layout).
        :param k: back-transform parameter passed to ``texturesBwTransform``.
        :param a: back-transform parameter passed to ``texturesBwTransform``.
        """
        assert self.mode == "depths_years_textures", (
            "Mode must be 'depths_years_textures'"
        )
        self.n_pixels = int(pred_depths_texture1[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        array_t = sb_arr(self.n_pixels, self.n_layers)
        n_trees1 = pred_depths_texture1[0].array.shape[0]
        n_trees2 = pred_depths_texture2[0].array.shape[0]
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                # Three consecutive output columns per block: clay, sand, silt.
                offset_clay = d * (len(self.years) - 1) * 3 + y * 3
                offset_sand = d * (len(self.years) - 1) * 3 + y * 3 + 1
                offset_silt = d * (len(self.years) - 1) * 3 + y * 3 + 2
                # Average the two bounding depths for year block `y`.
                trees_avg_texture1 = sb_arr(n_trees1, self.n_pixels)
                trees_avg_texture2 = sb_arr(n_trees2, self.n_pixels)
                sb.blocksAverage(
                    trees_avg_texture1,
                    self.n_threads,
                    pred_depths_texture1[d].array,
                    pred_depths_texture1[d + 1].array,
                    self.n_pixels,
                    y,
                )
                sb.blocksAverage(
                    trees_avg_texture2,
                    self.n_threads,
                    pred_depths_texture2[d].array,
                    pred_depths_texture2[d + 1].array,
                    self.n_pixels,
                    y,
                )
                trees_avg_texture1_t = sb_arr(self.n_pixels, n_trees1)
                trees_avg_texture2_t = sb_arr(self.n_pixels, n_trees2)
                mean_texture1 = sb_arr(self.n_pixels, 1)
                mean_texture2 = sb_arr(self.n_pixels, 1)
                sb.transposeArray(
                    trees_avg_texture1, self.n_threads, trees_avg_texture1_t
                )
                sb.transposeArray(
                    trees_avg_texture2, self.n_threads, trees_avg_texture2_t
                )
                sb.nanMean(trees_avg_texture1_t, self.n_threads, mean_texture1)
                sb.nanMean(trees_avg_texture2_t, self.n_threads, mean_texture2)
                # Back-transform the ensemble means into texture fractions.
                clay_mean = sb_arr(self.n_pixels, 1)
                sand_mean = sb_arr(self.n_pixels, 1)
                silt_mean = sb_arr(self.n_pixels, 1)
                sb.texturesBwTransform(
                    mean_texture1,
                    self.n_threads,
                    mean_texture2,
                    k,
                    a,
                    sand_mean,
                    silt_mean,
                    clay_mean,
                )
                array_t[:, offset_clay] = clay_mean[:, 0]
                array_t[:, offset_sand] = sand_mean[:, 0]
                array_t[:, offset_silt] = silt_mean[:, 0]
        sb.transposeArray(array_t, self.n_threads, self.array)
    def derive_block_mean(self, depths_pred, expm1) -> None:
        """
        Derive the per-pixel ensemble mean for every depth interval and year
        block, writing the result directly into ``self.array``.

        :param depths_pred: per-depth predictions; each item exposes an
            ``array`` of shape (n_trees, n_pixels * n_years) — assumed, confirm
            upstream. Consecutive depths are averaged pairwise.
        :param expm1: when True, back-transform the result with ``expm1``
            (log1p-scaled targets).
        """
        assert self.mode == "depths_years", "Mode must be 'depths_years'"
        self.n_pixels = int(depths_pred[0].array.shape[1] / len(self.years))
        self.array = sb_arr(self.n_layers, self.n_pixels)
        for d in range(len(self.depths) - 1):
            for y in range(len(self.years) - 1):
                # Row offset of this (depth interval, year block) output layer.
                offset_prop = d * (len(self.years) - 1) + y
                sb.blocksAverageVecs(
                    self.array,
                    self.n_threads,
                    depths_pred[d].array,
                    depths_pred[d + 1].array,
                    self.n_pixels,
                    y,
                    offset_prop,
                )
        if expm1:
            np.expm1(self.array, out=self.array)
    def derive_dominant_class(self, probs) -> None:
        """
        Select the per-pixel dominant class from a probability stack.

        :param probs: (n_classes, n_pixels) array; row order is assumed to match
            the insertion order of ``self.legend`` — confirm against the caller.
        """
        assert self.mode == "dominant_class", "Mode must be 'dominant_class'"
        self.array = sb_arr(1, probs.shape[1])
        # Map argmax positions back to the integer legend keys.
        legend_map = np.array(list(self.legend.keys()), dtype=int)
        self.array[0, :] = legend_map[
            np.argmax(probs[:, :], axis=0).astype(int)
        ].astype(np.float32)
    def fit_probabilities(self, in_probs, input_scaling, target_scaling):
        """
        Rescale class probabilities into ``self.array`` and return the best
        class per pixel.

        :param in_probs: (n_classes, n_pixels) probability array
        :param input_scaling: scaling of the incoming probabilities (forwarded
            to ``sb.fitProbabilities``)
        :param target_scaling: scaling applied to the fitted output
        :return: (1, n_pixels) float32 array with the legend key of the best
            class for each pixel
        """
        assert self.mode == "probabilities", "Mode must be 'probabilities'"
        n_classes = in_probs.shape[0]
        n_pix = in_probs.shape[1]
        # @FIXME: for now the best N classes feature is not generalized
        n_best_classes = 1
        self.array = sb_arr(n_classes, n_pix)
        # Work transposed (pixels x classes) for the bindings, then transpose back.
        array_t = sb_arr(n_pix, n_classes)
        in_probs_t = sb_arr(n_pix, n_classes)
        best_classes_t = sb_arr(n_pix, n_best_classes)
        sb.transposeArray(in_probs, self.n_threads, in_probs_t)
        sb.fitProbabilities(
            in_probs_t,
            self.n_threads,
            array_t,
            input_scaling,
            target_scaling,
            best_classes_t,
            n_best_classes,
        )
        sb.transposeArray(array_t, self.n_threads, self.array)
        # Map class positions back to the integer legend keys.
        legend_map = np.array(list(self.legend.keys()), dtype=int)
        return legend_map[best_classes_t.T.astype(int)].astype(np.float32)
    def export_files(
        self,
        prefix,
        suffix,
        nodata,
        template_file,
        save_type,
        valid_idx,
        write_folder: str = ".",
        scaling: float = 1,
        scaling_metadata: Optional[float] = None,
        gdal_opts: Dict[str, str] = {
            "GDAL_HTTP_VERSION": "1.0",
            "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
        },
        timeframe: int | None = None,
        n_threads_write: int | None = None,
    ) -> None:
        """
        Scale ``self.array``, expand it back onto the full tile grid and export
        it as GeoTIFFs (or one HDF5 dataset), optionally uploading to S3.

        :param prefix: layer-name prefix (see :meth:`_get_out_names`)
        :param suffix: layer-name suffix
        :param nodata: nodata value written to empty pixels and file metadata
        :param template_file: raster providing the grid/geotransform for output
        :param save_type: output data type tag (e.g. "byte", "int16", ...)
        :param valid_idx: row indices used to scatter valid pixels back into
            the full grid (see ``sb.expandArrayRows``)
        :param write_folder: local root folder; a ``<tile_id>`` subfolder is made
        :param scaling: multiplicative factor applied before the integer cast
        :param scaling_metadata: ``-a_scale`` metadata value; defaults to
            ``1 / scaling``
        :param gdal_opts: GDAL configuration options
            (NOTE(review): mutable default argument — never mutated here, but
            consider a None-sentinel)
        :param timeframe: time-range tag for the static/probability modes
        :param n_threads_write: write parallelism; defaults to ``self.n_threads``
        :raises ValueError: when ``self.tile_id`` is None
        """
        with TimeTracker(f" Prepare data to export for {self.tile_id}", False):
            if scaling_metadata is None:
                scaling_metadata = 1.0 / scaling
            if n_threads_write is None:
                n_threads_write = self.n_threads
            if self.tile_id is None:
                raise ValueError("Argument 'tile_id' cannot be None")
            out_files = self._get_out_names(prefix, suffix, timeframe)
            n_files = len(out_files)
            gdal_img = gdal.Open(template_file)
            x_size = gdal_img.RasterXSize
            y_size = gdal_img.RasterYSize
            n_pixels = x_size * y_size
            write_data = sb_arr(self.array.shape[0], n_pixels)
            write_data_t = sb_arr(n_pixels, self.array.shape[0])
            out_data_t = sb_arr(self.array.shape[1], self.array.shape[0])
            sb.transposeArray(self.array, self.n_threads, out_data_t)
            # +0.5 so the later integer cast rounds instead of truncating.
            offset = (
                0.5
                if save_type in {"byte", "uint16", "int16", "uint32", "int32"}
                else 0.0
            )
            if (offset != 0) or (scaling != 1.0):
                sb.scaleAndOffset(out_data_t, self.n_threads, offset, scaling)
            # Start from an all-nodata canvas and scatter the valid pixels in.
            sb.fillArray(write_data_t, self.n_threads, nodata)
            sb.expandArrayRows(out_data_t, self.n_threads, write_data_t, valid_idx)
            sb.transposeArray(write_data_t, self.n_threads, write_data)
            tile_dir = write_folder + f"/{self.tile_id}"
            os.makedirs(write_folder, exist_ok=True)
            os.makedirs(tile_dir, exist_ok=True)
            compress_cmd = f"gdal_translate -a_nodata {nodata} -a_scale {scaling_metadata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
        with TimeTracker(f" Exporting data for {self.tile_id}", False):
            if self.save_hdf5:
                # Lazy import: h5py only needed for the HDF5 export path.
                import h5py
                s3_out = (
                    f"{random.choice(self.s3_aliases)}/{self.s3_prefix}/{self.tile_id}"
                )
                with h5py.File(f"{tile_dir}/{out_files[0]}.h5", "w") as f:
                    if save_type == "byte":
                        save_type = "uint8"
                    f.create_dataset(
                        "data", data=write_data, dtype=save_type, compression="lzf"
                    )
                subprocess.run(
                    f"mc cp {tile_dir}/{out_files[0]}.h5 {s3_out}/{out_files[0]}.h5",
                    shell=True,
                    check=True,
                )
                subprocess.run(
                    f"rm {tile_dir}/{out_files[0]}.h5", shell=True, check=True
                )
                ttprint(f"Export complete, check mc ls {s3_out}/{out_files[0]}")
            else:
                if self.s3_prefix:
                    # One random gateway alias per file to spread upload load.
                    s3_out = [
                        f"{random.choice(self.s3_aliases)}/{self.s3_prefix}/{self.tile_id}"
                        for _ in range(len(out_files))
                    ]
                    sb.writeData(
                        write_data,
                        n_threads_write,
                        gdal_opts,
                        [template_file for _ in range(n_files)],
                        tile_dir,
                        out_files,
                        range(n_files),
                        0,
                        0,
                        x_size,
                        y_size,
                        nodata,
                        save_type,
                        compress_cmd,
                        s3_out,
                    )
                    ttprint(f"Export complete, check mc ls {s3_out[0]}/{out_files[0]}")
                else:
                    # No S3 configured: write the GeoTIFFs locally only.
                    sb.writeData(
                        write_data,
                        n_threads_write,
                        gdal_opts,
                        [template_file for _ in range(n_files)],
                        tile_dir,
                        out_files,
                        range(n_files),
                        0,
                        0,
                        x_size,
                        y_size,
                        nodata,
                        save_type,
                        compress_cmd,
                    )
                    ttprint(f"Export complete, check mc {tile_dir}")