Module enrgdaq.tools.plot_benchmarks

Functions

def load_results()
Expand source code
def load_results():
    """Load latest results for plotting."""
    latest = RESULTS_DIR / "latest_results.json"
    if not latest.exists():
        print(f"Error: {latest} not found. Run with --run first.")
        sys.exit(1)
    with open(latest) as f:
        return json.load(f)

Load latest results for plotting.

def main()
Expand source code
def main():
    parser = argparse.ArgumentParser(description="ENRGDAQ Visualization Suite")
    parser.add_argument("--run", action="store_true", help="Run benchmarks")
    parser.add_argument(
        "--plot", action="store_true", help="Generate plots from latest results"
    )
    parser.add_argument("--all", action="store_true", help="Run and plot")
    args = parser.parse_args()

    if not any([args.run, args.plot, args.all]):
        parser.print_help()
        sys.exit(1)

    GRAPHS_DIR.mkdir(parents=True, exist_ok=True)

    if args.run or args.all:
        results = run_benchmarks()
    else:
        results = load_results()

    if args.plot or args.all or not args.run:
        print("\n=== Generating Advanced Visuals ===")
        plot_wcd_headroom(results)
        plot_performance_curve(results)
        plot_latency_jitter(results)
        plot_cpu_efficiency(results)
        plot_aggregate_scaling(results)
        print(f"Success! High-resolution graphs saved to: {GRAPHS_DIR}/")
def plot_aggregate_scaling(results: list[dict])
Expand source code
def plot_aggregate_scaling(results: list[dict]):
    """Plot aggregate throughput scaling with client count."""
    # Filter for results with 500k payload (our efficiency sweet spot)
    data = [r for r in results if r["payload_size"] == 500000]
    data = sorted(data, key=lambda x: x["num_clients"])

    if not data:
        return

    clients = [r["num_clients"] for r in data]
    throughput = [r["avg_throughput_mbs"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Bar plot for throughput
    bars = ax.bar(
        [str(c) for c in clients],
        throughput,
        color=COLORS["primary"],
        alpha=0.8,
        label="Aggregate Throughput",
    )

    # Aesthetics
    ax.set_xlabel("Number of Concurrent Clients", fontsize=12, fontweight="bold")
    ax.set_ylabel("Total System Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "ENRGDAQ Aggregate Scalability\n(Fixed 500k Payload)",
        fontsize=15,
        fontweight="bold",
        pad=20,
    )

    # Gbps labels on top
    for bar, val in zip(bars, throughput):
        gbps = (val * 8) / 1000.0
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            val + 20,
            f"{val:,.0f} MB/s\n({gbps:.1f} Gbps)",
            ha="center",
            va="bottom",
            fontweight="bold",
            color=COLORS["dark"],
        )

    # Draw the 11.4 Gbps hardware limit line
    ceiling_mbps = 1433.0
    ax.axhline(ceiling_mbps, color=COLORS["danger"], ls="--", lw=2, alpha=0.6)
    ax.text(
        -0.4,
        ceiling_mbps + 30,
        "System Hardware Limit (11.4 Gbps)",
        color=COLORS["danger"],
        fontweight="bold",
    )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "aggregate_scaling.png")
    plt.savefig(GRAPHS_DIR / "aggregate_scaling.pdf")
    plt.close()

Plot aggregate throughput scaling with client count.

def plot_cpu_efficiency(results: list[dict])
Expand source code
def plot_cpu_efficiency(results: list[dict]):
    """Plot Throughput vs CPU efficiency."""
    data = sorted(results, key=lambda x: x["payload_size"])
    throughput = [r["avg_throughput_mbs"] for r in data]
    cpu = [r["cpu_usage_percent"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Normalize to MB/s per CPU %
    efficiency = [t / c if c > 0 else 0 for t, c in zip(throughput, cpu)]

    ax.bar(
        [str(r["payload_size"]) for r in data],
        efficiency,
        color=COLORS["highlight"],
        alpha=0.7,
    )

    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12)
    ax.set_ylabel("Efficiency (MB/s per 1% CPU)", fontsize=12)
    ax.set_title(
        "CPU Efficiency Scaling\nDelegation to C++ Backends (libzmq, PyArrow)",
        fontsize=14,
        fontweight="bold",
    )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "cpu_efficiency.png")
    plt.savefig(GRAPHS_DIR / "cpu_efficiency.pdf")
    plt.close()

