The multiprocessing sub-package#

The pydidas.multiprocessing module includes functionality to run scripts and applications in parallel processing.

class pydidas.multiprocessing.AppRunner(app: ~pydidas.core.base_app.BaseApp, n_workers: None | int = None, processor: type = <function app_processor>)#

The AppRunner is a subclassed WorkerController for running pydidas applications.

The AppRunner requires a BaseApp (or subclass) instance with the method layout defined in BaseApp.

The AppRunner will create a local copy of the Application and only modify the local version.

Notes

The AppRunner has the following signals which can be connected to:

sig_final_app_state : QtCore.Signal(object)

This signal emits a copy of the app after all calculations have finished, in case it needs to be used in another context.

Parameters:
  • app (pydidas.core.BaseApp) – The instance of the application to be run.

  • n_workers (int, optional) – The number of spawned worker processes. The default is None which will use the globally defined pydidas setting for the number of workers.

  • processor (Union[pydidas.multiprocessing.app_processor, pydidas.multiprocessing.app_processor_without_tasks], optional) – The processor to be used. The generic app_processor requires input tasks, whereas app_processor_without_tasks can run indefinitely without any defined tasks; in that case, the app itself is responsible for managing tasks on the fly. The default is app_processor.
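The AppRunner works on a local copy of the app, as noted above, so the caller's instance is never mutated during processing. This copy-on-run semantics can be sketched with the standard library alone; DummyApp is a hypothetical stand-in, not the pydidas BaseApp:

```python
import copy


class DummyApp:
    """Stand-in for a pydidas BaseApp subclass (hypothetical)."""

    def __init__(self):
        self.params = {"n_points": 100}


original = DummyApp()
local_copy = copy.deepcopy(original)   # the runner works only on its own copy
local_copy.params["n_points"] = 500    # analogous to modifying the runner's app

print(original.params["n_points"])     # the caller's instance is unchanged
```

To retrieve results stored on the runner's copy after processing, connect to the sig_final_app_state signal or use get_app().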

call_app_method(method_name: str, *args: tuple, **kwargs: dict) object#

Call a method of the app on the AppRunner’s copy.

Parameters:
  • method_name (str) – The name of the Application method.

  • *args (tuple) – Any arguments which need to be passed to the method.

  • **kwargs (dict) – Any keyword arguments which need to be passed to the method.

Raises:

RuntimeError – If the Application is currently running.

Returns:

result – The return object(s) from the App method call.

Return type:

object

cycle_post_run(timeout: float = 10)#

Perform the app’s final operations and shut down the workers.

Parameters:

timeout (float) – The timeout while waiting for the worker processes.

cycle_pre_run()#

Perform pre-multiprocessing operations.

This time slot is used to prepare the App by running app.multiprocessing_pre_run, setting the tasks, and starting the workers.

get_app() BaseApp#

Get the reference to the internally stored app.

Note

This method will only provide a copy of the app and keep the internal instance for further use.

Returns:

The application instance.

Return type:

pydidas.core.BaseApp

set_app_param(param_name: str, value: object)#

Set a Parameter of the Application.

Parameters:
  • param_name (str) – The name of the Application Parameter.

  • value (object) – The new value for the selected Parameter.

class pydidas.multiprocessing.WorkerController(n_workers: int | None = None, function: type | None = None, func_args: tuple = (), func_kwargs: dict | None = None)#

The WorkerController is a QThread for spawning and controlling workers.

The WorkerControllers workers are used to perform computations in parallel.

A function with defined args and kwargs can be called in separate processes, with the first function argument being the variable that differs between function calls.

Parameters:
  • n_workers (Union[None, int], optional) – The number of spawned worker processes. The default is None which will use the globally defined pydidas setting for the number of workers.

  • function (type, optional) – The function to be called by the workers. If not specified at init, it must be set later for the workers to actually perform tasks.

  • func_args (tuple, optional) – The function arguments. The default is an empty tuple.

  • func_kwargs (dict, optional) – Keyword arguments passed to the function. The default is an empty dictionary.

add_task(task: object)#

Add a task to the worker pool.

This will add a task to the worker pool, which calls the function defined via the :py:meth:`WorkerController.change_function` method. Note that the task given here will be interpreted as the first argument required by the function.

Parameters:

task (object) – The first argument for the processing function.

add_tasks(tasks: Iterable, are_stop_tasks: bool = False)#

Add tasks to the worker pool.

This will add tasks to the worker pool, which call the function defined via the :py:meth:`WorkerController.change_function` method. Note that each task given here will be interpreted as the first argument required by the function.

Parameters:
  • tasks (Union[list, tuple, set]) – An iterable of the first argument for the processing function.

  • are_stop_tasks (bool) – Keyword to signal that the added tasks are stop tasks. This flag will disable updating the task target number.

change_function(func: type, *args: tuple, **kwargs: dict)#

Change the function called by the workers.

This method stops any active processing and resets the queue. Then, the function is changed and new workers are spawned. To run the new function, new tasks must be submitted.

Parameters:
  • func (object) – The function to be called by the workers.

  • *args (tuple) – Any additional arguments which need to be passed to the function. The first function argument will always be the task.

  • **kwargs (dict) – Any keyword arguments which need to be passed to the function.

cycle_post_run(timeout: float = 10)#

Perform operations after the main processing loop.

Parameters:

timeout (float) – The time to wait for the workers to send the finished signal before raising a TimeoutError.

cycle_pre_run()#

Perform operations before entering the main processing loop.

exit(code: None | int = None)#

Call the exit method.

This exit method adds shutdown calls to the Queue threads.

Parameters:

code (Union[None, int]) – The exit code.

finalize_tasks()#

Finalize the task list.

This will add tasks to tell the workers to shut down and set a flag to quit the loop once processing is done.

join_queues()#

Join all active queues.

join_workers()#

Join the workers back to the thread and free their resources.

property n_workers: int#

Get the number of worker processes.

Returns:

The number of workers.

Return type:

int

property progress: float#

Get the progress level of the current computations.

Returns:

The progress, normalized to the range [0, 1]. A value of -1 means that no tasks have been defined.

Return type:

float
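The normalization described above can be written as a one-line ratio with a sentinel for the undefined case. This is a hypothetical helper for illustration, not part of the pydidas API:

```python
def progress(n_completed: int, n_tasks: int) -> float:
    """Return the progress in the range [0, 1], or -1 if no tasks are defined."""
    if n_tasks < 1:
        return -1
    return n_completed / n_tasks


print(progress(0, 0))     # -1: no tasks have been defined
print(progress(25, 100))  # 0.25
```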

requestInterruption()#

Request the interruption of the thread.

This is a reimplementation of the generic “requestInterruption” method of the QThread.

reset_task_list()#

Reset and clear the list of tasks.

restart()#

Restart the event loop.

This method allows the event loop to run and to submit tasks via the queue to the workers.

run()#

Run the thread event loop.

This method is automatically called upon starting the thread.

send_stop_signal()#

Send stop signal to workers.

This method will send stop signals to all workers. Note that the runtime of the workers is determined by the runtime of the called function: the current call will finish before the stop signal is processed.

stop()#

Stop the thread from running.

suspend()#

Suspend the event loop.

This method effectively suspends the event loop and calls for the workers to be joined again. New calculations can be performed by using the :py:meth:`WorkerController.restart` method.

write_lock()#

Set up the write lock for adding tasks to the list.

pydidas.multiprocessing.app_processor(input_queue: Queue, output_queue: Queue, stop_queue: Queue, aborted_queue: Queue, app: type, app_params: ParameterCollection, app_config: dict, **kwargs: dict)#

Start a loop to process function calls on individual frames.

This function starts a while loop to call the supplied function with indices supplied by the queue. Results will be written to the output queue in the format [input_arg, results].

Parameters:
  • input_queue (multiprocessing.Queue) – The input queue which supplies the processor with indices to be processed.

  • output_queue (multiprocessing.Queue) – The queue for transmitting the results to the controlling thread.

  • stop_queue (multiprocessing.Queue) – The queue for sending a termination signal to the worker.

  • aborted_queue (multiprocessing.Queue) – The queue which is used by the processor to signal the calling thread that it has aborted its cycle.

  • app (type) – The Application class to be called in the process. The App must have a multiprocessing_func method.

  • app_params (ParameterCollection) – The App ParameterCollection used for creating the app.

  • app_config (dict) – The dictionary which is used for overwriting the app._config dictionary.

  • **kwargs (dict) –

    Supported keyword arguments are:

    wait_for_output_queue : bool, optional

    Flag to wait for the output queue to be empty before shutting down the worker. The default is True.

    logging_level : int, optional

    The logger’s logging level. The default is the pydidas default logging level.
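The loop structure of app_processor — create the app locally, pull tasks from the input queue until a stop condition, and write [input_arg, results] pairs — can be sketched single-threaded with stdlib queues. MiniApp and the None stop sentinel are illustrative assumptions; the real processor runs in a separate process and uses a dedicated stop_queue:

```python
import queue


class MiniApp:
    """Stand-in for a pydidas app exposing a multiprocessing_func method (hypothetical)."""

    def multiprocessing_func(self, index):
        return index ** 2


def mini_app_processor(input_queue, output_queue, app_class):
    # the processor creates its own app instance, much as app_processor
    # builds the app from app_params and app_config
    app = app_class()
    while True:
        task = input_queue.get()
        if task is None:  # stop sentinel ends the loop
            break
        # results are written in the format [input_arg, results]
        output_queue.put([task, app.multiprocessing_func(task)])


in_q, out_q = queue.Queue(), queue.Queue()
for index in (1, 2, 3):
    in_q.put(index)
in_q.put(None)  # stop sentinel
mini_app_processor(in_q, out_q, MiniApp)
results = [out_q.get() for _ in range(3)]
print(results)  # [[1, 1], [2, 4], [3, 9]]
```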

pydidas.multiprocessing.app_processor_without_tasks(input_queue: Queue, output_queue: Queue, stop_queue: Queue, aborted_queue: Queue, app: type, app_params: ParameterCollection, app_config: dict, **kwargs: dict)#

Start a loop to process function calls on individual frames.

This function starts a while loop to call the supplied function without actually using the input queue (the input queue is kept only for compatibility). Results will be written to the output queue in the format [item, results].

Parameters:
  • input_queue (multiprocessing.Queue) – The input queue which supplies the processor with indices to be processed.

  • output_queue (multiprocessing.Queue) – The queue for transmitting the results to the controlling thread.

  • stop_queue (multiprocessing.Queue) – The queue for sending a termination signal to the worker.

  • aborted_queue (multiprocessing.Queue) – The queue which is used by the processor to signal the calling thread that it has aborted its cycle.

  • app (BaseApp) – The Application class to be called in the process. The App must have a multiprocessing_func method.

  • app_params (ParameterCollection) – The App ParameterCollection used for creating the app.

  • app_config (dict) – The dictionary which is used for overwriting the app._config dictionary.

  • **kwargs (dict) –

    Supported keyword arguments are:

    wait_for_output_queue : bool, optional

    Flag to wait for the output queue to be empty before shutting down the worker. The default is True.

    logging_level : int, optional

    The logger’s logging level. The default is the pydidas default logging level.

pydidas.multiprocessing.processor(input_queue: Queue, output_queue: Queue, stop_queue: Queue, aborted_queue: Queue, function: type, *func_args: tuple, **func_kwargs: dict)#

Start a loop to process function calls on individual frames.

This function starts a while loop to call the supplied function with indices supplied by the queue. Results will be written to the output queue in the format [input_arg, results].

Parameters:
  • input_queue (multiprocessing.Queue) – The input queue which supplies the processor with indices to be processed.

  • output_queue (multiprocessing.Queue) – The queue for transmitting the results to the controlling thread.

  • stop_queue (multiprocessing.Queue) – The queue for sending a termination signal to the worker.

  • aborted_queue (multiprocessing.Queue) – The queue which is used by the processor to signal the calling thread that it has aborted its cycle.

  • function (type) – The function to be called in the process. The function must accept the first argument from the queue and all additional arguments and keyword arguments from the calling arguments of processor.

  • *func_args (tuple) – The function's calling arguments, except for the first (which is supplied by the queue).

  • **func_kwargs (dict) – The keyword arguments for the function.
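The generic processor loop — drain tasks from the input queue, watch the stop queue for a termination signal, and emit [input_arg, results] — can be sketched single-threaded with stdlib queues. This is a simplified illustration under the stated assumptions, not the pydidas implementation, which runs in its own process:

```python
import queue


def scale(task, factor):
    """Example processing function: the first argument is the task."""
    return task * factor


def mini_processor(input_queue, output_queue, stop_queue, function,
                   *func_args, **func_kwargs):
    """Single-threaded sketch of the processor loop."""
    while True:
        try:
            task = input_queue.get_nowait()
        except queue.Empty:
            # no pending task: check for a termination signal
            try:
                stop_queue.get_nowait()
                break
            except queue.Empty:
                continue
        output_queue.put([task, function(task, *func_args, **func_kwargs)])


in_q, out_q, stop_q = queue.Queue(), queue.Queue(), queue.Queue()
for task in (2, 3):
    in_q.put(task)
stop_q.put(None)  # queue the termination signal up front
mini_processor(in_q, out_q, stop_q, scale, 10)
results = [out_q.get() for _ in range(2)]
print(results)  # [[2, 20], [3, 30]]
```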