并发新纪元:Python协程与异步编程完全解析

🔎大家好,我是ZTLJQ,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流
📝个人主页-ZTLJQ的主页
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝📣系列果你对这个系列感兴趣的话
专栏 - Python从零到企业级应用:短时间成为市场抢手的程序员
✔说明⇢本人讲解主要包括Python爬虫、JS逆向、Python的企业级应用
如果你对这个系列感兴趣的话,可以关注订阅哟👋
从“等待”到“并行”
在传统的同步编程世界里,代码像一条单行道,顺序执行。一旦遇到一个耗时的操作,比如一个网络请求或文件读写,整个程序就会“卡住”,直到这个操作完成才继续。这对于需要处理大量I/O(输入/输出)操作的应用(如Web服务器、网络爬虫)来说,效率是极其低下的。
协程(Coroutine)与异步编程(Asynchronous Programming)为我们开辟了一条全新的道路。它不是让程序傻傻地等待,而是提供了一种“协作式”的方式,让程序在等待一个慢操作的时候,可以主动让出控制权去处理其他任务。这种方式在单个线程内实现了高并发,极大地提升了I/O密集型应用的吞吐量。
本篇博客将带你深入Python异步编程的核心——asyncio库,通过详细的理论讲解和丰富的实战案例,让你彻底掌握这门强大的技术。
第一部分:理解协程与异步编程
1.1 核心概念
- 协程 (Coroutine): 可以理解为一个“可暂停”的函数。它不像普通函数那样从头跑到尾,而是可以在某个点(
await)暂停执行,并在稍后恢复。它是异步编程的基础单元。 async def: 用于定义一个协程函数。调用这个函数并不会立即执行函数体,而是返回一个协程对象(Coroutine Object)。await: 关键字,用于等待另一个协程的完成。它只能在async def函数内部使用。await会暂停当前协程的执行,将控制权交还给事件循环,去执行其他准备就绪的协程,直到被等待的协程返回结果。- 事件循环 (Event Loop): 异步程序的“心脏”和“调度器”。它不断地运行,查找并执行那些已经准备好运行的协程。
asyncio库的核心就是事件循环。
1.2 协程 vs. 线程 vs. 进程
| 特性 | 普通函数 (同步) | 线程 (Threading) | 协程 (Asyncio) |
|---|---|---|---|
| 执行方式 | 顺序执行,遇到I/O就阻塞 | 操作系统调度,多线程并发 | 事件循环调度,协程协作式并发 |
| 资源开销 | 低 | 高 (线程创建、切换开销大) | 极低 (单线程内切换) |
| GIL影响 | 无 | 受GIL限制,无法真正并行CPU任务 | 无 (单线程,天然避开了GIL) |
| 适用场景 | CPU密集型、简单任务 | I/O密集型,但开销较大 | I/O密集型,高并发场景 (首选) |
第二部分:asyncio 核心与实战
asyncio是Python标准库中用于编写并发代码的模块,专为异步I/O、事件循环、协程和任务而设计。
2.1 实战案例:基础异步函数与并发执行
让我们从一个最简单的例子开始,感受异步的魅力。
basic_async_concurrency.py
import asyncio
import time
# 定义一个模拟耗时I/O操作的协程
async def fetch_data(data_id, delay):
print(f"任务 {data_id} 开始执行...")
# await asyncio.sleep(delay) 是一个非阻塞的延迟模拟
# 它会暂停当前协程,并允许事件循环去执行其他任务
await asyncio.sleep(delay)
print(f"任务 {data_id} 执行完成,耗时 {delay} 秒")
return f"结果_{data_id}"
# 主协程函数
async def main():
print("=== 并发执行多个任务 ===")
start_time = time.time()
# 创建三个协程对象
task1 = fetch_data("A", 2)
task2 = fetch_data("B", 1)
task3 = fetch_data("C", 3)
# asyncio.gather() 用于并发运行多个协程,并等待它们全部完成
# 它会返回一个包含所有结果的列表
results = await asyncio.gather(task1, task2, task3)
end_time = time.time()
print("\n所有任务的返回结果:")
for result in results:
print(f" - {result}")
print(f"\n总耗时: {end_time - start_time:.2f} 秒")
# 运行主协程
# asyncio.run() 会自动创建事件循环并运行main()
if __name__ == "__main__":
asyncio.run(main())
输出结果:
=== 并发执行多个任务 ===
任务 A 开始执行...
任务 B 开始执行...
任务 C 开始执行...
任务 B 执行完成,耗时 1 秒
任务 A 执行完成,耗时 2 秒
任务 C 执行完成,耗时 3 秒
所有任务的返回结果:
- 结果_A
- 结果_B
- 结果_C
总耗时: 3.01 秒
案例解析:
- 并发而非并行: 注意,程序总耗时约为3秒,而不是2+1+3=6秒。这是因为三个任务是并发执行的。当
task1(延时2秒)在await asyncio.sleep(2)时,事件循环会立即切换到执行task2和task3。asyncio.gather(): 这是并发执行多个协程的最常用方法之一。它接收多个协程作为参数,并发地启动它们,然后等待所有协程都完成。asyncio.run(): 这是Python 3.7+推荐的启动顶级异步程序的方式,它封装了事件循环的创建、启动和关闭过程。
2.2 实战案例:异步网络请求
网络请求是I/O密集型任务的典型代表。aiohttp是Python中最流行的异步HTTP客户端库。让我们看看如何用它实现高效的并发请求。
async_http_requests.py
import asyncio
import aiohttp
import time
# pip install aiohttp
API_ENDPOINTS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/json",
"https://httpbin.org/uuid"
]
async def fetch_url(session, url):
"""异步获取单个URL的数据"""
try:
print(f"正在请求 {url}...")
# async with 用于确保资源被正确释放
async with session.get(url) as response:
if response.status == 200:
data = await response.json() # 异步读取响应体
print(f"✅ 成功获取 {url}")
return {"url": url, "status": "success", "data_type": type(data).__name__}
else:
print(f"❌ 请求失败 {url}, 状态码: {response.status}")
return {"url": url, "status": "error", "code": response.status}
except Exception as e:
print(f"❌ 请求异常 {url}: {str(e)}")
return {"url": url, "status": "exception", "error": str(e)}
async def main():
start_time = time.time()
# 创建一个aiohttp客户端会话
# 会话可以复用底层TCP连接,提高效率
async with aiohttp.ClientSession() as session:
# 创建一个任务列表,每个任务都是一个fetch_url协程
tasks = [fetch_url(session, url) for url in API_ENDPOINTS]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
end_time = time.time()
print("\n--- 抓取结果汇总 ---")
for res in results:
print(f" - {res['url']:<35} | Status: {res['status']}")
print(f"\n并发请求总耗时: {end_time - start_time:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
案例解析:
aiohttp.ClientSession(): 一个异步的HTTP会话对象。使用async with语句可以确保会话在使用完毕后被正确关闭和清理。session.get(url): 这个操作本身返回一个协程,需要用await等待。async with确保响应对象被正确处理。response.json(): 读取JSON响应体也是一个I/O操作,所以同样需要用await等待。- 性能对比: 如果用同步的
requests库逐个请求这5个URL,总耗时将是1+2+1+0.1+0.1≈4.2秒以上。而异步版本利用了并发,总耗时约为2秒(由最慢的delay/2决定),效率提升了超过一倍。
第三部分:进阶应用 - 生产者消费者模式
在处理大量数据流时,生产者-消费者模式是一种非常经典的设计。我们可以使用asyncio.Queue来实现一个高效的异步管道。
3.1 实战案例:异步数据处理管道
设想一个场景:我们有一批URL需要抓取,抓取后需要对数据进行处理。抓取和处理是两个独立的步骤,它们的速度可能不匹配。我们希望抓取器能持续工作,而处理器能从一个缓冲区(队列)中拿数据处理。
async_producer_consumer.py
import asyncio
import aiohttp
import time
import random
async def producer(queue, urls):
"""生产者:抓取URL数据,并放入队列"""
print("[Producer] 开始抓取数据...")
async with aiohttp.ClientSession() as session:
for url in urls:
print(f" [Producer] 正在抓取 {url}...")
async with session.get(url) as response:
raw_data = await response.text()
# 将抓取到的数据(或其他标识)放入队列
await queue.put({"url": url, "data": raw_data[:100]}) # 只存前100个字符
print(f" [Producer] {url} 的数据已放入队列")
# 模拟抓取间隔,避免过于频繁
await asyncio.sleep(random.uniform(0.1, 0.3))
# 生产完毕,放入一个哨兵信号通知消费者停止
await queue.put(None)
print("[Producer] 抓取任务完成")
async def consumer(queue, worker_id):
"""消费者:从队列取出数据,进行处理"""
print(f"[Consumer-{worker_id}] 开始处理数据...")
while True:
# 从队列中获取数据
item = await queue.get()
# 如果收到哨兵信号,则退出
if item is None:
print(f"[Consumer-{worker_id}] 收到结束信号,准备退出")
# 重新放回哨兵,以便其他消费者也能收到
await queue.put(None)
break
url = item['url']
data_preview = item['data']
print(f" [Consumer-{worker_id}] 正在处理 {url} 的数据...")
# 模拟数据处理耗时
await asyncio.sleep(random.uniform(0.5, 1.0))
print(f" [Consumer-{worker_id}] 处理完成 {url}, 数据预览: '{data_preview}...'")
async def main():
urls = [
"https://httpbin.org/bytes/100",
"https://httpbin.org/bytes/200",
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml",
"https://httpbin.org/robots.txt",
"https://httpbin.org/user-agent",
"https://httpbin.org/headers",
]
# 创建一个队列,用于生产者和消费者之间传递数据
# maxsize参数可以限制队列大小,防止内存溢出
queue = asyncio.Queue(maxsize=2)
# 创建生产者任务
producer_task = asyncio.create_task(producer(queue, urls))
# 创建多个消费者任务,模拟并发处理
consumer_tasks = []
for i in range(2): # 创建2个消费者
consumer_task = asyncio.create_task(consumer(queue, i+1))
consumer_tasks.append(consumer_task)
# 等待生产者和所有消费者任务完成
await producer_task
await asyncio.gather(*consumer_tasks)
print("\n所有生产者和消费者任务均已结束!")
if __name__ == "__main__":
start = time.time()
asyncio.run(main())
end = time.time()
print(f"\n管道总耗时: {end - start:.2f} 秒")
案例解析:
asyncio.Queue: 一个线程安全、协程安全的异步队列。生产者向队列put数据,消费者从队列get数据。maxsize参数可以实现背压(backpressure),防止生产者过快导致内存撑爆。asyncio.create_task(): 将一个协程包装成一个任务(Task),使其可以被事件循环并发调度。- 模式优势: 生产者可以持续抓取数据而不必等待处理完成,消费者则可以从容处理队列中的数据。两者解耦,各自按自己的节奏运行,最大化了整体吞吐量。
第四部分:异步与同步的交互
有时候,我们不可避免地需要在异步代码中调用一些旧的、同步的阻塞函数(如某些计算库或文件操作)。直接调用它们会阻塞整个事件循环。asyncio提供了run_in_executor来解决这个问题,它会将同步函数放到一个线程池或进程池中执行,避免阻塞事件循环。
async_with_sync.py
import asyncio
import time
def blocking_cpu_task(n):
"""一个耗时的同步CPU任务"""
print(f"CPU任务开始 (n={n})...")
total = 0
for i in range(n):
total += i * i
print(f"CPU任务完成,结果: {total}")
return total
async def main():
print("主协程开始...")
# 模拟一些异步工作
await asyncio.sleep(1)
print("异步工作1完成")
# 将同步任务放到默认的线程池中执行
# 这样就不会阻塞事件循环
sync_result = await asyncio.get_event_loop().run_in_executor(
None, # None表示使用默认的ThreadPoolExecutor
blocking_cpu_task, # 要执行的函数
5000000 # 函数的参数
)
print(f"从同步任务得到结果: {sync_result}")
await asyncio.sleep(1)
print("异步工作2完成")
print("主协程结束")
if __name__ == "__main__":
asyncio.run(main())
第五部分:最佳实践与注意事项
- 区分场景: 异步编程主要用于I/O密集型任务。对于纯粹的CPU密集型任务,由于Python GIL的存在,多进程(
multiprocessing)通常是更好的选择。 - 避免阻塞: 绝对不要在
async def函数中调用任何会阻塞线程的同步函数(如time.sleep()或requests.get()),这会使整个事件循环停滞。必须使用对应的异步版本(如await asyncio.sleep()或aiohttp)。 - 错误处理: 异步代码的错误处理同样重要。务必使用
try...except来捕获await操作可能引发的异常。 - 资源管理: 始终使用
async with语句来管理需要异步初始化和清理的资源,如aiohttp.ClientSession、数据库连接等,确保资源被正确释放。
结语
Python的异步编程模型,通过asyncio库和async/await关键字,为我们提供了一种强大而优雅的方式来处理高并发的I/O密集型任务。它能在单线程内实现惊人的吞吐量,是构建高性能网络应用、API客户端和数据采集工具的利器。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)