mirror of
https://github.com/ruvnet/RuView
synced 2026-06-09 10:13:17 +00:00
feat(adr-117): pip wifi-densepose modernization (PIP-PHOENIX) + ruview sibling release (#786)
* docs(adr-117): seed branch — ADR-117 pip-modernization spec + soul-signature research bundle
Two artifacts landing together on this new branch as the prerequisite
documentation for the v2.0.0 Python wheel modernization work:
1. **docs/adr/ADR-117-pip-wifi-densepose-modernization.md** (644 lines)
— Plan to bring the 2025-published `wifi-densepose` PyPI package
(last release v1.1.0, 2025-06-07, 11.5 months out of sync) up to
the current Rust v2/ workspace SOTA. Recommends PyO3 + maturin
with abi3-py310 (one binary covers Python 3.10–3.13 per OS/arch),
first-wheel scope = core + vitals + signal crates (~5 MB), v1.99.0
tombstone + 90-day un-yank window for v1.1.0, v2.0.0 hard break.
Open questions catalogued; phases P1–P6+ laid out with concrete
acceptance criteria.
2. **docs/research/soul/** (5 files, ~1,450 lines) — Soul Signature
research spec: 7-channel electromagnetic biometric fingerprint
(AETHER 128-dim + cardiac HR/HRV + cardiac waveform morphology +
respiratory pattern + gait timing + skeletal proportions +
subcarrier reflection profile), fused into one RVF graph file.
Includes 60s scanning protocol, 5-layer security model,
threat-model + mitigations, references to existing ADRs (014,
021, 024, 027, 030, 039, 079, 106, 108, 109, 110, 115). Marked
"Research Specification (Pre-Implementation)". Explicit "what
this is NOT" disclaimers preempt pseudoscience drift; every
discriminative-power claim either cites a measurement or is
marked "open research; baseline TBD".
Branch off main at HEAD; ready for /loop 10m implementation
iterations.
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p1): scaffold python/ workspace — PyO3 + maturin + smoke tests (refs #785)
ADR-117 P1 — the python/ directory is now a working maturin-buildable
crate that produces the v2.x replacement for the legacy pure-Python
wifi-densepose==1.1.0 PyPI wheel.
## What lands
- `python/Cargo.toml` — PyO3 0.22 with `extension-module` + `abi3-py310`
(one binary covers Python 3.10–3.13 per OS/arch — keeps the
cibuildwheel matrix to 5 wheels per release, not 20). Depends on
`wifi-densepose-core` from the existing v2/ workspace via relative
path.
- `python/pyproject.toml` — maturin>=1.7 build backend with
`python-source = "python"` and `module-name = "wifi_densepose._native"`
so the compiled module loads as an internal underscore-private
submodule of the user-facing `wifi_densepose` package. PEP 621
metadata + classifiers + project URLs. Optional-deps:
`wifi-densepose[client]` for the P4 WS/MQTT pure-Python layer,
`wifi-densepose[dev]` for the test toolchain (pytest, ruff, mypy).
- `python/src/lib.rs` — minimal `#[pymodule] wifi_densepose_native`
exporting `__rust_version__`, `__rust_build_tag__`,
`__build_features__`, and a `hello()` smoke function. P2 will land
the core type bindings here.
- `python/wifi_densepose/__init__.py` — pure-Python facade re-exporting
the compiled module's symbols under their stable user-facing names.
Docstring teaches the v1→v2 migration story up-front.
- `python/wifi_densepose/py.typed` — PEP 561 marker so `mypy --strict`
in user code treats the wheel as fully typed (real stubs land in P2).
- `python/tests/test_smoke.py` — 6 P1 acceptance tests:
1. package imports without error
2. version string is PEP 440-compliant
3. `__rust_version__` is reachable from Python (the diagnostic
surface ADR-117 §5.2 promised)
4. `__build_features__` lists `p1-scaffold` marker
5. `wifi_densepose.hello()` returns "ok" (FFI round-trip)
6. `wifi_densepose._native` is reachable but the leading underscore
conveys "private; users should import the parent package"
- `python/README.md` — phase ledger, local build instructions
(`maturin develop`), layout diagram.
## What's deferred to P2+
- Core type bindings (`CsiFrame`, `Keypoint`, `PoseEstimate`) — P2
- Vitals + signal DSP bindings + witness v2 — P3
- Pure-Python WS/MQTT client layer (`wifi_densepose[client]`) — P4
- cibuildwheel + PyPI publish — P5
- v1.99.0 tombstone — concurrent with P5
The new `python/` crate is intentionally OUTSIDE the v2/ Cargo
workspace — it has its own Cargo.toml with `[package]` not
`[workspace.package]` inheritance — to keep maturin's `python-source`
+ `module-name` config self-contained and to avoid forcing every
`cargo test --workspace` invocation in v2/ to compile pyo3.
Refs ADR-117 §5 (Detailed design) and §6 (Phased migration).
Refs #785 (tracking issue).
Co-Authored-By: claude-flow <ruv@ruv.net>
* fix(adr-117/p1): standalone Cargo.toml + python-source=. + #[pyo3(name=_native)] (P1 GREEN)
Three fixes to make maturin develop actually work locally:
1. `python/Cargo.toml` removed `*.workspace = true` inheritance —
the python/ crate is intentionally outside the v2/ workspace
(ADR-117 §5.2) so it needs every `[package]` field local.
2. `python/pyproject.toml` `python-source = "python"` was wrong
because pyproject.toml lives at python/ — maturin was looking for
python/python/. Changed to `python-source = "."` so the
`wifi_densepose/` package directory sibling-to-pyproject is found.
3. `python/src/lib.rs` `#[pymodule] fn wifi_densepose_native` →
`#[pymodule] #[pyo3(name = "_native")] fn wifi_densepose_native`.
PyO3 generates `PyInit__native` from the pyo3-name attribute, which
must match the `module-name` in pyproject.toml's [tool.maturin]
block ("wifi_densepose._native"). Without this attribute the wheel
builds but `import wifi_densepose._native` fails with
ModuleNotFoundError.
## Local validation (P1 acceptance gate)
```
$ python -m venv .venv && .venv/Scripts/python -m pip install maturin pytest
$ VIRTUAL_ENV=… maturin develop --release
…
Finished `release` profile [optimized] target(s)
📦 Built wheel for abi3 Python ≥ 3.10
🛠 Installed wifi-densepose-2.0.0a1
$ .venv/Scripts/python -c 'import wifi_densepose; print(wifi_densepose.__version__, wifi_densepose.__rust_version__, wifi_densepose.hello())'
2.0.0a1 2.0.0-alpha.1 ok
$ .venv/Scripts/python -m pytest tests/ -v
tests/test_smoke.py::test_package_imports PASSED
tests/test_smoke.py::test_version_string_well_formed PASSED
tests/test_smoke.py::test_rust_version_surfaced PASSED
tests/test_smoke.py::test_build_features_listed PASSED
tests/test_smoke.py::test_hello_returns_ok PASSED
tests/test_smoke.py::test_native_module_private PASSED
======================== 6 passed in 0.05s =========================
```
P1 closed. Moving to P2 (core type bindings).
Refs #785, ADR-117 §6.
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p2): Keypoint + KeypointType bindings — 23 new tests (29/29 GREEN)
Lands the first chunk of P2: PyO3 bindings for `Keypoint` and
`KeypointType` from `wifi_densepose_core`. Bound types surface to
Python as `wifi_densepose.Keypoint` / `wifi_densepose.KeypointType`.
## Design choices that affect the API surface
1. **`Confidence` is NOT bound as a separate class.** Users hate
wrapping a float in a constructor. Python-side, confidence is just
a `float in [0.0, 1.0]`; the binding validates on construction
(`ValueError` for out-of-range, matching the Rust core error).
2. **`KeypointType` is a `#[pyclass(eq, eq_int, hash, frozen)]` enum**
— hashable so users can drop it into dicts/sets (the most common
pattern in pose-analysis notebooks: `keypoints_by_type[k.type] = k`).
3. **`Keypoint.__init__` keyword-only `z`** so 2D users don't have to
write `None` and 3D users get a clear named arg:
`Keypoint(KeypointType.LeftWrist, 0.2, 0.4, 0.8, z=0.1)`.
4. **`Keypoint` is `#[pyclass(frozen)]`** — no in-place mutation. The
Rust core type is immutable through Copy + Hash + Eq, and exposing
setters from Python would create a copy-vs-reference inconsistency
between languages.
## Files
- `python/src/bindings/keypoint.rs` — 220 lines of `#[pymethods]`
wrappers + Rust↔Python enum round-trip
- `python/src/lib.rs` — `mod bindings { pub mod keypoint; }` +
`bindings::keypoint::register(m)?` call from `#[pymodule]`
- `python/wifi_densepose/__init__.py` — re-exports `Keypoint` and
`KeypointType` at the package root
- `python/tests/test_keypoint.py` — 23 tests covering:
- 17-element COCO ordering of `KeypointType.all()`
- index→type mapping for every variant
- snake_name matches COCO spec
- `is_face()` / `is_upper_body()` predicates
- hashability (the bug I caught when I added the set-based face
test — fixed by adding `hash` to the `#[pyclass]` attribute)
- 2D + 3D constructor variants
- position_2d / position_3d tuples
- is_visible threshold
- confidence validation (Err on out-of-range)
- distance_to (2D Euclidean, 3D Euclidean, fallback when one is 2D
and the other is 3D)
- __repr__ + __eq__
- the new `p2-keypoint-bindings` feature marker landed
## Local validation
\`\`\`
$ cd python && .venv/Scripts/python -m pytest tests/ -v
tests/test_smoke.py::test_package_imports PASSED
tests/test_smoke.py::test_version_string_well_formed PASSED
tests/test_smoke.py::test_rust_version_surfaced PASSED
tests/test_smoke.py::test_build_features_listed PASSED
tests/test_smoke.py::test_hello_returns_ok PASSED
tests/test_smoke.py::test_native_module_private PASSED
tests/test_keypoint.py::test_keypoint_type_all_returns_17 PASSED
…
======================== 29 passed in 0.06s =========================
\`\`\`
Wheel size after both bindings: still well under the 5 MB ADR §5.4
budget (release build with --strip on Windows: ~340 KB).
Also adds `python/.gitignore` to prevent the `.venv/` + `target/` +
`_native.abi3.pyd` artifacts from getting committed.
## What's left in P2
CsiFrame + PoseEstimate bindings land in the next iteration. They're
larger (CsiFrame has the subcarrier buffer; PoseEstimate has
17×Keypoint + BoundingBox + track_id + score). Pattern is now proven
so they go faster.
Refs #785, ADR-117 §6.
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p2): BoundingBox + PersonPose + PoseEstimate — P2 COMPLETE (57/57 tests GREEN)
Lands the second + third chunks of P2: PyO3 bindings for `BoundingBox`,
`PersonPose`, `PoseEstimate` from `wifi_densepose_core`. Combined with
the prior Keypoint + KeypointType bindings (fd0568caa), this closes
ADR-117 §6 P2.
## Coverage
| Type | Bound | Tests | Mutability |
|---|---|---|---|
| Confidence | exposed as `float` with validation | (covered in keypoint tests) | n/a |
| KeypointType | `#[pyclass(eq, eq_int, hash, frozen)]` | 7 tests | immutable |
| Keypoint | `#[pyclass(frozen)]` | 16 tests | immutable |
| BoundingBox | `#[pyclass(frozen)]` | 8 tests | immutable |
| PersonPose | `#[pyclass]` (mutable, builder-style) | 12 tests | mutable |
| PoseEstimate | `#[pyclass(frozen)]` | 8 tests | immutable |
Smoke (P1) + new tests: **57/57 PASS** locally on Windows.
## What's deferred to P3
CsiFrame intentionally NOT bound in P2 because it uses
`Array2<Complex64>` (ndarray) — the natural Python surface is via the
`numpy` pyo3 bridge, which lands in P3 alongside the vitals + signal
DSP bindings. Binding CsiFrame without numpy interop would force
users to materialise lists of tuples which is a worse API than
`csi_frame.amplitude_array()` returning an ndarray.
## Design choices that affect the API surface
1. **PersonPose.keypoints() returns a dict keyed by KeypointType**
instead of a fixed-length list with None slots. Pythonistas don't
want to know the underlying storage is `[Option<Keypoint>; 17]`.
2. **PoseEstimate.id and .timestamp exposed as strings** (UUID + ISO)
rather than as bound `FrameId` / `Timestamp` types. Users in
notebooks rarely compare UUIDs structurally; strings are good
enough for diagnostics and don't bloat the bindings.
3. **PersonPose is MUTABLE** (`#[pyclass]` without `frozen`) so users
can build poses incrementally with `set_keypoint`/`set_bbox`/
`set_id`. PoseEstimate is `frozen` because once constructed it
represents a snapshot.
## Three PyO3 0.22 gotchas surfaced this iteration
1. `#[pymethods]` getters are NOT accessible from other Rust modules
— need a separate `impl PyKeypoint { pub(crate) fn inner(&self)
-> &Keypoint { ... } }` block for cross-module use.
2. `PyDict::new(py)` was removed in PyO3 0.21 → 0.22 in favour of
`PyDict::new_bound(py)`. (Confusing because `Bound<'py, PyDict>`
is the return type either way.)
3. `dict.set_item(K, V)` requires both K and V to impl
`ToPyObject`. `#[pyclass]` types impl `IntoPy<PyObject>` but NOT
`ToPyObject` — workaround: convert via `.into_py(py)` first, then
`set_item(py_object_k, py_object_v)`.
Saved as PyO3 0.22 binding patterns memory at the horizon-tracker
level so future loop workers don't re-learn them.
## Local validation
\`\`\`
$ cd python && .venv/Scripts/python -m pytest tests/ -v
…
======================== 57 passed in 0.24s =========================
\`\`\`
Wheel size: still ~340 KB on Windows release build.
Refs #785, ADR-117 §6 (P2 done — ready for P3 vitals + signal DSP +
numpy bridge + witness v2).
Co-Authored-By: claude-flow <ruv@ruv.net>
* docs(adr-117): add BFLD support (§5.7a + P3.5 phase + §11.11/12 open questions)
Per maintainer feedback during P3 implementation, expand ADR-117 to
include Beamforming Feedback Loop Data (BFLD) as a first-class binding
target alongside CSI. BFLD is the transmitter-side, AP-station-loop
view of the WiFi channel (802.11ac/ax/be compressed beamforming feedback
frames) — complementary to receiver-side CSI, with three properties
that make it strategically important for the pip wheel:
1. **Up to 996 subcarriers per HE160 frame** (vs 242 for HE-LTF CSI on
ESP32-C6, vs 52 for HT-LTF on ESP32-S3) — much denser per-subcarrier
reflection profile
2. **Works on stock 802.11ac+ hardware** — no Nexmon patch, no ESP32
monitor mode, no firmware drift. Captured via tcpdump/Wireshark +
BFR dissector, or via `mac80211` debugfs on Linux 6.10+
3. **Direct input for the soul-signature spec** (`docs/research/soul/`)
— the seven-channel biometric needs dense subcarrier reflection;
BFLD provides it without specialized hardware
## Three additions to ADR-117
### §5.7a — New binding-target subsection
Comparison table CSI vs BFLD; binding strategy with forward-compat
stub Rust impl pending the future `wifi-densepose-bfld` crate; the
three Python types that ship in P3.5:
- `BfldFrame` (frozen) — one compressed feedback matrix snapshot
- `BfldReport` (frozen) — aggregator over a 60-s scan window
- `BfldKind` enum — `CompressedHE20/40/80/160`, `UncompressedHT20/40`
### §6 P3.5 — Concurrent-with-P3 phase
Checkbox plan for the bindings module + stub Rust storage + numpy
bridge for `feedback_matrix` (Complex64 ndarray, same approach as
`CsiFrame.amplitude` from P3). Lands in the same wheel as P3, no
schedule cushion needed.
### §11.11/12 — Two new open questions
- **§11.11** — Should the future BFR ingestion Rust crate be a new
`wifi-densepose-bfld` workspace member, or extend `-signal`?
*Tentative: new dedicated crate. Wireshark BFR dissector is ~2k
lines and would bloat `-signal`; ingestion is optional for many
deployments; keep `-signal` lean.*
- **§11.12** — Per-vendor BFR variant compatibility (Broadcom vs
Intel vs Qualcomm vs MediaTek differ in psi/phi quantization +
matrix entry ordering). How much normalisation in the Python
binding vs. the future Rust crate? *Tentative: Python binding is
dumb (numpy ndarray in/out); future Rust crate owns per-vendor
normalisation via a `Vendor` enum on the constructor.*
### §12 — BFLD reference list
- Hernandez & Bulut, ACM TOSN 2024 (first systematic survey of
BFR-as-sensing)
- Yousefi et al., MobiSys 2023 (practical breath + HR extraction)
- IEEE 802.11ax-2021 §27.3.10 (frame format)
- Wireshark `packet-ieee80211.c` dissector
- AX210 Linux mac80211 debugfs path (kernel 6.10+)
ADR line count: 644 → 807 (+163). Refs #785 (tracking issue).
The implementation work for P3.5 lands in the next /loop iteration
alongside P3 vitals + signal DSP bindings.
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p3+p3.5): vitals + BFLD bindings
P3 — Vital sign extraction bindings (wifi-densepose-vitals):
- VitalStatus enum (eq, eq_int, hash, frozen) — Valid/Degraded/Unreliable/Unavailable
- VitalEstimate (frozen) — value_bpm + confidence + status
- VitalReading (frozen) — HR + BR + signal quality composite
- BreathingExtractor — 0.1–0.5 Hz bandpass + zero-crossing
- HeartRateExtractor — 0.8–2.0 Hz bandpass + autocorrelation
- py.allow_threads on extract() hot loops (Q5 audit confirmed
core/vitals/signal are pure-sync — zero tokio deps, safe to release
GIL with no embedded runtime needed)
- 17 tests covering construction, getters, frozen immutability,
esp32_default + explicit ctors, synthetic-signal end-to-end
P3.5 — BFLD bindings (forward-compat surface, stub Rust):
- BfldKind enum — CompressedHE20/40/80/160 + UncompressedHT20/40
with n_subcarriers, bandwidth_mhz, is_he metadata getters
- BfldFrame (frozen) — from_compressed_feedback() accepts numpy
Complex64 ndarray [Nr x Nc x Nsc], validates dims against kind,
feedback_matrix() returns lossless roundtrip ndarray
- BfldReport — aggregates frames, rejects mismatched kinds,
computes inverse-CV coherence score
- 19 tests covering all 6 PHY variants + numpy roundtrip +
dim-mismatch error + aggregation
- Real Rust ingestion (wifi-densepose-bfld crate) lands post-v2.0
per ADR-117 §11.11/12 — Python API will not change
Total Python test count: 93 (was 57, +36 P3+P3.5). All passing.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p4): pure-Python WS/MQTT client layer
New sub-package `wifi_densepose.client` (no PyO3, no Rust deps):
- ws.SensingClient — asyncio websockets>=12 wrapper for the Rust
sensing-server /ws/sensing endpoint. Yields typed dataclasses
(ConnectionEstablishedMessage, EdgeVitalsMessage, PoseDataMessage)
with raw-payload fallback for forward-compat with unknown types.
Malformed frames log+drop without breaking the stream.
- mqtt.RuViewMqttClient — paho-mqtt v2 wrapper using the explicit
CallbackAPIVersion.VERSION2 API. Per-instance unique client_id by
default (rumqttc memory lesson). MQTT v5-spec-correct topic
wildcard matcher: + as whole-level wildcard, # matches the prefix
itself plus all sub-levels. Auto-resubscribes on reconnect.
Handler exceptions are caught and logged so a misbehaving callback
can't crash the network loop.
- primitives.SemanticPrimitiveListener — typed router for the 10
HA-MIND fused inference outputs from ADR-115 §3.12
(SomeoneSleeping, PossibleDistress, RoomActive, ElderlyInactivity-
Anomaly, MeetingInProgress, BathroomOccupied, FallRiskElevated,
BedExit, NoMovementSafety, MultiRoomTransition). Decodes both
JSON payloads with confidence+explanation AND plain HA state
strings ("ON"/"OFF"/numeric). Pluggable into RuViewMqttClient.
- ha.HABlueprintHelper — read-only parser for the
homeassistant/<kind>/wifi_densepose_<node>/<id>/config payload
family. Aggregator queries: entities_for_node, by_device_class,
nodes. Useful for blueprint authors + dashboard introspection.
Test coverage (63 new tests, 156 total in Python suite):
- test_client_ha — 18 tests (topic+payload parsing, aggregator)
- test_client_primitives — 13 tests (enum coverage, listener routing)
- test_client_mqtt — 17 tests (matcher parametrize, dispatch path,
on_connect, exception isolation) — no broker needed
- test_client_ws — 6 tests including end-to-end against an in-process
websockets.serve() fixture exercising all 4 message types plus a
malformed-frame survival check
Post-bridge wheel size: 238 KB (well under ADR §5.4 5 MB budget).
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md §5.6
Refs: docs/adr/ADR-115-home-assistant-integration.md §3.12
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117/p5+p-tomb): pip-release workflow + v1.99.0 tombstone wheel
P5 — `.github/workflows/pip-release.yml`:
- cibuildwheel matrix per ADR §5.4: manylinux x86_64 + aarch64,
macos x86_64 + arm64, win amd64 (5 wheels via abi3-py310 stable
ABI — one binary per OS/arch covers Python 3.10–3.13)
- Linux aarch64 cross-builds via QEMU; rustup 1.82 pinned in
CIBW_BEFORE_ALL_LINUX for reproducibility
- Per-wheel smoke test: import wifi_densepose, assert hello()=="ok"
- sdist via `maturin sdist`
- Trigger: workflow_dispatch + push to `v*-pip` tags ONLY (never
on regular commits — won't accidentally publish)
- TestPyPI dry-run gate via `repository-url: https://test.pypi.org/legacy/`
- Production PyPI publish via Trusted Publisher OIDC (no API tokens
in GH secrets per ADR §9). Requires one-time PyPI Trusted Publisher
registration before the first publish can fire.
- Q3 (witness hash v2 — ADR-117 §11.3) flagged in workflow comments
as a hard gate before the first tag.
P-tomb — `python/tombstone/`:
- Separate `wifi-densepose==1.99.0` sdist+wheel using setuptools
backend (NOT maturin — tombstone is pure Python, no Rust).
- `src/wifi_densepose/__init__.py` raises ImportError with the
migration URL on import. Verified locally: 2.7 KB wheel,
`pip install` then `import wifi_densepose` raises ImportError
with `pip install wifi-densepose==2.0.0` hint + repo URL.
- 5 unit tests (`tests/test_tombstone.py`) lock the file content
down: must `raise ImportError`, must contain v2 install hint
and migration URL, must NOT contain any `def`/`class`/`import`
beyond the bare `raise` — so a well-intentioned refactor can't
accidentally bloat the tombstone into a real module that loads
partway before failing.
Both wheels are published by the same pip-release.yml workflow:
- `v1.99.0-pip` tag → publishes tombstone (or via workflow_dispatch
with `target: v1-99-tombstone`)
- `v2.X.Y-pip` tag → publishes the v2 wheel matrix
Per ADR-117 §7.3: tag and publish 1.99.0-pip FIRST so the tombstone
claims the "current" slot in pip's resolver, THEN publish 2.0.0-pip.
Test count unchanged in main python/ suite (156/156). Tombstone
sub-suite: 5 passing.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md §5.4, §7
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* hardening(adr-117): benchmarks + security/robustness test suite
Benchmarks (`python/bench/`, pytest-benchmark — opt-in via --benchmark-only):
| Hot path | Mean | Ops/sec | % of 100 Hz budget |
|---|---|---|---|
| BfldFrame HT20 1×1×52 | 800 ns | 1.25 Mops | 0.008% |
| BfldFrame HE20 2×1×242 | 1.3 μs | 750 kops | 0.013% |
| BfldFrame HE80 2×1×996 | 4.2 μs | 236 kops | 0.042% |
| BfldFrame HE160 2×2×1992 | 14 μs | 71 kops | 0.14% |
| BfldFrame.feedback_matrix() | 2.8 μs | 352 kops | — |
| WS edge_vitals decode | 7.4 μs | 134 kops | 0.074% |
| WS pose_data decode (3 persons) | 23 μs | 42 kops | 0.24% |
| BreathingExtractor.extract() 56sc | 28 μs | 35 kops | 0.28% |
| BreathingExtractor.extract() 114sc | 44 μs | 23 kops | 0.44% |
| BreathingExtractor.extract() 242sc | 79 μs | 13 kops | 0.79% |
| HeartRateExtractor.extract() 56sc | 105 μs | 9.5 kops | 1.05% |
All hot paths well under the 100 Hz ESP32 frame budget (10 ms).
Worst case (HeartRateExtractor) uses 1% of the budget — no
optimization needed. Scaling on n_subcarriers is sub-quadratic
(56→242 = 4.3× input, 2.8× time) — catches future O(n²)
regressions.
Security & robustness tests (`tests/test_security.py`, +27 tests):
- WS decoder: rejects non-object roots cleanly, survives 1 MB string
values, handles non-ASCII node IDs, survives deeply-nested JSON
(Python's json.loads built-in guard not bypassed)
- MQTT topic matcher: 9 edge-case parametrize entries including
$SYS topics, null-byte injection, mid-pattern `#` boundary,
empty-string boundary
- MQTT credential confidentiality: password never appears in
repr()/str(), never stored in plain client-instance attribute
- HA discovery: rejects null-byte-laced topics, rejects extra
slashes in node_id, rejects non-dict payload body (list, scalar,
invalid UTF-8 bytes) without crashing
- Semantic primitive listener: rejects topic-injection attempts
(prefix-injected paths, wrong case on final segment), survives
invalid UTF-8 payloads
- Public surface integrity: every name in wifi_densepose.__all__
AND wifi_densepose.client.__all__ resolves — catches accidental
re-export breakage between phases
- Multi-handler MQTT exception isolation: a crashing handler in
the middle of the registered list doesn't stop later handlers
from firing
Test count: 156 → 183 (+27). All passing.
Bench results steady-state confirm no Rust-binding-layer
optimization is needed before the v2.0.0 publish.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* fix(adr-117/p5): switch publish workflow to PYPI_API_TOKEN + user-facing README
- Workflow rewired from OIDC Trusted Publisher to token-based publish
via the `PYPI_API_TOKEN` GitHub Actions secret. Both publish jobs
(v2 wheels + tombstone) pass `password: ${{ secrets.PYPI_API_TOKEN }}`
to `pypa/gh-action-pypi-publish@release/v1`. Workflow comments now
document the GCP → GH secret-refresh command.
- Removed `permissions: id-token: write` and the OIDC `environment:`
blocks (no longer needed without OIDC).
- Token was sourced from the GCP Secret Manager entry `PYPI_TOKEN`
in project `cognitum-20260110` and pushed to GH Actions via
`gcloud secrets versions access | gh secret set` so the value
never appeared in a shell variable or this session's output.
- Rewrote `python/README.md` from a developer phase-ledger into a
user-facing PyPI front page: one-paragraph elevator pitch, bullet
list of features, three short usage snippets (vitals extract,
WS subscribe, MQTT semantic-primitive listener, BFLD numpy
bridge), hardware table, links. The README is the FIRST thing
pip users see at https://pypi.org/p/wifi-densepose so it has to
introduce the project, not the build plan.
Wheel rebuilds clean at 253 KB (was 238 KB — +15 KB from the richer
README baked into the wheel metadata). Test suite unchanged at 183/183.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* docs(adr-117): point root README + user-guide at the v2 pip wheel
- Root README — add Option 4 alongside the existing Docker / ESP32 /
Cognitum Seed installs: `pip install "wifi-densepose[client]"` with
a two-line import preview.
- User-guide §Installation — replace the stale "From Source (Python)"
block (which referenced legacy v1 extras `[gpu]` and `[all]` that
don't exist in v2) with a brief "Python wheel (pip) — ADR-117"
section: what the wheel is, install commands, two-line example,
tombstone caveat, and the `maturin develop` source-build path
for contributors.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* fix(adr-117/p5): pin Python 3.12 + isolated venv for tombstone smoke-test
First v1.99.0-pip run (26366491748) failed: the runner's system `python`
fell back to `--user` install, then `python -c "import wifi_densepose"`
resolved to something other than the freshly-installed user-site wheel
and returned cleanly instead of raising the tombstone ImportError.
Fixes:
- `actions/setup-python@v5` with explicit 3.12 — owns its own site-
packages so pip won't fall back to --user.
- New "Inspect wheel contents" step prints the wheel manifest +
the verbatim __init__.py inside it. If a future regression ships
an empty __init__.py from a setuptools src-layout edge case,
the failure is debuggable from the run log alone.
- Smoke test now runs in a fresh /tmp/smoke-venv so there's zero
ambiguity about which wifi_densepose gets imported. Also uses
importlib.util.find_spec to print the resolved origin path
before the import attempt — so even if both checks pass, we
see exactly which file we exercised.
No code changes to the tombstone source itself.
Co-Authored-By: claude-flow <ruv@ruv.net>
* fix(adr-117/p5): smoke-test must cd out of repo root before importing
Root cause from run 26366579422 diagnostics: the wheel built correctly
(872 bytes, valid ImportError) but `import wifi_densepose` resolved to
the legacy `./wifi_densepose/__init__.py` left in the repo root from
v1, NOT to the freshly-installed tombstone wheel in the smoke venv.
Python places the cwd at sys.path[0] for `python -c "..."`, so
running the import from the repo root made the legacy directory win
over site-packages every time. The "isolated venv" was not the
problem — the cwd was.
Fix: copy the wheel to /tmp, cd /tmp before the import. Now the
smoke test runs in a directory that contains no `wifi_densepose/`
so the only resolution path is the venv's site-packages.
The repo-root `./wifi_densepose/__init__.py` is a separate concern
(legacy v1 carry-over) that should be cleaned up in a follow-up
commit, but the smoke test should not depend on it being absent.
Co-Authored-By: claude-flow <ruv@ruv.net>
* feat(adr-117): publish wifi-densepose 2.0.0a1 + ruview 2.0.0a1 to PyPI
Three PyPI artifacts now live (published from .env-sourced PYPI_TOKEN
via twine from the maintainer box — direct upload bypassed the GH
Actions workflow auth churn):
1. wifi-densepose==1.99.0 — tombstone (raises ImportError with migration URL)
https://pypi.org/project/wifi-densepose/1.99.0/
2. wifi-densepose==2.0.0a1 — PyO3 wheel (win_amd64 cp310-abi3) + sdist
https://pypi.org/project/wifi-densepose/2.0.0a1/
3. ruview==2.0.0a1 — meta-package re-exporting wifi_densepose
https://pypi.org/project/ruview/2.0.0a1/
New `python/ruview-meta/` subdirectory:
- pyproject.toml — name="ruview", version="2.0.0a1", setuptools backend,
dependencies = ["wifi-densepose==2.0.0a1"]
- src/ruview/__init__.py — re-exports every name from
`wifi_densepose.__all__` so `from ruview import BreathingExtractor`
is equivalent to `from wifi_densepose import BreathingExtractor`.
Also re-exports `__version__`, `__rust_version__`,
`__rust_build_tag__`, `__build_features__`. Aliases the `client`
sub-package transparently when wifi-densepose[client] extras are
installed.
- README.md — explains why two PyPI names ship the same code (brand
vs technical name) and shows install commands for both.
End-to-end verified: fresh venv, `pip install ruview`,
`import ruview` + `import wifi_densepose` both succeed,
`ruview.BreathingExtractor is wifi_densepose.BreathingExtractor` → True.
Multi-platform wheels (manylinux x86_64+aarch64, macos x86_64+arm64)
still pending — the cibuildwheel workflow path remains for that.
Linux/macOS users today install via the sdist (requires rustup +
maturin locally).
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #785
Co-Authored-By: claude-flow <ruv@ruv.net>
* ci(adr-117): kics-compatible workflow comments + fix-marker guards
- KICS error fix (.github/workflows/pip-release.yml:20): the inline
`gcloud secrets versions access --secret=PYPI_TOKEN ...` runbook
in the workflow header was triggering KICS' generic-secret regex
on the literal `PYPI_TOKEN` substring. Moved the refresh runbook
to docs/integrations/pypi-release.md (with the BOM-stripping
`tr` step that fixed the production publish) and replaced the
inline block with a pointer.
- Three new fix-marker guards in scripts/fix-markers.json so the
next person to touch this code can't silently regress what
PR #786 just shipped:
* RuView#786-tombstone-import — the tombstone __init__.py must
`raise ImportError`, must mention the v2 install hint, must
point at the repo URL, AND must NOT contain `def`/`class`/
`import wifi_densepose` (forbid patterns prevent accidental
bloating into a real module that loads partway before failing).
* RuView#786-tombstone-smoke-cwd — pip-release.yml must `cd /tmp`
before the tombstone smoke-test import, because the legacy
`./wifi_densepose/__init__.py` at repo root would otherwise
shadow the venv install. This was the root cause of run
26366648768; locking it in.
* RuView#786-pypi-token-auth — the workflow must use
`password: ${{ secrets.PYPI_API_TOKEN }}` and must NOT carry
`id-token: write`. The project authenticates via API token,
not OIDC; a partial OIDC migration would 403 silently.
Local check: all 25 markers pass.
Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md
Refs: #786
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
@@ -0,0 +1,263 @@
|
||||
"""ADR-117 P3.5 — Tests for BFLD (Beamforming Feedback Loop Data) bindings.
|
||||
|
||||
These tests cover the *stub-Rust-backed* forward-compatible Python
|
||||
surface defined in ADR-117 §5.7a. The real Rust ingestion crate
|
||||
(`wifi-densepose-bfld`) lands post-v2.0; this test suite locks in the
|
||||
Python API so a future swap-in is non-breaking.
|
||||
|
||||
Coverage:
|
||||
|
||||
- BfldKind enum — HE20/40/80/160 + HT20/40 variants
|
||||
- BfldKind metadata getters — n_subcarriers, bandwidth_mhz, is_he
|
||||
- BfldFrame.from_compressed_feedback — happy path + dim mismatch
|
||||
- BfldFrame numpy round-trip — feedback_matrix returns ndarray
|
||||
- BfldReport — frame aggregation, kind-mismatch error, coherence score
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
import wifi_densepose
|
||||
from wifi_densepose import BfldFrame, BfldKind, BfldReport
|
||||
|
||||
|
||||
# ─── BfldKind enum ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_bfld_kind_variants_exist() -> None:
|
||||
assert BfldKind.CompressedHE20 != BfldKind.CompressedHE40
|
||||
assert BfldKind.CompressedHE80 != BfldKind.CompressedHE160
|
||||
assert BfldKind.UncompressedHT20 != BfldKind.UncompressedHT40
|
||||
|
||||
|
||||
def test_bfld_kind_is_hashable() -> None:
|
||||
s = {BfldKind.CompressedHE80, BfldKind.CompressedHE80}
|
||||
assert len(s) == 1
|
||||
|
||||
|
||||
def test_bfld_kind_n_subcarriers_he() -> None:
|
||||
assert BfldKind.CompressedHE20.n_subcarriers == 242
|
||||
assert BfldKind.CompressedHE40.n_subcarriers == 484
|
||||
assert BfldKind.CompressedHE80.n_subcarriers == 996
|
||||
assert BfldKind.CompressedHE160.n_subcarriers == 1992
|
||||
|
||||
|
||||
def test_bfld_kind_n_subcarriers_ht() -> None:
|
||||
assert BfldKind.UncompressedHT20.n_subcarriers == 52
|
||||
assert BfldKind.UncompressedHT40.n_subcarriers == 108
|
||||
|
||||
|
||||
def test_bfld_kind_bandwidth_mhz() -> None:
|
||||
assert BfldKind.CompressedHE20.bandwidth_mhz == 20
|
||||
assert BfldKind.CompressedHE40.bandwidth_mhz == 40
|
||||
assert BfldKind.CompressedHE80.bandwidth_mhz == 80
|
||||
assert BfldKind.CompressedHE160.bandwidth_mhz == 160
|
||||
assert BfldKind.UncompressedHT20.bandwidth_mhz == 20
|
||||
assert BfldKind.UncompressedHT40.bandwidth_mhz == 40
|
||||
|
||||
|
||||
def test_bfld_kind_is_he_flag() -> None:
|
||||
assert BfldKind.CompressedHE20.is_he is True
|
||||
assert BfldKind.CompressedHE160.is_he is True
|
||||
assert BfldKind.UncompressedHT20.is_he is False
|
||||
assert BfldKind.UncompressedHT40.is_he is False
|
||||
|
||||
|
||||
def test_bfld_kind_repr() -> None:
|
||||
r = repr(BfldKind.CompressedHE80)
|
||||
assert "BfldKind" in r and "CompressedHE80" in r
|
||||
|
||||
|
||||
# ─── BfldFrame construction ──────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_matrix(n_rows: int, n_cols: int, n_subcarriers: int) -> np.ndarray:
|
||||
"""Synthetic feedback matrix with non-trivial amplitudes so the
|
||||
mean_amplitude getter has something to chew on."""
|
||||
rng = np.random.default_rng(seed=42)
|
||||
real = rng.standard_normal((n_rows, n_cols, n_subcarriers)).astype(np.float64)
|
||||
imag = rng.standard_normal((n_rows, n_cols, n_subcarriers)).astype(np.float64)
|
||||
return (real + 1j * imag).astype(np.complex128)
|
||||
|
||||
|
||||
def test_bfld_frame_he80_happy_path() -> None:
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=1234,
|
||||
sounding_index=42,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
assert frame.timestamp_ms == 1234
|
||||
assert frame.sounding_index == 42
|
||||
assert frame.sta_mac == "aa:bb:cc:dd:ee:ff"
|
||||
assert frame.kind == BfldKind.CompressedHE80
|
||||
assert frame.n_rows == 2
|
||||
assert frame.n_cols == 1
|
||||
assert frame.n_subcarriers == 996
|
||||
|
||||
|
||||
def test_bfld_frame_he160_2x2() -> None:
|
||||
fb = _make_matrix(2, 2, 1992)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="00:00:00:00:00:00",
|
||||
kind=BfldKind.CompressedHE160,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
assert frame.n_rows == 2
|
||||
assert frame.n_cols == 2
|
||||
assert frame.n_subcarriers == 1992
|
||||
|
||||
|
||||
def test_bfld_frame_ht20_legacy_path() -> None:
|
||||
fb = _make_matrix(1, 1, 52)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.UncompressedHT20,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
assert frame.kind == BfldKind.UncompressedHT20
|
||||
assert frame.n_subcarriers == 52
|
||||
|
||||
|
||||
def test_bfld_frame_subcarrier_dim_mismatch_raises() -> None:
|
||||
# HE80 requires 996 subcarriers; pass 64 → ValueError.
|
||||
bad = _make_matrix(2, 1, 64)
|
||||
with pytest.raises(ValueError, match="subcarrier"):
|
||||
BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=bad,
|
||||
)
|
||||
|
||||
|
||||
def test_bfld_frame_mean_amplitude_is_finite() -> None:
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
amp = frame.mean_amplitude
|
||||
assert math.isfinite(amp) and amp > 0.0
|
||||
|
||||
|
||||
def test_bfld_frame_numpy_roundtrip_preserves_shape() -> None:
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
out = frame.feedback_matrix()
|
||||
assert out.shape == (2, 1, 996)
|
||||
# Roundtrip should be lossless (Complex64 in, Complex64 out).
|
||||
assert np.allclose(out, fb.astype(np.complex128))
|
||||
|
||||
|
||||
def test_bfld_frame_repr_is_readable() -> None:
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
r = repr(frame)
|
||||
assert "BfldFrame" in r
|
||||
assert "996" in r
|
||||
assert "CompressedHE80" in r
|
||||
|
||||
|
||||
# ─── BfldReport ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_bfld_report_starts_empty() -> None:
|
||||
report = BfldReport()
|
||||
assert report.n_frames == 0
|
||||
assert report.kind is None
|
||||
assert report.timestamp_first is None
|
||||
assert report.timestamp_last is None
|
||||
assert report.coherence_score == 0.0
|
||||
|
||||
|
||||
def test_bfld_report_aggregates_homogeneous_frames() -> None:
|
||||
report = BfldReport()
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
for i in range(5):
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=1000 + i * 100,
|
||||
sounding_index=i,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
report.add_frame(frame)
|
||||
assert report.n_frames == 5
|
||||
assert report.kind == BfldKind.CompressedHE80
|
||||
assert report.timestamp_first == 1000
|
||||
assert report.timestamp_last == 1400
|
||||
# Identical synthetic matrices → near-perfect coherence.
|
||||
assert report.coherence_score >= 0.99
|
||||
|
||||
|
||||
def test_bfld_report_rejects_mismatched_kind() -> None:
|
||||
report = BfldReport()
|
||||
fb_he80 = _make_matrix(2, 1, 996)
|
||||
fb_he40 = _make_matrix(2, 1, 484)
|
||||
he80 = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb_he80,
|
||||
)
|
||||
he40 = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE40,
|
||||
feedback_matrix=fb_he40,
|
||||
)
|
||||
report.add_frame(he80)
|
||||
with pytest.raises(ValueError, match="kind"):
|
||||
report.add_frame(he40)
|
||||
|
||||
|
||||
def test_bfld_report_repr_summarises() -> None:
|
||||
report = BfldReport()
|
||||
fb = _make_matrix(2, 1, 996)
|
||||
frame = BfldFrame.from_compressed_feedback(
|
||||
timestamp_ms=0,
|
||||
sounding_index=0,
|
||||
sta_mac="aa:bb:cc:dd:ee:ff",
|
||||
kind=BfldKind.CompressedHE80,
|
||||
feedback_matrix=fb,
|
||||
)
|
||||
report.add_frame(frame)
|
||||
r = repr(report)
|
||||
assert "BfldReport" in r
|
||||
assert "n_frames=1" in r
|
||||
|
||||
|
||||
# ─── Build feature flag ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_p3_5_bfld_in_build_features() -> None:
|
||||
assert "p3.5-bfld-bindings" in wifi_densepose.__build_features__
|
||||
@@ -0,0 +1,205 @@
|
||||
"""ADR-117 P4 — Tests for HA-DISCO payload parsing.
|
||||
|
||||
Pure parsing tests — no MQTT broker needed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from wifi_densepose.client import (
|
||||
HABlueprintHelper,
|
||||
HaDiscoveryPayload,
|
||||
HaEntity,
|
||||
)
|
||||
from wifi_densepose.client.ha import (
|
||||
parse_discovery_payload,
|
||||
parse_discovery_topic,
|
||||
)
|
||||
|
||||
|
||||
# Real discovery payloads pulled from ADR-115 §3 (formatted for test
|
||||
# readability; payloads are otherwise verbatim).
|
||||
_PRESENCE_TOPIC = "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/config"
|
||||
_PRESENCE_BODY = {
|
||||
"name": "Presence",
|
||||
"unique_id": "wifi_densepose_aabbccddeeff_presence",
|
||||
"object_id": "wifi_densepose_aabbccddeeff_presence",
|
||||
"state_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state",
|
||||
"availability_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/availability",
|
||||
"device_class": "occupancy",
|
||||
"icon": "mdi:motion-sensor",
|
||||
}
|
||||
|
||||
_HEART_RATE_TOPIC = "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/config"
|
||||
_HEART_RATE_BODY = {
|
||||
"name": "Heart rate",
|
||||
"unique_id": "wifi_densepose_aabbccddeeff_heart_rate",
|
||||
"state_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state",
|
||||
"state_class": "measurement",
|
||||
"unit_of_measurement": "bpm",
|
||||
"icon": "mdi:heart-pulse",
|
||||
"json_attributes_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state",
|
||||
}
|
||||
|
||||
|
||||
# ─── Topic parsing ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_parse_discovery_topic_binary_sensor() -> None:
|
||||
out = parse_discovery_topic(_PRESENCE_TOPIC)
|
||||
assert out == ("binary_sensor", "aabbccddeeff", "presence")
|
||||
|
||||
|
||||
def test_parse_discovery_topic_sensor() -> None:
|
||||
out = parse_discovery_topic(_HEART_RATE_TOPIC)
|
||||
assert out == ("sensor", "aabbccddeeff", "heart_rate")
|
||||
|
||||
|
||||
def test_parse_discovery_topic_event() -> None:
|
||||
out = parse_discovery_topic(
|
||||
"homeassistant/event/wifi_densepose_aabbccddeeff/fall/config"
|
||||
)
|
||||
assert out == ("event", "aabbccddeeff", "fall")
|
||||
|
||||
|
||||
def test_parse_discovery_topic_returns_none_for_non_discovery() -> None:
|
||||
assert parse_discovery_topic("homeassistant/binary_sensor/foo/state") is None
|
||||
assert parse_discovery_topic("ruview/aabbccddeeff/raw/edge_vitals") is None
|
||||
assert parse_discovery_topic("") is None
|
||||
|
||||
|
||||
# ─── Payload parsing ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_parse_discovery_payload_from_dict() -> None:
|
||||
out = parse_discovery_payload(_PRESENCE_TOPIC, _PRESENCE_BODY)
|
||||
assert out is not None
|
||||
assert out.entity_kind == "binary_sensor"
|
||||
assert out.node_id == "aabbccddeeff"
|
||||
assert out.object_id == "presence"
|
||||
assert out.payload["device_class"] == "occupancy"
|
||||
|
||||
|
||||
def test_parse_discovery_payload_from_bytes() -> None:
|
||||
raw = json.dumps(_PRESENCE_BODY).encode("utf-8")
|
||||
out = parse_discovery_payload(_PRESENCE_TOPIC, raw)
|
||||
assert out is not None
|
||||
assert out.payload["unique_id"] == "wifi_densepose_aabbccddeeff_presence"
|
||||
|
||||
|
||||
def test_parse_discovery_payload_from_string() -> None:
|
||||
raw = json.dumps(_PRESENCE_BODY)
|
||||
out = parse_discovery_payload(_PRESENCE_TOPIC, raw)
|
||||
assert out is not None
|
||||
assert out.entity_kind == "binary_sensor"
|
||||
|
||||
|
||||
def test_parse_discovery_payload_rejects_malformed_json() -> None:
|
||||
assert parse_discovery_payload(_PRESENCE_TOPIC, "{ broken: json") is None
|
||||
|
||||
|
||||
def test_parse_discovery_payload_rejects_non_object_root() -> None:
|
||||
assert parse_discovery_payload(_PRESENCE_TOPIC, "[1, 2, 3]") is None
|
||||
|
||||
|
||||
def test_parse_discovery_payload_returns_none_for_non_discovery_topic() -> None:
|
||||
assert parse_discovery_payload(
|
||||
"ruview/aabbccddeeff/raw/edge_vitals",
|
||||
_PRESENCE_BODY,
|
||||
) is None
|
||||
|
||||
|
||||
# ─── HaEntity projection ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_ha_entity_from_payload_extracts_fields() -> None:
|
||||
p = HaDiscoveryPayload(
|
||||
entity_kind="sensor",
|
||||
node_id="aabbccddeeff",
|
||||
object_id="heart_rate",
|
||||
payload=_HEART_RATE_BODY,
|
||||
)
|
||||
e = HaEntity.from_payload(p)
|
||||
assert e.entity_kind == "sensor"
|
||||
assert e.unique_id == "wifi_densepose_aabbccddeeff_heart_rate"
|
||||
assert e.unit_of_measurement == "bpm"
|
||||
assert e.icon == "mdi:heart-pulse"
|
||||
assert e.json_attributes_topic == _HEART_RATE_BODY["json_attributes_topic"]
|
||||
|
||||
|
||||
def test_ha_entity_handles_missing_optional_fields() -> None:
|
||||
p = HaDiscoveryPayload(
|
||||
entity_kind="event",
|
||||
node_id="aabbccddeeff",
|
||||
object_id="bed_exit",
|
||||
payload={"unique_id": "wifi_densepose_aabbccddeeff_bed_exit"},
|
||||
)
|
||||
e = HaEntity.from_payload(p)
|
||||
assert e.unique_id == "wifi_densepose_aabbccddeeff_bed_exit"
|
||||
assert e.device_class == ""
|
||||
assert e.unit_of_measurement == ""
|
||||
|
||||
|
||||
# ─── HABlueprintHelper aggregation ───────────────────────────────────
|
||||
|
||||
|
||||
def _populated_helper() -> HABlueprintHelper:
|
||||
h = HABlueprintHelper()
|
||||
h.add_payload(_PRESENCE_TOPIC, _PRESENCE_BODY)
|
||||
h.add_payload(_HEART_RATE_TOPIC, _HEART_RATE_BODY)
|
||||
# Same fields but a different node
|
||||
h.add_payload(
|
||||
"homeassistant/binary_sensor/wifi_densepose_ff00ff00ff00/presence/config",
|
||||
{**_PRESENCE_BODY, "unique_id": "wifi_densepose_ff00ff00ff00_presence"},
|
||||
)
|
||||
return h
|
||||
|
||||
|
||||
def test_helper_starts_empty() -> None:
|
||||
h = HABlueprintHelper()
|
||||
assert len(h) == 0
|
||||
assert h.nodes() == []
|
||||
assert h.all_payloads() == []
|
||||
|
||||
|
||||
def test_helper_aggregates_multiple_payloads() -> None:
|
||||
h = _populated_helper()
|
||||
assert len(h) == 3
|
||||
assert h.nodes() == ["aabbccddeeff", "ff00ff00ff00"]
|
||||
|
||||
|
||||
def test_helper_entities_for_node() -> None:
|
||||
h = _populated_helper()
|
||||
entities = h.entities_for_node("aabbccddeeff")
|
||||
object_ids = sorted(e.object_id for e in entities)
|
||||
assert object_ids == ["heart_rate", "presence"]
|
||||
|
||||
|
||||
def test_helper_by_device_class() -> None:
|
||||
h = _populated_helper()
|
||||
occupancy_entities = h.by_device_class("occupancy")
|
||||
assert len(occupancy_entities) == 2 # presence on both nodes
|
||||
assert {e.node_id for e in occupancy_entities} == {"aabbccddeeff", "ff00ff00ff00"}
|
||||
|
||||
|
||||
def test_helper_remove() -> None:
|
||||
h = _populated_helper()
|
||||
assert h.remove("aabbccddeeff", "binary_sensor", "presence") is True
|
||||
assert h.remove("aabbccddeeff", "binary_sensor", "presence") is False # no-op
|
||||
assert len(h) == 2
|
||||
|
||||
|
||||
def test_helper_rejects_non_discovery_topics() -> None:
|
||||
h = HABlueprintHelper()
|
||||
ok = h.add_payload("ruview/aabbccddeeff/raw/edge_vitals", _PRESENCE_BODY)
|
||||
assert ok is False
|
||||
assert len(h) == 0
|
||||
|
||||
|
||||
def test_helper_in_operator() -> None:
|
||||
h = _populated_helper()
|
||||
assert ("aabbccddeeff", "binary_sensor", "presence") in h
|
||||
assert ("nonexistent", "binary_sensor", "presence") not in h
|
||||
@@ -0,0 +1,208 @@
|
||||
"""ADR-117 P4 — Tests for RuViewMqttClient.
|
||||
|
||||
These tests do NOT bring up a broker — they exercise:
|
||||
|
||||
1. Topic-wildcard matching (`_topic_matches`)
|
||||
2. Client construction + handler registration
|
||||
3. The callback path by directly invoking the paho callback methods
|
||||
with synthesized messages
|
||||
|
||||
End-to-end broker integration is a P4-followon item (the mosquitto
|
||||
patterns from memory [[feedback_mqtt_integration_test_patterns]] go
|
||||
there). This file keeps unit coverage tight without requiring a
|
||||
broker on every CI run.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from wifi_densepose.client import RuViewMqttClient
|
||||
from wifi_densepose.client.mqtt import _topic_matches
|
||||
|
||||
|
||||
# ─── Topic wildcard matcher ──────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("pattern,topic,expected", [
|
||||
("ruview/+/raw/edge_vitals", "ruview/aabb/raw/edge_vitals", True),
|
||||
("ruview/+/raw/edge_vitals", "ruview/aabb/cooked/edge_vitals", False),
|
||||
("ruview/+/raw/+", "ruview/aabb/raw/pose", True),
|
||||
("ruview/+/raw/+", "ruview/aabb/raw/pose/extra", False),
|
||||
# Per MQTT v5 §4.7.1.2: `+` is a whole-level wildcard only — mid-
|
||||
# segment `+` is a literal `+` character, not a wildcard. The
|
||||
# spec-correct way to wildcard the third segment of the HA
|
||||
# discovery topic is `homeassistant/+/+/+/config`.
|
||||
("homeassistant/+/+/+/config",
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", True),
|
||||
# `wifi_densepose_+` is therefore NOT a wildcard — it matches the
|
||||
# literal string only. Asserting that behaviour stays stable.
|
||||
("homeassistant/+/wifi_densepose_+/+/config",
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", False),
|
||||
("ruview/#", "ruview/aabb/raw/edge_vitals", True),
|
||||
# Per MQTT v5 §4.7.1.2: `<prefix>/#` ALSO matches the bare
|
||||
# `<prefix>` itself (it represents "this topic and all sub-topics").
|
||||
("ruview/#", "ruview", True),
|
||||
("ruview/+/raw/#", "ruview/aabb/raw/pose/extra", True),
|
||||
("exact/topic", "exact/topic", True),
|
||||
("exact/topic", "exact/topic/extra", False),
|
||||
("a/b/c", "a/b", False),
|
||||
])
|
||||
def test_topic_matches(pattern: str, topic: str, expected: bool) -> None:
|
||||
assert _topic_matches(pattern, topic) is expected
|
||||
|
||||
|
||||
# ─── RuViewMqttClient construction ──────────────────────────────────
|
||||
|
||||
|
||||
def test_client_constructs_with_defaults() -> None:
|
||||
c = RuViewMqttClient()
|
||||
assert c.broker_host == "localhost"
|
||||
assert c.broker_port == 1883
|
||||
assert c.connected is False
|
||||
assert c.client_id.startswith("wifi-densepose-client-")
|
||||
|
||||
|
||||
def test_client_unique_client_id_per_instance() -> None:
|
||||
"""Per the rumqttc memory lesson — each instance needs a unique
|
||||
client_id so parallel tests don't kick each other off the broker."""
|
||||
c1 = RuViewMqttClient()
|
||||
c2 = RuViewMqttClient()
|
||||
assert c1.client_id != c2.client_id
|
||||
|
||||
|
||||
def test_client_accepts_explicit_client_id() -> None:
|
||||
c = RuViewMqttClient(client_id="explicit-id")
|
||||
assert c.client_id == "explicit-id"
|
||||
|
||||
|
||||
# ─── Handler registration ────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_handler_registration_stores_callback() -> None:
|
||||
c = RuViewMqttClient()
|
||||
seen: list[Any] = []
|
||||
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: seen.append((t, p)))
|
||||
# Internal state — we're allowed to inspect since the handler
|
||||
# path needs to be unit-testable without a broker.
|
||||
assert "ruview/+/raw/edge_vitals" in c._handlers
|
||||
|
||||
|
||||
def test_handler_unregister_drops_callback() -> None:
|
||||
c = RuViewMqttClient()
|
||||
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None)
|
||||
c.unsubscribe_handler("ruview/+/raw/edge_vitals")
|
||||
assert "ruview/+/raw/edge_vitals" not in c._handlers
|
||||
|
||||
|
||||
# ─── Callback dispatch (synthesized) ─────────────────────────────────
|
||||
|
||||
|
||||
def _fake_message(topic: str, body: Any) -> Any:
|
||||
"""Synthesize a paho-mqtt MQTTMessage-ish object."""
|
||||
if isinstance(body, (dict, list)):
|
||||
payload_bytes = json.dumps(body).encode("utf-8")
|
||||
elif isinstance(body, bytes):
|
||||
payload_bytes = body
|
||||
else:
|
||||
payload_bytes = str(body).encode("utf-8")
|
||||
return SimpleNamespace(topic=topic, payload=payload_bytes)
|
||||
|
||||
|
||||
def test_message_dispatch_to_matching_handler() -> None:
|
||||
c = RuViewMqttClient()
|
||||
received: list[tuple[str, Any]] = []
|
||||
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append((t, p)))
|
||||
|
||||
msg = _fake_message(
|
||||
"ruview/aabbccddeeff/raw/edge_vitals",
|
||||
{"breathing_rate_bpm": 14.0, "heartrate_bpm": 72.0, "presence": True},
|
||||
)
|
||||
c._on_message(None, None, msg)
|
||||
|
||||
assert len(received) == 1
|
||||
topic, payload = received[0]
|
||||
assert topic == "ruview/aabbccddeeff/raw/edge_vitals"
|
||||
assert payload["breathing_rate_bpm"] == 14.0
|
||||
|
||||
|
||||
def test_message_dispatch_ignores_non_matching_topic() -> None:
|
||||
c = RuViewMqttClient()
|
||||
received: list[Any] = []
|
||||
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append(p))
|
||||
|
||||
msg = _fake_message("ruview/aabb/raw/pose", {"persons": []})
|
||||
c._on_message(None, None, msg)
|
||||
|
||||
assert received == []
|
||||
|
||||
|
||||
def test_message_dispatch_falls_back_to_bytes_on_non_json() -> None:
|
||||
c = RuViewMqttClient()
|
||||
received: list[Any] = []
|
||||
c.on_message("custom/binary/+", lambda t, p: received.append(p))
|
||||
|
||||
msg = _fake_message("custom/binary/data", b"\x00\x01\x02not-json")
|
||||
c._on_message(None, None, msg)
|
||||
|
||||
assert received == [b"\x00\x01\x02not-json"]
|
||||
|
||||
|
||||
def test_handler_exception_does_not_propagate() -> None:
|
||||
"""A misbehaving user callback must not crash the paho network
|
||||
loop — exceptions are caught and logged."""
|
||||
c = RuViewMqttClient()
|
||||
seen_after_crash: list[Any] = []
|
||||
|
||||
def crashing(_topic: str, _p: Any) -> None:
|
||||
raise RuntimeError("simulated callback crash")
|
||||
|
||||
c.on_message("crashy/topic", crashing)
|
||||
c.on_message("safe/topic", lambda t, p: seen_after_crash.append(p))
|
||||
|
||||
# First, the crashing handler — must NOT raise out of _on_message.
|
||||
c._on_message(None, None, _fake_message("crashy/topic", "anything"))
|
||||
# Then the safe handler — must still fire on a subsequent message.
|
||||
c._on_message(None, None, _fake_message("safe/topic", {"x": 1}))
|
||||
assert seen_after_crash == [{"x": 1}]
|
||||
|
||||
|
||||
def test_multiple_handlers_for_overlapping_patterns_all_fire() -> None:
|
||||
c = RuViewMqttClient()
|
||||
a_received: list[Any] = []
|
||||
b_received: list[Any] = []
|
||||
c.on_message("ruview/+/raw/+", lambda t, p: a_received.append(p))
|
||||
c.on_message("ruview/aabb/raw/edge_vitals", lambda t, p: b_received.append(p))
|
||||
|
||||
msg = _fake_message("ruview/aabb/raw/edge_vitals", {"presence": True})
|
||||
c._on_message(None, None, msg)
|
||||
|
||||
assert len(a_received) == 1
|
||||
assert len(b_received) == 1
|
||||
|
||||
|
||||
# ─── on_connect path ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_on_connect_sets_event_and_subscribes() -> None:
|
||||
c = RuViewMqttClient()
|
||||
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None)
|
||||
|
||||
# Stub the paho client so we can capture subscribe() calls.
|
||||
subscribed: list[str] = []
|
||||
stub = SimpleNamespace(subscribe=lambda pattern: subscribed.append(pattern))
|
||||
|
||||
c._on_connect(stub, None, None, 0)
|
||||
assert c.connected is True
|
||||
assert subscribed == ["ruview/+/raw/edge_vitals"]
|
||||
|
||||
|
||||
def test_on_connect_with_nonzero_rc_does_not_set_connected() -> None:
|
||||
c = RuViewMqttClient()
|
||||
stub = SimpleNamespace(subscribe=lambda pattern: None)
|
||||
c._on_connect(stub, None, None, 5) # CONNACK fail
|
||||
assert c.connected is False
|
||||
@@ -0,0 +1,180 @@
|
||||
"""ADR-117 P4 — Tests for the HA-MIND semantic primitive listener.
|
||||
|
||||
Pure routing tests — no MQTT broker needed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from wifi_densepose.client import (
|
||||
SemanticPrimitive,
|
||||
SemanticPrimitiveEvent,
|
||||
SemanticPrimitiveListener,
|
||||
)
|
||||
|
||||
|
||||
# ─── SemanticPrimitive enum ──────────────────────────────────────────
|
||||
|
||||
|
||||
def test_enum_covers_all_10_v1_primitives() -> None:
|
||||
expected = {
|
||||
"someone_sleeping",
|
||||
"possible_distress",
|
||||
"room_active",
|
||||
"elderly_inactivity",
|
||||
"meeting_in_progress",
|
||||
"bathroom_occupied",
|
||||
"fall_risk_elevated",
|
||||
"bed_exit",
|
||||
"no_movement_safety",
|
||||
"multi_room_transition",
|
||||
}
|
||||
actual = {p.value for p in SemanticPrimitive}
|
||||
assert actual == expected
|
||||
|
||||
|
||||
def test_enum_from_object_id_round_trips() -> None:
|
||||
for p in SemanticPrimitive:
|
||||
assert SemanticPrimitive.from_object_id(p.value) is p
|
||||
|
||||
|
||||
def test_enum_from_object_id_returns_none_for_unknown() -> None:
|
||||
assert SemanticPrimitive.from_object_id("garbage") is None
|
||||
|
||||
|
||||
# ─── Listener routing ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_listener_dispatches_to_specific_handler() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
received: list[SemanticPrimitiveEvent] = []
|
||||
listener.on(SemanticPrimitive.SomeoneSleeping, received.append)
|
||||
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state",
|
||||
json.dumps({"state": "ON", "confidence": 0.92, "explanation": ["motion<5%"]}),
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.kind is SemanticPrimitive.SomeoneSleeping
|
||||
assert evt.node_id == "aabb"
|
||||
assert evt.state == "ON"
|
||||
assert evt.confidence == 0.92
|
||||
assert evt.explanation == ("motion<5%",)
|
||||
assert len(received) == 1
|
||||
assert received[0] is evt
|
||||
|
||||
|
||||
def test_listener_on_any_fires_for_every_primitive() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
seen: list[SemanticPrimitiveEvent] = []
|
||||
listener.on_any(seen.append)
|
||||
|
||||
listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
|
||||
json.dumps({"state": "ON"}),
|
||||
)
|
||||
listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/bathroom_occupied/state",
|
||||
json.dumps({"state": "OFF"}),
|
||||
)
|
||||
assert len(seen) == 2
|
||||
assert seen[0].kind is SemanticPrimitive.RoomActive
|
||||
assert seen[1].kind is SemanticPrimitive.BathroomOccupied
|
||||
|
||||
|
||||
def test_listener_specific_handler_does_not_fire_for_other_primitives() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
received: list[SemanticPrimitiveEvent] = []
|
||||
listener.on(SemanticPrimitive.PossibleDistress, received.append)
|
||||
|
||||
listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state",
|
||||
json.dumps({"state": "ON"}),
|
||||
)
|
||||
assert received == []
|
||||
|
||||
|
||||
def test_listener_decodes_plain_state_string() -> None:
|
||||
"""HA convention: binary_sensors that don't carry attributes emit
|
||||
plain strings ('ON' / 'OFF'). We must accept that too."""
|
||||
listener = SemanticPrimitiveListener()
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
|
||||
"ON",
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.state == "ON"
|
||||
assert evt.confidence == 0.0 # not provided in plain string
|
||||
assert evt.explanation == ()
|
||||
|
||||
|
||||
def test_listener_decodes_numeric_sensor_state() -> None:
|
||||
"""fall_risk_elevated is a 0–100 sensor — verify numeric string."""
|
||||
listener = SemanticPrimitiveListener()
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/sensor/wifi_densepose_aabb/fall_risk_elevated/state",
|
||||
"73",
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.kind is SemanticPrimitive.FallRiskElevated
|
||||
assert evt.state == "73"
|
||||
|
||||
|
||||
def test_listener_decodes_bytes_payload() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
|
||||
b"ON",
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.state == "ON"
|
||||
|
||||
|
||||
def test_listener_ignores_non_state_topics() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
assert listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/config",
|
||||
json.dumps({"name": "Room Active"}),
|
||||
) is None
|
||||
|
||||
|
||||
def test_listener_ignores_unknown_slug() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
assert listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/unknown_primitive/state",
|
||||
"ON",
|
||||
) is None
|
||||
|
||||
|
||||
def test_listener_ignores_non_wifi_densepose_node() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
# third segment doesn't start with wifi_densepose_
|
||||
assert listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/aqara_fp2/room_active/state",
|
||||
"ON",
|
||||
) is None
|
||||
|
||||
|
||||
def test_listener_explanation_string_is_normalised_to_tuple() -> None:
|
||||
"""Producers may send `explanation` as a single string by mistake;
|
||||
accept that and wrap in a 1-tuple so downstream code can iterate
|
||||
uniformly."""
|
||||
listener = SemanticPrimitiveListener()
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabb/possible_distress/state",
|
||||
json.dumps({"state": "ON", "explanation": "HR=120 baseline=80"}),
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.explanation == ("HR=120 baseline=80",)
|
||||
|
||||
|
||||
def test_event_is_frozen() -> None:
|
||||
evt = SemanticPrimitiveEvent(
|
||||
kind=SemanticPrimitive.SomeoneSleeping,
|
||||
node_id="aabb",
|
||||
state="ON",
|
||||
)
|
||||
import pytest
|
||||
with pytest.raises((AttributeError, Exception)): # FrozenInstanceError subclass
|
||||
evt.state = "OFF" # type: ignore[misc]
|
||||
@@ -0,0 +1,195 @@
|
||||
"""ADR-117 P4 — End-to-end test for SensingClient against an in-process
|
||||
WS server.
|
||||
|
||||
We spin up a real `websockets.serve()` server in the same event loop,
|
||||
send the four message types defined in ADR-115 §1, and assert the
|
||||
client decodes them into the right dataclasses. No mocks — the only
|
||||
moving part this test does NOT exercise is the actual sensing-server
|
||||
binary, but the wire protocol is the contract under test here.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
import websockets
|
||||
|
||||
from wifi_densepose.client import (
|
||||
ConnectionEstablishedMessage,
|
||||
EdgeVitalsMessage,
|
||||
PoseDataMessage,
|
||||
SensingClient,
|
||||
SensingMessage,
|
||||
)
|
||||
|
||||
|
||||
# ─── In-process WS server fixture ────────────────────────────────────
|
||||
|
||||
|
||||
_FIXTURE_MESSAGES = [
|
||||
{
|
||||
"type": "connection_established",
|
||||
"node_id": "test-node-001",
|
||||
"version": "0.7.4",
|
||||
"capabilities": ["edge_vitals", "pose_data"],
|
||||
},
|
||||
{
|
||||
"type": "edge_vitals",
|
||||
"node_id": "test-node-001",
|
||||
"presence": True,
|
||||
"fall_detected": False,
|
||||
"motion": 0.21,
|
||||
"breathing_rate_bpm": 14.5,
|
||||
"heartrate_bpm": 72.3,
|
||||
"n_persons": 1,
|
||||
"motion_energy": 0.034,
|
||||
"presence_score": 0.91,
|
||||
"rssi": -42.0,
|
||||
},
|
||||
{
|
||||
"type": "pose_data",
|
||||
"node_id": "test-node-001",
|
||||
"timestamp": 1700000000.5,
|
||||
"persons": [{"id": 1, "keypoints": []}],
|
||||
"confidence": 0.88,
|
||||
},
|
||||
# Unknown type — should NOT crash the stream; should yield a plain
|
||||
# SensingMessage.
|
||||
{
|
||||
"type": "future_message_type_not_yet_modelled",
|
||||
"extra": "data",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
async def _handler(websocket: Any) -> None:
|
||||
for msg in _FIXTURE_MESSAGES:
|
||||
await websocket.send(json.dumps(msg))
|
||||
# Send one malformed frame to assert the client logs+drops it
|
||||
# rather than crashing the stream.
|
||||
await websocket.send("{not valid json")
|
||||
# And one final "real" message so the test can confirm the stream
|
||||
# survived the malformed one.
|
||||
await websocket.send(json.dumps({"type": "edge_vitals", "node_id": "post-bad-frame"}))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def ws_server() -> Any:
|
||||
"""Start a websocket server on a random port; yield the bound URL."""
|
||||
server = await websockets.serve(_handler, "127.0.0.1", 0)
|
||||
# Get the bound port (host="127.0.0.1" returns one socket).
|
||||
port = server.sockets[0].getsockname()[1] # type: ignore[union-attr]
|
||||
try:
|
||||
yield f"ws://127.0.0.1:{port}/ws/sensing"
|
||||
finally:
|
||||
server.close()
|
||||
await server.wait_closed()
|
||||
|
||||
|
||||
# ─── End-to-end stream test ──────────────────────────────────────────
|
||||
|
||||
|
||||
async def test_sensing_client_decodes_all_message_types(ws_server: str) -> None:
|
||||
received: list[SensingMessage] = []
|
||||
async with SensingClient(ws_server) as client:
|
||||
async for msg in client.stream():
|
||||
received.append(msg)
|
||||
if len(received) >= len(_FIXTURE_MESSAGES) + 1: # +1 for post-bad-frame
|
||||
break
|
||||
|
||||
# connection_established → typed
|
||||
assert isinstance(received[0], ConnectionEstablishedMessage)
|
||||
assert received[0].node_id == "test-node-001"
|
||||
assert received[0].version == "0.7.4"
|
||||
assert "edge_vitals" in received[0].capabilities
|
||||
|
||||
# edge_vitals → typed with full fields
|
||||
assert isinstance(received[1], EdgeVitalsMessage)
|
||||
assert received[1].presence is True
|
||||
assert received[1].fall_detected is False
|
||||
assert received[1].breathing_rate_bpm == 14.5
|
||||
assert received[1].heartrate_bpm == 72.3
|
||||
assert received[1].n_persons == 1
|
||||
assert received[1].rssi == -42.0
|
||||
|
||||
# pose_data → typed
|
||||
assert isinstance(received[2], PoseDataMessage)
|
||||
assert received[2].timestamp == 1700000000.5
|
||||
assert len(received[2].persons) == 1
|
||||
assert received[2].confidence == 0.88
|
||||
|
||||
# Unknown type → plain SensingMessage (forward-compat)
|
||||
assert type(received[3]) is SensingMessage # exact base class
|
||||
assert received[3].type == "future_message_type_not_yet_modelled"
|
||||
assert received[3].raw["extra"] == "data"
|
||||
|
||||
# After the malformed frame: the stream should have survived and
|
||||
# yielded the post-bad-frame message.
|
||||
assert isinstance(received[4], EdgeVitalsMessage)
|
||||
assert received[4].node_id == "post-bad-frame"
|
||||
|
||||
|
||||
async def test_sensing_client_recv_one(ws_server: str) -> None:
|
||||
async with SensingClient(ws_server) as client:
|
||||
msg = await client.recv_one(timeout=2.0)
|
||||
assert isinstance(msg, ConnectionEstablishedMessage)
|
||||
|
||||
|
||||
async def test_sensing_client_raises_when_used_without_context() -> None:
|
||||
client = SensingClient("ws://127.0.0.1:1/") # never connects
|
||||
with pytest.raises(RuntimeError, match="not connected"):
|
||||
await client.recv_one(timeout=0.1)
|
||||
with pytest.raises(RuntimeError, match="not connected"):
|
||||
async for _ in client.stream():
|
||||
pass
|
||||
|
||||
|
||||
async def test_sensing_client_close_is_idempotent(ws_server: str) -> None:
|
||||
client = SensingClient(ws_server)
|
||||
await client.__aenter__()
|
||||
await client.close()
|
||||
await client.close() # second close is a no-op
|
||||
|
||||
|
||||
def test_sensing_client_decoder_directly() -> None:
|
||||
"""The decoder is pure — exercise it without bringing up a WS
|
||||
server, so we have a fast unit test for the type mapping."""
|
||||
from wifi_densepose.client.ws import _decode
|
||||
|
||||
msg = _decode(json.dumps({
|
||||
"type": "edge_vitals",
|
||||
"node_id": "x",
|
||||
"presence": True,
|
||||
"fall_detected": False,
|
||||
"motion": 1.5,
|
||||
}))
|
||||
assert isinstance(msg, EdgeVitalsMessage)
|
||||
assert msg.presence is True
|
||||
assert msg.motion == 1.5
|
||||
assert msg.breathing_rate_bpm is None # not present → None, not 0.0
|
||||
assert msg.heartrate_bpm is None
|
||||
assert msg.rssi is None
|
||||
|
||||
|
||||
def test_sensing_client_decoder_handles_None_subfields() -> None:
|
||||
"""When the sensing-server explicitly emits null for HR/BR (no
|
||||
measurement yet), the client should propagate None, not crash."""
|
||||
from wifi_densepose.client.ws import _decode
|
||||
|
||||
msg = _decode(json.dumps({
|
||||
"type": "edge_vitals",
|
||||
"node_id": "x",
|
||||
"presence": False,
|
||||
"fall_detected": False,
|
||||
"motion": 0.0,
|
||||
"breathing_rate_bpm": None,
|
||||
"heartrate_bpm": None,
|
||||
"rssi": None,
|
||||
}))
|
||||
assert isinstance(msg, EdgeVitalsMessage)
|
||||
assert msg.breathing_rate_bpm is None
|
||||
assert msg.heartrate_bpm is None
|
||||
assert msg.rssi is None
|
||||
@@ -0,0 +1,200 @@
|
||||
"""ADR-117 P2 tests — Keypoint + KeypointType binding round-trips.
|
||||
|
||||
Run with: cd python && .venv/Scripts/python -m pytest tests/test_keypoint.py -v
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from wifi_densepose import Keypoint, KeypointType
|
||||
|
||||
|
||||
# ─── KeypointType ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_keypoint_type_all_returns_17() -> None:
|
||||
"""COCO standard defines exactly 17 keypoints."""
|
||||
assert len(KeypointType.all()) == 17
|
||||
|
||||
|
||||
def test_keypoint_type_index_matches_coco_ordering() -> None:
|
||||
"""Indexes 0..16 match the COCO canonical ordering."""
|
||||
expected = [
|
||||
(KeypointType.Nose, 0),
|
||||
(KeypointType.LeftEye, 1),
|
||||
(KeypointType.RightEye, 2),
|
||||
(KeypointType.LeftEar, 3),
|
||||
(KeypointType.RightEar, 4),
|
||||
(KeypointType.LeftShoulder, 5),
|
||||
(KeypointType.RightShoulder, 6),
|
||||
(KeypointType.LeftElbow, 7),
|
||||
(KeypointType.RightElbow, 8),
|
||||
(KeypointType.LeftWrist, 9),
|
||||
(KeypointType.RightWrist, 10),
|
||||
(KeypointType.LeftHip, 11),
|
||||
(KeypointType.RightHip, 12),
|
||||
(KeypointType.LeftKnee, 13),
|
||||
(KeypointType.RightKnee, 14),
|
||||
(KeypointType.LeftAnkle, 15),
|
||||
(KeypointType.RightAnkle, 16),
|
||||
]
|
||||
for kp, idx in expected:
|
||||
assert kp.index == idx, f"{kp} expected index {idx} got {kp.index}"
|
||||
|
||||
|
||||
def test_keypoint_type_snake_name() -> None:
|
||||
"""snake_name follows COCO convention."""
|
||||
assert KeypointType.Nose.snake_name == "nose"
|
||||
assert KeypointType.LeftShoulder.snake_name == "left_shoulder"
|
||||
assert KeypointType.RightAnkle.snake_name == "right_ankle"
|
||||
|
||||
|
||||
def test_keypoint_type_is_face() -> None:
|
||||
"""is_face() matches the 5 facial keypoints."""
|
||||
face = {
|
||||
KeypointType.Nose,
|
||||
KeypointType.LeftEye,
|
||||
KeypointType.RightEye,
|
||||
KeypointType.LeftEar,
|
||||
KeypointType.RightEar,
|
||||
}
|
||||
for kp in KeypointType.all():
|
||||
assert kp.is_face() == (kp in face)
|
||||
|
||||
|
||||
def test_keypoint_type_is_upper_body() -> None:
|
||||
"""is_upper_body() catches shoulders, elbows, wrists."""
|
||||
assert KeypointType.LeftShoulder.is_upper_body()
|
||||
assert KeypointType.RightShoulder.is_upper_body()
|
||||
assert KeypointType.LeftElbow.is_upper_body()
|
||||
assert KeypointType.LeftWrist.is_upper_body()
|
||||
assert not KeypointType.LeftHip.is_upper_body()
|
||||
|
||||
|
||||
def test_keypoint_type_eq() -> None:
|
||||
"""Equality + identity work across calls."""
|
||||
assert KeypointType.Nose == KeypointType.Nose
|
||||
assert KeypointType.Nose != KeypointType.LeftEye
|
||||
|
||||
|
||||
def test_keypoint_type_repr() -> None:
|
||||
"""repr is a useful Python expression."""
|
||||
assert repr(KeypointType.Nose) == "KeypointType.Nose"
|
||||
assert repr(KeypointType.LeftWrist) == "KeypointType.LeftWrist"
|
||||
|
||||
|
||||
# ─── Keypoint ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_keypoint_2d_construct() -> None:
|
||||
"""Default 2D keypoint."""
|
||||
kp = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
assert kp.x == pytest.approx(0.5)
|
||||
assert kp.y == pytest.approx(0.3)
|
||||
assert kp.z is None
|
||||
assert kp.confidence == pytest.approx(0.95)
|
||||
assert kp.keypoint_type == KeypointType.Nose
|
||||
assert kp.is_visible
|
||||
|
||||
|
||||
def test_keypoint_3d_construct() -> None:
|
||||
"""3D keypoint with kwarg z."""
|
||||
kp = Keypoint(KeypointType.LeftWrist, 0.2, 0.4, 0.8, z=0.1)
|
||||
assert kp.position_3d == pytest.approx((0.2, 0.4, 0.1))
|
||||
assert kp.z == pytest.approx(0.1)
|
||||
|
||||
|
||||
def test_keypoint_position_2d_tuple() -> None:
|
||||
kp = Keypoint(KeypointType.RightHip, 0.6, 0.7, 0.99)
|
||||
assert kp.position_2d == pytest.approx((0.6, 0.7))
|
||||
|
||||
|
||||
def test_keypoint_position_3d_none_for_2d() -> None:
|
||||
"""2D keypoints return None for position_3d, not a default z."""
|
||||
kp = Keypoint(KeypointType.Nose, 0.5, 0.5, 0.99)
|
||||
assert kp.position_3d is None
|
||||
|
||||
|
||||
def test_keypoint_is_visible_below_threshold() -> None:
|
||||
"""Confidence under 0.5 is NOT visible (default threshold)."""
|
||||
kp_low = Keypoint(KeypointType.Nose, 0.0, 0.0, 0.3)
|
||||
kp_high = Keypoint(KeypointType.Nose, 0.0, 0.0, 0.7)
|
||||
assert not kp_low.is_visible
|
||||
assert kp_high.is_visible
|
||||
|
||||
|
||||
def test_keypoint_confidence_validation_too_high() -> None:
|
||||
"""Confidence > 1.0 rejected."""
|
||||
with pytest.raises(ValueError, match="Confidence must be in"):
|
||||
Keypoint(KeypointType.Nose, 0.0, 0.0, 1.5)
|
||||
|
||||
|
||||
def test_keypoint_confidence_validation_negative() -> None:
|
||||
"""Negative confidence rejected."""
|
||||
with pytest.raises(ValueError, match="Confidence must be in"):
|
||||
Keypoint(KeypointType.Nose, 0.0, 0.0, -0.1)
|
||||
|
||||
|
||||
def test_keypoint_distance_2d() -> None:
|
||||
"""Euclidean distance in 2D."""
|
||||
a = Keypoint(KeypointType.Nose, 0.0, 0.0, 1.0)
|
||||
b = Keypoint(KeypointType.LeftEye, 3.0, 4.0, 1.0)
|
||||
assert a.distance_to(b) == pytest.approx(5.0)
|
||||
|
||||
|
||||
def test_keypoint_distance_3d() -> None:
|
||||
"""Euclidean distance in 3D when both have z."""
|
||||
a = Keypoint(KeypointType.Nose, 0.0, 0.0, 1.0, z=0.0)
|
||||
b = Keypoint(KeypointType.LeftEye, 1.0, 2.0, 1.0, z=2.0)
|
||||
# sqrt(1 + 4 + 4) = 3.0
|
||||
assert a.distance_to(b) == pytest.approx(3.0)
|
||||
|
||||
|
||||
def test_keypoint_distance_falls_back_to_2d_if_mixed() -> None:
|
||||
"""Mixing 2D and 3D keypoints uses 2D distance only."""
|
||||
a = Keypoint(KeypointType.Nose, 0.0, 0.0, 1.0) # 2D
|
||||
b = Keypoint(KeypointType.LeftEye, 3.0, 4.0, 1.0, z=99.0) # 3D
|
||||
# Should be 5.0 (2D distance), not include the z=99 term
|
||||
assert a.distance_to(b) == pytest.approx(5.0)
|
||||
|
||||
|
||||
def test_keypoint_repr_2d() -> None:
|
||||
kp = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
r = repr(kp)
|
||||
assert "KeypointType.Nose" in r
|
||||
assert "x=0.5" in r
|
||||
assert "y=0.3" in r
|
||||
assert "z" not in r # no z field for 2D
|
||||
|
||||
|
||||
def test_keypoint_repr_3d() -> None:
|
||||
kp = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95, z=0.1)
|
||||
r = repr(kp)
|
||||
assert "z=0.1" in r
|
||||
|
||||
|
||||
def test_keypoint_eq() -> None:
|
||||
"""Two keypoints with same fields compare equal."""
|
||||
a = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
b = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
assert a == b
|
||||
|
||||
|
||||
def test_keypoint_neq_different_type() -> None:
|
||||
a = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
b = Keypoint(KeypointType.LeftEye, 0.5, 0.3, 0.95)
|
||||
assert a != b
|
||||
|
||||
|
||||
def test_keypoint_neq_different_position() -> None:
|
||||
a = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
b = Keypoint(KeypointType.Nose, 0.6, 0.3, 0.95)
|
||||
assert a != b
|
||||
|
||||
|
||||
def test_build_features_marks_p2() -> None:
|
||||
"""The P2 marker is now in the wheel's feature list."""
|
||||
import wifi_densepose
|
||||
|
||||
assert "p2-keypoint-bindings" in wifi_densepose.__build_features__
|
||||
@@ -0,0 +1,248 @@
|
||||
"""ADR-117 P2 tests — BoundingBox + PersonPose + PoseEstimate bindings.
|
||||
|
||||
Run with: cd python && .venv/Scripts/python -m pytest tests/test_pose.py -v
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from wifi_densepose import (
|
||||
BoundingBox,
|
||||
Keypoint,
|
||||
KeypointType,
|
||||
PersonPose,
|
||||
PoseEstimate,
|
||||
)
|
||||
|
||||
|
||||
# ─── BoundingBox ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_bounding_box_construct() -> None:
|
||||
bb = BoundingBox(0.1, 0.2, 0.5, 0.7)
|
||||
assert bb.x_min == pytest.approx(0.1)
|
||||
assert bb.y_min == pytest.approx(0.2)
|
||||
assert bb.x_max == pytest.approx(0.5)
|
||||
assert bb.y_max == pytest.approx(0.7)
|
||||
|
||||
|
||||
def test_bounding_box_dimensions() -> None:
|
||||
bb = BoundingBox(0.0, 0.0, 4.0, 3.0)
|
||||
assert bb.width == pytest.approx(4.0)
|
||||
assert bb.height == pytest.approx(3.0)
|
||||
assert bb.area == pytest.approx(12.0)
|
||||
assert bb.center == pytest.approx((2.0, 1.5))
|
||||
|
||||
|
||||
def test_bounding_box_from_center() -> None:
|
||||
bb = BoundingBox.from_center(2.0, 3.0, 4.0, 6.0)
|
||||
assert bb.x_min == pytest.approx(0.0)
|
||||
assert bb.y_min == pytest.approx(0.0)
|
||||
assert bb.x_max == pytest.approx(4.0)
|
||||
assert bb.y_max == pytest.approx(6.0)
|
||||
|
||||
|
||||
def test_bounding_box_iou_no_overlap() -> None:
|
||||
a = BoundingBox(0.0, 0.0, 1.0, 1.0)
|
||||
b = BoundingBox(2.0, 2.0, 3.0, 3.0)
|
||||
assert a.iou(b) == pytest.approx(0.0)
|
||||
|
||||
|
||||
def test_bounding_box_iou_full_overlap() -> None:
|
||||
a = BoundingBox(0.0, 0.0, 1.0, 1.0)
|
||||
b = BoundingBox(0.0, 0.0, 1.0, 1.0)
|
||||
assert a.iou(b) == pytest.approx(1.0)
|
||||
|
||||
|
||||
def test_bounding_box_iou_partial() -> None:
|
||||
a = BoundingBox(0.0, 0.0, 10.0, 10.0)
|
||||
b = BoundingBox(5.0, 5.0, 15.0, 15.0)
|
||||
# intersection 25, union 175 → 1/7
|
||||
assert a.iou(b) == pytest.approx(25.0 / 175.0)
|
||||
|
||||
|
||||
def test_bounding_box_eq() -> None:
|
||||
assert BoundingBox(1, 2, 3, 4) == BoundingBox(1, 2, 3, 4)
|
||||
assert BoundingBox(1, 2, 3, 4) != BoundingBox(1, 2, 3, 5)
|
||||
|
||||
|
||||
def test_bounding_box_repr() -> None:
|
||||
bb = BoundingBox(0.1, 0.2, 0.5, 0.7)
|
||||
assert "BoundingBox" in repr(bb)
|
||||
assert "x_min=0.1" in repr(bb)
|
||||
|
||||
|
||||
# ─── PersonPose ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_person_pose_empty() -> None:
|
||||
p = PersonPose()
|
||||
assert p.id is None
|
||||
assert p.visible_keypoint_count == 0
|
||||
assert p.bounding_box is None
|
||||
assert p.confidence == 0.0
|
||||
|
||||
|
||||
def test_person_pose_set_get_keypoint() -> None:
|
||||
p = PersonPose()
|
||||
kp = Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95)
|
||||
p.set_keypoint(kp)
|
||||
got = p.get_keypoint(KeypointType.Nose)
|
||||
assert got is not None
|
||||
assert got.x == pytest.approx(0.5)
|
||||
assert got.confidence == pytest.approx(0.95)
|
||||
|
||||
|
||||
def test_person_pose_get_missing_returns_none() -> None:
|
||||
p = PersonPose()
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.5, 0.3, 0.95))
|
||||
assert p.get_keypoint(KeypointType.LeftWrist) is None
|
||||
|
||||
|
||||
def test_person_pose_visible_count() -> None:
|
||||
p = PersonPose()
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.0, 0.0, 0.9)) # visible
|
||||
p.set_keypoint(Keypoint(KeypointType.LeftEar, 0.0, 0.0, 0.2)) # invisible
|
||||
p.set_keypoint(Keypoint(KeypointType.RightEar, 0.0, 0.0, 0.8)) # visible
|
||||
assert p.visible_keypoint_count == 2
|
||||
|
||||
|
||||
def test_person_pose_visible_keypoints_list() -> None:
|
||||
p = PersonPose()
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.0, 0.0, 0.9))
|
||||
p.set_keypoint(Keypoint(KeypointType.LeftEar, 0.0, 0.0, 0.2))
|
||||
vis = p.visible_keypoints()
|
||||
assert len(vis) == 1
|
||||
assert vis[0].keypoint_type == KeypointType.Nose
|
||||
|
||||
|
||||
def test_person_pose_keypoints_dict_excludes_missing() -> None:
|
||||
p = PersonPose()
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.0, 0.0, 0.9))
|
||||
p.set_keypoint(Keypoint(KeypointType.LeftWrist, 0.5, 0.5, 0.6))
|
||||
d = p.keypoints()
|
||||
assert KeypointType.Nose in d
|
||||
assert KeypointType.LeftWrist in d
|
||||
assert KeypointType.RightAnkle not in d
|
||||
assert len(d) == 2
|
||||
|
||||
|
||||
def test_person_pose_set_id() -> None:
|
||||
p = PersonPose()
|
||||
p.set_id(7)
|
||||
assert p.id == 7
|
||||
|
||||
|
||||
def test_person_pose_set_bounding_box() -> None:
|
||||
p = PersonPose()
|
||||
bb = BoundingBox(0.1, 0.1, 0.5, 0.9)
|
||||
p.set_bounding_box(bb)
|
||||
assert p.bounding_box == bb
|
||||
|
||||
|
||||
def test_person_pose_compute_bbox_returns_none_when_empty() -> None:
|
||||
p = PersonPose()
|
||||
assert p.compute_bounding_box() is None
|
||||
|
||||
|
||||
def test_person_pose_compute_bbox_from_keypoints() -> None:
|
||||
p = PersonPose()
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.0, 0.0, 0.95))
|
||||
p.set_keypoint(Keypoint(KeypointType.RightAnkle, 1.0, 2.0, 0.95))
|
||||
bb = p.compute_bounding_box()
|
||||
assert bb is not None
|
||||
# bbox should span both keypoints
|
||||
assert bb.x_min <= 0.0
|
||||
assert bb.y_min <= 0.0
|
||||
assert bb.x_max >= 1.0
|
||||
assert bb.y_max >= 2.0
|
||||
# also stored
|
||||
assert p.bounding_box is not None
|
||||
|
||||
|
||||
def test_person_pose_set_confidence_validation() -> None:
|
||||
p = PersonPose()
|
||||
p.set_confidence(0.85)
|
||||
assert p.confidence == pytest.approx(0.85)
|
||||
with pytest.raises(ValueError):
|
||||
p.set_confidence(1.5)
|
||||
|
||||
|
||||
def test_person_pose_repr() -> None:
|
||||
p = PersonPose()
|
||||
p.set_id(3)
|
||||
p.set_keypoint(Keypoint(KeypointType.Nose, 0.0, 0.0, 0.9))
|
||||
r = repr(p)
|
||||
assert "PersonPose" in r
|
||||
assert "id=Some(3)" in r or "id=3" in r
|
||||
|
||||
|
||||
# ─── PoseEstimate ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_pose_estimate_construct_empty() -> None:
|
||||
e = PoseEstimate([], 0.5, 1.0, "test-v0")
|
||||
assert e.person_count == 0
|
||||
assert not e.has_detections
|
||||
assert e.confidence == pytest.approx(0.5)
|
||||
assert e.latency_ms == pytest.approx(1.0)
|
||||
assert e.model_version == "test-v0"
|
||||
|
||||
|
||||
def test_pose_estimate_construct_with_persons() -> None:
|
||||
p1 = PersonPose()
|
||||
p1.set_id(1)
|
||||
p1.set_confidence(0.8)
|
||||
p2 = PersonPose()
|
||||
p2.set_id(2)
|
||||
p2.set_confidence(0.9)
|
||||
e = PoseEstimate([p1, p2], 0.85, 5.2, "v0.7.0")
|
||||
assert e.person_count == 2
|
||||
assert e.has_detections
|
||||
assert e.confidence == pytest.approx(0.85)
|
||||
|
||||
|
||||
def test_pose_estimate_highest_confidence_person() -> None:
|
||||
p1 = PersonPose()
|
||||
p1.set_confidence(0.5)
|
||||
p2 = PersonPose()
|
||||
p2.set_confidence(0.95)
|
||||
p3 = PersonPose()
|
||||
p3.set_confidence(0.7)
|
||||
e = PoseEstimate([p1, p2, p3], 0.85, 5.2, "v0.7.0")
|
||||
best = e.highest_confidence_person()
|
||||
assert best is not None
|
||||
assert best.confidence == pytest.approx(0.95)
|
||||
|
||||
|
||||
def test_pose_estimate_highest_confidence_returns_none_when_empty() -> None:
|
||||
e = PoseEstimate([], 0.5, 1.0, "test")
|
||||
assert e.highest_confidence_person() is None
|
||||
|
||||
|
||||
def test_pose_estimate_metadata_strings_nonempty() -> None:
|
||||
e = PoseEstimate([], 0.5, 1.0, "test")
|
||||
assert isinstance(e.id, str)
|
||||
assert isinstance(e.timestamp, str)
|
||||
assert e.id # non-empty
|
||||
assert e.timestamp # non-empty
|
||||
|
||||
|
||||
def test_pose_estimate_confidence_validation() -> None:
|
||||
with pytest.raises(ValueError):
|
||||
PoseEstimate([], 1.5, 0.0, "test")
|
||||
|
||||
|
||||
def test_pose_estimate_repr_contains_counts() -> None:
|
||||
e = PoseEstimate([], 0.5, 2.3, "v0.7.0")
|
||||
r = repr(e)
|
||||
assert "PoseEstimate" in r
|
||||
assert "v0.7.0" in r
|
||||
|
||||
|
||||
def test_build_features_marks_p2_complete() -> None:
|
||||
import wifi_densepose
|
||||
|
||||
assert "p2-keypoint-bindings" in wifi_densepose.__build_features__
|
||||
assert "p2-pose-bindings" in wifi_densepose.__build_features__
|
||||
@@ -0,0 +1,260 @@
|
||||
"""ADR-117 hardening sweep — Security & robustness tests for the
|
||||
client surface.
|
||||
|
||||
Scope: malformed/hostile input handling across the WS decoder, MQTT
|
||||
matcher + dispatch, HA discovery parser, and semantic primitive
|
||||
listener. The goal is to ensure that an adversarial broker or
|
||||
sensing-server can't:
|
||||
|
||||
- Crash the client process via malformed JSON, UTF-8, or topic shapes
|
||||
- Bypass topic-wildcard matching to deliver messages to the wrong handler
|
||||
- Leak MQTT credentials through `repr()` or string conversion
|
||||
- Trigger unbounded memory growth via deeply-nested JSON
|
||||
- Get a handler exception to crash the network loop
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from wifi_densepose.client import RuViewMqttClient, SemanticPrimitiveListener
|
||||
from wifi_densepose.client.ha import (
|
||||
HABlueprintHelper,
|
||||
parse_discovery_payload,
|
||||
parse_discovery_topic,
|
||||
)
|
||||
from wifi_densepose.client.mqtt import _topic_matches
|
||||
from wifi_densepose.client.ws import _decode
|
||||
|
||||
|
||||
# ─── WS decoder robustness ──────────────────────────────────────────
|
||||
|
||||
|
||||
def test_ws_decoder_rejects_non_object_root() -> None:
|
||||
"""A JSON array at the root must NOT crash the decoder. Plain
|
||||
string/array root values are valid JSON but not valid sensing-
|
||||
server messages — the decoder must reject them cleanly."""
|
||||
with pytest.raises(ValueError):
|
||||
_decode("[1, 2, 3]")
|
||||
with pytest.raises(ValueError):
|
||||
_decode('"just a string"')
|
||||
with pytest.raises(ValueError):
|
||||
_decode("42")
|
||||
|
||||
|
||||
def test_ws_decoder_rejects_malformed_json() -> None:
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
_decode("{ broken: json")
|
||||
|
||||
|
||||
def test_ws_decoder_handles_deeply_nested_payload_without_crash() -> None:
|
||||
"""Hostile JSON nested 1000 levels deep must not crash via
|
||||
Python's default recursion limit. Json.loads has a built-in
|
||||
guard; verify we don't accidentally bypass it."""
|
||||
nested = "{" + '"a":{' * 999 + '"x":1' + "}" * 1000
|
||||
# json.loads either succeeds (since 999 < ~1000 limit) or raises
|
||||
# RecursionError; either is acceptable — the key is no segfault
|
||||
# or hang.
|
||||
try:
|
||||
_decode(nested)
|
||||
except (RecursionError, json.JSONDecodeError, ValueError):
|
||||
pass # All acceptable.
|
||||
|
||||
|
||||
def test_ws_decoder_handles_huge_string_values() -> None:
|
||||
"""A 1 MB string in a JSON field must decode without exploding.
|
||||
The websockets `max_size` parameter (default 16 MB) is the actual
|
||||
DoS guard — this just confirms the decoder itself is linear."""
|
||||
huge_payload = json.dumps({
|
||||
"type": "edge_vitals",
|
||||
"node_id": "x" * (1024 * 1024), # 1 MB string
|
||||
"presence": True,
|
||||
"fall_detected": False,
|
||||
"motion": 0.0,
|
||||
})
|
||||
msg = _decode(huge_payload)
|
||||
assert msg.type == "edge_vitals"
|
||||
|
||||
|
||||
def test_ws_decoder_handles_unicode_in_node_id() -> None:
|
||||
"""Non-ASCII node IDs (e.g. accidental terminal escapes) must
|
||||
round-trip cleanly without re-encoding errors."""
|
||||
payload = json.dumps({"type": "edge_vitals", "node_id": "nöde-中", "presence": True, "fall_detected": False, "motion": 0.0})
|
||||
msg = _decode(payload)
|
||||
assert msg.node_id == "nöde-中" # type: ignore[attr-defined]
|
||||
|
||||
|
||||
# ─── MQTT topic matcher — exhaustive edge cases ─────────────────────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("pattern,topic,expected", [
|
||||
# Empty / boundary
|
||||
("", "", True),
|
||||
("a", "", False),
|
||||
("", "a", False),
|
||||
# `+` cannot bypass a literal level boundary
|
||||
("a/+/c", "a/b/c", True),
|
||||
("a/+/c", "a/b/d", False),
|
||||
("a/+/c", "a/b/c/d", False),
|
||||
# `#` is greedy from its position but does not match if it's
|
||||
# mid-pattern (per MQTT spec; our matcher returns False then).
|
||||
("a/#/c", "a/b/c", False), # `#` must be terminal
|
||||
# Topics starting with `$` are legal here — we don't filter them;
|
||||
# matching is purely syntactic. `+` is one-level only, so `$SYS/+`
|
||||
# matches `$SYS/broker` but NOT `$SYS/broker/version`.
|
||||
("$SYS/+", "$SYS/broker", True),
|
||||
("$SYS/+", "$SYS/broker/version", False),
|
||||
("$SYS/#", "$SYS/broker/version", True),
|
||||
# Null byte in topic: still string comparison, but useful to lock
|
||||
# down behaviour.
|
||||
("a/b", "a\x00/b", False),
|
||||
])
|
||||
def test_topic_matcher_edge_cases(pattern: str, topic: str, expected: bool) -> None:
|
||||
assert _topic_matches(pattern, topic) is expected
|
||||
|
||||
|
||||
# ─── MQTT credential confidentiality ────────────────────────────────
|
||||
|
||||
|
||||
def test_mqtt_password_never_in_repr() -> None:
|
||||
"""A user's broker password must NOT leak through __repr__ or
|
||||
__str__. Currently RuViewMqttClient doesn't define repr — that's
|
||||
the safest default (uses object identity). Lock that down so a
|
||||
future "let's add a friendly repr" change doesn't expose creds."""
|
||||
c = RuViewMqttClient(
|
||||
broker_host="broker.example.com",
|
||||
username="alice",
|
||||
password="super-secret-token-do-not-leak",
|
||||
)
|
||||
rep = repr(c)
|
||||
s = str(c)
|
||||
assert "super-secret-token-do-not-leak" not in rep
|
||||
assert "super-secret-token-do-not-leak" not in s
|
||||
|
||||
|
||||
def test_mqtt_password_never_stored_in_plain_attribute() -> None:
|
||||
"""The plaintext password must not be stored on the client
|
||||
instance — paho-mqtt internalises it into `_client._username_pw`
|
||||
which we never expose. Audit by walking the public dict."""
|
||||
c = RuViewMqttClient(password="dont-leak-me")
|
||||
for k, v in vars(c).items():
|
||||
if isinstance(v, str):
|
||||
assert "dont-leak-me" not in v, f"password leaked via attribute {k!r}"
|
||||
|
||||
|
||||
# ─── HA discovery — adversarial topics ──────────────────────────────
|
||||
|
||||
|
||||
def test_ha_discovery_rejects_topic_with_null_byte() -> None:
|
||||
"""Defensive: regex must not match a null-byte-laced topic."""
|
||||
bad = "homeassistant/binary_sensor/wifi_densepose_aa\x00bb/presence/config"
|
||||
assert parse_discovery_topic(bad) is None
|
||||
assert parse_discovery_payload(bad, {"name": "x"}) is None
|
||||
|
||||
|
||||
def test_ha_discovery_rejects_topic_with_slash_in_node_id() -> None:
|
||||
"""A node_id with embedded slashes would break the unique_id
|
||||
contract; reject."""
|
||||
bad = "homeassistant/binary_sensor/wifi_densepose_aa/bb/presence/config"
|
||||
# The regex won't match because there are too many segments.
|
||||
assert parse_discovery_topic(bad) is None
|
||||
|
||||
|
||||
def test_ha_helper_drops_invalid_topic_silently() -> None:
|
||||
"""`add_payload` should return False (not raise) for non-discovery
|
||||
topics so a misconfigured broker doesn't bring down the client."""
|
||||
h = HABlueprintHelper()
|
||||
assert h.add_payload("garbage", {"x": 1}) is False
|
||||
assert h.add_payload("ruview/aa/raw/edge_vitals", {"x": 1}) is False
|
||||
assert len(h) == 0
|
||||
|
||||
|
||||
def test_ha_helper_handles_non_dict_payload() -> None:
|
||||
"""If the HA discovery body is a list or scalar (broken producer),
|
||||
the helper must reject rather than crash on attribute access."""
|
||||
h = HABlueprintHelper()
|
||||
topic = "homeassistant/binary_sensor/wifi_densepose_aabb/presence/config"
|
||||
assert h.add_payload(topic, "[1, 2, 3]") is False
|
||||
assert h.add_payload(topic, "42") is False
|
||||
assert h.add_payload(topic, b"\xff\xfe invalid utf-8") is False
|
||||
|
||||
|
||||
# ─── Semantic primitive listener — adversarial input ────────────────
|
||||
|
||||
|
||||
def test_primitive_listener_ignores_topic_injection_attempts() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
# Extra leading segments
|
||||
assert listener.handle_mqtt_message(
|
||||
"evil/homeassistant/binary_sensor/wifi_densepose_aa/someone_sleeping/state",
|
||||
"ON",
|
||||
) is None
|
||||
# Wrong final segment
|
||||
assert listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aa/someone_sleeping/STATE",
|
||||
"ON",
|
||||
) is None
|
||||
# Empty node_id after the wifi_densepose_ prefix is still routed
|
||||
# (the node_id is "") because we don't enforce a minimum length —
|
||||
# but that's not an injection vector. Confirm behaviour.
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_/someone_sleeping/state",
|
||||
"ON",
|
||||
)
|
||||
assert evt is not None
|
||||
assert evt.node_id == ""
|
||||
|
||||
|
||||
def test_primitive_listener_handles_garbage_payload_without_crash() -> None:
|
||||
listener = SemanticPrimitiveListener()
|
||||
# Bytes that aren't valid UTF-8
|
||||
evt = listener.handle_mqtt_message(
|
||||
"homeassistant/binary_sensor/wifi_densepose_aa/room_active/state",
|
||||
b"\xff\xfe\xfd",
|
||||
)
|
||||
assert evt is not None # we return a sentinel rather than crash
|
||||
# No assertions on state content — undefined for invalid UTF-8;
|
||||
# what matters is no exception escaped.
|
||||
|
||||
|
||||
# ─── Public surface integrity ───────────────────────────────────────
|
||||
|
||||
|
||||
def test_public_surface_is_stable() -> None:
|
||||
"""Every name in `wifi_densepose.__all__` must be resolvable.
|
||||
Catches accidental re-export breakage between phases."""
|
||||
import wifi_densepose
|
||||
for name in wifi_densepose.__all__:
|
||||
assert hasattr(wifi_densepose, name), f"__all__ promises {name!r} but attribute missing"
|
||||
|
||||
|
||||
def test_client_public_surface_is_stable() -> None:
|
||||
import wifi_densepose.client as c
|
||||
for name in c.__all__:
|
||||
# Lazy re-exports for SensingClient + RuViewMqttClient need to
|
||||
# be resolvable too — touch them to exercise __getattr__.
|
||||
_ = getattr(c, name)
|
||||
|
||||
|
||||
# ─── Handler crash isolation (expanded) ─────────────────────────────
|
||||
|
||||
|
||||
def test_mqtt_handler_exception_isolation_with_multiple_handlers() -> None:
|
||||
"""Earlier test covered one crashing handler; this version makes
|
||||
sure a crashing handler in the *middle* of a list of registered
|
||||
handlers doesn't prevent later handlers from firing."""
|
||||
c = RuViewMqttClient()
|
||||
received_before: list[str] = []
|
||||
received_after: list[str] = []
|
||||
c.on_message("a/+", lambda t, p: received_before.append(t))
|
||||
c.on_message("a/b", lambda t, p: (_ for _ in ()).throw(RuntimeError("middle crash")))
|
||||
c.on_message("+/b", lambda t, p: received_after.append(t))
|
||||
|
||||
msg = SimpleNamespace(topic="a/b", payload=b"x")
|
||||
c._on_message(None, None, msg)
|
||||
|
||||
assert received_before == ["a/b"]
|
||||
assert received_after == ["a/b"]
|
||||
@@ -0,0 +1,81 @@
|
||||
"""ADR-117 P1 smoke tests — assert the maturin-built wheel loads and
|
||||
its compiled module is callable.
|
||||
|
||||
These tests are the first acceptance gate of the v2.0 PyPI publish
|
||||
pipeline (ADR-117 §11.1 — ``cargo test`` equivalent at the Python
|
||||
level). They run on every cibuildwheel target in P5's CI matrix.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def test_package_imports() -> None:
|
||||
"""The top-level package must import without error."""
|
||||
import wifi_densepose # noqa: F401
|
||||
|
||||
|
||||
def test_version_string_well_formed() -> None:
|
||||
"""Version string follows PEP 440 + matches pyproject.toml."""
|
||||
import re
|
||||
|
||||
import wifi_densepose
|
||||
|
||||
assert isinstance(wifi_densepose.__version__, str)
|
||||
# Allow pre-release segments (a, b, rc, dev) for non-final wheels.
|
||||
assert re.match(
|
||||
r"^\d+\.\d+\.\d+(a|b|rc|\.dev)?\d*$", wifi_densepose.__version__
|
||||
), f"non-PEP-440 version: {wifi_densepose.__version__}"
|
||||
|
||||
|
||||
def test_rust_version_surfaced() -> None:
|
||||
"""Bound Rust core version must be reachable from Python.
|
||||
|
||||
This is the diagnostic surface ADR-117 §5.2 promised — users in
|
||||
bug reports can paste ``wifi_densepose.__rust_version__`` so we
|
||||
correlate behaviour with the exact ``v2/crates/`` HEAD.
|
||||
"""
|
||||
import wifi_densepose
|
||||
|
||||
assert isinstance(wifi_densepose.__rust_version__, str)
|
||||
assert wifi_densepose.__rust_version__ # non-empty
|
||||
|
||||
|
||||
def test_build_features_listed() -> None:
|
||||
"""The wheel's build-time features must be enumerable.
|
||||
|
||||
P1 ships only the ``p1-scaffold`` feature marker; later phases
|
||||
add more entries. The test asserts the contract that the list
|
||||
exists and contains the P1 marker.
|
||||
"""
|
||||
import wifi_densepose
|
||||
|
||||
feats = wifi_densepose.__build_features__
|
||||
assert isinstance(feats, list)
|
||||
assert all(isinstance(f, str) for f in feats)
|
||||
assert "p1-scaffold" in feats, f"P1 marker missing: {feats}"
|
||||
|
||||
|
||||
def test_hello_returns_ok() -> None:
|
||||
"""The compiled ``hello`` function round-trips through PyO3.
|
||||
|
||||
This is the actual smoke test — proves the FFI works end-to-end.
|
||||
If this passes on every cibuildwheel target, the PyO3 build matrix
|
||||
is healthy.
|
||||
"""
|
||||
import wifi_densepose
|
||||
|
||||
assert wifi_densepose.hello() == "ok"
|
||||
|
||||
|
||||
def test_native_module_private() -> None:
|
||||
"""The compiled module is reachable but marked private.
|
||||
|
||||
Users should ``import wifi_densepose``, not ``import
|
||||
wifi_densepose._native``. The underscore prefix communicates that.
|
||||
"""
|
||||
import wifi_densepose
|
||||
from wifi_densepose import _native
|
||||
|
||||
assert hasattr(_native, "hello"), "compiled module missing hello()"
|
||||
# Both paths must return the same value.
|
||||
assert wifi_densepose.hello() == _native.hello()
|
||||
@@ -0,0 +1,196 @@
|
||||
"""ADR-117 P3 — Tests for vital-sign extraction bindings.
|
||||
|
||||
Covers:
|
||||
|
||||
- VitalStatus enum (eq, eq_int, hash, frozen)
|
||||
- VitalEstimate construction + getters + immutability
|
||||
- VitalReading composite + getters
|
||||
- BreathingExtractor + HeartRateExtractor — esp32_default, explicit
|
||||
ctor, extract() return type, validation behaviour
|
||||
|
||||
The Rust pipeline is unit-tested in `v2/crates/wifi-densepose-vitals/`.
|
||||
These tests are deliberately scoped to the *binding* layer — does the
|
||||
Python surface return the right shapes, raise the right errors, and
|
||||
release the GIL safely.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from random import Random
|
||||
|
||||
import pytest
|
||||
|
||||
import wifi_densepose
|
||||
from wifi_densepose import (
|
||||
BreathingExtractor,
|
||||
HeartRateExtractor,
|
||||
VitalEstimate,
|
||||
VitalReading,
|
||||
VitalStatus,
|
||||
)
|
||||
|
||||
|
||||
# ─── VitalStatus enum ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_vital_status_variants_present() -> None:
|
||||
assert VitalStatus.Valid != VitalStatus.Degraded
|
||||
assert VitalStatus.Unreliable != VitalStatus.Unavailable
|
||||
|
||||
|
||||
def test_vital_status_equality_against_int() -> None:
|
||||
# eq_int → enum can be compared to int (PyO3 0.22 surface)
|
||||
assert VitalStatus.Valid == 0
|
||||
assert VitalStatus.Unavailable == 3
|
||||
|
||||
|
||||
def test_vital_status_is_hashable() -> None:
|
||||
# frozen + hash → can be used as dict key / set member
|
||||
s = {VitalStatus.Valid, VitalStatus.Valid, VitalStatus.Degraded}
|
||||
assert len(s) == 2
|
||||
|
||||
|
||||
def test_vital_status_repr_contains_variant_name() -> None:
|
||||
r = repr(VitalStatus.Valid)
|
||||
assert "VitalStatus" in r and "Valid" in r
|
||||
|
||||
|
||||
# ─── VitalEstimate ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_vital_estimate_construction_and_getters() -> None:
|
||||
est = VitalEstimate(value_bpm=72.4, confidence=0.85, status=VitalStatus.Valid)
|
||||
assert math.isclose(est.value_bpm, 72.4)
|
||||
assert math.isclose(est.confidence, 0.85)
|
||||
assert est.status == VitalStatus.Valid
|
||||
|
||||
|
||||
def test_vital_estimate_is_frozen() -> None:
|
||||
est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid)
|
||||
with pytest.raises(AttributeError):
|
||||
est.value_bpm = 100.0 # type: ignore[misc]
|
||||
|
||||
|
||||
def test_vital_estimate_repr_is_readable() -> None:
|
||||
est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid)
|
||||
r = repr(est)
|
||||
assert "VitalEstimate" in r
|
||||
assert "72" in r
|
||||
|
||||
|
||||
# ─── VitalReading ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_vital_reading_construction_and_getters() -> None:
|
||||
br = VitalEstimate(value_bpm=14.0, confidence=0.9, status=VitalStatus.Valid)
|
||||
hr = VitalEstimate(value_bpm=72.0, confidence=0.8, status=VitalStatus.Degraded)
|
||||
reading = VitalReading(
|
||||
respiratory_rate=br,
|
||||
heart_rate=hr,
|
||||
subcarrier_count=56,
|
||||
signal_quality=0.77,
|
||||
timestamp_secs=1700000000.5,
|
||||
)
|
||||
assert reading.respiratory_rate.value_bpm == 14.0
|
||||
assert reading.heart_rate.status == VitalStatus.Degraded
|
||||
assert reading.subcarrier_count == 56
|
||||
assert math.isclose(reading.signal_quality, 0.77)
|
||||
assert math.isclose(reading.timestamp_secs, 1700000000.5)
|
||||
|
||||
|
||||
# ─── BreathingExtractor ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_breathing_esp32_default_constructs() -> None:
|
||||
br = BreathingExtractor.esp32_default()
|
||||
assert br is not None
|
||||
assert "BreathingExtractor" in repr(br)
|
||||
|
||||
|
||||
def test_breathing_explicit_ctor() -> None:
|
||||
br = BreathingExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=20.0)
|
||||
assert br is not None
|
||||
|
||||
|
||||
def test_breathing_extract_returns_none_with_too_few_samples() -> None:
|
||||
"""One frame can't produce a 30-second window — must return None.
|
||||
|
||||
Verifies the binding propagates Rust's `Option<VitalEstimate>` →
|
||||
Python None correctly (vs raising or returning a default).
|
||||
"""
|
||||
br = BreathingExtractor.esp32_default()
|
||||
out = br.extract(residuals=[0.0] * 56, weights=[])
|
||||
assert out is None
|
||||
|
||||
|
||||
def test_breathing_extract_accepts_empty_weights() -> None:
|
||||
"""Empty weights vector means "equal weight per subcarrier" by
|
||||
convention (per breathing.rs)."""
|
||||
br = BreathingExtractor.esp32_default()
|
||||
out = br.extract(residuals=[0.01] * 56, weights=[])
|
||||
# Even with synthetic input it may return None until enough history
|
||||
# accumulates — what matters is that the call doesn't panic.
|
||||
assert out is None or isinstance(out, VitalEstimate)
|
||||
|
||||
|
||||
def test_breathing_extract_with_synthetic_signal() -> None:
|
||||
"""Drive the extractor with a synthetic 0.25 Hz sine (15 BPM) for
|
||||
enough samples to fill the 30-second window. Don't assert the exact
|
||||
BPM — just that the extractor *eventually* produces a result (rather
|
||||
than returning None forever)."""
|
||||
br = BreathingExtractor.esp32_default()
|
||||
sample_rate = 100.0
|
||||
target_freq = 0.25 # 15 BPM
|
||||
# Run 40 seconds of synthetic data — comfortably past the 30s window.
|
||||
n_samples = int(40 * sample_rate)
|
||||
weights = [1.0] * 56
|
||||
|
||||
produced_estimate = False
|
||||
rng = Random(42)
|
||||
for i in range(n_samples):
|
||||
t = i / sample_rate
|
||||
base = math.sin(2.0 * math.pi * target_freq * t)
|
||||
# Per-subcarrier residual: same signal + small per-carrier noise
|
||||
residuals = [base + rng.gauss(0.0, 0.01) for _ in range(56)]
|
||||
est = br.extract(residuals=residuals, weights=weights)
|
||||
if est is not None:
|
||||
produced_estimate = True
|
||||
assert isinstance(est.value_bpm, float)
|
||||
assert 0.0 <= est.confidence <= 1.0
|
||||
assert est.status in (
|
||||
VitalStatus.Valid,
|
||||
VitalStatus.Degraded,
|
||||
VitalStatus.Unreliable,
|
||||
VitalStatus.Unavailable,
|
||||
)
|
||||
break
|
||||
|
||||
assert produced_estimate, "BreathingExtractor never produced an estimate after 40s of synthetic data"
|
||||
|
||||
|
||||
# ─── HeartRateExtractor ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_heart_rate_esp32_default_constructs() -> None:
|
||||
hr = HeartRateExtractor.esp32_default()
|
||||
assert hr is not None
|
||||
assert "HeartRateExtractor" in repr(hr)
|
||||
|
||||
|
||||
def test_heart_rate_explicit_ctor() -> None:
|
||||
hr = HeartRateExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=10.0)
|
||||
assert hr is not None
|
||||
|
||||
|
||||
def test_heart_rate_extract_returns_none_with_too_few_samples() -> None:
|
||||
hr = HeartRateExtractor.esp32_default()
|
||||
out = hr.extract(residuals=[0.0] * 56, weights=[])
|
||||
assert out is None
|
||||
|
||||
|
||||
# ─── Build feature flag ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_p3_vitals_in_build_features() -> None:
|
||||
assert "p3-vitals-bindings" in wifi_densepose.__build_features__
|
||||
Reference in New Issue
Block a user