Compare commits

..

2 Commits

Author SHA1 Message Date
rUv 3fec67654a Merge pull request #906 from ruvnet/fix/893-csi-data-frame-capture
fix(firmware): capture DATA frames on display-less boards — #893/#866/#897 (yield=0pps root cause)
2026-06-02 04:23:44 -04:00
ruv 898c536eac fix(firmware): capture DATA frames on display-less boards — #893/#866/#897
The pre-built binaries set a MGMT-only promiscuous filter
(WIFI_PROMIS_FILTER_MASK_MGMT) as the #396 workaround — DATA-frame
interrupt load races the QSPI display's SPI traffic against the SPI-flash
cache and crashes Core 0 in wDev_ProcessFiq. But MGMT-only fires the CSI
callback only on sparse management frames, so on the common DISPLAY-LESS
boards (DevKitC-1, T7-S3, N8R8) CSI yield collapses to 0 pps under real
traffic (#521) — the node looks dead despite being on the network, which
is the root cause of most "can't reproduce / it's fake" reports (#804/#37).

A board with no AMOLED panel has no QSPI/SPI-flash contention, so it can
safely capture DATA frames. After the boot-time display probe runs:
  - display present  -> keep MGMT-only (preserve #396 crash protection)
  - no display       -> upgrade filter to MGMT|DATA (restore CSI yield)

Implementation (runtime-gated, no boot reorder):
  - display_task.c: s_display_active flag + display_is_active() accessor,
    set true only when the panel is detected and the display task starts.
  - csi_collector.c: csi_collector_enable_data_capture() re-sets the
    promiscuous filter to MGMT|DATA.
  - main.c: after display_task_start(), if !display_is_active() (or display
    support not compiled in), upgrade the filter.

Build-verified on BOTH targets: esp32c6 (headless path) and esp32s3
(display path, display_task.c compiled) — Project build complete, RC 0.
Needs on-hardware confirmation that yield recovers and no #396 crash.
2026-06-02 09:57:19 +02:00
9 changed files with 94 additions and 172 deletions
-1
View File
@@ -8,7 +8,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- **MQTT multi-node deployments now create one Home-Assistant device per node — closes #898.** After the #872 MQTT wiring landed, the JSON→`VitalsSnapshot` bridge hard-coded a single `node_id` (the MQTT client id) and the publisher used a single `OwnedDiscoveryBuilder`, so every physical node collapsed into one device (`identifiers:["wifi_densepose_wifi-densepose-1"]`), contradicting the "one device per node" docs. The bridge now emits one snapshot per node in the sensing update's `nodes[]` (each with its own `node_id` + RSSI, falling back to a single aggregate snapshot for wifi/simulate sources), and the publisher derives a per-node builder (`OwnedDiscoveryBuilder::for_node`) that publishes discovery + availability lazily on first sight of each `node_id` and routes state to per-node topics — yielding N distinct HA devices with per-node availability/LWT. Unit-tested (distinct nodes → distinct `wifi_densepose_<node>` identifiers); 71 MQTT tests pass.
- **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 23 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 03) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass.
- **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass.
@@ -637,6 +637,23 @@ static void hop_timer_cb(void *arg)
csi_hop_next_channel();
}
void csi_collector_enable_data_capture(void)
{
/* MGMT-only (RuView#396) starves the CSI callback on display-less boards
* (RuView#521/#893): beacons alone are sparse, yield collapses to 0 pps.
* Without a display there is no QSPI/SPI-flash cache contention with the
* DATA-frame interrupt load, so capture DATA frames too. */
wifi_promiscuous_filter_t filt = {
.filter_mask = WIFI_PROMIS_FILTER_MASK_MGMT | WIFI_PROMIS_FILTER_MASK_DATA,
};
esp_err_t err = esp_wifi_set_promiscuous_filter(&filt);
if (err == ESP_OK) {
ESP_LOGI(TAG, "CSI filter upgraded to MGMT+DATA (no display, RuView#893)");
} else {
ESP_LOGW(TAG, "Failed to enable DATA-frame CSI capture: %s", esp_err_to_name(err));
}
}
void csi_collector_start_hop_timer(void)
{
if (s_hop_count <= 1) {
@@ -90,6 +90,19 @@ void csi_hop_next_channel(void);
*/
void csi_collector_start_hop_timer(void);
/**
* Upgrade the promiscuous filter to capture DATA frames in addition to MGMT
* (RuView#893/#521).
*
* Called on display-less boards: the MGMT-only filter (the #396 display-crash
* workaround set in csi_collector_init) only fires the CSI callback on sparse
* management frames, so yield collapses to 0 pps under real traffic and the
* node looks dead. A board with no AMOLED panel has no QSPI/SPI-flash cache
* contention, so it can safely capture DATA frames — restoring abundant CSI.
* Display boards keep MGMT-only to avoid the #396 crash.
*/
void csi_collector_enable_data_capture(void);
/**
* Inject an NDP (Null Data Packet) frame for sensing.
*
@@ -9,6 +9,14 @@
#include "display_task.h"
#include "sdkconfig.h"
/* Set true once an AMOLED panel is detected and the display task starts.
* Defined outside the CONFIG_DISPLAY_ENABLE guard so display_is_active()
* exists on headless builds too (where it stays false → CSI captures DATA
* frames; see RuView#893). */
static bool s_display_active = false;
bool display_is_active(void) { return s_display_active; }
#if CONFIG_DISPLAY_ENABLE
#include <string.h>
@@ -162,6 +170,7 @@ esp_err_t display_task_start(void)
ESP_LOGI(TAG, "Display task started (Core %d, priority %d, %d fps)",
DISP_TASK_CORE, DISP_TASK_PRIORITY, DISP_FPS_LIMIT);
s_display_active = true;
return ESP_OK;
}
@@ -7,6 +7,7 @@
#define DISPLAY_TASK_H
#include "esp_err.h"
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
@@ -22,6 +23,15 @@ extern "C" {
*/
esp_err_t display_task_start(void);
/**
* @return true once an AMOLED panel has been detected and the display task
* is running; false on headless boards (no panel, or built without display
* support). Used to choose the CSI promiscuous filter (RuView#893): a board
* with no display has no QSPI/SPI-flash contention, so it can safely capture
* DATA frames for proper CSI yield instead of starving on MGMT-only.
*/
bool display_is_active(void);
#ifdef __cplusplus
}
#endif
+15
View File
@@ -410,6 +410,21 @@ void app_main(void)
}
#endif
/* RuView#893/#521: the MGMT-only promiscuous filter (set in
* csi_collector_init as the #396 display-crash workaround) starves the CSI
* callback on display-less boards — yield collapses to 0 pps and the node
* looks dead despite being on the network. Now that the display probe has
* run, boards with no AMOLED panel (no QSPI/SPI-flash cache contention)
* upgrade the filter to capture DATA frames too, restoring CSI yield. */
#ifdef CONFIG_DISPLAY_ENABLE
bool has_display = display_is_active(); /* runtime panel probe result */
#else
bool has_display = false; /* display support not compiled in */
#endif
if (!has_display) {
csi_collector_enable_data_capture();
}
ESP_LOGI(TAG, "CSI streaming active → %s:%d (edge_tier=%u, OTA=%s, WASM=%s, mmWave=%s, swarm=%s, adapt=%s)",
g_nvs_config.target_ip, g_nvs_config.target_port,
g_nvs_config.edge_tier,
@@ -6213,44 +6213,24 @@ async fn main() {
Some(_) => 1.0,
None => 0.0,
};
let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
let conf = cls["confidence"].as_f64().unwrap_or(0.0);
let presence_score = if presence { conf.max(0.0) } else { 0.0 };
let breathing = vit["breathing_rate_bpm"].as_f64();
let hr = vit["heart_rate_bpm"].as_f64();
// #898: emit one snapshot per physical node so each
// surfaces as its own Home-Assistant device (with
// its own RSSI + availability). Falls back to a
// single aggregate snapshot when there is no
// per-node data (e.g. wifi / simulate sources).
let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot {
node_id: nid,
timestamp_ms: ts,
let snap = mqtt::state::VitalsSnapshot {
node_id: node_id.clone(),
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
presence,
motion,
presence_score,
breathing_rate_bpm: breathing,
heartrate_bpm: hr,
presence_score: if presence {
cls["confidence"].as_f64().unwrap_or(1.0)
} else {
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
n_persons,
rssi_dbm: rssi,
vital_confidence: conf,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
..Default::default()
};
match v["nodes"].as_array() {
Some(arr) if !arr.is_empty() => {
for node in arr {
let n = node["node_id"].as_u64().unwrap_or(0);
let nid = format!("{node_id}-node{n}");
let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64()));
}
}
_ => {
let _ = vtx.send(mk(
node_id.clone(),
v["nodes"][0]["rssi_dbm"].as_f64(),
));
}
}
let _ = vtx.send(snap);
}
});
tracing::info!("MQTT publisher started -> {host}:{port}");
@@ -117,23 +117,6 @@ impl OwnedDiscoveryBuilder {
via_device: self.via_device.as_deref(),
}
}
/// Derive a per-node builder from this base (issue #898). Each physical
/// RuView node must surface as its own Home-Assistant device — the base
/// builder's `node_id` (the MQTT client id) is replaced with the actual
/// node id, giving a distinct `wifi_densepose_<node>` device identifier
/// and a per-node friendly name, instead of collapsing every node into a
/// single hard-coded device.
pub fn for_node(&self, node_id: &str) -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: self.discovery_prefix.clone(),
node_id: node_id.to_string(),
node_friendly_name: Some(format!("RuView node {node_id}")),
sw_version: self.sw_version.clone(),
model: self.model.clone(),
via_device: self.via_device.clone(),
}
}
}
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
@@ -146,19 +129,20 @@ async fn run(
let opts = build_mqtt_options(&cfg);
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
let builder_borrowed = builder_owned.as_borrowed();
let entities = DiscoveryBuilder::enabled_entities(
cfg.privacy_mode,
cfg.publish_pose,
&[], // no_semantic — wire from cli::Args in P3.5
);
// #898: one Home-Assistant device per node. Discovery + availability are
// published lazily the first time a snapshot for a given node_id arrives;
// each node's builder + availability are retained here for heartbeats and
// the offline LWT. (Previously a single hard-coded builder collapsed every
// node into one device.)
let mut nodes: std::collections::HashMap<String, (OwnedDiscoveryBuilder, NodeAvailability)> =
std::collections::HashMap::new();
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] initial discovery publish failed: {e}");
}
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] initial availability publish failed: {e}");
}
let mut rate_limiter = RateLimiter::new();
let mut last_heartbeat = Instant::now();
@@ -195,20 +179,14 @@ async fn run(
// Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => {
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
for (_, na) in nodes.values() {
if let Err(e) = publish_availability(&client, na, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
last_heartbeat = Instant::now();
}
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
for (nb, _) in nodes.values() {
if let Err(e) =
publish_all_discovery(&client, &nb.as_borrowed(), &entities).await
{
warn!("[mqtt] discovery refresh failed: {e}");
}
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] discovery refresh failed: {e}");
}
last_refresh = Instant::now();
}
@@ -219,39 +197,18 @@ async fn run(
match recv {
Ok(snap) => {
let elapsed = start_instant.elapsed();
// #898: on first sight of a node_id, publish that
// node's discovery + availability; then route its
// state to per-node topics.
if !nodes.contains_key(&snap.node_id) {
let nb = builder_owned.for_node(&snap.node_id);
let borrowed = nb.as_borrowed();
if let Err(e) =
publish_all_discovery(&client, &borrowed, &entities).await
{
warn!("[mqtt] node {} discovery failed: {e}", snap.node_id);
}
let na = NodeAvailability::for_builder(&borrowed, &entities);
if let Err(e) = publish_availability(&client, &na, "online").await {
warn!("[mqtt] node {} availability failed: {e}", snap.node_id);
}
nodes.insert(snap.node_id.clone(), (nb, na));
}
let borrowed = nodes[&snap.node_id].0.as_borrowed();
publish_snapshot(&client, &borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
}
Err(broadcast::error::RecvError::Closed) => {
info!("[mqtt] broadcast channel closed, draining");
// Publish offline for every known node before exit.
for (_, na) in nodes.values() {
let _ = publish_availability(&client, na, "offline").await;
}
// Publish offline before exit.
let _ = publish_availability(&client, &avail, "offline").await;
let _ = client.disconnect().await;
return;
}
}
}
}
@@ -339,52 +296,3 @@ async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), Cli
};
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
}
#[cfg(test)]
mod per_node_device_tests {
//! Issue #898 — each physical node must surface as its own Home-Assistant
//! device, not collapse into one hard-coded device.
use super::*;
fn base() -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: "homeassistant".into(),
node_id: "wifi-densepose-1".into(),
node_friendly_name: Some("RuView".into()),
sw_version: "0.0.0".into(),
model: "test".into(),
via_device: None,
}
}
fn device_identifiers(b: &OwnedDiscoveryBuilder) -> Vec<String> {
b.as_borrowed().build(EntityKind::Presence).device.identifiers
}
#[test]
fn for_node_overrides_node_id_and_friendly_name() {
let n = base().for_node("node-A");
assert_eq!(n.node_id, "node-A");
assert_eq!(n.node_friendly_name.as_deref(), Some("RuView node node-A"));
}
#[test]
fn distinct_nodes_yield_distinct_ha_device_identifiers() {
let b = base();
let a = device_identifiers(&b.for_node("node-A"));
let c = device_identifiers(&b.for_node("node-B"));
assert_eq!(a, vec!["wifi_densepose_node-A".to_string()]);
assert_eq!(c, vec!["wifi_densepose_node-B".to_string()]);
assert_ne!(a, c, "#898: two nodes must not collapse into one device");
}
#[test]
fn single_node_keeps_a_stable_identity() {
// Two snapshots from the same node map to the same device.
let b = base();
assert_eq!(
device_identifiers(&b.for_node("node-7")),
device_identifiers(&b.for_node("node-7"))
);
}
}
@@ -171,28 +171,12 @@ async fn discovery_topics_appear_on_broker() {
// Spawn the publisher.
let cfg = make_cfg(port, false, "discovery");
let builder = make_builder("inttest1");
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// #898: discovery is now published per-node the first time a snapshot for
// that node_id arrives (not eagerly at startup). Drive snapshots for
// "inttest1" throughout the window so its device's discovery lands — same
// pattern as state_messages_published_on_snapshot_broadcast.
let tx_bg = tx.clone();
let drive = tokio::spawn(async move {
for _ in 0..60 {
let _ = tx_bg.send(VitalsSnapshot {
node_id: "inttest1".into(),
..Default::default()
});
tokio::time::sleep(Duration::from_millis(200)).await;
}
});
// Drain the subscriber for up to 6 s — enough for initial discovery
// + first availability publication.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
drive.abort();
let _ = sub.disconnect().await;
// Assertions: at least the presence + heart_rate + fall discovery
@@ -237,23 +221,10 @@ async fn privacy_mode_suppresses_biometric_discovery() {
let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy");
let builder = make_builder("inttest2");
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// #898: per-node discovery is triggered by a snapshot for that node_id.
let tx_bg = tx.clone();
let drive = tokio::spawn(async move {
for _ in 0..60 {
let _ = tx_bg.send(VitalsSnapshot {
node_id: "inttest2".into(),
..Default::default()
});
tokio::time::sleep(Duration::from_millis(200)).await;
}
});
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
drive.abort();
let _ = sub.disconnect().await;
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();