Strategy Module¶
The strategy module provides components for backtesting and live trading execution.
Architecture¶
flowchart LR
A[Signals] --> B[SignalAggregator]
B --> C[EntryFilter]
C --> D[PositionSizer]
D --> E[Entry Rules]
E --> F[Broker]
F --> G[Exit Rules]
style B fill:#16a34a,stroke:#22c55e,color:#fff
style C fill:#ea580c,stroke:#f97316,color:#fff
style D fill:#2563eb,stroke:#3b82f6,color:#fff
Execution¶
signalflow.strategy.runner.backtest_runner.BacktestRunner dataclass ¶
BacktestRunner(strategy_id: str = 'backtest', broker: Any = None, entry_rules: list[EntryRule] = list(), exit_rules: list = list(), metrics: list[StrategyMetric] = list(), initial_capital: float = 10000.0, pair_col: str = 'pair', ts_col: str = 'timestamp', price_col: str = 'close', data_key: str = 'spot', show_progress: bool = True, progress_callback: Callable[[int, int, dict[str, Any]], None] | None = None, progress_interval: int = 500, cancel_event: Event | None = None)
Bases: StrategyRunner
cancel_event class-attribute instance-attribute ¶
Set externally to request graceful cancellation.
progress_callback class-attribute instance-attribute ¶
Called periodically: (current_bar, total_bars, latest_metrics).
progress_interval class-attribute instance-attribute ¶
Call progress_callback every N bars (default: 500).
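The progress and cancellation hooks can be tried without the library. The following is a minimal sketch, assuming a hypothetical `run_bars` loop that stands in for the runner's bar loop; only the callback signature `(current_bar, total_bars, latest_metrics)` and the use of an externally set `Event` come from the attribute descriptions above.

```python
import threading
from typing import Any, Callable, Optional

ProgressCallback = Callable[[int, int, dict[str, Any]], None]


def run_bars(
    total_bars: int,
    progress_callback: Optional[ProgressCallback] = None,
    progress_interval: int = 500,
    cancel_event: Optional[threading.Event] = None,
) -> int:
    """Hypothetical stand-in for a bar loop; returns the number of bars processed."""
    processed = 0
    for bar in range(1, total_bars + 1):
        if cancel_event is not None and cancel_event.is_set():
            break  # graceful stop requested from outside the loop
        processed = bar
        if progress_callback is not None and bar % progress_interval == 0:
            # The metrics dict content is illustrative only.
            progress_callback(bar, total_bars, {"bars_done": bar})
    return processed


cancel = threading.Event()
seen: list[int] = []


def on_progress(current_bar: int, total_bars: int, latest_metrics: dict[str, Any]) -> None:
    seen.append(current_bar)
    if current_bar >= 1000:
        cancel.set()  # request cancellation once 1000 bars are done


processed = run_bars(5000, on_progress, progress_interval=500, cancel_event=cancel)
```

In this sketch the loop checks the event at the start of every bar, so cancellation takes effect on the bar after it is requested. Whether BacktestRunner checks at the same granularity is not stated in this page.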
Entry Rules¶
signalflow.strategy.component.entry.signal.SignalEntryRule dataclass ¶
SignalEntryRule(signal_type_map: dict[str, str] | None = None, source_detector: str | None = None, position_sizer: PositionSizer | None = None, entry_filters: list[EntryFilter] | EntryFilter | None = None, base_position_size: float = 100.0, use_probability_sizing: bool = True, min_probability: float = 0.5, max_positions_per_pair: int = 1, max_total_positions: int = 20, allow_shorts: bool = False, max_capital_usage: float = 0.95, min_order_notional: float = 10.0, pair_col: str = 'pair', ts_col: str = 'timestamp', _composite_filter: CompositeEntryFilter | None = None)
Bases: EntryRule
Signal-based entry rule with injectable sizer and filters.
Converts signals to entry orders with configurable position sizing and pre-trade filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `position_sizer` | `PositionSizer \| None` | Optional PositionSizer for custom sizing logic. | `None` |
| `entry_filters` | `list[EntryFilter] \| EntryFilter \| None` | Optional list of EntryFilters for pre-trade validation. | `None` |
| `base_position_size` | `float` | Base notional value (used if no sizer provided). | `100.0` |
| `use_probability_sizing` | `bool` | Scale size by probability (legacy mode). | `True` |
| `min_probability` | `float` | Minimum signal probability. | `0.5` |
| `max_positions_per_pair` | `int` | Maximum concurrent positions per pair. | `1` |
| `max_total_positions` | `int` | Maximum total open positions. | `20` |
| `allow_shorts` | `bool` | Allow FALL signals to create short positions. | `False` |
| `max_capital_usage` | `float` | Maximum fraction of equity in positions. | `0.95` |
| `min_order_notional` | `float` | Minimum order size. | `10.0` |
| `pair_col` | `str` | Column name for pair in signals. | `'pair'` |
| `ts_col` | `str` | Column name for timestamp in signals. | `'timestamp'` |
Example
# With custom sizer and filters
entry = SignalEntryRule(
    position_sizer=FixedFractionSizer(fraction=0.02),
    entry_filters=[
        DrawdownFilter(max_drawdown=0.10),
        PriceDistanceFilter(min_distance_pct=0.02),
    ],
    max_positions_per_pair=5,  # For grid strategy
)
# Legacy mode (backward compatible)
entry = SignalEntryRule(
    base_position_size=100.0,
    use_probability_sizing=True,
)
__post_init__ ¶
Normalize filters to composite.
Source code in src/signalflow/strategy/component/entry/signal.py
check_entries ¶
Check signals and generate entry orders.
Source code in src/signalflow/strategy/component/entry/signal.py
from_directional_map classmethod ¶
Create entry rule using the global DIRECTIONAL_SIGNAL_MAP.
Example
entry = SignalEntryRule.from_directional_map(base_position_size=200.0)
Source code in src/signalflow/strategy/component/entry/signal.py
signalflow.strategy.component.entry.fixed_size.FixedSizeEntryRule dataclass ¶
FixedSizeEntryRule(signal_type_map: dict[str, str] | None = None, position_size: float = 0.01, signal_types: list[str] = (lambda: [SignalType.RISE.value])(), max_positions: int = 10, pair_col: str = 'pair')
Bases: EntryRule
Simple entry rule with fixed position size.
Attributes:
| Name | Type | Description |
|---|---|---|
| `signal_type_map` | `dict[str, str] \| None` | Mapping signal_type -> "BUY"/"SELL". When set, overrides `signal_types`. |
| `signal_types` | `list[str]` | Legacy list of actionable signal types (used when signal_type_map is None). |
from_directional_map classmethod ¶
Create entry rule using the global DIRECTIONAL_SIGNAL_MAP.
Exit Rules¶
signalflow.strategy.component.exit.tp_sl.TakeProfitStopLossExit dataclass ¶
TakeProfitStopLossExit(take_profit_pct: float = 0.02, stop_loss_pct: float = 0.01, use_position_levels: bool = False)
Bases: ExitRule
Exit rule based on take-profit and stop-loss levels.
Can use fixed percentages or dynamic levels from position meta.
Position Sizing¶
Position sizers compute the notional value (in quote currency) for trades based on signal strength, portfolio state, and market conditions.
Base Classes¶
signalflow.strategy.component.sizing.base.SignalContext dataclass ¶
SignalContext(pair: str, signal_type: str, probability: float, price: float, timestamp: Any = None, meta: dict[str, Any] = dict())
Context for a single signal being sized.
Provides all relevant information about a signal for sizing decisions.
Attributes:
| Name | Type | Description |
|---|---|---|
| `pair` | `str` | Trading pair (e.g., "BTCUSDT"). |
| `signal_type` | `str` | Signal direction ("rise", "fall", "none"). |
| `probability` | `float` | Signal confidence [0, 1]. |
| `price` | `float` | Current market price. |
| `timestamp` | `Any` | Signal timestamp. |
| `meta` | `dict[str, Any]` | Additional signal metadata from detector. |
signalflow.strategy.component.sizing.base.PositionSizer dataclass ¶
Bases: ABC
Base class for position sizing strategies.
Computes the notional value (in quote currency) for a trade based on signal strength, portfolio state, and market conditions.
Design principles
- Sizers compute NOTIONAL value, not quantity
- Quantity = notional / price (computed by entry rule)
- Sizers should be stateless where possible
- Historical data accessed via state.runtime or state.metrics
Example
sizer = FixedFractionSizer(fraction=0.02)
notional = sizer.compute_size(signal_ctx, state, prices)
qty = notional / prices[signal_ctx.pair]
compute_size abstractmethod ¶
Compute position size (notional value) for a signal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `signal` | `SignalContext` | Context about the signal being sized. | required |
| `state` | `StrategyState` | Current strategy state (portfolio, metrics, runtime). | required |
| `prices` | `dict[str, float]` | Current prices for all pairs. | required |
Returns:
| Type | Description |
|---|---|
| `float` | Notional value in quote currency (e.g., USDT). Return 0.0 to skip this signal. |
Source code in src/signalflow/strategy/component/sizing/base.py
Available Sizers¶
FixedFractionSizer¶
Allocate a fixed percentage of equity per trade.
signalflow.strategy.component.sizing.fixed_fraction.FixedFractionSizer dataclass ¶
FixedFractionSizer(fraction: float = 0.02, min_notional: float = 10.0, max_notional: float = float('inf'))
Bases: PositionSizer
Fixed percentage of equity per trade.
Classic position sizing: allocate a fixed fraction of current equity to each trade. The result is simple and consistent regardless of signal strength or volatility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `fraction` | `float` | Fraction of equity to allocate (e.g., 0.02 = 2%). | `0.02` |
| `min_notional` | `float` | Minimum trade size (skip if below). | `10.0` |
| `max_notional` | `float` | Maximum trade size cap. | `float('inf')` |
Example
sizer = FixedFractionSizer(fraction=0.02)  # 2% per trade
# With $10,000 equity: notional = $200
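The arithmetic behind this example can be written out directly. This is a minimal sketch, not the library's implementation: the helper `fixed_fraction_notional` is hypothetical, and the way `min_notional` (skip) and `max_notional` (cap) are applied is an assumption based on the parameter descriptions above.

```python
import math


def fixed_fraction_notional(
    equity: float,
    fraction: float = 0.02,
    min_notional: float = 10.0,
    max_notional: float = math.inf,
) -> float:
    """Hypothetical helper: a fixed fraction of equity, capped, skipped if too small."""
    notional = min(equity * fraction, max_notional)  # assumed: cap applied first
    return notional if notional >= min_notional else 0.0  # assumed: 0.0 means skip


notional = fixed_fraction_notional(10_000.0)  # 2% of $10,000
quantity = notional / 50_000.0                # quantity = notional / price
too_small = fixed_fraction_notional(400.0)    # 2% of $400 is below min_notional
```

The quantity conversion follows the design principle stated for PositionSizer: the sizer returns a notional value and the entry rule divides by price.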
SignalStrengthSizer¶
Scale position size by signal probability.
signalflow.strategy.component.sizing.signal_strength.SignalStrengthSizer dataclass ¶
SignalStrengthSizer(base_size: float = 100.0, min_probability: float = 0.5, scale_factor: float = 1.0, min_notional: float = 10.0, max_notional: float = float('inf'))
Bases: PositionSizer
Size proportional to signal probability/strength.
Higher-confidence signals get larger positions. This is essentially the existing probability-scaling behavior of SignalEntryRule, extracted into a standalone sizer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `base_size` | `float` | Base notional value. | `100.0` |
| `min_probability` | `float` | Skip signals below this threshold. | `0.5` |
| `scale_factor` | `float` | Multiplier for probability-based scaling. | `1.0` |
| `min_notional` | `float` | Minimum trade size. | `10.0` |
| `max_notional` | `float` | Maximum trade size. | `float('inf')` |
Example
sizer = SignalStrengthSizer(base_size=100.0)
# Signal with probability=0.8 -> notional = 80
# Signal with probability=0.5 -> notional = 50
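The two figures in this example are consistent with `notional = base_size * probability * scale_factor`. The sketch below assumes that formula and a hypothetical helper name; the exact order of the threshold, cap, and minimum checks inside SignalStrengthSizer is not documented here and is an assumption.

```python
import math


def signal_strength_notional(
    probability: float,
    base_size: float = 100.0,
    min_probability: float = 0.5,
    scale_factor: float = 1.0,
    min_notional: float = 10.0,
    max_notional: float = math.inf,
) -> float:
    """Hypothetical helper: scale a base notional by signal probability."""
    if probability < min_probability:
        return 0.0  # below the confidence threshold: skip the signal
    notional = min(base_size * probability * scale_factor, max_notional)
    return notional if notional >= min_notional else 0.0


strong = signal_strength_notional(0.8)   # about 80
weak = signal_strength_notional(0.5)     # about 50
skipped = signal_strength_notional(0.4)  # below min_probability
```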
KellyCriterionSizer¶
Optimal sizing using the Kelly Criterion formula.
signalflow.strategy.component.sizing.kelly.KellyCriterionSizer dataclass ¶
KellyCriterionSizer(kelly_fraction: float = 0.5, min_trades_for_stats: int = 30, default_win_rate: float = 0.5, default_payoff_ratio: float = 1.0, use_signal_probability: bool = True, min_notional: float = 10.0, max_fraction: float = 0.25)
Bases: PositionSizer
Kelly Criterion position sizing.
Formula: f* = (p * b - q) / b
Where:
- p = win probability (from signal or historical)
- q = 1 - p (loss probability)
- b = win/loss ratio (payoff ratio)
Half-Kelly (kelly_fraction=0.5) is recommended for practical use to reduce volatility while capturing most of the edge.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `kelly_fraction` | `float` | Fraction of Kelly to use (0.5 = half-Kelly recommended). | `0.5` |
| `min_trades_for_stats` | `int` | Minimum closed trades before using historical stats. | `30` |
| `default_win_rate` | `float` | Fallback win rate if insufficient history. | `0.5` |
| `default_payoff_ratio` | `float` | Fallback payoff ratio if insufficient history. | `1.0` |
| `use_signal_probability` | `bool` | Use signal.probability as win rate proxy. | `True` |
| `min_notional` | `float` | Minimum trade size. | `10.0` |
| `max_fraction` | `float` | Maximum fraction of equity (safety cap). | `0.25` |
Example
sizer = KellyCriterionSizer(kelly_fraction=0.5)  # Half-Kelly
# With 60% win rate and 1.5:1 payoff ratio:
# Full Kelly f* = (0.6 * 1.5 - 0.4) / 1.5 = 0.333
# Half Kelly = 0.167 = 16.7% of equity
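The worked numbers can be reproduced with a few lines of arithmetic. This is a sketch of the formula only, with a hypothetical helper name. Treating a non-positive Kelly fraction as "no trade" and applying `max_fraction` after the `kelly_fraction` scaling are assumptions, not confirmed library behavior.

```python
def kelly_equity_fraction(
    p: float,
    b: float,
    kelly_fraction: float = 0.5,
    max_fraction: float = 0.25,
) -> float:
    """Hypothetical helper: fraction of equity from f* = (p * b - q) / b."""
    q = 1.0 - p
    full_kelly = (p * b - q) / b
    if full_kelly <= 0.0:
        return 0.0  # assumed: no edge means no position
    return min(full_kelly * kelly_fraction, max_fraction)  # assumed: cap applied last


half_kelly = kelly_equity_fraction(p=0.6, b=1.5)  # about 0.167
no_edge = kelly_equity_fraction(p=0.4, b=1.0)     # negative f* -> 0.0
capped = kelly_equity_fraction(p=0.9, b=2.0)      # 0.425 before the cap -> 0.25
```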
VolatilityTargetSizer¶
Size positions to achieve target volatility contribution.
signalflow.strategy.component.sizing.volatility_target.VolatilityTargetSizer dataclass ¶
VolatilityTargetSizer(target_volatility: float = 0.01, volatility_source: str = 'atr', default_volatility_pct: float = 0.02, min_notional: float = 10.0, max_fraction: float = 0.2)
Bases: PositionSizer
Target specific portfolio volatility per position.
Sizes positions so that each contributes equal volatility to the portfolio: smaller positions in volatile assets, larger positions in stable ones.
Formula: notional = (target_vol * equity) / asset_vol_pct
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `target_volatility` | `float` | Target contribution to portfolio vol (e.g., 0.01 = 1%). | `0.01` |
| `volatility_source` | `str` | Key in state.runtime for ATR/volatility data. | `'atr'` |
| `default_volatility_pct` | `float` | Default volatility if ATR not available. | `0.02` |
| `min_notional` | `float` | Minimum trade size. | `10.0` |
| `max_fraction` | `float` | Maximum fraction of equity per position. | `0.2` |
Example
sizer = VolatilityTargetSizer(target_volatility=0.01)
# Asset with 2% daily vol -> 50% of target allocation
# Asset with 0.5% daily vol -> 200% of target allocation (capped)
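The formula `notional = (target_vol * equity) / asset_vol_pct` can be checked numerically. The sketch below uses a hypothetical helper; converting an absolute ATR into a percentage by dividing by price, and applying the `max_fraction` cap as `max_fraction * equity`, are assumptions drawn from the parameter descriptions.

```python
def vol_target_notional(
    equity: float,
    asset_vol_pct: float,
    target_volatility: float = 0.01,
    max_fraction: float = 0.2,
    min_notional: float = 10.0,
) -> float:
    """Hypothetical helper: notional = target_vol * equity / asset_vol_pct, then capped."""
    raw = target_volatility * equity / asset_vol_pct
    capped = min(raw, max_fraction * equity)  # assumed form of the safety cap
    return capped if capped >= min_notional else 0.0


equity = 10_000.0
atr, price = 1000.0, 50_000.0
vol_pct = atr / price  # assumed conversion: 1000 / 50000 = 2%

volatile = vol_target_notional(equity, vol_pct)  # raw 5000, capped at 20% of equity
calm_cap = vol_target_notional(equity, 0.10)     # raw 1000, under the cap
```

With the default `max_fraction=0.2`, the cap binds whenever the asset's volatility is below 5% for a 1% target, so in this sketch the cap rather than the formula often determines the final size.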
RiskParitySizer¶
Equal risk contribution across positions.
signalflow.strategy.component.sizing.risk_parity.RiskParitySizer dataclass ¶
RiskParitySizer(target_positions: int = 10, volatility_source: str = 'atr', default_volatility_pct: float = 0.02, min_notional: float = 10.0)
Bases: PositionSizer
Equal risk contribution across all positions.
Allocates capital so each position contributes equally to portfolio risk, accounting for existing positions and their volatilities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `target_positions` | `int` | Target number of equal-risk positions. | `10` |
| `volatility_source` | `str` | Key in state.runtime for volatility data. | `'atr'` |
| `default_volatility_pct` | `float` | Default volatility if not available. | `0.02` |
| `min_notional` | `float` | Minimum trade size. | `10.0` |
Example
sizer = RiskParitySizer(target_positions=10)
# Each position should contribute 10% of total risk budget
# High-vol assets get smaller notional allocation
MartingaleSizer¶
Grid/DCA strategy with increasing position sizes.
signalflow.strategy.component.sizing.martingale.MartingaleSizer dataclass ¶
MartingaleSizer(base_size: float = 100.0, multiplier: float = 1.5, max_grid_levels: int = 5, max_notional: float = float('inf'), min_notional: float = 10.0)
Bases: PositionSizer
Martingale position sizing for grid strategies.
Increases position size with each grid level filled. Useful for DCA (Dollar Cost Averaging) and grid trading strategies.
Formula: notional = base_size * (multiplier ^ grid_level)
Where grid_level = number of existing open positions in the same pair.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `base_size` | `float` | Initial position size for first grid level. | `100.0` |
| `multiplier` | `float` | Size multiplier per level (e.g., 1.5 = 50% increase). | `1.5` |
| `max_grid_levels` | `int` | Maximum number of grid levels to fill. | `5` |
| `max_notional` | `float` | Maximum position size cap. | `float('inf')` |
| `min_notional` | `float` | Minimum trade size. | `10.0` |
Example
sizer = MartingaleSizer(base_size=100, multiplier=1.5)
# Level 0: $100
# Level 1: $150
# Level 2: $225
# Level 3: $337.50
Warning
Martingale can lead to large losses in trending markets. Always use with appropriate risk limits and max_grid_levels.
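To make the warning concrete, the sketch below evaluates `notional = base_size * (multiplier ^ grid_level)` for every level and sums the total committed capital. The helper name is hypothetical, and returning 0.0 once `grid_level` reaches `max_grid_levels` is an assumption about how the limit is enforced.

```python
import math


def martingale_notional(
    grid_level: int,
    base_size: float = 100.0,
    multiplier: float = 1.5,
    max_grid_levels: int = 5,
    max_notional: float = math.inf,
    min_notional: float = 10.0,
) -> float:
    """Hypothetical helper: size for the next grid level, given open positions in the pair."""
    if grid_level >= max_grid_levels:
        return 0.0  # assumed: grid is full, skip the signal
    notional = min(base_size * multiplier**grid_level, max_notional)
    return notional if notional >= min_notional else 0.0


levels = [martingale_notional(level) for level in range(6)]
total_exposure = sum(levels)
```

With the defaults, five filled levels commit $1,318.75 in total, more than thirteen times the initial $100 order, which is why `max_grid_levels` and `max_notional` matter.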
Usage Example¶
from signalflow.strategy.component.sizing import (
    FixedFractionSizer,
    KellyCriterionSizer,
    VolatilityTargetSizer,
    SignalContext,
)
from signalflow.core import StrategyState
# Create test state
state = StrategyState(strategy_id="demo")
state.portfolio.cash = 10_000.0
prices = {"BTCUSDT": 50000.0}
# Signal context
signal = SignalContext(
    pair="BTCUSDT",
    signal_type="rise",
    probability=0.75,
    price=50000.0,
)
# Fixed 2% of equity per trade
sizer = FixedFractionSizer(fraction=0.02)
notional = sizer.compute_size(signal, state, prices)  # $200
# Half-Kelly sizing
kelly = KellyCriterionSizer(kelly_fraction=0.5, default_win_rate=0.55)
notional = kelly.compute_size(signal, state, prices)
# Volatility targeting (requires ATR in state.runtime)
state.runtime["atr"] = {"BTCUSDT": 1000.0}  # 2% ATR
vol_sizer = VolatilityTargetSizer(target_volatility=0.01)
notional = vol_sizer.compute_size(signal, state, prices)
Entry Filters¶
Entry filters provide pre-trade validation to improve signal quality.
All filters return (allowed: bool, reason: str) tuples.
Base Classes¶
signalflow.strategy.component.entry.filters.EntryFilter dataclass ¶
Bases: ABC
Base class for entry filters.
Filters determine whether a signal should be acted upon. All filters must pass (AND logic) for entry to proceed.
Design principles
- Filters are binary (allow/reject)
- Should provide rejection reason for debugging
- Can be composed via CompositeEntryFilter
allow_entry abstractmethod ¶
allow_entry(signal: SignalContext, state: StrategyState, prices: dict[str, float]) -> tuple[bool, str]
Check if entry is allowed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `signal` | `SignalContext` | Signal context. | required |
| `state` | `StrategyState` | Strategy state. | required |
| `prices` | `dict[str, float]` | Current prices. | required |
Returns:
| Type | Description |
|---|---|
| `tuple[bool, str]` | Tuple of (allowed, reason). reason is an empty string if allowed, otherwise the rejection reason. |
Source code in src/signalflow/strategy/component/entry/filters.py
signalflow.strategy.component.entry.filters.CompositeEntryFilter dataclass ¶
Bases: EntryFilter
Combines multiple entry filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filters` | `list[EntryFilter]` | List of filters to apply. | `list()` |
| `require_all` | `bool` | If True (default), all must pass. If False, any can pass. | `True` |
Example
composite = CompositeEntryFilter(
    filters=[DrawdownFilter(max_drawdown=0.10), VolatilityFilter()],
    require_all=True,
)
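The difference between `require_all=True` (AND) and `require_all=False` (OR) can be illustrated with plain functions that return `(allowed, reason)` tuples. This is a library-free sketch: `combine`, `drawdown_check`, `hour_check`, and the dict-based context are hypothetical, and short-circuiting on the first decisive result is an assumption about evaluation order.

```python
from typing import Callable

Check = Callable[[dict], tuple[bool, str]]


def drawdown_check(ctx: dict) -> tuple[bool, str]:
    ok = ctx["drawdown"] <= 0.10
    return ok, ("" if ok else "drawdown above 10%")


def hour_check(ctx: dict) -> tuple[bool, str]:
    ok = ctx["hour"] not in {0, 1, 2, 3}
    return ok, ("" if ok else "blocked hour")


def combine(checks: list[Check], ctx: dict, require_all: bool = True) -> tuple[bool, str]:
    """Hypothetical combiner: AND when require_all is True, OR otherwise."""
    reasons: list[str] = []
    for check in checks:
        allowed, reason = check(ctx)
        if allowed and not require_all:
            return True, ""       # OR: the first passing check is enough
        if not allowed and require_all:
            return False, reason  # AND: the first rejection decides
        if not allowed:
            reasons.append(reason)
    if require_all or not checks:
        return True, ""
    return False, "; ".join(reasons)


ctx = {"drawdown": 0.12, "hour": 10}
and_result = combine([drawdown_check, hour_check], ctx, require_all=True)
or_result = combine([drawdown_check, hour_check], ctx, require_all=False)
```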
Available Filters¶
RegimeFilter¶
Only enter when market regime matches signal direction.
signalflow.strategy.component.entry.filters.RegimeFilter dataclass ¶
RegimeFilter(signal_regime_map: dict[str, str] | None = None, regime_key: str = 'regime', allowed_regimes_bullish: list[str] = (lambda: ['trend_up', 'mean_reversion_oversold'])(), allowed_regimes_bearish: list[str] = (lambda: ['trend_down', 'mean_reversion_overbought'])())
Bases: EntryFilter
Filter entries based on market regime.
Only allow entries when market regime matches signal type:
- Bullish signals in trend-up or mean-reversion-oversold regimes
- Bearish signals in trend-down or mean-reversion-overbought regimes
The regime is read from state.runtime["regime"][pair] or from a global regime value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `signal_regime_map` | `dict[str, str] \| None` | Mapping signal_type -> "bullish"/"bearish". When set, overrides legacy "rise"/"fall" hardcoding. None = legacy behavior (only "rise" and "fall" are regime-checked). | `None` |
| `regime_key` | `str` | Key in state.runtime for regime data. | `'regime'` |
| `allowed_regimes_bullish` | `list[str]` | Regimes allowing bullish entries. | `(lambda: ['trend_up', 'mean_reversion_oversold'])()` |
| `allowed_regimes_bearish` | `list[str]` | Regimes allowing bearish entries. | `(lambda: ['trend_down', 'mean_reversion_overbought'])()` |
VolatilityFilter¶
Skip entries in extreme volatility conditions.
signalflow.strategy.component.entry.filters.VolatilityFilter dataclass ¶
VolatilityFilter(volatility_key: str = 'atr', min_volatility: float = 0.0, max_volatility: float = float('inf'), use_relative: bool = True)
Bases: EntryFilter
Skip entries in extreme volatility conditions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `volatility_key` | `str` | Key in state.runtime for volatility data (default: "atr"). | `'atr'` |
| `min_volatility` | `float` | Minimum relative volatility to allow entry. | `0.0` |
| `max_volatility` | `float` | Maximum relative volatility to allow entry. | `float('inf')` |
| `use_relative` | `bool` | If True, compare vol/price ratio instead of absolute. | `True` |
DrawdownFilter¶
Pause trading after significant drawdown.
signalflow.strategy.component.entry.filters.DrawdownFilter dataclass ¶
DrawdownFilter(max_drawdown: float = 0.1, recovery_threshold: float = 0.05, drawdown_key: str = 'current_drawdown', _paused: bool = False)
Bases: EntryFilter
Pause trading after significant drawdown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_drawdown` | `float` | Maximum drawdown before pausing (e.g., 0.10 = 10%). | `0.1` |
| `recovery_threshold` | `float` | Resume when drawdown reduces to this level. | `0.05` |
| `drawdown_key` | `str` | Key in state.metrics for current drawdown. | `'current_drawdown'` |
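The two thresholds describe a hysteresis: trading pauses at `max_drawdown` and resumes only once drawdown has fallen back to `recovery_threshold`. The sketch below models that behavior with a hypothetical `DrawdownGate` class; the inclusive comparisons (`>=` to pause, `<=` to resume) are assumptions, since the page does not specify them.

```python
from dataclasses import dataclass


@dataclass
class DrawdownGate:
    """Hypothetical stand-in showing pause/resume hysteresis."""

    max_drawdown: float = 0.10
    recovery_threshold: float = 0.05
    paused: bool = False

    def allow(self, current_drawdown: float) -> tuple[bool, str]:
        if self.paused and current_drawdown <= self.recovery_threshold:
            self.paused = False  # drawdown has recovered: resume trading
        elif not self.paused and current_drawdown >= self.max_drawdown:
            self.paused = True   # drawdown limit hit: pause trading
        if self.paused:
            return False, f"paused at drawdown {current_drawdown:.2%}"
        return True, ""


gate = DrawdownGate()
path = [0.02, 0.11, 0.08, 0.06, 0.05, 0.03]
results = [gate.allow(drawdown)[0] for drawdown in path]
```

Entries at 8% and 6% drawdown are still rejected even though both are below `max_drawdown`, because the gate stays paused until the 5% recovery level is reached.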
CorrelationFilter¶
Avoid concentrated positions in correlated assets.
signalflow.strategy.component.entry.filters.CorrelationFilter dataclass ¶
CorrelationFilter(correlation_key: str = 'correlations', max_correlation: float = 0.7, max_correlated_positions: int = 2)
Bases: EntryFilter
Avoid concentrated positions in correlated assets.
Rejects entry if already holding highly correlated assets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `correlation_key` | `str` | Key in state.runtime for correlation matrix. | `'correlations'` |
| `max_correlation` | `float` | Maximum allowed correlation with existing positions. | `0.7` |
| `max_correlated_positions` | `int` | Max positions in correlated group. | `2` |
TimeOfDayFilter¶
Restrict trading to specific hours.
signalflow.strategy.component.entry.filters.TimeOfDayFilter dataclass ¶
Bases: EntryFilter
Restrict trading to specific hours.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `allowed_hours` | `list[int] \| None` | List of hours (0-23) when trading is allowed. | `None` |
| `blocked_hours` | `list[int] \| None` | List of hours (0-23) when trading is blocked. | `None` |
Note: If both are None, all hours are allowed.
PriceDistanceFilter¶
Filter entries based on price distance from existing positions.
signalflow.strategy.component.entry.filters.PriceDistanceFilter dataclass ¶
PriceDistanceFilter(signal_direction_map: dict[str, str] | None = None, min_distance_pct: float = 0.02, direction_aware: bool = True)
Bases: EntryFilter
Filter entries based on price distance from existing positions.
For grid strategies: prevents buying when price is too close to existing positions in the same pair.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `signal_direction_map` | `dict[str, str] \| None` | Mapping signal_type -> "long"/"short". When set, overrides legacy "rise"/"fall" hardcoding. None = legacy behavior (only "rise" and "fall" are direction-aware). | `None` |
| `min_distance_pct` | `float` | Minimum price difference as percentage (e.g., 0.02 = 2%). | `0.02` |
| `direction_aware` | `bool` | If True, check distance based on position direction: for LONG, the new entry must be below the existing entry by min_distance_pct; for SHORT, the new entry must be above the existing entry by min_distance_pct. If False, check absolute distance in either direction. | `True` |
Example
# Grid strategy: only buy when price drops 2% from last position
filter = PriceDistanceFilter(min_distance_pct=0.02, direction_aware=True)
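The direction-aware rule can be sketched as a plain function. The helper below is hypothetical; measuring distance relative to each existing entry price, and checking against every open entry rather than only the most recent one, are assumptions.

```python
def price_distance_ok(
    new_price: float,
    existing_entries: list[float],
    direction: str = "long",
    min_distance_pct: float = 0.02,
    direction_aware: bool = True,
) -> tuple[bool, str]:
    """Hypothetical check: is new_price far enough from existing entry prices?"""
    for entry in existing_entries:
        if direction_aware and direction == "long":
            ok = new_price <= entry * (1 - min_distance_pct)  # must be lower
        elif direction_aware:
            ok = new_price >= entry * (1 + min_distance_pct)  # short: must be higher
        else:
            ok = abs(new_price - entry) / entry >= min_distance_pct
        if not ok:
            return False, f"too close to existing entry at {entry}"
    return True, ""


too_close = price_distance_ok(99.0, [100.0])       # only 1% below
far_enough = price_distance_ok(97.5, [100.0])      # 2.5% below
above_long = price_distance_ok(103.0, [100.0])     # above a long entry
either_way = price_distance_ok(103.0, [100.0], direction_aware=False)
```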
SignalAccuracyFilter¶
Pause trading when signal accuracy drops below threshold.
signalflow.strategy.component.entry.filters.SignalAccuracyFilter dataclass ¶
SignalAccuracyFilter(accuracy_key: str = 'signal_accuracy', min_accuracy: float = 0.45, min_samples: int = 20, window_key: str | None = None)
Bases: EntryFilter
Filter based on real-time signal accuracy metrics.
Tracks detector/model accuracy and pauses trading when accuracy drops. Useful for detecting model degradation or regime changes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `accuracy_key` | `str` | Key in state.runtime for accuracy data. | `'signal_accuracy'` |
| `min_accuracy` | `float` | Minimum required accuracy to allow entry. | `0.45` |
| `min_samples` | `int` | Minimum samples before applying filter. | `20` |
| `window_key` | `str \| None` | Optional key for accuracy over recent window only. | `None` |
Example
# Pause if recent signal accuracy drops below 45%
filter = SignalAccuracyFilter(min_accuracy=0.45, min_samples=20)
Usage Example¶
from signalflow.strategy.component.entry import (
    CompositeEntryFilter,
    DrawdownFilter,
    RegimeFilter,
    VolatilityFilter,
    TimeOfDayFilter,
)
# Combine multiple filters (AND logic)
composite = CompositeEntryFilter(
    filters=[
        DrawdownFilter(max_drawdown=0.10, recovery_threshold=0.05),
        RegimeFilter(),
        VolatilityFilter(max_volatility=0.03),
        TimeOfDayFilter(blocked_hours=[0, 1, 2, 3]),  # Skip overnight
    ],
    require_all=True,  # All must pass
)
allowed, reason = composite.allow_entry(signal, state, prices)
if not allowed:
    print(f"Entry rejected: {reason}")
Signal Aggregation¶
Combine signals from multiple detectors using voting or weighting logic.
VotingMode¶
signalflow.strategy.component.entry.aggregation.VotingMode ¶
Bases: str, Enum
Signal aggregation voting modes.
| Mode | Description |
|---|---|
| `MAJORITY` | Most common signal type wins (requires min_agreement) |
| `WEIGHTED` | Weighted average of probabilities |
| `UNANIMOUS` | All detectors must agree |
| `ANY` | Any non-NONE signal passes (highest probability wins) |
| `META_LABELING` | Detector direction × validator probability |
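Two of these modes are easy to illustrate on a single bar. The sketch below is not the library's aggregation code: it works on plain `(signal_type, probability)` tuples instead of Signals DataFrames, and the choices to measure agreement against all detectors and to average the winners' probabilities are assumptions.

```python
from collections import Counter

Vote = tuple[str, float]  # (signal_type, probability)


def majority_vote(votes: list[Vote], min_agreement: float = 0.5) -> Vote:
    """Hypothetical MAJORITY sketch: most common non-"none" type wins."""
    active = [vote for vote in votes if vote[0] != "none"]
    if not active:
        return "none", 0.0
    winner, count = Counter(kind for kind, _ in active).most_common(1)[0]
    if count / len(votes) < min_agreement:
        return "none", 0.0  # not enough detectors agree
    probs = [prob for kind, prob in active if kind == winner]
    return winner, sum(probs) / len(probs)  # assumed: mean of agreeing probabilities


def meta_label(detector: Vote, validator_probability: float) -> Vote:
    """META_LABELING sketch: detector direction, detector_prob * validator_prob."""
    kind, detector_probability = detector
    return kind, detector_probability * validator_probability


agreed = majority_vote([("rise", 0.8), ("rise", 0.6), ("fall", 0.7)])
lonely = majority_vote([("rise", 0.8), ("none", 0.0), ("none", 0.0)])
labeled = meta_label(("rise", 0.8), 0.75)
```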
SignalAggregator¶
signalflow.strategy.component.entry.aggregation.SignalAggregator dataclass ¶
SignalAggregator(voting_mode: VotingMode = VotingMode.MAJORITY, min_agreement: float = 0.5, weights: list[float] | None = None, probability_threshold: float = 0.5, pair_col: str = 'pair', ts_col: str = 'timestamp')
Combine signals from multiple detectors.
Aggregates multiple Signals DataFrames into one based on voting/weighting logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `voting_mode` | `VotingMode` | How to combine signals (see VotingMode). | `MAJORITY` |
| `min_agreement` | `float` | Minimum fraction of detectors agreeing (for MAJORITY). | `0.5` |
| `weights` | `list[float] \| None` | Optional weights per detector (for WEIGHTED mode). | `None` |
| `probability_threshold` | `float` | Minimum combined probability to emit signal. | `0.5` |
| `pair_col` | `str` | Column name for pair. | `'pair'` |
| `ts_col` | `str` | Column name for timestamp. | `'timestamp'` |
Example
# Majority voting
aggregator = SignalAggregator(voting_mode=VotingMode.MAJORITY)
combined = aggregator.aggregate([signals1, signals2, signals3])
# Meta-labeling: detector direction * validator confidence
aggregator = SignalAggregator(voting_mode=VotingMode.META_LABELING)
combined = aggregator.aggregate([detector_signals, validator_signals])
__call__ ¶
Alias for aggregate().
aggregate ¶
Aggregate multiple signal sources into one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `signals_list` | `list[Signals]` | List of Signals from different detectors. | required |
| `detector_names` | `list[str] \| None` | Optional names for tracing (len must match signals_list). | `None` |
Returns:
| Type | Description |
|---|---|
| `Signals` | Aggregated Signals DataFrame. |
Source code in src/signalflow/strategy/component/entry/aggregation.py
Usage Examples¶
from signalflow.strategy.component.entry import SignalAggregator, VotingMode
# Majority voting: signal needs >50% agreement
aggregator = SignalAggregator(
    voting_mode=VotingMode.MAJORITY,
    min_agreement=0.5,
)
combined = aggregator.aggregate([signals_1, signals_2, signals_3])
# Weighted voting with custom weights
aggregator = SignalAggregator(
    voting_mode=VotingMode.WEIGHTED,
    weights=[2.0, 1.0, 1.0],  # First detector weighted 2x
    probability_threshold=0.6,
)
combined = aggregator.aggregate([primary_signals, secondary_1, secondary_2])
# Meta-labeling: detector direction × validator confidence
aggregator = SignalAggregator(
    voting_mode=VotingMode.META_LABELING,
    probability_threshold=0.5,
)
combined = aggregator.aggregate([detector_signals, validator_signals])
# Combined probability = detector_prob * validator_prob
# Unanimous: all must agree for high-conviction trades
aggregator = SignalAggregator(
    voting_mode=VotingMode.UNANIMOUS,
    probability_threshold=0.7,
)
combined = aggregator.aggregate([detector_1, detector_2, detector_3])
Integration with SignalEntryRule¶
Position sizers and entry filters can be injected into SignalEntryRule:
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.component.entry import (
    SignalEntryRule,
    CompositeEntryFilter,
    DrawdownFilter,
    RegimeFilter,
)
from signalflow.strategy.component.sizing import KellyCriterionSizer
from signalflow.strategy.component.exit import TakeProfitStopLossExit
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
# Create advanced entry rule
entry_rule = SignalEntryRule(
    position_sizer=KellyCriterionSizer(kelly_fraction=0.5),
    entry_filters=CompositeEntryFilter(
        filters=[
            DrawdownFilter(max_drawdown=0.10),
            RegimeFilter(),
        ],
    ),
)
# Run backtest
runner = BacktestRunner(
    strategy_id="advanced_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[TakeProfitStopLossExit(take_profit_pct=0.02, stop_loss_pct=0.01)],
    initial_capital=10_000.0,
)
state = runner.run(raw_data=raw_data, signals=signals)
Grid Trading Example¶
Combine MartingaleSizer with PriceDistanceFilter for grid strategies:
from signalflow.strategy.component.entry import (
    SignalEntryRule,
    PriceDistanceFilter,
)
from signalflow.strategy.component.sizing import MartingaleSizer
# Grid strategy: buy more as price drops
entry_rule = SignalEntryRule(
    position_sizer=MartingaleSizer(
        base_size=100.0,  # Start with $100
        multiplier=1.5,  # Increase 50% per level
        max_grid_levels=5,  # Max 5 levels
    ),
    entry_filters=PriceDistanceFilter(
        min_distance_pct=0.02,  # 2% price drop between levels
        direction_aware=True,
    ),
    max_positions_per_pair=5,
)
Data Sources¶
Components access data through StrategyState:
| Component | Data Source | Key |
|---|---|---|
| `VolatilityTargetSizer` | ATR values | `state.runtime["atr"]` |
| `RiskParitySizer` | ATR values | `state.runtime["atr"]` |
| `DrawdownFilter` | Current drawdown | `state.metrics["current_drawdown"]` |
| `VolatilityFilter` | ATR values | `state.runtime["atr"]` |
| `RegimeFilter` | Market regime | `state.runtime["regime"]` |
| `CorrelationFilter` | Correlation matrix | `state.runtime["correlations"]` |
| `SignalAccuracyFilter` | Accuracy metrics | `state.runtime["signal_accuracy"]` |
Populate these during backtest:
# Example: populate runtime data before entry rule
def on_bar_hook(state, timestamp, prices):
    state.runtime["atr"] = calculate_atr(prices)
    state.runtime["regime"] = detect_regime(prices)
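The hook above calls `calculate_atr`, which this page does not define. One possible shape for such a helper is sketched below under stated assumptions: it takes `(high, low, close)` bars for a single pair rather than the `prices` mapping the hook receives, and it uses a simple mean of true ranges instead of Wilder smoothing. The names `true_range` and `simple_atr` are hypothetical.

```python
from typing import Optional

Bar = tuple[float, float, float]  # (high, low, close); assumed input shape


def true_range(high: float, low: float, prev_close: float) -> float:
    """Largest of the bar range and the gaps from the previous close."""
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def simple_atr(bars: list[Bar], period: int = 14) -> Optional[float]:
    """Hypothetical ATR: mean true range over the last `period` bars."""
    ranges = [
        true_range(high, low, bars[i - 1][2])
        for i, (high, low, _close) in enumerate(bars)
        if i > 0  # the first bar has no previous close
    ]
    if len(ranges) < period:
        return None  # not enough history yet
    window = ranges[-period:]
    return sum(window) / len(window)


bars = [(10.0, 8.0, 9.0), (11.0, 9.0, 10.0), (12.0, 10.0, 11.0)]
atr = simple_atr(bars, period=2)
not_ready = simple_atr(bars, period=14)
```

The result is an absolute ATR, which matches the shape used in the sizing usage example, where `state.runtime["atr"]` maps each pair to a value in price units.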
Full Backtest Example¶
Complete example using signal aggregation, entry filters, and position sizing:
from datetime import datetime
from pathlib import Path
import polars as pl
from signalflow.data.raw_store import DuckDbRawStore
from signalflow.data.source import VirtualDataProvider
from signalflow.data import RawDataFactory
from signalflow.detector import ExampleSmaCrossDetector
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.component.entry import (
SignalEntryRule,
SignalAggregator,
VotingMode,
CompositeEntryFilter,
DrawdownFilter,
TimeOfDayFilter,
)
from signalflow.strategy.component.sizing import VolatilityTargetSizer
from signalflow.strategy.component.exit import TakeProfitStopLossExit
# 1. Generate synthetic data
PAIRS = ["BTCUSDT", "ETHUSDT"]
START = datetime(2025, 1, 1)
spot_store = DuckDbRawStore(db_path=Path("backtest.duckdb"), timeframe="1m")
provider = VirtualDataProvider(store=spot_store, seed=42)
provider.download(pairs=PAIRS, n_bars=5000, start=START)
# 2. Load data
raw_data = RawDataFactory.from_duckdb_spot_store(
spot_store_path=Path("backtest.duckdb"),
pairs=PAIRS,
start=START,
end=datetime(2025, 1, 4),
)
# 3. Create multiple detectors
detector_fast = ExampleSmaCrossDetector(fast_period=10, slow_period=30)
detector_slow = ExampleSmaCrossDetector(fast_period=20, slow_period=50)
signals_fast = detector_fast.run(raw_data.view())
signals_slow = detector_slow.run(raw_data.view())
# 4. Aggregate signals (unanimous agreement)
aggregator = SignalAggregator(
voting_mode=VotingMode.UNANIMOUS,
probability_threshold=0.0,
)
signals = aggregator.aggregate([signals_fast, signals_slow])
# 5. Configure entry rule with sizer and filters
entry_rule = SignalEntryRule(
position_sizer=VolatilityTargetSizer(
target_volatility=0.015,
default_volatility_pct=0.02,
max_fraction=0.15,
),
entry_filters=CompositeEntryFilter(
filters=[
DrawdownFilter(max_drawdown=0.10),
TimeOfDayFilter(allowed_hours=list(range(6, 22))),
],
),
max_positions_per_pair=1,
max_total_positions=5,
)
# 6. Run backtest
runner = BacktestRunner(
    strategy_id="advanced_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[TakeProfitStopLossExit(take_profit_pct=0.02, stop_loss_pct=0.015)],
    initial_capital=10_000.0,
)
state = runner.run(raw_data, signals)
results = runner.get_results()
print(f"Total Return: {results.get('final_return', 0) * 100:.2f}%")
print(f"Max Drawdown: {results.get('max_drawdown', 0) * 100:.2f}%")
print(f"Win Rate: {results.get('win_rate', 0) * 100:.1f}%")
# Cleanup
spot_store.close()
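The exit rule in the example above is configured with take_profit_pct=0.02 and stop_loss_pct=0.015. As a rough illustration of what such percentages can mean, the sketch below computes price levels for a long position, assuming both percentages are measured relative to the entry price. The function names are hypothetical and this is not the TakeProfitStopLossExit implementation.

```python
def tp_sl_levels(entry_price, take_profit_pct, stop_loss_pct):
    """Price levels for a long position; percentages relative to entry (assumed)."""
    take_profit = entry_price * (1.0 + take_profit_pct)
    stop_loss = entry_price * (1.0 - stop_loss_pct)
    return take_profit, stop_loss


def exit_reason(price, entry_price, take_profit_pct, stop_loss_pct):
    """Return "take_profit", "stop_loss", or None if neither level is hit."""
    take_profit, stop_loss = tp_sl_levels(entry_price, take_profit_pct, stop_loss_pct)
    if price >= take_profit:
        return "take_profit"
    if price <= stop_loss:
        return "stop_loss"
    return None
```

With an entry at 100.0, these settings put the take-profit level near 102.0 and the stop-loss level near 98.5.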
External Model Integration¶
SignalFlow supports integration with external ML/RL models via a Protocol-based interface. Models make trading decisions (entry, exit, hold) based on signals and metrics.
Architecture¶
flowchart TB
A[Signals + Metrics] --> B[ModelContext]
B --> C[StrategyModel.decide]
C --> D[list of StrategyDecision]
D --> E{Action Type}
E -->|ENTER| F[ModelEntryRule]
E -->|CLOSE/CLOSE_ALL| G[ModelExitRule]
F --> H[Entry Orders]
G --> I[Exit Orders]
style C fill:#7c3aed,stroke:#8b5cf6,color:#fff
style D fill:#059669,stroke:#10b981,color:#fff
Design Principle: Strategy models see signals and metrics only, NOT raw OHLCV prices.
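The routing step in the diagram can be sketched as follows. This is a minimal illustration using stand-in types: Action and Decision are hypothetical substitutes for StrategyAction and StrategyDecision, the enum string values are assumed, and route is not a SignalFlow function.

```python
from dataclasses import dataclass
from enum import Enum


class Action(str, Enum):  # hypothetical stand-in for StrategyAction
    ENTER = "enter"
    SKIP = "skip"
    CLOSE = "close"
    CLOSE_ALL = "close_all"
    HOLD = "hold"


@dataclass
class Decision:  # hypothetical stand-in for StrategyDecision
    action: Action
    pair: str


def route(decisions):
    """Split decisions into (entry side, exit side); SKIP and HOLD yield no orders."""
    entries = [d for d in decisions if d.action is Action.ENTER]
    exits = [d for d in decisions if d.action in (Action.CLOSE, Action.CLOSE_ALL)]
    return entries, exits
```

In the real module, ENTER decisions are handled by ModelEntryRule and CLOSE/CLOSE_ALL decisions by ModelExitRule.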
StrategyAction¶
signalflow.strategy.model.decision.StrategyAction ¶
Bases: StrEnum
Actions a strategy model can take.
Values
- ENTER: Open new position for pair (uses size_multiplier).
- SKIP: Skip this signal (do not enter).
- CLOSE: Close specific position (requires position_id).
- CLOSE_ALL: Close all positions for a pair.
- HOLD: Do nothing (no action).
| Action | Description |
|---|---|
| ENTER | Open new position (uses size_multiplier) |
| SKIP | Skip this signal |
| CLOSE | Close specific position (requires position_id) |
| CLOSE_ALL | Close all positions for a pair |
| HOLD | Do nothing |
StrategyDecision¶
signalflow.strategy.model.decision.StrategyDecision
dataclass
¶
StrategyDecision(action: StrategyAction, pair: str, position_id: str | None = None, size_multiplier: float = 1.0, confidence: float = 1.0, meta: dict[str, Any] = dict())
Model output for a single trading decision.
Represents one decision from the model about whether to enter, exit, or hold positions. Multiple decisions can be returned per bar.
Attributes:
| Name | Type | Description |
|---|---|---|
| action | StrategyAction | The action to take (ENTER, SKIP, CLOSE, CLOSE_ALL, HOLD). |
| pair | str | Trading pair this decision applies to. |
| position_id | str \| None | For CLOSE action: the specific position to close. |
| size_multiplier | float | For ENTER action: multiplier on base position size (default 1.0). |
| confidence | float | Model confidence in this decision (0-1). |
| meta | dict[str, Any] | Additional metadata (e.g., reason, model_name). |
Example
# Enter decision
decision = StrategyDecision(
    action=StrategyAction.ENTER,
    pair="BTCUSDT",
    size_multiplier=1.5,
    confidence=0.85,
    meta={"signal_type": "rise", "model": "rf_v2"},
)
# Close specific position
decision = StrategyDecision(
    action=StrategyAction.CLOSE,
    pair="BTCUSDT",
    position_id="pos_abc123",
    confidence=0.92,
    meta={"reason": "model_exit"},
)
Raises:
| Type | Description |
|---|---|
| ValueError | If CLOSE action is missing position_id. |
| ValueError | If ENTER action has non-positive size_multiplier. |
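The two validation rules above could be expressed in a dataclass __post_init__ roughly as below. This is a sketch with stand-in types (Action and DecisionSketch are hypothetical, and the enum values and error messages are assumed); the actual checks live in decision.py.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class Action(str, Enum):  # hypothetical stand-in for StrategyAction
    ENTER = "enter"
    SKIP = "skip"
    CLOSE = "close"
    CLOSE_ALL = "close_all"
    HOLD = "hold"


@dataclass
class DecisionSketch:  # hypothetical stand-in for StrategyDecision
    action: Action
    pair: str
    position_id: Optional[str] = None
    size_multiplier: float = 1.0
    confidence: float = 1.0
    meta: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # CLOSE targets one position, so it needs an id to identify it.
        if self.action is Action.CLOSE and self.position_id is None:
            raise ValueError("CLOSE requires position_id")
        # ENTER scales the base size, so the multiplier must be positive.
        if self.action is Action.ENTER and self.size_multiplier <= 0:
            raise ValueError("ENTER requires a positive size_multiplier")
```

Validating at construction time means a malformed decision fails where the model creates it, before any order is generated.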
__post_init__ ¶
Validate decision parameters.
Source code in src/signalflow/strategy/model/decision.py
ModelContext¶
signalflow.strategy.model.context.ModelContext
dataclass
¶
ModelContext(timestamp: datetime, signals: Signals, prices: dict[str, float] = dict(), positions: list[Position] = list(), metrics: dict[str, float] = dict(), runtime: dict[str, Any] = dict())
Aggregated context passed to strategy models.
Provides all information a model needs to make decisions:
- Current bar signals
- Strategy metrics (equity, drawdown, etc.)
- Current positions and their states
- Runtime state (for custom indicators, cooldowns, etc.)
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp | datetime | Current bar timestamp. |
| signals | Signals | Current bar signals (Signals container). |
| prices | dict[str, float] | Current prices per pair. |
| positions | list[Position] | List of open positions. |
| metrics | dict[str, float] | Current strategy metrics snapshot. |
| runtime | dict[str, Any] | Runtime state dict (cooldowns, custom state). |
Example
# Model receives context each bar
def decide(self, context: ModelContext) -> list[StrategyDecision]:
    # Access signals
    for row in context.signals.value.iter_rows(named=True):
        pair = row["pair"]
        signal_type = row["signal_type"]
        probability = row.get("probability", 0.5)

        # Use metrics for risk management
        if context.metrics.get("max_drawdown", 0) > 0.15:
            continue  # Skip during high drawdown

        # Check existing positions
        pair_positions = [p for p in context.positions if p.pair == pair]
        ...
StrategyModel Protocol¶
signalflow.strategy.model.protocol.StrategyModel ¶
Bases: Protocol
Protocol for external strategy models.
External models must implement this protocol to integrate with SignalFlow. The model receives context (signals, metrics, positions) and returns a list of trading decisions.
Implementation Notes
- Models are called ONCE per bar (not per signal).
- Return empty list for "no action".
- Multiple decisions per bar are allowed.
- Model should be stateless (use context.runtime for state).
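The last note can be illustrated with a per-pair entry cooldown that is stored in context.runtime instead of on the model. This is a sketch under assumptions: ContextSketch is a hypothetical stand-in for ModelContext, the "last_entry" key is invented for the example, and the runtime dict is assumed to persist between bars.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict


@dataclass
class ContextSketch:  # hypothetical stand-in for ModelContext (subset of fields)
    timestamp: datetime
    runtime: Dict[str, Any] = field(default_factory=dict)


class CooldownModel:
    """Holds configuration only; per-bar state lives in context.runtime."""

    def __init__(self, cooldown: timedelta):
        self.cooldown = cooldown

    def may_enter(self, context: ContextSketch, pair: str) -> bool:
        # "last_entry" is an assumed key, not one defined by SignalFlow.
        last_entry = context.runtime.setdefault("last_entry", {})
        previous = last_entry.get(pair)
        if previous is not None and context.timestamp - previous < self.cooldown:
            return False  # still cooling down for this pair
        last_entry[pair] = context.timestamp
        return True
```

Because the cooldown timestamps live in the runtime dict, the same model instance can be reused across runs without carrying state over from a previous one.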
Example Implementation
class MyRLModel:
    '''Reinforcement learning model for trading.'''

    def __init__(self, model_path: str):
        self.model = load_model(model_path)

    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []

        # Skip if drawdown too high
        if context.metrics.get("max_drawdown", 0) > 0.2:
            return decisions

        # Process each signal
        for row in context.signals.value.iter_rows(named=True):
            pair = row["pair"]
            prob = row.get("probability", 0.5)

            features = self._build_features(row, context.metrics)
            action, confidence = self.model.predict(features)

            if action == "enter" and confidence > 0.6:
                decisions.append(StrategyDecision(
                    action=StrategyAction.ENTER,
                    pair=pair,
                    size_multiplier=min(confidence, 1.5),
                    confidence=confidence,
                ))

        # Check if should close any positions
        for pos in context.positions:
            if self._should_close(pos, context):
                decisions.append(StrategyDecision(
                    action=StrategyAction.CLOSE,
                    pair=pos.pair,
                    position_id=pos.id,
                ))

        return decisions
decide ¶
Make trading decisions based on current context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | ModelContext | Current bar context with signals, metrics, positions. | required |
Returns:
| Type | Description |
|---|---|
| list[StrategyDecision] | List of trading decisions (can be empty). |
Source code in src/signalflow/strategy/model/protocol.py
ModelEntryRule¶
signalflow.strategy.model.rules.ModelEntryRule
dataclass
¶
ModelEntryRule(signal_type_map: dict[str, str] | None = None, model: StrategyModel = None, base_position_size: float = 0.01, max_positions: int = 10, min_confidence: float = 0.5, allow_shorts: bool = False, pair_col: str = 'pair')
Bases: EntryRule
Entry rule that delegates to an external model.
The model is called once per bar. Its decisions are cached in state.runtime so that ModelExitRule can access exit decisions without calling the model twice.
Attributes:
| Name | Type | Description |
|---|---|---|
| model | StrategyModel | External model implementing StrategyModel protocol. |
| base_position_size | float | Base position size (multiplied by decision.size_multiplier). |
| max_positions | int | Maximum concurrent positions. |
| min_confidence | float | Minimum confidence to act on ENTER decisions. |
| allow_shorts | bool | Allow FALL signals to create short positions. |
| pair_col | str | Column name for pair in signals. |
Example
from signalflow.strategy.model import ModelEntryRule, ModelExitRule

model = MyRLModel("model.pt")
entry_rule = ModelEntryRule(
    model=model,
    base_position_size=0.01,
    max_positions=5,
    min_confidence=0.6,
)
exit_rule = ModelExitRule(model=model)

runner = BacktestRunner(
    entry_rules=[entry_rule],
    exit_rules=[exit_rule],
    ...
)
check_entries ¶
Generate entry orders from model decisions.
Source code in src/signalflow/strategy/model/rules.py
ModelExitRule¶
signalflow.strategy.model.rules.ModelExitRule
dataclass
¶
Bases: ExitRule
Exit rule that uses cached model decisions.
NOTE: If decisions are not cached yet (exit runs before entry), this rule will call the model and cache the results.
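The call-once-per-bar behavior shared by the entry and exit rules can be pictured as a small cache keyed by bar timestamp. The sketch below is an assumption about the general pattern, not the code in rules.py; CACHE_KEY and decisions_for_bar are hypothetical names.

```python
CACHE_KEY = "model_decisions"  # hypothetical key; the real key is not documented here


def decisions_for_bar(model, context, runtime):
    """Return decisions for context.timestamp, calling the model at most once per bar."""
    cached = runtime.get(CACHE_KEY)
    if cached is not None and cached[0] == context.timestamp:
        return cached[1]  # whichever rule ran first already called the model
    decisions = model.decide(context)
    runtime[CACHE_KEY] = (context.timestamp, decisions)
    return decisions
```

With this pattern, it does not matter whether the exit rule or the entry rule runs first: the first caller populates the cache and the second reuses it.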
Attributes:
| Name | Type | Description |
|---|---|---|
| model | StrategyModel | External model implementing StrategyModel protocol. |
| min_confidence | float | Minimum confidence to act on CLOSE/CLOSE_ALL decisions. |
Example
exit_rule = ModelExitRule(
    model=model,
    min_confidence=0.7,  # Higher threshold for exits
)
check_exits ¶
check_exits(positions: list[Position], prices: dict[str, float], state: StrategyState) -> list[Order]
Generate exit orders from model decisions.
Source code in src/signalflow/strategy/model/rules.py
Model Integration Example¶
from signalflow.strategy.model import (
    StrategyModel,
    StrategyAction,
    StrategyDecision,
    ModelContext,
    ModelEntryRule,
    ModelExitRule,
)
from signalflow.strategy.runner import BacktestRunner
from signalflow.strategy.broker import BacktestBroker
from signalflow.strategy.broker.executor import VirtualSpotExecutor
from signalflow.strategy.component.exit import TakeProfitStopLossExit
# 1. Implement the StrategyModel protocol
class MyRLModel:
    """Example RL model for trading decisions."""
    # _load_model, _build_features, and _should_close are user-supplied
    # helpers that are not shown here.

    def __init__(self, model_path: str):
        # Load your trained model
        self.model = self._load_model(model_path)

    def decide(self, context: ModelContext) -> list[StrategyDecision]:
        decisions = []
        # Risk management: skip during high drawdown
        if context.metrics.get("max_drawdown", 0) > 0.15:
            return decisions
        # Process each signal
        for row in context.signals.value.iter_rows(named=True):
            pair = row["pair"]
            prob = row.get("probability", 0.5)
            # Get model prediction
            features = self._build_features(row, context.metrics)
            action, confidence = self.model.predict(features)
            if action == "enter" and confidence > 0.6:
                decisions.append(StrategyDecision(
                    action=StrategyAction.ENTER,
                    pair=pair,
                    size_multiplier=min(confidence, 1.5),
                    confidence=confidence,
                    meta={"model": "rl_v1"},
                ))
        # Check if should close any positions
        for pos in context.positions:
            if self._should_close(pos, context):
                decisions.append(StrategyDecision(
                    action=StrategyAction.CLOSE,
                    pair=pos.pair,
                    position_id=pos.id,
                    confidence=0.9,
                    meta={"reason": "model_exit"},
                ))
        return decisions
# 2. Create rules with the model
model = MyRLModel("model.pt")
entry_rule = ModelEntryRule(
    model=model,
    base_position_size=0.02,  # 2% base size
    max_positions=5,
    min_confidence=0.6,
)
exit_rule = ModelExitRule(
    model=model,
    min_confidence=0.7,  # Higher threshold for exits
)
# 3. Run backtest
runner = BacktestRunner(
    strategy_id="model_strategy",
    broker=BacktestBroker(executor=VirtualSpotExecutor(fee_rate=0.001)),
    entry_rules=[entry_rule],
    exit_rules=[
        exit_rule,
        TakeProfitStopLossExit(take_profit_pct=0.03, stop_loss_pct=0.02),
    ],
    initial_capital=10_000.0,
)
# raw_data and signals are assumed to be prepared as in the previous backtest example
state = runner.run(raw_data, signals)
Data Export¶
Export backtest results for external ML model training.
BacktestExporter¶
signalflow.strategy.exporter.parquet_exporter.BacktestExporter
dataclass
¶
Export backtest results for external ML training.
Exports per-bar data (signals + metrics) and per-trade data to Parquet format. Does NOT include raw OHLCV prices - only signals and derived metrics.
Output Files
- {output_path}/bars.parquet: Per-bar signals and metrics
- {output_path}/trades.parquet: Entry/exit trade pairs with outcomes
Attributes:
| Name | Type | Description |
|---|---|---|
| pair_col | str | Column name for pair in signals. |
| ts_col | str | Column name for timestamp in signals. |
Example
exporter = BacktestExporter()

# During backtest (can be integrated with runner)
for ts in timestamps:
    # ... process bar ...
    exporter.export_bar(ts, signals, state.metrics, state)

    for trade in bar_trades:
        exporter.export_trade(trade_data)

# After backtest
exporter.finalize(Path("./training_data"))

# Load for training
bars = pl.read_parquet("./training_data/bars.parquet")
trades = pl.read_parquet("./training_data/trades.parquet")
export_bar ¶
export_bar(timestamp: datetime, signals: Signals, metrics: dict[str, float], state: StrategyState) -> None
Record bar data for export.
Records
- Timestamp
- All signals for this bar (flattened)
- All metrics values
- Position summary (count, total exposure)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timestamp | datetime | Current bar timestamp. | required |
| signals | Signals | Current bar signals. | required |
| metrics | dict[str, float] | Current strategy metrics. | required |
| state | StrategyState | Current strategy state. | required |
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
export_position_close ¶
export_position_close(position: Position, exit_time: datetime, exit_price: float, exit_reason: str = 'unknown') -> None
Convenience method to export when a position closes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| position | Position | The position being closed. | required |
| exit_time | datetime | Time of exit. | required |
| exit_price | float | Price at exit. | required |
| exit_reason | str | Reason for exit (e.g., "take_profit", "stop_loss", "model_exit"). | 'unknown' |
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
export_trade ¶
Record completed trade for export.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| trade_data | dict[str, Any] | Dictionary containing trade information. Expected keys: position_id; pair; entry_time, entry_price; exit_time, exit_price (if closed); realized_pnl; hold_duration_bars; entry_signal_type, entry_confidence; exit_reason. | required |
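A trade_data dictionary with the expected keys might be assembled as in the sketch below. The helper build_trade_data and its qty and bar_seconds parameters are hypothetical, and the PnL formula assumes a long position with fees ignored.

```python
from datetime import datetime


def build_trade_data(position_id, pair, entry_time, entry_price,
                     exit_time, exit_price, qty, bar_seconds,
                     entry_signal_type, entry_confidence, exit_reason="unknown"):
    """Assemble a dict with the keys export_trade is documented to expect."""
    held_seconds = (exit_time - entry_time).total_seconds()
    return {
        "position_id": position_id,
        "pair": pair,
        "entry_time": entry_time,
        "entry_price": entry_price,
        "exit_time": exit_time,
        "exit_price": exit_price,
        # Long-only PnL with fees ignored: an assumption of this sketch.
        "realized_pnl": (exit_price - entry_price) * qty,
        "hold_duration_bars": int(held_seconds // bar_seconds),
        "entry_signal_type": entry_signal_type,
        "entry_confidence": entry_confidence,
        "exit_reason": exit_reason,
    }


trade_data = build_trade_data(
    position_id="pos_abc123", pair="BTCUSDT",
    entry_time=datetime(2025, 1, 1, 0, 0), entry_price=100.0,
    exit_time=datetime(2025, 1, 1, 0, 30), exit_price=110.0,
    qty=0.5, bar_seconds=60,
    entry_signal_type="rise", entry_confidence=0.85,
    exit_reason="take_profit",
)
```

The resulting dictionary could then be passed to exporter.export_trade(trade_data).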
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
finalize ¶
Write all data to Parquet files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output_path | Path | Directory to write output files. Creates directory if it doesn't exist. | required |
Source code in src/signalflow/strategy/exporter/parquet_exporter.py
Export Format¶
bars.parquet - Per-bar signals and metrics:
| Column | Description |
|---|---|
| timestamp | Bar timestamp |
| pair | Trading pair |
| signal_type | Signal type (e.g. rise, fall, local_max) |
| probability | Signal probability |
| metric_equity | Portfolio equity |
| metric_max_drawdown | Max drawdown |
| open_position_count | Open positions count |
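The column names above suggest that each metric is stored under a metric_ prefix alongside the flattened signal fields. The sketch below shows one way such rows could be built from plain dictionaries. It is an assumption inferred from the column names, and flatten_bar is a hypothetical helper, not part of BacktestExporter.

```python
def flatten_bar(timestamp, signal_rows, metrics, open_position_count):
    """Build one output row per signal, with metrics prefixed by "metric_"."""
    prefixed = {f"metric_{name}": value for name, value in metrics.items()}
    return [
        {
            "timestamp": timestamp,
            **row,       # pair, signal_type, probability
            **prefixed,  # metric_equity, metric_max_drawdown, ...
            "open_position_count": open_position_count,
        }
        for row in signal_rows
    ]


rows = flatten_bar(
    timestamp="2025-01-01T00:00:00",
    signal_rows=[{"pair": "BTCUSDT", "signal_type": "rise", "probability": 0.7}],
    metrics={"equity": 10_000.0, "max_drawdown": 0.02},
    open_position_count=1,
)
```

Each element of rows then has the same keys as the bars.parquet columns listed above.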
trades.parquet - Completed trades:
| Column | Description |
|---|---|
| position_id | Position ID |
| pair | Trading pair |
| entry_time, exit_time | Trade timestamps |
| entry_price, exit_price | Prices |
| realized_pnl | Realized profit/loss |
| exit_reason | Why trade closed |
| model_confidence | Model confidence at entry |
Export Example¶
from pathlib import Path
from signalflow.strategy.exporter import BacktestExporter
# Create exporter
exporter = BacktestExporter()
# Option 1: Manual export during custom backtest loop
for ts in timestamps:
    # ... process bar ...
    exporter.export_bar(ts, signals, state.metrics, state)
    # Export when positions close
    for closed_position in newly_closed:
        exporter.export_position_close(
            position=closed_position,
            exit_time=ts,
            exit_price=prices[closed_position.pair],
            exit_reason="take_profit",
        )
# Write to disk
exporter.finalize(Path("./training_data"))
# Option 2: Load for training
import polars as pl
bars_df = pl.read_parquet("./training_data/bars.parquet")
trades_df = pl.read_parquet("./training_data/trades.parquet")
# Prepare features for ML training
features = bars_df.select([
    "timestamp", "pair", "signal_type", "probability",
    "metric_equity", "metric_max_drawdown",
])
See Also¶
- Model Integration Guide: Detailed ML/RL integration tutorial
- Quick Start: Basic strategy setup
- Core API: StrategyState, Portfolio, Position