Module enrgdaq.daq.jobs.store.root

Classes

class DAQJobStoreROOT (config: Any, **kwargs)
class DAQJobStoreROOT(DAQJobStore):
    config_type = DAQJobStoreROOTConfig
    allowed_store_config_types = [DAQJobStoreConfigROOT]
    allowed_message_in_types = [DAQJobMessageStoreTabular]
    _open_files: dict[str, Any]

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)

        self._open_files = {}

    def handle_message(self, message: DAQJobMessageStoreTabular) -> bool:
        super().handle_message(message)
        store_config = cast(DAQJobStoreConfigROOT, message.store_config)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )

        if file_path not in self._open_files:
            file_exists = os.path.exists(file_path)
            # Create the file if it doesn't exist
            if not file_exists:
                # A newly created file is not cached in _open_files; it is
                # closed after this write and reopened in update mode on the next call
                root_file = uproot.recreate(file_path)
            else:
                root_file = uproot.update(file_path)
                self._open_files[file_path] = root_file
        else:
            file_exists = True
            root_file = self._open_files[file_path]

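        # Transpose the row-oriented message data into per-key column lists,
        # which is the mapping shape uproot expects when writing a tree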
        data_to_write = {}
        for idx, key in enumerate(message.keys):
            for data in message.data:
                if key not in data_to_write:
                    data_to_write[key] = []
                data_to_write[key].append(data[idx])

        # TODO: This creates a new tree every time we commit. We should probably
        # create the tree once and only once, preferably when everything we need to
        # save is available. This depends somewhat on the task, so it will have to wait.
        root_file["tree"] = {key: data_to_write[key] for key in message.keys}
        root_file.file.sink.flush()

        # Close the file if it was newly created
        if not file_exists:
            root_file.file.sink.close()

        return True

    def __del__(self):
        # Close all open files
        for root_file in self._open_files.values():
            if root_file.closed:
                continue
            root_file.file.close()

        return super().__del__()
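
The example below is a minimal, self-contained sketch of the columnar layout that handle_message produces, using uproot directly. The column names, row values, and file name are hypothetical, not part of this module:

import uproot

keys = ["timestamp", "adc_value"]              # stands in for message.keys (hypothetical)
rows = [[1700000000, 512], [1700000001, 498]]  # stands in for message.data (hypothetical)

# Transpose rows into per-key column lists, as handle_message does
data_to_write = {key: [row[idx] for row in rows] for idx, key in enumerate(keys)}

# Write the columns as a tree named "tree", mirroring root_file["tree"] = ...
with uproot.recreate("example.root") as root_file:
    root_file["tree"] = data_to_write

# Read the tree back to verify the layout
with uproot.open("example.root") as f:
    print(f["tree"].arrays(library="np"))

Note that assigning to root_file["tree"] again writes a new cycle of the tree in the ROOT file, which is the behavior the TODO in the source refers to.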

DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.

Attributes

allowed_store_config_types : list
A list of allowed store configuration types.

Ancestors

DAQJobStore
DAQJob

Class variables

var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Inherited members

class DAQJobStoreROOTConfig (*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
class DAQJobStoreROOTConfig(DAQJobConfig):
    pass

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config : Optional[DAQRemoteConfig]
The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type : str
The type of the DAQ job.

Ancestors

DAQJobConfig
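
As a usage sketch, the config can be constructed directly and passed to the store job. The daq_job_type value below is a placeholder, and it is assumed here that the base DAQJob constructor needs no additional keyword arguments:

from enrgdaq.daq.jobs.store.root import DAQJobStoreROOT, DAQJobStoreROOTConfig

# "store_root" is a hypothetical job-type identifier; use whatever your deployment expects
config = DAQJobStoreROOTConfig(daq_job_type="store_root")
job = DAQJobStoreROOT(config)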