FastAPI 完全教程(含 AI 应用开发重点)
·
结合你的 Spring Boot 背景和 AI 开发目标,我把对照和**流式响应(AI 项目核心)**重点标出来。
FastAPI 完全教程(含 AI 应用开发重点)
一、FastAPI 是什么 + 对照 Spring Boot
FastAPI 是 Python 的现代 Web 框架,特点:快、自动生成文档、原生异步、类型驱动。
| 概念 | Spring Boot | FastAPI |
|---|---|---|
| 框架 | Spring Boot | FastAPI |
| 控制器 | @RestController |
app 实例 + 路由装饰器 |
| 路由 | @GetMapping |
@app.get() |
| 请求体 | @RequestBody DTO |
Pydantic 模型 |
| 校验 | @Valid |
Pydantic 自动校验 |
| 依赖注入 | @Autowired |
Depends() |
| 服务器 | 内嵌 Tomcat | Uvicorn (ASGI) |
| 文档 | Swagger 需配置 | 自带,零配置 |
二、安装与第一个应用
# 安装
pip install fastapi uvicorn[standard]
# 或用现代工具
uv add fastapi "uvicorn[standard]"
# main.py
from fastapi import FastAPI
app = FastAPI(
title="我的AI服务",
description="API文档描述",
version="1.0.0"
)
@app.get("/")
def read_root():
return {"message": "Hello FastAPI"}
# 启动(--reload 热重载,开发用)
uvicorn main:app --reload --port 8000
启动后自动有:
- 服务:
http://localhost:8000 - Swagger 文档:
http://localhost:8000/docs(自动生成,可直接测试) - ReDoc 文档:
http://localhost:8000/redoc
三、路由与 HTTP 方法
@app.get("/items") # 查询
def get_items():
return {"items": []}
@app.post("/items") # 创建
def create_item():
return {"created": True}
@app.put("/items/{id}") # 更新
def update_item(id: int):
return {"updated": id}
@app.delete("/items/{id}") # 删除
def delete_item(id: int):
return {"deleted": id}
@app.patch("/items/{id}") # 部分更新
def patch_item(id: int):
return {"patched": id}
四、路径参数与查询参数
# 路径参数:URL 的一部分,类型自动转换+校验
@app.get("/users/{user_id}")
def get_user(user_id: int): # 自动把字符串转 int,转不了返回422
return {"user_id": user_id}
# 查询参数:URL 问号后面的,函数参数里非路径的都是
# GET /search?keyword=python&page=2&size=10
@app.get("/search")
def search(
keyword: str, # 必填
page: int = 1, # 有默认值=可选
size: int = 10
):
return {"keyword": keyword, "page": page, "size": size}
# 可选参数用 Optional 或 | None
from typing import Optional
@app.get("/filter")
def filter_items(category: str | None = None):
return {"category": category}
# 参数校验(Query 加约束)
from fastapi import Query
@app.get("/products")
def products(
q: str = Query(min_length=3, max_length=50), # 长度限制
page: int = Query(default=1, ge=1, le=100) # 数值范围 1-100
):
return {"q": q, "page": page}
五、请求体(Pydantic 模型,核心)
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
# 定义请求模型(相当于 Java 的 DTO)
class UserCreate(BaseModel):
name: str
age: int = Field(gt=0, le=150) # 0<age<=150
email: EmailStr # 自动校验邮箱格式
bio: Optional[str] = None # 可选
tags: list[str] = [] # 默认空列表
# 接收请求体
@app.post("/users")
def create_user(user: UserCreate): # FastAPI 自动解析JSON+校验
return {
"name": user.name,
"age": user.age
}
# 请求:POST /users body: {"name":"Alice","age":25,"email":"a@x.com"}
# 校验失败自动返回 422 + 详细错误信息
# 嵌套模型
class Address(BaseModel):
city: str
street: str
class UserWithAddress(BaseModel):
name: str
address: Address # 嵌套
addresses: list[Address] = [] # 嵌套列表
# 模型方法与校验器
from pydantic import field_validator
class Order(BaseModel):
price: float
quantity: int
@field_validator("price")
@classmethod
def price_must_positive(cls, v):
if v <= 0:
raise ValueError("价格必须大于0")
return v
@property
def total(self) -> float:
return self.price * self.quantity
六、响应模型与状态码
from fastapi import status
class UserResponse(BaseModel):
id: int
name: str
# 注意:没有 password 字段,敏感数据不返回
# response_model 控制返回结构(自动过滤多余字段)
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
# 即使返回了 password,response_model 也会过滤掉
return {
"id": 1,
"name": user.name,
"password": "secret" # 不会出现在响应里
}
# 自定义响应
from fastapi.responses import JSONResponse
@app.get("/custom")
def custom():
return JSONResponse(
status_code=200,
content={"message": "ok"},
headers={"X-Custom": "value"}
)
七、依赖注入(Depends,重点)
这是 FastAPI 的精髓,相当于 Spring 的 @Autowired + 拦截器:
from fastapi import Depends, Header, HTTPException
# 1. 函数依赖(可复用的逻辑)
def pagination(page: int = 1, size: int = 10):
return {"page": page, "size": size, "offset": (page-1)*size}
@app.get("/items")
def list_items(pg: dict = Depends(pagination)): # 注入分页逻辑
return pg
# 2. 鉴权依赖(最常用,相当于拦截器)
def verify_token(authorization: str = Header(None)):
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未授权")
token = authorization.replace("Bearer ", "")
# 验证 token...
return {"user_id": 123}
@app.get("/profile")
def profile(user: dict = Depends(verify_token)): # 需要登录才能访问
return {"user_id": user["user_id"]}
# 3. 类依赖(带状态的服务,相当于 @Service)
class UserService:
def get_user(self, id: int):
return {"id": id, "name": "Alice"}
def get_user_service():
return UserService()
@app.get("/users/{id}")
def get_user(id: int, service: UserService = Depends(get_user_service)):
return service.get_user(id)
# 4. 全局依赖(所有接口都生效)
app = FastAPI(dependencies=[Depends(verify_token)])
八、异步(AI 开发必备)
import httpx
import asyncio
# 异步路由(IO密集场景,如调用大模型API)
@app.get("/async-data")
async def get_async_data(): # 用 async def
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.example.com") # 不阻塞
return resp.json()
# 并发调用多个外部API
@app.get("/aggregate")
async def aggregate():
async with httpx.AsyncClient() as client:
results = await asyncio.gather( # 同时发起,等全部完成
client.get("https://api1.com"),
client.get("https://api2.com"),
client.get("https://api3.com"),
)
return [r.json() for r in results]
# 什么时候用 async:
# - 调用外部API、数据库、文件IO → 用 async
# - 纯CPU计算 → 用普通 def(FastAPI自动放线程池)
九、流式响应 SSE(AI 项目核心,你 Java 项目用的就是这个)
from fastapi.responses import StreamingResponse
import asyncio
import json
# 这就是你 Java 项目 ResponseBodyEmitter 的 Python 版!
@app.post("/chat/stream")
async def chat_stream(message: str):
async def generate():
# 模拟大模型逐字返回
words = ["你好", ",", "我是", "AI", "助手"]
for word in words:
data = json.dumps({"content": word}, ensure_ascii=False)
yield f"data: {data}\n\n" # SSE 格式
await asyncio.sleep(0.3)
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream", # SSE 媒体类型
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
# 真实场景:流式调用 OpenAI
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key="sk-xxx")
@app.post("/ai/chat")
async def ai_chat(message: str):
async def generate():
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": message}],
stream=True # 开启流式
)
async for chunk in stream: # 逐块接收
content = chunk.choices[0].delta.content
if content:
yield f"data: {json.dumps({'content': content})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
十、异常处理
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
# 抛出 HTTP 异常
@app.get("/items/{id}")
def get_item(id: int):
if id > 100:
raise HTTPException(
status_code=404,
detail="商品不存在"
)
return {"id": id}
# 自定义异常
class BizException(Exception):
def __init__(self, code: int, message: str):
self.code = code
self.message = message
# 全局异常处理器(相当于 @ControllerAdvice)
@app.exception_handler(BizException)
async def biz_exception_handler(request: Request, exc: BizException):
return JSONResponse(
status_code=400,
content={"code": exc.code, "message": exc.message}
)
@app.get("/business")
def business():
raise BizException(5001, "业务异常")
十一、中间件(拦截所有请求)
import time
from fastapi import Request
# 相当于 Spring 的 Filter/Interceptor
@app.middleware("http")
async def add_process_time(request: Request, call_next):
start = time.time()
response = await call_next(request) # 执行后续处理
process_time = time.time() - start
response.headers["X-Process-Time"] = str(process_time)
print(f"{request.method} {request.url.path} 耗时 {process_time:.3f}s")
return response
# CORS 跨域(前后端分离必配)
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应指定具体域名
allow_methods=["*"],
allow_headers=["*"],
allow_credentials=True,
)
十二、文件上传与后台任务
from fastapi import UploadFile, File, BackgroundTasks
# 文件上传
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
content = await file.read() # 读取内容
with open(f"uploads/{file.filename}", "wb") as f:
f.write(content)
return {
"filename": file.filename,
"size": len(content),
"content_type": file.content_type
}
# 后台任务(请求返回后异步执行,如发邮件、写日志)
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(message + "\n")
@app.post("/notify")
def notify(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, f"通知发给 {email}")
return {"message": "已接收"} # 立即返回,日志后台写
十三、数据库集成(SQLAlchemy)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 配置
engine = create_engine("mysql+pymysql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
# 定义模型(相当于 Entity)
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
# 依赖:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db # 用 yield,请求结束自动关闭
finally:
db.close()
# 使用
@app.get("/db-users/{id}")
def get_user(id: int, db: Session = Depends(get_db)):
user = db.query(UserModel).filter(UserModel.id == id).first()
if not user:
raise HTTPException(404, "用户不存在")
return {"id": user.id, "name": user.name}
@app.post("/db-users")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
user = UserModel(name=name, email=email)
db.add(user)
db.commit()
db.refresh(user)
return {"id": user.id}
十四、JWT 认证
from jose import jwt
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
# 生成 token
def create_token(user_id: int) -> str:
payload = {
"sub": str(user_id),
"exp": datetime.utcnow() + timedelta(hours=2)
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# 验证 token 的依赖
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
return int(payload["sub"])
except Exception:
raise HTTPException(401, "无效的token")
# 登录
@app.post("/login")
def login(username: str, password: str):
# 验证账密...
token = create_token(user_id=1)
return {"access_token": token, "token_type": "bearer"}
# 受保护接口
@app.get("/me")
def me(user_id: int = Depends(get_current_user)):
return {"user_id": user_id}
十五、项目结构组织(生产级)
myproject/
├── app/
│ ├── main.py # 入口,创建 app
│ ├── config.py # 配置
│ ├── routers/ # 路由(按模块分)
│ │ ├── users.py
│ │ └── chat.py
│ ├── models/ # 数据库模型
│ ├── schemas/ # Pydantic 模型
│ ├── services/ # 业务逻辑
│ └── dependencies.py # 公共依赖
├── requirements.txt
└── .env
# routers/users.py —— 用 APIRouter 拆分路由(相当于多个Controller)
from fastapi import APIRouter
router = APIRouter(prefix="/users", tags=["用户"])
@router.get("/")
def list_users():
return []
@router.get("/{id}")
def get_user(id: int):
return {"id": id}
# main.py —— 注册路由
from fastapi import FastAPI
from app.routers import users, chat
app = FastAPI()
app.include_router(users.router)
app.include_router(chat.router)
十六、配置管理
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
app_name: str = "AI服务"
openai_api_key: str
database_url: str
debug: bool = False
class Config:
env_file = ".env" # 自动读取 .env 文件
settings = Settings()
# .env 文件
# OPENAI_API_KEY=sk-xxx
# DATABASE_URL=mysql://...
# 使用
@app.get("/config")
def get_config():
return {"app_name": settings.app_name}
十七、部署
# 开发
uvicorn main:app --reload
# 生产(多进程)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# 或用 gunicorn + uvicorn worker
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
完整 AI 服务示例(综合实战)
把上面的串起来,一个最小可用的 AI 对话服务:
from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import json, asyncio
app = FastAPI(title="AI对话服务")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# 请求模型
class ChatRequest(BaseModel):
message: str
session_id: str
max_tokens: int = 1000
# 鉴权依赖
def verify_api_key(authorization: str = None):
# 简化示例
return {"user": "demo"}
# 流式对话接口
@app.post("/api/v1/chat")
async def chat(request: ChatRequest, user: dict = Depends(verify_api_key)):
async def generate():
# 这里替换成真实的 OpenAI 流式调用
response = f"收到你的消息:{request.message}"
for char in response:
yield f"data: {json.dumps({'content': char}, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.05)
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
# 健康检查
@app.get("/health")
def health():
return {"status": "ok"}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)