Module enrgdaq.utils.shared_ring_buffer

Shared Memory Ring Buffer for zero-copy inter-process communication.

This module provides a pre-allocated shared memory circular buffer with slot-based allocation, designed for high-throughput data transfer between processes with minimal memory copies.

Functions

def attach_to_ring_buffer(name: str = 'enrgdaq_ring_buffer',
total_size: int = 268435456,
slot_size: int = 10485760) ‑> SharedMemoryRingBuffer
Expand source code
def attach_to_ring_buffer(
    name: str = "enrgdaq_ring_buffer",
    total_size: int = DEFAULT_RING_BUFFER_SIZE,
    slot_size: int = DEFAULT_SLOT_SIZE,
) -> SharedMemoryRingBuffer:
    """
    Attach to an existing ring buffer (for use in child processes).

    Args:
        name: Name of the shared memory region.
        total_size: Total buffer size.
        slot_size: Size per slot.

    Returns:
        A SharedMemoryRingBuffer attached to the existing shared memory.
    """
    return SharedMemoryRingBuffer(
        name=name,
        total_size=total_size,
        slot_size=slot_size,
        create=False,
    )

Attach to an existing ring buffer (for use in child processes).

Args

name
Name of the shared memory region.
total_size
Total buffer size.
slot_size
Size per slot.

Returns

A SharedMemoryRingBuffer attached to the existing shared memory.

def cleanup_global_ring_buffer()
Expand source code
def cleanup_global_ring_buffer():
    """Clean up the global ring buffer. Call at supervisor shutdown."""
    global _global_ring_buffer

    if _global_ring_buffer is not None:
        with _global_ring_buffer_lock:
            if _global_ring_buffer is not None:
                _global_ring_buffer.cleanup()
                _global_ring_buffer = None

Clean up the global ring buffer. Call at supervisor shutdown.

def get_global_ring_buffer(name: str = 'enrgdaq_ring_buffer',
total_size: int = 268435456,
slot_size: int = 10485760) ‑> SharedMemoryRingBuffer
Expand source code
def get_global_ring_buffer(
    name: str = "enrgdaq_ring_buffer",
    total_size: int = DEFAULT_RING_BUFFER_SIZE,
    slot_size: int = DEFAULT_SLOT_SIZE,
) -> SharedMemoryRingBuffer:
    """
    Get or create the global ring buffer singleton.

    This is lazily initialized on first access.

    Args:
        name: Name for the shared memory region.
        total_size: Total buffer size (only used on first call).
        slot_size: Size per slot (only used on first call).

    Returns:
        The global SharedMemoryRingBuffer instance.
    """
    global _global_ring_buffer

    if _global_ring_buffer is None:
        with _global_ring_buffer_lock:
            if _global_ring_buffer is None:
                _global_ring_buffer = SharedMemoryRingBuffer(
                    name=name,
                    total_size=total_size,
                    slot_size=slot_size,
                    create=True,
                )

    return _global_ring_buffer

Get or create the global ring buffer singleton.

This is lazily initialized on first access.

Args

name
Name for the shared memory region.
total_size
Total buffer size (only used on first call).
slot_size
Size per slot (only used on first call).

Returns

The global SharedMemoryRingBuffer instance.

def get_global_ring_buffer_stats() ‑> tuple[int, int]
Expand source code
def get_global_ring_buffer_stats() -> tuple[int, int]:
    """Get stats from the global ring buffer (bytes_written, bytes_read)."""
    global _global_ring_buffer
    if _global_ring_buffer is not None:
        return _global_ring_buffer.get_stats()
    return (0, 0)

Get stats from the global ring buffer (bytes_written, bytes_read).

Classes

class RingBufferSlot (buffer: SharedMemoryRingBuffer,
index: int,
offset: int,
max_size: int)
Expand source code
@dataclass
class RingBufferSlot:
    """Handle to a slot in the ring buffer."""

    buffer: "SharedMemoryRingBuffer"
    index: int
    offset: int
    max_size: int
    _memoryview: Optional[memoryview] = None

    @property
    def view(self) -> memoryview:
        """Get a memoryview into this slot for reading/writing."""
        if self._memoryview is None:
            self._memoryview = self.buffer.get_slot_view(self.index)
        return self._memoryview

    def release(self):
        """Release this slot back to the buffer pool."""
        if self._memoryview is not None:
            self._memoryview.release()
            self._memoryview = None
        self.buffer.release(self.index)

