research(R7): Stoer-Wagner mincut detects adversarial CSI nodes 3/3 in synthetic (#704)

Premise: in a multi-node CSI mesh, all nodes see the same physical
scene through slightly different multipath. Their per-window CSI
vectors cluster tightly under cosine similarity. An adversarial node
(replay / shift / noise injection) sits *outside* that cluster. The
Stoer-Wagner minimum cut on the inter-node similarity graph isolates
it cleanly when the cut is sharp.

Demo synthesises 4 honest nodes (one real CSI window from the paired
data + per-node Gaussian noise 6 dB below signal) and 1 adversarial
node under three attack modes. Cosine-similarity matrix, then
Stoer-Wagner mincut, then check whether partition_B is the singleton
{4} — the adversarial node.

  Attack       Mincut value   Partition_B   Isolated?
  -------      ------------   -----------   ---------
  replay       3.4513         {4}           YES
  shift        3.5724         {4}           YES
  noise        2.5586         {4}           YES

Detection rate: 3/3 = 100%.

Architectural payoff: this is the primitive that fills the stub at
. ADR-103 v0.2.0
can wire it in directly. The mincut value also becomes a continuous
'mesh trustworthiness' metric for the cog-gateway dashboard.

Honest scope: the demo uses sloppy attackers. Adaptive attackers who
have read this note can almost certainly evade by adding calibrated
noise that keeps cosine similarity above the cluster floor. The next
research step is the Stackelberg-game extension. See the
'Honest scope of this result' section in the research note.

Connections:
* R5 — top-8 saliency subcarriers are the priority list for a
  more-targeted per-subcarrier consistency check.
* R8 — same primitive likely works at lower SNR with RSSI-only
  metrics; cluster structure is preserved by the band integral.

Files:
* examples/research-sota/r7_multilink_consistency.py — pure-NumPy
  Stoer-Wagner mincut + synthetic-adversary harness.
* examples/research-sota/r7_multilink_consistency_results.json —
  full result JSON for cross-tick reproducibility.
* docs/research/sota-2026-05-22/R7-multilink-consistency.md — note.
* docs/research/sota-2026-05-22/PROGRESS.md — updated index + Done.
This commit is contained in:
rUv
2026-05-21 23:28:46 -04:00
committed by GitHub
parent d9ca9b3684
commit bb92419ccb
4 changed files with 437 additions and 0 deletions
@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""R7 — multi-link consistency detection via Stoer-Wagner-style mincut.
See docs/research/sota-2026-05-22/R7-multilink-consistency.md.
Premise: in a multi-node CSI mesh, all nodes observe the same physical
scene through slightly different channels. Their per-window CSI features
should cluster tightly under a similarity metric. If one node is
compromised (spoofed CSI, replay attack, jamming-induced corruption), its
features fall outside the cluster — and the mincut of the inter-node
similarity graph isolates it cleanly.
This demo:
1. Synthesises 4 "honest" CSI windows from one underlying scene + per-node
Gaussian noise (realistic multipath variability).
2. Synthesises 1 "adversarial" CSI window via three attack modes:
(a) replay — paste in a stale window from earlier
(b) shift — add a constant offset to every subcarrier
(c) noise — pure white noise of the same magnitude as honest CSI
3. Builds a 5×5 cross-node CSI cosine-similarity matrix.
4. Solves Stoer-Wagner mincut on the resulting graph.
5. Reports whether the mincut partition isolates the adversarial node.
No framework deps — pure NumPy.
Usage:
python examples/research-sota/r7_multilink_consistency.py \
--paired data/paired/wiflow-p7-1779210883.paired.jsonl
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import numpy as np
N_SUB, N_FRAMES = 56, 20
def load_one_window(path: Path, idx: int = 0) -> np.ndarray:
"""Pull one [56, 20] CSI window from the paired data — the scene we'll synthesise around."""
with path.open(encoding="utf-8") as f:
for i, line in enumerate(f):
if i < idx:
continue
d = json.loads(line)
shape = d.get("csi_shape", [N_SUB, N_FRAMES])
if shape == [N_SUB, N_FRAMES]:
return np.asarray(d["csi"], dtype=np.float32).reshape(N_SUB, N_FRAMES)
return None
return None
def synth_honest_nodes(base: np.ndarray, n_nodes: int = 4, noise_db: float = 6.0, seed: int = 42):
"""`n_nodes` honest observers — each sees the base scene through independent multipath
(modelled as additive Gaussian on the per-subcarrier amplitudes at `noise_db` below signal)."""
rng = np.random.default_rng(seed)
sigma = base.std() * 10 ** (-noise_db / 20.0)
return np.stack([base + rng.normal(0, sigma, size=base.shape).astype(np.float32) for _ in range(n_nodes)])
def synth_adversarial(base: np.ndarray, mode: str, replay_window: np.ndarray | None = None, seed: int = 7):
"""One adversarial observer. `mode` ∈ {replay, shift, noise}."""
rng = np.random.default_rng(seed)
if mode == "replay":
if replay_window is None:
raise ValueError("replay needs a stale window")
# Stale window with a tiny perturbation to look "fresh"
return replay_window + rng.normal(0, 0.01, size=base.shape).astype(np.float32)
if mode == "shift":
return base + 3.0 * base.std() # constant offset — gives away the attack
if mode == "noise":
return rng.normal(base.mean(), base.std(), size=base.shape).astype(np.float32)
raise ValueError(f"unknown adversarial mode: {mode}")
def cosine_sim_matrix(windows: np.ndarray) -> np.ndarray:
"""Pairwise cosine similarity on flattened windows. Returns [N, N] matrix."""
flat = windows.reshape(windows.shape[0], -1)
norms = np.linalg.norm(flat, axis=1, keepdims=True) + 1e-9
normalized = flat / norms
return normalized @ normalized.T
def stoer_wagner_mincut(W: np.ndarray) -> tuple[float, list[int]]:
"""Classical Stoer-Wagner mincut. Input: symmetric [N, N] non-negative weights.
Returns: (cut_value, partition_a_node_indices)
The algorithm:
while G has more than one node:
do a minimum-cut-phase: find the order in which nodes are added
the last node added is one side of a candidate cut; the rest is the other side
merge the last two nodes into one super-node, accumulate their weights
track the minimum candidate cut across all phases
"""
n = W.shape[0]
nodes = [{i} for i in range(n)] # start with each node a singleton
W = W.astype(np.float64).copy()
best_cut = np.inf
best_partition_b = None
while len(nodes) > 1:
# minimum-cut-phase
n_left = len(nodes)
A = [0] # start anywhere
in_A = np.zeros(n_left, dtype=bool); in_A[0] = True
weights_to_A = W[:, 0].copy()
weights_to_A[0] = -1
last, second_last = 0, 0
for _ in range(n_left - 1):
# pick the not-yet-in-A node most tightly connected to A
cand = int(np.argmax(np.where(in_A, -1, weights_to_A)))
second_last = last
last = cand
in_A[cand] = True
A.append(cand)
# update weights — add cand's edges
weights_to_A = np.where(in_A, -1, weights_to_A + W[:, cand])
# cut-of-the-phase = sum of edges from `last` to all others
cut_val = float((W[last, :].sum() - W[last, last]))
if cut_val < best_cut:
best_cut = cut_val
best_partition_b = nodes[last].copy()
# merge last + second_last
merged = nodes[last] | nodes[second_last]
# merge their rows/cols
W[second_last, :] += W[last, :]
W[:, second_last] += W[:, last]
W[second_last, second_last] = 0
# remove `last`
keep = [i for i in range(n_left) if i != last]
W = W[np.ix_(keep, keep)]
nodes = [merged if i == second_last else nodes[i] for i in keep]
partition_b = sorted(best_partition_b) if best_partition_b else []
return best_cut, partition_b
def run_scenario(base: np.ndarray, replay_window: np.ndarray, mode: str, n_honest: int = 4):
"""Run one adversarial scenario, return diagnostic info."""
honest = synth_honest_nodes(base, n_nodes=n_honest, noise_db=6.0)
adv = synth_adversarial(base, mode=mode, replay_window=replay_window)
windows = np.concatenate([honest, adv[None, ...]], axis=0) # [n_honest + 1, 56, 20]
adv_idx = n_honest # last node is the adversarial one
sim = cosine_sim_matrix(windows)
# Convert similarity → edge weight. Mincut on similarity finds the
# minimum-similarity partition, which is the *most-suspicious* split.
# Use (1 - sim) as the weight if we want to minimise dissimilarity, but
# the natural framing is: mincut over similarity-weighted graph isolates
# the node least-similar to the rest.
np.fill_diagonal(sim, 0.0)
cut_val, partition_b = stoer_wagner_mincut(sim)
detected = (set(partition_b) == {adv_idx}) or (set(range(len(windows))) - set(partition_b) == {adv_idx})
return {
"mode": mode,
"n_honest": n_honest,
"adv_idx": adv_idx,
"sim_matrix": sim.round(4).tolist(),
"mincut_value": float(cut_val),
"partition_b": partition_b,
"adv_isolated": bool(detected),
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--paired", required=True)
parser.add_argument("--out", default="examples/research-sota/r7_multilink_consistency_results.json")
args = parser.parse_args()
base = load_one_window(Path(args.paired), idx=10)
stale = load_one_window(Path(args.paired), idx=900)
if base is None or stale is None:
raise SystemExit("need at least 901 samples in the paired file")
results = {}
for mode in ["replay", "shift", "noise"]:
scenario = run_scenario(base, stale, mode=mode, n_honest=4)
results[mode] = scenario
print(f"\n=== adversarial mode: {mode} ===")
print(f" mincut value: {scenario['mincut_value']:.4f}")
print(f" partition B (less-similar side): {scenario['partition_b']}")
print(f" adversarial node isolated? {'YES' if scenario['adv_isolated'] else 'no'}")
n_detected = sum(1 for r in results.values() if r["adv_isolated"])
summary = {
"n_scenarios": len(results),
"n_detected": n_detected,
"detection_rate": n_detected / len(results),
}
print(f"\n=== summary ===")
print(f" detection rate: {n_detected}/{len(results)} = {summary['detection_rate']:.0%}")
out_path = Path(args.out)
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps({"summary": summary, "scenarios": results}, indent=2))
print(f"\nWrote {out_path}")
if __name__ == "__main__":
main()
@@ -0,0 +1,150 @@
{
"summary": {
"n_scenarios": 3,
"n_detected": 3,
"detection_rate": 1.0
},
"scenarios": {
"replay": {
"mode": "replay",
"n_honest": 4,
"adv_idx": 4,
"sim_matrix": [
[
0.0,
0.9218999743461609,
0.9277999997138977,
0.9269000291824341,
0.863099992275238
],
[
0.9218999743461609,
0.0,
0.9218999743461609,
0.9254000186920166,
0.8618999719619751
],
[
0.9277999997138977,
0.9218999743461609,
0.0,
0.9291999936103821,
0.8615999817848206
],
[
0.9269000291824341,
0.9254000186920166,
0.9291999936103821,
0.0,
0.864799976348877
],
[
0.863099992275238,
0.8618999719619751,
0.8615999817848206,
0.864799976348877,
0.0
]
],
"mincut_value": 3.451315999031067,
"partition_b": [
4
],
"adv_isolated": true
},
"shift": {
"mode": "shift",
"n_honest": 4,
"adv_idx": 4,
"sim_matrix": [
[
0.0,
0.9218999743461609,
0.9277999997138977,
0.9269000291824341,
0.8944000005722046
],
[
0.9218999743461609,
0.0,
0.9218999743461609,
0.9254000186920166,
0.8917999863624573
],
[
0.9277999997138977,
0.9218999743461609,
0.0,
0.9291999936103821,
0.8942999839782715
],
[
0.9269000291824341,
0.9254000186920166,
0.9291999936103821,
0.0,
0.8917999863624573
],
[
0.8944000005722046,
0.8917999863624573,
0.8942999839782715,
0.8917999863624573,
0.0
]
],
"mincut_value": 3.5724358558654785,
"partition_b": [
4
],
"adv_isolated": true
},
"noise": {
"mode": "noise",
"n_honest": 4,
"adv_idx": 4,
"sim_matrix": [
[
0.0,
0.9218999743461609,
0.9277999997138977,
0.9269000291824341,
0.6425999999046326
],
[
0.9218999743461609,
0.0,
0.9218999743461609,
0.9254000186920166,
0.6444000005722046
],
[
0.9277999997138977,
0.9218999743461609,
0.0,
0.9291999936103821,
0.6389999985694885
],
[
0.9269000291824341,
0.9254000186920166,
0.9291999936103821,
0.0,
0.6326000094413757
],
[
0.6425999999046326,
0.6444000005722046,
0.6389999985694885,
0.6326000094413757,
0.0
]
],
"mincut_value": 2.5585585832595825,
"partition_b": [
4
],
"adv_isolated": true
}
}
}