---
name: observability
description: Logging, metrics, and tracing patterns for application observability. Use when implementing monitoring, debugging, or production visibility.
---

# Observability Skill

## Three Pillars

1. **Logs** - Discrete events with context
2. **Metrics** - Aggregated measurements over time
3. **Traces** - Request flow across services

## Structured Logging

### Python (structlog)

```python
import logging

import structlog
from structlog.types import Processor

def configure_logging(json_output: bool = True) -> None:
    """Configure structured logging."""
    processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]
    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# Usage
logger = structlog.get_logger()

# Add context that persists across log calls
structlog.contextvars.bind_contextvars(
    request_id="req-123",
    user_id="user-456",
)

logger.info("order_created", order_id="order-789", total=150.00)
# {"event": "order_created", "order_id": "order-789", "total": 150.0, "request_id": "req-123", "user_id": "user-456", "level": "info", "timestamp": "2024-01-15T10:30:00Z"}

logger.error("payment_failed", order_id="order-789", error="insufficient_funds")
```

### TypeScript (pino)

```typescript
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  timestamp: pino.stdTimeFunctions.isoTime,
  redact: ['password', 'token', 'authorization'],
});

// Create child logger with bound context
const requestLogger = logger.child({
  requestId: 'req-123',
  userId: 'user-456',
});

requestLogger.info({ orderId: 'order-789', total: 150.0 }, 'order_created');
requestLogger.error({ orderId: 'order-789', error: 'insufficient_funds' }, 'payment_failed');

// Express middleware
import { randomUUID } from 'crypto';

const loggingMiddleware = (req, res, next) => {
  const requestId = req.headers['x-request-id'] || randomUUID();
  req.log = logger.child({
    requestId,
    method: req.method,
    path: req.path,
    userAgent: req.headers['user-agent'],
  });

  const startTime = Date.now();
  res.on('finish', () => {
    req.log.info({
      statusCode: res.statusCode,
      durationMs: Date.now() - startTime,
    }, 'request_completed');
  });

  next();
};
```

### Log Levels

| Level | When to Use |
|-------|-------------|
| `error` | Failures requiring attention |
| `warn` | Unexpected but handled situations |
| `info` | Business events (order created, user logged in) |
| `debug` | Technical details for debugging |
| `trace` | Very detailed tracing (rarely used in prod) |

## Metrics

### Python (prometheus-client)

```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

ACTIVE_CONNECTIONS = Gauge(
    'active_connections',
    'Number of active connections'
)

ORDERS_PROCESSED = Counter(
    'orders_processed_total',
    'Total orders processed',
    ['status']  # success, failed
)

# Usage
def process_request(method: str, endpoint: str):
    ACTIVE_CONNECTIONS.inc()
    start_time = time.time()
    try:
        # Process request...
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status='200').inc()
    except Exception:
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status='500').inc()
        raise
    finally:
        REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(
            time.time() - start_time
        )
        ACTIVE_CONNECTIONS.dec()

# FastAPI middleware
from fastapi import FastAPI, Request
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response

app = FastAPI()

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(time.time() - start_time)

    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
```

### TypeScript (prom-client)

```typescript
import { Registry, Counter, Histogram, Gauge, collectDefaultMetrics } from 'prom-client';

const register = new Registry();
collectDefaultMetrics({ register });

const httpRequestsTotal = new Counter({
  name: 'http_requests_total',
  help: 'Total HTTP requests',
  labelNames: ['method', 'path', 'status'],
  registers: [register],
});

const httpRequestDuration = new Histogram({
  name: 'http_request_duration_seconds',
  help: 'HTTP request duration',
  labelNames: ['method', 'path'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5],
  registers: [register],
});

// Express middleware
const metricsMiddleware = (req, res, next) => {
  const end = httpRequestDuration.startTimer({ method: req.method, path: req.path });
  res.on('finish', () => {
    httpRequestsTotal.inc({ method: req.method, path: req.path, status: res.statusCode });
    end();
  });
  next();
};

// Metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});
```

### Key Metrics (RED Method)

| Metric | Description |
|--------|-------------|
| **R**ate | Requests per second |
| **E**rrors | Error rate (%) |
| **D**uration | Latency (p50, p95, p99) |

