第4步:异步编程与网络基础

4.1 同步与异步编程

概念:什么是同步与异步?

  • 同步(Synchronous):代码按顺序执行,每个操作必须等待前一个操作完成后才能开始。简单直观,但效率较低。
  • 异步(Asynchronous):操作发起后不等待结果返回,继续执行后续代码,通过回调、事件或协程机制处理结果。效率高,但编程模型复杂。

同步如同排队点餐,必须等前一个人完成才能点;异步如同取号点餐,号到了再处理。

同步 vs 异步代码对比

# 同步代码
import time

def sync_task():
    print("开始任务1")
    time.sleep(2)  # 等待2秒
    print("任务1完成")

def sync_main():
    sync_task()
    sync_task()  # 必须等第一个完成
    print("全部完成")  # 4秒后

# 异步代码
import asyncio

async def async_task():
    print("开始任务1")
    await asyncio.sleep(2)  # 模拟异步IO
    print("任务1完成")

async def async_main():
    await asyncio.gather(
        async_task(),
        async_task()
    )
    print("全部完成")  # 约2秒后(并行执行)

# asyncio.run(async_main())

阻塞 vs 非阻塞

  • 阻塞(Blocking):调用函数时,当前线程被挂起,等待操作完成
  • 非阻塞(Non-blocking):调用函数时立即返回,不等待操作完成

并发 vs 并行

  • 并发(Concurrency):多个任务交替执行,通过快速切换让人感觉同时执行
  • 并行(Parallelism):多个任务真正同时执行,需要多核 CPU
# 并发:单线程中协程交替执行(asyncio)
async def task1():
    await asyncio.sleep(1)
async def task2():
    await asyncio.sleep(1)
# asyncio.gather(task1(), task2())  # 单线程并发

# 并行:多核 CPU 同时执行(多进程)
import concurrent.futures
def cpu_task(n):
    return sum(i * i for i in range(n))
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_task, [10**6, 10**6]))

4.2 异步编程 asyncio

概念:asyncio 的作用

asyncio 是 Python 标准库中用于编写异步代码的框架,基于协程(Coroutine)和事件循环(Event Loop)。它允许单个线程同时处理多个 I/O 密集型任务,如网络请求、文件读写等。

事件循环

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 推荐方式(Python 3.7+):自动管理事件循环
asyncio.run(main())

# 旧式手动管理(不推荐,仅了解)
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(main())
# loop.close()

async/await 语法

import asyncio

# async def 定义协程函数
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # await 等待另一个协程
    print("World")

# 调用协程不会立即执行
coro = say_hello()

# 必须用事件循环运行
asyncio.run(coro)

# await 只能在 async 函数中使用
async def main():
    await say_hello()  # 等待协程完成

asyncio.run(main())

协程函数 vs 普通函数

def regular_func():
    return "普通函数"

async def async_func():
    return "协程函数"

# 调用方式不同
print(regular_func())              # "普通函数"(直接返回结果)
print(async_func())                # <coroutine object async_func at ...>(返回协程对象)
result = asyncio.run(async_func()) # 需要事件循环运行
print(result)                      # "协程函数"

awaitable 对象

以下类型都可以被 await:

  • 协程(coroutine)
  • Task
  • Future
  • 支持 await 协议的对象
import asyncio

async def task():
    return "result"

# 协程
asyncio.run(task())

# Task:用于并发调度协程
async def main():
    task1 = asyncio.create_task(task())
    task2 = asyncio.create_task(task())
    results = await asyncio.gather(task1, task2)
    print(results)  # ['result', 'result']

asyncio.run(main())

4.3 并发任务管理

概念:并发任务管理

在异步编程中,并发任务管理是指如何同时运行多个协程、协调它们的执行顺序、处理它们的结果。asyncio 提供了多种工具来创建、管理和等待协程任务,如 gather、create_task、wait 等。

gather - 并发执行多个协程

