Developers guide to pydidas multiprocessing

Pydidas offers multiprocessing using the python multiprocessing package with a controller process in a separate thread to prevent the caller from blocking.

The WorkerController is the generic pydidas implementation and the AppRunner is the subclassed version to run pydidas apps.

WorkerController

Pydidas uses the WorkerController class to run generic (function) tasks.

Communication with workers

Communication with the workers is handles by four queues:

  • send queue to send tasks to the workers.

  • reveiver queue to receive results from the workers.

  • stop queue to send stop signals to the workers.

  • finished queue for the workers to signal they have completed all tasks.

The user does not have to interact with the queues itself, this is handled by the WorkerController.

If the user wants to stop the workers, they can use the send_stop_signal method:

WorkerController.send_stop_signal()

Send stop signal to workers.

This method will send stop signals to all workers. Note that the runtime of the workers will be determined by the runtime of the called function. The currrent call will be finished before the stop signal will be processed.

Communication with the user

The WorkerController uses Qt’s slots and signals for communicating results to the user.

Three signals are provided:

  • sig_progress(float): This signal is emitted after a result has been received and gives the current progress, normalized to the range [0, 1).

  • sig_results(object, object): This signal is emitted for each result and returns the tuple (task argument, task result) to allow identification of the result.

  • sig_finshed: The finished signal is emitted once all tasks have been performed and all workers have finished.

Note

It is the responsibility of the user to connect to the signals prior to starting a process to receive the results. Because of the Qt framework’s behaviour, an eventloop must be running for the signals to be processed.

Key methods

The following methods are key to using the WorkerController :

method name

description

change_function(func, *args, **kwargs)

Change the function to be called by the workers. *args and **kwargs can be any additional calling arguments to the function. The first calling argument will always be the task.

add_task(task)

Add the given task to the list of tasks to be processed.

add_tasks(tasks)

Add all individual tasks from the iterable argument to the list of tasks to be processed.

finalize_tasks()

This method will add stop tasks to the queue to inform the workers that all tasks have been successfully finished. Calling this method will also flag the workers to finish and the processes will terminate after finishing all calculations.

start()

The run method will start the thread event loop, start the worker processes and submit all tasks to the queue.

suspend()

Suspend will temporarily suspend the event loop. Note that all submitted tasks will still be processed by the workers but no new tasks will be submitted and no results will be processed.

restart()

This method will restart processing of the event loop.

Examples

Minimal working example

The following minimal working example can be run from an interactive console or saved as file.

import time
import pydidas
import numpy as np

from qtpy import QtTest


def test_func(task, slope, offset):
    return task* slope + offset


def run_worker_controller():
    worker_controller = pydidas.multiprocessing.WorkerController()
    worker_controller.change_function(test_func, 2, 5)
    result_spy = QtTest.QSignalSpy(worker_controller.sig_results)

    worker_controller.add_tasks(np.arange(10))
    worker_controller.finalize_tasks()
    worker_controller.start()


    while True:
        print("Progress at ", worker_controller.progress)
        if worker_controller.progress >= 1:
            break
        time.sleep(0.5)

    results = sorted(result_spy)
    print(results)

    print("WorkerController is alive: ", worker_controller.is_alive())


if __name__ == "__main__":
    run_worker_controller()

Working example with restart of the Thread

In the following example, not calling the finalize_tasks will keep the thread alive and allow the submission of new tasks.

import time
import pydidas
import numpy as np

from qtpy import QtTest


def test_func(task, slope, offset):
    return task* slope + offset


