Files
James Bland befb8fbaeb feat: initial Claude Code configuration scaffold
Comprehensive Claude Code guidance system with:

- 5 agents: tdd-guardian, code-reviewer, security-scanner, refactor-scan, dependency-audit
- 18 skills covering languages (Python, TypeScript, Rust, Go, Java, C#),
  infrastructure (AWS, Azure, GCP, Terraform, Ansible, Docker/K8s, Database, CI/CD),
  testing (TDD, UI, Browser), and patterns (Monorepo, API Design, Observability)
- 3 hooks: secret detection, auto-formatting, TDD git pre-commit
- Strict TDD enforcement with 80%+ coverage requirements
- Multi-model strategy: Opus for planning, Sonnet for execution (opusplan)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 15:47:34 -05:00

13 KiB

name, description
name description
database-patterns Database design patterns, migrations with Alembic/Prisma, and query optimization. Use when working with SQL/NoSQL databases or schema migrations.

Database Patterns Skill

Schema Migrations

Alembic (Python/SQLAlchemy)

Setup

# Initialize Alembic
alembic init alembic

# Configure alembic.ini
sqlalchemy.url = postgresql://user:pass@localhost/myapp

alembic/env.py Configuration

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os

# Import your models
from app.models import Base

config = context.config

# Override with environment variable
config.set_main_option(
    "sqlalchemy.url",
    os.environ.get("DATABASE_URL", config.get_main_option("sqlalchemy.url"))
)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Migration Commands

# Create migration from model changes
alembic revision --autogenerate -m "add users table"

# Create empty migration
alembic revision -m "add custom index"

# Apply migrations
alembic upgrade head

# Rollback one migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history --verbose

Migration Best Practices

# alembic/versions/001_add_users_table.py
"""Add users table

Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa

revision: str = 'abc123'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.UUID(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
    )
    # Create index separately for clarity
    op.create_index('ix_users_email', 'users', ['email'], unique=True)


def downgrade() -> None:
    op.drop_index('ix_users_email', table_name='users')
    op.drop_table('users')

Data Migrations

"""Backfill user full names

Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

def upgrade() -> None:
    # Define table structure for data migration
    users = table('users',
        column('id', sa.UUID),
        column('first_name', sa.String),
        column('last_name', sa.String),
        column('full_name', sa.String),
    )

    # Batch update for large tables
    connection = op.get_bind()
    connection.execute(
        users.update().values(
            full_name=users.c.first_name + ' ' + users.c.last_name
        )
    )


def downgrade() -> None:
    # Data migrations typically aren't reversible
    pass

Prisma (TypeScript)

Schema Definition

// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-js"
}

model User {
  id        String   @id @default(uuid())
  email     String   @unique
  name      String
  role      Role     @default(USER)
  posts     Post[]
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")

  @@map("users")
  @@index([email])
}

model Post {
  id        String   @id @default(uuid())
  title     String
  content   String?
  published Boolean  @default(false)
  author    User     @relation(fields: [authorId], references: [id])
  authorId  String   @map("author_id")
  createdAt DateTime @default(now()) @map("created_at")

  @@map("posts")
  @@index([authorId])
}

enum Role {
  USER
  ADMIN
}

Migration Commands

# Create migration from schema changes
npx prisma migrate dev --name add_users_table

# Apply migrations in production
npx prisma migrate deploy

# Reset database (development only)
npx prisma migrate reset

# Generate client
npx prisma generate

# View database
npx prisma studio

SQLAlchemy 2.0 Patterns

Model Definition

from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    role: Mapped[str] = mapped_column(String(20), default="user")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=func.now())

    # Relationships
    orders: Mapped[list["Order"]] = relationship(back_populates="user")


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]  # Store as cents
    status: Mapped[str] = mapped_column(String(20), default="pending")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Relationships
    user: Mapped["User"] = relationship(back_populates="orders")
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")

