feat(sensing-server): route live frames through the governed StreamingEngine

Closes the live-trust-path gap (ADR-136 section 8, beyond-SOTA system review):
the running server fused live CSI with the bare MultistaticFuser, while the
privacy/provenance/witness control plane (ADR-135..146) only ever ran on
synthetic in-test frames. The privacy control plane was therefore bypassable
on the real path.

New engine_bridge module drives StreamingEngine::process_cycle from the
server's live NodeState map, reusing the existing NodeState -> MultiBandCsiFrame
conversion. It lazily wires each contributing node as a WorldGraph sensor
(idempotent), bounds belief growth via the retention cap, and forwards explicit
timestamps/calibration ids so the path stays deterministic and replayable.

Wired additively into both live ESP32/WiFi fusion sites in main.rs via a
split-borrow off the write guard, so person-count behavior is unchanged; the
latest BLAKE3 witness is stored on AppState. Every published belief now carries
evidence + model + calibration + privacy decision and a deterministic witness.

Adds wifi-densepose-engine/-worldgraph/-bfld/-geo deps. 6 new bridge tests
(witnessed belief with full provenance, cross-run determinism, idempotent node
registration, retention bound, privacy-mode propagation). sensing-server suite
430+128 green; workspace gate 2,904 passed / 0 failed.

