Source code for skmap.io.process

import os
import time
import warnings
from enum import Enum
from typing import List

try:
    import concurrent.futures
    import gc
    import math
    from abc import ABC, abstractmethod
    from datetime import datetime

    import bottleneck as bn
    import numexpr as ne
    import numpy as np
    import pandas as pd
    import pyfftw
    import scipy.sparse as sparse
    import statsmodels.api as sm
    from dateutil.relativedelta import relativedelta
    from pandas import DataFrame
    from scipy.linalg import circulant, matmul_toeplitz
    from scipy.ndimage import convolve1d
    from scipy.signal import find_peaks
    from scipy.sparse.linalg import splu
    from scipy.special import log1p
    from scipy.stats import theilslopes
    from statsmodels.tsa.seasonal import STL

    from skmap import SKMapGroupRunner, SKMapRunner, parallel
    from skmap.io import RasterData
    from skmap.misc import (
        date_range,
        del_memmap,
        load_memmap,
        nan_percentile,
        new_memmap,
        ref_memmap,
    )

    class Transformer(SKMapGroupRunner, ABC):
        def __init__(self, name: str, verbose: bool = True, temporal=False) -> None:
            super().__init__(verbose=verbose, temporal=temporal)
            self.name = name
            self.name_qa = f"{self.name}{RasterData.TRANSFORM_SEP}qa"

        def _new_info(self, rdata, group, outname, nm):
            info = rdata._info()
            new_info = []

            for index, row in info.iterrows():
                start_dt = row[RasterData.START_DT_COL]
                end_dt = row[RasterData.END_DT_COL]
                raster_file = row[RasterData.PATH_COL]

                name = rdata._set_date(outname, start_dt, end_dt, nm=nm, gr=group)
                new_group = f"{group}.{nm}"

                new_info.append(
                    rdata._new_info_row(
                        raster_file,
                        group=new_group,
                        name=name,
                        dates=[start_dt, end_dt],
                    )
                )

            return new_info

        # FIXME: adapt for group_list, ginfo_list
        def run(
            self, rdata: RasterData, group: str, outname: str = "skmap_{nm}_{gr}_{dt}"
        ):
            if outname is None:
                outname = "skmap_{nm}_{gr}_{dt}"

            array = rdata._array()
            result = self._run(array)

            if isinstance(result, tuple) and len(result) >= 2:
                new_array, new_array_qa = result[0], result[1]
            else:
                new_array, new_array_qa = result, None

            new_info = self._new_info(rdata, group, outname, self.name)

            if new_array_qa is not None:
                new_info += self._new_info(rdata, group, outname, self.name_qa)
                new_array = np.concatenate([new_array, new_array_qa], axis=-1)

            return new_array, DataFrame(new_info)

        @abstractmethod
        def _run(self, data):
            pass
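
    # Minimal Transformer subclass sketch (illustrative only, not part of the
    # original module): _run receives the raster array and may return either a
    # single array or an (array, qa_array) tuple; in the latter case run()
    # appends the QA bands and their metadata rows automatically. The class
    # name and scaling factor below are hypothetical.
    class _ExampleScaler(Transformer):
        def __init__(self, factor: float = 0.0001, verbose: bool = True):
            super().__init__(name="scaler", verbose=verbose, temporal=False)
            self.factor = factor

        def _run(self, data):
            # Returning a tuple triggers the QA bookkeeping in Transformer.run
            return data * self.factor, np.isfinite(data).astype("float32")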

    class Derivator(SKMapGroupRunner, ABC):
        def __init__(self, verbose: bool = True, temporal=False) -> None:
            super().__init__(verbose=verbose, temporal=temporal)

        def run(
            self,
            rdata: RasterData,
            group_list: list,
            ginfo_list: list,
            outname: str = None,
        ):
            """
            Execute the derivation approach.
            """
            kwargs = {
                "rdata": rdata,
                "group_list": group_list,
                "ginfo_list": ginfo_list,
            }
            if outname is not None:
                kwargs["outname"] = outname

            new_array, new_info = self._run(**kwargs)
            return new_array, new_info

        @abstractmethod
        def _run(
            self, rdata: RasterData, group_list: list, ginfo_list: list, outname: str
        ):
            pass

    class Filler(Transformer, ABC):
        def __init__(self, name: str, verbose: bool = True, temporal=False) -> None:
            super().__init__(name=name, verbose=verbose, temporal=temporal)

        def _n_gaps(self, data=None):
            nan_data = np.isnan(data)
            gap_mask = np.logical_not(np.all(nan_data, axis=-1))
            return np.sum(nan_data[gap_mask].astype("int"))

        def _run(self, data):
            """
            Execute the gapfilling approach.
            """
            n_gaps = None
            if self.verbose:
                n_gaps = self._n_gaps(data)
                self._verbose(f"There are {n_gaps} gaps in {data.shape}")

            start = time.time()
            result = self._gapfill(data)

            if isinstance(result, tuple) and len(result) >= 2:
                filled = result[0]
            else:
                filled = result

            if self.verbose:
                r_gaps = self._n_gaps(filled)
                gaps_perc = (n_gaps - r_gaps) / n_gaps
                self._verbose(
                    f"{gaps_perc * 100:.2f}% of the gaps filled in {(time.time() - start):.2f} secs"
                )
                if gaps_perc < 1:
                    self._verbose(f"Remaining gaps: {r_gaps}")

            return result

        @abstractmethod
        def _gapfill(self, data):
            pass
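
    # Illustrative sketch of the Filler._n_gaps convention (not part of the
    # original module): pixels that are NaN over the whole time axis count as
    # nodata and are excluded; only NaNs inside partially valid series count
    # as gaps.
    def _demo_n_gaps():
        data = np.array([[[1.0, np.nan, 3.0],            # one real gap
                          [np.nan, np.nan, np.nan]]])    # all-NaN pixel: ignored
        nan_data = np.isnan(data)
        gap_mask = np.logical_not(np.all(nan_data, axis=-1))
        return int(np.sum(nan_data[gap_mask]))  # -> 1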

    class SircleTransformer(Transformer):
        """
        :param wv_0: convolution coefficient associated with the present
        :param wv_f: convolution coefficients associated with the future
        :param wv_p: convolution coefficients associated with the past
        :param wm_0: mask-weighting coefficient associated with the present (defaults to wv_0)
        :param wm_f: mask-weighting coefficients associated with the future (default to wv_f)
        :param wm_p: mask-weighting coefficients associated with the past (default to wv_p)
        :param use_mask: decide whether to use a mask for weights renormalization
        :param return_den: if the mask is used, also return the denominator matrix of the Hadamard division
        :param keep_original_values: keep the original (non-gap) values in the output
        :param S: optional N_timeseries x N_samples matrix where per-element scalings are stored
        :param backend: computation backend for the convolution ('dense', 'sparse' or 'FFT')
        :param n_jobs: number of CPUs to be used in parallel
        """

        def __init__(
            self,
            wv_0: float,
            wv_f=[],
            wv_p=[],
            wm_0: float = None,
            wm_f=[],
            wm_p=[],
            use_mask: bool = False,
            return_den: bool = False,
            keep_original_values: bool = True,
            S=[],
            backend: str = "dense",
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(name="SIRCLE", verbose=verbose, temporal=True)

            self.wv_0 = wv_0
            self.wv_f = wv_f
            self.wv_p = wv_p
            self.wm_0 = wm_0
            self.wm_f = wm_f
            self.wm_p = wm_p
            self.use_mask = use_mask
            self.return_den = return_den
            self.keep_original_values = keep_original_values
            self.S = S
            self.backend = backend
            self.n_jobs = n_jobs

        def _run(self, data):
            # Convolution and normalization
            try:
                import mkl

                mkl.set_num_threads(self.n_jobs)
            except ImportError:
                pass

            np.seterr(divide="ignore", invalid="ignore")

            orig_shape = data.shape
            data = np.reshape(data, (data.shape[0] * data.shape[1], data.shape[2]))

            # @TODO avoid this and include the multiband case
            if data.ndim > 1:
                n_t = data.shape[0]
                n_s = data.shape[1]
            else:
                n_t = 1
                n_s = data.size

            assert self.wv_p.ndim == 1, "wv_p must be a 1D array"
            assert self.wv_f.ndim == 1, "wv_f must be a 1D array"

            if self.use_mask:
                if self.wm_0 is None:
                    self.wm_0 = self.wv_0
                if self.wm_p == []:
                    self.wm_p = self.wv_p.copy()
                else:
                    assert self.wm_p.ndim == 1, "wm_p must be a 1D array"
                if self.wm_f == []:
                    self.wm_f = self.wv_f.copy()
                else:
                    assert self.wm_f.ndim == 1, "wm_f must be a 1D array"
                assert self.wm_p.shape == self.wv_p.shape, (
                    "wm_p must be of the same size of wv_p"
                )
                assert self.wm_f.shape == self.wv_f.shape, (
                    "wm_f must be of the same size of wv_f"
                )

            n_p = self.wv_p.size
            n_f = self.wv_f.size
            n_e = n_s + max(n_p, n_f)

            assert max(n_p, n_f) <= n_s, (
                "the sizes of wv_p and wv_f must not exceed the length of the time series"
            )

            V_e = np.zeros((n_t, n_e), dtype=np.float64, order="F")
            V_e[:, 0:n_s] = data

            if self.use_mask:
                valid_mask = ~np.isnan(V_e).astype(bool)
                valid_mask[:, n_s:] = False
                V_e[~valid_mask] = 0.0
                M_e = valid_mask.astype(np.float64)

            if self.backend == "dense":
                wv_e = np.zeros((n_e,))
                wv_e[0] = self.wv_0
                wv_e[1 : n_f + 1] = self.wv_f
                if n_p > 0:
                    wv_e[-n_p:] = self.wv_p
                Wv_e = circulant(wv_e)
                Vt_e = np.dot(V_e, Wv_e)[:, 0:n_s]
                if self.use_mask:
                    wm_e = np.zeros((n_e,))
                    wm_e[0] = self.wm_0
                    wm_e[1 : n_f + 1] = self.wm_f
                    if n_p > 0:
                        wm_e[-n_p:] = self.wm_p
                    Wm_e = circulant(wm_e)
                    Mt_e = np.dot(M_e, Wm_e)[:, 0:n_s]
            elif self.backend == "sparse":
                n_pad = max(len(self.wv_f), len(self.wv_p))
                wv_e = np.zeros(n_pad * 2 + 1)
                wv_e[n_pad] = self.wv_0
                wv_e[n_pad - len(self.wv_f) : n_pad] = self.wv_f[::-1]
                wv_e[n_pad + 1 : n_pad + 1 + len(self.wv_p)] = self.wv_p[::-1]
                Vt_e = convolve1d(V_e[:, 0:n_s], wv_e, mode="constant", cval=0, axis=-1)
                if self.use_mask:
                    wm_e = np.zeros(n_pad * 2 + 1)
                    wm_e[n_pad] = self.wm_0
                    wm_e[n_pad - len(self.wm_f) : n_pad] = self.wm_f[::-1]
                    wm_e[n_pad + 1 : n_pad + 1 + len(self.wm_p)] = self.wm_p[::-1]
                    Mt_e = convolve1d(
                        M_e[:, 0:n_s], wm_e, mode="constant", cval=0, axis=-1
                    )
            elif self.backend == "FFT":
                wv_e = np.zeros((n_e,))
                wv_e[0] = self.wv_0
                wv_e[1 : n_p + 1] = self.wv_p[::-1]
                if n_f > 0:
                    wv_e[-n_f:] = self.wv_f[::-1]
                if self.use_mask:
                    wm_e = np.zeros((n_e,))
                    wm_e[0] = self.wm_0
                    wm_e[1 : n_p + 1] = self.wm_p[::-1]
                    if n_f > 0:
                        wm_e[-n_f:] = self.wm_f[::-1]

                def parallel_calculation(i):
                    pyfftw.config.NUM_THREADS = self.n_jobs
                    in_fft = pyfftw.empty_aligned(n_e, dtype="float64")
                    in_ifft = pyfftw.empty_aligned(n_e, dtype="complex128")
                    in_fft[:] = wv_e
                    Wv_fft = pyfftw.interfaces.numpy_fft.fft(in_fft)
                    in_fft[:] = V_e[i, :]
                    V_fft = pyfftw.interfaces.numpy_fft.fft(in_fft)
                    in_ifft[:] = V_fft * Wv_fft
                    return pyfftw.interfaces.numpy_fft.ifft(in_ifft).real

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    results = list(executor.map(parallel_calculation, range(n_t)))
                Vt_e = np.array(results)[:, 0:n_s]

                if self.use_mask:

                    def parallel_calculation(i):
                        pyfftw.config.NUM_THREADS = self.n_jobs
                        in_fft = pyfftw.empty_aligned(n_e, dtype="float64")
                        in_ifft = pyfftw.empty_aligned(n_e, dtype="complex128")
                        in_fft[:] = wm_e
                        Wm_fft = pyfftw.interfaces.numpy_fft.fft(in_fft)
                        in_fft[:] = M_e[i, :]
                        M_fft = pyfftw.interfaces.numpy_fft.fft(in_fft)
                        in_ifft[:] = M_fft * Wm_fft
                        return pyfftw.interfaces.numpy_fft.ifft(in_ifft).real

                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        results = list(executor.map(parallel_calculation, range(n_t)))
                    Mt_e = np.array(results)[:, 0:n_s]
            else:
                raise ValueError("Invalid backend specified")

            if self.use_mask:
                Vt_e = Vt_e / Mt_e
                if self.keep_original_values:
                    Vt_e[valid_mask[:, 0:n_s]] = V_e[valid_mask]
                min_non_zero = np.min(wm_e[wm_e != 0.0])
                assert min_non_zero > pow(10.0, -np.finfo(Mt_e.dtype).precision - 1), (
                    "Use larger values for the non-zero elements of the mask weighting vector, "
                    "otherwise numerical noise could make actual zeros indistinguishable from numerical zeros."
                )
                numerical_zeros_mask = np.abs(Mt_e) < min_non_zero
                Vt_e[numerical_zeros_mask] = np.nan
                if self.return_den:
                    Mt_e[numerical_zeros_mask] = 0.0
                    n_pad = max(len(self.wv_f), len(self.wv_p))
                    wm_e = np.zeros(n_pad * 2 + 1)
                    wm_e[n_pad] = self.wm_0
                    wm_e[n_pad - len(self.wm_f) : n_pad] = self.wm_f[::-1]
                    wm_e[n_pad + 1 : n_pad + 1 + len(self.wm_p)] = self.wm_p[::-1]
                    tmp_vec = np.zeros((n_e,), dtype=np.float64)
                    tmp_vec[0:n_s] = 1.0
                    norm_mask = max(
                        convolve1d(tmp_vec, wm_e, mode="constant", cval=0, axis=-1)
                    )
                    # Normalize by the best achievable weight
                    Mt_e[:, :n_s] = Mt_e[:, :n_s] / norm_mask
                    if self.keep_original_values:
                        Mt_e[valid_mask[:, 0:n_s]] = 1.0
                    return np.reshape(Vt_e, orig_shape), np.reshape(Mt_e, orig_shape)
                else:
                    return np.reshape(Vt_e, orig_shape)
            else:
                return np.reshape(Vt_e, orig_shape)
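
    # Illustrative sketch (not part of the original module): the 'dense'
    # backend of SircleTransformer builds a circulant matrix from
    # (wv_0, wv_f, wv_p) and applies it as a single matrix product, i.e. a
    # circular weighted moving average over each time series.
    def _demo_sircle_dense():
        wv_e = np.zeros(5)
        wv_e[0] = 1.0   # wv_0: weight of the present sample
        wv_e[1] = 0.5   # wv_f: one future sample
        wv_e[-1] = 0.5  # wv_p: one past sample
        ts = np.array([[0.0, 2.0, 4.0, 2.0, 0.0]])
        return np.dot(ts, circulant(wv_e))  # -> [[1., 4., 6., 4., 1.]]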

    class SeasConvFill(Filler):
        """
        :param season_size: number of images per year
        :param att_seas: dB of attenuation for images of opposite seasonality
        :param att_env: dB of attenuation for temporally far images
        :param conv_vect_future: optional precomputed convolution vector for future images
        :param conv_vect_past: optional precomputed convolution vector for past images
        :param return_qa: also return the quality assessment layer
        :param n_jobs: number of CPUs to be used in parallel
        """

        def __init__(
            self,
            season_size: int,
            att_seas: float = 60,
            att_env: float = 20,
            conv_vect_future=[],
            conv_vect_past=[],
            return_qa: bool = False,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(name="seasconv", verbose=verbose, temporal=True)

            self.season_size = season_size
            self.return_qa = return_qa
            self.att_seas = att_seas
            self.att_env = att_env
            self.conv_vect_future = conv_vect_future
            self.conv_vect_past = conv_vect_past
            self.n_jobs = n_jobs

        def _compute_conv_mat_row(self, n_imag):
            # Compute a triangular basis function with yearly periodicity
            conv_mat_row = np.zeros((n_imag))
            base_func = np.zeros((self.season_size,))
            period_y = self.season_size / 2.0
            slope_y = self.att_seas / 10 / period_y
            for i in np.arange(self.season_size):
                if i <= period_y:
                    base_func[i] = -slope_y * i
                else:
                    base_func[i] = slope_y * (i - period_y) - self.att_seas / 10

            # Compute the envelope to attenuate temporally far images
            env_func = np.zeros((n_imag,))
            delta_e = n_imag
            slope_e = self.att_env / 10 / delta_e
            for i in np.arange(delta_e):
                env_func[i] = -slope_e * i

            conv_mat_row = 10.0 ** (np.resize(base_func, n_imag) + env_func)
            return conv_mat_row

        def _fftw_toeplitz_matmul(self, data, valid_mask):
            tmp_norm_vec = np.ones((data.shape[0], 1))
            tmp_norm_vec[0] = 0.0
            norm_vec = matmul_toeplitz(
                (self.conv_vect_past, self.conv_vect_future),
                tmp_norm_vec,
                check_finite=False,
                workers=None,
            )
            filled = matmul_toeplitz(
                (self.conv_vect_past, self.conv_vect_future),
                data,
                check_finite=False,
                workers=None,
            )
            filled_qa = matmul_toeplitz(
                (self.conv_vect_past, self.conv_vect_future),
                valid_mask,
                check_finite=False,
                workers=None,
            )
            conv_vec = np.concatenate(
                (self.conv_vect_past, self.conv_vect_future[-1:0:-1])
            )
            nz_conv_vec = conv_vec[conv_vec > 0]
            min_conv_val = np.min(nz_conv_vec)
            filled = filled / filled_qa
            no_fill_mask = filled_qa < min_conv_val
            filled_qa /= np.max(norm_vec)
            filled[no_fill_mask] = np.nan
            filled_qa[no_fill_mask] = 0
            return filled, filled_qa

        def _gapfill(self, data):
            # Convolution and normalization
            try:
                import mkl

                mkl.set_num_threads(self.n_jobs)
            except ImportError:
                pass

            np.seterr(divide="ignore", invalid="ignore")

            orig_shape = data.shape
            data = np.reshape(
                data, (data.shape[0] * data.shape[1], data.shape[2])
            ).T.copy()

            valid_mask = ~np.isnan(data)
            data[~valid_mask] = 0.0
            n_imag = data.shape[0]

            if self.season_size * 2 > n_imag:
                warnings.warn(
                    "Less than two years of images available: the time series "
                    "reconstruction will not take advantage of seasonality"
                )

            half_conv_vect = self._compute_conv_mat_row(n_imag)
            if self.conv_vect_future == []:
                self.conv_vect_future = half_conv_vect
            if self.conv_vect_past == []:
                self.conv_vect_past = half_conv_vect

            filled, filled_qa = self._fftw_toeplitz_matmul(
                data, valid_mask.astype(float)
            )

            filled[valid_mask] = data[valid_mask]
            filled_qa[valid_mask] = 1.0
            filled_qa = filled_qa * 100
            filled_qa[filled_qa == 0.0] = np.nan

            # Return the reconstructed time series and the quality assessment layer
            if self.return_qa:
                return np.reshape(filled.T, orig_shape), np.reshape(
                    filled_qa.T, orig_shape
                )
            else:
                return np.reshape(filled.T, orig_shape)
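
    # Illustrative sketch (not part of the original module): the SeasConvFill
    # convolution vector is 10**(base + envelope), where `base` is triangular
    # with yearly period (att_seas dB at the opposite season) and `envelope`
    # decays linearly with temporal distance (att_env dB over the series).
    # The values below reproduce _compute_conv_mat_row(4) for season_size=4,
    # att_seas=60, att_env=20.
    def _demo_seasconv_weights():
        base = np.array([0.0, -3.0, -6.0, -3.0])  # -att_seas/10 at opposite season
        env = np.array([0.0, -0.5, -1.0, -1.5])   # -(att_env/10/n_imag) * i
        return 10.0 ** (base + env)  # -> [1.0, 3.16e-04, 1e-07, 3.16e-05]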

    class WhittakerSmooth(Transformer):
        """
        https://github.com/mhvwerts/whittaker-eilers-smoother/blob/master/whittaker_smooth.py
        """

        def __init__(
            self, lmbd=1, d=2, n_jobs: int = os.cpu_count(), verbose=False
        ) -> None:
            super().__init__(name="whittaker", verbose=verbose, temporal=True)
            self.lmbd = lmbd
            self.d = d
            self.n_jobs = n_jobs

        def _speyediff(self, N, d, format="csc"):
            """
            (utility function)
            Construct a d-th order sparse difference matrix based on
            an initial N x N identity matrix.

            Final matrix (N-d) x N
            """
            assert not (d < 0), "d must be non-negative"
            shape = (N - d, N)
            diagonals = np.zeros(2 * d + 1)
            diagonals[d] = 1.0
            for i in range(d):
                diff = diagonals[:-1] - diagonals[1:]
                diagonals = diff
            offsets = np.arange(d + 1)
            spmat = sparse.diags(diagonals, offsets, shape, format=format)
            return spmat

        def _process_ts(self, data):
            y = data.reshape(-1).copy()
            n_gaps = np.sum((np.isnan(y)).astype("int"))
            if n_gaps == 0:
                r = splu(self.coefmat).solve(y)
                return r
            else:
                return y

        def _run(self, data):
            m = data.shape[-1]
            E = sparse.eye(m, format="csc")
            D = self._speyediff(m, self.d, format="csc")
            self.coefmat = E + self.lmbd * D.conj().T.dot(D)
            return parallel.apply_along_axis(
                self._process_ts, 2, data, n_jobs=self.n_jobs
            )
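
    # Illustrative dense equivalent (not part of the original module) of the
    # per-series system WhittakerSmooth solves: (I + lmbd * D'D) z = y, with D
    # the d-th order difference matrix produced by _speyediff. Series with any
    # NaN are passed through unsmoothed by _process_ts.
    def _demo_whittaker_dense(lmbd=1.0, d=2):
        y = np.array([1.0, 2.0, 8.0, 2.0, 1.0])
        D = np.diff(np.eye(y.size), n=d, axis=0)  # (N-d) x N difference matrix
        return np.linalg.solve(np.eye(y.size) + lmbd * (D.T @ D), y)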

    class TimeEnum(Enum):
        MONTHLY = 1
        MONTHLY_15P = 2
        MONTHLY_LONGTERM = 3
        BIMONTHLY = 4
        BIMONTHLY_15P = 5
        BIMONTHLY_LONGTERM = 6
        QUARTERLY = 7
        YEARLY = 8

    class TimeAggregate(Derivator):
        def __init__(
            self,
            time: list = [TimeEnum.YEARLY, TimeEnum.MONTHLY_LONGTERM],
            operations: List = ["p25", "p50", "p75", "std"],
            rename_operations: dict = {},
            post_expression: str = None,
            date_overlap: bool = False,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(verbose=verbose, temporal=True)

            self.time = time
            self.operations = operations
            self.rename_operations = rename_operations
            self.date_overlap = date_overlap
            self.n_jobs = n_jobs
            self.post_expression = post_expression

            self.percs = []
            self.bn_ops = []

            for op in self.operations:
                if op[0] == "p":
                    self.percs.append(int(op[1:]))
                else:
                    method = f"nan{op}"
                    if not hasattr(bn, method):
                        raise Exception(
                            f"Operation {method} is invalid, since bottleneck.{method} does not exist."
                        )
                    self.bn_ops.append((op, getattr(bn, method)))

        def _op_name(self, op):
            if op in self.rename_operations:
                return self.rename_operations[op]
            else:
                return op

        def _aggregate(self, new_idx, ref_array, array_idx, group, tm, dt1, dt2):
            array = load_memmap(**ref_array)

            ops = []
            _idxs = []

            for op, method in self.bn_ops:
                array[:, :, new_idx : new_idx + 1] = method(
                    array[:, :, array_idx], axis=-1
                )[:, :, np.newaxis]
                _idxs.append(new_idx)
                new_idx += 1
                ops.append(self._op_name(f"{op}"))

            if len(self.percs) > 0:
                perc_idx = list(range(new_idx, new_idx + len(self.percs)))
                in_array = array[:, :, array_idx]  # array[:,:,array_idx].copy()
                array[:, :, perc_idx] = nan_percentile(
                    in_array.transpose((2, 0, 1)), q=self.percs
                ).transpose((1, 2, 0))
                new_idx += len(self.percs)
                _idxs += perc_idx
                for p in self.percs:
                    ops.append(self._op_name(f"p{p}"))

            if self.post_expression is not None and len(_idxs) > 0:
                for idx in _idxs:
                    array[:, :, idx] = ne.evaluate(
                        self.post_expression,
                        local_dict={"new_array": array[:, :, idx]},
                    )

            return (group, ops, tm, dt1, dt2)

        def _args_monthly(
            self, rdata, group, start_dt, end_dt, date_format, months=1, daysp=None
        ):
            args = []
            ref_array = ref_memmap(rdata.array)

            for dt1, dt2 in date_range(
                f"{start_dt.year}0101",
                f"{end_dt.year}1201",
                "months",
                months,
                return_str=True,
                ignore_29feb=True,
                date_format=date_format,
            ):
                dt1a, dt2a = dt1, dt2
                if daysp is not None:
                    dt1a = datetime.strptime(dt1, date_format)
                    dt2a = datetime.strptime(dt2, date_format)
                    dt1a = (dt1a - relativedelta(days=daysp)).strftime(date_format)
                    dt2a = (dt2a + relativedelta(days=daysp)).strftime(date_format)

                tm = ""
                array_idx = rdata.filter_date(
                    dt1a,
                    dt2a,
                    return_idx=True,
                    date_format=date_format,
                    date_overlap=self.date_overlap,
                )

                if len(array_idx) > 0:
                    args += [
                        (
                            ref_array,
                            array_idx,
                            group,
                            tm,
                            datetime.strptime(dt1, date_format),
                            datetime.strptime(dt2, date_format),
                        )
                    ]

            return args

        def _args_yearly(self, rdata, group, start_dt, end_dt, date_format):
            args = []
            ref_array = ref_memmap(rdata.array)

            for dt1, dt2 in date_range(
                f"{start_dt.year}0101",
                f"{end_dt.year}1201",
                "years",
                1,
                return_str=True,
                ignore_29feb=False,
                date_format=date_format,
            ):
                tm = "yearly"
                array_idx = rdata.filter_date(
                    dt1,
                    dt2,
                    return_idx=True,
                    date_format=date_format,
                    date_overlap=self.date_overlap,
                )

                if len(array_idx):
                    args += [
                        (
                            ref_array,
                            array_idx,
                            group,
                            tm,
                            datetime.strptime(dt1, date_format),
                            datetime.strptime(dt2, date_format),
                        )
                    ]

            return args

        def _args_monthly_longterm(self, rdata, group, start_dt, end_dt, date_format):
            args = []
            ref_array = ref_memmap(rdata.array)

            for month in range(1, 13):
                array_idx_list = []
                month = str(month).zfill(2)

                for dt1, dt2 in date_range(
                    f"{start_dt.year}{month}01",
                    f"{end_dt.year}{month}01",
                    "months",
                    1,
                    date_offset=11,
                    return_str=True,
                    ignore_29feb=False,
                    date_format=date_format,
                ):
                    array_idx = rdata.filter_date(
                        dt1,
                        dt2,
                        return_idx=True,
                        date_format=date_format,
                        date_overlap=self.date_overlap,
                    )
                    if len(array_idx):
                        array_idx_list += array_idx

                tm = f"m{month}"
                if len(array_idx_list) > 0:
                    # args += [ (np.concatenate(in_array, axis=-1), tm, start_dt, end_dt) ]
                    args += [(ref_array, array_idx_list, group, tm, start_dt, end_dt)]

            return args

        def _run(
            self,
            rdata: RasterData,
            group_list: list,
            ginfo_list: list,
            outname: str = "skmap_aggregate.{gr}_{op}_{dt}",
        ):
            args = []

            for group, ginfo in zip(group_list, ginfo_list):
                date_format = "%Y%m%d"
                start_dt = ginfo[RasterData.START_DT_COL].min()
                end_dt = ginfo[RasterData.END_DT_COL].max()
                rdata._active_group = group

                for t in self.time:
                    if t == TimeEnum.MONTHLY_LONGTERM:
                        args += self._args_monthly_longterm(
                            rdata, group, start_dt, end_dt, date_format
                        )
                    elif t == TimeEnum.YEARLY:
                        args += self._args_yearly(
                            rdata, group, start_dt, end_dt, date_format
                        )
                    elif t == TimeEnum.MONTHLY:
                        args += self._args_monthly(
                            rdata, group, start_dt, end_dt, date_format, 1
                        )
                    elif t == TimeEnum.MONTHLY_15P:
                        args += self._args_monthly(
                            rdata, group, start_dt, end_dt, date_format, 1, 15
                        )
                    elif t == TimeEnum.BIMONTHLY:
                        args += self._args_monthly(
                            rdata, group, start_dt, end_dt, date_format, 2
                        )
                    elif t == TimeEnum.BIMONTHLY_15P:
                        args += self._args_monthly(
                            rdata, group, start_dt, end_dt, date_format, 2, 15
                        )
                    elif t == TimeEnum.QUARTERLY:
                        args += self._args_monthly(
                            rdata, group, start_dt, end_dt, date_format, 3
                        )
                    else:
                        raise Exception(f"Aggregation by {t} not implemented")

            n_new_rasters = len(args) * len(self.operations)
            idx_offset = rdata._idx_offset()

            _args = []
            for idx, arg in zip(range(0, n_new_rasters, len(self.operations)), args):
                _arg = list(arg)
                _arg.insert(0, idx_offset + idx)
                _args.append(tuple(_arg))
            args = _args

            new_info = []

            self._verbose(
                f"Computing {len(args)} "
                + f"time aggregates from {start_dt.year} to {end_dt.year}"
            )

            for group, ops, tm, dt1, dt2 in parallel.job(
                self._aggregate, args, joblib_args={"backend": "multiprocessing"}
            ):
                for op in ops:
                    _group = group
                    if tm != "":
                        _group = f"{group}.{tm}"

                    rdata._active_group = group
                    name = rdata._set_date(outname, dt1, dt2, op=op, gr=_group)
                    new_group = f"{_group}.{op}"

                    new_info.append(
                        rdata._new_info_row(
                            rdata.base_raster,
                            name=name,
                            group=new_group,
                            dates=[dt1, dt2],
                        )
                    )

            rdata._active_group = None

            return None, DataFrame(new_info)
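
    # Illustrative sketch (not part of the original module): TimeAggregate maps
    # each non-percentile operation to the matching NaN-aware bottleneck
    # reducer (e.g. 'std' -> bn.nanstd) applied along the time axis, while
    # 'pNN' operations go through nan_percentile.
    def _demo_aggregate_ops():
        data = np.array([[[1.0, np.nan, 3.0, 5.0]]])  # one pixel, four dates
        # std over the valid samples is ~1.633; the mean is 3.0
        return bn.nanstd(data, axis=-1), bn.nanmean(data, axis=-1)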

    class PeakAnalysis(Derivator):
        def __init__(
            self,
            season_size: int,
            min_height: float = 0.5,
            min_prominence: float = 0.2,
            min_distance: float = 1.0,
            scale_expr: str = None,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(verbose=verbose, temporal=True)

            self.season_size = season_size
            self.min_height = min_height
            self.min_prominence = min_prominence
            self.min_distance = min_distance
            self.scale_expr = scale_expr
            self.n_jobs = n_jobs

            self.name_misc = [
                ("peaks", "m", 100),
                ("peaks", "n", 1),
            ]
            self.scale_arr = np.array([scale for _, _, scale in self.name_misc])

        def _find_peaks(self, data):
            if self.scale_expr is not None:
                data = ne.evaluate(self.scale_expr, {"data": data})

            has_nan = np.sum(np.isnan(data).astype("int"))
            ts_size = data.shape[0]

            idxs = [
                (i, i + self.season_size) for i in range(0, ts_size, self.season_size)
            ]
            # Default to NaN so gappy or peakless series do not emit garbage
            result = np.full(len(idxs) * 2, np.nan)

            if has_nan == 0:
                peaks, _ = find_peaks(
                    data,
                    height=self.min_height,
                    prominence=self.min_prominence,
                    distance=self.min_distance,
                )
                _peaks = list(peaks)

                o2 = 0
                if len(peaks) > 0:
                    for i0, i1 in idxs:
                        seas_peaks = list((i for i in range(i0, i1) if i in _peaks))
                        nos = len(seas_peaks)

                        mean, los = np.nan, 0
                        if nos > 0:
                            mean = np.mean(data[seas_peaks])
                            los = np.sum(data[i0:i1] > mean * 0.5) / self.season_size

                        result[o2] = los * self.scale_arr[0]
                        result[o2 + 1] = nos * self.scale_arr[1]
                        o2 += 2

            return result

        def _unpack(self, i0_0, i0_1, i2, ref_array, idx_offset) -> bool:
            array = load_memmap(**ref_array)
            result = np.apply_along_axis(self._find_peaks, 2, array[i0_0:i0_1, :, i2])
            o2 = list(range(idx_offset, idx_offset + result.shape[2]))
            array[i0_0:i0_1, :, o2] = result
            return True

        def _args(self, rdata, ginfo):
            ref_array = ref_memmap(rdata.array)
            max_i0 = rdata.array.shape[0]
            rows_per_job = math.ceil(max_i0 / self.n_jobs)
            idx_offset = rdata._idx_offset()

            args = []
            for i in range(0, max_i0, rows_per_job):
                i0_0, i0_1 = i, (i + rows_per_job)
                if i0_1 > max_i0:
                    i0_1 = max_i0
                i2 = ginfo.index
                args.append((i0_0, i0_1, i2, ref_array, idx_offset))

            return args

        def _run(
            self,
            rdata: RasterData,
            group_list: list,
            ginfo_list: list,
            outname: str = "skmap_{gr}.{nm}_{pr}_{dt}",
        ):
            new_info = []

            for group, ginfo in zip(group_list, ginfo_list):
                rdata._active_group = group

                start_dt_min = ginfo[RasterData.START_DT_COL].min()
                end_dt_max = ginfo[RasterData.END_DT_COL].max()
                ts_size = ginfo.shape[0]

                args = self._args(rdata, ginfo)

                for r in parallel.job(
                    self._unpack,
                    args,
                    n_jobs=self.n_jobs,
                    joblib_args={"backend": "multiprocessing"},
                ):
                    continue

                for i in range(0, ts_size, self.season_size):
                    _i = int(i / self.season_size)
                    i0, i1 = (i, i + self.season_size - 1)

                    start_dt_min = ginfo.iloc[i0][RasterData.START_DT_COL]
                    end_dt_max = ginfo.iloc[i1][RasterData.END_DT_COL]

                    for j, (nm, pr, _) in zip(
                        range(_i, _i + len(self.name_misc)), self.name_misc
                    ):
                        name = rdata._set_date(
                            outname, start_dt_min, end_dt_max, nm=nm, pr=pr, gr=group
                        )
                        new_group = f"{group}.{nm}.{pr}"

                        new_info.append(
                            rdata._new_info_row(
                                rdata.base_raster,
                                group=new_group,
                                name=name,
                                dates=[start_dt_min, end_dt_max],
                            )
                        )

            return None, DataFrame(new_info)
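
    # Illustrative sketch (not part of the original module): PeakAnalysis
    # relies on scipy.signal.find_peaks with height/prominence/distance
    # thresholds, then summarizes the detected peaks per season window
    # (length of season and number of peaks).
    def _demo_find_peaks():
        data = np.array([0.1, 0.8, 0.2, 0.1, 0.9, 0.2, 0.1, 0.3])
        peaks, _ = find_peaks(data, height=0.5, prominence=0.2, distance=1.0)
        return peaks  # -> array([1, 4])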

    class SlopeAnalysis(Derivator):
        def __init__(
            self,
            scale_expr: str = None,
            scaling: float = 1.0,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(verbose=verbose, temporal=True)

            self.scale_expr = scale_expr
            self.scaling = scaling
            self.n_jobs = n_jobs

        def _theil_slopes(self, data):
            if self.scale_expr is not None:
                data = ne.evaluate(self.scale_expr, {"data": data})

            has_nan = np.sum(np.isnan(data).astype("int"))
            result = np.empty(1)

            if has_nan == 0:
                result[0], _, _, _ = theilslopes(data, np.arange(0, data.shape[0]))
            else:
                result[0] = np.nan

            result[0] *= self.scaling
            return result

        def _unpack(self, i0_0, i0_1, i2, ref_array, idx_offset) -> bool:
            array = load_memmap(**ref_array)
            result = np.apply_along_axis(self._theil_slopes, 2, array[i0_0:i0_1, :, i2])
            o2 = list(range(idx_offset, idx_offset + result.shape[2]))
            array[i0_0:i0_1, :, o2] = result
            return True

        def _args(self, rdata, ginfo):
            ref_array = ref_memmap(rdata.array)
            max_i0 = rdata.array.shape[0]
            rows_per_job = math.ceil(max_i0 / self.n_jobs)
            idx_offset = rdata._idx_offset()

            args = []
            for i in range(0, max_i0, rows_per_job):
                i0_0, i0_1 = i, (i + rows_per_job)
                if i0_1 > max_i0:
                    i0_1 = max_i0
                i2 = ginfo.index
                args.append((i0_0, i0_1, i2, ref_array, idx_offset))

            return args

        def _run(
            self,
            rdata: RasterData,
            group_list: list,
            ginfo_list: list,
            outname: str = "{gr}.{nm}_{pr}_{dt}",
        ):
            new_info = []

            for group, ginfo in zip(group_list, ginfo_list):
                rdata._active_group = group

                start_dt_min = ginfo[RasterData.START_DT_COL].min()
                end_dt_max = ginfo[RasterData.END_DT_COL].max()

                args = self._args(rdata, ginfo)

                for r in parallel.job(
                    self._unpack,
                    args,
                    n_jobs=self.n_jobs,
                    joblib_args={"backend": "multiprocessing"},
                ):
                    continue

                nm = "theilslopes"
                pr = "m"

                name = rdata._set_date(
                    outname, start_dt_min, end_dt_max, nm=nm, pr=pr, gr=group
                )
                new_group = f"{group}.{nm}.{pr}"

                new_info.append(
                    rdata._new_info_row(
                        rdata.base_raster,
                        group=new_group,
                        name=name,
                        dates=[start_dt_min, end_dt_max],
                    )
                )

            return None, DataFrame(new_info)
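
    # Illustrative sketch (not part of the original module): per pixel,
    # SlopeAnalysis fits a Theil-Sen slope against the time-step index and
    # multiplies it by `scaling`; any NaN in the series yields NaN instead.
    def _demo_theil_slope(scaling=1.0):
        data = np.array([1.0, 2.0, 2.5, 4.0, 5.0])
        slope, _, _, _ = theilslopes(data, np.arange(data.shape[0]))
        return slope * scaling  # -> 1.0 (slope per time step)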

    class FindMinMax(Derivator):
        def __init__(
            self,
            scale_expr: str = None,
            season_size: int = None,
            scaling: float = 1.0,
            min_max: str = None,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(verbose=verbose, temporal=True)

            self.scale_expr = scale_expr
            self.season_size = season_size
            self.min_max = min_max
            self.scaling = scaling
            self.n_jobs = n_jobs

        def _find_min_max(self, data):
            if self.scale_expr is not None:
                data = ne.evaluate(self.scale_expr, {"data": data})

            ts_size = data.shape[0]
            idxs = [
                (i, i + self.season_size) for i in range(0, ts_size, self.season_size)
            ]
            result = np.empty(len(idxs))

            for j, (i0, i1) in enumerate(idxs):
                has_nan = np.sum(np.isnan(data[i0:i1]).astype("int"))
                if has_nan == 0:
                    if self.min_max == "min":
                        result[j] = np.min(data[i0:i1])
                    elif self.min_max == "max":
                        result[j] = np.max(data[i0:i1])
                    else:
                        assert False, "min_max can be either 'min' or 'max'"
                else:
                    result[j] = np.nan
                result[j] *= self.scaling

            return result

        def _unpack(self, i0_0, i0_1, i2, ref_array, idx_offset) -> bool:
            array = load_memmap(**ref_array)
            result = np.apply_along_axis(self._find_min_max, 2, array[i0_0:i0_1, :, i2])
            o2 = list(range(idx_offset, idx_offset + result.shape[2]))
            array[i0_0:i0_1, :, o2] = result
            return True

        def _args(self, rdata, ginfo):
            ref_array = ref_memmap(rdata.array)
            max_i0 = rdata.array.shape[0]
            rows_per_job = math.ceil(max_i0 / self.n_jobs)
            idx_offset = rdata._idx_offset()

            args = []
            for i in range(0, max_i0, rows_per_job):
                i0_0, i0_1 = i, (i + rows_per_job)
                if i0_1 > max_i0:
                    i0_1 = max_i0
                i2 = ginfo.index
                args.append((i0_0, i0_1, i2, ref_array, idx_offset))

            return args

        def _run(
            self,
            rdata: RasterData,
            group_list: list,
            ginfo_list: list,
            outname: str = "{gr}.{nm}_{pr}_{dt}",
        ):
            new_info = []

            for group, ginfo in zip(group_list, ginfo_list):
                rdata._active_group = group

                start_dt_min = ginfo[RasterData.START_DT_COL].min()
                end_dt_max = ginfo[RasterData.END_DT_COL].max()
                ts_size = ginfo.shape[0]

                args = self._args(rdata, ginfo)

                for r in parallel.job(
                    self._unpack,
                    args,
                    n_jobs=self.n_jobs,
                    joblib_args={"backend": "multiprocessing"},
                ):
                    continue

                nm = self.min_max
                pr = "m"

                for i0, i1 in [
                    (i, i + self.season_size - 1)
                    for i in range(0, ts_size, self.season_size)
                ]:
                    start_dt_min = ginfo.iloc[i0][RasterData.START_DT_COL]
                    end_dt_max = ginfo.iloc[i1][RasterData.END_DT_COL]

                    name = rdata._set_date(
                        outname, start_dt_min, end_dt_max, nm=nm, pr=pr, gr=group
                    )
                    new_group = f"{group}.{nm}"

                    new_info.append(
                        rdata._new_info_row(
                            rdata.base_raster,
                            group=new_group,
                            name=name,
                            dates=[start_dt_min, end_dt_max],
                        )
                    )

            return None, DataFrame(new_info)

    class TrendAnalysis(Derivator):
        def __init__(
            self,
            season_size: int,
            season_smoother: int = None,
            trend_smoother: int = None,
            log_rescale: tuple = None,
            scale_factor: int = 10000,
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            super().__init__(verbose=verbose, temporal=True)

            self.season_size = season_size
            self.season_smoother = season_smoother
            self.trend_smoother = trend_smoother
            self.n_jobs = n_jobs

            self.vmin, self.vmax = None, None
            if log_rescale is not None:
                self.vmin, self.vmax = log_rescale

            self.name_misc = [
                ("alpha", "m", scale_factor),
                ("alpha", "sd", scale_factor),
                ("alpha", "tv", scale_factor),
                ("alpha", "pv", scale_factor),
                ("beta", "m", scale_factor),
                ("beta", "sd", scale_factor),
                ("beta", "tv", scale_factor),
                ("beta", "pv", scale_factor),
                ("r2", "m", 100),
            ]

            if self.season_smoother is None:
                self.season_smoother = self.season_size + 1
            if self.trend_smoother is None:
                self.trend_smoother = (2 * self.season_size) + 1

        def _trend_regression(self, data):
            has_nan = np.sum(np.isnan(data).astype("int"))
            ts_size = data.shape[0]
            out_size = ts_size + 9  # fixed number of OLS return values

            if has_nan == 0:
                if np.std(data) == 0:
                    nan_result = np.empty(out_size)
                    nan_result[:] = np.nan
                    return nan_result

                res = STL(
                    data.copy(),
                    period=self.season_size,
                    seasonal=self.season_smoother,
                    trend=self.trend_smoother,
                    robust=True,
                ).fit()

                y = res.trend
                if self.vmin is not None:
                    y[y > self.vmax] = self.vmax
                    y[y < self.vmin] = self.vmin
                    y = log1p(y / self.vmax)

                y_size = y.shape[0]
                X = np.array(range(0, y_size)) / y_size
                X = sm.add_constant(X)

                model = sm.OLS(y, X)
                results = model.fit()

                result_stack = np.stack(
                    [results.params, results.bse, results.tvalues, results.pvalues],
                    axis=1,
                )

                return np.concatenate(
                    [
                        res.trend,
                        result_stack[0, :],
                        result_stack[1, :],
                        np.stack([results.rsquared]),
                    ]
                )
            else:
                nan_result = np.empty(out_size)
                nan_result[:] = np.nan
                return nan_result

        def _run(
            self,
            rdata: RasterData,
            group: str,
            outname: str = "skmap_{gr}.{nm}_{pr}_{dt}",
        ):
            array = rdata._array()
            info = rdata._info()

            start_dt_min = rdata.info[RasterData.START_DT_COL].min()
            end_dt_max = rdata.info[RasterData.END_DT_COL].max()

            new_array = parallel.apply_along_axis(
                self._trend_regression, axis=2, arr=array, n_jobs=self.n_jobs
            )

            new_info = []

            for index, row in info.iterrows():
                start_dt = row[RasterData.START_DT_COL]
                end_dt = row[RasterData.END_DT_COL]

                nm, pr = ("trend", "m")
                name = rdata._set_date(outname, start_dt, end_dt, nm=nm, pr=pr, gr=group)
                new_group = f"{group}.{nm}.{pr}"

                new_info.append(
                    rdata._new_info_row(
                        rdata.base_raster,
                        group=new_group,
                        name=name,
                        dates=[start_dt, end_dt],
                    )
                )

            ts_size = array.shape[2]

            for i, (nm, pr, scale) in zip(
                range(0, len(self.name_misc)), self.name_misc
            ):
                new_array[:, :, ts_size + i] *= scale

                name = rdata._set_date(
                    outname, start_dt_min, end_dt_max, nm=nm, pr=pr, gr=group
                )
                new_group = f"{group}.{nm}.{pr}"

                new_info.append(
                    rdata._new_info_row(
                        rdata.base_raster,
                        group=new_group,
                        name=name,
                        dates=[start_dt_min, end_dt_max],
                    )
                )

            return new_array, DataFrame(new_info)
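
    # Illustrative sketch (not part of the original module): TrendAnalysis
    # runs STL to isolate the trend, then regresses it on a normalized time
    # index with OLS; alpha/beta and their statistics come from the fitted
    # model. The toy series below (four seasons per year, six years) is
    # made up for demonstration.
    def _demo_trend_regression(season_size=4):
        data = np.tile([0.0, 1.0, 0.0, -1.0], 6) + np.linspace(0, 1, 24)
        res = STL(data, period=season_size, robust=True).fit()
        X = sm.add_constant(np.arange(res.trend.size) / res.trend.size)
        ols = sm.OLS(res.trend, X).fit()
        return ols.params, ols.rsquared  # [alpha, beta] and fit quality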

    class Calc(SKMapRunner):
        def __init__(
            self,
            expressions: dict,
            mask_group: str = None,
            mask_values: list = [],
            n_jobs: int = os.cpu_count(),
            verbose=False,
        ) -> None:
            self.n_jobs = n_jobs
            self.expressions = expressions
            self.mask_group = mask_group
            self.mask_values = mask_values
            self.date_cols = [RasterData.START_DT_COL, RasterData.END_DT_COL]

        def _map(self, ref_array, gmap, new_gmap):
            array_dict = {}
            array = load_memmap(**ref_array)

            array_mask = None
            if self.mask_group is not None and len(self.mask_values) >= 1:
                idx = gmap[self.mask_group]
                array_mask = np.isin(array[:, :, idx], self.mask_values)

            for group in gmap.keys():
                idx = gmap[group]
                array_dict[group] = array[:, :, idx]
                if array_mask is not None and group != self.mask_group:
                    array_dict[group][array_mask] = np.nan

            for group in self.expressions.keys():
                expression = self.expressions[group]
                if group in gmap:
                    idx = gmap[group]
                else:
                    idx = new_gmap[group]
                array[:, :, idx] = ne.evaluate(expression, local_dict=array_dict)

            fidx = list(gmap.values())[0]
            return fidx

        def run(self, rdata: RasterData, outname: str = "skmap_{gr}_{dt}"):
            self.groups = list(rdata.info[RasterData.GROUP_COL].unique())
            # n_dates = rdata.info[self.date_cols].value_counts().shape[0]

            self.new_groups = []
            for key in self.expressions.keys():
                if key not in self.groups:
                    self.new_groups.append(key)

            args = []
            ref_array = ref_memmap(rdata.array)
            idx_offset = rdata._idx_offset()
            idx_counter = 0
            n_new_groups = len(self.new_groups)

            for _, rows in rdata.info.groupby(self.date_cols):
                gidx = rows.index
                ggroup = list(rdata.info.iloc[gidx]["group"])

                gmap = {}
                new_gmap = {}
                for idx, group in zip(gidx, ggroup):
                    gmap[group] = idx

                new_group_offset = idx_offset + (idx_counter * n_new_groups)
                for idx, new_group in zip(range(0, n_new_groups), self.new_groups):
                    new_gmap[new_group] = new_group_offset + idx

                args.append((ref_array, gmap, new_gmap))
                idx_counter += 1

            new_info = []

            for fidx in parallel.job(
                self._map,
                args,
                n_jobs=self.n_jobs,
                joblib_args={"backend": "multiprocessing"},
            ):
                row = rdata.info.iloc[fidx]
                start_dt, end_dt = (
                    row[RasterData.START_DT_COL],
                    row[RasterData.END_DT_COL],
                )
                group = row[RasterData.GROUP_COL]

                date_format = rdata.date_args[group]["date_format"]
                date_style = rdata.date_args[group]["date_style"]

                for new_group in self.new_groups:
                    name = rdata._set_date(
                        outname,
                        start_dt,
                        end_dt,
                        date_format=date_format,
                        date_style=date_style,
                        gr=new_group,
                    )

                    new_info.append(
                        rdata._new_info_row(
                            rdata.base_raster,
                            date_format=date_format,
                            date_style=date_style,
                            group=new_group,
                            name=name,
                            dates=[start_dt, end_dt],
                        )
                    )

            return None, DataFrame(new_info)

        def _calc(self, array_dict):
            if self.mask_group is not None and len(self.mask_values) >= 1:
                array_mask = np.isin(array_dict[self.mask_group], self.mask_values)
                for g in array_dict.keys():
                    if g != self.mask_group:
                        array_dict[g][array_mask] = np.nan

            for group in self.expressions.keys():
                expression = self.expressions[group]
                array_dict[group] = ne.evaluate(expression, local_dict=array_dict)

            return array_dict
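
    # Illustrative sketch (not part of the original module): Calc evaluates
    # each expression with numexpr, where the local dict maps group names to
    # their band stacks. The 'red'/'nir' group names and the NDVI expression
    # below are hypothetical examples.
    def _demo_calc_expression():
        local_dict = {
            "red": np.array([[0.1, 0.2]]),
            "nir": np.array([[0.5, 0.6]]),
        }
        return ne.evaluate("(nir - red) / (nir + red)", local_dict=local_dict)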

except ImportError as e:
    from skmap.misc import _warn_deps

    _warn_deps(e, "skmap.io")