
Detector Module

Signal detectors and event detectors for real-time market analysis.

Module Name

The detector functionality is implemented in the signalflow.detector module.

Signal Detection

signalflow.detector.base.SignalDetector dataclass

SignalDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False)

Bases: KwargsTolerantMixin, ABC

Base class for Polars-first signal detection.

Provides a standardized pipeline for detecting trading signals from raw data
  1. preprocess: Extract features from raw data
  2. detect: Generate signals from features
  3. validate: Ensure data quality
Key features
  • Polars-native for performance
  • Automatic feature extraction via FeaturePipeline
  • Built-in validation (schema, duplicates, timezones)
  • Optional probability requirement
  • Keep latest signal per pair option
Public API
  • run(): Complete pipeline (preprocess → detect → validate)
  • preprocess(): Feature extraction (delegates to FeaturePipeline)
  • detect(): Signal generation (must be implemented by subclasses)

Attributes:

Name Type Description
component_type ClassVar[SfComponentType]

Always DETECTOR for registry.

pair_col str

Trading pair column name. Default: "pair".

ts_col str

Timestamp column name. Default: "timestamp".

raw_data_type RawDataType

Type of raw data to process. Default: SPOT.

features Feature | list[Feature] | FeaturePipeline | None

Features to compute. Can be a single Feature, list of Features, or FeaturePipeline. If None, preprocess() returns raw OHLCV data. Default: None.

require_probability bool

Require probability column in signals. Default: False.

keep_only_latest_per_pair bool

Keep only latest signal per pair. Default: False.

Example
from signalflow.detector import SignalDetector
from signalflow.core import Signals, SignalType
import polars as pl

class SmaCrossDetector(SignalDetector):
    '''Simple SMA crossover detector.'''

    def __init__(self, fast_window: int = 10, slow_window: int = 20):
        super().__init__()
        # Column names produced by the SMA extractors below
        self.fast_col = f"sma_{fast_window}"
        self.slow_col = f"sma_{slow_window}"
        # Auto-generate features
        from signalflow.feature import FeaturePipeline, SmaExtractor
        # Can be FeaturePipeline, list of features, or single feature
        self.features = FeaturePipeline([
            SmaExtractor(window=fast_window, column="close"),
            SmaExtractor(window=slow_window, column="close")
        ])

    def detect(self, features: pl.DataFrame, context=None) -> Signals:
        signals = features.with_columns([
            # Detect crossover
            (pl.col(self.fast_col) > pl.col(self.slow_col)).alias("is_bull"),
            (pl.col(self.fast_col) < pl.col(self.slow_col)).alias("is_bear")
        ]).with_columns([
            # Assign signal type
            pl.when(pl.col("is_bull"))
            .then(pl.lit(SignalType.RISE.value))
            .when(pl.col("is_bear"))
            .then(pl.lit(SignalType.FALL.value))
            .otherwise(pl.lit(SignalType.NONE.value))
            .alias("signal_type")
        ]).select([
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal")
        ])

        return Signals(signals)

# Usage
detector = SmaCrossDetector(fast_window=10, slow_window=20)
signals = detector.run(raw_data_view)
Note

Subclasses must implement detect() method. All DataFrames must use timezone-naive timestamps. Duplicate (pair, timestamp) combinations are rejected.

See Also

FeaturePipeline: Orchestrates feature extraction. Signals: Container for signal output.

__call__ class-attribute instance-attribute

__call__ = run

component_type class-attribute

component_type: SfComponentType = DETECTOR

features class-attribute instance-attribute

features: Feature | list[Feature] | FeaturePipeline | None = None

keep_only_latest_per_pair class-attribute instance-attribute

keep_only_latest_per_pair: bool = False

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType | str = SPOT

require_probability class-attribute instance-attribute

require_probability: bool = False

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_DIRECTION

Signal category this detector produces. Default: PRICE_DIRECTION.

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

_keep_only_latest

_keep_only_latest(signals: Signals) -> Signals

Keep only the latest signal per pair.

Useful for strategies that only trade the most recent signal.

Parameters:

Name Type Description Default
signals Signals

Input signals.

required

Returns:

Name Type Description
Signals Signals

Filtered signals with one per pair.

Source code in src/signalflow/detector/base.py
def _keep_only_latest(self, signals: Signals) -> Signals:
    """Keep only latest signal per pair.

    Useful for strategies that only trade most recent signal.

    Args:
        signals (Signals): Input signals.

    Returns:
        Signals: Filtered signals with one per pair.
    """
    s = signals.value
    out = (
        s.sort([self.pair_col, self.ts_col])
        .group_by(self.pair_col, maintain_order=True)
        .tail(1)
        .sort([self.pair_col, self.ts_col])
    )
    return Signals(out)

_normalize_index

_normalize_index(df: DataFrame) -> pl.DataFrame

Normalize timestamps to timezone-naive.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required

Returns:

Type Description
DataFrame

pl.DataFrame: DataFrame with timezone-naive timestamps.

Raises:

Type Description
TypeError

If df is not pl.DataFrame.

Source code in src/signalflow/detector/base.py
def _normalize_index(self, df: pl.DataFrame) -> pl.DataFrame:
    """Normalize timestamps to timezone-naive.

    Args:
        df (pl.DataFrame): Input DataFrame.

    Returns:
        pl.DataFrame: DataFrame with timezone-naive timestamps.

    Raises:
        TypeError: If df is not pl.DataFrame.
    """
    if not isinstance(df, pl.DataFrame):
        raise TypeError(f"Expected pl.DataFrame, got {type(df)}")

    if self.ts_col in df.columns:
        ts_dtype = df.schema.get(self.ts_col)
        if isinstance(ts_dtype, pl.Datetime) and ts_dtype.time_zone is not None:
            df = df.with_columns(pl.col(self.ts_col).dt.replace_time_zone(None))
    return df

_validate_features

_validate_features(df: DataFrame) -> None

Validate feature DataFrame.

Checks
  • Is pl.DataFrame
  • Has required columns (pair, timestamp)
  • Timestamps are timezone-naive
  • No duplicate (pair, timestamp) combinations

Parameters:

Name Type Description Default
df DataFrame

Features to validate.

required

Raises:

Type Description
TypeError

If not pl.DataFrame.

ValueError

If validation fails.

Source code in src/signalflow/detector/base.py
def _validate_features(self, df: pl.DataFrame) -> None:
    """Validate feature DataFrame.

    Checks:
        - Is pl.DataFrame
        - Has required columns (pair, timestamp)
        - Timestamps are timezone-naive
        - No duplicate (pair, timestamp) combinations

    Args:
        df (pl.DataFrame): Features to validate.

    Raises:
        TypeError: If not pl.DataFrame.
        ValueError: If validation fails.
    """
    if not isinstance(df, pl.DataFrame):
        raise TypeError(f"preprocess must return polars.DataFrame, got {type(df)}")

    missing = [c for c in (self.pair_col, self.ts_col) if c not in df.columns]
    if missing:
        raise ValueError(f"Features missing required columns: {missing}")

    ts_dtype = df.schema.get(self.ts_col)
    if isinstance(ts_dtype, pl.Datetime) and ts_dtype.time_zone is not None:
        raise ValueError(
            f"Features column '{self.ts_col}' must be timezone-naive, got tz={ts_dtype.time_zone}. "
            f"Use .dt.replace_time_zone(None)."
        )

    dup = df.group_by([self.pair_col, self.ts_col]).len().filter(pl.col("len") > 1)
    if dup.height > 0:
        raise ValueError(
            "Features contain duplicate keys (pair,timestamp). "
            f"Examples:\n{dup.select([self.pair_col, self.ts_col]).head(10)}"
        )

_validate_signals

_validate_signals(signals: Signals) -> None

Validate signal output.

Checks
  • Is Signals instance with pl.DataFrame value
  • Has required columns (pair, timestamp, signal_type)
  • signal_type values are valid SignalType enums
  • Timestamps are timezone-naive
  • No duplicate (pair, timestamp) combinations
  • (optional) Has probability column if required

Parameters:

Name Type Description Default
signals Signals

Signals to validate.

required

Raises:

Type Description
TypeError

If not Signals or value not pl.DataFrame.

ValueError

If validation fails.

Source code in src/signalflow/detector/base.py
def _validate_signals(self, signals: Signals) -> None:
    """Validate signal output.

    Checks:
        - Is Signals instance with pl.DataFrame value
        - Has required columns (pair, timestamp, signal_type)
        - signal_type values are valid SignalType enums
        - Timestamps are timezone-naive
        - No duplicate (pair, timestamp) combinations
        - (optional) Has probability column if required

    Args:
        signals (Signals): Signals to validate.

    Raises:
        TypeError: If not Signals or value not pl.DataFrame.
        ValueError: If validation fails.
    """
    if not isinstance(signals, Signals):
        raise TypeError(f"detect must return Signals, got {type(signals)}")

    s = signals.value
    if not isinstance(s, pl.DataFrame):
        raise TypeError(f"Signals.value must be polars.DataFrame, got {type(s)}")

    required = {self.pair_col, self.ts_col, "signal_type"}
    missing = sorted(required - set(s.columns))
    if missing:
        raise ValueError(f"Signals missing required columns: {missing}")

    allowed = getattr(self, "allowed_signal_types", None)
    if allowed:
        non_null = s.filter(pl.col("signal_type").is_not_null())
        bad = non_null.select(pl.col("signal_type")).unique().filter(~pl.col("signal_type").is_in(list(allowed)))
        if bad.height > 0:
            raise ValueError(
                f"Signals contain unknown signal_type values: {bad.get_column('signal_type').to_list()}"
            )

    if self.require_probability and "probability" not in s.columns:
        raise ValueError("Signals must contain 'probability' column (require_probability=True)")

    ts_dtype = s.schema.get(self.ts_col)
    if isinstance(ts_dtype, pl.Datetime) and ts_dtype.time_zone is not None:
        raise ValueError(f"Signals column '{self.ts_col}' must be timezone-naive, got tz={ts_dtype.time_zone}.")

    # optional: hard guarantee no duplicates in signals
    dup = s.group_by([self.pair_col, self.ts_col]).len().filter(pl.col("len") > 1)
    if dup.height > 0:
        raise ValueError(
            "Signals contain duplicate keys (pair,timestamp). "
            f"Examples:\n{dup.select([self.pair_col, self.ts_col]).head(10)}"
        )

detect abstractmethod

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Generate signals from features.

Core detection logic - must be implemented by subclasses.

Parameters:

Name Type Description Default
features DataFrame

Preprocessed features.

required
context dict[str, Any] | None

Additional context.

None

Returns:

Name Type Description
Signals Signals

Detected signals with columns:
  • pair (str): Trading pair
  • timestamp (datetime): Signal timestamp (timezone-naive)
  • signal_type (int): SignalType enum value
  • signal (int | float): Signal value
  • probability (float, optional): Signal probability

Example
def detect(self, features, context=None):
    # Simple threshold detector
    signals = features.filter(
        pl.col("rsi") > 70  # Overbought
    ).with_columns([
        pl.lit(SignalType.FALL.value).alias("signal_type"),
        pl.lit(-1).alias("signal"),
        pl.lit(0.8).alias("probability")
    ]).select([
        self.pair_col,
        self.ts_col,
        "signal_type",
        "signal",
        "probability"
    ])

    return Signals(signals)
Note

Must return Signals with at minimum: pair, timestamp, signal_type. Timestamps must be timezone-naive. No duplicate (pair, timestamp) combinations allowed.

Source code in src/signalflow/detector/base.py
@abstractmethod
def detect(self, features: pl.DataFrame, context: dict[str, Any] | None = None) -> Signals:
    """Generate signals from features.

    Core detection logic - must be implemented by subclasses.

    Args:
        features (pl.DataFrame): Preprocessed features.
        context (dict[str, Any] | None): Additional context.

    Returns:
        Signals: Detected signals with columns:
            - pair (str): Trading pair
            - timestamp (datetime): Signal timestamp (timezone-naive)
            - signal_type (int): SignalType enum value
            - signal (int | float): Signal value
            - probability (float, optional): Signal probability

    Example:
        ```python
        def detect(self, features, context=None):
            # Simple threshold detector
            signals = features.filter(
                pl.col("rsi") > 70  # Overbought
            ).with_columns([
                pl.lit(SignalType.FALL.value).alias("signal_type"),
                pl.lit(-1).alias("signal"),
                pl.lit(0.8).alias("probability")
            ]).select([
                self.pair_col,
                self.ts_col,
                "signal_type",
                "signal",
                "probability"
            ])

            return Signals(signals)
        ```

    Note:
        Must return Signals with at minimum: pair, timestamp, signal_type.
        Timestamps must be timezone-naive.
        No duplicate (pair, timestamp) combinations allowed.
    """
    raise NotImplementedError

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract features from raw data.

Base implementation
  1. Load raw OHLCV data from raw_data_view
  2. Apply features if provided

Subclasses can override to add helper columns for their detection method.

Parameters:

Name Type Description Default
raw_data_view RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context.

None

Returns:

Type Description
DataFrame

pl.DataFrame: Features with at minimum pair and timestamp columns.

Example
# Base: returns OHLCV (if features is None)
# or OHLCV + computed features (if features is provided)
feats = detector.preprocess(raw_data_view)

# Custom override to add helper columns
class ZScoreDetector(SignalDetector):
    target_feature: str = "RSI_14"
    rolling_window: int = 100

    def preprocess(self, raw_data_view, context=None):
        # 1. Base preprocessing (OHLCV + features)
        df = super().preprocess(raw_data_view, context)

        # 2. Add helper columns for z-score method
        df = df.with_columns([
            pl.col(self.target_feature)
                .rolling_mean(window_size=self.rolling_window)
                .over(self.pair_col)
                .alias("_target_rol_mean"),
            pl.col(self.target_feature)
                .rolling_std(window_size=self.rolling_window)
                .over(self.pair_col)
                .alias("_target_rol_std"),
        ])
        return df
Source code in src/signalflow/detector/base.py
def preprocess(self, raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Extract features from raw data.

    Base implementation:
        1. Load raw OHLCV data from raw_data_view
        2. Apply features if provided

    Subclasses can override to add helper columns for their detection method.

    Args:
        raw_data_view (RawDataView): View to raw market data.
        context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Features with at minimum pair and timestamp columns.

    Example:
        ```python
        # Base: returns OHLCV (if features is None)
        # or OHLCV + computed features (if features is provided)
        feats = detector.preprocess(raw_data_view)

        # Custom override to add helper columns
        class ZScoreDetector(SignalDetector):
            target_feature: str = "RSI_14"
            rolling_window: int = 100

            def preprocess(self, raw_data_view, context=None):
                # 1. Base preprocessing (OHLCV + features)
                df = super().preprocess(raw_data_view, context)

                # 2. Add helper columns for z-score method
                df = df.with_columns([
                    pl.col(self.target_feature)
                        .rolling_mean(window_size=self.rolling_window)
                        .over(self.pair_col)
                        .alias("_target_rol_mean"),
                    pl.col(self.target_feature)
                        .rolling_std(window_size=self.rolling_window)
                        .over(self.pair_col)
                        .alias("_target_rol_std"),
                ])
                return df
        ```
    """
    # 1. Load raw OHLCV data
    key = self.raw_data_type.value if hasattr(self.raw_data_type, "value") else str(self.raw_data_type)
    df = raw_data_view.to_polars(key).sort([self.pair_col, self.ts_col])

    # 2. Apply features if provided
    if self.features is not None:
        if isinstance(self.features, FeaturePipeline):
            df = self.features.compute(df, context=context)
        elif isinstance(self.features, list):
            for feat in self.features:
                df = feat.compute(df, context=context)
        elif isinstance(self.features, Feature):
            df = self.features.compute(df, context=context)
        else:
            raise TypeError(
                f"features must be Feature, list[Feature], or FeaturePipeline, got {type(self.features)}"
            )

        if not isinstance(df, pl.DataFrame):
            raise TypeError(f"{self.__class__.__name__}.features.compute must return pl.DataFrame, got {type(df)}")

    return df

run

run(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> Signals

Execute complete detection pipeline.

Pipeline steps
  1. preprocess: Extract features
  2. normalize: Ensure timezone-naive timestamps
  3. validate features: Check schema and duplicates
  4. detect: Generate signals
  5. validate signals: Check output quality
  6. (optional) keep latest: Filter to latest per pair

Parameters:

Name Type Description Default
raw_data_view RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context for detection.

None

Returns:

Name Type Description
Signals Signals

Detected signals.

Raises:

Type Description
TypeError

If preprocess doesn't return pl.DataFrame.

ValueError

If features/signals fail validation.

Example
from signalflow.core import RawData, RawDataView

# Create view
view = RawDataView(raw=raw_data)

# Run detection
signals = detector.run(view)

# With context
signals = detector.run(view, context={"threshold": 0.7})
Note

Can also be called directly: detector(raw_data_view). All validation errors include helpful diagnostic information.

Source code in src/signalflow/detector/base.py
def run(self, raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> Signals:
    """Execute complete detection pipeline.

    Pipeline steps:
        1. preprocess: Extract features
        2. normalize: Ensure timezone-naive timestamps
        3. validate features: Check schema and duplicates
        4. detect: Generate signals
        5. validate signals: Check output quality
        6. (optional) keep latest: Filter to latest per pair

    Args:
        raw_data_view (RawDataView): View to raw market data.
        context (dict[str, Any] | None): Additional context for detection.

    Returns:
        Signals: Detected signals.

    Raises:
        TypeError: If preprocess doesn't return pl.DataFrame.
        ValueError: If features/signals fail validation.

    Example:
        ```python
        from signalflow.core import RawData, RawDataView

        # Create view
        view = RawDataView(raw=raw_data)

        # Run detection
        signals = detector.run(view)

        # With context
        signals = detector.run(view, context={"threshold": 0.7})
        ```

    Note:
        Can also be called directly: detector(raw_data_view).
        All validation errors include helpful diagnostic information.
    """
    feats = self.preprocess(raw_data_view, context=context)
    feats = self._normalize_index(feats)
    self._validate_features(feats)

    signals = self.detect(feats, context=context)
    self._validate_signals(signals)

    if self.keep_only_latest_per_pair:
        signals = self._keep_only_latest(signals)

    return signals

signalflow.detector.sma_cross.ExampleSmaCrossDetector dataclass

ExampleSmaCrossDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = None, fast_period: int = 20, slow_period: int = 50, price_col: str = 'close')

Bases: SignalDetector

SMA crossover signal detector.

Signals
  • "rise": fast crosses above slow
  • "fall": fast crosses below slow

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = None

fast_period class-attribute instance-attribute

fast_period: int = 20

price_col class-attribute instance-attribute

price_col: str = 'close'

slow_period class-attribute instance-attribute

slow_period: int = 50

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/sma_cross.py
def __post_init__(self) -> None:
    if self.fast_period >= self.slow_period:
        raise ValueError("fast_period must be < slow_period")

    self.fast_col = f"sma_{self.fast_period}"
    self.slow_col = f"sma_{self.slow_period}"
    self.allowed_signal_types = {"rise", "fall"}

    self.features = [
        ExampleSmaFeature(period=self.fast_period, price_col=self.price_col),
        ExampleSmaFeature(period=self.slow_period, price_col=self.price_col),
    ]

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals
Source code in src/signalflow/detector/sma_cross.py
def detect(self, features: pl.DataFrame, context: dict[str, Any] | None = None) -> Signals:
    df = features.sort([self.pair_col, self.ts_col])

    df = df.filter(pl.col(self.fast_col).is_not_null() & pl.col(self.slow_col).is_not_null())

    fast = pl.col(self.fast_col)
    slow = pl.col(self.slow_col)
    fast_prev = fast.shift(1).over(self.pair_col)
    slow_prev = slow.shift(1).over(self.pair_col)

    cross_up = (fast > slow) & (fast_prev <= slow_prev)
    cross_down = (fast < slow) & (fast_prev >= slow_prev)

    out = df.select(
        [
            self.pair_col,
            self.ts_col,
            pl.when(cross_up)
            .then(pl.lit("rise"))
            .when(cross_down)
            .then(pl.lit("fall"))
            .otherwise(pl.lit(None, dtype=pl.Utf8))
            .alias("signal_type"),
            pl.when(cross_up).then(1).when(cross_down).then(-1).otherwise(0).alias("signal"),
        ]
    ).filter(pl.col("signal_type").is_not_null())

    return Signals(out)

Real-Time Detectors

Anomaly Detector

signalflow.detector.anomaly_detector.AnomalyDetector dataclass

AnomalyDetector(signal_category: SignalCategory = SignalCategory.ANOMALY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'extreme_positive_anomaly', 'extreme_negative_anomaly'})(), price_col: str = 'close', vol_window: int = 1440, threshold_return_std: float = 4.0)

Bases: SignalDetector

Detects anomalous price movements in real-time (backward-looking only).

Unlike AnomalyLabeler, this detector uses only past data and is safe for live trading. It flags the current bar as anomalous when the current return exceeds a multiple of rolling volatility.

Algorithm
  1. Compute log returns: log(close[t] / close[t-1])
  2. Compute rolling std of returns over vol_window bars
  3. Current bar return magnitude: |log_return[t]|
  4. If magnitude > threshold_return_std * rolling_std[t] and the return is negative -> "extreme_negative_anomaly"
  5. If magnitude exceeds the same threshold and the return is positive -> "extreme_positive_anomaly"
  6. Otherwise: the row is skipped (no signal emitted)

Attributes:

Name Type Description
price_col str

Price column name. Default: "close".

vol_window int

Rolling window for volatility estimation. Default: 1440.

threshold_return_std float

Number of standard deviations for anomaly threshold. Default: 4.0.

Example
from signalflow.core import RawData, RawDataView
from signalflow.detector.anomaly_detector import AnomalyDetector

detector = AnomalyDetector(
    vol_window=1440,
    threshold_return_std=4.0,
)
signals = detector.run(raw_data_view)
Note

This detector overrides preprocess() to work directly with raw OHLCV data and does not require a FeaturePipeline.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'extreme_positive_anomaly', 'extreme_negative_anomaly'})

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = ANOMALY

threshold_return_std class-attribute instance-attribute

threshold_return_std: float = 4.0

vol_window class-attribute instance-attribute

vol_window: int = 1440

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect anomalous price movements on the current bar.

Parameters:

Name Type Description Default
features DataFrame

OHLCV data with pair and timestamp columns.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Name Type Description
Signals Signals

Detected anomaly signals with columns: pair, timestamp, signal_type, signal, probability.

Source code in src/signalflow/detector/anomaly_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect anomalous price movements on the current bar.

    Args:
        features (pl.DataFrame): OHLCV data with pair and timestamp columns.
        context (dict[str, Any] | None): Additional context (unused).

    Returns:
        Signals: Detected anomaly signals with columns:
            pair, timestamp, signal_type, signal, probability.
    """
    price = pl.col(self.price_col)

    # Step 1: log returns (per pair)
    df = features.with_columns(
        (price / price.shift(1).over(self.pair_col)).log().alias("_log_ret"),
    )

    # Step 2: rolling std of returns (per pair)
    df = df.with_columns(
        pl.col("_log_ret")
        .rolling_std(window_size=self.vol_window, min_samples=max(2, self.vol_window // 4))
        .over(self.pair_col)
        .alias("_rolling_vol"),
    )

    # Step 3: current bar return magnitude
    df = df.with_columns(
        pl.col("_log_ret").abs().alias("_ret_abs"),
    )

    # Step 4-5: classify
    threshold = pl.col("_rolling_vol") * self.threshold_return_std

    is_anomaly = (
        pl.col("_ret_abs").is_not_null()
        & pl.col("_rolling_vol").is_not_null()
        & (pl.col("_rolling_vol") > 0)
        & (pl.col("_ret_abs") > threshold)
    )

    is_flash_crash = is_anomaly & (pl.col("_log_ret") < 0)

    signal_type_expr = (
        pl.when(is_flash_crash)
        .then(pl.lit("extreme_negative_anomaly"))
        .when(is_anomaly)
        .then(pl.lit("extreme_positive_anomaly"))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias("signal_type")
    )

    # Compute probability as ratio of return magnitude to threshold
    probability_expr = (
        pl.when(is_anomaly)
        .then((pl.col("_ret_abs") / threshold).clip(0.0, 1.0))
        .otherwise(pl.lit(None, dtype=pl.Float64))
        .alias("probability")
    )

    df = df.with_columns([signal_type_expr, probability_expr])

    # Step 6: filter to only anomalous bars (skip rows with no signal)
    signals_df = df.filter(pl.col("signal_type").is_not_null()).select(
        [
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
            "probability",
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract raw OHLCV data without feature pipeline.

Overrides base preprocess() to bypass FeaturePipeline and return the raw spot data directly.

Parameters:

Name Type Description Default
raw_data_view RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
DataFrame

pl.DataFrame: Raw OHLCV data sorted by (pair, timestamp).

Source code in src/signalflow/detector/anomaly_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Extract raw OHLCV data without feature pipeline.

    Overrides base ``preprocess()`` to bypass FeaturePipeline and return
    the raw spot data directly.

    Args:
        raw_data_view (RawDataView): View to raw market data.
        context (dict[str, Any] | None): Additional context (unused).

    Returns:
        pl.DataFrame: Raw OHLCV data sorted by (pair, timestamp).
    """
    key = self.raw_data_type.value if hasattr(self.raw_data_type, "value") else str(self.raw_data_type)
    return raw_data_view.to_polars(key).sort([self.pair_col, self.ts_col])

Volatility Detector

signalflow.detector.volatility_detector.VolatilityDetector dataclass

VolatilityDetector(signal_category: SignalCategory = SignalCategory.VOLATILITY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'high_volatility', 'low_volatility'})(), price_col: str = 'close', vol_window: int = 60, lookback_window: int = 1440, upper_quantile: float = 0.67, lower_quantile: float = 0.33)

Bases: SignalDetector

Detects volatility regime shifts in real-time (backward-looking only).

Unlike VolatilityRegimeLabeler, this detector uses only past and current data and is safe for live trading.

Algorithm
  1. Compute log returns: log(close[t] / close[t-1])
  2. Backward realized volatility: std of last vol_window returns
  3. Rolling percentile of realized vol over lookback_window
  4. If percentile > upper_quantile -> "high_volatility"
  5. If percentile < lower_quantile -> "low_volatility"
  6. Otherwise: no signal emitted

Attributes:

Name Type Description
price_col str

Price column name. Default: "close".

vol_window int

Window for realized vol calculation. Default: 60.

lookback_window int

Window for percentile computation. Default: 1440.

upper_quantile float

Upper percentile threshold. Default: 0.67.

lower_quantile float

Lower percentile threshold. Default: 0.33.

Example
from signalflow.detector.volatility_detector import VolatilityDetector

detector = VolatilityDetector(
    vol_window=60,
    upper_quantile=0.67,
    lower_quantile=0.33,
)
signals = detector.run(raw_data_view)
Note

This detector overrides preprocess() to work directly with raw OHLCV data and does not require a FeaturePipeline.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'high_volatility', 'low_volatility'})

lookback_window class-attribute instance-attribute

lookback_window: int = 1440

lower_quantile class-attribute instance-attribute

lower_quantile: float = 0.33

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = VOLATILITY

upper_quantile class-attribute instance-attribute

upper_quantile: float = 0.67

vol_window class-attribute instance-attribute

vol_window: int = 60

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect volatility regime from backward-looking realized vol.

Parameters:

Name Type Description Default
features DataFrame

OHLCV data with pair and timestamp columns.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
Signals

Signals with high_volatility/low_volatility signal types.

Source code in src/signalflow/detector/volatility_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect volatility regime from backward-looking realized vol.

    Args:
        features: OHLCV data with pair and timestamp columns.
        context: Additional context (unused).

    Returns:
        Signals with high_volatility/low_volatility signal types.
    """
    price = pl.col(self.price_col)

    # Step 1: log returns (per pair)
    df = features.with_columns(
        (price / price.shift(1).over(self.pair_col)).log().alias("_log_ret"),
    )

    # Step 2: backward realized volatility (rolling std of returns)
    df = df.with_columns(
        pl.col("_log_ret")
        .rolling_std(window_size=self.vol_window, min_samples=max(2, self.vol_window // 4))
        .over(self.pair_col)
        .alias("_realized_vol"),
    )

    # Steps 3-5: compute rolling percentile and classify per group
    # Polars' rolling_quantile returns a quantile value, not the percentile rank of
    # the current value within the window, so we compute the rank via numpy per group
    results = []
    for _pair_name, group in df.group_by(self.pair_col, maintain_order=True):
        vol_arr = group["_realized_vol"].to_numpy().astype(np.float64)
        n = len(vol_arr)

        signal_types: list[str | None] = [None] * n
        probabilities: list[float | None] = [None] * n

        for t in range(n):
            if np.isnan(vol_arr[t]):
                continue

            lb_start = max(0, t - self.lookback_window + 1)
            window = vol_arr[lb_start : t + 1]
            valid = window[~np.isnan(window)]
            if len(valid) < 2:
                continue

            percentile = float(np.mean(valid <= vol_arr[t]))

            if percentile > self.upper_quantile:
                signal_types[t] = "high_volatility"
                probabilities[t] = percentile
            elif percentile < self.lower_quantile:
                signal_types[t] = "low_volatility"
                probabilities[t] = 1.0 - percentile

        group = group.with_columns(
            [
                pl.Series(name="signal_type", values=signal_types, dtype=pl.Utf8),
                pl.Series(name="probability", values=probabilities, dtype=pl.Float64),
            ]
        )
        results.append(group)

    if not results:
        return Signals(pl.DataFrame())

    combined = pl.concat(results, how="vertical_relaxed")

    # Filter to only bars with signals
    signals_df = combined.filter(pl.col("signal_type").is_not_null()).select(
        [
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
            "probability",
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract raw OHLCV data without feature pipeline.

Parameters:

Name Type Description Default
raw_data_view RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
DataFrame

Raw OHLCV data sorted by (pair, timestamp).

Source code in src/signalflow/detector/volatility_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Extract raw OHLCV data without feature pipeline.

    Args:
        raw_data_view: View to raw market data.
        context: Additional context (unused).

    Returns:
        Raw OHLCV data sorted by (pair, timestamp).
    """
    key = self.raw_data_type.value if hasattr(self.raw_data_type, "value") else str(self.raw_data_type)
    return raw_data_view.to_polars(key).sort([self.pair_col, self.ts_col])

Structure Detector (Local Extrema)

signalflow.detector.structure_detector.StructureDetector dataclass

StructureDetector(signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'local_max', 'local_min'})(), price_col: str = 'close', lookback: int = 60, confirmation_bars: int = 10, min_swing_pct: float = 0.02)

Bases: SignalDetector

Detects local price structure (tops/bottoms) in real-time.

Unlike StructureLabeler, this detector uses only past data and requires a confirmation delay -- a local extremum is only confirmed after confirmation_bars bars have passed showing the reversal.

Algorithm
  1. For each bar t, look back lookback bars
  2. Find the max and min in the lookback window
  3. A local_max is confirmed when:
     - the max occurred at bar (t - confirmation_bars) or earlier
     - price has dropped >= min_swing_pct from the max
     - current price < max price
  4. A local_min is confirmed when:
     - the min occurred at bar (t - confirmation_bars) or earlier
     - price has risen >= min_swing_pct from the min
     - current price > min price
  5. Only emit the signal once at the confirmation bar
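A minimal NumPy check of the local_max confirmation rule (illustrative helper, not the library API). Because the search window ends confirmation_bars bars before t, any max found in it already satisfies the age requirement:

```python
import numpy as np

def confirm_local_max(prices, t, lookback, confirmation_bars, min_swing_pct):
    """True if bar t confirms a local top: the window max is at least
    confirmation_bars old and price has retraced >= min_swing_pct from it."""
    window = prices[t - lookback : t - confirmation_bars + 1]
    max_val = np.max(window)
    swing = (max_val - prices[t]) / max_val
    return prices[t] < max_val and swing >= min_swing_pct

# Peak of 110 at index 5, then a ~5% pullback; confirmed at t=20 (15 bars later).
prices = np.array([100.0] * 5 + [110.0] + [104.0] * 15)
confirmed = confirm_local_max(prices, t=20, lookback=15, confirmation_bars=10, min_swing_pct=0.02)
```

A flat series never confirms: with no retracement, swing is zero and the current price is not below the window max.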

Attributes:

Name Type Description
price_col str

Price column name. Default: "close".

lookback int

Backward window for extrema search. Default: 60.

confirmation_bars int

Bars of reversal needed for confirmation. Default: 10.

min_swing_pct float

Minimum swing percentage. Default: 0.02.

Example
from signalflow.detector.structure_detector import StructureDetector

detector = StructureDetector(
    lookback=60,
    confirmation_bars=10,
    min_swing_pct=0.02,
)
signals = detector.run(raw_data_view)
Note

This detector overrides preprocess() to work directly with raw OHLCV data and does not require a FeaturePipeline.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'local_max', 'local_min'})

confirmation_bars class-attribute instance-attribute

confirmation_bars: int = 10

lookback class-attribute instance-attribute

lookback: int = 60

min_swing_pct class-attribute instance-attribute

min_swing_pct: float = 0.02

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_STRUCTURE

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect local tops/bottoms with confirmation delay.

Parameters:

Name Type Description Default
features DataFrame

OHLCV data with pair and timestamp columns.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
Signals

Signals with local_max/local_min signal types.

Source code in src/signalflow/detector/structure_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect local tops/bottoms with confirmation delay.

    Args:
        features: OHLCV data with pair and timestamp columns.
        context: Additional context (unused).

    Returns:
        Signals with local_max/local_min signal types.
    """
    results = []

    for _pair_name, group in features.group_by(self.pair_col, maintain_order=True):
        prices = group[self.price_col].to_numpy().astype(np.float64)
        n = len(prices)

        signal_types: list[str | None] = [None] * n
        probabilities: list[float | None] = [None] * n

        # Track last emitted extremum to avoid duplicates
        last_emitted_type = None
        last_emitted_idx = -self.lookback

        for t in range(self.lookback + self.confirmation_bars, n):
            p_current = prices[t]
            if np.isnan(p_current):
                continue

            # Search window: [t - lookback, t - confirmation_bars]
            search_start = t - self.lookback
            search_end = t - self.confirmation_bars + 1

            if search_end <= search_start:
                continue

            search_window = prices[search_start:search_end]
            valid_mask = ~np.isnan(search_window)
            if not np.any(valid_mask):
                continue

            valid_prices = search_window[valid_mask]

            max_val = np.max(valid_prices)
            min_val = np.min(valid_prices)

            # Check local_max: max in search window, price dropped since
            if max_val > 0 and p_current < max_val:
                swing = (max_val - p_current) / max_val
                if swing >= self.min_swing_pct and (
                    last_emitted_type != "local_max" or (t - last_emitted_idx) > self.lookback
                ):
                    signal_types[t] = "local_max"
                    probabilities[t] = min(1.0, swing / (self.min_swing_pct * 3))
                    last_emitted_type = "local_max"
                    last_emitted_idx = t
                    continue

            # Check local_min: min in search window, price risen since
            if min_val > 0 and p_current > min_val:
                swing = (p_current - min_val) / min_val
                if swing >= self.min_swing_pct and (
                    last_emitted_type != "local_min" or (t - last_emitted_idx) > self.lookback
                ):
                    signal_types[t] = "local_min"
                    probabilities[t] = min(1.0, swing / (self.min_swing_pct * 3))
                    last_emitted_type = "local_min"
                    last_emitted_idx = t

        group = group.with_columns(
            [
                pl.Series(name="signal_type", values=signal_types, dtype=pl.Utf8),
                pl.Series(name="probability", values=probabilities, dtype=pl.Float64),
            ]
        )
        results.append(group)

    if not results:
        return Signals(pl.DataFrame())

    combined = pl.concat(results, how="vertical_relaxed")

    signals_df = combined.filter(pl.col("signal_type").is_not_null()).select(
        [
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
            "probability",
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract raw OHLCV data without feature pipeline.

Parameters:

Name Type Description Default
raw_data_view RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
DataFrame

Raw OHLCV data sorted by (pair, timestamp).

Source code in src/signalflow/detector/structure_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Extract raw OHLCV data without feature pipeline.

    Args:
        raw_data_view: View to raw market data.
        context: Additional context (unused).

    Returns:
        Raw OHLCV data sorted by (pair, timestamp).
    """
    key = self.raw_data_type.value if hasattr(self.raw_data_type, "value") else str(self.raw_data_type)
    return raw_data_view.to_polars(key).sort([self.pair_col, self.ts_col])

Market-Wide Detection

Exogenous market-wide events (regulatory news, rate decisions, black swans) cause correlated price moves that no per-pair feature could predict. Market-wide detectors identify these timestamps so that labels near them can be masked (set to null), preventing them from polluting mutual-information (MI) estimates.

All detectors extend SignalDetector with signal_category=MARKET_WIDE.

SignalDetector (signal_category=MARKET_WIDE)
    ├── AgreementDetector         @sf.detector("market/agreement")
    ├── MarketZScoreDetector      @sf.detector("market/zscore")
    └── MarketCusumDetector       @sf.detector("market/cusum")

Usage

from signalflow.detector.market import MarketZScoreDetector, MarketCusumDetector
from signalflow.target.utils import mask_targets_by_signals

# Z-Score: detects sudden shocks (z-score of aggregate cross-pair return)
zscore_det = MarketZScoreDetector(z_threshold=6.0, rolling_window=500)
signals = zscore_det.run(raw_data_view)
# Default signal_type_name: "aggregate_outlier"

# CUSUM: detects sustained regime shifts (cumulative sum of deviations)
cusum_det = MarketCusumDetector(drift=0.005, cusum_threshold=0.05)
signals = cusum_det.run(raw_data_view)
# Default signal_type_name: "structural_break"

# Mask labels near detected signals
df_masked = mask_targets_by_signals(
    df=df,
    signals=signals,
    mask_signal_types={"aggregate_outlier", "structural_break"},
    horizon_bars=60,
)

Agreement-Based Detector

signalflow.detector.market.agreement_detector.AgreementDetector dataclass

AgreementDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'synchronization'})(), agreement_threshold: float = 0.8, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'synchronization')

Bases: SignalDetector

Detects timestamps where cross-pair return agreement is abnormally high.

A market-wide detector that signals when a high fraction of trading pairs move in the same direction simultaneously. This indicates exogenous macro events (interest rate decisions, regulatory news, etc.) that cannot be predicted from individual pair features.

Algorithm
  1. Compute log-return for each pair at each timestamp.
  2. At each timestamp, compute the fraction of pairs with same-sign return (majority sign).
  3. If fraction >= agreement_threshold, emit signal.
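The agreement fraction in step 2 reduces to a majority-sign count across pairs; a NumPy sketch (function name illustrative):

```python
import numpy as np

def agreement_fraction(returns):
    """Fraction of pairs whose return carries the majority sign (non-finite values dropped)."""
    valid = returns[np.isfinite(returns)]
    n_pos = int((valid > 0).sum())
    n_neg = int((valid < 0).sum())
    return max(n_pos, n_neg) / len(valid)

# 5 of 6 pairs move up at this timestamp -> agreement 5/6, above a 0.8 threshold.
rets = np.array([0.010, 0.020, 0.005, 0.030, 0.001, -0.002])
frac = agreement_fraction(rets)
```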

Attributes:

Name Type Description
agreement_threshold float

Fraction of pairs that must agree for detection.

min_pairs int

Minimum number of active pairs at a timestamp.

return_window int

Bars for return computation.

signal_type_name str

Signal type name for detected signals.

Example
from signalflow.detector import AgreementDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide agreement
detector = AgreementDetector(agreement_threshold=0.8)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"synchronization"},
    horizon_bars=60,
)
Note

Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.

agreement_threshold class-attribute instance-attribute

agreement_threshold: float = 0.8

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'synchronization'})

min_pairs class-attribute instance-attribute

min_pairs: int = 5

price_col class-attribute instance-attribute

price_col: str = 'close'

return_window class-attribute instance-attribute

return_window: int = 1

signal_category class-attribute instance-attribute

signal_category: SignalCategory = MARKET_WIDE

signal_type_name class-attribute instance-attribute

signal_type_name: str = 'synchronization'

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/market/agreement_detector.py
def __post_init__(self) -> None:
    self.allowed_signal_types = {self.signal_type_name}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect market-wide agreement timestamps.

Parameters:

Name Type Description Default
features DataFrame

Multi-pair OHLCV DataFrame with _ret column.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
Signals

Signals with synchronization signal type for detected timestamps.

Source code in src/signalflow/detector/market/agreement_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect market-wide agreement timestamps.

    Args:
        features: Multi-pair OHLCV DataFrame with _ret column.
        context: Additional context (unused).

    Returns:
        Signals with synchronization signal type for detected timestamps.
    """
    agreement = (
        features.filter(pl.col("_ret").is_not_null() & pl.col("_ret").is_finite())
        .group_by(self.ts_col)
        .agg(
            pl.col("_ret").count().alias("_n_pairs"),
            (pl.col("_ret") > 0).sum().alias("_n_positive"),
            (pl.col("_ret") < 0).sum().alias("_n_negative"),
        )
        .filter(pl.col("_n_pairs") >= self.min_pairs)
        .with_columns(
            (
                pl.max_horizontal("_n_positive", "_n_negative").cast(pl.Float64)
                / pl.col("_n_pairs").cast(pl.Float64)
            ).alias("_agreement")
        )
        .filter(pl.col("_agreement") >= self.agreement_threshold)
    )

    n_signals = agreement.height
    logger.info(f"AgreementDetector: detected {n_signals} timestamps")

    if n_signals == 0:
        return Signals(
            pl.DataFrame(
                schema={
                    self.pair_col: pl.Utf8,
                    self.ts_col: pl.Datetime,
                    "signal_type": pl.Utf8,
                    "signal": pl.Int64,
                    "probability": pl.Float64,
                }
            )
        )

    # Create signals with synthetic "ALL" pair for market-wide signals
    signals_df = agreement.select(
        [
            pl.lit("ALL").alias(self.pair_col),
            self.ts_col,
            pl.lit(self.signal_type_name).alias("signal_type"),
            pl.lit(1).alias("signal"),
            pl.col("_agreement").alias("probability"),
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Preprocess raw data: compute log returns.

Returns raw OHLCV with _ret column added.

Source code in src/signalflow/detector/market/agreement_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Preprocess raw data: compute log returns.

    Returns raw OHLCV with _ret column added.
    """
    df = super().preprocess(raw_data_view, context)

    df = df.sort([self.pair_col, self.ts_col]).with_columns(
        (pl.col(self.price_col) / pl.col(self.price_col).shift(self.return_window))
        .log()
        .over(self.pair_col)
        .alias("_ret")
    )

    return df

Z-Score Detector

signalflow.detector.market.zscore_detector.MarketZScoreDetector dataclass

MarketZScoreDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'aggregate_outlier'})(), z_threshold: float = 3.0, rolling_window: int = 100, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'aggregate_outlier')

Bases: SignalDetector

Detects market-wide signals via z-score of aggregate cross-pair return.

More robust than agreement-based detection on correlated markets because it adapts to the current volatility regime.

Algorithm
  1. Compute log-return per pair per timestamp.
  2. Compute cross-pair mean return at each timestamp.
  3. Compute rolling mean and std of the aggregate return over rolling_window bars.
  4. z_score = (agg_return - rolling_mean) / rolling_std
  5. Signal if |z_score| > z_threshold.
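Steps 3-5 amount to a rolling z-score with an absolute threshold; a NumPy sketch on a synthetic aggregate-return series (names and toy data illustrative):

```python
import numpy as np

def zscore_flags(agg_ret, window, z_threshold):
    """Flag bars whose aggregate return sits > z_threshold rolling sigmas from the rolling mean."""
    flags = np.zeros(len(agg_ret), dtype=bool)
    for t in range(window - 1, len(agg_ret)):
        hist = agg_ret[t - window + 1 : t + 1]
        mu, sigma = hist.mean(), hist.std(ddof=1)
        if sigma > 1e-12 and abs(agg_ret[t] - mu) / sigma > z_threshold:
            flags[t] = True
    return flags

rng = np.random.default_rng(0)
rets = rng.normal(0.0, 0.001, 200)
rets[150] = 0.05  # injected market-wide shock, ~50x the normal bar-to-bar scale
flags = zscore_flags(rets, window=100, z_threshold=3.0)
```

Because mu and sigma are recomputed over the trailing window, the threshold adapts to the prevailing volatility regime, which is the robustness advantage noted above.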

Attributes:

Name Type Description
z_threshold float

Absolute z-score threshold for detection.

rolling_window int

Window size for rolling statistics.

min_pairs int

Minimum number of active pairs at a timestamp.

return_window int

Bars for return computation.

signal_type_name str

Signal type name for detected signals.

Example
from signalflow.detector import MarketZScoreDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide z-score outliers
detector = MarketZScoreDetector(z_threshold=3.0)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"aggregate_outlier"},
    horizon_bars=60,
)
Note

Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'aggregate_outlier'})

min_pairs class-attribute instance-attribute

min_pairs: int = 5

price_col class-attribute instance-attribute

price_col: str = 'close'

return_window class-attribute instance-attribute

return_window: int = 1

rolling_window class-attribute instance-attribute

rolling_window: int = 100

signal_category class-attribute instance-attribute

signal_category: SignalCategory = MARKET_WIDE

signal_type_name class-attribute instance-attribute

signal_type_name: str = 'aggregate_outlier'

z_threshold class-attribute instance-attribute

z_threshold: float = 3.0

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/market/zscore_detector.py
def __post_init__(self) -> None:
    self.allowed_signal_types = {self.signal_type_name}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect market-wide signals via z-score.

Parameters:

Name Type Description Default
features DataFrame

Multi-pair OHLCV DataFrame with _ret column.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
Signals

Signals with aggregate_outlier signal type for detected timestamps.

Source code in src/signalflow/detector/market/zscore_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect market-wide signals via z-score.

    Args:
        features: Multi-pair OHLCV DataFrame with _ret column.
        context: Additional context (unused).

    Returns:
        Signals with aggregate_outlier signal type for detected timestamps.
    """
    min_samples = self.rolling_window

    # Cross-pair mean return per timestamp
    agg_return = (
        features.filter(pl.col("_ret").is_not_null() & pl.col("_ret").is_finite())
        .group_by(self.ts_col)
        .agg(
            pl.col("_ret").mean().alias("_agg_return"),
            pl.col("_ret").count().alias("_n_pairs"),
        )
        .filter(pl.col("_n_pairs") >= self.min_pairs)
        .sort(self.ts_col)
    )

    # Rolling mean/std and z-score
    result = (
        agg_return.with_columns(
            [
                pl.col("_agg_return")
                .rolling_mean(window_size=self.rolling_window, min_samples=min_samples)
                .alias("_rolling_mean"),
                pl.col("_agg_return")
                .rolling_std(window_size=self.rolling_window, min_samples=min_samples)
                .alias("_rolling_std"),
            ]
        )
        .with_columns(
            pl.when(pl.col("_rolling_std") > 1e-12)
            .then((pl.col("_agg_return") - pl.col("_rolling_mean")) / pl.col("_rolling_std"))
            .otherwise(pl.lit(0.0))
            .alias("_z_score")
        )
        .filter(pl.col("_z_score").abs() > self.z_threshold)
    )

    n_signals = result.height
    logger.info(f"MarketZScoreDetector: detected {n_signals} timestamps")

    if n_signals == 0:
        return Signals(
            pl.DataFrame(
                schema={
                    self.pair_col: pl.Utf8,
                    self.ts_col: pl.Datetime,
                    "signal_type": pl.Utf8,
                    "signal": pl.Int64,
                    "probability": pl.Float64,
                }
            )
        )

    # Create signals with synthetic "ALL" pair for market-wide signals
    # Probability based on normalized z-score
    signals_df = result.select(
        [
            pl.lit("ALL").alias(self.pair_col),
            self.ts_col,
            pl.lit(self.signal_type_name).alias("signal_type"),
            pl.lit(1).alias("signal"),
            (pl.col("_z_score").abs() / self.z_threshold).clip(0.0, 1.0).alias("probability"),
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Preprocess raw data: compute log returns.

Returns raw OHLCV with _ret column added.

Source code in src/signalflow/detector/market/zscore_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Preprocess raw data: compute log returns.

    Returns raw OHLCV with _ret column added.
    """
    df = super().preprocess(raw_data_view, context)

    df = df.sort([self.pair_col, self.ts_col]).with_columns(
        (pl.col(self.price_col) / pl.col(self.price_col).shift(self.return_window))
        .log()
        .over(self.pair_col)
        .alias("_ret")
    )

    return df

CUSUM Detector

signalflow.detector.market.cusum_detector.MarketCusumDetector dataclass

MarketCusumDetector(signal_category: SignalCategory = SignalCategory.MARKET_WIDE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'structural_break'})(), drift: float = 0.005, cusum_threshold: float = 0.05, rolling_window: int = 100, min_pairs: int = 5, return_window: int = 1, price_col: str = 'close', signal_type_name: str = 'structural_break')

Bases: SignalDetector

Detects market-wide signals via CUSUM of cross-pair aggregate return.

Unlike point-in-time z-score detection, CUSUM accumulates evidence over multiple bars, making it better at detecting gradual structural changes.

Algorithm
  1. Compute cross-pair mean return at each timestamp.
  2. Compute rolling mean mu (expected return) over rolling_window.
  3. S_pos = max(0, S_pos + (x - mu - drift))
  4. S_neg = max(0, S_neg + (-x + mu - drift))
  5. Signal if S_pos > cusum_threshold or S_neg > cusum_threshold.
  6. Reset S_pos, S_neg to 0 after signal detection.
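The recurrence in steps 3-6 is the classic two-sided CUSUM with reset; a minimal NumPy sketch with a constant expected return mu (illustrative; the detector itself estimates mu with a rolling mean):

```python
import numpy as np

def cusum_flags(x, mu, drift, threshold):
    """Two-sided CUSUM: accumulate deviations beyond a drift allowance, alarm and reset."""
    flags = np.zeros(len(x), dtype=bool)
    s_pos = s_neg = 0.0
    for i, xi in enumerate(x):
        dev = xi - mu
        s_pos = max(0.0, s_pos + dev - drift)  # evidence of an upward shift
        s_neg = max(0.0, s_neg - dev - drift)  # evidence of a downward shift
        if s_pos > threshold or s_neg > threshold:
            flags[i] = True
            s_pos = s_neg = 0.0  # restart accumulation after the alarm
    return flags

# Each bar adds 0.02 - 0.005 = 0.015 of evidence; the 0.05 threshold is first
# crossed at bar 3 (s_pos reaches 0.06), after which the statistics reset.
x = np.array([0.02] * 6)
flags = cusum_flags(x, mu=0.0, drift=0.005, threshold=0.05)
```

Each individual bar here is well under the threshold, illustrating why CUSUM catches gradual shifts that a point-in-time z-score would miss.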

Attributes:

Name Type Description
drift float

Slack parameter (allowance for normal variation).

cusum_threshold float

Decision interval for CUSUM alarm.

rolling_window int

Window for estimating expected return (mu).

min_pairs int

Minimum number of active pairs at a timestamp.

return_window int

Bars for return computation.

signal_type_name str

Signal type name for detected signals.

Example
from signalflow.detector import MarketCusumDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect market-wide regime shifts
detector = MarketCusumDetector(cusum_threshold=0.05)
signals = detector.run(raw_data_view)

# Mask labels overlapping with detected signals
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"structural_break"},
    horizon_bars=60,
)
Note

Returns one signal per timestamp (not per pair) since market-wide signals affect all pairs simultaneously. The signal has a synthetic "ALL" pair.

Reference

Page, E. S. (1954). "Continuous Inspection Schemes." Biometrika 41(1/2): 100-115.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'structural_break'})

cusum_threshold class-attribute instance-attribute

cusum_threshold: float = 0.05

drift class-attribute instance-attribute

drift: float = 0.005

min_pairs class-attribute instance-attribute

min_pairs: int = 5

price_col class-attribute instance-attribute

price_col: str = 'close'

return_window class-attribute instance-attribute

return_window: int = 1

rolling_window class-attribute instance-attribute

rolling_window: int = 100

signal_category class-attribute instance-attribute

signal_category: SignalCategory = MARKET_WIDE

signal_type_name class-attribute instance-attribute

signal_type_name: str = 'structural_break'

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/market/cusum_detector.py
def __post_init__(self) -> None:
    self.allowed_signal_types = {self.signal_type_name}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect market-wide signals via CUSUM.

Parameters:

Name Type Description Default
features DataFrame

Multi-pair OHLCV DataFrame with _ret column.

required
context dict[str, Any] | None

Additional context (unused).

None

Returns:

Type Description
Signals

Signals with structural_break signal type for detected timestamps.

Source code in src/signalflow/detector/market/cusum_detector.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect market-wide signals via CUSUM.

    Args:
        features: Multi-pair OHLCV DataFrame with _ret column.
        context: Additional context (unused).

    Returns:
        Signals with structural_break signal type for detected timestamps.
    """
    min_samples = self.rolling_window

    # Cross-pair mean return per timestamp
    agg_return = (
        features.filter(pl.col("_ret").is_not_null() & pl.col("_ret").is_finite())
        .group_by(self.ts_col)
        .agg(
            pl.col("_ret").mean().alias("_agg_return"),
            pl.col("_ret").count().alias("_n_pairs"),
        )
        .filter(pl.col("_n_pairs") >= self.min_pairs)
        .sort(self.ts_col)
    )

    # Compute rolling mean (expected return mu)
    agg_return = agg_return.with_columns(
        pl.col("_agg_return").rolling_mean(window_size=self.rolling_window, min_samples=min_samples).alias("_mu")
    )

    # CUSUM with reset (sequential — inherently stateful)
    x_arr = agg_return.get_column("_agg_return").to_numpy()
    mu_arr = agg_return.get_column("_mu").to_numpy()

    n = len(x_arr)
    is_signal = np.zeros(n, dtype=bool)
    cusum_values = np.zeros(n, dtype=np.float64)
    s_pos = 0.0
    s_neg = 0.0

    for i in range(n):
        if np.isnan(mu_arr[i]) or np.isnan(x_arr[i]):
            continue

        deviation = x_arr[i] - mu_arr[i]
        s_pos = max(0.0, s_pos + deviation - self.drift)
        s_neg = max(0.0, s_neg - deviation - self.drift)

        cusum_values[i] = max(s_pos, s_neg)

        if s_pos > self.cusum_threshold or s_neg > self.cusum_threshold:
            is_signal[i] = True
            s_pos = 0.0
            s_neg = 0.0

    n_signals = int(is_signal.sum())
    logger.info(f"MarketCusumDetector: detected {n_signals} timestamps")

    if n_signals == 0:
        return Signals(
            pl.DataFrame(
                schema={
                    self.pair_col: pl.Utf8,
                    self.ts_col: pl.Datetime,
                    "signal_type": pl.Utf8,
                    "signal": pl.Int64,
                    "probability": pl.Float64,
                }
            )
        )

    # Filter to signal timestamps only
    signal_df = agg_return.with_columns(
        [
            pl.Series("_is_signal", is_signal),
            pl.Series("_cusum", cusum_values),
        ]
    ).filter(pl.col("_is_signal"))

    # Create signals with synthetic "ALL" pair for market-wide signals
    # Probability based on normalized cusum value
    signals_df = signal_df.select(
        [
            pl.lit("ALL").alias(self.pair_col),
            self.ts_col,
            pl.lit(self.signal_type_name).alias("signal_type"),
            pl.lit(1).alias("signal"),
            (pl.col("_cusum") / self.cusum_threshold).clip(0.0, 1.0).alias("probability"),
        ]
    )

    return Signals(signals_df)
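The sequential loop in detect() reduces to a two-sided CUSUM recursion with reset. A standalone sketch with invented numbers (the helper name is ours, not part of the module):

```python
import numpy as np

def cusum_with_reset(x, mu, drift, threshold):
    """Two-sided CUSUM with reset, mirroring the loop in detect()."""
    s_pos = s_neg = 0.0
    fired = []
    for xi, mi in zip(x, mu):
        if np.isnan(xi) or np.isnan(mi):
            fired.append(False)
            continue
        dev = xi - mi
        s_pos = max(0.0, s_pos + dev - drift)  # accumulate upward deviations
        s_neg = max(0.0, s_neg - dev - drift)  # accumulate downward deviations
        hit = s_pos > threshold or s_neg > threshold
        fired.append(hit)
        if hit:  # reset both statistics after each detection
            s_pos = s_neg = 0.0
    return fired

# Three identical positive shocks against a flat mean: the statistic
# crosses the threshold on the second bar, then resets.
print(cusum_with_reset([0.3, 0.3, 0.3], [0.0, 0.0, 0.0], drift=0.0, threshold=0.5))
# → [False, True, False]
```

Because of the reset, a sustained deviation produces periodic signals rather than one signal per bar once the threshold is crossed.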

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Preprocess raw data: compute log returns.

Returns raw OHLCV with _ret column added.

Source code in src/signalflow/detector/market/cusum_detector.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Preprocess raw data: compute log returns.

    Returns raw OHLCV with _ret column added.
    """
    df = super().preprocess(raw_data_view, context)

    df = df.sort([self.pair_col, self.ts_col]).with_columns(
        (pl.col(self.price_col) / pl.col(self.price_col).shift(self.return_window))
        .log()
        .over(self.pair_col)
        .alias("_ret")
    )

    return df

Funding Rate Detector

signalflow.detector.funding_rate.FundingRateDetector dataclass

FundingRateDetector(signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.PERPETUAL, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, min_positive_hours: int = 24, funding_col: str = 'funding_rate', allowed_signal_types: set[str] | None = (lambda: {'rise'})())

Bases: SignalDetector

Detects long entries when funding rate transitions from positive to negative.

Strategy logic
  1. Extract non-null funding rate observations per pair
  2. Track the gap between consecutive non-positive readings
  3. When funding turns negative AND the previous non-positive reading was >= min_positive_hours ago (meaning all interim readings were positive), generate a RISE signal

This pattern suggests overleveraged longs are exiting, potentially creating upward price pressure as shorts cover.

Attributes:

- min_positive_hours (int): Minimum hours of sustained positive funding before a negative transition triggers a signal. Default: 24.
- funding_col (str): Column name for funding rate data.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'rise'})

funding_col class-attribute instance-attribute

funding_col: str = 'funding_rate'

min_positive_hours class-attribute instance-attribute

min_positive_hours: int = 24

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType | str = PERPETUAL

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_DIRECTION

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect funding rate transition signals.

Algorithm
  1. Filter to rows where funding_rate is not null
  2. For each non-positive reading, record its timestamp
  3. Shift by 1 and forward-fill to get the previous non-positive timestamp at each row
  4. At each negative reading, compute hours since that previous non-positive reading
  5. If hours >= min_positive_hours, all interim readings were positive for long enough → signal
Source code in src/signalflow/detector/funding_rate.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect funding rate transition signals.

    Algorithm:
        1. Filter to rows where ``funding_rate`` is not null
        2. For each non-positive reading, record its timestamp
        3. Shift by 1 and forward-fill to get the *previous*
           non-positive timestamp at each row
        4. At each negative reading, compute hours since that
           previous non-positive reading
        5. If hours >= ``min_positive_hours``, all interim readings
           were positive for long enough → signal
    """
    pair_col = self.pair_col
    ts_col = self.ts_col
    fr_col = self.funding_col

    # 1. Extract rows with actual funding rate observations
    funding = (
        features.filter(pl.col(fr_col).is_not_null()).select([pair_col, ts_col, fr_col]).sort([pair_col, ts_col])
    )

    if funding.is_empty():
        return Signals(
            pl.DataFrame(
                schema={
                    pair_col: pl.Utf8,
                    ts_col: pl.Datetime("us"),
                    "signal_type": pl.Utf8,
                    "signal": pl.Int32,
                }
            )
        )

    # 2. For non-positive readings, record their timestamp; null otherwise
    funding = funding.with_columns(
        pl.when(pl.col(fr_col) <= 0)
        .then(pl.col(ts_col))
        .otherwise(pl.lit(None, dtype=pl.Datetime("us")))
        .alias("_non_pos_ts"),
    )

    # 3. Shift by 1 to exclude the current row, then forward-fill
    #    within each pair to propagate the last *previous* non-positive ts
    funding = funding.with_columns(
        pl.col("_non_pos_ts").shift(1).forward_fill().over(pair_col).alias("_last_prev_non_pos_ts"),
    )

    # 4. Compute hours since previous non-positive reading
    funding = funding.with_columns(
        ((pl.col(ts_col) - pl.col("_last_prev_non_pos_ts")).dt.total_hours()).alias("_hours_gap"),
    )

    # 5. Signal: current is negative AND gap >= min_positive_hours
    signal_mask = (pl.col(fr_col) < 0) & (pl.col("_hours_gap") >= self.min_positive_hours)

    signals_df = funding.filter(signal_mask).select(
        [
            pair_col,
            ts_col,
            pl.lit("rise").alias("signal_type"),
            pl.lit(1).alias("signal"),
        ]
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract perpetual data with funding rates.

Source code in src/signalflow/detector/funding_rate.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Extract perpetual data with funding rates."""
    key = self.raw_data_type.value if hasattr(self.raw_data_type, "value") else str(self.raw_data_type)
    df = raw_data_view.to_polars(key)
    return df.sort([self.pair_col, self.ts_col])

Generic Detectors

Z-Score Anomaly Detector

signalflow.detector.zscore_anomaly.ZScoreAnomalyDetector dataclass

ZScoreAnomalyDetector(signal_category: SignalCategory = SignalCategory.ANOMALY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'positive_anomaly', 'negative_anomaly'})(), target_feature: str = 'close', rolling_window: int = 1440, threshold: float = 4.0, signal_high: str = 'positive_anomaly', signal_low: str = 'negative_anomaly')

Bases: SignalDetector

Z-score based anomaly detector on any feature.

Detects anomalies when a feature value deviates significantly from its rolling mean, measured in standard deviations (z-score).

Algorithm
  1. Compute rolling mean of target_feature over rolling_window
  2. Compute rolling std of target_feature over rolling_window
  3. Calculate z-score: (value - rolling_mean) / rolling_std
  4. Signal if |z-score| > threshold

Attributes:

- target_feature (str): Column name to analyze for anomalies.
- rolling_window (int): Window size for rolling mean/std calculation.
- threshold (float): Z-score threshold for anomaly detection.
- signal_high (str): Signal type when z-score > threshold.
- signal_low (str): Signal type when z-score < -threshold.

Example
from signalflow.detector import ZScoreAnomalyDetector
from signalflow.feature import RsiExtractor

# Detect anomalies on RSI
detector = ZScoreAnomalyDetector(
    features=[RsiExtractor(period=14)],
    target_feature="RSI_14",
    threshold=3.0,
)
signals = detector.run(raw_data_view)

# Detect anomalies on raw close prices
detector = ZScoreAnomalyDetector(
    target_feature="close",  # Will compute on raw close prices
    threshold=4.0,
    signal_high="extreme_positive_anomaly",
    signal_low="extreme_negative_anomaly",
)
Note

This detector overrides preprocess() to add helper columns (_target_rol_mean, _target_rol_std) for z-score calculation.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'positive_anomaly', 'negative_anomaly'})

rolling_window class-attribute instance-attribute

rolling_window: int = 1440

signal_category class-attribute instance-attribute

signal_category: SignalCategory = ANOMALY

signal_high class-attribute instance-attribute

signal_high: str = 'positive_anomaly'

signal_low class-attribute instance-attribute

signal_low: str = 'negative_anomaly'

target_feature class-attribute instance-attribute

target_feature: str = 'close'

threshold class-attribute instance-attribute

threshold: float = 4.0

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/zscore_anomaly.py
def __post_init__(self) -> None:
    # Update allowed_signal_types based on configured signal names
    self.allowed_signal_types = {self.signal_high, self.signal_low}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect anomalies using z-score method.

Parameters:

- features (DataFrame, required): Preprocessed DataFrame with _target_rol_mean, _target_rol_std.
- context (dict[str, Any] | None, default None): Additional context (unused).

Returns:

- Signals: Signals with positive_anomaly/negative_anomaly signal types.

Source code in src/signalflow/detector/zscore_anomaly.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect anomalies using z-score method.

    Args:
        features: Preprocessed DataFrame with _target_rol_mean, _target_rol_std.
        context: Additional context (unused).

    Returns:
        Signals with positive_anomaly/negative_anomaly signal types.
    """
    target = pl.col(self.target_feature)
    mean = pl.col("_target_rol_mean")
    std = pl.col("_target_rol_std")

    # Z-score calculation
    z_score = (target - mean) / std

    # Classify anomalies
    is_high = (std > 0) & (z_score > self.threshold)
    is_low = (std > 0) & (z_score < -self.threshold)

    signal_type_expr = (
        pl.when(is_high)
        .then(pl.lit(self.signal_high))
        .when(is_low)
        .then(pl.lit(self.signal_low))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias("signal_type")
    )

    # Probability: how far beyond threshold (clipped to [0, 1])
    probability_expr = (
        pl.when(is_high | is_low)
        .then((z_score.abs() / self.threshold).clip(0.0, 1.0))
        .otherwise(pl.lit(None, dtype=pl.Float64))
        .alias("probability")
    )

    signals_df = (
        features.with_columns([signal_type_expr, probability_expr])
        .filter(pl.col("signal_type").is_not_null())
        .select(
            [
                self.pair_col,
                self.ts_col,
                "signal_type",
                pl.lit(1).alias("signal"),
                "probability",
            ]
        )
    )

    return Signals(signals_df)

preprocess

preprocess(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Run features + compute rolling mean/std for target_feature.

Parameters:

- raw_data_view (RawDataView, required): View to raw market data.
- context (dict[str, Any] | None, default None): Additional context (unused).

Returns:

- DataFrame: DataFrame with original columns plus _target_rol_mean, _target_rol_std.

Source code in src/signalflow/detector/zscore_anomaly.py
def preprocess(
    self,
    raw_data_view: RawDataView,
    context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Run features + compute rolling mean/std for target_feature.

    Args:
        raw_data_view: View to raw market data.
        context: Additional context (unused).

    Returns:
        DataFrame with original columns plus _target_rol_mean, _target_rol_std.
    """
    # 1. Base preprocessing (OHLCV + features)
    df = super().preprocess(raw_data_view, context)

    # 2. Compute helper columns for z-score
    min_samples = max(2, self.rolling_window // 4)
    df = df.with_columns(
        [
            pl.col(self.target_feature)
            .rolling_mean(window_size=self.rolling_window, min_samples=min_samples)
            .over(self.pair_col)
            .alias("_target_rol_mean"),
            pl.col(self.target_feature)
            .rolling_std(window_size=self.rolling_window, min_samples=min_samples)
            .over(self.pair_col)
            .alias("_target_rol_std"),
        ]
    )

    return df

Percentile Regime Detector

signalflow.detector.percentile_regime.PercentileRegimeDetector dataclass

PercentileRegimeDetector(signal_category: SignalCategory = SignalCategory.VOLATILITY, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'high_volatility', 'low_volatility'})(), target_feature: str = '_realized_vol', lookback_window: int = 1440, upper_quantile: float = 0.67, lower_quantile: float = 0.33, signal_high: str = 'high_volatility', signal_low: str = 'low_volatility')

Bases: SignalDetector

Percentile-based regime detector on any feature.

Classifies the current regime by computing the rolling percentile of a target feature within a lookback window.

Algorithm
  1. For each bar, compute percentile of target_feature within lookback_window
  2. If percentile > upper_quantile -> signal_high
  3. If percentile < lower_quantile -> signal_low
  4. Otherwise -> no signal
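The rolling-percentile rule can be sketched standalone (the helper is ours for illustration; quantile defaults mirror the attributes below):

```python
import numpy as np

def rolling_percentile_regime(values, lookback, upper=0.67, lower=0.33):
    """Label each bar by the fraction of lookback values <= the current value."""
    labels = []
    for t, v in enumerate(values):
        window = np.asarray(values[max(0, t - lookback + 1): t + 1])
        if len(window) < 2:  # not enough history yet
            labels.append(None)
            continue
        pct = float(np.mean(window <= v))
        if pct > upper:
            labels.append("high")
        elif pct < lower:
            labels.append("low")
        else:
            labels.append(None)
    return labels

# Monotonically rising values always sit at the top of their window.
print(rolling_percentile_regime([1, 2, 3, 4, 5], lookback=5))
# → [None, 'high', 'high', 'high', 'high']
```

Note the percentile is the empirical CDF of the window evaluated at the current value, which is also how the source below computes it (np.mean(valid <= arr[t])).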

Attributes:

- target_feature (str): Column name to analyze for regime.
- lookback_window (int): Window size for percentile calculation.
- upper_quantile (float): Upper threshold (signal if percentile > this).
- lower_quantile (float): Lower threshold (signal if percentile < this).
- signal_high (str): Signal type when percentile > upper_quantile.
- signal_low (str): Signal type when percentile < lower_quantile.

Example
from signalflow.detector import PercentileRegimeDetector
from signalflow.feature import FeaturePipeline, RealizedVolExtractor

# Volatility regime detection
detector = PercentileRegimeDetector(
    features=FeaturePipeline([RealizedVolExtractor()]),
    target_feature="_realized_vol",
    upper_quantile=0.67,
    lower_quantile=0.33,
    signal_high="high_volatility",
    signal_low="low_volatility",
)
signals = detector.run(raw_data_view)
Note

Uses numpy for percentile calculation per group since Polars doesn't have native rolling percentile.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'high_volatility', 'low_volatility'})

lookback_window class-attribute instance-attribute

lookback_window: int = 1440

lower_quantile class-attribute instance-attribute

lower_quantile: float = 0.33

signal_category class-attribute instance-attribute

signal_category: SignalCategory = VOLATILITY

signal_high class-attribute instance-attribute

signal_high: str = 'high_volatility'

signal_low class-attribute instance-attribute

signal_low: str = 'low_volatility'

target_feature class-attribute instance-attribute

target_feature: str = '_realized_vol'

upper_quantile class-attribute instance-attribute

upper_quantile: float = 0.67

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/percentile_regime.py
def __post_init__(self) -> None:
    if not 0 < self.lower_quantile < self.upper_quantile < 1:
        raise ValueError(
            f"Quantiles must satisfy 0 < lower < upper < 1, "
            f"got lower={self.lower_quantile}, upper={self.upper_quantile}"
        )
    # Update allowed_signal_types based on configured signal names
    self.allowed_signal_types = {self.signal_high, self.signal_low}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect regime using rolling percentile method.

Parameters:

- features (DataFrame, required): Preprocessed DataFrame with target_feature column.
- context (dict[str, Any] | None, default None): Additional context (unused).

Returns:

- Signals: Signals with high_volatility/low_volatility signal types.

Source code in src/signalflow/detector/percentile_regime.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect regime using rolling percentile method.

    Args:
        features: Preprocessed DataFrame with target_feature column.
        context: Additional context (unused).

    Returns:
        Signals with high_volatility/low_volatility signal types.
    """
    results = []

    for _pair_name, group in features.group_by(self.pair_col, maintain_order=True):
        arr = group[self.target_feature].to_numpy().astype(np.float64)
        n = len(arr)

        signal_types: list[str | None] = [None] * n
        probabilities: list[float | None] = [None] * n

        for t in range(n):
            if np.isnan(arr[t]):
                continue

            # Get lookback window
            lb_start = max(0, t - self.lookback_window + 1)
            window = arr[lb_start : t + 1]
            valid = window[~np.isnan(window)]

            if len(valid) < 2:
                continue

            # Compute percentile (fraction of values <= current)
            percentile = float(np.mean(valid <= arr[t]))

            if percentile > self.upper_quantile:
                signal_types[t] = self.signal_high
                probabilities[t] = percentile
            elif percentile < self.lower_quantile:
                signal_types[t] = self.signal_low
                probabilities[t] = 1.0 - percentile

        group = group.with_columns(
            [
                pl.Series(name="signal_type", values=signal_types, dtype=pl.Utf8),
                pl.Series(name="probability", values=probabilities, dtype=pl.Float64),
            ]
        )
        results.append(group)

    if not results:
        return Signals(pl.DataFrame())

    combined = pl.concat(results, how="vertical_relaxed")

    signals_df = combined.filter(pl.col("signal_type").is_not_null()).select(
        [
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
            "probability",
        ]
    )

    return Signals(signals_df)

Local Extrema Detector

signalflow.detector.local_extrema.LocalExtremaDetector dataclass

LocalExtremaDetector(signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', raw_data_type: RawDataType | str = RawDataType.SPOT, features: Feature | list[Feature] | FeaturePipeline | None = None, require_probability: bool = False, keep_only_latest_per_pair: bool = False, allowed_signal_types: set[str] | None = (lambda: {'local_max', 'local_min'})(), price_col: str = 'close', lookback: int = 60, confirmation_bars: int = 10, min_swing_pct: float = 0.02, signal_top: str = 'local_max', signal_bottom: str = 'local_min')

Bases: SignalDetector

Local price extrema detector (tops/bottoms) with confirmation.

Detects local price structure using a backward-looking zigzag with a confirmation delay: a local extremum is only confirmed after confirmation_bars bars have passed showing the reversal.

Algorithm
  1. For each bar t, look back lookback bars
  2. Find the max and min in the lookback window
  3. A local_max is confirmed when:
     - the max occurred at bar (t - confirmation_bars) or earlier
     - price has dropped >= min_swing_pct from the max
     - current price < max price
  4. A local_min is confirmed when:
     - the min occurred at bar (t - confirmation_bars) or earlier
     - price has risen >= min_swing_pct from the min
     - current price > min price
  5. Only emit the signal once at the confirmation bar
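The local_max confirmation rules can be distilled into a single standalone check (helper name and toy prices are ours, for illustration):

```python
import numpy as np

def confirmed_top(prices, t, lookback, confirmation_bars, min_swing_pct):
    """True when bar t confirms a local top under the rules above."""
    prices = np.asarray(prices, dtype=float)
    # The candidate extremum must lie at least confirmation_bars behind bar t.
    search = prices[t - lookback: t - confirmation_bars + 1]
    max_val = np.nanmax(search)
    if not prices[t] < max_val:
        return False
    swing = (max_val - prices[t]) / max_val  # drop from the candidate top
    return bool(swing >= min_swing_pct)

# Peak at 100, then a 4% fade over the confirmation window.
prices = [90, 92, 94, 96, 98, 100, 99, 98, 97, 96.5, 96]
print(confirmed_top(prices, t=10, lookback=10, confirmation_bars=3, min_swing_pct=0.02))
# → True
```

The local_min check is symmetric (nanmin of the search window, swing measured upward from it).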

Attributes:

- price_col (str): Price column to analyze. Default: "close".
- lookback (int): Backward window for extrema search. Default: 60.
- confirmation_bars (int): Bars of reversal needed for confirmation. Default: 10.
- min_swing_pct (float): Minimum swing percentage. Default: 0.02.
- signal_top (str): Signal type for local max. Default: "local_max".
- signal_bottom (str): Signal type for local min. Default: "local_min".

Example
from signalflow.detector import LocalExtremaDetector

detector = LocalExtremaDetector(
    lookback=60,
    confirmation_bars=10,
    min_swing_pct=0.02,
)
signals = detector.run(raw_data_view)
Note

This detector is backward-looking and safe for live trading.

allowed_signal_types class-attribute instance-attribute

allowed_signal_types: set[str] | None = field(default_factory=lambda: {'local_max', 'local_min'})

confirmation_bars class-attribute instance-attribute

confirmation_bars: int = 10

lookback class-attribute instance-attribute

lookback: int = 60

min_swing_pct class-attribute instance-attribute

min_swing_pct: float = 0.02

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_bottom class-attribute instance-attribute

signal_bottom: str = 'local_min'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_STRUCTURE

signal_top class-attribute instance-attribute

signal_top: str = 'local_max'

__post_init__

__post_init__() -> None
Source code in src/signalflow/detector/local_extrema.py
def __post_init__(self) -> None:
    if self.confirmation_bars >= self.lookback:
        raise ValueError(f"confirmation_bars ({self.confirmation_bars}) must be < lookback ({self.lookback})")
    # Update allowed_signal_types based on configured signal names
    self.allowed_signal_types = {self.signal_top, self.signal_bottom}

detect

detect(features: DataFrame, context: dict[str, Any] | None = None) -> Signals

Detect local tops/bottoms with confirmation delay.

Parameters:

- features (DataFrame, required): OHLCV data with pair and timestamp columns.
- context (dict[str, Any] | None, default None): Additional context (unused).

Returns:

- Signals: Signals with local_max/local_min signal types.

Source code in src/signalflow/detector/local_extrema.py
def detect(
    self,
    features: pl.DataFrame,
    context: dict[str, Any] | None = None,
) -> Signals:
    """Detect local tops/bottoms with confirmation delay.

    Args:
        features: OHLCV data with pair and timestamp columns.
        context: Additional context (unused).

    Returns:
        Signals with local_max/local_min signal types.
    """
    results = []

    for _pair_name, group in features.group_by(self.pair_col, maintain_order=True):
        prices = group[self.price_col].to_numpy().astype(np.float64)
        n = len(prices)

        signal_types: list[str | None] = [None] * n
        probabilities: list[float | None] = [None] * n

        # Track last emitted extremum to avoid duplicates
        last_emitted_type: str | None = None
        last_emitted_idx = -self.lookback

        for t in range(self.lookback + self.confirmation_bars, n):
            p_current = prices[t]
            if np.isnan(p_current):
                continue

            # Search window: [t - lookback, t - confirmation_bars]
            search_start = t - self.lookback
            search_end = t - self.confirmation_bars + 1

            if search_end <= search_start:
                continue

            search_window = prices[search_start:search_end]
            valid_mask = ~np.isnan(search_window)
            if not np.any(valid_mask):
                continue

            valid_prices = search_window[valid_mask]

            max_val = np.max(valid_prices)
            min_val = np.min(valid_prices)

            # Check LOCAL_TOP: max in search window, price dropped since
            if max_val > 0 and p_current < max_val:
                swing = (max_val - p_current) / max_val
                if swing >= self.min_swing_pct and (
                    last_emitted_type != self.signal_top or (t - last_emitted_idx) > self.lookback
                ):
                    signal_types[t] = self.signal_top
                    probabilities[t] = min(1.0, swing / (self.min_swing_pct * 3))
                    last_emitted_type = self.signal_top
                    last_emitted_idx = t
                    continue

            # Check LOCAL_BOTTOM: min in search window, price risen since
            if min_val > 0 and p_current > min_val:
                swing = (p_current - min_val) / min_val
                if swing >= self.min_swing_pct and (
                    last_emitted_type != self.signal_bottom or (t - last_emitted_idx) > self.lookback
                ):
                    signal_types[t] = self.signal_bottom
                    probabilities[t] = min(1.0, swing / (self.min_swing_pct * 3))
                    last_emitted_type = self.signal_bottom
                    last_emitted_idx = t

        group = group.with_columns(
            [
                pl.Series(name="signal_type", values=signal_types, dtype=pl.Utf8),
                pl.Series(name="probability", values=probabilities, dtype=pl.Float64),
            ]
        )
        results.append(group)

    if not results:
        return Signals(pl.DataFrame())

    combined = pl.concat(results, how="vertical_relaxed")

    signals_df = combined.filter(pl.col("signal_type").is_not_null()).select(
        [
            self.pair_col,
            self.ts_col,
            "signal_type",
            pl.lit(1).alias("signal"),
            "probability",
        ]
    )

    return Signals(signals_df)