Postman:支持离线使用,未登录状态下,以及内网环境下,都可以正常进行大部分操作
Apipost:( https://www.apipost.cn/download.html ) 经典版(Apipost7) 支持 离线/未登录 状态下使用
apifox不支持离线,而且不登录没法用

1、Postman

Postman V9.16 绿色版汉化:https://www.cr173.com/soft/1497202.html

postman 简介

Postman 是一款功能强大的网页调试与发送网页HTTP请求的工具。有 Chrome 插件版本,也有Postman 本地应用程序版本,插件版本早已停止更新。详细了解为什么支持 Postman Chrome 应用程序已被弃用?:http://chromecj.com/web-development/2018-04/1376.html

postman 的特点

  • postman 只做 http协议 的接口的测试,是一种最广泛 REST 接口测试客户端软件。
  • postman 支持 http 协议的所有请求方式,包括 get、post、head、put、delete 等。
  • postman 支持各种额外的头部字段的添加。
  • postman 除了可以模拟普通表单数据外,还支持文件、图片、视频等数据请求。
  • postman 是一个接口的开发和测试的全流程支持软件。
  • 支持前端开发:mock(模拟) 测试
  • 支持后端开发:接口测试、开发接口文档
  • 支持测试:接口测试
  • 支持运维:监控功能
  • postman 支持云服务:随时随地都能无缝对接加班。
  • 数据同步功能,家里、办公室的电脑登录同一账号,数据即可同步。
  • 团队协作,你设计的请求可以团队内的推送,交给其他人执行或继续开发。

新版 postman 阉割了collection 数据导入导出等功能的解决方法:https://www.cnblogs.com/gnz48/p/18651831

新版本不仅阉割了导入本地备份数据包的功能,还无法新建 collection,只有历史记录列表了,这使得许多用户无法正常使用 Postman。可以下载并使用 Postman 的老版本。Postman 的老版本(9.31.28)仍然可以正常使用,并且具有以下优点:

  • 可以导入本地备份数据包。
  • 可以新建 collection。
  • 环境变量与工作空间绑定,方便管理。
  • 提供丰富的变量提示,方便使用。
  • 界面简洁,操作方便。
  • 速度快,响应迅速。
  • 支持多种语言的实现方式。

下载(9.31.28) 安装

新版本可能部分电脑会出现卡顿,可以试试 7 系最终版:

备用下载:链接 提取码:w3h5

安装步骤

  1. 下载对应操作系统的安装包。
  2. 双击安装包,按照提示进行安装。
  3. 安装完成后,即可使用 Postman。

Postman 实用教程

2、Apifox 快速入门

帮助文档:https://apifox.com/help/

入门

接口文档

接口调试

环境 & 变量

前后置操作 & 脚本

自动化测试

接口 Mock ( 模拟 )

自动生成代码

https://apifox.com/help/code-generation

导入 / 导出接口

团队 & 项目

账号 & 软件设置

应用与插件

最佳实践

WebSocket 接口

WebService 接口

Socket 接口

gRPC 接口

Dubbo 接口

参考资料

常见问题

1. Apifox 是否收费?

Apifox 公网版 (SaaS 版) 免费,私有化部署版收费。

2. 登录(Auth)态如何实现?

请参考文档:登录态(Auth)如何处理

3. 接口发送请求前需要调用登录接口获取 token 放在 header,如何实现?

请参考文档:登录态(Auth)如何处理

4. B 接口请求参数依赖于 A 接口返回的数据,如何实现?

请参考文档:接口之间如何传递数据

5. 同项目下有不同域名的接口,如何处理?

方案一:在环境里新增多个服务,分别设置不同的前置 URL ,接口分组和接口维度可以指定对应的前置 URL。推荐本方案!

方案二:把域名设置成环境变量如DOMAIN_1,接口路径这样填写:https://{{DOMAIN_1}}/users。接口路径是以http://https://起始的,系统会自动忽略里环境里前置 URL。

方案三:给不同域名接口设置不同环境,通过切换环境来运行不同域名下的接口。不推荐本方案!

6. 脚本如何读取或修改接口请求信息?

请参考文档: 脚本读取/修改接口请求信息

7. 是否支持查询数据库字段作为参数传给接口?

支持,请参考文档:数据库操作

8. 数据是存储在本地还是云端?可否离线使用?可否私有化部署?

目前 Apifox 有 Saas 版 和私有化部署版 。

Saas 版 是免费的,数据都是存在云端的,需要联网才能使用。

私有化部署版 是收费的,数据存在使用者企业内部,不连外网也可以使用。

注意

环境变量/全局变量里的 本地值 仅存放在本地,不会同步到云端,团队成员之间也不会相互同步,适合存放token账号密码之类的敏感数据。

9. 使用 Postman 调用接口返回正常,而 Apifox 返回错误

解决方法:对比 postman 和 apifox 实际发出的请求内容(url、参数、body、header)是否完全一样。

查看实际请求内容方法:

  1. Apifox:返回内容下的实际请求 tab 里查看
  2. Postman:点击底部状态栏里的Console查看

10. 为什么修改了环境变量(或全局变量)值,而引用的地方没有生效?

  1. 请检查环境变量全局变量临时变量里是不是有多个地方定义了相同名称的变量,如果有,系统会根据优先级来取值。优先级顺序如下:临时变量>环境变量>全局变量
  2. 请检查修改的是否是本地值,环境变量(或全局变量)仅读取本地值,而不会读取远程值

11. Web 端与客户端有何区别?

Web 端与客户端在主要流程的使用上没有明显差异,都能够满足团队内的接口协作需求,但以下功能存在差异。

以下截图均为 Web 端截图。

导出接口

Web 端:❌ 客户端:✅

Agent 服务

Web 端:✅ 客户端:❌

本地 Mock 功能

Web 端:❌ 客户端:✅

生成业务代码

Web 端:❌ 客户端:✅

外部程序

Web 端:❌ 客户端:✅

调整字体大小

Web 端:❌ 客户端:✅

网络代理

Web 端:❌ 客户端:✅

12. Web 端与客户端数据不同步如何处理?

若发现客户端中某个项目的接口数据与 Web 端不一致,那么有可能是因为两端数据未同步。你可以尝试以下两种方法解决:

  1. 退出 Apifox 客户端后重新运行。
  2. 进入 Apifox 客户端中的项目后,点击右上角的“刷新”按钮。

3、Apipost

官网:https://www.apipost.cn/
官网文档:https://v7-wiki.apipost.cn/docs/3/

4、api 压测 工具

web 压测 工具

  • JMeter:一个广泛使用的开源压力测试工具,可用于测试Web应用程序的性能,包括APIs。
  • PerformanceRunner:泽众PerformanceRunner(简称PR)是国内专业的支持http、https、websocket、tcp/ip、MQ等各种协议、10万+海量并发、可靠的性能测试工具/压力测试工具,降低了应用系统运行风险。
  • Gatling:这是另一个开源压力测试工具,使用Scala编写,可用于测试Web应用程序和APIs的性能。
  • LoadRunner:这是一种商业压力测试工具,可测试多种协议,包括Web、API等
  • Postman:这是一个流行的API开发工具,它还具有测试和监视API性能的功能。
  • Apache Bench:这是一个简单但功能强大的工具,可用于测试Web应用程序和APIs的性能。
  • Siege:这是另一个免费的压力测试工具,可用于测试Web应用程序和APIs的性能。

Jmeter 基于多线程,Locust 基于协程。 Locust 默认的 HttpSession 客户端性能有点低,做压测还是建议使用 FastHttpLocust 客户端,但是 Locust 官网也提到了,FastHttpLocust 并不能完全替代 HttpSession,这个还得取决于测试场景

发压能力:相同并发下,Locust(使用FastHttpLocust)> Jmeter

并发能力:Locust和Jmeter旗鼓相当,都能满足工作需求,Jmeter 消耗的内存更高

如果只是做简单的接口测试、压力测试,没有需要写代码来扩展的特殊需求,首选 Jmeter;
如果某些测试场景需要写代码来扩展,你会 Java 的话,可以选择Jmeter;
如果某些测试场景需要写代码来扩展,你会 Python 的话,可以选择 Locust;
如果想在单台机器发起更大的压力的话,并且 Python 代码能力不错的话,可以选择 Locust,记得一定要使用 FastHttpLocust 客户端

蝗虫 (LOCUST)

官网:https://www.locust.io/
官网文档:https://docs.locust.io/en/latest/
github:https://github.com/locustio/locust

蝗虫测试本质上只是一个 Python 程序,向要测试的系统发出请求。来进行百万长连接性能测试。Locust 基于 gevent 使用协程机制,避免了系统资源调度,由此可以大幅度提高单机的并发性能。

Locust 是使用 python 开发的,自带一个Web UI,用于定义用户模型,发起测试,实时测试数据,错误统计等。

使用类 linux 平台时请一定要修改最大文件打开数量。 可以使用 ulimit -n 查看当前支持的文件句柄,并用 ulimit -n xxxx 来进行修改。ulimit -n 65535

API

安装 Locust

安装:pip install locust

locust --help 

用法: locust [options] [UserClass ...]

命令选项:

  -h, --help                              帮助
  -f <filename>, --locustfile <filename>  py脚本文件
  --config <filename>                     配置文件
  -H <base url>, --host <base url>        要测试的URL地址
  -u <int>, --users <int>                 Locust并发用户的数量
  -r <float>, --spawn-rate <float>        生成用户的速率(每秒)
  -t <time string>, --run-time <time string>  运行多长时间(300s, 20m, 3h, 1h30m), 默认一直运行
  -l, --list            列出可用的 User classes 并退出。示例: locust -f my_py.py -l

Web UI 选项:

  --web-host <ip>       运行web绑定的网卡地址. 默认所有。
  --web-port <port number>, -P <port number>  运行web的绑定端口                        
  --headless            无头模式,关闭web界面,立即启动测试。使用-u和-t来控制用户数量和运行时间
  --autostart           立刻开始测试 (跟 --headless 很像, 但是不会禁用 web UI)
  --autoquit <seconds>  在运行结束后X秒完全退出蝗虫。只能与 --autostart 一起使用。
                        默认情况下,Locust将一直运行,直到您使用CTRL+C关闭它
  --web-auth <username:password>  基本验证。格式:username:password                        
  --tls-cert <filename>    用于通过HTTPS提供服务的TLS证书的可选路径                        
  --tls-key <filename>     用于通过HTTPS提供服务的TLS私钥的可选路径
  --class-picker           启用web界面中的选择框,选择 "用户类"
  --modern-ui              使用新的基于react的web UI前端

Master options: 主节点选项

worker节点连接到master节点后,master才能进行负载测试
  --master                           启动 master 节点,等待 worker 进行连接
  --master-bind-host <ip>            master节点绑定的网卡ip,默认绑定所有网卡。
  --master-bind-port <port number>   master监听的端口,默认 5557
  --expect-workers <int>         延迟测试,至到指定数量的worker连接成功后才进行测试
  --expect-workers-max-wait <int>   等待时间。

Worker options: 从节点 选项

  --worker                     设置为 worker 节点
  --processes <int>            worker的进程数
  --master-host <hostname>     master节点的ip. 默认 127.0.0.1.
  --master-port <port number>  master节点的端口. 默认 5557.

Tag 选项:

  可以使用@tag装饰器标记蝗虫任务。这些选项允许指定在测试期间包含或排除哪些任务。
  -T [<tag> ...], --tags [<tag> ...]            列出包含的测试
  -E [<tag> ...], --exclude-tags [<tag> ...]    列出排除的测试

Request statistics options:

  --csv <filename>      以CSV格式存储请求统计信息到文件。
  --csv-full-history    
  --print-stats         启用在UI运行中定期打印请求状态
  --only-summary        禁用在 --headless 运行期间定期打印请求统计信息
  --reset-stats         一旦 spawning 完成就重置状态
  --html <filename>     将HTML报告存储到指定的文件路径
  --json                将最终统计数据以JSON格式打印到stdout。

Logging options:

  --skip-log-setup      禁用蝗虫的日志设置。使用由Locust测试或者Python提供的。
  --loglevel <level>, -L <level>  DEBUG/INFO/WARNING/ERROR/CRITICAL. Default is INFO
  --logfile <filename>   log文件路径

其他 options:

  --show-task-ratio         打印用户类的任务执行比率表。
  --show-task-ratio-json    打印User类任务执行率的json数据。
  --version, -V         
  --exit-code-on-error <int>  设置要在测试结果包含任何失败或错误时使用的进程退出代码。默认为1
  -s <number>, --stop-timeout <number>
  --equal-weights       使用均匀分布的任务权重,覆盖locustfile中指定的权重。
  --enable-rebalancing  允许在测试运行期间添加或删除新工作者时自动重新平衡用户。

User classes:

  <UserClass1 UserClass2>  在命令行末尾,您可以列出要使用的用户类(可用的用户类)
                           可以用——list)列出。LOCUST_USER_CLASSES环境变量也可用于
                           指定用户类。默认是使用所有可用的User类

