Module enrgdaq.daq.jobs.store.hdf5
Classes
class DAQJobStoreHDF5 (config: Any, **kwargs)
class DAQJobStoreHDF5(DAQJobStore):
    config_type = DAQJobStoreHDF5Config
    allowed_store_config_types = [DAQJobStoreConfigHDF5]
    allowed_message_in_types = [DAQJobMessageStoreTabular]

    _open_files: dict[str, Any]

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)
        self._open_files = {}

    def store_loop(self):
        pass

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        super().handle_message(message)

        if message.data_columns is None:
            return True

        store_config = cast(DAQJobStoreConfigHDF5, message.store_config.hdf5)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)

        if file_path not in self._open_files:
            # Ensure directory exists to prevent errors
            dir_name = os.path.dirname(file_path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

            hdf5_file = h5py.File(file_path, "a")
            self._open_files[file_path] = hdf5_file
            self._logger.info(f"Opened file {file_path}")
        else:
            hdf5_file = self._open_files[file_path]

        dataset_name = store_config.dataset_name
        data_to_write = message.data_columns

        if not data_to_write:
            return True

        if dataset_name not in hdf5_file:
            # Create a resizable dataset for each column
            for k, v in data_to_write.items():
                hdf5_file.create_dataset(
                    f"{dataset_name}/{k}",
                    data=v,
                    maxshape=(None,),
                    chunks=True,
                )
        else:
            for k, v in data_to_write.items():
                dataset = hdf5_file[f"{dataset_name}/{k}"]
                assert isinstance(dataset, h5py.Dataset), "Dataset is not a Dataset"
                dataset.resize(dataset.shape[0] + len(v), axis=0)
                dataset[-len(v):] = v

        return True

    def __del__(self):
        # Ensure all files are properly closed on exit.
        for hdf5_file in self._open_files.values():
            hdf5_file.close()
        self._open_files.clear()

        super().__del__()

DAQJobStore is an abstract base class for data acquisition job stores.
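handle_message appends each column of an incoming tabular message to its own chunked, resizable one-dimensional dataset under store_config.dataset_name: datasets are created on the first batch and grown in place on later ones. Below is a minimal, self-contained sketch of that h5py append pattern; the file name and dataset paths are illustrative, not taken from the module.

import h5py

# Columnar batch: column name -> list of values (mirrors data_columns).
columns = {"timestamp": [1, 2, 3], "adc": [10, 20, 30]}

with h5py.File("example.h5", "a") as f:
    for name, values in columns.items():
        path = f"readings/{name}"
        if path not in f:
            # First batch: create a chunked dataset with unlimited length.
            f.create_dataset(path, data=values, maxshape=(None,), chunks=True)
        else:
            # Later batches: grow the dataset and write at the tail.
            ds = f[path]
            ds.resize(ds.shape[0] + len(values), axis=0)
            ds[-len(values):] = values

chunks=True enables chunked storage, which HDF5 requires for resizable datasets; maxshape=(None,) leaves the length unlimited so each batch can be appended without rewriting the file.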
Ancestors
- DAQJobStore
Class variables
var allowed_message_in_types
var allowed_store_config_types
var config_type : type[DAQJobConfig]
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str - The type of the DAQ job.
daq_job_unique_id : str - The unique identifier for the DAQ job.
use_shm_when_possible : bool - Whether to use shared memory when possible. Shared memory is guaranteed never to be used when this is False; when True, it may be used but is not guaranteed.
Methods
def store_loop(self)
def store_loop(self):
    pass

This implementation is a no-op: data is written directly in handle_message, so there is no periodic store loop.
Inherited members
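For orientation, a hedged usage sketch follows. The module import path is taken from the title above, but the constructor signatures of DAQJobMessageStoreTabular and DAQJobStoreConfigHDF5, the DAQJobStoreConfig wrapper name, and the "store_hdf5" job type value are assumptions inferred from handle_message (file_path, add_date, dataset_name, tag, data_columns), not confirmed API.

# Hypothetical sketch; message/config constructor signatures and the
# DAQJobStoreConfig wrapper name are assumptions inferred from the source.
from enrgdaq.daq.jobs.store.hdf5 import DAQJobStoreHDF5, DAQJobStoreHDF5Config

config = DAQJobStoreHDF5Config(daq_job_type="store_hdf5", out_dir="out/")
job = DAQJobStoreHDF5(config)

message = DAQJobMessageStoreTabular(
    store_config=DAQJobStoreConfig(          # assumed wrapper exposing .hdf5
        hdf5=DAQJobStoreConfigHDF5(
            file_path="run.h5",
            add_date=True,
            dataset_name="readings",
        )
    ),
    tag="run_001",
    data_columns={"timestamp": [1, 2, 3], "adc": [10, 20, 30]},
)
job.handle_message(message)  # appends each column under "readings" in out/run.h5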
class DAQJobStoreHDF5Config (out_dir: str = 'out/',
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
class DAQJobStoreHDF5Config(DAQJobConfig):
    out_dir: str = "out/"

DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str - The type of the DAQ job.
daq_job_unique_id : str - The unique identifier for the DAQ job.
use_shm_when_possible : bool - Whether to use shared memory when possible. Shared memory is guaranteed never to be used when this is False; when True, it may be used but is not guaranteed.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
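Because DAQJobConfig is a msgspec.Struct, the config can be decoded from serialized data with msgspec's typed decoders. A minimal sketch using JSON follows; the framework's actual config format and the "store_hdf5" job type value are assumptions, not confirmed by this page.

import msgspec

from enrgdaq.daq.jobs.store.hdf5 import DAQJobStoreHDF5Config

raw = b'{"daq_job_type": "store_hdf5", "out_dir": "out/"}'
config = msgspec.json.decode(raw, type=DAQJobStoreHDF5Config)
print(config.out_dir)  # -> out/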
Instance variables
var out_dir : str - The base output directory that store file paths are joined under. Defaults to "out/".