Feature Module¶
Feature extraction for technical indicators and derived metrics.
v2: where features live
flow no longer constructs features — the .features() builder method was
removed. Features now live inside a forecast artefact (pinned with its
weights) or as primitive parameters on a detector. The FeaturePipeline
class itself is unchanged: it remains the single computation engine and
can still be used directly to turn a DataFrame into feature columns. The new
FeatureSpec and ModelFeaturesPipeline add a reproducibility layer (recipe
+ hash) around that engine without duplicating any computation.
Base Classes¶
signalflow.feature.base.Feature
dataclass
¶
Feature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None)
Bases: KwargsTolerantMixin
Base class for all features.
Two methods to implement
- compute(df): all pairs, abstract for GlobalFeature/Pipeline
- compute_pair(df): one pair, for regular features
Attributes:
| Name | Type | Description |
|---|---|---|
| `requires` | `list[str]` | Input column templates, e.g. `["{price_col}"]`. |
| `outputs` | `list[str]` | Output column templates, e.g. `["rsi_{period}"]`. |
| `normalized` | `bool` | If True, apply rolling z-score normalization to output. |
| `norm_period` | `int \| None` | Window for normalization. Defaults to 3x feature period. |
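The `normalized` and `norm_period` attributes describe a rolling z-score over the feature output. The following is a minimal sketch of that idea in plain Python, not the library's implementation: `rolling_zscore` and `default_norm_period` are hypothetical names, and the use of population variance and of `None` during the fill-in period are assumptions.

```python
from __future__ import annotations

from math import sqrt


def rolling_zscore(values: list[float], window: int) -> list[float | None]:
    """Rolling z-score; None until a full window of history is available."""
    out: list[float | None] = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)  # not enough history yet
            continue
        chunk = values[i + 1 - window : i + 1]
        mean = sum(chunk) / window
        var = sum((v - mean) ** 2 for v in chunk) / window  # population variance (assumption)
        std = sqrt(var)
        out.append(0.0 if std == 0 else (values[i] - mean) / std)
    return out


def default_norm_period(feature_period: int, norm_period: int | None = None) -> int:
    """Documented default: 3x the feature period when norm_period is None."""
    return norm_period if norm_period is not None else 3 * feature_period


z = rolling_zscore([1.0, 2.0, 3.0], window=3)
```

With the default rule, an RSI with `period=14` would be normalized over a 42-bar window.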
warmup
property
¶
Minimum bars needed before output is stable.
Override in subclasses with feature-specific logic. Default: 0 (no warmup required).
assert_reproducible ¶
Assert this feature honours the warmup reproducibility contract.
A recursive feature that does not guarantee entry-point invariance
(is_recursive and not warmup_invariant) will produce different
values in live vs. backtest depending on the warmup start point,
breaking parity. Such a feature raises here so the problem surfaces
before it reaches production.
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the feature is recursive and not warmup-invariant. |
Source code in src/signalflow/feature/base.py
compute ¶
compute_pair ¶
Compute feature for single pair. Override for per-pair features.
output_cols ¶
required_cols ¶
signalflow.feature.feature_pipeline.FeaturePipeline
dataclass
¶
FeaturePipeline(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, features: list[Feature] = list(), raw_data_type: RawDataType | str = RawDataType.SPOT)
Bases: Feature
Orchestrates multiple features with optimized execution.
Groups consecutive per-pair features into batches for single group_by.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `features` | `list[Feature]` | List of features to compute. | `list()` |
| `raw_data_type` | `RawDataType \| str` | Type of raw data (defines available columns). | `SPOT` |
Example
pipeline = FeaturePipeline(
    features=[
        RsiFeature(period=14),
        SmaFeature(period=20),
        GlobalFeature(base=RsiFeature(period=14), reference_pair="BTCUSDT"),
    ],
    raw_data_type=RawDataType.SPOT,
)
df = pipeline.run(raw_data_view)
__post_init__ ¶
_group_into_batches ¶
Group features: consecutive per-pair → batch, global → separate.
Source code in src/signalflow/feature/feature_pipeline.py
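The batching rule (consecutive per-pair features share a batch, each global feature stands alone) can be sketched with `itertools.groupby`. This is an illustrative sketch only: `Step` is a hypothetical stand-in for a feature object, and the real `_group_into_batches` may decide what counts as global differently.

```python
from dataclasses import dataclass
from itertools import groupby


@dataclass
class Step:
    """Hypothetical stand-in for a feature: a name plus a global/per-pair flag."""

    name: str
    is_global: bool = False


def group_into_batches(steps: list[Step]) -> list[list[Step]]:
    batches: list[list[Step]] = []
    # groupby only merges *consecutive* items with the same key, so order is preserved
    for is_global, run in groupby(steps, key=lambda s: s.is_global):
        run_list = list(run)
        if is_global:
            batches.extend([s] for s in run_list)  # each global feature runs alone
        else:
            batches.append(run_list)  # one group_by can serve the whole run
    return batches


steps = [
    Step("rsi_14"),
    Step("sma_20"),
    Step("market_rsi", is_global=True),
    Step("atr_14"),
]
batch_names = [[s.name for s in b] for b in group_into_batches(steps)]
```

Here `atr_14` lands in its own batch because a global feature separates it from the first two per-pair features.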
_is_per_pair_batch ¶
Check if batch contains only per-pair features.
Source code in src/signalflow/feature/feature_pipeline.py
_validate ¶
Validate all dependencies are satisfied.
Source code in src/signalflow/feature/feature_pipeline.py
assert_reproducible ¶
Assert every nested feature honours the warmup reproducibility contract.
Delegates to :meth:Feature.assert_reproducible for each nested feature
and aggregates the names of all offending features into a single error,
so a pipeline with one or more non-invariant recursive features fails
loudly with the specific culprits named.
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If any nested feature is recursive and not warmup-invariant. |
Source code in src/signalflow/feature/feature_pipeline.py
compute ¶
Compute all features with optimized batching.
Source code in src/signalflow/feature/feature_pipeline.py
output_cols ¶
run ¶
Entry point: load from RawDataView and compute.
Source code in src/signalflow/feature/feature_pipeline.py
to_mermaid ¶
Generate Mermaid diagram of feature dependencies.
Source code in src/signalflow/feature/feature_pipeline.py
signalflow.feature.base.GlobalFeature
dataclass
¶
GlobalFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None)
Bases: Feature
Base class for features computed across all pairs.
Override compute() with custom aggregation logic.
For multi-source features, set sources to specify which exchanges to use.
Use get_source_data() to retrieve data from RawData with proper handling.
Attributes:
| Name | Type | Description |
|---|---|---|
| `sources` | `list[str] \| None` | List of source names to use (e.g., `["binance", "okx"]`). If None, uses default source or all available sources. |
Example
compute ¶
Must override - compute global feature across all pairs.
get_source_data ¶
Get DataFrame from RawData for a specific source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `raw` | `RawData` | RawData container. | required |
| `data_type` | `str` | Data type key (e.g., "perpetual", "spot"). | required |
| `source` | `str \| None` | Specific source name. If None, uses default. | `None` |

Returns:
| Type | Description |
|---|---|
| `DataFrame` | `pl.DataFrame`: Data for the specified source. |
Source code in src/signalflow/feature/base.py
iter_sources ¶
Iterate over source DataFrames from RawData.
If self.sources is set, iterates only those sources.
Otherwise, iterates all available sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `raw` | `RawData` | RawData container. | required |
| `data_type` | `str` | Data type key (e.g., "perpetual"). | required |

Yields:
| Type | Description |
|---|---|
| `tuple[str, DataFrame]` | `tuple[str, pl.DataFrame]`: (source_name, DataFrame) pairs. |
Source code in src/signalflow/feature/base.py
signalflow.feature.offset_feature.OffsetFeature
dataclass
¶
OffsetFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, feature_name: str | None = None, feature_params: dict = dict(), window: int = 15, prefix: str | None = None)
Bases: Feature
Multi-timeframe feature via offset resampling.
Creates window parallel time series with different offsets.
Each offset computes features as if on window-minute bars.
Supports both regular Feature (compute_pair) and GlobalFeature (compute).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `feature_name` | `str \| None` | Registered component name (sf_component name). | `None` |
| `feature_params` | `dict` | Parameters for feature class. | `dict()` |
| `window` | `int` | Aggregation window in minutes. Default: 15. | `15` |
| `prefix` | `str \| None` | Prefix for output columns. Default: "{window}m_". | `None` |
Example
offset = OffsetFeature(
    feature_name="test_rsi",
    feature_params={"period": 14},
    window=15,
)
# Outputs: 15m_rsi_14, offset

# With GlobalFeature
offset = OffsetFeature(
    feature_name="global/market_log_return",
    feature_params={},
    window=15,
)
feature_params
class-attribute
instance-attribute
¶
requires
class-attribute
¶
__post_init__ ¶
Source code in src/signalflow/feature/offset_feature.py
_compute_all_pairs_global ¶
Compute features for all pairs with global base feature.
Source code in src/signalflow/feature/offset_feature.py
_compute_base_feature ¶
Compute base feature - handles both Feature and GlobalFeature.
Source code in src/signalflow/feature/offset_feature.py
_compute_single_pair ¶
Compute features for single pair (non-global base).
Source code in src/signalflow/feature/offset_feature.py
_resample_ohlcv ¶
Resample 1m OHLCV to window-minute bars with given offset.
Source code in src/signalflow/feature/offset_feature.py
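Offset resampling can be pictured as bucketing 1-minute bars into `window`-minute bars whose boundaries are shifted by `offset` minutes. The sketch below uses plain dicts and an integer `minute` field. Both are assumptions for illustration, as is labelling each bar by its bucket start; the library works on DataFrames.

```python
from __future__ import annotations


def resample_ohlcv(bars: list[dict], window: int, offset: int) -> list[dict]:
    """Aggregate time-sorted 1-minute bars into window-minute bars shifted by offset."""
    buckets: dict[int, list[dict]] = {}
    for bar in bars:
        key = (bar["minute"] - offset) // window  # bucket index for this offset
        buckets.setdefault(key, []).append(bar)
    out = []
    for key in sorted(buckets):
        group = buckets[key]  # input is time-sorted, so group order is time order
        out.append(
            {
                "minute": key * window + offset,  # bucket start (assumed labelling)
                "open": group[0]["open"],
                "high": max(b["high"] for b in group),
                "low": min(b["low"] for b in group),
                "close": group[-1]["close"],
                "volume": sum(b["volume"] for b in group),
            }
        )
    return out


bars = [
    {"minute": m, "open": float(m), "high": m + 0.5, "low": m - 0.5, "close": m + 0.25, "volume": 1.0}
    for m in range(6)
]
aligned = resample_ohlcv(bars, window=3, offset=0)
shifted = resample_ohlcv(bars, window=3, offset=1)
```

With `offset=1` the first bucket is partial (it holds only minute 0). A real implementation would need a policy for such incomplete bars.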
compute ¶
Compute for all pairs.
Source code in src/signalflow/feature/offset_feature.py
compute_pair ¶
Compute for single pair (only for non-global base).
Source code in src/signalflow/feature/offset_feature.py
from_dict
classmethod
¶
Deserialize from config.
Source code in src/signalflow/feature/offset_feature.py
output_cols ¶
required_cols ¶
to_dict ¶
signalflow.feature.lin_reg_forecast.LinRegForecastFeature
dataclass
¶
LinRegForecastFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, source_col: str = 'rsi_14', n_lags: int = 10, trend_window: int = 5, mean_window: int = 20, refit_period: Literal['hour', 'day', 'week', 'month', None] = 'day', alpha: float = 1.0, forecast_horizon: int = 1, min_samples: int = 50)
Bases: Feature
Enhanced linear regression forecast with trend and mean-reversion features.
Instead of predicting raw values, predicts change (diff) and adds:
- Trend slope (recent momentum)
- Mean reversion signal (deviation from rolling mean)
- Volatility scaling
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `source_col` | `str` | Column to forecast. | `'rsi_14'` |
| `n_lags` | `int` | Number of lagged diffs. Default: 10. | `10` |
| `trend_window` | `int` | Window for trend calculation. Default: 5. | `5` |
| `mean_window` | `int` | Window for mean reversion. Default: 20. | `20` |
| `refit_period` | `Literal['hour', 'day', 'week', 'month', None]` | When to refit. Default: 'day'. | `'day'` |
| `alpha` | `float` | Ridge regularization. Default: 1.0. | `1.0` |
| `forecast_horizon` | `int` | Steps ahead to forecast. Default: 1. | `1` |
outputs
class-attribute
¶
outputs: list[str] = ['{source_col}_forecast', '{source_col}_forecast_change', '{source_col}_forecast_direction']
refit_period
class-attribute
instance-attribute
¶
test_params
class-attribute
¶
test_params: list[dict] = [{'source_col': 'rsi_14', 'n_lags': 10}, {'source_col': 'rsi_14', 'n_lags': 5, 'mean_window': 10}]
__post_init__ ¶
_build_features ¶
Build enhanced feature matrix.
Source code in src/signalflow/feature/lin_reg_forecast.py
_build_targets ¶
Build target: forward diff (change).
Source code in src/signalflow/feature/lin_reg_forecast.py
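To make "lagged diffs" and "forward diff" concrete, here is a small sketch that builds both from a plain list. The function names and the `None` padding are assumptions for illustration. The actual `_build_features` also adds trend, mean-reversion and volatility columns, which are omitted here.

```python
from __future__ import annotations


def build_lagged_diffs(series: list[float], n_lags: int) -> list[list[float] | None]:
    """Row t holds the n_lags most recent one-step diffs, newest first."""
    diffs = [None] + [series[i] - series[i - 1] for i in range(1, len(series))]
    rows: list[list[float] | None] = []
    for t in range(len(series)):
        if t < n_lags:
            rows.append(None)  # not enough history for a full lag vector
        else:
            rows.append([diffs[t - k] for k in range(n_lags)])
    return rows


def build_targets(series: list[float], horizon: int) -> list[float | None]:
    """Target at t is the forward change series[t + horizon] - series[t]."""
    n = len(series)
    return [series[t + horizon] - series[t] if t + horizon < n else None for t in range(n)]


series = [1.0, 2.0, 4.0, 7.0, 11.0]
lag_rows = build_lagged_diffs(series, n_lags=2)
targets = build_targets(series, horizon=1)
```

Because the model predicts a change, a forecast of the level would be recovered as the current value plus the predicted change.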
_get_period_key ¶
Source code in src/signalflow/feature/lin_reg_forecast.py
compute_pair ¶
Compute forecasts for single pair.
Source code in src/signalflow/feature/lin_reg_forecast.py
signalflow.feature.atr.ATRFeature
dataclass
¶
ATRFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, smoothing: Literal['sma', 'ema'] = 'ema')
Bases: Feature
Average True Range (ATR) feature.
Measures market volatility as the moving average of True Range. True Range = max(high - low, |high - prev_close|, |low - prev_close|)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `period` | `int` | ATR period. Default: 14. | `14` |
| `smoothing` | `Literal['sma', 'ema']` | Smoothing method - "sma" or "ema" (Wilder's). Default: "ema". | `'ema'` |
Example
atr = ATRFeature(period=14)
atr.output_cols()  # ["atr_14"]
test_params
class-attribute
¶
_get_output_name ¶
compute_pair ¶
Compute ATR for a single pair.
Source code in src/signalflow/feature/atr.py
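The True Range formula and the two smoothing modes can be worked through on a tiny series. This is a sketch under stated assumptions, not the code in `atr.py`: the first bar (which has no previous close) uses `high - low`, and the Wilder variant is seeded with an SMA of the first `period` true ranges.

```python
from __future__ import annotations


def true_range(high: float, low: float, prev_close: float) -> float:
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def atr(highs, lows, closes, period: int, smoothing: str = "ema") -> list[float | None]:
    # First bar has no previous close: fall back to high - low (assumption).
    trs = [highs[0] - lows[0]]
    for i in range(1, len(closes)):
        trs.append(true_range(highs[i], lows[i], closes[i - 1]))

    out: list[float | None] = [None] * len(trs)
    if len(trs) < period:
        return out
    if smoothing == "sma":
        for i in range(period - 1, len(trs)):
            out[i] = sum(trs[i + 1 - period : i + 1]) / period
        return out
    # Wilder smoothing, seeded with the SMA of the first `period` values.
    prev = sum(trs[:period]) / period
    out[period - 1] = prev
    for i in range(period, len(trs)):
        prev = (prev * (period - 1) + trs[i]) / period
        out[i] = prev
    return out


highs = [10.0, 11.0, 12.0, 11.0]
lows = [9.0, 9.5, 10.0, 9.0]
closes = [9.5, 10.5, 11.0, 10.0]
atr_sma = atr(highs, lows, closes, period=2, smoothing="sma")
atr_wilder = atr(highs, lows, closes, period=2, smoothing="ema")
```

The true ranges here are 1.0, 1.5, 2.0 and 2.0, so both variants agree on the seed value and diverge afterwards.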
Examples¶
signalflow.feature.examples.ExampleRsiFeature
dataclass
¶
ExampleRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, price_col: str = 'close')
Bases: Feature
Relative Strength Index.
Bounded oscillator [0, 100]. In normalized mode, rescales to [-1, 1].
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `period` | `int` | RSI period. Default: 14. | `14` |
| `price_col` | `str` | Price column to use. Default: "close". | `'close'` |
Example
rsi = ExampleRsiFeature(period=21)
rsi.output_cols()  # ["rsi_21"]
test_params
class-attribute
¶
_get_output_name ¶
compute ¶
compute_pair ¶
Compute RSI for single pair.
Source code in src/signalflow/feature/examples.py
signalflow.feature.examples.ExampleSmaFeature
dataclass
¶
ExampleSmaFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 20, price_col: str = 'close')
Bases: Feature
Simple Moving Average.
_get_output_name ¶
compute_pair ¶
Source code in src/signalflow/feature/examples.py
signalflow.feature.examples.ExampleGlobalMeanRsiFeature
dataclass
¶
ExampleGlobalMeanRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None, period: int = 14, price_col: str = 'close', add_diff: bool = False)
Bases: GlobalFeature
Mean RSI across all pairs per timestamp.
- Compute RSI per pair
- Mean across all pairs at time t -> global_mean_rsi
- Optionally: rsi_diff = pair_rsi - global_mean_rsi
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `period` | `int` | RSI period. Default: 14. | `14` |
| `add_diff` | `bool` | Add per-pair difference column. Default: False. | `False` |
test_params
class-attribute
¶
compute ¶
Source code in src/signalflow/feature/examples.py
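The cross-sectional step (mean across pairs at each timestamp, then an optional per-pair difference) can be sketched without a DataFrame library. The row layout and the function name are assumptions for illustration. The column names `global_mean_rsi` and `rsi_diff` follow the description above, and the per-pair RSI is taken as already computed.

```python
from collections import defaultdict


def global_mean_and_diff(rows: list[dict], value_col: str = "rsi_14") -> list[dict]:
    """Attach the per-timestamp mean of value_col and each row's deviation from it."""
    by_ts = defaultdict(list)
    for row in rows:
        by_ts[row["timestamp"]].append(row[value_col])
    means = {ts: sum(vals) / len(vals) for ts, vals in by_ts.items()}
    return [
        {
            **row,
            "global_mean_rsi": means[row["timestamp"]],
            "rsi_diff": row[value_col] - means[row["timestamp"]],
        }
        for row in rows
    ]


rows = [
    {"pair": "BTCUSDT", "timestamp": 1, "rsi_14": 60.0},
    {"pair": "ETHUSDT", "timestamp": 1, "rsi_14": 40.0},
    {"pair": "BTCUSDT", "timestamp": 2, "rsi_14": 70.0},
    {"pair": "ETHUSDT", "timestamp": 2, "rsi_14": 50.0},
]
enriched = global_mean_and_diff(rows)
```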
output_cols ¶
Warmup Reproducibility Contract¶
Every Feature declares whether it can be reproduced identically in live and
backtest after warmup. Two ClassVar flags on Feature express this:
| Flag | Meaning |
|---|---|
| `is_recursive` (`ClassVar[bool]`, default `False`) | The indicator is stateful / entry-point dependent: its value at bar N depends on where the series starts unless correctly initialized. |
| `warmup_invariant` (`ClassVar[bool]`, default `True`) | The indicator guarantees entry-point invariance after warmup (e.g. deterministic SMA-seeded initialization). For a recursive feature that does not re-seed deterministically this must be `False`. |
The existing warmup property remains — the minimum number of bars before the
output is stable (default 0).
Feature.assert_reproducible() raises RuntimeError for exactly the dangerous
combination is_recursive and not warmup_invariant: a feature whose value
depends on the warmup start point, which would diverge between live and
backtest and break parity.
FeaturePipeline.assert_reproducible() delegates to each nested feature and
aggregates all offending feature names into a single error.
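The contract reduces to one boolean check per feature plus an aggregation step at pipeline level. The sketch below mirrors the two flags and the `RuntimeError`. The class names and the free function `assert_all_reproducible` are hypothetical stand-ins rather than the library's classes.

```python
from typing import ClassVar


class SketchFeature:
    """Hypothetical stand-in carrying the two contract flags."""

    is_recursive: ClassVar[bool] = False
    warmup_invariant: ClassVar[bool] = True

    def assert_reproducible(self) -> None:
        if self.is_recursive and not self.warmup_invariant:
            raise RuntimeError(f"{type(self).__name__} is recursive and not warmup-invariant")


class SmaLike(SketchFeature):
    pass  # windowed, not recursive: always reproducible


class SeededEma(SketchFeature):
    is_recursive = True  # recursive, but re-seeds deterministically


class UnseededEma(SketchFeature):
    is_recursive = True
    warmup_invariant = False  # the dangerous combination


def assert_all_reproducible(features) -> None:
    """Collect every offender before raising, so the error names all culprits."""
    offenders = []
    for feature in features:
        try:
            feature.assert_reproducible()
        except RuntimeError:
            offenders.append(type(feature).__name__)
    if offenders:
        raise RuntimeError("non-reproducible features: " + ", ".join(offenders))


assert_all_reproducible([SmaLike(), SeededEma()])  # passes silently
```

Aggregating before raising means a single run reports every feature that needs fixing, instead of stopping at the first.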
FeatureSpec¶
FeatureSpec is the serializable, hashable recipe for a FeaturePipeline
— it captures how to rebuild a pipeline (ordered features + params +
ta_version + raw_data_type), not the computed values.
from signalflow.feature import FeaturePipeline, ExampleRsiFeature, ExampleSmaFeature
from signalflow.feature.spec import FeatureSpec
pipeline = FeaturePipeline(
features=[ExampleRsiFeature(period=14), ExampleSmaFeature(period=20)],
raw_data_type="spot",
)
spec = FeatureSpec.from_pipeline(pipeline, ta_version="1.0.0")
spec.feature_hash() # stable SHA-256 of the recipe
config = spec.to_config() # plain dict: {"features": [...], "meta": {...}}
spec2 = FeatureSpec.from_config(config)
assert spec2.feature_hash() == spec.feature_hash() # round-trips
rebuilt = spec.build() # -> FeaturePipeline
Attributes: features (ordered list of {"name", "params", "scope"} dicts —
order is significant), ta_version, raw_data_type (default "spot"),
order_significant (default True, provenance metadata; features are never
reordered).
| Method | Purpose |
|---|---|
| `from_pipeline(pipeline, *, ta_version=None)` | Extract a spec from a live pipeline (feature names via registry reverse-lookup). |
| `from_config(data)` / `to_config()` | Round-trip a plain dict (flat or YAML `meta:` nested form). |
| `build()` | Reconstruct a FeaturePipeline in the recorded order. |
| `to_yaml(path)` / `from_yaml(path)` | Persist / load the recipe as YAML (survives class refactors, unlike pickle). |
| `feature_hash()` | Stable SHA-256 of the canonical recipe. |
feature_hash¶
canonical_feature_hash(features, ta_version, raw_data_type) is a pure
function (no I/O, no global state) backing FeatureSpec.feature_hash(). The
hash is:
- identical for two logically-equal recipes: dict key order is irrelevant, float jitter is normalized (e.g. 0.1 == 0.1000000001), and omitted defaults are resolved explicitly (rsi() == rsi(period=14));
- different whenever something meaningful changes: a param value, the order of features (never sorted), or the ta_version (the same feature name across TA library versions is not the same implementation).
It is a configuration-drift detector: recompute it when loading a model artefact and refuse to continue on mismatch.
from signalflow.feature.spec import canonical_feature_hash
h = canonical_feature_hash(
features=[{"name": "rsi", "params": {"period": 14}, "scope": "pair"}],
ta_version="1.0.0",
raw_data_type="spot",
)
signalflow.feature.spec.FeatureSpec
dataclass
¶
FeatureSpec(features: list[dict] = list(), ta_version: str | None = None, raw_data_type: str = 'spot', order_significant: bool = True)
Serializable, hashable recipe for a :class:FeaturePipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `features` | `list[dict]` | Ordered feature records. Each is a `{"name", "params", "scope"}` dict. |
| `ta_version` | `str \| None` | Pinned TA-implementation version (part of the hash). |
| `raw_data_type` | `str` | Raw data type key (e.g. `"spot"`). |
| `order_significant` | `bool` | Declares that feature order is part of the truth. Always honored (features are never reordered); kept as serialized metadata for provenance. |
build ¶
Reconstruct a :class:FeaturePipeline from the recipe.
For each record, instantiate via
default_registry.create(FEATURE, name, **params) and assemble in the
given order (order is significant).
Source code in src/signalflow/feature/spec.py
feature_hash ¶
Stable SHA-256 of the canonical recipe (drift detector).
from_config
classmethod
¶
Reconstruct a spec from a plain config dict.
Accepts both the flat form (ta_version/raw_data_type at top
level) and the YAML meta: nested form shown in VISION.md §5.4.
Source code in src/signalflow/feature/spec.py
from_pipeline
classmethod
¶
Extract a :class:FeatureSpec from a live pipeline.
Feature names come from the registry reverse-lookup
(:func:_registry_name_for_class). If a feature class is not
registered, we fall back to its class name and the resulting spec may
not round-trip through :meth:build (documented limitation).
Source code in src/signalflow/feature/spec.py
from_yaml
classmethod
¶
Load a spec from a YAML file.
to_config ¶
Serialize to a plain dict (YAML meta: nested form).
Source code in src/signalflow/feature/spec.py
to_yaml ¶
Persist the spec as YAML (survives class refactors, unlike pickle).
Source code in src/signalflow/feature/spec.py
signalflow.feature.spec.canonical_feature_hash ¶
Compute the stable feature hash from a feature recipe.
Pure function (no I/O, no global state) — trivially unit-testable.
Canonicalization rules (VISION.md §5.5):
* json.dumps(..., sort_keys=True) makes dict key order irrelevant;
* floats are rounded to fixed precision so 0.1 == 0.1000000001;
* the order of features is significant — the list is hashed as-is,
never sorted;
* ta_version is part of the hash (same feature name across TA library
versions is not the same implementation).
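The canonicalization rules above translate almost directly into standard-library calls. The following is a sketch of that recipe rather than the library function: `sketch_feature_hash`, the payload layout, and the rounding precision of 8 decimals are assumptions, and resolving omitted defaults is not attempted here.

```python
import hashlib
import json


def _canonical(value, precision: int = 8):
    """Recursively round floats so tiny jitter does not change the hash."""
    if isinstance(value, float):
        return round(value, precision)
    if isinstance(value, dict):
        return {k: _canonical(v, precision) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_canonical(v, precision) for v in value]  # order kept as-is
    return value


def sketch_feature_hash(features, ta_version, raw_data_type) -> str:
    payload = {
        "features": _canonical(features),  # the list is hashed in order, never sorted
        "ta_version": ta_version,
        "raw_data_type": raw_data_type,
    }
    # sort_keys makes dict key order irrelevant; compact separators avoid whitespace drift
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


a = [{"name": "rsi", "params": {"period": 14, "alpha": 0.1}, "scope": "pair"}]
b = [{"scope": "pair", "params": {"alpha": 0.1000000001, "period": 14}, "name": "rsi"}]
same = sketch_feature_hash(a, "1.0.0", "spot") == sketch_feature_hash(b, "1.0.0", "spot")
```

Here `a` and `b` differ only in key order and float jitter, so `same` is true. Changing `ta_version` or swapping two features yields a different digest.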
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `features` | `list[dict]` | Ordered list of `{"name", "params", "scope"}` dicts. | required |
| `ta_version` | `str \| None` | Pinned TA-implementation version, or `None`. | required |
| `raw_data_type` | `str` | Raw data type key (e.g. `"spot"`). | required |

Returns:
| Type | Description |
|---|---|
| `str` | Hex-encoded SHA-256 digest. |
Source code in src/signalflow/feature/spec.py
ModelFeaturesPipeline¶
ModelFeaturesPipeline is the reproducibility wrapper for train↔serve parity.
It is a composition (not a subclass) over a FeaturePipeline (the one and
only compute engine) plus a FeatureSpec (the serializable recipe). There is
zero duplicated feature computation: compute() delegates straight into the
wrapped pipeline.
from signalflow.feature import FeaturePipeline, ExampleRsiFeature
from signalflow.feature.model_features import ModelFeaturesPipeline
pipeline = FeaturePipeline(features=[ExampleRsiFeature(period=14)], raw_data_type="spot")
mfp = ModelFeaturesPipeline.from_pipeline(pipeline, ta_version="1.0.0")
# Bundle recipe + hash to store alongside the trained model:
artifact = mfp.to_artifact_dict() # {"features_config": {...}, "feature_hash": "..."}
# At serve time, rebuild from config and guard against drift:
served = ModelFeaturesPipeline.from_config(artifact["features_config"])
served.verify_hash(artifact["feature_hash"]) # raises RuntimeError on mismatch
served.validate_reproducible() # raises if a recursive non-invariant feature is present
features_df = served.compute(df) # pure delegation to FeaturePipeline.compute
| Member | Purpose |
|---|---|
| `from_config(data)` | Build the recipe and instantiate the engine from it (single source of truth). |
| `from_pipeline(pipeline, *, ta_version=None)` | Wrap an already-built pipeline, deriving its recipe. |
| `to_config()` | Serialize the recipe (delegates to the spec). |
| `to_artifact_dict()` | Bundle features_config + feature_hash for storing with a model. |
| `feature_hash` (property) | Stable SHA-256 of the recipe. |
| `validate_reproducible()` | Assert every nested feature honours the warmup-invariance contract. |
| `verify_hash(expected)` | Recompute the hash and raise RuntimeError if it differs (drift detector). |
| `compute(df, context=None)` | Compute features by delegating to the wrapped engine (no math here). |
signalflow.feature.model_features.ModelFeaturesPipeline
dataclass
¶
Reproducibility wrapper around a :class:FeaturePipeline.
Holds both the live compute engine (_pipeline) and its serializable,
hashable recipe (_spec). All feature computation is delegated to the
nested pipeline — this class only adds reconstruction, hash verification and
warmup-reproducibility checks.
Attributes:
| Name | Type | Description |
|---|---|---|
| `_pipeline` | `FeaturePipeline` | The wrapped computational engine (single source of compute). |
| `_spec` | `FeatureSpec` | The serializable recipe used for config round-trip and hashing. |
compute ¶
Compute all features by delegating to the wrapped engine.
No feature math lives here — this is a thin pass-through to
:meth:FeaturePipeline.compute, which is the single computational
engine for the whole system.
Source code in src/signalflow/feature/model_features.py
from_config
classmethod
¶
Reconstruct from a plain config dict.
Builds the recipe (:class:FeatureSpec) and uses it to instantiate the
compute engine. The spec is the single source of truth for both the
engine and the hash, so they can never disagree.
Source code in src/signalflow/feature/model_features.py
from_pipeline
classmethod
¶
Wrap an already-built pipeline, deriving its recipe.
The recipe is extracted from the live pipeline via
:meth:FeatureSpec.from_pipeline; ta_version pins the TA
implementation in the hash when provided.
Source code in src/signalflow/feature/model_features.py
to_artifact_dict ¶
Bundle the recipe with its hash for storing alongside a model.
The persisted feature_hash is what :meth:verify_hash checks at
load time, turning silent recipe drift into a loud failure.
Source code in src/signalflow/feature/model_features.py
to_config ¶
validate_reproducible ¶
Assert every nested feature honours the warmup-invariance contract.
Delegates to :meth:FeaturePipeline.assert_reproducible, which raises if
any nested feature is recursive and not warmup-invariant (entry-point
dependent → would break live/backtest parity).
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If a recursive non-invariant feature is present. |
Source code in src/signalflow/feature/model_features.py
verify_hash ¶
Verify the recipe hash matches an expected value (drift detector).
Recompute :attr:feature_hash and compare it to the hash recorded when
the artifact was produced. A mismatch means the feature recipe changed
between train and serve — never continue silently.
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the recomputed hash differs from `expected`. |
Source code in src/signalflow/feature/model_features.py
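The store-then-verify flow can be illustrated end to end with plain dicts. This is a simplified sketch, not the wrapper class: `recipe_hash` is a stand-in for the canonical hash, the free functions only mimic `to_artifact_dict` and `verify_hash`, and the contents of `meta` are assumed.

```python
import hashlib
import json


def recipe_hash(features_config: dict) -> str:
    """Simplified stand-in for the canonical recipe hash."""
    blob = json.dumps(features_config, sort_keys=True)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


def to_artifact_dict(features_config: dict) -> dict:
    """Bundle the recipe with its hash, as stored next to a trained model."""
    return {"features_config": features_config, "feature_hash": recipe_hash(features_config)}


def verify_hash(features_config: dict, expected: str) -> None:
    """Recompute the hash and fail loudly on drift."""
    actual = recipe_hash(features_config)
    if actual != expected:
        raise RuntimeError(f"feature hash mismatch: expected {expected[:12]}, got {actual[:12]}")


config = {
    "features": [{"name": "rsi", "params": {"period": 14}, "scope": "pair"}],
    "meta": {"ta_version": "1.0.0", "raw_data_type": "spot"},  # assumed meta layout
}
artifact = to_artifact_dict(config)
verify_hash(artifact["features_config"], artifact["feature_hash"])  # train == serve: passes

drifted = json.loads(json.dumps(config))  # deep copy via JSON round-trip
drifted["features"][0]["params"]["period"] = 21  # someone edited the recipe
```

Calling `verify_hash(drifted, artifact["feature_hash"])` raises, which is the point: a changed period is caught at load time instead of silently producing different features.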
Feature Informativeness¶
Measures how informative each feature is relative to multiple targets at multiple prediction horizons. Combines MI magnitude with temporal stability into a composite score.
Usage¶
from signalflow.feature.informativeness import FeatureInformativenessAnalyzer
from signalflow.detector.market import MarketZScoreDetector
analyzer = FeatureInformativenessAnalyzer(
event_detector=MarketZScoreDetector(z_threshold=3.0),
)
report = analyzer.analyze(df, feature_columns=["rsi_14", "sma_20", "volume_ratio"])
print(report.top_features(10)) # best features by composite score
print(report.score_matrix) # NMI heatmap: feature x (horizon, target)
report.feature_detail("rsi_14") # per-target breakdown for one feature
signalflow.feature.informativeness.FeatureInformativenessAnalyzer
dataclass
¶
FeatureInformativenessAnalyzer(target_generator: MultiTargetGenerator = MultiTargetGenerator(), event_detector: SignalDetector | None = _default_event_detector(), rolling_mi: RollingMIConfig = RollingMIConfig(), weights: CompositeWeights = CompositeWeights(), bins: int = 20, pair_col: str = 'pair', ts_col: str = 'timestamp', aggregate_pairs: bool = True)
Orchestrator for feature informativeness analysis.
Pipeline
- Generate multi-horizon, multi-target labels
- Detect and mask global events
- Compute MI between each feature and each target
- Compute rolling MI for temporal stability
- Compute composite weighted scores
- Generate report
Attributes:
| Name | Type | Description |
|---|---|---|
| `target_generator` | `MultiTargetGenerator` | Multi-target label generator. |
| `event_detector` | `SignalDetector \| None` | Global event detector. None to disable. |
| `rolling_mi` | `RollingMIConfig` | Rolling MI stability configuration. |
| `weights` | `CompositeWeights` | Composite scoring weights. |
| `bins` | `int` | Number of histogram bins for MI estimation. |
| `pair_col` | `str` | Pair column name. |
| `ts_col` | `str` | Timestamp column name. |
| `aggregate_pairs` | `bool` | If True, pool all pairs for MI computation. |
event_detector
class-attribute
instance-attribute
¶
rolling_mi
class-attribute
instance-attribute
¶
target_generator
class-attribute
instance-attribute
¶
weights
class-attribute
instance-attribute
¶
_build_score_matrix ¶
Build pivoted Feature x (Horizon, Target) matrix.
Source code in src/signalflow/feature/informativeness.py
_compute_all_mi ¶
_compute_all_mi(df: DataFrame, feature_columns: list[str], target_meta: list[dict[str, str]]) -> list[dict]
Compute MI for all (feature, target) pairs.
Source code in src/signalflow/feature/informativeness.py
_compute_composite ¶
Compute composite scores from raw MI results.
Source code in src/signalflow/feature/informativeness.py
_compute_mi_pair ¶
Compute MI between one feature and one target.
Source code in src/signalflow/feature/informativeness.py
_compute_stability ¶
Compute temporal stability via rolling MI windows.
Source code in src/signalflow/feature/informativeness.py
_extract_arrays ¶
_extract_arrays(df: DataFrame, feat_col: str, target_col: str) -> tuple[np.ndarray | None, np.ndarray | None]
Extract aligned numpy arrays, dropping rows with nulls in either.
Source code in src/signalflow/feature/informativeness.py
_nan_row ¶
Source code in src/signalflow/feature/informativeness.py
_validate ¶
Source code in src/signalflow/feature/informativeness.py
analyze ¶
Run full informativeness analysis.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | OHLCV DataFrame with pre-computed feature columns. | required |
| `feature_columns` | `list[str]` | List of feature column names to evaluate. | required |

Returns:
| Type | Description |
|---|---|
| `InformativenessReport` | InformativenessReport with all results. |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If required columns are missing or feature_columns is empty. |
Source code in src/signalflow/feature/informativeness.py
signalflow.feature.informativeness.InformativenessReport
dataclass
¶
InformativenessReport(raw_mi: DataFrame, composite_scores: DataFrame, score_matrix: DataFrame, global_events: DataFrame | None, metadata: dict)
Container for informativeness analysis results.
Attributes:
| Name | Type | Description |
|---|---|---|
| `raw_mi` | `DataFrame` | Full MI results (feature x horizon x target). |
| `composite_scores` | `DataFrame` | Aggregated scores per feature, ranked. |
| `score_matrix` | `DataFrame` | Pivoted Feature x (Horizon, Target) matrix. |
| `global_events` | `DataFrame \| None` | Global event detection results (if enabled). |
| `metadata` | `dict` | Analysis configuration and statistics. |
signalflow.feature.informativeness.RollingMIConfig
dataclass
¶
Configuration for rolling MI stability computation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `window_size` | `int` | Number of bars per rolling window. |
| `min_window_fill` | `float` | Minimum fraction of non-null values in a window. |
signalflow.feature.informativeness.CompositeWeights
dataclass
¶
CompositeWeights(horizon_weights: dict[str, float] | None = None, target_weights: dict[str, float] | None = None, stability_weight: float = 0.3)
Weights for composite informativeness scoring.
Attributes:
| Name | Type | Description |
|---|---|---|
| `horizon_weights` | `dict[str, float] \| None` | Per-horizon weights. None = equal weights. |
| `target_weights` | `dict[str, float] \| None` | Per-target weights. None = equal weights. |
| `stability_weight` | `float` | Fraction of score from stability (rest from NMI). |
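One plausible reading of these weights is a convex blend: a weighted mean of NMI values takes `1 - stability_weight` of the score and the stability term takes the rest. The sketch below encodes that reading. The exact formula, the function names, and the single-level weighting over horizons only are assumptions, not the analyzer's documented arithmetic.

```python
from __future__ import annotations


def weighted_mean(values: dict[str, float], weights: dict[str, float] | None) -> float:
    """Weighted mean over keys; None means equal weights."""
    if weights is None:
        weights = {k: 1.0 for k in values}
    total = sum(weights[k] for k in values)
    return sum(values[k] * weights[k] for k in values) / total


def composite_score(
    nmi_by_horizon: dict[str, float],
    stability: float,
    horizon_weights: dict[str, float] | None = None,
    stability_weight: float = 0.3,
) -> float:
    nmi = weighted_mean(nmi_by_horizon, horizon_weights)
    return (1.0 - stability_weight) * nmi + stability_weight * stability


equal = composite_score({"h1": 0.2, "h2": 0.4}, stability=0.5)
tilted = composite_score({"h1": 0.2, "h2": 0.4}, stability=0.5, horizon_weights={"h1": 3.0, "h2": 1.0})
```

With equal weights the mean NMI is 0.3, giving 0.7 * 0.3 + 0.3 * 0.5 = 0.36. Tilting toward `h1` lowers the mean NMI to 0.25 and the score to 0.325.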
Mutual Information Functions¶
signalflow.feature.mutual_information ¶
Mutual Information estimation for feature-target pairs.
Provides histogram-based MI estimation for continuous and discrete variables. Used by FeatureInformativenessAnalyzer to measure feature informativeness against multiple target types.
References
- Cover & Thomas (2006) - Elements of Information Theory
- Kraskov et al. (2004) - MI estimation
_bin_continuous ¶
Bin continuous values into integer bin indices.
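Histogram-based estimators first map each continuous value to an integer bin index. A minimal equal-width version is sketched below. Equal-width edges are an assumption (the library could equally use quantile bins), and the function name is hypothetical.

```python
def bin_continuous(values: list[float], bins: int) -> list[int]:
    """Map values to equal-width bin indices in [0, bins - 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0] * len(values)  # constant input: everything falls in one bin
    width = (hi - lo) / bins
    # The maximum would land on index `bins`, so clamp it into the last bin.
    return [min(int((v - lo) / width), bins - 1) for v in values]


indices = bin_continuous([0.0, 1.0, 2.0, 3.0, 4.0], bins=2)
```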
_isnan_any ¶
Return boolean mask for NaN-like values in any dtype.
Source code in src/signalflow/feature/mutual_information.py
_mi_from_contingency ¶
Compute MI from two discrete arrays via contingency table.
Source code in src/signalflow/feature/mutual_information.py
entropy_continuous ¶
Shannon entropy via histogram of a continuous variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `ndarray` | 1D array of continuous values. | required |
| `bins` | `int` | Number of histogram bins. | `20` |

Returns:
| Type | Description |
|---|---|
| `float` | Entropy in bits. NaN if fewer than 2 valid values. |
Source code in src/signalflow/feature/mutual_information.py
entropy_discrete ¶
Shannon entropy of a discrete distribution.
H(X) = -sum_x p(x) * log2(p(x))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `ndarray` | 1D array of discrete values. | required |

Returns:
| Type | Description |
|---|---|
| `float` | Entropy in bits. NaN if fewer than 2 values. |
Source code in src/signalflow/feature/mutual_information.py
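The entropy formula H(X) = -sum_x p(x) * log2(p(x)) can be checked by hand on small inputs. The sketch below is a plain-Python illustration with a hypothetical function name. It follows the documented "NaN if fewer than 2 values" rule but is not the NumPy-based library function.

```python
from collections import Counter
from math import log2


def entropy_bits(labels: list) -> float:
    """Shannon entropy in bits of the empirical distribution of labels."""
    n = len(labels)
    if n < 2:
        return float("nan")
    return -sum((c / n) * log2(c / n) for c in Counter(labels).values())


fair_coin = entropy_bits(["a", "b", "a", "b"])
constant = entropy_bits(["a", "a", "a", "a"])
```

A balanced two-symbol sequence carries exactly one bit per observation, and a constant sequence carries none.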
mutual_information_continuous ¶
MI between two continuous variables.
Bins both variables and computes MI from the 2D histogram.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `ndarray` | 1D continuous array. | required |
| `y` | `ndarray` | 1D continuous array. | required |
| `bins` | `int` | Number of bins per dimension. | `20` |

Returns:
| Type | Description |
|---|---|
| `float` | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
mutual_information_continuous_discrete ¶
MI between a continuous feature and a discrete target.
Bins the continuous variable, then computes MI from the joint contingency table of (binned_x, y).
This is the primary use case: continuous feature columns (RSI, SMA, etc.) against discrete labels (RISE/FALL/NONE).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `ndarray` | 1D continuous feature array. | required |
| `y` | `ndarray` | 1D discrete target array. | required |
| `bins` | `int` | Number of bins for the continuous variable. | `20` |

Returns:
| Type | Description |
|---|---|
| `float` | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
mutual_information_discrete ¶
MI between two discrete (categorical) arrays.
MI(X;Y) = sum_{x,y} p(x,y) * log2(p(x,y) / (p(x) * p(y)))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `ndarray` | 1D discrete array. | required |
| `y` | `ndarray` | 1D discrete array of same length. | required |

Returns:
| Type | Description |
|---|---|
| `float` | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
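The discrete MI formula can be evaluated directly from joint and marginal counts. The sketch below does so with `collections.Counter`. The function name is hypothetical and raising on a length mismatch is an assumption; the library version operates on NumPy arrays.

```python
from collections import Counter
from math import log2


def mutual_information_bits(x: list, y: list) -> float:
    """MI(X;Y) in bits from empirical joint and marginal frequencies."""
    if len(x) != len(y):
        raise ValueError("x and y must have the same length")
    n = len(x)
    if n < 2:
        return float("nan")
    px, py, pxy = Counter(x), Counter(y), Counter(zip(x, y))
    # Only observed (a, b) cells contribute; empty cells have p(x, y) = 0.
    return sum(
        (c / n) * log2((c / n) / ((px[a] / n) * (py[b] / n)))
        for (a, b), c in pxy.items()
    )


identical = mutual_information_bits([0, 0, 1, 1], [0, 0, 1, 1])
independent = mutual_information_bits([0, 0, 1, 1], [0, 1, 0, 1])
```

When `y` duplicates `x`, MI equals the entropy of `x` (one bit here). When every joint cell matches the product of its marginals, MI is zero.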
normalized_mutual_information ¶
Normalize MI to [0, 1] using NMI = MI / sqrt(H(X) * H(Y)).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mi` | `float` | Raw mutual information value. | required |
| `h_x` | `float` | Entropy of X. | required |
| `h_y` | `float` | Entropy of Y. | required |

Returns:
| Type | Description |
|---|---|
| `float` | NMI in [0, 1]. NaN if either entropy is zero or NaN. |
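The normalization NMI = MI / sqrt(H(X) * H(Y)) is a one-liner once the edge cases are handled. The sketch below follows the documented NaN rule. The function name is hypothetical, and clipping the result into [0, 1] is an assumption added to guard against small estimation error.

```python
from math import isnan, sqrt


def nmi(mi: float, h_x: float, h_y: float) -> float:
    """Normalized MI; NaN when an input is NaN or an entropy is not positive."""
    if isnan(mi) or isnan(h_x) or isnan(h_y) or h_x <= 0 or h_y <= 0:
        return float("nan")
    return min(1.0, max(0.0, mi / sqrt(h_x * h_y)))  # clip is an assumption


perfect = nmi(1.0, 1.0, 1.0)
partial = nmi(0.5, 1.0, 4.0)
undefined = nmi(0.1, 0.0, 1.0)
```

Dividing by the geometric mean of the entropies makes scores comparable across targets with different numbers of classes.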