高并发优化:异步、缓存、限流、监控

文章信息:标题《高并发优化:异步、缓存、限流、监控》| 字数:约4000字 | 预估阅读时间:20分钟


1. 性能优化的核心思路

FastAPI应用性能瓶颈通常来自:

  • I/O阻塞:同步数据库、HTTP调用阻塞事件循环
  • 重复计算:相同请求每次都计算
  • 资源耗尽:连接池、内存、CPU被打满

优化原则:

  1. 异步优先:所有I/O操作用async
  2. 缓存复用:减少重复计算和数据库查询
  3. 限流保护:防止突发流量打垮服务
  4. 监控定位:用数据驱动优化决策

2. 异步最佳实践

2.1 异步vs同步的差异

同步写法(阻塞):

# 同步:请求1等待3秒,请求2也等待3秒,总共6秒
# 注意:同步路由用普通 def,FastAPI 会自动在线程池中执行,不会阻塞事件循环
@app.get("/sync")
def sync_handler():  # 不要用 async def + 同步 requests,否则会阻塞事件循环
    result = requests.get("https://api.example.com/data")  # 3秒
    return result.json()

异步写法(非阻塞):

# 异步:请求1和请求2同时发起,总共3秒
@app.get("/async")
async def async_handler():
    async with httpx.AsyncClient() as client:
        result = await client.get("https://api.example.com/data")  # 3秒
        return result.json()

2.2 异步HTTP客户端

# utils/http_client.py
import httpx
from contextlib import asynccontextmanager
from typing import Optional

class HTTPClient:
    """全局异步HTTP客户端

    注意:此单例模式仅在单进程(单个 worker)下有效。
    如果使用 gunicorn --workers N 多进程部署,每个进程会持有独立的实例,
    连接池不会跨进程共享。多进程场景下建议使用共享连接池(如通过 Redis 或外部代理)。
    """

    _instance: Optional["HTTPClient"] = None
    _client: Optional[httpx.AsyncClient] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def get_client(self) -> httpx.AsyncClient:
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(30.0, connect=5.0),
                limits=httpx.Limits(
                    max_keepalive_connections=20,
                    max_connections=100,
                    keepalive_expiry=30.0
                ),
                follow_redirects=True,
            )
        return self._client

    async def close(self):
        if self._client and not self._client.is_closed:
            await self._client.aclose()
            self._client = None

# 全局实例
http_client = HTTPClient()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时
    yield
    # 关闭时
    await http_client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/ai-proxy")
async def proxy_to_ai(prompt: str):
    client = await http_client.get_client()
    response = await client.post(
        "https://api.deepseek.com/v1/chat/completions",
        json={"model": "deepseek-chat", "messages": [{"role": "user", "content": prompt}]}
    )
    return response.json()

2.3 异步数据库连接池

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool

# 生产环境:有限连接池
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,           # 常规连接数
    max_overflow=20,        # 允许额外创建
    pool_timeout=30,        # 获取连接超时
    pool_pre_ping=True,     # 使用前检查连接
    pool_recycle=3600,      # 1小时后回收连接
)

# 开发环境:无连接池(方便调试)
dev_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    poolclass=NullPool,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

2.4 并发任务处理

# 处理多个AI请求(并发)
async def process_multiple_queries(queries: List[str]) -> List[str]:
    async with httpx.AsyncClient() as client:
        tasks = [
            call_deepseek(client, q) for q in queries
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [r if isinstance(r, str) else f"Error: {r}" for r in results]

async def call_deepseek(client: httpx.AsyncClient, query: str) -> str:
    response = await client.post(
        "https://api.deepseek.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}"},
        json={
            "model": "deepseek-chat",
            "messages": [{"role": "user", "content": query}]
        }
    )
    return response.json()["choices"][0]["message"]["content"]

# 限流并发(控制并发数)
async def limited_gather(tasks: List, max_concurrent: int = 5) -> List:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def with_semaphore(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*[with_semaphore(t) for t in tasks])

3. Redis缓存

3.1 缓存层设计

# utils/cache.py
import redis.asyncio as redis
import json
from typing import Optional, Any
import hashlib