示例:

    locust -f my_test.py -H https://www.example.com
    locust --headless -u 100 -t 20m --processes 4 MyHttpUser AnotherUser

编写 web 服务

首先使用 fastapi 启动一个 web 服务:

import fastapi
import uvicorn
from pathlib import Path
from fastapi import Request

app_main = fastapi.FastAPI()


@app_main.get("/hello")
@app_main.post("/hello")
async def func(request: Request):
    ret_val = {"response": "测试 hello 请求"}
    return ret_val


@app_main.get("/world")
@app_main.post("/world")
async def func(request: Request):
    ret_val = {"response": "测试 world 请求"}
    return ret_val


def http_server():
    uvicorn.run(f'{Path(__file__).stem}:app_main', host="0.0.0.0", port=9000)
    pass


if __name__ == '__main__':
    http_server()
    pass

使用 locust 测试 并发数

my_test.py

from locust import HttpUser, task


class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")

执行命令:locust -f my_test.py --modern-ui  

访问 http://127.0.0.1:8089 进行测试前的配置。

  • 第1个"Number of total users to simulate" 填写的是 总共将运行的用户数;默认1就可以。
  • 第2个 "Hatch rate"每秒加载的用户数;默认1就可以。
  • 第3个 "Host",被测接口的域名或ip端口地址(带http://)

配置完成后,点击 start swarm 开始进行测试,locust 就会不停的向测试服务发送请求,就可以测出每条中并发数了

因为高级参数里面没有配置测试时间, 所以会一直向 /hello 发出 /world HTTP 请求,手动点击 stop 停止发送请求。

Python 直接运行

可以直接执行 Python 代码启动负载测试,而不是使用命令 locust 。

首先 创建一个 Environment 实例:

from locust.env import Environment

env = Environment(user_classes=[MyTestUser])

Environment 的实例方法 create_local_runner、create_master_runner 可以用来启动一个 Runner实例,Runner实例可以用来启动一个负载测试:

from locust import HttpUser, task


class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")


if __name__ == "__main__":
    from locust.env import Environment
    env = Environment(user_classes=[HelloWorldUser], host="http://127.0.0.1:9000")
    env.create_local_runner()
    env.runner.start(5000, spawn_rate=20)
    env.runner.greenlet.join()

也可以绕过调度和分发逻辑,手动控制生成的用户:

new_users = env.runner.spawn_users({MyUserClass.__name__: 2})
new_users[1].my_custom_token = "custom-token-2"
new_users[0].my_custom_token = "custom-token-1"

上面的示例仅适用于独立/本地运行程序模式,并且是一个实验性功能。更常见/更好的方法是使用init 或 test_start 事件钩子来获取/创建令牌列表,并使用on_start和on_stop方法从该列表中读取并将它们设置在您的单个User实例上。

虽然可以通过这种方式 ( 使用 create_worker_runner ) 创建 locust 工作线程,但这几乎没有意义。每个工作线程都需要在单独的Python进程中,直接与工作线程运行程序交互可能会破坏一些东西。只需使用常规 locust --worker ...命令启动工作程序即可。

还可以使用 Environment 实例 create_web_ui 的方法启动一个 Web UI,该 UI 可用于查看统计信息并控制运行器(例如启动和停止负载测试):

from locust import HttpUser, task


class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")


if __name__ == "__main__":
    from locust.env import Environment
    env = Environment(user_classes=[HelloWorldUser], host="http://127.0.0.1:9000")
    env.create_local_runner()
    env.create_web_ui()
    env.web_ui.greenlet.join()
import os
import sys
from locust import HttpUser, task


class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")


if __name__ == "__main__":
    script_file_path = sys.argv[0]
    print(script_file_path)
    os.system(f"locust -f {script_file_path}")

完整示例

#!/usr/bin/env python3
import gevent
from locust import HttpUser, task, events
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging

setup_logging("INFO", None)


class MyUser(HttpUser):
    host = "https://docs.locust.io"

    @task
    def t(self):
        self.client.get("/")


# setup Environment and Runner
env = Environment(user_classes=[MyUser], events=events)
runner = env.create_local_runner()

# start a WebUI instance
web_ui = env.create_web_ui("127.0.0.1", 8089)

# execute init event handlers (only really needed if you have registered any)
env.events.init.fire(environment=env, runner=runner, web_ui=web_ui)

# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))

# start a greenlet that save current stats to history
gevent.spawn(stats_history, env.runner)

# start the test
runner.start(1, spawn_rate=10)

# in 60 seconds stop the runner
gevent.spawn_later(60, lambda: runner.quit())

# wait for the greenlets
runner.greenlet.join()

# stop the web server for good measures
web_ui.stop()

示例:百万长连接性能测试

https://zhuanlan.zhihu.com/p/97577744

# locust_test1.py
from locust import HttpLocust, TaskSet, task, between

class UserBehavior(TaskSet):
    def on_start(self):
        # on_start是在task中任何用户开始时都会调用的部分我们一般来进行初始化
        self.login()

    def on_stop(self):
        # on_stop 在停止时调用,我们可以用来回收资源
        self.logout()

    def login(self):
        self.client.post("/login", {"username":"ellen_key", "password":"education"})

    def logout(self):
        self.client.post("/logout", {"username":"ellen_key", "password":"education"})

    # @task装饰器,更方便我们的使用,所有带@task都会进行调用
    @task(2)
    def index(self):
        # 2/3的概率调用获得首页方法
        self.client.get("/")

    @task(1)
    def profile(self):
        # 1/3概率调用获得用户信息方法
        self.client.get("/profile")

class WebsiteUser(HttpLocust):
    host = "http://test.cn"
    # 我们首先给task_set赋值
    task_set = UserBehavior
    # 设定下次调用等待时间,单位为秒
    wait_time = between(5, 9)

接下来我们开始启动测试

  • 启用 WEB 界面:locust -f locust_test1.py 执行后可以去 WEB 界面 http://127.0.0.1:8089 进行控制,
  • 启用无WEB界面的方案 locust -f locust_test1.py --no-web -c 100 -r 20 -t 20m  模拟100用户,按20来进行递增,请求20分钟。

主从模式启动

locust -f locst_test1.py --master
locust -f locst_test1.py --slave --master-host=192.168.110.19

长连接脚本

简单的安装和QG我们都看过了,现在我们开始实战tcp长连接方式。因内部通信协议保密我们使用之前我开源的一个《超快地球物理坐标计算服务器》来进行演示。首先我们使用docker来启动服务器 docker run --rm -t -p 40000:40000 gcontainer/earth-server earth_server -c

我们首先创建一个Socket连接的基础类,主要负责socket连接的建立、收发消息、关闭

class SocketClient(object):

    def __init__(self):
        # 仅在新建实例的时候创建socket.
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def __getattr__(self, name):
        conn = self._socket
        def wrapper(*args, **kwargs):
            # 根据后面做的业务类,不同的方法做不同的处理
            if name == "connect":
                try:
                    conn.connect(args[0])
                except Exception as e:
                    print(e)
            elif name == "send":
                print(' '.join(hex(ord(i)) for i in args[0]))
                conn.sendall(args[0])
                data = conn.recv(1024)
                print(data)
            elif name == "close":
                conn.close()
        return wrapper

接下来我们创建一个实际的业务处理类UserBehavior集成自TaskSet

class UserBehavior(TaskSet):
    def on_start(self):
        # 该方法每用户启动时调用进行连接打开
        self.client.connect((self.locust.host, self.locust.port))
    def on_stop(self):
        # 该方法当程序结束时每用户进行调用,关闭连接
        self.client.close()

    @task(1)
    def sendAddCmd(self):
        # 处理坐标的增加1%的概率调用 该方法
        lat, log = generate_random_gps()
        dataBody = [
            'add ',
            ranstr(6),
            ' ',
            format(log,'f'),
            ' ',
            format(lat,'f'),
            '\x0d','\x0a']
        start_time = time.time()
        # 接下来做实际的网络调用,并通过request_failure和request_success方法分别统计成功和失败的次数以及所消耗的时间
        try:
            self.client.send("".join(dataBody))
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="earthtest", name="add", response_time=total_time, response_length=0, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="earthtest", name="add", response_time=total_time, response_length=0)
    @task(99)
    def sendGetCmd(self):
        lat, log = generate_random_gps()
        dataBody = [
            'get ',
            format(log,'f'),
            ' ',
            format(lat,'f'),
            ' 5',
            '\x0d','\x0a']
        start_time = time.time()
        try:
            self.client.send("".join(dataBody))
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="earthtest", name="get", response_time=total_time, response_length=0, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="earthtest", name="get", response_time=total_time, response_length=0)

