Module enrgdaq.tools.benchmark_runner
ENRGDAQ Benchmark Script
This script benchmarks the ENRGDAQ system by running multiple supervisor instances with benchmark jobs that stress test message throughput, serialization, and networking.
Usage
python benchmark.py [--clients N] [--payload-size N] [--duration SECONDS]
Example
python benchmark.py --clients 5 --payload-size 10000 --duration 30
Functions
def cleanup_supervisor(supervisor: Supervisor)
Expand source code
def cleanup_supervisor(supervisor: Supervisor):
    """Clean up supervisor and all its child processes.

    Asks the supervisor to stop, waits briefly, then force-kills the process
    tree of every DAQ job process that is still alive. This is best-effort:
    errors are swallowed so the function is safe to call from ``atexit``
    handlers and ``finally`` blocks.

    Args:
        supervisor: The supervisor whose DAQ job processes should be torn down.
    """
    try:
        supervisor.stop()
    except Exception:
        # A failed graceful stop must not prevent the force-kill below.
        pass

    # Give a moment for clean shutdown
    time.sleep(0.1)

    # Force kill any remaining DAQ job processes
    for daq_job_process in list(supervisor.daq_job_processes):
        try:
            process = daq_job_process.process
            if process and process.is_alive() and process.pid:
                kill_process_tree(process.pid)
        except Exception:
            # Keep going so one failing job does not leave the others alive.
            continue
def create_supervisor_config(supervisor_id: str) -> SupervisorConfig
Expand source code
def create_supervisor_config(supervisor_id: str) -> SupervisorConfig:
    """Build a ``SupervisorConfig`` for *supervisor_id*.

    Only the ``info`` field is set (via ``create_supervisor_info``); every
    other field keeps the ``SupervisorConfig`` default.
    """
    info = create_supervisor_info(supervisor_id)
    return SupervisorConfig(info=info)
def create_supervisor_info(supervisor_id: str) -> SupervisorInfo
Expand source code
def create_supervisor_info(supervisor_id: str) -> SupervisorInfo:
    """Build a ``SupervisorInfo`` that carries only *supervisor_id*."""
    info = SupervisorInfo(supervisor_id=supervisor_id)
    return info
def kill_process_tree(pid: int, sig=signal.SIGTERM)
Expand source code
def kill_process_tree(pid: int, sig=signal.SIGTERM, timeout: float = 1.0):
    """Kill a process and all its children using psutil.

    Sends *sig* to every descendant of *pid* (children first) and then to
    *pid* itself. It then waits up to *timeout* seconds and force-kills
    (``psutil.Process.kill``) whatever is still alive.

    This is best-effort: processes that vanish or cannot be signalled are
    skipped, and a *pid* that no longer exists or cannot be inspected is a
    no-op.

    Args:
        pid: Root of the process tree to terminate.
        sig: Signal sent in the graceful phase (default ``SIGTERM``).
        timeout: Seconds to wait after the graceful signal before force-killing.
    """
    try:
        parent = psutil.Process(pid)
        # Snapshot the descendants before signalling anything.
        children = parent.children(recursive=True)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return

    # Kill children first, then the parent
    targets = [*children, parent]
    for proc in targets:
        try:
            proc.send_signal(sig)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    # Wait for processes to terminate
    try:
        _, alive = psutil.wait_procs(targets, timeout=timeout)
    except psutil.NoSuchProcess:
        return

    # Force kill any remaining
    for proc in alive:
        try:
            proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
