Module enrgdaq.daq.alert.base

Classes

class DAQJobAlert (config: Any, **kwargs)
class DAQJobAlert(DAQJob):
    allowed_message_in_types = [DAQJobMessageAlert]

    def __init__(self, config: Any, **kwargs):
        super().__init__(config, **kwargs)

    def handle_message(self, message: DAQJobMessageAlert) -> bool:
        return super().handle_message(message)

    def start(self):
        while not self._has_been_freed:
            time.sleep(DAQ_JOB_ALERT_LOOP_SLEEP_SECONDS)

DAQJob is a base class for data acquisition jobs. It handles configuration and message queues, and provides methods for consuming and handling messages.

Attributes

allowed_message_in_types : list[type[DAQJobMessage]]
List of allowed message types for input.
topics_to_subscribe : list[str]
List of topics to subscribe to.
config_type : Any
Type of the configuration.
config : Any
Configuration object.
message_in : Queue[DAQJobMessage]
Queue for incoming messages.
message_out : Queue[DAQJobMessage]
Queue for outgoing messages.
instance_id : int
Unique instance identifier.
unique_id : str
Unique identifier for the job.
restart_offset : timedelta
Offset for restarting the job.
info : DAQJobInfo
Information about the job.
_has_been_freed : bool
Flag indicating if the job has been freed.
_logger : logging.Logger
Logger instance for the job.
multiprocessing_method : str
The multiprocessing method to use ('fork' or 'spawn').
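To make the role of the `message_in` queue more concrete, the following is a minimal sketch of how a job might drain its incoming queue and pass each message to `handle_message`. It does not use enrgdaq itself: `StubMessage`, `StubJob`, and the `consume` helper are hypothetical stand-ins written for illustration, and the real `DAQJob` may consume messages differently.

```python
import queue
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for DAQJobMessage."""
    text: str


class StubJob:
    """Hypothetical stand-in for DAQJob; only the queue attributes are modeled."""

    def __init__(self):
        self.message_in = queue.Queue()   # queue for incoming messages
        self.message_out = queue.Queue()  # queue for outgoing messages
        self.handled = []

    def handle_message(self, message: StubMessage) -> bool:
        # Record the message text and report success.
        self.handled.append(message.text)
        return True

    def consume(self) -> int:
        """Drain message_in without blocking; return how many messages were handled."""
        count = 0
        while True:
            try:
                message = self.message_in.get_nowait()
            except queue.Empty:
                return count
            if self.handle_message(message):
                count += 1


job = StubJob()
job.message_in.put(StubMessage("disk almost full"))
job.message_in.put(StubMessage("detector offline"))
handled_count = job.consume()
```

The sketch drains the queue with `get_nowait()` so that a job's main loop is never blocked waiting for input; a blocking `get()` with a timeout would be an equally reasonable choice.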

Ancestors

Subclasses

Class variables

var allowed_message_in_types
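As an illustration of what a list like `allowed_message_in_types` can be used for, the sketch below filters incoming messages by type with `isinstance`. The classes and the `accepts` method are hypothetical and are not part of enrgdaq; how `DAQJob.handle_message` actually enforces the list is not shown in this page, so this is an assumption about the general pattern only.

```python
class StubMessage:
    """Hypothetical stand-in for DAQJobMessage."""


class StubAlertMessage(StubMessage):
    """Hypothetical stand-in for DAQJobMessageAlert."""


class StubStoreMessage(StubMessage):
    """Hypothetical message type that the alert job should not accept."""


class StubAlertJob:
    # Mirrors the class variable above: only alert messages are allowed in.
    allowed_message_in_types = [StubAlertMessage]

    def accepts(self, message: StubMessage) -> bool:
        # isinstance takes a tuple of types, so subclasses of an allowed type also pass.
        return isinstance(message, tuple(self.allowed_message_in_types))


job = StubAlertJob()
alert_ok = job.accepts(StubAlertMessage())
store_ok = job.accepts(StubStoreMessage())
```

Because the list is a class variable, a subclass can narrow or widen the accepted types by overriding it without touching the filtering logic.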

Methods

def start(self)
def start(self):
    while not self._has_been_freed:
        time.sleep(DAQ_JOB_ALERT_LOOP_SLEEP_SECONDS)
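The loop above idles until `_has_been_freed` becomes true. A minimal, self-contained sketch of that pattern is shown here, assuming the job runs in a background thread and that some other code sets the flag. `StubAlertJob`, its `free` method, and `LOOP_SLEEP_SECONDS` are hypothetical stand-ins; the real value of `DAQ_JOB_ALERT_LOOP_SLEEP_SECONDS` and the way enrgdaq frees a job are not shown in this page.

```python
import threading
import time

# Hypothetical value standing in for DAQ_JOB_ALERT_LOOP_SLEEP_SECONDS.
LOOP_SLEEP_SECONDS = 0.01


class StubAlertJob:
    """Hypothetical stand-in that models only the start loop and the freed flag."""

    def __init__(self):
        self._has_been_freed = False

    def start(self):
        # Same shape as the loop above: sleep until the job has been freed.
        while not self._has_been_freed:
            time.sleep(LOOP_SLEEP_SECONDS)

    def free(self):
        # Hypothetical helper: setting the flag lets start() return.
        self._has_been_freed = True


job = StubAlertJob()
worker = threading.Thread(target=job.start)
worker.start()

time.sleep(0.05)
still_running = worker.is_alive()  # the loop keeps running while the flag is unset

job.free()
worker.join(timeout=1.0)
stopped = not worker.is_alive()    # the loop exits shortly after the flag is set
```

The sleep interval bounds how long the loop takes to notice that the job was freed, so a shorter interval gives a faster shutdown at the cost of more frequent wake-ups.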

Inherited members