Feature Module¶
Feature extraction for technical indicators and derived metrics.
Base Classes¶
signalflow.feature.base.FeatureExtractor dataclass ¶
FeatureExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False)
Bases: ABC
Base class for Polars-first feature extraction.
Extracts technical indicators and derived features from raw OHLCV data with optional sliding window resampling for multi-timeframe features.
Key features
- Polars-native for performance
- Optional sliding window resampling (e.g., 5m bars from 1m bars)
- Per-pair, per-offset-window processing
- Length-preserving operations
- Automatic projection (keep only new features)
Processing pipeline
- Sort by (pair, timestamp)
- Add resample_offset column
- (optional) Apply sliding resample
- (optional) Filter to last offset
- Group by (pair, resample_offset) and compute features
- Sort output
- Project columns (keep input or features only)
Attributes:
| Name | Type | Description |
|---|---|---|
| offset_window | int | Sliding window size in bars. Default: 1. |
| compute_last_offset | bool | Keep only last offset. Default: False. |
| pair_col | str | Trading pair column. Default: "pair". |
| ts_col | str | Timestamp column. Default: "timestamp". |
| offset_col | str | Offset tracking column. Default: "resample_offset". |
| use_resample | bool | Apply sliding resample. Default: False. |
| resample_mode | Literal['add', 'replace'] | Resample mode. Default: "add". |
| resample_prefix | str \| None | Prefix for resampled columns. Default: None. |
| raw_data_type | RawDataType | Type of raw data. Default: SPOT. |
| component_type | ClassVar[SfComponentType] | Always FEATURE_EXTRACTOR. |
| keep_input_columns | bool | Keep all input columns. Default: False. |
Example
    from dataclasses import dataclass

    import polars as pl

    from signalflow.feature import FeatureExtractor


    @dataclass
    class RsiExtractor(FeatureExtractor):
        """RSI indicator extractor."""

        # Declared as dataclass fields so that base-class options such as
        # offset_window and use_resample stay available as keyword arguments.
        window: int = 14
        column: str = "close"

        def compute_group(self, group_df, data_context=None):
            # Compute RSI per group
            delta = group_df.select(pl.col(self.column).diff().alias("delta"))
            gain = delta.select(
                pl.when(pl.col("delta") > 0)
                .then(pl.col("delta"))
                .otherwise(0)
                .alias("gain")
            )
            loss = delta.select(
                pl.when(pl.col("delta") < 0)
                .then(-pl.col("delta"))
                .otherwise(0)
                .alias("loss")
            )
            avg_gain = gain.select(
                pl.col("gain").rolling_mean(self.window).alias("avg_gain")
            )
            avg_loss = loss.select(
                pl.col("loss").rolling_mean(self.window).alias("avg_loss")
            )
            # Relative strength: average gain over average loss
            rs = avg_gain.select(
                (pl.col("avg_gain") / avg_loss.get_column("avg_loss")).alias("rs")
            )
            # Append the RSI column; row count is unchanged
            rsi = group_df.with_columns([
                (100 - (100 / (1 + rs.get_column("rs")))).alias(f"rsi_{self.window}")
            ])
            return rsi


    # Usage
    extractor = RsiExtractor(window=14)
    features = extractor.extract(ohlcv_df)
Note
compute_group() must preserve row count (length-preserving). All timestamps must be timezone-naive. For multi-timeframe features, use use_resample=True.
See Also
- RollingAggregator: Sliding window resampler.
- FeatureSet: Orchestrates multiple extractors.
_resampler property ¶
Get configured RollingAggregator instance.
Returns:
| Name | Type | Description |
|---|---|---|
| RollingAggregator | RollingAggregator | Resampler with current configuration. |
__post_init__ ¶
Validate configuration after initialization.
Raises:
| Type | Description |
|---|---|
| ValueError | If offset_window <= 0, resample_mode is invalid, or offset_col is wrong. |
| TypeError | If column names are not strings. |
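A minimal sketch of this kind of `__post_init__` validation, assuming a simplified configuration class. `ExtractorConfigSketch` and `error_name` are hypothetical names introduced for illustration; the real checks live in src/signalflow/feature/base.py and may differ, and the offset_col check is omitted because this page does not say what makes it "wrong".

```python
from dataclasses import dataclass


@dataclass
class ExtractorConfigSketch:
    """Hypothetical stand-in; not the real FeatureExtractor."""

    offset_window: int = 1
    pair_col: str = "pair"
    ts_col: str = "timestamp"
    resample_mode: str = "add"

    def __post_init__(self):
        # Column names must be strings.
        for name in ("pair_col", "ts_col"):
            if not isinstance(getattr(self, name), str):
                raise TypeError(f"{name} must be a string")
        # The window must cover at least one bar.
        if self.offset_window <= 0:
            raise ValueError("offset_window must be > 0")
        if self.resample_mode not in ("add", "replace"):
            raise ValueError("resample_mode must be 'add' or 'replace'")


def error_name(**kwargs):
    """Return the name of the exception a configuration raises, or None."""
    try:
        ExtractorConfigSketch(**kwargs)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return None


print(error_name(offset_window=0))
print(error_name(resample_mode="merge"))
print(error_name(pair_col=123))
```

Validating in `__post_init__` means a misconfigured extractor fails at construction time rather than in the middle of an extraction run.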
Source code in src/signalflow/feature/base.py
_validate_input ¶
Validate input DataFrame has required columns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input to validate. | required |

Raises:
| Type | Description |
|---|---|
| ValueError | If required columns missing. |
Source code in src/signalflow/feature/base.py
compute_group ¶
Compute features for single (pair, resample_offset) group.
Core feature extraction logic - must be implemented by subclasses.
CRITICAL: Must preserve row count (len(output) == len(input)). Should preserve ordering within group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| group_df | DataFrame | Single group's data, sorted by timestamp. | required |
| data_context | dict[str, Any] \| None | Additional context. | required |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Same length as input with added feature columns. |
Example
    # Simple moving average
    def compute_group(self, group_df, data_context=None):
        return group_df.with_columns([
            pl.col("close")
            .rolling_mean(self.window)
            .alias(f"sma_{self.window}")
        ])

    # Multiple features
    def compute_group(self, group_df, data_context=None):
        return group_df.with_columns([
            pl.col("close").rolling_mean(10).alias("sma_10"),
            pl.col("close").rolling_mean(20).alias("sma_20"),
            pl.col("high").rolling_max(14).alias("high_14"),
            pl.col("low").rolling_min(14).alias("low_14")
        ])
Note
Output must have same height as input (length-preserving). Use rolling operations for windowed features. First N-1 bars may have null values for N-period indicators.
Source code in src/signalflow/feature/base.py
extract ¶
Extract features from input DataFrame.
Main entry point - handles sorting, resampling, grouping, and projection.
Processing pipeline
- Validate input (required columns)
- Sort by (pair, timestamp)
- Add resample_offset column if missing
- (optional) Apply sliding resample
- (optional) Filter to last offset
- Group by (pair, resample_offset) and compute features
- Sort output
- Project to output columns
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input OHLCV data with pair and timestamp columns. | required |
| data_context | dict[str, Any] \| None | Additional context for computation. | None |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Features DataFrame with columns: pair, timestamp (always included); feature columns (from compute_group). |

Raises:
| Type | Description |
|---|---|
| TypeError | If df not pl.DataFrame or compute_group returns wrong type. |
| ValueError | If compute_group changes row count or columns missing. |
Example
    # Basic extraction
    features = extractor.extract(ohlcv_df)

    # With resampling (5m from 1m)
    extractor = RsiExtractor(
        window=14,
        offset_window=5,
        use_resample=True
    )
    features = extractor.extract(ohlcv_df)

    # Keep input columns
    extractor.keep_input_columns = True
    features_with_ohlcv = extractor.extract(ohlcv_df)
Note
Only accepts pl.DataFrame (Polars-first design). Use PandasFeatureExtractor adapter for Pandas data.
Source code in src/signalflow/feature/base.py
signalflow.feature.feature_set.FeatureSet dataclass ¶
FeatureSet(extractors: list[FeatureExtractor], parallel: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp')
Polars-first orchestrator for multiple feature extractors.
Combines independent feature extractors via outer join on (pair, timestamp). Each extractor fetches its required data, computes features, and results are merged into single DataFrame.
Key features
- Automatic data fetching per extractor
- Timezone normalization (all → naive)
- Outer join on (pair, timestamp) for alignment
- Duplicate feature column detection
- Consistent index columns across extractors
Processing flow
For each extractor:
1. Fetch appropriate raw data as Polars
2. Run extractor.extract()
3. Normalize timestamps to timezone-naive
4. Validate index columns present

Then:
5. Outer join all results on (pair, timestamp)
Attributes:
| Name | Type | Description |
|---|---|---|
| extractors | list[FeatureExtractor] | Feature extractors to orchestrate. |
| parallel | bool | Parallel execution flag (not yet implemented). Default: False. |
| pair_col | str | Trading pair column name. Default: "pair". |
| ts_col | str | Timestamp column name. Default: "timestamp". |
Example
    from signalflow.feature import FeatureSet, SmaExtractor, RsiExtractor

    # Create feature set
    # (SmaExtractor takes sma_period, price_col and out_col, per its signature)
    feature_set = FeatureSet([
        SmaExtractor(sma_period=10, price_col="close", out_col="sma_10"),
        SmaExtractor(sma_period=20, price_col="close", out_col="sma_20"),
        RsiExtractor(window=14, column="close")
    ])

    # Extract all features at once
    from signalflow.core import RawDataView
    view = RawDataView(raw=raw_data)
    features = feature_set.extract(view)

    # Result has: pair, timestamp, sma_10, sma_20, rsi_14
    print(features.columns)
    # ['pair', 'timestamp', 'sma_10', 'sma_20', 'rsi_14']
Example
    # With multi-timeframe features
    feature_set = FeatureSet([
        # 1-minute features
        SmaExtractor(sma_period=10, offset_window=1),
        # 5-minute features
        SmaExtractor(
            sma_period=10,
            offset_window=5,
            use_resample=True,
            resample_prefix="5m_"
        )
    ])
    features = feature_set.extract(view)
    # Has both 1m and 5m features aligned
Note
All extractors must use same pair_col and ts_col. Feature column names must be unique across extractors. Timestamps automatically normalized to timezone-naive.
See Also
- FeatureExtractor: Base class for individual extractors.
- RawDataView: Provides data in required format.
__post_init__ ¶
Validate extractors configuration.
Checks
- At least one extractor provided
- All extractors use same pair_col
- All extractors use same ts_col
Raises:
| Type | Description |
|---|---|
| ValueError | If validation fails. |
Source code in src/signalflow/feature/feature_set.py
_combine_features ¶
Combine feature DataFrames via outer join.
Merges all feature DataFrames on (pair, timestamp) index. Detects and rejects duplicate feature column names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| feature_dfs | list[DataFrame] | Feature DataFrames to combine. | required |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Combined features with outer join semantics. |

Raises:
| Type | Description |
|---|---|
| ValueError | If no DataFrames or duplicate feature columns found. |
Example
Note
Outer join preserves all (pair, timestamp) from all extractors. Duplicate columns trigger error - use unique prefixes.
Source code in src/signalflow/feature/feature_set.py
_get_input_df ¶
Fetch input data for extractor in Polars format.
Determines required data type from extractor.raw_data_type and fetches as Polars DataFrame (canonical format).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw_data | RawDataView | Data view. | required |
| extractor | FeatureExtractor | Extractor needing data. | required |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Raw data in Polars format. |
Note
Always returns Polars (Polars-first design). Falls back to string "polars" for backward compatibility.
Source code in src/signalflow/feature/feature_set.py
_normalize_index ¶
Normalize timestamp to timezone-naive.
Ensures consistent timezone handling across all extractors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | DataFrame to normalize. | required |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: DataFrame with timezone-naive timestamps. |
Source code in src/signalflow/feature/feature_set.py
extract ¶
Extract and combine features from all extractors.
Main entry point - orchestrates extraction and merging.
Processing
- For each extractor:
    - Fetch appropriate data format
    - Run extraction
    - Normalize timestamps
    - Validate output
- Outer join all results on (pair, timestamp)
- Detect duplicate feature columns
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw_data | RawDataView | View to raw market data. | required |
| context | dict[str, Any] \| None | Additional context passed to extractors. | None |

Returns:
| Type | Description |
|---|---|
| DataFrame | pl.DataFrame: Combined features with columns: pair, timestamp (index); feature columns from all extractors. |

Raises:
| Type | Description |
|---|---|
| ValueError | If no extractors or duplicate feature columns. |
| TypeError | If extractor doesn't return pl.DataFrame. |
Example
    from signalflow.core import RawData, RawDataView

    # Create view
    view = RawDataView(raw=raw_data)

    # Extract features
    features = feature_set.extract(view)

    # Check result
    print(f"Features: {features.columns}")
    print(f"Shape: {features.shape}")

    # With context
    features = feature_set.extract(
        view,
        context={"lookback_bars": 100}
    )
Note
Outer join means all (pair, timestamp) combinations preserved. Missing features filled with null for non-matching timestamps.
Source code in src/signalflow/feature/feature_set.py
Smoothing Extractors¶
signalflow.feature.smoother.sma_extractor.SmaExtractor dataclass ¶
SmaExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = True, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, sma_period: int = 20, price_col: str = 'close', out_col: str = 'sma')
Bases: FeatureExtractor
SMA per (pair, resample_offset) group.
Notes:
- offset_window configures the RollingAggregator (the sliding resample window); the SMA window itself is set by sma_period.
- v1 targets spot data only, so raw_data_type defaults to RawDataType.SPOT.
__post_init__ ¶
compute_group ¶
Source code in src/signalflow/feature/smoother/sma_extractor.py
Pandas-TA Extractors¶
signalflow.feature.pandasta.top_pandasta_extractors.PandasTaRsiExtractor dataclass ¶
PandasTaRsiExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 14, *, pandas_group_fn: PandasGroupFn | None = None)
Bases: PandasTaExtractor
__post_init__ ¶
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
signalflow.feature.pandasta.top_pandasta_extractors.PandasTaBbandsExtractor dataclass ¶
PandasTaBbandsExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 20, std: float = 2.0, *, pandas_group_fn: PandasGroupFn | None = None)
Bases: PandasTaExtractor
__post_init__ ¶
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
signalflow.feature.pandasta.top_pandasta_extractors.PandasTaAtrExtractor dataclass ¶
PandasTaAtrExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 14, *, pandas_group_fn: PandasGroupFn | None = None)
Bases: PandasTaExtractor
__post_init__ ¶
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
signalflow.feature.pandasta.top_pandasta_extractors.PandasTaMacdExtractor dataclass ¶
PandasTaMacdExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, fast: int = 12, slow: int = 26, signal: int = 9, *, pandas_group_fn: PandasGroupFn | None = None)
Bases: PandasTaExtractor
__post_init__ ¶
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
Pandas-TA Base¶
signalflow.feature.pandasta.pandas_ta_extractor.PandasTaExtractor dataclass ¶
PandasTaExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, *, pandas_group_fn: PandasGroupFn | None = None)
Bases: PandasFeatureExtractor
Polars-first Pandas-TA adapter.
This extractor runs pandas-ta inside pandas_group_fn per (pair, resample_offset) group,
then merges produced feature columns back into the Polars pipeline.
Key guarantees
- pandas-ta output is normalized to pd.DataFrame
- output length matches input group length
- output columns are namespaced to avoid collisions across extractors
additional_inputs class-attribute instance-attribute ¶
__post_init__ ¶
Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
_namespace_columns ¶
Prefix output columns to avoid collisions across different indicators/extractors.
Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
_normalize_output ¶
Normalize pandas-ta output to pd.DataFrame and ensure length matches group.
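The normalization and namespacing guarantees can be pictured with plain pandas. This is a hedged sketch only: `normalize_output_sketch` and the `rsi__` prefix are hypothetical, the fallback column name "feature" is borrowed from the series_name default in the signature above, and the real `_normalize_output` and `_namespace_columns` may behave differently.

```python
import pandas as pd


def normalize_output_sketch(result, group_len, prefix):
    """Hypothetical helper: coerce to DataFrame, check length, add prefix."""
    # An indicator may return a Series; wrap it into a one-column frame.
    if isinstance(result, pd.Series):
        result = result.to_frame(name=result.name or "feature")
    if not isinstance(result, pd.DataFrame):
        raise TypeError("indicator output must be a Series or DataFrame")
    # Length-preserving contract: one output row per input row.
    if len(result) != group_len:
        raise ValueError("indicator output length must match the group length")
    # Namespace columns so different extractors cannot collide.
    return result.add_prefix(prefix)


raw_output = pd.Series([None, 55.0, 60.0], name="RSI_14")
normalized = normalize_output_sketch(raw_output, group_len=3, prefix="rsi__")
print(list(normalized.columns))
```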