The WorkflowTree class#
Introduction to the WorkflowTree#
The WorkflowTree consists of multiple WorkflowNodes. Each node stores its
position in the tree, its parent and children, and its associated processing
plugin. The nodes themselves are agnostic to any meta-information.
The WorkflowTree is a pydidas singleton object: only a single instance exists
at runtime. It manages the interactions between the user and the individual
nodes. The instance can be obtained with the following code:
>>> import pydidas
>>> TREE = pydidas.workflow.WorkflowTree()
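The singleton behaviour can be illustrated with a minimal sketch. The class `SingletonTree` below is a hypothetical stand-in and is not how pydidas implements its singleton; it only shows that repeated calls to the constructor hand back the same object, so changes made through one reference are visible through every other one.

```python
class SingletonTree:
    """Hypothetical stand-in for a singleton class (not the pydidas code)."""

    _instance = None

    def __new__(cls):
        # Create the single instance on the first call and reuse it afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.nodes = {}
        return cls._instance


tree_a = SingletonTree()
tree_b = SingletonTree()

# Both names refer to the same object, so this entry is shared.
tree_a.nodes[0] = "loader"
print(tree_a is tree_b)
print(tree_b.nodes)
```

In practice this means that any module calling the constructor works on the same tree, without passing a reference around explicitly.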
Processing with the WorkflowTree is separated into two steps. First, any operations which need to be performed only once (e.g. initializations) are executed. Second, processing is performed one data frame at a time. This separation makes it easy to run the WorkflowTree in serial or in parallel.
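The two-step scheme can be sketched with a toy class. The names `ToyWorkflow`, `prepare` and `process_frame` are hypothetical and do not correspond to pydidas methods; the sketch only shows why a one-time setup followed by independent per-frame calls lends itself to both serial and parallel execution.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyWorkflow:
    """Hypothetical two-step workflow (not part of pydidas)."""

    def __init__(self, offset):
        self.offset = offset
        self.scale = None
        self.prepared = False

    def prepare(self):
        # Step 1: one-time initialization shared by all frames.
        self.scale = 2
        self.prepared = True

    def process_frame(self, index):
        # Step 2: per-frame processing, independent of all other frames.
        if not self.prepared:
            raise RuntimeError("Call prepare() before processing frames.")
        return self.scale * index + self.offset


workflow = ToyWorkflow(offset=1)
workflow.prepare()
frames = range(5)

# Serial processing: one frame after the other.
serial_results = [workflow.process_frame(i) for i in frames]

# Parallel processing: the same per-frame call, distributed over workers.
with ThreadPoolExecutor(max_workers=2) as pool:
    parallel_results = list(pool.map(workflow.process_frame, frames))

print(serial_results == parallel_results)
```

Because each frame depends only on the prepared state and its own index, the order in which frames are processed does not affect the results.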
Assembling a WorkflowTree#
To assemble a WorkflowTree, users need to know which plugins they want to use
and how to configure them. The configured plugins can then be added to the
tree. If the plugins are passed to the WorkflowTree without any further
information, they will be connected in a linear manner, with every plugin
appended to the last one.
Plugins can be configured either in the WorkflowTree or before adding them to
the tree. Because access to the individual plugins in the tree is less direct,
it is recommended to configure each plugin before adding it to the WorkflowTree.
To create a new node with a plugin and add it to the WorkflowTree, use the create_and_add_node method:
- ProcessingTree.create_and_add_node(plugin: BasePlugin, parent: WorkflowNode | int | None = None, node_id: int | None = None) -> int
Create a new node and add it to the tree.
If the tree is empty, the new node is set as root node. If no parent is given, the node will be created as child of the latest node in the tree.
- Parameters:
plugin (pydidas.Plugin) – The plugin to be added to the tree.
parent (WorkflowNode | int | None, optional) – The parent node of the newly created node. If an integer, this will be interpreted as the node_id of the parent and the respective parent will be selected. If None, this will select the latest node in the tree. The default is None.
node_id (int | None, optional) – The node ID of the newly created node, used for referencing the node in the WorkflowTree. If not specified (i.e. None), the WorkflowTree will create a new node ID. The default is None.
- Returns:
node_id – The node ID of the added node.
- Return type:
int
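The parent and node ID rules described above can be sketched with a toy tree. `ToyNode` and `ToyTree` are hypothetical stand-ins and not the pydidas implementation; in particular, the sketch assumes that "latest node" means the most recently added node and that new IDs are assigned by counting up from the highest existing ID.

```python
class ToyNode:
    """Hypothetical node storing its ID, plugin, parent and children."""

    def __init__(self, node_id, plugin, parent=None):
        self.node_id = node_id
        self.plugin = plugin
        self.parent = parent
        self.children = []
        if parent is not None:
            parent.children.append(self)


class ToyTree:
    """Hypothetical tree mimicking the documented parent/node_id rules."""

    def __init__(self):
        self.nodes = {}
        self.root = None
        self.latest_id = None

    def create_and_add_node(self, plugin, parent=None, node_id=None):
        # Assumption: a missing node ID becomes the next free integer.
        if node_id is None:
            node_id = max(self.nodes, default=-1) + 1
        if isinstance(parent, int):
            # An integer is interpreted as the node ID of the parent.
            parent = self.nodes[parent]
        elif parent is None and self.latest_id is not None:
            # Without a parent, attach to the most recently added node.
            parent = self.nodes[self.latest_id]
        node = ToyNode(node_id, plugin, parent)
        if self.root is None:
            # The first node in an empty tree becomes the root.
            self.root = node
        self.nodes[node_id] = node
        self.latest_id = node_id
        return node_id


tree = ToyTree()
ids = [
    tree.create_and_add_node("loader"),
    tree.create_and_add_node("integrator1"),
    tree.create_and_add_node("integrator2", parent=0),
]
print(ids)
print([child.node_id for child in tree.root.children])
```

Passing `parent=0` for the third node is what turns the otherwise linear chain into a branch with two children below the root.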
The following example will create a WorkflowTree which loads data from a single Hdf5 file and performs two separate integrations in different angular ranges:
>>> import pydidas
>>> TREE = pydidas.workflow.WorkflowTree()
>>> COLLECTION = pydidas.plugins.PluginCollection()
# Create a loader plugin. Its configuration (e.g. the file path) is not
# detailed here.
>>> loader = COLLECTION.get_plugin_by_name('Hdf5FileSeriesLoader')()
# Create an integrator plugin for a specific radial range
>>> integrator1 = COLLECTION.get_plugin_by_name('PyFAIazimuthalIntegration')()
>>> integrator1.set_param_value('rad_use_range', True)
>>> integrator1.set_param_value('rad_npoint', 200)
>>> integrator1.set_param_value('rad_range_lower', 5.5)
>>> integrator1.set_param_value('rad_range_upper', 7.5)
# Create an integrator plugin for a second radial range
>>> integrator2 = COLLECTION.get_plugin_by_name('PyFAIazimuthalIntegration')()
>>> integrator2.set_param_value('rad_use_range', True)
>>> integrator2.set_param_value('rad_npoint', 400)
>>> integrator2.set_param_value('rad_range_lower', 12.1)
>>> integrator2.set_param_value('rad_range_upper', 16.1)
# Add the plugins to the WorkflowTree. The return value is the node ID of
# the newly added node.
>>> TREE.create_and_add_node(loader)
0
>>> TREE.create_and_add_node(integrator1)
1
# Because plugins are attached to the last node by default, the first
# integrator plugin did not need to specify a parent, but the second one
# must do so:
>>> TREE.create_and_add_node(integrator2, parent=0)
2
Running workflows#
The WorkflowTree includes several
methods to run either the full Workflow or just individual plugins for testing.
Test individual plugins#
To test individual plugins, users can use the execute_single_plugin method.
- ProcessingTree.execute_single_plugin(node_id: int, arg: object, **kwargs: dict) -> tuple[object, dict]
Execute a single node Plugin and get the return.
- Parameters:
node_id (int) – The ID of the node in the tree.
arg (object) – The input argument for the Plugin.
**kwargs (dict) – Any keyword arguments for the Plugin execution.
- Raises:
KeyError – If the node ID is not registered.
- Returns:
res (object) – The return value of the Plugin. Depending on the plugin, it can be a single value or an array.
kwargs (dict) – The (updated) kwargs dictionary.
This method executes a single plugin only. It can be used to check intermediate results and to make sure that a workflow works as intended.
The following example shows how to use this method to read a frame from an HDF5 file and store it for further processing. (This example assumes that the objects from the previous example still exist.)
>>> res, kws = TREE.execute_single_plugin(0, 0)
>>> kws
{}
>>> res
Dataset(
axis_labels: {
0: "detector y",
1: "detector x"},
axis_ranges: {
0: None
1: None},
axis_units: {
0: "pixel",
1: "pixel"},
metadata: {'slicing_axes': [0], 'frame': [0], 'dataset':
'/entry/data/data'},
array([[0, 1, 0, ..., 1, 0, 1],
[0, 0, 1, ..., 2, 0, 0],
[0, 0, 0, ..., 0, 3, 0],
...,
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 1, 1]], dtype=uint32)
)
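Stepping through a workflow node by node can be sketched with toy objects. `ToyPlugin` and `ToyTree` are hypothetical and not the pydidas classes; the sketch only mirrors the documented contract, namely that the method returns the result together with the (updated) keyword arguments and raises a `KeyError` for an unregistered node ID.

```python
class ToyPlugin:
    """Hypothetical plugin which applies a function to its input."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def execute(self, data, **kwargs):
        # Record which plugin ran last in the keyword arguments.
        kwargs["last_plugin"] = self.name
        return self.func(data), kwargs


class ToyTree:
    """Hypothetical tree mapping node IDs to plugins."""

    def __init__(self):
        self.plugins = {}

    def execute_single_plugin(self, node_id, arg, **kwargs):
        if node_id not in self.plugins:
            raise KeyError(f"The node ID {node_id} is not registered.")
        return self.plugins[node_id].execute(arg, **kwargs)


tree = ToyTree()
tree.plugins[0] = ToyPlugin("loader", lambda index: [index, index + 1, index + 2])
tree.plugins[1] = ToyPlugin("summation", sum)

# Run node 0 alone, inspect the result, then feed it into node 1 by hand.
frame, kws = tree.execute_single_plugin(0, 5)
total, kws = tree.execute_single_plugin(1, frame, **kws)
print(frame, total, kws)
```

Feeding the stored output of one node into the next by hand makes it possible to inspect each intermediate result before running the full workflow.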
Run the full WorkflowTree#
Two methods are available to run the full WorkflowTree. The execute_process
method runs the full workflow for a single frame but neither gathers results
from the nodes nor returns any values. It is used by the automatic processing,
where pydidas organizes the results. The execute_process_and_get_results method
performs the same calculations but also gathers the results from the individual
plugins and returns them to the user. The documentation for the
execute_process_and_get_results method is given below.
- ProcessingTree.execute_process_and_get_results(arg: object, **kwargs: dict) -> dict
Execute the WorkflowTree process and get the results.
- Parameters:
arg (object) – Any argument that needs to be passed to the plugin chain.
**kwargs (dict) – Any keyword arguments which need to be passed to the plugin chain.
- Returns:
results – A dictionary with node IDs as keys and the results of the respective nodes as values.
- Return type:
dict
The following example uses the WorkflowTree assembled in the example above to
demonstrate both methods.
# This method will not return any results:
>>> res = TREE.execute_process(0)
>>> res is None
True
# This method will return results:
>>> res = TREE.execute_process_and_get_results(0)
>>> res
{1: Dataset(
axis_labels: {
0: '2theta'},
axis_ranges: {
0: array([5.505 , 5.51500001, 5.52500001, ...,
7.47500088, 7.48500089, 7.49500089])},
axis_units: {
0: 'deg'},
metadata: {},
array([2.357937 , 2.29853 , 2.3073444, ..., 2.0363004, 2.039918 ,
2.0199535], dtype=float32)
),
2: Dataset(
axis_labels: {
0: '2theta'},
axis_ranges: {
0: array([12.105 , 12.11500001, 12.12500001, ...,
16.07500191, 16.08500191, 16.09500192])},
axis_units: {
0: 'deg'},
metadata: {},
array([ 1.4057364, 1.4105228, 1.4086472, ..., 8.046747 , 17.791353 ,
22.341616 ], dtype=float32)
)}
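How results end up in a dictionary keyed by node ID can be sketched with a toy tree. `ToyNode` and `run_and_collect` are hypothetical names and not part of pydidas. The output above contains only nodes 1 and 2, so the sketch assumes that results are gathered from nodes without children; the actual selection rule in pydidas may differ.

```python
class ToyNode:
    """Hypothetical node which applies a function and passes the result on."""

    def __init__(self, node_id, func):
        self.node_id = node_id
        self.func = func
        self.children = []

    def execute(self, arg, results):
        res = self.func(arg)
        # Assumption: only nodes without children store their results.
        if not self.children:
            results[self.node_id] = res
        # Each child receives the result of its parent as input.
        for child in self.children:
            child.execute(res, results)


def run_and_collect(root, arg):
    """Run the full toy tree for one input and return the gathered results."""
    results = {}
    root.execute(arg, results)
    return results


loader = ToyNode(0, lambda index: [index, index + 1, index + 2])
total = ToyNode(1, sum)
maximum = ToyNode(2, max)
loader.children.extend([total, maximum])

results = run_and_collect(loader, 0)
print(results)
```

The toy loader result is consumed by both children but does not appear in the returned dictionary, mirroring the example output in which node 0 is absent.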
To run the workflow for multiple data frames, it is recommended to use the
ExecuteWorkflowApp. Please refer
to the Tutorial for the ExecuteWorkflowApp.