Module enrgdaq.daq.jobs.store.root
Classes
class DAQJobStoreROOT (config: Any, **kwargs)
class DAQJobStoreROOT(DAQJobStore):
    config_type = DAQJobStoreROOTConfig
    allowed_store_config_types = [DAQJobStoreConfigROOT]
    allowed_message_in_types = [DAQJobMessageStoreTabular]
    _open_files: dict[str, Any]

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)

        self._open_files = {}

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        super().handle_message(message)
        store_config = cast(DAQJobStoreConfigROOT, message.store_config)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )

        if file_path not in self._open_files:
            file_exists = os.path.exists(file_path)
            # Create the file if it doesn't exist
            if not file_exists:
                # If file was newly created, do not commit it, close it and
                # switch to update mode on the next iteration
                root_file = uproot.recreate(file_path)
            else:
                root_file = uproot.update(file_path)
            self._open_files[file_path] = root_file
        else:
            file_exists = True
            root_file = self._open_files[file_path]

        data_to_write = {}
        for idx, key in enumerate(message.keys):
            for data in message.data:
                if key not in data_to_write:
                    data_to_write[key] = []
                data_to_write[key].append(data[idx])

        # TODO: This creates a new tree every time we commit. We should probably create tree
        # once and only once, preferably when everything we needed to save is available
        # This kind of depends on the task so it will have to wait
        root_file["tree"] = {key: data_to_write[key] for key in message.keys}
        root_file.file.sink.flush()

        # Close the file if it was newly created
        if not file_exists:
            root_file.file.sink.close()

        return True

    def __del__(self):
        # Close all open files
        for root_file in self._open_files.values():
            if root_file.closed:
                continue
            root_file.file.close()

        return super().__del__()
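The row-to-column step in handle_message can be tried in isolation. The following is a minimal sketch, not the library's code: rows_to_columns and the sample keys and rows are hypothetical names and data introduced only for illustration. It turns a list of rows into one list per key, which is the shape of the dictionary that handle_message assigns to root_file["tree"].

```python
from typing import Any


def rows_to_columns(keys: list[str], rows: list[list[Any]]) -> dict[str, list[Any]]:
    """Transpose row-oriented data into one list per key."""
    # Pre-create every column so that empty input still yields all keys.
    columns: dict[str, list[Any]] = {key: [] for key in keys}
    for row in rows:
        for idx, key in enumerate(keys):
            columns[key].append(row[idx])
    return columns


# Hypothetical sample data, not taken from the library.
keys = ["timestamp", "channel", "value"]
rows = [[1, 0, 0.5], [2, 1, 0.7]]
columns = rows_to_columns(keys, rows)
```

One difference from the source above: this sketch creates every column up front, so a message with keys but no rows yields empty lists. In handle_message as listed, the same input would leave data_to_write empty and the final dictionary comprehension would raise a KeyError.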
DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.
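The file-handle bookkeeping in handle_message and __del__ (open each path once, reuse the handle, close whatever is still open at the end) can be sketched with plain text files. This is an analogue under stated assumptions, not the library's implementation: HandleCache, get and close_all are hypothetical names, and the built-in open with modes "w" and "a" stands in for the choice between uproot.recreate and uproot.update.

```python
import os
import tempfile
from typing import IO


class HandleCache:
    """Keeps one open handle per path; a stand-in for the _open_files dict."""

    def __init__(self) -> None:
        self._open_files: dict[str, IO[str]] = {}

    def get(self, path: str) -> IO[str]:
        if path not in self._open_files:
            # New file: create it. Existing file: append to it. This mirrors
            # the recreate/update decision, using text files instead of ROOT.
            mode = "a" if os.path.exists(path) else "w"
            self._open_files[path] = open(path, mode)
        return self._open_files[path]

    def close_all(self) -> None:
        # Skip handles that were already closed elsewhere.
        for handle in self._open_files.values():
            if not handle.closed:
                handle.close()
        self._open_files.clear()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "run.txt")
    cache = HandleCache()
    first = cache.get(path)
    first.write("a\n")
    first.flush()
    second = cache.get(path)       # same path, so the cached handle is returned
    reused = first is second
    cache.close_all()
    all_closed = first.closed
    with open(path) as f:
        contents = f.read()
```

The sketch uses an explicit close_all method rather than __del__ as a design choice for the example: Python does not guarantee when, or whether, __del__ runs, so an explicit call makes the cleanup point visible.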
Attributes
allowed_store_config_types : list
    A list of allowed store configuration types.
Ancestors
- DAQJobStore
- DAQJob
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity
    The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
    The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
    The type of the DAQ job.
Inherited members
class DAQJobStoreROOTConfig (*, verbosity: LogVerbosity = LogVerbosity.INFO, remote_config: DAQRemoteConfig | None = <factory>, daq_job_type: str)
class DAQJobStoreROOTConfig(DAQJobConfig):
    pass
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity : LogVerbosity
    The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
    The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
    The type of the DAQ job.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin