Developers guide to pydidas multiprocessing#
Pydidas offers multiprocessing based on the Python multiprocessing package, with a controller running in a separate thread to prevent blocking the caller.
The WorkerController is the generic pydidas implementation and the AppRunner is the subclassed version to run pydidas apps.
WorkerController#
Pydidas uses the WorkerController class to run generic (function) tasks.
Communication with workers#
Communication with the workers is handled by four queues:
- send queue to send tasks to the workers.
- receiver queue to receive results from the workers.
- stop queue to send stop signals to the workers.
- finished queue for the workers to signal they have completed all tasks.
The user does not have to interact with the queues directly; this is handled by the WorkerController.
If the user wants to stop the workers, they can use the send_stop_signal method:
- WorkerController.send_stop_signal()
  Send stop signal to workers.
  This method will send stop signals to all workers. Note that the runtime of the workers is determined by the runtime of the called function: the current call will be finished before the stop signal is processed.
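As a minimal sketch of stopping running workers early: the processing function and the tasks below are placeholders, only the WorkerController methods themselves are taken from this guide.

import pydidas

worker_controller = pydidas.multiprocessing.WorkerController()
worker_controller.change_function(print)  # placeholder processing function
worker_controller.add_tasks(range(1000))
worker_controller.start()
# ... later, for example when the user aborts the operation:
worker_controller.send_stop_signal()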
Communication with the user#
The WorkerController uses Qt's signals and slots for communicating results to the user. Three signals are provided:
- sig_progress(float): This signal is emitted after a result has been received and gives the current progress, normalized to the range [0, 1).
- sig_results(object, object): This signal is emitted for each result and returns the tuple (task argument, task result) to allow identification of the result.
- sig_finished: The finished signal is emitted once all tasks have been performed and all workers have finished.
Note
It is the responsibility of the user to connect to the signals prior to starting the processing in order to receive the results. Because of the Qt framework's behaviour, an event loop must be running for the signals to be processed.
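A minimal sketch of connecting to these signals before starting the processing is given below. The slot functions are illustrative placeholders, and the signal names are the ones listed above (assuming the finished signal is named sig_finished).

import pydidas


def print_progress(progress):
    # slot for sig_progress: progress is a float in the range [0, 1)
    print(f"progress: {progress:.2f}")


def print_result(task, result):
    # slot for sig_results: receives the (task argument, task result) pair
    print(f"task {task} -> {result}")


worker_controller = pydidas.multiprocessing.WorkerController()
# connect the slots before starting so that no emitted signal is missed:
worker_controller.sig_progress.connect(print_progress)
worker_controller.sig_results.connect(print_result)
worker_controller.sig_finished.connect(lambda: print("all tasks finished"))
# ... then configure the function and the tasks and call start().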
Key methods#
The following methods are key to using the WorkerController:
| method name | description |
|---|---|
| change_function(func, *args, **kwargs) | Change the function to be called by the workers. *args and **kwargs can be any additional calling arguments to the function. The first calling argument will always be the task. |
| add_task(task) | Add the given task to the list of tasks to be processed. |
| add_tasks(tasks) | Add all individual tasks from the iterable argument to the list of tasks to be processed. |
| finalize_tasks() | This method will add stop tasks to the queue to inform the workers that all tasks have been submitted. Calling this method will also flag the workers to finish, and the processes will terminate after finishing all calculations. |
| start() | This method will start the thread's event loop, start the worker processes and submit all tasks to the queue. |
| suspend() | Suspend will temporarily suspend the event loop. Note that all submitted tasks will still be processed by the workers, but no new tasks will be submitted and no results will be processed. |
| restart() | This method will restart processing of the event loop. |
Examples#
Minimal working example#
The following minimal working example can be run from an interactive console or saved as a file.
import time
import pydidas
import numpy as np
from qtpy import QtTest


def test_func(task, slope, offset):
    return task * slope + offset


def run_worker_controller():
    worker_controller = pydidas.multiprocessing.WorkerController()
    worker_controller.change_function(test_func, 2, 5)
    result_spy = QtTest.QSignalSpy(worker_controller.sig_results)
    worker_controller.add_tasks(np.arange(10))
    worker_controller.finalize_tasks()
    worker_controller.start()
    while True:
        print("Progress at ", worker_controller.progress)
        if worker_controller.progress >= 1:
            break
        time.sleep(0.5)
    results = sorted(result_spy)
    print(results)
    print("WorkerController is alive: ", worker_controller.is_alive())


if __name__ == "__main__":
    run_worker_controller()
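Since test_func multiplies each task by the slope 2 and adds the offset 5, the sorted results should pair each task i from 0 through 9 with the value 2 * i + 5, i.e. 5, 7, ..., 23.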
Working example with restart of the Thread#
In the following example, finalize_tasks is not called, which keeps the thread alive and allows the submission of new tasks.
import time
import pydidas
import numpy as np
from qtpy import QtTest


def test_func(task, slope, offset):
    return task * slope + offset


def run_worker_controller_with_restart():
    worker_controller = pydidas.multiprocessing.WorkerController()
    worker_controller.change_function(test_func, 2, 5)
    result_spy = QtTest.QSignalSpy(worker_controller.sig_results)
    worker_controller.add_tasks(np.arange(10))
    # worker_controller.finalize_tasks()
    worker_controller.start()
    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())
    results = sorted(result_spy)
    print("Results: ", results)
    print("WorkerController is alive: ", worker_controller.isRunning())

    worker_controller.add_tasks(np.arange(10, 20))
    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())
    results = sorted(result_spy)
    print("Results: ", results)

    # now, we suspend the controller to change the function and to add more
    # tasks; the new tasks will not be processed while it is suspended:
    worker_controller.suspend()
    worker_controller.change_function(test_func, -1, 0)
    worker_controller.add_tasks(np.arange(20, 30))
    time.sleep(0.2)
    # restarting will spawn new processes to carry out the calculations:
    worker_controller.restart()
    print("\nWaiting for results ...")
    with pydidas.core.utils.TimerSaveRuntime() as runtime:
        while True:
            if worker_controller.progress >= 1:
                break
            time.sleep(0.005)
    print("Runtime was ", runtime())
    results = sorted(result_spy)
    print("Results: ", results)
    print("WorkerController is alive: ", worker_controller.isRunning())


if __name__ == "__main__":
    # run_worker_controller()
    run_worker_controller_with_restart()
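The first two batches of tasks use the slope 2 and the offset 5, as in the minimal example. After the suspend, the function is changed to the slope -1 and the offset 0, so the tasks 20 through 29 of the last batch map to their negated values -20 through -29.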
AppRunner#
The AppRunner is the specialized subclass to work with pydidas Apps. A sequence diagram of the communication with the AppRunner is given below. It is a QObject and uses signals and slots for communicating with the main event loop. The app will be executed in independent processes in the app_processor.
For a full description of the BaseApp and how it works, please refer to the Developers guide to pydidas applications.
AppRunner signals#
The AppRunner uses the following signals:
| signal name | type | description |
|---|---|---|
| sig_progress | float | This signal emits the relative progress once a result has been received from a worker. The values are in the range [0, 1]. |
| sig_results | (int, object) | The task number and results are emitted as a signal once they have been received from the workers. |
| finished | None | This generic QThread signal is emitted once the processing has been completed. |
| sig_final_app_state | object | After the AppRunner's local copy of the app has finished processing all results, this signal sends the local app's state back to the main event loop. |
The app_processor#
The app_processor is the pydidas function which runs App tasks in a separate process. Tasks and result notifications are exchanged via queues. The transfer of results to the AppRunner process must be handled by the app and can be implemented to the developer's own taste. Because all queued data is pickled, it is not advisable to send large data over the queue but instead to use the multiprocessing shared memory.
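The shared-memory advice can be illustrated with a generic sketch based on Python's standard multiprocessing.shared_memory module. This is not pydidas-specific API; it only shows the general pattern of passing a buffer name and some small metadata over a queue instead of the data itself.

import numpy as np
from multiprocessing import shared_memory

# -- writer side (e.g. inside a worker process) --------------------------
# copy the large result into a named shared-memory buffer ...
result = np.arange(1_000_000, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=result.nbytes)
np.ndarray(result.shape, dtype=result.dtype, buffer=shm.buf)[:] = result
# ... and send only the small, cheap-to-pickle metadata over the queue:
metadata = (shm.name, result.shape, result.dtype.str)

# -- reader side (e.g. the receiving process) ----------------------------
name, shape, dtype = metadata
shm_view = shared_memory.SharedMemory(name=name)
received = np.ndarray(shape, dtype=dtype, buffer=shm_view.buf).copy()
shm_view.close()

# the writer releases the buffer once the result has been consumed:
shm.close()
shm.unlink()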
The app_processor's event loop is summarized in the flowchart below:
Example#
The following example is a minimal working example. A TestApp has been written which performs a simple arithmetic operation on the numbers 0..20. Because signals and slots only work when the Qt event loop is running, a QCoreApplication is started and a test object is used to receive the AppRunner's signals.
import numpy as np
from qtpy import QtCore

import pydidas


class TestApp(pydidas.core.BaseApp):
    def __init__(self, *args, **kwargs):
        pydidas.core.BaseApp.__init__(self, *args, **kwargs)
        self._n = 20
        self.results = np.zeros((self._n))

    def multiprocessing_get_tasks(self):
        return np.arange(self._n)

    def multiprocessing_func(self, index):
        return 3 * index + 5

    @QtCore.Slot(int, object)
    def multiprocessing_store_results(self, index, *args):
        self.results[index] = args[0]


class TestObject(QtCore.QObject):
    def __init__(self):
        QtCore.QObject.__init__(self)
        self.app = None
        self.results = []

    @QtCore.Slot(object)
    def store_app(self, app):
        self.app = app

    @QtCore.Slot(int, object)
    def store_results(self, index, *results):
        self.results.append([index, results[0]])


def run_app_runner():
    app = QtCore.QCoreApplication([])
    tester = TestObject()
    test_app = TestApp()
    app_runner = pydidas.multiprocessing.AppRunner(test_app)
    app_runner.sig_final_app_state.connect(tester.store_app)
    app_runner.sig_results.connect(tester.store_results)
    app_runner.finished.connect(app.exit)
    timer = QtCore.QTimer()
    timer.singleShot(10, app_runner.start)
    app.exec_()
    print("Raw results as received from the signal:")
    print("Results: ", tester.results)
    print("\nThe test app does not have any stored results because it was not connected:")
    print("test_app.results: ", test_app.results)
    print("\nThe final app has all the results stored internally in the correct order:")
    print("final_app.results:", tester.app.results)


if __name__ == "__main__":
    run_app_runner()
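Since the TestApp returns 3 * index + 5 for each index, the final app should hold the values 5, 8, ..., 62 for the indices 0 through 19, while tester.results contains the same (index, result) pairs in the order in which they were received from the workers.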