FastAPI Production Guide: Build, Test, Deploy
FastAPI has become the default choice for building Python APIs in 2026. It is fast, type-safe, and async-native. But most tutorials stop at "hello world." This guide covers everything you need to ship a real production FastAPI application - project layout, async database access, JWT authentication with role-based authorization, structured error handling, Docker packaging, testing, and deployment. Every code block is complete and runnable with Python 3.12+.
1. Project Structure
A flat main.py does not scale. Organize by domain with clear separation between routers, services, models, and schemas:
fastapi-prod/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app factory
│ ├── config.py # pydantic-settings
│ ├── database.py # async engine + session
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py # declarative base + mixins
│ │ └── user.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── user.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── users.py
│ │ └── health.py
│ ├── services/
│ │ ├── __init__.py
│ │ └── auth.py
│ ├── dependencies/
│ │ ├── __init__.py
│ │ └── auth.py
│ ├── middleware/
│ │ ├── __init__.py
│ │ └── rate_limit.py
│ └── exceptions.py
├── tests/
│ ├── conftest.py
│ ├── test_auth.py
│ └── test_users.py
├── alembic/
│ ├── env.py
│ └── versions/
├── alembic.ini
├── Dockerfile
├── docker-compose.yml
├── pyproject.toml
└── .env
Key principles: routers handle HTTP, services handle business logic, models define the database, schemas define request/response shapes. Dependencies wire auth and DB sessions into routes via FastAPI's Depends().
2. Configuration
Use pydantic-settings to load config from environment variables and .env files with full type validation:
# app/config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    app_name: str = "fastapi-prod"
    debug: bool = False
    database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/app"
    redis_url: str = "redis://localhost:6379/0"
    secret_key: str = "change-me-in-production"
    access_token_expire_minutes: int = 15
    refresh_token_expire_days: int = 7
    allowed_origins: list[str] = ["http://localhost:3000"]


@lru_cache
def get_settings() -> Settings:
    return Settings()
# .env
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/app
REDIS_URL=redis://redis:6379/0
SECRET_KEY=your-256-bit-secret-here
ALLOWED_ORIGINS=["https://example.com"]
The @lru_cache ensures settings are parsed once. In tests, override with get_settings.cache_clear() or dependency injection.
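The caching behaviour described above can be seen without pydantic-settings at all. The following is a minimal sketch using only the standard library; `DemoSettings` and `get_demo_settings` are hypothetical stand-ins for `Settings` and `get_settings`, and the real class reads many more fields.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the pydantic-settings class; reads one variable."""

    secret_key: str


@lru_cache
def get_demo_settings() -> DemoSettings:
    # The environment is read only when the cache is empty.
    return DemoSettings(secret_key=os.environ.get("SECRET_KEY", "change-me-in-production"))


os.environ["SECRET_KEY"] = "first"
first = get_demo_settings()

os.environ["SECRET_KEY"] = "second"
stale = get_demo_settings()  # still the cached object built from "first"

get_demo_settings.cache_clear()
fresh = get_demo_settings()  # cache was cleared, so the environment is read again
```

In a test suite this suggests calling `cache_clear()` after changing environment variables (for example with pytest's `monkeypatch.setenv`), otherwise the earlier values keep being served.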
3. Database Layer
SQLAlchemy 2.0 async with asyncpg and connection pooling. All models use UUID primary keys and automatic timestamps:
# app/database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.database_url,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=300,
)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
# app/models/base.py
import uuid
from datetime import datetime

from sqlalchemy import DateTime, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )


class UUIDMixin:
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
# app/models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from app.models.base import Base, TimestampMixin, UUIDMixin


class User(UUIDMixin, TimestampMixin, Base):
    __tablename__ = "users"

    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    role: Mapped[str] = mapped_column(String(50), default="user")
    is_active: Mapped[bool] = mapped_column(default=True)
4. Authentication and Authorization
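JWT access + refresh tokens with python-jose and passlib. Access tokens are short-lived (15 min); refresh tokens last 7 days, and each refresh issues a new access/refresh pair. Note that the code below is stateless: an old refresh token stays valid until it expires unless you also store and revoke tokens server-side.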
# app/services/auth.py
from datetime import datetime, timedelta, timezone
from uuid import UUID

from jose import JWTError, jwt
from passlib.context import CryptContext

from app.config import get_settings

settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_token(subject: UUID, token_type: str = "access") -> str:
    if token_type == "access":
        expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
    else:
        expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days)
    payload = {"sub": str(subject), "type": token_type, "exp": expire}
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


def decode_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except JWTError as e:
        raise ValueError(f"Invalid token: {e}") from e
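To make it clearer what `jwt.encode` and `jwt.decode` are doing with `HS256`, here is a minimal standard-library sketch of signing and verifying a token. It is an illustration only, not a replacement for python-jose: `sign_hs256` and `verify_hs256` are hypothetical names, `exp` is assumed to be a numeric Unix timestamp, and the real library validates more (claims, leeway, other algorithms).

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 with the trailing "=" padding removed.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that _b64url stripped before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signing the first two segments with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url(json.dumps(header, separators=(",", ":")).encode()),
        _b64url(json.dumps(payload, separators=(",", ":")).encode()),
    ]
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [_b64url(signature)])


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the payload if the signature matches and the token is not expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Invalid token: expected three segments")
    header_b64, payload_b64, signature_b64 = parts
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    # compare_digest avoids leaking how many leading bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("Invalid token: bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if payload.get("exp", float("inf")) <= current:
        raise ValueError("Invalid token: expired")
    return payload
```

The payload is only encoded, not encrypted, so anyone holding a token can read `sub` and `type`; the secret protects integrity, which is why `secret_key` must never ship with its default value.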
The authorization dependency extracts the current user from the token and enforces role-based access:
# app/dependencies/auth.py
from uuid import UUID

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.user import User
from app.services.auth import decode_token

bearer = HTTPBearer()


async def get_current_user(
    creds: HTTPAuthorizationCredentials = Depends(bearer),
    db: AsyncSession = Depends(get_db),
) -> User:
    try:
        payload = decode_token(creds.credentials)
    except ValueError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    if payload.get("type") != "access":
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
    user_id = UUID(payload["sub"])
    result = await db.execute(select(User).where(User.id == user_id, User.is_active.is_(True)))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user


def require_role(*roles: str):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in roles:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
        return user

    return checker
Usage in a route: user: User = Depends(require_role("admin")). This pattern composes cleanly - stack multiple dependencies for complex authorization rules.
5. API Routers
Three routers: auth (register/login/refresh), users (profile management), and health check.
# app/schemas/auth.py
from pydantic import BaseModel, EmailStr


class RegisterRequest(BaseModel):
    email: EmailStr
    password: str


class LoginRequest(BaseModel):
    email: EmailStr
    password: str


class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class RefreshRequest(BaseModel):
    refresh_token: str

# app/schemas/user.py
import uuid
from datetime import datetime

from pydantic import BaseModel, EmailStr


class UserOut(BaseModel):
    id: uuid.UUID
    email: EmailStr
    role: str
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class UserPatch(BaseModel):
    email: EmailStr | None = None
    role: str | None = None
    is_active: bool | None = None
# app/routers/auth.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.user import User
from app.schemas.auth import LoginRequest, RefreshRequest, RegisterRequest, TokenResponse
from app.services.auth import create_token, decode_token, hash_password, verify_password

router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/register", response_model=TokenResponse, status_code=201)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
    exists = await db.execute(select(User).where(User.email == body.email))
    if exists.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Email already registered")
    user = User(email=body.email, hashed_password=hash_password(body.password))
    db.add(user)
    await db.flush()  # assigns user.id before the tokens are created
    return TokenResponse(
        access_token=create_token(user.id, "access"),
        refresh_token=create_token(user.id, "refresh"),
    )


@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.email == body.email))
    user = result.scalar_one_or_none()
    if not user or not verify_password(body.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return TokenResponse(
        access_token=create_token(user.id, "access"),
        refresh_token=create_token(user.id, "refresh"),
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh(body: RefreshRequest, db: AsyncSession = Depends(get_db)):
    try:
        payload = decode_token(body.refresh_token)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Not a refresh token")
    user_id = UUID(payload["sub"])
    result = await db.execute(select(User).where(User.id == user_id, User.is_active.is_(True)))
    if not result.scalar_one_or_none():
        raise HTTPException(status_code=401, detail="User not found")
    return TokenResponse(
        access_token=create_token(user_id, "access"),
        refresh_token=create_token(user_id, "refresh"),
    )
# app/routers/users.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.dependencies.auth import get_current_user, require_role
from app.models.user import User
from app.schemas.user import UserOut, UserPatch

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me", response_model=UserOut)
async def get_me(user: User = Depends(get_current_user)):
    return user


@router.get("/", response_model=list[UserOut])
async def list_users(
    skip: int = 0, limit: int = 50,
    user: User = Depends(require_role("admin")),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(select(User).offset(skip).limit(limit))
    return result.scalars().all()


@router.patch("/{user_id}", response_model=UserOut)
async def patch_user(
    user_id: UUID, body: UserPatch,
    admin: User = Depends(require_role("admin")),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # exclude_unset=True keeps only the fields the client actually sent
    for field, value in body.model_dump(exclude_unset=True).items():
        setattr(user, field, value)
    return user
# app/routers/health.py
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db

router = APIRouter(tags=["health"])


@router.get("/health")
async def health(db: AsyncSession = Depends(get_db)):
    await db.execute(text("SELECT 1"))
    return {"status": "healthy"}
6. Error Handling
A custom exception class with structured JSON responses, plus a catch-all handler for any exception that nothing else handles:
# app/exceptions.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse


class AppException(Exception):
    def __init__(self, status_code: int, code: str, detail: str):
        self.status_code = status_code
        self.code = code
        self.detail = detail


def register_exception_handlers(app: FastAPI) -> None:
    @app.exception_handler(AppException)
    async def app_exception_handler(request: Request, exc: AppException):
        return JSONResponse(
            status_code=exc.status_code,
            content={"error": {"code": exc.code, "detail": exc.detail}},
        )

    @app.exception_handler(Exception)
    async def unhandled_exception_handler(request: Request, exc: Exception):
        return JSONResponse(
            status_code=500,
            content={"error": {"code": "INTERNAL_ERROR", "detail": "An unexpected error occurred"}},
        )
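Raise anywhere in your code: raise AppException(404, "USER_NOT_FOUND", "No user with that ID"). Every AppException and every unhandled error then follows the same {"error": {"code": "...", "detail": "..."}} shape, making client-side handling predictable. HTTPException (which the routers above raise) and request-validation errors still go through FastAPI's default handlers and return {"detail": ...}; raise AppException in the routers or register handlers for those types too if you want a single shape everywhere.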
7. Middleware
Rate limiting with an in-memory sliding window (swap to Redis for multi-process), plus CORS configuration:
# app/middleware/rate_limit.py
import time
from collections import defaultdict

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware


class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):
        super().__init__(app)
        self.max_requests = max_requests
        self.window = window_seconds
        self.clients: dict[str, list[float]] = defaultdict(list)

    async def dispatch(self, request: Request, call_next) -> Response:
        client_ip = request.client.host if request.client else "unknown"
        now = time.time()
        window_start = now - self.window
        # Keep only the timestamps that still fall inside the window
        self.clients[client_ip] = [t for t in self.clients[client_ip] if t > window_start]
        if len(self.clients[client_ip]) >= self.max_requests:
            return Response(content="Rate limit exceeded", status_code=429)
        self.clients[client_ip].append(now)
        return await call_next(request)
# app/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config import get_settings
from app.exceptions import register_exception_handlers
from app.middleware.rate_limit import RateLimitMiddleware
from app.routers import auth, health, users

settings = get_settings()


@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # startup / shutdown hooks go here


app = FastAPI(title=settings.app_name, lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

register_exception_handlers(app)

app.include_router(auth.router)
app.include_router(users.router)
app.include_router(health.router)
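The sliding-window check inside RateLimitMiddleware can be exercised on its own, without an HTTP stack. The sketch below pulls the same logic into a small class; `SlidingWindowLimiter`, its `allow` method, and the injectable `clock` parameter are hypothetical names introduced here for illustration, and a deque replaces the list so expired timestamps are dropped from the front.

```python
import time
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque


class SlidingWindowLimiter:
    """Allow at most max_requests per client within the last window_seconds."""

    def __init__(
        self,
        max_requests: int = 100,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window = window_seconds
        self.clock = clock  # injectable so tests can control time
        self.hits: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        window_start = now - self.window
        hits = self.hits[client_id]
        # Timestamps are appended in order, so expired ones are always at the front.
        while hits and hits[0] <= window_start:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# A fake clock makes the window behaviour easy to follow.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window_seconds=10, clock=lambda: fake_now[0])
results = [limiter.allow("10.0.0.1") for _ in range(3)]  # third call exceeds the limit
fake_now[0] = 10.5
after_window = limiter.allow("10.0.0.1")  # earlier hits have aged out
```

Like the middleware, this keeps state in process memory, so each Gunicorn worker would count separately; that is the reason the text suggests Redis once more than one process is involved.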
8. Testing
Async tests with pytest + httpx. The conftest overrides the database dependency with a file-backed SQLite engine (test.db, via aiosqlite) so tests run without PostgreSQL:
# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.database import get_db
from app.main import app
from app.models.base import Base

TEST_DB_URL = "sqlite+aiosqlite:///test.db"
engine = create_async_engine(TEST_DB_URL, echo=False)
TestSession = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(autouse=True)
async def setup_db():
    # Fresh schema for every test: create before, drop after
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


async def override_get_db():
    async with TestSession() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


app.dependency_overrides[get_db] = override_get_db


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.fixture
def user_payload():
    return {"email": "test@example.com", "password": "Str0ngP@ss!"}
# tests/test_auth.py
import pytest

pytestmark = pytest.mark.anyio


async def test_register(client, user_payload):
    resp = await client.post("/auth/register", json=user_payload)
    assert resp.status_code == 201
    data = resp.json()
    assert "access_token" in data
    assert "refresh_token" in data


async def test_register_duplicate(client, user_payload):
    await client.post("/auth/register", json=user_payload)
    resp = await client.post("/auth/register", json=user_payload)
    assert resp.status_code == 409


async def test_login(client, user_payload):
    await client.post("/auth/register", json=user_payload)
    resp = await client.post("/auth/login", json=user_payload)
    assert resp.status_code == 200
    assert "access_token" in resp.json()


async def test_refresh(client, user_payload):
    reg = await client.post("/auth/register", json=user_payload)
    refresh_token = reg.json()["refresh_token"]
    resp = await client.post("/auth/refresh", json={"refresh_token": refresh_token})
    assert resp.status_code == 200

# tests/test_users.py
import pytest

pytestmark = pytest.mark.anyio


async def test_get_me(client, user_payload):
    reg = await client.post("/auth/register", json=user_payload)
    token = reg.json()["access_token"]
    resp = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
    assert resp.status_code == 200
    assert resp.json()["email"] == user_payload["email"]


async def test_list_users_forbidden(client, user_payload):
    reg = await client.post("/auth/register", json=user_payload)
    token = reg.json()["access_token"]
    resp = await client.get("/users/", headers={"Authorization": f"Bearer {token}"})
    assert resp.status_code == 403
9. Docker
Multi-stage build using uv for fast dependency resolution. The final image is under 150 MB:
# Dockerfile
FROM python:3.12-slim AS builder
RUN pip install uv
WORKDIR /app
COPY pyproject.toml .
RUN uv pip install --system --no-cache -r pyproject.toml
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY . .
EXPOSE 8000
CMD ["gunicorn", "app.main:app", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000", "-w", "4"]
# docker-compose.yml
services:
  app:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
volumes:
  pgdata:
Run docker compose up --build and the full stack starts - app, PostgreSQL, and Redis.
10. Performance
Four areas that matter most: eager loading, connection pooling, Redis caching, and async discipline.
# Eager loading - avoid N+1 queries
# (User.orders is a hypothetical relationship; the User model above does not define one)
from sqlalchemy.orm import selectinload

result = await db.execute(
    select(User).options(selectinload(User.orders)).limit(50)
)
users = result.scalars().unique().all()

# Redis caching with a simple helper
import json

from redis.asyncio import Redis

redis = Redis.from_url("redis://localhost:6379/0")


async def cached(key: str, ttl: int, fetch):
    """Return cached value or call fetch(), cache result, and return it."""
    hit = await redis.get(key)
    if hit is not None:
        return json.loads(hit)
    value = await fetch()
    await redis.setex(key, ttl, json.dumps(value, default=str))
    return value

# Usage in a route (on a router without a prefix; with prefix="/users" the path would be "/stats")
@router.get("/users/stats")
async def user_stats(db: AsyncSession = Depends(get_db)):
    async def fetch():
        result = await db.execute(text("SELECT count(*) FROM users"))
        return {"total": result.scalar()}

    return await cached("users:stats", 300, fetch)
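The cache helper above follows the cache-aside pattern: look in the cache, fall back to the source on a miss, then store the result with a TTL. The sketch below runs that flow without a Redis server; `FakeRedis` is a hypothetical in-memory stand-in that implements only `get` and `setex`, and `cached` takes the store as an explicit argument so the example is self-contained.

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple


class FakeRedis:
    """Hypothetical in-memory stand-in exposing only get/setex, for illustration."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[float, str]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None or entry[0] <= time.monotonic():
            self._data.pop(key, None)  # missing or expired
            return None
        return entry[1]

    async def setex(self, key: str, ttl: int, value: str) -> None:
        self._data[key] = (time.monotonic() + ttl, value)


async def cached(store: FakeRedis, key: str, ttl: int, fetch: Callable[[], Awaitable[Any]]) -> Any:
    """Return the cached value, or call fetch(), store its result, and return it."""
    hit = await store.get(key)
    if hit is not None:
        return json.loads(hit)
    value = await fetch()
    await store.setex(key, ttl, json.dumps(value, default=str))
    return value


async def demo() -> Tuple[Any, Any, int]:
    store = FakeRedis()
    calls = 0

    async def fetch() -> dict:
        nonlocal calls
        calls += 1  # counts how often the "database" is actually hit
        return {"total": 42}

    first = await cached(store, "users:stats", 300, fetch)
    second = await cached(store, "users:stats", 300, fetch)
    return first, second, calls


first, second, calls = asyncio.run(demo())
```

Because values pass through `json.dumps` and `json.loads`, anything that is not JSON-native (UUIDs, datetimes) comes back as a string on a cache hit, so the cached and uncached return types can differ.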
Async discipline: never call blocking functions (requests.get, time.sleep) inside async routes. Use httpx.AsyncClient for HTTP, aiofiles for disk I/O, and asyncio.to_thread() as a last resort for sync libraries.
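The effect of a blocking call on the event loop can be made visible with a small experiment. In this sketch, `blocking_io`, `heartbeat`, and `run_with` are hypothetical helpers: a heartbeat coroutine counts how often the loop gets to run while a 0.2 second `time.sleep` is executed either directly or through `asyncio.to_thread()`.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    """Stand-in for a synchronous library call."""
    time.sleep(seconds)
    return "done"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how many times the event loop schedules this task before stop is set."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def run_with(offload: bool) -> int:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # let the heartbeat run its first iteration
    if offload:
        await asyncio.to_thread(blocking_io, 0.2)  # loop stays free while the thread sleeps
    else:
        blocking_io(0.2)  # the whole event loop is stuck here
    stop.set()
    return await beat


blocked_ticks = asyncio.run(run_with(offload=False))
offloaded_ticks = asyncio.run(run_with(offload=True))
```

With the direct call the heartbeat only gets its initial tick, whereas with `to_thread` it keeps ticking throughout. In a real service those missed ticks are other requests waiting.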
11. Deployment
Gunicorn with Uvicorn workers for production. Alembic for database migrations:
# gunicorn.conf.py
import multiprocessing
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120
keepalive = 5
accesslog = "-"
errorlog = "-"
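One interaction worth checking before using this worker formula: each Gunicorn worker is a separate process that imports app/database.py and therefore builds its own connection pool. The arithmetic below is a rough sketch using the pool settings from the Database Layer section; the helper names are hypothetical, and the only outside figure assumed is PostgreSQL's default max_connections of 100.

```python
def worker_count(cpu_count: int) -> int:
    """Same formula as gunicorn.conf.py: two workers per core plus one."""
    return cpu_count * 2 + 1


def max_db_connections(cpu_count: int, pool_size: int = 20, max_overflow: int = 10) -> int:
    """Upper bound on connections if every worker fills its pool and its overflow."""
    return worker_count(cpu_count) * (pool_size + max_overflow)


def per_worker_budget(cpu_count: int, connection_budget: int) -> int:
    """How many connections (pool_size + max_overflow) each worker may use within a total budget."""
    return connection_budget // worker_count(cpu_count)


if __name__ == "__main__":
    print(max_db_connections(4))     # 9 workers * 30 connections = 270
    print(per_worker_budget(4, 90))  # 90 // 9 = 10
```

On a 4-core host the defaults could open up to 270 connections, well above a stock PostgreSQL limit, so it may be worth lowering pool_size and max_overflow, reducing the worker count, or placing a pooler such as PgBouncer in front of the database.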
# alembic/env.py (async version)
import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine

from app.config import get_settings
from app.models.base import Base

config = context.config
if config.config_file_name:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline():
    context.configure(url=get_settings().database_url, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online():
    engine = create_async_engine(get_settings().database_url)
    async with engine.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await engine.dispose()


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())
# Generate and run migrations
alembic init -t async alembic
alembic revision --autogenerate -m "create users table"
alembic upgrade head
12. Top 10 Production Mistakes
| # | Mistake | Fix |
|---|---|---|
| 1 | Blocking calls in async routes | Use async libraries or asyncio.to_thread() |
| 2 | No connection pooling | Set pool_size, max_overflow, pool_recycle |
| 3 | Storing secrets in code | Use pydantic-settings with .env files |
| 4 | No request validation | Pydantic schemas on every endpoint |
| 5 | Missing CORS config | Explicit allow_origins, never ["*"] in prod |
| 6 | No health check endpoint | Add /health that pings the database |
| 7 | Running Uvicorn directly in prod | Use Gunicorn with Uvicorn workers |
| 8 | N+1 queries | Use selectinload / joinedload |
| 9 | No rate limiting | Middleware or reverse proxy rate limits |
| 10 | Skipping Alembic migrations | Never use create_all() in production |
13. FastAPI vs Django REST vs Flask
Choosing the right framework depends on your team, timeline, and requirements:
| Feature | FastAPI | Django REST Framework | Flask |
|---|---|---|---|
| Async native | Yes (ASGI) | Partial (async views since Django 3.1; DRF views are sync) | Limited (async views since Flask 2.0, still WSGI) |
| Type hints / validation | Built-in (Pydantic) | Serializers (manual) | Extensions needed |
| Auto OpenAPI docs | Yes (Swagger + ReDoc) | Via drf-spectacular | Via flask-smorest |
| ORM | Any (SQLAlchemy typical) | Django ORM (built-in) | Any (SQLAlchemy typical) |
| Admin panel | No (use SQLAdmin) | Yes (built-in) | No (use Flask-Admin) |
| Auth system | DIY or fastapi-users | Built-in + DRF tokens | DIY or Flask-Login |
| Performance (req/s) | High (~15k) | Medium (~3k) | Medium (~4k) |
| Learning curve | Low (if you know Python types) | Medium (Django conventions) | Low (minimal API) |
| Best for | APIs, microservices, ML serving | Full-stack apps, rapid CRUD | Small APIs, prototypes |
| Maturity | 2018, growing fast | Django 2005, DRF 2011; battle-tested | 2010, stable |
FastAPI wins for greenfield API projects where performance and type safety matter. Django REST wins when you need an admin panel, ORM migrations, and auth out of the box. Flask is best for tiny services where you want full control with minimal overhead.