Module enrgdaq.daq.jobs.handle_stats

Classes

class DAQJobHandleStats (config: DAQJobHandleStatsConfig,
**kwargs)
Expand source code
class DAQJobHandleStats(DAQJob):
    """
    Handles statistics for DAQ jobs.

    This class is responsible for consuming and processing DAQ job statistics messages.
    It extracts relevant statistics from the messages and stores them.
    """

    allowed_message_in_types = [DAQJobMessageStats, DAQJobMessageStatsRemote]
    config_type = DAQJobHandleStatsConfig
    config: DAQJobHandleStatsConfig

    _stats: dict[str, DAQJobStatsDict]
    _remote_stats: dict[str, DAQJobRemoteStatsDict]

    def __init__(self, config: DAQJobHandleStatsConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._stats = {}
        self._remote_stats = defaultdict()

    def start(self):
        while True:
            start_time = datetime.now()
            self.consume()
            self._save_stats()
            self._save_remote_stats()
            sleep_for(DAQ_JOB_HANDLE_STATS_SLEEP_INTERVAL_SECONDS, start_time)

    def handle_message(
        self, message: DAQJobMessageStats | DAQJobMessageStatsRemote
    ) -> bool:
        if not super().handle_message(message):
            return False

        # Ignore if the message has no supervisor info
        if not message.daq_job_info or not message.daq_job_info.supervisor_config:
            return True

        if isinstance(message, DAQJobMessageStats):
            self._stats[message.supervisor_id] = message.stats
        elif isinstance(message, DAQJobMessageStatsRemote):
            self._remote_stats[message.supervisor_id] = message.stats
        return True

    def _save_stats(self):
        def datetime_to_str(dt: Optional[datetime]):
            if dt is None:
                return "N/A"
            return get_unix_timestamp_ms(dt)

        def unpack_record(record: DAQJobStatsRecord):
            return [
                datetime_to_str(record.last_updated),
                record.count,
            ]

        keys = [
            "supervisor",
            "daq_job",
            "is_alive",
            "last_message_in_date",
            "message_in_count",
            "last_message_out_date",
            "message_out_count",
            "last_restart_date",
            "restart_count",
        ]
        data_to_send = []

        for supervisor_id, stats in self._stats.items():
            for daq_job_type, msg in stats.items():
                data_to_send.append(
                    [
                        supervisor_id,
                        daq_job_type.__name__,
                        str(msg.is_alive).lower(),
                        *unpack_record(msg.message_in_stats),
                        *unpack_record(msg.message_out_stats),
                        *unpack_record(msg.restart_stats),
                    ]
                )

        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=keys,
                data=data_to_send,
            )
        )

    def _save_remote_stats(self):
        keys = [
            "supervisor",
            "is_alive",
            "last_active",
            "message_in_count",
            "message_in_megabytes",
            "message_out_count",
            "message_out_megabytes",
        ]
        data_to_send = []

        # Combine remote stats from all supervisors
        remote_stats_combined = defaultdict(lambda: SupervisorRemoteStats())
        if (
            self._supervisor_config
            and self._supervisor_config.supervisor_id in self._remote_stats
        ):
            for supervisor_id, remote_stats in self._remote_stats[
                self._supervisor_config.supervisor_id
            ].items():
                remote_stats_combined[supervisor_id] = remote_stats

        for remote_supervisor_id, remote_stats_dict in self._remote_stats.items():
            # For each remote stats dict, combine the values
            for (
                supervisor_id,
                remote_stats_dict_serialized_item,
            ) in remote_stats_dict.items():
                # Skip if the remote supervisor id is the same as the local supervisor id or
                # other supervisors try to overwrite other supervisors
                if supervisor_id != remote_supervisor_id or (
                    self._supervisor_config
                    and self._supervisor_config.supervisor_id == remote_supervisor_id
                ):
                    continue
                # Convert the supervisor remote stats to a dict
                remote_stats_dict_serialized = msgspec.structs.asdict(
                    remote_stats_dict_serialized_item
                )
                # Set each value
                for item, value in remote_stats_dict_serialized.items():
                    if value == 0 or not value:
                        continue
                    setattr(remote_stats_combined[supervisor_id], item, value)
        for supervisor_id, remote_stats in remote_stats_combined.items():
            is_remote_alive = datetime.now() - remote_stats.last_active <= timedelta(
                seconds=DAQ_JOB_HANDLE_STATS_REMOTE_ALIVE_SECONDS
            )

            def _byte_to_mb(x):
                return "{:.3f}".format(x / 1024 / 1024)

            data_to_send.append(
                [
                    supervisor_id,
                    str(is_remote_alive).lower(),
                    remote_stats.last_active,
                    remote_stats.message_in_count,
                    _byte_to_mb(remote_stats.message_in_bytes),
                    remote_stats.message_out_count,
                    _byte_to_mb(remote_stats.message_out_bytes),
                ]
            )
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=keys,
                data=data_to_send,
                tag="remote",
            )
        )

Handles statistics for DAQ jobs.

This class is responsible for consuming and processing DAQ job statistics messages. It extracts relevant statistics from the messages and stores them.

Ancestors

Class variables

var allowed_message_in_types : list[type[DAQJobMessage]]
var configDAQJobHandleStatsConfig
var config_type : Any

Configuration class for DAQJobHandleStats.

Methods

def start(self)
Expand source code
def start(self):
    while True:
        start_time = datetime.now()
        self.consume()
        self._save_stats()
        self._save_remote_stats()
        sleep_for(DAQ_JOB_HANDLE_STATS_SLEEP_INTERVAL_SECONDS, start_time)

Inherited members

class DAQJobHandleStatsConfig (store_config: DAQJobStoreConfig,
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
Expand source code
class DAQJobHandleStatsConfig(StorableDAQJobConfig):
    """Configuration class for DAQJobHandleStats."""

    pass

Configuration class for DAQJobHandleStats.

Ancestors

class DAQJobMessageStats (stats: Dict[type[DAQJob], DAQJobStats],
*,
id: str | None = <factory>,
timestamp: datetime.datetime | None = <factory>,
is_remote: bool = False,
daq_job_info: DAQJobInfo | None = None,
remote_config: DAQRemoteConfig = <factory>)
Expand source code
class DAQJobMessageStats(DAQJobMessage):
    """Message class containing DAQ job statistics."""

    stats: DAQJobStatsDict

Message class containing DAQ job statistics.

Ancestors

Instance variables

var stats : Dict[type[DAQJob], DAQJobStats]
Expand source code
class DAQJobMessageStats(DAQJobMessage):
    """Message class containing DAQ job statistics."""

    stats: DAQJobStatsDict