Models Module

signalflow.models is the pinned-inference delivery layer: declarative, versioned references to forecast-model artefacts plus lazy resolution of their weights.

Forecast models are trained elsewhere and arrive in the trading pipeline as versioned, reproducible artefacts. This package keeps the trading pipeline decoupled from training — a reference carries no weights, only enough metadata to resolve the artefact later.

Lazy by design

Importing signalflow.models does not require mlflow. Weights load only on an explicit resolve / get call.


Overview

Component                  Role
ModelRef                   Declarative, versioned pointer to an artefact (frozen, hashable).
Resolver (Protocol)        Port: turn a ModelRef into a loaded model.
MlflowResolver             MLflow-backed Resolver (lazy mlflow import).
ModelRegistry (Protocol)   Port: fetch loaded models by ModelRef.
CachingModelRegistry       In-process registry that resolves once and caches.

from signalflow.models import (
    ModelRef,
    Resolver,
    MlflowResolver,
    ModelRegistry,
    CachingModelRegistry,
)

ModelRef

A ModelRef is a frozen dataclass: ModelRef(name, version, source="mlflow").

  • name: registered model name (non-empty).
  • version: mandatory. Usually a numeric string or int ("3").
  • source: backing registry, one of {"mlflow", "hf"} (default "mlflow").
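
Because a reference is frozen and hashable, it can be used directly as a dictionary key. The sketch below uses a hypothetical stand-in dataclass named Ref (not the real ModelRef) and assumes default dataclass equality with no normalisation of version. Under that assumption an int version and its string form are distinct keys, so it may be worth settling on one form per model.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Union


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for ModelRef (fields only, no validation)."""

    name: str
    version: Union[str, int]
    source: str = "mlflow"


# Equal field values give equal, equally hashed keys.
cache = {Ref("revert", "3"): "weights-A"}
same_ref = Ref("revert", "3")
int_ref = Ref("revert", 3)  # int version: a different key in this sketch


def is_mutable(ref: Ref) -> bool:
    """Return True if assigning to a field succeeds (it should not)."""
    try:
        ref.name = "other"  # type: ignore[misc]
    except FrozenInstanceError:
        return False
    return True
```

Here cache[same_ref] finds the stored entry, while int_ref is treated as a separate, uncached reference.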

Why version is mandatory

A floating version="latest" silently breaks parity and reproducibility between training and live inference, so it is rejected unless the environment variable SF_ALLOW_LATEST=1 is set (dev opt-in only).
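
The guard can be illustrated without the package installed. This is a minimal sketch, not the library code: PinnedRef, latest_allowed and try_build are hypothetical names, and the exact-match check against "1" is an assumption about how the opt-in is read.

```python
import os
from dataclasses import dataclass
from typing import Union

ALLOW_LATEST_ENV = "SF_ALLOW_LATEST"  # env var name taken from the text above


def latest_allowed() -> bool:
    # Assumption: the opt-in is an exact "1"; the real check may differ.
    return os.environ.get(ALLOW_LATEST_ENV) == "1"


@dataclass(frozen=True)
class PinnedRef:
    """Hypothetical stand-in for ModelRef, reduced to the version guard."""

    name: str
    version: Union[str, int]

    def __post_init__(self) -> None:
        version_str = str(self.version).strip()
        if not version_str:
            raise ValueError("version is mandatory and must be non-empty")
        if version_str.lower() == "latest" and not latest_allowed():
            raise ValueError("version='latest' is forbidden outside dev mode")


def try_build(version: Union[str, int]) -> str:
    """Return 'ok' if the ref can be constructed, else 'rejected'."""
    try:
        PinnedRef("revert", version)
    except ValueError:
        return "rejected"
    return "ok"
```

With the variable unset, try_build("3") succeeds and try_build("latest") is rejected; the floating version is accepted only after the variable is set to 1.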

Parsing

ModelRef.parse(spec, *, source="mlflow") accepts two compact forms:

from signalflow.models import ModelRef

ModelRef.parse("models:/revert/3")   # -> ModelRef(name='revert', version='3', source='mlflow')
ModelRef.parse("revert@3")           # -> ModelRef(name='revert', version='3', source='mlflow')
ModelRef.parse("revert@3", source="hf")  # at-spec uses the given source
  • models:/<name>/<version> always forces source="mlflow".
  • <name>@<version> uses the source argument.
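
The two forms and their failure cases can be exercised in isolation. The function below is a simplified sketch of the parsing rules described above, returning a plain tuple instead of a ModelRef; parse_spec and is_valid are hypothetical names used only for illustration.

```python
def parse_spec(spec: str, *, source: str = "mlflow") -> tuple[str, str, str]:
    """Return (name, version, source) for the two documented spec forms."""
    text = spec.strip()

    if text.startswith("models:/"):
        parts = text[len("models:/"):].split("/")
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"invalid MLflow model URI {spec!r}")
        # The URI form always pins the source to "mlflow".
        return parts[0], parts[1], "mlflow"

    name, sep, version = text.partition("@")
    if not sep or not name or not version:
        raise ValueError(f"unrecognized model spec {spec!r}")
    # The at-spec form keeps whatever source the caller passed.
    return name, version, source


def is_valid(spec: str) -> bool:
    """Return True if the spec parses, False if it raises ValueError."""
    try:
        parse_spec(spec)
    except ValueError:
        return False
    return True
```

Specs such as "revert", "models:/revert", "@3" and "revert@" are all rejected, since each lacks either a name or a version.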

URI

ModelRef(name="revert", version="3").uri   # "models:/revert/3"

Resolver

Resolver is a runtime_checkable Protocol with one method:

def resolve(self, ref: ModelRef) -> Any: ...

MlflowResolver(tracking_uri=None) is the MLflow-backed implementation. Loading is fully lazy: mlflow is imported only once resolve is called (the import lives in _load), and the underlying loader is isolated in _load so tests can override it without a real MLflow server. If tracking_uri is set, it is applied at load time rather than at construction; the _load shown in the API reference sets it on each call. resolve raises ValueError if ref.source != "mlflow".

from signalflow.models import ModelRef, MlflowResolver

resolver = MlflowResolver(tracking_uri="http://mlflow:5000")
model = resolver.resolve(ModelRef.parse("models:/revert/3"))   # loads weights here
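
For tests, the resolve/_load split means only _load needs replacing. The following is a self-contained sketch of that pattern under stated assumptions: Ref, ResolverPort and SketchMlflowResolver are hypothetical stand-ins that mirror the documented behaviour, so the example runs without signalflow or mlflow installed.

```python
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for ModelRef."""

    name: str
    version: str
    source: str = "mlflow"

    @property
    def uri(self) -> str:
        return f"models:/{self.name}/{self.version}"


@runtime_checkable
class ResolverPort(Protocol):
    """Stand-in for the Resolver protocol: one resolve method."""

    def resolve(self, ref: Ref) -> Any: ...


class SketchMlflowResolver:
    """Stand-in mirroring the documented resolve/_load split."""

    def resolve(self, ref: Ref) -> Any:
        if ref.source != "mlflow":
            raise ValueError(f"cannot resolve source={ref.source!r}")
        return self._load(ref.uri)

    def _load(self, uri: str) -> Any:
        raise RuntimeError("the real loader would import mlflow here")


class FakeResolver(SketchMlflowResolver):
    """Test double: records requested URIs and returns a placeholder."""

    def __init__(self) -> None:
        self.loaded: list[str] = []

    def _load(self, uri: str) -> Any:
        self.loaded.append(uri)
        return {"uri": uri}


fake = FakeResolver()
model = fake.resolve(Ref("revert", "3"))
```

The source check in resolve still runs against the fake, so a test can cover both the happy path and the ValueError for a non-mlflow ref.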

ModelRegistry

ModelRegistry is the consumer-facing Protocol:

def get(self, ref: ModelRef) -> Any: ...   # resolve lazily if needed
def has(self, ref: ModelRef) -> bool: ...  # already cached?

CachingModelRegistry(resolver) is a simple, lazy, in-process implementation. It holds a Resolver and a cache keyed by ModelRef (frozen → hashable). The first get for a ref triggers resolution; subsequent calls return the cached artefact without re-resolving. has never triggers resolution.

from signalflow.models import ModelRef, MlflowResolver, CachingModelRegistry

registry = CachingModelRegistry(MlflowResolver())
ref = ModelRef.parse("models:/revert/3")

registry.has(ref)   # False
model = registry.get(ref)   # cache miss -> resolves and caches
registry.has(ref)   # True
model is registry.get(ref)  # True (served from cache)
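
The resolve-once behaviour can be checked with a counting resolver. This is a minimal sketch using hypothetical stand-ins (Ref, CountingResolver, SketchRegistry) rather than the real classes, so it runs without signalflow installed.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for ModelRef."""

    name: str
    version: str


class CountingResolver:
    """Resolver double that counts how often resolve is called."""

    def __init__(self) -> None:
        self.calls = 0

    def resolve(self, ref: Ref) -> Any:
        self.calls += 1
        return object()  # placeholder for loaded weights


class SketchRegistry:
    """Stand-in for a caching registry keyed by Ref."""

    def __init__(self, resolver: CountingResolver) -> None:
        self._resolver = resolver
        self._cache: dict[Ref, Any] = {}

    def get(self, ref: Ref) -> Any:
        if ref not in self._cache:
            self._cache[ref] = self._resolver.resolve(ref)
        return self._cache[ref]

    def has(self, ref: Ref) -> bool:
        return ref in self._cache


resolver = CountingResolver()
registry = SketchRegistry(resolver)
ref = Ref("revert", "3")

before = registry.has(ref)                 # has does not trigger resolution
first = registry.get(ref)                  # cache miss: resolves once
second = registry.get(Ref("revert", "3"))  # equal ref, separate instance
```

The second get uses a freshly constructed but equal reference and still hits the cache, because lookup depends on field equality rather than object identity.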

See Also

  • Model Integration guide: registering forecast artefacts in a flow via .forecast() and consuming them with forecasts= / forecast_window=.
  • Feature API: FeatureSpec / ModelFeaturesPipeline and the feature_hash drift detector that guards train↔serve reproducibility.

API Reference

signalflow.models.model_ref.ModelRef dataclass

ModelRef(name: str, version: str | int, source: str = 'mlflow')

A pinned, versioned reference to a model artifact.

Attributes:

Name      Type        Description
name      str         Registered model name (e.g. "revert"). Non-empty.
version   str | int   Mandatory version. Usually a numeric string/int ("3"), but may be "latest" only in dev mode (see module docstring).
source    str         Backing registry, one of {"mlflow", "hf"}. Default "mlflow".

Construction is cheap and never touches the network or weights; use a Resolver (signalflow.models.resolver.Resolver) to actually load the model.

Example

>>> ModelRef.parse("models:/revert/3")
ModelRef(name='revert', version='3', source='mlflow')
>>> ModelRef.parse("revert@3")
ModelRef(name='revert', version='3', source='mlflow')

name instance-attribute

name: str

source class-attribute instance-attribute

source: str = 'mlflow'

uri property

uri: str

MLflow-style URI for this ref: models:/<name>/<version>.

version instance-attribute

version: str | int

__post_init__

__post_init__() -> None
Source code in src/signalflow/models/model_ref.py
def __post_init__(self) -> None:
    if not isinstance(self.name, str) or not self.name.strip():
        raise ValueError("ModelRef.name must be a non-empty string")

    version_str = str(self.version).strip()
    if not version_str:
        raise ValueError("ModelRef.version is mandatory and must be non-empty")

    if self.source not in _VALID_SOURCES:
        raise ValueError(f"ModelRef.source must be one of {sorted(_VALID_SOURCES)}, got {self.source!r}")

    if version_str.lower() == "latest" and not _latest_allowed():
        raise ValueError(
            "ModelRef.version='latest' is forbidden: a floating version breaks "
            "parity/reproducibility between training and live inference. "
            f"Pin an explicit version, or set {ALLOW_LATEST_ENV}=1 for dev only."
        )

parse classmethod

parse(spec: str, *, source: str = 'mlflow') -> ModelRef

Parse a ModelRef from a compact string spec.

Supported forms
  • MLflow URI: "models:/<name>/<version>" (forces source="mlflow").
  • At-spec: "<name>@<version>" (uses the source argument).

Parameters:

Name     Type   Description                                          Default
spec     str    The string to parse.                                 required
source   str    Source to use for non-URI specs. Default "mlflow".   'mlflow'

Returns:

Type       Description
ModelRef   A validated ModelRef.

Raises:

Type         Description
ValueError   If the spec is malformed or violates ModelRef invariants.

Source code in src/signalflow/models/model_ref.py
@classmethod
def parse(cls, spec: str, *, source: str = "mlflow") -> ModelRef:
    """Parse a ModelRef from a compact string spec.

    Supported forms:
        - MLflow URI: ``"models:/<name>/<version>"`` (forces ``source="mlflow"``).
        - At-spec: ``"<name>@<version>"`` (uses the ``source`` argument).

    Args:
        spec: The string to parse.
        source: Source to use for non-URI specs. Default ``"mlflow"``.

    Returns:
        A validated ModelRef.

    Raises:
        ValueError: If the spec is malformed or violates ModelRef invariants.
    """
    if not isinstance(spec, str) or not spec.strip():
        raise ValueError("ModelRef spec must be a non-empty string")

    text = spec.strip()

    if text.startswith("models:/"):
        rest = text[len("models:/") :]
        parts = rest.split("/")
        if len(parts) != 2 or not parts[0] or not parts[1]:
            raise ValueError(f"invalid MLflow model URI {spec!r}; expected 'models:/<name>/<version>'")
        return cls(name=parts[0], version=parts[1], source="mlflow")

    if "@" in text:
        name, _, version = text.partition("@")
        if not name or not version:
            raise ValueError(f"invalid model spec {spec!r}; expected '<name>@<version>'")
        return cls(name=name, version=version, source=source)

    raise ValueError(f"unrecognized model spec {spec!r}; expected 'models:/<name>/<version>' or '<name>@<version>'")

signalflow.models.resolver.Resolver

Bases: Protocol

Port: resolve a ModelRef into a loaded, ready-to-use model object.

resolve

resolve(ref: ModelRef) -> Any

Load and return the artifact referenced by ref.

Source code in src/signalflow/models/resolver.py
def resolve(self, ref: ModelRef) -> Any:
    """Load and return the artifact referenced by ``ref``."""
    ...

signalflow.models.resolver.MlflowResolver

MlflowResolver(tracking_uri: str | None = None)

Resolver backed by the MLflow Model Registry.

Loading is fully lazy: mlflow is imported only when resolve is called, and the underlying loader is isolated in _load so tests can override it without a real MLflow server.

Parameters:

Name           Type         Description                                                         Default
tracking_uri   str | None   Optional MLflow tracking URI. If set, applied when resolve is called.   None

Source code in src/signalflow/models/resolver.py
def __init__(self, tracking_uri: str | None = None) -> None:
    self.tracking_uri = tracking_uri

tracking_uri instance-attribute

tracking_uri = tracking_uri

_load

_load(uri: str) -> Any

Load a model from an MLflow models:/ URI (lazy mlflow import).

Overridable for testing. Uses the generic pyfunc loader so any model flavor registered under the URI can be loaded.

Source code in src/signalflow/models/resolver.py
def _load(self, uri: str) -> Any:
    """Load a model from an MLflow ``models:/`` URI (lazy mlflow import).

    Overridable for testing. Uses the generic pyfunc loader so any model
    flavor registered under the URI can be loaded.
    """
    import mlflow.pyfunc  # type: ignore[import-not-found]

    if self.tracking_uri is not None:
        import mlflow  # type: ignore[import-not-found]

        mlflow.set_tracking_uri(self.tracking_uri)
    return mlflow.pyfunc.load_model(uri)

resolve

resolve(ref: ModelRef) -> Any

Resolve ref to a loaded MLflow model.

Builds the URI models:/{ref.name}/{ref.version} and delegates the actual load to _load.

Raises:

Type         Description
ValueError   If ref.source is not "mlflow".

Source code in src/signalflow/models/resolver.py
def resolve(self, ref: ModelRef) -> Any:
    """Resolve ``ref`` to a loaded MLflow model.

    Builds the URI ``models:/{ref.name}/{ref.version}`` and delegates the
    actual load to :meth:`_load`.

    Raises:
        ValueError: If ``ref.source`` is not ``"mlflow"``.
    """
    if ref.source != "mlflow":
        raise ValueError(f"MlflowResolver cannot resolve source={ref.source!r}; expected 'mlflow'")
    uri = ref.uri
    logger.debug(f"MlflowResolver: resolving {uri}")
    return self._load(uri)

signalflow.models.registry.ModelRegistry

Bases: Protocol

Port: fetch loaded models by ModelRef.

get

get(ref: ModelRef) -> Any

Return the loaded model for ref (resolving lazily if needed).

Source code in src/signalflow/models/registry.py
def get(self, ref: ModelRef) -> Any:
    """Return the loaded model for ``ref`` (resolving lazily if needed)."""
    ...

has

has(ref: ModelRef) -> bool

Return True if ref is already loaded/cached.

Source code in src/signalflow/models/registry.py
def has(self, ref: ModelRef) -> bool:
    """Return True if ``ref`` is already loaded/cached."""
    ...

signalflow.models.registry.CachingModelRegistry

CachingModelRegistry(resolver: Resolver)

In-process registry that lazily resolves and caches models by ModelRef.

Holds a Resolver and a cache keyed by ModelRef (which is frozen and hashable). The first get for a ref triggers resolution; subsequent calls return the cached artifact without re-resolving.

Parameters:

Name       Type       Description                                Default
resolver   Resolver   The Resolver used to load uncached refs.   required

Source code in src/signalflow/models/registry.py
def __init__(self, resolver: Resolver) -> None:
    self._resolver = resolver
    self._cache: dict[ModelRef, Any] = {}

_cache instance-attribute

_cache: dict[ModelRef, Any] = {}

_resolver instance-attribute

_resolver = resolver

get

get(ref: ModelRef) -> Any

Return the model for ref, resolving and caching on first access.

Source code in src/signalflow/models/registry.py
def get(self, ref: ModelRef) -> Any:
    """Return the model for ``ref``, resolving and caching on first access."""
    if ref in self._cache:
        return self._cache[ref]
    logger.debug(f"CachingModelRegistry: cache miss for {ref.uri}, resolving")
    model = self._resolver.resolve(ref)
    self._cache[ref] = model
    return model

has

has(ref: ModelRef) -> bool

Return True if ref is already cached (no resolution triggered).

Source code in src/signalflow/models/registry.py
def has(self, ref: ModelRef) -> bool:
    """Return True if ``ref`` is already cached (no resolution triggered)."""
    return ref in self._cache