Module enrgdaq.tools.plot_benchmarks
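The functions below rely on three module-level constants defined elsewhere in the module source and not shown on this page: RESULTS_DIR (where JSON results are written), GRAPHS_DIR (where plots are saved), and COLORS (the plot palette). A minimal sketch of their shape, with paths and hex values that are purely illustrative assumptions:

from pathlib import Path

# Assumed values for illustration only; the real definitions live in the module source.
RESULTS_DIR = Path("benchmark_results")
GRAPHS_DIR = RESULTS_DIR / "graphs"
COLORS = {
    "primary": "#1f77b4",    # palette keys match the usages in the functions below
    "secondary": "#2ca02c",
    "danger": "#d62728",
    "dark": "#343a40",
    "highlight": "#ff7f0e",
}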
Functions
def load_results()
def load_results():
    """Load latest results for plotting."""
    latest = RESULTS_DIR / "latest_results.json"
    if not latest.exists():
        print(f"Error: {latest} not found. Run with --run first.")
        sys.exit(1)
    with open(latest) as f:
        return json.load(f)

Load latest results for plotting.
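Each entry in latest_results.json is a BenchmarkResult serialized with asdict (see run_benchmarks below). A sample record, with made-up numbers purely for orientation:

[
  {
    "num_clients": 3,
    "payload_size": 500000,
    "duration": 10,
    "avg_throughput_mbs": 1200.0,
    "peak_throughput_mbs": 1400.0,
    "zmq_throughput_mbs": 2.5,
    "messages_per_second": 600.0,
    "avg_queue_size": 1.2,
    "cpu_usage_percent": 85.0,
    "latency_p95_ms": 4.2,
    "latency_p99_ms": 9.8
  }
]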
def main()
def main():
    parser = argparse.ArgumentParser(description="ENRGDAQ Visualization Suite")
    parser.add_argument("--run", action="store_true", help="Run benchmarks")
    parser.add_argument(
        "--plot", action="store_true", help="Generate plots from latest results"
    )
    parser.add_argument("--all", action="store_true", help="Run and plot")
    args = parser.parse_args()

    if not any([args.run, args.plot, args.all]):
        parser.print_help()
        sys.exit(1)

    GRAPHS_DIR.mkdir(parents=True, exist_ok=True)

    if args.run or args.all:
        results = run_benchmarks()
    else:
        results = load_results()

    if args.plot or args.all or not args.run:
        print("\n=== Generating Advanced Visuals ===")
        plot_wcd_headroom(results)
        plot_performance_curve(results)
        plot_latency_jitter(results)
        plot_cpu_efficiency(results)
        plot_aggregate_scaling(results)
        print(f"Success! High-resolution graphs saved to: {GRAPHS_DIR}/")
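Typical invocations, assuming the module is run as a script via uv in the same style run_single_benchmark uses for benchmark_runner.py (the path is inferred from that call, not confirmed by this page):

  uv run src/enrgdaq/tools/plot_benchmarks.py --all    # run benchmarks, then plot
  uv run src/enrgdaq/tools/plot_benchmarks.py --plot   # re-plot latest_results.json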
def plot_aggregate_scaling(results: list[dict])
def plot_aggregate_scaling(results: list[dict]):
    """Plot aggregate throughput scaling with client count."""
    # Filter for results with 500k payload (our efficiency sweet spot)
    data = [r for r in results if r["payload_size"] == 500000]
    data = sorted(data, key=lambda x: x["num_clients"])
    if not data:
        return

    clients = [r["num_clients"] for r in data]
    throughput = [r["avg_throughput_mbs"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Bar plot for throughput
    bars = ax.bar(
        [str(c) for c in clients],
        throughput,
        color=COLORS["primary"],
        alpha=0.8,
        label="Aggregate Throughput",
    )

    # Aesthetics
    ax.set_xlabel("Number of Concurrent Clients", fontsize=12, fontweight="bold")
    ax.set_ylabel("Total System Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "ENRGDAQ Aggregate Scalability\n(Fixed 500k Payload)",
        fontsize=15,
        fontweight="bold",
        pad=20,
    )

    # Gbps labels on top
    for bar, val in zip(bars, throughput):
        gbps = (val * 8) / 1000.0
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            val + 20,
            f"{val:,.0f} MB/s\n({gbps:.1f} Gbps)",
            ha="center",
            va="bottom",
            fontweight="bold",
            color=COLORS["dark"],
        )

    # Draw the 11.4 Gbps hardware limit line
    ceiling_mbps = 1433.0
    ax.axhline(ceiling_mbps, color=COLORS["danger"], ls="--", lw=2, alpha=0.6)
    ax.text(
        -0.4,
        ceiling_mbps + 30,
        "System Hardware Limit (11.4 Gbps)",
        color=COLORS["danger"],
        fontweight="bold",
    )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "aggregate_scaling.png")
    plt.savefig(GRAPHS_DIR / "aggregate_scaling.pdf")
    plt.close()

Plot aggregate throughput scaling with client count.
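The per-bar Gbps figure uses decimal units (1 MB/s = 8 Mbit/s, 1000 Mbit/s = 1 Gbit/s). A worked check of the label code above:

val = 1433.0                 # MB/s, the plotted ceiling value
gbps = (val * 8) / 1000.0    # 11.464 Gbit/s, i.e. roughly the 11.4 Gbps in the line label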
def plot_cpu_efficiency(results: list[dict])
def plot_cpu_efficiency(results: list[dict]):
    """Plot Throughput vs CPU efficiency."""
    data = sorted(results, key=lambda x: x["payload_size"])
    throughput = [r["avg_throughput_mbs"] for r in data]
    cpu = [r["cpu_usage_percent"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Normalize to MB/s per CPU %
    efficiency = [t / c if c > 0 else 0 for t, c in zip(throughput, cpu)]

    ax.bar(
        [str(r["payload_size"]) for r in data],
        efficiency,
        color=COLORS["highlight"],
        alpha=0.7,
    )

    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12)
    ax.set_ylabel("Efficiency (MB/s per 1% CPU)", fontsize=12)
    ax.set_title(
        "CPU Efficiency Scaling\nDelegation to C++ Backends (libzmq, PyArrow)",
        fontsize=14,
        fontweight="bold",
    )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "cpu_efficiency.png")
    plt.savefig(GRAPHS_DIR / "cpu_efficiency.pdf")
    plt.close()

Plot Throughput vs CPU efficiency.
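The metric is plain division: sustained throughput over average CPU load. A worked example with illustrative numbers (not measured results):

throughput_mbs = 1200.0                     # MB/s, illustrative
cpu_percent = 80.0                          # average CPU %, illustrative
efficiency = throughput_mbs / cpu_percent   # 15.0 MB/s per 1% CPU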
def plot_latency_jitter(results: list[dict])
def plot_latency_jitter(results: list[dict]):
    """Plot latency jitter (p95, p99) against payload size."""
    data = sorted(results, key=lambda x: x["payload_size"])
    payloads = [r["payload_size"] for r in data]
    p95 = [r["latency_p95_ms"] for r in data]
    p99 = [r["latency_p99_ms"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    ax.plot(
        payloads,
        p95,
        "o-",
        color=COLORS["secondary"],
        lw=3,
        label="p95 Jitter (95th percentile)",
    )
    ax.plot(
        payloads,
        p99,
        "s-",
        color=COLORS["danger"],
        lw=2,
        alpha=0.7,
        label="p99 Jitter (Worst-case tail)",
    )

    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12)
    ax.set_ylabel("Latency (ms)", fontsize=12)
    ax.set_title(
        "Non-Deterministic Latency (Jitter Analysis)\nEffect of Python GC & ZMQ Context Swapping",
        fontsize=14,
        fontweight="bold",
    )

    # JINST Reference line for "Soft Real-Time" (e.g. 10ms)
    ax.axhline(10.0, color=COLORS["dark"], ls="--", alpha=0.4)
    ax.text(
        payloads[0],
        11,
        "Standard Soft Real-Time Threshold (10ms)",
        color=COLORS["dark"],
        alpha=0.6,
    )

    ax.legend()
    plt.grid(True, which="both", ls="-", alpha=0.1)
    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "latency_jitter.png")
    plt.savefig(GRAPHS_DIR / "latency_jitter.pdf")
    plt.close()

Plot latency jitter (p95, p99) against payload size.
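How the runner derives p95/p99 is not shown on this page; for reference, a minimal percentile sketch over raw latency samples (numpy here is an assumption about available tooling, not a documented dependency):

import numpy as np

def tail_latencies(latencies_ms: list[float]) -> tuple[float, float]:
    """Return (p95, p99) in ms from raw per-message latency samples."""
    samples = np.asarray(latencies_ms)
    return float(np.percentile(samples, 95)), float(np.percentile(samples, 99))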
def plot_performance_curve(results: list[dict])
def plot_performance_curve(results: list[dict]):
    """Modern performance curve showing throughput vs payload size."""
    data = sorted(results, key=lambda x: x["payload_size"])
    payloads = [r["payload_size"] for r in data]
    avg = [r["avg_throughput_mbs"] for r in data]
    peak = [r["peak_throughput_mbs"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Area fill
    ax.fill_between(payloads, avg, color=COLORS["primary"], alpha=0.1)

    # Lines
    ax.plot(
        payloads,
        avg,
        "o-",
        color=COLORS["secondary"],
        lw=4,
        markersize=10,
        label="Sustained SHM Throughput",
    )
    ax.plot(
        payloads,
        peak,
        "s--",
        color=COLORS["primary"],
        lw=2,
        alpha=0.5,
        label="Peak SHM Capability",
    )

    # Added: ZMQ Throughput (Management Overhead)
    zmq = [r["zmq_throughput_mbs"] for r in data]
    ax.plot(
        payloads,
        zmq,
        "x:",
        color=COLORS["dark"],
        lw=1,
        alpha=0.8,
        label="ZMQ Management Overhead",
    )

    # 11.4 Gbps Ceiling (based on latest tests)
    ceiling_mbps = 1433.0
    ax.axhline(ceiling_mbps, color=COLORS["dark"], ls=":", lw=2, alpha=0.6)
    ax.text(
        payloads[0],
        ceiling_mbps + 30,
        "11.4 Gbps Performance Ceiling",
        color=COLORS["dark"],
        fontweight="bold",
        fontsize=10,
    )

    # 10GbE Comparison
    ax.axhline(1250, color=COLORS["danger"], ls="-.", lw=1, alpha=0.4)
    ax.text(
        payloads[0],
        1250 - 60,
        "Standard 10 Gbps Ethernet Limit",
        color=COLORS["danger"],
        alpha=0.7,
    )

    ax.set_xscale("log")
    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12, fontweight="bold")
    ax.set_ylabel("Data Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "ENRGDAQ Throughput Scaling\n(Arrow IPC + Zero-Copy SHM)",
        fontsize=16,
        pad=20,
        fontweight="bold",
    )

    ax.legend(frameon=True, facecolor="white", edgecolor="#dee2e6")
    plt.grid(True, which="both", ls="-", alpha=0.15)
    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "throughput_scaling.png")
    plt.savefig(GRAPHS_DIR / "throughput_scaling.pdf")
    plt.close()

Modern performance curve showing throughput vs payload size.
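Both reference lines follow from decimal unit conversion: the 10GbE line is 10 Gbit/s divided by 8 bit/byte = 1250 MB/s, and the ceiling works the other way, 1433 MB/s times 8 over 1000, roughly 11.5 Gbit/s (labelled 11.4 Gbps in the source).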
def plot_wcd_headroom(results: list[dict])
def plot_wcd_headroom(results: list[dict]):
    """Specific plot for Water Cherenkov Detector (WCD) headroom."""
    # Context: 1-ton WCD with 8 PMTs.
    # Approx 4 MB/s requirement assumes 100 Hz triggers with 500 samples/ch
    wcd_requirement = 4.0

    # Use the best observed result
    best = max(results, key=lambda x: x["avg_throughput_mbs"])

    fig, ax = plt.subplots(figsize=(9, 6))

    labels = [
        "Target: WCD Experiment\n(8 PMTs @ 100Hz)",
        "Achieved: ENRGDAQ\n(Zero-Copy Python)",
    ]
    values = [wcd_requirement, best["avg_throughput_mbs"]]

    bars = ax.bar(
        labels,
        values,
        color=[COLORS["danger"], COLORS["primary"]],
        alpha=0.85,
        width=0.5,
    )

    # Aesthetics
    ax.set_ylabel("Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "Safety Margin: ENRGDAQ vs. WCD Requirements",
        fontsize=15,
        pad=25,
        fontweight="bold",
    )

    # Log scale is often better for such massive differences, but linear shows the 'wow' factor
    # We'll use linear but with a break or a clear label
    headroom = values[1] / values[0]
    ax.annotate(
        f"{headroom:.0f}x Headroom",
        xy=(1, values[1]),
        xytext=(1, values[1] * 1.05),
        ha="center",
        fontsize=14,
        fontweight="bold",
        color=COLORS["secondary"],
        arrowprops=dict(arrowstyle="->", color=COLORS["secondary"], lw=2),
    )

    for bar, val in zip(bars, values):
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            val + 10,
            f"{val:,.1f} MB/s",
            ha="center",
            va="bottom",
            fontweight="bold",
            color=COLORS["dark"],
        )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "wcd_headroom.png")
    plt.savefig(GRAPHS_DIR / "wcd_headroom.pdf")
    plt.close()

Specific plot for Water Cherenkov Detector (WCD) headroom.
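The annotation is the bare ratio of achieved to required throughput. With the 4 MB/s requirement above, an illustrative (not measured) sustained figure of 1400 MB/s would annotate as:

headroom = 1400.0 / 4.0    # 350.0 -> rendered as "350x Headroom"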
def run_benchmarks()
def run_benchmarks():
    """Run comprehensive benchmark suite."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    results = []

    # Scaling Payloads
    print("\n=== Running Performance Scaling Benchmarks ===")
    payload_sizes = [1000, 10000, 50000, 100000, 500000]
    for payload in payload_sizes:
        res = run_single_benchmark(3, payload, 10)
        if res:
            results.append(asdict(res))
        time.sleep(1)

    # Aggregate Scaling (Fixed Payload, Varying Clients)
    print("\n=== Running Aggregate Scaling Benchmarks ===")
    client_counts = [1, 2, 4, 8]
    for clients in client_counts:
        # Fixed payload at 500k to show system limits
        res = run_single_benchmark(clients, 500000, 10)
        if res:
            results.append(asdict(res))
        time.sleep(1)

    with open(RESULTS_DIR / "latest_results.json", "w") as f:
        json.dump(results, f, indent=2)

    return results

Run comprehensive benchmark suite.
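Both sweeps append into one flat list, so the plotting functions separate them again by field value. A minimal sketch of splitting the combined records (field names are exactly those of BenchmarkResult):

results = load_results()
# Payload sweep: fixed 3 clients, varying payload size.
payload_sweep = [r for r in results if r["num_clients"] == 3]
# Client sweep: fixed 500k payload, varying client count. Note this filter
# also matches the 3-client/500k run from the payload sweep, as in
# plot_aggregate_scaling above.
client_sweep = [r for r in results if r["payload_size"] == 500000]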
def run_single_benchmark(clients: int, payload_size: int, duration: int) -> BenchmarkResult | None
def run_single_benchmark(
    clients: int, payload_size: int, duration: int
) -> Optional[BenchmarkResult]:
    """Run a single benchmark configuration and return results."""
    cmd = [
        "uv",
        "run",
        "src/enrgdaq/tools/benchmark_runner.py",
        "--clients",
        str(clients),
        "--payload-size",
        str(payload_size),
        "--duration",
        str(duration),
        "--use-memory-store",
        "--use-shm",
        "--stats-interval",
        "1.0",
    ]
    print(f"  Running: clients={clients}, payload={payload_size}...")
    try:
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
        # Allow a 30 s grace period beyond the benchmark duration before timing out.
        stdout, stderr = process.communicate(timeout=duration + 30)

        if process.returncode != 0:
            print(f"  Failed: {stderr}")
            return None

        # Parse summary from stdout
        lines = stdout.split("\n")
        data = {}
        for line in lines:
            if "Avg SHM Throughput:" in line:
                data["avg_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "Peak SHM Throughput:" in line:
                data["peak_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "ZMQ Throughput:" in line:
                data["zmq_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "Messages/Second:" in line:
                data["messages_per_second"] = float(
                    line.split(":")[1].replace(",", "").strip()
                )
            if "Average Queue Size:" in line:
                data["avg_queue"] = float(line.split(":")[1].strip())
            if "Avg CPU Usage:" in line:
                data["cpu_usage"] = float(line.split(":")[1].replace("%", "").strip())
            if "Avg p95 Latency:" in line:
                data["latency_p95"] = float(
                    line.split(":")[1].replace("ms", "").strip()
                )
            if "Peak p99 Latency:" in line:
                data["latency_p99"] = float(
                    line.split(":")[1].replace("ms", "").strip()
                )

        return BenchmarkResult(
            num_clients=clients,
            payload_size=payload_size,
            duration=duration,
            avg_throughput_mbs=data.get("avg_throughput", 0),
            peak_throughput_mbs=data.get("peak_throughput", 0),
            zmq_throughput_mbs=data.get("zmq_throughput", 0),
            messages_per_second=data.get("messages_per_second", 0),
            avg_queue_size=data.get("avg_queue", 0),
            cpu_usage_percent=data.get("cpu_usage", 0),
            latency_p95_ms=data.get("latency_p95", 0),
            latency_p99_ms=data.get("latency_p99", 0),
        )
    # Broad catch: also covers subprocess.TimeoutExpired from communicate().
    except Exception as e:
        print(f"  Error: {e}")
        return None

Run a single benchmark configuration and return results.
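The parser keys on fixed substrings in the runner's summary output. Reconstructed from that parsing logic, the expected stdout block looks roughly like this (values illustrative; the exact formatting is owned by benchmark_runner.py):

  Avg SHM Throughput: 1200.0 MB/s
  Peak SHM Throughput: 1400.0 MB/s
  ZMQ Throughput: 2.5 MB/s
  Messages/Second: 2,400
  Average Queue Size: 1.2
  Avg CPU Usage: 85.0%
  Avg p95 Latency: 4.2 ms
  Peak p99 Latency: 9.8 ms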
Classes
class BenchmarkResult (num_clients: int,
payload_size: int,
duration: int,
avg_throughput_mbs: float,
peak_throughput_mbs: float,
zmq_throughput_mbs: float,
messages_per_second: float,
avg_queue_size: float,
cpu_usage_percent: float,
latency_p95_ms: float,
latency_p99_ms: float)
@dataclass
class BenchmarkResult:
    num_clients: int
    payload_size: int
    duration: int
    avg_throughput_mbs: float
    peak_throughput_mbs: float
    zmq_throughput_mbs: float
    messages_per_second: float
    avg_queue_size: float
    cpu_usage_percent: float
    latency_p95_ms: float
    latency_p99_ms: float
Instance variables
var avg_queue_size : float
var avg_throughput_mbs : float
var cpu_usage_percent : float
var duration : int
var latency_p95_ms : float
var latency_p99_ms : float
var messages_per_second : float
var num_clients : int
var payload_size : int
var peak_throughput_mbs : float
var zmq_throughput_mbs : float
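Instances serialize straight to JSON-friendly dicts, which is how run_benchmarks writes latest_results.json. A minimal round trip with illustrative values:

from dataclasses import asdict

result = BenchmarkResult(
    num_clients=3, payload_size=500000, duration=10,
    avg_throughput_mbs=1200.0, peak_throughput_mbs=1400.0,
    zmq_throughput_mbs=2.5, messages_per_second=600.0,
    avg_queue_size=1.2, cpu_usage_percent=85.0,
    latency_p95_ms=4.2, latency_p99_ms=9.8,
)
record = asdict(result)   # plain dict, ready for json.dump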