结合你的 Spring Boot 背景和 AI 开发目标,我把对照和**流式响应(AI 项目核心)**重点标出来。

FastAPI 完全教程(含 AI 应用开发重点)


一、FastAPI 是什么 + 对照 Spring Boot

FastAPI 是 Python 的现代 Web 框架,特点:快、自动生成文档、原生异步、类型驱动

概念 Spring Boot FastAPI
框架 Spring Boot FastAPI
控制器 @RestController app 实例 + 路由装饰器
路由 @GetMapping @app.get()
请求体 @RequestBody DTO Pydantic 模型
校验 @Valid Pydantic 自动校验
依赖注入 @Autowired Depends()
服务器 内嵌 Tomcat Uvicorn (ASGI)
文档 Swagger 需配置 自带,零配置

二、安装与第一个应用

# 安装
pip install fastapi uvicorn[standard]
# 或用现代工具
uv add fastapi "uvicorn[standard]"
# main.py
from fastapi import FastAPI

app = FastAPI(
    title="我的AI服务",
    description="API文档描述",
    version="1.0.0"
)

@app.get("/")
def read_root():
    return {"message": "Hello FastAPI"}
# 启动(--reload 热重载,开发用)
uvicorn main:app --reload --port 8000

启动后自动有:

  • 服务:http://localhost:8000
  • Swagger 文档http://localhost:8000/docs(自动生成,可直接测试)
  • ReDoc 文档:http://localhost:8000/redoc

三、路由与 HTTP 方法

@app.get("/items")          # 查询
def get_items():
    return {"items": []}

@app.post("/items")         # 创建
def create_item():
    return {"created": True}

@app.put("/items/{id}")     # 更新
def update_item(id: int):
    return {"updated": id}

@app.delete("/items/{id}")  # 删除
def delete_item(id: int):
    return {"deleted": id}

@app.patch("/items/{id}")   # 部分更新
def patch_item(id: int):
    return {"patched": id}

四、路径参数与查询参数

# 路径参数:URL 的一部分,类型自动转换+校验
@app.get("/users/{user_id}")
def get_user(user_id: int):        # 自动把字符串转 int,转不了返回422
    return {"user_id": user_id}

# 查询参数:URL 问号后面的,函数参数里非路径的都是
# GET /search?keyword=python&page=2&size=10
@app.get("/search")
def search(
    keyword: str,                  # 必填
    page: int = 1,                 # 有默认值=可选
    size: int = 10
):
    return {"keyword": keyword, "page": page, "size": size}

# 可选参数用 Optional 或 | None
from typing import Optional
@app.get("/filter")
def filter_items(category: str | None = None):
    return {"category": category}

# 参数校验(Query 加约束)
from fastapi import Query
@app.get("/products")
def products(
    q: str = Query(min_length=3, max_length=50),     # 长度限制
    page: int = Query(default=1, ge=1, le=100)       # 数值范围 1-100
):
    return {"q": q, "page": page}

五、请求体(Pydantic 模型,核心)

from pydantic import BaseModel, Field, EmailStr
from typing import Optional

# 定义请求模型(相当于 Java 的 DTO)
class UserCreate(BaseModel):
    name: str
    age: int = Field(gt=0, le=150)           # 0<age<=150
    email: EmailStr                          # 自动校验邮箱格式
    bio: Optional[str] = None                # 可选
    tags: list[str] = []                     # 默认空列表

# 接收请求体
@app.post("/users")
def create_user(user: UserCreate):           # FastAPI 自动解析JSON+校验
    return {
        "name": user.name,
        "age": user.age
    }
# 请求:POST /users  body: {"name":"Alice","age":25,"email":"a@x.com"}
# 校验失败自动返回 422 + 详细错误信息

# 嵌套模型
class Address(BaseModel):
    city: str
    street: str

class UserWithAddress(BaseModel):
    name: str
    address: Address                         # 嵌套
    addresses: list[Address] = []            # 嵌套列表

# 模型方法与校验器
from pydantic import field_validator

class Order(BaseModel):
    price: float
    quantity: int

    @field_validator("price")
    @classmethod
    def price_must_positive(cls, v):
        if v <= 0:
            raise ValueError("价格必须大于0")
        return v

    @property
    def total(self) -> float:
        return self.price * self.quantity

六、响应模型与状态码

from fastapi import status