https://claude.ai/code/session_01MjBucx95K4BuUxZi8NWwRH
This commit is contained in:
Claude
2026-06-10 15:59:47 +00:00
parent d0cf36a0ba
commit a8291d2535
5 changed files with 297 additions and 0 deletions
+3
View File
@@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **CIR estimator warm-start precompute** — the diagonal Tikhonov preconditioner `diag(Φ^H Φ)+λI` and its CSR matrix were rebuilt every frame although they depend only on Φ and λ (fixed at `CirEstimator::new`); now precomputed at construction (`ruvsense/cir.rs`). Bit-identical floats (summation order unchanged, witness chain unaffected). Measured: `cir_estimate/he40` 3.9% (p<0.01), multiband groups 1.2/1.4%; smaller configs within container noise.
- **RF tomography solver hoisting** — ISTA gradient buffer no longer allocated inside the 100-iteration loop, and the Frobenius Lipschitz bound moved from per-`reconstruct` to construction (`ruvsense/tomography.rs`). Bit-identical results.
### Added
- **Live trust path: sensing-server now routes real frames through the governed `StreamingEngine`.** Previously the live server ran only the *bare* `MultistaticFuser` (fused amplitudes, no trust control plane), while the privacy/provenance/witness engine (ADR-135..146) ran only on synthetic in-test frames — the gap called out in ADR-136 §8 and the beyond-SOTA system review (the privacy control plane was bypassable). New `engine_bridge` module drives `StreamingEngine::process_cycle` from the server's live `NodeState` map (reusing the existing `NodeState → MultiBandCsiFrame` conversion), lazily wiring each node as a WorldGraph sensor and bounding belief growth via the retention cap. Wired additively into both live ESP32/WiFi fusion sites in `main.rs` (split-borrow off the write guard; does not alter person-count behavior) and stores the latest BLAKE3 witness on `AppState`. Every published belief now carries evidence + model + calibration + privacy decision and a deterministic witness. Adds `wifi-densepose-engine/-worldgraph/-bfld/-geo` deps. 6 new bridge tests (witnessed belief with provenance, determinism, idempotent node registration, retention bound, privacy-mode propagation); sensing-server suite 430+128 green.
### Fixed
- **WorldGraph no longer grows unboundedly under the live loop.** `StreamingEngine::process_cycle` appended one `SemanticState` belief per cycle with no eviction — ~1.7M nodes/day at 20 Hz (identified in `docs/research/ruview-beyond-sota/04-optimization-roadmap.md`). Added `WorldGraph::prune_semantic_states(max)` — deterministic eviction of the oldest beliefs by `(valid_from_unix_ms, id)`, structural nodes (rooms/zones/sensors/anchors/tracks/events) never eligible — and wired it into the engine after each belief append (`StreamingEngine::DEFAULT_SEMANTIC_RETENTION` = 7,200 ≈ 6 min at 20 Hz; tunable via `set_semantic_retention`). The WorldGraph holds *current* beliefs; durable history is the recorder's job, so no audit data is lost. 3 new tests (bounded growth end-to-end, oldest-only eviction, deterministic tie-break).
- **ESP32 edge heart rate no longer stuck at ~45 BPM / dropping wildly — #987.** The on-device HR estimator (`edge_processing.c`, `0xC5110002`) reported ~45 BPM regardless of true heart rate (Apple-Watch ground truth 87 BPM read as ~45) and swung frame-to-frame. Two root causes: (1) a hardcoded `sample_rate = 10.0f` that became wrong after #985's self-ping raised the CSI callback rate to a variable ~1319 Hz — BPM scales as `assumed/actual × true`, so 87 read ~45 and the reading swung as CSI yield fluctuated; (2) the zero-crossing estimator locked onto a breathing harmonic (a 0.25 Hz breathing fundamental puts its 3rd harmonic at ~0.74 Hz ≈ 44 BPM inside the HR band). Fix: measure the real sample rate from inter-frame timestamps (used for BPM conversion + biquad re-tuning on >15% drift); replace the HR zero-crossing with an autocorrelation estimator that rejects breathing harmonics (driven by a robust autocorr breathing period); median-13 smooth the output. Hardware A/B (fixed vs unmodified control board, both `edge_tier=2`): control pegged 4049 BPM; fixed reaches the true 8891 BPM (vs 87 GT) and holds a stable physiological value (spread 59→0 for a steady subject). Known limitation: heavy subject motion still degrades the estimate (motion gating is a follow-up).
Generated
+4
View File
@@ -11089,9 +11089,13 @@ dependencies = [
"tracing",
"tracing-subscriber",
"ureq 2.12.1",
"wifi-densepose-bfld",
"wifi-densepose-engine",
"wifi-densepose-geo 0.1.0",
"wifi-densepose-hardware",
"wifi-densepose-signal",
"wifi-densepose-wifiscan",
"wifi-densepose-worldgraph 0.3.0",
]
[[package]]
@@ -53,6 +53,15 @@ wifi-densepose-signal = { version = "0.3.1", path = "../wifi-densepose-signal",
# Hardware crate — SyncPacket decoder for ADR-110 §A0.12 mesh-aligned timestamps.
wifi-densepose-hardware = { version = "0.3.0", path = "../wifi-densepose-hardware" }
# Governed streaming engine (ADR-135..146): fusion + privacy demotion +
# WorldGraph belief + deterministic witness. Wiring the live server data through
# this is what makes the trust/privacy control plane non-bypassable (the live
# 20 Hz path) — see engine_bridge.rs.
wifi-densepose-engine = { version = "0.3.0", path = "../wifi-densepose-engine" }
wifi-densepose-worldgraph = { version = "0.3.0", path = "../wifi-densepose-worldgraph" }
wifi-densepose-bfld = { version = "0.3.1", path = "../wifi-densepose-bfld", default-features = false }
wifi-densepose-geo = { version = "0.1.0", path = "../wifi-densepose-geo" }
# midstream — real-time introspection / low-latency tap (ADR-099 D1).
# Two crates only, on purpose: scheduler / neural-solver / strange-loop are
# explicitly out of scope of ADR-099 (D5).
@@ -0,0 +1,233 @@
//! Live trust-path bridge: drive the governed [`StreamingEngine`] from the
//! sensing-server's live `NodeState` map.
//!
//! `multistatic_bridge.rs` already converts `NodeState` → `MultiBandCsiFrame`
//! and runs the *bare* `MultistaticFuser`. That path produces fused amplitudes
//! but skips the trust control plane: privacy demotion on contradiction, the
//! WorldGraph belief with mandatory provenance, and the deterministic witness
//! (ADR-135..146). This bridge routes the same live frames through
//! [`StreamingEngine::process_cycle`], so every published belief carries
//! evidence + model + calibration + privacy decision and a BLAKE3 witness —
//! making the privacy control plane non-bypassable on the live 20 Hz path
//! (the gap called out in ADR-136 §8 and the beyond-SOTA system review).
//!
//! Determinism: this module reads server state and forwards explicit
//! timestamps/calibration ids; it introduces no wall-clock reads of its own, so
//! a given `(frames, calibration, now_ms)` always yields the same
//! [`TrustedOutput`] witness.
use std::collections::HashMap;
use wifi_densepose_bfld::PrivacyMode;
use wifi_densepose_engine::{EngineError, StreamingEngine, TrustedOutput};
use wifi_densepose_geo::types::GeoRegistration;
use wifi_densepose_signal::ruvsense::fusion_quality::CalibrationId;
use wifi_densepose_worldgraph::WorldId;
use super::multistatic_bridge::node_frames_from_states;
use super::NodeState;
/// Owns a [`StreamingEngine`] and the WorldGraph scope (one room + sensor) the
/// live sensing loop publishes beliefs into.
pub struct EngineBridge {
engine: StreamingEngine,
room: WorldId,
/// Nodes already wired into the WorldGraph as sensors (by `node_id`).
registered_nodes: HashMap<u8, WorldId>,
/// Calibration epoch applied to live frames until the ADR-135 baseline
/// stage supplies a real per-node id. Stable so witnesses are reproducible.
calibration: CalibrationId,
}
impl EngineBridge {
/// Build a bridge for one installation. `room_area_id`/`room_name` name the
/// observation scope; `mode` is the starting privacy mode.
pub fn new(mode: PrivacyMode, model_version: u16, room_area_id: &str, room_name: &str) -> Self {
let mut engine = StreamingEngine::new(mode, model_version, GeoRegistration::default());
let room = engine.add_room(room_area_id, room_name);
Self {
engine,
room,
registered_nodes: HashMap::new(),
calibration: CalibrationId(0x5256_0001), // "RV\0\x01" — placeholder epoch
}
}
/// Override the calibration epoch stamped onto live frames (ADR-135).
pub fn set_calibration(&mut self, calibration: CalibrationId) {
self.calibration = calibration;
}
/// Override the WorldGraph belief-retention cap (bounds memory on the live
/// loop; see `WorldGraph::prune_semantic_states`).
pub fn set_semantic_retention(&mut self, max_states: usize) {
self.engine.set_semantic_retention(max_states);
}
/// Switch the active privacy mode (operator/control-plane action).
pub fn set_privacy_mode(&mut self, mode: PrivacyMode) {
self.engine.set_privacy_mode(mode);
}
/// Borrow the engine (queries, WorldGraph snapshot, privacy audit).
pub fn engine(&self) -> &StreamingEngine {
&self.engine
}
/// Number of sensor nodes wired into the WorldGraph so far.
pub fn registered_node_count(&self) -> usize {
self.registered_nodes.len()
}
/// Run one governed trust cycle over the current live node states.
///
/// Returns `None` when no active node yields a frame (nothing to fuse —
/// the engine is not invoked, so no spurious belief is published). On a
/// real cycle it lazily wires any newly-seen node as a WorldGraph sensor,
/// then returns the witnessed [`TrustedOutput`] (or a fusion error).
///
/// `now_ms` is supplied by the caller (the sensing loop's clock), keeping
/// the bridge deterministic and replayable.
pub fn process_cycle_from_states(
&mut self,
node_states: &HashMap<u8, NodeState>,
now_ms: i64,
) -> Option<Result<TrustedOutput, EngineError>> {
let frames = node_frames_from_states(node_states);
if frames.is_empty() {
return None;
}
// Lazily register each contributing node as a sensor observing the room,
// so the privacy rollup can suppress it under identity-strict modes.
for f in &frames {
self.registered_nodes.entry(f.node_id).or_insert_with(|| {
self.engine
.add_sensor(&format!("node-{}", f.node_id), self.room)
});
}
Some(
self.engine
.process_cycle(&frames, self.calibration, self.room, now_ms),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use std::time::Instant;
use wifi_densepose_bfld::PrivacyClass;
fn node_state_with_history(amp: f64, n_sub: usize) -> NodeState {
let mut ns = NodeState::new();
let frame: Vec<f64> = (0..n_sub).map(|i| amp + 0.1 * i as f64).collect();
ns.frame_history = VecDeque::from(vec![frame]);
ns.last_frame_time = Some(Instant::now());
ns
}
fn two_node_states() -> HashMap<u8, NodeState> {
let mut m = HashMap::new();
m.insert(0u8, node_state_with_history(1.0, 56));
m.insert(1u8, node_state_with_history(1.05, 56));
m
}
#[test]
fn empty_states_produce_no_belief() {
let mut bridge = EngineBridge::new(PrivacyMode::PrivateHome, 1, "living_room", "Living Room");
let out = bridge.process_cycle_from_states(&HashMap::new(), 1_000);
assert!(out.is_none());
// No belief published, no sensor wired.
assert_eq!(bridge.registered_node_count(), 0);
}
#[test]
fn live_cycle_produces_witnessed_belief_with_provenance() {
let mut bridge = EngineBridge::new(PrivacyMode::PrivateHome, 1, "living_room", "Living Room");
let states = two_node_states();
let out = bridge
.process_cycle_from_states(&states, 10_000)
.expect("frames present")
.expect("fusion succeeds");
// Full provenance: evidence + model + calibration + privacy decision.
assert!(!out.provenance.evidence.is_empty());
assert_eq!(out.provenance.model_version, "rfenc-v1");
assert!(out.provenance.calibration_version.starts_with("cal:"));
assert!(out.provenance.privacy_decision.starts_with("PrivateHome/"));
// A witness was produced and the belief is in the WorldGraph.
assert_ne!(out.witness, [0u8; 32]);
assert!(bridge.engine().world().node(out.semantic_id).is_some());
// Both nodes are now wired as sensors.
assert_eq!(bridge.registered_node_count(), 2);
}
#[test]
fn live_path_is_deterministic() {
let states = two_node_states_fixed();
let run = || {
let mut b = EngineBridge::new(PrivacyMode::PrivateHome, 1, "r", "R");
b.process_cycle_from_states(&states, 5_000).unwrap().unwrap()
};
let a = run();
let b = run();
assert_eq!(a.witness, b.witness);
assert_eq!(a.provenance.calibration_version, b.provenance.calibration_version);
assert_eq!(a.effective_class, b.effective_class);
}
// Deterministic node states (no wall-clock in amplitude/history).
fn two_node_states_fixed() -> HashMap<u8, NodeState> {
let mut m = HashMap::new();
for (id, amp) in [(0u8, 1.0_f64), (1u8, 1.05)] {
let mut ns = NodeState::new();
ns.frame_history = VecDeque::from(vec![(0..56)
.map(|i| amp + 0.1 * i as f64)
.collect::<Vec<f64>>()]);
ns.last_frame_time = Some(Instant::now());
m.insert(id, ns);
}
m
}
#[test]
fn nodes_registered_once_across_cycles() {
let mut bridge = EngineBridge::new(PrivacyMode::PrivateHome, 1, "r", "R");
let states = two_node_states();
bridge.process_cycle_from_states(&states, 1_000);
bridge.process_cycle_from_states(&states, 2_000);
bridge.process_cycle_from_states(&states, 3_000);
// Still exactly two sensors — idempotent registration.
assert_eq!(bridge.registered_node_count(), 2);
}
#[test]
fn retention_bounds_world_graph_growth() {
let mut bridge = EngineBridge::new(PrivacyMode::PrivateHome, 1, "r", "R");
bridge.set_semantic_retention(5);
let states = two_node_states();
for i in 0..20i64 {
bridge.process_cycle_from_states(&states, 1_000 + i * 50);
}
// room + 2 sensors + at most 5 retained beliefs.
assert!(bridge.engine().world().node_count() <= 3 + 5);
}
#[test]
fn identity_strict_mode_is_carried_into_provenance() {
let mut bridge = EngineBridge::new(PrivacyMode::PrivateHome, 1, "r", "R");
bridge.set_privacy_mode(PrivacyMode::StrictNoIdentity);
let out = bridge
.process_cycle_from_states(&two_node_states(), 7_000)
.unwrap()
.unwrap();
assert!(out.provenance.privacy_decision.starts_with("StrictNoIdentity/"));
// Effective class is a valid privacy class (sanity).
let _ = matches!(
out.effective_class,
PrivacyClass::Raw | PrivacyClass::Derived | PrivacyClass::Anonymous | PrivacyClass::Restricted
);
}
}
@@ -12,6 +12,7 @@
mod adaptive_classifier;
pub mod cli;
pub mod csi;
mod engine_bridge;
mod field_bridge;
mod multistatic_bridge;
pub mod pose;
@@ -988,6 +989,12 @@ struct AppStateInner {
last_tracker_instant: Option<std::time::Instant>,
/// Attention-weighted multi-node CSI fusion engine.
multistatic_fuser: MultistaticFuser,
/// Governed trust-path bridge (ADR-135..146): runs the same live frames
/// through the privacy/provenance/witness control plane. Additive — does not
/// affect person-count behavior; produces the auditable belief + witness.
engine_bridge: engine_bridge::EngineBridge,
/// Witness of the most recent governed trust cycle (BLAKE3), for audit/UI.
pub(crate) last_trust_witness: Option<[u8; 32]>,
/// SVD-based room field model for eigenvalue person counting (None until calibration).
field_model: Option<FieldModel>,
// ── ADR-044 §5.2: adaptive rolling-p95 normalization ─────────────────────
@@ -4986,6 +4993,23 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
0
};
// Governed trust cycle (ADR-135..146): run the same live
// frames through the privacy/provenance/witness control
// plane. Split-borrow the two distinct fields off the guard.
{
let sref: &mut AppStateInner = &mut s;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
if let Some(Ok(trust)) = sref
.engine_bridge
.process_cycle_from_states(&sref.node_states, now_ms)
{
sref.last_trust_witness = Some(trust.witness);
}
}
// Feed field model calibration if active (use per-node history for ESP32).
if let Some(frame_history) = s
.node_states
@@ -5410,6 +5434,23 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
0
};
// Governed trust cycle (ADR-135..146): run the same live
// frames through the privacy/provenance/witness control
// plane. Split-borrow the two distinct fields off the guard.
{
let sref: &mut AppStateInner = &mut s;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
if let Some(Ok(trust)) = sref
.engine_bridge
.process_cycle_from_states(&sref.node_states, now_ms)
{
sref.last_trust_witness = Some(trust.witness);
}
}
// Feed field model calibration if active (use per-node history for ESP32).
if let Some(frame_history) = s
.node_states
@@ -6721,6 +6762,13 @@ async fn main() {
}
fuser
},
engine_bridge: engine_bridge::EngineBridge::new(
wifi_densepose_bfld::PrivacyMode::PrivateHome,
1,
"default",
"Default Room",
),
last_trust_witness: None,
field_model: if args.calibrate {
info!("Field model calibration enabled — room should be empty during startup");
FieldModel::new(field_bridge::single_link_config()).ok()