Module enrgdaq.daq.jobs.benchmark

Classes

class DAQJobBenchmark (config: DAQJobBenchmarkConfig,
**kwargs)
Expand source code
class DAQJobBenchmark(DAQJob):
    """
    Benchmark job for DAQ system to stress test serialization/deserialization/networking.
    """

    config_type = DAQJobBenchmarkConfig
    config: DAQJobBenchmarkConfig

    def __init__(self, config: DAQJobBenchmarkConfig, **kwargs):
        super().__init__(config, **kwargs)
        self._payload_size = self.config.payload_size
        # Pre-allocate numpy array for the payload (reused each message)
        self._data_array = np.zeros(self._payload_size, dtype=np.float64)

    def start(self):
        """
        Start the benchmark job, sending as much data as possible.
        """
        i = 0
        while True:
            # Only check for messages every 1000 iterations to maximize production speed
            # if i % 1000 == 0:

            self._send_store_message()
            i += 1
            time.sleep(0.00005)

    def _send_store_message(self):
        """
        Send a store message with a large payload using PyArrow.
        Uses columnar layout: many rows, few columns.
        """
        timestamp = datetime.now().timestamp() * 1000  # timestamp in ms

        # Create PyArrow table with many rows
        # timestamp column: same timestamp for all rows in this batch
        # value column: the actual payload data
        table = pa.table(
            {
                "timestamp": pa.array(
                    np.full(self._payload_size, timestamp, dtype=np.float64)
                ),
                "value": pa.array(np.random.random(self._payload_size)),
            }
        )

        self._put_message_out(
            DAQJobMessageStorePyArrow(
                store_config=self.config.store_config,
                table=table,
            ),
            use_shm=self.config.use_shm,
        )

Benchmark job for DAQ system to stress test serialization/deserialization/networking.

Ancestors

Class variables

var configDAQJobBenchmarkConfig
var config_type : type[DAQJobConfig]

Configuration for DAQJobBenchmark.

Attributes

payload_size : int
Number of rows per message.
use_shm : bool
Use shared memory for message passing.

Methods

def start(self)
Expand source code
def start(self):
    """
    Start the benchmark job, sending as much data as possible.
    """
    i = 0
    while True:
        # Only check for messages every 1000 iterations to maximize production speed
        # if i % 1000 == 0:

        self._send_store_message()
        i += 1
        time.sleep(0.00005)

Start the benchmark job, sending as much data as possible.

Inherited members

class DAQJobBenchmarkConfig (store_config: DAQJobStoreConfig,
payload_size: int = 1000,
use_shm: bool = False,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
Expand source code
class DAQJobBenchmarkConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobBenchmark.

    Attributes:
        payload_size (int): Number of rows per message.
        use_shm (bool): Use shared memory for message passing.
    """

    payload_size: int = 1000
    use_shm: bool = False

Configuration for DAQJobBenchmark.

Attributes

payload_size : int
Number of rows per message.
use_shm : bool
Use shared memory for message passing.

Ancestors

Instance variables

var payload_size : int
Expand source code
class DAQJobBenchmarkConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobBenchmark.

    Attributes:
        payload_size (int): Number of rows per message.
        use_shm (bool): Use shared memory for message passing.
    """

    payload_size: int = 1000
    use_shm: bool = False
var use_shm : bool
Expand source code
class DAQJobBenchmarkConfig(StorableDAQJobConfig):
    """
    Configuration for DAQJobBenchmark.

    Attributes:
        payload_size (int): Number of rows per message.
        use_shm (bool): Use shared memory for message passing.
    """

    payload_size: int = 1000
    use_shm: bool = False