Intelligent Monitoring
Intelligent monitoring combines ML-based anomaly detection with traditional thresholds to reduce alert noise and surface actionable insights. By learning normal behavior patterns from historical data, intelligent systems detect deviations that static thresholds miss — while maintaining the predictability engineers need for operational response.
Static Thresholds vs Dynamic Baselines
| Dimension | Static Thresholds | Dynamic Baselines (ML) |
|---|---|---|
| Configuration | Manually set per metric (e.g., CPU > 80%) | Automatically learned from historical data |
| Seasonality | Misses context — alerts on expected peaks | Learns daily/weekly patterns automatically |
| Tuning Effort | Requires ongoing manual adjustment | Self-tuning; adapts to changing workloads |
| False Positive Rate | High — especially for bursty metrics | Low — understands normal variation |
| Novel Detection | Only catches pre-defined conditions | Discovers unknown anomaly patterns |
| Interpretability | Clear — "CPU was 85%, threshold 80%" | Requires explanation mechanisms |
| Cold Start | Immediate — set and go | Requires 1–4 weeks of training data |
| Operational Complexity | Simple — any monitoring tool supports it | Requires ML pipeline and model management |
Time Series Anomaly Detection Methods
Statistical Methods
Statistical methods are fast, interpretable, and require no training data. They are ideal as a first line of defense.
#!/usr/bin/env python3
"""
Statistical anomaly detection methods for time series metrics.
Fast, lightweight, and require no training.
"""
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
@dataclass
class AnomalyResult:
    """Result of anomaly detection on a single data point.

    Every detector in this module returns one of these per input sample,
    in input order.
    """
    timestamp: float  # caller-supplied timestamp, or the sample index when none was given
    value: float  # the raw sample value that was scored
    is_anomaly: bool  # detector-specific verdict (score above threshold, or outside the bounds)
    score: float # anomaly score (higher = more anomalous)
    method: str  # short detector identifier, e.g. "z_score", "iqr", "rolling_z_score", "mad"
    lower_bound: Optional[float] = None  # lower edge of the "normal" band, when the detector defines one
    upper_bound: Optional[float] = None  # upper edge of the "normal" band, when the detector defines one
class ZScoreDetector:
"""Z-score based anomaly detection."""
def __init__(self, threshold: float = 3.0):
self.threshold = threshold
def detect(self, values: np.ndarray, timestamps: np.ndarray = None) -> List[AnomalyResult]:
"""Detect anomalies using z-score."""
mean = np.mean(values)
std = np.std(values)
results = []
for i, val in enumerate(values):
z_score = abs((val - mean) / std) if std > 0 else 0
ts = timestamps[i] if timestamps is not None else i
results.append(AnomalyResult(
timestamp=ts,
value=val,
is_anomaly=z_score > self.threshold,
score=z_score,
method="z_score",
lower_bound=mean - self.threshold * std,
upper_bound=mean + self.threshold * std
))
return results
class IQRDetector:
"""Interquartile Range based anomaly detection. Robust to outliers."""
def __init__(self, k: float = 1.5):
self.k = k # IQR multiplier (1.5 = standard, 3.0 = extreme)
def detect(self, values: np.ndarray, timestamps: np.ndarray = None) -> List[AnomalyResult]:
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower = q1 - self.k * iqr
upper = q3 + self.k * iqr
results = []
for i, val in enumerate(values):
ts = timestamps[i] if timestamps is not None else i
is_anomaly = val < lower or val > upper
score = max(abs(val - q1), abs(val - q3)) / iqr if iqr > 0 else 0
results.append(AnomalyResult(
timestamp=ts,
value=val,
is_anomaly=is_anomaly,
score=score,
method="iqr",
lower_bound=lower,
upper_bound=upper
))
return results
class RollingZScoreDetector:
"""Rolling window z-score โ adapts to changing baselines."""
def __init__(self, window: int = 60, threshold: float = 3.0):
self.window = window
self.threshold = threshold
def detect(self, values: np.ndarray, timestamps: np.ndarray = None) -> List[AnomalyResult]:
results = []
for i in range(len(values)):
ts = timestamps[i] if timestamps is not None else i
if i < self.window:
# Not enough history โ use global stats
window_vals = values[:i + 1]
else:
window_vals = values[i - self.window:i]
mean = np.mean(window_vals)
std = np.std(window_vals)
val = values[i]
z_score = abs((val - mean) / std) if std > 0 else 0
results.append(AnomalyResult(
timestamp=ts,
value=val,
is_anomaly=z_score > self.threshold,
score=z_score,
method="rolling_z_score",
lower_bound=mean - self.threshold * std,
upper_bound=mean + self.threshold * std
))
return results
class MADDetector:
"""Median Absolute Deviation โ extremely robust to outliers."""
def __init__(self, threshold: float = 3.5):
self.threshold = threshold
def detect(self, values: np.ndarray, timestamps: np.ndarray = None) -> List[AnomalyResult]:
median = np.median(values)
mad = np.median(np.abs(values - median))
# Convert MAD to std equivalent
modified_z_scores = 0.6745 * (values - median) / mad if mad > 0 else np.zeros_like(values)
results = []
for i, val in enumerate(values):
ts = timestamps[i] if timestamps is not None else i
results.append(AnomalyResult(
timestamp=ts,
value=val,
is_anomaly=abs(modified_z_scores[i]) > self.threshold,
score=abs(modified_z_scores[i]),
method="mad",
lower_bound=median - self.threshold * mad / 0.6745,
upper_bound=median + self.threshold * mad / 0.6745
))
return results
# Example usage:
# data = np.random.normal(100, 10, 1000)
# data[500] = 200 # Inject anomaly
# detector = RollingZScoreDetector(window=60, threshold=3.0)
# results = detector.detect(data)
# anomalies = [r for r in results if r.is_anomaly]
# print(f"Detected {len(anomalies)} anomalies")
ML-Based Methods
#!/usr/bin/env python3
"""
ML-based anomaly detection for Prometheus metrics.
Isolation Forest implementation with time-series feature engineering.
"""
import numpy as np
import pickle
import os
from typing import List, Optional
from dataclasses import dataclass
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, RobustScaler
@dataclass
class MetricSample:
    """A single metric sample with features.

    NOTE(review): not referenced by the visible code; FeatureEngineer.transform
    returns a plain 2-D array instead of MetricSample objects.
    """
    timestamp: float  # presumably Unix seconds, like FeatureEngineer's timestamps — TODO confirm
    value: float  # raw metric value
    features: np.ndarray # engineered features
