Module enrgdaq.cnc.handlers

Sub-modules

enrgdaq.cnc.handlers.base
enrgdaq.cnc.handlers.heartbeat
enrgdaq.cnc.handlers.req_list_clients
enrgdaq.cnc.handlers.req_log
enrgdaq.cnc.handlers.req_ping
enrgdaq.cnc.handlers.req_restart_daq
enrgdaq.cnc.handlers.req_restart_daqjobs
enrgdaq.cnc.handlers.req_run_custom_daqjob
enrgdaq.cnc.handlers.req_send_message
enrgdaq.cnc.handlers.req_status
enrgdaq.cnc.handlers.req_stop_daqjob
enrgdaq.cnc.handlers.res_ping
enrgdaq.cnc.handlers.res_status

Classes

class CNCMessageHandler (cnc: SupervisorCNC)
Expand source code
class CNCMessageHandler(ABC):
    """
    Common interface implemented by every C&C message handler.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Keep a reference to the owning C&C instance and reuse its logger.
        :param cnc: The SupervisorCNC instance this handler belongs to.
        """
        self.cnc = cnc
        # Handlers log through the C&C instance's logger rather than their own.
        self._logger = cnc._logger

    @abstractmethod
    def handle(
        self, sender_identity: bytes, msg: CNCMessage
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Process one incoming C&C message.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The C&C message to process.
        :return: None when there is nothing to send back, otherwise a tuple of
                 the response message and a boolean indicating if it's a
                 forward reply.
        """
        return None

Abstract base class for C&C message handlers.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

  • abc.ABC

Subclasses

Methods

def handle(self, sender_identity: bytes, msg: CNCMessage) ‑> Tuple[CNCMessage, bool] | None
Expand source code
@abstractmethod
def handle(
    self, sender_identity: bytes, msg: CNCMessage
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Process one incoming C&C message.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The C&C message to process.
    :return: None when there is nothing to send back, otherwise a tuple of
             the response message and a boolean indicating if it's a
             forward reply.
    """
    return None

Handles an incoming C&C message. :param sender_identity: The ZMQ identity of the message sender. :param msg: The C&C message to handle. :return: An optional tuple containing the response message and a boolean indicating if it's a forward reply.

class HeartbeatHandler (cnc: SupervisorCNC)
Expand source code
class HeartbeatHandler(CNCMessageHandler):
    """
    Records the sender of each CNCMessageHeartbeat in the C&C client table.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Set up the handler.
        :param cnc: The SupervisorCNC instance whose client table is updated.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageHeartbeat
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Register (or refresh) the sending client from its heartbeat.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The heartbeat message carrying the sender's supervisor info.
        :return: None; heartbeats are not answered.
        """
        client_id = sender_identity.decode("utf-8")
        self._logger.debug(f"Received heartbeat from {client_id}")

        # Any previous entry for this client is replaced wholesale.
        seen_at = datetime.now().isoformat()
        client_entry = CNCClientInfo(
            identity=sender_identity,
            last_seen=seen_at,
            info=msg.supervisor_info,
        )
        self.cnc.clients[client_id] = client_entry
        return None

Handler for CNCMessageHeartbeat messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageHeartbeat) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageHeartbeat
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Register (or refresh) the sending client from its heartbeat.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The heartbeat message carrying the sender's supervisor info.
    :return: None; heartbeats are not answered.
    """
    client_id = sender_identity.decode("utf-8")
    self._logger.debug(f"Received heartbeat from {client_id}")

    # Any previous entry for this client is replaced wholesale.
    seen_at = datetime.now().isoformat()
    client_entry = CNCClientInfo(
        identity=sender_identity,
        last_seen=seen_at,
        info=msg.supervisor_info,
    )
    self.cnc.clients[client_id] = client_entry
    return None

Handles a heartbeat message. :param sender_identity: The ZMQ identity of the message sender. :param msg: The heartbeat message. :return: None

class ReqListClientsHandler (cnc: SupervisorCNC)
Expand source code
class ReqListClientsHandler(CNCMessageHandler):
    """
    Answers CNCMessageReqListClients messages with the known clients.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Set up the handler.
        :param cnc: The SupervisorCNC instance whose client table is reported.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqListClients
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Build a response listing every client in the C&C client table.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The list clients request message (its content is not read).
        :return: A tuple of the list clients response and False.
        """
        requester = sender_identity.decode("utf-8")
        self._logger.debug(f"Received list clients request from {requester}")

        # Only the `info` part of each client entry is exposed in the response.
        info_by_client = {}
        for client_id, client_entry in self.cnc.clients.items():
            info_by_client[client_id] = client_entry.info

        response = CNCMessageResListClients(clients=info_by_client)
        return response, False

Handler for CNCMessageReqListClients messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqListClients) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqListClients
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Build a response listing every client in the C&C client table.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The list clients request message (its content is not read).
    :return: A tuple of the list clients response and False.
    """
    requester = sender_identity.decode("utf-8")
    self._logger.debug(f"Received list clients request from {requester}")

    # Only the `info` part of each client entry is exposed in the response.
    info_by_client = {}
    for client_id, client_entry in self.cnc.clients.items():
        info_by_client[client_id] = client_entry.info

    response = CNCMessageResListClients(clients=info_by_client)
    return response, False

