Module enrgdaq.daq.jobs.remote

Classes

class DAQJobMessageStatsRemote (stats: dict[str, SupervisorRemoteStats],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
remote_config: DAQRemoteConfig = <factory>,
route_keys: set[str] = <factory>)
class DAQJobMessageStatsRemote(DAQJobMessage):
    """Message class containing remote statistics."""

    stats: "DAQJobRemoteStatsDict"

Message class containing remote statistics.

Ancestors

  • DAQJobMessage

Instance variables

var stats : dict[str, SupervisorRemoteStats]
class DAQJobRemote (config: DAQJobRemoteConfig,
**kwargs)
class DAQJobRemote(DAQJob):
    """
    DAQJobRemote is a DAQJob that connects two separate ENRGDAQ instances.
    It sends to and receives from a remote ENRGDAQ, such that:

    - message_in -> remote message_out
    - remote message_in -> message_out

    Attributes:
        allowed_message_in_types (list): List of allowed message types.
        config_type (type): Configuration type for the job.
        config (DAQJobRemoteConfig): Configuration instance.
        restart_offset (timedelta): Restart offset time.
        _message_class_cache (dict): Cache for message classes.
        _remote_message_ids (set): Set of remote message IDs.
        _receive_thread (threading.Thread): Thread for receiving messages.
    """

    allowed_message_in_types = [DAQJobMessage]  # accept all message types
    config_type = DAQJobRemoteConfig
    config: DAQJobRemoteConfig
    restart_offset = timedelta(seconds=5)

    _message_class_cache: dict[str, type[DAQJobMessage]]
    _remote_message_ids: set[str]
    _receive_thread: threading.Thread
    _remote_stats: DAQJobRemoteStatsDict
    _remote_stats_last_sent_at: float
    _last_is_alive_check_time: datetime

    def __init__(self, config: DAQJobRemoteConfig, **kwargs):
        super().__init__(config, **kwargs)
        if self.config.zmq_proxy_pub_url is not None:
            self._zmq_pub_ctx = zmq.Context()
            self._zmq_pub = self._zmq_pub_ctx.socket(zmq.PUB)
            self._zmq_pub.connect(self.config.zmq_proxy_pub_url)
        else:
            self._zmq_pub_ctx = None
            self._zmq_pub = None
        self._zmq_sub = None

        self._receive_thread = threading.Thread(
            target=self._start_receive_thread,
            args=(self.config.zmq_proxy_sub_urls,),
            daemon=True,
        )
        self._send_remote_stats_thread = threading.Thread(
            target=self._start_send_remote_stats_thread,
            daemon=True,
        )
        self._message_class_cache = {
            x.__name__: x for x in all_subclasses(DAQJobMessage)
        }
        self._remote_message_ids = set()
        self._remote_stats = defaultdict(lambda: SupervisorRemoteStats())
        self._remote_stats_last_sent_at = datetime.now().timestamp()
        self._last_is_alive_check_time = datetime.now()

    def handle_message(self, message: DAQJobMessage) -> bool:
        if (
            # Ignore if the message is not allowed by the DAQ Job
            not super().handle_message(message)
            # Ignore if we already received the message
            or message.id in self._remote_message_ids
            # Ignore if the message is remote, meaning it was sent by another Supervisor
            or message.is_remote
            # Ignore if we are not connected to the proxy
            or self._zmq_pub is None
        ):
            return True  # Silently ignore

        if message.remote_config.remote_disable and not isinstance(
            message, SupervisorDAQJobMessage
        ):
            return True

        self._send_remote_pub_message(message)
        return True

    @classmethod
    def can_handle_message(cls, message: DAQJobMessage) -> bool:
        if isinstance(message, SupervisorDAQJobMessage):
            return True
        if message.remote_config.remote_disable:
            return False
        return True

    def _send_remote_pub_message(self, message: DAQJobMessage):
        if self._zmq_pub is None:
            return

        remote_topic = message.remote_config.remote_topic or DEFAULT_REMOTE_TOPIC
        remote_topic_bytes = remote_topic.encode()

        # Use multipart for zero-copy if using pickle
        buffers = []
        header = pickle.dumps(message, protocol=5, buffer_callback=buffers.append)

        payload = [remote_topic_bytes, header] + [zmq.Frame(b) for b in buffers]
        self._zmq_pub.send_multipart(payload)

        # Update remote stats
        if self._supervisor_info:
            # Estimate size: header + sum of buffer lengths
            total_size = len(header)
            for b in buffers:
                total_size += b.raw().nbytes
            self._remote_stats[
                self._supervisor_info.supervisor_id
            ].update_message_out_stats(total_size + len(remote_topic_bytes))

        self._logger.debug(
            f"Sent message '{type(message).__name__}' to topic '{remote_topic}'"
        )
        return True

    def _create_zmq_sub(self, remote_urls: list[str]) -> zmq.Socket:
        """
        Create a ZMQ subscriber socket.

        Args:
            remote_urls (list[str]): List of remote URLs to connect to.

        Returns:
            zmq.Socket: The created ZMQ subscriber socket.
        """
        self._zmq_sub_ctx = zmq.Context()
        zmq_sub = self._zmq_sub_ctx.socket(zmq.SUB)
        for remote_url in remote_urls:
            self._logger.debug(f"Connecting to {remote_url}")
            zmq_sub.connect(remote_url)
            topics_to_subscribe = [DEFAULT_REMOTE_TOPIC]
            topics_to_subscribe.extend(self.config.topics)
            # Subscribe to the supervisor id if we have it
            if self.info.supervisor_info is not None:
                topics_to_subscribe.append(self.info.supervisor_info.supervisor_id)
            for topic in topics_to_subscribe:
                zmq_sub.subscribe(topic)

            self._logger.info(f"Subscribed to topics: {", ".join(topics_to_subscribe)}")
        return zmq_sub

    def _start_receive_thread(self, remote_urls: list[str]):
        """
        Start the receive thread.

        Args:
            remote_urls (list[str]): List of remote URLs to connect to.
        """
        self._zmq_sub = self._create_zmq_sub(remote_urls)

        while True:
            try:
                parts = self._zmq_sub.recv_multipart()
                topic = parts[0]
                header = parts[1]
                buffers = parts[2:]
                recv_message = pickle.loads(header, buffers=buffers)

                message_len = len(header)
                for b in buffers:
                    # buffers here are memoryviews/bytes from zmq
                    message_len += len(b)
            except zmq.ContextTerminated:
                break
            except Exception as e:
                self._logger.error(
                    f"Error while unpacking message sent in {topic}: {e}", exc_info=True
                )
                continue
            if (
                recv_message.daq_job_info is not None
                and recv_message.daq_job_info.supervisor_info is not None
                and self.info.supervisor_info is not None
                and recv_message.daq_job_info.supervisor_info.supervisor_id
                == self.info.supervisor_info.supervisor_id
            ):
                self._logger.warning(
                    f"Received own message '{type(recv_message).__name__}' on topic '{topic.decode()}', ignoring message. This should NOT happen. Check the config."
                )
                continue
            self._logger.debug(
                f"Received {message_len} bytes for message '{type(recv_message).__name__}' on topic '{topic.decode()}'"
            )
            recv_message.is_remote = True

            # Update remote stats BEFORE forwarding
            if self._supervisor_info:
                self._remote_stats[
                    self._supervisor_info.supervisor_id
                ].update_message_in_stats(message_len)
            if recv_message.daq_job_info and recv_message.daq_job_info.supervisor_info:
                self._remote_stats[
                    recv_message.daq_job_info.supervisor_info.supervisor_id
                ].update_message_out_stats(message_len)

            # remote message_in -> message_out
            self._put_message_out(recv_message, modify_message_metadata=False)

    def _start_send_remote_stats_thread(self):
        while True:
            self._send_remote_stats_message()
            sleep_for(DAQ_JOB_REMOTE_STATS_SEND_INTERVAL_SECONDS)

    def start(self):
        """
        Start the receive thread and the DAQ job.
        """
        self._receive_thread.start()
        self._send_remote_stats_thread.start()

        while True:
            # Consume all messages in the queue
            while True:
                try:
                    self.consume(
                        nowait=False, timeout=DAQ_JOB_REMOTE_QUEUE_ACTION_TIMEOUT
                    )
                except Empty:
                    break

            # Check thread healths
            if datetime.now() - self._last_is_alive_check_time > timedelta(
                seconds=DAQ_JOB_REMOTE_IS_ALIVE_CHECK_INTERVAL_SECONDS
            ):
                if not self._receive_thread.is_alive():
                    raise RuntimeError("Receive thread is dead")
                if not self._send_remote_stats_thread.is_alive():
                    raise RuntimeError("Send remote stats thread is dead")
                self._last_is_alive_check_time = datetime.now()

    def _pack_message(self, message: DAQJobMessage, use_pickle: bool = True) -> bytes:
        """
        Pack a message for sending.

        Args:
            message (DAQJobMessage): The message to pack.
            use_pickle (bool): Whether to use pickle for packing, if not, use msgspec.

        Returns:
            bytes: The packed message.
        """
        message_type = type(message).__name__
        if use_pickle:
            return pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)

        return msgspec.msgpack.encode([message_type, message])

    def _unpack_message(self, message: bytes) -> DAQJobMessage:
        """
        Unpack a received message.

        It tries to unpack the message using pickle, and if that fails, it uses msgspec.

        Args:
            message (bytes): The received message.

        Returns:
            DAQJobMessage: The unpacked message.
        """
        # TODO: fix unpack without pickle
        try:
            res = pickle.loads(message)
            if not isinstance(res, DAQJobMessage):
                raise Exception("Message is not DAQJobMessage")
            message_type = type(res).__name__
        except pickle.UnpicklingError:
            message_type, data = msgspec.msgpack.decode(message)
            if message_type not in self._message_class_cache:
                raise Exception(f"Invalid message type: {message_type}")
            message_class = self._message_class_cache[message_type]

            res = msgspec.convert(data, type=message_class)

        if res.id is None:
            raise Exception("Message id is not set")
        self._remote_message_ids.add(res.id)
        if len(self._remote_message_ids) > DAQ_JOB_REMOTE_MAX_REMOTE_MESSAGE_ID_COUNT:
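            # Note: set.pop() evicts an arbitrary id, not necessarily the oldest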
            self._remote_message_ids.pop()
        return res

    def _send_remote_stats_message(self):
        msg = DAQJobMessageStatsRemote(dict(self._remote_stats))
        self._put_message_out(msg)

    def __del__(self):
        """
        Destructor for DAQJobRemote.
        """
        if getattr(self, "_zmq_sub_ctx", None) is not None:
            self._zmq_sub_ctx.destroy()
        if self._zmq_pub_ctx is not None:
            self._zmq_pub_ctx.destroy()

        return super().__del__()

DAQJobRemote is a DAQJob that connects two separate ENRGDAQ instances. It sends to and receives from a remote ENRGDAQ, such that:

  • message_in -> remote message_out
  • remote message_in -> message_out
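
For example, two supervisors can be bridged through a single ZMQ proxy. A hedged configuration sketch follows; the endpoint URLs and the daq_job_type string are assumptions, not values defined in this module:

from enrgdaq.daq.jobs.remote import DAQJobRemoteConfig

# Both supervisors publish to the proxy frontend and subscribe to its
# backend, so each one's message_in becomes the other's message_out.
config_supervisor_a = DAQJobRemoteConfig(
    daq_job_type="remote",                         # assumed type string
    zmq_proxy_pub_url="tcp://proxy-host:5559",     # proxy frontend
    zmq_proxy_sub_urls=["tcp://proxy-host:5560"],  # proxy backend
)
config_supervisor_b = DAQJobRemoteConfig(
    daq_job_type="remote",
    zmq_proxy_pub_url="tcp://proxy-host:5559",
    zmq_proxy_sub_urls=["tcp://proxy-host:5560"],
)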

Attributes

allowed_message_in_types : list
List of allowed message types.
config_type : type
Configuration type for the job.
config : DAQJobRemoteConfig
Configuration instance.
restart_offset : timedelta
Restart offset time.
_message_class_cache : dict
Cache for message classes.
_remote_message_ids : set
Set of remote message IDs.
_receive_thread : threading.Thread
Thread for receiving messages.
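
On the wire, each message is a ZMQ multipart payload: a topic frame, a pickle protocol-5 header, and any out-of-band buffers. A standalone sketch of that pattern, with an illustrative endpoint, topic, and payload (not the ENRGDAQ API):

import pickle
import time

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://127.0.0.1:5559")  # illustrative endpoint
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5559")
sub.subscribe("daq")              # stands in for DEFAULT_REMOTE_TOPIC
time.sleep(0.2)                   # let the subscription propagate

# Wrapping large data in PickleBuffer lets protocol 5 hand it to
# buffer_callback instead of copying it into the pickle stream.
message = {"samples": pickle.PickleBuffer(b"\x00" * 4096)}
buffers = []
header = pickle.dumps(message, protocol=5, buffer_callback=buffers.append)
pub.send_multipart([b"daq", header] + [zmq.Frame(b) for b in buffers])

topic, header, *raw = sub.recv_multipart()
restored = pickle.loads(header, buffers=raw)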

Ancestors

  • DAQJob

Class variables

var allowed_message_in_types : list[type[DAQJobMessage]]
var config : DAQJobRemoteConfig
var config_type : type[DAQJobConfig]

var restart_offset : datetime.timedelta

Static methods

def can_handle_message(message: DAQJobMessage) -> bool
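
Messages that opt out of remote forwarding via their DAQRemoteConfig are rejected, except supervisor messages. A hedged sketch, assuming DAQJobMessage and DAQRemoteConfig are imported from their own modules and that DAQRemoteConfig accepts a remote_disable flag:

msg = DAQJobMessage(remote_config=DAQRemoteConfig(remote_disable=True))
DAQJobRemote.can_handle_message(msg)              # False: remote delivery disabled
DAQJobRemote.can_handle_message(DAQJobMessage())  # True with the default config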

Methods

def start(self)

Start the receive thread and the DAQ job.
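
Because start() blocks in its consume loop and raises RuntimeError if the receive or stats thread dies, a caller typically runs it on a dedicated thread. A minimal sketch; the config values repeat the assumptions from the earlier example, and any extra constructor kwargs required by the DAQJob base class are omitted:

import threading

from enrgdaq.daq.jobs.remote import DAQJobRemote, DAQJobRemoteConfig

config = DAQJobRemoteConfig(
    daq_job_type="remote",
    zmq_proxy_sub_urls=["tcp://proxy-host:5560"],
    zmq_proxy_pub_url="tcp://proxy-host:5559",
)
job = DAQJobRemote(config)
threading.Thread(target=job.start, daemon=True).start()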

Inherited members

class DAQJobRemoteConfig (zmq_proxy_sub_urls: list[str],
topics: list[str] = <factory>,
zmq_proxy_pub_url: str | None = None,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig = <factory>,
daq_job_unique_id: str | None = None)
class DAQJobRemoteConfig(DAQJobConfig):
    """
    Configuration for DAQJobRemote.

    Attributes:
        zmq_proxy_sub_urls (list[str]): ZMQ proxy URLs to subscribe to.
        topics (list[str]): List of topics to subscribe to.
        zmq_proxy_pub_url (Optional[str]): ZMQ proxy URL to publish to.
    """

    zmq_proxy_sub_urls: list[str]
    topics: list[str] = []
    zmq_proxy_pub_url: Optional[str] = None

Configuration for DAQJobRemote.

Attributes

zmq_proxy_sub_urls : list[str]
ZMQ proxy URLs to subscribe to.
topics : list[str]
List of topics to subscribe to.
zmq_proxy_pub_url : str | None
ZMQ proxy URL to publish to.
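
For illustration, a config that subscribes to an extra topic besides the default one; the endpoints and the daq_job_type value are assumptions:

from enrgdaq.daq.jobs.remote import DAQJobRemoteConfig

config = DAQJobRemoteConfig(
    daq_job_type="remote",                         # assumed type string
    zmq_proxy_sub_urls=["tcp://proxy-host:5560"],
    zmq_proxy_pub_url="tcp://proxy-host:5559",
    topics=["detector_a"],  # subscribed in addition to the default topic
)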

Ancestors

  • DAQJobConfig

Instance variables

var topics : list[str]
var zmq_proxy_pub_url : str | None
var zmq_proxy_sub_urls : list[str]

class SupervisorRemoteStats (message_in_count: int = 0,
message_in_bytes: int = 0,
message_out_count: int = 0,
message_out_bytes: int = 0,
last_active: datetime.datetime = <factory>)
class SupervisorRemoteStats(Struct):
    """Statistics for a remote supervisor."""

    message_in_count: int = 0
    message_in_bytes: int = 0

    message_out_count: int = 0
    message_out_bytes: int = 0

    last_active: datetime = field(default_factory=datetime.now)

    def update_message_in_stats(self, message_in_bytes: int):
        self.message_in_count += 1
        self.message_in_bytes += message_in_bytes
        self.last_active = datetime.now()

    def update_message_out_stats(self, message_out_bytes: int):
        self.message_out_count += 1
        self.message_out_bytes += message_out_bytes
        self.last_active = datetime.now()

Statistics for a remote supervisor.
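
A minimal usage sketch of the counters (sizes are illustrative):

from enrgdaq.daq.jobs.remote import SupervisorRemoteStats

stats = SupervisorRemoteStats()
stats.update_message_in_stats(128)   # one message received, 128 bytes
stats.update_message_out_stats(256)  # one message sent, 256 bytes

assert stats.message_in_count == 1
assert stats.message_out_bytes == 256
# last_active was refreshed by both calls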

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var last_active : datetime.datetime
var message_in_bytes : int
var message_in_count : int
var message_out_bytes : int
var message_out_count : int

Methods

def update_message_in_stats(self, message_in_bytes: int)
Record one received message, add its size to message_in_bytes, and refresh last_active.

def update_message_out_stats(self, message_out_bytes: int)
Record one sent message, add its size to message_out_bytes, and refresh last_active.