Files
ruvnet--RuView/scripts/record-csi-udp.py
T
ruv 15a983b555 fix(paired-data): 4 bugs corrupting/blocking camera-supervised training data (#1007)
1. record-csi-udp.py stamped LOCAL time with a 'Z' (UTC) suffix → camera/CSI disagreed
   by the UTC offset → 0 aligned pairs. Now writes true UTC via datetime.now(timezone.utc).
2. align-ground-truth.js kept empty-keypoint (non-detection) records at confidence 0,
   collapsing window avgConf below threshold → all windows rejected. Now skipped at load.
3. extractCsiMatrix silently zero-padded/truncated mixed-subcarrier frames. Now frames
   are filtered to the session's modal subcarrier count before windowing — never padded.
4. CSI/feature matrices are filled frame-major but were labeled shape [nSc, nFrames] —
   transposed. Labels corrected to [nFrames, nSc] / [nFrames, dim].

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-06-17 10:17:12 -04:00

114 lines
3.8 KiB
Python

#!/usr/bin/env python3
"""
Lightweight ESP32 CSI UDP recorder (ADR-079).
Captures raw CSI packets from ESP32 nodes over UDP and writes to JSONL.
Runs alongside collect-ground-truth.py for synchronized capture.
Usage:
python scripts/record-csi-udp.py --duration 300 --output data/recordings
"""
import argparse
import json
import os
import socket
import struct
import time
from datetime import datetime, timezone
def parse_csi_packet(data):
"""Parse ADR-018 binary CSI packet into dict."""
if len(data) < 8:
return None
# ADR-018 header: [magic(2), len(2), node_id(1), seq(1), rssi(1), channel(1), iq_data...]
# Simplified: extract what we can from the raw packet
node_id = data[4] if len(data) > 4 else 0
rssi = struct.unpack('b', bytes([data[6]]))[0] if len(data) > 6 else 0
channel = data[7] if len(data) > 7 else 0
# IQ data starts at offset 8
iq_data = data[8:] if len(data) > 8 else b''
n_subcarriers = len(iq_data) // 2 # I,Q pairs
# Compute amplitudes
amplitudes = []
for i in range(0, len(iq_data) - 1, 2):
I = struct.unpack('b', bytes([iq_data[i]]))[0]
Q = struct.unpack('b', bytes([iq_data[i + 1]]))[0]
amplitudes.append(round((I * I + Q * Q) ** 0.5, 2))
return {
"type": "raw_csi",
# true UTC, not local-time-labeled-Z (#1007 Bug 1) — e.g. "2026-06-17T01:23:45.678Z"
"timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"),
"ts_ns": time.time_ns(),
"node_id": node_id,
"rssi": rssi,
"channel": channel,
"subcarriers": n_subcarriers,
"amplitudes": amplitudes,
"iq_hex": iq_data.hex(),
}
def main():
parser = argparse.ArgumentParser(description="Record ESP32 CSI over UDP")
parser.add_argument("--port", type=int, default=5005, help="UDP port (default: 5005)")
parser.add_argument("--duration", type=int, default=300, help="Duration in seconds (default: 300)")
parser.add_argument("--output", default="data/recordings", help="Output directory")
args = parser.parse_args()
os.makedirs(args.output, exist_ok=True)
filename = f"csi-{int(time.time())}.csi.jsonl"
filepath = os.path.join(args.output, filename)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", args.port))
sock.settimeout(1)
print(f"Recording CSI on UDP :{args.port} for {args.duration}s")
print(f"Output: {filepath}")
count = 0
start = time.time()
nodes_seen = set()
with open(filepath, "w") as f:
try:
while time.time() - start < args.duration:
try:
data, addr = sock.recvfrom(4096)
frame = parse_csi_packet(data)
if frame:
f.write(json.dumps(frame) + "\n")
count += 1
nodes_seen.add(frame["node_id"])
if count % 500 == 0:
elapsed = time.time() - start
rate = count / elapsed
print(f" {count} frames | {rate:.0f} fps | "
f"nodes: {sorted(nodes_seen)} | "
f"{elapsed:.0f}s / {args.duration}s")
except socket.timeout:
continue
except KeyboardInterrupt:
print("\nStopped by user")
sock.close()
elapsed = time.time() - start
print(f"\n=== CSI Recording Complete ===")
print(f" Frames: {count}")
print(f" Duration: {elapsed:.0f}s")
print(f" Rate: {count / max(elapsed, 1):.0f} fps")
print(f" Nodes: {sorted(nodes_seen)}")
print(f" Output: {filepath}")
if __name__ == "__main__":
main()