最终实现我们的启动类,一个完整的调用过程结束

class SocketUser(SocketLocust):
    # 目标地址
    host = "127.0.0.1"
    # 目标端口
    port = 40000
    task_set = UserBehavior
    wait_time = between(0.1, 1)

我们模拟200用户启动下试试脚本。locust -f locust_tcptest.py --no-web -c 200 -r 50 -t 10m

参考资料

示例:变成性能测试老司机

https://zhuanlan.zhihu.com/p/143892229

编写 locustfile

locustfile 文件

locust 文件只是一个普通的 Python 模块

一个更完整/更现实的测试示例:

import time
from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
    wait_time = between(1, 5)  # 使模拟用户在执行每个任务(见下文)后等待 1 到 5 秒

    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")

    @task(3)
    def view_items(self):
        for item_id in range(10):
            self.client.get(f"/item?id={item_id}", name="/item")
            time.sleep(1)

    def on_start(self):
        self.client.post("/login", json={"username":"foo", "password":"bar"})
  • 继承 HttpUser ,为每个用户提供一个 client 属性,该属性是 HttpSession 的实例,可用于向我们要加载测试的目标系统发出 HTTP 请求。当测试开始时,Locust 将为其模拟的每个用户创建一个此类的实例,并且每个用户都将开始在他们自己的绿色 gevent 线程中运行。要使文件成为有效的 locustfile,它必须包含至少一个继承自 User 的类。
  • @task 方法是 locust 文件的核心。对于每个正在运行的用户,Locust 都会创建一个 greenlet(微线程),它将调用这些方法。其中一个方法被赋予了更高的权重3,没有权重时任务是随机选择的,分配不同的权重,代表执行的次数更多,这里权重是3,说明调用view_items的次数可能是 hello_world 的 三倍

