Files
ruv 8504638187 feat(signal): ADR-135 — empty-room baseline calibration
Operator-initiated calibration that records 30 s of stationary CSI,
emits a per-subcarrier baseline (amplitude mean+variance via Welford,
phase via circular sin/cos sums with von Mises dispersion), and gates
downstream stages on a deviation z-score. Plugs into multistatic
coherence gating, motion/presence detection, and the new ADR-134 CIR
estimator as a reference-subtracted input.

API surface (under wifi_densepose_signal):
  CalibrationConfig::{ht20, ht40, he20, he40}
  CalibrationRecorder { record(), finalize(), frames_recorded() }
  BaselineCalibration {
    subcarriers: Vec<SubcarrierBaseline>,
    deviation(&CsiFrame), subtract_in_place(&mut CsiFrame),
    to_bytes(), from_bytes()
  }
  CalibrationDeviationScore { amplitude_z_median, amplitude_z_max,
                              phase_drift_median, motion_flagged }
  CalibrationError { SubcarrierMismatch, TierMismatch,
                     InsufficientFrames, VersionMismatch, TruncatedBuffer }

Binary baseline format: magic 0xCA1B_0001 + u8 version=1 + u8 tier +
captured_at_unix_s (i64) + frame_count (u64) + num_subcarriers (u32) +
[SubcarrierBaseline; N] as 16 bytes each (amp_mean, amp_variance,
phase_mean, phase_dispersion as f32 LE). Hand-written serialisation so
the format is stable across Rust toolchain versions without serde drift.

CLI: new `wifi-densepose calibrate` subcommand binds a UDP listener
(0xC511_0001 frames), streams them through CalibrationRecorder, prints
a real-time z-score banner per ADR-135 §risk 1 (operator-may-be-moving),
aborts on sustained high deviation, and writes the binary baseline to
disk. Local UDP packet parser duplicated from sensing-server (per ADR
discussion — avoids cross-crate API churn).

Witness: cross-platform-deterministic SHA-256 over the per-subcarrier
quantised baseline profile (u16 LE at 1e-2/1e-4/1e-3, no sort) using
the lesson learnt from the CIR PR #837 libm-jitter fix. Hash:
d6bce07ecb1648e6936561df44bf4a3bfc17bb0ba5f692646b2301d105b52f67

CI guard: new "ADR-135 calibration witness proof (determinism guard)"
step under the Rust Workspace Tests job, adjacent to the existing
ADR-134 CIR guard. Regressions are unambiguously attributable.

Hardware-in-loop validation: full 600-frame capture exercised via the
new scripts/synth-csi-udp.py emitter targeting 127.0.0.1:5005. The CLI
binary received 600 frames at 20 Hz, z_med stable at ~0.7, motion
correctly NOT flagged, finalised baseline written to baseline.bin (860
bytes) with correct magic + version + timestamp in the header. Live
ESP32 capture from COM9 is operator follow-up — requires provisioning
the firmware's UDP target IP to match the host running the CLI.

Test results (cargo test -p wifi-densepose-signal --no-default-features):
  lib:                    382 pass / 0 fail / 1 ignored
  calibration_synthetic:   17 pass / 0 fail
  calibration_drift:        5 pass / 0 fail
  calibration_roundtrip:   10 pass / 0 fail
  cir_*:                    9 pass + 6 documented P2 ignores
  doctest:                 10 pass

Bench: 20 Criterion combinations registered
(recorder_record / recorder_finalize / deviation / record_600 /
to_bytes across HT20/HT40/HE20/HE40 tiers).

Witness: bash scripts/verify-calibration-proof.sh → VERDICT: PASS

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-05-28 18:57:08 -04:00

92 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""Synthetic CSI UDP emitter for testing the calibration CLI end-to-end.
Emits the same 0xC511_0001 frame format the ESP32-S3 firmware produces, so the
`wifi-densepose calibrate` CLI can be exercised without a live ESP32 in the
loop. Generates HT20 frames (52 active subcarriers, 1 antenna) at 20 Hz.
"""
import argparse
import math
import random
import socket
import struct
import time
MAGIC = 0xC511_0001
def build_packet(node_id: int, seq: int, freq_mhz: int, rssi: int,
amps: list[float], phases: list[float]) -> bytes:
n_ant = 1
n_sc = len(amps)
header = struct.pack(
"<I B B B B H I b b I",
MAGIC,
node_id,
n_ant,
n_sc,
0, # reserved
freq_mhz,
seq,
rssi,
-95, # noise_floor
0, # reserved/padding
)
iq = bytearray()
for amp, phase in zip(amps, phases):
i = max(-127, min(127, int(amp * math.cos(phase))))
q = max(-127, min(127, int(amp * math.sin(phase))))
iq.extend(struct.pack("bb", i, q))
return bytes(header) + bytes(iq)
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--host", default="127.0.0.1")
p.add_argument("--port", type=int, default=5005)
p.add_argument("--duration-s", type=float, default=35.0,
help="emit duration; default 35s so a 30s capture sees the full stream")
p.add_argument("--rate-hz", type=float, default=20.0)
p.add_argument("--n-sc", type=int, default=52)
p.add_argument("--motion-after-s", type=float, default=-1.0,
help="if >=0, inject amplitude jitter after this many seconds")
args = p.parse_args()
random.seed(42)
base_amps = [40.0 + 10.0 * math.cos(k * 0.2) for k in range(args.n_sc)]
base_phases = [0.5 * math.sin(k * 0.3) for k in range(args.n_sc)]
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
period = 1.0 / args.rate_hz
started = time.time()
seq = 0
print(f"emitting CSI to {args.host}:{args.port} at {args.rate_hz} Hz, "
f"{args.n_sc} sc/frame, duration {args.duration_s}s", flush=True)
while True:
elapsed = time.time() - started
if elapsed >= args.duration_s:
break
amps = list(base_amps)
phases = list(base_phases)
# Mild stationary jitter (~0.5 amplitude units RMS)
for k in range(args.n_sc):
amps[k] += random.gauss(0.0, 0.5)
phases[k] += random.gauss(0.0, 0.01)
if args.motion_after_s >= 0 and elapsed >= args.motion_after_s:
for k in range(args.n_sc):
amps[k] += random.gauss(0.0, 8.0)
phases[k] += random.gauss(0.0, 0.3)
pkt = build_packet(node_id=42, seq=seq, freq_mhz=2412, rssi=-55,
amps=amps, phases=phases)
sock.sendto(pkt, (args.host, args.port))
seq += 1
time.sleep(period)
print(f"emitted {seq} frames", flush=True)
if __name__ == "__main__":
main()