Skip to content

Data Module

Data Sources

Binance

signalflow.data.source.binance.BinanceClient dataclass

BinanceClient(base_url: str = 'https://api.binance.com', klines_path: str = '/api/v3/klines', exchange_info_path: str = '/api/v3/exchangeInfo', max_retries: int = 3, timeout_sec: int = 30, min_delay_sec: float = 0.05)

Bases: RawDataSource

Async client for Binance REST API.

Provides async methods for fetching OHLCV candlestick data with automatic retries, rate limit handling, and pagination.

IMPORTANT: Returned timestamps are candle CLOSE times (Binance k[6]), UTC-naive.

Attributes:

Name Type Description
base_url str

Binance API base URL. Default: "https://api.binance.com".

max_retries int

Maximum retry attempts. Default: 3.

timeout_sec int

Request timeout in seconds. Default: 30.

min_delay_sec float

Minimum delay between requests. Default: 0.05.

__aenter__ async

__aenter__() -> BinanceClient

Enter async context - creates session.

Source code in src/signalflow/data/source/binance.py
async def __aenter__(self) -> "BinanceClient":
    """Enter async context - creates session."""
    timeout = aiohttp.ClientTimeout(total=self.timeout_sec)
    self._session = aiohttp.ClientSession(timeout=timeout)
    return self

__aexit__ async

__aexit__(*args: Any) -> None

Exit async context - closes session.

Source code in src/signalflow/data/source/binance.py
async def __aexit__(self, *args: Any) -> None:
    """Tear down the HTTP session when leaving the async context."""
    if not self._session:
        return
    await self._session.close()
    # Drop the reference so a stale session can never be reused.
    self._session = None

get_klines async

get_klines(pair: str, timeframe: str = '1m', *, start_time: datetime | None = None, end_time: datetime | None = None, limit: int = 1000) -> list[dict[str, Any]]

Fetch OHLCV klines from Binance.

IMPORTANT: Returned "timestamp" is CANDLE CLOSE TIME (UTC-naive).

Parameters:

Name Type Description Default
pair str

Trading pair (e.g., "BTCUSDT").

required
timeframe str

Interval (1m, 5m, 1h, 1d, etc.). Default: "1m".

'1m'
start_time datetime | None

Range start (naive=UTC or aware).

None
end_time datetime | None

Range end (naive=UTC or aware).

None
limit int

Max candles (max 1000). Default: 1000.

1000

Returns:

Type Description
list[dict[str, Any]]

list[dict]: OHLCV dicts with keys: timestamp, open, high, low, close, volume, trades.

Raises:

Type Description
RuntimeError

If not in async context or API error.

Source code in src/signalflow/data/source/binance.py
async def get_klines(
    self,
    pair: str,
    timeframe: str = "1m",
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    limit: int = 1000,
) -> list[dict[str, Any]]:
    """Fetch OHLCV klines from Binance.

    IMPORTANT: Returned "timestamp" is CANDLE CLOSE TIME (UTC-naive).

    Args:
        pair (str): Trading pair (e.g., "BTCUSDT").
        timeframe (str): Interval (1m, 5m, 1h, 1d, etc.). Default: "1m".
        start_time (datetime | None): Range start (naive=UTC or aware).
        end_time (datetime | None): Range end (naive=UTC or aware).
        limit (int): Max candles (max 1000). Default: 1000.

    Returns:
        list[dict]: OHLCV dicts with keys: timestamp, open, high, low,
            close, volume, trades.

    Raises:
        RuntimeError: If not in async context or API error.
    """
    if self._session is None:
        raise RuntimeError("BinanceClient must be used as an async context manager.")

    # Binance expects millisecond-epoch integers for the range bounds.
    params: dict[str, str | int] = {"symbol": pair, "interval": timeframe, "limit": int(limit)}
    if start_time is not None:
        params["startTime"] = dt_to_ms_utc(start_time)
    if end_time is not None:
        params["endTime"] = dt_to_ms_utc(end_time)

    url = f"{self.base_url}{self.klines_path}"
    last_err: Exception | None = None

    for attempt in range(self.max_retries):
        try:
            async with self._session.get(url, params=params) as resp:
                if resp.status == 429:
                    # Rate limited: honour the server's Retry-After header,
                    # falling back to 60s when it is absent.
                    # NOTE(review): each 429 consumes one retry attempt, so
                    # max_retries consecutive 429s end in the generic
                    # "Unknown error" raise below — confirm that is intended.
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    logger.warning(f"Rate limited, waiting {retry_after}s (pair={pair}, tf={timeframe})")
                    await asyncio.sleep(retry_after)
                    continue

                if resp.status != 200:
                    text = await resp.text()
                    raise RuntimeError(f"Binance API error {resp.status}: {text}")

                data = await resp.json()

            # Binance kline payload is a positional array per candle:
            # [0]=open time, [1]=open, [2]=high, [3]=low, [4]=close,
            # [5]=base-asset volume, [6]=close time, [7]=quote-asset volume,
            # [8]=number of trades.
            out: list[dict[str, Any]] = []
            for k in data:
                # Close time is the canonical timestamp for this module.
                close_ms = int(k[6])
                out.append(
                    {
                        "timestamp": ms_to_dt_utc_naive(close_ms),
                        "open": float(k[1]),
                        "high": float(k[2]),
                        "low": float(k[3]),
                        "close": float(k[4]),
                        # NOTE(review): k[7] is the QUOTE-asset volume in the
                        # Binance payload; the base-asset volume is k[5].
                        # Confirm quote volume is what "volume" should carry.
                        "volume": float(k[7]),
                        "trades": int(k[8]),
                    }
                )

            return out

        except (TimeoutError, aiohttp.ClientError, RuntimeError) as e:
            # API errors (RuntimeError) are retried alongside transport
            # errors, with exponential backoff: 1s, 2s, 4s, ...
            last_err = e
            if attempt < self.max_retries - 1:
                wait = 2**attempt
                logger.warning(f"Request failed, retrying in {wait}s (pair={pair}, tf={timeframe}): {e}")
                await asyncio.sleep(wait)
            else:
                break

    # Re-raise the last captured error; the fallback fires only when the
    # loop was exhausted purely by 429 `continue`s (last_err never set).
    raise last_err or RuntimeError("Unknown error while fetching klines.")

get_klines_range async

get_klines_range(pair: str, timeframe: str, start_time: datetime, end_time: datetime, *, limit: int = 1000) -> list[dict[str, Any]]

Download all klines for period with automatic pagination.

Semantics
  • Range by CANDLE CLOSE TIME: [start_time, end_time] inclusive
  • Returns UTC-naive timestamps
  • Automatic deduplication
Pagination strategy
  • Request windows of size limit * timeframe
  • Advance based on last returned close time + 1ms
  • Additional dedup at end for safety

Parameters:

Name Type Description Default
pair str

Trading pair.

required
timeframe str

Interval (must be in TIMEFRAME_MS).

required
start_time datetime

Range start (inclusive).

required
end_time datetime

Range end (inclusive).

required
limit int

Candles per request. Default: 1000.

1000

Returns:

Type Description
list[dict[str, Any]]

list[dict]: Deduplicated, sorted OHLCV dicts.

Raises:

Type Description
ValueError

If timeframe unsupported.

RuntimeError

If pagination exceeds safety limit (2M loops).

Source code in src/signalflow/data/source/binance.py
async def get_klines_range(
    self,
    pair: str,
    timeframe: str,
    start_time: datetime,
    end_time: datetime,
    *,
    limit: int = 1000,
) -> list[dict[str, Any]]:
    """Download all klines for period with automatic pagination.

    Semantics:
        - Range by CANDLE CLOSE TIME: [start_time, end_time] inclusive
        - Returns UTC-naive timestamps
        - Automatic deduplication

    Pagination strategy:
        - Request windows of size limit * timeframe
        - Advance based on last returned close time + 1ms
        - Additional dedup at end for safety

    Args:
        pair (str): Trading pair.
        timeframe (str): Interval (must be in TIMEFRAME_MS).
        start_time (datetime): Range start (inclusive).
        end_time (datetime): Range end (inclusive).
        limit (int): Candles per request. Default: 1000.

    Returns:
        list[dict]: Deduplicated, sorted OHLCV dicts.

    Raises:
        ValueError: If timeframe unsupported.
        RuntimeError: If pagination exceeds safety limit (2M loops).
    """
    if timeframe not in TIMEFRAME_MS:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    # Normalise both bounds to UTC-naive so all comparisons below are valid.
    start_time = ensure_utc_naive(start_time)
    end_time = ensure_utc_naive(end_time)

    # NOTE(review): the docstring calls the range inclusive, yet an equal
    # start/end returns [] here — confirm degenerate ranges are meant to
    # be empty rather than a single candle.
    if start_time >= end_time:
        return []

    tf_ms = TIMEFRAME_MS[timeframe]
    # One request window spans at most `limit` candles of this timeframe.
    window = timedelta(milliseconds=tf_ms * limit)

    all_klines: list[dict[str, Any]] = []
    current_start = start_time

    # Hard iteration cap: even a misbehaving server that keeps us from
    # advancing can never spin this loop forever.
    max_loops = 2_000_000
    loops = 0

    while current_start < end_time:
        loops += 1
        if loops > max_loops:
            raise RuntimeError("Pagination guard triggered (too many loops).")

        req_end = min(current_start + window, end_time)

        klines = await self.get_klines(
            pair=pair,
            timeframe=timeframe,
            start_time=current_start,
            end_time=req_end,
            limit=limit,
        )

        # Empty window (e.g. before listing, or an exchange outage):
        # step just past it and keep paginating.
        if not klines:
            current_start = req_end + timedelta(milliseconds=1)
            await asyncio.sleep(self.min_delay_sec)
            continue

        klines.sort(key=lambda x: x["timestamp"])

        # Keep only candles whose CLOSE time lies inside the requested range.
        for k in klines:
            ts = k["timestamp"]
            if start_time <= ts <= end_time:
                all_klines.append(k)

        last_close = klines[-1]["timestamp"]
        next_start = last_close + timedelta(milliseconds=1)

        # Guarantee forward progress: if the API returned only stale data,
        # advance by at least 1ms instead of stalling or moving backwards.
        current_start = current_start + timedelta(milliseconds=1) if next_start <= current_start else next_start

        if len(all_klines) and len(all_klines) % 10000 == 0:
            logger.info(f"{pair}: loaded {len(all_klines):,} candles...")

        # Client-side throttle between paginated requests.
        await asyncio.sleep(self.min_delay_sec)

    # Safety dedup by close timestamp (later pages win), then global sort.
    uniq: dict[datetime, dict[str, Any]] = {}
    for k in all_klines:
        uniq[k["timestamp"]] = k

    out = list(uniq.values())
    out.sort(key=lambda x: x["timestamp"])
    return out

