Skip to content

State Persistence

The StateManager provides async state persistence for live and paper trading. It tracks open positions, pending orders, risk state, and signal deduplication across three backends: Redis, DuckDB, and Memory.


Quick Start

from signalflow.strategy import StateManager

config = {
    "backend": "duckdb",
    "duckdb": {"path": "state/{flow_id}.db"},
    "recovery": {"mode": "sync"},
}

async with StateManager.from_config(flow_id="my_bot", config=config) as mgr:
    # Save position
    await mgr.save_position(Position(
        id="pos_001", pair="BTC/USDT", side="long",
        size=0.5, entry_price=50000, entry_ts=datetime.now(),
    ))

    # Check positions
    positions = await mgr.get_positions()

Backends

Backend Best For Persistence Multi-Process
Redis Production, multi-bot RDB/AOF snapshots Yes
DuckDB Single bot, embedded Always on disk No
Memory Testing, development None No

Redis

config = {
    "backend": "redis",
    "redis": {"url": "redis://localhost:6379"},
}

Key schema: sf:{flow_id}:{category}:{type}

  • sf:bot:positions:open — Hash of open positions
  • sf:bot:risk:daily — Daily PnL and trade count
  • sf:bot:signals:cooldowns — Pair cooldown expiry times
  • sf:bot:execution:heartbeat — Liveness timestamp

DuckDB

config = {
    "backend": "duckdb",
    "duckdb": {"path": "state/{flow_id}.db"},
}

Creates tables: positions, pending_orders, risk_state, signal_state, heartbeat. The {flow_id} placeholder is replaced automatically.

Memory

config = {"backend": "memory"}

No persistence — state is lost on restart. Useful for testing.


State Types

Position

Tracks an open trading position:

from signalflow.strategy import Position

pos = Position(
    id="pos_001",
    pair="BTC/USDT",
    side="long",        # "long" or "short"
    size=0.5,
    entry_price=50000,
    entry_ts=datetime.now(),
    tp=55000,           # Take profit (optional)
    sl=45000,           # Stop loss (optional)
    metadata={},        # Custom data
)

RiskState

Circuit breaker and daily risk tracking:

from signalflow.strategy import RiskState

risk = RiskState(
    daily_pnl=0.0,
    daily_trades=0,
    consecutive_losses=0,
    current_drawdown=0.0,
    peak_equity=0.0,
    circuit_breaker_active=False,
)

SignalState

Signal deduplication and cooldown tracking:

from signalflow.strategy import SignalState
# Tracks last_processed_ts, cooldowns per pair, recent_signal_ids

Position Management

async with StateManager.from_config(flow_id="bot", config=cfg) as mgr:
    # Save
    await mgr.save_position(position)

    # Get all
    positions = await mgr.get_positions()

    # Remove
    await mgr.remove_position("pos_001")

Risk Management

Daily PnL

risk = await mgr.update_daily_pnl(pnl_change=100.50)
print(f"Daily PnL: {risk.daily_pnl}")

Circuit Breaker

from datetime import timedelta

# Trigger
await mgr.trigger_circuit_breaker(
    reason="Daily loss limit exceeded",
    duration=timedelta(hours=2),
)

# Check before trading
if await mgr.check_circuit_breaker():
    print("Trading paused — circuit breaker active")
    return

Signal Deduplication

# Mark signal as processed
await mgr.mark_signal_processed(
    signal_id="sig_789",
    pair="BTC/USDT",
    timestamp=datetime.now(),
)

# Check before processing
if await mgr.is_signal_processed("sig_789"):
    return  # Skip duplicate

# Set cooldown after entry
await mgr.set_cooldown("BTC/USDT", duration=timedelta(minutes=5))

if await mgr.is_on_cooldown("BTC/USDT"):
    return  # Pair on cooldown

Heartbeat & Staleness

# Update heartbeat (call periodically)
await mgr.heartbeat()

# Check if state is stale (e.g. after crash)
if await mgr.check_stale(max_age=timedelta(hours=24)):
    print("State is stale — recovery needed")

Recovery Modes

Configure how the bot handles restarts:

Mode Behavior
sync Sync state with exchange after restart
restore Restore from persistence only
close_all Close all positions on restart
manual Require manual intervention

Orphan Position Handling

When exchange has positions not tracked in state:

Action Behavior
close Close orphaned positions
adopt Adopt into state
manual Manual intervention required
# YAML configuration
state:
  backend: redis
  redis:
    url: redis://localhost:6379
  recovery:
    mode: sync
    orphan_positions: close
    max_state_age: 24h

Full Configuration Reference

Option Values Default
backend redis, duckdb, memory memory
redis.url URL string redis://localhost:6379
redis.key_prefix string sf
duckdb.path path (supports {flow_id}) state/{flow_id}.db
recovery.mode sync, restore, close_all, manual sync
recovery.orphan_positions close, adopt, manual close
recovery.max_state_age duration string 24h

Imports

from signalflow.strategy import (
    StateManager,
    StateConfig,
    StateBackend,
    Position,
    RiskState,
    SignalState,
)