# EvalScope Benchmark 源码学习记录

> 说明:以下内容整理自本次对话,按“问题 → 回答”的形式汇总,方便后续查阅。
> 主题主要围绕:
>
> - `benchmark.py` 主流程
> - `asyncio` 中 `async / await / yield / Task / Queue / Event`
> - `aiohttp.ClientSession / TCPConnector`
> - `ApiPluginBase`
> - 背压、并发、内存占用、任务收尾机制

---

# 目录

1. [benchmark 主流程源码分析](#1-benchmark-主流程源码分析)
2. [时序图 / 架构图](#2-时序图--架构图)
3. [限流、并发、背压图修正版](#3-限流并发背压图修正版)
4. [async yield await 执行原理](#4-async-yield-await-执行原理)
5. [这段代码里有几个协程](#5-这段代码里有几个协程)
6. [内部 generator 是不是独立协程](#6-内部-generator-是不是独立协程)
7. [协程之间的交互图](#7-协程之间的交互图)
8. [rate 控制会不会不准](#8-rate-控制会不会不准)
9. [AioHttpClient 源码原理](#9-aiohttpclient-源码原理)
10. [ApiPluginBase 有什么作用](#10-apipluginbase-有什么作用)
11. [为什么不为每个请求新建 ClientSession](#11-为什么不为每个请求新建-clientsession)
12. [TCPConnector、args.parallel、semaphore 的整体关系](#12-tcpconnectorargsparallelsemaphore-的整体关系)
13. [benchmark.py 中 in_flight 的作用](#13-benchmarkpy-中-in_flight-的作用)
14. [Task 对象是什么,为什么占内存](#14-task-对象是什么为什么占内存)
15. [benchmark_data_queue 和 data_process_completed_event 是干嘛的](#15-benchmark_data_queue-和-data_process_completed_event-是干嘛的)
16. [为什么 queue.put 要 await](#16-为什么-queueput-要-await)
17. [task_done 和 join 的原理](#17-task_done-和-join-的原理)
18. [wait_for(queue.get(), timeout=0.1) 的用法](#18-wait_forqueueget-timeout01-的用法)
19. [这个过程里到底哪些东西占内存](#19-这个过程里到底哪些东西占内存)

---

# 1. benchmark 主流程源码分析

## 问题

详细分析一下这段 benchmark 源码。

## 回答

下面按**整体流程 → 关键函数 → 并发/限流设计 → 数据流 → 潜在问题与改进点**来详细分析。

### 一、整体在做什么

这是一个异步 benchmark 压测框架核心流程,目标是:

1. 根据用户参数构造请求
   - 单 prompt 重复压测
   - 或从 dataset 中生成消息压测
2. 以一定并发/速率把请求发给目标 API
3. 收集每个请求的 benchmark 数据
4. 实时写入 SQLite 数据库
5. 实时更新聚合指标并打印日志
6. 最后生成 summary 结果

整体是一个典型的**生产者-消费者模型**:

- 生产者
  - `get_requests()`:生成请求
  - `send_request()`:真正把请求发出去并把结果放入队列
- 消费者
  - `statistic_benchmark_metric()`:从队列拿结果,更新 metrics,写 DB,输出日志

主控制逻辑是 `benchmark()`。

---

### 二、全局变量

```python
logger = get_logger()
data_process_completed_event = asyncio.Event()
  • logger:日志对象
  • data_process_completed_event:异步事件,表示“发送侧是否已经彻底结束”

消费者退出条件:

while not (data_process_completed_event.is_set() and benchmark_data_queue.empty()):

只有当:

  • 生产结束
  • 队列也空了

消费者才退出。


三、get_requests():请求生成器

async def get_requests(args, api_plugin) -> AsyncGenerator[dict, None]:

这是一个异步生成器,负责源源不断产出 request。

内部有两种模式:

1. _generate_from_prompt()
  • load_prompt(args.prompt)
  • 如果 apply_chat_template=True,包装成 chat message
  • api_plugin.build_request(messages) 生成请求体
  • 重复 args.number 次 yield

特点:

  • prompt 模式里 request 只 build 一次,然后重复 yield
  • 如果后续有人修改这个 request,就有共享对象风险

2. _generate_from_dataset()
  • DatasetRegistry.get_class(args.dataset)(args) 获取数据集处理类
  • build_messages() 产生 messages
  • 先把 messages 读入 dataset_messages
  • 然后循环使用这些 messages,直到产出 args.number 个 request

伪逻辑:

while count < args.number:
    messages = dataset_messages[dataset_index]
    request = api_plugin.build_request(messages)
    if request is not None:
        yield request
        count += 1
    dataset_index = (dataset_index + 1) % num_messages

潜在问题:

  • 如果所有 build_request(messages) 都返回 None,会死循环

3. rate limit
async for request in generator:
    yield request
    if args.rate != -1:
        interval = np.random.exponential(1.0 / args.rate)
        await asyncio.sleep(interval)

这表示:

  • yield request 把一个 request 交给调用方
  • 然后等待一个指数分布间隔
  • 用于模拟泊松流量到达过程

即:

  • rate=-1:不限制速率
  • rate>0:按平均到达率限制请求注入速度

四、send_request():发送一个请求

async def send_request(semaphore, request, benchmark_data_queue, args, client):

逻辑:

async with semaphore:
    benchmark_data = await client.post(request)
benchmark_data.update_gpu_usage()
await benchmark_data_queue.put(benchmark_data)

作用:

  1. 通过 semaphore 限制并发
  2. 调用共享 client.post()
  3. 更新 GPU 使用信息
  4. 把结果放入队列

五、statistic_benchmark_metric():消费者与统计核心

async def statistic_benchmark_metric(benchmark_data_queue, args, api_plugin):

做的事:

  1. 初始化 BenchmarkMetrics
  2. 打开 SQLite 连接
  3. 不断从 benchmark_data_queue 取数据
  4. metrics.update_metrics(...)
  5. insert_benchmark_data(...)
  6. 每隔 db_commit_interval 提交数据库
  7. 输出日志 / 可视化
  8. 最后返回 (metrics, result_db_path)

关键退出条件:

while not (data_process_completed_event.is_set() and benchmark_data_queue.empty()):

取队列时:

benchmark_data = await asyncio.wait_for(benchmark_data_queue.get(), timeout=0.1)

表示:

  • 最多等 0.1 秒
  • 没数据就 timeout
  • 然后继续循环检查是否该退出

这是“超时轮询 + Event”式退出机制。


六、connect_test()

压测前做连通性检查:

if is_embedding_or_rerank_api(args.api):
    return

if args.no_test_connection:
    return

if not await test_connection(args, api_plugin):
    raise TimeoutError('Test connection failed')

作用:

  • 提前发现地址、鉴权、模型配置问题

七、benchmark():总控函数

主要流程:

  1. 获取 API plugin
  2. 创建 benchmark_data_queue
  3. 清空 data_process_completed_event
  4. 执行连接测试
  5. 创建共享 AioHttpClient
  6. 启动 statistic_benchmark_metric 后台任务
  7. 创建 semaphore
  8. 遍历 get_requests(),不断创建 send_request task
  9. in_flight 限制已创建未完成的 task 数量
  10. 等所有请求任务完成
  11. await benchmark_data_queue.join()
  12. data_process_completed_event.set()
  13. 等统计任务退出
  14. summary_result(...)
  15. 返回结果

八、并发控制设计

这段代码里有四层控制:

1. rate

控制请求生成速度

2. Semaphore(args.parallel)

控制同时真正执行 HTTP 请求的数量

3. max_in_flight

控制已创建未完成 task 数量,防止 OOM

4. Queue(maxsize=...)

控制结果积压,形成背压


九、主要优点

  1. 插件化强:API / Dataset 可扩展
  2. 请求发送与统计持久化解耦
  3. 流式写 SQLite,避免结果全堆内存
  4. 有背压控制,防止 OOM
  5. 共享 HTTP client,减少连接开销
  6. 支持泊松过程限流

十、潜在问题

  1. dataset 模式可能死循环(所有 request 都是 None
  2. prompt 模式复用同一个 request 可能有共享修改风险
  3. 消费者退出机制是轮询式,不如 sentinel 优雅
  4. data_process_completed_event 是全局变量,不适合同进程多 benchmark 并发
  5. SQLite commit 放在线程里,虽然能用,但要注意线程模型
  6. argssend_request() 中未使用
  7. done 任务结果未显式消费
  8. initial=1 的 tqdm 参数可疑

十一、总结

这是一个:

  • 插件化
  • 异步
  • 生产者-消费者
  • 带背压
  • 流式统计持久化

的 benchmark 执行框架。


2. 时序图 / 架构图

问题

画出这段代码的时序图 / 架构图。

回答

提供了多张 Mermaid 图,包括:

1. 整体架构图

渲染错误: Mermaid 渲染失败: Parse error on line 2: ...t LR A[benchmark(args)] --> B[ApiReg ----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'PS'

2. 运行时架构图

强调:

  • benchmark 主协程
  • ProducerSide
  • ConsumerSide

3. 完整时序图

描述从 benchmark(args) 开始,到:

  • 连接测试
  • 创建 client
  • 启动统计任务
  • 获取 request
  • 创建 send_request task
  • queue.put / queue.get
  • metrics 更新
  • DB 写入
  • 汇总结果

全过程。


4. 简化版核心时序图

summary_result SQLite statistic_benchmark_metric Queue AioHttpClient send_request get_requests benchmark summary_result SQLite statistic_benchmark_metric Queue AioHttpClient send_request get_requests benchmark loop [生成请求] loop [消费结果] 启动统计消费者任务 获取 request request 创建发送任务 post(request) benchmark_data put(benchmark_data) get() benchmark_data update_metrics() insert + commit join() 通知结束 metrics summary_result() final results

5. 生产者-消费者抽象图

请求源 Prompt / Dataset

get_requests

send_request tasks

AioHttpClient.post

benchmark_data_queue

statistic_benchmark_metric

BenchmarkMetrics

SQLite

Logger / Visualizer

summary_result

最终 benchmark 报告


3. 限流、并发、背压图修正版

问题

“这个图专门强调限流、并发、背压”的 Mermaid 渲染失败。

回答

提供了多种更稳的版本。

最稳简化版

get_requests

Rate limiter

create_task send_request

max_in_flight limit

Semaphore parallel limit

client.post

Queue maxsize limit

statistic_benchmark_metric

DB commit metrics log

中文稳版本

请求生成 get_requests

限流 rate control

创建发送任务

in flight 数量限制

并发限制 semaphore

发送请求 client.post

结果队列 queue

统计消费者

写库和日志

原因分析:

  • 中文边标签
  • 括号
  • 特殊符号
  • 混合中英文复杂节点文本

都可能导致 Mermaid 某些渲染器报错。


4. async yield await 执行原理

问题

async for request in generator:
    yield request
    if args.rate != -1:
        interval = np.random.exponential(1.0 / args.rate)
        await asyncio.sleep(interval)

这个执行过程是什么?我不懂 async yield await 的执行原理。

回答

核心结论:

  • get_requests(...) 是一个异步生成器
  • 它被外部通过 async for 逐个拉取值
  • 每次:
    1. 内部 yield request 把一个 request 交给外部
    2. 生成器暂停
    3. 外部处理这个 request
    4. 外部再次请求下一个值时,生成器从 yield 后恢复
    5. 然后执行 await asyncio.sleep(interval)
    6. 睡醒后再产生下一个 request

await 是什么

等一个异步操作完成,先把控制权交回事件循环,稍后再恢复。

例如:

await asyncio.sleep(1)

不是阻塞线程 1 秒,而是:

  • 1 秒后再叫醒我
  • 现在先去执行别的协程

yield 是什么

产出一个值给外部,然后暂停函数,下次继续。

普通生成器:

def gen():
    yield 1
    yield 2

async def + yield 是什么

就是异步生成器

async def gen():
    yield 1
    await asyncio.sleep(1)
    yield 2

外部必须:

async for x in gen():
    ...

这段 benchmark 代码里的语义

get_requests() 相当于一个“带限流的异步转发器”。

  • 外部 benchmark()get_requests() 要 request
  • get_requests() 再向内部 generator(prompt/dataset)要 request
  • 把 request yieldbenchmark()
  • benchmark() 下一次再来取时,先 sleep 一个 interval,再取下一条

所以:

yield request
await asyncio.sleep(interval)

不是“yield 后后台继续睡”,而是:

外部下一次来要值时,先从 yield 后面恢复,然后 sleep,再生成下一个值。


一个最小示例

async def get_requests():
    for i in range(3):
        yield i
        await asyncio.sleep(1)

外部:

async for x in get_requests():
    print(x)

过程:

  1. 外部要值
  2. 生成器执行到 yield 0
  3. 外部拿到 0
  4. 外部再要值
  5. 生成器从 yield 0 后恢复,执行 await sleep(1)
  6. 睡醒后 yield 1
  7. 外部拿到 1
  8. 如此往复

一句话总结

  • yield:把一个值交给调用者,并暂停自己
  • await:把控制权交给事件循环,等异步事件完成后再恢复
  • async for:调用方不断恢复这个异步生成器,直到它结束

5. 这段代码里有几个协程

问题

有几个协程啊?

回答

要分两种口径看:


1. 按 async def / 异步生成器函数种类看

主要有:

  1. get_requests(...)
  2. _generate_from_prompt()
  3. _generate_from_dataset()
  4. send_request(...)
  5. statistic_benchmark_metric(...)
  6. connect_test(...)
  7. benchmark(...)

其中:

  • benchmark / send_request / statistic_benchmark_metric / connect_test 是协程函数
  • get_requests / _generate_from_prompt / _generate_from_dataset 是异步生成器函数

2. 按运行时同时存在的执行单元看

一定会有
  1. benchmark(args) 主协程
  2. statistic_benchmark_metric(...) 后台 task
  3. get_requests(...) 异步生成器实例
  4. _generate_from_prompt()_generate_from_dataset() 的异步生成器实例(二选一)
还有很多个
  1. send_request(...) task,数量随请求数增加

但注意:

  • 真正同时进入 client.post() 的最多是 args.parallel
  • 已创建未完成 task 最多大约是
    args.parallel * args.in_flight_task_multiplier

大致同时存在的数量

固定部分:

  • 1 个 benchmark
  • 1 个 statistic_benchmark_metric
  • 1 个 get_requests
  • 1 个内部 generator

再加若干个 send_request task。


但“存在”和“正在发 HTTP”不是同一回事

因为 send_request 还要过:

async with semaphore:

所以:

  • 真正同时发请求的最多是 args.parallel
  • 其余 task 可能只是在等 semaphore

6. 内部 generator 是不是独立协程

问题

“1 个内部 generator(prompt 或 dataset)”这个不是一个协程吧。和 get_requests 是同一个协程吧?

回答

你的理解更接近精确答案。

结论

  • get_requests() 是一个异步生成器对象
  • _generate_from_prompt() / _generate_from_dataset() 也是异步生成器对象
  • 但它们不是独立被 create_task() 调度的并发 task
  • 它们是在 get_requests() 的执行过程中,通过:
async for request in generator:

被串行驱动的


更精确地说

它们:

  • 不是同一个对象
  • 也不是两个并发 task
  • 而是同一条执行链上的上下游生成器

类似同步版本:

def outer():
    for x in inner():
        yield x

异步版本只是:

async def outer():
    async for x in inner():
        yield x

本质没变:

  • 外层驱动内层
  • 内层产出值
  • 外层再转发

区分两个概念

1. 独立 Task

会被事件循环独立调度,比如:

  • statistic_benchmark_metric
  • send_request
2. 异步生成器对象

是被上层 async for 拉取的,不是独立 task,比如:

  • get_requests
  • _generate_from_prompt / _generate_from_dataset

一句话总结

内部 generator 不是独立并发协程任务,它只是 get_requests() 内部驱动的一个异步生成器。


7. 协程之间的交互图

问题

请画出协程之间的交互。这样我才明白。

回答


图 1:哪些是独立任务,哪些不是

Main Task: benchmark

Async Generator: get_requests

Async Generator: inner generator
prompt or dataset

Task: statistic_benchmark_metric

Task: send_request 1

Task: send_request 2

Task: send_request N

说明:

  • benchmark:主任务
  • get_requests:被 benchmark 驱动
  • inner generator:被 get_requests 驱动
  • statistic_benchmark_metric:独立 task
  • send_request:独立 task

图 2:benchmark -> get_requests -> inner generator 拉取关系

inner generator get_requests benchmark inner generator get_requests benchmark 要下一个 request 要下一个 request 返回 request yield request 再要下一个 request await sleep(rate interval) 再要下一个 request 返回 request yield request

这张图说明:

  • benchmarkget_requests
  • get_requests 再拉 inner generator
  • inner generator 产出值
  • get_requests 转手 yield 给 benchmark

图 3:完整协程交互图

statistic_benchmark_metric task queue send_request task inner generator get_requests benchmark statistic_benchmark_metric task queue send_request task inner generator get_requests benchmark loop [每次生成一个 request] loop [消费结果] create_task start consumer 要下一个 request 要下一个 request request yield request create_task send_request put benchmark_data get benchmark_data benchmark_data update metrics and write DB

图 4:暂停 / 恢复图

inner generator get_requests benchmark inner generator get_requests benchmark benchmark 开始 async for request in get_requests() get_requests 在 yield 处暂停 benchmark 拿到 request1,继续执行循环体 get_requests 因 await 暂停 get_requests 再次暂停 恢复,取第1个值 恢复,取第1个值 产出 request1 yield request1 恢复,取第2个值 执行 await sleep(interval) sleep 结束后,恢复 inner generator 产出 request2 yield request2

分类表

名称 类型 是否独立并发 task 谁驱动它
benchmark 协程 事件循环主任务
statistic_benchmark_metric 协程 create_task
send_request 协程 create_task
get_requests 异步生成器 benchmarkasync for
_generate_from_prompt/_dataset 异步生成器 get_requestsasync for

8. rate 控制会不会不准

问题

await asyncio.sleep(interval) 这里等待过程中让出了协程。万一其他协程执行过程中花了较长时间,那这里的真正等待时间不就超出 interval 了吗?那就不符合预期的 rate 了啊。

回答

对,你理解得完全正确。

结论

await asyncio.sleep(interval) 的语义是:

至少等待这么久,然后由事件循环在有空时恢复你

而不是:

绝对精确地在 interval 秒后立刻恢复

所以实际时间满足:

actual_gap >= interval

如果事件循环忙、其他协程长时间不让出、存在阻塞 I/O 或 CPU 重活,那么实际恢复会更晚,导致实际 rate 偏低。


为什么会这样

await asyncio.sleep(interval) 本质上是:

  1. 告诉事件循环:interval 秒后我可以恢复
  2. 到点后也不是马上抢占执行,而是等事件循环调度到我

所以实际等待 = 定时器时间 + 调度延迟。


什么时候会明显超出

  1. 有 CPU 密集型代码阻塞事件循环
  2. 有同步阻塞操作(如 time.sleep
  3. DB / 日志 / 其他协程很重
  4. OS 调度负载高

所以 rate 不精确吗?

是的,它不是硬实时精确 rate,而是“在 asyncio 条件下尽量接近目标平均速率”。

并且由于本身就用的是指数分布:

np.random.exponential(1.0 / args.rate)

本来目标就是统计意义上的泊松过程,而不是每个间隔严格相等。


现有实现的缺点

当前写法:

yield request
await asyncio.sleep(interval)

属于相对时间等待

  • 一次晚了,后面不会自动追赶
  • 会有累计漂移

更精确的做法:按绝对时间调度

比如:

loop = asyncio.get_running_loop()
next_time = loop.time()

async for request in generator:
    if args.rate != -1:
        interval = np.random.exponential(1.0 / args.rate)
        next_time += interval
        now = loop.time()
        sleep_for = next_time - now
        if sleep_for > 0:
            await asyncio.sleep(sleep_for)
    yield request

这样能:

  • 避免累计漂移
  • 更贴近理论目标到达时间线

但即使如此,也只能尽量接近,不可能绝对实时。


最准确的一句话

asyncio.sleep(t) 的语义不是:

恰好 t 秒后继续

而是:

不早于 t 秒后,再由事件循环安排恢复


9. AioHttpClient 源码原理

问题

详细解释一下 AioHttpClienttest_connection 的原理。

回答

这段代码主要做两件事:

  1. AioHttpClient
    • 封装 aiohttp.ClientSession
    • 负责连接池、超时、trace、异常兜底
    • 实际请求处理交给 api_plugin.process_request(...)
  2. test_connection
    • benchmark 前发一个最小请求做连通性检测
    • 失败则重试直到总超时

一、整体设计思想

1. 网络层和协议层分离

AioHttpClient 负责:

  • session
  • connector
  • timeout
  • trace
  • 异常兜底

api_plugin 负责:

  • 请求体格式
  • 流式/非流式响应处理
  • token/TTFT 解析
  • 输出 BenchmarkData

2. 共享会话复用连接

正式 benchmark 阶段只创建一个 AioHttpClient,内部只创建一个 ClientSession,所有请求共享。

好处:

  • 复用 TCP 连接
  • 减少 TLS 握手
  • TTFT 更稳定
  • 更接近真实压测场景

3. 单请求失败不打崩 benchmark

无论超时还是异常,都转换为:

BenchmarkData(success=False, error=str(e))

上层统一统计失败请求。


二、__init__ 做了什么

保存参数:

self.url = args.url
self.headers = {'user-agent': 'modelscope_bench', **(args.headers or {})}
self.total_timeout = args.total_timeout
self.read_timeout = args.read_timeout
self.connect_timeout = args.connect_timeout
self.api_plugin = api_plugin

三、TCPConnector 作用

connector = aiohttp.TCPConnector(
    limit=args.parallel or 0,
    limit_per_host=args.parallel or 0,
    ttl_dns_cache=300,
    use_dns_cache=True,
    keepalive_timeout=60,
    enable_cleanup_closed=True,
    force_close=False,
    ssl=('https://' in self.url),
)

注意:

这里不是立刻建立 TCP 连接,而是创建连接池管理规则

参数含义:

  • limit:整个连接池最大连接数
  • limit_per_host:每个 host 最大连接数
  • ttl_dns_cache=300:DNS 缓存 300 秒
  • use_dns_cache=True:启用 DNS 缓存
  • keepalive_timeout=60:空闲连接保留 60 秒
  • enable_cleanup_closed=True:帮助清理关闭连接
  • force_close=False:请求结束后不强制关闭连接
  • ssl=('https://' in self.url):https 启用 SSL

四、ClientTimeout

client_timeout = aiohttp.ClientTimeout(
    total=self.total_timeout, connect=self.connect_timeout, sock_read=self.read_timeout
)

含义:

  • total:整个请求总超时
  • connect:建连超时
  • sock_read:读取响应超时

五、ClientSession

self.client = aiohttp.ClientSession(
    connector=connector,
    trust_env=True,
    timeout=client_timeout,
    trace_configs=[self._create_trace_config()] if args.debug else []
)

作用:

  • 绑定连接池
  • 绑定默认 timeout
  • 从环境变量读取代理
  • 挂 trace hooks(debug 时)

六、异步上下文管理器

async def __aenter__(self):
    pass

async def __aexit__(self, exc_type, exc, tb):
    await self.client.close()

用途:

async with client:
    ...

注意:

  • __aenter__ 更规范应返回 self
  • 当前因为外部没用 as client,所以没出问题

七、trace 机制

挂了三个回调:

  1. on_request_start
  2. on_request_chunk_sent
  3. on_response_chunk_received

用于 debug:

  • 请求什么时候开始
  • 请求 body 发了什么
  • 响应 chunk 收到了什么

对定位流式接口、TTFT 很有帮助。


八、post() 的原理

output = await self.api_plugin.process_request(self.client, self.url, self.headers, body)
return output

也就是说:

  • AioHttpClient 不自己写统一 session.post(...)
  • 而是把 session、url、headers、body 交给 plugin
  • 由 plugin 决定是:
    • JSON 响应
    • SSE 流式响应
    • embedding
    • rerank
    • 自定义协议

九、异常兜底

except asyncio.TimeoutError as e:
    ...
    return BenchmarkData(success=False, error=str(e))
except (aiohttp.ClientConnectorError, Exception) as e:
    ...
    return BenchmarkData(success=False, error=str(e))

优点:

  • benchmark 不因单请求失败而崩溃

缺点:

  • Exception 太宽,可能吞掉插件真实 bug

十、test_connection() 原理

async def test_connection(args, api_plugin) -> bool:

流程:

  1. 记录开始时间
  2. 内部定义 attempt_connection()
    • 新建 AioHttpClient
    • 构造一个 "hello" 请求
    • client.post(...)
  3. while True 不断重试
  4. 成功就返回 True
  5. 达到总超时就返回 False
  6. 每次失败 sleep 10 秒

注意:

  • 每次 attempt 都新建一个 ClientSession
  • 总重试窗口也使用 args.total_timeout
  • 这和单次请求 timeout 共用一个参数,语义上不够理想

十一、总结

这段代码本质上是:

aiohttp.ClientSession + TCPConnector 构造一个带连接池、超时、trace 的异步 HTTP 客户端外壳,
再把具体协议处理委托给 api_plugin.process_request()
从而形成 benchmark 场景下的通用网络访问层。


10. ApiPluginBase 有什么作用

问题

ApiPluginBase 这个类有什么作用?

回答

这个类作用非常关键。

一句话

ApiPluginBase 是整个 benchmark 框架里的“API 适配层统一抽象接口”。

它的目的是:

把不同模型服务 / API 的差异隔离开,让 benchmark 主流程不用关心具体协议细节。


一、在系统中的定位

主流程里有:

api_plugin_class = ApiRegistry.get_class(args.api)
api_plugin = api_plugin_class(args)

后面调用:

request = api_plugin.build_request(messages)
output = await self.api_plugin.process_request(self.client, self.url, self.headers, body)
metrics.update_metrics(benchmark_data, api_plugin)

说明 benchmark 主流程只认统一接口,不认具体协议。


二、如果没有这个类会怎样

没有插件基类,就要在主流程里到处写:

if api == "openai":
    ...
elif api == "vllm":
    ...
elif api == "embedding":
    ...

会导致:

  • 核心代码耦合严重
  • 每加一个 API 都得改 benchmark 核心
  • 维护成本高

三、抽象方法们的作用

1. build_request(...)
def build_request(self, messages, param=None) -> Dict:

作用:

把 benchmark 内部统一的输入 messages 转成某个 API 所需的 request body

比如:

  • OpenAI chat:{"model": ..., "messages": ...}
  • completion:{"model": ..., "prompt": ...}
  • embedding:{"model": ..., "input": ...}

2. parse_responses(...)
def parse_responses(self, responses, request=None, **kwargs) -> Tuple[int, int]:

作用:

从响应里解析出 (prompt_tokens, completion_tokens)

因为不同 API:

  • 有的直接有 usage
  • 有的流式响应没有 usage
  • 有的要自己基于 tokenizer 估算
  • embedding / rerank 的 token 语义又不同

3. process_request(...)
async def process_request(self, client_session, url, headers, body) -> BenchmarkData:

这是最核心的方法。

作用:

真正完成一次 HTTP 请求,并把结果组织成统一的 BenchmarkData

原因是:

  • 不同 API 的网络交互方式差异太大
  • 有的流式,有的非流式
  • 有的要统计 TTFT
  • 有的 chunk 解析方式不同

所以整个请求处理过程都交给插件。


四、replace_values(...)

@staticmethod
def replace_values(input_json, model, prompt):

作用:

递归替换 JSON 结构里的 %m%p

比如:

{
  "model": "%m",
  "messages": [{"role": "user", "content": "%p"}]
}

替换后变成具体 model 和 prompt。

注意:

  • 对顶层直接是 str 的情况,这个函数当前写法不会把结果返回出去
  • 对 dict/list 容器内嵌字符串的替换是有效的

五、设计价值

这个类核心解决的是:

不同 API 的请求格式、响应格式、流式协议、token 统计方式都不同,但 benchmark 框架希望统一调度、统一统计。

它同时兼具:

  • 策略模式(不同 API 用不同策略)
  • 适配器模式(把外部协议适配成框架统一结构)

六、一句话总结

ApiPluginBase 的作用:

  1. 定义统一接口
  2. 隔离不同 API 差异
  3. 支撑插件化扩展
  4. 保证所有 API 最终都输出统一 BenchmarkData

11. 为什么不为每个请求新建 ClientSession

问题

“不是每个请求都新建一个 ClientSession”这个是干嘛用的?

回答

一句话

ClientSession 不是“一次请求对象”,而是“带连接池的共享 HTTP 会话”。

不为每个请求都新建 ClientSession,是为了:

  1. 复用 TCP / TLS 连接
  2. 共享连接池
  3. 减少系统开销和延迟
  4. 让 benchmark 更接近真实服务调用方式

一、ClientSession 是什么

不是“一次请求”,而是:

  • 默认 header
  • timeout
  • connector
  • cookie / auth / proxy 配置
  • 连接池

这些共享状态的集合。


二、如果每个请求都新建 session,会怎样

比如:

session = aiohttp.ClientSession()
await session.post(...)
await session.close()

问题:

  1. 每次都重建 TCP / TLS 连接,开销很大
  2. 无法利用连接池
  3. TTFT 和 latency 会包含建连成本,污染 benchmark
  4. socket / 文件描述符 / TIME_WAIT 增多
  5. 高并发下资源消耗更大

三、Session 与连接池的关系

connector = aiohttp.TCPConnector(...)
session = aiohttp.ClientSession(connector=connector)

可以理解为:

  • TCPConnector:底层连接池管理器
  • ClientSession:上层 HTTP 会话接口,使用 connector 的连接池

四、复用 session 的实际效果

第一次请求:

  • 建 TCP
  • 建 TLS(如果 https)
  • 发请求
  • 收响应
  • 连接返回池中

后续请求:

  • 直接复用已有 keep-alive 连接
  • 减少握手和延迟

五、为什么对 TTFT 很重要

如果每次都新建连接,TTFT 会混入:

  • TCP 建连时间
  • TLS 握手时间

这不利于测模型服务端本身的首 token 性能。

共享 session 更接近真实线上情况。


六、一句话总结

不为每个请求都新建 ClientSession,是为了:

复用连接、减少握手和建连开销、降低延迟、减少资源消耗,并让 benchmark 更真实。


12. TCPConnector、args.parallel、semaphore 的整体关系

问题

这里为啥要创建连接呢?这里的 args.parallel 又是干嘛的?从整体上描述整个过程。与 semaphore 的关系,为啥要创建 TCP 连接呢?

回答


一、最关键结论

connector = aiohttp.TCPConnector(...)
self.client = aiohttp.ClientSession(connector=connector, ...)

这里不是立刻创建 TCP 连接。

而是在:

  1. 创建一个连接池管理器
  2. 配置未来发请求时:
    • 最多多少连接
    • 连接如何复用
    • DNS 如何缓存
    • keepalive 多久
    • SSL 如何处理

真正 TCP 建连一般发生在第一次:

await session.post(...)

时。


二、一次 benchmark 的整体层次

第 1 层:请求生成层

get_requests()

第 2 层:任务调度层

benchmark() 创建 send_request task

第 3 层:网络执行层

send_requestAioHttpClient.post()ClientSessionTCPConnector

第 4 层:结果消费层

statistic_benchmark_metric()


三、HTTP 为何需要 TCP 连接

HTTP 请求底层必须通过网络连接。

HTTP
  1. DNS
  2. TCP
  3. HTTP request/response
HTTPS
  1. DNS
  2. TCP
  3. TLS
  4. HTTP request/response

所以 HTTP 请求一定依赖 TCP 连接。


四、TCPConnector 的作用

TCPConnector 是 aiohttp 的底层连接池管理器,用来管理:

  • 最大连接数
  • 每 host 连接数
  • DNS 缓存
  • keepalive
  • SSL

本质上:

它不是“现在就创建连接”,而是“制定连接池规则”。


五、args.parallel 是什么

这是 benchmark 的核心并发参数。

1. 在 Semaphore(args.parallel)

表示:

同时最多多少个 send_request 能进入请求执行区

2. 在 TCPConnector(limit=args.parallel)

表示:

底层连接池最多允许多少连接

所以:

  • Semaphore 管应用层请求并发
  • Connector 管网络层连接资源

六、SemaphoreTCPConnector 的关系

一句话:

  • Semaphore:控制“谁有资格开始发请求”
  • TCPConnector:控制“底层连接资源怎么分配和复用”

它们不是重复,而是两道不同层次的限制。


七、完整例子

假设:

parallel = 4

于是:

semaphore = Semaphore(4)
connector = TCPConnector(limit=4, limit_per_host=4)

如果创建了 20 个 send_request task

第一步

先过 semaphore:

  • 只有 4 个 task 能进入
  • 其余等待 semaphore
第二步

进入 client.post() 后,再由 session/connector 分配连接:

  • 尝试复用已有连接
  • 没有则创建新连接
  • 但总数不超过 4

八、为什么有了 semaphore 还要 connector limit

因为:

  1. 两者控制层次不同
  2. 连接池复用 / DNS cache / keepalive 只能由 connector 管
  3. 明确底层资源边界更稳健
  4. 双重限制更安全

九、最准确的一句话

TCPConnector(...) 的作用不是“现在就创建 TCP 连接”,而是:

创建一个底层连接池管理器,规定未来 benchmark 发 HTTP 请求时,最多能用多少连接、连接如何复用、DNS 如何缓存、HTTPS 如何处理。

args.parallel 的含义是:

希望网络层最大连接能力和 benchmark 的目标并发度保持一致。


13. benchmark.py 中 in_flight 的作用

问题

benchmark.py 184-200in_flight 干嘛用的?

回答

一句话

in_flight 用来限制“已经创建但还没结束”的 send_request task 数量,避免任务堆积导致内存暴涨 / OOM。


一、为什么有了 semaphore 还不够

很多人会想:

已经有 Semaphore(args.parallel) 了,为什么还需要 in_flight

因为:

  • semaphore 只限制真正同时执行 client.post() 的任务数
  • 不限制已创建但正在等待 semaphore 的 task 数量

如果你直接:

async for request in get_requests(...):
    asyncio.create_task(send_request(...))

那么:

  • 只有 parallel 个 task 真正在发请求
  • 但其余成千上万 task 已经创建出来了,都会占内存

二、in_flight 是什么

in_flight: set[asyncio.Task] = set()

保存的是:

当前已经 create_task、但尚未结束的 send_request task


三、max_in_flight

max_in_flight = args.parallel * args.in_flight_task_multiplier

表示:

允许同时存在的未完成 task 数量上限

例如:

  • parallel=10
  • multiplier=3

则:

max_in_flight = 30

表示:

  • 最多 30 个 task 已创建未完成
  • 其中真正活跃执行 HTTP 的仍最多 10 个
  • 其余最多 20 个在等 semaphore / 收尾

四、代码怎么工作

1. 初始化
semaphore = asyncio.Semaphore(args.parallel)
in_flight = set()
max_in_flight = args.parallel * args.in_flight_task_multiplier
2. 每来一个 request 先检查 in_flight 是否超限
if len(in_flight) >= max_in_flight:
    done, pending = await asyncio.wait(in_flight, return_when=asyncio.FIRST_COMPLETED)
    in_flight = pending

含义:

  • 如果 task 太多了,就先等至少一个完成
  • done:已完成
  • pending:未完成
  • in_flight 更新成 pending
3. 再创建新任务
task = asyncio.create_task(send_request(...))
in_flight.add(task)
4. 收尾
await asyncio.gather(*in_flight, return_exceptions=True)

等待剩余任务都完成。


五、和 semaphore 的区别

semaphore

限制:

同时正在干活的工人数

in_flight

限制:

工地里总共挤了多少工人(含排队的)


六、本质

in_flight 的核心作用:

  1. 防止一下子创建过多 task
  2. 防止内存暴涨
  3. 给任务创建过程增加背压
  4. 降低事件循环调度压力

14. Task 对象是什么,为什么占内存

问题

task 对象是什么东西?为什么会占用内存?

回答

一句话

asyncio.Task 是一个真实的 Python 对象,它保存协程执行状态、局部变量引用、等待关系、结果/异常、回调等信息,所以会占内存。


一、Task 是什么

当你写:

task = asyncio.create_task(send_request(...))

这里发生的是:

  • send_request(...) 产生一个协程对象
  • create_task(...) 把这个协程交给事件循环调度
  • 包装成一个 Task 对象

所以:

  • 协程对象:待运行的异步过程
  • Task 对象:被事件循环管理的执行单元

二、Task 内部大概保存什么

1. 它包装的协程对象
2. 当前执行到哪一步

比如停在:

  • await semaphore.acquire()
  • await client.post(...)
  • await queue.put(...)
3. 局部变量和参数引用

例如:

  • request
  • args
  • client
  • queue
  • semaphore
4. Future / callback / 等待链
5. 结果 / 异常 / 状态

比如:

  • pending
  • running
  • done
  • cancelled

三、为什么“只是等 semaphore”的 task 也占内存

即使 task 只是卡在:

async with semaphore:

它也还是活着的 task,仍然持有:

  • 协程对象
  • 当前暂停现场
  • requestargsclient 等对象的引用
  • 等待状态

所以不是零成本。


四、为什么大量 task 会吃内存

假设创建了 10 万个 task:

  • 10 万个 Task 对象
  • 10 万个协程对象
  • 10 万份局部变量引用
  • 10 万个 request 引用
  • 事件循环还要管理它们

就会导致:

  • 内存膨胀
  • GC 压力大
  • 调度压力大

五、Task 的内存不只是它自己

更大的问题往往是:

Task 会拖住一串它引用的对象一起活着。

比如每个 task 都引用一个很大的 request,那么即使 Task 对象本体不大,总体内存也会很可观。


六、一句话总结

Task 占内存的原因:

  1. Task 是真实对象
  2. 它要保存异步执行现场
  3. 它持有大量上下文引用
  4. 大量 task 会拖住大量 request / response / 参数对象不被回收

15. benchmark_data_queue 和 data_process_completed_event 是干嘛的

问题

benchmark_data_queue: asyncio.Queue = asyncio.Queue(maxsize=max(1, args.parallel * args.queue_size_multiplier))
data_process_completed_event.clear()

这两个干嘛的?什么原理?

回答

一句话

  • benchmark_data_queue:发送侧和统计侧之间的异步结果通道
  • data_process_completed_event.clear():把“发送已经彻底结束”的标志重置为未完成状态

它们一起构成了 benchmark 中的生产者-消费者协调机制


一、为什么需要它们

发送侧

send_request(...)

benchmark_data = await client.post(request)
await benchmark_data_queue.put(benchmark_data)

它是生产者。

统计侧

statistic_benchmark_metric(...)

benchmark_data = await benchmark_data_queue.get()
metrics.update_metrics(...)
insert_benchmark_data(...)
benchmark_data_queue.task_done()

它是消费者。

所以需要一个中间队列来传递结果。

同时消费者还要知道:

“是暂时没数据,还是以后再也不会有新数据了?”

这就需要 Event


二、benchmark_data_queue 的作用

是一个有限容量的 asyncio.Queue

作用:

  1. 解耦请求发送与统计/持久化
  2. 缓冲短时波动
  3. 提供背压
  4. 支持 join/task_done 收尾同步

三、为什么设置 maxsize

maxsize=max(1, args.parallel * args.queue_size_multiplier)

因为如果队列无限长,当消费慢于生产时,内存会不断增长。

有了 maxsize 后:

  • 队列满时
  • await queue.put(...) 会挂起生产者
  • 形成背压

外层的 max(1, ...) 是防止 maxsize=0,因为 Queue(maxsize=0) 在 asyncio 中表示无限容量。


四、data_process_completed_event.clear() 的作用

data_process_completed_event 是模块级全局变量:

data_process_completed_event = asyncio.Event()

clear() 表示:

本轮 benchmark 开始前,把“发送全部结束”标志重置为 False。

因为上一次 benchmark 结束时可能已经 set() 过。


五、为什么消费者不能只看 queue.empty()

因为:

  • 队列暂时为空 ≠ 以后再也不会有数据

比如:

  • 队列现在空了
  • 但还有很多 send_request 正在路上
  • 过会儿还会 put() 新结果

所以消费者退出条件必须是:

data_process_completed_event.is_set() and benchmark_data_queue.empty()

即:

  • 不会再有新数据
  • 当前队列也空了

六、它们配合方式

Queue

负责传递数据、做缓冲、背压

Event

负责告诉消费者:

发送侧已经彻底结束,不会再产生新数据了


七、一句话总结

  • benchmark_data_queue:有限容量的异步结果队列
  • data_process_completed_event.clear():每轮 benchmark 启动时重置“生产完成”标志

16. 为什么 queue.put 要 await

问题

await benchmark_data_queue.put(benchmark_data)

put 为啥要加 await

回答

一句话

asyncio.Queue.put() 可能因为队列满而需要等待,所以它是异步操作,必须 await


一、为什么 put() 不是普通函数

因为 asyncio.Queue 是有容量上限的:

Queue(maxsize=...)

所以 put(item) 有两种情况:

情况 1:队列没满

立刻放进去,几乎马上返回

情况 2:队列满了

必须等消费者取走一个元素腾出空间,当前协程要挂起

这就是为什么 put() 是异步的。


二、如果不 await

benchmark_data_queue.put(benchmark_data)

你只是得到了一个 coroutine / awaitable,并没有真正执行 put。

所以要么:

  • 根本没放进去
  • 要么出现 “coroutine was never awaited” 警告

三、这里 await 的真实语义

不是“放元素这个动作慢”,而是:

这个动作可能因为队列已满而需要等待。


四、为什么这是好事

如果消费者处理太慢,而生产者持续 put()

  • 没有限制就会导致队列无限膨胀
  • 内存不断增长

加上:

await queue.put(item)

就能实现背压:

  • 队列满时,发送侧暂停
  • 等消费者处理一部分再继续

五、对称关系

  • await queue.get():队列空时等待新数据
  • await queue.put(item):队列满时等待空位

两者都可能挂起协程,所以都要 await


17. task_done 和 join 的原理

问题

task_donejoin 没有看懂。

回答

一句话

put/get 管“队列里有没有元素”
task_done/join 管“这些元素是否已经被真正处理完”

这是关键。


一、get() 不等于“处理完”

消费者里:

benchmark_data = await benchmark_data_queue.get()

metrics.update_metrics(...)
insert_benchmark_data(...)
benchmark_data_queue.task_done()

含义:

  1. get() 只是从队列里拿到一项
  2. 你还没处理完
  3. 只有做完 metrics / DB / 日志后,才调用 task_done()

所以:

  • get() = 领取任务
  • task_done() = 完成任务

二、join() 是干嘛的

await benchmark_data_queue.join()

表示:

等待所有已经放进队列的项目,都被消费者处理完并调用了 task_done()


三、内部原理:unfinished_tasks 计数器

Queue 内部可以想象维护了一个计数器:

unfinished_tasks

规则:

  • 每次 put(item)unfinished_tasks += 1
  • 每次 task_done()unfinished_tasks -= 1
  • unfinished_tasks == 0 时,join() 返回

四、为什么 join()empty() 更强

queue.empty()

只表示当前容器里有没有元素

queue.join()

表示所有放进去的元素都已经真正处理完了

比如:

  • item 被 get() 拿走后,队列可能已经空了
  • 但消费者还在处理这个 item
  • 所以 empty() 是 True,join() 仍不会返回

五、如果忘了 task_done()

unfinished_tasks 永远减不回去,await queue.join() 会永远卡住。


六、如果 task_done() 调多了

会报错,因为没有那么多未完成任务可减了。


七、在 benchmark 里的意义

await benchmark_data_queue.join()

表示:

等所有已经进入结果队列的 benchmark_data 都被真正处理完(统计、写库、日志)后,主协程才继续往下收尾。


八、一句话记忆法

  • put():登记一个待处理任务
  • get():领取一个任务
  • task_done():声明任务处理完成
  • join():等待所有登记的任务都完成

18. wait_for(queue.get(), timeout=0.1) 的用法

问题

await asyncio.wait_for(benchmark_data_queue.get(), timeout=0.1)

这是什么用法呢?

回答

一句话

等待从队列取数据,但最多只等 0.1 秒。
如果 0.1 秒内没拿到,就超时醒来,去检查别的退出条件。


一、拆开看

1. benchmark_data_queue.get()

含义:

从异步队列里取一个元素;如果队列空,就一直等。

2. asyncio.wait_for(..., timeout=0.1)

含义:

给这个异步操作套一个超时:最多等 0.1 秒。

  • 如果 0.1 秒内完成,返回结果
  • 否则抛 asyncio.TimeoutError

二、为什么不直接 await queue.get()

如果写:

benchmark_data = await benchmark_data_queue.get()

当:

  • 队列空
  • 生产也结束
  • 再也不会有新数据

消费者就会永远卡在这里,没机会回到循环顶部检查退出条件。


三、wait_for(..., timeout=0.1) 的作用

让消费者“定期醒来”:

  1. 尝试等队列数据
  2. 只等 0.1 秒
  3. 超时就醒来
  4. 回到 while 顶部检查:
    • 生产是否结束
    • 队列是否为空

这是一种“超时轮询式退出机制”。


四、为什么 timeout 设 0.1 秒

是折中:

  • 太大:退出不及时
  • 太小:轮询太频繁,开销变大

0.1 秒是比较常见的折中值。


五、超时后为什么 continue

except asyncio.TimeoutError:
    continue

这里的 TimeoutError 不是错误,而是一种正常控制流:

暂时没数据,那就继续下一轮,检查是否该退出。


六、一句话总结

wait_for(queue.get(), timeout=0.1) 的本质是:

防止 queue.get() 无限阻塞,使消费者可以周期性地检查退出条件。


19. 这个过程里到底哪些东西占内存

问题

请问这个过程,占内存的是哪些东西呢。是 Queue 里的东西吗,task 和 queue 本身会占用嘛?

回答

一句话

会,Queue 里的元素、Task、本身的 Queue 对象都会占内存
但实际 benchmark 里,最大的通常不是 Queue 壳子本身,而是它里面装的对象,以及未完成 Task 持有的引用链。


一、主要内存来源

  1. asyncio.Task 对象本身
  2. Task 持有的协程执行现场
  3. Task 引用的参数和上下文
    • request
    • args
    • client
    • queue
    • semaphore
  4. benchmark_data_queue 里尚未被消费的 benchmark_data
  5. BenchmarkData 本身可能引用的响应内容、统计字段、错误信息
  6. dataset_messages 预加载的数据
  7. HTTP 客户端连接池 / buffer / response chunk
  8. metrics / SQLite / 日志临时对象

二、直接回答两个问题

1. “是 Queue 里的东西吗?”

是。
Queue 里积压的 benchmark_data 会占内存,而且常常是重要来源。

2. “task 和 queue 本身会占用嘛?”

也会。

  • Queue 壳子占内存,但通常不是最大头
  • Task 本身也占,而且会拖住很多引用对象一起活着

三、按三层理解最清晰

第一层:容器壳子本身
  • Task
  • Queue
  • set(in_flight)
  • Session / Connector
  • Metrics
第二层:容器里装的元素
  • in_flight 里的 task
  • queue 里的 benchmark_data
  • dataset_messages 里的 message
第三层:元素继续引用的对象
  • request
  • response
  • messages
  • raw chunk
  • error string
  • 其他上下文对象

真正大的往往是第三层。


四、Task 为什么吃内存

每个未完成 task 至少保存:

  • Task 对象
  • 协程对象
  • 当前暂停状态
  • 局部变量引用
  • request
  • client
  • queue
  • args

所以如果有很多 task,它们会拖住很多 request 和上下文对象一起活着。


五、Queue 为什么会占内存

Queue 本身会存:

  • 容器结构
  • 等待 put/get 的协程队列
  • unfinished_tasks 计数器
  • join 的等待信息

更关键的是:

Queue 里的每个 benchmark_data 元素都必须一直活着,直到被消费。

如果每个 BenchmarkData 很重(含完整 response/chunk),积压就很危险。


六、这段 benchmark 最容易吃内存的几个点

  1. 大量 in_flight task
  2. queue 中积压的 benchmark_data
  3. 预加载的 dataset_messages
  4. 每个 task 持有的 request
  5. BenchmarkData 中持有的 response / chunk / 文本

七、为什么要同时限制 in_flightqueue maxsize

它们分别卡住两类堆积:

in_flight

限制:

活着的 task 数量,以及 task 持有的 request / 上下文对象数量

queue maxsize

限制:

尚未被消费的 BenchmarkData 数量


八、一句话总结

回答你的问题:

  • Queue 里的东西会占内存
  • Task 本身也会占,而且会拖住一串对象
  • Queue 本身也占,但通常不是主要矛盾
  • 实际大头一般是:
    • 未完成 task
    • task 持有的 request / response / 上下文
    • queue 中积压的 BenchmarkData
    • 预加载的数据集内容

总结

本次对话核心收获包括:

  1. benchmark.py 是一个异步生产者-消费者模型
  2. get_requests / 内部 generator 是异步生成器,不是独立 task
  3. send_request / statistic_benchmark_metric 是真正独立调度的 task
  4. Semaphore 控制活跃并发,in_flight 控制已创建未完成 task 数
  5. Queue(maxsize) 负责结果缓冲和背压
  6. Event 用来告诉消费者“是否不会再有新数据”
  7. task_done/join 是“处理完成同步机制”
  8. wait_for(queue.get(), timeout=0.1) 是为了让消费者定期醒来检查退出条件
  9. ClientSession + TCPConnector 是共享连接池,不是每次请求新建会话
  10. ApiPluginBase 是整个 benchmark 的 API 适配层接口
  11. 内存主要消耗在:
    • 未完成 task
    • queue 中的结果对象
    • 任务与结果对象持有的 request/response/messages 等引用链

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