Handle to a slot in the ring buffer.

Instance variables

var bufferSharedMemoryRingBuffer
var index : int
var max_size : int
var offset : int
prop view : memoryview
Expand source code
@property
def view(self) -> memoryview:
    """Get a memoryview into this slot for reading/writing."""
    if self._memoryview is None:
        self._memoryview = self.buffer.get_slot_view(self.index)
    return self._memoryview

Get a memoryview into this slot for reading/writing.

Methods

def release(self)
Expand source code
def release(self):
    """Release this slot back to the buffer pool."""
    if self._memoryview is not None:
        self._memoryview.release()
        self._memoryview = None
    self.buffer.release(self.index)

Release this slot back to the buffer pool.

class SharedMemoryRingBuffer (name: str,
total_size: int = 268435456,
slot_size: int = 10485760,
create: bool = True)
Expand source code
class SharedMemoryRingBuffer:
    """
    A pre-allocated shared memory ring buffer with fixed-size slots.

    This buffer divides a large shared memory region into fixed-size slots
    that can be allocated and released for zero-copy data transfer between
    processes.

    The buffer is designed for single-producer, single-consumer scenarios
    but supports multiple consumers through reference counting.
    """

    def __init__(
        self,
        name: str,
        total_size: int = DEFAULT_RING_BUFFER_SIZE,
        slot_size: int = DEFAULT_SLOT_SIZE,
        create: bool = True,
    ):
        """
        Initialize or attach to a shared memory ring buffer.

        Args:
            name: Unique name for the shared memory region.
            total_size: Total size of the buffer in bytes.
            slot_size: Size of each slot in bytes.
            create: If True, create the buffer. If False, attach to existing.
        """
        self.name = name
        self.slot_size = slot_size
        self.slot_count = total_size // slot_size
        self.total_size = self.slot_count * slot_size  # Align to slot boundaries

        self._create = create
        self._shm: Optional[SharedMemory] = None
        self._slot_states: Any = None  # RawArray[ctypes.c_int]
        self._write_index: Any = None  # RawValue[ctypes.c_int]
        self._bytes_written: Any = None  # RawValue[ctypes.c_longlong]
        self._bytes_read: Any = None  # RawValue[ctypes.c_longlong]
        self._lock = threading.Lock()
        self._initialized = False

    def _ensure_initialized(self):
        """Lazy initialization of shared memory resources."""
        if self._initialized:
            return

        with self._lock:
            if self._initialized:
                return

            try:
                if self._create:
                    # Create new shared memory
                    try:
                        self._shm = SharedMemory(
                            name=self.name, create=True, size=self.total_size
                        )
                        logger.info(
                            f"Created shared memory ring buffer '{self.name}' "
                            f"with {self.slot_count} slots of {self.slot_size} bytes each"
                        )
                    except FileExistsError:
                        # Already exists, attach to it
                        self._shm = SharedMemory(name=self.name, create=False)
                        logger.info(
                            f"Attached to existing shared memory ring buffer '{self.name}'"
                        )
                else:
                    # Attach to existing shared memory
                    self._shm = SharedMemory(name=self.name, create=False)

                # Slot states array: one int per slot for reference counting
                # Using ctypes shared array for cross-process access
                self._slot_states = RawArray(ctypes.c_int, self.slot_count)

                # Write index for round-robin allocation
                self._write_index = RawValue(ctypes.c_int, 0)

                # Byte counters for throughput measurement
                self._bytes_written = RawValue(ctypes.c_longlong, 0)
                self._bytes_read = RawValue(ctypes.c_longlong, 0)

                self._initialized = True

            except Exception as e:
                logger.error(f"Failed to initialize ring buffer: {e}")
                raise

    def allocate(self, required_size: int) -> Optional[RingBufferSlot]:
        """
        Allocate a slot that can hold the required size.

        Args:
            required_size: Minimum size needed in bytes.

        Returns:
            RingBufferSlot if allocation successful, None if no slots available
            or size too large.
        """
        self._ensure_initialized()

        if required_size > self.slot_size:
            logger.warning(
                f"Requested size {required_size} exceeds slot size {self.slot_size}"
            )
            return None

        assert self._slot_states is not None
        assert self._write_index is not None

        # Try to find a free slot using round-robin starting from write_index
        start_index = self._write_index.value
        for i in range(self.slot_count):
            slot_index = (start_index + i) % self.slot_count

            # Try to atomically set slot from FREE to WRITING
            if self._slot_states[slot_index] == SLOT_FREE:
                # Simple compare-and-swap using the GIL
                # For truly lock-free, we'd need atomics from a C extension
                with self._lock:
                    if self._slot_states[slot_index] == SLOT_FREE:
                        self._slot_states[slot_index] = SLOT_WRITING
                        self._write_index.value = (slot_index + 1) % self.slot_count

                        return RingBufferSlot(
                            buffer=self,
                            index=slot_index,
                            offset=slot_index * self.slot_size,
                            max_size=self.slot_size,
                        )

        # No free slots - overwrite oldest READY slot (true ring buffer behavior)
        # This drops old data if consumer can't keep up
        for i in range(self.slot_count):
            slot_index = (start_index + i) % self.slot_count

            if self._slot_states[slot_index] == SLOT_READY:
                with self._lock:
                    # Only overwrite if still READY (not being read)
                    if self._slot_states[slot_index] == SLOT_READY:
                        self._slot_states[slot_index] = SLOT_WRITING
                        self._write_index.value = (slot_index + 1) % self.slot_count
                        # logger.debug(
                        #    f"Overwriting slot {slot_index} (consumer too slow)"
                        # )

                        return RingBufferSlot(
                            buffer=self,
                            index=slot_index,
                            offset=slot_index * self.slot_size,
                            max_size=self.slot_size,
                        )

        # All slots are either WRITING or READING - can't allocate
        logger.warning("No free or overwritable slots available in ring buffer")
        return None

    def mark_ready(self, slot_index: int, bytes_written: int = 0):
        """Mark a slot as ready for reading (after writing is complete)."""
        self._ensure_initialized()
        assert self._slot_states is not None

        if self._slot_states[slot_index] == SLOT_WRITING:
            self._slot_states[slot_index] = SLOT_READY
            if bytes_written > 0 and self._bytes_written is not None:
                self._bytes_written.value += bytes_written

    def acquire_for_read(self, slot_index: int, bytes_to_read: int = 0) -> bool:
        """
        Acquire a slot for reading (increment reference count).

        Returns True if successful, False if slot is not ready.
        """
        self._ensure_initialized()
        assert self._slot_states is not None

        with self._lock:
            state = self._slot_states[slot_index]
            if state >= SLOT_READY:
                self._slot_states[slot_index] = state + 1
                if bytes_to_read > 0 and self._bytes_read is not None:
                    self._bytes_read.value += bytes_to_read
                return True
        return False

    def release(self, slot_index: int):
        """
        Release a slot (decrement reference count, free if zero).
        """
        self._ensure_initialized()
        assert self._slot_states is not None

        with self._lock:
            state = self._slot_states[slot_index]
            if state == SLOT_READY:
                # Last reference, free the slot
                self._slot_states[slot_index] = SLOT_FREE
            elif state > SLOT_READY:
                # Decrement reference count
                self._slot_states[slot_index] = state - 1
            elif state == SLOT_WRITING:
                # Writing was aborted, free the slot
                self._slot_states[slot_index] = SLOT_FREE

    def get_stats(self) -> tuple[int, int]:
        """Get (bytes_written, bytes_read) stats."""
        self._ensure_initialized()
        return (
            self._bytes_written.value if self._bytes_written else 0,
            self._bytes_read.value if self._bytes_read else 0,
        )

    def get_slot_view(self, slot_index: int) -> memoryview:
        """Get a memoryview into a specific slot."""
        self._ensure_initialized()
        assert self._shm is not None

        offset = slot_index * self.slot_size
        buf = self._shm.buf
        assert buf is not None, "Shared memory buffer is None"
        return memoryview(buf)[offset : offset + self.slot_size]

    def get_slot_address(self, slot_index: int) -> int:
        """Get the memory address of a slot (for pa.foreign_buffer)."""
        self._ensure_initialized()
        assert self._shm is not None

        # Get base address of the shared memory buffer
        buf = self._shm.buf
        assert buf is not None, "Shared memory buffer is None"
        offset = slot_index * self.slot_size

        # Create a ctypes pointer to get the address
        c_buf = (ctypes.c_char * len(buf)).from_buffer(buf)
        base_addr = ctypes.addressof(c_buf)
        return base_addr + offset

    def cleanup(self):
        """Clean up resources. Only call from the creating process at shutdown."""
        if self._shm is not None:
            try:
                self._shm.close()
                if self._create:
                    self._shm.unlink()
                    logger.info(f"Unlinked shared memory ring buffer '{self.name}'")
            except Exception as e:
                logger.warning(f"Error during ring buffer cleanup: {e}")
            self._shm = None

        self._initialized = False

    def __del__(self):
        try:
            if self._shm is not None:
                self._shm.close()
        except Exception:
            pass

