Files
ruvnet--RuView/archive/v1/tests/fixtures/csi_data.py
T
rUv 81cc241b9e chore(repo): move v1/ → archive/v1/ + add archive/README.md (#430)
The Rust port at v2/ has been the primary codebase since the rename
in #427. The Python implementation at v1/ is no longer the active
target; the only load-bearing path is the deterministic proof bundle
at v1/data/proof/ (per ADR-011 / ADR-028 witness verification).

Move the whole Python tree into archive/v1/ and document the policy
in archive/README.md: no new features, bug fixes only when they affect
a still-load-bearing path (currently just the proof), CI continues to
verify the proof on every push and PR.

Path references updated in 26 files via path-pattern sed (only
matches v1/<known-child> patterns, never bare v1 or API URLs like
/api/v1/). Two double-prefix typos (archive/archive/v1/) caught and
hand-fixed in verify-pipeline.yml and ADR-011.

Validated:
- Python proof verify.py imports cleanly at archive/v1/data/proof/
  (numpy/scipy still required; CI installs requirements-lock.txt
  from archive/v1/ now)
- cargo test --workspace --no-default-features → 1,539 passed,
  0 failed, 8 ignored (unaffected by Python tree relocation)
- ESP32-S3 on COM7 untouched (no firmware paths changed)

After-merge: contributors should re-run any local `python v1/...`
commands as `python archive/v1/...` (CLAUDE.md and CHANGELOG already
updated).
2026-04-25 23:07:52 -04:00

487 lines
18 KiB
Python

"""
Test data generation utilities for CSI data.
Provides realistic CSI data samples for testing pose estimation pipeline.
"""
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
import json
import random
class CSIDataGenerator:
"""Generate realistic CSI data for testing."""
def __init__(self,
frequency: float = 5.8e9,
bandwidth: float = 80e6,
num_antennas: int = 4,
num_subcarriers: int = 64):
self.frequency = frequency
self.bandwidth = bandwidth
self.num_antennas = num_antennas
self.num_subcarriers = num_subcarriers
self.sample_rate = 1000 # Hz
self.noise_level = 0.1
# Pre-computed patterns for different scenarios
self._initialize_patterns()
def _initialize_patterns(self):
"""Initialize CSI patterns for different scenarios."""
# Empty room pattern (baseline)
self.empty_room_pattern = {
"amplitude_mean": 0.3,
"amplitude_std": 0.05,
"phase_variance": 0.1,
"temporal_stability": 0.95
}
# Single person patterns
self.single_person_patterns = {
"standing": {
"amplitude_mean": 0.5,
"amplitude_std": 0.08,
"phase_variance": 0.2,
"temporal_stability": 0.85,
"movement_frequency": 0.1
},
"walking": {
"amplitude_mean": 0.6,
"amplitude_std": 0.15,
"phase_variance": 0.4,
"temporal_stability": 0.6,
"movement_frequency": 2.0
},
"sitting": {
"amplitude_mean": 0.4,
"amplitude_std": 0.06,
"phase_variance": 0.15,
"temporal_stability": 0.9,
"movement_frequency": 0.05
},
"fallen": {
"amplitude_mean": 0.35,
"amplitude_std": 0.04,
"phase_variance": 0.08,
"temporal_stability": 0.95,
"movement_frequency": 0.02
}
}
# Multi-person patterns
self.multi_person_patterns = {
2: {"amplitude_multiplier": 1.4, "phase_complexity": 1.6},
3: {"amplitude_multiplier": 1.7, "phase_complexity": 2.1},
4: {"amplitude_multiplier": 2.0, "phase_complexity": 2.8}
}
def generate_empty_room_sample(self, timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for empty room."""
if timestamp is None:
timestamp = datetime.utcnow()
pattern = self.empty_room_pattern
# Generate amplitude matrix
amplitude = np.random.normal(
pattern["amplitude_mean"],
pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
amplitude = np.clip(amplitude, 0, 1)
# Generate phase matrix
phase = np.random.uniform(
-np.pi, np.pi,
(self.num_antennas, self.num_subcarriers)
)
# Add temporal stability
if hasattr(self, '_last_empty_sample'):
stability = pattern["temporal_stability"]
amplitude = stability * self._last_empty_sample["amplitude"] + (1 - stability) * amplitude
phase = stability * self._last_empty_sample["phase"] + (1 - stability) * phase
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": "empty_room",
"signal_quality": np.random.uniform(0.85, 0.95)
}
self._last_empty_sample = {
"amplitude": amplitude,
"phase": phase
}
return sample
def generate_single_person_sample(self,
activity: str = "standing",
timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for single person activity."""
if timestamp is None:
timestamp = datetime.utcnow()
if activity not in self.single_person_patterns:
raise ValueError(f"Unknown activity: {activity}")
pattern = self.single_person_patterns[activity]
# Generate base amplitude
amplitude = np.random.normal(
pattern["amplitude_mean"],
pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
# Add movement-induced variations
movement_freq = pattern["movement_frequency"]
time_factor = timestamp.timestamp()
movement_modulation = 0.1 * np.sin(2 * np.pi * movement_freq * time_factor)
amplitude += movement_modulation
amplitude = np.clip(amplitude, 0, 1)
# Generate phase with activity-specific variance
phase_base = np.random.uniform(-np.pi, np.pi, (self.num_antennas, self.num_subcarriers))
phase_variance = pattern["phase_variance"]
phase_noise = np.random.normal(0, phase_variance, (self.num_antennas, self.num_subcarriers))
phase = phase_base + phase_noise
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi # Wrap to [-π, π]
# Add temporal correlation
if hasattr(self, f'_last_{activity}_sample'):
stability = pattern["temporal_stability"]
last_sample = getattr(self, f'_last_{activity}_sample')
amplitude = stability * last_sample["amplitude"] + (1 - stability) * amplitude
phase = stability * last_sample["phase"] + (1 - stability) * phase
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": f"single_person_{activity}",
"signal_quality": np.random.uniform(0.7, 0.9),
"activity": activity
}
setattr(self, f'_last_{activity}_sample', {
"amplitude": amplitude,
"phase": phase
})
return sample
def generate_multi_person_sample(self,
num_persons: int = 2,
activities: Optional[List[str]] = None,
timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for multiple persons."""
if timestamp is None:
timestamp = datetime.utcnow()
if num_persons < 2 or num_persons > 4:
raise ValueError("Number of persons must be between 2 and 4")
if activities is None:
activities = random.choices(list(self.single_person_patterns.keys()), k=num_persons)
if len(activities) != num_persons:
raise ValueError("Number of activities must match number of persons")
# Start with empty room baseline
amplitude = np.random.normal(
self.empty_room_pattern["amplitude_mean"],
self.empty_room_pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
phase = np.random.uniform(
-np.pi, np.pi,
(self.num_antennas, self.num_subcarriers)
)
# Add contribution from each person
for i, activity in enumerate(activities):
person_pattern = self.single_person_patterns[activity]
# Generate person-specific contribution
person_amplitude = np.random.normal(
person_pattern["amplitude_mean"] * 0.7, # Reduced for multi-person
person_pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
# Add spatial variation (different persons at different locations)
spatial_offset = i * self.num_subcarriers // num_persons
person_amplitude = np.roll(person_amplitude, spatial_offset, axis=1)
# Add movement modulation
movement_freq = person_pattern["movement_frequency"]
time_factor = timestamp.timestamp() + i * 0.5 # Phase offset between persons
movement_modulation = 0.05 * np.sin(2 * np.pi * movement_freq * time_factor)
person_amplitude += movement_modulation
amplitude += person_amplitude
# Add phase contribution
person_phase = np.random.normal(0, person_pattern["phase_variance"],
(self.num_antennas, self.num_subcarriers))
person_phase = np.roll(person_phase, spatial_offset, axis=1)
phase += person_phase
# Apply multi-person complexity
pattern = self.multi_person_patterns[num_persons]
amplitude *= pattern["amplitude_multiplier"]
phase *= pattern["phase_complexity"]
# Clip and normalize
amplitude = np.clip(amplitude, 0, 1)
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": f"multi_person_{num_persons}",
"signal_quality": np.random.uniform(0.6, 0.8),
"num_persons": num_persons,
"activities": activities
}
return sample
def generate_time_series(self,
duration_seconds: int = 10,
scenario: str = "single_person_walking",
**kwargs) -> List[Dict[str, Any]]:
"""Generate time series of CSI samples."""
samples = []
start_time = datetime.utcnow()
for i in range(duration_seconds * self.sample_rate):
timestamp = start_time + timedelta(seconds=i / self.sample_rate)
if scenario == "empty_room":
sample = self.generate_empty_room_sample(timestamp)
elif scenario.startswith("single_person_"):
activity = scenario.replace("single_person_", "")
sample = self.generate_single_person_sample(activity, timestamp)
elif scenario.startswith("multi_person_"):
num_persons = int(scenario.split("_")[-1])
sample = self.generate_multi_person_sample(num_persons, timestamp=timestamp, **kwargs)
else:
raise ValueError(f"Unknown scenario: {scenario}")
samples.append(sample)
return samples
def add_noise(self, sample: Dict[str, Any], noise_level: Optional[float] = None) -> Dict[str, Any]:
"""Add noise to CSI sample."""
if noise_level is None:
noise_level = self.noise_level
noisy_sample = sample.copy()
# Add amplitude noise
amplitude = np.array(sample["amplitude"])
amplitude_noise = np.random.normal(0, noise_level, amplitude.shape)
noisy_amplitude = amplitude + amplitude_noise
noisy_amplitude = np.clip(noisy_amplitude, 0, 1)
noisy_sample["amplitude"] = noisy_amplitude.tolist()
# Add phase noise
phase = np.array(sample["phase"])
phase_noise = np.random.normal(0, noise_level * np.pi, phase.shape)
noisy_phase = phase + phase_noise
noisy_phase = np.mod(noisy_phase + np.pi, 2 * np.pi) - np.pi
noisy_sample["phase"] = noisy_phase.tolist()
# Reduce signal quality
noisy_sample["signal_quality"] *= (1 - noise_level)
return noisy_sample
def simulate_hardware_artifacts(self, sample: Dict[str, Any]) -> Dict[str, Any]:
"""Simulate hardware-specific artifacts."""
artifact_sample = sample.copy()
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
# Simulate antenna coupling
coupling_matrix = np.random.uniform(0.95, 1.05, (self.num_antennas, self.num_antennas))
amplitude = coupling_matrix @ amplitude
# Simulate frequency-dependent gain variations
freq_response = 1 + 0.1 * np.sin(np.linspace(0, 2*np.pi, self.num_subcarriers))
amplitude *= freq_response[np.newaxis, :]
# Simulate phase drift
phase_drift = np.random.uniform(-0.1, 0.1) * np.arange(self.num_subcarriers)
phase += phase_drift[np.newaxis, :]
# Clip and wrap
amplitude = np.clip(amplitude, 0, 1)
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi
artifact_sample["amplitude"] = amplitude.tolist()
artifact_sample["phase"] = phase.tolist()
return artifact_sample
# Convenience functions for common test scenarios
def generate_fall_detection_sequence() -> List[Dict[str, Any]]:
"""Generate CSI sequence showing fall detection scenario."""
generator = CSIDataGenerator()
sequence = []
# Normal standing (5 seconds)
sequence.extend(generator.generate_time_series(5, "single_person_standing"))
# Walking (3 seconds)
sequence.extend(generator.generate_time_series(3, "single_person_walking"))
# Fall event (1 second transition)
sequence.extend(generator.generate_time_series(1, "single_person_fallen"))
# Fallen state (3 seconds)
sequence.extend(generator.generate_time_series(3, "single_person_fallen"))
return sequence
def generate_multi_person_scenario() -> List[Dict[str, Any]]:
"""Generate CSI sequence for multi-person scenario."""
generator = CSIDataGenerator()
sequence = []
# Start with empty room
sequence.extend(generator.generate_time_series(2, "empty_room"))
# One person enters
sequence.extend(generator.generate_time_series(3, "single_person_walking"))
# Second person enters
sequence.extend(generator.generate_time_series(5, "multi_person_2",
activities=["standing", "walking"]))
# Third person enters
sequence.extend(generator.generate_time_series(4, "multi_person_3",
activities=["standing", "walking", "sitting"]))
return sequence
def generate_noisy_environment_data() -> List[Dict[str, Any]]:
"""Generate CSI data with various noise levels."""
generator = CSIDataGenerator()
# Generate clean data
clean_samples = generator.generate_time_series(5, "single_person_walking")
# Add different noise levels
noisy_samples = []
noise_levels = [0.05, 0.1, 0.2, 0.3]
for noise_level in noise_levels:
for sample in clean_samples[:10]: # Take first 10 samples
noisy_sample = generator.add_noise(sample, noise_level)
noisy_samples.append(noisy_sample)
return noisy_samples
def generate_hardware_test_data() -> List[Dict[str, Any]]:
"""Generate CSI data with hardware artifacts."""
generator = CSIDataGenerator()
# Generate base samples
base_samples = generator.generate_time_series(3, "single_person_standing")
# Add hardware artifacts
artifact_samples = []
for sample in base_samples:
artifact_sample = generator.simulate_hardware_artifacts(sample)
artifact_samples.append(artifact_sample)
return artifact_samples
# Test data validation utilities
def validate_csi_sample(sample: Dict[str, Any]) -> bool:
"""Validate CSI sample structure and data ranges."""
required_fields = [
"timestamp", "router_id", "amplitude", "phase",
"frequency", "bandwidth", "num_antennas", "num_subcarriers"
]
# Check required fields
for field in required_fields:
if field not in sample:
return False
# Validate data types and ranges
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
# Check shapes
expected_shape = (sample["num_antennas"], sample["num_subcarriers"])
if amplitude.shape != expected_shape or phase.shape != expected_shape:
return False
# Check value ranges
if not (0 <= amplitude.min() and amplitude.max() <= 1):
return False
if not (-np.pi <= phase.min() and phase.max() <= np.pi):
return False
return True
def extract_features_from_csi(sample: Dict[str, Any]) -> Dict[str, Any]:
"""Extract features from CSI sample for testing."""
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
features = {
"amplitude_mean": float(np.mean(amplitude)),
"amplitude_std": float(np.std(amplitude)),
"amplitude_max": float(np.max(amplitude)),
"amplitude_min": float(np.min(amplitude)),
"phase_variance": float(np.var(phase)),
"phase_range": float(np.max(phase) - np.min(phase)),
"signal_energy": float(np.sum(amplitude ** 2)),
"phase_coherence": float(np.abs(np.mean(np.exp(1j * phase)))),
"spatial_correlation": float(np.mean(np.corrcoef(amplitude))),
"frequency_diversity": float(np.std(np.mean(amplitude, axis=0)))
}
return features