Source code for skmap.parallel.utils

"""
Parallelization helpers based in thread/process pools and joblib
"""

import gc
import multiprocessing
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, as_completed, wait
from pathlib import Path
from typing import Any, Callable, Iterator, List, Union

import geopandas as gpd
import numpy
import numpy as np
import psutil
import rasterio
from rasterio.mask import mask
from rasterio.windows import from_bounds
from shapely.geometry import Polygon

from ..misc import ttprint

CPU_COUNT = multiprocessing.cpu_count()
"""
Number of CPU cores available.
"""

executor = None


def _mem_usage() -> float:
    mem = psutil.virtual_memory()
    return mem.used / mem.total


def _run_task(i, task, mem_usage_limit, mem_check_interval, mem_check, verbose, *args):
    while _mem_usage() > mem_usage_limit and mem_check:
        if verbose:
            ttprint(
                f"Memory usage in {_mem_usage():.2f}%, stopping workers for {task.__name__}"
            )
        time.sleep(mem_check_interval)
        gc.collect()

    return task(*args)


[docs] def ThreadGeneratorLazy( worker: Callable, args: Iterator[tuple], max_workers: int = CPU_COUNT, chunk: int = CPU_COUNT * 2, fixed_args: tuple = (), ): """ Execute a function in parallel using a ``ThreadPoolExecutor`` [1]. :param worker: Function to execute in parallel. :param args: Argument iterator where each element is send job of the pool. :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used. :param chunk: Number of chunks to split the parallelization jobs. :param fixed_args: Constant arguments added in ``args`` in each execution of the ``worker`` function. :returns: A generator with the return of all workers :rtype: Generator Examples ======== >>> from skmap.parallel import ThreadGeneratorLazy >>> >>> def worker(i, msg): ... print(f'{i}: {msg}') ... return f'Worker {i} finished' >>> >>> args = iter([ (i,) for i in range(0,5)]) >>> fixed_args = ("I'm running in parallel", ) >>> >>> for result in ThreadGeneratorLazy(worker, args, fixed_args=fixed_args): # doctest: +SKIP ... print(result) References ========== [1] `Python ThreadPoolExecutor class <https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor>`_ """ import concurrent.futures from itertools import islice with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: group = islice(args, chunk) futures = {executor.submit(worker, *arg + fixed_args) for arg in group} while futures: done, futures = concurrent.futures.wait( futures, return_when=concurrent.futures.FIRST_COMPLETED ) for future in done: yield future.result() group = islice(args, chunk) for arg in group: futures.add(executor.submit(worker, *arg + fixed_args))
[docs] def ProcessGeneratorLazy2(worker: Callable, args: Iterator[tuple]): import concurrent.futures import multiprocessing global executor if executor is None: max_workers = multiprocessing.cpu_count() ttprint(f"Initializing pool with {max_workers} workers") executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) futures = {executor.submit(worker, *arg) for arg in args} done, not_done = concurrent.futures.wait( futures, return_when=concurrent.futures.FIRST_EXCEPTION ) for task in done: err = task.exception() if err is not None: raise err else: yield task.result()
[docs] def ProcessGeneratorLazy( worker: Callable, args: Iterator[tuple], max_workers: int = CPU_COUNT, chunk: int = CPU_COUNT * 2, fixed_args: tuple = (), ): """ Execute a function in parallel using a ``ProcessPoolExecutor`` [1]. :param worker: Function to execute in parallel. :param args: to separate job of the pool. :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used. :param chunk: Number of chunks to split the parallelization jobs. :param fixed_args: Constant arguments added in ``args`` in each execution of the ``worker`` function. :returns: A generator with the return of all workers :rtype: Generator Examples ======== >>> from skmap.parallel import ProcessGeneratorLazy >>> >>> def worker(i, msg): ... print(f'{i}: {msg}') ... return f'Worker {i} finished' >>> >>> args = iter([ (i,) for i in range(0,5)]) >>> fixed_args = ("I'm running in parallel", ) >>> >>> for result in ProcessGeneratorLazy(worker, args, fixed_args=fixed_args): # doctest: +SKIP ... print(result) References ========== [1] `Python ProcessPoolExecutor class <https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor>`_ """ import concurrent.futures from itertools import islice with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: group = islice(args, chunk) futures = {executor.submit(worker, *arg + fixed_args) for arg in group} while futures: done, futures = concurrent.futures.wait( futures, return_when=concurrent.futures.FIRST_COMPLETED ) for future in done: yield future.result() group = islice(args, chunk) for arg in group: futures.add(executor.submit(worker, *arg + fixed_args))
[docs] def job( worker: Callable, worker_args: Iterator[tuple], n_jobs: int = -1, joblib_args: set = {}, ): """ Execute a function in parallel using joblib [1]. :param worker: Function to execute in parallel. :param worker_args: Argument iterator where each element is send to separate job. :param joblib_args: Number of CPU cores to use in the parallelization. By default all cores are used. :param joblib_args: Joblib argumets to send to ``Parallel class`` [1]. :returns: A generator with the return of all workers :rtype: Generator Examples ======== >>> from skmap import parallel >>> >>> def worker(i, msg): ... print(f'{i}: {msg}') ... return f'Worker {i} finished' >>> >>> msg = ("I'm running in parallel", ) >>> args = iter([ (i,msg) for i in range(0,5)]) >>> >>> for result in parallel.job(worker, args, n_jobs=-1, joblib_args={'backend': 'threading'}): # doctest: +SKIP ... print(result) References ========== [1] `joblib.Parallel class <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html#joblib.Parallel>`_ """ from joblib import Parallel, delayed joblib_args["n_jobs"] = n_jobs with Parallel(**joblib_args) as parallel: for worker_result in parallel(delayed(worker)(*args) for args in worker_args): yield worker_result
[docs] def apply_along_axis( worker: Callable, axis: int, arr: numpy.array, n_jobs: int = CPU_COUNT, *args: any, **kwargs: any, ): """ Execute a function through a ``numpy.array`` axis in parallel [1]. It uses joblib and ``backend=loky``, so avoid to send shared memory objects as arguments. :param worker: Function to execute in parallel. It needs to have at least one argument (``numpy.array``). :param axis: Axis used to execute the worker. :param arr: The input array. :param n_jobs: Number of parallel jobs to run the worker function :param args: Additional arguments to the worker. :param kwargs: Additional named arguments to the worker. :returns: The output array with one dimension less than the input array. :rtype: numpy.array Examples ======== >>> import multiprocessing >>> import numpy as np >>> from skmap import parallel >>> >>> def fn(arr, const): ... return np.sum(arr) + const >>> >>> const = 1 >>> arr = np.ones((100,100,100)) >>> >>> out = parallel.apply_along_axis(fn, 0, arr, 4, const) # doctest: +SKIP >>> print(arr.shape, out.shape) # doctest: +SKIP References ========== [1] `Best answer from Eric O Lebigot <https://stackoverflow.com/a/45555516>`_ """ import numpy as np def run(worker, axis, arr, args, kwargs): return np.apply_along_axis(worker, axis, arr, *args, **kwargs) """ Like numpy.apply_along_axis(), but takes advantage of multiple cores. """ # Effective axis where apply_along_axis() will be applied by each # worker (any non-zero axis number would work, so as to allow the use # of `np.array_split()`, which is only done on axis 0): effective_axis = 1 if axis == 0 else axis if effective_axis != axis: arr = arr.swapaxes(axis, effective_axis) # Chunks for the mapping (only a few chunks): chunks = [ (worker, effective_axis, sub_arr, args, kwargs) for sub_arr in np.array_split(arr, n_jobs) ] result = [] for r in job(run, chunks): result.append(r) return np.concatenate(result)
[docs] class TilingProcessing: """ Execute a processing function in parallel considering a tiling system and a base raster. It creates a rasterio ``window`` object for each tile according to the pixel size of the specified base. :param tiling_system_fn: Vector file path with the tiles to read. :param base_raster_fn: Raster file path used the retrieve the ``affine transformation`` for ``windows``. :param verbose: Use ``True`` to print information about read tiles and the base raster. """ def __init__( self, tiling_system_fn="http://s3.eu-central-1.wasabisys.com/skmap/tiling_system_30km.gpkg", base_raster_fn="http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_skmap_epsg3035_v1.0.tif", verbose: bool = False, epsg_checking: bool = True, ) -> None: from pyproj import CRS self.tiles = gpd.read_file(tiling_system_fn) self.num_tiles = self.tiles.shape[0] self.base_raster = rasterio.open(base_raster_fn) tile_epsg = CRS(self.tiles.crs.to_wkt()).to_epsg() raster_epsg = CRS(self.base_raster.crs.to_wkt()).to_epsg() if epsg_checking and tile_epsg != raster_epsg: raise Exception( "Different SpatialReference" + f"\n tiling_system_fn:\n{self.tiles.crs.to_wkt()}" + f"\n base_raster_fn:\n{self.base_raster.crs.to_wkt()}" ) if verbose: pixel_size = self.base_raster.transform[0] ttprint(f"Pixel size equal {pixel_size} in {Path(base_raster_fn).name}") ttprint( f"{self.num_tiles} tiles available in {Path(tiling_system_fn).name}" ) ttprint(f"Using EPSG:{raster_epsg}") def _tile_window(self, idx): tile = self.tiles.iloc[idx] left, bottom, right, top = tile.geometry.bounds return tile, from_bounds(left, bottom, right, top, self.base_raster.transform)
[docs] def process_one(self, idx: int, func: Callable, *args: any): """ Process a single tile using the specified function args. :param idx: The tile id to process. This idx is generated for all the tiles in a sequence starting from ``0``. :param func: A function with at least the arguments ``idx, tile, window``. :param args: Additional arguments to send to the function. Examples ======== >>> from skmap.parallel import TilingProcessing >>> from skmap.io.base import read_rasters >>> >>> def run(idx, tile, window, raster_files): ... data, _ = read_rasters(raster_files=raster_files, spatial_win=window, verbose=True) ... print(f'Tile {idx}: data read {data.shape}') >>> >>> raster_files = [ ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_skmap_epsg3035_v1.0.tif', # winter ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201906_skmap_epsg3035_v1.0.tif', # spring ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201909_skmap_epsg3035_v1.0.tif', # summer ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201912_skmap_epsg3035_v1.0.tif' # fall ... ] >>> >>> tiling= TilingProcessing(verbose=True) # doctest: +SKIP >>> tiling.process_one(0, run, raster_files) # doctest: +SKIP """ tile, window = self._tile_window(idx) return func(idx, tile, window, *args)
[docs] def process_multiple( self, idx_list: List[int], func: Callable, *args: any, max_workers: int = CPU_COUNT, use_threads: bool = True, progress_bar: bool = False, ): """ Process in parallel a list of tile using the specified function args. :param idx: The tile ids to process. This idx is generated for all the tiles in a sequence starting from ``0``. :param func: A function with at least the arguments ``idx, tile, window``. :param args: Additional arguments to send to the function. :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used. :param use_threads: If ``True`` the parallel processing uses ``ThreadGeneratorLazy``, otherwise it uses ProcessGeneratorLazy. :param progress_bar: If ``True`` the parallel processing uses ``pqdm`` [1] presenting a progress bar and ignoring the ``use_threads``. Examples ======== >>> from skmap.parallel import TilingProcessing >>> from skmap.io.base import read_rasters >>> >>> def run(idx, tile, window, raster_files): ... data, _ = read_rasters(raster_files=raster_files, spatial_win=window, verbose=True) ... print(f'Tile {idx}: data read {data.shape}') >>> >>> raster_files = [ ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_skmap_epsg3035_v1.0.tif', # winter ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201906_skmap_epsg3035_v1.0.tif', # spring ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201909_skmap_epsg3035_v1.0.tif', # summer ... 'http://s3.eu-central-1.wasabisys.com/skmap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201912_skmap_epsg3035_v1.0.tif' # fall ... ] >>> >>> tiling= TilingProcessing(verbose=True) # doctest: +SKIP >>> idx_list = [0,10,100] >>> result = tiling.process_multiple(idx_list, run, raster_files) # doctest: +SKIP References ========== [1] `Parallel TQDM <https://pqdm.readthedocs.io/en/latest/readme.html>`_ """ _args = [] for idx in idx_list: tile, window = self._tile_window(idx) _args.append((idx, tile, window, *args)) if progress_bar: try: from pqdm.processes import pqdm as ppqdm from pqdm.threads import pqdm as tpqdm pqdm = tpqdm if use_threads else ppqdm results = pqdm( iter(_args), func, n_jobs=max_workers, argument_type="args" ) except ImportError as e: from ..misc import _warn_deps _warn_deps(e, "progress bar") progress_bar = False if not progress_bar: WorkerPool = ThreadGeneratorLazy if use_threads else ProcessGeneratorLazy results = [] for r in WorkerPool( func, iter(_args), max_workers=max_workers, chunk=max_workers * 2 ): results.append(r) return results
[docs] def process_all( self, func: Callable, *args: any, max_workers: int = CPU_COUNT, use_threads: bool = True, progress_bar: bool = False, ): """ Process in parallel all of tile using the specified function args. :param func: A function with at least the arguments ``idx, tile, window``. :param args: Additional arguments to send to the function. :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used. :param use_threads: If ``True`` the parallel processing uses ``ThreadGeneratorLazy``, otherwise it uses ProcessGeneratorLazy. :param progress_bar: If ``True`` the parallel processing uses ``pqdm`` [1] presenting a progress bar, ignoring the ``use_threads``. Examples ======== >>> from skmap.parallel import TilingProcessing >>> from skmap.io.base import read_rasters >>> >>> def run(idx, tile, window, msg): ... print(f'Tile {idx} => {msg}') >>> >>> tiling= TilingProcessing(verbose=True) # doctest: +SKIP >>> msg = "Let's crunch some data." >>> result = tiling.process_all(run) # doctest: +SKIP References ========== [1] `Parallel TQDM <https://pqdm.readthedocs.io/en/latest/readme.html>`_ """ idx_list = range(0, self.num_tiles) return self.process_multiple( idx_list, func, *args, max_workers=max_workers, use_threads=use_threads, progress_bar=progress_bar, )
[docs] @staticmethod def generate_tiles( tile_size: int, extent: tuple, crs: str, raster_layer_fn: str = None, ): """ Generate a custom tiling system based on a regular grid. :param tile_size: Single value used to define the width and height of a individual tile. It assumes the same unit of ``crs`` (degree for geographic coordinate systems and meter for projected coordinate systems). Tiles outside of the image are clipped to fit in the informed extent. :param extent: Extent definition considering ``minx, miny, maxx, maxy`` according to the ``crs`` argument. :param crs: Coordinate reference system for the tile geometries. Can be anything accepted by pyproj.CRS.from_user_input(), such as an authority string (EPSG:4326) or a WKT/proj4 string. :param raster_layer_fn: If provided, for each tile the ``min``, ``max`` and ``mode`` values are calculated considering the raster pixels inside the tile. It assumes the same ``crs`` for the raster layer and tiles. :returns: Tiling system with follow columns: ``tile_id``, ``minx``, ``miny``, ``maxx``, ``maxy`` and ``geometry``. The additional columns ``raster_min``, ``raster_mode_value``, ``raster_mode_count`` and ``raster_max`` are returned when a raster layer is provided. :rtype: geopandas.GeoDataFrame Examples ======== >>> from skmap.parallel.utils import TilingProcessing >>> skmap_extent = (900000, 930010, 6540000, 5460010) >>> tiling_system = TilingProcessing.generate_tiles(30000, skmap_extent, 'epsg:3035') >>> tiling_system.to_file(tiling_system_fn, driver="GPKG") # doctest: +SKIP """ minx, miny, maxx, maxy = extent data = { "tile_id": [], "minx": [], "miny": [], "maxx": [], "maxy": [], "geometry": [], } tile_id = 0 for x1 in np.arange(minx, maxx, tile_size): for y1 in np.arange(miny, maxy, tile_size): x2 = x1 + tile_size if x2 > maxx: x2 = maxx y2 = y1 + tile_size if y2 > maxy: y2 = maxy data["tile_id"].append(tile_id) data["minx"].append(x1) data["miny"].append(y1) data["maxx"].append(x2) data["maxy"].append(y2) data["geometry"].append( Polygon([(x1, y1), (x2, y1), (x2, y2), (x1, y2)]) ) tile_id += 1 tiles = gpd.GeoDataFrame(data).set_crs(crs, inplace=True) if raster_layer_fn is not None: def _raster_values(tile, raster_layer_fn): shapes = [tile["geometry"]] try: with rasterio.open(raster_layer_fn) as src: out_image, out_transform = mask( src, shapes, crop=True, filled=True ) out_image = out_image.astype("float32") nodata_val = src.nodatavals[0] _values, _counts = np.unique(out_image, return_counts=True) values, counts = [], [] for v, c in zip(_values, _counts): if v != nodata_val: values.append(v) counts.append(c) values = np.array(values) counts = np.array(counts) m = np.argmax(counts) tile["raster_min"] = np.min(values) tile["raster_mode_value"] = values[m] tile["raster_mode_count"] = counts[m] tile["raster_max"] = np.max(values) except: tile["raster_min"] = None tile["raster_mode_value"] = None tile["raster_mode_count"] = None tile["raster_max"] = None return tile args = [(tiles.loc[i, :], raster_layer_fn) for i in range(0, len(tiles))] result = [] for t in job(_raster_values, args): result.append(t) tiles = gpd.GeoDataFrame(result).set_crs(crs, inplace=True) return tiles
[docs] class TaskSequencer: """ Execute a pipeline of sequential tasks, in a way that the output of one task is used as input for the next task. For each task, a pool of workers is created, allowing the execution of all the available workers in parallel, for different portions of the input data :param tasks: Task definition list, where each element can be: (1) a ``Callable`` function; (2) a tuple containing a ``Callable`` function and the number of workers for the task; or (3) a tuple containing a ``Callable`` function, the number of workers and an ``bool`` indication if the task would respect the ``mem_usage_limit``. The default number of workers is ``1``. :param mem_usage_limit: Percentage of memory usage that when reached triggers a momentarily stop of execution for specific tasks. For example, if the ``task_1`` is responsible for reading the data and ``task_2`` for processing it, the ``task_1`` definition can receive an ``bool`` indication to respect the ``mem_usage_limit``, allowing the ``task_2`` to process the data that has already been read and releasing memory for the next ``task_1`` reads. :param wait_timeout: Timeout argument used by ``concurrent.futures.wait``. :param verbose: Use ``True`` to print the communication and status of the tasks Examples ======== .. testcode:: :skip: >>> from skmap.parallel import TaskSequencer >>> >>> output = TaskSequencer( ... tasks=[ ... task_1, ... (task_2, 2) ... ] ... ) # doctest: +SKIP Pipeline produced by this example code:: ---------- ---------- input_data -> | task_1 | -> | task_2 | -> output_data ---------- ---------- | | |-worker_1 |-worker_1 |-worker_2 """ def __init__( self, tasks: Union[List[Callable], List[tuple]], mem_usage_limit: float = 0.75, wait_timeout: int = 5, verbose: bool = False, ) -> None: self.wait_timeout = wait_timeout self.mem_usage_limit = mem_usage_limit self.verbose = verbose self.mem_check_interval = 10 self.tasks = [] self.pipeline = [] self.mem_checks = [] for task in tasks: pool_size = 1 mem_check = False if type(task) is tuple: if len(task) == 2: task, pool_size = task else: task, pool_size, mem_check = task self._verbose( f"Starting {pool_size} worker(s) for {task.__name__} (mem_check={mem_check})" ) self.tasks.append(task) self.pipeline.append(ProcessPoolExecutor(max_workers=pool_size)) self.mem_checks.append(mem_check) self.n_tasks = len(self.tasks) self.pipeline_futures = [set() for i in range(0, self.n_tasks)] def _verbose(self, *args: Any, **kwargs: Any) -> None: if self.verbose: ttprint(*args, **kwargs)
[docs] def run(self, input_data: List[tuple]): """ Run the task pipeline considering the ``input_data`` argument. :param input_data: Input data used to feed the first task. :returns: List of returned values produced by the last task and with the same size of the ``input_data`` argument. :rtype: List Examples ======== >>> from skmap.misc import ttprint >>> from skmap.parallel import TaskSequencer >>> import time >>> >>> def rnd_data(const, size): ... data = np.random.rand(size, size, size) ... time.sleep(2) ... return (const, data) >>> >>> def max_value(const, data): ... ttprint(f'Calculating the max value over {data.shape}') ... time.sleep(8) ... result = np.max(data + const) ... return result >>> >>> taskSeq = TaskSequencer( ... tasks=[ ... rnd_data, ... (max_value, 2) ... ], ... verbose=True ... ) [...] Starting 1 worker(s) for rnd_data (mem_check=False) [...] Starting 2 worker(s) for max_value (mem_check=False) >>> >>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ]) # doctest: +SKIP >>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ]) # doctest: +SKIP """ for i_dta in input_data: self._verbose(f"Submission to {self.tasks[0].__name__}") self.pipeline_futures[0].add( self.pipeline[0].submit( _run_task, *( ( 0, self.tasks[0], self.mem_usage_limit, self.mem_check_interval, self.mem_checks[0], self.verbose, *i_dta, ) ), ) ) keep_going = True while keep_going: keep_going_aux = [] for i in range(0, self.n_tasks - 1): done, self.pipeline_futures[i] = wait( self.pipeline_futures[i], return_when=FIRST_COMPLETED, timeout=self.wait_timeout, ) if len(done) > 0: self._verbose( f"{self.tasks[i].__name__} state: done={len(done)} waiting={len(self.pipeline_futures[i])}" ) for f in done: nex_i = i + 1 self._verbose(f"Submission to {self.tasks[nex_i].__name__}") self.pipeline_futures[nex_i].add( self.pipeline[nex_i].submit( _run_task, *( nex_i, self.tasks[nex_i], self.mem_usage_limit, self.mem_check_interval, self.mem_checks[nex_i], self.verbose, *f.result(), ), ) ) keep_going_aux.append(len(self.pipeline_futures[i]) > 0) keep_going = any(keep_going_aux) self._verbose(f"Waiting {self.tasks[-1].__name__}") result = [future.result() for future in as_completed(self.pipeline_futures[-1])] return result