概念:gather 用于并发执行多个协程,并收集它们的结果。它会等待所有协程完成,返回结果列表。如果任一协程抛出异常,gather 默认会立即将第一个异常传播给调用者(其他协程仍会继续运行)。使用 return_exceptions=True 可将异常作为结果返回而非抛出。

import asyncio

async def fetch_data(id):
    print(f"任务 {id} 开始")
    await asyncio.sleep(1)
    print(f"任务 {id} 完成")
    return f"数据 {id}"

async def main():
    # 并发执行,返回结果列表
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)  # ['数据 1', '数据 2', '数据 3']

asyncio.run(main())
# 总耗时约1秒(并行),而不是3秒(串行)

create_task - 创建任务

概念:create_task 用于将协程包装为 Task 对象并立即调度执行。相比直接 await 协程,create_task 不会阻塞,可以同时创建多个任务并发运行。

import asyncio

async def task(name, delay):
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return name

async def main():
    # 创建任务(立即调度)
    t1 = asyncio.create_task(task("A", 2))
    t2 = asyncio.create_task(task("B", 1))

    print("任务已创建")

    # 等待所有任务
    results = await asyncio.gather(t1, t2)
    print(results)

asyncio.run(main())

wait - 等待任务完成

概念:wait 用于等待一组任务完成,返回两个集合:已完成的任务和待定(未完成)的任务。它支持 timeout 参数,可以设置最大等待时间。

import asyncio

async def task(name):
    await asyncio.sleep(1)
    return name

async def main():
    # 创建多个任务
    tasks = [asyncio.create_task(task(i)) for i in range(5)]

    # 等待完成(可设置超时)
    done, pending = await asyncio.wait(tasks, timeout=2)

    print(f"完成: {len(done)}, 待定: {len(pending)}")
    for t in done:
        print(await t)

asyncio.run(main())

as_completed - 按完成顺序处理

概念:as_completed 返回一个迭代器,按任务实际完成的顺序 yield Future 对象。与 gather 不同,gather 是按创建顺序返回结果,as_completed 是按完成顺序。

import asyncio

async def task(id):
    await asyncio.sleep(id)
    return id

async def main():
    tasks = [task(3), task(1), task(2)]

    # 按完成顺序返回
    for future in asyncio.as_completed(tasks):
        result = await future
        print(f"完成: {result}")

# 输出顺序: 1, 2, 3(按实际完成时间)

gather 的并发控制

概念asyncio.gather 本身没有限制并发数量的参数,会同时调度所有传入的协程。如果需要限制并发数量,可以使用 asyncio.Semaphore(信号量)来控制同时执行的任务数量。

import asyncio

async def fetch_data(id, semaphore):
    async with semaphore:
        # 只有获得信号量的任务才能执行
        await asyncio.sleep(1)
        return f"数据 {id}"

async def main():
    # 限制最多 5 个并发
    semaphore = asyncio.Semaphore(5)

    # 100 个任务,但同时最多只有 5 个在执行
    tasks = [fetch_data(i, semaphore) for i in range(100)]
    results = await asyncio.gather(*tasks)

asyncio.run(main())
# 另一种方式:分批执行
async def fetch_data(id):
    await asyncio.sleep(1)
    return f"数据 {id}"

async def main():
    all_tasks = [fetch_data(i) for i in range(100)]
    batch_size = 5

    # 每批 5 个,逐批执行
    for i in range(0, len(all_tasks), batch_size):
        batch = all_tasks[i:i + batch_size]
        await asyncio.gather(*batch)

4.4 异步上下文管理器与迭代器

概念:异步上下文管理器与迭代器

异步上下文管理器用于异步资源的管理,类似同步的 with 语句,但支持 await 操作。异步迭代器则允许在 for 循环中使用 await,适合遍历异步生成的数据流。它们是 asyncio 编程中处理资源清理和数据流的重要工具。

异步上下文管理器

概念:异步上下文管理器通过 async with 语句使用,实现 __aenter____aexit__ 方法来管理异步资源,如数据库连接、网络连接等。

import asyncio

# async with 用于异步资源管理
class AsyncResource:
    async def __aenter__(self):
        print("获取资源")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(0.1)

