Source code for mlflow.genai.scorers.registry

"""
Registered scorer functionality for MLflow GenAI.

This module provides functions to manage registered scorers that automatically
evaluate traces in MLflow experiments.
"""

import json
import warnings
from abc import ABCMeta, abstractmethod

from mlflow.exceptions import MlflowException
from mlflow.genai.scheduled_scorers import ScorerScheduleConfig
from mlflow.genai.scorers.base import Scorer, ScorerSamplingConfig
from mlflow.tracking._tracking_service.utils import _get_store
from mlflow.tracking.fluent import _get_experiment_id
from mlflow.utils.annotations import experimental
from mlflow.utils.plugins import get_entry_points
from mlflow.utils.uri import get_uri_scheme


class UnsupportedScorerStoreURIException(MlflowException):
    """Exception thrown when building a scorer store with an unsupported URI"""

    def __init__(self, unsupported_uri, supported_uri_schemes):
        message = (
            f"Scorer registration functionality is unavailable; got unsupported URI"
            f" '{unsupported_uri}' for scorer data storage. Supported URI schemes are:"
            f" {supported_uri_schemes}."
        )
        super().__init__(message)
        self.supported_uri_schemes = supported_uri_schemes


class AbstractScorerStore(metaclass=ABCMeta):
    """
    Abstract class defining the interface for scorer store implementations.

    This class defines the API interface for scorer operations that can be implemented
    by different backend stores (e.g., MLflow tracking store, Databricks API).
    """

    @abstractmethod
    def register_scorer(self, experiment_id: str, scorer: Scorer) -> int | None:
        """
        Register a scorer for an experiment.

        Args:
            experiment_id: The experiment ID.
            scorer: The scorer object.

        Returns:
            The registered scorer version. If versioning is not supported, return None.
        """

    @abstractmethod
    def list_scorers(self, experiment_id) -> list["Scorer"]:
        """
        List all scorers for an experiment.

        Args:
            experiment_id: The experiment ID.

        Returns:
            List of mlflow.genai.scorers.Scorer objects (latest version for each scorer name).
        """

    @abstractmethod
    def get_scorer(self, experiment_id, name, version=None) -> "Scorer":
        """
        Get a specific scorer for an experiment.

        Args:
            experiment_id: The experiment ID.
            name: The scorer name.
            version: The scorer version. If None, returns the scorer with maximum version.

        Returns:
            A list of tuple, each tuple contains `mlflow.genai.scorers.Scorer` object.

        Raises:
            MlflowException: If scorer is not found.
        """

    @abstractmethod
    def list_scorer_versions(self, experiment_id, name) -> list[tuple["Scorer", int]]:
        """
        List all versions of a specific scorer for an experiment.

        Args:
            experiment_id: The experiment ID.
            name: The scorer name.

        Returns:
            A list of tuple, each tuple contains `mlflow.genai.scorers.Scorer` object
            and the version number.

        Raises:
            MlflowException: If scorer is not found.
        """

    @abstractmethod
    def delete_scorer(self, experiment_id, name, version):
        """
        Delete a scorer by name and optional version.

        Args:
            experiment_id: The experiment ID.
            name: The scorer name.
            version: The scorer version to delete.

        Raises:
            MlflowException: If scorer is not found.
        """


