Skip to content

Feature Module

Feature extraction for technical indicators and derived metrics.

Base Classes

signalflow.feature.base.Feature dataclass

Feature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None)

Bases: KwargsTolerantMixin

Base class for all features.

Two methods to implement
  • compute(df): all pairs, abstract for GlobalFeature/Pipeline
  • compute_pair(df): one pair, for regular features

Attributes:

Name Type Description
requires list[str]

Input column templates, e.g. ["{price_col}"]

outputs list[str]

Output column templates, e.g. ["rsi_{period}"]

normalized bool

If True, apply rolling z-score normalization to output.

norm_period int | None

Window for normalization. Defaults to 3x feature period.

component_type class-attribute

component_type: SfComponentType = FEATURE

group_col class-attribute instance-attribute

group_col: str = 'pair'

norm_period class-attribute instance-attribute

norm_period: int | None = None

normalized class-attribute instance-attribute

normalized: bool = False

outputs class-attribute

outputs: list[str] = []

requires class-attribute

requires: list[str] = []

test_params class-attribute

test_params: list[dict] = []

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

warmup property

warmup: int

Minimum bars needed before output is stable.

Override in subclasses with feature-specific logic. Default: 0 (no warmup required).

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame

Compute feature for all pairs

Source code in src/signalflow/feature/base.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Apply ``compute_pair`` to each pair group, preserving group order."""
    grouped = df.group_by(self.group_col, maintain_order=True)
    return grouped.map_groups(self.compute_pair)

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame

Compute feature for single pair. Override for per-pair features.

Source code in src/signalflow/feature/base.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Per-pair hook; concrete per-pair features must override this."""
    msg = f"{self.__class__.__name__} must implement compute_pair()"
    raise NotImplementedError(msg)

output_cols

output_cols(prefix: str = '') -> list[str]

Actual output column names with parameter substitution.

Source code in src/signalflow/feature/base.py
def output_cols(self, prefix: str = "") -> list[str]:
    """Resolve output column templates against this feature's attributes."""
    resolved = (template.format(**self.__dict__) for template in self.outputs)
    return [f"{prefix}{name}" for name in resolved]

required_cols

required_cols() -> list[str]

Actual required column names with parameter substitution.

Source code in src/signalflow/feature/base.py
def required_cols(self) -> list[str]:
    """Resolve required column templates; plain names pass through untouched."""
    params = self.__dict__
    resolved: list[str] = []
    for template in self.requires:
        resolved.append(template.format(**params) if "{" in template else template)
    return resolved

signalflow.feature.feature_pipeline.FeaturePipeline dataclass

FeaturePipeline(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, features: list[Feature] = list(), raw_data_type: RawDataType | str = RawDataType.SPOT)

Bases: Feature

Orchestrates multiple features with optimized execution.

Groups consecutive per-pair features into batches for single group_by.

Parameters:

Name Type Description Default
features list[Feature]

List of features to compute.

list()
raw_data_type RawDataType | str

Type of raw data (defines available columns).

SPOT
Example

pipeline = FeaturePipeline(
    features=[
        RsiFeature(period=14),
        SmaFeature(period=20),
        GlobalFeature(base=RsiFeature(period=14), reference_pair="BTCUSDT"),
    ],
    raw_data_type=RawDataType.SPOT,
)
df = pipeline.run(raw_data_view)

features class-attribute instance-attribute

features: list[Feature] = field(default_factory=list)

outputs property

outputs: list[str]

Aggregated outputs from all features.

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType | str = SPOT

requires class-attribute

requires: list[str] = []

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/feature_pipeline.py
def __post_init__(self) -> None:
    if not self.features:
        raise ValueError("FeaturePipeline requires at least one feature")
    self._validate()

_group_into_batches

_group_into_batches() -> list[list[Feature]]

Group features: consecutive per-pair → batch, global → separate.

Source code in src/signalflow/feature/feature_pipeline.py
def _group_into_batches(self) -> list[list[Feature]]:
    """Partition features: runs of per-pair features batch together; each global feature stands alone."""
    batches: list[list[Feature]] = []
    pending: list[Feature] = []

    def flush() -> None:
        # Close out the current run of per-pair features, if any.
        if pending:
            batches.append(list(pending))
            pending.clear()

    for feat in self.features:
        global_like = isinstance(feat, (GlobalFeature, FeaturePipeline)) or getattr(feat, "_is_global", False)
        if global_like:
            flush()
            batches.append([feat])
        else:
            pending.append(feat)

    flush()
    return batches

_is_per_pair_batch

_is_per_pair_batch(batch: list[Feature]) -> bool

Check if batch contains only per-pair features.

Source code in src/signalflow/feature/feature_pipeline.py
def _is_per_pair_batch(self, batch: list[Feature]) -> bool:
    """True when no feature in the batch needs a global (cross-pair) pass."""
    for feat in batch:
        if isinstance(feat, (GlobalFeature, FeaturePipeline)) or getattr(feat, "_is_global", False):
            return False
    return True

_validate

_validate() -> None

Validate all dependencies are satisfied.

Source code in src/signalflow/feature/feature_pipeline.py
def _validate(self) -> None:
    """Check every feature's inputs exist given raw columns plus earlier outputs."""
    # NOTE(review): `available` is mutated in place; assumes
    # get_raw_data_columns returns a fresh set each call — confirm.
    available = default_registry.get_raw_data_columns(self.raw_data_type)

    for feat in self.features:
        missing = set(feat.required_cols()) - available
        if missing:
            raise ValueError(f"{feat.__class__.__name__} requires {missing}, available: {sorted(available)}")
        # Outputs of earlier features are inputs for later ones.
        available.update(feat.output_cols())

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame

Compute all features with optimized batching.

Source code in src/signalflow/feature/feature_pipeline.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Run every batch in order; per-pair batches share a single group_by pass."""
    df = df.sort([self.group_col, self.ts_col])

    for batch in self._group_into_batches():
        if not self._is_per_pair_batch(batch):
            # Global features (and nested pipelines) see the whole frame.
            for feat in batch:
                df = feat.compute(df, context=context)
            continue

        def run_batch(pair_df: pl.DataFrame, feats: list[Feature] = batch) -> pl.DataFrame:
            # Default arg binds the current batch (avoids late-binding closure).
            out = pair_df
            for feat in feats:
                out = feat.compute_pair(out)
            return out

        df = df.group_by(self.group_col, maintain_order=True).map_groups(run_batch)

    return df

output_cols

output_cols(prefix: str = '') -> list[str]
Source code in src/signalflow/feature/feature_pipeline.py
def output_cols(self, prefix: str = "") -> list[str]:
    """Prefixed names of every column the pipeline produces."""
    return [prefix + col for col in self.outputs]

run

run(raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Entry point: load from RawDataView and compute.

Source code in src/signalflow/feature/feature_pipeline.py
def run(self, raw_data_view: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Entry point: load from RawDataView and compute.

    Args:
        raw_data_view: Source of raw frames, queried by raw-data-type key.
        context: Optional shared state forwarded to feature computation.

    Returns:
        pl.DataFrame: Frame with all pipeline feature columns appended.
    """
    # Enum members expose .value; plain strings pass through unchanged.
    key = getattr(self.raw_data_type, "value", self.raw_data_type)
    df = raw_data_view.to_polars(key)
    # Bug fix: forward context — it was accepted but silently dropped before.
    return self.compute(df, context=context)

to_mermaid

to_mermaid() -> str

Generate Mermaid diagram of feature dependencies.

Source code in src/signalflow/feature/feature_pipeline.py
def to_mermaid(self) -> str:
    """Render the feature dependency graph as Mermaid ``graph LR`` text."""
    out = ["graph LR", "    subgraph Input"]
    raw_cols = default_registry.get_raw_data_columns(self.raw_data_type)
    out.extend(f"        {col}[{col}]" for col in sorted(raw_cols))
    out.append("    end")

    for feat in self.features:
        node = feat.__class__.__name__
        # Disambiguate parameterized features by their period.
        if hasattr(feat, "period"):
            node = f"{node}_{feat.period}"

        out.extend(f"    {req} --> {node}" for req in feat.required_cols())
        out.extend(f"    {node} --> {o}[{o}]" for o in feat.output_cols())

    return "\n".join(out)

signalflow.feature.base.GlobalFeature dataclass

GlobalFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None)

Bases: Feature

Base class for features computed across all pairs.

Override compute() with custom aggregation logic.

For multi-source features, set sources to specify which exchanges to use. Use get_source_data() to retrieve data from RawData with proper handling.

Attributes:

Name Type Description
sources list[str] | None

List of source names to use (e.g., ["binance", "okx"]). If None, uses default source or all available sources.

Example
@dataclass
class AggregatedOI(GlobalFeature):
    sources: list[str] | None = None

    def compute_from_raw(self, raw: RawData, context=None) -> pl.DataFrame:
        # Get data from specified sources
        for source, df in self.iter_sources(raw, "perpetual"):
            ...

sources class-attribute instance-attribute

sources: list[str] | None = field(default=None)

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame

Must override - compute global feature across all pairs.

Source code in src/signalflow/feature/base.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Cross-pair aggregation hook; subclasses must override."""
    msg = f"{self.__class__.__name__} must implement compute()"
    raise NotImplementedError(msg)

get_source_data

get_source_data(raw: RawData, data_type: str, source: str | None = None) -> pl.DataFrame

Get DataFrame from RawData for a specific source.

Parameters:

Name Type Description Default
raw RawData

RawData container.

required
data_type str

Data type key (e.g., "perpetual", "spot").

required
source str | None

Specific source name. If None, uses default.

None

Returns:

Type Description
DataFrame

pl.DataFrame: Data for the specified source.

Source code in src/signalflow/feature/base.py
def get_source_data(
    self,
    raw: "RawData",
    data_type: str,
    source: str | None = None,
) -> pl.DataFrame:
    """Fetch one data type from RawData, optionally pinned to a source.

    Args:
        raw: RawData container.
        data_type: Data type key (e.g., "perpetual", "spot").
        source: Specific source name; None means the container default.

    Returns:
        pl.DataFrame: Data for the specified source.
    """
    if source is None:
        return raw.get(data_type)
    return raw.get(data_type, source=source)

iter_sources

iter_sources(raw: RawData, data_type: str) -> Iterator[tuple[str, pl.DataFrame]]

Iterate over source DataFrames from RawData.

If self.sources is set, iterates only those sources. Otherwise, iterates all available sources.

Parameters:

Name Type Description Default
raw RawData

RawData container.

required
data_type str

Data type key (e.g., "perpetual").

required

Yields:

Type Description
tuple[str, DataFrame]

tuple[str, pl.DataFrame]: (source_name, DataFrame) pairs.

Example
for source, df in self.iter_sources(raw, "perpetual"):
    print(f"{source}: {df.shape}")
Source code in src/signalflow/feature/base.py
def iter_sources(
    self,
    raw: "RawData",
    data_type: str,
) -> Iterator[tuple[str, pl.DataFrame]]:
    """Yield (source_name, DataFrame) pairs for one data type.

    Only sources listed in ``self.sources`` are yielded when that list is
    non-empty; otherwise every source the accessor knows is used. Yields
    nothing when the data type is absent from ``raw``.

    Args:
        raw: RawData container.
        data_type: Data type key (e.g., "perpetual").

    Yields:
        tuple[str, pl.DataFrame]: (source_name, DataFrame) pairs.
    """
    if data_type not in raw:
        return

    accessor = getattr(raw, data_type)
    # Explicit source list wins; empty/None falls back to all available.
    wanted = self.sources or accessor.sources

    for name in wanted:
        if name in accessor:
            yield name, getattr(accessor, name)

signalflow.feature.offset_feature.OffsetFeature dataclass

OffsetFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, feature_name: str | None = None, feature_params: dict = dict(), window: int = 15, prefix: str | None = None)

Bases: Feature

Multi-timeframe feature via offset resampling.

Creates window parallel time series with different offsets. Each offset computes features as if on window-minute bars.

Supports both regular Feature (compute_pair) and GlobalFeature (compute).

Parameters:

Name Type Description Default
feature_name str | None

Registered component name (sf_component name).

None
feature_params dict

Parameters for feature class.

dict()
window int

Aggregation window in minutes. Default: 15.

15
prefix str | None

Prefix for output columns. Default: "{window}m_".

None
Example

offset = OffsetFeature(
    feature_name="test_rsi",
    feature_params={"period": 14},
    window=15,
)

Outputs: 15m_rsi_14, offset

With GlobalFeature

offset = OffsetFeature( ... feature_name="global/market_log_return", ... feature_params={}, ... window=15, ... )

feature_name class-attribute instance-attribute

feature_name: str | None = None

feature_params class-attribute instance-attribute

feature_params: dict = field(default_factory=dict)

outputs class-attribute

outputs: list[str] = ['offset']

prefix class-attribute instance-attribute

prefix: str | None = None

requires class-attribute

requires: list[str] = ['open', 'high', 'low', 'close', 'volume', 'timestamp']

window class-attribute instance-attribute

window: int = 15

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/offset_feature.py
def __post_init__(self) -> None:
    if self.feature_name is None:
        raise ValueError("OffsetFeature requires 'feature_name'")

    self._feature_cls = default_registry.get(SfComponentType.FEATURE, self.feature_name)
    self._base = self._feature_cls(**self.feature_params)
    self._is_global = isinstance(self._base, GlobalFeature)

    if self.prefix is None:
        self.prefix = f"{self.window}m_"

_compute_all_pairs_global

_compute_all_pairs_global(df: DataFrame) -> pl.DataFrame

Compute features for all pairs with global base feature.

Source code in src/signalflow/feature/offset_feature.py
def _compute_all_pairs_global(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute features for all pairs with global base feature.

    Builds `window` resampled views of the data (one per row offset),
    runs the global base feature on each view, and joins the results
    back so every original 1m row carries the feature value of the
    window-minute bar it belongs to.
    """
    df = df.sort([self.group_col, self.ts_col])
    original_len = len(df)

    # 0-based ordinal position of each row within its pair; used for the
    # offset assignment and to rebuild the join key later.
    df = df.with_columns(
        pl.col(self.ts_col).rank("ordinal").over(self.group_col).cast(pl.UInt32).alias("_orig_idx") - 1
    )

    # Which of the `window` parallel offset series each row belongs to.
    df = df.with_columns((pl.col("_orig_idx") % self.window).cast(pl.UInt8).alias("offset"))

    offset_results = []
    for offset in range(self.window):
        # Resample to window-minute OHLCV bars, shifted by `offset` rows,
        # keeping per-pair grouping.
        resampled = (
            df.drop(["_orig_idx", "offset"])
            .with_columns(
                (pl.col(self.ts_col).rank("ordinal").over(self.group_col).cast(pl.Int64) - 1 - offset)
                .floordiv(self.window)
                .alias("_grp")
            )
            .group_by([self.group_col, "_grp"], maintain_order=True)
            .agg(
                [
                    pl.col(self.ts_col).last(),
                    pl.col("open").first(),
                    pl.col("high").max(),
                    pl.col("low").min(),
                    pl.col("close").last(),
                    pl.col("volume").sum(),
                ]
            )
        )

        with_feat = self._compute_base_feature(resampled)
        with_feat = with_feat.with_columns(pl.lit(offset).cast(pl.UInt8).alias("_offset"))

        # Prefix the base feature's outputs (e.g. "rsi_14" -> "15m_rsi_14").
        for col in self._base.output_cols():
            if col in with_feat.columns:
                with_feat = with_feat.rename({col: f"{self.prefix}{col}"})

        offset_results.append(with_feat)

    all_offsets = pl.concat(offset_results)

    # Recreate the per-offset bar index so each original row can find its bar.
    df = df.with_columns(
        ((pl.col("_orig_idx").cast(pl.Int64) - pl.col("offset").cast(pl.Int64)) // self.window).alias("_grp")
    )

    feature_cols = [f"{self.prefix}{col}" for col in self._base.output_cols()]

    # Left join: each row picks up the feature of its (pair, bar, offset) key.
    result = df.join(
        all_offsets.select([self.group_col, "_grp", "_offset", *feature_cols]),
        left_on=[self.group_col, "_grp", "offset"],
        right_on=[self.group_col, "_grp", "_offset"],
        how="left",
    )

    result = result.drop(["_orig_idx", "_grp"])
    # The join must neither duplicate nor drop rows.
    assert len(result) == original_len

    return result

_compute_base_feature

_compute_base_feature(resampled: DataFrame) -> pl.DataFrame

Compute base feature - handles both Feature and GlobalFeature.

Source code in src/signalflow/feature/offset_feature.py
def _compute_base_feature(self, resampled: pl.DataFrame) -> pl.DataFrame:
    """Dispatch to the wrapped feature's global or per-pair entry point."""
    method = self._base.compute if self._is_global else self._base.compute_pair
    return cast(pl.DataFrame, method(resampled))

_compute_single_pair

_compute_single_pair(df: DataFrame) -> pl.DataFrame

Compute features for single pair (non-global base).

Source code in src/signalflow/feature/offset_feature.py
def _compute_single_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute features for single pair (non-global base).

    Same scheme as the global path, but rows are indexed with a plain
    row index since only one pair is present.
    """
    df = df.sort(self.ts_col)
    original_len = len(df)
    df = df.with_row_index("_orig_idx")

    # Which of the `window` parallel offset series each row belongs to.
    df = df.with_columns((pl.col("_orig_idx") % self.window).cast(pl.UInt8).alias("offset"))

    offset_results = []
    for offset in range(self.window):
        resampled = self._resample_ohlcv(df.drop(["_orig_idx", "offset"]), offset)

        with_feat = self._compute_base_feature(resampled)
        with_feat = with_feat.with_columns(pl.lit(offset).cast(pl.UInt8).alias("_offset"))

        # Prefix the base feature's outputs (e.g. "rsi_14" -> "15m_rsi_14").
        for col in self._base.output_cols():
            if col in with_feat.columns:
                with_feat = with_feat.rename({col: f"{self.prefix}{col}"})

        offset_results.append(with_feat)

    all_offsets = pl.concat(offset_results)

    # Bar index of each original row within its offset series.
    df = df.with_columns(
        ((pl.col("_orig_idx").cast(pl.Int64) - pl.col("offset").cast(pl.Int64)) // self.window).alias("_grp")
    )

    feature_cols = [f"{self.prefix}{col}" for col in self._base.output_cols()]

    # Left join: each row picks up the feature of its (bar, offset) key.
    result = df.join(
        all_offsets.select(["_grp", "_offset", *feature_cols]),
        left_on=["_grp", "offset"],
        right_on=["_grp", "_offset"],
        how="left",
    )

    result = result.drop(["_orig_idx", "_grp"])
    # The join must neither duplicate nor drop rows.
    assert len(result) == original_len

    return result

_resample_ohlcv

_resample_ohlcv(df: DataFrame, offset: int) -> pl.DataFrame

Resample 1m OHLCV to window-minute bars with given offset.

Source code in src/signalflow/feature/offset_feature.py
def _resample_ohlcv(self, df: pl.DataFrame, offset: int) -> pl.DataFrame:
    """Aggregate 1m rows into window-minute OHLCV bars, shifted by `offset` rows."""
    indexed = df.with_row_index("_row_idx")
    indexed = indexed.with_columns(
        ((pl.col("_row_idx").cast(pl.Int64) - offset) // self.window).alias("_grp")
    )

    aggs = [
        pl.col(self.ts_col).last(),
        pl.col("open").first(),
        pl.col("high").max(),
        pl.col("low").min(),
        pl.col("close").last(),
        pl.col("volume").sum(),
    ]
    # Keep the pair label when the frame still carries it.
    if self.group_col in df.columns:
        aggs.append(pl.col(self.group_col).first())

    return indexed.group_by("_grp", maintain_order=True).agg(aggs)

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame

Compute for all pairs.

Source code in src/signalflow/feature/offset_feature.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute offset features for the whole frame (all pairs)."""
    if not self._is_global:
        return df.group_by(self.group_col, maintain_order=True).map_groups(self._compute_single_pair)
    return self._compute_all_pairs_global(df)

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame

Compute for single pair (only for non-global base).

Source code in src/signalflow/feature/offset_feature.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Per-pair entry point; only valid when the base feature is per-pair."""
    if not self._is_global:
        return self._compute_single_pair(df)
    raise NotImplementedError("GlobalFeature base requires compute(), not compute_pair()")

from_dict classmethod

from_dict(data: dict) -> OffsetFeature

Deserialize from config.

Source code in src/signalflow/feature/offset_feature.py
@classmethod
def from_dict(cls, data: dict) -> "OffsetFeature":
    """Rebuild an OffsetFeature from its serialized config dict."""
    kwargs = {
        "feature_name": data["feature_name"],
        "feature_params": data["feature_params"],
        "window": data["window"],
        # `prefix` is optional in the config; None lets __post_init__ derive it.
        "prefix": data.get("prefix"),
    }
    return cls(**kwargs)

output_cols

output_cols(prefix: str = '') -> list[str]
Source code in src/signalflow/feature/offset_feature.py
def output_cols(self, prefix: str = "") -> list[str]:
    """Base-feature columns (offset-prefixed) plus the ``offset`` column."""
    cols = list(self._base.output_cols(prefix=f"{prefix}{self.prefix}"))
    cols.append(f"{prefix}offset")
    return cols

required_cols

required_cols() -> list[str]
Source code in src/signalflow/feature/offset_feature.py
def required_cols(self) -> list[str]:
    """OHLCV columns plus the timestamp column."""
    return [*("open", "high", "low", "close", "volume"), self.ts_col]

to_dict

to_dict() -> dict

Serialize for Kedro.

Source code in src/signalflow/feature/offset_feature.py
def to_dict(self) -> dict:
    """Serialize for Kedro."""
    keys = ("feature_name", "feature_params", "window", "prefix")
    return {k: getattr(self, k) for k in keys}

signalflow.feature.lin_reg_forecast.LinRegForecastFeature dataclass

LinRegForecastFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, source_col: str = 'rsi_14', n_lags: int = 10, trend_window: int = 5, mean_window: int = 20, refit_period: Literal['hour', 'day', 'week', 'month', None] = 'day', alpha: float = 1.0, forecast_horizon: int = 1, min_samples: int = 50)

Bases: Feature

Enhanced linear regression forecast with trend and mean-reversion features.

Instead of predicting raw values, predicts change (diff) and adds:

- Trend slope (recent momentum)
- Mean reversion signal (deviation from rolling mean)
- Volatility scaling

Parameters:

Name Type Description Default
source_col str

Column to forecast.

'rsi_14'
n_lags int

Number of lagged diffs. Default: 10.

10
trend_window int

Window for trend calculation. Default: 5.

5
mean_window int

Window for mean reversion. Default: 20.

20
refit_period Literal['hour', 'day', 'week', 'month', None]

When to refit. Default: 'day'.

'day'
alpha float

Ridge regularization. Default: 1.0.

1.0
forecast_horizon int

Steps ahead to forecast. Default: 1.

1

alpha class-attribute instance-attribute

alpha: float = 1.0

forecast_horizon class-attribute instance-attribute

forecast_horizon: int = 1

mean_window class-attribute instance-attribute

mean_window: int = 20

min_samples class-attribute instance-attribute

min_samples: int = 50

n_lags class-attribute instance-attribute

n_lags: int = 10

outputs class-attribute

outputs: list[str] = ['{source_col}_forecast', '{source_col}_forecast_change', '{source_col}_forecast_direction']

refit_period class-attribute instance-attribute

refit_period: Literal['hour', 'day', 'week', 'month', None] = 'day'

requires class-attribute

requires: list[str] = ['{source_col}']

source_col class-attribute instance-attribute

source_col: str = 'rsi_14'

test_params class-attribute

test_params: list[dict] = [{'source_col': 'rsi_14', 'n_lags': 10}, {'source_col': 'rsi_14', 'n_lags': 5, 'mean_window': 10}]

trend_window class-attribute instance-attribute

trend_window: int = 5

warmup property

warmup: int

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/lin_reg_forecast.py
def __post_init__(self) -> None:
    if self.n_lags < 1:
        raise ValueError("n_lags must be >= 1")

_build_features

_build_features(values: ndarray) -> np.ndarray

Build enhanced feature matrix.

Source code in src/signalflow/feature/lin_reg_forecast.py
def _build_features(self, values: np.ndarray) -> np.ndarray:
    """Build enhanced feature matrix."""
    n = len(values)

    diffs = np.diff(values, prepend=values[0])

    n_features = self.n_lags + 3
    X = np.full((n, n_features), np.nan, dtype=np.float64)

    start_idx = max(self.n_lags, self.mean_window)

    for i in range(start_idx, n):
        for lag in range(self.n_lags):
            X[i, lag] = diffs[i - lag - 1]

        window = values[i - self.trend_window : i]
        if len(window) == self.trend_window:
            x_trend = np.arange(self.trend_window)
            X[i, self.n_lags] = np.polyfit(x_trend, window, 1)[0]

        mean_window = values[i - self.mean_window : i]
        if len(mean_window) == self.mean_window:
            mean_val = np.mean(mean_window)
            std_val = np.std(mean_window)
            if std_val > 1e-8:
                X[i, self.n_lags + 1] = (values[i] - mean_val) / std_val
            else:
                X[i, self.n_lags + 1] = 0

        vol_window = diffs[i - self.trend_window : i]
        if len(vol_window) == self.trend_window:
            X[i, self.n_lags + 2] = np.std(vol_window)

    return X

_build_targets

_build_targets(values: ndarray) -> np.ndarray

Build target: forward diff (change).

Source code in src/signalflow/feature/lin_reg_forecast.py
def _build_targets(self, values: np.ndarray) -> np.ndarray:
    """Build target: forward diff (change)."""
    n = len(values)
    y = np.full(n, np.nan, dtype=np.float64)

    if self.forecast_horizon < n:
        y[: -self.forecast_horizon] = np.diff(
            values, n=self.forecast_horizon, append=[np.nan] * self.forecast_horizon
        )[: -self.forecast_horizon]
        for i in range(n - self.forecast_horizon):
            y[i] = values[i + self.forecast_horizon] - values[i]

    return y

_get_period_key

_get_period_key(ts: datetime) -> tuple | None
Source code in src/signalflow/feature/lin_reg_forecast.py
def _get_period_key(self, ts: datetime) -> tuple | None:
    if self.refit_period == "hour":
        return (ts.year, ts.month, ts.day, ts.hour)
    elif self.refit_period == "day":
        return (ts.year, ts.month, ts.day)
    elif self.refit_period == "week":
        return (ts.year, ts.isocalendar()[1])
    elif self.refit_period == "month":
        return (ts.year, ts.month)
    return None

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame

Compute forecasts for single pair.

Source code in src/signalflow/feature/lin_reg_forecast.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute forecasts for single pair.

    Walk-forward scheme: a Ridge model is refit at each new refit
    period using only rows strictly before the current index, then
    predicts the change over `forecast_horizon` steps for rows inside
    that period. Rows before `min_samples` or with incomplete features
    keep NaN outputs.
    """
    values = df[self.source_col].to_numpy().astype(np.float64)
    timestamps = df[self.ts_col].to_list()
    n = len(values)

    X = self._build_features(values)
    y = self._build_targets(values)

    forecasts = np.full(n, np.nan, dtype=np.float64)
    forecast_changes = np.full(n, np.nan, dtype=np.float64)

    # Rows usable for prediction (full feature vector) vs training
    # (feature vector plus a known forward target).
    valid_mask = ~np.any(np.isnan(X), axis=1)
    target_valid = ~np.isnan(y)
    train_valid = valid_mask & target_valid

    current_period = None
    model = Ridge(alpha=self.alpha)
    fitted = False

    for i in range(self.min_samples, n):
        if not valid_mask[i]:
            continue

        period = self._get_period_key(timestamps[i])

        # Refit once per new period, on all valid history before i
        # (no lookahead: rows at or after i are excluded).
        if period != current_period:
            current_period = period
            train_idx = np.where(train_valid[:i])[0]
            if len(train_idx) >= 20:
                model.fit(X[train_idx], y[train_idx])
                fitted = True

        if fitted:
            predicted_change = model.predict(X[i : i + 1])[0]
            forecast_changes[i] = predicted_change
            # Forecast level = current value + predicted change.
            forecasts[i] = values[i] + predicted_change

    forecast_col = f"{self.source_col}_forecast"
    change_col = f"{self.source_col}_forecast_change"
    direction_col = f"{self.source_col}_forecast_direction"

    return df.with_columns(
        [
            pl.Series(name=forecast_col, values=forecasts),
            pl.Series(name=change_col, values=forecast_changes),
            pl.Series(name=direction_col, values=np.sign(forecast_changes)),
        ]
    )

signalflow.feature.atr.ATRFeature dataclass

ATRFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, smoothing: Literal['sma', 'ema'] = 'ema')

