Source code for ewoksbm29.io.read_offline

import logging
import os
from typing import Optional
from typing import Tuple

import h5py
from silx.io import h5py_utils

logger = logging.getLogger(__name__)


def read_scan_data_slice(
    scan_file_path: str,
    scan_number: int,
    lima_file_index: int = 0,
    dahu_to_counter_name: Optional[dict] = None,
    storage_ring_current: Optional[float] = None,
    exposure_time: Optional[float] = None,
    **_,
) -> Optional[dict]:
    """Return a subset of `IntegrateParameters` for a SAXS scan index range covered
    by the lima file with index `lima_file_index`.

    :param scan_file_path: HDF5 file that contains the scan.
    :param scan_number: scan number; the sub-scan ``"{scan_number}.1"`` is read.
    :param lima_file_index: index of the Lima file among the virtual sources of
        the Lima image dataset. Negative values count from the last Lima file.
    :param dahu_to_counter_name: maps keys of the returned dictionary to counter
        names in the ``measurement`` group of the scan. Each counter is sliced
        to the scan index range of the selected Lima file.
    :param storage_ring_current: when given, used instead of the value stored
        in the scan file.
    :param exposure_time: when given, used instead of the value stored in the
        scan file.
    :param _: additional keyword arguments are accepted and ignored.
    :returns: ``None`` when the scan has no Lima detector or when
        `lima_file_index` is out of range. Otherwise a dictionary with the keys
        ``"input_file"`` and ``"frame_ids"``, one key per requested counter and,
        when available, ``"energy"``, ``"exposure_time"`` and
        ``"storage_ring_current"``.
    """
    subscan = f"{scan_number}.1"
    with h5py_utils.File(scan_file_path, mode="r") as h5f:
        scan_entry = h5f[subscan]
        instrument = scan_entry["instrument"]
        images = _get_lima_dataset(instrument)
        if images is None:
            logger.warning(
                "scan '%s::/%s' has no Lima detector", scan_file_path, subscan
            )
            return None

        virtual_sources = images.virtual_sources()
        number_of_lima_files = len(virtual_sources)
        # Normalise negative indices: indexing the sources with a negative
        # value would select a file from the end while the slice computation
        # would still start counting from the first file.
        if lima_file_index < 0:
            lima_file_index += number_of_lima_files
        if not 0 <= lima_file_index < number_of_lima_files:
            return None

        # Determine scan slice
        lima_file_path = _extract_lima_file_path(
            virtual_sources[lima_file_index], scan_file_path
        )
        start_index, stop_index = _extract_lima_slice(
            virtual_sources, lima_file_index, scan_file_path
        )

        # Slice counter data
        if dahu_to_counter_name is None:
            dahu_to_counter_name = {}
        counter_data = {
            dahu_name: h5f[f"/{scan_number}.1/measurement/{ctr_name}"][
                start_index:stop_index
            ]
            for dahu_name, ctr_name in dahu_to_counter_name.items()
        }

        # Compile scan data slice (see IntegrateParameters)
        scan_data_slice = {
            "input_file": lima_file_path,
            "frame_ids": list(range(start_index, stop_index)),
            **counter_data,
        }

        # Extract additional scan info
        energy = _get_energy(instrument)
        if energy is not None:
            scan_data_slice["energy"] = energy

        # An explicit argument takes precedence over the value from the file
        if exposure_time is None:
            exposure_time = _get_exposure_time(scan_entry)
        if exposure_time is not None:
            scan_data_slice["exposure_time"] = exposure_time

        # A per-frame counter takes precedence over a single scalar value
        if "storage_ring_current" not in counter_data:
            if storage_ring_current is None:
                storage_ring_current = _get_storage_ring_current(instrument)
            if storage_ring_current is not None:
                # Repeat the scalar so that there is one value per frame
                storage_ring_current = [storage_ring_current] * (
                    stop_index - start_index
                )
                scan_data_slice["storage_ring_current"] = storage_ring_current

    return scan_data_slice
