Comprehensive Claude Code guidance system with:

- 5 agents: tdd-guardian, code-reviewer, security-scanner, refactor-scan, dependency-audit
- 18 skills covering languages (Python, TypeScript, Rust, Go, Java, C#), infrastructure (AWS, Azure, GCP, Terraform, Ansible, Docker/K8s, Database, CI/CD), testing (TDD, UI, Browser), and patterns (Monorepo, API Design, Observability)
- 3 hooks: secret detection, auto-formatting, TDD git pre-commit
- Strict TDD enforcement with 80%+ coverage requirements
- Multi-model strategy: Opus for planning, Sonnet for execution (opusplan)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
511 lines
13 KiB
Markdown
511 lines
13 KiB
Markdown
---
|
|
name: database-patterns
|
|
description: Database design patterns, migrations with Alembic/Prisma, and query optimization. Use when working with SQL/NoSQL databases or schema migrations.
|
|
---
|
|
|
|
# Database Patterns Skill
|
|
|
|
## Schema Migrations
|
|
|
|
### Alembic (Python/SQLAlchemy)
|
|
|
|
#### Setup
|
|
```bash
|
|
# Initialize Alembic
|
|
alembic init alembic
|
|
|
|
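# Configure alembic.ini: set the connection URL in the [alembic] section
# (this is an INI setting to edit in that file, not a shell command)
# sqlalchemy.url = postgresql://user:pass@localhost/myapp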
|
|
```
|
|
|
|
#### alembic/env.py Configuration
|
|
```python
|
|
from logging.config import fileConfig
|
|
from sqlalchemy import engine_from_config, pool
|
|
from alembic import context
|
|
import os
|
|
|
|
# Import your models
|
|
from app.models import Base
|
|
|
|
# Alembic Config object for the alembic.ini currently in use.
config = context.config

# Prefer DATABASE_URL from the environment; when it is unset or empty, the
# sqlalchemy.url value already present in alembic.ini is left untouched.
database_url = os.environ.get("DATABASE_URL")
if database_url:
    # set_main_option() stores the value through ConfigParser, which treats
    # "%" as interpolation syntax. Double it so percent-encoded credentials
    # (e.g. "p%40ss") survive unchanged.
    config.set_main_option("sqlalchemy.url", database_url.replace("%", "%%"))

# Metadata of the application models, handed to context.configure() below.
target_metadata = Base.metadata
|
|
|
|
|
|
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured from the URL alone; no Engine is created.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
|
|
|
|
|
|
def run_migrations_online() -> None:
    """Run migrations in 'online' mode over a connection from a new Engine."""
    ini_section = config.get_section(config.config_ini_section, {})
    engine = engine_from_config(
        ini_section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()
|
|
|
|
|
|
# Pick the runner that matches the mode Alembic reports for this invocation.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()
|
|
```
|
|
|
|
#### Migration Commands
|
|
```bash
|
|
# Create migration from model changes
|
|
alembic revision --autogenerate -m "add users table"
|
|
|
|
# Create empty migration
|
|
alembic revision -m "add custom index"
|
|
|
|
# Apply migrations
|
|
alembic upgrade head
|
|
|
|
# Rollback one migration
|
|
alembic downgrade -1
|
|
|
|
# Rollback to specific revision
|
|
alembic downgrade abc123
|
|
|
|
# Show current revision
|
|
alembic current
|
|
|
|
# Show migration history
|
|
alembic history --verbose
|
|
```
|
|
|
|
#### Migration Best Practices
|
|
```python
|
|
# alembic/versions/001_add_users_table.py
|
|
"""Add users table
|
|
|
|
Revision ID: abc123
|
|
Revises:
|
|
Create Date: 2024-01-15 10:00:00.000000
|
|
"""
|
|
from typing import Sequence, Union
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
|
|
# Revision identifiers for this migration script.
revision: str = 'abc123'
# None: no parent revision (matches the empty "Revises:" header above).
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
|
|
|
|
|
|
def upgrade() -> None:
    """Create the ``users`` table and a unique index on ``email``."""
    op.create_table(
        'users',
        sa.Column('id', sa.UUID(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        # NOT NULL is safe: the server default fills the value on insert.
        sa.Column(
            'created_at',
            sa.DateTime(timezone=True),
            server_default=sa.func.now(),
            nullable=False,
        ),
        # No onupdate= here: it is a client-side default that SQLAlchemy
        # applies when it issues an UPDATE. It emits no DDL, so it has no
        # effect in a migration; declare it on the ORM model instead.
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    # Create index separately for clarity
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
|
|
|
|
|
|
def downgrade() -> None:
    """Drop the email index, then the ``users`` table."""
    index_name, table_name = 'ix_users_email', 'users'
    op.drop_index(index_name, table_name=table_name)
    op.drop_table(table_name)
|
|
```
|
|
|
|
#### Data Migrations
|
|
```python
|
|
"""Backfill user full names
|
|
|
|
Revision ID: def456
|
|
"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.sql import table, column
|
|
|
|
def upgrade() -> None:
    """Backfill ``users.full_name`` from ``first_name`` and ``last_name``."""
    # Ad-hoc table definition listing only the columns this migration touches.
    users = table(
        'users',
        column('id', sa.UUID),
        column('first_name', sa.String),
        column('last_name', sa.String),
        column('full_name', sa.String),
    )

    # One set-based UPDATE (not batched). Only rows still missing a value are
    # touched, so existing full_name values are never overwritten and a re-run
    # is harmless. For very large tables, backfill in batches from a
    # background job instead of inside the migration.
    op.execute(
        users.update()
        .where(users.c.full_name.is_(None))
        .values(full_name=users.c.first_name + ' ' + users.c.last_name)
    )
|
|
|
|
|
|
def downgrade() -> None:
    """Do nothing: data migrations typically aren't reversible."""
|
|
```
|
|
|
|
### Prisma (TypeScript)
|
|
|
|
#### Schema Definition
|
|
```prisma
|
|
// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-js"
}

model User {
  id        String   @id @default(uuid())
  // @unique already creates a unique index on email, so no separate
  // @@index([email]) is declared.
  email     String   @unique
  name      String
  role      Role     @default(USER)
  posts     Post[]
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")

  @@map("users")
}

model Post {
  id        String   @id @default(uuid())
  title     String
  content   String?
  published Boolean  @default(false)
  author    User     @relation(fields: [authorId], references: [id])
  authorId  String   @map("author_id")
  createdAt DateTime @default(now()) @map("created_at")

  @@map("posts")
  // Index the foreign key used to look up posts by author.
  @@index([authorId])
}

enum Role {
  USER
  ADMIN
}
|
|
```
|
|
|
|
#### Migration Commands
|
|
```bash
|
|
# Create migration from schema changes
|
|
npx prisma migrate dev --name add_users_table
|
|
|
|
# Apply migrations in production
|
|
npx prisma migrate deploy
|
|
|
|
# Reset database (development only)
|
|
npx prisma migrate reset
|
|
|
|
# Generate client
|
|
npx prisma generate
|
|
|
|
# View database
|
|
npx prisma studio
|
|
```
|
|
|
|
## SQLAlchemy 2.0 Patterns
|
|
|
|
### Model Definition
|
|
```python
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from uuid import UUID, uuid4
|
|
from sqlalchemy import String, ForeignKey, func
|
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Shared declarative base for the ORM models in this module."""
|
|
|
|
|
|
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Primary key; uuid4 supplies a value when none is given.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    role: Mapped[str] = mapped_column(String(20), default="user")
    # Default comes from the database (server_default), not from Python.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    # Optional: only an onupdate expression is declared, no insert default.
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=func.now())

    # Relationships
    orders: Mapped[list["Order"]] = relationship(back_populates="user")
|
|
|
|
|
|
class Order(Base):
    """ORM model mapped to the ``orders`` table."""

    __tablename__ = "orders"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    # Foreign key to users.id.
    user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]  # Store as cents
    status: Mapped[str] = mapped_column(String(20), default="pending")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Relationships
    user: Mapped["User"] = relationship(back_populates="orders")
    # NOTE(review): "OrderItem" is not defined in this snippet; it must exist
    # elsewhere and declare a matching ``order`` relationship.
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")
|
|
```
|
|
|
|
### Async Repository Pattern
|
|
```python
|
|
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
|
|
|
|
|
|
class UserRepository:
    """Async data-access layer for ``User`` rows.

    Methods flush but never commit; committing is left to whoever owns the
    session passed to the constructor.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: UUID) -> User | None:
        """Return the user whose id equals ``user_id``, or ``None``."""
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        """Return the user whose email equals ``email``, or ``None``."""
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self,
        limit: int = 20,
        offset: int = 0
    ) -> list[User]:
        """Return one page of users with ``orders`` eagerly loaded.

        Args:
            limit: Maximum number of users to return.
            offset: Number of users to skip before the page starts.

        Returns:
            Users ordered by ``created_at`` then ``id``.
        """
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            # LIMIT/OFFSET without ORDER BY yields rows in an unspecified
            # order, so consecutive pages could overlap or skip rows.
            .order_by(User.created_at, User.id)
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        """Add ``user`` to the session, flush, and return it."""
        self.session.add(user)
        await self.session.flush()
        return user

    async def update(self, user: User) -> User:
        """Flush the session and return ``user``.

        Nothing here adds ``user`` to the session, so it is assumed to be
        tracked by it already.
        """
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        """Mark ``user`` for deletion and flush."""
        await self.session.delete(user)
        await self.session.flush()
|
|
```
|
|
|
|
## Query Optimization
|
|
|
|
### Indexing Strategies
|
|
```sql
|
|
-- Primary lookup patterns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);

-- Composite indexes (order matters!)
-- user_id is the leading column here; status is the second key column.
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- Partial indexes
-- Only rows matching the WHERE predicate are part of this index.
CREATE INDEX idx_orders_pending ON orders(user_id) WHERE status = 'pending';

-- Covering indexes
-- NOTE(review): INCLUDE is not supported by every database (PostgreSQL
-- needs 11+) -- confirm against the target engine.
CREATE INDEX idx_users_email_name ON users(email) INCLUDE (name);
|
|
```
|
|
|
|
### N+1 Query Prevention
|
|
```python
|
|
# BAD - N+1 queries
users = await session.execute(select(User))
for user in users.scalars():
    # One query fetched the users; every ``user.orders`` access adds another.
    # NOTE(review): under AsyncSession an implicit lazy load like this is
    # expected to raise MissingGreenlet rather than run a query -- confirm.
    print(user.orders)  # Each access triggers a query!

# GOOD - Eager loading
from sqlalchemy.orm import selectinload, joinedload

# Use selectinload for collections
users = await session.execute(
    select(User).options(selectinload(User.orders))
)

# Use joinedload for single relations
orders = await session.execute(
    select(Order).options(joinedload(Order.user))
)
|
|
```
|
|
|
|
### Pagination
|
|
```python
|
|
from sqlalchemy import select, func
|
|
|
|
|
|
async def paginate_users(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> dict:
    """Return one page of users, newest first, plus paging metadata.

    Args:
        session: Async session used for both the count and the page query.
        page: 1-based page number.
        page_size: Maximum number of users per page.

    Returns:
        Dict with the keys ``items``, ``total``, ``page``, ``page_size`` and
        ``has_more``.

    Raises:
        ValueError: If ``page`` or ``page_size`` is less than 1.
    """
    # Reject values that would yield a negative OFFSET or a useless LIMIT
    # before any query is sent.
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size < 1:
        raise ValueError("page_size must be >= 1")

    # Count total
    count_query = select(func.count()).select_from(User)
    total = (await session.execute(count_query)).scalar_one()

    # Fetch page
    offset = (page - 1) * page_size
    query = (
        select(User)
        # id breaks ties between equal timestamps so page boundaries are stable.
        .order_by(User.created_at.desc(), User.id)
        .limit(page_size)
        .offset(offset)
    )
    result = await session.execute(query)
    users = list(result.scalars().all())

    return {
        "items": users,
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_more": offset + len(users) < total,
    }
|
|
```
|
|
|
|
## NoSQL Patterns (MongoDB)
|
|
|
|
### Document Design
|
|
```python
|
|
from pydantic import BaseModel, Field
|
|
from datetime import datetime
|
|
from bson import ObjectId
|
|
|
|
|
|
class PyObjectId(ObjectId):
    """``ObjectId`` subclass that exposes a validator hook."""

    @classmethod
    def __get_validators__(cls):
        # Generator hook: yields the validator callables, here just one.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``; raise ``ValueError`` if ``v`` is invalid."""
        if ObjectId.is_valid(v):
            return ObjectId(v)
        raise ValueError("Invalid ObjectId")
|
|
|
|
|
|
class UserDocument(BaseModel):
    """Pydantic model for a user document."""

    # Named ``id`` in Python; aliased to ``_id``.
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    email: str
    name: str
    # Embed frequently accessed data
    profile: dict = {}
    # Reference for large/changing data
    order_ids: list[str] = []
    # NOTE(review): datetime.utcnow yields a naive timestamp and is deprecated
    # since Python 3.12 -- consider a timezone-aware default.
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # NOTE(review): populate_by_name is the Pydantic v2 setting name, while
        # PyObjectId.__get_validators__ is the v1-style hook -- confirm which
        # Pydantic major version this targets.
        populate_by_name = True
        # Encode ObjectId values with str() for JSON output.
        json_encoders = {ObjectId: str}
|
|
```
|
|
|
|
### MongoDB with Motor (Async)
|
|
```python
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
|
|
class MongoUserRepository:
    """Async repository over the ``users`` collection of one database."""

    def __init__(self, client: AsyncIOMotorClient, db_name: str):
        self.collection = client[db_name].users

    async def get_by_id(self, user_id: str) -> dict | None:
        """Return the document whose ``_id`` is ``ObjectId(user_id)``."""
        return await self.collection.find_one({"_id": ObjectId(user_id)})

    async def create(self, user: UserDocument) -> str:
        """Insert ``user`` (dumped by alias) and return the new id as a string."""
        result = await self.collection.insert_one(user.dict(by_alias=True))
        return str(result.inserted_id)

    async def find_by_email_domain(self, domain: str) -> list[dict]:
        """Return up to 100 users whose email ends with ``@<domain>``.

        Args:
            domain: Literal domain name, e.g. ``"example.com"``. It is matched
                verbatim, never interpreted as a regular expression.

        Returns:
            Documents projected to ``email`` and ``name``.
        """
        import re  # local import keeps this snippet self-contained

        # Escape the domain so regex metacharacters match literally: an
        # unescaped "." matches any character, and caller-supplied text could
        # otherwise inject an arbitrary pattern into the query.
        pattern = f"@{re.escape(domain)}$"
        cursor = self.collection.find(
            {"email": {"$regex": pattern}},
            {"email": 1, "name": 1},  # Projection
        ).limit(100)
        return await cursor.to_list(length=100)
|
|
```
|
|
|
|
## Migration Safety
|
|
|
|
### Zero-Downtime Migration Pattern
|
|
|
|
```python
|
|
# Step 1: Add new column (nullable)
def upgrade_step1():
    """Add ``users.full_name`` as a nullable column."""
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))


# Step 2: Backfill data (separate deployment)
def upgrade_step2():
    """Intentionally empty: the backfill does not run inside a migration."""
    # Run as background job, not in migration
    pass


# Step 3: Make column required (after backfill complete)
def upgrade_step3():
    """Set ``users.full_name`` to NOT NULL."""
    # existing_type restates the current column type; some backends
    # (e.g. MySQL) need it to emit a complete ALTER when only the
    # nullability changes.
    op.alter_column(
        'users',
        'full_name',
        existing_type=sa.String(200),
        nullable=False,
    )


# Step 4: Remove old columns (after app updated)
def upgrade_step4():
    """Drop the ``first_name`` and ``last_name`` columns from ``users``."""
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')
|
|
```
|
|
|
|
### Pre-Migration Checklist
|
|
|
|
- [ ] Backup database before migration
|
|
- [ ] Test migration on copy of production data
|
|
- [ ] Check migration doesn't lock tables for too long
|
|
- [ ] Ensure rollback script works
|
|
- [ ] Plan for zero-downtime if needed
|
|
- [ ] Coordinate with application deployments
|
|
|
|
## Commands
|
|
|
|
```bash
|
|
# Alembic
|
|
alembic upgrade head # Apply all migrations
|
|
alembic downgrade -1 # Rollback one
|
|
alembic history # Show history
|
|
alembic current # Show current version
|
|
|
|
# Prisma
|
|
npx prisma migrate dev # Development migration
|
|
npx prisma migrate deploy # Production migration
|
|
npx prisma db push # Push schema without migration
|
|
|
|
# PostgreSQL
|
|
pg_dump -Fc mydb > backup.dump # Backup
|
|
pg_restore -d mydb backup.dump # Restore
|
|
psql -d mydb -f migration.sql # Run SQL file
|
|
```
|