Async Repository Pattern

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: UUID) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self,
        limit: int = 20,
        offset: int = 0
    ) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        self.session.add(user)
        await self.session.flush()
        return user

    async def update(self, user: User) -> User:
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        await self.session.delete(user)
        await self.session.flush()

Query Optimization

Indexing Strategies

-- Primary lookup patterns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);

-- Composite indexes (order matters!)
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- Partial indexes
CREATE INDEX idx_orders_pending ON orders(user_id) WHERE status = 'pending';

-- Covering indexes
CREATE INDEX idx_users_email_name ON users(email) INCLUDE (name);

N+1 Query Prevention

# BAD - N+1 queries
users = await session.execute(select(User))
for user in users.scalars():
    print(user.orders)  # Each access triggers a query!

# GOOD - Eager loading
from sqlalchemy.orm import selectinload, joinedload

# Use selectinload for collections
users = await session.execute(
    select(User).options(selectinload(User.orders))
)

# Use joinedload for single relations
orders = await session.execute(
    select(Order).options(joinedload(Order.user))
)

Pagination

from sqlalchemy import select, func


async def paginate_users(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> dict:
    # Count total
    count_query = select(func.count()).select_from(User)
    total = (await session.execute(count_query)).scalar_one()

    # Fetch page
    offset = (page - 1) * page_size
    query = select(User).limit(page_size).offset(offset).order_by(User.created_at.desc())
    result = await session.execute(query)
    users = list(result.scalars().all())

    return {
        "items": users,
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_more": offset + len(users) < total,
    }

NoSQL Patterns (MongoDB)

Document Design

from pydantic import BaseModel, Field
from datetime import datetime
from bson import ObjectId


class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)


class UserDocument(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    email: str
    name: str
    # Embed frequently accessed data
    profile: dict = {}
    # Reference for large/changing data
    order_ids: list[str] = []
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True
        json_encoders = {ObjectId: str}

MongoDB with Motor (Async)

from motor.motor_asyncio import AsyncIOMotorClient


class MongoUserRepository:
    def __init__(self, client: AsyncIOMotorClient, db_name: str):
        self.collection = client[db_name].users

    async def get_by_id(self, user_id: str) -> dict | None:
        return await self.collection.find_one({"_id": ObjectId(user_id)})

    async def create(self, user: UserDocument) -> str:
        result = await self.collection.insert_one(user.dict(by_alias=True))
        return str(result.inserted_id)

    async def find_by_email_domain(self, domain: str) -> list[dict]:
        cursor = self.collection.find(
            {"email": {"$regex": f"@{domain}$"}},
            {"email": 1, "name": 1}  # Projection
        ).limit(100)
        return await cursor.to_list(length=100)

Migration Safety

Zero-Downtime Migration Pattern

# Step 1: Add new column (nullable)
def upgrade_step1():
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))

# Step 2: Backfill data (separate deployment)
def upgrade_step2():
    # Run as background job, not in migration
    pass

# Step 3: Make column required (after backfill complete)
def upgrade_step3():
    op.alter_column('users', 'full_name', nullable=False)

# Step 4: Remove old columns (after app updated)
def upgrade_step4():
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')

Pre-Migration Checklist

  • Backup database before migration
  • Test migration on copy of production data
  • Check migration doesn't lock tables for too long
  • Ensure rollback script works
  • Plan for zero-downtime if needed
  • Coordinate with application deployments

Commands

# Alembic
alembic upgrade head           # Apply all migrations
alembic downgrade -1           # Rollback one
alembic history                # Show history
alembic current                # Show current version

# Prisma
npx prisma migrate dev         # Development migration
npx prisma migrate deploy      # Production migration
npx prisma db push             # Push schema without migration

# PostgreSQL
pg_dump -Fc mydb > backup.dump # Backup
pg_restore -d mydb backup.dump # Restore
psql -d mydb -f migration.sql  # Run SQL file