mirror of
https://github.com/ruvnet/RuView
synced 2026-06-09 10:13:17 +00:00
feat(rvcsi): rvcsi-events — window aggregation + event detectors (ADR-095 FR5)
- WindowBuffer: buffers exposable CsiFrames from one (session,source), emits a CsiWindow on a frame-count or duration threshold; computes per-subcarrier mean_amplitude / phase_variance and scalar motion_energy / presence_score / quality_score; skips mixed source/session and mismatched-subcarrier frames. - EventDetector trait + 4 state machines: PresenceDetector (hysteresis on presence_score), MotionDetector (debounced rising/falling edges on motion_energy), QualityDetector (SignalQualityDropped + once-per-stretch CalibrationRequired), BaselineDriftDetector (EWMA baseline → BaselineChanged / AnomalyDetected). Each with new()/with_config() + a public config struct. - EventPipeline: owns a WindowBuffer + Vec<Box<dyn EventDetector>> + IdGenerator; process_frame / flush / add_detector / recent_windows (32-window ring) / with_defaults. - 18 tests (incl. a 150-frame quiet/active/quiet end-to-end run via a seeded LCG + a determinism check). clippy-clean, forbid(unsafe_code), no heavy deps. https://claude.ai/code/session_01CdYAPvRTjcch6YrYf42n1z
This commit is contained in:
@@ -11,7 +11,6 @@ categories = ["science"]
|
||||
|
||||
[dependencies]
|
||||
rvcsi-core = { path = "../rvcsi-core" }
|
||||
rvcsi-dsp = { path = "../rvcsi-dsp" }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@@ -0,0 +1,779 @@
|
||||
//! Event detectors — small deterministic state machines over [`CsiWindow`]s.
|
||||
//!
|
||||
//! Every detector implements [`EventDetector`]; an [`crate::EventPipeline`]
|
||||
//! runs each in turn on every closed window and concatenates the emitted
|
||||
//! [`CsiEvent`]s. Detectors are intentionally tiny and side-effect-free: the
|
||||
//! only state they keep is the bare minimum to debounce / hysteresis-gate, so
|
||||
//! replaying the same window stream is fully deterministic.
|
||||
|
||||
use rvcsi_core::{CsiEvent, CsiEventKind, CsiWindow, IdGenerator, WindowId};
|
||||
|
||||
/// Consumes [`CsiWindow`]s and emits [`CsiEvent`]s.
|
||||
pub trait EventDetector {
|
||||
/// Process one window; return any events it triggers (possibly empty).
|
||||
fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec<CsiEvent>;
|
||||
|
||||
/// Stable name for logging / inspection.
|
||||
fn name(&self) -> &'static str;
|
||||
}
|
||||
|
||||
/// Build a single-window-evidence [`CsiEvent`] (validated in debug builds).
|
||||
fn make_event(
|
||||
ids: &IdGenerator,
|
||||
kind: CsiEventKind,
|
||||
window: &CsiWindow,
|
||||
timestamp_ns: u64,
|
||||
confidence: f32,
|
||||
) -> CsiEvent {
|
||||
let evidence: Vec<WindowId> = vec![window.window_id];
|
||||
let confidence = confidence.clamp(0.0, 1.0);
|
||||
let event = CsiEvent::new(
|
||||
ids.next_event(),
|
||||
kind,
|
||||
window.session_id,
|
||||
window.source_id.clone(),
|
||||
timestamp_ns,
|
||||
confidence,
|
||||
evidence,
|
||||
);
|
||||
debug_assert!(
|
||||
event.validate().is_ok(),
|
||||
"detector produced an invalid CsiEvent: {:?}",
|
||||
event.validate()
|
||||
);
|
||||
event
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// PresenceDetector
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tunables for [`PresenceDetector`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub struct PresenceConfig {
|
||||
/// Enter `Present` when `presence_score >= on_threshold` for `enter_windows` windows.
|
||||
pub on_threshold: f32,
|
||||
/// Exit to `Absent` when `presence_score <= off_threshold` for `exit_windows` windows.
|
||||
pub off_threshold: f32,
|
||||
/// Consecutive high windows required to declare presence.
|
||||
pub enter_windows: u32,
|
||||
/// Consecutive low windows required to declare absence.
|
||||
pub exit_windows: u32,
|
||||
}
|
||||
|
||||
impl Default for PresenceConfig {
|
||||
fn default() -> Self {
|
||||
// A truly quiet window has `presence_score ≈ 0.40` (the
|
||||
// `WindowBuffer` logistic floor at zero motion), so `off_threshold`
|
||||
// sits just above that and `on_threshold` well above it.
|
||||
PresenceConfig {
|
||||
on_threshold: 0.7,
|
||||
off_threshold: 0.45,
|
||||
enter_windows: 2,
|
||||
exit_windows: 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PresenceConfig {
|
||||
/// Validate the relationship `on_threshold > off_threshold` and positivity.
|
||||
fn checked(self) -> Self {
|
||||
assert!(
|
||||
self.on_threshold > self.off_threshold,
|
||||
"PresenceConfig requires on_threshold > off_threshold"
|
||||
);
|
||||
assert!(self.enter_windows >= 1 && self.exit_windows >= 1);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
enum PresenceState {
|
||||
Absent,
|
||||
Present,
|
||||
}
|
||||
|
||||
/// Hysteresis state machine over [`CsiWindow::presence_score`].
|
||||
///
|
||||
/// Emits a single [`CsiEventKind::PresenceStarted`] when the score has been
|
||||
/// high for `enter_windows` consecutive windows, and a single
|
||||
/// [`CsiEventKind::PresenceEnded`] when it has been low for `exit_windows`
|
||||
/// consecutive windows. A window that breaks the streak resets the counter.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PresenceDetector {
|
||||
cfg: PresenceConfig,
|
||||
state: PresenceState,
|
||||
streak: u32,
|
||||
}
|
||||
|
||||
impl Default for PresenceDetector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl PresenceDetector {
|
||||
/// New detector with default thresholds.
|
||||
pub fn new() -> Self {
|
||||
Self::with_config(PresenceConfig::default())
|
||||
}
|
||||
|
||||
/// New detector with explicit config.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `on_threshold <= off_threshold` or a window count is zero.
|
||||
pub fn with_config(cfg: PresenceConfig) -> Self {
|
||||
PresenceDetector {
|
||||
cfg: cfg.checked(),
|
||||
state: PresenceState::Absent,
|
||||
streak: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventDetector for PresenceDetector {
|
||||
fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec<CsiEvent> {
|
||||
let p = window.presence_score;
|
||||
match self.state {
|
||||
PresenceState::Absent => {
|
||||
if p >= self.cfg.on_threshold {
|
||||
self.streak += 1;
|
||||
if self.streak >= self.cfg.enter_windows {
|
||||
self.state = PresenceState::Present;
|
||||
self.streak = 0;
|
||||
return vec![make_event(
|
||||
ids,
|
||||
CsiEventKind::PresenceStarted,
|
||||
window,
|
||||
window.end_ns,
|
||||
p,
|
||||
)];
|
||||
}
|
||||
} else {
|
||||
self.streak = 0;
|
||||
}
|
||||
}
|
||||
PresenceState::Present => {
|
||||
if p <= self.cfg.off_threshold {
|
||||
self.streak += 1;
|
||||
if self.streak >= self.cfg.exit_windows {
|
||||
self.state = PresenceState::Absent;
|
||||
self.streak = 0;
|
||||
return vec![make_event(
|
||||
ids,
|
||||
CsiEventKind::PresenceEnded,
|
||||
window,
|
||||
window.end_ns,
|
||||
(1.0 - p).clamp(0.0, 1.0),
|
||||
)];
|
||||
}
|
||||
} else {
|
||||
self.streak = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"presence"
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// MotionDetector
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tunables for [`MotionDetector`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub struct MotionConfig {
|
||||
/// Rising-edge threshold on `motion_energy`.
|
||||
pub on_threshold: f32,
|
||||
/// Falling-edge threshold on `motion_energy` (`< on_threshold`).
|
||||
pub off_threshold: f32,
|
||||
/// Consecutive windows above/below the relevant threshold before firing.
|
||||
pub debounce_windows: u32,
|
||||
}
|
||||
|
||||
impl Default for MotionConfig {
|
||||
fn default() -> Self {
|
||||
MotionConfig {
|
||||
on_threshold: 0.05,
|
||||
off_threshold: 0.02,
|
||||
debounce_windows: 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MotionConfig {
|
||||
fn checked(self) -> Self {
|
||||
assert!(
|
||||
self.on_threshold > self.off_threshold,
|
||||
"MotionConfig requires on_threshold > off_threshold"
|
||||
);
|
||||
assert!(self.debounce_windows >= 1);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
enum MotionState {
|
||||
Settled,
|
||||
Moving,
|
||||
}
|
||||
|
||||
/// State machine over [`CsiWindow::motion_energy`].
|
||||
///
|
||||
/// Emits [`CsiEventKind::MotionDetected`] on a debounced rising edge and
|
||||
/// [`CsiEventKind::MotionSettled`] on a debounced falling edge.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MotionDetector {
|
||||
cfg: MotionConfig,
|
||||
state: MotionState,
|
||||
streak: u32,
|
||||
}
|
||||
|
||||
impl Default for MotionDetector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl MotionDetector {
|
||||
/// New detector with default thresholds.
|
||||
pub fn new() -> Self {
|
||||
Self::with_config(MotionConfig::default())
|
||||
}
|
||||
|
||||
/// New detector with explicit config.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `on_threshold <= off_threshold` or `debounce_windows == 0`.
|
||||
pub fn with_config(cfg: MotionConfig) -> Self {
|
||||
MotionDetector {
|
||||
cfg: cfg.checked(),
|
||||
state: MotionState::Settled,
|
||||
streak: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventDetector for MotionDetector {
|
||||
fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec<CsiEvent> {
|
||||
let m = window.motion_energy;
|
||||
match self.state {
|
||||
MotionState::Settled => {
|
||||
if m > self.cfg.on_threshold {
|
||||
self.streak += 1;
|
||||
if self.streak >= self.cfg.debounce_windows {
|
||||
self.state = MotionState::Moving;
|
||||
self.streak = 0;
|
||||
let conf = (m / (2.0 * self.cfg.on_threshold)).clamp(0.0, 1.0);
|
||||
return vec![make_event(
|
||||
ids,
|
||||
CsiEventKind::MotionDetected,
|
||||
window,
|
||||
window.end_ns,
|
||||
conf,
|
||||
)];
|
||||
}
|
||||
} else {
|
||||
self.streak = 0;
|
||||
}
|
||||
}
|
||||
MotionState::Moving => {
|
||||
if m < self.cfg.off_threshold {
|
||||
self.streak += 1;
|
||||
if self.streak >= self.cfg.debounce_windows {
|
||||
self.state = MotionState::Settled;
|
||||
self.streak = 0;
|
||||
let rise = (m / (2.0 * self.cfg.on_threshold)).clamp(0.0, 1.0);
|
||||
return vec![make_event(
|
||||
ids,
|
||||
CsiEventKind::MotionSettled,
|
||||
window,
|
||||
window.end_ns,
|
||||
(1.0 - rise).clamp(0.0, 1.0),
|
||||
)];
|
||||
}
|
||||
} else {
|
||||
self.streak = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"motion"
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// QualityDetector
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tunables for [`QualityDetector`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub struct QualityConfig {
|
||||
/// `quality_score` below this (debounced) raises [`CsiEventKind::SignalQualityDropped`].
|
||||
pub drop_threshold: f32,
|
||||
/// Consecutive low windows before [`CsiEventKind::SignalQualityDropped`] fires.
|
||||
pub debounce_windows: u32,
|
||||
/// Consecutive low windows (counting from the first low one) before
|
||||
/// [`CsiEventKind::CalibrationRequired`] also fires — once per low stretch.
|
||||
pub calib_windows: u32,
|
||||
}
|
||||
|
||||
impl Default for QualityConfig {
|
||||
fn default() -> Self {
|
||||
QualityConfig {
|
||||
drop_threshold: 0.4,
|
||||
debounce_windows: 2,
|
||||
calib_windows: 4,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl QualityConfig {
|
||||
fn checked(self) -> Self {
|
||||
assert!(self.debounce_windows >= 1 && self.calib_windows >= 1);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// State machine over [`CsiWindow::quality_score`].
|
||||
///
|
||||
/// While `quality_score` stays below `drop_threshold` it counts a low streak.
|
||||
/// At `debounce_windows` it emits [`CsiEventKind::SignalQualityDropped`]; at
|
||||
/// `calib_windows` it additionally emits [`CsiEventKind::CalibrationRequired`]
|
||||
/// (only once until quality recovers). Any window at or above `drop_threshold`
|
||||
/// resets the streak and re-arms both events.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct QualityDetector {
|
||||
cfg: QualityConfig,
|
||||
low_streak: u32,
|
||||
dropped_emitted: bool,
|
||||
calib_emitted: bool,
|
||||
}
|
||||
|
||||
impl Default for QualityDetector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl QualityDetector {
|
||||
/// New detector with default thresholds.
|
||||
pub fn new() -> Self {
|
||||
Self::with_config(QualityConfig::default())
|
||||
}
|
||||
|
||||
/// New detector with explicit config.
|
||||
pub fn with_config(cfg: QualityConfig) -> Self {
|
||||
QualityDetector {
|
||||
cfg: cfg.checked(),
|
||||
low_streak: 0,
|
||||
dropped_emitted: false,
|
||||
calib_emitted: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventDetector for QualityDetector {
|
||||
fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec<CsiEvent> {
|
||||
let q = window.quality_score;
|
||||
if q < self.cfg.drop_threshold {
|
||||
self.low_streak += 1;
|
||||
let mut out = Vec::new();
|
||||
if !self.dropped_emitted && self.low_streak >= self.cfg.debounce_windows {
|
||||
self.dropped_emitted = true;
|
||||
out.push(make_event(
|
||||
ids,
|
||||
CsiEventKind::SignalQualityDropped,
|
||||
window,
|
||||
window.end_ns,
|
||||
(1.0 - q).clamp(0.0, 1.0),
|
||||
));
|
||||
}
|
||||
if !self.calib_emitted && self.low_streak >= self.cfg.calib_windows {
|
||||
self.calib_emitted = true;
|
||||
out.push(make_event(
|
||||
ids,
|
||||
CsiEventKind::CalibrationRequired,
|
||||
window,
|
||||
window.end_ns,
|
||||
(1.0 - q).clamp(0.0, 1.0),
|
||||
));
|
||||
}
|
||||
out
|
||||
} else {
|
||||
self.low_streak = 0;
|
||||
self.dropped_emitted = false;
|
||||
self.calib_emitted = false;
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"quality"
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// BaselineDriftDetector
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tunables for [`BaselineDriftDetector`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub struct BaselineDriftConfig {
|
||||
/// Per-window drift `||mean_amplitude - baseline||_2 / sqrt(n)` above this
|
||||
/// for `drift_windows` windows in a row triggers [`CsiEventKind::BaselineChanged`].
|
||||
pub drift_threshold: f32,
|
||||
/// Consecutive drifting windows before [`CsiEventKind::BaselineChanged`] fires.
|
||||
pub drift_windows: u32,
|
||||
/// A single window with drift above this (much larger) value triggers
|
||||
/// [`CsiEventKind::AnomalyDetected`].
|
||||
pub anomaly_threshold: f32,
|
||||
/// EWMA smoothing factor for the running baseline (`baseline = a*current + (1-a)*baseline`).
|
||||
pub ewma_alpha: f32,
|
||||
}
|
||||
|
||||
impl Default for BaselineDriftConfig {
|
||||
fn default() -> Self {
|
||||
BaselineDriftConfig {
|
||||
drift_threshold: 0.15,
|
||||
drift_windows: 3,
|
||||
anomaly_threshold: 1.0,
|
||||
ewma_alpha: 0.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BaselineDriftConfig {
|
||||
fn checked(self) -> Self {
|
||||
assert!(self.drift_windows >= 1);
|
||||
assert!(self.anomaly_threshold > self.drift_threshold);
|
||||
assert!(self.ewma_alpha > 0.0 && self.ewma_alpha <= 1.0);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks an EWMA baseline of `mean_amplitude` and flags sustained drift /
|
||||
/// single-window anomalies.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BaselineDriftDetector {
|
||||
cfg: BaselineDriftConfig,
|
||||
baseline: Option<Vec<f32>>,
|
||||
drift_streak: u32,
|
||||
}
|
||||
|
||||
impl Default for BaselineDriftDetector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl BaselineDriftDetector {
|
||||
/// New detector with default thresholds.
|
||||
pub fn new() -> Self {
|
||||
Self::with_config(BaselineDriftConfig::default())
|
||||
}
|
||||
|
||||
/// New detector with explicit config.
|
||||
pub fn with_config(cfg: BaselineDriftConfig) -> Self {
|
||||
BaselineDriftDetector {
|
||||
cfg: cfg.checked(),
|
||||
baseline: None,
|
||||
drift_streak: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// L2 distance between two equal-length vectors, normalized by `sqrt(len)`.
|
||||
fn rms_distance(a: &[f32], b: &[f32]) -> f32 {
|
||||
let n = a.len();
|
||||
if n == 0 {
|
||||
return 0.0;
|
||||
}
|
||||
let mut sq = 0.0f64;
|
||||
for k in 0..n {
|
||||
let d = (a[k] - b[k]) as f64;
|
||||
sq += d * d;
|
||||
}
|
||||
(sq.sqrt() / (n as f64).sqrt()) as f32
|
||||
}
|
||||
|
||||
fn update_ewma(&mut self, current: &[f32]) {
|
||||
match &mut self.baseline {
|
||||
None => self.baseline = Some(current.to_vec()),
|
||||
Some(b) if b.len() != current.len() => {
|
||||
self.baseline = Some(current.to_vec());
|
||||
}
|
||||
Some(b) => {
|
||||
let a = self.cfg.ewma_alpha;
|
||||
for k in 0..b.len() {
|
||||
b[k] = a * current[k] + (1.0 - a) * b[k];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventDetector for BaselineDriftDetector {
|
||||
fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec<CsiEvent> {
|
||||
let current = &window.mean_amplitude;
|
||||
let baseline = match &self.baseline {
|
||||
None => {
|
||||
// First window establishes the baseline; no drift possible yet.
|
||||
self.baseline = Some(current.clone());
|
||||
return Vec::new();
|
||||
}
|
||||
Some(b) if b.len() != current.len() => {
|
||||
// Subcarrier count changed — reset and skip this window.
|
||||
self.baseline = Some(current.clone());
|
||||
self.drift_streak = 0;
|
||||
return Vec::new();
|
||||
}
|
||||
Some(b) => b.clone(),
|
||||
};
|
||||
|
||||
let drift = Self::rms_distance(current, &baseline);
|
||||
let mut out = Vec::new();
|
||||
|
||||
if drift > self.cfg.anomaly_threshold {
|
||||
out.push(make_event(
|
||||
ids,
|
||||
CsiEventKind::AnomalyDetected,
|
||||
window,
|
||||
window.end_ns,
|
||||
(drift / (2.0 * self.cfg.anomaly_threshold)).clamp(0.0, 1.0),
|
||||
));
|
||||
}
|
||||
|
||||
if drift > self.cfg.drift_threshold {
|
||||
self.drift_streak += 1;
|
||||
if self.drift_streak >= self.cfg.drift_windows {
|
||||
out.push(make_event(
|
||||
ids,
|
||||
CsiEventKind::BaselineChanged,
|
||||
window,
|
||||
window.end_ns,
|
||||
(drift / (2.0 * self.cfg.drift_threshold)).clamp(0.0, 1.0),
|
||||
));
|
||||
self.drift_streak = 0;
|
||||
// Hard-reset the baseline to the new operating point.
|
||||
self.baseline = Some(current.clone());
|
||||
return out;
|
||||
}
|
||||
} else {
|
||||
self.drift_streak = 0;
|
||||
}
|
||||
|
||||
self.update_ewma(current);
|
||||
out
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"baseline_drift"
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rvcsi_core::{SessionId, SourceId};
|
||||
|
||||
fn window(window_id: u64, end_ns: u64, motion: f32, presence: f32, quality: f32) -> CsiWindow {
|
||||
let end_ns = end_ns.max(1);
|
||||
CsiWindow {
|
||||
window_id: WindowId(window_id),
|
||||
session_id: SessionId(0),
|
||||
source_id: SourceId::from("s"),
|
||||
start_ns: end_ns.saturating_sub(1_000),
|
||||
end_ns,
|
||||
frame_count: 8,
|
||||
mean_amplitude: vec![1.0; 8],
|
||||
phase_variance: vec![0.0; 8],
|
||||
motion_energy: motion,
|
||||
presence_score: presence,
|
||||
quality_score: quality,
|
||||
}
|
||||
}
|
||||
|
||||
fn window_amp(window_id: u64, end_ns: u64, amp: Vec<f32>) -> CsiWindow {
|
||||
let n = amp.len();
|
||||
CsiWindow {
|
||||
window_id: WindowId(window_id),
|
||||
session_id: SessionId(0),
|
||||
source_id: SourceId::from("s"),
|
||||
start_ns: 0,
|
||||
end_ns: end_ns.max(1),
|
||||
frame_count: 8,
|
||||
mean_amplitude: amp,
|
||||
phase_variance: vec![0.0; n],
|
||||
motion_energy: 0.0,
|
||||
presence_score: 0.0,
|
||||
quality_score: 0.9,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn presence_detector_emits_started_then_ended() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = PresenceDetector::with_config(PresenceConfig {
|
||||
on_threshold: 0.6,
|
||||
off_threshold: 0.35,
|
||||
enter_windows: 2,
|
||||
exit_windows: 3,
|
||||
});
|
||||
let mut events = Vec::new();
|
||||
// Low windows.
|
||||
for k in 0..3u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.05, 0.9), &g));
|
||||
}
|
||||
assert!(events.is_empty());
|
||||
// High run -> PresenceStarted after the 2nd one.
|
||||
for k in 3..8u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.5, 0.95, 0.9), &g));
|
||||
}
|
||||
// Low run -> PresenceEnded after the 3rd low one.
|
||||
for k in 8..13u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.05, 0.9), &g));
|
||||
}
|
||||
assert_eq!(events.len(), 2, "events = {events:?}");
|
||||
assert_eq!(events[0].kind, CsiEventKind::PresenceStarted);
|
||||
assert_eq!(events[1].kind, CsiEventKind::PresenceEnded);
|
||||
for e in &events {
|
||||
assert!(e.validate().is_ok());
|
||||
assert!(!e.evidence_window_ids.is_empty());
|
||||
assert!((0.0..=1.0).contains(&e.confidence));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn presence_detector_streak_reset() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = PresenceDetector::new();
|
||||
// 1 high, 1 low (resets), then enough highs.
|
||||
assert!(d.on_window(&window(0, 1_000, 0.0, 0.95, 0.9), &g).is_empty());
|
||||
assert!(d.on_window(&window(1, 2_000, 0.0, 0.05, 0.9), &g).is_empty());
|
||||
assert!(d.on_window(&window(2, 3_000, 0.0, 0.95, 0.9), &g).is_empty());
|
||||
let e = d.on_window(&window(3, 4_000, 0.0, 0.95, 0.9), &g);
|
||||
assert_eq!(e.len(), 1);
|
||||
assert_eq!(e[0].kind, CsiEventKind::PresenceStarted);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn motion_detector_emits_detected_then_settled() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = MotionDetector::with_config(MotionConfig {
|
||||
on_threshold: 0.05,
|
||||
off_threshold: 0.02,
|
||||
debounce_windows: 2,
|
||||
});
|
||||
let mut events = Vec::new();
|
||||
for k in 0..2u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.001, 0.0, 0.9), &g));
|
||||
}
|
||||
for k in 2..6u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.3, 0.0, 0.9), &g));
|
||||
}
|
||||
for k in 6..10u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.9), &g));
|
||||
}
|
||||
assert_eq!(events.len(), 2, "events = {events:?}");
|
||||
assert_eq!(events[0].kind, CsiEventKind::MotionDetected);
|
||||
assert_eq!(events[1].kind, CsiEventKind::MotionSettled);
|
||||
for e in &events {
|
||||
assert!(e.validate().is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn quality_detector_drop_then_calibration_once() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = QualityDetector::with_config(QualityConfig {
|
||||
drop_threshold: 0.4,
|
||||
debounce_windows: 2,
|
||||
calib_windows: 4,
|
||||
});
|
||||
let mut events = Vec::new();
|
||||
// Good window first.
|
||||
events.extend(d.on_window(&window(0, 1_000, 0.0, 0.0, 0.9), &g));
|
||||
// Low run.
|
||||
for k in 1..8u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.1), &g));
|
||||
}
|
||||
let dropped = events
|
||||
.iter()
|
||||
.filter(|e| e.kind == CsiEventKind::SignalQualityDropped)
|
||||
.count();
|
||||
let calib = events
|
||||
.iter()
|
||||
.filter(|e| e.kind == CsiEventKind::CalibrationRequired)
|
||||
.count();
|
||||
assert_eq!(dropped, 1, "events = {events:?}");
|
||||
assert_eq!(calib, 1, "events = {events:?}");
|
||||
for e in &events {
|
||||
assert!(e.validate().is_ok());
|
||||
}
|
||||
// Recover and drop again -> re-armed.
|
||||
events.clear();
|
||||
events.extend(d.on_window(&window(8, 9_000, 0.0, 0.0, 0.95), &g));
|
||||
for k in 9..14u64 {
|
||||
events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.1), &g));
|
||||
}
|
||||
assert_eq!(
|
||||
events
|
||||
.iter()
|
||||
.filter(|e| e.kind == CsiEventKind::SignalQualityDropped)
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn baseline_drift_stable_then_shift_then_anomaly() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = BaselineDriftDetector::with_config(BaselineDriftConfig {
|
||||
drift_threshold: 0.15,
|
||||
drift_windows: 3,
|
||||
anomaly_threshold: 1.0,
|
||||
ewma_alpha: 0.1,
|
||||
});
|
||||
// Stable baseline -> no events.
|
||||
let mut events = Vec::new();
|
||||
for k in 0..5u64 {
|
||||
events.extend(d.on_window(&window_amp(k, (k + 1) * 1_000, vec![1.0; 8]), &g));
|
||||
}
|
||||
assert!(events.is_empty(), "events = {events:?}");
|
||||
// Sustained shift -> BaselineChanged.
|
||||
for k in 5..10u64 {
|
||||
events.extend(d.on_window(&window_amp(k, (k + 1) * 1_000, vec![1.5; 8]), &g));
|
||||
}
|
||||
assert!(
|
||||
events.iter().any(|e| e.kind == CsiEventKind::BaselineChanged),
|
||||
"events = {events:?}"
|
||||
);
|
||||
// Single huge spike -> AnomalyDetected.
|
||||
events.clear();
|
||||
events.extend(d.on_window(&window_amp(10, 11_000, vec![50.0; 8]), &g));
|
||||
assert!(
|
||||
events.iter().any(|e| e.kind == CsiEventKind::AnomalyDetected),
|
||||
"events = {events:?}"
|
||||
);
|
||||
for e in &events {
|
||||
assert!(e.validate().is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn baseline_drift_resets_on_subcarrier_change() {
|
||||
let g = IdGenerator::new();
|
||||
let mut d = BaselineDriftDetector::new();
|
||||
assert!(d.on_window(&window_amp(0, 1_000, vec![1.0; 8]), &g).is_empty());
|
||||
// Different length -> reset, no event.
|
||||
assert!(d.on_window(&window_amp(1, 2_000, vec![1.0; 16]), &g).is_empty());
|
||||
assert!(d.on_window(&window_amp(2, 3_000, vec![1.0; 16]), &g).is_empty());
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,37 @@
|
||||
//! # rvCSI events (skeleton — implemented by the rvcsi-events swarm agent)
|
||||
//! # rvCSI events — window aggregation + semantic event extraction (ADR-095 FR5)
|
||||
//!
|
||||
//! Window aggregation and semantic event extraction (ADR-095 FR5).
|
||||
#![forbid(unsafe_code)]
|
||||
//! This crate turns a stream of validated [`rvcsi_core::CsiFrame`]s into
|
||||
//! [`rvcsi_core::CsiWindow`]s and then into [`rvcsi_core::CsiEvent`]s.
|
||||
//!
|
||||
//! The pipeline has three layers:
|
||||
//!
|
||||
//! 1. [`WindowBuffer`] — buffers exposable frames from one
|
||||
//! `(session_id, source_id)` and emits a [`rvcsi_core::CsiWindow`] when a
|
||||
//! frame-count or duration threshold is hit. Per-subcarrier statistics
|
||||
//! (`mean_amplitude`, `phase_variance`) and the scalar `motion_energy`,
|
||||
//! `presence_score` and `quality_score` are computed here.
|
||||
//! 2. [`EventDetector`] implementations — small, deterministic state machines
|
||||
//! that consume windows and emit events:
|
||||
//! [`PresenceDetector`], [`MotionDetector`], [`QualityDetector`] and
|
||||
//! [`BaselineDriftDetector`].
|
||||
//! 3. [`EventPipeline`] — wires a [`WindowBuffer`] and a set of detectors
|
||||
//! together and owns an [`rvcsi_core::IdGenerator`].
|
||||
//!
|
||||
//! Determinism: feeding the same frame stream through an [`EventPipeline`]
|
||||
//! always produces the same event list (modulo the ids, which are minted in a
|
||||
//! deterministic order). All "noise" in the tests comes from a tiny LCG, never
|
||||
//! from `rand`.
|
||||
|
||||
/// Placeholder so the crate compiles before the agent fills it in.
|
||||
pub fn __rvcsi_events_placeholder() {}
|
||||
#![forbid(unsafe_code)]
|
||||
#![warn(missing_docs)]
|
||||
|
||||
mod detectors;
|
||||
mod pipeline;
|
||||
mod window_buffer;
|
||||
|
||||
pub use detectors::{
|
||||
BaselineDriftConfig, BaselineDriftDetector, EventDetector, MotionConfig, MotionDetector,
|
||||
PresenceConfig, PresenceDetector, QualityConfig, QualityDetector,
|
||||
};
|
||||
pub use pipeline::EventPipeline;
|
||||
pub use window_buffer::{WindowBuffer, WindowBufferConfig};
|
||||
|
||||
@@ -0,0 +1,260 @@
|
||||
//! [`EventPipeline`] — wires a [`WindowBuffer`] to a set of [`EventDetector`]s.
|
||||
//!
|
||||
//! A pipeline owns its own [`IdGenerator`] so window/event ids are minted in a
|
||||
//! deterministic order. Feed it frames with [`EventPipeline::process_frame`]
|
||||
//! and drain the tail with [`EventPipeline::flush`].
|
||||
|
||||
use rvcsi_core::{CsiEvent, CsiFrame, CsiWindow, IdGenerator, SessionId, SourceId};
|
||||
|
||||
use crate::detectors::{
|
||||
BaselineDriftDetector, EventDetector, MotionDetector, PresenceDetector, QualityDetector,
|
||||
};
|
||||
use crate::window_buffer::{WindowBuffer, WindowBufferConfig};
|
||||
|
||||
/// How many recently-closed windows the pipeline keeps for inspection.
|
||||
const RECENT_WINDOW_CAP: usize = 32;
|
||||
|
||||
/// Aggregates frames into windows and runs detectors over them.
|
||||
pub struct EventPipeline {
|
||||
buffer: WindowBuffer,
|
||||
detectors: Vec<Box<dyn EventDetector>>,
|
||||
ids: IdGenerator,
|
||||
recent: Vec<CsiWindow>,
|
||||
}
|
||||
|
||||
impl core::fmt::Debug for EventPipeline {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
f.debug_struct("EventPipeline")
|
||||
.field("detectors", &self.detectors.iter().map(|d| d.name()).collect::<Vec<_>>())
|
||||
.field("pending_frame_count", &self.buffer.pending_frame_count())
|
||||
.field("recent_windows", &self.recent.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventPipeline {
|
||||
/// New pipeline with the given window-buffer config and no detectors.
|
||||
///
|
||||
/// Add detectors with [`EventPipeline::add_detector`].
|
||||
pub fn new(session_id: SessionId, source_id: SourceId, buffer_cfg: WindowBufferConfig) -> Self {
|
||||
EventPipeline {
|
||||
buffer: WindowBuffer::with_config(session_id, source_id, buffer_cfg),
|
||||
detectors: Vec::new(),
|
||||
ids: IdGenerator::new(),
|
||||
recent: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// New pipeline with the four default detectors and a 16-frame / 1-second
|
||||
/// window buffer.
|
||||
pub fn with_defaults(session_id: SessionId, source_id: SourceId) -> Self {
|
||||
let mut p = Self::new(
|
||||
session_id,
|
||||
source_id,
|
||||
WindowBufferConfig::new(16, 1_000_000_000),
|
||||
);
|
||||
p.add_detector(Box::new(PresenceDetector::new()));
|
||||
p.add_detector(Box::new(MotionDetector::new()));
|
||||
p.add_detector(Box::new(QualityDetector::new()));
|
||||
p.add_detector(Box::new(BaselineDriftDetector::new()));
|
||||
p
|
||||
}
|
||||
|
||||
/// Append a detector. Detectors run in insertion order on every window.
|
||||
pub fn add_detector(&mut self, detector: Box<dyn EventDetector>) {
|
||||
self.detectors.push(detector);
|
||||
}
|
||||
|
||||
/// Names of the registered detectors, in order.
|
||||
pub fn detector_names(&self) -> Vec<&'static str> {
|
||||
self.detectors.iter().map(|d| d.name()).collect()
|
||||
}
|
||||
|
||||
/// The most-recently-closed windows (newest last), capped at 32.
|
||||
pub fn recent_windows(&self) -> &[CsiWindow] {
|
||||
&self.recent
|
||||
}
|
||||
|
||||
/// Frames buffered but not yet emitted as a window.
|
||||
pub fn pending_frame_count(&self) -> usize {
|
||||
self.buffer.pending_frame_count()
|
||||
}
|
||||
|
||||
/// Push one frame; if it closes a window, run every detector on that window
|
||||
/// and return their concatenated events. Otherwise return an empty `Vec`.
|
||||
pub fn process_frame(&mut self, frame: &CsiFrame) -> Vec<CsiEvent> {
|
||||
match self.buffer.push(frame, &self.ids) {
|
||||
Some(window) => self.run_detectors(window),
|
||||
None => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Close whatever frames remain in the buffer into a final window and run
|
||||
/// detectors on it. Returns an empty `Vec` if the buffer was empty.
|
||||
pub fn flush(&mut self) -> Vec<CsiEvent> {
|
||||
match self.buffer.flush(&self.ids) {
|
||||
Some(window) => self.run_detectors(window),
|
||||
None => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn run_detectors(&mut self, window: CsiWindow) -> Vec<CsiEvent> {
|
||||
let mut events = Vec::new();
|
||||
for d in &mut self.detectors {
|
||||
events.extend(d.on_window(&window, &self.ids));
|
||||
}
|
||||
debug_assert!(events.iter().all(|e| e.validate().is_ok()));
|
||||
self.recent.push(window);
|
||||
if self.recent.len() > RECENT_WINDOW_CAP {
|
||||
let overflow = self.recent.len() - RECENT_WINDOW_CAP;
|
||||
self.recent.drain(0..overflow);
|
||||
}
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rvcsi_core::{AdapterKind, CsiEventKind, FrameId, ValidationStatus};
|
||||
|
||||
/// Deterministic LCG (Numerical Recipes constants) -> `[0.0, 1.0)`.
|
||||
struct Lcg(u64);
|
||||
impl Lcg {
|
||||
fn new(seed: u64) -> Self {
|
||||
Lcg(seed)
|
||||
}
|
||||
fn next_unit(&mut self) -> f32 {
|
||||
self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
|
||||
// top 24 bits -> [0,1)
|
||||
((self.0 >> 40) as f32) / (1u64 << 24) as f32
|
||||
}
|
||||
}
|
||||
|
||||
fn accepted_frame(frame_id: u64, ts: u64, amp: &[f32], quality: f32) -> CsiFrame {
|
||||
let i: Vec<f32> = amp.to_vec();
|
||||
let q: Vec<f32> = vec![0.0; amp.len()];
|
||||
let mut f = CsiFrame::from_iq(
|
||||
FrameId(frame_id),
|
||||
SessionId(1),
|
||||
SourceId::from("dev"),
|
||||
AdapterKind::Synthetic,
|
||||
ts,
|
||||
6,
|
||||
20,
|
||||
i,
|
||||
q,
|
||||
);
|
||||
f.validation = ValidationStatus::Accepted;
|
||||
f.quality_score = quality;
|
||||
f
|
||||
}
|
||||
|
||||
/// Build a quiet / active / quiet frame stream with monotonic 50 ms
|
||||
/// timestamps. Long enough that the default 16-frame window buffer yields
|
||||
/// enough windows for the detectors' debounce / hysteresis chains.
|
||||
fn synthetic_stream() -> Vec<CsiFrame> {
|
||||
let mut rng = Lcg::new(0xC0FFEE);
|
||||
let mut frames = Vec::new();
|
||||
let dt = 50_000_000u64; // 50 ms
|
||||
let quiet_a = 30u64;
|
||||
let active = 60u64;
|
||||
let quiet_b = 60u64;
|
||||
let total = quiet_a + active + quiet_b;
|
||||
for k in 0..total {
|
||||
let ts = k * dt;
|
||||
let is_active = (quiet_a..quiet_a + active).contains(&k);
|
||||
let amp: Vec<f32> = (0..32)
|
||||
.map(|_| {
|
||||
if is_active {
|
||||
// Large per-frame jitter.
|
||||
1.0 + (rng.next_unit() - 0.5) * 4.0
|
||||
} else {
|
||||
// Tiny deterministic noise around 1.0.
|
||||
1.0 + (rng.next_unit() - 0.5) * 0.001
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
frames.push(accepted_frame(k, ts, &, 0.9));
|
||||
}
|
||||
frames
|
||||
}
|
||||
|
||||
fn run_stream(frames: &[CsiFrame]) -> Vec<CsiEvent> {
|
||||
let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev"));
|
||||
let mut events = Vec::new();
|
||||
for f in frames {
|
||||
events.extend(p.process_frame(f));
|
||||
}
|
||||
events.extend(p.flush());
|
||||
events
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pipeline_detects_motion_and_presence_and_settles() {
|
||||
let frames = synthetic_stream();
|
||||
let events = run_stream(&frames);
|
||||
assert!(!events.is_empty(), "expected some events");
|
||||
for e in &events {
|
||||
assert!(e.validate().is_ok(), "invalid event: {e:?}");
|
||||
}
|
||||
let kinds: Vec<CsiEventKind> = events.iter().map(|e| e.kind).collect();
|
||||
assert!(kinds.contains(&CsiEventKind::MotionDetected), "kinds = {kinds:?}");
|
||||
assert!(kinds.contains(&CsiEventKind::PresenceStarted), "kinds = {kinds:?}");
|
||||
assert!(kinds.contains(&CsiEventKind::MotionSettled), "kinds = {kinds:?}");
|
||||
assert!(kinds.contains(&CsiEventKind::PresenceEnded), "kinds = {kinds:?}");
|
||||
|
||||
// MotionDetected should come before MotionSettled.
|
||||
let det = events.iter().position(|e| e.kind == CsiEventKind::MotionDetected).unwrap();
|
||||
let set = events.iter().position(|e| e.kind == CsiEventKind::MotionSettled).unwrap();
|
||||
assert!(det < set);
|
||||
let start = events.iter().position(|e| e.kind == CsiEventKind::PresenceStarted).unwrap();
|
||||
let end = events.iter().position(|e| e.kind == CsiEventKind::PresenceEnded).unwrap();
|
||||
assert!(start < end);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pipeline_is_deterministic() {
|
||||
let frames = synthetic_stream();
|
||||
let a = run_stream(&frames);
|
||||
let b = run_stream(&frames);
|
||||
assert_eq!(a, b, "same stream must yield identical events");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pipeline_recent_windows_and_pending_count() {
|
||||
let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev"));
|
||||
let amp = vec![1.0f32; 32];
|
||||
// Two windows worth of frames (16 each at the 16-frame cap).
|
||||
for k in 0..16u64 {
|
||||
p.process_frame(&accepted_frame(k, k * 10_000, &, 0.9));
|
||||
}
|
||||
assert_eq!(p.recent_windows().len(), 1);
|
||||
assert_eq!(p.pending_frame_count(), 0);
|
||||
p.process_frame(&accepted_frame(16, 200_000, &, 0.9));
|
||||
assert_eq!(p.pending_frame_count(), 1);
|
||||
let leftover = p.flush();
|
||||
let _ = leftover;
|
||||
assert_eq!(p.recent_windows().len(), 2);
|
||||
assert_eq!(p.pending_frame_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pipeline_skips_foreign_frames() {
|
||||
let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev"));
|
||||
let amp = vec![1.0f32; 8];
|
||||
let mut foreign = accepted_frame(0, 0, &, 0.9);
|
||||
foreign.session_id = SessionId(99);
|
||||
assert!(p.process_frame(&foreign).is_empty());
|
||||
assert_eq!(p.pending_frame_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detector_names_in_order() {
|
||||
let p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev"));
|
||||
assert_eq!(
|
||||
p.detector_names(),
|
||||
vec!["presence", "motion", "quality", "baseline_drift"]
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,392 @@
|
||||
//! [`WindowBuffer`] — aggregates exposable [`CsiFrame`]s into [`CsiWindow`]s.
|
||||
|
||||
use rvcsi_core::{CsiFrame, CsiWindow, IdGenerator, SessionId, SourceId};
|
||||
|
||||
/// Tunables for a [`WindowBuffer`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub struct WindowBufferConfig {
|
||||
/// Close the window once this many frames have been buffered. Must be `>= 2`.
|
||||
pub max_frames: usize,
|
||||
/// Close the window once `last_ts - first_ts >= max_duration_ns`.
|
||||
pub max_duration_ns: u64,
|
||||
/// Centre of the logistic that maps `motion_energy` to `presence_score`.
|
||||
pub presence_threshold: f32,
|
||||
}
|
||||
|
||||
impl WindowBufferConfig {
|
||||
/// Build a config with a default `presence_threshold` of `0.05`.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `max_frames < 2`.
|
||||
pub fn new(max_frames: usize, max_duration_ns: u64) -> Self {
|
||||
assert!(max_frames >= 2, "WindowBuffer max_frames must be >= 2");
|
||||
WindowBufferConfig {
|
||||
max_frames,
|
||||
max_duration_ns,
|
||||
presence_threshold: 0.05,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder-style setter for [`WindowBufferConfig::presence_threshold`].
|
||||
pub fn with_presence_threshold(mut self, t: f32) -> Self {
|
||||
self.presence_threshold = t;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Buffers frames from one `(session_id, source_id)` and emits windows.
|
||||
///
|
||||
/// Use [`WindowBuffer::push`] for each incoming frame; it returns `Some(window)`
|
||||
/// on the frame that closes a window (that frame being the last in the window).
|
||||
/// Call [`WindowBuffer::flush`] at end-of-stream to drain whatever is buffered.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WindowBuffer {
|
||||
session_id: SessionId,
|
||||
source_id: SourceId,
|
||||
cfg: WindowBufferConfig,
|
||||
/// Subcarrier count fixed by the first buffered frame of the current window.
|
||||
subcarrier_count: Option<u16>,
|
||||
/// Buffered `amplitude` vectors (one per accepted frame).
|
||||
amplitudes: Vec<Vec<f32>>,
|
||||
/// Buffered `phase` vectors (one per accepted frame).
|
||||
phases: Vec<Vec<f32>>,
|
||||
/// Buffered `quality_score`s.
|
||||
qualities: Vec<f32>,
|
||||
/// Buffered timestamps (ns).
|
||||
timestamps: Vec<u64>,
|
||||
}
|
||||
|
||||
impl WindowBuffer {
|
||||
/// Create a buffer for `session_id` / `source_id` with the given thresholds.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `max_frames < 2`.
|
||||
pub fn new(
|
||||
session_id: SessionId,
|
||||
source_id: SourceId,
|
||||
max_frames: usize,
|
||||
max_duration_ns: u64,
|
||||
) -> Self {
|
||||
Self::with_config(
|
||||
session_id,
|
||||
source_id,
|
||||
WindowBufferConfig::new(max_frames, max_duration_ns),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a buffer from a [`WindowBufferConfig`].
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `cfg.max_frames < 2`.
|
||||
pub fn with_config(session_id: SessionId, source_id: SourceId, cfg: WindowBufferConfig) -> Self {
|
||||
assert!(cfg.max_frames >= 2, "WindowBuffer max_frames must be >= 2");
|
||||
WindowBuffer {
|
||||
session_id,
|
||||
source_id,
|
||||
cfg,
|
||||
subcarrier_count: None,
|
||||
amplitudes: Vec::new(),
|
||||
phases: Vec::new(),
|
||||
qualities: Vec::new(),
|
||||
timestamps: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of frames currently buffered (not yet emitted as a window).
|
||||
pub fn pending_frame_count(&self) -> usize {
|
||||
self.amplitudes.len()
|
||||
}
|
||||
|
||||
/// Add a frame; returns `Some(window)` if this frame closed a window.
|
||||
///
|
||||
/// Frames are skipped (returning `None`, not buffered) when:
|
||||
/// * `!frame.is_exposable()`,
|
||||
/// * the frame's `session_id` / `source_id` don't match the buffer's, or
|
||||
/// * the frame's `subcarrier_count` differs from the first buffered frame's.
|
||||
pub fn push(&mut self, frame: &CsiFrame, ids: &IdGenerator) -> Option<CsiWindow> {
|
||||
if !frame.is_exposable() {
|
||||
return None;
|
||||
}
|
||||
if frame.session_id != self.session_id || frame.source_id != self.source_id {
|
||||
return None;
|
||||
}
|
||||
match self.subcarrier_count {
|
||||
None => self.subcarrier_count = Some(frame.subcarrier_count),
|
||||
Some(n) if n != frame.subcarrier_count => return None,
|
||||
Some(_) => {}
|
||||
}
|
||||
|
||||
self.amplitudes.push(frame.amplitude.clone());
|
||||
self.phases.push(frame.phase.clone());
|
||||
self.qualities.push(frame.quality_score);
|
||||
self.timestamps.push(frame.timestamp_ns);
|
||||
|
||||
let reached_count = self.amplitudes.len() >= self.cfg.max_frames;
|
||||
let reached_duration = match (self.timestamps.first(), self.timestamps.last()) {
|
||||
(Some(&first), Some(&last)) => last.saturating_sub(first) >= self.cfg.max_duration_ns,
|
||||
_ => false,
|
||||
};
|
||||
if reached_count || reached_duration {
|
||||
Some(self.close(ids))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain whatever is buffered (>= 1 frame) into a final window.
|
||||
///
|
||||
/// Returns `None` when the buffer is empty.
|
||||
pub fn flush(&mut self, ids: &IdGenerator) -> Option<CsiWindow> {
|
||||
if self.amplitudes.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.close(ids))
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the [`CsiWindow`] from the buffered frames and reset the buffer.
|
||||
fn close(&mut self, ids: &IdGenerator) -> CsiWindow {
|
||||
let frame_count = self.amplitudes.len();
|
||||
debug_assert!(frame_count >= 1, "close() called on an empty buffer");
|
||||
let n = self.subcarrier_count.unwrap_or(0) as usize;
|
||||
|
||||
// Per-subcarrier mean amplitude.
|
||||
let mut mean_amplitude = vec![0.0f32; n];
|
||||
for amp in &self.amplitudes {
|
||||
for (slot, a) in mean_amplitude.iter_mut().zip(amp.iter()) {
|
||||
*slot += *a;
|
||||
}
|
||||
}
|
||||
for v in &mut mean_amplitude {
|
||||
*v /= frame_count as f32;
|
||||
}
|
||||
|
||||
// Per-subcarrier population variance of the phase.
|
||||
let mut phase_mean = vec![0.0f32; n];
|
||||
for ph in &self.phases {
|
||||
for (slot, p) in phase_mean.iter_mut().zip(ph.iter()) {
|
||||
*slot += *p;
|
||||
}
|
||||
}
|
||||
for v in &mut phase_mean {
|
||||
*v /= frame_count as f32;
|
||||
}
|
||||
let mut phase_variance = vec![0.0f32; n];
|
||||
for ph in &self.phases {
|
||||
for k in 0..n {
|
||||
let d = ph.get(k).copied().unwrap_or(0.0) - phase_mean[k];
|
||||
phase_variance[k] += d * d;
|
||||
}
|
||||
}
|
||||
for v in &mut phase_variance {
|
||||
*v /= frame_count as f32;
|
||||
}
|
||||
|
||||
// Motion energy: mean over consecutive pairs of ||amp_b - amp_a||_2 / sqrt(n).
|
||||
let motion_energy = if frame_count < 2 || n == 0 {
|
||||
0.0
|
||||
} else {
|
||||
let mut acc = 0.0f64;
|
||||
for w in self.amplitudes.windows(2) {
|
||||
let (a, b) = (&w[0], &w[1]);
|
||||
let mut sq = 0.0f64;
|
||||
for k in 0..n {
|
||||
let d = (b.get(k).copied().unwrap_or(0.0) - a.get(k).copied().unwrap_or(0.0))
|
||||
as f64;
|
||||
sq += d * d;
|
||||
}
|
||||
acc += sq.sqrt() / (n as f64).sqrt();
|
||||
}
|
||||
(acc / (frame_count - 1) as f64) as f32
|
||||
};
|
||||
let motion_energy = if motion_energy.is_finite() && motion_energy >= 0.0 {
|
||||
motion_energy
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
// Presence score: logistic of (motion_energy - threshold).
|
||||
let z = (motion_energy - self.cfg.presence_threshold) * 8.0;
|
||||
let presence_score = (1.0 / (1.0 + (-z).exp())).clamp(0.0, 1.0);
|
||||
|
||||
// Quality score: mean of frame quality scores.
|
||||
let quality_sum: f32 = self.qualities.iter().sum();
|
||||
let quality_score = (quality_sum / frame_count as f32).clamp(0.0, 1.0);
|
||||
|
||||
let start_ns = *self.timestamps.first().unwrap();
|
||||
let raw_end = *self.timestamps.last().unwrap();
|
||||
// Edge case: a single-frame window would have start_ns == end_ns, which
|
||||
// CsiWindow::validate() rejects. Bump the end by 1 ns so it stays valid.
|
||||
let end_ns = if raw_end > start_ns { raw_end } else { start_ns + 1 };
|
||||
|
||||
let window = CsiWindow {
|
||||
window_id: ids.next_window(),
|
||||
session_id: self.session_id,
|
||||
source_id: self.source_id.clone(),
|
||||
start_ns,
|
||||
end_ns,
|
||||
frame_count: frame_count as u32,
|
||||
mean_amplitude,
|
||||
phase_variance,
|
||||
motion_energy,
|
||||
presence_score,
|
||||
quality_score,
|
||||
};
|
||||
debug_assert!(
|
||||
window.validate().is_ok(),
|
||||
"WindowBuffer produced an invalid CsiWindow: {:?}",
|
||||
window.validate()
|
||||
);
|
||||
|
||||
// Reset for the next window.
|
||||
self.subcarrier_count = None;
|
||||
self.amplitudes.clear();
|
||||
self.phases.clear();
|
||||
self.qualities.clear();
|
||||
self.timestamps.clear();
|
||||
|
||||
window
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rvcsi_core::{AdapterKind, FrameId, ValidationStatus};
|
||||
|
||||
fn frame(
|
||||
session: u64,
|
||||
source: &str,
|
||||
frame_id: u64,
|
||||
ts: u64,
|
||||
amp: &[f32],
|
||||
quality: f32,
|
||||
) -> CsiFrame {
|
||||
// Build I/Q so that amplitude == amp and phase == 0.
|
||||
let i: Vec<f32> = amp.to_vec();
|
||||
let q: Vec<f32> = vec![0.0; amp.len()];
|
||||
let mut f = CsiFrame::from_iq(
|
||||
FrameId(frame_id),
|
||||
SessionId(session),
|
||||
SourceId::from(source),
|
||||
AdapterKind::Synthetic,
|
||||
ts,
|
||||
6,
|
||||
20,
|
||||
i,
|
||||
q,
|
||||
);
|
||||
f.validation = ValidationStatus::Accepted;
|
||||
f.quality_score = quality;
|
||||
f
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn closes_after_exactly_max_frames() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 4, u64::MAX);
|
||||
let amp = [1.0f32, 1.0, 1.0];
|
||||
assert!(buf.push(&frame(0, "s", 0, 0, &, 0.9), &g).is_none());
|
||||
assert!(buf.push(&frame(0, "s", 1, 10, &, 0.9), &g).is_none());
|
||||
assert!(buf.push(&frame(0, "s", 2, 20, &, 0.9), &g).is_none());
|
||||
assert_eq!(buf.pending_frame_count(), 3);
|
||||
let w = buf.push(&frame(0, "s", 3, 30, &, 0.9), &g).expect("window");
|
||||
assert_eq!(w.frame_count, 4);
|
||||
assert_eq!(buf.pending_frame_count(), 0);
|
||||
assert!(w.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn closes_on_duration_with_fewer_frames() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 100, 1_000);
|
||||
let amp = [1.0f32, 2.0];
|
||||
assert!(buf.push(&frame(0, "s", 0, 0, &, 0.8), &g).is_none());
|
||||
assert!(buf.push(&frame(0, "s", 1, 500, &, 0.8), &g).is_none());
|
||||
let w = buf
|
||||
.push(&frame(0, "s", 2, 1_000, &, 0.8), &g)
|
||||
.expect("window closed on duration");
|
||||
assert_eq!(w.frame_count, 3);
|
||||
assert_eq!(w.start_ns, 0);
|
||||
assert_eq!(w.end_ns, 1_000);
|
||||
assert!(w.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn flush_returns_remainder_and_handles_single_frame() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 10, u64::MAX);
|
||||
let amp = [1.0f32, 1.0];
|
||||
assert!(buf.push(&frame(0, "s", 0, 100, &, 0.7), &g).is_none());
|
||||
let w = buf.flush(&g).expect("flush returns the single buffered frame");
|
||||
assert_eq!(w.frame_count, 1);
|
||||
assert_eq!(w.start_ns, 100);
|
||||
assert_eq!(w.end_ns, 101); // bumped so validate() passes
|
||||
assert_eq!(w.motion_energy, 0.0);
|
||||
assert!(w.validate().is_ok());
|
||||
assert!(buf.flush(&g).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skips_mismatched_session_and_source() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(7), SourceId::from("good"), 4, u64::MAX);
|
||||
let amp = [1.0f32, 1.0];
|
||||
assert!(buf.push(&frame(7, "good", 0, 0, &, 0.9), &g).is_none());
|
||||
// Wrong session.
|
||||
assert!(buf.push(&frame(8, "good", 1, 10, &, 0.9), &g).is_none());
|
||||
// Wrong source.
|
||||
assert!(buf.push(&frame(7, "bad", 2, 20, &, 0.9), &g).is_none());
|
||||
assert_eq!(buf.pending_frame_count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skips_non_exposable_and_mismatched_subcarrier_count() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 4, u64::MAX);
|
||||
// Non-exposable frame is dropped.
|
||||
let mut bad = frame(0, "s", 0, 0, &[1.0, 1.0], 0.9);
|
||||
bad.validation = ValidationStatus::Pending;
|
||||
assert!(buf.push(&bad, &g).is_none());
|
||||
assert_eq!(buf.pending_frame_count(), 0);
|
||||
// First good frame fixes subcarrier count = 2.
|
||||
assert!(buf.push(&frame(0, "s", 1, 10, &[1.0, 1.0], 0.9), &g).is_none());
|
||||
// Different subcarrier count is dropped.
|
||||
assert!(buf
|
||||
.push(&frame(0, "s", 2, 20, &[1.0, 1.0, 1.0], 0.9), &g)
|
||||
.is_none());
|
||||
assert_eq!(buf.pending_frame_count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identical_frames_have_zero_motion_low_presence() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 8, u64::MAX);
|
||||
let amp = [1.0f32; 32];
|
||||
let mut last = None;
|
||||
for k in 0..8u64 {
|
||||
last = buf.push(&frame(0, "s", k, k * 10, &, 0.9), &g);
|
||||
}
|
||||
let w = last.expect("window");
|
||||
assert_eq!(w.motion_energy, 0.0);
|
||||
assert!(w.presence_score < 0.5, "presence_score = {}", w.presence_score);
|
||||
assert!(w.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn growing_jitter_raises_motion_and_presence() {
|
||||
let g = IdGenerator::new();
|
||||
let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 16, u64::MAX);
|
||||
// Large alternating jitter -> high motion energy.
|
||||
let mut last = None;
|
||||
for k in 0..16u64 {
|
||||
let bump = if k % 2 == 0 { 0.0 } else { 1.0 };
|
||||
let amp: Vec<f32> = (0..32).map(|_| 1.0 + bump).collect();
|
||||
last = buf.push(&frame(0, "s", k, k * 10, &, 0.9), &g);
|
||||
}
|
||||
let w = last.expect("window");
|
||||
assert!(w.motion_energy > 0.1, "motion_energy = {}", w.motion_energy);
|
||||
assert!(w.presence_score > 0.5, "presence_score = {}", w.presence_score);
|
||||
assert!(w.validate().is_ok());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user