Feature Module¶
Feature extraction for technical indicators and derived metrics.
Base Classes¶
signalflow.feature.base.Feature
dataclass
¶
Feature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None)
Bases: KwargsTolerantMixin
Base class for all features.
Two methods to implement:
- compute(df): processes all pairs; abstract for GlobalFeature and FeaturePipeline
- compute_pair(df): processes one pair; used by regular features
Attributes:
| Name | Type | Description |
|---|---|---|
| requires | list[str] | Input column templates, e.g. ["{price_col}"] |
| outputs | list[str] | Output column templates, e.g. ["rsi_{period}"] |
| normalized | bool | If True, apply rolling z-score normalization to output. |
| norm_period | int \| None | Window for normalization. Defaults to 3x feature period. |
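The attributes above can be illustrated with a minimal sketch. This is not SignalFlow's implementation: it assumes column templates are filled with `str.format` and that the rolling z-score uses a trailing window with a population standard deviation. The helper names `resolve_templates` and `rolling_zscore` are hypothetical.

```python
import math


def resolve_templates(templates, params):
    """Fill column-name templates such as "rsi_{period}" (assumed to use str.format)."""
    return [t.format(**params) for t in templates]


def rolling_zscore(values, window):
    """Trailing-window z-score; None until the window is full (assumed behaviour)."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
            continue
        win = values[i + 1 - window : i + 1]
        mean = sum(win) / window
        std = math.sqrt(sum((v - mean) ** 2 for v in win) / window)
        # A flat window has zero spread; return 0.0 rather than dividing by zero.
        out.append((values[i] - mean) / std if std > 0 else 0.0)
    return out


period = 14
outputs = resolve_templates(["rsi_{period}"], {"period": period})
norm_period = 3 * period  # documented default: 3x the feature period
zscores = rolling_zscore([1.0, 2.0, 3.0], window=3)
```

Here `outputs` resolves to a single column name built from the period, and only the last entry of `zscores` is populated because the window needs three values.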
signalflow.feature.feature_pipeline.FeaturePipeline
dataclass
¶
FeaturePipeline(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, features: list[Feature] = list(), raw_data_type: RawDataType | str = RawDataType.SPOT)
Bases: Feature
Orchestrates multiple features with optimized execution.
Groups consecutive per-pair features into batches for single group_by.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| features | list[Feature] | List of features to compute. | list() |
| raw_data_type | RawDataType \| str | Type of raw data (defines available columns). | SPOT |
Example
    pipeline = FeaturePipeline(
        features=[
            RsiFeature(period=14),
            SmaFeature(period=20),
            GlobalFeature(base=RsiFeature(period=14), reference_pair="BTCUSDT"),
        ],
        raw_data_type=RawDataType.SPOT,
    )
    df = pipeline.run(raw_data_view)
__post_init__ ¶
_group_into_batches ¶
Group features: consecutive per-pair → batch, global → separate.
Source code in src/signalflow/feature/feature_pipeline.py
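The batching rule described above (consecutive per-pair features share a batch, each global feature gets its own) can be sketched as follows. This is an illustrative sketch only: features are represented as plain strings, and the `is_global` predicate and `group_into_batches` function are hypothetical stand-ins, not the library's code.

```python
from itertools import groupby


def group_into_batches(features, is_global):
    """Group consecutive per-pair features; keep every global feature separate."""
    batches = []
    for global_flag, run in groupby(features, key=is_global):
        run = list(run)
        if global_flag:
            # Each global feature becomes its own single-element batch.
            batches.extend([f] for f in run)
        else:
            # A run of per-pair features can share one group_by pass.
            batches.append(run)
    return batches


# Hypothetical feature names; the "GLOBAL:" prefix marks global features here.
features = ["rsi", "sma", "GLOBAL:mean_rsi", "atr"]
batches = group_into_batches(features, lambda f: f.startswith("GLOBAL:"))
```

Order is preserved, so a per-pair feature that follows a global feature starts a new batch and can depend on the global output.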
_is_per_pair_batch ¶
Check if batch contains only per-pair features.
Source code in src/signalflow/feature/feature_pipeline.py
_validate ¶
Validate all dependencies are satisfied.
Source code in src/signalflow/feature/feature_pipeline.py
compute ¶
Compute all features with optimized batching.
Source code in src/signalflow/feature/feature_pipeline.py
output_cols ¶
run ¶
Entry point: load from RawDataView and compute.
Source code in src/signalflow/feature/feature_pipeline.py
to_mermaid ¶
Generate Mermaid diagram of feature dependencies.
Source code in src/signalflow/feature/feature_pipeline.py
signalflow.feature.base.GlobalFeature
dataclass
¶
GlobalFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None)
Bases: Feature
Base class for features computed across all pairs.
Override compute() with custom aggregation logic.
For multi-source features, set sources to specify which exchanges to use.
Use get_source_data() to retrieve data from RawData with proper handling.
Attributes:
| Name | Type | Description |
|---|---|---|
| sources | list[str] \| None | List of source names to use (e.g., ["binance", "okx"]). If None, uses default source or all available sources. |
Example
compute ¶
Must override - compute global feature across all pairs.
get_source_data ¶
Get DataFrame from RawData for a specific source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw | RawData | RawData container. | required |
| data_type | str | Data type key (e.g., "perpetual", "spot"). | required |
| source | str \| None | Specific source name. If None, uses default. | None |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Data for the specified source. |
Source code in src/signalflow/feature/base.py
iter_sources ¶
Iterate over source DataFrames from RawData.
If self.sources is set, iterates only those sources.
Otherwise, iterates all available sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw | RawData | RawData container. | required |
| data_type | str | Data type key (e.g., "perpetual"). | required |

Yields:
| Type | Description |
|---|---|
| tuple[str, DataFrame] | tuple[str, pl.DataFrame]: (source_name, DataFrame) pairs. |
Source code in src/signalflow/feature/base.py
signalflow.feature.offset_feature.OffsetFeature
dataclass
¶
OffsetFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, feature_name: str | None = None, feature_params: dict = dict(), window: int = 15, prefix: str | None = None)
Bases: Feature
Multi-timeframe feature via offset resampling.
Creates one parallel time series per offset (window series in total).
Each series computes the feature as if on window-minute bars.
Supports both regular Feature (compute_pair) and GlobalFeature (compute).
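To make the offset idea concrete, here is a minimal sketch of offset resampling on plain Python dicts. It is not the library's `_resample_ohlcv`: it assumes bars carry an integer `minute` index, that offsets run from 0 to window - 1, and that incomplete bars at the edges are dropped. The function name `resample_ohlcv` and the bar layout are illustrative assumptions.

```python
def resample_ohlcv(bars, window, offset):
    """Aggregate 1-minute bars into window-minute bars whose edges are shifted by offset."""
    buckets = {}
    for bar in bars:  # bars are assumed to be sorted by minute
        key = (bar["minute"] - offset) // window
        buckets.setdefault(key, []).append(bar)
    out = []
    for key in sorted(buckets):
        chunk = buckets[key]
        if len(chunk) < window:
            continue  # assumption: drop incomplete bars at the edges
        out.append({
            "minute": chunk[-1]["minute"],  # label the bar by its last minute
            "open": chunk[0]["open"],
            "high": max(b["high"] for b in chunk),
            "low": min(b["low"] for b in chunk),
            "close": chunk[-1]["close"],
            "volume": sum(b["volume"] for b in chunk),
        })
    return out


# Six synthetic 1-minute bars, resampled to 3-minute bars at every offset.
bars = [
    {"minute": m, "open": float(m), "high": m + 0.5, "low": m - 0.5,
     "close": m + 0.2, "volume": 1.0}
    for m in range(6)
]
series = {offset: resample_ohlcv(bars, window=3, offset=offset) for offset in range(3)}
```

Each entry of `series` is one of the parallel time series; a base feature would then be computed on each one and the results aligned back to the 1-minute index.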
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| feature_name | str \| None | Registered component name (sf_component name). | None |
| feature_params | dict | Parameters for feature class. | dict() |
| window | int | Aggregation window in minutes. Default: 15. | 15 |
| prefix | str \| None | Prefix for output columns. Default: "{window}m_". | None |
Example
    offset = OffsetFeature(
        feature_name="test_rsi",
        feature_params={"period": 14},
        window=15,
    )
    # Outputs: 15m_rsi_14, offset

    # With GlobalFeature
    offset = OffsetFeature(
        feature_name="global/market_log_return",
        feature_params={},
        window=15,
    )
feature_params
class-attribute
instance-attribute
¶
requires
class-attribute
¶
__post_init__ ¶
Source code in src/signalflow/feature/offset_feature.py
_compute_all_pairs_global ¶
Compute features for all pairs with global base feature.
Source code in src/signalflow/feature/offset_feature.py
_compute_base_feature ¶
Compute base feature - handles both Feature and GlobalFeature.
Source code in src/signalflow/feature/offset_feature.py
_compute_single_pair ¶
Compute features for single pair (non-global base).
Source code in src/signalflow/feature/offset_feature.py
_resample_ohlcv ¶
Resample 1m OHLCV to window-minute bars with given offset.
Source code in src/signalflow/feature/offset_feature.py
compute ¶
Compute for all pairs.
Source code in src/signalflow/feature/offset_feature.py
compute_pair ¶
Compute for single pair (only for non-global base).
Source code in src/signalflow/feature/offset_feature.py
from_dict
classmethod
¶
Deserialize from config.
Source code in src/signalflow/feature/offset_feature.py
output_cols ¶
required_cols ¶
to_dict ¶
signalflow.feature.lin_reg_forecast.LinRegForecastFeature
dataclass
¶
LinRegForecastFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, source_col: str = 'rsi_14', n_lags: int = 10, trend_window: int = 5, mean_window: int = 20, refit_period: Literal['hour', 'day', 'week', 'month', None] = 'day', alpha: float = 1.0, forecast_horizon: int = 1, min_samples: int = 50)
Bases: Feature
Enhanced linear regression forecast with trend and mean-reversion features.
Instead of predicting raw values, predicts change (diff) and adds:
- Trend slope (recent momentum)
- Mean reversion signal (deviation from rolling mean)
- Volatility scaling
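The kind of feature row this description implies can be sketched as below. This is a rough illustration under stated assumptions, not the library's `_build_features`: the trend slope is taken as an ordinary least-squares slope over the last `trend_window` values, the mean-reversion signal as the deviation from a trailing mean, and volatility scaling is omitted. `build_feature_row` and `build_target` are hypothetical names.

```python
def build_feature_row(values, t, n_lags, trend_window, mean_window):
    """Lagged diffs, a trend slope and a mean-reversion term at index t."""
    # Lagged first differences ending at t.
    diffs = [values[i] - values[i - 1] for i in range(t - n_lags + 1, t + 1)]
    # Least-squares slope of the last trend_window values against 0..trend_window-1.
    recent = values[t - trend_window + 1 : t + 1]
    x_mean = (trend_window - 1) / 2
    y_mean = sum(recent) / trend_window
    num = sum((x - x_mean) * (y - y_mean) for x, y in enumerate(recent))
    den = sum((x - x_mean) ** 2 for x in range(trend_window))
    slope = num / den
    # Deviation of the current value from its trailing mean.
    mean_dev = values[t] - sum(values[t - mean_window + 1 : t + 1]) / mean_window
    return diffs + [slope, mean_dev]


def build_target(values, t, forecast_horizon=1):
    """Forward change: the value forecast_horizon steps ahead minus the value at t."""
    return values[t + forecast_horizon] - values[t]


values = [float(v) for v in range(10)]  # a perfectly linear series
row = build_feature_row(values, t=9, n_lags=3, trend_window=4, mean_window=5)
target = build_target(values, t=8)
```

On a linear series every diff and the slope equal the step size, which makes the sketch easy to check by hand. A ridge regression with penalty `alpha` would then be fit on such rows and targets.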
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_col | str | Column to forecast. | 'rsi_14' |
| n_lags | int | Number of lagged diffs. Default: 10. | 10 |
| trend_window | int | Window for trend calculation. Default: 5. | 5 |
| mean_window | int | Window for mean reversion. Default: 20. | 20 |
| refit_period | Literal['hour', 'day', 'week', 'month', None] | When to refit. Default: 'day'. | 'day' |
| alpha | float | Ridge regularization. Default: 1.0. | 1.0 |
| forecast_horizon | int | Steps ahead to forecast. Default: 1. | 1 |
outputs
class-attribute
¶
outputs: list[str] = ['{source_col}_forecast', '{source_col}_forecast_change', '{source_col}_forecast_direction']
refit_period
class-attribute
instance-attribute
¶
test_params
class-attribute
¶
test_params: list[dict] = [{'source_col': 'rsi_14', 'n_lags': 10}, {'source_col': 'rsi_14', 'n_lags': 5, 'mean_window': 10}]
__post_init__ ¶
_build_features ¶
Build enhanced feature matrix.
Source code in src/signalflow/feature/lin_reg_forecast.py
_build_targets ¶
Build target: forward diff (change).
Source code in src/signalflow/feature/lin_reg_forecast.py
_get_period_key ¶
Source code in src/signalflow/feature/lin_reg_forecast.py
compute_pair ¶
Compute forecasts for single pair.
Source code in src/signalflow/feature/lin_reg_forecast.py
signalflow.feature.atr.ATRFeature
dataclass
¶
ATRFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, smoothing: Literal['sma', 'ema'] = 'ema')
Bases: Feature
Average True Range (ATR) feature.
Measures market volatility as the moving average of True Range.
True Range = max(high - low, |high - prev_close|, |low - prev_close|)
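The True Range formula and the two smoothing modes can be sketched in plain Python. This is a minimal illustration, not the code in `atr.py`: it assumes the "ema" mode is Wilder's recursive smoothing seeded with a simple average, and the function names are hypothetical.

```python
def true_range(high, low, prev_close):
    """True Range: the largest of the three candidate ranges."""
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def atr(highs, lows, closes, period, smoothing="ema"):
    """ATR values starting at the first bar with `period` true ranges available."""
    trs = [true_range(highs[i], lows[i], closes[i - 1]) for i in range(1, len(closes))]
    if len(trs) < period:
        return []
    out = [sum(trs[:period]) / period]  # seed with a simple average
    for i in range(period, len(trs)):
        if smoothing == "sma":
            out.append(sum(trs[i - period + 1 : i + 1]) / period)
        else:
            # Wilder's smoothing: weight 1/period on the newest true range.
            out.append((out[-1] * (period - 1) + trs[i]) / period)
    return out


highs = [10.0, 11.0, 12.0, 13.0]
lows = [9.0, 9.5, 10.5, 11.0]
closes = [9.5, 10.5, 11.5, 12.5]
atr_values = atr(highs, lows, closes, period=2)
gap_tr = true_range(high=10.0, low=9.0, prev_close=12.0)  # gap down from prior close
```

The `gap_tr` case shows why the previous close matters: the bar's own range is 1.0, but the gap from the prior close makes the True Range 3.0.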
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| period | int | ATR period. Default: 14. | 14 |
| smoothing | Literal['sma', 'ema'] | Smoothing method - "sma" or "ema" (Wilder's). Default: "ema". | 'ema' |
Example
    atr = ATRFeature(period=14)
    atr.output_cols()  # ["atr_14"]
test_params
class-attribute
¶
_get_output_name ¶
compute_pair ¶
Compute ATR for a single pair.
Source code in src/signalflow/feature/atr.py
Examples¶
signalflow.feature.examples.ExampleRsiFeature
dataclass
¶
ExampleRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 14, price_col: str = 'close')
Bases: Feature
Relative Strength Index.
Bounded oscillator [0, 100]. In normalized mode, rescales to [-1, 1].
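One plausible reading of the normalized mode is a linear map centred on 50. The sketch below assumes that mapping; the source does not state the exact formula, and `rescale_rsi` is a hypothetical helper.

```python
def rescale_rsi(rsi):
    """Map RSI from [0, 100] to [-1, 1] linearly (assumed mapping, centred on 50)."""
    return (rsi - 50.0) / 50.0


scaled = [rescale_rsi(v) for v in (0.0, 50.0, 70.0, 100.0)]
```

Under this assumption the neutral level 50 maps to 0, and the common overbought threshold 70 maps to 0.4.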
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| period | int | RSI period. Default: 14. | 14 |
| price_col | str | Price column to use. Default: "close". | 'close' |
Example
    rsi = ExampleRsiFeature(period=21)
    rsi.output_cols()  # ["rsi_21"]
test_params
class-attribute
¶
_get_output_name ¶
compute ¶
compute_pair ¶
Compute RSI for single pair.
Source code in src/signalflow/feature/examples.py
signalflow.feature.examples.ExampleSmaFeature
dataclass
¶
ExampleSmaFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, period: int = 20, price_col: str = 'close')
Bases: Feature
Simple Moving Average.
_get_output_name ¶
compute_pair ¶
Source code in src/signalflow/feature/examples.py
signalflow.feature.examples.ExampleGlobalMeanRsiFeature
dataclass
¶
ExampleGlobalMeanRsiFeature(group_col: str = 'pair', ts_col: str = 'timestamp', normalized: bool = False, norm_period: int | None = None, sources: list[str] | None = None, period: int = 14, price_col: str = 'close', add_diff: bool = False)
Bases: GlobalFeature
Mean RSI across all pairs per timestamp.
- Compute RSI per pair
- Mean across all pairs at time t -> global_mean_rsi
- Optionally: rsi_diff = pair_rsi - global_mean_rsi
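The aggregation steps above can be sketched on plain rows, assuming the per-pair RSI has already been computed. This is an illustration rather than the example class's code: rows are dicts instead of a DataFrame, and `add_global_mean` is a hypothetical helper.

```python
from collections import defaultdict


def add_global_mean(rows, value_col="rsi_14", add_diff=False):
    """Attach the cross-pair mean per timestamp, and optionally the per-pair difference."""
    by_ts = defaultdict(list)
    for row in rows:
        by_ts[row["timestamp"]].append(row[value_col])
    means = {ts: sum(vals) / len(vals) for ts, vals in by_ts.items()}
    out = []
    for row in rows:
        new_row = dict(row, global_mean_rsi=means[row["timestamp"]])
        if add_diff:
            new_row["rsi_diff"] = row[value_col] - new_row["global_mean_rsi"]
        out.append(new_row)
    return out


rows = [
    {"timestamp": 1, "pair": "BTCUSDT", "rsi_14": 60.0},
    {"timestamp": 1, "pair": "ETHUSDT", "rsi_14": 40.0},
    {"timestamp": 2, "pair": "BTCUSDT", "rsi_14": 70.0},
    {"timestamp": 2, "pair": "ETHUSDT", "rsi_14": 50.0},
]
result = add_global_mean(rows, add_diff=True)
```

Every row at the same timestamp receives the same `global_mean_rsi`, while `rsi_diff` shows how far each pair sits from the market-wide level.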
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| period | int | RSI period. Default: 14. | 14 |
| add_diff | bool | Add per-pair difference column. Default: False. | False |
test_params
class-attribute
¶
compute ¶
Source code in src/signalflow/feature/examples.py
output_cols ¶
Feature Informativeness¶
Measures how informative each feature is relative to multiple targets at multiple prediction horizons. Combines the magnitude of mutual information (MI) with its temporal stability into a composite score.
Usage¶
    from signalflow.feature.informativeness import FeatureInformativenessAnalyzer
    from signalflow.detector.market import MarketZScoreDetector

    analyzer = FeatureInformativenessAnalyzer(
        event_detector=MarketZScoreDetector(z_threshold=3.0),
    )
    report = analyzer.analyze(df, feature_columns=["rsi_14", "sma_20", "volume_ratio"])

    print(report.top_features(10))   # best features by composite score
    print(report.score_matrix)       # NMI heatmap: feature x (horizon, target)
    report.feature_detail("rsi_14")  # per-target breakdown for one feature
signalflow.feature.informativeness.FeatureInformativenessAnalyzer
dataclass
¶
FeatureInformativenessAnalyzer(target_generator: MultiTargetGenerator = MultiTargetGenerator(), event_detector: SignalDetector | None = _default_event_detector(), rolling_mi: RollingMIConfig = RollingMIConfig(), weights: CompositeWeights = CompositeWeights(), bins: int = 20, pair_col: str = 'pair', ts_col: str = 'timestamp', aggregate_pairs: bool = True)
Orchestrator for feature informativeness analysis.
Pipeline
- Generate multi-horizon, multi-target labels
- Detect and mask global events
- Compute MI between each feature and each target
- Compute rolling MI for temporal stability
- Compute composite weighted scores
- Generate report
Attributes:
| Name | Type | Description |
|---|---|---|
| target_generator | MultiTargetGenerator | Multi-target label generator. |
| event_detector | SignalDetector \| None | Global event detector. None to disable. |
| rolling_mi | RollingMIConfig | Rolling MI stability configuration. |
| weights | CompositeWeights | Composite scoring weights. |
| bins | int | Number of histogram bins for MI estimation. |
| pair_col | str | Pair column name. |
| ts_col | str | Timestamp column name. |
| aggregate_pairs | bool | If True, pool all pairs for MI computation. |
event_detector
class-attribute
instance-attribute
¶
rolling_mi
class-attribute
instance-attribute
¶
target_generator
class-attribute
instance-attribute
¶
weights
class-attribute
instance-attribute
¶
_build_score_matrix ¶
Build pivoted Feature x (Horizon, Target) matrix.
Source code in src/signalflow/feature/informativeness.py
_compute_all_mi ¶
_compute_all_mi(df: DataFrame, feature_columns: list[str], target_meta: list[dict[str, str]]) -> list[dict]
Compute MI for all (feature, target) pairs.
Source code in src/signalflow/feature/informativeness.py
_compute_composite ¶
Compute composite scores from raw MI results.
Source code in src/signalflow/feature/informativeness.py
_compute_mi_pair ¶
Compute MI between one feature and one target.
Source code in src/signalflow/feature/informativeness.py
_compute_stability ¶
Compute temporal stability via rolling MI windows.
Source code in src/signalflow/feature/informativeness.py
_extract_arrays ¶
_extract_arrays(df: DataFrame, feat_col: str, target_col: str) -> tuple[np.ndarray | None, np.ndarray | None]
Extract aligned numpy arrays, dropping rows with nulls in either.
Source code in src/signalflow/feature/informativeness.py
_nan_row ¶
Source code in src/signalflow/feature/informativeness.py
_validate ¶
Source code in src/signalflow/feature/informativeness.py
analyze ¶
Run full informativeness analysis.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | OHLCV DataFrame with pre-computed feature columns. | required |
| feature_columns | list[str] | List of feature column names to evaluate. | required |

Returns:
| Type | Description |
|---|---|
| InformativenessReport | InformativenessReport with all results. |

Raises:
| Type | Description |
|---|---|
| ValueError | If required columns are missing or feature_columns is empty. |
Source code in src/signalflow/feature/informativeness.py
signalflow.feature.informativeness.InformativenessReport
dataclass
¶
InformativenessReport(raw_mi: DataFrame, composite_scores: DataFrame, score_matrix: DataFrame, global_events: DataFrame | None, metadata: dict)
Container for informativeness analysis results.
Attributes:
| Name | Type | Description |
|---|---|---|
| raw_mi | DataFrame | Full MI results (feature x horizon x target). |
| composite_scores | DataFrame | Aggregated scores per feature, ranked. |
| score_matrix | DataFrame | Pivoted Feature x (Horizon, Target) matrix. |
| global_events | DataFrame \| None | Global event detection results (if enabled). |
| metadata | dict | Analysis configuration and statistics. |
signalflow.feature.informativeness.RollingMIConfig
dataclass
¶
Configuration for rolling MI stability computation.
Attributes:
| Name | Type | Description |
|---|---|---|
| window_size | int | Number of bars per rolling window. |
| min_window_fill | float | Minimum fraction of non-null values in a window. |
signalflow.feature.informativeness.CompositeWeights
dataclass
¶
CompositeWeights(horizon_weights: dict[str, float] | None = None, target_weights: dict[str, float] | None = None, stability_weight: float = 0.3)
Weights for composite informativeness scoring.
Attributes:
| Name | Type | Description |
|---|---|---|
| horizon_weights | dict[str, float] \| None | Per-horizon weights. None = equal weights. |
| target_weights | dict[str, float] \| None | Per-target weights. None = equal weights. |
| stability_weight | float | Fraction of score from stability (rest from NMI). |
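One way these weights could combine is a weighted mean of NMI over (horizon, target) cells, blended with a stability term. The sketch below assumes exactly that blend; the actual formula in `_compute_composite` is not shown in this page, and `composite_score` with its arguments is hypothetical.

```python
def composite_score(nmi_by_cell, stability, horizon_weights=None,
                    target_weights=None, stability_weight=0.3):
    """Blend a weighted-mean NMI with a stability score (assumed formula)."""
    total = 0.0
    weight_sum = 0.0
    for (horizon, target), nmi in nmi_by_cell.items():
        # None means equal weights, as documented for CompositeWeights.
        w_h = 1.0 if horizon_weights is None else horizon_weights[horizon]
        w_t = 1.0 if target_weights is None else target_weights[target]
        total += w_h * w_t * nmi
        weight_sum += w_h * w_t
    weighted_nmi = total / weight_sum
    return (1.0 - stability_weight) * weighted_nmi + stability_weight * stability


# Hypothetical horizon and target labels.
cells = {("h15", "direction"): 0.2, ("h60", "direction"): 0.4}
equal = composite_score(cells, stability=0.5)
short_biased = composite_score(cells, stability=0.5,
                               horizon_weights={"h15": 3.0, "h60": 1.0})
```

With the default `stability_weight` of 0.3, 70% of the score comes from NMI and 30% from stability; raising the weight on the short horizon pulls the score toward that horizon's NMI.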
Mutual Information Functions¶
signalflow.feature.mutual_information ¶
Mutual Information estimation for feature-target pairs.
Provides histogram-based MI estimation for continuous and discrete variables. Used by FeatureInformativenessAnalyzer to measure feature informativeness against multiple target types.
References
- Cover & Thomas (2006) - Elements of Information Theory
- Kraskov et al. (2004) - MI estimation
_bin_continuous ¶
Bin continuous values into integer bin indices.
_isnan_any ¶
Return boolean mask for NaN-like values in any dtype.
Source code in src/signalflow/feature/mutual_information.py
_mi_from_contingency ¶
Compute MI from two discrete arrays via contingency table.
Source code in src/signalflow/feature/mutual_information.py
entropy_continuous ¶
Shannon entropy via histogram of a continuous variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | ndarray | 1D array of continuous values. | required |
| bins | int | Number of histogram bins. | 20 |

Returns:
| Type | Description |
|---|---|
| float | Entropy in bits. NaN if fewer than 2 valid values. |
Source code in src/signalflow/feature/mutual_information.py
entropy_discrete ¶
Shannon entropy of a discrete distribution.
H(X) = -sum_x p(x) * log2(p(x))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | ndarray | 1D array of discrete values. | required |

Returns:
| Type | Description |
|---|---|
| float | Entropy in bits. NaN if fewer than 2 values. |
Source code in src/signalflow/feature/mutual_information.py
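The entropy formula H(X) = -sum_x p(x) * log2(p(x)) can be checked with a few lines of standard-library Python. This is a sketch for intuition, not the module's implementation; it takes a list rather than an ndarray, and the name `entropy_bits` is hypothetical.

```python
import math
from collections import Counter


def entropy_bits(x):
    """Shannon entropy in bits; NaN for fewer than 2 values, matching the documented contract."""
    n = len(x)
    if n < 2:
        return float("nan")
    counts = Counter(x)
    return -sum((c / n) * math.log2(c / n) for c in counts.values())


fair_coin = entropy_bits(["RISE", "FALL", "RISE", "FALL"])
constant = entropy_bits(["NONE"] * 4)
three_way = entropy_bits(["RISE", "FALL", "NONE"])
```

Two equally likely labels carry exactly one bit, a constant label carries none, and three equally likely labels carry log2(3) bits.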
mutual_information_continuous ¶
MI between two continuous variables.
Bins both variables and computes MI from the 2D histogram.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | ndarray | 1D continuous array. | required |
| y | ndarray | 1D continuous array. | required |
| bins | int | Number of bins per dimension. | 20 |

Returns:
| Type | Description |
|---|---|
| float | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
mutual_information_continuous_discrete ¶
MI between a continuous feature and a discrete target.
Bins the continuous variable, then computes MI from the joint contingency table of (binned_x, y).
This is the primary use case: continuous feature columns (RSI, SMA, etc.) against discrete labels (RISE/FALL/NONE).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | ndarray | 1D continuous feature array. | required |
| y | ndarray | 1D discrete target array. | required |
| bins | int | Number of bins for the continuous variable. | 20 |

Returns:
| Type | Description |
|---|---|
| float | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
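The bin-then-count approach can be sketched with the standard library. This is an illustration under assumptions, not the module's code: it uses equal-width bins (the library's binning strategy is not stated here), works on lists, and the names `bin_equal_width` and `mi_bits` are hypothetical.

```python
import math
from collections import Counter


def bin_equal_width(x, bins):
    """Map continuous values to integer bin indices (equal-width bins assumed)."""
    lo, hi = min(x), max(x)
    if hi == lo:
        return [0] * len(x)
    width = (hi - lo) / bins
    # The maximum value would land in bin `bins`; clamp it into the last bin.
    return [min(int((v - lo) / width), bins - 1) for v in x]


def mi_bits(a, b):
    """MI(X;Y) = sum p(x,y) * log2(p(x,y) / (p(x) * p(y))) from joint counts."""
    n = len(a)
    joint = Counter(zip(a, b))
    count_a, count_b = Counter(a), Counter(b)
    return sum(
        (c / n) * math.log2(c * n / (count_a[i] * count_b[j]))
        for (i, j), c in joint.items()
    )


labels = ["FALL", "FALL", "RISE", "RISE"]
informative = mi_bits(bin_equal_width([0.1, 0.2, 0.8, 0.9], bins=2), labels)
uninformative = mi_bits(bin_equal_width([0.1, 0.9, 0.1, 0.9], bins=2), labels)
```

A feature whose bins line up with the labels recovers the full one bit of label entropy, while a feature that is spread evenly across labels yields zero.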
mutual_information_discrete ¶
MI between two discrete (categorical) arrays.
MI(X;Y) = sum_{x,y} p(x,y) * log2(p(x,y) / (p(x) * p(y)))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | ndarray | 1D discrete array. | required |
| y | ndarray | 1D discrete array of same length. | required |

Returns:
| Type | Description |
|---|---|
| float | MI in bits. NaN if insufficient data. |
Source code in src/signalflow/feature/mutual_information.py
normalized_mutual_information ¶
Normalize MI to [0, 1] using NMI = MI / sqrt(H(X) * H(Y)).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mi | float | Raw mutual information value. | required |
| h_x | float | Entropy of X. | required |
| h_y | float | Entropy of Y. | required |

Returns:
| Type | Description |
|---|---|
| float | NMI in [0, 1]. NaN if either entropy is zero or NaN. |
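The normalization NMI = MI / sqrt(H(X) * H(Y)) and its NaN contract can be sketched as follows. The clipping to [0, 1] is an assumption added to absorb estimation noise, and `nmi` is a hypothetical name rather than the module's function.

```python
import math


def nmi(mi, h_x, h_y):
    """NMI = MI / sqrt(H(X) * H(Y)); NaN if an input is NaN or an entropy is not positive."""
    if any(math.isnan(v) for v in (mi, h_x, h_y)) or h_x <= 0 or h_y <= 0:
        return float("nan")
    # Assumption: clip to [0, 1] so small estimation errors cannot leave the range.
    return min(1.0, max(0.0, mi / math.sqrt(h_x * h_y)))


perfect = nmi(1.0, 1.0, 1.0)
partial = nmi(0.5, 1.0, 4.0)
degenerate = nmi(0.1, 0.0, 1.0)
```

Normalizing by the geometric mean of the entropies makes scores comparable across targets with different numbers of classes.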