Module enrgdaq.daq.jobs.handle_traces

Classes

class DAQJobHandleTraces (config: DAQJobHandleTracesConfig, supervisor_info: SupervisorInfo, **kwargs)
class DAQJobHandleTraces(DAQJob):
    """
    Handles trace events for DAQ jobs.

    This class is responsible for consuming and processing DAQ job trace messages.
    It aggregates trace events by message_id and outputs them to storage.
    """

    allowed_message_in_types = [DAQJobMessageTraceReport]
    config_type = DAQJobHandleTracesConfig
    config: DAQJobHandleTracesConfig

    _traces: dict[str, list[DAQJobMessageTraceEvent]]

    def __init__(
        self,
        config: DAQJobHandleTracesConfig,
        supervisor_info: SupervisorInfo,
        **kwargs,
    ):
        self.topics_to_subscribe.append(Topic.traces(supervisor_info.supervisor_id))
        super().__init__(config, supervisor_info, **kwargs)
        self._traces = defaultdict(list)

    def start(self):
        while not self._has_been_freed:
            start_time = datetime.now()
            self._save_traces()
            sleep_for(DAQ_JOB_HANDLE_TRACES_SLEEP_INTERVAL_SECONDS, start_time)

    def handle_message(
        self,
        message: DAQJobMessageTraceReport,
    ) -> bool:
        if not super().handle_message(message):
            return False

        # Aggregate trace events by message_id
        for event in message.events:
            self._traces[event.message_id].append(event)

        return True

    def _save_traces(self):
        if not self._traces:
            return

        # Raw trace events output
        trace_keys = [
            "message_id",
            "message_type",
            "event_type",
            "topics",
            "timestamp",
            "size_bytes",
            "source_job",
            "source_supervisor",
        ]
        trace_data = []

        # Latency stats output (for messages with both sent and received)
        latency_keys = [
            "message_id",
            "message_type",
            "source_job",
            "dest_job",
            "source_supervisor",
            "dest_supervisor",
            "sent_timestamp",
            "received_timestamp",
            "latency_ms",
            "size_bytes",
        ]
        latency_data = []

        for message_id, events in self._traces.items():
            # Collect raw trace data
            for event in events:
                trace_data.append(
                    [
                        message_id,
                        event.message_type,
                        event.event_type,
                        ",".join(event.topics),
                        get_unix_timestamp_ms(event.timestamp),
                        event.size_bytes,
                        event.source_job,
                        event.source_supervisor,
                    ]
                )

            # Calculate latency if we have both sent and received events
            sent_events = [e for e in events if e.event_type == "sent"]
            received_events = [e for e in events if e.event_type == "received"]

            for sent in sent_events:
                for received in received_events:
                    latency_ms = (
                        received.timestamp - sent.timestamp
                    ).total_seconds() * 1000
                    latency_data.append(
                        [
                            message_id,
                            sent.message_type,
                            sent.source_job,
                            received.source_job,
                            sent.source_supervisor,
                            received.source_supervisor,
                            get_unix_timestamp_ms(sent.timestamp),
                            get_unix_timestamp_ms(received.timestamp),
                            f"{latency_ms:.2f}",
                            received.size_bytes,
                        ]
                    )

        # Output raw traces
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=trace_keys,
                data=trace_data,
            )
        )

        # Output latency stats if we have any
        if latency_data:
            self._put_message_out(
                DAQJobMessageStoreTabular(
                    store_config=self.config.store_config,
                    keys=latency_keys,
                    data=latency_data,
                    tag="latency",
                )
            )

        # Broadcast combined traces for cross-supervisor visibility
        self._put_message_out(
            DAQJobMessageCombinedTraces(
                traces=dict(self._traces),
                topics={Topic.traces_combined(self.supervisor_id)},
            )
        )

        # Clear traces after sending
        self._traces.clear()

Handles trace events for DAQ jobs.

This class is responsible for consuming and processing DAQ job trace messages. It aggregates trace events by message_id and outputs them to storage.
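
The sent/received pairing in _save_traces can be illustrated in isolation. The sketch below is not part of the module; it uses a stand-in event object that models only the event_type and timestamp fields the source above relies on, and reproduces the latency calculation.

from datetime import datetime, timedelta

class _FakeTraceEvent:
    """Stand-in for DAQJobMessageTraceEvent; only the fields used here are modeled."""
    def __init__(self, event_type, timestamp):
        self.event_type = event_type
        self.timestamp = timestamp

t0 = datetime.now()
events = [
    _FakeTraceEvent("sent", t0),
    _FakeTraceEvent("received", t0 + timedelta(milliseconds=12)),
]

# Same pairing as _save_traces: every "sent" event is matched with every "received"
# event for the same message_id, and the difference is converted to milliseconds.
sent_events = [e for e in events if e.event_type == "sent"]
received_events = [e for e in events if e.event_type == "received"]
for sent in sent_events:
    for received in received_events:
        latency_ms = (received.timestamp - sent.timestamp).total_seconds() * 1000
        print(f"{latency_ms:.2f}")  # prints 12.00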

Ancestors

DAQJob

Class variables

var allowed_message_in_types
var config : DAQJobHandleTracesConfig
var config_type : type[DAQJobConfig]

Configuration class for DAQJobHandleTraces.

Methods

def start(self)
def start(self):
    while not self._has_been_freed:
        start_time = datetime.now()
        self._save_traces()
        sleep_for(DAQ_JOB_HANDLE_TRACES_SLEEP_INTERVAL_SECONDS, start_time)
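
The loop above runs _save_traces on a fixed cadence: sleep_for sleeps only for whatever remains of the interval after the save has finished. A self-contained sketch of that pattern, using a local stand-in for sleep_for and an assumed interval value (the real helper and DAQ_JOB_HANDLE_TRACES_SLEEP_INTERVAL_SECONDS live elsewhere in the package):

import time
from datetime import datetime

INTERVAL_SECONDS = 1  # stand-in value for DAQ_JOB_HANDLE_TRACES_SLEEP_INTERVAL_SECONDS

def _sleep_for(interval_seconds, start_time):
    # Local stand-in for the package's sleep_for helper:
    # sleep for whatever is left of the interval after the work is done.
    elapsed = (datetime.now() - start_time).total_seconds()
    time.sleep(max(0.0, interval_seconds - elapsed))

for _ in range(3):  # the real loop runs until the job has been freed
    start_time = datetime.now()
    # ... _save_traces() would run here ...
    _sleep_for(INTERVAL_SECONDS, start_time)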

Inherited members

class DAQJobHandleTracesConfig (store_config: DAQJobStoreConfig, *, daq_job_type: str, verbosity: LogVerbosity = LogVerbosity.INFO, daq_job_unique_id: str | None = None, use_shm_when_possible: bool = True)
class DAQJobHandleTracesConfig(StorableDAQJobConfig):
    """Configuration class for DAQJobHandleTraces."""

    pass

Configuration class for DAQJobHandleTraces.
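
The job resolves this class through its config_type attribute (see the source above). A minimal check, assuming the package is importable under the documented module path:

from enrgdaq.daq.jobs.handle_traces import DAQJobHandleTraces, DAQJobHandleTracesConfig

# config_type is the class attribute set in the body of DAQJobHandleTraces.
assert DAQJobHandleTraces.config_type is DAQJobHandleTracesConfig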

Ancestors

StorableDAQJobConfig

class DAQJobMessageCombinedTraces (traces: dict[str, list[DAQJobMessageTraceEvent]], *, id: str | None = <factory>, timestamp: datetime.datetime | None = <factory>, is_remote: bool = False, daq_job_info: DAQJobInfo | None = None, topics: set[str] = <factory>)
class DAQJobMessageCombinedTraces(DAQJobMessage):
    """Message class containing combined trace events from a supervisor."""

    traces: dict[str, list[DAQJobMessageTraceEvent]]  # keyed by message_id

Message class containing combined trace events from a supervisor.
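
_save_traces builds this message from the aggregated dict and publishes it on the combined-traces topic. A minimal construction sketch, assuming the package is importable; an empty traces dict is used so the snippet stands alone, while id, timestamp and topics fall back to their factory defaults:

from enrgdaq.daq.jobs.handle_traces import DAQJobMessageCombinedTraces

message = DAQJobMessageCombinedTraces(traces={})  # traces is keyed by message_id
print(message.traces)  # {}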

Ancestors

DAQJobMessage

Instance variables

var traces : dict[str, list[DAQJobMessageTraceEvent]]
Combined trace events from this supervisor, keyed by message_id.