Source code for ewoksbm29.io.read_online

import logging
import os
import time
from typing import Optional

from blissdata.redis_engine.scan import ScanState
from silx.io import h5py_utils
from silx.utils.retry import RetryTimeoutError

from ..blissdata import get_streams_with_lima
from ..blissdata import wait_scan_prepared

logger = logging.getLogger(__name__)


[docs] def read_scan_data_slice( scan_key: str, lima_file_index: int = 0, retry_timeout: int = 10, retry_period: int = 1, dahu_to_counter_name: Optional[dict] = None, storage_ring_current: Optional[float] = None, exposure_time: Optional[float] = None, **_, ) -> Optional[dict]: """Return a subset of `IntegrateParameters` for a SAXS scan index range covered by the lima file with index `lima_file_index`.""" # Wait for scan to be PREPARED scan = wait_scan_prepared(scan_key, retry_timeout) # Get Blissdata streams if dahu_to_counter_name is None: dahu_to_counter_name = {} counter_to_dahu_name = {v: k for k, v in dahu_to_counter_name.items()} lima_stream, counter_streams = get_streams_with_lima( scan, list(counter_to_dahu_name) ) if lima_stream is None: logger.warning("scan '%s' has no Lima stream", scan_key) return lima_info = lima_stream.info["lima_info"] # Wait until the next lima file appears on disk or the scan is finished next_lima_file_path = lima_info["file_path"] % (lima_file_index + 1) while scan.state < ScanState.CLOSED and not os.path.exists(next_lima_file_path): time.sleep(retry_period) scan.update(block=False) # When the scan is finished and the lima file does not exist, no more data is expected lima_file_path = lima_info["file_path"] % lima_file_index if scan.state >= ScanState.CLOSED and not os.path.exists(lima_file_path): return # Get number of Lima images logger.info("Select Lima file: %s", lima_file_path) lima_data_path = lima_info["data_path"] nimages = _wait_lima_images_accessible( lima_file_path, lima_data_path, retry_timeout, retry_period ) if nimages == 0: if scan.state < ScanState.CLOSED: logger.warning("Lima file '%s' has no images", lima_file_path) return # Determine scan slice frame_per_file = lima_info["frame_per_file"] start_index = lima_file_index * frame_per_file stop_index = start_index + nimages # Slice counter data if not counter_streams: counter_data = {} elif start_index == stop_index: counter_data = {dahu_name: [] for dahu_name in dahu_to_counter_name} else: if scan.state >= ScanState.CLOSED: npoints = min(len(stream) for stream in counter_streams.values()) stop_index = min(stop_index, npoints) counter_data = { counter_to_dahu_name[ctr_name]: stream[start_index:stop_index].tolist() for ctr_name, stream in counter_streams.items() } # Compile scan data slice (see IntegrateParameters) scan_data_slice = { "input_file": lima_file_path, "frame_ids": list(range(start_index, stop_index)), **counter_data, } # Extract additional scan info scan_info = scan.info energy = _get_energy(scan_info) if energy is not None: scan_data_slice["energy"] = energy if exposure_time is None: exposure_time = _get_exposure_time(scan_info) if exposure_time is not None: scan_data_slice["exposure_time"] = exposure_time if "storage_ring_current" not in counter_data: if storage_ring_current is None: storage_ring_current = _get_storage_ring_current(scan_info) if storage_ring_current is not None: storage_ring_current = [storage_ring_current] * (stop_index - start_index) scan_data_slice["storage_ring_current"] = storage_ring_current return scan_data_slice
def _wait_lima_images_accessible( lima_file_path: str, lima_data_path: str, retry_timeout: float, retry_period: float ) -> int: """Returns number of lima images in the `"{lima_file_path}::{lima_data_path}"` dataset.""" try: with h5py_utils.open_item( lima_file_path, lima_data_path, mode="r", retry_timeout=retry_timeout, retry_period=retry_period, ) as images: return len(images) except RetryTimeoutError: return 0 def _get_energy(scan_info: dict) -> Optional[float]: """Energy in keV""" try: return scan_info["positioners"]["positioners_start"]["energy"] except KeyError: return None def _get_exposure_time(scan_info) -> Optional[float]: """Exposure time in seconds""" return scan_info.get("count_time") def _get_storage_ring_current(scan_info: dict) -> Optional[float]: """Storage ring current in mA""" instrument = scan_info.get("instrument", {}) for item in instrument.values(): if not isinstance(item, dict): continue if item.get("@NX_class") == "NXsource": return item.get("current")