async def main():
    async with AsyncResource() as resource:
        print("使用资源")
        await asyncio.sleep(1)

asyncio.run(main())

contextlib 异步支持

概念:contextlib 的 @asynccontextmanager 装饰器可以将一个异步生成器函数转换为异步上下文管理器,使资源管理代码更简洁。

import asyncio
import contextlib

@contextlib.asynccontextmanager
async def managed_resource():
    print("获取")
    yield "resource"
    print("释放")

async def main():
    async with managed_resource() as r:
        print(f"使用 {r}")

asyncio.run(main())

异步迭代器

概念:异步迭代器实现了 __aiter____anext__ 方法,可以在迭代过程中使用 await。适用于需要异步获取数据的场景。

import asyncio

# 异步迭代器:实现 __aiter__ 和 __anext__
class AsyncCounter:
    def __init__(self, max):
        self.max = max
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.max:
            result = self.current
            self.current += 1
            return result
        raise StopAsyncIteration

async def main():
    async for i in AsyncCounter(5):
        print(i)

asyncio.run(main())

异步生成器

import asyncio

# 异步生成器:使用 yield 的 async 函数
async def async_gen(n):
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for i in async_gen(5):
        print(i)

asyncio.run(main())

# 列表推导式
result = [i async for i in async_gen(5)]
print(result)  # [0, 1, 2, 3, 4]

4.5 异步队列与信号量

概念:异步队列与信号量

异步队列(Queue)用于协程之间的数据传递,类似生产者-消费者模式。信号量(Semaphore)用于控制同时访问资源的协程数量,避免资源竞争。事件(Event)和条件(Condition)则用于协程之间的同步和通信。

asyncio.Queue - 异步队列

概念:asyncio.Queue 是协程安全的队列实现,用于在协程之间传递数据。支持 FIFO 队列、优先级队列和 LIFO 队列(堆栈)。

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f"生产: {i}")
        await asyncio.sleep(0.5)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"消费: {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # 并发运行生产者和消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue)
    )

asyncio.run(main())

队列方法

概念:asyncio.Queue 提供了 put、get、task_done、join 等方法。put/get 是异步的,会自动处理阻塞;join 阻塞直到队列被清空。

import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)

    # put - 添加元素(满时阻塞)
    await queue.put(1)
    await queue.put(2)
    print(queue.full())  # True

    # get - 获取元素(空时阻塞)
    item = await queue.get()
    print(item)  # 1

    # 工具方法
    print(queue.qsize())      # 队列大小
    print(queue.empty())       # 是否为空
    print(queue.full())        # 是否已满

    # task_done - 标记任务完成(必须在 get 之后、join 之前调用)
    queue.task_done()

    # join - 等待队列清空(所有 get 的元素都被 task_done 标记)
    await queue.join()

asyncio.Semaphore - 信号量

概念:Semaphore 是一种计数器,用于控制同时访问某个资源的协程数量。常用于限制并发连接数、数据库连接池大小等。

import asyncio

# 信号量:控制并发数量
semaphore = asyncio.Semaphore(2)

async def task(id):
    async with semaphore:
        print(f"任务 {id} 开始")
        await asyncio.sleep(1)
        print(f"任务 {id} 完成")

async def main():
    tasks = [task(i) for i in range(5)]
    await asyncio.gather(*tasks)

# 只有2个任务同时执行
asyncio.run(main())

asyncio.Event - 事件

概念:Event 用于协程之间的通知机制。一个协程可以等待事件被设置(set),另一个协程可以触发事件(set)。事件被设置后,所有等待的协程都会恢复执行。

import asyncio

async def waiter(event):
    print("等待事件...")
    await event.wait()  # 阻塞直到事件被设置
    print("事件已触发!")

async def setter(event):
    await asyncio.sleep(2)
    event.set()  # 触发事件

async def main():
    event = asyncio.Event()  # 在协程内创建,避免跨事件循环问题
    await asyncio.gather(waiter(event), setter(event))

