Data Sources¶
SignalFlow provides async clients and loaders for downloading historical OHLCV data from cryptocurrency exchanges.
Architecture Overview¶
The data module follows a clear separation of concerns:
flowchart LR
A[Exchange API] --> B[RawDataSource]
B --> C[RawDataLoader]
C --> D[RawDataStore]
D --> E[RawData]
style A fill:#2563eb,stroke:#3b82f6,stroke-width:2px,color:#fff
style B fill:#ea580c,stroke:#f97316,stroke-width:2px,color:#fff
style C fill:#16a34a,stroke:#22c55e,stroke-width:2px,color:#fff
style D fill:#dc2626,stroke:#ef4444,stroke-width:2px,color:#fff
style E fill:#7c3aed,stroke:#8b5cf6,stroke-width:2px,color:#fff
| Component | Responsibility |
|---|---|
| `RawDataSource` | API client (fetch klines, handle rate limits) |
| `RawDataLoader` | Orchestrates download/sync to storage |
| `RawDataStore` | Persists data (DuckDB, SQLite, PostgreSQL) |
| `RawData` | Immutable container for strategy consumption |
Built-in Exchange Sources¶
Binance¶
from signalflow.data.source import BinanceClient, BinanceSpotLoader
from signalflow.data.raw_store import DuckDbRawStore
from datetime import datetime
from pathlib import Path
# Direct API client usage
async with BinanceClient() as client:
klines = await client.get_klines("BTCUSDT", "1h")
# With date range
klines = await client.get_klines_range(
pair="BTCUSDT",
timeframe="1m",
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 31),
)
# Using loader for automated sync
store = DuckDbRawStore(db_path=Path("data/spot.duckdb"))
loader = BinanceSpotLoader(store_path=Path("data/spot.duckdb"))
await loader.sync(
pairs=["BTCUSDT", "ETHUSDT"],
days=30,
fill_gaps=True,
)
Available loaders:
| Loader | Market Type | Base URL |
|---|---|---|
| `BinanceSpotLoader` | Spot | api.binance.com |
| `BinanceFuturesUsdtLoader` | USDT-M Futures | fapi.binance.com |
| `BinanceFuturesCoinLoader` | COIN-M Futures | dapi.binance.com |
Bybit¶
from signalflow.data.source import BybitClient, BybitSpotLoader
async with BybitClient() as client:
klines = await client.get_klines("BTCUSDT", timeframe="1h")
klines = await client.get_klines_range(
pair="BTCUSDT",
category="spot", # or "linear" for futures
timeframe="1m",
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 31),
)
| Loader | Category | Symbols |
|---|---|---|
| `BybitSpotLoader` | spot | BTCUSDT, ETHUSDT |
| `BybitFuturesLoader` | linear | BTCUSDT, ETHUSDT |
| `BybitFuturesInverseLoader` | inverse | BTCUSD, ETHUSD |
OKX¶
from signalflow.data.source import OkxClient, OkxSpotLoader
async with OkxClient() as client:
# OKX uses "BTC-USDT" format
klines = await client.get_klines("BTC-USDT", timeframe="1h")
| Loader | Instrument Suffix |
|---|---|
| `OkxSpotLoader` | (none) |
| `OkxFuturesLoader` | -SWAP |
Deribit¶
from signalflow.data.source import DeribitClient, DeribitFuturesLoader
async with DeribitClient() as client:
# Deribit uses BTC-PERPETUAL format
klines = await client.get_klines("BTC-PERPETUAL", timeframe="1h")
# Get available instruments
pairs = await client.get_pairs(currency="BTC", kind="future")
| Loader | Instruments |
|---|---|
| `DeribitFuturesLoader` | BTC-PERPETUAL, ETH-PERPETUAL |
Kraken¶
from signalflow.data.source import KrakenClient, KrakenSpotLoader
async with KrakenClient() as client:
# Kraken spot uses XXBTZUSD format (auto-converted)
klines = await client.get_spot_klines("XXBTZUSD", timeframe="1h")
# Kraken futures uses pi_xbtusd format
klines = await client.get_futures_klines("PI_XBTUSD", timeframe="1h")
| Loader | Market Type | Symbols |
|---|---|---|
| `KrakenSpotLoader` | Spot | XXBTZUSD, XETHZUSD |
| `KrakenFuturesLoader` | Futures | PI_XBTUSD, PI_ETHUSD |
Kraken timestamps
Kraken uses seconds for timestamps (not milliseconds like most exchanges).
Hyperliquid¶
from signalflow.data.source import HyperliquidClient, HyperliquidFuturesLoader
async with HyperliquidClient() as client:
# Hyperliquid uses simple coin names: BTC, ETH
klines = await client.get_klines("BTC", timeframe="1h")
# Get all available coins
coins = await client.get_pairs()
| Loader | Market Type |
|---|---|
| `HyperliquidFuturesLoader` | Perpetuals (DEX) |
Hyperliquid limits
Maximum 5000 historical candles available.
WhiteBIT¶
from signalflow.data.source import WhitebitClient, WhitebitSpotLoader, WhitebitFuturesLoader
async with WhitebitClient() as client:
# WhiteBIT uses BTC_USDT format
klines = await client.get_klines("BTC_USDT", timeframe="1h")
# Futures use BTC_PERP format
futures_pairs = await client.get_futures_pairs()
| Loader | Market Type | Symbols |
|---|---|---|
| `WhitebitSpotLoader` | Spot | BTC_USDT, ETH_USDT |
| `WhitebitFuturesLoader` | Futures | BTC_PERP, ETH_PERP |
WhiteBIT timestamps
WhiteBIT uses seconds for timestamps (not milliseconds).
Virtual Data Provider¶
For testing without API access:
from signalflow.data.source import VirtualDataProvider, generate_ohlcv
from signalflow.data.raw_store import DuckDbRawStore
from pathlib import Path
store = DuckDbRawStore(db_path=Path("test.duckdb"))
# Generate synthetic data
provider = VirtualDataProvider(store=store, seed=42)
provider.download(pairs=["BTCUSDT", "ETHUSDT"], n_bars=10_000)
# Or generate in-memory
ohlcv_data = generate_ohlcv(n_bars=1000, seed=42)
Creating a Custom Source¶
Step 1: Implement RawDataSource¶
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import aiohttp
import signalflow as sf
from signalflow.data.source.base import RawDataSource
from signalflow.data.source._helpers import dt_to_ms_utc, ms_to_dt_utc_naive
@dataclass
@sf.data_source("myexchange")
class MyExchangeClient(RawDataSource):
    """Async client for custom exchange REST API.

    Must be used as an async context manager so the shared aiohttp
    session exists before any request is made.
    """

    # Root endpoint for all REST calls.
    base_url: str = "https://api.myexchange.com"
    # Retry budget for request helpers (not used by get_klines itself).
    max_retries: int = 3
    # Created lazily in __aenter__, torn down in __aexit__.
    _session: Optional[aiohttp.ClientSession] = field(default=None, repr=False)

    async def __aenter__(self):
        self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
            self._session = None

    async def get_klines(
        self,
        pair: str,
        timeframe: str = "1m",
        start_time: Optional[datetime] = None,
    ) -> list[dict]:
        """Fetch OHLCV data from the exchange.

        Args:
            pair: Exchange trading pair symbol, e.g. "BTCUSDT".
            timeframe: One of "1m", "5m", "15m", "1h", "4h", "1d".
            start_time: Optional start of the requested range.

        Returns:
            List of kline dicts in canonical SignalFlow format; the
            "timestamp" field follows the close-time convention.

        Raises:
            RuntimeError: If used outside the async context manager,
                or the API responds with a non-200 status.
            ValueError: If the timeframe is not supported.
        """
        if self._session is None:
            raise RuntimeError("Must be used as async context manager")

        # Map canonical timeframes to the exchange's minute-based intervals.
        interval_map = {"1m": "1", "5m": "5", "15m": "15", "1h": "60", "4h": "240", "1d": "1440"}
        interval = interval_map.get(timeframe)
        if interval is None:
            raise ValueError(f"Unsupported timeframe: {timeframe}")

        params = {"symbol": pair, "interval": interval}
        if start_time:
            params["startTime"] = dt_to_ms_utc(start_time)

        async with self._session.get(
            f"{self.base_url}/api/v1/klines",
            params=params,
        ) as resp:
            if resp.status != 200:
                raise RuntimeError(f"API error: {resp.status}")
            data = await resp.json()

        # Transform to canonical format (close time convention).
        # The candle duration is loop-invariant: compute it once, not per row.
        tf_ms = int(interval) * 60 * 1000
        klines = []
        for row in data:
            # Assuming response: [open_time_ms, o, h, l, c, volume]
            open_time_ms = int(row[0])
            close_time_ms = open_time_ms + tf_ms
            klines.append({
                "timestamp": ms_to_dt_utc_naive(close_time_ms),
                "open": float(row[1]),
                "high": float(row[2]),
                "low": float(row[3]),
                "close": float(row[4]),
                "volume": float(row[5]),
                "trades": 0,  # Not provided
            })
        return klines
