MLflow Plugins
MLflow's plugin architecture enables seamless integration with third-party tools and custom infrastructure. As a framework-agnostic platform, MLflow provides developer APIs for extending functionality across storage, authentication, execution backends, and model evaluation.
Quick Start
Installing and Using a Plugin
Try the built-in test plugin to see how plugins work:
# Clone MLflow and install example plugin
git clone https://github.com/mlflow/mlflow
cd mlflow
pip install -e tests/resources/mlflow-test-plugin
# Use the plugin with custom tracking URI scheme
MLFLOW_TRACKING_URI=file-plugin:$(pwd)/mlruns python examples/quickstart/mlflow_tracking.py
# Launch MLflow UI to view results
mlflow server --backend-store-uri ./mlruns
Open http://localhost:5000 to see your tracked experiment.
Plugins let you integrate MLflow with your existing infrastructure without modifying core MLflow code, ensuring smooth upgrades and maintenance.
Plugin Types & Use Cases
MLflow supports nine types of plugins, each addressing different integration needs:
Storage & Persistence
Plugin Type | Purpose | Example Use Cases |
---|---|---|
Tracking Store | Custom experiment data storage | Enterprise databases, cloud data warehouses |
Artifact Repository | Custom artifact storage | In-house blob storage, specialized file systems |
Model Registry Store | Custom model registry backend | Enterprise model catalogs, version control systems |
Authentication & Headers
Plugin Type | Purpose | Example Use Cases |
---|---|---|
Request Auth Provider | Custom authentication | OAuth, API keys, certificate-based auth |
Request Header Provider | Custom HTTP headers | Environment identification, compliance headers |
Run Context Provider | Automatic run metadata | Git info, environment details, custom tags |
Execution & Evaluation
Plugin Type | Purpose | Example Use Cases |
---|---|---|
Project Backend | Custom execution environments | Internal clusters, job schedulers, cloud platforms |
Model Evaluator | Custom evaluation metrics | Domain-specific validation, custom test suites |
Deployment | Custom serving platforms | Internal serving infrastructure, edge deployment |
Developing Custom Plugins
Plugin Structure
Create a plugin as a standalone Python package:
# setup.py
from setuptools import setup
setup(
    name="my-mlflow-plugin",
    version="0.1.0",
    install_requires=["mlflow>=2.0.0"],
    entry_points={
        # Define plugin entry points
        "mlflow.tracking_store": "my-scheme=my_plugin.store:MyTrackingStore",
        "mlflow.artifact_repository": "my-scheme=my_plugin.artifacts:MyArtifactRepo",
        "mlflow.model_registry_store": "my-scheme=my_plugin.registry:MyModelRegistryStore",
        "mlflow.run_context_provider": "unused=my_plugin.context:MyContextProvider",
        "mlflow.request_auth_provider": "unused=my_plugin.auth:MyAuthProvider",
        "mlflow.request_header_provider": "unused=my_plugin.headers:MyHeaderProvider",
        "mlflow.model_evaluator": "my-evaluator=my_plugin.evaluator:MyEvaluator",
        "mlflow.project_backend": "my-backend=my_plugin.backend:MyBackend",
        "mlflow.deployments": "my-target=my_plugin.deployment",
        "mlflow.app": "my-app=my_plugin.app:create_app",
    },
)
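After the package is installed (for local development, pip install -e . is enough), MLflow discovers the plugin automatically through these entry points. A quick way to check that registration worked, assuming Python 3.10+ for the importlib.metadata selection API:

# Verify that the tracking store entry point is visible after installation
from importlib.metadata import entry_points

for ep in entry_points(group="mlflow.tracking_store"):
    print(ep.name, "->", ep.value)  # expect: my-scheme -> my_plugin.store:MyTrackingStore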
Storage Plugins
- Tracking Store
- Artifact Repository
- Model Registry Store
# my_plugin/store.py
from mlflow.store.tracking.abstract_store import AbstractStore
class MyTrackingStore(AbstractStore):
    """Custom tracking store for scheme 'my-scheme://'"""

    def __init__(self, store_uri):
        super().__init__()
        self.store_uri = store_uri
        # Initialize your custom storage backend

    def create_experiment(self, name, artifact_location=None, tags=None):
        # Implement experiment creation logic
        pass

    def log_metric(self, run_id, metric):
        # Implement metric logging logic
        pass

    def log_param(self, run_id, param):
        # Implement parameter logging logic
        pass

    # Implement other required AbstractStore methods...
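Once the entry point is registered, pointing the tracking URI at the custom scheme routes all tracking calls through MyTrackingStore. A minimal sketch, using the placeholder scheme and host from this example:

import mlflow

# Any tracking URI whose scheme matches the registered entry point ("my-scheme")
# is resolved to MyTrackingStore
mlflow.set_tracking_uri("my-scheme://my-tracking-host/path")

with mlflow.start_run():
    mlflow.log_param("alpha", 0.5)  # handled by MyTrackingStore.log_param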
# my_plugin/artifacts.py
from mlflow.store.artifact.artifact_repo import ArtifactRepository
class MyArtifactRepo(ArtifactRepository):
    """Custom artifact repository for scheme 'my-scheme://'"""

    def __init__(self, artifact_uri):
        super().__init__(artifact_uri)
        # Initialize your artifact storage backend

    def log_artifact(self, local_file, artifact_path=None):
        # Upload file to your storage system
        pass

    def log_artifacts(self, local_dir, artifact_path=None):
        # Upload directory to your storage system
        pass

    def list_artifacts(self, path=None):
        # List artifacts in your storage system
        pass

    def download_artifacts(self, artifact_path, dst_path=None):
        # Download artifacts from your storage system
        pass
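Artifact repositories are selected by the scheme of an experiment's artifact location rather than by the tracking URI. A sketch using the same placeholder scheme:

import mlflow

# Artifacts logged under this experiment are routed to MyArtifactRepo
mlflow.create_experiment("my_experiment", artifact_location="my-scheme://bucket/experiments")
mlflow.set_experiment("my_experiment")

with mlflow.start_run():
    mlflow.log_artifact("model.pkl")  # uploaded via MyArtifactRepo.log_artifact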
# my_plugin/registry.py
from mlflow.store.model_registry.abstract_store import AbstractStore
class MyModelRegistryStore(AbstractStore):
    """Custom model registry store for scheme 'my-scheme://'"""

    def __init__(self, store_uri):
        super().__init__()
        self.store_uri = store_uri
        # Initialize your model registry backend

    def create_registered_model(self, name, tags=None, description=None):
        # Implement model registration logic
        pass

    def create_model_version(
        self, name, source, run_id=None, tags=None, run_link=None, description=None
    ):
        # Implement model version creation logic
        pass

    def get_registered_model(self, name):
        # Implement model retrieval logic
        pass

    # Implement other required AbstractStore methods...
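The registry store is chosen by the registry URI, which can differ from the tracking URI. A minimal sketch with a placeholder host:

import mlflow

# Model Registry calls are resolved by the registry URI scheme,
# independently of the tracking URI
mlflow.set_registry_uri("my-scheme://my-registry-host/path")

client = mlflow.MlflowClient()
client.create_registered_model("my_model")  # handled by MyModelRegistryStore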
Authentication Plugins
- Request Auth Provider
- Run Context Provider
- Request Header Provider
# my_plugin/auth.py
from mlflow.tracking.request_auth.abstract_request_auth_provider import (
RequestAuthProvider,
)
class MyAuthProvider(RequestAuthProvider):
    """Custom authentication provider"""

    def get_name(self):
        return "my_auth_provider"

    def get_auth(self):
        # Return authentication object for HTTP requests
        # Can be anything that requests.auth accepts
        return MyCustomAuth()


class MyCustomAuth:
    """Custom authentication class"""

    def __call__(self, request):
        # Add authentication headers to request
        token = self._get_token()
        request.headers["Authorization"] = f"Bearer {token}"
        return request

    def _get_token(self):
        # Implement token retrieval logic
        # E.g., read from file, environment, or API call
        pass
Usage:
export MLFLOW_TRACKING_AUTH=my_auth_provider
python your_mlflow_script.py
# my_plugin/context.py
from mlflow.tracking.context.abstract_context import RunContextProvider
class MyContextProvider(RunContextProvider):
    """Automatically add custom tags to runs"""

    def in_context(self):
        # Return True if this context applies
        return True

    def tags(self):
        # Return dictionary of tags to add to runs
        return {
            "environment": self._get_environment(),
            "team": self._get_team(),
            "cost_center": self._get_cost_center(),
        }

    def _get_environment(self):
        # Detect environment (dev/staging/prod)
        pass

    def _get_team(self):
        # Get team from environment or config
        pass

    def _get_cost_center(self):
        # Get cost center for billing
        pass
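Run context providers need no explicit configuration: once the plugin is installed, their tags are merged into every run started with the fluent API. A sketch of what that looks like once the helper methods above return real values:

import mlflow

# MyContextProvider.tags() is applied automatically when the run is created
with mlflow.start_run() as run:
    pass

tags = mlflow.get_run(run.info.run_id).data.tags
print(tags.get("environment"), tags.get("team"), tags.get("cost_center"))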
# my_plugin/headers.py
import os

from mlflow.tracking.request_header.abstract_request_header_provider import (
    RequestHeaderProvider,
)


class MyHeaderProvider(RequestHeaderProvider):
    """Add custom headers to MLflow requests"""

    def in_context(self):
        return True

    def request_headers(self):
        return {
            "X-Client-Version": self._get_client_version(),
            "X-Environment": self._get_environment(),
            "X-User-Agent": "MyOrganization-MLflow-Client",
        }

    def _get_client_version(self):
        # Return your client version
        return "1.0.0"

    def _get_environment(self):
        # Detect environment context
        return os.getenv("DEPLOYMENT_ENV", "development")
Execution Plugins
Project Backend Plugin
# my_plugin/backend.py
from mlflow.projects.backend import AbstractBackend
from mlflow.projects.submitted_run import SubmittedRun
class MyBackend(AbstractBackend):
    """Custom execution backend for MLflow Projects"""

    def run(
        self,
        project_uri,
        entry_point,
        parameters,
        version,
        backend_config,
        tracking_uri,
        experiment_id,
    ):
        """Execute project on custom infrastructure"""
        # Parse backend configuration
        cluster_config = backend_config.get("cluster_config", {})

        # Submit job to your execution system
        job_id = self._submit_job(
            project_uri=project_uri,
            entry_point=entry_point,
            parameters=parameters,
            cluster_config=cluster_config,
        )

        # Return SubmittedRun for monitoring
        return MySubmittedRun(job_id, tracking_uri)

    def _submit_job(self, project_uri, entry_point, parameters, cluster_config):
        # Implement job submission to your infrastructure
        # Return job ID for monitoring
        pass


class MySubmittedRun(SubmittedRun):
    """Handle for submitted run"""

    def __init__(self, job_id, tracking_uri):
        self.job_id = job_id
        self.tracking_uri = tracking_uri
        super().__init__()

    def wait(self):
        # Wait for job completion and return success status
        return self._poll_job_status()

    def cancel(self):
        # Cancel the running job
        self._cancel_job()

    def get_status(self):
        # Get current job status
        return self._get_job_status()
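With the mlflow.project_backend entry point registered, the backend can be selected by name when launching a project. A minimal sketch; the backend name and the cluster_config keys are the placeholders used in this example:

import mlflow

# Launch a local MLproject on the custom backend registered as "my-backend"
submitted = mlflow.projects.run(
    uri=".",  # path to a directory containing an MLproject file
    backend="my-backend",
    backend_config={"cluster_config": {"instance_type": "large"}},
    synchronous=False,  # returns the MySubmittedRun handle immediately
)
print(submitted.get_status())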
Model Evaluation Plugin
# my_plugin/evaluator.py
from mlflow.models.evaluation import ModelEvaluator
from mlflow.models import EvaluationResult
class MyEvaluator(ModelEvaluator):
    """Custom model evaluator"""

    def can_evaluate(self, *, model_type, evaluator_config, **kwargs):
        """Check if this evaluator can handle the model type"""
        supported_types = ["classifier", "regressor"]
        return model_type in supported_types

    def evaluate(
        self, *, model, model_type, dataset, run_id, evaluator_config, **kwargs
    ):
        """Perform custom evaluation"""
        # Get predictions
        predictions = model.predict(dataset.features_data)

        # Compute custom metrics
        metrics = self._compute_custom_metrics(
            predictions, dataset.labels_data, evaluator_config
        )

        # Generate custom artifacts
        artifacts = self._generate_artifacts(predictions, dataset, evaluator_config)

        return EvaluationResult(metrics=metrics, artifacts=artifacts)

    def _compute_custom_metrics(self, predictions, labels, config):
        # Implement domain-specific metrics
        return {
            "custom_score": self._calculate_custom_score(predictions, labels),
            "business_metric": self._calculate_business_metric(predictions, labels),
        }

    def _generate_artifacts(self, predictions, dataset, config):
        # Generate custom plots, reports, etc.
        return {}
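The evaluator is invoked by name through mlflow.evaluate. A minimal sketch, assuming the my-evaluator entry point from the setup.py above and an evaluation DataFrame eval_df with a "label" column:

import mlflow

with mlflow.start_run():
    result = mlflow.evaluate(
        model="models:/MyModel/1",   # a model URI or pyfunc-compatible model
        data=eval_df,                # evaluation dataset with ground-truth labels
        targets="label",
        model_type="classifier",
        evaluators=["my-evaluator"],  # entry-point name from setup.py
    )
    print(result.metrics)  # metrics returned by MyEvaluator.evaluate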
Popular Community Plugins
- Storage Solutions
- Model Deployment
- Model Evaluation
- Execution Backends
- Enterprise Solutions
SQL Server Plugin
Store artifacts directly in SQL Server databases:
pip install mlflow[sqlserver]
import mlflow
# Use SQL Server as artifact store
db_uri = "mssql+pyodbc://user:pass@host:port/db?driver=ODBC+Driver+17+for+SQL+Server"
mlflow.create_experiment("sql_experiment", artifact_location=db_uri)
with mlflow.start_run():
    mlflow.onnx.log_model(model, name="model")  # Stored as BLOB in SQL Server
Alibaba Cloud OSS Plugin
Integrate with Aliyun Object Storage Service:
pip install mlflow[aliyun-oss]
import os
import mlflow
# Configure OSS credentials
os.environ["MLFLOW_OSS_ENDPOINT_URL"] = "https://oss-region.aliyuncs.com"
os.environ["MLFLOW_OSS_KEY_ID"] = "your_access_key"
os.environ["MLFLOW_OSS_KEY_SECRET"] = "your_secret_key"
# Use OSS as artifact store
mlflow.create_experiment("oss_experiment", artifact_location="oss://bucket/path")
XetHub Plugin
Use XetHub for versioned artifact storage:
pip install mlflow[xethub]
import mlflow
# Authenticate with XetHub (via CLI or environment variables)
mlflow.create_experiment(
    "xet_experiment", artifact_location="xet://username/repo/branch"
)
Elasticsearch Plugin
Use Elasticsearch for experiment tracking:
pip install mlflow-elasticsearchstore
Plugin | Target Platform | Installation |
---|---|---|
mlflow-redisai | RedisAI | pip install mlflow-redisai |
mlflow-torchserve | TorchServe | pip install mlflow-torchserve |
mlflow-ray-serve | Ray Serve | pip install mlflow-ray-serve |
mlflow-azureml | Azure ML | Built-in with Azure ML |
oci-mlflow | Oracle Cloud | pip install oci-mlflow |
Example deployment usage:
import mlflow.deployments
# Deploy to custom target
client = mlflow.deployments.get_deploy_client("redisai")
client.create_deployment(
    name="my_model", model_uri="models:/MyModel/1", config={"device": "GPU"}
)
Giskard Plugin
Comprehensive model validation and bias detection:
pip install mlflow-giskard
import mlflow
# Evaluate with Giskard
result = mlflow.evaluate(
    model,
    eval_data,
    evaluators=["giskard"],
    evaluator_config={
        "giskard": {
            "test_suite": "full_suite",
            "bias_tests": True,
            "performance_tests": True,
        }
    },
)
Giskard detects vulnerabilities such as:
- Performance bias
- Ethical bias
- Data leakage
- Overconfidence/Underconfidence
- Spurious correlations