HttpUser 不是真正的浏览器,因此不会解析 HTML 响应来加载资源或呈现页面。不过,它会跟踪 cookie。

@task(3)
def view_items(self):
    for item_id in range(10):
        self.client.get(f"/item?id={item_id}", name="/item")
        time.sleep(1)

在 view_items 任务中,通过查询参数载入了10个不同的url,为了不让在locust的统计状态中显示,可以使用 "/item" 参数进行分组显示

自动生成 locustfile 文件

对于不习惯编写 locustfile 的初学者特别有用。har2locust 仍处于测试阶段。它可能并不总是生成正确的 locustfile,并且其界面可能会在版本之间更改。

User 类

"User 类" 表示系统的一种用户/方案类型。当进行测试运行时,可以指定要模拟的并发用户数,Locust 将为每个用户创建一个实例。可以将任何你喜欢的属性添加到这些类/实例中,但有一些属性对 Locust 有特殊意义:

wait_time 用来在每次任务执行后引入延迟。就是在任务执行结束后等待多长时间,再继续执行下一个任务。等待时间只适用于任务,而不是请求。如果未指定,则下一个任务将在完成后立即执行。通过查看源码可以看到 wait_time 有4个方法可以进行设置:

  • constant 在固定的时间内。示例:wait_time = constant(3)  任务执行结束后等待3秒在继续下一个任务。
  • between 最小值和最大值之间的随机时间。示例:wait_time = between(3.0, 10.5)  任务执行结束后,会等待3.0到10.5秒在继续下一个任务
    from locust import User, task, between
    
    class MyUser(User):
        @task
        def my_task(self):
            print("executing my_task")
    
        wait_time = between(0.5, 10)
  • constant_throughput 吞吐量。示例:如果希望 Locust 在峰值负载下每秒运行 500 次任务迭代,则可以使用 wait_time = constant_throughput(0.1) 和 5000 的用户计数。吞吐量0.1 * 5000用户 = 500 个并发。示例:如果 wait_time = constant_throughput(2) 并在任务中执行两个请求,则请求速率/RPS 将为每个用户 4。
  • constant_pacing 步长,就是每多少秒运行一次。( 相当一 constant_throughput 的倒数 )

