Module enrgdaq.daq.jobs.remote
Classes
class DAQJobMessageStatsRemote (stats: dict[str, SupervisorRemoteStats],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
remote_config: DAQRemoteConfig = <factory>,
route_keys: set[str] = <factory>)
class DAQJobMessageStatsRemote(DAQJobMessage):
    """Message class containing remote statistics."""

    stats: "DAQJobRemoteStatsDict"

Message class containing remote statistics.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var stats : dict[str, SupervisorRemoteStats]
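For illustration, a minimal sketch of how DAQJobRemote assembles this message (mirroring _send_remote_stats_message below): per-supervisor stats accumulate in a defaultdict and are snapshotted into a plain dict when the message is emitted. The supervisor id and byte count are placeholders.

    from collections import defaultdict

    from enrgdaq.daq.jobs.remote import DAQJobMessageStatsRemote, SupervisorRemoteStats

    remote_stats = defaultdict(lambda: SupervisorRemoteStats())
    remote_stats["supervisor-1"].update_message_in_stats(128)  # placeholder id and size

    # Snapshot into a plain dict, as _send_remote_stats_message does
    msg = DAQJobMessageStatsRemote(dict(remote_stats))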
class DAQJobRemote (config: DAQJobRemoteConfig,
**kwargs)
class DAQJobRemote(DAQJob):
    """
    DAQJobRemote is a DAQJob that connects two separate ENRGDAQ instances.

    It sends to and receives from a remote ENRGDAQ, such that:

    - message_in -> remote message_out
    - remote message_in -> message_out

    Attributes:
        allowed_message_in_types (list): List of allowed message types.
        config_type (type): Configuration type for the job.
        config (DAQJobRemoteConfig): Configuration instance.
        restart_offset (timedelta): Restart offset time.
        _message_class_cache (dict): Cache for message classes.
        _remote_message_ids (set): Set of remote message IDs.
        _receive_thread (threading.Thread): Thread for receiving messages.
    """

    allowed_message_in_types = [DAQJobMessage]  # accept all message types
    config_type = DAQJobRemoteConfig
    config: DAQJobRemoteConfig
    restart_offset = timedelta(seconds=5)

    _message_class_cache: dict[str, type[DAQJobMessage]]
    _remote_message_ids: set[str]
    _receive_thread: threading.Thread
    _remote_stats: DAQJobRemoteStatsDict
    _remote_stats_last_sent_at: float
    _last_is_alive_check_time: datetime

    def __init__(self, config: DAQJobRemoteConfig, **kwargs):
        super().__init__(config, **kwargs)
        if self.config.zmq_proxy_pub_url is not None:
            self._zmq_pub_ctx = zmq.Context()
            self._zmq_pub = self._zmq_pub_ctx.socket(zmq.PUB)
            self._zmq_pub.connect(self.config.zmq_proxy_pub_url)
        else:
            self._zmq_pub_ctx = None
            self._zmq_pub = None
        self._zmq_sub = None
        self._receive_thread = threading.Thread(
            target=self._start_receive_thread,
            args=(self.config.zmq_proxy_sub_urls,),
            daemon=True,
        )
        self._send_remote_stats_thread = threading.Thread(
            target=self._start_send_remote_stats_thread,
            daemon=True,
        )
        self._message_class_cache = {
            x.__name__: x for x in all_subclasses(DAQJobMessage)
        }
        self._remote_message_ids = set()
        self._remote_stats = defaultdict(lambda: SupervisorRemoteStats())
        self._remote_stats_last_sent_at = datetime.now().timestamp()
        self._last_is_alive_check_time = datetime.now()

    def handle_message(self, message: DAQJobMessage) -> bool:
        if (
            # Ignore if the message is not allowed by the DAQ Job
            not super().handle_message(message)
            # Ignore if we already received the message
            or message.id in self._remote_message_ids
            # Ignore if the message is remote, meaning it was sent by another Supervisor
            or message.is_remote
            # Ignore if we are not connected to the proxy
            or self._zmq_pub is None
        ):
            return True  # Silently ignore

        if message.remote_config.remote_disable and not isinstance(
            message, SupervisorDAQJobMessage
        ):
            return True

        self._send_remote_pub_message(message)
        return True

    @classmethod
    def can_handle_message(cls, message: DAQJobMessage) -> bool:
        if isinstance(message, SupervisorDAQJobMessage):
            return True
        if message.remote_config.remote_disable:
            return False
        return True

    def _send_remote_pub_message(self, message: DAQJobMessage):
        if self._zmq_pub is None:
            return
        remote_topic = message.remote_config.remote_topic or DEFAULT_REMOTE_TOPIC
        remote_topic_bytes = remote_topic.encode()

        # Use multipart for zero-copy if using pickle
        buffers = []
        header = pickle.dumps(message, protocol=5, buffer_callback=buffers.append)
        payload = [remote_topic_bytes, header] + [zmq.Frame(b) for b in buffers]
        self._zmq_pub.send_multipart(payload)

        # Update remote stats
        if self._supervisor_info:
            # Estimate size: header + sum of buffer lengths
            total_size = len(header)
            for b in buffers:
                total_size += b.raw().nbytes
            self._remote_stats[
                self._supervisor_info.supervisor_id
            ].update_message_out_stats(total_size + len(remote_topic_bytes))
        self._logger.debug(
            f"Sent message '{type(message).__name__}' to topic '{remote_topic}'"
        )
        return True

    def _create_zmq_sub(self, remote_urls: list[str]) -> zmq.Socket:
        """
        Create a ZMQ subscriber socket.

        Args:
            remote_urls (list[str]): List of remote URLs to connect to.

        Returns:
            zmq.Socket: The created ZMQ subscriber socket.
        """
        self._zmq_sub_ctx = zmq.Context()
        zmq_sub = self._zmq_sub_ctx.socket(zmq.SUB)
        for remote_url in remote_urls:
            self._logger.debug(f"Connecting to {remote_url}")
            zmq_sub.connect(remote_url)

        topics_to_subscribe = [DEFAULT_REMOTE_TOPIC]
        topics_to_subscribe.extend(self.config.topics)

        # Subscribe to the supervisor id if we have it
        if self.info.supervisor_info is not None:
            topics_to_subscribe.append(self.info.supervisor_info.supervisor_id)

        for topic in topics_to_subscribe:
            zmq_sub.subscribe(topic)
        self._logger.info(f"Subscribed to topics: {', '.join(topics_to_subscribe)}")
        return zmq_sub

    def _start_receive_thread(self, remote_urls: list[str]):
        """
        Start the receive thread.

        Args:
            remote_urls (list[str]): List of remote URLs to connect to.
        """
        self._zmq_sub = self._create_zmq_sub(remote_urls)
        while True:
            try:
                parts = self._zmq_sub.recv_multipart()
                topic = parts[0]
                header = parts[1]
                buffers = parts[2:]
                recv_message = pickle.loads(header, buffers=buffers)
                message_len = len(header)
                for b in buffers:
                    # buffers here are memoryviews/bytes from zmq
                    message_len += len(b)
            except zmq.ContextTerminated:
                break
            except Exception as e:
                self._logger.error(
                    f"Error while unpacking message sent in {topic}: {e}", exc_info=True
                )
                continue

            if (
                recv_message.daq_job_info is not None
                and recv_message.daq_job_info.supervisor_info is not None
                and self.info.supervisor_info is not None
                and recv_message.daq_job_info.supervisor_info.supervisor_id
                == self.info.supervisor_info.supervisor_id
            ):
                self._logger.warning(
                    f"Received own message '{type(recv_message).__name__}' on topic '{topic.decode()}', ignoring message. This should NOT happen. Check the config."
                )
                continue

            self._logger.debug(
                f"Received {message_len} bytes for message '{type(recv_message).__name__}' on topic '{topic.decode()}'"
            )
            recv_message.is_remote = True

            # Update remote stats BEFORE forwarding
            if self._supervisor_info:
                self._remote_stats[
                    self._supervisor_info.supervisor_id
                ].update_message_in_stats(message_len)
            if recv_message.daq_job_info and recv_message.daq_job_info.supervisor_info:
                self._remote_stats[
                    recv_message.daq_job_info.supervisor_info.supervisor_id
                ].update_message_out_stats(message_len)

            # remote message_in -> message_out
            self._put_message_out(recv_message, modify_message_metadata=False)

    def _start_send_remote_stats_thread(self):
        while True:
            self._send_remote_stats_message()
            sleep_for(DAQ_JOB_REMOTE_STATS_SEND_INTERVAL_SECONDS)

    def start(self):
        """
        Start the receive thread and the DAQ job.
        """
        self._receive_thread.start()
        self._send_remote_stats_thread.start()

        while True:
            # Consume all messages in the queue
            while True:
                try:
                    self.consume(
                        nowait=False, timeout=DAQ_JOB_REMOTE_QUEUE_ACTION_TIMEOUT
                    )
                except Empty:
                    break

            # Check thread health
            if datetime.now() - self._last_is_alive_check_time > timedelta(
                seconds=DAQ_JOB_REMOTE_IS_ALIVE_CHECK_INTERVAL_SECONDS
            ):
                if not self._receive_thread.is_alive():
                    raise RuntimeError("Receive thread is dead")
                if not self._send_remote_stats_thread.is_alive():
                    raise RuntimeError("Send remote stats thread is dead")
                self._last_is_alive_check_time = datetime.now()

    def _pack_message(self, message: DAQJobMessage, use_pickle: bool = True) -> bytes:
        """
        Pack a message for sending.

        Args:
            message (DAQJobMessage): The message to pack.
            use_pickle (bool): Whether to use pickle for packing; if not, use msgspec.

        Returns:
            bytes: The packed message.
        """
        message_type = type(message).__name__
        if use_pickle:
            return pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)

        return msgspec.msgpack.encode([message_type, message])

    def _unpack_message(self, message: bytes) -> DAQJobMessage:
        """
        Unpack a received message.

        It tries to unpack the message using pickle, and if that fails, it uses msgspec.

        Args:
            message (bytes): The received message.

        Returns:
            DAQJobMessage: The unpacked message.
        """
        # TODO: fix unpack without pickle
        try:
            res = pickle.loads(message)
            if not isinstance(res, DAQJobMessage):
                raise Exception("Message is not DAQJobMessage")
            message_type = type(res).__name__
        except pickle.UnpicklingError:
            message_type, data = msgspec.msgpack.decode(message)
            if message_type not in self._message_class_cache:
                raise Exception(f"Invalid message type: {message_type}")
            message_class = self._message_class_cache[message_type]
            res = msgspec.convert(data, type=message_class)

        if res.id is None:
            raise Exception("Message id is not set")
        self._remote_message_ids.add(res.id)
        if len(self._remote_message_ids) > DAQ_JOB_REMOTE_MAX_REMOTE_MESSAGE_ID_COUNT:
            self._remote_message_ids.pop()

        return res

    def _send_remote_stats_message(self):
        msg = DAQJobMessageStatsRemote(dict(self._remote_stats))
        self._put_message_out(msg)

    def __del__(self):
        """
        Destructor for DAQJobRemote.
        """
        if getattr(self, "_zmq_sub_ctx", None) is not None:
            self._zmq_sub_ctx.destroy()
        if self._zmq_pub_ctx is not None:
            self._zmq_pub_ctx.destroy()

        return super().__del__()

DAQJobRemote is a DAQJob that connects two separate ENRGDAQ instances. It sends to and receives from a remote ENRGDAQ, such that:
- message_in -> remote message_out
- remote message_in -> message_out
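For example, a minimal configuration sketch wiring one instance to a remote ZMQ proxy. The daq_job_type value and the URLs are illustrative placeholders, and DAQJob's constructor may require additional keyword arguments not shown here.

    from enrgdaq.daq.jobs.remote import DAQJobRemote, DAQJobRemoteConfig

    config = DAQJobRemoteConfig(
        daq_job_type="remote",                          # placeholder type string
        zmq_proxy_sub_urls=["tcp://remote-host:5556"],  # receive remote messages from here
        zmq_proxy_pub_url="tcp://remote-host:5555",     # publish local messages here
        topics=["extra-topic"],                         # subscribed in addition to the default topic
    )
    job = DAQJobRemote(config)
    job.start()  # blocks: consumes the queue and monitors the worker threads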
Attributes
allowed_message_in_types : list - List of allowed message types.
config_type : type - Configuration type for the job.
config : DAQJobRemoteConfig - Configuration instance.
restart_offset : timedelta - Restart offset time.
_message_class_cache : dict - Cache for message classes.
_remote_message_ids : set - Set of remote message IDs.
_receive_thread : threading.Thread - Thread for receiving messages.
Ancestors
- DAQJob
Class variables
var allowed_message_in_types : list[type[DAQJobMessage]]
var config : DAQJobRemoteConfig
var config_type : type[DAQJobConfig]
Configuration for DAQJobRemote.
Attributes
zmq_proxy_sub_urls : list[str] - List of ZMQ proxy URLs to subscribe to.
topics : list[str] - List of topics to subscribe to.
zmq_proxy_pub_url : str | None - ZMQ proxy URL to publish to.
var restart_offset : datetime.timedelta
Static methods
def can_handle_message(message: DAQJobMessage) -> bool
Methods
def start(self)

def start(self):
    """
    Start the receive thread and the DAQ job.
    """
    self._receive_thread.start()
    self._send_remote_stats_thread.start()

    while True:
        # Consume all messages in the queue
        while True:
            try:
                self.consume(
                    nowait=False, timeout=DAQ_JOB_REMOTE_QUEUE_ACTION_TIMEOUT
                )
            except Empty:
                break

        # Check thread health
        if datetime.now() - self._last_is_alive_check_time > timedelta(
            seconds=DAQ_JOB_REMOTE_IS_ALIVE_CHECK_INTERVAL_SECONDS
        ):
            if not self._receive_thread.is_alive():
                raise RuntimeError("Receive thread is dead")
            if not self._send_remote_stats_thread.is_alive():
                raise RuntimeError("Send remote stats thread is dead")
            self._last_is_alive_check_time = datetime.now()

Start the receive thread and the DAQ job.
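A self-contained sketch of the multipart wire format that _send_remote_pub_message emits and _start_receive_thread parses, assuming pyzmq and pickle protocol 5. The endpoint and topic are placeholders. Note that only objects supporting out-of-band pickling (pickle.PickleBuffer here; numpy arrays also qualify) land in the zero-copy buffer list.

    import pickle
    import time

    import zmq

    ctx = zmq.Context()

    # Subscriber side (mirrors _create_zmq_sub / _start_receive_thread).
    sub = ctx.socket(zmq.SUB)
    sub.bind("tcp://127.0.0.1:5555")  # placeholder endpoint
    sub.subscribe("DAQ")              # ZMQ filters on a prefix of the first frame

    # Publisher side (mirrors _send_remote_pub_message).
    pub = ctx.socket(zmq.PUB)
    pub.connect("tcp://127.0.0.1:5555")
    time.sleep(0.2)  # PUB/SUB slow-joiner: give the subscription time to propagate

    data = b"x" * 1024
    message = {"payload": pickle.PickleBuffer(data)}  # stands in for a DAQJobMessage

    buffers: list[pickle.PickleBuffer] = []
    header = pickle.dumps(message, protocol=5, buffer_callback=buffers.append)

    # Frame layout on the wire: [topic, pickle header, out-of-band buffers...]
    pub.send_multipart([b"DAQ", header] + [zmq.Frame(b) for b in buffers])

    topic, header, *bufs = sub.recv_multipart()
    recv_message = pickle.loads(header, buffers=bufs)
    assert bytes(recv_message["payload"]) == data

When pickle is not used, _pack_message and _unpack_message instead encode a [type name, message] pair with msgspec. A minimal sketch of that tagged encoding, with ExampleMessage as a hypothetical stand-in for a DAQJobMessage subclass:

    import msgspec

    class ExampleMessage(msgspec.Struct):
        """Hypothetical stand-in for a DAQJobMessage subclass."""
        id: str | None = None
        payload: int = 0

    # Encode as a [type name, message] pair, as _pack_message does without pickle.
    packed = msgspec.msgpack.encode(["ExampleMessage", ExampleMessage(id="1", payload=42)])

    # Decode the tag first, then convert the payload into the cached message
    # class, mirroring _unpack_message's _message_class_cache lookup.
    message_class_cache = {"ExampleMessage": ExampleMessage}
    message_type, data = msgspec.msgpack.decode(packed)
    message = msgspec.convert(data, type=message_class_cache[message_type])
    assert message == ExampleMessage(id="1", payload=42)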
Inherited members
class DAQJobRemoteConfig (zmq_proxy_sub_urls: list[str],
topics: list[str] = <factory>,
zmq_proxy_pub_url: str | None = None,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig = <factory>,
daq_job_unique_id: str | None = None)
class DAQJobRemoteConfig(DAQJobConfig):
    """
    Configuration for DAQJobRemote.

    Attributes:
        zmq_proxy_sub_urls (list[str]): List of ZMQ proxy URLs to subscribe to.
        topics (list[str]): List of topics to subscribe to.
        zmq_proxy_pub_url (Optional[str]): ZMQ proxy URL to publish to.
    """

    zmq_proxy_sub_urls: list[str]
    topics: list[str] = []
    zmq_proxy_pub_url: Optional[str] = None

Configuration for DAQJobRemote.
Attributes
zmq_proxy_sub_urls : list[str] - List of ZMQ proxy URLs to subscribe to.
topics : list[str] - List of topics to subscribe to.
zmq_proxy_pub_url : str | None - ZMQ proxy URL to publish to.
Ancestors
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var topics : list[str]
var zmq_proxy_pub_url : str | None
var zmq_proxy_sub_urls : list[str]
class SupervisorRemoteStats (message_in_count: int = 0,
message_in_bytes: int = 0,
message_out_count: int = 0,
message_out_bytes: int = 0,
last_active: datetime.datetime = <factory>)
class SupervisorRemoteStats(Struct):
    """Statistics for a remote supervisor."""

    message_in_count: int = 0
    message_in_bytes: int = 0
    message_out_count: int = 0
    message_out_bytes: int = 0
    last_active: datetime = field(default_factory=datetime.now)

    def update_message_in_stats(self, message_in_bytes: int):
        self.message_in_count += 1
        self.message_in_bytes += message_in_bytes
        self.last_active = datetime.now()

    def update_message_out_stats(self, message_out_bytes: int):
        self.message_out_count += 1
        self.message_out_bytes += message_out_bytes
        self.last_active = datetime.now()

Statistics for a remote supervisor.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var last_active : datetime.datetime
var message_in_bytes : int
var message_in_count : int
var message_out_bytes : int
var message_out_count : int
Methods
def update_message_in_stats(self, message_in_bytes: int)

def update_message_in_stats(self, message_in_bytes: int):
    self.message_in_count += 1
    self.message_in_bytes += message_in_bytes
    self.last_active = datetime.now()

def update_message_out_stats(self, message_out_bytes: int)

def update_message_out_stats(self, message_out_bytes: int):
    self.message_out_count += 1
    self.message_out_bytes += message_out_bytes
    self.last_active = datetime.now()
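A short usage sketch of the update helpers: each call increments the matching counter, adds the byte size, and refreshes last_active. This is how DAQJobRemote accounts per-supervisor traffic.

    from enrgdaq.daq.jobs.remote import SupervisorRemoteStats

    stats = SupervisorRemoteStats()
    stats.update_message_in_stats(512)   # one inbound message of 512 bytes
    stats.update_message_out_stats(256)  # one outbound message of 256 bytes

    assert stats.message_in_count == 1 and stats.message_in_bytes == 512
    assert stats.message_out_count == 1 and stats.message_out_bytes == 256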