Source code for ewoksbm29.tests.test_accumulation

import time

from ewokscore import Task
from ewoksppf import execute_graph


def test_accumulate_dahu_results():
    """Execute the looping mock workflow twice and check the accumulated results.

    Each execution runs ``njobs`` mock jobs with consecutive job indices. The
    workflow output is expected to be a ``dahu_results`` mapping keyed by job
    index that holds only the jobs of that execution, while the mock job ids
    keep increasing from one execution to the next.
    """
    global _DAHU_JOB_COUNTER
    _DAHU_JOB_COUNTER = 0

    njobs = 3
    first_job_id = 0  # mock job id expected for the first job of the next execution

    # Start index 0: check the njobs are accumulated.
    # Start index 10: ensure we don't accumulate across jobs.
    for start_index in (0, 10):
        inputs = [
            {"id": "job", "name": "dahu_job_index", "value": start_index},
            {
                "id": "job",
                "name": "dahu_job_index_max",
                "value": start_index + njobs - 1,
            },
        ]
        results = execute_graph(_WORKFLOW, pool_type="thread", inputs=inputs)

        expected = {
            start_index + offset: {
                "dahu_job_id": first_job_id + offset,
                "dahu_result": {"value": start_index + offset},
            }
            for offset in range(njobs)
        }
        assert results == {"dahu_results": expected}

        first_job_id += njobs
# Number of mock jobs executed so far. It must be set to an integer before the
# workflow runs (the test function resets it to 0).
_DAHU_JOB_COUNTER = None


class _MockDahuJob(
    Task,
    input_names=["dahu_job_index", "dahu_job_index_max"],
    output_names=[
        "dahu_job_index",
        "dahu_job_id",
        "dahu_result",
        "next_dahu_job_index",
        "finished",
    ],
):
    """Mock job task that echoes its job index as the result.

    Every execution takes the next value of the module-level job counter as
    its ``dahu_job_id``. ``finished`` becomes ``True`` once ``dahu_job_index``
    has reached ``dahu_job_index_max``.
    """

    def run(self):
        global _DAHU_JOB_COUNTER
        dahu_job_id = _DAHU_JOB_COUNTER
        _DAHU_JOB_COUNTER += 1

        dahu_job_index = self.inputs.dahu_job_index
        is_last_job = dahu_job_index >= self.inputs.dahu_job_index_max

        # Publish all outputs before logging: the log line reads one of them back.
        self.outputs.finished = is_last_job
        self.outputs.next_dahu_job_index = dahu_job_index + 1
        self.outputs.dahu_result = {"value": dahu_job_index}
        self.outputs.dahu_job_id = dahu_job_id
        self.outputs.dahu_job_index = dahu_job_index

        print(
            f"job_id={self.job_id}, dahu_job_id={dahu_job_id}, dahu_job_index={dahu_job_index}, finished={self.outputs.finished}"
        )
        time.sleep(0.1)


# Workflow under test: the "job" node links back to itself with the next job
# index for as long as "finished" is False, and each execution also passes its
# index, id and result to the "accumulate" node.
_WORKFLOW = {
    "graph": {"id": "test_accumulation"},
    "nodes": [
        {
            "id": "job",
            "task_type": "class",
            "task_identifier": f"{__name__}._MockDahuJob",
        },
        {
            "id": "accumulate",
            "task_type": "class",
            "task_identifier": "ewoksbm29.tasks.results.AccumulateDahuJobResults",
        },
    ],
    "links": [
        {
            "source": "job",
            "target": "job",
            "data_mapping": [
                {
                    "source_output": "next_dahu_job_index",
                    "target_input": "dahu_job_index",
                }
            ],
            "conditions": [{"source_output": "finished", "value": False}],
        },
        {
            "source": "job",
            "target": "accumulate",
            # These outputs map onto inputs of the same name.
            "data_mapping": [
                {"source_output": name, "target_input": name}
                for name in ("dahu_job_index", "dahu_job_id", "dahu_result")
            ],
        },
    ],
}