Target Module

Signal labeling strategies for machine learning training. These classes generate look-ahead labels (direction, return magnitude, volume regime) at various horizons.

Module Name

The target functionality is implemented in the signalflow.target module.

Event Detection

Market-wide event detectors are in the detector.market module. Use mask_targets_by_signals() to exclude labels around detected events.

Base Class

signalflow.target.base.Labeler dataclass

Labeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'))

Bases: ABC

Base class for Polars-only signal labeling.

Assigns forward-looking labels to historical data based on future price movement. Labels are computed per-pair with length-preserving operations.

Key concepts
  • Forward-looking: Labels depend on future data (not available in live trading)
  • Per-pair processing: Each pair labeled independently
  • Length-preserving: Output has same row count as input
  • Signal masking: Optionally label only at signal timestamps
Public API
  • compute(): Main entry point (handles grouping, filtering, projection)
  • compute_group(): Per-pair labeling logic (must implement)
Common labeling strategies
  • Fixed horizon: Label based on return over N bars
  • Triple barrier: Label based on first hit of profit/loss/time barrier
  • Quantile-based: Label based on return quantiles

Attributes:
  • component_type (ClassVar[SfComponentType]): Always LABELER, for registry lookup.
  • raw_data_type (RawDataType): Type of raw data. Default: SPOT.
  • signal_category (SignalCategory): Signal category this labeler produces. Default: PRICE_DIRECTION.
  • pair_col (str): Trading pair column. Default: "pair".
  • ts_col (str): Timestamp column. Default: "timestamp".
  • keep_input_columns (bool): Keep all input columns. Default: False.
  • output_columns (list[str] | None): Specific columns to output. Default: None.
  • filter_signal_type (SignalType | None): Filter to a specific signal type. Default: None.
  • mask_to_signals (bool): Mask labels to signal timestamps only. Default: True.
  • out_col (str): Output label column name. Default: "label".
  • include_meta (bool): Include metadata columns. Default: False.
  • meta_columns (tuple[str, ...]): Metadata column names. Default: ("t_hit", "ret").

Example
from signalflow.target import Labeler
from signalflow.core import SignalType
import polars as pl

class FixedHorizonLabeler(Labeler):
    '''Label based on fixed-horizon return'''

    def __init__(self, horizon: int = 10, threshold: float = 0.01):
        super().__init__()
        self.horizon = horizon
        self.threshold = threshold

    def compute_group(self, group_df, data_context=None):
        # Compute forward return
        labels = group_df.with_columns([
            pl.col("close").shift(-self.horizon).alias("future_close")
        ]).with_columns([
            ((pl.col("future_close") / pl.col("close")) - 1).alias("return")
        ]).with_columns([
            pl.when(pl.col("return") > self.threshold)
            .then(pl.lit(SignalType.RISE.value))
            .when(pl.col("return") < -self.threshold)
            .then(pl.lit(SignalType.FALL.value))
            .otherwise(pl.lit(SignalType.NONE.value))
            .alias("label")
        ])

        return labels

# Usage
labeler = FixedHorizonLabeler(horizon=10, threshold=0.01)
labeled = labeler.compute(ohlcv_df, signals=signals)
Note

compute_group() must preserve row count (no filtering). All timestamps must be timezone-naive. Signal masking requires mask_to_signals=True and signal_keys in context.

See Also

  • FixedHorizonLabeler: Simple fixed-horizon implementation.
  • TripleBarrierLabeler: Three-barrier labeling strategy.

component_type class-attribute

component_type: SfComponentType = LABELER

filter_signal_type class-attribute instance-attribute

filter_signal_type: SignalType | None = None

include_meta class-attribute instance-attribute

include_meta: bool = False

keep_input_columns class-attribute instance-attribute

keep_input_columns: bool = False

mask_to_signals class-attribute instance-attribute

mask_to_signals: bool = True

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('t_hit', 'ret')

out_col class-attribute instance-attribute

out_col: str = 'label'

output_columns class-attribute instance-attribute

output_columns: list[str] | None = None

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType | str = SPOT

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_DIRECTION

Signal category this labeler produces. Default: PRICE_DIRECTION.

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

_apply_signal_mask

_apply_signal_mask(df: DataFrame, data_context: dict[str, Any], group_df: DataFrame) -> pl.DataFrame

Mask labels to signal timestamps only.

Labels are computed for all rows, but only signal timestamps keep their labels; non-signal rows are set to null.

Used for meta-labeling: only label at detected signal points, not every bar.

Parameters:
  • df (DataFrame): DataFrame with computed labels. [required]
  • data_context (dict[str, Any]): Must contain "signal_keys" DataFrame. [required]
  • group_df (DataFrame): Original group data for extracting pair value. [required]

Returns:
  • pl.DataFrame: DataFrame with masked labels.

Example
# In compute_group with masking
def compute_group(self, group_df, data_context=None):
    # Compute labels for all rows
    labeled = group_df.with_columns([...])

    # Mask to signal timestamps only
    if self.mask_to_signals and data_context:
        labeled = self._apply_signal_mask(
            labeled, data_context, group_df
        )

    return labeled
Note

Requires signal_keys in data_context with (pair, timestamp) columns. Non-signal rows get a null label. Metadata columns are also masked if include_meta=True.

Source code in src/signalflow/target/base.py
def _apply_signal_mask(
    self,
    df: pl.DataFrame,
    data_context: dict[str, Any],
    group_df: pl.DataFrame,
) -> pl.DataFrame:
    """Mask labels to signal timestamps only.

    Labels are computed for all rows, but only signal timestamps
    keep their labels; non-signal rows are set to null.

    Used for meta-labeling: only label at detected signal points,
    not every bar.

    Args:
        df (pl.DataFrame): DataFrame with computed labels.
        data_context (dict[str, Any]): Must contain "signal_keys" DataFrame.
        group_df (pl.DataFrame): Original group data for extracting pair value.

    Returns:
        pl.DataFrame: DataFrame with masked labels.

    Example:
        ```python
        # In compute_group with masking
        def compute_group(self, group_df, data_context=None):
            # Compute labels for all rows
            labeled = group_df.with_columns([...])

            # Mask to signal timestamps only
            if self.mask_to_signals and data_context:
                labeled = self._apply_signal_mask(
                    labeled, data_context, group_df
                )

            return labeled
        ```

    Note:
        Requires signal_keys in data_context with (pair, timestamp) columns.
        Non-signal rows get a null label.
        Metadata columns also masked if include_meta=True.
    """
    signal_keys: pl.DataFrame = data_context["signal_keys"]
    pair_value = group_df.get_column(self.pair_col)[0]

    signal_ts = signal_keys.filter(pl.col(self.pair_col) == pair_value).select(self.ts_col).unique()

    if signal_ts.height == 0:
        df = df.with_columns(pl.lit(None, dtype=pl.Utf8).alias(self.out_col))
        if self.include_meta:
            df = df.with_columns([pl.lit(None).alias(col) for col in self.meta_columns])
    else:
        is_signal = pl.col("_is_signal").fill_null(False)
        mask_exprs = [
            pl.when(is_signal)
            .then(pl.col(self.out_col))
            .otherwise(pl.lit(None, dtype=pl.Utf8))
            .alias(self.out_col),
        ]
        if self.include_meta:
            mask_exprs += [
                pl.when(is_signal).then(pl.col(col)).otherwise(pl.lit(None)).alias(col) for col in self.meta_columns
            ]

        df = (
            df.join(
                signal_ts.with_columns(pl.lit(True).alias("_is_signal")),
                on=self.ts_col,
                how="left",
            )
            .with_columns(mask_exprs)
            .drop("_is_signal")
        )

    return df

_compute_pl

_compute_pl(df: DataFrame, signals: Signals | None, data_context: dict[str, Any] | None) -> pl.DataFrame

Internal Polars-based computation.

Orchestrates validation, filtering, grouping, and projection.

Parameters:
  • df (DataFrame): Input data. [required]
  • signals (Signals | None): Optional signals. [required]
  • data_context (dict[str, Any] | None): Optional context. [required]

Returns:
  • pl.DataFrame: Labeled data.

Source code in src/signalflow/target/base.py
def _compute_pl(
    self,
    df: pl.DataFrame,
    signals: Signals | None,
    data_context: dict[str, Any] | None,
) -> pl.DataFrame:
    """Internal Polars-based computation.

    Orchestrates validation, filtering, grouping, and projection.

    Args:
        df (pl.DataFrame): Input data.
        signals (Signals | None): Optional signals.
        data_context (dict[str, Any] | None): Optional context.

    Returns:
        pl.DataFrame: Labeled data.
    """
    self._validate_input_pl(df)
    df0 = df.sort([self.pair_col, self.ts_col])

    if signals is not None and self.filter_signal_type is not None:
        s_pl = self._signals_to_pl(signals)
        df0 = self._filter_by_signals_pl(df0, s_pl, self.filter_signal_type)

    input_cols = set(df0.columns)

    def _wrapped(g: pl.DataFrame) -> pl.DataFrame:
        out = self.compute_group(g, data_context=data_context)
        if not isinstance(out, pl.DataFrame):
            raise TypeError(f"{self.__class__.__name__}.compute_group must return pl.DataFrame")
        if out.height != g.height:
            raise ValueError(
                f"{self.__class__.__name__}: len(output_group)={out.height} != len(input_group)={g.height}"
            )
        return out

    out = df0.group_by(self.pair_col, maintain_order=True).map_groups(_wrapped).sort([self.pair_col, self.ts_col])

    if self.keep_input_columns:
        return out

    label_cols = sorted(set(out.columns) - input_cols) if self.output_columns is None else list(self.output_columns)

    keep_cols = [self.pair_col, self.ts_col, *label_cols]
    missing = [c for c in keep_cols if c not in out.columns]
    if missing:
        raise ValueError(f"Projection error, missing columns: {missing}")

    return out.select(keep_cols)

_filter_by_signals_pl

_filter_by_signals_pl(df: DataFrame, s: DataFrame, signal_type: SignalType) -> pl.DataFrame

Filter input to rows matching signal timestamps.

Inner join with signal timestamps of specific type.

Parameters:
  • df (DataFrame): Input data. [required]
  • s (DataFrame): Signals DataFrame. [required]
  • signal_type (SignalType): Signal type to filter. [required]

Returns:
  • pl.DataFrame: Filtered data (only rows at signal timestamps).

Raises:
  • ValueError: If signals are missing required columns.

Source code in src/signalflow/target/base.py
def _filter_by_signals_pl(self, df: pl.DataFrame, s: pl.DataFrame, signal_type: SignalType) -> pl.DataFrame:
    """Filter input to rows matching signal timestamps.

    Inner join with signal timestamps of specific type.

    Args:
        df (pl.DataFrame): Input data.
        s (pl.DataFrame): Signals DataFrame.
        signal_type (SignalType): Signal type to filter.

    Returns:
        pl.DataFrame: Filtered data (only rows at signal timestamps).

    Raises:
        ValueError: If signals missing required columns.
    """
    required = {self.pair_col, self.ts_col, "signal_type"}
    missing = required - set(s.columns)
    if missing:
        raise ValueError(f"Signals missing columns: {sorted(missing)}")

    s_f = (
        s.filter(pl.col("signal_type") == signal_type.value)
        .select([self.pair_col, self.ts_col])
        .unique(subset=[self.pair_col, self.ts_col])
    )
    return df.join(s_f, on=[self.pair_col, self.ts_col], how="inner")

_signals_to_pl

_signals_to_pl(signals: Signals) -> pl.DataFrame

Convert Signals to Polars DataFrame.

Parameters:
  • signals (Signals): Signals container. [required]

Returns:
  • pl.DataFrame: Signals as DataFrame.

Raises:
  • TypeError: If Signals.value is not a pl.DataFrame.

Source code in src/signalflow/target/base.py
def _signals_to_pl(self, signals: Signals) -> pl.DataFrame:
    """Convert Signals to Polars DataFrame.

    Args:
        signals (Signals): Signals container.

    Returns:
        pl.DataFrame: Signals as DataFrame.

    Raises:
        TypeError: If Signals.value is not pl.DataFrame.
    """
    s = signals.value
    if isinstance(s, pl.DataFrame):
        return s
    raise TypeError(f"Unsupported Signals.value type: {type(s)}")

_validate_input_pl

_validate_input_pl(df: DataFrame) -> None

Validate input DataFrame schema.

Parameters:
  • df (DataFrame): Input to validate. [required]

Raises:
  • ValueError: If required columns are missing.

Source code in src/signalflow/target/base.py
def _validate_input_pl(self, df: pl.DataFrame) -> None:
    """Validate input DataFrame schema.

    Args:
        df (pl.DataFrame): Input to validate.

    Raises:
        ValueError: If required columns missing.
    """
    missing = [c for c in (self.pair_col, self.ts_col) if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

compute

compute(df: DataFrame, signals: Signals | None = None, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute labels for input DataFrame.

Main entry point - handles validation, filtering, grouping, and projection.

Processing steps
  1. Validate input schema
  2. Sort by (pair, timestamp)
  3. (optional) Filter to specific signal type
  4. Group by pair and apply compute_group()
  5. Validate output (length-preserving)
  6. Project to output columns

Parameters:
  • df (DataFrame): Input data with OHLCV and required columns. [required]
  • signals (Signals | None): Signals for filtering/masking. Default: None.
  • data_context (dict[str, Any] | None): Additional context. Default: None.

Returns:
  • pl.DataFrame: Labeled data with columns:
      • pair, timestamp (always included)
      • label column(s) (as specified by out_col)
      • (optional) metadata columns

Raises:
  • TypeError: If df is not a pl.DataFrame or compute_group returns the wrong type.
  • ValueError: If compute_group changes the row count or columns are missing.

Example
# Basic labeling
labeled = labeler.compute(ohlcv_df)

# With signal filtering (filter_signal_type is a dataclass field,
# not a compute() argument)
labeler.filter_signal_type = SignalType.RISE
labeled = labeler.compute(ohlcv_df, signals=signals)

# With masking context
labeled = labeler.compute(
    ohlcv_df,
    signals=signals,
    data_context={"signal_keys": signal_timestamps_df}
)
Source code in src/signalflow/target/base.py
def compute(
    self,
    df: pl.DataFrame,
    signals: Signals | None = None,
    data_context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Compute labels for input DataFrame.

    Main entry point - handles validation, filtering, grouping, and projection.

    Processing steps:
        1. Validate input schema
        2. Sort by (pair, timestamp)
        3. (optional) Filter to specific signal type
        4. Group by pair and apply compute_group()
        5. Validate output (length-preserving)
        6. Project to output columns

    Args:
        df (pl.DataFrame): Input data with OHLCV and required columns.
        signals (Signals | None): Signals for filtering/masking.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Labeled data with columns:
            - pair, timestamp (always included)
            - label column(s) (as specified by out_col)
            - (optional) metadata columns

    Raises:
        TypeError: If df not pl.DataFrame or compute_group returns wrong type.
        ValueError: If compute_group changes row count or columns missing.

    Example:
        ```python
        # Basic labeling
        labeled = labeler.compute(ohlcv_df)

        # With signal filtering (filter_signal_type is a dataclass field,
        # not a compute() argument)
        labeler.filter_signal_type = SignalType.RISE
        labeled = labeler.compute(ohlcv_df, signals=signals)

        # With masking context
        labeled = labeler.compute(
            ohlcv_df,
            signals=signals,
            data_context={"signal_keys": signal_timestamps_df}
        )
        ```
    """
    if not isinstance(df, pl.DataFrame):
        raise TypeError(f"{self.__class__.__name__}.compute expects pl.DataFrame, got {type(df)}")
    return self._compute_pl(df=df, signals=signals, data_context=data_context)

compute_group abstractmethod

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame

Compute labels for single pair group.

Core labeling logic - must be implemented by subclasses.

CRITICAL: Must preserve row count (len(output) == len(input)). No filtering allowed inside compute_group.

Parameters:
  • group_df (DataFrame): Single pair's data, sorted by timestamp. [required]
  • data_context (dict[str, Any] | None): Additional context. [required]

Returns:
  • pl.DataFrame: Same length as input with added label columns.

Example
def compute_group(self, group_df, data_context=None):
    # Compute 10-bar forward return
    return group_df.with_columns([
        pl.col("close").shift(-10).alias("future_close")
    ]).with_columns([
        ((pl.col("future_close") / pl.col("close")) - 1).alias("return"),
        pl.when((pl.col("future_close") / pl.col("close") - 1) > 0.01)
        .then(pl.lit(SignalType.RISE.value))
        .otherwise(pl.lit(SignalType.NONE.value))
        .alias("label")
    ])
Note

Output must have same height as input (length-preserving). Use shift(-n) for forward-looking operations. Last N bars will have null labels (no future data).

Source code in src/signalflow/target/base.py
@abstractmethod
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame:
    """Compute labels for single pair group.

    Core labeling logic - must be implemented by subclasses.

    CRITICAL: Must preserve row count (len(output) == len(input)).
    No filtering allowed inside compute_group.

    Args:
        group_df (pl.DataFrame): Single pair's data, sorted by timestamp.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Same length as input with added label columns.

    Example:
        ```python
        def compute_group(self, group_df, data_context=None):
            # Compute 10-bar forward return
            return group_df.with_columns([
                pl.col("close").shift(-10).alias("future_close")
            ]).with_columns([
                ((pl.col("future_close") / pl.col("close")) - 1).alias("return"),
                pl.when((pl.col("future_close") / pl.col("close") - 1) > 0.01)
                .then(pl.lit(SignalType.RISE.value))
                .otherwise(pl.lit(SignalType.NONE.value))
                .alias("label")
            ])
        ```

    Note:
        Output must have same height as input (length-preserving).
        Use shift(-n) for forward-looking operations.
        Last N bars will have null labels (no future data).
    """
    raise NotImplementedError

Labeling Strategies

Fixed Horizon

signalflow.target.fixed_horizon_labeler.FixedHorizonLabeler dataclass

FixedHorizonLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t1', 'ret'), price_col: str = 'close', horizon: int = 60)

Bases: Labeler

Fixed-Horizon Labeling

label[t0] = sign(close[t0 + horizon] - close[t0])

If signals are provided, labels are written only on signal rows, while the horizon return is still computed over the full per-pair series.

horizon class-attribute instance-attribute

horizon: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('t1', 'ret')

price_col class-attribute instance-attribute

price_col: str = 'close'

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/fixed_horizon_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/fixed_horizon_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    h = int(self.horizon)
    price = pl.col(self.price_col)
    future_price = price.shift(-h)

    df = group_df.with_columns(future_price.alias("_future_price"))

    label_expr = (
        pl.when(
            pl.col("_future_price").is_null()
            | pl.col(self.price_col).is_null()
            | (pl.col(self.price_col) <= 0)
            | (pl.col("_future_price") <= 0)
        )
        .then(pl.lit(SignalType.NONE.value))
        .when(pl.col("_future_price") > pl.col(self.price_col))
        .then(pl.lit(SignalType.RISE.value))
        .when(pl.col("_future_price") < pl.col(self.price_col))
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
    )

    df = df.with_columns(label_expr.alias(self.out_col))

    if self.include_meta:
        df = df.with_columns(
            [
                pl.col(self.ts_col).shift(-h).alias("t1"),
                pl.when(
                    pl.col("_future_price").is_not_null()
                    & (pl.col(self.price_col) > 0)
                    & (pl.col("_future_price") > 0)
                )
                .then((pl.col("_future_price") / pl.col(self.price_col)).log())
                .otherwise(pl.lit(None))
                .alias("ret"),
            ]
        )

    df = df.drop("_future_price")

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Triple Barrier (Dynamic)

signalflow.target.triple_barrier_labeler.TripleBarrierLabeler dataclass

TripleBarrierLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', vol_window: int = 60, horizon: int = 1440, profit_multiplier: float = 1.0, stop_loss_multiplier: float = 1.0)

Bases: Labeler

Triple-Barrier Labeling (López de Prado), Numba-accelerated.

Volatility-based barriers
  • pt = close * exp(vol * profit_multiplier)
  • sl = close * exp(-vol * stop_loss_multiplier)

horizon class-attribute instance-attribute

horizon: int = 1440

price_col class-attribute instance-attribute

price_col: str = 'close'

profit_multiplier class-attribute instance-attribute

profit_multiplier: float = 1.0

stop_loss_multiplier class-attribute instance-attribute

stop_loss_multiplier: float = 1.0

vol_window class-attribute instance-attribute

vol_window: int = 60

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/triple_barrier_labeler.py
def __post_init__(self) -> None:
    if self.vol_window <= 1:
        raise ValueError("vol_window must be > 1")
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")
    if self.profit_multiplier <= 0 or self.stop_loss_multiplier <= 0:
        raise ValueError("profit_multiplier/stop_loss_multiplier must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

_apply_labels

_apply_labels(df: DataFrame) -> pl.DataFrame

Apply RISE/FALL/NONE labels based on barrier hits.

Source code in src/signalflow/target/triple_barrier_labeler.py
def _apply_labels(self, df: pl.DataFrame) -> pl.DataFrame:
    """Apply RISE/FALL/NONE labels based on barrier hits."""
    choose_up = pl.col("_up_off").is_not_null() & (
        pl.col("_dn_off").is_null() | (pl.col("_up_off") <= pl.col("_dn_off"))
    )
    choose_dn = pl.col("_dn_off").is_not_null() & (
        pl.col("_up_off").is_null() | (pl.col("_dn_off") < pl.col("_up_off"))
    )

    return df.with_columns(
        pl.when(choose_up)
        .then(pl.lit(SignalType.RISE.value))
        .when(choose_dn)
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
        .alias(self.out_col)
    )

_compute_meta

_compute_meta(df: DataFrame, prices: ndarray, up_off_series: Series, dn_off_series: Series, lf: int) -> pl.DataFrame

Compute t_hit and ret meta columns.

Source code in src/signalflow/target/triple_barrier_labeler.py
def _compute_meta(
    self,
    df: pl.DataFrame,
    prices: np.ndarray,
    up_off_series: pl.Series,
    dn_off_series: pl.Series,
    lf: int,
) -> pl.DataFrame:
    """Compute t_hit and ret meta columns."""
    n = df.height
    ts_arr = df.get_column(self.ts_col).to_numpy()

    idx = np.arange(n)
    up_np = up_off_series.fill_null(0).to_numpy()
    dn_np = dn_off_series.fill_null(0).to_numpy()

    hit_off = np.where(
        (up_np > 0) & ((dn_np == 0) | (up_np <= dn_np)),
        up_np,
        np.where(dn_np > 0, dn_np, 0),
    )

    hit_idx = np.clip(idx + hit_off, 0, n - 1)
    vert_idx = np.clip(idx + lf, 0, n - 1)
    final_idx = np.where(hit_off > 0, hit_idx, vert_idx)

    t_hit = ts_arr[final_idx]
    ret = np.where(prices > 0, np.log(prices[final_idx] / prices), np.nan)

    return df.with_columns(
        [
            pl.Series("t_hit", t_hit),
            pl.Series("ret", ret),
        ]
    )
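The barrier-selection step used above (first hit wins; ties go to the upper barrier) can be checked with a tiny NumPy example, using the same offset convention as the source:

```python
import numpy as np

# Offsets (in bars) to the first upper/lower barrier hit; 0 means "never hit"
up = np.array([2, 0, 3])
dn = np.array([0, 1, 3])

hit_off = np.where(
    (up > 0) & ((dn == 0) | (up <= dn)),  # upper hit exists and is not later
    up,
    np.where(dn > 0, dn, 0),              # else lower hit, else no hit at all
)
```

A zero in hit_off means neither barrier was touched, in which case _compute_meta falls back to the vertical-barrier index (t0 + horizon).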

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/triple_barrier_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    lf = int(self.horizon)
    vw = int(self.vol_window)

    df = group_df.with_columns(
        (pl.col(self.price_col) / pl.col(self.price_col).shift(1))
        .log()
        .rolling_std(window_size=vw, ddof=1)
        .alias("_vol")
    ).with_columns(
        [
            (pl.col(self.price_col) * (pl.col("_vol") * self.profit_multiplier).exp()).alias("_pt"),
            (pl.col(self.price_col) * (-pl.col("_vol") * self.stop_loss_multiplier).exp()).alias("_sl"),
        ]
    )

    prices = df.get_column(self.price_col).to_numpy().astype(np.float64)
    pt = df.get_column("_pt").fill_null(np.nan).to_numpy().astype(np.float64)
    sl = df.get_column("_sl").fill_null(np.nan).to_numpy().astype(np.float64)

    up_off, dn_off = _find_first_hit(prices, pt, sl, lf)

    up_off_series = pl.Series("_up_off", up_off).replace(0, None).cast(pl.Int32)
    dn_off_series = pl.Series("_dn_off", dn_off).replace(0, None).cast(pl.Int32)

    df = df.with_columns([up_off_series, dn_off_series])

    df = self._apply_labels(df)

    if self.include_meta:
        df = self._compute_meta(df, prices, up_off_series, dn_off_series, lf)

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    drop_cols = ["_vol", "_pt", "_sl", "_up_off", "_dn_off"]
    df = df.drop([c for c in drop_cols if c in df.columns])

    return df

Take Profit (Symmetric Barrier)

signalflow.target.take_profit_labeler.TakeProfitLabeler dataclass

TakeProfitLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_DIRECTION, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', horizon: int = 1440, barrier_pct: float = 0.01)

Bases: Labeler

First-touch labeling with symmetric fixed-percentage barriers.

Barriers
  • TP = close[t0] * (1 + barrier_pct)
  • SL = close[t0] * (1 - barrier_pct)
  • Vertical barrier at t0 + horizon
Label by first touch
  • RISE if TP touched first (ties -> TP)
  • FALL if SL touched first
  • NONE if neither touched within horizon
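The first-touch rule can be sketched as a plain loop for a single entry bar (the real implementation vectorizes this across all bars with a Numba kernel; prices and parameters below are illustrative):

```python
# Symmetric barriers around the entry price
prices = [100.0, 100.4, 101.2, 99.5]
barrier_pct, horizon = 0.01, 3

entry = prices[0]
tp = entry * (1.0 + barrier_pct)  # take-profit barrier
sl = entry * (1.0 - barrier_pct)  # stop-loss barrier

label = "none"  # neither barrier touched within the horizon
for p in prices[1 : 1 + horizon]:
    if p >= tp:       # checked first, so ties resolve to the TP side
        label = "rise"
        break
    if p <= sl:
        label = "fall"
        break
```

Here entry = 100.0 gives tp = 101.0 and sl = 99.0; the path touches 101.2 before dropping, so the label is "rise".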

barrier_pct class-attribute instance-attribute

barrier_pct: float = 0.01

horizon class-attribute instance-attribute

horizon: int = 1440

price_col class-attribute instance-attribute

price_col: str = 'close'

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/take_profit_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")
    if self.barrier_pct <= 0:
        raise ValueError("barrier_pct must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/take_profit_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    lf = int(self.horizon)
    n = group_df.height

    prices = group_df.get_column(self.price_col).to_numpy().astype(np.float64)
    pt = prices * (1.0 + self.barrier_pct)
    sl = prices * (1.0 - self.barrier_pct)

    up_off, dn_off = _find_first_hit_static(prices, pt, sl, lf)

    up_off_series = pl.Series("_up_off", up_off).replace(0, None).cast(pl.Int32)
    dn_off_series = pl.Series("_dn_off", dn_off).replace(0, None).cast(pl.Int32)

    df = group_df.with_columns([up_off_series, dn_off_series])

    choose_up = pl.col("_up_off").is_not_null() & (
        pl.col("_dn_off").is_null() | (pl.col("_up_off") <= pl.col("_dn_off"))
    )
    choose_dn = pl.col("_dn_off").is_not_null() & (
        pl.col("_up_off").is_null() | (pl.col("_dn_off") < pl.col("_up_off"))
    )

    df = df.with_columns(
        pl.when(choose_up)
        .then(pl.lit(SignalType.RISE.value))
        .when(choose_dn)
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
        .alias(self.out_col)
    )

    if self.include_meta:
        ts_arr = group_df.get_column(self.ts_col).to_numpy()

        up_np = up_off_series.fill_null(0).to_numpy()
        dn_np = dn_off_series.fill_null(0).to_numpy()
        idx = np.arange(n)

        hit_off = np.where(
            (up_np > 0) & ((dn_np == 0) | (up_np <= dn_np)),
            up_np,
            np.where(dn_np > 0, dn_np, 0),
        )

        hit_idx = np.clip(idx + hit_off, 0, n - 1)
        vert_idx = np.clip(idx + lf, 0, n - 1)
        final_idx = np.where(hit_off > 0, hit_idx, vert_idx)

        t_hit = ts_arr[final_idx]
        ret = np.log(prices[final_idx] / prices)

        df = df.with_columns(
            [
                pl.Series("t_hit", t_hit),
                pl.Series("ret", ret),
            ]
        )

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    df = df.drop(["_up_off", "_dn_off"])

    return df
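The first-touch scan is delegated to `_find_first_hit_static`, which is not shown in this section. A minimal numpy sketch of what such a scan does, assuming offsets are measured in bars and 0 means the barrier was never touched (the helper's exact signature and internals are assumptions):

```python
import numpy as np

def find_first_hit(prices: np.ndarray, pt: np.ndarray, sl: np.ndarray, horizon: int):
    """Offsets (in bars) to the first touch of the take-profit (pt) and
    stop-loss (sl) levels within `horizon` bars; 0 means never touched."""
    n = len(prices)
    up_off = np.zeros(n, dtype=np.int64)
    dn_off = np.zeros(n, dtype=np.int64)
    for t in range(n):
        for k in range(1, min(horizon, n - 1 - t) + 1):
            p = prices[t + k]
            if up_off[t] == 0 and p >= pt[t]:
                up_off[t] = k
            if dn_off[t] == 0 and p <= sl[t]:
                dn_off[t] = k
            if up_off[t] and dn_off[t]:
                break  # both barriers found for this bar
    return up_off, dn_off
```

The `choose_up`/`choose_dn` expressions above then pick RISE when the TP offset is non-null and not later than the SL offset, which implements the "ties -> TP" rule.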

Non-Price-Direction Labelers

Anomaly Labeler

signalflow.target.anomaly_labeler.AnomalyLabeler dataclass

AnomalyLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.ANOMALY, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('forward_ret', 'vol'), price_col: str = 'close', horizon: int = 60, vol_window: int = 1440, threshold_return_std: float = 4.0, flash_horizon: int = 10)

Bases: Labeler

Labels black swan and flash crash events in historical data.

Forward-looking labeler that identifies anomalous price movements by comparing forward return magnitude against rolling volatility.

Algorithm
  1. Compute log returns: log(close[t] / close[t-1])
  2. Compute rolling std of returns over vol_window bars
  3. Compute forward return magnitude: |log(close[t+horizon] / close[t])|
  4. If forward magnitude > threshold_return_std * rolling_std * sqrt(horizon) -> "extreme_positive_anomaly"
  5. If additionally the move over flash_horizon bars is negative and its magnitude exceeds the flash-scaled threshold -> "extreme_negative_anomaly"
  6. Otherwise -> null (no label)
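The implementation scales per-bar volatility by sqrt(horizon) before comparing. A minimal numpy sketch of steps 1-4, assuming a trailing volatility window and sample standard deviation (this is an illustration, not the module's code):

```python
import numpy as np

def anomaly_flags(close: np.ndarray, horizon: int, vol_window: int, k: float) -> np.ndarray:
    """True where |forward log return| > k * rolling_std * sqrt(horizon)."""
    log_ret = np.diff(np.log(close), prepend=np.nan)  # log(close[t] / close[t-1])
    n = len(close)
    flags = np.zeros(n, dtype=bool)
    for t in range(n):
        window = log_ret[max(0, t - vol_window + 1): t + 1]
        window = window[~np.isnan(window)]
        if t + horizon >= n or len(window) < 2:
            continue  # not enough history or no forward bar
        vol = window.std(ddof=1)
        fwd = abs(np.log(close[t + horizon] / close[t]))
        flags[t] = fwd > k * vol * np.sqrt(horizon)
    return flags
```

A large jump after a calm stretch flags the bar preceding it; bars without enough history or without a forward bar stay unflagged.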

Attributes:

Name Type Description
price_col str

Price column name. Default: "close".

horizon int

Forward-looking horizon in bars. Default: 60.

vol_window int

Rolling window for volatility estimation. Default: 1440.

threshold_return_std float

Number of standard deviations for anomaly threshold. Default: 4.0.

flash_horizon int

Maximum bars for flash crash classification. Default: 10.

Example
from signalflow.target.anomaly_labeler import AnomalyLabeler

labeler = AnomalyLabeler(
    horizon=60,
    vol_window=1440,
    threshold_return_std=4.0,
    mask_to_signals=False,
)
labeled = labeler.compute(ohlcv_df)
Note

This is a forward-looking labeler -- it uses future data and is NOT suitable for live trading. Use AnomalyDetector for real-time anomaly detection.

flash_horizon class-attribute instance-attribute

flash_horizon: int = 10

horizon class-attribute instance-attribute

horizon: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('forward_ret', 'vol')

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = ANOMALY

threshold_return_std class-attribute instance-attribute

threshold_return_std: float = 4.0

vol_window class-attribute instance-attribute

vol_window: int = 1440

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/anomaly_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")
    if self.vol_window <= 0:
        raise ValueError("vol_window must be > 0")
    if self.threshold_return_std <= 0:
        raise ValueError("threshold_return_std must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute anomaly labels for a single pair group.

Parameters:

Name Type Description Default
group_df DataFrame

Single pair's data sorted by timestamp.

required
data_context dict[str, Any] | None

Additional context.

None

Returns:

Type Description
DataFrame

pl.DataFrame: Same length as input with anomaly label column added. Labels are "extreme_positive_anomaly", "extreme_negative_anomaly", or null.

Source code in src/signalflow/target/anomaly_labeler.py
def compute_group(
    self,
    group_df: pl.DataFrame,
    data_context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Compute anomaly labels for a single pair group.

    Args:
        group_df (pl.DataFrame): Single pair's data sorted by timestamp.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Same length as input with anomaly label column added.
            Labels are "extreme_positive_anomaly", "extreme_negative_anomaly", or null.
    """
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    price = pl.col(self.price_col)

    # Step 1: log returns
    df = group_df.with_columns(
        (price / price.shift(1)).log().alias("_log_ret"),
    )

    # Step 2: rolling std of returns
    df = df.with_columns(
        pl.col("_log_ret")
        .rolling_std(window_size=self.vol_window, min_samples=max(2, self.vol_window // 4))
        .alias("_rolling_vol"),
    )

    # Step 3: forward return (signed) and magnitude
    df = df.with_columns(
        (price.shift(-self.horizon) / price).log().alias("_forward_ret"),
    )
    df = df.with_columns(
        pl.col("_forward_ret").abs().alias("_forward_ret_abs"),
    )

    # Step 4-5: compute threshold and classify
    # Scale per-bar volatility to horizon-length volatility: vol * sqrt(horizon)
    horizon_threshold = pl.col("_rolling_vol") * self.threshold_return_std * math.sqrt(self.horizon)

    # For flash crash detection, check if a large negative move happens
    # within flash_horizon bars (shorter window).
    flash_threshold = pl.col("_rolling_vol") * self.threshold_return_std * math.sqrt(self.flash_horizon)
    df = df.with_columns(
        (price.shift(-self.flash_horizon) / price).log().alias("_flash_ret"),
    )

    is_anomaly = (
        pl.col("_forward_ret_abs").is_not_null()
        & pl.col("_rolling_vol").is_not_null()
        & (pl.col("_forward_ret_abs") > horizon_threshold)
    )

    is_flash_crash = (
        is_anomaly
        & pl.col("_flash_ret").is_not_null()
        & (pl.col("_flash_ret") < 0)
        & (pl.col("_flash_ret").abs() > flash_threshold)
    )

    label_expr = (
        pl.when(is_flash_crash)
        .then(pl.lit("extreme_negative_anomaly"))
        .when(is_anomaly)
        .then(pl.lit("extreme_positive_anomaly"))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias(self.out_col)
    )

    df = df.with_columns(label_expr)

    # Step 6: meta columns
    if self.include_meta:
        df = df.with_columns(
            [
                pl.col("_forward_ret").alias("forward_ret"),
                pl.col("_rolling_vol").alias("vol"),
            ]
        )

    # Clean up temporary columns
    df = df.drop(
        [
            c
            for c in ("_log_ret", "_rolling_vol", "_forward_ret", "_forward_ret_abs", "_flash_ret")
            if c in df.columns
        ]
    )

    # Apply signal masking if configured
    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Volatility Regime Labeler

signalflow.target.volatility_labeler.VolatilityRegimeLabeler dataclass

VolatilityRegimeLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.VOLATILITY, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('realized_vol', 'vol_percentile'), price_col: str = 'close', horizon: int = 60, upper_quantile: float = 0.67, lower_quantile: float = 0.33, lookback_window: int = 1440)

Bases: Labeler

Label bars by forward realized volatility regime.

Algorithm
  1. Compute log returns: ln(close[t] / close[t-1])
  2. Forward realized volatility: std(log_returns[t+1 : t+horizon+1]) computed using reverse-shifted rolling std.
  3. Rolling percentile of realized vol over lookback_window.
  4. If vol > upper_quantile percentile -> "high_volatility"
  5. If vol < lower_quantile percentile -> "low_volatility"
  6. Otherwise -> null (Polars null)
Implementation

Uses Polars expressions for returns and realized volatility; the rolling percentile falls back to a numpy loop applied through map_batches.
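The reverse-shifted rolling std in step 2 targets the returns in the half-open window (t, t+horizon]. A direct numpy computation of the same quantity is useful as a cross-check; note this sketch requires a full window, whereas the implementation accepts partial windows via min_samples=2 (illustrative only):

```python
import numpy as np

def forward_realized_vol(close: np.ndarray, horizon: int) -> np.ndarray:
    """Sample std of the `horizon` log returns ending at bars t+1 .. t+horizon."""
    log_ret = np.diff(np.log(close))  # log_ret[i] = log(close[i+1] / close[i])
    n = len(close)
    out = np.full(n, np.nan)
    for t in range(n):
        window = log_ret[t: t + horizon]  # returns over (t, t+horizon]
        if len(window) == horizon:
            out[t] = window.std(ddof=1)
    return out
```

The last `horizon` bars have no full forward window and stay null, matching the label nulls at the series edge.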

Attributes:

Name Type Description
price_col str

Price column name. Default: "close".

horizon int

Number of forward bars for realized vol. Default: 60.

upper_quantile float

Upper percentile threshold (0-1). Default: 0.67.

lower_quantile float

Lower percentile threshold (0-1). Default: 0.33.

lookback_window int

Rolling window for percentile calc. Default: 1440.

Example
from signalflow.target.volatility_labeler import VolatilityRegimeLabeler

labeler = VolatilityRegimeLabeler(
    horizon=60,
    upper_quantile=0.67,
    lower_quantile=0.33,
    mask_to_signals=False,
)
result = labeler.compute(ohlcv_df)

horizon class-attribute instance-attribute

horizon: int = 60

lookback_window class-attribute instance-attribute

lookback_window: int = 1440

lower_quantile class-attribute instance-attribute

lower_quantile: float = 0.33

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('realized_vol', 'vol_percentile')

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = VOLATILITY

upper_quantile class-attribute instance-attribute

upper_quantile: float = 0.67

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/volatility_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")
    if not (0.0 < self.lower_quantile < self.upper_quantile < 1.0):
        raise ValueError(
            "Require 0 < lower_quantile < upper_quantile < 1, "
            f"got lower_quantile={self.lower_quantile}, upper_quantile={self.upper_quantile}"
        )

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

_compute_percentile_series staticmethod

_compute_percentile_series(s: Series, window: int) -> pl.Series

Compute rolling percentile for a series of structs.

Parameters:

Name Type Description Default
s Series

Series of structs with 'val' and 'idx' fields.

required
window int

Lookback window size.

required

Returns:

Type Description
Series

Series of percentile values.

Source code in src/signalflow/target/volatility_labeler.py
@staticmethod
def _compute_percentile_series(s: pl.Series, window: int) -> pl.Series:
    """Compute rolling percentile for a series of structs.

    Args:
        s: Series of structs with 'val' and 'idx' fields.
        window: Lookback window size.

    Returns:
        Series of percentile values.
    """
    df = s.struct.unnest()
    vals = df["val"].to_numpy()
    n = len(vals)
    result: list[float | None] = [None] * n

    import numpy as np

    for i in range(n):
        if vals[i] is None or np.isnan(vals[i]):
            continue

        start = max(0, i - window + 1)
        window_vals = vals[start : i + 1]

        # Filter out NaN/None values
        valid = window_vals[~np.isnan(window_vals)]
        if len(valid) < 2:
            continue

        # Percentile = fraction of values <= current
        result[i] = float(np.mean(valid <= vals[i]))

    return pl.Series(result, dtype=pl.Float64)
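The percentile definition above -- the fraction of valid values in the trailing window that are <= the current value -- can be reproduced standalone. A small numpy sketch of the same semantics (not the module's code):

```python
import numpy as np

def rolling_percentile(vals: np.ndarray, window: int) -> list:
    """Fraction of valid values in the trailing window that are <= the
    current value; None where the current value is NaN or the window has
    fewer than two valid points."""
    out = [None] * len(vals)
    for i, v in enumerate(vals):
        if np.isnan(v):
            continue
        w = vals[max(0, i - window + 1): i + 1]
        valid = w[~np.isnan(w)]
        if len(valid) < 2:
            continue
        out[i] = float(np.mean(valid <= v))
    return out
```

By this definition the running maximum of the window always gets percentile 1.0, and values mid-range land strictly between the quantile thresholds.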

_rolling_percentile_expr

_rolling_percentile_expr(col_name: str, window: int) -> pl.Expr

Compute rolling percentile using Polars expressions.

For each row, computes the fraction of values in the lookback window that are less than or equal to the current value.

Parameters:

Name Type Description Default
col_name str

Column to compute percentile for.

required
window int

Lookback window size.

required

Returns:

Type Description
Expr

Polars expression computing rolling percentile.

Source code in src/signalflow/target/volatility_labeler.py
def _rolling_percentile_expr(self, col_name: str, window: int) -> pl.Expr:
    """Compute rolling percentile using Polars expressions.

    For each row, computes the fraction of values in the lookback window
    that are less than or equal to the current value.

    Args:
        col_name: Column to compute percentile for.
        window: Lookback window size.

    Returns:
        Polars expression computing rolling percentile.
    """
    col = pl.col(col_name)

    # Create a struct with current value and row index
    # Then use map_batches to compute percentile within each window
    return pl.struct([col.alias("val"), pl.int_range(pl.len()).alias("idx")]).map_batches(
        lambda s: self._compute_percentile_series(s, window),
        return_dtype=pl.Float64,
    )

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute volatility regime labels for a single pair.

Parameters:

Name Type Description Default
group_df DataFrame

Single pair data sorted by timestamp.

required
data_context dict[str, Any] | None

Optional additional context.

None

Returns:

Type Description
DataFrame

DataFrame with same row count, plus label and optional meta columns.

Source code in src/signalflow/target/volatility_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute volatility regime labels for a single pair.

    Args:
        group_df: Single pair data sorted by timestamp.
        data_context: Optional additional context.

    Returns:
        DataFrame with same row count, plus label and optional meta columns.
    """
    if group_df.height == 0:
        return group_df

    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    # Step 1: Log returns
    df = group_df.with_columns((pl.col(self.price_col) / pl.col(self.price_col).shift(1)).log().alias("_log_ret"))

    # Step 2: Forward realized volatility
    # To compute std of log_returns[t+1 : t+horizon+1], we:
    # - Shift log_ret by -1 to start from next bar
    # - Apply rolling_std with window=horizon
    # - The result at position t+horizon-1 contains std of [t, t+horizon)
    # - Shift back by -(horizon-1) to align with position t
    df = df.with_columns(
        pl.col("_log_ret")
        .shift(-1)
        .rolling_std(window_size=self.horizon, min_samples=2)
        .shift(-(self.horizon - 1))
        .alias("_realized_vol")
    )

    # Step 3: Rolling percentile using rank-based approach
    # For each bar, compute what fraction of values in the lookback window
    # are <= current value. This is equivalent to percentile.
    #
    # Using map_batches with a custom Python function to compute percentile:
    # percentile = count(x <= current) / count(valid)
    df = df.with_columns(
        self._rolling_percentile_expr("_realized_vol", self.lookback_window).alias("_vol_percentile")
    )

    # Step 4-5: Assign labels based on percentile thresholds
    label_expr = (
        pl.when(pl.col("_vol_percentile").is_null())
        .then(pl.lit(None, dtype=pl.Utf8))
        .when(pl.col("_vol_percentile") > self.upper_quantile)
        .then(pl.lit("high_volatility"))
        .when(pl.col("_vol_percentile") < self.lower_quantile)
        .then(pl.lit("low_volatility"))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias(self.out_col)
    )

    df = df.with_columns(label_expr)

    if self.include_meta:
        df = df.with_columns(
            [
                pl.col("_realized_vol").alias("realized_vol"),
                pl.col("_vol_percentile").alias("vol_percentile"),
            ]
        )

    # Clean up temporary columns
    df = df.drop(["_log_ret", "_realized_vol", "_vol_percentile"])

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Trend Scanning Labeler

signalflow.target.trend_scanning.TrendScanningLabeler dataclass

TrendScanningLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.TREND_MOMENTUM, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_stat', 'best_window'), price_col: str = 'close', min_lookforward: int = 5, max_lookforward: int = 60, step: int = 5, critical_value: float = 1.96)

Bases: Labeler

Label bars using De Prado's trend scanning method.

For each bar, fits OLS regressions over multiple forward windows and selects the window with the strongest t-statistic. The sign and magnitude of the t-statistic determine the label.

Reference

De Prado, M. L. (2020). Machine Learning for Asset Managers, Ch. 5.

Algorithm
  1. For each bar t, for each window L in range(min_lookforward, max_lookforward+1, step):
  2. Fit OLS: Price[t+i] = alpha + beta * i, for i=0..L-1
  3. Compute t-statistic: t = beta / SE(beta)
  4. Select L* = argmax_L |t_stat(t, L)|
  5. Label:
    • "rise" if t_stat > critical_value
    • "fall" if t_stat < -critical_value
    • null otherwise
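The per-window regression in steps 1-3 reduces to a slope t-statistic. A self-contained numpy sketch of that computation (the module's `_trend_scan` helper is not shown here, so this is illustrative):

```python
import numpy as np

def trend_t_stat(window_prices: np.ndarray) -> float:
    """t-statistic of the OLS slope for price[i] = alpha + beta * i."""
    L = len(window_prices)
    x = np.arange(L, dtype=np.float64)
    x_mean, y_mean = x.mean(), window_prices.mean()
    sxx = np.sum((x - x_mean) ** 2)
    beta = np.sum((x - x_mean) * (window_prices - y_mean)) / sxx
    resid = window_prices - (y_mean + beta * (x - x_mean))
    s2 = np.sum(resid ** 2) / (L - 2)   # residual variance
    return beta / np.sqrt(s2 / sxx)     # beta / SE(beta)
```

A window with a clear upward drift yields t well above 1.96 ("rise"), while a flat noisy window stays below the critical value (null).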

Attributes:

Name Type Description
price_col str

Price column. Default: "close".

min_lookforward int

Minimum forward window. Default: 5.

max_lookforward int

Maximum forward window. Default: 60.

step int

Step between window sizes. Default: 5.

critical_value float

T-stat threshold for significance. Default: 1.96.

Example
from signalflow.target.trend_scanning import TrendScanningLabeler

labeler = TrendScanningLabeler(
    min_lookforward=5,
    max_lookforward=60,
    critical_value=1.96,
    mask_to_signals=False,
)
result = labeler.compute(ohlcv_df)

critical_value class-attribute instance-attribute

critical_value: float = 1.96

max_lookforward class-attribute instance-attribute

max_lookforward: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('t_stat', 'best_window')

min_lookforward class-attribute instance-attribute

min_lookforward: int = 5

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = TREND_MOMENTUM

step class-attribute instance-attribute

step: int = 5

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/trend_scanning.py
def __post_init__(self) -> None:
    if self.min_lookforward < 3:
        raise ValueError("min_lookforward must be >= 3 (need at least 3 points for OLS)")
    if self.max_lookforward < self.min_lookforward:
        raise ValueError("max_lookforward must be >= min_lookforward")
    if self.step < 1:
        raise ValueError("step must be >= 1")
    if self.critical_value <= 0:
        raise ValueError("critical_value must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute trend scanning labels for a single pair.

Parameters:

Name Type Description Default
group_df DataFrame

Single pair data sorted by timestamp.

required
data_context dict[str, Any] | None

Optional additional context.

None

Returns:

Type Description
DataFrame

DataFrame with same row count, plus label and optional meta columns.

Source code in src/signalflow/target/trend_scanning.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute trend scanning labels for a single pair.

    Args:
        group_df: Single pair data sorted by timestamp.
        data_context: Optional additional context.

    Returns:
        DataFrame with same row count, plus label and optional meta columns.
    """
    if group_df.height == 0:
        return group_df

    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    prices = group_df[self.price_col].to_numpy().astype(np.float64)

    t_stats, best_windows = _trend_scan(
        prices,
        self.min_lookforward,
        self.max_lookforward,
        self.step,
    )

    # Assign labels based on t-statistic vs critical value
    n = len(prices)
    labels: list[str | None] = [None] * n
    for t in range(n):
        if np.isnan(t_stats[t]):
            continue
        if t_stats[t] > self.critical_value:
            labels[t] = "rise"
        elif t_stats[t] < -self.critical_value:
            labels[t] = "fall"

    df = group_df.with_columns(pl.Series(name=self.out_col, values=labels, dtype=pl.Utf8))

    if self.include_meta:
        df = df.with_columns(
            [
                pl.Series(name="t_stat", values=t_stats.tolist(), dtype=pl.Float64),
                pl.Series(
                    name="best_window",
                    values=best_windows.tolist(),
                    dtype=pl.Float64,
                ),
            ]
        )

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Structure Labeler (Local Extrema)

signalflow.target.structure_labeler.StructureLabeler dataclass

StructureLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('swing_pct',), price_col: str = 'close', lookforward: int = 60, lookback: int = 60, min_swing_pct: float = 0.02, min_swing_zscore: float | None = None, vol_window: int = 500)

Bases: Labeler

Label local tops and bottoms using a symmetric window.

Uses future knowledge (look-forward) to identify bars that are local extrema within a combined lookback + lookforward window, filtered by either a fixed percentage or a rolling z-score threshold.

Swing Filter Modes

Fixed percentage (default): swing must exceed min_swing_pct.

Rolling z-score: set min_swing_zscore to enable. Computes rolling mean and std of window swings over vol_window bars, then filters by z-score >= threshold. Adapts to market volatility automatically: tighter in calm markets, wider in volatile ones.

Algorithm
  1. For each bar t, examine close[t-lookback : t+lookforward+1].
  2. Compute swing = (window_max - window_min) / window_min.
  3. If close[t] is the maximum in that window -> candidate top.
  4. If close[t] is the minimum in that window -> candidate bottom.
  5. Apply swing filter to confirm:
    • Fixed: swing >= min_swing_pct
    • Z-score: (swing - rolling_mean) / rolling_std >= min_swing_zscore
  6. Otherwise -> null.
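In fixed-percentage mode the algorithm above can be sketched with a plain numpy loop (the library uses rolling expressions instead; evaluating edge bars on truncated windows, as done here, is an assumption):

```python
import numpy as np

def structure_labels(close: np.ndarray, lookback: int, lookforward: int,
                     min_swing_pct: float) -> list:
    """Label a bar 'local_max'/'local_min' if it is the extreme of its
    centered window and the window swing exceeds min_swing_pct."""
    n = len(close)
    labels = [None] * n
    for t in range(n):
        window = close[max(0, t - lookback): min(n, t + lookforward + 1)]
        w_max, w_min = window.max(), window.min()
        if w_min <= 0 or w_max == w_min:
            continue
        swing = (w_max - w_min) / w_min
        if swing < min_swing_pct:
            continue
        if close[t] == w_max:
            labels[t] = "local_max"
        elif close[t] == w_min:
            labels[t] = "local_min"
    return labels
```

Note that bars near the series edges see truncated windows, so edge labels are less reliable than interior ones.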
Implementation

Uses Polars rolling expressions for computing window max/min and detecting extrema, reducing numpy loop overhead.

Attributes:

Name Type Description
price_col str

Price column. Default: "close".

lookforward int

Forward window size. Default: 60.

lookback int

Backward window size. Default: 60.

min_swing_pct float

Fixed minimum swing percentage. Default: 0.02 (2%). Ignored when min_swing_zscore is set.

min_swing_zscore float | None

Z-score threshold for adaptive filtering. Default: None. When set, overrides min_swing_pct.

vol_window int

Rolling window for z-score baseline. Default: 500.

Example
# Fixed percentage mode (default)
labeler = StructureLabeler(min_swing_pct=0.02, mask_to_signals=False)

# Rolling z-score mode (adaptive)
labeler = StructureLabeler(
    min_swing_zscore=2.0,
    vol_window=500,
    mask_to_signals=False,
)
result = labeler.compute(ohlcv_df)

lookback class-attribute instance-attribute

lookback: int = 60

lookforward class-attribute instance-attribute

lookforward: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('swing_pct',)

min_swing_pct class-attribute instance-attribute

min_swing_pct: float = 0.02

min_swing_zscore class-attribute instance-attribute

min_swing_zscore: float | None = None

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_STRUCTURE

vol_window class-attribute instance-attribute

vol_window: int = 500

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/structure_labeler.py
def __post_init__(self) -> None:
    if self.lookforward <= 0:
        raise ValueError("lookforward must be > 0")
    if self.lookback <= 0:
        raise ValueError("lookback must be > 0")
    if self.min_swing_zscore is not None:
        if self.min_swing_zscore <= 0:
            raise ValueError("min_swing_zscore must be > 0")
        if self.vol_window < 20:
            raise ValueError("vol_window must be >= 20 for z-score mode")
    elif self.min_swing_pct < 0:
        raise ValueError("min_swing_pct must be >= 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute structure labels for a single pair.

Source code in src/signalflow/target/structure_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute structure labels for a single pair."""
    if group_df.height == 0:
        return group_df

    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    price = pl.col(self.price_col)

    # Step 1: Compute centered window max/min using rolling + shift
    #
    # For a centered window [t-lookback, t+lookforward]:
    # - Lookback max/min: rolling_max/min with window=lookback+1 (includes current)
    # - Lookforward max/min: shift(-lookforward), then rolling_max/min, shift back
    #
    # Full window max = max(lookback_max, lookforward_max)
    # Full window min = min(lookback_min, lookforward_min)

    lookback_window = self.lookback + 1  # Include current bar

    # Lookback rolling (includes current bar, looks back)
    lookback_max = price.rolling_max(window_size=lookback_window, min_samples=1)
    lookback_min = price.rolling_min(window_size=lookback_window, min_samples=1)

    # Lookforward rolling (shift, apply rolling, shift back)
    # After shift(-1), position t contains value from t+1
    # Apply rolling_max to get max of next `lookforward` bars
    # Then shift back to align
    lookforward_max = (
        price.shift(-1).rolling_max(window_size=self.lookforward, min_samples=1).shift(-(self.lookforward - 1))
    )
    lookforward_min = (
        price.shift(-1).rolling_min(window_size=self.lookforward, min_samples=1).shift(-(self.lookforward - 1))
    )

    df = group_df.with_columns(
        [
            lookback_max.alias("_lb_max"),
            lookback_min.alias("_lb_min"),
            lookforward_max.alias("_lf_max"),
            lookforward_min.alias("_lf_min"),
        ]
    )

    # Full window max/min (handle null in lookforward part at edges)
    df = df.with_columns(
        [
            pl.max_horizontal("_lb_max", "_lf_max").alias("_win_max"),
            pl.min_horizontal("_lb_min", "_lf_min").alias("_win_min"),
        ]
    )

    # Step 2: Compute swing = (win_max - win_min) / win_min
    df = df.with_columns(
        pl.when((pl.col("_win_min") > 0) & (pl.col("_win_max") != pl.col("_win_min")))
        .then((pl.col("_win_max") - pl.col("_win_min")) / pl.col("_win_min"))
        .otherwise(pl.lit(None))
        .alias("_swing")
    )

    # Step 3: Detect if current price is the window max or min
    df = df.with_columns(
        [
            (price == pl.col("_win_max")).alias("_is_max"),
            (price == pl.col("_win_min")).alias("_is_min"),
        ]
    )

    # Step 4: Apply threshold filter
    if self.min_swing_zscore is not None:
        # Z-score mode: compute rolling mean/std of swings
        df = df.with_columns(
            [
                pl.col("_swing").rolling_mean(window_size=self.vol_window, min_samples=20).alias("_swing_mean"),
                pl.col("_swing").rolling_std(window_size=self.vol_window, min_samples=20).alias("_swing_std"),
            ]
        )

        # Z-score = (swing - mean) / std >= threshold
        df = df.with_columns(
            pl.when(pl.col("_swing_std") > 0)
            .then((pl.col("_swing") - pl.col("_swing_mean")) / pl.col("_swing_std"))
            .otherwise(pl.lit(None))
            .alias("_zscore")
        )

        threshold_mask = pl.col("_zscore") >= self.min_swing_zscore
    else:
        # Fixed percentage mode
        threshold_mask = pl.col("_swing") >= self.min_swing_pct

    # Step 5: Assign labels
    label_expr = (
        pl.when(threshold_mask & pl.col("_is_max"))
        .then(pl.lit("local_max"))
        .when(threshold_mask & pl.col("_is_min"))
        .then(pl.lit("local_min"))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias(self.out_col)
    )

    df = df.with_columns(label_expr)

    # Meta: swing_pct only for labeled rows
    if self.include_meta:
        df = df.with_columns(
            pl.when(pl.col(self.out_col).is_not_null())
            .then(pl.col("_swing"))
            .otherwise(pl.lit(None))
            .alias("swing_pct")
        )

    # Clean up temporary columns
    temp_cols = [
        "_lb_max",
        "_lb_min",
        "_lf_max",
        "_lf_min",
        "_win_max",
        "_win_min",
        "_swing",
        "_is_max",
        "_is_min",
    ]
    if self.min_swing_zscore is not None:
        temp_cols.extend(["_swing_mean", "_swing_std", "_zscore"])

    df = df.drop([c for c in temp_cols if c in df.columns])

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Zigzag Structure Labeler (Global)

signalflow.target.structure_labeler.ZigzagStructureLabeler dataclass

ZigzagStructureLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.PRICE_STRUCTURE, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('swing_pct',), price_col: str = 'close', min_swing_pct: float = 0.02, min_swing_zscore: float | None = None, vol_window: int = 500)

Bases: Labeler

Label local tops and bottoms using a full-series zigzag algorithm.

Unlike StructureLabeler (which uses fixed-size windows around each bar), this labeler scans the entire price series to find alternating swing highs and lows. A new pivot is confirmed only when the price reverses by more than the threshold from the current extreme.

The zigzag algorithm ensures that:
  • Tops and bottoms strictly alternate (no consecutive tops or bottoms).
  • Each swing exceeds the threshold (either fixed % or adaptive).
  • Pivots are globally consistent across the full series.
Swing Filter Modes

Fixed percentage (default): reversal must exceed min_swing_pct.

Adaptive (z-score): set min_swing_zscore to enable. Uses rolling volatility (std of log-returns) to compute a per-bar threshold: threshold = zscore x vol x sqrt(vol_window).
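To make the adaptive threshold concrete, here is a quick arithmetic check with illustrative numbers (the per-bar volatility value is hypothetical, not taken from real data):

```python
import math

# Hypothetical inputs: zscore=2.0, rolling std of log-returns = 0.1% per bar,
# vol_window=500.
zscore, vol, window = 2.0, 0.001, 500

# threshold = zscore x vol x sqrt(vol_window)
threshold = zscore * vol * math.sqrt(window)
# With these numbers a pivot is confirmed only after a ~4.5% reversal.
```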

Algorithm
  1. Find the first significant swing to determine the initial direction.
  2. Track the running extreme (highest high or lowest low).
  3. When price reverses from the extreme by more than the threshold:
     • Mark the extreme as "local_max" or "local_min".
     • Switch direction and start tracking the new extreme.
  4. Repeat to the end of the series, producing alternating pivots.
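The steps above can be sketched as a minimal fixed-threshold state machine. This is a simplified stand-in for the actual `_zigzag` implementation shown further down; it omits the adaptive per-bar thresholds:

```python
from __future__ import annotations


def zigzag(prices: list[float], threshold: float) -> list[str | None]:
    """Mark alternating swing pivots whose reversal exceeds a fixed threshold."""
    n = len(prices)
    labels: list[str | None] = [None] * n
    if n < 2:
        return labels

    # Phase 1: find the first swing that exceeds the threshold.
    hi = lo = 0
    direction, cand, start = 0, 0, 0
    for i in range(1, n):
        if prices[i] > prices[hi]:
            hi = i
        if prices[i] < prices[lo]:
            lo = i
        if hi != lo and (prices[hi] - prices[lo]) / prices[lo] >= threshold:
            if hi > lo:  # fell first, then rose -> bottom confirmed
                labels[lo], direction, cand = "local_min", 1, hi
            else:        # rose first, then fell -> top confirmed
                labels[hi], direction, cand = "local_max", -1, lo
            start = i
            break
    if direction == 0:
        return labels  # no significant swing anywhere in the series

    # Phase 2: track the running extreme; confirm a pivot on reversal.
    for i in range(start + 1, n):
        if direction == 1:  # seeking a top
            if prices[i] >= prices[cand]:
                cand = i
            elif (prices[cand] - prices[i]) / prices[cand] >= threshold:
                labels[cand], direction, cand = "local_max", -1, i
        else:  # seeking a bottom
            if prices[i] <= prices[cand]:
                cand = i
            elif (prices[i] - prices[cand]) / prices[cand] >= threshold:
                labels[cand], direction, cand = "local_min", 1, i
    return labels


labels = zigzag([100, 95, 110, 100, 120], threshold=0.05)
# -> ["local_max", "local_min", "local_max", "local_min", None]
```

Note that the final bar stays unlabeled: it is the current candidate extreme, and a pivot is only confirmed once a sufficient reversal follows it.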
Implementation

Uses a sequential state-machine algorithm. This is inherently not parallelizable, so NumPy/Python loops are used; Polars is used only for the rolling-volatility computation in z-score mode.

Attributes:

Name Type Description
price_col str

Price column. Default: "close".

min_swing_pct float

Fixed minimum reversal percentage. Default: 0.02. Ignored when min_swing_zscore is set.

min_swing_zscore float | None

Z-score multiplier for adaptive threshold. Default: None. When set, overrides min_swing_pct.

vol_window int

Rolling window for volatility computation. Default: 500.

Example
# Fixed percentage
labeler = ZigzagStructureLabeler(min_swing_pct=0.03, mask_to_signals=False)

# Adaptive threshold (z-score x rolling volatility)
labeler = ZigzagStructureLabeler(
    min_swing_zscore=2.0,
    vol_window=500,
    mask_to_signals=False,
)
result = labeler.compute(ohlcv_df)

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('swing_pct',)

min_swing_pct class-attribute instance-attribute

min_swing_pct: float = 0.02

min_swing_zscore class-attribute instance-attribute

min_swing_zscore: float | None = None

price_col class-attribute instance-attribute

price_col: str = 'close'

signal_category class-attribute instance-attribute

signal_category: SignalCategory = PRICE_STRUCTURE

vol_window class-attribute instance-attribute

vol_window: int = 500

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/structure_labeler.py
def __post_init__(self) -> None:
    if self.min_swing_zscore is not None:
        if self.min_swing_zscore <= 0:
            raise ValueError("min_swing_zscore must be > 0")
        if self.vol_window < 20:
            raise ValueError("vol_window must be >= 20 for z-score mode")
    elif self.min_swing_pct <= 0:
        raise ValueError("min_swing_pct must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

_adaptive_thresholds

_adaptive_thresholds(df: DataFrame, prices: ndarray) -> np.ndarray

Compute per-bar thresholds: zscore x rolling_vol x sqrt(vol_window).

Uses Polars for rolling std computation.

Source code in src/signalflow/target/structure_labeler.py
def _adaptive_thresholds(self, df: pl.DataFrame, prices: np.ndarray) -> np.ndarray:
    """Compute per-bar thresholds: zscore x rolling_vol x sqrt(vol_window).

    Uses Polars for rolling std computation.
    """
    n = len(prices)
    if n < 2:
        return np.full(n, np.inf)

    # Compute log returns using Polars
    price_col = pl.col(self.price_col)
    log_ret = (price_col / price_col.shift(1)).log()

    # Rolling std of returns
    rolling_vol = log_ret.rolling_std(window_size=self.vol_window, min_samples=20)

    # Compute thresholds
    vol_arr = df.select(rolling_vol.alias("vol"))["vol"].to_numpy()

    # threshold = zscore x vol x sqrt(vol_window)
    thresholds = self.min_swing_zscore * vol_arr * np.sqrt(self.vol_window)

    # Before we have enough data, use infinity (don't create pivots)
    thresholds = np.where(np.isnan(thresholds), np.inf, thresholds)

    return thresholds

_zigzag

_zigzag(prices: ndarray, thresholds: ndarray) -> tuple[list[str | None], np.ndarray]

Run zigzag algorithm with per-bar adaptive thresholds.

This is a state-machine algorithm that must process bars sequentially to maintain alternating top/bottom structure.

Returns:

Type Description
tuple[list[str | None], ndarray]

(labels, swing_pcts) — parallel arrays of length n.

Source code in src/signalflow/target/structure_labeler.py
def _zigzag(self, prices: np.ndarray, thresholds: np.ndarray) -> tuple[list[str | None], np.ndarray]:
    """Run zigzag algorithm with per-bar adaptive thresholds.

    This is a state-machine algorithm that must process bars sequentially
    to maintain alternating top/bottom structure.

    Returns:
        (labels, swing_pcts) — parallel arrays of length n.
    """
    n = len(prices)
    labels: list[str | None] = [None] * n
    swing_pcts = np.full(n, np.nan, dtype=np.float64)

    if n < 2:
        return labels, swing_pcts

    # Phase 1: Find initial direction ─────────────────────────────
    high_idx = 0
    low_idx = 0
    direction = 0  # 0=unknown, 1=going up (seeking top), -1=going down (seeking bottom)
    init_end = 0

    for i in range(1, n):
        if prices[i] > prices[high_idx]:
            high_idx = i
        if prices[i] < prices[low_idx]:
            low_idx = i

        if high_idx != low_idx and prices[low_idx] > 0:
            swing = (prices[high_idx] - prices[low_idx]) / prices[low_idx]
            if swing >= thresholds[i]:
                if high_idx > low_idx:
                    # Went down first then up → bottom confirmed
                    labels[low_idx] = "local_min"
                    swing_pcts[low_idx] = swing
                    direction = 1  # now going up, seeking top
                else:
                    # Went up first then down → top confirmed
                    labels[high_idx] = "local_max"
                    swing_pcts[high_idx] = swing
                    direction = -1  # now going down, seeking bottom
                init_end = i
                break

    if direction == 0:
        return labels, swing_pcts  # No significant swing in entire series

    # Phase 2: Main zigzag loop ───────────────────────────────────
    if direction == 1:
        candidate_idx = high_idx
        candidate_price = prices[high_idx]
    else:
        candidate_idx = low_idx
        candidate_price = prices[low_idx]

    for i in range(init_end + 1, n):
        threshold = thresholds[i]

        if direction == 1:  # Going up → seeking top
            if prices[i] >= candidate_price:
                # New high → update candidate top
                candidate_idx = i
                candidate_price = prices[i]
            elif candidate_price > 0:
                reversal = (candidate_price - prices[i]) / candidate_price
                if reversal >= threshold:
                    # Confirm top
                    labels[candidate_idx] = "local_max"
                    swing_pcts[candidate_idx] = reversal
                    direction = -1
                    candidate_idx = i
                    candidate_price = prices[i]

        else:  # direction == -1: Going down → seeking bottom
            if prices[i] <= candidate_price:
                candidate_idx = i
                candidate_price = prices[i]
            elif candidate_price > 0:
                reversal = (prices[i] - candidate_price) / candidate_price
                if reversal >= threshold:
                    # Confirm bottom
                    labels[candidate_idx] = "local_min"
                    swing_pcts[candidate_idx] = reversal
                    direction = 1
                    candidate_idx = i
                    candidate_price = prices[i]

    return labels, swing_pcts

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute zigzag structure labels for a single pair.

Source code in src/signalflow/target/structure_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute zigzag structure labels for a single pair."""
    if group_df.height == 0:
        return group_df

    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    prices = group_df[self.price_col].to_numpy().astype(np.float64)

    # Compute per-bar thresholds using Polars for rolling vol
    if self.min_swing_zscore is not None:
        thresholds = self._adaptive_thresholds(group_df, prices)
    else:
        thresholds = np.full(len(prices), self.min_swing_pct)

    # Run sequential zigzag algorithm
    labels, swing_pcts = self._zigzag(prices, thresholds)

    df = group_df.with_columns(pl.Series(name=self.out_col, values=labels, dtype=pl.Utf8))

    if self.include_meta:
        df = df.with_columns(
            pl.Series(
                name="swing_pct",
                values=swing_pcts.tolist(),
                dtype=pl.Float64,
            )
        )

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Volume Regime Labeler

signalflow.target.volume_labeler.VolumeRegimeLabeler dataclass

VolumeRegimeLabeler(raw_data_type: RawDataType | str = RawDataType.SPOT, signal_category: SignalCategory = SignalCategory.VOLUME_LIQUIDITY, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('volume_ratio',), volume_col: str = 'volume', horizon: int = 60, vol_sma_window: int = 1440, spike_threshold: float = 2.0, drought_threshold: float = 0.3)

Bases: Labeler

Label bars by forward volume regime.

Detects volume spikes and droughts by comparing forward average volume to a trailing volume SMA.

Algorithm
  1. Compute trailing volume SMA: rolling_mean(volume, vol_sma_window).
  2. Compute forward volume ratio: mean(volume[t+1 : t+horizon+1]) / trailing_sma[t].
  3. If ratio > spike_threshold -> "abnormal_volume".
  4. If ratio < drought_threshold -> "illiquidity".
  5. Otherwise -> null.
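To pin down the windowing in step 2, here is a small definition-based check in plain Python (the labeler itself computes the same quantity with Polars shift/rolling_mean expressions, as shown in the source below):

```python
# For each bar t, forward_avg[t] = mean(volume[t+1 : t+horizon+1]).
# Shorter windows near the end of the series still count (mirroring
# min_samples=1), and the last bar has no forward data at all.
volume = [10.0, 20.0, 30.0, 40.0, 50.0]
horizon = 2

forward_avg = []
for t in range(len(volume)):
    window = volume[t + 1 : t + 1 + horizon]
    forward_avg.append(sum(window) / len(window) if window else None)
# -> [25.0, 35.0, 45.0, 50.0, None]
```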
Implementation

Uses pure Polars expressions instead of numpy loops for better performance and memory efficiency.

Attributes:

Name Type Description
volume_col str

Volume column. Default: "volume".

horizon int

Number of forward bars. Default: 60.

vol_sma_window int

Trailing SMA window. Default: 1440.

spike_threshold float

Threshold for volume spike. Default: 2.0.

drought_threshold float

Threshold for volume drought. Default: 0.3.

Example
from signalflow.target.volume_labeler import VolumeRegimeLabeler

labeler = VolumeRegimeLabeler(
    horizon=60,
    spike_threshold=2.0,
    drought_threshold=0.3,
    mask_to_signals=False,
)
result = labeler.compute(ohlcv_df)

drought_threshold class-attribute instance-attribute

drought_threshold: float = 0.3

horizon class-attribute instance-attribute

horizon: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('volume_ratio',)

signal_category class-attribute instance-attribute

signal_category: SignalCategory = VOLUME_LIQUIDITY

spike_threshold class-attribute instance-attribute

spike_threshold: float = 2.0

vol_sma_window class-attribute instance-attribute

vol_sma_window: int = 1440

volume_col class-attribute instance-attribute

volume_col: str = 'volume'

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/volume_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")
    if self.vol_sma_window <= 0:
        raise ValueError("vol_sma_window must be > 0")
    if self.drought_threshold >= self.spike_threshold:
        raise ValueError(
            f"drought_threshold ({self.drought_threshold}) must be < spike_threshold ({self.spike_threshold})"
        )

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute volume regime labels for a single pair.

Parameters:

Name Type Description Default
group_df DataFrame

Single pair data sorted by timestamp.

required
data_context dict[str, Any] | None

Optional additional context.

None

Returns:

Type Description
DataFrame

DataFrame with same row count, plus label and optional meta columns.

Source code in src/signalflow/target/volume_labeler.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute volume regime labels for a single pair.

    Args:
        group_df: Single pair data sorted by timestamp.
        data_context: Optional additional context.

    Returns:
        DataFrame with same row count, plus label and optional meta columns.
    """
    if group_df.height == 0:
        return group_df

    if self.volume_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.volume_col}'")

    vol = pl.col(self.volume_col)

    # Step 1: Trailing volume SMA using Polars rolling_mean
    # min_samples=1 allows SMA from the first bar (like original behavior)
    df = group_df.with_columns(
        vol.rolling_mean(window_size=self.vol_sma_window, min_samples=1).alias("_trailing_sma")
    )

    # Step 2: Forward average volume
    # To compute mean(volume[t+1 : t+horizon+1]), we:
    # - Shift volume by -1 so that position t holds volume[t+1]
    # - Apply rolling_mean with window=horizon
    # - The result at position t+horizon-1 equals mean(volume[t+1 : t+horizon+1])
    # - Shift back by -(horizon-1) to align that mean with position t
    df = df.with_columns(
        vol.shift(-1)
        .rolling_mean(window_size=self.horizon, min_samples=1)
        .shift(-(self.horizon - 1))
        .alias("_forward_avg")
    )

    # Step 3: Volume ratio = forward_avg / trailing_sma
    df = df.with_columns(
        pl.when(pl.col("_trailing_sma") > 0)
        .then(pl.col("_forward_avg") / pl.col("_trailing_sma"))
        .otherwise(pl.lit(None))
        .alias("_volume_ratio")
    )

    # Step 4-5: Assign labels based on thresholds
    label_expr = (
        pl.when(pl.col("_volume_ratio").is_null())
        .then(pl.lit(None, dtype=pl.Utf8))
        .when(pl.col("_volume_ratio") > self.spike_threshold)
        .then(pl.lit("abnormal_volume"))
        .when(pl.col("_volume_ratio") < self.drought_threshold)
        .then(pl.lit("illiquidity"))
        .otherwise(pl.lit(None, dtype=pl.Utf8))
        .alias(self.out_col)
    )

    df = df.with_columns(label_expr)

    if self.include_meta:
        df = df.with_columns(pl.col("_volume_ratio").alias("volume_ratio"))

    # Clean up temporary columns
    df = df.drop(["_trailing_sma", "_forward_avg", "_volume_ratio"])

    if self.mask_to_signals and data_context is not None and "signal_keys" in data_context:
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Multi-Target Generation

signalflow.target.multi_target_generator.MultiTargetGenerator dataclass

MultiTargetGenerator(horizons: list[HorizonConfig] = (lambda: list(DEFAULT_HORIZONS))(), target_types: list[TargetType] = (lambda: list(DEFAULT_TARGET_TYPES))(), volume_window: int = 60, volume_quantiles: tuple[float, float] = (0.33, 0.67), crash_quantiles: tuple[float, float] = (0.1, 0.9), pair_col: str = 'pair', ts_col: str = 'timestamp', price_col: str = 'close')

Generates multiple targets at multiple horizons from OHLCV data.

For each (horizon, target_type) combination, adds a column to the DataFrame. Column naming convention: target_{target_name}_{horizon_name}.

Direction targets use the existing Labeler infrastructure. Return magnitude is |log(close[t+h] / close[t])|. Volume regime discretizes volume / sma(volume) into HIGH/MED/LOW. Crash regime classifies forward return into crash/rally/normal.
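For instance, with hypothetical horizon names "short" and "long" and the direction and return-magnitude target types, the naming convention yields:

```python
# Hypothetical horizon and target names, used only to illustrate the
# target_{target_name}_{horizon_name} column naming convention.
horizon_names = ["short", "long"]
target_names = ["direction", "return_magnitude"]

columns = [f"target_{t}_{h}" for t in target_names for h in horizon_names]
# -> ["target_direction_short", "target_direction_long",
#     "target_return_magnitude_short", "target_return_magnitude_long"]
```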

Attributes:

Name Type Description
horizons list[HorizonConfig]

List of HorizonConfig.

target_types list[TargetType]

List of TargetType to generate.

volume_window int

Rolling window for volume SMA baseline.

volume_quantiles tuple[float, float]

(low, high) thresholds for volume regime.

crash_quantiles tuple[float, float]

(crash, rally) quantile thresholds for crash regime.

pair_col str

Trading pair column name.

ts_col str

Timestamp column name.

price_col str

Price column name.

crash_quantiles class-attribute instance-attribute

crash_quantiles: tuple[float, float] = (0.1, 0.9)

horizons class-attribute instance-attribute

horizons: list[HorizonConfig] = field(default_factory=lambda: list(DEFAULT_HORIZONS))

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

price_col class-attribute instance-attribute

price_col: str = 'close'

target_types class-attribute instance-attribute

target_types: list[TargetType] = field(default_factory=lambda: list(DEFAULT_TARGET_TYPES))

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

volume_quantiles class-attribute instance-attribute

volume_quantiles: tuple[float, float] = (0.33, 0.67)

volume_window class-attribute instance-attribute

volume_window: int = 60

_crash_regime_expr

_crash_regime_expr(horizon: int) -> pl.Expr

Polars expression for crash/rally regime classification.

Computes forward log-return over horizon bars, then discretizes into crash/rally/normal based on quantile thresholds.
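A toy version of this classification in plain Python. The returns are made up, and the helper uses linear-interpolation quantiles, which may differ slightly from Polars' default interpolation:

```python
def quantile(xs: list[float], q: float) -> float:
    """Linear-interpolation quantile (illustrative only)."""
    s = sorted(xs)
    pos = q * (len(s) - 1)
    lo, frac = int(pos), pos - int(pos)
    hi = min(lo + 1, len(s) - 1)
    return s[lo] * (1 - frac) + s[hi] * frac


# Made-up forward log-returns for ten bars.
returns = [-0.10, -0.02, 0.00, 0.01, 0.03, -0.01, 0.02, 0.12, -0.03, 0.00]
crash_thr, rally_thr = quantile(returns, 0.1), quantile(returns, 0.9)

labels = [
    "crash" if r <= crash_thr else "rally" if r >= rally_thr else "normal"
    for r in returns
]
# Only the extreme tails are labeled; everything else is "normal".
```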

Source code in src/signalflow/target/multi_target_generator.py
def _crash_regime_expr(self, horizon: int) -> pl.Expr:
    """Polars expression for crash/rally regime classification.

    Computes forward log-return over ``horizon`` bars, then discretizes
    into crash/rally/normal based on quantile thresholds.
    """
    price = pl.col(self.price_col)
    forward_return = (price.shift(-horizon) / price).log()

    crash_q, rally_q = self.crash_quantiles
    return (
        pl.when(forward_return.is_null())
        .then(pl.lit(None, dtype=pl.Utf8))
        .when(forward_return <= forward_return.quantile(crash_q))
        .then(pl.lit("crash"))
        .when(forward_return >= forward_return.quantile(rally_q))
        .then(pl.lit("rally"))
        .otherwise(pl.lit("normal"))
    )

_create_labeler

_create_labeler(h: HorizonConfig) -> Labeler

Instantiate a labeler for the given horizon.

Source code in src/signalflow/target/multi_target_generator.py
def _create_labeler(self, h: HorizonConfig) -> Labeler:
    """Instantiate a labeler for the given horizon."""
    kwargs: dict[str, Any] = {
        "price_col": self.price_col,
        "pair_col": self.pair_col,
        "ts_col": self.ts_col,
        "keep_input_columns": True,
        "mask_to_signals": False,
        "include_meta": False,
    }

    kwargs["horizon"] = h.horizon

    kwargs.update(h.labeler_kwargs)
    return h.labeler_cls(**kwargs)

_generate_crash_regime

_generate_crash_regime(df: DataFrame) -> pl.DataFrame
Source code in src/signalflow/target/multi_target_generator.py
def _generate_crash_regime(self, df: pl.DataFrame) -> pl.DataFrame:
    for h in self.horizons:
        col_name = f"target_crash_regime_{h.name}"

        df = df.with_columns(
            df.group_by(self.pair_col, maintain_order=True)
            .agg(self._crash_regime_expr(h.horizon).alias(col_name))
            .sort(self.pair_col)
            .get_column(col_name)
            .explode()
            .alias(col_name)
        )

        logger.debug(f"Generated crash regime target: {col_name}")

    return df

_generate_direction

_generate_direction(df: DataFrame) -> pl.DataFrame
Source code in src/signalflow/target/multi_target_generator.py
def _generate_direction(self, df: pl.DataFrame) -> pl.DataFrame:
    for h in self.horizons:
        col_name = f"target_direction_{h.name}"
        labeler = self._create_labeler(h)
        labeled = labeler.compute(df)

        label_series = labeled.get_column(labeler.out_col).alias(col_name)
        df = df.with_columns(label_series)

        logger.debug(f"Generated direction target: {col_name}")

    return df

_generate_return_magnitude

_generate_return_magnitude(df: DataFrame) -> pl.DataFrame
Source code in src/signalflow/target/multi_target_generator.py
def _generate_return_magnitude(self, df: pl.DataFrame) -> pl.DataFrame:
    for h in self.horizons:
        col_name = f"target_return_magnitude_{h.name}"

        df = df.with_columns(
            df.group_by(self.pair_col, maintain_order=True)
            .agg((pl.col(self.price_col).shift(-h.horizon) / pl.col(self.price_col)).log().abs().alias(col_name))
            .sort(self.pair_col)
            .get_column(col_name)
            .explode()
            .alias(col_name)
        )

        logger.debug(f"Generated return magnitude target: {col_name}")

    return df

_generate_volume_regime

_generate_volume_regime(df: DataFrame) -> pl.DataFrame
Source code in src/signalflow/target/multi_target_generator.py
def _generate_volume_regime(self, df: pl.DataFrame) -> pl.DataFrame:
    for h in self.horizons:
        col_name = f"target_volume_regime_{h.name}"

        df = df.with_columns(
            df.group_by(self.pair_col, maintain_order=True)
            .agg(self._volume_regime_expr(h.horizon).alias(col_name))
            .sort(self.pair_col)
            .get_column(col_name)
            .explode()
            .alias(col_name)
        )

        logger.debug(f"Generated volume regime target: {col_name}")

    return df

_validate

_validate(df: DataFrame) -> None
Source code in src/signalflow/target/multi_target_generator.py
def _validate(self, df: pl.DataFrame) -> None:
    required = {self.pair_col, self.ts_col, self.price_col}
    target_type_names = {t.name for t in self.target_types}
    if "volume_regime" in target_type_names:
        required.add("volume")

    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    if not self.horizons:
        raise ValueError("At least one horizon is required")
    if not self.target_types:
        raise ValueError("At least one target type is required")

_volume_regime_expr

_volume_regime_expr(horizon: int) -> pl.Expr

Polars expression for volume regime classification.

Computes the volume-to-trailing-SMA ratio at t + horizon, then discretizes it into HIGH/MED/LOW based on quantile thresholds.

Source code in src/signalflow/target/multi_target_generator.py
def _volume_regime_expr(self, horizon: int) -> pl.Expr:
    """Polars expression for volume regime classification.

    Computes the volume-to-trailing-SMA ratio at ``t + horizon``, then
    discretizes it into HIGH/MED/LOW based on quantile thresholds.
    """
    vol = pl.col("volume")
    vol_sma = vol.rolling_mean(window_size=self.volume_window)
    vol_ratio = vol / vol_sma

    # Look ahead: take the volume ratio at t + horizon
    forward_vol_ratio = vol_ratio.shift(-horizon)

    low_q, high_q = self.volume_quantiles
    return (
        pl.when(forward_vol_ratio.is_null())
        .then(pl.lit(None, dtype=pl.Utf8))
        .when(forward_vol_ratio >= forward_vol_ratio.quantile(high_q))
        .then(pl.lit("HIGH"))
        .when(forward_vol_ratio <= forward_vol_ratio.quantile(low_q))
        .then(pl.lit("LOW"))
        .otherwise(pl.lit("MED"))
    )

generate

generate(df: DataFrame) -> pl.DataFrame

Generate all target columns.

Parameters:

Name Type Description Default
df DataFrame

OHLCV DataFrame with pair, timestamp, open, high, low, close, volume.

required

Returns:

Type Description
DataFrame

Original DataFrame with added target_* columns.

Source code in src/signalflow/target/multi_target_generator.py
def generate(self, df: pl.DataFrame) -> pl.DataFrame:
    """Generate all target columns.

    Args:
        df: OHLCV DataFrame with pair, timestamp, open, high, low, close, volume.

    Returns:
        Original DataFrame with added ``target_*`` columns.
    """
    self._validate(df)

    target_type_map = {t.name: t for t in self.target_types}

    if "direction" in target_type_map:
        df = self._generate_direction(df)

    if "return_magnitude" in target_type_map:
        df = self._generate_return_magnitude(df)

    if "volume_regime" in target_type_map:
        df = self._generate_volume_regime(df)

    if "crash_regime" in target_type_map:
        df = self._generate_crash_regime(df)

    return df

target_columns

target_columns() -> list[dict[str, str]]

Return metadata for all generated target columns.

Returns:

Type Description
list[dict[str, str]]

List of dicts with keys: column, horizon, target_type, kind.

Source code in src/signalflow/target/multi_target_generator.py
def target_columns(self) -> list[dict[str, str]]:
    """Return metadata for all generated target columns.

    Returns:
        List of dicts with keys: column, horizon, target_type, kind.
    """
    result = []
    for h in self.horizons:
        for t in self.target_types:
            result.append(
                {
                    "column": f"target_{t.name}_{h.name}",
                    "horizon": h.name,
                    "target_type": t.name,
                    "kind": t.kind,
                }
            )
    return result

signalflow.target.multi_target_generator.HorizonConfig dataclass

HorizonConfig(name: str, horizon: int, labeler_cls: type[Labeler] = TripleBarrierLabeler, labeler_kwargs: dict[str, Any] = dict())

Configuration for a single prediction horizon.

Attributes:

Name Type Description
name str

Human-readable name (e.g., "short", "mid", "long").

horizon int

Number of bars for the horizon.

labeler_cls type[Labeler]

Labeler class to use for direction targets.

labeler_kwargs dict[str, Any]

Extra kwargs passed to the labeler constructor.

signalflow.target.multi_target_generator.TargetType dataclass

TargetType(name: str, kind: str)

Defines a target type derived from OHLCV data.

Attributes:

Name Type Description
name str

Target name (e.g., "direction", "return_magnitude", "volume_regime").

kind str

"discrete" or "continuous" — determines MI computation method.

Utility Functions

mask_targets_by_signals

signalflow.target.utils.mask_targets_by_signals

mask_targets_by_signals(df: DataFrame, signals: Signals, mask_signal_types: set[str], horizon_bars: int, cooldown_bars: int = 60, target_columns: list[str] | None = None, pair_col: str = 'pair', ts_col: str = 'timestamp') -> pl.DataFrame

Mask target columns for timestamps overlapping with specified signals.

For each signal at time T with type in mask_signal_types, masks the range [T - horizon_bars, T + cooldown_bars].

This is useful for excluding labels that overlap with exogenous events (e.g., flash crashes, global market events) from training data, since no feature could predict such events.
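The masked window per event can be sketched with simple index arithmetic, matching the clamped range used in the implementation below:

```python
def event_mask(n_bars: int, event_idx: int, horizon_bars: int, cooldown_bars: int) -> list[bool]:
    """True for bars whose target labels are nulled around a single event."""
    mask = [False] * n_bars
    start = max(0, event_idx - horizon_bars)          # bars whose forward labels "see" the event
    end = min(n_bars, event_idx + cooldown_bars + 1)  # cooldown, inclusive of idx + cooldown
    for i in range(start, end):
        mask[i] = True
    return mask


mask = event_mask(n_bars=10, event_idx=5, horizon_bars=2, cooldown_bars=1)
# Bars 3..6 are masked: [F, F, F, T, T, T, T, F, F, F]
```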

Parameters:

Name Type Description Default
df DataFrame

DataFrame with target columns.

required
signals Signals

Signals object containing detected events.

required
mask_signal_types set[str]

Signal types to mask (e.g. {"flash_crash", "global_event"}).

required
horizon_bars int

Forward horizon (number of bars before the signal whose forward-looking labels can "see" it).

required
cooldown_bars int

Bars after signal to mask (default: 60).

60
target_columns list[str] | None

Columns to mask (default: all columns ending with "_label").

None
pair_col str

Pair column name (default: "pair").

'pair'
ts_col str

Timestamp column name (default: "timestamp").

'timestamp'

Returns:

Type Description
DataFrame

DataFrame with affected target columns set to null.

Example
from signalflow.detector import ZScoreAnomalyDetector
from signalflow.target.utils import mask_targets_by_signals

# Detect anomalies
detector = ZScoreAnomalyDetector(threshold=4.0)
signals = detector.run(raw_data_view)

# Mask labels overlapping with flash crashes
labeled_df = mask_targets_by_signals(
    df=labeled_df,
    signals=signals,
    mask_signal_types={"anomaly_low"},  # flash crashes
    horizon_bars=60,
    cooldown_bars=60,
)
Note
  • Masking is done per-pair: signal at time T for pair A only masks labels for pair A at affected timestamps.
  • If signals DataFrame is empty or has no matching signal_types, the input DataFrame is returned unchanged.
Source code in src/signalflow/target/utils.py
def mask_targets_by_signals(
    df: pl.DataFrame,
    signals: Signals,
    mask_signal_types: set[str],
    horizon_bars: int,
    cooldown_bars: int = 60,
    target_columns: list[str] | None = None,
    pair_col: str = "pair",
    ts_col: str = "timestamp",
) -> pl.DataFrame:
    """Mask target columns for timestamps overlapping with specified signals.

    For each signal at time T with type in mask_signal_types:
    - Masks range [T - horizon_bars, T + cooldown_bars]

    This is useful for excluding labels that overlap with exogenous events
    (e.g., flash crashes, global market events) from training data, since
    no feature could predict such events.

    Args:
        df: DataFrame with target columns.
        signals: Signals object containing detected events.
        mask_signal_types: Signal types to mask (e.g. {"flash_crash", "global_event"}).
        horizon_bars: Forward horizon (number of bars before the signal whose
            forward-looking labels can "see" it).
        cooldown_bars: Bars after signal to mask (default: 60).
        target_columns: Columns to mask (default: all columns ending with "_label").
        pair_col: Pair column name (default: "pair").
        ts_col: Timestamp column name (default: "timestamp").

    Returns:
        DataFrame with affected target columns set to null.

    Example:
        ```python
        from signalflow.detector import ZScoreAnomalyDetector
        from signalflow.target.utils import mask_targets_by_signals

        # Detect anomalies
        detector = ZScoreAnomalyDetector(threshold=4.0)
        signals = detector.run(raw_data_view)

        # Mask labels overlapping with flash crashes
        labeled_df = mask_targets_by_signals(
            df=labeled_df,
            signals=signals,
            mask_signal_types={"anomaly_low"},  # flash crashes
            horizon_bars=60,
            cooldown_bars=60,
        )
        ```

    Note:
        - Masking is done per-pair: signal at time T for pair A only masks
          labels for pair A at affected timestamps.
        - If signals DataFrame is empty or has no matching signal_types,
          the input DataFrame is returned unchanged.
    """
    # Get events matching the specified signal types
    signals_df = signals.value

    if signals_df.height == 0:
        return df

    if "signal_type" not in signals_df.columns:
        logger.warning("Signals DataFrame has no 'signal_type' column, returning unchanged")
        return df

    # Filter to matching signal types
    events = signals_df.filter(pl.col("signal_type").is_in(list(mask_signal_types)))

    if events.height == 0:
        logger.debug(f"No signals matching types {mask_signal_types}, returning unchanged")
        return df

    # Determine target columns to mask
    if target_columns is None:
        target_columns = [c for c in df.columns if c.endswith("_label")]

    if not target_columns:
        logger.warning("No target columns found to mask")
        return df

    existing_cols = [c for c in target_columns if c in df.columns]
    if not existing_cols:
        logger.warning(f"Target columns {target_columns} not found in DataFrame")
        return df

    # Process per pair for correct masking
    masked_rows: list[pl.DataFrame] = []

    for pair_name in df.get_column(pair_col).unique().to_list():
        pair_df = df.filter(pl.col(pair_col) == pair_name)
        pair_events = events.filter(pl.col(pair_col) == pair_name)

        if pair_events.height == 0:
            masked_rows.append(pair_df)
            continue

        ts_array = pair_df.get_column(ts_col).to_numpy()
        event_ts = pair_events.get_column(ts_col).to_numpy()

        mask = np.zeros(len(ts_array), dtype=bool)

        for evt in event_ts:
            idx = np.searchsorted(ts_array, evt)
            if idx >= len(ts_array):
                continue

            start = max(0, idx - horizon_bars)
            end = min(len(ts_array), idx + cooldown_bars + 1)
            mask[start:end] = True

        n_masked = int(mask.sum())
        if n_masked > 0:
            logger.debug(f"Masking {n_masked} timestamps for pair {pair_name}")

            # Create masked DataFrame
            mask_series = pl.Series("_mask", mask)
            pair_df = (
                pair_df.with_columns(mask_series)
                .with_columns(
                    [
                        pl.when(pl.col("_mask")).then(pl.lit(None)).otherwise(pl.col(col)).alias(col)
                        for col in existing_cols
                    ]
                )
                .drop("_mask")
            )

        masked_rows.append(pair_df)

    if not masked_rows:
        return df

    result = pl.concat(masked_rows, how="vertical_relaxed")

    total_events = events.height
    logger.info(
        f"mask_targets_by_signals: masked around {total_events} events "
        f"(types={mask_signal_types}, horizon={horizon_bars}, cooldown={cooldown_bars})"
    )

    return result
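The per-event window arithmetic above can be checked in isolation: an event at array index `idx` masks the `horizon_bars` bars before it (whose forward-looking labels would "see" the event) plus `cooldown_bars` bars after it, clipped to the array bounds. A minimal sketch with illustrative integer timestamps (note that `np.searchsorted` assumes `ts_array` is sorted):

```python
import numpy as np

# Illustrative bars and events; the second event falls past the last bar.
ts_array = np.arange(20)
event_ts = [7, 25]
horizon_bars, cooldown_bars = 3, 2

mask = np.zeros(len(ts_array), dtype=bool)
for evt in event_ts:
    idx = np.searchsorted(ts_array, evt)  # requires sorted ts_array
    if idx >= len(ts_array):              # event after the last bar: nothing to mask
        continue
    start = max(0, idx - horizon_bars)
    end = min(len(ts_array), idx + cooldown_bars + 1)
    mask[start:end] = True

print(np.flatnonzero(mask))  # → [4 5 6 7 8 9]
```

The event at timestamp 7 masks bars 4 through 9 (three bars of horizon before, two of cooldown after, plus the event bar itself); the event at 25 is silently skipped because it lies beyond the data.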

mask_targets_by_timestamps

signalflow.target.utils.mask_targets_by_timestamps

mask_targets_by_timestamps(df: DataFrame, event_timestamps: list, horizon_bars: int, cooldown_bars: int = 60, target_columns: list[str] | None = None, ts_col: str = 'timestamp') -> pl.DataFrame

Mask target columns for timestamps overlapping with event timestamps.

Simpler version of mask_targets_by_signals that works with raw timestamps instead of Signals objects. Applies masking globally (not per-pair).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `df` | `DataFrame` | DataFrame with target columns. | *required* |
| `event_timestamps` | `list` | List of event timestamps to mask around. | *required* |
| `horizon_bars` | `int` | Forward horizon (bars before event that "see" it). | *required* |
| `cooldown_bars` | `int` | Bars after event to mask. | `60` |
| `target_columns` | `list[str] \| None` | Columns to mask (default: all columns ending with `"_label"`). | `None` |
| `ts_col` | `str` | Timestamp column name. | `'timestamp'` |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | DataFrame with affected target columns set to null. |

Example

```python
from signalflow.target.utils import mask_targets_by_timestamps
from datetime import datetime

# Mask around known events
labeled_df = mask_targets_by_timestamps(
    df=labeled_df,
    event_timestamps=[
        datetime(2024, 3, 1, 10, 30),  # Known flash crash
        datetime(2024, 5, 15, 14, 0),  # Fed announcement
    ],
    horizon_bars=60,
    cooldown_bars=120,
)
```
Source code in src/signalflow/target/utils.py
def mask_targets_by_timestamps(
    df: pl.DataFrame,
    event_timestamps: list,
    horizon_bars: int,
    cooldown_bars: int = 60,
    target_columns: list[str] | None = None,
    ts_col: str = "timestamp",
) -> pl.DataFrame:
    """Mask target columns for timestamps overlapping with event timestamps.

    Simpler version of mask_targets_by_signals that works with raw timestamps
    instead of Signals objects. Applies masking globally (not per-pair).

    Args:
        df: DataFrame with target columns.
        event_timestamps: List of event timestamps to mask around.
        horizon_bars: Forward horizon (bars before event that "see" it).
        cooldown_bars: Bars after event to mask (default: 60).
        target_columns: Columns to mask (default: all columns ending with "_label").
        ts_col: Timestamp column name (default: "timestamp").

    Returns:
        DataFrame with affected target columns set to null.

    Example:
        ```python
        from signalflow.target.utils import mask_targets_by_timestamps
        from datetime import datetime

        # Mask around known events
        labeled_df = mask_targets_by_timestamps(
            df=labeled_df,
            event_timestamps=[
                datetime(2024, 3, 1, 10, 30),  # Known flash crash
                datetime(2024, 5, 15, 14, 0),  # Fed announcement
            ],
            horizon_bars=60,
            cooldown_bars=120,
        )
        ```
    """
    if not event_timestamps:
        return df

    # Determine target columns to mask
    if target_columns is None:
        target_columns = [c for c in df.columns if c.endswith("_label")]

    if not target_columns:
        logger.warning("No target columns found to mask")
        return df

    existing_cols = [c for c in target_columns if c in df.columns]
    if not existing_cols:
        logger.warning(f"Target columns {target_columns} not found in DataFrame")
        return df

    ts_array = df.get_column(ts_col).to_numpy()
    mask = np.zeros(len(ts_array), dtype=bool)

    for evt in event_timestamps:
        idx = np.searchsorted(ts_array, evt)
        if idx >= len(ts_array):
            continue

        start = max(0, idx - horizon_bars)
        end = min(len(ts_array), idx + cooldown_bars + 1)
        mask[start:end] = True

    n_masked = int(mask.sum())
    if n_masked == 0:
        return df

    logger.info(f"mask_targets_by_timestamps: masked {n_masked} timestamps around {len(event_timestamps)} events")

    mask_series = pl.Series("_mask", mask)
    result = (
        df.with_columns(mask_series)
        .with_columns(
            [pl.when(pl.col("_mask")).then(pl.lit(None)).otherwise(pl.col(col)).alias(col) for col in existing_cols]
        )
        .drop("_mask")
    )

    return result
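The global (non-per-pair) behavior of `mask_targets_by_timestamps` can be mimicked with plain Python lists. `mask_labels` below is a hypothetical stand-in, not part of signalflow, shown only to make the null-masking semantics concrete:

```python
import bisect

def mask_labels(timestamps, labels, event_timestamps, horizon_bars, cooldown_bars=60):
    """Hypothetical pure-Python analogue of mask_targets_by_timestamps:
    sets labels to None within [idx - horizon_bars, idx + cooldown_bars]
    around each event. Assumes timestamps are sorted."""
    out = list(labels)
    n = len(timestamps)
    for evt in event_timestamps:
        idx = bisect.bisect_left(timestamps, evt)  # plays the role of np.searchsorted
        if idx >= n:
            continue
        for i in range(max(0, idx - horizon_bars), min(n, idx + cooldown_bars + 1)):
            out[i] = None
    return out

labels = mask_labels(list(range(10)), [1] * 10, [5], horizon_bars=2, cooldown_bars=1)
print(labels)  # → [1, 1, 1, None, None, None, None, 1, 1, 1]
```

The event at timestamp 5 nulls indices 3 through 6 (two bars of horizon, one of cooldown, plus the event bar), leaving every other label intact; events past the last timestamp leave the labels unchanged.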