Compare commits

...

9 Commits

Author SHA1 Message Date
ruv 786e834dae release(firmware): refresh release_bins with the #893 CSI fix → v0.6.7
The pre-built binaries in release_bins/ were v0.6.6 (May 21) and shipped
the MGMT-only promiscuous filter, so display-less boards flashed from them
got yield=0pps (#893/#866/#897 — the root cause of the "can't reproduce /
it's fake" reports). Rebuilt every flashable variant from main (which has
the #893 display-gated DATA-frame fix) and refreshed the binaries:

- top-level ESP32-S3 8MB (sdkconfig.defaults) — esp32-csi-node.bin +
  bootloader (partition-table/ota_data unchanged — code-only fix)
- esp32-csi-node-4mb.bin (ESP32-S3 4MB, sdkconfig.defaults.4mb)
- c6-adr110/ (ESP32-C6, sdkconfig.defaults.esp32c6) — the exact firmware
  hardware-verified on COM6 (CSI yield 0→27 pps, presence/motion alive,
  no #396 crash)
- s3-adr110/ (same production S3 8MB config)

Left untouched: s3-fair-adr110/ (a non-production size-comparison build,
features stripped — not a board anyone flashes for sensing).

version.txt → 0.6.7; SHA256SUMS regenerated for the changed variant dirs.
Display boards keep MGMT-only (preserves the #396 crash protection);
display-less boards now capture DATA frames and stream CSI.

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-06-02 11:18:03 +02:00
rUv 8703ade9b6 Merge pull request #907 from ruvnet/fix/894-occupancy-cap
fix(occupancy): bound eigenvalue person-count to single-link max — #894
2026-06-02 04:53:18 -04:00
ruv 4c87f04919 Merge remote-tracking branch 'origin/main' into fix/894-occupancy-cap
# Conflicts:
#	CHANGELOG.md
2026-06-02 10:52:53 +02:00
rUv 9df908d898 Merge pull request #904 from ruvnet/fix/898-mqtt-per-node-devices
fix(mqtt): one Home-Assistant device per node — closes #898
2026-06-02 04:44:09 -04:00
ruv f34b94aa46 fix(occupancy): bound eigenvalue person-count to single-link max — #894
field_bridge::occupancy_or_fallback returned FieldModel::estimate_occupancy
unbounded (internal ceiling 10), while the perturbation fallback below it
and score_to_person_count both cap at 3 ("1-3 for single ESP32"). On noisy
or under-calibrated CSI the eigenvalue count inflated → "10 persons when 1
present" (#894, seen when --model fails to load → heuristic mode). Bound the
eigenvalue path to a shared MAX_SINGLE_LINK_OCCUPANCY const (3) so every
single-link estimator agrees. Genuine higher counts come from the
multistatic fusion path. Build clean, field_bridge tests pass.
2026-06-02 10:40:24 +02:00
ruv 27edf153dc test(mqtt): drive per-node snapshots in discovery integration tests — #898
After the per-node discovery change, discovery configs are published the
first time a snapshot for a node_id arrives (not eagerly at startup). The
two discovery integration tests (discovery_topics_appear_on_broker,
privacy_mode_suppresses_biometric_discovery) spawned the publisher with an
empty broadcast channel and never sent a snapshot, so they collected []
and failed ("missing presence discovery topic in []").

Drive snapshots for the test node_id throughout the capture window (same
pattern as state_messages_published_on_snapshot_broadcast) so the per-node
device's discovery lands. Verified against a local mosquitto: 3 passed.
2026-06-02 10:29:17 +02:00
rUv 3fec67654a Merge pull request #906 from ruvnet/fix/893-csi-data-frame-capture
fix(firmware): capture DATA frames on display-less boards — #893/#866/#897 (yield=0pps root cause)
2026-06-02 04:23:44 -04:00
ruv 898c536eac fix(firmware): capture DATA frames on display-less boards — #893/#866/#897
The pre-built binaries set a MGMT-only promiscuous filter
(WIFI_PROMIS_FILTER_MASK_MGMT) as the #396 workaround — DATA-frame
interrupt load races the QSPI display's SPI traffic against the SPI-flash
cache and crashes Core 0 in wDev_ProcessFiq. But MGMT-only fires the CSI
callback only on sparse management frames, so on the common DISPLAY-LESS
boards (DevKitC-1, T7-S3, N8R8) CSI yield collapses to 0 pps under real
traffic (#521) — the node looks dead despite being on the network, which
is the root cause of most "can't reproduce / it's fake" reports (#804/#37).

A board with no AMOLED panel has no QSPI/SPI-flash contention, so it can
safely capture DATA frames. After the boot-time display probe runs:
  - display present  -> keep MGMT-only (preserve #396 crash protection)
  - no display       -> upgrade filter to MGMT|DATA (restore CSI yield)

Implementation (runtime-gated, no boot reorder):
  - display_task.c: s_display_active flag + display_is_active() accessor,
    set true only when the panel is detected and the display task starts.
  - csi_collector.c: csi_collector_enable_data_capture() re-sets the
    promiscuous filter to MGMT|DATA.
  - main.c: after display_task_start(), if !display_is_active() (or display
    support not compiled in), upgrade the filter.

Build-verified on BOTH targets: esp32c6 (headless path) and esp32s3
(display path, display_task.c compiled) — Project build complete, RC 0.
Needs on-hardware confirmation that yield recovers and no #396 crash.
2026-06-02 09:57:19 +02:00
ruv 9ddcf0c9fc fix(mqtt): one HA device per node — closes #898
After the #872 MQTT wiring, the JSON->VitalsSnapshot bridge hard-coded a
single node_id (the MQTT client id) and the publisher used one
OwnedDiscoveryBuilder, so every physical node collapsed into a single
Home-Assistant device (identifiers:["wifi_densepose_wifi-densepose-1"]),
contradicting the one-device-per-node docs.

- Bridge (main.rs): emit one VitalsSnapshot per node in the sensing
  update's nodes[] (each carries its own node_id + RSSI; shared aggregate
  presence/vitals), falling back to a single aggregate snapshot when
  there is no per-node data (wifi/simulate sources).
- Publisher (publisher.rs): add OwnedDiscoveryBuilder::for_node(), and
  publish discovery + availability lazily on first sight of each node_id,
  routing state to per-node topics. Heartbeat/refresh/offline-LWT iterate
  all known nodes. Result: N distinct HA devices, one per node.

3 new unit tests (distinct nodes -> distinct wifi_densepose_<node>
identifiers); full MQTT suite 71 passed, example builds.
2026-06-02 09:43:28 +02:00
21 changed files with 262 additions and 39 deletions
+2
View File
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- **Person count no longer leaks up to 10 in heuristic mode — addresses #894.** `field_bridge::occupancy_or_fallback` returned the eigenvalue-based `FieldModel::estimate_occupancy` count **unbounded** (its internal ceiling is 10), while the sibling estimators on the same single-link data — the perturbation-energy fallback right below it and `score_to_person_count` — both cap at 3 ("1-3 for single ESP32"). On noisy / under-calibrated CSI the eigenvalue count inflated, producing the "10 persons reported when 1 present" symptom (seen when `--model` fails to load and the server runs on heuristics). Bounded the eigenvalue path to the shared `MAX_SINGLE_LINK_OCCUPANCY` (3) so every estimator on one link agrees; genuine higher counts come from the multistatic fusion path, not a single-link covariance estimate.
- **MQTT multi-node deployments now create one Home-Assistant device per node — closes #898.** After the #872 MQTT wiring landed, the JSON→`VitalsSnapshot` bridge hard-coded a single `node_id` (the MQTT client id) and the publisher used a single `OwnedDiscoveryBuilder`, so every physical node collapsed into one device (`identifiers:["wifi_densepose_wifi-densepose-1"]`), contradicting the "one device per node" docs. The bridge now emits one snapshot per node in the sensing update's `nodes[]` (each with its own `node_id` + RSSI, falling back to a single aggregate snapshot for wifi/simulate sources), and the publisher derives a per-node builder (`OwnedDiscoveryBuilder::for_node`) that publishes discovery + availability lazily on first sight of each `node_id` and routes state to per-node topics — yielding N distinct HA devices with per-node availability/LWT. Unit-tested (distinct nodes → distinct `wifi_densepose_<node>` identifiers); 71 MQTT tests pass.
- **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 23 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 03) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass.
- **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass.
@@ -637,6 +637,23 @@ static void hop_timer_cb(void *arg)
csi_hop_next_channel();
}
void csi_collector_enable_data_capture(void)
{
/* MGMT-only (RuView#396) starves the CSI callback on display-less boards
* (RuView#521/#893): beacons alone are sparse, yield collapses to 0 pps.
* Without a display there is no QSPI/SPI-flash cache contention with the
* DATA-frame interrupt load, so capture DATA frames too. */
wifi_promiscuous_filter_t filt = {
.filter_mask = WIFI_PROMIS_FILTER_MASK_MGMT | WIFI_PROMIS_FILTER_MASK_DATA,
};
esp_err_t err = esp_wifi_set_promiscuous_filter(&filt);
if (err == ESP_OK) {
ESP_LOGI(TAG, "CSI filter upgraded to MGMT+DATA (no display, RuView#893)");
} else {
ESP_LOGW(TAG, "Failed to enable DATA-frame CSI capture: %s", esp_err_to_name(err));
}
}
void csi_collector_start_hop_timer(void)
{
if (s_hop_count <= 1) {
@@ -90,6 +90,19 @@ void csi_hop_next_channel(void);
*/
void csi_collector_start_hop_timer(void);
/**
* Upgrade the promiscuous filter to capture DATA frames in addition to MGMT
* (RuView#893/#521).
*
* Called on display-less boards: the MGMT-only filter (the #396 display-crash
* workaround set in csi_collector_init) only fires the CSI callback on sparse
* management frames, so yield collapses to 0 pps under real traffic and the
* node looks dead. A board with no AMOLED panel has no QSPI/SPI-flash cache
* contention, so it can safely capture DATA frames — restoring abundant CSI.
* Display boards keep MGMT-only to avoid the #396 crash.
*/
void csi_collector_enable_data_capture(void);
/**
* Inject an NDP (Null Data Packet) frame for sensing.
*
@@ -9,6 +9,14 @@
#include "display_task.h"
#include "sdkconfig.h"
/* Set true once an AMOLED panel is detected and the display task starts.
* Defined outside the CONFIG_DISPLAY_ENABLE guard so display_is_active()
* exists on headless builds too (where it stays false → CSI captures DATA
* frames; see RuView#893). */
static bool s_display_active = false;
bool display_is_active(void) { return s_display_active; }
#if CONFIG_DISPLAY_ENABLE
#include <string.h>
@@ -162,6 +170,7 @@ esp_err_t display_task_start(void)
ESP_LOGI(TAG, "Display task started (Core %d, priority %d, %d fps)",
DISP_TASK_CORE, DISP_TASK_PRIORITY, DISP_FPS_LIMIT);
s_display_active = true;
return ESP_OK;
}
@@ -7,6 +7,7 @@
#define DISPLAY_TASK_H
#include "esp_err.h"
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
@@ -22,6 +23,15 @@ extern "C" {
*/
esp_err_t display_task_start(void);
/**
* @return true once an AMOLED panel has been detected and the display task
* is running; false on headless boards (no panel, or built without display
* support). Used to choose the CSI promiscuous filter (RuView#893): a board
* with no display has no QSPI/SPI-flash contention, so it can safely capture
* DATA frames for proper CSI yield instead of starving on MGMT-only.
*/
bool display_is_active(void);
#ifdef __cplusplus
}
#endif
+15
View File
@@ -410,6 +410,21 @@ void app_main(void)
}
#endif
/* RuView#893/#521: the MGMT-only promiscuous filter (set in
* csi_collector_init as the #396 display-crash workaround) starves the CSI
* callback on display-less boards — yield collapses to 0 pps and the node
* looks dead despite being on the network. Now that the display probe has
* run, boards with no AMOLED panel (no QSPI/SPI-flash cache contention)
* upgrade the filter to capture DATA frames too, restoring CSI yield. */
#ifdef CONFIG_DISPLAY_ENABLE
bool has_display = display_is_active(); /* runtime panel probe result */
#else
bool has_display = false; /* display support not compiled in */
#endif
if (!has_display) {
csi_collector_enable_data_capture();
}
ESP_LOGI(TAG, "CSI streaming active → %s:%d (edge_tier=%u, OTA=%s, WASM=%s, mmWave=%s, swarm=%s, adapt=%s)",
g_nvs_config.target_ip, g_nvs_config.target_port,
g_nvs_config.edge_tier,
Binary file not shown.
@@ -1,4 +1,4 @@
889715e9d698ad78f9978ad8b93b6af24a726b0494247201c8f0d920d9fc80ca *firmware/esp32-csi-node/release_bins/c6-adr110/bootloader.bin
d8539e47c6f10a3344679118619e3fe01cfd66eb560ea8883268ca7c9a12efa4 *firmware/esp32-csi-node/release_bins/c6-adr110/esp32-csi-node.bin
b0fb1f217a39c80bc95b5eb8208a0b8572ae64efa0f6d580b76caff4affe0f4d *firmware/esp32-csi-node/release_bins/c6-adr110/bootloader.bin
4764c5b20a353895f70122816adc98f861ec20e9a8ea9b344dc0648b6341073c *firmware/esp32-csi-node/release_bins/c6-adr110/esp32-csi-node.bin
7d2c7ac4888bfd75cd5f56e8d61f69595121183afc81556c876732fd3782c62f *firmware/esp32-csi-node/release_bins/c6-adr110/ota_data_initial.bin
4c2cc4ffd52641e23b779bd57b3908014083ac3c1aab395756478c89e70d81f0 *firmware/esp32-csi-node/release_bins/c6-adr110/partition-table.bin
@@ -1,3 +1,3 @@
3c4905dd202ccabf4230cbabcc9320f250a60b1a7254eff7424780201bcb2072 *firmware/esp32-csi-node/release_bins/s3-adr110/bootloader.bin
7a8bf9582c9031fed32f1ada44f5c41dd99bd07fadff8e5c86e07aa0f343e847 *firmware/esp32-csi-node/release_bins/s3-adr110/esp32-csi-node.bin
b973d7eda65affb746adcfa63ceb18f779f206d240b76f01b8c9ae7485455660 *firmware/esp32-csi-node/release_bins/s3-adr110/bootloader.bin
e21ef94aba779d534dc048c1b9da731c81e5dbe09d0645cfd70a05ad3642d3e9 *firmware/esp32-csi-node/release_bins/s3-adr110/esp32-csi-node.bin
67222c257c0477501fd4002275638dc4262b34eb68235b8289fb1337054d322b *firmware/esp32-csi-node/release_bins/s3-adr110/partition-table.bin
@@ -1,3 +1,4 @@
0.6.6
git-sha: cbcb389cb (pre-commit)
built: 2026-05-21
0.6.7
git-sha: 8703ade9b
built: 2026-06-02
note: RuView#893 — display-less boards capture DATA frames (CSI yield 0pps fix); hardware-verified on ESP32-C6 (0->27 pps)
@@ -21,6 +21,15 @@ const ENERGY_THRESH_2: f64 = 12.0;
/// Perturbation energy threshold for detecting a third person.
const ENERGY_THRESH_3: f64 = 25.0;
/// Maximum occupancy a single ESP32 link can plausibly resolve (#894).
/// The score heuristic (`score_to_person_count`) and the perturbation-energy
/// fallback below both cap here; the eigenvalue path is bounded to match,
/// rather than leaking its internal `min(10)` ceiling on noisy / under-
/// calibrated CSI (the "10 persons reported when 1 present" symptom).
/// Resolving more than this from one link's subcarrier covariance is not
/// reliable — genuine higher counts come from the multistatic fusion path.
const MAX_SINGLE_LINK_OCCUPANCY: usize = 3;
/// Create a FieldModelConfig for single-link mode (one ESP32 node = one link).
/// This avoids the DimensionMismatch error when feeding single-frame observations.
pub fn single_link_config() -> FieldModelConfig {
@@ -55,9 +64,15 @@ pub fn occupancy_or_fallback(
return score_to_person_count(smoothed_score, prev_count);
}
// Try eigenvalue-based occupancy first (best accuracy).
// Try eigenvalue-based occupancy first (best accuracy). Bound it to
// the same single-link maximum the sibling estimators use — the
// perturbation fallback below and score_to_person_count both cap at
// MAX_SINGLE_LINK_OCCUPANCY. Without this, estimate_occupancy's
// internal min(10) ceiling leaks up to 10 persons on noisy / under-
// calibrated CSI (#894), while every other path on the same data
// would report ≤3.
if let Ok(count) = field.estimate_occupancy(&frames) {
return count;
return count.min(MAX_SINGLE_LINK_OCCUPANCY);
} // else fall through to perturbation energy
// Fallback: perturbation energy thresholds.
@@ -6213,24 +6213,44 @@ async fn main() {
Some(_) => 1.0,
None => 0.0,
};
let snap = mqtt::state::VitalsSnapshot {
node_id: node_id.clone(),
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
let conf = cls["confidence"].as_f64().unwrap_or(0.0);
let presence_score = if presence { conf.max(0.0) } else { 0.0 };
let breathing = vit["breathing_rate_bpm"].as_f64();
let hr = vit["heart_rate_bpm"].as_f64();
// #898: emit one snapshot per physical node so each
// surfaces as its own Home-Assistant device (with
// its own RSSI + availability). Falls back to a
// single aggregate snapshot when there is no
// per-node data (e.g. wifi / simulate sources).
let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot {
node_id: nid,
timestamp_ms: ts,
presence,
motion,
presence_score: if presence {
cls["confidence"].as_f64().unwrap_or(1.0)
} else {
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
presence_score,
breathing_rate_bpm: breathing,
heartrate_bpm: hr,
n_persons,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
rssi_dbm: rssi,
vital_confidence: conf,
..Default::default()
};
let _ = vtx.send(snap);
match v["nodes"].as_array() {
Some(arr) if !arr.is_empty() => {
for node in arr {
let n = node["node_id"].as_u64().unwrap_or(0);
let nid = format!("{node_id}-node{n}");
let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64()));
}
}
_ => {
let _ = vtx.send(mk(
node_id.clone(),
v["nodes"][0]["rssi_dbm"].as_f64(),
));
}
}
}
});
tracing::info!("MQTT publisher started -> {host}:{port}");
@@ -117,6 +117,23 @@ impl OwnedDiscoveryBuilder {
via_device: self.via_device.as_deref(),
}
}
/// Derive a per-node builder from this base (issue #898). Each physical
/// RuView node must surface as its own Home-Assistant device — the base
/// builder's `node_id` (the MQTT client id) is replaced with the actual
/// node id, giving a distinct `wifi_densepose_<node>` device identifier
/// and a per-node friendly name, instead of collapsing every node into a
/// single hard-coded device.
pub fn for_node(&self, node_id: &str) -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: self.discovery_prefix.clone(),
node_id: node_id.to_string(),
node_friendly_name: Some(format!("RuView node {node_id}")),
sw_version: self.sw_version.clone(),
model: self.model.clone(),
via_device: self.via_device.clone(),
}
}
}
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
@@ -129,20 +146,19 @@ async fn run(
let opts = build_mqtt_options(&cfg);
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
let builder_borrowed = builder_owned.as_borrowed();
let entities = DiscoveryBuilder::enabled_entities(
cfg.privacy_mode,
cfg.publish_pose,
&[], // no_semantic — wire from cli::Args in P3.5
);
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] initial discovery publish failed: {e}");
}
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] initial availability publish failed: {e}");
}
// #898: one Home-Assistant device per node. Discovery + availability are
// published lazily the first time a snapshot for a given node_id arrives;
// each node's builder + availability are retained here for heartbeats and
// the offline LWT. (Previously a single hard-coded builder collapsed every
// node into one device.)
let mut nodes: std::collections::HashMap<String, (OwnedDiscoveryBuilder, NodeAvailability)> =
std::collections::HashMap::new();
let mut rate_limiter = RateLimiter::new();
let mut last_heartbeat = Instant::now();
@@ -179,14 +195,20 @@ async fn run(
// Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => {
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
for (_, na) in nodes.values() {
if let Err(e) = publish_availability(&client, na, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
}
last_heartbeat = Instant::now();
}
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] discovery refresh failed: {e}");
for (nb, _) in nodes.values() {
if let Err(e) =
publish_all_discovery(&client, &nb.as_borrowed(), &entities).await
{
warn!("[mqtt] discovery refresh failed: {e}");
}
}
last_refresh = Instant::now();
}
@@ -197,18 +219,39 @@ async fn run(
match recv {
Ok(snap) => {
let elapsed = start_instant.elapsed();
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
// #898: on first sight of a node_id, publish that
// node's discovery + availability; then route its
// state to per-node topics.
if !nodes.contains_key(&snap.node_id) {
let nb = builder_owned.for_node(&snap.node_id);
let borrowed = nb.as_borrowed();
if let Err(e) =
publish_all_discovery(&client, &borrowed, &entities).await
{
warn!("[mqtt] node {} discovery failed: {e}", snap.node_id);
}
let na = NodeAvailability::for_builder(&borrowed, &entities);
if let Err(e) = publish_availability(&client, &na, "online").await {
warn!("[mqtt] node {} availability failed: {e}", snap.node_id);
}
nodes.insert(snap.node_id.clone(), (nb, na));
}
let borrowed = nodes[&snap.node_id].0.as_borrowed();
publish_snapshot(&client, &borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
}
Err(broadcast::error::RecvError::Closed) => {
info!("[mqtt] broadcast channel closed, draining");
// Publish offline before exit.
let _ = publish_availability(&client, &avail, "offline").await;
// Publish offline for every known node before exit.
for (_, na) in nodes.values() {
let _ = publish_availability(&client, na, "offline").await;
}
let _ = client.disconnect().await;
return;
}
}
}
}
@@ -296,3 +339,52 @@ async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), Cli
};
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
}
#[cfg(test)]
mod per_node_device_tests {
//! Issue #898 — each physical node must surface as its own Home-Assistant
//! device, not collapse into one hard-coded device.
use super::*;
fn base() -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: "homeassistant".into(),
node_id: "wifi-densepose-1".into(),
node_friendly_name: Some("RuView".into()),
sw_version: "0.0.0".into(),
model: "test".into(),
via_device: None,
}
}
fn device_identifiers(b: &OwnedDiscoveryBuilder) -> Vec<String> {
b.as_borrowed().build(EntityKind::Presence).device.identifiers
}
#[test]
fn for_node_overrides_node_id_and_friendly_name() {
let n = base().for_node("node-A");
assert_eq!(n.node_id, "node-A");
assert_eq!(n.node_friendly_name.as_deref(), Some("RuView node node-A"));
}
#[test]
fn distinct_nodes_yield_distinct_ha_device_identifiers() {
let b = base();
let a = device_identifiers(&b.for_node("node-A"));
let c = device_identifiers(&b.for_node("node-B"));
assert_eq!(a, vec!["wifi_densepose_node-A".to_string()]);
assert_eq!(c, vec!["wifi_densepose_node-B".to_string()]);
assert_ne!(a, c, "#898: two nodes must not collapse into one device");
}
#[test]
fn single_node_keeps_a_stable_identity() {
// Two snapshots from the same node map to the same device.
let b = base();
assert_eq!(
device_identifiers(&b.for_node("node-7")),
device_identifiers(&b.for_node("node-7"))
);
}
}
@@ -171,12 +171,28 @@ async fn discovery_topics_appear_on_broker() {
// Spawn the publisher.
let cfg = make_cfg(port, false, "discovery");
let builder = make_builder("inttest1");
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// #898: discovery is now published per-node the first time a snapshot for
// that node_id arrives (not eagerly at startup). Drive snapshots for
// "inttest1" throughout the window so its device's discovery lands — same
// pattern as state_messages_published_on_snapshot_broadcast.
let tx_bg = tx.clone();
let drive = tokio::spawn(async move {
for _ in 0..60 {
let _ = tx_bg.send(VitalsSnapshot {
node_id: "inttest1".into(),
..Default::default()
});
tokio::time::sleep(Duration::from_millis(200)).await;
}
});
// Drain the subscriber for up to 6 s — enough for initial discovery
// + first availability publication.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
drive.abort();
let _ = sub.disconnect().await;
// Assertions: at least the presence + heart_rate + fall discovery
@@ -221,10 +237,23 @@ async fn privacy_mode_suppresses_biometric_discovery() {
let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy");
let builder = make_builder("inttest2");
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// #898: per-node discovery is triggered by a snapshot for that node_id.
let tx_bg = tx.clone();
let drive = tokio::spawn(async move {
for _ in 0..60 {
let _ = tx_bg.send(VitalsSnapshot {
node_id: "inttest2".into(),
..Default::default()
});
tokio::time::sleep(Duration::from_millis(200)).await;
}
});
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
drive.abort();
let _ = sub.disconnect().await;
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();