ML Pipeline Operations

MLOps applies DevOps principles to machine learning workflows, enabling reliable, reproducible, and scalable model deployment and monitoring. This guide covers the complete MLOps lifecycle from data ingestion to production model monitoring, with working Kubeflow pipelines, drift detection, and Terraform infrastructure for ML on Kubernetes.

MLOps Lifecycle: Data → Model → Serve → Monitor

┌──────────────────────────────────────────────────────────────────────────┐
│                        MLOps Lifecycle                                   │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐            │
│  │     DATA     │─────►│    MODEL     │─────►│    SERVE     │            │
│  │              │      │              │      │              │            │
│  │ • Ingestion  │      │ • Training   │      │ • Deploy     │            │
│  │ • Validation │      │ • Evaluation │      │ • A/B Test   │            │
│  │ • Feature    │      │ • Versioning │      │ • Scale      │            │
│  │   Engineering│      │ • Register   │      │ • Endpoint   │            │
│  └──────┬───────┘      └──────┬───────┘      └──────┬───────┘            │
│         │                     │                     │                    │
│         │            ┌────────┴────────┐            │                    │
│         │            │    MONITOR      │◄───────────┘                    │
│         │            │                 │                                 │
│         │            │ • Drift Detect  │                                 │
│         └───────────►│ • Performance   │                                 │
│                      │ • Data Quality  │                                 │
│                      │ • Auto-Retrain  │                                 │
│                      └─────────────────┘                                 │
│                                                                          │
│  Feedback Loop: Monitor output triggers retraining pipeline              │
└──────────────────────────────────────────────────────────────────────────┘
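
The feedback loop in the diagram can be sketched as a small decision function that turns monitoring output into a retrain signal. This is a minimal illustration rather than part of the pipeline itself: `should_retrain`, its parameter names, and the 0.05 F1 tolerance are assumptions, and the 0.25 PSI cutoff is borrowed from the drift thresholds used in the monitoring script later in this guide.

```python
from typing import Dict, List, Tuple


def should_retrain(
    psi_by_feature: Dict[str, float],
    baseline_f1: float,
    current_f1: float,
    psi_threshold: float = 0.25,  # assumed: PSI above this counts as significant drift
    max_f1_drop: float = 0.05,    # assumed tolerance; tune per model
) -> Tuple[bool, List[str]]:
    """Return (retrain?, reasons) from monitoring outputs."""
    reasons: List[str] = []
    drifted = sorted(f for f, psi in psi_by_feature.items() if psi > psi_threshold)
    if drifted:
        reasons.append(f"feature drift: {', '.join(drifted)}")
    if baseline_f1 - current_f1 > max_f1_drop:
        reasons.append(f"f1 dropped from {baseline_f1:.2f} to {current_f1:.2f}")
    return bool(reasons), reasons


# Hypothetical monitoring snapshot: one drifted feature, stable F1.
decision, why = should_retrain(
    {"amount": 0.31, "hour_of_day": 0.04}, baseline_f1=0.82, current_f1=0.80
)
print(decision, why)
```

Returning the reasons alongside the boolean keeps the trigger auditable: whatever launches the retraining pipeline can log why it did so.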

ML Pipeline Architecture

┌──────────────────────────────────────────────────────────────────────────┐
│                    ML Pipeline Architecture                              │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌───────────────────────────────────────────────────────────────┐       │
│  │                    Kubeflow Pipelines                         │       │
│  │  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐   │       │
│  │  │Data    │  │Feature │  │Train   │  │Evaluate│  │Deploy  │   │       │
│  │  │Ingest  │─►│Engineer│─►│Model   │─►│Model   │─►│Model   │   │       │
│  │  └────────┘  └────────┘  └────────┘  └────────┘  └────────┘   │       │
│  │      │           │           │           │           │        │       │
│  │      ▼           ▼           ▼           ▼           ▼        │       │
│  │  ┌─────────────────────────────────────────────────────────┐  │       │
│  │  │              MLflow Tracking Server                     │  │       │
│  │  │  Experiments • Metrics • Parameters • Artifacts         │  │       │
│  │  └─────────────────────────────────────────────────────────┘  │       │
│  └───────────────────────────────────────────────────────────────┘       │
│                          │                                               │
│  ┌───────────────────────┼───────────────────────────────────────┐       │
│  │                       ▼                                       │       │
│  │  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐           │       │
│  │  │Feature Store │ │ Model        │ │ Monitoring   │           │       │
│  │  │(Feast/Tecton)│ │ Registry     │ │(Evidently/   │           │       │
│  │  │              │ │(MLflow)      │ │  WhyLabs)    │           │       │
│  │  └──────────────┘ └──────────────┘ └──────────────┘           │       │
│  └───────────────────────────────────────────────────────────────┘       │
│                          │                                               │
│  ┌───────────────────────┼───────────────────────────────────────┐       │
│  │                       ▼                                       │       │
│  │  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐           │       │
│  │  │ S3 / GCS     │ │ ECR / GCR    │ │ Prometheus   │           │       │
│  │  │ (Data Lake)  │ │ (Model Arti- │ │ + Grafana    │           │       │
│  │  │              │ │  facts)      │ │              │           │       │
│  │  └──────────────┘ └──────────────┘ └──────────────┘           │       │
│  └───────────────────────────────────────────────────────────────┘       │
│                                                                          │
│  ┌───────────────────────────────────────────────────────────────┐       │
│  │                    Kubernetes (EKS/GKE)                       │       │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │       │
│  │  │ Training │  │ Inference│  │ GPU Node │  │ CPU Node │       │       │
│  │  │ Pods     │  │ Service  │  │ Group    │  │ Group    │       │       │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │       │
│  └───────────────────────────────────────────────────────────────┘       │
└──────────────────────────────────────────────────────────────────────────┘

Feature Stores

Feature stores centralize feature computation and serving, ensuring consistency between training and inference.

| Feature Store           | Type        | Best For                          | Key Strengths                                                    |
|-------------------------|-------------|-----------------------------------|------------------------------------------------------------------|
| Feast (open source)     | Open Source | Kubernetes-native; cost-conscious | Great K8s integration; multiple offline stores; active community |
| Tecton                  | Commercial  | Enterprise; real-time features    | Real-time transformation; enterprise support; managed            |
| SageMaker Feature Store | AWS Native  | AWS-only environments             | Deep SageMaker integration; offline/online store                 |
| Vertex AI Feature Store | GCP Native  | GCP-only environments             | Native GCP integration; streaming features                       |

# feature_store_example.py - Feast feature store definition
# Uses the Field/schema API of recent Feast releases; the older
# Feature/ValueType style cannot be combined with feast.types dtypes.
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define an entity (the primary key for features)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier"
)

# Define a feature view (how features are computed and stored)
user_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="total_transactions_30d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="days_since_last_transaction", dtype=Float32),
        Field(name="total_spend_90d", dtype=Float32),
        Field(name="merchant_category_count", dtype=Int64),
    ],
    online=True,
    source=FileSource(
        path="s3://features-bucket/user_features.parquet",
        timestamp_field="feature_timestamp"
    ),
    tags={"team": "ml-platform", "version": "v1"}
)

# Feature service (consumable bundle of features for a model)
from feast import FeatureService

fraud_detection_features = FeatureService(
    name="fraud_detection_v1",
    features=[user_features],
    tags={"model": "fraud-detection-xgboost", "version": "1.0.0"}
)

# Retrieve features for inference
from feast import FeatureStore

store = FeatureStore(repo_path=".")

features = store.get_online_features(
    features=[
        "user_transaction_features:total_transactions_30d",
        "user_transaction_features:avg_transaction_amount",
        "user_transaction_features:days_since_last_transaction"
    ],
    entity_rows=[{"user_id": 12345}]
).to_dict()

print(f"Retrieved features: {features}")
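
Consistency between training and inference largely comes down to point-in-time correctness: a training row should see only the feature values that existed when its event happened, and values older than the TTL should count as missing. The sketch below illustrates that idea in plain Python. It is a conceptual model, not Feast's implementation; `point_in_time_lookup` and the sample log are hypothetical, and the 24-hour default mirrors the `ttl` set on the feature view above.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

# Hypothetical feature log for one user: (feature_timestamp, values), sorted by time.
FeatureRow = Tuple[datetime, Dict[str, float]]


def point_in_time_lookup(
    rows: List[FeatureRow],
    event_time: datetime,
    ttl: timedelta = timedelta(hours=24),
) -> Optional[Dict[str, float]]:
    """Return the latest feature values at or before event_time, or None if
    no row exists yet or the newest eligible row is older than ttl."""
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, event_time) - 1  # last row with ts <= event_time
    if idx < 0:
        return None
    ts, values = rows[idx]
    if event_time - ts > ttl:
        return None  # stale values are treated as missing
    return values


log = [
    (datetime(2024, 1, 1, 8), {"total_transactions_30d": 12.0}),
    (datetime(2024, 1, 2, 8), {"total_transactions_30d": 15.0}),
]
print(point_in_time_lookup(log, datetime(2024, 1, 2, 7)))  # sees only the Jan 1 row
print(point_in_time_lookup(log, datetime(2024, 1, 4, 9)))  # newest row is past the TTL
```

Applying the same lookup rule offline (to build training sets) and online (to serve requests) is what prevents future values from leaking into training data.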

Model Versioning with MLflow

# mlflow_tracking.py - Model training with MLflow tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import numpy as np

def train_and_register_model(
    data_path: str,
    experiment_name: str = "fraud-detection",
    model_name: str = "fraud-detection-rf"
):
    """Train model with full MLflow tracking and registration."""

    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=f"{model_name}-training") as run:
        # ─── Load Data ───────────────────────────────────────────
        df = pd.read_parquet(data_path)
        X = df.drop(['is_fraud', 'transaction_id'], axis=1)
        y = df['is_fraud']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # ─── Log Parameters ──────────────────────────────────────
        params = {
            "n_estimators": 200,
            "max_depth": 15,
            "min_samples_split": 10,
            "min_samples_leaf": 4,
            "class_weight": "balanced",
            "random_state": 42
        }
        mlflow.log_params(params)

        # ─── Train ───────────────────────────────────────────────
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # ─── Evaluate ────────────────────────────────────────────
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]

        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1_score": f1_score(y_test, y_pred),
            "test_set_size": len(X_test),
            "feature_count": X.shape[1]
        }
        mlflow.log_metrics(metrics)

        # ─── Log Model with Signature ────────────────────────────
        signature = mlflow.models.signature.infer_signature(
            model_input=X_train.head(5),
            model_output=model.predict(X_train.head(5))
        )

        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            signature=signature,
            registered_model_name=model_name,
            conda_env={
                "channels": ["defaults", "conda-forge"],
                "dependencies": [
                    "python=3.10",
                    "scikit-learn=1.3.0",
                    "pandas=2.0.0",
                    "numpy=1.24.0",
                    "pip",
                    {"pip": ["mlflow==2.8.0"]}
                ],
                "name": "mlflow-env"
            }
        )

        # ─── Log Artifacts ───────────────────────────────────────
        # Feature importance
        importance_df = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)
        importance_df.to_csv("feature_importance.csv", index=False)
        mlflow.log_artifact("feature_importance.csv")

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {metrics}")

        return run.info.run_id, metrics


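# Transition model to production stage
# Note: registry stages are deprecated as of MLflow 2.9 in favor of aliases;
# this call matches the mlflow==2.8.0 pin used in the conda environment above.
def promote_model(model_name: str, version: int, stage: str = "Production"):
    """Promote a registered model version to a new stage."""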
    client = mlflow.tracking.MlflowClient()

    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage=stage,
        archive_existing_versions=True  # Archive previous production model
    )

    print(f"Model {model_name} v{version} promoted to {stage}")


# Example usage:
# run_id, metrics = train_and_register_model("s3://data-bucket/fraud/training.parquet")
# promote_model("fraud-detection-rf", version=3, stage="Production")
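
The effect of `archive_existing_versions=True` is easiest to see on a toy registry. The sketch below models stage assignments as a plain dictionary; `transition_stage` and the sample registry are hypothetical and nothing here calls MLflow, so treat it as an illustration of the intended behavior rather than of the library's internals.

```python
from typing import Dict


def transition_stage(
    stages: Dict[int, str],
    version: int,
    stage: str,
    archive_existing_versions: bool = True,
) -> Dict[int, str]:
    """Return a new version -> stage mapping after moving `version` to `stage`."""
    if version not in stages:
        raise KeyError(f"unknown model version: {version}")
    updated = dict(stages)  # leave the caller's mapping untouched
    if archive_existing_versions:
        for other, current_stage in stages.items():
            if current_stage == stage and other != version:
                updated[other] = "Archived"
    updated[version] = stage
    return updated


registry = {1: "Archived", 2: "Production", 3: "Staging"}
print(transition_stage(registry, version=3, stage="Production"))
```

With archiving enabled, at most one version holds the Production stage at a time, which is what lets a `models:/<name>/Production` URI resolve unambiguously.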

Model Deployment Patterns

Batch Inference

# batch_inference.py - Scheduled batch inference pipeline
import mlflow
import pandas as pd
from datetime import datetime
import boto3


def batch_inference(
    model_name: str = "fraud-detection-rf",
    model_stage: str = "Production",
    input_path: str = "s3://data-bucket/fraud/daily_transactions.parquet",
    output_path: str = None
):
    """Run batch inference on daily transactions."""

    # Load production model from MLflow Registry
    model_uri = f"models:/{model_name}/{model_stage}"
    model = mlflow.sklearn.load_model(model_uri)

    # Load input data
    df = pd.read_parquet(input_path)
    feature_cols = [c for c in df.columns if c not in ['transaction_id', 'is_fraud']]

    # Run inference
    df['fraud_probability'] = model.predict_proba(df[feature_cols])[:, 1]
    df['fraud_prediction'] = (df['fraud_probability'] > 0.5).astype(int)
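    df['prediction_timestamp'] = datetime.utcnow().isoformat()
    # Record which registry entry produced the scores (name and stage, not only the name)
    df['model_version'] = f"{model_name}/{model_stage}"

    # Write results
    if output_path is None:
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        # Use an explicit file name: a path ending in "/" is a prefix, not an object key
        output_path = f"s3://data-bucket/fraud/predictions/date={date_str}/predictions.parquet"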

    df.to_parquet(output_path, index=False)

    # Publish metrics
    print(f"Processed {len(df)} transactions")
    print(f"High-risk flagged: {(df['fraud_probability'] > 0.8).sum()}")
    print(f"Results written to: {output_path}")

    return df


# Kubernetes CronJob for daily batch inference
# batch-inference-cronjob.yaml
"""
apiVersion: batch/v1
kind: CronJob
metadata:
  name: fraud-batch-inference
  namespace: ml-platform
spec:
  schedule: "0 2 * * *"  # 2 AM daily
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: inference
            image: ml-platform/inference:latest
            command:
            - python
            - -c
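            - |
              # Kubernetes does not expand Airflow-style {{ ds }} macros,
              # so derive the partition date inside the container (UTC run date).
              from datetime import datetime, timezone
              from batch_inference import batch_inference
              ds = datetime.now(timezone.utc).strftime("%Y-%m-%d")
              batch_inference(
                model_name="fraud-detection-rf",
                input_path=f"s3://data-bucket/fraud/daily/{ds}/transactions.parquet"
              )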
            env:
            - name: MLFLOW_TRACKING_URI
              value: "http://mlflow.ml-platform.svc.cluster.local:5000"
            - name: AWS_REGION
              value: "us-east-1"
            resources:
              requests:
                memory: "2Gi"
                cpu: "1"
              limits:
                memory: "4Gi"
                cpu: "2"
          restartPolicy: OnFailure
"""

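A CronJob passes no logical run date to the container, so the job has to derive its own partition paths, and the same helper is useful for backfilling missed days. The sketch below assumes the bucket layout shown in the batch job above; `daily_paths` and `backfill_dates` are hypothetical helper names.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple

# Layout taken from the batch job above; the output is a date-partitioned prefix.
INPUT_TEMPLATE = "s3://data-bucket/fraud/daily/{ds}/transactions.parquet"
OUTPUT_PREFIX_TEMPLATE = "s3://data-bucket/fraud/predictions/date={ds}/"


def daily_paths(run_date: date) -> Tuple[str, str]:
    """Return (input path, output prefix) for one daily partition."""
    ds = run_date.isoformat()  # YYYY-MM-DD
    return INPUT_TEMPLATE.format(ds=ds), OUTPUT_PREFIX_TEMPLATE.format(ds=ds)


def backfill_dates(start: date, end: date) -> Iterator[date]:
    """Yield every date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


for run_date in backfill_dates(date(2024, 3, 1), date(2024, 3, 3)):
    print(daily_paths(run_date))
```

Because each run writes to its own date partition, rerunning a day overwrites only that day's predictions.
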
Real-Time Inference (API Endpoint)

# inference_service.py - FastAPI real-time inference service
import os
import mlflow
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import time

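app = FastAPI(title="Fraud Detection API", version="1.0.0")
# Expose the Prometheus metrics defined below at /metrics so they can be scraped
app.mount("/metrics", prometheus_client.make_asgi_app())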

# Prometheus metrics
PREDICTION_COUNT = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model_version', 'prediction_class']
)
PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency',
    ['model_version']
)
MODEL_INFO = Gauge(
    'model_info',
    'Model information',
    ['model_name', 'model_version', 'model_stage']
)

# Load model at startup
MODEL_NAME = os.environ.get("MODEL_NAME", "fraud-detection-rf")
MODEL_STAGE = os.environ.get("MODEL_STAGE", "Production")
MLFLOW_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")

mlflow.set_tracking_uri(MLFLOW_URI)
_model = None
_model_version = None


class TransactionFeatures(BaseModel):
    amount: float
    merchant_category: int
    hour_of_day: int
    day_of_week: int
    days_since_last_transaction: float
    avg_transaction_amount_30d: float
    transaction_count_30d: int
    total_spend_90d: float
    merchant_risk_score: float
    card_present: int
    international: int


class PredictionResponse(BaseModel):
    fraud_probability: float
    fraud_prediction: int
    model_version: str
    prediction_time_ms: float


@app.on_event("startup")
def load_model():
    global _model, _model_version
    model_uri = f"models:/{MODEL_NAME}/{MODEL_STAGE}"
    _model = mlflow.sklearn.load_model(model_uri)

    client = mlflow.tracking.MlflowClient()
    versions = client.get_latest_versions(MODEL_NAME, stages=[MODEL_STAGE])
    _model_version = versions[0].version if versions else "unknown"

    MODEL_INFO.labels(
        model_name=MODEL_NAME,
        model_version=_model_version,
        model_stage=MODEL_STAGE
    ).set(1)

    print(f"Loaded model {MODEL_NAME} v{_model_version} from stage {MODEL_STAGE}")


@app.get("/health")
def health():
    return {
        "status": "healthy",
        "model": MODEL_NAME,
        "version": _model_version,
        "stage": MODEL_STAGE
    }


@app.post("/predict", response_model=PredictionResponse)
def predict(features: TransactionFeatures):
    if _model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    start = time.time()

    # Convert to DataFrame (preserves feature names)
    df = pd.DataFrame([features.dict()])

    # Predict
    probability = float(_model.predict_proba(df)[0][1])
    prediction = int(probability > 0.5)

    latency = time.time() - start

    # Record metrics
    PREDICTION_COUNT.labels(
        model_version=_model_version,
        prediction_class="fraud" if prediction == 1 else "legitimate"
    ).inc()
    PREDICTION_LATENCY.labels(model_version=_model_version).observe(latency)

    return PredictionResponse(
        fraud_probability=probability,
        fraud_prediction=prediction,
        model_version=_model_version,
        prediction_time_ms=latency * 1000
    )


@app.post("/predict/batch")
def predict_batch(features: List[TransactionFeatures]):
    if _model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    df = pd.DataFrame([f.dict() for f in features])
    probabilities = _model.predict_proba(df)[:, 1].tolist()
    predictions = [int(p > 0.5) for p in probabilities]

    return {
        "predictions": predictions,
        "probabilities": probabilities,
        "count": len(features),
        "model_version": _model_version
    }


# Dockerfile
"""
FROM python:3.10-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY inference_service.py .
EXPOSE 8000

CMD ["uvicorn", "inference_service:app", "--host", "0.0.0.0", "--port", "8000"]
"""

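The lifecycle diagram lists A/B testing under Serve, while the service above loads a single model. One common way to split traffic between two model versions is deterministic hashing of a stable request key, sketched below. `assign_variant`, the variant labels, and the 10% default share are assumptions for illustration; how the chosen variant maps to a loaded model is left out.

```python
import hashlib


def assign_variant(request_key: str, treatment_share: float = 0.1) -> str:
    """Deterministically route a key to 'control' or 'treatment'.

    Hashing a stable key (for example a user ID) keeps that user on the
    same model version across requests.
    """
    if not 0.0 <= treatment_share <= 1.0:
        raise ValueError("treatment_share must be between 0 and 1")
    digest = hashlib.sha256(request_key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big")  # integer in [0, 2**64)
    return "treatment" if bucket < treatment_share * 2**64 else "control"


counts = {"control": 0, "treatment": 0}
for user_id in range(10_000):
    counts[assign_variant(f"user-{user_id}")] += 1
print(counts)  # roughly a 90/10 split
```

The `model_version` label already attached to the Prometheus counters above would then let the two variants be compared on the same dashboards.
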
Edge Deployment

# Convert sklearn model to ONNX for edge deployment
import skl2onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType


def convert_to_onnx(sklearn_model, feature_names, output_path):
    """Convert scikit-learn model to ONNX for edge/embedded deployment."""
    initial_type = [('float_input', FloatTensorType([None, len(feature_names)]))]

    onnx_model = convert_sklearn(
        sklearn_model,
        initial_types=initial_type,
        target_opset=13
    )

    with open(output_path, "wb") as f:
        f.write(onnx_model.SerializeToString())

    print(f"ONNX model saved to {output_path}")
    return output_path


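# Inference with ONNX Runtime (lightweight: no scikit-learn needed at inference time)
"""
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("fraud_model.onnx")
input_name = session.get_inputs()[0].name

features = np.array([[100.0, 3, 14, 2, 5.0, 150.0, 10, 5000.0, 0.2, 1, 0]], dtype=np.float32)
# skl2onnx classifiers return two outputs: predicted labels, then class probabilities
labels, probabilities = session.run(None, {input_name: features})
# Probability of class 1 (fraud) for the first row; this indexing works whether the
# probabilities arrive as a list of {class: prob} dicts (default ZipMap) or a 2-D array
fraud_prob = probabilities[0][1]
"""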

Complete Kubeflow Pipeline Example

# kubeflow_pipeline.py - End-to-end ML pipeline
# Written against the KFP v1 SDK (kfp<2); create_component_from_func
# is not available in KFP v2.
import kfp
from kfp import dsl
from kfp.components import create_component_from_func


# ─── Component 1: Data Validation ──────────────────────────────────
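# KFP v1 exposes InputPath/OutputPath under kfp.components rather than kfp.dsl
def validate_data(
    input_path: str,
    validation_output: kfp.components.OutputPath(str),
    stats_output: kfp.components.OutputPath(str)
) -> None: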
    import pandas as pd
    from datetime import datetime
    import json

    df = pd.read_parquet(input_path)

    # Schema validation
    required_columns = [
        'transaction_id', 'amount', 'merchant_category',
        'is_fraud', 'timestamp'
    ]
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Data quality checks
    null_pct = df.isnull().sum() / len(df) * 100
    high_null_cols = null_pct[null_pct > 5].to_dict()

    checks = {
        "row_count": len(df),
        "column_count": len(df.columns),
        "null_percentages": null_pct.to_dict(),
        "high_null_columns": high_null_cols,
        "fraud_rate": float(df['is_fraud'].mean()),
        "amount_stats": {
            "min": float(df['amount'].min()),
            "max": float(df['amount'].max()),
            "mean": float(df['amount'].mean())
        },
        "validation_timestamp": datetime.utcnow().isoformat(),
        "passed": len(high_null_cols) == 0 and len(df) > 1000
    }

    with open(validation_output, 'w') as f:
        json.dump(checks, f, indent=2)

    # Write statistics
    with open(stats_output, 'w') as f:
        json.dump(df.describe().to_dict(), f)


data_validation_op = create_component_from_func(
    validate_data,
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "pyarrow==14.0.0"]
)


# ─── Component 2: Feature Engineering ──────────────────────────────
def engineer_features(
    input_path: str,
    validation_input: kfp.components.InputPath(str),
    output_path: kfp.components.OutputPath(str)
) -> None:
    import json
    import math

    import pandas as pd

    with open(validation_input) as f:
        validation = json.load(f)

    if not validation.get('passed', False):
        raise ValueError("Data validation failed. Aborting.")

    df = pd.read_parquet(input_path)

    # Time-based features
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
    df['is_night'] = ((df['hour_of_day'] >= 23) | (df['hour_of_day'] <= 5)).astype(int)

    # Amount features (the log of a non-positive amount is defined as 0)
    df['amount_log'] = df['amount'].apply(lambda x: 0 if x <= 0 else math.log(x))
    df['amount_bin'] = pd.qcut(df['amount'], q=10, labels=False, duplicates='drop')

    # Interaction features
    df['amount_x_hour'] = df['amount'] * df['hour_of_day']
    df['amount_x_merchant'] = df['amount'] * df['merchant_category']

    df.to_parquet(output_path)


feature_engineering_op = create_component_from_func(
    engineer_features,
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "pyarrow==14.0.0", "numpy==1.24.0"]
)


# ─── Component 3: Model Training ───────────────────────────────────
def train_model(
    features_path: kfp.components.InputPath(str),
    model_output: kfp.components.OutputPath(str),
    metrics_output: kfp.components.OutputPath(str),
    experiment_name: str = "fraud-detection",
    model_name: str = "fraud-detection-rf"
) -> None:
    import pandas as pd
    import pickle
    import json
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

    df = pd.read_parquet(features_path)

    # Prepare features
    exclude = ['transaction_id', 'timestamp', 'is_fraud']
    feature_cols = [c for c in df.columns if c not in exclude]
    X = df[feature_cols]
    y = df['is_fraud']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train
    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=10,
        min_samples_leaf=4,
        class_weight="balanced",
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision": float(precision_score(y_test, y_pred)),
        "recall": float(recall_score(y_test, y_pred)),
        "f1_score": float(f1_score(y_test, y_pred)),
        "roc_auc": float(roc_auc_score(y_test, y_prob)),
        "cv_f1_mean": float(cross_val_score(model, X, y, cv=5, scoring='f1').mean()),
        "feature_count": len(feature_cols),
        "train_samples": len(X_train),
        "test_samples": len(X_test)
    }

    # Save model
    with open(model_output, 'wb') as f:
        pickle.dump({
            'model': model,
            'feature_names': feature_cols,
            'metrics': metrics
        }, f)

    with open(metrics_output, 'w') as f:
        json.dump(metrics, f, indent=2)


train_model_op = create_component_from_func(
    train_model,
    base_image="python:3.10-slim",
    packages_to_install=[
        "pandas==2.0.0", "pyarrow==14.0.0", "numpy==1.24.0",
        "scikit-learn==1.3.0"
    ]
)


# ─── Component 4: Model Validation ─────────────────────────────────
def validate_model(
    model_path: kfp.components.InputPath(str),
    metrics_path: kfp.components.InputPath(str),
    approval_output: kfp.components.OutputPath(str)
) -> None:
    import json
    import pickle

    with open(metrics_path) as f:
        metrics = json.load(f)

    # Quality gates
    gates = {
        "f1_score": metrics.get('f1_score', 0) >= 0.75,
        "precision": metrics.get('precision', 0) >= 0.70,
        "recall": metrics.get('recall', 0) >= 0.60,
        "roc_auc": metrics.get('roc_auc', 0) >= 0.85
    }

    approved = all(gates.values())

    result = {
        "approved": approved,
        "metrics": metrics,
        "gates": gates,
        "rejection_reasons": [k for k, v in gates.items() if not v] if not approved else []
    }

    with open(approval_output, 'w') as f:
        json.dump(result, f, indent=2)

    if not approved:
        raise ValueError(f"Model failed quality gates: {result['rejection_reasons']}")


model_validation_op = create_component_from_func(
    validate_model,
    base_image="python:3.10-slim"
)


# ─── Pipeline Definition ───────────────────────────────────────────
@dsl.pipeline(
    name="Fraud Detection Training Pipeline",
    description="End-to-end training pipeline for fraud detection model"
)
def fraud_training_pipeline(
    input_data_path: str = "s3://data-bucket/fraud/training_data.parquet"
):
    # Step 1: Validate data
    validation = data_validation_op(input_path=input_data_path)

    # Step 2: Engineer features
    features = feature_engineering_op(
        input_path=input_data_path,
        validation_input=validation.outputs['validation_output']
    )

    # Step 3: Train model
    # KFP v1 strips the "_path" suffix from InputPath/OutputPath parameter names,
    # so output_path is exposed as 'output' and features_path is passed as features=
    training = train_model_op(features=features.outputs['output'])

    # Step 4: Validate model quality (the component is defined as model_validation_op)
    approval = model_validation_op(
        model=training.outputs['model_output'],
        metrics=training.outputs['metrics_output']
    )

    # Disable Istio sidecar injection on every pipeline pod
    dsl.get_pipeline_conf().add_op_transformer(
        lambda op: op.add_pod_annotation('sidecar.istio.io/inject', 'false')
    )


# Compile and run
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        pipeline_func=fraud_training_pipeline,
        package_path='fraud_training_pipeline.yaml'
    )
    print("Pipeline compiled to fraud_training_pipeline.yaml")
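
The quality gates in `validate_model` can also be expressed as a table of minimum values, which makes them easy to unit-test locally before compiling the pipeline. The sketch below reuses the four thresholds from the component above; `check_gates` and the sample metrics are hypothetical.

```python
from typing import Dict, List, Tuple

# Minimum acceptable values, copied from validate_model above.
GATES: Dict[str, float] = {
    "f1_score": 0.75,
    "precision": 0.70,
    "recall": 0.60,
    "roc_auc": 0.85,
}


def check_gates(
    metrics: Dict[str, float], gates: Dict[str, float] = GATES
) -> Tuple[bool, List[str]]:
    """Return (approved, failed gate names); a missing metric counts as a failure."""
    failed = [name for name, minimum in gates.items() if metrics.get(name, 0.0) < minimum]
    return not failed, failed


# Hypothetical metrics, for illustration only: recall misses its gate.
print(check_gates({"f1_score": 0.81, "precision": 0.78, "recall": 0.55, "roc_auc": 0.91}))
```

Keeping the thresholds in one mapping means a change to a gate is a one-line diff that can be reviewed alongside the model it affects.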

Model Monitoring: Drift Detection and Performance
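
The monitor script below relies on the Population Stability Index (PSI). With reference and current bin proportions r_i and c_i, PSI is the sum over bins of (c_i - r_i) * ln(c_i / r_i); values under 0.1 are usually read as no drift and values above 0.25 as significant drift. The following standalone sketch computes it with the standard library only. The function name `psi`, the `eps` smoothing for empty bins, and the synthetic data are assumptions made for illustration.

```python
import math
from typing import List, Sequence


def psi(
    reference: Sequence[float],
    current: Sequence[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """Population Stability Index with equal-width bins taken from the reference."""
    lo, hi = min(reference), max(reference)
    if lo == hi:
        return 0.0  # a constant reference gives no usable bins
    width = (hi - lo) / bins

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            # Values outside the reference range fall into the first or last bin.
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # eps avoids log(0) and division by zero for empty bins (assumed smoothing).
        return [max(c / len(values), eps) for c in counts]

    ref_pct, cur_pct = proportions(reference), proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_pct, cur_pct))


reference = [float(i % 100) for i in range(1000)]           # roughly uniform on 0..99
shifted = [float(i % 100) * 0.5 + 50 for i in range(1000)]  # mass moved to the upper half
print(f"same data: {psi(reference, reference):.4f}")
print(f"shifted:   {psi(reference, shifted):.4f}")
```

Identical distributions score zero, while the shifted sample lands far above the 0.25 cutoff because half of the reference bins end up empty.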

#!/usr/bin/env python3
"""
model_monitoring.py - Detect data drift and model performance degradation.
Uses statistical tests and PSI (Population Stability Index).
"""
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple
from dataclasses import dataclass


@dataclass
class DriftReport:
    feature_name: str
    drift_detected: bool
    psi_score: float
    ks_statistic: float
    ks_pvalue: float
    mean_shift: float
    reference_mean: float
    current_mean: float


class ModelMonitor:
    """Monitor ML models for drift and performance degradation."""

    PSI_BINS = 10
    PSI_THRESHOLDS = {
        "no_drift": 0.1,
        "moderate_drift": 0.25,
        "significant_drift": float('inf')
    }

    def __init__(self, reference_data: pd.DataFrame):
        """
        Args:
            reference_data: Training or baseline data distribution
        """
        self.reference = reference_data.copy()
        self.reference_stats = self._compute_stats(reference_data)

    def _compute_stats(self, df: pd.DataFrame) -> Dict:
        """Compute reference statistics."""
        stats_dict = {}
        for col in df.select_dtypes(include=[np.number]).columns:
            stats_dict[col] = {
                "mean": df[col].mean(),
                "std": df[col].std(),
                "min": df[col].min(),
                "max": df[col].max(),
                "quantiles": df[col].quantile([0.1, 0.25, 0.5, 0.75, 0.9]).to_dict(),
                "histogram": np.histogram(df[col].dropna(), bins=self.PSI_BINS)[0].tolist()
            }
        return stats_dict

    def _calculate_psi(self, reference: np.ndarray, current: np.ndarray) -> float:
        """Calculate Population Stability Index."""
        # Create bins from reference distribution
        min_val = reference.min()
        max_val = reference.max()

        if min_val == max_val:
            return 0.0

        bins = np.linspace(min_val, max_val, self.PSI_BINS + 1)
        bins[0] = -np.inf
        bins[-1] = np.inf

        ref_hist, _ = np.histogram(reference, bins=bins)
        cur_hist, _ = np.histogram(current, bins=bins)

        # Convert to percentages
        ref_pct = ref_hist / len(reference) if len(reference) > 0 else np.ones(self.PSI_BINS) / self.PSI_BINS
        cur_pct = cur_hist / len(current) if len(current) > 0 else np.ones(self.PSI_BINS) / self.PSI_BINS

        # Avoid division by zero
        ref_pct = np.clip(ref_pct, 0.0001, 1.0)
        cur_pct = np.clip(cur_pct, 0.0001, 1.0)

        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
        return float(psi)

    def detect_drift(self, current_data: pd.DataFrame) -> List[DriftReport]:
        """Detect drift between reference and current data."""
        reports = []

        numeric_cols = current_data.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            if col not in self.reference.columns:
                continue

            ref_vals = self.reference[col].dropna()
            cur_vals = current_data[col].dropna()

            if len(ref_vals) < 30 or len(cur_vals) < 30:
                continue

            # PSI
            psi = self._calculate_psi(ref_vals.values, cur_vals.values)

            # Kolmogorov-Smirnov test
            ks_stat, ks_pvalue = stats.ks_2samp(ref_vals, cur_vals)

            # Mean shift
            ref_mean = ref_vals.mean()
            cur_mean = cur_vals.mean()
            # Divide by abs(ref_mean) so a negative reference mean cannot yield a negative shift
            mean_shift = abs(cur_mean - ref_mean) / abs(ref_mean) if ref_mean != 0 else 0

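            # Drift detected if PSI exceeds the moderate-drift bound (0.25) or KS p-value < 0.05.
            # bool() converts a possible numpy bool so the report stays JSON-serializable.
            drift_detected = bool(psi > self.PSI_THRESHOLDS["moderate_drift"] or ks_pvalue < 0.05)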

            reports.append(DriftReport(
                feature_name=col,
                drift_detected=drift_detected,
                psi_score=psi,
                ks_statistic=ks_stat,
                ks_pvalue=ks_pvalue,
                mean_shift=mean_shift,
                reference_mean=ref_mean,
                current_mean=cur_mean
            ))

        return reports

    def check_performance(
        self,
        actual_labels: np.ndarray,
        predicted_labels: np.ndarray,
        predicted_probabilities: np.ndarray,
        reference_f1: float = None
    ) -> Dict:
        """Check model performance against baseline."""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

        f1 = f1_score(actual_labels, predicted_labels)

        metrics = {
            "accuracy": float(accuracy_score(actual_labels, predicted_labels)),
            "precision": float(precision_score(actual_labels, predicted_labels)),
            "recall": float(recall_score(actual_labels, predicted_labels)),
            "f1_score": float(f1),
            "roc_auc": float(roc_auc_score(actual_labels, predicted_probabilities))
        }

        alerts = []
        if reference_f1 is not None and f1 < reference_f1 * 0.9:
            alerts.append(f"F1 score degraded: {f1:.3f} vs reference {reference_f1:.3f}")

        return {
            "metrics": metrics,
            "alerts": alerts,
            "needs_retraining": len(alerts) > 0
        }

    def generate_report(self, current_data: pd.DataFrame) -> Dict:
        """Generate comprehensive monitoring report."""
        drift_reports = self.detect_drift(current_data)
        drifting_features = [r for r in drift_reports if r.drift_detected]

        report = {
            "timestamp": pd.Timestamp.now().isoformat(),
            "total_features_checked": len(drift_reports),
            "features_with_drift": len(drifting_features),
            "drift_detected": len(drifting_features) > 0,
            "drift_details": [
                {
                    "feature": r.feature_name,
                    "psi": round(r.psi_score, 4),
                    "ks_pvalue": round(r.ks_pvalue, 6),
                    "mean_shift_pct": round(r.mean_shift * 100, 2)
                }
                for r in drift_reports if r.drift_detected
            ],
            "all_features": [
                {
                    "feature": r.feature_name,
                    "psi": round(r.psi_score, 4),
                    "drift_detected": r.drift_detected
                }
                for r in drift_reports
            ]
        }

        return report


# Example usage:
# monitor = ModelMonitor(training_data)
# report = monitor.generate_report(production_data)
# if report['drift_detected']:
#     print(f"DRIFT DETECTED in {report['features_with_drift']} features!")
#     for d in report['drift_details']:
#         print(f"  {d['feature']}: PSI={d['psi']}")
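To get a feel for how the PSI thresholds behave, the sketch below recomputes PSI on synthetic data with the same binning approach as `_calculate_psi` above (equal-width bins from the reference range, open-ended outer bins, proportions clipped at 0.0001). The helper name `psi` and the synthetic distributions are assumptions made for illustration; they are not part of the monitoring module.

```python
import numpy as np


def psi(reference: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index with equal-width bins taken from the reference range."""
    edges = np.linspace(reference.min(), reference.max(), n_bins + 1)
    edges[0], edges[-1] = -np.inf, np.inf  # catch values outside the reference range
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Clip so that empty bins do not produce log(0) or a division by zero
    ref_pct = np.clip(ref_pct, 1e-4, 1.0)
    cur_pct = np.clip(cur_pct, 1e-4, 1.0)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))


# Synthetic data (assumption): a baseline, a fresh sample from the same
# distribution, and a sample whose mean moved by two standard deviations.
rng = np.random.default_rng(seed=0)
reference = rng.normal(loc=100.0, scale=15.0, size=10_000)
same_dist = rng.normal(loc=100.0, scale=15.0, size=10_000)
shifted = rng.normal(loc=130.0, scale=15.0, size=10_000)

psi_same = psi(reference, same_dist)
psi_shifted = psi(reference, shifted)
print(f"same distribution: PSI={psi_same:.4f}")
print(f"shifted mean:      PSI={psi_shifted:.4f}")
```

The unshifted sample should land well under the 0.1 "no drift" bound, while the shifted sample should exceed the 0.25 bound by a wide margin.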

A/B Testing for ML Models

# ab_test_controller.py: Route traffic between model versions and measure
import random
import time
from dataclasses import dataclass
from typing import Dict, Optional, Callable
from collections import defaultdict


@dataclass
class ABTestConfig:
    """Configuration for A/B test."""
    test_name: str
    control_model: str
    treatment_model: str
    traffic_split: float = 0.5  # fraction of traffic sent to treatment (0.0 to 1.0)
    min_samples: int = 1000
    primary_metric: str = "f1_score"


class ABTestController:
    """Route traffic and compare model versions."""

    def __init__(self, config: ABTestConfig):
        self.config = config
        self.control_predictions = []
        self.treatment_predictions = []
        self.assignment_log = []

    def route(self, request_id: str) -> str:
        """Route request to control or treatment model."""
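        # Deterministic routing based on a stable hash of request_id.
        # Python's built-in hash() is salted per process for strings, so the
        # same request could land in different variants on different replicas.
        import hashlib
        digest = hashlib.sha256(request_id.encode("utf-8")).hexdigest()
        hash_val = int(digest, 16) % 10000
        is_treatment = hash_val < (self.config.traffic_split * 10000)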

        variant = "treatment" if is_treatment else "control"
        self.assignment_log.append({
            "request_id": request_id,
            "variant": variant,
            "timestamp": time.time()
        })

        return variant

    def log_prediction(self, variant: str, prediction: dict):
        """Log prediction result for comparison."""
        if variant == "control":
            self.control_predictions.append(prediction)
        else:
            self.treatment_predictions.append(prediction)

    def get_results(self) -> Dict:
        """Compare control vs treatment performance."""
        control_metrics = self._compute_metrics(self.control_predictions)
        treatment_metrics = self._compute_metrics(self.treatment_predictions)

        n_control = len(self.control_predictions)
        n_treatment = len(self.treatment_predictions)

        return {
            "test_name": self.config.test_name,
            "control_samples": n_control,
            "treatment_samples": n_treatment,
            "control_metrics": control_metrics,
            "treatment_metrics": treatment_metrics,
            "improvement": {
                k: treatment_metrics.get(k, 0) - control_metrics.get(k, 0)
                for k in control_metrics
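            },
            # Note: this only checks that both arms reached min_samples;
            # it is a sample-size gate, not a statistical significance test.
            "is_significant": n_control >= self.config.min_samples and n_treatment >= self.config.min_samples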
        }

    def _compute_metrics(self, predictions: list) -> Dict:
        """Compute evaluation metrics from predictions."""
        if not predictions:
            return {}

        # Extract labels and predictions
        y_true = [p.get('actual', 0) for p in predictions]
        y_pred = [p.get('predicted', 0) for p in predictions]

        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        return {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, zero_division=0),
            "recall": recall_score(y_true, y_pred, zero_division=0),
            "f1_score": f1_score(y_true, y_pred, zero_division=0)
        }


# Kubernetes Istio traffic splitting for model A/B testing
# istio-ab-test.yaml
"""
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: fraud-model
  namespace: ml-platform
spec:
  hosts:
  - fraud-model.ml-platform.svc.cluster.local
  http:
  - match:
    - headers:
        x-model-version:
          exact: "v2"
    route:
    - destination:
        host: fraud-model-v2.ml-platform.svc.cluster.local
        port:
          number: 8000
      weight: 100
  - route:
    - destination:
        host: fraud-model-v1.ml-platform.svc.cluster.local
        port:
          number: 8000
      weight: 80
    - destination:
        host: fraud-model-v2.ml-platform.svc.cluster.local
        port:
          number: 8000
      weight: 20
"""

ML Infrastructure on Kubernetes

GPU Resource Management

# gpu-node-pool.tf: EKS GPU node group for training
resource "aws_eks_node_group" "gpu_training" {
  cluster_name    = aws_eks_cluster.ml.name
  node_group_name = "gpu-training"
  node_role_arn   = aws_iam_role.eks_nodes.arn
  subnet_ids      = var.private_subnet_ids

  instance_types = ["p4d.24xlarge"]  # A100 GPUs
  ami_type       = "AL2_x86_64_GPU"

  capacity_type = "ON_DEMAND"  # Training jobs need reliability

  scaling_config {
    desired_size = 0  # Scale to zero when not training
    min_size     = 0
    max_size     = 4
  }

  taint {
    key    = "nvidia.com/gpu"
    value  = "true"
    effect = "NO_SCHEDULE"
  }

  labels = {
    "workload-type" = "ml-training"
    "gpu-enabled"   = "true"
  }

  tags = merge(local.common_tags, {
    Name                                                   = "gpu-training-nodes"
    "k8s.io/cluster-autoscaler/enabled"                    = "true"
    "k8s.io/cluster-autoscaler/${aws_eks_cluster.ml.name}" = "owned"
  })
}

# GPU pod for training
# training-pod.yaml
"""
apiVersion: v1
kind: Pod
metadata:
  name: gpu-training-job
  namespace: ml-platform
spec:
  containers:
  - name: training
    image: ml-platform/training:latest
    resources:
      limits:
        nvidia.com/gpu: 4  # Request 4 GPUs
        memory: "256Gi"
        cpu: "64"
      requests:
        nvidia.com/gpu: 4
        memory: "128Gi"
        cpu: "32"
    env:
    - name: CUDA_VISIBLE_DEVICES
      value: "0,1,2,3"
    - name: MLFLOW_TRACKING_URI
      value: "http://mlflow.ml-platform.svc.cluster.local:5000"
  tolerations:
  - key: "nvidia.com/gpu"
    operator: "Equal"
    value: "true"
    effect: "NoSchedule"
  nodeSelector:
    gpu-enabled: "true"
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: workload-type
              operator: In
              values: ["ml-training"]
          topologyKey: kubernetes.io/hostname
"""

Complete Terraform for ML Infrastructure

# ml_infrastructure.tf: EKS + S3 + SageMaker for MLOps
terraform {
  required_version = ">= 1.5"
  required_providers {
    aws = { source = "hashicorp/aws", version = "~> 5.0" }
    kubernetes = { source = "hashicorp/kubernetes", version = "~> 2.23" }
    helm = { source = "hashicorp/helm", version = "~> 2.11" }
  }
}

# ─── S3 Buckets ────────────────────────────────────────────────────
resource "aws_s3_bucket" "ml_data_lake" {
  bucket = "${var.project_name}-ml-data-${data.aws_caller_identity.current.account_id}"

  tags = local.common_tags
}

resource "aws_s3_bucket_lifecycle_configuration" "ml_data" {
  bucket = aws_s3_bucket.ml_data_lake.id

  rule {
    id     = "archive-old-data"
    status = "Enabled"

    filter {} # empty filter: the rule applies to every object in the bucket

    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 365
      storage_class = "GLACIER"
    }
  }
}

resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.project_name}-ml-models-${data.aws_caller_identity.current.account_id}"

  tags = local.common_tags
}

resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id
  versioning_configuration {
    status = "Enabled"
  }
}

# ─── EKS Cluster ───────────────────────────────────────────────────
resource "aws_eks_cluster" "ml" {
  name     = "${var.project_name}-ml-platform"
  role_arn = aws_iam_role.eks_cluster.arn
  version  = var.kubernetes_version

  vpc_config {
    subnet_ids              = var.private_subnet_ids
    endpoint_private_access = true
    endpoint_public_access  = true
    public_access_cidrs     = var.admin_cidr_blocks
  }

  enabled_cluster_log_types = ["api", "audit", "authenticator"]

  tags = local.common_tags
}

# CPU node group for inference
resource "aws_eks_node_group" "cpu_inference" {
  cluster_name    = aws_eks_cluster.ml.name
  node_group_name = "cpu-inference"
  node_role_arn   = aws_iam_role.eks_nodes.arn
  subnet_ids      = var.private_subnet_ids

  instance_types = ["m6i.2xlarge", "m6a.2xlarge"] # keep one CPU architecture (x86_64) per node group; m6g is arm64
  capacity_type  = "SPOT"  # Cost-effective for inference

  scaling_config {
    desired_size = 3
    min_size     = 2
    max_size     = 20
  }

  labels = {
    "workload-type" = "ml-inference"
  }

  taint {
    key    = "dedicated"
    value  = "ml-inference"
    effect = "NO_SCHEDULE"
  }

  tags = merge(local.common_tags, {
    Name                                                   = "cpu-inference-nodes"
    "k8s.io/cluster-autoscaler/enabled"                    = "true"
    "k8s.io/cluster-autoscaler/${aws_eks_cluster.ml.name}" = "owned"
  })
}

# ─── SageMaker ─────────────────────────────────────────────────────
resource "aws_sagemaker_domain" "ml" {
  domain_name = "${var.project_name}-ml-workspace"
  auth_mode   = "IAM"
  vpc_id      = aws_vpc.ml.id
  subnet_ids  = slice(var.private_subnet_ids, 0, 2) # first two subnets; HCL has no [:2] slice syntax

  default_user_settings {
    execution_role  = aws_iam_role.sagemaker_execution.arn
    security_groups = [aws_security_group.sagemaker.id]

    sharing_settings {
      notebook_output_option = "Allowed"
      s3_output_path         = "s3://${aws_s3_bucket.ml_data_lake.id}/sagemaker-outputs/"
    }
  }
}

resource "aws_sagemaker_user_profile" "data_scientists" {
  for_each = toset(var.data_scientist_users)

  domain_id         = aws_sagemaker_domain.ml.id
  user_profile_name = each.value

  user_settings {
    execution_role = aws_iam_role.sagemaker_execution.arn
  }
}

# SageMaker Feature Store
resource "aws_sagemaker_feature_group" "transaction_features" {
  feature_group_name             = "transaction-features"
  record_identifier_feature_name = "transaction_id"
  event_time_feature_name        = "event_timestamp"
  role_arn                       = aws_iam_role.sagemaker_execution.arn

  feature_definition {
    feature_name = "transaction_id"
    feature_type = "String"
  }

  feature_definition {
    feature_name = "event_timestamp"
    feature_type = "String"
  }

  feature_definition {
    feature_name = "amount"
    feature_type = "Fractional"
  }

  feature_definition {
    feature_name = "fraud_probability"
    feature_type = "Fractional"
  }

  online_store_config {
    enable_online_store = true
  }

  offline_store_config {
    s3_storage_config {
      s3_uri = "s3://${aws_s3_bucket.ml_data_lake.id}/feature-store/"
    }
  }
}

# ─── Kubeflow via Helm ─────────────────────────────────────────────
resource "helm_release" "kubeflow_pipelines" {
  name       = "kubeflow-pipelines"
  # Verify this chart repository before use; upstream Kubeflow Pipelines ships Kustomize manifests
  repository = "https://googlecloudplatform.github.io/ai-platform/charts"
  chart      = "kubeflow-pipelines"
  namespace  = kubernetes_namespace.ml_platform.metadata[0].name
  version    = "2.0.0"

  set {
    name  = "mysql.persistence.enabled"
    value = "true"
  }

  set {
    name  = "minio.persistence.enabled"
    value = "true"
  }

  depends_on = [aws_eks_node_group.cpu_inference]
}

# ─── MLflow via Helm ───────────────────────────────────────────────
resource "helm_release" "mlflow" {
  name       = "mlflow"
  repository = "https://community-charts.github.io/helm-charts"
  chart      = "mlflow"
  namespace  = kubernetes_namespace.ml_platform.metadata[0].name
  version    = "0.7.0"

  set {
    name  = "backendStore.postgres.enabled"
    value = "true"
  }

  set {
    name  = "artifactRoot.s3.enabled"
    value = "true"
  }

  set {
    name  = "artifactRoot.s3.bucket"
    value = aws_s3_bucket.model_artifacts.id
  }

  set_sensitive {
    name  = "artifactRoot.s3.awsAccessKeyId"
    value = aws_iam_access_key.mlflow.id
  }

  set_sensitive {
    name  = "artifactRoot.s3.awsSecretAccessKey"
    value = aws_iam_access_key.mlflow.secret
  }
}

# ─── Kubernetes Resources ──────────────────────────────────────────
resource "kubernetes_namespace" "ml_platform" {
  metadata {
    name = "ml-platform"
    labels = {
      "istio-injection" = "enabled"
      "pod-security.kubernetes.io/enforce" = "restricted"
    }
  }

  depends_on = [aws_eks_cluster.ml]
}

resource "kubernetes_resource_quota" "ml_training" {
  metadata {
    name      = "training-quota"
    namespace = kubernetes_namespace.ml_platform.metadata[0].name
  }

  spec {
    hard = {
      "requests.cpu"            = "100"
      "requests.memory"         = "500Gi"
      "requests.nvidia.com/gpu" = "8" # quotas on extended resources accept only the requests. prefix
      "pods"                    = "50"
    }
  }
}

resource "kubernetes_network_policy" "ml_isolation" {
  metadata {
    name      = "ml-platform-isolation"
    namespace = kubernetes_namespace.ml_platform.metadata[0].name
  }

  spec {
    pod_selector {}

    policy_types = ["Ingress", "Egress"]

    ingress {
      from {
        namespace_selector {
          match_labels = {
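            # Label set automatically on every namespace; the namespace above defines no "name" label
            "kubernetes.io/metadata.name" = "ml-platform"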
          }
        }
      }
    }

    egress {
      to {
        namespace_selector {
          match_labels = {
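            # Label set automatically on every namespace; the namespace above defines no "name" label
            "kubernetes.io/metadata.name" = "ml-platform"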
          }
        }
      }
    }
  }
}

# ─── IAM Roles ─────────────────────────────────────────────────────
resource "aws_iam_role" "sagemaker_execution" {
  name = "${var.project_name}-sagemaker-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "sagemaker.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "sagemaker_s3" {
  name = "s3-access"
  role = aws_iam_role.sagemaker_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.ml_data_lake.arn,
          "${aws_s3_bucket.ml_data_lake.arn}/*",
          aws_s3_bucket.model_artifacts.arn,
          "${aws_s3_bucket.model_artifacts.arn}/*"
        ]
      }
    ]
  })
}

data "aws_caller_identity" "current" {}

locals {
  common_tags = {
    Project     = var.project_name
    Environment = var.environment
    ManagedBy   = "terraform"
    Team        = "ml-platform"
  }
}
Self-Healing Infrastructure Vision: This MLOps foundation enables the self-healing infrastructure agents I am currently building as a proof of concept (POC). The architecture works as follows: (1) the model monitoring pipeline continuously evaluates production model performance; (2) when drift or degradation is detected, it triggers the Kubeflow retraining pipeline; (3) the newly trained model is automatically evaluated against quality gates; (4) if it passes, the deployment pipeline performs a canary rollout via Istio traffic splitting; (5) A/B testing validates the new model before full promotion. This closes the loop from detection to remediation without human intervention, which is the core of autonomous operations.
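The five steps above can be sketched as a single control function. This is a minimal illustration under stated assumptions, not the POC itself: `LoopActions`, `remediation_cycle`, and every hook name are hypothetical, and each hook stands in for a real integration (the Kubeflow pipeline, the quality gates, the Istio rollout, the A/B controller).

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class LoopActions:
    """Hypothetical hooks; a real system would call the pipeline, registry, or mesh here."""
    trigger_retraining: Callable[[], str]        # returns a candidate model version
    passes_quality_gates: Callable[[str], bool]
    start_canary: Callable[[str], None]
    ab_test_passed: Callable[[str], bool]
    promote: Callable[[str], None]
    rollback: Callable[[str], None]


def remediation_cycle(report: Dict, actions: LoopActions) -> List[str]:
    """Run one pass of the detect -> retrain -> gate -> canary -> promote loop.

    `report` is assumed to look like the output of ModelMonitor.generate_report;
    only its 'drift_detected' key is read here.
    """
    events: List[str] = []
    if not report.get("drift_detected", False):
        events.append("healthy")
        return events

    events.append("drift_detected")
    candidate = actions.trigger_retraining()
    events.append(f"retrained:{candidate}")

    if not actions.passes_quality_gates(candidate):
        events.append("rejected_by_quality_gates")
        return events

    actions.start_canary(candidate)
    events.append("canary_started")

    if actions.ab_test_passed(candidate):
        actions.promote(candidate)
        events.append("promoted")
    else:
        actions.rollback(candidate)
        events.append("rolled_back")
    return events


# Usage with stub actions standing in for real integrations
stub = LoopActions(
    trigger_retraining=lambda: "v2",
    passes_quality_gates=lambda version: True,
    start_canary=lambda version: None,
    ab_test_passed=lambda version: True,
    promote=lambda version: None,
    rollback=lambda version: None,
)
print(remediation_cycle({"drift_detected": True}, stub))
```

Passing the actions in as callables keeps the control flow testable without a cluster, and the returned event list gives an audit trail for each automated decision.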
Multi-Agent Orchestration Note: My deep-dive research on multi-agent orchestration for DevOps automation explores extending this architecture beyond single pipelines. In a multi-agent system, specialized agents handle data validation, feature engineering, model selection, deployment, and monitoring. An orchestration agent coordinates them using shared state and goal-driven planning. This is the next evolution of MLOps: from pipeline automation to intelligent agent collaboration.
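As a rough illustration of coordination through shared state, the sketch below models each agent as a plain function over a dictionary and lets an orchestrator stop once a goal predicate holds. It is a deliberately simplified stand-in for goal-driven planning: the agents run in a fixed order, and every name here (`Agent`, `Orchestrator`, the two example agents, the state keys) is a hypothetical choice for this example.

```python
from typing import Callable, Dict, List

# An "agent" is modeled as a function that reads and updates shared state (an assumption).
Agent = Callable[[Dict], Dict]


def data_validation_agent(state: Dict) -> Dict:
    # Marks the data as valid when at least one row is available
    state["data_valid"] = state.get("row_count", 0) > 0
    return state


def deployment_agent(state: Dict) -> Dict:
    # Deploys only when an upstream agent marked the data as valid
    state["deployed"] = bool(state.get("data_valid"))
    return state


class Orchestrator:
    """Runs agents in order and stops early once the goal predicate is satisfied."""

    def __init__(self, agents: List[Agent], goal: Callable[[Dict], bool]):
        self.agents = agents
        self.goal = goal

    def run(self, state: Dict) -> Dict:
        for agent in self.agents:
            if self.goal(state):
                break  # goal reached, remaining agents are skipped
            state = agent(state)
        return state


orchestrator = Orchestrator(
    agents=[data_validation_agent, deployment_agent],
    goal=lambda s: s.get("deployed", False),
)
final_state = orchestrator.run({"row_count": 100})
print(final_state)
```

A real planner would choose which agent to invoke next based on the current state rather than following a fixed list; the fixed order is used here only to keep the example short.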