The WorkflowTree class#
Introduction to the WorkflowTree#
The WorkflowTree consists of multiple WorkflowNodes. Each node stores information about its position in the tree, its parent and children, and its associated processing plugin, but the nodes are agnostic to any meta-information.
The WorkflowTree is a pydidas singleton object with only a single instance at runtime. It manages the interactions between the user and the individual nodes. Its instance can be obtained by calling the following code:
>>> import pydidas
>>> TREE = pydidas.workflow.WorkflowTree()
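Because the WorkflowTree is a singleton, repeated calls return the very same instance, so changes made in one place are visible everywhere:
>>> TREE is pydidas.workflow.WorkflowTree()
True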
Processing with the WorkflowTree is separated into two steps. First, any operations which need to be performed only once (i.e. initializations) are executed. Second, processing is performed one data frame at a time. This separation makes it easy to run the WorkflowTree in either serial or parallel processing.
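For illustration, a configured tree can be run serially by calling one of the execution methods described below once per frame index. This is only a sketch; it assumes that the tree has already been populated with plugins and that the integer frame index is a valid input for the loader plugin:
# Run the workflow once per frame (minimal sketch, see the methods below):
>>> for frame_index in range(10):
...     results = TREE.execute_process_and_get_results(frame_index)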
Assembling a WorkflowTree#
To assemble a WorkflowTree, users need to know which plugins they want to use, and they need to configure these plugins. Then, they can add the plugins to the tree. If the plugins are passed to the WorkflowTree without any further information, they will be connected in a linear manner, with every plugin appended to the last one.
Plugins can be configured either in the WorkflowTree or before adding them to the tree. Access to the individual plugins in the tree is somewhat hidden, though, and it is therefore recommended to configure each plugin before adding it to the WorkflowTree.
To create a new node with a plugin and add it to the WorkflowTree, use the create_and_add_node method:
- ProcessingTree.create_and_add_node(plugin: BasePlugin, parent: None | int | BasePlugin = None, node_id: None | int = None) → int
Create a new node and add it to the tree.
If the tree is empty, the new node is set as root node. If no parent is given, the node will be created as child of the latest node in the tree.
- Parameters:
plugin (pydidas.Plugin) – The plugin to be added to the tree.
parent (Union[WorkflowNode, int, None], optional) – The parent node of the newly created node. If an integer, this will be interpreted as the node_id of the parent and the respective parent will be selected. If None, this will select the latest node in the tree. The default is None.
node_id (Union[int, None], optional) – The node ID of the newly created node, used for referencing the node in the WorkflowTree. If not specified (i.e. None), the WorkflowTree will create a new node ID. The default is None.
- Returns:
node_id – The node ID of the added node.
- Return type:
int
The following example will create a WorkflowTree which loads data from a single Hdf5 file and performs two separate integrations in different angular ranges:
>>> import pydidas
>>> TREE = pydidas.workflow.WorkflowTree()
>>> COLLECTION = pydidas.plugins.PluginCollection()
# Create a loader plugin and set the file path
>>> loader = COLLECTION.get_plugin_by_name('Hdf5FileSeriesLoader')()
# The configuration of the loader is not detailed here.
# Create an integrator plugin for a specific radial range
>>> integrator1 = COLLECTION.get_plugin_by_name('PyFAIazimuthalIntegration')()
>>> integrator1.set_param_value('rad_use_range', True)
>>> integrator1.set_param_value('rad_npoint', 200)
>>> integrator1.set_param_value('rad_range_lower', 5.5)
>>> integrator1.set_param_value('rad_range_upper', 7.5)
# Create an integrator plugin for a second radial range
>>> integrator2 = COLLECTION.get_plugin_by_name('PyFAIazimuthalIntegration')()
>>> integrator2.set_param_value('rad_use_range', True)
>>> integrator2.set_param_value('rad_npoint', 400)
>>> integrator2.set_param_value('rad_range_lower', 12.1)
>>> integrator2.set_param_value('rad_range_upper', 16.1)
# Add the plugins to the WorkflowTree. The return value is the node ID of
# the newly added plugin.
>>> TREE.create_and_add_node(loader)
0
>>> TREE.create_and_add_node(integrator1)
1
# Because plugins are appended to the last node by default, the first
# integrator did not need to specify a parent. The second integrator,
# however, must explicitly reference the loader (node ID 0) as its parent:
>>> TREE.create_and_add_node(integrator2, parent=0)
2
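The node_id argument from the signature above can also be used to fix the ID of a new node explicitly. A brief sketch with a hypothetical third integrator, reusing the COLLECTION object from above:
>>> integrator3 = COLLECTION.get_plugin_by_name('PyFAIazimuthalIntegration')()
# Attach to the loader (node ID 0) and request the explicit node ID 5:
>>> TREE.create_and_add_node(integrator3, parent=0, node_id=5)
5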
Running workflows#
The WorkflowTree includes several methods to run either the full workflow or just individual plugins for testing.
Test individual plugins#
To test individual plugins, users can use the execute_single_plugin method.
- ProcessingTree.execute_single_plugin(node_id: int, arg: object, **kwargs: dict) → tuple[object, dict]
Execute a single node Plugin and get the return.
- Parameters:
node_id (int) – The ID of the node in the tree.
arg (object) – The input argument for the Plugin.
**kwargs (dict) – Any keyword arguments for the Plugin execution.
- Raises:
KeyError – If the node ID is not registered.
- Returns:
res (object) – The return value of the Plugin. Depending on the plugin, it can be a single value or an array.
kwargs (dict) – The (updated) kwargs dictionary.
This method executes a single plugin only. It can be used to check intermediate results and make sure that a workflow works as intended.
The following example shows how to use this method to read a frame from an HDF5 file and store it for further processing. (This example assumes that the objects from the previous example still exist.)
>>> res, kws = TREE.execute_single_plugin(0, 0)
>>> kws
{}
>>> res
Dataset(
axis_labels: {
0: "detector y",
1: "detector x"},
axis_ranges: {
0: None,
1: None},
axis_units: {
0: "pixel",
1: "pixel"},
metadata: {'slicing_axes': [0], 'frame': [0], 'dataset':
'/entry/data/data'},
array([[0, 1, 0, ..., 1, 0, 1],
[0, 0, 1, ..., 2, 0, 0],
[0, 0, 0, ..., 0, 3, 0],
...,
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 1, 1]], dtype=uint32)
)
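The returned frame and keyword arguments can then be passed on to test the next plugin in the chain. A short sketch, assuming the tree from the examples above:
# Feed the loader's output into the first integrator (node ID 1):
>>> res2, kws2 = TREE.execute_single_plugin(1, res, **kws)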
Run the full WorkflowTree#
Two different methods are available to run the full WorkflowTree. First, there is the execute_process method, which will run the full workflow for a single frame but will neither gather any results from the nodes nor return any values. This method is used by the automatic processing, where pydidas organizes the results. Second, the execute_process_and_get_results method will perform the same calculations but also gather the results from the individual plugins and return them to the user. The documentation for the execute_process_and_get_results method is given below.
- ProcessingTree.execute_process_and_get_results(arg: object, **kwargs: dict) → dict
Execute the WorkflowTree process and get the results.
- Parameters:
arg (object) – Any argument that needs to be passed to the plugin chain.
**kwargs (dict) – Any keyword arguments which need to be passed to the plugin chain.
- Returns:
results – A dictionary with the results, with node IDs as keys and the corresponding results as values.
- Return type:
dict
Using the WorkflowTree from the example above, the following example demonstrates the usage of both methods.
# This method will not return any results:
>>> res = TREE.execute_process(0)
>>> res is None
True
# This method will return results:
>>> res = TREE.execute_process_and_get_results(0)
>>> res
{1: Dataset(
axis_labels: {
0: '2theta'},
axis_ranges: {
0: array([5.505 , 5.51500001, 5.52500001, ...,
7.47500088, 7.48500089, 7.49500089])},
axis_units: {
0: 'deg'},
metadata: {},
array([2.357937 , 2.29853 , 2.3073444, ..., 2.0363004, 2.039918 ,
2.0199535], dtype=float32)
),
2: Dataset(
axis_labels: {
0: '2theta'},
axis_ranges: {
0: array([12.105 , 12.11500001, 12.12500001, ...,
16.07500191, 16.08500191, 16.09500192])},
axis_units: {
0: 'deg'},
metadata: {},
array([ 1.4057364, 1.4105228, 1.4086472, ..., 8.046747 , 17.791353 ,
22.341616 ], dtype=float32)
)}
To run the workflow for multiple data frames, it is recommended to use the ExecuteWorkflowApp. Please refer to the tutorial for the ExecuteWorkflowApp.
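For orientation only, a serial run with the app will look roughly like the following sketch. The run() call is assumed here to follow the generic pydidas app pattern; the required configuration steps are covered in the tutorial:
>>> import pydidas
>>> app = pydidas.apps.ExecuteWorkflowApp()
# run() is assumed to process all configured frames with the stored WorkflowTree:
>>> app.run()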