Feature Module

Feature extraction for technical indicators and derived metrics.

Base Classes

signalflow.feature.base.FeatureExtractor dataclass

FeatureExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False)

Bases: ABC

Base class for Polars-first feature extraction.

Extracts technical indicators and derived features from raw OHLCV data with optional sliding window resampling for multi-timeframe features.

Key features
  • Polars-native for performance
  • Optional sliding window resampling (e.g., 5m bars from 1m bars)
  • Per-pair, per-offset-window processing
  • Length-preserving operations
  • Automatic projection (keep only new features)
Processing pipeline
  1. Sort by (pair, timestamp)
  2. Add resample_offset column
  3. (optional) Apply sliding resample
  4. (optional) Filter to last offset
  5. Group by (pair, resample_offset) and compute features
  6. Sort output
  7. Project columns (keep input or features only)

Attributes:

Name Type Description
offset_window int

Sliding window size in bars. Default: 1.

compute_last_offset bool

Keep only last offset. Default: False.

pair_col str

Trading pair column. Default: "pair".

ts_col str

Timestamp column. Default: "timestamp".

offset_col str

Offset tracking column. Default: "resample_offset".

use_resample bool

Apply sliding resample. Default: False.

resample_mode Literal['add', 'replace']

Resample mode. Default: "add".

resample_prefix str | None

Prefix for resampled columns. Default: None.

raw_data_type RawDataType

Type of raw data. Default: SPOT.

component_type ClassVar[SfComponentType]

Always FEATURE_EXTRACTOR.

keep_input_columns bool

Keep all input columns. Default: False.

Example
from signalflow.feature import FeatureExtractor
import polars as pl

class RsiExtractor(FeatureExtractor):
    '''RSI indicator extractor'''

    def __init__(self, window: int = 14, column: str = "close"):
        super().__init__()
        self.window = window
        self.column = column

    def compute_group(self, group_df, data_context=None):
        # RSI as a single Polars expression chain (length-preserving)
        delta = pl.col(self.column).diff()
        gain = pl.when(delta > 0).then(delta).otherwise(0.0)
        loss = pl.when(delta < 0).then(-delta).otherwise(0.0)
        rs = gain.rolling_mean(self.window) / loss.rolling_mean(self.window)
        return group_df.with_columns(
            (100 - 100 / (1 + rs)).alias(f"rsi_{self.window}")
        )

# Usage
extractor = RsiExtractor(window=14)
features = extractor.extract(ohlcv_df)
Note

compute_group() must preserve row count (length-preserving). All timestamps must be timezone-naive. For multi-timeframe features, use use_resample=True.

See Also

RollingAggregator: Sliding window resampler. FeatureSet: Orchestrates multiple extractors.

_resampler property

_resampler: RollingAggregator

Get configured RollingAggregator instance.

Returns:

Name Type Description
RollingAggregator RollingAggregator

Resampler with current configuration.

component_type class-attribute

component_type: SfComponentType = FEATURE_EXTRACTOR

compute_last_offset class-attribute instance-attribute

compute_last_offset: bool = False

keep_input_columns class-attribute instance-attribute

keep_input_columns: bool = False

offset_col class-attribute instance-attribute

offset_col: str = 'resample_offset'

offset_window class-attribute instance-attribute

offset_window: int = 1

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

raw_data_type class-attribute instance-attribute

raw_data_type: RawDataType = SPOT

resample_mode class-attribute instance-attribute

resample_mode: Literal['add', 'replace'] = 'add'

resample_prefix class-attribute instance-attribute

resample_prefix: str | None = None

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

use_resample class-attribute instance-attribute

use_resample: bool = False

__post_init__

__post_init__() -> None

Validate configuration after initialization.

Raises:

Type Description
ValueError

If offset_window <= 0, invalid resample_mode, or wrong offset_col.

TypeError

If column names not strings.

Source code in src/signalflow/feature/base.py
def __post_init__(self) -> None:
    """Validate configuration after initialization.

    Raises:
        ValueError: If offset_window <= 0, invalid resample_mode, or wrong offset_col.
        TypeError: If column names not strings.
    """
    if self.offset_window <= 0:
        raise ValueError(f"offset_window must be > 0, got {self.offset_window}")

    if self.resample_mode not in ("add", "replace"):
        raise ValueError(f"Invalid resample_mode: {self.resample_mode}")

    if self.offset_col != RollingAggregator.OFFSET_COL:
        raise ValueError(
            f"offset_col must be '{RollingAggregator.OFFSET_COL}', got '{self.offset_col}'"
        )

    if not isinstance(self.pair_col, str) or not isinstance(self.ts_col, str) or not isinstance(self.offset_col, str):
        raise TypeError("pair_col/ts_col/offset_col must be str")
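
For example, constructing a concrete extractor with an invalid window fails fast (a minimal sketch using the SmaExtractor documented later on this page):

try:
    SmaExtractor(offset_window=0)
except ValueError as e:
    print(e)  # offset_window must be > 0, got 0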

_validate_input

_validate_input(df: DataFrame) -> None

Validate input DataFrame has required columns.

Parameters:

Name Type Description Default
df DataFrame

Input to validate.

required

Raises:

Type Description
ValueError

If required columns missing.

Source code in src/signalflow/feature/base.py
def _validate_input(self, df: pl.DataFrame) -> None:
    """Validate input DataFrame has required columns.

    Args:
        df (pl.DataFrame): Input to validate.

    Raises:
        ValueError: If required columns missing.
    """
    missing = [c for c in (self.pair_col, self.ts_col) if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

compute_group

compute_group(group_df: DataFrame, data_context: dict[str, Any] | None) -> pl.DataFrame

Compute features for single (pair, resample_offset) group.

Core feature extraction logic - must be implemented by subclasses.

CRITICAL: Must preserve row count (len(output) == len(input)). Should preserve ordering within group.

Parameters:

Name Type Description Default
group_df DataFrame

Single group's data, sorted by timestamp.

required
data_context dict[str, Any] | None

Additional context.

required

Returns:

Type Description
DataFrame

pl.DataFrame: Same length as input with added feature columns.

Example
def compute_group(self, group_df, data_context=None):
    # Simple moving average
    return group_df.with_columns([
        pl.col("close")
        .rolling_mean(self.window)
        .alias(f"sma_{self.window}")
    ])

# Multiple features
def compute_group(self, group_df, data_context=None):
    return group_df.with_columns([
        pl.col("close").rolling_mean(10).alias("sma_10"),
        pl.col("close").rolling_mean(20).alias("sma_20"),
        pl.col("high").rolling_max(14).alias("high_14"),
        pl.col("low").rolling_min(14).alias("low_14")
    ])
Note

Output must have same height as input (length-preserving). Use rolling operations for windowed features. First N-1 bars may have null values for N-period indicators.

Source code in src/signalflow/feature/base.py
def compute_group(
    self,
    group_df: pl.DataFrame,
    data_context: dict[str, Any] | None,
) -> pl.DataFrame:
    """Compute features for single (pair, resample_offset) group.

    Core feature extraction logic - must be implemented by subclasses.

    CRITICAL: Must preserve row count (len(output) == len(input)).
    Should preserve ordering within group.

    Args:
        group_df (pl.DataFrame): Single group's data, sorted by timestamp.
        data_context (dict[str, Any] | None): Additional context.

    Returns:
        pl.DataFrame: Same length as input with added feature columns.

    Example:
        ```python
        def compute_group(self, group_df, data_context=None):
            # Simple moving average
            return group_df.with_columns([
                pl.col("close")
                .rolling_mean(self.window)
                .alias(f"sma_{self.window}")
            ])

        # Multiple features
        def compute_group(self, group_df, data_context=None):
            return group_df.with_columns([
                pl.col("close").rolling_mean(10).alias("sma_10"),
                pl.col("close").rolling_mean(20).alias("sma_20"),
                pl.col("high").rolling_max(14).alias("high_14"),
                pl.col("low").rolling_min(14).alias("low_14")
            ])
        ```

    Note:
        Output must have same height as input (length-preserving).
        Use rolling operations for windowed features.
        First N-1 bars may have null values for N-period indicators.
    """
    raise NotImplementedError

extract

extract(df: DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame

Extract features from input DataFrame.

Main entry point - handles sorting, resampling, grouping, and projection.

Processing pipeline
  1. Validate input (required columns)
  2. Sort by (pair, timestamp)
  3. Add resample_offset column if missing
  4. (optional) Apply sliding resample
  5. (optional) Filter to last offset
  6. Group by (pair, resample_offset) and compute features
  7. Sort output
  8. Project to output columns

Parameters:

Name Type Description Default
df DataFrame

Input OHLCV data with pair and timestamp columns.

required
data_context dict[str, Any] | None

Additional context for computation.

None

Returns:

Type Description
DataFrame

pl.DataFrame: Features DataFrame with columns:
  • pair, timestamp (always included)
  • feature columns (from compute_group)

Raises:

Type Description
TypeError

If df not pl.DataFrame or compute_group returns wrong type.

ValueError

If compute_group changes row count or columns missing.

Example
# Basic extraction
features = extractor.extract(ohlcv_df)

# With resampling (5m from 1m)
extractor = RsiExtractor(
    window=14,
    offset_window=5,
    use_resample=True
)
features = extractor.extract(ohlcv_df)

# Keep input columns
extractor.keep_input_columns = True
features_with_ohlcv = extractor.extract(ohlcv_df)
Note

Only accepts pl.DataFrame (Polars-first design). Use PandasFeatureExtractor adapter for Pandas data.

Source code in src/signalflow/feature/base.py
def extract(self, df: pl.DataFrame, data_context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Extract features from input DataFrame.

    Main entry point - handles sorting, resampling, grouping, and projection.

    Processing pipeline:
        1. Validate input (required columns)
        2. Sort by (pair, timestamp)
        3. Add resample_offset column if missing
        4. (optional) Apply sliding resample
        5. (optional) Filter to last offset
        6. Group by (pair, resample_offset) and compute features
        7. Sort output
        8. Project to output columns

    Args:
        df (pl.DataFrame): Input OHLCV data with pair and timestamp columns.
        data_context (dict[str, Any] | None): Additional context for computation.

    Returns:
        pl.DataFrame: Features DataFrame with columns:
            - pair, timestamp (always included)
            - feature columns (from compute_group)

    Raises:
        TypeError: If df not pl.DataFrame or compute_group returns wrong type.
        ValueError: If compute_group changes row count or columns missing.

    Example:
        ```python
        # Basic extraction
        features = extractor.extract(ohlcv_df)

        # With resampling (5m from 1m)
        extractor = RsiExtractor(
            window=14,
            offset_window=5,
            use_resample=True
        )
        features = extractor.extract(ohlcv_df)

        # Keep input columns
        extractor.keep_input_columns = True
        features_with_ohlcv = extractor.extract(ohlcv_df)
        ```

    Note:
        Only accepts pl.DataFrame (Polars-first design).
        Use PandasFeatureExtractor adapter for Pandas data.
    """
    if not isinstance(df, pl.DataFrame):
        raise TypeError(
            f"{self.__class__.__name__} is polars-first and accepts only pl.DataFrame. "
            f"Got: {type(df)}. Use an adapter for other dataframe types."
        )
    self._validate_input(df)

    df0 = df.sort([self.pair_col, self.ts_col])

    if self.offset_col not in df0.columns:
        df0 = self._resampler.add_offset_column(df0)

    if self.use_resample:
        df0 = self._resampler.resample(df0)

    if self.compute_last_offset:
        last_off = self._resampler.get_last_offset(df0)
        df0 = df0.filter(pl.col(self.offset_col) == last_off)

    prepared_cols = set(df0.columns)
    inferred_features: set[str] = set()

    def _wrapped(g: pl.DataFrame) -> pl.DataFrame:
        nonlocal inferred_features

        in_cols = set(g.columns)
        out = self.compute_group(g, data_context=data_context)

        if not isinstance(out, pl.DataFrame):
            raise TypeError(f"{self.__class__.__name__}.compute_pl_group must return pl.DataFrame")

        if out.height != g.height:
            raise ValueError(
                f"{self.__class__.__name__}: len(output_group)={out.height} != len(input_group)={g.height}"
            )

        if not inferred_features:
            inferred_features = set(out.columns) - in_cols

        return out

    out = (
        df0.group_by(self.pair_col, self.offset_col, maintain_order=True)
        .map_groups(_wrapped)
        .sort([self.pair_col, self.ts_col])
    )

    if self.keep_input_columns:
        return out

    feature_cols = sorted(set(out.columns) - prepared_cols)
    keep_cols = [self.pair_col, self.ts_col] + feature_cols

    missing = [c for c in keep_cols if c not in out.columns]
    if missing:
        raise ValueError(f"Projection error, missing columns: {missing}")

    return out.select(keep_cols)

signalflow.feature.feature_set.FeatureSet dataclass

FeatureSet(extractors: list[FeatureExtractor], parallel: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp')

Polars-first orchestrator for multiple feature extractors.

Combines independent feature extractors via outer join on (pair, timestamp). For each extractor, the appropriate raw data is fetched and features are computed; the results are then merged into a single DataFrame.

Key features
  • Automatic data fetching per extractor
  • Timezone normalization (all → naive)
  • Outer join on (pair, timestamp) for alignment
  • Duplicate feature column detection
  • Consistent index columns across extractors
Processing flow

For each extractor:
  1. Fetch appropriate raw data as Polars
  2. Run extractor.extract()
  3. Normalize timestamps to timezone-naive
  4. Validate index columns present
Then:
  5. Outer join all results on (pair, timestamp)

Attributes:

Name Type Description
extractors list[FeatureExtractor]

Feature extractors to orchestrate.

parallel bool

Parallel execution flag (not yet implemented). Default: False.

pair_col str

Trading pair column name. Default: "pair".

ts_col str

Timestamp column name. Default: "timestamp".

Example
from signalflow.feature import FeatureSet, SmaExtractor, RsiExtractor

# Create feature set
feature_set = FeatureSet([
    SmaExtractor(window=10, column="close"),
    SmaExtractor(window=20, column="close"),
    RsiExtractor(window=14, column="close")
])

# Extract all features at once
from signalflow.core import RawDataView
view = RawDataView(raw=raw_data)
features = feature_set.extract(view)

# Result has: pair, timestamp, sma_10, sma_20, rsi_14
print(features.columns)
# ['pair', 'timestamp', 'sma_10', 'sma_20', 'rsi_14']
Example
# With multi-timeframe features
feature_set = FeatureSet([
    # 1-minute features
    SmaExtractor(window=10, offset_window=1),
    # 5-minute features
    SmaExtractor(
        window=10,
        offset_window=5,
        use_resample=True,
        resample_prefix="5m_"
    )
])

features = feature_set.extract(view)
# Has both 1m and 5m features aligned
Note

All extractors must use same pair_col and ts_col. Feature column names must be unique across extractors. Timestamps automatically normalized to timezone-naive.

See Also

FeatureExtractor: Base class for individual extractors. RawDataView: Provides data in required format.

extractors instance-attribute

extractors: list[FeatureExtractor]

pair_col class-attribute instance-attribute

pair_col: str = 'pair'

parallel class-attribute instance-attribute

parallel: bool = False

ts_col class-attribute instance-attribute

ts_col: str = 'timestamp'

__post_init__

__post_init__() -> None

Validate extractors configuration.

Checks
  • At least one extractor provided
  • All extractors use same pair_col
  • All extractors use same ts_col

Raises:

Type Description
ValueError

If validation fails.

Source code in src/signalflow/feature/feature_set.py
def __post_init__(self) -> None:
    """Validate extractors configuration.

    Checks:
        - At least one extractor provided
        - All extractors use same pair_col
        - All extractors use same ts_col

    Raises:
        ValueError: If validation fails.
    """
    if not self.extractors:
        raise ValueError("At least one extractor must be provided")

    for ex in self.extractors:
        if getattr(ex, "pair_col", self.pair_col) != self.pair_col:
            raise ValueError(
                f"All extractors must use pair_col='{self.pair_col}'. "
                f"{ex.__class__.__name__} uses '{getattr(ex, 'pair_col', None)}'"
            )
        if getattr(ex, "ts_col", self.ts_col) != self.ts_col:
            raise ValueError(
                f"All extractors must use ts_col='{self.ts_col}'. "
                f"{ex.__class__.__name__} uses '{getattr(ex, 'ts_col', None)}'"
            )
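
For instance, an extractor configured with a different pair column is rejected at construction time (a minimal sketch using SmaExtractor, documented below):

try:
    FeatureSet([SmaExtractor(pair_col="symbol")])
except ValueError as e:
    print(e)  # All extractors must use pair_col='pair'. SmaExtractor uses 'symbol'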

_combine_features

_combine_features(feature_dfs: list[DataFrame]) -> pl.DataFrame

Combine feature DataFrames via outer join.

Merges all feature DataFrames on (pair, timestamp) index. Detects and rejects duplicate feature column names.

Parameters:

Name Type Description Default
feature_dfs list[DataFrame]

Feature DataFrames to combine.

required

Returns:

Type Description
DataFrame

pl.DataFrame: Combined features with outer join semantics.

Raises:

Type Description
ValueError

If no DataFrames or duplicate feature columns found.

Example
# Internal usage
df1 = pl.DataFrame({"pair": ["BTC"], "timestamp": [t1], "sma_10": [45000]})
df2 = pl.DataFrame({"pair": ["BTC"], "timestamp": [t1], "rsi_14": [50]})
combined = self._combine_features([df1, df2])
# Result: pair, timestamp, sma_10, rsi_14
Note

Outer join preserves all (pair, timestamp) from all extractors. Duplicate columns trigger error - use unique prefixes.

Source code in src/signalflow/feature/feature_set.py
def _combine_features(self, feature_dfs: list[pl.DataFrame]) -> pl.DataFrame:
    """Combine feature DataFrames via outer join.

    Merges all feature DataFrames on (pair, timestamp) index.
    Detects and rejects duplicate feature column names.

    Args:
        feature_dfs (list[pl.DataFrame]): Feature DataFrames to combine.

    Returns:
        pl.DataFrame: Combined features with outer join semantics.

    Raises:
        ValueError: If no DataFrames or duplicate feature columns found.

    Example:
        ```python
        # Internal usage
        df1 = pl.DataFrame({"pair": ["BTC"], "timestamp": [t1], "sma_10": [45000]})
        df2 = pl.DataFrame({"pair": ["BTC"], "timestamp": [t1], "rsi_14": [50]})
        combined = self._combine_features([df1, df2])
        # Result: pair, timestamp, sma_10, rsi_14
        ```

    Note:
        Outer join preserves all (pair, timestamp) from all extractors.
        Duplicate columns trigger error - use unique prefixes.
    """
    if not feature_dfs:
        raise ValueError("No feature DataFrames to combine")

    combined = feature_dfs[0]

    for right in feature_dfs[1:]:
        right_feature_cols = [c for c in right.columns if c not in (self.pair_col, self.ts_col)]
        dup = set(right_feature_cols).intersection(set(combined.columns))
        if dup:
            raise ValueError(
                f"Duplicate feature columns during FeatureSet combine: {sorted(dup)}. "
                f"Rename features or set unique prefixes."
            )

        combined = combined.join(right, on=[self.pair_col, self.ts_col], how="outer", coalesce=True)

    return combined

_get_input_df

_get_input_df(raw_data: RawDataView, extractor: FeatureExtractor) -> pl.DataFrame

Fetch input data for extractor in Polars format.

Determines required data type from extractor.raw_data_type and fetches as Polars DataFrame (canonical format).

Parameters:

Name Type Description Default
raw_data RawDataView

Data view.

required
extractor FeatureExtractor

Extractor needing data.

required

Returns:

Type Description
DataFrame

pl.DataFrame: Raw data in Polars format.

Note

Always returns Polars (Polars-first design). Falls back to string "polars" for backward compatibility.

Source code in src/signalflow/feature/feature_set.py
def _get_input_df(self, raw_data: RawDataView, extractor: FeatureExtractor) -> pl.DataFrame:
    """Fetch input data for extractor in Polars format.

    Determines required data type from extractor.raw_data_type and
    fetches as Polars DataFrame (canonical format).

    Args:
        raw_data (RawDataView): Data view.
        extractor (FeatureExtractor): Extractor needing data.

    Returns:
        pl.DataFrame: Raw data in Polars format.

    Note:
        Always returns Polars (Polars-first design).
        Falls back to string "polars" for backward compatibility.
    """
    raw_data_type = getattr(extractor, "raw_data_type", RawDataType.SPOT)

    try:
        return raw_data.get_data(raw_data_type, DataFrameType.POLARS)
    except TypeError:
        return raw_data.get_data(raw_data_type, "polars")

_normalize_index

_normalize_index(df: DataFrame) -> pl.DataFrame

Normalize timestamp to timezone-naive.

Ensures consistent timezone handling across all extractors.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to normalize.

required

Returns:

Type Description
DataFrame

pl.DataFrame: DataFrame with timezone-naive timestamps.

Source code in src/signalflow/feature/feature_set.py
def _normalize_index(self, df: pl.DataFrame) -> pl.DataFrame:
    """Normalize timestamp to timezone-naive.

    Ensures consistent timezone handling across all extractors.

    Args:
        df (pl.DataFrame): DataFrame to normalize.

    Returns:
        pl.DataFrame: DataFrame with timezone-naive timestamps.
    """
    if self.ts_col in df.columns:
        ts_dtype = df.schema.get(self.ts_col)
        if isinstance(ts_dtype, pl.Datetime) and ts_dtype.time_zone is not None:
            df = df.with_columns(pl.col(self.ts_col).dt.replace_time_zone(None))
    return df
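
The underlying Polars operation can be sketched standalone (values assumed for illustration):

import polars as pl
from datetime import datetime

df = pl.DataFrame({"timestamp": [datetime(2024, 1, 1)]}).with_columns(
    pl.col("timestamp").dt.replace_time_zone("UTC")
)
print(df.schema["timestamp"])     # Datetime(time_unit='us', time_zone='UTC')

naive = df.with_columns(pl.col("timestamp").dt.replace_time_zone(None))
print(naive.schema["timestamp"])  # Datetime(time_unit='us', time_zone=None)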

extract

extract(raw_data: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame

Extract and combine features from all extractors.

Main entry point - orchestrates extraction and merging.

Processing
  1. For each extractor:
    • Fetch appropriate data format
    • Run extraction
    • Normalize timestamps
    • Validate output
  2. Outer join all results on (pair, timestamp)
  3. Detect duplicate feature columns

Parameters:

Name Type Description Default
raw_data RawDataView

View to raw market data.

required
context dict[str, Any] | None

Additional context passed to extractors.

None

Returns:

Type Description
DataFrame

pl.DataFrame: Combined features with columns:
  • pair, timestamp (index)
  • feature columns from all extractors

Raises:

Type Description
ValueError

If no extractors or duplicate feature columns.

TypeError

If extractor doesn't return pl.DataFrame.

Example
from signalflow.core import RawData, RawDataView

# Create view
view = RawDataView(raw=raw_data)

# Extract features
features = feature_set.extract(view)

# Check result
print(f"Features: {features.columns}")
print(f"Shape: {features.shape}")

# With context
features = feature_set.extract(
    view,
    context={"lookback_bars": 100}
)
Note

Outer join means all (pair, timestamp) combinations preserved. Missing features filled with null for non-matching timestamps.

Source code in src/signalflow/feature/feature_set.py
def extract(self, raw_data: RawDataView, context: dict[str, Any] | None = None) -> pl.DataFrame:
    """Extract and combine features from all extractors.

    Main entry point - orchestrates extraction and merging.

    Processing:
        1. For each extractor:
            - Fetch appropriate data format
            - Run extraction
            - Normalize timestamps
            - Validate output
        2. Outer join all results on (pair, timestamp)
        3. Detect duplicate feature columns

    Args:
        raw_data (RawDataView): View to raw market data.
        context (dict[str, Any] | None): Additional context passed to extractors.

    Returns:
        pl.DataFrame: Combined features with columns:
            - pair, timestamp (index)
            - feature columns from all extractors

    Raises:
        ValueError: If no extractors or duplicate feature columns.
        TypeError: If extractor doesn't return pl.DataFrame.

    Example:
        ```python
        from signalflow.core import RawData, RawDataView

        # Create view
        view = RawDataView(raw=raw_data)

        # Extract features
        features = feature_set.extract(view)

        # Check result
        print(f"Features: {features.columns}")
        print(f"Shape: {features.shape}")

        # With context
        features = feature_set.extract(
            view,
            context={"lookback_bars": 100}
        )
        ```

    Note:
        Outer join means all (pair, timestamp) combinations preserved.
        Missing features filled with null for non-matching timestamps.
    """
    feature_dfs: list[pl.DataFrame] = []

    for extractor in self.extractors:
        input_df = self._get_input_df(raw_data, extractor)

        result_df = extractor.extract(input_df, data_context=context)
        if not isinstance(result_df, pl.DataFrame):
            raise TypeError(
                f"{extractor.__class__.__name__}.extract must return pl.DataFrame, got {type(result_df)}"
            )

        result_df = self._normalize_index(result_df)

        if self.pair_col not in result_df.columns or self.ts_col not in result_df.columns:
            raise ValueError(
                f"{extractor.__class__.__name__} returned no index columns "
                f"('{self.pair_col}', '{self.ts_col}'). "
                f"FeatureSet requires index columns to combine features."
            )

        feature_dfs.append(result_df)

    return self._combine_features(feature_dfs)

Smoothing Extractors

signalflow.feature.smoother.sma_extractor.SmaExtractor dataclass

SmaExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = True, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, sma_period: int = 20, price_col: str = 'close', out_col: str = 'sma')

Bases: FeatureExtractor

SMA per (pair, resample_offset) group.

Notes
  • offset_window here configures the RollingAggregator sliding window (a framework requirement); the SMA lookback itself is sma_period.
  • Only spot data is supported in v1, so raw_data_type defaults to RawDataType.SPOT.

offset_window class-attribute instance-attribute

offset_window: int = 1

out_col class-attribute instance-attribute

out_col: str = 'sma'

price_col class-attribute instance-attribute

price_col: str = 'close'

sma_period class-attribute instance-attribute

sma_period: int = 20

use_resample class-attribute instance-attribute

use_resample: bool = True

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/smoother/sma_extractor.py
def __post_init__(self) -> None:
    super().__post_init__()
    if self.sma_period <= 0:
        raise ValueError(f"sma_period must be > 0, got {self.sma_period}")
    if not self.out_col:
        self.out_col = "sma"

compute_group

compute_group(group_df: DataFrame, data_context: dict | None) -> pl.DataFrame
Source code in src/signalflow/feature/smoother/sma_extractor.py
def compute_group(self, group_df: pl.DataFrame, data_context: dict | None) -> pl.DataFrame:
    if self.price_col not in group_df.columns:
        raise ValueError(f"Missing required column: {self.price_col}")

    sma = (
        pl.col(self.price_col)
        .rolling_mean(window_size=self.sma_period, min_samples=self.sma_period)
        .alias(self.out_col)
    )
    return group_df.with_columns(sma)
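
A minimal usage sketch (assumes ohlcv_df is a Polars frame with pair, timestamp, and close columns):

extractor = SmaExtractor(sma_period=20, out_col="sma_20")
features = extractor.extract(ohlcv_df)
# -> pair, timestamp, sma_20 (null for the first 19 bars of each group)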

Pandas-TA Extractors

signalflow.feature.pandasta.top_pandasta_extractors.PandasTaRsiExtractor dataclass

PandasTaRsiExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 14, *, pandas_group_fn: PandasGroupFn | None = None)

Bases: PandasTaExtractor

length class-attribute instance-attribute

length: int = 14

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
def __post_init__(self) -> None:
    self.indicator = "rsi"
    self.params = {"length": int(self.length)}
    self.input_column = "close"
    self.additional_inputs = {}
    self.feature_prefix = "rsi"
    super().__post_init__()

signalflow.feature.pandasta.top_pandasta_extractors.PandasTaBbandsExtractor dataclass

PandasTaBbandsExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 20, std: float = 2.0, *, pandas_group_fn: PandasGroupFn | None = None)

Bases: PandasTaExtractor

length class-attribute instance-attribute

length: int = 20

std class-attribute instance-attribute

std: float = 2.0

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
def __post_init__(self) -> None:
    self.indicator = "bbands"
    self.params = {"length": int(self.length), "std": float(self.std)}
    self.input_column = "close"
    self.additional_inputs = {}
    self.feature_prefix = f"bbands_{int(self.length)}_{float(self.std)}"
    super().__post_init__()

signalflow.feature.pandasta.top_pandasta_extractors.PandasTaAtrExtractor dataclass

PandasTaAtrExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, length: int = 14, *, pandas_group_fn: PandasGroupFn | None = None)

Bases: PandasTaExtractor

length class-attribute instance-attribute

length: int = 14

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
def __post_init__(self) -> None:
    self.indicator = "atr"
    self.params = {"length": int(self.length)}
    self.input_column = "high"
    self.additional_inputs = {"low": "low", "close": "close"}
    self.feature_prefix = f"atr_{int(self.length)}"
    super().__post_init__()

signalflow.feature.pandasta.top_pandasta_extractors.PandasTaMacdExtractor dataclass

PandasTaMacdExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, fast: int = 12, slow: int = 26, signal: int = 9, *, pandas_group_fn: PandasGroupFn | None = None)

Bases: PandasTaExtractor

fast class-attribute instance-attribute

fast: int = 12

signal class-attribute instance-attribute

signal: int = 9

slow class-attribute instance-attribute

slow: int = 26

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/pandasta/top_pandasta_extractors.py
def __post_init__(self) -> None:
    self.indicator = "macd"
    self.params = {"fast": int(self.fast), "slow": int(self.slow), "signal": int(self.signal)}
    self.input_column = "close"
    self.additional_inputs = {}
    self.feature_prefix = f"macd_{int(self.fast)}_{int(self.slow)}_{int(self.signal)}"
    super().__post_init__()
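
These preconfigured extractors compose like any other FeatureExtractor, for example inside a FeatureSet (a sketch that assumes a RawDataView named view, as in the FeatureSet examples above):

feature_set = FeatureSet([
    PandasTaRsiExtractor(length=14),
    PandasTaBbandsExtractor(length=20, std=2.0),
    PandasTaAtrExtractor(length=14),
    PandasTaMacdExtractor(fast=12, slow=26, signal=9),
])
features = feature_set.extract(view)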

Pandas-TA Base

signalflow.feature.pandasta.pandas_ta_extractor.PandasTaExtractor dataclass

PandasTaExtractor(offset_window: int = 1, compute_last_offset: bool = False, pair_col: str = 'pair', ts_col: str = 'timestamp', offset_col: str = 'resample_offset', use_resample: bool = False, resample_mode: Literal['add', 'replace'] = 'add', resample_prefix: str | None = None, raw_data_type: RawDataType = RawDataType.SPOT, keep_input_columns: bool = False, out_cols: list[str] | None = None, series_name: str = 'feature', rename_outputs: dict[str, str] = dict(), indicator: str = 'rsi', params: dict[str, Any] = dict(), input_column: str = 'close', additional_inputs: dict[str, str] = dict(), feature_prefix: str | None = None, *, pandas_group_fn: PandasGroupFn | None = None)

Bases: PandasFeatureExtractor

Polars-first Pandas-TA adapter.

This extractor runs pandas-ta inside pandas_group_fn per (pair, resample_offset) group, then merges produced feature columns back into the Polars pipeline.

Key guarantees
  • pandas-ta output is normalized to pd.DataFrame
  • output length matches input group length
  • output columns are namespaced to avoid collisions across extractors
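
Any pandas-ta indicator can also be driven through this adapter directly, without a dedicated subclass. A minimal sketch (parameter names follow pandas-ta's ema signature; ohlcv_df is assumed):

ema = PandasTaExtractor(
    indicator="ema",
    params={"length": 50},
    input_column="close",
    feature_prefix="ema_50",
)
features = ema.extract(ohlcv_df)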

additional_inputs class-attribute instance-attribute

additional_inputs: dict[str, str] = field(default_factory=dict)

feature_prefix class-attribute instance-attribute

feature_prefix: str | None = None

indicator class-attribute instance-attribute

indicator: str = 'rsi'

input_column class-attribute instance-attribute

input_column: str = 'close'

params class-attribute instance-attribute

params: dict[str, Any] = field(default_factory=dict)

__post_init__

__post_init__() -> None
Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
def __post_init__(self) -> None:
    try:
        import pandas_ta as _  # availability check only
    except ImportError as e:
        raise ImportError("pandas-ta is required. Install with: pip install pandas-ta") from e

    if not isinstance(self.indicator, str) or not self.indicator.strip():
        raise ValueError("indicator name must be a non-empty string")

    if not isinstance(self.input_column, str) or not self.input_column.strip():
        raise ValueError("input_column must be a non-empty string")

    if not isinstance(self.params, dict):
        raise TypeError(f"params must be dict[str, Any], got {type(self.params)}")

    if not isinstance(self.additional_inputs, dict):
        raise TypeError(f"additional_inputs must be dict[str, str], got {type(self.additional_inputs)}")

    for k, v in self.additional_inputs.items():
        if not isinstance(k, str) or not k.strip():
            raise TypeError(f"additional_inputs keys must be non-empty str, got {k!r}")
        if not isinstance(v, str) or not v.strip():
            raise TypeError(f"additional_inputs values must be non-empty str column names, got {v!r}")

    self.pandas_group_fn = self._pandas_ta_group_fn

    super().__post_init__()

_namespace_columns

_namespace_columns(df: DataFrame) -> pd.DataFrame

Prefix output columns to avoid collisions across different indicators/extractors.

Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
def _namespace_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Prefix output columns to avoid collisions across different indicators/extractors.
    """
    prefix = self.feature_prefix or self.indicator
    prefix = str(prefix).strip()

    df = df.copy()
    new_cols: list[str] = []
    for i, c in enumerate(df.columns):
        name = str(c) if c is not None else f"{self.indicator}_{i}"
        name = name.strip() or f"{self.indicator}_{i}"

        if name == prefix or name.startswith(prefix + "_"):
            new_cols.append(name)
        else:
            new_cols.append(f"{prefix}_{name}")

    df.columns = new_cols
    return df
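
The rule in effect, assuming feature_prefix="rsi":

# 'RSI_14' -> 'rsi_RSI_14'  (no matching prefix, so one is added)
# 'rsi_14' -> 'rsi_14'      (already starts with 'rsi_', kept as-is)
# 'rsi'    -> 'rsi'         (exactly equals the prefix, kept as-is)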

_normalize_output

_normalize_output(out: Any, group_len: int) -> pd.DataFrame

Normalize pandas-ta output to pd.DataFrame and ensure length matches group.

Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
def _normalize_output(self, out: Any, group_len: int) -> pd.DataFrame:
    """
    Normalize pandas-ta output to pd.DataFrame and ensure length matches group.
    """
    if isinstance(out, pd.Series):
        out_df = out.to_frame()
        col = out_df.columns[0]
        if col is None or (isinstance(col, str) and not col.strip()):
            out_df.columns = [self.indicator]
    elif isinstance(out, pd.DataFrame):
        out_df = out
        if out_df.columns.isnull().any():
            out_df = out_df.copy()
            out_df.columns = [
                c if (c is not None and (not isinstance(c, str) or c.strip())) else f"{self.indicator}_{i}"
                for i, c in enumerate(out_df.columns)
            ]
    else:
        raise TypeError(
            f"pandas-ta '{self.indicator}' returned unsupported type: {type(out)}. "
            f"Expected pd.Series or pd.DataFrame."
        )

    if len(out_df) != group_len:
        raise ValueError(
            f"{self.__class__.__name__}: len(output_group)={len(out_df)} != len(input_group)={group_len}"
        )

    return out_df

_pandas_ta_group_fn

_pandas_ta_group_fn(group: DataFrame, ctx: dict[str, Any] | None) -> pd.DataFrame
Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
def _pandas_ta_group_fn(self, group: pd.DataFrame, ctx: dict[str, Any] | None) -> pd.DataFrame:
    import pandas_ta as ta

    self._validate_required_columns(group)

    try:
        indicator_func = getattr(ta, self.indicator)
    except AttributeError as e:
        raise AttributeError(f"Indicator '{self.indicator}' not found in pandas-ta.") from e

    kwargs = dict(self.params)

    primary_input = group[self.input_column]
    for param_name, column_name in self.additional_inputs.items():
        kwargs[param_name] = group[column_name]

    out = indicator_func(primary_input, **kwargs)

    out_df = self._normalize_output(out, group_len=len(group))
    out_df = self._namespace_columns(out_df)

    return out_df

_validate_required_columns

_validate_required_columns(df: DataFrame) -> None
Source code in src/signalflow/feature/pandasta/pandas_ta_extractor.py
def _validate_required_columns(self, df: pd.DataFrame) -> None:
    required = [self.input_column, *self.additional_inputs.values()]
    missing = sorted(set(required) - set(df.columns))
    if missing:
        raise ValueError(f"Missing required columns for pandas-ta: {missing}")