
External Model Integration

This guide covers two distinct ways of bringing models into SignalFlow:

  1. Forecast models — pinned, versioned prediction artefacts (e.g. a reversion-probability model) that a flow registers with .forecast(...) and that detectors/validators consume as one more input. Covered first below.
  2. Decision models — an ML/RL model that implements the StrategyModel protocol and directly emits StrategyDecision objects (ENTER/CLOSE/...). Covered from Decision Models onward.

Forecast Models

A forecast is the continuous output of a model (e.g. p_revert, an expected return) — not a trade recommendation. A detector turns forecasts into discrete signals; a validator can read them when deciding whether to keep a signal. Forecast models are trained elsewhere and arrive as versioned, reproducible artefacts; the trading flow never trains them, it only references and consumes them.

Registering a pinned artefact: .forecast(...)

FlowBuilder.forecast() records a pinned ModelRef under a local name. No weights are loaded here — this is purely declarative; weights resolve later through the registry when the flow runs.

def forecast(
    self,
    name: str,
    *,
    mlflow: str | None = None,
    hf_path: str | None = None,
    version: str | int | None = None,
    source: str | None = None,
) -> Self: ...
| Argument | Meaning |
|---|---|
| name | Local name consumers reference via forecasts=[name]. |
| mlflow | MLflow model URI, e.g. "models:/revert/3" (version embedded in the URI). |
| hf_path | HuggingFace Hub path (source="hf"); requires an explicit version=. |
| version | Explicit version when it is not embedded in the URI. "latest" is rejected unless SF_ALLOW_LATEST=1. |
| source | Override the artefact source ("mlflow" or "hf"); inferred otherwise. |

Registering the same name twice, omitting both mlflow= and hf_path=, or passing a version= that conflicts with the version embedded in an mlflow= URI raises ConfigurationError.

Consuming a forecast: forecasts= + forecast_window=

.detector(), .validator() and .exit() each accept two keyword arguments:

  • forecasts: list[str] — names of registered forecasts this consumer reads. The consumer sees a window of forecast values [t-w, t], not just the latest point.
  • forecast_window: int — window length in bars. It is required whenever forecasts is given, and must be positive.

This is the warmup-silence contract: because the window is fixed in bars (rather than "however much has accumulated"), a backtest and a live cold start cut exactly the same slice, so parity holds. Passing forecast_window without forecasts, or forecasts without a positive forecast_window, raises ConfigurationError.

Why the version is mandatory (parity)

A ModelRef requires an explicit version. A floating version="latest" silently breaks parity and reproducibility between training and live inference, so it is rejected unless SF_ALLOW_LATEST=1 is set in the environment (dev opt-in only). Pinning the version is what guarantees that the exact same model artefact is used in research and in production.

Lazy loading: Resolver and ModelRegistry

Registration is cheap and offline. When the flow runs, a ModelRegistry (typically CachingModelRegistry wrapping an MlflowResolver) resolves each ModelRef to actual weights — and caches them so a ref is loaded at most once. Importing signalflow.models does not require mlflow; the dependency is imported only at resolve time.

from signalflow.models import ModelRef, MlflowResolver, CachingModelRegistry

registry = CachingModelRegistry(MlflowResolver())
model = registry.get(ModelRef.parse("models:/revert/3"))   # weights load here, then cached
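The "loaded at most once" behaviour amounts to a memoising wrapper around a resolver. The sketch below uses hypothetical `Ref` and `CachingRegistry` classes (not the real ModelRef / CachingModelRegistry) and a plain callable as the resolver, so it runs without mlflow.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for ModelRef: frozen, hence hashable and usable as a cache key."""
    uri: str
    version: str


class CachingRegistry:
    """Wraps a resolver callable and loads each ref at most once."""

    def __init__(self, resolve: Callable[[Ref], Any]) -> None:
        self._resolve = resolve
        self._cache: dict[Ref, Any] = {}

    def get(self, ref: Ref) -> Any:
        if ref not in self._cache:
            self._cache[ref] = self._resolve(ref)  # the expensive load happens only here
        return self._cache[ref]
```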

Worked example

import signalflow as sf

flow = (
    sf.flow()
    .data(store="binance", pair="BTC/USDT")
    # 1. Register a pinned forecast artefact (lazy — no weights loaded yet)
    .forecast("revert", mlflow="models:/revert/3")
    # 2. A detector reads a 30-bar window of that forecast
    .detector("example/sma_cross", forecasts=["revert"], forecast_window=30)
    # 3. A validator may read the same forecast over its own window
    .validator("validator/lightgbm", forecasts=["revert"], forecast_window=60)
)

# Forecast references are validated against registered .forecast() artefacts at run time:
result = flow.run()

A forecast may be referenced before it is registered (the existence check runs lazily at .run()), but every referenced name must ultimately be declared via .forecast(), otherwise .run() raises ConfigurationError.
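The deferred existence check can be modelled with a toy builder that records references eagerly and validates them only in run(). `MiniFlow` and its methods are hypothetical and only mirror the ordering behaviour described above.

```python
class ConfigurationError(ValueError):
    """Stand-in for SignalFlow's ConfigurationError (hypothetical definition)."""


class MiniFlow:
    """Toy builder: forecast references are recorded eagerly and checked only at run()."""

    def __init__(self) -> None:
        self._forecasts: set[str] = set()
        self._references: list[tuple[str, list[str]]] = []

    def forecast(self, name: str) -> "MiniFlow":
        self._forecasts.add(name)
        return self

    def detector(self, component: str, forecasts: list[str]) -> "MiniFlow":
        self._references.append((component, forecasts))  # no existence check yet
        return self

    def run(self) -> str:
        for component, names in self._references:
            missing = [n for n in names if n not in self._forecasts]
            if missing:
                raise ConfigurationError(f"{component} references unregistered forecasts {missing}")
        return "ok"
```

Because the check waits for run(), the order of .detector() and .forecast() calls in the chain does not matter.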

Features live inside the artefact

The former .features() method on the flow builder has been removed. A forecast model carries its own feature recipe inside the artefact, and the train↔serve feature hash (see ModelFeaturesPipeline) guards against recipe drift. Detectors compute any primitive features they need internally.


Decision Models (StrategyModel)

The rest of this guide covers a different integration: an ML/RL model that makes trade decisions directly via the StrategyModel protocol (in contrast to a forecast, which only predicts and lets a detector decide).

SignalFlow supports this via a Protocol-based interface:

Backtest Bar
    |
    v
Build ModelContext (signals, metrics, positions)
    |
    v
model.decide(context) --> list[StrategyDecision]
    |
    +---> CLOSE/CLOSE_ALL --> ModelExitRule --> Exit Orders
    |
    +---> ENTER --> ModelEntryRule --> Entry Orders
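The routing in the diagram can be sketched as a split on the action type. `Action` and `Decision` are local stand-ins for StrategyAction and StrategyDecision (the member values are assumptions), so the sketch runs without SignalFlow installed.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    """Local mirror of StrategyAction's members, for illustration only."""
    ENTER = "enter"
    SKIP = "skip"
    CLOSE = "close"
    CLOSE_ALL = "close_all"
    HOLD = "hold"


@dataclass
class Decision:
    action: Action
    pair: str


EXIT_ACTIONS = {Action.CLOSE, Action.CLOSE_ALL}


def route(decisions: list[Decision]) -> tuple[list[Decision], list[Decision]]:
    """Split one bar's decisions into (exit-rule input, entry-rule input)."""
    exits = [d for d in decisions if d.action in EXIT_ACTIONS]
    entries = [d for d in decisions if d.action is Action.ENTER]
    return exits, entries  # SKIP and HOLD produce no orders
```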

Design Principle: Models receive signals, portfolio metrics and the current price per pair, NOT raw OHLCV bars.


Quick Start

1. Implement the Protocol

from signalflow.strategy.model import (
    StrategyModel,
    StrategyAction,
    StrategyDecision,
    ModelContext,
)


class MyModel:
    """Your ML/RL model implementing StrategyModel protocol."""

    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []

        for row in context.signals.value.iter_rows(named=True):
            prob = row.get("probability", 0.5)

            if prob > 0.7:
                decisions.append(StrategyDecision(
                    action=StrategyAction.ENTER,
                    pair=row["pair"],
                    confidence=prob,
                ))

        return decisions

2. Create Rules

from signalflow.strategy.model import ModelEntryRule, ModelExitRule

model = MyModel()

entry_rule = ModelEntryRule(
    model=model,
    base_position_size=0.02,
    max_positions=5,
    min_confidence=0.6,
)

exit_rule = ModelExitRule(
    model=model,
    min_confidence=0.7,
)
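How the three entry parameters might combine is sketched below. This is an assumption about ModelEntryRule's behaviour, not its implementation: it treats base_position_size as a fraction of equity, drops decisions below min_confidence, fills free slots highest-confidence first, and stops at max_positions. `plan_entries` and `EnterDecision` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class EnterDecision:
    pair: str
    confidence: float
    size_multiplier: float = 1.0


def plan_entries(
    decisions: list[EnterDecision],
    open_positions: int,
    equity: float,
    base_position_size: float = 0.02,
    max_positions: int = 5,
    min_confidence: float = 0.6,
) -> list[tuple[str, float]]:
    """Return (pair, notional) for each entry that survives the filters."""
    plans: list[tuple[str, float]] = []
    # Highest-confidence decisions get the free slots first (an assumption).
    for d in sorted(decisions, key=lambda d: d.confidence, reverse=True):
        if d.confidence < min_confidence:
            continue  # below threshold: ignored
        if open_positions + len(plans) >= max_positions:
            break  # no free slots left
        notional = equity * base_position_size * d.size_multiplier
        plans.append((d.pair, notional))
    return plans
```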

3. Run Backtest

from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor

runner = BacktestRunner(
    strategy_id="model_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[exit_rule],
    initial_capital=10_000.0,
)

# raw_data: market data from your data source; signals: output of your detector
state = runner.run(raw_data, signals)

Decision Types

Models return StrategyDecision objects with these actions:

| Action | Description | Required fields |
|---|---|---|
| ENTER | Open a new position | pair (optionally size_multiplier) |
| SKIP | Skip this signal | pair |
| CLOSE | Close a specific position | pair, position_id |
| CLOSE_ALL | Close all positions for a pair | pair |
| HOLD | Do nothing | - |
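A model author may want to check decisions against this table before returning them. The sketch encodes the table as data; `REQUIRED_FIELDS` and `missing_fields` are hypothetical helpers, and whether StrategyDecision itself enforces these fields is not stated in this guide.

```python
# Required fields per action, transcribed from the table (optional fields omitted).
REQUIRED_FIELDS: dict[str, set[str]] = {
    "ENTER": {"pair"},
    "SKIP": {"pair"},
    "CLOSE": {"pair", "position_id"},
    "CLOSE_ALL": {"pair"},
    "HOLD": set(),
}


def missing_fields(action: str, **fields: object) -> set[str]:
    """Return the required fields for `action` that are absent or None."""
    return {f for f in REQUIRED_FIELDS[action] if fields.get(f) is None}
```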

Entry Decision

StrategyDecision(
    action=StrategyAction.ENTER,
    pair="BTCUSDT",
    size_multiplier=1.5,  # 1.5x base position size
    confidence=0.85,
    meta={"reason": "high_confidence_signal"},
)

Exit Decision

StrategyDecision(
    action=StrategyAction.CLOSE,
    pair="BTCUSDT",
    position_id="pos_abc123",  # Required for CLOSE
    confidence=0.9,
    meta={"reason": "take_profit"},
)

Close All Positions

StrategyDecision(
    action=StrategyAction.CLOSE_ALL,
    pair="BTCUSDT",  # Close all BTC positions
    confidence=0.8,
    meta={"reason": "risk_off"},
)

ModelContext

The context passed to model.decide() contains:

| Field | Type | Description |
|---|---|---|
| timestamp | datetime | Current bar timestamp |
| signals | Signals | Current bar signals (from detectors) |
| prices | dict[str, float] | Current price per pair |
| positions | list[Position] | Open positions |
| metrics | dict[str, float] | Portfolio metrics (equity, drawdown, etc.) |
| runtime | dict[str, Any] | Custom state (regime, ATR, etc.) |
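Because decide() only reads these fields, decision logic can be unit-tested without a backtest by building a stand-in context. `FakeContext` and `pairs_to_enter` are hypothetical; note that here `signals` is a plain list of row dicts, whereas the real field is a Signals object whose `.value` is a Polars DataFrame (as the iter_rows calls in this guide suggest).

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class FakeContext:
    """Stdlib stand-in mirroring ModelContext's fields (hypothetical, for tests)."""
    timestamp: datetime
    signals: list[dict[str, Any]]  # real field: Signals, with rows in .value
    prices: dict[str, float]
    positions: list[Any] = field(default_factory=list)
    metrics: dict[str, float] = field(default_factory=dict)
    runtime: dict[str, Any] = field(default_factory=dict)


def pairs_to_enter(ctx: FakeContext, threshold: float = 0.7, max_dd: float = 0.15) -> list[str]:
    """Toy decision logic: enter high-probability pairs unless drawdown is too deep."""
    if ctx.metrics.get("max_drawdown", 0.0) > max_dd:
        return []
    return [row["pair"] for row in ctx.signals if row.get("probability", 0.0) > threshold]
```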

Accessing Context Data

def decide(self, context: ModelContext) -> list[StrategyDecision]:
    decisions: list[StrategyDecision] = []

    # Check portfolio state
    equity = context.metrics.get("equity", 0.0)
    drawdown = context.metrics.get("max_drawdown", 0.0)

    # Risk check: no new trades during a deep drawdown
    if drawdown > 0.15:
        return decisions

    # Process signals
    for row in context.signals.value.iter_rows(named=True):
        pair = row["pair"]
        signal_type = row["signal_type"]
        prob = row["probability"]

        # Current price for this pair (None if no price is available this bar)
        price = context.prices.get(pair)

        # Open positions on this pair
        pair_positions = [p for p in context.positions if p.pair == pair]

        # Your model logic here: append StrategyDecision objects to `decisions`

    return decisions

Model-Based Exit Management

Models can manage exits by analyzing open positions:

def decide(self, context: ModelContext) -> list[StrategyDecision]:
    decisions = []

    for pos in context.positions:
        price = context.prices.get(pos.pair, pos.entry_price)
        pnl_pct = (price - pos.entry_price) / pos.entry_price

        # Take profit
        if pnl_pct > 0.05:  # 5% profit
            decisions.append(StrategyDecision(
                action=StrategyAction.CLOSE,
                pair=pos.pair,
                position_id=pos.id,
                confidence=0.9,
                meta={"reason": "model_take_profit"},
            ))

        # Stop loss
        elif pnl_pct < -0.02:  # 2% loss
            decisions.append(StrategyDecision(
                action=StrategyAction.CLOSE,
                pair=pos.pair,
                position_id=pos.id,
                confidence=0.95,
                meta={"reason": "model_stop_loss"},
            ))

    return decisions

Combining with Traditional Rules

Model rules work alongside traditional rules:

from signalflow.strategy.component.exit import TakeProfitStopLossExit

runner = BacktestRunner(
    strategy_id="hybrid_strategy",
    broker=broker,
    entry_rules=[
        ModelEntryRule(model=model, base_position_size=0.02),
    ],
    exit_rules=[
        ModelExitRule(model=model),  # Model-based exits first
        TakeProfitStopLossExit(      # Fallback TP/SL
            take_profit_pct=0.05,
            stop_loss_pct=0.03,
        ),
    ],
    initial_capital=10_000.0,
)

Exit rules are processed in order: model exits run first, then the traditional rules handle any positions that are still open.
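The ordering can be modelled as a pipeline in which each rule only sees positions that earlier rules left open. This is a simplified sketch; `run_exit_rules` and the callable rule signature are assumptions, not the BacktestRunner API.

```python
from typing import Callable

# A rule here is a function: ids of still-open positions -> ids it wants to close.
ExitRule = Callable[[list[str]], list[str]]


def run_exit_rules(open_ids: list[str], rules: list[ExitRule]) -> dict[str, int]:
    """Apply rules in order; return closed position id -> index of the rule that closed it."""
    remaining = list(open_ids)
    closed_by: dict[str, int] = {}
    for i, rule in enumerate(rules):
        # Pass a copy so a rule cannot mutate the runner's bookkeeping.
        for pid in rule(list(remaining)):
            if pid in remaining:
                remaining.remove(pid)
                closed_by[pid] = i
    return closed_by
```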


Decision Caching

The model is called once per bar. Decisions are cached in state.runtime:

Bar Start
    |
    v
ExitRule.check_exits()
    ├── Check cache for decisions
    ├── If empty: call model.decide(), cache result
    └── Process CLOSE/CLOSE_ALL decisions
    |
    v
EntryRule.check_entries()
    ├── Check cache (uses cached decisions)
    └── Process ENTER decisions
    |
    v
state.reset_tick_cache() clears cache for next bar

This guarantees that entry and exit rules act on the same set of decisions within a bar, and that the model runs only once per bar.
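The per-bar cache can be sketched as a small object that calls the model lazily and is reset between bars. `TickCache` is hypothetical; it uses None rather than an empty list as the "not yet computed" marker, so a model that returns no decisions is still called only once per bar.

```python
from typing import Any, Callable


class TickCache:
    """Calls the model at most once per bar and shares the result between rules."""

    def __init__(self, decide: Callable[[Any], list[Any]]) -> None:
        self._decide = decide
        self._decisions: list[Any] | None = None

    def decisions(self, context: Any) -> list[Any]:
        if self._decisions is None:  # first rule this bar: call the model
            self._decisions = self._decide(context)
        return self._decisions  # later rules reuse the cached list

    def reset(self) -> None:
        """Analogue of state.reset_tick_cache(): forget decisions before the next bar."""
        self._decisions = None
```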


Training Data Export

Export backtest results for model training:

from pathlib import Path
from signalflow.strategy.exporter import BacktestExporter

exporter = BacktestExporter()

# During backtest
for ts in timestamps:
    # ... process bar ...
    exporter.export_bar(ts, signals, state.metrics, state)

# When positions close
exporter.export_position_close(position, exit_time, exit_price, "take_profit")

# Write to disk
exporter.finalize(Path("./training_data"))

Exported Files

bars.parquet - Per-bar state:

import polars as pl

bars = pl.read_parquet("./training_data/bars.parquet")
# Columns: timestamp, pair, signal_type, probability,
#          metric_equity, metric_max_drawdown, ...

trades.parquet - Completed trades:

trades = pl.read_parquet("./training_data/trades.parquet")
# Columns: position_id, pair, entry_time, exit_time,
#          entry_price, exit_price, realized_pnl, exit_reason, ...
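Once trades.parquet is loaded, a per-exit-reason win rate is a quick sanity check before training. The sketch works on a list of row dicts, such as `trades.to_dicts()` returns for a Polars DataFrame; `win_rate_by_exit_reason` is a hypothetical helper, and the column names are taken from the listing above.

```python
from collections import defaultdict
from typing import Any


def win_rate_by_exit_reason(trades: list[dict[str, Any]]) -> dict[str, float]:
    """Fraction of trades with realized_pnl > 0, grouped by exit_reason."""
    wins: dict[str, int] = defaultdict(int)
    totals: dict[str, int] = defaultdict(int)
    for t in trades:
        totals[t["exit_reason"]] += 1
        wins[t["exit_reason"]] += t["realized_pnl"] > 0  # True counts as 1
    return {reason: wins[reason] / totals[reason] for reason in totals}
```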


RL Model Example

Complete example with reinforcement learning patterns:

import pickle

import numpy as np
from signalflow.strategy.model import (
    StrategyAction,
    StrategyDecision,
    ModelContext,
)

# Feature layout: 3 portfolio features + N_SIGNALS signals x 2 features each
N_SIGNALS = 5
N_FEATURES = 3 + 2 * N_SIGNALS


class RLTradingModel:
    """RL model for trading decisions (actions: 0 = HOLD, 1 = ENTER, 2 = CLOSE_ALL)."""

    def __init__(self, model_path: str, epsilon: float = 0.0, seed: int | None = None):
        self.model = self._load_model(model_path)
        self.epsilon = epsilon  # Exploration rate (0.0 = always greedy)
        self.rng = np.random.default_rng(seed)

    def _load_model(self, model_path: str):
        """Load the trained Q-network. Placeholder: swap in your framework's loader."""
        with open(model_path, "rb") as f:
            return pickle.load(f)

    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []

        # Build state features
        features = self._build_features(context)

        # One Q-value per action. Assumes predict() accepts a single feature
        # vector; many frameworks expect a batch, e.g. features[None, :]
        q_values = np.asarray(self.model.predict(features)).ravel()

        # Epsilon-greedy action selection (for training)
        if self.rng.random() < self.epsilon:
            action_idx = int(self.rng.integers(len(q_values)))
        else:
            action_idx = int(np.argmax(q_values))

        # Map to trading action
        action = self._map_action(action_idx, context)
        if action:
            decisions.append(action)

        return decisions

    def _build_features(self, context: ModelContext) -> np.ndarray:
        """Build a fixed-length feature vector from context."""
        features = []

        # Portfolio state
        features.append(context.metrics.get("equity", 10000) / 10000)
        features.append(context.metrics.get("max_drawdown", 0))

        # Position count
        features.append(len(context.positions) / 10)

        # Signal features (first N_SIGNALS rows)
        for row in context.signals.value.head(N_SIGNALS).iter_rows(named=True):
            features.append(row.get("probability", 0.5))
            features.append(1 if row.get("signal_type") == "rise" else -1)

        # Pad with zeros if there are fewer signals
        features.extend([0.0] * (N_FEATURES - len(features)))

        return np.array(features, dtype=float)

    def _map_action(
        self, action_idx: int, context: ModelContext
    ) -> StrategyDecision | None:
        """Map action index to StrategyDecision."""
        # 0: HOLD, 1: ENTER, 2: CLOSE_ALL
        if action_idx == 1 and context.signals.value.height > 0:
            row = context.signals.value.row(0, named=True)
            return StrategyDecision(
                action=StrategyAction.ENTER,
                pair=row["pair"],
                confidence=0.8,
            )
        elif action_idx == 2 and context.positions:
            return StrategyDecision(
                action=StrategyAction.CLOSE_ALL,
                pair=context.positions[0].pair,
                confidence=0.9,
            )
        return None
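The epsilon-greedy step in decide() can be isolated and tested on its own. This stdlib sketch (`epsilon_greedy` is a hypothetical helper) takes an explicit random.Random so results are reproducible.

```python
import random
from collections.abc import Sequence


def epsilon_greedy(q_values: Sequence[float], epsilon: float, rng: random.Random) -> int:
    """With probability epsilon pick a uniformly random action, else the argmax."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda i: q_values[i])
```

With epsilon=0.0, as in the class default, the choice is deterministic, which keeps backtests comparable across runs.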

Best Practices

1. Confidence Thresholds

Set appropriate confidence thresholds:

entry_rule = ModelEntryRule(
    model=model,
    min_confidence=0.6,  # Only act on confident entries
)

exit_rule = ModelExitRule(
    model=model,
    min_confidence=0.7,  # Higher threshold for exits
)

2. Position Size Scaling

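Use size_multiplier to scale position size with confidence. Since confidence lies in [0, 1], min(confidence, 1.5) would never reach the cap, so scale it first:

confidence = 0.85  # model confidence in [0, 1]

StrategyDecision(
    action=StrategyAction.ENTER,
    pair="BTCUSDT",
    size_multiplier=min(2 * confidence, 1.5),  # 0.6 -> 1.2x; capped at 1.5x from 0.75 up
    confidence=confidence,
)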
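One way to scale size with confidence while respecting both a floor and the 1.5x cap is a linear map from [min_confidence, 1] onto [floor, cap]. `size_multiplier` and its default bounds are illustrative assumptions, chosen to match the min_confidence=0.6 used earlier.

```python
def size_multiplier(
    confidence: float, min_confidence: float = 0.6, floor: float = 0.5, cap: float = 1.5
) -> float:
    """Linearly map confidence in [min_confidence, 1] to a multiplier in [floor, cap]."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence is expected to lie in [0, 1]")
    if confidence <= min_confidence:
        return floor  # the entry rule would normally filter these out anyway
    frac = (confidence - min_confidence) / (1.0 - min_confidence)
    return floor + frac * (cap - floor)
```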

3. Risk Management in Model

Check portfolio state before trading:

def decide(self, context: ModelContext) -> list[StrategyDecision]:
    decisions: list[StrategyDecision] = []

    # Skip during high drawdown
    if context.metrics.get("max_drawdown", 0) > 0.15:
        return decisions

    # Limit position count
    if len(context.positions) >= 5:
        return decisions  # Or only return exit decisions

    # Your trading logic: append decisions here...
    return decisions

4. Meta in Decisions

Include debugging info in meta:

StrategyDecision(
    action=StrategyAction.ENTER,
    pair="BTCUSDT",
    confidence=0.85,
    meta={
        "model_version": "v2.1",
        "signal_type": "rise",
        "features": {"rsi": 35, "trend": "up"},
    },
)

See Also