Module enrgdaq.daq.jobs.test_job

Classes

class DAQJobTest (config: DAQJobConfig,
supervisor_info: SupervisorInfo | None = None,
instance_id: int | None = None,
message_in: Any = None,
message_out: Any = None,
raw_config: str | None = None,
zmq_xpub_url: str | None = None,
zmq_xsub_url: str | None = None)
Expand source code
class DAQJobTest(DAQJob):
    """
    Test DAQ job that periodically emits a single row of random integers.

    Each emitted row is a PyArrow table with a millisecond timestamp and
    three random integers (columns "A", "B" and "C") drawn between
    ``config.rand_min`` and ``config.rand_max``.

    Attributes:
        config_type (type): The configuration class type.
        config (DAQJobTestConfig): The configuration instance.
    """

    config_type = DAQJobTestConfig
    config: DAQJobTestConfig

    def start(self):
        """
        Run the job loop: send one store message, sleep one second, repeat.

        The loop ends as soon as ``self._has_been_freed`` is true when it is
        checked at the top of an iteration.
        """
        while True:
            if self._has_been_freed:
                return

            self._send_store_message()
            time.sleep(1)

    def _send_store_message(self):
        """
        Build a one-row table of random data and put it on the output.

        The table has the columns "timestamp", "A", "B" and "C"; it is wrapped
        in a DAQJobMessageStorePyArrow carrying the job's store configuration.
        """
        cfg = self.config
        column_names = ["timestamp", "A", "B", "C"]

        # First column: current time as integer milliseconds since the epoch.
        columns = [pa.array([int(datetime.now().timestamp() * 1000)])]

        # Remaining columns: one fresh random integer each.
        for _ in column_names[1:]:
            columns.append(pa.array([randint(cfg.rand_min, cfg.rand_max)]))

        table = pa.Table.from_arrays(columns, names=column_names)
        message = DAQJobMessageStorePyArrow(
            store_config=cfg.store_config,
            table=table,
        )
        self._put_message_out(message, use_shm=True)

A test job for the DAQ system that generates random data.

Attributes

config_type : type
The configuration class type.
config : DAQJobTestConfig
The configuration instance.

Ancestors

Class variables

var config : DAQJobTestConfig
var config_type : type[DAQJobConfig]

Configuration class for DAQJobTest.

Attributes

rand_min : int
Minimum random integer value.
rand_max : int
Maximum random integer value.

Methods

def start(self)
Expand source code
def start(self):
    """
    Run the job loop: send one store message, sleep one second, repeat.

    The loop ends as soon as ``self._has_been_freed`` is true when it is
    checked at the top of an iteration.
    """
    while True:
        if self._has_been_freed:
            return

        self._send_store_message()
        time.sleep(1)

Start the DAQ job, sending a store message with freshly generated random data once per second until the job has been freed.

Inherited members

class DAQJobTestConfig (store_config: DAQJobStoreConfig,
rand_min: int,
rand_max: int,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True,
topics_to_subscribe: list[str] = <factory>)
Expand source code
class DAQJobTestConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobTest.

    Adds the two bounds that DAQJobTest passes to ``randint`` when it
    generates the values for its "A", "B" and "C" columns.

    Attributes:
        rand_min (int): Minimum random integer value.
        rand_max (int): Maximum random integer value.
    """

    # First argument DAQJobTest gives to randint (lower bound).
    rand_min: int
    # Second argument DAQJobTest gives to randint (upper bound).
    # NOTE(review): presumably must be >= rand_min; no check is visible here.
    rand_max: int

Configuration class for DAQJobTest.

Attributes

rand_min : int
Minimum random integer value.
rand_max : int
Maximum random integer value.

Ancestors

Instance variables

var rand_max : int
Expand source code
class DAQJobTestConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobTest.

    Adds the two bounds that DAQJobTest passes to ``randint`` when it
    generates the values for its "A", "B" and "C" columns.

    Attributes:
        rand_min (int): Minimum random integer value.
        rand_max (int): Maximum random integer value.
    """

    # First argument DAQJobTest gives to randint (lower bound).
    rand_min: int
    # Second argument DAQJobTest gives to randint (upper bound).
    # NOTE(review): presumably must be >= rand_min; no check is visible here.
    rand_max: int
var rand_min : int
Expand source code
class DAQJobTestConfig(StorableDAQJobConfig):
    """
    Configuration class for DAQJobTest.

    Adds the two bounds that DAQJobTest passes to ``randint`` when it
    generates the values for its "A", "B" and "C" columns.

    Attributes:
        rand_min (int): Minimum random integer value.
        rand_max (int): Maximum random integer value.
    """

    # First argument DAQJobTest gives to randint (lower bound).
    rand_min: int
    # Second argument DAQJobTest gives to randint (upper bound).
    # NOTE(review): presumably must be >= rand_min; no check is visible here.
    rand_max: int