Module enrgdaq.daq.jobs.store.raw
Classes
class DAQJobStoreRaw (config: DAQJobStoreRawConfig,
**kwargs)-
Expand source code
class DAQJobStoreRaw(DAQJobStore):
    """Stores raw byte payloads to files on disk.

    Messages queue their bytes per target file; `store_loop` drains the
    queues, closing overwrite-mode files after their first write and
    flushing append-mode files on a fixed interval.
    """

    config: DAQJobStoreRawConfig
    config_type = DAQJobStoreRawConfig
    allowed_store_config_types = [DAQJobStoreConfigRaw]
    allowed_message_in_types = [DAQJobMessageStoreRaw]

    # Open file handles, keyed by resolved output path.
    _open_raw_files: dict[str, RawFile]

    def __init__(self, config: DAQJobStoreRawConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._open_raw_files = {}

    def handle_message(self, message: DAQJobMessageStoreRaw) -> bool:
        """Queue the message's raw bytes for its target file.

        Returns False when the base class rejects the message, True otherwise.
        """
        if not super().handle_message(message):
            return False

        store_config = cast(DAQJobStoreConfigRaw, message.store_config.raw)
        file_path = modify_file_path(
            store_config.file_path, store_config.add_date, message.tag
        )
        file_path = os.path.join(self.config.out_dir, file_path)

        # NOTE: second tuple element (new_file) is not needed here; discard it
        # instead of binding an unused local.
        file, _ = self._open_raw_file(file_path, store_config.overwrite)
        if file.overwrite:
            # Overwrite mode keeps only the most recent payload.
            file.write_queue.clear()

        # Append raw data to write_queue
        file.write_queue.append(message.data)

        return True

    def _open_raw_file(
        self, file_path: str, overwrite: Optional[bool]
    ) -> tuple[RawFile, bool]:
        """
        Opens a file and returns (RawFile, new_file)
        """
        if file_path not in self._open_raw_files:
            file_exists = os.path.exists(file_path)
            # Create the file if it doesn't exist
            if not file_exists:
                # Create the directory if it doesn't exist
                Path(os.path.dirname(file_path)).mkdir(parents=True, exist_ok=True)
                Path(file_path).touch()

            # Append unless the caller asked to truncate ("wb").
            file_handle = open(file_path, "ab" if not overwrite else "wb")

            # Open file
            file = RawFile(
                file_handle,
                datetime.now(),
                deque(),
                overwrite,
            )
            self._open_raw_files[file_path] = file
        else:
            file_exists = True
            file = self._open_raw_files[file_path]
        return file, not file_exists

    def _flush(self, file: RawFile) -> bool:
        """Flush `file` to disk if the flush interval elapsed; True when flushed."""
        if (
            datetime.now() - file.last_flush_date
        ).total_seconds() < DAQ_JOB_STORE_RAW_FLUSH_INTERVAL_SECONDS:
            return False

        self._logger.debug("Flushing raw file")
        file.file.flush()
        file.last_flush_date = datetime.now()
        return True

    def store_loop(self):
        """Drain each file's write queue; close overwrite-mode and closed files."""
        files_to_delete = []
        for file_path, file in self._open_raw_files.items():
            if file.file.closed:
                files_to_delete.append(file_path)
                continue
            while file.write_queue:
                item = file.write_queue.popleft()
                self._logger.debug("Writing raw file " + str(len(item)))
                file.file.write(item)
            if file.overwrite:
                # One-shot write: close and forget the file after the drain.
                file.file.close()
                files_to_delete.append(file_path)
                continue

            # Flush if the flush time is up
            self._flush(file)
        for file_path in files_to_delete:
            del self._open_raw_files[file_path]

    def __del__(self):
        # Best-effort final drain, then close every remaining handle.
        self.store_loop()

        # Close all open files
        for file in self._open_raw_files.values():
            if file.file.closed:
                continue
            file.file.close()
        return super().__del__()
Ancestors
Class variables
var allowed_message_in_types
var allowed_store_config_types
var config : DAQJobStoreRawConfig
var config_type : type[DAQJobConfig]
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity:LogVerbosity- The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type:str- The type of the DAQ job.
daq_job_unique_id:str- The unique identifier for the DAQ job.
use_shm_when_possible:bool- Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
Methods
def store_loop(self)-
Expand source code
def store_loop(self): files_to_delete = [] for file_path, file in self._open_raw_files.items(): if file.file.closed: files_to_delete.append(file_path) continue while file.write_queue: item = file.write_queue.popleft() self._logger.debug("Writing raw file " + str(len(item))) file.file.write(item) if file.overwrite: file.file.close() files_to_delete.append(file_path) continue # Flush if the flush time is up self._flush(file) for file_path in files_to_delete: del self._open_raw_files[file_path]
Inherited members
class DAQJobStoreRawConfig (out_dir: str = 'out/',
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)-
Expand source code
class DAQJobStoreRawConfig(DAQJobConfig): out_dir: str = "out/"DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity:LogVerbosity- The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type:str- The type of the DAQ job.
daq_job_unique_id:str- The unique identifier for the DAQ job.
use_shm_when_possible:bool- Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var out_dir : str-
Expand source code
class DAQJobStoreRawConfig(DAQJobConfig): out_dir: str = "out/"
class RawFile (file: Any,
last_flush_date: datetime.datetime,
write_queue: collections.deque[bytes],
overwrite: bool | None = None)-
Expand source code
@dataclass class RawFile: file: Any last_flush_date: datetime write_queue: deque[bytes] overwrite: Optional[bool] = NoneRawFile(file: Any, last_flush_date: datetime.datetime, write_queue: collections.deque[bytes], overwrite: Optional[bool] = None)
Instance variables
var file : Any
var last_flush_date : datetime.datetime
var overwrite : bool | None
var write_queue : collections.deque[bytes]