~/About~/Foundry~/Blueprint~/Journal~/Projects
Book a Call
Journal

Why Anomaly Detection Can't Block the Ingestion Pipeline

From the Sensor Telemetry Engine system

·8 min read·Kingsley Onoh·View on GitHub

The first version of the anomaly evaluator ran inline. A batch of 100 readings flushed to TimescaleDB, then the same function called evaluate_batch, which fetched alert rules, computed rolling statistics from the continuous aggregates, checked cooldown windows, persisted alerts, published NATS events, and fired HTTP calls to the Notification Hub and Workflow Engine. All of it synchronous. All of it in the same call stack as the NATS consumer loop.

It worked for about thirty seconds. Then the Workflow Engine returned a 502, the reqwest client waited for its default timeout, the batch flush stalled, NATS messages piled up, and the consumer loop fell behind by 3,000 messages before I killed the process.

The lesson was obvious in retrospect: a sensor data pipeline that sustains 5,000 readings per second cannot wait for a downstream HTTP response. The interesting question was what "don't wait" actually means at the architecture level. Making the HTTP call async solves the timeout. It doesn't solve the failure model. I needed every component downstream of the batch insert to be able to fail, silently, without the consumer loop noticing or caring.

The pipeline, end to end

A sensor message arrives on NATS as JSON. The consumer deserializes it, validates the value (must be finite, metric must be non-empty), resolves the tenant via the TenantResolver's DashMap cache, auto-registers the device if unknown, and pushes the reading into the BatchBuffer. When the buffer hits 100 readings or 500 milliseconds elapse, whichever comes first, it flushes.

The flush itself is the only synchronous bottleneck I allow. The execute_insert method builds UNNEST arrays for all seven columns and sends a single INSERT to TimescaleDB. If that fails, it retries once after 100 milliseconds. If the retry fails, it logs at error level and drops the batch. There's no infinite retry loop. No dead-letter queue. The batch is gone.

I made that choice deliberately. At 5,000 readings per second, a retry queue that grows faster than it drains is worse than data loss. Thirty seconds of buffered retries at full throughput means 150,000 queued readings. The server runs in 512MB. The math doesn't work. Drop the batch, log the failure, let the operator investigate. The continuous aggregates will smooth over a missing 100-reading gap, and the anomaly detector uses rolling statistics over minutes, not individual readings.

The buffer swap pattern

The BatchBuffer holds a Vec<ValidatedReading> behind a tokio::sync::Mutex. The critical design detail is in the push method: when the buffer reaches capacity, the code calls std::mem::replace to swap the full Vec with a fresh one, then drops the Mutex guard before executing the INSERT.

let readings = std::mem::replace(&mut *buf, Vec::with_capacity(self.batch_size));
drop(buf); // release lock before DB I/O
let flushed = self.flush_to_db_with_retry(readings).await?;

This matters because the INSERT takes milliseconds. If the lock were held during the INSERT, every concurrent push call would block. At 5,000 messages per second, even a 5ms INSERT means 25 messages queued behind the lock. The swap pattern reduces the critical section to a memory operation, not a network round-trip.

I considered using a lock-free structure like a crossbeam channel or a ring buffer. The Mutex won the trade-off because the batch logic needs to check the current buffer length before deciding to flush, and that check-then-act pattern doesn't map cleanly to a channel. The swap keeps the lock hold time under a microsecond, which is well below the threshold where contention would show up at 5,000 messages per second.

Detection runs post-flush, not inline

After the BatchBuffer returns the flushed readings, the consumer calls run_anomaly_evaluation. This function is the firewall between ingestion and detection. Here's what it looks like:

async fn run_anomaly_evaluation(
    pool: &PgPool,
    nats_client: &async_nats::Client,
    readings: &[ValidatedReading],
    notification_emitter: &NotificationEmitter,
    workflow_trigger: &WorkflowTrigger,
) {
    match evaluate_batch(pool, nats_client, readings, notification_emitter, workflow_trigger).await {
        Ok(alert_ids) => {
            if !alert_ids.is_empty() {
                info!(count = alert_ids.len(), "Anomaly evaluation created alerts");
            }
        }
        Err(e) => {
            warn!(error = %e, "Anomaly evaluation failed, continuing");
        }
    }
}

The return type is (), not Result. Errors are logged at warn level and swallowed. The consumer loop continues processing the next NATS message regardless of whether detection succeeded. A database timeout in the rule query, a serialization failure in the NATS publish, a network error calling the Notification Hub: none of these stop ingestion.