Handles a list clients request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The list clients request message. :return: A list clients response message.

class ReqLogHandler (cnc: SupervisorCNC)
Expand source code
class ReqLogHandler(CNCMessageHandler):
    """
    Passes CNCMessageLog messages on to the C&C instance's client log.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Set up the handler.
        :param cnc: The SupervisorCNC instance that stores client logs.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageLog
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Hand a client's log message to the C&C instance.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The log message.
        :return: None; log messages are not answered.
        """
        record_client_log = self.cnc.add_client_log
        # The log is filed under the sender's identity decoded as UTF-8.
        record_client_log(sender_identity.decode("utf-8"), msg)
        return None

Handler for CNCMessageLog messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageLog) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageLog
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Hand a client's log message to the C&C instance.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The log message.
    :return: None; log messages are not answered.
    """
    record_client_log = self.cnc.add_client_log
    # The log is filed under the sender's identity decoded as UTF-8.
    record_client_log(sender_identity.decode("utf-8"), msg)
    return None

Handles a log message from a client. :param sender_identity: The ZMQ identity of the message sender. :param msg: The log message. :return: None

class ReqPingHandler (cnc: SupervisorCNC)
Expand source code
class ReqPingHandler(CNCMessageHandler):
    """
    Answers CNCMessageReqPing messages with a pong.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Set up the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqPing
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Reply to a ping; neither argument is inspected.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The ping request message.
        :return: A tuple of the pong response and True.
        """
        self._logger.debug("Received ping, sending pong.")
        pong = CNCMessageResPing()
        return pong, True

Handler for CNCMessageReqPing messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqPing) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqPing
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Reply to a ping; neither argument is inspected.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The ping request message.
    :return: A tuple of the pong response and True.
    """
    self._logger.debug("Received ping, sending pong.")
    pong = CNCMessageResPing()
    return pong, True

Handles a ping request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The ping request message. :return: A pong response message.

class ReqRestartHandler (cnc: SupervisorCNC)
Expand source code
class ReqRestartHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqRestartDAQ messages.
    Optionally updates the installation, then schedules a supervisor stop.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a restart request, optionally preceded by an update.

        When ``msg.update`` is set, ``git pull`` and then ``uv sync`` are run
        first. If everything succeeds, the supervisor is stopped after
        ``CNC_REQ_UPDATE_AND_RESTART_SECONDS`` seconds. If anything fails, no
        stop is scheduled and the error is reported in the response.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The restart request message.
        :return: A tuple of the restart response (success flag and message)
                 and True.
        """
        self._logger.info("Received update and restart request.")

        try:
            if msg.update:
                # Run git pull
                git_result = subprocess.run(
                    ["git", "pull"], check=True, capture_output=True, text=True
                )
                self._logger.info(f"Git pull output: {git_result.stdout}")

                # Run uv sync
                sync_result = subprocess.run(
                    ["uv", "sync"], check=True, capture_output=True, text=True
                )
                self._logger.info(f"uv sync output: {sync_result.stdout}")

                message = f"Update completed successfully. Will terminate after {CNC_REQ_UPDATE_AND_RESTART_SECONDS} seconds."
            else:
                message = "Restart requested via CNC"
            success = True
            self._logger.info(message)

            # Stop from a timer thread so the response below is returned
            # before the supervisor goes down.
            threading.Timer(CNC_REQ_UPDATE_AND_RESTART_SECONDS, self._exit).start()
        except subprocess.CalledProcessError as e:
            success = False
            message = f"Error during update: {str(e)}"
            # str(e) only names the command and exit status; the captured
            # stderr is what explains why the command failed.
            stderr_text = (e.stderr or "").strip()
            if stderr_text:
                message += f" (stderr: {stderr_text})"
            self._logger.error(message)
        except Exception as e:
            success = False
            message = f"Unexpected error during update: {str(e)}"
            self._logger.error(message)

        return CNCMessageResRestartDAQ(success=success, message=message), True

    def _exit(self):
        """
        Stop the supervisor; invoked by the timer scheduled in ``handle``.
        """
        self.cnc.supervisor.stop()

Handler for CNCMessageReqRestartDAQ messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqRestartDAQ
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a restart request, optionally preceded by an update.

    When ``msg.update`` is set, ``git pull`` and then ``uv sync`` are run
    first. If everything succeeds, the supervisor is stopped after
    ``CNC_REQ_UPDATE_AND_RESTART_SECONDS`` seconds. If anything fails, no
    stop is scheduled and the error is reported in the response.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The restart request message.
    :return: A tuple of the restart response (success flag and message)
             and True.
    """
    self._logger.info("Received update and restart request.")

    try:
        if msg.update:
            # Run git pull
            git_result = subprocess.run(
                ["git", "pull"], check=True, capture_output=True, text=True
            )
            self._logger.info(f"Git pull output: {git_result.stdout}")

            # Run uv sync
            sync_result = subprocess.run(
                ["uv", "sync"], check=True, capture_output=True, text=True
            )
            self._logger.info(f"uv sync output: {sync_result.stdout}")

            message = f"Update completed successfully. Will terminate after {CNC_REQ_UPDATE_AND_RESTART_SECONDS} seconds."
        else:
            message = "Restart requested via CNC"
        success = True
        self._logger.info(message)

        # Stop from a timer thread so the response below is returned
        # before the supervisor goes down.
        threading.Timer(CNC_REQ_UPDATE_AND_RESTART_SECONDS, self._exit).start()
    except subprocess.CalledProcessError as e:
        success = False
        message = f"Error during update: {str(e)}"
        # str(e) only names the command and exit status; the captured
        # stderr is what explains why the command failed.
        stderr_text = (e.stderr or "").strip()
        if stderr_text:
            message += f" (stderr: {stderr_text})"
        self._logger.error(message)
    except Exception as e:
        success = False
        message = f"Unexpected error during update: {str(e)}"
        self._logger.error(message)

    return CNCMessageResRestartDAQ(success=success, message=message), True

