FastAPI Complete Course Module 8: Database Integration

Introduction

Welcome to Module 8 of the FastAPI Complete Course. In this chapter, we will tackle one of the most critical aspects of building real-world applications: FastAPI Database Integration. Most professional APIs need to store, retrieve, and manage data persistently. Without a database, your application is limited to in-memory data that disappears when the server restarts.

By the end of this module, you will be able to connect FastAPI to both SQLite (for development) and PostgreSQL (for production), define database models using SQLAlchemy, establish relationships between tables, and manage schema changes with Alembic migrations. This knowledge is essential for any job-oriented learner aiming to build scalable, data-driven APIs.

We will use SQLAlchemy as our Object Relational Mapper (ORM). SQLAlchemy is the most popular Python ORM and works seamlessly with FastAPI. We will also use Alembic for database migrations, which allows us to evolve our database schema over time without losing data.

Introduction to SQLAlchemy

SQLAlchemy is a powerful SQL toolkit and ORM for Python. It provides a full suite of well-known enterprise-level persistence patterns. For FastAPI developers, SQLAlchemy offers two main benefits:

  • ORM (Object Relational Mapper): Allows you to define database tables as Python classes. Each class instance represents a row in the table.
  • Core: Provides a lower-level SQL abstraction layer. We will focus on the ORM for this module.

The typical workflow with FastAPI and SQLAlchemy involves:

  1. Defining database models (Python classes).
  2. Creating a database session to interact with the database.
  3. Using dependency injection to provide database sessions to your route handlers.

Let’s start by installing the necessary packages.

pip install fastapi uvicorn sqlalchemy databases alembic psycopg2-binary

Here, sqlalchemy is the ORM, databases is an optional async database adapter (the examples in this module use synchronous SQLAlchemy sessions and do not depend on it), alembic handles migrations, and psycopg2-binary is the PostgreSQL driver.

SQLite Database

SQLite is a lightweight, file-based database perfect for development and testing. It requires no separate server setup. We will start with SQLite to understand the fundamentals before moving to PostgreSQL.

Setting Up SQLite with SQLAlchemy

Create a new file called database.py in your project root. This file will contain the database engine, session local, and base class.

from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker

# SQLite database URL (relative path)
SQLALCHEMY_DATABASE_URL = "sqlite:///./blog.db"

# Create engine for SQLite
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}  # Needed for SQLite
)

# Create session local class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create base class for models
Base = declarative_base()

Let’s break this down:

  • create_engine: Creates the database engine. The URL sqlite:///./blog.db creates a file named blog.db in your current directory.
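  • connect_args={"check_same_thread": False}: By default, a SQLite connection may only be used by the thread that created it. FastAPI runs regular def handlers in a thread pool, so a request may touch the connection from a different thread; this flag turns the check off. It is only needed for SQLite.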
  • sessionmaker: Creates a configured “Session” class. We will use this to create database sessions.
  • declarative_base(): Returns a base class from which all model classes will inherit.

Creating a Database Session Dependency

In FastAPI, we use dependency injection to get a database session for each request. Add this function to your database.py:

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

This function creates a new session, yields it to the route handler, and then closes it after the request is complete. The yield keyword makes it a generator, which FastAPI uses for dependency injection with cleanup.
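
To see the cleanup guarantee in isolation, the sketch below drives a get_db-style generator by hand. FakeSession, simulate_request, and failing_handler are hypothetical stand-ins (not FastAPI or SQLAlchemy APIs), and wrapping the generator with contextlib.contextmanager only approximates how a framework consumes a yield dependency.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session; it only tracks close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db      # the handler runs while the generator is paused here
    finally:
        db.close()    # runs whether the handler returned or raised


def simulate_request(handler):
    """Run handler with a session and return that session for inspection."""
    seen = []
    try:
        with contextmanager(get_db)() as db:
            seen.append(db)
            handler(db)
    except ValueError:
        pass  # the handler failed, but the finally block has already run
    return seen[0]


def failing_handler(db):
    raise ValueError("simulated handler error")


ok_session = simulate_request(lambda db: "ok")
failed_session = simulate_request(failing_handler)
print(ok_session.closed, failed_session.closed)
```

Both sessions end up closed, which is the point of putting db.close() in the finally block rather than after the yield.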

PostgreSQL Setup

For production applications, you will use PostgreSQL. It is a robust, feature-rich, open-source relational database. Setting it up with SQLAlchemy is similar to SQLite, but with a different URL.

Installing PostgreSQL

First, install PostgreSQL on your system:

  • Windows: Download from postgresql.org.
  • macOS: brew install postgresql
  • Linux (Ubuntu): sudo apt install postgresql postgresql-contrib

Start the PostgreSQL service and create a database:

# Start PostgreSQL (Linux/macOS)
sudo service postgresql start

# Access PostgreSQL shell
sudo -u postgres psql

# Create a database and user
CREATE DATABASE blogdb;
CREATE USER bloguser WITH PASSWORD 'password123';
GRANT ALL PRIVILEGES ON DATABASE blogdb TO bloguser;
\q

Configuring PostgreSQL in FastAPI

Update your database.py to support PostgreSQL. We will use environment variables for the database URL to keep it secure.

import os
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Use environment variable or fallback to SQLite for development
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./blog.db")

# If using PostgreSQL, the URL looks like:
# DATABASE_URL = "postgresql://bloguser:password123@localhost/blogdb"

# check_same_thread is a SQLite-only option, so pass it only for SQLite URLs
connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(DATABASE_URL, connect_args=connect_args)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

To switch between databases, set the DATABASE_URL environment variable. For development, leave it unset to fall back to SQLite. For production, set it to your PostgreSQL URL.

Database Models

Database models are Python classes that represent tables in your database. Each attribute of the class represents a column. Let’s create models for a simple blog application with users and posts.

Creating Models

Create a new file models.py:

from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.sql import func
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password = Column(String(255), nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    published = Column(Integer, default=0)  # 0 = draft, 1 = published
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    def __repr__(self):
        return f"<Post(id={self.id}, title='{self.title}')>"

Explanation:

  • __tablename__: The name of the table in the database.
  • Column: Defines a column with its type and constraints.
  • primary_key=True: Sets the column as the primary key.
  • index=True: Creates an index for faster queries.
  • unique=True: Ensures all values in this column are unique.
  • nullable=False: The column cannot be NULL.
  • server_default=func.now(): Sets the default value to the current timestamp at the database level.
  • ForeignKey("users.id"): Creates a foreign key relationship to the users table.
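  • onupdate=func.now(): Has SQLAlchemy set the timestamp whenever it issues an UPDATE for the row. This is not a database trigger, so updates made outside SQLAlchemy do not touch it, and the column stays NULL until the first update.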
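
The constraints above can be observed directly. The following is a minimal sketch, assuming an in-memory SQLite database and a cut-down User model (only three of the columns from models.py); it checks that server_default fills created_at and that unique=True rejects a duplicate username.

```python
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql import func

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())


# In-memory database so the sketch leaves no file behind
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as db:
    db.add(User(username="alice"))
    db.commit()

    # created_at was never set in Python; the database filled it in
    first = db.query(User).filter_by(username="alice").one()
    has_timestamp = first.created_at is not None

    # A second "alice" violates the unique constraint
    db.add(User(username="alice"))
    try:
        db.commit()
        duplicate_rejected = False
    except IntegrityError:
        db.rollback()
        duplicate_rejected = True

print(has_timestamp, duplicate_rejected)
```

In a route handler, catching IntegrityError and rolling back like this is one way to turn a constraint violation into a 4xx response instead of a server error.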

Creating Tables

To create the tables in the database, add this code to your main app.py or main.py:

from fastapi import FastAPI
from database import engine, Base
import models

# Create all tables
Base.metadata.create_all(bind=engine)

app = FastAPI()

@app.get("/")
def root():
    return {"message": "Database tables created!"}

Run the application with uvicorn main:app --reload. The tables will be created automatically. For SQLite, you will see a blog.db file appear.

Database Relationships

Relationships allow you to navigate between related tables. SQLAlchemy provides relationship() to define these connections. Let’s add relationships to our models.

Adding Relationships

Update your models.py to include relationships:

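from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base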

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password = Column(String(255), nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationship to posts
    posts = relationship("Post", back_populates="owner")

    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    published = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # Relationship to user
    owner = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(id={self.id}, title='{self.title}')>"

Now, when you query a user, you can access all their posts via user.posts. Similarly, from a post, you can access the author via post.owner.
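
As a quick check of both directions, here is a self-contained sketch that assumes an in-memory SQLite database and trimmed versions of the two models (only the columns needed for the relationship).

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    posts = relationship("Post", back_populates="owner")


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    owner = relationship("User", back_populates="posts")


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as db:
    alice = User(username="alice")
    # Appending to the relationship fills in owner_id when the session flushes
    alice.posts.append(Post(title="First post"))
    alice.posts.append(Post(title="Second post"))
    db.add(alice)
    db.commit()

    user = db.query(User).filter(User.username == "alice").one()
    titles = sorted(post.title for post in user.posts)   # one-to-many side
    owner_name = user.posts[0].owner.username            # many-to-one side

print(titles, owner_name)
```

Note that owner_id is never assigned by hand here; SQLAlchemy derives it from the relationship.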

Using Relationships in Routes

Here is an example of how to use relationships in a FastAPI route:

from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
from database import get_db
import models
from pydantic import BaseModel

# Pydantic schemas for request/response
class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    owner_id: int

    class Config:
        orm_mode = True

class UserWithPosts(BaseModel):
    id: int
    username: str
    email: str
    posts: list[PostResponse] = []

    class Config:
        orm_mode = True

@app.get("/users/{user_id}/posts", response_model=UserWithPosts)
def get_user_with_posts(user_id: int, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

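Notice that user.posts is populated automatically because of the relationship: SQLAlchemy loads the posts when the attribute is first read. The orm_mode = True setting (Pydantic v1; Pydantic v2 renames it to from_attributes) allows the schema to read data from ORM objects.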

Migrations with Alembic

As your application evolves, you will need to modify the database schema (add columns, change types, etc.). Alembic handles these changes safely through migrations. Let’s set it up.

Initializing Alembic

Run the following command in your project root:

alembic init alembic

This creates an alembic directory with configuration files. The main configuration file is alembic.ini.

Configuring Alembic

Open alembic.ini and set the database URL:

sqlalchemy.url = sqlite:///./blog.db

For PostgreSQL, change it to:

sqlalchemy.url = postgresql://bloguser:password123@localhost/blogdb

Next, update the env.py file inside the alembic directory to import your models:

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import sys
import os

# Add the parent directory to the path so Alembic can find your models
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from database import Base
import models  # Import your models here

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

Creating Your First Migration

Now, create a migration to generate the initial tables:

alembic revision --autogenerate -m "Initial migration"

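This compares your SQLAlchemy models with the current database state and generates a migration script in alembic/versions/. If Base.metadata.create_all() has already created the tables, Alembic finds no differences and the generated script is empty; to let Alembic own the schema, remove the create_all() call and start from a fresh database.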

Applying Migrations

To apply the migration to the database, run:

alembic upgrade head

This executes all pending migrations. If you ever need to roll back, use alembic downgrade -1.

Making Schema Changes

Let’s say you want to add a status column to the users table. Update your model:

class User(Base):
    __tablename__ = "users"
    # ... existing columns ...
    status = Column(String(20), default="active")  # New column

Then generate and apply a new migration:

alembic revision --autogenerate -m "Add status column to users"
alembic upgrade head

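Alembic generates an op.add_column() call, which runs as an ALTER TABLE statement when you upgrade. Review the generated script before applying it, since autogenerate does not detect every kind of change (for example, a renamed column shows up as a drop plus an add).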
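
One detail worth checking in such a migration: default="active" on the model is applied by SQLAlchemy at INSERT time and is not part of the table definition, so rows that already exist get NULL in the new column. The sketch below uses the standard-library sqlite3 module (not Alembic) to show the difference between adding a column with and without a database-level default; the status2 column is hypothetical and exists only for the comparison.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.execute("INSERT INTO users (username) VALUES ('alice')")

# Roughly what a migration emits when the model only has a Python-side default
conn.execute("ALTER TABLE users ADD COLUMN status VARCHAR(20)")
status_without_default = conn.execute("SELECT status FROM users").fetchone()[0]

# With a database-level default, existing rows read back the default value
conn.execute("ALTER TABLE users ADD COLUMN status2 VARCHAR(20) DEFAULT 'active'")
status_with_default = conn.execute("SELECT status2 FROM users").fetchone()[0]

print(status_without_default, status_with_default)
conn.close()
```

If existing rows should read "active", one option is to declare the column with server_default="active" in the model so the default becomes part of the generated DDL.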

Common Mistakes

Here are pitfalls beginners often encounter with FastAPI Database Integration:

  • Forgetting to import models before creating tables: If you don’t import your model files, Base.metadata.create_all() won’t know about them. Always import your models in main.py.
  • Not closing database sessions: Always use the get_db dependency or manually close sessions. Unclosed sessions can cause connection leaks.
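  • Using synchronous SQLAlchemy with async routes: A synchronous session call inside an async def handler blocks the event loop. Either declare the handler with plain def (FastAPI then runs it in a thread pool) or use an async stack such as SQLAlchemy's asyncio extension with asyncpg, or the databases package.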
  • Ignoring Alembic migrations for schema changes: Manually altering the database schema without Alembic leads to inconsistencies between your models and the database.
  • Hardcoding database URLs: Always use environment variables for database URLs, especially for passwords and production connections.

Practice Task

Now it’s your turn to apply what you’ve learned. Build a simple FastAPI application with the following requirements:

  1. Set up a SQLite database for development and PostgreSQL for production (use environment variables).
  2. Create two models: Category and Product.
    • Category: id (PK), name (unique), description (optional).
    • Product: id (PK), name, price (float), stock (integer), category_id (FK to Category), created_at.
  3. Add relationships: Category has many Products, Product belongs to one Category.
  4. Create a dependency for the database session.
  5. Initialize Alembic and create an initial migration.
  6. Create a route that returns all products with their category names.

Test your application by adding a few categories and products using the FastAPI interactive docs at /docs.

Summary

In this module, we covered the essentials of FastAPI Database Integration. You learned how to:

  • Set up SQLAlchemy with SQLite and PostgreSQL.
  • Create database models with columns, constraints, and relationships.
  • Use dependency injection to manage database sessions.
  • Define one-to-many relationships between models.
  • Use Alembic for database migrations to evolve your schema safely.

These skills are fundamental for building data-driven APIs. You can now connect FastAPI to a database, define your data structure, and handle schema changes professionally.

FAQs

1. What is the difference between SQLAlchemy ORM and Core?

SQLAlchemy ORM maps Python classes to database tables, allowing you to work with objects. Core provides a lower-level SQL abstraction where you write SQL-like expressions. ORM is easier for most applications; Core is useful for complex queries or when you need more control.

2. Can I use FastAPI with NoSQL databases like MongoDB?

Yes, but it requires different libraries. For MongoDB, you would use motor (async driver) or pymongo (sync). This module focuses on relational databases with SQLAlchemy.

3. Why do we need Alembic if SQLAlchemy can create tables automatically?

Base.metadata.create_all() only creates tables that do not exist yet. It never alters an existing table, so a change such as adding a column would require dropping and recreating the table. Alembic generates migration scripts that alter existing tables in place and preserve the data, which is critical in production.

4. How do I handle multiple database connections in FastAPI?

You can create multiple engines and session locals, then provide them through different dependencies. For example, one for the main database and another for a reporting database.
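
A minimal sketch of that arrangement, assuming two in-memory SQLite databases; the names main_engine, reporting_engine, get_main_db, and get_reporting_db are illustrative, and the generators are driven by hand instead of through Depends.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Each engine has its own connection pool, so these are two separate databases
main_engine = create_engine("sqlite:///:memory:")
reporting_engine = create_engine("sqlite:///:memory:")

MainSession = sessionmaker(bind=main_engine)
ReportingSession = sessionmaker(bind=reporting_engine)


def get_main_db():
    db = MainSession()
    try:
        yield db
    finally:
        db.close()


def get_reporting_db():
    db = ReportingSession()
    try:
        yield db
    finally:
        db.close()


LIST_TABLES = text("SELECT name FROM sqlite_master WHERE type = 'table'")

main_gen = get_main_db()
main_db = next(main_gen)
main_db.execute(text("CREATE TABLE events (id INTEGER PRIMARY KEY)"))
main_db.commit()

reporting_gen = get_reporting_db()
reporting_db = next(reporting_gen)

main_tables = [row[0] for row in main_db.execute(LIST_TABLES)]
reporting_tables = [row[0] for row in reporting_db.execute(LIST_TABLES)]

# Closing a generator runs its finally block, which closes the session
main_gen.close()
reporting_gen.close()

print(main_tables, reporting_tables)
```

In a route, each handler would declare whichever dependency it needs, for example db: Session = Depends(get_reporting_db).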

5. What is the best practice for database URL security?

Never hardcode database URLs. Use environment variables (e.g., os.getenv("DATABASE_URL")) and consider using a .env file with python-dotenv. For production, use secret management tools like Docker secrets or cloud provider key stores.

This concludes Module 8. You now have a solid foundation in database integration with FastAPI. In Module 9: Authentication and Authorization, we will build on this knowledge to secure your API with JWT tokens, OAuth2, and role-based access control. Get ready to make your APIs production-ready!

More Practical Examples

Let’s expand your understanding with some real-world database integration scenarios that go beyond basic CRUD operations. These examples will help you handle common challenges when building FastAPI applications with databases.

Handling Pagination and Filtering

When your database grows, you’ll need to return data in manageable chunks. Here’s a practical example of implementing pagination with filtering:

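from fastapi import FastAPI, Query, Depends
from sqlalchemy.orm import Session
from sqlalchemy import and_
from typing import Optional

from database import get_db
from models import User  # assumes User also defines age and is_active columns

app = FastAPI()

# Plain def: FastAPI runs the synchronous session in its thread pool
@app.get("/users/")
def get_users(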
    db: Session = Depends(get_db),
    skip: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(10, ge=1, le=100, description="Max records to return"),
    min_age: Optional[int] = Query(None, ge=0, description="Minimum age filter"),
    max_age: Optional[int] = Query(None, ge=0, description="Maximum age filter"),
    active_only: bool = Query(False, description="Filter only active users")
):
    """Get paginated users with optional filters."""
    query = db.query(User)
    
    # Apply filters dynamically
    filters = []
    if min_age is not None:
        filters.append(User.age >= min_age)
    if max_age is not None:
        filters.append(User.age <= max_age)
    if active_only:
        filters.append(User.is_active == True)
    
    if filters:
        query = query.filter(and_(*filters))
    
    total_count = query.count()
    users = query.offset(skip).limit(limit).all()
    
    return {
        "total": total_count,
        "skip": skip,
        "limit": limit,
        "users": users
    }

Explanation: This endpoint demonstrates several important patterns. The skip and limit parameters control pagination, while optional query parameters allow filtering. We use and_ from SQLAlchemy to combine multiple conditions dynamically. The response includes metadata about the total count, which is essential for frontend pagination controls. Notice how we validate input with ge (greater than or equal) and le (less than or equal) constraints.
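
The query logic can be tried without a running server. This is a sketch under stated assumptions: an in-memory SQLite database, a User model with an assumed age column, and a hypothetical helper get_page that mirrors the endpoint's filtering and slicing. It also adds an order_by, because without an explicit ORDER BY the database does not guarantee which rows land on which page.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), nullable=False)
    age = Column(Integer, nullable=False)  # assumed column, not in the earlier model


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as db:
    # 25 users aged 19 through 43
    db.add_all([User(username=f"user{i}", age=18 + i) for i in range(1, 26)])
    db.commit()


def get_page(db, skip, limit, min_age=None):
    """Return the total match count and the ids on the requested page."""
    query = db.query(User)
    if min_age is not None:
        query = query.filter(User.age >= min_age)
    total = query.count()  # counted before offset/limit are applied
    rows = query.order_by(User.id).offset(skip).limit(limit).all()
    return {"total": total, "ids": [user.id for user in rows]}


with SessionLocal() as db:
    second_page = get_page(db, skip=10, limit=10)
    filtered = get_page(db, skip=0, limit=5, min_age=40)

print(second_page, filtered["total"])
```

The total is computed on the filtered query before slicing, so a client can work out how many pages exist for the current filter.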

Bulk Operations and Batch Processing

Sometimes you need to insert or update many records efficiently. Here’s how to handle bulk operations:

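from fastapi import Depends, HTTPException
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing import List

# Assumes app, get_db, the User model, and a UserCreate Pydantic schema are defined elsewhere

@app.post("/users/bulk-create/")
def bulk_create_users(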
    users_data: List[UserCreate],
    db: Session = Depends(get_db)
):
    """Create multiple users in a single transaction."""
    try:
        # Create User objects from input data
        db_users = [User(**user.dict()) for user in users_data]
        
        # Add all users to session
        db.add_all(db_users)
        
        # Commit once for better performance
        db.commit()
        
        # Refresh to get auto-generated IDs
        for user in db_users:
            db.refresh(user)
        
        return {
            "message": f"Successfully created {len(db_users)} users",
            "users": db_users
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=400, detail=str(e))

@app.put("/users/bulk-deactivate/")
def bulk_deactivate_users(
    user_ids: List[int],
    db: Session = Depends(get_db)
):
    """Deactivate multiple users at once."""
    try:
        stmt = (
            update(User)
            .where(User.id.in_(user_ids))
            .values(is_active=False)
        )
        result = db.execute(stmt)
        db.commit()
        
        return {
            "message": f"Deactivated {result.rowcount} users",
            "affected_ids": user_ids
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=400, detail=str(e))

Explanation: Bulk operations are crucial for performance. The first endpoint uses db.add_all() to stage many records and write them in a single transaction with one commit. We wrap everything in a try-except block and roll back on failure so that either all records are saved or none are. The second endpoint demonstrates bulk updates using SQLAlchemy's update() statement with .in_() for matching multiple IDs. The rowcount attribute tells us how many records were actually affected, which can be fewer than the number of IDs submitted.
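
The bulk update can be exercised on its own. The sketch below assumes an in-memory SQLite database and a User model with an assumed is_active column; the ID 999 is deliberately nonexistent. It passes synchronize_session=False because the session holds no loaded objects that need to be kept in sync here.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, update
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)  # assumed column


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as db:
    db.add_all([User(username=name) for name in ("alice", "bob", "carol")])
    db.commit()

    targets = db.query(User).filter(User.username.in_(["alice", "bob"])).all()
    requested_ids = [user.id for user in targets] + [999]  # 999 matches no row

    stmt = (
        update(User)
        .where(User.id.in_(requested_ids))
        .values(is_active=False)
        .execution_options(synchronize_session=False)
    )
    result = db.execute(stmt)
    db.commit()

    affected = result.rowcount
    still_active = [
        user.username for user in db.query(User).filter(User.is_active.is_(True))
    ]

print(len(requested_ids), affected, still_active)
```

Three IDs were requested but only two rows changed, so an endpoint that echoes back the submitted IDs as "affected" can overstate what happened.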

Soft Deletes and Query Filters

Instead of permanently deleting records, many applications use soft deletes. Here’s a practical implementation:

from datetime import datetime, timezone
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.orm import Session

# Assumes app, Base, get_db, Depends, and HTTPException are imported as in the earlier examples

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    is_deleted = Column(Boolean, default=False)
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    
    @classmethod
    def active_query(cls, db: Session):
        """Return a query that excludes soft-deleted records."""
        return db.query(cls).filter(cls.is_deleted == False)

@app.delete("/users/{user_id}/soft-delete/")
def soft_delete_user(
    user_id: int,
    db: Session = Depends(get_db)
):
    """Soft delete a user by marking them as deleted."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    user.is_deleted = True
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    user.deleted_at = datetime.now(timezone.utc)
    db.commit()
    
    return {"message": "User soft deleted"}

@app.get("/users/active/")
def get_active_users(
    db: Session = Depends(get_db)
):
    """Get only non-deleted users."""
    users = User.active_query(db).all()
    return {"users": users}

Explanation: Soft deletes preserve data integrity by marking records as deleted rather than removing them. The is_deleted boolean flag and deleted_at timestamp track deletion status. We provide a class method active_query() that automatically filters out deleted records. This pattern is especially useful for audit trails and data recovery scenarios.

Class-Based Example

While FastAPI works well with plain functions, a service class can keep related database operations in one place and leave route handlers thin. Here's an example of a generic database service class:

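from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Generic, List, Optional, Type, TypeVar
from pydantic import BaseModel

# Assumes app, get_db, the User model, and UserCreate/UserUpdate schemas are defined elsewhere

# Generic type for database models
ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)

class BaseService(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    """Generic base service for database operations."""
    
    # model is the mapped class itself (e.g. User), not an instance
    def __init__(self, model: Type[ModelType], db: Session):
        self.model = model
        self.db = db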
    
    def get(self, id: int) -> Optional[ModelType]:
        """Get a single record by ID."""
        return self.db.query(self.model).filter(self.model.id == id).first()
    
    def get_multi(
        self,
        skip: int = 0,
        limit: int = 100,
        **filters
    ) -> List[ModelType]:
        """Get multiple records with optional filters."""
        query = self.db.query(self.model)
        
        # Apply dynamic filters
        for field, value in filters.items():
            if hasattr(self.model, field):
                query = query.filter(getattr(self.model, field) == value)
        
        return query.offset(skip).limit(limit).all()
    
    def create(self, obj_in: CreateSchemaType) -> ModelType:
        """Create a new record."""
        obj_data = obj_in.dict()
        db_obj = self.model(**obj_data)
        self.db.add(db_obj)
        self.db.commit()
        self.db.refresh(db_obj)
        return db_obj
    
    def update(
        self,
        id: int,
        obj_in: UpdateSchemaType
    ) -> Optional[ModelType]:
        """Update an existing record."""
        db_obj = self.get(id)
        if not db_obj:
            return None
        
        obj_data = obj_in.dict(exclude_unset=True)
        for field, value in obj_data.items():
            setattr(db_obj, field, value)
        
        self.db.commit()
        self.db.refresh(db_obj)
        return db_obj
    
    def remove(self, id: int) -> bool:
        """Delete a record by ID."""
        db_obj = self.get(id)
        if not db_obj:
            return False
        
        self.db.delete(db_obj)
        self.db.commit()
        return True

# Concrete implementation for User model
class UserService(BaseService[User, UserCreate, UserUpdate]):
    """Service class for User operations."""
    
    def get_by_email(self, email: str) -> Optional[User]:
        """Get user by email."""
        return self.db.query(User).filter(User.email == email).first()
    
    def get_active_users(self) -> List[User]:
        """Get all active users."""
        return self.db.query(User).filter(
            User.is_active == True,
            User.is_deleted == False
        ).all()

# FastAPI dependency for getting service instance
def get_user_service(db: Session = Depends(get_db)):
    return UserService(User, db)

@app.get("/users/{user_id}")
def read_user(
    user_id: int,
    service: UserService = Depends(get_user_service)
):
    user = service.get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users/")
def create_user(
    user_in: UserCreate,
    service: UserService = Depends(get_user_service)
):
    # Check for duplicate email
    existing = service.get_by_email(user_in.email)
    if existing:
        raise HTTPException(
            status_code=400,
            detail="Email already registered"
        )
    return service.create(user_in)

Explanation: This class-based approach uses Python generics to create a reusable BaseService that works with any model. The TypeVar and Generic imports allow type hints for different models and schemas. The service class encapsulates all database operations, making your route handlers cleaner. Notice how we use exclude_unset=True in the update method to only update fields that were actually provided. The get_user_service dependency creates a service instance with the database session, following FastAPI’s dependency injection pattern.

Step-by-Step Exercise

Now it’s time to practice what you’ve learned. Follow these steps to build a complete database integration feature:

Exercise: Building a Blog Post System with Tags

Objective: Create a system where blog posts can have multiple tags, with proper database relationships and API endpoints.

Step 1: Create the Database Models

# models.py
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Table
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base

# Association table for many-to-many relationship
# The composite primary key prevents attaching the same tag to a post twice
post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # The User model must declare posts = relationship("Post", back_populates="author") to match
    author = relationship("User", back_populates="posts")
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")

class Tag(Base):
    __tablename__ = "tags"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    
    posts = relationship("Post", secondary=post_tags, back_populates="tags")

Step 2: Create Pydantic Schemas

# schemas.py
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class TagBase(BaseModel):
    name: str

class TagCreate(TagBase):
    pass

class Tag(TagBase):
    id: int
    
    class Config:
        from_attributes = True

class PostBase(BaseModel):
    title: str
    content: str

class PostCreate(PostBase):
    tag_ids: List[int] = []

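class PostUpdate(BaseModel):
    # Every field is optional so a client can send only what it wants to change
    title: Optional[str] = None
    content: Optional[str] = None
    tag_ids: Optional[List[int]] = None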

class Post(PostBase):
    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime
    tags: List[Tag] = []
    
    class Config:
        from_attributes = True

Step 3: Implement the API Endpoints

# main.py
from typing import List, Optional

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

import schemas  # Pydantic schemas; kept as a module to avoid clashing with the ORM names
from database import get_db
from models import Post, Tag  # SQLAlchemy models

app = FastAPI()

@app.post("/posts/", response_model=schemas.Post)
def create_post(
    post: schemas.PostCreate,
    db: Session = Depends(get_db)
):
    """Create a new post with optional tags."""
    # Create the post
    db_post = Post(
        title=post.title,
        content=post.content,
        author_id=1  # In real app, get from auth
    )
    
    # Add tags if provided
    if post.tag_ids:
        tags = db.query(Tag).filter(Tag.id.in_(post.tag_ids)).all()
        # Compare against the unique IDs so a repeated ID is not reported as missing
        if len(tags) != len(set(post.tag_ids)):
            raise HTTPException(status_code=404, detail="Some tags not found")
        db_post.tags = tags
    
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post

@app.get("/posts/", response_model=List[schemas.Post])
def list_posts(
    skip: int = 0,
    limit: int = 10,
    tag: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List posts with optional tag filter."""
    query = db.query(Post)
    
    if tag:
        query = query.join(Post.tags).filter(Tag.name == tag)
    
    # Order explicitly so pages are stable between requests
    return query.order_by(Post.id).offset(skip).limit(limit).all()

@app.get("/posts/{post_id}", response_model=schemas.Post)
def get_post(post_id: int, db: Session = Depends(get_db)):
    """Get a single post by ID."""
    post = db.query(Post).filter(Post.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

@app.put("/posts/{post_id}", response_model=schemas.Post)
def update_post(
    post_id: int,
    post_update: schemas.PostUpdate,
    db: Session = Depends(get_db)
):
    """Update a post and its tags."""
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if not db_post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    # Update basic fields
    if post_update.title is not None:
        db_post.title = post_update.title
    if post_update.content is not None:
        db_post.content = post_update.content
    
    # Update tags if provided
    if post_update.tag_ids is not None:
        tags = db.query(Tag).filter(Tag.id.in_(post_update.tag_ids)).all()
        if len(tags) != len(set(post_update.tag_ids)):
            raise HTTPException(status_code=404, detail="Some tags not found")
        db_post.tags = tags
    
    db.commit()
    db.refresh(db_post)
    return db_post

@app.delete("/posts/{post_id}")
def delete_post(post_id: int, db: Session = Depends(get_db)):
    """Delete a post."""
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if not db_post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    db.delete(db_post)
    db.commit()
    return {"message": "Post deleted successfully"}

Step 4: Test Your Implementation

# Create some tags first
curl -X POST "http://localhost:8000/tags/" -H "Content-Type: application/json" -d '{"name": "python"}'
curl -X POST "http://localhost:8000/tags/" -H "Content-Type: application/json" -d '{"name": "fastapi"}'
curl -X POST "http://localhost:8000/tags/" -H "Content-Type: application/json" -d '{"name": "database"}'

# Create a post with tags
curl -X POST "http://localhost:8000/posts/" \
  -H "Content-Type: application/json" \
  -d '{"title": "Getting Started with FastAPI", "content": "FastAPI is awesome...", "tag_ids": [1, 2]}'

# List posts filtered by tag
curl "http://localhost:8000/posts/?tag=python"

# Update post tags
curl -X PUT "http://localhost:8000/posts/1" \
  -H "Content-Type: application/json" \
  -d '{"tag_ids": [1, 2, 3]}'

Step 5: Verify the Results

Check that your API returns the correct data with proper relationships. The post should include its tags in the response, and filtering by tag should work correctly. This exercise demonstrates many-to-many relationships, pagination, filtering, and complete CRUD operations in a real-world context.

Interview and Job Use Cases

Understanding database integration with FastAPI is crucial for technical interviews and real-world development. Here are common scenarios you’ll encounter:

Performance Optimization Questions

Question: “How would you optimize a slow API endpoint that queries a large database table?”

Answer: Start by adding database indexes on frequently queried columns. Use SQLAlchemy’s selectinload or joinedload to avoid N+1 query problems. For large datasets, prefer cursor-based (keyset) pagination over large OFFSET values, because the database still has to scan and discard every skipped row. Consider adding Redis caching for frequently accessed data. Example optimization:

from sqlalchemy.orm import selectinload

# Optimized query with eager loading
@app.get("/posts-with-comments/")
async def get_posts_with_comments(db: Session = Depends(get_db)):
    posts = db.query(Post).options(
        selectinload(Post.comments),
        selectinload(Post.tags)
    ).all()
    return posts
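
The cursor-based pagination mentioned in the answer can be sketched as follows. This is a minimal illustration that uses the standard-library sqlite3 module instead of SQLAlchemy so it runs without extra dependencies; the posts table, the fetch_page helper, and its parameters are assumptions made for the example, not part of the course code.

```python
import sqlite3


def fetch_page(conn, after_id=0, limit=3):
    """Return up to `limit` posts with id greater than `after_id`, plus the next cursor."""
    rows = conn.execute(
        "SELECT id, title FROM posts WHERE id > ? ORDER BY id LIMIT ?",
        (after_id, limit),
    ).fetchall()
    # A full page suggests there may be more rows; a short page means we reached the end.
    next_cursor = rows[-1][0] if len(rows) == limit else None
    return rows, next_cursor


# Hypothetical table with seven rows, used only for this demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO posts (title) VALUES (?)",
    [(f"Post {i}",) for i in range(1, 8)],
)
conn.commit()

first_page, cursor = fetch_page(conn)
second_page, cursor2 = fetch_page(conn, after_id=cursor)
```

Because the query filters on the indexed primary key rather than skipping rows with OFFSET, each page costs roughly the same no matter how deep the client has paged. The trade-off is that clients can only move forward from a known cursor instead of jumping to an arbitrary page number.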

Transaction Management

Question: “How do you handle multiple database operations that must all succeed or fail together?”

Answer: Use database transactions with proper error handling. Here’s a real-world example of creating an order with multiple items:

from contextlib import contextmanager

@contextmanager
def transaction(db: Session):
    """Context manager for database transactions."""
    try:
        yield
        db.commit()
    except Exception:
        db.rollback()
        raise

@app.post("/orders/")
async def create_order(
    order_data: OrderCreate,
    db: Session = Depends(get_db)
):
    """Create an order with multiple items in a transaction."""
    with transaction(db):
        # Create the order
        db_order = Order(
            user_id=order_data.user_id,
            total_amount=0
        )
        db.add(db_order)
        db.flush()  # Get order ID without committing
        
        total = 0
        for item in order_data.items:
            # Check inventory
            product = db.query(Product).with_for_update().filter(
                Product.id == item.product_id
            ).first()
            
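            # Handle a missing product separately so we never read .name on None
            if not product:
                raise HTTPException(
                    status_code=404,
                    detail=f"Product {item.product_id} not found"
                )
            if product.stock < item.quantity:
                raise HTTPException(
                    status_code=400,
                    detail=f"Insufficient stock for {product.name}"
                )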
            
            # Create order item
            db_item = OrderItem(
                order_id=db_order.id,
                product_id=item.product_id,
                quantity=item.quantity,
                price=product.price
            )
            db.add(db_item)
            
            # Update inventory
            product.stock -= item.quantity
            total += product.price * item.quantity
        
        db_order.total_amount = total
    
    db.refresh(db_order)
    return db_order
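
To see the all-or-nothing behavior in isolation, here is a small sketch using only the standard-library sqlite3 module. The products and order_items tables and the place_order helper are hypothetical stand-ins for the models above; the point is only that a failure on the second item also undoes the stock change already made for the first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER NOT NULL)")
conn.execute("CREATE TABLE order_items (product_id INTEGER NOT NULL, quantity INTEGER NOT NULL)")
conn.executemany("INSERT INTO products (id, stock) VALUES (?, ?)", [(1, 5), (2, 1)])
conn.commit()


def place_order(conn, items):
    """Reserve stock for every (product_id, quantity) pair, or for none of them."""
    try:
        with conn:  # commits if the block succeeds, rolls back if it raises
            for product_id, quantity in items:
                row = conn.execute(
                    "SELECT stock FROM products WHERE id = ?", (product_id,)
                ).fetchone()
                if row is None or row[0] < quantity:
                    raise ValueError(f"Insufficient stock for product {product_id}")
                conn.execute(
                    "UPDATE products SET stock = stock - ? WHERE id = ?",
                    (quantity, product_id),
                )
                conn.execute(
                    "INSERT INTO order_items (product_id, quantity) VALUES (?, ?)",
                    (product_id, quantity),
                )
        return True
    except ValueError:
        return False


# Product 2 has only one unit in stock, so the whole order should be rejected.
ok = place_order(conn, [(1, 2), (2, 3)])
stock = dict(conn.execute("SELECT id, stock FROM products"))
```

After the failed call, product 1 still has its original stock and no order items exist, which is the same guarantee the transaction context manager gives the FastAPI endpoint.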

Real-World Job Scenario

Scenario: “You’re building a SaaS platform where users can create projects and invite team members. Design the database schema and API endpoints.”

Solution: This requires careful consideration of relationships and permissions:

class Project(Base):
    __tablename__ = "projects"
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    owner_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=datetime.utcnow)
    
    owner = relationship("User", back_populates="owned_projects")
    members = relationship("ProjectMember", back_populates="project")

class ProjectMember(Base):
    __tablename__ = "project_members"
    
    id = Column(Integer, primary_key=True)
    project_id = Column(Integer, ForeignKey("projects.id"))
    user_id = Column(Integer, ForeignKey("users.id"))
    role = Column(String(20), default="member")  # owner, admin, member, viewer
    joined_at = Column(DateTime, default=datetime.utcnow)
    
    project = relationship("Project", back_populates="members")
    user = relationship("User")

@app.post("/projects/{project_id}/invite/")
async def invite_member(
    project_id: int,
    invite: InviteCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Invite a user to a project."""
    # Check permissions
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    
    if project.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Only project owner can invite")
    
    # Check if user exists
    invited_user = db.query(User).filter(User.email == invite.email).first()
    if not invited_user:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Check if already a member
    existing = db.query(ProjectMember).filter(
        ProjectMember.project_id == project_id,
        ProjectMember.user_id == invited_user.id
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="User already a member")
    
    # Add member
    member = ProjectMember(
        project_id=project_id,
        user_id=invited_user.id,
        role=invite.role
    )
    db.add(member)
    db.commit()
    
    return {"message": f"Invited {invite.email} to project"}
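
The endpoint above only lets the project owner invite people. If you also want admins to invite, one possible approach is to rank the roles listed in the ProjectMember.role comment and compare them. The sketch below is an assumption about how that could look: ROLE_RANK, has_at_least, and can_invite are hypothetical names, and the ordering of roles is a design choice rather than something the schema enforces.

```python
from typing import Optional

# Hypothetical ranking of the roles named in the ProjectMember.role comment.
ROLE_RANK = {"viewer": 0, "member": 1, "admin": 2, "owner": 3}


def has_at_least(user_role: Optional[str], required: str) -> bool:
    """Return True if user_role ranks at or above the required role."""
    if user_role not in ROLE_RANK:
        # Unknown roles and non-members (None) get no permissions.
        return False
    return ROLE_RANK[user_role] >= ROLE_RANK[required]


def can_invite(user_role: Optional[str]) -> bool:
    """Assumed policy for this sketch: admins and owners may invite new members."""
    return has_at_least(user_role, "admin")
```

In the endpoint, you would look up the current user's ProjectMember row, pass its role to can_invite, and return a 403 when the result is False.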

Extra Beginner FAQs

1. What’s the difference between SQLite and PostgreSQL for development?

SQLite is well suited to development and testing because it requires no server setup and stores data in a single file. However, it allows only one writer at a time and lacks some PostgreSQL features, such as array column types and advanced index types (GIN, GiST). For production workloads with many concurrent users, PostgreSQL is the safer default. You can switch between them by changing the database URL in your configuration, provided your models avoid database-specific features.

2. Why do I get “database is locked” errors with SQLite?

SQLite has limitations with concurrent writes. If multiple requests try to write simultaneously, you’ll see this error. Solutions include: using SQLite only for development, implementing retry logic, or using a queue system. For production, switch to PostgreSQL which handles concurrent writes natively.

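# Retry logic for SQLite
from sqlalchemy.exc import OperationalError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(OperationalError),  # e.g. "database is locked"
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,  # surface the original error after the last attempt
)
def safe_db_operation(db: Session):
    try:
        # Your database operation
        db.commit()
    except Exception:
        # Roll back so the session is usable for the next attempt
        db.rollback()
        raise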
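
Before reaching for retries, it is often worth letting SQLite itself wait for a lock and enabling write-ahead logging, which lets readers continue while a write is in progress (there is still only one writer at a time). The sketch below uses the standard-library sqlite3 module; connect_for_dev and the temporary file path are hypothetical names chosen for the example.

```python
import os
import sqlite3
import tempfile


def connect_for_dev(path, busy_timeout_seconds=15.0):
    """Open a SQLite connection that waits on locks and uses WAL journaling."""
    # `timeout` is how long the connection waits for a lock before raising
    # "database is locked".
    conn = sqlite3.connect(path, timeout=busy_timeout_seconds)
    # The PRAGMA returns the journal mode that is actually in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    return conn, mode


# WAL needs a real file; an in-memory database would stay in "memory" mode.
db_path = os.path.join(tempfile.mkdtemp(), "dev.db")
conn, journal_mode = connect_for_dev(db_path)
```

With SQLAlchemy, a timeout can typically be passed through connect_args in create_engine, since those arguments are forwarded to the underlying driver's connect call.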

3. How do I handle database migrations safely in production?

Always test migrations on a staging environment first. Use Alembic’s --sql option to generate SQL scripts for review before applying. Create backups before running migrations. Use the downgrade command to revert if something goes wrong. Consider using blue-green deployment where you can switch between database versions.

# Generate SQL for review
alembic upgrade head --sql > migration_review.sql

# Apply migration
alembic upgrade head

# If something goes wrong, downgrade
alembic downgrade -1

4. What’s the best way to handle database connections in FastAPI?

Use FastAPI’s dependency injection system with a session factory. Create one session per request by calling SessionLocal(), and close it when the request finishes. Never share sessions across requests. Here’s the recommended pattern:

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}  # Only for SQLite
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
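
If the yield inside get_db looks mysterious, the following sketch roughly mimics what happens around a request. It is not how FastAPI is implemented internally; FakeSession and run_handler are hypothetical stand-ins used only to show that the finally block runs whether the handler succeeds or raises.

```python
created = []  # records every session so we can inspect it afterwards


class FakeSession:
    """Stand-in for a SQLAlchemy session that only tracks whether it was closed."""

    def __init__(self):
        self.closed = False
        created.append(self)

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


def run_handler(handler):
    """Simplified imitation of a framework calling a route with a yielded dependency."""
    gen = get_db()
    db = next(gen)  # runs get_db up to the yield and hands over the session
    try:
        return handler(db)
    finally:
        gen.close()  # resumes get_db so its finally block closes the session


# The session is open while the handler runs and closed once it returns.
open_during_request = not run_handler(lambda db: db.closed)
```

The same cleanup happens when the handler raises, which is why this pattern does not leak connections on errors.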

5. How do I prevent SQL injection attacks?

SQLAlchemy automatically parameterizes queries when you use its ORM methods, preventing SQL injection. Never use raw SQL with string formatting. If you must use raw SQL, always use parameterized queries:

from sqlalchemy import text

# Safe - SQLAlchemy ORM
users = db.query(User).filter(User.email == email).all()

# Also safe - parameterized raw SQL
result = db.execute(
    text("SELECT * FROM users WHERE email = :email"),
    {"email": email}
)

# DANGEROUS - never do this
result = db.execute(f"SELECT * FROM users WHERE email = '{email}'")
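
To make the risk concrete, here is a small demonstration using the standard-library sqlite3 module rather than SQLAlchemy. The users table and the malicious input are made up for the example; the same principle applies to any driver that supports bound parameters.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("alice@example.com",), ("bob@example.com",)],
)

# Attacker-controlled input that tries to turn the WHERE clause into "always true".
malicious = "nobody@example.com' OR '1'='1"

# String formatting splices the input into the SQL text, so the OR clause is executed.
unsafe_rows = conn.execute(
    f"SELECT email FROM users WHERE email = '{malicious}'"
).fetchall()

# A bound parameter is treated purely as a value, so no row matches.
safe_rows = conn.execute(
    "SELECT email FROM users WHERE email = ?", (malicious,)
).fetchall()
```

The formatted query returns every user, while the parameterized query returns nothing, because the whole string is compared against the email column as data.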

6. Why are my database changes not saving?

Common reasons include forgetting to call db.commit(), an exception that triggers a rollback before the commit, or session scope issues such as committing on a different session than the one that holds the changes. Remember that db.add() only stages the change; you must call db.commit() to persist it. After the commit, call db.refresh(obj) to reload auto-generated values such as IDs.

7. How do I handle database errors gracefully?

Use FastAPI’s exception handlers to catch database errors and return user-friendly messages. Here’s a complete error handling setup:

from fastapi import Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import SQLAlchemyError, IntegrityError

@app.exception_handler(SQLAlchemyError)
async def database_exception_handler(request: Request, exc: SQLAlchemyError):
    return JSONResponse(
        status_code=500,
        content={
            "detail": "A database error occurred",
            "error_type": type(exc).__name__
        }
    )

@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
    return JSONResponse(
        status_code=400,
        content={
            "detail": "This operation violates database constraints. "
                     "Check for duplicate entries or missing required fields."
        }
    )

Remember, database integration is a foundational skill for FastAPI development. Practice these examples, understand the patterns, and you’ll be well-prepared for both interviews and real-world projects. Start with SQLite for learning, then graduate to PostgreSQL for production applications.