class UserResponse(BaseModel):
    id: int
    name: str
    # 注意:没有 password 字段,敏感数据不返回

# response_model 控制返回结构(自动过滤多余字段)
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
    # 即使返回了 password,response_model 也会过滤掉
    return {
        "id": 1,
        "name": user.name,
        "password": "secret"      # 不会出现在响应里
    }

# 自定义响应
from fastapi.responses import JSONResponse
@app.get("/custom")
def custom():
    return JSONResponse(
        status_code=200,
        content={"message": "ok"},
        headers={"X-Custom": "value"}
    )

七、依赖注入(Depends,重点)

这是 FastAPI 的精髓,相当于 Spring 的 @Autowired + 拦截器:

from fastapi import Depends, Header, HTTPException

# 1. 函数依赖(可复用的逻辑)
def pagination(page: int = 1, size: int = 10):
    return {"page": page, "size": size, "offset": (page-1)*size}

@app.get("/items")
def list_items(pg: dict = Depends(pagination)):   # 注入分页逻辑
    return pg

# 2. 鉴权依赖(最常用,相当于拦截器)
def verify_token(authorization: str = Header(None)):
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="未授权")
    token = authorization.replace("Bearer ", "")
    # 验证 token...
    return {"user_id": 123}

@app.get("/profile")
def profile(user: dict = Depends(verify_token)):   # 需要登录才能访问
    return {"user_id": user["user_id"]}

# 3. 类依赖(带状态的服务,相当于 @Service)
class UserService:
    def get_user(self, id: int):
        return {"id": id, "name": "Alice"}

def get_user_service():
    return UserService()

@app.get("/users/{id}")
def get_user(id: int, service: UserService = Depends(get_user_service)):
    return service.get_user(id)

# 4. 全局依赖(所有接口都生效)
app = FastAPI(dependencies=[Depends(verify_token)])

八、异步(AI 开发必备)

import httpx
import asyncio

# 异步路由(IO密集场景,如调用大模型API)
@app.get("/async-data")
async def get_async_data():          # 用 async def
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com")  # 不阻塞
        return resp.json()

# 并发调用多个外部API
@app.get("/aggregate")
async def aggregate():
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(          # 同时发起,等全部完成
            client.get("https://api1.com"),
            client.get("https://api2.com"),
            client.get("https://api3.com"),
        )
        return [r.json() for r in results]

# 什么时候用 async:
# - 调用外部API、数据库、文件IO → 用 async
# - 纯CPU计算 → 用普通 def(FastAPI自动放线程池)

九、流式响应 SSE(AI 项目核心,你 Java 项目用的就是这个)

from fastapi.responses import StreamingResponse
import asyncio
import json

# 这就是你 Java 项目 ResponseBodyEmitter 的 Python 版!
@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        # 模拟大模型逐字返回
        words = ["你好", ",", "我是", "AI", "助手"]
        for word in words:
            data = json.dumps({"content": word}, ensure_ascii=False)
            yield f"data: {data}\n\n"        # SSE 格式
            await asyncio.sleep(0.3)
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",      # SSE 媒体类型
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

# 真实场景:流式调用 OpenAI
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="sk-xxx")

