Module enrgdaq.daq.models
Classes
class DAQJobConfig (*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)-
Expand source code
class DAQJobConfig(Struct, kw_only=True): """ DAQJobConfig is the base configuration class for DAQJobs. Attributes: verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO. daq_job_type (str): The type of the DAQ job. daq_job_unique_id (str): The unique identifier for the DAQ job. use_shm_when_possible (bool): Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True. """ daq_job_type: str verbosity: LogVerbosity = LogVerbosity.INFO daq_job_unique_id: str | None = None use_shm_when_possible: bool = True
DAQJobConfig is the base configuration class for DAQJobs.
Attributes
verbosity:LogVerbosity- The verbosity level for logging. Defaults to LogVerbosity.INFO.
daq_job_type:str- The type of the DAQ job.
daq_job_unique_id:str | None- The unique identifier for the DAQ job. Defaults to None.
use_shm_when_possible:bool- Whether to use shared memory when possible. When set to False, shared memory is guaranteed never to be used; when set to True, it is used where possible, but its use is not guaranteed. Defaults to True.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
- DAQJobAlertSlackConfig
- DAQJobCAENDigitizerConfig
- DAQJobCAENHVConfig
- DAQJobHealthcheckConfig
- DAQJobServeHTTPConfig
- DAQJobStoreCSVConfig
- DAQJobStoreHDF5Config
- DAQJobStoreMemoryConfig
- DAQJobStoreMySQLConfig
- DAQJobStoreRawConfig
- DAQJobStoreRedisConfig
- DAQJobStoreROOTConfig
- StorableDAQJobConfig
Instance variables
var daq_job_type : str-
Expand source code
class DAQJobConfig(Struct, kw_only=True): """ DAQJobConfig is the base configuration class for DAQJobs. Attributes: verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO. daq_job_type (str): The type of the DAQ job. daq_job_unique_id (str): The unique identifier for the DAQ job. use_shm_when_possible (bool): Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True. """ daq_job_type: str verbosity: LogVerbosity = LogVerbosity.INFO daq_job_unique_id: str | None = None use_shm_when_possible: bool = True var daq_job_unique_id : str | None-
Expand source code
class DAQJobConfig(Struct, kw_only=True): """ DAQJobConfig is the base configuration class for DAQJobs. Attributes: verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO. daq_job_type (str): The type of the DAQ job. daq_job_unique_id (str): The unique identifier for the DAQ job. use_shm_when_possible (bool): Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True. """ daq_job_type: str verbosity: LogVerbosity = LogVerbosity.INFO daq_job_unique_id: str | None = None use_shm_when_possible: bool = True var use_shm_when_possible : bool-
Expand source code
class DAQJobConfig(Struct, kw_only=True): """ DAQJobConfig is the base configuration class for DAQJobs. Attributes: verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO. daq_job_type (str): The type of the DAQ job. daq_job_unique_id (str): The unique identifier for the DAQ job. use_shm_when_possible (bool): Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True. """ daq_job_type: str verbosity: LogVerbosity = LogVerbosity.INFO daq_job_unique_id: str | None = None use_shm_when_possible: bool = True var verbosity : LogVerbosity-
Expand source code
class DAQJobConfig(Struct, kw_only=True): """ DAQJobConfig is the base configuration class for DAQJobs. Attributes: verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO. daq_job_type (str): The type of the DAQ job. daq_job_unique_id (str): The unique identifier for the DAQ job. use_shm_when_possible (bool): Whether to use shared memory when possible. It is guaranteed to never be used when set to False, although not guaranteed when set to True. """ daq_job_type: str verbosity: LogVerbosity = LogVerbosity.INFO daq_job_unique_id: str | None = None use_shm_when_possible: bool = True
class DAQJobInfo (daq_job_type: str,
unique_id: str,
instance_id: int,
config: str,
subscribed_topics: list[str] = <msgspec._core.Field object>,
supervisor_info: SupervisorInfo | None = None)-
Expand source code
@dataclass class DAQJobInfo: """ A class to represent the information of a DAQJob. Attributes: daq_job_type : str The type of the DAQ job. unique_id : str A unique identifier for the DAQ job. instance_id : int An instance identifier for the DAQ job. supervisor_config : Optional[SupervisorConfig] Configuration for the supervisor, if any. """ daq_job_type: str # has type(self).__name__ unique_id: str instance_id: int config: str subscribed_topics: list[str] = field(default_factory=list) supervisor_info: Optional[SupervisorInfo] = None @staticmethod def mock() -> "DAQJobInfo": return DAQJobInfo( daq_job_type="mock", unique_id="mock", instance_id=0, supervisor_info=SupervisorInfo(supervisor_id="mock"), config="", )
A class to represent the information of a DAQJob.
Attributes
daq_job_type : str- The type of the DAQ job.
unique_id : str- A unique identifier for the DAQ job.
instance_id : int- An instance identifier for the DAQ job.
supervisor_info : SupervisorInfo | None- Information about the supervisor, if any. Defaults to None.
Static methods
def mock() ‑> DAQJobInfo-
Expand source code
@staticmethod def mock() -> "DAQJobInfo": return DAQJobInfo( daq_job_type="mock", unique_id="mock", instance_id=0, supervisor_info=SupervisorInfo(supervisor_id="mock"), config="", )
Instance variables
var config : str
var daq_job_type : str
var instance_id : int
var subscribed_topics : list[str]
var supervisor_info : SupervisorInfo | None
var unique_id : str
class DAQJobLatencyStats (count: int = 0,
min_ms: float = 0.0,
max_ms: float = 0.0,
avg_ms: float = 0.0,
p95_ms: float = 0.0,
p99_ms: float = 0.0)-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0
Statistics for message processing latency. Measurements are in milliseconds.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var avg_ms : float-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0 var count : int-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0 var max_ms : float-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0 var min_ms : float-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0 var p95_ms : float-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0 var p99_ms : float-
Expand source code
class DAQJobLatencyStats(Struct): """ Statistics for message processing latency. Measurements are in milliseconds. """ count: int = 0 min_ms: float = 0.0 max_ms: float = 0.0 avg_ms: float = 0.0 p95_ms: float = 0.0 p99_ms: float = 0.0
class DAQJobMessage (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass
DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes
id:Optional[str]- The unique identifier for the message. Defaults to a UUID.
timestamp:Optional[datetime]- The timestamp for the message. Defaults to the current datetime.
is_remote:bool- Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info:Optional[DAQJobInfo]- The information about the DAQJob that sent the message. Defaults to None.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
- DAQJobMessageAlert
- DAQJobMessageCAENHVSetChParam
- DAQJobMessageCombinedRemoteStats
- DAQJobMessageCombinedStats
- DAQJobMessageCombinedTraces
- DAQJobMessageHeartbeat
- DAQJobMessageSHM
- DAQJobMessageStatsRemote
- DAQJobMessageStop
- InternalDAQJobMessage
- DAQJobMessageStore
Instance variables
var daq_job_info : DAQJobInfo | None-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass var id : str | None-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass var is_remote : bool-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass prop supervisor_id : str-
Expand source code
@property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id var timestamp : datetime.datetime | None-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass var topics : set[str]-
Expand source code
class DAQJobMessage(Struct, kw_only=True): """ DAQJobMessage is the base class for messages sent between DAQJobs. Attributes: id (Optional[str]): The unique identifier for the message. Defaults to a UUID. timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime. is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False. daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None. """ id: str | None = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime | None = field(default_factory=datetime.now) is_remote: bool = False daq_job_info: "DAQJobInfo | None" = None topics: set[RouteKey] = field(default_factory=set) @property def supervisor_id(self) -> str: if self.daq_job_info is None or self.daq_job_info.supervisor_info is None: return "unknown" return self.daq_job_info.supervisor_info.supervisor_id def pre_send(self): pass
Methods
def pre_send(self)-
Expand source code
def pre_send(self): pass
class DAQJobMessageHeartbeat (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessageHeartbeat(DAQJobMessage): pass
DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes
id:Optional[str]- The unique identifier for the message. Defaults to a UUID.
timestamp:Optional[datetime]- The timestamp for the message. Defaults to the current datetime.
is_remote:bool- Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info:Optional[DAQJobInfo]- The information about the DAQJob that sent the message. Defaults to None.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
class DAQJobMessageJobStarted (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)-
Expand source code
class DAQJobMessageJobStarted(InternalDAQJobMessage): """ DAQJobMessageJobStarted is sent when a DAQJob starts, primarily used for setting DAQJobInfo of the DAQJobProcess. Also signals the process started without a problem. """ pass
DAQJobMessageJobStarted is sent when a DAQJob starts, primarily used for setting DAQJobInfo of the DAQJobProcess. Also signals the process started without a problem.
Ancestors
- InternalDAQJobMessage
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
class DAQJobMessageRoutes (routes: dict[str, list[typing.Any]],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)-
Expand source code
class DAQJobMessageRoutes(InternalDAQJobMessage): """ DAQJobMessageRoutes is sent by the supervisor to the DAQJobProcess to set the routes for the DAQJobProcess. Attributes: routes (RouteMapping): route_key to queue mapping. """ routes: RouteMapping target_supervisor = False
DAQJobMessageRoutes is sent by the supervisor to the DAQJobProcess to set the routes for the DAQJobProcess.
Attributes
routes:RouteMapping- route_key to queue mapping.
Ancestors
- InternalDAQJobMessage
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Class variables
var target_supervisor : bool
Instance variables
var routes : dict[str, list[typing.Any]]-
Expand source code
class DAQJobMessageRoutes(InternalDAQJobMessage): """ DAQJobMessageRoutes is sent by the supervisor to the DAQJobProcess to set the routes for the DAQJobProcess. Attributes: routes (RouteMapping): route_key to queue mapping. """ routes: RouteMapping target_supervisor = False
class DAQJobMessageSHM (shm: SHMHandle,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessageSHM(DAQJobMessage): shm: SHMHandle
DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes
id:Optional[str]- The unique identifier for the message. Defaults to a UUID.
timestamp:Optional[datetime]- The timestamp for the message. Defaults to the current datetime.
is_remote:bool- Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info:Optional[DAQJobInfo]- The information about the DAQJob that sent the message. Defaults to None.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var shm : SHMHandle-
Expand source code
class DAQJobMessageSHM(DAQJobMessage): shm: SHMHandle
class DAQJobMessageStatsRemote (stats: dict[str, SupervisorRemoteStats],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessageStatsRemote(DAQJobMessage): """Message class containing remote statistics.""" stats: "DAQJobRemoteStatsDict"
Message class containing remote statistics.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var stats : dict[str, SupervisorRemoteStats]-
Expand source code
class DAQJobMessageStatsRemote(DAQJobMessage): """Message class containing remote statistics.""" stats: "DAQJobRemoteStatsDict"
class DAQJobMessageStatsReport (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True,
processed_count: int,
processed_bytes: int = 0,
sent_count: int,
sent_bytes: int = 0,
latency: DAQJobLatencyStats)-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id))
Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor.
Ancestors
- InternalDAQJobMessage
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var latency : DAQJobLatencyStats-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id)) var processed_bytes : int-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id)) var processed_count : int-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id)) var sent_bytes : int-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id)) var sent_count : int-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id)) var target_supervisor : bool-
Expand source code
class DAQJobMessageStatsReport(InternalDAQJobMessage, kw_only=True): """ Periodic report of high-fidelity statistics from the DAQJob back to the Supervisor. """ target_supervisor: bool = True processed_count: int processed_bytes: int = 0 sent_count: int sent_bytes: int = 0 latency: DAQJobLatencyStats @override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id))
Methods
def pre_send(self)-
Expand source code
@override def pre_send(self): super().pre_send() self.topics.add(Topic.stats(self.supervisor_id))
class DAQJobMessageStop (reason: str,
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>)-
Expand source code
class DAQJobMessageStop(DAQJobMessage): reason: str
DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes
id:Optional[str]- The unique identifier for the message. Defaults to a UUID.
timestamp:Optional[datetime]- The timestamp for the message. Defaults to the current datetime.
is_remote:bool- Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info:Optional[DAQJobInfo]- The information about the DAQJob that sent the message. Defaults to None.
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var reason : str-
Expand source code
class DAQJobMessageStop(DAQJobMessage): reason: str
class DAQJobMessageTraceEvent (message_id: str,
message_type: str,
event_type: str,
topics: list[str],
timestamp: datetime.datetime,
size_bytes: int,
source_job: str,
source_supervisor: str)-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id
Single trace event for a message.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var event_type : str-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var message_id : str-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var message_type : str-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var size_bytes : int-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var source_job : str-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var source_supervisor : str-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var timestamp : datetime.datetime-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id var topics : list[str]-
Expand source code
class DAQJobMessageTraceEvent(Struct): """Single trace event for a message.""" message_id: str message_type: str event_type: str # "sent" | "received" topics: list[str] timestamp: datetime size_bytes: int source_job: str # daq_job_type source_supervisor: str # supervisor_id
class DAQJobMessageTraceReport (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True,
events: list[DAQJobMessageTraceEvent])-
Expand source code
class DAQJobMessageTraceReport(InternalDAQJobMessage, kw_only=True): """Periodic trace report from a DAQJob.""" target_supervisor: bool = True events: list[DAQJobMessageTraceEvent] @override def pre_send(self): super().pre_send() self.topics.add(Topic.traces(self.supervisor_id))
Periodic trace report from a DAQJob.
Ancestors
- InternalDAQJobMessage
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var events : list[DAQJobMessageTraceEvent]-
Expand source code
class DAQJobMessageTraceReport(InternalDAQJobMessage, kw_only=True): """Periodic trace report from a DAQJob.""" target_supervisor: bool = True events: list[DAQJobMessageTraceEvent] @override def pre_send(self): super().pre_send() self.topics.add(Topic.traces(self.supervisor_id)) var target_supervisor : bool-
Expand source code
class DAQJobMessageTraceReport(InternalDAQJobMessage, kw_only=True): """Periodic trace report from a DAQJob.""" target_supervisor: bool = True events: list[DAQJobMessageTraceEvent] @override def pre_send(self): super().pre_send() self.topics.add(Topic.traces(self.supervisor_id))
Methods
def pre_send(self)-
Expand source code
@override def pre_send(self): super().pre_send() self.topics.add(Topic.traces(self.supervisor_id))
class DAQJobResourceStats (cpu_percent: float = 0.0, rss_mb: float = 0.0)-
Expand source code
class DAQJobResourceStats(Struct): """ Statistics for process resource usage. """ cpu_percent: float = 0.0 rss_mb: float = 0.0
Statistics for process resource usage.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var cpu_percent : float-
Expand source code
class DAQJobResourceStats(Struct): """ Statistics for process resource usage. """ cpu_percent: float = 0.0 rss_mb: float = 0.0 var rss_mb : float-
Expand source code
class DAQJobResourceStats(Struct): """ Statistics for process resource usage. """ cpu_percent: float = 0.0 rss_mb: float = 0.0
class DAQJobStats (message_in_stats: DAQJobStatsRecord = <factory>,
message_out_stats: DAQJobStatsRecord = <factory>,
message_in_queue_stats: DAQJobStatsRecord = <factory>,
message_out_queue_stats: DAQJobStatsRecord = <factory>,
restart_stats: DAQJobStatsRecord = <factory>,
latency_stats: DAQJobLatencyStats = <factory>,
resource_stats: DAQJobResourceStats = <factory>,
is_alive: bool = True)-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True
A class to represent statistics for a DAQJob. Gets created and updated by Supervisor.
Attributes
message_in_stats:DAQJobStatsRecord- The statistics for incoming messages.
message_out_stats:DAQJobStatsRecord- The statistics for outgoing messages.
restart_stats:DAQJobStatsRecord- The statistics for restarts.
is_alive:bool- Whether the DAQJob is alive.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var is_alive : bool-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var latency_stats : DAQJobLatencyStats-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var message_in_queue_stats : DAQJobStatsRecord-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var message_in_stats : DAQJobStatsRecord-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var message_out_queue_stats : DAQJobStatsRecord-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var message_out_stats : DAQJobStatsRecord-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var resource_stats : DAQJobResourceStats-
Expand source code
class DAQJobStats(Struct): """ A class to represent statistics for a DAQJob. Gets created and updated by Supervisor. Attributes: message_in_stats (DAQJobStatsRecord): The statistics for incoming messages. message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages. restart_stats (DAQJobStatsRecord): The statistics for restarts. is_alive (bool): Whether the DAQJob is alive. """ message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) message_out_queue_stats: DAQJobStatsRecord = field( default_factory=DAQJobStatsRecord ) restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord) latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats) resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats) is_alive: bool = True var restart_stats : DAQJobStatsRecord-
Expand source code
class DAQJobStats(Struct):
    """
    A class to represent statistics for a DAQJob. Gets created and updated by Supervisor.

    Every field has a default, so `DAQJobStats()` yields a fresh record set
    with its own (non-shared) sub-structs.

    Attributes:
        message_in_stats (DAQJobStatsRecord): The statistics for incoming messages.
        message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages.
        message_in_queue_stats (DAQJobStatsRecord): The statistics for the incoming
            message queue. NOTE(review): what the count measures is not visible
            here - confirm against the Supervisor.
        message_out_queue_stats (DAQJobStatsRecord): The statistics for the outgoing
            message queue. NOTE(review): same caveat as above.
        restart_stats (DAQJobStatsRecord): The statistics for restarts.
        latency_stats (DAQJobLatencyStats): The latency statistics.
        resource_stats (DAQJobResourceStats): The process resource usage statistics.
        is_alive (bool): Whether the DAQJob is alive. Defaults to True.
    """

    # default_factory gives each instance its own mutable record instead of a
    # single shared default object.
    message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_in_queue_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    message_out_queue_stats: DAQJobStatsRecord = field(
        default_factory=DAQJobStatsRecord
    )
    restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
    latency_stats: DAQJobLatencyStats = field(default_factory=DAQJobLatencyStats)
    resource_stats: DAQJobResourceStats = field(default_factory=DAQJobResourceStats)
    is_alive: bool = True
class DAQJobStatsRecord (count: int = 0, last_updated: datetime.datetime | None = None)-
Expand source code
class DAQJobStatsRecord(Struct): """ A class to represent a record of statistics for a DAQJob. Attributes: count (int): The number of times the DAQJob has been called. last_updated (Optional[datetime]): The last time the DAQJob was called. """ count: int = 0 last_updated: Optional[datetime] = None def increase(self, amount: int = 1): self.set(self.count + amount) def set(self, amount: int): self.count = amount self.last_updated = datetime.now()A class to represent a record of statistics for a DAQJob.
Attributes
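count:int- The current value of the counter; it is changed through increase() and set(). Defaults to 0.
last_updated:Optional[datetime]- The time of the most recent increase() or set() call, or None if the counter has never been updated.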
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var count : int-
Expand source code
class DAQJobStatsRecord(Struct): """ A class to represent a record of statistics for a DAQJob. Attributes: count (int): The number of times the DAQJob has been called. last_updated (Optional[datetime]): The last time the DAQJob was called. """ count: int = 0 last_updated: Optional[datetime] = None def increase(self, amount: int = 1): self.set(self.count + amount) def set(self, amount: int): self.count = amount self.last_updated = datetime.now() var last_updated : datetime.datetime | None-
Expand source code
class DAQJobStatsRecord(Struct):
    """
    A timestamped counter used for DAQJob statistics.

    Attributes:
        count (int): Current counter value. Defaults to 0.
        last_updated (Optional[datetime]): When the counter was last written
            through `set` (directly or via `increase`); None until then.
    """

    count: int = 0
    last_updated: Optional[datetime] = None

    def increase(self, amount: int = 1):
        """Add `amount` (default 1) to the counter and refresh `last_updated`."""
        new_total = self.count + amount
        self.set(new_total)

    def set(self, amount: int):
        """Overwrite the counter with `amount` and stamp it with `datetime.now()`."""
        self.count = amount
        self.last_updated = datetime.now()
Methods
def increase(self, amount: int = 1)-
Expand source code
def increase(self, amount: int = 1): self.set(self.count + amount) def set(self, amount: int)-
Expand source code
def set(self, amount: int): self.count = amount self.last_updated = datetime.now()
class DAQJobStopError (reason: str)-
Expand source code
class DAQJobStopError(Exception):
    """
    Exception that carries a stop reason.

    Attributes:
        reason (str): The reason given when the error was created.
    """

    def __init__(self, reason: str):
        # Forward the reason to Exception so that str(exc) and exc.args are
        # populated even when `reason` is passed by keyword; without this,
        # args stays empty in that case and the exception cannot be
        # re-created from its args (e.g. when unpickled).
        super().__init__(reason)
        self.reason = reason
Ancestors
- builtins.Exception
- builtins.BaseException
class InternalDAQJobMessage (*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
topics: set[str] = <factory>,
target_supervisor: bool = True)-
Expand source code
class InternalDAQJobMessage(DAQJobMessage, kw_only=True): target_supervisor: bool = True @override def pre_send(self): if self.target_supervisor: from enrgdaq.daq.topics import Topic self.topics.add(Topic.supervisor_internal(self.supervisor_id))DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes
id:Optional[str]- The unique identifier for the message. Defaults to a UUID.
timestamp:Optional[datetime]- The timestamp for the message. Defaults to the current datetime.
is_remote:bool- Whether the message is sent by a remote DAQJob. Defaults to False.
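daq_job_info:Optional[DAQJobInfo]- The information about the DAQJob that sent the message. Defaults to None.
topics:set[str]- The topics attached to the message. pre_send adds the supervisor-internal topic when target_supervisor is True.
target_supervisor:bool- Whether pre_send should add the supervisor-internal topic to the message. Defaults to True.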
Ancestors
- DAQJobMessage
- msgspec.Struct
- msgspec._core._StructMixin
Subclasses
Instance variables
var target_supervisor : bool-
Expand source code
class InternalDAQJobMessage(DAQJobMessage, kw_only=True):
    """
    DAQJobMessage variant that can address itself to the supervisor.

    Attributes:
        target_supervisor (bool): When True, `pre_send` adds the
            supervisor-internal topic to the message. Defaults to True.
    """

    target_supervisor: bool = True

    @override
    def pre_send(self):
        """Add the supervisor-internal topic when `target_supervisor` is set."""
        if not self.target_supervisor:
            return

        # Kept as a function-scope import, as in the original
        # (presumably to avoid a circular import - confirm).
        from enrgdaq.daq.topics import Topic

        message_topics = self.topics
        message_topics.add(Topic.supervisor_internal(self.supervisor_id))
Methods
def pre_send(self)-
Expand source code
@override def pre_send(self): if self.target_supervisor: from enrgdaq.daq.topics import Topic self.topics.add(Topic.supervisor_internal(self.supervisor_id))
class RingBufferHandle (buffer_name: str,
slot_index: int,
data_size: int,
is_pyarrow: bool = True,
total_size: int = 268435456,
slot_size: int = 10485760)-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index)Handle to data stored in a shared memory ring buffer slot.
This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var buffer_name : str-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index) var data_size : int-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index) var is_pyarrow : bool-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index) var slot_index : int-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index) var slot_size : int-
Expand source code
class RingBufferHandle(Struct): """ Handle to data stored in a shared memory ring buffer slot. This is used for zero-copy transfer of PyArrow data between processes. The handle contains only metadata; the actual data remains in shared memory. """ buffer_name: str # Name of the SharedMemoryRingBuffer slot_index: int # Slot index within the buffer data_size: int # Actual size of data in the slot is_pyarrow: bool = True # Whether this is PyArrow data (for zero-copy path) # Ring buffer configuration (needed to attach in other process) total_size: int = 256 * 1024 * 1024 # 256 MB default slot_size: int = 10 * 1024 * 1024 # 10 MB default def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return table def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index) var total_size : int-
Expand source code
class RingBufferHandle(Struct):
    """
    Metadata-only reference to one slot of a shared memory ring buffer.

    Used for zero-copy transfer of PyArrow data between processes: the handle
    travels with the message while the payload itself stays in shared memory.
    """

    buffer_name: str  # Name of the SharedMemoryRingBuffer
    slot_index: int  # Slot index within the buffer
    data_size: int  # Actual size of data in the slot
    is_pyarrow: bool = True  # Whether this is PyArrow data (for zero-copy path)

    # Ring buffer configuration (needed to attach in other process)
    total_size: int = 256 * 1024 * 1024  # 256 MB default
    slot_size: int = 10 * 1024 * 1024  # 10 MB default

    def _attach(self):
        """Attach to the ring buffer described by this handle and return it."""
        from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer

        return attach_to_ring_buffer(
            name=self.buffer_name,
            total_size=self.total_size,
            slot_size=self.slot_size,
        )

    def load_pyarrow(self):
        """
        Read the slot's contents as a PyArrow table without copying.

        The slot is acquired for reading and is not released here; `release`
        has to be called separately.

        Returns:
            pa.Table: The PyArrow table (references shared memory directly).
        """
        from enrgdaq.utils.arrow_ipc import read_table_from_address

        buffer = self._attach()

        # Take a read reference on the slot (ref count + byte tracking) before
        # touching its memory.
        buffer.acquire_for_read(self.slot_index, self.data_size)

        # The buffer is passed as `base` to the reader along with the slot
        # address and payload size.
        return read_table_from_address(
            buffer.get_slot_address(self.slot_index),
            self.data_size,
            base=buffer,
        )

    def release(self):
        """Release the slot back to the ring buffer pool."""
        self._attach().release(self.slot_index)
Methods
def load_pyarrow(self)-
Expand source code
def load_pyarrow(self): """ Load PyArrow table from the ring buffer slot using zero-copy. Returns: pa.Table: The PyArrow table (references shared memory directly). """ from enrgdaq.utils.arrow_ipc import read_table_from_address from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) # Acquire for reading (increment ref count) and track bytes ring_buffer.acquire_for_read(self.slot_index, self.data_size) # Get the address and read zero-copy address = ring_buffer.get_slot_address(self.slot_index) table = read_table_from_address(address, self.data_size, base=ring_buffer) return tableLoad PyArrow table from the ring buffer slot using zero-copy.
Returns
pa.Table- The PyArrow table (references shared memory directly).
def release(self)-
Expand source code
def release(self): """Release the slot back to the ring buffer pool.""" from enrgdaq.utils.shared_ring_buffer import attach_to_ring_buffer ring_buffer = attach_to_ring_buffer( name=self.buffer_name, total_size=self.total_size, slot_size=self.slot_size, ) ring_buffer.release(self.slot_index)Release the slot back to the ring buffer pool.
class SHMHandle (shm_name: str, shm_size: int)-
Expand source code
class SHMHandle(Struct): shm_name: str shm_size: int def load(self) -> DAQJobMessage: shm = SharedMemory(name=self.shm_name, create=False) assert shm.buf is not None, "Shared memory buffer is None" message_bytes = shm.buf[: self.shm_size] message = pickle.loads(message_bytes) del message_bytes self.cleanup(shm) return message def cleanup(self, shm: SharedMemory | None = None): if shm is None: shm = SharedMemory(name=self.shm_name, create=False) assert shm.buf is not None, "Shared memory buffer is None" shm.close() shm.unlink()A base class for defining efficient serializable objects.
Fields are defined using type annotations. Fields may optionally have default values, which result in keyword parameters to the constructor.
Structs automatically define
__init__,__eq__,__repr__, and__copy__methods. Additional methods can be defined on the class as needed. Note that__init__/__new__cannot be overridden, but other methods can. A tuple of the field names is available on the class via the__struct_fields__attribute if needed.Additional class options can be enabled by passing keywords to the class definition (see example below). These configuration options may also be inspected at runtime through the
__struct_config__attribute.Configuration
frozen: bool, default False Whether instances of this type are pseudo-immutable. If true, attribute assignment is disabled and a corresponding
__hash__ is defined. order: bool, default False If True, `__lt__`, `__le__`, `__gt__`, and `__ge__` methods will be generated for this type. eq: bool, default True If True (the default), an __eq__ method will be generated for this type. Set to False to compare based on instance identity alone. kw_only: bool, default False If True, all fields will be treated as keyword-only arguments in the generated __init__ method. Default is False. omit_defaults: bool, default False Whether fields should be omitted from encoding if the corresponding value is the default for that field. Enabling this may reduce message size, and often also improve encoding & decoding performance. forbid_unknown_fields: bool, default False If True, an error is raised if an unknown field is encountered while decoding structs of this type. If False (the default), no error is raised and the unknown field is skipped. tag: str, int, bool, callable, or None, default None Used along with tag_field for configuring tagged union support. If either are non-None, then the struct is considered "tagged". In this case, an extra field (the tag_field) and value (the tag) are added to the encoded message, which can be used to differentiate message types during decoding. Set
tag=Trueto enable the default tagged configuration (tag_fieldis"type",tagis the class name). Alternatively, you can provide a string (or less commonly int) value directly to be used as the tag (e.g.tag="my-tag-value").tagcan also be passed a callable that takes the class qualname and returns a valid tag value (e.g.tag=str.lower). See the docs for more information. tag_field: str or None, default None The field name to use for tagged union support. Iftagis non-None, then this defaults to"type". See thetagdocs above for more information. rename: str, mapping, callable, or None, default None Controls renaming the field names used when encoding/decoding the struct. May be one of"lower","upper","camel","pascal", or"kebab"to rename in lowercase, UPPERCASE, camelCase, PascalCase, or kebab-case respectively. May also be a mapping from field names to the renamed names (missing fields are not renamed). Alternatively, may be a callable that takes the field name and returns a new name orNoneto not rename that field. Default isNonefor no field renaming. repr_omit_defaults: bool, default False Whether fields should be omitted from the generated repr if the corresponding value is the default for that field. array_like: bool, default False If True, this struct type will be treated as an array-like type during encoding/decoding, rather than a dict-like type (the default). This may improve performance, at the cost of a more inscrutable message encoding. gc: bool, default True Whether garbage collection is enabled for this type. Disabling this may help reduce GC pressure, but will prevent reference cycles composed of onlygc=Falsefrom being collected. It is the user's responsibility to ensure that reference cycles don't occur when settinggc=False. weakref: bool, default False Whether instances of this type support weak references. Defaults to False. dict: bool, default False Whether instances of this type will include a__dict__. 
Setting this to True will allow adding additional undeclared attributes to a struct instance, which may be useful for holding private runtime state. Defaults to False. cache_hash: bool, default False If enabled, the hash of a frozen struct instance will be computed at most once, and then cached on the instance for further reuse. For expensive hash values this can improve performance at the cost of a small amount of memory usage.Examples
Here we define a new
Structtype for describing a dog. It has three fields; two required and one optional.>>> class Dog(Struct): ... name: str ... breed: str ... is_good_boy: bool = True ... >>> Dog('snickers', breed='corgi') Dog(name='snickers', breed='corgi', is_good_boy=True)Additional struct options can be set as part of the class definition. Here we define a new
Structtype for a frozenPointobject.>>> class Point(Struct, frozen=True): ... x: float ... y: float ... >>> {Point(1.5, 2.0): 1} # frozen structs are hashable {Point(x=1.5, y=2.0): 1}Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var shm_name : str-
Expand source code
class SHMHandle(Struct): shm_name: str shm_size: int def load(self) -> DAQJobMessage: shm = SharedMemory(name=self.shm_name, create=False) assert shm.buf is not None, "Shared memory buffer is None" message_bytes = shm.buf[: self.shm_size] message = pickle.loads(message_bytes) del message_bytes self.cleanup(shm) return message def cleanup(self, shm: SharedMemory | None = None): if shm is None: shm = SharedMemory(name=self.shm_name, create=False) assert shm.buf is not None, "Shared memory buffer is None" shm.close() shm.unlink() var shm_size : int-
Expand source code
class SHMHandle(Struct):
    """Reference to a pickled message stored in a named shared-memory segment.

    Attributes:
        shm_name: Name of the shared-memory segment to attach to.
        shm_size: Number of leading bytes of the segment that hold the
            pickled payload.
    """

    shm_name: str
    shm_size: int

    def load(self) -> DAQJobMessage:
        """Unpickle the message held in the segment, then release the segment.

        The segment is closed and unlinked whether or not unpickling
        succeeds, so a corrupt payload cannot leave the segment behind.

        Returns:
            The object reconstructed from the first ``shm_size`` bytes.

        Raises:
            Whatever ``pickle.loads`` raises for an invalid payload; the
            segment has already been released when the error propagates.
        """
        shm = SharedMemory(name=self.shm_name, create=False)
        assert shm.buf is not None, "Shared memory buffer is None"
        message_bytes = shm.buf[: self.shm_size]
        try:
            # NOTE(review): pickle.loads runs code embedded in the payload;
            # only safe while every writer of the segment is trusted.
            return pickle.loads(message_bytes)
        finally:
            # The exported view has to be released before the segment can be
            # closed; do it explicitly instead of relying on refcounting.
            message_bytes.release()
            self.cleanup(shm)

    def cleanup(self, shm: SharedMemory | None = None):
        """Close and unlink the shared-memory segment.

        Args:
            shm: An already attached segment to release. When omitted, the
                segment named ``shm_name`` is attached first.
        """
        if shm is None:
            shm = SharedMemory(name=self.shm_name, create=False)
            assert shm.buf is not None, "Shared memory buffer is None"
        shm.close()
        shm.unlink()
Methods
def cleanup(self, shm: multiprocessing.shared_memory.SharedMemory | None = None)-
Expand source code
def cleanup(self, shm: SharedMemory | None = None): if shm is None: shm = SharedMemory(name=self.shm_name, create=False) assert shm.buf is not None, "Shared memory buffer is None" shm.close() shm.unlink() def load(self) ‑> DAQJobMessage-
Expand source code
def load(self) -> DAQJobMessage:
    """Unpickle the message held in the segment, then release the segment.

    The segment is closed and unlinked (via ``self.cleanup``) whether or not
    unpickling succeeds, so a corrupt payload cannot leave the segment behind.

    Returns:
        The object reconstructed from the first ``shm_size`` bytes.

    Raises:
        Whatever ``pickle.loads`` raises for an invalid payload; the segment
        has already been released when the error propagates.
    """
    shm = SharedMemory(name=self.shm_name, create=False)
    assert shm.buf is not None, "Shared memory buffer is None"
    message_bytes = shm.buf[: self.shm_size]
    try:
        # NOTE(review): pickle.loads runs code embedded in the payload;
        # only safe while every writer of the segment is trusted.
        return pickle.loads(message_bytes)
    finally:
        # The exported view has to be released before the segment can be
        # closed; do it explicitly instead of relying on refcounting.
        message_bytes.release()
        self.cleanup(shm)
class SupervisorRemoteStats (message_in_count: int = 0,
message_in_bytes: int = 0,
message_out_count: int = 0,
message_out_bytes: int = 0,
last_active: datetime.datetime = <factory>)-
Expand source code
class SupervisorRemoteStats(Struct): """Statistics for a remote supervisor.""" message_in_count: int = 0 message_in_bytes: int = 0 message_out_count: int = 0 message_out_bytes: int = 0 last_active: datetime = field(default_factory=datetime.now) def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int): self.message_out_count += 1 self.message_out_bytes += message_out_bytes self.last_active = datetime.now()Statistics for a remote supervisor.
Ancestors
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var last_active : datetime.datetime-
Expand source code
class SupervisorRemoteStats(Struct): """Statistics for a remote supervisor.""" message_in_count: int = 0 message_in_bytes: int = 0 message_out_count: int = 0 message_out_bytes: int = 0 last_active: datetime = field(default_factory=datetime.now) def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int): self.message_out_count += 1 self.message_out_bytes += message_out_bytes self.last_active = datetime.now() var message_in_bytes : int-
Expand source code
class SupervisorRemoteStats(Struct): """Statistics for a remote supervisor.""" message_in_count: int = 0 message_in_bytes: int = 0 message_out_count: int = 0 message_out_bytes: int = 0 last_active: datetime = field(default_factory=datetime.now) def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int): self.message_out_count += 1 self.message_out_bytes += message_out_bytes self.last_active = datetime.now() var message_in_count : int-
Expand source code
class SupervisorRemoteStats(Struct): """Statistics for a remote supervisor.""" message_in_count: int = 0 message_in_bytes: int = 0 message_out_count: int = 0 message_out_bytes: int = 0 last_active: datetime = field(default_factory=datetime.now) def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int): self.message_out_count += 1 self.message_out_bytes += message_out_bytes self.last_active = datetime.now() var message_out_bytes : int-
Expand source code
class SupervisorRemoteStats(Struct): """Statistics for a remote supervisor.""" message_in_count: int = 0 message_in_bytes: int = 0 message_out_count: int = 0 message_out_bytes: int = 0 last_active: datetime = field(default_factory=datetime.now) def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int): self.message_out_count += 1 self.message_out_bytes += message_out_bytes self.last_active = datetime.now() var message_out_count : int-
Expand source code
class SupervisorRemoteStats(Struct):
    """Statistics for a remote supervisor.

    Attributes:
        message_in_count: Number of inbound messages recorded so far.
        message_in_bytes: Running total of inbound bytes.
        message_out_count: Number of outbound messages recorded so far.
        message_out_bytes: Running total of outbound bytes.
        last_active: Result of ``datetime.now()`` at construction or at the
            most recent update, whichever is later.
    """

    message_in_count: int = 0
    message_in_bytes: int = 0
    message_out_count: int = 0
    message_out_bytes: int = 0
    last_active: datetime = field(default_factory=datetime.now)

    def update_message_in_stats(self, message_in_bytes: int):
        """Record one inbound message of ``message_in_bytes`` bytes.

        Also refreshes ``last_active``.
        """
        self.message_in_count = self.message_in_count + 1
        self.message_in_bytes = self.message_in_bytes + message_in_bytes
        self.last_active = datetime.now()

    def update_message_out_stats(self, message_out_bytes: int):
        """Record one outbound message of ``message_out_bytes`` bytes.

        Also refreshes ``last_active``.
        """
        self.message_out_count = self.message_out_count + 1
        self.message_out_bytes = self.message_out_bytes + message_out_bytes
        self.last_active = datetime.now()
Methods
def update_message_in_stats(self, message_in_bytes: int)-
Expand source code
def update_message_in_stats(self, message_in_bytes: int): self.message_in_count += 1 self.message_in_bytes += message_in_bytes self.last_active = datetime.now() def update_message_out_stats(self, message_out_bytes: int)-
Expand source code
def update_message_out_stats(self, message_out_bytes: int):
    """Record one outbound message of ``message_out_bytes`` bytes.

    Increments the outbound message counter, adds the size to the outbound
    byte total, and sets ``last_active`` to ``datetime.now()``.
    """
    self.message_out_count = self.message_out_count + 1
    self.message_out_bytes = self.message_out_bytes + message_out_bytes
    self.last_active = datetime.now()