Module enrgdaq.daq.jobs.pc_metrics

Classes

class DAQJobPCMetrics (config: DAQJobPCMetricsConfig,
**kwargs)
Expand source code
class DAQJobPCMetrics(DAQJob):
    """
    DAQ job for monitoring the CPU, memory, and disk usage of the PC.
    """

    config_type = DAQJobPCMetricsConfig
    config: DAQJobPCMetricsConfig

    def __init__(self, config: DAQJobPCMetricsConfig, **kwargs):
        super().__init__(config, **kwargs)

    def handle_message(self, message: DAQJobMessage):
        super().handle_message(message)

    def start(self):
        while True:
            start_time = datetime.now()
            self._poll_metrics()
            sleep_for(self.config.poll_interval_seconds, start_time)

    def _poll_metrics(self):
        data = {}
        if PCMetric.CPU in self.config.metrics_to_store:
            cpu_percent = psutil.cpu_percent(interval=None)
            # psutil returns 0 on the first run if we set the interval
            # to None, so skip the initial polling
            if cpu_percent == 0:
                return

            data[PCMetric.CPU] = cpu_percent
        if PCMetric.MEMORY in self.config.metrics_to_store:
            data[PCMetric.MEMORY] = psutil.virtual_memory().percent
        if PCMetric.DISK in self.config.metrics_to_store:
            data[PCMetric.DISK] = psutil.disk_usage("/").percent

        data = {k.value: v for k, v in data.items()}
        self._send_store_message(data)

    def _send_store_message(self, data: dict):
        keys = ["timestamp", *data.keys()]
        values = [get_now_unix_timestamp_ms(), *data.values()]
        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                tag=self._supervisor_config.supervisor_id,
                keys=keys,
                data=[values],
            )
        )

DAQ job for monitoring the CPU, memory, and disk usage of the PC.

Ancestors

Class variables

var configDAQJobPCMetricsConfig
var config_type : Any

Configuration class for DAQJobPCMetrics.

Attributes

metrics_to_store : list[PCMetric]
List of metrics to store. If not specified, all metrics will be stored.
poll_interval_seconds : int
Polling interval in seconds. If not specified, the default polling interval will be used.

Methods

def start(self)
Expand source code
def start(self):
    while True:
        start_time = datetime.now()
        self._poll_metrics()
        sleep_for(self.config.poll_interval_seconds, start_time)

Inherited members

class DAQJobPCMetricsConfig (store_config: DAQJobStoreConfig,
metrics_to_store: list[PCMetric] = <factory>,
poll_interval_seconds: int = 1,
*,
verbosity: LogVerbosity = LogVerbosity.INFO,
remote_config: DAQRemoteConfig | None = <factory>,
daq_job_type: str)
Expand source code
class DAQJobPCMetricsConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobPCMetrics.

    Attributes:
        metrics_to_store (list[PCMetric]): List of metrics to store. If not specified, all metrics will be stored.
        poll_interval_seconds (int): Polling interval in seconds. If not specified, the default polling interval will be used.
    """

    metrics_to_store: list[PCMetric] = field(default_factory=lambda: list(PCMetric))
    poll_interval_seconds: int = DAQ_JOB_PC_METRICS_QUERY_INTERVAL_SECONDS

Configuration class for DAQJobPCMetrics.

Attributes

metrics_to_store : list[PCMetric]
List of metrics to store. If not specified, all metrics will be stored.
poll_interval_seconds : int
Polling interval in seconds. If not specified, the default polling interval will be used.

Ancestors

Instance variables

var metrics_to_store : list[PCMetric]
Expand source code
class DAQJobPCMetricsConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobPCMetrics.

    Attributes:
        metrics_to_store (list[PCMetric]): List of metrics to store. If not specified, all metrics will be stored.
        poll_interval_seconds (int): Polling interval in seconds. If not specified, the default polling interval will be used.
    """

    metrics_to_store: list[PCMetric] = field(default_factory=lambda: list(PCMetric))
    poll_interval_seconds: int = DAQ_JOB_PC_METRICS_QUERY_INTERVAL_SECONDS
var poll_interval_seconds : int
Expand source code
class DAQJobPCMetricsConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobPCMetrics.

    Attributes:
        metrics_to_store (list[PCMetric]): List of metrics to store. If not specified, all metrics will be stored.
        poll_interval_seconds (int): Polling interval in seconds. If not specified, the default polling interval will be used.
    """

    metrics_to_store: list[PCMetric] = field(default_factory=lambda: list(PCMetric))
    poll_interval_seconds: int = DAQ_JOB_PC_METRICS_QUERY_INTERVAL_SECONDS
class PCMetric (*args, **kwds)
Expand source code
class PCMetric(str, Enum):
    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var CPU
var DISK
var MEMORY