Validator Module

signalflow.validator.base.SignalValidator dataclass

SignalValidator(model: Any | None = None, model_type: str | None = None, model_params: dict | None = None, train_params: dict | None = None, tune_enabled: bool = False, tune_params: dict | None = None, feature_columns: list[str] | None = None, pair_col: str = 'pair', ts_col: str = 'timestamp')

Base class for signal validators (meta-labelers).

Validates trading signals by predicting their risk/quality. In De Prado's terminology, this is a meta-labeler.

Note: Filtering to active signals (RISE/FALL only) should be done BEFORE passing data to fit. This keeps the validator simple and gives users full control over data preparation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `model` | `Any \| None` | The trained model instance |
| `model_type` | `str \| None` | String identifier for the model type (e.g., `"lightgbm"`, `"xgboost"`) |
| `model_params` | `dict \| None` | Parameters for model initialization |
| `train_params` | `dict \| None` | Parameters for training (e.g., early stopping) |
| `tune_enabled` | `bool` | Whether hyperparameter tuning is enabled |
| `tune_params` | `dict \| None` | Parameters for tuning (e.g., `n_trials`, `cv_folds`) |
| `feature_columns` | `list[str] \| None` | Feature column names (set after `fit`) |

fit

fit(X_train: DataFrame, y_train: DataFrame | Series, X_val: DataFrame | None = None, y_val: DataFrame | Series | None = None) -> SignalValidator

Train the validator model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X_train` | `DataFrame` | Training features (Polars DataFrame) | required |
| `y_train` | `DataFrame \| Series` | Training labels | required |
| `X_val` | `DataFrame \| None` | Validation features (optional) | `None` |
| `y_val` | `DataFrame \| Series \| None` | Validation labels (optional) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `SignalValidator` | Self for method chaining |

Source code in src/signalflow/validator/base.py
def fit(
    self, 
    X_train: pl.DataFrame, 
    y_train: pl.DataFrame | pl.Series,
    X_val: pl.DataFrame | None = None, 
    y_val: pl.DataFrame | pl.Series | None = None,
) -> "SignalValidator":
    """Train the validator model.

    Args:
        X_train: Training features (Polars DataFrame)
        y_train: Training labels
        X_val: Validation features (optional)
        y_val: Validation labels (optional)

    Returns:
        Self for method chaining
    """
    raise NotImplementedError("Subclasses must implement fit()")

load classmethod

load(path: str | Path) -> SignalValidator

Load model from file.

Source code in src/signalflow/validator/base.py
@classmethod
def load(cls, path: str | Path) -> "SignalValidator":
    """Load model from file."""
    raise NotImplementedError("Subclasses must implement load()")

predict

predict(signals: Signals, X: DataFrame) -> Signals

Predict class labels and return updated Signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input signals container | required |
| `X` | `DataFrame` | Features (Polars DataFrame) with `(pair, timestamp)` + feature columns | required |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | New Signals with prediction column added |

Source code in src/signalflow/validator/base.py
def predict(self, signals: Signals, X: pl.DataFrame) -> Signals:
    """Predict class labels and return updated Signals.

    Args:
        signals: Input signals container
        X: Features (Polars DataFrame) with (pair, timestamp) + feature columns

    Returns:
        New Signals with prediction column added
    """
    raise NotImplementedError("Subclasses must implement predict()")

predict_proba

predict_proba(signals: Signals, X: DataFrame) -> Signals

Predict class probabilities and return updated Signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input signals container | required |
| `X` | `DataFrame` | Features (Polars DataFrame) | required |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | New Signals with probability columns added |

Source code in src/signalflow/validator/base.py
def predict_proba(self, signals: Signals, X: pl.DataFrame) -> Signals:
    """Predict class probabilities and return updated Signals.

    Args:
        signals: Input signals container  
        X: Features (Polars DataFrame)

    Returns:
        New Signals with probability columns added
    """
    raise NotImplementedError("Subclasses must implement predict_proba()")

save

save(path: str | Path) -> None

Save model to file.

Source code in src/signalflow/validator/base.py
def save(self, path: str | Path) -> None:
    """Save model to file."""
    raise NotImplementedError("Subclasses must implement save()")

tune

tune(X_train: DataFrame, y_train: DataFrame | Series, X_val: DataFrame | None = None, y_val: DataFrame | Series | None = None) -> dict[str, Any]

Tune hyperparameters.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Best parameters found |

Source code in src/signalflow/validator/base.py
def tune(
    self, 
    X_train: pl.DataFrame, 
    y_train: pl.DataFrame | pl.Series,
    X_val: pl.DataFrame | None = None, 
    y_val: pl.DataFrame | pl.Series | None = None,
) -> dict[str, Any]:
    """Tune hyperparameters.

    Returns:
        Best parameters found
    """
    if not self.tune_enabled:
        raise ValueError("Tuning is not enabled for this validator")
    raise NotImplementedError("Subclasses must implement tune()")

validate_signals

validate_signals(signals: Signals, features: DataFrame, prefix: str = 'probability_') -> Signals

Add validation predictions to signals.

Convenience method - calls predict_proba internally.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input signals container | required |
| `features` | `DataFrame` | Features DataFrame with `(pair, timestamp)` + feature columns | required |
| `prefix` | `str` | Prefix for probability columns | `'probability_'` |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | Signals with added validation columns |

Source code in src/signalflow/validator/base.py
def validate_signals(
    self, 
    signals: Signals, 
    features: pl.DataFrame,
    prefix: str = "probability_",
) -> Signals:
    """Add validation predictions to signals.

    Convenience method - calls predict_proba internally.

    Args:
        signals: Input signals container
        features: Features DataFrame with (pair, timestamp) + feature columns
        prefix: Prefix for probability columns

    Returns:
        Signals with added validation columns
    """
    raise NotImplementedError("Subclasses must implement validate_signals()")

signalflow.validator.sklearn_validator.SklearnSignalValidator dataclass

SklearnSignalValidator(model: Any | None = None, model_type: str | None = None, model_params: dict | None = None, train_params: dict | None = None, tune_enabled: bool = False, tune_params: dict | None = None, feature_columns: list[str] | None = None, pair_col: str = 'pair', ts_col: str = 'timestamp', auto_select_metric: str = 'roc_auc', auto_select_cv_folds: int = 5)

Bases: SignalValidator

Sklearn-based signal validator.

Supports:

- Multiple sklearn-compatible models (LightGBM, XGBoost, RF, etc.)
- Automatic model selection via cross-validation
- Hyperparameter tuning with Optuna
- Early stopping for boosting models

Note: Filter data to active signals (not NONE) BEFORE calling fit(). This gives you full control over data preparation.

Example

# Prepare data - filter to active signals
df = df.filter(pl.col("signal_type") != "none")

validator = SklearnSignalValidator(model_type="lightgbm")
validator.fit(
    train_df.select(["pair", "timestamp"] + feature_cols),
    train_df.select("label"),
)

# validate_signals returns Signals object
validated = validator.validate_signals(
    Signals(test_df.select(signal_cols)),
    test_df.select(["pair", "timestamp"] + feature_cols),
)
validated.value.filter(pl.col("probability_rise") > 0.7)

fit

fit(X_train: DataFrame, y_train: DataFrame | Series, X_val: DataFrame | None = None, y_val: DataFrame | Series | None = None) -> SklearnSignalValidator

Train the validator.

Note: Filter to active signals BEFORE calling this method.

For boosting models with validation data, early stopping is applied.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X_train` | `DataFrame` | Training features (already filtered to active signals) | required |
| `y_train` | `DataFrame \| Series` | Training labels | required |
| `X_val` | `DataFrame \| None` | Validation features (optional) | `None` |
| `y_val` | `DataFrame \| Series \| None` | Validation labels (optional) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `SklearnSignalValidator` | Self for method chaining |

Source code in src/signalflow/validator/sklearn_validator.py
def fit(
    self, 
    X_train: pl.DataFrame, 
    y_train: pl.DataFrame | pl.Series,
    X_val: pl.DataFrame | None = None, 
    y_val: pl.DataFrame | pl.Series | None = None,
) -> "SklearnSignalValidator":
    """Train the validator.

    Note: Filter to active signals BEFORE calling this method.

    For boosting models with validation data, early stopping is applied.

    Args:
        X_train: Training features (already filtered to active signals)
        y_train: Training labels
        X_val: Validation features (optional)
        y_val: Validation labels (optional)

    Returns:
        Self for method chaining
    """
    X_np = self._extract_features(X_train, fit_mode=True)
    y_np = self._extract_labels(y_train)

    if self.model_type == "auto" or self.model_type is None:
        self.model_type, self.model_params = self._auto_select_model(X_np, y_np)

    self.model = self._create_model(self.model_type, self.model_params)

    fit_kwargs: dict[str, Any] = {}

    if X_val is not None and y_val is not None:
        X_val_np = self._extract_features(X_val)
        y_val_np = self._extract_labels(y_val)

        if self.model_type in ("lightgbm", "xgboost"):
            early_stopping = self.train_params.get("early_stopping_rounds", 50)

            if self.model_type == "lightgbm":
                fit_kwargs["eval_set"] = [(X_val_np, y_val_np)]
                fit_kwargs["callbacks"] = [
                    __import__("lightgbm").early_stopping(early_stopping, verbose=False)
                ]
            elif self.model_type == "xgboost":
                fit_kwargs["eval_set"] = [(X_val_np, y_val_np)]
                fit_kwargs["early_stopping_rounds"] = early_stopping
                fit_kwargs["verbose"] = False

    self.model.fit(X_np, y_np, **fit_kwargs)

    return self

load classmethod

load(path: str | Path) -> SklearnSignalValidator

Load validator from file.

Source code in src/signalflow/validator/sklearn_validator.py
@classmethod
def load(cls, path: str | Path) -> "SklearnSignalValidator":
    """Load validator from file."""
    path = Path(path)

    with open(path, "rb") as f:
        state = pickle.load(f)

    validator = cls(
        model=state["model"],
        model_type=state["model_type"],
        model_params=state["model_params"],
        train_params=state["train_params"],
        tune_params=state["tune_params"],
        feature_columns=state["feature_columns"],
        pair_col=state.get("pair_col", "pair"),
        ts_col=state.get("ts_col", "timestamp"),
    )

    return validator
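The state-dict pattern that `save()`/`load()` use can be exercised standalone; the dict contents below are illustrative placeholders, not the full saved state:

```python
import pickle
import tempfile
from pathlib import Path

# Illustrative subset of the state dict that save() pickles and load() restores.
state = {
    "model": None,
    "model_type": "lightgbm",
    "model_params": {"n_estimators": 100},
    "feature_columns": ["rsi_14", "macd"],
}

# Round-trip the state through a temporary pickle file.
path = Path(tempfile.mkdtemp()) / "validator.pkl"
with open(path, "wb") as f:
    pickle.dump(state, f)
with open(path, "rb") as f:
    restored = pickle.load(f)
```

Because the state is a plain pickle, loading a file from an untrusted source carries the usual pickle deserialization risks.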

predict

predict(signals: Signals, X: DataFrame) -> Signals

Predict class labels and return updated Signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input signals container | required |
| `X` | `DataFrame` | Features DataFrame with `(pair, timestamp)` + feature columns | required |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | New Signals with `validation_pred` column added |

Source code in src/signalflow/validator/sklearn_validator.py
def predict(self, signals: Signals, X: pl.DataFrame) -> Signals:
    """Predict class labels and return updated Signals.

    Args:
        signals: Input signals container
        X: Features DataFrame with (pair, timestamp) + feature columns

    Returns:
        New Signals with 'validation_pred' column added
    """
    if self.model is None:
        raise ValueError("Model not fitted. Call fit() first.")

    signals_df = signals.value

    # Join features to signals by keys
    X_matched = signals_df.select([self.pair_col, self.ts_col]).join(
        X,
        on=[self.pair_col, self.ts_col],
        how="left",
    )

    X_np = self._extract_features(X_matched)
    predictions = self.model.predict(X_np)

    result_df = signals_df.with_columns(
        pl.Series(name="validation_pred", values=predictions)
    )

    return Signals(result_df)

predict_proba

predict_proba(signals: Signals, X: DataFrame) -> Signals

Predict class probabilities and return updated Signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input signals container | required |
| `X` | `DataFrame` | Features DataFrame with `(pair, timestamp)` + feature columns | required |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | New Signals with probability columns (`probability_none`, `probability_rise`, `probability_fall`) |

Source code in src/signalflow/validator/sklearn_validator.py
def predict_proba(self, signals: Signals, X: pl.DataFrame) -> Signals:
    """Predict class probabilities and return updated Signals.

    Args:
        signals: Input signals container
        X: Features DataFrame with (pair, timestamp) + feature columns

    Returns:
        New Signals with probability columns (probability_none, probability_rise, probability_fall)
    """
    if self.model is None:
        raise ValueError("Model not fitted. Call fit() first.")

    signals_df = signals.value
    classes = self._get_class_labels()

    # Join features to signals by keys
    X_matched = signals_df.select([self.pair_col, self.ts_col]).join(
        X,
        on=[self.pair_col, self.ts_col],
        how="left",
    )

    X_np = self._extract_features(X_matched)
    probas = self.model.predict_proba(X_np)

    # Add probability columns
    result_df = signals_df
    for i, class_label in enumerate(classes):
        col_name = f"probability_{class_label}"
        result_df = result_df.with_columns(
            pl.Series(name=col_name, values=probas[:, i])
        )

    return Signals(result_df)

save

save(path: str | Path) -> None

Save validator to file.

Source code in src/signalflow/validator/sklearn_validator.py
def save(self, path: str | Path) -> None:
    """Save validator to file."""
    path = Path(path)

    state = {
        "model": self.model,
        "model_type": self.model_type,
        "model_params": self.model_params,
        "train_params": self.train_params,
        "tune_params": self.tune_params,
        "feature_columns": self.feature_columns,
        "pair_col": self.pair_col,
        "ts_col": self.ts_col,
    }

    with open(path, "wb") as f:
        pickle.dump(state, f)

tune

tune(X_train: DataFrame, y_train: DataFrame | Series, X_val: DataFrame | None = None, y_val: DataFrame | Series | None = None) -> dict[str, Any]

Tune hyperparameters using Optuna.

Note: Filter to active signals BEFORE calling this method.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Best parameters found |

Source code in src/signalflow/validator/sklearn_validator.py
def tune(
    self, 
    X_train: pl.DataFrame, 
    y_train: pl.DataFrame | pl.Series,
    X_val: pl.DataFrame | None = None, 
    y_val: pl.DataFrame | pl.Series | None = None,
) -> dict[str, Any]:
    """Tune hyperparameters using Optuna.

    Note: Filter to active signals BEFORE calling this method.

    Returns:
        Best parameters found
    """
    import optuna
    from sklearn.model_selection import cross_val_score

    if self.model_type is None or self.model_type == "auto":
        raise ValueError("Set model_type before tuning (not 'auto')")

    config = self._get_model_config(self.model_type)
    tune_space = config["tune_space"]

    X_np = self._extract_features(X_train, fit_mode=True)
    y_np = self._extract_labels(y_train)

    n_trials = self.tune_params.get("n_trials", 50)
    cv_folds = self.tune_params.get("cv_folds", 5)
    timeout = self.tune_params.get("timeout", 600)

    def objective(trial: optuna.Trial) -> float:
        params = build_optuna_params(trial, tune_space)
        params.update(config["default_params"])  # Base params

        model = self._create_model(self.model_type, params)

        scores = cross_val_score(
            model, X_np, y_np,
            cv=cv_folds,
            scoring=self.auto_select_metric,
            n_jobs=-1,
        )
        return scores.mean()

    study = optuna.create_study(direction="maximize")
    study.optimize(
        objective, 
        n_trials=n_trials, 
        timeout=timeout,
        show_progress_bar=True,
    )

    best_params = {**config["default_params"], **study.best_params}
    self.model_params = best_params

    return best_params
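Each Optuna trial scores a candidate model with `cross_val_score`; that scoring call can be sketched standalone with synthetic data (the random-forest model and generated arrays below are illustrative stand-ins for the extracted feature/label arrays):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Synthetic binary-classification data: the label mostly follows feature 0.
rng = np.random.default_rng(0)
X_np = rng.normal(size=(120, 4))
y_np = (X_np[:, 0] + rng.normal(scale=0.1, size=120) > 0).astype(int)

# Score one candidate model the way each trial's objective does.
model = RandomForestClassifier(n_estimators=25, random_state=0)
scores = cross_val_score(model, X_np, y_np, cv=3, scoring="roc_auc", n_jobs=-1)
mean_score = scores.mean()
```

In `tune()`, this mean score is what each trial returns, and the study maximizes it over the model's search space.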

validate_signals

validate_signals(signals: Signals, features: DataFrame, prefix: str = 'probability_') -> Signals

Add validation probabilities to signals.

Adds probability columns for each class:

- `probability_none`: P(signal is noise / not actionable)
- `probability_rise`: P(signal leads to price rise)
- `probability_fall`: P(signal leads to price fall)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signals` | `Signals` | Input Signals container | required |
| `features` | `DataFrame` | Features DataFrame with `(pair, timestamp)` + features | required |
| `prefix` | `str` | Prefix for probability columns (default: `"probability_"`) | `'probability_'` |

Returns:

| Type | Description |
| --- | --- |
| `Signals` | New Signals with probability columns added |

Example

validated = validator.validate_signals(signals, features)
df = validated.value
confident_rise = df.filter(
    (pl.col("signal_type") == "rise") &
    (pl.col("probability_rise") > 0.7)
)

Source code in src/signalflow/validator/sklearn_validator.py
def validate_signals(
    self, 
    signals: Signals, 
    features: pl.DataFrame,
    prefix: str = "probability_",
) -> Signals:
    """Add validation probabilities to signals.

    Adds probability columns for each class:
    - probability_none: P(signal is noise / not actionable)
    - probability_rise: P(signal leads to price rise)  
    - probability_fall: P(signal leads to price fall)

    Args:
        signals: Input Signals container
        features: Features DataFrame with (pair, timestamp) + features
        prefix: Prefix for probability columns (default: "probability_")

    Returns:
        New Signals with probability columns added.

    Example:
        >>> validated = validator.validate_signals(signals, features)
        >>> df = validated.value
        >>> confident_rise = df.filter(
        ...     (pl.col("signal_type") == "rise") & 
        ...     (pl.col("probability_rise") > 0.7)
        ... )
    """
    return self.predict_proba(signals, features)