Plot Throughput vs CPU efficiency.

def plot_latency_jitter(results: list[dict])
Expand source code
def plot_latency_jitter(results: list[dict]):
    """Plot latency jitter (p95, p99) against payload size."""
    data = sorted(results, key=lambda x: x["payload_size"])
    payloads = [r["payload_size"] for r in data]
    p95 = [r["latency_p95_ms"] for r in data]
    p99 = [r["latency_p99_ms"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    ax.plot(
        payloads,
        p95,
        "o-",
        color=COLORS["secondary"],
        lw=3,
        label="p95 Jitter (95th percentile)",
    )
    ax.plot(
        payloads,
        p99,
        "s-",
        color=COLORS["danger"],
        lw=2,
        alpha=0.7,
        label="p99 Jitter (Worst-case tail)",
    )

    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12)
    ax.set_ylabel("Latency (ms)", fontsize=12)
    ax.set_title(
        "Non-Deterministic Latency (Jitter Analysis)\nEffect of Python GC & ZMQ Context Swapping",
        fontsize=14,
        fontweight="bold",
    )

    # JINST Reference line for "Soft Real-Time" (e.g. 10ms)
    ax.axhline(10.0, color=COLORS["dark"], ls="--", alpha=0.4)
    ax.text(
        payloads[0],
        11,
        "Standard Soft Real-Time Threshold (10ms)",
        color=COLORS["dark"],
        alpha=0.6,
    )

    ax.legend()
    plt.grid(True, which="both", ls="-", alpha=0.1)
    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "latency_jitter.png")
    plt.savefig(GRAPHS_DIR / "latency_jitter.pdf")
    plt.close()

Plot latency jitter (p95, p99) against payload size.

def plot_performance_curve(results: list[dict])
Expand source code
def plot_performance_curve(results: list[dict]):
    """Modern performance curve showing throughput vs payload size."""
    data = sorted(results, key=lambda x: x["payload_size"])
    payloads = [r["payload_size"] for r in data]
    avg = [r["avg_throughput_mbs"] for r in data]
    peak = [r["peak_throughput_mbs"] for r in data]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Area fill
    ax.fill_between(payloads, avg, color=COLORS["primary"], alpha=0.1)

    # Lines
    ax.plot(
        payloads,
        avg,
        "o-",
        color=COLORS["secondary"],
        lw=4,
        markersize=10,
        label="Sustained SHM Throughput",
    )
    ax.plot(
        payloads,
        peak,
        "s--",
        color=COLORS["primary"],
        lw=2,
        alpha=0.5,
        label="Peak SHM Capability",
    )

    # Added: ZMQ Throughput (Management Overhead)
    zmq = [r["zmq_throughput_mbs"] for r in data]
    ax.plot(
        payloads,
        zmq,
        "x:",
        color=COLORS["dark"],
        lw=1,
        alpha=0.8,
        label="ZMQ Management Overhead",
    )

    # 11.4 Gbps Ceiling (based on latest tests)
    ceiling_mbps = 1433.0
    ax.axhline(ceiling_mbps, color=COLORS["dark"], ls=":", lw=2, alpha=0.6)
    ax.text(
        payloads[0],
        ceiling_mbps + 30,
        "11.4 Gbps Performance Ceiling",
        color=COLORS["dark"],
        fontweight="bold",
        fontsize=10,
    )

    # 10GbE Comparison
    ax.axhline(1250, color=COLORS["danger"], ls="-.", lw=1, alpha=0.4)
    ax.text(
        payloads[0],
        1250 - 60,
        "Standard 10 Gbps Ethernet Limit",
        color=COLORS["danger"],
        alpha=0.7,
    )

    ax.set_xscale("log")
    ax.set_xlabel("Payload Size (Values per Message)", fontsize=12, fontweight="bold")
    ax.set_ylabel("Data Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "ENRGDAQ Throughput Scaling\n(Arrow IPC + Zero-Copy SHM)",
        fontsize=16,
        pad=20,
        fontweight="bold",
    )

    ax.legend(frameon=True, facecolor="white", edgecolor="#dee2e6")
    plt.grid(True, which="both", ls="-", alpha=0.15)

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "throughput_scaling.png")
    plt.savefig(GRAPHS_DIR / "throughput_scaling.pdf")
    plt.close()

