Module enrgdaq.daq.base
Classes
class DAQJob (config: Any,
supervisor_config: SupervisorConfig | None = None)-
Expand source code
class DAQJob: """ DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages. Attributes: allowed_message_in_types (list[type[DAQJobMessage]]): List of allowed message types for input. config_type (Any): Type of the configuration. config (Any): Configuration object. message_in (Queue[DAQJobMessage]): Queue for incoming messages. message_out (Queue[DAQJobMessage]): Queue for outgoing messages. instance_id (int): Unique instance identifier. unique_id (str): Unique identifier for the job. restart_offset (timedelta): Offset for restarting the job. info (DAQJobInfo): Information about the job. _has_been_freed (bool): Flag indicating if the job has been freed. _logger (logging.Logger): Logger instance for the job. """ allowed_message_in_types: list[type[DAQJobMessage]] = [] config_type: Any config: Any message_in: Queue[DAQJobMessage] message_out: Queue[DAQJobMessage] instance_id: int unique_id: str restart_offset: timedelta info: "DAQJobInfo" _has_been_freed: bool _logger: logging.Logger def __init__( self, config: Any, supervisor_config: Optional[SupervisorConfig] = None ): global daq_job_instance_id, daq_job_instance_id_lock with daq_job_instance_id_lock: self.instance_id = daq_job_instance_id daq_job_instance_id += 1 self._logger = logging.getLogger(f"{type(self).__name__}({self.instance_id})") if isinstance(config, DAQJobConfig): self._logger.setLevel(config.verbosity.to_logging_level()) self.config = config self.message_in = Queue() self.message_out = Queue() self._has_been_freed = False self.unique_id = str(uuid.uuid4()) if supervisor_config is not None: self._supervisor_config = supervisor_config else: self._supervisor_config = None self.info = self._create_info() def consume(self, nowait=True): """ Consumes messages from the message_in queue. If nowait is True, it will consume the message immediately. Otherwise, it will wait until a message is available. """ def _process_message(message): if not self.handle_message(message): self.message_in.put(message) # Return immediately after consuming the message if not nowait: _process_message(self.message_in.get()) return while True: try: _process_message(self.message_in.get_nowait()) except Empty: break def handle_message(self, message: "DAQJobMessage") -> bool: """ Handles a message received from the message queue. Args: message (DAQJobMessage): The message to handle. Returns: bool: True if the message was handled, False otherwise. Raises: DAQJobStopError: If the message is a DAQJobMessageStop. Exception: If the message is not accepted by the job. """ if isinstance(message, DAQJobMessageStop): raise DAQJobStopError(message.reason) # check if the message is accepted is_message_type_accepted = False for accepted_message_type in self.allowed_message_in_types: if isinstance(message, accepted_message_type): is_message_type_accepted = True if not is_message_type_accepted: raise Exception( f"Message type '{type(message)}' is not accepted by '{type(self).__name__}'" ) # Drop remote messages silently if configured to do so if self.config.remote_config.drop_remote_messages: if message.daq_job_info and message.daq_job_info.supervisor_config: remote_supervisor_id = ( message.daq_job_info.supervisor_config.supervisor_id ) else: remote_supervisor_id = "unknown" self._logger.debug( f"Dropping remote message '{type(message)}' from '{remote_supervisor_id}' because drop_remote_messages is True" ) return True return True def start(self): raise NotImplementedError def _create_info(self) -> "DAQJobInfo": """ Creates a DAQJobInfo object for the job. Returns: DAQJobInfo: The created DAQJobInfo object. """ return DAQJobInfo( daq_job_type=self.config.daq_job_type if isinstance(self.config, DAQJobConfig) else self.config["daq_job_type"], daq_job_class_name=type(self).__name__, unique_id=self.unique_id, instance_id=self.instance_id, supervisor_config=getattr(self, "_supervisor_config", None), ) def _put_message_out(self, message: DAQJobMessage): """ Puts a message in the message_out queue. Should be called by DAQJob itself. Args: message (DAQJobMessage): The message to put in the queue. """ message.daq_job_info = self.info message.remote_config = self.config.remote_config # Get the remote config from the store config if it exists if isinstance(message, DAQJobMessageStore): store_remote_config = message.get_remote_config() if store_remote_config is not None: message.remote_config = store_remote_config if self.config.verbosity == LogVerbosity.DEBUG: msg_json = msgspec.json.encode(message) self._logger.debug(f"Message out: {msg_json}") self.message_out.put(message) def __del__(self): self._logger.info("DAQ job is being deleted") self._has_been_freed = True @property def supervisor_id(self): return self.info.supervisor_config.supervisor_id def free(self): if self._has_been_freed: return self._has_been_freed = True self.__del__()
DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.
Attributes
allowed_message_in_types
:list[type[DAQJobMessage]]
- List of allowed message types for input.
config_type
:Any
- Type of the configuration.
config
:Any
- Configuration object.
message_in
:Queue[DAQJobMessage]
- Queue for incoming messages.
message_out
:Queue[DAQJobMessage]
- Queue for outgoing messages.
instance_id
:int
- Unique instance identifier.
unique_id
:str
- Unique identifier for the job.
restart_offset
:timedelta
- Offset for restarting the job.
info
:DAQJobInfo
- Information about the job.
_has_been_freed
:bool
- Flag indicating if the job has been freed.
_logger
:logging.Logger
- Logger instance for the job.
Subclasses
- DAQJobAlert
- DAQJobN1081B
- DAQJobCAENToolbox
- DAQJobCamera
- DAQJobHandleAlerts
- DAQJobHandleStats
- DAQJobHealthcheck
- DAQJobPCMetrics
- DAQJobRemote
- DAQJobRemoteProxy
- DAQJobServeHTTP
- DAQJobTest
- DAQJobStore
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var config : Any
var config_type : Any
var info : DAQJobInfo
var instance_id : int
var message_in : queue.Queue[DAQJobMessage]
var message_out : queue.Queue[DAQJobMessage]
var restart_offset : datetime.timedelta
var unique_id : str
Instance variables
prop supervisor_id
-
Expand source code
@property def supervisor_id(self): return self.info.supervisor_config.supervisor_id
Methods
def consume(self, nowait=True)
-
Expand source code
def consume(self, nowait=True): """ Consumes messages from the message_in queue. If nowait is True, it will consume the message immediately. Otherwise, it will wait until a message is available. """ def _process_message(message): if not self.handle_message(message): self.message_in.put(message) # Return immediately after consuming the message if not nowait: _process_message(self.message_in.get()) return while True: try: _process_message(self.message_in.get_nowait()) except Empty: break
Consumes messages from the message_in queue. If nowait is True, it will consume the message immediately. Otherwise, it will wait until a message is available.
def free(self)
-
Expand source code
def free(self): if self._has_been_freed: return self._has_been_freed = True self.__del__()
def handle_message(self, message: DAQJobMessage) ‑> bool
-
Expand source code
def handle_message(self, message: "DAQJobMessage") -> bool: """ Handles a message received from the message queue. Args: message (DAQJobMessage): The message to handle. Returns: bool: True if the message was handled, False otherwise. Raises: DAQJobStopError: If the message is a DAQJobMessageStop. Exception: If the message is not accepted by the job. """ if isinstance(message, DAQJobMessageStop): raise DAQJobStopError(message.reason) # check if the message is accepted is_message_type_accepted = False for accepted_message_type in self.allowed_message_in_types: if isinstance(message, accepted_message_type): is_message_type_accepted = True if not is_message_type_accepted: raise Exception( f"Message type '{type(message)}' is not accepted by '{type(self).__name__}'" ) # Drop remote messages silently if configured to do so if self.config.remote_config.drop_remote_messages: if message.daq_job_info and message.daq_job_info.supervisor_config: remote_supervisor_id = ( message.daq_job_info.supervisor_config.supervisor_id ) else: remote_supervisor_id = "unknown" self._logger.debug( f"Dropping remote message '{type(message)}' from '{remote_supervisor_id}' because drop_remote_messages is True" ) return True return True
Handles a message received from the message queue.
Args
message
:DAQJobMessage
- The message to handle.
Returns
bool
- True if the message was handled, False otherwise.
Raises
DAQJobStopError
- If the message is a DAQJobMessageStop.
Exception
- If the message is not accepted by the job.
def start(self)
-
Expand source code
def start(self): raise NotImplementedError
class DAQJobThread (daq_job: DAQJob,
thread: threading.Thread,
start_time: datetime.datetime = <factory>)-
Expand source code
@dataclass class DAQJobThread: daq_job: DAQJob thread: threading.Thread start_time: datetime = field(default_factory=datetime.now)
DAQJobThread(daq_job: enrgdaq.daq.base.DAQJob, thread: threading.Thread, start_time: datetime.datetime =
) Class variables
var daq_job : DAQJob
var start_time : datetime.datetime
var thread : threading.Thread