Source code for ewoksbm29.tasks.base.dahu_ispyb

from ...models.ispyb import ISPyBMetadata
from .dahu import DahuJob


[docs] class DahuJobWithIspybUpload( DahuJob, optional_input_names=["ispyb_metadata", "ispyb_url"], ): """Ewoks task that runs a Dahu job with uploading to Ipysb. In addition to the inputs from `DahuJob`: Optional inputs: - ispyb_metadata (dict): Scan metadata (see `ISPyBMetadata`). - ispyb_url (str): WDSL end-point of the Ispyb SOAP service. """
[docs] def dahu_parameters_initialize(self) -> dict: dahu_parameters = super().dahu_parameters_initialize() ispyb_metadata = self.get_input_value("ispyb_metadata") if ispyb_metadata: ispyb_metadata = ISPyBMetadata(**ispyb_metadata) self._add_ispyb_metadata(dahu_parameters, ispyb_metadata) return dahu_parameters
def _add_ispyb_metadata( self, dahu_parameters: dict, ispyb_metadata: ISPyBMetadata ) -> None: dahu_parameters["ispyb"] = ispyb_metadata.ispyb_parameters ispyb_url = self.get_input_value("ispyb_url") if ispyb_url: dahu_parameters["ispyb"]["url"] = ispyb_url
[docs] def dahu_parameters_finalize(self, dahu_parameters: dict) -> None: if not self.missing_inputs.ispyb_url and not self.inputs.ispyb_url: # Ispyb upload is explicitly disabled if "ispyb" in dahu_parameters: _ = dahu_parameters["ispyb"].pop("url", None) super().dahu_parameters_finalize(dahu_parameters)