feat(adr-117/p4): pure-Python WS/MQTT client layer

New sub-package `wifi_densepose.client` (no PyO3, no Rust deps):

- ws.SensingClient — asyncio websockets>=12 wrapper for the Rust
  sensing-server /ws/sensing endpoint. Yields typed dataclasses
  (ConnectionEstablishedMessage, EdgeVitalsMessage, PoseDataMessage)
  with raw-payload fallback for forward-compat with unknown types.
  Malformed frames log+drop without breaking the stream.

- mqtt.RuViewMqttClient — paho-mqtt v2 wrapper using the explicit
  CallbackAPIVersion.VERSION2 API. Per-instance unique client_id by
  default (rumqttc memory lesson). MQTT v5-spec-correct topic
  wildcard matcher: + as whole-level wildcard, # matches the prefix
  itself plus all sub-levels. Auto-resubscribes on reconnect.
  Handler exceptions are caught and logged so a misbehaving callback
  can't crash the network loop.

- primitives.SemanticPrimitiveListener — typed router for the 10
  HA-MIND fused inference outputs from ADR-115 §3.12
  (SomeoneSleeping, PossibleDistress, RoomActive, ElderlyInactivity-
  Anomaly, MeetingInProgress, BathroomOccupied, FallRiskElevated,
  BedExit, NoMovementSafety, MultiRoomTransition). Decodes both
  JSON payloads with confidence+explanation AND plain HA state
  strings ("ON"/"OFF"/numeric). Pluggable into RuViewMqttClient.

- ha.HABlueprintHelper — read-only parser for the
  homeassistant/<kind>/wifi_densepose_<node>/<id>/config payload
  family. Aggregator queries: entities_for_node, by_device_class,
  nodes. Useful for blueprint authors + dashboard introspection.

Test coverage (63 new tests, 156 total in Python suite):
- test_client_ha — 18 tests (topic+payload parsing, aggregator)
- test_client_primitives — 13 tests (enum coverage, listener routing)
- test_client_mqtt — 17 tests (matcher parametrize, dispatch path,
  on_connect, exception isolation) — no broker needed
- test_client_ws — 6 tests including end-to-end against an in-process
  websockets.serve() fixture exercising all 4 message types plus a
  malformed-frame survival check

Post-bridge wheel size: 238 KB (well under ADR §5.4 5 MB budget).

