import os
import random
import threading
from typing import Callable, List, NoReturn, Optional
import joblib
import numpy as np
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestRegressor
import skmap.set_env # noqa: F401
import skmap_bindings as sb
from skmap.misc import TimeTracker, _make_dir, _rm_dir, sb_arr
from skmap.tiled_data import TiledData, TiledDataLoader
def _get_out_files_depths(
out_files_prefix,
out_files_suffix,
tile_id,
depths,
n_depths,
years,
n_years,
n_stats,
):
assert len(out_files_prefix) == len(out_files_suffix)
assert len(out_files_prefix) == n_stats
assert len(depths) >= n_depths
assert len(years) >= n_years
out_files = []
for i in range(n_depths):
for k in range(n_stats):
for j in range(n_years):
if n_years < len(years):
y1 = years[j]
y2 = years[j + len(years) - n_years]
if n_depths < len(depths):
d1 = depths[i]
d2 = depths[i + len(depths) - n_depths]
file = f"{out_files_prefix[k]}_b{d1}cm..{d2}cm_{y1}0101_{y2}1231_tile.{tile_id}_{out_files_suffix[k]}"
else:
d1 = depths[i]
file = f"{out_files_prefix[k]}_b{d1}cm_{y1}0101_{y2}1231_tile.{tile_id}_{out_files_suffix[k]}"
else:
y1 = years[j]
if n_depths < len(depths):
d1 = depths[i]
d2 = depths[i + len(depths) - n_depths]
file = f"{out_files_prefix[k]}_b{d1}cm..{d2}cm_{y1}0101_{y1}1231_tile.{tile_id}_{out_files_suffix[k]}"
else:
d1 = depths[i]
file = f"{out_files_prefix[k]}_b{d1}cm_{y1}0101_{y1}1231_tile.{tile_id}_{out_files_suffix[k]}"
out_files.append(file)
return out_files
def _tree_based_load_model(model_path):
if model_path.endswith((".joblib", ".lz4", ".pkl")):
model = joblib.load(model_path)
def predict_fn(predictor, data):
return predictor.predict(data)
elif model_path.endswith(".so"):
import tl2cgen
try:
tl2cgen.util.check_if_fast()
except:
print(
"The current installation of tl2cgen is not the one with parallel DMatrix and can be slow"
)
model = tl2cgen.Predictor(model_path, nthread=os.cpu_count())
def predict_tl2cgen(predictor, data):
dmat = tl2cgen.DMatrix(data, dtype="float32")
res = predictor.predict(dmat)
for a in range(len(res.shape)):
if res.shape[a] == 1:
res = np.squeeze(res, axis=a)
break
for a in range(len(res.shape)):
if res.shape[a] == 1:
res = np.squeeze(res, axis=a)
break
return res
predict_fn = predict_tl2cgen
else:
raise ValueError(f"Invalid model path extension '{model_path}'")
return model, predict_fn
[docs]
class Modeler:
    """Base class wrapping a serialized model for tiled prediction.

    Subclasses implement :meth:`predict`. This class handles loading the
    model, resolving its covariate (feature) names, and reshaping tile data
    into the ``(samples, covariates)`` layout expected by the predictor.
    Usable as a context manager: ``__exit__`` releases the large scratch
    arrays and the model.
    """

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        predict_fn: Callable = lambda predictor, data: predictor.predict(data),
    ) -> None:
        assert os.path.exists(model_path), f"Model path {model_path} do not exist"
        self.model_path = model_path
        self.model_covs_path = model_covs_path
        self.predict_fn = predict_fn
        # scratch arrays allocated by _prepare_covariates()
        self.in_covs_t = None
        self.in_covs = None
        self.in_covs_valid = None
        # populated by _load_model()/_load_covs()
        self.model = None
        self.model_covs = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop references to the (potentially large) buffers and the model.
        self.in_covs_t = None
        self.in_covs = None
        self.in_covs_valid = None
        self.model = None
        self.model_covs = None

    def _load_model(self) -> None:
        """Load a joblib-serialized model from ``self.model_path``.

        Raises:
            ValueError: if the file extension is not joblib-compatible.
        """
        if self.model_path.endswith((".joblib", ".lz4", ".pkl")):
            model = joblib.load(self.model_path)
        else:
            raise ValueError(f"Invalid model path extension '{self.model_path}'")
        self.model = model

    def _load_covs(self) -> None:
        """Resolve the model's feature names into ``self.model_covs``.

        Sources, in priority order: an explicit covariates file, the model's
        own feature-name attributes, then a ``<model>.covs`` text file next
        to the model.

        Raises:
            ValueError: if no source of feature names is available.
        """
        if self.model_covs_path is not None:
            with open(self.model_covs_path, "r") as file:
                model_covs = [line.strip() for line in file]
        elif hasattr(self.model, "feature_names_in_"):
            # scikit-learn estimators expose feature_names_in_
            model_covs = list(self.model.feature_names_in_)
        elif hasattr(self.model, "feature_names_"):
            model_covs = self.model.feature_names_
        elif hasattr(self.model, "feature_name"):
            # callable accessor — presumably LightGBM-style; confirm with the
            # model types actually used
            model_covs = self.model.feature_name()
        else:
            # fall back to a ".covs" text file next to the model file
            base, _ = os.path.splitext(self.model_path)
            covs_path = base + ".covs"
            if os.path.exists(covs_path):
                with open(covs_path, "r") as file:
                    model_covs = [line.strip() for line in file]
            else:
                raise ValueError("No feature names was found")
        self.model_covs = model_covs

    def _prepare_covariates(self, data: "TiledDataLoader") -> None:
        """Reorder tile data into (samples, covariates) scratch arrays.

        Fills ``in_covs_t`` (covariates-major), ``in_covs`` (samples-major)
        and ``in_covs_valid`` (valid pixels only) via the skmap bindings.
        """
        # prepare input and output arrays
        n_groups = len(data.catalog.get_groups())
        n_samples = n_groups * data.n_pixels
        n_samples_valid = n_groups * data.n_pixels_valid
        self.in_covs_t = sb_arr(len(self.model_covs), n_samples)
        self.in_covs = sb_arr(n_samples, len(self.model_covs))
        self.in_covs_valid = sb_arr(n_samples_valid, len(self.model_covs))
        # transpose data
        matrix_idx = data.catalog._get_covs_idx(self.model_covs)
        sb.reorderArray(data.array, data.n_threads, self.in_covs_t, matrix_idx)
        sb.transposeArray(self.in_covs_t, data.n_threads, self.in_covs)
        sb.selArrayRows(
            self.in_covs,
            data.n_threads,
            self.in_covs_valid,
            data.get_pixels_valid_idx(n_groups),
        )

    def predict(self, data=None) -> NoReturn:
        """Subclasses must override with their prediction logic.

        BUG FIX: the original declared ``def predict()`` without ``self``,
        so ``instance.predict()`` raised TypeError instead of
        NotImplementedError.
        """
        raise NotImplementedError()
