Module enrgdaq.cnc.handlers.heartbeat

Classes

class HeartbeatHandler (cnc: SupervisorCNC)
Expand source code
class HeartbeatHandler(CNCMessageHandler):
    """
    Message handler that processes CNCMessageHeartbeat messages.

    Every heartbeat writes a fresh entry for the sender into ``cnc.clients``.
    """

    def __init__(self, cnc: SupervisorCNC):
        """
        Create the handler.
        :param cnc: The SupervisorCNC instance, passed on to the base handler.
        """
        super().__init__(cnc)

    def handle(
        self, sender_identity: bytes, msg: CNCMessageHeartbeat
    ) -> Optional[Tuple[CNCMessage, bool]]:
        """
        Record a heartbeat received from a client.

        A new CNCClientInfo is stored in ``self.cnc.clients`` under the
        UTF-8 decoded sender identity, overwriting any earlier entry for
        that key.
        :param sender_identity: The ZMQ identity of the message sender.
        :param msg: The heartbeat message; its ``supervisor_info`` is stored.
        :return: Always None.
        """
        # The client registry is keyed by the textual form of the identity.
        sender_id_str = sender_identity.decode("utf-8")
        self._logger.debug(f"Received heartbeat from {sender_id_str}")

        # Timestamp comes from datetime.now() with no timezone argument.
        seen_at = datetime.now().isoformat()
        client_record = CNCClientInfo(
            identity=sender_identity,
            last_seen=seen_at,
            info=msg.supervisor_info,
        )
        self.cnc.clients[sender_id_str] = client_record

        # No reply message is produced for a heartbeat.
        return None

Handler for CNCMessageHeartbeat messages.

Initialize the handler.

:param cnc: The SupervisorCNC instance.

Ancestors

Methods

def handle(self, sender_identity: bytes, msg: CNCMessageHeartbeat) ‑> Tuple[CNCMessage, bool] | None
Expand source code
def handle(
    self, sender_identity: bytes, msg: CNCMessageHeartbeat
) -> Optional[Tuple[CNCMessage, bool]]:
    """
    Record a heartbeat received from a client.

    A new CNCClientInfo is stored in ``self.cnc.clients`` under the
    UTF-8 decoded sender identity, overwriting any earlier entry for
    that key.
    :param sender_identity: The ZMQ identity of the message sender.
    :param msg: The heartbeat message; its ``supervisor_info`` is stored.
    :return: Always None.
    """
    # The client registry is keyed by the textual form of the identity.
    sender_id_str = sender_identity.decode("utf-8")
    self._logger.debug(f"Received heartbeat from {sender_id_str}")

    # Timestamp comes from datetime.now() with no timezone argument.
    seen_at = datetime.now().isoformat()
    client_record = CNCClientInfo(
        identity=sender_identity,
        last_seen=seen_at,
        info=msg.supervisor_info,
    )
    self.cnc.clients[sender_id_str] = client_record

    # No reply message is produced for a heartbeat.
    return None

Handles a heartbeat message.

:param sender_identity: The ZMQ identity of the message sender.
:param msg: The heartbeat message.
:return: None