AI Log Analysis
AI-powered log analysis uses natural language processing and machine learning to extract insights from massive log volumes, enabling faster troubleshooting and proactive issue detection. This guide provides a complete, production-ready implementation using OpenAI embeddings, vector similarity search, and Streamlit: the same approach I have successfully deployed to reduce log investigation time from hours to seconds.
Challenges of Traditional Log Analysis
Traditional log analysis relies on keyword search, grep, and structured queries. These approaches break down at scale:
| Challenge | Traditional Approach | AI-Powered Approach |
|---|---|---|
| Volume | Grep over GBs of logs is slow and I/O-intensive | Approximate k-NN search (HNSW) typically stays sub-second as log volume grows |
| Semantic Search | Exact keyword match misses variations ("timeout" vs "timed out") | Embeddings capture semantic meaning; finds conceptually similar logs |
| Unstructured Data | Requires pre-defined parsing rules per log format | LLM understands context without custom parsers |
| Pattern Discovery | Manual log browsing to find patterns | Automatic clustering and pattern extraction |
| Anomaly Detection | Static thresholds on known patterns | Unsupervised learning finds unknown patterns |
| Root Cause | Engineer manually correlates logs across services | Semantic similarity connects related events automatically |
LLM-Based Log Search Architecture
AI Log Analysis Architecture

  Log Sources                 Ingestion Pipeline
  ┌──────────┐                ┌──────────────────┐
  │ App Logs │───────────────►│    Log Parser    │
  │  (JSON)  │                │  (Multi-format)  │
  └──────────┘                └────────┬─────────┘
  ┌──────────┐                         │
  │  Syslog  │────────────────────────►│
  └──────────┘                         ▼
  ┌──────────┐                ┌──────────────────┐
  │  Apache  │───────────────►│  Chunk & Split   │
  │  /nginx  │                │  (Token-aware)   │
  └──────────┘                └────────┬─────────┘
  ┌──────────┐                         │
  │ K8s Logs │────────────────────────►│
  └──────────┘                         ▼
                              ┌──────────────────┐
                              │ Embedding Model  │
                              │ (text-embedding- │
                              │     3-small)     │
                              └────────┬─────────┘
                                       │
                                       ▼
  User Query                  ┌──────────────────┐
  ┌──────────┐                │ Vector Database  │
  │ "Why is  │  ┌──────────┐  │ (Elasticsearch   │
  │ payment  │─►│  Query   │─►│  / OpenSearch /  │
  │ failing?"│  │ Embedder │  │    ChromaDB)     │
  └──────────┘  └──────────┘  └────────┬─────────┘
                                       │
                                       ▼
                              ┌──────────────────┐
                              │    Similarity    │
                              │  Search (k-NN)   │
                              └────────┬─────────┘
                                       │
                                       ▼
                              ┌──────────────────┐
                              │    Results +     │
                              │   LLM Summary    │
                              │ (GPT-4 / Claude) │
                              └────────┬─────────┘
                                       │
                                       ▼
                              ┌──────────────────┐
                              │   Streamlit UI   │
                              │      or API      │
                              └──────────────────┘
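The diagram's "Chunk & Split (Token-aware)" stage has no direct counterpart in the implementation below, which cuts embedding input at 8,191 characters. The following is a minimal sketch of token-aware splitting. It assumes the tokenizer is supplied as an encode/decode pair, for example tiktoken's cl100k_base encoding, which text-embedding-3-small uses. The function name chunk_by_tokens and its parameters are hypothetical.

```python
from typing import Callable, List, Sequence


def chunk_by_tokens(
    text: str,
    encode: Callable[[str], Sequence[int]],
    decode: Callable[[Sequence[int]], str],
    max_tokens: int = 8191,
    overlap: int = 0,
) -> List[str]:
    """Split text into chunks of at most max_tokens tokens.

    Consecutive chunks share `overlap` tokens of context, so text that
    straddles a boundary is not seen only as two disconnected halves.
    """
    if max_tokens <= 0 or not 0 <= overlap < max_tokens:
        raise ValueError("require max_tokens > 0 and 0 <= overlap < max_tokens")
    tokens = list(encode(text))
    if not tokens:
        return []
    step = max_tokens - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(tokens), step):
        window = tokens[start:start + max_tokens]
        chunks.append(decode(window))
        if start + max_tokens >= len(tokens):
            break  # this window already reached the end of the text
    return chunks
```

With tiktoken installed, a real call would look like enc = tiktoken.get_encoding("cl100k_base") followed by chunk_by_tokens(text, enc.encode, enc.decode). The tokenizer is passed in rather than imported so the function can be tested with any toy encoding.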
Vector Embeddings for Log Similarity
Embeddings transform text into high-dimensional vectors where semantically similar text is positioned closely. This enables searching logs by meaning rather than exact keyword matching.
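As a minimal illustration, the sketch below ranks log lines against a query by cosine similarity. This is the same measure the Elasticsearch mapping later in this guide declares with "similarity": "cosine". The three-dimensional vectors are made-up stand-ins for illustration only. Real text-embedding-3-small vectors have 1,536 dimensions and come from the embeddings API.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_by_similarity(
    query_vec: Sequence[float], docs: Dict[str, Sequence[float]]
) -> List[Tuple[str, float]]:
    """Return (log line, score) pairs sorted from most to least similar."""
    scored = [(text, cosine_similarity(query_vec, vec)) for text, vec in docs.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Hypothetical toy vectors: dimension 0 ~ "timeouts", 1 ~ "auth", 2 ~ "disk"
docs = {
    "upstream timed out while reading response": [0.9, 0.1, 0.0],
    "connection timeout to payments-db": [0.8, 0.0, 0.2],
    "invalid credentials for user admin": [0.1, 0.9, 0.0],
}
query = [1.0, 0.0, 0.0]  # pretend embedding of "timeout errors"
for text, score in rank_by_similarity(query, docs):
    print(f"{score:.3f}  {text}")
```

Both the "timed out" and the "timeout" lines score close to 1.0, while the authentication error scores far lower, even though no keyword is shared with the query. That is the property the table above attributes to embeddings.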
Complete Working Implementation
This is a complete, production-ready AI log analysis application using the same approach as the ai-pdfsearch project (Streamlit + OpenAI embeddings), adapted for multi-format log files.
requirements.txt
streamlit>=1.28.0
openai>=1.0.0
elasticsearch>=8.11.0,<9.0.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
numpy>=1.24.0
pandas>=2.0.0
scikit-learn>=1.3.0
log_parser.py: Multi-Format Log Parser
#!/usr/bin/env python3
"""
Multi-format log parser supporting JSON, syslog, Apache, nginx, and Kubernetes logs.
Normalizes all formats into a consistent structured representation for embedding.
"""
import json
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterator, List, Optional


@dataclass
class LogEntry:
    """Normalized log entry for embedding and search."""
    timestamp: Optional[str]
    level: str      # DEBUG, INFO, WARNING, ERROR, CRITICAL, UNKNOWN
    message: str
    source: str     # service name, file name, or 'unknown'
    raw: str        # original log line
    metadata: dict  # format-specific extra fields

    def to_embeddable_text(self) -> str:
        """Convert to text suitable for embedding."""
        parts = []
        if self.source and self.source != "unknown":
            parts.append(f"[{self.source}]")
        if self.level != "UNKNOWN":
            parts.append(f"{self.level}:")
        parts.append(self.message)
        return " ".join(parts)


class LogParser:
    """Parse multiple log formats into normalized LogEntry objects."""

    # Log level detection patterns, checked from most to least severe
    LEVEL_PATTERNS = {
        'CRITICAL': r'\b(CRITICAL|FATAL|EMERGENCY|ALERT)\b',
        'ERROR': r'\b(ERROR|ERR|SEVERE)\b',
        'WARNING': r'\b(WARNING|WARN)\b',
        'INFO': r'\b(INFO|INFORMATION)\b',
        'DEBUG': r'\b(DEBUG|TRACE|FINEST)\b',
    }

    # Map format-specific severities (nginx, klog, JSON) onto the names above
    LEVEL_ALIASES = {
        'WARN': 'WARNING', 'ERR': 'ERROR', 'CRIT': 'CRITICAL', 'EMERG': 'CRITICAL',
        'ALERT': 'CRITICAL', 'FATAL': 'CRITICAL', 'NOTICE': 'INFO',
        'I': 'INFO', 'W': 'WARNING', 'E': 'ERROR', 'F': 'CRITICAL',
    }

    # Apache Combined Log Format
    APACHE_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<time>[^\]]+)\]\s+'
        r'"(?P<method>\S+)\s+(?P<path>\S+)\s+(?P<protocol>[^"]+)"\s+'
        r'(?P<status>\d{3})\s+(?P<bytes>\S+)\s+'
        r'"(?P<referer>[^"]*)"\s+"(?P<ua>[^"]*)"'
    )

    # Syslog format (RFC3164)
    SYSLOG_PATTERN = re.compile(
        r'^(?P<month>[A-Z][a-z]{2})\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})\s+'
        r'(?P<host>\S+)\s+(?P<process>[^\[:]*)(?:\[(?P<pid>\d+)\])?:\s*(?P<message>.*)$'
    )

    # nginx error log
    NGINX_ERROR_PATTERN = re.compile(
        r'^(?P<time>\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+'
        r'\[(?P<level>\w+)\]\s+\S+:\s+(?P<message>.*)$'
    )

    # Kubernetes component log format (klog): "I0102 15:04:05.123456 1 file.go:42] msg"
    K8S_PATTERN = re.compile(
        r'^(?P<level>[IWEF])(?P<month>\d{2})(?P<day>\d{2})\s+'
        r'(?P<time>\d{2}:\d{2}:\d{2}\.\d+)\s+'
        r'(?P<thread>\d+)\s+(?P<file>[^\]]+)\]\s+(?P<message>.*)$'
    )

    def __init__(self, source_name: str = "unknown"):
        self.source_name = source_name

    def _detect_level(self, message: str) -> str:
        """Detect log level from message text."""
        message_upper = message.upper()
        for level, pattern in self.LEVEL_PATTERNS.items():
            if re.search(pattern, message_upper):
                return level
        return "UNKNOWN"

    def _normalize_level(self, level) -> str:
        """Upper-case a format-specific level and map aliases to standard names."""
        level = str(level).strip().upper()
        return self.LEVEL_ALIASES.get(level, level)

    def _parse_timestamp(self, ts_str) -> Optional[str]:
        """Parse various timestamp formats to ISO format."""
        if ts_str is None or ts_str == "":
            return None
        ts_str = str(ts_str)  # JSON logs may carry numeric timestamps
        formats = [
            "%Y-%m-%dT%H:%M:%S.%fZ",
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%d %H:%M:%S,%f",
            "%Y-%m-%d %H:%M:%S",
            "%d/%b/%Y:%H:%M:%S %z",
            "%Y/%m/%d %H:%M:%S",
        ]
        for fmt in formats:
            try:
                return datetime.strptime(ts_str, fmt).isoformat()
            except ValueError:
                continue
        return ts_str  # unknown format: keep as-is

    def parse_json(self, line: str) -> Optional[LogEntry]:
        """Parse JSON-structured log line."""
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return None
        # Valid JSON that is not an object (e.g. a bare number) is not a structured log
        if not isinstance(data, dict):
            return None
        message = data.get('message', data.get('msg', data.get('log', '')))
        if not message:
            return None
        message = str(message)
        timestamp = data.get('timestamp', data.get('ts', data.get('time')))
        level = data.get('level', data.get('severity'))
        source = data.get('service', data.get('source', data.get('app', self.source_name)))
        reserved = {'message', 'msg', 'log', 'timestamp', 'ts', 'time',
                    'level', 'severity', 'service', 'source', 'app'}
        return LogEntry(
            timestamp=self._parse_timestamp(timestamp),
            level=self._normalize_level(level) if level else self._detect_level(message),
            message=message,
            source=str(source),
            raw=line,
            metadata={k: v for k, v in data.items() if k not in reserved},
        )

    def parse_syslog(self, line: str) -> Optional[LogEntry]:
        """Parse RFC3164 syslog format."""
        match = self.SYSLOG_PATTERN.match(line)
        if not match:
            return None
        gd = match.groupdict()
        message = gd.get('message') or ''
        # RFC3164 timestamps omit the year; assume the current year
        try:
            timestamp = datetime.strptime(
                f"{datetime.now().year} {gd['month']} {gd['day']} {gd['time']}",
                "%Y %b %d %H:%M:%S",
            ).isoformat()
        except ValueError:
            timestamp = None
        return LogEntry(
            timestamp=timestamp,
            level=self._detect_level(message),
            message=message,
            source=(gd.get('process') or '').strip() or self.source_name,
            raw=line,
            metadata={"host": gd.get('host'), "pid": gd.get('pid')},
        )

    def parse_apache(self, line: str) -> Optional[LogEntry]:
        """Parse Apache Combined Log Format."""
        match = self.APACHE_PATTERN.match(line)
        if not match:
            return None
        gd = match.groupdict()
        status = int(gd.get('status', 0))
        level = 'ERROR' if status >= 500 else 'WARNING' if status >= 400 else 'INFO'
        message = f"{gd['method']} {gd['path']} -> {gd['status']} ({gd['bytes']} bytes)"
        return LogEntry(
            timestamp=self._parse_timestamp(gd['time']),
            level=level,
            message=message,
            source=f"apache-{gd.get('ip', 'unknown')}",
            raw=line,
            metadata={"ip": gd.get('ip'), "status": status, "method": gd.get('method'),
                      "referer": gd.get('referer'), "user_agent": gd.get('ua')},
        )

    def parse_nginx_error(self, line: str) -> Optional[LogEntry]:
        """Parse nginx error log format."""
        match = self.NGINX_ERROR_PATTERN.match(line)
        if not match:
            return None
        gd = match.groupdict()
        return LogEntry(
            timestamp=self._parse_timestamp(gd['time']),
            level=self._normalize_level(gd.get('level', 'UNKNOWN')),  # e.g. "warn" -> WARNING
            message=gd.get('message', ''),
            source=f"nginx-{self.source_name}",
            raw=line,
            metadata={},
        )

    def parse_k8s(self, line: str) -> Optional[LogEntry]:
        """Parse Kubernetes component (klog) format."""
        match = self.K8S_PATTERN.match(line)
        if not match:
            return None
        gd = match.groupdict()
        return LogEntry(
            # klog omits the year; assume the current year
            timestamp=f"{datetime.now().year}-{gd['month']}-{gd['day']}T{gd['time']}",
            level=self._normalize_level(gd['level']),  # I/W/E/F
            message=gd['message'],
            source=self.source_name,
            raw=line,
            metadata={"thread": gd['thread'], "file": gd['file'].strip()},
        )

    def parse_line(self, line: str) -> Optional[LogEntry]:
        """Try all parsers, return first successful match."""
        line = line.strip()
        if not line:
            return None
        # JSON first (most common for modern apps), then the text formats
        for parse in (self.parse_json, self.parse_nginx_error, self.parse_k8s,
                      self.parse_apache, self.parse_syslog):
            result = parse(line)
            if result:
                return result
        # Fallback: treat entire line as message
        return LogEntry(
            timestamp=None,
            level=self._detect_level(line),
            message=line,
            source=self.source_name,
            raw=line,
            metadata={"parser": "fallback"},
        )

    def parse_file(self, file_path: str) -> Iterator[LogEntry]:
        """Parse all lines from a log file."""
        source = Path(file_path).stem
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                entry = self.parse_line(line)
                if entry:
                    # Fall back to the file name when the line carries no source
                    entry.source = source if entry.source == "unknown" else entry.source
                    yield entry

    def parse_files(self, file_paths: List[str]) -> List[LogEntry]:
        """Parse multiple log files."""
        entries = []
        for fp in file_paths:
            entries.extend(self.parse_file(fp))
        return entries
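The K8S_PATTERN regex targets the klog format that Kubernetes components such as the kubelet write. Each line carries a one-letter severity, an MMDD date, the time, a thread ID, and file:line]. The standalone check below runs the same regex against a made-up sample line. KLOG_SEVERITY is an illustrative mapping onto the parser's level names, not part of klog itself.

```python
import re

# Same regex as LogParser.K8S_PATTERN
K8S_PATTERN = re.compile(
    r'^(?P<level>[IWEF])(?P<month>\d{2})(?P<day>\d{2})\s+'
    r'(?P<time>\d{2}:\d{2}:\d{2}\.\d+)\s+'
    r'(?P<thread>\d+)\s+(?P<file>[^\]]+)\]\s+(?P<message>.*)$'
)

# klog severities: Info, Warning, Error, Fatal
KLOG_SEVERITY = {"I": "INFO", "W": "WARNING", "E": "ERROR", "F": "CRITICAL"}

# Hypothetical kubelet-style line, for illustration only
sample = ('E0312 14:03:27.512345    2211 pod_workers.go:1298] '
          '"Error syncing pod, skipping" pod="default/web-0"')

match = K8S_PATTERN.match(sample)
fields = match.groupdict()
print(KLOG_SEVERITY[fields["level"]], fields["month"], fields["day"], fields["file"])
print(fields["message"])
```

Because the line must start with one of I, W, E, or F followed by four digits, syslog lines (which start with a month name) and Apache lines (which start with an address) do not match this pattern.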
embeddings_store.py: Vector Store with Elasticsearch
#!/usr/bin/env python3
"""
Vector store for log embeddings using Elasticsearch dense_vector field type.
Supports OpenAI text-embedding-3-small for high-quality semantic search.
"""
import os
import time
from typing import Dict, List, Optional

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from openai import OpenAI

from log_parser import LogEntry

# Fields returned with every search hit (the embedding vector itself is omitted)
SOURCE_FIELDS = ["timestamp", "level", "message", "source", "raw", "metadata"]


class LogEmbeddingsStore:
    """Store and search log entries using vector embeddings."""

    EMBEDDING_MODEL = "text-embedding-3-small"
    EMBEDDING_DIM = 1536  # text-embedding-3-small dimension
    INDEX_NAME = "logs-semantic-search"

    def __init__(
        self,
        es_host: Optional[str] = None,
        es_api_key: Optional[str] = None,
        openai_api_key: Optional[str] = None,
    ):
        self.openai_client = OpenAI(api_key=openai_api_key or os.environ.get("OPENAI_API_KEY"))
        # Elasticsearch connection (verify_certs=False only suits local or self-signed clusters)
        self.es = Elasticsearch(
            [es_host or os.environ.get("ES_HOST", "http://localhost:9200")],
            api_key=es_api_key or os.environ.get("ES_API_KEY"),
            verify_certs=False,
            request_timeout=30,
        )
        if not self.es.ping():
            raise ConnectionError("Cannot connect to Elasticsearch")

    def create_index(self, index_name: Optional[str] = None) -> None:
        """Create Elasticsearch index with dense_vector mapping."""
        index = index_name or self.INDEX_NAME
        # Delete if exists (for demo; in production use index aliases)
        if self.es.indices.exists(index=index):
            self.es.indices.delete(index=index)
        mapping = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "index": {
                    "mapping": {
                        "total_fields": {"limit": 1000}
                    }
                }
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date", "ignore_malformed": True},
                    "level": {"type": "keyword"},
                    "message": {
                        "type": "text",
                        "analyzer": "standard",
                        "fields": {
                            # ignore_above counts characters but Lucene's 32766 limit is
                            # in bytes; 8191 keeps 4-byte UTF-8 text under the limit
                            "keyword": {"type": "keyword", "ignore_above": 8191}
                        }
                    },
                    "source": {"type": "keyword"},
                    "raw": {"type": "text", "index": False},  # stored, not searchable
                    "embedding": {
                        "type": "dense_vector",
                        "dims": self.EMBEDDING_DIM,
                        "index": True,
                        "similarity": "cosine"
                    },
                    "metadata": {"type": "object", "enabled": True}
                }
            }
        }
        self.es.indices.create(index=index, body=mapping)
        print(f"Created index: {index}")

    def get_embedding(self, text: str) -> List[float]:
        """Get OpenAI embedding for text."""
        response = self.openai_client.embeddings.create(
            model=self.EMBEDDING_MODEL,
            # Crude character cut: the model limit is 8191 *tokens*, and most
            # log text has fewer tokens than characters
            input=text[:8191],
        )
        return response.data[0].embedding

    def get_embeddings_batch(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Get embeddings for multiple texts in batches."""
        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.openai_client.embeddings.create(
                model=self.EMBEDDING_MODEL,
                input=[t[:8191] for t in batch],
            )
            all_embeddings.extend([d.embedding for d in response.data])
            time.sleep(0.1)  # Rate limit protection
        return all_embeddings

    def index_logs(self, logs: List[LogEntry], index_name: Optional[str] = None) -> int:
        """Index log entries with their embeddings."""
        index = index_name or self.INDEX_NAME
        # Generate embeddings in batches
        texts = [log.to_embeddable_text() for log in logs]
        print(f"Generating embeddings for {len(texts)} log entries...")
        embeddings = self.get_embeddings_batch(texts)
        # Prepare bulk indexing actions
        actions = []
        for log, embedding in zip(logs, embeddings):
            doc = {
                "timestamp": log.timestamp,
                "level": log.level,
                "message": log.message,
                "source": log.source,
                "raw": log.raw,
                "embedding": embedding,
                "metadata": log.metadata,
            }
            actions.append({"_index": index, "_source": doc})
        # Bulk index; with raise_on_error=False, errors is a list of failed items
        success, errors = bulk(self.es, actions, raise_on_error=False)
        if errors:
            print(f"Indexing errors: {len(errors)}")
            for err in errors[:5]:
                print(f"  Error: {err}")
        self.es.indices.refresh(index=index)
        print(f"Indexed {success} log entries")
        return success

    def search(
        self,
        query: str,
        top_k: int = 10,
        level_filter: Optional[str] = None,
        source_filter: Optional[str] = None,
        index_name: Optional[str] = None,
    ) -> List[Dict]:
        """Search logs using semantic similarity."""
        index = index_name or self.INDEX_NAME
        # Get query embedding
        query_embedding = self.get_embedding(query)
        # Semantic search using approximate k-NN
        knn_query = {
            "field": "embedding",
            "query_vector": query_embedding,
            "k": top_k,
            "num_candidates": top_k * 10,
        }
        # Optional filters go INSIDE the knn clause so they restrict the vector
        # search itself; a top-level "query" would be OR-ed with the kNN hits
        filters = []
        if level_filter:
            filters.append({"term": {"level": level_filter}})
        if source_filter:
            filters.append({"term": {"source": source_filter}})
        if filters:
            knn_query["filter"] = {"bool": {"filter": filters}}
        search_body = {"knn": knn_query, "_source": SOURCE_FIELDS, "size": top_k}
        response = self.es.search(index=index, body=search_body)
        return [{"score": hit['_score'], **hit['_source']} for hit in response['hits']['hits']]

    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        semantic_weight: float = 0.7,
        keyword_weight: float = 0.3,
        level_filter: Optional[str] = None,
        index_name: Optional[str] = None,
    ) -> List[Dict]:
        """Combine semantic search with keyword search for best results.

        The script_score clause computes cosine similarity for every document
        (brute force), so this suits small-to-medium indices.
        """
        index = index_name or self.INDEX_NAME
        query_embedding = self.get_embedding(query)
        bool_query = {
            "should": [
                # Semantic component: cosine similarity shifted into [0, 2]
                {
                    "script_score": {
                        "query": {"match_all": {}},
                        "script": {
                            "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                            "params": {"query_vector": query_embedding},
                        },
                        "boost": semantic_weight,
                    }
                },
                # Keyword component ("raw" is mapped with index: false, so it is not searchable)
                {
                    "multi_match": {
                        "query": query,
                        "fields": ["message^3", "source"],
                        "type": "best_fields",
                        "fuzziness": "AUTO",
                        "boost": keyword_weight,
                    }
                },
            ]
        }
        if level_filter:
            bool_query["filter"] = [{"term": {"level": level_filter}}]
        search_body = {"query": {"bool": bool_query}, "_source": SOURCE_FIELDS, "size": top_k}
        response = self.es.search(index=index, body=search_body)
        return [{"score": hit['_score'], **hit['_source']} for hit in response['hits']['hits']]

    def get_stats(self, index_name: Optional[str] = None) -> Dict:
        """Get index statistics."""
        index = index_name or self.INDEX_NAME
        stats = self.es.indices.stats(index=index)
        return {
            "total_docs": stats['indices'][index]['total']['docs']['count'],
            "size_mb": stats['indices'][index]['total']['store']['size_in_bytes'] / (1024 * 1024),
        }
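Where filters sit in a kNN request matters. In Elasticsearch 8.x, a filter inside the knn clause restricts which documents can be returned as the top k neighbors. A top-level query sent alongside knn is instead combined with the kNN hits as a disjunction, so it does not remove non-matching neighbors. The helper below builds a request body with the filters inside the knn clause, which makes the request easy to unit-test without a cluster. The function name build_knn_body and the candidates_per_k parameter are hypothetical.

```python
from typing import Any, Dict, List, Optional

SOURCE_FIELDS = ["timestamp", "level", "message", "source", "raw", "metadata"]


def build_knn_body(
    query_vector: List[float],
    top_k: int = 10,
    level: Optional[str] = None,
    source: Optional[str] = None,
    candidates_per_k: int = 10,
) -> Dict[str, Any]:
    """Build an Elasticsearch 8.x search body for filtered approximate kNN."""
    knn: Dict[str, Any] = {
        "field": "embedding",
        "query_vector": query_vector,
        "k": top_k,
        # Candidates examined per shard: more = better recall, more latency
        "num_candidates": top_k * candidates_per_k,
    }
    filters = []
    if level:
        filters.append({"term": {"level": level}})
    if source:
        filters.append({"term": {"source": source}})
    if filters:
        # Applied during the vector search, so every returned hit matches
        knn["filter"] = {"bool": {"filter": filters}}
    return {"knn": knn, "_source": SOURCE_FIELDS, "size": top_k}
```

A call might look like store.es.search(index=store.INDEX_NAME, body=build_knn_body(store.get_embedding("payment failures"), level="ERROR")).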
streamlit_app.py: Log Search UI
#!/usr/bin/env python3
"""
Streamlit UI for AI-powered log search.
Provides natural language search, filtering, and LLM-powered summaries.
"""
import os
import tempfile

import streamlit as st
from openai import OpenAI

from log_parser import LogParser
from embeddings_store import LogEmbeddingsStore

# Page configuration
st.set_page_config(
    page_title="AI Log Analyzer",
    layout="wide",
    initial_sidebar_state="expanded",
)


# Initialize clients once per server process
@st.cache_resource
def get_store():
    return LogEmbeddingsStore()


@st.cache_resource
def get_openai_client():
    return OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))


store = get_store()
openai_client = get_openai_client()


def summarize_results(query: str, results: list) -> str:
    """Use LLM to summarize search results."""
    if not results:
        return "No results found."
    # Build context from top results
    context_lines = []
    for i, r in enumerate(results[:8], 1):
        context_lines.append(f"[{i}] {r.get('level', 'INFO')}: {r.get('message', '')[:300]}")
    context = "\n".join(context_lines)
    prompt = f"""You are an expert SRE analyzing system logs. A colleague searched for: "{query}"

Here are the most relevant log entries found:
{context}

Provide a concise summary (3-5 bullet points) of:
1. What appears to be happening based on these logs
2. The severity and scope of any issues
3. Recommended next steps for investigation

Be specific and actionable. If logs show errors, explain what type and potential causes."""
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are an expert Site Reliability Engineer who specializes in log analysis and incident response."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=400,
        temperature=0.3,
    )
    return response.choices[0].message.content


# ─── Sidebar ─────────────────────────────────────────────────────────
with st.sidebar:
    st.title("AI Log Analyzer")
    st.markdown("""
**Natural language search** across your log files.

Type questions like:
- "Why are payments failing?"
- "Show database connection errors"
- "What happened at 3 AM?"
""")
    st.header("Data Management")
    # File upload
    uploaded_files = st.file_uploader(
        "Upload log files",
        type=["log", "txt", "json"],
        accept_multiple_files=True,
    )
    if uploaded_files and st.button("Index Logs"):
        with st.spinner("Parsing and indexing..."):
            # Save uploads to a private temp directory; basename() strips any path parts
            temp_dir = tempfile.mkdtemp(prefix="logs-")
            temp_paths = []
            for uploaded_file in uploaded_files:
                temp_path = os.path.join(temp_dir, os.path.basename(uploaded_file.name))
                with open(temp_path, "wb") as f:
                    f.write(uploaded_file.getvalue())
                temp_paths.append(temp_path)
            # Parse all files
            parser = LogParser()
            all_logs = parser.parse_files(temp_paths)
            if not all_logs:
                st.error("No parseable log entries found.")
            else:
                # Show parsing stats
                levels = {}
                sources = {}
                for log in all_logs:
                    levels[log.level] = levels.get(log.level, 0) + 1
                    sources[log.source] = sources.get(log.source, 0) + 1
                st.info(f"Parsed {len(all_logs)} log entries")
                st.json({"levels": levels, "sources": sources})
                # Create index (replaces any existing one) and store
                store.create_index()
                indexed = store.index_logs(all_logs)
                st.success(f"Indexed {indexed} entries")
            # Cleanup
            for p in temp_paths:
                os.remove(p)
            os.rmdir(temp_dir)
    st.header("Filters")
    level_filter = st.selectbox(
        "Log Level",
        ["All", "CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
    )
    search_type = st.radio(
        "Search Type",
        ["Semantic (AI)", "Hybrid (AI + Keywords)"],
    )
    top_k = st.slider("Results to show", 5, 50, 10)
    # Index stats
    try:
        stats = store.get_stats()
        st.header("Index Stats")
        st.metric("Indexed Entries", stats['total_docs'])
        st.metric("Index Size", f"{stats['size_mb']:.1f} MB")
    except Exception:
        st.info("No index created yet. Upload and index logs to begin.")

# ─── Main Content ────────────────────────────────────────────────────
st.title("Natural Language Log Search")
# Search bar
query = st.text_input(
    "Search logs in plain English",
    placeholder="e.g., 'database connection timeout errors last night'",
    key="search_query",
)
col1, col2 = st.columns([3, 1])
with col2:
    search_button = st.button("Search", use_container_width=True, type="primary")

if search_button and query:
    with st.spinner("Searching with AI..."):
        # Execute search
        level = None if level_filter == "All" else level_filter
        if search_type == "Semantic (AI)":
            results = store.search(query=query, top_k=top_k, level_filter=level)
        else:
            results = store.hybrid_search(query=query, top_k=top_k)
        # LLM summary
        st.subheader("AI Summary")
        summary = summarize_results(query, results)
        st.markdown(summary)
        st.divider()
        # Results
        st.subheader(f"Results ({len(results)} found)")
        color_map = {
            'CRITICAL': '🔴', 'ERROR': '🟠', 'WARNING': '🟡',
            'INFO': '🔵', 'DEBUG': '⚪', 'UNKNOWN': '⚫',
        }
        for i, result in enumerate(results, 1):
            entry_level = result.get('level', 'INFO')
            icon = color_map.get(entry_level, '⚪')
            with st.expander(f"{icon} [{i}] {entry_level}: {result.get('message', '')[:120]}..."):
                col_l, col_r = st.columns(2)
                with col_l:
                    st.text(f"Source: {result.get('source', 'unknown')}")
                    st.text(f"Score: {result.get('score', 0):.3f}")
                with col_r:
                    st.text(f"Time: {result.get('timestamp', 'N/A')}")
                st.code(result.get('raw', result.get('message', '')), language="text")
                if result.get('metadata'):
                    st.json(result['metadata'])
elif search_button and not query:
    st.warning("Please enter a search query.")

# ─── Quick Stats ─────────────────────────────────────────────────────
st.divider()
st.subheader("Log Overview")
# Placeholder metrics, shown until real statistics are wired in
cols = st.columns(4)
with cols[0]:
    st.metric("Total Logs", "N/A", help="Upload logs to see stats")
with cols[1]:
    st.metric("Error Rate", "N/A")
with cols[2]:
    st.metric("Unique Sources", "N/A")
with cols[3]:
    st.metric("Time Range", "N/A")
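The Log Overview metrics in the Streamlit app are placeholders. One way to fill them is a small pure function over entries represented as dicts with level, source, and ISO-8601 timestamp keys, like the hits that search() returns. compute_overview is a hypothetical helper, not part of the app. Counting ERROR plus CRITICAL as the "error rate" and treating naive timestamps as UTC are both assumptions.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional


def compute_overview(entries: Iterable[Dict]) -> Dict[str, object]:
    """Summarize entries for the four overview metrics.

    Missing or non-ISO timestamps are ignored for the time range.
    """
    total = 0
    errors = 0
    sources = set()
    times = []
    for entry in entries:
        total += 1
        if entry.get("level") in ("ERROR", "CRITICAL"):
            errors += 1
        sources.add(entry.get("source", "unknown"))
        ts = entry.get("timestamp")
        if not ts:
            continue
        try:
            dt = datetime.fromisoformat(ts)
        except (TypeError, ValueError):
            continue
        # Compare everything as naive UTC so aware and naive values can mix
        if dt.tzinfo is not None:
            dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
        times.append(dt)
    time_range: Optional[str] = None
    if times:
        time_range = f"{min(times).isoformat()} to {max(times).isoformat()}"
    return {
        "total_logs": total,
        "error_rate": errors / total if total else 0.0,
        "unique_sources": len(sources),
        "time_range": time_range,
    }
```

In the app, the result could feed the metrics directly, for example st.metric("Error Rate", f"{overview['error_rate']:.1%}").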
main.py: CLI Entry Point
#!/usr/bin/env python3
"""
CLI entry point for AI log analysis.

Usage:
    python main.py index /var/log/app/*.log
    python main.py search "database timeout errors"
    python main.py search "payment failures" --level ERROR --top-k 20
    streamlit run streamlit_app.py
"""
import argparse
import glob
import sys
from pathlib import Path

from log_parser import LogParser
from embeddings_store import LogEmbeddingsStore


def cmd_index(args):
    """Index log files for search."""
    store = LogEmbeddingsStore()
    parser = LogParser()
    # Accept explicit paths as well as (quoted) glob patterns
    files = []
    for pattern in args.files:
        if Path(pattern).is_file():
            files.append(pattern)
        else:
            # glob.glob handles absolute patterns; Path('.').glob() rejects them
            files.extend(p for p in sorted(glob.glob(pattern)) if Path(p).is_file())
    if not files:
        print("No files found matching patterns.")
        return 1
    print(f"Parsing {len(files)} files...")
    logs = parser.parse_files(files)
    print(f"Parsed {len(logs)} log entries")
    if not logs:
        print("No parseable entries found.")
        return 1
    # Show breakdown
    levels = {}
    for log in logs:
        levels[log.level] = levels.get(log.level, 0) + 1
    print("Log levels:", levels)
    # Create index (drops any existing index with the same name)
    store.create_index(args.index)
    indexed = store.index_logs(logs, args.index)
    print(f"Successfully indexed {indexed} entries.")
    return 0


def cmd_search(args):
    """Search indexed logs."""
    store = LogEmbeddingsStore()
    if args.hybrid:
        results = store.hybrid_search(
            query=args.query,
            top_k=args.top_k,
        )
    else:
        results = store.search(
            query=args.query,
            top_k=args.top_k,
            level_filter=args.level,
            index_name=args.index,
        )
    print(f"\nFound {len(results)} results for: \"{args.query}\"\n")
    for i, r in enumerate(results, 1):
        print(f"[{i}] Score: {r['score']:.3f} | Level: {r.get('level', 'N/A')}")
        print(f"    Source: {r.get('source', 'unknown')} | Time: {r.get('timestamp', 'N/A')}")
        print(f"    {r.get('message', '')[:200]}")
        print()
    return 0


def main():
    parser = argparse.ArgumentParser(description="AI Log Analysis Tool")
    subparsers = parser.add_subparsers(dest='command')
    # Index command
    index_parser = subparsers.add_parser('index', help='Index log files')
    index_parser.add_argument('files', nargs='+', help='Log file paths or glob patterns')
    index_parser.add_argument('--index', default='logs-semantic-search', help='Index name')
    # Search command
    search_parser = subparsers.add_parser('search', help='Search indexed logs')
    search_parser.add_argument('query', help='Natural language search query')
    search_parser.add_argument('--top-k', type=int, default=10, help='Number of results')
    search_parser.add_argument('--level', choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'])
    search_parser.add_argument('--hybrid', action='store_true', help='Use hybrid search')
    search_parser.add_argument('--index', default='logs-semantic-search', help='Index name')
    # Stats command
    stats_parser = subparsers.add_parser('stats', help='Show index statistics')
    stats_parser.add_argument('--index', default='logs-semantic-search', help='Index name')
    args = parser.parse_args()
    if args.command == 'index':
        return cmd_index(args)
    elif args.command == 'search':
        return cmd_search(args)
    elif args.command == 'stats':
        store = LogEmbeddingsStore()
        stats = store.get_stats(args.index)
        print(f"Index: {args.index}")
        print(f"  Documents: {stats['total_docs']}")
        print(f"  Size: {stats['size_mb']:.2f} MB")
        return 0
    else:
        parser.print_help()
        return 1


if __name__ == "__main__":
    sys.exit(main())

# Example commands:
# python main.py index /var/log/myapp/*.log
# python main.py search "database connection timeout" --level ERROR --top-k 20
# python main.py search "why is the API slow" --hybrid
# python main.py stats
Log Parsing with Grok Patterns
For Logstash/Elasticsearch ingestion pipelines, Grok patterns parse unstructured logs:
# grok_patterns.txt: Common patterns for log ingestion
# Apache Combined Log Format
%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_request})" %{NUMBER:status} (?:%{NUMBER:bytes}|-) "%{DATA:referrer}" "%{DATA:user_agent}"
# Syslog (RFC3164)
%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}
# nginx access log
%{IPORHOST:remote_addr} - %{HTTPDUSER:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}" %{NUMBER:request_time}
# nginx error log
%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME} \[%{LOGLEVEL:level}\] %{DATA}: %{GREEDYDATA:message}
# JSON log (parse with json filter after grok match)
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:json_message}
# Then use: json { source => "json_message" }
# Kubernetes pod log
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}\s+\[(?[\w-]+)\]\s+(?[\w.]+):\s+%{GREEDYDATA:message}
# Application log with correlation ID
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:service} %{UUID:trace_id} %{UUID:span_id} %{GREEDYDATA:message}
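Grok is essentially named-regex templating: each %{PATTERN:field} expands to a regular expression wrapped in a named capture group. The toy expander below illustrates the mechanism with a deliberately simplified pattern library. MINI_GROK is hypothetical and much looser than the built-in Logstash definitions. Real pipelines should use Logstash's grok filter or the Elasticsearch grok ingest processor.

```python
import re
from typing import Dict, Optional

# Simplified stand-ins for a few grok patterns (NOT the official Logstash definitions)
MINI_GROK: Dict[str, str] = {
    "SYSLOGTIMESTAMP": r"[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2}",
    "SYSLOGHOST": r"\S+",
    "DATA": r".*?",
    "POSINT": r"[1-9]\d*",
    "GREEDYDATA": r".*",
}

# Matches %{NAME} and %{NAME:field}
TOKEN = re.compile(r"%\{(?P<name>\w+)(?::(?P<field>\w+))?\}")


def grok_to_regex(pattern: str) -> str:
    """Expand %{NAME:field} into (?P<field>...) and %{NAME} into (?:...)."""
    def expand(m):
        body = MINI_GROK[m.group("name")]  # KeyError for patterns not in the toy library
        field = m.group("field")
        return f"(?P<{field}>{body})" if field else f"(?:{body})"
    return TOKEN.sub(expand, pattern)


def grok_match(pattern: str, line: str) -> Optional[Dict[str, str]]:
    """Match a whole line against a grok pattern; return the captured fields."""
    match = re.match(grok_to_regex(pattern) + "$", line)
    return match.groupdict() if match else None


# The syslog pattern from the list above
syslog_grok = (r"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} "
               r"%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}")
print(grok_match(syslog_grok, "Mar  5 09:15:02 web-01 sshd[4242]: Accepted publickey for deploy"))
```

This also shows why lazy DATA before an optional group works: the regex engine grows syslog_program one character at a time until the [pid] and ": " parts can match.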
Log Clustering and Pattern Detection
#!/usr/bin/env python3
"""
Log clustering using vector embeddings and DBSCAN.
Groups similar log messages to identify patterns and reduce noise.
"""
from collections import Counter

import numpy as np
from sklearn.cluster import DBSCAN

from embeddings_store import LogEmbeddingsStore


def cluster_logs(store: LogEmbeddingsStore, eps: float = 0.15, min_samples: int = 5,
                 max_docs: int = 1000):
    """
    Cluster log messages by semantic similarity.

    Args:
        store: Initialized LogEmbeddingsStore
        eps: DBSCAN neighborhood radius in cosine distance (1 - cosine similarity);
             lower = tighter clusters
        min_samples: Minimum points to form a cluster
        max_docs: Number of indexed logs to cluster (pairwise cost grows quadratically)

    Returns:
        dict: cluster_id -> list of log records (cluster -1 holds noise points)
    """
    # Retrieve logs together with the embeddings already stored at index time,
    # so no extra embedding API calls are needed
    response = store.es.search(
        index=store.INDEX_NAME,
        body={
            "query": {"match_all": {}},
            "_source": ["message", "level", "source", "timestamp", "embedding"],
            "size": max_docs,
        },
    )
    hits = response['hits']['hits']
    if not hits:
        return {}
    embeddings = np.array([h['_source']['embedding'] for h in hits])
    # DBSCAN clustering on embeddings
    clustering = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine')
    labels = clustering.fit_predict(embeddings)
    # Group by cluster
    clusters = {}
    for hit, label in zip(hits, labels):
        src = hit['_source']
        clusters.setdefault(int(label), []).append({
            "message": src['message'],
            "level": src.get('level', 'UNKNOWN'),
            "source": src.get('source', 'unknown'),
        })
    return clusters


def print_cluster_summary(clusters: dict):
    """Print summary of discovered log clusters."""
    noise_count = len(clusters.get(-1, []))
    valid_clusters = {k: v for k, v in clusters.items() if k != -1}
    total = sum(len(v) for v in clusters.values())
    # Lines a reader would scan vs. distinct patterns (clusters + unclustered lines)
    distinct = len(valid_clusters) + noise_count
    print(f"\n{'=' * 60}")
    print("Log Clustering Results")
    print(f"{'=' * 60}")
    print(f"Total clusters found: {len(valid_clusters)}")
    print(f"Noise points (unique logs): {noise_count}")
    print(f"Compression ratio: {total / max(distinct, 1):.1f}x\n")
    for cluster_id, logs in sorted(valid_clusters.items(), key=lambda x: len(x[1]), reverse=True)[:10]:
        levels = Counter(entry['level'] for entry in logs)
        sources = Counter(entry['source'] for entry in logs)
        representative = logs[0]['message'][:100]
        print(f"Cluster {cluster_id}: {len(logs)} occurrences")
        print(f"  Levels: {dict(levels)}")
        print(f"  Sources: {dict(sources)}")
        print(f"  Pattern: {representative}...")
        print()

# Example usage:
# store = LogEmbeddingsStore()
# clusters = cluster_logs(store, eps=0.2, min_samples=3)
# print_cluster_summary(clusters)
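DBSCAN over embeddings groups lines by meaning, but many repeated log lines differ only in IDs, numbers, or addresses. A cheaper, complementary step is to mask the variable tokens and count the resulting templates. This is a different technique from the embedding pipeline above, shown here as an alternative. The masking rules in MASKS are illustrative assumptions and will need tuning for your log formats.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Illustrative masking rules, applied in order (most specific first)
MASKS = [
    (re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I), "<UUID>"),
    (re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}(?::\d+)?\b"), "<IP>"),  # IPv4, optional port
    (re.compile(r"\b0x[0-9a-f]+\b", re.I), "<HEX>"),
    (re.compile(r"\b\d+(?:\.\d+)?(?:ms|s)?\b"), "<NUM>"),  # numbers and durations
]


def to_template(message: str) -> str:
    """Replace variable parts of a log message with placeholders."""
    for pattern, placeholder in MASKS:
        message = pattern.sub(placeholder, message)
    return message


def top_templates(messages: Iterable[str], n: int = 10) -> List[Tuple[str, int]]:
    """Most frequent templates with their occurrence counts."""
    return Counter(to_template(m) for m in messages).most_common(n)
```

Running top_templates over the messages in each DBSCAN cluster is one way to give that cluster a readable label.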
Anomaly Detection in Log Streams
#!/usr/bin/env python3
"""
Real-time anomaly detection in log streams using Isolation Forest
on log embedding vectors. Flags log messages that are semantically
unusual compared to historical patterns.
"""
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import deque
import time
from embeddings_store import LogEmbeddingsStore
from log_parser import LogParser, LogEntry
class LogAnomalyDetector:
    """Real-time anomaly detection for log streams."""

    def __init__(
        self,
        store: LogEmbeddingsStore = None,
        contamination: float = 0.01,     # Expected fraction of anomalies (0.01 = 1%)
        window_size: int = 1000,         # Training sample size and rolling-window length
        novelty_threshold: float = 0.3   # Cosine distance threshold for novel patterns
    ):
        self.store = store or LogEmbeddingsStore()
        self.contamination = contamination
        self.window_size = window_size
        self.novelty_threshold = novelty_threshold
        self.model = None
        self.recent_embeddings = deque(maxlen=window_size)
        self.is_trained = False

    def train(self, log_entries: list = None):
        """Train the anomaly detector on historical logs."""
        if log_entries:
            texts = [log.to_embeddable_text() for log in log_entries]
            embeddings = self.store.get_embeddings_batch(texts)
        else:
            # Fetch previously indexed embeddings from Elasticsearch
            response = self.store.es.search(
                index=self.store.INDEX_NAME,
                body={
                    "query": {"match_all": {}},
                    "_source": ["embedding"],
                    "size": self.window_size
                }
            )
            embeddings = [h['_source']['embedding'] for h in response['hits']['hits']]
        if len(embeddings) < 100:
            print(f"Warning: only {len(embeddings)} samples for training; need at least 100.")
            return
        X = np.array(embeddings)
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=42,
            n_estimators=100
        )
        self.model.fit(X)
        self.recent_embeddings.extend(embeddings)
        self.is_trained = True
        print(f"Trained anomaly detector on {len(embeddings)} samples")

    def score(self, log_entry: LogEntry) -> dict:
        """Score a single log entry for anomaly."""
        if not self.is_trained:
            # Return the same keys as a scored entry so callers can index them safely
            return {
                "is_anomaly": False,
                "anomaly_score": 0.0,
                "is_novel_pattern": False,
                "novelty_score": None,
                "reason": "model_not_trained"
            }
        embedding = np.array(self.store.get_embedding(log_entry.to_embeddable_text()))
        embedding_array = embedding.reshape(1, -1)
        # Isolation Forest: score_samples() is lower (more negative) for more
        # anomalous points; predict() returns -1 for outliers
        anomaly_score = self.model.score_samples(embedding_array)[0]
        is_anomaly = bool(self.model.predict(embedding_array)[0] == -1)
        # Novelty detection: cosine similarity to the mean of recent embeddings
        cosine_sim = None
        is_novel = False
        if self.recent_embeddings:
            recent_mean = np.mean(list(self.recent_embeddings), axis=0)
            cosine_sim = float(np.dot(embedding, recent_mean) / (
                np.linalg.norm(embedding) * np.linalg.norm(recent_mean)
            ))
            is_novel = cosine_sim < (1 - self.novelty_threshold)
        # Update rolling window
        self.recent_embeddings.append(embedding)
        return {
            "is_anomaly": is_anomaly or is_novel,
            "anomaly_score": float(anomaly_score),
            "is_novel_pattern": is_novel,
            "novelty_score": cosine_sim,
            "reason": "novel_pattern" if is_novel else ("isolation_forest" if is_anomaly else "normal")
        }

    def process_stream(self, log_entries: list, callback=None):
        """Process a stream of log entries and flag anomalies."""
        anomalies = []
        for entry in log_entries:
            result = self.score(entry)
            if result['is_anomaly']:
                anomalies.append({
                    "log": entry,
                    "detection": result
                })
            # Invoke the callback for every scored entry
            if callback:
                callback(entry, result)
        return anomalies
# Example usage:
# parser = LogParser()
# detector = LogAnomalyDetector()
# detector.train()  # Train on logs already indexed in Elasticsearch
#
# new_logs = parser.parse_file("/var/log/app/current.log")
# for log in new_logs:
#     result = detector.score(log)
#     if result['is_anomaly']:
#         print(f"ANOMALY: {log.message} (score: {result['anomaly_score']:.3f})")
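The novelty check in `score()` recomputes the mean of up to `window_size` embeddings on every call, which costs O(window × dimensions) per log line. A minimal sketch of an alternative, assuming the same cosine-to-centroid rule: keep a running sum next to the window so each update costs O(dimensions). `RollingCentroid` is a hypothetical helper written in plain Python (no NumPy) for readability; it is not part of the detector above.

```python
import math
from collections import deque


class RollingCentroid:
    """Hypothetical helper: running mean of the last `maxlen` vectors."""

    def __init__(self, dims: int, maxlen: int = 1000):
        self.window = deque()
        self.maxlen = maxlen
        self.sums = [0.0] * dims  # element-wise sum of the vectors in the window

    def add(self, vec):
        if len(self.window) == self.maxlen:
            # Evict the oldest vector and subtract it from the running sum
            old = self.window.popleft()
            self.sums = [s - o for s, o in zip(self.sums, old)]
        self.window.append(list(vec))
        self.sums = [s + v for s, v in zip(self.sums, vec)]

    def mean(self):
        n = len(self.window)
        return [s / n for s in self.sums] if n else None

    def cosine_to_mean(self, vec):
        """Cosine similarity between `vec` and the window centroid (None if empty)."""
        centroid = self.mean()
        if centroid is None:
            return None
        dot = sum(a * b for a, b in zip(vec, centroid))
        norm = math.sqrt(sum(a * a for a in vec)) * math.sqrt(sum(c * c for c in centroid))
        return dot / norm if norm else 0.0

    def is_novel(self, vec, novelty_threshold: float = 0.3) -> bool:
        """Same rule as the detector: novel if similarity < 1 - threshold."""
        sim = self.cosine_to_mean(vec)
        return sim is not None and sim < 1 - novelty_threshold
```

In the detector, `add()` would take the place of `recent_embeddings.append()` and `cosine_to_mean()` the place of the `np.mean(...)` computation. Because the sum is updated incrementally, recomputing it from the window now and then keeps floating-point drift in check on long-running streams.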
Integration with Elasticsearch/OpenSearch
In production, the vector store connects to an existing Elasticsearch or OpenSearch cluster. For local development, the following Docker Compose file starts a single-node Elasticsearch with Kibana:
```yaml
# docker-compose.yaml: local development stack
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false   # Local dev only: disables authentication and TLS
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
volumes:
  es_data:
```
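With the stack running, the embeddings need a `dense_vector` field for approximate kNN search. A minimal sketch of the request bodies, assuming the vectors live in a field named `embedding` (the field the detector's training query reads) with 1,536 dimensions (the default output size of text-embedding-3-small). The `message` and `timestamp` fields and both helper names are hypothetical. The `dense_vector` mapping with `index`/`similarity` and the top-level `knn` search option are Elasticsearch 8.x features; OpenSearch uses a different `knn_vector` type and query syntax.

```python
def build_index_mapping(dims: int = 1536) -> dict:
    """Index mapping with an HNSW-indexed dense_vector field (Elasticsearch 8.x)."""
    return {
        "mappings": {
            "properties": {
                "message": {"type": "text"},    # hypothetical field
                "timestamp": {"type": "date"},  # hypothetical field
                "embedding": {
                    "type": "dense_vector",
                    "dims": dims,
                    "index": True,              # build an HNSW graph for kNN search
                    "similarity": "cosine",
                },
            }
        }
    }


def build_knn_query(query_vector: list, k: int = 8, num_candidates: int = 100) -> dict:
    """Approximate kNN search body using the top-level `knn` option."""
    if not query_vector:
        raise ValueError("query_vector must not be empty")
    if num_candidates < k:
        # Elasticsearch rejects requests where num_candidates < k
        raise ValueError("num_candidates must be >= k")
    return {
        "knn": {
            "field": "embedding",
            "query_vector": query_vector,
            "k": k,
            "num_candidates": num_candidates,
        },
        "_source": ["message", "timestamp"],
    }
```

With the official Python client, these dicts could be passed as keyword arguments, e.g. `es.indices.create(index=name, mappings=...)` and `es.search(index=name, knn=..., source=...)`. If the local all-MiniLM-L6-v2 model from the privacy section is used instead, `dims` would be 384.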
Cost Considerations for LLM-Based Analysis
| Component | Cost Factor | Optimization Strategy |
|---|---|---|
| OpenAI Embeddings | $0.02 per 1M tokens (text-embedding-3-small) | Batch API calls; cache embeddings; use local models for dev |
| OpenAI Chat (Summaries) | $0.15 per 1M input tokens (GPT-4o-mini) | Use mini for summaries; limit context to top 8 results |
| Elasticsearch | Compute + storage for vector index | Use index lifecycle management; archive old embeddings |
| Inference Compute | CPU/GPU for embedding generation | Use OpenAI API for production; local SentenceTransformers for dev |
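To turn the per-token prices in the table into a budget, multiply token volume by price. A rough sketch, assuming the prices listed above (they change over time) and hypothetical averages for tokens per log line, tokens per retrieved result, and prompt overhead; real token counts depend on the tokenizer and the log format.

```python
EMBEDDING_PRICE_PER_M = 0.02   # USD per 1M tokens, text-embedding-3-small (from the table)
CHAT_INPUT_PRICE_PER_M = 0.15  # USD per 1M input tokens, GPT-4o-mini (from the table)


def embedding_cost(num_lines: int, avg_tokens_per_line: float) -> float:
    """One-time cost in USD to embed `num_lines` log lines."""
    return num_lines * avg_tokens_per_line / 1_000_000 * EMBEDDING_PRICE_PER_M


def summary_input_cost(num_queries: int, results_per_query: int = 8,
                       avg_tokens_per_result: float = 60,
                       prompt_overhead_tokens: int = 200) -> float:
    """Input-token cost in USD of summarizing the top results per query.

    Output tokens are billed separately and are not included here.
    """
    tokens_per_query = results_per_query * avg_tokens_per_result + prompt_overhead_tokens
    return num_queries * tokens_per_query / 1_000_000 * CHAT_INPUT_PRICE_PER_M
```

For example, embedding 10 million lines at about 60 tokens each is 600M tokens, roughly $12 at $0.02 per 1M; 10,000 summary queries at 680 input tokens each (8 results × 60 tokens + 200 tokens of prompt) come to about $1.02.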
Data Privacy and PII Handling
- Pre-filtering: Strip PII before embedding (credit card numbers, SSNs, emails, API keys).
- Self-hosted models: Use local embedding models (sentence-transformers/all-MiniLM-L6-v2) for sensitive environments.
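- Private cloud LLMs: Use managed LLM services reachable over private networking (e.g., AWS Bedrock via VPC endpoints, Azure OpenAI Service with private endpoints), or self-host models on your own infrastructure.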
- Data classification: Only send logs tagged DataClassification=public or internal to external APIs.
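The data-classification rule can be enforced as a gate in front of any external API call. A minimal sketch, assuming each log record is a dict carrying a hypothetical `data_classification` tag; records with a missing or unrecognized tag stay local (default-deny), which errs on the side of not sending data out.

```python
from typing import Iterable, List, Tuple

# Classifications allowed to leave the environment (per the rule above)
EXTERNAL_ALLOWED = {"public", "internal"}


def partition_for_external_api(records: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    """Split records into (sendable, local_only) by their classification tag."""
    sendable, local_only = [], []
    for record in records:
        tag = str(record.get("data_classification", "")).strip().lower()
        if tag in EXTERNAL_ALLOWED:
            sendable.append(record)
        else:
            # Missing, unknown, or restricted tags never go to an external API
            local_only.append(record)
    return sendable, local_only
```

Sendable records should still pass through `LogSanitizer`, since a classification tag does not guarantee the message text is free of PII.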
```python
#!/usr/bin/env python3
"""
PII sanitization for log data before sending to LLM APIs.
"""
import re


class LogSanitizer:
    """Remove PII and sensitive data from log messages."""

    # name -> (compiled pattern, replacement); applied in insertion order
    PATTERNS = {
        'credit_card': (re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'), '[CREDIT_CARD_REDACTED]'),
        'ssn': (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN_REDACTED]'),
        'email': (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), '[EMAIL_REDACTED]'),
        'api_key': (re.compile(r'\b(?:api[_-]?key|apikey|token)\s*[:=]\s*[\w-]{10,}', re.IGNORECASE),
                    '[API_KEY_REDACTED]'),
        # \S+ without a trailing \b so passwords ending in punctuation are fully redacted
        'password': (re.compile(r'\b(?:password|passwd|pwd)\s*[:=]\s*\S+', re.IGNORECASE),
                     '[PASSWORD_REDACTED]'),
        'ip_address': (re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'), '[IP_REDACTED]'),
        'phone': (re.compile(r'\b\d{3}[-.]\d{3}[-.]\d{4}\b'), '[PHONE_REDACTED]'),
    }

    @classmethod
    def sanitize(cls, text: str) -> str:
        """Remove all PII patterns from text."""
        for pattern, replacement in cls.PATTERNS.values():
            text = pattern.sub(replacement, text)
        return text

    @classmethod
    def sanitize_batch(cls, texts: list) -> list:
        """Sanitize multiple texts."""
        return [cls.sanitize(t) for t in texts]


# Example:
# sanitized = LogSanitizer.sanitize("User john@example.com with SSN 123-45-6789 logged in")
# Result: "User [EMAIL_REDACTED] with SSN [SSN_REDACTED] logged in"
```
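The `credit_card` pattern redacts any 16-digit run, which also catches trace IDs and other numeric identifiers that are useful for correlating events. One refinement, sketched here as a hypothetical extension rather than part of `LogSanitizer`, is to redact only matches that pass the Luhn checksum used by payment card numbers. A random 16-digit number still passes Luhn about one time in ten, so this reduces false positives rather than eliminating them.

```python
import re

# Same 16-digit candidate pattern as LogSanitizer's credit_card entry
CARD_CANDIDATE = re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b')


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdecimal()]
    total = 0
    # Walk from the rightmost digit, doubling every second digit
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return len(digits) > 0 and total % 10 == 0


def redact_card_numbers(text: str) -> str:
    """Redact only Luhn-valid candidates; leave other 16-digit numbers intact."""
    return CARD_CANDIDATE.sub(
        lambda m: '[CREDIT_CARD_REDACTED]' if luhn_valid(m.group()) else m.group(),
        text,
    )
```

Running `redact_card_numbers` before the other patterns, in place of the plain `credit_card` entry, keeps identifiers such as `1234567812345678` readable while still masking Luhn-valid card numbers.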