Step 2: Implement RawDataLoader¶
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import signalflow as sf
from signalflow.data.source.base import RawDataLoader
from signalflow.data.raw_store import DuckDbSpotStore
@dataclass
@sf.data_loader("myexchange/spot")
class MyExchangeSpotLoader(RawDataLoader):
    """Loader for custom exchange spot data.

    Orchestrates downloads from MyExchangeClient into a DuckDB store.
    """

    # Destination store; defaults to a local DuckDB file.
    store: DuckDbSpotStore = field(
        default_factory=lambda: DuckDbSpotStore(db_path=Path("data/myexchange.duckdb"))
    )
    # Candle resolution used for all downloads.
    timeframe: str = "1m"

    async def get_pairs(self) -> list[str]:
        """Get available trading pairs."""
        async with MyExchangeClient() as client:
            # Implement pair fetching
            return ["BTCUSDT", "ETHUSDT"]

    async def download(
        self,
        pairs: list[str],
        days: Optional[int] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        fill_gaps: bool = True,
    ):
        """Download historical data for *pairs* into the store.

        Args:
            pairs: Trading pairs to download.
            days: Lookback window used when *start* is omitted
                (defaults to 7 days).
            start: Explicit range start; takes precedence over *days*.
            end: Explicit range end; defaults to "now" in naive UTC.
            fill_gaps: Accepted for interface compatibility; gap
                filling is not implemented in this example.
        """
        if end is None:
            # datetime.utcnow() is deprecated (Python 3.12+); build the
            # naive-UTC "now" explicitly so it matches the naive
            # timestamps produced by ms_to_dt_utc_naive().
            from datetime import timezone
            end = datetime.now(timezone.utc).replace(tzinfo=None)
        if start is None:
            start = end - timedelta(days=days or 7)

        async with MyExchangeClient() as client:
            for pair in pairs:
                klines = await client.get_klines(
                    pair=pair,
                    timeframe=self.timeframe,
                    start_time=start,
                )
                # The API is queried by start time only, so trim anything
                # past the requested end before persisting.
                klines = [k for k in klines if k["timestamp"] <= end]
                if klines:
                    self.store.insert_klines(pair, klines)
                    print(f"Downloaded {len(klines)} klines for {pair}")

    async def sync(
        self,
        pairs: list[str],
        update_interval_sec: int = 60,
    ):
        """Continuously sync latest data."""
        # Implementation...
        pass