get_pairs async

get_pairs(quote: str | None = None) -> list[str]

Get list of available trading pairs.

Parameters:

Name Type Description Default
quote str | None

Filter by quote asset (e.g., "USDT", "BTC"). If None, returns all trading pairs.

None

Returns:

Type Description
list[str]

list[str]: List of trading pair symbols (e.g., ["BTCUSDT", "ETHUSDT"]).

Example
async with BinanceClient() as client:
    # All pairs
    all_pairs = await client.get_pairs()

    # Only USDT pairs
    usdt_pairs = await client.get_pairs(quote="USDT")
Source code in src/signalflow/data/source/binance.py
async def get_pairs(self, quote: str | None = None) -> list[str]:
    """Get list of available trading pairs.

    Args:
        quote (str | None): Filter by quote asset (e.g., "USDT", "BTC").
            If None, returns all trading pairs.

    Returns:
        list[str]: Sorted list of trading pair symbols
            (e.g., ["BTCUSDT", "ETHUSDT"]).

    Raises:
        RuntimeError: If not in async context, on API error, or when all
            retry attempts are exhausted. (Previously, exhausting retries
            via repeated 429s silently returned [] — indistinguishable
            from "no pairs exist".)

    Example:
        ```python
        async with BinanceClient() as client:
            # All pairs
            all_pairs = await client.get_pairs()

            # Only USDT pairs
            usdt_pairs = await client.get_pairs(quote="USDT")
        ```
    """
    if self._session is None:
        raise RuntimeError("BinanceClient must be used as an async context manager.")

    url = f"{self.base_url}{self.exchange_info_path}"
    last_err: Exception | None = None

    for attempt in range(self.max_retries):
        try:
            async with self._session.get(url) as resp:
                if resp.status == 429:
                    # Honour Binance's Retry-After header; default 60s.
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    logger.warning(f"Rate limited, waiting {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                if resp.status != 200:
                    text = await resp.text()
                    raise RuntimeError(f"Binance API error {resp.status}: {text}")

                data = await resp.json()

            pairs: list[str] = []
            for symbol in data.get("symbols", []):
                # Skip delisted / suspended symbols.
                if symbol.get("status") != "TRADING":
                    continue
                sym = symbol.get("symbol", "")
                if quote is None or symbol.get("quoteAsset") == quote:
                    pairs.append(sym)

            return sorted(pairs)

        # RuntimeError (API error) is now retried as well, mirroring the
        # retry policy of get_klines for consistency.
        except (TimeoutError, aiohttp.ClientError, RuntimeError) as e:
            last_err = e
            if attempt < self.max_retries - 1:
                wait = 2**attempt  # exponential backoff: 1s, 2s, 4s, ...
                logger.warning(f"Request failed, retrying in {wait}s: {e}")
                await asyncio.sleep(wait)
            else:
                break

    # FIX: this path used to `return []`, silently masking persistent
    # failures. Fail loudly instead, preserving the RuntimeError wrapper
    # that callers of the original final-attempt raise already handle.
    if last_err is not None:
        raise RuntimeError(f"Failed to get exchange info: {last_err}") from last_err
    raise RuntimeError("Failed to get exchange info: retries exhausted by rate limiting.")

signalflow.data.source.binance.BinanceSpotLoader dataclass

BinanceSpotLoader(store: DuckDbSpotStore = DuckDbSpotStore(db_path=Path('raw_data.duckdb')), timeframe: str = '1m')

Bases: RawDataLoader

Downloads and stores Binance spot OHLCV data for fixed timeframe.

Combines BinanceClient (source) and DuckDbSpotStore (storage) to provide complete data pipeline with gap filling and incremental updates.

Attributes:

Name Type Description
store DuckDbSpotStore

Storage backend. Default: raw_data.duckdb.

timeframe str

Fixed timeframe for all data. Default: "1m".

download async

download(pairs: list[str], days: int | None = None, start: datetime | None = None, end: datetime | None = None, fill_gaps: bool = True) -> None

Download historical data with intelligent range detection.

Automatically determines what to download
  • If no existing data: download full range
  • If data exists: download before/after existing range
  • If fill_gaps=True: detect and fill gaps in existing range

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to download.

required
days int | None

Number of days back from end. Default: 7.

None
start datetime | None

Range start (overrides days).

None
end datetime | None

Range end. Default: now.

None
fill_gaps bool

Detect and fill gaps. Default: True.

True
Note

Runs async download for all pairs concurrently. Logs progress for large downloads. Errors logged but don't stop other pairs.

Source code in src/signalflow/data/source/binance.py
async def download(  # type: ignore[override]
    self,
    pairs: list[str],
    days: int | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    fill_gaps: bool = True,
) -> None:
    """Download historical data with intelligent range detection.

    Automatically determines what to download:
        - If no existing data: download full range
        - If data exists: download before/after existing range
        - If fill_gaps=True: detect and fill gaps in existing range

    Args:
        pairs (list[str]): Trading pairs to download.
        days (int | None): Number of days back from end. Default: 7.
        start (datetime | None): Range start (overrides days).
        end (datetime | None): Range end. Default: now.
        fill_gaps (bool): Detect and fill gaps. Default: True.

    Note:
        Runs async download for all pairs concurrently.
        Logs progress for large downloads.
        Errors logged but don't stop other pairs.
    """

    # All internal timestamps are UTC-naive, matching BinanceClient output.
    now = datetime.now(UTC).replace(tzinfo=None)
    end = now if end is None else ensure_utc_naive(end)

    start = end - timedelta(days=days if days else 7) if start is None else ensure_utc_naive(start)

    # Candle spacing in minutes for the configured timeframe.
    # NOTE(review): an unrecognised timeframe silently falls back to 1
    # minute — confirm that is preferable to raising.
    tf_minutes = {
        "1m": 1,
        "3m": 3,
        "5m": 5,
        "15m": 15,
        "30m": 30,
        "1h": 60,
        "2h": 120,
        "4h": 240,
        "6h": 360,
        "8h": 480,
        "12h": 720,
        "1d": 1440,
    }.get(self.timeframe, 1)

    async def download_pair(client: BinanceClient, pair: str) -> None:
        # Decide which sub-ranges are actually missing for this pair.
        logger.info(f"Processing {pair} from {start} to {end}")

        db_min, db_max = self.store.get_time_bounds(pair)
        ranges_to_download: list[tuple[datetime, datetime]] = []

        if db_min is None:
            # Nothing stored yet: fetch the whole requested range.
            ranges_to_download.append((start, end))
        else:
            assert db_max is not None
            if start < db_min:
                # Backfill before stored data, leaving a one-candle margin
                # so the first stored candle is not re-downloaded.
                pre_end = min(end, db_min - timedelta(minutes=tf_minutes))
                if start < pre_end:
                    ranges_to_download.append((start, pre_end))
            if end > db_max:
                # Extend after stored data, same one-candle margin.
                post_start = max(start, db_max + timedelta(minutes=tf_minutes))
                if post_start < end:
                    ranges_to_download.append((post_start, end))

            if fill_gaps:
                # Also repair holes inside the already-stored interval.
                overlap_start = max(start, db_min)
                overlap_end = min(end, db_max)
                if overlap_start < overlap_end:
                    gaps = self.store.find_gaps(pair, overlap_start, overlap_end, tf_minutes)
                    ranges_to_download.extend(gaps)

        for range_start, range_end in ranges_to_download:
            if range_start >= range_end:
                continue

            logger.info(f"{pair}: downloading {range_start} -> {range_end}")

            # Per-range errors are logged, not raised, so one failed range
            # (or pair) cannot abort the rest of the download.
            try:
                klines = await client.get_klines_range(
                    pair=pair,
                    timeframe=self.timeframe,
                    start_time=range_start,
                    end_time=range_end,
                )
                self.store.insert_klines(pair, klines)
            except Exception as e:
                logger.error(f"Error downloading {pair}: {e}")

    # NOTE(review): exceptions raised before the try above (e.g. from
    # store.get_time_bounds) propagate out of asyncio.gather — per-pair
    # isolation only covers the download loop. Confirm acceptable.
    async with BinanceClient() as client:
        await asyncio.gather(*[download_pair(client, pair) for pair in pairs])

    # Download is a one-shot operation; release the store handle.
    self.store.close()

get_pairs async

get_pairs(quote: str | None = None) -> list[str]

Get list of available trading pairs from Binance Spot.

Parameters:

Name Type Description Default
quote str | None

Filter by quote asset (e.g., "USDT"). If None, returns all trading pairs.

None

Returns:

Type Description
list[str]

list[str]: List of trading pair symbols.

Example
loader = BinanceSpotLoader(store=store)
usdt_pairs = await loader.get_pairs(quote="USDT")
# ['BTCUSDT', 'ETHUSDT', ...]
Source code in src/signalflow/data/source/binance.py
async def get_pairs(self, quote: str | None = None) -> list[str]:
    """List tradable Binance Spot symbols.

    Args:
        quote (str | None): Keep only pairs quoted in this asset
            (e.g., "USDT"). None returns every trading pair.

    Returns:
        list[str]: Sorted trading pair symbols.

    Example:
        ```python
        loader = BinanceSpotLoader(store=store)
        usdt_pairs = await loader.get_pairs(quote="USDT")
        # ['BTCUSDT', 'ETHUSDT', ...]
        ```
    """
    # Symbol discovery is delegated to a short-lived client session.
    async with BinanceClient() as client:
        symbols = await client.get_pairs(quote=quote)
    return symbols

sync async

sync(pairs: list[str], update_interval_sec: int = 60) -> None

Real-time sync - continuously update with latest data.

Runs indefinitely, fetching latest candles at specified interval. Useful for live trading or monitoring.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to sync.

required
update_interval_sec int

Update interval in seconds. Default: 60.

60
Note

Runs forever - use Ctrl+C to stop or run in background task. Fetches last 5 candles per update (ensures no gaps). Errors logged but sync continues.

Source code in src/signalflow/data/source/binance.py
async def sync(  # type: ignore[override]
    self,
    pairs: list[str],
    update_interval_sec: int = 60,
) -> None:
    """Continuously pull the newest candles into the store.

    Loops forever: every *update_interval_sec* seconds the last 5 candles
    of each pair are fetched concurrently and inserted. Per-pair failures
    are logged and never break the loop.

    Args:
        pairs (list[str]): Trading pairs to sync.
        update_interval_sec (int): Update interval in seconds. Default: 60.

    Note:
        Runs forever - use Ctrl+C to stop or run in background task.
        Fetching 5 candles per update guards against small gaps.
    """

    logger.info(f"Starting real-time sync for {pairs}")
    logger.info(f"Update interval: {update_interval_sec}s (timeframe={self.timeframe})")

    async def _update_one(client: BinanceClient, symbol: str) -> None:
        # Best-effort: one failing pair must not stop the others.
        try:
            latest = await client.get_klines(pair=symbol, timeframe=self.timeframe, limit=5)
            self.store.insert_klines(symbol, latest)
        except Exception as exc:
            logger.error(f"Error syncing {symbol}: {exc}")

    async with BinanceClient() as client:
        while True:
            await asyncio.gather(*(_update_one(client, p) for p in pairs))
            logger.debug(f"Synced {len(pairs)} pairs")
            await asyncio.sleep(update_interval_sec)

signalflow.data.source.binance.BinanceFuturesUsdtLoader dataclass

BinanceFuturesUsdtLoader(store: DuckDbSpotStore = DuckDbSpotStore(db_path=Path('raw_data_futures_usdt.duckdb')), timeframe: str = '1m')

Bases: RawDataLoader

Downloads and stores Binance USDT-M Futures OHLCV data.

Uses the fapi.binance.com endpoint for USDT-margined perpetual and delivery contracts. Follows the same pipeline as BinanceSpotLoader (gap filling, incremental sync).

Attributes:

Name Type Description
store DuckDbSpotStore

Storage backend.

timeframe str

Fixed timeframe for all data. Default: "1m".

download async

download(pairs: list[str], days: int | None = None, start: datetime | None = None, end: datetime | None = None, fill_gaps: bool = True) -> None

Download historical USDT-M futures data.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to download (e.g. ["BTCUSDT"]).

required
days int | None

Number of days back from end. Default: 7.

None
start datetime | None

Range start (overrides days).

None
end datetime | None

Range end. Default: now.

None
fill_gaps bool

Detect and fill gaps. Default: True.

True
Source code in src/signalflow/data/source/binance.py
async def download(  # type: ignore[override]
    self,
    pairs: list[str],
    days: int | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    fill_gaps: bool = True,
) -> None:
    """Download historical USDT-M futures data.

    Same range-detection pipeline as ``BinanceSpotLoader.download``, but
    pointed at the fapi.binance.com endpoint.

    Args:
        pairs (list[str]): Trading pairs to download (e.g. ["BTCUSDT"]).
        days (int | None): Number of days back from *end*. Default: 7.
        start (datetime | None): Range start (overrides *days*).
        end (datetime | None): Range end. Default: now.
        fill_gaps (bool): Detect and fill gaps. Default: True.
    """
    # Timestamps are kept UTC-naive throughout, matching BinanceClient.
    now = datetime.now(UTC).replace(tzinfo=None)
    end = now if end is None else ensure_utc_naive(end)

    start = end - timedelta(days=days if days else 7) if start is None else ensure_utc_naive(start)

    # Candle spacing in minutes; unknown timeframes fall back to 1 minute.
    # NOTE(review): this table is duplicated across all three loaders —
    # a shared module-level constant would avoid drift.
    tf_minutes = {
        "1m": 1,
        "3m": 3,
        "5m": 5,
        "15m": 15,
        "30m": 30,
        "1h": 60,
        "2h": 120,
        "4h": 240,
        "6h": 360,
        "8h": 480,
        "12h": 720,
        "1d": 1440,
    }.get(self.timeframe, 1)

    async def download_pair(client: BinanceClient, pair: str) -> None:
        # Work out which sub-ranges are missing from the store.
        logger.info(f"Processing {pair} (futures-usdt) from {start} to {end}")
        db_min, db_max = self.store.get_time_bounds(pair)
        ranges_to_download: list[tuple[datetime, datetime]] = []

        if db_min is None:
            # Empty store for this pair: fetch the whole range.
            ranges_to_download.append((start, end))
        else:
            assert db_max is not None
            if start < db_min:
                # Backfill before stored data (one-candle margin).
                pre_end = min(end, db_min - timedelta(minutes=tf_minutes))
                if start < pre_end:
                    ranges_to_download.append((start, pre_end))
            if end > db_max:
                # Extend after stored data (one-candle margin).
                post_start = max(start, db_max + timedelta(minutes=tf_minutes))
                if post_start < end:
                    ranges_to_download.append((post_start, end))
            if fill_gaps:
                # Repair holes inside the already-stored interval.
                overlap_start = max(start, db_min)
                overlap_end = min(end, db_max)
                if overlap_start < overlap_end:
                    gaps = self.store.find_gaps(pair, overlap_start, overlap_end, tf_minutes)
                    ranges_to_download.extend(gaps)

        for range_start, range_end in ranges_to_download:
            if range_start >= range_end:
                continue
            logger.info(f"{pair}: downloading {range_start} -> {range_end}")
            # Per-range errors are logged so other ranges/pairs continue.
            try:
                klines = await client.get_klines_range(
                    pair=pair,
                    timeframe=self.timeframe,
                    start_time=range_start,
                    end_time=range_end,
                )
                self.store.insert_klines(pair, klines)
            except Exception as e:
                logger.error(f"Error downloading {pair}: {e}")

    # USDT-M futures REST endpoint (fapi) instead of the spot API.
    async with BinanceClient(
        base_url="https://fapi.binance.com",
        klines_path="/fapi/v1/klines",
    ) as client:
        await asyncio.gather(*[download_pair(client, pair) for pair in pairs])

    # One-shot operation; release the store handle.
    self.store.close()

get_pairs async

get_pairs(quote: str | None = None) -> list[str]

Get list of available USDT-M futures pairs.

Parameters:

Name Type Description Default
quote str | None

Filter by quote asset (e.g., "USDT"). If None, returns all futures pairs.

None

Returns:

Type Description
list[str]

list[str]: List of futures pair symbols.

Example
loader = BinanceFuturesUsdtLoader(store=store)
pairs = await loader.get_pairs()
# ['BTCUSDT', 'ETHUSDT', ...]
Source code in src/signalflow/data/source/binance.py
async def get_pairs(self, quote: str | None = None) -> list[str]:
    """List available USDT-M futures symbols.

    Args:
        quote (str | None): Keep only symbols quoted in this asset
            (e.g., "USDT"). None returns every futures pair.

    Returns:
        list[str]: Sorted futures pair symbols.

    Example:
        ```python
        loader = BinanceFuturesUsdtLoader(store=store)
        pairs = await loader.get_pairs()
        # ['BTCUSDT', 'ETHUSDT', ...]
        ```
    """
    # Point a throwaway client at the USDT-M futures REST endpoint.
    futures_client = BinanceClient(
        base_url="https://fapi.binance.com",
        exchange_info_path="/fapi/v1/exchangeInfo",
    )
    async with futures_client as client:
        return await client.get_pairs(quote=quote)

sync async

sync(pairs: list[str], update_interval_sec: int = 60) -> None

Continuously sync latest USDT-M futures data.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to sync.

required
update_interval_sec int

Update interval in seconds. Default: 60.

60
Source code in src/signalflow/data/source/binance.py
async def sync(  # type: ignore[override]
    self,
    pairs: list[str],
    update_interval_sec: int = 60,
) -> None:
    """Continuously pull the newest USDT-M futures candles into the store.

    Loops forever: every *update_interval_sec* seconds the last 5 candles
    of each pair are fetched concurrently and inserted. Per-pair failures
    are logged and never break the loop.

    Args:
        pairs (list[str]): Trading pairs to sync.
        update_interval_sec (int): Update interval in seconds. Default: 60.
    """
    logger.info(f"Starting real-time sync (futures-usdt) for {pairs}")
    logger.info(f"Update interval: {update_interval_sec}s (timeframe={self.timeframe})")

    async def _update_one(client: BinanceClient, symbol: str) -> None:
        # Best-effort: one failing pair must not stop the others.
        try:
            latest = await client.get_klines(pair=symbol, timeframe=self.timeframe, limit=5)
            self.store.insert_klines(symbol, latest)
        except Exception as exc:
            logger.error(f"Error syncing {symbol}: {exc}")

    # USDT-M futures REST endpoint (fapi) instead of the spot API.
    async with BinanceClient(
        base_url="https://fapi.binance.com",
        klines_path="/fapi/v1/klines",
    ) as client:
        while True:
            await asyncio.gather(*(_update_one(client, p) for p in pairs))
            logger.debug(f"Synced {len(pairs)} pairs (futures-usdt)")
            await asyncio.sleep(update_interval_sec)

signalflow.data.source.binance.BinanceFuturesCoinLoader dataclass

BinanceFuturesCoinLoader(store: DuckDbSpotStore = DuckDbSpotStore(db_path=Path('raw_data_futures_coin.duckdb')), timeframe: str = '1m')

Bases: RawDataLoader

Downloads and stores Binance COIN-M Futures OHLCV data.

Uses the dapi.binance.com endpoint for coin-margined perpetual and delivery contracts.

Attributes:

Name Type Description
store DuckDbSpotStore

Storage backend.

timeframe str

Fixed timeframe for all data. Default: "1m".

download async

download(pairs: list[str], days: int | None = None, start: datetime | None = None, end: datetime | None = None, fill_gaps: bool = True) -> None

Download historical COIN-M futures data.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to download (e.g. ["BTCUSD_PERP"]).

required
days int | None

Number of days back from end. Default: 7.

None
start datetime | None

Range start (overrides days).

None
end datetime | None

Range end. Default: now.

None
fill_gaps bool

Detect and fill gaps. Default: True.

True
Source code in src/signalflow/data/source/binance.py
async def download(  # type: ignore[override]
    self,
    pairs: list[str],
    days: int | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    fill_gaps: bool = True,
) -> None:
    """Download historical COIN-M futures data.

    Same range-detection pipeline as ``BinanceSpotLoader.download``, but
    pointed at the dapi.binance.com endpoint.

    Args:
        pairs (list[str]): Trading pairs to download (e.g. ["BTCUSD_PERP"]).
        days (int | None): Number of days back from *end*. Default: 7.
        start (datetime | None): Range start (overrides *days*).
        end (datetime | None): Range end. Default: now.
        fill_gaps (bool): Detect and fill gaps. Default: True.
    """
    # Timestamps are kept UTC-naive throughout, matching BinanceClient.
    now = datetime.now(UTC).replace(tzinfo=None)
    end = now if end is None else ensure_utc_naive(end)

    start = end - timedelta(days=days if days else 7) if start is None else ensure_utc_naive(start)

    # Candle spacing in minutes; unknown timeframes fall back to 1 minute.
    # NOTE(review): duplicated across all three loaders — consider a
    # shared module-level constant.
    tf_minutes = {
        "1m": 1,
        "3m": 3,
        "5m": 5,
        "15m": 15,
        "30m": 30,
        "1h": 60,
        "2h": 120,
        "4h": 240,
        "6h": 360,
        "8h": 480,
        "12h": 720,
        "1d": 1440,
    }.get(self.timeframe, 1)

    async def download_pair(client: BinanceClient, pair: str) -> None:
        # Work out which sub-ranges are missing from the store.
        logger.info(f"Processing {pair} (futures-coin) from {start} to {end}")
        db_min, db_max = self.store.get_time_bounds(pair)
        ranges_to_download: list[tuple[datetime, datetime]] = []

        if db_min is None:
            # Empty store for this pair: fetch the whole range.
            ranges_to_download.append((start, end))
        else:
            assert db_max is not None
            if start < db_min:
                # Backfill before stored data (one-candle margin).
                pre_end = min(end, db_min - timedelta(minutes=tf_minutes))
                if start < pre_end:
                    ranges_to_download.append((start, pre_end))
            if end > db_max:
                # Extend after stored data (one-candle margin).
                post_start = max(start, db_max + timedelta(minutes=tf_minutes))
                if post_start < end:
                    ranges_to_download.append((post_start, end))
            if fill_gaps:
                # Repair holes inside the already-stored interval.
                overlap_start = max(start, db_min)
                overlap_end = min(end, db_max)
                if overlap_start < overlap_end:
                    gaps = self.store.find_gaps(pair, overlap_start, overlap_end, tf_minutes)
                    ranges_to_download.extend(gaps)

        for range_start, range_end in ranges_to_download:
            if range_start >= range_end:
                continue
            logger.info(f"{pair}: downloading {range_start} -> {range_end}")
            # Per-range errors are logged so other ranges/pairs continue.
            try:
                klines = await client.get_klines_range(
                    pair=pair,
                    timeframe=self.timeframe,
                    start_time=range_start,
                    end_time=range_end,
                )
                self.store.insert_klines(pair, klines)
            except Exception as e:
                logger.error(f"Error downloading {pair}: {e}")

    # COIN-M futures REST endpoint (dapi) instead of the spot API.
    async with BinanceClient(
        base_url="https://dapi.binance.com",
        klines_path="/dapi/v1/klines",
    ) as client:
        await asyncio.gather(*[download_pair(client, pair) for pair in pairs])

    # One-shot operation; release the store handle.
    self.store.close()

get_pairs async

get_pairs(quote: str | None = None) -> list[str]

Get list of available COIN-M futures pairs.

Parameters:

Name Type Description Default
quote str | None

Filter by margin asset (e.g., "BTC"). If None, returns all COIN-M futures pairs.

None

Returns:

Type Description
list[str]

list[str]: List of futures pair symbols (e.g., ["BTCUSD_PERP"]).

Example
loader = BinanceFuturesCoinLoader(store=store)
pairs = await loader.get_pairs()
# ['BTCUSD_PERP', 'ETHUSD_PERP', ...]
Source code in src/signalflow/data/source/binance.py
async def get_pairs(self, quote: str | None = None) -> list[str]:
    """List the COIN-M futures pairs available on Binance.

    Args:
        quote (str | None): Margin asset used to filter results
            (e.g., "BTC"). When None, every COIN-M pair is returned.

    Returns:
        list[str]: Futures pair symbols (e.g., ["BTCUSD_PERP"]).

    Example:
        ```python
        loader = BinanceFuturesCoinLoader(store=store)
        pairs = await loader.get_pairs()
        # ['BTCUSD_PERP', 'ETHUSD_PERP', ...]
        ```
    """
    # COIN-M futures live on the dapi host, not the spot API.
    client = BinanceClient(
        base_url="https://dapi.binance.com",
        exchange_info_path="/dapi/v1/exchangeInfo",
    )
    async with client:
        return await client.get_pairs(quote=quote)

sync async

sync(pairs: list[str], update_interval_sec: int = 60) -> None

Continuously sync latest COIN-M futures data.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to sync.

required
update_interval_sec int

Update interval in seconds. Default: 60.

60
Source code in src/signalflow/data/source/binance.py
async def sync(  # type: ignore[override]
    self,
    pairs: list[str],
    update_interval_sec: int = 60,
) -> None:
    """Run an endless refresh loop for COIN-M futures klines.

    Each cycle fetches the most recent candles for every pair and
    writes them to the store, then sleeps for the update interval.

    Args:
        pairs (list[str]): Trading pairs to sync.
        update_interval_sec (int): Update interval in seconds. Default: 60.
    """
    logger.info(f"Starting real-time sync (futures-coin) for {pairs}")
    logger.info(f"Update interval: {update_interval_sec}s (timeframe={self.timeframe})")

    async def fetch_and_store(client: BinanceClient, pair: str) -> None:
        # Fetch a few of the latest candles; failures are logged so one
        # bad pair never stops the whole sync loop.
        try:
            latest = await client.get_klines(pair=pair, timeframe=self.timeframe, limit=5)
            self.store.insert_klines(pair, latest)
        except Exception as e:
            logger.error(f"Error syncing {pair}: {e}")

    # COIN-M futures use the dapi host and kline path.
    client = BinanceClient(
        base_url="https://dapi.binance.com",
        klines_path="/dapi/v1/klines",
    )
    async with client:
        while True:
            tasks = [fetch_and_store(client, pair) for pair in pairs]
            await asyncio.gather(*tasks)
            logger.debug(f"Synced {len(pairs)} pairs (futures-coin)")
            await asyncio.sleep(update_interval_sec)

Other Exchanges

Additional exchange sources (Bybit, OKX, Deribit, Kraken, Hyperliquid, WhiteBIT) are provided by the signalflow-data extension package. Install it separately to access these loaders.

Virtual Data

signalflow.data.source.virtual.VirtualDataProvider dataclass

VirtualDataProvider(store: Any = None, timeframe: str = '1m', base_prices: dict[str, float] = dict(), volatility: float = 0.02, trend: float = 0.0001, seed: int | None = 42)

Bases: RawDataLoader

Generates and streams synthetic OHLCV data into a store.

Drop-in replacement for BinanceSpotLoader in tests and paper trading dry-runs. Data is generated deterministically when a seed is provided.

Attributes:

Name Type Description
store Any

Any raw data store with insert_klines(pair, klines).

timeframe str

Candle interval.

base_prices dict[str, float]

Starting price per pair (defaults to 100.0).

volatility float

Per-bar return std dev.

trend float

Per-bar drift.

seed int | None

Random seed for reproducibility.

download

download(pairs: list[str] | None = None, n_bars: int = 200, start: datetime | None = None, **kwargs: Any) -> None

Pre-populate store with historical data.

Parameters:

Name Type Description Default
pairs list[str] | None

Trading pairs to generate.

None
n_bars int

Number of bars per pair.

200
start datetime | None

Start timestamp. Defaults to 2024-01-01 when omitted.

None
Source code in src/signalflow/data/source/virtual.py
def download(
    self,
    pairs: list[str] | None = None,
    n_bars: int = 200,
    start: datetime | None = None,
    **kwargs: Any,
) -> None:
    """Pre-populate store with historical synthetic data.

    Args:
        pairs: Trading pairs to generate. Defaults to an empty list.
        n_bars: Number of bars per pair.
        start: Start timestamp. Defaults to 2024-01-01 when omitted.
        **kwargs: Ignored; accepted for loader-interface compatibility.
    """
    import zlib  # local import: stable per-pair seed derivation

    pairs = pairs or []

    if start is None:
        start = datetime(2024, 1, 1)

    for pair in pairs:
        base = self.base_prices.get(pair, 100.0)
        # Derive the per-pair seed from a *stable* hash.  Python's builtin
        # hash() for str is salted per process (PYTHONHASHSEED), which broke
        # the documented "deterministic when a seed is provided" guarantee
        # across runs; crc32 is deterministic everywhere.
        pair_seed = None if self.seed is None else self.seed + zlib.crc32(pair.encode()) % 10000

        klines = generate_ohlcv(
            pair=pair,
            start=start,
            n_bars=n_bars,
            timeframe=self.timeframe,
            base_price=base,
            volatility=self.volatility,
            trend=self.trend,
            seed=pair_seed,
        )
        self.store.insert_klines(pair, klines)

        # Track state so sync() can continue the series seamlessly.
        if klines:
            self._last_prices[pair] = klines[-1]["close"]
            self._bars_generated[pair] = n_bars

        logger.info(f"VirtualDataProvider: generated {n_bars} bars for {pair}")

sync async

sync(pairs: list[str], update_interval_sec: int = 60) -> None

Continuously generate new bars at a fixed interval.

Mimics BinanceSpotLoader.sync() - runs forever, writing new bars to the store each cycle.

Parameters:

Name Type Description Default
pairs list[str]

Trading pairs to stream.

required
update_interval_sec int

Seconds between new bars.

60
Source code in src/signalflow/data/source/virtual.py
async def sync(  # type: ignore[override]
    self,
    pairs: list[str],
    update_interval_sec: int = 60,
) -> None:
    """Continuously generate new bars at a fixed interval.

    Mimics ``BinanceSpotLoader.sync()`` - runs forever, writing
    new bars to the store each cycle.

    Args:
        pairs: Trading pairs to stream.
        update_interval_sec: Seconds between new bars.
    """
    logger.info(f"VirtualDataProvider sync started pairs={pairs} interval={update_interval_sec}s")

    bar_delta = timedelta(minutes=_TIMEFRAME_MINUTES.get(self.timeframe, 1))
    rng = random.Random(self.seed)

    while True:
        for pair in pairs:
            prev_close = self._last_prices.get(pair, self.base_prices.get(pair, 100.0))
            bar_count = self._bars_generated.get(pair, 0)

            # Continue right after the newest stored bar; an empty store
            # starts the series at 2024-01-01.
            _, max_ts = self.store.get_time_bounds(pair)
            bar_ts = datetime(2024, 1, 1) if max_ts is None else max_ts + bar_delta

            # One random-walk step: drift plus gaussian noise, floored > 0.
            step = self.trend + self.volatility * rng.gauss(0, 1)
            new_close = max(prev_close * (1 + step), 0.01)

            # Wick sizes scale with the realized move and base volatility.
            wick_scale = abs(step) + self.volatility * 0.5
            bar_high = new_close * (1 + abs(rng.gauss(0, wick_scale * 0.3)))
            bar_low = new_close * (1 - abs(rng.gauss(0, wick_scale * 0.3)))
            bar_open = prev_close

            # Enforce OHLC invariants and a positive price floor.
            bar_high = max(bar_high, bar_open, new_close)
            bar_low = max(min(bar_low, bar_open, new_close), 0.01)

            bar_volume = 1000.0 * (0.5 + rng.random())

            self.store.insert_klines(
                pair,
                [
                    {
                        "timestamp": bar_ts,
                        "open": round(bar_open, 8),
                        "high": round(bar_high, 8),
                        "low": round(bar_low, 8),
                        "close": round(new_close, 8),
                        "volume": round(bar_volume, 2),
                        "trades": rng.randint(10, 200),
                    }
                ],
            )
            self._last_prices[pair] = new_close
            self._bars_generated[pair] = bar_count + 1

        logger.debug(f"VirtualDataProvider: synced {len(pairs)} pairs")
        await asyncio.sleep(update_interval_sec)

Storage

signalflow.data.raw_store.duckdb_stores.DuckDbSpotStore module-attribute

DuckDbSpotStore = DuckDbRawStore

Factory

signalflow.data.raw_data_factory.RawDataFactory

Factory for creating RawData instances from various sources.

Provides static methods to construct RawData objects from different storage backends (DuckDB, Parquet, etc.) with proper validation and schema normalization.

Key features
  • Automatic schema validation
  • Duplicate detection
  • Timezone normalization
  • Column cleanup (remove unnecessary columns)
  • Proper sorting by (pair, timestamp)
Example
from signalflow.data import RawDataFactory
from pathlib import Path
from datetime import datetime

# Load spot data from DuckDB
raw_data = RawDataFactory.from_duckdb_spot_store(
    spot_store_path=Path("data/binance_spot.duckdb"),
    pairs=["BTCUSDT", "ETHUSDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    data_types=["spot"]
)

# Access loaded data
spot_df = raw_data["spot"]
print(f"Loaded {len(spot_df)} bars")
print(f"Pairs: {raw_data.pairs}")
print(f"Date range: {raw_data.datetime_start} to {raw_data.datetime_end}")

# Use in detector
from signalflow.detector import SmaCrossSignalDetector

detector = SmaCrossSignalDetector(fast_window=10, slow_window=20)
signals = detector.detect(raw_data)
See Also

RawData: Immutable container for raw market data. DuckDbSpotStore: DuckDB storage backend for spot data.

from_duckdb_spot_store staticmethod

from_duckdb_spot_store(spot_store_path: Path, pairs: list[str], start: datetime, end: datetime, data_types: list[str] | None = None, target_timeframe: str | None = None) -> RawData

Create RawData from DuckDB spot store.

Loads spot trading data from DuckDB storage with validation, deduplication checks, and schema normalization.

If data_types is None, defaults to ["spot"].

Processing steps
  1. Load data from DuckDB for specified pairs and date range
  2. Validate required columns (pair, timestamp)
  3. Remove unnecessary columns (timeframe)
  4. Normalize timestamps (microseconds, timezone-naive)
  5. Check for duplicates (pair, timestamp)
  6. Sort by (pair, timestamp)
  7. Package into RawData container

Parameters:

Name Type Description Default
spot_store_path Path

Path to DuckDB file.

required
pairs list[str]

List of trading pairs to load (e.g., ["BTCUSDT", "ETHUSDT"]).

required
start datetime

Start datetime (inclusive).

required
end datetime

End datetime (inclusive).

required
data_types list[str] | None

Data types to load. Default: None. Currently supports: ["spot"].

None

Returns:

Name Type Description
RawData RawData

Immutable container with loaded and validated data.

Raises:

Type Description
ValueError

If required columns missing (pair, timestamp).

ValueError

If duplicate (pair, timestamp) combinations detected.

Example
from pathlib import Path
from datetime import datetime
from signalflow.data import RawDataFactory

# Load single pair
raw_data = RawDataFactory.from_duckdb_spot_store(
    spot_store_path=Path("data/binance.duckdb"),
    pairs=["BTCUSDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 1, 31),
    data_types=["spot"]
)

# Load multiple pairs
raw_data = RawDataFactory.from_duckdb_spot_store(
    spot_store_path=Path("data/binance.duckdb"),
    pairs=["BTCUSDT", "ETHUSDT", "BNBUSDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    data_types=["spot"]
)

# Check loaded data
spot_df = raw_data["spot"]
print(f"Shape: {spot_df.shape}")
print(f"Columns: {spot_df.columns}")
print(f"Pairs: {spot_df['pair'].unique().to_list()}")

# Verify no duplicates
dup_check = (
    spot_df.group_by(["pair", "timestamp"])
    .len()
    .filter(pl.col("len") > 1)
)
assert dup_check.is_empty()

# Use in pipeline
from signalflow.core import RawDataView
view = RawDataView(raw=raw_data)
spot_pandas = view.to_pandas("spot")
Example
# Handle missing data gracefully
try:
    raw_data = RawDataFactory.from_duckdb_spot_store(
        spot_store_path=Path("data/binance.duckdb"),
        pairs=["BTCUSDT"],
        start=datetime(2024, 1, 1),
        end=datetime(2024, 1, 31),
        data_types=["spot"]
    )
except ValueError as e:
    if "missing columns" in str(e):
        print("Data schema invalid")
    elif "Duplicate" in str(e):
        print("Data contains duplicates")
    raise

# Validate date range
assert raw_data.datetime_start == datetime(2024, 1, 1)
assert raw_data.datetime_end == datetime(2024, 1, 31)

# Check data quality
spot_df = raw_data["spot"]

# Verify timestamps are sorted
assert spot_df["timestamp"].is_sorted()

# Verify timezone-naive
assert spot_df["timestamp"].dtype == pl.Datetime("us")

# Verify no nulls in key columns
assert spot_df["pair"].null_count() == 0
assert spot_df["timestamp"].null_count() == 0
Note

Store connection is automatically closed via finally block. Timestamps are normalized to timezone-naive microseconds. Duplicate detection shows first 10 examples if found. All data sorted by (pair, timestamp) for consistent ordering.

Source code in src/signalflow/data/raw_data_factory.py
@staticmethod
def from_duckdb_spot_store(
    spot_store_path: Path,
    pairs: list[str],
    start: datetime,
    end: datetime,
    data_types: list[str] | None = None,
    target_timeframe: str | None = None,
) -> RawData:
    """Create RawData from DuckDB spot store.

    Loads spot trading data from DuckDB storage with validation,
    deduplication checks, and schema normalization.

    If ``data_types`` is ``None``, defaults to ``["spot"]``.

    Processing steps:
        1. Load data from DuckDB for specified pairs and date range
        2. Validate required columns (pair, timestamp)
        3. Remove unnecessary columns (timeframe)
        4. Normalize timestamps (microseconds, timezone-naive)
        5. Check for duplicates (pair, timestamp)
        6. Sort by (pair, timestamp)
        7. Package into RawData container

    Args:
        spot_store_path (Path): Path to DuckDB file.
        pairs (list[str]): List of trading pairs to load (e.g., ["BTCUSDT", "ETHUSDT"]).
        start (datetime): Start datetime (inclusive).
        end (datetime): End datetime (inclusive).
        data_types (list[str] | None): Data types to load. Default: None.
            Currently supports: ["spot"].
        target_timeframe (str | None): Target timeframe (e.g., "1h").
            If provided and more than one row was loaded, data is
            resampled via ``align_to_timeframe``. Default: None.

    Returns:
        RawData: Immutable container with loaded and validated data.

    Raises:
        ValueError: If required columns missing (pair, timestamp).
        ValueError: If duplicate (pair, timestamp) combinations detected.

    Example:
        ```python
        from pathlib import Path
        from datetime import datetime
        from signalflow.data import RawDataFactory

        # Load single pair
        raw_data = RawDataFactory.from_duckdb_spot_store(
            spot_store_path=Path("data/binance.duckdb"),
            pairs=["BTCUSDT"],
            start=datetime(2024, 1, 1),
            end=datetime(2024, 1, 31),
            data_types=["spot"]
        )

        # Load multiple pairs
        raw_data = RawDataFactory.from_duckdb_spot_store(
            spot_store_path=Path("data/binance.duckdb"),
            pairs=["BTCUSDT", "ETHUSDT", "BNBUSDT"],
            start=datetime(2024, 1, 1),
            end=datetime(2024, 12, 31),
            data_types=["spot"]
        )

        # Check loaded data
        spot_df = raw_data["spot"]
        print(f"Shape: {spot_df.shape}")
        print(f"Columns: {spot_df.columns}")
        print(f"Pairs: {spot_df['pair'].unique().to_list()}")

        # Verify no duplicates
        dup_check = (
            spot_df.group_by(["pair", "timestamp"])
            .len()
            .filter(pl.col("len") > 1)
        )
        assert dup_check.is_empty()

        # Use in pipeline
        from signalflow.core import RawDataView
        view = RawDataView(raw=raw_data)
        spot_pandas = view.to_pandas("spot")
        ```

    Example:
        ```python
        # Handle missing data gracefully
        try:
            raw_data = RawDataFactory.from_duckdb_spot_store(
                spot_store_path=Path("data/binance.duckdb"),
                pairs=["BTCUSDT"],
                start=datetime(2024, 1, 1),
                end=datetime(2024, 1, 31),
                data_types=["spot"]
            )
        except ValueError as e:
            if "missing columns" in str(e):
                print("Data schema invalid")
            elif "Duplicate" in str(e):
                print("Data contains duplicates")
            raise

        # Validate date range
        assert raw_data.datetime_start == datetime(2024, 1, 1)
        assert raw_data.datetime_end == datetime(2024, 1, 31)

        # Check data quality
        spot_df = raw_data["spot"]

        # Verify timestamps are sorted
        assert spot_df["timestamp"].is_sorted()

        # Verify timezone-naive
        assert spot_df["timestamp"].dtype == pl.Datetime("us")

        # Verify no nulls in key columns
        assert spot_df["pair"].null_count() == 0
        assert spot_df["timestamp"].null_count() == 0
        ```

    Note:
        Store connection is automatically closed via finally block.
        Timestamps are normalized to timezone-naive microseconds.
        Duplicate detection shows first 10 examples if found.
        All data sorted by (pair, timestamp) for consistent ordering.
    """
    if data_types is None:
        data_types = ["spot"]

    data: dict[str, pl.DataFrame] = {}
    store = DuckDbSpotStore(spot_store_path)
    try:
        if "spot" in data_types:
            spot = store.load_many(pairs=pairs, start=start, end=end)

            required = {"pair", "timestamp"}
            missing = required - set(spot.columns)
            if missing:
                raise ValueError(f"Spot df missing columns: {sorted(missing)}")

            # "timeframe" is storage metadata, not market data - drop it.
            if "timeframe" in spot.columns:
                spot = spot.drop("timeframe")

            # Normalize to timezone-naive microsecond timestamps.
            spot = spot.with_columns(pl.col("timestamp").cast(pl.Datetime("us")).dt.replace_time_zone(None))

            dup_count = spot.group_by(["pair", "timestamp"]).len().filter(pl.col("len") > 1)
            if dup_count.height > 0:
                # Show up to 10 offending rows to make the error actionable.
                dups = (
                    spot.join(
                        dup_count.select(["pair", "timestamp"]),
                        on=["pair", "timestamp"],
                    )
                    .select(["pair", "timestamp"])
                    .head(10)
                )
                raise ValueError(f"Duplicate (pair, timestamp) detected. Examples:\n{dups}")

            spot = spot.sort(["pair", "timestamp"])

            # Resample to target timeframe if requested
            if target_timeframe is not None and spot.height > 1:
                from signalflow.data.resample import align_to_timeframe

                spot = align_to_timeframe(spot, target_timeframe)

            data["spot"] = spot

        return RawData(
            datetime_start=start,
            datetime_end=end,
            pairs=pairs,
            data=cast(dict[str, pl.DataFrame | dict[str, pl.DataFrame]], data),
        )
    finally:
        store.close()

from_stores staticmethod

from_stores(stores: Mapping[str, RawDataStore] | Sequence[RawDataStore], pairs: list[str], start: datetime, end: datetime, default_source: str | None = None, target_timeframe: str | None = None) -> RawData

Create RawData from multiple stores.

Supports two input formats: - Dict: {source_name: store} for multi-source per data_type (nested structure) - Sequence: [store1, store2] for single source per data_type (flat structure)

Parameters:

Name Type Description Default
stores Mapping[str, RawDataStore] | Sequence[RawDataStore]

Either dict mapping source names to stores, or sequence of stores. Dict format creates nested structure: data[data_type][source] = DataFrame. Sequence format creates flat structure: data[data_type] = DataFrame.

required
pairs list[str]

List of trading pairs to load.

required
start datetime

Start datetime (inclusive).

required
end datetime

End datetime (inclusive).

required
default_source str | None

Default source for nested data access. Used when accessing data without explicit source.

None
target_timeframe str | None

Target timeframe (e.g. "1h"). If provided, data is auto-resampled to this timeframe.

None

Returns:

Name Type Description
RawData RawData

Container with merged data from all stores.

Raises:

Type Description
ValueError

If stores have duplicate keys (flat) or conflicting data_types (nested).

Example
from signalflow.data import RawDataFactory, DuckDbRawStore

# Multi-source (dict) - creates nested structure
raw = RawDataFactory.from_stores(
    stores={
        "binance": DuckDbRawStore(db_path="binance.duckdb", data_type="perpetual"),
        "okx": DuckDbRawStore(db_path="okx.duckdb", data_type="perpetual"),
        "bybit": DuckDbRawStore(db_path="bybit.duckdb", data_type="perpetual"),
    },
    pairs=["BTCUSDT", "ETHUSDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    default_source="binance",
)

# Hierarchical access
df = raw.perpetual.binance           # specific source
df = raw.perpetual.to_polars()       # default with warning
print(raw.perpetual.sources)         # ["binance", "okx", "bybit"]

# Single-source (sequence) - creates flat structure
raw = RawDataFactory.from_stores(
    stores=[spot_store, futures_store],
    pairs=["BTCUSDT", "ETHUSDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
)

# Simple access
spot_df = raw["spot"]
futures_df = raw["futures"]
Source code in src/signalflow/data/raw_data_factory.py
@staticmethod
def from_stores(
    stores: Mapping[str, RawDataStore] | Sequence[RawDataStore],
    pairs: list[str],
    start: datetime,
    end: datetime,
    default_source: str | None = None,
    target_timeframe: str | None = None,
) -> RawData:
    """Create RawData from multiple stores.

    Two input shapes are accepted:
    - Mapping ``{source_name: store}`` - multi-source per data_type,
      producing a nested structure ``data[data_type][source]``.
    - Sequence ``[store1, store2]`` - one source per data_type,
      producing a flat structure ``data[data_type]``.

    Args:
        stores: Mapping of source names to stores, or a sequence of stores
            (see shapes above).
        pairs: List of trading pairs to load.
        start: Start datetime (inclusive).
        end: End datetime (inclusive).
        default_source: Default source for nested data access.
            Used when accessing data without explicit source.
        target_timeframe: Target timeframe (e.g. ``"1h"``).
            If provided, data is auto-resampled to this timeframe.

    Returns:
        RawData: Container with merged data from all stores.

    Raises:
        ValueError: If stores have duplicate keys (flat) or conflicting
            data_types (nested).

    Example:
        ```python
        from signalflow.data import RawDataFactory, DuckDbRawStore

        # Multi-source (dict) - creates nested structure
        raw = RawDataFactory.from_stores(
            stores={
                "binance": DuckDbRawStore(db_path="binance.duckdb", data_type="perpetual"),
                "okx": DuckDbRawStore(db_path="okx.duckdb", data_type="perpetual"),
            },
            pairs=["BTCUSDT", "ETHUSDT"],
            start=datetime(2024, 1, 1),
            end=datetime(2024, 12, 31),
            default_source="binance",
        )
        df = raw.perpetual.binance           # specific source
        print(raw.perpetual.sources)         # ["binance", "okx"]

        # Single-source (sequence) - creates flat structure
        raw = RawDataFactory.from_stores(
            stores=[spot_store, futures_store],
            pairs=["BTCUSDT", "ETHUSDT"],
            start=datetime(2024, 1, 1),
            end=datetime(2024, 12, 31),
        )
        spot_df = raw["spot"]
        ```
    """
    # No stores at all: hand back an empty container.
    if not stores:
        return RawData(
            datetime_start=start,
            datetime_end=end,
            pairs=pairs,
            data={},
            default_source=default_source,
        )

    # Both loaders take the same keyword set apart from `stores` itself.
    shared_kwargs = dict(
        pairs=pairs,
        start=start,
        end=end,
        default_source=default_source,
        target_timeframe=target_timeframe,
    )

    if isinstance(stores, Mapping):
        # Dict input: multi-source per data_type (nested structure).
        return RawDataFactory._from_stores_dict(stores=stores, **shared_kwargs)

    # Sequence input: single source per data_type (flat structure).
    return RawDataFactory._from_stores_sequence(stores=stores, **shared_kwargs)

Resampling

Unified OHLCV timeframe resampling with exchange-aware timeframe selection.

from signalflow.data.resample import (
    resample_ohlcv,
    align_to_timeframe,
    detect_timeframe,
    select_best_timeframe,
    can_resample,
)

# Auto-detect and resample to 1h
df_1h = align_to_timeframe(raw_df, target_tf="1h")

# Find best exchange timeframe for download
best = select_best_timeframe("bybit", target_tf="8h")  # "4h"

signalflow.data.resample.resample_ohlcv

resample_ohlcv(df: DataFrame, source_tf: str, target_tf: str, *, pair_col: str = 'pair', ts_col: str = 'timestamp', fill_rules: dict[str, str] | None = None) -> pl.DataFrame

Resample an OHLCV DataFrame from source_tf to target_tf.

Parameters:

Name Type Description Default
df DataFrame

Source DataFrame with OHLCV columns.

required
source_tf str

Current timeframe of the data.

required
target_tf str

Desired target timeframe.

required
pair_col str

Pair/group column name.

'pair'
ts_col str

Timestamp column name.

'timestamp'
fill_rules dict[str, str] | None

Per-column aggregation rules. Defaults to :data:DEFAULT_FILL_RULES. Unknown columns default to "last".

None

Returns:

Type Description
DataFrame

Resampled DataFrame.

Raises:

Type Description
ValueError

If resampling is not possible.

Example

df_4h = resample_ohlcv(df_1h, "1h", "4h")

Source code in src/signalflow/data/resample.py
def resample_ohlcv(
    df: pl.DataFrame,
    source_tf: str,
    target_tf: str,
    *,
    pair_col: str = "pair",
    ts_col: str = "timestamp",
    fill_rules: dict[str, str] | None = None,
) -> pl.DataFrame:
    """Aggregate OHLCV rows from *source_tf* bars into *target_tf* bars.

    Args:
        df: Source DataFrame with OHLCV columns.
        source_tf: Current timeframe of the data.
        target_tf: Desired target timeframe.
        pair_col: Pair/group column name.
        ts_col: Timestamp column name.
        fill_rules: Per-column aggregation rules. Defaults to
            :data:`DEFAULT_FILL_RULES`. Unknown columns default to
            ``"last"``.

    Returns:
        Resampled DataFrame.

    Raises:
        ValueError: If resampling is not possible.

    Example:
        >>> df_4h = resample_ohlcv(df_1h, "1h", "4h")
    """
    # Nothing to do when source and target already match.
    if source_tf == target_tf:
        return df

    if not can_resample(source_tf, target_tf):
        raise ValueError(f"Cannot resample from {source_tf} to {target_tf}: target must be an exact multiple of source")

    merged_rules = {**DEFAULT_FILL_RULES, **(fill_rules or {})}
    bin_width = _TRUNCATE_EVERY[target_tf]
    reserved = {pair_col, ts_col}

    # The timestamp of each output bar is the latest (close-time) stamp
    # of its constituent source bars; every other column follows its rule.
    aggregations: list[pl.Expr] = [pl.col(ts_col).max()]
    for column in df.columns:
        if column in reserved:
            continue
        rule = merged_rules.get(column, "last")
        if rule not in _AGG_MAP:
            raise ValueError(f"Unknown fill rule {rule!r} for column {column!r}")
        aggregations.append(_AGG_MAP[rule](column))

    # Bucket rows into target-timeframe bins, aggregate, then tidy up.
    binned = df.with_columns(pl.col(ts_col).dt.truncate(bin_width).alias("_bin"))
    grouped = binned.group_by([pair_col, "_bin"], maintain_order=True).agg(aggregations)
    return grouped.drop("_bin").sort([pair_col, ts_col])

signalflow.data.resample.align_to_timeframe

align_to_timeframe(df: DataFrame, target_tf: str, *, pair_col: str = 'pair', ts_col: str = 'timestamp', fill_rules: dict[str, str] | None = None) -> pl.DataFrame

Detect source timeframe and resample to target_tf if possible.

If the source timeframe equals the target, returns the data unchanged. If resampling is not possible (e.g. "3m" → "2m"), emits a warning and returns the original data.

Parameters:

Name Type Description Default
df DataFrame

OHLCV DataFrame.

required
target_tf str

Desired timeframe.

required
pair_col str

Pair/group column name.

'pair'
ts_col str

Timestamp column name.

'timestamp'
fill_rules dict[str, str] | None

Per-column aggregation rules.

None

Returns:

Type Description
DataFrame

Resampled DataFrame, or original if alignment is not possible.

Example

df_1h = align_to_timeframe(raw_df, "1h")

Source code in src/signalflow/data/resample.py
def align_to_timeframe(
    df: pl.DataFrame,
    target_tf: str,
    *,
    pair_col: str = "pair",
    ts_col: str = "timestamp",
    fill_rules: dict[str, str] | None = None,
) -> pl.DataFrame:
    """Detect source timeframe and resample to *target_tf* if possible.

    If the source timeframe equals the target, returns the data unchanged.
    If resampling is not possible (e.g. ``"3m"`` → ``"2m"``), emits a
    warning and returns the original data.

    Args:
        df: OHLCV DataFrame.
        target_tf: Desired timeframe.
        pair_col: Pair/group column name.
        ts_col: Timestamp column name.
        fill_rules: Per-column aggregation rules.

    Returns:
        Resampled DataFrame, or original if alignment is not possible.

    Example:
        >>> df_1h = align_to_timeframe(raw_df, "1h")
    """
    # Detection failure is non-fatal: warn and hand the data back untouched.
    try:
        current_tf = detect_timeframe(df, ts_col=ts_col, pair_col=pair_col)
    except ValueError:
        warnings.warn(
            f"Could not detect source timeframe; returning data as-is. Target was {target_tf!r}.",
            stacklevel=2,
        )
        return df

    if current_tf == target_tf:
        return df

    if can_resample(current_tf, target_tf):
        return resample_ohlcv(
            df,
            current_tf,
            target_tf,
            pair_col=pair_col,
            ts_col=ts_col,
            fill_rules=fill_rules,
        )

    # Target is not an integer multiple of the source - nothing we can do.
    warnings.warn(
        f"Cannot resample from {current_tf} to {target_tf} "
        f"(target must be an exact multiple of source). Returning data as-is.",
        stacklevel=2,
    )
    return df

signalflow.data.resample.detect_timeframe

detect_timeframe(df: DataFrame, ts_col: str = 'timestamp', pair_col: str = 'pair') -> str

Auto-detect the timeframe of an OHLCV DataFrame.

Computes the most common timestamp delta per pair and maps it to the closest known timeframe string.

Parameters:

Name Type Description Default
df DataFrame

OHLCV DataFrame with at least ts_col and pair_col.

required
ts_col str

Timestamp column name.

'timestamp'
pair_col str

Pair/group column name.

'pair'

Returns:

Type Description
str

Detected timeframe string (e.g. "1h").

Raises:

Type Description
ValueError

If the DataFrame is too small or the delta doesn't match any known timeframe.

Example

detect_timeframe(hourly_df) '1h'

Source code in src/signalflow/data/resample.py
def detect_timeframe(
    df: pl.DataFrame,
    ts_col: str = "timestamp",
    pair_col: str = "pair",
) -> str:
    """Infer the timeframe of an OHLCV DataFrame from its timestamp spacing.

    The most frequent per-pair timestamp delta is computed and mapped
    back to its known timeframe string.

    Args:
        df: OHLCV DataFrame containing at least *ts_col* and *pair_col*.
        ts_col: Name of the timestamp column.
        pair_col: Name of the pair/group column.

    Returns:
        The detected timeframe string (e.g. ``"1h"``).

    Raises:
        ValueError: If fewer than 2 rows are present or the dominant
            delta matches no known timeframe.

    Example:
        >>> detect_timeframe(hourly_df)
        '1h'
    """
    if df.height < 2:
        raise ValueError("DataFrame must have at least 2 rows to detect timeframe")

    # Consecutive timestamp differences within each pair; the first row of
    # every pair yields a null diff and is dropped.
    delta_df = (
        df.sort([pair_col, ts_col])
        .with_columns(pl.col(ts_col).diff().over(pair_col).alias("_delta"))
        .filter(pl.col("_delta").is_not_null())
        .select("_delta")
    )

    if delta_df.height == 0:
        raise ValueError("Cannot detect timeframe: no timestamp deltas computed")

    # The most common delta (the mode) is taken as the candle interval.
    most_common_delta = (
        delta_df.group_by("_delta").len().sort("len", descending=True).row(0)[0]
    )
    delta_minutes = int(most_common_delta.total_seconds() // 60)

    # Invert the tf -> minutes mapping to look the delta back up.
    minutes_to_tf = {mins: tf for tf, mins in TIMEFRAME_MINUTES.items()}
    if delta_minutes not in minutes_to_tf:
        raise ValueError(
            f"Detected delta of {delta_minutes} minutes does not match any known "
            f"timeframe. Known: {sorted(minutes_to_tf.keys())}"
        )

    return minutes_to_tf[delta_minutes]

signalflow.data.resample.can_resample

can_resample(source_tf: str, target_tf: str) -> bool

Check whether source_tf can be resampled to target_tf.

Resampling is possible when target_tf is an exact integer multiple of source_tf (and target >= source).

Parameters:

Name Type Description Default
source_tf str

Source timeframe (e.g. "1m").

required
target_tf str

Target timeframe (e.g. "1h").

required

Returns:

Type Description
bool

True if resampling is possible.

Example

>>> can_resample("1h", "4h")
True
>>> can_resample("1h", "3h")
False

Source code in src/signalflow/data/resample.py
def can_resample(source_tf: str, target_tf: str) -> bool:
    """Return whether data at *source_tf* can be aggregated into *target_tf*.

    This holds exactly when both timeframes are known and the target is
    an integer multiple of the source (which also implies target >= source).

    Args:
        source_tf: Source timeframe (e.g. ``"1m"``).
        target_tf: Target timeframe (e.g. ``"1h"``).

    Returns:
        ``True`` if resampling is possible.

    Example:
        >>> can_resample("1h", "4h")
        True
        >>> can_resample("1h", "3h")
        False
    """
    known = TIMEFRAME_MINUTES
    if source_tf not in known or target_tf not in known:
        return False
    src_min, tgt_min = known[source_tf], known[target_tf]
    return tgt_min >= src_min and tgt_min % src_min == 0

signalflow.data.resample.select_best_timeframe

select_best_timeframe(exchange: str, target_tf: str) -> str

Select the best download timeframe for an exchange.

Strategy
  1. If the exchange supports target_tf, return it.
  2. Otherwise pick the largest supported timeframe that evenly divides target_tf.
  3. If nothing divides evenly, return the smallest supported timeframe ("1m" in most cases).

Parameters:

Name Type Description Default
exchange str

Exchange name (lowercase, e.g. "bybit").

required
target_tf str

Desired target timeframe.

required

Returns:

Type Description
str

Best timeframe to download.

Raises:

Type Description
ValueError

If exchange is unknown.

Example

>>> select_best_timeframe("bybit", "8h")
'4h'
>>> select_best_timeframe("binance", "8h")
'8h'

Source code in src/signalflow/data/resample.py
def select_best_timeframe(exchange: str, target_tf: str) -> str:
    """Select the best download timeframe for an exchange.

    Strategy:
        1. Return *target_tf* directly if the exchange supports it.
        2. Otherwise return the largest supported timeframe that evenly
           divides *target_tf*.
        3. If no supported timeframe divides evenly, fall back to the
           smallest supported one (``"1m"`` in most cases).

    Args:
        exchange: Exchange name (lowercase, e.g. ``"bybit"``).
        target_tf: Desired target timeframe.

    Returns:
        Best timeframe to download.

    Raises:
        ValueError: If *exchange* is unknown.

    Example:
        >>> select_best_timeframe("bybit", "8h")
        '4h'
        >>> select_best_timeframe("binance", "8h")
        '8h'
    """
    exchange = exchange.lower()
    if exchange not in EXCHANGE_TIMEFRAMES:
        valid = ", ".join(sorted(EXCHANGE_TIMEFRAMES))
        raise ValueError(f"Unknown exchange {exchange!r}. Known: {valid}")

    supported = EXCHANGE_TIMEFRAMES[exchange]

    # Exact match: nothing to do.
    if target_tf in supported:
        return target_tf

    target_min = timeframe_to_minutes(target_tf)

    # Supported timeframes that evenly divide the target; keep the largest.
    divisors = [
        tf
        for tf in supported
        if TIMEFRAME_MINUTES[tf] <= target_min
        and target_min % TIMEFRAME_MINUTES[tf] == 0
    ]
    if divisors:
        return max(divisors, key=lambda t: TIMEFRAME_MINUTES[t])

    # Fallback: smallest supported timeframe.
    return min(supported, key=lambda t: TIMEFRAME_MINUTES[t])

signalflow.data.resample.timeframe_to_minutes

timeframe_to_minutes(tf: str) -> int

Convert a timeframe string to minutes.

Parameters:

Name Type Description Default
tf str

Timeframe string (e.g. "1h", "4h", "1d").

required

Returns:

Type Description
int

Number of minutes.

Raises:

Type Description
ValueError

If tf is not a recognised timeframe.

Example

>>> timeframe_to_minutes("4h")
240

Source code in src/signalflow/data/resample.py
def timeframe_to_minutes(tf: str) -> int:
    """Convert a timeframe string to its length in minutes.

    Args:
        tf: Timeframe string (e.g. ``"1h"``, ``"4h"``, ``"1d"``).

    Returns:
        Number of minutes.

    Raises:
        ValueError: If *tf* is not a recognised timeframe.

    Example:
        >>> timeframe_to_minutes("4h")
        240
    """
    if tf in TIMEFRAME_MINUTES:
        return TIMEFRAME_MINUTES[tf]
    # Build the error listing ordered by duration, shortest first.
    ordered = sorted(TIMEFRAME_MINUTES, key=lambda k: TIMEFRAME_MINUTES[k])
    valid = ", ".join(ordered)
    raise ValueError(f"Unknown timeframe {tf!r}. Valid: {valid}")