asyncio.run(main())

asyncio.Condition - 条件

概念:Condition 是 Event 的扩展,允许协程等待某个条件变为真。与 Event 不同,Condition 需要配合 async with 使用,并且支持 notify 唤醒指定数量的等待协程。

import asyncio

async def consumer(condition):
    async with condition:
        await condition.wait()  # 等待条件
        print("消费者: 收到通知")

async def producer(condition):
    await asyncio.sleep(1)
    async with condition:
        condition.notify()  # 通知等待者

async def main():
    condition = asyncio.Condition()  # 在协程内创建
    await asyncio.gather(consumer(condition), producer(condition))

asyncio.run(main())

4.6 HTTP 协议基础

概念:HTTP 是什么?

HTTP(HyperText Transfer Protocol)是万维网的数据传输协议,基于请求-响应模式。客户端发送请求,服务器返回响应。HTTP/1.1 是目前最广泛使用的版本,HTTP/2 和 HTTP/3 正在逐步普及。

HTTP 请求结构

请求方法 URL路径 HTTP版本
Header1: value1
Header2: value2

请求体(POST/PUT)
GET /index.html HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0
Accept: text/html

(GET 没有请求体)

HTTP 响应结构

HTTP版本 状态码 状态描述
Header1: value1
Header2: value2

响应体
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 1234

<html>...</html>

请求方法

方法 用途 幂等 安全性
GET 获取资源
POST 创建资源
PUT 更新资源(整体)
PATCH 部分更新
DELETE 删除资源
HEAD 获取头部
OPTIONS 获取支持的 methods

状态码

类别 范围 含义
1xx 100-199 信息性响应
2xx 200-299 成功(200 OK, 201 Created, 204 No Content)
3xx 300-399 重定向(301 Moved, 304 Not Modified)
4xx 400-499 客户端错误(400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found)
5xx 500-599 服务器错误(500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable)

请求头与响应头

# 常用请求头
GET /api/users HTTP/1.1
Host: api.example.com
Authorization: Bearer token123
Content-Type: application/json
Accept: application/json
User-Agent: MyApp/1.0
Cookie: session_id=abc123

# 常用响应头
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 256
Cache-Control: max-age=3600
Set-Cookie: session=xyz; HttpOnly
Access-Control-Allow-Origin: *

4.7 HTTP 请求与响应

概念:HTTP 请求与响应

HTTP 请求是客户端向服务器发送的数据包,包含请求方法、URL、头部和可选的请求体。HTTP 响应是服务器返回给客户端的数据包,包含状态码、头部和响应体。Python 提供了 urllib 和 requests 等库来发送 HTTP 请求和处理响应。

urllib 发送请求

概念:urllib 是 Python 标准库,包含 urlopen 函数用于发送 HTTP 请求。适合简单的 GET/POST 请求,但 API 相对底层。

from urllib import request, parse

# GET 请求
with request.urlopen("https://httpbin.org/get") as response:
    print(response.status)  # 200
    print(response.read().decode())

# 带查询参数
params = parse.urlencode({"page": 1, "limit": 10})
url = f"https://httpbin.org/get?{params}"
with request.urlopen(url) as response:
    data = response.read().decode()

# POST 请求
data = parse.urlencode({"name": "Alice", "age": 25}).encode()
with request.urlopen("https://httpbin.org/post", data=data) as response:
    print(response.read().decode())

# 设置请求头
req = request.Request(
    "https://httpbin.org/headers",
    headers={
        "User-Agent": "MyApp/1.0",
        "Authorization": "Bearer token123"
    },
    method="GET"
)
with request.urlopen(req) as response:
    print(response.read().decode())

requests 库(更推荐)

概念:requests 是目前最流行的 HTTP 库,API 设计人性化,支持会话、Cookie、认证等高级功能。建议日常使用 requests 而不是 urllib。

pip install requests
import requests

# GET 请求
response = requests.get("https://httpbin.org/get")
print(response.status_code)
print(response.json())  # 自动解析 JSON