class CacheManager:
    """Redis缓存管理器"""

    def __init__(self, url: str = "redis://localhost:6379/0"):
        self.redis = redis.from_url(url, decode_responses=True)
        self.default_ttl = 3600  # 1小时

    async def get(self, key: str) -> Optional[Any]:
        """获取缓存"""
        value = await self.redis.get(key)
        if value:
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return value
        return None

    async def set(self, key: str, value: Any, ttl: int = None) -> bool:
        """设置缓存"""
        ttl = ttl or self.default_ttl
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        return await self.redis.setex(key, ttl, value)

    async def delete(self, key: str) -> bool:
        """删除缓存"""
        return await self.redis.delete(key) > 0

    async def exists(self, key: str) -> bool:
        """检查key是否存在"""
        return await self.redis.exists(key) > 0

    def make_key(self, prefix: str, *args) -> str:
        """生成缓存key"""
        key_str = ":".join(str(a) for a in args)
        hash_key = hashlib.md5(key_str.encode()).hexdigest()[:12]
        return f"{prefix}:{hash_key}"

# 全局实例
cache = CacheManager()

3.2 缓存中间件

# middlewares/cache.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from utils.cache import cache
import json
import logging

logger = logging.getLogger(__name__)

class CacheMiddleware(BaseHTTPMiddleware):
    """自动缓存GET请求

    注意:读取 body_iterator 后需要构造新的 Response 对象返回,
    因为原始响应的 body_iterator 是一次性的,消费后无法再次读取。
    这意味着原始响应的流式传输特性会丢失,整个响应体会被缓存到内存中。
    对于大文件或 SSE 流式响应,应跳过缓存(见下方 status_code != 200 的分支)。
    """

    async def dispatch(self, request: Request, call_next):
        # 只缓存GET请求
        if request.method != "GET":
            return await call_next(request)

        # 生成缓存key
        cache_key = f"http:{request.url.path}:{request.url.query}"

        # 尝试获取缓存
        cached = await cache.get(cache_key)
        if cached:
            return Response(
                content=json.dumps(cached) if isinstance(cached, dict) else cached,
                media_type="application/json",
                headers={"X-Cache": "HIT"}
            )

        # 执行请求
        response = await call_next(request)

        # 缓存响应(如果成功)
        if response.status_code == 200:
            body = b""
            async for chunk in response.body_iterator:
                body += chunk
            try:
                data = json.loads(body.decode())
                await cache.set(cache_key, data, ttl=300)  # 缓存5分钟
            except Exception as e:
                logger.warning(f"缓存响应解析失败: {e}")

            return Response(
                content=body,
                status_code=response.status_code,
                headers={**dict(response.headers), "X-Cache": "MISS"}
            )

        return response

3.3 会话存储

# utils/session.py
from typing import Optional, Dict, List
import json

class SessionManager:
    """基于Redis的会话管理"""

    def __init__(self, redis_client, prefix: str = "session"):
        self.redis = redis_client
        self.prefix = prefix

    def _key(self, session_id: str) -> str:
        return f"{self.prefix}:{session_id}"

    async def create(self, session_id: str, ttl: int = 86400) -> Dict:
        """创建会话"""
        session_data = {"created_at": self._now()}
        await self.redis.setex(
            self._key(session_id),
            ttl,
            json.dumps(session_data)
        )
        return session_data

    async def get(self, session_id: str) -> Optional[Dict]:
        """获取会话"""
        data = await self.redis.get(self._key(session_id))
        if data:
            return json.loads(data)
        return None

    async def update(self, session_id: str, data: Dict) -> bool:
        """更新会话(保留原有 TTL)"""
        key = self._key(session_id)
        existing = await self.get(session_id)
        if existing is None:
            return False
        existing.update(data)
        # 先获取剩余 TTL,再用 setex 重新设置,避免 redis.set 丢失 TTL
        ttl = await self.redis.ttl(key)
        if ttl > 0:
            await self.redis.setex(key, ttl, json.dumps(existing))
        else:
            # TTL 已过期或为 -1(无过期),直接 set
            await self.redis.set(key, json.dumps(existing))
        return True

    async def delete(self, session_id: str) -> bool:
        """删除会话"""
        return await self.redis.delete(self._key(session_id)) > 0

    @staticmethod
    def _now() -> str:
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()

4. 限流策略

4.1 使用slowapi

# main.py
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# 初始化限流器(基于IP地址)
limiter = Limiter(key_func=get_remote_address)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# 全局限流中间件(示例:基于 IP 的滑动窗口限流)
from collections import defaultdict
import time

