Module enrgdaq.utils.queue
Classes
class ZMQQueue (uri: str | None = None, maxsize: int = 0)-
Expand source code
class ZMQQueue: """A high-performance multiprocess queue using ZeroMQ. This class mimics the queue. Queue interface but works across process boundaries without requiring inheritance. It uses ZMQ PUSH/PULL sockets. On Unix-like systems, it uses IPC; on Windows, it uses TCP. The first call to 'get' will bind a PULL socket to the URI. The first call to 'put' will connect a PUSH socket to the URI. """ def __init__(self, uri: Optional[str] = None, maxsize: int = 0): self.uri = uri or _generate_zmq_uri() self.maxsize = maxsize self._context: Optional[zmq.Context] = None self._push_socket: Optional[zmq.Socket] = None self._pull_socket: Optional[zmq.Socket] = None self._pid = os.getpid() def __getstate__(self): # Only pickle the URI and maxsize. Sockets and contexts cannot be pickled. return { "uri": self.uri, "maxsize": self.maxsize, } def __setstate__(self, state): self.uri = state["uri"] self.maxsize = state["maxsize"] self._context = None self._push_socket = None self._pull_socket = None self._pid = os.getpid() def _get_context(self) -> zmq.Context: if self._context is None or self._pid != os.getpid(): self._context = zmq.Context.instance() self._pid = os.getpid() return self._context def _get_push_socket(self) -> zmq.Socket: if self._push_socket is None or self._pid != os.getpid(): ctx = self._get_context() socket = ctx.socket(zmq.PUSH) # Set a high water mark if maxsize is specified if self.maxsize > 0: socket.set(zmq.SNDHWM, self.maxsize) socket.connect(self.uri) self._push_socket = socket assert self._push_socket is not None return self._push_socket def _get_pull_socket(self) -> zmq.Socket: if self._pull_socket is None or self._pid != os.getpid(): ctx = self._get_context() socket = ctx.socket(zmq.PULL) if self.maxsize > 0: socket.set(zmq.RCVHWM, self.maxsize) try: socket.bind(self.uri) except zmq.ZMQError as e: if e.errno == zmq.EADDRINUSE: # Someone else already bound to this URI (probably another consumer) # For a simple Queue substitute, we should probably allow multiple consumers # but ZMQ PULL doesn't naturally support multiple binders. # However, if we're substituting Manager().Queue(), usually there's one consumer. socket.connect(self.uri) else: raise self._pull_socket = socket assert self._pull_socket is not None return self._pull_socket def put(self, obj: Any, block: bool = True, timeout: Optional[float] = None): socket = self._get_push_socket() flags = 0 if block else zmq.NOBLOCK try: buffers = [] # Use Protocol 5 to collect out-of-band buffers for zero-copy header = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append) # Combine header and buffers into a multipart message payload = [header] + [zmq.Frame(b) for b in buffers] if timeout is not None and block: if socket.poll(int(timeout * 1000), zmq.POLLOUT): socket.send_multipart(payload, flags) else: raise Full else: socket.send_multipart(payload, flags) except zmq.Again: raise Full def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: socket = self._get_pull_socket() try: if not block: payload = socket.recv_multipart(zmq.NOBLOCK) elif timeout is not None: if socket.poll(int(timeout * 1000), zmq.POLLIN): payload = socket.recv_multipart() else: raise Empty else: payload = socket.recv_multipart() header = payload[0] buffers = payload[1:] return pickle.loads(header, buffers=buffers) except (zmq.Again, IndexError): raise Empty def put_nowait(self, obj: Any): return self.put(obj, block=False) def get_nowait(self) -> Any: return self.get(block=False) def qsize(self) -> int: # ZMQ doesn't have a cross-platform way to get queue size from sockets easily # Returning 0 as a placeholder since supervisor uses it for stats on non-Mac return 0 def empty(self) -> bool: socket = self._get_pull_socket() return not socket.poll(0, zmq.POLLIN) def close(self): if self._push_socket: self._push_socket.close() self._push_socket = None if self._pull_socket: self._pull_socket.close() self._pull_socket = None # Note: we don't destroy the URI/IPC file here as other processes might be using it. # Ideally, there should be a cleanup mechanism. def __del__(self): try: self.close() except Exception: pass def __repr__(self): return f"ZMQQueue(uri='{self.uri}', maxsize={self.maxsize})"A high-performance multiprocess queue using ZeroMQ.
This class mimics the queue. Queue interface but works across process boundaries without requiring inheritance. It uses ZMQ PUSH/PULL sockets.
On Unix-like systems, it uses IPC; on Windows, it uses TCP.
The first call to 'get' will bind a PULL socket to the URI. The first call to 'put' will connect a PUSH socket to the URI.
Methods
def close(self)-
Expand source code
def close(self): if self._push_socket: self._push_socket.close() self._push_socket = None if self._pull_socket: self._pull_socket.close() self._pull_socket = None # Note: we don't destroy the URI/IPC file here as other processes might be using it. # Ideally, there should be a cleanup mechanism. def empty(self) ‑> bool-
Expand source code
def empty(self) -> bool: socket = self._get_pull_socket() return not socket.poll(0, zmq.POLLIN) def get(self, block: bool = True, timeout: float | None = None) ‑> Any-
Expand source code
def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: socket = self._get_pull_socket() try: if not block: payload = socket.recv_multipart(zmq.NOBLOCK) elif timeout is not None: if socket.poll(int(timeout * 1000), zmq.POLLIN): payload = socket.recv_multipart() else: raise Empty else: payload = socket.recv_multipart() header = payload[0] buffers = payload[1:] return pickle.loads(header, buffers=buffers) except (zmq.Again, IndexError): raise Empty def get_nowait(self) ‑> Any-
Expand source code
def get_nowait(self) -> Any: return self.get(block=False) def put(self, obj: Any, block: bool = True, timeout: float | None = None)-
Expand source code
def put(self, obj: Any, block: bool = True, timeout: Optional[float] = None): socket = self._get_push_socket() flags = 0 if block else zmq.NOBLOCK try: buffers = [] # Use Protocol 5 to collect out-of-band buffers for zero-copy header = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append) # Combine header and buffers into a multipart message payload = [header] + [zmq.Frame(b) for b in buffers] if timeout is not None and block: if socket.poll(int(timeout * 1000), zmq.POLLOUT): socket.send_multipart(payload, flags) else: raise Full else: socket.send_multipart(payload, flags) except zmq.Again: raise Full def put_nowait(self, obj: Any)-
Expand source code
def put_nowait(self, obj: Any): return self.put(obj, block=False) def qsize(self) ‑> int-
Expand source code
def qsize(self) -> int: # ZMQ doesn't have a cross-platform way to get queue size from sockets easily # Returning 0 as a placeholder since supervisor uses it for stats on non-Mac return 0