# 带参数
params = {"page": 1, "limit": 10}
response = requests.get("https://httpbin.org/get", params=params)

# POST 请求
data = {"name": "Alice", "age": 25}
response = requests.post("https://httpbin.org/post", json=data)

# 设置请求头
headers = {"Authorization": "Bearer token123"}
response = requests.get("https://httpbin.org/headers", headers=headers)

# 处理响应
print(response.text)     # 原始文本
print(response.json())   # JSON 数据
print(response.headers)  # 响应头
print(response.cookies)  # Cookies

requests 进阶用法

概念:requests 提供了丰富的高级功能,包括文件上传下载、会话管理、自动处理编码、超时控制、重试机制等。

import requests

# 上传文件
with open("file.txt", "rb") as f:
    files = {"file": f}
    response = requests.post("https://httpbin.org/post", files=files)

# 下载文件
response = requests.get("https://httpbin.org/image", stream=True)
with open("image.png", "wb") as f:
    for chunk in response.iter_content(1024):
        f.write(chunk)

# 设置超时
response = requests.get("https://httpbin.org/delay/5", timeout=3)

# 处理错误
try:
    response = requests.get("https://httpbin.org/status/404")
    response.raise_for_status()
except requests.HTTPError as e:
    print(f"HTTP 错误: {e}")

# Session(保持 Cookie)
session = requests.Session()
session.headers.update({"Authorization": "Bearer token"})
response = session.get("https://httpbin.org/headers")

4.8 RESTful API 设计

概念:什么是 REST?

REST(Representational State Transfer)是一种软件架构风格,用于设计网络应用程序的 API。RESTful API 基于 HTTP 协议,使用标准的请求方法(GET/POST/PUT/DELETE)对资源进行操作。

RESTful 设计原则

概念:RESTful 设计原则包括:使用 HTTP 方法表达操作、使用名词表示资源、使用复数形式、使用 HTTP 状态码、保持无状态。这些原则使 API 更加清晰、一致和易于理解。

# 资源设计
# 用户资源
GET    /users          # 获取用户列表
GET    /users/{id}     # 获取单个用户
POST   /users          # 创建用户
PUT    /users/{id}     # 更新用户(整体)
PATCH  /users/{id}     # 部分更新用户
DELETE /users/{id}     # 删除用户

# 订单资源(嵌套)
GET    /users/{id}/orders        # 获取用户的订单
GET    /orders/{id}              # 获取单个订单
POST   /users/{id}/orders        # 为用户创建订单
DELETE /orders/{id}              # 删除订单

请求与响应格式

概念:RESTful API 的请求体通常使用 JSON 格式,响应也返回 JSON。成功的创建操作返回 201 Created,更新操作返回 200 OK 或 204 No Content,删除操作返回 204 No Content。

# 创建用户 POST /users
# 请求体
{
    "name": "Alice",
    "email": "alice@example.com",
    "password": "secret123"
}

# 响应 201 Created
{
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2024-01-15T10:30:00Z"
}

# 错误响应
# 响应 400 Bad Request
{
    "error": "Validation Error",
    "message": "邮箱格式不正确",
    "details": {
        "email": "无效的邮箱格式"
    }
}

过滤、排序、分页

概念:RESTful API 通过查询参数实现资源过滤、排序和分页。过滤使用 field=value 格式,排序使用 sort=field&order=desc,常见分页方式有 page/per_page 和 offset/limit 两种。

# 过滤
GET /users?status=active&role=admin

# 排序
GET /users?sort=created_at&order=desc

# 分页
GET /users?page=2&per_page=20

# 响应包含分页信息
{
    "data": [...],
    "pagination": {
        "page": 2,
        "per_page": 20,
        "total": 100,
        "total_pages": 5
    }
}

4.9 TCP/UDP Socket 基础

概念:TCP vs UDP

  • TCP(Transmission Control Protocol):面向连接的协议,提供可靠的数据传输。适合需要保证数据完整性的场景(如 HTTP、邮件)。
  • UDP(User Datagram Protocol):无连接的协议,不保证可靠性,但速度快。适合实时性要求高的场景(如视频流、DNS)。

