Module enrgdaq.daq.alert.base
Classes
class DAQJobAlert (config: Any, **kwargs)
-
Expand source code
class DAQJobAlert(DAQJob): allowed_message_in_types = [DAQJobMessageAlert] def __init__(self, config: Any, **kwargs): super().__init__(config, **kwargs) def start(self): while True: self.consume(nowait=False) def handle_message(self, message: DAQJobMessageAlert) -> bool: return super().handle_message(message) def alert_loop(self): raise NotImplementedError
DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.
Attributes
allowed_message_in_types
:list[type[DAQJobMessage]]
- List of allowed message types for input.
config_type
:Any
- Type of the configuration.
config
:Any
- Configuration object.
message_in
:Queue[DAQJobMessage]
- Queue for incoming messages.
message_out
:Queue[DAQJobMessage]
- Queue for outgoing messages.
instance_id
:int
- Unique instance identifier.
unique_id
:str
- Unique identifier for the job.
restart_offset
:timedelta
- Offset for restarting the job.
info
:DAQJobInfo
- Information about the job.
_has_been_freed
:bool
- Flag indicating if the job has been freed.
_logger
:logging.Logger
- Logger instance for the job.
Ancestors
Subclasses
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
Methods
def alert_loop(self)
-
Expand source code
def alert_loop(self): raise NotImplementedError
def start(self)
-
Expand source code
def start(self): while True: self.consume(nowait=False)
Inherited members