Bases: Feature

Average True Range (ATR) feature.

Measures market volatility as the moving average of True Range. True Range = max(high - low, |high - prev_close|, |low - prev_close|)

Parameters:

Name Type Description Default
period int

ATR period. Default: 14.

14
smoothing Literal['sma', 'ema']

Smoothing method - "sma" or "ema" (Wilder's). Default: "ema".

'ema'
Example

atr = ATRFeature(period=14)
atr.output_cols()  # ["atr_14"]

outputs class-attribute

outputs: list[str] = ['atr_{period}']

period class-attribute instance-attribute

period: int = 14

requires class-attribute

requires: list[str] = ['high', 'low', 'close']

smoothing class-attribute instance-attribute

smoothing: Literal['sma', 'ema'] = 'ema'

test_params class-attribute

test_params: list[dict] = [{'period': 14}, {'period': 14, 'smoothing': 'sma'}, {'period': 20}]

warmup property

warmup: int

_get_output_name

_get_output_name() -> str
Source code in src/signalflow/feature/atr.py
def _get_output_name(self) -> str:
    suffix = "_norm" if self.normalized else ""
    return f"atr_{self.period}{suffix}"

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame

Compute ATR for a single pair.

Source code in src/signalflow/feature/atr.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute ATR for a single pair.

    True Range = max(high - low, |high - prev_close|, |low - prev_close|);
    ATR is its rolling SMA or Wilder-smoothed EMA over `period` bars.
    """
    col_name = self._get_output_name()

    high = pl.col("high")
    low = pl.col("low")
    prev_close = pl.col("close").shift(1)

    # True Range = max(H-L, |H-prevC|, |L-prevC|)
    tr = pl.max_horizontal(
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    )

    # Apply smoothing
    if self.smoothing == "sma":
        atr = tr.rolling_mean(window_size=self.period)
    else:
        # Bug fix: Wilder's smoothing uses alpha = 1/period. The previous
        # code passed span=period, i.e. alpha = 2/(period+1) — a plain EMA,
        # not the documented Wilder's smoothing.
        atr = tr.ewm_mean(alpha=1.0 / self.period, adjust=False)

    df = df.with_columns(atr.alias(col_name))

    # Optional z-score normalization
    if self.normalized:
        from signalflow.feature.examples import _get_norm_window, _normalize_zscore

        norm_window = self.norm_period or _get_norm_window(self.period)
        vals = df[col_name].to_numpy()
        normed = _normalize_zscore(vals, window=norm_window)
        df = df.with_columns(pl.Series(name=col_name, values=normed))

    return df

Examples

signalflow.feature.examples.ExampleRsiFeature dataclass

ExampleRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, price_col: str = 'close')

Bases: Feature

Relative Strength Index.

Bounded oscillator [0, 100]. In normalized mode, rescales to [-1, 1].

Parameters:

Name Type Description Default
period int

RSI period. Default: 14.

14
price_col str

Price column to use. Default: "close".

'close'
Example

rsi = ExampleRsiFeature(period=21)
rsi.output_cols()  # ["rsi_21"]

outputs class-attribute

outputs: list[str] = ['rsi_{period}']

period class-attribute instance-attribute

period: int = 14

price_col class-attribute instance-attribute

price_col: str = 'close'

requires class-attribute

requires: list[str] = ['{price_col}']

test_params class-attribute

test_params: list[dict] = [{'period': 14}, {'period': 14, 'normalized': True}, {'period': 21}]

warmup property

warmup: int

_get_output_name

_get_output_name() -> str
Source code in src/signalflow/feature/examples.py
def _get_output_name(self) -> str:
    suffix = "_norm" if self.normalized else ""
    return f"rsi_{self.period}{suffix}"

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame

Compute RSI for all pairs.

Source code in src/signalflow/feature/examples.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute RSI for every pair by dispatching each group to compute_pair."""
    grouped = df.group_by(self.group_col, maintain_order=True)
    return grouped.map_groups(self.compute_pair)

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame

Compute RSI for single pair.

Source code in src/signalflow/feature/examples.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute RSI for a single pair's rows.

    Gains and losses are split from one-bar price deltas, averaged with a
    simple rolling mean over ``period`` bars, and combined via
    RSI = 100 - 100 / (1 + RS).
    """
    out_name = self._get_output_name()

    delta = pl.col(self.price_col).diff()
    gains = pl.when(delta > 0).then(delta).otherwise(0)
    losses = pl.when(delta < 0).then(-delta).otherwise(0)

    mean_gain = gains.rolling_mean(window_size=self.period)
    mean_loss = losses.rolling_mean(window_size=self.period)

    # RSI = 100 - 100 / (1 + RS), with RS = mean gain / mean loss.
    rsi_expr = 100 - (100 / (1 + mean_gain / mean_loss))
    df = df.with_columns(rsi_expr.alias(out_name))

    # RSI is bounded [0, 100]; rescale to [-1, 1] when normalization is on.
    if self.normalized:
        rescaled = (pl.col(out_name) - 50) / 50
        df = df.with_columns(rescaled.alias(out_name))

    return df

signalflow.feature.examples.ExampleSmaFeature dataclass

ExampleSmaFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 20, price_col: str = 'close')

Bases: Feature

Simple Moving Average.

outputs class-attribute

outputs: list[str] = ['sma_{period}']

period class-attribute instance-attribute

period: int = 20

price_col class-attribute instance-attribute

price_col: str = 'close'

requires class-attribute

requires: list[str] = ['{price_col}']

test_params class-attribute

test_params: list[dict] = [{'period': 20}, {'period': 50}]

warmup property

warmup: int

_get_output_name

_get_output_name() -> str
Source code in src/signalflow/feature/examples.py
def _get_output_name(self) -> str:
    suffix = "_norm" if self.normalized else ""
    return f"sma_{self.period}{suffix}"

compute_pair

compute_pair(df: DataFrame) -> pl.DataFrame
Source code in src/signalflow/feature/examples.py
def compute_pair(self, df: pl.DataFrame) -> pl.DataFrame:
    """Compute the simple moving average for one pair, optionally z-scored."""
    out_name = self._get_output_name()
    rolling = pl.col(self.price_col).rolling_mean(window_size=self.period)
    df = df.with_columns(rolling.alias(out_name))

    if not self.normalized:
        return df

    # Rolling z-score over norm_period (default derived from the SMA period).
    window = self.norm_period or _get_norm_window(self.period)
    normed = _normalize_zscore(df[out_name].to_numpy(), window=window)
    return df.with_columns(pl.Series(name=out_name, values=normed))

signalflow.feature.examples.ExampleGlobalMeanRsiFeature dataclass

ExampleGlobalMeanRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None, period: int = 14, price_col: str = 'close', add_diff: bool = False)

Bases: GlobalFeature

Mean RSI across all pairs per timestamp.

  1. Compute RSI per pair
  2. Mean across all pairs at time t -> global_mean_rsi
  3. Optionally: rsi_diff = pair_rsi - global_mean_rsi

Parameters:

Name Type Description Default
period int

RSI period. Default: 14.

14
add_diff bool

Add per-pair difference column. Default: False.

False

add_diff class-attribute instance-attribute

add_diff: bool = False

outputs class-attribute

outputs: list[str] = ['global_mean_rsi_{period}']

period class-attribute instance-attribute

period: int = 14

price_col class-attribute instance-attribute

price_col: str = 'close'

requires class-attribute

requires: list[str] = ['{price_col}']

test_params class-attribute

test_params: list[dict] = [{'period': 14}, {'period': 14, 'add_diff': True}]

warmup property

warmup: int

compute

compute(df: DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame
Source code in src/signalflow/feature/examples.py
def compute(self, df: pl.DataFrame, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Compute the cross-pair mean RSI per timestamp (and optional diff)."""
    rsi_col = f"rsi_{self.period}"
    out_col = f"global_mean_rsi_{self.period}"

    # Reuse a pre-computed per-pair RSI column when present; otherwise
    # compute a temporary one and drop it before returning.
    precomputed = rsi_col in df.columns
    if not precomputed:
        helper = ExampleRsiFeature(period=self.period, price_col=self.price_col)
        df = helper.compute(df)

    per_ts_mean = df.group_by(self.ts_col).agg(pl.col(rsi_col).mean().alias(out_col))
    df = df.join(per_ts_mean, on=self.ts_col, how="left")

    if self.add_diff:
        diff_expr = (pl.col(rsi_col) - pl.col(out_col)).alias(f"rsi_{self.period}_diff")
        df = df.with_columns(diff_expr)

    return df if precomputed else df.drop(rsi_col)

output_cols

output_cols(prefix: str = '') -> list[str]
Source code in src/signalflow/feature/examples.py
def output_cols(self, prefix: str = "") -> list[str]:
    """List the output column names, each prepended with ``prefix``."""
    names = [f"global_mean_rsi_{self.period}"]
    if self.add_diff:
        names.append(f"rsi_{self.period}_diff")
    return [f"{prefix}{name}" for name in names]

Feature Informativeness

Measures how informative each feature is relative to multiple targets at multiple prediction horizons. Combines MI magnitude with temporal stability into a composite score.

Usage

from signalflow.feature.informativeness import FeatureInformativenessAnalyzer
from signalflow.detector.market import MarketZScoreDetector

analyzer = FeatureInformativenessAnalyzer(
    event_detector=MarketZScoreDetector(z_threshold=3.0),
)
report = analyzer.analyze(df, feature_columns=["rsi_14", "sma_20", "volume_ratio"])

print(report.top_features(10))      # best features by composite score
print(report.score_matrix)          # NMI heatmap: feature x (horizon, target)
report.feature_detail("rsi_14")     # per-target breakdown for one feature

signalflow.feature.informativeness.FeatureInformativenessAnalyzer dataclass

FeatureInformativenessAnalyzer(target_generator: MultiTargetGenerator = MultiTargetGenerator(), event_detector: SignalDetector | None = _default_event_detector(), rolling_mi: RollingMIConfig = RollingMIConfig(), weights: CompositeWeights = CompositeWeights(), bins: int = 20, pair_col: str = 'pair', ts_col: str = 'timestamp', aggregate_pairs: bool = True)

Orchestrator for feature informativeness analysis.

Pipeline
  1. Generate multi-horizon, multi-target labels
  2. Detect and mask global events
  3. Compute MI between each feature and each target
  4. Compute rolling MI for temporal stability
  5. Compute composite weighted scores
  6. Generate report

Attributes:

Name Type Description
target_generator MultiTargetGenerator

Multi-target label generator.

event_detector SignalDetector | None

Global event detector. None to disable.

rolling_mi RollingMIConfig

Rolling MI stability configuration.

weights CompositeWeights

Composite scoring weights.

bins int

Number of histogram bins for MI estimation.

pair_col str

Pair column name.

ts_col str

Timestamp column name.

aggregate_pairs bool

If True, pool all pairs for MI computation.

aggregate_pairs class-attribute instance-attribute

aggregate_pairs: bool = True

bins class-attribute instance-attribute

bins: int = 20

event_detector class-attribute instance-attribute

event_detector: SignalDetector | None = field(default_factory=_default_event_detector)

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

rolling_mi class-attribute instance-attribute

rolling_mi: RollingMIConfig = field(default_factory=RollingMIConfig)

target_generator class-attribute instance-attribute

target_generator: MultiTargetGenerator = field(default_factory=MultiTargetGenerator)

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

weights class-attribute instance-attribute

weights: CompositeWeights = field(default_factory=CompositeWeights)

_build_score_matrix

_build_score_matrix(raw_mi: DataFrame) -> pl.DataFrame

Build pivoted Feature x (Horizon, Target) matrix.

Source code in src/signalflow/feature/informativeness.py
def _build_score_matrix(self, raw_mi: pl.DataFrame) -> pl.DataFrame:
    """Pivot raw MI rows into a Feature x (Horizon, Target) NMI matrix."""
    if raw_mi.height == 0:
        return pl.DataFrame()

    # Key each cell by "<horizon>_<target_type>" before pivoting on it.
    key_expr = (pl.col("horizon") + "_" + pl.col("target_type")).alias("_col_key")
    keyed = raw_mi.with_columns(key_expr)
    return keyed.pivot(on="_col_key", index="feature", values="nmi")

_compute_all_mi

_compute_all_mi(df: DataFrame, feature_columns: list[str], target_meta: list[dict[str, str]]) -> list[dict]

Compute MI for all (feature, target) pairs.

Source code in src/signalflow/feature/informativeness.py
def _compute_all_mi(
    self,
    df: pl.DataFrame,
    feature_columns: list[str],
    target_meta: list[dict[str, str]],
) -> list[dict]:
    """Compute MI/NMI/stability rows for every (feature, target) combination.

    Pairs that cannot be extracted (missing columns or too few valid rows)
    produce NaN placeholder rows so downstream scoring stays aligned.
    """
    results: list[dict] = []

    for feature in feature_columns:
        for meta in target_meta:
            target_col = meta["column"]
            kind = meta["kind"]

            feat_arr, target_arr = self._extract_arrays(df, feature, target_col)
            if feat_arr is None or target_arr is None:
                results.append(self._nan_row(feature, meta))
                continue

            mi = self._compute_mi_pair(feat_arr, target_arr, kind)

            # Normalize MI by the geometric mean of the marginal entropies.
            h_feat = entropy_continuous(feat_arr, bins=self.bins)
            if kind == "discrete":
                h_target = entropy_discrete(target_arr)
            else:
                h_target = entropy_continuous(target_arr, bins=self.bins)
            nmi = normalized_mutual_information(mi, h_feat, h_target)

            results.append(
                {
                    "feature": feature,
                    "horizon": meta["horizon"],
                    "target_type": meta["target_type"],
                    "mi": mi,
                    "nmi": nmi,
                    "stability_score": self._compute_stability(feat_arr, target_arr, kind),
                }
            )

    return results

_compute_composite

_compute_composite(raw_mi: DataFrame) -> pl.DataFrame

Compute composite scores from raw MI results.

Source code in src/signalflow/feature/informativeness.py
def _compute_composite(self, raw_mi: pl.DataFrame) -> pl.DataFrame:
    """Compute composite scores from raw MI results.

    Each (feature, horizon, target_type) cell gets a score blending NMI
    with temporal stability (mixed by ``stability_weight``); cells are then
    weighted by normalized per-horizon and per-target weights and summed
    per feature. Returns one ranked row per feature with columns
    ``feature``, ``composite_score``, ``rank``.
    """
    if raw_mi.height == 0:
        return pl.DataFrame(schema={"feature": pl.Utf8, "composite_score": pl.Float64, "rank": pl.UInt32})

    w = self.weights
    alpha = w.stability_weight  # fraction of each cell score taken from stability

    # Build weights per (horizon, target_type)
    horizons = raw_mi.get_column("horizon").unique().to_list()
    targets = raw_mi.get_column("target_type").unique().to_list()

    # Fall back to equal weights when none are configured.
    h_weights = w.horizon_weights or {h: 1.0 / len(horizons) for h in horizons}
    t_weights = w.target_weights or {t: 1.0 / len(targets) for t in targets}

    # Normalize
    # NOTE(review): assumes the weight dicts have a positive sum; an
    # all-zero configuration would divide by zero here — confirm upstream
    # validation.
    h_total = sum(h_weights.values())
    t_total = sum(t_weights.values())
    h_weights = {k: v / h_total for k, v in h_weights.items()}
    t_weights = {k: v / t_total for k, v in t_weights.items()}

    scored = (
        raw_mi.with_columns(
            [
                # Horizons/targets missing from the weight dicts get weight 0.
                pl.col("horizon").replace_strict(h_weights, default=0.0).alias("_h_w"),
                pl.col("target_type").replace_strict(t_weights, default=0.0).alias("_t_w"),
            ]
        )
        .with_columns((pl.col("_h_w") * pl.col("_t_w")).alias("_weight"))
        .with_columns(
            # Cell score: (1 - alpha) * NMI + alpha * stability; nulls count as 0.
            ((1.0 - alpha) * pl.col("nmi").fill_null(0.0) + alpha * pl.col("stability_score").fill_null(0.0)).alias(
                "_cell_score"
            )
        )
        .with_columns((pl.col("_cell_score") * pl.col("_weight")).alias("_weighted_score"))
    )

    result = (
        scored.group_by("feature")
        .agg(pl.col("_weighted_score").sum().alias("composite_score"))
        .sort("composite_score", descending=True)
        .with_row_index("rank", offset=1)  # rank 1 = most informative
        .select(["feature", "composite_score", "rank"])
    )

    return result

_compute_mi_pair

_compute_mi_pair(feat: ndarray, target: ndarray, target_kind: str) -> float

Compute MI between one feature and one target.

Source code in src/signalflow/feature/informativeness.py
def _compute_mi_pair(
    self,
    feat: np.ndarray,
    target: np.ndarray,
    target_kind: str,
) -> float:
    """Estimate MI between one feature and one target, by target kind."""
    estimator = (
        mutual_information_continuous_discrete
        if target_kind == "discrete"
        else mutual_information_continuous
    )
    return estimator(feat, target, bins=self.bins)

_compute_stability

_compute_stability(feat: ndarray, target: ndarray, target_kind: str) -> float

Compute temporal stability via rolling MI windows.

Source code in src/signalflow/feature/informativeness.py
def _compute_stability(
    self,
    feat: np.ndarray,
    target: np.ndarray,
    target_kind: str,
) -> float:
    """Compute temporal stability via rolling MI windows."""
    cfg = self.rolling_mi
    n = len(feat)
    step = cfg.window_size
    min_fill = int(step * cfg.min_window_fill)

    mi_values = []
    for start in range(0, n - min_fill + 1, step):
        end = min(start + step, n)
        f_win = feat[start:end]
        t_win = target[start:end]

        # Check fill rate
        valid = np.isfinite(f_win).sum() if np.issubdtype(f_win.dtype, np.floating) else len(f_win)

        if valid < min_fill:
            continue

        mi = self._compute_mi_pair(f_win, t_win, target_kind)
        if not np.isnan(mi):
            mi_values.append(mi)

    if len(mi_values) < 2:
        return np.nan

    mean_mi = np.mean(mi_values)
    std_mi = np.std(mi_values)

    if mean_mi <= 0:
        return 0.0

    cv = std_mi / mean_mi
    return float(1.0 / (1.0 + cv))

_extract_arrays

_extract_arrays(df: DataFrame, feat_col: str, target_col: str) -> tuple[np.ndarray | None, np.ndarray | None]

Extract aligned numpy arrays, dropping rows with nulls in either.

Source code in src/signalflow/feature/informativeness.py
def _extract_arrays(
    self,
    df: pl.DataFrame,
    feat_col: str,
    target_col: str,
) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Pull one feature and one target as aligned, null-free numpy arrays.

    Returns (None, None) when either column is absent or fewer than 10
    joint non-null rows remain after filtering.
    """
    available = set(df.columns)
    if feat_col not in available or target_col not in available:
        return None, None

    aligned = df.select([feat_col, target_col]).drop_nulls()
    if aligned.height < 10:
        return None, None

    feature = aligned.get_column(feat_col).to_numpy().astype(np.float64)
    target = aligned.get_column(target_col)

    # String targets stay categorical; numeric targets are cast to float64.
    if target.dtype == pl.Utf8:
        return feature, target.to_numpy()
    return feature, target.to_numpy().astype(np.float64)

_nan_row

_nan_row(feat_col: str, tmeta: dict) -> dict
Source code in src/signalflow/feature/informativeness.py
def _nan_row(self, feat_col: str, tmeta: dict) -> dict:
    return {
        "feature": feat_col,
        "horizon": tmeta["horizon"],
        "target_type": tmeta["target_type"],
        "mi": np.nan,
        "nmi": np.nan,
        "stability_score": np.nan,
    }

_validate

_validate(df: DataFrame, feature_columns: list[str]) -> None
Source code in src/signalflow/feature/informativeness.py
def _validate(self, df: pl.DataFrame, feature_columns: list[str]) -> None:
    """Raise ValueError for empty feature lists or missing columns."""
    if not feature_columns:
        raise ValueError("feature_columns must not be empty")

    present = set(df.columns)

    # Both the pair and timestamp columns are mandatory.
    missing = {self.pair_col, self.ts_col} - present
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    missing_features = [c for c in feature_columns if c not in present]
    if missing_features:
        raise ValueError(f"Feature columns not found in DataFrame: {missing_features}")

analyze

analyze(df: DataFrame, feature_columns: list[str]) -> InformativenessReport

Run full informativeness analysis.

Parameters:

Name Type Description Default
df DataFrame

OHLCV DataFrame with pre-computed feature columns.

required
feature_columns list[str]

List of feature column names to evaluate.

required

Returns:

Type Description
InformativenessReport

InformativenessReport with all results.

Raises:

Type Description
ValueError

If required columns are missing or feature_columns is empty.

Source code in src/signalflow/feature/informativeness.py
def analyze(
    self,
    df: pl.DataFrame,
    feature_columns: list[str],
) -> InformativenessReport:
    """Run full informativeness analysis.

    Pipeline: generate multi-horizon targets, optionally mask target bars
    around detected global events, compute MI/NMI plus rolling stability
    per (feature, target), then aggregate into composite scores and a
    pivoted score matrix.

    Args:
        df: OHLCV DataFrame with pre-computed feature columns.
        feature_columns: List of feature column names to evaluate.

    Returns:
        InformativenessReport with all results.

    Raises:
        ValueError: If required columns are missing or feature_columns is empty.
    """
    self._validate(df, feature_columns)

    # 1. Generate targets
    logger.info("Generating multi-horizon targets...")
    df = self.target_generator.generate(df)
    target_meta = self.target_generator.target_columns()

    # 2. Detect and mask global events
    global_events = None
    if self.event_detector is not None:
        logger.info("Detecting global events...")
        # Convert DataFrame to RawDataView for SignalDetector
        raw_view = _df_to_raw_data_view(df, self.pair_col, self.ts_col)
        signals = self.event_detector.run(raw_view)
        global_events = signals.value

        # Get all target columns
        target_columns = [meta["column"] for meta in target_meta]

        # Mask targets using the maximum horizon
        max_horizon = max(h.horizon for h in self.target_generator.horizons)

        # NOTE(review): cooldown_bars is hard-coded to 60 — consider
        # promoting it to a configurable attribute.
        df = mask_targets_by_signals(
            df=df,
            signals=signals,
            mask_signal_types=self.event_detector.allowed_signal_types or set(),  # type: ignore[attr-defined]
            horizon_bars=max_horizon,
            cooldown_bars=60,
            target_columns=target_columns,
            pair_col=self.pair_col,
            ts_col=self.ts_col,
        )

    # 3-4. Compute MI and rolling stability
    logger.info(f"Computing MI for {len(feature_columns)} features x {len(target_meta)} targets...")
    mi_rows = self._compute_all_mi(df, feature_columns, target_meta)
    raw_mi = pl.DataFrame(mi_rows)

    # 5. Composite scoring
    logger.info("Computing composite scores...")
    composite_scores = self._compute_composite(raw_mi)
    score_matrix = self._build_score_matrix(raw_mi)

    # Event count feeds the report metadata only.
    n_events = 0
    if global_events is not None:
        n_events = global_events.height

    metadata = {
        "n_features": len(feature_columns),
        "n_horizons": len(self.target_generator.horizons),
        "n_target_types": len(self.target_generator.target_types),
        "n_global_events": n_events,
        "bins": self.bins,
        "aggregate_pairs": self.aggregate_pairs,
    }

    logger.info("Informativeness analysis complete.")
    return InformativenessReport(
        raw_mi=raw_mi,
        composite_scores=composite_scores,
        score_matrix=score_matrix,
        global_events=global_events,
        metadata=metadata,
    )

signalflow.feature.informativeness.InformativenessReport dataclass

InformativenessReport(raw_mi: DataFrame, composite_scores: DataFrame, score_matrix: DataFrame, global_events: DataFrame | None, metadata: dict)

Container for informativeness analysis results.

Attributes:

Name Type Description
raw_mi DataFrame

Full MI results (feature x horizon x target).

composite_scores DataFrame

Aggregated scores per feature, ranked.

score_matrix DataFrame

Pivoted Feature x (Horizon, Target) matrix.

global_events DataFrame | None

Global event detection results (if enabled).

metadata dict

Analysis configuration and statistics.

composite_scores instance-attribute

composite_scores: DataFrame

global_events instance-attribute

global_events: DataFrame | None

metadata instance-attribute

metadata: dict

raw_mi instance-attribute

raw_mi: DataFrame

score_matrix instance-attribute

score_matrix: DataFrame

bottom_features

bottom_features(n: int = 20) -> pl.DataFrame

Return bottom N features (least informative).

Source code in src/signalflow/feature/informativeness.py
def bottom_features(self, n: int = 20) -> pl.DataFrame:
    """Return the N least informative features (lowest composite scores)."""
    # composite_scores is sorted descending, so the tail holds the worst.
    return self.composite_scores.tail(n)

feature_detail

feature_detail(feature_name: str) -> pl.DataFrame

Return detailed MI breakdown for a single feature.

Source code in src/signalflow/feature/informativeness.py
def feature_detail(self, feature_name: str) -> pl.DataFrame:
    """Return all raw MI rows (per horizon/target) for one feature."""
    mask = pl.col("feature") == feature_name
    return self.raw_mi.filter(mask)

top_features

top_features(n: int = 20) -> pl.DataFrame

Return top N features by composite score.

Source code in src/signalflow/feature/informativeness.py
def top_features(self, n: int = 20) -> pl.DataFrame:
    """Return the N most informative features (highest composite scores)."""
    # composite_scores is pre-sorted descending by composite_score.
    return self.composite_scores.head(n)

signalflow.feature.informativeness.RollingMIConfig dataclass

RollingMIConfig(window_size: int = 5000, min_window_fill: float = 0.7)

Configuration for rolling MI stability computation.

Attributes:

Name Type Description
window_size int

Number of bars per rolling window.

min_window_fill float

Minimum fraction of non-null values in a window.

signalflow.feature.informativeness.CompositeWeights dataclass

CompositeWeights(horizon_weights: dict[str, float] | None = None, target_weights: dict[str, float] | None = None, stability_weight: float = 0.3)

Weights for composite informativeness scoring.

Attributes:

Name Type Description
horizon_weights dict[str, float] | None

Per-horizon weights. None = equal weights.

target_weights dict[str, float] | None

Per-target weights. None = equal weights.

stability_weight float

Fraction of score from stability (rest from NMI).

Mutual Information Functions

signalflow.feature.mutual_information

Mutual Information estimation for feature-target pairs.

Provides histogram-based MI estimation for continuous and discrete variables. Used by FeatureInformativenessAnalyzer to measure feature informativeness against multiple target types.

References
  • Cover & Thomas (2006) - Elements of Information Theory
  • Kraskov et al. (2004) - MI estimation

_bin_continuous

_bin_continuous(x: ndarray, bins: int) -> np.ndarray

Bin continuous values into integer bin indices.

Source code in src/signalflow/feature/mutual_information.py
def _bin_continuous(x: np.ndarray, bins: int) -> np.ndarray:
    """Bin continuous values into integer bin indices."""
    _, edges = np.histogram(x, bins=bins)
    return np.clip(np.digitize(x, edges[:-1]) - 1, 0, bins - 1)

_isnan_any

_isnan_any(arr: ndarray) -> np.ndarray

Return boolean mask for NaN-like values in any dtype.

Source code in src/signalflow/feature/mutual_information.py
def _isnan_any(arr: np.ndarray) -> np.ndarray:
    """Return boolean mask for NaN-like values in any dtype."""
    if np.issubdtype(arr.dtype, np.floating):
        result: np.ndarray = np.isnan(arr)
        return result
    if arr.dtype == object:
        return np.array([v is None or (isinstance(v, float) and np.isnan(v)) for v in arr])
    return np.zeros(len(arr), dtype=bool)

_mi_from_contingency

_mi_from_contingency(x: ndarray, y: ndarray) -> float

Compute MI from two discrete arrays via contingency table.

Source code in src/signalflow/feature/mutual_information.py
def _mi_from_contingency(x: np.ndarray, y: np.ndarray) -> float:
    """Compute MI from two discrete arrays via contingency table."""
    x_vals, x_idx = np.unique(x, return_inverse=True)
    y_vals, y_idx = np.unique(y, return_inverse=True)

    contingency = np.zeros((len(x_vals), len(y_vals)), dtype=np.float64)
    np.add.at(contingency, (x_idx, y_idx), 1)

    pxy = contingency / contingency.sum()
    px = pxy.sum(axis=1)
    py = pxy.sum(axis=0)

    outer = px[:, None] * py[None, :]
    valid = (pxy > 0) & (outer > 0)
    mi = np.sum(pxy[valid] * np.log2(pxy[valid] / outer[valid]))
    return float(max(mi, 0.0))

entropy_continuous

entropy_continuous(x: ndarray, bins: int = 20) -> float

Shannon entropy via histogram of a continuous variable.

Parameters:

Name Type Description Default
x ndarray

1D array of continuous values.

required
bins int

Number of histogram bins.

20

Returns:

Type Description
float

Entropy in bits. NaN if fewer than 2 valid values.

Source code in src/signalflow/feature/mutual_information.py
def entropy_continuous(x: np.ndarray, bins: int = 20) -> float:
    """Shannon entropy (bits) of a continuous sample via histogram binning.

    Args:
        x: 1D array of continuous values; non-finite entries are dropped.
        bins: Number of histogram bins.

    Returns:
        Entropy in bits, or NaN when fewer than 2 finite values remain.
    """
    finite = x[np.isfinite(x)]
    if finite.size < 2:
        return np.nan

    counts, _ = np.histogram(finite, bins=bins)
    # Keep only occupied bins; 0 * log(0) is defined as 0 for entropy.
    p = counts[counts > 0] / counts.sum()
    return float(-(p * np.log2(p)).sum())

entropy_discrete

entropy_discrete(x: ndarray) -> float

Shannon entropy of a discrete distribution.

H(X) = -sum_x p(x) * log2(p(x))

Parameters:

Name Type Description Default
x ndarray

1D array of discrete values.

required

Returns:

Type Description
float

Entropy in bits. NaN if fewer than 2 values.

Source code in src/signalflow/feature/mutual_information.py
def entropy_discrete(x: np.ndarray) -> float:
    """Shannon entropy of a discrete distribution.

    H(X) = -sum_x p(x) * log2(p(x))

    Args:
        x: 1D array of discrete values; NaN-like entries are dropped.

    Returns:
        Entropy in bits, or NaN when fewer than 2 valid values remain.
    """
    valid = x[~_isnan_any(x)]
    if len(valid) < 2:
        return np.nan

    _, counts = np.unique(valid, return_counts=True)
    p = counts / counts.sum()
    # np.unique never yields zero counts, but filter defensively as before.
    p = p[p > 0]
    return float(-np.sum(p * np.log2(p)))

mutual_information_continuous

mutual_information_continuous(x: ndarray, y: ndarray, bins: int = 20) -> float

MI between two continuous variables.

Bins both variables and computes MI from the 2D histogram.

Parameters:

Name Type Description Default
x ndarray

1D continuous array.

required
y ndarray

1D continuous array.

required
bins int

Number of bins per dimension.

20

Returns:

Type Description
float

MI in bits. NaN if insufficient data.

Source code in src/signalflow/feature/mutual_information.py
def mutual_information_continuous(
    x: np.ndarray,
    y: np.ndarray,
    bins: int = 20,
) -> float:
    """MI (bits) between two continuous variables via a 2D histogram.

    Args:
        x: 1D continuous array.
        y: 1D continuous array.
        bins: Number of bins per dimension.

    Returns:
        MI in bits; NaN when fewer than 2 jointly finite samples exist.
    """
    keep = np.isfinite(x) & np.isfinite(y)
    xs, ys = x[keep], y[keep]
    if xs.size < 2:
        return np.nan

    joint_counts, _, _ = np.histogram2d(xs, ys, bins=bins)
    joint = joint_counts / joint_counts.sum()

    # Marginals and the independence baseline.
    px = joint.sum(axis=1)[:, None]
    py = joint.sum(axis=0)[None, :]
    indep = px * py

    ok = (joint > 0) & (indep > 0)
    mi = np.sum(joint[ok] * np.log2(joint[ok] / indep[ok]))
    # Clamp tiny negative values caused by floating-point error.
    return float(max(mi, 0.0))

mutual_information_continuous_discrete

mutual_information_continuous_discrete(x: ndarray, y: ndarray, bins: int = 20) -> float

MI between a continuous feature and a discrete target.

Bins the continuous variable, then computes MI from the joint contingency table of (binned_x, y).

This is the primary use case: continuous feature columns (RSI, SMA, etc.) against discrete labels (RISE/FALL/NONE).

Parameters:

Name Type Description Default
x ndarray

1D continuous feature array.

required
y ndarray

1D discrete target array.

required
bins int

Number of bins for the continuous variable.

20

Returns:

Type Description
float

MI in bits. NaN if insufficient data.

Source code in src/signalflow/feature/mutual_information.py
def mutual_information_continuous_discrete(
    x: np.ndarray,
    y: np.ndarray,
    bins: int = 20,
) -> float:
    """MI (bits) between a continuous feature and a discrete target.

    The continuous variable is discretized into histogram bins, then MI
    is computed from the (binned_x, y) contingency table. This is the
    primary use case: continuous feature columns (RSI, SMA, etc.) against
    discrete labels (RISE/FALL/NONE).

    Args:
        x: 1D continuous feature array.
        y: 1D discrete target array.
        bins: Number of bins for the continuous variable.

    Returns:
        MI in bits; NaN when fewer than 2 valid samples remain.
    """
    keep = np.isfinite(x) & ~_isnan_any(y)
    xs, ys = x[keep], y[keep]
    if xs.size < 2:
        return np.nan

    return _mi_from_contingency(_bin_continuous(xs, bins), ys)

mutual_information_discrete

mutual_information_discrete(x: ndarray, y: ndarray) -> float

MI between two discrete (categorical) arrays.

MI(X;Y) = sum_{x,y} p(x,y) * log2(p(x,y) / (p(x) * p(y)))

Parameters:

Name Type Description Default
x ndarray

1D discrete array.

required
y ndarray

1D discrete array of same length.

required

Returns:

Type Description
float

MI in bits. NaN if insufficient data.

Source code in src/signalflow/feature/mutual_information.py
def mutual_information_discrete(x: np.ndarray, y: np.ndarray) -> float:
    """MI (bits) between two discrete (categorical) arrays.

    MI(X;Y) = sum_{x,y} p(x,y) * log2(p(x,y) / (p(x) * p(y)))

    Args:
        x: 1D discrete array.
        y: 1D discrete array of same length.

    Returns:
        MI in bits; NaN when fewer than 2 jointly valid samples remain.
    """
    # Keep rows where neither side is NaN-like (De Morgan of the original).
    keep = ~_isnan_any(x) & ~_isnan_any(y)
    xs, ys = x[keep], y[keep]
    if xs.size < 2:
        return np.nan

    return _mi_from_contingency(xs, ys)

normalized_mutual_information

normalized_mutual_information(mi: float, h_x: float, h_y: float) -> float

Normalize MI to [0, 1] using NMI = MI / sqrt(H(X) * H(Y)).

Parameters:

Name Type Description Default
mi float

Raw mutual information value.

required
h_x float

Entropy of X.

required
h_y float

Entropy of Y.

required

Returns:

Type Description
float

NMI in [0, 1]. NaN if either entropy is zero or NaN.

Source code in src/signalflow/feature/mutual_information.py
def normalized_mutual_information(mi: float, h_x: float, h_y: float) -> float:
    """Normalize MI to [0, 1] using NMI = MI / sqrt(H(X) * H(Y)).

    The ratio is clamped into [0, 1]: histogram MI estimates can exceed
    the geometric-mean entropy bound, or dip slightly below zero, due to
    binning and floating-point error.

    Args:
        mi: Raw mutual information value.
        h_x: Entropy of X.
        h_y: Entropy of Y.

    Returns:
        NMI in [0, 1]. NaN if any input is NaN or either entropy is zero.
    """
    if np.isnan(mi) or np.isnan(h_x) or np.isnan(h_y):
        return np.nan
    denom = np.sqrt(h_x * h_y)
    if denom <= 0:
        return np.nan
    # Fix: also clamp the lower bound — the previous version let a
    # slightly negative MI estimate through as a negative "NMI in [0, 1]".
    return float(min(max(mi / denom, 0.0), 1.0))