def run_worker_controller_with_restart():

    worker_controller = pydidas.multiprocessing.WorkerController()
    worker_controller.change_function(test_func, 2, 5)
    result_spy = QtTest.QSignalSpy(worker_controller.sig_results)

    worker_controller.add_tasks(np.arange(10))
    # worker_controller.finalize_tasks()
    worker_controller.start()

    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())

    results = sorted(result_spy)
    print("Results: ", results)
    print("WorkerController is alive: ", worker_controller.isRunning())

    worker_controller.add_tasks(np.arange(10, 20))

    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())

    results = sorted(result_spy)
    print("Results: ", results)

    # now, if we suspend it, to change the function, and to add more items to
    # its tasks but they will not be processed:
    worker_controller.suspend()

    worker_controller.change_function(test_func, -1, 0)
    worker_controller.add_tasks(np.arange(20, 30))

    time.sleep(0.2)

    # restarting will spawn new Processes to carry out the calculations:
    worker_controller.restart()

    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())

    results = sorted(result_spy)
    print("Results: ", results)
    print("WorkerController is alive: ", worker_controller.isRunning())


if __name__ == "__main__":
    # run_worker_controller()
    run_worker_controller_with_restart()

AppRunner

The AppRunner is the specialized subclass to work with pydidas Apps.

A sequence diagram of the communication with the AppRunner is given below.

../_images/AppRunner_sequence.png

It is a QObject and uses signals and slots for communicating with the main event loop. The app will be executed in independent processes in the The app_processor .

For a full description of the BaseApp and how it works, please refer to the Developers guide to pydidas applications.

AppRunner signals

The AppRunner uses the following signals:

signal name

type

description

sig_progress

float

This signal emits the relative progress once a result has been received from a worker. The values are in the range [0, 1].

sig_results

(int, object)

The task number and results are emitted as a signal once they have been received from the workers.

finished

None

This generic QThread signal is emitted once the processing has been completed.

sig_final_app_state

object

After the AppRunner’s local copy of the app has finished processing all results, this signal sends the local app’s state back to the main event loop.

The app_processor

The app_processor is the pydidas function which runs App tasks in a separate process. Tasks and result notifications are exchanged via queues. The transfer of results to the AppRunner process must be handled by the app and can be implemented to the developer’s own taste. Because all queued data is pickled, it is not advisable to send large data over the queue but instead to use the multiprocessing shared memory.

The app_processor's event loop is summarized in the flowchart below:

../_images/app_proc_logic_flow_chart.png

Example

The following example is a minimal working example. A TestApp has been written which performs a simple arithmetic operation on the numbers 0..20. Because signals and slots only work when the Qt event loop is running, a QCoreApplication is started and a test object is used to receive the AppRunner's signals.

import numpy as np
from qtpy import QtCore

import pydidas


class TestApp(pydidas.core.BaseApp):

    def __init__(self, *args, **kwargs):
        pydidas.core.BaseApp.__init__(self, *args, **kwargs)
        self._n = 20
        self.results = np.zeros((self._n))

    def multiprocessing_get_tasks(self):
        return np.arange(self._n)

    def multiprocessing_func(self, index):
        return 3 * index + 5

    @QtCore.Slot(int, object)
    def multiprocessing_store_results(self, index, *args):
        self.results[index] = args[0]


class TestObject(QtCore.QObject):

    def __init__(self):
        QtCore.QObject.__init__(self)
        self.app = None
        self.results = []

    @QtCore.Slot(object)
    def store_app(self, app):
        self.app = app

    @QtCore.Slot(int, object)
    def store_results(self, index, *results):
        self.results.append([index, results[0]])


def run_app_runner():
    app = QtCore.QCoreApplication([])

    tester = TestObject()
    test_app = TestApp()
    app_runner = pydidas.multiprocessing.AppRunner(test_app)

    app_runner.sig_final_app_state.connect(tester.store_app)
    app_runner.sig_results.connect(tester.store_results)
    app_runner.finished.connect(app.exit)

    timer = QtCore.QTimer()
    timer.singleShot(10, app_runner.start)
    app.exec_()

    print("Raw results as received from the signal:")
    print("Results: ", tester.results)
    print("\nThe test app does not have any stored results because it was not connected:")
    print("test_app.results: ", test_app.results)
    print("\nThe final app has all the results stored internally in the correct order:")
    print("final_app.results:", tester.app.results)


if __name__ == "__main__":
    run_app_runner()