#################################################################################################################################
######################################## Regressors ########################################################################
#################################################################################################################################
[docs]
class Regressor(Modeler):
    """Base class for tiled regressors.

    Adds ``n_responses`` (the number of response variables predicted per
    sample) on top of :class:`Modeler`.
    """

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        n_responses: int = 1,
        predict_fn: Optional[Callable] = None,
    ) -> None:
        super().__init__(model_path, model_covs_path, predict_fn)
        # number of response variables predicted per sample
        self.n_responses = n_responses
#
[docs]
class RFRegressor(Regressor):
    """Random-forest regressor over tiled data (joblib or tl2cgen backend)."""

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        n_responses: int = 1,
        predict_fn: Optional[Callable] = None,
    ) -> None:
        super().__init__(model_path, model_covs_path, n_responses, predict_fn)
        # the loader returns a default predict_fn matching the model backend
        self.model, self.predict_fn = _tree_based_load_model(model_path)
        if predict_fn:
            # a caller-supplied predict_fn overrides the backend default
            self.predict_fn = predict_fn
        self._load_covs()

    def predict(self, data: TiledData):
        """Predict on one tile's valid pixels.

        Returns a :class:`TiledData` whose array has shape
        ``(n_responses, n_samples_valid)``; only ``n_responses == 1`` is
        supported for now.
        """
        # prepare input and output arrays
        with TimeTracker(" Transpose data", False):
            self._prepare_covariates(data)
        # predict
        with TimeTracker(" Model prediction", False):
            result = TiledData(
                self.n_responses, self.in_covs_valid.shape[0], data.tile_id
            )
            assert self.n_responses == 1, (
                "Do not yet manage the case for multiple respounces"
            )
            result.array[0, :] = self.predict_fn(self.model, self.in_covs_valid).astype(
                np.float32
            )
        return result  # shape: (n_responses, n_samples)
#
[docs]
class RFRegressorTrees(Regressor):
    """Regressor exposing each individual tree's prediction.

    Unlike :class:`RFRegressor`, :meth:`predict` returns one row per tree
    of the forest, enabling downstream per-tree statistics (mean, quantiles).
    """

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        n_responses: int = 1,
        predict_fn: Optional[Callable] = None,
    ) -> None:
        super().__init__(model_path, model_covs_path, n_responses, predict_fn)
        self._load_model()
        assert isinstance(self.model, RandomForestRegressor), (
            "The model must be of type sklearn.ensemble.RandomForestRegressor"
        )
        self.n_trees = self.model.n_estimators
        self._load_covs()

    def predict(self, data: TiledData):
        """Run every tree separately over the tile's valid pixels.

        Returns a :class:`TiledData` whose array has shape
        ``(n_trees, n_samples_valid)`` — row ``i`` is tree ``i``'s prediction.
        """
        # prepare input and output arrays
        with TimeTracker(" Transpose data", False):
            self._prepare_covariates(data)
        # predict
        with TimeTracker(" Model prediction", False):

            def _single_prediction(predict, X, out, i, lock) -> None:
                # one tree's prediction written into row i of the shared array
                prediction = predict(X, check_input=False)
                with lock:
                    out[i, :] = prediction

            # NOTE: _validate_X_predict is a private sklearn API — this ties
            # the code to the installed scikit-learn version
            self.in_covs_valid = self.model._validate_X_predict(self.in_covs_valid)
            assert self.model.n_outputs_ == 1
            result = TiledData(self.n_trees, self.in_covs_valid.shape[0], data.tile_id)
            # Assign chunk of trees to jobs
            n_jobs = min(self.n_trees, data.n_threads)
            # Parallel loop prediction
            lock = threading.Lock()
            Parallel(n_jobs=n_jobs, verbose=self.model.verbose, require="sharedmem")(
                delayed(_single_prediction)(
                    self.model.estimators_[i].predict,
                    self.in_covs_valid,
                    result.array,
                    i,
                    lock,
                )
                for i in range(self.n_trees)
            )
        return result  # shape: (n_trees, n_samples) — one row per tree
#################################################################################################################################
######################################## Classifiers ########################################################################
#################################################################################################################################
[docs]
class Classifier(Modeler):
    """Base class for tiled classifiers.

    The default ``predict_fn`` returns class probabilities via
    ``predict_proba``; ``n_class`` is the number of classes predicted.
    """

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        n_class: int = 1,
        predict_fn: Callable = lambda predictor, data: predictor.predict_proba(data),
    ) -> None:
        super().__init__(model_path, model_covs_path, predict_fn)
        # number of classes the model outputs probabilities for
        self.n_class = n_class
