
FastAPI Complete Course Module 9: Authentication & Authorization

Introduction

Welcome to Module 9 of the FastAPI Complete Course. In this chapter, we will dive deep into FastAPI Authentication & Authorization. Security is a critical aspect of any web application. Without proper authentication, your API is vulnerable to unauthorized access. By the end of this tutorial, you will be able to implement a complete authentication system, including user registration, login, JWT-based protected routes, and role-based access control (RBAC).

We will build everything step-by-step, from defining Pydantic schemas to creating secure endpoints. This module is designed for beginners aiming for job-ready skills, so we will explain every line of code.

User Registration API

The first step in authentication is allowing users to register. We need a way to store user information securely. Let’s start by defining our data models and schemas.

Defining the User Model (SQLAlchemy)

We will use SQLAlchemy as our ORM. Create a file models.py:

from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import declarative_base  # lives in sqlalchemy.orm since SQLAlchemy 1.4

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    role = Column(String, default="user")  # "user" or "admin"

Explanation:

  • id: Primary key for the user.
  • username and email: Unique identifiers.
  • hashed_password: We never store plain text passwords.
  • is_active: Allows us to deactivate accounts.
  • role: For role-based access control later.

Pydantic Schemas (schemas.py)

Create a file schemas.py for request and response models:

from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

class UserOut(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool
    role: str

    class Config:
        orm_mode = True  # Pydantic v1 name; in Pydantic v2 this setting is called from_attributes

Explanation:

  • UserCreate: Used for registration. Contains password (plain text).
  • UserOut: Response model. Never returns the password.
  • orm_mode = True: Allows conversion from SQLAlchemy models.

Password Hashing

Storing passwords in plain text is a major security risk. We will use passlib with bcrypt for hashing.

Installing passlib

pip install "passlib[bcrypt]"

Creating a Utility Module (utils.py)

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Explanation:

  • CryptContext: Manages hashing algorithms. We specify bcrypt.
  • hash_password: Converts plain text to a secure hash.
  • verify_password: Compares a plain text password with a stored hash.
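
To make the hash/verify round trip concrete, here is a minimal standard-library sketch. It deliberately uses PBKDF2-HMAC-SHA256 from hashlib instead of bcrypt so that it runs without extra packages; the function names and the iteration count are illustrative assumptions, not part of the course code. In the application itself, keep using pwd_context.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor for the demo only


def hash_password_demo(password: str) -> str:
    """Return 'salt_hex$digest_hex', using a fresh random salt each time."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password_demo(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


# The same password hashes to different strings because the salt differs,
# yet both stored values verify against the original password.
first = hash_password_demo("secret123")
second = hash_password_demo("secret123")
```

The random salt is why two users with the same password do not end up with the same stored value; bcrypt applies the same idea internally.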

JWT Authentication

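JSON Web Tokens (JWT) are a compact, signed format for passing claims between parties. The signature lets the server detect tampering, but the payload is only Base64URL-encoded, not encrypted, so never put secrets in it. We will use python-jose for JWT creation and verification.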

Installing python-jose

pip install "python-jose[cryptography]"

JWT Configuration (config.py)

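# Pydantic v1 import. On Pydantic v2, install pydantic-settings and use:
#   from pydantic_settings import BaseSettings
from pydantic import BaseSettings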

class Settings(BaseSettings):
    SECRET_KEY: str = "your-secret-key-here"  # Change in production!
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    class Config:
        env_file = ".env"

settings = Settings()

Explanation:

  • SECRET_KEY: Used to sign tokens. Keep it secret!
  • ALGORITHM: We use HS256 (HMAC with SHA-256).
  • ACCESS_TOKEN_EXPIRE_MINUTES: Token expiry time for security.
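
One way to produce a real value for SECRET_KEY is sketched below, using only the standard library. The helper names and the DEMO_SECRET_KEY variable are hypothetical and exist just for this illustration; in the course project the generated value would go into .env as SECRET_KEY, where Settings picks it up instead of the placeholder default.

```python
import os
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random hex string suitable as an HS256 signing key."""
    return secrets.token_hex(num_bytes)


def load_secret_key(env_var: str = "SECRET_KEY") -> str:
    """Read the key from the environment and fail loudly if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set")
    return key


key = generate_secret_key()
os.environ["DEMO_SECRET_KEY"] = key  # stand-in for a line in your .env file
loaded = load_secret_key("DEMO_SECRET_KEY")
```

Failing at startup when the key is missing is usually safer than silently falling back to a hard-coded default.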

Token Creation and Verification (auth.py)

from datetime import datetime, timedelta
from jose import JWTError, jwt
from .config import settings

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt

def verify_token(token: str):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        return None

Explanation:

  • create_access_token: Encodes a dictionary (e.g., {“sub”: user_id}) into a JWT.
  • verify_token: Decodes and validates the token. Returns None if invalid.
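
To see roughly what HS256 signing involves, the following standard-library sketch builds and checks a token by hand. It is not how python-jose is implemented and it skips expiry handling entirely; the helper names are assumptions made for this illustration. Use jwt.encode and jwt.decode in real code.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64URL-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256)
    return signing_input + "." + b64url(signature.digest())


def verify_hs256(token: str, secret: str) -> Optional[dict]:
    """Return the payload if the signature matches, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        return None
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(b64url(expected), signature_b64):
        return None
    return json.loads(b64url_decode(payload_b64))


token = sign_hs256({"sub": "1", "role": "user"}, "demo-secret")

# Anyone holding the token can read the payload without knowing the secret.
readable = json.loads(b64url_decode(token.split(".")[1]))

# Swapping in a forged payload breaks the signature check.
head, _, sig = token.split(".")
forged = b64url(json.dumps({"sub": "1", "role": "admin"}).encode())
tampered = ".".join([head, forged, sig])
```

The sketch shows both halves of the trade-off: the payload is readable by anyone, but it cannot be changed without the secret.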

Login API

Now we create an endpoint to authenticate users and return a JWT.

Database Dependency (database.py)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"  # Use PostgreSQL in production
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Login Endpoint (main.py)

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from . import models, schemas, utils, auth, database

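app = FastAPI()
# Create the tables if they do not exist yet (fine for a tutorial; use migrations in production)
models.Base.metadata.create_all(bind=database.engine)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")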

@app.post("/register", response_model=schemas.UserOut)
def register(user: schemas.UserCreate, db: Session = Depends(database.get_db)):
    # Check if username or email already exists
    existing_user = db.query(models.User).filter(
        (models.User.username == user.username) | (models.User.email == user.email)
    ).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Username or email already registered")
    
    # Hash the password
    hashed_pw = utils.hash_password(user.password)
    
    # Create new user
    new_user = models.User(
        username=user.username,
        email=user.email,
        hashed_password=hashed_pw
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user

@app.post("/login")
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(database.get_db)):
    # Find user by username
    user = db.query(models.User).filter(models.User.username == form_data.username).first()
    if not user:
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    
    # Verify password
    if not utils.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    
    # Create JWT
    access_token = auth.create_access_token(data={"sub": str(user.id), "role": user.role})
    return {"access_token": access_token, "token_type": "bearer"}

Explanation:

  • /register: Accepts username, email, password. Hashes password, stores user.
  • /login: Uses OAuth2PasswordRequestForm (username + password fields). Returns JWT.
  • oauth2_scheme: Tells FastAPI to extract token from Authorization header.

Protected Routes

Now we protect endpoints by requiring a valid JWT.

Dependency for Getting Current User

# This snippet continues main.py, where app, schemas, and oauth2_scheme are already defined
from fastapi import Depends, HTTPException, status
from jose import JWTError
from . import auth, models, database
from sqlalchemy.orm import Session

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(database.get_db)):
    payload = auth.verify_token(token)
    if payload is None:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    
    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid token payload")
    
    user = db.query(models.User).filter(models.User.id == int(user_id)).first()
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")
    return user

Protected Endpoint Example

@app.get("/users/me", response_model=schemas.UserOut)
def read_users_me(current_user: models.User = Depends(get_current_user)):
    return current_user

@app.get("/protected-data")
def get_protected_data(current_user: models.User = Depends(get_current_user)):
    return {"message": f"Hello {current_user.username}, this is protected data!"}

Explanation:

  • get_current_user: Decodes token, fetches user from DB, returns user object.
  • Any endpoint with current_user dependency is now protected.
  • If the Authorization header is missing, OAuth2PasswordBearer returns 401 before your code runs; if the token is invalid or expired, get_current_user raises 401.
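
For intuition, the header handling that oauth2_scheme performs can be approximated in a few lines of plain Python. This is a simplified sketch and not FastAPI's actual code; extract_bearer_token is a hypothetical helper name.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing, uses another scheme,
    or carries no token, which is when a 401 response is appropriate.
    """
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()
```

A request whose header is "Basic dXNlcg==" or just the raw token without the "Bearer " prefix therefore never reaches get_current_user.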

Role-Based Access Control (RBAC)

Sometimes you need to restrict access based on user roles (e.g., admin-only endpoints).

Role Checker Dependency

def require_role(required_role: str):
    def role_checker(current_user: models.User = Depends(get_current_user)):
        if current_user.role != required_role:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return current_user
    return role_checker

Admin-Only Endpoint

@app.get("/admin/dashboard")
def admin_dashboard(admin_user: models.User = Depends(require_role("admin"))):
    return {"message": f"Welcome Admin {admin_user.username}!"}

# response_model keeps hashed_password out of the response
@app.get("/users/all", response_model=list[schemas.UserOut])
def get_all_users(db: Session = Depends(database.get_db), 
                  admin_user: models.User = Depends(require_role("admin"))):
    users = db.query(models.User).all()
    return users

Explanation:

  • require_role: A factory function that returns a dependency.
  • Only users with role “admin” can access these endpoints.
  • Other users get a 403 Forbidden error.
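
The factory pattern extends naturally to several allowed roles. The sketch below shows the idea in plain Python so it runs on its own: DemoUser, require_any_role, and the "moderator" role are hypothetical, and PermissionError stands in for the HTTPException(status_code=403) you would raise in FastAPI, where the inner function would also receive current_user through Depends(get_current_user).

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class DemoUser:
    username: str
    role: str


def require_any_role(*allowed_roles: str) -> Callable[[DemoUser], DemoUser]:
    """Return a checker that accepts users holding any of the given roles."""
    allowed = frozenset(allowed_roles)

    def role_checker(current_user: DemoUser) -> DemoUser:
        if current_user.role not in allowed:
            raise PermissionError("Insufficient permissions")
        return current_user

    return role_checker


# Each call to the factory produces an independent checker.
staff_only = require_any_role("admin", "moderator")
admin_only = require_any_role("admin")
```

Because the allowed roles are captured in the closure, each endpoint can declare its own requirement without duplicating the check.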

Common Mistakes

  1. Storing passwords in plain text: Always hash passwords using bcrypt or similar.
  2. Exposing the SECRET_KEY: Never commit it to version control. Use environment variables.
  3. Not validating token expiry: Always set an exp claim and leave expiry verification enabled; jwt.decode in python-jose rejects expired tokens by default.
  4. Using weak algorithms: Stick to HS256 or RS256. Avoid “none” algorithm.
  5. Ignoring CORS: If your frontend is on a different domain, configure CORS properly.
  6. Not handling token refresh: For long sessions, implement refresh tokens.

Practice Task

Build a complete mini-project with the following requirements:

  1. Create a FastAPI app with user registration and login.
  2. Implement password hashing using bcrypt.
  3. Use JWT for authentication (token expires in 15 minutes).
  4. Create a protected route that returns the current user’s profile.
  5. Add an admin-only route that lists all users.
  6. Test all endpoints using Swagger UI or Postman.

Bonus: Add a role field during registration (default “user”) and allow only admins to change roles.

Summary

In this chapter, we covered the complete flow of FastAPI Authentication & Authorization. You learned:

  • How to create a User model with SQLAlchemy.
  • How to hash passwords securely using passlib.
  • How to generate and verify JWT tokens.
  • How to build registration and login endpoints.
  • How to protect routes using dependencies.
  • How to implement role-based access control.

These skills are essential for any production-ready API. You now have a solid foundation to secure your FastAPI applications.

FAQs

1. What is the difference between authentication and authorization?

Authentication verifies who you are (login). Authorization determines what you can access (roles/permissions).

2. Why should I use JWT instead of session-based authentication?

JWT is stateless, scalable, and works well with mobile apps and microservices. Sessions require server-side storage.

3. How do I refresh an expired token?

Implement a /refresh endpoint that accepts a refresh token (long-lived) and returns a new access token. Store refresh tokens securely.

4. Is it safe to store JWT in localStorage?

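Not recommended for production: any script injected through XSS can read localStorage. An httpOnly cookie keeps the token out of reach of JavaScript. For SPAs, consider keeping a short-lived access token in memory and sending it in the Authorization header.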

5. How do I test protected routes in Swagger UI?

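Click the “Authorize” button and enter your username and password. Swagger UI requests a token from the tokenUrl (/login in this module) and sends it as Authorization: Bearer <token> on all subsequent requests.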

Next up: Module 10 – Database Migrations with Alembic. You’ll learn how to manage database schema changes safely in production. Stay tuned!

More Practical Examples

Token Refresh Endpoint

In real-world applications, access tokens typically have a short lifespan (e.g., 15–30 minutes) to limit damage if they are stolen. A refresh token, which lives longer, allows users to obtain new access tokens without logging in again. Here’s how to implement a refresh endpoint:

from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Assume you already have SECRET_KEY, ALGORITHM, and ACCESS_TOKEN_EXPIRE_MINUTES
REFRESH_TOKEN_EXPIRE_DAYS = 7

class TokenRefreshRequest(BaseModel):
    refresh_token: str

class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    refresh_token: str | None = None

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/token/refresh", response_model=TokenResponse)
async def refresh_access_token(request: TokenRefreshRequest):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise credentials_exception
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # Optionally check that the user still exists.
    # fake_db is a placeholder for your own async data-access layer.
    user = await fake_db.get_user(username)
    if user is None:
        raise credentials_exception

    new_access_token = create_access_token(data={"sub": username})
    return TokenResponse(access_token=new_access_token)

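Explanation: This code adds a /token/refresh endpoint. It accepts a refresh token, verifies that its "type" claim is "refresh", extracts the username from "sub", and issues a new access token. Both kinds of token carry a "type" claim so that an access token cannot be used to refresh; protected routes should likewise reject tokens whose type is "refresh". The login endpoint is expected to call create_refresh_token and return the result alongside the access token. The client stores the refresh token (for example, in an HTTP-only cookie on the web or in the platform's secure storage on mobile) and uses it only to obtain new access tokens. This pattern is common in production apps.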
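
The checks the endpoint applies to the decoded claims can be isolated into a small pure function, which is easy to unit test. The sketch below assumes the token has already been decoded into a dict; subject_from_refresh_claims is a hypothetical helper, and the explicit exp comparison only mirrors what jwt.decode already enforces.

```python
import time
from typing import Optional


def subject_from_refresh_claims(claims: dict, now: Optional[float] = None) -> str:
    """Return the subject of a decoded refresh token or raise ValueError."""
    now = time.time() if now is None else now
    if claims.get("type") != "refresh":
        raise ValueError("not a refresh token")
    if claims.get("exp", 0) <= now:
        raise ValueError("refresh token expired")
    subject = claims.get("sub")
    if not subject:
        raise ValueError("missing subject")
    return subject


refresh_claims = {"sub": "johndoe", "type": "refresh", "exp": 2_000.0}
access_claims = {"sub": "johndoe", "type": "access", "exp": 2_000.0}
```

Passing now explicitly keeps the function deterministic in tests.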

Password Reset with Token

Another common feature is allowing users to reset their password via email. Here’s a simplified version using a short-lived token:

from datetime import datetime, timedelta
from fastapi import BackgroundTasks
import secrets

# In-memory store for reset tokens (use Redis/DB in production)
reset_tokens = {}

def send_reset_email(email: str, token: str):
    # In real app, use an email service (SendGrid, SMTP, etc.)
    print(f"Password reset link: http://yourapp.com/reset?token={token}")

@app.post("/forgot-password")
async def forgot_password(email: str, background_tasks: BackgroundTasks):
    # Check if email exists in database
    user = await fake_db.get_user_by_email(email)
    if not user:
        # Return success anyway to avoid revealing user existence
        return {"msg": "If that email exists, a reset link has been sent."}

    # Generate a secure random token
    token = secrets.token_urlsafe(32)
    reset_tokens[token] = {"email": email, "expires": datetime.utcnow() + timedelta(hours=1)}

    # Send email in background to avoid blocking
    background_tasks.add_task(send_reset_email, email, token)
    return {"msg": "If that email exists, a reset link has been sent."}

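# Note: token and new_password arrive as query parameters here to keep the demo short.
# In production, accept them in the request body so they stay out of URLs and access logs.
@app.post("/reset-password")
async def reset_password(token: str, new_password: str):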
    stored = reset_tokens.get(token)
    if not stored or stored["expires"] < datetime.utcnow():
        raise HTTPException(status_code=400, detail="Invalid or expired token")

    # Hash the new password
    hashed_password = pwd_context.hash(new_password)
    # Update user in database
    await fake_db.update_password(stored["email"], hashed_password)

    # Remove used token
    del reset_tokens[token]
    return {"msg": "Password reset successful."}

Explanation: The /forgot-password endpoint generates a cryptographically secure token and stores it with an expiration time. It uses BackgroundTasks to send the email without slowing the response. The /reset-password endpoint validates the token, hashes the new password using the same pwd_context from earlier, and updates the database. Always return the same message whether the email exists or not to prevent email enumeration attacks.
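
The in-memory dictionary above can be wrapped in a small class that also enforces single use and stores only a hash of each token, so a leaked store does not reveal usable reset links. This is a sketch under stated assumptions: ResetTokenStore is a hypothetical name, and the dict stands in for Redis or a database table.

```python
import hashlib
import secrets
import time
from typing import Dict, Optional, Tuple


class ResetTokenStore:
    """Single-use reset tokens with an expiry, keyed by the token's SHA-256 hash."""

    def __init__(self, ttl_seconds: int = 3600) -> None:
        self.ttl_seconds = ttl_seconds
        self._entries: Dict[str, Tuple[str, float]] = {}

    @staticmethod
    def _digest(token: str) -> str:
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, email: str, now: Optional[float] = None) -> str:
        """Create a token for the email and remember only its hash."""
        now = time.time() if now is None else now
        token = secrets.token_urlsafe(32)
        self._entries[self._digest(token)] = (email, now + self.ttl_seconds)
        return token

    def consume(self, token: str, now: Optional[float] = None) -> Optional[str]:
        """Return the email for a valid token and invalidate it; else None."""
        now = time.time() if now is None else now
        entry = self._entries.pop(self._digest(token), None)
        if entry is None:
            return None
        email, expires_at = entry
        return email if expires_at > now else None


store = ResetTokenStore(ttl_seconds=60)
fresh = store.issue("john@example.com", now=1_000.0)
stale = store.issue("admin@example.com", now=1_000.0)
```

Because consume removes the entry before checking expiry, a token can never be redeemed twice, even if the first attempt arrives too late.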

Class-Based Example

While FastAPI works beautifully with functions, you can also organize authentication logic using classes. This is useful when you need to reuse logic across multiple endpoints or want to follow an OOP approach. Here’s a class-based authentication handler:

from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from typing import Optional

# Defined at module level so that Depends(oauth2_scheme) in the class body can resolve the name
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class AuthHandler:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.oauth2_scheme = oauth2_scheme

    def create_access_token(self, data: dict, expires_minutes: int = 30) -> str:
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(minutes=expires_minutes)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )

    def get_current_user(self, token: str = Depends(oauth2_scheme)) -> dict:
        payload = self.verify_token(token)
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token payload")
        # In real app, fetch user from DB here
        return {"username": username}

# Usage
auth = AuthHandler(secret_key="your-secret-key")

@app.post("/login-class")
async def login_class(username: str, password: str):
    # Verify credentials (pseudo-code)
    user = await fake_db.authenticate_user(username, password)
    if not user:
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    access_token = auth.create_access_token(data={"sub": username})
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/protected-class")
async def protected_route(current_user: dict = Depends(auth.get_current_user)):
    return {"message": f"Hello, {current_user['username']}! This is a protected route."}

Explanation: The AuthHandler class encapsulates all JWT logic. Its constructor takes the secret key and algorithm, and it exposes methods for creating tokens, verifying them, and extracting the current user. The get_current_user method is designed to be used as a FastAPI dependency via Depends(). This pattern makes it easy to inject the same authentication logic into any route. You can also extend the class to support roles, token refresh, or different token types.

Step-by-Step Exercise

Let’s build a complete mini-project step by step. You’ll create a simple task manager API with user registration, login, and role-based access control.

Step 1: Setup

Create a new directory and set up a virtual environment:

mkdir fastapi-auth-exercise
cd fastapi-auth-exercise
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart

Step 2: Create the Database Models

Create a file models.py:

from pydantic import BaseModel
from typing import Optional

class User(BaseModel):
    username: str
    email: str
    full_name: Optional[str] = None
    disabled: bool = False
    role: str = "user"  # "admin" or "user"

class UserInDB(User):
    hashed_password: str

class Task(BaseModel):
    id: int
    title: str
    description: Optional[str] = None
    owner: str
    completed: bool = False

Step 3: Implement Authentication

Create auth.py with the core logic:

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from models import UserInDB  # used by the dependencies below

SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
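    # Imported here rather than at the top because main.py imports this module (avoids a circular import)
    from main import fake_users_db
    user = fake_users_db.get(username)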
    if user is None:
        raise credentials_exception
    return UserInDB(**user)

async def get_current_active_user(current_user: UserInDB = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# Role checker dependency
def role_required(required_role: str):
    async def role_checker(current_user: UserInDB = Depends(get_current_active_user)):
        if current_user.role != required_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user
    return role_checker

Step 4: Create the Main App

Create main.py:

from fastapi import FastAPI, Depends, HTTPException
from models import User, UserInDB, Task
from auth import (
    get_password_hash, verify_password, create_access_token,
    get_current_active_user, role_required
)

app = FastAPI()

# Fake database (use a real DB in production)
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "email": "john@example.com",
        "full_name": "John Doe",
        "hashed_password": get_password_hash("secret123"),
        "disabled": False,
        "role": "user"
    },
    "admin": {
        "username": "admin",
        "email": "admin@example.com",
        "full_name": "Admin User",
        "hashed_password": get_password_hash("admin123"),
        "disabled": False,
        "role": "admin"
    }
}

tasks_db = []
task_counter = 0

@app.post("/register")
async def register(user: User, password: str):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already exists")
    fake_users_db[user.username] = {
        "username": user.username,
        "email": user.email,
        "full_name": user.full_name,
        "hashed_password": get_password_hash(password),
        "disabled": False,
        # Ignore any client-supplied role so that nobody can self-register as admin
        "role": "user"
    }
    return {"msg": "User created successfully"}

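# Query parameters keep this exercise short. OAuth2 clients, including the Authorize dialog in /docs,
# send form fields instead; OAuth2PasswordRequestForm (used in the earlier /login endpoint) handles those.
@app.post("/token")
async def login(username: str, password: str):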
    user = fake_users_db.get(username)
    if not user or not verify_password(password, user["hashed_password"]):
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    access_token = create_access_token(data={"sub": username})
    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/tasks", response_model=Task)
async def create_task(
    title: str,
    description: str | None = None,
    current_user: UserInDB = Depends(get_current_active_user)
):
    global task_counter
    task_counter += 1
    task = Task(
        id=task_counter,
        title=title,
        description=description,
        owner=current_user.username,
        completed=False
    )
    tasks_db.append(task)
    return task

@app.get("/tasks", response_model=list[Task])
async def get_my_tasks(current_user: UserInDB = Depends(get_current_active_user)):
    return [t for t in tasks_db if t.owner == current_user.username]

@app.get("/admin/tasks", response_model=list[Task])
async def get_all_tasks(
    current_user: UserInDB = Depends(role_required("admin"))
):
    return tasks_db

@app.delete("/tasks/{task_id}")
async def delete_task(
    task_id: int,
    current_user: UserInDB = Depends(get_current_active_user)
):
    for task in tasks_db:
        if task.id == task_id:
            if task.owner != current_user.username and current_user.role != "admin":
                raise HTTPException(status_code=403, detail="Not your task")
            tasks_db.remove(task)
            return {"msg": "Task deleted"}
    raise HTTPException(status_code=404, detail="Task not found")

Step 5: Test Your API

Run the server:

uvicorn main:app --reload

Now test with curl or the interactive docs at http://localhost:8000/docs:

# Register a user
curl -X POST "http://localhost:8000/register?password=test123" \
  -H "Content-Type: application/json" \
  -d '{"username":"testuser","email":"test@example.com","role":"user"}'

# Login and get token
curl -X POST "http://localhost:8000/token?username=testuser&password=test123"

# Use the token to create a task
curl -X POST "http://localhost:8000/tasks?title=My+first+task" \
  -H "Authorization: Bearer YOUR_TOKEN_HERE"

# Try admin-only endpoint as regular user (should fail with 403)
curl "http://localhost:8000/admin/tasks" \
  -H "Authorization: Bearer YOUR_TOKEN_HERE"

Interview and Job Use Cases

Common Interview Questions

  • Q: Explain the difference between authentication and authorization.
    A: Authentication verifies who you are (e.g., via username/password or JWT). Authorization determines what you can do (e.g., role-based access to endpoints). In FastAPI, authentication is handled by OAuth2PasswordBearer and JWT decoding; authorization is implemented via custom dependencies like role_required().
  • Q: Why use JWT instead of session-based auth?
    A: JWTs are stateless—no server-side session storage needed. They work well for APIs and microservices. However, they can’t be revoked easily, so use short expiration times and refresh tokens.
  • Q: How do you securely store passwords?
    A: Never store plain text. Use a strong hashing algorithm like bcrypt (via PassLib). Always add a salt (bcrypt does this automatically). Hash on registration, verify on login.
  • Q: How would you implement rate limiting on login?
    A: Track failed attempts per IP or username in Redis. After N failures (e.g., 5), block for a cooldown period. Use FastAPI middleware or a dependency that checks a counter before processing the request.
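
The rate-limiting answer above can be sketched with a sliding window of failure timestamps. This version keeps the counters in an in-memory dict rather than Redis so that it runs standalone; LoginRateLimiter and its parameters are hypothetical names, and a multi-process deployment would need a shared store.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class LoginRateLimiter:
    """Block a key (username or IP) after too many recent failed logins."""

    def __init__(self, max_failures: int = 5, window_seconds: float = 300.0) -> None:
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self._failures: Dict[str, Deque[float]] = defaultdict(deque)

    def _recent(self, key: str, now: float) -> Deque[float]:
        """Drop failures that fell out of the window and return the rest."""
        attempts = self._failures[key]
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        return attempts

    def is_blocked(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        return len(self._recent(key, now)) >= self.max_failures

    def record_failure(self, key: str, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._recent(key, now).append(now)

    def reset(self, key: str) -> None:
        """Call after a successful login to clear the counter."""
        self._failures.pop(key, None)


limiter = LoginRateLimiter(max_failures=3, window_seconds=60.0)
for moment in (0.0, 1.0, 2.0):
    limiter.record_failure("alice", now=moment)
```

In a login endpoint, is_blocked would run before the password check (returning 429 when it is True), record_failure after a wrong password, and reset after a successful login.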

Real-World Job Scenarios

  • Building a SaaS product: You’ll need multi-tenant authentication where each user belongs to an organization. Implement a tenant ID in the JWT and filter all database queries by it.
  • API for a mobile app: Use refresh tokens stored securely on the device. The app silently refreshes the access token when it expires, providing a seamless user experience.
  • Admin dashboard: Implement granular permissions (e.g., “can edit users”, “can view reports”) instead of just “admin” vs “user”. Store permissions in the database and include them in the JWT as a list of strings.
  • Third-party API integration: Use OAuth2 with external providers (Google, GitHub). A third-party library such as Authlib can handle the OAuth flow, after which your API issues its own JWT.

Extra Beginner FAQs

1. What’s the difference between OAuth2PasswordBearer and OAuth2PasswordRequestForm?

OAuth2PasswordBearer is a dependency that extracts the token from the Authorization header (Bearer scheme). OAuth2PasswordRequestForm is a form dependency for the login endpoint—it expects username and password form fields. They work together: the login endpoint uses the form, returns a token; protected endpoints use the bearer dependency to extract and validate that token.

2. Why do I get “401 Unauthorized” even with a valid token?

Common causes: (a) Your token expired—check the exp claim. (b) You’re using the wrong secret key or algorithm. (c) The token was generated for a different audience (aud claim). (d) You have a typo in the Authorization header (e.g., missing “Bearer ” prefix). Use jwt.io to decode and inspect your token.

3. How do I log out?

JWTs are stateless—you can’t “invalidate” them server-side without a blacklist. Common approaches: (a) Set a very short expiration (e.g., 5 minutes) and use refresh tokens. (b) Maintain a server-side blacklist of revoked tokens (stored in Redis). (c) On logout, delete the token from the client side (e.g., remove from local storage). For high-security apps, use option (b) with a database check on every request.
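
Option (b) can be sketched as follows. The example assumes each token carries a unique jti claim (the registered JWT ID claim) and uses an in-memory dict where a real deployment would use Redis; TokenDenylist and new_token_claims are hypothetical names.

```python
import time
import uuid
from typing import Dict, Optional


class TokenDenylist:
    """Remember revoked token IDs until the tokens would have expired anyway."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # jti -> exp timestamp

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Forget entries for tokens that are past their expiry.
        for old in [key for key, exp in self._revoked.items() if exp <= now]:
            del self._revoked[old]
        return jti in self._revoked


def new_token_claims(user_id: str, ttl_seconds: int, now: Optional[float] = None) -> dict:
    """Claims for a token that can later be revoked individually."""
    now = time.time() if now is None else now
    return {"sub": user_id, "jti": uuid.uuid4().hex, "exp": now + ttl_seconds}


denylist = TokenDenylist()
claims = new_token_claims("1", ttl_seconds=900, now=1_000.0)
was_revoked_before = denylist.is_revoked(claims["jti"], now=1_001.0)
denylist.revoke(claims["jti"], claims["exp"])  # what a /logout endpoint would do
```

Entries only need to live as long as the token itself, which keeps the list small when access tokens are short-lived.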

4. Is it safe to store JWTs in localStorage?

It’s convenient but vulnerable to XSS attacks—if an attacker injects JavaScript, they can steal the token. A more secure option is to store the access token in memory (a JavaScript variable) and the refresh token in an HTTP-only cookie. The cookie is not accessible to JavaScript, preventing XSS theft, but be aware of CSRF attacks—use SameSite=Strict and CSRF tokens.

5. How do I handle token expiration gracefully on the frontend?

Intercept 401 responses from your API. When you get a 401, try to refresh the token using the refresh endpoint. If that also fails, redirect the user to the login page. Axios supports interceptors for this; with fetch, wrap the call in a small helper. Example with fetch:

// Pseudo-code for frontend
async function apiCall(url, options) {
  let response = await fetch(url, options);
  if (response.status === 401) {
    const newToken = await refreshToken();
    if (newToken) {
      options.headers.Authorization = `Bearer ${newToken}`;
      response = await fetch(url, options);
    } else {
      window.location.href = '/login';
    }
  }
  return response;
}

6. What’s the best way to test authentication in FastAPI?

Use FastAPI’s TestClient with a fixture that creates a test user and obtains a token. For example:

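from fastapi.testclient import TestClient
from main import app  # adjust the import path to wherever your FastAPI app is defined

def test_protected_route():
    client = TestClient(app)
    # Register a test user (a 400 here just means the user already exists)
    client.post("/register", json={"username": "testuser", "email": "test@example.com", "password": "testpass"})
    # Log in with form fields, as OAuth2PasswordRequestForm expects
    response = client.post("/login", data={"username": "testuser", "password": "testpass"})
    assert response.status_code == 200
    token = response.json()["access_token"]
    # Then access a protected route
    response = client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
    assert response.status_code == 200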

For unit tests, you can also mock the get_current_user dependency to return a fake user without needing a real token.

7. Why use Depends() instead of manually calling functions?

Depends() integrates with FastAPI’s dependency injection system. It automatically handles: (a) Sub-dependencies (e.g., get_current_active_user depends on get_current_user). (b) Caching within a request: if several dependencies of the same request rely on the same sub-dependency, it is called only once. (c) OpenAPI schema generation: your docs will show the required security scheme. Calling the functions manually would require you to replicate all of this logic.