Step 3: Register and Use¶
from signalflow.core.registry import default_registry
from signalflow.core.enums import SfComponentType
# Components are auto-registered via semantic decorators (e.g., @sf.data_source)
# Or register manually:
default_registry.register(
SfComponentType.RAW_DATA_SOURCE,
"myexchange",
MyExchangeClient,
)
default_registry.register(
SfComponentType.RAW_DATA_LOADER,
"myexchange/spot",
MyExchangeSpotLoader,
)
# Usage
loader = MyExchangeSpotLoader()
await loader.download(pairs=["BTCUSDT", "ETHUSDT"], days=30)
Shared Utilities¶
The _helpers module provides common datetime utilities:
from signalflow.data.source._helpers import (
TIMEFRAME_MS, # {"1m": 60_000, "1h": 3_600_000, ...}
dt_to_ms_utc, # datetime -> milliseconds
ms_to_dt_utc_naive, # milliseconds -> datetime (UTC naive)
ensure_utc_naive, # normalize timezone
# Seconds-based helpers (for Kraken, WhiteBIT)
dt_to_sec_utc, # datetime -> seconds
sec_to_dt_utc_naive, # seconds -> datetime (UTC naive)
)
# Convert datetime to milliseconds
ms = dt_to_ms_utc(datetime(2024, 1, 1)) # 1704067200000
# Convert back
dt = ms_to_dt_utc_naive(1704067200000) # datetime(2024, 1, 1, 0, 0)
# For seconds-based exchanges (Kraken, WhiteBIT)
sec = dt_to_sec_utc(datetime(2024, 1, 1)) # 1704067200
dt = sec_to_dt_utc_naive(1704067200) # datetime(2024, 1, 1, 0, 0)
# Normalize timezone
dt_naive = ensure_utc_naive(datetime(2024, 1, 1, tzinfo=timezone.utc))
Pair Normalization Helpers¶
Each exchange has pair normalization utilities:
from signalflow.data.source._helpers import (
# Kraken
normalize_kraken_spot_pair, # XXBTZUSD -> BTCUSD
normalize_kraken_futures_pair, # pi_xbtusd -> BTCUSD
# Deribit
normalize_deribit_pair, # BTC-PERPETUAL -> BTCUSD
to_deribit_instrument, # BTCUSD -> BTC-PERPETUAL
# Hyperliquid
normalize_hyperliquid_pair, # BTC -> BTCUSD
to_hyperliquid_coin, # BTCUSD -> BTC
# WhiteBIT
normalize_whitebit_pair, # BTC_USDT -> BTCUSDT
normalize_whitebit_futures_pair, # BTC_PERP -> BTCUSD
)
Best Practices¶
Rate Limiting¶
import asyncio
@dataclass
class MyClient(RawDataSource):
    """Client demonstrating simple request-rate throttling."""

    # Budget of API calls per minute.
    requests_per_minute: int = 60
    # Monotonic timestamp of the last request (0.0 = never requested).
    _last_request: float = 0

    async def _rate_limit(self):
        """Sleep just long enough to honor requests_per_minute."""
        # Use the event loop's monotonic clock: the original snippet
        # called time.time() without importing time, and wall-clock
        # time can jump (NTP), which breaks interval arithmetic.
        loop = asyncio.get_running_loop()
        elapsed = loop.time() - self._last_request
        min_interval = 60 / self.requests_per_minute
        if elapsed < min_interval:
            await asyncio.sleep(min_interval - elapsed)
        self._last_request = loop.time()

    async def get_klines(self, pair: str):
        await self._rate_limit()
        # ... make request
