[vllm]evalscope 压测原理
# EvalScope Benchmark 源码学习记录
> 说明:以下内容整理自本次对话,按“问题 → 回答”的形式汇总,方便后续查阅。
> 主题主要围绕:
>
> - `benchmark.py` 主流程
> - `asyncio` 中 `async / await / yield / Task / Queue / Event`
> - `aiohttp.ClientSession / TCPConnector`
> - `ApiPluginBase`
> - 背压、并发、内存占用、任务收尾机制
---
# 目录
1. [benchmark 主流程源码分析](#1-benchmark-主流程源码分析)
2. [时序图 / 架构图](#2-时序图--架构图)
3. [限流、并发、背压图修正版](#3-限流并发背压图修正版)
4. [async yield await 执行原理](#4-async-yield-await-执行原理)
5. [这段代码里有几个协程](#5-这段代码里有几个协程)
6. [内部 generator 是不是独立协程](#6-内部-generator-是不是独立协程)
7. [协程之间的交互图](#7-协程之间的交互图)
8. [rate 控制会不会不准](#8-rate-控制会不会不准)
9. [AioHttpClient 源码原理](#9-aiohttpclient-源码原理)
10. [ApiPluginBase 有什么作用](#10-apipluginbase-有什么作用)
11. [为什么不为每个请求新建 ClientSession](#11-为什么不为每个请求新建-clientsession)
12. [TCPConnector、args.parallel、semaphore 的整体关系](#12-tcpconnectorargsparallelsemaphore-的整体关系)
13. [benchmark.py 中 in_flight 的作用](#13-benchmarkpy-中-in_flight-的作用)
14. [Task 对象是什么,为什么占内存](#14-task-对象是什么为什么占内存)
15. [benchmark_data_queue 和 data_process_completed_event 是干嘛的](#15-benchmark_data_queue-和-data_process_completed_event-是干嘛的)
16. [为什么 queue.put 要 await](#16-为什么-queueput-要-await)
17. [task_done 和 join 的原理](#17-task_done-和-join-的原理)
18. [wait_for(queue.get(), timeout=0.1) 的用法](#18-wait_forqueueget-timeout01-的用法)
19. [这个过程里到底哪些东西占内存](#19-这个过程里到底哪些东西占内存)
---
# 1. benchmark 主流程源码分析
## 问题
详细分析一下这段 benchmark 源码。
## 回答
下面按**整体流程 → 关键函数 → 并发/限流设计 → 数据流 → 潜在问题与改进点**来详细分析。
### 一、整体在做什么
这是一个异步 benchmark 压测框架核心流程,目标是:
1. 根据用户参数构造请求
- 单 prompt 重复压测
- 或从 dataset 中生成消息压测
2. 以一定并发/速率把请求发给目标 API
3. 收集每个请求的 benchmark 数据
4. 实时写入 SQLite 数据库
5. 实时更新聚合指标并打印日志
6. 最后生成 summary 结果
整体是一个典型的**生产者-消费者模型**:
- 生产者
- `get_requests()`:生成请求
- `send_request()`:真正把请求发出去并把结果放入队列
- 消费者
- `statistic_benchmark_metric()`:从队列拿结果,更新 metrics,写 DB,输出日志
主控制逻辑是 `benchmark()`。
---
### 二、全局变量
```python
logger = get_logger()
data_process_completed_event = asyncio.Event()
logger:日志对象data_process_completed_event:异步事件,表示“发送侧是否已经彻底结束”
消费者退出条件:
while not (data_process_completed_event.is_set() and benchmark_data_queue.empty()):
只有当:
- 生产结束
- 队列也空了
消费者才退出。
三、get_requests():请求生成器
async def get_requests(args, api_plugin) -> AsyncGenerator[dict, None]:
这是一个异步生成器,负责源源不断产出 request。
内部有两种模式:
1. _generate_from_prompt()
load_prompt(args.prompt)- 如果
apply_chat_template=True,包装成 chat message api_plugin.build_request(messages)生成请求体- 重复
args.number次 yield
特点:
- prompt 模式里
request只 build 一次,然后重复 yield - 如果后续有人修改这个 request,就有共享对象风险
2. _generate_from_dataset()
DatasetRegistry.get_class(args.dataset)(args)获取数据集处理类build_messages()产生 messages- 先把 messages 读入
dataset_messages - 然后循环使用这些 messages,直到产出
args.number个 request
伪逻辑:
while count < args.number:
messages = dataset_messages[dataset_index]
request = api_plugin.build_request(messages)
if request is not None:
yield request
count += 1
dataset_index = (dataset_index + 1) % num_messages
潜在问题:
- 如果所有
build_request(messages)都返回None,会死循环
3. rate limit
async for request in generator:
yield request
if args.rate != -1:
interval = np.random.exponential(1.0 / args.rate)
await asyncio.sleep(interval)
这表示:
yield request把一个 request 交给调用方- 然后等待一个指数分布间隔
- 用于模拟泊松流量到达过程
即:
rate=-1:不限制速率rate>0:按平均到达率限制请求注入速度
四、send_request():发送一个请求
async def send_request(semaphore, request, benchmark_data_queue, args, client):
逻辑:
async with semaphore:
benchmark_data = await client.post(request)
benchmark_data.update_gpu_usage()
await benchmark_data_queue.put(benchmark_data)
作用:
- 通过
semaphore限制并发 - 调用共享
client.post() - 更新 GPU 使用信息
- 把结果放入队列
五、statistic_benchmark_metric():消费者与统计核心
async def statistic_benchmark_metric(benchmark_data_queue, args, api_plugin):
做的事:
- 初始化
BenchmarkMetrics - 打开 SQLite 连接
- 不断从
benchmark_data_queue取数据 metrics.update_metrics(...)insert_benchmark_data(...)- 每隔
db_commit_interval提交数据库 - 输出日志 / 可视化
- 最后返回
(metrics, result_db_path)
关键退出条件:
while not (data_process_completed_event.is_set() and benchmark_data_queue.empty()):
取队列时:
benchmark_data = await asyncio.wait_for(benchmark_data_queue.get(), timeout=0.1)
表示:
- 最多等 0.1 秒
- 没数据就 timeout
- 然后继续循环检查是否该退出
这是“超时轮询 + Event”式退出机制。
六、connect_test()
压测前做连通性检查:
if is_embedding_or_rerank_api(args.api):
return
if args.no_test_connection:
return
if not await test_connection(args, api_plugin):
raise TimeoutError('Test connection failed')
作用:
- 提前发现地址、鉴权、模型配置问题
七、benchmark():总控函数
主要流程:
- 获取 API plugin
- 创建
benchmark_data_queue - 清空
data_process_completed_event - 执行连接测试
- 创建共享
AioHttpClient - 启动
statistic_benchmark_metric后台任务 - 创建
semaphore - 遍历
get_requests(),不断创建send_requesttask - 用
in_flight限制已创建未完成的 task 数量 - 等所有请求任务完成
await benchmark_data_queue.join()data_process_completed_event.set()- 等统计任务退出
summary_result(...)- 返回结果
八、并发控制设计
这段代码里有四层控制:
1. rate
控制请求生成速度
2. Semaphore(args.parallel)
控制同时真正执行 HTTP 请求的数量
3. max_in_flight
控制已创建未完成 task 数量,防止 OOM
4. Queue(maxsize=...)
控制结果积压,形成背压
九、主要优点
- 插件化强:API / Dataset 可扩展
- 请求发送与统计持久化解耦
- 流式写 SQLite,避免结果全堆内存
- 有背压控制,防止 OOM
- 共享 HTTP client,减少连接开销
- 支持泊松过程限流
十、潜在问题
- dataset 模式可能死循环(所有 request 都是
None) - prompt 模式复用同一个 request 可能有共享修改风险
- 消费者退出机制是轮询式,不如 sentinel 优雅
data_process_completed_event是全局变量,不适合同进程多 benchmark 并发- SQLite commit 放在线程里,虽然能用,但要注意线程模型
args在send_request()中未使用done任务结果未显式消费initial=1的 tqdm 参数可疑
十一、总结
这是一个:
- 插件化
- 异步
- 生产者-消费者
- 带背压
- 流式统计持久化
的 benchmark 执行框架。
2. 时序图 / 架构图
问题
画出这段代码的时序图 / 架构图。
回答
提供了多张 Mermaid 图,包括:
1. 整体架构图
2. 运行时架构图
强调:
- benchmark 主协程
- ProducerSide
- ConsumerSide
3. 完整时序图
描述从 benchmark(args) 开始,到:
- 连接测试
- 创建 client
- 启动统计任务
- 获取 request
- 创建 send_request task
- queue.put / queue.get
- metrics 更新
- DB 写入
- 汇总结果
全过程。
4. 简化版核心时序图
5. 生产者-消费者抽象图
3. 限流、并发、背压图修正版
问题
“这个图专门强调限流、并发、背压”的 Mermaid 渲染失败。
回答
提供了多种更稳的版本。
最稳简化版
中文稳版本
原因分析:
- 中文边标签
- 括号
- 特殊符号
- 混合中英文复杂节点文本
都可能导致 Mermaid 某些渲染器报错。
4. async yield await 执行原理
问题
async for request in generator:
yield request
if args.rate != -1:
interval = np.random.exponential(1.0 / args.rate)
await asyncio.sleep(interval)
这个执行过程是什么?我不懂 async yield await 的执行原理。
回答
核心结论:
get_requests(...)是一个异步生成器- 它被外部通过
async for逐个拉取值 - 每次:
- 内部
yield request把一个 request 交给外部 - 生成器暂停
- 外部处理这个 request
- 外部再次请求下一个值时,生成器从
yield后恢复 - 然后执行
await asyncio.sleep(interval) - 睡醒后再产生下一个 request
- 内部
await 是什么
等一个异步操作完成,先把控制权交回事件循环,稍后再恢复。
例如:
await asyncio.sleep(1)
不是阻塞线程 1 秒,而是:
- 1 秒后再叫醒我
- 现在先去执行别的协程
yield 是什么
产出一个值给外部,然后暂停函数,下次继续。
普通生成器:
def gen():
yield 1
yield 2
async def + yield 是什么
就是异步生成器:
async def gen():
yield 1
await asyncio.sleep(1)
yield 2
外部必须:
async for x in gen():
...
这段 benchmark 代码里的语义
get_requests() 相当于一个“带限流的异步转发器”。
- 外部
benchmark()向get_requests()要 request get_requests()再向内部generator(prompt/dataset)要 request- 把 request
yield给benchmark() - 当
benchmark()下一次再来取时,先 sleep 一个 interval,再取下一条
所以:
yield request
await asyncio.sleep(interval)
不是“yield 后后台继续睡”,而是:
外部下一次来要值时,先从
yield后面恢复,然后 sleep,再生成下一个值。
一个最小示例
async def get_requests():
for i in range(3):
yield i
await asyncio.sleep(1)
外部:
async for x in get_requests():
print(x)
过程:
- 外部要值
- 生成器执行到
yield 0 - 外部拿到 0
- 外部再要值
- 生成器从
yield 0后恢复,执行await sleep(1) - 睡醒后
yield 1 - 外部拿到 1
- 如此往复
一句话总结
yield:把一个值交给调用者,并暂停自己await:把控制权交给事件循环,等异步事件完成后再恢复async for:调用方不断恢复这个异步生成器,直到它结束
5. 这段代码里有几个协程
问题
有几个协程啊?
回答
要分两种口径看:
1. 按 async def / 异步生成器函数种类看
主要有:
get_requests(...)_generate_from_prompt()_generate_from_dataset()send_request(...)statistic_benchmark_metric(...)connect_test(...)benchmark(...)
其中:
benchmark / send_request / statistic_benchmark_metric / connect_test是协程函数get_requests / _generate_from_prompt / _generate_from_dataset是异步生成器函数
2. 按运行时同时存在的执行单元看
一定会有
benchmark(args)主协程statistic_benchmark_metric(...)后台 taskget_requests(...)异步生成器实例_generate_from_prompt()或_generate_from_dataset()的异步生成器实例(二选一)
还有很多个
send_request(...)task,数量随请求数增加
但注意:
- 真正同时进入
client.post()的最多是args.parallel - 已创建未完成 task 最多大约是
args.parallel * args.in_flight_task_multiplier
大致同时存在的数量
固定部分:
- 1 个
benchmark - 1 个
statistic_benchmark_metric - 1 个
get_requests - 1 个内部 generator
再加若干个 send_request task。
但“存在”和“正在发 HTTP”不是同一回事
因为 send_request 还要过:
async with semaphore:
所以:
- 真正同时发请求的最多是
args.parallel - 其余 task 可能只是在等 semaphore
6. 内部 generator 是不是独立协程
问题
“1 个内部 generator(prompt 或 dataset)”这个不是一个协程吧。和 get_requests 是同一个协程吧?
回答
你的理解更接近精确答案。
结论
get_requests()是一个异步生成器对象_generate_from_prompt()/_generate_from_dataset()也是异步生成器对象- 但它们不是独立被
create_task()调度的并发 task - 它们是在
get_requests()的执行过程中,通过:
async for request in generator:
被串行驱动的
更精确地说
它们:
- 不是同一个对象
- 也不是两个并发 task
- 而是同一条执行链上的上下游生成器
类似同步版本:
def outer():
for x in inner():
yield x
异步版本只是:
async def outer():
async for x in inner():
yield x
本质没变:
- 外层驱动内层
- 内层产出值
- 外层再转发
区分两个概念
1. 独立 Task
会被事件循环独立调度,比如:
statistic_benchmark_metricsend_request
2. 异步生成器对象
是被上层 async for 拉取的,不是独立 task,比如:
get_requests_generate_from_prompt / _generate_from_dataset
一句话总结
内部 generator 不是独立并发协程任务,它只是 get_requests() 内部驱动的一个异步生成器。
7. 协程之间的交互图
问题
请画出协程之间的交互。这样我才明白。
回答
图 1:哪些是独立任务,哪些不是
说明:
benchmark:主任务get_requests:被benchmark驱动inner generator:被get_requests驱动statistic_benchmark_metric:独立 tasksend_request:独立 task
图 2:benchmark -> get_requests -> inner generator 拉取关系
这张图说明:
benchmark拉get_requestsget_requests再拉inner generatorinner generator产出值get_requests转手 yield 给benchmark
图 3:完整协程交互图
图 4:暂停 / 恢复图
分类表
| 名称 | 类型 | 是否独立并发 task | 谁驱动它 |
|---|---|---|---|
benchmark |
协程 | 是 | 事件循环主任务 |
statistic_benchmark_metric |
协程 | 是 | create_task |
send_request |
协程 | 是 | create_task |
get_requests |
异步生成器 | 否 | benchmark 的 async for |
_generate_from_prompt/_dataset |
异步生成器 | 否 | get_requests 的 async for |
8. rate 控制会不会不准
问题
await asyncio.sleep(interval) 这里等待过程中让出了协程。万一其他协程执行过程中花了较长时间,那这里的真正等待时间不就超出 interval 了吗?那就不符合预期的 rate 了啊。
回答
对,你理解得完全正确。
结论
await asyncio.sleep(interval) 的语义是:
至少等待这么久,然后由事件循环在有空时恢复你
而不是:
绝对精确地在 interval 秒后立刻恢复
所以实际时间满足:
actual_gap >= interval
如果事件循环忙、其他协程长时间不让出、存在阻塞 I/O 或 CPU 重活,那么实际恢复会更晚,导致实际 rate 偏低。
为什么会这样
await asyncio.sleep(interval) 本质上是:
- 告诉事件循环:
interval秒后我可以恢复 - 到点后也不是马上抢占执行,而是等事件循环调度到我
所以实际等待 = 定时器时间 + 调度延迟。
什么时候会明显超出
- 有 CPU 密集型代码阻塞事件循环
- 有同步阻塞操作(如
time.sleep) - DB / 日志 / 其他协程很重
- OS 调度负载高
所以 rate 不精确吗?
是的,它不是硬实时精确 rate,而是“在 asyncio 条件下尽量接近目标平均速率”。
并且由于本身就用的是指数分布:
np.random.exponential(1.0 / args.rate)
本来目标就是统计意义上的泊松过程,而不是每个间隔严格相等。
现有实现的缺点
当前写法:
yield request
await asyncio.sleep(interval)
属于相对时间等待:
- 一次晚了,后面不会自动追赶
- 会有累计漂移
更精确的做法:按绝对时间调度
比如:
loop = asyncio.get_running_loop()
next_time = loop.time()
async for request in generator:
if args.rate != -1:
interval = np.random.exponential(1.0 / args.rate)
next_time += interval
now = loop.time()
sleep_for = next_time - now
if sleep_for > 0:
await asyncio.sleep(sleep_for)
yield request
这样能:
- 避免累计漂移
- 更贴近理论目标到达时间线
但即使如此,也只能尽量接近,不可能绝对实时。
最准确的一句话
asyncio.sleep(t) 的语义不是:
恰好 t 秒后继续
而是:
不早于 t 秒后,再由事件循环安排恢复
9. AioHttpClient 源码原理
问题
详细解释一下 AioHttpClient 和 test_connection 的原理。
回答
这段代码主要做两件事:
AioHttpClient- 封装
aiohttp.ClientSession - 负责连接池、超时、trace、异常兜底
- 实际请求处理交给
api_plugin.process_request(...)
- 封装
test_connection- benchmark 前发一个最小请求做连通性检测
- 失败则重试直到总超时
一、整体设计思想
1. 网络层和协议层分离
AioHttpClient 负责:
- session
- connector
- timeout
- trace
- 异常兜底
api_plugin 负责:
- 请求体格式
- 流式/非流式响应处理
- token/TTFT 解析
- 输出
BenchmarkData
2. 共享会话复用连接
正式 benchmark 阶段只创建一个 AioHttpClient,内部只创建一个 ClientSession,所有请求共享。
好处:
- 复用 TCP 连接
- 减少 TLS 握手
- TTFT 更稳定
- 更接近真实压测场景
3. 单请求失败不打崩 benchmark
无论超时还是异常,都转换为:
BenchmarkData(success=False, error=str(e))
上层统一统计失败请求。
二、__init__ 做了什么
保存参数:
self.url = args.url
self.headers = {'user-agent': 'modelscope_bench', **(args.headers or {})}
self.total_timeout = args.total_timeout
self.read_timeout = args.read_timeout
self.connect_timeout = args.connect_timeout
self.api_plugin = api_plugin
三、TCPConnector 作用
connector = aiohttp.TCPConnector(
limit=args.parallel or 0,
limit_per_host=args.parallel or 0,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
force_close=False,
ssl=('https://' in self.url),
)
注意:
这里不是立刻建立 TCP 连接,而是创建连接池管理规则
参数含义:
limit:整个连接池最大连接数limit_per_host:每个 host 最大连接数ttl_dns_cache=300:DNS 缓存 300 秒use_dns_cache=True:启用 DNS 缓存keepalive_timeout=60:空闲连接保留 60 秒enable_cleanup_closed=True:帮助清理关闭连接force_close=False:请求结束后不强制关闭连接ssl=('https://' in self.url):https 启用 SSL
四、ClientTimeout
client_timeout = aiohttp.ClientTimeout(
total=self.total_timeout, connect=self.connect_timeout, sock_read=self.read_timeout
)
含义:
total:整个请求总超时connect:建连超时sock_read:读取响应超时
五、ClientSession
self.client = aiohttp.ClientSession(
connector=connector,
trust_env=True,
timeout=client_timeout,
trace_configs=[self._create_trace_config()] if args.debug else []
)
作用:
- 绑定连接池
- 绑定默认 timeout
- 从环境变量读取代理
- 挂 trace hooks(debug 时)
六、异步上下文管理器
async def __aenter__(self):
pass
async def __aexit__(self, exc_type, exc, tb):
await self.client.close()
用途:
async with client:
...
注意:
__aenter__更规范应返回self- 当前因为外部没用
as client,所以没出问题
七、trace 机制
挂了三个回调:
on_request_starton_request_chunk_senton_response_chunk_received
用于 debug:
- 请求什么时候开始
- 请求 body 发了什么
- 响应 chunk 收到了什么
对定位流式接口、TTFT 很有帮助。
八、post() 的原理
output = await self.api_plugin.process_request(self.client, self.url, self.headers, body)
return output
也就是说:
AioHttpClient不自己写统一session.post(...)- 而是把 session、url、headers、body 交给 plugin
- 由 plugin 决定是:
- JSON 响应
- SSE 流式响应
- embedding
- rerank
- 自定义协议
九、异常兜底
except asyncio.TimeoutError as e:
...
return BenchmarkData(success=False, error=str(e))
except (aiohttp.ClientConnectorError, Exception) as e:
...
return BenchmarkData(success=False, error=str(e))
优点:
- benchmark 不因单请求失败而崩溃
缺点:
Exception太宽,可能吞掉插件真实 bug
十、test_connection() 原理
async def test_connection(args, api_plugin) -> bool:
流程:
- 记录开始时间
- 内部定义
attempt_connection()- 新建
AioHttpClient - 构造一个
"hello"请求 client.post(...)
- 新建
while True不断重试- 成功就返回
True - 达到总超时就返回
False - 每次失败 sleep 10 秒
注意:
- 每次 attempt 都新建一个
ClientSession - 总重试窗口也使用
args.total_timeout - 这和单次请求 timeout 共用一个参数,语义上不够理想
十一、总结
这段代码本质上是:
用
aiohttp.ClientSession + TCPConnector构造一个带连接池、超时、trace 的异步 HTTP 客户端外壳,
再把具体协议处理委托给api_plugin.process_request(),
从而形成 benchmark 场景下的通用网络访问层。
10. ApiPluginBase 有什么作用
问题
ApiPluginBase 这个类有什么作用?
回答
这个类作用非常关键。
一句话
ApiPluginBase是整个 benchmark 框架里的“API 适配层统一抽象接口”。
它的目的是:
把不同模型服务 / API 的差异隔离开,让 benchmark 主流程不用关心具体协议细节。
一、在系统中的定位
主流程里有:
api_plugin_class = ApiRegistry.get_class(args.api)
api_plugin = api_plugin_class(args)
后面调用:
request = api_plugin.build_request(messages)
output = await self.api_plugin.process_request(self.client, self.url, self.headers, body)
metrics.update_metrics(benchmark_data, api_plugin)
说明 benchmark 主流程只认统一接口,不认具体协议。
二、如果没有这个类会怎样
没有插件基类,就要在主流程里到处写:
if api == "openai":
...
elif api == "vllm":
...
elif api == "embedding":
...
会导致:
- 核心代码耦合严重
- 每加一个 API 都得改 benchmark 核心
- 维护成本高
三、抽象方法们的作用
1. build_request(...)
def build_request(self, messages, param=None) -> Dict:
作用:
把 benchmark 内部统一的输入 messages 转成某个 API 所需的 request body
比如:
- OpenAI chat:
{"model": ..., "messages": ...} - completion:
{"model": ..., "prompt": ...} - embedding:
{"model": ..., "input": ...}
2. parse_responses(...)
def parse_responses(self, responses, request=None, **kwargs) -> Tuple[int, int]:
作用:
从响应里解析出
(prompt_tokens, completion_tokens)
因为不同 API:
- 有的直接有 usage
- 有的流式响应没有 usage
- 有的要自己基于 tokenizer 估算
- embedding / rerank 的 token 语义又不同
3. process_request(...)
async def process_request(self, client_session, url, headers, body) -> BenchmarkData:
这是最核心的方法。
作用:
真正完成一次 HTTP 请求,并把结果组织成统一的
BenchmarkData
原因是:
- 不同 API 的网络交互方式差异太大
- 有的流式,有的非流式
- 有的要统计 TTFT
- 有的 chunk 解析方式不同
所以整个请求处理过程都交给插件。
四、replace_values(...)
@staticmethod
def replace_values(input_json, model, prompt):
作用:
递归替换 JSON 结构里的
%m和%p
比如:
{
"model": "%m",
"messages": [{"role": "user", "content": "%p"}]
}
替换后变成具体 model 和 prompt。
注意:
- 对顶层直接是
str的情况,这个函数当前写法不会把结果返回出去 - 对 dict/list 容器内嵌字符串的替换是有效的
五、设计价值
这个类核心解决的是:
不同 API 的请求格式、响应格式、流式协议、token 统计方式都不同,但 benchmark 框架希望统一调度、统一统计。
它同时兼具:
- 策略模式(不同 API 用不同策略)
- 适配器模式(把外部协议适配成框架统一结构)
六、一句话总结
ApiPluginBase 的作用:
- 定义统一接口
- 隔离不同 API 差异
- 支撑插件化扩展
- 保证所有 API 最终都输出统一
BenchmarkData
11. 为什么不为每个请求新建 ClientSession
问题
“不是每个请求都新建一个 ClientSession”这个是干嘛用的?
回答
一句话
ClientSession不是“一次请求对象”,而是“带连接池的共享 HTTP 会话”。
不为每个请求都新建 ClientSession,是为了:
- 复用 TCP / TLS 连接
- 共享连接池
- 减少系统开销和延迟
- 让 benchmark 更接近真实服务调用方式
一、ClientSession 是什么
不是“一次请求”,而是:
- 默认 header
- timeout
- connector
- cookie / auth / proxy 配置
- 连接池
这些共享状态的集合。
二、如果每个请求都新建 session,会怎样
比如:
session = aiohttp.ClientSession()
await session.post(...)
await session.close()
问题:
- 每次都重建 TCP / TLS 连接,开销很大
- 无法利用连接池
- TTFT 和 latency 会包含建连成本,污染 benchmark
- socket / 文件描述符 / TIME_WAIT 增多
- 高并发下资源消耗更大
三、Session 与连接池的关系
connector = aiohttp.TCPConnector(...)
session = aiohttp.ClientSession(connector=connector)
可以理解为:
TCPConnector:底层连接池管理器ClientSession:上层 HTTP 会话接口,使用 connector 的连接池
四、复用 session 的实际效果
第一次请求:
- 建 TCP
- 建 TLS(如果 https)
- 发请求
- 收响应
- 连接返回池中
后续请求:
- 直接复用已有 keep-alive 连接
- 减少握手和延迟
五、为什么对 TTFT 很重要
如果每次都新建连接,TTFT 会混入:
- TCP 建连时间
- TLS 握手时间
这不利于测模型服务端本身的首 token 性能。
共享 session 更接近真实线上情况。
六、一句话总结
不为每个请求都新建 ClientSession,是为了:
复用连接、减少握手和建连开销、降低延迟、减少资源消耗,并让 benchmark 更真实。
12. TCPConnector、args.parallel、semaphore 的整体关系
问题
这里为啥要创建连接呢?这里的 args.parallel 又是干嘛的?从整体上描述整个过程。与 semaphore 的关系,为啥要创建 TCP 连接呢?
回答
一、最关键结论
connector = aiohttp.TCPConnector(...)
self.client = aiohttp.ClientSession(connector=connector, ...)
这里不是立刻创建 TCP 连接。
而是在:
- 创建一个连接池管理器
- 配置未来发请求时:
- 最多多少连接
- 连接如何复用
- DNS 如何缓存
- keepalive 多久
- SSL 如何处理
真正 TCP 建连一般发生在第一次:
await session.post(...)
时。
二、一次 benchmark 的整体层次
第 1 层:请求生成层
get_requests()
第 2 层:任务调度层
benchmark() 创建 send_request task
第 3 层:网络执行层
send_request → AioHttpClient.post() → ClientSession → TCPConnector
第 4 层:结果消费层
statistic_benchmark_metric()
三、HTTP 为何需要 TCP 连接
HTTP 请求底层必须通过网络连接。
HTTP
- DNS
- TCP
- HTTP request/response
HTTPS
- DNS
- TCP
- TLS
- HTTP request/response
所以 HTTP 请求一定依赖 TCP 连接。
四、TCPConnector 的作用
TCPConnector 是 aiohttp 的底层连接池管理器,用来管理:
- 最大连接数
- 每 host 连接数
- DNS 缓存
- keepalive
- SSL
本质上:
它不是“现在就创建连接”,而是“制定连接池规则”。
五、args.parallel 是什么
这是 benchmark 的核心并发参数。
1. 在 Semaphore(args.parallel) 中
表示:
同时最多多少个
send_request能进入请求执行区
2. 在 TCPConnector(limit=args.parallel) 中
表示:
底层连接池最多允许多少连接
所以:
Semaphore管应用层请求并发Connector管网络层连接资源
六、Semaphore 和 TCPConnector 的关系
一句话:
Semaphore:控制“谁有资格开始发请求”TCPConnector:控制“底层连接资源怎么分配和复用”
它们不是重复,而是两道不同层次的限制。
七、完整例子
假设:
parallel = 4
于是:
semaphore = Semaphore(4)
connector = TCPConnector(limit=4, limit_per_host=4)
如果创建了 20 个 send_request task:
第一步
先过 semaphore:
- 只有 4 个 task 能进入
- 其余等待 semaphore
第二步
进入 client.post() 后,再由 session/connector 分配连接:
- 尝试复用已有连接
- 没有则创建新连接
- 但总数不超过 4
八、为什么有了 semaphore 还要 connector limit
因为:
- 两者控制层次不同
- 连接池复用 / DNS cache / keepalive 只能由 connector 管
- 明确底层资源边界更稳健
- 双重限制更安全
九、最准确的一句话
TCPConnector(...) 的作用不是“现在就创建 TCP 连接”,而是:
创建一个底层连接池管理器,规定未来 benchmark 发 HTTP 请求时,最多能用多少连接、连接如何复用、DNS 如何缓存、HTTPS 如何处理。
而 args.parallel 的含义是:
希望网络层最大连接能力和 benchmark 的目标并发度保持一致。
13. benchmark.py 中 in_flight 的作用
问题
benchmark.py 184-200 的 in_flight 干嘛用的?
回答
一句话
in_flight用来限制“已经创建但还没结束”的send_requesttask 数量,避免任务堆积导致内存暴涨 / OOM。
一、为什么有了 semaphore 还不够
很多人会想:
已经有
Semaphore(args.parallel)了,为什么还需要in_flight?
因为:
semaphore只限制真正同时执行client.post()的任务数- 它不限制已创建但正在等待 semaphore 的 task 数量
如果你直接:
async for request in get_requests(...):
asyncio.create_task(send_request(...))
那么:
- 只有
parallel个 task 真正在发请求 - 但其余成千上万 task 已经创建出来了,都会占内存
二、in_flight 是什么
in_flight: set[asyncio.Task] = set()
保存的是:
当前已经
create_task、但尚未结束的send_request task
三、max_in_flight
max_in_flight = args.parallel * args.in_flight_task_multiplier
表示:
允许同时存在的未完成 task 数量上限
例如:
parallel=10multiplier=3
则:
max_in_flight = 30
表示:
- 最多 30 个 task 已创建未完成
- 其中真正活跃执行 HTTP 的仍最多 10 个
- 其余最多 20 个在等 semaphore / 收尾
四、代码怎么工作
1. 初始化
semaphore = asyncio.Semaphore(args.parallel)
in_flight = set()
max_in_flight = args.parallel * args.in_flight_task_multiplier
2. 每来一个 request 先检查 in_flight 是否超限
if len(in_flight) >= max_in_flight:
done, pending = await asyncio.wait(in_flight, return_when=asyncio.FIRST_COMPLETED)
in_flight = pending
含义:
- 如果 task 太多了,就先等至少一个完成
done:已完成pending:未完成- 把
in_flight更新成pending
3. 再创建新任务
task = asyncio.create_task(send_request(...))
in_flight.add(task)
4. 收尾
await asyncio.gather(*in_flight, return_exceptions=True)
等待剩余任务都完成。
五、和 semaphore 的区别
semaphore
限制:
同时正在干活的工人数
in_flight
限制:
工地里总共挤了多少工人(含排队的)
六、本质
in_flight 的核心作用:
- 防止一下子创建过多 task
- 防止内存暴涨
- 给任务创建过程增加背压
- 降低事件循环调度压力
14. Task 对象是什么,为什么占内存
问题
task 对象是什么东西?为什么会占用内存?
回答
一句话
asyncio.Task是一个真实的 Python 对象,它保存协程执行状态、局部变量引用、等待关系、结果/异常、回调等信息,所以会占内存。
一、Task 是什么
当你写:
task = asyncio.create_task(send_request(...))
这里发生的是:
send_request(...)产生一个协程对象create_task(...)把这个协程交给事件循环调度- 包装成一个
Task对象
所以:
- 协程对象:待运行的异步过程
- Task 对象:被事件循环管理的执行单元
二、Task 内部大概保存什么
1. 它包装的协程对象
2. 当前执行到哪一步
比如停在:
await semaphore.acquire()await client.post(...)await queue.put(...)
3. 局部变量和参数引用
例如:
requestargsclientqueuesemaphore
4. Future / callback / 等待链
5. 结果 / 异常 / 状态
比如:
- pending
- running
- done
- cancelled
三、为什么“只是等 semaphore”的 task 也占内存
即使 task 只是卡在:
async with semaphore:
它也还是活着的 task,仍然持有:
- 协程对象
- 当前暂停现场
- 对
request、args、client等对象的引用 - 等待状态
所以不是零成本。
四、为什么大量 task 会吃内存
假设创建了 10 万个 task:
- 10 万个 Task 对象
- 10 万个协程对象
- 10 万份局部变量引用
- 10 万个 request 引用
- 事件循环还要管理它们
就会导致:
- 内存膨胀
- GC 压力大
- 调度压力大
五、Task 的内存不只是它自己
更大的问题往往是:
Task 会拖住一串它引用的对象一起活着。
比如每个 task 都引用一个很大的 request,那么即使 Task 对象本体不大,总体内存也会很可观。
六、一句话总结
Task 占内存的原因:
- Task 是真实对象
- 它要保存异步执行现场
- 它持有大量上下文引用
- 大量 task 会拖住大量 request / response / 参数对象不被回收
15. benchmark_data_queue 和 data_process_completed_event 是干嘛的
问题
benchmark_data_queue: asyncio.Queue = asyncio.Queue(maxsize=max(1, args.parallel * args.queue_size_multiplier))
data_process_completed_event.clear()
这两个干嘛的?什么原理?
回答
一句话
benchmark_data_queue:发送侧和统计侧之间的异步结果通道data_process_completed_event.clear():把“发送已经彻底结束”的标志重置为未完成状态
它们一起构成了 benchmark 中的生产者-消费者协调机制。
一、为什么需要它们
发送侧
send_request(...):
benchmark_data = await client.post(request)
await benchmark_data_queue.put(benchmark_data)
它是生产者。
统计侧
statistic_benchmark_metric(...):
benchmark_data = await benchmark_data_queue.get()
metrics.update_metrics(...)
insert_benchmark_data(...)
benchmark_data_queue.task_done()
它是消费者。
所以需要一个中间队列来传递结果。
同时消费者还要知道:
“是暂时没数据,还是以后再也不会有新数据了?”
这就需要 Event。
二、benchmark_data_queue 的作用
是一个有限容量的 asyncio.Queue。
作用:
- 解耦请求发送与统计/持久化
- 缓冲短时波动
- 提供背压
- 支持
join/task_done收尾同步
三、为什么设置 maxsize
maxsize=max(1, args.parallel * args.queue_size_multiplier)
因为如果队列无限长,当消费慢于生产时,内存会不断增长。
有了 maxsize 后:
- 队列满时
await queue.put(...)会挂起生产者- 形成背压
外层的 max(1, ...) 是防止 maxsize=0,因为 Queue(maxsize=0) 在 asyncio 中表示无限容量。
四、data_process_completed_event.clear() 的作用
data_process_completed_event 是模块级全局变量:
data_process_completed_event = asyncio.Event()
clear() 表示:
本轮 benchmark 开始前,把“发送全部结束”标志重置为 False。
因为上一次 benchmark 结束时可能已经 set() 过。
五、为什么消费者不能只看 queue.empty()
因为:
- 队列暂时为空 ≠ 以后再也不会有数据
比如:
- 队列现在空了
- 但还有很多
send_request正在路上 - 过会儿还会
put()新结果
所以消费者退出条件必须是:
data_process_completed_event.is_set() and benchmark_data_queue.empty()
即:
- 不会再有新数据
- 当前队列也空了
六、它们配合方式
Queue
负责传递数据、做缓冲、背压
Event
负责告诉消费者:
发送侧已经彻底结束,不会再产生新数据了
七、一句话总结
benchmark_data_queue:有限容量的异步结果队列data_process_completed_event.clear():每轮 benchmark 启动时重置“生产完成”标志
16. 为什么 queue.put 要 await
问题
await benchmark_data_queue.put(benchmark_data)
put 为啥要加 await?
回答
一句话
asyncio.Queue.put()可能因为队列满而需要等待,所以它是异步操作,必须await。
一、为什么 put() 不是普通函数
因为 asyncio.Queue 是有容量上限的:
Queue(maxsize=...)
所以 put(item) 有两种情况:
情况 1:队列没满
立刻放进去,几乎马上返回
情况 2:队列满了
必须等消费者取走一个元素腾出空间,当前协程要挂起
这就是为什么 put() 是异步的。
二、如果不 await
benchmark_data_queue.put(benchmark_data)
你只是得到了一个 coroutine / awaitable,并没有真正执行 put。
所以要么:
- 根本没放进去
- 要么出现 “coroutine was never awaited” 警告
三、这里 await 的真实语义
不是“放元素这个动作慢”,而是:
这个动作可能因为队列已满而需要等待。
四、为什么这是好事
如果消费者处理太慢,而生产者持续 put():
- 没有限制就会导致队列无限膨胀
- 内存不断增长
加上:
await queue.put(item)
就能实现背压:
- 队列满时,发送侧暂停
- 等消费者处理一部分再继续
五、对称关系
await queue.get():队列空时等待新数据await queue.put(item):队列满时等待空位
两者都可能挂起协程,所以都要 await。
17. task_done 和 join 的原理
问题
task_done 和 join 没有看懂。
回答
一句话
put/get管“队列里有没有元素”task_done/join管“这些元素是否已经被真正处理完”
这是关键。
一、get() 不等于“处理完”
消费者里:
benchmark_data = await benchmark_data_queue.get()
metrics.update_metrics(...)
insert_benchmark_data(...)
benchmark_data_queue.task_done()
含义:
get()只是从队列里拿到一项- 你还没处理完
- 只有做完 metrics / DB / 日志后,才调用
task_done()
所以:
get()= 领取任务task_done()= 完成任务
二、join() 是干嘛的
await benchmark_data_queue.join()
表示:
等待所有已经放进队列的项目,都被消费者处理完并调用了
task_done()
三、内部原理:unfinished_tasks 计数器
Queue 内部可以想象维护了一个计数器:
unfinished_tasks
规则:
- 每次
put(item):unfinished_tasks += 1 - 每次
task_done():unfinished_tasks -= 1 - 当
unfinished_tasks == 0时,join()返回
四、为什么 join() 比 empty() 更强
queue.empty()
只表示当前容器里有没有元素
queue.join()
表示所有放进去的元素都已经真正处理完了
比如:
- item 被
get()拿走后,队列可能已经空了 - 但消费者还在处理这个 item
- 所以
empty()是 True,join()仍不会返回
五、如果忘了 task_done()
那 unfinished_tasks 永远减不回去,await queue.join() 会永远卡住。
六、如果 task_done() 调多了
会报错,因为没有那么多未完成任务可减了。
七、在 benchmark 里的意义
await benchmark_data_queue.join()
表示:
等所有已经进入结果队列的
benchmark_data都被真正处理完(统计、写库、日志)后,主协程才继续往下收尾。
八、一句话记忆法
put():登记一个待处理任务get():领取一个任务task_done():声明任务处理完成join():等待所有登记的任务都完成
18. wait_for(queue.get(), timeout=0.1) 的用法
问题
await asyncio.wait_for(benchmark_data_queue.get(), timeout=0.1)
这是什么用法呢?
回答
一句话
等待从队列取数据,但最多只等 0.1 秒。
如果 0.1 秒内没拿到,就超时醒来,去检查别的退出条件。
一、拆开看
1. benchmark_data_queue.get()
含义:
从异步队列里取一个元素;如果队列空,就一直等。
2. asyncio.wait_for(..., timeout=0.1)
含义:
给这个异步操作套一个超时:最多等 0.1 秒。
- 如果 0.1 秒内完成,返回结果
- 否则抛
asyncio.TimeoutError
二、为什么不直接 await queue.get()
如果写:
benchmark_data = await benchmark_data_queue.get()
当:
- 队列空
- 生产也结束
- 再也不会有新数据
消费者就会永远卡在这里,没机会回到循环顶部检查退出条件。
三、wait_for(..., timeout=0.1) 的作用
让消费者“定期醒来”:
- 尝试等队列数据
- 只等 0.1 秒
- 超时就醒来
- 回到 while 顶部检查:
- 生产是否结束
- 队列是否为空
这是一种“超时轮询式退出机制”。
四、为什么 timeout 设 0.1 秒
是折中:
- 太大:退出不及时
- 太小:轮询太频繁,开销变大
0.1 秒是比较常见的折中值。
五、超时后为什么 continue
except asyncio.TimeoutError:
continue
这里的 TimeoutError 不是错误,而是一种正常控制流:
暂时没数据,那就继续下一轮,检查是否该退出。
六、一句话总结
wait_for(queue.get(), timeout=0.1) 的本质是:
防止
queue.get()无限阻塞,使消费者可以周期性地检查退出条件。
19. 这个过程里到底哪些东西占内存
问题
请问这个过程,占内存的是哪些东西呢。是 Queue 里的东西吗,task 和 queue 本身会占用嘛?
回答
一句话
会,Queue 里的元素、Task、本身的 Queue 对象都会占内存。
但实际 benchmark 里,最大的通常不是 Queue 壳子本身,而是它里面装的对象,以及未完成 Task 持有的引用链。
一、主要内存来源
asyncio.Task对象本身- Task 持有的协程执行现场
- Task 引用的参数和上下文
requestargsclientqueuesemaphore
benchmark_data_queue里尚未被消费的benchmark_dataBenchmarkData本身可能引用的响应内容、统计字段、错误信息dataset_messages预加载的数据- HTTP 客户端连接池 / buffer / response chunk
- metrics / SQLite / 日志临时对象
二、直接回答两个问题
1. “是 Queue 里的东西吗?”
是。
Queue 里积压的 benchmark_data 会占内存,而且常常是重要来源。
2. “task 和 queue 本身会占用嘛?”
也会。
- Queue 壳子占内存,但通常不是最大头
- Task 本身也占,而且会拖住很多引用对象一起活着
三、按三层理解最清晰
第一层:容器壳子本身
- Task
- Queue
- set(in_flight)
- Session / Connector
- Metrics
第二层:容器里装的元素
- in_flight 里的 task
- queue 里的
benchmark_data - dataset_messages 里的 message
第三层:元素继续引用的对象
- request
- response
- messages
- raw chunk
- error string
- 其他上下文对象
真正大的往往是第三层。
四、Task 为什么吃内存
每个未完成 task 至少保存:
- Task 对象
- 协程对象
- 当前暂停状态
- 局部变量引用
requestclientqueueargs
所以如果有很多 task,它们会拖住很多 request 和上下文对象一起活着。
五、Queue 为什么会占内存
Queue 本身会存:
- 容器结构
- 等待 put/get 的协程队列
- unfinished_tasks 计数器
- join 的等待信息
更关键的是:
Queue 里的每个
benchmark_data元素都必须一直活着,直到被消费。
如果每个 BenchmarkData 很重(含完整 response/chunk),积压就很危险。
六、这段 benchmark 最容易吃内存的几个点
- 大量
in_flighttask - queue 中积压的
benchmark_data - 预加载的
dataset_messages - 每个 task 持有的 request
BenchmarkData中持有的 response / chunk / 文本
七、为什么要同时限制 in_flight 和 queue maxsize
它们分别卡住两类堆积:
in_flight
限制:
活着的 task 数量,以及 task 持有的 request / 上下文对象数量
queue maxsize
限制:
尚未被消费的
BenchmarkData数量
八、一句话总结
回答你的问题:
- Queue 里的东西会占内存
- Task 本身也会占,而且会拖住一串对象
- Queue 本身也占,但通常不是主要矛盾
- 实际大头一般是:
- 未完成 task
- task 持有的 request / response / 上下文
- queue 中积压的
BenchmarkData - 预加载的数据集内容
总结
本次对话核心收获包括:
benchmark.py是一个异步生产者-消费者模型get_requests/ 内部 generator 是异步生成器,不是独立 tasksend_request/statistic_benchmark_metric是真正独立调度的 taskSemaphore控制活跃并发,in_flight控制已创建未完成 task 数Queue(maxsize)负责结果缓冲和背压Event用来告诉消费者“是否不会再有新数据”task_done/join是“处理完成同步机制”wait_for(queue.get(), timeout=0.1)是为了让消费者定期醒来检查退出条件ClientSession + TCPConnector是共享连接池,不是每次请求新建会话ApiPluginBase是整个 benchmark 的 API 适配层接口- 内存主要消耗在:
- 未完成 task
- queue 中的结果对象
- 任务与结果对象持有的 request/response/messages 等引用链
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)