---
name: database-patterns
description: Database design patterns, migrations with Alembic/Prisma, and query optimization. Use when working with SQL/NoSQL databases or schema migrations.
---
# Database Patterns Skill

## Schema Migrations

### Alembic (Python/SQLAlchemy)
#### Setup

```bash
# Initialize Alembic
alembic init alembic

# Configure alembic.ini
sqlalchemy.url = postgresql://user:pass@localhost/myapp
```
#### alembic/env.py Configuration

```python
from logging.config import fileConfig
import os

from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models
from app.models import Base

config = context.config

# Set up Python logging from the config file
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Override with environment variable
config.set_main_option(
    "sqlalchemy.url",
    os.environ.get("DATABASE_URL", config.get_main_option("sqlalchemy.url"))
)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```
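If the app connects through SQLAlchemy's asyncio engine (as the repository examples later in this skill do), `run_migrations_online` needs an async variant. A minimal sketch based on Alembic's async template, assuming a `postgresql+asyncpg://` URL in the config:

```python
import asyncio

from sqlalchemy.ext.asyncio import async_engine_from_config


def do_run_migrations(connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        # Alembic itself is synchronous; run it on the async connection
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())
```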
#### Migration Commands

```bash
# Create migration from model changes
alembic revision --autogenerate -m "add users table"

# Create empty migration
alembic revision -m "add custom index"

# Apply migrations
alembic upgrade head

# Rollback one migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history --verbose
```
#### Migration Best Practices

```python
# alembic/versions/001_add_users_table.py
"""Add users table

Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

revision: str = 'abc123'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.UUID(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
    )
    # Create index separately for clarity
    op.create_index('ix_users_email', 'users', ['email'], unique=True)


def downgrade() -> None:
    op.drop_index('ix_users_email', table_name='users')
    op.drop_table('users')
```
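On a large, live table, a plain `CREATE INDEX` blocks writes until it finishes. A hedged sketch of the PostgreSQL-only concurrent alternative (the index and column names are illustrative); `CREATE INDEX CONCURRENTLY` cannot run inside a transaction, hence the `autocommit_block()`:

```python
# PostgreSQL-only: build the index without blocking writes.
from alembic import op


def upgrade() -> None:
    with op.get_context().autocommit_block():
        op.create_index(
            'ix_orders_created_at',  # illustrative name
            'orders',
            ['created_at'],
            postgresql_concurrently=True,
        )
```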
#### Data Migrations

```python
"""Backfill user full names

Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column


def upgrade() -> None:
    # Define a lightweight table stub for the data migration
    # (avoids importing ORM models, which drift over time)
    users = table('users',
        column('id', sa.UUID),
        column('first_name', sa.String),
        column('last_name', sa.String),
        column('full_name', sa.String),
    )

    # Single UPDATE; fine for small and medium tables.
    # For very large tables, batch the backfill instead (see sketch below).
    connection = op.get_bind()
    connection.execute(
        users.update().values(
            full_name=users.c.first_name + ' ' + users.c.last_name
        )
    )


def downgrade() -> None:
    # Data migrations typically aren't reversible
    pass
```
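When the table is too large for one UPDATE, a hedged sketch of a chunked backfill, better run as a standalone job than inside a migration (the batch size and URL are assumptions):

```python
# Chunked backfill: short transactions keep row locks brief.
from sqlalchemy import create_engine, text

BATCH_SIZE = 10_000  # illustrative
engine = create_engine("postgresql://user:pass@localhost/myapp")  # assumption

with engine.connect() as conn:
    while True:
        # Update one batch of rows that still need backfilling
        result = conn.execute(text("""
            UPDATE users
            SET full_name = first_name || ' ' || last_name
            WHERE id IN (
                SELECT id FROM users
                WHERE full_name IS NULL
                LIMIT :batch
            )
        """), {"batch": BATCH_SIZE})
        conn.commit()
        if result.rowcount < BATCH_SIZE:
            break
```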
### Prisma (TypeScript)

#### Schema Definition

```prisma
// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-js"
}

model User {
  id        String   @id @default(uuid())
  email     String   @unique // @unique already creates an index
  name      String
  role      Role     @default(USER)
  posts     Post[]
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")

  @@map("users")
}

model Post {
  id        String   @id @default(uuid())
  title     String
  content   String?
  published Boolean  @default(false)
  author    User     @relation(fields: [authorId], references: [id])
  authorId  String   @map("author_id")
  createdAt DateTime @default(now()) @map("created_at")

  @@map("posts")
  @@index([authorId])
}

enum Role {
  USER
  ADMIN
}
```
#### Migration Commands

```bash
# Create migration from schema changes
npx prisma migrate dev --name add_users_table

# Apply migrations in production
npx prisma migrate deploy

# Reset database (development only)
npx prisma migrate reset

# Generate client
npx prisma generate

# View database
npx prisma studio
```
## SQLAlchemy 2.0 Patterns

### Model Definition

```python
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4

from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    role: Mapped[str] = mapped_column(String(20), default="user")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=func.now())

    # Relationships
    orders: Mapped[list["Order"]] = relationship(back_populates="user")


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]  # Store as cents
    status: Mapped[str] = mapped_column(String(20), default="pending")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Relationships
    user: Mapped["User"] = relationship(back_populates="orders")
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")


# Minimal stub so the "OrderItem" forward reference resolves
class OrderItem(Base):
    __tablename__ = "order_items"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    order_id: Mapped[UUID] = mapped_column(ForeignKey("orders.id"))
    quantity: Mapped[int]

    order: Mapped["Order"] = relationship(back_populates="items")
```
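The repository pattern below assumes an `AsyncSession`; one way to wire it up (the `asyncpg` URL is an assumption):

```python
# Minimal session wiring for the repository below; the URL is an assumption.
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/myapp")
SessionFactory = async_sessionmaker(engine, expire_on_commit=False)
```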
### Async Repository Pattern

```python
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload


class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: UUID) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self,
        limit: int = 20,
        offset: int = 0,
    ) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        self.session.add(user)
        await self.session.flush()
        return user

    async def update(self, user: User) -> User:
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        await self.session.delete(user)
        await self.session.flush()
```
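A usage sketch: the repository only flushes, never commits, so the caller owns the transaction boundary (`SessionFactory` comes from the wiring sketch above; `register_user` is a hypothetical service function):

```python
# Usage sketch: the caller owns the transaction boundary.
async def register_user(email: str, name: str) -> User:
    async with SessionFactory() as session:
        async with session.begin():  # commits on success, rolls back on error
            repo = UserRepository(session)
            if await repo.get_by_email(email) is not None:
                raise ValueError("email already registered")
            return await repo.create(User(email=email, name=name))
```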
## Query Optimization

### Indexing Strategies

```sql
-- Primary lookup patterns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);

-- Composite indexes (order matters!)
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- Partial indexes
CREATE INDEX idx_orders_pending ON orders(user_id) WHERE status = 'pending';

-- Covering indexes (PostgreSQL 11+)
CREATE INDEX idx_users_email_name ON users(email) INCLUDE (name);
```
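An index only pays off if the planner actually uses it. A quick check from the same Python stack (the query and index name follow the earlier examples):

```python
# Sketch: confirm the planner picks the index before relying on it.
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def explain_email_lookup(session: AsyncSession, email: str) -> None:
    result = await session.execute(
        text("EXPLAIN ANALYZE SELECT * FROM users WHERE email = :email"),
        {"email": email},
    )
    for (line,) in result:
        print(line)  # expect "Index Scan using ix_users_email on users"
```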
### N+1 Query Prevention

```python
# BAD - N+1 queries
users = await session.execute(select(User))
for user in users.scalars():
    print(user.orders)  # Each access lazy-loads with its own query
                        # (and raises MissingGreenlet under asyncio)

# GOOD - Eager loading
from sqlalchemy.orm import selectinload, joinedload

# Use selectinload for collections
users = await session.execute(
    select(User).options(selectinload(User.orders))
)

# Use joinedload for single relations
orders = await session.execute(
    select(Order).options(joinedload(Order.user))
)
```
### Pagination

```python
from sqlalchemy import select, func


async def paginate_users(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> dict:
    # Count total
    count_query = select(func.count()).select_from(User)
    total = (await session.execute(count_query)).scalar_one()

    # Fetch page
    offset = (page - 1) * page_size
    query = (
        select(User)
        .order_by(User.created_at.desc())
        .limit(page_size)
        .offset(offset)
    )
    result = await session.execute(query)
    users = list(result.scalars().all())

    return {
        "items": users,
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_more": offset + len(users) < total,
    }
```
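OFFSET pagination scans and discards every skipped row, so deep pages get slow. A hedged keyset ("cursor") sketch, assuming clients echo back the last `created_at` they received:

```python
# Keyset (cursor) pagination: cost stays proportional to page size,
# not to how deep the client has paged.
from datetime import datetime


async def users_after(
    session: AsyncSession,
    cursor: datetime | None = None,
    page_size: int = 20,
) -> list[User]:
    query = select(User).order_by(User.created_at.desc()).limit(page_size)
    if cursor is not None:
        # Production code usually adds a unique tie-breaker (e.g. id)
        # to handle duplicate created_at values.
        query = query.where(User.created_at < cursor)
    result = await session.execute(query)
    return list(result.scalars().all())
```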
## NoSQL Patterns (MongoDB)

### Document Design

```python
from datetime import datetime

from bson import ObjectId
from pydantic import BaseModel, Field


# Pydantic v1-style custom type; v2 uses Annotated types instead
class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)


class UserDocument(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    email: str
    name: str
    # Embed frequently accessed data
    profile: dict = {}
    # Reference for large/changing data
    order_ids: list[str] = []
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        allow_population_by_field_name = True  # Pydantic v1 name for populate_by_name
        json_encoders = {ObjectId: str}
```
### MongoDB with Motor (Async)

```python
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient


class MongoUserRepository:
    def __init__(self, client: AsyncIOMotorClient, db_name: str):
        self.collection = client[db_name].users

    async def get_by_id(self, user_id: str) -> dict | None:
        return await self.collection.find_one({"_id": ObjectId(user_id)})

    async def create(self, user: UserDocument) -> str:
        result = await self.collection.insert_one(user.dict(by_alias=True))
        return str(result.inserted_id)

    async def find_by_email_domain(self, domain: str) -> list[dict]:
        cursor = self.collection.find(
            {"email": {"$regex": f"@{domain}$"}},
            {"email": 1, "name": 1}  # Projection
        ).limit(100)
        return await cursor.to_list(length=100)
```
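The suffix `$regex` above cannot use a normal index, but the basic lookups should. A sketch of index creation at startup (`ensure_indexes` is a hypothetical helper):

```python
# Sketch: create indexes once at startup; create_index is idempotent,
# so running it on every boot is safe.
async def ensure_indexes(repo: MongoUserRepository) -> None:
    await repo.collection.create_index("email", unique=True)
    await repo.collection.create_index("created_at")
```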
## Migration Safety

### Zero-Downtime Migration Pattern

```python
from alembic import op
import sqlalchemy as sa


# Step 1: Add new column (nullable)
def upgrade_step1():
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))


# Step 2: Backfill data (separate deployment)
def upgrade_step2():
    # Run as background job, not in migration
    pass


# Step 3: Make column required (after backfill complete)
def upgrade_step3():
    op.alter_column('users', 'full_name', nullable=False)


# Step 4: Remove old columns (after app updated)
def upgrade_step4():
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')
```
### Pre-Migration Checklist

- Backup database before migration
- Test migration on a copy of production data
- Check the migration doesn't lock tables for too long (see the lock-timeout sketch below)
- Ensure rollback script works
- Plan for zero-downtime if needed
- Coordinate with application deployments
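One hedged way to enforce the lock check on PostgreSQL: set `lock_timeout` at the top of the migration so DDL that cannot get its lock fails fast instead of queueing behind live traffic (the value is illustrative):

```python
# Fail fast if DDL cannot acquire its lock instead of blocking live queries.
from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    op.execute("SET lock_timeout = '5s'")  # PostgreSQL; applies to this session
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))
```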
## Commands

```bash
# Alembic
alembic upgrade head              # Apply all migrations
alembic downgrade -1              # Rollback one
alembic history                   # Show history
alembic current                   # Show current version

# Prisma
npx prisma migrate dev            # Development migration
npx prisma migrate deploy         # Production migration
npx prisma db push                # Push schema without migration

# PostgreSQL
pg_dump -Fc mydb > backup.dump    # Backup
pg_restore -d mydb backup.dump    # Restore
psql -d mydb -f migration.sql     # Run SQL file
```