#
[docs]
class RFClassifier(Classifier):
    """Random-forest classifier over tiled data (joblib or tl2cgen backend)."""

    def __init__(
        self,
        model_path: str,
        model_covs_path: Optional[str] = None,
        n_class: int = 1,
        predict_fn: Optional[Callable] = None,
    ) -> None:
        super().__init__(model_path, model_covs_path, n_class, predict_fn)
        # the loader returns a default predict_fn matching the model backend
        self.model, self.predict_fn = _tree_based_load_model(model_path)
        if predict_fn:
            # a caller-supplied predict_fn overrides the backend default
            self.predict_fn = predict_fn
        self._load_covs()

    def predict(self, data: TiledData):
        """Predict class probabilities on one tile's valid pixels.

        Returns a :class:`TiledData` whose array has shape
        ``(n_class, n_samples_valid)``. For multi-class output the backend's
        (samples, classes) result is cast to float32 (via the skmap bindings
        when possible) and transposed into the result array.
        """
        # prepare input and output arrays
        with TimeTracker(" Transpose data", False):
            self._prepare_covariates(data)
        # predict
        with TimeTracker(" Model prediction", False):
            result = TiledData(self.n_class, self.in_covs_valid.shape[0], data.tile_id)
            if self.n_class == 1:
                result.array[0, :] = self.predict_fn(
                    self.model, self.in_covs_valid
                ).astype(np.float32)
            else:
                tmp_res_t = self.predict_fn(self.model, self.in_covs_valid)
                # FIX: removed a redundant nested `if self.n_class != 1:` —
                # it was always true on this branch.
                with TimeTracker(" Convert and back transpose data", False):
                    if tmp_res_t.dtype == np.float64:
                        # cast in native code, then transpose into the result
                        tmp_res_cast_t = sb_arr(tmp_res_t.shape[0], tmp_res_t.shape[1])
                        sb.castFloat64ToFloat32(tmp_res_t, data.n_threads, tmp_res_cast_t)
                        sb.transposeArray(tmp_res_cast_t, data.n_threads, result.array)
                    elif tmp_res_t.dtype == np.float32:
                        sb.transposeArray(tmp_res_t, data.n_threads, result.array)
                    else:
                        print(
                            "Result prediction are not in float32 nor float64, converting with python (can be slow)"
                        )
                        tmp_res_t = tmp_res_t.astype(np.float32)
                        sb.transposeArray(tmp_res_t, data.n_threads, result.array)
        return result  # shape: (n_class, n_samples)
