skmap.parallel.utils.TaskSequencer

class TaskSequencer(tasks, mem_usage_limit=0.75, wait_timeout=5, verbose=False)

Bases: object

Execute a pipeline of sequential tasks, where the output of each task is used as the input of the next. For each task, a pool of workers is created, so that all of that task's workers can run in parallel, each on a different portion of the input data.
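The streaming behaviour described above can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not skmap's implementation: it uses thread pools (the executor type skmap uses is not stated on this page), the helper name `run_pipeline` is hypothetical, and each task is given as a `(func, n_workers)` pair. As soon as a future of one task completes, its output is submitted to the pool of the next task, so different input items can be at different stages at the same time. `concurrent.futures.wait` is called with a timeout, mirroring the role of the `wait_timeout` parameter.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, List, Sequence, Tuple


def run_pipeline(
    stages: Sequence[Tuple[Callable[..., Any], int]],
    input_data: Sequence[tuple],
    wait_timeout: float = 5,
) -> List[Any]:
    """Hypothetical sketch: stream each input item through all stages in order."""
    # One pool per task, sized by that task's number of workers.
    pools = [ThreadPoolExecutor(max_workers=n_workers) for _, n_workers in stages]
    results: List[Any] = [None] * len(input_data)
    # Maps each in-flight future to (stage index, position of its input item).
    pending = {}
    try:
        for pos, args in enumerate(input_data):
            first_func = stages[0][0]
            pending[pools[0].submit(first_func, *args)] = (0, pos)
        while pending:
            # Wake up when any future finishes, or after wait_timeout seconds.
            done, _ = wait(pending, timeout=wait_timeout, return_when=FIRST_COMPLETED)
            for fut in done:
                stage, pos = pending.pop(fut)
                value = fut.result()
                if stage + 1 < len(stages):
                    # Assumption: a tuple output is unpacked as the next task's arguments.
                    args = value if isinstance(value, tuple) else (value,)
                    next_func = stages[stage + 1][0]
                    pending[pools[stage + 1].submit(next_func, *args)] = (stage + 1, pos)
                else:
                    # Last task: store by input position so output order matches input order.
                    results[pos] = value
    finally:
        for pool in pools:
            pool.shutdown()
    return results


if __name__ == "__main__":
    # Two tasks: 1 worker for the first, 2 workers for the second.
    demo_stages = [(lambda c, s: (c, s * 2), 1), (lambda c, d: c + d, 2)]
    print(run_pipeline(demo_stages, [(1, 2), (3, 4)]))  # [5, 11]
```

Storing results by input position is a choice made in this sketch; the documentation of run() only states that the returned list has the same length as input_data.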

Parameters:
  • tasks (Union[List[Callable], List[tuple]]) – Task definition list, where each element can be: (1) a Callable function; (2) a tuple containing a Callable function and the number of workers for the task; or (3) a tuple containing a Callable function, the number of workers, and a bool indicating whether the task should respect mem_usage_limit. The default number of workers is 1.

  • mem_usage_limit (float) – Memory usage threshold, expressed as a fraction (the default 0.75 means 75%), that, when reached, temporarily pauses the execution of the tasks flagged to respect it. For example, if task_1 reads the data and task_2 processes it, task_1 can be flagged to respect mem_usage_limit; task_1 then pauses while task_2 processes the data already read, which releases memory for subsequent task_1 reads.

  • wait_timeout (int) – Timeout, in seconds, passed to concurrent.futures.wait.

  • verbose (bool) – If True, print the communication and status of the tasks.
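The three accepted task-definition forms, and the pause that mem_usage_limit triggers, can be sketched as follows. Both helpers (`normalize_tasks`, `wait_for_memory`) are hypothetical illustrations, not part of skmap's API. The defaults of 1 worker and no memory check for forms (1) and (2) follow the parameter description above and the `mem_check=False` log lines in the run() example. The memory reader is injected as a function returning the used memory as a fraction between 0 and 1; in practice it could be backed by something like `psutil.virtual_memory().percent / 100`, which is an assumption about how such a reader might be built, not a statement about skmap.

```python
import time
from typing import Callable, List, Tuple, Union

TaskDef = Union[Callable, tuple]


def normalize_tasks(tasks: List[TaskDef]) -> List[Tuple[Callable, int, bool]]:
    """Hypothetical helper: turn every task definition into (func, n_workers, mem_check)."""
    normalized = []
    for task in tasks:
        if callable(task):
            # Form (1): bare callable -> 1 worker, no memory check.
            normalized.append((task, 1, False))
        elif isinstance(task, tuple) and len(task) == 2:
            # Form (2): (callable, n_workers) -> no memory check.
            func, n_workers = task
            normalized.append((func, n_workers, False))
        elif isinstance(task, tuple) and len(task) == 3:
            # Form (3): (callable, n_workers, respect_mem_usage_limit).
            func, n_workers, mem_check = task
            normalized.append((func, n_workers, bool(mem_check)))
        else:
            raise ValueError(f"Unsupported task definition: {task!r}")
    return normalized


def wait_for_memory(
    mem_usage_limit: float,
    read_usage: Callable[[], float],
    poll_interval: float = 0.5,
) -> int:
    """Hypothetical gate: block while memory usage is at or above the limit.

    read_usage() must return the used memory as a fraction between 0 and 1.
    Returns how many polls found usage at or above the limit.
    """
    waits = 0
    while read_usage() >= mem_usage_limit:
        waits += 1
        time.sleep(poll_interval)
    return waits
```

In this sketch, only workers of tasks whose normalized mem_check is True would call wait_for_memory before picking up a new item, so unflagged downstream tasks keep running and can free memory in the meantime.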

Examples

Pipeline produced by the example code shown under run():

               ----------      ----------
input_data ->  | task_1 |  ->  | task_2 |  ->  output_data
               ----------      ----------
               |               |
               |-worker_1      |-worker_1
                               |-worker_2

Methods

run

Run the task pipeline on the given input_data.

run(input_data)

Run the task pipeline on the given input_data.

Parameters:

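input_data (List[tuple]) – Input data used to feed the first task. Each tuple is unpacked as the positional arguments of the first task; likewise, the tuple returned by each task is unpacked as the arguments of the next task (see rnd_data and max_value in the example below).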

Returns:

List of values returned by the last task, with the same length as input_data.

Return type:

List
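Ignoring parallelism, each value returned by run() is what one obtains by applying the tasks one after another to the corresponding input tuple. The serial sketch below (the name `run_sequential` is hypothetical, not part of skmap) can serve as a reference when checking a pipeline on small, deterministic inputs. It assumes, as in the example below, that a tuple returned by one task is unpacked into the arguments of the next, and it returns results in input order, which this page does not state for run() itself.

```python
from typing import Any, Callable, List, Sequence


def run_sequential(
    funcs: Sequence[Callable[..., Any]], input_data: Sequence[tuple]
) -> List[Any]:
    """Hypothetical serial reference for a task pipeline (no workers, no memory checks)."""
    results = []
    for args in input_data:
        value: Any = args
        for func in funcs:
            # Tuples are unpacked as positional arguments; other values are passed as-is.
            value = func(*value) if isinstance(value, tuple) else func(value)
        results.append(value)
    return results


if __name__ == "__main__":
    square_size = lambda c, s: (c, s * s)  # stands in for a "read" task
    add_const = lambda c, d: c + d          # stands in for a "process" task
    print(run_sequential([square_size, add_const], [(1, 2), (3, 4)]))  # [5, 19]
```

Comparing this output with the result of run() on the same deterministic tasks is a quick way to confirm that the task outputs and inputs are wired together as intended.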

Examples

>>> from skmap.misc import ttprint
>>> from skmap.parallel import TaskSequencer
>>> import numpy as np
>>> import time
>>>
>>> def rnd_data(const, size):
...     data = np.random.rand(size, size, size)
...     time.sleep(2)
...     return (const, data)
>>>
>>> def max_value(const, data):
...     ttprint(f'Calculating the max value over {data.shape}')
...     time.sleep(8)
...     result = np.max(data + const)
...     return result
>>>
>>> taskSeq = TaskSequencer(
...     tasks=[
...         rnd_data,
...         (max_value, 2)
...     ],
...     verbose=True
... )
[...] Starting 1 worker(s) for rnd_data (mem_check=False)
[...] Starting 2 worker(s) for max_value (mem_check=False)
>>>
>>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
>>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])