def _extract_lima_file_path(vsource, scan_file_path: str) -> str:
    """Return the path of the Lima file referenced by a virtual source.

    A relative file name is resolved against the directory of the scan file.
    """
    lima_file_path = vsource.file_name
    if os.path.isabs(lima_file_path):
        return lima_file_path
    return os.path.join(os.path.dirname(scan_file_path), lima_file_path)


def _extract_lima_slice(
    virtual_sources, lima_file_index: int, scan_file_path: str
) -> Tuple[int, int]:
    """Return the `(start, stop)` scan index range covered by one Lima file.

    The start index is the total number of images in all Lima files that
    precede `lima_file_index`. Negative indices count from the last file.

    :raises IndexError: when `lima_file_index` is out of range.
    """
    number_of_lima_files = len(virtual_sources)
    normalized_index = lima_file_index
    if normalized_index < 0:
        normalized_index += number_of_lima_files
    if not 0 <= normalized_index < number_of_lima_files:
        raise IndexError(
            f"Lima file index {lima_file_index} is out of range "
            f"for {number_of_lima_files} Lima file(s)"
        )

    start_index = sum(
        _extract_number_of_lima_images(vsource, scan_file_path)
        for vsource in virtual_sources[:normalized_index]
    )
    nimages = _extract_number_of_lima_images(
        virtual_sources[normalized_index], scan_file_path
    )
    return start_index, start_index + nimages


def _extract_number_of_lima_images(vsource, scan_file_path: str) -> int:
    """Return the number of images a virtual source contributes."""
    src_space = vsource.src_space
    if src_space.shape:
        return src_space.shape[0]
    # TODO: sometimes the shape is an empty tuple
    # Fall back to the length of the source dataset in the Lima file itself.
    lima_file_path = _extract_lima_file_path(vsource, scan_file_path)
    with h5py_utils.File(lima_file_path) as f:
        return len(f[vsource.dset_name])


def _get_lima_dataset(instrument: "h5py.Group") -> Optional["h5py.Dataset"]:
    """Return the first virtual ``image`` dataset of a group with type "lima".

    Returns ``None`` when the instrument group has no such dataset.
    """
    for name in instrument:
        h5item = instrument[name]
        if not isinstance(h5item, h5py.Group):
            continue
        if "type" not in h5item:
            continue
        stype = h5item["type"][()]
        try:
            # The type may be read as bytes
            stype = stype.decode()
        except AttributeError:
            pass
        if stype != "lima":
            continue
        if "image" not in h5item:
            continue
        image = h5item["image"]
        if not image.is_virtual:
            continue
        return image
    return None


def _get_energy(instrument: "h5py.Group") -> Optional[float]:
    """Energy in keV"""
    try:
        return instrument["positioners_start"]["energy"][()]
    except KeyError:
        return None


def _get_exposure_time(scan_entry: "h5py.Group") -> Optional[float]:
    """Exposure time in seconds"""
    try:
        return scan_entry["scan_parameters"]["count_time"][()]
    except KeyError:
        return None


def _get_storage_ring_current(instrument: "h5py.Group") -> Optional[float]:
    """Storage ring current in mA"""
    source = _get_nxsource(instrument)
    if not source:
        return None
    try:
        return source["current"][()]
    except KeyError:
        return None


def _is_nxsource(attrs) -> bool:
    """Tell whether HDF5 attributes mark a group as a NeXus ``NXsource``."""
    nx_class = attrs.get("NX_class")
    if isinstance(nx_class, bytes):
        # Fixed-length string attributes are read as bytes
        nx_class = nx_class.decode(errors="replace")
    if nx_class == "NXsource":
        return True
    # Keep accepting a truthy attribute named "NXsource", the only marker
    # that was recognised before the "NX_class" attribute was checked.
    return bool(attrs.get("NXsource"))


def _get_nxsource(instrument: "h5py.Group") -> Optional["h5py.Group"]:
    """Return the first ``NXsource`` group of the instrument group.

    NeXus stores the class of a group as the value of its ``NX_class``
    attribute. Returns ``None`` when no group is marked as ``NXsource``.
    """
    for name in instrument:
        h5item = instrument[name]
        if not isinstance(h5item, h5py.Group):
            continue
        if _is_nxsource(h5item.attrs):
            return h5item
    return None