Source code for ewoksbm29.workflows.run

from typing import Dict
from typing import List
from typing import Union

from celery.result import AsyncResult
from ewoksjob.client import submit
from ewoksppf import execute_graph
from ewoksutils.task_utils import task_inputs


[docs] def integrate( read_inputs: dict, integrate_inputs: dict, remote: bool = False ) -> Union[Dict[int, dict], AsyncResult]: inputs = task_inputs(task_identifier="ReadScanDataSlice", inputs=read_inputs) inputs += task_inputs(task_identifier="DahuIntegrate", inputs=integrate_inputs) return _execute("integrate", inputs, remote=remote)
[docs] def subtract(subtract_inputs: dict, remote: bool = False) -> None: inputs = task_inputs(task_identifier="DahuSubtract", inputs=subtract_inputs) return _execute("subtract", inputs, remote=remote)
[docs] def hplc_summary(summary_inputs: dict, remote: bool = False) -> None: inputs = task_inputs(task_identifier="DahuHplcSummary", inputs=summary_inputs) return _execute("hplc_summary", inputs, remote=remote)
def _execute(workflow: str, inputs: List[dict], remote: bool = False): args = (workflow,) kwargs = { "inputs": inputs, "engine": "ppf", "pool_type": "thread", # "max_workers": 3, "load_options": {"root_module": "ewoksbm29.workflows"}, } if remote: return submit(args=args, kwargs=kwargs) else: return execute_graph(*args, **kwargs)