Handles a restart request; when the request's update flag is set, git pull and uv sync are run first. :param sender_identity: The ZMQ identity of the message sender. :param msg: The restart request message. :return: A restart response message.

class ReqRunCustomDAQJobHandler (cnc: SupervisorCNC)
Expand source code
class ReqRunCustomDAQJobHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqRunCustomDAQJob messages.
    Builds a DAQ job from the config text in the request and starts it.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a run custom DAQJob request.

        The config text is written to a temporary ``.toml`` file and read back
        as bytes, and the job is built from those bytes and handed to the
        supervisor. The temporary file is removed on every path once it has
        been created. Errors are reported in the response, not raised.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The run custom DAQJob request message.
        :return: A tuple of the run custom DAQJob response (success flag and
                 message) and True.
        """
        self._logger.info("Received run custom DAQJob request.")

        try:
            import os

            from enrgdaq.daq.daq_job import build_daq_job
            from enrgdaq.models import SupervisorInfo

            # Create a basic supervisor info for the custom job
            supervisor_info = SupervisorInfo(supervisor_id="remote_custom")

            # delete=False leaves removal to us, so the path is recorded
            # before anything that can fail and removed in the finally below.
            temp_config = tempfile.NamedTemporaryFile(
                mode="w", suffix=".toml", delete=False
            )
            config_path = temp_config.name
            try:
                # Write the config to the temporary file
                with temp_config:
                    temp_config.write(msg.config)

                # Read the config from the temporary file as bytes
                with open(config_path, "rb") as f:
                    config_data = f.read()

                try:
                    # Build the DAQ job from the config
                    daq_job_process = build_daq_job(config_data, supervisor_info)
                    daq_job_process.restart_on_crash = msg.restart_on_crash

                    self._logger.info(
                        f"Starting DAQ job: {daq_job_process.daq_job_cls.__name__} (restart_on_crash={msg.restart_on_crash})"
                    )

                    # Start the DAQ job
                    self.cnc.supervisor.start_daq_job_processes([daq_job_process])

                    success = True
                    message = f"DAQ job started with PID: {daq_job_process.process.pid if daq_job_process.process else 'Unknown'}"
                    self._logger.info(message)

                except Exception as e:
                    success = False
                    message = f"Error starting DAQ job: {str(e)}"
                    self._logger.error(message)
            finally:
                # A cleanup failure must not turn a started job into a
                # reported failure, so it is only logged.
                try:
                    os.unlink(config_path)
                except OSError as e:
                    self._logger.warning(
                        f"Could not remove temporary config file {config_path}: {e}"
                    )

        except Exception as e:
            success = False
            message = f"Error processing run custom DAQJob request: {str(e)}"
            self._logger.error(message)

        return CNCMessageResRunCustomDAQJob(success=success, message=message), True

Handler for CNCMessageReqRunCustomDAQJob messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqRunCustomDAQJob
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a run custom DAQJob request.

    The config text is written to a temporary ``.toml`` file and read back
    as bytes, and the job is built from those bytes and handed to the
    supervisor. The temporary file is removed on every path once it has
    been created. Errors are reported in the response, not raised.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The run custom DAQJob request message.
    :return: A tuple of the run custom DAQJob response (success flag and
             message) and True.
    """
    self._logger.info("Received run custom DAQJob request.")

    try:
        import os

        from enrgdaq.daq.daq_job import build_daq_job
        from enrgdaq.models import SupervisorInfo

        # Create a basic supervisor info for the custom job
        supervisor_info = SupervisorInfo(supervisor_id="remote_custom")

        # delete=False leaves removal to us, so the path is recorded
        # before anything that can fail and removed in the finally below.
        temp_config = tempfile.NamedTemporaryFile(
            mode="w", suffix=".toml", delete=False
        )
        config_path = temp_config.name
        try:
            # Write the config to the temporary file
            with temp_config:
                temp_config.write(msg.config)

            # Read the config from the temporary file as bytes
            with open(config_path, "rb") as f:
                config_data = f.read()

            try:
                # Build the DAQ job from the config
                daq_job_process = build_daq_job(config_data, supervisor_info)
                daq_job_process.restart_on_crash = msg.restart_on_crash

                self._logger.info(
                    f"Starting DAQ job: {daq_job_process.daq_job_cls.__name__} (restart_on_crash={msg.restart_on_crash})"
                )

                # Start the DAQ job
                self.cnc.supervisor.start_daq_job_processes([daq_job_process])

                success = True
                message = f"DAQ job started with PID: {daq_job_process.process.pid if daq_job_process.process else 'Unknown'}"
                self._logger.info(message)

            except Exception as e:
                success = False
                message = f"Error starting DAQ job: {str(e)}"
                self._logger.error(message)
        finally:
            # A cleanup failure must not turn a started job into a
            # reported failure, so it is only logged.
            try:
                os.unlink(config_path)
            except OSError as e:
                self._logger.warning(
                    f"Could not remove temporary config file {config_path}: {e}"
                )

    except Exception as e:
        success = False
        message = f"Error processing run custom DAQJob request: {str(e)}"
        self._logger.error(message)

    return CNCMessageResRunCustomDAQJob(success=success, message=message), True

