Module enrgdaq.daq.store.base
Classes
class DAQJobStore(config: Any, supervisor_config: SupervisorConfig | None = None)
class DAQJobStore(DAQJob):
    """
    DAQJobStore is an abstract base class for data acquisition job stores.
    It extends the DAQJob class and provides additional functionality for
    handling and storing messages.

    Attributes:
        allowed_store_config_types (list): A list of allowed store configuration types.
    """

    allowed_store_config_types: list

    def start(self):
        """
        Starts the continuous loop for consuming and storing data.
        This method runs an infinite loop that repeatedly calls the `consume`
        and `store_loop` methods.
        """
        while True:
            self.consume()
            self.store_loop()
            time.sleep(STORE_LOOP_INTERVAL_SECONDS)

    def store_loop(self):
        raise NotImplementedError

    def handle_message(self, message: DAQJobMessage) -> bool:
        if not self.can_store(message):
            raise Exception(
                f"Invalid message type '{type(message)}' for DAQJob '{type(self).__name__}'"
            )
        return super().handle_message(message)

    def can_store(self, message: DAQJobMessage) -> bool:
        """
        Determines if the given message can be stored based on its configuration.

        Args:
            message (DAQJobMessage): The message to be checked.

        Returns:
            bool: True if the message can be stored, False otherwise.
        """
        if not isinstance(message, DAQJobMessageStore):
            return False
        is_message_allowed = False
        for allowed_config_type in self.allowed_store_config_types:
            if message.store_config.has_store_config(allowed_config_type):
                is_message_allowed = True
        return is_message_allowed
DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class and provides additional functionality for handling and storing messages.
Attributes
allowed_store_config_types (list): A list of allowed store configuration types.
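To illustrate how `allowed_store_config_types` gates incoming messages, here is a minimal sketch. The classes `CSVConfig`, `StoreMessage`, `StoreBase`, and `CSVStore` are hypothetical stand-ins, not the enrgdaq types, and `can_store` is simplified to a plain `isinstance` check on a single config object.

```python
# Hypothetical stand-ins for illustration only; not the enrgdaq classes.
class CSVConfig:
    pass


class StoreMessage:
    def __init__(self, config):
        self.config = config


class StoreBase:
    allowed_store_config_types: list = []

    def can_store(self, message) -> bool:
        # Simplified check: the message must carry a config of an allowed type.
        return isinstance(message, StoreMessage) and isinstance(
            message.config, tuple(self.allowed_store_config_types)
        )

    def handle_message(self, message) -> bool:
        # Reject anything this store was not declared to accept.
        if not self.can_store(message):
            raise Exception(
                f"Invalid message type '{type(message)}' "
                f"for DAQJob '{type(self).__name__}'"
            )
        return True


class CSVStore(StoreBase):
    # A concrete store lists the config types it can handle.
    allowed_store_config_types = [CSVConfig]


store = CSVStore()
accepted = store.handle_message(StoreMessage(CSVConfig()))
```

A message whose config type is not listed causes `handle_message` to raise rather than return `False`, matching the behaviour of the class source above.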
Ancestors
Subclasses
Class variables
var allowed_store_config_types : list
Methods
def can_store(self, message: DAQJobMessage) -> bool
def can_store(self, message: DAQJobMessage) -> bool:
    """
    Determines if the given message can be stored based on its configuration.

    Args:
        message (DAQJobMessage): The message to be checked.

    Returns:
        bool: True if the message can be stored, False otherwise.
    """
    if not isinstance(message, DAQJobMessageStore):
        return False
    is_message_allowed = False
    for allowed_config_type in self.allowed_store_config_types:
        if message.store_config.has_store_config(allowed_config_type):
            is_message_allowed = True
    return is_message_allowed
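Determines if the given message can be stored based on its configuration. A message qualifies only if it is a DAQJobMessageStore whose store_config contains at least one of the types listed in allowed_store_config_types.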
Args
message (DAQJobMessage): The message to be checked.
Returns
bool: True if the message can be stored, False otherwise.
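The check can be sketched in isolation as follows. `CSVConfig`, `StoreConfig`, `StoreMessage`, and this free-standing `can_store` function are hypothetical stand-ins written for this example; in particular, the `has_store_config` implementation shown here is an assumption about how such a lookup might work, not the enrgdaq implementation.

```python
from dataclasses import dataclass
from typing import Any, Optional


# Hypothetical stand-ins for illustration only; the real enrgdaq types differ.
@dataclass
class CSVConfig:
    file_path: str


@dataclass
class StoreConfig:
    csv: Optional[CSVConfig] = None

    def has_store_config(self, config_type: type) -> bool:
        # Assumed behaviour: True if any populated field has the requested type.
        return any(isinstance(v, config_type) for v in vars(self).values())


@dataclass
class StoreMessage:
    store_config: StoreConfig


def can_store(message: Any, allowed_types: list) -> bool:
    """Same decision as the method above, written with any() for brevity."""
    if not isinstance(message, StoreMessage):
        return False
    return any(
        message.store_config.has_store_config(t) for t in allowed_types
    )


csv_message = StoreMessage(StoreConfig(csv=CSVConfig("out.csv")))
empty_message = StoreMessage(StoreConfig())
```

Using `any()` stops at the first matching type, whereas the original loop visits every allowed type; the result is the same either way.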
def start(self)
def start(self):
    """
    Starts the continuous loop for consuming and storing data.
    This method runs an infinite loop that repeatedly calls the `consume`
    and `store_loop` methods.
    """
    while True:
        self.consume()
        self.store_loop()
        time.sleep(STORE_LOOP_INTERVAL_SECONDS)
Starts the continuous loop for consuming and storing data. This method runs an infinite loop that repeatedly calls the `consume` and `store_loop` methods.
def store_loop(self)
def store_loop(self):
    raise NotImplementedError
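Since `store_loop` raises `NotImplementedError` in the base class, concrete stores are expected to override it. The sketch below shows one way the consume/store cycle might look with an in-memory store. `ListStore`, its buffers, the `max_iterations` parameter, and the interval value are all assumptions made so the example terminates; the real `start` loops forever and the real constant is defined elsewhere.

```python
import time

# Assumed value for the example; the real constant is defined elsewhere.
STORE_LOOP_INTERVAL_SECONDS = 0.001


class ListStore:
    """Hypothetical in-memory store, not part of enrgdaq."""

    def __init__(self, incoming):
        self._incoming = list(incoming)  # messages waiting to be consumed
        self._pending = []               # consumed but not yet written
        self.written = []                # stands in for persisted output

    def consume(self):
        # Move waiting messages into the pending buffer.
        self._pending.extend(self._incoming)
        self._incoming.clear()

    def store_loop(self):
        # Flush everything buffered since the last iteration.
        self.written.extend(self._pending)
        self._pending.clear()

    def start(self, max_iterations: int):
        # Same cycle as start() above, but bounded so the example terminates.
        for _ in range(max_iterations):
            self.consume()
            self.store_loop()
            time.sleep(STORE_LOOP_INTERVAL_SECONDS)


store = ListStore(["a", "b"])
store.start(max_iterations=2)
```

Keeping `store_loop` limited to flushing buffered data keeps each iteration short, so the sleep interval governs how often output is written.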
Inherited members