class ScorerStoreRegistry:
    """
    Scheme-based registry for scorer store implementations.

    This class allows the registration of a function or class to provide an
    implementation for a given scheme of `store_uri` through the `register`
    methods. Implementations declared though the entrypoints
    `mlflow.scorer_store` group can be automatically registered through the
    `register_entrypoints` method.

    When instantiating a store through the `get_store` method, the scheme of
    the store URI provided (or inferred from environment) will be used to
    select which implementation to instantiate, which will be called with same
    arguments passed to the `get_store` method.
    """

    def __init__(self):
        self._registry = {}
        self.group_name = "mlflow.scorer_store"

    def register(self, scheme, store_builder):
        self._registry[scheme] = store_builder

    def register_entrypoints(self):
        """Register scorer stores provided by other packages"""
        for entrypoint in get_entry_points(self.group_name):
            try:
                self.register(entrypoint.name, entrypoint.load())
            except (AttributeError, ImportError) as exc:
                warnings.warn(
                    'Failure attempting to register scorer store for scheme "{}": {}'.format(
                        entrypoint.name, str(exc)
                    ),
                    stacklevel=2,
                )

    def get_store_builder(self, store_uri):
        """Get a store from the registry based on the scheme of store_uri

        Args:
            store_uri: The store URI. If None, it will be inferred from the environment. This
                URI is used to select which scorer store implementation to instantiate
                and is passed to the constructor of the implementation.

        Returns:
            A function that returns an instance of
            ``mlflow.genai.scorers.registry.AbstractScorerStore`` that fulfills the store
            URI requirements.
        """
        scheme = store_uri if store_uri == "databricks" else get_uri_scheme(store_uri)
        try:
            store_builder = self._registry[scheme]
        except KeyError:
            raise UnsupportedScorerStoreURIException(
                unsupported_uri=store_uri, supported_uri_schemes=list(self._registry.keys())
            )
        return store_builder

    def get_store(self, tracking_uri=None):
        from mlflow.tracking._tracking_service import utils

        resolved_store_uri = utils._resolve_tracking_uri(tracking_uri)
        builder = self.get_store_builder(resolved_store_uri)
        return builder(tracking_uri=resolved_store_uri)


class MlflowTrackingStore(AbstractScorerStore):
    """
    MLflow tracking store that provides scorer functionality through the tracking store.
    This store delegates all scorer operations to the underlying tracking store.
    """

    def __init__(self, tracking_uri=None):
        self._tracking_store = _get_store(tracking_uri)

    def register_scorer(self, experiment_id: str, scorer: Scorer) -> int | None:
        serialized_scorer = json.dumps(scorer.model_dump())
        return self._tracking_store.register_scorer(experiment_id, scorer.name, serialized_scorer)

    def list_scorers(self, experiment_id) -> list["Scorer"]:
        from mlflow.genai.scorers import Scorer

        experiment_id = experiment_id or _get_experiment_id()

        # Get ScorerVersion entities from tracking store
        scorer_versions = self._tracking_store.list_scorers(experiment_id)

        # Convert to Scorer objects
        scorers = []
        for scorer_version in scorer_versions:
            scorer = Scorer.model_validate(scorer_version.serialized_scorer)
            scorers.append(scorer)

        return scorers

    def get_scorer(self, experiment_id, name, version=None) -> "Scorer":
        from mlflow.genai.scorers import Scorer

        experiment_id = experiment_id or _get_experiment_id()

        # Get ScorerVersion entity from tracking store
        scorer_version = self._tracking_store.get_scorer(experiment_id, name, version)

        # Convert to Scorer object
        return Scorer.model_validate(scorer_version.serialized_scorer)

    def list_scorer_versions(self, experiment_id, name) -> list[tuple[Scorer, int]]:
        from mlflow.genai.scorers import Scorer

        experiment_id = experiment_id or _get_experiment_id()

        # Get ScorerVersion entities from tracking store
        scorer_versions = self._tracking_store.list_scorer_versions(experiment_id, name)

        # Convert to Scorer objects
        scorers = []
        for scorer_version in scorer_versions:
            scorer = Scorer.model_validate(scorer_version.serialized_scorer)
            version = scorer_version.scorer_version
            scorers.append((scorer, version))

        return scorers

    def delete_scorer(self, experiment_id, name, version):
        if version is None:
            raise MlflowException.invalid_parameter_value(
                "You must set `version` argument to either an integer or 'all'."
            )
        if version == "all":
            version = None

        experiment_id = experiment_id or _get_experiment_id()
        return self._tracking_store.delete_scorer(experiment_id, name, version)