A pre-allocated shared memory ring buffer with fixed-size slots.

This buffer divides a large shared memory region into fixed-size slots that can be allocated and released for zero-copy data transfer between processes.

The buffer is designed for single-producer, single-consumer scenarios but supports multiple consumers through reference counting.

Initialize or attach to a shared memory ring buffer.

Args

name
Unique name for the shared memory region.
total_size
Total size of the buffer in bytes.
slot_size
Size of each slot in bytes.
create
If True, create the buffer. If False, attach to existing.

Methods

def acquire_for_read(self, slot_index: int, bytes_to_read: int = 0) ‑> bool
Expand source code
def acquire_for_read(self, slot_index: int, bytes_to_read: int = 0) -> bool:
    """
    Acquire a slot for reading (increment reference count).

    Returns True if successful, False if slot is not ready.
    """
    self._ensure_initialized()
    assert self._slot_states is not None

    with self._lock:
        state = self._slot_states[slot_index]
        if state >= SLOT_READY:
            self._slot_states[slot_index] = state + 1
            if bytes_to_read > 0 and self._bytes_read is not None:
                self._bytes_read.value += bytes_to_read
            return True
    return False

Acquire a slot for reading (increment reference count).

Returns True if successful, False if slot is not ready.

def allocate(self, required_size: int) ‑> RingBufferSlot | None
Expand source code
def allocate(self, required_size: int) -> Optional[RingBufferSlot]:
    """
    Allocate a slot that can hold the required size.

    Args:
        required_size: Minimum size needed in bytes.

    Returns:
        RingBufferSlot if allocation successful, None if no slots available
        or size too large.
    """
    self._ensure_initialized()

    if required_size > self.slot_size:
        logger.warning(
            f"Requested size {required_size} exceeds slot size {self.slot_size}"
        )
        return None

    assert self._slot_states is not None
    assert self._write_index is not None

    # Try to find a free slot using round-robin starting from write_index
    start_index = self._write_index.value
    for i in range(self.slot_count):
        slot_index = (start_index + i) % self.slot_count

        # Try to atomically set slot from FREE to WRITING
        if self._slot_states[slot_index] == SLOT_FREE:
            # Simple compare-and-swap using the GIL
            # For truly lock-free, we'd need atomics from a C extension
            with self._lock:
                if self._slot_states[slot_index] == SLOT_FREE:
                    self._slot_states[slot_index] = SLOT_WRITING
                    self._write_index.value = (slot_index + 1) % self.slot_count

                    return RingBufferSlot(
                        buffer=self,
                        index=slot_index,
                        offset=slot_index * self.slot_size,
                        max_size=self.slot_size,
                    )

    # No free slots - overwrite oldest READY slot (true ring buffer behavior)
    # This drops old data if consumer can't keep up
    for i in range(self.slot_count):
        slot_index = (start_index + i) % self.slot_count

        if self._slot_states[slot_index] == SLOT_READY:
            with self._lock:
                # Only overwrite if still READY (not being read)
                if self._slot_states[slot_index] == SLOT_READY:
                    self._slot_states[slot_index] = SLOT_WRITING
                    self._write_index.value = (slot_index + 1) % self.slot_count
                    # logger.debug(
                    #    f"Overwriting slot {slot_index} (consumer too slow)"
                    # )

                    return RingBufferSlot(
                        buffer=self,
                        index=slot_index,
                        offset=slot_index * self.slot_size,
                        max_size=self.slot_size,
                    )

    # All slots are either WRITING or READING - can't allocate
    logger.warning("No free or overwritable slots available in ring buffer")
    return None