#################################################################################################################################
######################################## Others ########################################################################
#################################################################################################################################
[docs]
class Predicted:
    """Per-tree regression predictions for one tile, plus derived statistics.

    Buffers tree-level predictions, reduces them to mean/quantile statistics
    (:meth:`compute_stats`) and writes the results as GeoTIFF layers
    (:meth:`save_stats_layers`). Usable as a context manager; ``__exit__``
    releases the large buffers.
    """

    def __init__(self, data: TiledDataLoader, depths) -> None:
        self.data = data
        assert self.data is not None
        self.depths = depths
        self.groups = self.data.catalog.get_groups()
        self.n_groups = len(self.groups)
        # TODO optimize shape of self.out_trees
        # NOTE(review): allocated 2-D here, but predicted_trees/compute_stats
        # index this buffer 4-D as (n_depths, n_trees, n_years, n_pixels_valid)
        # and rely on self.n_depths, which is never set in __init__ —
        # presumably both are reassigned by the caller before use; confirm.
        self._out_valid = np.empty(
            (self.n_groups, self.data.n_pixels_valid), dtype=np.float32
        )
        # lazily allocated by compute_stats()/save_stats_layers()
        self._out_stats_valid = None
        self._out_stats = None
        self._out_stats_t = None
        self._out_stats_gdal = None
        self.n_stats = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop references to the large buffers so they can be freed.
        self._out_valid = None
        self._out_stats_valid = None
        self._out_stats = None
        self._out_stats_t = None
        self._out_stats_gdal = None

    @property
    def predicted_trees(self):
        """View over the per-tree predictions (requires externally-set shape)."""
        # self.predicted_trees shape: (n_depths, n_trees, n_years, n_pixels_valid)
        return self._out_valid[: self.n_depths, :, : self.n_groups, :]

    @property
    def predicted_stats(self):
        """View over computed stats, or None if compute_stats() was not run."""
        # self.predicted_stats shape: (n_years, n_pixels_valid, n_depths, n_stats)
        if self._out_stats_valid is not None:
            return self._out_stats_valid.reshape(
                (self.n_groups, self.data.n_pixels_valid, self.n_depths, self.n_stats)
            )

    def average_trees_depth_ranges(self) -> None:
        """Average consecutive depth pairs in place; decrements ``n_depths``."""
        assert self.n_depths > 1
        self.n_depths -= 1
        for i in range(self.n_depths):
            # self.predicted_trees shape: (n_depths, n_trees, n_years, n_pixels_valid)
            self._out_valid[i, :, : self.n_groups, :] += self._out_valid[
                i + 1, :, : self.n_groups, :
            ]
            self._out_valid[i, :, : self.n_groups, :] /= 2

    def average_trees_year_ranges(self) -> None:
        """Average consecutive year (group) pairs in place; decrements ``n_groups``."""
        assert self.n_groups > 1
        self.n_groups -= 1
        for j in range(self.n_groups):
            # self.predicted_trees shape: (n_depths, n_trees, n_years, n_pixels_valid)
            self._out_valid[: self.n_depths, :, j, :] += self._out_valid[
                : self.n_depths, :, j + 1, :
            ]
            self._out_valid[: self.n_depths, :, j, :] /= 2

    def compute_stats(
        self, mean=True, quantiles=[0.025, 0.975], expm1=False, scale=1
    ) -> None:
        """Reduce per-tree predictions (axis 0 of each depth slice) to stats.

        Stats per depth are: optional mean (column 0), then the requested
        quantiles. Optionally applies expm1 (inverse of log1p) and a scale
        factor to the whole stats buffer.

        NOTE: the mutable default ``quantiles`` is only read, never mutated,
        so the shared-default pitfall does not bite here.
        """
        quantile_idx = 1 if mean else 0
        self.n_stats = quantile_idx + len(quantiles)
        assert self.n_stats > 0
        self._out_stats_valid = np.empty(
            (self.n_groups * self.data.n_pixels_valid, self.n_depths * self.n_stats),
            dtype=np.float32,
        )
        # compute stats
        for i in range(self.n_depths):
            if mean:
                # self.predicted_stats shape: (n_years, n_pixels_valid, n_depths, n_stats)
                # self.predicted_trees shape: (n_depths, n_trees, n_years, n_pixels_valid)
                self.predicted_stats[:, :, i, 0] = np.mean(
                    self._out_valid[i, :, : self.n_groups, :], axis=0
                )
            if len(quantiles) > 0:
                q = np.quantile(
                    self._out_valid[i, :, : self.n_groups, :], quantiles, axis=0
                )
                self.predicted_stats[:, :, i, quantile_idx:] = q.transpose((1, 2, 0))
        # compute inverse log1p
        if expm1:
            np.expm1(self._out_stats_valid, out=self._out_stats_valid)
        # compute scale
        if scale != 1:
            self._out_stats_valid[:] = self._out_stats_valid * scale

    def save_stats_layers(
        self,
        base_dir,
        nodata,
        dtype,
        out_files_prefix,
        out_files_suffix,
        s3_prefix,
        s3_aliases,
        gdal_opts,
        threads,
    ):
        """Expand, transpose and write the computed stats as GeoTIFF layers.

        One image per (depth, stat, year) combination. Returns the s3
        destinations when ``s3_prefix`` is given (local files are removed
        after upload), otherwise the local file paths.
        """
        assert self._out_stats_valid is not None
        assert dtype == "int16" or dtype == "uint8"
        assert len(out_files_prefix) == len(out_files_suffix)
        assert len(out_files_prefix) == self.n_stats
        assert s3_prefix is None or len(s3_aliases) > 0
        # create and transpose output
        with TimeTracker(
            f" Tile {self.data.tile_id} - transpose data for final output ({threads} threads)"
        ):
            # expand to original number of pixels
            self._out_stats = np.empty(
                (self.n_groups * self.data.n_pixels, self.n_depths * self.n_stats),
                dtype=np.float32,
            )
            sb.fillArray(self._out_stats, threads, nodata)
            sb.expandArrayRows(
                self._out_stats_valid,
                threads,
                self._out_stats,
                self.data.get_pixels_valid_idx(self.n_groups),
            )
            # transpose expanded array
            self._out_stats_t = np.empty(
                (self.n_depths * self.n_stats, self.n_groups * self.data.n_pixels),
                dtype=np.float32,
            )
            sb.transposeArray(self._out_stats, threads, self._out_stats_t)
            # rearrange years and stats
            # TODO ? could this be replaced by just self._out_stats_t.reshape((self.n_depths * self.n_stats * self.n_years, self.model._data.n_pixels))?
            self._out_stats_gdal = np.empty(
                (self.n_depths * self.n_stats * self.n_groups, self.data.n_pixels),
                dtype=np.float32,
            )
            sb.fillArray(self._out_stats_gdal, threads, nodata)
            # (row, subrow) pairs mapping each output row to (depth*stat, group)
            inverse_idx = np.empty(
                (self.n_depths * self.n_stats * self.n_groups, 2), dtype=np.uintc
            )
            subrows_grid, rows_grid = np.meshgrid(
                np.arange(self.n_groups), list(range(self.n_depths * self.n_stats))
            )
            inverse_idx[:, 0] = rows_grid.flatten()
            inverse_idx[:, 1] = subrows_grid.flatten()
            sb.inverseReorderArray(
                self._out_stats_t, threads, self._out_stats_gdal, inverse_idx
            )
        # write outputs
        with TimeTracker(
            f" Tile {self.data.tile_id} - write output ({threads} threads)"
        ):
            out_dir = _make_dir(f"{base_dir}/{self.data.tile_id}")
            # TODO implement filenames function as an class function
            out_files = _get_out_files_depths(
                out_files_prefix=out_files_prefix,
                out_files_suffix=out_files_suffix,
                tile_id=self.data.tile_id,
                depths=self.depths,
                n_depths=self.n_depths,
                years=self.groups,
                n_years=self.n_groups,
                n_stats=self.n_stats,
            )
            # TODO change the need for base image in sb.writeByteData and sb.writeInt16Data
            temp_dir = f"{base_dir}/.skmap"
            temp_tif = self.data.create_image_template(dtype, nodata, temp_dir)
            write_idx = range(self.n_depths * self.n_stats * self.n_groups)
            compress_cmd = f"gdal_translate -a_nodata {nodata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
            s3_out = None
            if s3_prefix is not None:
                # a random alias per file — presumably to spread upload load
                # across s3 gateways; confirm
                s3_out = [
                    f"{s3_aliases[random.randint(0, len(s3_aliases) - 1)]}{s3_prefix}/{self.data.tile_id}"
                    for _ in range(len(out_files))
                ]
            if dtype == "int16":
                sb.writeInt16Data(
                    self._out_stats_gdal,
                    threads,
                    gdal_opts,
                    [temp_tif for _ in range(len(out_files))],
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            elif dtype == "uint8":
                sb.writeByteData(
                    self._out_stats_gdal,
                    threads,
                    gdal_opts,
                    [temp_tif for _ in range(len(out_files))],
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            # show final message and remove local files after sent to s3 backend
            if s3_prefix is not None:
                for k in range(self.n_stats):
                    print(
                        f"List results with `mc ls {s3_aliases[0]}{s3_prefix}/{self.data.tile_id}/{out_files_prefix[k]}_`"
                    )
                _rm_dir(out_dir)
                os.remove(temp_tif)
                return s3_out
            return [f"{out_dir}/{file}" for file in out_files]
#
[docs]
class PredictedProbs:
    """Per-class probability predictions for one tile, plus derived layers.

    Holds the probability buffer for the tile's valid pixels and derives a
    hard-class layer (:meth:`compute_class`) and a per-pixel
    Kullback-Leibler divergence layer (:meth:`compute_kl_divergence`),
    each writable as GeoTIFFs. Usable as a context manager.
    """

    def __init__(self, data: TiledDataLoader, legend) -> None:
        self.data = data
        assert self.data is not None
        # legend: dict mapping class code -> label; keys must be numeric
        self.legend = legend
        assert isinstance(self.legend, dict)
        try:
            self.legend_codes = np.array(list(self.legend.keys()), dtype=np.float32)
        except ValueError as e:
            raise ValueError("Legend keys must be convertible to float32") from e
        self.n_class = len(self.legend)
        self.groups = self.data.catalog.get_groups()
        self.n_groups = len(self.groups)
        # probabilities per valid pixel: (n_groups * n_pixels_valid, n_class)
        self._out_probs_valid = np.empty(
            (self.n_groups * self.data.n_pixels_valid, self.n_class), dtype=np.float32
        )
        # hard class code per valid pixel
        self._out_cls_valid = np.empty(
            (self.n_groups * self.data.n_pixels_valid, 1), dtype=np.float32
        )
        # KL divergence per valid pixel
        self._out_kld_valid = np.empty(
            (self.n_groups * self.data.n_pixels_valid, 1), dtype=np.float32
        )
        # lazily allocated write-out buffers (expanded / transposed / gdal-ordered)
        self._out_probs = None
        self._out_probs_t = None
        self._out_probs_gdal = None
        self._out_cls = None
        self._out_cls_t = None
        self._out_cls_gdal = None
        self._out_kld = None
        self._out_kld_t = None
        self._out_kld_gdal = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Release the expanded buffers.
        # NOTE(review): the *_valid buffers are NOT cleared here, unlike
        # Predicted.__exit__ — confirm whether that is intentional.
        self._out_probs = None
        self._out_probs_t = None
        self._out_probs_gdal = None
        self._out_cls = None
        self._out_cls_t = None
        self._out_cls_gdal = None
        self._out_kld = None
        self._out_kld_t = None
        self._out_kld_gdal = None

    @property
    def predicted_probs(self):
        """Probabilities as (n_groups, n_pixels_valid, n_class)."""
        return self._out_probs_valid.reshape(
            (self.n_groups, self.data.n_pixels_valid, self.n_class)
        )

    @property
    def predicted_class(self):
        """Hard class codes as (n_groups, n_pixels_valid)."""
        return self._out_cls_valid.reshape((self.n_groups, self.data.n_pixels_valid))

    def compute_class(self) -> None:
        """Assign each valid pixel the legend code of its most probable class."""
        self._out_cls_valid[:, 0] = self.legend_codes[
            np.argmax(self._out_probs_valid[:, :], axis=-1).astype(int)
        ]

    def compute_kl_divergence(self) -> None:
        """Per-pixel KL divergence (in bits) of the probabilities vs. uniform."""
        # replace zero probabilities with the uniform value to avoid log(0)
        adjusted_data = np.where(
            self._out_probs_valid[:, :] > 0,
            self._out_probs_valid[:, :],
            1 / self.n_class,
        )
        # sum p * log2(p / (1/n_class)) == KL(p || uniform)
        self._out_kld_valid[:, 0] = np.sum(
            adjusted_data * np.log2(adjusted_data * self.n_class), axis=-1
        )

    def save_class_layer(
        self,
        base_dir,
        nodata,
        dtype,
        out_files_prefix,
        out_files_suffix,
        s3_prefix,
        s3_aliases,
        gdal_opts,
        n_threads,
    ):
        """Compute and write the hard-class layer, one image per group.

        Returns s3 destinations when ``s3_prefix`` is given (local files are
        removed after upload), otherwise local file paths.
        """
        self.compute_class()
        assert self._out_cls_valid is not None
        assert dtype == "int16" or dtype == "uint8"
        assert len(out_files_prefix) == len(out_files_suffix)
        assert len(out_files_prefix) == 1
        assert s3_prefix is None or len(s3_aliases) > 0
        # create and transpose output
        with TimeTracker(
            f" Tile {self.data.tile_id} - transpose data for final output"
        ):
            # expand to original number of pixels
            self._out_cls = np.empty(
                (self.n_groups * self.data.n_pixels, self._out_cls_valid.shape[1]),
                dtype=np.float32,
            )
            sb.fillArray(self._out_cls, n_threads, nodata)
            sb.expandArrayRows(
                self._out_cls_valid,
                n_threads,
                self._out_cls,
                self.data.get_pixels_valid_idx(self.n_groups),
            )
            # transpose expanded array
            self._out_cls_t = np.empty(
                (self._out_cls.shape[1], self._out_cls.shape[0]), dtype=np.float32
            )
            sb.transposeArray(self._out_cls, n_threads, self._out_cls_t)
            # rearrange groups
            self._out_cls_gdal = np.empty(
                (self._out_cls_t.shape[0] * self.n_groups, self.data.n_pixels),
                dtype=np.float32,
            )
            sb.fillArray(self._out_cls_gdal, n_threads, nodata)
            # (row, subrow) pairs mapping each output row to (band, group)
            inverse_idx = np.empty((self._out_cls_gdal.shape[0], 2), dtype=np.uintc)
            subrows_grid, rows_grid = np.meshgrid(
                np.arange(self.n_groups), list(range(self._out_cls_valid.shape[1]))
            )
            inverse_idx[:, 0] = rows_grid.flatten()
            inverse_idx[:, 1] = subrows_grid.flatten()
            sb.inverseReorderArray(
                self._out_cls_t, n_threads, self._out_cls_gdal, inverse_idx
            )
        # write outputs
        with TimeTracker(f" Tile {self.data.tile_id} - write class images"):
            out_dir = _make_dir(f"{base_dir}/{self.data.tile_id}")
            out_files = _get_out_files(out_files_prefix, out_files_suffix, self.groups)
            # the mask image serves as geo-reference template for the writers
            temp_tif = [self.data.mask_path for _ in range(len(out_files))]
            write_idx = list(range(self._out_cls_gdal.shape[0]))
            compress_cmd = f"gdal_translate -a_nodata {nodata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
            s3_out = None
            if s3_prefix is not None:
                # a random alias per file — presumably to spread upload load; confirm
                s3_out = [
                    f"{s3_aliases[random.randint(0, len(s3_aliases) - 1)]}{s3_prefix}/{self.data.tile_id}"
                    for _ in range(len(out_files))
                ]
            if dtype == "int16":
                sb.writeInt16Data(
                    self._out_cls_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            elif dtype == "uint8":
                sb.writeByteData(
                    self._out_cls_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            if s3_prefix is not None:
                print(
                    f"List results with `mc ls {s3_aliases[0]}{s3_prefix}/{self.data.tile_id}/{out_files_prefix[0]}_`"
                )
                # local copies were pushed to s3; remove them
                _rm_dir(out_dir)
                return s3_out
            return [f"{out_dir}/{file}" for file in out_files]

    def save_kl_divergence_layer(
        self,
        base_dir,
        nodata,
        dtype,
        scale,
        out_files_prefix,
        out_files_suffix,
        s3_prefix,
        s3_aliases,
        gdal_opts,
        n_threads,
    ):
        """Compute and write the KL-divergence layer, one image per group.

        The divergence values are offset/scaled in place (rounding offset of
        0.5/scale, then multiplied by ``scale``) before being written with
        the integer ``dtype``.
        """
        self.compute_kl_divergence()
        assert self._out_kld_valid is not None
        assert dtype == "int16" or dtype == "uint8"
        assert len(out_files_prefix) == len(out_files_suffix)
        assert len(out_files_prefix) == 1
        assert s3_prefix is None or len(s3_aliases) > 0
        # create and transpose output
        with TimeTracker(f"tile {self.data.tile_id} - transpose data for final output"):
            sb.offsetAndScale(self._out_kld_valid, n_threads, 0.5 / scale, scale)
            # expand to original number of pixels
            self._out_kld = np.empty(
                (self.n_groups * self.data.n_pixels, self._out_kld_valid.shape[1]),
                dtype=np.float32,
            )
            sb.fillArray(self._out_kld, n_threads, nodata)
            sb.expandArrayRows(
                self._out_kld_valid,
                n_threads,
                self._out_kld,
                self.data.get_pixels_valid_idx(self.n_groups),
            )
            # transpose expanded array
            self._out_kld_t = np.empty(
                (self._out_kld.shape[1], self._out_kld.shape[0]), dtype=np.float32
            )
            sb.transposeArray(self._out_kld, n_threads, self._out_kld_t)
            # rearrange groups
            self._out_kld_gdal = np.empty(
                (self._out_kld_t.shape[0] * self.n_groups, self.data.n_pixels),
                dtype=np.float32,
            )
            sb.fillArray(self._out_kld_gdal, n_threads, nodata)
            # (row, subrow) pairs mapping each output row to (band, group)
            inverse_idx = np.empty((self._out_kld_gdal.shape[0], 2), dtype=np.uintc)
            subrows_grid, rows_grid = np.meshgrid(
                np.arange(self.n_groups), list(range(self._out_kld_valid.shape[1]))
            )
            inverse_idx[:, 0] = rows_grid.flatten()
            inverse_idx[:, 1] = subrows_grid.flatten()
            sb.inverseReorderArray(
                self._out_kld_t, n_threads, self._out_kld_gdal, inverse_idx
            )
        # write outputs
        with TimeTracker(
            f"tile {self.data.tile_id} - write Kullback-Leibler divergence image"
        ):
            out_dir = _make_dir(f"{base_dir}/{self.data.tile_id}")
            out_files = _get_out_files(out_files_prefix, out_files_suffix, self.groups)
            # the mask image serves as geo-reference template for the writers
            temp_tif = [self.data.mask_path for _ in range(len(out_files))]
            write_idx = list(range(self._out_kld_gdal.shape[0]))
            compress_cmd = f"gdal_translate -a_nodata {nodata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
            s3_out = None
            if s3_prefix is not None:
                # a random alias per file — presumably to spread upload load; confirm
                s3_out = [
                    f"{s3_aliases[random.randint(0, len(s3_aliases) - 1)]}{s3_prefix}/{self.data.tile_id}"
                    for _ in range(len(out_files))
                ]
            if dtype == "int16":
                sb.writeInt16Data(
                    self._out_kld_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            elif dtype == "uint8":
                sb.writeByteData(
                    self._out_kld_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            if s3_prefix is not None:
                print(
                    f"List results with `mc ls {s3_aliases[0]}{s3_prefix}/{self.data.tile_id}/{out_files_prefix[0]}_`"
                )
                # local copies were pushed to s3; remove them
                _rm_dir(out_dir)
                return s3_out
            return [f"{out_dir}/{file}" for file in out_files]

    def save_probs_layers(
        self,
        base_dir,
        nodata,
        dtype,
        scale,
        out_files_prefix,
        out_files_suffix,
        s3_prefix,
        s3_aliases,
        gdal_opts,
        n_threads,
    ):
        """Write the probability layers, one image per (class, group).

        Probabilities are offset/scaled in place (rounding offset of
        0.5/scale, then multiplied by ``scale``) before being written with
        the integer ``dtype``.
        """
        assert self._out_probs_valid is not None
        assert dtype == "int16" or dtype == "uint8"
        assert len(out_files_prefix) == len(out_files_suffix)
        assert len(out_files_prefix) == self.n_class
        assert s3_prefix is None or len(s3_aliases) > 0
        # create and transpose output
        with TimeTracker(
            f" Tile {self.data.tile_id} - transpose data for final output"
        ):
            sb.offsetAndScale(self._out_probs_valid, n_threads, 0.5 / scale, scale)
            # expand to original number of pixels
            self._out_probs = np.empty(
                (self.n_groups * self.data.n_pixels, self.n_class), dtype=np.float32
            )
            sb.fillArray(self._out_probs, n_threads, nodata)
            sb.expandArrayRows(
                self._out_probs_valid,
                n_threads,
                self._out_probs,
                self.data.get_pixels_valid_idx(self.n_groups),
            )
            # transpose expanded array
            self._out_probs_t = np.empty(
                (self.n_class, self.n_groups * self.data.n_pixels), dtype=np.float32
            )
            sb.transposeArray(self._out_probs, n_threads, self._out_probs_t)
            # rearrange years and stats
            self._out_probs_gdal = np.empty(
                (self.n_class * self.n_groups, self.data.n_pixels), dtype=np.float32
            )
            sb.fillArray(self._out_probs_gdal, n_threads, nodata)
            # (row, subrow) pairs mapping each output row to (class, group)
            inverse_idx = np.empty((self._out_probs_gdal.shape[0], 2), dtype=np.uintc)
            subrows_grid, rows_grid = np.meshgrid(
                np.arange(self.n_groups), list(range(self.n_class))
            )
            inverse_idx[:, 0] = rows_grid.flatten()
            inverse_idx[:, 1] = subrows_grid.flatten()
            sb.inverseReorderArray(
                self._out_probs_t, n_threads, self._out_probs_gdal, inverse_idx
            )
        # write outputs
        with TimeTracker(f" Tile {self.data.tile_id} - write probs images"):
            out_dir = _make_dir(f"{base_dir}/{self.data.tile_id}")
            out_files = _get_out_files(
                out_files_prefix, out_files_suffix, self.groups, self.n_class
            )
            # the mask image serves as geo-reference template for the writers
            temp_tif = [self.data.mask_path for _ in range(len(out_files))]
            write_idx = list(range(self._out_probs_gdal.shape[0]))
            compress_cmd = f"gdal_translate -a_nodata {nodata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
            s3_out = None
            if s3_prefix is not None:
                # a random alias per file — presumably to spread upload load; confirm
                s3_out = [
                    f"{s3_aliases[random.randint(0, len(s3_aliases) - 1)]}{s3_prefix}/{self.data.tile_id}"
                    for _ in range(len(out_files))
                ]
            if dtype == "int16":
                sb.writeInt16Data(
                    self._out_probs_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            elif dtype == "uint8":
                sb.writeByteData(
                    self._out_probs_gdal,
                    n_threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            if s3_prefix is not None:
                print(
                    f"List results with `mc ls {s3_aliases[0]}{s3_prefix}/{self.data.tile_id}/{out_files_prefix[0]}_`"
                )
                # local copies were pushed to s3; remove them
                _rm_dir(out_dir)
                return s3_out
            return [f"{out_dir}/{file}" for file in out_files]
#
[docs]
class Reducer:
    """Applies a user-supplied reduction over selected covariates of a tile.

    ``reducer_fn`` receives a ``(n_samples_valid, n_features)`` float32 array
    and must return one value per sample. Usable as a context manager;
    ``__exit__`` releases the scratch arrays.
    """

    def __init__(self, reducer_features: List[str], reducer_fn: Callable) -> None:
        # names of the covariates fed to reducer_fn, in column order
        self.reducer_features = reducer_features
        self.reducer_fn = reducer_fn
        # scratch arrays allocated by reduce()
        self.in_covs_t = None
        self.in_covs = None
        self.in_covs_valid = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop references to the scratch arrays so they can be freed.
        self.in_covs_t = None
        self.in_covs = None
        self.in_covs_valid = None

    def reduce(self, data: TiledDataLoader):
        """Run ``reducer_fn`` over the tile's valid pixels.

        Returns a :class:`ReducedValues` whose ``_out_reduc_valid`` holds one
        float32 value per (group, valid pixel).
        """
        with TimeTracker(
            f" Tile {data.tile_id} - predict ({len(self.reducer_features)} input features)",
            True,
        ):
            # prepare input and output arrays
            n_groups = len(data.catalog.get_groups())
            n_samples = n_groups * data.n_pixels
            n_samples_valid = n_groups * data.n_pixels_valid
            self.in_covs_t = np.empty(
                (len(self.reducer_features), n_samples), dtype=np.float32
            )
            self.in_covs = np.empty(
                (n_samples, len(self.reducer_features)), dtype=np.float32
            )
            self.in_covs_valid = np.empty(
                (n_samples_valid, len(self.reducer_features)), dtype=np.float32
            )
            # transpose data
            matrix_idx = data.catalog._get_covs_idx(self.reducer_features)
            with TimeTracker(f" Tile {data.tile_id} - transpose data"):
                sb.reorderArray(data.array, data.n_threads, self.in_covs_t, matrix_idx)
                sb.transposeArray(self.in_covs_t, data.n_threads, self.in_covs)
                sb.selArrayRows(
                    self.in_covs,
                    data.n_threads,
                    self.in_covs_valid,
                    data.get_pixels_valid_idx(n_groups),
                )
            # create output result
            result = ReducedValues(data=data)
            # compute
            with TimeTracker(f" Tile {data.tile_id} - model prediction"):
                result._out_reduc_valid[:] = self.reducer_fn(self.in_covs_valid)
            return result  # ReducedValues; buffer is 1-D (n_groups * n_pixels_valid)
#
[docs]
class ReducedValues:
    """Holds one reduced value per (group, valid pixel) of a tile.

    Filled by :meth:`Reducer.reduce` and written out as one GeoTIFF per
    group by :meth:`save_reduced_layer`. Usable as a context manager;
    ``__exit__`` releases the buffers.
    """

    def __init__(self, data: TiledDataLoader) -> None:
        self.data = data
        assert self.data is not None
        self.groups = self.data.catalog.get_groups()
        self.n_groups = len(self.groups)
        # 1-D buffer of reduced values for the valid pixels of all groups
        self._out_reduc_valid = np.empty(
            (self.n_groups * self.data.n_pixels_valid), dtype=np.float32
        )
        # lazily allocated write-out buffers
        self._out_reduc = None
        self._out_reduc_t = None
        self._out_reduc_gdal = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop references to the buffers so they can be freed.
        self._out_reduc_valid = None
        self._out_reduc = None
        self._out_reduc_t = None
        self._out_reduc_gdal = None

    @property
    def reduced_values(self):
        """Reduced values as (n_groups, n_pixels_valid)."""
        return self._out_reduc_valid.reshape((self.n_groups, self.data.n_pixels_valid))

    def save_reduced_layer(
        self,
        base_dir,
        nodata,
        dtype,
        out_files_prefix,
        out_files_suffix,
        s3_prefix,
        s3_aliases,
        gdal_opts,
        threads,
    ):
        """Expand, transpose and write the reduced values, one image per group.

        Returns s3 destinations when ``s3_prefix`` is given (local files are
        removed after upload), otherwise local file paths.
        """
        assert self._out_reduc_valid is not None
        assert dtype == "int16" or dtype == "uint8"
        assert len(out_files_prefix) == len(out_files_suffix)
        assert len(out_files_prefix) == 1
        assert s3_prefix is None or len(s3_aliases) > 0
        # create and transpose output
        with TimeTracker(
            f" Tile {self.data.tile_id} - transpose data for final output ({threads} threads)"
        ):
            # expand to original number of pixels
            self._out_reduc = np.empty(
                (self.n_groups * self.data.n_pixels, 1), dtype=np.float32
            )
            sb.fillArray(self._out_reduc, threads, nodata)
            sb.expandArrayRows(
                self._out_reduc_valid,
                threads,
                self._out_reduc,
                self.data.get_pixels_valid_idx(self.n_groups),
            )
            # transpose expanded array
            self._out_reduc_t = np.empty(
                (1, self.n_groups * self.data.n_pixels), dtype=np.float32
            )
            sb.transposeArray(self._out_reduc, threads, self._out_reduc_t)
            # rearrange groups
            self._out_reduc_gdal = np.empty(
                (1 * self.n_groups, self.data.n_pixels), dtype=np.float32
            )
            sb.fillArray(self._out_reduc_gdal, threads, nodata)
            # (row, subrow) pairs mapping each output row to (band, group)
            inverse_idx = np.empty((self._out_reduc_gdal.shape[0], 2), dtype=np.uintc)
            subrows_grid, rows_grid = np.meshgrid(
                np.arange(self.n_groups), list(range(1))
            )
            inverse_idx[:, 0] = rows_grid.flatten()
            inverse_idx[:, 1] = subrows_grid.flatten()
            sb.inverseReorderArray(
                self._out_reduc_t, threads, self._out_reduc_gdal, inverse_idx
            )
        # write outputs
        with TimeTracker(
            f" Tile {self.data.tile_id} - write probs images ({threads} threads)"
        ):
            out_dir = _make_dir(f"{base_dir}/{self.data.tile_id}")
            out_files = _get_out_files(out_files_prefix, out_files_suffix, self.groups)
            # the mask image serves as geo-reference template for the writers
            temp_tif = [self.data.mask_path for _ in range(len(out_files))]
            write_idx = list(range(self._out_reduc_gdal.shape[0]))
            compress_cmd = f"gdal_translate -a_nodata {nodata} -co COMPRESS=deflate -co PREDICTOR=2 -co TILED=TRUE -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048"
            s3_out = None
            if s3_prefix is not None:
                # a random alias per file — presumably to spread upload load; confirm
                s3_out = [
                    f"{s3_aliases[random.randint(0, len(s3_aliases) - 1)]}{s3_prefix}/{self.data.tile_id}"
                    for _ in range(len(out_files))
                ]
            if dtype == "int16":
                sb.writeInt16Data(
                    self._out_reduc_gdal,
                    threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            elif dtype == "uint8":
                sb.writeByteData(
                    self._out_reduc_gdal,
                    threads,
                    gdal_opts,
                    temp_tif,
                    out_dir,
                    out_files,
                    write_idx,
                    0,
                    0,
                    self.data.x_size,
                    self.data.y_size,
                    int(nodata),
                    compress_cmd,
                    s3_out,
                )
            if s3_prefix is not None:
                print(
                    f"List results with `mc ls {s3_aliases[0]}{s3_prefix}/{self.data.tile_id}/{out_files_prefix[0]}_`"
                )
                # local copies were pushed to s3; remove them
                _rm_dir(out_dir)
                return s3_out
            return [f"{out_dir}/{file}" for file in out_files]
#
def _get_out_files(
out_files_prefix: List[str],
out_files_suffix: List[str],
groups: List[str],
n_class: Optional[int] = None,
):
if n_class is None:
n_class = 1
out_files = []
if "common" in groups:
for _ in groups:
for i in range(n_class):
file = f"{out_files_prefix[i]}_{out_files_suffix[i]}"
out_files.append(file)
else:
for group in groups:
for i in range(n_class):
file = f"{out_files_prefix[i]}_{group}0101_{group}1231_{out_files_suffix[i]}"
out_files.append(file)
return out_files