Module enrgdaq.daq.base

Classes

class DAQJob (config: DAQJobConfig,
supervisor_info: SupervisorInfo | None = None,
instance_id: int | None = None,
message_in: Any = None,
message_out: Any = None,
raw_config: str | None = None,
zmq_xpub_url: str | None = None,
zmq_xsub_url: str | None = None)
class DAQJob:
    """
    DAQJob is a base class for data acquisition jobs. It handles the configuration,
    message queues, and provides methods for consuming and handling messages.
    Attributes:
        allowed_message_in_types (list[type[DAQJobMessage]]): List of allowed message types for input.
        config_type (Any): Type of the configuration.
        config (Any): Configuration object.
        message_in (Queue[DAQJobMessage]): Queue for incoming messages.
        message_out (Queue[DAQJobMessage]): Queue for outgoing messages.
        instance_id (int): Unique instance identifier.
        unique_id (str): Unique identifier for the job.
        restart_offset (timedelta): Offset for restarting the job.
        info (DAQJobInfo): Information about the job.
        _has_been_freed (bool): Flag indicating if the job has been freed.
        _logger (logging.Logger): Logger instance for the job.
        multiprocessing_method (str): The multiprocessing method to use ('fork' or 'spawn').
    """

    allowed_message_in_types = []

    topics_to_subscribe: list[str] = []
    config_type: type[DAQJobConfig]  # pyright: ignore[reportUninitializedInstanceVariable]
    config: DAQJobConfig
    message_in: Any
    message_out: Any
    instance_id: int
    unique_id: str
    restart_offset: timedelta  # pyright: ignore[reportUninitializedInstanceVariable]
    info: "DAQJobInfo"
    _has_been_freed: bool
    _logger: logging.Logger
    multiprocessing_method: str = "default"  # Can be 'fork', 'spawn', or 'default'
    watchdog_timeout_seconds: float = 0  # Watchdog timeout in seconds, 0 = disabled
    watchdog_force_exit: bool = (
        False  # If True, force exit on timeout (for blocking calls)
    )
    _watchdog: Watchdog
    _supervisor_info: SupervisorInfo | None
    _consume_thread: threading.Thread | None = None

    def __init__(
        self,
        config: DAQJobConfig,
        supervisor_info: SupervisorInfo | None = None,
        instance_id: int | None = None,
        message_in: Any = None,
        message_out: Any = None,
        raw_config: str | None = None,
        zmq_xpub_url: str | None = None,
        zmq_xsub_url: str | None = None,
    ):
        self.instance_id = instance_id or 0
        self._logger = logging.getLogger(f"{type(self).__name__}({self.instance_id})")

        # TODO: In some tests config is a dict, so we need to check for this
        if isinstance(config, DAQJobConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            self._logger.setLevel(config.verbosity.to_logging_level())

        self.config = config
        self.message_in = message_in or _create_queue()
        self.message_out = message_out or _create_queue()

        self._has_been_freed = False
        self.unique_id = getattr(config, "daq_job_unique_id", None) or str(uuid.uuid4())

        self._supervisor_info = supervisor_info

        self.info = self._create_info(raw_config)
        self._logger.debug(f"DAQ job {self.info.unique_id} created")

        self._watchdog = Watchdog(
            timeout_seconds=self.watchdog_timeout_seconds,
            force_exit=self.watchdog_force_exit,
            logger=self._logger,
        )

        self._latency_samples: list[float] = []
        self._processed_count = 0
        self._processed_bytes = 0
        self._sent_count = 0
        self._sent_bytes = 0
        self._last_stats_report_time = datetime.now()

        # Trace collection
        self._trace_events: list[DAQJobMessageTraceEvent] = []
        self._last_trace_report_time = datetime.now()

        self.topics_to_subscribe.extend(
            [
                Topic.supervisor_broadcast(self.supervisor_id),
                Topic.daq_job_direct(type(self).__name__, self.unique_id),
            ]
        )
        self.info.subscribed_topics = self.topics_to_subscribe

        self._zmq_xpub_url = zmq_xpub_url
        self._zmq_xsub_url = zmq_xsub_url

        self._consume_thread = threading.Thread(
            target=self._consume_thread_func, daemon=True
        )
        self._publish_thread = threading.Thread(
            target=self._publish_thread_func, daemon=True
        )
        self._publish_buffer = queue.Queue()
        self._consume_thread.start()
        self._publish_thread.start()

    def get_job_started_message(self):
        return self._prepare_message(DAQJobMessageJobStarted())

    def _consume_thread_func(self):
        assert self._zmq_xpub_url is not None

        # Connect to zmq xpub
        self.zmq_context = zmq.Context()
        zmq_xpub = self.zmq_context.socket(zmq.SUB)
        zmq_xpub.setsockopt_string(zmq.IDENTITY, self.unique_id)
        zmq_xpub.connect(self._zmq_xpub_url)
        # Subscribe to topics
        for topic in self.topics_to_subscribe:
            zmq_xpub.subscribe(topic)

        self._logger.debug(
            f"Subscribed to topics: {', '.join(self.topics_to_subscribe)}"
        )

        # Start receiving messages
        while not self._has_been_freed:
            try:
                parts = zmq_xpub.recv_multipart()
                topic = parts[0]
                header = parts[1]
                buffers = parts[2:]

                message_len = len(header) + sum(len(b) for b in buffers)
                recv_message = pickle.loads(header, buffers=buffers)
                if isinstance(recv_message, DAQJobMessageSHM) or isinstance(
                    recv_message, DAQJobMessageStoreSHM
                ):
                    message_len += recv_message.shm.shm_size
                elif isinstance(recv_message, DAQJobMessageStorePyArrow):
                    if recv_message.handle is not None:
                        message_len += recv_message.handle.data_size

                recv_message = self._unwrap_message(recv_message)
                self._logger.debug(
                    f"Received message of size {message_len} bytes '{type(recv_message).__name__}' on topic '{topic.decode()}'"
                )
                recv_message.is_remote = True
                self.handle_message(recv_message)
                self._processed_bytes += message_len

                # Record received trace event (skip internal messages)
                if not isinstance(recv_message, InternalDAQJobMessage):
                    self._trace_events.append(
                        DAQJobMessageTraceEvent(
                            message_id=recv_message.id or "unknown",
                            message_type=type(recv_message).__name__,
                            event_type="received",
                            topics=list(recv_message.topics),
                            timestamp=datetime.now(),
                            size_bytes=message_len,
                            source_job=type(self).__name__,
                            source_supervisor=self.supervisor_id,
                        )
                    )

                self.report_stats()
                self.report_traces()
            except zmq.ContextTerminated:
                break
            except Exception as e:
                self._logger.error(
                    f"Error while unpacking message sent in {topic}: {e}",
                    exc_info=True,
                )

    def _publish_thread_func(self):
        assert self._zmq_xsub_url is not None
        # Connect to zmq xsub
        self.zmq_context = zmq.Context()
        zmq_xsub = self.zmq_context.socket(zmq.PUB)
        zmq_xsub.setsockopt_string(zmq.IDENTITY, self.unique_id)
        zmq_xsub.connect(self._zmq_xsub_url)

        buffer = []
        while not self._has_been_freed:
            message: DAQJobMessage = self._publish_buffer.get()
            if message is None:
                break
            send_message(zmq_xsub, message, buffer)
            self.report_stats()

    def _unwrap_message(self, message: DAQJobMessage) -> DAQJobMessage:
        if isinstance(message, DAQJobMessageSHM) or isinstance(
            message, DAQJobMessageStoreSHM
        ):
            res = message.shm.load()
            message.shm.cleanup()
            return res
        return message

    def handle_message(self, message: "DAQJobMessage") -> bool:
        """
        Handles a message received from the message queue.

        Args:
            message (DAQJobMessage): The message to handle.

        Returns:
            bool: True if the message was handled, False otherwise.

        Raises:
            DAQJobStopError: If the message is a DAQJobMessageStop.
            Exception: If the message is not accepted by the job.
        """

        if isinstance(message, DAQJobMessageStop):
            self.free()
            raise DAQJobStopError(message.reason)
        # check if the message is accepted
        is_message_type_accepted = False
        for accepted_message_type in self.allowed_message_in_types:
            if isinstance(message, accepted_message_type):
                is_message_type_accepted = True
        if not is_message_type_accepted:
            raise Exception(
                f"Message type '{type(message)}' is not accepted by '{type(self).__name__}'"
            )

        self._processed_count += 1

        if message.timestamp and isinstance(message, DAQJobMessageStore):
            latency = (datetime.now() - message.timestamp).total_seconds() * 1000.0
            self._latency_samples.append(latency)

            # Keep only last 1000 samples
            if len(self._latency_samples) > 1000:
                self._latency_samples.pop(0)

        return True

    def start(self):
        raise NotImplementedError

    def _create_info(self, raw_config: Optional[str] = None) -> "DAQJobInfo":
        """
        Creates a DAQJobInfo object for the job.

        Returns:
            DAQJobInfo: The created DAQJobInfo object.
        """

        return DAQJobInfo(
            daq_job_type=self.config.daq_job_type
            if isinstance(self.config, DAQJobConfig)
            else self.config["daq_job_type"],
            unique_id=self.unique_id,
            instance_id=self.instance_id,
            supervisor_info=getattr(self, "_supervisor_info", None),
            config=raw_config or "# No config",
            subscribed_topics=self.topics_to_subscribe,
        )

    def _prepare_message(
        self, message: DAQJobMessage, use_shm=False, modify_message_metadata=True
    ):
        """
        Prepares a message for sending.

        Args:
            message (DAQJobMessage): The message to put in the queue.
        """

        if modify_message_metadata:
            message.daq_job_info = self.info

        omit_debug_message = not isinstance(
            message, DAQJobMessageStatsReport
        ) and not isinstance(message, InternalDAQJobMessage)
        if self.config.verbosity == LogVerbosity.DEBUG and omit_debug_message:
            self._logger.debug(f"Message out: {_format_message_for_log(message)}")

        if (
            use_shm
            and self.config.use_shm_when_possible
            # Should be not store message, or if it is, it should target local supervisor
            and (
                not isinstance(message, DAQJobMessageStore)
                or message.target_local_supervisor
            )
            # Should not be Windows
            and sys.platform != "win32"
        ):
            original_message = message

            # Use zero-copy ring buffer for PyArrow messages
            if (
                isinstance(message, DAQJobMessageStorePyArrow)
                and message.table is not None
            ):
                handle, success = try_zero_copy_pyarrow(
                    message.table,
                    message.store_config,
                    message.tag,
                )
                if success and handle is not None:
                    message = DAQJobMessageStorePyArrow(
                        store_config=message.store_config,
                        tag=message.tag,
                        table=None,
                        handle=handle,
                    )
                    message.daq_job_info = self.info
                    message.topics = original_message.topics
                else:
                    # Fall back to regular pickle-based SHM
                    pass
            else:
                # For non-PyArrow messages, use the existing pickle-based approach
                pass

            # Standard SHM path for non-PyArrow or fallback
            if not (
                isinstance(message, DAQJobMessageStorePyArrow)
                and message.handle is not None
            ):
                # Pickle message
                message_bytes = pickle.dumps(message)
                # Create shared memory object
                shm = SharedMemory(create=True, size=len(message_bytes))
                assert shm.buf is not None, "Shared memory buffer is None"
                # Write message to shared memory
                shm.buf[: len(message_bytes)] = message_bytes
                shm.close()
                shm_handle = SHMHandle(shm_name=shm.name, shm_size=shm.size)
                if isinstance(original_message, DAQJobMessageStore):
                    message = DAQJobMessageStoreSHM(
                        store_config=getattr(original_message, "store_config"),
                        tag=getattr(original_message, "tag", None),
                        shm=shm_handle,
                    )
                else:
                    message = DAQJobMessageSHM(
                        shm=shm_handle,
                    )
                message.daq_job_info = self.info
                message.topics = original_message.topics

        return message

    def _put_message_out(
        self, message: DAQJobMessage, use_shm=False, modify_message_metadata=True
    ):
        """
        Sends the message to its described topics.

        Args:
            message (DAQJobMessage): The message to put in the queue.
        """
        message = self._prepare_message(message, use_shm, modify_message_metadata)
        self._publish_buffer.put(message)
        self._sent_count += 1

        # Record sent trace event (skip internal messages to avoid loops)
        if not isinstance(message, InternalDAQJobMessage):
            # Estimate size using pickle (approximation)
            try:
                sent_size = len(pickle.dumps(message))
            except Exception:
                sent_size = 0
            self._trace_events.append(
                DAQJobMessageTraceEvent(
                    message_id=message.id or "unknown",
                    message_type=type(message).__name__,
                    event_type="sent",
                    topics=list(message.topics),
                    timestamp=datetime.now(),
                    size_bytes=sent_size,
                    source_job=type(self).__name__,
                    source_supervisor=self.supervisor_id,
                )
            )

    def get_latency_stats(self) -> Any:
        if not self._latency_samples:
            return DAQJobLatencyStats()

        samples = sorted(self._latency_samples)
        count = len(samples)
        return DAQJobLatencyStats(
            count=self._processed_count,
            min_ms=samples[0],
            max_ms=samples[-1],
            avg_ms=sum(samples) / count,
            p95_ms=samples[int(count * 0.95)],
            p99_ms=samples[int(count * 0.99)],
        )

    def report_stats(self, force: bool = False):
        if (
            not force
            and (datetime.now() - self._last_stats_report_time).total_seconds()
            < DAQ_JOB_STATS_REPORT_INTERVAL_SECONDS
        ):
            return

        self._last_stats_report_time = datetime.now()
        report = DAQJobMessageStatsReport(
            processed_count=self._processed_count,
            processed_bytes=self._processed_bytes,
            sent_count=self._sent_count,
            sent_bytes=self._sent_bytes,
            latency=self.get_latency_stats(),
        )
        self._put_message_out(report)
        self._latency_samples = []  # RESET after report to get interval stats

    def report_traces(self, force: bool = False):
        if (
            not force
            and (datetime.now() - self._last_trace_report_time).total_seconds()
            < DAQ_JOB_TRACE_REPORT_INTERVAL_SECONDS
        ):
            return

        if not self._trace_events:
            return

        self._last_trace_report_time = datetime.now()
        report = DAQJobMessageTraceReport(events=self._trace_events.copy())
        self._put_message_out(report)
        self._trace_events.clear()

    def __del__(self):
        self._logger.info("DAQ job is being deleted")
        self._has_been_freed = True

    @property
    def supervisor_id(self):
        if self.info.supervisor_info is None:
            return "unknown"
        return self.info.supervisor_info.supervisor_id

    def free(self):
        if self._has_been_freed:
            return
        self._has_been_freed = True
        self.__del__()

DAQJob is a base class for data acquisition jobs. It handles the configuration, message queues, and provides methods for consuming and handling messages.

Attributes

allowed_message_in_types : list[type[DAQJobMessage]]
List of allowed message types for input.
config_type : Any
Type of the configuration.
config : Any
Configuration object.
message_in : Queue[DAQJobMessage]
Queue for incoming messages.
message_out : Queue[DAQJobMessage]
Queue for outgoing messages.
instance_id : int
Unique instance identifier.
unique_id : str
Unique identifier for the job.
restart_offset : timedelta
Offset for restarting the job.
info : DAQJobInfo
Information about the job.
_has_been_freed : bool
Flag indicating if the job has been freed.
_logger : logging.Logger
Logger instance for the job.
multiprocessing_method : str
The multiprocessing method to use ('fork' or 'spawn').
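
DAQJob is meant to be subclassed. Below is a minimal, illustrative sketch of a producer-style job; MySensorConfig, read_sensor, and build_store_message are hypothetical names, not part of this module:

import time

from enrgdaq.daq.base import DAQJob

class MySensorJob(DAQJob):
    allowed_message_in_types = []       # this job only produces output
    config_type = MySensorConfig        # hypothetical DAQJobConfig subclass
    watchdog_timeout_seconds = 30.0     # watchdog timeout in seconds (0 = disabled)

    def start(self):
        # The base class raises NotImplementedError, so every subclass
        # must implement start() as its main loop.
        while not self._has_been_freed:
            message = build_store_message(read_sensor())  # hypothetical helpers
            self._put_message_out(message)  # publish a DAQJobMessage
            time.sleep(1.0)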


Class variables

var allowed_message_in_types
var config : DAQJobConfig
var config_type : type[DAQJobConfig]
var info : DAQJobInfo
var instance_id : int
var message_in : Any
var message_out : Any
var multiprocessing_method : str
var restart_offset : datetime.timedelta
var topics_to_subscribe : list[str]
var unique_id : str
var watchdog_force_exit : bool
var watchdog_timeout_seconds : float

Instance variables

prop supervisor_id
The supervisor ID taken from the job's DAQJobInfo, or "unknown" when no supervisor info is attached.

Methods

def free(self)
Marks the job as freed so that its consume and publish loops exit. Calling it more than once is a no-op.
def get_job_started_message(self)
Returns a prepared DAQJobMessageJobStarted, carrying this job's DAQJobInfo, that announces the job has started.
def get_latency_stats(self) ‑> Any
Returns a DAQJobLatencyStats with the minimum, maximum, and average latency in milliseconds over the retained samples, plus p95 and p99 percentiles taken by index from the sorted samples (with 1000 samples, p95_ms is samples[950]). Returns an empty DAQJobLatencyStats when no samples have been collected.
def handle_message(self, message: DAQJobMessage) ‑> bool

Handles a message received from the message queue.

Args

message : DAQJobMessage
The message to handle.

Returns

bool
True if the message was handled, False otherwise.

Raises

DAQJobStopError
If the message is a DAQJobMessageStop.
Exception
If the message is not accepted by the job.
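
Subclasses that consume messages typically override handle_message and let the base class perform stop handling, type checking, and latency accounting first. A sketch, where MyConsumerJob and process_store_message are hypothetical:

class MyConsumerJob(DAQJob):
    allowed_message_in_types = [DAQJobMessageStore]

    def handle_message(self, message: DAQJobMessage) -> bool:
        # Let the base class handle stop messages, type checking,
        # and latency accounting before doing any custom work.
        if not super().handle_message(message):
            return False
        process_store_message(message)  # hypothetical payload handler
        return True
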
def report_stats(self, force: bool = False)
Publishes a DAQJobMessageStatsReport containing the processed and sent message/byte counters and the current latency statistics. Unless force is True, reports are rate-limited to one per DAQ_JOB_STATS_REPORT_INTERVAL_SECONDS. Latency samples are reset after each report, so the statistics cover a single reporting interval.
def report_traces(self, force: bool = False)
Publishes the buffered trace events as a DAQJobMessageTraceReport and clears the buffer. Unless force is True, reports are rate-limited to one per DAQ_JOB_TRACE_REPORT_INTERVAL_SECONDS; nothing is sent while the buffer is empty.
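
Both reporters are cheap to call unconditionally thanks to the built-in rate limiting: the consume thread invokes them after every received message, and a custom acquisition loop can do the same. A sketch of that pattern inside a subclass's start(), with _acquire_and_publish hypothetical:

def start(self):
    while not self._has_been_freed:
        self._acquire_and_publish()  # hypothetical per-iteration work
        self.report_stats()          # no-op until the stats interval elapses
        self.report_traces()         # no-op while the buffer is empty or
                                     # the trace interval has not elapsed
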
def start(self)
The job's main entry point. The base implementation raises NotImplementedError; every subclass must override it.
class DAQJobProcess (*,
daq_job_cls: type[DAQJob],
supervisor_info: SupervisorInfo,
config: DAQJobConfig,
process: multiprocessing.context.Process | None,
start_time: datetime.datetime = <factory>,
instance_id: int,
daq_job_info: DAQJobInfo | None = None,
raw_config: str | None = None,
log_queue: typing.Any | None = None,
restart_on_crash: bool = True,
zmq_xpub_url: str | None = None,
zmq_xsub_url: str | None = None,
job_started_queue: multiprocessing.Queue = <factory>)
class DAQJobProcess(msgspec.Struct, kw_only=True):
    daq_job_cls: type[DAQJob]
    supervisor_info: SupervisorInfo
    config: DAQJobConfig
    process: Process | None
    start_time: datetime = msgspec.field(default_factory=datetime.now)
    instance_id: int
    daq_job_info: DAQJobInfo | None = None
    raw_config: str | None = None
    log_queue: Any | None = None
    restart_on_crash: bool = True

    zmq_xpub_url: str | None = None
    zmq_xsub_url: str | None = None

    job_started_queue: multiprocessing.Queue = msgspec.field(
        default_factory=multiprocessing.Queue
    )

    def start(self):
        if self.log_queue:
            root_logger = logging.getLogger()
            root_logger.handlers.clear()
            root_logger.addHandler(QueueHandler(self.log_queue))
            root_logger.setLevel(logging.DEBUG)

        instance = self.daq_job_cls(
            self.config,
            supervisor_info=self.supervisor_info,
            instance_id=self.instance_id,
            raw_config=self.raw_config,
            zmq_xpub_url=self.zmq_xpub_url,
            zmq_xsub_url=self.zmq_xsub_url,
        )
        self.job_started_queue.put(instance.get_job_started_message())

        try:
            instance.start()
        except Exception as e:
            logging.error(
                f"Error on {self.daq_job_cls.__name__}.start(): {e}", exc_info=True
            )
            raise e

A base class for defining efficient serializable objects.

Fields are defined using type annotations. Fields may optionally have default values, which result in keyword parameters to the constructor.

Structs automatically define __init__, __eq__, __repr__, and __copy__ methods. Additional methods can be defined on the class as needed. Note that __init__/__new__ cannot be overridden, but other methods can. A tuple of the field names is available on the class via the __struct_fields__ attribute if needed.

Additional class options can be enabled by passing keywords to the class definition (see example below). These configuration options may also be inspected at runtime through the __struct_config__ attribute.

Configuration

frozen : bool, default False
Whether instances of this type are pseudo-immutable. If true, attribute assignment is disabled and a corresponding __hash__ is defined.
order : bool, default False
If True, __lt__, __le__, __gt__, and __ge__ methods will be generated for this type.
eq : bool, default True
If True (the default), an __eq__ method will be generated for this type. Set to False to compare based on instance identity alone.
kw_only : bool, default False
If True, all fields will be treated as keyword-only arguments in the generated __init__ method.
omit_defaults : bool, default False
Whether fields should be omitted from encoding if the corresponding value is the default for that field. Enabling this may reduce message size, and often also improve encoding & decoding performance.
forbid_unknown_fields : bool, default False
If True, an error is raised if an unknown field is encountered while decoding structs of this type. If False (the default), no error is raised and the unknown field is skipped.
tag : str, int, bool, callable, or None, default None
Used along with tag_field for configuring tagged union support. If either are non-None, then the struct is considered "tagged". In this case, an extra field (the tag_field) and value (the tag) are added to the encoded message, which can be used to differentiate message types during decoding. Set tag=True to enable the default tagged configuration (tag_field is "type", tag is the class name). Alternatively, you can provide a string (or less commonly int) value directly to be used as the tag (e.g. tag="my-tag-value"). tag can also be passed a callable that takes the class qualname and returns a valid tag value (e.g. tag=str.lower). See the docs for more information.
tag_field : str or None, default None
The field name to use for tagged union support. If tag is non-None, then this defaults to "type". See the tag docs above for more information.
rename : str, mapping, callable, or None, default None
Controls renaming the field names used when encoding/decoding the struct. May be one of "lower", "upper", "camel", "pascal", or "kebab" to rename in lowercase, UPPERCASE, camelCase, PascalCase, or kebab-case respectively. May also be a mapping from field names to the renamed names (missing fields are not renamed). Alternatively, may be a callable that takes the field name and returns a new name, or None to not rename that field. Default is None for no field renaming.
repr_omit_defaults : bool, default False
Whether fields should be omitted from the generated repr if the corresponding value is the default for that field.
array_like : bool, default False
If True, this struct type will be treated as an array-like type during encoding/decoding, rather than a dict-like type (the default). This may improve performance, at the cost of a more inscrutable message encoding.
gc : bool, default True
Whether garbage collection is enabled for this type. Disabling this may help reduce GC pressure, but will prevent reference cycles composed of only gc=False structs from being collected. It is the user's responsibility to ensure that reference cycles don't occur when setting gc=False.
weakref : bool, default False
Whether instances of this type support weak references.
dict : bool, default False
Whether instances of this type will include a __dict__. Setting this to True will allow adding additional undeclared attributes to a struct instance, which may be useful for holding private runtime state.
cache_hash : bool, default False
If enabled, the hash of a frozen struct instance will be computed at most once, and then cached on the instance for further reuse. For expensive hash values this can improve performance at the cost of a small amount of memory usage.

Examples

Here we define a new Struct type for describing a dog. It has three fields; two required and one optional.

>>> class Dog(Struct):
...     name: str
...     breed: str
...     is_good_boy: bool = True
...
>>> Dog('snickers', breed='corgi')
Dog(name='snickers', breed='corgi', is_good_boy=True)

Additional struct options can be set as part of the class definition. Here we define a new Struct type for a frozen Point object.

>>> class Point(Struct, frozen=True):
...     x: float
...     y: float
...
>>> {Point(1.5, 2.0): 1}  # frozen structs are hashable
{Point(x=1.5, y=2.0): 1}

Ancestors

  • msgspec.Struct
  • msgspec._core._StructMixin

Instance variables

var config : DAQJobConfig
var daq_job_cls : type[DAQJob]
var daq_job_info : DAQJobInfo | None
var instance_id : int
var job_started_queue : multiprocessing.Queue
var log_queue : typing.Any | None
var process : multiprocessing.context.Process | None
var raw_config : str | None
var restart_on_crash : bool
var start_time : datetime.datetime
var supervisor_info : SupervisorInfo
var zmq_xpub_url : str | None
var zmq_xsub_url : str | None

Methods

def start(self)
Runs the job in the current (typically child) process. If log_queue is set, the root logger is reconfigured to forward records through a QueueHandler. daq_job_cls is then instantiated with the stored config, supervisor info, and ZMQ URLs, a job-started message is put on job_started_queue, and the instance's start() is invoked. Exceptions raised by start() are logged with a traceback and re-raised.
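
A minimal sketch of how a supervisor might launch the job in a child process; MySensorJob, supervisor_info, job_config, and the endpoint URLs are placeholders, not values from this module:

import multiprocessing

from enrgdaq.daq.base import DAQJobProcess

job_process = DAQJobProcess(
    daq_job_cls=MySensorJob,              # hypothetical DAQJob subclass
    supervisor_info=supervisor_info,      # placeholder SupervisorInfo
    config=job_config,                    # placeholder DAQJobConfig
    process=None,
    instance_id=0,
    zmq_xpub_url="tcp://127.0.0.1:5555",  # placeholder broker endpoints
    zmq_xsub_url="tcp://127.0.0.1:5556",
)
job_process.process = multiprocessing.Process(target=job_process.start, daemon=True)
job_process.process.start()

# The child signals startup by putting a job-started message on the queue.
started = job_process.job_started_queue.get(timeout=10)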