高并发优化:异步、缓存、限流、监控
高并发优化:异步、缓存、限流、监控
文章信息:标题《高并发优化:异步、缓存、限流、监控》| 字数:约4000字 | 预估阅读时间:20分钟
1. 性能优化的核心思路
FastAPI应用性能瓶颈通常来自:
- I/O阻塞:同步数据库、HTTP调用阻塞事件循环
- 重复计算:相同请求每次都计算
- 资源耗尽:连接池、内存、CPU被打满
优化原则:
- 异步优先:所有I/O操作用async
- 缓存复用:减少重复计算和数据库查询
- 限流保护:防止突发流量打垮服务
- 监控定位:用数据驱动优化决策
2. 异步最佳实践
2.1 异步vs同步的差异
同步写法(阻塞):
# 同步:请求1等待3秒,请求2也等待3秒,总共6秒
# 注意:同步路由用普通 def,FastAPI 会自动在线程池中执行,不会阻塞事件循环
@app.get("/sync")
def sync_handler(): # 不要用 async def + 同步 requests,否则会阻塞事件循环
result = requests.get("https://api.example.com/data") # 3秒
return result.json()
异步写法(非阻塞):
# 异步:请求1和请求2同时发起,总共3秒
@app.get("/async")
async def async_handler():
async with httpx.AsyncClient() as client:
result = await client.get("https://api.example.com/data") # 3秒
return result.json()
2.2 异步HTTP客户端
# utils/http_client.py
import httpx
from contextlib import asynccontextmanager
from typing import Optional
class HTTPClient:
"""全局异步HTTP客户端
注意:此单例模式仅在单进程(单个 worker)下有效。
如果使用 gunicorn --workers N 多进程部署,每个进程会持有独立的实例,
连接池不会跨进程共享。多进程场景下建议使用共享连接池(如通过 Redis 或外部代理)。
"""
_instance: Optional["HTTPClient"] = None
_client: Optional[httpx.AsyncClient] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
async def get_client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30.0
),
follow_redirects=True,
)
return self._client
async def close(self):
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
# 全局实例
http_client = HTTPClient()
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时
yield
# 关闭时
await http_client.close()
app = FastAPI(lifespan=lifespan)
@app.get("/ai-proxy")
async def proxy_to_ai(prompt: str):
client = await http_client.get_client()
response = await client.post(
"https://api.deepseek.com/v1/chat/completions",
json={"model": "deepseek-chat", "messages": [{"role": "user", "content": prompt}]}
)
return response.json()
2.3 异步数据库连接池
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool
# 生产环境:有限连接池
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10, # 常规连接数
max_overflow=20, # 允许额外创建
pool_timeout=30, # 获取连接超时
pool_pre_ping=True, # 使用前检查连接
pool_recycle=3600, # 1小时后回收连接
)
# 开发环境:无连接池(方便调试)
dev_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
poolclass=NullPool,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
2.4 并发任务处理
# 处理多个AI请求(并发)
async def process_multiple_queries(queries: List[str]) -> List[str]:
async with httpx.AsyncClient() as client:
tasks = [
call_deepseek(client, q) for q in queries
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r if isinstance(r, str) else f"Error: {r}" for r in results]
async def call_deepseek(client: httpx.AsyncClient, query: str) -> str:
response = await client.post(
"https://api.deepseek.com/v1/chat/completions",
headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}"},
json={
"model": "deepseek-chat",
"messages": [{"role": "user", "content": query}]
}
)
return response.json()["choices"][0]["message"]["content"]
# 限流并发(控制并发数)
async def limited_gather(tasks: List, max_concurrent: int = 5) -> List:
semaphore = asyncio.Semaphore(max_concurrent)
async def with_semaphore(task):
async with semaphore:
return await task
return await asyncio.gather(*[with_semaphore(t) for t in tasks])
3. Redis缓存
3.1 缓存层设计
# utils/cache.py
import redis.asyncio as redis
import json
from typing import Optional, Any
import hashlib
class CacheManager:
"""Redis缓存管理器"""
def __init__(self, url: str = "redis://localhost:6379/0"):
self.redis = redis.from_url(url, decode_responses=True)
self.default_ttl = 3600 # 1小时
async def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
value = await self.redis.get(key)
if value:
try:
return json.loads(value)
except json.JSONDecodeError:
return value
return None
async def set(self, key: str, value: Any, ttl: int = None) -> bool:
"""设置缓存"""
ttl = ttl or self.default_ttl
if isinstance(value, (dict, list)):
value = json.dumps(value)
return await self.redis.setex(key, ttl, value)
async def delete(self, key: str) -> bool:
"""删除缓存"""
return await self.redis.delete(key) > 0
async def exists(self, key: str) -> bool:
"""检查key是否存在"""
return await self.redis.exists(key) > 0
def make_key(self, prefix: str, *args) -> str:
"""生成缓存key"""
key_str = ":".join(str(a) for a in args)
hash_key = hashlib.md5(key_str.encode()).hexdigest()[:12]
return f"{prefix}:{hash_key}"
# 全局实例
cache = CacheManager()
3.2 缓存中间件
# middlewares/cache.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from utils.cache import cache
import json
import logging
logger = logging.getLogger(__name__)
class CacheMiddleware(BaseHTTPMiddleware):
"""自动缓存GET请求
注意:读取 body_iterator 后需要构造新的 Response 对象返回,
因为原始响应的 body_iterator 是一次性的,消费后无法再次读取。
这意味着原始响应的流式传输特性会丢失,整个响应体会被缓存到内存中。
对于大文件或 SSE 流式响应,应跳过缓存(见下方 status_code != 200 的分支)。
"""
async def dispatch(self, request: Request, call_next):
# 只缓存GET请求
if request.method != "GET":
return await call_next(request)
# 生成缓存key
cache_key = f"http:{request.url.path}:{request.url.query}"
# 尝试获取缓存
cached = await cache.get(cache_key)
if cached:
return Response(
content=json.dumps(cached) if isinstance(cached, dict) else cached,
media_type="application/json",
headers={"X-Cache": "HIT"}
)
# 执行请求
response = await call_next(request)
# 缓存响应(如果成功)
if response.status_code == 200:
body = b""
async for chunk in response.body_iterator:
body += chunk
try:
data = json.loads(body.decode())
await cache.set(cache_key, data, ttl=300) # 缓存5分钟
except Exception as e:
logger.warning(f"缓存响应解析失败: {e}")
return Response(
content=body,
status_code=response.status_code,
headers={**dict(response.headers), "X-Cache": "MISS"}
)
return response
3.3 会话存储
# utils/session.py
from typing import Optional, Dict, List
import json
class SessionManager:
"""基于Redis的会话管理"""
def __init__(self, redis_client, prefix: str = "session"):
self.redis = redis_client
self.prefix = prefix
def _key(self, session_id: str) -> str:
return f"{self.prefix}:{session_id}"
async def create(self, session_id: str, ttl: int = 86400) -> Dict:
"""创建会话"""
session_data = {"created_at": self._now()}
await self.redis.setex(
self._key(session_id),
ttl,
json.dumps(session_data)
)
return session_data
async def get(self, session_id: str) -> Optional[Dict]:
"""获取会话"""
data = await self.redis.get(self._key(session_id))
if data:
return json.loads(data)
return None
async def update(self, session_id: str, data: Dict) -> bool:
"""更新会话(保留原有 TTL)"""
key = self._key(session_id)
existing = await self.get(session_id)
if existing is None:
return False
existing.update(data)
# 先获取剩余 TTL,再用 setex 重新设置,避免 redis.set 丢失 TTL
ttl = await self.redis.ttl(key)
if ttl > 0:
await self.redis.setex(key, ttl, json.dumps(existing))
else:
# TTL 已过期或为 -1(无过期),直接 set
await self.redis.set(key, json.dumps(existing))
return True
async def delete(self, session_id: str) -> bool:
"""删除会话"""
return await self.redis.delete(self._key(session_id)) > 0
@staticmethod
def _now() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).isoformat()
4. 限流策略
4.1 使用slowapi
# main.py
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 初始化限流器(基于IP地址)
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# 全局限流中间件(示例:基于 IP 的滑动窗口限流)
from collections import defaultdict
import time
_request_log: dict[str, list[float]] = defaultdict(list)
GLOBAL_RATE_LIMIT = 100 # 每分钟最大请求数
GLOBAL_RATE_WINDOW = 60 # 窗口大小(秒)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# 某些路径不限流
if request.url.path in ["/health", "/docs", "/openapi.json"]:
return await call_next(request)
# 基于 IP 的滑动窗口限流
client_ip = request.client.host if request.client else "unknown"
now = time.time()
window_start = now - GLOBAL_RATE_WINDOW
# 清理过期记录
_request_log[client_ip] = [t for t in _request_log[client_ip] if t > window_start]
if len(_request_log[client_ip]) >= GLOBAL_RATE_LIMIT:
return Response(content="Too Many Requests", status_code=429)
_request_log[client_ip].append(now)
return await call_next(request)
# 端点限流装饰器
@app.get("/ai/chat")
@limiter.limit("10/minute") # 每分钟10次
async def ai_chat(request: Request, prompt: str):
# AI处理逻辑
pass
@app.get("/search")
@limiter.limit("30/minute") # 每分钟30次
async def search(request: Request, q: str):
# 搜索逻辑
pass
# 自定义限流规则(基于用户)
from slowapi import Limiter
from slowapi.util import get_remote_address
def get_user_identifier(request: Request) -> str:
"""基于API Key的限流"""
api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
if api_key:
return api_key
return get_remote_address(request)
user_limiter = Limiter(key_func=get_user_identifier)
@app.post("/ai/completion")
@user_limiter.limit("60/minute") # 每分钟60次
async def ai_completion(request: Request):
pass
4.2 令牌桶算法实现
# utils/ratelimit.py
import time
import asyncio
from typing import Dict
from functools import lru_cache
class TokenBucket:
"""令牌桶限流器"""
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate # 每秒补充令牌数
self.tokens = capacity
self.last_refill = time.time()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> bool:
"""尝试获取令牌,返回是否成功"""
async with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def _refill(self):
"""补充令牌"""
now = time.time()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
class MultiTierLimiter:
"""多层级限流器(支持用户/租户/全局)
注意:user_limiters 字典会随用户数无限增长。
生产环境建议使用 LRU 缓存淘汰不活跃的用户桶,或定时清理。
示例:使用 functools.lru_cache 或 cachetools.TTLCache 限制最大条目数。
"""
MAX_USER_LIMITERS = 10000 # 建议设置上限
def __init__(self):
self.user_limiters: Dict[str, TokenBucket] = {}
self.global_limiter = TokenBucket(capacity=1000, refill_rate=100) # 全局每秒1000请求
def get_user_bucket(self, user_id: str, tier: str = "free") -> TokenBucket:
"""获取用户的限流桶"""
if user_id not in self.user_limiters:
# 超过上限时清理最早的条目(简单 LRU 淘汰)
if len(self.user_limiters) >= self.MAX_USER_LIMITERS:
oldest_key = next(iter(self.user_limiters))
del self.user_limiters[oldest_key]
limits = {
"free": (10, 1), # 10个令牌,每秒补充1个
"pro": (100, 10), # 100个令牌,每秒补充10个
"enterprise": (1000, 100) # 1000个令牌,每秒补充100个
}
capacity, refill = limits.get(tier, limits["free"])
self.user_limiters[user_id] = TokenBucket(capacity, refill)
return self.user_limiters[user_id]
async def check(self, user_id: str, tier: str = "free") -> tuple[bool, str]:
"""检查是否允许请求"""
global_bucket = self.global_limiter
user_bucket = self.get_user_bucket(user_id, tier)
# 先检查全局
if not await global_bucket.acquire():
return False, "系统限流,请稍后再试"
# 再检查用户级
if not await user_bucket.acquire():
return False, f"您的{tier}套餐已达限流阈值"
return True, "允许"
4.3 AI API特殊限流
# utils/ai_ratelimit.py
import asyncio
import time
from collections import defaultdict
class AIAPIRateLimiter:
"""AI API特殊限流器(按模型和Token数)"""
def __init__(self):
self.limits = {
"deepseek-chat": {"requests": 60, "tokens": 1_000_000}, # RPM和TPM
"deepseek-coder": {"requests": 30, "tokens": 500_000},
"gpt-4o-mini": {"requests": 500, "tokens": 1_000_000},
}
self.request_times = defaultdict(list)
# token_counts 存储 (timestamp, token_count) 元组,便于按时间窗口清理
self.token_counts = defaultdict(list)
async def acquire(self, model: str, estimated_tokens: int = 0) -> bool:
"""检查是否可以调用AI API"""
now = time.time()
limit_config = self.limits.get(model, {"requests": 100, "tokens": 500_000})
# 清理过期记录(1分钟内)
self.request_times[model] = [t for t in self.request_times[model] if now - t < 60]
self.token_counts[model] = [(ts, tc) for ts, tc in self.token_counts[model] if now - ts < 60]
# 检查请求数
if len(self.request_times[model]) >= limit_config["requests"]:
return False
# 检查Token数
if estimated_tokens > 0:
recent_tokens = sum(tc for _, tc in self.token_counts[model])
if recent_tokens + estimated_tokens > limit_config["tokens"]:
return False
# 记录
self.request_times[model].append(now)
if estimated_tokens > 0:
self.token_counts[model].append((now, estimated_tokens))
return True
async def wait_and_acquire(self, model: str, estimated_tokens: int = 0, max_wait: int = 60) -> bool:
"""等待直到可以调用"""
start = time.time()
while time.time() - start < max_wait:
if await self.acquire(model, estimated_tokens):
return True
await asyncio.sleep(1)
return False
5. Prometheus + Grafana监控
5.1 Prometheus指标
# utils/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# 使用默认注册表(prometheus_client 内置的 REGISTRY),
# 这样 /metrics 端点直接调用 generate_latest() 即可输出所有指标。
# 请求计数
REQUEST_COUNT = Counter(
"fastapi_requests_total",
"Total requests",
["method", "endpoint", "status"],
)
# 请求延迟
REQUEST_LATENCY = Histogram(
"fastapi_request_duration_seconds",
"Request latency",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
)
# AI API调用计数
AI_API_CALLS = Counter(
"ai_api_calls_total",
"AI API calls",
["model", "status"],
)
# AI API延迟
AI_API_LATENCY = Histogram(
"ai_api_duration_seconds",
"AI API call duration",
["model"],
buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)
# 当前连接数
ACTIVE_CONNECTIONS = Gauge(
"fastapi_active_connections",
"Active connections",
)
# 缓存命中率
CACHE_HITS = Counter(
"cache_hits_total",
"Cache hits",
["cache_type"],
)
CACHE_MISSES = Counter(
"cache_misses_total",
"Cache misses",
["cache_type"],
)
5.2 FastAPI中间件集成
# middlewares/metrics.py
import time
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from utils.metrics import (
REQUEST_COUNT, REQUEST_LATENCY, ACTIVE_CONNECTIONS
)
class MetricsMiddleware(BaseHTTPMiddleware):
"""指标收集中间件"""
async def dispatch(self, request: Request, call_next):
ACTIVE_CONNECTIONS.inc()
start_time = time.time()
try:
response = await call_next(request)
status = response.status_code
except Exception as e:
status = 500
raise
finally:
ACTIVE_CONNECTIONS.dec()
duration = time.time() - start_time
# 记录指标
endpoint = request.url.path
method = request.method
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status=str(status)
).inc()
REQUEST_LATENCY.labels(
method=method,
endpoint=endpoint
).observe(duration)
return response
5.3 指标端点
# routers/metrics.py
from fastapi import APIRouter, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
router = APIRouter(prefix="/metrics", tags=["监控"])
@router.get("")
async def metrics():
"""Prometheus指标端点"""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
@router.get("/health")
async def health():
"""健康检查"""
return {"status": "healthy"}
5.4 Grafana仪表板配置
注意:以下 JSON 仅示意核心 panel 配置,不是完整的 Grafana 仪表板导入格式。
实际导入时需要补充id、gridPos、datasource、interval、fieldConfig等字段。
可参考 Grafana 官方文档 构建完整仪表板。
{
"dashboard": {
"title": "FastAPI应用监控",
"panels": [
{
"title": "QPS(每秒请求数)",
"targets": [
{
"expr": "rate(fastapi_requests_total[1m])",
"legendFormat": "{{endpoint}}"
}
]
},
{
"title": "P99延迟",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(fastapi_request_duration_seconds_bucket[5m]))",
"legendFormat": "P99"
}
]
},
{
"title": "错误率",
"targets": [
{
"expr": "rate(fastapi_requests_total{status=~'5..'}[5m]) / rate(fastapi_requests_total[5m])",
"legendFormat": "错误率"
}
]
},
{
"title": "AI API调用延迟",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(ai_api_duration_seconds_bucket[5m]))",
"legendFormat": "P95 - {{model}}"
}
]
}
]
}
}
6. 结构化日志
# utils/logging.py
import structlog
import logging
from typing import Any
# 配置structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()
# 请求日志中间件
async def log_request(request: Request, call_next):
logger.info(
"request_started",
method=request.method,
path=request.url.path,
client=request.client.host if request.client else "unknown",
)
response = await call_next(request)
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status=response.status_code,
)
return response
7. Locust性能测试
# tests/load_test.py
from locust import HttpUser, task, between, events
import random
class FastAPIUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""用户启动时获取认证 token"""
# 根据实际认证方式替换,例如 JWT 登录
response = self.client.post("/auth/login", json={
"username": "testuser",
"password": "testpass"
})
if response.status_code == 200:
token = response.json().get("access_token", "")
# 后续所有请求自动携带认证头
self.client.headers.update({"Authorization": f"Bearer {token}"})
# 如果使用 API Key 认证,可直接设置:
# self.client.headers.update({"Authorization": "Bearer YOUR_API_KEY"})
@task(3)
def chat_with_ai(self):
"""AI聊天接口(高频)"""
self.client.post(
"/ai/chat",
json={"prompt": f"什么是Python #{random.randint(1,100)}"},
)
@task(1)
def health_check(self):
"""健康检查(低频)"""
self.client.get("/health")
@task(2)
def search(self):
"""搜索接口(中频)"""
self.client.get(f"/search?q=python&limit=10")
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("性能测试开始")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("性能测试结束")
# 运行:locust -f tests/load_test.py --host=http://localhost:8000
8. 踩坑记录
坑1:async函数中使用同步库
问题:在async函数中用了requests(同步库),阻塞了整个事件循环。
解决:替换为httpx.AsyncClient或aiohttp。如果必须用同步库,用run_in_executor包装:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
async def sync_to_async():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, blocking_function)
return result
补充:
asyncio.get_event_loop()在 Python 3.10+ 已废弃,应使用asyncio.get_running_loop()。
更简洁的写法是直接使用asyncio.to_thread(blocking_function)(Python 3.9+)。
坑2:连接池配置导致"Too many connections"
问题:PostgreSQL默认max_connections=100,但4个FastAPI实例各开20个连接池就超标了。
解决:计算总连接数,合理分配:
# 每个实例
engine = create_async_engine(
DATABASE_URL,
pool_size=10, # 常规连接
max_overflow=5, # 额外连接
)
# 4个实例 = 10*4 + 5*4 = 60 连接,加上其他服务还有余量
坑3:Redis缓存雪崩
问题:大量缓存同时过期,瞬间大量请求打到数据库。
解决:添加随机延迟:
async def set_with_jitter(self, key: str, value: Any, base_ttl: int):
# 基础TTL + 随机0-10%延迟
import random
jitter = base_ttl * random.uniform(0, 0.1)
await self.set(key, value, ttl=int(base_ttl + jitter))
坑4:限流器在高并发下失效
问题:多个协程同时检查限流,都通过,导致超过限制。
解决:使用asyncio.Lock保证原子性:
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> bool:
async with self._lock:
# 检查和扣减必须是原子操作
...
坑5:Prometheus指标影响性能
问题:每个请求都更新指标,高并发下反而成为瓶颈。
解决:使用multiprocessing模式或减少指标维度:
# 不要给每个请求的每个参数都打标签
# 坏:endpoint=具体路径, method=具体方法, status=具体码
# 好:endpoint=/api/*, status=2xx/4xx/5xx
坑6:热重载时连接池泄漏
问题:用uvicorn --reload开发时,代码重载但连接池没清理。
解决:正确使用lifespan管理生命周期:
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化
yield
# 清理
await engine.dispose()
9. 总结
本文覆盖了FastAPI生产环境性能优化的完整方案:
- 异步优化:httpx异步客户端、异步数据库连接池、并发任务处理
- 缓存策略:Redis缓存层、自动缓存中间件、会话存储
- 限流保护:slowapi限流、令牌桶算法、AI API特殊限流
- 监控体系:Prometheus指标、Grafana仪表板、结构化日志
- 性能测试:Locust负载测试
关键指标:
- P99延迟 < 500ms
- 错误率 < 0.1%
- 缓存命中率 > 80%
监控关键指标:
- QPS(每秒请求数)
- P50/P95/P99延迟
- 错误率
- CPU/内存使用率
- AI API延迟和Token消耗
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)