Module enrgdaq.cnc.handlers.req_stop_daqjob

Classes

class ReqStopDAQJobHandler (cnc: SupervisorCNC)
Expand source code
class ReqStopDAQJobHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStopDAQJob messages.

    Looks up a DAQ job process on the supervisor (by unique id or by DAQ job
    class name), puts a stop message on its input queue and removes it from
    the supervisor's process list.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a stop and remove DAQJob request.

        The first process in ``supervisor.daq_job_processes`` whose unique id
        equals ``msg.daq_job_unique_id`` or whose DAQ job class name equals
        ``msg.daq_job_name`` is selected. It is sent a ``DAQJobMessageStop``
        and removed from the supervisor's process list; when ``msg.remove`` is
        set, its entry in ``supervisor.daq_job_stats`` is dropped as well.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The stop and remove DAQJob request message.
        :return: A tuple of the stop and remove DAQJob response message
            (``success`` flag plus human-readable ``message``) and ``True``.
            NOTE(review): the meaning of the boolean is defined by
            CNCMessageHandler, which is not visible here.
        :raises ValueError: If neither ``daq_job_name`` nor
            ``daq_job_unique_id`` is set on the request.
        """
        if not msg.daq_job_name and not msg.daq_job_unique_id:
            raise ValueError(
                "Either daq_job_name or daq_job_unique_id must be specified"
            )

        # Describe the job by whichever identifier the request actually
        # carries, so name-based requests do not show up as "None" in logs
        # and responses.
        by_unique_id = bool(msg.daq_job_unique_id)
        job_ref = msg.daq_job_unique_id if by_unique_id else msg.daq_job_name
        ref_kind = "unique id" if by_unique_id else "name"

        self._logger.info(
            f"Received stop and remove DAQJob request for job: {job_ref}"
        )

        try:
            supervisor = self.cnc.supervisor
            if supervisor:
                # Find the first DAQ job process matching the unique id or,
                # failing that, the DAQ job class name.
                target_process = None
                for daq_job_process in supervisor.daq_job_processes:
                    # If daq_job_unique_id is specified
                    if (
                        msg.daq_job_unique_id
                        and daq_job_process.daq_job_info
                        and daq_job_process.daq_job_info.unique_id
                        == msg.daq_job_unique_id
                    ):
                        target_process = daq_job_process
                        break
                    # If daq_job_name is specified
                    elif (
                        msg.daq_job_name
                        and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                    ):
                        target_process = daq_job_process
                        break

                if target_process:
                    try:
                        # Send a stop message to the DAQ job process
                        target_process.message_in.put(
                            DAQJobMessageStop(
                                reason="Stop and remove requested via CNC"
                            )
                        )

                        # Remove the process from the supervisor's list
                        supervisor.daq_job_processes.remove(target_process)
                        if msg.remove:
                            # A missing stats entry must not turn an already
                            # completed stop into a reported failure.
                            supervisor.daq_job_stats.pop(
                                target_process.daq_job_cls.__name__, None
                            )

                        success = True
                        message = f"DAQJob {job_ref} " + (
                            "removed and stopped" if msg.remove else "stopped"
                        )
                        self._logger.info(message)
                    except Exception as e:
                        success = False
                        message = f"Error during Stop DAQJob for '{job_ref}': {e!s}"
                        self._logger.error(message, exc_info=True)
                else:
                    success = False
                    message = f"DAQ job with {ref_kind} '{job_ref}' not found"
                    self._logger.warning(message)
            else:
                success = False
                message = "Supervisor not available"
                self._logger.error(message)

        except Exception as e:
            # Covers failures while reaching the supervisor or scanning its
            # process list; failures of the stop itself are handled above.
            success = False
            message = f"Error stopping and removing DAQJob '{job_ref}': {e!s}"
            self._logger.error(message, exc_info=True)

        return CNCMessageResStopDAQJob(success=success, message=message), True

Handler for CNCMessageReqStopDAQJob messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a stop and remove DAQJob request.

    The first process in ``supervisor.daq_job_processes`` whose unique id
    equals ``msg.daq_job_unique_id`` or whose DAQ job class name equals
    ``msg.daq_job_name`` is selected. It is sent a ``DAQJobMessageStop``
    and removed from the supervisor's process list; when ``msg.remove`` is
    set, its entry in ``supervisor.daq_job_stats`` is dropped as well.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The stop and remove DAQJob request message.
    :return: A tuple of the stop and remove DAQJob response message
        (``success`` flag plus human-readable ``message``) and ``True``.
        NOTE(review): the meaning of the boolean is defined by
        CNCMessageHandler, which is not visible here.
    :raises ValueError: If neither ``daq_job_name`` nor
        ``daq_job_unique_id`` is set on the request.
    """
    if not msg.daq_job_name and not msg.daq_job_unique_id:
        raise ValueError(
            "Either daq_job_name or daq_job_unique_id must be specified"
        )

    # Describe the job by whichever identifier the request actually carries,
    # so name-based requests do not show up as "None" in logs and responses.
    by_unique_id = bool(msg.daq_job_unique_id)
    job_ref = msg.daq_job_unique_id if by_unique_id else msg.daq_job_name
    ref_kind = "unique id" if by_unique_id else "name"

    self._logger.info(
        f"Received stop and remove DAQJob request for job: {job_ref}"
    )

    try:
        supervisor = self.cnc.supervisor
        if supervisor:
            # Find the first DAQ job process matching the unique id or,
            # failing that, the DAQ job class name.
            target_process = None
            for daq_job_process in supervisor.daq_job_processes:
                # If daq_job_unique_id is specified
                if (
                    msg.daq_job_unique_id
                    and daq_job_process.daq_job_info
                    and daq_job_process.daq_job_info.unique_id
                    == msg.daq_job_unique_id
                ):
                    target_process = daq_job_process
                    break
                # If daq_job_name is specified
                elif (
                    msg.daq_job_name
                    and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                ):
                    target_process = daq_job_process
                    break

            if target_process:
                try:
                    # Send a stop message to the DAQ job process
                    target_process.message_in.put(
                        DAQJobMessageStop(
                            reason="Stop and remove requested via CNC"
                        )
                    )

                    # Remove the process from the supervisor's list
                    supervisor.daq_job_processes.remove(target_process)
                    if msg.remove:
                        # A missing stats entry must not turn an already
                        # completed stop into a reported failure.
                        supervisor.daq_job_stats.pop(
                            target_process.daq_job_cls.__name__, None
                        )

                    success = True
                    message = f"DAQJob {job_ref} " + (
                        "removed and stopped" if msg.remove else "stopped"
                    )
                    self._logger.info(message)
                except Exception as e:
                    success = False
                    message = f"Error during Stop DAQJob for '{job_ref}': {e!s}"
                    self._logger.error(message, exc_info=True)
            else:
                success = False
                message = f"DAQ job with {ref_kind} '{job_ref}' not found"
                self._logger.warning(message)
        else:
            success = False
            message = "Supervisor not available"
            self._logger.error(message)

    except Exception as e:
        # Covers failures while reaching the supervisor or scanning its
        # process list; failures of the stop itself are handled above.
        success = False
        message = f"Error stopping and removing DAQJob '{job_ref}': {e!s}"
        self._logger.error(message, exc_info=True)

    return CNCMessageResStopDAQJob(success=success, message=message), True

Handles a stop and remove DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop and remove DAQJob request message. :return: A stop and remove DAQJob response message.