mirror of
https://github.com/ruvnet/RuView
synced 2026-06-09 10:13:17 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f927aaedb | |||
| 635c152e61 |
@@ -1,7 +1,7 @@
|
||||
# π RuView
|
||||
|
||||
<p align="center">
|
||||
<a href="https://ruvnet.github.io/RuView/">
|
||||
<a href="https://x.com/rUv/status/2037556932802761004">
|
||||
<img src="assets/ruview-small-gemini.jpg" alt="RuView - WiFi DensePose" width="100%">
|
||||
</a>
|
||||
</p>
|
||||
|
||||
@@ -0,0 +1,182 @@
|
||||
# ADR-068: Per-Node State Pipeline for Multi-Node Sensing
|
||||
|
||||
| Field | Value |
|
||||
|------------|-------------------------------------|
|
||||
| Status | Accepted |
|
||||
| Date | 2026-03-27 |
|
||||
| Authors | rUv, claude-flow |
|
||||
| Drivers | #249, #237, #276, #282 |
|
||||
| Supersedes | — |
|
||||
|
||||
## Context
|
||||
|
||||
The sensing server (`wifi-densepose-sensing-server`) was originally designed for
|
||||
single-node operation. When multiple ESP32 nodes send CSI frames simultaneously,
|
||||
all data is mixed into a single shared pipeline:
|
||||
|
||||
- **One** `frame_history` VecDeque for all nodes
|
||||
- **One** `smoothed_person_score` / `smoothed_motion` / vital sign buffers
|
||||
- **One** baseline and debounce state
|
||||
|
||||
This means the classification, person count, and vital signs reported to the UI
|
||||
are an uncontrolled aggregate of all nodes' data. The result: the detection
|
||||
window shows identical output regardless of how many nodes are deployed, where
|
||||
people stand, or how many people are in the room (#249 — 24 comments, the most
|
||||
reported issue).
|
||||
|
||||
### Root Cause Verified
|
||||
|
||||
Investigation of `AppStateInner` (main.rs lines 279-367) confirmed:
|
||||
|
||||
| Shared field | Impact |
|
||||
|---------------------------|--------------------------------------------|
|
||||
| `frame_history` | Temporal analysis mixes all nodes' CSI data |
|
||||
| `smoothed_person_score` | Person count aggregates all nodes |
|
||||
| `smoothed_motion` | Motion classification undifferentiated |
|
||||
| `smoothed_hr` / `br` | Vital signs are global, not per-node |
|
||||
| `baseline_motion` | Adaptive baseline learned from mixed data |
|
||||
| `debounce_counter` | All nodes share debounce state |
|
||||
|
||||
## Decision
|
||||
|
||||
Introduce **per-node state tracking** via a `HashMap<u8, NodeState>` in
|
||||
`AppStateInner`. Each ESP32 node (identified by its `node_id` byte) gets an
|
||||
independent sensing pipeline with its own temporal history, smoothing buffers,
|
||||
baseline, and classification state.
|
||||
|
||||
### Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────┐
|
||||
UDP frames │ AppStateInner │
|
||||
───────────► │ │
|
||||
node_id=1 ──► │ node_states: HashMap<u8, NodeState> │
|
||||
node_id=2 ──► │ ├── 1: NodeState { frame_history, │
|
||||
node_id=3 ──► │ │ smoothed_motion, vitals, ... }│
|
||||
│ ├── 2: NodeState { ... } │
|
||||
│ └── 3: NodeState { ... } │
|
||||
│ │
|
||||
│ ┌── Per-Node Pipeline ──┐ │
|
||||
│ │ extract_features() │ │
|
||||
│ │ smooth_and_classify() │ │
|
||||
│ │ smooth_vitals() │ │
|
||||
│ │ score_to_person_count()│ │
|
||||
│ └────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌── Multi-Node Fusion ──┐ │
|
||||
│ │ Aggregate person count │ │
|
||||
│ │ Per-node classification│ │
|
||||
│ │ All-nodes WebSocket msg│ │
|
||||
│ └────────────────────────┘ │
|
||||
│ │
|
||||
│ ──► WebSocket broadcast (sensing_update) │
|
||||
└─────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### NodeState Struct
|
||||
|
||||
```rust
|
||||
struct NodeState {
|
||||
frame_history: VecDeque<Vec<f64>>,
|
||||
smoothed_person_score: f64,
|
||||
prev_person_count: usize,
|
||||
smoothed_motion: f64,
|
||||
current_motion_level: String,
|
||||
debounce_counter: u32,
|
||||
debounce_candidate: String,
|
||||
baseline_motion: f64,
|
||||
baseline_frames: u64,
|
||||
smoothed_hr: f64,
|
||||
smoothed_br: f64,
|
||||
smoothed_hr_conf: f64,
|
||||
smoothed_br_conf: f64,
|
||||
hr_buffer: VecDeque<f64>,
|
||||
br_buffer: VecDeque<f64>,
|
||||
rssi_history: VecDeque<f64>,
|
||||
vital_detector: VitalSignDetector,
|
||||
latest_vitals: VitalSigns,
|
||||
last_frame_time: Option<std::time::Instant>,
|
||||
edge_vitals: Option<Esp32VitalsPacket>,
|
||||
}
|
||||
```
|
||||
|
||||
### Multi-Node Aggregation
|
||||
|
||||
- **Person count**: Sum of per-node `prev_person_count` for active nodes
|
||||
(seen within last 10 seconds).
|
||||
- **Classification**: Per-node classification included in `SensingUpdate.nodes`.
|
||||
- **Vital signs**: Per-node vital signs; UI can render per-node or aggregate.
|
||||
- **Signal field**: Generated from the most-recently-updated node's features.
|
||||
- **Stale nodes**: Nodes with no frame for >10 seconds are excluded from
|
||||
aggregation and marked offline (consistent with PR #300).
|
||||
|
||||
### Backward Compatibility
|
||||
|
||||
- The simulated data path (`simulated_data_task`) continues using global state.
|
||||
- Single-node deployments behave identically (HashMap has one entry).
|
||||
- The WebSocket message format (`sensing_update`) remains the same but the
|
||||
`nodes` array now contains all active nodes, and `estimated_persons` reflects
|
||||
the cross-node aggregate.
|
||||
- The edge vitals path (#323 fix) also uses per-node state.
|
||||
|
||||
## Scaling Characteristics
|
||||
|
||||
| Nodes | Per-Node Memory | Total Overhead | Notes |
|
||||
|-------|----------------|----------------|-------|
|
||||
| 1 | ~50 KB | ~50 KB | Identical to current |
|
||||
| 3 | ~50 KB | ~150 KB | Typical home setup |
|
||||
| 10 | ~50 KB | ~500 KB | Small office |
|
||||
| 50 | ~50 KB | ~2.5 MB | Building floor |
|
||||
| 100 | ~50 KB | ~5 MB | Large deployment |
|
||||
| 256 | ~50 KB | ~12.8 MB | Max (u8 node_id) |
|
||||
|
||||
Memory is dominated by `frame_history` (100 frames x ~500 bytes each = ~50 KB
|
||||
per node). This scales linearly and fits comfortably in server memory even at
|
||||
256 nodes.
|
||||
|
||||
## QEMU Validation
|
||||
|
||||
The existing QEMU swarm infrastructure (ADR-062, `scripts/qemu_swarm.py`)
|
||||
supports multi-node simulation with configurable topologies:
|
||||
|
||||
- `star`: Central coordinator + sensor nodes
|
||||
- `mesh`: Fully connected peer network
|
||||
- `line`: Sequential chain
|
||||
- `ring`: Circular topology
|
||||
|
||||
Each QEMU instance runs with a unique `node_id` via NVS provisioning. The
|
||||
swarm health validator (`scripts/swarm_health.py`) checks per-node UART output.
|
||||
|
||||
Validation plan:
|
||||
1. QEMU swarm with 3-5 nodes in mesh topology
|
||||
2. Verify server produces distinct per-node classifications
|
||||
3. Verify aggregate person count reflects multi-node contributions
|
||||
4. Verify stale-node eviction after timeout
|
||||
|
||||
## Consequences
|
||||
|
||||
### Positive
|
||||
- Each node's CSI data is processed independently — no cross-contamination
|
||||
- Person count scales with the number of deployed nodes
|
||||
- Vital signs are per-node, enabling room-level health monitoring
|
||||
- Foundation for spatial localization (per-node positions + triangulation)
|
||||
- Scales to 256 nodes with <13 MB memory overhead
|
||||
|
||||
### Negative
|
||||
- Slightly more memory per node (~50 KB each)
|
||||
- `smooth_and_classify_node` function duplicates some logic from global version
|
||||
- Per-node `VitalSignDetector` instances add CPU cost proportional to node count
|
||||
|
||||
### Risks
|
||||
- Node ID collisions (mitigated by NVS persistence since v0.5.0)
|
||||
- HashMap growth without cleanup (mitigated by stale-node eviction)
|
||||
|
||||
## References
|
||||
|
||||
- Issue #249: Detection window same regardless (24 comments)
|
||||
- Issue #237: Same display for 0/1/2 people (12 comments)
|
||||
- Issue #276: Only one can be detected (8 comments)
|
||||
- Issue #282: Detection fail (5 comments)
|
||||
- PR #295: Hysteresis smoothing (partial mitigation)
|
||||
- PR #300: ESP32 offline detection after 5s
|
||||
- ADR-062: QEMU Swarm Configurator
|
||||
@@ -16,7 +16,7 @@ mod vital_signs;
|
||||
// Training pipeline modules (exposed via lib.rs)
|
||||
use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding};
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -275,6 +275,59 @@ struct BoundingBox {
|
||||
height: f64,
|
||||
}
|
||||
|
||||
/// Per-node sensing state for multi-node deployments (issue #249).
|
||||
/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
|
||||
/// sign detector so that data from different nodes is never mixed.
|
||||
struct NodeState {
|
||||
frame_history: VecDeque<Vec<f64>>,
|
||||
smoothed_person_score: f64,
|
||||
prev_person_count: usize,
|
||||
smoothed_motion: f64,
|
||||
current_motion_level: String,
|
||||
debounce_counter: u32,
|
||||
debounce_candidate: String,
|
||||
baseline_motion: f64,
|
||||
baseline_frames: u64,
|
||||
smoothed_hr: f64,
|
||||
smoothed_br: f64,
|
||||
smoothed_hr_conf: f64,
|
||||
smoothed_br_conf: f64,
|
||||
hr_buffer: VecDeque<f64>,
|
||||
br_buffer: VecDeque<f64>,
|
||||
rssi_history: VecDeque<f64>,
|
||||
vital_detector: VitalSignDetector,
|
||||
latest_vitals: VitalSigns,
|
||||
last_frame_time: Option<std::time::Instant>,
|
||||
edge_vitals: Option<Esp32VitalsPacket>,
|
||||
}
|
||||
|
||||
impl NodeState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
frame_history: VecDeque::new(),
|
||||
smoothed_person_score: 0.0,
|
||||
prev_person_count: 0,
|
||||
smoothed_motion: 0.0,
|
||||
current_motion_level: "absent".to_string(),
|
||||
debounce_counter: 0,
|
||||
debounce_candidate: "absent".to_string(),
|
||||
baseline_motion: 0.0,
|
||||
baseline_frames: 0,
|
||||
smoothed_hr: 0.0,
|
||||
smoothed_br: 0.0,
|
||||
smoothed_hr_conf: 0.0,
|
||||
smoothed_br_conf: 0.0,
|
||||
hr_buffer: VecDeque::with_capacity(8),
|
||||
br_buffer: VecDeque::with_capacity(8),
|
||||
rssi_history: VecDeque::new(),
|
||||
vital_detector: VitalSignDetector::new(10.0),
|
||||
latest_vitals: VitalSigns::default(),
|
||||
last_frame_time: None,
|
||||
edge_vitals: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared application state
|
||||
struct AppStateInner {
|
||||
latest_update: Option<SensingUpdate>,
|
||||
@@ -364,6 +417,10 @@ struct AppStateInner {
|
||||
// ── Adaptive classifier (environment-tuned) ──────────────────────────
|
||||
/// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime).
|
||||
adaptive_model: Option<adaptive_classifier::AdaptiveModel>,
|
||||
// ── Per-node state (issue #249) ─────────────────────────────────────
|
||||
/// Per-node sensing state for multi-node deployments.
|
||||
/// Keyed by `node_id` from the ESP32 frame header.
|
||||
node_states: HashMap<u8, NodeState>,
|
||||
}
|
||||
|
||||
/// If no ESP32 frame arrives within this duration, source reverts to offline.
|
||||
@@ -964,6 +1021,44 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo,
|
||||
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
|
||||
}
|
||||
|
||||
/// Per-node variant of `smooth_and_classify` that operates on a `NodeState`
|
||||
/// instead of `AppStateInner` (issue #249).
|
||||
fn smooth_and_classify_node(ns: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) {
|
||||
ns.baseline_frames += 1;
|
||||
if ns.baseline_frames < BASELINE_WARMUP {
|
||||
ns.baseline_motion = ns.baseline_motion * 0.9 + raw_motion * 0.1;
|
||||
} else if raw_motion < ns.smoothed_motion + 0.05 {
|
||||
ns.baseline_motion = ns.baseline_motion * (1.0 - BASELINE_EMA_ALPHA)
|
||||
+ raw_motion * BASELINE_EMA_ALPHA;
|
||||
}
|
||||
|
||||
let adjusted = (raw_motion - ns.baseline_motion * 0.7).max(0.0);
|
||||
|
||||
ns.smoothed_motion = ns.smoothed_motion * (1.0 - MOTION_EMA_ALPHA)
|
||||
+ adjusted * MOTION_EMA_ALPHA;
|
||||
let sm = ns.smoothed_motion;
|
||||
|
||||
let candidate = raw_classify(sm);
|
||||
|
||||
if candidate == ns.current_motion_level {
|
||||
ns.debounce_counter = 0;
|
||||
ns.debounce_candidate = candidate;
|
||||
} else if candidate == ns.debounce_candidate {
|
||||
ns.debounce_counter += 1;
|
||||
if ns.debounce_counter >= DEBOUNCE_FRAMES {
|
||||
ns.current_motion_level = candidate;
|
||||
ns.debounce_counter = 0;
|
||||
}
|
||||
} else {
|
||||
ns.debounce_candidate = candidate;
|
||||
ns.debounce_counter = 1;
|
||||
}
|
||||
|
||||
raw.motion_level = ns.current_motion_level.clone();
|
||||
raw.presence = sm > 0.03;
|
||||
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
|
||||
}
|
||||
|
||||
/// If an adaptive model is loaded, override the classification with the
|
||||
/// model's prediction. Uses the full 15-feature vector for higher accuracy.
|
||||
fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) {
|
||||
@@ -1064,6 +1159,55 @@ fn smooth_vitals(state: &mut AppStateInner, raw: &VitalSigns) -> VitalSigns {
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-node variant of `smooth_vitals` that operates on a `NodeState` (issue #249).
|
||||
fn smooth_vitals_node(ns: &mut NodeState, raw: &VitalSigns) -> VitalSigns {
|
||||
let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0);
|
||||
let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0);
|
||||
|
||||
let hr_ok = ns.smoothed_hr < 1.0 || (raw_hr - ns.smoothed_hr).abs() < HR_MAX_JUMP;
|
||||
let br_ok = ns.smoothed_br < 1.0 || (raw_br - ns.smoothed_br).abs() < BR_MAX_JUMP;
|
||||
|
||||
if hr_ok && raw_hr > 0.0 {
|
||||
ns.hr_buffer.push_back(raw_hr);
|
||||
if ns.hr_buffer.len() > VITAL_MEDIAN_WINDOW { ns.hr_buffer.pop_front(); }
|
||||
}
|
||||
if br_ok && raw_br > 0.0 {
|
||||
ns.br_buffer.push_back(raw_br);
|
||||
if ns.br_buffer.len() > VITAL_MEDIAN_WINDOW { ns.br_buffer.pop_front(); }
|
||||
}
|
||||
|
||||
let trimmed_hr = trimmed_mean(&ns.hr_buffer);
|
||||
let trimmed_br = trimmed_mean(&ns.br_buffer);
|
||||
|
||||
if trimmed_hr > 0.0 {
|
||||
if ns.smoothed_hr < 1.0 {
|
||||
ns.smoothed_hr = trimmed_hr;
|
||||
} else if (trimmed_hr - ns.smoothed_hr).abs() > HR_DEAD_BAND {
|
||||
ns.smoothed_hr = ns.smoothed_hr * (1.0 - VITAL_EMA_ALPHA)
|
||||
+ trimmed_hr * VITAL_EMA_ALPHA;
|
||||
}
|
||||
}
|
||||
if trimmed_br > 0.0 {
|
||||
if ns.smoothed_br < 1.0 {
|
||||
ns.smoothed_br = trimmed_br;
|
||||
} else if (trimmed_br - ns.smoothed_br).abs() > BR_DEAD_BAND {
|
||||
ns.smoothed_br = ns.smoothed_br * (1.0 - VITAL_EMA_ALPHA)
|
||||
+ trimmed_br * VITAL_EMA_ALPHA;
|
||||
}
|
||||
}
|
||||
|
||||
ns.smoothed_hr_conf = ns.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08;
|
||||
ns.smoothed_br_conf = ns.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08;
|
||||
|
||||
VitalSigns {
|
||||
breathing_rate_bpm: if ns.smoothed_br > 1.0 { Some(ns.smoothed_br) } else { None },
|
||||
heart_rate_bpm: if ns.smoothed_hr > 1.0 { Some(ns.smoothed_hr) } else { None },
|
||||
breathing_confidence: ns.smoothed_br_conf,
|
||||
heartbeat_confidence: ns.smoothed_hr_conf,
|
||||
signal_quality: raw.signal_quality,
|
||||
}
|
||||
}
|
||||
|
||||
/// Trimmed mean: sort, drop top/bottom 25%, average the middle 50%.
|
||||
/// More robust than median (uses more data) and less noisy than raw mean.
|
||||
fn trimmed_mean(buf: &VecDeque<f64>) -> f64 {
|
||||
@@ -2827,6 +2971,23 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
// "no detection" because it only renders sensing_update msgs.
|
||||
s.source = "esp32".to_string();
|
||||
s.last_esp32_frame = Some(std::time::Instant::now());
|
||||
|
||||
// ── Per-node state for edge vitals (issue #249) ──────
|
||||
let node_id = vitals.node_id;
|
||||
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
|
||||
ns.last_frame_time = Some(std::time::Instant::now());
|
||||
ns.edge_vitals = Some(vitals.clone());
|
||||
ns.rssi_history.push_back(vitals.rssi as f64);
|
||||
if ns.rssi_history.len() > 60 { ns.rssi_history.pop_front(); }
|
||||
|
||||
// Store per-node person count from edge vitals.
|
||||
let node_est = if vitals.presence {
|
||||
(vitals.n_persons as usize).max(1)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
ns.prev_person_count = node_est;
|
||||
|
||||
s.tick += 1;
|
||||
let tick = s.tick;
|
||||
|
||||
@@ -2836,11 +2997,25 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
let motion_score = if vitals.motion { 0.8 }
|
||||
else if vitals.presence { 0.3 }
|
||||
else { 0.05 };
|
||||
let est_persons = if vitals.presence {
|
||||
(vitals.n_persons as usize).max(1)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
// Aggregate person count across all active nodes.
|
||||
let now = std::time::Instant::now();
|
||||
let total_persons: usize = s.node_states.values()
|
||||
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
|
||||
.map(|n| n.prev_person_count)
|
||||
.sum();
|
||||
|
||||
// Build nodes array with all active nodes.
|
||||
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
|
||||
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
|
||||
.map(|(&id, n)| NodeInfo {
|
||||
node_id: id,
|
||||
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
|
||||
position: [2.0, 0.0, 1.5],
|
||||
amplitude: vec![],
|
||||
subcarrier_count: 0,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let features = FeatureInfo {
|
||||
mean_rssi: vitals.rssi as f64,
|
||||
@@ -2866,13 +3041,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
|
||||
source: "esp32".to_string(),
|
||||
tick,
|
||||
nodes: vec![NodeInfo {
|
||||
node_id: vitals.node_id,
|
||||
rssi_dbm: vitals.rssi as f64,
|
||||
position: [2.0, 0.0, 1.5],
|
||||
amplitude: vec![],
|
||||
subcarrier_count: 0,
|
||||
}],
|
||||
nodes: active_nodes,
|
||||
features: features.clone(),
|
||||
classification,
|
||||
signal_field,
|
||||
@@ -2892,7 +3061,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
pose_keypoints: None,
|
||||
model_status: None,
|
||||
persons: None,
|
||||
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
|
||||
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
|
||||
};
|
||||
|
||||
let persons = derive_pose_from_sensing(&update);
|
||||
@@ -2935,24 +3104,90 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
s.source = "esp32".to_string();
|
||||
s.last_esp32_frame = Some(std::time::Instant::now());
|
||||
|
||||
// Append current amplitudes to history before extracting features so
|
||||
// that temporal analysis includes the most recent frame.
|
||||
// Also maintain global frame_history for backward compat
|
||||
// (simulation path, REST endpoints, etc.).
|
||||
s.frame_history.push_back(frame.amplitudes.clone());
|
||||
if s.frame_history.len() > FRAME_HISTORY_CAPACITY {
|
||||
s.frame_history.pop_front();
|
||||
}
|
||||
|
||||
let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come
|
||||
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
|
||||
extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz);
|
||||
smooth_and_classify(&mut s, &mut classification, raw_motion);
|
||||
adaptive_override(&s, &features, &mut classification);
|
||||
// ── Per-node processing (issue #249) ──────────────────
|
||||
// Process entirely within per-node state so different
|
||||
// ESP32 nodes never mix their smoothing/vitals buffers.
|
||||
// We scope the mutable borrow of node_states so we can
|
||||
// access other AppStateInner fields afterward.
|
||||
let node_id = frame.node_id;
|
||||
let adaptive_model_ref = s.adaptive_model.as_ref().map(|m| m as *const _);
|
||||
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
|
||||
ns.last_frame_time = Some(std::time::Instant::now());
|
||||
|
||||
ns.frame_history.push_back(frame.amplitudes.clone());
|
||||
if ns.frame_history.len() > FRAME_HISTORY_CAPACITY {
|
||||
ns.frame_history.pop_front();
|
||||
}
|
||||
|
||||
let sample_rate_hz = 1000.0 / 500.0_f64;
|
||||
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
|
||||
extract_features_from_frame(&frame, &ns.frame_history, sample_rate_hz);
|
||||
smooth_and_classify_node(ns, &mut classification, raw_motion);
|
||||
|
||||
// SAFETY: adaptive_model_ref points into s which we hold
|
||||
// via write lock; the model is not mutated here. We use a
|
||||
// raw pointer to break the borrow-checker deadlock between
|
||||
// node_states and adaptive_model (both inside s).
|
||||
if let Some(model_ptr) = adaptive_model_ref {
|
||||
let model: &adaptive_classifier::AdaptiveModel = unsafe { &*model_ptr };
|
||||
let amps = ns.frame_history.back()
|
||||
.map(|v| v.as_slice())
|
||||
.unwrap_or(&[]);
|
||||
let feat_arr = adaptive_classifier::features_from_runtime(
|
||||
&serde_json::json!({
|
||||
"variance": features.variance,
|
||||
"motion_band_power": features.motion_band_power,
|
||||
"breathing_band_power": features.breathing_band_power,
|
||||
"spectral_power": features.spectral_power,
|
||||
"dominant_freq_hz": features.dominant_freq_hz,
|
||||
"change_points": features.change_points,
|
||||
"mean_rssi": features.mean_rssi,
|
||||
}),
|
||||
amps,
|
||||
);
|
||||
let (label, conf) = model.classify(&feat_arr);
|
||||
classification.motion_level = label.to_string();
|
||||
classification.presence = label != "absent";
|
||||
classification.confidence = (conf * 0.7 + classification.confidence * 0.3).clamp(0.0, 1.0);
|
||||
}
|
||||
|
||||
ns.rssi_history.push_back(features.mean_rssi);
|
||||
if ns.rssi_history.len() > 60 {
|
||||
ns.rssi_history.pop_front();
|
||||
}
|
||||
|
||||
let raw_vitals = ns.vital_detector.process_frame(
|
||||
&frame.amplitudes,
|
||||
&frame.phases,
|
||||
);
|
||||
let vitals = smooth_vitals_node(ns, &raw_vitals);
|
||||
ns.latest_vitals = vitals.clone();
|
||||
|
||||
let raw_score = compute_person_score(&features);
|
||||
ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10;
|
||||
if classification.presence {
|
||||
let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count);
|
||||
ns.prev_person_count = count;
|
||||
} else {
|
||||
ns.prev_person_count = 0;
|
||||
}
|
||||
|
||||
// Done with per-node mutable borrow; now read aggregated
|
||||
// state from all nodes (the borrow of `ns` ends here).
|
||||
// (We re-borrow node_states immutably via `s` below.)
|
||||
|
||||
// Update RSSI history
|
||||
s.rssi_history.push_back(features.mean_rssi);
|
||||
if s.rssi_history.len() > 60 {
|
||||
s.rssi_history.pop_front();
|
||||
}
|
||||
s.latest_vitals = vitals.clone();
|
||||
|
||||
s.tick += 1;
|
||||
let tick = s.tick;
|
||||
@@ -2961,37 +3196,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
else if classification.motion_level == "present_still" { 0.3 }
|
||||
else { 0.05 };
|
||||
|
||||
let raw_vitals = s.vital_detector.process_frame(
|
||||
&frame.amplitudes,
|
||||
&frame.phases,
|
||||
);
|
||||
let vitals = smooth_vitals(&mut s, &raw_vitals);
|
||||
s.latest_vitals = vitals.clone();
|
||||
// Aggregate person count across all active nodes.
|
||||
let now = std::time::Instant::now();
|
||||
let total_persons: usize = s.node_states.values()
|
||||
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
|
||||
.map(|n| n.prev_person_count)
|
||||
.sum();
|
||||
|
||||
// Multi-person estimation with temporal smoothing (EMA α=0.10).
|
||||
let raw_score = compute_person_score(&features);
|
||||
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
|
||||
let est_persons = if classification.presence {
|
||||
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
|
||||
s.prev_person_count = count;
|
||||
count
|
||||
} else {
|
||||
s.prev_person_count = 0;
|
||||
0
|
||||
};
|
||||
// Build nodes array with all active nodes.
|
||||
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
|
||||
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
|
||||
.map(|(&id, n)| NodeInfo {
|
||||
node_id: id,
|
||||
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
|
||||
position: [2.0, 0.0, 1.5],
|
||||
amplitude: n.frame_history.back()
|
||||
.map(|a| a.iter().take(56).cloned().collect())
|
||||
.unwrap_or_default(),
|
||||
subcarrier_count: n.frame_history.back().map_or(0, |a| a.len()),
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut update = SensingUpdate {
|
||||
msg_type: "sensing_update".to_string(),
|
||||
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
|
||||
source: "esp32".to_string(),
|
||||
tick,
|
||||
nodes: vec![NodeInfo {
|
||||
node_id: frame.node_id,
|
||||
rssi_dbm: features.mean_rssi,
|
||||
position: [2.0, 0.0, 1.5],
|
||||
amplitude: frame.amplitudes.iter().take(56).cloned().collect(),
|
||||
subcarrier_count: frame.n_subcarriers as usize,
|
||||
}],
|
||||
nodes: active_nodes,
|
||||
features: features.clone(),
|
||||
classification,
|
||||
signal_field: generate_signal_field(
|
||||
@@ -3008,7 +3239,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||
pose_keypoints: None,
|
||||
model_status: None,
|
||||
persons: None,
|
||||
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
|
||||
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
|
||||
};
|
||||
|
||||
let persons = derive_pose_from_sensing(&update);
|
||||
@@ -3760,6 +3991,7 @@ async fn main() {
|
||||
m.trained_frames, m.training_accuracy * 100.0);
|
||||
m
|
||||
}),
|
||||
node_states: HashMap::new(),
|
||||
}));
|
||||
|
||||
// Start background tasks based on source
|
||||
|
||||
Reference in New Issue
Block a user