Handles a run custom DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The run custom DAQJob request message. :return: A run custom DAQJob response message.

class ReqSendMessageHandler (cnc: SupervisorCNC)
Expand source code
class ReqSendMessageHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqSendMessage messages.
    Sends a custom message to DAQ job(s).
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqSendMessage
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a send custom message request.

        The JSON payload is decoded into the message class named by
        ``msg.message_type`` and queued on every DAQ job whose class accepts
        it. If ``msg.target_daq_job_unique_id`` is set, only the job with that
        unique id is considered. Failures are reported in the response.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The send custom message request message.
        :return: A tuple of the send message response (success flag, message
                 and number of jobs notified) and True.
        """
        self._logger.info(
            f"Received send message request: type={msg.message_type}, "
            f"target={msg.target_daq_job_unique_id or 'all'}"
        )

        try:
            # Get all registered DAQJobMessage types
            message_types = self._get_message_types()

            # Find the requested message type
            if msg.message_type not in message_types:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"Unknown message type: {msg.message_type}. "
                    f"Available types: {list(message_types.keys())}",
                    jobs_notified=0,
                ), True

            message_cls = message_types[msg.message_type]

            # Decode the JSON payload
            try:
                message_instance = msgspec.json.decode(
                    msg.payload.encode(), type=message_cls
                )
            except Exception as e:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"Failed to decode payload as {msg.message_type}: {str(e)}",
                    jobs_notified=0,
                ), True

            supervisor = self.cnc.supervisor
            if not supervisor:
                return CNCMessageResSendMessage(
                    success=False,
                    message="Supervisor not available",
                    jobs_notified=0,
                ), True

            jobs_notified = 0

            # Send to specific job or all jobs
            for daq_job_process in supervisor.daq_job_processes:
                # If a target is specified, only send to that job. A job with
                # no info cannot be identified as the target, so it is skipped
                # too instead of receiving a message meant for another job.
                if msg.target_daq_job_unique_id:
                    job_info = daq_job_process.daq_job_info
                    if (
                        not job_info
                        or job_info.unique_id != msg.target_daq_job_unique_id
                    ):
                        continue

                # Check if the job accepts this message type
                daq_job_cls = daq_job_process.daq_job_cls
                accepts_message = any(
                    isinstance(message_instance, msg_type)
                    for msg_type in daq_job_cls.allowed_message_in_types
                )

                if not accepts_message:
                    continue

                # Send the message; a failure on one job is logged and does
                # not stop delivery to the remaining jobs.
                try:
                    daq_job_process.message_in.put_nowait(message_instance)
                    jobs_notified += 1
                    self._logger.debug(
                        f"Sent {msg.message_type} to {daq_job_cls.__name__}"
                    )
                except Exception as e:
                    self._logger.warning(
                        f"Failed to send message to {daq_job_cls.__name__}: {e}"
                    )

            if jobs_notified == 0:
                return CNCMessageResSendMessage(
                    success=False,
                    message=f"No jobs accepted message type {msg.message_type}"
                    + (
                        f" (target: {msg.target_daq_job_unique_id})"
                        if msg.target_daq_job_unique_id
                        else ""
                    ),
                    jobs_notified=0,
                ), True

            return CNCMessageResSendMessage(
                success=True,
                message=f"Message sent to {jobs_notified} job(s)",
                jobs_notified=jobs_notified,
            ), True

        except Exception as e:
            self._logger.error(f"Error sending custom message: {e}", exc_info=True)
            return CNCMessageResSendMessage(
                success=False,
                message=f"Error sending message: {str(e)}",
                jobs_notified=0,
            ), True

    def _get_message_types(self) -> dict[str, type]:
        """
        Gets all registered DAQJobMessage types.
        Returns a dict mapping type name to type class.
        """
        # Imported on each call rather than at module level.
        from enrgdaq.daq.jobs.caen.hv import DAQJobMessageCAENHVSetChParam
        from enrgdaq.daq.jobs.handle_stats import DAQJobMessageStats
        from enrgdaq.daq.jobs.remote import DAQJobMessageStatsRemote
        from enrgdaq.daq.models import (
            DAQJobMessage,
            DAQJobMessageHeartbeat,
            DAQJobMessageStop,
        )
        from enrgdaq.daq.store.models import (
            DAQJobMessageStore,
            DAQJobMessageStoreRaw,
            DAQJobMessageStoreTabular,
        )

        # Build a dictionary of all known message types; the keys are the
        # names a request may use in its message_type field.
        types: dict[str, type] = {
            "DAQJobMessage": DAQJobMessage,
            "DAQJobMessageHeartbeat": DAQJobMessageHeartbeat,
            "DAQJobMessageStop": DAQJobMessageStop,
            "DAQJobMessageStore": DAQJobMessageStore,
            "DAQJobMessageStoreRaw": DAQJobMessageStoreRaw,
            "DAQJobMessageStoreTabular": DAQJobMessageStoreTabular,
            "DAQJobMessageStats": DAQJobMessageStats,
            "DAQJobMessageStatsRemote": DAQJobMessageStatsRemote,
            "DAQJobMessageCAENHVSetChParam": DAQJobMessageCAENHVSetChParam,
        }

        return types

Handler for CNCMessageReqSendMessage messages. Sends a custom message to DAQ job(s).

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqSendMessage) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqSendMessage
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a send custom message request.

    The JSON payload is decoded into the message class named by
    ``msg.message_type`` and queued on every DAQ job whose class accepts
    it. If ``msg.target_daq_job_unique_id`` is set, only the job with that
    unique id is considered. Failures are reported in the response.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The send custom message request message.
    :return: A tuple of the send message response (success flag, message
             and number of jobs notified) and True.
    """
    self._logger.info(
        f"Received send message request: type={msg.message_type}, "
        f"target={msg.target_daq_job_unique_id or 'all'}"
    )

    try:
        # Get all registered DAQJobMessage types
        message_types = self._get_message_types()

        # Find the requested message type
        if msg.message_type not in message_types:
            return CNCMessageResSendMessage(
                success=False,
                message=f"Unknown message type: {msg.message_type}. "
                f"Available types: {list(message_types.keys())}",
                jobs_notified=0,
            ), True

        message_cls = message_types[msg.message_type]

        # Decode the JSON payload
        try:
            message_instance = msgspec.json.decode(
                msg.payload.encode(), type=message_cls
            )
        except Exception as e:
            return CNCMessageResSendMessage(
                success=False,
                message=f"Failed to decode payload as {msg.message_type}: {str(e)}",
                jobs_notified=0,
            ), True

        supervisor = self.cnc.supervisor
        if not supervisor:
            return CNCMessageResSendMessage(
                success=False,
                message="Supervisor not available",
                jobs_notified=0,
            ), True

        jobs_notified = 0

        # Send to specific job or all jobs
        for daq_job_process in supervisor.daq_job_processes:
            # If a target is specified, only send to that job. A job with
            # no info cannot be identified as the target, so it is skipped
            # too instead of receiving a message meant for another job.
            if msg.target_daq_job_unique_id:
                job_info = daq_job_process.daq_job_info
                if (
                    not job_info
                    or job_info.unique_id != msg.target_daq_job_unique_id
                ):
                    continue

            # Check if the job accepts this message type
            daq_job_cls = daq_job_process.daq_job_cls
            accepts_message = any(
                isinstance(message_instance, msg_type)
                for msg_type in daq_job_cls.allowed_message_in_types
            )

            if not accepts_message:
                continue

            # Send the message; a failure on one job is logged and does
            # not stop delivery to the remaining jobs.
            try:
                daq_job_process.message_in.put_nowait(message_instance)
                jobs_notified += 1
                self._logger.debug(
                    f"Sent {msg.message_type} to {daq_job_cls.__name__}"
                )
            except Exception as e:
                self._logger.warning(
                    f"Failed to send message to {daq_job_cls.__name__}: {e}"
                )

        if jobs_notified == 0:
            return CNCMessageResSendMessage(
                success=False,
                message=f"No jobs accepted message type {msg.message_type}"
                + (
                    f" (target: {msg.target_daq_job_unique_id})"
                    if msg.target_daq_job_unique_id
                    else ""
                ),
                jobs_notified=0,
            ), True

        return CNCMessageResSendMessage(
            success=True,
            message=f"Message sent to {jobs_notified} job(s)",
            jobs_notified=jobs_notified,
        ), True

    except Exception as e:
        self._logger.error(f"Error sending custom message: {e}", exc_info=True)
        return CNCMessageResSendMessage(
            success=False,
            message=f"Error sending message: {str(e)}",
            jobs_notified=0,
        ), True

