Module enrgdaq.cnc.handlers.req_stop_daqjob
Classes
class ReqStopDAQJobHandler (cnc: SupervisorCNC)-
Expand source code
class ReqStopDAQJobHandler(CNCMessageHandler): """ Handler for CNCMessageReqStopDAQJob messages. """ def __init__(self, cnc: SupervisorCNC): """ Initialize the handler. :param cnc: The SupervisorCNC instance. """ super().__init__(cnc) def handle( self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob ) -> Optional[Tuple[CNCMessage, bool]]: """ Handles a stop and remove DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop and remove DAQJob request message. :return: A stop and remove DAQJob response message. """ if not msg.daq_job_name and not msg.daq_job_unique_id: raise Exception( "Either daq_job_name or daq_job_unique_id must be specified" ) self._logger.info( f"Received stop and remove DAQJob request for job: {msg.daq_job_unique_id}" ) try: supervisor = self.cnc.supervisor if supervisor: # Find the DAQ job process with the specified name target_process = None for daq_job_process in supervisor.daq_job_processes: # If daq_job_unique_id is specified if ( msg.daq_job_unique_id and daq_job_process.daq_job_info and daq_job_process.daq_job_info.unique_id == msg.daq_job_unique_id ): target_process = daq_job_process break # If daq_job_name is specified elif ( msg.daq_job_name and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name ): target_process = daq_job_process break if target_process: # Send a stop message to the DAQ job process try: target_process.message_in.put( DAQJobMessageStop( reason="Stop and remove requested via CNC" ) ) # Remove the process from the supervisor's list supervisor.daq_job_processes.remove(target_process) if msg.remove: supervisor.daq_job_stats.pop( target_process.daq_job_cls.__name__ ) success = True message = f"DAQJob {msg.daq_job_unique_id} " + ( "removed and stopped" if msg.remove else "stopped" ) self._logger.info(message) except Exception as e: success = False message = f"Error during Stop DAQJob for '{msg.daq_job_unique_id}': {str(e)}" self._logger.error(message, exc_info=True) else: success = False message = ( f"DAQ job with unique id '{msg.daq_job_unique_id}' not found" ) self._logger.warning(message) else: success = False message = "Supervisor not available" self._logger.error(message) except Exception as e: success = False message = f"Error stopping and removing DAQJob '{msg.daq_job_unique_id}': {str(e)}" self._logger.error(message, exc_info=True) return CNCMessageResStopDAQJob(success=success, message=message), TrueHandler for CNCMessageReqStopDAQJob messages.
Initialize the handler. :param cnc: The SupervisorCNC instance.
Ancestors
- CNCMessageHandler
- abc.ABC
Methods
def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob) ‑> Tuple[CNCMessage, bool] | None-
Expand source code
def handle( self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob ) -> Optional[Tuple[CNCMessage, bool]]: """ Handles a stop and remove DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop and remove DAQJob request message. :return: A stop and remove DAQJob response message. """ if not msg.daq_job_name and not msg.daq_job_unique_id: raise Exception( "Either daq_job_name or daq_job_unique_id must be specified" ) self._logger.info( f"Received stop and remove DAQJob request for job: {msg.daq_job_unique_id}" ) try: supervisor = self.cnc.supervisor if supervisor: # Find the DAQ job process with the specified name target_process = None for daq_job_process in supervisor.daq_job_processes: # If daq_job_unique_id is specified if ( msg.daq_job_unique_id and daq_job_process.daq_job_info and daq_job_process.daq_job_info.unique_id == msg.daq_job_unique_id ): target_process = daq_job_process break # If daq_job_name is specified elif ( msg.daq_job_name and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name ): target_process = daq_job_process break if target_process: # Send a stop message to the DAQ job process try: target_process.message_in.put( DAQJobMessageStop( reason="Stop and remove requested via CNC" ) ) # Remove the process from the supervisor's list supervisor.daq_job_processes.remove(target_process) if msg.remove: supervisor.daq_job_stats.pop( target_process.daq_job_cls.__name__ ) success = True message = f"DAQJob {msg.daq_job_unique_id} " + ( "removed and stopped" if msg.remove else "stopped" ) self._logger.info(message) except Exception as e: success = False message = f"Error during Stop DAQJob for '{msg.daq_job_unique_id}': {str(e)}" self._logger.error(message, exc_info=True) else: success = False message = ( f"DAQ job with unique id '{msg.daq_job_unique_id}' not found" ) self._logger.warning(message) else: success = False message = "Supervisor not available" self._logger.error(message) except Exception as e: success = False message = f"Error stopping and removing DAQJob '{msg.daq_job_unique_id}': {str(e)}" self._logger.error(message, exc_info=True) return CNCMessageResStopDAQJob(success=success, message=message), TrueHandles a stop and remove DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop and remove DAQJob request message. :return: A stop and remove DAQJob response message.