Compare commits

...

7 Commits

Author SHA1 Message Date
ruv 8f927aaedb feat(server): per-node state pipeline for multi-node sensing (ADR-068, #249)
Replaces the single shared state pipeline with per-node HashMap<u8, NodeState>.
Each ESP32 node now gets independent:
- frame_history (temporal analysis)
- smoothed_person_score / prev_person_count
- smoothed_motion / baseline / debounce state
- vital sign detector + smoothing buffers
- RSSI history

Multi-node aggregation:
- Person count = sum of per-node counts for active nodes (seen <10s)
- SensingUpdate.nodes includes all active nodes
- estimated_persons reflects cross-node aggregate

Single-node deployments behave identically (HashMap has one entry).
Simulated data path unchanged for backward compatibility.

Closes #249
Refs #237, #276, #282

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-27 17:51:43 -04:00
ruv 635c152e61 docs(adr): ADR-068 per-node state pipeline for multi-node sensing (#249)
Documents the architectural change from single shared state to per-node
HashMap<u8, NodeState> in the sensing server. Includes scaling analysis
(256 nodes < 13 MB), QEMU validation plan, and aggregation strategy.

Also links README hero image to the explainer video.

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-27 17:45:23 -04:00
ruv c2e564a9f4 docs(readme): expand alpha notice with known limitations
List specific known issues (multi-node detection, training plateau,
no pre-trained weights, hardware compatibility) to set expectations
for new users.

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-27 17:40:39 -04:00
rUv 40f19622af fix(firmware,server): watchdog crash + no detection from edge vitals (#321, #323)
* fix(firmware,server): watchdog crash on busy LANs + no detection from edge vitals (#321, #323)

**Firmware (#321):** edge_dsp task now batch-limits frame processing to 4
frames before a 10ms yield. On corporate LANs with high CSI frame rates,
the previous 1-tick-per-frame yield wasn't enough to prevent IDLE1
starvation and task watchdog triggers.

**Sensing server (#323):** When ESP32 runs the edge DSP pipeline (Tier 2+),
it sends vitals packets (magic 0xC5110002) instead of raw CSI frames.
Previously, the server broadcast these as raw edge_vitals but never
generated a sensing_update, so the UI showed "connected" but "0 persons".
Now synthesizes a full sensing_update from vitals data including
classification, person count, and pose generation.

Closes #321
Closes #323

Co-Authored-By: claude-flow <ruv@ruv.net>

* fix(firmware): address review findings — idle busy-spin and observability

- Fix pdMS_TO_TICKS(5)==0 at 100Hz causing busy-spin in idle path (use
  vTaskDelay(1) instead)
- Post-batch yield now 2 ticks (20ms) for genuinely longer pause
- Add s_ring_drops counter to ring_push for diagnosing frame drops
- Expose drop count in periodic vitals log line

Co-Authored-By: claude-flow <ruv@ruv.net>

* fix(server): set breathing_band_power for skeleton animation from vitals

When presence is detected via edge vitals, set breathing_band_power to
0.5 so the UI's torso breathing animation works. Previously hardcoded
to 0.0 which made the skeleton appear static even when breathing rate
was being reported.

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-27 17:31:06 -04:00
rUv 022499b2f5 fix: add wifi_densepose package for correct module import (#314)
The README Quick Start tells users to `pip install wifi-densepose` and then
`from wifi_densepose import WiFiDensePose`, but no `wifi_densepose` Python
package existed — only `v1/src`. This adds a top-level `wifi_densepose/`
package with a WiFiDensePose facade class matching the documented API, and
updates pyproject.toml to include it in the distribution.

Closes #314
2026-03-27 17:31:03 -04:00
rUv e6068c5efe Enhance README with Cognitum.One reference
Updated project description to include Cognitum.One.
2026-03-25 21:21:58 -04:00
rUv 7a13877fa3 fix(sensing-server): detect ESP32 offline after 5s frame timeout (#300)
The source field was set to "esp32" on the first UDP frame but never
reverted when frames stopped arriving. This caused the UI to show
"Real hardware connected" indefinitely after powering off all nodes.

Changes:
- Add last_esp32_frame timestamp to AppStateInner
- Add effective_source() method with 5-second timeout
- Source becomes "esp32:offline" when no frames received within 5s
- Health endpoint shows "degraded" instead of "healthy" when offline
- All 6 status/health/info API endpoints use effective_source()

Fixes #297

Co-authored-by: Reuven <cohen@ruv-mac-mini.local>
2026-03-24 08:00:18 -04:00
6 changed files with 743 additions and 56 deletions
+11 -2
View File
@@ -1,11 +1,20 @@
# π RuView
<p align="center">
<a href="https://ruvnet.github.io/RuView/">
<a href="https://x.com/rUv/status/2037556932802761004">
<img src="assets/ruview-small-gemini.jpg" alt="RuView - WiFi DensePose" width="100%">
</a>
</p>
> **Alpha Software** — This project is under active development. APIs, firmware behavior, and documentation may change. Known limitations:
> - Multi-node person counting may show identical output regardless of the number of people (#249)
> - Training pipeline on MM-Fi dataset may plateau at low PCK (#318) — hyperparameter tuning in progress
> - No pre-trained model weights are provided; training from scratch is required
> - ESP32-C3 and original ESP32 are not supported (single-core, insufficient for CSI DSP)
> - Single ESP32 deployments have limited spatial resolution
>
> Contributions and bug reports welcome at [Issues](https://github.com/ruvnet/RuView/issues).
## **See through walls with WiFi + Ai** ##
**Perceive the world through signals.** No cameras. No wearables. No Internet. Just physics.
@@ -14,7 +23,7 @@
Instead of relying on cameras or cloud models, it observes whatever signals exist in a space such as WiFi, radio waves across the spectrum, motion patterns, vibration, sound, or other sensory inputs and builds an understanding of what is happening locally.
Built on top of [RuVector](https://github.com/ruvnet/ruvector/), the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
Built on top of [RuVector](https://github.com/ruvnet/ruvector/) Self Learning Vector Memory system and [Cognitum.One](https://Cognitum.One) , the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
RuView extends that concept into a practical edge system. By analyzing Channel State Information (CSI) disturbances caused by human movement, RuView reconstructs body position, breathing rate, heart rate, and presence in real time using physics-based signal processing and machine learning.
+182
View File
@@ -0,0 +1,182 @@
# ADR-068: Per-Node State Pipeline for Multi-Node Sensing
| Field | Value |
|------------|-------------------------------------|
| Status | Accepted |
| Date | 2026-03-27 |
| Authors | rUv, claude-flow |
| Drivers | #249, #237, #276, #282 |
| Supersedes | — |
## Context
The sensing server (`wifi-densepose-sensing-server`) was originally designed for
single-node operation. When multiple ESP32 nodes send CSI frames simultaneously,
all data is mixed into a single shared pipeline:
- **One** `frame_history` VecDeque for all nodes
- **One** `smoothed_person_score` / `smoothed_motion` / vital sign buffers
- **One** baseline and debounce state
This means the classification, person count, and vital signs reported to the UI
are an uncontrolled aggregate of all nodes' data. The result: the detection
window shows identical output regardless of how many nodes are deployed, where
people stand, or how many people are in the room (#249 — 24 comments, the most
reported issue).
### Root Cause Verified
Investigation of `AppStateInner` (main.rs lines 279-367) confirmed:
| Shared field | Impact |
|---------------------------|--------------------------------------------|
| `frame_history` | Temporal analysis mixes all nodes' CSI data |
| `smoothed_person_score` | Person count aggregates all nodes |
| `smoothed_motion` | Motion classification undifferentiated |
| `smoothed_hr` / `br` | Vital signs are global, not per-node |
| `baseline_motion` | Adaptive baseline learned from mixed data |
| `debounce_counter` | All nodes share debounce state |
## Decision
Introduce **per-node state tracking** via a `HashMap<u8, NodeState>` in
`AppStateInner`. Each ESP32 node (identified by its `node_id` byte) gets an
independent sensing pipeline with its own temporal history, smoothing buffers,
baseline, and classification state.
### Architecture
```
┌─────────────────────────────────────────┐
UDP frames │ AppStateInner │
───────────► │ │
node_id=1 ──► │ node_states: HashMap<u8, NodeState> │
node_id=2 ──► │ ├── 1: NodeState { frame_history, │
node_id=3 ──► │ │ smoothed_motion, vitals, ... }│
│ ├── 2: NodeState { ... } │
│ └── 3: NodeState { ... } │
│ │
│ ┌── Per-Node Pipeline ──┐ │
│ │ extract_features() │ │
│ │ smooth_and_classify() │ │
│ │ smooth_vitals() │ │
│ │ score_to_person_count()│ │
│ └────────────────────────┘ │
│ │
│ ┌── Multi-Node Fusion ──┐ │
│ │ Aggregate person count │ │
│ │ Per-node classification│ │
│ │ All-nodes WebSocket msg│ │
│ └────────────────────────┘ │
│ │
│ ──► WebSocket broadcast (sensing_update) │
└─────────────────────────────────────────┘
```
### NodeState Struct
```rust
struct NodeState {
frame_history: VecDeque<Vec<f64>>,
smoothed_person_score: f64,
prev_person_count: usize,
smoothed_motion: f64,
current_motion_level: String,
debounce_counter: u32,
debounce_candidate: String,
baseline_motion: f64,
baseline_frames: u64,
smoothed_hr: f64,
smoothed_br: f64,
smoothed_hr_conf: f64,
smoothed_br_conf: f64,
hr_buffer: VecDeque<f64>,
br_buffer: VecDeque<f64>,
rssi_history: VecDeque<f64>,
vital_detector: VitalSignDetector,
latest_vitals: VitalSigns,
last_frame_time: Option<std::time::Instant>,
edge_vitals: Option<Esp32VitalsPacket>,
}
```
### Multi-Node Aggregation
- **Person count**: Sum of per-node `prev_person_count` for active nodes
(seen within last 10 seconds).
- **Classification**: Per-node classification included in `SensingUpdate.nodes`.
- **Vital signs**: Per-node vital signs; UI can render per-node or aggregate.
- **Signal field**: Generated from the most-recently-updated node's features.
- **Stale nodes**: Nodes with no frame for >10 seconds are excluded from
aggregation and marked offline (consistent with PR #300).
### Backward Compatibility
- The simulated data path (`simulated_data_task`) continues using global state.
- Single-node deployments behave identically (HashMap has one entry).
- The WebSocket message format (`sensing_update`) remains the same but the
`nodes` array now contains all active nodes, and `estimated_persons` reflects
the cross-node aggregate.
- The edge vitals path (#323 fix) also uses per-node state.
## Scaling Characteristics
| Nodes | Per-Node Memory | Total Overhead | Notes |
|-------|----------------|----------------|-------|
| 1 | ~50 KB | ~50 KB | Identical to current |
| 3 | ~50 KB | ~150 KB | Typical home setup |
| 10 | ~50 KB | ~500 KB | Small office |
| 50 | ~50 KB | ~2.5 MB | Building floor |
| 100 | ~50 KB | ~5 MB | Large deployment |
| 256 | ~50 KB | ~12.8 MB | Max (u8 node_id) |
Memory is dominated by `frame_history` (100 frames x ~500 bytes each = ~50 KB
per node). This scales linearly and fits comfortably in server memory even at
256 nodes.
## QEMU Validation
The existing QEMU swarm infrastructure (ADR-062, `scripts/qemu_swarm.py`)
supports multi-node simulation with configurable topologies:
- `star`: Central coordinator + sensor nodes
- `mesh`: Fully connected peer network
- `line`: Sequential chain
- `ring`: Circular topology
Each QEMU instance runs with a unique `node_id` via NVS provisioning. The
swarm health validator (`scripts/swarm_health.py`) checks per-node UART output.
Validation plan:
1. QEMU swarm with 3-5 nodes in mesh topology
2. Verify server produces distinct per-node classifications
3. Verify aggregate person count reflects multi-node contributions
4. Verify stale-node eviction after timeout
## Consequences
### Positive
- Each node's CSI data is processed independently — no cross-contamination
- Person count scales with the number of deployed nodes
- Vital signs are per-node, enabling room-level health monitoring
- Foundation for spatial localization (per-node positions + triangulation)
- Scales to 256 nodes with <13 MB memory overhead
### Negative
- Slightly more memory per node (~50 KB each)
- `smooth_and_classify_node` function duplicates some logic from global version
- Per-node `VitalSignDetector` instances add CPU cost proportional to node count
### Risks
- Node ID collisions (mitigated by NVS persistence since v0.5.0)
- HashMap growth without cleanup (mitigated by stale-node eviction)
## References
- Issue #249: Detection window same regardless (24 comments)
- Issue #237: Same display for 0/1/2 people (12 comments)
- Issue #276: Only one can be detected (8 comments)
- Issue #282: Detection fail (5 comments)
- PR #295: Hysteresis smoothing (partial mitigation)
- PR #300: ESP32 offline detection after 5s
- ADR-062: QEMU Swarm Configurator
+27 -10
View File
@@ -41,12 +41,14 @@ static const char *TAG = "edge_proc";
* ====================================================================== */
static edge_ring_buf_t s_ring;
static uint32_t s_ring_drops; /* Frames dropped due to full ring buffer. */
static inline bool ring_push(const uint8_t *iq, uint16_t len,
int8_t rssi, uint8_t channel)
{
uint32_t next = (s_ring.head + 1) % EDGE_RING_SLOTS;
if (next == s_ring.tail) {
s_ring_drops++;
return false; /* Full — drop frame. */
}
@@ -788,12 +790,13 @@ static void process_frame(const edge_ring_slot_t *slot)
if ((s_frame_count % 200) == 0) {
ESP_LOGI(TAG, "Vitals: br=%.1f hr=%.1f motion=%.4f pres=%s "
"fall=%s persons=%u frames=%lu",
"fall=%s persons=%u frames=%lu drops=%lu",
s_breathing_bpm, s_heartrate_bpm, s_motion_energy,
s_presence_detected ? "YES" : "no",
s_fall_detected ? "YES" : "no",
(unsigned)s_latest_pkt.n_persons,
(unsigned long)s_frame_count);
(unsigned long)s_frame_count,
(unsigned long)s_ring_drops);
}
}
@@ -831,18 +834,32 @@ static void edge_task(void *arg)
edge_ring_slot_t slot;
/* Maximum frames to process before a longer yield. On busy LANs
* (corporate networks, many APs), the ring buffer fills continuously.
* Without a batch limit the task processes frames back-to-back with
* only 1-tick yields, which on high frame rates can still starve
* IDLE1 enough to trip the 5-second task watchdog. See #266, #321. */
const uint8_t BATCH_LIMIT = 4;
while (1) {
if (ring_pop(&slot)) {
uint8_t processed = 0;
while (processed < BATCH_LIMIT && ring_pop(&slot)) {
process_frame(&slot);
/* Yield after every frame to feed the Core 1 watchdog.
* process_frame() is CPU-intensive (biquad filters, Welford stats,
* BPM estimation, multi-person vitals) and can take several ms.
* Without this yield, edge_dsp at priority 5 starves IDLE1 at
* priority 0, triggering the task watchdog. See issue #266. */
processed++;
/* 1-tick yield between frames within a batch. */
vTaskDelay(1);
}
if (processed > 0) {
/* Post-batch yield: 2 ticks (~20 ms at 100 Hz) so IDLE1 can
* run and feed the Core 1 watchdog even under sustained load.
* This is intentionally longer than the 1-tick inter-frame yield. */
vTaskDelay(2);
} else {
/* No frames available — yield briefly. */
vTaskDelay(pdMS_TO_TICKS(1));
/* No frames available — sleep one full tick.
* NOTE: pdMS_TO_TICKS(5) == 0 at 100 Hz, which would busy-spin. */
vTaskDelay(1);
}
}
}
+1 -1
View File
@@ -185,7 +185,7 @@ package-dir = {"" = "."}
[tool.setuptools.packages.find]
where = ["."]
include = ["src*"]
include = ["wifi_densepose*", "src*"]
exclude = ["tests*", "docs*", "scripts*"]
[tool.setuptools.package-data]
@@ -16,7 +16,7 @@ mod vital_signs;
// Training pipeline modules (exposed via lib.rs)
use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
@@ -275,6 +275,59 @@ struct BoundingBox {
height: f64,
}
/// Per-node sensing state for multi-node deployments (issue #249).
/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
/// sign detector so that data from different nodes is never mixed.
struct NodeState {
frame_history: VecDeque<Vec<f64>>,
smoothed_person_score: f64,
prev_person_count: usize,
smoothed_motion: f64,
current_motion_level: String,
debounce_counter: u32,
debounce_candidate: String,
baseline_motion: f64,
baseline_frames: u64,
smoothed_hr: f64,
smoothed_br: f64,
smoothed_hr_conf: f64,
smoothed_br_conf: f64,
hr_buffer: VecDeque<f64>,
br_buffer: VecDeque<f64>,
rssi_history: VecDeque<f64>,
vital_detector: VitalSignDetector,
latest_vitals: VitalSigns,
last_frame_time: Option<std::time::Instant>,
edge_vitals: Option<Esp32VitalsPacket>,
}
impl NodeState {
fn new() -> Self {
Self {
frame_history: VecDeque::new(),
smoothed_person_score: 0.0,
prev_person_count: 0,
smoothed_motion: 0.0,
current_motion_level: "absent".to_string(),
debounce_counter: 0,
debounce_candidate: "absent".to_string(),
baseline_motion: 0.0,
baseline_frames: 0,
smoothed_hr: 0.0,
smoothed_br: 0.0,
smoothed_hr_conf: 0.0,
smoothed_br_conf: 0.0,
hr_buffer: VecDeque::with_capacity(8),
br_buffer: VecDeque::with_capacity(8),
rssi_history: VecDeque::new(),
vital_detector: VitalSignDetector::new(10.0),
latest_vitals: VitalSigns::default(),
last_frame_time: None,
edge_vitals: None,
}
}
}
/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
@@ -285,6 +338,8 @@ struct AppStateInner {
frame_history: VecDeque<Vec<f64>>,
tick: u64,
source: String,
/// Instant of the last ESP32 UDP frame received (for offline detection).
last_esp32_frame: Option<std::time::Instant>,
tx: broadcast::Sender<String>,
total_detections: u64,
start_time: std::time::Instant,
@@ -362,6 +417,29 @@ struct AppStateInner {
// ── Adaptive classifier (environment-tuned) ──────────────────────────
/// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime).
adaptive_model: Option<adaptive_classifier::AdaptiveModel>,
// ── Per-node state (issue #249) ─────────────────────────────────────
/// Per-node sensing state for multi-node deployments.
/// Keyed by `node_id` from the ESP32 frame header.
node_states: HashMap<u8, NodeState>,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
const ESP32_OFFLINE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
impl AppStateInner {
/// Return the effective data source, accounting for ESP32 frame timeout.
/// If the source is "esp32" but no frame has arrived in 5 seconds, returns
/// "esp32:offline" so the UI can distinguish active vs stale connections.
fn effective_source(&self) -> String {
if self.source == "esp32" {
if let Some(last) = self.last_esp32_frame {
if last.elapsed() > ESP32_OFFLINE_TIMEOUT {
return "esp32:offline".to_string();
}
}
}
self.source.clone()
}
}
/// Number of frames retained in `frame_history` for temporal analysis.
@@ -943,6 +1021,44 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo,
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
}
/// Per-node variant of `smooth_and_classify` that operates on a `NodeState`
/// instead of `AppStateInner` (issue #249).
fn smooth_and_classify_node(ns: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) {
ns.baseline_frames += 1;
if ns.baseline_frames < BASELINE_WARMUP {
ns.baseline_motion = ns.baseline_motion * 0.9 + raw_motion * 0.1;
} else if raw_motion < ns.smoothed_motion + 0.05 {
ns.baseline_motion = ns.baseline_motion * (1.0 - BASELINE_EMA_ALPHA)
+ raw_motion * BASELINE_EMA_ALPHA;
}
let adjusted = (raw_motion - ns.baseline_motion * 0.7).max(0.0);
ns.smoothed_motion = ns.smoothed_motion * (1.0 - MOTION_EMA_ALPHA)
+ adjusted * MOTION_EMA_ALPHA;
let sm = ns.smoothed_motion;
let candidate = raw_classify(sm);
if candidate == ns.current_motion_level {
ns.debounce_counter = 0;
ns.debounce_candidate = candidate;
} else if candidate == ns.debounce_candidate {
ns.debounce_counter += 1;
if ns.debounce_counter >= DEBOUNCE_FRAMES {
ns.current_motion_level = candidate;
ns.debounce_counter = 0;
}
} else {
ns.debounce_candidate = candidate;
ns.debounce_counter = 1;
}
raw.motion_level = ns.current_motion_level.clone();
raw.presence = sm > 0.03;
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
}
/// If an adaptive model is loaded, override the classification with the
/// model's prediction. Uses the full 15-feature vector for higher accuracy.
fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) {
@@ -1043,6 +1159,55 @@ fn smooth_vitals(state: &mut AppStateInner, raw: &VitalSigns) -> VitalSigns {
}
}
/// Per-node variant of `smooth_vitals` that operates on a `NodeState` (issue #249).
fn smooth_vitals_node(ns: &mut NodeState, raw: &VitalSigns) -> VitalSigns {
let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0);
let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0);
let hr_ok = ns.smoothed_hr < 1.0 || (raw_hr - ns.smoothed_hr).abs() < HR_MAX_JUMP;
let br_ok = ns.smoothed_br < 1.0 || (raw_br - ns.smoothed_br).abs() < BR_MAX_JUMP;
if hr_ok && raw_hr > 0.0 {
ns.hr_buffer.push_back(raw_hr);
if ns.hr_buffer.len() > VITAL_MEDIAN_WINDOW { ns.hr_buffer.pop_front(); }
}
if br_ok && raw_br > 0.0 {
ns.br_buffer.push_back(raw_br);
if ns.br_buffer.len() > VITAL_MEDIAN_WINDOW { ns.br_buffer.pop_front(); }
}
let trimmed_hr = trimmed_mean(&ns.hr_buffer);
let trimmed_br = trimmed_mean(&ns.br_buffer);
if trimmed_hr > 0.0 {
if ns.smoothed_hr < 1.0 {
ns.smoothed_hr = trimmed_hr;
} else if (trimmed_hr - ns.smoothed_hr).abs() > HR_DEAD_BAND {
ns.smoothed_hr = ns.smoothed_hr * (1.0 - VITAL_EMA_ALPHA)
+ trimmed_hr * VITAL_EMA_ALPHA;
}
}
if trimmed_br > 0.0 {
if ns.smoothed_br < 1.0 {
ns.smoothed_br = trimmed_br;
} else if (trimmed_br - ns.smoothed_br).abs() > BR_DEAD_BAND {
ns.smoothed_br = ns.smoothed_br * (1.0 - VITAL_EMA_ALPHA)
+ trimmed_br * VITAL_EMA_ALPHA;
}
}
ns.smoothed_hr_conf = ns.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08;
ns.smoothed_br_conf = ns.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08;
VitalSigns {
breathing_rate_bpm: if ns.smoothed_br > 1.0 { Some(ns.smoothed_br) } else { None },
heart_rate_bpm: if ns.smoothed_hr > 1.0 { Some(ns.smoothed_hr) } else { None },
breathing_confidence: ns.smoothed_br_conf,
heartbeat_confidence: ns.smoothed_hr_conf,
signal_quality: raw.signal_quality,
}
}
/// Trimmed mean: sort, drop top/bottom 25%, average the middle 50%.
/// More robust than median (uses more data) and less noisy than raw mean.
fn trimmed_mean(buf: &VecDeque<f64>) -> f64 {
@@ -1669,7 +1834,7 @@ async fn health(State(state): State<SharedState>) -> Json<serde_json::Value> {
let s = state.read().await;
Json(serde_json::json!({
"status": "ok",
"source": s.source,
"source": s.effective_source(),
"tick": s.tick,
"clients": s.tx.receiver_count(),
}))
@@ -1977,7 +2142,7 @@ async fn health_ready(State(state): State<SharedState>) -> Json<serde_json::Valu
let s = state.read().await;
Json(serde_json::json!({
"status": "ready",
"source": s.source,
"source": s.effective_source(),
}))
}
@@ -1988,7 +2153,10 @@ async fn health_system(State(state): State<SharedState>) -> Json<serde_json::Val
"status": "healthy",
"components": {
"api": { "status": "healthy", "message": "Rust Axum server" },
"hardware": { "status": "healthy", "message": format!("Source: {}", s.source) },
"hardware": {
"status": if s.effective_source().ends_with(":offline") { "degraded" } else { "healthy" },
"message": format!("Source: {}", s.effective_source())
},
"pose": { "status": "healthy", "message": "WiFi-derived pose estimation" },
"stream": { "status": if s.tx.receiver_count() > 0 { "healthy" } else { "idle" },
"message": format!("{} client(s)", s.tx.receiver_count()) },
@@ -2028,7 +2196,7 @@ async fn api_info(State(state): State<SharedState>) -> Json<serde_json::Value> {
"version": env!("CARGO_PKG_VERSION"),
"environment": "production",
"backend": "rust",
"source": s.source,
"source": s.effective_source(),
"features": {
"wifi_sensing": true,
"pose_estimation": true,
@@ -2049,7 +2217,7 @@ async fn pose_current(State(state): State<SharedState>) -> Json<serde_json::Valu
"timestamp": chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
"persons": persons,
"total_persons": persons.len(),
"source": s.source,
"source": s.effective_source(),
}))
}
@@ -2059,7 +2227,7 @@ async fn pose_stats(State(state): State<SharedState>) -> Json<serde_json::Value>
"total_detections": s.total_detections,
"average_confidence": 0.87,
"frames_processed": s.tick,
"source": s.source,
"source": s.effective_source(),
}))
}
@@ -2083,7 +2251,7 @@ async fn stream_status(State(state): State<SharedState>) -> Json<serde_json::Val
"active": true,
"clients": s.tx.receiver_count(),
"fps": if s.tick > 1 { 10u64 } else { 0u64 },
"source": s.source,
"source": s.effective_source(),
}))
}
@@ -2619,7 +2787,7 @@ async fn vital_signs_endpoint(State(state): State<SharedState>) -> Json<serde_js
"heartbeat_samples": hb_len,
"heartbeat_capacity": hb_cap,
},
"source": s.source,
"source": s.effective_source(),
"tick": s.tick,
}))
}
@@ -2796,6 +2964,115 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
})) {
let _ = s.tx.send(json);
}
// Issue #323: Also emit a sensing_update so the UI renders
// detections for ESP32 nodes running the edge DSP pipeline
// (Tier 2+). Without this, vitals arrive but the UI shows
// "no detection" because it only renders sensing_update msgs.
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
// ── Per-node state for edge vitals (issue #249) ──────
let node_id = vitals.node_id;
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.last_frame_time = Some(std::time::Instant::now());
ns.edge_vitals = Some(vitals.clone());
ns.rssi_history.push_back(vitals.rssi as f64);
if ns.rssi_history.len() > 60 { ns.rssi_history.pop_front(); }
// Store per-node person count from edge vitals.
let node_est = if vitals.presence {
(vitals.n_persons as usize).max(1)
} else {
0
};
ns.prev_person_count = node_est;
s.tick += 1;
let tick = s.tick;
let motion_level = if vitals.motion { "present_moving" }
else if vitals.presence { "present_still" }
else { "absent" };
let motion_score = if vitals.motion { 0.8 }
else if vitals.presence { 0.3 }
else { 0.05 };
// Aggregate person count across all active nodes.
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.sum();
// Build nodes array with all active nodes.
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|(&id, n)| NodeInfo {
node_id: id,
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
position: [2.0, 0.0, 1.5],
amplitude: vec![],
subcarrier_count: 0,
})
.collect();
let features = FeatureInfo {
mean_rssi: vitals.rssi as f64,
variance: vitals.motion_energy as f64,
motion_band_power: vitals.motion_energy as f64,
breathing_band_power: if vitals.presence { 0.5 } else { 0.0 },
dominant_freq_hz: vitals.breathing_rate_bpm / 60.0,
change_points: 0,
spectral_power: vitals.motion_energy as f64,
};
let classification = ClassificationInfo {
motion_level: motion_level.to_string(),
presence: vitals.presence,
confidence: vitals.presence_score as f64,
};
let signal_field = generate_signal_field(
vitals.rssi as f64, motion_score, vitals.breathing_rate_bpm / 60.0,
(vitals.presence_score as f64).min(1.0), &[],
);
let mut update = SensingUpdate {
msg_type: "sensing_update".to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
nodes: active_nodes,
features: features.clone(),
classification,
signal_field,
vital_signs: Some(VitalSigns {
breathing_rate_bpm: if vitals.breathing_rate_bpm > 0.0 { Some(vitals.breathing_rate_bpm) } else { None },
heart_rate_bpm: if vitals.heartrate_bpm > 0.0 { Some(vitals.heartrate_bpm) } else { None },
breathing_confidence: if vitals.presence { 0.7 } else { 0.0 },
heartbeat_confidence: if vitals.presence { 0.7 } else { 0.0 },
signal_quality: vitals.presence_score as f64,
}),
enhanced_motion: None,
enhanced_breathing: None,
posture: None,
signal_quality_score: None,
quality_verdict: None,
bssid_count: None,
pose_keypoints: None,
model_status: None,
persons: None,
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
if !persons.is_empty() {
update.persons = Some(persons);
}
if let Ok(json) = serde_json::to_string(&update) {
let _ = s.tx.send(json);
}
s.latest_update = Some(update);
s.edge_vitals = Some(vitals);
continue;
}
@@ -2825,25 +3102,92 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let mut s = state.write().await;
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
// Append current amplitudes to history before extracting features so
// that temporal analysis includes the most recent frame.
// Also maintain global frame_history for backward compat
// (simulation path, REST endpoints, etc.).
s.frame_history.push_back(frame.amplitudes.clone());
if s.frame_history.len() > FRAME_HISTORY_CAPACITY {
s.frame_history.pop_front();
}
let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz);
smooth_and_classify(&mut s, &mut classification, raw_motion);
adaptive_override(&s, &features, &mut classification);
// ── Per-node processing (issue #249) ──────────────────
// Process entirely within per-node state so different
// ESP32 nodes never mix their smoothing/vitals buffers.
// We scope the mutable borrow of node_states so we can
// access other AppStateInner fields afterward.
let node_id = frame.node_id;
let adaptive_model_ref = s.adaptive_model.as_ref().map(|m| m as *const _);
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.last_frame_time = Some(std::time::Instant::now());
ns.frame_history.push_back(frame.amplitudes.clone());
if ns.frame_history.len() > FRAME_HISTORY_CAPACITY {
ns.frame_history.pop_front();
}
let sample_rate_hz = 1000.0 / 500.0_f64;
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
extract_features_from_frame(&frame, &ns.frame_history, sample_rate_hz);
smooth_and_classify_node(ns, &mut classification, raw_motion);
// SAFETY: adaptive_model_ref points into s which we hold
// via write lock; the model is not mutated here. We use a
// raw pointer to break the borrow-checker deadlock between
// node_states and adaptive_model (both inside s).
if let Some(model_ptr) = adaptive_model_ref {
let model: &adaptive_classifier::AdaptiveModel = unsafe { &*model_ptr };
let amps = ns.frame_history.back()
.map(|v| v.as_slice())
.unwrap_or(&[]);
let feat_arr = adaptive_classifier::features_from_runtime(
&serde_json::json!({
"variance": features.variance,
"motion_band_power": features.motion_band_power,
"breathing_band_power": features.breathing_band_power,
"spectral_power": features.spectral_power,
"dominant_freq_hz": features.dominant_freq_hz,
"change_points": features.change_points,
"mean_rssi": features.mean_rssi,
}),
amps,
);
let (label, conf) = model.classify(&feat_arr);
classification.motion_level = label.to_string();
classification.presence = label != "absent";
classification.confidence = (conf * 0.7 + classification.confidence * 0.3).clamp(0.0, 1.0);
}
ns.rssi_history.push_back(features.mean_rssi);
if ns.rssi_history.len() > 60 {
ns.rssi_history.pop_front();
}
let raw_vitals = ns.vital_detector.process_frame(
&frame.amplitudes,
&frame.phases,
);
let vitals = smooth_vitals_node(ns, &raw_vitals);
ns.latest_vitals = vitals.clone();
let raw_score = compute_person_score(&features);
ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10;
if classification.presence {
let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count);
ns.prev_person_count = count;
} else {
ns.prev_person_count = 0;
}
// Done with per-node mutable borrow; now read aggregated
// state from all nodes (the borrow of `ns` ends here).
// (We re-borrow node_states immutably via `s` below.)
// Update RSSI history
s.rssi_history.push_back(features.mean_rssi);
if s.rssi_history.len() > 60 {
s.rssi_history.pop_front();
}
s.latest_vitals = vitals.clone();
s.tick += 1;
let tick = s.tick;
@@ -2852,37 +3196,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
let raw_vitals = s.vital_detector.process_frame(
&frame.amplitudes,
&frame.phases,
);
let vitals = smooth_vitals(&mut s, &raw_vitals);
s.latest_vitals = vitals.clone();
// Aggregate person count across all active nodes.
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.sum();
// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
s.prev_person_count = count;
count
} else {
s.prev_person_count = 0;
0
};
// Build nodes array with all active nodes.
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|(&id, n)| NodeInfo {
node_id: id,
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
position: [2.0, 0.0, 1.5],
amplitude: n.frame_history.back()
.map(|a| a.iter().take(56).cloned().collect())
.unwrap_or_default(),
subcarrier_count: n.frame_history.back().map_or(0, |a| a.len()),
})
.collect();
let mut update = SensingUpdate {
msg_type: "sensing_update".to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
nodes: vec![NodeInfo {
node_id: frame.node_id,
rssi_dbm: features.mean_rssi,
position: [2.0, 0.0, 1.5],
amplitude: frame.amplitudes.iter().take(56).cloned().collect(),
subcarrier_count: frame.n_subcarriers as usize,
}],
nodes: active_nodes,
features: features.clone(),
classification,
signal_field: generate_signal_field(
@@ -2899,7 +3239,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
pose_keypoints: None,
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
@@ -3607,6 +3947,7 @@ async fn main() {
frame_history: VecDeque::new(),
tick: 0,
source: source.into(),
last_esp32_frame: None,
tx,
total_detections: 0,
start_time: std::time::Instant::now(),
@@ -3650,6 +3991,7 @@ async fn main() {
m.trained_frames, m.training_accuracy * 100.0);
m
}),
node_states: HashMap::new(),
}));
// Start background tasks based on source
@@ -3781,7 +4123,7 @@ async fn main() {
"WiFi DensePose sensing model state",
);
builder.add_metadata(&serde_json::json!({
"source": s.source,
"source": s.effective_source(),
"total_ticks": s.tick,
"total_detections": s.total_detections,
"uptime_secs": s.start_time.elapsed().as_secs(),
+137
View File
@@ -0,0 +1,137 @@
"""
WiFi-DensePose — WiFi-based human pose estimation using CSI data.
Usage:
from wifi_densepose import WiFiDensePose
system = WiFiDensePose()
system.start()
poses = system.get_latest_poses()
system.stop()
"""
__version__ = "1.2.0"
import sys
import os
import logging
logger = logging.getLogger(__name__)
# Allow importing the v1 src package when installed from the repo
_v1_src = os.path.join(os.path.dirname(os.path.dirname(__file__)), "v1")
if os.path.isdir(_v1_src) and _v1_src not in sys.path:
sys.path.insert(0, _v1_src)
class WiFiDensePose:
"""High-level facade for the WiFi-DensePose sensing system.
This is the primary entry point documented in the README Quick Start.
It wraps the underlying ServiceOrchestrator and exposes a simple
start / get_latest_poses / stop interface.
"""
def __init__(self, host: str = "0.0.0.0", port: int = 3000, **kwargs):
self.host = host
self.port = port
self._config = kwargs
self._orchestrator = None
self._server_task = None
self._poses = []
self._running = False
# ------------------------------------------------------------------
# Public API (matches README Quick Start)
# ------------------------------------------------------------------
def start(self):
"""Start the sensing system (blocking until ready)."""
import asyncio
loop = _get_or_create_event_loop()
loop.run_until_complete(self._async_start())
async def _async_start(self):
try:
from src.config.settings import get_settings
from src.services.orchestrator import ServiceOrchestrator
settings = get_settings()
self._orchestrator = ServiceOrchestrator(settings)
await self._orchestrator.initialize()
await self._orchestrator.start()
self._running = True
logger.info("WiFiDensePose system started on %s:%s", self.host, self.port)
except ImportError:
raise ImportError(
"Core dependencies not found. Make sure you installed "
"from the repository root:\n"
" cd wifi-densepose && pip install -e .\n"
"Or install the v1 package:\n"
" cd wifi-densepose/v1 && pip install -e ."
)
def stop(self):
"""Stop the sensing system."""
import asyncio
if self._orchestrator is not None:
loop = _get_or_create_event_loop()
loop.run_until_complete(self._orchestrator.shutdown())
self._running = False
logger.info("WiFiDensePose system stopped")
def get_latest_poses(self):
"""Return the most recent list of detected pose dicts."""
if self._orchestrator is None:
return []
try:
import asyncio
loop = _get_or_create_event_loop()
return loop.run_until_complete(self._fetch_poses())
except Exception:
return []
async def _fetch_poses(self):
try:
pose_svc = self._orchestrator.pose_service
if pose_svc and hasattr(pose_svc, "get_latest"):
return await pose_svc.get_latest()
except Exception:
pass
return []
# ------------------------------------------------------------------
# Context-manager support
# ------------------------------------------------------------------
def __enter__(self):
self.start()
return self
def __exit__(self, *exc):
self.stop()
# ------------------------------------------------------------------
# Convenience re-exports
# ------------------------------------------------------------------
@staticmethod
def version():
return __version__
def _get_or_create_event_loop():
import asyncio
try:
return asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
__all__ = ["WiFiDensePose", "__version__"]