Module enrgdaq.supervisor_message_handler

Supervisor Message Handler - Handles incoming messages from DAQJobs via ZMQ subscription.

Classes

class SupervisorMessageHandler (xpub_url: str,
supervisor_id: str,
on_stats_receive: Callable[[DAQJobMessageCombinedStats], None] | None = None,
on_remote_stats_receive: Callable[[DAQJobMessageCombinedRemoteStats], None] | None = None)
Expand source code
class SupervisorMessageHandler:
    """
    Handles incoming messages from DAQJobs via ZMQ subscription.

    This class abstracts the ZMQ subscription logic, providing callbacks
    for different message types (stats reports, remote stats, etc.).
    """

    def __init__(
        self,
        xpub_url: str,
        supervisor_id: str,
        on_stats_receive: Callable[[DAQJobMessageCombinedStats], None] | None = None,
        on_remote_stats_receive: Callable[[DAQJobMessageCombinedRemoteStats], None]
        | None = None,
    ):
        """
        Initialize the message handler.

        Args:
            xpub_url: URL of the ZMQ XPUB socket to connect to.
            supervisor_id: ID of the supervisor (used for topic subscription).
        """
        self._xpub_url = xpub_url
        self._supervisor_id = supervisor_id
        self._on_stats_receive = on_stats_receive
        self._on_remote_stats_receive = on_remote_stats_receive

        self._logger = logging.getLogger(f"SupervisorMessageHandler({supervisor_id})")
        self._zmq_context: zmq.Context | None = None
        self._zmq_sub: zmq.Socket | None = None
        self._subscriber_thread: threading.Thread | None = None
        self._is_stopped = False

    def start(self):
        """Start the subscriber thread to receive messages."""
        self._is_stopped = False
        self._zmq_context = zmq.Context()
        self._zmq_sub = self._zmq_context.socket(zmq.SUB)
        assert self._zmq_sub is not None
        self._zmq_sub.connect(self._xpub_url)

        topics_to_subscribe = [Topic.stats_combined(self._supervisor_id)]
        for topic in topics_to_subscribe:
            self._zmq_sub.subscribe(topic)

        self._logger.info(f"Subscribed to topics: {', '.join(topics_to_subscribe)}")

        self._subscriber_thread = threading.Thread(
            target=self._subscriber_loop, daemon=True
        )
        self._subscriber_thread.start()
        self._logger.info("Message handler started")

    def stop(self):
        """Stop the subscriber thread and clean up ZMQ resources."""
        if self._is_stopped:
            return

        self._is_stopped = True

        # Close the socket first to unblock recv_multipart()
        if self._zmq_sub:
            self._zmq_sub.setsockopt(zmq.LINGER, 0)
            self._zmq_sub.close()
            self._zmq_sub = None

        if self._zmq_context:
            self._zmq_context.term()
            self._zmq_context = None

        self._logger.info("Message handler stopped")

    def _subscriber_loop(self):
        """Receive and dispatch messages from DAQJobs."""

        while not self._is_stopped:
            try:
                assert self._zmq_sub is not None
                parts = self._zmq_sub.recv_multipart()

                if len(parts) < 2:
                    continue

                # parts[0] = topic, parts[1] = header, parts[2:] = buffers
                header = parts[1]
                buffers = parts[2:] if len(parts) > 2 else []
                message = pickle.loads(header, buffers=buffers)

                # Dispatch based on message type
                if isinstance(message, DAQJobMessageCombinedStats):
                    if self._on_stats_receive:
                        self._on_stats_receive(message)
                elif isinstance(message, DAQJobMessageCombinedRemoteStats):
                    if self._on_remote_stats_receive:
                        self._on_remote_stats_receive(message)

            except zmq.ContextTerminated:
                break
            except Exception as e:
                if not self._is_stopped:
                    self._logger.error(f"Error in subscriber loop: {e}", exc_info=True)

Handles incoming messages from DAQJobs via ZMQ subscription.

This class abstracts the ZMQ subscription logic, providing callbacks for different message types (stats reports, remote stats, etc.).

Initialize the message handler.

Args

xpub_url
URL of the ZMQ XPUB socket to connect to.
supervisor_id
ID of the supervisor (used for topic subscription).

Methods

def start(self)
Expand source code
def start(self):
    """Start the subscriber thread to receive messages."""
    self._is_stopped = False
    self._zmq_context = zmq.Context()
    self._zmq_sub = self._zmq_context.socket(zmq.SUB)
    assert self._zmq_sub is not None
    self._zmq_sub.connect(self._xpub_url)

    topics_to_subscribe = [Topic.stats_combined(self._supervisor_id)]
    for topic in topics_to_subscribe:
        self._zmq_sub.subscribe(topic)

    self._logger.info(f"Subscribed to topics: {', '.join(topics_to_subscribe)}")

    self._subscriber_thread = threading.Thread(
        target=self._subscriber_loop, daemon=True
    )
    self._subscriber_thread.start()
    self._logger.info("Message handler started")

Start the subscriber thread to receive messages.

def stop(self)
Expand source code
def stop(self):
    """Stop the subscriber thread and clean up ZMQ resources."""
    if self._is_stopped:
        return

    self._is_stopped = True

    # Close the socket first to unblock recv_multipart()
    if self._zmq_sub:
        self._zmq_sub.setsockopt(zmq.LINGER, 0)
        self._zmq_sub.close()
        self._zmq_sub = None

    if self._zmq_context:
        self._zmq_context.term()
        self._zmq_context = None

    self._logger.info("Message handler stopped")

Stop the subscriber thread and clean up ZMQ resources.