class FeatureEngineer:
    """Engineer time series features for ML models.

    For each sample the feature row is, in this order:
      * the raw value;
      * for every window size ``w``: mean, std, max, min, 25th and 75th
        percentile of the trailing window;
      * if ``include_diff``: the first difference and the relative change
        versus the previous sample;
      * if ``include_time_features`` and timestamps are given: hour, day of
        week, is-weekend flag and is-business-hours flag (09:00-17:59, Mon-Fri).

    The column order must stay stable, because fitted scalers and models
    depend on it.
    """

    def __init__(
        self,
        window_sizes: Optional[List[int]] = None,
        include_diff: bool = True,
        include_time_features: bool = True
    ):
        # Window sizes are counted in samples. The caller's list is copied,
        # so instances never share one list object; a list literal as the
        # default would be shared across every instance.
        self.window_sizes = list(window_sizes) if window_sizes is not None else [5, 15, 60]
        self.include_diff = include_diff
        self.include_time_features = include_time_features
        self.scaler = RobustScaler()
        self.is_fitted = False

    def transform(self, values: np.ndarray, timestamps: Optional[np.ndarray] = None, fit: bool = False) -> np.ndarray:
        """Transform raw values into feature vectors.

        Args:
            values: metric samples, oldest first.
            timestamps: optional Unix timestamps (seconds) aligned with
                ``values``. They are interpreted as UTC, so the hour and
                business-hours features are in UTC.
            fit: fit the robust scaler on this data before scaling it.

        Returns:
            Array of shape ``(len(values), n_features)``. It is scaled when
            ``fit`` is True or the scaler was fitted earlier, and unscaled
            otherwise. Empty input yields an empty ``(0, n_features)`` array.
        """
        time_index = None
        if self.include_time_features and timestamps is not None:
            import pandas as pd  # only needed for time features
            # Convert all timestamps in one call instead of once per row.
            time_index = pd.to_datetime(np.asarray(timestamps), unit='s')
            hours = time_index.hour
            days = time_index.dayofweek

        rows = []
        for i, current in enumerate(values):
            row = [current]
            # Rolling statistics for each window
            for w in self.window_sizes:
                # Trailing w samples before i. During warm-up (i < w) the
                # window is values[:i + 1], which includes the current sample.
                # This is kept unchanged so features stay compatible with
                # models trained earlier.
                window = values[i - w:i] if i >= w else values[:i + 1]
                row.extend([
                    np.mean(window),
                    np.std(window) if len(window) > 1 else 0,
                    np.max(window),
                    np.min(window),
                    np.percentile(window, 25),
                    np.percentile(window, 75)
                ])
            # Differences
            if self.include_diff:
                prev = values[i - 1] if i > 0 else None
                row.append(current - prev if i > 0 else 0)
                row.append((current - prev) / prev if i > 0 and prev != 0 else 0)
            # Time features (only when timestamps were provided)
            if time_index is not None:
                hour = int(hours[i])
                dow = int(days[i])
                row.extend([
                    hour,
                    dow,
                    int(dow >= 5),  # is_weekend
                    int(9 <= hour <= 17 and dow < 5)  # is_business_hours
                ])
            rows.append(row)

        if not rows:
            n_features = (1 + 6 * len(self.window_sizes)
                          + (2 if self.include_diff else 0)
                          + (4 if time_index is not None else 0))
            return np.empty((0, n_features))

        X = np.array(rows)
        # Scale features
        if fit:
            X = self.scaler.fit_transform(X)
            self.is_fitted = True
        elif self.is_fitted:
            X = self.scaler.transform(X)
        return X
