Skip to content

Strategy Module

The strategy module provides components for backtesting and live trading execution.


Architecture

flowchart LR
    A[Signals] --> B[SignalAggregator]
    B --> C[EntryFilter]
    C --> D[PositionSizer]
    D --> E[Entry Rules]
    E --> F[Broker]
    F --> G[Exit Rules]

    style B fill:#16a34a,stroke:#22c55e,color:#fff
    style C fill:#ea580c,stroke:#f97316,color:#fff
    style D fill:#2563eb,stroke:#3b82f6,color:#fff

Execution

signalflow.strategy.runner.backtest_runner.BacktestRunner dataclass

BacktestRunner(strategy_id: str = 'backtest', broker: Any = None, entry_rules: list[EntryRule] = list(), exit_rules: list = list(), metrics: list[StrategyMetric] = list(), initial_capital: float = 10000.0, pair_col: str = 'pair', ts_col: str = 'timestamp', price_col: str = 'close', data_key: str = 'spot', show_progress: bool = True, progress_callback: Callable[[int, int, dict[str, Any]], None] | None = None, progress_interval: int = 500, cancel_event: Event | None = None)

Bases: StrategyRunner

cancel_event class-attribute instance-attribute

cancel_event: Event | None = None

Set externally to request graceful cancellation.

progress_callback class-attribute instance-attribute

progress_callback: Callable[[int, int, dict[str, Any]], None] | None = None

Called periodically: (current_bar, total_bars, latest_metrics).

progress_interval class-attribute instance-attribute

progress_interval: int = 500

Call progress_callback every N bars (default: 500).


Entry Rules

signalflow.strategy.component.entry.signal.SignalEntryRule dataclass

SignalEntryRule(signal_type_map: dict[str, str] | None = None, source_detector: str | None = None, position_sizer: PositionSizer | None = None, entry_filters: list[EntryFilter] | EntryFilter | None = None, base_position_size: float = 100.0, use_probability_sizing: bool = True, min_probability: float = 0.5, max_positions_per_pair: int = 1, max_total_positions: int = 20, allow_shorts: bool = False, max_capital_usage: float = 0.95, min_order_notional: float = 10.0, pair_col: str = 'pair', ts_col: str = 'timestamp', _composite_filter: CompositeEntryFilter | None = None)

Bases: EntryRule

Signal-based entry rule with injectable sizer and filters.

Converts signals to entry orders with configurable position sizing and pre-trade filtering.

Parameters:

Name Type Description Default
position_sizer PositionSizer | None

Optional PositionSizer for custom sizing logic.

None
entry_filters list[EntryFilter] | EntryFilter | None

Optional list of EntryFilters for pre-trade validation.

None
base_position_size float

Base notional value (used if no sizer provided).

100.0
use_probability_sizing bool

Scale size by probability (legacy mode).

True
min_probability float

Minimum signal probability.

0.5
max_positions_per_pair int

Maximum concurrent positions per pair.

1
max_total_positions int

Maximum total open positions.

20
allow_shorts bool

Allow FALL signals to create short positions.

False
max_capital_usage float

Maximum fraction of equity in positions.

0.95
min_order_notional float

Minimum order size.

10.0
pair_col str

Column name for pair in signals.

'pair'
ts_col str

Column name for timestamp in signals.

'timestamp'
Example

With custom sizer and filters

