Module enrgdaq.daq.alert.base
Classes
class DAQJobAlert(config: Any, **kwargs)
    class DAQJobAlert(DAQJob):
        allowed_message_in_types = [DAQJobMessageAlert]

        def __init__(self, config: Any, **kwargs):
            super().__init__(config, **kwargs)

        def start(self):
            while True:
                self.consume(nowait=False)

        def handle_message(self, message: DAQJobMessageAlert) -> bool:
            return super().handle_message(message)

        def alert_loop(self):
            raise NotImplementedError

DAQJob is a base class for data acquisition jobs. It handles configuration and message queues, and provides methods for consuming and handling messages.
Attributes
allowed_message_in_types (list[type[DAQJobMessage]]): List of allowed message types for input.
config_type (Any): Type of the configuration.
config (Any): Configuration object.
message_in (Queue[DAQJobMessage]): Queue for incoming messages.
message_out (Queue[DAQJobMessage]): Queue for outgoing messages.
instance_id (int): Unique instance identifier.
unique_id (str): Unique identifier for the job.
restart_offset (timedelta): Offset for restarting the job.
info (DAQJobInfo): Information about the job.
_has_been_freed (bool): Flag indicating whether the job has been freed.
_logger (logging.Logger): Logger instance for the job.
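As a rough illustration of how a concrete alert job might build on this class, the sketch below overrides `handle_message` in a subclass. The `DAQJob` and `DAQJobMessageAlert` definitions here are hypothetical stand-ins so the snippet runs without enrgdaq installed; the `text` field, the type check in the stand-in `DAQJob.handle_message`, and the `CollectingAlert` class are assumptions, not part of the documented API.

```python
from dataclasses import dataclass
from queue import Queue
from typing import Any


# --- Hypothetical stand-ins so the sketch runs without enrgdaq installed ---
@dataclass
class DAQJobMessageAlert:
    text: str  # assumed field; the real message type is not shown on this page


class DAQJob:
    allowed_message_in_types: list = []

    def __init__(self, config: Any, **kwargs):
        self.config = config
        self.message_in = Queue()

    def handle_message(self, message) -> bool:
        # Assumed behaviour: accept only messages of an allowed type.
        return isinstance(message, tuple(self.allowed_message_in_types))


class DAQJobAlert(DAQJob):
    allowed_message_in_types = [DAQJobMessageAlert]

    def handle_message(self, message: DAQJobMessageAlert) -> bool:
        return super().handle_message(message)


# --- Illustrative subclass (hypothetical) ---
class CollectingAlert(DAQJobAlert):
    """Stores alert texts in a list instead of sending them anywhere."""

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)
        self.seen = []

    def handle_message(self, message: DAQJobMessageAlert) -> bool:
        # Let the base class decide whether the message is acceptable first.
        if not super().handle_message(message):
            return False
        self.seen.append(message.text)
        return True


job = CollectingAlert(config=None)
accepted = job.handle_message(DAQJobMessageAlert(text="high voltage trip"))
rejected = job.handle_message("not an alert")
```

A real subclass would replace the list append with whatever delivery mechanism the alert uses, while keeping the call to `super().handle_message` so that the base class checks still apply.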
Ancestors
DAQJob
Subclasses
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
Methods
def alert_loop(self)
    def alert_loop(self):
        raise NotImplementedError

def start(self)
    def start(self):
        while True:
            self.consume(nowait=False)
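`start` never returns, because it calls `consume(nowait=False)` in an endless loop. The sketch below imitates that pattern with the standard library `queue.Queue`. It assumes that a blocking `consume` waits on `message_in.get()` and hands the message to `handle_message`; that is a guess about `DAQJob`, not something shown on this page. `MiniJob` and the `None` stop sentinel are hypothetical and exist only so the demo loop can end.

```python
import threading
from queue import Queue


class MiniJob:
    """Hypothetical stand-in for a DAQJob with a blocking consume loop."""

    def __init__(self):
        self.message_in = Queue()
        self.handled = []

    def handle_message(self, message) -> bool:
        self.handled.append(message)
        return True

    def consume(self, nowait: bool = True) -> bool:
        # Assumed semantics: nowait=False blocks until a message arrives.
        message = self.message_in.get(block=not nowait)
        if message is None:  # demo-only stop sentinel, not part of enrgdaq
            return False
        self.handle_message(message)
        return True

    def start(self):
        # Same shape as DAQJobAlert.start, plus an exit condition for the demo.
        while self.consume(nowait=False):
            pass


job = MiniJob()
worker = threading.Thread(target=job.start)
worker.start()

for text in ("alert 1", "alert 2"):
    job.message_in.put(text)
job.message_in.put(None)  # ask the demo loop to stop

worker.join(timeout=5)
```

Because the loop blocks on the queue rather than polling, the job uses no CPU while idle, and whatever runs `start` has to dedicate a thread or process to it.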
Inherited members