Modern performance curve showing throughput vs payload size.

def plot_wcd_headroom(results: list[dict])
Expand source code
def plot_wcd_headroom(results: list[dict]):
    """Specific plot for Water Cherenkov Detector (WCD) headroom."""
    # Context: 1-ton WCD with 8 PMTs.
    # Approx 4 MB/s requirement assumes 100 Hz triggers with 500 samples/ch
    wcd_requirement = 4.0

    # Use the best observed result
    best = max(results, key=lambda x: x["avg_throughput_mbs"])

    fig, ax = plt.subplots(figsize=(9, 6))

    labels = [
        "Target: WCD Experiment\n(8 PMTs @ 100Hz)",
        "Achieved: ENRGDAQ\n(Zero-Copy Python)",
    ]
    values = [wcd_requirement, best["avg_throughput_mbs"]]

    bars = ax.bar(
        labels,
        values,
        color=[COLORS["danger"], COLORS["primary"]],
        alpha=0.85,
        width=0.5,
    )

    # Aesthetics
    ax.set_ylabel("Throughput (MB/s)", fontsize=12, fontweight="bold")
    ax.set_title(
        "Safety Margin: ENRGDAQ vs. WCD Requirements",
        fontsize=15,
        pad=25,
        fontweight="bold",
    )

    # Log scale is often better for such massive differences, but linear shows the 'wow' factor
    # We'll use linear but with a break or a clear label

    headroom = values[1] / values[0]
    ax.annotate(
        f"{headroom:.0f}x Headroom",
        xy=(1, values[1]),
        xytext=(1, values[1] * 1.05),
        ha="center",
        fontsize=14,
        fontweight="bold",
        color=COLORS["secondary"],
        arrowprops=dict(arrowstyle="->", color=COLORS["secondary"], lw=2),
    )

    for bar, val in zip(bars, values):
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            val + 10,
            f"{val:,.1f} MB/s",
            ha="center",
            va="bottom",
            fontweight="bold",
            color=COLORS["dark"],
        )

    plt.tight_layout()
    plt.savefig(GRAPHS_DIR / "wcd_headroom.png")
    plt.savefig(GRAPHS_DIR / "wcd_headroom.pdf")
    plt.close()

Specific plot for Water Cherenkov Detector (WCD) headroom.