entry = SignalEntryRule( ... position_sizer=FixedFractionSizer(fraction=0.02), ... entry_filters=[ ... DrawdownFilter(max_drawdown=0.10), ... PriceDistanceFilter(min_distance_pct=0.02), ... ], ... max_positions_per_pair=5, # For grid strategy ... )

Legacy mode (backward compatible)

entry = SignalEntryRule( ... base_position_size=100.0, ... use_probability_sizing=True, ... )

__post_init__

__post_init__() -> None

Normalize filters to composite.

Source code in src/signalflow/strategy/component/entry/signal.py
def __post_init__(self) -> None:
    """Normalize filters to composite."""
    if self.entry_filters is not None:
        # Import here to avoid circular import
        from signalflow.strategy.component.entry.filters import (
            CompositeEntryFilter,
            EntryFilter,
        )

        if isinstance(self.entry_filters, EntryFilter):
            self._composite_filter = CompositeEntryFilter(filters=[self.entry_filters])
        elif isinstance(self.entry_filters, list):
            self._composite_filter = CompositeEntryFilter(filters=self.entry_filters)

check_entries

check_entries(signals: Signals, prices: dict[str, float], state: StrategyState) -> list[Order]

Check signals and generate entry orders.

Source code in src/signalflow/strategy/component/entry/signal.py
def check_entries(self, signals: Signals, prices: dict[str, float], state: StrategyState) -> list[Order]:
    """Check signals and generate entry orders."""
    orders: list[Order] = []

    # Cross-referencing: use signals from specific detector if configured
    if self.source_detector:
        named = state.runtime.get("_named_signals", {})
        if self.source_detector in named:
            signals = named[self.source_detector]
        else:
            return orders

    if signals is None or signals.value.height == 0:
        return orders

    # Build positions map
    positions_by_pair: dict[str, list[Position | None]] = {}
    for pos in state.portfolio.open_positions():
        positions_by_pair.setdefault(pos.pair, []).append(pos)

    total_open = len(state.portfolio.open_positions())
    if total_open >= self.max_total_positions:
        return orders

    # Calculate capital limits
    available_cash = state.portfolio.cash
    used_capital = sum(pos.entry_price * pos.qty for pos in state.portfolio.open_positions())
    total_equity = available_cash + used_capital
    max_allowed_in_positions = total_equity * self.max_capital_usage
    remaining_allocation = max_allowed_in_positions - used_capital

    # Filter signals
    df = self._filter_signals(signals.value)

    for row in df.iter_rows(named=True):
        if total_open >= self.max_total_positions:
            break
        if remaining_allocation <= self.min_order_notional:
            break
        if available_cash <= self.min_order_notional:
            break

        pair = row[self.pair_col]
        signal_type = row["signal_type"]
        probability = row.get("probability", 1.0) or 1.0

        # Check position limits
        existing_positions = positions_by_pair.get(pair, [])
        if len(existing_positions) >= self.max_positions_per_pair:
            continue

        price = prices.get(pair)
        if price is None or price <= 0:
            continue

        # Determine side
        side = self._determine_side(signal_type)
        if side is None:
            continue

        # Build signal context
        signal_ctx = SignalContext(
            pair=pair,
            signal_type=signal_type,
            probability=probability,
            price=price,
            timestamp=row.get(self.ts_col),
            meta=dict(row),
        )

        # === Apply filters ===
        if self._composite_filter is not None:
            allowed, _reason = self._composite_filter.allow_entry(signal_ctx, state, prices)
            if not allowed:
                continue

        # === Compute size ===
        if self.position_sizer is not None:
            notional = self.position_sizer.compute_size(signal_ctx, state, prices)
        else:
            # Legacy sizing logic
            notional = self._compute_legacy_size(probability)

        # Apply capital constraints
        notional = min(notional, available_cash * 0.99)
        notional = min(notional, remaining_allocation)

        if notional < self.min_order_notional:
            continue

        qty = notional / price

        order_meta = {
            "signal_type": signal_type,
            "signal_probability": probability,
            "signal_ts": row.get(self.ts_col),
            "requested_notional": notional,
            "sizer_used": (self.position_sizer.__class__.__name__ if self.position_sizer else "legacy"),
        }
        if self.source_detector:
            order_meta["source_detector"] = self.source_detector

        order = Order(
            pair=pair,
            side=cast(Literal["BUY", "SELL"], side),
            order_type="MARKET",
            qty=qty,
            signal_strength=probability,
            meta=order_meta,
        )
        orders.append(order)

        # Update tracking
        total_open += 1
        available_cash -= notional * 1.002
        remaining_allocation -= notional
        positions_by_pair.setdefault(pair, []).append(None)

    return orders

from_directional_map classmethod

from_directional_map(**kwargs: Any) -> SignalEntryRule

Create entry rule using the global DIRECTIONAL_SIGNAL_MAP.

Example

entry = SignalEntryRule.from_directional_map(base_position_size=200.0)

Source code in src/signalflow/strategy/component/entry/signal.py
@classmethod
def from_directional_map(cls, **kwargs: Any) -> SignalEntryRule:
    """Create entry rule using the global DIRECTIONAL_SIGNAL_MAP.

    Example:
        >>> entry = SignalEntryRule.from_directional_map(base_position_size=200.0)
    """
    return cls(signal_type_map=dict(DIRECTIONAL_SIGNAL_MAP), **kwargs)

signalflow.strategy.component.entry.fixed_size.FixedSizeEntryRule dataclass

FixedSizeEntryRule(signal_type_map: dict[str, str] | None = None, position_size: float = 0.01, signal_types: list[str] = (lambda: [SignalType.RISE.value])(), max_positions: int = 10, pair_col: str = 'pair')

Bases: EntryRule

Simple entry rule with fixed position size.

Attributes:

Name Type Description
signal_type_map dict[str, str] | None

Mapping signal_type -> "BUY"/"SELL". When set, overrides signal_types for filtering and side determination. None = legacy behavior using signal_types list.

signal_types list[str]

Legacy list of actionable signal types (used when signal_type_map is None).

from_directional_map classmethod

from_directional_map(**kwargs: Any) -> FixedSizeEntryRule

Create entry rule using the global DIRECTIONAL_SIGNAL_MAP.

Source code in src/signalflow/strategy/component/entry/fixed_size.py
@classmethod
def from_directional_map(cls, **kwargs: Any) -> FixedSizeEntryRule:
    """Create entry rule using the global DIRECTIONAL_SIGNAL_MAP."""
    return cls(signal_type_map=dict(DIRECTIONAL_SIGNAL_MAP), **kwargs)

Exit Rules

signalflow.strategy.component.exit.tp_sl.TakeProfitStopLossExit dataclass

TakeProfitStopLossExit(take_profit_pct: float = 0.02, stop_loss_pct: float = 0.01, use_position_levels: bool = False)

Bases: ExitRule

Exit rule based on take-profit and stop-loss levels.

Can use fixed percentages or dynamic levels from position meta.


Position Sizing

Position sizers compute the notional value (in quote currency) for trades based on signal strength, portfolio state, and market conditions.

Base Classes

signalflow.strategy.component.sizing.base.SignalContext dataclass

SignalContext(pair: str, signal_type: str, probability: float, price: float, timestamp: Any = None, meta: dict[str, Any] = dict())

Context for a single signal being sized.

Provides all relevant information about a signal for sizing decisions.

Attributes:

Name Type Description
pair str

Trading pair (e.g., "BTCUSDT").

signal_type str

Signal direction ("rise", "fall", "none").

probability float

Signal confidence [0, 1].

price float

Current market price.

timestamp Any

Signal timestamp.

meta dict[str, Any]

Additional signal metadata from detector.

signalflow.strategy.component.sizing.base.PositionSizer dataclass

PositionSizer()

Bases: ABC

Base class for position sizing strategies.

Computes the notional value (in quote currency) for a trade based on signal strength, portfolio state, and market conditions.

Design principles
  • Sizers compute NOTIONAL value, not quantity
  • Quantity = notional / price (computed by entry rule)
  • Sizers should be stateless where possible
  • Historical data accessed via state.runtime or state.metrics
Example

sizer = FixedFractionSizer(fraction=0.02) notional = sizer.compute_size(signal_ctx, state, prices) qty = notional / prices[signal_ctx.pair]

compute_size abstractmethod

compute_size(signal: SignalContext, state: StrategyState, prices: dict[str, float]) -> float

Compute position size (notional value) for a signal.

Parameters:

Name Type Description Default
signal SignalContext

Context about the signal being sized.

required
state StrategyState

Current strategy state (portfolio, metrics, runtime).

required
prices dict[str, float]

Current prices for all pairs.

required

Returns:

Type Description
float

Notional value in quote currency (e.g., USDT).

float

Return 0.0 to skip this signal.

Source code in src/signalflow/strategy/component/sizing/base.py
@abstractmethod
def compute_size(
    self,
    signal: SignalContext,
    state: StrategyState,
    prices: dict[str, float],
) -> float:
    """Compute position size (notional value) for a signal.

    Args:
        signal: Context about the signal being sized.
        state: Current strategy state (portfolio, metrics, runtime).
        prices: Current prices for all pairs.

    Returns:
        Notional value in quote currency (e.g., USDT).
        Return 0.0 to skip this signal.
    """
    ...

Available Sizers

FixedFractionSizer

Allocate a fixed percentage of equity per trade.

signalflow.strategy.component.sizing.fixed_fraction.FixedFractionSizer dataclass

FixedFractionSizer(fraction: float = 0.02, min_notional: float = 10.0, max_notional: float = float('inf'))

Bases: PositionSizer

Fixed percentage of equity per trade.

Classic position sizing: risk a fixed fraction of current equity. Simple and consistent regardless of signal strength or volatility.

Parameters:

Name Type Description Default
fraction float

Fraction of equity to allocate (e.g., 0.02 = 2%).

0.02
min_notional float

Minimum trade size (skip if below).

10.0
max_notional float

Maximum trade size cap.

float('inf')
Example

sizer = FixedFractionSizer(fraction=0.02) # 2% per trade

With $10,000 equity: notional = $200

SignalStrengthSizer

Scale position size by signal probability.

signalflow.strategy.component.sizing.signal_strength.SignalStrengthSizer dataclass

SignalStrengthSizer(base_size: float = 100.0, min_probability: float = 0.5, scale_factor: float = 1.0, min_notional: float = 10.0, max_notional: float = float('inf'))

Bases: PositionSizer

Size proportional to signal probability/strength.

Higher confidence signals get larger positions. Essentially the current SignalEntryRule behavior extracted.

Parameters:

Name Type Description Default
base_size float

Base notional value.

100.0
min_probability float

Skip signals below this threshold.

0.5
scale_factor float

Multiplier for probability-based scaling.

1.0
min_notional float

Minimum trade size.

10.0
max_notional float

Maximum trade size.

float('inf')
Example

sizer = SignalStrengthSizer(base_size=100.0)

Signal with probability=0.8 -> notional = 80

Signal with probability=0.5 -> notional = 50

KellyCriterionSizer

Optimal sizing using the Kelly Criterion formula.

signalflow.strategy.component.sizing.kelly.KellyCriterionSizer dataclass

KellyCriterionSizer(kelly_fraction: float = 0.5, min_trades_for_stats: int = 30, default_win_rate: float = 0.5, default_payoff_ratio: float = 1.0, use_signal_probability: bool = True, min_notional: float = 10.0, max_fraction: float = 0.25)

Bases: PositionSizer

Kelly Criterion position sizing.

Formula: f* = (p * b - q) / b Where: p = win probability (from signal or historical) q = 1 - p (loss probability) b = win/loss ratio (payoff ratio)

Half-Kelly (kelly_fraction=0.5) is recommended for practical use to reduce volatility while capturing most of the edge.

Parameters:

Name Type Description Default
kelly_fraction float

Fraction of Kelly to use (0.5 = half-Kelly recommended).

0.5
min_trades_for_stats int

Minimum closed trades before using historical stats.

30
default_win_rate float

Fallback win rate if insufficient history.

0.5
default_payoff_ratio float

Fallback payoff ratio if insufficient history.

1.0
use_signal_probability bool

Use signal.probability as win rate proxy.

True
min_notional float

Minimum trade size.

10.0
max_fraction float

Maximum fraction of equity (safety cap).

0.25
Example

sizer = KellyCriterionSizer(kelly_fraction=0.5) # Half-Kelly

With 60% win rate and 1.5:1 payoff ratio:

Full Kelly f* = (0.6 * 1.5 - 0.4) / 1.5 = 0.333

Half Kelly = 0.167 = 16.7% of equity

VolatilityTargetSizer

Size positions to achieve target volatility contribution.

signalflow.strategy.component.sizing.volatility_target.VolatilityTargetSizer dataclass

VolatilityTargetSizer(target_volatility: float = 0.01, volatility_source: str = 'atr', default_volatility_pct: float = 0.02, min_notional: float = 10.0, max_fraction: float = 0.2)

Bases: PositionSizer

Target specific portfolio volatility per position.

Sizes positions to contribute equal volatility to the portfolio. Smaller positions in volatile assets, larger in stable ones.

Formula: notional = (target_vol * equity) / asset_vol_pct

Parameters:

Name Type Description Default
target_volatility float

Target contribution to portfolio vol (e.g., 0.01 = 1%).

0.01
volatility_source str

Key in state.runtime for ATR/volatility data.

'atr'
default_volatility_pct float

Default volatility if ATR not available.

0.02
min_notional float

Minimum trade size.

10.0
max_fraction float

Maximum fraction of equity per position.

0.2
Example

sizer = VolatilityTargetSizer(target_volatility=0.01)

Asset with 2% daily vol -> 50% of target allocation

Asset with 0.5% daily vol -> 200% of target allocation (capped)

RiskParitySizer

Equal risk contribution across positions.

signalflow.strategy.component.sizing.risk_parity.RiskParitySizer dataclass

RiskParitySizer(target_positions: int = 10, volatility_source: str = 'atr', default_volatility_pct: float = 0.02, min_notional: float = 10.0)

Bases: PositionSizer

Equal risk contribution across all positions.

Allocates capital so each position contributes equally to portfolio risk, accounting for existing positions and their volatilities.

Parameters:

Name Type Description Default
target_positions int

Target number of equal-risk positions.

10
volatility_source str

Key in state.runtime for volatility data.

'atr'
default_volatility_pct float

Default volatility if not available.

0.02
min_notional float

Minimum trade size.

10.0
Example

sizer = RiskParitySizer(target_positions=10)

Each position should contribute 10% of total risk budget

High-vol assets get smaller notional allocation

MartingaleSizer

Grid/DCA strategy with increasing position sizes.

signalflow.strategy.component.sizing.martingale.MartingaleSizer dataclass

MartingaleSizer(base_size: float = 100.0, multiplier: float = 1.5, max_grid_levels: int = 5, max_notional: float = float('inf'), min_notional: float = 10.0)

Bases: PositionSizer

Martingale position sizing for grid strategies.

Increases position size with each grid level filled. Useful for DCA (Dollar Cost Averaging) and grid trading strategies.

Formula: notional = base_size * (multiplier ^ grid_level)

Where grid_level = number of existing open positions in the same pair.

Parameters:

Name Type Description Default
base_size float

Initial position size for first grid level.

100.0
multiplier float

Size multiplier per level (e.g., 1.5 = 50% increase).

1.5
max_grid_levels int

Maximum number of grid levels to fill.

5
max_notional float

Maximum position size cap.

float('inf')
min_notional float

Minimum trade size.

10.0
Example

sizer = MartingaleSizer(base_size=100, multiplier=1.5)

Level 0: $100

Level 1: $150

Level 2: $225

Level 3: $337.50

Warning

Martingale can lead to large losses in trending markets. Always use with appropriate risk limits and max_grid_levels.

Usage Example

from signalflow.strategy.component.sizing import (
    FixedFractionSizer,
    KellyCriterionSizer,
    VolatilityTargetSizer,
    SignalContext,
)
from signalflow.core import StrategyState

# Create test state
state = StrategyState(strategy_id="demo")
state.portfolio.cash = 10_000.0
prices = {"BTCUSDT": 50000.0}

# Signal context
signal = SignalContext(
    pair="BTCUSDT",
    signal_type="rise",
    probability=0.75,
    price=50000.0,
)

# Fixed 2% of equity per trade
sizer = FixedFractionSizer(fraction=0.02)
notional = sizer.compute_size(signal, state, prices)  # $200

# Half-Kelly sizing
kelly = KellyCriterionSizer(kelly_fraction=0.5, default_win_rate=0.55)
notional = kelly.compute_size(signal, state, prices)

# Volatility targeting (requires ATR in state.runtime)
state.runtime["atr"] = {"BTCUSDT": 1000.0}  # 2% ATR
vol_sizer = VolatilityTargetSizer(target_volatility=0.01)
notional = vol_sizer.compute_size(signal, state, prices)

Entry Filters

Entry filters provide pre-trade validation to improve signal quality. All filters return (allowed: bool, reason: str) tuples.

Base Classes

signalflow.strategy.component.entry.filters.EntryFilter dataclass

EntryFilter()

Bases: ABC

Base class for entry filters.

Filters determine whether a signal should be acted upon. All filters must pass (AND logic) for entry to proceed.

Design principles
  • Filters are binary (allow/reject)
  • Should provide rejection reason for debugging
  • Can be composed via CompositeEntryFilter

allow_entry abstractmethod

allow_entry(signal: SignalContext, state: StrategyState, prices: dict[str, float]) -> tuple[bool, str]

Check if entry is allowed.

Parameters:

Name Type Description Default
signal SignalContext

Signal context.

required
state StrategyState

Strategy state.

required
prices dict[str, float]

Current prices.

required

Returns:

Type Description
bool

Tuple of (allowed, reason).

str

reason is empty string if allowed, else rejection reason.

Source code in src/signalflow/strategy/component/entry/filters.py
@abstractmethod
def allow_entry(
    self,
    signal: SignalContext,
    state: StrategyState,
    prices: dict[str, float],
) -> tuple[bool, str]:
    """Check if entry is allowed.

    Args:
        signal: Signal context.
        state: Strategy state.
        prices: Current prices.

    Returns:
        Tuple of (allowed, reason).
        reason is empty string if allowed, else rejection reason.
    """
    ...

signalflow.strategy.component.entry.filters.CompositeEntryFilter dataclass

CompositeEntryFilter(filters: list[EntryFilter] = list(), require_all: bool = True)

Bases: EntryFilter

Combines multiple entry filters.

Parameters:

Name Type Description Default
filters list[EntryFilter]

List of filters to apply.

list()
require_all bool

If True (default), all must pass. If False, any can pass.

True
Example

composite = CompositeEntryFilter( ... filters=[DrawdownFilter(max_drawdown=0.10), VolatilityFilter()], ... require_all=True ... )

Available Filters

RegimeFilter

Only enter when market regime matches signal direction.

signalflow.strategy.component.entry.filters.RegimeFilter dataclass

RegimeFilter(signal_regime_map: dict[str, str] | None = None, regime_key: str = 'regime', allowed_regimes_bullish: list[str] = (lambda: ['trend_up', 'mean_reversion_oversold'])(), allowed_regimes_bearish: list[str] = (lambda: ['trend_down', 'mean_reversion_overbought'])())

Bases: EntryFilter

Filter entries based on market regime.

Only allow entries when market regime matches signal type: - Bullish signals in trend-up or mean-reversion-oversold regimes - Bearish signals in trend-down or mean-reversion-overbought regimes

Regime detected via state.runtime["regime"][pair] or global regime.

Parameters:

Name Type Description Default
signal_regime_map dict[str, str] | None

Mapping signal_type -> "bullish"/"bearish". When set, overrides legacy "rise"/"fall" hardcoding. None = legacy behavior (only "rise" and "fall" are regime-checked).

None
regime_key str

Key in state.runtime for regime data.

'regime'
allowed_regimes_bullish list[str]

Regimes allowing bullish entries.

(lambda: ['trend_up', 'mean_reversion_oversold'])()
allowed_regimes_bearish list[str]

Regimes allowing bearish entries.

(lambda: ['trend_down', 'mean_reversion_overbought'])()

VolatilityFilter

Skip entries in extreme volatility conditions.

signalflow.strategy.component.entry.filters.VolatilityFilter dataclass

VolatilityFilter(volatility_key: str = 'atr', min_volatility: float = 0.0, max_volatility: float = float('inf'), use_relative: bool = True)

Bases: EntryFilter

Skip entries in extreme volatility conditions.

Parameters:

Name Type Description Default
volatility_key str

Key in state.runtime for volatility data (default: "atr").

'atr'
min_volatility float

Minimum relative volatility to allow entry.

0.0
max_volatility float

Maximum relative volatility to allow entry.

float('inf')
use_relative bool

If True, compare vol/price ratio instead of absolute.

True

DrawdownFilter

Pause trading after significant drawdown.

signalflow.strategy.component.entry.filters.DrawdownFilter dataclass

DrawdownFilter(max_drawdown: float = 0.1, recovery_threshold: float = 0.05, drawdown_key: str = 'current_drawdown', _paused: bool = False)

Bases: EntryFilter

Pause trading after significant drawdown.

Parameters:

Name Type Description Default
max_drawdown float

Maximum drawdown before pausing (e.g., 0.10 = 10%).

0.1
recovery_threshold float

Resume when drawdown reduces to this level.

0.05
drawdown_key str

Key in state.metrics for current drawdown.

'current_drawdown'

CorrelationFilter

Avoid concentrated positions in correlated assets.

signalflow.strategy.component.entry.filters.CorrelationFilter dataclass

CorrelationFilter(correlation_key: str = 'correlations', max_correlation: float = 0.7, max_correlated_positions: int = 2)

Bases: EntryFilter

Avoid concentrated positions in correlated assets.

Rejects entry if already holding highly correlated assets.

Parameters:

Name Type Description Default
correlation_key str

Key in state.runtime for correlation matrix.

'correlations'
max_correlation float

Maximum allowed correlation with existing positions.

0.7
max_correlated_positions int

Max positions in correlated group.

2

TimeOfDayFilter

Restrict trading to specific hours.

signalflow.strategy.component.entry.filters.TimeOfDayFilter dataclass

TimeOfDayFilter(allowed_hours: list[int] | None = None, blocked_hours: list[int] | None = None)

Bases: EntryFilter

Restrict trading to specific hours.

Parameters:

Name Type Description Default
allowed_hours list[int] | None

List of hours (0-23) when trading is allowed.

None
blocked_hours list[int] | None

List of hours (0-23) when trading is blocked.

None

Note: If both are None, all hours are allowed.

PriceDistanceFilter

Filter entries based on price distance from existing positions.

signalflow.strategy.component.entry.filters.PriceDistanceFilter dataclass

PriceDistanceFilter(signal_direction_map: dict[str, str] | None = None, min_distance_pct: float = 0.02, direction_aware: bool = True)

Bases: EntryFilter

Filter entries based on price distance from existing positions.

For grid strategies: prevents buying when price is too close to existing positions in the same pair.

Parameters:

Name Type Description Default
signal_direction_map dict[str, str] | None

Mapping signal_type -> "long"/"short". When set, overrides legacy "rise"/"fall" hardcoding. None = legacy behavior (only "rise" and "fall" are direction-aware).

None
min_distance_pct float

Minimum price difference as percentage (e.g., 0.02 = 2%).

0.02
direction_aware bool

If True, check distance based on position direction. - LONG: new entry must be below existing entry by min_distance_pct - SHORT: new entry must be above existing entry by min_distance_pct If False, check absolute distance in either direction.

True
Example

Grid strategy: only buy when price drops 2% from last position

filter = PriceDistanceFilter(min_distance_pct=0.02, direction_aware=True)

SignalAccuracyFilter

Pause trading when signal accuracy drops below threshold.

signalflow.strategy.component.entry.filters.SignalAccuracyFilter dataclass

SignalAccuracyFilter(accuracy_key: str = 'signal_accuracy', min_accuracy: float = 0.45, min_samples: int = 20, window_key: str | None = None)

Bases: EntryFilter

Filter based on real-time signal accuracy metrics.

Tracks detector/model accuracy and pauses trading when accuracy drops. Useful for detecting model degradation or regime changes.

Parameters:

Name Type Description Default
accuracy_key str

Key in state.runtime for accuracy data.

'signal_accuracy'
min_accuracy float

Minimum required accuracy to allow entry.

0.45
min_samples int

Minimum samples before applying filter.

20
window_key str | None

Optional key for accuracy over recent window only.

None
Example

Pause if recent signal accuracy drops below 45%

filter = SignalAccuracyFilter(min_accuracy=0.45, min_samples=20)

Usage Example

from signalflow.strategy.component.entry import (
    CompositeEntryFilter,
    DrawdownFilter,
    RegimeFilter,
    VolatilityFilter,
    TimeOfDayFilter,
)

# Combine multiple filters (AND logic)
composite = CompositeEntryFilter(
    filters=[
        DrawdownFilter(max_drawdown=0.10, recovery_threshold=0.05),
        RegimeFilter(),
        VolatilityFilter(max_volatility=0.03),
        TimeOfDayFilter(blocked_hours=[0, 1, 2, 3]),  # Skip overnight
    ],
    require_all=True,  # All must pass
)

allowed, reason = composite.allow_entry(signal, state, prices)
if not allowed:
    print(f"Entry rejected: {reason}")

Signal Aggregation

Combine signals from multiple detectors using voting or weighting logic.

VotingMode

signalflow.strategy.component.entry.aggregation.VotingMode

Bases: str, Enum

Signal aggregation voting modes.

Mode Description
MAJORITY Most common signal type wins (requires min_agreement)
WEIGHTED Weighted average of probabilities
UNANIMOUS All detectors must agree
ANY Any non-NONE signal passes (highest probability wins)
META_LABELING Detector direction × validator probability

SignalAggregator

signalflow.strategy.component.entry.aggregation.SignalAggregator dataclass

SignalAggregator(voting_mode: VotingMode = VotingMode.MAJORITY, min_agreement: float = 0.5, weights: list[float] | None = None, probability_threshold: float = 0.5, pair_col: str = 'pair', ts_col: str = 'timestamp')

Combine signals from multiple detectors.

Aggregates multiple Signals DataFrames into one based on voting/weighting logic.

Parameters:

Name Type Description Default
voting_mode VotingMode

How to combine signals (see VotingMode).

MAJORITY
min_agreement float

Minimum fraction of detectors agreeing (for MAJORITY).

0.5
weights list[float] | None

Optional weights per detector (for WEIGHTED mode).

None
probability_threshold float

Minimum combined probability to emit signal.

0.5
pair_col str

Column name for pair.

'pair'
ts_col str

Column name for timestamp.

'timestamp'
Example

Majority voting

aggregator = SignalAggregator(voting_mode=VotingMode.MAJORITY) combined = aggregator.aggregate([signals1, signals2, signals3])

Meta-labeling: detector direction * validator confidence

aggregator = SignalAggregator(voting_mode=VotingMode.META_LABELING) combined = aggregator.aggregate([detector_signals, validator_signals])

__call__

__call__(signals_list: list[Signals], detector_names: list[str] | None = None) -> Signals

Alias for aggregate().

Source code in src/signalflow/strategy/component/entry/aggregation.py
def __call__(
    self,
    signals_list: list[Signals],
    detector_names: list[str] | None = None,
) -> Signals:
    """Alias for aggregate()."""
    return self.aggregate(signals_list, detector_names)

aggregate

aggregate(signals_list: list[Signals], detector_names: list[str] | None = None) -> Signals

Aggregate multiple signal sources into one.

Parameters:

Name Type Description Default
signals_list list[Signals]

List of Signals from different detectors.

required
detector_names list[str] | None

Optional names for tracing (len must match signals_list).

None

Returns:

Type Description
Signals

Aggregated Signals DataFrame.

Source code in src/signalflow/strategy/component/entry/aggregation.py
def aggregate(
    self,
    signals_list: list[Signals],
    detector_names: list[str] | None = None,
) -> Signals:
    """Aggregate multiple signal sources into one.

    Args:
        signals_list: List of Signals from different detectors.
        detector_names: Optional names for tracing (len must match signals_list).

    Returns:
        Aggregated Signals DataFrame.
    """
    if not signals_list:
        return Signals(pl.DataFrame())

    if len(signals_list) == 1:
        return signals_list[0]

    if self.voting_mode == VotingMode.MAJORITY:
        return self._aggregate_majority(signals_list)
    elif self.voting_mode == VotingMode.WEIGHTED:
        return self._aggregate_weighted(signals_list)
    elif self.voting_mode == VotingMode.UNANIMOUS:
        return self._aggregate_unanimous(signals_list)
    elif self.voting_mode == VotingMode.ANY:
        return self._aggregate_any(signals_list)
    elif self.voting_mode == VotingMode.META_LABELING:
        return self._aggregate_meta_labeling(signals_list)
    else:
        raise ValueError(f"Unknown voting mode: {self.voting_mode}")

Usage Examples

from signalflow.strategy.component.entry import SignalAggregator, VotingMode

# Majority voting: signal needs >50% agreement
aggregator = SignalAggregator(
    voting_mode=VotingMode.MAJORITY,
    min_agreement=0.5,
)
combined = aggregator.aggregate([signals_1, signals_2, signals_3])

# Weighted voting with custom weights
aggregator = SignalAggregator(
    voting_mode=VotingMode.WEIGHTED,
    weights=[2.0, 1.0, 1.0],  # First detector weighted 2x
    probability_threshold=0.6,
)
combined = aggregator.aggregate([primary_signals, secondary_1, secondary_2])

# Meta-labeling: detector direction × validator confidence
aggregator = SignalAggregator(
    voting_mode=VotingMode.META_LABELING,
    probability_threshold=0.5,
)
combined = aggregator.aggregate([detector_signals, validator_signals])
# Combined probability = detector_prob * validator_prob

# Unanimous: all must agree for high-conviction trades
aggregator = SignalAggregator(
    voting_mode=VotingMode.UNANIMOUS,
    probability_threshold=0.7,
)
combined = aggregator.aggregate([detector_1, detector_2, detector_3])

Integration with SignalEntryRule

Position sizers and entry filters can be injected into SignalEntryRule:

from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.component.entry import (
    SignalEntryRule,
    CompositeEntryFilter,
    DrawdownFilter,
    RegimeFilter,
)
from signalflow.strategy.component.sizing import KellyCriterionSizer
from signalflow.strategy.component.exit import TakeProfitStopLossExit
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor

# Create advanced entry rule
entry_rule = SignalEntryRule(
    position_sizer=KellyCriterionSizer(kelly_fraction=0.5),
    entry_filters=CompositeEntryFilter(
        filters=[
            DrawdownFilter(max_drawdown=0.10),
            RegimeFilter(),
        ],
    ),
)

# Run backtest
runner = BacktestRunner(
    strategy_id="advanced_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[TakeProfitStopLossExit(take_profit_pct=0.02, stop_loss_pct=0.01)],
    initial_capital=10_000.0,
)

state = runner.run(raw_data=raw_data, signals=signals)

Grid Trading Example

Combine MartingaleSizer with PriceDistanceFilter for grid strategies:

from signalflow.strategy.component.entry import (
    SignalEntryRule,
    PriceDistanceFilter,
)
from signalflow.strategy.component.sizing import MartingaleSizer

# Grid strategy: buy more as price drops
entry_rule = SignalEntryRule(
    position_sizer=MartingaleSizer(
        base_size=100.0,      # Start with $100
        multiplier=1.5,       # Increase 50% per level
        max_grid_levels=5,    # Max 5 levels
    ),
    entry_filters=PriceDistanceFilter(
        min_distance_pct=0.02,  # 2% price drop between levels
        direction_aware=True,
    ),
    max_positions_per_pair=5,
)

Data Sources

Components access data through StrategyState:

Component Data Source Key
VolatilityTargetSizer ATR values state.runtime["atr"]
RiskParitySizer ATR values state.runtime["atr"]
DrawdownFilter Current drawdown state.metrics["current_drawdown"]
VolatilityFilter ATR values state.runtime["atr"]
RegimeFilter Market regime state.runtime["regime"]
CorrelationFilter Correlation matrix state.runtime["correlations"]
SignalAccuracyFilter Accuracy metrics state.runtime["signal_accuracy"]

Populate these during backtest:

# Example: populate runtime data before entry rule
def on_bar_hook(state, timestamp, prices):
    state.runtime["atr"] = calculate_atr(prices)
    state.runtime["regime"] = detect_regime(prices)

Full Backtest Example

Complete example using signal aggregation, entry filters, and position sizing:

from datetime import datetime
from pathlib import Path

import polars as pl
from signalflow.data.raw_store import DuckDbRawStore
from signalflow.data.source import VirtualDataProvider
from signalflow.data import RawDataFactory
from signalflow.detector import ExampleSmaCrossDetector
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.component.entry import (
    SignalEntryRule,
    SignalAggregator,
    VotingMode,
    CompositeEntryFilter,
    DrawdownFilter,
    TimeOfDayFilter,
)
from signalflow.strategy.component.sizing import VolatilityTargetSizer
from signalflow.strategy.component.exit import TakeProfitStopLossExit

# 1. Generate synthetic data
PAIRS = ["BTCUSDT", "ETHUSDT"]
START = datetime(2025, 1, 1)

spot_store = DuckDbRawStore(db_path=Path("backtest.duckdb"), timeframe="1m")
provider = VirtualDataProvider(store=spot_store, seed=42)
provider.download(pairs=PAIRS, n_bars=5000, start=START)

# 2. Load data
raw_data = RawDataFactory.from_duckdb_spot_store(
    spot_store_path=Path("backtest.duckdb"),
    pairs=PAIRS,
    start=START,
    end=datetime(2025, 1, 4),
)

# 3. Create multiple detectors
detector_fast = ExampleSmaCrossDetector(fast_period=10, slow_period=30)
detector_slow = ExampleSmaCrossDetector(fast_period=20, slow_period=50)

signals_fast = detector_fast.run(raw_data.view())
signals_slow = detector_slow.run(raw_data.view())

# 4. Aggregate signals (unanimous agreement)
aggregator = SignalAggregator(
    voting_mode=VotingMode.UNANIMOUS,
    probability_threshold=0.0,
)
signals = aggregator.aggregate([signals_fast, signals_slow])

# 5. Configure entry rule with sizer and filters
entry_rule = SignalEntryRule(
    position_sizer=VolatilityTargetSizer(
        target_volatility=0.015,
        default_volatility_pct=0.02,
        max_fraction=0.15,
    ),
    entry_filters=CompositeEntryFilter(
        filters=[
            DrawdownFilter(max_drawdown=0.10),
            TimeOfDayFilter(allowed_hours=list(range(6, 22))),
        ],
    ),
    max_positions_per_pair=1,
    max_total_positions=5,
)

# 6. Run backtest
runner = BacktestRunner(
    strategy_id="advanced_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[TakeProfitStopLossExit(take_profit_pct=0.02, stop_loss_pct=0.015)],
    initial_capital=10_000.0,
)

state = runner.run(raw_data, signals)
results = runner.get_results()

print(f"Total Return: {results.get('final_return', 0) * 100:.2f}%")
print(f"Max Drawdown: {results.get('max_drawdown', 0) * 100:.2f}%")
print(f"Win Rate: {results.get('win_rate', 0) * 100:.1f}%")

# Cleanup
spot_store.close()

External Model Integration

SignalFlow supports integration with external ML/RL models via a Protocol-based interface. Models make trading decisions (entry, exit, hold) based on signals and metrics.

Architecture

flowchart TB
    A[Signals + Metrics] --> B[ModelContext]
    B --> C[StrategyModel.decide]
    C --> D[list of StrategyDecision]
    D --> E{Action Type}
    E -->|ENTER| F[ModelEntryRule]
    E -->|CLOSE/CLOSE_ALL| G[ModelExitRule]
    F --> H[Entry Orders]
    G --> I[Exit Orders]

    style C fill:#7c3aed,stroke:#8b5cf6,color:#fff
    style D fill:#059669,stroke:#10b981,color:#fff

Design Principle: Strategy models see signals and metrics only, NOT raw OHLCV prices.

StrategyAction

signalflow.strategy.model.decision.StrategyAction

Bases: StrEnum

Actions a strategy model can take.

Values

ENTER: Open new position for pair (uses size_multiplier). SKIP: Skip this signal (do not enter). CLOSE: Close specific position (requires position_id). CLOSE_ALL: Close all positions for a pair. HOLD: Do nothing (no action).

Action Description
ENTER Open new position (uses size_multiplier)
SKIP Skip this signal
CLOSE Close specific position (requires position_id)
CLOSE_ALL Close all positions for a pair
HOLD Do nothing

StrategyDecision

signalflow.strategy.model.decision.StrategyDecision dataclass

StrategyDecision(action: StrategyAction, pair: str, position_id: str | None = None, size_multiplier: float = 1.0, confidence: float = 1.0, meta: dict[str, Any] = dict())

Model output for a single trading decision.

Represents one decision from the model about whether to enter, exit, or hold positions. Multiple decisions can be returned per bar.

Attributes:

Name Type Description
action StrategyAction

The action to take (ENTER, SKIP, CLOSE, CLOSE_ALL, HOLD).

pair str

Trading pair this decision applies to.

position_id str | None

For CLOSE action - specific position to close.

size_multiplier float

For ENTER action - multiplier on base position size (default 1.0).

confidence float

Model confidence in this decision (0-1).

meta dict[str, Any]

Additional metadata (e.g., reason, model_name).

Example

Enter decision

decision = StrategyDecision( ... action=StrategyAction.ENTER, ... pair="BTCUSDT", ... size_multiplier=1.5, ... confidence=0.85, ... meta={"signal_type": "rise", "model": "rf_v2"} ... )

Close specific position

decision = StrategyDecision( ... action=StrategyAction.CLOSE, ... pair="BTCUSDT", ... position_id="pos_abc123", ... confidence=0.92, ... meta={"reason": "model_exit"} ... )

Raises:

Type Description
ValueError

If CLOSE action is missing position_id.

ValueError

If ENTER action has non-positive size_multiplier.

__post_init__

__post_init__() -> None

Validate decision parameters.

Source code in src/signalflow/strategy/model/decision.py
def __post_init__(self) -> None:
    """Validate decision parameters."""
    if self.action == StrategyAction.CLOSE and self.position_id is None:
        raise ValueError("CLOSE action requires position_id")
    if self.action == StrategyAction.ENTER and self.size_multiplier <= 0:
        raise ValueError("size_multiplier must be positive for ENTER action")

ModelContext

signalflow.strategy.model.context.ModelContext dataclass

ModelContext(timestamp: datetime, signals: Signals, prices: dict[str, float] = dict(), positions: list[Position] = list(), metrics: dict[str, float] = dict(), runtime: dict[str, Any] = dict())

Aggregated context passed to strategy models.

Provides all information a model needs to make decisions: - Current bar signals - Strategy metrics (equity, drawdown, etc.) - Current positions and their states - Runtime state (for custom indicators, cooldowns, etc.)

Attributes:

Name Type Description
timestamp datetime

Current bar timestamp.

signals Signals

Current bar signals (Signals container).

prices dict[str, float]

Current prices per pair.

positions list[Position]

List of open positions.

metrics dict[str, float]

Current strategy metrics snapshot.

runtime dict[str, Any]

Runtime state dict (cooldowns, custom state).

Example

Model receives context each bar

def decide(self, context: ModelContext) -> list[StrategyDecision]: ... # Access signals ... for row in context.signals.value.iter_rows(named=True): ... pair = row["pair"] ... signal_type = row["signal_type"] ... probability = row.get("probability", 0.5) ... ... # Use metrics for risk management ... if context.metrics.get("max_drawdown", 0) > 0.15: ... continue # Skip during high drawdown ... ... # Check existing positions ... pair_positions = [p for p in context.positions if p.pair == pair] ... ...

StrategyModel Protocol

signalflow.strategy.model.protocol.StrategyModel

Bases: Protocol

Protocol for external strategy models.

External models must implement this protocol to integrate with SignalFlow. The model receives context (signals, metrics, positions) and returns a list of trading decisions.

Implementation Notes
  • Models are called ONCE per bar (not per signal).
  • Return empty list for "no action".
  • Multiple decisions per bar are allowed.
  • Model should be stateless (use context.runtime for state).
Example Implementation

class MyRLModel: ... '''Reinforcement learning model for trading.''' ... ... def init(self, model_path: str): ... self.model = load_model(model_path) ... ... def decide(self, context: ModelContext) -> list[StrategyDecision]: ... decisions = [] ... ... # Skip if drawdown too high ... if context.metrics.get("max_drawdown", 0) > 0.2: ... return decisions ... ... # Process each signal ... for row in context.signals.value.iter_rows(named=True): ... pair = row["pair"] ... prob = row.get("probability", 0.5) ... ... features = self._build_features(row, context.metrics) ... action, confidence = self.model.predict(features) ... ... if action == "enter" and confidence > 0.6: ... decisions.append(StrategyDecision( ... action=StrategyAction.ENTER, ... pair=pair, ... size_multiplier=min(confidence, 1.5), ... confidence=confidence, ... )) ... ... # Check if should close any positions ... for pos in context.positions: ... if self._should_close(pos, context): ... decisions.append(StrategyDecision( ... action=StrategyAction.CLOSE, ... pair=pos.pair, ... position_id=pos.id, ... )) ... ... return decisions

decide

decide(context: ModelContext) -> list[StrategyDecision]

Make trading decisions based on current context.

Parameters:

Name Type Description Default
context ModelContext

Current bar context with signals, metrics, positions.

required

Returns:

Type Description
list[StrategyDecision]

List of trading decisions (can be empty).

Source code in src/signalflow/strategy/model/protocol.py
def decide(self, context: ModelContext) -> list[StrategyDecision]:
    """Make trading decisions based on current context.

    Args:
        context: Current bar context with signals, metrics, positions.

    Returns:
        List of trading decisions (can be empty).
    """
    ...

ModelEntryRule

signalflow.strategy.model.rules.ModelEntryRule dataclass

ModelEntryRule(signal_type_map: dict[str, str] | None = None, model: StrategyModel = None, base_position_size: float = 0.01, max_positions: int = 10, min_confidence: float = 0.5, allow_shorts: bool = False, pair_col: str = 'pair')

Bases: EntryRule

Entry rule that delegates to an external model.

The model is called once per bar. Its decisions are cached in state.runtime so that ModelExitRule can access exit decisions without calling the model twice.

Attributes:

Name Type Description
model StrategyModel

External model implementing StrategyModel protocol.

base_position_size float

Base position size (multiplied by decision.size_multiplier).

max_positions int

Maximum concurrent positions.

min_confidence float

Minimum confidence to act on ENTER decisions.

allow_shorts bool

Allow FALL signals to create short positions.

pair_col str

Column name for pair in signals.

Example

from signalflow.strategy.model import ModelEntryRule, ModelExitRule

model = MyRLModel("model.pt") entry_rule = ModelEntryRule( ... model=model, ... base_position_size=0.01, ... max_positions=5, ... min_confidence=0.6, ... ) exit_rule = ModelExitRule(model=model)

runner = BacktestRunner( ... entry_rules=[entry_rule], ... exit_rules=[exit_rule], ... ... ... )

check_entries

check_entries(signals: Signals, prices: dict[str, float], state: StrategyState) -> list[Order]

Generate entry orders from model decisions.

Source code in src/signalflow/strategy/model/rules.py
def check_entries(
    self,
    signals: Signals,
    prices: dict[str, float],
    state: StrategyState,
) -> list[Order]:
    """Generate entry orders from model decisions."""
    orders: list[Order] = []

    if signals is None or signals.value.height == 0:
        return orders

    if self.model is None:
        return orders

    # Get or compute decisions
    decisions = _get_cached_decisions(state)
    if decisions is None:
        context = _build_model_context(signals, prices, state)
        decisions = self.model.decide(context)
        _cache_decisions(state, decisions)

    # Filter to ENTER decisions
    enter_decisions = [
        d for d in decisions if d.action == StrategyAction.ENTER and d.confidence >= self.min_confidence
    ]

    open_count = len(state.portfolio.open_positions())

    for decision in enter_decisions:
        if open_count >= self.max_positions:
            break

        price = prices.get(decision.pair)
        if price is None or price <= 0:
            continue

        # Calculate position size
        qty = self.base_position_size * decision.size_multiplier

        # Determine side from signal (lookup in signals)
        side: OrderSide = self._get_side_from_signals(signals, decision.pair) or "BUY"

        order = Order(
            pair=decision.pair,
            side=side,
            order_type="MARKET",
            qty=qty,
            signal_strength=decision.confidence,
            meta={
                "decision_action": decision.action.value,
                "model_confidence": decision.confidence,
                "size_multiplier": decision.size_multiplier,
                **decision.meta,
            },
        )
        orders.append(order)
        open_count += 1

    return orders

ModelExitRule

signalflow.strategy.model.rules.ModelExitRule dataclass

ModelExitRule(model: StrategyModel = None, min_confidence: float = 0.5)

Bases: ExitRule

Exit rule that uses cached model decisions.

NOTE: If decisions are not cached yet (exit runs before entry), this rule will call the model and cache the results.

Attributes:

Name Type Description
model StrategyModel

External model implementing StrategyModel protocol.

min_confidence float

Minimum confidence to act on CLOSE/CLOSE_ALL decisions.

Example

exit_rule = ModelExitRule( ... model=model, ... min_confidence=0.7, # Higher threshold for exits ... )

check_exits

check_exits(positions: list[Position], prices: dict[str, float], state: StrategyState) -> list[Order]

Generate exit orders from model decisions.

Source code in src/signalflow/strategy/model/rules.py
def check_exits(
    self,
    positions: list[Position],
    prices: dict[str, float],
    state: StrategyState,
) -> list[Order]:
    """Generate exit orders from model decisions."""
    orders: list[Order] = []

    if not positions:
        return orders

    if self.model is None:
        return orders

    # Get cached decisions or compute them
    # (ExitRule runs BEFORE EntryRule in BacktestRunner)
    decisions = _get_cached_decisions(state)
    if decisions is None:
        # Get signals from runtime (stored by runner)
        signals = state.runtime.get(BAR_SIGNALS_KEY, Signals(pl.DataFrame()))
        context = _build_model_context(signals, prices, state)
        decisions = self.model.decide(context)
        _cache_decisions(state, decisions)

    # Build position lookup
    positions_by_id = {p.id: p for p in positions if not p.is_closed}
    positions_by_pair: dict[str, list[Position]] = {}
    for p in positions:
        if not p.is_closed:
            positions_by_pair.setdefault(p.pair, []).append(p)

    # Track which positions we've already added exit orders for
    exited_position_ids: set[str] = set()

    # Process CLOSE and CLOSE_ALL decisions
    for decision in decisions:
        if decision.confidence < self.min_confidence:
            continue

        if decision.action == StrategyAction.CLOSE:
            # Close specific position
            if decision.position_id is None:
                continue
            if decision.position_id in exited_position_ids:
                continue

            position = positions_by_id.get(decision.position_id)
            if position and not position.is_closed:
                order = self._create_exit_order(position, prices, decision)
                if order:
                    orders.append(order)
                    exited_position_ids.add(position.id)

        elif decision.action == StrategyAction.CLOSE_ALL:
            # Close all positions for pair
            pair_positions = positions_by_pair.get(decision.pair, [])
            for position in pair_positions:
                if position.id in exited_position_ids:
                    continue
                if position.is_closed:
                    continue

                order = self._create_exit_order(position, prices, decision)
                if order:
                    orders.append(order)
                    exited_position_ids.add(position.id)

    return orders

Model Integration Example

from signalflow.strategy.model import (
    StrategyModel,
    StrategyAction,
    StrategyDecision,
    ModelContext,
    ModelEntryRule,
    ModelExitRule,
)
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
from signalflow.strategy.component.exit import TakeProfitStopLossExit


# 1. Implement the StrategyModel protocol
class MyRLModel:
    """Example RL model for trading decisions."""

    def __init__(self, model_path: str):
        # Load your trained model
        self.model = self._load_model(model_path)

    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []

        # Risk management: skip during high drawdown
        if context.metrics.get("max_drawdown", 0) > 0.15:
            return decisions

        # Process each signal
        for row in context.signals.value.iter_rows(named=True):
            pair = row["pair"]
            prob = row.get("probability", 0.5)

            # Get model prediction
            features = self._build_features(row, context.metrics)
            action, confidence = self.model.predict(features)

            if action == "enter" and confidence > 0.6:
                decisions.append(StrategyDecision(
                    action=StrategyAction.ENTER,
                    pair=pair,
                    size_multiplier=min(confidence, 1.5),
                    confidence=confidence,
                    meta={"model": "rl_v1"},
                ))

        # Check if should close any positions
        for pos in context.positions:
            if self._should_close(pos, context):
                decisions.append(StrategyDecision(
                    action=StrategyAction.CLOSE,
                    pair=pos.pair,
                    position_id=pos.id,
                    confidence=0.9,
                    meta={"reason": "model_exit"},
                ))

        return decisions


# 2. Create rules with the model
model = MyRLModel("model.pt")

entry_rule = ModelEntryRule(
    model=model,
    base_position_size=0.02,  # 2% base size
    max_positions=5,
    min_confidence=0.6,
)

exit_rule = ModelExitRule(
    model=model,
    min_confidence=0.7,  # Higher threshold for exits
)

# 3. Run backtest
runner = BacktestRunner(
    strategy_id="model_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[
        exit_rule,
        TakeProfitStopLossExit(take_profit_pct=0.03, stop_loss_pct=0.02),
    ],
    initial_capital=10_000.0,
)

state = runner.run(raw_data, signals)

Data Export

Export backtest results for external ML model training.

BacktestExporter

signalflow.strategy.exporter.parquet_exporter.BacktestExporter dataclass

BacktestExporter(pair_col: str = 'pair', ts_col: str = 'timestamp')

Export backtest results for external ML training.

Exports per-bar data (signals + metrics) and per-trade data to Parquet format. Does NOT include raw OHLCV prices - only signals and derived metrics.

Output Files
  • {output_path}/bars.parquet: Per-bar signals and metrics
  • {output_path}/trades.parquet: Entry/exit trade pairs with outcomes

Attributes:

Name Type Description
pair_col str

Column name for pair in signals.

ts_col str

Column name for timestamp in signals.

Example

exporter = BacktestExporter()

During backtest (can be integrated with runner)

for ts in timestamps: ... # ... process bar ... ... exporter.export_bar(ts, signals, state.metrics, state) ... ... for trade in bar_trades: ... exporter.export_trade(trade_data)

After backtest

exporter.finalize(Path("./training_data"))

Load for training

bars = pl.read_parquet("./training_data/bars.parquet") trades = pl.read_parquet("./training_data/trades.parquet")

bar_count property

bar_count: int

Number of bar records collected.

trade_count property

trade_count: int

Number of trade records collected.

export_bar

export_bar(timestamp: datetime, signals: Signals, metrics: dict[str, float], state: StrategyState) -> None

Record bar data for export.

Records
  • Timestamp
  • All signals for this bar (flattened)
  • All metrics values
  • Position summary (count, total exposure)

Parameters:

Name Type Description Default
timestamp datetime

Current bar timestamp.

required
signals Signals

Current bar signals.

required
metrics dict[str, float]

Current strategy metrics.

required
state StrategyState

Current strategy state.

required
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
def export_bar(
    self,
    timestamp: datetime,
    signals: Signals,
    metrics: dict[str, float],
    state: StrategyState,
) -> None:
    """Record bar data for export.

    Records:
        - Timestamp
        - All signals for this bar (flattened)
        - All metrics values
        - Position summary (count, total exposure)

    Args:
        timestamp: Current bar timestamp.
        signals: Current bar signals.
        metrics: Current strategy metrics.
        state: Current strategy state.
    """
    if signals is None or signals.value.height == 0:
        # Still record metrics even without signals
        record = {
            self.ts_col: timestamp,
            **{f"metric_{k}": v for k, v in metrics.items() if k != "timestamp"},
            "open_position_count": len(state.portfolio.open_positions()),
        }
        self._bar_records.append(record)
        return

    # For each signal, create a record with metrics
    for row in signals.value.iter_rows(named=True):
        record = {
            self.ts_col: timestamp,
            self.pair_col: row.get(self.pair_col, ""),
            "signal_type": row.get("signal_type", ""),
            "signal": row.get("signal", 0),
            "probability": row.get("probability", 0.0),
            **{f"metric_{k}": v for k, v in metrics.items() if k != "timestamp"},
            "open_position_count": len(state.portfolio.open_positions()),
        }
        self._bar_records.append(record)

export_position_close

export_position_close(position: Position, exit_time: datetime, exit_price: float, exit_reason: str = 'unknown') -> None

Convenience method to export when a position closes.

Parameters:

Name Type Description Default
position Position

The position being closed.

required
exit_time datetime

Time of exit.

required
exit_price float

Price at exit.

required
exit_reason str

Reason for exit (e.g., "take_profit", "stop_loss", "model_exit").

'unknown'
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
def export_position_close(
    self,
    position: Position,
    exit_time: datetime,
    exit_price: float,
    exit_reason: str = "unknown",
) -> None:
    """Convenience method to export when a position closes.

    Args:
        position: The position being closed.
        exit_time: Time of exit.
        exit_price: Price at exit.
        exit_reason: Reason for exit (e.g., "take_profit", "stop_loss", "model_exit").
    """
    entry_meta = position.meta or {}

    trade_data = {
        "position_id": position.id,
        "pair": position.pair,
        "position_type": position.position_type.value,
        "entry_time": position.entry_time,
        "entry_price": position.entry_price,
        "exit_time": exit_time,
        "exit_price": exit_price,
        "qty": position.qty,
        "realized_pnl": position.realized_pnl,
        "total_pnl": position.total_pnl,
        "fees_paid": position.fees_paid,
        "signal_strength": position.signal_strength,
        "exit_reason": exit_reason,
        "entry_signal_type": entry_meta.get("signal_type", ""),
        "model_confidence": entry_meta.get("model_confidence", 0.0),
    }
    self._trade_records.append(trade_data)

export_trade

export_trade(trade_data: dict[str, Any]) -> None

Record completed trade for export.

Parameters:

Name Type Description Default
trade_data dict[str, Any]

Dictionary containing trade information. Expected keys: - position_id - pair - entry_time, entry_price - exit_time, exit_price (if closed) - realized_pnl - hold_duration_bars - entry_signal_type, entry_confidence - exit_reason

required
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
def export_trade(
    self,
    trade_data: dict[str, Any],
) -> None:
    """Record completed trade for export.

    Args:
        trade_data: Dictionary containing trade information.
            Expected keys:
            - position_id
            - pair
            - entry_time, entry_price
            - exit_time, exit_price (if closed)
            - realized_pnl
            - hold_duration_bars
            - entry_signal_type, entry_confidence
            - exit_reason
    """
    self._trade_records.append(trade_data)

finalize

finalize(output_path: Path) -> None

Write all data to Parquet files.

Parameters:

Name Type Description Default
output_path Path

Directory to write output files. Creates directory if it doesn't exist.

required
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
def finalize(self, output_path: Path) -> None:
    """Write all data to Parquet files.

    Args:
        output_path: Directory to write output files.
            Creates directory if it doesn't exist.
    """
    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    # Export bars
    if self._bar_records:
        bars_df = pl.DataFrame(self._bar_records)
        bars_df.write_parquet(output_path / "bars.parquet")

    # Export trades
    if self._trade_records:
        trades_df = pl.DataFrame(self._trade_records)
        trades_df.write_parquet(output_path / "trades.parquet")

reset

reset() -> None

Clear all recorded data.

Source code in src/signalflow/strategy/exporter/parquet_exporter.py
def reset(self) -> None:
    """Clear all recorded data."""
    self._bar_records.clear()
    self._trade_records.clear()

Export Format

bars.parquet - Per-bar signals and metrics:

Column Description
timestamp Bar timestamp
pair Trading pair
signal_type Signal type (e.g. rise, fall, local_max)
probability Signal probability
metric_equity Portfolio equity
metric_max_drawdown Max drawdown
open_position_count Open positions count

trades.parquet - Completed trades:

Column Description
position_id Position ID
pair Trading pair
entry_time, exit_time Trade timestamps
entry_price, exit_price Prices
realized_pnl Realized profit/loss
exit_reason Why trade closed
model_confidence Model confidence at entry

Export Example

from pathlib import Path
from signalflow.strategy.exporter import BacktestExporter

# Create exporter
exporter = BacktestExporter()

# Option 1: Manual export during custom backtest loop
for ts in timestamps:
    # ... process bar ...
    exporter.export_bar(ts, signals, state.metrics, state)

# Export when positions close
for closed_position in newly_closed:
    exporter.export_position_close(
        position=closed_position,
        exit_time=ts,
        exit_price=prices[closed_position.pair],
        exit_reason="take_profit",
    )

# Write to disk
exporter.finalize(Path("./training_data"))

# Option 2: Load for training
import polars as pl

bars_df = pl.read_parquet("./training_data/bars.parquet")
trades_df = pl.read_parquet("./training_data/trades.parquet")

# Prepare features for ML training
features = bars_df.select([
    "timestamp", "pair", "signal_type", "probability",
    "metric_equity", "metric_max_drawdown",
])

See Also