也可以直接在类上声明自己的 wait_time 方法。例如,下面的 User 类将休眠一秒钟,然后是两秒钟,然后是三秒钟,依此类推。

class MyUser(User):
    last_wait_time = 0

    def wait_time(self):
        self.last_wait_time += 1
        return self.last_wait_time

关于 -u 和 -r 参数:

  • 因为"User 类" 表示系统的一种用户/方案类型。当进行测试运行时,可以指定要模拟的并发用户数,Locust 将为每个用户创建一个实例。
  • -u 1 -r 1:表示总共只有一个用户,每秒钟加载1个用户(相当于同时开始1并发请求),加载完成后,立马开始不间断的进行请求。同时开始1并发请求,然后一直往上增长进行请求
  • -u 100 -r 20:表示总共有100个用户,每秒钟加载20个用户(相当于同时开始20并发请求),5秒加载完成所有用户。每个用户加载完成后立马开始不间断的进行请求。同时开始20并发请求,然后一直往上增长进行请求
  • -u 100 -r 100:表示总共有100个用户,每秒钟加载100个用户(相当于同时开始100并发请求),每个用户加载完成后立马开始不间断的进行请求。同时开始100并发请求,然后一直往上增长进行请求

weight 和 fixed_count 属性

如果希望模拟更多特定类型的用户,则可以在这些类上设置权重属性。例如,网络用户的可能性是移动用户的三倍:@task采用可选的权重参数,该参数可用于指定任务的执行比率

class WebUser(User):
    weight = 3
    ...

class MobileUser(User):
    weight = 1
    ...

也可以设置属性 fixed_count 。在这种情况下,权重属性将被忽略,并且将生成确切的计数用户。首先生成这些用户。在下面的示例中,将只生成一个 AdminUser 实例,以便进行一些特定的工作,更准确地控制请求计数,而不受用户总数的影响。

class AdminUser(User):
    wait_time = constant(600)
    fixed_count = 1

    @task
    def restart_app(self):
        ...

class WebUser(User):
    ...

host 属性

