Module enrgdaq.daq.jobs.test_job

Classes

class DAQJobTest (config: Any, supervisor_config: SupervisorConfig | None = None)
class DAQJobTest(DAQJob):
    """
    A test job for the DAQ system that generates random data.

    Attributes:
        config_type (type): The configuration class type.
        device (N1081B): The device used for data acquisition.
        config (DAQJobTestConfig): The configuration instance.
    """

    config_type = DAQJobTestConfig
    device: N1081B
    config: DAQJobTestConfig

    def start(self):
        """
        Start the DAQ job, continuously consuming data and sending store messages.
        """
        while True:
            self.consume()
            self._send_store_message()

            time.sleep(1)

    def _send_store_message(self):
        """
        Send a store message with random data.
        """

        def get_int():
            """
            Generate a random integer within the configured range.

            Returns:
                int: A random integer.
            """
            return randint(self.config.rand_min, self.config.rand_max)

        self._put_message_out(
            DAQJobMessageStoreTabular(
                store_config=self.config.store_config,
                keys=["timestamp", "A", "B", "C"],
                data=[
                    [datetime.now().timestamp() * 1000, get_int(), get_int(), get_int()]
                ],
            )
        )

A test job for the DAQ system that generates random data.

Attributes

config_type : type
The configuration class type.
device : N1081B
The device used for data acquisition.
config : DAQJobTestConfig
The configuration instance.
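
The row that `_send_store_message` builds can be reproduced in isolation. The following is a minimal sketch that uses only the standard library; `make_test_row` and `KEYS` are hypothetical names introduced for illustration and are not part of `enrgdaq`. It shows the shape of the data: a millisecond timestamp followed by three integers drawn from the configured range.

```python
from datetime import datetime
from random import randint

# Column names, matching the keys used in the source above.
KEYS = ["timestamp", "A", "B", "C"]


def make_test_row(rand_min: int, rand_max: int) -> list:
    """Build one row: a millisecond timestamp followed by three random integers.

    Hypothetical helper for illustration; not an enrgdaq function.
    """
    # randint includes both endpoints, so rand_max itself can be drawn.
    values = [randint(rand_min, rand_max) for _ in range(3)]
    # datetime.now().timestamp() is in seconds; multiply by 1000 for milliseconds.
    return [datetime.now().timestamp() * 1000, *values]


row = make_test_row(1, 6)
record = dict(zip(KEYS, row))  # pair each key with its value
```

Because `randint` is inclusive on both ends, setting `rand_min` equal to `rand_max` yields a constant value in columns A, B and C.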

Class variables

var config : DAQJobTestConfig
var config_type : Any

Configuration class for DAQJobTest.

Attributes

rand_min : int
Minimum random integer value.
rand_max : int
Maximum random integer value.
var device : N1081B.N1081B

Methods

def start(self)
Start the DAQ job, continuously consuming data and sending store messages.
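
Since `start` loops forever and sleeps one second per cycle, it blocks the caller and is awkward to exercise directly. The sketch below illustrates the same consume, send, sleep ordering with a bounded iteration count and an injectable sleep function. `run_loop` and its parameters are assumptions made for this example and are not part of the `DAQJob` API.

```python
import time
from typing import Callable


def run_loop(
    consume: Callable[[], None],
    send: Callable[[], None],
    iterations: int,
    interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run the consume/send/sleep cycle a fixed number of times.

    Hypothetical stand-in for the unbounded `while True` loop in `start`.
    Returns the number of messages sent.
    """
    sent = 0
    for _ in range(iterations):
        consume()        # handle incoming messages first
        send()           # then emit one store message
        sent += 1
        sleep(interval)  # pause before the next cycle
    return sent


# Record the call order; a no-op sleep keeps the example instant.
events = []
count = run_loop(
    consume=lambda: events.append("consume"),
    send=lambda: events.append("send"),
    iterations=2,
    sleep=lambda _seconds: None,
)
```

Passing the sleep function as a parameter is one way to keep such a loop testable without waiting for real time to pass.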

class DAQJobTestConfig (store_config: DAQJobStoreConfig, rand_min: int, rand_max: int, *, verbosity: LogVerbosity = LogVerbosity.INFO, remote_config: DAQRemoteConfig | None = <factory>, daq_job_type: str)
class DAQJobTestConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobTest.

    Attributes:
        rand_min (int): Minimum random integer value.
        rand_max (int): Maximum random integer value.
    """

    rand_min: int
    rand_max: int

Configuration class for DAQJobTest.

Attributes

rand_min : int
Minimum random integer value.
rand_max : int
Maximum random integer value.
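
The two bounds are passed straight to `random.randint`, which raises `ValueError` when the lower bound exceeds the upper one. The source shown here does not validate the range, so the check below is only a sketch of how that could be caught early. `RandomRangeSketch` is a hypothetical plain dataclass used as a stand-in; the real `DAQJobTestConfig` derives from `StorableDAQJobConfig` and carries further fields.

```python
from dataclasses import dataclass
from random import randint


@dataclass
class RandomRangeSketch:
    """Stand-in holding only the two range fields (not the real config class)."""

    rand_min: int
    rand_max: int

    def __post_init__(self) -> None:
        # randint(a, b) raises ValueError when a > b, so fail at construction
        # time with a clearer message instead of inside the job loop.
        if self.rand_min > self.rand_max:
            raise ValueError(
                f"rand_min ({self.rand_min}) must not exceed rand_max ({self.rand_max})"
            )


cfg = RandomRangeSketch(rand_min=0, rand_max=100)
sample = randint(cfg.rand_min, cfg.rand_max)  # both endpoints are possible
```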

Instance variables

var rand_max : int
Maximum random integer value.
var rand_min : int
Minimum random integer value.