Files
ruvnet--RuView/python/bench/test_bench_vitals.py
T
ruv 78916d8455 hardening(adr-117): benchmarks + security/robustness test suite
Benchmarks (`python/bench/`, pytest-benchmark — opt-in via --benchmark-only):

| Hot path | Mean | Ops/sec | % of 100 Hz budget |
|---|---|---|---|
| BfldFrame HT20 1×1×52 | 800 ns | 1.25 Mops | 0.008% |
| BfldFrame HE20 2×1×242 | 1.3 μs | 750 kops | 0.013% |
| BfldFrame HE80 2×1×996 | 4.2 μs | 236 kops | 0.042% |
| BfldFrame HE160 2×2×1992 | 14 μs | 71 kops | 0.14% |
| BfldFrame.feedback_matrix() | 2.8 μs | 352 kops | — |
| WS edge_vitals decode | 7.4 μs | 134 kops | 0.074% |
| WS pose_data decode (3 persons) | 23 μs | 42 kops | 0.24% |
| BreathingExtractor.extract() 56sc | 28 μs | 35 kops | 0.28% |
| BreathingExtractor.extract() 114sc | 44 μs | 23 kops | 0.44% |
| BreathingExtractor.extract() 242sc | 79 μs | 13 kops | 0.79% |
| HeartRateExtractor.extract() 56sc | 105 μs | 9.5 kops | 1.05% |

All hot paths well under the 100 Hz ESP32 frame budget (10 ms).
Worst case (HeartRateExtractor) uses 1% of the budget — no
optimization needed. Scaling on n_subcarriers is sub-quadratic
(56→242 = 4.3× input, 2.8× time) — catches future O(n²)
regressions.

Security & robustness tests (`tests/test_security.py`, +27 tests):

- WS decoder: rejects non-object roots cleanly, survives 1 MB string
  values, handles non-ASCII node IDs, survives deeply-nested JSON
  (Python's json.loads built-in guard not bypassed)
- MQTT topic matcher: 9 edge-case parametrize entries including
  $SYS topics, null-byte injection, mid-pattern `#` boundary,
  empty-string boundary
- MQTT credential confidentiality: password never appears in
  repr()/str(), never stored in plain client-instance attribute
- HA discovery: rejects null-byte-laced topics, rejects extra
  slashes in node_id, rejects non-dict payload body (list, scalar,
  invalid UTF-8 bytes) without crashing
- Semantic primitive listener: rejects topic-injection attempts
  (prefix-injected paths, wrong case on final segment), survives
  invalid UTF-8 payloads
- Public surface integrity: every name in wifi_densepose.__all__
  AND wifi_densepose.client.__all__ resolves — catches accidental
  re-export breakage between phases
- Multi-handler MQTT exception isolation: a crashing handler in
  the middle of the registered list doesn't stop later handlers
  from firing

Test count: 156 → 183 (+27). All passing.

Bench results steady-state confirm no Rust-binding-layer
optimization is needed before the v2.0.0 publish.

Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-05-24 11:44:54 -04:00

86 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ADR-117 hardening sweep — Benchmarks for the P3 vitals hot paths.
Targets the ESP32 production rate: 100 Hz × 56 subcarriers, which is
what `BreathingExtractor.esp32_default()` is tuned for. The bench
asserts the *per-extract* cost is comfortably below 10 ms — at 100 Hz
that's the entire frame budget, so anything above 10 ms means the
Python binding would be the bottleneck instead of the radio.
Run with:
pytest python/bench/ --benchmark-only
The benchmarks are skipped by default (`addopts` in pyproject.toml
doesn't include them) — they live in a sibling `bench/` directory
so the main test run stays fast.
"""
from __future__ import annotations
import math
from random import Random
import pytest
from wifi_densepose import BreathingExtractor, HeartRateExtractor
def _synth_frame(n_subcarriers: int, sample_rate: float, t: float, freq_hz: float, rng: Random) -> tuple[list[float], list[float]]:
"""Build one ESP32-shape frame at time `t`: sine at `freq_hz` plus
tiny per-subcarrier noise."""
base = math.sin(2.0 * math.pi * freq_hz * t)
residuals = [base + rng.gauss(0.0, 0.01) for _ in range(n_subcarriers)]
weights = [1.0] * n_subcarriers
return residuals, weights
def test_breathing_extract_per_frame_cost(benchmark) -> None:
"""One BreathingExtractor.extract() at ESP32 defaults should
finish well under 10 ms — that's the 100 Hz frame budget."""
br = BreathingExtractor.esp32_default()
rng = Random(42)
# Pre-fill ~25 seconds of history so the bench measures the
# steady-state cost, not the cold-start cost.
for i in range(2500):
residuals, weights = _synth_frame(56, 100.0, i / 100.0, 0.25, rng)
br.extract(residuals=residuals, weights=weights)
def _one_frame():
residuals, weights = _synth_frame(56, 100.0, 30.0, 0.25, rng)
return br.extract(residuals=residuals, weights=weights)
benchmark(_one_frame)
def test_heart_rate_extract_per_frame_cost(benchmark) -> None:
"""One HeartRateExtractor.extract() at ESP32 defaults — same 10 ms
target."""
hr = HeartRateExtractor.esp32_default()
rng = Random(43)
for i in range(1500):
residuals, weights = _synth_frame(56, 100.0, i / 100.0, 1.2, rng)
hr.extract(residuals=residuals, weights=weights)
def _one_frame():
residuals, weights = _synth_frame(56, 100.0, 16.0, 1.2, rng)
return hr.extract(residuals=residuals, weights=weights)
benchmark(_one_frame)
@pytest.mark.parametrize("n_subcarriers", [56, 114, 242])
def test_breathing_extract_scaling(benchmark, n_subcarriers: int) -> None:
"""Sanity check: cost should scale roughly linearly with the
subcarrier count. Catches accidental O(n^2) regressions."""
sample_rate = 100.0
br = BreathingExtractor(n_subcarriers, sample_rate, 30.0)
rng = Random(n_subcarriers)
for i in range(2500):
residuals, weights = _synth_frame(n_subcarriers, sample_rate, i / sample_rate, 0.25, rng)
br.extract(residuals=residuals, weights=weights)
def _one_frame():
residuals, weights = _synth_frame(n_subcarriers, sample_rate, 30.0, 0.25, rng)
return br.extract(residuals=residuals, weights=weights)
benchmark(_one_frame)