Handles a send custom message request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The send custom message request message. :return: A send custom message response message.

class ReqStatusHandler (cnc: SupervisorCNC)
Expand source code
class ReqStatusHandler(CNCMessageHandler):
    """
    Answers CNCMessageReqStatus messages with the supervisor's status.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Set up the handler.
        :param cnc: The SupervisorCNC instance whose supervisor is queried.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStatus
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Report the supervisor's current status; neither argument is inspected.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The status request message.
        :return: A tuple of the status response and True.
        """
        supervisor_status = self.cnc.supervisor.get_status()
        reply = CNCMessageResStatus(status=supervisor_status)
        return reply, True

Handler for CNCMessageReqStatus messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqStatus) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStatus
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Report the supervisor's current status; neither argument is inspected.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The status request message.
    :return: A tuple of the status response and True.
    """
    supervisor_status = self.cnc.supervisor.get_status()
    reply = CNCMessageResStatus(status=supervisor_status)
    return reply, True

Handles a status request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The status request message. :return: A status response message.

class ReqStopDAQJobHandler (cnc: SupervisorCNC)
Expand source code
class ReqStopDAQJobHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStopDAQJob messages.

    Stops a single DAQ job process, selected either by its unique id or by the
    name of its DAQJob class, and unlinks it from the supervisor.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a stop and remove DAQJob request.

        The first process whose ``daq_job_info.unique_id`` equals
        ``msg.daq_job_unique_id``, or whose DAQJob class name equals
        ``msg.daq_job_name``, receives a DAQJobMessageStop and is removed from
        ``supervisor.daq_job_processes``. When ``msg.remove`` is set, its entry
        in ``supervisor.daq_job_stats`` is dropped as well.

        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The stop and remove DAQJob request message.
        :return: A tuple of the CNCMessageResStopDAQJob response and ``True``.
        :raises Exception: If neither daq_job_name nor daq_job_unique_id is set.
        """
        if not msg.daq_job_name and not msg.daq_job_unique_id:
            raise Exception(
                "Either daq_job_name or daq_job_unique_id must be specified"
            )

        # Report the job under whichever identifier the request actually
        # carried; a name-only request used to be reported as job 'None'.
        # Messages for requests that carry a unique id are unchanged.
        if msg.daq_job_unique_id:
            job_label = msg.daq_job_unique_id
            not_found_message = f"DAQ job with unique id '{job_label}' not found"
        else:
            job_label = msg.daq_job_name
            not_found_message = f"DAQ job with name '{job_label}' not found"

        self._logger.info(
            f"Received stop and remove DAQJob request for job: {job_label}"
        )

        try:
            supervisor = self.cnc.supervisor
            if supervisor:
                # Find the first DAQ job process matching the request
                target_process = None
                for daq_job_process in supervisor.daq_job_processes:
                    # Match on unique id when one was requested
                    if (
                        msg.daq_job_unique_id
                        and daq_job_process.daq_job_info
                        and daq_job_process.daq_job_info.unique_id
                        == msg.daq_job_unique_id
                    ):
                        target_process = daq_job_process
                        break
                    # Otherwise match on the DAQJob class name
                    elif (
                        msg.daq_job_name
                        and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                    ):
                        target_process = daq_job_process
                        break

                if target_process:
                    try:
                        # Ask the DAQ job process to stop
                        target_process.message_in.put(
                            DAQJobMessageStop(
                                reason="Stop and remove requested via CNC"
                            )
                        )

                        # Remove the process from the supervisor's list
                        supervisor.daq_job_processes.remove(target_process)
                        if msg.remove:
                            # The stop is already queued and the process is
                            # already unlinked, so a missing stats entry must
                            # not turn the request into a reported failure.
                            # NOTE(review): assumes daq_job_stats is keyed by
                            # the DAQJob class name - confirm.
                            supervisor.daq_job_stats.pop(
                                target_process.daq_job_cls.__name__, None
                            )

                        success = True
                        message = f"DAQJob {job_label} " + (
                            "removed and stopped" if msg.remove else "stopped"
                        )
                        self._logger.info(message)
                    except Exception as e:
                        success = False
                        message = (
                            f"Error during Stop DAQJob for '{job_label}': {str(e)}"
                        )
                        self._logger.error(message, exc_info=True)
                else:
                    success = False
                    message = not_found_message
                    self._logger.warning(message)
            else:
                success = False
                message = "Supervisor not available"
                self._logger.error(message)

        except Exception as e:
            success = False
            message = f"Error stopping and removing DAQJob '{job_label}': {str(e)}"
            self._logger.error(message, exc_info=True)

        return CNCMessageResStopDAQJob(success=success, message=message), True

