---
name: database-patterns
description: Database design patterns, migrations with Alembic/Prisma, and query optimization. Use when working with SQL/NoSQL databases or schema migrations.
---

# Database Patterns Skill

## Schema Migrations

### Alembic (Python/SQLAlchemy)

#### Setup

```bash
# Initialize Alembic
alembic init alembic

# Configure alembic.ini
sqlalchemy.url = postgresql://user:pass@localhost/myapp
```

#### alembic/env.py Configuration

```python
import os
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context

# Import your models
from app.models import Base

config = context.config

# Set up Python logging from alembic.ini
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Allow DATABASE_URL to override the URL in alembic.ini
config.set_main_option(
    "sqlalchemy.url",
    os.environ.get("DATABASE_URL", config.get_main_option("sqlalchemy.url")),
)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

#### Migration Commands

```bash
# Create migration from model changes
alembic revision --autogenerate -m "add users table"

# Create empty migration
alembic revision -m "add custom index"

# Apply migrations
alembic upgrade head

# Rollback one migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history --verbose
```

#### Migration Best Practices

```python
# alembic/versions/001_add_users_table.py
"""Add users table

Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

revision: str = 'abc123'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.UUID(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
    )
    # Create index separately for clarity
    op.create_index('ix_users_email', 'users', ['email'], unique=True)


def downgrade() -> None:
    op.drop_index('ix_users_email', table_name='users')
    op.drop_table('users')
```

#### Data Migrations

```python
"""Backfill user full names

Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column


def upgrade() -> None:
    # Define a lightweight table stub for the data migration
    users = table(
        'users',
        column('id', sa.UUID),
        column('first_name', sa.String),
        column('last_name', sa.String),
        column('full_name', sa.String),
    )

    # Single UPDATE; batch the backfill on large tables (see the sketch below)
    connection = op.get_bind()
    connection.execute(
        users.update().values(
            full_name=users.c.first_name + ' ' + users.c.last_name
        )
    )


def downgrade() -> None:
    # Data migrations typically aren't reversible
    pass
```
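On very large tables, that single UPDATE holds row locks for the whole statement. A chunked variant is gentler; this is a minimal sketch assuming PostgreSQL, with an illustrative batch size:

```python
"""Chunked backfill variant for large tables (PostgreSQL assumed)."""
import sqlalchemy as sa
from alembic import op

BATCH_SIZE = 10_000  # illustrative; tune for your workload


def upgrade() -> None:
    connection = op.get_bind()
    while True:
        # Backfill one batch of rows that are still NULL
        result = connection.execute(
            sa.text(
                """
                UPDATE users
                SET full_name = first_name || ' ' || last_name
                WHERE id IN (
                    SELECT id FROM users
                    WHERE full_name IS NULL
                    LIMIT :batch
                )
                """
            ),
            {"batch": BATCH_SIZE},
        )
        if result.rowcount == 0:
            break
```

Each batch commits independently under Alembic's connection, so a failure partway through leaves already-backfilled rows in place rather than rolling back the entire table.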
### Prisma (TypeScript)

#### Schema Definition

```prisma
// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-js"
}

model User {
  id        String   @id @default(uuid())
  email     String   @unique
  name      String
  role      Role     @default(USER)
  posts     Post[]
  createdAt DateTime @default(now()) @map("created_at")
  updatedAt DateTime @updatedAt @map("updated_at")

  @@map("users")
  @@index([email])
}

model Post {
  id        String   @id @default(uuid())
  title     String
  content   String?
  published Boolean  @default(false)
  author    User     @relation(fields: [authorId], references: [id])
  authorId  String   @map("author_id")
  createdAt DateTime @default(now()) @map("created_at")

  @@map("posts")
  @@index([authorId])
}

enum Role {
  USER
  ADMIN
}
```

#### Migration Commands

```bash
# Create migration from schema changes
npx prisma migrate dev --name add_users_table

# Apply migrations in production
npx prisma migrate deploy

# Reset database (development only)
npx prisma migrate reset

# Generate client
npx prisma generate

# View database
npx prisma studio
```

## SQLAlchemy 2.0 Patterns

### Model Definition

```python
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4

from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    role: Mapped[str] = mapped_column(String(20), default="user")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=func.now())

    # Relationships
    orders: Mapped[list["Order"]] = relationship(back_populates="user")


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]  # Store as cents
    status: Mapped[str] = mapped_column(String(20), default="pending")
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Relationships (OrderItem model omitted for brevity)
    user: Mapped["User"] = relationship(back_populates="orders")
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")
```

### Async Repository Pattern

```python
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload


class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: UUID) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_orders(
        self, limit: int = 20, offset: int = 0
    ) -> list[User]:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        self.session.add(user)
        await self.session.flush()
        return user

    async def update(self, user: User) -> User:
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        await self.session.delete(user)
        await self.session.flush()
```
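The repository flushes but never commits, leaving session lifecycle to the caller. A minimal wiring sketch using `create_async_engine` and `async_sessionmaker`; the connection URL and the `asyncpg` driver are assumptions:

```python
import asyncio
from uuid import uuid4

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Assumed URL; requires the asyncpg driver (pip install asyncpg)
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/myapp")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def main() -> None:
    async with SessionLocal() as session:
        async with session.begin():  # commits on success, rolls back on error
            repo = UserRepository(session)
            user = await repo.create(
                User(id=uuid4(), email="ada@example.com", name="Ada")
            )
    print(user.id)


asyncio.run(main())
```

Committing at this outer boundary, rather than inside each repository method, keeps a whole unit of work atomic even when it spans several repositories.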
## Query Optimization

### Indexing Strategies

```sql
-- Primary lookup patterns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);

-- Composite indexes (column order matters!)
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- Partial indexes
CREATE INDEX idx_orders_pending ON orders(user_id) WHERE status = 'pending';

-- Covering indexes (PostgreSQL 11+)
CREATE INDEX idx_users_email_name ON users(email) INCLUDE (name);
```

### N+1 Query Prevention

```python
# BAD - N+1 queries
users = await session.execute(select(User))
for user in users.scalars():
    print(user.orders)  # Lazy load per user (raises MissingGreenlet under asyncio)

# GOOD - Eager loading
from sqlalchemy.orm import selectinload, joinedload

# Use selectinload for collections
users = await session.execute(
    select(User).options(selectinload(User.orders))
)

# Use joinedload for single relations
orders = await session.execute(
    select(Order).options(joinedload(Order.user))
)
```

### Pagination

```python
from sqlalchemy import select, func


async def paginate_users(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> dict:
    # Count total
    count_query = select(func.count()).select_from(User)
    total = (await session.execute(count_query)).scalar_one()

    # Fetch page
    offset = (page - 1) * page_size
    query = (
        select(User)
        .order_by(User.created_at.desc())
        .limit(page_size)
        .offset(offset)
    )
    result = await session.execute(query)
    users = list(result.scalars().all())

    return {
        "items": users,
        "total": total,
        "page": page,
        "page_size": page_size,
        "has_more": offset + len(users) < total,
    }
```

## NoSQL Patterns (MongoDB)

### Document Design

```python
from datetime import datetime

from bson import ObjectId
from pydantic import BaseModel, Field


class PyObjectId(ObjectId):
    # Pydantic v1-style custom type; v2 uses __get_pydantic_core_schema__
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)


class UserDocument(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    email: str
    name: str
    # Embed frequently accessed data
    profile: dict = {}
    # Reference large or frequently changing data
    order_ids: list[str] = []
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        allow_population_by_field_name = True  # Pydantic v1 spelling
        json_encoders = {ObjectId: str}
```

### MongoDB with Motor (Async)

```python
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient


class MongoUserRepository:
    def __init__(self, client: AsyncIOMotorClient, db_name: str):
        self.collection = client[db_name].users

    async def get_by_id(self, user_id: str) -> dict | None:
        return await self.collection.find_one({"_id": ObjectId(user_id)})

    async def create(self, user: UserDocument) -> str:
        result = await self.collection.insert_one(user.dict(by_alias=True))
        return str(result.inserted_id)

    async def find_by_email_domain(self, domain: str) -> list[dict]:
        cursor = self.collection.find(
            {"email": {"$regex": f"@{domain}$"}},
            {"email": 1, "name": 1},  # Projection
        ).limit(100)
        return await cursor.to_list(length=100)
```

## Migration Safety

### Zero-Downtime Migration Pattern

```python
# Step 1: Add new column (nullable)
def upgrade_step1():
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))


# Step 2: Backfill data (separate deployment)
def upgrade_step2():
    # Run as background job, not in migration
    pass


# Step 3: Make column required (after backfill completes)
def upgrade_step3():
    op.alter_column('users', 'full_name', nullable=False)


# Step 4: Remove old columns (after the app no longer reads them)
def upgrade_step4():
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')
```
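Index creation is another common source of downtime: a plain `CREATE INDEX` blocks writes for the duration of the build. On PostgreSQL, `CREATE INDEX CONCURRENTLY` avoids that lock but cannot run inside a transaction, which Alembic accommodates with `autocommit_block()`. A sketch assuming PostgreSQL; the index name and columns are illustrative:

```python
from alembic import op


def upgrade() -> None:
    # CONCURRENTLY cannot run inside a transaction block, so
    # autocommit_block() commits Alembic's transaction around this DDL
    with op.get_context().autocommit_block():
        op.create_index(
            "ix_orders_created_at",
            "orders",
            ["created_at"],
            postgresql_concurrently=True,
        )


def downgrade() -> None:
    op.drop_index("ix_orders_created_at", table_name="orders")
```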
### Pre-Migration Checklist

- [ ] Backup database before migration
- [ ] Test migration on a copy of production data
- [ ] Check the migration doesn't lock tables for too long
- [ ] Ensure the rollback script works (see the round-trip test below)
- [ ] Plan for zero downtime if needed
- [ ] Coordinate with application deployments

## Commands

```bash
# Alembic
alembic upgrade head             # Apply all migrations
alembic downgrade -1             # Rollback one
alembic history                  # Show history
alembic current                  # Show current version

# Prisma
npx prisma migrate dev           # Development migration
npx prisma migrate deploy        # Production migration
npx prisma db push               # Push schema without migration

# PostgreSQL
pg_dump -Fc mydb > backup.dump   # Backup
pg_restore -d mydb backup.dump   # Restore
psql -d mydb -f migration.sql    # Run SQL file
```
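One way to exercise the rollback item on the checklist is a migration round-trip test run in CI against a scratch database. A minimal sketch using Alembic's Python API; the `alembic.ini` path and the scratch database URL are assumptions:

```python
from alembic import command
from alembic.config import Config


def test_migrations_round_trip() -> None:
    # Point Alembic at a scratch database, never at production
    cfg = Config("alembic.ini")
    cfg.set_main_option(
        "sqlalchemy.url", "postgresql://user:pass@localhost/myapp_test"
    )
    command.upgrade(cfg, "head")    # apply every migration
    command.downgrade(cfg, "base")  # unwind them all
    command.upgrade(cfg, "head")    # re-apply to catch broken downgrades
```

A `downgrade()` that has drifted out of sync with its `upgrade()` fails here rather than during an incident.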