class IsolationForestDetector:
"""Isolation Forest anomaly detector for time series metrics."""
def __init__(
self,
contamination: float = 0.01,
n_estimators: int = 100,
feature_engineer: Optional[FeatureEngineer] = None
):
self.contamination = contamination
self.n_estimators = n_estimators
self.model = None
self.feature_engineer = feature_engineer or FeatureEngineer()
def fit(self, values: np.ndarray, timestamps: Optional[np.ndarray] = None):
"""Train the model on historical data."""
X = self.feature_engineer.transform(values, timestamps, fit=True)
self.model = IsolationForest(
contamination=self.contamination,
n_estimators=self.n_estimators,
random_state=42,
n_jobs=-1
)
self.model.fit(X)
return self
def predict(self, values: np.ndarray, timestamps: Optional[np.ndarray] = None) -> np.ndarray:
"""Predict anomaly scores for new data."""
if self.model is None:
raise ValueError("Model not trained. Call fit() first.")
X = self.feature_engineer.transform(values, timestamps, fit=False)
# anomaly_score returns: -1 for anomaly, 1 for normal
# decision_function gives anomaly score (negative = anomaly)
scores = self.model.decision_function(X)
predictions = self.model.predict(X)
return predictions, scores
def save(self, path: str):
"""Save model to disk."""
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'wb') as f:
pickle.dump({
'model': self.model,
'feature_engineer': self.feature_engineer,
'contamination': self.contamination
}, f)
@classmethod
def load(cls, path: str):
"""Load model from disk."""
with open(path, 'rb') as f:
data = pickle.load(f)
instance = cls(contamination=data['contamination'])
instance.model = data['model']
instance.feature_engineer = data['feature_engineer']
return instance
# Example usage:
# detector = IsolationForestDetector(contamination=0.01)
# detector.fit(historical_values, historical_timestamps)
# predictions, scores = detector.predict(new_values)
# anomalies = np.where(predictions == -1)[0]
# print(f"Detected {len(anomalies)} anomalies")
Complete Python Example: Prometheus + ML Anomaly Detection
This Flask API service fetches metrics from Prometheus, scores them using Isolation Forest, and exposes anomaly alerts as Prometheus metrics.
#!/usr/bin/env python3
"""
prometheus_anomaly_detector.py — Flask API for ML-based anomaly detection
on Prometheus metrics.
Endpoints:
POST /train — Train model on historical Prometheus data
GET /predict — Get anomaly predictions for recent data
GET /metrics — Prometheus-compatible metrics with anomaly scores
GET /health — Liveness check reporting the number of loaded models
"""
import os
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
import requests
from flask import Flask, jsonify, request
from prometheus_client import Gauge, generate_latest, CONTENT_TYPE_LATEST
from ml_detector import IsolationForestDetector, FeatureEngineer, AnomalyResult
# Process-wide logging at INFO so training/prediction progress is visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Flask application object; the routes below are registered on it.
app = Flask(__name__)
# Prometheus configuration
# Both settings can be overridden through environment variables.
PROMETHEUS_URL = os.environ.get("PROMETHEUS_URL", "http://localhost:9090")
# Directory holding pickled models, one "<model_name>.pkl" file per model.
MODEL_DIR = os.environ.get("MODEL_DIR", "/tmp/anomaly_models")
# In-memory model cache
# Keyed by model name; filled by /train and lazily by /predict. It is a plain
# dict without locking — NOTE(review): confirm requests are not served
# concurrently, or guard mutations with a lock.
_models: Dict[str, IsolationForestDetector] = {}
# Prometheus metrics for anomaly scores
# Labelled per scored series and set by /predict; exported via /metrics.
ANOMALY_SCORE = Gauge(
    'ml_anomaly_score',
    'ML anomaly score for metric',
    ['metric_name', 'instance', 'job']
)
# 1/0 anomaly flag, set by /predict alongside ANOMALY_SCORE.
ANOMALY_FLAG = Gauge(
    'ml_anomaly_flag',
    '1 if anomaly detected, 0 otherwise',
    ['metric_name', 'instance', 'job']
)
# Set to the current time by /train after a model is fitted and saved.
TRAINING_TIMESTAMP = Gauge(
    'ml_model_last_training_timestamp',
    'Unix timestamp of last model training',
    ['metric_name']
)
def query_prometheus(
    promql: str,
    start: datetime,
    end: datetime,
    step: str = "1m"
) -> List[dict]:
    """Run a Prometheus range query (``/api/v1/query_range``).

    Args:
        promql: the PromQL expression.
        start, end: query window; sent as Unix timestamps.
        step: resolution step, e.g. "1m" or "5m".

    Returns:
        The ``data.result`` list: one entry per matching series.

    Raises:
        requests.HTTPError: on a non-2xx HTTP response.
        ValueError: when Prometheus reports a non-success status.
    """
    endpoint = f"{PROMETHEUS_URL}/api/v1/query_range"
    query_params = {
        "query": promql,
        "start": start.timestamp(),
        "end": end.timestamp(),
        "step": step
    }
    response = requests.get(endpoint, params=query_params, timeout=30)
    response.raise_for_status()
    payload = response.json()
    if payload.get("status") == "success":
        return payload["data"]["result"]
    raise ValueError(f"Prometheus query failed: {payload.get('error')}")