TCP Socket 服务器

概念:TCP 是面向连接的协议,服务器需要先监听端口(listen),然后接受客户端连接(accept)。每个客户端连接都会创建一个独立的 socket 进行通信。

import socket

def start_tcp_server(host="127.0.0.1", port=8888):
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)

    print(f"服务器启动: {host}:{port}")

    while True:
        client_socket, address = server_socket.accept()
        print(f"客户端连接: {address}")

        # 处理请求
        data = client_socket.recv(1024)
        print(f"收到数据: {data.decode()}")

        # 发送响应
        response = "Hello from server"
        client_socket.send(response.encode())

        client_socket.close()

start_tcp_server()

TCP Socket 客户端

概念:TCP 客户端通过 connect 方法连接到服务器,然后使用 send/recv 进行数据收发。客户端不需要监听端口,直接与服务端建立连接。

import socket

def tcp_client():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect(("127.0.0.1", 8888))

    # 发送数据
    message = "Hello from client"
    client_socket.send(message.encode())

    # 接收响应
    response = client_socket.recv(1024)
    print(f"收到响应: {response.decode()}")

    client_socket.close()

tcp_client()

UDP Socket

概念:UDP 是无连接协议,服务器不需要 accept 客户端连接,客户端也不需要 connect。数据以数据报(datagram)为单位发送,直接发送到目标地址。

import socket

# UDP 服务器
def udp_server(host="127.0.0.1", port=8888):
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind((host, port))

    print(f"UDP 服务器启动: {host}:{port}")

    while True:
        data, address = server_socket.recvfrom(1024)
        print(f"收到 {address}: {data.decode()}")

        response = "Message received"
        server_socket.sendto(response.encode(), address)

# UDP 客户端
def udp_client():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    message = "Hello UDP"
    client_socket.sendto(message.encode(), ("127.0.0.1", 8888))

    data, server = client_socket.recvfrom(1024)
    print(f"收到响应: {data.decode()}")

    client_socket.close()

异步 Socket(asyncio)

import asyncio

async def handle_client(reader, writer):
    address = writer.get_extra_info('peername')
    print(f"客户端连接: {address}")

    data = await reader.read(1024)
    message = data.decode()
    print(f"收到: {message}")

    response = "Hello from async server"
    writer.write(response.encode())
    await writer.drain()

    writer.close()
    await writer.wait_closed()  # Python 3.7+ 需要等待关闭完成
    print(f"客户端断开: {address}")

async def tcp_server():
    server = await asyncio.start_server(
        handle_client, "127.0.0.1", 8888
    )
    print("异步服务器启动")
    async with server:
        await server.serve_forever()

asyncio.run(tcp_server())

4.10 并发与并行进阶

概念:GIL 的影响

Python 的 GIL(Global Interpreter Lock)限制了同一时刻只有一个线程执行 Python 字节码。这使得 CPU 密集型任务无法通过多线程获得性能提升,但 I/O 密集型任务(如网络请求)仍能从异步编程中受益。

多进程 vs 多线程 vs 异步

概念:多进程(ProcessPoolExecutor)利用多核 CPU,适合 CPU 密集型任务。多线程(ThreadPoolExecutor)适合 I/O 密集型但需要阻塞等待的场景。异步(asyncio)适合高并发 I/O 密集型任务,单线程即可处理大量并发连接。

# CPU 密集型:使用多进程
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i * i for i in range(n))

with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_task, [10**6] * 8))

# I/O 密集型:使用异步
import asyncio

async def io_task(url):
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://example.com"] * 100
    results = await asyncio.gather(*[io_task(url) for url in urls])

asyncio.run(main())

# I/O 密集型:也可以使用多线程
from concurrent.futures import ThreadPoolExecutor
import requests

def io_task(url):
    return requests.get(url).text

with ThreadPoolExecutor() as executor:
    results = list(executor.map(io_task, urls))

aiohttp 异步 HTTP 客户端

