Module enrgdaq.daq.store.models
Classes
class DAQJobMessageStore (store_config: DAQJobStoreConfig,
tag: str | None = None,
target_local_supervisor: bool = False,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessageStore(DAQJobMessage): """ DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job. Attributes: store_config (DAQJobStoreConfig): Configuration for the DAQ job store. tag (str | None): Optional tag associated with the message. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ store_config: DAQJobStoreConfig tag: str | None = None target_local_supervisor: bool = False def __post_init__(self): self.target_local_supervisor = ( self.target_local_supervisor or self.store_config.target_local_supervisor ) def pre_send(self): super().pre_send() mappings = _get_store_config_base_to_store_job_mapping() for store_type in self.store_config.store_types: for store_job in mappings[store_type]: if self.target_local_supervisor: self.topics.add( Topic.store_supervisor(self.supervisor_id, store_job.__name__) ) else: self.topics.add(Topic.store(store_job.__name__))

DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job.
Attributes
store_config:DAQJobStoreConfig- Configuration for the DAQ job store.
tag:str | None- Optional tag associated with the message.
target_local_supervisor:bool- Whether to send the message to store job topics of the local supervisor, or all supervisors.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
Instance variables
var store_config : DAQJobStoreConfig-
Expand source code
class DAQJobMessageStore(DAQJobMessage): """ DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job. Attributes: store_config (DAQJobStoreConfig): Configuration for the DAQ job store. tag (str | None): Optional tag associated with the message. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ store_config: DAQJobStoreConfig tag: str | None = None target_local_supervisor: bool = False def __post_init__(self): self.target_local_supervisor = ( self.target_local_supervisor or self.store_config.target_local_supervisor ) def pre_send(self): super().pre_send() mappings = _get_store_config_base_to_store_job_mapping() for store_type in self.store_config.store_types: for store_job in mappings[store_type]: if self.target_local_supervisor: self.topics.add( Topic.store_supervisor(self.supervisor_id, store_job.__name__) ) else: self.topics.add(Topic.store(store_job.__name__)) var tag : str | None-
Expand source code
class DAQJobMessageStore(DAQJobMessage): """ DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job. Attributes: store_config (DAQJobStoreConfig): Configuration for the DAQ job store. tag (str | None): Optional tag associated with the message. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ store_config: DAQJobStoreConfig tag: str | None = None target_local_supervisor: bool = False def __post_init__(self): self.target_local_supervisor = ( self.target_local_supervisor or self.store_config.target_local_supervisor ) def pre_send(self): super().pre_send() mappings = _get_store_config_base_to_store_job_mapping() for store_type in self.store_config.store_types: for store_job in mappings[store_type]: if self.target_local_supervisor: self.topics.add( Topic.store_supervisor(self.supervisor_id, store_job.__name__) ) else: self.topics.add(Topic.store(store_job.__name__)) var target_local_supervisor : bool-
Expand source code
class DAQJobMessageStore(DAQJobMessage): """ DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job. Attributes: store_config (DAQJobStoreConfig): Configuration for the DAQ job store. tag (str | None): Optional tag associated with the message. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ store_config: DAQJobStoreConfig tag: str | None = None target_local_supervisor: bool = False def __post_init__(self): self.target_local_supervisor = ( self.target_local_supervisor or self.store_config.target_local_supervisor ) def pre_send(self): super().pre_send() mappings = _get_store_config_base_to_store_job_mapping() for store_type in self.store_config.store_types: for store_job in mappings[store_type]: if self.target_local_supervisor: self.topics.add( Topic.store_supervisor(self.supervisor_id, store_job.__name__) ) else: self.topics.add(Topic.store(store_job.__name__))
Methods
def pre_send(self)-
Expand source code
def pre_send(self): super().pre_send() mappings = _get_store_config_base_to_store_job_mapping() for store_type in self.store_config.store_types: for store_job in mappings[store_type]: if self.target_local_supervisor: self.topics.add( Topic.store_supervisor(self.supervisor_id, store_job.__name__) ) else: self.topics.add(Topic.store(store_job.__name__))
class DAQJobMessageStorePyArrow (store_config: DAQJobStoreConfig,
tag: str | None = None,
target_local_supervisor: bool = False,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
table: pyarrow.lib.Table | None = None,
handle: RingBufferHandle | None = None)-
Expand source code
class DAQJobMessageStorePyArrow(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStorePyArrow is a high-performance message store using PyArrow's columnar format. Optimized for numerical data with zero-copy reads. Attributes: table (pa.Table | None): A PyArrow Table containing the columnar data. None when using zero-copy mode (handle is set instead). handle (RingBufferHandle | None): Handle to PyArrow data in shared memory. When set, the table is loaded from shared memory using zero-copy. """ table: pa.Table | None = None handle: RingBufferHandle | None = None def get_table(self) -> pa.Table: """ Get the PyArrow table, loading from shared memory if using zero-copy. Returns: pa.Table: The PyArrow table. """ if self.handle is not None: return self.handle.load_pyarrow() if self.table is not None: return self.table raise ValueError("Neither table nor handle is set") def release(self): """ Release the ring buffer slot back to the pool (if using zero-copy). Call this after you're done with the table to allow the slot to be reused. No-op if not using zero-copy mode. 
""" if self.handle is not None: self.handle.release() def __getstate__(self): state = self.__dict__.copy() # If we have a handle, we serialize it (efficient local transfer) # The supervisor will convert this to full data if sending remotely if self.handle is not None: # Table shouldn't be serialized if we're sending the handle state["table"] = None elif state.get("table") is not None: # Convert table to bytes using Arrow IPC sink = pa.BufferOutputStream() with pa.ipc.new_stream(sink, state["table"].schema) as writer: writer.write_table(state["table"]) state["table"] = sink.getvalue().to_pybytes() return state def __setstate__(self, state): # Restore table from bytes table_bytes = state.get("table") if table_bytes is not None and isinstance(table_bytes, bytes): with pa.ipc.open_stream(table_bytes) as reader: state["table"] = reader.read_all() state["handle"] = None # Handle is never deserialized self.__dict__.update(state)DAQJobMessageStorePyArrow is a high-performance message store using PyArrow's columnar format. Optimized for numerical data with zero-copy reads.
Attributes
table:pa.Table | None- A PyArrow Table containing the columnar data. None when using zero-copy mode (handle is set instead).
handle:RingBufferHandle | None- Handle to PyArrow data in shared memory. When set, the table is loaded from shared memory using zero-copy.
Ancestors
- DAQJobMessageStore
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var handle : RingBufferHandle | None-
Expand source code
class DAQJobMessageStorePyArrow(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStorePyArrow is a high-performance message store using PyArrow's columnar format. Optimized for numerical data with zero-copy reads. Attributes: table (pa.Table | None): A PyArrow Table containing the columnar data. None when using zero-copy mode (handle is set instead). handle (RingBufferHandle | None): Handle to PyArrow data in shared memory. When set, the table is loaded from shared memory using zero-copy. """ table: pa.Table | None = None handle: RingBufferHandle | None = None def get_table(self) -> pa.Table: """ Get the PyArrow table, loading from shared memory if using zero-copy. Returns: pa.Table: The PyArrow table. """ if self.handle is not None: return self.handle.load_pyarrow() if self.table is not None: return self.table raise ValueError("Neither table nor handle is set") def release(self): """ Release the ring buffer slot back to the pool (if using zero-copy). Call this after you're done with the table to allow the slot to be reused. No-op if not using zero-copy mode. 
""" if self.handle is not None: self.handle.release() def __getstate__(self): state = self.__dict__.copy() # If we have a handle, we serialize it (efficient local transfer) # The supervisor will convert this to full data if sending remotely if self.handle is not None: # Table shouldn't be serialized if we're sending the handle state["table"] = None elif state.get("table") is not None: # Convert table to bytes using Arrow IPC sink = pa.BufferOutputStream() with pa.ipc.new_stream(sink, state["table"].schema) as writer: writer.write_table(state["table"]) state["table"] = sink.getvalue().to_pybytes() return state def __setstate__(self, state): # Restore table from bytes table_bytes = state.get("table") if table_bytes is not None and isinstance(table_bytes, bytes): with pa.ipc.open_stream(table_bytes) as reader: state["table"] = reader.read_all() state["handle"] = None # Handle is never deserialized self.__dict__.update(state) var table : pyarrow.lib.Table | None-
Expand source code
class DAQJobMessageStorePyArrow(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStorePyArrow is a high-performance message store using PyArrow's columnar format. Optimized for numerical data with zero-copy reads. Attributes: table (pa.Table | None): A PyArrow Table containing the columnar data. None when using zero-copy mode (handle is set instead). handle (RingBufferHandle | None): Handle to PyArrow data in shared memory. When set, the table is loaded from shared memory using zero-copy. """ table: pa.Table | None = None handle: RingBufferHandle | None = None def get_table(self) -> pa.Table: """ Get the PyArrow table, loading from shared memory if using zero-copy. Returns: pa.Table: The PyArrow table. """ if self.handle is not None: return self.handle.load_pyarrow() if self.table is not None: return self.table raise ValueError("Neither table nor handle is set") def release(self): """ Release the ring buffer slot back to the pool (if using zero-copy). Call this after you're done with the table to allow the slot to be reused. No-op if not using zero-copy mode. 
""" if self.handle is not None: self.handle.release() def __getstate__(self): state = self.__dict__.copy() # If we have a handle, we serialize it (efficient local transfer) # The supervisor will convert this to full data if sending remotely if self.handle is not None: # Table shouldn't be serialized if we're sending the handle state["table"] = None elif state.get("table") is not None: # Convert table to bytes using Arrow IPC sink = pa.BufferOutputStream() with pa.ipc.new_stream(sink, state["table"].schema) as writer: writer.write_table(state["table"]) state["table"] = sink.getvalue().to_pybytes() return state def __setstate__(self, state): # Restore table from bytes table_bytes = state.get("table") if table_bytes is not None and isinstance(table_bytes, bytes): with pa.ipc.open_stream(table_bytes) as reader: state["table"] = reader.read_all() state["handle"] = None # Handle is never deserialized self.__dict__.update(state)
Methods
def get_table(self) ‑> pyarrow.lib.Table-
Expand source code
def get_table(self) -> pa.Table: """ Get the PyArrow table, loading from shared memory if using zero-copy. Returns: pa.Table: The PyArrow table. """ if self.handle is not None: return self.handle.load_pyarrow() if self.table is not None: return self.table raise ValueError("Neither table nor handle is set")

Get the PyArrow table, loading from shared memory if using zero-copy.
Returns
pa.Table- The PyArrow table.
def release(self)-
Expand source code
def release(self): """ Release the ring buffer slot back to the pool (if using zero-copy). Call this after you're done with the table to allow the slot to be reused. No-op if not using zero-copy mode. """ if self.handle is not None: self.handle.release()Release the ring buffer slot back to the pool (if using zero-copy).
Call this after you're done with the table to allow the slot to be reused. No-op if not using zero-copy mode.
class DAQJobMessageStoreRaw (store_config: DAQJobStoreConfig,
tag: str | None = None,
target_local_supervisor: bool = False,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
data: bytes)-
Expand source code
class DAQJobMessageStoreRaw(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreRaw is a class that inherits from DAQJobMessageStore and represents a raw data message store for DAQ jobs. Attributes: data (bytes): The raw data associated with the DAQ job message. """ data: bytes

DAQJobMessageStoreRaw is a class that inherits from DAQJobMessageStore and represents a raw data message store for DAQ jobs.
Attributes
data:bytes- The raw data associated with the DAQ job message.
Ancestors
- DAQJobMessageStore
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var data : bytes-
Expand source code
class DAQJobMessageStoreRaw(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreRaw is a class that inherits from DAQJobMessageStore and represents a raw data message store for DAQ jobs. Attributes: data (bytes): The raw data associated with the DAQ job message. """ data: bytes
class DAQJobMessageStoreSHM (store_config: DAQJobStoreConfig,
tag: str | None = None,
target_local_supervisor: bool = False,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
shm: SHMHandle)-
Expand source code
class DAQJobMessageStoreSHM(DAQJobMessageStore, kw_only=True): shm: SHMHandle

DAQJobMessageStore is a class that extends DAQJobMessage and is used to store configuration and data related to a DAQ (Data Acquisition) job. (Summary inherited from DAQJobMessageStore; DAQJobMessageStoreSHM itself has no docstring in the source.)
Attributes
store_config:DAQJobStoreConfig- Configuration for the DAQ job store.
tag:str | None- Optional tag associated with the message.
target_local_supervisor:bool- Whether to send the message to store job topics of the local supervisor, or all supervisors.
Ancestors
- DAQJobMessageStore
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var shm : SHMHandle-
Expand source code
class DAQJobMessageStoreSHM(DAQJobMessageStore, kw_only=True): shm: SHMHandle
class DAQJobMessageStoreTabular (store_config: DAQJobStoreConfig,
tag: str | None = None,
target_local_supervisor: bool = False,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
keys: list[str],
data: list[list[str | float | int] | numpy.ndarray] | None = None,
data_columns: dict[str, list[str | float | int] | numpy.ndarray] | None = None)-
Expand source code
class DAQJobMessageStoreTabular(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreTabular is a class that inherits from DAQJobMessageStore and represents a tabular data store for DAQ job messages. Attributes: keys (list[str]): A list of strings representing the keys or column names of the tabular data. data (list[list[str | float | int]]): A list of lists where each inner list represents a row of data.. """ keys: list[str] data: Optional[list[list[str | float | int] | ndarray]] = None data_columns: Optional[dict[str, list[str | float | int] | ndarray]] = None

DAQJobMessageStoreTabular is a class that inherits from DAQJobMessageStore and represents a tabular data store for DAQ job messages.
Attributes
keys:list[str]- A list of strings representing the keys or column names of the tabular data.
data:list[list[str | float | int]]- A list of lists where each inner list represents a row of data.
data_columns:dict[str, list[str | float | int] | numpy.ndarray] | None- Optional mapping from column name to column values (columnar form of the data; undocumented in the source docstring — description inferred from the field's type, verify against the source).
Ancestors
- DAQJobMessageStore
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var data : list[list[str | float | int] | numpy.ndarray] | None-
Expand source code
class DAQJobMessageStoreTabular(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreTabular is a class that inherits from DAQJobMessageStore and represents a tabular data store for DAQ job messages. Attributes: keys (list[str]): A list of strings representing the keys or column names of the tabular data. data (list[list[str | float | int]]): A list of lists where each inner list represents a row of data.. """ keys: list[str] data: Optional[list[list[str | float | int] | ndarray]] = None data_columns: Optional[dict[str, list[str | float | int] | ndarray]] = None var data_columns : dict[str, list[str | float | int] | numpy.ndarray] | None-
Expand source code
class DAQJobMessageStoreTabular(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreTabular is a class that inherits from DAQJobMessageStore and represents a tabular data store for DAQ job messages. Attributes: keys (list[str]): A list of strings representing the keys or column names of the tabular data. data (list[list[str | float | int]]): A list of lists where each inner list represents a row of data.. """ keys: list[str] data: Optional[list[list[str | float | int] | ndarray]] = None data_columns: Optional[dict[str, list[str | float | int] | ndarray]] = None var keys : list[str]-
Expand source code
class DAQJobMessageStoreTabular(DAQJobMessageStore, kw_only=True): """ DAQJobMessageStoreTabular is a class that inherits from DAQJobMessageStore and represents a tabular data store for DAQ job messages. Attributes: keys (list[str]): A list of strings representing the keys or column names of the tabular data. data (list[list[str | float | int]]): A list of lists where each inner list represents a row of data.. """ keys: list[str] data: Optional[list[list[str | float | int] | ndarray]] = None data_columns: Optional[dict[str, list[str | float | int] | ndarray]] = None
class DAQJobStoreConfig (csv: DAQJobStoreConfigCSV | None = None,
root: DAQJobStoreConfigROOT | None = None,
hdf5: DAQJobStoreConfigHDF5 | None = None,
mysql: DAQJobStoreConfigMySQL | None = None,
redis: DAQJobStoreConfigRedis | None = None,
raw: DAQJobStoreConfigRaw | None = None,
memory: DAQJobStoreConfigMemory | None = None,
store_types: set[type[DAQJobStoreConfigBase]] = <factory>,
target_local_supervisor: bool = False)-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types

Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig.
Attributes
csv:Optional[DAQJobStoreConfigCSV]- CSV store configuration.
root:Optional[DAQJobStoreConfigROOT]- ROOT store configuration.
hdf5:Optional[DAQJobStoreConfigHDF5]- HDF5 store configuration.
mysql:Optional[DAQJobStoreConfigMySQL]- MySQL store configuration.
redis:Optional[DAQJobStoreConfigRedis]- Redis store configuration.
raw:Optional[DAQJobStoreConfigRaw]- Raw store configuration.
memory:Optional[DAQJobStoreConfigMemory]- Memory store configuration.
target_local_supervisor:bool- Whether to send the message to store job topics of the local supervisor, or all supervisors.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var csv : DAQJobStoreConfigCSV | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var hdf5 : DAQJobStoreConfigHDF5 | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var memory : DAQJobStoreConfigMemory | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var mysql : DAQJobStoreConfigMySQL | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var raw : DAQJobStoreConfigRaw | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var redis : DAQJobStoreConfigRedis | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var root : DAQJobStoreConfigROOT | None-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var store_types : set[type[DAQJobStoreConfigBase]]-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types var target_local_supervisor : bool-
Expand source code
class DAQJobStoreConfig(Struct, dict=True): """ Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig. Attributes: csv (Optional[DAQJobStoreConfigCSV]): CSV store configuration. root (Optional[DAQJobStoreConfigROOT]): ROOT store configuration. hdf5 (Optional[DAQJobStoreConfigHDF5]): HDF5 store configuration. mysql (Optional[DAQJobStoreConfigMySQL]): MySQL store configuration. redis (Optional[DAQJobStoreConfigRedis]): Redis store configuration. raw (Optional[DAQJobStoreConfigRaw]): Raw store configuration. memory (Optional[DAQJobStoreConfigMemory]): Memory store configuration. target_local_supervisor (bool): Whether to send the message to store job topics of the local supervisor, or all supervisors. """ csv: "Optional[DAQJobStoreConfigCSV]" = None root: "Optional[DAQJobStoreConfigROOT]" = None hdf5: "Optional[DAQJobStoreConfigHDF5]" = None mysql: "Optional[DAQJobStoreConfigMySQL]" = None redis: "Optional[DAQJobStoreConfigRedis]" = None raw: "Optional[DAQJobStoreConfigRaw]" = None memory: "Optional[DAQJobStoreConfigMemory]" = None store_types: set[type["DAQJobStoreConfigBase"]] = field(default_factory=set) target_local_supervisor: bool = False def __post_init__(self): for key in dir(self): value = getattr(self, key) if isinstance(value, DAQJobStoreConfigBase): self.store_types.add(type(value)) def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types
Methods
def has_store_config(self,
store_type: type['DAQJobStoreConfigBase']) ‑> bool-
Expand source code
def has_store_config(self, store_type: type["DAQJobStoreConfigBase"]) -> bool: return store_type in self.store_types
class DAQJobStoreConfigBase-
Expand source code
class DAQJobStoreConfigBase(Struct, kw_only=True): """ DAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc. """ passDAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
class DAQJobStoreConfigCSV (file_path: str,
add_date: bool = False,
overwrite: bool = False,
use_zstd: bool = False)-
Expand source code
class DAQJobStoreConfigCSV(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = False """ Overwrite the file contents. """ use_zstd: bool = False """ Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed. """DAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var add_date : bool-
Expand source code
class DAQJobStoreConfigCSV(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = False """ Overwrite the file contents. """ use_zstd: bool = False """ Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed. """Include the date in the file path.
var file_path : str-
Expand source code
class DAQJobStoreConfigCSV(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = False """ Overwrite the file contents. """ use_zstd: bool = False """ Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed. """File path to store data in.
var overwrite : bool-
Expand source code
class DAQJobStoreConfigCSV(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = False """ Overwrite the file contents. """ use_zstd: bool = False """ Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed. """Overwrite the file contents.
var use_zstd : bool-
Expand source code
class DAQJobStoreConfigCSV(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = False """ Overwrite the file contents. """ use_zstd: bool = False """ Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed. """Use zstd streaming compression. Recommended for live streaming as it creates independent frames that can be recovered even if the process is killed.
class DAQJobStoreConfigHDF5 (file_path: str, add_date: bool, dataset_name: str)-
Expand source code
class DAQJobStoreConfigHDF5(DAQJobStoreConfigBase): file_path: str add_date: bool dataset_name: strDAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var add_date : bool-
Expand source code
class DAQJobStoreConfigHDF5(DAQJobStoreConfigBase): file_path: str add_date: bool dataset_name: str var dataset_name : str-
Expand source code
class DAQJobStoreConfigHDF5(DAQJobStoreConfigBase): file_path: str add_date: bool dataset_name: str var file_path : str-
Expand source code
class DAQJobStoreConfigHDF5(DAQJobStoreConfigBase): file_path: str add_date: bool dataset_name: str
class DAQJobStoreConfigMemory-
Expand source code
class DAQJobStoreConfigMemory(DAQJobStoreConfigBase): """ Configuration for in-memory DAQ job store. """ passConfiguration for in-memory DAQ job store.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
class DAQJobStoreConfigMySQL (table_name: str)-
Expand source code
class DAQJobStoreConfigMySQL(DAQJobStoreConfigBase): table_name: strDAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var table_name : str-
Expand source code
class DAQJobStoreConfigMySQL(DAQJobStoreConfigBase): table_name: str
class DAQJobStoreConfigROOT (file_path: str,
add_date: bool,
tree_name: str,
compression_type: str = 'ZSTD',
compression_level: int = 5)-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5DAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var add_date : bool-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5 var compression_level : int-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5 var compression_type : str-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5 var file_path : str-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5 var tree_name : str-
Expand source code
class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str add_date: bool tree_name: str compression_type: str = "ZSTD" compression_level: int = 5
class DAQJobStoreConfigRaw (file_path: str, add_date: bool = False, overwrite: bool = True)-
Expand source code
class DAQJobStoreConfigRaw(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = True """ Overwrite the file contents. """DAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var add_date : bool-
Expand source code
class DAQJobStoreConfigRaw(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = True """ Overwrite the file contents. """Include the date in the file path.
var file_path : str-
Expand source code
class DAQJobStoreConfigRaw(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = True """ Overwrite the file contents. """File path to store data in.
var overwrite : bool-
Expand source code
class DAQJobStoreConfigRaw(DAQJobStoreConfigBase): file_path: str """ File path to store data in. """ add_date: bool = False """ Include the date in the file path. """ overwrite: bool = True """ Overwrite the file contents. """Overwrite the file contents.
class DAQJobStoreConfigRedis (key: str,
key_expiration_days: int | None = None,
use_timeseries: bool | None = None)-
Expand source code
class DAQJobStoreConfigRedis(DAQJobStoreConfigBase): key: str """ Redis key to store data in. Data keys will be prefixed with the redis_key, e.g. for the data key "test", the redis key will be "redis_key.test". If the expiration is set, the key will be prefixed with the date, e.g. for the data key "test", the redis key will be "redis_key.test:2023-01-01". """ key_expiration_days: int | None = None """ Delete keys older than this number of days. If None, keys will not be deleted. """ use_timeseries: bool | None = None """ Utilize Redis Timeseries to store data. A key called "timestamp" is required when using timeseries. """DAQJobStoreConfigBase is a configuration class for DAQ job store, that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Ancestors
- DAQJobStoreConfigBase
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var key : str-
Expand source code
class DAQJobStoreConfigRedis(DAQJobStoreConfigBase): key: str """ Redis key to store data in. Data keys will be prefixed with the redis_key, e.g. for the data key "test", the redis key will be "redis_key.test". If the expiration is set, the key will be prefixed with the date, e.g. for the data key "test", the redis key will be "redis_key.test:2023-01-01". """ key_expiration_days: int | None = None """ Delete keys older than this number of days. If None, keys will not be deleted. """ use_timeseries: bool | None = None """ Utilize Redis Timeseries to store data. A key called "timestamp" is required when using timeseries. """Redis key to store data in.
Data keys will be prefixed with the redis_key, e.g. for the data key "test", the redis key will be "redis_key.test".
If the expiration is set, the key will be prefixed with the date, e.g. for the data key "test", the redis key will be "redis_key.test:2023-01-01".
var key_expiration_days : int | None-
Expand source code
class DAQJobStoreConfigRedis(DAQJobStoreConfigBase): key: str """ Redis key to store data in. Data keys will be prefixed with the redis_key, e.g. for the data key "test", the redis key will be "redis_key.test". If the expiration is set, the key will be prefixed with the date, e.g. for the data key "test", the redis key will be "redis_key.test:2023-01-01". """ key_expiration_days: int | None = None """ Delete keys older than this number of days. If None, keys will not be deleted. """ use_timeseries: bool | None = None """ Utilize Redis Timeseries to store data. A key called "timestamp" is required when using timeseries. """Delete keys older than this number of days.
If None, keys will not be deleted.
var use_timeseries : bool | None-
Expand source code
class DAQJobStoreConfigRedis(DAQJobStoreConfigBase): key: str """ Redis key to store data in. Data keys will be prefixed with the redis_key, e.g. for the data key "test", the redis key will be "redis_key.test". If the expiration is set, the key will be prefixed with the date, e.g. for the data key "test", the redis key will be "redis_key.test:2023-01-01". """ key_expiration_days: int | None = None """ Delete keys older than this number of days. If None, keys will not be deleted. """ use_timeseries: bool | None = None """ Utilize Redis Timeseries to store data. A key called "timestamp" is required when using timeseries. """Utilize Redis Timeseries to store data.
A key called "timestamp" is required when using timeseries.
class StorableDAQJobConfig (store_config: DAQJobStoreConfig,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)-
Expand source code
class StorableDAQJobConfig(DAQJobConfig): store_config: DAQJobStoreConfigDAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity:LogVerbosity- The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type:str- The type of the DAQ job.
daq_job_unique_id:str- The unique identifier for the DAQ job.
use_shm_when_possible:bool- Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True.
topics_to_subscribe:list[str]- List of topics to subscribe to. Use with caution, making DAQJob subscribe to topics with messages it doesn't know how to handle will result in errors.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
- DAQJobBenchmarkConfig
- DAQJobCAENToolboxConfig
- DAQJobCameraConfig
- DAQJobHandleAlertsConfig
- DAQJobHandleStatsConfig
- DAQJobHandleTracesConfig
- DAQJobPCMetricsConfig
- DAQXiaomiMijiaConfig
- DAQJobTestConfig
Instance variables
var store_config : DAQJobStoreConfig-
Expand source code
class StorableDAQJobConfig(DAQJobConfig): store_config: DAQJobStoreConfig