Module enrgdaq.daq.jobs.store.root
Classes
class DAQJobStoreROOT (config: Any, **kwargs)
Expand source code
class DAQJobStoreROOT(DAQJobStore):
    """
    ROOT file store with buffered writes for high throughput.

    Buffers incoming data and writes to ROOT in batches to minimize TBasket
    overhead. Per uproot docs, each extend() should be at least 100 kB per
    branch for efficient writing.
    """

    config_type = DAQJobStoreROOTConfig
    config: DAQJobStoreROOTConfig
    allowed_store_config_types = [DAQJobStoreConfigROOT]
    allowed_message_in_types = [DAQJobMessageStoreTabular, DAQJobMessageStorePyArrow]

    _open_files: dict[str, uproot.WritableDirectory]
    _open_trees: dict[str, dict[str, uproot.WritableTree]]
    _buffers: dict[tuple[str, str], TreeBuffer]  # (file_path, tree_name) -> buffer
    _resolved_paths: dict[str, str]  # original_path -> resolved_path

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)
        self._open_files = {}
        self._open_trees = {}
        self._buffers = {}
        self._resolved_paths = {}

    def handle_message(
        self, message: DAQJobMessageStoreTabular | DAQJobMessageStorePyArrow
    ) -> bool:
        super().handle_message(message)

        # Convert message to PyArrow table
        if isinstance(message, DAQJobMessageStorePyArrow):
            table = message.get_table()
            message.release()
            if table.num_rows == 0:
                return True
        else:
            # Convert tabular data to PyArrow table
            data_columns = message.data_columns
            if not data_columns:
                self._logger.warning(
                    "Received tabular message with no data_column, DAQJobStoreROOT does not support that."
                )
                return True
            table = pa.table(data_columns)

        store_config = cast(DAQJobStoreConfigROOT, message.store_config.root)
        original_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        original_path = os.path.join(self.config.out_dir, original_path)

        # Resolve file path (use cached resolution or find available path)
        if original_path in self._resolved_paths:
            file_path = self._resolved_paths[original_path]
        else:
            file_path = _get_available_file_path(
                original_path, set(self._resolved_paths.values())
            )
            self._resolved_paths[original_path] = file_path
            if file_path != original_path:
                self._logger.info(
                    f"File '{original_path}' exists, using '{file_path}' instead"
                )

        tree_name = store_config.tree_name

        # Get or create buffer for this file/tree
        buffer_key = (file_path, tree_name)
        if buffer_key not in self._buffers:
            self._buffers[buffer_key] = TreeBuffer(store_config=store_config)
        buffer = self._buffers[buffer_key]

        # Add table to buffer
        buffer.tables.append(table)
        buffer.total_bytes += table.nbytes

        # Check if we should flush immediately (buffer too large)
        if buffer.total_bytes >= self.config.buffer_size_bytes:
            self._flush_buffer(file_path, tree_name)

        return True

    def _flush_ready_buffers(self):
        """Flush buffers that have exceeded the time threshold."""
        now = datetime.now()
        buffers_to_flush = []
        for (file_path, tree_name), buffer in self._buffers.items():
            if not buffer.tables:
                continue
            time_since_flush = (now - buffer.last_flush_time).total_seconds()
            if time_since_flush >= self.config.flush_interval_seconds:
                buffers_to_flush.append((file_path, tree_name))

        for file_path, tree_name in buffers_to_flush:
            self._flush_buffer(file_path, tree_name)

    def _flush_buffer(self, file_path: str, tree_name: str):
        """Flush buffered data to ROOT file."""
        buffer_key = (file_path, tree_name)
        if buffer_key not in self._buffers:
            return
        buffer = self._buffers[buffer_key]
        if not buffer.tables:
            return

        store_config = buffer.store_config
        assert store_config is not None, "Buffer has no store_config"

        # Combine all buffered tables into one
        combined_table = pa.concat_tables(buffer.tables)

        # Convert to dict of numpy arrays for uproot
        data_to_write = {
            col_name: combined_table.column(col_name).to_numpy()
            for col_name in combined_table.column_names
        }

        # Open or get ROOT file
        if file_path not in self._open_files or self._open_files[file_path].closed:
            dir_name = os.path.dirname(file_path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)
            if os.path.exists(file_path):
                root_file = uproot.update(file_path)
            else:
                root_file = uproot.recreate(
                    file_path,
                    compression=ROOT_COMPRESSION_TYPES[store_config.compression_type](
                        level=store_config.compression_level
                    ),
                )
            self._open_files[file_path] = root_file
            self._logger.debug(f"Opened file {file_path}")
        else:
            root_file = self._open_files[file_path]

        # Create or get tree
        # Check our cache first - if we have the tree cached, use it
        if file_path in self._open_trees and tree_name in self._open_trees[file_path]:
            tree = self._open_trees[file_path][tree_name]
        elif tree_name not in root_file:
            # Tree doesn't exist, create it
            tree = root_file.mktree(
                tree_name,
                {
                    k: v.dtype
                    for k, v in data_to_write.items()
                    if isinstance(v, ndarray)
                },
            )
            self._logger.debug(f"Created tree {tree_name}")
            if file_path not in self._open_trees:
                self._open_trees[file_path] = {}
            self._open_trees[file_path][tree_name] = tree
        else:
            # Tree exists in file but not in our cache - this shouldn't happen
            # in normal operation. It can occur if a file was left from a previous
            # run. The caller should clean up output files before starting.
            raise RuntimeError(
                f"Tree '{tree_name}' already exists in file '{file_path}' but was not created "
                f"by this process. Please remove the file and try again."
            )

        # Write the combined data in one extend() call
        start_time = time.time()
        tree.extend(data_to_write)
        write_time = time.time() - start_time

        self._logger.debug(
            f"Flushed {len(buffer.tables)} messages ({buffer.total_bytes / 1_000_000:.2f} MB, "
            f"{combined_table.num_rows} rows) to {tree_name} in {write_time:.3f}s"
        )

        # Clear buffer
        buffer.tables.clear()
        buffer.total_bytes = 0
        buffer.last_flush_time = datetime.now()

    def _flush_all_buffers(self):
        """Flush all buffers regardless of thresholds."""
        for file_path, tree_name in list(self._buffers.keys()):
            self._flush_buffer(file_path, tree_name)

    def __del__(self):
        # Flush any remaining buffered data
        try:
            self._flush_all_buffers()
        except Exception:
            pass

        # Ensure all files are properly closed on exit
        for root_file in self._open_files.values():
            if not root_file.closed:
                root_file.close()
        self._open_files.clear()

        super().__del__()

ROOT file store with buffered writes for high throughput.
Buffers incoming data and writes to ROOT in batches to minimize TBasket overhead. Per uproot docs, each extend() should be at least 100 kB per branch for efficient writing.
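To make the sizing guidance concrete, the sketch below (not part of the module; the schema and values are purely illustrative) estimates how many rows must accumulate before a single extend() writes at least 100 kB per branch, and how many rows the default 5 MB buffer_size_bytes threshold corresponds to.

import numpy as np
import pyarrow as pa

# Hypothetical flat schema with three float64 branches (illustrative only).
rows = 1_000
table = pa.table(
    {
        "timestamp": np.arange(rows, dtype=np.float64),
        "channel": np.zeros(rows, dtype=np.float64),
        "adc": np.random.default_rng(0).random(rows),
    }
)

bytes_per_row = table.nbytes / table.num_rows            # 3 branches x 8 bytes ~= 24 B/row
bytes_per_branch_per_row = bytes_per_row / table.num_columns

# Rows needed before one extend() call writes >= 100 kB into each branch.
rows_for_100kb_per_branch = int(100_000 / bytes_per_branch_per_row)

# Rows buffered before the default 5 MB size threshold triggers a flush.
rows_for_default_flush = int(5_000_000 / bytes_per_row)

print(rows_for_100kb_per_branch, rows_for_default_flush)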
Ancestors
- DAQJobStore
Class variables
var allowed_message_in_types
var allowed_store_config_types
var config : DAQJobStoreROOTConfig
var config_type : type[DAQJobConfig]
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity: LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type: str - The type of the DAQ job.
daq_job_unique_id: str - The unique identifier for the DAQ job.
use_shm_when_possible: bool - Whether to use shared memory when possible. Shared memory is guaranteed not to be used when set to False; when set to True, its use is not guaranteed.
topics_to_subscribe: list[str] - List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.
Inherited members
class DAQJobStoreROOTConfig (out_dir: str = 'out/',
buffer_size_bytes: int = 5000000,
flush_interval_seconds: float = 1.0,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
Expand source code
class DAQJobStoreROOTConfig(DAQJobConfig):
    out_dir: str = "out/"
    buffer_size_bytes: int = BUFFER_SIZE_THRESHOLD_BYTES
    flush_interval_seconds: float = BUFFER_FLUSH_INTERVAL_SECONDS

DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity: LogVerbosity - The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type: str - The type of the DAQ job.
daq_job_unique_id: str - The unique identifier for the DAQ job.
use_shm_when_possible: bool - Whether to use shared memory when possible. Shared memory is guaranteed not to be used when set to False; when set to True, its use is not guaranteed.
topics_to_subscribe: list[str] - List of topics to subscribe to. Use with caution: subscribing a DAQJob to topics whose messages it does not know how to handle will result in errors.
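For example, a config that doubles the size threshold and relaxes the flush interval could be constructed as below (a sketch only; the daq_job_type string is a placeholder, not necessarily the identifier the framework registers for this job).

config = DAQJobStoreROOTConfig(
    daq_job_type="store_root",        # placeholder value
    out_dir="out/run_001/",
    buffer_size_bytes=10_000_000,     # flush once ~10 MB of tables are buffered
    flush_interval_seconds=2.0,       # or after 2 s, whichever comes first
)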
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var buffer_size_bytes : int
var flush_interval_seconds : float
var out_dir : str
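buffer_size_bytes drives the size-based flush inside handle_message, while flush_interval_seconds drives the periodic check in _flush_ready_buffers. A standalone sketch of that time-based test (the values shown are illustrative, not taken from a running job):

from datetime import datetime, timedelta

flush_interval_seconds = 1.0                      # DAQJobStoreROOTConfig default
last_flush_time = datetime.now() - timedelta(seconds=1.5)

# Same per-buffer test that _flush_ready_buffers applies to non-empty buffers.
if (datetime.now() - last_flush_time).total_seconds() >= flush_interval_seconds:
    print("buffer is due for a time-based flush")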
class TreeBuffer (tables: list[pyarrow.lib.Table] = <factory>,
total_bytes: int = 0,
last_flush_time: datetime.datetime = <factory>,
store_config: DAQJobStoreConfigROOT | None = None)
Expand source code
@dataclass
class TreeBuffer:
    """Buffer for accumulating data before writing to ROOT tree."""

    tables: list[pa.Table] = field(default_factory=list)
    total_bytes: int = 0
    last_flush_time: datetime = field(default_factory=datetime.now)
    store_config: DAQJobStoreConfigROOT | None = None

Buffer for accumulating data before writing to ROOT tree.
Instance variables
var last_flush_time : datetime.datetime
var store_config : DAQJobStoreConfigROOT | None
var tables : list[pyarrow.lib.Table]
var total_bytes : int
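TreeBuffer itself performs no I/O; it only accumulates tables between flushes. A minimal sketch of the bookkeeping DAQJobStoreROOT.handle_message applies to it (the threshold shown is the default, used here purely for illustration):

import pyarrow as pa

buf = TreeBuffer()
incoming = pa.table({"adc": [1, 2, 3]})

# Accumulate the table and track the running byte count; the job flushes once
# total_bytes reaches config.buffer_size_bytes or the flush interval elapses.
buf.tables.append(incoming)
buf.total_bytes += incoming.nbytes
needs_size_flush = buf.total_bytes >= 5_000_000   # illustrative threshold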