pip install aiohttp
import aiohttp
import asyncio

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

urls = ["https://httpbin.org/delay/1"] * 10

async def main():
    import time
    start = time.time()
    results = await fetch_all(urls)
    print(f"耗时: {time.time() - start:.2f}s")  # 约1秒(并行)

asyncio.run(main())

concurrent.futures 并发

概念:concurrent.futures 提供了 ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)。线程池适合 I/O 密集型任务,进程池可以绕过 GIL,适合 CPU 密集型任务。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# 线程池:适合 I/O 密集型
def io_task(n):
    time.sleep(1)
    return n

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(io_task, range(10)))
    print(f"耗时: {len(results)} 秒")  # 约1秒

# 进程池:适合 CPU 密集型
def cpu_task(n):
    return sum(i * i for i in range(n))

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_task, range(100)))

4.11 WebSocket 通信

概念:WebSocket 的特点

WebSocket 是一种在单个 TCP 连接上提供全双工通信的协议。与 HTTP 不同,WebSocket 允许服务器主动向客户端推送数据,适合实时应用如聊天、游戏、实时协作等。

websocket-client 客户端

概念:websocket-client 是一个第三方 WebSocket 客户端库,基于线程实现。适合与现有同步代码集成,使用回调函数处理消息。

pip install websocket-client
import websocket
import json
import threading
import time

