Custom Data Types¶
SignalFlow ships with three built-in raw data types: SPOT, FUTURES, and PERPETUAL. Each type defines a set of required columns that the framework uses for validation, feature computation, and visualization.
You can register your own data types to work with non-standard data sources such as limit order books, tick data, or any custom schema.
Built-in Types¶
| Type | Columns |
|---|---|
SPOT |
pair, timestamp, open, high, low, close, volume |
FUTURES |
SPOT + open_interest |
PERPETUAL |
SPOT + funding_rate, open_interest |
Access columns programmatically:
from signalflow.core.enums import RawDataType
print(RawDataType.SPOT.columns)
# {'pair', 'timestamp', 'open', 'high', 'low', 'close', 'volume'}
Registering a Custom Type¶
Use default_registry.register_raw_data_type() to add a new data type:
from signalflow.core.registry import default_registry
default_registry.register_raw_data_type(
name="lob",
columns=["pair", "timestamp", "bid", "ask", "bid_size", "ask_size", "depth"],
)
After registration, the type is available everywhere in the framework:
# Look up columns
cols = default_registry.get_raw_data_columns("lob")
# {'pair', 'timestamp', 'bid', 'ask', 'bid_size', 'ask_size', 'depth'}
# List all registered types
print(default_registry.list_raw_data_types())
# ['futures', 'lob', 'perpetual', 'spot']
Using Custom Types with Components¶
FeaturePipeline¶
Pass the custom type name as a string to raw_data_type:
from signalflow.feature import FeaturePipeline
pipeline = FeaturePipeline(
features=[MyLobFeature(depth_levels=5)],
raw_data_type="lob", # custom type
)
The pipeline validates that each feature's required columns are satisfied by the registered column set. If a feature requires a column not present in your type, you get a clear error at construction time.
SignalDetector¶
from signalflow.detector import SignalDetector
class LobImbalanceDetector(SignalDetector):
raw_data_type = "lob"
def detect(self, df, context=None):
# df is guaranteed to have lob columns
imbalance = df["bid_size"] - df["ask_size"]
...
RawDataView¶
from signalflow.core import RawData, RawDataView
from signalflow.core.enums import DataFrameType
raw = RawData(
datetime_start=start,
datetime_end=end,
pairs=["BTCUSDT"],
data={"lob": lob_dataframe},
)
view = RawDataView(raw=raw)
# Access by custom type name
lob_pl = view.get_data("lob", DataFrameType.POLARS)
lob_pd = view.get_data("lob", DataFrameType.PANDAS)
Overriding Built-in Types¶
If you need to extend or redefine columns for a built-in type, pass override=True:
default_registry.register_raw_data_type(
name="spot",
columns=["pair", "timestamp", "open", "high", "low", "close", "volume", "trades"],
override=True,
)
Warning
Overriding built-in types affects all components that reference them. Use with caution.
Full Example: Tick Data Pipeline¶
from signalflow.core.registry import default_registry
from signalflow.core import RawData, RawDataView
from signalflow.feature import Feature, FeaturePipeline
from signalflow.core.enums import DataFrameType
from dataclasses import dataclass
from typing import ClassVar
from datetime import datetime
import polars as pl
# 1. Register custom type
default_registry.register_raw_data_type(
name="tick",
columns=["pair", "timestamp", "price", "qty", "is_buyer_maker"],
)
# 2. Define a feature for tick data
@dataclass
class TickVwapFeature(Feature):
window: int = 100
requires: ClassVar[list[str]] = ["price", "qty"]
outputs: ClassVar[list[str]] = ["vwap_{window}"]
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
pq = (pl.col("price") * pl.col("qty")).rolling_sum(self.window)
q = pl.col("qty").rolling_sum(self.window)
return df.with_columns((pq / q).alias(f"vwap_{self.window}"))
# 3. Build pipeline
pipeline = FeaturePipeline(
features=[TickVwapFeature(window=100)],
raw_data_type="tick",
)
# 4. Create data and compute
tick_df = pl.DataFrame({
"pair": ["BTCUSDT"] * 200,
"timestamp": [datetime(2024, 1, 1, 10, 0, i) for i in range(200)],
"price": [45000.0 + i * 0.5 for i in range(200)],
"qty": [0.1] * 200,
"is_buyer_maker": [True, False] * 100,
})
raw = RawData(
datetime_start=datetime(2024, 1, 1),
datetime_end=datetime(2024, 1, 2),
pairs=["BTCUSDT"],
data={"tick": tick_df},
)
view = RawDataView(raw=raw)
result = pipeline.run(view)
print(result.select("pair", "timestamp", "price", "vwap_100").tail(5))
API Reference¶
See the full API documentation for the registry methods:
register_raw_data_type()- Register a new data typeget_raw_data_columns()- Look up columns for any typelist_raw_data_types()- List all registered types