def parse_prometheus_samples(result: dict) -> tuple:
    """Parse one Prometheus range-query series into numpy arrays.

    Args:
        result: one element of ``data.result`` from a ``query_range``
            response: a dict with a ``metric`` label map and a ``values``
            list of ``[unix_ts, "value"]`` pairs.

    Returns:
        ``(timestamps, values, metric_labels)``. Prometheus encodes
        non-numeric results as "NaN", "+Inf" or "-Inf" (for example after a
        division by zero). Such samples are dropped from both arrays because
        IsolationForest rejects non-finite input.
    """
    metric_meta = result.get("metric", {})
    raw = result.get("values", [])
    timestamps = np.array([float(ts) for ts, _ in raw], dtype=float)
    vals = np.array([float(v) for _, v in raw], dtype=float)
    finite = np.isfinite(vals)
    return timestamps[finite], vals[finite], metric_meta
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: report status and the number of models cached in memory."""
    payload = {"status": "healthy", "models_loaded": len(_models)}
    return jsonify(payload)
@app.route("/train", methods=["POST"])
def train():
    """Train anomaly detection model on Prometheus historical data.

    JSON body fields:
        promql (str, required): PromQL range query to learn from.
        model_name (str): cache key and file stem under MODEL_DIR; defaults
            to ``promql`` with spaces replaced by underscores.
        training_days (number > 0): history window in days, default 14.
        contamination (float): expected anomaly fraction, default 0.01.

    Returns:
        200 with training metadata; 400 for invalid input or too little
        data; 404 when Prometheus returns no series; 500 otherwise.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    promql = body.get("promql")
    # Validate before deriving anything from promql. Previously a missing
    # promql crashed on None.replace() before this check could run.
    if not promql or not isinstance(promql, str):
        return jsonify({"error": "promql is required"}), 400
    model_name = body.get("model_name", promql.replace(" ", "_"))
    training_days = body.get("training_days", 14)
    contamination = body.get("contamination", 0.01)
    try:
        days = float(training_days)
    except (TypeError, ValueError):
        return jsonify({"error": "training_days must be a number"}), 400
    if not days > 0:  # also rejects NaN
        return jsonify({"error": "training_days must be positive"}), 400
    model_path = f"{MODEL_DIR}/{model_name}.pkl"
    # model_name (or the promql it defaults to) is client-controlled. Refuse
    # names that resolve outside MODEL_DIR, such as "../../etc/x".
    model_root = os.path.realpath(MODEL_DIR)
    if not os.path.realpath(model_path).startswith(model_root + os.sep):
        return jsonify({"error": f"Invalid model_name: {model_name}"}), 400
    try:
        end = datetime.now()
        start = end - timedelta(days=days)
        logger.info(f"Fetching training data for query: {promql}")
        results = query_prometheus(promql, start, end, step="5m")
        if not results:
            return jsonify({"error": "No data returned from Prometheus"}), 404
        # NOTE(review): every returned series is appended into one sequence,
        # so rolling-window and difference features straddle series
        # boundaries. Consider a query that selects a single series, or
        # training one model per series.
        all_timestamps = []
        all_values = []
        for result in results:
            ts, vals, meta = parse_prometheus_samples(result)
            all_timestamps.extend(ts)
            all_values.extend(vals)
        if len(all_values) < 100:
            return jsonify({
                "error": f"Insufficient data: {len(all_values)} samples (need 100+)"
            }), 400
        logger.info(f"Training on {len(all_values)} samples")
        detector = IsolationForestDetector(contamination=contamination)
        detector.fit(
            np.array(all_values),
            np.array(all_timestamps)
        )
        # Save model
        detector.save(model_path)
        _models[model_name] = detector
        TRAINING_TIMESTAMP.labels(metric_name=model_name).set_to_current_time()
        return jsonify({
            "status": "trained",
            "model_name": model_name,
            "samples": len(all_values),
            "training_period_days": training_days,
            "model_path": model_path
        })
    except Exception as e:
        logger.exception("Training failed")
        return jsonify({"error": str(e)}), 500
@app.route("/predict", methods=["GET"])
def predict():
    """Get anomaly predictions for a metric from a previously trained model.

    Query parameters:
        promql (required): PromQL range query to score.
        model_name: defaults to ``promql`` with spaces replaced by underscores.
        lookback_minutes (positive int): how much recent data to fetch, default 60.

    Also sets the ml_anomaly_score / ml_anomaly_flag gauges to the newest
    scored sample of each series.
    """
    # This must equal the step /train queries with ("5m"). FeatureEngineer
    # windows and differences are counted in samples, so scoring 1m data with
    # a model trained on 5m data silently changes what every feature measures.
    # At this step the default 60-minute lookback yields about 13 samples.
    step = "5m"
    promql = request.args.get("promql")
    model_name = request.args.get("model_name", promql.replace(" ", "_") if promql else None)
    if not model_name or not promql:
        return jsonify({"error": "promql and model_name are required"}), 400
    try:
        lookback_minutes = int(request.args.get("lookback_minutes", "60"))
    except ValueError:
        return jsonify({"error": "lookback_minutes must be an integer"}), 400
    if lookback_minutes <= 0:
        return jsonify({"error": "lookback_minutes must be positive"}), 400
    model_path = f"{MODEL_DIR}/{model_name}.pkl"
    # model_name is client-controlled and the file is unpickled, which can
    # execute arbitrary code, so never read outside MODEL_DIR.
    model_root = os.path.realpath(MODEL_DIR)
    if not os.path.realpath(model_path).startswith(model_root + os.sep):
        return jsonify({"error": f"Invalid model_name: {model_name}"}), 400
    try:
        # Load model if not in memory
        if model_name not in _models:
            if not os.path.exists(model_path):
                return jsonify({"error": f"Model not found: {model_name}. Train first."}), 404
            _models[model_name] = IsolationForestDetector.load(model_path)
        detector = _models[model_name]
        # Fetch recent data
        end = datetime.now()
        start = end - timedelta(minutes=lookback_minutes)
        results = query_prometheus(promql, start, end, step=step)
        metric_key = promql[:50]  # Truncate for label safety
        predictions = []
        for result in results:
            ts, vals, meta = parse_prometheus_samples(result)
            if len(vals) < 10:
                continue
            pred_labels, scores = detector.predict(vals, ts)
            for t, v, label, score in zip(ts, vals, pred_labels, scores):
                predictions.append({
                    "timestamp": datetime.fromtimestamp(t).isoformat(),
                    "value": float(v),
                    "is_anomaly": int(label) == -1,
                    "anomaly_score": float(score),
                    "metric": meta
                })
            # A gauge only keeps its latest value, so publish the newest
            # sample once per series instead of overwriting it per sample.
            instance = meta.get("instance", "unknown")
            job = meta.get("job", "unknown")
            ANOMALY_SCORE.labels(
                metric_name=metric_key,
                instance=instance,
                job=job
            ).set(scores[-1])
            ANOMALY_FLAG.labels(
                metric_name=metric_key,
                instance=instance,
                job=job
            ).set(1 if pred_labels[-1] == -1 else 0)
        anomalies = [p for p in predictions if p["is_anomaly"]]
        return jsonify({
            "model_name": model_name,
            "total_samples": len(predictions),
            "anomalies_detected": len(anomalies),
            "anomaly_rate": len(anomalies) / max(len(predictions), 1),
            "predictions": predictions[-100:],  # Return last 100
            "anomalies": anomalies[-20:]  # Return last 20 anomalies
        })
    except Exception as e:
        logger.exception("Prediction failed")
        return jsonify({"error": str(e)}), 500
