External Model Integration¶
This guide covers two distinct ways of bringing models into SignalFlow:
- Forecast models: pinned, versioned prediction artefacts (e.g. a
  reversion-probability model) that a flow registers with
  .forecast(...) and that detectors/validators consume as one more input. Covered first below.
- Decision models: an ML/RL model that implements the
  StrategyModel protocol and directly emits StrategyDecision objects (ENTER/CLOSE/...). Covered from Decision Models onward.
Forecast Models¶
A forecast is the continuous output of a model (e.g. p_revert, an
expected return) — not a trade recommendation. A detector turns forecasts
into discrete signals; a validator can read them when
deciding whether to keep a signal. Forecast models are trained elsewhere and
arrive as versioned, reproducible artefacts; the trading flow never trains
them, it only references and consumes them.
Registering a pinned artefact: .forecast(...)¶
FlowBuilder.forecast() records a pinned ModelRef under a
local name. No weights are loaded here — this is purely declarative; weights
resolve later through the registry when the flow runs.
def forecast(
    self,
    name: str,
    *,
    mlflow: str | None = None,
    hf_path: str | None = None,
    version: str | int | None = None,
    source: str | None = None,
) -> Self: ...
| Argument | Meaning |
|---|---|
| name | Local name consumers reference via forecasts=[name]. |
| mlflow | MLflow model URI, e.g. "models:/revert/3" (version embedded in the URI). |
| hf_path | HuggingFace Hub path (source="hf"); requires an explicit version=. |
| version | Explicit version when not embedded in the URI. "latest" is rejected unless SF_ALLOW_LATEST=1. |
| source | Override artefact source ("mlflow" or "hf"); inferred otherwise. |
Each of the following raises ConfigurationError: registering the same name
twice, omitting both mlflow= and hf_path=, or passing a version= that
conflicts with the version inside an mlflow= URI.
Consuming a forecast: forecasts= + forecast_window=¶
.detector(), .validator() and .exit() each accept two keyword arguments:
- forecasts (list[str]): names of registered forecasts this consumer reads. The consumer sees a window of forecast values [t-w, t], not just the latest point.
- forecast_window (int): window length in bars. It is required whenever forecasts is given, and must be positive.
This is the warmup-silence contract: by fixing the window in bars (rather
than "however much accumulated"), backtest and live cold-start cut the identical
slice, so parity holds. Passing forecast_window without forecasts, or
forecasts without a positive forecast_window, raises ConfigurationError.
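The fixed-window rule can be illustrated with a small self-contained sketch. `forecast_slice` is a hypothetical helper, not part of SignalFlow, and it assumes the window is the last `window` bars ending at bar `t` inclusive; the library's exact boundary convention may differ. The point is only that the slice depends on `t` and `window`, never on how much history happens to be loaded.

```python
from __future__ import annotations

from collections.abc import Sequence


def forecast_slice(values: Sequence[float], t: int, window: int) -> list[float] | None:
    """Return the last `window` forecast values ending at bar `t` (inclusive).

    Hypothetical helper for illustration only. Returns None while fewer than
    `window` bars are available, so the consumer stays silent during warmup.
    """
    if window <= 0:
        raise ValueError("window must be positive")
    if not 0 <= t < len(values):
        raise IndexError("t is outside the available history")
    start = t - window + 1
    if start < 0:
        return None  # warmup: not enough history yet
    return list(values[start : t + 1])


history = [0.1, 0.2, 0.3, 0.4, 0.5]
print(forecast_slice(history, t=1, window=3))  # None
print(forecast_slice(history, t=4, window=3))  # [0.3, 0.4, 0.5]
```

Because the helper returns the same slice for a given `t` whether `history` starts at bar 0 or far earlier, a backtest and a live cold start would see identical inputs once warmup is over.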
Why the version is mandatory (parity)¶
A ModelRef requires an explicit version. A floating version="latest"
silently breaks parity and reproducibility between training and live inference,
so it is rejected unless SF_ALLOW_LATEST=1 is set in the environment (dev
opt-in only). Pinning the version is what guarantees that the exact same model
artefact is used in research and in production.
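As a rough sketch of the rule described above, the check can be written in a few lines. `check_pinned` is a hypothetical stand-in and raises a plain `ValueError`; the real ModelRef validation may use different names and error types.

```python
from __future__ import annotations

import os


def check_pinned(version: str | int | None) -> str:
    """Return the version as a string, or raise if it is missing or floating.

    Hypothetical stand-in for the pinning check; not the SignalFlow code.
    """
    if version is None:
        raise ValueError("an explicit version is required")
    text = str(version)
    # "latest" floats over time, so it is allowed only with the dev opt-in.
    if text == "latest" and os.environ.get("SF_ALLOW_LATEST") != "1":
        raise ValueError('version="latest" is rejected unless SF_ALLOW_LATEST=1')
    return text


print(check_pinned(3))  # 3
```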
Lazy loading: Resolver and ModelRegistry¶
Registration is cheap and offline. When the flow runs, a
ModelRegistry (typically CachingModelRegistry wrapping
an MlflowResolver) resolves each ModelRef to actual weights — and caches
them so a ref is loaded at most once. Importing signalflow.models does not
require mlflow; the dependency is imported only at resolve time.
from signalflow.models import ModelRef, MlflowResolver, CachingModelRegistry
registry = CachingModelRegistry(MlflowResolver())
model = registry.get(ModelRef.parse("models:/revert/3")) # weights load here, then cached
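The load-at-most-once behaviour can be pictured with a minimal stand-alone sketch. `CachingRegistry` and `fake_resolver` below are hypothetical illustrations keyed by a plain string; they are not the SignalFlow classes and say nothing about how `CachingModelRegistry` is actually implemented.

```python
from __future__ import annotations

from typing import Any, Callable


class CachingRegistry:
    """Hypothetical cache-on-first-use wrapper; not the SignalFlow class."""

    def __init__(self, resolve: Callable[[str], Any]) -> None:
        self._resolve = resolve
        self._cache: dict[str, Any] = {}

    def get(self, ref: str) -> Any:
        # Resolve only on the first request for this ref, then reuse the result.
        if ref not in self._cache:
            self._cache[ref] = self._resolve(ref)
        return self._cache[ref]


load_calls: list[str] = []


def fake_resolver(ref: str) -> dict[str, str]:
    """Pretend to load weights and record that a load happened."""
    load_calls.append(ref)
    return {"weights_for": ref}


registry = CachingRegistry(fake_resolver)
first = registry.get("models:/revert/3")
second = registry.get("models:/revert/3")
print(first is second, len(load_calls))  # True 1
```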
Worked example¶
import signalflow as sf
flow = (
    sf.flow()
    .data(store="binance", pair="BTC/USDT")
    # 1. Register a pinned forecast artefact (lazy: no weights loaded yet)
    .forecast("revert", mlflow="models:/revert/3")
    # 2. A detector reads a 30-bar window of that forecast
    .detector("example/sma_cross", forecasts=["revert"], forecast_window=30)
    # 3. A validator may read the same forecast over its own window
    .validator("validator/lightgbm", forecasts=["revert"], forecast_window=60)
)
# Forecast references are validated against registered .forecast() artefacts at run time:
result = flow.run()
A forecast may be referenced before it is registered (the existence check runs
lazily at .run()), but every referenced name must ultimately be declared via
.forecast(), otherwise .run() raises ConfigurationError.
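The deferred existence check can be sketched with a toy builder. `FlowSketch` is hypothetical and raises `ValueError` rather than `ConfigurationError`; it only mimics the ordering behaviour described above, not the real FlowBuilder.

```python
from __future__ import annotations


class FlowSketch:
    """Hypothetical mini-builder; only mimics the deferred name check."""

    def __init__(self) -> None:
        self._registered: set[str] = set()
        self._referenced: list[str] = []

    def forecast(self, name: str) -> FlowSketch:
        if name in self._registered:
            raise ValueError(f"forecast {name!r} registered twice")
        self._registered.add(name)
        return self

    def detector(self, name: str, forecasts: tuple[str, ...] = ()) -> FlowSketch:
        # Only record the reference here; it is not checked yet.
        self._referenced.extend(forecasts)
        return self

    def run(self) -> str:
        # The existence check happens here, once all declarations are known.
        missing = sorted(set(self._referenced) - self._registered)
        if missing:
            raise ValueError(f"undeclared forecasts: {missing}")
        return "ok"


# Reference first, register afterwards: accepted because the check runs in run().
print(FlowSketch().detector("d", forecasts=("revert",)).forecast("revert").run())  # ok
```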
Features live inside the artefact
The former .features() builder method on flow was removed. A forecast
model carries its own feature recipe inside the artefact, and the
train↔serve feature hash (see ModelFeaturesPipeline)
guards against recipe drift. Detectors compute any primitive features they
need internally.
Decision Models (StrategyModel)¶
The rest of this guide covers a different integration: an ML/RL model that
makes trade decisions directly via the StrategyModel protocol (in contrast
to a forecast, which only predicts and lets a detector decide).
SignalFlow supports this via a Protocol-based interface:
Backtest Bar
    |
    v
Build ModelContext (signals, metrics, positions)
    |
    v
model.decide(context) --> list[StrategyDecision]
    |
    +---> CLOSE/CLOSE_ALL --> ModelExitRule --> Exit Orders
    |
    +---> ENTER --> ModelEntryRule --> Entry Orders
Design Principle: Models receive signals and metrics, NOT raw OHLCV prices.
Quick Start¶
1. Implement the Protocol¶
from signalflow.strategy.model import (
    StrategyModel,
    StrategyAction,
    StrategyDecision,
    ModelContext,
)
class MyModel:
    """Your ML/RL model implementing StrategyModel protocol."""
    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []
        # One row per signal on the current bar
        for row in context.signals.value.iter_rows(named=True):
            prob = row.get("probability", 0.5)
            if prob > 0.7:
                decisions.append(StrategyDecision(
                    action=StrategyAction.ENTER,
                    pair=row["pair"],
                    confidence=prob,
                ))
        return decisions
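MyModel above inherits from nothing: a Protocol-based interface is matched by shape, so any object with a compatible `decide()` method qualifies. The stand-alone sketch below shows that idea with `typing.Protocol`. `DecidesProtocol` is a hypothetical stand-in, and whether the real StrategyModel is declared `runtime_checkable` is an assumption made here only so that `isinstance` can be demonstrated.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DecidesProtocol(Protocol):
    """Hypothetical stand-in for StrategyModel: anything with decide()."""

    def decide(self, context: Any) -> list[Any]: ...


class AlwaysHold:
    """Matches the protocol structurally, with no base class."""

    def decide(self, context: Any) -> list[Any]:
        return []


class NotAModel:
    """Has no decide() method, so it does not match."""

    def predict(self, x: Any) -> Any:
        return x


print(isinstance(AlwaysHold(), DecidesProtocol))  # True
print(isinstance(NotAModel(), DecidesProtocol))  # False
```

Note that a runtime `isinstance` check against a protocol only verifies that the method exists, not its signature or return type; a static type checker is what validates those.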
2. Create Rules¶
from signalflow.strategy.model import ModelEntryRule, ModelExitRule
model = MyModel()
entry_rule = ModelEntryRule(
model=model,
base_position_size=0.02,
max_positions=5,
min_confidence=0.6,
)
exit_rule = ModelExitRule(
model=model,
min_confidence=0.7,
)
3. Run Backtest¶
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
runner = BacktestRunner(
strategy_id="model_strategy",
broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
entry_rules=[entry_rule],
exit_rules=[exit_rule],
initial_capital=10_000.0,
)
state = runner.run(raw_data, signals)
Decision Types¶
Models return StrategyDecision objects with these actions:
| Action | Description | Required Fields |
|---|---|---|
| ENTER | Open new position | pair, optionally size_multiplier |
| SKIP | Skip this signal | pair |
| CLOSE | Close specific position | pair, position_id |
| CLOSE_ALL | Close all positions for pair | pair |
| HOLD | Do nothing | - |
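A model's own unit tests can check the required fields from the table above before decisions ever reach a rule. The sketch below uses hypothetical stand-in types (`Action`, `Decision`) rather than the real StrategyAction and StrategyDecision, and it is not a claim about how SignalFlow validates decisions internally.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    ENTER = "enter"
    SKIP = "skip"
    CLOSE = "close"
    CLOSE_ALL = "close_all"
    HOLD = "hold"


@dataclass(frozen=True)
class Decision:
    """Hypothetical stand-in carrying only the fields named in the table."""

    action: Action
    pair: str | None = None
    position_id: str | None = None


# Required fields per action, transcribed from the table above.
REQUIRED: dict[Action, list[str]] = {
    Action.ENTER: ["pair"],
    Action.SKIP: ["pair"],
    Action.CLOSE: ["pair", "position_id"],
    Action.CLOSE_ALL: ["pair"],
    Action.HOLD: [],
}


def missing_fields(decision: Decision) -> list[str]:
    """List the required fields that are not set on this decision."""
    return [name for name in REQUIRED[decision.action] if getattr(decision, name) is None]


print(missing_fields(Decision(Action.CLOSE, pair="BTCUSDT")))  # ['position_id']
print(missing_fields(Decision(Action.HOLD)))  # []
```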
Entry Decision¶
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
size_multiplier=1.5, # 1.5x base position size
confidence=0.85,
meta={"reason": "high_confidence_signal"},
)
Exit Decision¶
StrategyDecision(
action=StrategyAction.CLOSE,
pair="BTCUSDT",
position_id="pos_abc123", # Required for CLOSE
confidence=0.9,
meta={"reason": "take_profit"},
)
Close All Positions¶
StrategyDecision(
action=StrategyAction.CLOSE_ALL,
pair="BTCUSDT", # Close all BTC positions
confidence=0.8,
meta={"reason": "risk_off"},
)
ModelContext¶
The context passed to model.decide() contains:
| Field | Type | Description |
|---|---|---|
| timestamp | datetime | Current bar timestamp |
| signals | Signals | Current bar signals (from detectors) |
| prices | dict[str, float] | Current prices per pair |
| positions | list[Position] | Open positions |
| metrics | dict[str, float] | Portfolio metrics (equity, drawdown, etc.) |
| runtime | dict[str, Any] | Custom state (regime, ATR, etc.) |
Accessing Context Data¶
def decide(self, context: ModelContext) -> list[StrategyDecision]:
    # Check portfolio state
    equity = context.metrics.get("equity", 0)
    drawdown = context.metrics.get("max_drawdown", 0)
    # Risk check
    if drawdown > 0.15:
        return []  # No trading during high drawdown
    # Process signals
    for row in context.signals.value.iter_rows(named=True):
        pair = row["pair"]
        signal_type = row["signal_type"]
        prob = row["probability"]
        # Get current price
        price = context.prices.get(pair, 0)
        # Check existing positions
        pair_positions = [p for p in context.positions if p.pair == pair]
        # Your model logic here...
Model-Based Exit Management¶
Models can manage exits by analyzing open positions:
def decide(self, context: ModelContext) -> list[StrategyDecision]:
    decisions = []
    for pos in context.positions:
        # Fall back to the entry price (zero PnL) if no current price is known
        price = context.prices.get(pos.pair, pos.entry_price)
        pnl_pct = (price - pos.entry_price) / pos.entry_price
        # Take profit
        if pnl_pct > 0.05:  # 5% profit
            decisions.append(StrategyDecision(
                action=StrategyAction.CLOSE,
                pair=pos.pair,
                position_id=pos.id,
                confidence=0.9,
                meta={"reason": "model_take_profit"},
            ))
        # Stop loss
        elif pnl_pct < -0.02:  # 2% loss
            decisions.append(StrategyDecision(
                action=StrategyAction.CLOSE,
                pair=pos.pair,
                position_id=pos.id,
                confidence=0.95,
                meta={"reason": "model_stop_loss"},
            ))
    return decisions
Combining with Traditional Rules¶
Model rules work alongside traditional rules:
from signalflow.strategy.component.exit import TakeProfitStopLossExit
runner = BacktestRunner(
strategy_id="hybrid_strategy",
broker=broker,
entry_rules=[
ModelEntryRule(model=model, base_position_size=0.02),
],
exit_rules=[
ModelExitRule(model=model), # Model-based exits first
TakeProfitStopLossExit( # Fallback TP/SL
take_profit_pct=0.05,
stop_loss_pct=0.03,
),
],
initial_capital=10_000.0,
)
Exit rules are processed in order - model exits run first, then traditional rules catch remaining positions.
Decision Caching¶
The model is called once per bar. Decisions are cached in state.runtime:
Bar Start
    |
    v
ExitRule.check_exits()
    ├── Check cache for decisions
    ├── If empty: call model.decide(), cache result
    └── Process CLOSE/CLOSE_ALL decisions
    |
    v
EntryRule.check_entries()
    ├── Check cache (uses cached decisions)
    └── Process ENTER decisions
    |
    v
state.reset_tick_cache() clears cache for next bar
This ensures consistent decisions across entry and exit processing.
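The once-per-bar pattern can be sketched independently of SignalFlow. In the stand-alone example below, `CACHE_KEY`, `cached_decisions` and `decide` are hypothetical names, and a plain dict plays the role of state.runtime; the actual key and reset mechanics inside the library may differ.

```python
from __future__ import annotations

from typing import Any, Callable

CACHE_KEY = "_model_decisions"  # hypothetical key name, for illustration only


def cached_decisions(runtime: dict[str, Any], decide: Callable[[], list[Any]]) -> list[Any]:
    """Call decide() only if this bar has no cached result yet."""
    if CACHE_KEY not in runtime:
        runtime[CACHE_KEY] = decide()
    return runtime[CACHE_KEY]


call_log: list[int] = []


def decide() -> list[str]:
    """Stand-in for model.decide(); records every invocation."""
    call_log.append(1)
    return ["CLOSE", "ENTER"]


runtime: dict[str, Any] = {}
exit_view = cached_decisions(runtime, decide)  # exit rule runs first: model is called
entry_view = cached_decisions(runtime, decide)  # entry rule reuses the cached list
calls_this_bar = len(call_log)
runtime.pop(CACHE_KEY, None)  # end of bar: clear so the next bar calls the model again
print(calls_this_bar, exit_view is entry_view)  # 1 True
```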
Training Data Export¶
Export backtest results for model training:
from pathlib import Path
from signalflow.strategy.exporter import BacktestExporter
exporter = BacktestExporter()
# During backtest
for ts in timestamps:
    # ... process bar ...
    exporter.export_bar(ts, signals, state.metrics, state)
# When positions close
exporter.export_position_close(position, exit_time, exit_price, "take_profit")
# Write to disk
exporter.finalize(Path("./training_data"))
Exported Files¶
bars.parquet - Per-bar state:
import polars as pl
bars = pl.read_parquet("./training_data/bars.parquet")
# Columns: timestamp, pair, signal_type, probability,
# metric_equity, metric_max_drawdown, ...
trades.parquet - Completed trades:
trades = pl.read_parquet("./training_data/trades.parquet")
# Columns: position_id, pair, entry_time, exit_time,
# entry_price, exit_price, realized_pnl, exit_reason, ...
RL Model Example¶
Complete example with reinforcement learning patterns:
import numpy as np
from signalflow.strategy.model import (
    StrategyAction,
    StrategyDecision,
    ModelContext,
)
class RLTradingModel:
    """RL model for trading decisions."""
    def __init__(self, model_path: str, epsilon: float = 0.0):
        self.model = self._load_model(model_path)
        self.epsilon = epsilon  # Exploration rate
    def _load_model(self, model_path: str):
        """Placeholder: replace with your framework's own loader."""
        raise NotImplementedError
    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []
        # Build state features
        features = self._build_features(context)
        # Get Q-values from model
        q_values = self.model.predict(features)
        # Epsilon-greedy action selection (for training)
        if np.random.random() < self.epsilon:
            action_idx = int(np.random.choice(len(q_values)))
        else:
            action_idx = int(np.argmax(q_values))
        # Map to trading action
        action = self._map_action(action_idx, context)
        if action is not None:
            decisions.append(action)
        return decisions
    def _build_features(self, context: ModelContext) -> np.ndarray:
        """Build a fixed-length (15) feature vector from context."""
        features = []
        # Portfolio state
        features.append(context.metrics.get("equity", 10000) / 10000)
        features.append(context.metrics.get("max_drawdown", 0))
        # Position count
        features.append(len(context.positions) / 10)
        # Signal features (two per signal, first five signals only)
        for row in context.signals.value.head(5).iter_rows(named=True):
            features.append(row.get("probability", 0.5))
            features.append(1 if row.get("signal_type") == "rise" else -1)
        # Pad if fewer signals
        while len(features) < 15:
            features.append(0)
        return np.array(features[:15])
    def _map_action(
        self, action_idx: int, context: ModelContext
    ) -> StrategyDecision | None:
        """Map action index to StrategyDecision."""
        # 0: HOLD, 1: ENTER, 2: CLOSE_ALL
        if action_idx == 1 and context.signals.value.height > 0:
            row = context.signals.value.row(0, named=True)
            return StrategyDecision(
                action=StrategyAction.ENTER,
                pair=row["pair"],
                confidence=0.8,
            )
        elif action_idx == 2 and context.positions:
            return StrategyDecision(
                action=StrategyAction.CLOSE_ALL,
                pair=context.positions[0].pair,
                confidence=0.9,
            )
        return None
Best Practices¶
1. Confidence Thresholds¶
Set appropriate confidence thresholds:
entry_rule = ModelEntryRule(
model=model,
min_confidence=0.6, # Only act on confident entries
)
exit_rule = ModelExitRule(
model=model,
min_confidence=0.7, # Higher threshold for exits
)
2. Position Size Scaling¶
Use size_multiplier to scale by confidence:
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
size_multiplier=min(confidence, 1.5), # Cap at 1.5x
confidence=confidence,
)
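If confidence lies in [0, 1], `min(confidence, 1.5)` simply returns the confidence, so the 1.5x cap is never reached. One possible alternative, sketched below under assumed parameters, maps confidence linearly onto a multiplier range. `size_multiplier`, `floor`, `lo` and `hi` are hypothetical names chosen for this illustration and are not SignalFlow parameters.

```python
def size_multiplier(
    confidence: float, floor: float = 0.6, lo: float = 0.5, hi: float = 1.5
) -> float:
    """Map confidence in [floor, 1.0] linearly onto [lo, hi], clamped at both ends."""
    if not 0.0 <= floor < 1.0:
        raise ValueError("floor must be in [0, 1)")
    # Position of the confidence within [floor, 1.0], clamped to [0, 1].
    scaled = (confidence - floor) / (1.0 - floor)
    scaled = max(0.0, min(1.0, scaled))
    return lo + scaled * (hi - lo)


print(round(size_multiplier(0.6), 3))  # 0.5
print(round(size_multiplier(0.8), 3))  # 1.0
print(round(size_multiplier(1.0), 3))  # 1.5
```

Setting `floor` to the same value as the entry rule's `min_confidence` would make the smallest accepted signal trade at `lo` times the base size, though that pairing is a design choice rather than a requirement.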
3. Risk Management in Model¶
Check portfolio state before trading:
def decide(self, context: ModelContext) -> list[StrategyDecision]:
    # Skip during high drawdown
    if context.metrics.get("max_drawdown", 0) > 0.15:
        return []
    # Limit position count
    if len(context.positions) >= 5:
        return []  # Or only return exit decisions
    # Your trading logic...
4. Meta in Decisions¶
Include debugging info in meta:
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
confidence=0.85,
meta={
"model_version": "v2.1",
"signal_type": "rise",
"features": {"rsi": 35, "trend": "up"},
},
)
See Also¶
- API Reference: Detailed component documentation
- Advanced Strategies: Position sizing and entry filters
- Quick Start: Basic strategy setup