Allocate a slot that can hold the required size.

Args

required_size
Minimum size needed in bytes.

Returns

RingBufferSlot if allocation successful, None if no slots available or size too large.

def cleanup(self)
Expand source code
def cleanup(self):
    """Clean up resources. Only call from the creating process at shutdown."""
    if self._shm is not None:
        try:
            self._shm.close()
            if self._create:
                self._shm.unlink()
                logger.info(f"Unlinked shared memory ring buffer '{self.name}'")
        except Exception as e:
            logger.warning(f"Error during ring buffer cleanup: {e}")
        self._shm = None

    self._initialized = False

Clean up resources. Only call from the creating process at shutdown.

def get_slot_address(self, slot_index: int) ‑> int
Expand source code
def get_slot_address(self, slot_index: int) -> int:
    """Get the memory address of a slot (for pa.foreign_buffer)."""
    self._ensure_initialized()
    assert self._shm is not None

    # Get base address of the shared memory buffer
    buf = self._shm.buf
    assert buf is not None, "Shared memory buffer is None"
    offset = slot_index * self.slot_size

    # Create a ctypes pointer to get the address
    c_buf = (ctypes.c_char * len(buf)).from_buffer(buf)
    base_addr = ctypes.addressof(c_buf)
    return base_addr + offset

