External Model Integration¶
This guide covers integrating external ML/RL models with SignalFlow for automated trading decisions.
Overview¶
SignalFlow supports external model integration via a Protocol-based interface:
Backtest Bar
|
v
Build ModelContext (signals, metrics, positions)
|
v
model.decide(context) --> list[StrategyDecision]
|
+---> CLOSE/CLOSE_ALL --> ModelExitRule --> Exit Orders
|
+---> ENTER --> ModelEntryRule --> Entry Orders
Design Principle: Models receive signals and metrics, NOT raw OHLCV prices.
Quick Start¶
1. Implement the Protocol¶
from signalflow.strategy.model import (
StrategyModel,
StrategyAction,
StrategyDecision,
ModelContext,
)
class MyModel:
"""Your ML/RL model implementing StrategyModel protocol."""
def decide(self, context: ModelContext) -> list[StrategyDecision]:
decisions = []
for row in context.signals.value.iter_rows(named=True):
prob = row.get("probability", 0.5)
if prob > 0.7:
decisions.append(StrategyDecision(
action=StrategyAction.ENTER,
pair=row["pair"],
confidence=prob,
))
return decisions
2. Create Rules¶
from signalflow.strategy.model import ModelEntryRule, ModelExitRule
model = MyModel()
entry_rule = ModelEntryRule(
model=model,
base_position_size=0.02,
max_positions=5,
min_confidence=0.6,
)
exit_rule = ModelExitRule(
model=model,
min_confidence=0.7,
)
3. Run Backtest¶
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
runner = BacktestRunner(
strategy_id="model_strategy",
broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
entry_rules=[entry_rule],
exit_rules=[exit_rule],
initial_capital=10_000.0,
)
state = runner.run(raw_data, signals)
Decision Types¶
Models return StrategyDecision objects with these actions:
| Action | Description | Required Fields |
|---|---|---|
ENTER |
Open new position | pair, optionally size_multiplier |
SKIP |
Skip this signal | pair |
CLOSE |
Close specific position | pair, position_id |
CLOSE_ALL |
Close all positions for pair | pair |
HOLD |
Do nothing | - |
Entry Decision¶
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
size_multiplier=1.5, # 1.5x base position size
confidence=0.85,
meta={"reason": "high_confidence_signal"},
)
Exit Decision¶
StrategyDecision(
action=StrategyAction.CLOSE,
pair="BTCUSDT",
position_id="pos_abc123", # Required for CLOSE
confidence=0.9,
meta={"reason": "take_profit"},
)
Close All Positions¶
StrategyDecision(
action=StrategyAction.CLOSE_ALL,
pair="BTCUSDT", # Close all BTC positions
confidence=0.8,
meta={"reason": "risk_off"},
)
ModelContext¶
The context passed to model.decide() contains:
| Field | Type | Description |
|---|---|---|
timestamp |
datetime |
Current bar timestamp |
signals |
Signals |
Current bar signals (from detectors) |
prices |
dict[str, float] |
Current prices per pair |
positions |
list[Position] |
Open positions |
metrics |
dict[str, float] |
Portfolio metrics (equity, drawdown, etc.) |
runtime |
dict[str, Any] |
Custom state (regime, ATR, etc.) |
Accessing Context Data¶
def decide(self, context: ModelContext) -> list[StrategyDecision]:
# Check portfolio state
equity = context.metrics.get("equity", 0)
drawdown = context.metrics.get("max_drawdown", 0)
# Risk check
if drawdown > 0.15:
return [] # No trading during high drawdown
# Process signals
for row in context.signals.value.iter_rows(named=True):
pair = row["pair"]
signal_type = row["signal_type"]
prob = row["probability"]
# Get current price
price = context.prices.get(pair, 0)
# Check existing positions
pair_positions = [p for p in context.positions if p.pair == pair]
# Your model logic here...
Model-Based Exit Management¶
Models can manage exits by analyzing open positions:
def decide(self, context: ModelContext) -> list[StrategyDecision]:
decisions = []
for pos in context.positions:
price = context.prices.get(pos.pair, pos.entry_price)
pnl_pct = (price - pos.entry_price) / pos.entry_price
# Take profit
if pnl_pct > 0.05: # 5% profit
decisions.append(StrategyDecision(
action=StrategyAction.CLOSE,
pair=pos.pair,
position_id=pos.id,
confidence=0.9,
meta={"reason": "model_take_profit"},
))
# Stop loss
elif pnl_pct < -0.02: # 2% loss
decisions.append(StrategyDecision(
action=StrategyAction.CLOSE,
pair=pos.pair,
position_id=pos.id,
confidence=0.95,
meta={"reason": "model_stop_loss"},
))
return decisions
Combining with Traditional Rules¶
Model rules work alongside traditional rules:
from signalflow.strategy.component.exit import TakeProfitStopLossExit
runner = BacktestRunner(
strategy_id="hybrid_strategy",
broker=broker,
entry_rules=[
ModelEntryRule(model=model, base_position_size=0.02),
],
exit_rules=[
ModelExitRule(model=model), # Model-based exits first
TakeProfitStopLossExit( # Fallback TP/SL
take_profit_pct=0.05,
stop_loss_pct=0.03,
),
],
initial_capital=10_000.0,
)
Exit rules are processed in order - model exits run first, then traditional rules catch remaining positions.
Decision Caching¶
The model is called once per bar. Decisions are cached in state.runtime:
Bar Start
|
v
ExitRule.check_exits()
├── Check cache for decisions
├── If empty: call model.decide(), cache result
└── Process CLOSE/CLOSE_ALL decisions
|
v
EntryRule.check_entries()
├── Check cache (uses cached decisions)
└── Process ENTER decisions
|
v
state.reset_tick_cache() clears cache for next bar
This ensures consistent decisions across entry and exit processing.
Training Data Export¶
Export backtest results for model training:
from pathlib import Path
from signalflow.strategy.exporter import BacktestExporter
exporter = BacktestExporter()
# During backtest
for ts in timestamps:
# ... process bar ...
exporter.export_bar(ts, signals, state.metrics, state)
# When positions close
exporter.export_position_close(position, exit_time, exit_price, "take_profit")
# Write to disk
exporter.finalize(Path("./training_data"))
Exported Files¶
bars.parquet - Per-bar state:
import polars as pl
bars = pl.read_parquet("./training_data/bars.parquet")
# Columns: timestamp, pair, signal_type, probability,
# metric_equity, metric_max_drawdown, ...
trades.parquet - Completed trades:
trades = pl.read_parquet("./training_data/trades.parquet")
# Columns: position_id, pair, entry_time, exit_time,
# entry_price, exit_price, realized_pnl, exit_reason, ...
RL Model Example¶
Complete example with reinforcement learning patterns:
import numpy as np
from signalflow.strategy.model import (
StrategyAction,
StrategyDecision,
ModelContext,
)
class RLTradingModel:
"""RL model for trading decisions."""
def __init__(self, model_path: str, epsilon: float = 0.0):
self.model = self._load_model(model_path)
self.epsilon = epsilon # Exploration rate
def decide(self, context: ModelContext) -> list[StrategyDecision]:
decisions = []
# Build state features
features = self._build_features(context)
# Get Q-values from model
q_values = self.model.predict(features)
# Epsilon-greedy action selection (for training)
if np.random.random() < self.epsilon:
action_idx = np.random.choice(len(q_values))
else:
action_idx = np.argmax(q_values)
# Map to trading action
action = self._map_action(action_idx, context)
if action:
decisions.append(action)
return decisions
def _build_features(self, context: ModelContext) -> np.ndarray:
"""Build feature vector from context."""
features = []
# Portfolio state
features.append(context.metrics.get("equity", 10000) / 10000)
features.append(context.metrics.get("max_drawdown", 0))
# Position count
features.append(len(context.positions) / 10)
# Signal features
for row in context.signals.value.head(5).iter_rows(named=True):
features.append(row.get("probability", 0.5))
features.append(1 if row.get("signal_type") == "rise" else -1)
# Pad if fewer signals
while len(features) < 15:
features.append(0)
return np.array(features[:15])
def _map_action(
self, action_idx: int, context: ModelContext
) -> StrategyDecision | None:
"""Map action index to StrategyDecision."""
# 0: HOLD, 1: ENTER, 2: CLOSE_ALL
if action_idx == 1 and context.signals.value.height > 0:
row = context.signals.value.row(0, named=True)
return StrategyDecision(
action=StrategyAction.ENTER,
pair=row["pair"],
confidence=0.8,
)
elif action_idx == 2 and context.positions:
return StrategyDecision(
action=StrategyAction.CLOSE_ALL,
pair=context.positions[0].pair,
confidence=0.9,
)
return None
Best Practices¶
1. Confidence Thresholds¶
Set appropriate confidence thresholds:
entry_rule = ModelEntryRule(
model=model,
min_confidence=0.6, # Only act on confident entries
)
exit_rule = ModelExitRule(
model=model,
min_confidence=0.7, # Higher threshold for exits
)
2. Position Size Scaling¶
Use size_multiplier to scale by confidence:
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
size_multiplier=min(confidence, 1.5), # Cap at 1.5x
confidence=confidence,
)
3. Risk Management in Model¶
Check portfolio state before trading:
def decide(self, context: ModelContext) -> list[StrategyDecision]:
# Skip during high drawdown
if context.metrics.get("max_drawdown", 0) > 0.15:
return []
# Limit position count
if len(context.positions) >= 5:
return [] # Or only return exit decisions
# Your trading logic...
4. Meta in Decisions¶
Include debugging info in meta:
StrategyDecision(
action=StrategyAction.ENTER,
pair="BTCUSDT",
confidence=0.85,
meta={
"model_version": "v2.1",
"signal_type": "rise",
"features": {"rsi": 35, "trend": "up"},
},
)
See Also¶
- API Reference: Detailed component documentation
- Advanced Strategies: Position sizing and entry filters
- Quick Start: Basic strategy setup