用FastAPI调用DeepSeek API搭智能助手
用FastAPI调用DeepSeek API搭智能助手
文章信息
- 标题:用FastAPI调用DeepSeek API搭智能助手
- 字数:4200字
- 预估阅读时间:18分钟
- 难度:⭐⭐⭐☆☆
一、为什么选择DeepSeek?
2024年底至2025年初,DeepSeek-V3和DeepSeek-R1的发布彻底改变了大模型的价格格局。DeepSeek API的核心优势:
- 价格极低:DeepSeek-V3输入约$0.27/M tokens,仅为GPT-4o的1/9左右(数据来源:DeepSeek官方定价页,价格变动频繁,请查阅最新定价)
- 性能强劲:在MMLU(大规模多任务语言理解)、HumanEval(代码生成能力)等基准测试中与GPT-4o持平
- 国产可用:国内直接访问,无需代理,延迟低
- 兼容OpenAI格式:API接口与OpenAI完全兼容,切换成本为零
| 模型 | 输入价格($/M tokens) | 输出价格($/M tokens) | 上下文长度 |
|---|---|---|---|
| DeepSeek-V3 | ~0.27 | ~1.10 | 64K |
| DeepSeek-R1 | ~0.55 | ~2.19 | 64K |
| GPT-4o | ~2.50 | ~10.00 | 128K |
| GPT-4o-mini | ~0.15 | ~0.60 | 128K |
| Qwen-Max | ~2.00 | ~6.00 | 32K |
注意:以上为参考价格,大模型价格变动频繁,实际价格请查阅DeepSeek定价、OpenAI定价、通义千问定价。
选型建议:日常对话和代码生成用DeepSeek-V3,复杂推理用DeepSeek-R1,需要超长上下文才考虑GPT-4o。
二、环境配置
2.1 项目结构
fastapi-deepseek/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI入口
│ ├── config.py # 配置管理
│ ├── models.py # Pydantic模型
│ ├── services/
│ │ ├── __init__.py
│ │ └── deepseek.py # DeepSeek服务封装
│ └── routes/
│ ├── __init__.py
│ └── chat.py # 聊天路由
├── .env # 环境变量
├── .env.example # 环境变量示例
├── requirements.txt
└── pyproject.toml
2.2 安装依赖
# 创建项目
mkdir fastapi-deepseek && cd fastapi-deepseek
uv venv --python 3.12
source .venv/bin/activate
# 安装依赖
uv pip install fastapi uvicorn httpx pydantic-settings python-dotenv sse-starlette
# 验证安装
python -c "import httpx; print(f'httpx {httpx.__version__}')"
2.3 环境变量配置
创建.env文件:
# DeepSeek API配置
DEEPSEEK_API_KEY=sk-your-api-key-here
DEEPSEEK_BASE_URL=https://api.deepseek.com
DEEPSEEK_DEFAULT_MODEL=deepseek-chat
DEEPSEEK_DEFAULT_MAX_TOKENS=2048
DEEPSEEK_DEFAULT_TEMPERATURE=0.7
# 服务配置
APP_HOST=0.0.0.0
APP_PORT=8000
APP_DEBUG=true
2.4 配置管理
"""app/config.py - 配置管理"""
from pydantic_settings import BaseSettings
from functools import lru_cache
class DeepSeekSettings(BaseSettings):
"""DeepSeek API配置"""
api_key: str
base_url: str = "https://api.deepseek.com"
default_model: str = "deepseek-chat"
default_max_tokens: int = 2048
default_temperature: float = 0.7
model_config = {"env_prefix": "DEEPSEEK_", "env_file": ".env"}
class AppSettings(BaseSettings):
"""应用配置"""
host: str = "0.0.0.0"
port: int = 8000
debug: bool = False
model_config = {"env_prefix": "APP_", "env_file": ".env"}
@lru_cache
def get_deepseek_settings() -> DeepSeekSettings:
return DeepSeekSettings()
@lru_cache
def get_app_settings() -> AppSettings:
return AppSettings()
三、Pydantic模型定义
"""app/models.py - 请求/响应模型"""
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
class MessageRole(str, Enum):
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
class ChatMessage(BaseModel):
"""聊天消息"""
role: MessageRole
content: str
class ChatRequest(BaseModel):
"""聊天请求"""
messages: list[ChatMessage] = Field(
...,
description="对话消息列表",
min_length=1,
)
model: Optional[str] = Field(
None,
description="模型名称,默认使用配置中的模型",
)
max_tokens: Optional[int] = Field(
None,
ge=1,
le=8192,
description="最大生成token数",
)
temperature: Optional[float] = Field(
None,
ge=0.0,
le=2.0,
description="温度参数,控制随机性",
)
stream: bool = Field(
False,
description="是否使用流式输出",
)
class ChatResponse(BaseModel):
"""聊天响应"""
content: str
model: str
usage: dict # {"prompt_tokens": n, "completion_tokens": n, "total_tokens": n}
class StreamChunk(BaseModel):
"""流式响应块"""
delta: str
model: str
finished: bool = False
四、DeepSeek服务封装
4.1 为什么用httpx而不是openai SDK?
| 对比项 | httpx | openai SDK |
|---|---|---|
| 依赖大小 | ~200KB | ~2MB |
| 流式支持 | 原生SSE(Server-Sent Events) | 内置streaming |
| 多API兼容 | 需自己封装 | 仅限OpenAI |
| 底层控制 | 完全可控 | 黑盒封装 |
| 超时配置 | 灵活 | 需要hack |
结论:DeepSeek兼容OpenAI格式,用httpx直接调HTTP接口更轻量、更可控。如果后续要对接多种API,httpx的灵活性是关键。
4.2 完整服务实现
创建app/services/deepseek.py。服务类封装了 DeepSeek API 的同步调用和流式调用,使用 httpx.AsyncClient 复用连接池。
先定义类和同步调用方法:
import httpx
import json
from typing import AsyncGenerator
from app.config import get_deepseek_settings
from app.models import ChatMessage, ChatResponse, StreamChunk
class DeepSeekService:
def __init__(self):
settings = get_deepseek_settings()
self.api_key = settings.api_key
self.base_url = settings.base_url.rstrip("/")
self.default_model = settings.default_model
self.default_max_tokens = settings.default_max_tokens
self.default_temperature = settings.default_temperature
# 持久化 HTTP 客户端,复用 TCP 连接
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
timeout=httpx.Timeout(60.0, connect=10.0),
)
async def chat(
self, messages: list[ChatMessage],
model: str | None = None, max_tokens: int | None = None, temperature: float | None = None,
) -> ChatResponse:
payload = {
"model": model or self.default_model,
"messages": [m.model_dump() for m in messages],
"max_tokens": max_tokens if max_tokens is not None else self.default_max_tokens,
"temperature": temperature if temperature is not None else self.default_temperature,
}
response = await self.client.post("/v1/chat/completions", json=payload)
if response.status_code != 200:
raise Exception(f"DeepSeek API错误 [{response.status_code}]: {response.text}")
data = response.json()
return ChatResponse(
content=data["choices"][0]["message"]["content"],
model=data["model"],
usage=data.get("usage", {}),
)
流式调用使用 client.stream() 接收 SSE(Server-Sent Events)数据,逐块返回:
async def chat_stream(
self, messages: list[ChatMessage],
model: str | None = None, max_tokens: int | None = None, temperature: float | None = None,
) -> AsyncGenerator[StreamChunk, None]:
payload = {
"model": model or self.default_model,
"messages": [m.model_dump() for m in messages],
"max_tokens": max_tokens if max_tokens is not None else self.default_max_tokens,
"temperature": temperature if temperature is not None else self.default_temperature,
"stream": True,
}
async with self.client.stream("POST", "/v1/chat/completions", json=payload) as response:
if response.status_code != 200:
raise Exception(f"DeepSeek API错误 [{response.status_code}]: {(await response.aread()).decode()}")
async for line in response.aiter_lines():
if not line.startswith("data: "):
continue
data_str = line[6:]
if data_str.strip() == "[DONE]":
yield StreamChunk(delta="", model=model or self.default_model, finished=True)
break
try:
data = json.loads(data_str)
delta = data["choices"][0].get("delta", {})
yield StreamChunk(
delta=delta.get("content", ""),
model=data.get("model", self.default_model),
finished=data["choices"][0].get("finish_reason") == "stop",
)
except (json.JSONDecodeError, KeyError, IndexError):
continue
async def close(self):
await self.client.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.close()
_deepseek_service: DeepSeekService | None = None
async def get_deepseek_service() -> DeepSeekService:
global _deepseek_service
if _deepseek_service is None:
_deepseek_service = DeepSeekService()
return _deepseek_service
temperature if temperature is not None而不是temperature or 0.7:因为temperature=0.0是合法值(最确定性的输出),但0.0 or 0.7会返回0.7。
五、FastAPI路由集成
5.1 聊天路由
"""app/routes/chat.py - 聊天接口"""
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse
from app.models import ChatRequest, ChatResponse, StreamChunk
from app.services.deepseek import get_deepseek_service, DeepSeekService
import asyncio
router = APIRouter(prefix="/api/v1", tags=["聊天"])
@router.post("/chat", response_model=ChatResponse)
async def chat(
request: ChatRequest,
service: DeepSeekService = Depends(get_deepseek_service),
):
"""同步聊天接口"""
response = await service.chat(
messages=request.messages,
model=request.model,
max_tokens=request.max_tokens,
temperature=request.temperature,
)
return response
@router.post("/chat/stream")
async def chat_stream(
request: ChatRequest,
service: DeepSeekService = Depends(get_deepseek_service),
):
"""流式聊天接口(SSE)"""
async def event_generator():
try:
async for chunk in service.chat_stream(
messages=request.messages,
model=request.model,
max_tokens=request.max_tokens,
temperature=request.temperature,
):
yield {
"event": "message",
"data": chunk.model_dump_json(),
}
# 控制发送频率,避免客户端处理不过来
if not chunk.finished:
await asyncio.sleep(0.01)
yield {"event": "done", "data": ""}
except Exception as e:
yield {
"event": "error",
"data": str(e),
}
return EventSourceResponse(event_generator())
5.2 应用入口
创建app/main.py。lifespan 函数管理应用启动和关闭时的资源清理:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.routes import chat
from app.services.deepseek import _deepseek_service
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # 启动时无需初始化,服务懒加载
if _deepseek_service is not None:
await _deepseek_service.close() # 关闭时清理连接池
app = FastAPI(
title="FastAPI + DeepSeek 智能助手",
description="基于DeepSeek API的智能对话服务",
version="1.0.0",
lifespan=lifespan,
)
app.include_router(chat.router)
@app.get("/health")
async def health_check():
return {"status": "ok"}
启动命令:uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
5.3 启动服务
# 启动开发服务器
python -m app.main
# 或使用uvicorn直接启动(支持热重载)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
访问 http://localhost:8000/docs 查看Swagger文档。
六、多轮对话实现
6.1 会话管理
实际应用中需要维护对话历史。下面是一个基于内存的会话管理器:
"""app/services/session.py - 会话管理"""
from collections import defaultdict
from datetime import datetime
from app.models import ChatMessage, MessageRole
import asyncio
class SessionManager:
"""会话管理器(内存存储)"""
def __init__(self, max_history: int = 20, max_sessions: int = 1000):
self._sessions: dict[str, list[ChatMessage]] = defaultdict(list)
self._max_history = max_history # 每个会话最大消息数
self._max_sessions = max_sessions
self._lock = asyncio.Lock()
async def add_message(self, session_id: str, role: MessageRole, content: str):
"""添加消息到会话"""
async with self._lock:
if len(self._sessions) >= self._max_sessions:
# FIFO淘汰:删除最早的会话
oldest_key = next(iter(self._sessions))
del self._sessions[oldest_key]
self._sessions[session_id].append(
ChatMessage(role=role, content=content)
)
# 超过最大历史长度,保留system + 最近的消息
if len(self._sessions[session_id]) > self._max_history:
messages = self._sessions[session_id]
system_msgs = [m for m in messages if m.role == MessageRole.SYSTEM]
other_msgs = messages[-(self._max_history - len(system_msgs)):]
self._sessions[session_id] = system_msgs + other_msgs
async def get_messages(self, session_id: str) -> list[ChatMessage]:
"""获取会话历史"""
return self._sessions.get(session_id, [])
async def clear_session(self, session_id: str):
"""清空会话"""
self._sessions.pop(session_id, None)
# 全局单例
session_manager = SessionManager()
6.2 带会话的聊天接口
在无状态接口的基础上,增加一个会话管理接口。客户端只需传 content,服务端自动维护对话历史。
先定义请求/响应模型:
from pydantic import BaseModel
class SessionChatRequest(BaseModel):
session_id: str | None = None # 为空则自动创建新会话
content: str
model: str | None = None
max_tokens: int | None = None
temperature: float | None = None
class SessionChatResponse(BaseModel):
session_id: str
content: str
model: str
usage: dict
然后实现路由——核心流程是:添加用户消息 → 获取历史 → 调用 API → 保存回复:
from fastapi import APIRouter, Depends
from app.models import MessageRole
from app.services.deepseek import get_deepseek_service, DeepSeekService
from app.services.session import session_manager
import uuid
router = APIRouter(prefix="/api/v1", tags=["聊天"])
@router.post("/chat/session", response_model=SessionChatResponse)
async def chat_with_session(
request: SessionChatRequest,
service: DeepSeekService = Depends(get_deepseek_service),
):
session_id = request.session_id or str(uuid.uuid4())
await session_manager.add_message(session_id, MessageRole.USER, request.content)
messages = await session_manager.get_messages(session_id)
response = await service.chat(
messages=messages, model=request.model,
max_tokens=request.max_tokens, temperature=request.temperature,
)
await session_manager.add_message(session_id, MessageRole.ASSISTANT, response.content)
return SessionChatResponse(
session_id=session_id, content=response.content,
model=response.model, usage=response.usage,
)
session_id的作用:客户端首次请求不传session_id,服务端返回新创建的 ID。后续请求带上这个 ID,就能延续对话。
七、系统提示词设计
系统提示词是控制AI行为的核心手段。不同的系统提示词设计对输出质量影响巨大。
"""app/prompts.py - 系统提示词模板"""
from enum import Enum
class SystemPrompt(str, Enum):
"""预定义系统提示词"""
GENERAL_ASSISTANT = """你是一个有用的AI助手。请用简洁、准确的中文回答问题。
规则:
1. 如果不确定,直接说不知道,不要编造
2. 技术问题优先给出代码示例
3. 回答长度控制在500字以内,除非用户要求详细说明"""
CODE_ASSISTANT = """你是一个高级编程助手,精通Python、JavaScript、Go等主流语言。
规则:
1. 代码必须完整可运行,不要省略
2. 优先使用类型注解
3. 关键代码添加注释
4. 给出多种实现方案并对比优劣
5. 提及时间/空间复杂度"""
TRANSLATOR = """你是一个专业翻译,中英文互译。
规则:
1. 保持原文的语气和风格
2. 技术术语保留英文原文
3. 翻译自然流畅,不要直译"""
def build_system_prompt(
base_prompt: str,
context: str | None = None,
constraints: list[str] | None = None,
) -> str:
"""构建系统提示词"""
parts = [base_prompt]
if context:
parts.append(f"\n\n## 背景信息\n{context}")
if constraints:
constraint_text = "\n".join(f"- {c}" for c in constraints)
parts.append(f"\n\n## 额外约束\n{constraint_text}")
return "".join(parts)
使用示例:
from app.prompts import SystemPrompt, build_system_prompt
# 基础提示词
system_msg = ChatMessage(role=MessageRole.SYSTEM, content=SystemPrompt.CODE_ASSISTANT.value)
# 自定义提示词
custom_prompt = build_system_prompt(
base_prompt=SystemPrompt.GENERAL_ASSISTANT.value,
context="你正在帮助一个Python后端开发者调试FastAPI应用。",
constraints=["回答不超过300字", "优先使用async/await"],
)
八、价格优化策略
8.1 DeepSeek缓存命中
DeepSeek API支持前缀缓存(prefix caching)。当连续请求的system prompt相同时,API会自动缓存,被缓存部分的token按0.1倍计费。
"""app/services/deepseek.py - 添加缓存优化提示"""
class DeepSeekService:
async def chat_with_cache_hint(self, messages: list[ChatMessage], **kwargs) -> ChatResponse:
"""带缓存优化的调用"""
payload = {
"model": kwargs.get("model") or self.default_model,
"messages": [m.model_dump() for m in messages],
"max_tokens": kwargs.get("max_tokens") or self.default_max_tokens,
"temperature": kwargs.get("temperature") or self.default_temperature,
}
# 关键:确保system prompt始终在第一条,且完全一致
# 这样DeepSeek可以命中前缀缓存
response = await self.client.post("/v1/chat/completions", json=payload)
if response.status_code != 200:
raise Exception(f"API错误: {response.text}")
data = response.json()
usage = data.get("usage", {})
# 检查缓存命中情况
cached_tokens = usage.get("prompt_cache_hit_tokens", 0)
if cached_tokens > 0:
print(f"缓存命中 {cached_tokens} tokens,节省 {(cached_tokens / usage['prompt_tokens']):.0%}")
return ChatResponse(
content=data["choices"][0]["message"]["content"],
model=data["model"],
usage=usage,
)
8.2 模型降级策略
简单问题用便宜的模型,复杂问题用贵的模型:
"""app/services/model_selector.py - 模型选择器"""
from app.models import ChatMessage, MessageRole
class ModelSelector:
"""根据问题复杂度自动选择模型"""
# 复杂度关键词
COMPLEX_KEYWORDS = [
"架构设计", "系统设计", "性能优化", "分布式",
"算法", "递归", "动态规划", "机器学习",
"详细分析", "对比分析", "深入解析",
"refactor", "design pattern", "architecture",
]
def select_model(self, messages: list[ChatMessage]) -> str:
"""选择合适的模型"""
# 获取最后一条用户消息
user_msg = next(
(m for m in reversed(messages) if m.role == MessageRole.USER),
None,
)
if not user_msg:
return "deepseek-chat" # 默认V3
# 检查消息长度
if len(user_msg.content) > 1000:
return "deepseek-reasoner" # 长问题用R1
# 检查复杂度关键词
for keyword in self.COMPLEX_KEYWORDS:
if keyword.lower() in user_msg.content.lower():
return "deepseek-reasoner"
# 简单问题用mini模型
if len(user_msg.content) < 50:
return "deepseek-chat"
return "deepseek-chat"
九、前端调用示例
流式输出的前端调用:
// 前端流式调用示例
async function streamChat(messages) {
const response = await fetch('/api/v1/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages, stream: true }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的行
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const chunk = JSON.parse(line.slice(6));
if (chunk.delta) {
document.getElementById('output').textContent += chunk.delta; // 逐字输出
}
} catch (e) {
// 忽略解析错误
}
}
}
}
}
十、踩坑记录
坑1:SSE流式输出断连
现象:流式输出中途断开,前端收到EventSource的error事件。
原因:Nginx默认有proxy_buffering on,会缓冲SSE响应。60秒没有新数据时Nginx主动断开连接。
解决:
# Nginx配置
location /api/v1/chat/stream {
proxy_pass http://backend;
proxy_buffering off; # 关闭缓冲
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off; # 关闭分块传输编码
proxy_read_timeout 300s; # 延长超时
}
坑2:httpx连接池耗尽
现象:高并发时出现Connection pool is full错误。
原因:默认httpx连接池大小(max_connections=100)不够用。
解决:
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"Authorization": f"Bearer {self.api_key}",
},
timeout=httpx.Timeout(60.0, connect=10.0),
limits=httpx.Limits(
max_connections=200, # 增大连接池
max_keepalive_connections=50,
keepalive_expiry=30, # 30秒空闲后关闭
),
)
坑3:中文编码问题
现象:流式输出时中文出现乱码,尤其是多字节字符在chunk边界被截断。
原因:UTF-8中文是3字节,SSE按行切分时可能从中间截断。
解决:使用aiter_lines()而不是aiter_bytes(),httpx会自动处理行边界。如果用aiter_bytes(),需要在应用层处理编码:
# 错误做法
async for chunk in response.aiter_bytes():
text = chunk.decode("utf-8") # 可能截断多字节字符
# 正确做法
async for line in response.aiter_lines(): # 自动按行分割
if line.startswith("data: "):
data = json.loads(line[6:])
坑4:DeepSeek API返回429
现象:调用频率高时返回429 Too Many Requests。
原因:DeepSeek对免费用户和付费用户的限流策略不同。Tier 1账号限制为3 RPM(Requests Per Minute),Tier 2为10 RPM。
解决:
- 升级账号等级(充值后自动升级)
- 在应用层做请求队列和重试:
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry_error_callback=lambda x: None, # 重试耗尽后返回None
)
async def chat_with_retry(self, messages, **kwargs):
response = await self.client.post("/v1/chat/completions", json=payload)
response.raise_for_status()
return response.json()
十一、总结
本文从零搭建了一个完整的FastAPI + DeepSeek智能助手服务,涵盖了:
- API调用:使用httpx封装同步和流式两种调用方式
- 会话管理:内存级别的多轮对话支持
- 系统提示词:可复用的提示词模板设计
- 价格优化:利用前缀缓存和模型降级降低成本
- 生产实战:解决了SSE断连、连接池耗尽、中文编码等真实问题
下一步:第六篇将在这个基础上接入LangChain,构建RAG知识库,让AI能基于私有文档回答问题。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)