Get the memory address of a slot (for pa.foreign_buffer).

def get_slot_view(self, slot_index: int) ‑> memoryview
Expand source code
def get_slot_view(self, slot_index: int) -> memoryview:
    """Get a memoryview into a specific slot."""
    self._ensure_initialized()
    assert self._shm is not None

    offset = slot_index * self.slot_size
    buf = self._shm.buf
    assert buf is not None, "Shared memory buffer is None"
    return memoryview(buf)[offset : offset + self.slot_size]

Get a memoryview into a specific slot.

def get_stats(self) ‑> tuple[int, int]
Expand source code
def get_stats(self) -> tuple[int, int]:
    """Get (bytes_written, bytes_read) stats."""
    self._ensure_initialized()
    return (
        self._bytes_written.value if self._bytes_written else 0,
        self._bytes_read.value if self._bytes_read else 0,
    )

Get (bytes_written, bytes_read) stats.

def mark_ready(self, slot_index: int, bytes_written: int = 0)
Expand source code
def mark_ready(self, slot_index: int, bytes_written: int = 0):
    """Mark a slot as ready for reading (after writing is complete)."""
    self._ensure_initialized()
    assert self._slot_states is not None

    if self._slot_states[slot_index] == SLOT_WRITING:
        self._slot_states[slot_index] = SLOT_READY
        if bytes_written > 0 and self._bytes_written is not None:
            self._bytes_written.value += bytes_written

Mark a slot as ready for reading (after writing is complete).

def release(self, slot_index: int)
Expand source code
def release(self, slot_index: int):
    """
    Release a slot (decrement reference count, free if zero).
    """
    self._ensure_initialized()
    assert self._slot_states is not None

    with self._lock:
        state = self._slot_states[slot_index]
        if state == SLOT_READY:
            # Last reference, free the slot
            self._slot_states[slot_index] = SLOT_FREE
        elif state > SLOT_READY:
            # Decrement reference count
            self._slot_states[slot_index] = state - 1
        elif state == SLOT_WRITING:
            # Writing was aborted, free the slot
            self._slot_states[slot_index] = SLOT_FREE

Release a slot (decrement reference count, free if zero).