Target Module
Signal labeling strategies for machine learning training.
Module Name
The target functionality is implemented in the signalflow.target module.
Base Class
signalflow.target.base.Labeler (dataclass)
Labeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'))
Bases: ABC
Base class for Polars-only signal labeling.
Assigns forward-looking labels to historical data based on future price movement. Labels are computed per-pair with length-preserving operations.
Key concepts
- Forward-looking: Labels depend on future data (not available in live trading)
- Per-pair processing: Each pair labeled independently
- Length-preserving: Output has same row count as input
- Signal masking: Optionally label only at signal timestamps
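The first three concepts can be illustrated with a minimal plain-Python sketch. It does not use Polars or signalflow; the `forward_returns` helper and the list-of-dicts row layout are hypothetical and exist only for illustration. Each pair is processed on its own, the output has one entry per input row, and rows without enough future data get `None`.

```python
from collections import defaultdict


def forward_returns(rows, horizon):
    """Return one forward return per input row, in input order.

    rows: list of dicts with 'pair', 'timestamp' and 'close' keys
    (a hypothetical layout used only for this sketch).
    """
    # Per-pair processing: collect row indices for each pair separately.
    by_pair = defaultdict(list)
    for i, row in enumerate(rows):
        by_pair[row["pair"]].append(i)

    # Length-preserving: exactly one output slot per input row.
    out = [None] * len(rows)
    for idxs in by_pair.values():
        idxs.sort(key=lambda i: rows[i]["timestamp"])
        for pos, i in enumerate(idxs):
            # Forward-looking: the value depends on a bar `horizon` steps ahead.
            if pos + horizon < len(idxs):
                future_close = rows[idxs[pos + horizon]]["close"]
                out[i] = future_close / rows[i]["close"] - 1.0
    return out


rows = [
    {"pair": "BTCUSDT", "timestamp": 1, "close": 100.0},
    {"pair": "ETHUSDT", "timestamp": 1, "close": 10.0},
    {"pair": "BTCUSDT", "timestamp": 2, "close": 110.0},
    {"pair": "ETHUSDT", "timestamp": 2, "close": 9.0},
]
returns = forward_returns(rows, horizon=1)
```

The last bar of each pair has no future bar to look at, so its entry stays `None`; this is why forward-looking labels cannot be computed in live trading.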
Public API
- compute(): Main entry point (handles grouping, filtering, projection)
- compute_group(): Per-pair labeling logic (must implement)
Common labeling strategies
- Fixed horizon: Label based on return over N bars
- Triple barrier: Label based on first hit of profit/loss/time barrier
- Quantile-based: Label based on return quantiles
Attributes:
| Name | Type | Description |
|---|---|---|
| component_type | ClassVar[SfComponentType] | Always LABELER for registry. |
| raw_data_type | RawDataType | Type of raw data. Default: SPOT. |
| pair_col | str | Trading pair column. Default: "pair". |
| ts_col | str | Timestamp column. Default: "timestamp". |
| keep_input_columns | bool | Keep all input columns. Default: False. |
| output_columns | list[str] \| None | Specific columns to output. Default: None. |
| filter_signal_type | SignalType \| None | Filter to specific signal type. Default: None. |
| mask_to_signals | bool | Mask labels to signal timestamps only. Default: True. |
| out_col | str | Output label column name. Default: "label". |
| include_meta | bool | Include metadata columns. Default: False. |
| meta_columns | tuple[str, ...] | Metadata column names. Default: ("t_hit", "ret"). |
Example
    from signalflow.target import Labeler
    from signalflow.core import SignalType
    import polars as pl

    class FixedHorizonLabeler(Labeler):
        """Label based on fixed-horizon return."""

        def __init__(self, horizon: int = 10, threshold: float = 0.01):
            super().__init__()
            self.horizon = horizon
            self.threshold = threshold

        def compute_group(self, group_df, data_context=None):
            # Compute forward return; with_columns keeps the row count unchanged
            labels = group_df.with_columns([
                pl.col("close").shift(-self.horizon).alias("future_close")
            ]).with_columns([
                ((pl.col("future_close") / pl.col("close")) - 1).alias("return")
            ]).with_columns([
                pl.when(pl.col("return") > self.threshold)
                .then(pl.lit(SignalType.RISE.value))
                .when(pl.col("return") < -self.threshold)
                .then(pl.lit(SignalType.FALL.value))
                .otherwise(pl.lit(SignalType.NONE.value))
                .alias("label")
            ])
            return labels

    # Usage (ohlcv_df and signals are assumed to be defined elsewhere)
    labeler = FixedHorizonLabeler(horizon=10, threshold=0.01)
    labeled = labeler.compute(ohlcv_df, signals=signals)
Note
- compute_group() must preserve row count (no filtering).
- All timestamps must be timezone-naive.
- Signal masking requires mask_to_signals=True and signal_keys in data_context.
See Also
- FixedHorizonLabeler: Simple fixed-horizon implementation.
- TripleBarrierLabeler: Three-barrier labeling strategy.
filter_signal_type (class-attribute, instance-attribute)
_apply_signal_mask
_apply_signal_mask(df: DataFrame, data_context: dict[str, Any], group_df: DataFrame) -> pl.DataFrame
Mask labels to signal timestamps only.
Labels are computed for all rows, but only signal timestamps get actual labels; others are set to SignalType.NONE.
Used for meta-labeling: only label at detected signal points, not every bar.
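The masking rule can be sketched in plain Python. This is not signalflow's implementation: `mask_to_signal_rows`, the row layout, and the `"none"` string standing in for SignalType.NONE are all assumptions made for illustration. Every row is kept, and only rows whose (pair, timestamp) key appears among the signal keys retain their computed label.

```python
NONE_LABEL = "none"  # hypothetical stand-in for SignalType.NONE


def mask_to_signal_rows(rows, signal_keys, label_key="label"):
    """Keep labels only at signal timestamps; reset all others to NONE_LABEL.

    rows: list of dicts with 'pair', 'timestamp' and a label column.
    signal_keys: iterable of (pair, timestamp) tuples.
    """
    keys = set(signal_keys)
    masked = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are left untouched
        if (row["pair"], row["timestamp"]) not in keys:
            new_row[label_key] = NONE_LABEL
        masked.append(new_row)
    return masked


rows = [
    {"pair": "BTCUSDT", "timestamp": 1, "label": "rise"},
    {"pair": "BTCUSDT", "timestamp": 2, "label": "fall"},
    {"pair": "BTCUSDT", "timestamp": 3, "label": "rise"},
]
signal_keys = [("BTCUSDT", 2)]
masked = mask_to_signal_rows(rows, signal_keys)
```

Because rows are overwritten rather than dropped, the result stays length-preserving, which is what distinguishes masking from filtering.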
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | DataFrame with computed labels. | required |
| data_context | dict[str, Any] | Must contain "signal_keys" DataFrame. | required |
| group_df | DataFrame | Original group data for extracting pair value. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | DataFrame with masked labels. |
Example
    # In compute_group with masking
    def compute_group(self, group_df, data_context=None):
        # Compute labels for all rows
        labeled = group_df.with_columns([...])
        # Mask to signal timestamps only
        if self.mask_to_signals and data_context:
            labeled = self._apply_signal_mask(
                labeled, data_context, group_df
            )
        return labeled
Note
- Requires signal_keys in data_context with (pair, timestamp) columns.
- Non-signal rows get label=SignalType.NONE.
- Metadata columns are also masked if include_meta=True.
Source code in src/signalflow/target/base.py
_compute_pl
_compute_pl(df: DataFrame, signals: Signals | None, data_context: dict[str, Any] | None) -> pl.DataFrame
Internal Polars-based computation.
Orchestrates validation, filtering, grouping, and projection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input data. | required |
| signals | Signals \| None | Optional signals. | required |
| data_context | dict[str, Any] \| None | Optional context. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | Labeled data. |
Source code in src/signalflow/target/base.py
_filter_by_signals_pl
Filter input to rows matching signal timestamps.
Inner join with signal timestamps of specific type.
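As a rough illustration of the inner-join semantics, here is a plain-Python sketch. It is not the library's code: the `filter_by_signals` helper and the `signal_type` column name are assumptions. Unlike masking, which keeps every row, filtering drops all rows that do not match a signal of the requested type.

```python
def filter_by_signals(rows, signals, signal_type):
    """Keep only rows whose (pair, timestamp) matches a signal of signal_type.

    rows: list of dicts with 'pair' and 'timestamp'.
    signals: list of dicts with 'pair', 'timestamp' and 'signal_type'
             (the column names are assumptions for this sketch).
    """
    required = {"pair", "timestamp", "signal_type"}
    for sig in signals:
        missing = required - set(sig)
        if missing:
            raise ValueError(f"signals missing required columns: {sorted(missing)}")

    # Keys of the signals that have the requested type.
    wanted = {
        (sig["pair"], sig["timestamp"])
        for sig in signals
        if sig["signal_type"] == signal_type
    }
    # Inner join: a row survives only if its key is present on both sides.
    return [row for row in rows if (row["pair"], row["timestamp"]) in wanted]


rows = [
    {"pair": "BTCUSDT", "timestamp": 1, "close": 100.0},
    {"pair": "BTCUSDT", "timestamp": 2, "close": 101.0},
    {"pair": "BTCUSDT", "timestamp": 3, "close": 102.0},
]
signals = [
    {"pair": "BTCUSDT", "timestamp": 1, "signal_type": "rise"},
    {"pair": "BTCUSDT", "timestamp": 2, "signal_type": "fall"},
    {"pair": "BTCUSDT", "timestamp": 3, "signal_type": "rise"},
]
filtered = filter_by_signals(rows, signals, "rise")
```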
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input data. | required |
| s | DataFrame | Signals DataFrame. | required |
| signal_type | SignalType | Signal type to filter. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | Filtered data (only rows at signal timestamps). |
Raises:
| Type | Description |
|---|---|
| ValueError | If signals are missing required columns. |
Source code in src/signalflow/target/base.py
_signals_to_pl
Convert Signals to Polars DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| signals | Signals | Signals container. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | Signals as DataFrame. |
Raises:
| Type | Description |
|---|---|
| TypeError | If Signals.value is not pl.DataFrame. |
Source code in src/signalflow/target/base.py
_validate_input_pl
Validate input DataFrame schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input to validate. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If required columns are missing. |
Source code in src/signalflow/target/base.py
compute
compute(df: DataFrame, signals: Signals | None = None, data_context: dict[str, Any] | None = None) -> pl.DataFrame
Compute labels for input DataFrame.
Main entry point - handles validation, filtering, grouping, and projection.
Processing steps
- Validate input schema
- Sort by (pair, timestamp)
- (optional) Filter to specific signal type
- Group by pair and apply compute_group()
- Validate output (length-preserving)
- Project to output columns
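The processing steps above can be sketched as a small plain-Python pipeline. This is an illustration under stated assumptions, not signalflow's code: `compute_labels` and `next_bar_label` are hypothetical names, rows are plain dicts, and the optional signal-type filter step is left out for brevity.

```python
from itertools import groupby


def compute_labels(rows, compute_group, output_columns=("pair", "timestamp", "label")):
    """Validate, sort, group by pair, apply compute_group, check length, project."""
    # 1. Validate input schema.
    required = {"pair", "timestamp"}
    for row in rows:
        missing = required - set(row)
        if missing:
            raise ValueError(f"missing columns: {sorted(missing)}")

    # 2. Sort by (pair, timestamp) so each group is contiguous and time-ordered.
    ordered = sorted(rows, key=lambda r: (r["pair"], r["timestamp"]))

    labeled = []
    # 3. Group by pair and apply the per-pair labeling function.
    for _, group in groupby(ordered, key=lambda r: r["pair"]):
        group = list(group)
        result = compute_group(group)
        # 4. Validate output: the per-pair function must be length-preserving.
        if len(result) != len(group):
            raise ValueError("compute_group must preserve row count")
        labeled.extend(result)

    # 5. Project to the requested output columns.
    return [{col: row[col] for col in output_columns} for row in labeled]


def next_bar_label(group):
    """Toy per-pair labeler: 'rise' if the next close is higher, else 'none'."""
    out = []
    for i, row in enumerate(group):
        nxt = group[i + 1]["close"] if i + 1 < len(group) else None
        label = "rise" if nxt is not None and nxt > row["close"] else "none"
        out.append({**row, "label": label})
    return out


rows = [
    {"pair": "B", "timestamp": 2, "close": 3.0},
    {"pair": "A", "timestamp": 1, "close": 1.0},
    {"pair": "B", "timestamp": 1, "close": 2.0},
    {"pair": "A", "timestamp": 2, "close": 0.5},
]
result = compute_labels(rows, next_bar_label)
```

Checking the row count right after each group call means a labeler that filters rows fails early, before its output is merged with other pairs.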
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input data with OHLCV and required columns. | required |
| signals | Signals \| None | Signals for filtering/masking. | None |
| data_context | dict[str, Any] \| None | Additional context. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | Labeled data with columns: pair, timestamp (always included); label column(s) (as specified by out_col); (optional) metadata columns. |
Raises:
| Type | Description |
|---|---|
| TypeError | If df is not a pl.DataFrame or compute_group returns the wrong type. |
| ValueError | If compute_group changes the row count or columns are missing. |
Source code in src/signalflow/target/base.py
compute_group (abstractmethod)
Compute labels for single pair group.
Core labeling logic - must be implemented by subclasses.
CRITICAL: Must preserve row count (len(output) == len(input)). No filtering allowed inside compute_group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| group_df | DataFrame | Single pair's data, sorted by timestamp. | required |
| data_context | dict[str, Any] \| None | Additional context. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | Same length as input with added label columns. |
Example
    def compute_group(self, group_df, data_context=None):
        # Compute 10-bar forward return
        return group_df.with_columns([
            pl.col("close").shift(-10).alias("future_close")
        ]).with_columns([
            ((pl.col("future_close") / pl.col("close")) - 1).alias("return"),
            pl.when((pl.col("future_close") / pl.col("close") - 1) > 0.01)
            .then(pl.lit(SignalType.RISE.value))
            .otherwise(pl.lit(SignalType.NONE.value))
            .alias("label")
        ])
Note
- Output must have the same height as the input (length-preserving).
- Use shift(-n) for forward-looking operations.
- The last n bars have no future data, so shift(-n) yields null values for them.
Source code in src/signalflow/target/base.py
Labeling Strategies
Fixed Horizon
signalflow.target.fixed_horizon_labeler.FixedHorizonLabeler (dataclass)
FixedHorizonLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t1', 'ret'), price_col: str = 'close', horizon: int = 60)
Bases: Labeler
Fixed-Horizon Labeling
label[t0] = sign(close[t0 + horizon] - close[t0])
If signals are provided, labels are written only on signal rows, while the horizon is computed on the full series (per pair).
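The formula can be illustrated with a plain-Python sketch for a single pair. It is not the library's implementation: `fixed_horizon_labels` is a hypothetical helper, labels are encoded as -1/0/1 instead of SignalType values, and `None` is used here to mean "no label written", which is an assumption about the encoding.

```python
def sign(x):
    """Return -1, 0 or 1 depending on the sign of x."""
    return (x > 0) - (x < 0)


def fixed_horizon_labels(closes, horizon, signal_positions=None):
    """label[t0] = sign(close[t0 + horizon] - close[t0]) for one pair.

    closes: close prices of a single pair, in time order.
    signal_positions: optional row indices where signals occurred; when given,
                      labels are kept only at those rows.
    """
    n = len(closes)
    labels = [None] * n
    # The horizon is evaluated on the full series, not only on signal rows.
    for t0 in range(n - horizon):
        labels[t0] = sign(closes[t0 + horizon] - closes[t0])

    if signal_positions is not None:
        keep = set(signal_positions)
        labels = [lab if i in keep else None for i, lab in enumerate(labels)]
    return labels


closes = [100, 101, 99, 99, 102]
all_labels = fixed_horizon_labels(closes, horizon=2)
signal_labels = fixed_horizon_labels(closes, horizon=2, signal_positions=[2, 4])
```

Row 4 is a signal row but still has no label, because there is no bar two steps ahead of it.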
__post_init__
compute_group
Source code in src/signalflow/target/fixed_horizon_labeler.py
Triple Barrier (Dynamic)
signalflow.target.triple_barrier.TripleBarrierLabeler (dataclass)
TripleBarrierLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', vol_window: int = 60, lookforward_window: int = 1440, profit_multiplier: float = 1.0, stop_loss_multiplier: float = 1.0)
Bases: Labeler
Triple-Barrier Labeling (De Prado), Numba-accelerated.
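This page does not spell out how the dynamic barriers are derived from vol_window, profit_multiplier and stop_loss_multiplier. One common construction, shown here purely as an assumption, scales the barriers by a rolling volatility of log returns. The helpers `rolling_volatility` and `dynamic_barriers` are hypothetical and may differ from what signalflow does.

```python
import math
import statistics


def rolling_volatility(closes, vol_window):
    """Std. dev. of the last vol_window log returns ending at each bar.

    Returns None for bars that do not yet have vol_window returns of history.
    """
    # log_ret[i - 1] is the log return from bar i - 1 to bar i.
    log_ret = [math.log(closes[i] / closes[i - 1]) for i in range(1, len(closes))]
    vol = [None] * len(closes)
    for t in range(vol_window, len(closes)):
        window = log_ret[t - vol_window:t]  # the vol_window returns ending at bar t
        vol[t] = statistics.pstdev(window)
    return vol


def dynamic_barriers(closes, vol_window, profit_multiplier=1.0, stop_loss_multiplier=1.0):
    """Per-bar (upper, lower) price barriers scaled by rolling volatility (assumed form)."""
    vol = rolling_volatility(closes, vol_window)
    barriers = []
    for price, v in zip(closes, vol):
        if v is None:
            barriers.append(None)  # not enough history to size the barriers
        else:
            upper = price * math.exp(profit_multiplier * v)
            lower = price * math.exp(-stop_loss_multiplier * v)
            barriers.append((upper, lower))
    return barriers


closes = [100.0, 110.0, 100.0, 110.0, 100.0]
barriers = dynamic_barriers(closes, vol_window=2)
```

With barriers sized this way, the same multipliers produce wide barriers in volatile periods and narrow ones in quiet periods.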
__post_init__
Source code in src/signalflow/target/triple_barrier.py
_apply_labels
Apply RISE/FALL/NONE labels based on barrier hits.
Source code in src/signalflow/target/triple_barrier.py
_compute_meta
_compute_meta(df: DataFrame, prices: ndarray, up_off_series: Series, dn_off_series: Series, lf: int) -> pl.DataFrame
Compute t_hit and ret meta columns.
Source code in src/signalflow/target/triple_barrier.py
compute_group
Source code in src/signalflow/target/triple_barrier.py
Static Triple Barrier
signalflow.target.static_triple_barrier.StaticTripleBarrierLabeler (dataclass)
StaticTripleBarrierLabeler(raw_data_type: RawDataType = RawDataType.SPOT, pair_col: str = 'pair', ts_col: str = 'timestamp', keep_input_columns: bool = False, output_columns: list[str] | None = None, filter_signal_type: SignalType | None = None, mask_to_signals: bool = True, out_col: str = 'label', include_meta: bool = False, meta_columns: tuple[str, ...] = ('t_hit', 'ret'), price_col: str = 'close', lookforward_window: int = 1440, profit_pct: float = 0.01, stop_loss_pct: float = 0.01)
Bases: Labeler
Triple-Barrier (first-touch) labeling with STATIC horizontal barriers. Numba-accelerated version.
De Prado's framework:
- Vertical barrier at t1 = t0 + lookforward_window
- Horizontal barriers defined as % from initial price at t0:
    pt = close[t0] * (1 + profit_pct)
    sl = close[t0] * (1 - stop_loss_pct)
- Label by first touch within (t0, t1]:
    RISE if PT touched first (ties -> PT)
    FALL if SL touched first
    NONE if none touched by t1
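The first-touch rule can be sketched in plain Python for a single pair. This is a simplified illustration rather than the Numba-accelerated implementation: `static_triple_barrier` is a hypothetical helper, labels are plain strings instead of SignalType values, and truncating the window at the end of the series is an assumption.

```python
def static_triple_barrier(closes, lookforward_window, profit_pct=0.01, stop_loss_pct=0.01):
    """Label each bar by the first barrier touched within (t0, t1]."""
    n = len(closes)
    labels = []
    for t0 in range(n):
        pt = closes[t0] * (1 + profit_pct)     # profit-taking barrier
        sl = closes[t0] * (1 - stop_loss_pct)  # stop-loss barrier
        # Vertical barrier, truncated at the end of the series (assumption).
        t1 = min(t0 + lookforward_window, n - 1)

        label = "NONE"
        for t in range(t0 + 1, t1 + 1):
            # PT is checked first, so a tie resolves to RISE.
            if closes[t] >= pt:
                label = "RISE"
                break
            if closes[t] <= sl:
                label = "FALL"
                break
        labels.append(label)
    return labels


closes = [100.0, 100.5, 101.5, 99.0, 98.0, 98.5]
labels = static_triple_barrier(closes, lookforward_window=2)
```

For the first bar, pt is 101 and sl is 99; the close of 101.5 two bars later touches pt before anything touches sl, so the label is RISE.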