_request_log: dict[str, list[float]] = defaultdict(list)
GLOBAL_RATE_LIMIT = 100  # 每分钟最大请求数
GLOBAL_RATE_WINDOW = 60  # 窗口大小(秒)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # 某些路径不限流
    if request.url.path in ["/health", "/docs", "/openapi.json"]:
        return await call_next(request)

    # 基于 IP 的滑动窗口限流
    client_ip = request.client.host if request.client else "unknown"
    now = time.time()
    window_start = now - GLOBAL_RATE_WINDOW

    # 清理过期记录
    _request_log[client_ip] = [t for t in _request_log[client_ip] if t > window_start]

    if len(_request_log[client_ip]) >= GLOBAL_RATE_LIMIT:
        return Response(content="Too Many Requests", status_code=429)

    _request_log[client_ip].append(now)
    return await call_next(request)

# 端点限流装饰器
@app.get("/ai/chat")
@limiter.limit("10/minute")  # 每分钟10次
async def ai_chat(request: Request, prompt: str):
    # AI处理逻辑
    pass

@app.get("/search")
@limiter.limit("30/minute")  # 每分钟30次
async def search(request: Request, q: str):
    # 搜索逻辑
    pass

# 自定义限流规则(基于用户)
from slowapi import Limiter
from slowapi.util import get_remote_address

def get_user_identifier(request: Request) -> str:
    """基于API Key的限流"""
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if api_key:
        return api_key
    return get_remote_address(request)

user_limiter = Limiter(key_func=get_user_identifier)

@app.post("/ai/completion")
@user_limiter.limit("60/minute")  # 每分钟60次
async def ai_completion(request: Request):
    pass

4.2 令牌桶算法实现

# utils/ratelimit.py
import time
import asyncio
from typing import Dict
from functools import lru_cache

class TokenBucket:
    """令牌桶限流器"""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # 每秒补充令牌数
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        """尝试获取令牌,返回是否成功"""
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        """补充令牌"""
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

class MultiTierLimiter:
    """多层级限流器(支持用户/租户/全局)

    注意:user_limiters 字典会随用户数无限增长。
    生产环境建议使用 LRU 缓存淘汰不活跃的用户桶,或定时清理。
    示例:使用 functools.lru_cache 或 cachetools.TTLCache 限制最大条目数。
    """

    MAX_USER_LIMITERS = 10000  # 建议设置上限

    def __init__(self):
        self.user_limiters: Dict[str, TokenBucket] = {}
        self.global_limiter = TokenBucket(capacity=1000, refill_rate=100)  # 全局每秒1000请求

    def get_user_bucket(self, user_id: str, tier: str = "free") -> TokenBucket:
        """获取用户的限流桶"""
        if user_id not in self.user_limiters:
            # 超过上限时清理最早的条目(简单 LRU 淘汰)
            if len(self.user_limiters) >= self.MAX_USER_LIMITERS:
                oldest_key = next(iter(self.user_limiters))
                del self.user_limiters[oldest_key]
            limits = {
                "free": (10, 1),      # 10个令牌,每秒补充1个
                "pro": (100, 10),     # 100个令牌,每秒补充10个
                "enterprise": (1000, 100)  # 1000个令牌,每秒补充100个
            }
            capacity, refill = limits.get(tier, limits["free"])
            self.user_limiters[user_id] = TokenBucket(capacity, refill)
        return self.user_limiters[user_id]

    async def check(self, user_id: str, tier: str = "free") -> tuple[bool, str]:
        """检查是否允许请求"""
        global_bucket = self.global_limiter
        user_bucket = self.get_user_bucket(user_id, tier)

        # 先检查全局
        if not await global_bucket.acquire():
            return False, "系统限流,请稍后再试"

        # 再检查用户级
        if not await user_bucket.acquire():
            return False, f"您的{tier}套餐已达限流阈值"

        return True, "允许"

4.3 AI API特殊限流

# utils/ai_ratelimit.py
import asyncio
import time
from collections import defaultdict

