ADR-003: Market Data Ingestion Pipeline

Metadata

  • Date: 2025-10-20
  • Status: Accepted
  • Tags: ingestion, websocket, rest, backfill, real-time
  • Related: ADR-002 (Market Data Storage), packages/market-data

Context

EdgeHog needs to ingest OHLCV data from multiple crypto exchanges with three conflicting requirements:

  1. Real-time freshness for live trading (WebSocket feeds)
  2. Gap-free completeness for accurate backtesting (REST API backfill)
  3. Data consistency when the same candle arrives from different sources

Without clear precedence rules, we risk data corruption, gaps in historical data, or using stale prices for trading decisions.

Decision

Implement a multi-source ingestion pipeline with deterministic conflict resolution:

1. Time Semantics (Normative Rule)

All incoming data MUST be normalized to candle open timestamps in UTC.

  • time column: Always represents the start of the candle (open time), never the close time
  • All exchanges (Bybit, Binance, etc.) MUST be normalized to this convention before insertion
  • Time alignment: Round down to nearest minute (e.g., 2024-01-01T09:30:45.123Z → 2024-01-01T09:30:00.000Z)
  • Timezone: Always UTC (stored as TIMESTAMPTZ in PostgreSQL)

Rationale: Exchange APIs have inconsistent timestamp semantics (some report open time, others close time). Normalizing to open time provides deterministic candle alignment and simplifies join operations across timeframes.

Helper utility: Use normalizeToOpenTime() from @edgehog/market-data/ingestion to convert exchange-specific timestamps to candle open time.
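A minimal sketch of that normalization rule (illustrative only; the actual normalizeToOpenTime() implementation may differ in signature and edge-case handling):

```typescript
// Illustrative sketch: floor a raw exchange timestamp down to the start of
// its candle interval, in UTC. The real normalizeToOpenTime() in
// @edgehog/market-data/ingestion may differ.
function normalizeToOpenTime(raw: Date, timeframeMs: number = 60_000): Date {
  // Date.getTime() is epoch-based (UTC), so flooring the epoch value aligns
  // to UTC candle boundaries regardless of the host timezone.
  return new Date(Math.floor(raw.getTime() / timeframeMs) * timeframeMs);
}

// normalizeToOpenTime(new Date('2024-01-01T09:30:45.123Z'))
//   → 2024-01-01T09:30:00.000Z
```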

2. Unified Schema with Provenance Tracking

  • Primary key: (instrument_id, time) per timeframe table
  • Timeframe encoding: By table name (market.ohlcv_1m, ohlcv_5m, ohlcv_15m, …)
  • Provenance columns: data_source_id, received_at, precedence
  • Schema: See packages/market-data/schema/04_ohlcv.sql

3. Source Priority Hierarchy (Single Source of Truth)

Precedence is stored in market.data_sources.precedence column (added in ADR-003 revision):

  • WebSocket: precedence=1 (highest priority, real-time)
  • REST API: precedence=2 (medium priority, corrections)
  • Backfill: precedence=3 (lowest priority, historical gaps only)
  • CSV Import: precedence=4 (manual data imports)
  • Manual: precedence=5 (operator corrections)

Design rationale: Callers pass only data_source_code; the upsert function derives precedence from the lookup table. This prevents mismatched code/precedence pairs and provides a single source of truth for the data quality hierarchy.
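The lookup-then-compare flow can be sketched in TypeScript (an illustrative in-memory mirror; the normative precedence values live in market.data_sources, not in application code):

```typescript
// Illustrative mirror of market.data_sources (code → precedence).
// The normative values are seeded in 03_data_sources.sql.
const PRECEDENCE: Record<string, number> = {
  websocket: 1, rest_api: 2, backfill: 3, csv_import: 4, manual: 5,
};

// Lower value wins. Unknown codes are rejected, mirroring the
// RAISE EXCEPTION branch of market.ohlcv_1m_upsert().
// Strict comparison: equal precedence keeps the existing row.
function incomingWins(incomingCode: string, existingPrecedence: number): boolean {
  const p = PRECEDENCE[incomingCode];
  if (p === undefined) throw new Error(`Unknown data source code: ${incomingCode}`);
  return p < existingPrecedence;
}
```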

4. Data Source Codes (Immutable API Contract)

  • Data sources are seeded in market.data_sources with stable codes
  • Codes MUST be lowercase [a-z0-9_] (schema-enforced)
  • Ingestion code MUST reference by code, NEVER by id
  • See packages/market-data/schema/03_data_sources.sql for seeded values

Standard codes:

  • websocket — real-time exchange feeds
  • rest_api — periodic API polling
  • backfill — historical gap filling
  • csv_import — bulk imports
  • manual — operator corrections
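A client-side check mirroring the schema-enforced code format might look like this (illustrative; the authoritative constraint is in 03_data_sources.sql):

```typescript
// Mirrors the schema constraint: codes must be lowercase [a-z0-9_].
const CODE_PATTERN = /^[a-z0-9_]+$/;

function isValidSourceCode(code: string): boolean {
  return CODE_PATTERN.test(code);
}
```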

5. Normative Upsert Contract

All ingestion MUST use the canonical merge policy via market.ohlcv_1m_upsert():

```sql
CREATE OR REPLACE FUNCTION market.ohlcv_1m_upsert(
  p_time TIMESTAMPTZ,
  p_instrument_id INTEGER,
  p_open NUMERIC, p_high NUMERIC, p_low NUMERIC, p_close NUMERIC, p_volume NUMERIC,
  p_data_source_code TEXT,
  p_trades INTEGER DEFAULT NULL,
  p_metadata JSONB DEFAULT '{}'::jsonb
) RETURNS VOID AS $$
DECLARE
  v_data_source_id SMALLINT;
  v_precedence SMALLINT;
BEGIN
  -- Resolve data source by code and fetch its precedence (single source of truth)
  SELECT id, precedence INTO v_data_source_id, v_precedence
  FROM market.data_sources
  WHERE code = p_data_source_code;

  IF v_data_source_id IS NULL THEN
    RAISE EXCEPTION 'Unknown data source code: %', p_data_source_code;
  END IF;

  -- Deterministic merge with precedence-based conflict resolution
  INSERT INTO market.ohlcv_1m (
    time, instrument_id, open, high, low, close, volume,
    data_source_id, received_at, precedence, trades, metadata
  ) VALUES (
    p_time, p_instrument_id, p_open, p_high, p_low, p_close, p_volume,
    v_data_source_id, now(), v_precedence, p_trades, p_metadata
  )
  ON CONFLICT (instrument_id, time) DO UPDATE SET
    high = GREATEST(EXCLUDED.high, ohlcv_1m.high),
    low = LEAST(EXCLUDED.low, ohlcv_1m.low),
    volume = GREATEST(EXCLUDED.volume, ohlcv_1m.volume),
    close = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                 THEN EXCLUDED.close ELSE ohlcv_1m.close END,
    precedence = LEAST(EXCLUDED.precedence, ohlcv_1m.precedence),
    received_at = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                       THEN EXCLUDED.received_at ELSE ohlcv_1m.received_at END,
    data_source_id = CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence
                          THEN EXCLUDED.data_source_id ELSE ohlcv_1m.data_source_id END,
    trades = COALESCE(EXCLUDED.trades, ohlcv_1m.trades),
    metadata = ohlcv_1m.metadata || EXCLUDED.metadata;
END;
$$ LANGUAGE plpgsql;
```

Merge semantics:

  • open → never updated (first-writer-wins, candle open price is immutable)
  • high → maximum across all sources
  • low → minimum across all sources
  • volume → maximum (exchange feeds may report partial volumes)
  • close → from highest-precedence source (lowest precedence value)
  • precedence, data_source_id, received_at → from highest-precedence source
  • trades → incoming value if non-NULL, otherwise keep existing (matches COALESCE(EXCLUDED.trades, ohlcv_1m.trades))
  • metadata → JSON merge (new keys append, conflicts prefer incoming)
  • Tie-break for equal precedence: Keep existing values (first-writer-wins)
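The merge semantics above can be mirrored as a pure function (an illustrative TypeScript translation for testing and documentation; the normative policy is the SQL function itself):

```typescript
interface CandleRow {
  open: number; high: number; low: number; close: number; volume: number;
  precedence: number;
  trades: number | null;
  metadata: Record<string, unknown>;
}

// Pure mirror of the ON CONFLICT branch of market.ohlcv_1m_upsert().
// `existing` is the stored row, `incoming` the new arrival.
function mergeCandle(existing: CandleRow, incoming: CandleRow): CandleRow {
  // Strict comparison: equal precedence keeps the existing values.
  const incomingWins = incoming.precedence < existing.precedence;
  return {
    open: existing.open,                                // first-writer-wins
    high: Math.max(existing.high, incoming.high),       // max across sources
    low: Math.min(existing.low, incoming.low),          // min across sources
    volume: Math.max(existing.volume, incoming.volume), // monotone max
    close: incomingWins ? incoming.close : existing.close,
    precedence: Math.min(existing.precedence, incoming.precedence),
    trades: incoming.trades ?? existing.trades,         // COALESCE(EXCLUDED, existing)
    metadata: { ...existing.metadata, ...incoming.metadata }, // incoming keys win
  };
}
```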

Volume Semantics: Monotone Max Design

Decision: Volume uses GREATEST(EXCLUDED.volume, ohlcv_1m.volume) — monotone non-decreasing across merges.

Rationale:

  • Consistency over source precedence: Unlike close (which respects source precedence), volume prioritizes accuracy through aggregation
  • Late volume corrections: Exchanges frequently report partial volumes in real-time WebSocket feeds, then publish corrected totals via REST API minutes later
  • Undercount prevention: Taking the maximum ensures we capture the highest reported volume, avoiding systematic undercounts in volume-based strategies (VWAP, volume profile, liquidity analysis)
  • Multi-source reconciliation: When the same candle arrives from multiple sources (e.g., WebSocket + REST backfill), we want the union of reported activity, not the precedence-based choice

Trade-off: This design cannot reflect downward volume corrections. If an exchange publishes an inflated volume (e.g., 5000) and later corrects it downward (e.g., 4800), our stored value remains 5000. In practice:

  • Rare in production: Downward volume corrections are uncommon (<0.01% of candles based on Binance/Bybit data)
  • Detection possible: Monitoring can flag received_at timestamps where lower volumes arrive after higher ones
  • Manual override available: Operators can decompress chunks and apply corrections if critical for backtesting

Alternative considered: Use source precedence (CASE WHEN EXCLUDED.precedence < ohlcv_1m.precedence THEN EXCLUDED.volume ELSE ohlcv_1m.volume END) like we do for close. Rejected because:

  • Volume is an accumulator (sum of trades), not a snapshot (price at close)
  • WebSocket feeds often report running totals that increment throughout the candle
  • REST APIs provide finalized totals after the candle closes
  • Taking max across sources better reflects the "ground truth" of total trading activity

Consistency note: This differs from close semantics (precedence-based) by design. For price data, we trust higher-precedence sources absolutely. For volume data, we assume all sources are observing the same underlying activity and take the maximum observation.

6. Mutable Window vs. Compression

Mutable window (72 hours):

  • Data within last 72h accepts updates via upsert
  • Covers 99% of exchange corrections (late trades, price adjustments)
  • Ingestion layer enforces this policy

Compression (7 days):

  • TimescaleDB compresses chunks older than 7 days
  • Compressed chunks CAN be updated but require decompression (slow)
  • See packages/market-data/schema/07_compression_policies.sql for monitoring queries

Operator intervention for old data:

```sql
-- Decompress chunk for manual correction (rare)
SELECT decompress_chunk('_timescaledb_internal._hyper_1_42_chunk');

-- Apply correction
UPDATE market.ohlcv_1m SET close = 50123.45
WHERE instrument_id = 123 AND time = '2024-01-01 14:30:00+00';

-- Recompress (optional, policy will do this automatically)
SELECT compress_chunk('_timescaledb_internal._hyper_1_42_chunk');
```

7. Connection Management

  • WebSocket: Single persistent connection per exchange
  • Reconnection: Exponential backoff (1s → 2s → 4s → … → 60s max)
  • REST fallback: Activated if WebSocket down for 30s+
  • Gap detection: Every 5 minutes, triggers automatic backfill
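The backoff schedule above can be sketched as follows (illustrative; the function name is hypothetical):

```typescript
// Exponential backoff for WebSocket reconnection: 1s, 2s, 4s, ... capped at 60s.
// `attempt` is the zero-based count of consecutive failed reconnects.
function reconnectDelayMs(attempt: number): number {
  const base = 1_000;  // first retry after 1s
  const cap = 60_000;  // never wait longer than 60s
  return Math.min(base * 2 ** attempt, cap);
}
```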

Alternatives Considered

  • WebSocket-only: Missing historical data, no gap recovery
  • REST-only: Too slow for real-time trading (5-10s latency)
  • Separate tables per source: Complex queries, storage overhead

Consequences

Benefits

  • ✓ Deterministic data: Same result regardless of arrival order
  • ✓ Self-healing: Automatic gap detection and backfill
  • ✓ Audit trail: Full provenance for every data point
  • ✓ Exchange-agnostic: Works with any exchange API

Risks

  • ⚠ Complexity: More columns and merge logic
  • ⚠ Storage overhead: ~20% more space for metadata columns
  • ⚠ Decompression cost: Fixing old data requires manual intervention

Mitigations

  • Keep mutable window short (72h covers 99% of corrections)
  • Create operator runbook for handling late corrections
  • Monitor precedence distribution to detect degraded feeds

Operational Defaults

All ingestion pipelines use these defaults unless explicitly overridden:

| Parameter | Default | Rationale |
| --- | --- | --- |
| Mutable window | 72 hours | Covers 99% of exchange corrections; enforced at ingestion layer |
| Compression age | 7 days | TimescaleDB policy; older data compressible but still updatable |
| Precedence hierarchy | WS=1, REST=2, Backfill=3 | Lower value = higher priority; deterministic conflict resolution |
| WebSocket reconnection | Exponential backoff: 1s → 2s → 4s → 60s max | Prevents thundering herd; fast recovery for transient failures |
| REST fallback trigger | WebSocket down for 30s+ | Ensures data freshness during connection instability |
| Gap detection interval | 5 minutes | Frequency of checking for missing candles; triggers automatic backfill |
| Backfill batch size | 1000 candles per request | Balances exchange API limits with ingestion throughput |
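For illustration, these defaults might be captured as a typed constant (names are hypothetical, not the actual config shape in packages/market-data):

```typescript
// Hypothetical typed capture of the operational defaults above.
interface IngestionDefaults {
  mutableWindowMs: number;
  compressionAgeMs: number;
  restFallbackAfterMs: number;
  gapDetectionIntervalMs: number;
  backfillBatchSize: number;
}

const DEFAULTS: IngestionDefaults = {
  mutableWindowMs: 72 * 3_600_000,       // 72 hours
  compressionAgeMs: 7 * 24 * 3_600_000,  // 7 days
  restFallbackAfterMs: 30_000,           // 30 seconds
  gapDetectionIntervalMs: 5 * 60_000,    // 5 minutes
  backfillBatchSize: 1_000,              // candles per request
};
```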

Acceptance Criteria

Ingestion pipeline MUST satisfy these pass/fail checks:

Schema & Function

  • [ ] market.ohlcv_1m_upsert() function exists and implements merge policy exactly as specified
  • [ ] Schema includes data_source_id, received_at, precedence columns
  • [ ] market.data_sources table seeded with standard codes (websocket, rest_api, backfill, etc.)

Late Data Handling

  • [ ] PASS: Data within 72h window updates successfully via upsert
  • [ ] PASS: Data >72h but <7d (uncompressed) updates successfully
  • [ ] WARN: Data >7d (compressed) requires manual decompression; logged with error code COMPRESSED_CHUNK_UPDATE
  • [ ] FAIL: Ingestion rejects future-dated candles (>5min ahead of wall clock)

Conflict Resolution

  • [ ] PASS: WebSocket data (precedence=1) overwrites REST data (precedence=2) for same candle
  • [ ] PASS: REST data does NOT overwrite WebSocket data (precedence check enforced)
  • [ ] PASS: high = max, low = min, volume = max across all sources
  • [ ] PASS: close and received_at reflect highest-precedence source (lowest precedence value)

Connection Resilience

  • [ ] WebSocket reconnection uses exponential backoff (1s → 60s max)
  • [ ] REST fallback activates within 30s of WebSocket failure
  • [ ] Gap detection runs every 5min; triggers backfill for missing candles
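The gap-detection step can be sketched as follows (illustrative; the real implementation presumably queries the database for missing candle open times):

```typescript
// Given sorted 1m candle open times, return the missing minute boundaries
// that a backfill job should request from the exchange REST API.
function findGaps(openTimes: Date[], stepMs: number = 60_000): Date[] {
  const gaps: Date[] = [];
  for (let i = 1; i < openTimes.length; i++) {
    let t = openTimes[i - 1].getTime() + stepMs;
    while (t < openTimes[i].getTime()) {
      gaps.push(new Date(t));
      t += stepMs;
    }
  }
  return gaps;
}
```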

Monitoring

  • [ ] Dashboard displays: ingestion lag, gap rate, precedence distribution
  • [ ] Alerts fire for: >1h ingestion lag, >5% gap rate, WebSocket downtime >5min
  • [ ] Views implemented in packages/market-data/schema/11_ingestion_monitoring.sql
  • [ ] Optional materialized views in packages/market-data/schema/12_optional_materialized_views.sql

Monitoring queries:

```sql
-- Ingestion lag (alert if data_lag > 1h)
SELECT * FROM market.ingestion_lag_metrics WHERE data_lag > INTERVAL '1 hour';

-- Gap rate (alert if gap_rate_pct > 5%)
SELECT * FROM market.ingestion_gap_metrics WHERE gap_rate_pct > 5;

-- WebSocket health (alert if downtime_duration IS NOT NULL)
SELECT * FROM market.websocket_health;
```

Implementation Layers

This ADR defines a two-layer hybrid architecture for multi-client reuse:

Layer 1: Database Contract (SQL)

market.ohlcv_1m_upsert() — Minimal, immutable merge logic

  • Location: packages/market-data/schema/08_upsert_function.sql
  • Scope: Core merge semantics ONLY (high=max, low=min, volume=max, close=precedence-based)
  • No validation: No window checks, no timestamps validation (application responsibility)
  • Rationale:
    • Single source of truth for merge policy (prevents drift across clients)
    • Works for any language (TypeScript, Python, CLI, SQL scripts)
    • Preserves composability (allows optional bypass for historical fixes)

Layer 2: Ingestion Policies (TypeScript)

IngestionPipeline class — Business logic and validation

  • Location: packages/market-data/src/ingestion/pipeline.ts
  • Scope: Mutable window enforcement, future-date rejection, error handling, batch operations
  • Rationale:
    • Better error messages (structured IngestionResult with error codes)
    • Easier testing (mockable DatabaseAdapter interface)
    • Flexible policies (configurable mutableWindowMs, strictMode)
    • Efficient batching (SQL UNNEST for multi-row inserts)

All TypeScript ingestion code MUST use IngestionPipeline (which internally calls market.ohlcv_1m_upsert()).

Non-TypeScript clients (Python, CLI) MAY call market.ohlcv_1m_upsert() directly but SHOULD implement equivalent validation.

Usage Examples

TypeScript (recommended):

```typescript
import { IngestionPipeline, DataSource } from '@edgehog/market-data/ingestion';

const pipeline = new IngestionPipeline({ db });

// Single candle (precedence derived from DataSource.WebSocket)
const result = await pipeline.ingest({
  time: new Date('2024-01-01T09:30:00Z'),
  instrumentId: 123,
  open: 100, high: 102.5, low: 99.5, close: 101, volume: 5000,
}, DataSource.WebSocket);
// result: { success: true }

// Batch (efficient for backfill)
await pipeline.ingestBatch([
  { candle: {...}, sourceCode: DataSource.RestApi },
  { candle: {...}, sourceCode: DataSource.RestApi },
]);
```

SQL (direct access):

```sql
-- Single candle (precedence derived from 'websocket' data source)
SELECT market.ohlcv_1m_upsert(
  '2024-01-01 09:30:00+00'::timestamptz, 123,
  100.0, 102.5, 99.5, 101.0, 5000.0,
  'websocket', NULL, '{}'::jsonb
);

-- Batch via UNNEST
SELECT market.ohlcv_1m_upsert(time, instrument_id, open, high, low, close, volume, source_code, trades, metadata::jsonb)
FROM UNNEST(
  ARRAY['2024-01-01 09:30:00+00'::timestamptz, '2024-01-01 09:31:00+00'::timestamptz],
  ARRAY[123, 123],
  ARRAY[100.0, 101.0], ARRAY[102.5, 103.0], ARRAY[99.5, 99.0], ARRAY[101.0, 102.0], ARRAY[5000.0, 5100.0],
  ARRAY['websocket', 'websocket'], ARRAY[NULL::int, NULL::int], ARRAY['{}'::text, '{}'::text]
) AS t(time, instrument_id, open, high, low, close, volume, source_code, trades, metadata);
```

References

Implementation Files

Schema & Functions:

  • packages/market-data/schema/03_data_sources.sql — seeded data source codes and constraints
  • packages/market-data/schema/04_ohlcv.sql — OHLCV table definition, inline upsert examples (lines 191-208)
  • packages/market-data/schema/05_continuous_aggregates.sql — continuous aggregates for 5m, 15m, 1h, 4h, 1d timeframes (uses schema-qualified timescaledb_toolkit.first/last for defensive coding)
  • packages/market-data/schema/07_compression_policies.sql — compression config, decompression commands (line 148)
  • packages/market-data/schema/08_upsert_function.sql — canonical SQL upsert function
  • packages/market-data/schema/11_ingestion_monitoring.sql — monitoring views (lag, gaps, precedence)
  • packages/market-data/schema/12_optional_materialized_views.sql — optional materialized views for high-volume deployments

TypeScript Implementation:

  • packages/market-data/src/ingestion/pipeline.ts — TypeScript ingestion wrapper
  • packages/market-data/src/ingestion/pipeline.test.ts — integration tests including 72h mutable window boundary test
  • packages/market-data/src/ingestion/types.ts — TypeScript type definitions

External Documentation