feat(edge-registry): ADR-102 — surface Cognitum cog catalog via /api/v1/edge/registry (#648)

* feat(edge-registry): ADR-102 — surface Cognitum cog catalog via /api/v1/edge/registry

Adds a new sensing-server endpoint that fetches and caches the canonical
Cognitum app registry at
https://storage.googleapis.com/cognitum-apps/app-registry.json (105 cogs
across 11 categories as of v2.1.0). RuView previously had no live
awareness of the catalog — the README's capability table was hand-
curated and went stale as Cognitum shipped new cogs (the registry was
last updated 6 days ago).

ADR:
* docs/adr/ADR-102-edge-module-registry.md — full design, response
  shape, configuration flags, failure modes, and a 12-row security
  review covering SSRF, response inflation, ?refresh abuse, stale-serve
  semantics, TLS, cache poisoning, JSON-panic resistance, etc.

Code:
* v2/.../edge_registry.rs: EdgeRegistry struct + Fetcher trait
  (UreqFetcher as the live impl, MockFetcher in tests) + 7 unit tests.
  RwLock<Option<CachedEntry>> with stale-on-error fallback.
  MAX_PAYLOAD_BYTES=8 MiB, 10s wire timeout.
* v2/.../main.rs — constructs Option<Arc<EdgeRegistry>> at startup,
  registers GET /api/v1/edge/registry handler, wires Extension layer.
  Handler runs the blocking ureq fetch via tokio::task::spawn_blocking
  so the async runtime stays free.
* v2/.../cli.rs / main.rs Args — three new flags (per user request to
  "allow the registry to be disabled or changed"):
    --edge-registry-url <URL>       (env RUVIEW_EDGE_REGISTRY_URL)
    --edge-registry-ttl-secs <N>    (env RUVIEW_EDGE_REGISTRY_TTL_SECS)
    --no-edge-registry              (env RUVIEW_NO_EDGE_REGISTRY)
  When --no-edge-registry is set or the URL is empty, the endpoint
  returns 404.
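
The cache behaviour described above (serve fresh entries within the TTL, refetch on expiry or forced refresh, fall back to a stale copy when upstream fails) can be sketched with the standard library alone. This is a simplified illustration, not the code in edge_registry.rs: `TtlCache`, `Entry`, and `Served` are hypothetical names, the upstream call is modelled as a closure, and errors are plain strings.

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the cached registry payload.
#[derive(Clone, Debug, PartialEq)]
pub struct Entry {
    pub body: String,
    pub fetched_at: Instant,
}

/// What a lookup returns: the body plus whether it was served stale.
#[derive(Debug, PartialEq)]
pub struct Served {
    pub body: String,
    pub stale: bool,
}

pub struct TtlCache {
    slot: RwLock<Option<Entry>>,
    ttl: Duration,
}

impl TtlCache {
    pub fn new(ttl: Duration) -> Self {
        Self { slot: RwLock::new(None), ttl }
    }

    /// `fetch` models the upstream call; it runs only when the cache
    /// cannot answer (empty, expired, or `force_refresh`).
    pub fn get<F>(&self, force_refresh: bool, fetch: F) -> Result<Served, String>
    where
        F: FnOnce() -> Result<String, String>,
    {
        // 1. Serve from cache while the entry is younger than the TTL.
        if !force_refresh {
            if let Some(e) = self.snapshot() {
                if e.fetched_at.elapsed() < self.ttl {
                    return Ok(Served { body: e.body, stale: false });
                }
            }
        }
        // 2. Otherwise go upstream and overwrite the slot on success.
        match fetch() {
            Ok(body) => {
                let entry = Entry { body: body.clone(), fetched_at: Instant::now() };
                if let Ok(mut guard) = self.slot.write() {
                    *guard = Some(entry);
                }
                Ok(Served { body, stale: false })
            }
            // 3. Upstream failed: serve whatever is cached, flagged stale.
            Err(err) => match self.snapshot() {
                Some(e) => Ok(Served { body: e.body, stale: true }),
                None => Err(err),
            },
        }
    }

    /// Clone the current entry (if any) without holding the lock afterwards.
    fn snapshot(&self) -> Option<Entry> {
        let guard = self.slot.read().ok()?;
        guard.clone()
    }
}
```

Cloning the entry out of the read guard keeps the lock from being held across the upstream call, at the cost of copying the payload on every request.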

Cargo.toml: adds ureq (rustls), sha2, thiserror as direct deps.

README:
* New collapsed "🧩 Edge Module Catalog" section with the full 105-cog
  table generated from the registry, grouped by category with practical
  one-line descriptions (e.g. "Spots irregular heartbeats and abnormal
  heart rhythms", "Detects walking problems and scores fall risk").
  Links to https://seed.cognitum.one/store and the local appliance
  /cogs page. Sits between the HF model section and How It Works.

Tests (7/7 pass):
  first_call_hits_upstream_and_caches
  ttl_expiry_triggers_refetch
  force_refresh_bypasses_fresh_cache
  stale_serve_on_upstream_failure_after_cached_success
  no_cache_no_upstream_returns_error
  upstream_invalid_json_is_treated_as_error
  upstream_sha256_is_deterministic
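
These tests run without network access because the fetcher is swapped for a mock that replays canned responses. A minimal std-only sketch of that pattern follows; `Fetch` and `QueueFetcher` are hypothetical, simplified names (string errors, a `VecDeque` queue) rather than the types in this commit.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical, simplified fetcher trait (string errors instead of an enum).
pub trait Fetch: Send + Sync {
    fn fetch(&self, url: &str) -> Result<Vec<u8>, String>;
}

/// Replays canned responses in order and counts how often it was called.
pub struct QueueFetcher {
    responses: Mutex<VecDeque<Result<Vec<u8>, String>>>,
    calls: AtomicUsize,
}

impl QueueFetcher {
    pub fn new(responses: Vec<Result<Vec<u8>, String>>) -> Self {
        Self {
            responses: Mutex::new(responses.into()),
            calls: AtomicUsize::new(0),
        }
    }

    /// Number of times `fetch` has been invoked so far.
    pub fn calls(&self) -> usize {
        self.calls.load(Ordering::SeqCst)
    }
}

impl Fetch for QueueFetcher {
    fn fetch(&self, _url: &str) -> Result<Vec<u8>, String> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        let mut queue = self.responses.lock().unwrap();
        // An exhausted queue behaves like an unreachable upstream.
        queue
            .pop_front()
            .unwrap_or_else(|| Err("mock: queue empty".to_string()))
    }
}
```

Asserting on the call counter is what lets a test distinguish "served from cache" from "went upstream again".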

Security highlights (full review in ADR-102 §"Security review"):
- The registry is metadata-only; per-cog binary signatures (ADR-100)
  remain the trust root for installs. A compromised registry can
  mislead a human reader but cannot ship malicious binaries.
- 8 MiB cap + 10s timeout bound the memory and time a single fetch can
  consume, and the blocking fetch runs under spawn_blocking, so the
  endpoint can't be used to exhaust memory or pin tokio worker threads.
- Stale-on-error responses carry an explicit `stale: true` field so
  upstream outages are visible to consumers rather than silently
  masked.
- Endpoint sits behind the existing RUVIEW_API_TOKEN bearer gate when
  set, otherwise unauthenticated (registry contents are public anyway).
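
The size cap mentioned above relies on reading one byte past the limit, so that a body of exactly the limit is accepted and anything longer is rejected without buffering it in full. A minimal std-only sketch of that idea, assuming a hypothetical `read_capped` helper and a tiny 16-byte cap for illustration (the commit uses 8 MiB):

```rust
use std::io::Read;

/// Hypothetical cap for the example; the commit's constant is 8 MiB.
pub const MAX_BYTES: usize = 16;

#[derive(Debug, PartialEq)]
pub enum ReadError {
    Io(String),
    TooLarge(usize),
}

/// Read at most `max` bytes from `src`; fail if the source holds more.
pub fn read_capped<R: Read>(src: R, max: usize) -> Result<Vec<u8>, ReadError> {
    // Allow one byte past the cap so "exactly max" and "over max"
    // can be told apart after the read.
    let mut limited = src.take(max as u64 + 1);
    let mut buf = Vec::new();
    limited
        .read_to_end(&mut buf)
        .map_err(|e| ReadError::Io(e.to_string()))?;
    if buf.len() > max {
        // `buf.len()` is at most `max + 1` here, not the true source size.
        return Err(ReadError::TooLarge(buf.len()));
    }
    Ok(buf)
}
```

With this shape the byte count carried by the error is the cap plus one rather than the real upstream size, which is worth keeping in mind when reading logs.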

* chore: refresh Cargo.lock for ureq/sha2/thiserror deps added by ADR-102
rUv
2026-05-19 18:08:43 -04:00
committed by GitHub
parent dc7f6cd096
commit 67fec45e61
7 changed files with 858 additions and 0 deletions
@@ -8441,6 +8441,8 @@ dependencies = [
"once_cell",
"rustls 0.23.37",
"rustls-pki-types",
"serde",
"serde_json",
"url",
"webpki-roots 0.26.11",
]
@@ -9158,12 +9160,15 @@ dependencies = [
"ruvector-mincut",
"serde",
"serde_json",
"sha2",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tower 0.4.13",
"tower-http",
"tracing",
"tracing-subscriber",
"ureq 2.12.1",
"wifi-densepose-signal",
"wifi-densepose-wifiscan",
]
@@ -56,6 +56,15 @@ wifi-densepose-signal = { version = "0.3.0", path = "../wifi-densepose-signal",
midstreamer-temporal-compare = "0.2" # DTW / LCS / Edit-Distance pattern matching
midstreamer-attractor = "0.2" # Lyapunov + regime classification
# ADR-102: Edge Module Registry — fetch the canonical Cognitum cog catalog
# at `https://storage.googleapis.com/cognitum-apps/app-registry.json`,
# cache with TTL, surface via /api/v1/edge/registry. ureq is the smallest
# blocking HTTP client we can use without dragging a tokio HTTP stack in;
# default features are off, so rustls is enabled via the explicit `tls` feature.
ureq = { version = "2", default-features = false, features = ["tls", "json"] }
sha2 = "0.10"
thiserror = "1"
[dev-dependencies]
tempfile = "3.10"
# `tower::ServiceExt::oneshot` for in-process Router tests (bearer_auth).
@@ -0,0 +1,379 @@
//! Edge Module Registry — surfaces the canonical Cognitum cog catalog at
//! `https://storage.googleapis.com/cognitum-apps/app-registry.json` through
//! the sensing-server's HTTP surface. See ADR-102 for the design and trust
//! model; see ADR-100 for the underlying cog binary trust model.
//!
//! On-demand fetch + in-process TTL cache. Stale-on-error semantics: if
//! the upstream is unreachable but we have a cached copy, return the cached
//! copy with `stale: true` rather than 503.
use std::io::Read;
use std::sync::RwLock;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
/// Canonical upstream registry URL. Overridable via CLI for air-gapped or
/// mirror deployments.
pub const DEFAULT_UPSTREAM_URL: &str =
"https://storage.googleapis.com/cognitum-apps/app-registry.json";
/// Default cache TTL — the registry updates on a roughly-weekly cadence;
/// one hour of staleness is fine.
pub const DEFAULT_TTL_SECS: u64 = 3600;
/// Wire request timeout. The registry is ~50-200 KB; on a healthy network
/// it lands in well under a second.
pub const DEFAULT_FETCH_TIMEOUT_SECS: u64 = 10;
/// Response shape served by `GET /api/v1/edge/registry`. Documented in
/// ADR-102 §"Response shape".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryResponse {
pub fetched_at: u64,
pub ttl_seconds: u64,
pub stale: bool,
pub upstream_url: String,
pub upstream_sha256: String,
pub registry: Value,
}
/// Internal cache entry.
#[derive(Debug, Clone)]
struct CachedEntry {
payload: Value,
fetched_at_instant: Instant,
fetched_at_unix: u64,
upstream_sha256: String,
}
/// On-demand registry fetcher + cache. Cheap to construct; one instance is
/// shared across all incoming HTTP requests via `Arc<EdgeRegistry>`.
pub struct EdgeRegistry {
cached: RwLock<Option<CachedEntry>>,
ttl: Duration,
upstream_url: String,
fetcher: Box<dyn Fetcher>,
}
/// Pluggable fetcher abstraction — concrete impl is `UreqFetcher`; tests
/// can swap in `MockFetcher` to drive the cache logic without network.
pub trait Fetcher: Send + Sync {
fn fetch(&self, url: &str) -> Result<Vec<u8>, FetcherError>;
}
#[derive(Debug, thiserror::Error)]
pub enum FetcherError {
#[error("network error: {0}")]
Network(String),
#[error("http {status}: {body}")]
Http { status: u16, body: String },
#[error("response too large: {0} bytes")]
TooLarge(usize),
}
/// Cap on the response size to avoid pathological upstream responses
/// chewing through memory. 8 MiB is generous — the v2.1.0 registry is well
/// under 200 KB.
pub const MAX_PAYLOAD_BYTES: usize = 8 * 1024 * 1024;
/// Live `ureq`-backed fetcher.
pub struct UreqFetcher {
timeout: Duration,
}
impl UreqFetcher {
pub fn new(timeout: Duration) -> Self {
Self { timeout }
}
}
impl Default for UreqFetcher {
fn default() -> Self {
Self::new(Duration::from_secs(DEFAULT_FETCH_TIMEOUT_SECS))
}
}
impl Fetcher for UreqFetcher {
fn fetch(&self, url: &str) -> Result<Vec<u8>, FetcherError> {
let agent = ureq::AgentBuilder::new()
.timeout(self.timeout)
.build();
let resp = agent
.get(url)
.call()
.map_err(|e| match e {
ureq::Error::Status(status, r) => FetcherError::Http {
status,
body: r.into_string().unwrap_or_default(),
},
ureq::Error::Transport(t) => FetcherError::Network(t.to_string()),
})?;
let mut reader = resp.into_reader().take((MAX_PAYLOAD_BYTES + 1) as u64);
let mut buf = Vec::with_capacity(64 * 1024);
reader
.read_to_end(&mut buf)
.map_err(|e| FetcherError::Network(e.to_string()))?;
if buf.len() > MAX_PAYLOAD_BYTES {
return Err(FetcherError::TooLarge(buf.len()));
}
Ok(buf)
}
}
impl EdgeRegistry {
pub fn new(upstream_url: impl Into<String>, ttl: Duration) -> Self {
Self::with_fetcher(upstream_url, ttl, Box::new(UreqFetcher::default()))
}
pub fn with_fetcher(
upstream_url: impl Into<String>,
ttl: Duration,
fetcher: Box<dyn Fetcher>,
) -> Self {
Self {
cached: RwLock::new(None),
ttl,
upstream_url: upstream_url.into(),
fetcher,
}
}
/// Return a `RegistryResponse`. Uses the cache if fresh; otherwise
/// re-fetches from upstream. On upstream failure with a non-empty
/// cache, returns the stale copy.
pub fn get(&self, force_refresh: bool) -> Result<RegistryResponse, FetcherError> {
if !force_refresh {
if let Some(entry) = self.fresh_cache_snapshot() {
return Ok(self.response_from(&entry, false));
}
}
// Either no cache, expired, or forced refresh — try upstream.
match self.fetch_and_cache() {
Ok(entry) => Ok(self.response_from(&entry, false)),
Err(e) => {
// Upstream failed — serve stale if available.
if let Some(entry) = self.any_cache_snapshot() {
Ok(self.response_from(&entry, true))
} else {
Err(e)
}
}
}
}
fn fresh_cache_snapshot(&self) -> Option<CachedEntry> {
let guard = self.cached.read().ok()?;
let entry = guard.as_ref()?;
if entry.fetched_at_instant.elapsed() < self.ttl {
Some(entry.clone())
} else {
None
}
}
fn any_cache_snapshot(&self) -> Option<CachedEntry> {
let guard = self.cached.read().ok()?;
guard.clone()
}
fn fetch_and_cache(&self) -> Result<CachedEntry, FetcherError> {
let bytes = self.fetcher.fetch(&self.upstream_url)?;
let payload: Value = serde_json::from_slice(&bytes)
.map_err(|e| FetcherError::Network(format!("invalid upstream JSON: {e}")))?;
let mut hasher = Sha256::new();
hasher.update(&bytes);
let upstream_sha256 = hex_encode(&hasher.finalize());
let now_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let entry = CachedEntry {
payload,
fetched_at_instant: Instant::now(),
fetched_at_unix: now_unix,
upstream_sha256,
};
if let Ok(mut guard) = self.cached.write() {
*guard = Some(entry.clone());
}
Ok(entry)
}
fn response_from(&self, entry: &CachedEntry, stale: bool) -> RegistryResponse {
RegistryResponse {
fetched_at: entry.fetched_at_unix,
ttl_seconds: self.ttl.as_secs(),
stale,
upstream_url: self.upstream_url.clone(),
upstream_sha256: entry.upstream_sha256.clone(),
registry: entry.payload.clone(),
}
}
}
fn hex_encode(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push_str(&format!("{:02x}", b));
}
s
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Mock fetcher backed by a queue of canned responses. Lets us drive
/// the cache logic deterministically.
struct MockFetcher {
responses: std::sync::Mutex<Vec<Result<Vec<u8>, FetcherError>>>,
call_count: AtomicUsize,
}
impl MockFetcher {
fn new(responses: Vec<Result<Vec<u8>, FetcherError>>) -> Arc<Self> {
Arc::new(Self {
responses: std::sync::Mutex::new(responses),
call_count: AtomicUsize::new(0),
})
}
}
impl Fetcher for Arc<MockFetcher> {
fn fetch(&self, _url: &str) -> Result<Vec<u8>, FetcherError> {
self.call_count.fetch_add(1, Ordering::SeqCst);
let mut q = self.responses.lock().unwrap();
if q.is_empty() {
return Err(FetcherError::Network("mock: queue empty".into()));
}
q.remove(0)
}
}
fn sample_payload() -> Vec<u8> {
br#"{"version":"2.1.0","updated":"2026-05-13","cogs":[]}"#.to_vec()
}
#[test]
fn first_call_hits_upstream_and_caches() {
let fetcher = MockFetcher::new(vec![Ok(sample_payload())]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_secs(3600),
Box::new(fetcher.clone()),
);
let resp = reg.get(false).expect("get");
assert!(!resp.stale);
assert_eq!(resp.registry["version"], "2.1.0");
assert_eq!(fetcher.call_count.load(Ordering::SeqCst), 1);
// Second call within TTL — no new fetch.
let _ = reg.get(false).expect("get");
assert_eq!(fetcher.call_count.load(Ordering::SeqCst), 1);
}
#[test]
fn ttl_expiry_triggers_refetch() {
let fetcher = MockFetcher::new(vec![Ok(sample_payload()), Ok(sample_payload())]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_millis(10), // very short TTL
Box::new(fetcher.clone()),
);
let _ = reg.get(false).expect("first");
std::thread::sleep(Duration::from_millis(30));
let _ = reg.get(false).expect("second after expiry");
assert_eq!(fetcher.call_count.load(Ordering::SeqCst), 2);
}
#[test]
fn force_refresh_bypasses_fresh_cache() {
let fetcher = MockFetcher::new(vec![Ok(sample_payload()), Ok(sample_payload())]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_secs(3600),
Box::new(fetcher.clone()),
);
let _ = reg.get(false).expect("first");
let _ = reg.get(true).expect("refresh");
assert_eq!(fetcher.call_count.load(Ordering::SeqCst), 2);
}
#[test]
fn stale_serve_on_upstream_failure_after_cached_success() {
// First call succeeds and populates the cache. Second call hits upstream
// failure but we still have a cached copy — should serve it with stale=true.
let fetcher = MockFetcher::new(vec![
Ok(sample_payload()),
Err(FetcherError::Network("simulated".into())),
]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_millis(1), // expire quickly so call 2 retries upstream
Box::new(fetcher.clone()),
);
let first = reg.get(false).expect("first");
assert!(!first.stale);
std::thread::sleep(Duration::from_millis(5));
let second = reg.get(false).expect("stale-serve");
assert!(second.stale, "expected stale=true when upstream failed");
assert_eq!(second.registry["version"], "2.1.0");
}
#[test]
fn no_cache_no_upstream_returns_error() {
let fetcher = MockFetcher::new(vec![Err(FetcherError::Network("down".into()))]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_secs(3600),
Box::new(fetcher),
);
let err = reg.get(false).expect_err("should be err");
match err {
FetcherError::Network(_) => {}
other => panic!("unexpected error: {other:?}"),
}
}
#[test]
fn upstream_invalid_json_is_treated_as_error() {
let fetcher = MockFetcher::new(vec![Ok(b"not json".to_vec())]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_secs(3600),
Box::new(fetcher),
);
let err = reg.get(false).expect_err("invalid json");
match err {
FetcherError::Network(msg) => assert!(msg.contains("invalid upstream JSON")),
other => panic!("unexpected error: {other:?}"),
}
}
#[test]
fn upstream_sha256_is_deterministic() {
let fetcher = MockFetcher::new(vec![Ok(sample_payload())]);
let reg = EdgeRegistry::with_fetcher(
"http://test.invalid/registry.json",
Duration::from_secs(3600),
Box::new(fetcher),
);
let resp = reg.get(false).expect("get");
// SHA-256 of br#"{"version":"2.1.0","updated":"2026-05-13","cogs":[]}"#
let mut hasher = Sha256::new();
hasher.update(&sample_payload());
let expected = hex_encode(&hasher.finalize());
assert_eq!(resp.upstream_sha256, expected);
assert_eq!(resp.upstream_sha256.len(), 64);
}
}
@@ -8,6 +8,7 @@
//! - Real-time CSI introspection / low-latency tap (`introspection`, ADR-099)
pub mod bearer_auth;
pub mod edge_registry;
pub mod host_validation;
pub mod introspection;
pub mod path_safety;
@@ -35,10 +35,13 @@ use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path,
Query,
State,
},
http::StatusCode,
response::{Html, IntoResponse, Json},
routing::{delete, get, post},
Extension,
Router,
};
use clap::Parser;
@@ -181,6 +184,35 @@ struct Args {
/// Start field model calibration on boot (empty room required)
#[arg(long)]
calibrate: bool,
// ---------------------------------------------------------------
// ADR-102: Edge Module Registry — surface the canonical Cognitum
// cog catalog via `GET /api/v1/edge/registry`.
// ---------------------------------------------------------------
/// Override the upstream URL for the edge module registry. Set to an
/// http:// or https:// mirror for air-gapped deployments (the ureq-based
/// fetcher does not handle file:// URLs). Empty string or
/// --no-edge-registry disables the endpoint entirely.
#[arg(
long,
value_name = "URL",
env = "RUVIEW_EDGE_REGISTRY_URL",
default_value = "https://storage.googleapis.com/cognitum-apps/app-registry.json"
)]
edge_registry_url: String,
/// Cache TTL for the edge module registry, in seconds.
#[arg(
long,
value_name = "SECS",
env = "RUVIEW_EDGE_REGISTRY_TTL_SECS",
default_value = "3600"
)]
edge_registry_ttl_secs: u64,
/// Disable the edge module registry endpoint entirely. Returns 404 on
/// `GET /api/v1/edge/registry`. Use for air-gapped deployments.
#[arg(long, env = "RUVIEW_NO_EDGE_REGISTRY")]
no_edge_registry: bool,
}
// ── Data types ───────────────────────────────────────────────────────────────
@@ -3689,6 +3721,67 @@ async fn vital_signs_endpoint(State(state): State<SharedState>) -> Json<serde_js
}))
}
/// Query params for `GET /api/v1/edge/registry`.
#[derive(Debug, Deserialize)]
struct EdgeRegistryParams {
/// `?refresh=1` (or `?refresh=true`) bypasses the in-process cache.
/// Logged at debug for abuse visibility. ADR-102 §"Cache semantics".
#[serde(default)]
refresh: Option<String>,
}
/// GET /api/v1/edge/registry — surfaces the canonical Cognitum cog catalog.
///
/// See ADR-102 (`docs/adr/ADR-102-edge-module-registry.md`) for the design
/// + trust model + security review.
async fn edge_registry_endpoint(
Extension(reg): Extension<
Option<Arc<wifi_densepose_sensing_server::edge_registry::EdgeRegistry>>,
>,
Query(params): Query<EdgeRegistryParams>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
let Some(reg) = reg else {
// --no-edge-registry, or upstream URL empty.
return Err((
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"error": "edge_registry_disabled",
"detail": "The edge registry is disabled (--no-edge-registry or an empty --edge-registry-url)."
})),
));
};
let force_refresh = matches!(params.refresh.as_deref(), Some("1") | Some("true"));
if force_refresh {
tracing::debug!(
event = "edge_registry.refresh_requested",
"?refresh=1 bypassed the cache; verify this isn't being abused"
);
}
match tokio::task::spawn_blocking(move || reg.get(force_refresh)).await {
Ok(Ok(resp)) => Ok(Json(serde_json::to_value(resp).unwrap_or(serde_json::json!({})))),
Ok(Err(err)) => {
tracing::warn!(error = %err, "edge_registry upstream fetch failed and no cache");
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "edge_registry_upstream_unavailable",
"detail": err.to_string()
})),
))
}
Err(join_err) => {
tracing::error!(error = %join_err, "edge_registry spawn_blocking task panicked");
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "edge_registry_internal_error",
"detail": join_err.to_string()
})),
))
}
}
}
/// GET /api/v1/edge-vitals — latest edge vitals from ESP32 (ADR-039).
async fn edge_vitals_endpoint(State(state): State<SharedState>) -> Json<serde_json::Value> {
let s = state.read().await;
@@ -5048,6 +5141,26 @@ async fn main() {
let runtime_config = load_runtime_config(&data_dir);
info!("Loaded runtime config: dedup_factor={:.2}", runtime_config.dedup_factor);
// ADR-102: optional Edge Module Registry. None when --no-edge-registry
// is set (or when the URL is empty); otherwise we construct one with
// the configured TTL. The fetch happens lazily on first request.
let edge_registry: Option<std::sync::Arc<wifi_densepose_sensing_server::edge_registry::EdgeRegistry>> =
if args.no_edge_registry || args.edge_registry_url.is_empty() {
info!("Edge module registry: DISABLED (--no-edge-registry or empty URL)");
None
} else {
info!(
"Edge module registry: enabled — upstream={} ttl={}s",
args.edge_registry_url, args.edge_registry_ttl_secs
);
Some(std::sync::Arc::new(
wifi_densepose_sensing_server::edge_registry::EdgeRegistry::new(
args.edge_registry_url.clone(),
std::time::Duration::from_secs(args.edge_registry_ttl_secs),
),
))
};
let (tx, _) = broadcast::channel::<String>(256);
// ADR-099: parallel broadcast for the per-frame introspection snapshot stream
// consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow
@@ -5242,6 +5355,11 @@ async fn main() {
// Vital sign endpoints
.route("/api/v1/vital-signs", get(vital_signs_endpoint))
.route("/api/v1/edge-vitals", get(edge_vitals_endpoint))
// ADR-102: Edge Module Registry — surfaces the canonical Cognitum cog
// catalog (`https://storage.googleapis.com/cognitum-apps/app-registry.json`)
// with in-process TTL cache + stale-on-error fallback. Disabled when
// --no-edge-registry is set (returns 404).
.route("/api/v1/edge/registry", get(edge_registry_endpoint))
.route("/api/v1/wasm-events", get(wasm_events_endpoint))
// RVF model container info
.route("/api/v1/model/info", get(model_info))
@@ -5292,6 +5410,9 @@ async fn main() {
.route("/api/v1/config/ground-truth", post(config_set_ground_truth))
// Static UI files
.nest_service("/ui", ServeDir::new(&ui_path))
// ADR-102: make the edge registry handle (Option<Arc<EdgeRegistry>>)
// available to the /api/v1/edge/registry handler. None when disabled.
.layer(Extension(edge_registry.clone()))
.layer(SetResponseHeaderLayer::overriding(
axum::http::header::CACHE_CONTROL,
HeaderValue::from_static("no-cache, no-store, must-revalidate"),