异步编程与网络基础
第4步:异步编程与网络基础
4.1 同步与异步编程
概念:什么是同步与异步?
- 同步(Synchronous):代码按顺序执行,每个操作必须等待前一个操作完成后才能开始。简单直观,但效率较低。
- 异步(Asynchronous):操作发起后不等待结果返回,继续执行后续代码,通过回调、事件或协程机制处理结果。效率高,但编程模型复杂。
同步如同排队点餐,必须等前一个人完成才能点;异步如同取号点餐,号到了再处理。
同步 vs 异步代码对比
# 同步代码
import time
def sync_task():
print("开始任务1")
time.sleep(2) # 等待2秒
print("任务1完成")
def sync_main():
sync_task()
sync_task() # 必须等第一个完成
print("全部完成") # 4秒后
# 异步代码
import asyncio
async def async_task():
print("开始任务1")
await asyncio.sleep(2) # 模拟异步IO
print("任务1完成")
async def async_main():
await asyncio.gather(
async_task(),
async_task()
)
print("全部完成") # 约2秒后(并行执行)
# asyncio.run(async_main())
阻塞 vs 非阻塞
- 阻塞(Blocking):调用函数时,当前线程被挂起,等待操作完成
- 非阻塞(Non-blocking):调用函数时立即返回,不等待操作完成
并发 vs 并行
- 并发(Concurrency):多个任务交替执行,通过快速切换让人感觉同时执行
- 并行(Parallelism):多个任务真正同时执行,需要多核 CPU
# 并发:单线程中协程交替执行(asyncio)
async def task1():
await asyncio.sleep(1)
async def task2():
await asyncio.sleep(1)
# asyncio.gather(task1(), task2()) # 单线程并发
# 并行:多核 CPU 同时执行(多进程)
import concurrent.futures
def cpu_task(n):
return sum(i * i for i in range(n))
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, [10**6, 10**6]))
4.2 异步编程 asyncio
概念:asyncio 的作用
asyncio 是 Python 标准库中用于编写异步代码的框架,基于协程(Coroutine)和事件循环(Event Loop)。它允许单个线程同时处理多个 I/O 密集型任务,如网络请求、文件读写等。
事件循环
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# 推荐方式(Python 3.7+):自动管理事件循环
asyncio.run(main())
# 旧式手动管理(不推荐,仅了解)
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(main())
# loop.close()
async/await 语法
import asyncio
# async def 定义协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # await 等待另一个协程
print("World")
# 调用协程不会立即执行
coro = say_hello()
# 必须用事件循环运行
asyncio.run(coro)
# await 只能在 async 函数中使用
async def main():
await say_hello() # 等待协程完成
asyncio.run(main())
协程函数 vs 普通函数
def regular_func():
return "普通函数"
async def async_func():
return "协程函数"
# 调用方式不同
print(regular_func()) # "普通函数"(直接返回结果)
print(async_func()) # <coroutine object async_func at ...>(返回协程对象)
result = asyncio.run(async_func()) # 需要事件循环运行
print(result) # "协程函数"
awaitable 对象
以下类型都可以被 await:
- 协程(coroutine)
- Task
- Future
- 支持 await 协议的对象
import asyncio
async def task():
return "result"
# 协程
asyncio.run(task())
# Task:用于并发调度协程
async def main():
task1 = asyncio.create_task(task())
task2 = asyncio.create_task(task())
results = await asyncio.gather(task1, task2)
print(results) # ['result', 'result']
asyncio.run(main())
4.3 并发任务管理
概念:并发任务管理
在异步编程中,并发任务管理是指如何同时运行多个协程、协调它们的执行顺序、处理它们的结果。asyncio 提供了多种工具来创建、管理和等待协程任务,如 gather、create_task、wait 等。
gather - 并发执行多个协程
概念:gather 用于并发执行多个协程,并收集它们的结果。它会等待所有协程完成,返回结果列表。如果任一协程抛出异常,gather 默认会立即将第一个异常传播给调用者(其他协程仍会继续运行)。使用 return_exceptions=True 可将异常作为结果返回而非抛出。
import asyncio
async def fetch_data(id):
print(f"任务 {id} 开始")
await asyncio.sleep(1)
print(f"任务 {id} 完成")
return f"数据 {id}"
async def main():
# 并发执行,返回结果列表
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(results) # ['数据 1', '数据 2', '数据 3']
asyncio.run(main())
# 总耗时约1秒(并行),而不是3秒(串行)
create_task - 创建任务
概念:create_task 用于将协程包装为 Task 对象并立即调度执行。相比直接 await 协程,create_task 不会阻塞,可以同时创建多个任务并发运行。
import asyncio
async def task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return name
async def main():
# 创建任务(立即调度)
t1 = asyncio.create_task(task("A", 2))
t2 = asyncio.create_task(task("B", 1))
print("任务已创建")
# 等待所有任务
results = await asyncio.gather(t1, t2)
print(results)
asyncio.run(main())
wait - 等待任务完成
概念:wait 用于等待一组任务完成,返回两个集合:已完成的任务和待定(未完成)的任务。它支持 timeout 参数,可以设置最大等待时间。
import asyncio
async def task(name):
await asyncio.sleep(1)
return name
async def main():
# 创建多个任务
tasks = [asyncio.create_task(task(i)) for i in range(5)]
# 等待完成(可设置超时)
done, pending = await asyncio.wait(tasks, timeout=2)
print(f"完成: {len(done)}, 待定: {len(pending)}")
for t in done:
print(await t)
asyncio.run(main())
as_completed - 按完成顺序处理
概念:as_completed 返回一个迭代器,按任务实际完成的顺序 yield Future 对象。与 gather 不同,gather 是按创建顺序返回结果,as_completed 是按完成顺序。
import asyncio
async def task(id):
await asyncio.sleep(id)
return id
async def main():
tasks = [task(3), task(1), task(2)]
# 按完成顺序返回
for future in asyncio.as_completed(tasks):
result = await future
print(f"完成: {result}")
# 输出顺序: 1, 2, 3(按实际完成时间)
gather 的并发控制
概念:asyncio.gather 本身没有限制并发数量的参数,会同时调度所有传入的协程。如果需要限制并发数量,可以使用 asyncio.Semaphore(信号量)来控制同时执行的任务数量。
import asyncio
async def fetch_data(id, semaphore):
async with semaphore:
# 只有获得信号量的任务才能执行
await asyncio.sleep(1)
return f"数据 {id}"
async def main():
# 限制最多 5 个并发
semaphore = asyncio.Semaphore(5)
# 100 个任务,但同时最多只有 5 个在执行
tasks = [fetch_data(i, semaphore) for i in range(100)]
results = await asyncio.gather(*tasks)
asyncio.run(main())
# 另一种方式:分批执行
async def fetch_data(id):
await asyncio.sleep(1)
return f"数据 {id}"
async def main():
all_tasks = [fetch_data(i) for i in range(100)]
batch_size = 5
# 每批 5 个,逐批执行
for i in range(0, len(all_tasks), batch_size):
batch = all_tasks[i:i + batch_size]
await asyncio.gather(*batch)
4.4 异步上下文管理器与迭代器
概念:异步上下文管理器与迭代器
异步上下文管理器用于异步资源的管理,类似同步的 with 语句,但支持 await 操作。异步迭代器则允许在 for 循环中使用 await,适合遍历异步生成的数据流。它们是 asyncio 编程中处理资源清理和数据流的重要工具。
异步上下文管理器
概念:异步上下文管理器通过 async with 语句使用,实现 __aenter__ 和 __aexit__ 方法来管理异步资源,如数据库连接、网络连接等。
import asyncio
# async with 用于异步资源管理
class AsyncResource:
async def __aenter__(self):
print("获取资源")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(0.1)
async def main():
async with AsyncResource() as resource:
print("使用资源")
await asyncio.sleep(1)
asyncio.run(main())
contextlib 异步支持
概念:contextlib 的 @asynccontextmanager 装饰器可以将一个异步生成器函数转换为异步上下文管理器,使资源管理代码更简洁。
import asyncio
import contextlib
@contextlib.asynccontextmanager
async def managed_resource():
print("获取")
yield "resource"
print("释放")
async def main():
async with managed_resource() as r:
print(f"使用 {r}")
asyncio.run(main())
异步迭代器
概念:异步迭代器实现了 __aiter__ 和 __anext__ 方法,可以在迭代过程中使用 await。适用于需要异步获取数据的场景。
import asyncio
# 异步迭代器:实现 __aiter__ 和 __anext__
class AsyncCounter:
def __init__(self, max):
self.max = max
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current < self.max:
result = self.current
self.current += 1
return result
raise StopAsyncIteration
async def main():
async for i in AsyncCounter(5):
print(i)
asyncio.run(main())
异步生成器
import asyncio
# 异步生成器:使用 yield 的 async 函数
async def async_gen(n):
for i in range(n):
await asyncio.sleep(0.1)
yield i
async def main():
async for i in async_gen(5):
print(i)
asyncio.run(main())
# 列表推导式
result = [i async for i in async_gen(5)]
print(result) # [0, 1, 2, 3, 4]
4.5 异步队列与信号量
概念:异步队列与信号量
异步队列(Queue)用于协程之间的数据传递,类似生产者-消费者模式。信号量(Semaphore)用于控制同时访问资源的协程数量,避免资源竞争。事件(Event)和条件(Condition)则用于协程之间的同步和通信。
asyncio.Queue - 异步队列
概念:asyncio.Queue 是协程安全的队列实现,用于在协程之间传递数据。支持 FIFO 队列、优先级队列和 LIFO 队列(堆栈)。
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(i)
print(f"生产: {i}")
await asyncio.sleep(0.5)
async def consumer(queue):
while True:
item = await queue.get()
print(f"消费: {item}")
await asyncio.sleep(1)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 并发运行生产者和消费者
await asyncio.gather(
producer(queue),
consumer(queue)
)
asyncio.run(main())
队列方法
概念:asyncio.Queue 提供了 put、get、task_done、join 等方法。put/get 是异步的,会自动处理阻塞;join 阻塞直到队列被清空。
import asyncio
async def main():
queue = asyncio.Queue(maxsize=2)
# put - 添加元素(满时阻塞)
await queue.put(1)
await queue.put(2)
print(queue.full()) # True
# get - 获取元素(空时阻塞)
item = await queue.get()
print(item) # 1
# 工具方法
print(queue.qsize()) # 队列大小
print(queue.empty()) # 是否为空
print(queue.full()) # 是否已满
# task_done - 标记任务完成(必须在 get 之后、join 之前调用)
queue.task_done()
# join - 等待队列清空(所有 get 的元素都被 task_done 标记)
await queue.join()
asyncio.Semaphore - 信号量
概念:Semaphore 是一种计数器,用于控制同时访问某个资源的协程数量。常用于限制并发连接数、数据库连接池大小等。
import asyncio
# 信号量:控制并发数量
semaphore = asyncio.Semaphore(2)
async def task(id):
async with semaphore:
print(f"任务 {id} 开始")
await asyncio.sleep(1)
print(f"任务 {id} 完成")
async def main():
tasks = [task(i) for i in range(5)]
await asyncio.gather(*tasks)
# 只有2个任务同时执行
asyncio.run(main())
asyncio.Event - 事件
概念:Event 用于协程之间的通知机制。一个协程可以等待事件被设置(set),另一个协程可以触发事件(set)。事件被设置后,所有等待的协程都会恢复执行。
import asyncio
async def waiter(event):
print("等待事件...")
await event.wait() # 阻塞直到事件被设置
print("事件已触发!")
async def setter(event):
await asyncio.sleep(2)
event.set() # 触发事件
async def main():
event = asyncio.Event() # 在协程内创建,避免跨事件循环问题
await asyncio.gather(waiter(event), setter(event))
asyncio.run(main())
asyncio.Condition - 条件
概念:Condition 是 Event 的扩展,允许协程等待某个条件变为真。与 Event 不同,Condition 需要配合 async with 使用,并且支持 notify 唤醒指定数量的等待协程。
import asyncio
async def consumer(condition):
async with condition:
await condition.wait() # 等待条件
print("消费者: 收到通知")
async def producer(condition):
await asyncio.sleep(1)
async with condition:
condition.notify() # 通知等待者
async def main():
condition = asyncio.Condition() # 在协程内创建
await asyncio.gather(consumer(condition), producer(condition))
asyncio.run(main())
4.6 HTTP 协议基础
概念:HTTP 是什么?
HTTP(HyperText Transfer Protocol)是万维网的数据传输协议,基于请求-响应模式。客户端发送请求,服务器返回响应。HTTP/1.1 是目前最广泛使用的版本,HTTP/2 和 HTTP/3 正在逐步普及。
HTTP 请求结构
请求方法 URL路径 HTTP版本
Header1: value1
Header2: value2
请求体(POST/PUT)
GET /index.html HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0
Accept: text/html
(GET 没有请求体)
HTTP 响应结构
HTTP版本 状态码 状态描述
Header1: value1
Header2: value2
响应体
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 1234
<html>...</html>
请求方法
| 方法 | 用途 | 幂等 | 安全性 |
|---|---|---|---|
| GET | 获取资源 | ✓ | ✓ |
| POST | 创建资源 | ✗ | ✗ |
| PUT | 更新资源(整体) | ✓ | ✗ |
| PATCH | 部分更新 | ✗ | ✗ |
| DELETE | 删除资源 | ✓ | ✗ |
| HEAD | 获取头部 | ✓ | ✓ |
| OPTIONS | 获取支持的 methods | ✓ | ✓ |
状态码
| 类别 | 范围 | 含义 |
|---|---|---|
| 1xx | 100-199 | 信息性响应 |
| 2xx | 200-299 | 成功(200 OK, 201 Created, 204 No Content) |
| 3xx | 300-399 | 重定向(301 Moved, 304 Not Modified) |
| 4xx | 400-499 | 客户端错误(400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found) |
| 5xx | 500-599 | 服务器错误(500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable) |
请求头与响应头
# 常用请求头
GET /api/users HTTP/1.1
Host: api.example.com
Authorization: Bearer token123
Content-Type: application/json
Accept: application/json
User-Agent: MyApp/1.0
Cookie: session_id=abc123
# 常用响应头
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 256
Cache-Control: max-age=3600
Set-Cookie: session=xyz; HttpOnly
Access-Control-Allow-Origin: *
4.7 HTTP 请求与响应
概念:HTTP 请求与响应
HTTP 请求是客户端向服务器发送的数据包,包含请求方法、URL、头部和可选的请求体。HTTP 响应是服务器返回给客户端的数据包,包含状态码、头部和响应体。Python 提供了 urllib 和 requests 等库来发送 HTTP 请求和处理响应。
urllib 发送请求
概念:urllib 是 Python 标准库,包含 urlopen 函数用于发送 HTTP 请求。适合简单的 GET/POST 请求,但 API 相对底层。
from urllib import request, parse
# GET 请求
with request.urlopen("https://httpbin.org/get") as response:
print(response.status) # 200
print(response.read().decode())
# 带查询参数
params = parse.urlencode({"page": 1, "limit": 10})
url = f"https://httpbin.org/get?{params}"
with request.urlopen(url) as response:
data = response.read().decode()
# POST 请求
data = parse.urlencode({"name": "Alice", "age": 25}).encode()
with request.urlopen("https://httpbin.org/post", data=data) as response:
print(response.read().decode())
# 设置请求头
req = request.Request(
"https://httpbin.org/headers",
headers={
"User-Agent": "MyApp/1.0",
"Authorization": "Bearer token123"
},
method="GET"
)
with request.urlopen(req) as response:
print(response.read().decode())
requests 库(更推荐)
概念:requests 是目前最流行的 HTTP 库,API 设计人性化,支持会话、Cookie、认证等高级功能。建议日常使用 requests 而不是 urllib。
pip install requests
import requests
# GET 请求
response = requests.get("https://httpbin.org/get")
print(response.status_code)
print(response.json()) # 自动解析 JSON
# 带参数
params = {"page": 1, "limit": 10}
response = requests.get("https://httpbin.org/get", params=params)
# POST 请求
data = {"name": "Alice", "age": 25}
response = requests.post("https://httpbin.org/post", json=data)
# 设置请求头
headers = {"Authorization": "Bearer token123"}
response = requests.get("https://httpbin.org/headers", headers=headers)
# 处理响应
print(response.text) # 原始文本
print(response.json()) # JSON 数据
print(response.headers) # 响应头
print(response.cookies) # Cookies
requests 进阶用法
概念:requests 提供了丰富的高级功能,包括文件上传下载、会话管理、自动处理编码、超时控制、重试机制等。
import requests
# 上传文件
with open("file.txt", "rb") as f:
files = {"file": f}
response = requests.post("https://httpbin.org/post", files=files)
# 下载文件
response = requests.get("https://httpbin.org/image", stream=True)
with open("image.png", "wb") as f:
for chunk in response.iter_content(1024):
f.write(chunk)
# 设置超时
response = requests.get("https://httpbin.org/delay/5", timeout=3)
# 处理错误
try:
response = requests.get("https://httpbin.org/status/404")
response.raise_for_status()
except requests.HTTPError as e:
print(f"HTTP 错误: {e}")
# Session(保持 Cookie)
session = requests.Session()
session.headers.update({"Authorization": "Bearer token"})
response = session.get("https://httpbin.org/headers")
4.8 RESTful API 设计
概念:什么是 REST?
REST(Representational State Transfer)是一种软件架构风格,用于设计网络应用程序的 API。RESTful API 基于 HTTP 协议,使用标准的请求方法(GET/POST/PUT/DELETE)对资源进行操作。
RESTful 设计原则
概念:RESTful 设计原则包括:使用 HTTP 方法表达操作、使用名词表示资源、使用复数形式、使用 HTTP 状态码、保持无状态。这些原则使 API 更加清晰、一致和易于理解。
# 资源设计
# 用户资源
GET /users # 获取用户列表
GET /users/{id} # 获取单个用户
POST /users # 创建用户
PUT /users/{id} # 更新用户(整体)
PATCH /users/{id} # 部分更新用户
DELETE /users/{id} # 删除用户
# 订单资源(嵌套)
GET /users/{id}/orders # 获取用户的订单
GET /orders/{id} # 获取单个订单
POST /users/{id}/orders # 为用户创建订单
DELETE /orders/{id} # 删除订单
请求与响应格式
概念:RESTful API 的请求体通常使用 JSON 格式,响应也返回 JSON。成功的创建操作返回 201 Created,更新操作返回 200 OK 或 204 No Content,删除操作返回 204 No Content。
# 创建用户 POST /users
# 请求体
{
"name": "Alice",
"email": "alice@example.com",
"password": "secret123"
}
# 响应 201 Created
{
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"created_at": "2024-01-15T10:30:00Z"
}
# 错误响应
# 响应 400 Bad Request
{
"error": "Validation Error",
"message": "邮箱格式不正确",
"details": {
"email": "无效的邮箱格式"
}
}
过滤、排序、分页
概念:RESTful API 通过查询参数实现资源过滤、排序和分页。过滤使用 field=value 格式,排序使用 sort=field&order=desc,常见分页方式有 page/per_page 和 offset/limit 两种。
# 过滤
GET /users?status=active&role=admin
# 排序
GET /users?sort=created_at&order=desc
# 分页
GET /users?page=2&per_page=20
# 响应包含分页信息
{
"data": [...],
"pagination": {
"page": 2,
"per_page": 20,
"total": 100,
"total_pages": 5
}
}
4.9 TCP/UDP Socket 基础
概念:TCP vs UDP
- TCP(Transmission Control Protocol):面向连接的协议,提供可靠的数据传输。适合需要保证数据完整性的场景(如 HTTP、邮件)。
- UDP(User Datagram Protocol):无连接的协议,不保证可靠性,但速度快。适合实时性要求高的场景(如视频流、DNS)。
TCP Socket 服务器
概念:TCP 是面向连接的协议,服务器需要先监听端口(listen),然后接受客户端连接(accept)。每个客户端连接都会创建一个独立的 socket 进行通信。
import socket
def start_tcp_server(host="127.0.0.1", port=8888):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"服务器启动: {host}:{port}")
while True:
client_socket, address = server_socket.accept()
print(f"客户端连接: {address}")
# 处理请求
data = client_socket.recv(1024)
print(f"收到数据: {data.decode()}")
# 发送响应
response = "Hello from server"
client_socket.send(response.encode())
client_socket.close()
start_tcp_server()
TCP Socket 客户端
概念:TCP 客户端通过 connect 方法连接到服务器,然后使用 send/recv 进行数据收发。客户端不需要监听端口,直接与服务端建立连接。
import socket
def tcp_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("127.0.0.1", 8888))
# 发送数据
message = "Hello from client"
client_socket.send(message.encode())
# 接收响应
response = client_socket.recv(1024)
print(f"收到响应: {response.decode()}")
client_socket.close()
tcp_client()
UDP Socket
概念:UDP 是无连接协议,服务器不需要 accept 客户端连接,客户端也不需要 connect。数据以数据报(datagram)为单位发送,直接发送到目标地址。
import socket
# UDP 服务器
def udp_server(host="127.0.0.1", port=8888):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind((host, port))
print(f"UDP 服务器启动: {host}:{port}")
while True:
data, address = server_socket.recvfrom(1024)
print(f"收到 {address}: {data.decode()}")
response = "Message received"
server_socket.sendto(response.encode(), address)
# UDP 客户端
def udp_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
message = "Hello UDP"
client_socket.sendto(message.encode(), ("127.0.0.1", 8888))
data, server = client_socket.recvfrom(1024)
print(f"收到响应: {data.decode()}")
client_socket.close()
异步 Socket(asyncio)
import asyncio
async def handle_client(reader, writer):
address = writer.get_extra_info('peername')
print(f"客户端连接: {address}")
data = await reader.read(1024)
message = data.decode()
print(f"收到: {message}")
response = "Hello from async server"
writer.write(response.encode())
await writer.drain()
writer.close()
await writer.wait_closed() # Python 3.7+ 需要等待关闭完成
print(f"客户端断开: {address}")
async def tcp_server():
server = await asyncio.start_server(
handle_client, "127.0.0.1", 8888
)
print("异步服务器启动")
async with server:
await server.serve_forever()
asyncio.run(tcp_server())
4.10 并发与并行进阶
概念:GIL 的影响
Python 的 GIL(Global Interpreter Lock)限制了同一时刻只有一个线程执行 Python 字节码。这使得 CPU 密集型任务无法通过多线程获得性能提升,但 I/O 密集型任务(如网络请求)仍能从异步编程中受益。
多进程 vs 多线程 vs 异步
概念:多进程(ProcessPoolExecutor)利用多核 CPU,适合 CPU 密集型任务。多线程(ThreadPoolExecutor)适合 I/O 密集型但需要阻塞等待的场景。异步(asyncio)适合高并发 I/O 密集型任务,单线程即可处理大量并发连接。
# CPU 密集型:使用多进程
from concurrent.futures import ProcessPoolExecutor
def cpu_task(n):
return sum(i * i for i in range(n))
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, [10**6] * 8))
# I/O 密集型:使用异步
import asyncio
async def io_task(url):
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["https://example.com"] * 100
results = await asyncio.gather(*[io_task(url) for url in urls])
asyncio.run(main())
# I/O 密集型:也可以使用多线程
from concurrent.futures import ThreadPoolExecutor
import requests
def io_task(url):
return requests.get(url).text
with ThreadPoolExecutor() as executor:
results = list(executor.map(io_task, urls))
aiohttp 异步 HTTP 客户端
pip install aiohttp
import aiohttp
import asyncio
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
urls = ["https://httpbin.org/delay/1"] * 10
async def main():
import time
start = time.time()
results = await fetch_all(urls)
print(f"耗时: {time.time() - start:.2f}s") # 约1秒(并行)
asyncio.run(main())
concurrent.futures 并发
概念:concurrent.futures 提供了 ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)。线程池适合 I/O 密集型任务,进程池可以绕过 GIL,适合 CPU 密集型任务。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 线程池:适合 I/O 密集型
def io_task(n):
time.sleep(1)
return n
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(io_task, range(10)))
print(f"耗时: {len(results)} 秒") # 约1秒
# 进程池:适合 CPU 密集型
def cpu_task(n):
return sum(i * i for i in range(n))
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_task, range(100)))
4.11 WebSocket 通信
概念:WebSocket 的特点
WebSocket 是一种在单个 TCP 连接上提供全双工通信的协议。与 HTTP 不同,WebSocket 允许服务器主动向客户端推送数据,适合实时应用如聊天、游戏、实时协作等。
websocket-client 客户端
概念:websocket-client 是一个第三方 WebSocket 客户端库,基于线程实现。适合与现有同步代码集成,使用回调函数处理消息。
pip install websocket-client
import websocket
import json
import threading
import time
class WebSocketClient:
def __init__(self, url):
self.url = url
self.ws = None
self.connected = False
def on_message(self, ws, message):
print(f"收到消息: {message}")
def on_error(self, ws, error):
print(f"错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("连接关闭")
self.connected = False
def on_open(self, ws):
print("连接打开")
self.connected = True
# 发送消息
ws.send(json.dumps({"type": "hello", "data": "world"}))
def connect(self):
self.ws = websocket.WebSocketApp(
self.url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
# 在新线程中运行
thread = threading.Thread(target=self.ws.run_forever)
thread.daemon = True
thread.start()
def send(self, data):
if self.connected:
self.ws.send(json.dumps(data))
def close(self):
if self.ws:
self.ws.close()
# 使用
client = WebSocketClient("wss://ws.postman-echo.com/raw")
client.connect()
time.sleep(2)
client.send({"message": "Hello"})
time.sleep(1)
client.close()
websockets 库(asyncio 版本)
概念:websockets 是支持 asyncio 的 WebSocket 库,API 设计简洁,支持客户端和服务器端。推荐在新项目中使用。
pip install websockets
import asyncio
import websockets
async def client():
uri = "wss://ws.postman-echo.com/raw"
async with websockets.connect(uri) as ws:
# 发送消息
await ws.send("Hello")
print(f"发送: Hello")
# 接收消息
response = await ws.recv()
print(f"收到: {response}")
asyncio.run(client())
asyncio 服务器 WebSocket
概念:websockets 库支持创建 WebSocket 服务器,使用 async def 定义处理函数。每当有客户端连接时,处理函数会被调用,可通过 websocket 参数接收和发送消息。
import asyncio
import websockets
# websockets 10+ 版本:handler 只接收 websocket 参数
async def echo(websocket):
async for message in websocket:
print(f"收到: {message}")
await websocket.send(f"echo: {message}")
async def main():
async with websockets.serve(echo, "localhost", 8765):
print("WebSocket 服务器启动: ws://localhost:8765")
await asyncio.Future() # 永久运行
asyncio.run(main())
4.12 asyncio 进阶
概念:取消与超时
asyncio 任务可以取消,设置超时以避免长时间阻塞。取消会触发 asyncio.CancelledError。
任务取消
概念:Task.cancel() 可以取消正在运行的协程。取消是协作式的,任务需要在 await 点检查是否被取消。被取消的任务会抛出 CancelledError。
import asyncio
async def long_task():
try:
print("任务开始")
await asyncio.sleep(10)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(2)
print("取消任务")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务确实被取消了")
asyncio.run(main())
asyncio.timeout(Python 3.11+)
概念:asyncio.timeout 是 Python 3.11 引入的上下文管理器,用于为 await 操作设置超时。如果超时,会抛出 asyncio.TimeoutError,比 wait_for 更简洁。
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 方式1: asyncio.timeout
async with asyncio.timeout(3):
result = await slow_task()
print(result)
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
# 方式2: asyncio.wait_for
async def main2():
try:
result = await asyncio.wait_for(slow_task(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("等待超时")
asyncio.run(main2())
asyncio 异常处理
概念:asyncio 中的异常处理与同步代码类似,使用 try/except。gather 的 return_exceptions=True 参数可以将异常作为结果捕获,而不是抛出。
import asyncio
async def faulty_task():
await asyncio.sleep(1)
raise ValueError("出错了")
async def main():
try:
await faulty_task()
except ValueError as e:
print(f"捕获异常: {e}")
asyncio.run(main())
# gather 的异常处理
async def main2():
try:
results = await asyncio.gather(
faulty_task(),
asyncio.sleep(2),
return_exceptions=True # 捕获异常而不是抛出
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 异常: {result}")
else:
print(f"任务 {i} 结果: {result}")
except Exception as e:
print(f"整体异常: {e}")
asyncio.run(main2())
asyncio.Future
概念:Future 是一个特殊的对象,代表一个异步操作的最终结果。不同于 Task,Future 可以由开发者手动创建和设置结果,常用于将回调式 API 转换为 async/await 风格。
import asyncio
async def main():
# Future: 代表一个异步操作的最终结果
future = asyncio.get_running_loop().create_future() # 推荐使用 get_running_loop()
async def set_result():
await asyncio.sleep(1)
future.set_result("完成")
asyncio.create_task(set_result())
result = await future
print(f"Future 结果: {result}")
asyncio.run(main())
4.13 HTTP/2 简介
概念:HTTP/2 的优势
HTTP/2 是 HTTP 协议的第二个主要版本,相比 HTTP/1.1 有以下优势:
- 多路复用:单个连接并行处理多个请求
- 头部压缩:减少重复的头部数据传输
- 服务器推送:服务器主动推送资源
- 二进制分帧:更高效的传输格式
h2 库
概念:h2 是 Python 的 HTTP/2 协议实现库,提供了连接管理、帧处理等功能。实际使用中通常不需要直接操作 h2,推荐使用 httpx 库(支持 HTTP/2)来发送请求。
pip install httpx[http2] # 安装 httpx 并启用 HTTP/2 支持
# 推荐方式:使用 httpx 发送 HTTP/2 请求
import httpx
# 同步 HTTP/2 客户端
with httpx.Client(http2=True) as client:
response = client.get("https://www.google.com/")
print(response.http_version) # HTTP/2
# 异步 HTTP/2 客户端
async def async_http2():
async with httpx.AsyncClient(http2=True) as client:
response = await client.get("https://www.google.com/")
print(response.http_version)
pip install h2 # 底层 HTTP/2 协议库(一般不需要直接使用)
import h2
import h2.connection
import h2.events
def create_h2_connection():
conn = h2.connection.H2Connection(config=h2.config.H2Configuration(client_side=True))
conn.initiate_connection()
return conn
def handle_response(response_headers, response_body):
print("响应头:", dict(response_headers))
print("响应体:", response_body.decode())
# HTTP/2 请求
import socket
def http2_request(host, port=443):
conn = create_h2_connection()
# 发送请求
stream_id = conn.get_next_available_stream_id()
conn.send_headers(
stream_id,
[
(":method", "GET"),
(":scheme", "https"),
(":authority", host),
(":path", "/"),
]
)
conn.end_stream(stream_id)
# 序列化并发送
data = conn.data_to_send()
# 使用 socket 发送(实际使用时需要 TLS)
# s = socket.socket()
# s.connect((host, port))
# s.sendall(data)
return conn
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)