host 属性是要测试的主机的 URL 前缀(例如 https://google.com )。它会自动添加到请求中,因此您可以这样做 self.client.get("/") 。可以在 Locust 的 Web UI 中或使用该 --host 选项在命令行上覆盖此值。

@tasks 和  task属性

启动负载测试时,将为每个模拟用户创建一个 User 类的实例,并且这些用户将开始在自己的绿色线程中运行。当这些用户运行时,他们会选择他们执行的任务,休眠一段时间,然后选择一个新任务,依此类推。User 类用 @task 装饰器将任务声明为其下的方法,也可以使用 tasks 属性指定任务

@task 装饰器:为用户添加任务的最简单方法是使用 @task 装饰器。

from locust import User, task, constant

class MyUser(User):
    wait_time = constant(1)

    @task
    def my_task(self):
        print("User instance (%r) executing my_task" % self)

tasks 属性:定义用户任务的另一种方法是设置 tasks 属性。

tasks 属性可以是 Tasks 列表,也可以是 字典,其中 Task 是 python 可调用对象或 TaskSet 类。如果任务是普通的 python 函数,则它们会收到一个参数,即执行任务的 User 实例。

from locust import User, constant

def my_task(user):
    pass

class MyUser(User):
    tasks = [my_task]
    wait_time = constant(1)

如果将 tasks 属性指定为列表,则每次执行任务时,都会从 tasks 属性中随机选择该任务。但是,如果 tasks 是一个字典 - 将可调用对象作为键,int 作为值 - 将随机选择要执行的任务,但以 int 作为比率。因此,对于如下所示的任务:{my_task: 3, another_task: 1}  表示 my_task被执行可能性是another_task的 3 倍。在内部,上面的字典实际上将扩展为一个列表(并且属性 tasks 已更新),如下所示:[my_task, my_task, my_task, another_task],然后使用 Python random.choice() 从列表中选择任务。

@tag装饰器

使用 @tag 装饰器标记任务,然后通过 --tags 和 --exclude-tags 参数来选择在测试期间执行的任务。示例:

from locust import User, constant, task, tag

class MyUser(User):
    wait_time = constant(1)

    @tag('tag1')
    @task
    def task1(self):
        pass

    @tag('tag1', 'tag2')
    @task
    def task2(self):
        pass

    @tag('tag3')
    @task
    def task3(self):
        pass

    @task
    def task4(self):
        pass

使用 --tags tag1 启动此测试,则在测试期间将仅执行 task1 和 task2。如果以 --tags tag2 tag3 启动它,则只会执行 task2 和 task3。

event 事件

如果你想在测试中运行一些设置代码,通常把它放在 locustfile 的模块级别就足够了,但有时你需要在运行中的特定时间做一些事情。为了满足这一需求,Locust 提供了事件钩子。

from locust import events

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("A new test is starting")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("A new test is ending")

init 初始化

该 init 事件在每个 locust 进程开始时触发。这在分布式模式下特别有用,在分布式模式下,每个工作进程(而不是每个用户)都需要机会进行一些初始化。例如,假设您有一些全局状态,从此过程中生成的所有用户都需要该状态:

from locust import events
from locust.runners import MasterRunner

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    if isinstance(environment.runner, MasterRunner):
        print("I'm on master node")
    else:
        print("I'm on a worker or standalone node")

其他事件

参阅 extending locust using event hooks 以获取其他事件,以及如何使用它们的更多示例。

on_start 和 on_stop 方法

HttpUser 类

HttpUser 是最常用 User 的。它添加了一个 client 用于发出 HTTP 请求的属性。

from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(5, 15)

    @task(4)
    def index(self):
        self.client.get("/")

    @task(1)
    def about(self):
        self.client.get("/about/")

client 属性 / HttpSession

client 是 HttpSession 的实例。HttpSession 是 requests.Session 的子类/包装器。就像 requests.Session 一样,它会在请求之间保留 cookie,因此可以轻松用于登录网站。

response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")

验证 response

如果 HTTP 响应代码正常 (<400),则认为请求成功,但对响应进行一些额外的验证通常很有用。可以使用 catch_response 参数、with 语句和对 response.failure() 的调用将请求标记为失败

with self.client.get("/", catch_response=True) as response:
    if response.text != "Success":
        response.failure("Got wrong response")
    elif response.elapsed.total_seconds() > 0.5:
        response.failure("Request took too long")

您还可以将请求标记为成功,即使响应代码错误:

with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        response.success()

您甚至可以通过抛出异常,然后在 with-block 之外捕获它来完全避免记录请求。或者你可以抛出一个 locust 异常让 locust 捕捉到它。

from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        raise RescheduleTask()

对请求进行分组

网站的 URL 包含某种动态参数的页面很常见。通常,在用户的统计信息中将这些 URL 组合在一起是有意义的。这可以通过将 name 参数传递给 HttpSession's 不同的请求方法来完成。

for i in range(10):
    self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")

在某些情况下,可能无法将参数传递到请求函数中,例如在与包装请求会话的库/SDK 交互时。通过设置 client.request_name 属性,提供了对请求进行分组的另一种方法。

self.client.request_name="/blog?id=[id]"
for i in range(10):
    self.client.get("/blog?id=%i" % i)
self.client.request_name=None

如果要使用最少的样板链接多个分组,则可以使用 client.rename_request() 上下文管理器。

@task
def multiple_groupings_example(self):
    # Statistics for these requests will be grouped under: /blog/?id=[id]
    with self.client.rename_request("/blog?id=[id]"):
        for i in range(10):
            self.client.get("/blog?id=%i" % i)

    # Statistics for these requests will be grouped under: /article/?id=[id]
    with self.client.rename_request("/article?id=[id]"):
        for i in range(10):
            self.client.get("/article?id=%i" % i)

使用 catch_response 并直接访问request_meta,您甚至可以根据响应中的某些内容重命名请求。

with self.client.get("/", catch_response=True) as resp:
    resp.request_meta["name"] = resp.json()["name"]

HTTP 代理设置

连接池

由于每个 HttpUser 都会创建新的 HttpSession ,所以每个用户实例都有自己的连接池。这类似于真实用户与 Web 服务器的交互方式。

但是,如果要在所有用户之间共享连接,则可以使用单个池管理器。为此,请将 class 属性设置为 pool_manager 的 urllib3.PoolManager 实例。

from locust import HttpUser
from urllib3 import PoolManager

class MyUser(HttpUser):
    # All users will be limited to 10 concurrent connections at most.
    pool_manager = PoolManager(maxsize=10, block=True)

TaskSets 任务集

TaskSets 是一种对分层网站/系统进行结构化测试的方法。You can read more about it here.

Examples 示例

这里有很多 locustfile 示例: here

分布式 负载

因为使用的是 gevent 协程,所以运行 Locust  单个进程就可以模拟相当高的吞吐量。对于一个简单的测试计划,它每秒发出数百个请求,如果使用 FastHttpUser,则为数千个请求。

但是如果测试计划很复杂,或者想要运行更多负载,则需要横向扩展到多个进程,甚至可能是多台计算机。由于 Python 无法充分利用每个进程的多个内核(参见 GIL),因此可能需要为每个处理器内核运行一个工作器实例才能访问所有计算能力。为此,

  • 如果在同一台主机上,可以使用 --master 参数在主模式下启动一个 Locust 实例,并使用 --worker 参数启动多个工作实例。
  • 如果工作线程与主服务器不在同一台计算机上,则用于 --master-host 将他们指向运行主服务器的计算机的 IP/主机名。

为了简化操作,

  • 可以使用 --processes 来开启多个实例,默认是开启一个 master 进程和指定数的 worker 进程
  • 如果 --processes 和 --worker 同时使用,只会启动指定数的 workers,不会开启 master 进程
  • master 运行 Locust 的 Web 界面,并告诉 worker 何时生成/停止用户。master 本身不运行任何用户。
  • worker 运行您的用户并将统计信息发送回主服务器。每个工作线程可以运行的用户数几乎没有限制。Locust/gevent 每个进程可以运行数千甚至数万个用户,只要它们的总请求速率 (RPS) 不太高。如果 Locust 即将耗尽 CPU 资源,它将记录警告。如果没有警告,您可以非常确定您的测试不受负载生成器 CPU 的限制。

示例 1:单台机器

启动一个主进程和 4 个工作进程非常简单:locust --processes 4

也可以自动检测机器中的内核数量,并为每个内核启动一个工作线程:locust --processes -1

示例 2:多台机器

在一台机器上以主模式启动蝗虫:locust -f my_locustfile.py --master

然后在每台工作机器上:locust -f my_locustfile.py --worker --master-host <your master's address> --processes 4

请注意,主节点和工作节点都需要访问 locustfile,它不会自动从 master 发送到 worker。但是你可以使用 locust-swarm 来自动化它。

跨节点通信

在分布式模式下运行 Locust 时,您可能希望在主节点和工作节点之间进行通信以协调数据。这可以通过使用内置消息挂钩的自定义消息轻松完成:

from locust import events
from locust.runners import MasterRunner, WorkerRunner


# 当worker接收到类型为 test_users 的消息时触发
def setup_test_users(environment, msg, **kwargs):
    for user in msg.data:
        print(f"User {user['name']} received")
    environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")


# 当 master 接收到一个类型为 acknowledge_users 的消息时触发。
def on_acknowledge(msg, **kwargs):
    print(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('test_users', setup_test_users)
    if not isinstance(environment.runner, WorkerRunner):
        environment.runner.register_message('acknowledge_users', on_acknowledge)


@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        users = [
            {"name": "User1"},
            {"name": "User2"},
            {"name": "User3"},
        ]
        environment.runner.send_message('test_users', users)

FastHttpUser

使用更快的 HTTP 客户端提高性能: https://docs.locust.io/en/latest/increase-performance.html#increase-performance.

Locust 的默认 HTTP 客户端使用 python-requests。

from locust import task, FastHttpUser

class MyUser(FastHttpUser):
    @task
    def index(self):
        response = self.client.get("/")

Locust 还附带使用 geventhttpclient 实现的 FastHttpUser 以非常高的吞吐量运行测试。它提供了一个非常相似的 API,并且使用的 CPU 时间要少得多,有时在给定硬件上每秒的最大请求数会增加 5 倍到 6 倍。假设单个 Locust 进程(仅限于一个 CPU 内核)使用 FastHttpUser 每秒可以执行大约 16000 个请求,使用 HttpUser 每秒可以执行 4000 个请求

只要负载生成器 CPU 没有过载,FastHttpUser 的响应时间应该与 HttpUser 的响应时间几乎相同。它不会更快地提出单个请求。

单个 FastHttpUser/geventhttpclient 会话可以并发执行请求,只需为每个请求启动 greenlets:

@task
def t(self):
    def concurrent_request(url):
        self.client.get(url)

    pool = gevent.pool.Pool()
    urls = ["/url1", "/url2", "/url3"]
    for url in urls:
        pool.spawn(concurrent_request, url)
    pool.join()

在调试器中运行测试

在调试器中运行 Locust 在开发测试时非常有用。除此之外,您可以检查特定的响应或检查某些用户实例变量。

但是调试器有时会遇到像 Locust 这样的复杂 gevent 应用程序的问题,而且框架本身发生了很多事情,您可能不感兴趣。为了简化这一点,Locust 提供了一种称为 run_single_user :

from locust import HttpUser, task, run_single_user


class QuickstartUser(HttpUser):
    host = "http://localhost"

    @task
    def hello_world(self):
        with self.client.get("/hello", catch_response=True) as resp:
            pass  # maybe set a breakpoint here to analyze the resp object?


# if launched directly, e.g. "python3 debugging.py", not "locust -f debugging.py"
if __name__ == "__main__":
    run_single_user(QuickstartUser)

它隐式地为请求事件注册一个事件处理程序,以打印有关每个请求的一些统计信息:

可以通过将参数指定为 run_single_user 来准确配置打印的内容。

打印 HTTP 通信

对于 HttpUser ( python-requests):

# put this at the top of your locustfile (or just before the request you want to trace)
import logging
from http.client import HTTPConnection

HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

对于 FastHttpUser ( geventhttpclient):

import sys
...

class MyUser(FastHttpUser):
    @task
    def t(self):
        self.client.get("http://example.com/", debug_stream=sys.stderr)

无头 模式

可以在没有 Web UI 的情况下运行 locust 通过将 --headless 与 -u/--users 和 -r/--spawn-rate 一起使用:locust -f locust_files/my_locust_file.py --headless -u 100 -r 5

即使在无头模式下,您也可以在测试运行时更改用户计数。按下 w 可添加 1 个用户或 W 添加 10 个用户。按下 s 可移除 1 或 S 移除 10。

若要指定测试的运行时间,请使用 -t/--run-time :

locust --headless -u 100 --run-time 1h30m
$ locust --headless -u 100 --run-time 60 # default unit is seconds

默认情况下,Locust 会立即停止您的任务(甚至无需等待请求完成)。要给正在运行的任务一些时间来完成迭代,请使用 -s/--stop-timeout :locust --headless --run-time 1h30m --stop-timeout 10s

如果要在没有 Web UI 的情况下运行 Locust 分布式,则应在启动主节点时指定 --expect-workers 选项,以指定预期连接的工作节点数。然后,它将等到许多工作节点连接后再开始测试。

Event hooks

Locust 带有许多事件钩子,可用于以不同的方式扩展 Locust。

例如,下面介绍如何设置在请求完成后触发的事件侦听器:

from locust import events

@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, response,
                       context, exception, start_time, url, **kwargs):
    if exception:
        print(f"Request to {name} failed with exception {exception}")
    else:
        print(f"Successfully made a request to: {name}")
        print(f"The response was {response.text}")

在分布式模式下运行 locust 时,在运行测试之前在工作节点上进行一些设置可能很有用。您可以通过检查节点的类型来检查以确保您没有在主节点上运行 runner :

from locust import events
from locust.runners import MasterRunner

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    if not isinstance(environment.runner, MasterRunner):
        print("Beginning test setup")
    else:
        print("Started test from Master node")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    if not isinstance(environment.runner, MasterRunner):
        print("Cleaning up test data")
    else:
        print("Stopped test from Master node")

若要查看可用事件的完整列表,请参阅事件挂钩。Event hooks.

请求上下文

有一个 request event context 参数,使您能够传递有关请求的数据(例如用户名、标签等)。它可以直接在对请求方法的调用中设置,也可以在用户级别通过重写 User.context() 方法进行设置。

class MyUser(HttpUser):
    @task
    def t(self):
        self.client.post("/login", json={"username": "foo"})
        self.client.get("/other_request", context={"username": "foo"})

    @events.request.add_listener
    def on_request(context, **kwargs):
        if context:
            print(context["username"])

来自用户实例的上下文:

class MyUser(HttpUser):
    def context(self):
        return {"username": self.username}

    @task
    def t(self):
        self.username = "foo"
        self.client.post("/login", json={"username": self.username})

    @events.request.add_listener
    def on_request(context, **kwargs):
        print(context["username"])

响应中值的上下文,使用catch_response:

with self.client.get("/", catch_response=True) as resp:
    resp.request_meta["context"]["requestId"] = resp.json()["requestId"]

添加 Web 路由

Locust 使用 Flask 来提供 Web UI,因此很容易将 Web 端点添加到 Web UI。通过侦听事件 init ,我们可以检索对 Flask 应用实例的引用,并使用它来设置新路由:

from locust import events

@events.init.add_listener
def on_locust_init(environment, **kw):
    @environment.web_ui.app.route("/added_page")
    def my_added_page():
        return "Another page"

您现在应该能够启动 Locust 并浏览到 http://127.0.0.1:8089/added_page

扩展 Web UI

作为添加简单 Web 路由的替代方法,您可以使用 Flask 蓝图和模板不仅可以添加路由,还可以扩展 Web UI,以便与内置的 Locust 统计信息一起显示自定义数据。这更高级,因为它还涉及编写和包含路由提供的 HTML 和 Javascript 文件,但可以大大增强 Web UI 的实用性和可定制性。

扩展 Web UI 的工作示例,包括 HTML 和 Javascript 示例文件,可以在 Locust 源代码的 examples 目录中找到。

运行后台 greenlet 

因为蝗虫文件“只是代码”,所以没有什么能阻止你生成自己的 greenlet 与你的实际负载/用户并行运行。

import gevent
from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, MasterRunner, LocalRunner

def checker(environment):
    while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > 0.2:
            print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
            environment.runner.quit()
            return


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    # dont run this on workers, we only care about the aggregated numbers
    if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
        gevent.spawn(checker, environment)

Logging 日志 

Locust 使用 Python 内置的日志记录框架来处理日志记录。

使用 locust 进行爬虫

既然可以发起请求测试,那么肯定可以用来爬虫爬数据。

https://docs.locust.io/en/latest/writing-a-locustfile.html#user-class

client 属性 / HttpSession

client 是 HttpSession 的实例。HttpSession 是 requests.Session 的子类/包装器。就像 requests.Session 一样,它会在请求之间保留 cookie,因此可以轻松用于登录网站。

response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")

验证 response

如果 HTTP 响应代码正常 (<400),则认为请求成功,但对响应进行一些额外的验证通常很有用。可以使用 catch_response 参数、with 语句和对 response.failure() 的调用将请求标记为失败

with self.client.get("/", catch_response=True) as response:
    if response.text != "Success":
        response.failure("Got wrong response")
    elif response.elapsed.total_seconds() > 0.5:
        response.failure("Request took too long")

您还可以将请求标记为成功,即使响应代码错误:

with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        response.success()

您甚至可以通过抛出异常,然后在 with-block 之外捕获它来完全避免记录请求。或者你可以抛出一个 locust 异常让 locust 捕捉到它。

from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        raise RescheduleTask()

对请求进行分组

网站的 URL 包含某种动态参数的页面很常见。通常,在用户的统计信息中将这些 URL 组合在一起是有意义的。这可以通过将 name 参数传递给 HttpSession's 不同的请求方法来完成。

for i in range(10):
    self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")

在某些情况下,可能无法将参数传递到请求函数中,例如在与包装请求会话的库/SDK 交互时。通过设置 client.request_name 属性,提供了对请求进行分组的另一种方法。

self.client.request_name="/blog?id=[id]"
for i in range(10):
    self.client.get("/blog?id=%i" % i)
self.client.request_name=None

如果要使用最少的样板链接多个分组,则可以使用 client.rename_request() 上下文管理器。

@task
def multiple_groupings_example(self):
    # Statistics for these requests will be grouped under: /blog/?id=[id]
    with self.client.rename_request("/blog?id=[id]"):
        for i in range(10):
            self.client.get("/blog?id=%i" % i)

    # Statistics for these requests will be grouped under: /article/?id=[id]
    with self.client.rename_request("/article?id=[id]"):
        for i in range(10):
            self.client.get("/article?id=%i" % i)

使用 catch_response 并直接访问request_meta,您甚至可以根据响应中的某些内容重命名请求。

with self.client.get("/", catch_response=True) as resp:
    resp.request_meta["name"] = resp.json()["name"]

HTTP 代理设置

连接池

由于每个 HttpUser 都会创建新的 HttpSession ,所以每个用户实例都有自己的连接池。这类似于真实用户与 Web 服务器的交互方式。

但是,如果要在所有用户之间共享连接,则可以使用单个池管理器。为此,请将 class 属性设置为 pool_manager 的 urllib3.PoolManager 实例。

from locust import HttpUser
from urllib3 import PoolManager

class MyUser(HttpUser):
    # All users will be limited to 10 concurrent connections at most.
    pool_manager = PoolManager(maxsize=10, block=True)

限制 并发数

注意:locust 如果未指定 wait_time 则一个任务完成后立即执行下一个任务。所以一定要限制并发数,防止对网站造成 DDOS 共计。可以在每个任务之间设置等待时间,来限制并发数。

User 类的 wait_time 的方法可以很容易地在每次任务执行后引入延迟。

class MyUser(User):
    # wait between 3.0 and 10.5 seconds after each task
    wait_time = between(3.0, 10.5)

  • constant 在固定的时间内
from locust import User, task, constant

class MyUser(User):
    wait_time = constant(1)

    @task
    def my_task(self):
        print("User instance (%r) executing my_task" % self)
  • between 最小值和最大值之间的随机时间
from locust import User, task, between

class MyUser(User):
    @task
    def my_task(self):
        print("executing my_task")

    wait_time = between(0.5, 10)

例如,如果希望 Locust 在峰值负载下每秒运行 500 次任务迭代,则可以使用 wait_time = constant_throughput(0.1) 和 5000 的用户计数。

等待时间只会限制吞吐量,而不能启动新用户来达到目标。因此,在我们的示例中,如果任务迭代时间超过 10 秒,则吞吐量将小于 500。

分布式 爬虫

可以使用redis 作为任务队列,每个 work 从redis 获取任务

示例 代码

test.py

from gevent import monkey; monkey.patch_all()
import gevent
import redis
from locust import task, FastHttpUser, constant
from locust.exception import RescheduleTask
from bs4 import BeautifulSoup


rk = "task_queue"
redis_conn = redis.StrictRedis(
    host='127.0.0.1', port=6379, db=0,
    # password='xxx', 
    decode_responses=True
)


class MyUser(FastHttpUser):
    # 用来限制并发, 每0.5秒发送一个请求
    wait_time = constant(0.5)

    @staticmethod
    def get_redis_task():
        task_byte = redis_conn.spop(rk)
        if not task_byte:
            return None
        task_string = task_byte.decode('utf8')
        return task_string

    @task
    def crawl(self):
        task_string = self.get_redis_task()
        if not task_string:
            raise RescheduleTask()
            return None
        req_url = task_string
        with self.client.get(req_url, catch_response=True) as response:
            if response.status_code != 200:
                response.failure("请求失败")
            else:
                response.encoding = "utf-8"
                resp_text = response.text
                soup = BeautifulSoup(resp_text, "html.parser", from_encoding='utf-8')
                img_list = soup.find_all('img')
                list(map(lambda x=None: print(x["src"]), img_list))


def add_redis_task():
    # 直接搜索 meinv ,随便找一个网站
    for page_num in range(1, 100):
        # https://www.meinvtu1234.cc/a/list_1_3.html
        url = f"https://www.meinvtu1234.cc/a/list_1_{page_num}.html"
        redis_conn.sadd(rk, url)
    pass


if __name__ == '__main__':
    gevent.spawn(add_redis_task).join()

首先添加任务:python test.py

然后 locust 执行:locust -f test.py

浏览器打开 url

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