Handler for CNCMessageReqStopDAQJob messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStopDAQJob
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a stop and remove DAQJob request.

    The first process whose ``daq_job_info.unique_id`` equals
    ``msg.daq_job_unique_id``, or whose DAQJob class name equals
    ``msg.daq_job_name``, receives a DAQJobMessageStop and is removed from
    ``supervisor.daq_job_processes``. When ``msg.remove`` is set, its entry
    in ``supervisor.daq_job_stats`` is dropped as well.

    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The stop and remove DAQJob request message.
    :return: A tuple of the CNCMessageResStopDAQJob response and ``True``.
    :raises Exception: If neither daq_job_name nor daq_job_unique_id is set.
    """
    if not msg.daq_job_name and not msg.daq_job_unique_id:
        raise Exception(
            "Either daq_job_name or daq_job_unique_id must be specified"
        )

    # Report the job under whichever identifier the request actually
    # carried; a name-only request used to be reported as job 'None'.
    # Messages for requests that carry a unique id are unchanged.
    if msg.daq_job_unique_id:
        job_label = msg.daq_job_unique_id
        not_found_message = f"DAQ job with unique id '{job_label}' not found"
    else:
        job_label = msg.daq_job_name
        not_found_message = f"DAQ job with name '{job_label}' not found"

    self._logger.info(
        f"Received stop and remove DAQJob request for job: {job_label}"
    )

    try:
        supervisor = self.cnc.supervisor
        if supervisor:
            # Find the first DAQ job process matching the request
            target_process = None
            for daq_job_process in supervisor.daq_job_processes:
                # Match on unique id when one was requested
                if (
                    msg.daq_job_unique_id
                    and daq_job_process.daq_job_info
                    and daq_job_process.daq_job_info.unique_id
                    == msg.daq_job_unique_id
                ):
                    target_process = daq_job_process
                    break
                # Otherwise match on the DAQJob class name
                elif (
                    msg.daq_job_name
                    and daq_job_process.daq_job_cls.__name__ == msg.daq_job_name
                ):
                    target_process = daq_job_process
                    break

            if target_process:
                try:
                    # Ask the DAQ job process to stop
                    target_process.message_in.put(
                        DAQJobMessageStop(
                            reason="Stop and remove requested via CNC"
                        )
                    )

                    # Remove the process from the supervisor's list
                    supervisor.daq_job_processes.remove(target_process)
                    if msg.remove:
                        # The stop is already queued and the process is
                        # already unlinked, so a missing stats entry must
                        # not turn the request into a reported failure.
                        # NOTE(review): assumes daq_job_stats is keyed by
                        # the DAQJob class name - confirm.
                        supervisor.daq_job_stats.pop(
                            target_process.daq_job_cls.__name__, None
                        )

                    success = True
                    message = f"DAQJob {job_label} " + (
                        "removed and stopped" if msg.remove else "stopped"
                    )
                    self._logger.info(message)
                except Exception as e:
                    success = False
                    message = (
                        f"Error during Stop DAQJob for '{job_label}': {str(e)}"
                    )
                    self._logger.error(message, exc_info=True)
            else:
                success = False
                message = not_found_message
                self._logger.warning(message)
        else:
            success = False
            message = "Supervisor not available"
            self._logger.error(message)

    except Exception as e:
        success = False
        message = f"Error stopping and removing DAQJob '{job_label}': {str(e)}"
        self._logger.error(message, exc_info=True)

    return CNCMessageResStopDAQJob(success=success, message=message), True

Handles a stop and remove DAQJob request. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop and remove DAQJob request message. :return: A stop and remove DAQJob response message.

class ReqStopDAQJobsHandler (cnc: SupervisorCNC)
Expand source code
class ReqStopDAQJobsHandler(CNCMessageHandler):
    """
    Handler for CNCMessageReqStopDAQJobs messages: sends a stop message to
    every DAQ job process known to the supervisor.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Create the handler.
        :param cnc: The SupervisorCNC instance this handler is bound to.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Queue a DAQJobMessageStop on every DAQ job process.

        NOTE(review): the log lines and the stop reason speak of a "restart";
        they are kept verbatim. Whether the supervisor starts the stopped jobs
        again is not visible from this handler.

        :param sender_identity: The ZMQ identity of the message sender (unused).
        :param msg: The request message (unused).
        :return: A tuple of the CNCMessageResStopDAQJobs response and ``True``.
        """
        self._logger.info("Received restart DAQJobs request.")

        try:
            supervisor = self.cnc.supervisor
            if not supervisor:
                success, message = False, "Supervisor not available"
                self._logger.error(message)
            else:
                stop_reason = "Restart requested via CNC"
                for process in supervisor.daq_job_processes:
                    # A failure on one job must not keep the others from
                    # receiving their stop message, so it is only logged.
                    try:
                        process.message_in.put(DAQJobMessageStop(reason=stop_reason))
                    except Exception as e:
                        self._logger.warning(
                            f"Error sending stop message to DAQ job: {e}"
                        )

                success, message = True, "Stop signals sent to all DAQ jobs."
                self._logger.info(message)
        except Exception as e:
            success, message = False, f"Error restarting DAQJobs: {str(e)}"
            self._logger.error(message)

        response = CNCMessageResStopDAQJobs(success=success, message=message)
        return response, True

Handler for CNCMessageReqStopDAQJobs messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageReqStopDAQJobs
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Queue a DAQJobMessageStop on every DAQ job process.

    NOTE(review): the log lines and the stop reason speak of a "restart";
    they are kept verbatim. Whether the supervisor starts the stopped jobs
    again is not visible from this handler.

    :param sender_identity: The ZMQ identity of the message sender (unused).
    :param msg: The request message (unused).
    :return: A tuple of the CNCMessageResStopDAQJobs response and ``True``.
    """
    self._logger.info("Received restart DAQJobs request.")

    try:
        supervisor = self.cnc.supervisor
        if not supervisor:
            success, message = False, "Supervisor not available"
            self._logger.error(message)
        else:
            stop_reason = "Restart requested via CNC"
            for process in supervisor.daq_job_processes:
                # A failure on one job must not keep the others from
                # receiving their stop message, so it is only logged.
                try:
                    process.message_in.put(DAQJobMessageStop(reason=stop_reason))
                except Exception as e:
                    self._logger.warning(
                        f"Error sending stop message to DAQ job: {e}"
                    )

            success, message = True, "Stop signals sent to all DAQ jobs."
            self._logger.info(message)
    except Exception as e:
        success, message = False, f"Error restarting DAQJobs: {str(e)}"
        self._logger.error(message)

    response = CNCMessageResStopDAQJobs(success=success, message=message)
    return response, True