### Key Metrics (USE Method for Resources)

| Metric | Description |
|--------|-------------|
| **U**tilization | % time resource is busy |
| **S**aturation | Queue depth, backlog |
| **E**rrors | Error count |

## Distributed Tracing

### Python (OpenTelemetry)

```python
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing."""
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Auto-instrument libraries (instrument() is an instance method)
    FastAPIInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

# Manual instrumentation
tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> dict:
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        # Child span for validation
        with tracer.start_as_current_span("validate_order"):
            validated = await validate_order(order_id)

        # Child span for payment
        with tracer.start_as_current_span("process_payment") as payment_span:
            payment_span.set_attribute("payment.method", "card")
            result = await charge_payment(order_id)

        span.set_attribute("order.status", "completed")
        return result
```

### TypeScript (OpenTelemetry)

```typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
  }),
  traceExporter: new OTLPTraceExporter({
    url: process.env.OTLP_ENDPOINT,
  }),
  instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start();

// Manual instrumentation
import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function processOrder(orderId: string) {
  return tracer.startActiveSpan('process_order', async (span) => {
    try {
      span.setAttribute('order.id', orderId);

      await tracer.startActiveSpan('validate_order', async (validateSpan) => {
        await validateOrder(orderId);
        validateSpan.end();
      });

      const result = await tracer.startActiveSpan('process_payment', async (paymentSpan) => {
        paymentSpan.setAttribute('payment.method', 'card');
        const res = await chargePayment(orderId);
        paymentSpan.end();
        return res;
      });

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
      span.recordException(error);
      throw error;
    } finally {
      span.end();
    }
  });
}
```

## Health Checks

```python
import asyncio

from fastapi import FastAPI, Response
from pydantic import BaseModel
from enum import Enum

app = FastAPI()

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

class ComponentHealth(BaseModel):
    name: str
    status: HealthStatus
    message: str | None = None

class HealthResponse(BaseModel):
    status: HealthStatus
    version: str
    components: list[ComponentHealth]

# `db` and `redis` below are your application's client instances
async def check_database() -> ComponentHealth:
    try:
        await db.execute("SELECT 1")
        return ComponentHealth(name="database", status=HealthStatus.HEALTHY)
    except Exception as e:
        return ComponentHealth(name="database", status=HealthStatus.UNHEALTHY, message=str(e))

async def check_redis() -> ComponentHealth:
    try:
        await redis.ping()
        return ComponentHealth(name="redis", status=HealthStatus.HEALTHY)
    except Exception as e:
        return ComponentHealth(name="redis", status=HealthStatus.DEGRADED, message=str(e))

@app.get("/health", response_model=HealthResponse)
async def health_check(response: Response):
    components = await asyncio.gather(
        check_database(),
        check_redis(),
    )

    # Overall status is worst component status
    if any(c.status == HealthStatus.UNHEALTHY for c in components):
        overall = HealthStatus.UNHEALTHY
        response.status_code = 503
    elif any(c.status == HealthStatus.DEGRADED for c in components):
        overall = HealthStatus.DEGRADED
    else:
        overall = HealthStatus.HEALTHY

    return HealthResponse(
        status=overall,
        version="1.0.0",
        components=components,
    )

@app.get("/ready")
async def readiness_check():
    """Kubernetes readiness probe - can we serve traffic?"""
    # Check critical dependencies
    await check_database()
    return {"status": "ready"}

@app.get("/live")
async def liveness_check():
    """Kubernetes liveness probe - is the process healthy?"""
    return {"status": "alive"}
```

## Alerting Rules

```yaml
# prometheus-rules.yaml
groups:
  - name: application
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # High latency
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            rate(http_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "p95 latency is {{ $value }}s"

      # Service down
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service is down"
```

## Best Practices

### Logging
- Use structured JSON logs
- Include correlation/request IDs
- Redact sensitive data
- Use appropriate log levels
- Don't log in hot paths (use sampling)

### Metrics
- Use consistent naming conventions
- Keep cardinality under control
- Use histograms for latency (not averages)
- Export business metrics alongside technical ones

### Tracing
- Instrument at service boundaries
- Propagate context across services
- Sample appropriately in production
- Add relevant attributes to spans