@app.route("/metrics")
def metrics():
    """Expose this process's default Prometheus registry in text exposition format."""
    body = generate_latest()
    headers = {'Content-Type': CONTENT_TYPE_LATEST}
    return body, 200, headers
if __name__ == "__main__":
    # Create the model directory up front so /train can write into it.
    os.makedirs(MODEL_DIR, exist_ok=True)
    # Built-in Flask server on all interfaces, port 5000, debugger off.
    server_options = {"host": "0.0.0.0", "port": 5000, "debug": False}
    app.run(**server_options)
# prom-ml-anomaly.service — Systemd service file
[Unit]
Description=Prometheus ML Anomaly Detector
# The detector queries Prometheus over the network, so wait until the
# network is actually online rather than merely configured.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=prometheus
Group=prometheus
Environment="PROMETHEUS_URL=http://prometheus:9090"
Environment="MODEL_DIR=/var/lib/anomaly_models"
# Creates /var/lib/anomaly_models owned by User/Group so /train can write models.
# (The former OPENAI_API_KEY line was dropped: systemd does not expand
# ${VAR} from the caller's shell in Environment=, and the service never uses it.)
StateDirectory=anomaly_models
WorkingDirectory=/opt/prometheus-ml-detector
ExecStart=/opt/prometheus-ml-detector/venv/bin/python prometheus_anomaly_detector.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
Predictive Alerting
Predictive alerting forecasts failures before they occur, enabling proactive remediation:
#!/usr/bin/env python3
"""
predictive_alerting.py — Forecast metric values and alert on predicted thresholds.
Uses Prophet for time series forecasting, falling back to a linear trend when Prophet is not installed.
"""
from datetime import datetime, timedelta
from typing import Optional

import numpy as np
import pandas as pd
try:
from prophet import Prophet
PROPHET_AVAILABLE = True
except ImportError:
PROPHET_AVAILABLE = False
class PredictiveAlerter:
    """Forecast metrics and alert on predicted threshold breaches.

    Uses Prophet when it is installed and a linear-trend fallback otherwise.
    Both paths return the same dict shape: ``timestamps`` (naive UTC time
    strings), ``predicted_values``, ``lower_bound``, ``upper_bound`` and
    ``method``, with one entry per forecast hour.
    """

    def __init__(self, forecast_hours: int = 4):
        # Forecast horizon in hours; one forecast point per hour.
        self.forecast_hours = forecast_hours

    def forecast(self, timestamps: np.ndarray, values: np.ndarray) -> dict:
        """Forecast the next ``forecast_hours`` hourly values.

        Args:
            timestamps: Unix timestamps in seconds, one per sample.
            values: metric values aligned with ``timestamps``.
        """
        if not PROPHET_AVAILABLE:
            # Fallback: simple linear extrapolation
            return self._linear_forecast(timestamps, values)
        df = pd.DataFrame({
            'ds': pd.to_datetime(timestamps, unit='s'),  # naive UTC
            'y': values
        })
        model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            yearly_seasonality=False,
            changepoint_prior_scale=0.05,
            interval_width=0.8
        )
        model.fit(df)
        future = model.make_future_dataframe(
            periods=self.forecast_hours,
            freq='H'
        )
        forecast = model.predict(future)
        # Get predictions for future periods only
        future_forecast = forecast[forecast['ds'] > df['ds'].max()]
        return {
            'timestamps': future_forecast['ds'].dt.tz_localize(None).astype(str).tolist(),
            'predicted_values': future_forecast['yhat'].tolist(),
            'lower_bound': future_forecast['yhat_lower'].tolist(),
            'upper_bound': future_forecast['yhat_upper'].tolist(),
            'method': 'prophet'
        }

    def _linear_forecast(self, timestamps: np.ndarray, values: np.ndarray) -> dict:
        """Linear-trend forecast fallback.

        Fits a straight line to the last 100 samples against their actual
        timestamps, so the result is in units per second. The line is then
        evaluated 1..forecast_hours hours after the last sample. Fitting
        against sample indices would extrapolate only ``forecast_hours``
        *samples*, not hours. Bounds are +/- 2 standard deviations of the
        fit residuals. Timestamps are naive UTC, matching the Prophet path.
        """
        from datetime import timezone

        ts = np.asarray(timestamps, dtype=float)[-100:]
        ys = np.asarray(values, dtype=float)[-100:]
        last_ts = float(ts[-1])
        offsets = np.arange(1, self.forecast_hours + 1) * 3600.0
        if len(ys) >= 2 and np.ptp(ts) > 0:
            # Fit relative to the last timestamp: raw epoch seconds (~1e9)
            # make the least-squares problem badly conditioned.
            rel = ts - last_ts
            trend = np.poly1d(np.polyfit(rel, ys, 1))
            predicted = trend(offsets)
            std_dev = float(np.std(ys - trend(rel)))
        else:
            # Too few distinct points for a trend: carry the last value forward.
            predicted = np.full(self.forecast_hours, ys[-1])
            std_dev = 0.0
        future_timestamps = [
            datetime.fromtimestamp(last_ts + float(off), tz=timezone.utc).replace(tzinfo=None)
            for off in offsets
        ]
        return {
            'timestamps': [t.isoformat() for t in future_timestamps],
            'predicted_values': predicted.tolist(),
            'lower_bound': (predicted - 2 * std_dev).tolist(),
            'upper_bound': (predicted + 2 * std_dev).tolist(),
            'method': 'linear_trend'
        }

    def check_threshold(
        self,
        forecast: dict,
        upper_threshold: Optional[float] = None,
        lower_threshold: Optional[float] = None
    ) -> dict:
        """Check if forecasted values will breach thresholds.

        An alert is raised for every forecast point whose confidence band
        crosses a threshold. ``confidence`` is 'high' when the point forecast
        itself crosses it, and 'medium' when only the band does. Thresholds
        are compared with ``is not None``, so 0 is a valid threshold.

        Returns:
            dict with ``will_breach``, ``breach_time`` (the earliest forecast
            timestamp that has any alert, or None), ``alerts`` (upper-threshold
            alerts first, then lower) and the input ``forecast``.
        """
        predictions = forecast['predicted_values']
        upper_bounds = forecast['upper_bound']
        lower_bounds = forecast['lower_bound']
        flagged = []  # (forecast index, alert), used to find the earliest breach
        if upper_threshold is not None:
            for i, (pred, upper) in enumerate(zip(predictions, upper_bounds)):
                if upper > upper_threshold:
                    flagged.append((i, {
                        'type': 'upper_threshold_predicted',
                        'threshold': upper_threshold,
                        'predicted_value': pred,
                        'predicted_at': forecast['timestamps'][i],
                        'confidence': 'high' if pred > upper_threshold else 'medium'
                    }))
        if lower_threshold is not None:
            for i, (pred, lower) in enumerate(zip(predictions, lower_bounds)):
                if lower < lower_threshold:
                    flagged.append((i, {
                        'type': 'lower_threshold_predicted',
                        'threshold': lower_threshold,
                        'predicted_value': pred,
                        'predicted_at': forecast['timestamps'][i],
                        'confidence': 'high' if pred < lower_threshold else 'medium'
                    }))
        alerts = [alert for _, alert in flagged]
        earliest = min(flagged, key=lambda item: item[0])[1] if flagged else None
        return {
            'will_breach': len(alerts) > 0,
            'breach_time': earliest['predicted_at'] if earliest else None,
            'alerts': alerts,
            'forecast': forecast
        }