def run_benchmarks()
Expand source code
def run_benchmarks():
    """Run comprehensive benchmark suite."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    results = []

    # Scaling Payloads
    print("\n=== Running Performance Scaling Benchmarks ===")
    payload_sizes = [1000, 10000, 50000, 100000, 500000]
    for payload in payload_sizes:
        res = run_single_benchmark(3, payload, 10)
        if res:
            results.append(asdict(res))
        time.sleep(1)

    # Aggregate Scaling (Fixed Payload, Varying Clients)
    print("\n=== Running Aggregate Scaling Benchmarks ===")
    client_counts = [1, 2, 4, 8]
    for clients in client_counts:
        # Fixed payload at 500k to show system limits
        res = run_single_benchmark(clients, 500000, 10)
        if res:
            results.append(asdict(res))
        time.sleep(1)

    with open(RESULTS_DIR / "latest_results.json", "w") as f:
        json.dump(results, f, indent=2)
    return results

Run comprehensive benchmark suite.

def run_single_benchmark(clients: int, payload_size: int, duration: int) ‑> BenchmarkResult | None
Expand source code
def run_single_benchmark(
    clients: int, payload_size: int, duration: int
) -> Optional[BenchmarkResult]:
    """Run a single benchmark configuration and return results."""
    cmd = [
        "uv",
        "run",
        "src/enrgdaq/tools/benchmark_runner.py",
        "--clients",
        str(clients),
        "--payload-size",
        str(payload_size),
        "--duration",
        str(duration),
        "--use-memory-store",
        "--use-shm",
        "--stats-interval",
        "1.0",
    ]

    print(f"  Running: clients={clients}, payload={payload_size}...")
    try:
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
        stdout, stderr = process.communicate(timeout=duration + 30)

        if process.returncode != 0:
            print(f"    Failed: {stderr}")
            return None

        # Parse summary from stdout
        lines = stdout.split("\n")
        data = {}
        for line in lines:
            if "Avg SHM Throughput:" in line:
                data["avg_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "Peak SHM Throughput:" in line:
                data["peak_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "ZMQ Throughput:" in line:
                data["zmq_throughput"] = float(
                    line.split(":")[1].split("MB/s")[0].strip()
                )
            if "Messages/Second:" in line:
                data["messages_per_second"] = float(
                    line.split(":")[1].replace(",", "").strip()
                )
            if "Average Queue Size:" in line:
                data["avg_queue"] = float(line.split(":")[1].strip())
            if "Avg CPU Usage:" in line:
                data["cpu_usage"] = float(line.split(":")[1].replace("%", "").strip())
            if "Avg p95 Latency:" in line:
                data["latency_p95"] = float(
                    line.split(":")[1].replace("ms", "").strip()
                )
            if "Peak p99 Latency:" in line:
                data["latency_p99"] = float(
                    line.split(":")[1].replace("ms", "").strip()
                )

        return BenchmarkResult(
            num_clients=clients,
            payload_size=payload_size,
            duration=duration,
            avg_throughput_mbs=data.get("avg_throughput", 0),
            peak_throughput_mbs=data.get("peak_throughput", 0),
            zmq_throughput_mbs=data.get("zmq_throughput", 0),
            messages_per_second=data.get("messages_per_second", 0),
            avg_queue_size=data.get("avg_queue", 0),
            cpu_usage_percent=data.get("cpu_usage", 0),
            latency_p95_ms=data.get("latency_p95", 0),
            latency_p99_ms=data.get("latency_p99", 0),
        )

    except Exception as e:
        print(f"    Error: {e}")
        return None

Run a single benchmark configuration and return results.

Classes

class BenchmarkResult (num_clients: int,
payload_size: int,
duration: int,
avg_throughput_mbs: float,
peak_throughput_mbs: float,
zmq_throughput_mbs: float,
messages_per_second: float,
avg_queue_size: float,
cpu_usage_percent: float,
latency_p95_ms: float,
latency_p99_ms: float)
Expand source code
@dataclass
class BenchmarkResult:
    num_clients: int
    payload_size: int
    duration: int
    avg_throughput_mbs: float
    peak_throughput_mbs: float
    zmq_throughput_mbs: float
    messages_per_second: float
    avg_queue_size: float
    cpu_usage_percent: float
    latency_p95_ms: float
    latency_p99_ms: float

BenchmarkResult(num_clients: int, payload_size: int, duration: int, avg_throughput_mbs: float, peak_throughput_mbs: float, zmq_throughput_mbs: float, messages_per_second: float, avg_queue_size: float, cpu_usage_percent: float, latency_p95_ms: float, latency_p99_ms: float)

Instance variables

var avg_queue_size : float
var avg_throughput_mbs : float
var cpu_usage_percent : float
var duration : int
var latency_p95_ms : float
var latency_p99_ms : float
var messages_per_second : float
var num_clients : int
var payload_size : int
var peak_throughput_mbs : float
var zmq_throughput_mbs : float