
Core Module

signalflow.core

RawData dataclass

RawData(datetime_start: datetime, datetime_end: datetime, pairs: list[str] = list(), data: dict[str, DataFrame] = dict())

Immutable container for raw market data.

Acts as a unified in-memory bundle for multiple raw datasets (e.g. spot prices, funding, trades, orderbook, signals).

Design principles
  • Canonical storage is dataset-based (dictionary by name)
  • Datasets accessed via string keys (e.g. raw_data["spot"])
  • No business logic or transformations
  • Immutability ensures reproducibility in pipelines

Attributes:

Name Type Description
datetime_start datetime

Start datetime of the data snapshot.

datetime_end datetime

End datetime of the data snapshot.

pairs list[str]

List of trading pairs in the snapshot.

data dict[str, DataFrame]

Dictionary of datasets keyed by name.

Example
from signalflow.core import RawData
import polars as pl
from datetime import datetime

# Create RawData with spot data
raw_data = RawData(
    datetime_start=datetime(2024, 1, 1),
    datetime_end=datetime(2024, 12, 31),
    pairs=["BTCUSDT", "ETHUSDT"],
    data={
        "spot": spot_dataframe,
        "signals": signals_dataframe,
    }
)

# Access datasets
spot_df = raw_data["spot"]
signals_df = raw_data.get("signals")

# Check if dataset exists
if "spot" in raw_data:
    print("Spot data available")
Note

Dataset schemas are defined by convention and are not enforced. Views (pandas/polars) should be handled by the RawDataView wrapper.
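The immutability principle is worth a closer look. If RawData is a frozen dataclass holding a plain dict (the signature suggests this, but the source is not shown here), the immutability is shallow. The sketch below uses a hypothetical stand-in class, `FrozenBundle`, which is not part of signalflow, to show the difference between rebinding a field and mutating the mapping behind it, plus one way to hand out a read-only view.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from types import MappingProxyType


@dataclass(frozen=True)
class FrozenBundle:
    """Hypothetical stand-in for a frozen container; not the signalflow class."""

    pairs: list[str] = field(default_factory=list)
    data: dict[str, object] = field(default_factory=dict)


bundle = FrozenBundle(pairs=["BTCUSDT"], data={"spot": [1, 2, 3]})

# Rebinding a field on a frozen dataclass raises FrozenInstanceError.
try:
    bundle.data = {}
    rebinding_blocked = False
except FrozenInstanceError:
    rebinding_blocked = True

# The dict itself is still mutable: frozen=True does not protect its contents.
bundle.data["funding"] = [0.01]

# A read-only view rejects item assignment with TypeError.
read_only = MappingProxyType(bundle.data)
try:
    read_only["trades"] = []
    view_is_read_only = False
except TypeError:
    view_is_read_only = True
```

For reproducible pipelines, treating the `data` dict as read-only by convention (or wrapping it as above) is what keeps the snapshot stable, under the assumption stated here.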

__contains__

__contains__(key: str) -> bool

Check if dataset exists.

Parameters:

Name Type Description Default
key str

Dataset name to check.

required

Returns:

Name Type Description
bool bool

True if dataset exists, False otherwise.

Example
if "spot" in raw_data:
    process_spot_data(raw_data["spot"])
Source code in src/signalflow/core/containers/raw_data.py
def __contains__(self, key: str) -> bool:
    """Check if dataset exists.

    Args:
        key (str): Dataset name to check.

    Returns:
        bool: True if dataset exists, False otherwise.

    Example:
        ```python
        if "spot" in raw_data:
            process_spot_data(raw_data["spot"])
        ```
    """
    return key in self.data

__getitem__

__getitem__(key: str) -> pl.DataFrame

Dictionary-style access to datasets. Delegates to get(), so a missing key returns an empty DataFrame instead of raising KeyError.

Parameters:

Name Type Description Default
key str

Dataset name.

required

Returns:

Type Description
DataFrame

pl.DataFrame: Dataset as Polars DataFrame.

Example
spot_df = raw_data["spot"]
Source code in src/signalflow/core/containers/raw_data.py
def __getitem__(self, key: str) -> pl.DataFrame:
    """Dictionary-style access to datasets.

    Args:
        key (str): Dataset name.

    Returns:
        pl.DataFrame: Dataset as Polars DataFrame.

    Example:
        ```python
        spot_df = raw_data["spot"]
        ```
    """
    return self.get(key)

get

get(key: str) -> pl.DataFrame

Get dataset by key.

Parameters:

Name Type Description Default
key str

Dataset name (e.g. "spot", "signals").

required

Returns:

Type Description
DataFrame

pl.DataFrame: The dataset as a Polars DataFrame if the key exists, an empty DataFrame otherwise.

Raises:

Type Description
TypeError

If dataset exists but is not a Polars DataFrame.

Example
spot_df = raw_data.get("spot")

# Returns empty DataFrame if key doesn't exist
missing_df = raw_data.get("nonexistent")
assert missing_df.is_empty()
Source code in src/signalflow/core/containers/raw_data.py
def get(self, key: str) -> pl.DataFrame:
    """Get dataset by key.

    Args:
        key (str): Dataset name (e.g. "spot", "signals").

    Returns:
        pl.DataFrame: Polars DataFrame if exists, empty DataFrame otherwise.

    Raises:
        TypeError: If dataset exists but is not a Polars DataFrame.

    Example:
        ```python
        spot_df = raw_data.get("spot")

        # Returns empty DataFrame if key doesn't exist
        missing_df = raw_data.get("nonexistent")
        assert missing_df.is_empty()
        ```
    """
    obj = self.data.get(key)
    if obj is None:
        return pl.DataFrame()
    if not isinstance(obj, pl.DataFrame):
        raise TypeError(
            f"Dataset '{key}' is not a polars.DataFrame: {type(obj)}"
        )
    return obj
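Because `get` returns an empty DataFrame for a missing key and `__getitem__` calls `get`, a misspelled dataset name does not raise. Code that needs to tell "missing" from "present but empty" can check membership first. The sketch below is only an illustration: `MiniRawData` (with lists in place of DataFrames) and `require_dataset` are hypothetical names, not part of signalflow.

```python
class MiniRawData:
    """Hypothetical stand-in mirroring the documented lookup behavior."""

    def __init__(self, data: dict[str, list]):
        self.data = data

    def __contains__(self, key: str) -> bool:
        return key in self.data

    def get(self, key: str) -> list:
        # Missing keys yield an empty dataset instead of raising.
        obj = self.data.get(key)
        return [] if obj is None else obj

    def __getitem__(self, key: str) -> list:
        return self.get(key)


def require_dataset(raw_data: MiniRawData, key: str) -> list:
    """Return the dataset, raising KeyError if the key is absent."""
    if key not in raw_data:
        available = ", ".join(sorted(raw_data.data))
        raise KeyError(f"Dataset '{key}' not found. Available: [{available}]")
    return raw_data[key]


raw = MiniRawData({"spot": [100.0, 101.5], "signals": []})

silent = raw["sopt"]  # the typo goes unnoticed: the result is just empty
present_but_empty = require_dataset(raw, "signals")

try:
    require_dataset(raw, "sopt")
    typo_caught = False
except KeyError:
    typo_caught = True
```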

items

items()

Return (key, dataset) pairs.

Returns:

Name Type Description
Iterator

Iterator over (key, DataFrame) tuples.

Example
for name, df in raw_data.items():
    print(f"{name}: {df.shape}")
Source code in src/signalflow/core/containers/raw_data.py
def items(self):
    """Return (key, dataset) pairs.

    Returns:
        Iterator: Iterator over (key, DataFrame) tuples.

    Example:
        ```python
        for name, df in raw_data.items():
            print(f"{name}: {df.shape}")
        ```
    """
    return self.data.items()

keys

keys() -> Iterator[str]

Return available dataset keys.

Returns:

Type Description
Iterator[str]

Iterator[str]: Iterator over dataset names.

Example
for key in raw_data.keys():
    print(f"Dataset: {key}")
Source code in src/signalflow/core/containers/raw_data.py
def keys(self) -> Iterator[str]:
    """Return available dataset keys.

    Returns:
        Iterator[str]: Iterator over dataset names.

    Example:
        ```python
        for key in raw_data.keys():
            print(f"Dataset: {key}")
        ```
    """
    return self.data.keys()

values

values()

Return dataset values.

Returns:

Name Type Description
Iterator

Iterator over DataFrames.

Example
for df in raw_data.values():
    print(df.columns)
Source code in src/signalflow/core/containers/raw_data.py
def values(self):
    """Return dataset values.

    Returns:
        Iterator: Iterator over DataFrames.

    Example:
        ```python
        for df in raw_data.values():
            print(df.columns)
        ```
    """
    return self.data.values()

Signals dataclass

Signals(value: DataFrame)

Immutable container for trading signals.

Canonical in-memory format is a Polars DataFrame with long schema.

Required columns
  • pair (str): Trading pair identifier
  • timestamp (datetime): Signal timestamp
  • signal_type (SignalType | int): Signal type (RISE, FALL, NONE)
  • signal (int | float): Signal value
Optional columns
  • probability (float): Signal probability (required for merge logic)

Attributes:

Name Type Description
value DataFrame

Polars DataFrame containing signal data.

Example
from signalflow.core import Signals, SignalType
import polars as pl
from datetime import datetime

# Create signals
signals_df = pl.DataFrame({
    "pair": ["BTCUSDT", "ETHUSDT"],
    "timestamp": [datetime.now(), datetime.now()],
    "signal_type": [SignalType.RISE.value, SignalType.FALL.value],
    "signal": [1, -1],
    "probability": [0.8, 0.7]
})

signals = Signals(signals_df)

# Apply transformation
filtered = signals.apply(filter_transform)

# Chain transformations
processed = signals.pipe(
    transform1,
    transform2,
    transform3
)

# Merge signals
combined = signals1 + signals2
Note

All transformations return a new Signals instance. No in-place mutation is allowed.

__add__

__add__(other: 'Signals') -> 'Signals'

Merge two Signals objects.

Merge rules
  1. Key: (pair, timestamp)
  2. Signal type priority:
       • SignalType.NONE has lowest priority
       • Non-NONE always overrides NONE
       • If both non-NONE, other wins
  3. SignalType.NONE normalized to probability = 0
  4. Merge is deterministic

Parameters:

Name Type Description Default
other Signals

Another Signals object to merge.

required

Returns:

Name Type Description
Signals 'Signals'

New merged Signals instance.

Raises:

Type Description
TypeError

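If other is not a Signals instance. The method itself returns NotImplemented; Python raises the TypeError when the other operand cannot handle the addition either.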

Example
# Detector 1 signals
signals1 = detector1.run(data)

# Detector 2 signals
signals2 = detector2.run(data)

# Merge with priority to signals2
merged = signals1 + signals2

# NONE signals overridden by non-NONE
# Non-NONE conflicts resolved by taking signals2
Source code in src/signalflow/core/containers/signals.py
def __add__(self, other: "Signals") -> "Signals":
    """Merge two Signals objects.

    Merge rules:
        1. Key: (pair, timestamp)
        2. Signal type priority:
           - SignalType.NONE has lowest priority
           - Non-NONE always overrides NONE
           - If both non-NONE, `other` wins
        3. SignalType.NONE normalized to probability = 0
        4. Merge is deterministic

    Args:
        other (Signals): Another Signals object to merge.

    Returns:
        Signals: New merged Signals instance.

    Raises:
        TypeError: If other is not a Signals instance.

    Example:
        ```python
        # Detector 1 signals
        signals1 = detector1.run(data)

        # Detector 2 signals
        signals2 = detector2.run(data)

        # Merge with priority to signals2
        merged = signals1 + signals2

        # NONE signals overridden by non-NONE
        # Non-NONE conflicts resolved by taking signals2
        ```
    """
    if not isinstance(other, Signals):
        return NotImplemented

    a = self.value
    b = other.value

    all_cols = list(dict.fromkeys([*a.columns, *b.columns]))

    def align(df: pl.DataFrame) -> pl.DataFrame:
        return (
            df.with_columns(
                [pl.lit(None).alias(c) for c in all_cols if c not in df.columns]
            )
            .select(all_cols)
        )

    a = align(a).with_columns(pl.lit(0).alias("_src"))
    b = align(b).with_columns(pl.lit(1).alias("_src"))

    merged = pl.concat([a, b], how="vertical")

    merged = merged.with_columns(
        pl.when(pl.col("signal_type") == SignalType.NONE.value)
        .then(pl.lit(0))
        .otherwise(pl.col("probability"))
        .alias("probability")
    )

    merged = merged.with_columns(
        pl.when(pl.col("signal_type") == SignalType.NONE.value)
        .then(pl.lit(0))
        .otherwise(pl.lit(1))
        .alias("_priority")
    )

    merged = (
        merged
        .sort(
            ["pair", "timestamp", "_priority", "_src"],
            descending=[False, False, True, True],
        )
        .unique(
            subset=["pair", "timestamp"],
            keep="first",
        )
        .drop(["_priority", "_src"])
        .sort(["pair", "timestamp"])
    )

    return Signals(merged)
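The merge rules can also be read as a ranking: each row gets a rank of (is non-NONE, comes from `other`), and the highest-ranked row per (pair, timestamp) survives. The plain-Python sketch below restates that logic with lists of dicts instead of a Polars DataFrame. The function name `merge_rows` and the string `"none"` standing in for `SignalType.NONE.value` are assumptions made for illustration.

```python
from datetime import datetime

NONE = "none"  # assumed stand-in for SignalType.NONE.value


def merge_rows(a: list[dict], b: list[dict]) -> list[dict]:
    """Merge two row lists by (pair, timestamp); `b` plays the role of `other`."""
    best: dict[tuple, tuple] = {}
    for src, rows in enumerate((a, b)):
        for row in rows:
            row = dict(row)  # copy so the inputs are not mutated
            is_none = row["signal_type"] == NONE
            if is_none:
                row["probability"] = 0.0  # NONE is normalized to probability 0
            # Rank by (non-NONE first, then source): higher rank wins.
            rank = (0 if is_none else 1, src)
            key = (row["pair"], row["timestamp"])
            if key not in best or rank > best[key][0]:
                best[key] = (rank, row)
    # Final ordering by (pair, timestamp).
    return [entry[1] for _, entry in sorted(best.items(), key=lambda kv: kv[0])]


t0 = datetime(2024, 1, 1)
a = [
    {"pair": "BTCUSDT", "timestamp": t0, "signal_type": "rise", "probability": 0.8},
    {"pair": "ETHUSDT", "timestamp": t0, "signal_type": "none", "probability": 0.3},
    {"pair": "SOLUSDT", "timestamp": t0, "signal_type": "rise", "probability": 0.6},
]
b = [
    {"pair": "BTCUSDT", "timestamp": t0, "signal_type": "none", "probability": 0.9},
    {"pair": "ETHUSDT", "timestamp": t0, "signal_type": "none", "probability": 0.5},
    {"pair": "SOLUSDT", "timestamp": t0, "signal_type": "fall", "probability": 0.7},
]

merged = merge_rows(a, b)
summary = [(r["pair"], r["signal_type"], r["probability"]) for r in merged]
```

In this data, BTCUSDT keeps the non-NONE row from `a`, ETHUSDT ends up as NONE with probability 0, and SOLUSDT takes the row from `b` because both sides are non-NONE.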

apply

apply(transform: SignalsTransform) -> 'Signals'

Apply a single transformation to signals.

Parameters:

Name Type Description Default
transform SignalsTransform

Callable transformation implementing SignalsTransform protocol.

required

Returns:

Name Type Description
Signals 'Signals'

New Signals instance with transformed data.

Example
from signalflow.core import Signals
import polars as pl

def filter_high_probability(df: pl.DataFrame) -> pl.DataFrame:
    return df.filter(pl.col("probability") > 0.7)

filtered = signals.apply(filter_high_probability)
Source code in src/signalflow/core/containers/signals.py
def apply(self, transform: SignalsTransform) -> "Signals":
    """Apply a single transformation to signals.

    Args:
        transform (SignalsTransform): Callable transformation implementing
            SignalsTransform protocol.

    Returns:
        Signals: New Signals instance with transformed data.

    Example:
        ```python
        from signalflow.core import Signals
        import polars as pl

        def filter_high_probability(df: pl.DataFrame) -> pl.DataFrame:
            return df.filter(pl.col("probability") > 0.7)

        filtered = signals.apply(filter_high_probability)
        ```
    """
    out = transform(self.value)
    return Signals(out)

pipe

pipe(*transforms: SignalsTransform) -> 'Signals'

Apply multiple transformations sequentially.

Parameters:

Name Type Description Default
*transforms SignalsTransform

Sequence of transformations to apply in order.

()

Returns:

Name Type Description
Signals 'Signals'

New Signals instance after applying all transformations.

Example
result = signals.pipe(
    filter_none_signals,
    normalize_probabilities,
    add_metadata
)
Source code in src/signalflow/core/containers/signals.py
def pipe(self, *transforms: SignalsTransform) -> "Signals":
    """Apply multiple transformations sequentially.

    Args:
        *transforms (SignalsTransform): Sequence of transformations to apply in order.

    Returns:
        Signals: New Signals instance after applying all transformations.

    Example:
        ```python
        result = signals.pipe(
            filter_none_signals,
            normalize_probabilities,
            add_metadata
        )
        ```
    """
    s = self
    for t in transforms:
        s = s.apply(t)
    return s
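Since `pipe` is a left-to-right fold over `apply`, the order of transforms matters and the original object is never touched. The sketch below shows this with a hypothetical stand-in, `MiniSignals`, that holds a tuple of dict rows instead of a Polars DataFrame. The transform factories `min_probability` and `top_n` are illustrative names, not part of signalflow.

```python
from dataclasses import dataclass
from typing import Callable

Rows = tuple[dict, ...]
Transform = Callable[[Rows], Rows]  # stand-in for the SignalsTransform protocol


@dataclass(frozen=True)
class MiniSignals:
    """Hypothetical stand-in for Signals, holding rows instead of a DataFrame."""

    value: Rows

    def apply(self, transform: Transform) -> "MiniSignals":
        return MiniSignals(transform(self.value))

    def pipe(self, *transforms: Transform) -> "MiniSignals":
        current = self
        for transform in transforms:
            current = current.apply(transform)
        return current


def min_probability(threshold: float) -> Transform:
    """Build a transform that keeps rows with probability above `threshold`."""
    def transform(rows: Rows) -> Rows:
        return tuple(r for r in rows if r["probability"] > threshold)
    return transform


def top_n(n: int) -> Transform:
    """Build a transform that keeps the first `n` rows."""
    def transform(rows: Rows) -> Rows:
        return rows[:n]
    return transform


signals = MiniSignals((
    {"pair": "BTCUSDT", "probability": 0.55},
    {"pair": "ETHUSDT", "probability": 0.90},
    {"pair": "SOLUSDT", "probability": 0.75},
))

filter_then_cut = signals.pipe(min_probability(0.7), top_n(1))
cut_then_filter = signals.pipe(top_n(1), min_probability(0.7))
```

Parameterized transforms are written here as closures so that each one still matches the single-argument call that `apply` makes. Calling `pipe()` with no transforms returns the same instance, as in the loop shown in the source.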

SignalType

Bases: str, Enum

Enumeration of signal types.

Represents the direction of a trading signal detected by signal detectors.

Values

  • NONE: No signal detected or neutral state.
  • RISE: Bullish signal indicating potential price increase.
  • FALL: Bearish signal indicating potential price decrease.

Example
from signalflow.core.enums import SignalType

# Check signal type
if signal_type == SignalType.RISE:
    print("Bullish signal detected")
elif signal_type == SignalType.FALL:
    print("Bearish signal detected")
else:
    print("No signal")

# Use in DataFrame
from datetime import datetime
import polars as pl
signals_df = pl.DataFrame({
    "pair": ["BTCUSDT"],
    "timestamp": [datetime.now()],
    "signal_type": [SignalType.RISE.value]
})

# Compare with enum
is_rise = signals_df.filter(
    pl.col("signal_type") == SignalType.RISE.value
)
Note

Stored as string values in DataFrames for serialization. Use .value to get the string representation.
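The round trip between enum members and stored strings can be sketched as follows. `SignalTypeSketch` is a hypothetical mirror of SignalType, and its member values (`"none"`, `"rise"`, `"fall"`) are assumed, since the page does not list them.

```python
import json
from enum import Enum


class SignalTypeSketch(str, Enum):
    """Hypothetical mirror of SignalType; the member values are assumed."""

    NONE = "none"
    RISE = "rise"
    FALL = "fall"


# .value is a plain str, so it serializes without a custom encoder.
payload = json.dumps({"signal_type": SignalTypeSketch.RISE.value})

# Loading goes the other way: look the member up by its stored value.
stored = json.loads(payload)["signal_type"]
restored = SignalTypeSketch(stored)

# The str mixin makes a member compare equal to its raw value.
equals_raw = SignalTypeSketch.RISE == "rise"

# Unknown values raise ValueError instead of passing through silently.
try:
    SignalTypeSketch("sideways")
    rejected = False
except ValueError:
    rejected = True
```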

SignalFlowRegistry dataclass

SignalFlowRegistry(_items: Dict[SfComponentType, Dict[str, Type[Any]]] = dict())

Component registry for dynamic component discovery and instantiation.

Provides centralized registration and lookup for SignalFlow components. Components are organized by type (DETECTOR, EXTRACTOR, etc.) and accessed by case-insensitive names.

Registry structure

component_type -> name -> class

Supported component types
  • DETECTOR: Signal detection classes
  • EXTRACTOR: Feature extraction classes
  • LABELER: Signal labeling classes
  • ENTRY_RULE: Position entry rules
  • EXIT_RULE: Position exit rules
  • METRIC: Strategy metrics
  • EXECUTOR: Order execution engines

Attributes:

Name Type Description
_items dict[SfComponentType, dict[str, Type[Any]]]

Internal storage mapping component types to name-class pairs.

Example
from signalflow.core.registry import SignalFlowRegistry
from signalflow.core.enums import SfComponentType

# Create registry
registry = SignalFlowRegistry()

# Register component
registry.register(
    SfComponentType.DETECTOR,
    name="sma_cross",
    cls=SmaCrossDetector
)

# Get component class
detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

# Instantiate component
detector = registry.create(
    SfComponentType.DETECTOR,
    "sma_cross",
    fast_window=10,
    slow_window=20
)

# List available components
detectors = registry.list(SfComponentType.DETECTOR)
print(f"Available detectors: {detectors}")

# Full snapshot
snapshot = registry.snapshot()
print(snapshot)
Note

Component names are stored and looked up in lowercase. Use default_registry singleton for application-wide registration.

See Also

sf_component: Decorator for automatic component registration.

create

create(component_type: SfComponentType, name: str, **kwargs: Any) -> Any

Instantiate a component by registry key.

Convenience method that combines get() and instantiation.

Parameters:

Name Type Description Default
component_type SfComponentType

Type of component to create.

required
name str

Component name (case-insensitive).

required
**kwargs Any

Arguments to pass to component constructor.

{}

Returns:

Name Type Description
Any Any

Instantiated component.

Raises:

Type Description
KeyError

If component not found.

TypeError

If kwargs don't match component constructor.

Example
# Create detector with params
detector = registry.create(
    SfComponentType.DETECTOR,
    "sma_cross",
    fast_window=10,
    slow_window=20
)

# Create extractor
extractor = registry.create(
    SfComponentType.EXTRACTOR,
    "rsi",
    window=14
)

# Create with config dict
config = {"window": 20, "threshold": 0.7}
labeler = registry.create(
    SfComponentType.LABELER,
    "fixed",
    **config
)
Source code in src/signalflow/core/registry.py
def create(self, component_type: SfComponentType, name: str, **kwargs: Any) -> Any:
    """Instantiate a component by registry key.

    Convenient method that combines get() and instantiation.

    Args:
        component_type (SfComponentType): Type of component to create.
        name (str): Component name (case-insensitive).
        **kwargs: Arguments to pass to component constructor.

    Returns:
        Any: Instantiated component.

    Raises:
        KeyError: If component not found.
        TypeError: If kwargs don't match component constructor.

    Example:
        ```python
        # Create detector with params
        detector = registry.create(
            SfComponentType.DETECTOR,
            "sma_cross",
            fast_window=10,
            slow_window=20
        )

        # Create extractor
        extractor = registry.create(
            SfComponentType.EXTRACTOR,
            "rsi",
            window=14
        )

        # Create with config dict
        config = {"window": 20, "threshold": 0.7}
        labeler = registry.create(
            SfComponentType.LABELER,
            "fixed",
            **config
        )
        ```
    """
    cls = self.get(component_type, name)
    return cls(**kwargs)
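Since `create` forwards `**kwargs` straight to the constructor, a misspelled config key only surfaces as a TypeError at instantiation. One option is to compare the config against the constructor signature first using the standard `inspect` module. This is a sketch under stated assumptions: `SmaCrossSketch` and `check_kwargs` are hypothetical names, and the parameters are illustrative.

```python
import inspect
from typing import Any


class SmaCrossSketch:
    """Hypothetical component; the parameters are illustrative only."""

    def __init__(self, fast_window: int = 10, slow_window: int = 20):
        self.fast_window = fast_window
        self.slow_window = slow_window


def check_kwargs(cls: type, kwargs: dict[str, Any]) -> list[str]:
    """Return the kwarg names that match no constructor parameter."""
    params = inspect.signature(cls).parameters
    # A **kwargs parameter accepts any name, so nothing can be flagged.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    return sorted(name for name in kwargs if name not in params)


good = {"fast_window": 5, "slow_window": 50}
bad = {"fast_window": 5, "slow": 50}  # misspelled parameter name

unknown_good = check_kwargs(SmaCrossSketch, good)
unknown_bad = check_kwargs(SmaCrossSketch, bad)

# Without the check, the typo surfaces as a TypeError from the constructor.
try:
    SmaCrossSketch(**bad)
    raised = False
except TypeError:
    raised = True
```

A check like this can run once when a config file is loaded, so the error names the offending key before any component is built.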

get

get(component_type: SfComponentType, name: str) -> Type[Any]

Get a registered class by key.

Lookup is case-insensitive. If the key is not found, raises a KeyError whose message lists the available components.

Parameters:

Name Type Description Default
component_type SfComponentType

Type of component to lookup.

required
name str

Component name (case-insensitive).

required

Returns:

Type Description
Type[Any]

Type[Any]: Registered class.

Raises:

Type Description
KeyError

If component not found. Error message includes available components.

Example
# Get component class
detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

# Case-insensitive
detector_cls = registry.get(SfComponentType.DETECTOR, "SMA_Cross")

# Instantiate manually
detector = detector_cls(fast_window=10, slow_window=20)

# Handle missing component
try:
    cls = registry.get(SfComponentType.DETECTOR, "unknown")
except KeyError as e:
    # The message already starts with "Component not found", so print it as-is
    print(e.args[0])
    # Shows: "Component not found: DETECTOR:unknown. Available: [sma_cross, ...]"
Source code in src/signalflow/core/registry.py
def get(self, component_type: SfComponentType, name: str) -> Type[Any]:
    """Get a registered class by key.

    Lookup is case-insensitive. Raises helpful error with available
    components if key not found.

    Args:
        component_type (SfComponentType): Type of component to lookup.
        name (str): Component name (case-insensitive).

    Returns:
        Type[Any]: Registered class.

    Raises:
        KeyError: If component not found. Error message includes available components.

    Example:
        ```python
        # Get component class
        detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

        # Case-insensitive
        detector_cls = registry.get(SfComponentType.DETECTOR, "SMA_Cross")

        # Instantiate manually
        detector = detector_cls(fast_window=10, slow_window=20)

        # Handle missing component
        try:
            cls = registry.get(SfComponentType.DETECTOR, "unknown")
        except KeyError as e:
            print(f"Component not found: {e}")
            # Shows: "Component not found: DETECTOR:unknown. Available: [sma_cross, ...]"
        ```
    """
    self._ensure(component_type)
    key = name.lower()
    try:
        return self._items[component_type][key]
    except KeyError as e:
        available = ", ".join(sorted(self._items[component_type]))
        raise KeyError(
            f"Component not found: {component_type.value}:{key}. Available: [{available}]"
        ) from e

list

list(component_type: SfComponentType) -> list[str]

List registered components for a type.

Returns a sorted list of component names for the given type.

Parameters:

Name Type Description Default
component_type SfComponentType

Type of components to list.

required

Returns:

Type Description
list[str]

list[str]: Sorted list of registered component names.

Example
# List all detectors
detectors = registry.list(SfComponentType.DETECTOR)
print(f"Available detectors: {detectors}")
# Output: Available detectors: ['ema_cross', 'macd', 'rsi_threshold', 'sma_cross']

# Check if component exists
if "sma_cross" in registry.list(SfComponentType.DETECTOR):
    detector = registry.create(SfComponentType.DETECTOR, "sma_cross")

# List all component types
from signalflow.core.enums import SfComponentType
for component_type in SfComponentType:
    components = registry.list(component_type)
    print(f"{component_type.value}: {components}")
Source code in src/signalflow/core/registry.py
def list(self, component_type: SfComponentType) -> list[str]:
    """List registered components for a type.

    Returns sorted list of component names for given type.

    Args:
        component_type (SfComponentType): Type of components to list.

    Returns:
        list[str]: Sorted list of registered component names.

    Example:
        ```python
        # List all detectors
        detectors = registry.list(SfComponentType.DETECTOR)
        print(f"Available detectors: {detectors}")
        # Output: ['ema_cross', 'macd', 'rsi_threshold', 'sma_cross']

        # Check if component exists
        if "sma_cross" in registry.list(SfComponentType.DETECTOR):
            detector = registry.create(SfComponentType.DETECTOR, "sma_cross")

        # List all component types
        from signalflow.core.enums import SfComponentType
        for component_type in SfComponentType:
            components = registry.list(component_type)
            print(f"{component_type.value}: {components}")
        ```
    """
    self._ensure(component_type)
    return sorted(self._items[component_type])

register

register(component_type: SfComponentType, name: str, cls: Type[Any], *, override: bool = False) -> None

Register a class under (component_type, name).

Stores class in registry for later lookup and instantiation. Names are normalized to lowercase for case-insensitive lookup.

Parameters:

Name Type Description Default
component_type SfComponentType

Type of component (DETECTOR, EXTRACTOR, etc.).

required
name str

Registry name (case-insensitive, will be lowercased).

required
cls Type[Any]

Class to register.

required
override bool

Allow overriding existing registration. Default: False.

False

Raises:

Type Description
ValueError

If name is empty or already registered (when override=False).

Example
# Register new component
registry.register(
    SfComponentType.DETECTOR,
    name="my_detector",
    cls=MyDetector
)

# Override existing component
registry.register(
    SfComponentType.DETECTOR,
    name="my_detector",
    cls=ImprovedDetector,
    override=True  # Logs warning
)

# Register multiple types
registry.register(SfComponentType.EXTRACTOR, "rsi", RsiExtractor)
registry.register(SfComponentType.LABELER, "fixed", FixedHorizonLabeler)
Source code in src/signalflow/core/registry.py
def register(self, component_type: SfComponentType, name: str, cls: Type[Any], *, override: bool = False) -> None:
    """Register a class under (component_type, name).

    Stores class in registry for later lookup and instantiation.
    Names are normalized to lowercase for case-insensitive lookup.

    Args:
        component_type (SfComponentType): Type of component (DETECTOR, EXTRACTOR, etc.).
        name (str): Registry name (case-insensitive, will be lowercased).
        cls (Type[Any]): Class to register.
        override (bool): Allow overriding existing registration. Default: False.

    Raises:
        ValueError: If name is empty or already registered (when override=False).

    Example:
        ```python
        # Register new component
        registry.register(
            SfComponentType.DETECTOR,
            name="my_detector",
            cls=MyDetector
        )

        # Override existing component
        registry.register(
            SfComponentType.DETECTOR,
            name="my_detector",
            cls=ImprovedDetector,
            override=True  # Logs warning
        )

        # Register multiple types
        registry.register(SfComponentType.EXTRACTOR, "rsi", RsiExtractor)
        registry.register(SfComponentType.LABELER, "fixed", FixedHorizonLabeler)
        ```
    """
    if not isinstance(name, str) or not name.strip():
        raise ValueError("name must be a non-empty string")

    key = name.strip().lower()
    self._ensure(component_type)

    if key in self._items[component_type] and not override:
        raise ValueError(f"{component_type.value}:{key} already registered")

    if key in self._items[component_type] and override:
        logger.warning(f"Overriding {component_type.value}:{key} with {cls.__name__}")

    self._items[component_type][key] = cls
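One detail in the listings is easy to miss: `register` normalizes with `name.strip().lower()`, while `get` only calls `name.lower()`. A name with surrounding whitespace therefore registers fine but cannot be looked up with the same padded string. The sketch below reduces the logic to a single component type to show this, along with the duplicate and override paths. `MiniRegistry`, `DetectorA`, and `DetectorB` are hypothetical names.

```python
class MiniRegistry:
    """Hypothetical reduction of the registry logic to one component type."""

    def __init__(self) -> None:
        self._items: dict[str, type] = {}

    def register(self, name: str, cls: type, *, override: bool = False) -> None:
        if not isinstance(name, str) or not name.strip():
            raise ValueError("name must be a non-empty string")
        key = name.strip().lower()  # register() strips and lowercases
        if key in self._items and not override:
            raise ValueError(f"{key} already registered")
        self._items[key] = cls

    def get(self, name: str) -> type:
        return self._items[name.lower()]  # get() only lowercases


class DetectorA:
    pass


class DetectorB:
    pass


registry = MiniRegistry()
registry.register("  SMA_Cross ", DetectorA)

found = registry.get("sma_CROSS")  # case differences are fine

try:
    registry.get(" sma_cross ")  # padding is not stripped on lookup
    padded_lookup_ok = True
except KeyError:
    padded_lookup_ok = False

try:
    registry.register("sma_cross", DetectorB)  # duplicate without override
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

registry.register("sma_cross", DetectorB, override=True)
replaced = registry.get("sma_cross")
```

Stripping names that come from config files before calling `get` or `create` avoids the asymmetry.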

snapshot

snapshot() -> dict[str, list[str]]

Snapshot of registry for debugging.

Returns complete registry state organized by component type.

Returns:

Type Description
dict[str, list[str]]

dict[str, list[str]]: Dictionary mapping component type names to sorted lists of registered component names.

Example
# Get full registry snapshot
snapshot = registry.snapshot()
print(snapshot)
# Output:
# {
#     'DETECTOR': ['ema_cross', 'sma_cross'],
#     'EXTRACTOR': ['rsi', 'sma'],
#     'LABELER': ['fixed', 'triple_barrier'],
#     'ENTRY_RULE': ['fixed_size'],
#     'EXIT_RULE': ['take_profit', 'time_based']
# }

# Use for debugging
import json
print(json.dumps(registry.snapshot(), indent=2))

# Check registration status
snapshot = registry.snapshot()
if 'DETECTOR' in snapshot and 'sma_cross' in snapshot['DETECTOR']:
    print("SMA detector is registered")
Source code in src/signalflow/core/registry.py
def snapshot(self) -> dict[str, list[str]]:
    """Snapshot of registry for debugging.

    Returns complete registry state organized by component type.

    Returns:
        dict[str, list[str]]: Dictionary mapping component type names 
            to sorted lists of registered component names.

    Example:
        ```python
        # Get full registry snapshot
        snapshot = registry.snapshot()
        print(snapshot)
        # Output:
        # {
        #     'DETECTOR': ['ema_cross', 'sma_cross'],
        #     'EXTRACTOR': ['rsi', 'sma'],
        #     'LABELER': ['fixed', 'triple_barrier'],
        #     'ENTRY_RULE': ['fixed_size'],
        #     'EXIT_RULE': ['take_profit', 'time_based']
        # }

        # Use for debugging
        import json
        print(json.dumps(registry.snapshot(), indent=2))

        # Check registration status
        snapshot = registry.snapshot()
        if 'DETECTOR' in snapshot and 'sma_cross' in snapshot['DETECTOR']:
            print("SMA detector is registered")
        ```
    """
    return {t.value: sorted(v.keys()) for t, v in self._items.items()}
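Because a snapshot is a plain dict of sorted name lists, two snapshots can be compared to see what a given import or plugin registered. The helper below, `snapshot_diff`, is a hypothetical name, and the literal dicts stand in for two `registry.snapshot()` calls.

```python
def snapshot_diff(
    before: dict[str, list[str]], after: dict[str, list[str]]
) -> dict[str, list[str]]:
    """Return, per component type, the names in `after` that are not in `before`."""
    added = {}
    for component_type, names in after.items():
        new_names = sorted(set(names) - set(before.get(component_type, [])))
        if new_names:
            added[component_type] = new_names
    return added


# Literal dicts stand in for snapshots taken before and after an import.
before = {"DETECTOR": ["sma_cross"], "EXTRACTOR": ["rsi"]}
after = {
    "DETECTOR": ["ema_cross", "sma_cross"],
    "EXTRACTOR": ["rsi"],
    "LABELER": ["fixed"],
}

added = snapshot_diff(before, after)
```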

sf_component

sf_component(*, name: str, override: bool = False)

Register class as SignalFlow component.

Decorator that registers a class in the global component registry, making it discoverable by name for dynamic instantiation.

The decorated class must have a component_type class attribute of type SfComponentType to indicate what kind of component it is (e.g., DETECTOR, EXTRACTOR, LABELER, ENTRY_RULE, EXIT_RULE).

Parameters:

Name Type Description Default
name str

Registry name for the component (case-insensitive).

required
override bool

Allow overriding existing registration. Default: False.

False

Returns:

Name Type Description
Callable

Decorator function that registers and returns the class unchanged.

Raises:

Type Description
ValueError

If class doesn't define component_type attribute.

ValueError

If name already registered and override=False.

Example
from signalflow.core import sf_component
from signalflow.core.enums import SfComponentType
from signalflow.detector import SignalDetector

@sf_component(name="my_detector")
class MyDetector(SignalDetector):
    component_type = SfComponentType.DETECTOR

    def detect(self, df):
        # Detection logic
        return signals

# Later, instantiate by name
from signalflow.core.registry import default_registry

detector_cls = default_registry.get(
    SfComponentType.DETECTOR,
    "my_detector"
)
detector = detector_cls(params={"window": 20})

# Override existing registration
@sf_component(name="my_detector", override=True)
class ImprovedDetector(SignalDetector):
    component_type = SfComponentType.DETECTOR
    # ... improved implementation
Example
# Register multiple component types

@sf_component(name="sma_cross")
class SmaCrossDetector(SignalDetector):
    component_type = SfComponentType.DETECTOR
    # ...

@sf_component(name="rsi")
class RsiExtractor(FeatureExtractor):
    component_type = SfComponentType.EXTRACTOR
    # ...

@sf_component(name="fixed_size")
class FixedSizeEntry(SignalEntryRule):
    component_type = SfComponentType.ENTRY_RULE
    # ...

@sf_component(name="take_profit")
class TakeProfitExit(ExitRule):
    component_type = SfComponentType.EXIT_RULE
    # ...
Note

Component names are case-insensitive for lookup. The class itself is not modified - only registered. Use override=True carefully to avoid accidental overrides.

Source code in src/signalflow/core/decorators.py
def sf_component(*, name: str, override: bool = False):
    """Register class as SignalFlow component.

    Decorator that registers a class in the global component registry,
    making it discoverable by name for dynamic instantiation.

    The decorated class must have a `component_type` class attribute
    of type `SfComponentType` to indicate what kind of component it is
    (e.g., DETECTOR, EXTRACTOR, LABELER, ENTRY_RULE, EXIT_RULE).

    Args:
        name (str): Registry name for the component (case-insensitive).
        override (bool): Allow overriding existing registration. Default: False.

    Returns:
        Callable: Decorator function that registers and returns the class unchanged.

    Raises:
        ValueError: If class doesn't define component_type attribute.
        ValueError: If name already registered and override=False.

    Example:
        ```python
        from signalflow.core import sf_component
        from signalflow.core.enums import SfComponentType
        from signalflow.detector import SignalDetector

        @sf_component(name="my_detector")
        class MyDetector(SignalDetector):
            component_type = SfComponentType.DETECTOR

            def detect(self, df):
                # Detection logic
                return signals

        # Later, instantiate by name
        from signalflow.core.registry import default_registry

        detector_cls = default_registry.get(
            SfComponentType.DETECTOR,
            "my_detector"
        )
        detector = detector_cls(params={"window": 20})

        # Override existing registration
        @sf_component(name="my_detector", override=True)
        class ImprovedDetector(SignalDetector):
            component_type = SfComponentType.DETECTOR
            # ... improved implementation
        ```

    Example:
        ```python
        # Register multiple component types

        @sf_component(name="sma_cross")
        class SmaCrossDetector(SignalDetector):
            component_type = SfComponentType.DETECTOR
            # ...

        @sf_component(name="rsi")
        class RsiExtractor(FeatureExtractor):
            component_type = SfComponentType.FEATURE_EXTRACTOR
            # ...

        @sf_component(name="fixed_size")
        class FixedSizeEntry(SignalEntryRule):
            component_type = SfComponentType.STRATEGY_ENTRY_RULE
            # ...

        @sf_component(name="take_profit")
        class TakeProfitExit(ExitRule):
            component_type = SfComponentType.STRATEGY_EXIT_RULE
            # ...
        ```

    Note:
        Component names are case-insensitive for lookup.
        The class itself is not modified - only registered.
        Use override=True carefully to avoid accidental overrides.
    """
    def decorator(cls: Type[Any]) -> Type[Any]:
        component_type = getattr(cls, "component_type", None)
        if not isinstance(component_type, SfComponentType):
            raise ValueError(
                f"{cls.__name__} must define class attribute "
                f"'component_type: SfComponentType'"
            )

        default_registry.register(
            component_type,
            name=name,
            cls=cls,
            override=override,
        )
        return cls

    return decorator

signalflow.core.enums

SfComponentType

Bases: str, Enum

Enumeration of SignalFlow component types.

Defines all component types that can be registered in the component registry. Used by the sf_component decorator and SignalFlowRegistry for type-safe registration.

Component categories
  • Data: Raw data loading and storage
  • Feature: Feature extraction
  • Signals: Signal detection, transformation, labeling, validation
  • Strategy: Execution, rules, metrics
Values

  • RAW_DATA_STORE: Raw data storage backends (e.g., DuckDB, Parquet).
  • RAW_DATA_SOURCE: Raw data sources (e.g., Binance API).
  • RAW_DATA_LOADER: Raw data loaders combining source + store.
  • FEATURE_EXTRACTOR: Feature extraction classes (e.g., RSI, SMA).
  • SIGNALS_TRANSFORM: Signal transformation functions.
  • LABELER: Signal labeling strategies (e.g., triple barrier).
  • DETECTOR: Signal detection algorithms (e.g., SMA cross).
  • VALIDATOR: Signal validation models.
  • TORCH_MODULE: PyTorch neural network modules.
  • VALIDATOR_MODEL: Pre-trained validator models.
  • STRATEGY_STORE: Strategy state persistence backends.
  • STRATEGY_RUNNER: Backtest/live runner implementations.
  • STRATEGY_BROKER: Order management and position tracking.
  • STRATEGY_EXECUTOR: Order execution engines (backtest/live).
  • STRATEGY_EXIT_RULE: Position exit rules (e.g., take profit, stop loss).
  • STRATEGY_ENTRY_RULE: Position entry rules (e.g., fixed size).
  • STRATEGY_METRIC: Strategy performance metrics.

Example
from signalflow.core import sf_component
from signalflow.core.enums import SfComponentType
from signalflow.detector import SignalDetector
from signalflow.feature import FeatureExtractor

# Register detector
@sf_component(name="my_detector")
class MyDetector(SignalDetector):
    component_type = SfComponentType.DETECTOR
    # ... implementation

# Register extractor
@sf_component(name="my_feature")
class MyExtractor(FeatureExtractor):
    component_type = SfComponentType.FEATURE_EXTRACTOR
    # ... implementation

# Register exit rule
@sf_component(name="my_exit")
class MyExit(ExitRule):
    component_type = SfComponentType.STRATEGY_EXIT_RULE
    # ... implementation

# Use in registry
from signalflow.core.registry import default_registry

detector = default_registry.create(
    SfComponentType.DETECTOR,
    "my_detector"
)
Note

All registered components must have a component_type class attribute. Component types are organized hierarchically (category/subcategory).
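
Since the enum derives from both `str` and `Enum`, its members behave like ordinary strings. The sketch below illustrates what that buys in practice; `ComponentType` is a hypothetical stand-in, and the `category/subcategory` values are assumed from the note above rather than taken from the library.

```python
import json
from enum import Enum


class ComponentType(str, Enum):
    # Hypothetical members; the "category/subcategory" values are assumed.
    DETECTOR = "signals/detector"
    FEATURE_EXTRACTOR = "feature/extractor"


# Members are real strings, so they compare equal to their values.
is_plain_string = ComponentType.DETECTOR == "signals/detector"

# A value read from a config file maps back to the member.
from_config = ComponentType("feature/extractor")

# The assumed value layout can be split into category and subcategory.
category, subcategory = ComponentType.DETECTOR.value.split("/")

# Being a str subclass, a member serializes as its value.
encoded = json.dumps({"type": ComponentType.DETECTOR})
```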

DataFrameType

Bases: str, Enum

Supported DataFrame backends.

Specifies which DataFrame library to use for data processing. Used by FeatureExtractor and other components to determine input/output format.

Values

  • POLARS: Polars DataFrame (faster, modern).
  • PANDAS: Pandas DataFrame (legacy compatibility).

Example
import pandas as pd
import polars as pl

from signalflow.core.enums import DataFrameType
from signalflow.feature import FeatureExtractor

# Polars-based extractor
class MyExtractor(FeatureExtractor):
    df_type = DataFrameType.POLARS

    def extract(self, df: pl.DataFrame) -> pl.DataFrame:
        return df.with_columns(
            pl.col("close").rolling_mean(20).alias("sma_20")
        )

# Pandas-based extractor
class LegacyExtractor(FeatureExtractor):
    df_type = DataFrameType.PANDAS

    def extract(self, df: pd.DataFrame) -> pd.DataFrame:
        df["sma_20"] = df["close"].rolling(20).mean()
        return df

# Use in RawDataView
from signalflow.core import RawDataView

view = RawDataView(raw=raw_data)

# Get data in required format
df_polars = view.get_data("spot", DataFrameType.POLARS)
df_pandas = view.get_data("spot", DataFrameType.PANDAS)
Note

New code should prefer POLARS for better performance. PANDAS is supported for backward compatibility and legacy libraries.

RawDataType

Bases: str, Enum

Supported raw data types.

Defines types of market data that can be loaded and processed.

Values

SPOT: Spot trading data (OHLCV).

Example
from datetime import datetime

from signalflow.core.enums import RawDataType

# Load spot data
loader = BinanceLoader(
    pairs=["BTCUSDT", "ETHUSDT"],
    data_type=RawDataType.SPOT
)

raw_data = loader.load(
    datetime_start=datetime(2024, 1, 1),
    datetime_end=datetime(2024, 12, 31)
)

# Access spot data
spot_df = raw_data[RawDataType.SPOT.value]

# Check data type
raw_data_type = RawDataType.SPOT
if raw_data_type == RawDataType.SPOT:
    print("Processing spot data")
Note

Future versions will add:
  • FUTURES: Futures trading data
  • PERPETUAL: Perpetual swaps data
  • LOB: Limit order book data

signalflow.core.registry

default_registry module-attribute

default_registry = SignalFlowRegistry()

Global default registry instance.

Use this singleton for application-wide component registration.

Example
from signalflow.core.registry import default_registry
from signalflow.core.enums import SfComponentType

# Register to default registry
default_registry.register(
    SfComponentType.DETECTOR,
    "my_detector",
    MyDetector
)

# Access from anywhere
detector = default_registry.create(
    SfComponentType.DETECTOR,
    "my_detector"
)

SignalFlowRegistry dataclass

SignalFlowRegistry(_items: Dict[SfComponentType, Dict[str, Type[Any]]] = dict())

Component registry for dynamic component discovery and instantiation.

Provides centralized registration and lookup for SignalFlow components. Components are organized by type (DETECTOR, EXTRACTOR, etc.) and accessed by case-insensitive names.

Registry structure

component_type -> name -> class

Supported component types
  • DETECTOR: Signal detection classes
  • FEATURE_EXTRACTOR: Feature extraction classes
  • LABELER: Signal labeling classes
  • STRATEGY_ENTRY_RULE: Position entry rules
  • STRATEGY_EXIT_RULE: Position exit rules
  • STRATEGY_METRIC: Strategy metrics
  • STRATEGY_EXECUTOR: Order execution engines

Attributes:

| Name | Type | Description |
|---|---|---|
| _items | dict[SfComponentType, dict[str, Type[Any]]] | Internal storage mapping component types to name-class pairs. |

Example
from signalflow.core.registry import SignalFlowRegistry
from signalflow.core.enums import SfComponentType

# Create registry
registry = SignalFlowRegistry()

# Register component
registry.register(
    SfComponentType.DETECTOR,
    name="sma_cross",
    cls=SmaCrossDetector
)

# Get component class
detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

# Instantiate component
detector = registry.create(
    SfComponentType.DETECTOR,
    "sma_cross",
    fast_window=10,
    slow_window=20
)

# List available components
detectors = registry.list(SfComponentType.DETECTOR)
print(f"Available detectors: {detectors}")

# Full snapshot
snapshot = registry.snapshot()
print(snapshot)
Note

Component names are stored and looked up in lowercase. Use default_registry singleton for application-wide registration.

See Also

sf_component: Decorator for automatic component registration.
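
The `component_type -> name -> class` structure described above can be sketched as a small dataclass. This is an illustration under stated assumptions, not the library's code: `ComponentType`, `MiniRegistry`, and `SmaCross` are hypothetical, the enum values are invented, and `_ensure` is assumed to create an empty bucket for a type the first time it is seen.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Type


class ComponentType(str, Enum):
    """Hypothetical stand-in for SfComponentType (values are assumed)."""

    DETECTOR = "detector"
    LABELER = "labeler"


@dataclass
class MiniRegistry:
    """Two-level mapping: component type -> lowercased name -> class."""

    _items: Dict[ComponentType, Dict[str, Type[Any]]] = field(default_factory=dict)

    def _ensure(self, component_type: ComponentType) -> None:
        # Assumption: create an empty bucket the first time a type is seen.
        self._items.setdefault(component_type, {})

    def register(self, component_type: ComponentType, name: str,
                 cls: Type[Any], *, override: bool = False) -> None:
        key = name.strip().lower()
        self._ensure(component_type)
        if key in self._items[component_type] and not override:
            raise ValueError(f"{component_type.value}:{key} already registered")
        self._items[component_type][key] = cls

    def get(self, component_type: ComponentType, name: str) -> Type[Any]:
        self._ensure(component_type)
        return self._items[component_type][name.lower()]

    def create(self, component_type: ComponentType, name: str, **kwargs: Any) -> Any:
        return self.get(component_type, name)(**kwargs)

    def names(self, component_type: ComponentType) -> List[str]:
        # Called `names` here so the sketch does not shadow the builtin `list`.
        self._ensure(component_type)
        return sorted(self._items[component_type])

    def snapshot(self) -> Dict[str, List[str]]:
        return {t.value: sorted(v) for t, v in self._items.items()}


class SmaCross:
    """Hypothetical detector with two window parameters."""

    def __init__(self, fast_window: int = 10, slow_window: int = 20) -> None:
        self.fast_window = fast_window
        self.slow_window = slow_window


registry = MiniRegistry()
registry.register(ComponentType.DETECTOR, "SMA_Cross", SmaCross)
detector = registry.create(ComponentType.DETECTOR, "sma_cross", fast_window=5)
```

Keying the inner dictionary by lowercased name is what makes `"SMA_Cross"` and `"sma_cross"` resolve to the same class.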

create

create(component_type: SfComponentType, name: str, **kwargs: Any) -> Any

Instantiate a component by registry key.

Convenience method that combines get() and instantiation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| component_type | SfComponentType | Type of component to create. | required |
| name | str | Component name (case-insensitive). | required |
| **kwargs | Any | Arguments to pass to component constructor. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | Instantiated component. |

Raises:

| Type | Description |
|---|---|
| KeyError | If component not found. |
| TypeError | If kwargs don't match component constructor. |

Example
# Create detector with params
detector = registry.create(
    SfComponentType.DETECTOR,
    "sma_cross",
    fast_window=10,
    slow_window=20
)

# Create extractor
extractor = registry.create(
    SfComponentType.FEATURE_EXTRACTOR,
    "rsi",
    window=14
)

# Create with config dict
config = {"window": 20, "threshold": 0.7}
labeler = registry.create(
    SfComponentType.LABELER,
    "fixed",
    **config
)
Source code in src/signalflow/core/registry.py
def create(self, component_type: SfComponentType, name: str, **kwargs: Any) -> Any:
    """Instantiate a component by registry key.

    Convenience method that combines get() and instantiation.

    Args:
        component_type (SfComponentType): Type of component to create.
        name (str): Component name (case-insensitive).
        **kwargs: Arguments to pass to component constructor.

    Returns:
        Any: Instantiated component.

    Raises:
        KeyError: If component not found.
        TypeError: If kwargs don't match component constructor.

    Example:
        ```python
        # Create detector with params
        detector = registry.create(
            SfComponentType.DETECTOR,
            "sma_cross",
            fast_window=10,
            slow_window=20
        )

        # Create extractor
        extractor = registry.create(
            SfComponentType.FEATURE_EXTRACTOR,
            "rsi",
            window=14
        )

        # Create with config dict
        config = {"window": 20, "threshold": 0.7}
        labeler = registry.create(
            SfComponentType.LABELER,
            "fixed",
            **config
        )
        ```
    """
    cls = self.get(component_type, name)
    return cls(**kwargs)
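
Because `create()` forwards `**kwargs` straight to the constructor, a config dictionary must match the constructor's parameters exactly. The sketch below shows both the matching and the failing case; `Labeler` and the free function `create` are hypothetical stand-ins for a registered component and the method above.

```python
from typing import Any, Dict, Type


class Labeler:
    """Hypothetical component with two keyword parameters."""

    def __init__(self, window: int = 20, threshold: float = 0.5) -> None:
        self.window = window
        self.threshold = threshold


def create(cls: Type[Any], **kwargs: Any) -> Any:
    # Same shape as the last line of create(): forward kwargs to the constructor.
    return cls(**kwargs)


# Config keys that match the constructor are unpacked as keyword arguments.
config: Dict[str, Any] = {"window": 20, "threshold": 0.7}
labeler = create(Labeler, **config)

# A key the constructor does not accept surfaces as TypeError at call time.
try:
    create(Labeler, horizon=5)
except TypeError as exc:
    bad_kwarg_error = str(exc)
```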

get

get(component_type: SfComponentType, name: str) -> Type[Any]

Get a registered class by key.

Lookup is case-insensitive. Raises a KeyError that lists the available components if the key is not found.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| component_type | SfComponentType | Type of component to lookup. | required |
| name | str | Component name (case-insensitive). | required |

Returns:

| Type | Description |
|---|---|
| Type[Any] | Registered class. |

Raises:

| Type | Description |
|---|---|
| KeyError | If component not found. Error message includes available components. |

Example
# Get component class
detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

# Case-insensitive
detector_cls = registry.get(SfComponentType.DETECTOR, "SMA_Cross")

# Instantiate manually
detector = detector_cls(fast_window=10, slow_window=20)

# Handle missing component
try:
    cls = registry.get(SfComponentType.DETECTOR, "unknown")
except KeyError as e:
    # str(e) wraps a KeyError message in quotes, so print the raw message
    print(e.args[0])
    # Shows: "Component not found: DETECTOR:unknown. Available: [sma_cross, ...]"
Source code in src/signalflow/core/registry.py
def get(self, component_type: SfComponentType, name: str) -> Type[Any]:
    """Get a registered class by key.

    Lookup is case-insensitive. Raises a KeyError that lists the
    available components if the key is not found.

    Args:
        component_type (SfComponentType): Type of component to lookup.
        name (str): Component name (case-insensitive).

    Returns:
        Type[Any]: Registered class.

    Raises:
        KeyError: If component not found. Error message includes available components.

    Example:
        ```python
        # Get component class
        detector_cls = registry.get(SfComponentType.DETECTOR, "sma_cross")

        # Case-insensitive
        detector_cls = registry.get(SfComponentType.DETECTOR, "SMA_Cross")

        # Instantiate manually
        detector = detector_cls(fast_window=10, slow_window=20)

        # Handle missing component
        try:
            cls = registry.get(SfComponentType.DETECTOR, "unknown")
        except KeyError as e:
            # str(e) wraps a KeyError message in quotes, so print the raw message
            print(e.args[0])
            # Shows: "Component not found: DETECTOR:unknown. Available: [sma_cross, ...]"
        ```
    """
    self._ensure(component_type)
    key = name.lower()
    try:
        return self._items[component_type][key]
    except KeyError as e:
        available = ", ".join(sorted(self._items[component_type]))
        raise KeyError(
            f"Component not found: {component_type.value}:{key}. Available: [{available}]"
        ) from e
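
The error handling in `get()` re-raises the dictionary's `KeyError` with a longer message and chains the original via `from e`. A minimal sketch of that pattern follows; `lookup` is a hypothetical helper operating on a plain dict, not the registry itself. It also shows why callers may prefer `err.args[0]` over `str(err)` when displaying the message.

```python
from typing import Any, Dict


def lookup(table: Dict[str, Any], key: str) -> Any:
    """Return table[key], or raise a KeyError that lists the known keys."""
    try:
        return table[key]
    except KeyError as exc:
        available = ", ".join(sorted(table))
        raise KeyError(
            f"Component not found: {key}. Available: [{available}]"
        ) from exc


try:
    lookup({"sma_cross": object, "ema_cross": object}, "unknown")
except KeyError as err:
    message = err.args[0]   # the message exactly as it was raised
    quoted = str(err)       # KeyError adds repr-style quotes around it
    cause = err.__cause__   # the original dict KeyError, kept by "from exc"
```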

list

list(component_type: SfComponentType) -> list[str]

List registered components for a type.

Returns sorted list of component names for given type.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| component_type | SfComponentType | Type of components to list. | required |

Returns:

| Type | Description |
|---|---|
| list[str] | Sorted list of registered component names. |

Example
# List all detectors
detectors = registry.list(SfComponentType.DETECTOR)
print(f"Available detectors: {detectors}")
# Output: Available detectors: ['ema_cross', 'macd', 'rsi_threshold', 'sma_cross']

# Check if component exists
if "sma_cross" in registry.list(SfComponentType.DETECTOR):
    detector = registry.create(SfComponentType.DETECTOR, "sma_cross")

# List all component types
from signalflow.core.enums import SfComponentType
for component_type in SfComponentType:
    components = registry.list(component_type)
    print(f"{component_type.value}: {components}")
Source code in src/signalflow/core/registry.py
def list(self, component_type: SfComponentType) -> list[str]:
    """List registered components for a type.

    Returns sorted list of component names for given type.

    Args:
        component_type (SfComponentType): Type of components to list.

    Returns:
        list[str]: Sorted list of registered component names.

    Example:
        ```python
        # List all detectors
        detectors = registry.list(SfComponentType.DETECTOR)
        print(f"Available detectors: {detectors}")
        # Output: Available detectors: ['ema_cross', 'macd', 'rsi_threshold', 'sma_cross']

        # Check if component exists
        if "sma_cross" in registry.list(SfComponentType.DETECTOR):
            detector = registry.create(SfComponentType.DETECTOR, "sma_cross")

        # List all component types
        from signalflow.core.enums import SfComponentType
        for component_type in SfComponentType:
            components = registry.list(component_type)
            print(f"{component_type.value}: {components}")
        ```
    """
    self._ensure(component_type)
    return sorted(self._items[component_type])

register

register(component_type: SfComponentType, name: str, cls: Type[Any], *, override: bool = False) -> None

Register a class under (component_type, name).

Stores class in registry for later lookup and instantiation. Names are normalized to lowercase for case-insensitive lookup.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| component_type | SfComponentType | Type of component (DETECTOR, EXTRACTOR, etc.). | required |
| name | str | Registry name (case-insensitive, will be lowercased). | required |
| cls | Type[Any] | Class to register. | required |
| override | bool | Allow overriding existing registration. Default: False. | False |

Raises:

| Type | Description |
|---|---|
| ValueError | If name is empty or already registered (when override=False). |

Example
# Register new component
registry.register(
    SfComponentType.DETECTOR,
    name="my_detector",
    cls=MyDetector
)

# Override existing component
registry.register(
    SfComponentType.DETECTOR,
    name="my_detector",
    cls=ImprovedDetector,
    override=True  # Logs warning
)

# Register multiple types
registry.register(SfComponentType.FEATURE_EXTRACTOR, "rsi", RsiExtractor)
registry.register(SfComponentType.LABELER, "fixed", FixedHorizonLabeler)
Source code in src/signalflow/core/registry.py
def register(self, component_type: SfComponentType, name: str, cls: Type[Any], *, override: bool = False) -> None:
    """Register a class under (component_type, name).

    Stores class in registry for later lookup and instantiation.
    Names are normalized to lowercase for case-insensitive lookup.

    Args:
        component_type (SfComponentType): Type of component (DETECTOR, EXTRACTOR, etc.).
        name (str): Registry name (case-insensitive, will be lowercased).
        cls (Type[Any]): Class to register.
        override (bool): Allow overriding existing registration. Default: False.

    Raises:
        ValueError: If name is empty or already registered (when override=False).

    Example:
        ```python
        # Register new component
        registry.register(
            SfComponentType.DETECTOR,
            name="my_detector",
            cls=MyDetector
        )

        # Override existing component
        registry.register(
            SfComponentType.DETECTOR,
            name="my_detector",
            cls=ImprovedDetector,
            override=True  # Logs warning
        )

        # Register multiple types
        registry.register(SfComponentType.FEATURE_EXTRACTOR, "rsi", RsiExtractor)
        registry.register(SfComponentType.LABELER, "fixed", FixedHorizonLabeler)
        ```
    """
    if not isinstance(name, str) or not name.strip():
        raise ValueError("name must be a non-empty string")

    key = name.strip().lower()
    self._ensure(component_type)

    if key in self._items[component_type] and not override:
        raise ValueError(f"{component_type.value}:{key} already registered")

    if key in self._items[component_type] and override:
        logger.warning(f"Overriding {component_type.value}:{key} with {cls.__name__}")

    self._items[component_type][key] = cls
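
In the source shown on this page, `register()` normalizes names with `strip().lower()`, while `get()` applies only `lower()`. The sketch below isolates those two normalization steps to show the practical consequence; `register_key` and `lookup_key` are hypothetical helper names, not part of the library.

```python
def register_key(name: str) -> str:
    # Mirrors register(): reject blank names, then strip and lowercase.
    if not isinstance(name, str) or not name.strip():
        raise ValueError("name must be a non-empty string")
    return name.strip().lower()


def lookup_key(name: str) -> str:
    # Mirrors get(): lowercase only, no stripping.
    return name.lower()


# Surrounding whitespace is removed when a name is registered.
stored = register_key("  SMA_Cross ")

# Case differences are absorbed on lookup.
case_variant_matches = lookup_key("SMA_CROSS") == stored

# Whitespace is not, so a padded lookup name misses the stored key.
padded_variant_matches = lookup_key(" sma_cross ") == stored
```

Under these assumptions, names read from user input or config files are safest when stripped before they are passed to a lookup.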

snapshot

snapshot() -> dict[str, list[str]]

Snapshot of registry for debugging.

Returns complete registry state organized by component type.

Returns:

| Type | Description |
|---|---|
| dict[str, list[str]] | Dictionary mapping component type names to sorted lists of registered component names. |

Example
# Get full registry snapshot
snapshot = registry.snapshot()
print(snapshot)
# Output:
# {
#     'DETECTOR': ['ema_cross', 'sma_cross'],
#     'EXTRACTOR': ['rsi', 'sma'],
#     'LABELER': ['fixed', 'triple_barrier'],
#     'ENTRY_RULE': ['fixed_size'],
#     'EXIT_RULE': ['take_profit', 'time_based']
# }

# Use for debugging
import json
print(json.dumps(registry.snapshot(), indent=2))

# Check registration status
snapshot = registry.snapshot()
if 'DETECTOR' in snapshot and 'sma_cross' in snapshot['DETECTOR']:
    print("SMA detector is registered")
Source code in src/signalflow/core/registry.py
def snapshot(self) -> dict[str, list[str]]:
    """Snapshot of registry for debugging.

    Returns complete registry state organized by component type.

    Returns:
        dict[str, list[str]]: Dictionary mapping component type names 
            to sorted lists of registered component names.

    Example:
        ```python
        # Get full registry snapshot
        snapshot = registry.snapshot()
        print(snapshot)
        # Output:
        # {
        #     'DETECTOR': ['ema_cross', 'sma_cross'],
        #     'EXTRACTOR': ['rsi', 'sma'],
        #     'LABELER': ['fixed', 'triple_barrier'],
        #     'ENTRY_RULE': ['fixed_size'],
        #     'EXIT_RULE': ['take_profit', 'time_based']
        # }

        # Use for debugging
        import json
        print(json.dumps(registry.snapshot(), indent=2))

        # Check registration status
        snapshot = registry.snapshot()
        if 'DETECTOR' in snapshot and 'sma_cross' in snapshot['DETECTOR']:
            print("SMA detector is registered")
        ```
    """
    return {t.value: sorted(v.keys()) for t, v in self._items.items()}