# Example: Predict disk full before it happens
# alerter = PredictiveAlerter(forecast_hours=6)
# forecast = alerter.forecast(disk_timestamps, disk_usage_percent)
# alert = alerter.check_threshold(forecast, upper_threshold=85)
# if alert['will_breach']:
# print(f"WARNING: Disk predicted to reach 85% at {alert['breach_time']}")
Alert Correlation and Deduplication
#!/usr/bin/env python3
"""
Alert correlator that groups related alerts using
service dependency graph and temporal proximity.
"""
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set
@dataclass
class Alert:
    """One firing alert, as consumed by AlertCorrelator."""
    id: str  # unique key; AlertCorrelator stores active alerts by this id
    metric_name: str  # AlertCorrelator treats the text before the first "_" as a relatedness prefix
    service: str  # service name, matched against AlertCorrelator's dependency map
    severity: str # critical, warning, info
    message: str  # human-readable text; not read by AlertCorrelator
    timestamp: float  # must use the same unit as AlertCorrelator's time_window_seconds (seconds)
    tags: Dict[str, str] = field(default_factory=dict)  # free-form labels; 'instance' is used for correlation
class AlertCorrelator:
    """Correlate alerts using service topology and temporal proximity.

    Two alerts are related when they fire within ``time_window_seconds`` of
    each other and share a service, a direct dependency edge, an
    ``instance`` tag, or a metric-name prefix.
    """

    # Ordering used to pick an incident's overall severity; unknown labels rank lowest.
    _SEVERITY_RANK = {'info': 0, 'warning': 1, 'critical': 2}

    def __init__(
        self,
        time_window_seconds: float = 300,  # 5 minutes
        service_dependencies: Dict[str, List[str]] = None
    ):
        self.time_window = time_window_seconds
        # service -> services it depends on
        self.dependencies = service_dependencies or {}
        self.active_alerts: Dict[str, "Alert"] = {}
        self.incidents: List[Dict] = []
        # Disambiguates incident ids created within the same second.
        self._incident_seq = 0

    def add_service_dependency(self, service: str, dependencies: List[str]):
        """Add service dependency mapping (replaces any previous entry for ``service``)."""
        self.dependencies[service] = dependencies

    def _are_related(self, alert1: "Alert", alert2: "Alert") -> bool:
        """Determine if two alerts are related."""
        # Time proximity
        if abs(alert1.timestamp - alert2.timestamp) > self.time_window:
            return False
        # Same service
        if alert1.service == alert2.service:
            return True
        # Service dependency (either direction, direct edges only)
        if (alert2.service in self.dependencies.get(alert1.service, [])
                or alert1.service in self.dependencies.get(alert2.service, [])):
            return True
        # Same host/instance. Only counts when the tag is present; otherwise
        # two untagged alerts would match on None == None.
        instance = alert1.tags.get('instance')
        if instance is not None and instance == alert2.tags.get('instance'):
            return True
        # Similar metric names (same non-empty prefix)
        prefix = alert1.metric_name.split('_')[0]
        if prefix and prefix == alert2.metric_name.split('_')[0]:
            return True
        return False

    def _prune(self, now: float) -> None:
        """Forget alerts too old to be related to an alert stamped ``now``.

        Without this, ``active_alerts`` grows for the lifetime of the process.
        Alerts older than ``now - time_window`` cannot pass the time-proximity
        check, so this does not change results for in-order input.
        """
        cutoff = now - self.time_window
        stale = [alert_id for alert_id, alert in self.active_alerts.items()
                 if alert.timestamp < cutoff]
        for alert_id in stale:
            del self.active_alerts[alert_id]

    def _severity_rank(self, alert: "Alert") -> int:
        return self._SEVERITY_RANK.get(alert.severity, -1)

    def correlate(self, new_alert: "Alert") -> Dict:
        """Ingest a new alert and return correlation results.

        Returns:
            ``{"action": "correlate", "incident": {...}, "message": ...}``
            when related active alerts exist, otherwise
            ``{"action": "new_alert", "alert": new_alert, "message": ...}``.
        """
        self._prune(new_alert.timestamp)
        self.active_alerts[new_alert.id] = new_alert
        # Find related alerts
        related = [new_alert] + [
            alert for alert in self.active_alerts.values()
            if alert.id != new_alert.id and self._are_related(new_alert, alert)
        ]
        if len(related) > 1:
            self._incident_seq += 1
            incident = {
                "id": f"incident-{int(time.time())}-{self._incident_seq}",
                "alerts": [a.id for a in related],
                # dict.fromkeys keeps first-seen order (set order is not deterministic)
                "services": list(dict.fromkeys(a.service for a in related)),
                "primary_service": self._find_root_cause(related),
                "severity": max(related, key=self._severity_rank).severity,
                "start_time": min(a.timestamp for a in related),
                "alert_count": len(related)
            }
            return {
                "action": "correlate",
                "incident": incident,
                "message": f"Correlated {len(related)} alerts into incident {incident['id']}"
            }
        return {
            "action": "new_alert",
            "alert": new_alert,
            "message": "New uncorrelated alert"
        }

    def _find_root_cause(self, alerts: List["Alert"]) -> str:
        """Heuristic to find the likely root-cause service.

        First matching service whose name contains an infrastructure keyword
        (database, cache, lb, network, storage); otherwise the service with
        the most alerts in the group (first seen wins ties).
        """
        service_counts = defaultdict(int)
        for alert in alerts:
            service_counts[alert.service] += 1
        # Prefer infrastructure services (databases, load balancers)
        infra_services = ['database', 'cache', 'lb', 'network', 'storage']
        for svc in service_counts:
            if any(infra in svc.lower() for infra in infra_services):
                return svc
        return max(service_counts, key=service_counts.get)
