ADR-003: Market Data Ingestion Pipeline
Metadata
- Date: 2025-10-20
- Status: Accepted
- Tags: ingestion, websocket, rest, backfill, real-time
- Related: ADR-002 (Market Data Storage), packages/market-data
Context
EdgeHog needs to ingest OHLCV data from multiple crypto exchanges with three conflicting requirements:
- Real-time freshness for live trading (WebSocket feeds)
- Gap-free completeness for accurate backtesting (REST API backfill)
- Data consistency when the same candle arrives from different sources
Without clear precedence rules, we risk data corruption, gaps in historical data, or using stale prices for trading decisions.
Decision
Implement a multi-source ingestion pipeline with deterministic conflict resolution:
1. Time Semantics (Normative Rule)
All incoming data MUST be normalized to candle open timestamps in UTC.
- `time` column: always represents the start of the candle (open time), never the close time
- All exchanges (Bybit, Binance, etc.) MUST be normalized to this convention before insertion
- Time alignment: round down to the nearest minute (e.g., `2024-01-01T09:30:45.123Z` → `2024-01-01T09:30:00.000Z`)
- Timezone: always UTC (stored as `TIMESTAMPTZ` in PostgreSQL)
Rationale: Exchange APIs have inconsistent timestamp semantics (some report open time, others close time). Normalizing to open time provides deterministic candle alignment and simplifies join operations across timeframes.
Helper utility: Use normalizeToOpenTime() helper from @edgehog/market-data/ingestion to convert exchange-specific timestamps to candle open time.
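To illustrate the rounding rule, a minimal open-time floor might look like the sketch below. This is an illustration only, not the package's actual implementation; the canonical helper is `normalizeToOpenTime()` in `@edgehog/market-data/ingestion`, and the name `floorToOpenTime` here is hypothetical.

```typescript
// Illustrative sketch of the open-time normalization rule.
// Epoch milliseconds are already UTC-based, so flooring to the timeframe
// boundary yields the candle open time regardless of local timezone.
function floorToOpenTime(ts: Date, timeframeMs: number = 60_000): Date {
  return new Date(Math.floor(ts.getTime() / timeframeMs) * timeframeMs);
}

// floorToOpenTime(new Date('2024-01-01T09:30:45.123Z'))
//   -> 2024-01-01T09:30:00.000Z (the 1m candle's open time)
```

The same function generalizes to other timeframes by passing a different `timeframeMs` (e.g., 300_000 for 5m candles).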
2. Unified Schema with Provenance Tracking
- Primary key: `(instrument_id, time)` per timeframe table
- Timeframe encoding: by table name (`market.ohlcv_1m`, `ohlcv_5m`, `ohlcv_15m`, …)
- Provenance columns: `data_source_id`, `received_at`, `precedence`
- Schema: see `packages/market-data/schema/04_ohlcv.sql`
3. Source Priority Hierarchy (Single Source of Truth)
Precedence is stored in the `market.data_sources.precedence` column (added in ADR-003 revision):
- WebSocket: `precedence=1` (highest priority, real-time)
- REST API: `precedence=2` (medium priority, corrections)
- Backfill: `precedence=3` (lowest priority, historical gaps only)
- CSV Import: `precedence=4` (manual data imports)
- Manual: `precedence=5` (operator corrections)
Design rationale: Callers pass only data_source_code; the upsert function derives precedence from the lookup table. This prevents mismatched code/precedence pairs and provides a single source of truth for the data quality hierarchy.
4. Data Source Codes (Immutable API Contract)
- Data sources are seeded in `market.data_sources` with stable codes
- Codes MUST be lowercase `[a-z0-9_]` (schema-enforced)
- Ingestion code MUST reference by `code`, NEVER by `id`
- See `packages/market-data/schema/03_data_sources.sql` for seeded values

Standard codes:
- `websocket` — real-time exchange feeds
- `rest_api` — periodic API polling
- `backfill` — historical gap filling
- `csv_import` — bulk imports
- `manual` — operator corrections
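On the TypeScript side, these codes might be mirrored as a string enum along the following lines. This is a sketch only; the authoritative definition lives in `packages/market-data/src/ingestion/types.ts` and may differ in shape.

```typescript
// Sketch: mirrors the seeded codes in market.data_sources.
// Member names and values here are assumptions, not the package's actual enum.
enum DataSource {
  WebSocket = 'websocket',  // real-time exchange feeds
  RestApi = 'rest_api',     // periodic API polling
  Backfill = 'backfill',    // historical gap filling
  CsvImport = 'csv_import', // bulk imports
  Manual = 'manual',        // operator corrections
}
```

Using an enum keeps callers on the stable `code` contract (never numeric `id`s), so the database lookup remains the single source of truth for precedence.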
5. Normative Upsert Contract
All ingestion MUST use the canonical merge policy via market.ohlcv_1m_upsert():
```sql
CREATE OR REPLACE FUNCTION market.ohlcv_1m_upsert(
  p_time TIMESTAMPTZ,
  p_instrument_id INTEGER,
  p_open NUMERIC, p_high NUMERIC, p_low NUMERIC, p_close NUMERIC, p_volume NUMERIC,
  p_data_source_code TEXT,
  p_trades INTEGER DEFAULT NULL,
  p_metadata JSONB DEFAULT '{}'::jsonb
) RETURNS VOID AS $$
DECLARE
  v_data_source_id SMALLINT;
  v_precedence SMALLINT;
BEGIN
  -- Resolve data source by code and fetch its precedence (single source of truth)
  SELECT id, precedence INTO v_data_source_id, v_precedence
  FROM market.data_sources
  WHERE code = p_data_source_code;

  IF v_data_source_id IS NULL THEN
    RAISE EXCEPTION 'Unknown data source code: %', p_data_source_code;
  END IF;

  -- Deterministic merge with precedence-based conflict resolution
  INSERT INTO market.ohlcv_1m (
    time, instrument_id, open, high, low, close, volume,
    data_source_id, received_at, precedence, trades, metadata
  ) VALUES (
    p_time, p_instrument_id, p_open, p_high, p_low, p_close, p_volume,
    v_data_source_id, now(), v_precedence, p_trades, p_metadata
  )
  ON CONFLICT (instrument_id, time) DO UPDATE SET
    high = GREATEST(EXCLUDED.high, ohlcv_1m.high),
    low = LEAST(EXCLUDED.low, ohlcv_1m.low),
    volume = GREATEST(EXCLUDED.volume, ohlcv_1m.volume),
    close = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                 THEN EXCLUDED.close ELSE ohlcv_1m.close END,
    precedence = LEAST(EXCLUDED.precedence, ohlcv_1m.precedence),
    received_at = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                       THEN EXCLUDED.received_at ELSE ohlcv_1m.received_at END,
    data_source_id = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                          THEN EXCLUDED.data_source_id ELSE ohlcv_1m.data_source_id END,
    trades = COALESCE(EXCLUDED.trades, ohlcv_1m.trades),
    metadata = ohlcv_1m.metadata || EXCLUDED.metadata;
END;
$$ LANGUAGE plpgsql;
```

Merge semantics:
- `open` → never updated (first-writer-wins; the candle open price is immutable)
- `high` → maximum across all sources
- `low` → minimum across all sources
- `volume` → maximum (exchange feeds may report partial volumes)
- `close` → from the highest-precedence source (lowest precedence value)
- `precedence`, `data_source_id`, `received_at` → from the highest-precedence source
- `trades` → `COALESCE(incoming, existing)`: a non-NULL incoming value overwrites, a NULL incoming value preserves the existing count
- `metadata` → JSON merge (new keys append; key conflicts prefer incoming)
- Tie-break for equal precedence: keep existing values (first-writer-wins)
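The merge semantics can be modeled as a pure function, which makes them easy to unit test in isolation. The sketch below is illustrative only: the canonical logic is the SQL in `market.ohlcv_1m_upsert()`, and the `mergeCandle` name and `MergeRow` shape are assumptions for this example.

```typescript
// Illustrative model of the merge policy. Lower precedence value = higher-priority source.
interface MergeRow {
  open: number; high: number; low: number; close: number;
  volume: number; precedence: number;
}

function mergeCandle(existing: MergeRow, incoming: MergeRow): MergeRow {
  // Strict '<' means equal precedence keeps existing values (first-writer-wins)
  const incomingWins = incoming.precedence < existing.precedence;
  return {
    open: existing.open,                                // immutable after first write
    high: Math.max(existing.high, incoming.high),       // max across sources
    low: Math.min(existing.low, incoming.low),          // min across sources
    volume: Math.max(existing.volume, incoming.volume), // monotone max
    close: incomingWins ? incoming.close : existing.close,
    precedence: Math.min(existing.precedence, incoming.precedence),
  };
}
```

For example, merging a REST row (precedence 2) with a later WebSocket row (precedence 1) keeps the original `open`, widens `high`/`low`, takes the larger `volume`, and adopts the WebSocket `close`.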
Volume Semantics: Monotone Max Design
Decision: Volume uses GREATEST(EXCLUDED.volume, ohlcv_1m.volume) — monotone non-decreasing across merges.
Rationale:
- Consistency over source precedence: unlike `close` (which respects source precedence), volume prioritizes accuracy through aggregation
- Late volume corrections: exchanges frequently report partial volumes in real-time WebSocket feeds, then publish corrected totals via REST API minutes later
- Undercount prevention: Taking the maximum ensures we capture the highest reported volume, avoiding systematic undercounts in volume-based strategies (VWAP, volume profile, liquidity analysis)
- Multi-source reconciliation: When the same candle arrives from multiple sources (e.g., WebSocket + REST backfill), we want the union of reported activity, not the precedence-based choice
Trade-off: This design cannot reflect downward volume corrections. If an exchange publishes an inflated volume (e.g., 5000) and later corrects it downward (e.g., 4800), our stored value remains 5000. In practice:
- Rare in production: downward volume corrections are uncommon (<0.01% of candles based on Binance/Bybit data)
- Detection possible: monitoring can flag `received_at` timestamps where lower volumes arrive after higher ones
- Manual override available: operators can decompress chunks and apply corrections if critical for backtesting
Alternative considered: Use source precedence (CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence THEN EXCLUDED.volume ELSE ohlcv_1m.volume END) like we do for close. Rejected because:
- Volume is an accumulator (sum of trades), not a snapshot (price at close)
- WebSocket feeds often report running totals that increment throughout the candle
- REST APIs provide finalized totals after the candle closes
- Taking max across sources better reflects the "ground truth" of total trading activity
Consistency note: This differs from close semantics (precedence-based) by design. For price data, we trust higher-precedence sources absolutely. For volume data, we assume all sources are observing the same underlying activity and take the maximum observation.
6. Mutable Window vs. Compression
Mutable window (72 hours):
- Data within last 72h accepts updates via upsert
- Covers 99% of exchange corrections (late trades, price adjustments)
- Ingestion layer enforces this policy
Compression (7 days):
- TimescaleDB compresses chunks older than 7 days
- Compressed chunks CAN be updated but require decompression (slow)
- See `packages/market-data/schema/07_compression_policies.sql` for monitoring queries
Operator intervention for old data:
```sql
-- Decompress chunk for manual correction (rare)
SELECT decompress_chunk('_timescaledb_internal._hyper_1_42_chunk');

-- Apply correction
UPDATE market.ohlcv_1m SET close = 50123.45
WHERE instrument_id = 123 AND time = '2024-01-01 14:30:00+00';

-- Recompress (optional; the policy will do this automatically)
SELECT compress_chunk('_timescaledb_internal._hyper_1_42_chunk');
```

7. Connection Management
- WebSocket: Single persistent connection per exchange
- Reconnection: Exponential backoff (1s → 2s → 4s → … → 60s max)
- REST fallback: Activated if WebSocket down for 30s+
- Gap detection: Every 5 minutes, triggers automatic backfill
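The reconnection schedule above can be expressed as a small delay function. A sketch, assuming plain exponential doubling with a hard cap (the function name is illustrative, not part of the package API):

```typescript
// Documented backoff schedule: 1s -> 2s -> 4s -> ... capped at 60s.
function backoffDelayMs(attempt: number, baseMs = 1_000, capMs = 60_000): number {
  // attempt is 0-based: the first reconnect waits baseMs
  return Math.min(baseMs * 2 ** attempt, capMs);
}

// attempt 0 -> 1000ms, 1 -> 2000ms, 5 -> 32000ms, 6+ -> 60000ms (capped)
```

In production this is often combined with jitter to avoid synchronized reconnect storms across instances; the ADR's schedule only mandates the base/cap values.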
Alternatives Considered
- WebSocket-only: Missing historical data, no gap recovery
- REST-only: Too slow for real-time trading (5-10s latency)
- Separate tables per source: Complex queries, storage overhead
Consequences
Benefits
- ✓ Deterministic data: Same result regardless of arrival order
- ✓ Self-healing: Automatic gap detection and backfill
- ✓ Audit trail: Full provenance for every data point
- ✓ Exchange-agnostic: Works with any exchange API
Risks
- ⚠ Complexity: more columns and merge logic
- ⚠ Storage overhead: ~20% more space for metadata columns
- ⚠ Decompression cost: fixing old data requires manual intervention
Mitigations
- Keep mutable window short (72h covers 99% of corrections)
- Create operator runbook for handling late corrections
- Monitor precedence distribution to detect degraded feeds
Operational Defaults
All ingestion pipelines use these defaults unless explicitly overridden:
| Parameter | Default | Rationale |
|---|---|---|
| Mutable window | 72 hours | Covers 99% of exchange corrections; enforced at ingestion layer |
| Compression age | 7 days | TimescaleDB policy; older data compressible but still updatable |
| Precedence hierarchy | WS=1, REST=2, Backfill=3 | Lower value = higher priority; deterministic conflict resolution |
| WebSocket reconnection | Exponential backoff: 1s → 2s → 4s → 60s max | Prevents thundering herd; fast recovery for transient failures |
| REST fallback trigger | WebSocket down for 30s+ | Ensures data freshness during connection instability |
| Gap detection interval | 5 minutes | Frequency of checking for missing candles; triggers automatic backfill |
| Backfill batch size | 1000 candles per request | Balances exchange API limits with ingestion throughput |
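Collected as a single configuration object, the defaults in the table might look like this. The field names are illustrative assumptions, not the package's actual config shape:

```typescript
// Hypothetical constants mirroring the operational defaults table.
const INGESTION_DEFAULTS = {
  mutableWindowMs: 72 * 60 * 60 * 1_000, // 72-hour mutable window
  compressionAgeDays: 7,                 // TimescaleDB compression policy age
  restFallbackAfterMs: 30_000,           // activate REST after 30s WebSocket outage
  gapCheckIntervalMs: 5 * 60 * 1_000,    // gap detection every 5 minutes
  backfillBatchSize: 1_000,              // candles per backfill request
} as const;
```

Keeping these in one place makes "unless explicitly overridden" auditable: any pipeline that diverges from the defaults must name the parameter it overrides.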
Acceptance Criteria
Ingestion pipeline MUST satisfy these pass/fail checks:
Schema & Function
- [ ] `market.ohlcv_1m_upsert()` function exists and implements the merge policy exactly as specified
- [ ] Schema includes `data_source_id`, `received_at`, and `precedence` columns
- [ ] `market.data_sources` table seeded with standard codes (`websocket`, `rest_api`, `backfill`, etc.)
Late Data Handling
- [ ] PASS: Data within the 72h window updates successfully via upsert
- [ ] PASS: Data >72h but <7d (uncompressed) updates successfully
- [ ] WARN: Data >7d (compressed) requires manual decompression; logged with error code `COMPRESSED_CHUNK_UPDATE`
- [ ] FAIL: Ingestion rejects future-dated candles (>5min ahead of wall clock)
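These time-window checks can be sketched as a classifier. The thresholds come from the criteria above; the function name and verdict labels are illustrative, not the pipeline's actual API:

```typescript
type CandleVerdict = 'PASS' | 'WARN_COMPRESSED_CHUNK' | 'FAIL_FUTURE_DATED';

// Sketch: classify an incoming candle's open time against the documented windows.
function classifyCandleTime(candleTime: Date, now: Date = new Date()): CandleVerdict {
  const ageMs = now.getTime() - candleTime.getTime();
  if (ageMs < -5 * 60_000) return 'FAIL_FUTURE_DATED';               // >5min ahead of wall clock
  if (ageMs > 7 * 24 * 60 * 60_000) return 'WARN_COMPRESSED_CHUNK';  // older than compression age
  return 'PASS';                                                     // updatable via upsert
}
```

Note this sketch collapses the two PASS rows (both <7d are upsert-updatable); distinguishing the 72h mutable window from the 72h-to-7d band would add one more branch in the application layer.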
Conflict Resolution
- [ ] PASS: WebSocket data (precedence=1) overwrites REST data (precedence=2) for same candle
- [ ] PASS: REST data does NOT overwrite WebSocket data (precedence check enforced)
- [ ] PASS: `high` = max, `low` = min, `volume` = max across all sources
- [ ] PASS: `close` and `received_at` reflect the highest-precedence source (lowest precedence value)
Connection Resilience
- [ ] WebSocket reconnection uses exponential backoff (1s → 60s max)
- [ ] REST fallback activates within 30s of WebSocket failure
- [ ] Gap detection runs every 5min; triggers backfill for missing candles
Monitoring
- [ ] Dashboard displays: ingestion lag, gap rate, precedence distribution
- [ ] Alerts fire for: >1h ingestion lag, >5% gap rate, WebSocket downtime >5min
- [ ] Views implemented in `packages/market-data/schema/11_ingestion_monitoring.sql`
- [ ] Optional materialized views in `packages/market-data/schema/12_optional_materialized_views.sql`
Monitoring queries:
```sql
-- Ingestion lag (alert if data_lag > 1h)
SELECT * FROM market.ingestion_lag_metrics WHERE data_lag > INTERVAL '1 hour';

-- Gap rate (alert if gap_rate_pct > 5%)
SELECT * FROM market.ingestion_gap_metrics WHERE gap_rate_pct > 5;

-- WebSocket health (alert if downtime_duration IS NOT NULL)
SELECT * FROM market.websocket_health;
```

Implementation Layers
This ADR defines a two-layer hybrid architecture for multi-client reuse:
Layer 1: Database Contract (SQL)
market.ohlcv_1m_upsert() — Minimal, immutable merge logic
- Location: `packages/market-data/schema/08_upsert_function.sql`
- Scope: core merge semantics ONLY (high=max, low=min, volume=max, close=precedence-based)
- No validation: no window checks, no timestamp validation (application responsibility)
- Rationale:
  - Single source of truth for the merge policy (prevents drift across clients)
  - Works from any language (TypeScript, Python, CLI, SQL scripts)
  - Preserves composability (allows optional bypass for historical fixes)
Layer 2: Ingestion Policies (TypeScript)
IngestionPipeline class — Business logic and validation
- Location: `packages/market-data/src/ingestion/pipeline.ts`
- Scope: mutable window enforcement, future-date rejection, error handling, batch operations
- Rationale:
  - Better error messages (structured `IngestionResult` with error codes)
  - Easier testing (mockable `DatabaseAdapter` interface)
  - Flexible policies (configurable `mutableWindowMs`, `strictMode`)
  - Efficient batching (SQL `UNNEST` for multi-row inserts)
All TypeScript ingestion code MUST use IngestionPipeline (which internally calls market.ohlcv_1m_upsert()).
Non-TypeScript clients (Python, CLI) MAY call market.ohlcv_1m_upsert() directly but SHOULD implement equivalent validation.
Usage Examples
TypeScript (recommended):

```typescript
import { IngestionPipeline, DataSource } from '@edgehog/market-data/ingestion';

const pipeline = new IngestionPipeline({ db });

// Single candle (precedence derived from DataSource.WebSocket)
const result = await pipeline.ingest({
  time: new Date('2024-01-01T09:30:00Z'),
  instrumentId: 123,
  open: 100, high: 102.5, low: 99.5, close: 101, volume: 5000,
}, DataSource.WebSocket);
// result: { success: true }

// Batch (efficient for backfill)
await pipeline.ingestBatch([
  { candle: {...}, sourceCode: DataSource.RestApi },
  { candle: {...}, sourceCode: DataSource.RestApi },
]);
```

SQL (direct access):
```sql
-- Single candle (precedence derived from 'websocket' data source)
SELECT market.ohlcv_1m_upsert(
  '2024-01-01 09:30:00+00'::timestamptz, 123,
  100.0, 102.5, 99.5, 101.0, 5000.0,
  'websocket', NULL, '{}'::jsonb
);

-- Batch via UNNEST
SELECT market.ohlcv_1m_upsert(time, instrument_id, open, high, low, close, volume, source_code, trades, metadata::jsonb)
FROM UNNEST(
  ARRAY['2024-01-01 09:30:00+00'::timestamptz, '2024-01-01 09:31:00+00'::timestamptz],
  ARRAY[123, 123],
  ARRAY[100.0, 101.0], ARRAY[102.5, 103.0], ARRAY[99.5, 99.0], ARRAY[101.0, 102.0], ARRAY[5000.0, 5100.0],
  ARRAY['websocket', 'websocket'], ARRAY[NULL::int, NULL::int], ARRAY['{}'::text, '{}'::text]
) AS t(time, instrument_id, open, high, low, close, volume, source_code, trades, metadata);
```

References
Implementation Files
Schema & Functions:
- `packages/market-data/schema/03_data_sources.sql` — seeded data source codes and constraints
- `packages/market-data/schema/04_ohlcv.sql` — OHLCV table definition, inline upsert examples (lines 191-208)
- `packages/market-data/schema/05_continuous_aggregates.sql` — continuous aggregates for 5m, 15m, 1h, 4h, 1d timeframes (uses schema-qualified `timescaledb_toolkit.first`/`last` for defensive coding)
- `packages/market-data/schema/07_compression_policies.sql` — compression config, decompression commands (line 148)
- `packages/market-data/schema/08_upsert_function.sql` — canonical SQL upsert function
- `packages/market-data/schema/11_ingestion_monitoring.sql` — monitoring views (lag, gaps, precedence)
- `packages/market-data/schema/12_optional_materialized_views.sql` — optional materialized views for high-volume deployments
TypeScript Implementation:
- `packages/market-data/src/ingestion/pipeline.ts` — TypeScript ingestion wrapper
- `packages/market-data/src/ingestion/pipeline.test.ts` — integration tests, including the 72h mutable window boundary test
- `packages/market-data/src/ingestion/types.ts` — TypeScript type definitions