Detector Module¶
signalflow.detector.base.SignalDetector
dataclass
¶
SignalDetector(pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType = RawDataType.SPOT, feature_set: FeatureSet | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False)
Bases: ABC
Base class for Polars-first signal detection.
Provides standardized pipeline for detecting trading signals from raw data
- preprocess: Extract features from raw data
- detect: Generate signals from features
- validate: Ensure data quality
Key features
- Polars-native for performance
- Automatic feature extraction via FeatureSet
- Built-in validation (schema, duplicates, timezones)
- Optional probability requirement
- Keep latest signal per pair option
Public API
- run(): Complete pipeline (preprocess → detect → validate)
- preprocess(): Feature extraction (delegates to FeatureSet)
- detect(): Signal generation (must implement)
Attributes:
| Name | Type | Description |
|---|---|---|
component_type |
ClassVar[SfComponentType]
|
Always DETECTOR for registry. |
pair_col |
str
|
Trading pair column name. Default: "pair". |
ts_col |
str
|
Timestamp column name. Default: "timestamp". |
raw_data_type |
RawDataType
|
Type of raw data to process. Default: SPOT. |
feature_set |
FeatureSet | None
|
Feature extractor. Default: None. |
require_probability |
bool
|
Require probability column in signals. Default: False. |
keep_only_latest_per_pair |
bool
|
Keep only latest signal per pair. Default: False. |
Example
from signalflow.detector import SignalDetector
from signalflow.core import Signals, SignalType
import polars as pl
class SmaCrossDetector(SignalDetector):
'''Simple SMA crossover detector'''
def __init__(self, fast_window: int = 10, slow_window: int = 20):
super().__init__()
# Auto-generate features
from signalflow.feature import FeatureSet, SmaExtractor
self.feature_set = FeatureSet([
SmaExtractor(window=fast_window, column="close"),
SmaExtractor(window=slow_window, column="close")
])
def detect(self, features: pl.DataFrame, context=None) -> Signals:
signals = features.with_columns([
# Detect crossover
(pl.col("sma_10") > pl.col("sma_20")).alias("is_bull"),
(pl.col("sma_10") < pl.col("sma_20")).alias("is_bear")
]).with_columns([
# Assign signal type
pl.when(pl.col("is_bull"))
.then(pl.lit(SignalType.RISE.value))
.when(pl.col("is_bear"))
.then(pl.lit(SignalType.FALL.value))
.otherwise(pl.lit(SignalType.NONE.value))
.alias("signal_type")
]).select([
self.pair_col,
self.ts_col,
"signal_type",
pl.lit(1).alias("signal")
])
return Signals(signals)
# Usage
detector = SmaCrossDetector(fast_window=10, slow_window=20)
signals = detector.run(raw_data_view)
Note
Subclasses must implement detect() method. All DataFrames must use timezone-naive timestamps. Duplicate (pair, timestamp) combinations are rejected.
See Also
FeatureSet: Orchestrates feature extraction. Signals: Container for signal output.
keep_only_latest_per_pair
class-attribute
instance-attribute
¶
_keep_only_latest ¶
Keep only latest signal per pair.
Useful for strategies that only trade most recent signal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
signals
|
Signals
|
Input signals. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Signals |
Signals
|
Filtered signals with one per pair. |
Source code in src/signalflow/detector/base.py
_normalize_index ¶
Normalize timestamps to timezone-naive.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
df
|
DataFrame
|
Input DataFrame. |
required |
Returns:
| Type | Description |
|---|---|
DataFrame
|
pl.DataFrame: DataFrame with timezone-naive timestamps. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If df is not pl.DataFrame. |
Source code in src/signalflow/detector/base.py
_validate_features ¶
Validate feature DataFrame.
Checks
- Is pl.DataFrame
- Has required columns (pair, timestamp)
- Timestamps are timezone-naive
- No duplicate (pair, timestamp) combinations
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
df
|
DataFrame
|
Features to validate. |
required |
Raises:
| Type | Description |
|---|---|
TypeError
|
If not pl.DataFrame. |
ValueError
|
If validation fails. |
Source code in src/signalflow/detector/base.py
_validate_signals ¶
Validate signal output.
Checks
- Is Signals instance with pl.DataFrame value
- Has required columns (pair, timestamp, signal_type)
- signal_type values are valid SignalType enums
- Timestamps are timezone-naive
- No duplicate (pair, timestamp) combinations
- (optional) Has probability column if required
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
signals
|
Signals
|
Signals to validate. |
required |
Raises:
| Type | Description |
|---|---|
TypeError
|
If not Signals or value not pl.DataFrame. |
ValueError
|
If validation fails. |
Source code in src/signalflow/detector/base.py
detect
abstractmethod
¶
Generate signals from features.
Core detection logic - must be implemented by subclasses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
features
|
DataFrame
|
Preprocessed features. |
required |
context
|
dict[str, Any] | None
|
Additional context. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Signals |
Signals
|
Detected signals with columns: - pair (str): Trading pair - timestamp (datetime): Signal timestamp (timezone-naive) - signal_type (int): SignalType enum value - signal (int | float): Signal value - probability (float, optional): Signal probability |
Example
def detect(self, features, context=None):
# Simple threshold detector
signals = features.filter(
pl.col("rsi") > 70 # Overbought
).with_columns([
pl.lit(SignalType.FALL.value).alias("signal_type"),
pl.lit(-1).alias("signal"),
pl.lit(0.8).alias("probability")
]).select([
self.pair_col,
self.ts_col,
"signal_type",
"signal",
"probability"
])
return Signals(signals)
Note
Must return Signals with at minimum: pair, timestamp, signal_type. Timestamps must be timezone-naive. No duplicate (pair, timestamp) combinations allowed.
Source code in src/signalflow/detector/base.py
preprocess ¶
Extract features from raw data.
Default implementation delegates to FeatureSet. Override for custom feature extraction logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
raw_data_view
|
RawDataView
|
View to raw market data. |
required |
context
|
dict[str, Any] | None
|
Additional context. |
None
|
Returns:
| Type | Description |
|---|---|
DataFrame
|
pl.DataFrame: Features with at minimum pair and timestamp columns. |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If feature_set is None and not overridden. |
TypeError
|
If FeatureSet doesn't return pl.DataFrame. |
Example
Source code in src/signalflow/detector/base.py
run ¶
Execute complete detection pipeline.
Pipeline steps
- preprocess: Extract features
- normalize: Ensure timezone-naive timestamps
- validate features: Check schema and duplicates
- detect: Generate signals
- validate signals: Check output quality
- (optional) keep latest: Filter to latest per pair
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
raw_data_view
|
RawDataView
|
View to raw market data. |
required |
context
|
dict[str, Any] | None
|
Additional context for detection. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Signals |
Signals
|
Detected signals. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If preprocess doesn't return pl.DataFrame. |
ValueError
|
If features/signals fail validation. |
Example
Note
Can also be called directly: detector(raw_data_view). All validation errors include helpful diagnostic information.
Source code in src/signalflow/detector/base.py
signalflow.detector.sma_cross.SmaCrossSignalDetector
dataclass
¶
SmaCrossSignalDetector(pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType = RawDataType.SPOT, feature_set: FeatureSet | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, fast_period: int = 20, slow_period: int = 50, price_col: str = 'close', fast_col: str | None = None, slow_col: str | None = None)
Bases: SignalDetector
SMA crossover signal detector.
Signal rules (per pair, per timestamp): - RISE : fast crosses above slow (fast_t > slow_t) and (fast_{t-1} <= slow_{t-1}) - FALL : fast crosses below slow (fast_t < slow_t) and (fast_{t-1} >= slow_{t-1}) - NONE : otherwise
Output Signals columns
- pair, timestamp, signal_type, signal
- signal: +1 for RISE, -1 for FALL, 0 for NONE