class WebSocketClient:
    def __init__(self, url):
        self.url = url
        self.ws = None
        self.connected = False

    def on_message(self, ws, message):
        print(f"收到消息: {message}")

    def on_error(self, ws, error):
        print(f"错误: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("连接关闭")
        self.connected = False

    def on_open(self, ws):
        print("连接打开")
        self.connected = True
        # 发送消息
        ws.send(json.dumps({"type": "hello", "data": "world"}))

    def connect(self):
        self.ws = websocket.WebSocketApp(
            self.url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        # 在新线程中运行
        thread = threading.Thread(target=self.ws.run_forever)
        thread.daemon = True
        thread.start()

    def send(self, data):
        if self.connected:
            self.ws.send(json.dumps(data))

    def close(self):
        if self.ws:
            self.ws.close()

# 使用
client = WebSocketClient("wss://ws.postman-echo.com/raw")
client.connect()
time.sleep(2)
client.send({"message": "Hello"})
time.sleep(1)
client.close()

websockets 库(asyncio 版本)

概念:websockets 是支持 asyncio 的 WebSocket 库,API 设计简洁,支持客户端和服务器端。推荐在新项目中使用。

pip install websockets
import asyncio
import websockets

async def client():
    uri = "wss://ws.postman-echo.com/raw"
    async with websockets.connect(uri) as ws:
        # 发送消息
        await ws.send("Hello")
        print(f"发送: Hello")

        # 接收消息
        response = await ws.recv()
        print(f"收到: {response}")

asyncio.run(client())

asyncio 服务器 WebSocket

概念:websockets 库支持创建 WebSocket 服务器,使用 async def 定义处理函数。每当有客户端连接时,处理函数会被调用,可通过 websocket 参数接收和发送消息。

import asyncio
import websockets

# websockets 10+ 版本:handler 只接收 websocket 参数
async def echo(websocket):
    async for message in websocket:
        print(f"收到: {message}")
        await websocket.send(f"echo: {message}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("WebSocket 服务器启动: ws://localhost:8765")
        await asyncio.Future()  # 永久运行

asyncio.run(main())

4.12 asyncio 进阶

概念:取消与超时

asyncio 任务可以取消,设置超时以避免长时间阻塞。取消会触发 asyncio.CancelledError。

任务取消

概念:Task.cancel() 可以取消正在运行的协程。取消是协作式的,任务需要在 await 点检查是否被取消。被取消的任务会抛出 CancelledError。

import asyncio

async def long_task():
    try:
        print("任务开始")
        await asyncio.sleep(10)
        print("任务完成")
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main():
    task = asyncio.create_task(long_task())

    await asyncio.sleep(2)
    print("取消任务")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("任务确实被取消了")

asyncio.run(main())

asyncio.timeout(Python 3.11+)

概念:asyncio.timeout 是 Python 3.11 引入的上下文管理器,用于为 await 操作设置超时。如果超时,会抛出 asyncio.TimeoutError,比 wait_for 更简洁。

import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        # 方式1: asyncio.timeout
        async with asyncio.timeout(3):
            result = await slow_task()
            print(result)
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())

# 方式2: asyncio.wait_for
async def main2():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("等待超时")

asyncio.run(main2())

asyncio 异常处理

概念:asyncio 中的异常处理与同步代码类似,使用 try/except。gather 的 return_exceptions=True 参数可以将异常作为结果捕获,而不是抛出。

import asyncio

async def faulty_task():
    await asyncio.sleep(1)
    raise ValueError("出错了")

async def main():
    try:
        await faulty_task()
    except ValueError as e:
        print(f"捕获异常: {e}")

asyncio.run(main())

# gather 的异常处理
async def main2():
    try:
        results = await asyncio.gather(
            faulty_task(),
            asyncio.sleep(2),
            return_exceptions=True  # 捕获异常而不是抛出
        )
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 异常: {result}")
            else:
                print(f"任务 {i} 结果: {result}")
    except Exception as e:
        print(f"整体异常: {e}")

asyncio.run(main2())

asyncio.Future

概念:Future 是一个特殊的对象,代表一个异步操作的最终结果。不同于 Task,Future 可以由开发者手动创建和设置结果,常用于将回调式 API 转换为 async/await 风格。

import asyncio

async def main():
    # Future: 代表一个异步操作的最终结果
    future = asyncio.get_running_loop().create_future()  # 推荐使用 get_running_loop()

    async def set_result():
        await asyncio.sleep(1)
        future.set_result("完成")

    asyncio.create_task(set_result())
    result = await future
    print(f"Future 结果: {result}")

asyncio.run(main())

4.13 HTTP/2 简介

概念:HTTP/2 的优势

HTTP/2 是 HTTP 协议的第二个主要版本,相比 HTTP/1.1 有以下优势:

  • 多路复用:单个连接并行处理多个请求
  • 头部压缩:减少重复的头部数据传输
  • 服务器推送:服务器主动推送资源
  • 二进制分帧:更高效的传输格式

h2 库

概念:h2 是 Python 的 HTTP/2 协议实现库,提供了连接管理、帧处理等功能。实际使用中通常不需要直接操作 h2,推荐使用 httpx 库(支持 HTTP/2)来发送请求。

pip install httpx[http2]  # 安装 httpx 并启用 HTTP/2 支持
# 推荐方式:使用 httpx 发送 HTTP/2 请求
import httpx

# 同步 HTTP/2 客户端
with httpx.Client(http2=True) as client:
    response = client.get("https://www.google.com/")
    print(response.http_version)  # HTTP/2

# 异步 HTTP/2 客户端
async def async_http2():
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://www.google.com/")
        print(response.http_version)
pip install h2  # 底层 HTTP/2 协议库(一般不需要直接使用)
import h2
import h2.connection
import h2.events

def create_h2_connection():
    conn = h2.connection.H2Connection(config=h2.config.H2Configuration(client_side=True))
    conn.initiate_connection()
    return conn

def handle_response(response_headers, response_body):
    print("响应头:", dict(response_headers))
    print("响应体:", response_body.decode())

# HTTP/2 请求
import socket

def http2_request(host, port=443):
    conn = create_h2_connection()

    # 发送请求
    stream_id = conn.get_next_available_stream_id()
    conn.send_headers(
        stream_id,
        [
            (":method", "GET"),
            (":scheme", "https"),
            (":authority", host),
            (":path", "/"),
        ]
    )
    conn.end_stream(stream_id)

    # 序列化并发送
    data = conn.data_to_send()

    # 使用 socket 发送(实际使用时需要 TLS)
    # s = socket.socket()
    # s.connect((host, port))
    # s.sendall(data)

    return conn
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