Retry Logic¶
async def get_klines(self, pair: str):
    """Fetch klines with exponential backoff on rate limits.

    Retries up to self.max_retries times; on exhaustion raises
    RuntimeError carrying the last observed failure.
    """
    last_error = None
    for attempt in range(self.max_retries):
        try:
            async with self._session.get(url, params=params) as resp:
                if resp.status == 429:  # Rate limited
                    # Record the failure so the final message is
                    # meaningful even when every attempt was
                    # rate-limited (the original raised
                    # "... attempts: None" in that case).
                    last_error = RuntimeError("HTTP 429 (rate limited)")
                    await asyncio.sleep(2 ** attempt)
                    continue
                if resp.status != 200:
                    # Non-retriable status: propagate immediately
                    # (not an aiohttp.ClientError, so not caught below).
                    raise RuntimeError(f"HTTP {resp.status}")
                return await resp.json()
        except aiohttp.ClientError as e:
            last_error = e
            # No point sleeping after the final attempt.
            if attempt < self.max_retries - 1:
                await asyncio.sleep(1)
    raise RuntimeError(f"Failed after {self.max_retries} attempts: {last_error}")
Timestamp Convention¶
All SignalFlow sources use close time as the canonical timestamp:
# Binance returns open time in k[0], close time in k[6]
close_ms = int(kline[6])
timestamp = ms_to_dt_utc_naive(close_ms)
# For sources returning open time, add timeframe duration
open_time = datetime(2024, 1, 1, 10, 0)
close_time = open_time + timedelta(minutes=1) # for 1m candle
API Reference¶
See the full API documentation:
- RawDataSource — Base class for data sources
- RawDataLoader — Base class for data loaders
- BinanceClient — Binance API client
- BybitClient — Bybit API client
- OkxClient — OKX API client
- DeribitClient — Deribit API client
- KrakenClient — Kraken API client
- HyperliquidClient — Hyperliquid API client
- WhitebitClient — WhiteBIT API client