Handles a stop DAQJobs request by sending a stop message to every DAQ job process. :param sender_identity: The ZMQ identity of the message sender. :param msg: The stop DAQJobs request message. :return: A stop DAQJobs response message.

class ResPingHandler (cnc: SupervisorCNC)
Expand source code
class ResPingHandler(CNCMessageHandler):
    """
    Handler for CNCMessageResPing messages.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageResPing
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a pong response by logging which peer it came from.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The pong response message (not inspected).
        :return: None, so no reply is produced.
        """
        # A ZMQ identity is raw bytes and need not be valid UTF-8. Decode
        # leniently so that merely logging a pong can never raise; invalid
        # bytes show up as \xNN escapes, valid identities are unchanged.
        sender_id_str = sender_identity.decode("utf-8", errors="backslashreplace")
        self._logger.info(f"Received pong from {sender_id_str}")
        return None

Handler for CNCMessageResPing messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageResPing) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageResPing
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a pong response by logging which peer it came from.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The pong response message (not inspected).
    :return: None, so no reply is produced.
    """
    # A ZMQ identity is raw bytes and need not be valid UTF-8. Decode
    # leniently so that merely logging a pong can never raise; invalid
    # bytes show up as \xNN escapes, valid identities are unchanged.
    sender_id_str = sender_identity.decode("utf-8", errors="backslashreplace")
    self._logger.info(f"Received pong from {sender_id_str}")
    return None

Handles a pong response. :param sender_identity: The ZMQ identity of the message sender. :param msg: The pong response message. :return: None

class ResStatusHandler (cnc: SupervisorCNC)
Expand source code
class ResStatusHandler(CNCMessageHandler):
    """
    Handler for CNCMessageResStatus messages.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Initialize the handler.
        :param cnc: The SupervisorCNC instance.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageResStatus
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Handles a status response by logging it at debug level.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The status response message; its ``status`` is logged.
        :return: None, so no reply is produced.
        """
        # A ZMQ identity is raw bytes and need not be valid UTF-8. Decode
        # leniently so that merely logging a status can never raise; invalid
        # bytes show up as \xNN escapes, valid identities are unchanged.
        sender_id_str = sender_identity.decode("utf-8", errors="backslashreplace")
        self._logger.debug(f"Received status from {sender_id_str}: {msg.status}")
        return None

Handler for CNCMessageResStatus messages.

Initialize the handler. :param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageResStatus) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageResStatus
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Handles a status response by logging it at debug level.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The status response message; its ``status`` is logged.
    :return: None, so no reply is produced.
    """
    # A ZMQ identity is raw bytes and need not be valid UTF-8. Decode
    # leniently so that merely logging a status can never raise; invalid
    # bytes show up as \xNN escapes, valid identities are unchanged.
    sender_id_str = sender_identity.decode("utf-8", errors="backslashreplace")
    self._logger.debug(f"Received status from {sender_id_str}: {msg.status}")
    return None

Handles a status response. :param sender_identity: The ZMQ identity of the message sender. :param msg: The status response message. :return: None