Module enrgdaq.daq.models

Classes

class DAQJobConfig (*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
class DAQJobConfig(Struct, kw_only=True):
    """
    DAQJobConfig is the base configuration class for DAQJobs.
    Attributes:
        verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO.
        daq_job_type (str): The type of the DAQ job.
        daq_job_unique_id (str): The unique identifier for the DAQ job.
        use_shm_when_possible (bool): Whether to use shared memory when possible. Shared memory is guaranteed never to be used when False; when True it is used opportunistically but not guaranteed.
    """

    daq_job_type: str
    verbosity: LogVerbosity = LogVerbosity.INFO
    daq_job_unique_id: str | None = None
    use_shm_when_possible: bool = True

DAQJobConfig is the base configuration class for DAQJobs.

Attributes

verbosity : LogVerbosity
The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type : str
The type of the DAQ job.
daq_job_unique_id : str
The unique identifier for the DAQ job.
use_shm_when_possible : bool
Whether to use shared memory when possible. Shared memory is guaranteed never to be used when this is False; when True, it is used opportunistically but not guaranteed.
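
A minimal sketch of how a job-specific config might be declared and validated; MyJobConfig and its port field are hypothetical, and msgspec.convert is used here only as one plausible way to validate a plain dictionary against the struct:

from msgspec import convert

from enrgdaq.daq.models import DAQJobConfig


class MyJobConfig(DAQJobConfig):
    # Hypothetical extra field for illustration only.
    port: int = 5555


raw = {"daq_job_type": "my_job", "port": 6000}
config = convert(raw, type=MyJobConfig)
assert config.use_shm_when_possible  # unset fields keep their defaults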

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var daq_job_type : str
var daq_job_unique_id : str | None
var use_shm_when_possible : bool
var verbosity : LogVerbosity
class DAQJobInfo (daq_job_type: str,
unique_id: str,
instance_id: int,
config: str,
subscribed_topics: list[str] = <factory>,
supervisor_info: SupervisorInfo | None = None)
@dataclass
class DAQJobInfo:
    """
    A class to represent the information of a DAQJob.
    Attributes:
        daq_job_type : str
            The type of the DAQ job.
        unique_id : str
            A unique identifier for the DAQ job.
        instance_id : int
            An instance identifier for the DAQ job.
        supervisor_info : Optional[SupervisorInfo]
            Information about the supervisor, if any.
    """

    daq_job_type: str  # has type(self).__name__
    unique_id: str
    instance_id: int
    config: str
    subscribed_topics: list[str] = field(default_factory=list)
    supervisor_info: Optional[SupervisorInfo] = None

    @staticmethod
    def mock() -> "DAQJobInfo":
        return DAQJobInfo(
            daq_job_type="mock",
            unique_id="mock",
            instance_id=0,
            supervisor_info=SupervisorInfo(supervisor_id="mock"),
            config="",
        )

A class to represent the information of a DAQJob.

Attributes

daq_job_type : str
The type of the DAQ job.
unique_id : str
A unique identifier for the DAQ job.
instance_id : int
An instance identifier for the DAQ job.
supervisor_info : Optional[SupervisorInfo]
Information about the supervisor, if any.

Static methods

def mock() ‑> DAQJobInfo

Instance variables

var config : str
var daq_job_type : str
var instance_id : int
var subscribed_topics : list[str]
var supervisor_info : SupervisorInfo | None
var unique_id : str
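
A short usage note: mock() returns a fully populated placeholder, which is convenient in tests. The assertions below follow directly from the source shown above.

from enrgdaq.daq.models import DAQJobInfo

info = DAQJobInfo.mock()
assert info.daq_job_type == "mock"
assert info.instance_id == 0
assert info.supervisor_info is not None
assert info.supervisor_info.supervisor_id == "mock"
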
class DAQJobLatencyStats (count: int = 0,
min_ms: float = 0.0,
max_ms: float = 0.0,
avg_ms: float = 0.0,
p95_ms: float = 0.0,
p99_ms: float = 0.0)
class DAQJobLatencyStats(Struct):
    """
    Statistics for message processing latency.
    Measurements are in milliseconds.
    """

    count: int = 0
    min_ms: float = 0.0
    max_ms: float = 0.0
    avg_ms: float = 0.0
    p95_ms: float = 0.0
    p99_ms: float = 0.0

Statistics for message processing latency. Measurements are in milliseconds.
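
The module defines only the container; how it is populated is not shown here. A plausible sketch, assuming raw per-message latencies in milliseconds and a simple nearest-rank percentile (summarize_latencies is a hypothetical helper):

import statistics

from enrgdaq.daq.models import DAQJobLatencyStats


def summarize_latencies(samples_ms: list[float]) -> DAQJobLatencyStats:
    # Hypothetical helper: fold raw samples into the stats container.
    if not samples_ms:
        return DAQJobLatencyStats()
    ordered = sorted(samples_ms)

    def pct(p: float) -> float:
        # Nearest-rank percentile over the sorted samples.
        return ordered[round(p * (len(ordered) - 1))]

    return DAQJobLatencyStats(
        count=len(ordered),
        min_ms=ordered[0],
        max_ms=ordered[-1],
        avg_ms=statistics.fmean(ordered),
        p95_ms=pct(0.95),
        p99_ms=pct(0.99),
    )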

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var avg_ms : float
var count : int
var max_ms : float
var min_ms : float
var p95_ms : float
var p99_ms : float
class DAQJobMessage (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)
class DAQJobMessage(Struct, kw_only=True):
    """
    DAQJobMessage is the base class for messages sent between DAQJobs.
    Attributes:
        id (Optional[str]): The unique identifier for the message. Defaults to a UUID.
        timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime.
        is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False.
        daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None.
    """

    id: str | None = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime | None = field(default_factory=datetime.now)
    is_remote: bool = False
    daq_job_info: "DAQJobInfo | None" = None
    topics: set[RouteKey] = field(default_factory=set)

    @property
    def supervisor_id(self) -> str:
        if self.daq_job_info is None or self.daq_job_info.supervisor_info is None:
            return "unknown"

        return self.daq_job_info.supervisor_info.supervisor_id

    def pre_send(self):
        pass

DAQJobMessage is the base class for messages sent between DAQJobs.

Attributes

id : Optional[str]
The unique identifier for the message. Defaults to a UUID.
timestamp : Optional[datetime]
The timestamp for the message. Defaults to the current datetime.
is_remote : bool
Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info : Optional[DAQJobInfo]
The information about the DAQJob that sent the message. Defaults to None.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Subclasses

  • DAQJobMessageHeartbeat
  • DAQJobMessageSHM
  • DAQJobMessageStatsRemote
  • DAQJobMessageStop
  • InternalDAQJobMessage

Instance variables

var daq_job_info : DAQJobInfo | None
var id : str | None
var is_remote : bool
prop supervisor_id : str
var timestamp : datetime.datetime | None
var topics : set[str]

Methods

def pre_send(self)
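
A hedged sketch of a custom message type; the payload field and the "example" topic are invented for illustration and are not part of this module:

from enrgdaq.daq.models import DAQJobMessage


class DAQJobMessageExample(DAQJobMessage):
    # Hypothetical payload for illustration only.
    payload: bytes

    def pre_send(self):
        # pre_send runs just before the message is sent; a subclass can
        # use it to stamp routing topics, as InternalDAQJobMessage does.
        self.topics.add("example")


msg = DAQJobMessageExample(payload=b"\x00\x01")
msg.pre_send()
assert "example" in msg.topics
assert msg.supervisor_id == "unknown"  # no daq_job_info attached
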
class DAQJobMessageHeartbeat (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)
class DAQJobMessageHeartbeat(DAQJobMessage):
    pass

Inherits DAQJobMessage without adding fields; serves as a periodic liveness signal.

Ancestors

  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

class DAQJobMessageJobStarted (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)
class DAQJobMessageJobStarted(InternalDAQJobMessage):
    """
    DAQJobMessageJobStarted is sent when a DAQJob starts, primarily used for
    setting DAQJobInfo of the DAQJobProcess. Also signals the process started
    without a problem.
    """

    pass

DAQJobMessageJobStarted is sent when a DAQJob starts; it is primarily used to set the DAQJobInfo of the DAQJobProcess, and it also signals that the process started without a problem.

Ancestors

  • InternalDAQJobMessage
  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

class DAQJobMessageRoutes (routes: dict[str, list[typing.Any]],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)
class DAQJobMessageRoutes(InternalDAQJobMessage):
    """
    DAQJobMessageRoutes is sent by the supervisor to the DAQJobProcess to
    set the routes for the DAQJobProcess.

    Attributes:
        routes (RouteMapping): route_key to queue mapping.
    """

    routes: RouteMapping
    target_supervisor = False

DAQJobMessageRoutes is sent by the supervisor to the DAQJobProcess to set the routes for the DAQJobProcess.

Attributes

routes : RouteMapping
route_key to queue mapping.
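
RouteMapping renders as dict[str, list[typing.Any]], i.e. a mapping from route key to the queues that should receive matching messages. A hedged construction sketch, with a plain Queue standing in for whatever queue type the supervisor actually uses, and a placeholder route key:

from queue import Queue

from enrgdaq.daq.models import DAQJobMessageRoutes

out_queue: Queue = Queue()
msg = DAQJobMessageRoutes(routes={"stats/supervisor-1": [out_queue]})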

Ancestors

  • InternalDAQJobMessage
  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Class variables

var target_supervisor : bool

Instance variables

var routes : dict[str, list[typing.Any]]
class DAQJobMessageSHM (shm: SHMHandle,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)
class DAQJobMessageSHM(DAQJobMessage):
    shm: SHMHandle

Carries an SHMHandle referencing data in shared memory; otherwise inherits DAQJobMessage unchanged.

Ancestors

  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var shm : SHMHandle
class DAQJobMessageStatsRemote (stats: dict[str, SupervisorRemoteStats],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)
class DAQJobMessageStatsRemote(DAQJobMessage):
    """Message class containing remote statistics."""

    stats: "DAQJobRemoteStatsDict"

Message class containing remote statistics.

Ancestors

  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var stats : dict[str, SupervisorRemoteStats]
class DAQJobMessageStatsReport (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True,
processed_count: int,
processed_bytes: int = 0,
sent_count: int,
sent_bytes: int = 0,
latency: DAQJobLatencyStats)
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True):
    """
    Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor.
    """

    target_supervisor: bool = True
    processed_count: int
    processed_bytes: int = 0
    sent_count: int
    sent_bytes: int = 0
    latency: DAQJobLatencyStats

    @override
    def pre_send(self):
        super().pre_send()

        self.topics.add(Topic.stats(self.supervisor_id))

Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor.

Ancestors

  • InternalDAQJobMessage
  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var latency : DAQJobLatencyStats
var processed_bytes : int
var processed_count : int
var sent_bytes : int
var sent_count : int
var target_supervisor : bool

Methods

def pre_send(self)
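
For illustration, a report assembled by hand; per the source above, pre_send adds the supervisor's stats topic (and, via InternalDAQJobMessage, the supervisor's internal topic) before the message goes out. The numbers are placeholders:

from enrgdaq.daq.models import DAQJobLatencyStats, DAQJobMessageStatsReport

report = DAQJobMessageStatsReport(
    processed_count=128,
    sent_count=120,
    latency=DAQJobLatencyStats(count=128, avg_ms=1.8, p95_ms=4.2, p99_ms=9.5),
)
report.pre_send()  # stamps the routing topics shown in the source
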
class DAQJobMessageStop (reason: str,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)
class DAQJobMessageStop(DAQJobMessage):
    reason: str

Signals a DAQJob to stop, carrying a human-readable reason; inherits DAQJobMessage.

Ancestors

  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var reason : str
class DAQJobMessageTraceEvent (message_id: str,
message_type: str,
event_type: str,
topics: list[str],
timestamp: datetime.datetime,
size_bytes: int,
source_job: str,
source_supervisor: str)
class DAQJobMessageTraceEvent(Struct):
    """Single trace event for a message."""

    message_id: str
    message_type: str
    event_type: str  # "sent" | "received"
    topics: list[str]
    timestamp: datetime
    size_bytes: int
    source_job: str  # daq_job_type
    source_supervisor: str  # supervisor_id

Single trace event for a message.
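
Field semantics follow the inline comments in the source; a constructed example with placeholder values throughout:

from datetime import datetime

from enrgdaq.daq.models import DAQJobMessageTraceEvent

event = DAQJobMessageTraceEvent(
    message_id="00000000-0000-0000-0000-000000000001",  # placeholder id
    message_type="DAQJobMessageHeartbeat",
    event_type="sent",  # "sent" | "received", per the source comment
    topics=["heartbeat"],  # placeholder topic list
    timestamp=datetime.now(),
    size_bytes=64,
    source_job="DAQJobExample",  # placeholder daq_job_type
    source_supervisor="supervisor-1",  # placeholder supervisor_id
)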

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var event_type : str
var message_id : str
var message_type : str
var size_bytes : int
var source_job : str
var source_supervisor : str
var timestamp : datetime.datetime
var topics : list[str]
class DAQJobMessageTraceReport (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True,
events: list[DAQJobMessageTraceEvent])
class DAQJobMessageTraceReport(InternalDAQJobMessage, kw_only=True):
    """Periodic trace report from a DAQJob."""

    target_supervisor: bool = True
    events: list[DAQJobMessageTraceEvent]

    @override
    def pre_send(self):
        super().pre_send()
        self.topics.add(Topic.traces(self.supervisor_id))

Periodic trace report from a DAQJob.

Ancestors

  • InternalDAQJobMessage
  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var events : list[DAQJobMessageTraceEvent]
var target_supervisor : bool

Methods

def pre_send(self)
class DAQJobResourceStats (cpu_percent: float = 0.0, rss_mb: float = 0.0)
class DAQJobResourceStats(Struct):
    """
    Statistics for process resource usage.
    """

    cpu_percent: float = 0.0
    rss_mb: float = 0.0

Statistics for process resource usage.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var cpu_percent : float
var rss_mb : float
class DAQJobStats (message_in_stats: DAQJobStatsRecord = <factory>,
message_out_stats: DAQJobStatsRecord = <factory>,
message_in_queue_stats: DAQJobStatsRecord = <factory>,
message_out_queue_stats: DAQJobStatsRecord = <factory>,
restart_stats: DAQJobStatsRecord = <factory>,
latency_stats: DAQJobLatencyStats = <factory>,
resource_stats: DAQJobResourceStats = <factory>,
is_alive: bool = True)
class DAQJobStats(Struct):
    """
    A class to represent statistics for a DAQJob. Gets created and updated by Supervisor.

    Attributes:
        message_in_stats (DAQJobStatsRecord): The statistics for incoming messages.
        message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages.
        restart_stats (DAQJobStatsRecord): The statistics for restarts.
        is_alive (bool): Whether the DAQJob is alive.
    """

    message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_out_queue_stats: DAQJobStatsRecord = field(
        default_factory=DAQJobStatsRecord
    )
    restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats)
    resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats)
    is_alive: bool = True

A class to represent statistics for a DAQJob. It is created and updated by the Supervisor.

Attributes

message_in_stats : DAQJobStatsRecord
The statistics for incoming messages.
message_out_stats : DAQJobStatsRecord
The statistics for outgoing messages.
message_in_queue_stats : DAQJobStatsRecord
The statistics for the incoming message queue.
message_out_queue_stats : DAQJobStatsRecord
The statistics for the outgoing message queue.
restart_stats : DAQJobStatsRecord
The statistics for restarts.
latency_stats : DAQJobLatencyStats
Message processing latency statistics.
resource_stats : DAQJobResourceStats
Process resource usage statistics.
is_alive : bool
Whether the DAQJob is alive.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var is_alive : bool
var latency_stats : DAQJobLatencyStats
var message_in_queue_stats : DAQJobStatsRecord
var message_in_stats : DAQJobStatsRecord
var message_out_queue_stats : DAQJobStatsRecord
var message_out_stats : DAQJobStatsRecord
var resource_stats : DAQJobResourceStats
var restart_stats : DAQJobStatsRecord
class DAQJobStatsRecord (count: int = 0, last_updated: datetime.datetime | None = None)
class DAQJobStatsRecord(Struct):
    """
    A class to represent a record of statistics for a DAQJob.

    Attributes:
        count (int): The number of times the DAQJob has been called.
        last_updated (Optional[datetime]): The last time the DAQJob was called.
    """

    count: int = 0
    last_updated: Optional[datetime] = None

    def increase(self, amount: int = 1):
        self.set(self.count + amount)

    def set(self, amount: int):
        self.count = amount
        self.last_updated = datetime.now()

A class to represent a record of statistics for a DAQJob.

Attributes

count : int
The number of times the DAQJob has been called.
last_updated : Optional[datetime]
The last time the DAQJob was called.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var count : int
var last_updated : datetime.datetime | None

Methods

def increase(self, amount: int = 1)
def set(self, amount: int)
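
A short usage sketch following the source above; set() always refreshes last_updated, and increase() is implemented in terms of set():

from enrgdaq.daq.models import DAQJobStatsRecord

record = DAQJobStatsRecord()
record.increase()          # count == 1, last_updated stamped
record.increase(amount=4)  # count == 5
record.set(0)              # counter reset, timestamp refreshed again
assert record.count == 0 and record.last_updated is not None
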
class DAQJobStopError (reason: str)
class DAQJobStopError(Exception):
    def __init__(self, reason: str):
        self.reason = reason

Exception raised to stop a DAQJob, carrying the reason for the stop.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class InternalDAQJobMessage (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)
class InternalDAQJobMessage(DAQJobMessage, kw_only=True):
    target_supervisor: bool = True

    @override
    def pre_send(self):
        if self.target_supervisor:
            from enrgdaq.daq.topics import Topic

            self.topics.add(Topic.supervisor_internal(self.supervisor_id))

A DAQJobMessage exchanged between a DAQJob and its own supervisor. When target_supervisor is True (the default), pre_send adds the supervisor's internal topic so that the message is routed to the supervisor.

Ancestors

  • DAQJobMessage
  • msgspec.Struct
  • msgspec._core._StructMixin

Subclasses

  • DAQJobMessageJobStarted
  • DAQJobMessageRoutes
  • DAQJobMessageStatsReport
  • DAQJobMessageTraceReport

Instance variables

var target_supervisor : bool

Methods

def pre_send(self)
class RingBufferHandle (buffer_name: str,
slot_index: int,
data_size: int,
is_pyarrow: bool = True,
total_size: int = 268435456,
slot_size: int = 10485760)
class RingBufferHandle(Struct):
    """
    Handle to data stored in a shared memory ring buffer slot.

    This is used for zero-copy transfer of PyArrow data between processes.
    The handle contains only metadata; the actual data remains in shared memory.
    """

    buffer_name: str  # Name of the SharedMemoryRingBuffer
    slot_index: int  # Slot index within the buffer
    data_size: int  # Actual size of data in the slot
    is_pyarrow: bool = True  # Whether this is PyArrow data (for zero-copy path)

    # Ring buffer configuration (needed to attach in other process)
    total_size: int = 256 * 1024 * 1024  # 256 MB default
    slot_size: int = 10 * 1024 * 1024  # 10 MB default

    def load_pyarrow(self):
        """
        Load PyArrow table from the ring buffer slot using zero-copy.

        Returns:
            pa.Table: The PyArrow table (references shared memory directly).
        """
        from enrgdaq.utils.arrow_ipc import read_table_from_address
        from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer

        ring_buffer = attach_to_ring_buffer(
            name=self.buffer_name,
            total_size=self.total_size,
            slot_size=self.slot_size,
        )

        # Acquire for reading (increment ref count) and track bytes
        ring_buffer.acquire_for_read(self.slot_index, self.data_size)

        # Get the address and read zero-copy
        address = ring_buffer.get_slot_address(self.slot_index)
        table = read_table_from_address(address, self.data_size, base=ring_buffer)

        return table

    def release(self):
        """Release the slot back to the ring buffer pool."""
        from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer

        ring_buffer = attach_to_ring_buffer(
            name=self.buffer_name,
            total_size=self.total_size,
            slot_size=self.slot_size,
        )
        ring_buffer.release(self.slot_index)

Handle to data stored in a shared memory ring buffer slot.

This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory.
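
A consumer-side sketch based on the methods above. Pairing load_pyarrow() with release() in a finally block is an assumption about correct usage rather than a rule stated by this module, and consume() itself is hypothetical:

from enrgdaq.daq.models import RingBufferHandle


def consume(handle: RingBufferHandle) -> int:
    # Read the table zero-copy, then return the slot to the pool.
    table = handle.load_pyarrow()
    try:
        return table.num_rows  # use the data while the slot is held
    finally:
        handle.release()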

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var buffer_name : str
var data_size : int
var is_pyarrow : bool
var slot_index : int
var slot_size : int
Size of each slot in bytes; defaults to 10 MB. Part of the ring buffer configuration needed to attach from another process.
var total_size : int
Total size of the ring buffer in bytes; defaults to 256 MB. Part of the ring buffer configuration needed to attach from another process.

Methods

def load_pyarrow(self)
Expand source code
def load_pyarrow(self):
    """
    Load PyArrow table from the ring buffer slot using zero-copy.

    Returns:
        pa.Table: The PyArrow table (references shared memory directly).
    """
    from enrgdaq.utils.arrow_ipc import read_table_from_address
    from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer

    ring_buffer = attach_to_ring_buffer(
        name=self.buffer_name,
        total_size=self.total_size,
        slot_size=self.slot_size,
    )

    # Acquire for reading (increment ref count) and track bytes
    ring_buffer.acquire_for_read(self.slot_index, self.data_size)

    # Get the address and read zero-copy
    address = ring_buffer.get_slot_address(self.slot_index)
    table = read_table_from_address(address, self.data_size, base=ring_buffer)

    return table

Load PyArrow table from the ring buffer slot using zero-copy.

Returns

pa.Table
The PyArrow table (references shared memory directly).
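Because the returned table references the slot's memory directly, it should only be used while the slot is held; call release() once the data is no longer needed (see the sketch under release() below).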
def release(self)
Expand source code
def release(self):
    """Release the slot back to the ring buffer pool."""
    from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer

    ring_buffer = attach_to_ring_buffer(
        name=self.buffer_name,
        total_size=self.total_size,
        slot_size=self.slot_size,
    )
    ring_buffer.release(self.slot_index)

Release the slot back to the ring buffer pool.
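load_pyarrow() and release() are intended to be used as a pair: the read acquisition taken in load_pyarrow() is balanced by the release() that returns the slot to the pool. A minimal consumer-side sketch; the handle values shown are illustrative, as real handles are produced by the sending process:

from enrgdaq.daq.models import RingBufferHandle

handle = RingBufferHandle(buffer_name="enrgdaq_ring", slot_index=3, data_size=4096)

table = handle.load_pyarrow()  # zero-copy view into the shared slot
try:
    print(table.num_rows)      # use the table while the slot is held
finally:
    handle.release()           # return the slot once the data is no longer needed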

class SHMHandle (shm_name: str, shm_size: int)
Expand source code
class SHMHandle(Struct):
    shm_name: str
    shm_size: int

    def load(self) -> DAQJobMessage:
        shm = SharedMemory(name=self.shm_name, create=False)
        assert shm.buf is not None, "Shared memory buffer is None"
        message_bytes = shm.buf[: self.shm_size]
        message = pickle.loads(message_bytes)
        del message_bytes
        self.cleanup(shm)
        return message

    def cleanup(self, shm: SharedMemory | None = None):
        if shm is None:
            shm = SharedMemory(name=self.shm_name, create=False)
        assert shm.buf is not None, "Shared memory buffer is None"
        shm.close()
        shm.unlink()

A base class for defining efficient serializable objects.

Fields are defined using type annotations. Fields may optionally have default values, which result in keyword parameters to the constructor.

Structs automatically define __init__, __eq__, __repr__, and __copy__ methods. Additional methods can be defined on the class as needed. Note that __init__/__new__ cannot be overridden, but other methods can. A tuple of the field names is available on the class via the __struct_fields__ attribute if needed.

Additional class options can be enabled by passing keywords to the class definition (see example below). These configuration options may also be inspected at runtime through the __struct_config__ attribute.

Configuration

frozen : bool, default False
Whether instances of this type are pseudo-immutable. If true, attribute assignment is disabled and a corresponding __hash__ is defined.
order : bool, default False
If True, __lt__, __le__, __gt__, and __ge__ methods will be generated for this type.
eq : bool, default True
If True (the default), an __eq__ method will be generated for this type. Set to False to compare based on instance identity alone.
kw_only : bool, default False
If True, all fields will be treated as keyword-only arguments in the generated __init__ method.
omit_defaults : bool, default False
Whether fields should be omitted from encoding if the corresponding value is the default for that field. Enabling this may reduce message size, and often also improve encoding & decoding performance.
forbid_unknown_fields : bool, default False
If True, an error is raised if an unknown field is encountered while decoding structs of this type. If False (the default), no error is raised and the unknown field is skipped.
tag : str, int, bool, callable, or None, default None
Used along with tag_field for configuring tagged union support. If either are non-None, then the struct is considered "tagged". In this case, an extra field (the tag_field) and value (the tag) are added to the encoded message, which can be used to differentiate message types during decoding.

Set tag=True to enable the default tagged configuration (tag_field is "type", tag is the class name). Alternatively, you can provide a string (or less commonly int) value directly to be used as the tag (e.g. tag="my-tag-value"). tag can also be passed a callable that takes the class qualname and returns a valid tag value (e.g. tag=str.lower). See the docs for more information.
tag_field : str or None, default None
The field name to use for tagged union support. If tag is non-None, then this defaults to "type". See the tag docs above for more information.
rename : str, mapping, callable, or None, default None
Controls renaming the field names used when encoding/decoding the struct. May be one of "lower", "upper", "camel", "pascal", or "kebab" to rename in lowercase, UPPERCASE, camelCase, PascalCase, or kebab-case respectively. May also be a mapping from field names to the renamed names (missing fields are not renamed). Alternatively, may be a callable that takes the field name and returns a new name, or None to not rename that field. Default is None for no field renaming.
repr_omit_defaults : bool, default False
Whether fields should be omitted from the generated repr if the corresponding value is the default for that field.
array_like : bool, default False
If True, this struct type will be treated as an array-like type during encoding/decoding, rather than a dict-like type (the default). This may improve performance, at the cost of a more inscrutable message encoding.
gc : bool, default True
Whether garbage collection is enabled for this type. Disabling this may help reduce GC pressure, but will prevent reference cycles composed of only gc=False objects from being collected. It is the user's responsibility to ensure that reference cycles don't occur when setting gc=False.
weakref : bool, default False
Whether instances of this type support weak references. Defaults to False.
dict : bool, default False
Whether instances of this type will include a __dict__. Setting this to True will allow adding additional undeclared attributes to a struct instance, which may be useful for holding private runtime state. Defaults to False.
cache_hash : bool, default False
If enabled, the hash of a frozen struct instance will be computed at most once, and then cached on the instance for further reuse. For expensive hash values this can improve performance at the cost of a small amount of memory usage.
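As an example of the tag option above, enabling the default tagged configuration injects the class name under a "type" key when encoding (a small sketch using msgspec's JSON encoder):

>>> import msgspec
>>> class Alert(msgspec.Struct, tag=True):
...     msg: str
...
>>> msgspec.json.encode(Alert("hi"))
b'{"type":"Alert","msg":"hi"}'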

Examples

Here we define a new Struct type for describing a dog. It has three fields; two required and one optional.

>>> class Dog(Struct):
...     name: str
...     breed: str
...     is_good_boy: bool = True
...
>>> Dog('snickers', breed='corgi')
Dog(name='snickers', breed='corgi', is_good_boy=True)

Additional struct options can be set as part of the class definition. Here we define a new Struct type for a frozen Point object.

>>> class Point(Struct, frozen=True):
...     x: float
...     y: float
...
>>> {Point(1.5, 2.0): 1}  # frozen structs are hashable
{Point(x=1.5, y=2.0): 1}

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var shm_name : str
Name of the shared memory segment to attach to.
var shm_size : int
Size in bytes of the pickled message stored at the start of the segment.

Methods

def cleanup(self, shm: multiprocessing.shared_memory.SharedMemory | None = None)
Expand source code
def cleanup(self, shm: SharedMemory | None = None):
    # Re-attach by name if the caller did not pass an open handle
    if shm is None:
        shm = SharedMemory(name=self.shm_name, create=False)
    assert shm.buf is not None, "Shared memory buffer is None"
    shm.close()   # drop this process's mapping
    shm.unlink()  # destroy the segment system-wide
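
Close and unlink the shared memory segment. If shm is not supplied, the segment is re-attached by name before being destroyed.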
def load(self) ‑> DAQJobMessage
Expand source code
def load(self) -> DAQJobMessage:
    # Attach to the existing segment by name
    shm = SharedMemory(name=self.shm_name, create=False)
    assert shm.buf is not None, "Shared memory buffer is None"
    message_bytes = shm.buf[: self.shm_size]
    message = pickle.loads(message_bytes)
    # Drop the memoryview before closing; close() raises BufferError
    # while exported views of the buffer still exist
    del message_bytes
    self.cleanup(shm)
    return message
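
Attach to the named segment, unpickle the stored DAQJobMessage, unlink the segment via cleanup(), and return the message. Because the segment is unlinked as a side effect, a handle can be loaded only once.

For context, here is a minimal sketch of the sender side that load() expects, assuming the sender pickles the message into a fresh segment and ships the handle to the reader (the message value is a stand-in):

import pickle
from multiprocessing.shared_memory import SharedMemory

from enrgdaq.daq.models import SHMHandle

message = {"example": "payload"}  # stand-in; in practice a DAQJobMessage instance
payload = pickle.dumps(message)

shm = SharedMemory(create=True, size=len(payload))
shm.buf[: len(payload)] = payload  # copy the pickled bytes into the segment
handle = SHMHandle(shm_name=shm.name, shm_size=len(payload))
shm.close()  # drop our mapping; the reader unlinks the segment in load()

# ship `handle` to the other process, which recovers the message with handle.load()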
class SupervisorRemoteStats (message_in_count: int = 0,
message_in_bytes: int = 0,
message_out_count: int = 0,
message_out_bytes: int = 0,
last_active: datetime.datetime = <factory>)
Expand source code
class SupervisorRemoteStats(Struct):
    """Statistics for a remote supervisor."""

    message_in_count: int = 0
    message_in_bytes: int = 0

    message_out_count: int = 0
    message_out_bytes: int = 0

    last_active: datetime = field(default_factory=datetime.now)

    def update_message_in_stats(self, message_in_bytes: int):
        self.message_in_count += 1
        self.message_in_bytes += message_in_bytes
        self.last_active = datetime.now()

    def update_message_out_stats(self, message_out_bytes: int):
        self.message_out_count += 1
        self.message_out_bytes += message_out_bytes
        self.last_active = datetime.now()

Statistics for a remote supervisor.

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var last_active : datetime.datetime
Timestamp of the most recent inbound or outbound message; initialized to the creation time.
var message_in_bytes : int
Total number of bytes received from the remote supervisor.
var message_in_count : int
Number of messages received from the remote supervisor.
var message_out_bytes : int
Total number of bytes sent to the remote supervisor.
var message_out_count : int
Number of messages sent to the remote supervisor.

Methods

def update_message_in_stats(self, message_in_bytes: int)
Expand source code
def update_message_in_stats(self, message_in_bytes: int):
    self.message_in_count += 1
    self.message_in_bytes += message_in_bytes
    self.last_active = datetime.now()
def update_message_out_stats(self, message_out_bytes: int)
Expand source code
def update_message_out_stats(self, message_out_bytes: int):
    self.message_out_count += 1
    self.message_out_bytes += message_out_bytes
    self.last_active = datetime.now()
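
A supervisor would typically keep one SupervisorRemoteStats instance per remote peer and update it as traffic flows. A small illustrative sketch (the peer id and payload are made up):

from enrgdaq.daq.models import SupervisorRemoteStats

stats_by_peer: dict[str, SupervisorRemoteStats] = {}

def on_message_received(peer_id: str, raw: bytes) -> None:
    stats = stats_by_peer.setdefault(peer_id, SupervisorRemoteStats())
    stats.update_message_in_stats(len(raw))  # bumps count, byte total, and last_active

on_message_received("supervisor-1", b"\x00" * 128)
print(stats_by_peer["supervisor-1"].message_in_count)  # 1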