Data Ingestion
Building the Foundation of Observability
Why Data Ingestion Matters
Every monitoring system starts with a simple question: what are we measuring? The ingestion layer answers this by collecting raw metrics from multiple sources, normalizing them into a consistent format, and aggregating them into queryable time windows.
The ingestion layer defines your observability coverage. If you don't collect a metric, you can't detect anomalies in it. If you collect it wrong (misaligned timestamps, mixed units, missing tags), your detectors will produce garbage results.
The principle: Collect everything, normalize aggressively, aggregate thoughtfully.
Anatomy of a Metric
Every metric in the system follows a consistent structure:
| Component | Purpose | Example |
|---|---|---|
| Name | Unique identifier for the metric stream | `api_latency_api_search` |
| Source | Which collector produced it | `"api-latency"` |
| Unit | What the values represent | `"ms"`, `"percent"`, `"count"` |
| Tags | Dimensional metadata for filtering | `{ endpoint: "/api/search", host: "web-1" }` |
| Points | The time series data | `[{ timestamp: "2026-04-10T00:00:00Z", value: 45.2 }, ...]` |
```typescript
// Each point pairs an ISO-8601 timestamp with a numeric value
interface TimeSeriesPoint {
  timestamp: string;
  value: number;
}

interface Metric {
  name: string;
  source: string;
  unit: string;
  tags: Record<string, string>;
  points: TimeSeriesPoint[];
}
```

The Collector Pattern
Each data source has its own collector — a function that reads raw data and transforms it into one or more Metric objects. This pattern makes it trivial to add new data sources:
```typescript
// src/collectors/api-latency.ts
import { readFileSync } from "node:fs";

export function collectApiLatency(): Metric[] {
  const raw = JSON.parse(readFileSync("data/api-latency.json", "utf-8"));

  // Group raw samples by endpoint
  const byEndpoint = new Map<string, TimeSeriesPoint[]>();
  for (const point of raw) {
    const existing = byEndpoint.get(point.endpoint) ?? [];
    existing.push({ timestamp: point.timestamp, value: point.value });
    byEndpoint.set(point.endpoint, existing);
  }

  // Emit one Metric per endpoint
  return Array.from(byEndpoint.entries()).map(([endpoint, points]) => ({
    name: `api_latency_${endpoint}`,
    source: "api-latency",
    unit: "ms",
    tags: { endpoint },
    points,
  }));
}
```

Adding a new data source is one file plus one line in the index. No other code changes needed.
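The index itself isn't shown in this chapter. As a sketch, the registry could be as simple as an array of collector functions; the file path, import paths, and `collectAll` name below are assumptions, not the project's actual code:

```typescript
// src/collectors/index.ts (hypothetical sketch; file layout and names are assumptions)
import type { Metric } from "../types"; // wherever the shared Metric type lives
import { collectApiLatency } from "./api-latency";

// Registering a new source is one entry in this array
const collectors: Array<() => Metric[]> = [
  collectApiLatency,
];

export function collectAll(): Metric[] {
  // Run every registered collector and flatten the results into a single list
  return collectors.flatMap((collect) => collect());
}
```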
Timestamp Normalization
Different data sources report timestamps at different resolutions — some per-second, some per-minute, some per-hour. Before downstream analysis can work, timestamps must be aligned:
```typescript
function normalizeTimestamp(ts: string): string {
  const d = new Date(ts);
  d.setSeconds(0, 0); // Align to minute boundary
  return d.toISOString();
}
```

This is critical for joins. When the detector compares API latency with CPU usage, both metrics need to be on the same time axis.
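To illustrate why, a join keyed on the normalized timestamp might look like the sketch below; the helper name and return shape are mine, not the project's:

```typescript
// Hypothetical sketch: pair two series on the shared, minute-aligned time axis
function joinOnTimestamp(
  a: TimeSeriesPoint[],
  b: TimeSeriesPoint[]
): Array<{ timestamp: string; a: number; b: number }> {
  // Index series b by its normalized timestamps
  const bByTs = new Map<string, number>();
  for (const p of b) bByTs.set(normalizeTimestamp(p.timestamp), p.value);

  // Keep only timestamps that exist in both series
  return a.flatMap((p) => {
    const ts = normalizeTimestamp(p.timestamp);
    const match = bByTs.get(ts);
    return match === undefined ? [] : [{ timestamp: ts, a: p.value, b: match }];
  });
}
```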
Windowing
Raw metrics can be extremely noisy. A single slow database query in a 1-second sample looks catastrophic but means nothing at the hour level. Windowing trades temporal resolution for statistical stability.
```typescript
export function applyWindow(
  points: TimeSeriesPoint[],
  windowMinutes: number
): TimeSeriesPoint[] {
  // Group points into windows, compute average for each
}
```

The window size is the first tuning parameter you'll encounter: smaller windows preserve detail but keep the noise, while larger windows smooth the noise but can hide short-lived spikes.
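One way to fill in that body, as a minimal sketch assuming ISO-8601 timestamps, tumbling (non-overlapping) windows, and a simple average:

```typescript
export function applyWindow(
  points: TimeSeriesPoint[],
  windowMinutes: number
): TimeSeriesPoint[] {
  const windowMs = windowMinutes * 60_000;
  const groups = new Map<number, number[]>();

  for (const p of points) {
    // Key each point by the start of the window it falls into
    const start = Math.floor(Date.parse(p.timestamp) / windowMs) * windowMs;
    groups.set(start, [...(groups.get(start) ?? []), p.value]);
  }

  // One averaged point per window, stamped with the window's start time
  return Array.from(groups.entries()).map(([start, values]) => ({
    timestamp: new Date(start).toISOString(),
    value: values.reduce((sum, v) => sum + v, 0) / values.length,
  }));
}
```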
Production systems like Prometheus store multiple resolutions simultaneously — raw (15s), 5m, 1h, 1d — so you can zoom in or out without re-ingesting.
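With applyWindow in hand, keeping several pre-aggregated resolutions is just a matter of running it more than once over the same raw points; the labels and sizes below are illustrative:

```typescript
// rawPoints: TimeSeriesPoint[] from a collector (assumed to be in scope)
const rollups = {
  "5m": applyWindow(rawPoints, 5),
  "1h": applyWindow(rawPoints, 60),
  "1d": applyWindow(rawPoints, 1440),
};
```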
Synthetic Data Design
The project's data contains carefully injected anomalies at known timestamps:
| Day | Event | Metrics Affected |
|---|---|---|
| Day 7 | Brief error burst | order latency, error rate |
| Day 12 | Bad deployment (search) | search latency (3x spike), error rate |
| Day 20-23 | Gradual DB drift | checkout latency (creeping up) |
| Day 25 | Platform-wide incident | all latency, error rate, CPU, memory |
These known anomalies serve as ground truth for evaluating your detectors. A good detector catches all four; a great detector catches them with low false positives on the other 26 days.
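Because the anomaly windows come straight from the table above, they can be captured as a small constant that an evaluation script compares detections against; the shape and names below are just one possible encoding, not the project's actual code:

```typescript
// Hypothetical encoding of the injected anomalies as ground truth
interface GroundTruthEvent {
  startDay: number;
  endDay: number;
  event: string;
  affectedMetrics: string[];
}

const groundTruth: GroundTruthEvent[] = [
  { startDay: 7,  endDay: 7,  event: "Brief error burst",       affectedMetrics: ["order latency", "error rate"] },
  { startDay: 12, endDay: 12, event: "Bad deployment (search)", affectedMetrics: ["search latency", "error rate"] },
  { startDay: 20, endDay: 23, event: "Gradual DB drift",        affectedMetrics: ["checkout latency"] },
  { startDay: 25, endDay: 25, event: "Platform-wide incident",  affectedMetrics: ["latency", "error rate", "CPU", "memory"] },
];
```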
This is chapter 1 of AI Anomaly Detection.