Create Your Own Signal Detector
SignalFlow v0.5.0 provides a flexible detector framework for building custom trading signal generators. Detectors encapsulate the logic of transforming raw market data into actionable signals.
What you'll learn:
- The SignalDetector lifecycle: preprocess() extracts features, detect() generates signals
- How to register detectors with the @sf.detector decorator for discovery and reuse
- How to combine multiple detectors with multi-detector aggregation
Prerequisites: 01 - Quick Start
1. Setup & Data
from datetime import datetime
from pathlib import Path
import polars as pl
import signalflow as sf
from signalflow.data import RawDataFactory
from signalflow.data.raw_store import DuckDbSpotStore
from signalflow.data.source import VirtualDataProvider
# Generate synthetic OHLCV data (no API keys required)
db_path = Path("/tmp/custom_detector_tutorial.duckdb")
store = DuckDbSpotStore(db_path=db_path)
VirtualDataProvider(store=store, seed=42).download(
    pairs=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
    n_bars=15_000,
)
# Load into RawData container
raw_data = RawDataFactory.from_duckdb_spot_store(
    spot_store_path=db_path,
    pairs=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
    start=datetime(2020, 1, 1),
    end=datetime(2030, 1, 1),
)
print(f"Loaded {len(raw_data.pairs)} pairs")
print(f"Spot data shape: {raw_data.get('spot').shape}")
2026-02-15 00:49:41.958 | INFO | signalflow.data.raw_store.duckdb_stores:_ensure_tables:153 - Database initialized: /tmp/custom_detector_tutorial.duckdb (data_type=spot, timeframe=1m)
2026-02-15 00:49:42.075 | DEBUG | signalflow.data.raw_store.duckdb_stores:insert_klines:220 - Inserted 15,000 rows for BTCUSDT
2026-02-15 00:49:42.077 | INFO | signalflow.data.source.virtual:download:255 - VirtualDataProvider: generated 15000 bars for BTCUSDT
2026-02-15 00:49:42.174 | DEBUG | signalflow.data.raw_store.duckdb_stores:insert_klines:220 - Inserted 15,000 rows for ETHUSDT
2026-02-15 00:49:42.176 | INFO | signalflow.data.source.virtual:download:255 - VirtualDataProvider: generated 15000 bars for ETHUSDT
2026-02-15 00:49:42.267 | DEBUG | signalflow.data.raw_store.duckdb_stores:insert_klines:220 - Inserted 15,000 rows for SOLUSDT
2026-02-15 00:49:42.268 | INFO | signalflow.data.source.virtual:download:255 - VirtualDataProvider: generated 15000 bars for SOLUSDT
2026-02-15 00:49:42.276 | INFO | signalflow.data.raw_store.duckdb_stores:_ensure_tables:153 - Database initialized: /tmp/custom_detector_tutorial.duckdb (data_type=spot, timeframe=1m)
Loaded 3 pairs
Spot data shape: (45000, 8)
2. Built-in Detector Example
SignalFlow ships with ExampleSmaCrossDetector, a simple SMA crossover detector registered as "example/sma_cross". It demonstrates the core pattern:
- Features are declared in __post_init__(); the base class runs them automatically in preprocess()
- detect() receives the feature-enriched DataFrame and returns a Signals object
- Signals wraps a Polars DataFrame with columns: pair, timestamp, signal_type, signal
Let's run it and inspect the output.
from signalflow.core import RawDataView
from signalflow.detector import ExampleSmaCrossDetector
# Create detector and run
sma_detector = ExampleSmaCrossDetector(fast_period=20, slow_period=50)
view = RawDataView(raw=raw_data)
sma_signals = sma_detector.run(view)
# Inspect signal output
print(f"Signal DataFrame columns: {sma_signals.value.columns}")
print(f"Total signals: {sma_signals.value.height}")
print()
# Signal count per pair
print("Signals per pair:")
print(sma_signals.value.group_by("pair").agg(pl.len().alias("count")).sort("pair"))
print()
# Signal type distribution
print("Signal type distribution:")
print(sma_signals.value.group_by("signal_type").agg(pl.len().alias("count")).sort("signal_type"))
Signal DataFrame columns: ['pair', 'timestamp', 'signal_type', 'signal']
Total signals: 1022

Signals per pair:
shape: (3, 2)
┌─────────┬───────┐
│ pair    ┆ count │
│ ---     ┆ ---   │
│ str     ┆ u32   │
╞═════════╪═══════╡
│ BTCUSDT ┆ 333   │
│ ETHUSDT ┆ 346   │
│ SOLUSDT ┆ 343   │
└─────────┴───────┘

Signal type distribution:
shape: (2, 2)
┌─────────────┬───────┐
│ signal_type ┆ count │
│ ---         ┆ ---   │
│ str         ┆ u32   │
╞═════════════╪═══════╡
│ fall        ┆ 511   │
│ rise        ┆ 511   │
└─────────────┴───────┘
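The two-phase pattern can be illustrated without SignalFlow at all. The following is a minimal pure-Python sketch, not the library's implementation: the helper names `sma`, `preprocess`, and `detect`, the list-based data layout, and the exact crossover rule are all assumptions chosen for illustration. `preprocess` computes the two moving averages, and `detect` emits "rise" when the fast average crosses above the slow one and "fall" when it crosses below.

```python
def sma(values, period):
    """Simple moving average; None until `period` values are available."""
    out = [None] * len(values)
    for i in range(period - 1, len(values)):
        out[i] = sum(values[i - period + 1 : i + 1]) / period
    return out


def preprocess(closes, fast_period, slow_period):
    """Feature step (hypothetical): compute the fast and slow averages."""
    return {"fast": sma(closes, fast_period), "slow": sma(closes, slow_period)}


def detect(features):
    """Signal step (hypothetical): report bars where the averages cross."""
    fast, slow = features["fast"], features["slow"]
    signals = []
    for i in range(1, len(fast)):
        if None in (fast[i - 1], slow[i - 1], fast[i], slow[i]):
            continue  # skip the warm-up bars
        prev_diff = fast[i - 1] - slow[i - 1]
        diff = fast[i] - slow[i]
        if prev_diff <= 0 < diff:
            signals.append((i, "rise"))
        elif prev_diff >= 0 > diff:
            signals.append((i, "fall"))
    return signals


closes = [5, 4, 3, 2, 3, 4, 5, 4, 3, 2, 1]
signals = detect(preprocess(closes, fast_period=2, slow_period=3))
print(signals)
```

A rule of this kind can only fire when the sign of the fast-minus-slow difference flips, so consecutive signals alternate between the two types. That is consistent with the evenly split "rise" and "fall" counts in the output above.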
3. Create an RSI Threshold Detector
Now let's build a custom detector from scratch. The pattern is:
- Subclass SignalDetector with @dataclass and @sf.detector("name")
- Set allowed_signal_types to declare which signal types this detector produces
- Implement detect(): receive a Polars DataFrame, return Signals
Our RSI detector will emit a "rise" signal on every bar where RSI is below an oversold threshold, indicating a potential buying opportunity.
from dataclasses import dataclass
from typing import Any, ClassVar
import polars as pl
from signalflow.core import RawDataView, SfComponentType, Signals
from signalflow.detector.base import SignalDetector
@dataclass
@sf.detector("tutorial/rsi_oversold")
class RsiOversoldDetector(SignalDetector):
    """Generates "rise" (buy) signals while RSI is below the threshold."""

    component_type: ClassVar[SfComponentType] = SfComponentType.DETECTOR
    rsi_period: int = 14
    oversold_threshold: float = 30.0
    allowed_signal_types: set[str] | None = None

    def __post_init__(self):
        self.allowed_signal_types = {"rise"}

    def detect(self, features: pl.DataFrame, context: dict[str, Any] | None = None) -> Signals:
        pair_col = self.pair_col
        ts_col = self.ts_col
        # Compute RSI per pair using Polars (simple rolling means of gains and losses)
        delta = pl.col("close").diff().over(pair_col)
        gain = delta.clip(lower_bound=0).rolling_mean(self.rsi_period).over(pair_col)
        loss = (-delta.clip(upper_bound=0)).rolling_mean(self.rsi_period).over(pair_col)
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        df = features.with_columns(rsi.alias("_rsi"))
        # Generate signals where RSI < threshold (warm-up rows with null RSI are dropped)
        signals_df = df.filter(pl.col("_rsi") < self.oversold_threshold).select(
            [
                pair_col,
                ts_col,
                pl.lit("rise").alias("signal_type"),
                pl.lit(1).alias("signal"),
            ]
        )
        return Signals(signals_df)
print(f"Detector registered: {RsiOversoldDetector.__name__}")
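To make the RSI arithmetic concrete, here is a minimal pure-Python sketch of the same calculation on a single price series. It mirrors the simple-rolling-mean variant used in detect() above rather than Wilder's smoothed RSI. The function name `sma_rsi` and the toy prices are assumptions for illustration, and the sketch returns 100 for any window with no losses, which may differ from how Polars treats a zero-gain, zero-loss window.

```python
def sma_rsi(closes, period):
    """RSI from simple means of gains and losses over `period` price changes."""
    deltas = [b - a for a, b in zip(closes, closes[1:])]
    out = [None] * len(closes)
    for i in range(period, len(closes)):
        window = deltas[i - period : i]  # the `period` changes ending at bar i
        gain = sum(max(d, 0.0) for d in window) / period
        loss = sum(max(-d, 0.0) for d in window) / period
        if loss == 0:
            out[i] = 100.0  # assumption: no losses in the window means RSI 100
        else:
            out[i] = 100.0 - 100.0 / (1.0 + gain / loss)
    return out


closes = [10, 11, 10, 9, 8, 9]
rsi = sma_rsi(closes, period=3)

# Bars where a threshold detector with oversold_threshold=30 would fire
oversold = [i for i, value in enumerate(rsi) if value is not None and value < 30.0]
print(rsi)
print(oversold)
```

With a period of 3, the first three bars have no RSI value. Bar 3 sees one gain and two losses of equal size, so RS is 0.5 and RSI is about 33.3. Bar 4 sees three losses, so RSI is 0 and it is the only bar below the threshold.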
4. Test the Custom Detector
Run the detector the same way as any built-in detector: create a RawDataView and call .run().
# Create detector instance and run
rsi_detector = RsiOversoldDetector(rsi_period=14, oversold_threshold=30)
view = RawDataView(raw=raw_data)
rsi_signals = rsi_detector.run(view)
print(f"Total RSI oversold signals: {rsi_signals.value.height}")
print()
# Signal stats per pair
print("Signals per pair:")
print(rsi_signals.value.group_by("pair").agg(pl.len().alias("count")).sort("pair"))
print()
# Show first few signals
print("Sample signals:")
print(rsi_signals.value.head(5))
Total RSI oversold signals: 5762

Signals per pair:
shape: (3, 2)
┌─────────┬───────┐
│ pair    ┆ count │
│ ---     ┆ ---   │
│ str     ┆ u32   │
╞═════════╪═══════╡
│ BTCUSDT ┆ 1852  │
│ ETHUSDT ┆ 1854  │
│ SOLUSDT ┆ 2056  │
└─────────┴───────┘

Sample signals:
shape: (5, 4)
┌─────────┬─────────────────────┬─────────────┬────────┐
│ pair    ┆ timestamp           ┆ signal_type ┆ signal │
│ ---     ┆ ---                 ┆ ---         ┆ ---    │
│ str     ┆ datetime[μs]        ┆ str         ┆ i32    │
╞═════════╪═════════════════════╪═════════════╪════════╡
│ BTCUSDT ┆ 2024-01-01 01:32:00 ┆ rise        ┆ 1      │
│ BTCUSDT ┆ 2024-01-01 01:33:00 ┆ rise        ┆ 1      │
│ BTCUSDT ┆ 2024-01-01 01:34:00 ┆ rise        ┆ 1      │
│ BTCUSDT ┆ 2024-01-01 01:35:00 ┆ rise        ┆ 1      │
│ BTCUSDT ┆ 2024-01-01 01:53:00 ┆ rise        ┆ 1      │
└─────────┴─────────────────────┴─────────────┴────────┘
5. Use in a Backtest
Custom detectors work seamlessly with the sf.Backtest() fluent API. You can pass either:
- A registry name (string): "tutorial/rsi_oversold"
- A detector instance: RsiOversoldDetector(...)
# Option A: pass detector instance directly
result = (
    sf.Backtest("rsi_oversold")
    .data(raw=raw_data)
    .detector(RsiOversoldDetector(rsi_period=14, oversold_threshold=25))
    .exit(tp=0.03, sl=0.015)
    .capital(50_000)
    .run()
)
print(result.summary())
print()
# Option B: use registry name (works because @sf.detector registered it)
result_via_registry = (
    sf.Backtest("rsi_oversold_registry")
    .data(raw=raw_data)
    .detector("tutorial/rsi_oversold", rsi_period=14, oversold_threshold=25)
    .exit(tp=0.03, sl=0.015)
    .capital(50_000)
    .run()
)
print(f"Registry-based result: {result_via_registry.n_trades} trades, win rate {result_via_registry.win_rate:.1%}")
6. Multi-Detector Strategy
SignalFlow supports combining multiple detectors with configurable aggregation. Each detector runs independently, and their signals are merged according to the chosen mode:
| Mode | Description |
|---|---|
| "merge" | Sequential merge via Signals.__add__ (last detector has priority) |
| "any" | Signal fires if any detector agrees |
| "majority" | Signal fires if majority of detectors agree |
| "unanimous" | Signal fires only if all detectors agree |
| "weighted" | Weighted vote with per-detector weights |
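The modes in the table can be sketched in plain Python. This is an illustration of the voting ideas only, not SignalFlow's implementation: the function names `aggregate` and `merge`, the boolean-vote representation, and the 0.5 cutoff for the weighted mode are assumptions. Each vote stands for "this detector fired for a given pair and timestamp".

```python
def aggregate(votes, mode, weights=None, threshold=0.5):
    """Combine one boolean vote per detector for a single (pair, timestamp)."""
    n = len(votes)
    if mode == "any":
        return any(votes)
    if mode == "majority":
        return sum(votes) > n / 2
    if mode == "unanimous":
        return all(votes)
    if mode == "weighted":
        weights = weights or [1.0] * n
        # Share of total weight held by the detectors that fired
        score = sum(w for v, w in zip(votes, weights) if v) / sum(weights)
        return score >= threshold  # hypothetical cutoff
    raise ValueError(f"unknown mode: {mode}")


def merge(*signal_maps):
    """Sequential merge: later maps override earlier ones on the same key."""
    merged = {}
    for signal_map in signal_maps:
        merged.update(signal_map)
    return merged


votes = [True, False, False]
results = {mode: aggregate(votes, mode) for mode in ("any", "majority", "unanimous")}
results["weighted"] = aggregate(votes, "weighted", weights=[3.0, 1.0, 1.0])
print(results)

combined = merge(
    {("BTCUSDT", 1): "rise"},
    {("BTCUSDT", 1): "fall", ("ETHUSDT", 2): "rise"},
)
print(combined)
```

In this sketch, "any" is the most permissive mode and "unanimous" the strictest. With one detector out of three firing, only "any" passes unless that detector carries enough weight in "weighted" mode.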
result = (
    sf.Backtest("ensemble")
    .data(raw=raw_data)
    .detector("example/sma_cross", fast_period=20, slow_period=50, name="sma")
    .detector(RsiOversoldDetector(rsi_period=14, oversold_threshold=25), name="rsi")
    .aggregation(mode="any")
    .exit(tp=0.03, sl=0.015)
    .capital(50_000)
    .run()
)
print(result.summary())
Backtesting: 100%|██████████| 15000/15000 [00:05<00:00, 2537.45it/s]
==================================================
BACKTEST SUMMARY
==================================================
Trades: 4260
Win Rate: 0.0%
Profit Factor: 0.00
--------------------------------------------------
Initial Capital: $ 50,000.00
Final Capital: $ 0.00
Total Return: -100.0%
--------------------------------------------------
==================================================
7. Discover Available Detectors
The default_registry keeps track of all detectors registered via @sf.detector. Use it to discover what's available.
from signalflow.core import SfComponentType, default_registry
detectors = default_registry.list(SfComponentType.DETECTOR)
print(f"Available detectors ({len(detectors)}):")
for name in sorted(detectors):
    print(f" - {name}")
Available detectors (12):
 - anomaly_detector
 - example/sma_cross
 - funding/rate_transition
 - local_extrema_detector
 - market_wide/agreement
 - market_wide/cusum
 - market_wide/zscore
 - percentile_regime_detector
 - structure_detector
 - tutorial/rsi_oversold
 - volatility_detector
 - zscore_anomaly_detector
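A decorator-based registry of this kind can be sketched in a few lines of plain Python. The `Registry` class, its `register`, `create`, and `names` methods, and the `ThresholdDetector` example are hypothetical and only illustrate the general mechanism; SignalFlow's default_registry may work differently.

```python
from dataclasses import dataclass


class Registry:
    """Maps string names to classes so they can be looked up later."""

    def __init__(self):
        self._items = {}

    def register(self, name):
        def decorator(cls):
            if name in self._items:
                raise ValueError(f"{name!r} is already registered")
            self._items[name] = cls
            return cls  # hand the class back unchanged

        return decorator

    def create(self, name, **kwargs):
        """Instantiate a registered class by name."""
        return self._items[name](**kwargs)

    def names(self):
        return sorted(self._items)


registry = Registry()


@dataclass
@registry.register("demo/threshold")
class ThresholdDetector:
    threshold: float = 30.0


detector = registry.create("demo/threshold", threshold=25.0)
print(registry.names())
print(detector)
```

The decorator order matches the tutorial's detector: the registration decorator runs first and the dataclass decorator second. This works in the sketch because `dataclass` (without `slots=True`) modifies the class in place and returns the same object, so the registered class has the generated `__init__` by the time `create` is called.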
Cleanup
store.close()
db_path.unlink(missing_ok=True)
print("Done!")
Done!
Key Takeaways
- Subclass SignalDetector and implement detect() to return a Signals object.
- Use @sf.detector("category/name") to register your detector for discovery and use via registry name.
- preprocess() handles feature extraction automatically if you set self.features in __post_init__().
- Signals wraps a Polars DataFrame with required columns: pair, timestamp, signal_type, and signal.
- Multi-detector strategies combine signals with aggregation modes like "any", "majority", or "weighted".
Next Steps
- 03 - Data Loading & Resampling: Work with multiple timeframes and auto-resampling
- 04 - Pipeline Visualization: Visualize your strategy pipeline as an interactive graph