Module enrgdaq.cnc.handlers
Sub-modules
- enrgdaq.cnc.handlers.base
- enrgdaq.cnc.handlers.heartbeat
- enrgdaq.cnc.handlers.req_list_clients
- enrgdaq.cnc.handlers.req_log
- enrgdaq.cnc.handlers.req_ping
- enrgdaq.cnc.handlers.req_restart_daq
- enrgdaq.cnc.handlers.req_restart_daqjobs
- enrgdaq.cnc.handlers.req_run_custom_daqjob
- enrgdaq.cnc.handlers.req_send_message
- enrgdaq.cnc.handlers.req_status
- enrgdaq.cnc.handlers.req_stop_daqjob
- enrgdaq.cnc.handlers.res_ping
- enrgdaq.cnc.handlers.res_status
Classes
class CNCMessageHandler (cnc: SupervisorCNC)-
Expand source code
class CNCMessageHandler(ABC):
    """
    Common interface shared by every C&C message handler.

    An instance keeps a reference to the owning SupervisorCNC and reuses its
    logger; concrete handlers supply the message-specific logic in ``handle``.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        self.cnc = cnc
        # Handlers log through the C&C instance's own logger.
        self._logger = cnc._logger

    @abstractmethod
    def handle(
        self, sender_identity: bytes, msg: CNCMessage
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Process one incoming C&C message.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The C&C message to handle.
        :return: None when there is nothing to reply, otherwise a tuple of the
            response message and a boolean indicating if it's a forward reply.
        """
        return None
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- abc.ABC
Subclasses
- HeartbeatHandler
- ReqListClientsHandler
- ReqLogHandler
- ReqPingHandler
- ReqRestartHandler
- ReqStopDAQJobsHandler
- ReqRunCustomDAQJobHandler
- ReqSendMessageHandler
- ReqStatusHandler
- ReqStopDAQJobHandler
- ResPingHandler
- ResStatusHandler
Methods
def handle(self, sender_identity: bytes, msg: CNCMessage) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
@abstractmethod
def handle(
    self, sender_identity: bytes, msg: CNCMessage
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Process one incoming C&C message; each concrete handler overrides this.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The C&C message to handle.
    :return: None when there is nothing to reply, otherwise a tuple of the
        response message and a boolean indicating if it's a forward reply.
    """
    return None
class HeartbeatHandler (cnc: SupervisorCNC)-
Expand source code
class HeartbeatHandler(CNCMessageHandler):
    """
    Handler for CNCMessageHeartbeat messages.

    Each heartbeat creates or overwrites the sender's entry in ``cnc.clients``.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageHeartbeat
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Record a heartbeat from a client.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The heartbeat message; its ``supervisor_info`` is stored.
        :return: None (heartbeats are not answered).
        """
        client_key = sender_identity.decode("utf-8")
        self._logger.debug(f"Received heartbeat from {client_key}")

        # The registry is keyed by the decoded identity; the raw bytes are
        # kept inside the record itself.
        record = CNCClientInfo(
            identity=sender_identity,
            last_seen=datetime.now().isoformat(),
            info=msg.supervisor_info,
        )
        self.cnc.clients[client_key] = record
        return None
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageHeartbeat) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageHeartbeat
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Record a heartbeat from a client.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The heartbeat message; its ``supervisor_info`` is stored.
    :return: None (heartbeats are not answered).
    """
    client_key = sender_identity.decode("utf-8")
    self._logger.debug(f"Received heartbeat from {client_key}")

    # The registry is keyed by the decoded identity; the raw bytes are kept
    # inside the record itself.
    record = CNCClientInfo(
        identity=sender_identity,
        last_seen=datetime.now().isoformat(),
        info=msg.supervisor_info,
    )
    self.cnc.clients[client_key] = record
    return None
class ReqListClientsHandler (cnc: SupervisorCNC)-
Expand source code
class ReqListClientsHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqListClients messages.

    Replies with the ``info`` of every client currently in ``cnc.clients``.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqListClients
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Answer a list clients request.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The list clients request message.
        :return: A list clients response message, with the forward-reply flag
            set to False.
        """
        requester = sender_identity.decode("utf-8")
        self._logger.debug(f"Received list clients request from {requester}")

        # Only the ``info`` part of each client record goes into the reply.
        infos_by_id = {}
        for client_id, client in self.cnc.clients.items():
            infos_by_id[client_id] = client.info

        reply = CNCMessageResListClients(clients=infos_by_id)
        return reply, False
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqListClients) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqListClients
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Answer a list clients request.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The list clients request message.
    :return: A list clients response message, with the forward-reply flag
        set to False.
    """
    requester = sender_identity.decode("utf-8")
    self._logger.debug(f"Received list clients request from {requester}")

    # Only the ``info`` part of each client record goes into the reply.
    infos_by_id = {}
    for client_id, client in self.cnc.clients.items():
        infos_by_id[client_id] = client.info

    reply = CNCMessageResListClients(clients=infos_by_id)
    return reply, False
class ReqLogHandler (cnc: SupervisorCNC)-
Expand source code
class ReqLogHandler(CNCMessageHandler):
    """
    Handler for CNCMessageLog messages.

    Passes each received log message to ``cnc.add_client_log``.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageLog
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Store a log message sent by a client.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The log message.
        :return: None (log messages are not answered).
        """
        client_id = sender_identity.decode("utf-8")
        self.cnc.add_client_log(client_id, msg)
        return None
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageLog) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageLog
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Store a log message sent by a client.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The log message.
    :return: None (log messages are not answered).
    """
    client_id = sender_identity.decode("utf-8")
    self.cnc.add_client_log(client_id, msg)
    return None
class ReqPingHandler (cnc: SupervisorCNC)-
Expand source code
class ReqPingHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqPing messages.

    Every ping is answered with a CNCMessageResPing.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqPing
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Answer a ping request.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The ping request message.
        :return: A pong response message, with the forward-reply flag set to
            True.
        """
        self._logger.debug("Received ping, sending pong.")
        pong = CNCMessageResPing()
        return pong, True
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqPing) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqPing
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Answer a ping request.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The ping request message.
    :return: A pong response message, with the forward-reply flag set to
        True.
    """
    self._logger.debug("Received ping, sending pong.")
    pong = CNCMessageResPing()
    return pong, True
class ReqRestartHandler (cnc: SupervisorCNC)-
Expand source code
class ReqRestartHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqRestartDAQ messages.

    When the request asks for an update, runs ``git pull`` followed by
    ``uv sync``. On success (with or without update) it schedules
    ``supervisor.stop()`` after CNC_REQ_UPDATE_AND_RESTART_SECONDS seconds.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a restart request, optionally updating first.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The restart request message; ``msg.update`` selects
            whether ``git pull`` and ``uv sync`` are run before stopping.
        :return: A restart response message carrying ``success`` and a
            human-readable ``message``, with the forward-reply flag set to True.
        """
        self._logger.info("Received update and restart request.")
        try:
            if msg.update:
                # Run git pull
                git_result = subprocess.run(
                    ["git", "pull"], check=True, capture_output=True, text=True
                )
                self._logger.info(f"Git pull output: {git_result.stdout}")

                # Run uv sync
                sync_result = subprocess.run(
                    ["uv", "sync"], check=True, capture_output=True, text=True
                )
                self._logger.info(f"uv sync output: {sync_result.stdout}")
                message = f"Update completed successfully. Will terminate after {CNC_REQ_UPDATE_AND_RESTART_SECONDS} seconds."
            else:
                message = "Restart requested via CNC"

            success = True
            self._logger.info(message)

            # Schedule exit on a timer so the response can be returned first.
            threading.Timer(CNC_REQ_UPDATE_AND_RESTART_SECONDS, self._exit).start()
        except subprocess.CalledProcessError as e:
            success = False
            message = f"Error during update: {str(e)}"
            # str(e) only holds the command and exit status; the captured
            # stderr is what explains why the command failed.
            stderr_text = (e.stderr or "").strip()
            if stderr_text:
                message = f"{message} (stderr: {stderr_text})"
            self._logger.error(message)
        except Exception as e:
            success = False
            message = f"Unexpected error during update: {str(e)}"
            self._logger.error(message)

        return CNCMessageResRestartDAQ(success=success, message=message), True

    def _exit(self):
        """Timer callback: stop the supervisor."""
        self.cnc.supervisor.stop()
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a restart request, optionally updating first.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The restart request message; ``msg.update`` selects whether
        ``git pull`` and ``uv sync`` are run before stopping.
    :return: A restart response message carrying ``success`` and a
        human-readable ``message``, with the forward-reply flag set to True.
    """
    self._logger.info("Received update and restart request.")
    try:
        if msg.update:
            # Run git pull
            git_result = subprocess.run(
                ["git", "pull"], check=True, capture_output=True, text=True
            )
            self._logger.info(f"Git pull output: {git_result.stdout}")

            # Run uv sync
            sync_result = subprocess.run(
                ["uv", "sync"], check=True, capture_output=True, text=True
            )
            self._logger.info(f"uv sync output: {sync_result.stdout}")
            message = f"Update completed successfully. Will terminate after {CNC_REQ_UPDATE_AND_RESTART_SECONDS} seconds."
        else:
            message = "Restart requested via CNC"

        success = True
        self._logger.info(message)

        # Schedule exit on a timer so the response can be returned first.
        threading.Timer(CNC_REQ_UPDATE_AND_RESTART_SECONDS, self._exit).start()
    except subprocess.CalledProcessError as e:
        success = False
        message = f"Error during update: {str(e)}"
        # str(e) only holds the command and exit status; the captured stderr
        # is what explains why the command failed.
        stderr_text = (e.stderr or "").strip()
        if stderr_text:
            message = f"{message} (stderr: {stderr_text})"
        self._logger.error(message)
    except Exception as e:
        success = False
        message = f"Unexpected error during update: {str(e)}"
        self._logger.error(message)

    return CNCMessageResRestartDAQ(success=success, message=message), True
class ReqRunCustomDAQJobHandler (cnc: SupervisorCNC)-
Expand source code
class ReqRunCustomDAQJobHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqRunCustomDAQJob messages.

    Builds a DAQ job from the config text carried by the request and asks the
    supervisor to start it.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a run custom DAQJob request.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The run custom DAQJob request message; ``msg.config`` is
            the config text and ``msg.restart_on_crash`` is copied onto the
            built job process.
        :return: A run custom DAQJob response message carrying ``success``
            and ``message``, with the forward-reply flag set to True.
        """
        self._logger.info("Received run custom DAQJob request.")

        try:
            from enrgdaq.daq.daq_job import build_daq_job
            from enrgdaq.models import SupervisorInfo

            # Create a basic supervisor info for the custom job
            supervisor_info = SupervisorInfo(supervisor_id="remote_custom")

            # build_daq_job is given the config as bytes. Encode the text
            # directly instead of round-tripping through a temporary file:
            # no file can be leaked and the result does not depend on the
            # locale's default encoding.
            config_data = msg.config.encode("utf-8")
        except Exception as e:
            message = f"Error processing run custom DAQJob request: {str(e)}"
            self._logger.error(message)
            return CNCMessageResRunCustomDAQJob(success=False, message=message), True

        try:
            # Build the DAQ job from the config
            daq_job_process = build_daq_job(config_data, supervisor_info)
            daq_job_process.restart_on_crash = msg.restart_on_crash
            self._logger.info(
                f"Starting DAQ job: {daq_job_process.daq_job_cls.__name__} (restart_on_crash={msg.restart_on_crash})"
            )

            # Start the DAQ job
            self.cnc.supervisor.start_daq_job_processes([daq_job_process])

            success = True
            message = f"DAQ job started with PID: {daq_job_process.process.pid if daq_job_process.process else 'Unknown'}"
            self._logger.info(message)
        except Exception as e:
            success = False
            message = f"Error starting DAQ job: {str(e)}"
            self._logger.error(message)

        return CNCMessageResRunCustomDAQJob(success=success, message=message), True
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a run custom DAQJob request.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The run custom DAQJob request message; ``msg.config`` is the
        config text and ``msg.restart_on_crash`` is copied onto the built
        job process.
    :return: A run custom DAQJob response message carrying ``success`` and
        ``message``, with the forward-reply flag set to True.
    """
    self._logger.info("Received run custom DAQJob request.")

    try:
        from enrgdaq.daq.daq_job import build_daq_job
        from enrgdaq.models import SupervisorInfo

        # Create a basic supervisor info for the custom job
        supervisor_info = SupervisorInfo(supervisor_id="remote_custom")

        # build_daq_job is given the config as bytes. Encode the text
        # directly instead of round-tripping through a temporary file: no
        # file can be leaked and the result does not depend on the locale's
        # default encoding.
        config_data = msg.config.encode("utf-8")
    except Exception as e:
        message = f"Error processing run custom DAQJob request: {str(e)}"
        self._logger.error(message)
        return CNCMessageResRunCustomDAQJob(success=False, message=message), True

    try:
        # Build the DAQ job from the config
        daq_job_process = build_daq_job(config_data, supervisor_info)
        daq_job_process.restart_on_crash = msg.restart_on_crash
        self._logger.info(
            f"Starting DAQ job: {daq_job_process.daq_job_cls.__name__} (restart_on_crash={msg.restart_on_crash})"
        )

        # Start the DAQ job
        self.cnc.supervisor.start_daq_job_processes([daq_job_process])

        success = True
        message = f"DAQ job started with PID: {daq_job_process.process.pid if daq_job_process.process else 'Unknown'}"
        self._logger.info(message)
    except Exception as e:
        success = False
        message = f"Error starting DAQ job: {str(e)}"
        self._logger.error(message)

    return CNCMessageResRunCustomDAQJob(success=success, message=message), True
class ReqSendMessageHandler (cnc: SupervisorCNC)-
Expand source code
class ReqSendMessageHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqSendMessage messages.
    Sends a custom message to DAQ job(s).
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqSendMessage
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a send custom message request.

        The JSON payload is decoded into the message class named by
        ``msg.message_type`` and put on the ``message_in`` queue of every
        matching DAQ job process whose class accepts that message type.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The send custom message request message.
        :return: A send custom message response message (``success``,
            ``message``, ``jobs_notified``), with the forward-reply flag set
            to True.
        """
        target_id = msg.target_daq_job_unique_id
        self._logger.info(
            f"Received send message request: type={msg.message_type}, "
            f"target={target_id or 'all'}"
        )

        try:
            # Get all registered DAQJobMessage types
            message_types = self._get_message_types()

            # Find the requested message type
            if msg.message_type not in message_types:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"Unknown message type: {msg.message_type}. "
                    f"Available types: {list(message_types.keys())}",
                    jobs_notified=0,
                ), True

            message_cls = message_types[msg.message_type]

            # Decode the JSON payload
            try:
                message_instance = msgspec.json.decode(
                    msg.payload.encode(), type=message_cls
                )
            except Exception as e:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"Failed to decode payload as {msg.message_type}: {str(e)}",
                    jobs_notified=0,
                ), True

            supervisor = self.cnc.supervisor
            if not supervisor:
                return CNCMessageResSendMessage(
                    success=False,
                    message="Supervisor not available",
                    jobs_notified=0,
                ), True

            jobs_notified = 0

            # Send to specific job or all jobs
            for daq_job_process in supervisor.daq_job_processes:
                # If a target is specified, only send to that job. A process
                # without job info cannot be identified as the target, so it
                # is skipped too instead of receiving the message.
                if target_id:
                    job_info = daq_job_process.daq_job_info
                    if not job_info or job_info.unique_id != target_id:
                        continue

                # Check if the job accepts this message type
                daq_job_cls = daq_job_process.daq_job_cls
                accepts_message = any(
                    isinstance(message_instance, msg_type)
                    for msg_type in daq_job_cls.allowed_message_in_types
                )
                if not accepts_message:
                    continue

                # Send the message; a failed put is logged and not counted.
                try:
                    daq_job_process.message_in.put_nowait(message_instance)
                    jobs_notified += 1
                    self._logger.debug(
                        f"Sent {msg.message_type} to {daq_job_cls.__name__}"
                    )
                except Exception as e:
                    self._logger.warning(
                        f"Failed to send message to {daq_job_cls.__name__}: {e}"
                    )

            if jobs_notified == 0:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"No jobs accepted message type {msg.message_type}"
                    + (f" (target: {target_id})" if target_id else ""),
                    jobs_notified=0,
                ), True

            return CNCMessageResSendMessage(
                success=True,
                message=f"Message sent to {jobs_notified} job(s)",
                jobs_notified=jobs_notified,
            ), True

        except Exception as e:
            self._logger.error(f"Error sending custom message: {e}", exc_info=True)
            return CNCMessageResSendMessage(
                success=False,
                message=f"Error sending message: {str(e)}",
                jobs_notified=0,
            ), True

    def _get_message_types(self) -> dict[str, type]:
        """
        Gets all registered DAQJobMessage types.

        The message classes are imported inside the function, on each call.

        :return: A dict mapping type name to type class.
        """
        from enrgdaq.daq.jobs.caen.hv import DAQJobMessageCAENHVSetChParam
        from enrgdaq.daq.jobs.handle_stats import DAQJobMessageStats
        from enrgdaq.daq.jobs.remote import DAQJobMessageStatsRemote
        from enrgdaq.daq.models import (
            DAQJobMessage,
            DAQJobMessageHeartbeat,
            DAQJobMessageStop,
        )
        from enrgdaq.daq.store.models import (
            DAQJobMessageStore,
            DAQJobMessageStoreRaw,
            DAQJobMessageStoreTabular,
        )

        # Build a dictionary of all known message types
        types: dict[str, type] = {
            "DAQJobMessage": DAQJobMessage,
            "DAQJobMessageHeartbeat": DAQJobMessageHeartbeat,
            "DAQJobMessageStop": DAQJobMessageStop,
            "DAQJobMessageStore": DAQJobMessageStore,
            "DAQJobMessageStoreRaw": DAQJobMessageStoreRaw,
            "DAQJobMessageStoreTabular": DAQJobMessageStoreTabular,
            "DAQJobMessageStats": DAQJobMessageStats,
            "DAQJobMessageStatsRemote": DAQJobMessageStatsRemote,
            "DAQJobMessageCAENHVSetChParam": DAQJobMessageCAENHVSetChParam,
        }

        return types
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqSendMessage) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqSendMessage
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a send custom message request.

    The JSON payload is decoded into the message class named by
    ``msg.message_type`` and put on the ``message_in`` queue of every
    matching DAQ job process whose class accepts that message type.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The send custom message request message.
    :return: A send custom message response message (``success``,
        ``message``, ``jobs_notified``), with the forward-reply flag set to
        True.
    """
    target_id = msg.target_daq_job_unique_id
    self._logger.info(
        f"Received send message request: type={msg.message_type}, "
        f"target={target_id or 'all'}"
    )

    try:
        # Get all registered DAQJobMessage types
        message_types = self._get_message_types()

        # Find the requested message type
        if msg.message_type not in message_types:
            return CNCMessageResSendMessage(
                success=False,
                message=f"Unknown message type: {msg.message_type}. "
                f"Available types: {list(message_types.keys())}",
                jobs_notified=0,
            ), True

        message_cls = message_types[msg.message_type]

        # Decode the JSON payload
        try:
            message_instance = msgspec.json.decode(
                msg.payload.encode(), type=message_cls
            )
        except Exception as e:
            return CNCMessageResSendMessage(
                success=False,
                message=f"Failed to decode payload as {msg.message_type}: {str(e)}",
                jobs_notified=0,
            ), True

        supervisor = self.cnc.supervisor
        if not supervisor:
            return CNCMessageResSendMessage(
                success=False,
                message="Supervisor not available",
                jobs_notified=0,
            ), True

        jobs_notified = 0

        # Send to specific job or all jobs
        for daq_job_process in supervisor.daq_job_processes:
            # If a target is specified, only send to that job. A process
            # without job info cannot be identified as the target, so it is
            # skipped too instead of receiving the message.
            if target_id:
                job_info = daq_job_process.daq_job_info
                if not job_info or job_info.unique_id != target_id:
                    continue

            # Check if the job accepts this message type
            daq_job_cls = daq_job_process.daq_job_cls
            accepts_message = any(
                isinstance(message_instance, msg_type)
                for msg_type in daq_job_cls.allowed_message_in_types
            )
            if not accepts_message:
                continue

            # Send the message; a failed put is logged and not counted.
            try:
                daq_job_process.message_in.put_nowait(message_instance)
                jobs_notified += 1
                self._logger.debug(
                    f"Sent {msg.message_type} to {daq_job_cls.__name__}"
                )
            except Exception as e:
                self._logger.warning(
                    f"Failed to send message to {daq_job_cls.__name__}: {e}"
                )

        if jobs_notified == 0:
            return CNCMessageResSendMessage(
                success=False,
                message=f"No jobs accepted message type {msg.message_type}"
                + (f" (target: {target_id})" if target_id else ""),
                jobs_notified=0,
            ), True

        return CNCMessageResSendMessage(
            success=True,
            message=f"Message sent to {jobs_notified} job(s)",
            jobs_notified=jobs_notified,
        ), True

    except Exception as e:
        self._logger.error(f"Error sending custom message: {e}", exc_info=True)
        return CNCMessageResSendMessage(
            success=False,
            message=f"Error sending message: {str(e)}",
            jobs_notified=0,
        ), True
class ReqStatusHandler (cnc: SupervisorCNC)-
Expand source code
class ReqStatusHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStatus messages.

    Replies with whatever ``supervisor.get_status()`` returns.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Bind the handler to a C&C instance.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStatus
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Answer a status request.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The status request message.
        :return: A status response message, with the forward-reply flag set
            to True.
        """
        reply = CNCMessageResStatus(status=self.cnc.supervisor.get_status())
        return reply, True
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqStatus) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStatus
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Answer a status request.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The status request message.
    :return: A status response message, with the forward-reply flag set to
        True.
    """
    reply = CNCMessageResStatus(status=self.cnc.supervisor.get_status())
    return reply, True
class ReqStopDAQJobHandler (cnc: SupervisorCNC)-
Expand source code
class ReqStopDAQJobHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStopDAQJob messages.

    Stops a single DAQ job, selected by unique id or by class name.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.

        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a stop and remove DAQJob request.

        The first process matching ``msg.daq_job_unique_id`` or
        ``msg.daq_job_name`` (the job class name) is sent a DAQJobMessageStop
        and removed from ``supervisor.daq_job_processes``. When ``msg.remove``
        is set, its entry in ``supervisor.daq_job_stats`` is dropped as well.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The stop and remove DAQJob request message.
        :return: A stop and remove DAQJob response message carrying
            ``success`` and ``message``, with the forward-reply flag set to
            True.
        :raises Exception: If neither ``daq_job_name`` nor
            ``daq_job_unique_id`` is given.
        """
        if not msg.daq_job_name and not msg.daq_job_unique_id:
            raise Exception(
                "Either daq_job_name or daq_job_unique_id must be specified"
            )

        # Report whichever identifier the caller supplied; using only the
        # unique id produced "None" in messages for lookups by name.
        job_ref = msg.daq_job_unique_id or msg.daq_job_name

        self._logger.info(
            f"Received stop and remove DAQJob request for job: {job_ref}"
        )

        try:
            supervisor = self.cnc.supervisor
            if supervisor:
                # Find the DAQ job process with the specified id or name
                target_process = None
                for daq_job_process in supervisor.daq_job_processes:
                    # If daq_job_unique_id is specified
                    if (
                        msg.daq_job_unique_id
                        and daq_job_process.daq_job_info
                        and daq_job_process.daq_job_info.unique_id
                        == msg.daq_job_unique_id
                    ):
                        target_process = daq_job_process
                        break
                    # If daq_job_name is specified
                    elif (
                        msg.daq_job_name
                        and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                    ):
                        target_process = daq_job_process
                        break

                if target_process:
                    # Send a stop message to the DAQ job process
                    try:
                        target_process.message_in.put(
                            DAQJobMessageStop(
                                reason="Stop and remove requested via CNC"
                            )
                        )
                        # Remove the process from the supervisor's list
                        supervisor.daq_job_processes.remove(target_process)
                        if msg.remove:
                            # A missing stats entry must not turn an
                            # already-completed stop into a failure reply.
                            supervisor.daq_job_stats.pop(
                                target_process.daq_job_cls.__name__, None
                            )

                        success = True
                        message = f"DAQJob {job_ref} " + (
                            "removed and stopped" if msg.remove else "stopped"
                        )
                        self._logger.info(message)
                    except Exception as e:
                        success = False
                        message = f"Error during Stop DAQJob for '{job_ref}': {str(e)}"
                        self._logger.error(message, exc_info=True)
                else:
                    success = False
                    message = f"DAQ job '{job_ref}' not found"
                    self._logger.warning(message)
            else:
                success = False
                message = "Supervisor not available"
                self._logger.error(message)
        except Exception as e:
            success = False
            message = f"Error stopping and removing DAQJob '{job_ref}': {str(e)}"
            self._logger.error(message, exc_info=True)

        return CNCMessageResStopDAQJob(success=success, message=message), True
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a stop and remove DAQJob request.

    The first process matching ``msg.daq_job_unique_id`` or
    ``msg.daq_job_name`` (the job class name) is sent a DAQJobMessageStop and
    removed from ``supervisor.daq_job_processes``. When ``msg.remove`` is
    set, its entry in ``supervisor.daq_job_stats`` is dropped as well.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The stop and remove DAQJob request message.
    :return: A stop and remove DAQJob response message carrying ``success``
        and ``message``, with the forward-reply flag set to True.
    :raises Exception: If neither ``daq_job_name`` nor ``daq_job_unique_id``
        is given.
    """
    if not msg.daq_job_name and not msg.daq_job_unique_id:
        raise Exception(
            "Either daq_job_name or daq_job_unique_id must be specified"
        )

    # Report whichever identifier the caller supplied; using only the unique
    # id produced "None" in messages for lookups by name.
    job_ref = msg.daq_job_unique_id or msg.daq_job_name

    self._logger.info(
        f"Received stop and remove DAQJob request for job: {job_ref}"
    )

    try:
        supervisor = self.cnc.supervisor
        if supervisor:
            # Find the DAQ job process with the specified id or name
            target_process = None
            for daq_job_process in supervisor.daq_job_processes:
                # If daq_job_unique_id is specified
                if (
                    msg.daq_job_unique_id
                    and daq_job_process.daq_job_info
                    and daq_job_process.daq_job_info.unique_id
                    == msg.daq_job_unique_id
                ):
                    target_process = daq_job_process
                    break
                # If daq_job_name is specified
                elif (
                    msg.daq_job_name
                    and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                ):
                    target_process = daq_job_process
                    break

            if target_process:
                # Send a stop message to the DAQ job process
                try:
                    target_process.message_in.put(
                        DAQJobMessageStop(
                            reason="Stop and remove requested via CNC"
                        )
                    )
                    # Remove the process from the supervisor's list
                    supervisor.daq_job_processes.remove(target_process)
                    if msg.remove:
                        # A missing stats entry must not turn an
                        # already-completed stop into a failure reply.
                        supervisor.daq_job_stats.pop(
                            target_process.daq_job_cls.__name__, None
                        )

                    success = True
                    message = f"DAQJob {job_ref} " + (
                        "removed and stopped" if msg.remove else "stopped"
                    )
                    self._logger.info(message)
                except Exception as e:
                    success = False
                    message = f"Error during Stop DAQJob for '{job_ref}': {str(e)}"
                    self._logger.error(message, exc_info=True)
            else:
                success = False
                message = f"DAQ job '{job_ref}' not found"
                self._logger.warning(message)
        else:
            success = False
            message = "Supervisor not available"
            self._logger.error(message)
    except Exception as e:
        success = False
        message = f"Error stopping and removing DAQJob '{job_ref}': {str(e)}"
        self._logger.error(message, exc_info=True)

    return CNCMessageResStopDAQJob(success=success, message=message), True
class ReqStopDAQJobsHandler (cnc: SupervisorCNC)-
Expand source code
class ReqStopDAQJobsHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStopDAQJobs messages.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a restart DAQJobs request.

        A stop message is queued to every DAQ job process held by the
        supervisor. Delivery is best effort: a failure for one process is
        logged as a warning and the remaining processes are still signalled.
        NOTE(review): restarting the stopped jobs is presumably done by the
        supervisor itself; that logic is not visible here - confirm.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The restart DAQJobs request message.
        :return: A restart DAQJobs response message, paired with ``True``
            to mark it as a forward reply.
        """
        self._logger.info("Received restart DAQJobs request.")

        try:
            supervisor = self.cnc.supervisor
            if supervisor:
                # Send stop messages to all DAQ job processes to trigger restart
                for daq_job_process in supervisor.daq_job_processes:
                    try:
                        daq_job_process.message_in.put(
                            DAQJobMessageStop(reason="Restart requested via CNC")
                        )
                    except Exception as e:
                        self._logger.warning(
                            f"Error sending stop message to DAQ job: {e}"
                        )

                success = True
                message = "Stop signals sent to all DAQ jobs."
                self._logger.info(message)
            else:
                success = False
                message = "Supervisor not available"
                self._logger.error(message)
        except Exception as e:
            success = False
            message = f"Error restarting DAQJobs: {str(e)}"
            # Keep the traceback: the message alone only carries str(e).
            self._logger.error(message, exc_info=True)

        return CNCMessageResStopDAQJobs(success=success, message=message), True
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a restart DAQJobs request.

    A stop message is queued to every DAQ job process held by the
    supervisor. Delivery is best effort: a failure for one process is
    logged as a warning and the remaining processes are still signalled.
    NOTE(review): restarting the stopped jobs is presumably done by the
    supervisor itself; that logic is not visible here - confirm.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The restart DAQJobs request message.
    :return: A restart DAQJobs response message, paired with ``True``
        to mark it as a forward reply.
    """
    self._logger.info("Received restart DAQJobs request.")

    try:
        supervisor = self.cnc.supervisor
        if supervisor:
            # Send stop messages to all DAQ job processes to trigger restart
            for daq_job_process in supervisor.daq_job_processes:
                try:
                    daq_job_process.message_in.put(
                        DAQJobMessageStop(reason="Restart requested via CNC")
                    )
                except Exception as e:
                    self._logger.warning(
                        f"Error sending stop message to DAQ job: {e}"
                    )

            success = True
            message = "Stop signals sent to all DAQ jobs."
            self._logger.info(message)
        else:
            success = False
            message = "Supervisor not available"
            self._logger.error(message)
    except Exception as e:
        success = False
        message = f"Error restarting DAQJobs: {str(e)}"
        # Keep the traceback: the message alone only carries str(e).
        self._logger.error(message, exc_info=True)

    return CNCMessageResStopDAQJobs(success=success, message=message), True
class ResPingHandler (cnc: SupervisorCNC)-
Expand source code
class ResPingHandler(CNCMessageHandler):
    """
    Handler for CNCMessageResPing messages.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageResPing
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a pong response by logging who sent it.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The pong response message.
        :return: None
        """
        # The identity is only used for logging, so bytes that are not valid
        # UTF-8 are replaced rather than allowed to raise UnicodeDecodeError.
        sender_id_str = sender_identity.decode("utf-8", errors="replace")
        self._logger.info(f"Received pong from {sender_id_str}")
        return None
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageResPing) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageResPing
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a pong response by logging who sent it.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The pong response message.
    :return: None
    """
    # The identity is only used for logging, so bytes that are not valid
    # UTF-8 are replaced rather than allowed to raise UnicodeDecodeError.
    sender_id_str = sender_identity.decode("utf-8", errors="replace")
    self._logger.info(f"Received pong from {sender_id_str}")
    return None
class ResStatusHandler (cnc: SupervisorCNC)-
Expand source code
class ResStatusHandler(CNCMessageHandler):
    """
    Handler for CNCMessageResStatus messages.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageResStatus
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a status response by logging it at debug level.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The status response message.
        :return: None
        """
        # The identity is only used for logging, so bytes that are not valid
        # UTF-8 are replaced rather than allowed to raise UnicodeDecodeError.
        sender_id_str = sender_identity.decode("utf-8", errors="replace")
        # Lazy %-formatting: the status is only rendered when debug logging
        # is enabled; the resulting text is the same as before.
        self._logger.debug(
            "Received status from %s: %s", sender_id_str, msg.status
        )
        return None
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageResStatus) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageResStatus
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a status response by logging it at debug level.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The status response message.
    :return: None
    """
    # The identity is only used for logging, so bytes that are not valid
    # UTF-8 are replaced rather than allowed to raise UnicodeDecodeError.
    sender_id_str = sender_identity.decode("utf-8", errors="replace")
    # Lazy %-formatting: the status is only rendered when debug logging
    # is enabled; the resulting text is the same as before.
    self._logger.debug(
        "Received status from %s: %s", sender_id_str, msg.status
    )
    return None