Deduplication before evaluation

The evaluate_batch function doesn't evaluate every reading in the batch. It extracts unique (tenant_id, device_id, metric) tuples, keeping only the latest value for each. A batch of 100 readings from 5 devices reporting 3 metrics each produces 15 unique tuples, not 100. The deduplication uses a HashSet<ReadingKey> and iterates the batch in reverse so the most recent reading for each tuple wins.

This was a performance decision. Fetching alert rules from PostgreSQL and computing rolling deviation statistics from the readings_1m aggregate table costs a query per unique tuple. At 100 queries per batch, the evaluator would be the bottleneck. At 15 queries per batch, it completes in single-digit milliseconds.

Three condition types, one evaluation path

Each alert rule specifies a condition type: above_threshold, below_threshold, or deviation_from_mean. The first two are trivial comparisons. The third runs a SQL query against the readings_1m continuous aggregate, computing avg(avg_value) and stddev(avg_value) over the configured window_minutes. If fewer than 5 buckets exist in the window, the check skips entirely.

That minimum sample count of 5 was the result of a frustrating cold-start problem. When a new device starts reporting, the first few readings have no aggregate history. A threshold of 2 standard deviations from a mean computed from 2 data points fires on everything. The system was generating hundreds of false alerts during device onboarding. Setting the floor at 5 samples eliminated the noise without significantly delaying real anomaly detection. Five minutes of readings at one-per-second gives 5 one-minute aggregate buckets. That's the minimum window before deviation detection activates.

Cooldown prevents alert storms

Without cooldown, a sensor stuck at a high value generates an alert on every batch flush. At 500ms intervals, that's 120 alerts per minute for a single sensor. The check_cooldown function queries the alerts table for any alert with the same rule_id and device_id created within the last N minutes (default 15). If one exists, the alert is suppressed and logged at info level.

The cooldown query hits an index on (rule_id, device_id, created_at). I briefly considered an in-memory cooldown cache (DashMap with TTL, similar to the TenantResolver), but the database-backed approach won because cooldown state needs to survive process restarts. If the service crashes and restarts, a memory-based cooldown would fire duplicate alerts for every rule that was in cooldown before the crash.

Fire-and-forget for ecosystem calls

The Notification Hub and Workflow Engine integrations use the same pattern: serialize the payload, spawn a Tokio task, return immediately. The spawned task makes the HTTP call and logs the result. The caller never sees the response.

tokio::spawn(async move {
    let result = client.post(&url).header("X-API-Key", &api_key).json(&event).send().await;
    match result {
        Ok(resp) => { /* log success or failure status */ }
        Err(e) => { warn!(error = %e, "Failed to send event"); }
    }
});

The Workflow Engine call adds HMAC-SHA256 signing. The compute_signature method takes the secret and raw body bytes, produces a sha256=<hex> signature, and attaches it as X-Hub-Signature-256. If no secret is configured, the header is omitted entirely.

This is at-most-once delivery by design. If the Notification Hub is down, the alert event is lost. I chose this over at-least-once (which would require a persistent outbox or retry queue) because the alerts are persisted in the database regardless. The Notification Hub is a convenience layer for pushing alerts to email or Telegram. An operator who checks the API's /api/alerts endpoint will always see the full alert history, even if the Hub missed a notification.

What I'd redesign

The anomaly evaluator currently runs on the same Tokio runtime as the NATS consumer and the Axum HTTP server. At 5,000 readings per second this is fine. At 50,000, the rule evaluation queries would compete with API queries for the 20-connection database pool. I'd split the evaluator into a separate process that consumes a NATS subject of "batches flushed" events, running its own connection pool. The ingestion binary would publish a summary event after each successful flush instead of calling the evaluator directly.

The other weak point is the fire-and-forget pattern. At scale, lost notifications become a support problem. The fix is an outbox table: persist the notification payload alongside the alert, then have a background worker poll the outbox and deliver with retries. The outbox adds latency and complexity, but it makes notification delivery observable. Right now, the only way to know a notification was lost is to notice a gap in the Notification Hub's event log. That's not good enough for a production system where the person on call needs to trust that critical alerts reach them.

#rust#anomaly-detection#async#failure-isolation#tokio

The architecture behind this essay for Sensor Telemetry Engine

Act II — Blueprint
Act I — Foundry

Get Notified

New system breakdown? You'll know first.