Detector Module¶
Signal detectors and event detectors for real-time market analysis.
Module Name¶
The detector functionality is implemented in the signalflow.detector module.
Signal Detection¶
signalflow.detector.base.SignalDetector
dataclass
¶
SignalDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False)
Bases: KwargsTolerantMixin, ABC
Base class for Polars-first signal detection.
Provides a standardized pipeline for detecting trading signals from raw data:
- preprocess: Extract features from raw data
- detect: Generate signals from features
- validate: Ensure data quality
Key features
- Polars-native for performance
- Automatic feature extraction via FeaturePipeline
- Built-in validation (schema, duplicates, timezones)
- Optional probability requirement
- Keep latest signal per pair option
Public API
- run(): Complete pipeline (preprocess → detect → validate)
- preprocess(): Feature extraction (delegates to FeaturePipeline)
- detect(): Signal generation (must implement)
Attributes:

| Name | Type | Description |
|---|---|---|
| component_type | ClassVar[SfComponentType] | Always DETECTOR for registry. |
| pair_col | str | Trading pair column name. Default: "pair". |
| ts_col | str | Timestamp column name. Default: "timestamp". |
| raw_data_type | RawDataType | Type of raw data to process. Default: SPOT. |
| features | Feature \| list[Feature] \| FeaturePipeline \| None | Features to compute. Can be a single Feature, list of Features, or FeaturePipeline. If None, preprocess() returns raw OHLCV data. Default: None. |
| require_probability | bool | Require probability column in signals. Default: False. |
| keep_only_latest_per_pair | bool | Keep only latest signal per pair. Default: False. |
Example

```python
from signalflow.detector import SignalDetector
from signalflow.core import Signals, SignalType
import polars as pl

class SmaCrossDetector(SignalDetector):
    """Simple SMA crossover detector."""

    def __init__(self, fast_window: int = 10, slow_window: int = 20):
        super().__init__()
        # Auto-generate features
        from signalflow.feature import FeaturePipeline, SmaExtractor
        # Can be a FeaturePipeline, a list of features, or a single feature
        self.features = FeaturePipeline([
            SmaExtractor(window=fast_window, column="close"),
            SmaExtractor(window=slow_window, column="close"),
        ])

    def detect(self, features: pl.DataFrame, context=None) -> Signals:
        signals = features.with_columns([
            # Detect crossover
            (pl.col("sma_10") > pl.col("sma_20")).alias("is_bull"),
            (pl.col("sma_10") < pl.col("sma_20")).alias("is_bear"),
        ]).with_columns([
            # Assign signal type
            pl.when(pl.col("is_bull"))
            .then(pl.lit(SignalType.RISE.value))
            .when(pl.col("is_bear"))
            .then(pl.lit(SignalType.FALL.value))
            .otherwise(pl.lit(SignalType.NONE.value))
            .alias("signal_type")
        ]).select([
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
        ])
        return Signals(signals)

# Usage
detector = SmaCrossDetector(fast_window=10, slow_window=20)
signals = detector.run(raw_data_view)
```
Note
Subclasses must implement detect() method. All DataFrames must use timezone-naive timestamps. Duplicate (pair, timestamp) combinations are rejected.
See Also
FeaturePipeline: Orchestrates feature extraction. Signals: Container for signal output.
features
class-attribute
instance-attribute
¶
keep_only_latest_per_pair
class-attribute
instance-attribute
¶
signal_category
class-attribute
instance-attribute
¶
Signal category this detector produces. Default: PRICE_DIRECTION.
_keep_only_latest ¶
Keep only latest signal per pair.
Useful for strategies that only trade most recent signal.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| signals | Signals | Input signals. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Signals | Signals | Filtered signals with one per pair. |
Source code in src/signalflow/detector/base.py
_normalize_index ¶
Normalize timestamps to timezone-naive.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input DataFrame. | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: DataFrame with timezone-naive timestamps. |

Raises:

| Type | Description |
|---|---|
| TypeError | If df is not pl.DataFrame. |
Source code in src/signalflow/detector/base.py
_validate_features ¶
Validate feature DataFrame.
Checks
- Is pl.DataFrame
- Has required columns (pair, timestamp)
- Timestamps are timezone-naive
- No duplicate (pair, timestamp) combinations
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Features to validate. | required |

Raises:

| Type | Description |
|---|---|
| TypeError | If not pl.DataFrame. |
| ValueError | If validation fails. |
Source code in src/signalflow/detector/base.py
_validate_signals ¶
Validate signal output.
Checks
- Is Signals instance with pl.DataFrame value
- Has required columns (pair, timestamp, signal_type)
- signal_type values are valid SignalType enums
- Timestamps are timezone-naive
- No duplicate (pair, timestamp) combinations
- (optional) Has probability column if required
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| signals | Signals | Signals to validate. | required |

Raises:

| Type | Description |
|---|---|
| TypeError | If not Signals or value not pl.DataFrame. |
| ValueError | If validation fails. |
Source code in src/signalflow/detector/base.py
detect
abstractmethod
¶
Generate signals from features.
Core detection logic - must be implemented by subclasses.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Preprocessed features. | required |
| context | dict[str, Any] \| None | Additional context. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| Signals | Signals | Detected signals with columns: pair (str): trading pair; timestamp (datetime): signal timestamp (timezone-naive); signal_type (int): SignalType enum value; signal (int \| float): signal value; probability (float, optional): signal probability. |
Example

```python
def detect(self, features, context=None):
    # Simple threshold detector
    signals = features.filter(
        pl.col("rsi") > 70  # Overbought
    ).with_columns([
        pl.lit(SignalType.FALL.value).alias("signal_type"),
        pl.lit(-1).alias("signal"),
        pl.lit(0.8).alias("probability"),
    ]).select([
        self.pair_col,
        self.ts_col,
        "signal_type",
        "signal",
        "probability",
    ])
    return Signals(signals)
```
Note
Must return Signals with at minimum: pair, timestamp, signal_type. Timestamps must be timezone-naive. No duplicate (pair, timestamp) combinations allowed.
Source code in src/signalflow/detector/base.py
preprocess ¶
Extract features from raw data.
Base implementation:
- Load raw OHLCV data from raw_data_view
- Apply the configured features pipeline if provided
Subclasses can override to add helper columns for their detection method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context. | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Features with at minimum pair and timestamp columns. |
Example

```python
# Base: returns OHLCV (if features is None)
# or OHLCV + computed features (if features is provided)
feats = detector.preprocess(raw_data_view)

# Custom override to add helper columns
class ZScoreDetector(SignalDetector):
    target_feature: str = "RSI_14"
    rolling_window: int = 100

    def preprocess(self, raw_data_view, context=None):
        # 1. Base preprocessing (OHLCV + features)
        df = super().preprocess(raw_data_view, context)
        # 2. Add helper columns for z-score method
        df = df.with_columns([
            pl.col(self.target_feature)
            .rolling_mean(window_size=self.rolling_window)
            .over(self.pair_col)
            .alias("_target_rol_mean"),
            pl.col(self.target_feature)
            .rolling_std(window_size=self.rolling_window)
            .over(self.pair_col)
            .alias("_target_rol_std"),
        ])
        return df
```
Source code in src/signalflow/detector/base.py
run ¶
Execute complete detection pipeline.
Pipeline steps
- preprocess: Extract features
- normalize: Ensure timezone-naive timestamps
- validate features: Check schema and duplicates
- detect: Generate signals
- validate signals: Check output quality
- (optional) keep latest: Filter to latest per pair
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context for detection. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| Signals | Signals | Detected signals. |

Raises:

| Type | Description |
|---|---|
| TypeError | If preprocess doesn't return pl.DataFrame. |
| ValueError | If features/signals fail validation. |
Example
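The orchestration can be sketched as a plain function over the hooks described above. `run_pipeline` is an illustrative stand-in for the method body, not the library's actual source:

```python
def run_pipeline(detector, raw_data_view, context=None):
    features = detector.preprocess(raw_data_view, context)   # 1. extract features
    features = detector._normalize_index(features)           # 2. tz-naive timestamps
    detector._validate_features(features)                    # 3. schema/duplicate checks
    signals = detector.detect(features, context)             # 4. generate signals
    detector._validate_signals(signals)                      # 5. output quality checks
    if detector.keep_only_latest_per_pair:                   # 6. optional latest-per-pair filter
        signals = detector._keep_only_latest(signals)
    return signals
```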
Note
Can also be called directly: detector(raw_data_view). All validation errors include helpful diagnostic information.
Source code in src/signalflow/detector/base.py
signalflow.detector.sma_cross.ExampleSmaCrossDetector
dataclass
¶
ExampleSmaCrossDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = None, fast_period: int = 20, slow_period: int = 50, price_col: str = 'close')
Bases: SignalDetector
SMA crossover signal detector.
Signals
- "rise": fast crosses above slow
- "fall": fast crosses below slow
allowed_signal_types
class-attribute
instance-attribute
¶
__post_init__ ¶
Source code in src/signalflow/detector/sma_cross.py
detect ¶
Source code in src/signalflow/detector/sma_cross.py
Real-Time Detectors¶
Anomaly Detector¶
signalflow.detector.anomaly_detector.AnomalyDetector
dataclass
¶
AnomalyDetector(signal_category: SignalCategory = SignalCategory.ANOMALY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'extreme_positive_anomaly', 'extreme_negative_anomaly'})(), price_col: str = 'close', vol_window: int = 1440, threshold_return_std: float = 4.0)
Bases: SignalDetector
Detects anomalous price movements in real-time (backward-looking only).
Unlike AnomalyLabeler, this detector uses only past data and is safe
for live trading. It flags the current bar as anomalous when the current
return exceeds a multiple of rolling volatility.
Algorithm
- Compute log returns: log(close[t] / close[t-1])
- Compute rolling std of returns over `vol_window` bars
- Current bar return magnitude: |log_return[t]|
- If magnitude > threshold_return_std * rolling_std[t] AND return is positive -> "extreme_positive_anomaly"
- If magnitude > threshold AND return is negative -> "extreme_negative_anomaly"
- Otherwise: row is skipped (no signal emitted)
Attributes:

| Name | Type | Description |
|---|---|---|
| price_col | str | Price column name. Default: "close". |
| vol_window | int | Rolling window for volatility estimation. Default: 1440. |
| threshold_return_std | float | Number of standard deviations for anomaly threshold. Default: 4.0. |
Example
Note
This detector overrides preprocess() to work directly with raw
OHLCV data and does not require a FeaturePipeline.
allowed_signal_types
class-attribute
instance-attribute
¶
allowed_signal_types: set[str] | None = field(default_factory=lambda: {'extreme_positive_anomaly', 'extreme_negative_anomaly'})
detect ¶
Detect anomalous price movements on the current bar.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | OHLCV data with pair and timestamp columns. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Name | Type | Description |
|---|---|---|
| Signals | Signals | Detected anomaly signals with columns: pair, timestamp, signal_type, signal, probability. |
Source code in src/signalflow/detector/anomaly_detector.py
preprocess ¶
Extract raw OHLCV data without feature pipeline.
Overrides base preprocess() to bypass FeaturePipeline and return
the raw spot data directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Raw OHLCV data sorted by (pair, timestamp). |
Source code in src/signalflow/detector/anomaly_detector.py
Volatility Detector¶
signalflow.detector.volatility_detector.VolatilityDetector
dataclass
¶
VolatilityDetector(signal_category: SignalCategory = SignalCategory.VOLATILITY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'high_volatility', 'low_volatility'})(), price_col: str = 'close', vol_window: int = 60, lookback_window: int = 1440, upper_quantile: float = 0.67, lower_quantile: float = 0.33)
Bases: SignalDetector
Detects volatility regime shifts in real-time (backward-looking only).
Unlike VolatilityRegimeLabeler, this detector uses only past and
current data and is safe for live trading.
Algorithm
- Compute log returns: log(close[t] / close[t-1])
- Backward realized volatility: std of last `vol_window` returns
- Rolling percentile of realized vol over `lookback_window`
- If percentile > upper_quantile -> "high_volatility"
- If percentile < lower_quantile -> "low_volatility"
- Otherwise: no signal emitted
Attributes:

| Name | Type | Description |
|---|---|---|
| price_col | str | Price column name. Default: "close". |
| vol_window | int | Window for realized vol calculation. Default: 60. |
| lookback_window | int | Window for percentile computation. Default: 1440. |
| upper_quantile | float | Upper percentile threshold. Default: 0.67. |
| lower_quantile | float | Lower percentile threshold. Default: 0.33. |
Example
Note
This detector overrides preprocess() to work directly with raw
OHLCV data and does not require a FeaturePipeline.
allowed_signal_types
class-attribute
instance-attribute
¶
allowed_signal_types: set[str] | None = field(default_factory=lambda: {'high_volatility', 'low_volatility'})
detect ¶
Detect volatility regime from backward-looking realized vol.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | OHLCV data with pair and timestamp columns. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with high_volatility/low_volatility signal types. |
Source code in src/signalflow/detector/volatility_detector.py
preprocess ¶
Extract raw OHLCV data without feature pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | Raw OHLCV data sorted by (pair, timestamp). |
Source code in src/signalflow/detector/volatility_detector.py
Structure Detector (Local Extrema)¶
signalflow.detector.structure_detector.StructureDetector
dataclass
¶
StructureDetector(signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'local_max', 'local_min'})(), price_col: str = 'close', lookback: int = 60, confirmation_bars: int = 10, min_swing_pct: float = 0.02)
Bases: SignalDetector
Detects local price structure (tops/bottoms) in real-time.
Unlike StructureLabeler, this detector uses only past data and
requires a confirmation delay -- a local extremum is only confirmed
after confirmation_bars bars have passed showing the reversal.
Algorithm
- For each bar t, look back `lookback` bars
- Find the max and min in the lookback window
- A local_max is confirmed when:
    - The max occurred at bar (t - confirmation_bars) or earlier
    - Price has dropped >= min_swing_pct from the max
    - Current price < max price
- A local_min is confirmed when:
    - The min occurred at bar (t - confirmation_bars) or earlier
    - Price has risen >= min_swing_pct from the min
    - Current price > min price
- Only emit the signal once at the confirmation bar
Attributes:

| Name | Type | Description |
|---|---|---|
| price_col | str | Price column name. Default: "close". |
| lookback | int | Backward window for extrema search. Default: 60. |
| confirmation_bars | int | Bars of reversal needed for confirmation. Default: 10. |
| min_swing_pct | float | Minimum swing percentage. Default: 0.02. |
Example
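The local_max half of the logic in plain Python, as an illustration only. The name `confirmed_local_maxima` is hypothetical; the real detector operates pair-wise on Polars frames, handles local_min symmetrically, and may de-duplicate emissions differently (here a peak is simply emitted at most once):

```python
def confirmed_local_maxima(prices, lookback=60, confirmation_bars=10, min_swing_pct=0.02):
    signals, last_peak_emitted = [], None
    for t in range(len(prices)):
        lo = max(0, t - lookback)
        window = prices[lo : t + 1]
        peak = max(window)
        peak_idx = lo + window.index(peak)
        if (
            peak_idx <= t - confirmation_bars                 # reversal has held long enough
            and (peak - prices[t]) / peak >= min_swing_pct    # swing is large enough
            and peak_idx != last_peak_emitted                 # emit each peak only once
        ):
            signals.append((t, peak_idx))                     # (confirmation bar, peak bar)
            last_peak_emitted = peak_idx
    return signals
```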
Note
This detector overrides preprocess() to work directly with raw
OHLCV data and does not require a FeaturePipeline.
allowed_signal_types
class-attribute
instance-attribute
¶
signal_category
class-attribute
instance-attribute
¶
detect ¶
Detect local tops/bottoms with confirmation delay.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | OHLCV data with pair and timestamp columns. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with local_max/local_min signal types. |
Source code in src/signalflow/detector/structure_detector.py
preprocess ¶
Extract raw OHLCV data without feature pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | Raw OHLCV data sorted by (pair, timestamp). |
Source code in src/signalflow/detector/structure_detector.py
Market-Wide Detection¶
Exogenous market-wide signals (regulatory news, rate decisions, black swans) cause correlated price moves that no feature could predict. Market-wide detectors identify these timestamps so that labels near them can be masked (set to null), preventing MI estimate pollution.
All detectors extend SignalDetector with signal_category=MARKET_WIDE.
SignalDetector (signal_category=MARKET_WIDE)
├── AgreementDetector @sf.detector("market/agreement")
├── MarketZScoreDetector @sf.detector("market/zscore")
└── MarketCusumDetector @sf.detector("market/cusum")
Usage¶
```python
from signalflow.detector.market import MarketZScoreDetector, MarketCusumDetector
from signalflow.target.utils import mask_targets_by_signals

# Z-Score: detects sudden shocks (z-score of aggregate cross-pair return)
zscore_det = MarketZScoreDetector(z_threshold=6.0, rolling_window=500)
signals = zscore_det.run(raw_data_view)
# Default signal_type_name: "aggregate_outlier"

# CUSUM: detects sustained regime shifts (cumulative sum of deviations)
cusum_det = MarketCusumDetector(drift=0.005, cusum_threshold=0.05)
signals = cusum_det.run(raw_data_view)
# Default signal_type_name: "structural_break"

# Mask labels near detected signals
df_masked = mask_targets_by_signals(
    df=df,
    signals=signals,
    mask_signal_types={"aggregate_outlier", "structural_break"},
    horizon_bars=60,
)
```
Agreement-Based Detector¶
signalflow.detector.market.agreement_detector.AgreementDetector
dataclass
¶
AgreementDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'synchronization'})(), agreement_threshold: float = 0.8, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'synchronization')
Bases: SignalDetector
Detects timestamps where cross-pair return agreement is abnormally high.
A market-wide detector that signals when a high fraction of trading pairs move in the same direction simultaneously. This indicates exogenous macro events (interest rate decisions, regulatory news, etc.) that cannot be predicted from individual pair features.
Algorithm
- Compute log-return for each pair at each timestamp.
- At each timestamp, compute the fraction of pairs with same-sign return (majority sign).
- If fraction >= `agreement_threshold`, emit a signal.
Attributes:

| Name | Type | Description |
|---|---|---|
| agreement_threshold | float | Fraction of pairs that must agree for detection. |
| min_pairs | int | Minimum number of active pairs at a timestamp. |
| return_window | int | Bars for return computation. |
| signal_type_name | str | Signal type name for detected signals. |
Example

```python
from signalflow.detector import AgreementDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide agreement
detector = AgreementDetector(agreement_threshold=0.8)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"synchronization"},
    horizon_bars=60,
)
```
Note
Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.
allowed_signal_types
class-attribute
instance-attribute
¶
__post_init__ ¶
detect ¶
Detect market-wide agreement timestamps.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Multi-pair OHLCV DataFrame with _ret column. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with synchronization signal type for detected timestamps. |
Source code in src/signalflow/detector/market/agreement_detector.py
preprocess ¶
Preprocess raw data: compute log returns.
Returns raw OHLCV with _ret column added.
Source code in src/signalflow/detector/market/agreement_detector.py
Z-Score Detector¶
signalflow.detector.market.zscore_detector.MarketZScoreDetector
dataclass
¶
MarketZScoreDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'aggregate_outlier'})(), z_threshold: float = 3.0, rolling_window: int = 100, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'aggregate_outlier')
Bases: SignalDetector
Detects market-wide signals via z-score of aggregate cross-pair return.
More robust than agreement-based detection on correlated markets because it adapts to the current volatility regime.
Algorithm
- Compute log-return per pair per timestamp.
- Compute cross-pair mean return at each timestamp.
- Compute rolling mean and std of the aggregate return over `rolling_window` bars.
- z_score = (agg_return - rolling_mean) / rolling_std
- Signal if |z_score| > `z_threshold`.
Attributes:

| Name | Type | Description |
|---|---|---|
| z_threshold | float | Absolute z-score threshold for detection. |
| rolling_window | int | Window size for rolling statistics. |
| min_pairs | int | Minimum number of active pairs at a timestamp. |
| return_window | int | Bars for return computation. |
| signal_type_name | str | Signal type name for detected signals. |
Example

```python
from signalflow.detector import MarketZScoreDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide z-score outliers
detector = MarketZScoreDetector(z_threshold=3.0)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"aggregate_outlier"},
    horizon_bars=60,
)
```
Note
Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.
allowed_signal_types
class-attribute
instance-attribute
¶
__post_init__ ¶
detect ¶
Detect market-wide signals via z-score.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Multi-pair OHLCV DataFrame with _ret column. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with aggregate_outlier signal type for detected timestamps. |
Source code in src/signalflow/detector/market/zscore_detector.py
preprocess ¶
Preprocess raw data: compute log returns.
Returns raw OHLCV with _ret column added.
Source code in src/signalflow/detector/market/zscore_detector.py
CUSUM Detector¶
signalflow.detector.market.cusum_detector.MarketCusumDetector
dataclass
¶
MarketCusumDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'structural_break'})(), drift: float = 0.005, cusum_threshold: float = 0.05, rolling_window: int = 100, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'structural_break')
Bases: SignalDetector
Detects market-wide signals via CUSUM of cross-pair aggregate return.
Unlike point-in-time z-score detection, CUSUM accumulates evidence over multiple bars, making it better at detecting gradual structural changes.
Algorithm
- Compute cross-pair mean return at each timestamp.
- Compute rolling mean `mu` (expected return) over `rolling_window`.
- S_pos = max(0, S_pos + (x - mu - drift))
- S_neg = max(0, S_neg + (-x + mu - drift))
- Signal if S_pos > cusum_threshold or S_neg > cusum_threshold.
- Reset S_pos, S_neg to 0 after signal detection.
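The recursion above in plain Python. `cusum_alarms` is an illustrative name, and a fixed `mu` is used here for brevity where the detector estimates it with a rolling mean:

```python
def cusum_alarms(agg_returns, mu=0.0, drift=0.005, threshold=0.05):
    # Two-sided CUSUM with reset after each alarm.
    s_pos = s_neg = 0.0
    alarms = []
    for t, x in enumerate(agg_returns):
        s_pos = max(0.0, s_pos + (x - mu - drift))      # evidence of upward shift
        s_neg = max(0.0, s_neg + (-(x - mu) - drift))   # evidence of downward shift
        if s_pos > threshold or s_neg > threshold:
            alarms.append(t)
            s_pos = s_neg = 0.0                         # reset after alarm
    return alarms
```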
Attributes:

| Name | Type | Description |
|---|---|---|
| drift | float | Slack parameter (allowance for normal variation). |
| cusum_threshold | float | Decision interval for CUSUM alarm. |
| rolling_window | int | Window for estimating expected return (mu). |
| min_pairs | int | Minimum number of active pairs at a timestamp. |
| return_window | int | Bars for return computation. |
| signal_type_name | str | Signal type name for detected signals. |
Example

```python
from signalflow.detector import MarketCusumDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide regime shifts
detector = MarketCusumDetector(cusum_threshold=0.05)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"structural_break"},
    horizon_bars=60,
)
```
Note
Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.
Reference
Page, E. S. (1954) - "Continuous Inspection Schemes"
allowed_signal_types
class-attribute
instance-attribute
¶
__post_init__ ¶
detect ¶
Detect market-wide signals via CUSUM.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Multi-pair OHLCV DataFrame with _ret column. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with structural_break signal type for detected timestamps. |
Source code in src/signalflow/detector/market/cusum_detector.py
preprocess ¶
Preprocess raw data: compute log returns.
Returns raw OHLCV with _ret column added.
Source code in src/signalflow/detector/market/cusum_detector.py
Funding Rate Detector¶
signalflow.detector.funding_rate.FundingRateDetector
dataclass
¶
FundingRateDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.PERPETUAL, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, min_positive_hours: int = 24, funding_col: str = 'funding_rate', allowed_signal_types: set[str] | None = (lambda: {'rise'})())
Bases: SignalDetector
Detects long entries when funding rate transitions from positive to negative.
Strategy logic
- Extract non-null funding rate observations per pair
- Track the gap between consecutive non-positive readings
- When funding turns negative AND the previous non-positive reading was >= `min_positive_hours` ago (meaning all interim readings were positive), generate a RISE signal
This pattern suggests overleveraged longs are exiting, potentially creating upward price pressure as shorts cover.
Attributes:

| Name | Type | Description |
|---|---|---|
| min_positive_hours | int | Minimum hours of sustained positive funding before a negative transition triggers a signal. Default: 24. |
| funding_col | str | Column name for funding rate data. |
allowed_signal_types
class-attribute
instance-attribute
¶
signal_category
class-attribute
instance-attribute
¶
detect ¶
Detect funding rate transition signals.
Algorithm
- Filter to rows where `funding_rate` is not null
- For each non-positive reading, record its timestamp
- Shift by 1 and forward-fill to get the previous non-positive timestamp at each row
- At each negative reading, compute hours since that previous non-positive reading
- If hours >= `min_positive_hours`, all interim readings were positive for long enough → signal
Source code in src/signalflow/detector/funding_rate.py
preprocess ¶
Extract perpetual data with funding rates.
Source code in src/signalflow/detector/funding_rate.py
Generic Detectors¶
Z-Score Anomaly Detector¶
signalflow.detector.zscore_anomaly.ZScoreAnomalyDetector
dataclass
¶
ZScoreAnomalyDetector(signal_category: SignalCategory = SignalCategory.ANOMALY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'positive_anomaly', 'negative_anomaly'})(), target_feature: str = 'close', rolling_window: int = 1440, threshold: float = 4.0, signal_high: str = 'positive_anomaly', signal_low: str = 'negative_anomaly')
Bases: SignalDetector
Z-score based anomaly detector on any feature.
Detects anomalies when a feature value deviates significantly from its rolling mean, measured in standard deviations (z-score).
Algorithm
- Compute rolling mean of target_feature over rolling_window
- Compute rolling std of target_feature over rolling_window
- Calculate z-score: (value - rolling_mean) / rolling_std
- Signal if |z-score| > threshold
Attributes:

| Name | Type | Description |
|---|---|---|
| target_feature | str | Column name to analyze for anomalies. |
| rolling_window | int | Window size for rolling mean/std calculation. |
| threshold | float | Z-score threshold for anomaly detection. |
| signal_high | str | Signal type when z-score > threshold. |
| signal_low | str | Signal type when z-score < -threshold. |
Example

```python
from signalflow.detector import ZScoreAnomalyDetector
from signalflow.feature import RsiExtractor

# Detect anomalies on RSI
detector = ZScoreAnomalyDetector(
    features=[RsiExtractor(period=14)],
    target_feature="RSI_14",
    threshold=3.0,
)
signals = detector.run(raw_data_view)

# Detect anomalies on raw close prices
detector = ZScoreAnomalyDetector(
    target_feature="close",  # Will compute on raw close prices
    threshold=4.0,
    signal_high="extreme_positive_anomaly",
    signal_low="extreme_negative_anomaly",
)
```
Note
This detector overrides preprocess() to add helper columns (_target_rol_mean, _target_rol_std) for z-score calculation.
allowed_signal_types (class-attribute, instance-attribute) ¶

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'positive_anomaly', 'negative_anomaly'})
__post_init__ ¶
detect ¶
Detect anomalies using z-score method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Preprocessed DataFrame with _target_rol_mean, _target_rol_std. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with positive_anomaly/negative_anomaly signal types. |
Source code in src/signalflow/detector/zscore_anomaly.py
preprocess ¶
Run features + compute rolling mean/std for target_feature.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data_view | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | DataFrame with original columns plus _target_rol_mean, _target_rol_std. |
Source code in src/signalflow/detector/zscore_anomaly.py
Percentile Regime Detector¶
signalflow.detector.percentile_regime.PercentileRegimeDetector
dataclass
¶
PercentileRegimeDetector(signal_category: SignalCategory = SignalCategory.VOLATILITY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'high_volatility', 'low_volatility'})(), target_feature: str = '_realized_vol', lookback_window: int = 1440, upper_quantile: float = 0.67, lower_quantile: float = 0.33, signal_high: str = 'high_volatility', signal_low: str = 'low_volatility')
Bases: SignalDetector
Percentile-based regime detector on any feature.
Classifies the current regime by computing the rolling percentile of a target feature within a lookback window.
Algorithm
- For each bar, compute percentile of target_feature within lookback_window
- If percentile > upper_quantile -> signal_high
- If percentile < lower_quantile -> signal_low
- Otherwise -> no signal
Attributes:

| Name | Type | Description |
|---|---|---|
| target_feature | str | Column name to analyze for regime. |
| lookback_window | int | Window size for percentile calculation. |
| upper_quantile | float | Upper threshold (signal if percentile > this). |
| lower_quantile | float | Lower threshold (signal if percentile < this). |
| signal_high | str | Signal type when percentile > upper_quantile. |
| signal_low | str | Signal type when percentile < lower_quantile. |
Example

```python
from signalflow.detector import PercentileRegimeDetector
from signalflow.feature import FeaturePipeline, RealizedVolExtractor

# Volatility regime detection
detector = PercentileRegimeDetector(
    features=FeaturePipeline([RealizedVolExtractor()]),
    target_feature="_realized_vol",
    upper_quantile=0.67,
    lower_quantile=0.33,
    signal_high="high_volatility",
    signal_low="low_volatility",
)
signals = detector.run(raw_data_view)
```
Note
Uses numpy for percentile calculation per group since Polars doesn't have native rolling percentile.
allowed_signal_types (class-attribute, instance-attribute) ¶

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'high_volatility', 'low_volatility'})
__post_init__ ¶
Source code in src/signalflow/detector/percentile_regime.py
detect ¶
Detect regime using rolling percentile method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | Preprocessed DataFrame with target_feature column. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with high_volatility/low_volatility signal types. |
Source code in src/signalflow/detector/percentile_regime.py
Local Extrema Detector¶
signalflow.detector.local_extrema.LocalExtremaDetector
dataclass
¶
LocalExtremaDetector(signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'local_max', 'local_min'})(), price_col: str = 'close', lookback: int = 60, confirmation_bars: int = 10, min_swing_pct: float = 0.02, signal_top: str = 'local_max', signal_bottom: str = 'local_min')
Bases: SignalDetector
Local price extrema detector (tops/bottoms) with confirmation.
Detects local price structure using backward-looking zigzag with confirmation delay - a local extremum is only confirmed after confirmation_bars bars have passed showing the reversal.
Algorithm
- For each bar t, look back lookback bars
- Find the max and min in the lookback window
- A local_max is confirmed when:
- The max occurred at bar (t - confirmation_bars) or earlier
- Price has dropped >= min_swing_pct from the max
- Current price < max price
- A local_min is confirmed when:
- The min occurred at bar (t - confirmation_bars) or earlier
- Price has risen >= min_swing_pct from the min
- Current price > min price
- Only emit the signal once at the confirmation bar
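A plain-Python sketch of the local_max branch of the confirmation logic above (local_min is symmetric). Parameters are shrunk for illustration, and the de-duplication via a remembered extremum index is an assumption about how "emit once" is enforced; the actual detector is vectorized.

```python
lookback, confirmation_bars, min_swing_pct = 5, 2, 0.02  # toy values; defaults are 60/10/0.02

prices = [100.0, 101.0, 103.0, 102.0, 100.5, 100.0, 99.0]

signals = []
last_emitted_max = None  # index of the last confirmed local max, to emit only once
for t in range(len(prices)):
    start = max(0, t - lookback + 1)
    window = prices[start : t + 1]
    hi = max(window)
    hi_idx = start + window.index(hi)
    confirmed = (
        hi_idx <= t - confirmation_bars              # max occurred confirmation_bars ago or earlier
        and (hi - prices[t]) / hi >= min_swing_pct   # price dropped >= min_swing_pct from the max
        and prices[t] < hi                           # current price below the max
        and hi_idx != last_emitted_max               # emit only once per extremum
    )
    if confirmed:
        signals.append((t, "local_max"))
        last_emitted_max = hi_idx
```

The peak at index 2 (103.0) is confirmed at bar 4, once two bars have passed and price has retraced just over 2%; later bars see the same peak but do not re-emit.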
Attributes:

| Name | Type | Description |
|---|---|---|
| price_col | str | Price column to analyze. Default: "close". |
| lookback | int | Backward window for extrema search. Default: 60. |
| confirmation_bars | int | Bars of reversal needed for confirmation. Default: 10. |
| min_swing_pct | float | Minimum swing percentage. Default: 0.02. |
| signal_top | str | Signal type for local max. Default: "local_max". |
| signal_bottom | str | Signal type for local min. Default: "local_min". |
Example
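A minimal usage sketch, assuming the same import pattern and run() API as the other detectors in this module (raw_data_view is presumed available; parameter values are the class defaults):

```python
from signalflow.detector import LocalExtremaDetector

detector = LocalExtremaDetector(
    price_col="close",
    lookback=60,
    confirmation_bars=10,
    min_swing_pct=0.02,
)
signals = detector.run(raw_data_view)
```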
Note
This detector is backward-looking and safe for live trading.
allowed_signal_types (class-attribute, instance-attribute) ¶

signal_category (class-attribute, instance-attribute) ¶
__post_init__ ¶
Source code in src/signalflow/detector/local_extrema.py
detect ¶
Detect local tops/bottoms with confirmation delay.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features | DataFrame | OHLCV data with pair and timestamp columns. | required |
| context | dict[str, Any] \| None | Additional context (unused). | None |

Returns:

| Type | Description |
|---|---|
| Signals | Signals with local_max/local_min signal types. |
Source code in src/signalflow/detector/local_extrema.py