Module enrgdaq.daq.jobs.store.hdf5

Classes

class DAQJobStoreHDF5 (config: Any, **kwargs)
class DAQJobStoreHDF5(DAQJobStore):
    config_type = DAQJobStoreHDF5Config
    allowed_store_config_types = [DAQJobStoreConfigHDF5]
    allowed_message_in_types = [DAQJobMessageStoreTabular]
    _open_files: dict[str, Any]

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)

        self._open_files = {}

    def store_loop(self):
        pass

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        super().handle_message(message)

        if message.data_columns is None:
            return True

        store_config = cast(DAQJobStoreConfigHDF5, message.store_config.hdf5)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)

        if file_path not in self._open_files:
            # Ensure directory exists to prevent errors
            dir_name = os.path.dirname(file_path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

            hdf5_file = h5py.File(file_path, "a")
            self._open_files[file_path] = hdf5_file
            self._logger.info(f"Opened file {file_path}")
        else:
            hdf5_file = self._open_files[file_path]

        dataset_name = store_config.dataset_name
        data_to_write = message.data_columns

        if not data_to_write:
            return True

        if dataset_name not in hdf5_file:
            # Create a resizable dataset for each column
            for k, v in data_to_write.items():
                hdf5_file.create_dataset(
                    f"{dataset_name}/{k}",
                    data=v,
                    maxshape=(None,),
                    chunks=True,
                )
        else:
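            # Append by growing each existing column dataset and writing the new values into its tail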
            for k, v in data_to_write.items():
                dataset = hdf5_file[f"{dataset_name}/{k}"]
                assert isinstance(dataset, h5py.Dataset), "Dataset is not a Dataset"
                dataset.resize((dataset.shape[0] + len(v)), axis=0)
                dataset[-len(v) :] = v

        return True

    def __del__(self):
        # Ensure all files are properly closed on exit.
        for hdf5_file in self._open_files.values():
            hdf5_file.close()
        self._open_files.clear()

        super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.
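As the source above shows, handle_message appends each column of a tabular message to a resizable HDF5 dataset under store_config.dataset_name, creating the dataset on the first write with maxshape=(None,) and growing it with resize on later writes. The following is a minimal, standalone sketch of that append pattern using only h5py; the file name, dataset name, and column values are hypothetical, and, unlike the class above, it checks existence per column rather than per dataset group.

import h5py

def append_columns(file_path: str, dataset_name: str, data_columns: dict[str, list]) -> None:
    # Open (or create) the file in append mode, as handle_message does with h5py.File(path, "a").
    with h5py.File(file_path, "a") as f:
        for column, values in data_columns.items():
            key = f"{dataset_name}/{column}"
            if key not in f:
                # First write: create a 1-D dataset that can grow without bound along axis 0.
                f.create_dataset(key, data=values, maxshape=(None,), chunks=True)
            else:
                # Later writes: extend the dataset and fill the newly added tail.
                dataset = f[key]
                dataset.resize(dataset.shape[0] + len(values), axis=0)
                dataset[-len(values):] = values

# Hypothetical usage: two calls accumulate four rows in each column dataset.
append_columns("out/example.h5", "readings", {"timestamp": [1, 2], "adc": [10, 20]})
append_columns("out/example.h5", "readings", {"timestamp": [3, 4], "adc": [30, 40]})

Chunked storage (chunks=True) is what allows an h5py dataset to be resized after creation, which is why the class passes it when creating each column.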

Ancestors

DAQJobStore

Class variables

var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is guaranteed not to be used when set to False; setting it to True does not guarantee that it will be used.

Methods

def store_loop(self)
def store_loop(self):
    pass

A no-op for this store: HDF5 writes are performed directly in handle_message().

Inherited members

class DAQJobStoreHDF5Config (out_dir: str = 'out/',
                             *,
                             daq_job_type: str,
                             verbosity: LogVerbosity = LogVerbosity.INFO,
                             daq_job_unique_id: str | None = None,
                             use_shm_when_possible: bool = True)
class DAQJobStoreHDF5Config(DAQJobConfig):
    out_dir: str = "out/"

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is guaranteed not to be used when set to False; setting it to True does not guarantee that it will be used.
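
Beyond the base DAQJobConfig fields, the only option this config adds is out_dir, the directory that handle_message joins with each store's file_path. A brief construction sketch, assuming the keyword-only signature shown above; the daq_job_type and unique id values are illustrative, not taken from the project.

from enrgdaq.daq.jobs.store.hdf5 import DAQJobStoreHDF5Config

config = DAQJobStoreHDF5Config(
    out_dir="out/hdf5/",            # defaults to "out/" when omitted
    daq_job_type="store_hdf5",      # illustrative value; required keyword-only field
    daq_job_unique_id="hdf5-store-1",
)
# verbosity and use_shm_when_possible keep their defaults (LogVerbosity.INFO, True).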

Ancestors

DAQJobConfig

Instance variables

var out_dir : str
The output directory for stored HDF5 files. Defaults to "out/".