feat: ADR-076 CNN spectrogram embeddings + graph transformer fusion

CSI-as-image: 64x20 subcarrier×time matrix → 224x224 → CNN → 128-dim
embedding. Same-node similarity 0.95+, cross-node 0.6-0.8.

- csi-spectrogram.js: WASM CNN embedding, ASCII visualization, Seed ingest
- mesh-graph-transformer.js: GATv2 multi-head attention over ESP32 mesh,
  fuses multi-node features, generalizes to 3+ nodes

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv
2026-04-03 00:36:38 -04:00
parent 4bb8c3303f
commit 28368b2c70
3 changed files with 1597 additions and 0 deletions
+672
View File
@@ -0,0 +1,672 @@
#!/usr/bin/env node
/**
* ADR-076: CSI Spectrogram Embedding Pipeline
*
* Converts raw CSI frames into 128-dim CNN embeddings by treating the
* subcarrier x time matrix as a grayscale spectrogram image.
*
* Modes:
* --live Listen on UDP for real-time CSI frames
* --file FILE Read from a .csi.jsonl recording
* --ascii Print ASCII spectrogram visualization
* --ingest Send 128-dim embeddings to Cognitum Seed
* --knn K Find K most similar past spectrograms
*
* Usage:
* node scripts/csi-spectrogram.js --file data/recordings/pretrain-1775182186.csi.jsonl --ascii
* node scripts/csi-spectrogram.js --live --port 5006 --ingest --seed-url https://169.254.42.1:8443
* node scripts/csi-spectrogram.js --file data/recordings/pretrain-1775182186.csi.jsonl --knn 5
*
* ADR: docs/adr/ADR-076-csi-spectrogram-embeddings.md
*/
'use strict';
const dgram = require('dgram');
const fs = require('fs');
const path = require('path');
const readline = require('readline');
const { parseArgs } = require('util');
// ---------------------------------------------------------------------------
// CLI
// ---------------------------------------------------------------------------
const { values: args } = parseArgs({
options: {
file: { type: 'string', short: 'f' },
live: { type: 'boolean', default: false },
port: { type: 'string', short: 'p', default: '5006' },
ascii: { type: 'boolean', default: false },
ingest: { type: 'boolean', default: false },
knn: { type: 'string', short: 'k' },
'seed-url': { type: 'string', default: 'https://169.254.42.1:8443' },
'seed-token': { type: 'string', default: '' },
window: { type: 'string', short: 'w', default: '20' },
stride: { type: 'string', short: 's', default: '10' },
dim: { type: 'string', short: 'd', default: '128' },
json: { type: 'boolean', default: false },
limit: { type: 'string', short: 'l' },
},
strict: true,
});
const WINDOW_SIZE = parseInt(args.window, 10); // frames per spectrogram
const STRIDE = parseInt(args.stride, 10); // frames between windows
const EMBED_DIM = parseInt(args.dim, 10); // CNN output dimension
const KNN_K = args.knn ? parseInt(args.knn, 10) : 0;
const LIMIT = args.limit ? parseInt(args.limit, 10) : Infinity;
const PORT = parseInt(args.port, 10);
const JSON_OUTPUT = args.json;
// ADR-018 packet constants
const CSI_MAGIC = 0xC5110001;
const HEADER_SIZE = 20;
// CNN input size (ruvector/cnn expects 224x224 RGB)
const CNN_INPUT_SIZE = 224;
// ASCII visualization characters (8 intensity levels)
const BARS = [' ', '\u2581', '\u2582', '\u2583', '\u2584', '\u2585', '\u2586', '\u2587', '\u2588'];
// ---------------------------------------------------------------------------
// IQ Hex Parsing
// ---------------------------------------------------------------------------
/**
* Parse iq_hex string into subcarrier amplitudes.
* Format: 4 hex chars per subcarrier (I byte + Q byte).
* @param {string} iqHex - Hex-encoded I/Q data
* @param {number} nSubcarriers - Expected number of subcarriers
* @returns {Float32Array} Amplitude per subcarrier
*/
function parseIqHex(iqHex, nSubcarriers) {
const amps = new Float32Array(nSubcarriers);
for (let sc = 0; sc < nSubcarriers; sc++) {
const offset = sc * 4;
if (offset + 4 > iqHex.length) break;
const iVal = parseInt(iqHex.substring(offset, offset + 2), 16);
const qVal = parseInt(iqHex.substring(offset + 2, offset + 4), 16);
amps[sc] = Math.sqrt(iVal * iVal + qVal * qVal);
}
return amps;
}
/**
* Parse an ADR-018 binary UDP packet into subcarrier amplitudes.
* @param {Buffer} buf - Raw UDP packet
* @returns {{ nodeId: number, rssi: number, nSubcarriers: number, amplitudes: Float32Array } | null}
*/
function parseBinaryFrame(buf) {
if (buf.length < HEADER_SIZE) return null;
const magic = buf.readUInt32LE(0);
if (magic !== CSI_MAGIC) return null;
const nodeId = buf.readUInt8(4);
const rssi = buf.readInt8(5);
const nSubcarriers = buf.readUInt16LE(6);
const payloadSize = buf.readUInt16LE(8);
if (buf.length < HEADER_SIZE + payloadSize) return null;
const amps = new Float32Array(nSubcarriers);
for (let sc = 0; sc < nSubcarriers; sc++) {
const off = HEADER_SIZE + sc * 2;
if (off + 2 > buf.length) break;
const iVal = buf[off];
const qVal = buf[off + 1];
amps[sc] = Math.sqrt(iVal * iVal + qVal * qVal);
}
return { nodeId, rssi, nSubcarriers, amplitudes: amps };
}
// ---------------------------------------------------------------------------
// Spectrogram Window
// ---------------------------------------------------------------------------
class SpectrogramWindow {
/**
* @param {number} nSubcarriers - Number of subcarriers per frame
* @param {number} windowSize - Number of time frames per window
*/
constructor(nSubcarriers, windowSize) {
this.nSubcarriers = nSubcarriers;
this.windowSize = windowSize;
/** @type {Float32Array[]} Ring buffer of amplitude vectors */
this.frames = [];
this.totalPushed = 0;
}
/** Push a new amplitude vector. */
push(amplitudes) {
if (amplitudes.length !== this.nSubcarriers) {
// Pad or truncate to expected size
const padded = new Float32Array(this.nSubcarriers);
padded.set(amplitudes.subarray(0, Math.min(amplitudes.length, this.nSubcarriers)));
this.frames.push(padded);
} else {
this.frames.push(new Float32Array(amplitudes));
}
if (this.frames.length > this.windowSize) {
this.frames.shift();
}
this.totalPushed++;
}
/** @returns {boolean} True when window is full */
isFull() {
return this.frames.length >= this.windowSize;
}
/**
* Get the subcarrier x time matrix as a flat grayscale image (0-255).
* Layout: row-major, rows = subcarriers, cols = time frames.
* @returns {{ pixels: Uint8Array, width: number, height: number }}
*/
toGrayscale() {
const h = this.nSubcarriers;
const w = this.windowSize;
const pixels = new Uint8Array(h * w);
// Find min/max across entire window for normalization
let min = Infinity;
let max = -Infinity;
for (let t = 0; t < w; t++) {
const frame = this.frames[t];
for (let sc = 0; sc < h; sc++) {
const v = frame[sc];
if (v < min) min = v;
if (v > max) max = v;
}
}
const range = max - min || 1;
for (let sc = 0; sc < h; sc++) {
for (let t = 0; t < w; t++) {
const v = this.frames[t][sc];
pixels[sc * w + t] = Math.round(255 * (v - min) / range);
}
}
return { pixels, width: w, height: h };
}
/**
* Upsample grayscale to CNN input size using nearest-neighbor interpolation.
* Replicates to 3-channel RGB as required by @ruvector/cnn.
* @returns {Uint8Array} RGB pixel data (CNN_INPUT_SIZE * CNN_INPUT_SIZE * 3)
*/
toCnnInput() {
const { pixels, width, height } = this.toGrayscale();
const out = new Uint8Array(CNN_INPUT_SIZE * CNN_INPUT_SIZE * 3);
for (let y = 0; y < CNN_INPUT_SIZE; y++) {
const srcY = Math.min(Math.floor(y * height / CNN_INPUT_SIZE), height - 1);
for (let x = 0; x < CNN_INPUT_SIZE; x++) {
const srcX = Math.min(Math.floor(x * width / CNN_INPUT_SIZE), width - 1);
const gray = pixels[srcY * width + srcX];
const dstIdx = (y * CNN_INPUT_SIZE + x) * 3;
out[dstIdx] = gray;
out[dstIdx + 1] = gray;
out[dstIdx + 2] = gray;
}
}
return out;
}
}
// ---------------------------------------------------------------------------
// ASCII Visualization
// ---------------------------------------------------------------------------
/**
* Print an ASCII spectrogram of the current window.
* Rows = subcarrier index (downsampled), columns = time.
*/
function printAsciiSpectrogram(window, meta = {}) {
const { pixels, width, height } = window.toGrayscale();
// Downsample rows to fit terminal (max 32 rows)
const maxRows = Math.min(height, 32);
const rowStep = Math.ceil(height / maxRows);
const lines = [];
lines.push(`--- Spectrogram [${height}sc x ${width}t] node=${meta.nodeId || '?'} rssi=${meta.rssi || '?'} ---`);
for (let r = 0; r < maxRows; r++) {
const sc = r * rowStep;
const label = String(sc).padStart(3);
let row = `sc${label} |`;
for (let t = 0; t < width; t++) {
const v = pixels[sc * width + t];
const level = Math.min(Math.floor(v / 29), BARS.length - 1);
row += BARS[level];
}
row += '|';
lines.push(row);
}
lines.push(` ${''.padStart(width + 2, '-')}`);
lines.push(` t=0${''.padStart(width - 6)}t=${width - 1}`);
console.log(lines.join('\n'));
}
// ---------------------------------------------------------------------------
// CNN Embedding
// ---------------------------------------------------------------------------
let cnnEmbedder = null;
let cnnInitialized = false;
/**
* Initialize the CNN embedder from vendor WASM.
*/
async function initCnn() {
if (cnnInitialized) return;
// Load WASM bindings directly to work around the CnnEmbedder wrapper bug:
// The wrapper's constructor calls `new wasm.WasmCnnEmbedder(wasmConfig)` which
// consumes (destroys) the EmbedderConfig pointer, then tries to read
// `wasmConfig.embedding_dim` from the now-null pointer. We use the WASM
// classes directly and track the dimension ourselves.
const wasmPath = path.resolve(
__dirname, '..', 'vendor', 'ruvector', 'npm', 'packages', 'ruvector-cnn'
);
const wasmModule = require(path.join(wasmPath, 'ruvector_cnn_wasm.js'));
const wasmBuffer = fs.readFileSync(path.join(wasmPath, 'ruvector_cnn_wasm_bg.wasm'));
await wasmModule.default(wasmBuffer);
const config = new wasmModule.EmbedderConfig();
config.input_size = CNN_INPUT_SIZE;
config.embedding_dim = EMBED_DIM;
config.normalize = true;
// Save dim before construction (constructor consumes config)
const savedDim = EMBED_DIM;
const inner = new wasmModule.WasmCnnEmbedder(config);
// Wrap in a compatible interface
cnnEmbedder = {
_inner: inner,
embeddingDim: savedDim,
extract(imageData, width, height) {
return new Float32Array(inner.extract(imageData, width, height));
},
cosineSimilarity(a, b) {
return inner.cosine_similarity(a, b);
},
};
cnnInitialized = true;
if (!JSON_OUTPUT) {
console.log(`[cnn] Initialized: embeddingDim=${savedDim}, inputSize=${CNN_INPUT_SIZE}x${CNN_INPUT_SIZE}`);
}
}
/**
* Extract CNN embedding from a spectrogram window.
* @param {SpectrogramWindow} window
* @returns {Float32Array} 128-dim embedding
*/
function extractEmbedding(window) {
const rgbPixels = window.toCnnInput();
return cnnEmbedder.extract(rgbPixels, CNN_INPUT_SIZE, CNN_INPUT_SIZE);
}
// ---------------------------------------------------------------------------
// Embedding Store (in-memory kNN)
// ---------------------------------------------------------------------------
class EmbeddingStore {
constructor() {
/** @type {{ embedding: Float32Array, timestamp: number, nodeId: number, windowIdx: number }[]} */
this.entries = [];
}
add(embedding, meta) {
this.entries.push({ embedding, ...meta });
}
/**
* Find k nearest neighbors by cosine similarity.
* @param {Float32Array} query
* @param {number} k
* @returns {{ index: number, similarity: number, meta: object }[]}
*/
knn(query, k) {
const scores = this.entries.map((entry, index) => ({
index,
similarity: cosineSimilarity(query, entry.embedding),
timestamp: entry.timestamp,
nodeId: entry.nodeId,
windowIdx: entry.windowIdx,
}));
scores.sort((a, b) => b.similarity - a.similarity);
return scores.slice(0, k);
}
get size() { return this.entries.length; }
}
function cosineSimilarity(a, b) {
let dot = 0, normA = 0, normB = 0;
for (let i = 0; i < a.length; i++) {
dot += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
const denom = Math.sqrt(normA) * Math.sqrt(normB);
return denom > 0 ? dot / denom : 0;
}
// ---------------------------------------------------------------------------
// Cognitum Seed Ingest
// ---------------------------------------------------------------------------
/**
* Send a 128-dim embedding to Cognitum Seed's RVF vector store.
* @param {Float32Array} embedding
* @param {object} meta
*/
async function ingestToSeed(embedding, meta) {
const seedUrl = args['seed-url'];
const token = args['seed-token'] || process.env.SEED_TOKEN;
if (!token) {
console.error('[seed] No token provided (--seed-token or $SEED_TOKEN)');
return;
}
const https = require('https');
const payload = JSON.stringify({
store: 'csi-spectrograms',
vectors: [{
id: `spectrogram-${meta.nodeId}-${meta.windowIdx}`,
values: Array.from(embedding),
metadata: {
node_id: meta.nodeId,
timestamp: meta.timestamp,
window_idx: meta.windowIdx,
rssi: meta.rssi,
subcarriers: meta.nSubcarriers,
},
}],
});
return new Promise((resolve, reject) => {
const url = new URL('/v1/vectors/upsert', seedUrl);
const req = https.request(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
'Content-Length': Buffer.byteLength(payload),
},
rejectUnauthorized: false,
}, (res) => {
let body = '';
res.on('data', (chunk) => body += chunk);
res.on('end', () => {
if (res.statusCode >= 200 && res.statusCode < 300) {
resolve(JSON.parse(body));
} else {
reject(new Error(`Seed HTTP ${res.statusCode}: ${body}`));
}
});
});
req.on('error', reject);
req.write(payload);
req.end();
});
}
// ---------------------------------------------------------------------------
// File Mode: Read JSONL Recording
// ---------------------------------------------------------------------------
async function processFile(filePath) {
await initCnn();
const store = new EmbeddingStore();
const windows = new Map(); // nodeId -> SpectrogramWindow
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity,
});
let frameCount = 0;
let windowCount = 0;
let lastNodeId = 0;
let lastRssi = 0;
for await (const line of rl) {
if (frameCount >= LIMIT) break;
let frame;
try {
frame = JSON.parse(line);
} catch {
continue;
}
const nodeId = frame.node_id || 0;
const nSubcarriers = frame.subcarriers || 64;
const iqHex = frame.iq_hex || '';
if (!iqHex) continue;
const amplitudes = parseIqHex(iqHex, nSubcarriers);
lastNodeId = nodeId;
lastRssi = frame.rssi || 0;
if (!windows.has(nodeId)) {
windows.set(nodeId, new SpectrogramWindow(nSubcarriers, WINDOW_SIZE));
}
const win = windows.get(nodeId);
win.push(amplitudes);
frameCount++;
// Check if this window is ready and stride condition met
if (win.isFull() && (win.totalPushed - WINDOW_SIZE) % STRIDE === 0) {
const t0 = Date.now();
const embedding = extractEmbedding(win);
const embedMs = Date.now() - t0;
const meta = {
timestamp: frame.timestamp,
nodeId,
windowIdx: windowCount,
rssi: frame.rssi || 0,
nSubcarriers,
};
store.add(embedding, meta);
if (args.ascii) {
printAsciiSpectrogram(win, { nodeId, rssi: frame.rssi });
}
if (JSON_OUTPUT) {
console.log(JSON.stringify({
type: 'embedding',
windowIdx: windowCount,
nodeId,
dim: embedding.length,
embedMs,
embedding: Array.from(embedding).map(v => +v.toFixed(6)),
}));
} else {
const embSnippet = Array.from(embedding.subarray(0, 4)).map(v => v.toFixed(4)).join(', ');
console.log(`[window ${windowCount}] node=${nodeId} embed=[${embSnippet}, ...] (${embedMs}ms)`);
}
// kNN search against previous windows
if (KNN_K > 0 && store.size > 1) {
const neighbors = store.knn(embedding, KNN_K + 1);
// Skip self (first result)
const results = neighbors.filter(n => n.windowIdx !== windowCount).slice(0, KNN_K);
if (JSON_OUTPUT) {
console.log(JSON.stringify({ type: 'knn', query: windowCount, results }));
} else {
console.log(` kNN(${KNN_K}): ${results.map(r => `w${r.windowIdx}(${r.similarity.toFixed(3)})`).join(' ')}`);
}
}
// Cognitum Seed ingest
if (args.ingest) {
try {
await ingestToSeed(embedding, meta);
if (!JSON_OUTPUT) console.log(` -> ingested to Seed`);
} catch (err) {
console.error(` -> Seed ingest failed: ${err.message}`);
}
}
windowCount++;
}
}
if (!JSON_OUTPUT) {
console.log(`\nProcessed ${frameCount} frames -> ${windowCount} spectrogram windows`);
console.log(`Store contains ${store.size} embeddings of dimension ${EMBED_DIM}`);
}
return store;
}
// ---------------------------------------------------------------------------
// Live Mode: UDP Listener
// ---------------------------------------------------------------------------
async function processLive() {
await initCnn();
const store = new EmbeddingStore();
const windows = new Map();
let windowCount = 0;
const server = dgram.createSocket('udp4');
server.on('message', async (msg, rinfo) => {
// Try binary ADR-018 format first
let parsed = parseBinaryFrame(msg);
let nodeId, nSubcarriers, amplitudes, rssi;
if (parsed) {
nodeId = parsed.nodeId;
nSubcarriers = parsed.nSubcarriers;
amplitudes = parsed.amplitudes;
rssi = parsed.rssi;
} else {
// Try JSONL format
try {
const frame = JSON.parse(msg.toString());
nodeId = frame.node_id || 0;
nSubcarriers = frame.subcarriers || 64;
amplitudes = parseIqHex(frame.iq_hex || '', nSubcarriers);
rssi = frame.rssi || 0;
} catch {
return; // Unknown format
}
}
if (!windows.has(nodeId)) {
windows.set(nodeId, new SpectrogramWindow(nSubcarriers, WINDOW_SIZE));
}
const win = windows.get(nodeId);
win.push(amplitudes);
if (win.isFull() && (win.totalPushed - WINDOW_SIZE) % STRIDE === 0) {
const t0 = Date.now();
const embedding = extractEmbedding(win);
const embedMs = Date.now() - t0;
const meta = {
timestamp: Date.now() / 1000,
nodeId,
windowIdx: windowCount,
rssi,
nSubcarriers,
};
store.add(embedding, meta);
if (args.ascii) {
printAsciiSpectrogram(win, { nodeId, rssi });
}
if (JSON_OUTPUT) {
console.log(JSON.stringify({
type: 'embedding',
windowIdx: windowCount,
nodeId,
dim: embedding.length,
embedMs,
embedding: Array.from(embedding).map(v => +v.toFixed(6)),
}));
} else {
const embSnippet = Array.from(embedding.subarray(0, 4)).map(v => v.toFixed(4)).join(', ');
console.log(`[window ${windowCount}] node=${nodeId} rssi=${rssi} embed=[${embSnippet}, ...] (${embedMs}ms)`);
}
if (KNN_K > 0 && store.size > 1) {
const neighbors = store.knn(embedding, KNN_K + 1);
const results = neighbors.filter(n => n.windowIdx !== windowCount).slice(0, KNN_K);
if (!JSON_OUTPUT) {
console.log(` kNN(${KNN_K}): ${results.map(r => `w${r.windowIdx}(${r.similarity.toFixed(3)})`).join(' ')}`);
}
}
if (args.ingest) {
try {
await ingestToSeed(embedding, meta);
} catch (err) {
console.error(` -> Seed ingest failed: ${err.message}`);
}
}
windowCount++;
}
});
server.on('listening', () => {
const addr = server.address();
console.log(`[live] Listening for CSI on UDP ${addr.address}:${addr.port}`);
console.log(`[live] Window: ${WINDOW_SIZE} frames, stride: ${STRIDE}, embed dim: ${EMBED_DIM}`);
if (KNN_K > 0) console.log(`[live] kNN search: k=${KNN_K}`);
if (args.ingest) console.log(`[live] Ingesting to Cognitum Seed at ${args['seed-url']}`);
});
server.bind(PORT);
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
async function main() {
if (!args.file && !args.live) {
console.error('Usage: node scripts/csi-spectrogram.js --file <path> [--ascii] [--knn K]');
console.error(' node scripts/csi-spectrogram.js --live [--port 5006] [--ingest]');
process.exit(1);
}
if (args.file) {
const filePath = path.resolve(args.file);
if (!fs.existsSync(filePath)) {
console.error(`File not found: ${filePath}`);
process.exit(1);
}
await processFile(filePath);
} else {
await processLive();
}
}
main().catch((err) => {
console.error('Fatal:', err);
process.exit(1);
});
+666
View File
@@ -0,0 +1,666 @@
#!/usr/bin/env node
/**
* ADR-076: Multi-Node Graph Transformer for CSI Fusion
*
* Builds a graph from multiple ESP32 nodes and applies graph attention to
* fuse their CSI feature vectors (either 8-dim hand-crafted or 128-dim CNN)
* into a single multi-viewpoint representation.
*
* The graph structure:
* - Each ESP32 node = graph node with a feature vector
* - Edge between nodes weighted by cross-node correlation
* - Attention learns which node to trust more per prediction
*
* Modes:
* --live Listen on UDP for real-time multi-node CSI
* --file FILE Read from a .csi.jsonl recording with multiple node_ids
* --dim DIM Feature dimension (8 for hand-crafted, 128 for CNN)
* --heads H Number of attention heads (default: 4)
* --json JSON output
*
* Usage:
* node scripts/mesh-graph-transformer.js --file data/recordings/pretrain-1775182186.csi.jsonl
* node scripts/mesh-graph-transformer.js --live --port 5006 --dim 128
*
* ADR: docs/adr/ADR-076-csi-spectrogram-embeddings.md
*/
'use strict';
const dgram = require('dgram');
const fs = require('fs');
const path = require('path');
const readline = require('readline');
const { parseArgs } = require('util');
// ---------------------------------------------------------------------------
// CLI
// ---------------------------------------------------------------------------
const { values: args } = parseArgs({
options: {
file: { type: 'string', short: 'f' },
live: { type: 'boolean', default: false },
port: { type: 'string', short: 'p', default: '5006' },
dim: { type: 'string', short: 'd', default: '8' },
heads: { type: 'string', short: 'h', default: '4' },
window: { type: 'string', short: 'w', default: '20' },
json: { type: 'boolean', default: false },
limit: { type: 'string', short: 'l' },
},
strict: true,
});
const FEAT_DIM = parseInt(args.dim, 10);
const NUM_HEADS = parseInt(args.heads, 10);
const WINDOW_SIZE = parseInt(args.window, 10);
const PORT = parseInt(args.port, 10);
const LIMIT = args.limit ? parseInt(args.limit, 10) : Infinity;
const JSON_OUTPUT = args.json;
// ADR-018 packet constants
const CSI_MAGIC = 0xC5110001;
const HEADER_SIZE = 20;
// ---------------------------------------------------------------------------
// IQ Parsing (shared with csi-spectrogram.js)
// ---------------------------------------------------------------------------
function parseIqHex(iqHex, nSubcarriers) {
const amps = new Float32Array(nSubcarriers);
for (let sc = 0; sc < nSubcarriers; sc++) {
const offset = sc * 4;
if (offset + 4 > iqHex.length) break;
const iVal = parseInt(iqHex.substring(offset, offset + 2), 16);
const qVal = parseInt(iqHex.substring(offset + 2, offset + 4), 16);
amps[sc] = Math.sqrt(iVal * iVal + qVal * qVal);
}
return amps;
}
// ---------------------------------------------------------------------------
// 8-dim Hand-Crafted Feature Extraction
// ---------------------------------------------------------------------------
/**
* Extract 8-dim feature vector from subcarrier amplitudes.
* Matches the features used by seed_csi_bridge.py (ADR-069).
* @param {Float32Array} amplitudes
* @param {number} rssi
* @returns {Float32Array}
*/
function extract8DimFeatures(amplitudes, rssi) {
const n = amplitudes.length;
if (n === 0) return new Float32Array(8);
let sum = 0, sumSq = 0, maxAmp = 0;
for (let i = 0; i < n; i++) {
const v = amplitudes[i];
sum += v;
sumSq += v * v;
if (v > maxAmp) maxAmp = v;
}
const mean = sum / n;
const variance = sumSq / n - mean * mean;
// Phase: approximate from I/Q sign pattern (simplified)
const phaseMean = 0; // Would need raw I/Q for true phase
const phaseVariance = 0;
// Bandwidth: number of subcarriers above noise floor
const noiseFloor = mean * 0.1;
let bw = 0;
for (let i = 0; i < n; i++) {
if (amplitudes[i] > noiseFloor) bw++;
}
// Spectral centroid
let weightedSum = 0;
for (let i = 0; i < n; i++) {
weightedSum += i * amplitudes[i];
}
const centroid = sum > 0 ? weightedSum / sum : n / 2;
return new Float32Array([
mean,
variance,
maxAmp,
phaseMean,
phaseVariance,
bw / n, // normalized bandwidth
centroid / n, // normalized centroid
Math.abs(rssi) / 100, // normalized RSSI
]);
}
// ---------------------------------------------------------------------------
// Graph Attention Layer (Pure JS, no WASM dependency)
// ---------------------------------------------------------------------------
/**
* Multi-head graph attention network (GATv2-style).
*
* For a graph with N nodes each having D-dimensional features:
* 1. Project features to Q, K, V using learned weights
* 2. Compute attention scores with edge weight bias
* 3. Aggregate via softmax-weighted sum
* 4. Produce fused D-dimensional output
*/
class GraphAttentionLayer {
/**
* @param {number} inputDim - Feature dimension per node
* @param {number} numHeads - Number of attention heads
*/
constructor(inputDim, numHeads) {
this.inputDim = inputDim;
this.numHeads = numHeads;
this.headDim = Math.max(1, Math.floor(inputDim / numHeads));
// Initialize projection weights (Xavier uniform)
this.Wq = this._initWeights(inputDim, this.headDim * numHeads);
this.Wk = this._initWeights(inputDim, this.headDim * numHeads);
this.Wv = this._initWeights(inputDim, this.headDim * numHeads);
this.Wo = this._initWeights(this.headDim * numHeads, inputDim);
// Edge weight bias scale
this.edgeBiasScale = 0.5;
}
/** Xavier-uniform weight initialization. */
_initWeights(rows, cols) {
const limit = Math.sqrt(6 / (rows + cols));
const w = new Float32Array(rows * cols);
for (let i = 0; i < w.length; i++) {
w[i] = (Math.random() * 2 - 1) * limit;
}
return { data: w, rows, cols };
}
/** Matrix-vector multiply: out = W * x. */
_matvec(W, x) {
const out = new Float32Array(W.rows);
for (let r = 0; r < W.rows; r++) {
let sum = 0;
for (let c = 0; c < W.cols; c++) {
sum += W.data[r * W.cols + c] * x[c];
}
out[r] = sum;
}
return out;
}
/**
* Compute attention-fused output for a set of nodes.
*
* @param {Float32Array[]} nodeFeatures - Array of D-dim feature vectors, one per node
* @param {Map<string, number>} edgeWeights - Map of "i-j" -> weight (cross-correlation)
* @returns {{ fused: Float32Array, attentionWeights: number[][] }}
*/
forward(nodeFeatures, edgeWeights) {
const N = nodeFeatures.length;
if (N === 0) return { fused: new Float32Array(this.inputDim), attentionWeights: [] };
if (N === 1) return { fused: new Float32Array(nodeFeatures[0]), attentionWeights: [[1.0]] };
const D = this.headDim;
const H = this.numHeads;
// Project to Q, K, V for each node
const queries = nodeFeatures.map(f => this._matvec(this.Wq, f));
const keys = nodeFeatures.map(f => this._matvec(this.Wk, f));
const values = nodeFeatures.map(f => this._matvec(this.Wv, f));
// Compute per-head attention scores with edge bias
const scale = 1 / Math.sqrt(D);
const allAttentionWeights = [];
// Aggregate output per node (we produce a fused vector for each node)
const nodeOutputs = [];
for (let i = 0; i < N; i++) {
const headOutputs = [];
for (let h = 0; h < H; h++) {
const hOff = h * D;
// Compute attention scores from node i to all other nodes
const scores = new Float32Array(N);
for (let j = 0; j < N; j++) {
let dot = 0;
for (let d = 0; d < D; d++) {
dot += queries[i][hOff + d] * keys[j][hOff + d];
}
// Add edge weight bias
const edgeKey = i < j ? `${i}-${j}` : `${j}-${i}`;
const ew = edgeWeights.get(edgeKey) || 0;
scores[j] = dot * scale + ew * this.edgeBiasScale;
}
// Softmax
let maxScore = -Infinity;
for (let j = 0; j < N; j++) {
if (scores[j] > maxScore) maxScore = scores[j];
}
let sumExp = 0;
const attn = new Float32Array(N);
for (let j = 0; j < N; j++) {
attn[j] = Math.exp(scores[j] - maxScore);
sumExp += attn[j];
}
for (let j = 0; j < N; j++) {
attn[j] /= sumExp;
}
if (i === 0 && h === 0) {
allAttentionWeights.push(Array.from(attn));
}
// Weighted sum of values
const headOut = new Float32Array(D);
for (let j = 0; j < N; j++) {
for (let d = 0; d < D; d++) {
headOut[d] += attn[j] * values[j][hOff + d];
}
}
headOutputs.push(headOut);
}
// Concatenate heads
const concat = new Float32Array(H * D);
for (let h = 0; h < H; h++) {
concat.set(headOutputs[h], h * D);
}
// Project back to input dimension
nodeOutputs.push(this._matvec(this.Wo, concat));
}
// Fuse all node outputs via mean pooling
const fused = new Float32Array(this.inputDim);
for (let i = 0; i < N; i++) {
for (let d = 0; d < this.inputDim; d++) {
fused[d] += nodeOutputs[i][d] / N;
}
}
return { fused, attentionWeights: allAttentionWeights };
}
}
// ---------------------------------------------------------------------------
// Cross-Node Correlation
// ---------------------------------------------------------------------------
/**
* Compute Pearson correlation between two amplitude vectors.
* Used as edge weight in the graph.
*/
function pearsonCorrelation(a, b) {
const n = Math.min(a.length, b.length);
if (n === 0) return 0;
let sumA = 0, sumB = 0, sumAB = 0, sumA2 = 0, sumB2 = 0;
for (let i = 0; i < n; i++) {
sumA += a[i];
sumB += b[i];
sumAB += a[i] * b[i];
sumA2 += a[i] * a[i];
sumB2 += b[i] * b[i];
}
const num = n * sumAB - sumA * sumB;
const den = Math.sqrt((n * sumA2 - sumA * sumA) * (n * sumB2 - sumB * sumB));
return den > 0 ? num / den : 0;
}
// ---------------------------------------------------------------------------
// Graph Builder
// ---------------------------------------------------------------------------
/**
* Build and maintain a graph of ESP32 nodes.
* Stores the latest feature vector per node and computes edge weights.
*/
class MeshGraph {
constructor(featDim, numHeads) {
this.featDim = featDim;
/** @type {Map<number, { features: Float32Array, amplitudes: Float32Array, rssi: number, timestamp: number }>} */
this.nodes = new Map();
this.attention = new GraphAttentionLayer(featDim, numHeads);
this.fusionCount = 0;
}
/**
* Update a node's features.
* @param {number} nodeId
* @param {Float32Array} features - D-dim feature vector
* @param {Float32Array} amplitudes - Raw subcarrier amplitudes (for cross-correlation)
* @param {number} rssi
* @param {number} timestamp
*/
updateNode(nodeId, features, amplitudes, rssi, timestamp) {
this.nodes.set(nodeId, { features, amplitudes, rssi, timestamp });
}
/**
* Compute edge weights between all node pairs.
* @returns {Map<string, number>}
*/
computeEdgeWeights() {
const weights = new Map();
const nodeIds = Array.from(this.nodes.keys()).sort();
for (let i = 0; i < nodeIds.length; i++) {
for (let j = i + 1; j < nodeIds.length; j++) {
const a = this.nodes.get(nodeIds[i]);
const b = this.nodes.get(nodeIds[j]);
const corr = pearsonCorrelation(a.amplitudes, b.amplitudes);
weights.set(`${i}-${j}`, corr);
}
}
return weights;
}
/**
* Run graph attention to produce a fused feature vector.
* @returns {{ fused: Float32Array, attentionWeights: number[][], nodeIds: number[], edgeWeights: Map<string, number> } | null}
*/
fuse() {
if (this.nodes.size < 2) return null;
const nodeIds = Array.from(this.nodes.keys()).sort();
const features = nodeIds.map(id => this.nodes.get(id).features);
const edgeWeights = this.computeEdgeWeights();
const { fused, attentionWeights } = this.attention.forward(features, edgeWeights);
this.fusionCount++;
return { fused, attentionWeights, nodeIds, edgeWeights };
}
/** Pretty-print graph state. */
toString() {
const nodeIds = Array.from(this.nodes.keys()).sort();
const lines = [`Graph: ${nodeIds.length} nodes [${nodeIds.join(', ')}]`];
if (nodeIds.length >= 2) {
const edgeWeights = this.computeEdgeWeights();
for (const [key, weight] of edgeWeights) {
const [i, j] = key.split('-').map(Number);
lines.push(` Edge ${nodeIds[i]}->${nodeIds[j]}: correlation=${weight.toFixed(4)}`);
}
}
return lines.join('\n');
}
}
// ---------------------------------------------------------------------------
// Optional: Graph-WASM Visualization
// ---------------------------------------------------------------------------
let graphDb = null;
/**
* Initialize @ruvector/graph-wasm for persistent graph storage.
* Optional -- only used if the WASM file exists.
*/
async function initGraphDb() {
try {
const graphWasmPath = path.resolve(
__dirname, '..', 'vendor', 'ruvector', 'npm', 'packages', 'graph-wasm'
);
const graphWasm = require(graphWasmPath);
await graphWasm.default();
graphDb = new graphWasm.GraphDB('cosine');
if (!JSON_OUTPUT) console.log('[graph-wasm] Initialized persistent graph DB');
return true;
} catch {
if (!JSON_OUTPUT) console.log('[graph-wasm] Not available, using in-memory graph only');
return false;
}
}
/**
* Persist the mesh graph to @ruvector/graph-wasm.
* @param {MeshGraph} mesh
* @param {object} fusionResult
*/
function persistToGraphDb(mesh, fusionResult) {
if (!graphDb) return;
const { nodeIds, edgeWeights, fused, attentionWeights } = fusionResult;
// Create/update nodes
for (const nodeId of nodeIds) {
const node = mesh.nodes.get(nodeId);
const existingId = `esp32-node-${nodeId}`;
try { graphDb.deleteNode(existingId); } catch { /* ignore */ }
graphDb.createNode(['ESP32', 'SensingNode'], {
id: existingId,
node_id: nodeId,
rssi: node.rssi,
timestamp: node.timestamp,
feature_dim: mesh.featDim,
});
}
// Create edges with correlation weights
for (const [key, weight] of edgeWeights) {
const [i, j] = key.split('-').map(Number);
try {
graphDb.createEdge(
`esp32-node-${nodeIds[i]}`,
`esp32-node-${nodeIds[j]}`,
'CSI_CORRELATION',
{ weight, fusion_count: mesh.fusionCount }
);
} catch { /* ignore duplicate edges */ }
}
}
// ---------------------------------------------------------------------------
// File Mode
// ---------------------------------------------------------------------------
async function processFile(filePath) {
await initGraphDb();
const mesh = new MeshGraph(FEAT_DIM, NUM_HEADS);
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity,
});
let frameCount = 0;
let fusionCount = 0;
const nodeFrameCounts = new Map();
for await (const line of rl) {
if (frameCount >= LIMIT) break;
let frame;
try {
frame = JSON.parse(line);
} catch {
continue;
}
const nodeId = frame.node_id || 0;
const nSubcarriers = frame.subcarriers || 64;
const iqHex = frame.iq_hex || '';
if (!iqHex) continue;
const amplitudes = parseIqHex(iqHex, nSubcarriers);
const rssi = frame.rssi || 0;
// Extract feature vector based on configured dimension
let features;
if (FEAT_DIM === 8) {
features = extract8DimFeatures(amplitudes, rssi);
} else {
// For CNN embeddings, we need the csi-spectrogram.js pipeline.
// In file mode without CNN, use padded 8-dim features as a placeholder.
const base = extract8DimFeatures(amplitudes, rssi);
features = new Float32Array(FEAT_DIM);
features.set(base.subarray(0, Math.min(8, FEAT_DIM)));
}
mesh.updateNode(nodeId, features, amplitudes, rssi, frame.timestamp || 0);
frameCount++;
const nc = (nodeFrameCounts.get(nodeId) || 0) + 1;
nodeFrameCounts.set(nodeId, nc);
// Attempt fusion every WINDOW_SIZE frames (when we have data from multiple nodes)
if (frameCount % WINDOW_SIZE === 0 && mesh.nodes.size >= 2) {
const result = mesh.fuse();
if (result) {
fusionCount++;
persistToGraphDb(mesh, result);
if (JSON_OUTPUT) {
console.log(JSON.stringify({
type: 'fusion',
fusionIdx: fusionCount,
nodeIds: result.nodeIds,
edgeWeights: Object.fromEntries(result.edgeWeights),
attentionWeights: result.attentionWeights,
fused: Array.from(result.fused).map(v => +v.toFixed(6)),
}));
} else {
console.log(`\n[fusion ${fusionCount}] ${mesh.toString()}`);
if (result.attentionWeights.length > 0) {
const aw = result.attentionWeights[0].map(w => w.toFixed(3));
console.log(` Attention (head 0): [${aw.join(', ')}]`);
}
const fusedSnippet = Array.from(result.fused.subarray(0, 4)).map(v => v.toFixed(4)).join(', ');
console.log(` Fused: [${fusedSnippet}, ...] (dim=${FEAT_DIM})`);
}
}
}
}
if (!JSON_OUTPUT) {
console.log(`\nProcessed ${frameCount} frames from ${nodeFrameCounts.size} nodes`);
console.log(`Produced ${fusionCount} fusions with ${NUM_HEADS}-head attention`);
for (const [nodeId, count] of nodeFrameCounts) {
console.log(` Node ${nodeId}: ${count} frames`);
}
if (graphDb) {
const stats = graphDb.stats();
console.log(`Graph DB: ${stats.nodeCount} nodes, ${stats.edgeCount} edges`);
}
}
}
// ---------------------------------------------------------------------------
// Live Mode
// ---------------------------------------------------------------------------
async function processLive() {
await initGraphDb();
const mesh = new MeshGraph(FEAT_DIM, NUM_HEADS);
let frameCount = 0;
let fusionCount = 0;
const server = dgram.createSocket('udp4');
server.on('message', (msg) => {
let nodeId, nSubcarriers, amplitudes, rssi;
// Try binary ADR-018 format
if (msg.length >= HEADER_SIZE && msg.readUInt32LE(0) === CSI_MAGIC) {
nodeId = msg.readUInt8(4);
rssi = msg.readInt8(5);
nSubcarriers = msg.readUInt16LE(6);
amplitudes = new Float32Array(nSubcarriers);
for (let sc = 0; sc < nSubcarriers; sc++) {
const off = HEADER_SIZE + sc * 2;
if (off + 2 > msg.length) break;
amplitudes[sc] = Math.sqrt(msg[off] ** 2 + msg[off + 1] ** 2);
}
} else {
// Try JSONL
try {
const frame = JSON.parse(msg.toString());
nodeId = frame.node_id || 0;
nSubcarriers = frame.subcarriers || 64;
amplitudes = parseIqHex(frame.iq_hex || '', nSubcarriers);
rssi = frame.rssi || 0;
} catch {
return;
}
}
let features;
if (FEAT_DIM === 8) {
features = extract8DimFeatures(amplitudes, rssi);
} else {
const base = extract8DimFeatures(amplitudes, rssi);
features = new Float32Array(FEAT_DIM);
features.set(base.subarray(0, Math.min(8, FEAT_DIM)));
}
mesh.updateNode(nodeId, features, amplitudes, rssi, Date.now() / 1000);
frameCount++;
if (frameCount % WINDOW_SIZE === 0 && mesh.nodes.size >= 2) {
const result = mesh.fuse();
if (result) {
fusionCount++;
persistToGraphDb(mesh, result);
if (JSON_OUTPUT) {
console.log(JSON.stringify({
type: 'fusion',
fusionIdx: fusionCount,
nodeIds: result.nodeIds,
edgeWeights: Object.fromEntries(result.edgeWeights),
attentionWeights: result.attentionWeights,
fused: Array.from(result.fused).map(v => +v.toFixed(6)),
}));
} else {
console.log(`[fusion ${fusionCount}] nodes=${result.nodeIds.join(',')}` +
` corr=${Array.from(result.edgeWeights.values()).map(v => v.toFixed(3)).join(',')}`);
}
}
}
});
server.on('listening', () => {
const addr = server.address();
console.log(`[live] Mesh graph transformer on UDP ${addr.address}:${addr.port}`);
console.log(`[live] Feature dim: ${FEAT_DIM}, heads: ${NUM_HEADS}, window: ${WINDOW_SIZE}`);
});
server.bind(PORT);
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
async function main() {
if (!args.file && !args.live) {
console.error('Usage: node scripts/mesh-graph-transformer.js --file <path> [--dim 8|128] [--heads 4]');
console.error(' node scripts/mesh-graph-transformer.js --live [--port 5006] [--dim 128]');
process.exit(1);
}
if (args.file) {
const filePath = path.resolve(args.file);
if (!fs.existsSync(filePath)) {
console.error(`File not found: ${filePath}`);
process.exit(1);
}
await processFile(filePath);
} else {
await processLive();
}
}
main().catch((err) => {
console.error('Fatal:', err);
process.exit(1);
});