class AIAPIRateLimiter:
    """AI API特殊限流器(按模型和Token数)"""

    def __init__(self):
        self.limits = {
            "deepseek-chat": {"requests": 60, "tokens": 1_000_000},  # RPM和TPM
            "deepseek-coder": {"requests": 30, "tokens": 500_000},
            "gpt-4o-mini": {"requests": 500, "tokens": 1_000_000},
        }
        self.request_times = defaultdict(list)
        # token_counts 存储 (timestamp, token_count) 元组,便于按时间窗口清理
        self.token_counts = defaultdict(list)

    async def acquire(self, model: str, estimated_tokens: int = 0) -> bool:
        """检查是否可以调用AI API"""
        now = time.time()
        limit_config = self.limits.get(model, {"requests": 100, "tokens": 500_000})

        # 清理过期记录(1分钟内)
        self.request_times[model] = [t for t in self.request_times[model] if now - t < 60]
        self.token_counts[model] = [(ts, tc) for ts, tc in self.token_counts[model] if now - ts < 60]

        # 检查请求数
        if len(self.request_times[model]) >= limit_config["requests"]:
            return False

        # 检查Token数
        if estimated_tokens > 0:
            recent_tokens = sum(tc for _, tc in self.token_counts[model])
            if recent_tokens + estimated_tokens > limit_config["tokens"]:
                return False

        # 记录
        self.request_times[model].append(now)
        if estimated_tokens > 0:
            self.token_counts[model].append((now, estimated_tokens))

        return True

    async def wait_and_acquire(self, model: str, estimated_tokens: int = 0, max_wait: int = 60) -> bool:
        """等待直到可以调用"""
        start = time.time()
        while time.time() - start < max_wait:
            if await self.acquire(model, estimated_tokens):
                return True
            await asyncio.sleep(1)
        return False

5. Prometheus + Grafana监控

5.1 Prometheus指标

# utils/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# 使用默认注册表(prometheus_client 内置的 REGISTRY),
# 这样 /metrics 端点直接调用 generate_latest() 即可输出所有指标。

# 请求计数
REQUEST_COUNT = Counter(
    "fastapi_requests_total",
    "Total requests",
    ["method", "endpoint", "status"],
)

# 请求延迟
REQUEST_LATENCY = Histogram(
    "fastapi_request_duration_seconds",
    "Request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
)

# AI API调用计数
AI_API_CALLS = Counter(
    "ai_api_calls_total",
    "AI API calls",
    ["model", "status"],
)

# AI API延迟
AI_API_LATENCY = Histogram(
    "ai_api_duration_seconds",
    "AI API call duration",
    ["model"],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

# 当前连接数
ACTIVE_CONNECTIONS = Gauge(
    "fastapi_active_connections",
    "Active connections",
)

# 缓存命中率
CACHE_HITS = Counter(
    "cache_hits_total",
    "Cache hits",
    ["cache_type"],
)

CACHE_MISSES = Counter(
    "cache_misses_total",
    "Cache misses",
    ["cache_type"],
)

5.2 FastAPI中间件集成

# middlewares/metrics.py
import time
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from utils.metrics import (
    REQUEST_COUNT, REQUEST_LATENCY, ACTIVE_CONNECTIONS
)

class MetricsMiddleware(BaseHTTPMiddleware):
    """指标收集中间件"""

    async def dispatch(self, request: Request, call_next):
        ACTIVE_CONNECTIONS.inc()
        start_time = time.time()

        try:
            response = await call_next(request)
            status = response.status_code
        except Exception as e:
            status = 500
            raise
        finally:
            ACTIVE_CONNECTIONS.dec()
            duration = time.time() - start_time

            # 记录指标
            endpoint = request.url.path
            method = request.method

            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=str(status)
            ).inc()

            REQUEST_LATENCY.labels(
                method=method,
                endpoint=endpoint
            ).observe(duration)

        return response

5.3 指标端点

# routers/metrics.py
from fastapi import APIRouter, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

router = APIRouter(prefix="/metrics", tags=["监控"])

@router.get("")
async def metrics():
    """Prometheus指标端点"""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

@router.get("/health")
async def health():
    """健康检查"""
    return {"status": "healthy"}

5.4 Grafana仪表板配置

注意:以下 JSON 仅示意核心 panel 配置,不是完整的 Grafana 仪表板导入格式。
实际导入时需要补充 idgridPosdatasourceintervalfieldConfig 等字段。
可参考 Grafana 官方文档 构建完整仪表板。

{
  "dashboard": {
    "title": "FastAPI应用监控",
    "panels": [
      {
        "title": "QPS(每秒请求数)",
        "targets": [
          {
            "expr": "rate(fastapi_requests_total[1m])",
            "legendFormat": "{{endpoint}}"
          }
        ]
      },
      {
        "title": "P99延迟",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(fastapi_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "错误率",
        "targets": [
          {
            "expr": "rate(fastapi_requests_total{status=~'5..'}[5m]) / rate(fastapi_requests_total[5m])",
            "legendFormat": "错误率"
          }
        ]
      },
      {
        "title": "AI API调用延迟",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(ai_api_duration_seconds_bucket[5m]))",
            "legendFormat": "P95 - {{model}}"
          }
        ]
      }
    ]
  }
}

6. 结构化日志

# utils/logging.py
import structlog
import logging
from typing import Any

# 配置structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