Refs: docs/adr/ADR-117-pip-wifi-densepose-modernization.md §5.6
Refs: docs/adr/ADR-115-home-assistant-integration.md §3.12
Refs: #785

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv
2026-05-24 11:31:29 -04:00
parent 2d29359809
commit f21daf9aa8
10 changed files with 1816 additions and 1 deletions
+6 -1
View File
@@ -62,7 +62,12 @@ python/
Beamforming Feedback Loop Data. numpy Complex64 bridge. 19 tests.
Real Rust ingestion lands post-v2.0 in a `wifi-densepose-bfld`
crate (see ADR-117 §11.11/12); the Python API does not change.
- **P4 — WS/MQTT client**: pure-Python `wifi_densepose.client` extra.
- **P4 — WS/MQTT client**: pure-Python `wifi_densepose.client` extra
(no Rust). `SensingClient` (asyncio websockets), `RuViewMqttClient`
(paho-mqtt v2 with VERSION2 callbacks), `HABlueprintHelper` (HA
discovery payload parser), `SemanticPrimitiveListener` (typed router
for the 10 HA-MIND primitives from ADR-115 §3.12). 63 tests including
end-to-end against an in-process `websockets.serve` fixture.
-**P5 — cibuildwheel + PyPI publish**: Linux/macOS/Windows × abi3-py310.
-**P-tomb — v1.99.0 tombstone wheel**: pure-Python ImportError
with migration URL, published to PyPI to soft-fence v1.x users
+205
View File
@@ -0,0 +1,205 @@
"""ADR-117 P4 — Tests for HA-DISCO payload parsing.
Pure parsing tests — no MQTT broker needed.
"""
from __future__ import annotations
import json
import pytest
from wifi_densepose.client import (
HABlueprintHelper,
HaDiscoveryPayload,
HaEntity,
)
from wifi_densepose.client.ha import (
parse_discovery_payload,
parse_discovery_topic,
)
# Real discovery payloads pulled from ADR-115 §3 (formatted for test
# readability; payloads are otherwise verbatim).
_PRESENCE_TOPIC = "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/config"
_PRESENCE_BODY = {
"name": "Presence",
"unique_id": "wifi_densepose_aabbccddeeff_presence",
"object_id": "wifi_densepose_aabbccddeeff_presence",
"state_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state",
"availability_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/availability",
"device_class": "occupancy",
"icon": "mdi:motion-sensor",
}
_HEART_RATE_TOPIC = "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/config"
_HEART_RATE_BODY = {
"name": "Heart rate",
"unique_id": "wifi_densepose_aabbccddeeff_heart_rate",
"state_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state",
"state_class": "measurement",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"json_attributes_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state",
}
# ─── Topic parsing ───────────────────────────────────────────────────
def test_parse_discovery_topic_binary_sensor() -> None:
out = parse_discovery_topic(_PRESENCE_TOPIC)
assert out == ("binary_sensor", "aabbccddeeff", "presence")
def test_parse_discovery_topic_sensor() -> None:
out = parse_discovery_topic(_HEART_RATE_TOPIC)
assert out == ("sensor", "aabbccddeeff", "heart_rate")
def test_parse_discovery_topic_event() -> None:
out = parse_discovery_topic(
"homeassistant/event/wifi_densepose_aabbccddeeff/fall/config"
)
assert out == ("event", "aabbccddeeff", "fall")
def test_parse_discovery_topic_returns_none_for_non_discovery() -> None:
assert parse_discovery_topic("homeassistant/binary_sensor/foo/state") is None
assert parse_discovery_topic("ruview/aabbccddeeff/raw/edge_vitals") is None
assert parse_discovery_topic("") is None
# ─── Payload parsing ─────────────────────────────────────────────────
def test_parse_discovery_payload_from_dict() -> None:
out = parse_discovery_payload(_PRESENCE_TOPIC, _PRESENCE_BODY)
assert out is not None
assert out.entity_kind == "binary_sensor"
assert out.node_id == "aabbccddeeff"
assert out.object_id == "presence"
assert out.payload["device_class"] == "occupancy"
def test_parse_discovery_payload_from_bytes() -> None:
raw = json.dumps(_PRESENCE_BODY).encode("utf-8")
out = parse_discovery_payload(_PRESENCE_TOPIC, raw)
assert out is not None
assert out.payload["unique_id"] == "wifi_densepose_aabbccddeeff_presence"
def test_parse_discovery_payload_from_string() -> None:
raw = json.dumps(_PRESENCE_BODY)
out = parse_discovery_payload(_PRESENCE_TOPIC, raw)
assert out is not None
assert out.entity_kind == "binary_sensor"
def test_parse_discovery_payload_rejects_malformed_json() -> None:
assert parse_discovery_payload(_PRESENCE_TOPIC, "{ broken: json") is None
def test_parse_discovery_payload_rejects_non_object_root() -> None:
assert parse_discovery_payload(_PRESENCE_TOPIC, "[1, 2, 3]") is None
def test_parse_discovery_payload_returns_none_for_non_discovery_topic() -> None:
assert parse_discovery_payload(
"ruview/aabbccddeeff/raw/edge_vitals",
_PRESENCE_BODY,
) is None
# ─── HaEntity projection ─────────────────────────────────────────────
def test_ha_entity_from_payload_extracts_fields() -> None:
p = HaDiscoveryPayload(
entity_kind="sensor",
node_id="aabbccddeeff",
object_id="heart_rate",
payload=_HEART_RATE_BODY,
)
e = HaEntity.from_payload(p)
assert e.entity_kind == "sensor"
assert e.unique_id == "wifi_densepose_aabbccddeeff_heart_rate"
assert e.unit_of_measurement == "bpm"
assert e.icon == "mdi:heart-pulse"
assert e.json_attributes_topic == _HEART_RATE_BODY["json_attributes_topic"]
def test_ha_entity_handles_missing_optional_fields() -> None:
p = HaDiscoveryPayload(
entity_kind="event",
node_id="aabbccddeeff",
object_id="bed_exit",
payload={"unique_id": "wifi_densepose_aabbccddeeff_bed_exit"},
)
e = HaEntity.from_payload(p)
assert e.unique_id == "wifi_densepose_aabbccddeeff_bed_exit"
assert e.device_class == ""
assert e.unit_of_measurement == ""
# ─── HABlueprintHelper aggregation ───────────────────────────────────
def _populated_helper() -> HABlueprintHelper:
h = HABlueprintHelper()
h.add_payload(_PRESENCE_TOPIC, _PRESENCE_BODY)
h.add_payload(_HEART_RATE_TOPIC, _HEART_RATE_BODY)
# Same fields but a different node
h.add_payload(
"homeassistant/binary_sensor/wifi_densepose_ff00ff00ff00/presence/config",
{**_PRESENCE_BODY, "unique_id": "wifi_densepose_ff00ff00ff00_presence"},
)
return h
def test_helper_starts_empty() -> None:
h = HABlueprintHelper()
assert len(h) == 0
assert h.nodes() == []
assert h.all_payloads() == []
def test_helper_aggregates_multiple_payloads() -> None:
h = _populated_helper()
assert len(h) == 3
assert h.nodes() == ["aabbccddeeff", "ff00ff00ff00"]
def test_helper_entities_for_node() -> None:
h = _populated_helper()
entities = h.entities_for_node("aabbccddeeff")
object_ids = sorted(e.object_id for e in entities)
assert object_ids == ["heart_rate", "presence"]
def test_helper_by_device_class() -> None:
h = _populated_helper()
occupancy_entities = h.by_device_class("occupancy")
assert len(occupancy_entities) == 2 # presence on both nodes
assert {e.node_id for e in occupancy_entities} == {"aabbccddeeff", "ff00ff00ff00"}
def test_helper_remove() -> None:
h = _populated_helper()
assert h.remove("aabbccddeeff", "binary_sensor", "presence") is True
assert h.remove("aabbccddeeff", "binary_sensor", "presence") is False # no-op
assert len(h) == 2
def test_helper_rejects_non_discovery_topics() -> None:
h = HABlueprintHelper()
ok = h.add_payload("ruview/aabbccddeeff/raw/edge_vitals", _PRESENCE_BODY)
assert ok is False
assert len(h) == 0
def test_helper_in_operator() -> None:
h = _populated_helper()
assert ("aabbccddeeff", "binary_sensor", "presence") in h
assert ("nonexistent", "binary_sensor", "presence") not in h
+208
View File
@@ -0,0 +1,208 @@
"""ADR-117 P4 — Tests for RuViewMqttClient.
These tests do NOT bring up a broker — they exercise:
1. Topic-wildcard matching (`_topic_matches`)
2. Client construction + handler registration
3. The callback path by directly invoking the paho callback methods
with synthesized messages
End-to-end broker integration is a P4-followon item (the mosquitto
patterns from memory [[feedback_mqtt_integration_test_patterns]] go
there). This file keeps unit coverage tight without requiring a
broker on every CI run.
"""
from __future__ import annotations
import json
from types import SimpleNamespace
from typing import Any
import pytest
from wifi_densepose.client import RuViewMqttClient
from wifi_densepose.client.mqtt import _topic_matches
# ─── Topic wildcard matcher ──────────────────────────────────────────
@pytest.mark.parametrize("pattern,topic,expected", [
("ruview/+/raw/edge_vitals", "ruview/aabb/raw/edge_vitals", True),
("ruview/+/raw/edge_vitals", "ruview/aabb/cooked/edge_vitals", False),
("ruview/+/raw/+", "ruview/aabb/raw/pose", True),
("ruview/+/raw/+", "ruview/aabb/raw/pose/extra", False),
# Per MQTT v5 §4.7.1.2: `+` is a whole-level wildcard only — mid-
# segment `+` is a literal `+` character, not a wildcard. The
# spec-correct way to wildcard the third segment of the HA
# discovery topic is `homeassistant/+/+/+/config`.
("homeassistant/+/+/+/config",
"homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", True),
# `wifi_densepose_+` is therefore NOT a wildcard — it matches the
# literal string only. Asserting that behaviour stays stable.
("homeassistant/+/wifi_densepose_+/+/config",
"homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", False),
("ruview/#", "ruview/aabb/raw/edge_vitals", True),
# Per MQTT v5 §4.7.1.2: `<prefix>/#` ALSO matches the bare
# `<prefix>` itself (it represents "this topic and all sub-topics").
("ruview/#", "ruview", True),
("ruview/+/raw/#", "ruview/aabb/raw/pose/extra", True),
("exact/topic", "exact/topic", True),
("exact/topic", "exact/topic/extra", False),
("a/b/c", "a/b", False),
])
def test_topic_matches(pattern: str, topic: str, expected: bool) -> None:
assert _topic_matches(pattern, topic) is expected
# ─── RuViewMqttClient construction ──────────────────────────────────
def test_client_constructs_with_defaults() -> None:
c = RuViewMqttClient()
assert c.broker_host == "localhost"
assert c.broker_port == 1883
assert c.connected is False
assert c.client_id.startswith("wifi-densepose-client-")
def test_client_unique_client_id_per_instance() -> None:
"""Per the rumqttc memory lesson — each instance needs a unique
client_id so parallel tests don't kick each other off the broker."""
c1 = RuViewMqttClient()
c2 = RuViewMqttClient()
assert c1.client_id != c2.client_id
def test_client_accepts_explicit_client_id() -> None:
c = RuViewMqttClient(client_id="explicit-id")
assert c.client_id == "explicit-id"
# ─── Handler registration ────────────────────────────────────────────
def test_handler_registration_stores_callback() -> None:
c = RuViewMqttClient()
seen: list[Any] = []
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: seen.append((t, p)))
# Internal state — we're allowed to inspect since the handler
# path needs to be unit-testable without a broker.
assert "ruview/+/raw/edge_vitals" in c._handlers
def test_handler_unregister_drops_callback() -> None:
c = RuViewMqttClient()
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None)
c.unsubscribe_handler("ruview/+/raw/edge_vitals")
assert "ruview/+/raw/edge_vitals" not in c._handlers
# ─── Callback dispatch (synthesized) ─────────────────────────────────
def _fake_message(topic: str, body: Any) -> Any:
"""Synthesize a paho-mqtt MQTTMessage-ish object."""
if isinstance(body, (dict, list)):
payload_bytes = json.dumps(body).encode("utf-8")
elif isinstance(body, bytes):
payload_bytes = body
else:
payload_bytes = str(body).encode("utf-8")
return SimpleNamespace(topic=topic, payload=payload_bytes)
def test_message_dispatch_to_matching_handler() -> None:
c = RuViewMqttClient()
received: list[tuple[str, Any]] = []
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append((t, p)))
msg = _fake_message(
"ruview/aabbccddeeff/raw/edge_vitals",
{"breathing_rate_bpm": 14.0, "heartrate_bpm": 72.0, "presence": True},
)
c._on_message(None, None, msg)
assert len(received) == 1
topic, payload = received[0]
assert topic == "ruview/aabbccddeeff/raw/edge_vitals"
assert payload["breathing_rate_bpm"] == 14.0
def test_message_dispatch_ignores_non_matching_topic() -> None:
c = RuViewMqttClient()
received: list[Any] = []
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append(p))
msg = _fake_message("ruview/aabb/raw/pose", {"persons": []})
c._on_message(None, None, msg)
assert received == []
def test_message_dispatch_falls_back_to_bytes_on_non_json() -> None:
c = RuViewMqttClient()
received: list[Any] = []
c.on_message("custom/binary/+", lambda t, p: received.append(p))
msg = _fake_message("custom/binary/data", b"\x00\x01\x02not-json")
c._on_message(None, None, msg)
assert received == [b"\x00\x01\x02not-json"]
def test_handler_exception_does_not_propagate() -> None:
"""A misbehaving user callback must not crash the paho network
loop — exceptions are caught and logged."""
c = RuViewMqttClient()
seen_after_crash: list[Any] = []
def crashing(_topic: str, _p: Any) -> None:
raise RuntimeError("simulated callback crash")
c.on_message("crashy/topic", crashing)
c.on_message("safe/topic", lambda t, p: seen_after_crash.append(p))
# First, the crashing handler — must NOT raise out of _on_message.
c._on_message(None, None, _fake_message("crashy/topic", "anything"))
# Then the safe handler — must still fire on a subsequent message.
c._on_message(None, None, _fake_message("safe/topic", {"x": 1}))
assert seen_after_crash == [{"x": 1}]
def test_multiple_handlers_for_overlapping_patterns_all_fire() -> None:
c = RuViewMqttClient()
a_received: list[Any] = []
b_received: list[Any] = []
c.on_message("ruview/+/raw/+", lambda t, p: a_received.append(p))
c.on_message("ruview/aabb/raw/edge_vitals", lambda t, p: b_received.append(p))
msg = _fake_message("ruview/aabb/raw/edge_vitals", {"presence": True})
c._on_message(None, None, msg)
assert len(a_received) == 1
assert len(b_received) == 1
# ─── on_connect path ─────────────────────────────────────────────────
def test_on_connect_sets_event_and_subscribes() -> None:
c = RuViewMqttClient()
c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None)
# Stub the paho client so we can capture subscribe() calls.
subscribed: list[str] = []
stub = SimpleNamespace(subscribe=lambda pattern: subscribed.append(pattern))
c._on_connect(stub, None, None, 0)
assert c.connected is True
assert subscribed == ["ruview/+/raw/edge_vitals"]
def test_on_connect_with_nonzero_rc_does_not_set_connected() -> None:
c = RuViewMqttClient()
stub = SimpleNamespace(subscribe=lambda pattern: None)
c._on_connect(stub, None, None, 5) # CONNACK fail
assert c.connected is False
+180
View File
@@ -0,0 +1,180 @@
"""ADR-117 P4 — Tests for the HA-MIND semantic primitive listener.
Pure routing tests — no MQTT broker needed.
"""
from __future__ import annotations
import json
from wifi_densepose.client import (
SemanticPrimitive,
SemanticPrimitiveEvent,
SemanticPrimitiveListener,
)
# ─── SemanticPrimitive enum ──────────────────────────────────────────
def test_enum_covers_all_10_v1_primitives() -> None:
expected = {
"someone_sleeping",
"possible_distress",
"room_active",
"elderly_inactivity",
"meeting_in_progress",
"bathroom_occupied",
"fall_risk_elevated",
"bed_exit",
"no_movement_safety",
"multi_room_transition",
}
actual = {p.value for p in SemanticPrimitive}
assert actual == expected
def test_enum_from_object_id_round_trips() -> None:
for p in SemanticPrimitive:
assert SemanticPrimitive.from_object_id(p.value) is p
def test_enum_from_object_id_returns_none_for_unknown() -> None:
assert SemanticPrimitive.from_object_id("garbage") is None
# ─── Listener routing ────────────────────────────────────────────────
def test_listener_dispatches_to_specific_handler() -> None:
listener = SemanticPrimitiveListener()
received: list[SemanticPrimitiveEvent] = []
listener.on(SemanticPrimitive.SomeoneSleeping, received.append)
evt = listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state",
json.dumps({"state": "ON", "confidence": 0.92, "explanation": ["motion<5%"]}),
)
assert evt is not None
assert evt.kind is SemanticPrimitive.SomeoneSleeping
assert evt.node_id == "aabb"
assert evt.state == "ON"
assert evt.confidence == 0.92
assert evt.explanation == ("motion<5%",)
assert len(received) == 1
assert received[0] is evt
def test_listener_on_any_fires_for_every_primitive() -> None:
listener = SemanticPrimitiveListener()
seen: list[SemanticPrimitiveEvent] = []
listener.on_any(seen.append)
listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
json.dumps({"state": "ON"}),
)
listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/bathroom_occupied/state",
json.dumps({"state": "OFF"}),
)
assert len(seen) == 2
assert seen[0].kind is SemanticPrimitive.RoomActive
assert seen[1].kind is SemanticPrimitive.BathroomOccupied
def test_listener_specific_handler_does_not_fire_for_other_primitives() -> None:
listener = SemanticPrimitiveListener()
received: list[SemanticPrimitiveEvent] = []
listener.on(SemanticPrimitive.PossibleDistress, received.append)
listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state",
json.dumps({"state": "ON"}),
)
assert received == []
def test_listener_decodes_plain_state_string() -> None:
"""HA convention: binary_sensors that don't carry attributes emit
plain strings ('ON' / 'OFF'). We must accept that too."""
listener = SemanticPrimitiveListener()
evt = listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
"ON",
)
assert evt is not None
assert evt.state == "ON"
assert evt.confidence == 0.0 # not provided in plain string
assert evt.explanation == ()
def test_listener_decodes_numeric_sensor_state() -> None:
"""fall_risk_elevated is a 0100 sensor — verify numeric string."""
listener = SemanticPrimitiveListener()
evt = listener.handle_mqtt_message(
"homeassistant/sensor/wifi_densepose_aabb/fall_risk_elevated/state",
"73",
)
assert evt is not None
assert evt.kind is SemanticPrimitive.FallRiskElevated
assert evt.state == "73"
def test_listener_decodes_bytes_payload() -> None:
listener = SemanticPrimitiveListener()
evt = listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state",
b"ON",
)
assert evt is not None
assert evt.state == "ON"
def test_listener_ignores_non_state_topics() -> None:
listener = SemanticPrimitiveListener()
assert listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/room_active/config",
json.dumps({"name": "Room Active"}),
) is None
def test_listener_ignores_unknown_slug() -> None:
listener = SemanticPrimitiveListener()
assert listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/unknown_primitive/state",
"ON",
) is None
def test_listener_ignores_non_wifi_densepose_node() -> None:
listener = SemanticPrimitiveListener()
# third segment doesn't start with wifi_densepose_
assert listener.handle_mqtt_message(
"homeassistant/binary_sensor/aqara_fp2/room_active/state",
"ON",
) is None
def test_listener_explanation_string_is_normalised_to_tuple() -> None:
"""Producers may send `explanation` as a single string by mistake;
accept that and wrap in a 1-tuple so downstream code can iterate
uniformly."""
listener = SemanticPrimitiveListener()
evt = listener.handle_mqtt_message(
"homeassistant/binary_sensor/wifi_densepose_aabb/possible_distress/state",
json.dumps({"state": "ON", "explanation": "HR=120 baseline=80"}),
)
assert evt is not None
assert evt.explanation == ("HR=120 baseline=80",)
def test_event_is_frozen() -> None:
evt = SemanticPrimitiveEvent(
kind=SemanticPrimitive.SomeoneSleeping,
node_id="aabb",
state="ON",
)
import pytest
with pytest.raises((AttributeError, Exception)): # FrozenInstanceError subclass
evt.state = "OFF" # type: ignore[misc]
+195
View File
@@ -0,0 +1,195 @@
"""ADR-117 P4 — End-to-end test for SensingClient against an in-process
WS server.
We spin up a real `websockets.serve()` server in the same event loop,
send the four message types defined in ADR-115 §1, and assert the
client decodes them into the right dataclasses. No mocks — the only
moving part this test does NOT exercise is the actual sensing-server
binary, but the wire protocol is the contract under test here.
"""
from __future__ import annotations
import asyncio
import json
from typing import Any
import pytest
import websockets
from wifi_densepose.client import (
ConnectionEstablishedMessage,
EdgeVitalsMessage,
PoseDataMessage,
SensingClient,
SensingMessage,
)
# ─── In-process WS server fixture ────────────────────────────────────
_FIXTURE_MESSAGES = [
{
"type": "connection_established",
"node_id": "test-node-001",
"version": "0.7.4",
"capabilities": ["edge_vitals", "pose_data"],
},
{
"type": "edge_vitals",
"node_id": "test-node-001",
"presence": True,
"fall_detected": False,
"motion": 0.21,
"breathing_rate_bpm": 14.5,
"heartrate_bpm": 72.3,
"n_persons": 1,
"motion_energy": 0.034,
"presence_score": 0.91,
"rssi": -42.0,
},
{
"type": "pose_data",
"node_id": "test-node-001",
"timestamp": 1700000000.5,
"persons": [{"id": 1, "keypoints": []}],
"confidence": 0.88,
},
# Unknown type — should NOT crash the stream; should yield a plain
# SensingMessage.
{
"type": "future_message_type_not_yet_modelled",
"extra": "data",
},
]
async def _handler(websocket: Any) -> None:
for msg in _FIXTURE_MESSAGES:
await websocket.send(json.dumps(msg))
# Send one malformed frame to assert the client logs+drops it
# rather than crashing the stream.
await websocket.send("{not valid json")
# And one final "real" message so the test can confirm the stream
# survived the malformed one.
await websocket.send(json.dumps({"type": "edge_vitals", "node_id": "post-bad-frame"}))
@pytest.fixture
async def ws_server() -> Any:
"""Start a websocket server on a random port; yield the bound URL."""
server = await websockets.serve(_handler, "127.0.0.1", 0)
# Get the bound port (host="127.0.0.1" returns one socket).
port = server.sockets[0].getsockname()[1] # type: ignore[union-attr]
try:
yield f"ws://127.0.0.1:{port}/ws/sensing"
finally:
server.close()
await server.wait_closed()
# ─── End-to-end stream test ──────────────────────────────────────────
async def test_sensing_client_decodes_all_message_types(ws_server: str) -> None:
received: list[SensingMessage] = []
async with SensingClient(ws_server) as client:
async for msg in client.stream():
received.append(msg)
if len(received) >= len(_FIXTURE_MESSAGES) + 1: # +1 for post-bad-frame
break
# connection_established → typed
assert isinstance(received[0], ConnectionEstablishedMessage)
assert received[0].node_id == "test-node-001"
assert received[0].version == "0.7.4"
assert "edge_vitals" in received[0].capabilities
# edge_vitals → typed with full fields
assert isinstance(received[1], EdgeVitalsMessage)
assert received[1].presence is True
assert received[1].fall_detected is False
assert received[1].breathing_rate_bpm == 14.5
assert received[1].heartrate_bpm == 72.3
assert received[1].n_persons == 1
assert received[1].rssi == -42.0
# pose_data → typed
assert isinstance(received[2], PoseDataMessage)
assert received[2].timestamp == 1700000000.5
assert len(received[2].persons) == 1
assert received[2].confidence == 0.88
# Unknown type → plain SensingMessage (forward-compat)
assert type(received[3]) is SensingMessage # exact base class
assert received[3].type == "future_message_type_not_yet_modelled"
assert received[3].raw["extra"] == "data"
# After the malformed frame: the stream should have survived and
# yielded the post-bad-frame message.
assert isinstance(received[4], EdgeVitalsMessage)
assert received[4].node_id == "post-bad-frame"
async def test_sensing_client_recv_one(ws_server: str) -> None:
async with SensingClient(ws_server) as client:
msg = await client.recv_one(timeout=2.0)
assert isinstance(msg, ConnectionEstablishedMessage)
async def test_sensing_client_raises_when_used_without_context() -> None:
client = SensingClient("ws://127.0.0.1:1/") # never connects
with pytest.raises(RuntimeError, match="not connected"):
await client.recv_one(timeout=0.1)
with pytest.raises(RuntimeError, match="not connected"):
async for _ in client.stream():
pass
async def test_sensing_client_close_is_idempotent(ws_server: str) -> None:
client = SensingClient(ws_server)
await client.__aenter__()
await client.close()
await client.close() # second close is a no-op
def test_sensing_client_decoder_directly() -> None:
"""The decoder is pure — exercise it without bringing up a WS
server, so we have a fast unit test for the type mapping."""
from wifi_densepose.client.ws import _decode
msg = _decode(json.dumps({
"type": "edge_vitals",
"node_id": "x",
"presence": True,
"fall_detected": False,
"motion": 1.5,
}))
assert isinstance(msg, EdgeVitalsMessage)
assert msg.presence is True
assert msg.motion == 1.5
assert msg.breathing_rate_bpm is None # not present → None, not 0.0
assert msg.heartrate_bpm is None
assert msg.rssi is None
def test_sensing_client_decoder_handles_None_subfields() -> None:
"""When the sensing-server explicitly emits null for HR/BR (no
measurement yet), the client should propagate None, not crash."""
from wifi_densepose.client.ws import _decode
msg = _decode(json.dumps({
"type": "edge_vitals",
"node_id": "x",
"presence": False,
"fall_detected": False,
"motion": 0.0,
"breathing_rate_bpm": None,
"heartrate_bpm": None,
"rssi": None,
}))
assert isinstance(msg, EdgeVitalsMessage)
assert msg.breathing_rate_bpm is None
assert msg.heartrate_bpm is None
assert msg.rssi is None
+93
View File
@@ -0,0 +1,93 @@
"""ADR-117 P4 — Pure-Python client layer.
This sub-package is the **client-facing** half of `wifi-densepose`:
end users who only want to *consume* live RuView telemetry (rather than
running DSP locally) get a tight, opt-in client extra:
```
pip install "wifi-densepose[client]"
```
The runtime install footprint stays small for users who only need the
compiled PyO3 surface: `websockets` and `paho-mqtt` are declared as the
`[client]` extra in `pyproject.toml` and are NOT pulled in by the
default install.
## Modules
- `ws` — `SensingClient`: asyncio WebSocket client for the
sensing-server `/ws/sensing` endpoint (ADR-115 §1)
- `mqtt` — `RuViewMqttClient`: paho-mqtt v2 wrapper for
`ruview/<node>/raw/+` + `homeassistant/+/wifi_densepose_<node>/+/+`
topics (ADR-115 §3)
- `primitives` — `SemanticPrimitiveListener`: typed view over the
10 HA-MIND semantic primitives (ADR-115 §3.12)
- `ha` — `HABlueprintHelper`: parses MQTT-discovery payloads, helps
users introspect what entities a node is publishing
No PyO3 here — this module is pure Python so it loads without the
compiled extension (useful for users who only want the client surface
and not the DSP pipeline).
"""
from __future__ import annotations
# Re-export the user-facing types. Import errors are deferred to the
# moment the user actually instantiates one of these classes — that way
# `from wifi_densepose.client import HABlueprintHelper` still works
# even if the user hasn't installed `[client]` extras yet (HABlueprint
# is pure stdlib).
from wifi_densepose.client.ha import (
HaDiscoveryPayload,
HaEntity,
HABlueprintHelper,
)
from wifi_densepose.client.primitives import (
SemanticPrimitive,
SemanticPrimitiveEvent,
SemanticPrimitiveListener,
)
__all__ = [
# ws — re-exported lazily; see module docstring
"SensingClient",
"SensingMessage",
"EdgeVitalsMessage",
"PoseDataMessage",
"ConnectionEstablishedMessage",
# mqtt — re-exported lazily; see module docstring
"RuViewMqttClient",
# ha — pure stdlib
"HaDiscoveryPayload",
"HaEntity",
"HABlueprintHelper",
# primitives — pure stdlib
"SemanticPrimitive",
"SemanticPrimitiveEvent",
"SemanticPrimitiveListener",
]
def __getattr__(name: str):
"""Lazy re-exports for the modules that pull in optional extras.
`SensingClient` needs `websockets`; `RuViewMqttClient` needs
`paho-mqtt`. Importing those at package init would make
`wifi_densepose.client` unusable without the extras installed
— defeating the point of an *optional* extra. We defer the import
until the attribute is actually looked up.
"""
if name in {
"SensingClient",
"SensingMessage",
"EdgeVitalsMessage",
"PoseDataMessage",
"ConnectionEstablishedMessage",
}:
from wifi_densepose.client import ws as _ws
return getattr(_ws, name)
if name == "RuViewMqttClient":
from wifi_densepose.client.mqtt import RuViewMqttClient as _R
return _R
raise AttributeError(f"module 'wifi_densepose.client' has no attribute {name!r}")
+194
View File
@@ -0,0 +1,194 @@
"""ADR-117 P4 — Home Assistant MQTT-discovery payload helpers.
Parses the `homeassistant/<entity_kind>/wifi_densepose_<node>/<id>/config`
discovery payloads described in ADR-115 §3 into typed Python objects so
client code can introspect what a node is publishing without
hand-parsing JSON.
This is **read-only**: we do NOT generate discovery payloads from
Python (that's the sensing-server's job). The helper exists so a
client (HA blueprint author, debugger, dashboard) can ask "what
entities does this node expose?" and get a structured answer.
Example:
```python
from wifi_densepose.client import HaDiscoveryPayload, HABlueprintHelper
helper = HABlueprintHelper()
helper.add_payload(topic, json_bytes)
for entity in helper.entities_for_node("aabbccddeeff"):
print(entity.entity_kind, entity.object_id, entity.unique_id)
```
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from typing import Any, Iterable
# ─── Topic schema ────────────────────────────────────────────────────
# Matches discovery topics like:
# homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/config
# homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/config
# homeassistant/event/wifi_densepose_aabbccddeeff/fall/config
_DISCOVERY_TOPIC_RE = re.compile(
r"^homeassistant/"
r"(?P<entity_kind>[A-Za-z_]+)/"
r"wifi_densepose_(?P<node_id>[A-Za-z0-9]+)/"
r"(?P<object_id>[A-Za-z0-9_\-]+)/"
r"config$"
)
@dataclass(frozen=True)
class HaDiscoveryPayload:
"""One MQTT discovery payload (config topic + JSON body)."""
entity_kind: str # "binary_sensor", "sensor", "event", "switch", ...
node_id: str # the node's MAC-ish identifier
object_id: str # entity slug (e.g. "presence", "heart_rate")
payload: dict[str, Any]
@property
def topic(self) -> str:
return (
f"homeassistant/{self.entity_kind}/"
f"wifi_densepose_{self.node_id}/{self.object_id}/config"
)
@dataclass(frozen=True)
class HaEntity:
"""A user-facing view of one HA entity registered by a node."""
entity_kind: str
node_id: str
object_id: str
unique_id: str = ""
name: str = ""
state_topic: str = ""
device_class: str = ""
unit_of_measurement: str = ""
icon: str = ""
json_attributes_topic: str = ""
@classmethod
def from_payload(cls, p: HaDiscoveryPayload) -> "HaEntity":
body = p.payload
return cls(
entity_kind=p.entity_kind,
node_id=p.node_id,
object_id=p.object_id,
unique_id=str(body.get("unique_id", "")),
name=str(body.get("name", "")),
state_topic=str(body.get("state_topic", "")),
device_class=str(body.get("device_class", "")),
unit_of_measurement=str(body.get("unit_of_measurement", "")),
icon=str(body.get("icon", "")),
json_attributes_topic=str(body.get("json_attributes_topic", "")),
)
def parse_discovery_topic(topic: str) -> tuple[str, str, str] | None:
"""Parse a discovery config topic into (entity_kind, node_id,
object_id). Returns None for non-discovery topics."""
m = _DISCOVERY_TOPIC_RE.match(topic)
if not m:
return None
return (m.group("entity_kind"), m.group("node_id"), m.group("object_id"))
def parse_discovery_payload(
topic: str, payload: bytes | str | dict[str, Any]
) -> HaDiscoveryPayload | None:
"""Decode an HA discovery payload. Returns None for non-discovery
topics OR malformed JSON; raises only on programmer error."""
parsed = parse_discovery_topic(topic)
if parsed is None:
return None
entity_kind, node_id, object_id = parsed
body: dict[str, Any]
if isinstance(payload, dict):
body = payload
else:
if isinstance(payload, bytes):
try:
payload = payload.decode("utf-8")
except UnicodeDecodeError:
return None
try:
decoded = json.loads(payload)
except json.JSONDecodeError:
return None
if not isinstance(decoded, dict):
return None
body = decoded
return HaDiscoveryPayload(
entity_kind=entity_kind,
node_id=node_id,
object_id=object_id,
payload=body,
)
# ─── Helper / aggregator ─────────────────────────────────────────────
class HABlueprintHelper:
"""Aggregates HA discovery payloads observed on the bus and offers
structured queries against them.
Intended use: subscribe a RuViewMqttClient to
`homeassistant/+/wifi_densepose_+/+/config`, feed every message
into `add_payload()`, then ask the helper "what entities does
node X expose?" or "what binary_sensors are presence-class?".
"""
def __init__(self) -> None:
# (node_id, entity_kind, object_id) → HaDiscoveryPayload
self._payloads: dict[tuple[str, str, str], HaDiscoveryPayload] = {}
def add_payload(self, topic: str, payload: bytes | str | dict[str, Any]) -> bool:
"""Returns True if the payload was a valid HA discovery
message and was stored; False otherwise."""
parsed = parse_discovery_payload(topic, payload)
if parsed is None:
return False
self._payloads[(parsed.node_id, parsed.entity_kind, parsed.object_id)] = parsed
return True
def remove(self, node_id: str, entity_kind: str, object_id: str) -> bool:
"""Drop a stored payload — useful when handling a discovery
retain-flag clear (HA's convention for removing an entity)."""
return self._payloads.pop((node_id, entity_kind, object_id), None) is not None
def __len__(self) -> int:
return len(self._payloads)
def __contains__(self, item: tuple[str, str, str]) -> bool:
return item in self._payloads
def all_payloads(self) -> list[HaDiscoveryPayload]:
return list(self._payloads.values())
def entities_for_node(self, node_id: str) -> list[HaEntity]:
return [
HaEntity.from_payload(p)
for p in self._payloads.values()
if p.node_id == node_id
]
def nodes(self) -> list[str]:
return sorted({p.node_id for p in self._payloads.values()})
def by_device_class(self, device_class: str) -> list[HaEntity]:
out: list[HaEntity] = []
for p in self._payloads.values():
e = HaEntity.from_payload(p)
if e.device_class == device_class:
out.append(e)
return out
+257
View File
@@ -0,0 +1,257 @@
"""ADR-117 P4 — paho-mqtt v2 wrapper for RuView MQTT topics.
Subscribes to the topic namespaces defined in ADR-115:
- `ruview/<node>/raw/edge_vitals` — opt-in firehose of the WS edge_vitals
- `ruview/<node>/raw/pose` — opt-in firehose of pose data
- `ruview/<node>/raw/sensing_update` — opt-in firehose of every sensing update
- `homeassistant/+/wifi_densepose_<node>/+/config` — HA discovery payloads
- `homeassistant/+/wifi_densepose_<node>/+/state` — HA state payloads
The client uses **paho-mqtt v2's `Client(CallbackAPIVersion.VERSION2)`**
API explicitly. v1's deprecated callback signatures will not work.
Example:
```python
from wifi_densepose.client import RuViewMqttClient
def on_edge_vitals(topic, payload):
print(topic, payload["breathing_rate_bpm"])
client = RuViewMqttClient(broker_host="localhost", broker_port=1883)
client.on_message("ruview/+/raw/edge_vitals", on_edge_vitals)
client.start()
# ... runs in a background thread; call client.stop() to disconnect
```
The constructor never connects; call `.start()` to enter the network
loop and `.stop()` to disconnect cleanly. Both are idempotent.
"""
from __future__ import annotations
import json
import logging
import threading
import uuid
from typing import Any, Callable, Optional
try:
import paho.mqtt.client as mqtt # type: ignore[import-not-found]
from paho.mqtt.enums import CallbackAPIVersion # type: ignore[import-not-found]
_PAHO_AVAILABLE = True
except ImportError: # pragma: no cover
_PAHO_AVAILABLE = False
log = logging.getLogger(__name__)
MessageHandler = Callable[[str, Any], None]
"""(topic, decoded_payload) → None. The payload is JSON-decoded if the
content is valid JSON, otherwise the raw bytes are passed through."""
class RuViewMqttClient:
"""Wrapper around paho-mqtt v2 with per-topic-pattern callbacks.
Per the rumqttc lesson [[feedback_mqtt_integration_test_patterns]]:
- Each instance gets a unique client_id (per-test isolation when
tests run in parallel against the same broker).
- Subscription wildcards (`+`, `#`) are supported by paho's
built-in matcher; we route by exact pattern match against the
registered handler.
"""
def __init__(
self,
*,
broker_host: str = "localhost",
broker_port: int = 1883,
client_id: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
keepalive: int = 60,
tls: bool = False,
) -> None:
if not _PAHO_AVAILABLE:
raise ImportError(
"RuViewMqttClient requires the `paho-mqtt` package. Install with "
"`pip install \"wifi-densepose[client]\"` to enable the client extras."
)
self.broker_host = broker_host
self.broker_port = broker_port
self.keepalive = keepalive
self._client_id = client_id or f"wifi-densepose-client-{uuid.uuid4().hex[:12]}"
self._handlers: dict[str, MessageHandler] = {}
self._handlers_lock = threading.Lock()
self._client = mqtt.Client(
callback_api_version=CallbackAPIVersion.VERSION2,
client_id=self._client_id,
clean_session=True,
)
if username is not None:
self._client.username_pw_set(username, password)
if tls:
self._client.tls_set()
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._client.on_disconnect = self._on_disconnect
self._started = False
self._connected_event = threading.Event()
@property
def client_id(self) -> str:
return self._client_id
@property
def connected(self) -> bool:
return self._connected_event.is_set()
# ── handler registration ─────────────────────────────────────────
def on_message(self, topic_pattern: str, handler: MessageHandler) -> None:
"""Register a handler for a topic pattern. Replaces any
previous handler for the same pattern."""
with self._handlers_lock:
self._handlers[topic_pattern] = handler
def unsubscribe_handler(self, topic_pattern: str) -> None:
with self._handlers_lock:
self._handlers.pop(topic_pattern, None)
if self._started:
self._client.unsubscribe(topic_pattern)
# ── lifecycle ────────────────────────────────────────────────────
def start(self) -> None:
"""Connect to the broker and enter the network loop in a
background thread. Idempotent."""
if self._started:
return
self._client.connect(self.broker_host, self.broker_port, self.keepalive)
self._client.loop_start()
self._started = True
def wait_connected(self, timeout: float = 5.0) -> bool:
"""Block until CONNACK has been received. Returns True on
connect, False on timeout. Mirrors the rumqttc SubAck pump
pattern but for paho's connect step."""
return self._connected_event.wait(timeout=timeout)
def stop(self) -> None:
"""Disconnect and stop the network loop. Idempotent."""
if not self._started:
return
try:
self._client.disconnect()
except Exception as e: # pragma: no cover — best-effort
log.debug("ignored mqtt disconnect error: %r", e)
try:
self._client.loop_stop()
except Exception as e: # pragma: no cover
log.debug("ignored mqtt loop_stop error: %r", e)
self._started = False
self._connected_event.clear()
def publish(
self,
topic: str,
payload: Any,
*,
qos: int = 0,
retain: bool = False,
) -> None:
"""Publish a payload. Dicts/lists are JSON-encoded; bytes pass
through; strings are encoded UTF-8."""
if isinstance(payload, (dict, list)):
data: Any = json.dumps(payload, default=str)
else:
data = payload
info = self._client.publish(topic, data, qos=qos, retain=retain)
# paho v2 returns MQTTMessageInfo; rc != MQTT_ERR_SUCCESS is a
# broker-side error we should propagate so callers don't think
# the publish succeeded.
if info.rc != mqtt.MQTT_ERR_SUCCESS:
raise RuntimeError(f"mqtt publish failed: topic={topic} rc={info.rc}")
# ── paho callbacks (v2 signatures) ───────────────────────────────
def _on_connect(self, client: Any, _userdata: Any, _flags: Any, reason_code: Any, _properties: Any = None) -> None:
# paho v2 passes ReasonCode; success is 0 ("Success" / Granted_QoS_0)
rc = int(reason_code) if hasattr(reason_code, "__int__") else reason_code
if rc == 0:
self._connected_event.set()
# Re-subscribe to all known patterns. Important after a
# reconnect — paho doesn't auto-resubscribe with
# clean_session=True.
with self._handlers_lock:
patterns = list(self._handlers.keys())
for pattern in patterns:
client.subscribe(pattern)
log.debug("mqtt CONNACK ok; subscribed to %d pattern(s)", len(patterns))
else:
log.warning("mqtt CONNACK with non-success rc=%r", reason_code)
def _on_disconnect(self, _client: Any, _userdata: Any, _flags: Any = None, reason_code: Any = None, _properties: Any = None) -> None:
self._connected_event.clear()
log.debug("mqtt disconnected rc=%r", reason_code)
def _on_message(self, _client: Any, _userdata: Any, message: Any) -> None:
topic = message.topic
# Best-effort JSON decode — fall back to raw bytes if it's not JSON.
payload: Any
try:
payload = json.loads(message.payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
payload = message.payload
with self._handlers_lock:
handlers = list(self._handlers.items())
for pattern, handler in handlers:
if _topic_matches(pattern, topic):
try:
handler(topic, payload)
except Exception as e: # never let a user callback crash the loop
log.exception("handler for pattern %r raised: %r", pattern, e)
# ── re-subscribe on demand ──────────────────────────────────────
def subscribe_registered(self) -> None:
"""Explicitly issue SUBSCRIBE for every registered handler.
Useful when you registered handlers AFTER calling start().
"""
if not self._started:
return
with self._handlers_lock:
patterns = list(self._handlers.keys())
for pattern in patterns:
self._client.subscribe(pattern)
# ─── Topic-pattern matching ──────────────────────────────────────────
def _topic_matches(pattern: str, topic: str) -> bool:
"""MQTT topic wildcard matcher.
- `+` matches exactly one topic level
- `#` matches one or more remaining levels (must be the final segment)
"""
p_parts = pattern.split("/")
t_parts = topic.split("/")
i = 0
while i < len(p_parts):
if p_parts[i] == "#":
return i == len(p_parts) - 1 and len(t_parts) >= i
if i >= len(t_parts):
return False
if p_parts[i] == "+":
i += 1
continue
if p_parts[i] != t_parts[i]:
return False
i += 1
return len(p_parts) == len(t_parts)
+222
View File
@@ -0,0 +1,222 @@
"""ADR-117 P4 — Typed listener for HA-MIND semantic primitives.
ADR-115 §3.12 defines 10 fused inference outputs that the sensing-server
publishes under the HA-DISCO MQTT namespace. This module gives clients
a typed handle on them so they can write `if event.kind ==
SemanticPrimitive.SomeoneSleeping: ...` instead of pattern-matching
strings.
The 10 v1 primitives (ADR-115 §3.12.1):
| Enum value | Topic suffix | Output kind |
|---|---|---|
| `SomeoneSleeping` | `someone_sleeping` | binary_sensor |
| `PossibleDistress` | `possible_distress` | binary_sensor + event |
| `RoomActive` | `room_active` | binary_sensor |
| `ElderlyInactivityAnomaly` | `elderly_inactivity` | binary_sensor + event |
| `MeetingInProgress` | `meeting_in_progress` | binary_sensor |
| `BathroomOccupied` | `bathroom_occupied` | binary_sensor |
| `FallRiskElevated` | `fall_risk_elevated` | sensor (0100) + event |
| `BedExit` | `bed_exit` | event |
| `NoMovementSafety` | `no_movement_safety` | binary_sensor + event |
| `MultiRoomTransition` | `multi_room_transition` | event |
"""
from __future__ import annotations
import enum
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
# ─── Enum ────────────────────────────────────────────────────────────
class SemanticPrimitive(enum.Enum):
"""One of the 10 HA-MIND fused inference outputs."""
SomeoneSleeping = "someone_sleeping"
PossibleDistress = "possible_distress"
RoomActive = "room_active"
ElderlyInactivityAnomaly = "elderly_inactivity"
MeetingInProgress = "meeting_in_progress"
BathroomOccupied = "bathroom_occupied"
FallRiskElevated = "fall_risk_elevated"
BedExit = "bed_exit"
NoMovementSafety = "no_movement_safety"
MultiRoomTransition = "multi_room_transition"
@classmethod
def from_object_id(cls, object_id: str) -> Optional["SemanticPrimitive"]:
for v in cls:
if v.value == object_id:
return v
return None
# ─── Event payload ───────────────────────────────────────────────────
@dataclass(frozen=True)
class SemanticPrimitiveEvent:
"""A single fired event for one semantic primitive.
`state` semantics depend on the primitive kind:
- binary_sensor: "ON" / "OFF"
- sensor: numeric string (e.g. "73" for fall_risk_elevated 0100)
- event: "fired" or an event-class string like "bed_exit_detected"
"""
kind: SemanticPrimitive
node_id: str
state: str
confidence: float = 0.0
explanation: tuple[str, ...] = ()
timestamp: float = 0.0
raw: dict[str, Any] = field(default_factory=dict, hash=False, compare=False)
# ─── Listener ────────────────────────────────────────────────────────
Callback = Callable[[SemanticPrimitiveEvent], None]
class SemanticPrimitiveListener:
"""Routes raw MQTT state messages to per-primitive callbacks.
Designed to plug into RuViewMqttClient:
```python
from wifi_densepose.client import (
RuViewMqttClient, SemanticPrimitive, SemanticPrimitiveListener
)
listener = SemanticPrimitiveListener()
listener.on(SemanticPrimitive.SomeoneSleeping, lambda e: print(e))
client = RuViewMqttClient()
client.on_message(
"homeassistant/+/wifi_densepose_+/+/state",
listener.handle_mqtt_message,
)
client.start()
```
The listener itself never touches MQTT — it's a pure router. You
feed it `(topic, payload)` pairs and it figures out which primitive
the topic refers to and decodes the payload.
"""
# Matches state topics for any of the 10 primitives.
# homeassistant/<kind>/wifi_densepose_<node>/<primitive_slug>/state
_SLUGS = {p.value for p in SemanticPrimitive}
def __init__(self) -> None:
self._handlers: dict[Optional[SemanticPrimitive], list[Callback]] = {}
def on(self, primitive: SemanticPrimitive, cb: Callback) -> None:
"""Register a callback for a specific primitive."""
self._handlers.setdefault(primitive, []).append(cb)
def on_any(self, cb: Callback) -> None:
"""Register a callback that fires for ALL primitives. Useful
for logging or dashboards."""
self._handlers.setdefault(None, []).append(cb)
def handle_mqtt_message(self, topic: str, payload: Any) -> Optional[SemanticPrimitiveEvent]:
"""Decode one MQTT message into a SemanticPrimitiveEvent and
fire the matching callbacks. Returns the event (or None if the
topic was not a semantic-primitive state topic)."""
parts = topic.split("/")
# Shape: homeassistant / <kind> / wifi_densepose_<node> / <slug> / state
if len(parts) != 5:
return None
if parts[0] != "homeassistant" or parts[4] != "state":
return None
node_prefix = parts[2]
if not node_prefix.startswith("wifi_densepose_"):
return None
slug = parts[3]
if slug not in self._SLUGS:
return None
primitive = SemanticPrimitive.from_object_id(slug)
if primitive is None: # pragma: no cover — guarded above
return None
node_id = node_prefix[len("wifi_densepose_"):]
event = _decode_event(primitive, node_id, payload)
# Dispatch — primitive-specific first, then "any" handlers.
for cb in self._handlers.get(primitive, ()):
cb(event)
for cb in self._handlers.get(None, ()):
cb(event)
return event
def _decode_event(
primitive: SemanticPrimitive,
node_id: str,
payload: Any,
) -> SemanticPrimitiveEvent:
"""Decode a raw state payload into a typed event.
HA state payloads come in two shapes:
1. Plain string ("ON", "OFF", "73") — used by binary_sensor/sensor
with no json_attributes_topic.
2. JSON object with `state` + `confidence` + `explanation` fields —
used by HA-MIND semantic primitives per ADR-115 §3.12.4.
Both are supported transparently.
"""
if isinstance(payload, bytes):
try:
payload = payload.decode("utf-8")
except UnicodeDecodeError:
return SemanticPrimitiveEvent(
kind=primitive, node_id=node_id, state="", raw={}
)
if isinstance(payload, dict):
body = payload
elif isinstance(payload, str):
# Try to JSON-decode; if it's not JSON, treat as a plain state string.
try:
decoded = json.loads(payload)
except json.JSONDecodeError:
return SemanticPrimitiveEvent(
kind=primitive,
node_id=node_id,
state=payload,
raw={"state": payload},
)
if isinstance(decoded, dict):
body = decoded
else:
return SemanticPrimitiveEvent(
kind=primitive,
node_id=node_id,
state=str(decoded),
raw={"state": decoded},
)
else:
return SemanticPrimitiveEvent(
kind=primitive, node_id=node_id, state=str(payload), raw={}
)
expl = body.get("explanation") or body.get("reason") or ()
if isinstance(expl, str):
expl_tuple: tuple[str, ...] = (expl,)
else:
expl_tuple = tuple(str(x) for x in expl)
return SemanticPrimitiveEvent(
kind=primitive,
node_id=node_id,
state=str(body.get("state", "")),
confidence=float(body.get("confidence", 0.0)),
explanation=expl_tuple,
timestamp=float(body.get("timestamp", 0.0)),
raw=body,
)
+256
View File
@@ -0,0 +1,256 @@
"""ADR-117 P4 — Asyncio WebSocket client for the sensing-server.
The Rust sensing-server (`v2/crates/wifi-densepose-sensing-server`)
broadcasts three structured message types over `ws://<host>:<port>/ws/sensing`:
| `type` field | Source line in main.rs | Payload shape |
|---|---|---|
| `connection_established` | 2596 | `{node_id, version, capabilities}` |
| `pose_data` | 2655 | `{node_id, timestamp, persons: [...], confidence}` |
| `edge_vitals` | 4548 | `{node_id, presence, fall_detected, motion, breathing_rate_bpm, heartrate_bpm, ...}` |
`SensingClient` is a pure-Python asyncio wrapper around `websockets>=12`
that connects, decodes JSON, and yields typed dataclasses.
Example:
```python
import asyncio
from wifi_densepose.client import SensingClient, EdgeVitalsMessage
async def main():
async with SensingClient("ws://localhost:8765/ws/sensing") as client:
async for msg in client.stream():
if isinstance(msg, EdgeVitalsMessage):
print(f"BR={msg.breathing_rate_bpm}, HR={msg.heartrate_bpm}")
asyncio.run(main())
```
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Optional
# Defer import — only fail at construction time, not at module load.
try:
import websockets # type: ignore[import-not-found]
from websockets.exceptions import ConnectionClosed # type: ignore[import-not-found]
_WEBSOCKETS_AVAILABLE = True
except ImportError: # pragma: no cover
_WEBSOCKETS_AVAILABLE = False
log = logging.getLogger(__name__)
# ─── Typed messages ──────────────────────────────────────────────────
@dataclass(frozen=True)
class SensingMessage:
"""Base class for typed sensing-server messages. The original JSON
payload is preserved in ``raw`` for forward-compatibility with
fields not yet modelled here."""
type: str
raw: dict[str, Any] = field(default_factory=dict, hash=False, compare=False)
@dataclass(frozen=True)
class ConnectionEstablishedMessage(SensingMessage):
"""First message after a successful WS handshake. Lets the client
discover the node ID and capability flags without making a separate
REST call."""
node_id: str = ""
version: str = ""
capabilities: tuple[str, ...] = ()
@dataclass(frozen=True)
class EdgeVitalsMessage(SensingMessage):
"""Vital-sign telemetry fused from the edge-vitals path
(ADR-021/ADR-110). Optional fields may be ``None`` when the
upstream channel hasn't produced a measurement yet."""
node_id: str = ""
presence: bool = False
fall_detected: bool = False
motion: float = 0.0
breathing_rate_bpm: Optional[float] = None
heartrate_bpm: Optional[float] = None
n_persons: int = 0
motion_energy: float = 0.0
presence_score: float = 0.0
rssi: Optional[float] = None
@dataclass(frozen=True)
class PoseDataMessage(SensingMessage):
"""17-keypoint pose data broadcast at the sensing-server's frame
cadence. Persons are a list of opaque dicts — typed PoseEstimate
decoding lives in the P2 bindings; the WS client passes through."""
node_id: str = ""
timestamp: float = 0.0
persons: tuple[dict[str, Any], ...] = ()
confidence: float = 0.0
# ─── Decoder ─────────────────────────────────────────────────────────
def _decode(raw_text: str) -> SensingMessage:
"""Decode a single WS frame into a typed message.
Unknown ``type`` values yield a plain ``SensingMessage`` rather
than raising — the sensing-server is on a faster release cadence
than this client, and unknown types should not break the stream.
"""
obj = json.loads(raw_text)
if not isinstance(obj, dict):
raise ValueError(f"sensing-server emitted non-dict payload: {type(obj).__name__}")
mtype = obj.get("type", "")
if mtype == "connection_established":
return ConnectionEstablishedMessage(
type=mtype,
raw=obj,
node_id=obj.get("node_id", ""),
version=obj.get("version", ""),
capabilities=tuple(obj.get("capabilities", ())),
)
if mtype == "edge_vitals":
return EdgeVitalsMessage(
type=mtype,
raw=obj,
node_id=obj.get("node_id", ""),
presence=bool(obj.get("presence", False)),
fall_detected=bool(obj.get("fall_detected", False)),
motion=float(obj.get("motion", 0.0)),
breathing_rate_bpm=(
float(obj["breathing_rate_bpm"])
if obj.get("breathing_rate_bpm") is not None else None
),
heartrate_bpm=(
float(obj["heartrate_bpm"])
if obj.get("heartrate_bpm") is not None else None
),
n_persons=int(obj.get("n_persons", 0)),
motion_energy=float(obj.get("motion_energy", 0.0)),
presence_score=float(obj.get("presence_score", 0.0)),
rssi=(float(obj["rssi"]) if obj.get("rssi") is not None else None),
)
if mtype == "pose_data":
persons = obj.get("persons", ())
return PoseDataMessage(
type=mtype,
raw=obj,
node_id=obj.get("node_id", ""),
timestamp=float(obj.get("timestamp", 0.0)),
persons=tuple(persons) if isinstance(persons, list) else (),
confidence=float(obj.get("confidence", 0.0)),
)
return SensingMessage(type=mtype, raw=obj)
# ─── Client ──────────────────────────────────────────────────────────
class SensingClient:
"""Asyncio WebSocket client for the RuView sensing-server.
Usage as async context manager:
```python
async with SensingClient("ws://localhost:8765/ws/sensing") as c:
async for msg in c.stream():
...
```
The client does NOT auto-reconnect — if you want resilience, wrap
the ``async with`` in your own retry loop. Auto-reconnect logic is
application-specific (e.g., "retry forever" for a long-running
automation vs "fail fast" for a CLI tool that should exit).
"""
def __init__(
self,
url: str,
*,
ping_interval: float = 20.0,
ping_timeout: float = 20.0,
max_size: int = 16 * 1024 * 1024,
) -> None:
if not _WEBSOCKETS_AVAILABLE:
raise ImportError(
"SensingClient requires the `websockets` package. Install with "
"`pip install \"wifi-densepose[client]\"` to enable the client extras."
)
self.url = url
self._ping_interval = ping_interval
self._ping_timeout = ping_timeout
self._max_size = max_size
self._ws: Any = None # websockets.WebSocketClientProtocol — typed Any to avoid import cost
async def __aenter__(self) -> "SensingClient":
self._ws = await websockets.connect(
self.url,
ping_interval=self._ping_interval,
ping_timeout=self._ping_timeout,
max_size=self._max_size,
)
return self
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
await self.close()
async def close(self) -> None:
"""Idempotent connection close."""
if self._ws is not None:
try:
await self._ws.close()
except Exception as e: # pragma: no cover — best-effort close
log.debug("ignored WS close error: %r", e)
self._ws = None
async def stream(self) -> AsyncIterator[SensingMessage]:
"""Yield typed messages until the server closes the connection
or the context is exited.
Decode failures on individual frames are logged at WARN and
swallowed — a malformed frame should not terminate the stream
(the next frame may be fine)."""
if self._ws is None:
raise RuntimeError("SensingClient not connected. Use `async with` first.")
try:
async for frame in self._ws:
if isinstance(frame, bytes):
frame = frame.decode("utf-8", errors="replace")
try:
yield _decode(frame)
except (ValueError, json.JSONDecodeError) as e:
log.warning("dropping malformed sensing-server frame: %r", e)
except ConnectionClosed:
# Graceful EOF — exit the iterator normally.
return
async def send_ping(self) -> None:
"""Send an application-level ping. The sensing-server replies
with `{"type": "pong"}` (main.rs:2698)."""
if self._ws is None:
raise RuntimeError("SensingClient not connected. Use `async with` first.")
await self._ws.send(json.dumps({"type": "ping"}))
async def recv_one(self, *, timeout: Optional[float] = None) -> SensingMessage:
"""Receive a single decoded message. Convenience for short
scripts and tests that don't need an async generator."""
if self._ws is None:
raise RuntimeError("SensingClient not connected. Use `async with` first.")
if timeout is None:
frame = await self._ws.recv()
else:
frame = await asyncio.wait_for(self._ws.recv(), timeout=timeout)
if isinstance(frame, bytes):
frame = frame.decode("utf-8", errors="replace")
return _decode(frame)