State Persistence¶
The StateManager provides async state persistence for live and paper trading.
It tracks open positions, pending orders, risk state, and signal deduplication state,
and supports three backends: Redis, DuckDB, and Memory.
Quick Start¶
import asyncio
from datetime import datetime
from signalflow.strategy import Position, StateManager

config = {
    "backend": "duckdb",
    "duckdb": {"path": "state/{flow_id}.db"},
    "recovery": {"mode": "sync"},
}

async def main():
    async with StateManager.from_config(flow_id="my_bot", config=config) as mgr:
        # Save a position
        await mgr.save_position(Position(
            id="pos_001", pair="BTC/USDT", side="long",
            size=0.5, entry_price=50000, entry_ts=datetime.now(),
        ))
        # List open positions
        positions = await mgr.get_positions()

asyncio.run(main())
Backends¶
| Backend | Best For | Persistence | Multi-Process |
|---|---|---|---|
| Redis | Production, multi-bot | RDB/AOF snapshots | Yes |
| DuckDB | Single bot, embedded | Always on disk | No |
| Memory | Testing, development | None | No |
Redis¶
Key schema: sf:{flow_id}:{category}:{type}
- `sf:bot:positions:open`: hash of open positions
- `sf:bot:risk:daily`: daily PnL and trade count
- `sf:bot:signals:cooldowns`: pair cooldown expiry times
- `sf:bot:execution:heartbeat`: liveness timestamp
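As an illustration of the key schema, the sketch below builds and splits such keys by hand. The helper names `state_key` and `parse_state_key` are hypothetical and not part of signalflow; the default prefix `sf` comes from the `redis.key_prefix` default, and the parser assumes no part contains a colon.

```python
def state_key(flow_id: str, category: str, type_: str, prefix: str = "sf") -> str:
    """Build a key of the form prefix:flow_id:category:type."""
    return f"{prefix}:{flow_id}:{category}:{type_}"


def parse_state_key(key: str) -> dict:
    """Split a key back into its four parts (assumes no part contains ':')."""
    prefix, flow_id, category, type_ = key.split(":")
    return {
        "prefix": prefix,
        "flow_id": flow_id,
        "category": category,
        "type": type_,
    }


# Hypothetical usage: the key that would hold open positions for flow "bot".
open_positions_key = state_key("bot", "positions", "open")
print(open_positions_key)
```

Keeping the flow ID in the second position means all keys for one bot share a common prefix, which makes them easy to list or delete together.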
DuckDB¶
Creates the following tables: positions, pending_orders, risk_state, signal_state, heartbeat.
The {flow_id} placeholder in duckdb.path is replaced with the flow ID automatically.
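To make the placeholder substitution concrete, here is a minimal sketch of how a path template could be resolved. The function `resolve_state_path` is a hypothetical helper, not the library's implementation, which may expand the placeholder differently.

```python
from pathlib import PurePosixPath


def resolve_state_path(template: str, flow_id: str) -> PurePosixPath:
    """Replace the literal {flow_id} placeholder in a path template."""
    # str.replace leaves any other braces in the template untouched.
    return PurePosixPath(template.replace("{flow_id}", flow_id))


db_path = resolve_state_path("state/{flow_id}.db", "my_bot")
print(db_path)
```

With one database file per flow ID, two bots configured with the same template do not share a file.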
Memory¶
No persistence — state is lost on restart. Useful for testing.
State Types¶
Position¶
Tracks an open trading position:
from datetime import datetime
from signalflow.strategy import Position

pos = Position(
    id="pos_001",
    pair="BTC/USDT",
    side="long",  # "long" or "short"
    size=0.5,
    entry_price=50000,
    entry_ts=datetime.now(),
    tp=55000,  # Take profit (optional)
    sl=45000,  # Stop loss (optional)
    metadata={},  # Custom data
)
RiskState¶
Circuit breaker and daily risk tracking:
from signalflow.strategy import RiskState

risk = RiskState(
    daily_pnl=0.0,
    daily_trades=0,
    consecutive_losses=0,
    current_drawdown=0.0,
    peak_equity=0.0,
    circuit_breaker_active=False,
)
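The sketch below shows one way these fields could evolve as trades close. `RiskSnapshot` is a hypothetical stand-in that mirrors the field names above, and `record_trade` is not a signalflow function. Treating `current_drawdown` as a fraction of `peak_equity` is an assumption; the library may define it differently.

```python
from dataclasses import dataclass


@dataclass
class RiskSnapshot:
    """Hypothetical stand-in with the same fields as the RiskState example."""

    daily_pnl: float = 0.0
    daily_trades: int = 0
    consecutive_losses: int = 0
    current_drawdown: float = 0.0
    peak_equity: float = 0.0
    circuit_breaker_active: bool = False


def record_trade(state: RiskSnapshot, pnl: float, equity: float) -> RiskSnapshot:
    """Fold one closed trade into the snapshot (mutates and returns it)."""
    state.daily_pnl += pnl
    state.daily_trades += 1
    # A losing trade extends the streak; any other result resets it.
    state.consecutive_losses = state.consecutive_losses + 1 if pnl < 0 else 0
    # Track the equity high-water mark and the fractional drop from it.
    state.peak_equity = max(state.peak_equity, equity)
    if state.peak_equity > 0:
        state.current_drawdown = (state.peak_equity - equity) / state.peak_equity
    return state


snapshot = RiskSnapshot(peak_equity=1000.0)
record_trade(snapshot, pnl=-50.0, equity=950.0)
record_trade(snapshot, pnl=-30.0, equity=920.0)
print(snapshot)
```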
SignalState¶
Signal deduplication and cooldown tracking:
from signalflow.strategy import SignalState
# Tracks last_processed_ts, cooldowns per pair, recent_signal_ids
Position Management¶
# `cfg` is a config dict as in Quick Start; `position` is a Position instance
async with StateManager.from_config(flow_id="bot", config=cfg) as mgr:
    # Save
    await mgr.save_position(position)
    # Get all
    positions = await mgr.get_positions()
    # Remove
    await mgr.remove_position("pos_001")
Risk Management¶
Daily PnL¶
Circuit Breaker¶
from datetime import timedelta

# Inside an async function, with `mgr` an open StateManager
# Trigger
await mgr.trigger_circuit_breaker(
    reason="Daily loss limit exceeded",
    duration=timedelta(hours=2),
)
# Check before trading
if await mgr.check_circuit_breaker():
    print("Trading paused: circuit breaker active")
    return
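The trigger-then-check pattern can be pictured with a small in-memory model. `InMemoryCircuitBreaker` is a hypothetical class written for illustration, not the StateManager implementation; it takes the current time as an argument so the behavior is easy to test. Clearing the breaker once the duration has elapsed is an assumption about the intended semantics.

```python
from datetime import datetime, timedelta
from typing import Optional


class InMemoryCircuitBreaker:
    """Hypothetical stand-in: pauses trading until an expiry time."""

    def __init__(self) -> None:
        self.reason: Optional[str] = None
        self.expires_at: Optional[datetime] = None

    def trigger(self, reason: str, duration: timedelta, now: datetime) -> None:
        """Activate the breaker for `duration`, starting at `now`."""
        self.reason = reason
        self.expires_at = now + duration

    def is_active(self, now: datetime) -> bool:
        """Return True while the breaker has not yet expired."""
        if self.expires_at is None:
            return False
        if now >= self.expires_at:
            # Expired: clear the stored reason and expiry.
            self.reason = None
            self.expires_at = None
            return False
        return True


start = datetime(2024, 1, 1, 9, 0)
breaker = InMemoryCircuitBreaker()
breaker.trigger("Daily loss limit exceeded", timedelta(hours=2), now=start)
```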
Signal Deduplication¶
from datetime import datetime, timedelta

# Inside an async function, with `mgr` an open StateManager
# Mark signal as processed
await mgr.mark_signal_processed(
    signal_id="sig_789",
    pair="BTC/USDT",
    timestamp=datetime.now(),
)
# Check before processing
if await mgr.is_signal_processed("sig_789"):
    return  # Skip duplicate
# Set cooldown after entry
await mgr.set_cooldown("BTC/USDT", duration=timedelta(minutes=5))
if await mgr.is_on_cooldown("BTC/USDT"):
    return  # Pair on cooldown
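For intuition, deduplication and cooldowns can be modelled with a bounded set of recent signal IDs plus a per-pair expiry map. The class `InMemorySignalTracker` and its `max_recent` parameter are hypothetical and assume `max_recent` is at least 1. How the real backends bound `recent_signal_ids` is not stated here, so the eviction policy shown is an assumption.

```python
from collections import deque
from datetime import datetime, timedelta


class InMemorySignalTracker:
    """Hypothetical stand-in for signal deduplication and pair cooldowns."""

    def __init__(self, max_recent: int = 1000) -> None:
        self._recent = deque(maxlen=max_recent)  # IDs in insertion order
        self._seen = set()  # same IDs, for O(1) membership checks
        self._cooldowns = {}  # pair -> expiry datetime

    def mark_processed(self, signal_id: str) -> None:
        if signal_id in self._seen:
            return
        if len(self._recent) == self._recent.maxlen:
            # The deque drops its oldest entry on append; mirror that in the set.
            self._seen.discard(self._recent[0])
        self._recent.append(signal_id)
        self._seen.add(signal_id)

    def is_processed(self, signal_id: str) -> bool:
        return signal_id in self._seen

    def set_cooldown(self, pair: str, duration: timedelta, now: datetime) -> None:
        self._cooldowns[pair] = now + duration

    def is_on_cooldown(self, pair: str, now: datetime) -> bool:
        expiry = self._cooldowns.get(pair)
        return expiry is not None and now < expiry


tracker = InMemorySignalTracker(max_recent=2)
for signal_id in ("sig_1", "sig_2", "sig_3"):
    tracker.mark_processed(signal_id)
```

Bounding the recent-ID set keeps memory use flat, at the cost that a very old signal ID could be accepted again if it is replayed.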
Heartbeat & Staleness¶
# Update heartbeat (call periodically)
await mgr.heartbeat()
# Check if state is stale (e.g. after a crash)
if await mgr.check_stale(max_age=timedelta(hours=24)):
    print("State is stale: recovery needed")
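A staleness check of this kind reduces to comparing the last heartbeat time with a maximum age. The function `is_stale` below is a hypothetical sketch, not the library's `check_stale`; treating a missing heartbeat as stale is an assumption.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_stale(
    last_heartbeat: Optional[datetime],
    max_age: timedelta,
    now: datetime,
) -> bool:
    """Return True if no heartbeat was recorded within `max_age` of `now`."""
    if last_heartbeat is None:
        # Assumption: state that never recorded a heartbeat counts as stale.
        return True
    return now - last_heartbeat > max_age


now = datetime(2024, 1, 2, 12, 0)
recent = is_stale(now - timedelta(hours=1), timedelta(hours=24), now)
old = is_stale(now - timedelta(hours=30), timedelta(hours=24), now)
print(recent, old)
```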
Recovery Modes¶
Configure how the bot handles restarts:
| Mode | Behavior |
|---|---|
| sync | Sync state with exchange after restart |
| restore | Restore from persistence only |
| close_all | Close all positions on restart |
| manual | Require manual intervention |
Orphan Position Handling¶
When the exchange reports positions that are not tracked in state:
| Action | Behavior |
|---|---|
| close | Close orphaned positions |
| adopt | Adopt into state |
| manual | Manual intervention required |
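Orphan detection amounts to a set difference between what the exchange reports and what state tracks. The sketch below uses hypothetical helpers, `find_orphans` and `plan_orphan_handling`, and plain position IDs; it only plans actions and does not model how signalflow talks to an exchange.

```python
from typing import Iterable, List, Tuple

ORPHAN_ACTIONS = {"close", "adopt", "manual"}


def find_orphans(exchange_ids: Iterable[str], tracked_ids: Iterable[str]) -> List[str]:
    """Return exchange position IDs that are absent from tracked state."""
    tracked = set(tracked_ids)
    return [pid for pid in exchange_ids if pid not in tracked]


def plan_orphan_handling(
    exchange_ids: Iterable[str],
    tracked_ids: Iterable[str],
    action: str = "close",
) -> List[Tuple[str, str]]:
    """Pair each orphaned position ID with the configured action."""
    if action not in ORPHAN_ACTIONS:
        raise ValueError(f"unknown orphan action: {action!r}")
    return [(action, pid) for pid in find_orphans(exchange_ids, tracked_ids)]


plan = plan_orphan_handling(
    exchange_ids=["pos_001", "pos_002", "pos_003"],
    tracked_ids=["pos_001"],
    action="adopt",
)
print(plan)
```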
# YAML configuration
state:
  backend: redis
  redis:
    url: redis://localhost:6379
  recovery:
    mode: sync
    orphan_positions: close
    max_state_age: 24h
Full Configuration Reference¶
| Option | Values | Default |
|---|---|---|
| backend | redis, duckdb, memory | memory |
| redis.url | URL string | redis://localhost:6379 |
| redis.key_prefix | string | sf |
| duckdb.path | path (supports {flow_id}) | state/{flow_id}.db |
| recovery.mode | sync, restore, close_all, manual | sync |
| recovery.orphan_positions | close, adopt, manual | close |
| recovery.max_state_age | duration string | 24h |
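As a rough picture of how these defaults combine with user settings, the sketch below deep-merges a partial config over the defaults from the table and parses a duration string. `DEFAULTS`, `with_defaults`, and `parse_duration` are hypothetical names, not signalflow APIs. Only `24h` appears in this document, so support for the `s`, `m`, and `d` suffixes is an assumption.

```python
import copy
import re
from datetime import timedelta

# Defaults copied from the configuration reference table.
DEFAULTS = {
    "backend": "memory",
    "redis": {"url": "redis://localhost:6379", "key_prefix": "sf"},
    "duckdb": {"path": "state/{flow_id}.db"},
    "recovery": {"mode": "sync", "orphan_positions": "close", "max_state_age": "24h"},
}

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse strings such as '24h' into a timedelta (assumed format)."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def with_defaults(user: dict, defaults: dict = DEFAULTS) -> dict:
    """Return a new dict with user settings layered over the defaults."""
    merged = copy.deepcopy(defaults)
    for key, value in user.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested sections key by key instead of replacing them.
            merged[key] = with_defaults(value, merged[key])
        else:
            merged[key] = value
    return merged


cfg = with_defaults({"backend": "redis", "recovery": {"mode": "restore"}})
max_age = parse_duration(cfg["recovery"]["max_state_age"])
```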