Module enrgdaq.daq.jobs.store.raw
Classes
class DAQJobStoreRaw (config: DAQJobStoreRawConfig,
**kwargs)-
Expand source code
class DAQJobStoreRaw(DAQJobStore):
    """
    Store job that writes raw message payloads to files under ``config.out_dir``.

    ``handle_message`` queues each payload on the ``RawFile`` for its target
    path, and ``store_loop`` later drains those queues to disk. Open files are
    tracked in ``_open_raw_files``, keyed by their full path.
    """

    config_type = DAQJobStoreRawConfig
    allowed_store_config_types = [DAQJobStoreConfigRaw]
    allowed_message_in_types = [DAQJobMessageStoreRaw]
    _open_raw_files: dict[str, RawFile]

    def __init__(self, config: DAQJobStoreRawConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._open_raw_files = {}

    def handle_message(self, message: DAQJobMessageStoreRaw) -> bool:
        """
        Queue the raw data carried by ``message`` for its target file.

        Returns:
            False if the base class rejects the message, True once the data
            has been appended to the file's write queue.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigRaw, message.store_config.raw)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)
        # The "new file" flag is not needed here, only the RawFile itself
        file, _ = self._open_raw_file(file_path, store_config.overwrite)

        # In overwrite mode only the most recent payload is kept
        if file.overwrite:
            file.write_queue.clear()

        # Append raw data to write_queue
        file.write_queue.append(message.data)

        return True

    def _open_raw_file(
        self, file_path: str, overwrite: Optional[bool]
    ) -> tuple[RawFile, bool]:
        """
        Opens a file and returns (RawFile, new_file)

        An already tracked path returns its existing ``RawFile`` with
        ``new_file`` set to False. Otherwise the file is opened in binary
        write mode when ``overwrite`` is truthy and binary append mode when
        it is not, and ``new_file`` tells whether the path was missing on
        disk beforehand.
        """
        if file_path in self._open_raw_files:
            return self._open_raw_files[file_path], False

        file_exists = os.path.exists(file_path)
        if not file_exists:
            # Create the directory if it doesn't exist; open() below creates
            # the file itself in both "ab" and "wb" modes
            Path(os.path.dirname(file_path)).mkdir(parents=True, exist_ok=True)

        file_handle = open(file_path, "ab" if not overwrite else "wb")

        file = RawFile(
            file_handle,
            datetime.now(),
            deque(),
            overwrite,
        )
        self._open_raw_files[file_path] = file
        return file, not file_exists

    def _flush(self, file: RawFile) -> bool:
        """
        Flush ``file`` if the flush interval has elapsed since its last flush.

        Returns:
            True if the file was flushed, False if it was not yet due.
        """
        if (
            datetime.now() - file.last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_RAW_FLUSH_INTERVAL_SECONDS:
            return False

        file.file.flush()
        file.last_flush_date = datetime.now()
        return True

    def store_loop(self):
        """
        Write every queued chunk to its file and drop finished entries.

        Entries whose handle is already closed are removed. Overwrite-mode
        files are closed and removed right after their queue is written;
        other files stay open and are flushed when due.
        """
        files_to_delete = []
        for file_path, file in self._open_raw_files.items():
            if file.file.closed:
                files_to_delete.append(file_path)
                continue

            while file.write_queue:
                file.file.write(file.write_queue.popleft())

            if file.overwrite:
                file.file.close()
                files_to_delete.append(file_path)
                continue

            # Flush if the flush time is up
            self._flush(file)

        # Deleted after the loop so the dict is not mutated while iterating
        for file_path in files_to_delete:
            del self._open_raw_files[file_path]

    def __del__(self):
        try:
            # Write out whatever is still queued
            self.store_loop()
        finally:
            # Close all open files, even if the final write failed, so that
            # no handle is leaked
            for file in self._open_raw_files.values():
                if not file.file.closed:
                    file.file.close()

        return super().__del__()
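DAQJobStoreRaw is a DAQJobStore that queues the raw data of incoming DAQJobMessageStoreRaw messages and writes it to files under the configured output directory. Inherited description: DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.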
Attributes
allowed_store_config_types
:list
- A list of allowed store configuration types.
Ancestors
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var allowed_store_config_types : list
var config_type : Any
-
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Methods
def store_loop(self)
-
Expand source code
def store_loop(self):
    """
    Drain each open file's write queue to disk and forget finished files.

    Entries whose handle is already closed are dropped. Overwrite-mode files
    are closed and dropped as soon as their queue has been written; all other
    files stay open and are handed to ``_flush``.
    """
    finished_paths = []
    for path, raw in self._open_raw_files.items():
        handle = raw.file
        if not handle.closed:
            pending = raw.write_queue
            while pending:
                handle.write(pending.popleft())

            if not raw.overwrite:
                # Flush if the flush time is up, and keep the file open
                self._flush(raw)
                continue

            handle.close()

        finished_paths.append(path)

    # Removal happens after iteration so the dict is not mutated mid-loop
    for path in finished_paths:
        del self._open_raw_files[path]
Inherited members
class DAQJobStoreRawConfig (out_dir: str = 'out/',
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)-
Expand source code
class DAQJobStoreRawConfig(DAQJobConfig):
    """Configuration for DAQJobStoreRaw."""

    # Base directory that DAQJobStoreRaw joins every raw file path onto
    out_dir: str = "out/"
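DAQJobStoreRawConfig is the configuration for DAQJobStoreRaw; it adds the out_dir output directory (default 'out/') to DAQJobConfig. Inherited description: DAQJobConfig is the base configuration class for DAQJobs.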
Attributes
verbosity
:LogVerbosity
- The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config
:Optional[DAQRemoteConfig]
- The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type
:str
- The type of the DAQ job.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var out_dir : str
-
Expand source code
class DAQJobStoreRawConfig(DAQJobConfig):
    """Configuration for DAQJobStoreRaw."""

    # Base directory that DAQJobStoreRaw joins every raw file path onto
    out_dir: str = "out/"
class RawFile (file: Any,
last_flush_date: datetime.datetime,
write_queue: collections.deque[bytes],
overwrite: bool | None = None)-
Expand source code
@dataclass
class RawFile:
    """
    Bookkeeping record for one raw output file held open by DAQJobStoreRaw.
    """

    # Open file handle; typed loosely as Any
    file: Any
    # Time of the last flush, compared against the flush interval
    last_flush_date: datetime
    # Byte chunks waiting to be written to the file
    write_queue: deque[bytes]
    # When truthy, the file is written in overwrite mode rather than appended to
    overwrite: Optional[bool] = None
RawFile(file: Any, last_flush_date: datetime.datetime, write_queue: collections.deque[bytes], overwrite: Optional[bool] = None)
Class variables
var file : Any
var last_flush_date : datetime.datetime
var overwrite : bool | None
var write_queue : collections.deque[bytes]