class DatabricksStore(AbstractScorerStore):
    """
    Databricks store that provides scorer functionality through the Databricks API.
    This store delegates all scorer operations to the Databricks agents API.
    """

    def __init__(self, tracking_uri=None):
        pass

    @staticmethod
    def _scheduled_scorer_to_scorer(scheduled_scorer: ScorerScheduleConfig) -> Scorer:
        scorer = scheduled_scorer.scorer
        scorer._sampling_config = ScorerSamplingConfig(
            sample_rate=scheduled_scorer.sample_rate,
            filter_string=scheduled_scorer.filter_string,
        )
        return scorer

    # Private functions for internal use by Scorer methods
    @staticmethod
    def add_registered_scorer(
        *,
        name: str,
        scorer: Scorer,
        sample_rate: float,
        filter_string: str | None = None,
        experiment_id: str | None = None,
    ) -> Scorer:
        """Internal function to add a registered scorer."""
        try:
            from databricks.agents.scorers import add_scheduled_scorer
        except ImportError as e:
            raise ImportError(_ERROR_MSG) from e

        scheduled_scorer = add_scheduled_scorer(
            experiment_id=experiment_id,
            scheduled_scorer_name=name,
            scorer=scorer,
            sample_rate=sample_rate,
            filter_string=filter_string,
        )
        return DatabricksStore._scheduled_scorer_to_scorer(scheduled_scorer)

    @staticmethod
    def list_scheduled_scorers(experiment_id):
        try:
            from databricks.agents.scorers import list_scheduled_scorers
        except ImportError as e:
            raise ImportError(_ERROR_MSG) from e

        return list_scheduled_scorers(experiment_id=experiment_id)

    @staticmethod
    def get_scheduled_scorer(name, experiment_id):
        try:
            from databricks.agents.scorers import get_scheduled_scorer
        except ImportError as e:
            raise ImportError(_ERROR_MSG) from e

        return get_scheduled_scorer(
            scheduled_scorer_name=name,
            experiment_id=experiment_id,
        )

    @staticmethod
    def delete_scheduled_scorer(experiment_id, name):
        try:
            from databricks.agents.scorers import delete_scheduled_scorer
        except ImportError as e:
            raise ImportError(_ERROR_MSG) from e

        delete_scheduled_scorer(
            experiment_id=experiment_id,
            scheduled_scorer_name=name,
        )

    @staticmethod
    def update_registered_scorer(
        *,
        name: str,
        scorer: Scorer | None = None,
        sample_rate: float | None = None,
        filter_string: str | None = None,
        experiment_id: str | None = None,
    ) -> Scorer:
        """Internal function to update a registered scorer."""
        try:
            from databricks.agents.scorers import update_scheduled_scorer
        except ImportError as e:
            raise ImportError(_ERROR_MSG) from e

        scheduled_scorer = update_scheduled_scorer(
            experiment_id=experiment_id,
            scheduled_scorer_name=name,
            scorer=scorer,
            sample_rate=sample_rate,
            filter_string=filter_string,
        )
        return DatabricksStore._scheduled_scorer_to_scorer(scheduled_scorer)

    def register_scorer(self, experiment_id: str, scorer: Scorer) -> int | None:
        # Add the scorer to the server with sample_rate=0 (not actively sampling)
        DatabricksStore.add_registered_scorer(
            name=scorer.name,
            scorer=scorer,
            sample_rate=0.0,
            filter_string=None,
            experiment_id=experiment_id,
        )

        # Set the sampling config on the new instance
        scorer._sampling_config = ScorerSamplingConfig(sample_rate=0.0, filter_string=None)

        return None

    def list_scorers(self, experiment_id) -> list["Scorer"]:
        # Get scheduled scorers from the server
        scheduled_scorers = DatabricksStore.list_scheduled_scorers(experiment_id)

        # Convert to Scorer instances with registration info
        scorers = []
        for scheduled_scorer in scheduled_scorers:
            scorers.append(DatabricksStore._scheduled_scorer_to_scorer(scheduled_scorer))

        return scorers

    def get_scorer(self, experiment_id, name, version=None) -> "Scorer":
        if version is not None:
            raise MlflowException.invalid_parameter_value(
                "Databricks does not support getting a certain version scorer."
            )

        # Get the scheduled scorer from the server
        scheduled_scorer = DatabricksStore.get_scheduled_scorer(name, experiment_id)

        # Extract the scorer and set registration fields
        return DatabricksStore._scheduled_scorer_to_scorer(scheduled_scorer)

    def list_scorer_versions(self, experiment_id, name) -> list[tuple["Scorer", int]]:
        raise MlflowException("Scorer DatabricksStore does not support versioning.")

    def delete_scorer(self, experiment_id, name, version):
        if version is not None:
            raise MlflowException.invalid_parameter_value(
                "Databricks does not support deleting a certain version scorer."
            )

        DatabricksStore.delete_scheduled_scorer(experiment_id, name)


