Module enrgdaq.cnc.handlers.req_restart_daqjobs
Classes
class ReqStopDAQJobsHandler (cnc: SupervisorCNC)-
Expand source code
class ReqStopDAQJobsHandler(CNCMessageHandler): """ Handler for CNCMessageReqStopDAQJobs messages. """ def __init__(self, cnc: SupervisorCNC): """ Initialize the handler. :param cnc: The SupervisorCNC instance. """ super().__init__(cnc) def handle( self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs ) -> Optional[Tuple[CNCMessage, bool]]: """ Handles a restart DAQJobs request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The restart DAQJobs request message. :return: A restart DAQJobs response message. """ self._logger.info("Received restart DAQJobs request.") try: # Use the supervisor's built-in mechanism to restart DAQ jobs # by stopping all current processes which will trigger restart logic supervisor = self.cnc.supervisor if supervisor: # Send stop messages to all DAQ job processes to trigger restart for daq_job_process in supervisor.daq_job_processes: try: # Send a stop message to each DAQ job process via its message_in queue daq_job_process.message_in.put( DAQJobMessageStop(reason="Restart requested via CNC") ) except Exception as e: self._logger.warning( f"Error sending stop message to DAQ job: {e}" ) success = True message = "Stop signals sent to all DAQ jobs." self._logger.info(message) else: success = False message = "Supervisor not available" self._logger.error(message) except Exception as e: success = False message = f"Error restarting DAQJobs: {str(e)}" self._logger.error(message) return CNCMessageResStopDAQJobs(success=success, message=message), TrueHandler for CNCMessageReqStopDAQJobs messages.
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle( self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs ) -> Optional[Tuple[CNCMessage, bool]]: """ Handles a restart DAQJobs request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The restart DAQJobs request message. :return: A restart DAQJobs response message. """ self._logger.info("Received restart DAQJobs request.") try: # Use the supervisor's built-in mechanism to restart DAQ jobs # by stopping all current processes which will trigger restart logic supervisor = self.cnc.supervisor if supervisor: # Send stop messages to all DAQ job processes to trigger restart for daq_job_process in supervisor.daq_job_processes: try: # Send a stop message to each DAQ job process via its message_in queue daq_job_process.message_in.put( DAQJobMessageStop(reason="Restart requested via CNC") ) except Exception as e: self._logger.warning( f"Error sending stop message to DAQ job: {e}" ) success = True message = "Stop signals sent to all DAQ jobs." self._logger.info(message) else: success = False message = "Supervisor not available" self._logger.error(message) except Exception as e: success = False message = f"Error restarting DAQJobs: {str(e)}" self._logger.error(message) return CNCMessageResStopDAQJobs(success=success, message=message), TrueHandles a restart DAQJobs request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The restart DAQJobs request message. :return: A restart DAQJobs response message.