Source code for ewoksbm29.tasks.results
import logging
from functools import lru_cache
from ewokscore import Task
logger = logging.getLogger(__name__)
[docs]
class AccumulateDahuJobResults(
Task,
input_names=["dahu_job_index", "dahu_job_id", "dahu_result"],
output_names=["dahu_results"],
):
"""Accumulate Dahu job results with the job index as a key.
.. code: python
dahu_results[dahu_job_index] = {"dahu_job_id": dahu_job_id, "dahu_result": dahu_result}
Required inputs:
- dahu_job_index (int): Dahu job index for ordering results.
- dahu_job_id (Optional[int]): Dahu job id (`None` means no Dahu job was executed).
- dahu_result (Optional[dict]): Dahu result (`None` means no Dahu job was executed).
Outputs:
- dahu_results (Dict[int, dict]): Add the Dahu result to results from previous
executions within the same workflow.
"""
[docs]
def run(self):
ewoks_job_id = self.job_id
dahu_results = _dahu_job_results(ewoks_job_id)
value = {
"dahu_job_id": self.inputs.dahu_job_id,
"dahu_result": self.inputs.dahu_result,
}
dahu_results[self.inputs.dahu_job_index] = value
self.outputs.dahu_results = dahu_results
logger.info(
"Accumulate dahu_job_index=%d, job_id=%d",
self.inputs.dahu_job_id,
self.inputs.dahu_job_index,
)
@lru_cache(maxsize=10)
def _dahu_job_results(ewoks_job_id: str) -> dict:
return {}