Module enrgdaq.daq.jobs.benchmark
Classes
class DAQJobBenchmark (config: DAQJobBenchmarkConfig, **kwargs)
Benchmark job for DAQ system to stress test serialization/deserialization/networking.

Source code:

    class DAQJobBenchmark(DAQJob):
        """
        Benchmark job for DAQ system to stress test serialization/deserialization/networking.
        """

        config_type = DAQJobBenchmarkConfig
        config: DAQJobBenchmarkConfig

        def __init__(self, config: DAQJobBenchmarkConfig, **kwargs):
            super().__init__(config, **kwargs)
            self._payload_size = self.config.payload_size
            # Pre-allocate numpy array for the payload (reused each message)
            self._data_array = np.zeros(self._payload_size, dtype=np.float64)

        def start(self):
            """
            Start the benchmark job, sending as much data as possible.
            """
            i = 0
            while True:
                # Only check for messages every 1000 iterations to maximize production speed
                # if i % 1000 == 0:
                self._send_store_message()
                i += 1
                time.sleep(0.00005)

        def _send_store_message(self):
            """
            Send a store message with a large payload using PyArrow.
            Uses columnar layout: many rows, few columns.
            """
            timestamp = datetime.now().timestamp() * 1000  # timestamp in ms

            # Create PyArrow table with many rows
            # timestamp column: same timestamp for all rows in this batch
            # value column: the actual payload data
            table = pa.table(
                {
                    "timestamp": pa.array(
                        np.full(self._payload_size, timestamp, dtype=np.float64)
                    ),
                    "value": pa.array(np.random.random(self._payload_size)),
                }
            )

            self._put_message_out(
                DAQJobMessageStorePyArrow(
                    store_config=self.config.store_config,
                    table=table,
                ),
                use_shm=self.config.use_shm,
            )
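For reference, the columnar payload built in _send_store_message can be reproduced standalone with only numpy and pyarrow; the following is a sketch of that layout, not part of the module itself:

    import numpy as np
    import pyarrow as pa
    from datetime import datetime

    payload_size = 1000  # default DAQJobBenchmarkConfig.payload_size

    # One timestamp (in ms) repeated for every row, plus a column of random values:
    # the same "many rows, few columns" layout the benchmark job sends each iteration.
    timestamp = datetime.now().timestamp() * 1000
    table = pa.table(
        {
            "timestamp": pa.array(np.full(payload_size, timestamp, dtype=np.float64)),
            "value": pa.array(np.random.random(payload_size)),
        }
    )

    print(table.num_rows, table.column_names)  # 1000 ['timestamp', 'value']
    print(table.nbytes)                        # roughly 16 kB of float64 data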
Ancestors
- DAQJob
Class variables
var config : DAQJobBenchmarkConfig
var config_type : type[DAQJobConfig]
Configuration for DAQJobBenchmark.
Attributes
payload_size (int): Number of rows per message.
use_shm (bool): Use shared memory for message passing.
Methods
def start(self)

Start the benchmark job, sending as much data as possible.

Source code:

    def start(self):
        """
        Start the benchmark job, sending as much data as possible.
        """
        i = 0
        while True:
            # Only check for messages every 1000 iterations to maximize production speed
            # if i % 1000 == 0:
            self._send_store_message()
            i += 1
            time.sleep(0.00005)
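Note that the 50 µs sleep per iteration caps the loop at roughly 20,000 messages per second. Ignoring per-iteration work, the raw payload rate implied by the default configuration can be estimated as follows (a back-of-the-envelope sketch, not a measured figure):

    sleep_s = 0.00005             # time.sleep(0.00005) in start()
    max_msgs_per_s = 1 / sleep_s  # ~20,000 messages/s upper bound

    payload_size = 1000           # default rows per message
    bytes_per_row = 2 * 8         # two float64 columns: timestamp + value
    bytes_per_msg = payload_size * bytes_per_row       # 16,000 bytes of table data

    print(max_msgs_per_s * bytes_per_msg / 1e6)        # ~320 MB/s of raw payload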
Inherited members
class DAQJobBenchmarkConfig (store_config: DAQJobStoreConfig,
payload_size: int = 1000,
use_shm: bool = False,
*,
daq_job_type: str,
verbosity: LogVerbosity = LogVerbosity.INFO,
daq_job_unique_id: str | None = None,
use_shm_when_possible: bool = True)
Configuration for DAQJobBenchmark.

Source code:

    class DAQJobBenchmarkConfig(StorableDAQJobConfig):
        """
        Configuration for DAQJobBenchmark.

        Attributes:
            payload_size (int): Number of rows per message.
            use_shm (bool): Use shared memory for message passing.
        """

        payload_size: int = 1000
        use_shm: bool = False
Attributes
payload_size (int): Number of rows per message.
use_shm (bool): Use shared memory for message passing.
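A minimal construction example follows, assuming DAQJobBenchmarkConfig is imported from enrgdaq.daq.jobs.benchmark; the store_config object and the daq_job_type string are placeholders for whatever your deployment uses, not documented values:

    from enrgdaq.daq.jobs.benchmark import DAQJobBenchmarkConfig

    # store_config must be a DAQJobStoreConfig from your own setup; its construction
    # is outside this module, so it is shown here only as a placeholder.
    config = DAQJobBenchmarkConfig(
        store_config=my_store_config,   # placeholder DAQJobStoreConfig
        payload_size=100_000,           # rows per message
        use_shm=True,                   # pass messages via shared memory
        daq_job_type="benchmark",       # illustrative value
    )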
Ancestors
- StorableDAQJobConfig
- DAQJobConfig
- msgspec.Struct
- msgspec._core._StructMixin
Instance variables
var payload_size : int
Number of rows per message.
var use_shm : bool
Use shared memory for message passing.