
Target Module

Signal labeling strategies for machine learning training.

Module Name

The target functionality is implemented in the signalflow.target module.

Base Class

signalflow.target.base.Labeler dataclass

Labeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'))

Bases: ABC

Base class for Polars-only signal labeling.

Assigns forward-looking labels to historical data based on future price movement. Labels are computed per-pair with length-preserving operations.

Key concepts
  • Forward-looking: Labels depend on future data (not available in live trading)
  • Per-pair processing: Each pair labeled independently
  • Length-preserving: Output has same row count as input
  • Signal masking: Optionally label only at signal timestamps
Public API
  • compute(): Main entry point (handles grouping, filtering, projection)
  • compute_group(): Per-pair labeling logic (must implement)
Common labeling strategies
  • Fixed horizon: Label based on return over N bars
  • Triple barrier: Label based on first hit of profit/loss/time barrier
  • Quantile-based: Label based on return quantiles

Attributes:

  • component_type (ClassVar[SfComponentType]): Always LABELER, used by the component registry.
  • raw_data_type (RawDataType): Type of raw data. Default: SPOT.
  • pair_col (str): Trading pair column. Default: "pair".
  • ts_col (str): Timestamp column. Default: "timestamp".
  • keep_input_columns (bool): Return all columns produced by compute_group(), including the input columns. Default: False.
  • output_columns (list[str] | None): Label columns to output. If None, every column added by compute_group() is kept. Default: None.
  • filter_signal_type (SignalType | None): If set and signals are passed to compute(), only rows at signals of this type are kept (filtered before labeling). Default: None.
  • mask_to_signals (bool): Mask labels to signal timestamps only (requires "signal_keys" in data_context). Default: True.
  • out_col (str): Output label column name. Default: "label".
  • include_meta (bool): Include metadata columns. Default: False.
  • meta_columns (tuple[str, ...]): Metadata column names. Default: ("t_hit", "ret").

Example
from dataclasses import dataclass

import polars as pl

from signalflow.core import SignalType
from signalflow.target import Labeler


@dataclass
class ThresholdReturnLabeler(Labeler):
    '''Label based on fixed-horizon return against a symmetric threshold.'''

    # Labeler is a dataclass, so parameters are declared as fields
    horizon: int = 10
    threshold: float = 0.01

    def compute_group(self, group_df, data_context=None):
        # Forward return over `horizon` bars (requires a "close" column);
        # shift(-horizon) leaves the last `horizon` rows null
        ret = pl.col("close").shift(-self.horizon) / pl.col("close") - 1
        return group_df.with_columns(
            ret.alias("return"),
            pl.when(ret > self.threshold)
            .then(pl.lit(SignalType.RISE.value))
            .when(ret < -self.threshold)
            .then(pl.lit(SignalType.FALL.value))
            .otherwise(pl.lit(SignalType.NONE.value))  # also covers null returns
            .alias(self.out_col),
        )

# Usage
labeler = ThresholdReturnLabeler(horizon=10, threshold=0.01)
labeled = labeler.compute(ohlcv_df)
Note

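compute_group() must preserve the row count (no filtering). All timestamps must be timezone-naive. Signal masking requires mask_to_signals=True and a "signal_keys" DataFrame in data_context; the built-in labelers apply it inside compute_group() via _apply_signal_mask().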

See Also

FixedHorizonLabeler: Simple fixed-horizon implementation.
TripleBarrierLabeler: Three-barrier labeling strategy.

component_type class-attribute

component_type: SfComponentType = LABELER

filter_signal_type class-attribute instance-attribute

filter_signal_type: SignalType | None = None

include_meta class-attribute instance-attribute

include_meta: bool = False

keep_input_columns class-attribute instance-attribute

keep_input_columns: bool = False

mask_to_signals class-attribute instance-attribute

mask_to_signals: bool = True

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('t_hit', 'ret')

out_col class-attribute instance-attribute

out_col: str = 'label'

output_columns class-attribute instance-attribute

output_columns: list[str] | None = None

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType = SPOT

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

_apply_signal_mask

_apply_signal_mask(df: DataFrame, data_context: dict[str, Any], group_df: DataFrame) -> pl.DataFrame

Mask labels to signal timestamps only.

Labels are computed for all rows, but only signal timestamps get actual labels; others are set to SignalType.NONE.

Used for meta-labeling: only label at detected signal points, not every bar.
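The masking rule can be sketched in plain Python. This is a dependency-free illustration, not the library's Polars join: rows are dicts, the signal timestamps for one pair are a set, and the label strings are hypothetical stand-ins for the SignalType values.

```python
from typing import Any

NONE = "none"  # hypothetical stand-in for SignalType.NONE.value


def apply_signal_mask(
    rows: list[dict[str, Any]],
    signal_ts: set,
    ts_col: str = "timestamp",
    out_col: str = "label",
    include_meta: bool = False,
    meta_columns: tuple = ("t_hit", "ret"),
) -> list[dict[str, Any]]:
    """Keep labels only at one pair's signal timestamps; length is preserved."""
    masked = []
    for row in rows:
        new_row = dict(row)  # copy so the caller's rows are not mutated
        if row[ts_col] not in signal_ts:
            new_row[out_col] = NONE  # non-signal rows lose their label
            if include_meta:
                for col in meta_columns:
                    new_row[col] = None  # and their metadata
        masked.append(new_row)
    return masked


rows = [
    {"timestamp": 1, "label": "rise", "ret": 0.02},
    {"timestamp": 2, "label": "fall", "ret": -0.01},
    {"timestamp": 3, "label": "rise", "ret": 0.03},
]
masked = apply_signal_mask(rows, {2}, include_meta=True, meta_columns=("ret",))
```

An empty signal set turns every label into NONE, which corresponds to the signal_ts.height == 0 branch of the source.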

Parameters:

  • df (DataFrame): DataFrame with computed labels. Required.
  • data_context (dict[str, Any]): Must contain a "signal_keys" DataFrame. Required.
  • group_df (DataFrame): Original group data, used to extract the pair value. Required.

Returns:

  • DataFrame: DataFrame with masked labels.

Example
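# In compute_group with masking
def compute_group(self, group_df, data_context=None):
    # Compute labels for all rows (label_expr is a placeholder for your labeling expression)
    labeled = group_df.with_columns(label_expr.alias(self.out_col))

    # Mask to signal timestamps only; _apply_signal_mask reads data_context["signal_keys"]
    if self.mask_to_signals and data_context and "signal_keys" in data_context:
        labeled = self._apply_signal_mask(labeled, data_context, group_df)

    return labeled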
Note

Requires a "signal_keys" DataFrame in data_context with (pair, timestamp) columns. Non-signal rows get label=SignalType.NONE. If include_meta=True, metadata columns on non-signal rows are set to null.

Source code in src/signalflow/target/base.py
def _apply_signal_mask(
    self,
    df: pl.DataFrame,
    data_context: dict[str, Any],
    group_df: pl.DataFrame,
) -> pl.DataFrame:
    """Mask labels to signal timestamps only.

    Labels are computed for all rows, but only signal timestamps
    get actual labels; others are set to SignalType.NONE.

    Used for meta-labeling: only label at detected signal points,
    not every bar.

    Args:
        df (pl.DataFrame): DataFrame with computed labels.
        data_context (dict[str, Any]): Must contain "signal_keys" DataFrame.
        group_df (pl.DataFrame): Original group data for extracting pair value.

    Returns:
        pl.DataFrame: DataFrame with masked labels.

    Example:
        ```python
        # In compute_group with masking
        def compute_group(self, group_df, data_context=None):
            # Compute labels for all rows
            labeled = group_df.with_columns([...])

            # Mask to signal timestamps only
            if self.mask_to_signals and data_context:
                labeled = self._apply_signal_mask(
                    labeled, data_context, group_df
                )

            return labeled
        ```

    Note:
        Requires signal_keys in data_context with (pair, timestamp) columns.
        Non-signal rows get label=SignalType.NONE.
        Metadata columns also masked if include_meta=True.
    """
    signal_keys: pl.DataFrame = data_context["signal_keys"]
    pair_value = group_df.get_column(self.pair_col)[0]

    signal_ts = (
        signal_keys.filter(pl.col(self.pair_col) == pair_value)
        .select(self.ts_col)
        .unique()
    )

    if signal_ts.height == 0:
        df = df.with_columns(pl.lit(SignalType.NONE.value).alias(self.out_col))
        if self.include_meta:
            df = df.with_columns(
                [pl.lit(None).alias(col) for col in self.meta_columns]
            )
    else:
        is_signal = pl.col("_is_signal").fill_null(False)
        mask_exprs = [
            pl.when(is_signal)
            .then(pl.col(self.out_col))
            .otherwise(pl.lit(SignalType.NONE.value))
            .alias(self.out_col),
        ]
        if self.include_meta:
            mask_exprs += [
                pl.when(is_signal)
                .then(pl.col(col))
                .otherwise(pl.lit(None))
                .alias(col)
                for col in self.meta_columns
            ]

        df = (
            df.join(
                signal_ts.with_columns(pl.lit(True).alias("_is_signal")),
                on=self.ts_col,
                how="left",
            )
            .with_columns(mask_exprs)
            .drop("_is_signal")
        )

    return df

_compute_pl

_compute_pl(df: DataFrame, signals: Signals | None, data_context: dict[str, Any] | None) -> pl.DataFrame

Internal Polars-based computation.

Orchestrates validation, filtering, grouping, and projection.

Parameters:

  • df (DataFrame): Input data. Required.
  • signals (Signals | None): Optional signals. Required.
  • data_context (dict[str, Any] | None): Optional context. Required.

Returns:

  • DataFrame: Labeled data.

Source code in src/signalflow/target/base.py
def _compute_pl(
    self,
    df: pl.DataFrame,
    signals: Signals | None,
    data_context: dict[str, Any] | None,
) -> pl.DataFrame:
    """Internal Polars-based computation.

    Orchestrates validation, filtering, grouping, and projection.

    Args:
        df (pl.DataFrame): Input data.
        signals (Signals | None): Optional signals.
        data_context (dict[str, Any] | None): Optional context.

    Returns:
        pl.DataFrame: Labeled data.
    """
    self._validate_input_pl(df)
    df0 = df.sort([self.pair_col, self.ts_col])

    if signals is not None and self.filter_signal_type is not None:
        s_pl = self._signals_to_pl(signals)
        df0 = self._filter_by_signals_pl(df0, s_pl, self.filter_signal_type)

    input_cols = set(df0.columns)

    def _wrapped(g: pl.DataFrame) -> pl.DataFrame:
        out = self.compute_group(g, data_context=data_context)
        if not isinstance(out, pl.DataFrame):
            raise TypeError(f"{self.__class__.__name__}.compute_group must return pl.DataFrame")
        if out.height != g.height:
            raise ValueError(
                f"{self.__class__.__name__}: len(output_group)={out.height} != len(input_group)={g.height}"
            )
        return out

    out = (
        df0.group_by(self.pair_col, maintain_order=True)
        .map_groups(_wrapped)
        .sort([self.pair_col, self.ts_col])
    )

    if self.keep_input_columns:
        return out

    label_cols = (
        sorted(set(out.columns) - input_cols)
        if self.output_columns is None
        else list(self.output_columns)
    )

    keep_cols = [self.pair_col, self.ts_col] + label_cols
    missing = [c for c in keep_cols if c not in out.columns]
    if missing:
        raise ValueError(f"Projection error, missing columns: {missing}")

    return out.select(keep_cols)
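The projection step at the end of _compute_pl only decides which column names survive. A minimal sketch of that decision, working on lists of column names rather than DataFrames (select_output_columns is a hypothetical name):

```python
from typing import List, Optional


def select_output_columns(
    input_cols: List[str],
    result_cols: List[str],
    pair_col: str = "pair",
    ts_col: str = "timestamp",
    keep_input_columns: bool = False,
    output_columns: Optional[List[str]] = None,
) -> List[str]:
    """Mirror the column projection performed after compute_group()."""
    if keep_input_columns:
        return list(result_cols)  # everything compute_group produced
    # Default: every column compute_group added, in sorted order
    label_cols = (
        sorted(set(result_cols) - set(input_cols))
        if output_columns is None
        else list(output_columns)
    )
    keep_cols = [pair_col, ts_col] + label_cols
    missing = [c for c in keep_cols if c not in result_cols]
    if missing:
        raise ValueError(f"Projection error, missing columns: {missing}")
    return keep_cols


cols = select_output_columns(
    ["pair", "timestamp", "close"],
    ["pair", "timestamp", "close", "ret", "label"],
)
```

Because the default path keeps every added column, helper columns that compute_group() forgets to drop end up in the output; the built-in labelers avoid this by dropping their helpers and setting output_columns in __post_init__.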

_filter_by_signals_pl

_filter_by_signals_pl(df: DataFrame, s: DataFrame, signal_type: SignalType) -> pl.DataFrame

Filter input to rows matching signal timestamps.

Inner join with signal timestamps of specific type.
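The filtering step amounts to collecting the (pair, timestamp) keys whose signal_type matches and inner-joining on them. A plain-Python sketch with dict rows; the string signal types are hypothetical stand-ins for SignalType values:

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def filter_by_signals(
    rows: List[Row],
    signals: List[Row],
    signal_type: str,
    pair_col: str = "pair",
    ts_col: str = "timestamp",
) -> List[Row]:
    """Keep only rows whose (pair, timestamp) carries a signal of signal_type."""
    required = {pair_col, ts_col, "signal_type"}
    for s in signals:
        missing = required - set(s)
        if missing:
            raise ValueError(f"Signals missing columns: {sorted(missing)}")
    # A set de-duplicates keys, like .unique() does before the join
    keys = {(s[pair_col], s[ts_col]) for s in signals if s["signal_type"] == signal_type}
    return [r for r in rows if (r[pair_col], r[ts_col]) in keys]


rows = [
    {"pair": "BTC", "timestamp": 1, "close": 100.0},
    {"pair": "BTC", "timestamp": 2, "close": 101.0},
    {"pair": "ETH", "timestamp": 1, "close": 10.0},
]
signals = [
    {"pair": "BTC", "timestamp": 2, "signal_type": "rise"},
    {"pair": "BTC", "timestamp": 2, "signal_type": "rise"},  # duplicate key
    {"pair": "ETH", "timestamp": 1, "signal_type": "fall"},
]
rise_rows = filter_by_signals(rows, signals, "rise")
```

Because _compute_pl applies this filter before compute_group() runs, any shift-based horizon is then measured in signal rows rather than bars.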

Parameters:

  • df (DataFrame): Input data. Required.
  • s (DataFrame): Signals DataFrame. Required.
  • signal_type (SignalType): Signal type to filter. Required.

Returns:

  • DataFrame: Filtered data (only rows at signal timestamps).

Raises:

  • ValueError: If signals are missing required columns.

Source code in src/signalflow/target/base.py
def _filter_by_signals_pl(
    self, df: pl.DataFrame, s: pl.DataFrame, signal_type: SignalType
) -> pl.DataFrame:
    """Filter input to rows matching signal timestamps.

    Inner join with signal timestamps of specific type.

    Args:
        df (pl.DataFrame): Input data.
        s (pl.DataFrame): Signals DataFrame.
        signal_type (SignalType): Signal type to filter.

    Returns:
        pl.DataFrame: Filtered data (only rows at signal timestamps).

    Raises:
        ValueError: If signals missing required columns.
    """
    required = {self.pair_col, self.ts_col, "signal_type"}
    missing = required - set(s.columns)
    if missing:
        raise ValueError(f"Signals missing columns: {sorted(missing)}")

    s_f = (
        s.filter(pl.col("signal_type") == signal_type.value)
        .select([self.pair_col, self.ts_col])
        .unique(subset=[self.pair_col, self.ts_col])
    )
    return df.join(s_f, on=[self.pair_col, self.ts_col], how="inner")

_signals_to_pl

_signals_to_pl(signals: Signals) -> pl.DataFrame

Convert Signals to Polars DataFrame.

Parameters:

Name Type Description Default
signals Signals

Signals container.

required

Returns:

Type Description
DataFrame

pl.DataFrame: Signals as DataFrame.

Raises:

Type Description
TypeError

If Signals.value is not pl.DataFrame.

Source code in src/signalflow/target/base.py
def _signals_to_pl(self, signals: Signals) -> pl.DataFrame:
    """Convert Signals to Polars DataFrame.

    Args:
        signals (Signals): Signals container.

    Returns:
        pl.DataFrame: Signals as DataFrame.

    Raises:
        TypeError: If Signals.value is not pl.DataFrame.
    """
    s = signals.value
    if isinstance(s, pl.DataFrame):
        return s
    raise TypeError(f"Unsupported Signals.value type: {type(s)}")

_validate_input_pl

_validate_input_pl(df: DataFrame) -> None

Validate input DataFrame schema.

Parameters:

  • df (DataFrame): Input to validate. Required.

Raises:

  • ValueError: If required columns are missing.

Source code in src/signalflow/target/base.py
def _validate_input_pl(self, df: pl.DataFrame) -> None:
    """Validate input DataFrame schema.

    Args:
        df (pl.DataFrame): Input to validate.

    Raises:
        ValueError: If required columns missing.
    """
    missing = [c for c in (self.pair_col, self.ts_col) if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

compute

compute(df: DataFrame, signals: Signals | None = None, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Compute labels for input DataFrame.

Main entry point - handles validation, filtering, grouping, and projection.

Processing steps
  1. Validate input schema
  2. Sort by (pair, timestamp)
  3. (optional) Filter to specific signal type
  4. Group by pair and apply compute_group()
  5. Validate output (length-preserving)
  6. Project to output columns
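The steps above can be sketched without Polars. In this hypothetical compute(), rows are dicts and compute_group is any callable that takes one pair's sorted rows; signal filtering (step 3) and column projection (step 6) are left out so the grouping and length check stay visible.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def compute(
    rows: List[Row],
    compute_group: Callable[[List[Row]], List[Row]],
    pair_col: str = "pair",
    ts_col: str = "timestamp",
) -> List[Row]:
    # 1. Validate the schema
    for r in rows:
        missing = [c for c in (pair_col, ts_col) if c not in r]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
    # 2. Sort by (pair, timestamp)
    ordered = sorted(rows, key=lambda r: (r[pair_col], r[ts_col]))
    # 4. Group by pair (dicts keep insertion order, so groups stay sorted)
    groups: Dict[Any, List[Row]] = {}
    for r in ordered:
        groups.setdefault(r[pair_col], []).append(r)
    out: List[Row] = []
    for group in groups.values():
        labeled = compute_group(group)
        # 5. Enforce the length-preserving contract
        if len(labeled) != len(group):
            raise ValueError(
                f"len(output_group)={len(labeled)} != len(input_group)={len(group)}"
            )
        out.extend(labeled)
    return out


def next_bar_rise(group: List[Row]) -> List[Row]:
    """Toy per-pair labeler: 'rise' if the next close is higher, else 'none'."""
    closes = [r["close"] for r in group]
    return [
        {**r, "label": "rise" if i + 1 < len(closes) and closes[i + 1] > closes[i] else "none"}
        for i, r in enumerate(group)
    ]


rows = [
    {"pair": "ETH", "timestamp": 2, "close": 11.0},
    {"pair": "BTC", "timestamp": 2, "close": 101.0},
    {"pair": "ETH", "timestamp": 1, "close": 10.0},
    {"pair": "BTC", "timestamp": 1, "close": 100.0},
]
labeled = compute(rows, next_bar_rise)
```

The last row of each pair is labeled "none" because compute_group never sees another pair's rows, so forward-looking labels cannot leak across pair boundaries.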

Parameters:

  • df (DataFrame): Input data with OHLCV and the required pair/timestamp columns. Required.
  • signals (Signals | None): Signals used for filtering when filter_signal_type is set. Default: None.
  • data_context (dict[str, Any] | None): Additional context passed to compute_group(), e.g. "signal_keys" for masking. Default: None.

Returns:

  • DataFrame: Labeled data with columns:
      - pair and timestamp (always included)
      - the label column (named by out_col)
      - metadata columns (if include_meta=True)
    If keep_input_columns=True, all columns returned by compute_group() are kept instead.

Raises:

  • TypeError: If df is not a pl.DataFrame or compute_group() returns something other than a pl.DataFrame.
  • ValueError: If required input columns are missing, compute_group() changes the row count, or projected columns are missing.

Example
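# Basic labeling
labeled = labeler.compute(ohlcv_df)

# With signal filtering: filter_signal_type is a labeler field, not a compute() argument.
# Rows are filtered before compute_group() runs, so shift-based horizons count signal rows.
rise_labeler = FixedHorizonLabeler(horizon=60, filter_signal_type=SignalType.RISE)
labeled = rise_labeler.compute(ohlcv_df, signals=signals)

# With masking context (signal_keys holds pair and timestamp columns)
labeled = labeler.compute(
    ohlcv_df,
    data_context={"signal_keys": signal_timestamps_df},
)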
Source code in src/signalflow/target/base.py
def compute(
    self,
    df: pl.DataFrame,
    signals: Signals | None = None,
    data_context: dict[str, Any] | None = None,
) -> pl.DataFrame:
    """Compute labels for input DataFrame.

    Main entry point - handles validation, filtering, grouping, and projection.

    Processing steps:
        1. Validate input schema
        2. Sort by (pair, timestamp)
        3. (optional) Filter to specific signal type
        4. Group by pair and apply compute_group()
        5. Validate output (length-preserving)
        6. Project to output columns

    Args:
        df (pl.DataFrame): Input data with OHLCV and required columns.
        signals (Signals | None): Signals for filtering/masking.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Labeled data with columns:
            - pair, timestamp (always included)
            - label column(s) (as specified by out_col)
            - (optional) metadata columns

    Raises:
        TypeError: If df not pl.DataFrame or compute_group returns wrong type.
        ValueError: If compute_group changes row count or columns missing.

    Example:
        ```python
        # Basic labeling
        labeled = labeler.compute(ohlcv_df)

        # With signal filtering
        labeled = labeler.compute(
            ohlcv_df,
            signals=signals,
            filter_signal_type=SignalType.RISE
        )

        # With masking context
        labeled = labeler.compute(
            ohlcv_df,
            signals=signals,
            data_context={"signal_keys": signal_timestamps_df}
        )
        ```
    """
    if not isinstance(df, pl.DataFrame):
        raise TypeError(f"{self.__class__.__name__}.compute expects pl.DataFrame, got {type(df)}")
    return self._compute_pl(df=df, signals=signals, data_context=data_context)

compute_group abstractmethod

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame

Compute labels for single pair group.

Core labeling logic - must be implemented by subclasses.

CRITICAL: Must preserve row count (len(output) == len(input)). No filtering allowed inside compute_group.

Parameters:

  • group_df (DataFrame): Single pair's data, sorted by timestamp. Required.
  • data_context (dict[str, Any] | None): Additional context. Required.

Returns:

  • DataFrame: Same length as input with added label columns.

Example
def compute_group(self, group_df, data_context=None):
    # Assumes polars is imported as pl and SignalType from signalflow.core
    # 10-bar forward return; shift(-10) leaves the last 10 rows null
    ret = pl.col("close").shift(-10) / pl.col("close") - 1
    return group_df.with_columns(
        ret.alias("return"),
        pl.when(ret > 0.01)
        .then(pl.lit(SignalType.RISE.value))
        .otherwise(pl.lit(SignalType.NONE.value))  # null returns fall through to NONE
        .alias(self.out_col),
    )
Note

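Output must have the same height as the input (length-preserving). Use shift(-n) for forward-looking operations. The last n rows have no future data, so shift(-n) yields null there; decide explicitly how to label them (in the example above and in FixedHorizonLabeler they fall through to SignalType.NONE).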
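For readers coming from pandas, a negative shift pulls future values back onto the current row and pads the tail with nulls. A small list-based sketch of that behavior, where shift and forward_returns are hypothetical helpers and None stands in for null:

```python
from typing import List, Optional


def shift(values: List[float], n: int) -> List[Optional[float]]:
    """Shift like Polars Expr.shift: n > 0 looks back, n < 0 looks ahead."""
    size = len(values)
    if n >= 0:
        k = min(n, size)
        return [None] * k + values[: size - k]
    k = min(-n, size)
    return values[k:] + [None] * k


def forward_returns(close: List[float], horizon: int) -> List[Optional[float]]:
    """Simple return over `horizon` bars; None where no future bar exists."""
    future = shift(close, -horizon)
    return [None if f is None else f / c - 1 for c, f in zip(close, future)]


rets = forward_returns([100.0, 110.0, 99.0], horizon=1)
```

The output length always equals the input length, which is what keeps compute_group() length-preserving; only the values in the tail become missing.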

Source code in src/signalflow/target/base.py
@abstractmethod
def compute_group(
    self, group_df: pl.DataFrame, data_context: dict[str, Any] | None
) -> pl.DataFrame:
    """Compute labels for single pair group.

    Core labeling logic - must be implemented by subclasses.

    CRITICAL: Must preserve row count (len(output) == len(input)).
    No filtering allowed inside compute_group.

    Args:
        group_df (pl.DataFrame): Single pair's data, sorted by timestamp.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Same length as input with added label columns.

    Example:
        ```python
        def compute_group(self, group_df, data_context=None):
            # Compute 10-bar forward return
            return group_df.with_columns([
                pl.col("close").shift(-10).alias("future_close")
            ]).with_columns([
                ((pl.col("future_close") / pl.col("close")) - 1).alias("return"),
                pl.when((pl.col("future_close") / pl.col("close") - 1) > 0.01)
                .then(pl.lit(SignalType.RISE.value))
                .otherwise(pl.lit(SignalType.NONE.value))
                .alias("label")
            ])
        ```

    Note:
        Output must have same height as input (length-preserving).
        Use shift(-n) for forward-looking operations.
        Last N bars will have null labels (no future data).
    """
    raise NotImplementedError

Labeling Strategies

Fixed Horizon

signalflow.target.fixed_horizon_labeler.FixedHorizonLabeler dataclass

FixedHorizonLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t1', 'ret'), price_col: str = 'close', horizon: int = 60)

Bases: Labeler

Fixed-Horizon Labeling

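label[t0] = sign(close[t0 + horizon] - close[t0]), mapped to RISE (positive), FALL (negative) or NONE (zero, missing future price, or non-positive prices).

If mask_to_signals=True and data_context contains "signal_keys", labels are kept only on signal rows, while the horizon is still computed on the full per-pair series. With include_meta=True, t1 is the timestamp horizon bars ahead and ret is the log return log(close[t1] / close[t0]).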
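The per-pair rule can be restated in plain Python. This is a sketch of the compute_group() shown below, including its NONE guards and the t1/ret metadata; lists replace Polars columns, and the label strings are hypothetical stand-ins for SignalType values.

```python
import math
from typing import List, Optional, Tuple

RISE, FALL, NONE = "rise", "fall", "none"  # hypothetical SignalType stand-ins


def fixed_horizon_labels(
    prices: List[Optional[float]],
    timestamps: list,
    horizon: int = 60,
) -> Tuple[List[str], list, List[Optional[float]]]:
    """Return (label, t1, ret) per row, mirroring FixedHorizonLabeler."""
    if horizon <= 0:
        raise ValueError("horizon must be > 0")
    n = len(prices)
    labels, t1, ret = [], [], []
    for i, p in enumerate(prices):
        j = i + horizon
        future = prices[j] if j < n else None  # price.shift(-horizon)
        valid = future is not None and p is not None and p > 0 and future > 0
        if not valid or future == p:
            labels.append(NONE)
        elif future > p:
            labels.append(RISE)
        else:
            labels.append(FALL)
        t1.append(timestamps[j] if j < n else None)  # ts.shift(-horizon)
        ret.append(math.log(future / p) if valid else None)  # log return
    return labels, t1, ret


labels, t1, ret = fixed_horizon_labels(
    [100.0, 101.0, 99.0, 99.0, 105.0], [0, 1, 2, 3, 4], horizon=2
)
```

Here the first two rows are FALL, the third is RISE, and the last two are NONE because no bar exists horizon steps ahead.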

horizon class-attribute instance-attribute

horizon: int = 60

meta_columns class-attribute instance-attribute

meta_columns: tuple[str, ...] = ('t1', 'ret')

price_col class-attribute instance-attribute

price_col: str = 'close'

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/fixed_horizon_labeler.py
def __post_init__(self) -> None:
    if self.horizon <= 0:
        raise ValueError("horizon must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/fixed_horizon_labeler.py
def compute_group(
    self, group_df: pl.DataFrame, data_context: dict[str, Any] | None
) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    h = int(self.horizon)
    price = pl.col(self.price_col)
    future_price = price.shift(-h)

    df = group_df.with_columns(future_price.alias("_future_price"))

    label_expr = (
        pl.when(
            pl.col("_future_price").is_null()
            | pl.col(self.price_col).is_null()
            | (pl.col(self.price_col) <= 0)
            | (pl.col("_future_price") <= 0)
        )
        .then(pl.lit(SignalType.NONE.value))
        .when(pl.col("_future_price") > pl.col(self.price_col))
        .then(pl.lit(SignalType.RISE.value))
        .when(pl.col("_future_price") < pl.col(self.price_col))
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
    )

    df = df.with_columns(label_expr.alias(self.out_col))

    if self.include_meta:
        df = df.with_columns(
            [
                pl.col(self.ts_col).shift(-h).alias("t1"),
                pl.when(
                    pl.col("_future_price").is_not_null()
                    & (pl.col(self.price_col) > 0)
                    & (pl.col("_future_price") > 0)
                )
                .then((pl.col("_future_price") / pl.col(self.price_col)).log())
                .otherwise(pl.lit(None))
                .alias("ret"),
            ]
        )

    df = df.drop("_future_price")

    if (
        self.mask_to_signals
        and data_context is not None
        and "signal_keys" in data_context
    ):
        df = self._apply_signal_mask(df, data_context, group_df)

    return df

Triple Barrier (Dynamic)

signalflow.target.triple_barrier.TripleBarrierLabeler dataclass

TripleBarrierLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', vol_window: int = 60, lookforward_window: int = 1440, profit_multiplier: float = 1.0, stop_loss_multiplier: float = 1.0)

Bases: Labeler

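Triple-barrier labeling (López de Prado), Numba-accelerated.

Volatility-based barriers, where vol is the rolling sample standard deviation (ddof=1) of one-bar log returns over vol_window bars:
  • pt = close * exp(vol * profit_multiplier)
  • sl = close * exp(-vol * stop_loss_multiplier)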
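A plain-Python sketch of the barrier levels, assuming strictly positive prices. The warm-up handling is a simplifying assumption (a level is emitted only once vol_window complete log returns are available); Polars' rolling_std null handling may differ in detail, so treat this as illustrative.

```python
import math
import statistics
from typing import List, Optional, Tuple


def barrier_levels(
    prices: List[float],
    vol_window: int = 60,
    profit_multiplier: float = 1.0,
    stop_loss_multiplier: float = 1.0,
) -> Tuple[List[Optional[float]], List[Optional[float]]]:
    """Volatility-scaled profit-taking (pt) and stop-loss (sl) levels per bar."""
    if vol_window <= 1:
        raise ValueError("vol_window must be > 1")
    n = len(prices)
    # One-bar log returns; undefined for the first bar (like shift(1))
    rets: List[Optional[float]] = [None] + [
        math.log(prices[i] / prices[i - 1]) for i in range(1, n)
    ]
    pt: List[Optional[float]] = []
    sl: List[Optional[float]] = []
    for i in range(n):
        window = rets[max(0, i - vol_window + 1) : i + 1]
        if len(window) < vol_window or None in window:
            pt.append(None)  # warm-up: not enough returns yet
            sl.append(None)
            continue
        vol = statistics.stdev(window)  # sample std, i.e. ddof=1
        pt.append(prices[i] * math.exp(vol * profit_multiplier))
        sl.append(prices[i] * math.exp(-vol * stop_loss_multiplier))
    return pt, sl


pt, sl = barrier_levels([100.0, 110.0, 99.0, 108.9], vol_window=2)
```

With equal multipliers the barriers are symmetric in log space: log(pt / close) equals -log(sl / close) equals vol.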

lookforward_window class-attribute instance-attribute

lookforward_window: int = 1440

price_col class-attribute instance-attribute

price_col: str = 'close'

profit_multiplier class-attribute instance-attribute

profit_multiplier: float = 1.0

stop_loss_multiplier class-attribute instance-attribute

stop_loss_multiplier: float = 1.0

vol_window class-attribute instance-attribute

vol_window: int = 60

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/triple_barrier.py
def __post_init__(self) -> None:
    if self.vol_window <= 1:
        raise ValueError("vol_window must be > 1")
    if self.lookforward_window <= 0:
        raise ValueError("lookforward_window must be > 0")
    if self.profit_multiplier <= 0 or self.stop_loss_multiplier <= 0:
        raise ValueError("profit_multiplier/stop_loss_multiplier must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

_apply_labels

_apply_labels(df: DataFrame) -> pl.DataFrame

Apply RISE/FALL/NONE labels based on barrier hits.

Source code in src/signalflow/target/triple_barrier.py
def _apply_labels(self, df: pl.DataFrame) -> pl.DataFrame:
    """Apply RISE/FALL/NONE labels based on barrier hits."""
    choose_up = pl.col("_up_off").is_not_null() & (
        pl.col("_dn_off").is_null() | (pl.col("_up_off") <= pl.col("_dn_off"))
    )
    choose_dn = pl.col("_dn_off").is_not_null() & (
        pl.col("_up_off").is_null() | (pl.col("_dn_off") < pl.col("_up_off"))
    )

    return df.with_columns(
        pl.when(choose_up)
        .then(pl.lit(SignalType.RISE.value))
        .when(choose_dn)
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
        .alias(self.out_col)
    )
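The decision rule in _apply_labels compares two optional offsets: the number of bars until the profit or stop barrier is first touched. compute_group() converts the kernel's 0 ("no touch") to null before this step. The sketch mirrors both parts, with hypothetical label strings:

```python
from typing import Optional

RISE, FALL, NONE = "rise", "fall", "none"  # hypothetical SignalType stand-ins


def to_optional(offset: int) -> Optional[int]:
    """Kernel convention: 0 means the barrier was not touched."""
    return None if offset == 0 else offset


def label_from_offsets(up_off: Optional[int], dn_off: Optional[int]) -> str:
    choose_up = up_off is not None and (dn_off is None or up_off <= dn_off)
    choose_dn = dn_off is not None and (up_off is None or dn_off < up_off)
    if choose_up:
        return RISE  # profit barrier first; ties go to the profit barrier
    if choose_dn:
        return FALL
    return NONE
```

The `<=` versus `<` asymmetry is what resolves a same-bar tie in favor of RISE.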

_compute_meta

_compute_meta(df: DataFrame, prices: ndarray, up_off_series: Series, dn_off_series: Series, lf: int) -> pl.DataFrame

Compute t_hit and ret meta columns.

Source code in src/signalflow/target/triple_barrier.py
def _compute_meta(
    self,
    df: pl.DataFrame,
    prices: np.ndarray,
    up_off_series: pl.Series,
    dn_off_series: pl.Series,
    lf: int,
) -> pl.DataFrame:
    """Compute t_hit and ret meta columns."""
    n = df.height
    ts_arr = df.get_column(self.ts_col).to_numpy()

    idx = np.arange(n)
    up_np = up_off_series.fill_null(0).to_numpy()
    dn_np = dn_off_series.fill_null(0).to_numpy()

    hit_off = np.where(
        (up_np > 0) & ((dn_np == 0) | (up_np <= dn_np)),
        up_np,
        np.where(dn_np > 0, dn_np, 0),
    )

    hit_idx = np.clip(idx + hit_off, 0, n - 1)
    vert_idx = np.clip(idx + lf, 0, n - 1)
    final_idx = np.where(hit_off > 0, hit_idx, vert_idx)

    t_hit = ts_arr[final_idx]
    ret = np.where(prices > 0, np.log(prices[final_idx] / prices), np.nan)

    return df.with_columns(
        [
            pl.Series("t_hit", t_hit),
            pl.Series("ret", ret),
        ]
    )
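The index arithmetic in _compute_meta can be followed row by row in plain Python. This sketch takes raw kernel offsets (0 = no touch), picks the first barrier hit, and otherwise falls back to the vertical barrier. The guard on non-positive prices is a simplification; numpy would produce NaN or -inf there.

```python
import math
from typing import List, Tuple


def hit_meta(
    prices: List[float],
    timestamps: list,
    up_off: List[int],
    dn_off: List[int],
    lookforward: int,
) -> Tuple[list, List[float]]:
    """Return (t_hit, ret) per row from first-touch offsets (0 = no touch)."""
    n = len(prices)
    t_hit, ret = [], []
    for i in range(n):
        up, dn = up_off[i], dn_off[i]
        if up > 0 and (dn == 0 or up <= dn):
            hit = up  # profit barrier first (ties included)
        elif dn > 0:
            hit = dn  # stop barrier first
        else:
            hit = 0  # no horizontal barrier touched
        # Touched: jump to the hit bar; otherwise use the vertical barrier.
        # Both are clipped to the last bar, as np.clip does in the source.
        j = min(i + (hit if hit > 0 else lookforward), n - 1)
        t_hit.append(timestamps[j])
        ok = prices[i] > 0 and prices[j] > 0  # positive prices assumed
        ret.append(math.log(prices[j] / prices[i]) if ok else math.nan)
    return t_hit, ret


t_hit, ret = hit_meta(
    [100.0, 102.0, 98.0, 101.0], [10, 20, 30, 40], [1, 0, 0, 0], [2, 1, 0, 0], lookforward=2
)
```

Near the end of a series the vertical barrier is clipped to the last bar. Untouched rows there therefore report a shorter effective horizon than lookforward_window.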

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/triple_barrier.py
def compute_group(
    self, group_df: pl.DataFrame, data_context: dict[str, Any] | None
) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    lf = int(self.lookforward_window)
    vw = int(self.vol_window)

    df = group_df.with_columns(
        (pl.col(self.price_col) / pl.col(self.price_col).shift(1))
        .log()
        .rolling_std(window_size=vw, ddof=1)
        .alias("_vol")
    ).with_columns(
        [
            (
                pl.col(self.price_col)
                * (pl.col("_vol") * self.profit_multiplier).exp()
            ).alias("_pt"),
            (
                pl.col(self.price_col)
                * (-pl.col("_vol") * self.stop_loss_multiplier).exp()
            ).alias("_sl"),
        ]
    )

    prices = df.get_column(self.price_col).to_numpy().astype(np.float64)
    pt = df.get_column("_pt").fill_null(np.nan).to_numpy().astype(np.float64)
    sl = df.get_column("_sl").fill_null(np.nan).to_numpy().astype(np.float64)

    up_off, dn_off = _find_first_hit(prices, pt, sl, lf)

    up_off_series = pl.Series("_up_off", up_off).replace(0, None).cast(pl.Int32)
    dn_off_series = pl.Series("_dn_off", dn_off).replace(0, None).cast(pl.Int32)

    df = df.with_columns([up_off_series, dn_off_series])

    df = self._apply_labels(df)

    if self.include_meta:
        df = self._compute_meta(df, prices, up_off_series, dn_off_series, lf)

    if (
        self.mask_to_signals
        and data_context is not None
        and "signal_keys" in data_context
    ):
        df = self._apply_signal_mask(df, data_context, group_df)

    drop_cols = ["_vol", "_pt", "_sl", "_up_off", "_dn_off"]
    df = df.drop([c for c in drop_cols if c in df.columns])

    return df

Static Triple Barrier

signalflow.target.static_triple_barrier.StaticTripleBarrierLabeler dataclass

StaticTripleBarrierLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', lookforward_window: int = 1440, profit_pct: float = 0.01, stop_loss_pct: float = 0.01)

Bases: Labeler

Triple-barrier (first-touch) labeling with STATIC horizontal barriers. Numba-accelerated version.

López de Prado's framework:
  • Vertical barrier at t1 = t0 + lookforward_window
  • Horizontal barriers defined as a percentage of the initial price at t0:
      pt = close[t0] * (1 + profit_pct)
      sl = close[t0] * (1 - stop_loss_pct)
  • Label by first touch within (t0, t1]:
      RISE if PT is touched first (ties go to PT)
      FALL if SL is touched first
      NONE if neither is touched by t1
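The first-touch rule can be sketched as a direct scan. _find_first_hit_static is not shown on this page, so the inclusive comparisons (price >= pt, price <= sl), the scan order and the helper names below are assumptions; the O(n * lookforward) loop is for illustration, not speed.

```python
from typing import List, Tuple

RISE, FALL, NONE = "rise", "fall", "none"  # hypothetical SignalType stand-ins


def first_touch_offsets(
    prices: List[float], profit_pct: float, stop_loss_pct: float, lookforward: int
) -> Tuple[List[int], List[int]]:
    """Bars until pt / sl is first touched within (t0, t0 + lookforward]; 0 = never."""
    n = len(prices)
    up_off = [0] * n
    dn_off = [0] * n
    for i in range(n):
        pt = prices[i] * (1.0 + profit_pct)
        sl = prices[i] * (1.0 - stop_loss_pct)
        for k in range(1, lookforward + 1):
            j = i + k
            if j >= n:
                break  # ran out of data before the vertical barrier
            if up_off[i] == 0 and prices[j] >= pt:  # assumed inclusive touch
                up_off[i] = k
            if dn_off[i] == 0 and prices[j] <= sl:
                dn_off[i] = k
            if up_off[i] and dn_off[i]:
                break
    return up_off, dn_off


def static_triple_barrier_labels(
    prices: List[float],
    profit_pct: float = 0.01,
    stop_loss_pct: float = 0.01,
    lookforward: int = 1440,
) -> List[str]:
    up_off, dn_off = first_touch_offsets(prices, profit_pct, stop_loss_pct, lookforward)
    labels = []
    for up, dn in zip(up_off, dn_off):
        if up and (not dn or up <= dn):
            labels.append(RISE)  # PT first; ties go to PT
        elif dn:
            labels.append(FALL)
        else:
            labels.append(NONE)
    return labels
```

For example, with prices [100.0, 100.5, 101.5, 98.0] and 1% barriers, the first bar reaches PT (101.0) at offset 2 before SL (99.0) at offset 3, so it is labeled RISE.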

lookforward_window class-attribute instance-attribute

lookforward_window: int = 1440

price_col class-attribute instance-attribute

price_col: str = 'close'

profit_pct class-attribute instance-attribute

profit_pct: float = 0.01

stop_loss_pct class-attribute instance-attribute

stop_loss_pct: float = 0.01

__post_init__

__post_init__() -> None
Source code in src/signalflow/target/static_triple_barrier.py
def __post_init__(self) -> None:
    if self.lookforward_window <= 0:
        raise ValueError("lookforward_window must be > 0")
    if self.profit_pct <= 0 or self.stop_loss_pct <= 0:
        raise ValueError("profit_pct/stop_loss_pct must be > 0")

    cols = [self.out_col]
    if self.include_meta:
        cols += list(self.meta_columns)
    self.output_columns = cols

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame
Source code in src/signalflow/target/static_triple_barrier.py
def compute_group(
    self, group_df: pl.DataFrame, data_context: dict[str, Any] | None
) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column '{self.price_col}'")

    if group_df.height == 0:
        return group_df

    lf = int(self.lookforward_window)
    n = group_df.height

    prices = group_df.get_column(self.price_col).to_numpy().astype(np.float64)
    pt = prices * (1.0 + self.profit_pct)
    sl = prices * (1.0 - self.stop_loss_pct)

    up_off, dn_off = _find_first_hit_static(prices, pt, sl, lf)

    up_off_series = pl.Series("_up_off", up_off).replace(0, None).cast(pl.Int32)
    dn_off_series = pl.Series("_dn_off", dn_off).replace(0, None).cast(pl.Int32)

    df = group_df.with_columns([up_off_series, dn_off_series])

    choose_up = pl.col("_up_off").is_not_null() & (
        pl.col("_dn_off").is_null() | (pl.col("_up_off") <= pl.col("_dn_off"))
    )
    choose_dn = pl.col("_dn_off").is_not_null() & (
        pl.col("_up_off").is_null() | (pl.col("_dn_off") < pl.col("_up_off"))
    )

    df = df.with_columns(
        pl.when(choose_up)
        .then(pl.lit(SignalType.RISE.value))
        .when(choose_dn)
        .then(pl.lit(SignalType.FALL.value))
        .otherwise(pl.lit(SignalType.NONE.value))
        .alias(self.out_col)
    )

    if self.include_meta:
        ts_arr = group_df.get_column(self.ts_col).to_numpy()

        up_np = up_off_series.fill_null(0).to_numpy()
        dn_np = dn_off_series.fill_null(0).to_numpy()
        idx = np.arange(n)

        hit_off = np.where(
            (up_np > 0) & ((dn_np == 0) | (up_np <= dn_np)),
            up_np,
            np.where(dn_np > 0, dn_np, 0),
        )

        hit_idx = np.clip(idx + hit_off, 0, n - 1)
        vert_idx = np.clip(idx + lf, 0, n - 1)
        final_idx = np.where(hit_off > 0, hit_idx, vert_idx)

        t_hit = ts_arr[final_idx]
        ret = np.log(prices[final_idx] / prices)

        df = df.with_columns(
            [
                pl.Series("t_hit", t_hit),
                pl.Series("ret", ret),
            ]
        )

    if (
        self.mask_to_signals
        and data_context is not None
        and "signal_keys" in data_context
    ):
        df = self._apply_signal_mask(df, data_context, group_df)

    df = df.drop(["_up_off", "_dn_off"])

    return df