Data Stores¶
SignalFlow provides multiple storage backends for persisting historical market data with support for different data types (spot, futures, perpetual).
Available Backends¶
| Backend | Use Case | Dependencies |
|---|---|---|
| `DuckDbRawStore` | High-performance local storage | `duckdb` |
| `SqliteRawStore` | Lightweight, zero-config | built-in |
| `InMemoryRawStore` | Testing and notebooks | none |
| `PgRawStore` | Production PostgreSQL | `psycopg2` (optional) |
Quick Start¶
DuckDB Store (Recommended)¶
from signalflow.data.raw_store import DuckDbRawStore
from datetime import datetime
from pathlib import Path
# Create store
store = DuckDbRawStore(
db_path=Path("data/spot.duckdb"),
data_type="spot", # or "futures", "perpetual"
)
# Insert data
klines = [
{"timestamp": datetime(2024, 1, 1, 0, 0), "open": 42000.0, "high": 42100.0,
"low": 41900.0, "close": 42050.0, "volume": 100.0, "trades": 500},
{"timestamp": datetime(2024, 1, 1, 0, 1), "open": 42050.0, "high": 42150.0,
"low": 41950.0, "close": 42100.0, "volume": 120.0, "trades": 600},
]
store.insert_klines("BTCUSDT", klines)
# Load single pair
df = store.load("BTCUSDT", hours=24)
# Load date range
df = store.load(
"BTCUSDT",
start=datetime(2024, 1, 1),
end=datetime(2024, 1, 31),
)
# Load multiple pairs
df = store.load_many(
pairs=["BTCUSDT", "ETHUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 1, 31),
)
# Cleanup
store.close()
SQLite Store¶
from signalflow.data.raw_store import SqliteRawStore
from pathlib import Path
store = SqliteRawStore(
db_path=Path("data/spot.sqlite"),
data_type="spot",
)
# Same API as DuckDB
store.insert_klines("BTCUSDT", klines)
df = store.load("BTCUSDT", hours=24)
store.close()
In-Memory Store¶
from signalflow.data.raw_store import InMemoryRawStore
store = InMemoryRawStore(data_type="spot")
store.insert_klines("BTCUSDT", klines)
df = store.load("BTCUSDT")
# Data is lost when store is closed
store.close()
Converting to RawData¶
The to_raw_data() method converts store data to an immutable RawData container:
from datetime import datetime
# Direct from store
raw_data = store.to_raw_data(
pairs=["BTCUSDT", "ETHUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
)
# Access data
spot_df = raw_data["spot"] # Polars DataFrame
print(raw_data.pairs) # ["BTCUSDT", "ETHUSDT"]
print(raw_data.datetime_start) # datetime(2024, 1, 1)
Custom Data Key¶
# Use custom key instead of data_type
raw_data = store.to_raw_data(
pairs=["BTCUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
data_key="binance_spot",
)
df = raw_data["binance_spot"]
RawDataFactory¶
For combining data from multiple stores:
from signalflow.data import RawDataFactory
from signalflow.data.raw_store import DuckDbRawStore
from pathlib import Path
from datetime import datetime
# Create stores
spot_store = DuckDbRawStore(db_path=Path("data/spot.duckdb"), data_type="spot")
futures_store = DuckDbRawStore(db_path=Path("data/futures.duckdb"), data_type="futures")
# Combine into single RawData
raw_data = RawDataFactory.from_stores(
stores=[spot_store, futures_store],
pairs=["BTCUSDT", "ETHUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
)
# Access both data types
spot_df = raw_data["spot"]
futures_df = raw_data["futures"]
Legacy Factory Methods¶
# Single DuckDB store
raw_data = RawDataFactory.from_duckdb_spot_store(
spot_store_path=Path("data/spot.duckdb"),
pairs=["BTCUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
)
# From Polars DataFrames
raw_data = RawDataFactory.from_polars(
spot_df=my_polars_df,
pairs=["BTCUSDT"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
)
Data Types and Schema¶
Built-in Data Types¶
| Type | Columns |
|---|---|
| `spot` | pair, timestamp, open, high, low, close, volume, trades |
| `futures` | spot columns + open_interest |
| `perpetual` | spot columns + open_interest, funding_rate |
Using Different Data Types¶
# Spot data
spot_store = DuckDbRawStore(db_path=Path("spot.duckdb"), data_type="spot")
# Futures data (includes open_interest)
futures_store = DuckDbRawStore(db_path=Path("futures.duckdb"), data_type="futures")
futures_klines = [
{"timestamp": datetime(2024, 1, 1), "open": 42000.0, "high": 42100.0,
"low": 41900.0, "close": 42050.0, "volume": 100.0, "open_interest": 50000.0},
]
futures_store.insert_klines("BTCUSDT", futures_klines)
# Perpetual data (includes open_interest + funding_rate)
perp_store = DuckDbRawStore(db_path=Path("perp.duckdb"), data_type="perpetual")
perp_klines = [
{"timestamp": datetime(2024, 1, 1), "open": 42000.0, "high": 42100.0,
"low": 41900.0, "close": 42050.0, "volume": 100.0,
"open_interest": 50000.0, "funding_rate": 0.0001},
]
perp_store.insert_klines("BTCUSDT", perp_klines)
Store Operations¶
Upsert Semantics¶
All stores use upsert (INSERT OR REPLACE) based on (pair, timestamp) key:
# Insert initial data
store.insert_klines("BTCUSDT", [
{"timestamp": datetime(2024, 1, 1, 0, 0), "close": 42000.0, ...}
])
# Re-insert with updated values (replaces existing)
store.insert_klines("BTCUSDT", [
{"timestamp": datetime(2024, 1, 1, 0, 0), "close": 42100.0, ...}
])
# Result: only one row with close=42100.0
Time Bounds¶
# Get first and last timestamp for a pair
first_ts, last_ts = store.get_time_bounds("BTCUSDT")
print(f"Data from {first_ts} to {last_ts}")
Gap Detection¶
from datetime import datetime
# Find gaps in data
gaps = store.find_gaps(
pair="BTCUSDT",
start=datetime(2024, 1, 1),
end=datetime(2024, 1, 31),
tf_minutes=1, # 1-minute candles
)
for gap_start, gap_end in gaps:
print(f"Missing data: {gap_start} to {gap_end}")
Statistics¶
# Get per-pair statistics
stats = store.get_stats()
print(stats)
# shape: (2, 5)
# ┌──────────┬───────┬─────────────────────┬─────────────────────┬──────────────┐
# │ pair ┆ rows ┆ first_candle ┆ last_candle ┆ total_volume │
# │ --- ┆ --- ┆ --- ┆ --- ┆ --- │
# │ str ┆ u32 ┆ datetime[μs] ┆ datetime[μs] ┆ f64 │
# ╞══════════╪═══════╪═════════════════════╪═════════════════════╪══════════════╡
# │ BTCUSDT ┆ 10000 ┆ 2024-01-01 00:00:00 ┆ 2024-01-07 22:39:00 ┆ 1500000.0 │
# │ ETHUSDT ┆ 10000 ┆ 2024-01-01 00:00:00 ┆ 2024-01-07 22:39:00 ┆ 2100000.0 │
# └──────────┴───────┴─────────────────────┴─────────────────────┴──────────────┘
Creating a Custom Store¶
Extend RawDataStore to implement your own backend:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable, Optional

import polars as pl

import signalflow as sf
from signalflow.core.containers.raw_data import RawData
from signalflow.data.raw_store.base import RawDataStore
@dataclass
@sf.data_store("parquet/spot")
class ParquetRawStore(RawDataStore):
    """Parquet file storage backend.

    Stores one Parquet file per pair under ``data_dir``. Inserts use
    upsert semantics keyed on ``timestamp``, matching the other
    ``RawDataStore`` backends.
    """

    # Directory holding one <pair>.parquet file per trading pair.
    data_dir: Path
    # One of "spot", "futures", "perpetual"; also the default RawData key.
    data_type: str = "spot"
    # Reserved for per-pair DataFrame caching; only cleared in close().
    _cache: dict[str, pl.DataFrame] = field(default_factory=dict, repr=False)

    def __post_init__(self) -> None:
        # Ensure the storage directory exists before any read/write.
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def _file_path(self, pair: str) -> Path:
        """Return the Parquet file path for *pair*."""
        return self.data_dir / f"{pair}.parquet"

    def insert_klines(self, pair: str, klines: list[dict]) -> None:
        """Upsert *klines* for *pair*.

        Rows whose timestamp already exists are replaced; the file is
        kept sorted by timestamp. No-op for an empty list.
        """
        if not klines:
            return
        new_df = pl.DataFrame(klines)
        path = self._file_path(pair)
        if path.exists():
            existing = pl.read_parquet(path)
            # Upsert: drop existing rows that collide on timestamp, append new.
            combined = pl.concat([
                existing.filter(~pl.col("timestamp").is_in(new_df["timestamp"])),
                new_df,
            ]).sort("timestamp")
        else:
            combined = new_df
        combined.write_parquet(path)

    def get_time_bounds(self, pair: str) -> tuple[Optional[datetime], Optional[datetime]]:
        """Return (first, last) timestamp for *pair*, or (None, None) if no data."""
        path = self._file_path(pair)
        if not path.exists():
            return (None, None)
        df = pl.read_parquet(path, columns=["timestamp"])
        if df.is_empty():
            return (None, None)
        return (df["timestamp"].min(), df["timestamp"].max())

    def find_gaps(
        self, pair: str, start: datetime, end: datetime, tf_minutes: int
    ) -> list[tuple[datetime, datetime]]:
        """Return (gap_start, gap_end) ranges with missing candles.

        Walks the expected ``tf_minutes`` grid from *start* to *end*
        (inclusive) and reports each contiguous run of absent timestamps.
        """
        df = self.load(pair, start=start, end=end)
        present: set[datetime] = set() if df.is_empty() else set(df["timestamp"].to_list())
        step = timedelta(minutes=tf_minutes)
        gaps: list[tuple[datetime, datetime]] = []
        gap_start: Optional[datetime] = None
        t = start
        while t <= end:
            if t in present:
                if gap_start is not None:
                    # Close the current gap at the last missing slot.
                    gaps.append((gap_start, t - step))
                    gap_start = None
            elif gap_start is None:
                gap_start = t
            t += step
        if gap_start is not None:
            # Gap extends to the end of the requested range.
            gaps.append((gap_start, end))
        return gaps

    def load(
        self,
        pair: str,
        hours: Optional[int] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> pl.DataFrame:
        """Load candles for *pair*, sorted by timestamp.

        ``hours`` takes precedence (last N hours relative to now, using a
        naive local timestamp — confirm against how candles are stored);
        otherwise ``start`` and ``end`` each bound the range independently,
        so a lone ``start`` or ``end`` is honored rather than ignored.
        Returns an empty DataFrame when no file exists.
        """
        path = self._file_path(pair)
        if not path.exists():
            return pl.DataFrame()
        df = pl.read_parquet(path)
        if hours is not None:
            cutoff = datetime.now() - timedelta(hours=hours)
            df = df.filter(pl.col("timestamp") > cutoff)
        else:
            # Apply each bound independently (fixes the original example,
            # which required BOTH start and end to filter at all).
            if start is not None:
                df = df.filter(pl.col("timestamp") >= start)
            if end is not None:
                df = df.filter(pl.col("timestamp") <= end)
        return df.sort("timestamp")

    def load_many(
        self,
        pairs: Iterable[str],
        hours: Optional[int] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> pl.DataFrame:
        """Load and concatenate candles for several pairs.

        Adds/overwrites a ``pair`` column on each frame; empty pairs are
        skipped. Returns an empty DataFrame when nothing matches.
        """
        frames: list[pl.DataFrame] = []
        for pair in pairs:
            df = self.load(pair, hours=hours, start=start, end=end)
            if not df.is_empty():
                frames.append(df.with_columns(pl.lit(pair).alias("pair")))
        if not frames:
            return pl.DataFrame()
        return pl.concat(frames).sort(["pair", "timestamp"])

    def load_many_pandas(self, pairs: list[str], start=None, end=None):
        """Convenience wrapper: ``load_many`` converted to a pandas DataFrame."""
        return self.load_many(pairs, start=start, end=end).to_pandas()

    def get_stats(self) -> pl.DataFrame:
        """Return per-pair statistics: rows, first/last candle, total volume."""
        rows: list[dict] = []
        for path in sorted(self.data_dir.glob("*.parquet")):
            df = pl.read_parquet(path)
            if df.is_empty():
                continue
            rows.append({
                "pair": path.stem,  # file is named <pair>.parquet
                "rows": df.height,
                "first_candle": df["timestamp"].min(),
                "last_candle": df["timestamp"].max(),
                "total_volume": float(df["volume"].sum()) if "volume" in df.columns else 0.0,
            })
        return pl.DataFrame(rows)

    def close(self) -> None:
        """Release resources; Parquet files need no explicit close."""
        self._cache.clear()

    def to_raw_data(
        self,
        pairs: list[str],
        start: datetime,
        end: datetime,
        data_key: Optional[str] = None,
    ) -> RawData:
        """Load *pairs* over [start, end] into an immutable RawData container.

        The data is stored under ``data_key`` if given, else ``data_type``.
        """
        key = data_key if data_key is not None else self.data_type
        df = self.load_many(pairs=pairs, start=start, end=end)
        return RawData(
            datetime_start=start,
            datetime_end=end,
            pairs=pairs,
            data={key: df},
        )
StoreFactory¶
Create stores dynamically using the factory:
from signalflow.data import StoreFactory
# Create DuckDB store
store = StoreFactory.create_raw_store(
backend="duckdb",
data_type="spot",
db_path="data/spot.duckdb",
)
# Create SQLite store
store = StoreFactory.create_raw_store(
backend="sqlite",
data_type="futures",
db_path="data/futures.sqlite",
)
# Create in-memory store
store = StoreFactory.create_raw_store(
backend="memory",
data_type="spot",
)
PostgreSQL Store (Optional)¶
For production deployments:
# Requires: pip install signalflow[postgres]
from signalflow.data.raw_store import PgRawStore
store = PgRawStore(
host="localhost",
port=5432,
database="signalflow",
user="postgres",
password="secret",
data_type="spot",
)
store.insert_klines("BTCUSDT", klines)
df = store.load("BTCUSDT", hours=24)
store.close()
Best Practices¶
Connection Management¶
# Use try/finally
store = DuckDbRawStore(db_path=Path("data.duckdb"))
try:
df = store.load("BTCUSDT")
finally:
store.close()
# Or context manager (if implemented)
with DuckDbRawStore(db_path=Path("data.duckdb")) as store:
df = store.load("BTCUSDT")
Batch Inserts¶
# Prefer batch insert over single klines
all_klines = fetch_many_klines() # e.g., 10,000 klines
store.insert_klines("BTCUSDT", all_klines) # Single batch
# Avoid
for kline in all_klines:
store.insert_klines("BTCUSDT", [kline]) # 10,000 separate inserts
Data Validation¶
# to_raw_data() validates data automatically:
# - Checks required columns (pair, timestamp)
# - Normalizes timestamps to UTC-naive
# - Detects duplicate (pair, timestamp) combinations
try:
raw_data = store.to_raw_data(pairs=["BTCUSDT"], start=start, end=end)
except ValueError as e:
print(f"Data validation failed: {e}")
API Reference¶
See the full API documentation:
- RawDataStore — Base class for all stores
- DuckDbRawStore — DuckDB implementation
- SqliteRawStore — SQLite implementation
- InMemoryRawStore — In-memory implementation
- RawDataFactory — Factory for creating RawData
- StoreFactory — Factory for creating stores