def parse_args() -> BenchmarkConfig
Expand source code
def parse_args(argv: Optional[list] = None) -> BenchmarkConfig:
    """Parse command line arguments.

    Args:
        argv: Arguments to parse, without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        A ``BenchmarkConfig`` built from the parsed arguments.

    Exits via ``parser.error`` when a numeric option is out of range.
    """
    parser = argparse.ArgumentParser(
        description="ENRGDAQ Benchmark Script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python benchmark.py                        # Run with defaults
  python benchmark.py --clients 10           # Run with 10 clients
  python benchmark.py --payload-size 50000   # Larger payloads
  python benchmark.py --duration 120         # Run for 2 minutes
""",
    )
    parser.add_argument(
        "--clients",
        type=int,
        default=DEFAULT_NUM_CLIENTS,
        help=f"Number of benchmark clients (default: {DEFAULT_NUM_CLIENTS})",
    )
    parser.add_argument(
        "--payload-size",
        type=int,
        default=DEFAULT_PAYLOAD_SIZE,
        help=f"Number of values per message (default: {DEFAULT_PAYLOAD_SIZE})",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=DEFAULT_DURATION_SECONDS,
        help=f"Benchmark duration in seconds (default: {DEFAULT_DURATION_SECONDS})",
    )
    parser.add_argument(
        "--stats-interval",
        type=float,
        default=DEFAULT_STATS_INTERVAL_SECONDS,
        help=f"Stats collection interval in seconds (default: {DEFAULT_STATS_INTERVAL_SECONDS})",
    )
    parser.add_argument(
        "--zmq-xsub",
        type=str,
        default=DEFAULT_ZMQ_XSUB_URL,
        help=f"ZMQ XSUB URL (default: {DEFAULT_ZMQ_XSUB_URL})",
    )
    parser.add_argument(
        "--zmq-xpub",
        type=str,
        default=DEFAULT_ZMQ_XPUB_URL,
        help=f"ZMQ XPUB URL (default: {DEFAULT_ZMQ_XPUB_URL})",
    )
    parser.add_argument(
        "--no-void-data",
        action="store_true",
        help="Don't void memory store data (uses more memory)",
    )
    # --use-shm only restates the default (True); --no-shm is what turns SHM off.
    parser.add_argument(
        "--use-shm", action="store_true", default=True, help="Use SHM for zero-copy"
    )
    parser.add_argument(
        "--no-shm", action="store_false", dest="use_shm", help="Disable SHM"
    )
    parser.add_argument(
        "--use-memory-store",
        action="store_true",
        help="Use Memory store instead of ROOT store",
    )
    args = parser.parse_args(argv)

    # Reject values that would hang or crash the benchmark later on:
    # with zero clients no data ever arrives (and the run timer only starts
    # on first data); a non-positive stats interval busy-loops or makes
    # time.sleep() raise.
    if args.clients < 1:
        parser.error("--clients must be at least 1")
    if args.payload_size < 1:
        parser.error("--payload-size must be at least 1")
    if args.stats_interval <= 0:
        parser.error("--stats-interval must be positive")

    return BenchmarkConfig(
        num_clients=args.clients,
        payload_size=args.payload_size,
        duration_seconds=args.duration,
        stats_interval_seconds=args.stats_interval,
        zmq_xsub_url=args.zmq_xsub,
        zmq_xpub_url=args.zmq_xpub,
        void_memory_data=not args.no_void_data,
        use_memory_store=args.use_memory_store,
        use_shm=args.use_shm,
    )
def run_client_supervisor(client_id: int,
config: BenchmarkConfig,
stop_flag: Value,
ready_event: Event,
shm_bytes_counter: Array)
Expand source code
def run_client_supervisor(
    client_id: int,
    config: BenchmarkConfig,
    stop_flag: Value,
    ready_event: Event,
    shm_bytes_counter: Array,
):
    """Run a client supervisor that generates benchmark data.

    Builds a supervisor with two jobs:

    * a ``DAQJobBenchmark`` producer, storing to Memory or ROOT depending on
      ``config.use_memory_store``;
    * a ``DAQJobRemote`` that publishes to ``config.zmq_xsub_url``.

    It then initializes the supervisor, sets *ready_event*, and runs until
    *stop_flag* becomes truthy. The temporary config directory is removed on
    exit.

    Args:
        client_id: Index of this client; used to build the supervisor id.
        config: Benchmark configuration.
        stop_flag: Shared flag; the client keeps running until it is truthy.
        ready_event: Set once ``supervisor.init()`` has returned.
        shm_bytes_counter: Shared counter array forwarded to
            ``run_supervisor_with_stats``.
    """
    import shutil

    # Create temporary config directory for the supervisor
    temp_config_dir = tempfile.mkdtemp(prefix="enrgdaq_benchmark_client_")
    try:
        supervisor_id = f"benchmark_client_{client_id}"
        supervisor_info = create_supervisor_info(supervisor_id)
        supervisor_config = create_supervisor_config(supervisor_id)
        supervisor_config.ring_buffer_size_mb = 1024  # 1GB for clients
        supervisor_config.ring_buffer_slot_size_kb = 10 * 1024  # 10MB slots

        if config.use_memory_store:
            store_config = DAQJobStoreConfig(memory=DAQJobStoreConfigMemory())
        else:
            store_config = DAQJobStoreConfig(
                root=DAQJobStoreConfigROOT(
                    file_path="test.root",
                    add_date=False,
                    tree_name="benchmark_tree",
                )
            )

        daq_job_processes = [
            # Benchmark job that generates data
            _create_daq_job_process(
                DAQJobBenchmark,
                DAQJobBenchmarkConfig(
                    daq_job_type="DAQJobBenchmark",
                    payload_size=config.payload_size,
                    use_shm=config.use_shm,
                    store_config=store_config,
                ),
                supervisor_info,
            ),
            # Remote job for sending to main supervisor
            _create_daq_job_process(
                DAQJobRemote,
                DAQJobRemoteConfig(
                    daq_job_type="DAQJobRemote",
                    zmq_proxy_sub_urls=[],
                    zmq_proxy_pub_url=config.zmq_xsub_url,
                ),
                supervisor_info,
            ),
        ]

        supervisor = Supervisor(
            config=supervisor_config,
            daq_job_processes=daq_job_processes,
            daq_job_config_path=temp_config_dir,
        )

        # Register cleanup on exit
        atexit.register(cleanup_supervisor, supervisor)

        # Initialize supervisor and signal ready
        supervisor.init()
        ready_event.set()  # Signal that this client is ready

        run_supervisor_with_stats(
            supervisor,
            config,
            None,
            stop_flag,
            collect_stats=False,
            skip_init=True,
            shm_bytes_counter=shm_bytes_counter,
            client_id=client_id,
        )
    finally:
        # mkdtemp() leaves deletion to the caller; without this every run
        # leaves a stray directory behind.
        shutil.rmtree(temp_config_dir, ignore_errors=True)
def run_main_supervisor(config: BenchmarkConfig,
stats_queue: Any,
stop_flag: Value,
client_shm_bytes: Array)
Expand source code
def run_main_supervisor(
    config: BenchmarkConfig,
    stats_queue: Any,
    stop_flag: Value,
    client_shm_bytes: Array,
):
    """Run the main supervisor that collects stats and runs the proxy.

    The supervisor hosts the following jobs:

    * the main store (Memory or ROOT, per ``config.use_memory_store``);
    * a CSV store;
    * a ``DAQJobRemote`` subscribed to ``config.zmq_xpub_url``;
    * a stats handler writing ``config.output_stats_csv``;
    * the XSUB/XPUB remote proxy.

    Stats samples are pushed onto *stats_queue* until *stop_flag* becomes
    truthy. The temporary config directory is removed on exit.

    Args:
        config: Benchmark configuration.
        stats_queue: Queue that receives one stats dict per interval.
        stop_flag: Shared flag; the supervisor keeps running until it is truthy.
        client_shm_bytes: Shared per-client counter array forwarded to
            ``run_supervisor_with_stats``.
    """
    import shutil

    # Create temporary config directory for the supervisor
    temp_config_dir = tempfile.mkdtemp(prefix="enrgdaq_benchmark_")
    try:
        supervisor_id = "benchmark_supervisor"
        supervisor_info = create_supervisor_info(supervisor_id)
        supervisor_config = create_supervisor_config(supervisor_id)
        supervisor_config.ring_buffer_size_mb = 1024
        supervisor_config.ring_buffer_slot_size_kb = 10 * 1024

        # Main store - either Memory (fast, for testing) or ROOT (slow, for production)
        if config.use_memory_store:
            main_store = _create_daq_job_process(
                DAQJobStoreMemory,
                DAQJobStoreMemoryConfig(
                    daq_job_type="DAQJobStoreMemory",
                    void_data=config.void_memory_data,
                ),
                supervisor_info,
            )
        else:
            main_store = _create_daq_job_process(
                DAQJobStoreROOT,
                DAQJobStoreROOTConfig(
                    daq_job_type="DAQJobStoreROOT", verbosity=LogVerbosity.DEBUG
                ),
                supervisor_info,
            )

        daq_job_processes = [
            main_store,
            # CSV store for stats output
            _create_daq_job_process(
                DAQJobStoreCSV,
                DAQJobStoreCSVConfig(daq_job_type="DAQJobStoreCSV"),
                supervisor_info,
            ),
            # Remote job for receiving from clients
            _create_daq_job_process(
                DAQJobRemote,
                DAQJobRemoteConfig(
                    daq_job_type="DAQJobRemote",
                    zmq_proxy_sub_urls=[config.zmq_xpub_url],
                ),
                supervisor_info,
            ),
            # Stats handler
            _create_daq_job_process(
                DAQJobHandleStats,
                DAQJobHandleStatsConfig(
                    daq_job_type="DAQJobHandleStats",
                    store_config=DAQJobStoreConfig(
                        csv=DAQJobStoreConfigCSV(
                            file_path=config.output_stats_csv,
                            overwrite=True,
                        ),
                    ),
                ),
                supervisor_info,
            ),
            # Remote proxy (XSUB/XPUB)
            _create_daq_job_process(
                DAQJobRemoteProxy,
                DAQJobRemoteProxyConfig(
                    daq_job_type="DAQJobRemoteProxy",
                    zmq_xsub_url=config.zmq_xsub_url,
                    zmq_xpub_url=config.zmq_xpub_url,
                ),
                supervisor_info,
            ),
        ]

        supervisor = Supervisor(
            config=supervisor_config,
            daq_job_processes=daq_job_processes,
            daq_job_config_path=temp_config_dir,
        )

        # Register cleanup on exit
        atexit.register(cleanup_supervisor, supervisor)

        run_supervisor_with_stats(
            supervisor, config, stats_queue, stop_flag, client_shm_bytes=client_shm_bytes
        )
    finally:
        # mkdtemp() leaves deletion to the caller; without this every run
        # leaves a stray directory behind.
        shutil.rmtree(temp_config_dir, ignore_errors=True)
def run_supervisor_with_stats(supervisor: Supervisor,
config: BenchmarkConfig,
stats_queue: Any,
stop_flag: Value,
collect_stats: bool = True,
skip_init: bool = False,
shm_bytes_counter: Array | None = None,
client_id: int | None = None,
client_shm_bytes: Array | None = None)
Expand source code
def _build_stats_snapshot(
    supervisor: Supervisor,
    config: BenchmarkConfig,
    last_stats: Optional[dict],
    now: datetime,
    elapsed: float,
) -> dict:
    """Build one stats sample for *supervisor* as a plain dict.

    Rates (``msg_in_out_mb_per_s``, ``shm_mb_per_s``) are deltas against
    *last_stats* over *elapsed* seconds. They are 0.0 when there is no
    previous sample or no time has passed.
    """
    supervisor_id = supervisor.config.info.supervisor_id

    # Snapshot the shared collections once before iterating them.
    # NOTE(review): presumably the supervisor thread updates these
    # concurrently; confirm.
    remote_items = list(supervisor.daq_job_remote_stats.items())
    job_stats = list(supervisor.daq_job_stats.values())
    job_processes = list(supervisor.daq_job_processes)

    # Get stats from remote stats dict
    remote_entries = [stats for key, stats in remote_items if key == supervisor_id]
    msg_in_out_mb = (
        sum(x.message_in_bytes + x.message_out_bytes for x in remote_entries) / 10**6
    )
    msg_in_count = sum(x.message_in_count for x in remote_entries)
    msg_out_count = sum(x.message_out_count for x in remote_entries)

    # qsize() is unsupported on macOS, and fmean() raises on an empty job list.
    try:
        avg_queue_size = fmean(
            [p.message_out.qsize() + p.message_in.qsize() for p in job_processes]
        )
    except Exception:
        avg_queue_size = 0.0

    active_job_count = sum(
        1 for p in job_processes if p.process and p.process.is_alive()
    )

    # Calculate CPU and Memory usage
    cpu_usage = sum(s.resource_stats.cpu_percent for s in job_stats)
    rss_mb_total = sum(s.resource_stats.rss_mb for s in job_stats)

    # Calculate Latency (max of p95/p99 across jobs to be conservative)
    latencies = [s.latency_stats for s in job_stats if s.latency_stats.count > 0]
    latency_p95 = max((lat.p95_ms for lat in latencies), default=0.0)
    latency_p99 = max((lat.p99_ms for lat in latencies), default=0.0)

    # Each message contains payload_size * 2 float64 columns (timestamp + value) = 16 bytes/row
    msg_size_bytes = config.payload_size * 16
    # Kept in integer bytes to avoid a lossy bytes -> MB -> bytes float round trip.
    total_data_bytes = msg_in_count * msg_size_bytes

    msg_in_out_mb_per_s = 0.0
    data_mb_per_s = 0.0
    if last_stats is not None and elapsed > 0:
        msg_in_out_mb_per_s = (msg_in_out_mb - last_stats["msg_in_out_mb"]) / elapsed
        msg_diff = msg_in_count - last_stats.get("msg_in_count", 0)
        data_mb_per_s = (msg_diff * msg_size_bytes) / elapsed / 10**6

    return {
        "timestamp": now.isoformat(),
        "supervisor_id": supervisor_id,
        "msg_in_out_mb": msg_in_out_mb,
        "msg_in_count": msg_in_count,
        "msg_out_count": msg_out_count,
        "msg_in_out_mb_per_s": msg_in_out_mb_per_s,
        "avg_queue_size": avg_queue_size,
        "active_job_count": active_job_count,
        "cpu_usage_percent": cpu_usage,
        "rss_mb": rss_mb_total,
        "latency_p95_ms": latency_p95,
        "latency_p99_ms": latency_p99,
        "shm_bytes_written": total_data_bytes,
        "shm_mb_per_s": data_mb_per_s,
    }


def run_supervisor_with_stats(
    supervisor: Supervisor,
    config: BenchmarkConfig,
    stats_queue: Any,
    stop_flag: Value,
    collect_stats: bool = True,
    skip_init: bool = False,
    shm_bytes_counter: Optional[Array] = None,
    client_id: Optional[int] = None,
    client_shm_bytes: Optional[Array] = None,
):
    """Run a supervisor and optionally collect stats.

    Starts ``supervisor.run`` in a daemon thread. Until *stop_flag* becomes
    truthy, it then does one of two things:

    * if *collect_stats* is false or *stats_queue* is ``None``, it just idles;
    * otherwise it builds a stats dict every
      ``config.stats_interval_seconds`` and offers it to *stats_queue* with
      ``put_nowait``, dropping the sample if that fails.

    The supervisor is always cleaned up with ``cleanup_supervisor`` on exit.

    Args:
        supervisor: Supervisor to run.
        config: Benchmark configuration (payload size, stats interval).
        stats_queue: Destination for stats dicts, or ``None``.
        stop_flag: Shared flag polled to decide when to stop.
        collect_stats: Whether to build and publish stats samples.
        skip_init: Skip ``supervisor.init()`` when the caller already ran it.
        shm_bytes_counter: Not used by this function.
        client_id: Not used by this function.
        client_shm_bytes: Not used by this function; the "shm" figures are
            estimated from the message count instead.
    """
    assert supervisor.config is not None
    if not skip_init:
        supervisor.init()

    # Start supervisor in a separate thread
    supervisor_thread = Thread(target=supervisor.run, daemon=True)
    supervisor_thread.start()

    try:
        if not collect_stats or stats_queue is None:
            # Just keep the process alive
            while not stop_flag.value:
                time.sleep(0.5)
            return

        last_stats: Optional[dict] = None
        last_iteration = datetime.now()
        while not stop_flag.value:
            now = datetime.now()
            elapsed = (now - last_iteration).total_seconds()
            current_stats = _build_stats_snapshot(
                supervisor, config, last_stats, now, elapsed
            )
            try:
                stats_queue.put_nowait(current_stats)
            except Exception:
                # Queue full or unavailable: drop this sample rather than stall.
                pass
            last_stats = current_stats
            last_iteration = now
            time.sleep(config.stats_interval_seconds)
    finally:
        # Always clean up the supervisor
        cleanup_supervisor(supervisor)
Classes
class BenchmarkConfig (num_clients: int = 5,
payload_size: int = 1000,
duration_seconds: int = 60,
stats_interval_seconds: float = 1,
zmq_xsub_url: str = 'tcp://localhost:10001',
zmq_xpub_url: str = 'tcp://localhost:10002',
output_stats_csv: str = 'benchmark_stats.csv',
output_remote_stats_csv: str = 'benchmark_remote_stats.csv',
void_memory_data: bool = True,
use_memory_store: bool = False,
use_shm: bool = True)
Expand source code
@dataclass
class BenchmarkConfig:
    """Configuration for benchmark run.

    Raises:
        ValueError: If ``num_clients`` or ``payload_size`` is below 1, or
            ``stats_interval_seconds`` is not positive.
    """

    # Number of client supervisor processes started by the runner.
    num_clients: int = DEFAULT_NUM_CLIENTS
    # Number of values per generated message.
    payload_size: int = DEFAULT_PAYLOAD_SIZE
    # Run length in seconds, counted from the first sample that carries data.
    duration_seconds: int = DEFAULT_DURATION_SECONDS
    # Seconds between stats samples taken in the main supervisor.
    stats_interval_seconds: float = DEFAULT_STATS_INTERVAL_SECONDS
    # Proxy XSUB endpoint; client DAQJobRemote jobs publish here.
    zmq_xsub_url: str = DEFAULT_ZMQ_XSUB_URL
    # Proxy XPUB endpoint; the main supervisor's DAQJobRemote subscribes here.
    zmq_xpub_url: str = DEFAULT_ZMQ_XPUB_URL
    # CSV file written by the main supervisor's DAQJobHandleStats.
    output_stats_csv: str = "benchmark_stats.csv"
    # NOTE(review): not referenced by the code visible here; confirm it is used.
    output_remote_stats_csv: str = "benchmark_remote_stats.csv"
    # Forwarded as DAQJobStoreMemoryConfig.void_data (memory store only).
    void_memory_data: bool = True
    # Use the Memory store instead of the ROOT store.
    use_memory_store: bool = False
    # Forwarded as DAQJobBenchmarkConfig.use_shm.
    use_shm: bool = True

    def __post_init__(self):
        # Zero clients means no data ever arrives, and the runner only starts
        # its timer on first data, so it would wait forever. A non-positive
        # stats interval busy-loops or makes time.sleep() raise.
        if self.num_clients < 1:
            raise ValueError(f"num_clients must be >= 1, got {self.num_clients}")
        if self.payload_size < 1:
            raise ValueError(f"payload_size must be >= 1, got {self.payload_size}")
        if self.stats_interval_seconds <= 0:
            raise ValueError(
                "stats_interval_seconds must be > 0, "
                f"got {self.stats_interval_seconds}"
            )
Instance variables
var duration_seconds : int
var num_clients : int
var output_remote_stats_csv : str
var output_stats_csv : str
var payload_size : int
var stats_interval_seconds : float
var use_memory_store : bool
var use_shm : bool
var void_memory_data : bool
var zmq_xpub_url : str
var zmq_xsub_url : str
class BenchmarkRunner (config: BenchmarkConfig)
Expand source code
class BenchmarkRunner:
    """Runs the ENRGDAQ benchmark and collects statistics.

    Starts one main supervisor process and ``config.num_clients`` client
    supervisor processes. It prints each stats sample the main supervisor
    pushes onto a shared queue, and prints a summary when the run ends.
    """

    # Leading data samples treated as warm-up and excluded from the summary.
    _WARMUP_SAMPLES = 3

    def __init__(self, config: BenchmarkConfig):
        self.config = config
        self._stats_queue: Any = _create_queue()
        # Shared stop flag polled by every supervisor process.
        self._stop_flag = Value("b", False)
        self._processes: list[Process] = []
        self._stats_history: list[BenchmarkStats] = []
        self._main_pid = os.getpid()
        # Shared per-client counters; allocated in run().
        self._client_shm_bytes: Any = None

    def _dict_to_stats(self, d: dict) -> BenchmarkStats:
        """Convert dictionary to BenchmarkStats."""
        return BenchmarkStats(
            timestamp=datetime.fromisoformat(d["timestamp"]),
            supervisor_id=d["supervisor_id"],
            msg_in_out_mb=d["msg_in_out_mb"],
            msg_in_count=d["msg_in_count"],
            msg_out_count=d["msg_out_count"],
            msg_in_out_mb_per_s=d["msg_in_out_mb_per_s"],
            avg_queue_size=d["avg_queue_size"],
            active_job_count=d["active_job_count"],
            cpu_usage_percent=d.get("cpu_usage_percent", 0.0),
            rss_mb=d.get("rss_mb", 0.0),
            latency_p95_ms=d.get("latency_p95_ms", 0.0),
            latency_p99_ms=d.get("latency_p99_ms", 0.0),
            shm_bytes_written=d.get("shm_bytes_written", 0),
            shm_mb_per_s=d.get("shm_mb_per_s", 0.0),
        )

    def _print_stats(self, stats: BenchmarkStats):
        """Print statistics to console."""
        print(
            f"[{stats.timestamp.strftime('%H:%M:%S')}] "
            f"SHM: {stats.shm_mb_per_s:7.2f} MB/s | "
            f"CPU: {stats.cpu_usage_percent:5.1f}% | "
            f"p95 Latency: {stats.latency_p95_ms:5.2f}ms | "
            f"Active Jobs: {stats.active_job_count:3d}"
        )

    def _handle_signal(self, signum, frame):
        """Handle termination signals by setting the shared stop flag."""
        print("\nReceived termination signal, stopping...")
        self._stop_flag.value = True

    def _cleanup_all_processes(self):
        """Forcefully clean up all child processes.

        Escalates in stages: set the stop flag, kill each process tree with
        psutil, ``terminate()``, ``join(timeout=1)``, and finally ``kill()``.
        """
        print("\nTerminating processes...")

        # First, signal all processes to stop gracefully
        self._stop_flag.value = True

        # Give processes time to clean up their children
        time.sleep(0.5)

        # Kill entire process tree for each child process
        for p in self._processes:
            if p.pid:
                kill_process_tree(p.pid)

        # Also terminate using Process API as backup
        for p in self._processes:
            try:
                if p.is_alive():
                    p.terminate()
            except Exception:
                pass

        # Wait for processes to terminate
        for p in self._processes:
            try:
                p.join(timeout=1)
            except Exception:
                pass

        # Force kill any remaining processes
        for p in self._processes:
            try:
                if p.is_alive():
                    p.kill()
            except Exception:
                pass

    def run(self):
        """Run the benchmark.

        Installs SIGINT/SIGTERM handlers, starts the main supervisor process
        and the client processes, then prints every stats sample received.
        The run stops when any of these happens:

        * ``config.duration_seconds`` have passed since the first sample
          with data;
        * the stop flag is set (e.g. by a signal);
        * the main supervisor process exits.

        Afterwards it prints a summary and tears down all child processes.
        """
        # Set up signal handlers
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

        # Register cleanup on exit
        atexit.register(self._cleanup_all_processes)

        print("=" * 80)
        print("ENRGDAQ Benchmark")
        print("=" * 80)
        print("Configuration:")
        print(f" - Clients: {self.config.num_clients}")
        print(f" - Payload Size: {self.config.payload_size} values/message")
        print(f" - Duration: {self.config.duration_seconds} seconds")
        print(f" - ZMQ XSUB URL: {self.config.zmq_xsub_url}")
        print(f" - ZMQ XPUB URL: {self.config.zmq_xpub_url}")
        print("=" * 80)
        print()

        # Clean up any existing output files from previous runs
        output_files_to_clean = ["out/test.root"]
        for output_file in output_files_to_clean:
            if os.path.exists(output_file):
                os.remove(output_file)
                print(f"Removed existing output file: {output_file}")

        # Create shared array to track client ring buffer bytes (one c_longlong per client)
        self._client_shm_bytes = Array("q", self.config.num_clients)  # 'q' = c_longlong

        # Start main supervisor process
        print("Starting main supervisor...")
        main_process = Process(
            target=run_main_supervisor,
            args=(
                self.config,
                self._stats_queue,
                self._stop_flag,
                self._client_shm_bytes,
            ),
        )
        main_process.start()
        self._processes.append(main_process)

        # Give main supervisor time to start (ZMQ needs to bind first)
        time.sleep(1)

        # Create ready events for each client
        ready_events = [Event() for _ in range(self.config.num_clients)]

        # Start client processes
        print(f"Starting {self.config.num_clients} client(s)...")
        for i, ready_event in enumerate(ready_events):
            client_process = Process(
                target=run_client_supervisor,
                args=(
                    i,
                    self.config,
                    self._stop_flag,
                    ready_event,
                    self._client_shm_bytes,
                ),
            )
            client_process.start()
            self._processes.append(client_process)

        # Wait for all clients to signal ready, sharing a single 30 s deadline
        # and counting every client that misses it.
        print("Waiting for clients to be ready...")
        deadline = time.monotonic() + 30
        not_ready = sum(
            1
            for event in ready_events
            if not event.wait(timeout=max(0.0, deadline - time.monotonic()))
        )
        if not_ready:
            print(
                "WARNING: Not all clients signaled ready within timeout "
                f"({not_ready} of {len(ready_events)} not ready)"
            )

        print()
        print("Benchmark running... (waiting for first data)")
        print("-" * 80)

        # Timer starts when first data arrives, not now
        start_time: datetime | None = None
        end_time_seconds = self.config.duration_seconds

        try:
            while not self._stop_flag.value:
                # Only the queue poll is guarded; errors while handling a
                # sample should surface instead of being silently dropped.
                try:
                    stats_dict = self._stats_queue.get(timeout=0.5)
                except Exception:
                    # Typically queue.Empty after the timeout: poll again.
                    stats_dict = None

                if stats_dict is not None:
                    stats = self._dict_to_stats(stats_dict)

                    # Start timer when first data arrives
                    if start_time is None and stats.msg_in_count > 0:
                        start_time = datetime.now()
                        print("First data received, starting timer...")

                    self._stats_history.append(stats)
                    self._print_stats(stats)

                # Without the main supervisor no stats can arrive; stop instead
                # of waiting forever for the first data.
                if not main_process.is_alive():
                    print("\nMain supervisor process exited, stopping...")
                    break

                # Check duration (only if timer has started)
                if start_time is not None:
                    elapsed = (datetime.now() - start_time).total_seconds()
                    if elapsed >= end_time_seconds:
                        print(f"\nDuration of {end_time_seconds}s reached, stopping...")
                        break
        finally:
            self._stop_flag.value = True

        # Print summary
        self._print_summary()

        # Clean up all processes
        self._cleanup_all_processes()

        # Unregister atexit since we've already cleaned up
        try:
            atexit.unregister(self._cleanup_all_processes)
        except Exception:
            pass

    def _print_summary(self):
        """Print benchmark summary statistics.

        Only samples with data (``msg_in_count > 0``) are considered. When
        there are more than ``_WARMUP_SAMPLES`` of them, the first
        ``_WARMUP_SAMPLES`` are dropped as warm-up. Durations and rates refer
        to the remaining window.
        """
        # Filter to stats with actual data, skipping the first warm-up samples
        data_stats = [s for s in self._stats_history if s.msg_in_count > 0]
        if len(data_stats) > self._WARMUP_SAMPLES:
            data_stats = data_stats[self._WARMUP_SAMPLES :]

        if not data_stats:
            print("\nNo statistics collected.")
            return

        print()
        print("=" * 80)
        print("Benchmark Summary")
        print("=" * 80)

        first, last = data_stats[0], data_stats[-1]

        # Calculate duration from first data to last data
        total_duration = (last.timestamp - first.timestamp).total_seconds()
        if total_duration <= 0:
            total_duration = 1.0

        avg_throughput = fmean([s.msg_in_out_mb_per_s for s in data_stats])
        avg_shm_throughput = fmean([s.shm_mb_per_s for s in data_stats])
        max_shm_throughput = max([s.shm_mb_per_s for s in data_stats])
        total_shm_mb = last.shm_bytes_written / 10**6
        total_msgs = last.msg_in_count
        avg_queue = fmean([s.avg_queue_size for s in data_stats])

        # msg_in_count is cumulative since start, so the rate must only count
        # messages received inside the (post-warm-up) window.
        window_msgs = last.msg_in_count - first.msg_in_count
        msgs_per_second = window_msgs / total_duration

        print(f"Duration: {total_duration:.1f} seconds")
        print(f"Avg SHM Throughput: {avg_shm_throughput:.2f} MB/s")
        print(f"Peak SHM Throughput: {max_shm_throughput:.2f} MB/s")
        print(f"Total SHM Data: {total_shm_mb:.2f} MB")
        print(f"ZMQ Throughput: {avg_throughput:.2f} MB/s (handles only)")
        print(f"Total Messages: {total_msgs:,}")
        print(f"Average Queue Size: {avg_queue:.1f}")
        print(f"Messages/Second: {msgs_per_second:,.0f}")
        print(
            f"Avg CPU Usage: {fmean([s.cpu_usage_percent for s in data_stats]):.1f}%"
        )
        print(
            f"Avg p95 Latency: {fmean([s.latency_p95_ms for s in data_stats]):.2f} ms"
        )
        print(
            f"Peak p99 Latency: {max([s.latency_p99_ms for s in data_stats]):.2f} ms"
        )
        print("=" * 80)
Methods
def run(self)
Expand source code
def run(self):
    """Run the benchmark.

    Installs SIGINT/SIGTERM handlers, starts the main supervisor process and
    the client processes, then prints every stats sample received. The run
    stops when any of these happens:

    * ``config.duration_seconds`` have passed since the first sample with
      data;
    * the stop flag is set (e.g. by a signal);
    * the main supervisor process exits.

    Afterwards it prints a summary and tears down all child processes.
    """
    # Set up signal handlers
    signal.signal(signal.SIGINT, self._handle_signal)
    signal.signal(signal.SIGTERM, self._handle_signal)

    # Register cleanup on exit
    atexit.register(self._cleanup_all_processes)

    print("=" * 80)
    print("ENRGDAQ Benchmark")
    print("=" * 80)
    print("Configuration:")
    print(f" - Clients: {self.config.num_clients}")
    print(f" - Payload Size: {self.config.payload_size} values/message")
    print(f" - Duration: {self.config.duration_seconds} seconds")
    print(f" - ZMQ XSUB URL: {self.config.zmq_xsub_url}")
    print(f" - ZMQ XPUB URL: {self.config.zmq_xpub_url}")
    print("=" * 80)
    print()

    # Clean up any existing output files from previous runs
    output_files_to_clean = ["out/test.root"]
    for output_file in output_files_to_clean:
        if os.path.exists(output_file):
            os.remove(output_file)
            print(f"Removed existing output file: {output_file}")

    # Create shared array to track client ring buffer bytes (one c_longlong per client)
    self._client_shm_bytes = Array("q", self.config.num_clients)  # 'q' = c_longlong

    # Start main supervisor process
    print("Starting main supervisor...")
    main_process = Process(
        target=run_main_supervisor,
        args=(
            self.config,
            self._stats_queue,
            self._stop_flag,
            self._client_shm_bytes,
        ),
    )
    main_process.start()
    self._processes.append(main_process)

    # Give main supervisor time to start (ZMQ needs to bind first)
    time.sleep(1)

    # Create ready events for each client
    ready_events = [Event() for _ in range(self.config.num_clients)]

    # Start client processes
    print(f"Starting {self.config.num_clients} client(s)...")
    for i, ready_event in enumerate(ready_events):
        client_process = Process(
            target=run_client_supervisor,
            args=(
                i,
                self.config,
                self._stop_flag,
                ready_event,
                self._client_shm_bytes,
            ),
        )
        client_process.start()
        self._processes.append(client_process)

    # Wait for all clients to signal ready, sharing a single 30 s deadline
    # and counting every client that misses it.
    print("Waiting for clients to be ready...")
    deadline = time.monotonic() + 30
    not_ready = sum(
        1
        for event in ready_events
        if not event.wait(timeout=max(0.0, deadline - time.monotonic()))
    )
    if not_ready:
        print(
            "WARNING: Not all clients signaled ready within timeout "
            f"({not_ready} of {len(ready_events)} not ready)"
        )

    print()
    print("Benchmark running... (waiting for first data)")
    print("-" * 80)

    # Timer starts when first data arrives, not now
    start_time: datetime | None = None
    end_time_seconds = self.config.duration_seconds

    try:
        while not self._stop_flag.value:
            # Only the queue poll is guarded; errors while handling a sample
            # should surface instead of being silently dropped.
            try:
                stats_dict = self._stats_queue.get(timeout=0.5)
            except Exception:
                # Typically queue.Empty after the timeout: poll again.
                stats_dict = None

            if stats_dict is not None:
                stats = self._dict_to_stats(stats_dict)

                # Start timer when first data arrives
                if start_time is None and stats.msg_in_count > 0:
                    start_time = datetime.now()
                    print("First data received, starting timer...")

                self._stats_history.append(stats)
                self._print_stats(stats)

            # Without the main supervisor no stats can arrive; stop instead of
            # waiting forever for the first data.
            if not main_process.is_alive():
                print("\nMain supervisor process exited, stopping...")
                break

            # Check duration (only if timer has started)
            if start_time is not None:
                elapsed = (datetime.now() - start_time).total_seconds()
                if elapsed >= end_time_seconds:
                    print(f"\nDuration of {end_time_seconds}s reached, stopping...")
                    break
    finally:
        self._stop_flag.value = True

    # Print summary
    self._print_summary()

    # Clean up all processes
    self._cleanup_all_processes()

    # Unregister atexit since we've already cleaned up
    try:
        atexit.unregister(self._cleanup_all_processes)
    except Exception:
        pass
class BenchmarkStats (timestamp: datetime.datetime,
supervisor_id: str,
msg_in_out_mb: float,
msg_in_count: int,
msg_out_count: int,
msg_in_out_mb_per_s: float,
avg_queue_size: float,
active_job_count: int,
cpu_usage_percent: float,
rss_mb: float,
latency_p95_ms: float,
latency_p99_ms: float,
shm_bytes_written: int = 0,
shm_mb_per_s: float = 0.0)
Expand source code
@dataclass
class BenchmarkStats:
    """One stats sample reported by the main supervisor during a run.

    Only the two trailing SHM fields have defaults; every other field must be
    supplied, positionally or by keyword, in the declared order.
    """

    # When the sample was taken, and by which supervisor.
    timestamp: datetime
    supervisor_id: str
    # ZMQ message volume and counts, plus the MB/s rate derived from them.
    msg_in_out_mb: float
    msg_in_count: int
    msg_out_count: int
    msg_in_out_mb_per_s: float
    # Supervisor health: queue depth, live jobs, CPU and resident memory.
    avg_queue_size: float
    active_job_count: int
    cpu_usage_percent: float
    rss_mb: float
    # Latency percentiles in milliseconds.
    latency_p95_ms: float
    latency_p99_ms: float
    # Payload volume and rate; optional, default to zero.
    shm_bytes_written: int = 0
    shm_mb_per_s: float = 0.0
Instance variables
var active_job_count : int
var avg_queue_size : float
var cpu_usage_percent : float
var latency_p95_ms : float
var latency_p99_ms : float
var msg_in_count : int
var msg_in_out_mb : float
var msg_in_out_mb_per_s : float
var msg_out_count : int
var rss_mb : float
var shm_bytes_written : int
var shm_mb_per_s : float
var supervisor_id : str
var timestamp : datetime.datetime