skmap.parallel.utils.TaskSequencer#
- class TaskSequencer(tasks, mem_usage_limit=0.75, wait_timeout=5, verbose=False)[source]#
Bases: object

Execute a pipeline of sequential tasks, where the output of one task is used as input for the next. For each task, a pool of workers is created, allowing all available workers to run in parallel over different portions of the input data.
- Parameters:
tasks (Union[List[Callable], List[tuple]]) – Task definition list, where each element can be: (1) a Callable function; (2) a tuple containing a Callable function and the number of workers for the task; or (3) a tuple containing a Callable function, the number of workers, and a bool indicating whether the task should respect mem_usage_limit. The default number of workers is 1.
mem_usage_limit (float) – Percentage of memory usage that, when reached, triggers a momentary pause in the execution of specific tasks. For example, if task_1 is responsible for reading the data and task_2 for processing it, the task_1 definition can include a bool indicating that it respects mem_usage_limit, allowing task_2 to process the data that has already been read and to release memory before the next task_1 read.
wait_timeout (int) – Timeout argument passed to concurrent.futures.wait.
verbose (bool) – Use True to print the communication and status of the tasks.
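For illustration, the three accepted task-definition forms could be written as follows (the function names read_block and process_block are hypothetical, not part of skmap):

```python
# Hypothetical task functions, for illustration only.
def read_block(path):
    with open(path, 'rb') as f:
        return f.read()

def process_block(data):
    return len(data)

# The three accepted forms of a task definition:
tasks = [
    read_block,              # (1) bare callable: 1 worker, no memory check
    (process_block, 4),      # (2) callable plus the number of workers
    (read_block, 2, True),   # (3) callable, workers, and respect mem_usage_limit
]
```

Form (3) is typically used for the data-producing task, so it pauses when memory pressure exceeds mem_usage_limit while downstream tasks drain the already-read data.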
Examples
Pipeline produced by this example code:
              ----------      ----------
input_data -> | task_1 |  ->  | task_2 |  ->  output_data
              ----------      ----------
                   |               |
                   |-worker_1      |-worker_1
                                   |-worker_2
Methods
run(input_data) – Run the task pipeline considering the input_data argument.

- run(input_data)[source]#
Run the task pipeline considering the input_data argument.
- Parameters:
input_data (List[tuple]) – Input data used to feed the first task.
- Returns:
List of returned values produced by the last task, with the same size as the input_data argument.
- Return type:
List
Examples
>>> import numpy as np
>>> from skmap.misc import ttprint
>>> from skmap.parallel import TaskSequencer
>>> import time
>>>
>>> def rnd_data(const, size):
...     data = np.random.rand(size, size, size)
...     time.sleep(2)
...     return (const, data)
>>>
>>> def max_value(const, data):
...     ttprint(f'Calculating the max value over {data.shape}')
...     time.sleep(8)
...     result = np.max(data + const)
...     return result
>>>
>>> taskSeq = TaskSequencer(
...     tasks=[
...         rnd_data,
...         (max_value, 2)
...     ],
...     verbose=True
... )
[...] Starting 1 worker(s) for rnd_data (mem_check=False)
[...] Starting 2 worker(s) for max_value (mem_check=False)
>>>
>>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
>>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])
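The staged producer/consumer pattern that TaskSequencer implements can be approximated with only the standard library. The sketch below (an illustration of the idea, not skmap's actual implementation) gives each stage its own worker pool and feeds the output tuples of stage_1 into stage_2:

```python
# Minimal sketch of a two-stage pipeline: each stage has its own
# worker pool, and the output of one stage feeds the next.
from concurrent.futures import ThreadPoolExecutor

def stage_1(const, size):
    # Simulates data production (e.g. reading a data block).
    return (const, list(range(size)))

def stage_2(const, data):
    # Simulates processing (a reduction over the produced block).
    return max(d + const for d in data)

input_data = [(const, 10) for const in range(3)]

# One worker pool per stage, mirroring the pipeline diagram above:
# 1 worker for stage_1, 2 workers for stage_2.
with ThreadPoolExecutor(max_workers=1) as pool_1, \
     ThreadPoolExecutor(max_workers=2) as pool_2:
    stage_1_futures = [pool_1.submit(stage_1, *args) for args in input_data]
    # Each stage_1 result is unpacked and submitted to stage_2.
    stage_2_futures = [pool_2.submit(stage_2, *f.result())
                       for f in stage_1_futures]
    results = [f.result() for f in stage_2_futures]

print(results)  # → [9, 10, 11], one value per input tuple
```

Unlike this sketch, TaskSequencer additionally supports the mem_usage_limit gate, which pauses a producing stage until memory pressure drops.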