# Example topology
# correlator = AlertCorrelator(time_window_seconds=300)
# correlator.add_service_dependency("web-api", ["database", "cache", "auth-service"])
# correlator.add_service_dependency("payment-service", ["database", "fraud-detection"])
# correlator.add_service_dependency("auth-service", ["database"])
Datadog Anomaly Detection Monitors
Datadog provides built-in anomaly detection that requires no model management:
# datadog_anomaly_monitor.tf — Terraform for Datadog anomaly monitors
resource "datadog_monitor" "api_latency_anomaly" {
  name    = "API Latency Anomaly — ${var.service_name}"
  type    = "metric alert"
  # The query is not grouped by service, so {{service.name}} would render
  # empty; the Terraform variable is interpolated instead.
  message = <<-EOT
    {{#is_alert}}
    API latency for ${var.service_name} is anomalous — significantly higher than expected.
    Current average latency: {{value}}
    Expected range: {{expected_bounds}}
    Runbook: ${var.runbook_url}
    @slack-sre-alerts
    {{/is_alert}}
    {{#is_recovery}}
    API latency has returned to normal.
    {{/is_recovery}}
  EOT
  # anomalies() compares each point with a band predicted from history.
  # The 'basic' algorithm uses a lagging rolling quantile and ignores
  # seasonality; switch to 'agile' or 'robust' for strongly daily/weekly
  # metrics. direction='above' fires only on increases, matching the message.
  # alert_window must equal monitor_threshold_windows.trigger_window.
  # There is no .as_rate(): request duration is a latency, not a counter.
  query = <<-EOT
    avg(last_4h):anomalies(
      avg:trace.http.request.duration{service:${var.service_name},env:production}.rollup(avg, 60),
      'basic',
      2,
      direction='above',
      alert_window='last_15m',
      interval=60
    ) >= 1
  EOT
  # Thresholds are the fraction of the alert window that is anomalous.
  monitor_thresholds {
    critical          = 1.0
    critical_recovery = 0
    warning           = 0.5
  }
  monitor_threshold_windows {
    trigger_window  = "last_15m"
    recovery_window = "last_15m"
  }
  notify_audit        = false
  require_full_window = false
  notify_no_data      = false
  renotify_interval   = 60
  tags = concat(var.common_tags, [
    "team:${var.team}",
    "service:${var.service_name}",
    "alert_type:anomaly"
  ])
}
# Per-host outlier detection on request throughput. This fires when one host's
# request rate diverges from its peers, e.g. a host dropping out of the load
# balancer. It does not catch a service-wide drop, where all hosts move together.
resource "datadog_monitor" "throughput_drop_anomaly" {
  name    = "Request Rate Drop — ${var.service_name}"
  type    = "metric alert"
  # The query is grouped by host, so {{host.name}} identifies the outlier.
  message = <<-EOT
    {{#is_alert}}
    Request rate on {{host.name}} for ${var.service_name} deviates from its peers — potential host-level degradation.
    Current value: {{value}}
    Check: Is the host healthy? Is it still receiving traffic from upstream callers?
    @slack-sre-alerts
    {{/is_alert}}
  EOT
  # outliers() flags group members (hosts) that deviate from the rest of the group.
  query = <<-EOT
    avg(last_1h):outliers(
      avg:trace.http.request.hits{service:${var.service_name}} by {host},
      'DBSCAN',
      3
    ) >= 1
  EOT
  monitor_thresholds {
    critical = 1.0
  }
  tags = concat(var.common_tags, [
    "team:${var.team}",
    "service:${var.service_name}",
    "alert_type:outlier"
  ])
}
# Forecast-based monitor — alert when the forecast shows a problem
resource "datadog_monitor" "disk_forecast" {
  name    = "Disk Space Forecast — Critical"
  type    = "metric alert"
  # For forecast monitors {{value}} is the forecast value, not the current usage.
  message = <<-EOT
    {{#is_alert}}
    Disk usage on {{host.name}} is forecasted to exceed 90% within 24 hours.
    Forecast peak: {{value}}%
    @pagerduty-SRE-oncall
    {{/is_alert}}
  EOT
  # Forecast monitors evaluate a future window: max(next_1d) asks whether the
  # forecast reaches the threshold within the next 24 hours (last_1d would
  # look backwards). With the 'linear' algorithm, model must be one of
  # 'default', 'simple' or 'reactive'.
  query = <<-EOT
    max(next_1d):forecast(
      avg:system.disk.used{env:production} by {host} / avg:system.disk.total{env:production} by {host} * 100,
      'linear',
      1,
      interval='60m',
      history='1d',
      model='default'
    ) >= 90
  EOT
  monitor_thresholds {
    critical = 90
    warning  = 80
  }
  monitor_threshold_windows {
    trigger_window  = "last_1h"
    recovery_window = "last_1h"
  }
  tags = concat(var.common_tags, ["alert_type:forecast"])
}
Dynamic Threshold Configuration
Configure dynamic thresholds based on time-of-day and day-of-week patterns:
#!/usr/bin/env python3
"""
dynamic_thresholds.py — Generate time-aware thresholds based on
historical metric patterns.
"""
import numpy as np
import pandas as pd
from datetime import datetime
class DynamicThresholdGenerator:
    """Generate context-aware thresholds from hour-of-week historical baselines.

    Lower thresholds are clamped at 0, so this assumes a non-negative metric.
    """

    def __init__(self, historical_data: pd.DataFrame):
        """
        Args:
            historical_data: DataFrame with columns [timestamp, value]
        """
        self.df = historical_data.copy()
        self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])
        self.df['hour'] = self.df['timestamp'].dt.hour
        self.df['dayofweek'] = self.df['timestamp'].dt.dayofweek
        self.baseline = self._compute_baseline()

    def _compute_baseline(self) -> pd.DataFrame:
        """Compute mean/std/median/count of ``value`` per (dayofweek, hour).

        Only combinations present in the history get a row; absent ones are
        handled by get_threshold's global fallback. Groups with a single
        sample have an undefined (NaN) sample std, which is replaced by the
        global std.
        """
        baseline = self.df.groupby(['dayofweek', 'hour']).agg({
            'value': ['mean', 'std', 'median', 'count']
        }).reset_index()
        baseline.columns = ['dayofweek', 'hour', 'mean', 'std', 'median', 'count']
        global_mean = self.df['value'].mean()
        global_std = self.df['value'].std()
        baseline['mean'] = baseline['mean'].fillna(global_mean)
        baseline['std'] = baseline['std'].fillna(global_std)
        return baseline

    def get_threshold(
        self,
        timestamp: datetime,
        sigma_multiplier: float = 3.0
    ) -> dict:
        """Get the dynamic threshold for a specific timestamp.

        Accepts ``datetime.datetime`` or ``pandas.Timestamp``. The lookup uses
        ``weekday()`` (Monday == 0, the same convention as pandas'
        ``dayofweek``); ``.dayofweek`` itself does not exist on a plain
        ``datetime``.

        Returns:
            dict with ``upper``, ``lower`` (clamped at 0), ``expected`` and
            ``method`` ('hourly_baseline', or 'global_fallback' when the
            hour/day slot has no history).
        """
        dow = timestamp.weekday()
        hour = timestamp.hour
        row = self.baseline[
            (self.baseline['dayofweek'] == dow) &
            (self.baseline['hour'] == hour)
        ]
        if row.empty:
            mean_val = self.df['value'].mean()
            std_val = self.df['value'].std()
            method = 'global_fallback'
        else:
            mean_val = row['mean'].values[0]
            std_val = row['std'].values[0]
            method = 'hourly_baseline'
        return {
            'upper': mean_val + sigma_multiplier * std_val,
            # Clamped in both branches so the fallback matches the hourly path.
            'lower': max(0, mean_val - sigma_multiplier * std_val),
            'expected': mean_val,
            'method': method
        }

    def evaluate(self, timestamp: datetime, value: float, sigma: float = 3.0) -> dict:
        """Evaluate a value against the dynamic threshold for ``timestamp``.

        ``deviation_pct`` is the absolute deviation from the expected value as
        a percentage of it (0 when the expected value is not positive).
        """
        threshold = self.get_threshold(timestamp, sigma)
        is_anomaly = value > threshold['upper'] or value < threshold['lower']
        deviation = abs(value - threshold['expected']) / threshold['expected'] if threshold['expected'] > 0 else 0
        return {
            'timestamp': timestamp.isoformat(),
            'value': value,
            'is_anomaly': is_anomaly,
            'upper_threshold': threshold['upper'],
            'lower_threshold': threshold['lower'],
            'expected_value': threshold['expected'],
            'deviation_pct': deviation * 100,
            'method': threshold['method']
        }
# Example:
# df = pd.DataFrame({'timestamp': timestamps, 'value': values})
# dt = DynamicThresholdGenerator(df)
# result = dt.evaluate(datetime.now(), current_value, sigma=2.5)
# if result['is_anomaly']:
# print(f"Anomaly: {result['value']} (expected: {result['expected_value']:.1f})")
Reducing MTTD with ML-Based Detection
| Detection Method | MTTD | False Positive Rate | Coverage |
|---|---|---|---|
| Static thresholds | 15-60 min | 30-50% | Known patterns only |
| Simple statistical (z-score) | 5-15 min | 15-25% | Univariate anomalies |
| Isolation Forest | 2-10 min | 5-15% | Multi-variate patterns |
| LSTM autoencoder | 1-5 min | 5-10% | Complex temporal patterns |
| Ensemble (multiple methods) | 1-3 min | <5% | Broad with high precision |