ML Pipeline Operations
MLOps applies DevOps principles to machine learning workflows, enabling reliable, reproducible, and scalable model deployment and monitoring. This guide covers the complete MLOps lifecycle from data ingestion to production model monitoring, with working Kubeflow pipelines, drift detection, and Terraform infrastructure for ML on Kubernetes.
MLOps Lifecycle: Data → Model → Serve → Monitor
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│     DATA     │─────►│    MODEL     │─────►│    SERVE     │
│              │      │              │      │              │
│ • Ingestion  │      │ • Training   │      │ • Deploy     │
│ • Validation │      │ • Evaluation │      │ • A/B Test   │
│ • Feature    │      │ • Versioning │      │ • Scale      │
│   Engineering│      │ • Register   │      │ • Endpoint   │
└──────┬───────┘      └──────┬───────┘      └──────┬───────┘
       │                     │                     │
       │            ┌────────┴────────┐            │
       │            │     MONITOR     │◄───────────┘
       │            │                 │
       │            │ • Drift Detect  │
       └───────────►│ • Performance   │
                    │ • Data Quality  │
                    │ • Auto-Retrain  │
                    └─────────────────┘
Feedback loop: Monitor output triggers the retraining pipeline
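The feedback loop can be sketched as a small decision function that turns monitoring output into a retrain/no-retrain signal. This is a minimal illustration under stated assumptions, not part of any library: `MonitorSnapshot`, `should_retrain`, and the default thresholds are hypothetical names and values chosen for this example (the 10% relative F1 drop mirrors the check used in the monitoring code later in this guide).

```python
from dataclasses import dataclass


@dataclass
class MonitorSnapshot:
    """Hypothetical summary emitted by the Monitor stage."""
    drifted_features: int   # number of features flagged as drifting
    total_features: int     # number of features checked
    current_f1: float       # F1 measured on recently labeled traffic
    reference_f1: float     # F1 recorded when the model was promoted


def should_retrain(
    snapshot: MonitorSnapshot,
    max_drift_fraction: float = 0.2,  # assumed limit on the share of drifting features
    max_f1_drop: float = 0.1,         # assumed relative F1 drop that triggers retraining
) -> bool:
    """Return True when drift or degradation exceeds the assumed limits."""
    if snapshot.total_features:
        drift_fraction = snapshot.drifted_features / snapshot.total_features
    else:
        drift_fraction = 0.0
    f1_degraded = snapshot.current_f1 < snapshot.reference_f1 * (1 - max_f1_drop)
    return drift_fraction > max_drift_fraction or f1_degraded
```

In practice the boolean would be used to start the training pipeline (for example, by submitting a pipeline run); how that submission happens depends on the orchestrator and is left out here.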
ML Pipeline Architecture
┌──────────────────────────────────────────────────────────────┐
│                      Kubeflow Pipelines                      │
│  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐  │
│  │Data    │  │Feature │  │Train   │  │Evaluate│  │Deploy  │  │
│  │Ingest  │─►│Engineer│─►│Model   │─►│Model   │─►│Model   │  │
│  └────────┘  └────────┘  └────────┘  └────────┘  └────────┘  │
│       │           │           │           │           │      │
│       ▼           ▼           ▼           ▼           ▼      │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                 MLflow Tracking Server                 │  │
│  │     Experiments • Metrics • Parameters • Artifacts     │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │Feature Store │    │    Model     │    │  Monitoring  │    │
│  │(Feast/Tecton)│    │   Registry   │    │ (Evidently/  │    │
│  │              │    │   (MLflow)   │    │   WhyLabs)   │    │
│  └──────────────┘    └──────────────┘    └──────────────┘    │
└──────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │   S3 / GCS   │    │  ECR / GCR   │    │  Prometheus  │    │
│  │ (Data Lake)  │    │    (Model    │    │  + Grafana   │    │
│  │              │    │  Artifacts)  │    │              │    │
│  └──────────────┘    └──────────────┘    └──────────────┘    │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│                     Kubernetes (EKS/GKE)                     │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   │
│  │ Training │   │ Inference│   │ GPU Node │   │ CPU Node │   │
│  │   Pods   │   │ Service  │   │  Group   │   │  Group   │   │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘   │
└──────────────────────────────────────────────────────────────┘
Feature Stores
Feature stores centralize feature computation and serving, ensuring consistency between training and inference.
| Feature Store | Type | Best For | Key Strengths |
|---|---|---|---|
| Feast (open source) | Open Source | Kubernetes-native; cost-conscious | Great K8s integration; multiple offline stores; active community |
| Tecton | Commercial | Enterprise; real-time features | Real-time transformation; enterprise support; managed |
| SageMaker Feature Store | AWS Native | AWS-only environments | Deep SageMaker integration; offline/online store |
| Vertex AI Feature Store | GCP Native | GCP-only environments | Native GCP integration; streaming features |
# feature_store_example.py: Feast feature store definition
# Assumes a recent Feast release that uses the Field/schema API
# (older releases used Feature/features and event_timestamp_column).
from datetime import timedelta

from feast import Entity, FeatureService, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define an entity (the primary key for features)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier",
)

# Define a feature view (how features are computed and stored)
user_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="total_transactions_30d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="days_since_last_transaction", dtype=Float32),
        Field(name="total_spend_90d", dtype=Float32),
        Field(name="merchant_category_count", dtype=Int64),
    ],
    online=True,
    source=FileSource(
        path="s3://features-bucket/user_features.parquet",
        timestamp_field="feature_timestamp",
    ),
    tags={"team": "ml-platform", "version": "v1"},
)

# Feature service (consumable bundle of features for a model)
fraud_detection_features = FeatureService(
    name="fraud_detection_v1",
    features=[user_features],
    tags={"model": "fraud-detection-xgboost", "version": "1.0.0"},
)

# Retrieve features for inference.
# Run this only after `feast apply` and a materialization job have
# registered the definitions and loaded the online store.
store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=[
        "user_transaction_features:total_transactions_30d",
        "user_transaction_features:avg_transaction_amount",
        "user_transaction_features:days_since_last_transaction",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()
print(f"Retrieved features: {features}")
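To illustrate the training/inference consistency that a feature store is meant to provide, the sketch below performs a point-in-time lookup in plain Python: for a labeled event it returns the latest feature value recorded at or before the event time, and only if that value is still within a TTL. This is a conceptual illustration, not Feast's implementation; `point_in_time_lookup` and the sample rows are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Optional


def point_in_time_lookup(
    feature_rows: list[tuple[datetime, float]],
    event_time: datetime,
    ttl: timedelta,
) -> Optional[float]:
    """Return the latest feature value at or before event_time, within ttl.

    feature_rows must be sorted by timestamp in ascending order.
    """
    timestamps = [ts for ts, _ in feature_rows]
    idx = bisect_right(timestamps, event_time) - 1
    if idx < 0:
        return None  # no feature value existed yet at event_time
    ts, value = feature_rows[idx]
    if event_time - ts > ttl:
        return None  # the most recent value is older than the TTL
    return value


# Hypothetical feature history for one user
rows = [
    (datetime(2024, 1, 1, 0, 0), 10.0),
    (datetime(2024, 1, 1, 12, 0), 12.0),
    (datetime(2024, 1, 3, 0, 0), 20.0),
]
ttl = timedelta(hours=24)
```

Using the value as of the event time, rather than the latest value available today, keeps future information out of the training set, which is the same constraint the online store satisfies naturally at inference time.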
Model Versioning with MLflow
# mlflow_tracking.py: Model training with MLflow tracking
import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.models.signature import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split


def train_and_register_model(
    data_path: str,
    experiment_name: str = "fraud-detection",
    model_name: str = "fraud-detection-rf",
):
    """Train model with full MLflow tracking and registration."""
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=f"{model_name}-training") as run:
        # --- Load Data ---
        df = pd.read_parquet(data_path)
        X = df.drop(['is_fraud', 'transaction_id'], axis=1)
        y = df['is_fraud']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # --- Log Parameters ---
        params = {
            "n_estimators": 200,
            "max_depth": 15,
            "min_samples_split": 10,
            "min_samples_leaf": 4,
            "class_weight": "balanced",
            "random_state": 42,
        }
        mlflow.log_params(params)

        # --- Train ---
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # --- Evaluate ---
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1_score": f1_score(y_test, y_pred),
            "roc_auc": roc_auc_score(y_test, y_prob),
            "test_set_size": len(X_test),
            "feature_count": X.shape[1],
        }
        mlflow.log_metrics(metrics)

        # --- Log Model with Signature ---
        signature = infer_signature(
            model_input=X_train.head(5),
            model_output=model.predict(X_train.head(5)),
        )
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            signature=signature,
            registered_model_name=model_name,
            conda_env={
                "channels": ["defaults", "conda-forge"],
                "dependencies": [
                    "python=3.10",
                    "scikit-learn=1.3.0",
                    "pandas=2.0.0",
                    "numpy=1.24.0",
                    "pip",
                    {"pip": ["mlflow==2.8.0"]},
                ],
                "name": "mlflow-env",
            },
        )

        # --- Log Artifacts ---
        # Feature importance
        importance_df = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_,
        }).sort_values('importance', ascending=False)
        importance_df.to_csv("feature_importance.csv", index=False)
        mlflow.log_artifact("feature_importance.csv")

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {metrics}")
        return run.info.run_id, metrics


# Transition model to production stage
def promote_model(model_name: str, version: int, stage: str = "Production"):
    """Promote a registered model to a new stage."""
    client = mlflow.tracking.MlflowClient()
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage=stage,
        archive_existing_versions=True,  # Archive previous production model
    )
    print(f"Model {model_name} v{version} promoted to {stage}")


# Example usage:
# run_id, metrics = train_and_register_model("s3://data-bucket/fraud/training.parquet")
# promote_model("fraud-detection-rf", version=3, stage="Production")
Model Deployment Patterns
Batch Inference
# batch_inference.py: Scheduled batch inference pipeline
# Note: reading and writing s3:// paths with pandas requires s3fs to be installed.
from datetime import datetime
from typing import Optional

import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.tracking import MlflowClient


def batch_inference(
    model_name: str = "fraud-detection-rf",
    model_stage: str = "Production",
    input_path: str = "s3://data-bucket/fraud/daily_transactions.parquet",
    output_path: Optional[str] = None,
):
    """Run batch inference on daily transactions."""
    # Load production model from MLflow Registry
    model_uri = f"models:/{model_name}/{model_stage}"
    model = mlflow.sklearn.load_model(model_uri)

    # Resolve the registry version behind the stage so predictions are traceable
    versions = MlflowClient().get_latest_versions(model_name, stages=[model_stage])
    model_version = versions[0].version if versions else "unknown"

    # Load input data
    df = pd.read_parquet(input_path)
    feature_cols = [c for c in df.columns if c not in ['transaction_id', 'is_fraud']]

    # Run inference
    df['fraud_probability'] = model.predict_proba(df[feature_cols])[:, 1]
    df['fraud_prediction'] = (df['fraud_probability'] > 0.5).astype(int)
    df['prediction_timestamp'] = datetime.utcnow().isoformat()
    df['model_name'] = model_name
    df['model_version'] = model_version

    # Write results to a single file inside a date partition
    if output_path is None:
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        output_path = f"s3://data-bucket/fraud/predictions/date={date_str}/predictions.parquet"
    df.to_parquet(output_path, index=False)

    # Publish metrics
    print(f"Processed {len(df)} transactions")
    print(f"High-risk flagged: {(df['fraud_probability'] > 0.8).sum()}")
    print(f"Results written to: {output_path}")
    return df


# Kubernetes CronJob for daily batch inference
# batch-inference-cronjob.yaml
"""
apiVersion: batch/v1
kind: CronJob
metadata:
  name: fraud-batch-inference
  namespace: ml-platform
spec:
  schedule: "0 2 * * *"  # 2 AM daily
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: inference
              image: ml-platform/inference:latest
              command:
                - python
                - -c
                - |
                  from datetime import datetime
                  from batch_inference import batch_inference
                  # Kubernetes does not expand Airflow-style {{ ds }} macros,
                  # so the partition date is computed at run time (UTC run date).
                  ds = datetime.utcnow().strftime("%Y-%m-%d")
                  batch_inference(
                      model_name="fraud-detection-rf",
                      input_path=f"s3://data-bucket/fraud/daily/{ds}/transactions.parquet",
                  )
              env:
                - name: MLFLOW_TRACKING_URI
                  value: "http://mlflow.ml-platform.svc.cluster.local:5000"
                - name: AWS_REGION
                  value: "us-east-1"
              resources:
                requests:
                  memory: "2Gi"
                  cpu: "1"
                limits:
                  memory: "4Gi"
                  cpu: "2"
          restartPolicy: OnFailure
"""
Real-Time Inference (API Endpoint)
# inference_service.py: FastAPI real-time inference service
import os
import time
from typing import Dict, List

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import prometheus_client
from fastapi import FastAPI, HTTPException
from prometheus_client import Counter, Gauge, Histogram
from pydantic import BaseModel

app = FastAPI(title="Fraud Detection API", version="1.0.0")

# Expose the Prometheus metrics defined below at /metrics for scraping
app.mount("/metrics", prometheus_client.make_asgi_app())

# Prometheus metrics
PREDICTION_COUNT = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model_version', 'prediction_class'],
)
PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency',
    ['model_version'],
)
MODEL_INFO = Gauge(
    'model_info',
    'Model information',
    ['model_name', 'model_version', 'model_stage'],
)

# Model configuration (the model itself is loaded at startup)
MODEL_NAME = os.environ.get("MODEL_NAME", "fraud-detection-rf")
MODEL_STAGE = os.environ.get("MODEL_STAGE", "Production")
MLFLOW_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
mlflow.set_tracking_uri(MLFLOW_URI)

_model = None
_model_version = None


class TransactionFeatures(BaseModel):
    amount: float
    merchant_category: int
    hour_of_day: int
    day_of_week: int
    days_since_last_transaction: float
    avg_transaction_amount_30d: float
    transaction_count_30d: int
    total_spend_90d: float
    merchant_risk_score: float
    card_present: int
    international: int


class PredictionResponse(BaseModel):
    fraud_probability: float
    fraud_prediction: int
    model_version: str
    prediction_time_ms: float


@app.on_event("startup")
def load_model():
    global _model, _model_version
    model_uri = f"models:/{MODEL_NAME}/{MODEL_STAGE}"
    _model = mlflow.sklearn.load_model(model_uri)

    client = mlflow.tracking.MlflowClient()
    versions = client.get_latest_versions(MODEL_NAME, stages=[MODEL_STAGE])
    _model_version = versions[0].version if versions else "unknown"

    MODEL_INFO.labels(
        model_name=MODEL_NAME,
        model_version=_model_version,
        model_stage=MODEL_STAGE,
    ).set(1)
    print(f"Loaded model {MODEL_NAME} v{_model_version} from stage {MODEL_STAGE}")


@app.get("/health")
def health():
    return {
        "status": "healthy",
        "model": MODEL_NAME,
        "version": _model_version,
        "stage": MODEL_STAGE,
    }


@app.post("/predict", response_model=PredictionResponse)
def predict(features: TransactionFeatures):
    if _model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    start = time.time()

    # Convert to DataFrame (preserves feature names)
    df = pd.DataFrame([features.dict()])

    # Predict
    probability = float(_model.predict_proba(df)[0][1])
    prediction = int(probability > 0.5)
    latency = time.time() - start

    # Record metrics
    PREDICTION_COUNT.labels(
        model_version=_model_version,
        prediction_class="fraud" if prediction == 1 else "legitimate",
    ).inc()
    PREDICTION_LATENCY.labels(model_version=_model_version).observe(latency)

    return PredictionResponse(
        fraud_probability=probability,
        fraud_prediction=prediction,
        model_version=_model_version,
        prediction_time_ms=latency * 1000,
    )


@app.post("/predict/batch")
def predict_batch(features: List[TransactionFeatures]):
    if _model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    df = pd.DataFrame([f.dict() for f in features])
    probabilities = _model.predict_proba(df)[:, 1].tolist()
    predictions = [int(p > 0.5) for p in probabilities]
    return {
        "predictions": predictions,
        "probabilities": probabilities,
        "count": len(features),
        "model_version": _model_version,
    }


# Dockerfile
"""
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY inference_service.py .
EXPOSE 8000
CMD ["uvicorn", "inference_service:app", "--host", "0.0.0.0", "--port", "8000"]
"""
Edge Deployment
# Convert sklearn model to ONNX for edge deployment
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType


def convert_to_onnx(sklearn_model, feature_names, output_path):
    """Convert scikit-learn model to ONNX for edge/embedded deployment."""
    initial_type = [('float_input', FloatTensorType([None, len(feature_names)]))]
    onnx_model = convert_sklearn(
        sklearn_model,
        initial_types=initial_type,
        target_opset=13,
    )
    with open(output_path, "wb") as f:
        f.write(onnx_model.SerializeToString())
    print(f"ONNX model saved to {output_path}")
    return output_path


# Inference with ONNX Runtime (lightweight; scikit-learn is not needed on the device)
"""
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("fraud_model.onnx")
input_name = session.get_inputs()[0].name
features = np.array([[100.0, 3, 14, 2, 5.0, 150.0, 10, 5000.0, 0.2, 1, 0]], dtype=np.float32)

# A converted classifier returns two outputs: predicted labels and per-class probabilities
labels, probabilities = session.run(None, {input_name: features})
fraud_prob = probabilities[0][1]  # probability of class 1 for the first row
"""
Complete Kubeflow Pipeline Example
# kubeflow_pipeline.py: End-to-end ML pipeline (written against the KFP SDK v1 API)
# Note: components that read s3:// paths with pandas also need s3fs in their image.
import kfp
from kfp import dsl
from kfp.components import InputPath, OutputPath, create_component_from_func


# --- Component 1: Data Validation ---
def validate_data(
    input_path: str,
    validation_output: OutputPath(str),
    stats_output: OutputPath(str),
) -> None:
    import json
    from datetime import datetime

    import pandas as pd

    df = pd.read_parquet(input_path)

    # Schema validation
    required_columns = [
        'transaction_id', 'amount', 'merchant_category',
        'is_fraud', 'timestamp',
    ]
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Data quality checks
    null_pct = df.isnull().sum() / len(df) * 100
    high_null_cols = null_pct[null_pct > 5].to_dict()
    checks = {
        "row_count": len(df),
        "column_count": len(df.columns),
        "null_percentages": null_pct.to_dict(),
        "high_null_columns": high_null_cols,
        "fraud_rate": float(df['is_fraud'].mean()),
        "amount_stats": {
            "min": float(df['amount'].min()),
            "max": float(df['amount'].max()),
            "mean": float(df['amount'].mean()),
        },
        "validation_timestamp": datetime.utcnow().isoformat(),
        "passed": len(high_null_cols) == 0 and len(df) > 1000,
    }
    with open(validation_output, 'w') as f:
        json.dump(checks, f, indent=2)

    # Write statistics (default=str guards against non-JSON types such as timestamps)
    with open(stats_output, 'w') as f:
        json.dump(df.describe().to_dict(), f, default=str)


data_validation_op = create_component_from_func(
    validate_data,
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "pyarrow==14.0.0"],
)


# --- Component 2: Feature Engineering ---
def engineer_features(
    input_path: str,
    validation_input: InputPath(str),
    output_path: OutputPath(str),
) -> None:
    import json
    import math

    import pandas as pd

    with open(validation_input) as f:
        validation = json.load(f)
    if not validation.get('passed', False):
        raise ValueError("Data validation failed. Aborting.")

    df = pd.read_parquet(input_path)

    # Time-based features
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
    df['is_night'] = ((df['hour_of_day'] >= 23) | (df['hour_of_day'] <= 5)).astype(int)

    # Amount features (log of positive amounts, 0 otherwise)
    df['amount_log'] = df['amount'].apply(lambda x: 0 if x <= 0 else math.log(x))
    df['amount_bin'] = pd.qcut(df['amount'], q=10, labels=False, duplicates='drop')

    # Interaction features
    df['amount_x_hour'] = df['amount'] * df['hour_of_day']
    df['amount_x_merchant'] = df['amount'] * df['merchant_category']

    df.to_parquet(output_path)


feature_engineering_op = create_component_from_func(
    engineer_features,
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "pyarrow==14.0.0", "numpy==1.24.0"],
)


# --- Component 3: Model Training ---
def train_model(
    features_path: InputPath(str),
    model_output: OutputPath(str),
    metrics_output: OutputPath(str),
    experiment_name: str = "fraud-detection",
    model_name: str = "fraud-detection-rf",
) -> None:
    import json
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
    from sklearn.model_selection import train_test_split, cross_val_score

    df = pd.read_parquet(features_path)

    # Prepare features
    exclude = ['transaction_id', 'timestamp', 'is_fraud']
    feature_cols = [c for c in df.columns if c not in exclude]
    X = df[feature_cols]
    y = df['is_fraud']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train
    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=10,
        min_samples_leaf=4,
        class_weight="balanced",
        random_state=42,
        n_jobs=-1,
    )
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision": float(precision_score(y_test, y_pred)),
        "recall": float(recall_score(y_test, y_pred)),
        "f1_score": float(f1_score(y_test, y_pred)),
        "roc_auc": float(roc_auc_score(y_test, y_prob)),
        # Cross-validate on the training split only so the held-out test set stays unseen
        "cv_f1_mean": float(cross_val_score(model, X_train, y_train, cv=5, scoring='f1').mean()),
        "feature_count": len(feature_cols),
        "train_samples": len(X_train),
        "test_samples": len(X_test),
    }

    # Save model
    with open(model_output, 'wb') as f:
        pickle.dump({
            'model': model,
            'feature_names': feature_cols,
            'metrics': metrics,
        }, f)
    with open(metrics_output, 'w') as f:
        json.dump(metrics, f, indent=2)


train_model_op = create_component_from_func(
    train_model,
    base_image="python:3.10-slim",
    packages_to_install=[
        "pandas==2.0.0", "pyarrow==14.0.0", "numpy==1.24.0",
        "scikit-learn==1.3.0",
    ],
)


# --- Component 4: Model Validation ---
def validate_model(
    model_path: InputPath(str),
    metrics_path: InputPath(str),
    approval_output: OutputPath(str),
) -> None:
    import json

    with open(metrics_path) as f:
        metrics = json.load(f)

    # Quality gates
    gates = {
        "f1_score": metrics.get('f1_score', 0) >= 0.75,
        "precision": metrics.get('precision', 0) >= 0.70,
        "recall": metrics.get('recall', 0) >= 0.60,
        "roc_auc": metrics.get('roc_auc', 0) >= 0.85,
    }
    approved = all(gates.values())
    result = {
        "approved": approved,
        "metrics": metrics,
        "gates": gates,
        "rejection_reasons": [k for k, v in gates.items() if not v] if not approved else [],
    }
    with open(approval_output, 'w') as f:
        json.dump(result, f, indent=2)

    if not approved:
        raise ValueError(f"Model failed quality gates: {result['rejection_reasons']}")


model_validation_op = create_component_from_func(
    validate_model,
    base_image="python:3.10-slim",
)


# --- Pipeline Definition ---
@dsl.pipeline(
    name="Fraud Detection Training Pipeline",
    description="End-to-end training pipeline for fraud detection model",
)
def fraud_training_pipeline(
    input_data_path: str = "s3://data-bucket/fraud/training_data.parquet",
):
    # Step 1: Validate data
    validation = data_validation_op(input_path=input_data_path)

    # Step 2: Engineer features
    features = feature_engineering_op(
        input_path=input_data_path,
        validation_input=validation.outputs['validation_output'],
    )

    # Step 3: Train model
    # KFP v1 strips the "_path" suffix from InputPath/OutputPath parameter names,
    # so `features_path` is passed as `features` and `output_path` is exposed as `output`.
    training = train_model_op(features=features.outputs['output'])

    # Step 4: Validate model quality
    approval = model_validation_op(
        model=training.outputs['model_output'],
        metrics=training.outputs['metrics_output'],
    )

    # Disable Istio sidecar injection on every step
    dsl.get_pipeline_conf().add_op_transformer(
        lambda op: op.add_pod_annotation('sidecar.istio.io/inject', 'false')
    )


# Compile and run
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        pipeline_func=fraud_training_pipeline,
        package_path='fraud_training_pipeline.yaml',
    )
    print("Pipeline compiled to fraud_training_pipeline.yaml")
Model Monitoring: Drift Detection and Performance
#!/usr/bin/env python3
"""
model_monitoring.py: Detect data drift and model performance degradation.
Uses statistical tests and PSI (Population Stability Index).
"""
from dataclasses import dataclass
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from scipy import stats


@dataclass
class DriftReport:
    feature_name: str
    drift_detected: bool
    psi_score: float
    ks_statistic: float
    ks_pvalue: float
    mean_shift: float
    reference_mean: float
    current_mean: float


class ModelMonitor:
    """Monitor ML models for drift and performance degradation."""

    PSI_BINS = 10
    # Upper bounds of each PSI band
    PSI_THRESHOLDS = {
        "no_drift": 0.1,
        "moderate_drift": 0.25,
        "significant_drift": float('inf'),
    }

    def __init__(self, reference_data: pd.DataFrame):
        """
        Args:
            reference_data: Training or baseline data distribution
        """
        self.reference = reference_data.copy()
        self.reference_stats = self._compute_stats(reference_data)

    def _compute_stats(self, df: pd.DataFrame) -> Dict:
        """Compute reference statistics."""
        stats_dict = {}
        for col in df.select_dtypes(include=[np.number]).columns:
            stats_dict[col] = {
                "mean": df[col].mean(),
                "std": df[col].std(),
                "min": df[col].min(),
                "max": df[col].max(),
                "quantiles": df[col].quantile([0.1, 0.25, 0.5, 0.75, 0.9]).to_dict(),
                "histogram": np.histogram(df[col].dropna(), bins=self.PSI_BINS)[0].tolist(),
            }
        return stats_dict

    def _calculate_psi(self, reference: np.ndarray, current: np.ndarray) -> float:
        """Calculate Population Stability Index."""
        # Create bins from reference distribution
        min_val = reference.min()
        max_val = reference.max()
        if min_val == max_val:
            return 0.0
        bins = np.linspace(min_val, max_val, self.PSI_BINS + 1)
        # Open the outer bins so current values outside the reference range are counted
        bins[0] = -np.inf
        bins[-1] = np.inf

        ref_hist, _ = np.histogram(reference, bins=bins)
        cur_hist, _ = np.histogram(current, bins=bins)

        # Convert to proportions
        ref_pct = ref_hist / len(reference) if len(reference) > 0 else np.ones(self.PSI_BINS) / self.PSI_BINS
        cur_pct = cur_hist / len(current) if len(current) > 0 else np.ones(self.PSI_BINS) / self.PSI_BINS

        # Avoid division by zero and log(0) for empty bins
        ref_pct = np.clip(ref_pct, 0.0001, 1.0)
        cur_pct = np.clip(cur_pct, 0.0001, 1.0)

        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
        return float(psi)

    def detect_drift(self, current_data: pd.DataFrame) -> List[DriftReport]:
        """Detect drift between reference and current data."""
        reports = []
        numeric_cols = current_data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if col not in self.reference.columns:
                continue
            ref_vals = self.reference[col].dropna()
            cur_vals = current_data[col].dropna()
            if len(ref_vals) < 30 or len(cur_vals) < 30:
                continue

            # PSI
            psi = self._calculate_psi(ref_vals.values, cur_vals.values)

            # Kolmogorov-Smirnov test
            ks_stat, ks_pvalue = stats.ks_2samp(ref_vals, cur_vals)

            # Relative mean shift (abs() in the denominator handles negative reference means)
            ref_mean = ref_vals.mean()
            cur_mean = cur_vals.mean()
            mean_shift = abs(cur_mean - ref_mean) / abs(ref_mean) if ref_mean != 0 else 0

            # Drift detected if PSI > 0.25 or KS p-value < 0.05.
            # With very large samples the KS test flags even tiny shifts,
            # so treat the p-value as a screening signal rather than proof of harm.
            drift_detected = psi > self.PSI_THRESHOLDS["moderate_drift"] or ks_pvalue < 0.05

            reports.append(DriftReport(
                feature_name=col,
                drift_detected=drift_detected,
                psi_score=psi,
                ks_statistic=ks_stat,
                ks_pvalue=ks_pvalue,
                mean_shift=mean_shift,
                reference_mean=ref_mean,
                current_mean=cur_mean,
            ))
        return reports

    def check_performance(
        self,
        actual_labels: np.ndarray,
        predicted_labels: np.ndarray,
        predicted_probabilities: np.ndarray,
        reference_f1: Optional[float] = None,
    ) -> Dict:
        """Check model performance against baseline."""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

        f1 = f1_score(actual_labels, predicted_labels)
        metrics = {
            "accuracy": float(accuracy_score(actual_labels, predicted_labels)),
            "precision": float(precision_score(actual_labels, predicted_labels)),
            "recall": float(recall_score(actual_labels, predicted_labels)),
            "f1_score": float(f1),
            "roc_auc": float(roc_auc_score(actual_labels, predicted_probabilities)),
        }

        # Alert when F1 falls more than 10% below the reference value
        alerts = []
        if reference_f1 is not None and f1 < reference_f1 * 0.9:
            alerts.append(f"F1 score degraded: {f1:.3f} vs reference {reference_f1:.3f}")

        return {
            "metrics": metrics,
            "alerts": alerts,
            "needs_retraining": len(alerts) > 0,
        }

    def generate_report(self, current_data: pd.DataFrame) -> Dict:
        """Generate comprehensive monitoring report."""
        drift_reports = self.detect_drift(current_data)
        drifting_features = [r for r in drift_reports if r.drift_detected]
        report = {
            "timestamp": pd.Timestamp.now().isoformat(),
            "total_features_checked": len(drift_reports),
            "features_with_drift": len(drifting_features),
            "drift_detected": len(drifting_features) > 0,
            "drift_details": [
                {
                    "feature": r.feature_name,
                    "psi": round(r.psi_score, 4),
                    "ks_pvalue": round(r.ks_pvalue, 6),
                    "mean_shift_pct": round(r.mean_shift * 100, 2),
                }
                for r in drifting_features
            ],
            "all_features": [
                {
                    "feature": r.feature_name,
                    "psi": round(r.psi_score, 4),
                    "drift_detected": r.drift_detected,
                }
                for r in drift_reports
            ],
        }
        return report


# Example usage:
# monitor = ModelMonitor(training_data)
# report = monitor.generate_report(production_data)
# if report['drift_detected']:
#     print(f"DRIFT DETECTED in {report['features_with_drift']} features!")
#     for d in report['drift_details']:
#         print(f"  {d['feature']}: PSI={d['psi']}")
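As a worked example of the PSI calculation, the sketch below applies the same formula, the sum over bins of (current − reference) × ln(current / reference), to pre-binned proportions using only the standard library. The function name `psi_from_proportions` and the two sample distributions are hypothetical and chosen purely for illustration.

```python
import math


def psi_from_proportions(ref_pct, cur_pct, floor=1e-4):
    """Compute PSI = sum((cur - ref) * ln(cur / ref)) over matching bins.

    Proportions are floored to avoid log(0) and division by zero.
    """
    total = 0.0
    for ref, cur in zip(ref_pct, cur_pct):
        ref = max(ref, floor)
        cur = max(cur, floor)
        total += (cur - ref) * math.log(cur / ref)
    return total


# Four equally populated reference bins versus a distribution skewed to the right
reference = [0.25, 0.25, 0.25, 0.25]
shifted = [0.10, 0.20, 0.30, 0.40]

print(round(psi_from_proportions(reference, reference), 4))
print(round(psi_from_proportions(reference, shifted), 4))
```

Identical distributions give a PSI of zero, while the skewed example lands between 0.1 and 0.25, which corresponds to the moderate-drift band in the thresholds used above.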
A/B Testing for ML Models
# ab_test_controller.py: Route traffic between model versions and measure results
import hashlib
import time
from dataclasses import dataclass
from typing import Dict


@dataclass
class ABTestConfig:
    """Configuration for A/B test."""
    test_name: str
    control_model: str
    treatment_model: str
    traffic_split: float = 0.5  # fraction of traffic (0.0 to 1.0) sent to treatment
    min_samples: int = 1000
    primary_metric: str = "f1_score"


class ABTestController:
    """Route traffic and compare model versions."""

    def __init__(self, config: ABTestConfig):
        self.config = config
        self.control_predictions = []
        self.treatment_predictions = []
        self.assignment_log = []

    def route(self, request_id: str) -> str:
        """Route request to control or treatment model."""
        # Deterministic routing based on a stable hash of request_id.
        # Python's built-in hash() is salted per process for strings, so it would
        # assign the same request differently across replicas and restarts.
        digest = hashlib.sha256(request_id.encode("utf-8")).hexdigest()
        hash_val = int(digest, 16) % 10000
        is_treatment = hash_val < (self.config.traffic_split * 10000)
        variant = "treatment" if is_treatment else "control"
        self.assignment_log.append({
            "request_id": request_id,
            "variant": variant,
            "timestamp": time.time(),
        })
        return variant

    def log_prediction(self, variant: str, prediction: dict):
        """Log prediction result for comparison."""
        if variant == "control":
            self.control_predictions.append(prediction)
        else:
            self.treatment_predictions.append(prediction)

    def get_results(self) -> Dict:
        """Compare control vs treatment performance."""
        control_metrics = self._compute_metrics(self.control_predictions)
        treatment_metrics = self._compute_metrics(self.treatment_predictions)
        n_control = len(self.control_predictions)
        n_treatment = len(self.treatment_predictions)
        return {
            "test_name": self.config.test_name,
            "control_samples": n_control,
            "treatment_samples": n_treatment,
            "control_metrics": control_metrics,
            "treatment_metrics": treatment_metrics,
            "improvement": {
                k: treatment_metrics.get(k, 0) - control_metrics.get(k, 0)
                for k in control_metrics
            },
            # Only checks that both arms collected enough samples; this is a
            # precondition for, not a measure of, statistical significance.
            "min_samples_reached": (
                n_control >= self.config.min_samples
                and n_treatment >= self.config.min_samples
            ),
        }

    def _compute_metrics(self, predictions: list) -> Dict:
        """Compute evaluation metrics from predictions."""
        if not predictions:
            return {}

        # Extract labels and predictions
        y_true = [p.get('actual', 0) for p in predictions]
        y_pred = [p.get('predicted', 0) for p in predictions]

        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        return {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, zero_division=0),
            "recall": recall_score(y_true, y_pred, zero_division=0),
            "f1_score": f1_score(y_true, y_pred, zero_division=0),
        }
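Reaching `min_samples` in both arms only says that enough data was collected; it does not say the observed difference is real. One common way to check that for a proportion-style metric such as accuracy is a two-proportion z-test. The sketch below is a minimal standard-library version; the function name `two_proportion_z_test` is hypothetical, and the test applies to simple proportions (correct predictions out of total), not directly to F1, which would need a different approach such as bootstrapping.

```python
import math


def two_proportion_z_test(successes_a, n_a, successes_b, n_b):
    """Two-sided z-test for a difference between two proportions.

    Returns (z, p_value), where z > 0 means group B has the higher proportion.
    """
    if n_a == 0 or n_b == 0:
        raise ValueError("both groups need at least one sample")
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    # Pooled proportion under the null hypothesis of no difference
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0
    z = (p_b - p_a) / se
    # Standard normal CDF: Phi(x) = 0.5 * (1 + erf(x / sqrt(2)))
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))
    return z, p_value


# Hypothetical counts: control is correct on 900 of 1000, treatment on 930 of 1000
z, p = two_proportion_z_test(900, 1000, 930, 1000)
print(f"z={z:.2f}, p={p:.4f}")
```

A p-value below a pre-agreed level (0.05 is a common choice) would support promoting the treatment model; the level and the metric should be fixed before the test starts.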
# Kubernetes Istio traffic splitting for model A/B testing
# istio-ab-test.yaml
"""
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: fraud-model
  namespace: ml-platform
spec:
  hosts:
    - fraud-model.ml-platform.svc.cluster.local
  http:
    # Requests that explicitly ask for v2 always go to v2
    - match:
        - headers:
            x-model-version:
              exact: "v2"
      route:
        - destination:
            host: fraud-model-v2.ml-platform.svc.cluster.local
            port:
              number: 8000
          weight: 100
    # All other traffic is split 80/20 between v1 and v2
    - route:
        - destination:
            host: fraud-model-v1.ml-platform.svc.cluster.local
            port:
              number: 8000
          weight: 80
        - destination:
            host: fraud-model-v2.ml-platform.svc.cluster.local
            port:
              number: 8000
          weight: 20
"""
ML Infrastructure on Kubernetes
GPU Resource Management
# gpu-node-pool.tf: EKS GPU node group for training (Terraform)
resource "aws_eks_node_group" "gpu_training" {
  cluster_name    = aws_eks_cluster.ml.name
  node_group_name = "gpu-training"
  node_role_arn   = aws_iam_role.eks_nodes.arn
  subnet_ids      = var.private_subnet_ids

  instance_types = ["p4d.24xlarge"] # A100 GPUs
  ami_type       = "AL2_x86_64_GPU"
  capacity_type  = "ON_DEMAND" # Training jobs need reliability

  scaling_config {
    desired_size = 0 # Scale to zero when not training
    min_size     = 0
    max_size     = 4
  }

  taint {
    key    = "nvidia.com/gpu"
    value  = "true"
    effect = "NO_SCHEDULE"
  }

  labels = {
    "workload-type" = "ml-training"
    "gpu-enabled"   = "true"
  }

  tags = merge(var.common_tags, {
    Name                                            = "gpu-training-nodes"
    "k8s.io/cluster-autoscaler/enabled"             = "true"
    "k8s.io/cluster-autoscaler/${var.cluster_name}" = "owned"
  })
}
# GPU pod for training
# training-pod.yaml
"""
apiVersion: v1
kind: Pod
metadata:
  name: gpu-training-job
  namespace: ml-platform
  labels:
    workload-type: ml-training  # matched by the podAntiAffinity rule below
spec:
  containers:
    - name: training
      image: ml-platform/training:latest
      resources:
        limits:
          nvidia.com/gpu: 4  # Request 4 GPUs
          memory: "256Gi"
          cpu: "64"
        requests:
          nvidia.com/gpu: 4
          memory: "128Gi"
          cpu: "32"
      env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0,1,2,3"
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow.ml-platform.svc.cluster.local:5000"
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"
  nodeSelector:
    gpu-enabled: "true"
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: workload-type
                  operator: In
                  values: ["ml-training"]
            topologyKey: kubernetes.io/hostname
"""
Complete Terraform for ML Infrastructure
# ml_infrastructure.tf: EKS + S3 + SageMaker for MLOps
terraform {
  required_version = ">= 1.5"

  required_providers {
    aws        = { source = "hashicorp/aws", version = "~> 5.0" }
    kubernetes = { source = "hashicorp/kubernetes", version = "~> 2.23" }
    helm       = { source = "hashicorp/helm", version = "~> 2.11" }
  }
}
# --- S3 Buckets ---
resource "aws_s3_bucket" "ml_data_lake" {
  bucket = "${var.project_name}-ml-data-${data.aws_caller_identity.current.account_id}"
  tags   = local.common_tags
}

resource "aws_s3_bucket_lifecycle_configuration" "ml_data" {
  bucket = aws_s3_bucket.ml_data_lake.id

  rule {
    id     = "archive-old-data"
    status = "Enabled"

    # Empty filter: the rule applies to every object in the bucket
    filter {}

    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 365
      storage_class = "GLACIER"
    }
  }
}

resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.project_name}-ml-models-${data.aws_caller_identity.current.account_id}"
  tags   = local.common_tags
}

# Versioning keeps earlier model artifacts recoverable
resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}
# --- EKS Cluster ---
resource "aws_eks_cluster" "ml" {
  name     = "${var.project_name}-ml-platform"
  role_arn = aws_iam_role.eks_cluster.arn
  version  = var.kubernetes_version

  vpc_config {
    subnet_ids              = var.private_subnet_ids
    endpoint_private_access = true
    endpoint_public_access  = true
    public_access_cidrs     = var.admin_cidr_blocks
  }

  enabled_cluster_log_types = ["api", "audit", "authenticator"]
  tags                      = local.common_tags
}
# CPU node group for inference
resource "aws_eks_node_group" "cpu_inference" {
  cluster_name    = aws_eks_cluster.ml.name
  node_group_name = "cpu-inference"
  node_role_arn   = aws_iam_role.eks_nodes.arn
  subnet_ids      = var.private_subnet_ids
  # All instance types in one node group must share a CPU architecture;
  # Graviton (m6g) instances are ARM64 and need their own node group and AMI type
  instance_types = ["m6i.2xlarge", "m6a.2xlarge"]
  capacity_type  = "SPOT" # Cost-effective for inference

  scaling_config {
    desired_size = 3
    min_size     = 2
    max_size     = 20
  }

  labels = {
    "workload-type" = "ml-inference"
  }

  taint {
    key    = "dedicated"
    value  = "ml-inference"
    effect = "NO_SCHEDULE"
  }

  tags = merge(local.common_tags, {
    Name                                                   = "cpu-inference-nodes"
    "k8s.io/cluster-autoscaler/enabled"                    = "true"
    "k8s.io/cluster-autoscaler/${aws_eks_cluster.ml.name}" = "owned"
  })
}
# --- SageMaker ---
resource "aws_sagemaker_domain" "ml" {
  domain_name = "${var.project_name}-ml-workspace"
  auth_mode   = "IAM"
  vpc_id      = aws_vpc.ml.id
  # HCL has no Python-style slicing; slice() returns the first two subnets
  subnet_ids = slice(var.private_subnet_ids, 0, 2)

  default_user_settings {
    execution_role  = aws_iam_role.sagemaker_execution.arn
    security_groups = [aws_security_group.sagemaker.id]

    sharing_settings {
      notebook_output_option = "Allowed"
      s3_output_path         = "s3://${aws_s3_bucket.ml_data_lake.id}/sagemaker-outputs/"
    }
  }
}

# One user profile per data scientist
resource "aws_sagemaker_user_profile" "data_scientists" {
  for_each          = toset(var.data_scientist_users)
  domain_id         = aws_sagemaker_domain.ml.id
  user_profile_name = each.value

  user_settings {
    execution_role = aws_iam_role.sagemaker_execution.arn
  }
}
# SageMaker Feature Store
resource "aws_sagemaker_feature_group" "transaction_features" {
  feature_group_name             = "transaction-features"
  record_identifier_feature_name = "transaction_id"
  event_time_feature_name        = "event_timestamp"
  role_arn                       = aws_iam_role.sagemaker_execution.arn

  feature_definition {
    feature_name = "transaction_id"
    feature_type = "String"
  }

  feature_definition {
    feature_name = "event_timestamp"
    feature_type = "String"
  }

  feature_definition {
    feature_name = "amount"
    feature_type = "Fractional"
  }

  feature_definition {
    feature_name = "fraud_probability"
    feature_type = "Fractional"
  }

  online_store_config {
    enable_online_store = true
  }

  offline_store_config {
    s3_storage_config {
      s3_uri = "s3://${aws_s3_bucket.ml_data_lake.id}/feature-store/"
    }
  }
}
# --- Kubeflow via Helm ---
# Kubeflow Pipelines is officially distributed as Kustomize manifests, so
# confirm that this chart repository and version exist before applying.
# The cpu_inference node group is tainted, so the chart's pods also need a
# matching toleration in order to schedule there.
resource "helm_release" "kubeflow_pipelines" {
  name       = "kubeflow-pipelines"
  repository = "https://googlecloudplatform.github.io/ai-platform/charts"
  chart      = "kubeflow-pipelines"
  namespace  = kubernetes_namespace.ml_platform.metadata[0].name
  version    = "2.0.0"

  set {
    name  = "mysql.persistence.enabled"
    value = "true"
  }

  set {
    name  = "minio.persistence.enabled"
    value = "true"
  }

  depends_on = [aws_eks_node_group.cpu_inference]
}
# --- MLflow via Helm ---
resource "helm_release" "mlflow" {
  name       = "mlflow"
  repository = "https://community-charts.github.io/helm-charts"
  chart      = "mlflow"
  namespace  = kubernetes_namespace.ml_platform.metadata[0].name
  version    = "0.7.0"

  set {
    name  = "backendStore.postgres.enabled"
    value = "true"
  }

  set {
    name  = "artifactRoot.s3.enabled"
    value = "true"
  }

  set {
    name  = "artifactRoot.s3.bucket"
    value = aws_s3_bucket.model_artifacts.id
  }

  # Credentials are passed as sensitive values so they stay out of plan output
  set_sensitive {
    name  = "artifactRoot.s3.awsAccessKeyId"
    value = aws_iam_access_key.mlflow.id
  }

  set_sensitive {
    name  = "artifactRoot.s3.awsSecretAccessKey"
    value = aws_iam_access_key.mlflow.secret
  }
}
# --- Kubernetes Resources ---
resource "kubernetes_namespace" "ml_platform" {
  metadata {
    name = "ml-platform"

    labels = {
      "istio-injection" = "enabled"
      # "restricted" rejects the default istio-init container, which needs
      # NET_ADMIN, so sidecar injection here assumes the Istio CNI plugin
      "pod-security.kubernetes.io/enforce" = "restricted"
    }
  }

  depends_on = [aws_eks_cluster.ml]
}
resource "kubernetes_resource_quota" "ml_training" {
  metadata {
    name      = "training-quota"
    namespace = kubernetes_namespace.ml_platform.metadata[0].name
  }

  spec {
    # Extended resources such as GPUs only support the "requests." quota
    # prefix, so there is no "limits.nvidia.com/gpu" entry
    hard = {
      "requests.cpu"            = "100"
      "requests.memory"         = "500Gi"
      "requests.nvidia.com/gpu" = "8"
      "pods"                    = "50"
    }
  }
}
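resource "kubernetes_network_policy" "ml_isolation" {
  metadata {
    name      = "ml-platform-isolation"
    namespace = kubernetes_namespace.ml_platform.metadata[0].name
  }

  spec {
    pod_selector {}
    policy_types = ["Ingress", "Egress"]

    # Select on the label Kubernetes sets on every namespace automatically;
    # the ml-platform namespace carries no custom "name" label
    ingress {
      from {
        namespace_selector {
          match_labels = {
            "kubernetes.io/metadata.name" = "ml-platform"
          }
        }
      }
    }

    egress {
      to {
        namespace_selector {
          match_labels = {
            "kubernetes.io/metadata.name" = "ml-platform"
          }
        }
      }
    }

    # Allow DNS lookups; without this rule pods cannot resolve service names
    egress {
      to {
        namespace_selector {
          match_labels = {
            "kubernetes.io/metadata.name" = "kube-system"
          }
        }
      }
      ports {
        port     = "53"
        protocol = "UDP"
      }
      ports {
        port     = "53"
        protocol = "TCP"
      }
    }
    # Traffic to S3 and other external endpoints is still blocked by this policy
  }
}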
# --- IAM Roles ---
resource "aws_iam_role" "sagemaker_execution" {
  name = "${var.project_name}-sagemaker-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "sagemaker.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "sagemaker_s3" {
  name = "s3-access"
  role = aws_iam_role.sagemaker_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.ml_data_lake.arn,
          "${aws_s3_bucket.ml_data_lake.arn}/*",
          aws_s3_bucket.model_artifacts.arn,
          "${aws_s3_bucket.model_artifacts.arn}/*"
        ]
      }
    ]
  })
}

data "aws_caller_identity" "current" {}

locals {
  common_tags = {
    Project     = var.project_name
    Environment = var.environment
    ManagedBy   = "terraform"
    Team        = "ml-platform"
  }
}
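The `training-quota` ResourceQuota caps the namespace's aggregate requests, so it helps to check how many training pods it admits at once. The following is a rough sketch of that arithmetic, using the request values from the GPU training pod shown earlier; the dictionaries and function names are hypothetical and memory is simplified to whole GiB.

```python
# Hard limits from the training-quota ResourceQuota.
QUOTA = {"cpu": 100, "memory_gi": 500, "gpu": 8, "pods": 50}

# Requests of one GPU training pod (32 CPU, 128Gi memory, 4 GPUs).
TRAINING_POD = {"cpu": 32, "memory_gi": 128, "gpu": 4, "pods": 1}


def max_concurrent_pods(quota, pod_requests):
    """Return how many identical pods fit before any quota dimension runs out."""
    return min(
        quota[name] // amount
        for name, amount in pod_requests.items()
        if amount > 0
    )


def fits(quota, used, pod_requests):
    """Return True if one more pod stays within every quota dimension."""
    return all(
        used.get(name, 0) + amount <= quota[name]
        for name, amount in pod_requests.items()
    )
```

With these numbers the GPU dimension binds first: CPU and memory would each allow three pods, but eight GPUs at four per pod allow only two.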
Self-Healing Infrastructure Vision: This MLOps foundation enables the self-healing infrastructure agents I am currently building as a POC. The architecture works as follows: (1) the model monitoring pipeline continuously evaluates production model performance; (2) when drift or degradation is detected, it triggers the Kubeflow retraining pipeline; (3) the newly trained model is automatically evaluated against quality gates; (4) if it passes, the deployment pipeline performs a canary rollout via Istio traffic splitting; (5) A/B testing validates the new model before full promotion. This closes the loop from detection to remediation without human intervention, which is the core of autonomous operations.
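The five steps above can be sketched as a single decision function. This is only an illustration of the control flow, not the POC itself: `self_healing_step`, `LoopResult`, and the callables passed in are hypothetical stand-ins for the monitoring, Kubeflow, and Istio integrations.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class LoopResult:
    action: str  # "none", "rejected", "rolled_back", or "promoted"
    detail: str


def self_healing_step(
    drift_score: float,
    drift_threshold: float,
    retrain: Callable[[], str],
    evaluate: Callable[[str], float],
    quality_gate: float,
    canary_healthy: Callable[[str], bool],
) -> LoopResult:
    """Run one pass of the detect, retrain, gate, canary, promote loop."""
    # Steps 1-2: act only when monitoring reports drift at or above the threshold.
    if drift_score < drift_threshold:
        return LoopResult("none", "no drift detected")
    # Step 2: trigger retraining and receive a candidate model version.
    candidate = retrain()
    # Step 3: block candidates that fail the quality gate.
    score = evaluate(candidate)
    if score < quality_gate:
        return LoopResult("rejected", f"{candidate} scored {score:.2f}")
    # Steps 4-5: canary rollout and A/B validation before full promotion.
    if not canary_healthy(candidate):
        return LoopResult("rolled_back", f"{candidate} failed canary checks")
    return LoopResult("promoted", f"{candidate} promoted to full traffic")
```

Every branch returns an explicit outcome, so a rejected or rolled-back candidate leaves the current production model serving traffic.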
Multi-Agent Orchestration Note: My deep-dive research on multi-agent orchestration for DevOps automation explores extending this architecture beyond single pipelines. In a multi-agent system, specialized agents handle data validation, feature engineering, model selection, deployment, and monitoring. An orchestration agent coordinates them using shared state and goal-driven planning. This is the next evolution of MLOps: from pipeline automation to intelligent agent collaboration.
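One way to picture shared state and goal-driven planning is a loop that runs whichever agent has its inputs available, until the goal appears in the state. The sketch below is a toy model under that assumption; `Agent`, `orchestrate`, and the state keys are hypothetical and not taken from the research mentioned above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Set, Tuple


@dataclass
class Agent:
    name: str
    needs: Set[str]  # keys that must already be in the shared state
    provides: str    # key this agent writes to the shared state
    run: Callable[[Dict[str, object]], object]


def orchestrate(
    agents: List[Agent], state: Dict[str, object], goal: str
) -> Tuple[Dict[str, object], List[str]]:
    """Run ready agents until the goal key is present in the shared state."""
    state = dict(state)  # work on a copy so the caller's state is untouched
    trace: List[str] = []
    while goal not in state:
        # An agent is ready when its inputs exist and its output does not.
        ready = [
            a for a in agents
            if a.provides not in state and a.needs.issubset(state)
        ]
        if not ready:
            raise RuntimeError(f"no agent can make progress toward {goal!r}")
        agent = ready[0]
        state[agent.provides] = agent.run(state)
        trace.append(agent.name)
    return state, trace


# Agents are deliberately listed out of order; their dependencies drive the plan.
AGENTS = [
    Agent("deployment", {"model"}, "endpoint", lambda s: f"serving {s['model']}"),
    Agent("validation", {"raw_data"}, "clean_data", lambda s: s["raw_data"]),
    Agent("monitoring", {"endpoint"}, "monitoring", lambda s: "enabled"),
    Agent("model_selection", {"features"}, "model", lambda s: "fraud-model-v2"),
    Agent("feature_engineering", {"clean_data"}, "features", lambda s: ["amount"]),
]
```

Calling `orchestrate(AGENTS, {"raw_data": [...]}, goal="monitoring")` runs the agents in dependency order regardless of how the list is arranged.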