# 请求日志中间件
async def log_request(request: Request, call_next):
    logger.info(
        "request_started",
        method=request.method,
        path=request.url.path,
        client=request.client.host if request.client else "unknown",
    )

    response = await call_next(request)

    logger.info(
        "request_completed",
        method=request.method,
        path=request.url.path,
        status=response.status_code,
    )

    return response

7. Locust性能测试

# tests/load_test.py
from locust import HttpUser, task, between, events
import random

class FastAPIUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        """用户启动时获取认证 token"""
        # 根据实际认证方式替换,例如 JWT 登录
        response = self.client.post("/auth/login", json={
            "username": "testuser",
            "password": "testpass"
        })
        if response.status_code == 200:
            token = response.json().get("access_token", "")
            # 后续所有请求自动携带认证头
            self.client.headers.update({"Authorization": f"Bearer {token}"})
        # 如果使用 API Key 认证,可直接设置:
        # self.client.headers.update({"Authorization": "Bearer YOUR_API_KEY"})

    @task(3)
    def chat_with_ai(self):
        """AI聊天接口(高频)"""
        self.client.post(
            "/ai/chat",
            json={"prompt": f"什么是Python #{random.randint(1,100)}"},
        )

    @task(1)
    def health_check(self):
        """健康检查(低频)"""
        self.client.get("/health")

    @task(2)
    def search(self):
        """搜索接口(中频)"""
        self.client.get(f"/search?q=python&limit=10")

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("性能测试开始")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("性能测试结束")

# 运行:locust -f tests/load_test.py --host=http://localhost:8000

8. 踩坑记录

坑1:async函数中使用同步库

问题:在async函数中用了requests(同步库),阻塞了整个事件循环。

解决:替换为httpx.AsyncClientaiohttp。如果必须用同步库,用run_in_executor包装:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def sync_to_async():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_function)
    return result

补充asyncio.get_event_loop() 在 Python 3.10+ 已废弃,应使用 asyncio.get_running_loop()
更简洁的写法是直接使用 asyncio.to_thread(blocking_function)(Python 3.9+)。

坑2:连接池配置导致"Too many connections"

问题:PostgreSQL默认max_connections=100,但4个FastAPI实例各开20个连接池就超标了。

解决:计算总连接数,合理分配:

# 每个实例
engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,      # 常规连接
    max_overflow=5,    # 额外连接
)
# 4个实例 = 10*4 + 5*4 = 60 连接,加上其他服务还有余量

坑3:Redis缓存雪崩

问题:大量缓存同时过期,瞬间大量请求打到数据库。

解决:添加随机延迟:

async def set_with_jitter(self, key: str, value: Any, base_ttl: int):
    # 基础TTL + 随机0-10%延迟
    import random
    jitter = base_ttl * random.uniform(0, 0.1)
    await self.set(key, value, ttl=int(base_ttl + jitter))

坑4:限流器在高并发下失效

问题:多个协程同时检查限流,都通过,导致超过限制。

解决:使用asyncio.Lock保证原子性:

self._lock = asyncio.Lock()

async def acquire(self, tokens: int = 1) -> bool:
    async with self._lock:
        # 检查和扣减必须是原子操作
        ...

坑5:Prometheus指标影响性能

问题:每个请求都更新指标,高并发下反而成为瓶颈。

解决:使用multiprocessing模式或减少指标维度:

# 不要给每个请求的每个参数都打标签
# 坏:endpoint=具体路径, method=具体方法, status=具体码
# 好:endpoint=/api/*, status=2xx/4xx/5xx

坑6:热重载时连接池泄漏

问题:用uvicorn --reload开发时,代码重载但连接池没清理。

解决:正确使用lifespan管理生命周期:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 初始化
    yield
    # 清理
    await engine.dispose()

9. 总结

本文覆盖了FastAPI生产环境性能优化的完整方案:

  1. 异步优化:httpx异步客户端、异步数据库连接池、并发任务处理
  2. 缓存策略:Redis缓存层、自动缓存中间件、会话存储
  3. 限流保护:slowapi限流、令牌桶算法、AI API特殊限流
  4. 监控体系:Prometheus指标、Grafana仪表板、结构化日志
  5. 性能测试:Locust负载测试

关键指标

  • P99延迟 < 500ms
  • 错误率 < 0.1%
  • 缓存命中率 > 80%

监控关键指标

  • QPS(每秒请求数)
  • P50/P95/P99延迟
  • 错误率
  • CPU/内存使用率
  • AI API延迟和Token消耗
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