Analytic Module

The analytic module provides metrics for evaluating trading strategies and signals, along with statistical validation tools.


Architecture

flowchart TB
    subgraph Strategy Metrics
        A[StrategyMetric] --> B[Main Metrics]
        A --> C[Extended Metrics]
    end

    subgraph Signal Metrics
        D[SignalMetric] --> E[Classification]
        D --> F[Correlation]
        D --> G[Timing]
    end

    subgraph Visualization
        H[StrategyMainResult]
        I[StrategyDistributionResult]
        J[StrategyEquityResult]
    end

    subgraph Statistical Validation
        K[MonteCarloSimulator]
        L[BootstrapValidator]
        M[StatisticalTestsValidator]
    end

    style A fill:#16a34a,stroke:#22c55e,color:#fff
    style D fill:#2563eb,stroke:#3b82f6,color:#fff
    style H fill:#7c3aed,stroke:#8b5cf6,color:#fff
    style K fill:#ea580c,stroke:#f97316,color:#fff

Strategy Metrics

Strategy metrics compute performance indicators during backtesting. All metrics inherit from StrategyMetric and are registered with the @sf.metric decorator (e.g., @sf.metric("name")).

Base Class

signalflow.analytic.base.StrategyMetric dataclass

StrategyMetric()

Bases: ABC

Base class for strategy metrics.

compute

compute(state: StrategyState, prices: dict[str, float], **kwargs: Any) -> dict[str, float]

Compute metric values.

Source code in src/signalflow/analytic/base.py
def compute(self, state: StrategyState, prices: dict[str, float], **kwargs: Any) -> dict[str, float]:
    """Compute metric values."""
    logger.warning("Computing is not implemented for this component")
    return {}

plot

plot(results: dict[str, Any], state: StrategyState | None = None, raw_data: RawData | None = None, **kwargs: Any) -> list[go.Figure] | go.Figure | None

Plot metric values.

Source code in src/signalflow/analytic/base.py
def plot(
    self,
    results: dict[str, Any],
    state: StrategyState | None = None,
    raw_data: RawData | None = None,
    **kwargs: Any,
) -> list[go.Figure] | go.Figure | None:
    """Plot metric values."""
    logger.warning("Plotting is not implemented for this component")
    return None
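Custom metrics follow the same contract. A minimal sketch, assuming the @sf.metric registration described above and an sf alias for signalflow; the StrategyState attributes used here (open_positions, cash) are illustrative names, not guaranteed fields:

from typing import Any

import signalflow as sf
from signalflow.analytic.base import StrategyMetric


@sf.metric("exposure")  # registration decorator, as noted above
class ExposureMetric(StrategyMetric):
    """Hypothetical metric: fraction of capital currently deployed."""

    def compute(self, state, prices: dict[str, float], **kwargs: Any) -> dict[str, float]:
        # `open_positions` and `cash` are illustrative StrategyState field
        # names; consult the actual StrategyState definition.
        deployed = sum(pos.size * prices.get(pos.pair, 0.0) for pos in state.open_positions)
        total = deployed + state.cash
        return {"exposure": deployed / total if total > 0.0 else 0.0}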

Main Metrics

Core performance metrics computed during backtest execution.

TotalReturnMetric

signalflow.analytic.strategy.main_strategy_metrics.TotalReturnMetric dataclass

TotalReturnMetric(initial_capital: float = 10000.0)

Bases: StrategyMetric

Computes total return metrics for the portfolio.

DrawdownMetric

signalflow.analytic.strategy.main_strategy_metrics.DrawdownMetric dataclass

DrawdownMetric(_peak_equity: float = 0.0, _max_drawdown: float = 0.0)

Bases: StrategyMetric

Tracks peak equity and computes current and maximum drawdown.
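The streaming update behind the private fields amounts to the following (illustrative sketch, not the library source):

def update_drawdown(equity: float, peak_equity: float, max_drawdown: float) -> tuple[float, float, float]:
    """One streaming step: returns (peak_equity, current_drawdown, max_drawdown)."""
    peak_equity = max(peak_equity, equity)
    current_drawdown = 1.0 - equity / peak_equity if peak_equity > 0 else 0.0
    return peak_equity, current_drawdown, max(max_drawdown, current_drawdown)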

WinRateMetric

signalflow.analytic.strategy.main_strategy_metrics.WinRateMetric dataclass

WinRateMetric()

Bases: StrategyMetric

Computes the percentage of winning trades.

SharpeRatioMetric

signalflow.analytic.strategy.main_strategy_metrics.SharpeRatioMetric dataclass

SharpeRatioMetric(initial_capital: float = 10000.0, window_size: int = 100, risk_free_rate: float = 0.0, _returns_history: list[float] = list())

Bases: StrategyMetric

Computes a rolling Sharpe ratio over a window of recent returns.

BalanceAllocationMetric

signalflow.analytic.strategy.main_strategy_metrics.BalanceAllocationMetric dataclass

BalanceAllocationMetric(initial_capital: float = 10000.0)

Bases: StrategyMetric

Tracks how the portfolio balance is allocated across positions.

Extended Metrics

Advanced performance metrics for deeper analysis.

SortinoRatioMetric

Risk-adjusted return using only downside volatility.

signalflow.analytic.strategy.extended_metrics.SortinoRatioMetric dataclass

SortinoRatioMetric(initial_capital: float = 10000.0, window_size: int = 100, risk_free_rate: float = 0.0, target_return: float = 0.0, _returns_history: list[float] = list())

Bases: StrategyMetric

Computes Sortino ratio using only downside volatility.

Parameters:

Parameter Type Default Description
window_size int 100 Rolling window for returns history
risk_free_rate float 0.0 Risk-free rate for ratio calculation
target_return float 0.0 Target return threshold for downside

Output: {"sortino_ratio": float}

CalmarRatioMetric

Return relative to maximum drawdown.

signalflow.analytic.strategy.extended_metrics.CalmarRatioMetric dataclass

CalmarRatioMetric(initial_capital: float = 10000.0, _peak_equity: float = 0.0, _max_drawdown: float = 0.0, _initial_equity: float = 0.0)

Bases: StrategyMetric

Computes Calmar ratio (return / max drawdown).

Output: {"calmar_ratio": float, "annualized_return": float, "max_drawdown_calmar": float}

ProfitFactorMetric

Gross profit divided by gross loss.

signalflow.analytic.strategy.extended_metrics.ProfitFactorMetric dataclass

ProfitFactorMetric()

Bases: StrategyMetric

Computes profit factor (gross profit / gross loss).

Output: {"profit_factor": float, "gross_profit": float, "gross_loss": float}

AverageTradeMetric

Average profit, loss, and trade duration statistics.

signalflow.analytic.strategy.extended_metrics.AverageTradeMetric dataclass

AverageTradeMetric()

Bases: StrategyMetric

Computes average profit, loss, and trade duration.

Output:

Key Description
avg_profit Mean profit from winning trades
avg_loss Mean loss from losing trades
avg_trade Mean PnL across all trades
avg_duration_minutes Mean trade duration
avg_win_duration Mean duration of winning trades
avg_loss_duration Mean duration of losing trades

ExpectancyMetric

Mathematical expectancy of profit per trade.

signalflow.analytic.strategy.extended_metrics.ExpectancyMetric dataclass

ExpectancyMetric()

Bases: StrategyMetric

Computes trade expectancy (win_rate * avg_win - loss_rate * avg_loss).

Formula: expectancy = win_rate * avg_win - loss_rate * avg_loss

For example, a 40% win rate with a $50 average win and a $20 average loss gives expectancy = 0.4 * 50 - 0.6 * 20 = $8 per trade.

Output: {"expectancy": float, "expectancy_ratio": float}

RiskRewardMetric

Risk/reward ratio (average win / average loss).

signalflow.analytic.strategy.extended_metrics.RiskRewardMetric dataclass

RiskRewardMetric()

Bases: StrategyMetric

Computes risk/reward ratio (avg_win / avg_loss).

Output: {"risk_reward_ratio": float, "payoff_ratio": float}

MaxConsecutiveMetric

Tracks consecutive winning and losing streaks.

signalflow.analytic.strategy.extended_metrics.MaxConsecutiveMetric dataclass

MaxConsecutiveMetric(_last_closed_count: int = 0, _current_win_streak: int = 0, _current_loss_streak: int = 0, _max_win_streak: int = 0, _max_loss_streak: int = 0, _last_result_win: bool | None = None)

Bases: StrategyMetric

Tracks maximum consecutive wins and losses.

Output:

Key Description
max_consecutive_wins Maximum winning streak
max_consecutive_losses Maximum losing streak
current_win_streak Current winning streak
current_loss_streak Current losing streak

Usage Example

from signalflow.analytic.strategy import (
    SortinoRatioMetric,
    CalmarRatioMetric,
    ProfitFactorMetric,
    ExpectancyMetric,
)
from signalflow.strategy.runner import BacktestRunner

# Create runner with extended metrics
runner = BacktestRunner(
    strategy_id="my_strategy",
    broker=broker,
    entry_rules=[entry_rule],
    exit_rules=[exit_rule],
    metrics=[
        SortinoRatioMetric(window_size=100, risk_free_rate=0.02),
        CalmarRatioMetric(),
        ProfitFactorMetric(),
        ExpectancyMetric(),
    ],
)

state = runner.run(raw_data, signals)

# Access metric values
print(f"Sortino: {state.metrics.get('sortino_ratio', 0):.2f}")
print(f"Calmar: {state.metrics.get('calmar_ratio', 0):.2f}")
print(f"Profit Factor: {state.metrics.get('profit_factor', 0):.2f}")
print(f"Expectancy: ${state.metrics.get('expectancy', 0):.2f}")

Signal Metrics

Signal metrics analyze the quality and effectiveness of trading signals.

Base Class

signalflow.analytic.base.SignalMetric dataclass

SignalMetric()

Base class for signal metrics computation and visualization.

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Compute metrics from signals.

Returns:

Type Description
tuple[dict[str, Any] | None, dict[str, Any]] Tuple of (computed metrics or None, plotting context)

Source code in src/signalflow/analytic/base.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Compute metrics from signals.

    Returns:
        Dictionary with computed metrics
    """
    logger.warning("Computing is not implemented for this component")
    return {}, {}

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> list[go.Figure] | go.Figure | None

Generate visualization from computed metrics.

Returns:

Type Description
list[Figure] | Figure | None Single figure or list of figures

Source code in src/signalflow/analytic/base.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> list[go.Figure] | go.Figure | None:
    """Generate visualization from computed metrics.

    Returns:
        Single figure or list of figures
    """
    logger.warning("Plotting is not implemented for this component")
    return None
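A minimal custom signal metric sketch following this contract, returning the (computed_metrics, plots_context) tuple; the class name and output keys are hypothetical:

from typing import Any

import polars as pl

from signalflow.analytic.base import SignalMetric


class SignalCountMetric(SignalMetric):
    """Hypothetical metric: counts non-zero signals per pair."""

    def compute(self, raw_data, signals, labels=None) -> tuple[dict[str, Any] | None, dict[str, Any]]:
        per_pair = signals.value.filter(pl.col("signal") != 0).group_by("pair").len()
        if per_pair.height == 0:
            return None, {}
        return {"quant": {"total_signals": int(per_pair["len"].sum())}}, {"pairs": per_pair.height}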

SignalClassificationMetric

signalflow.analytic.signals.classification_metrics.SignalClassificationMetric dataclass

SignalClassificationMetric(positive_labels: list = list(), negative_labels: list = list(), chart_height: int = 900, chart_width: int = 1400, roc_n_thresholds: int = 100)

Bases: SignalMetric

Analyze signal classification performance against labels.

Computes standard classification metrics including:

  • Precision, Recall, F1 Score
  • Confusion Matrix
  • ROC Curve and AUC
  • Signal strength distribution

Requires labels to be provided.

__post_init__

__post_init__() -> None

Set default label mappings if not provided.

Source code in src/signalflow/analytic/signals/classification_metrics.py
def __post_init__(self) -> None:
    """Set default label mappings if not provided."""
    if not self.positive_labels:
        self.positive_labels = ["rise", "up", 1, "positive", "buy"]
    if not self.negative_labels:
        self.negative_labels = ["fall", "down", 0, "negative", "sell"]

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Compute classification metrics.

Source code in src/signalflow/analytic/signals/classification_metrics.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Compute classification metrics."""

    if labels is None:
        logger.error("Labels are required for classification metrics")
        return None, {}

    signals_df = signals.value

    signals_with_labels = signals_df.join(labels, on=["timestamp", "pair"], how="inner")

    predictions = signals_with_labels.filter(pl.col("signal") != 0)

    if predictions.height == 0:
        logger.warning("No non-zero signals found for classification")
        return None, {}

    logger.info(f"Found {predictions.height} signal-label pairs for classification")

    y_pred = predictions["signal"].to_numpy()
    y_true_raw = predictions["label"].to_numpy()
    unique_labels = np.unique(y_true_raw)
    logger.info(f"Unique label values: {unique_labels}")
    logger.info(f"Unique prediction values: {np.unique(y_pred)}")

    y_true = self._map_labels_to_binary(y_true_raw)

    y_pred_binary = (y_pred > 0).astype(int)

    logger.info(f"After conversion - Unique y_true: {np.unique(y_true)}, y_pred: {np.unique(y_pred_binary)}")

    if "strength" in predictions.columns:
        strengths = predictions["strength"].to_numpy()
    else:
        strengths = np.abs(y_pred).astype(float)

    if np.std(strengths) < 1e-10:
        logger.warning("All strengths are identical, ROC curve will be degenerate")
        roc_scores = y_pred_binary.astype(float)
    else:
        roc_scores = (strengths - strengths.min()) / (strengths.max() - strengths.min())

    try:
        cm = confusion_matrix(y_true, y_pred_binary)
        if cm.shape == (2, 2):
            tn, fp, fn, tp = cm.ravel()
        else:
            logger.warning(f"Unexpected confusion matrix shape: {cm.shape}")
            tn, fp, fn, tp = 1, 1, 1, 1
    except Exception as e:
        logger.warning(f"Could not compute confusion matrix: {e}, using defaults")
        tn, fp, fn, tp = 1, 1, 1, 1

    precision = precision_score(y_true, y_pred_binary, zero_division=0)
    recall = recall_score(y_true, y_pred_binary, zero_division=0)
    f1 = f1_score(y_true, y_pred_binary, zero_division=0)

    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
    balanced_acc = (sensitivity + specificity) / 2

    positive_rate = np.mean(y_true)

    if len(np.unique(y_true)) > 1:
        try:
            fpr_arr, tpr_arr, thresholds_arr = roc_curve(y_true, roc_scores)
            auc = roc_auc_score(y_true, roc_scores)
        except Exception as e:
            logger.warning(f"Could not compute ROC: {e}")
            fpr_arr, tpr_arr, thresholds_arr = np.array([0, 1]), np.array([0, 1]), np.array([1, 0])
            auc = 0.5
    else:
        fpr_arr, tpr_arr, thresholds_arr = np.array([0, 1]), np.array([0, 1]), np.array([1, 0])
        auc = 0.5
        logger.warning("Only one class present, AUC undefined")

    logloss = np.nan
    try:
        if len(np.unique(y_true)) > 1 and np.std(roc_scores) > 1e-10:
            probs = np.clip(roc_scores, 1e-10, 1 - 1e-10)
            logloss = log_loss(y_true=y_true, y_pred=probs, labels=[0, 1])
    except Exception as e:
        logger.warning(f"Could not compute log loss: {e}")

    strength_mean = float(np.mean(strengths))
    strength_std = float(np.std(strengths)) if len(strengths) > 1 else 0.0
    strength_quartiles = np.percentile(strengths, [25, 50, 75]).tolist() if len(strengths) > 0 else [0, 0, 0]

    computed_metrics = {
        "quant": {
            "total_signals": int(predictions.height),
            "total_positive_signals": int(tp + fp),
            "total_negative_signals": int(tn + fn),
            "precision": float(precision),
            "recall": float(recall),
            "specificity": float(specificity),
            "sensitivity": float(sensitivity),
            "balanced_accuracy": float(balanced_acc),
            "f1": float(f1),
            "positive_rate": float(positive_rate),
            "auc": float(auc),
            "log_loss": float(logloss) if not np.isnan(logloss) else None,
            "confusion_matrix": {
                "tn": int(tn),
                "fp": int(fp),
                "fn": int(fn),
                "tp": int(tp),
            },
            "strength_mean": strength_mean,
            "strength_std": strength_std,
        },
        "series": {
            "roc_curve": {
                "tpr": tpr_arr.tolist(),
                "fpr": fpr_arr.tolist(),
                "thresholds": thresholds_arr.tolist(),
            },
            "strength_quartiles": strength_quartiles,
            "strengths_raw": strengths.tolist(),
        },
    }

    plots_context = {
        "total_samples": predictions.height,
        "label_mapping": {
            "positive": self.positive_labels,
            "negative": self.negative_labels,
        },
    }

    logger.info(
        f"Classification metrics computed: "
        f"Precision={precision:.3f}, Recall={recall:.3f}, "
        f"F1={f1:.3f}, AUC={auc:.3f}"
    )

    return computed_metrics, plots_context

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> go.Figure

Generate classification metrics visualization.

Source code in src/signalflow/analytic/signals/classification_metrics.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> go.Figure:
    """Generate classification metrics visualization."""

    if computed_metrics is None:
        logger.error("No metrics available for plotting")
        return None

    fig = self._create_figure()

    self._add_roc_curve(fig, computed_metrics)
    self._add_confusion_matrix(fig, computed_metrics)
    self._add_strength_distribution(fig, computed_metrics)
    self._add_metrics_table(fig, computed_metrics)
    self._update_layout(fig)

    return fig
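Classification metrics require a labels DataFrame joinable on timestamp and pair with a label column (see compute above). A usage sketch:

from signalflow.analytic.signals import SignalClassificationMetric

clf = SignalClassificationMetric(positive_labels=["rise", "buy"], negative_labels=["fall", "sell"])
metrics, ctx = clf.compute(raw_data, signals, labels=labels)
if metrics is not None:
    print(f"Precision: {metrics['quant']['precision']:.3f}, AUC: {metrics['quant']['auc']:.3f}")
    clf.plot(metrics, ctx, raw_data, signals, labels=labels).show()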

SignalProfileMetric

signalflow.analytic.signals.profile_metrics.SignalProfileMetric dataclass

SignalProfileMetric(look_ahead: int = 1440, quantiles: tuple[float, float] = (0.25, 0.75), chart_height: int = 900, chart_width: int = 1400)

Bases: SignalMetric

Analyze post-signal price behavior profiles with statistical aggregations.

Computes mean, median, percentile profiles of price changes after signals, including cumulative max/min statistics for understanding typical signal outcomes.

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Calculate performance metrics for signals across all pairs.

Source code in src/signalflow/analytic/signals/profile_metrics.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Calculate performance metrics for signals across all pairs."""

    if "spot" in raw_data:
        price_df = raw_data["spot"]
    elif "futures" in raw_data:
        price_df = raw_data["futures"]
    else:
        raise ValueError("No price data found in raw_data")

    signals_df = signals.value

    buy_signals = signals_df.filter(pl.col("signal") == 1)

    if buy_signals.height == 0:
        logger.warning("No buy signals found for profile analysis")
        return None, {}

    post_signal_changes = []
    daily_max_uplifts = []

    pairs = buy_signals["pair"].unique().to_list()
    logger.info(f"Analyzing {buy_signals.height} signals across {len(pairs)} pairs")

    for pair in pairs:
        pair_price = price_df.filter(pl.col("pair") == pair).sort("timestamp")
        pair_signals = buy_signals.filter(pl.col("pair") == pair)

        price_pd = pair_price.to_pandas().set_index("timestamp")

        for signal_row in pair_signals.iter_rows(named=True):
            signal_ts = signal_row["timestamp"]

            try:
                signal_idx = price_pd.index.get_loc(signal_ts)
            except KeyError:
                continue

            if signal_idx + self.look_ahead < len(price_pd):
                signal_price = price_pd.iloc[signal_idx]["close"]
                future_prices = price_pd["close"].iloc[signal_idx : signal_idx + self.look_ahead + 1].values

                relative_changes = (future_prices / signal_price) - 1.0
                post_signal_changes.append(relative_changes)

                max_uplift = relative_changes.max()
                daily_max_uplifts.append(max_uplift)

    if not post_signal_changes:
        logger.warning("No valid signal sequences found with sufficient future data")
        return None, {}

    post_signal_df = pd.DataFrame(post_signal_changes)

    mean_profile = post_signal_df.mean()
    std_profile = post_signal_df.std()
    median_profile = post_signal_df.median()
    lower_quant = post_signal_df.quantile(self.quantiles[0])
    upper_quant = post_signal_df.quantile(self.quantiles[1])

    # Compute cumulative max/min profiles
    cummax_df = post_signal_df.cummax(axis=1)
    cummax_mean = cummax_df.mean()
    cummax_median = cummax_df.median()
    cummax_lower = cummax_df.quantile(self.quantiles[0])
    cummax_upper = cummax_df.quantile(self.quantiles[1])

    cummin_df = post_signal_df.cummin(axis=1)
    cummin_mean = cummin_df.mean()
    cummin_median = cummin_df.median()
    cummin_lower = cummin_df.quantile(self.quantiles[0])
    cummin_upper = cummin_df.quantile(self.quantiles[1])

    signal_counts = post_signal_df.count()

    avg_max_uplift = np.mean(daily_max_uplifts) * 100
    median_max_uplift = np.median(daily_max_uplifts) * 100
    max_mean_val = mean_profile.max()
    max_mean_idx = mean_profile.idxmax()
    max_mean_pct = max_mean_val * 100
    final_mean = mean_profile.iloc[-1] * 100
    final_median = median_profile.iloc[-1] * 100
    n_signals = len(post_signal_changes)

    computed_metrics = {
        "quant": {
            "n_signals": n_signals,
            "final_mean": final_mean,
            "final_median": final_median,
            "avg_max_uplift": avg_max_uplift,
            "median_max_uplift": median_max_uplift,
            "max_mean_val": max_mean_val,
            "max_mean_idx": max_mean_idx,
            "max_mean_pct": max_mean_pct,
        },
        "series": {
            "mean_profile": mean_profile,
            "std_profile": std_profile,
            "median_profile": median_profile,
            "lower_quant": lower_quant,
            "upper_quant": upper_quant,
            "cummax_mean": cummax_mean,
            "cummax_median": cummax_median,
            "cummax_lower": cummax_lower,
            "cummax_upper": cummax_upper,
            "cummin_mean": cummin_mean,
            "cummin_median": cummin_median,
            "cummin_lower": cummin_lower,
            "cummin_upper": cummin_upper,
            "signal_counts": signal_counts,
        },
    }

    plots_context = {
        "pairs_analyzed": len(pairs),
        "total_signals": n_signals,
    }

    logger.info(f"Profile computed: {n_signals} signals, final mean: {final_mean:.2f}%, max: {max_mean_pct:.2f}%")

    return computed_metrics, plots_context

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> go.Figure

Generate visualization from computed metrics.

Source code in src/signalflow/analytic/signals/profile_metrics.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> go.Figure:
    """Generate visualization from computed metrics."""

    if computed_metrics is None:
        logger.error("No metrics available for plotting")
        return None

    fig = self._create_figure()

    self._add_mean_profile(fig, computed_metrics)
    self._add_std_bands(fig, computed_metrics)
    self._add_median_profile(fig, computed_metrics)
    self._add_percentile_bands(fig, computed_metrics)
    self._add_key_timepoints(fig, computed_metrics)
    self._add_max_mean_marker(fig, computed_metrics)

    self._add_cummax_profiles(fig, computed_metrics)
    self._add_cummin_profiles(fig, computed_metrics)
    self._add_cummax_percentiles(fig, computed_metrics)

    self._add_summary_annotation(fig, computed_metrics)
    self._add_profit_target_line(fig)
    self._update_layout(fig, computed_metrics, plots_context)

    return fig
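Usage mirrors the other signal metrics; look_ahead counts price rows after each signal (1440 ≈ one day at 1-minute resolution):

from signalflow.analytic.signals import SignalProfileMetric

profile = SignalProfileMetric(look_ahead=240, quantiles=(0.1, 0.9))
metrics, ctx = profile.compute(raw_data, signals)
if metrics is not None:
    q = metrics["quant"]
    print(f"{q['n_signals']} signals, final mean: {q['final_mean']:.2f}%, avg max uplift: {q['avg_max_uplift']:.2f}%")
    profile.plot(metrics, ctx, raw_data, signals).show()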

SignalDistributionMetric

signalflow.analytic.signals.distribution_metrics.SignalDistributionMetric dataclass

SignalDistributionMetric(n_bars: int = 10, rolling_window_minutes: int = 60, ma_window_hours: int = 12, chart_height: int = 1200, chart_width: int = 1400)

Bases: SignalMetric

Analyze signal distribution across pairs and time.

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Compute signal distribution metrics.

Source code in src/signalflow/analytic/signals/distribution_metrics.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Compute signal distribution metrics."""

    signals_df = signals.value

    signals_per_pair = (
        signals_df.filter(pl.col("signal") != 0)
        .group_by("pair")
        .agg(pl.count().alias("signal_count"))
        .sort("signal_count", descending=True)
    )

    if signals_per_pair.height == 0:
        logger.warning("No non-zero signals found")
        return None, {}

    signal_counts = signals_per_pair["signal_count"].to_numpy()
    min_count = int(signal_counts.min())
    max_count = int(signal_counts.max())
    mean_count = signal_counts.mean()
    median_count = np.median(signal_counts)
    n_pairs = len(signal_counts)

    if n_pairs <= 15:
        grouped_data = []
        for row in signals_per_pair.iter_rows(named=True):
            grouped_data.append(
                {
                    "category": row["pair"],
                    "num_columns": row["signal_count"],
                    "columns_in_group": row["pair"],
                }
            )
        bin_labels = [g["category"] for g in grouped_data]
        use_histogram = False
    else:
        actual_n_bars = min(self.n_bars, max(3, n_pairs // 5))

        if min_count == max_count:
            bin_edges = np.array([min_count - 0.5, max_count + 0.5])
            bin_labels = [f"{min_count}"]
        else:
            bin_edges = np.linspace(min_count, max_count, actual_n_bars + 1)
            bin_labels = []
            for i in range(actual_n_bars):
                lower = int(np.floor(bin_edges[i]))
                upper = int(np.ceil(bin_edges[i + 1]))
                label = f"{lower}" if lower == upper else f"{lower}-{upper}"
                bin_labels.append(label)

        binned = np.digitize(signal_counts, bin_edges[:-1]) - 1
        binned = np.clip(binned, 0, len(bin_labels) - 1)

        grouped_data = []
        for i, label in enumerate(bin_labels):
            mask = binned == i
            pairs_in_bin = signals_per_pair.filter(pl.Series(mask))["pair"].to_list()

            if pairs_in_bin:
                grouped_data.append(
                    {
                        "category": label,
                        "num_columns": len(pairs_in_bin),
                        "columns_in_group": "<br>".join(pairs_in_bin),
                    }
                )
        use_histogram = True

    signals_by_time = (
        signals_df.filter(pl.col("signal") != 0)
        .sort("timestamp")
        .group_by_dynamic("timestamp", every="1m")
        .agg(pl.count().alias("signal_count"))
        .sort("timestamp")
    )

    signals_rolling = signals_by_time.with_columns(
        pl.col("signal_count")
        .rolling_sum(
            window_size=self.rolling_window_minutes,
            min_samples=1,
            center=False,
        )
        .alias("rolling_sum")
    )

    ma_window_minutes = self.ma_window_hours * 60
    if signals_rolling.height > ma_window_minutes:
        signals_rolling = signals_rolling.with_columns(
            pl.col("rolling_sum")
            .rolling_mean(
                window_size=ma_window_minutes,
                min_samples=1,
                center=True,
            )
            .alias("ma")
        )
    else:
        signals_rolling = signals_rolling.with_columns(pl.lit(None).alias("ma"))

    mean_rolling = signals_rolling["rolling_sum"].mean()
    max_rolling = signals_rolling["rolling_sum"].max()

    computed_metrics = {
        "quant": {
            "mean_signals_per_pair": float(mean_count),
            "median_signals_per_pair": float(median_count),
            "min_signals_per_pair": min_count,
            "max_signals_per_pair": max_count,
            "total_pairs": n_pairs,
            "mean_rolling_signals": float(cast(float, mean_rolling)) if mean_rolling else 0.0,
            "max_rolling_signals": int(cast(int, max_rolling)) if max_rolling else 0,
        },
        "series": {
            "grouped": grouped_data,
            "signals_per_pair": signals_per_pair,
            "signals_rolling": signals_rolling,
        },
    }

    plots_context = {
        "bin_labels": bin_labels,
        "rolling_window": self.rolling_window_minutes,
        "ma_window": self.ma_window_hours,
        "use_histogram": use_histogram,
    }

    logger.info(
        f"Distribution computed: {n_pairs} pairs, "
        f"mean {mean_count:.1f} signals/pair, "
        f"max rolling {max_rolling!s} signals/{self.rolling_window_minutes}min"
    )

    return computed_metrics, plots_context

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> go.Figure

Generate distribution visualization.

Source code in src/signalflow/analytic/signals/distribution_metrics.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> go.Figure:
    """Generate distribution visualization."""

    if computed_metrics is None:
        logger.error("No metrics available for plotting")
        return None

    fig = self._create_figure(plots_context)

    self._add_histogram(fig, computed_metrics, plots_context)
    self._add_sorted_signals(fig, computed_metrics)
    self._add_rolling_signals(fig, computed_metrics, plots_context)
    self._update_layout(fig, plots_context)

    return fig
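A usage sketch:

from signalflow.analytic.signals import SignalDistributionMetric

dist = SignalDistributionMetric(rolling_window_minutes=60, ma_window_hours=12)
metrics, ctx = dist.compute(raw_data, signals)
if metrics is not None:
    q = metrics["quant"]
    print(f"{q['total_pairs']} pairs, mean {q['mean_signals_per_pair']:.1f} signals/pair")
    dist.plot(metrics, ctx, raw_data, signals).show()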

SignalCorrelationMetric

Analyzes correlation between signal strength and actual returns.

signalflow.analytic.signals.correlation_metrics.SignalCorrelationMetric dataclass

SignalCorrelationMetric(look_ahead_periods: list[int] = (lambda: [15, 60, 240, 1440])(), strength_col: str = 'strength', chart_height: int = 900, chart_width: int = 1400)

Bases: SignalMetric

Analyze correlation between signal strength and actual returns.

Computes Pearson and Spearman correlations for different look-ahead periods, and analyzes returns by signal strength quintiles.

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Compute signal-return correlations.

Source code in src/signalflow/analytic/signals/correlation_metrics.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Compute signal-return correlations."""
    if "spot" in raw_data:
        price_df = raw_data["spot"]
    elif "futures" in raw_data:
        price_df = raw_data["futures"]
    else:
        logger.error("No price data found in raw_data")
        return None, {}

    signals_df = signals.value
    active_signals = signals_df.filter(pl.col("signal") != 0)

    if active_signals.height == 0:
        logger.warning("No non-zero signals found for correlation analysis")
        return None, {}

    correlations = {}
    scatter_data = {}

    for period in self.look_ahead_periods:
        strengths, returns = self._calculate_signal_returns(
            signals_df=active_signals,
            price_df=price_df,
            look_ahead=period,
        )

        if len(strengths) > 2:
            corr, p_value = stats.pearsonr(strengths, returns)
            spearman_corr, spearman_p = stats.spearmanr(strengths, returns)

            correlations[f"period_{period}"] = {
                "pearson_corr": float(corr),
                "pearson_p_value": float(p_value),
                "spearman_corr": float(spearman_corr),
                "spearman_p_value": float(spearman_p),
                "n_samples": len(strengths),
            }

            scatter_data[f"period_{period}"] = {
                "strengths": strengths.tolist(),
                "returns": returns.tolist(),
            }

    quintile_returns = self._analyze_quintiles(active_signals, price_df)

    computed_metrics = {
        "quant": {
            "correlations": correlations,
            "quintile_analysis": quintile_returns,
            "total_signals": active_signals.height,
        },
        "series": {
            "scatter_data": scatter_data,
        },
    }

    logger.info(
        f"Correlation computed for {active_signals.height} signals across {len(self.look_ahead_periods)} periods"
    )

    return computed_metrics, {"look_ahead_periods": self.look_ahead_periods}

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> go.Figure

Generate correlation visualization.

Source code in src/signalflow/analytic/signals/correlation_metrics.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> go.Figure:
    """Generate correlation visualization."""
    if computed_metrics is None:
        logger.error("No metrics available for plotting")
        return None

    fig = make_subplots(
        rows=2,
        cols=2,
        subplot_titles=(
            "Correlation by Look-ahead Period",
            "Strength vs Return Scatter",
            "Returns by Strength Quintile",
            "Win Rate by Quintile",
        ),
        vertical_spacing=0.12,
        horizontal_spacing=0.1,
    )

    self._add_correlation_bars(fig, computed_metrics)
    self._add_scatter_plot(fig, computed_metrics, plots_context)
    self._add_quintile_returns(fig, computed_metrics)
    self._add_quintile_winrate(fig, computed_metrics)
    self._update_layout(fig)

    return fig

Parameters:

Parameter Type Default Description
look_ahead_periods list[int] [15, 60, 240, 1440] Minutes to look ahead for return calculation
strength_col str 'strength' Signals column holding signal strength

Output:

Key Description
correlations Pearson/Spearman correlations by period
quintile_analysis Performance by signal strength quintile
total_signals Number of signals analyzed

Visualization: Scatter plots of signal strength vs. returns, quintile performance bars.

SignalTimingMetric

Analyzes optimal hold time after signal entry.

signalflow.analytic.signals.correlation_metrics.SignalTimingMetric dataclass

SignalTimingMetric(max_look_ahead: int = 1440, sample_points: int = 48, chart_height: int = 800, chart_width: int = 1200)

Bases: SignalMetric

Analyze optimal holding period for signals.

Evaluates signal performance at different holding periods to find optimal exit timing based on mean return, Sharpe ratio, or win rate.

compute

compute(raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> tuple[dict[str, Any] | None, dict[str, Any]]

Compute optimal timing metrics.

Source code in src/signalflow/analytic/signals/correlation_metrics.py
def compute(
    self,
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Compute optimal timing metrics."""
    if "spot" in raw_data:
        price_df = raw_data["spot"]
    elif "futures" in raw_data:
        price_df = raw_data["futures"]
    else:
        logger.error("No price data found in raw_data")
        return None, {}

    signals_df = signals.value
    active_signals = signals_df.filter(pl.col("signal") != 0)

    if active_signals.height == 0:
        logger.warning("No non-zero signals found for timing analysis")
        return None, {}

    time_points = np.linspace(1, self.max_look_ahead, self.sample_points).astype(int)

    mean_returns = []
    sharpe_at_time = []
    win_rate_at_time = []
    std_returns = []

    for t in time_points:
        returns = self._get_returns_at_time(active_signals, price_df, t)

        if len(returns) > 0:
            mean_ret = np.mean(returns)
            std_ret = np.std(returns)
            sharpe = mean_ret / std_ret if std_ret > 0 else 0
            win_rate = np.mean(returns > 0)

            mean_returns.append(float(mean_ret))
            std_returns.append(float(std_ret))
            sharpe_at_time.append(float(sharpe))
            win_rate_at_time.append(float(win_rate))
        else:
            mean_returns.append(0.0)
            std_returns.append(0.0)
            sharpe_at_time.append(0.0)
            win_rate_at_time.append(0.0)

    optimal_time_mean = int(time_points[np.argmax(mean_returns)])
    optimal_time_sharpe = int(time_points[np.argmax(sharpe_at_time)])
    optimal_time_winrate = int(time_points[np.argmax(win_rate_at_time)])

    computed_metrics = {
        "quant": {
            "optimal_hold_time_mean": optimal_time_mean,
            "optimal_hold_time_sharpe": optimal_time_sharpe,
            "optimal_hold_time_winrate": optimal_time_winrate,
            "peak_mean_return": float(np.max(mean_returns)) * 100,
            "peak_sharpe": float(np.max(sharpe_at_time)),
            "peak_win_rate": float(np.max(win_rate_at_time)) * 100,
            "total_signals": active_signals.height,
        },
        "series": {
            "time_points": time_points.tolist(),
            "mean_returns": [r * 100 for r in mean_returns],
            "std_returns": [r * 100 for r in std_returns],
            "sharpe_at_time": sharpe_at_time,
            "win_rate_at_time": [r * 100 for r in win_rate_at_time],
        },
    }

    logger.info(
        f"Timing analysis: optimal hold time by mean={optimal_time_mean}min, "
        f"by Sharpe={optimal_time_sharpe}min, peak return={np.max(mean_returns) * 100:.2f}%"
    )

    return computed_metrics, {}

plot

plot(computed_metrics: dict[str, Any] | None, plots_context: dict[str, Any], raw_data: RawData, signals: Signals, labels: DataFrame | None = None) -> go.Figure

Generate timing optimization visualization.

Source code in src/signalflow/analytic/signals/correlation_metrics.py
def plot(
    self,
    computed_metrics: dict[str, Any] | None,
    plots_context: dict[str, Any],
    raw_data: RawData,
    signals: Signals,
    labels: pl.DataFrame | None = None,
) -> go.Figure:
    """Generate timing optimization visualization."""
    if computed_metrics is None:
        logger.error("No metrics available for plotting")
        return None

    fig = make_subplots(
        rows=3,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.08,
        subplot_titles=(
            "Mean Return Over Time",
            "Sharpe Ratio Over Time",
            "Win Rate Over Time",
        ),
        row_heights=[0.35, 0.35, 0.30],
    )

    series = computed_metrics["series"]
    quant = computed_metrics["quant"]
    time_points = series["time_points"]

    # Mean return with std band
    fig.add_trace(
        go.Scatter(
            x=time_points,
            y=series["mean_returns"],
            mode="lines",
            name="Mean Return",
            line=dict(color="#2171b5", width=2),
            fill="tozeroy",
            fillcolor="rgba(33, 113, 181, 0.1)",
        ),
        row=1,
        col=1,
    )

    fig.add_trace(
        go.Scatter(
            x=[quant["optimal_hold_time_mean"]],
            y=[quant["peak_mean_return"]],
            mode="markers+text",
            name="Optimal (Mean)",
            marker=dict(color="red", size=12, symbol="star"),
            text=[f"{quant['optimal_hold_time_mean']}min"],
            textposition="top center",
        ),
        row=1,
        col=1,
    )

    # Sharpe ratio
    fig.add_trace(
        go.Scatter(
            x=time_points,
            y=series["sharpe_at_time"],
            mode="lines",
            name="Sharpe Ratio",
            line=dict(color="#31a354", width=2),
        ),
        row=2,
        col=1,
    )

    fig.add_trace(
        go.Scatter(
            x=[quant["optimal_hold_time_sharpe"]],
            y=[quant["peak_sharpe"]],
            mode="markers+text",
            name="Optimal (Sharpe)",
            marker=dict(color="red", size=12, symbol="star"),
            text=[f"{quant['optimal_hold_time_sharpe']}min"],
            textposition="top center",
        ),
        row=2,
        col=1,
    )

    # Win rate
    fig.add_trace(
        go.Scatter(
            x=time_points,
            y=series["win_rate_at_time"],
            mode="lines",
            name="Win Rate",
            line=dict(color="#756bb1", width=2),
        ),
        row=3,
        col=1,
    )

    fig.add_hline(y=50, line_dash="dash", line_color="gray", row=3, col=1)

    fig.add_trace(
        go.Scatter(
            x=[quant["optimal_hold_time_winrate"]],
            y=[quant["peak_win_rate"]],
            mode="markers+text",
            name="Optimal (Win Rate)",
            marker=dict(color="red", size=12, symbol="star"),
            text=[f"{quant['optimal_hold_time_winrate']}min"],
            textposition="top center",
        ),
        row=3,
        col=1,
    )

    fig.update_layout(
        title=dict(
            text=f"<b>Signal Timing Analysis</b><br>"
            f"<sub>Optimal hold: {quant['optimal_hold_time_mean']}min (mean), "
            f"{quant['optimal_hold_time_sharpe']}min (Sharpe)</sub>",
            font=dict(color="#333333", size=18),
            x=0.5,
            xanchor="center",
        ),
        height=self.chart_height,
        width=self.chart_width,
        template="plotly_white",
        showlegend=True,
        hovermode="x unified",
        paper_bgcolor="#fafafa",
        plot_bgcolor="#ffffff",
    )

    fig.update_yaxes(title_text="Return (%)", row=1, col=1)
    fig.update_yaxes(title_text="Sharpe", row=2, col=1)
    fig.update_yaxes(title_text="Win Rate (%)", row=3, col=1)
    fig.update_xaxes(title_text="Hold Time (minutes)", row=3, col=1)

    return fig

Parameters:

Parameter Type Default Description
max_look_ahead int 1440 Maximum minutes to analyze
sample_points int 48 Number of time points to sample

Output:

Key Description
optimal_hold_time_mean Time with highest mean return
optimal_hold_time_sharpe Time with highest Sharpe ratio
peak_mean_return Maximum mean return achieved
series Time series data for plotting

Visualization: Line charts showing mean return, Sharpe ratio, and win rate over time.

Signal Metrics Usage

from signalflow.analytic.signals import (
    SignalCorrelationMetric,
    SignalTimingMetric,
    SignalClassificationMetric,
)

# Create metrics
correlation = SignalCorrelationMetric(
    look_ahead_periods=[15, 30, 60],
)
timing = SignalTimingMetric(
    max_look_ahead=120,
    sample_points=12,
)

# Compute metrics
corr_result, corr_ctx = correlation.compute(raw_data, signals)
timing_result, timing_ctx = timing.compute(raw_data, signals)

# Generate visualizations
fig_corr = correlation.plot(corr_result, corr_ctx, raw_data, signals)
fig_timing = timing.plot(timing_result, timing_ctx, raw_data, signals)

# Access quantitative results
print(f"30min correlation: {corr_result['quant']['correlations']['period_30']['pearson_corr']:.3f}")
print(f"Optimal hold time: {timing_result['quant']['optimal_hold_time_mean']} minutes")

Result Visualization

Classes for visualizing backtest results.

StrategyMainResult

Main dashboard with equity curve, trades, and key metrics.

signalflow.analytic.strategy.result_metrics.StrategyMainResult dataclass

StrategyMainResult()

Bases: StrategyMetric

Strategy-level visualization based on results['metrics_df'] (Polars DataFrame).

compute

compute(state: StrategyState, prices: dict[str, float], **kwargs: Any) -> dict[str, float]

Compute metric values.

Source code in src/signalflow/analytic/strategy/result_metrics.py
def compute(
    self,
    state: StrategyState,
    prices: dict[str, float],
    **kwargs: Any,
) -> dict[str, float]:
    """Compute metric values."""
    return {}

StrategyPairResult

Per-pair performance breakdown.

signalflow.analytic.strategy.result_metrics.StrategyPairResult dataclass

StrategyPairResult(pairs: list[str] = list(), price_col: str = 'close', ts_col: str = 'timestamp', pair_col: str = 'pair', trade_id_col: str = 'id', entry_ts_col: str = 'entry_ts', exit_ts_col: str = 'exit_ts', size_col: str = 'size', height: int = 760, template: str = 'plotly_white', hovermode: str = 'x unified')

Bases: StrategyMetric

Pair visualization with price line, entry/exit markers, and net position size.

StrategyDistributionResult

Returns distribution analysis with histogram and QQ plot.

signalflow.analytic.strategy.result_metrics.StrategyDistributionResult dataclass

StrategyDistributionResult()

Bases: StrategyMetric

Returns distribution and monthly heatmap visualization.

Features:

  • Returns histogram with normal distribution overlay
  • QQ plot for normality assessment
  • Monthly returns heatmap (Year × Month)
  • Distribution statistics (skew, kurtosis)

StrategyEquityResult

Equity curve with optional benchmark comparison.

signalflow.analytic.strategy.result_metrics.StrategyEquityResult dataclass

StrategyEquityResult(benchmark_returns: list[float] | None = None, benchmark_name: str = 'Benchmark')

Bases: StrategyMetric

Equity curve with optional benchmark comparison.

Features:

  • Strategy equity curve
  • Optional benchmark overlay
  • Drawdown highlighting
  • Performance statistics panel

Visualization Usage

from signalflow.analytic.strategy import (
    StrategyMainResult,
    StrategyDistributionResult,
    StrategyEquityResult,
)

# After running backtest
state = runner.run(raw_data, signals)

# Main dashboard
main_viz = StrategyMainResult()
fig_main = main_viz.plot(state, raw_data)
fig_main.show()

# Distribution analysis
dist_viz = StrategyDistributionResult()
fig_dist = dist_viz.plot(state, raw_data)
fig_dist.show()

# Equity with optional benchmark overlay
equity_viz = StrategyEquityResult(
    benchmark_returns=benchmark_returns,  # list[float] of benchmark per-period returns
    benchmark_name="Benchmark",
)
fig_equity = equity_viz.plot(state, raw_data)
fig_equity.show()

Statistical Validation

Tools for validating strategy robustness.

MonteCarloSimulator

signalflow.analytic.stats.MonteCarloSimulator dataclass

MonteCarloSimulator(n_simulations: int = 10000, random_seed: int | None = None, confidence_levels: tuple[float, ...] = (0.05, 0.5, 0.95), ruin_threshold: float = 0.2)

Bases: StatisticalValidator

Monte Carlo simulation via trade shuffling.

Randomizes trade execution order to estimate distribution of outcomes under different trade sequences. This helps assess strategy robustness and estimate risk metrics like probability of ruin.

Attributes:

Name Type Description
n_simulations int Number of simulations to run (default: 10,000)
random_seed int | None Random seed for reproducibility (None for random)
confidence_levels tuple[float, ...] Percentile levels to compute (default: 5%, 50%, 95%)
ruin_threshold float Max drawdown threshold for risk of ruin (default: 20%)

Example

from signalflow.analytic.stats import MonteCarloSimulator

mc = MonteCarloSimulator(n_simulations=10_000, ruin_threshold=0.30)
mc_result = mc.validate(backtest_result)
print(mc_result.summary())
mc_result.plot()

Note

This simulation shuffles trade order but keeps trade PnLs unchanged. It answers: "What if these same trades occurred in a different order?"
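The core idea in a few lines (illustrative sketch; the library uses a Numba-accelerated kernel, see validate below):

import numpy as np


def shuffled_max_drawdown(pnls: np.ndarray, initial_capital: float, rng: np.random.Generator) -> float:
    """One simulation: replay the same trade PnLs in random order, return max drawdown."""
    equity = initial_capital + np.cumsum(rng.permutation(pnls))
    equity = np.concatenate(([initial_capital], equity))
    peak = np.maximum.accumulate(equity)
    return float((1.0 - equity / peak).max())


# pnls: per-trade PnL array, e.g. from extract_pnls(result.trades)
rng = np.random.default_rng(42)
drawdowns = np.array([shuffled_max_drawdown(pnls, 10_000.0, rng) for _ in range(1_000)])
risk_of_ruin = float(np.mean(drawdowns > 0.2))  # cf. ruin_threshold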

validate

validate(result: BacktestResult) -> MonteCarloResult

Run Monte Carlo simulation on backtest trades.

Parameters:

Name Type Description Default
result BacktestResult BacktestResult containing trades to simulate required

Returns:

Type Description
MonteCarloResult MonteCarloResult with simulation distributions and risk metrics

Raises:

Type Description
ValueError If no trades available for simulation

Source code in src/signalflow/analytic/stats/monte_carlo.py
def validate(self, result: BacktestResult) -> MonteCarloResult:
    """Run Monte Carlo simulation on backtest trades.

    Args:
        result: BacktestResult containing trades to simulate

    Returns:
        MonteCarloResult with simulation distributions and risk metrics

    Raises:
        ValueError: If no trades available for simulation
    """
    # Extract trade PnLs
    pnls = extract_pnls(result.trades)

    if len(pnls) == 0:
        raise ValueError("No trades available for Monte Carlo simulation")

    initial_capital = result.initial_capital
    if initial_capital <= 0:
        initial_capital = 10_000.0  # Default fallback

    seed = self.random_seed if self.random_seed is not None else 42

    # Run simulation using Numba-accelerated kernel
    (
        final_equities,
        max_drawdowns,
        max_consec_losses,
        longest_dd_durations,
    ) = simulate_equity_curves(
        pnls=pnls,
        initial_capital=initial_capital,
        n_simulations=self.n_simulations,
        seed=seed,
    )

    # Compute percentiles
    equity_percentiles = {p: float(np.percentile(final_equities, p * 100)) for p in self.confidence_levels}
    drawdown_percentiles = {p: float(np.percentile(max_drawdowns, p * 100)) for p in self.confidence_levels}

    # Risk of ruin: probability of hitting ruin threshold
    risk_of_ruin = float(np.mean(max_drawdowns > self.ruin_threshold))

    # Get original metrics from backtest
    original_final_equity = result.final_capital
    original_max_drawdown = result.metrics.get("max_drawdown", 0.0)

    return MonteCarloResult(
        n_simulations=self.n_simulations,
        final_equity_dist=final_equities,
        max_drawdown_dist=max_drawdowns,
        max_consecutive_losses_dist=max_consec_losses,
        longest_drawdown_duration_dist=longest_dd_durations,
        equity_percentiles=equity_percentiles,
        drawdown_percentiles=drawdown_percentiles,
        risk_of_ruin=risk_of_ruin,
        ruin_threshold=self.ruin_threshold,
        expected_max_drawdown=float(np.mean(max_drawdowns)),
        expected_worst_equity=float(np.percentile(final_equities, 5)),
        original_final_equity=original_final_equity,
        original_max_drawdown=original_max_drawdown,
    )

BootstrapValidator

signalflow.analytic.stats.BootstrapValidator dataclass

BootstrapValidator(n_bootstrap: int = 5000, method: Literal['bca', 'percentile', 'block'] = 'bca', block_size: int | None = None, confidence_level: float = 0.95, random_seed: int | None = None, metrics: tuple[str, ...] = ('sharpe_ratio', 'sortino_ratio', 'calmar_ratio', 'profit_factor', 'win_rate'))

Bases: StatisticalValidator

Bootstrap confidence interval estimation with BCa and block support.

Supports:

  • BCa (bias-corrected accelerated) bootstrap for general metrics
  • Percentile bootstrap for simple intervals
  • Block bootstrap for time-series data with autocorrelation
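The block variant resamples contiguous runs of returns to preserve autocorrelation. A circular block resample in a few lines (illustrative; the validator's internals may differ):

import numpy as np


def block_resample(returns: np.ndarray, block_size: int, rng: np.random.Generator) -> np.ndarray:
    """One circular block-bootstrap resample of `returns`."""
    n = len(returns)
    n_blocks = int(np.ceil(n / block_size))
    starts = rng.integers(0, n, size=n_blocks)
    idx = (starts[:, None] + np.arange(block_size)) % n  # blocks wrap around the series
    return returns[idx.ravel()][:n]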

Attributes:

Name Type Description
n_bootstrap int Number of bootstrap resamples (default: 5,000)
method Literal['bca', 'percentile', 'block'] Bootstrap method ("bca", "percentile", "block")
block_size int | None Block size for block bootstrap (auto if None)
confidence_level float Confidence level (default: 0.95)
random_seed int | None Random seed for reproducibility
metrics tuple[str, ...] Metrics to compute intervals for

Example

from signalflow.analytic.stats import BootstrapValidator

bootstrap = BootstrapValidator(
    n_bootstrap=5000,
    method="bca",
    metrics=("sharpe_ratio", "sortino_ratio", "profit_factor"),
)
result = bootstrap.validate(backtest_result)
print(result.intervals["sharpe_ratio"])

validate

validate(result: BacktestResult) -> BootstrapResult

Run bootstrap analysis on backtest result.

Parameters:

Name Type Description Default
result BacktestResult BacktestResult to analyze required

Returns:

Type Description
BootstrapResult BootstrapResult with confidence intervals for each metric

Source code in src/signalflow/analytic/stats/bootstrap.py
def validate(self, result: BacktestResult) -> BootstrapResult:
    """Run bootstrap analysis on backtest result.

    Args:
        result: BacktestResult to analyze

    Returns:
        BootstrapResult with confidence intervals for each metric
    """
    returns = extract_returns(result)
    pnls = extract_pnls(result.trades)

    seed = self.random_seed if self.random_seed is not None else 42

    intervals: dict[str, ConfidenceInterval] = {}
    distributions: dict[str, np.ndarray] = {}

    for metric in self.metrics:
        if metric not in METRIC_FUNCTIONS:
            continue

        # Compute bootstrap distribution
        dist = self._bootstrap_metric(metric, returns, pnls, seed)
        distributions[metric] = dist

        # Compute point estimate
        point_estimate = METRIC_FUNCTIONS[metric](returns, pnls)

        # Compute confidence interval
        if self.method == "bca":
            ci = self._bca_interval(dist, point_estimate, returns, pnls, metric)
        elif self.method == "block":
            ci = self._block_bootstrap_interval(metric, returns, pnls, seed)
        else:  # percentile
            ci = self._percentile_interval(dist, point_estimate, metric)

        intervals[metric] = ci

    return BootstrapResult(
        n_bootstrap=self.n_bootstrap,
        method=self.method,
        intervals=intervals,
        distributions=distributions,
        block_size=self.block_size,
    )

StatisticalTestsValidator

signalflow.analytic.stats.StatisticalTestsValidator dataclass

StatisticalTestsValidator(sr_benchmark: float = 0.0, confidence_level: float = 0.95, annualization_factor: float = np.sqrt(252))

Bases: StatisticalValidator

Statistical significance tests for trading performance.

Implements:

  • Probabilistic Sharpe Ratio (PSR): P(SR > benchmark | observed data)
  • Minimum Track Record Length (MinTRL): trades needed for significance

Based on Bailey & Lopez de Prado (2012): "The Sharpe Ratio Efficient Frontier"
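For reference, the PSR formula from that paper as a standalone sketch (the validator's internal implementation may differ in details such as annualization):

import numpy as np
from scipy import stats


def probabilistic_sharpe_ratio(returns: np.ndarray, sr_benchmark: float = 0.0) -> float:
    """P(true SR > benchmark) given the observed per-period return series."""
    n = len(returns)
    sr = np.mean(returns) / np.std(returns, ddof=1)
    skew = stats.skew(returns)
    kurt = stats.kurtosis(returns, fisher=False)  # Pearson (non-excess) kurtosis
    denom = np.sqrt(1.0 - skew * sr + (kurt - 1.0) / 4.0 * sr**2)
    return float(stats.norm.cdf((sr - sr_benchmark) * np.sqrt(n - 1) / denom))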

Attributes:

Name Type Description
sr_benchmark float Benchmark Sharpe ratio to compare against (default: 0)
confidence_level float Required confidence level (default: 0.95)
annualization_factor float Factor to annualize Sharpe ratio (default: sqrt(252))

Example

from signalflow.analytic.stats import StatisticalTestsValidator

tests = StatisticalTestsValidator(
    sr_benchmark=0.5,  # compare against an SR of 0.5
    confidence_level=0.95,
)
result = tests.validate(backtest_result)
print(f"PSR: {result.psr:.2%}")
print(f"Min trades needed: {result.min_track_record_length}")

validate

validate(result: BacktestResult) -> StatisticalTestResult

Run statistical significance tests.

Parameters:

Name Type Description Default
result BacktestResult BacktestResult to analyze required

Returns:

Type Description
StatisticalTestResult StatisticalTestResult with PSR and MinTRL values

Source code in src/signalflow/analytic/stats/statistical_tests.py
def validate(self, result: BacktestResult) -> StatisticalTestResult:
    """Run statistical significance tests.

    Args:
        result: BacktestResult to analyze

    Returns:
        StatisticalTestResult with PSR and MinTRL values
    """
    returns = extract_returns(result)
    n = len(returns)

    if n < 2:
        return StatisticalTestResult(
            psr=None,
            psr_benchmark=self.sr_benchmark,
            psr_is_significant=False,
            min_track_record_length=None,
            current_track_record=n,
            track_record_sufficient=False,
        )

    # Compute observed Sharpe ratio
    sr_observed = self._compute_sharpe(returns)

    # Probabilistic Sharpe Ratio
    psr = self._probabilistic_sharpe_ratio(returns, sr_observed)

    # Minimum Track Record Length
    min_trl = self._minimum_track_record_length(returns, sr_observed)

    # Check significance
    psr_significant = psr > self.confidence_level if psr is not None else False
    track_record_sufficient = n >= min_trl if min_trl is not None else False

    return StatisticalTestResult(
        psr=psr,
        psr_benchmark=self.sr_benchmark,
        psr_is_significant=psr_significant,
        min_track_record_length=min_trl,
        current_track_record=n,
        track_record_sufficient=track_record_sufficient,
    )

Convenience Functions

from signalflow.analytic import (
    monte_carlo,
    bootstrap,
    statistical_tests,
    plot_monte_carlo,
    plot_bootstrap,
    plot_validation_summary,
)

# Monte Carlo simulation
mc_result = monte_carlo(returns, n_simulations=1000)
fig_mc = plot_monte_carlo(mc_result)

# Bootstrap analysis
bs_result = bootstrap(returns, n_bootstrap=1000)
fig_bs = plot_bootstrap(bs_result)

# Statistical tests
test_result = statistical_tests(returns)
fig_summary = plot_validation_summary(test_result)

Metrics Summary Table

Strategy Metrics

Metric Output Keys Description
TotalReturnMetric final_return Total portfolio return
DrawdownMetric max_drawdown, current_drawdown Drawdown tracking
WinRateMetric win_rate, total_trades Win percentage
SharpeRatioMetric sharpe_ratio Risk-adjusted return
SortinoRatioMetric sortino_ratio Downside-adjusted return
CalmarRatioMetric calmar_ratio, annualized_return Return / max drawdown
ProfitFactorMetric profit_factor, gross_profit, gross_loss Profit vs loss ratio
AverageTradeMetric avg_profit, avg_loss, avg_trade, avg_duration_minutes Trade statistics
ExpectancyMetric expectancy, expectancy_ratio Expected profit per trade
RiskRewardMetric risk_reward_ratio, payoff_ratio Avg win / avg loss
MaxConsecutiveMetric max_consecutive_wins, max_consecutive_losses Streak tracking

Signal Metrics

Metric Key Features
SignalClassificationMetric Precision, recall, F1, ROC/AUC against labels
SignalProfileMetric Post-signal price behavior profiles
SignalDistributionMetric Signal distribution across pairs and time
SignalCorrelationMetric Strength vs. returns correlation
SignalTimingMetric Optimal hold time analysis

See Also