# Create the global scorer store registry instance
_scorer_store_registry = ScorerStoreRegistry()


def _register_scorer_stores():
    """Register the default scorer store implementations"""
    from mlflow.store.db.db_types import DATABASE_ENGINES

    # Register for database schemes (these will use MlflowTrackingStore)
    for scheme in DATABASE_ENGINES + ["http", "https"]:
        _scorer_store_registry.register(scheme, MlflowTrackingStore)

    # Register Databricks store
    _scorer_store_registry.register("databricks", DatabricksStore)

    # Register entrypoints for custom implementations
    _scorer_store_registry.register_entrypoints()


# Register the default stores
_register_scorer_stores()


def _get_scorer_store(tracking_uri=None):
    """Get a scorer store from the registry"""
    return _scorer_store_registry.get_store(tracking_uri)


_ERROR_MSG = (
    "The `databricks-agents` package is required to register scorers. "
    "Please install it with `pip install databricks-agents`."
)


[docs]@experimental(version="3.2.0") def list_scorers(*, experiment_id: str | None = None) -> list[Scorer]: """ List all registered scorers for an experiment. This function returns a list of Scorer instances with their current registration configuration, including sampling rate and filter criteria. Args: experiment_id: The ID of the MLflow experiment containing the scorers. If None, uses the currently active experiment. Returns: A list of Scorer objects with their current registration configuration. Example: .. code-block:: python from mlflow.genai.scorers import list_scorers # List all scorers in the current experiment scorers = list_scorers() # List all scorers in a specific experiment scorers = list_scorers(experiment_id="123") for scorer in scorers: print(f"Name: {scorer.name}") print(f"Sample rate: {scorer.sample_rate}") print(f"Filter: {scorer.filter_string}") """ store = _get_scorer_store() return store.list_scorers(experiment_id)
@experimental(version="3.2.0") def list_scorer_versions(*, name: str, experiment_id: str | None = None) -> list[Scorer, int]: """ List all versions of a specific scorer for an experiment. Args: name: The scorer name. experiment_id: The experiment ID. If None, uses the currently active experiment. Returns: A list of tuple, each tuple contains `mlflow.genai.scorers.Scorer` object and the version number. Raises: MlflowException: If scorer is not found. """ store = _get_scorer_store() return store.list_scorer_versions(experiment_id, name)
[docs]@experimental(version="3.2.0") def get_scorer( *, name: str, experiment_id: str | None = None, version: int | None = None ) -> Scorer: """ Retrieve a specific registered scorer by name. This function returns a Scorer instance with its current registration configuration, including sampling rate and filter criteria. Args: name: The name of the registered scorer to retrieve. experiment_id: The ID of the MLflow experiment containing the scorer. If None, uses the currently active experiment. version: The scorer version. If None, returns the scorer with maximum version. Returns: A Scorer object with its current registration configuration. Example: .. code-block:: python from mlflow.genai.scorers import get_scorer # Get a specific scorer my_scorer = get_scorer(name="my_safety_scorer") print(f"Sample rate: {my_scorer.sample_rate}") print(f"Filter: {my_scorer.filter_string}") # Update the scorer my_scorer = my_scorer.update(sample_rate=0.5) """ store = _get_scorer_store() return store.get_scorer(experiment_id, name, version)
[docs]@experimental(version="3.2.0") def delete_scorer( *, name: str, experiment_id: str | None = None, version: int | str | None = None, ) -> None: """ Delete scorer with given name from the server. This method permanently removes the scorer registration from the MLflow server. After deletion, the scorer will no longer evaluate traces automatically and must be registered again if needed. Args: name: Name of the scorer to delete. experiment_id: The ID of the MLflow experiment containing the scorer. If None, uses the currently active experiment. version: The version to delete. For MLflow tracking backend, set it to 'all' to delete all versions, or set it to an integer value to delete the certain version. For Databricks backend, versioning is not supported, you must set it to `None`. Example: .. code-block:: python from mlflow.genai.scorers import delete_scorer # Delete a scorer from the current experiment delete_scorer(name="my_safety_scorer") # Delete a scorer from a specific experiment delete_scorer(name="my_safety_scorer", experiment_id="123") """ store = _get_scorer_store() return store.delete_scorer(experiment_id, name, version)