@app.post("/ai/chat")
async def ai_chat(message: str):
    async def generate():
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": message}],
            stream=True                       # 开启流式
        )
        async for chunk in stream:            # 逐块接收
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {json.dumps({'content': content})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

十、异常处理

from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse

# 抛出 HTTP 异常
@app.get("/items/{id}")
def get_item(id: int):
    if id > 100:
        raise HTTPException(
            status_code=404,
            detail="商品不存在"
        )
    return {"id": id}

# 自定义异常
class BizException(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message

# 全局异常处理器(相当于 @ControllerAdvice)
@app.exception_handler(BizException)
async def biz_exception_handler(request: Request, exc: BizException):
    return JSONResponse(
        status_code=400,
        content={"code": exc.code, "message": exc.message}
    )

@app.get("/business")
def business():
    raise BizException(5001, "业务异常")

十一、中间件(拦截所有请求)

import time
from fastapi import Request

# 相当于 Spring 的 Filter/Interceptor
@app.middleware("http")
async def add_process_time(request: Request, call_next):
    start = time.time()
    response = await call_next(request)       # 执行后续处理
    process_time = time.time() - start
    response.headers["X-Process-Time"] = str(process_time)
    print(f"{request.method} {request.url.path} 耗时 {process_time:.3f}s")
    return response

# CORS 跨域(前后端分离必配)
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],                      # 生产环境应指定具体域名
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

十二、文件上传与后台任务

from fastapi import UploadFile, File, BackgroundTasks

# 文件上传
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    content = await file.read()               # 读取内容
    with open(f"uploads/{file.filename}", "wb") as f:
        f.write(content)
    return {
        "filename": file.filename,
        "size": len(content),
        "content_type": file.content_type
    }

# 后台任务(请求返回后异步执行,如发邮件、写日志)
def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(message + "\n")

@app.post("/notify")
def notify(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"通知发给 {email}")
    return {"message": "已接收"}              # 立即返回,日志后台写

十三、数据库集成(SQLAlchemy)

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# 配置
engine = create_engine("mysql+pymysql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()

# 定义模型(相当于 Entity)
class UserModel(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# 依赖:获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db                              # 用 yield,请求结束自动关闭
    finally:
        db.close()

# 使用
@app.get("/db-users/{id}")
def get_user(id: int, db: Session = Depends(get_db)):
    user = db.query(UserModel).filter(UserModel.id == id).first()
    if not user:
        raise HTTPException(404, "用户不存在")
    return {"id": user.id, "name": user.name}

@app.post("/db-users")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
    user = UserModel(name=name, email=email)
    db.add(user)
    db.commit()
    db.refresh(user)
    return {"id": user.id}

十四、JWT 认证

from jose import jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

# 生成 token
def create_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "exp": datetime.utcnow() + timedelta(hours=2)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

# 验证 token 的依赖
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        return int(payload["sub"])
    except Exception:
        raise HTTPException(401, "无效的token")

# 登录
@app.post("/login")
def login(username: str, password: str):
    # 验证账密...
    token = create_token(user_id=1)
    return {"access_token": token, "token_type": "bearer"}

# 受保护接口
@app.get("/me")
def me(user_id: int = Depends(get_current_user)):
    return {"user_id": user_id}

十五、项目结构组织(生产级)

myproject/
├── app/
│   ├── main.py              # 入口,创建 app
│   ├── config.py           # 配置
│   ├── routers/            # 路由(按模块分)
│   │   ├── users.py
│   │   └── chat.py
│   ├── models/             # 数据库模型
│   ├── schemas/            # Pydantic 模型
│   ├── services/           # 业务逻辑
│   └── dependencies.py     # 公共依赖
├── requirements.txt
└── .env
# routers/users.py —— 用 APIRouter 拆分路由(相当于多个Controller)
from fastapi import APIRouter

router = APIRouter(prefix="/users", tags=["用户"])

@router.get("/")
def list_users():
    return []

@router.get("/{id}")
def get_user(id: int):
    return {"id": id}

# main.py —— 注册路由
from fastapi import FastAPI
from app.routers import users, chat

app = FastAPI()
app.include_router(users.router)
app.include_router(chat.router)

十六、配置管理

# config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    app_name: str = "AI服务"
    openai_api_key: str
    database_url: str
    debug: bool = False

    class Config:
        env_file = ".env"        # 自动读取 .env 文件

settings = Settings()

# .env 文件
# OPENAI_API_KEY=sk-xxx
# DATABASE_URL=mysql://...

# 使用
@app.get("/config")
def get_config():
    return {"app_name": settings.app_name}

十七、部署

# 开发
uvicorn main:app --reload

# 生产(多进程)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# 或用 gunicorn + uvicorn worker
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

完整 AI 服务示例(综合实战)

把上面的串起来,一个最小可用的 AI 对话服务:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import json, asyncio

app = FastAPI(title="AI对话服务")

app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# 请求模型
class ChatRequest(BaseModel):
    message: str
    session_id: str
    max_tokens: int = 1000

# 鉴权依赖
def verify_api_key(authorization: str = None):
    # 简化示例
    return {"user": "demo"}

# 流式对话接口
@app.post("/api/v1/chat")
async def chat(request: ChatRequest, user: dict = Depends(verify_api_key)):
    async def generate():
        # 这里替换成真实的 OpenAI 流式调用
        response = f"收到你的消息:{request.message}"
        for char in response:
            yield f"data: {json.dumps({'content': char}, ensure_ascii=False)}\n\n"
            await asyncio.sleep(0.05)
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# 健康检查
@app.get("/health")
def health():
    return {"status": "ok"}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