并发的艺术:Python异步编程与网络交互完全解析
🔎大家好,我是ZTLJQ,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流
📝个人主页-ZTLJQ的主页
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝📣系列果你对这个系列感兴趣的话
专栏 - Python从零到企业级应用:短时间成为市场抢手的程序员
✔说明⇢本人讲解主要包括Python爬虫、JS逆向、Python的企业级应用
如果你对这个系列感兴趣的话,可以关注订阅哟👋
告别“等待”的烦恼
在传统的同步编程模式下,程序是一条道走到黑,遇到一个耗时操作(如网络请求、文件读写、数据库查询),整个线程就会停下来等待它完成,才能继续执行下一步。这种“阻塞”模式在面对大量I/O密集型任务时,效率极低,资源浪费严重。
异步编程(Asynchronous Programming)为我们提供了一种全新的思路。它允许程序在等待一个慢速操作的同时,去处理其他任务,从而极大地提高I/O密集型应用的吞吐量和响应速度。在Python中,asyncio库和async/await关键字是实现异步编程的核心。
本篇博客将带你从理论到实践,深入理解Python的异步机制,并通过实际案例,学会如何进行高效的异步网络交互。
第一部分:异步编程核心概念
1.1 同步 vs. 异步 vs. 多线程
- 同步 (Synchronous): 顺序执行,一步接一步。
A -> B -> C,B必须等A完成。 - 多线程 (Multi-threading): 启动多个线程,操作系统调度它们。优点是利用多核,缺点是线程间切换开销大,且有GIL限制(Python中)。
Thread A (A1->B1), Thread B (A2->B2) - 异步 (Asynchronous): 在单个线程内,通过事件循环(Event Loop)管理多个任务。当一个任务需要等待时,控制权交还给事件循环,去执行其他就绪的任务。
Event Loop: Task A (wait), Task B (run), Task C (run) -> Task A (resume)
1.2 核心组件
async def: 定义一个协程(Coroutine)函数。调用此函数并不会立即执行,而是返回一个协程对象。await: 用于等待另一个协程的完成。它只能在async def函数内部使用。await会暂停当前协程的执行,直到等待的协程结束,并将结果返回。- 事件循环 (Event Loop): 异步程序的运行核心。它不断地循环,查找并执行那些已经准备好运行的协程。
第二部分:asyncio 库详解
asyncio是Python标准库中用于编写并发代码的模块,专门为异步I/O、事件循环、协程和任务而设计。
2.1 实战案例:基础异步函数与事件循环
让我们从最简单的例子开始,理解async和await的运作机制。
basic_async.py
import asyncio
import time
# 1. 定义一个模拟耗时I/O操作的异步函数
async def fetch_data(url, delay):
print(f"开始请求 {url}")
# await asyncio.sleep(delay) 是一个非阻塞的延迟模拟
# 它会让出控制权,允许事件循环执行其他任务
await asyncio.sleep(delay)
print(f"收到 {url} 的响应")
return f"数据来自 {url}"
# 2. 主协程函数,用来安排和等待其他协程
async def main():
start_time = time.time()
# 3. 创建三个协程对象,但并未执行
task1 = fetch_data("http://example.com", 2)
task2 = fetch_data("http://google.com", 1)
task3 = fetch_data("http://github.com", 3)
# 4. 并发执行所有任务,并等待它们全部完成
# asyncio.gather() 是一个非常有用的函数,用于并发运行多个协程
results = await asyncio.gather(task1, task2, task3)
end_time = time.time()
print("\n所有任务已完成:")
for result in results:
print(result)
print(f"\n总耗时: {end_time - start_time:.2f} 秒")
# 5. 运行主协程
# asyncio.run() 会自动创建一个事件循环,并运行main()协程
if __name__ == "__main__":
asyncio.run(main())
输出结果:
开始请求 http://example.com
开始请求 http://google.com
开始请求 http://github.com
收到 http://google.com 的响应
收到 http://example.com 的响应
收到 http://github.com 的响应
所有任务已完成:
数据来自 http://example.com
数据来自 http://google.com
数据来自 http://github.com
总耗时: 3.01 秒
案例解析:
- 并发而非并行: 注意,程序总耗时约为3秒,而不是2+1+3=6秒。这是因为三个
fetch_data任务是并发执行的。当一个任务(如等待2秒的那个)在await asyncio.sleep()时,事件循环会立刻切换到执行其他就绪的任务(如等待1秒的那个)。asyncio.gather(): 这个函数非常强大,它接收多个协程作为参数,并发地启动它们,然后等待所有协程都完成后,将结果作为一个列表返回。asyncio.run(): 这是Python 3.7+推荐的运行顶级协程的方式,它会处理事件循环的创建、启动和关闭。
第三部分:异步网络交互 - aiohttp
在实际应用中,异步编程最常见的场景就是网络请求。aiohttp是一个基于asyncio的异步HTTP客户端/服务端库,是进行异步网络交互的首选工具。
3.1 环境准备
pip install aiohttp
3.2 实战案例:高并发网络爬虫
假设我们需要从多个API端点获取数据,同步请求会非常慢。使用aiohttp和asyncio可以显著提升效率。
async_crawler.py
import asyncio
import aiohttp
import time
# 一个模拟API端点的URL列表
API_ENDPOINTS = [
"https://httpbin.org/delay/1", # 延迟1秒
"https://httpbin.org/delay/2", # 延迟2秒
"https://httpbin.org/delay/1", # 延迟1秒
"https://httpbin.org/delay/3", # 延迟3秒
"https://httpbin.org/json", # 快速响应
]
# 异步获取单个URL的数据
async def fetch_url(session, url):
try:
print(f"正在请求 {url}...")
async with session.get(url) as response:
# 检查状态码
if response.status == 200:
data = await response.json() # 异步读取响应体
print(f"✅ 成功获取 {url}")
return {"url": url, "status": "success", "data": data}
else:
print(f"❌ 请求失败 {url}, 状态码: {response.status}")
return {"url": url, "status": "error", "code": response.status}
except Exception as e:
print(f"❌ 请求异常 {url}: {str(e)}")
return {"url": url, "status": "exception", "error": str(e)}
# 主函数:并发请求所有URL
async def main():
start_time = time.time()
# 创建一个aiohttp客户端Session
# Session可以复用底层TCP连接,提高效率
async with aiohttp.ClientSession() as session:
# 创建一个任务列表
tasks = [fetch_url(session, url) for url in API_ENDPOINTS]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
end_time = time.time()
print("\n--- 抓取结果汇总 ---")
for res in results:
print(f"URL: {res['url']:<30} | Status: {res['status']}")
print(f"\n总共耗时: {end_time - start_time:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
输出结果 (示意):
1正在请求 https://httpbin.org/delay/1...
2正在请求 https://httpbin.org/delay/2...
3正在请求 https://httpbin.org/delay/1...
4正在请求 https://httpbin.org/delay/3...
5正在请求 https://httpbin.org/json...
6✅ 成功获取 https://httpbin.org/json
7✅ 成功获取 https://httpbin.org/delay/1
8✅ 成功获取 https://httpbin.org/delay/1
9✅ 成功获取 https://httpbin.org/delay/2
10✅ 成功获取 https://httpbin.org/delay/3
11
12--- 抓取结果汇总 ---
13URL: https://httpbin.org/delay/1 | Status: success
14URL: https://httpbin.org/delay/2 | Status: success
15URL: https://httpbin.org/delay/1 | Status: success
16URL: https://httpbin.org/delay/3 | Status: success
17URL: https://httpbin.org/json | Status: success
18
19总共耗时: 3.04 秒
案例解析:
aiohttp.ClientSession(): 一个会话对象,用于管理连接池和Cookie等。使用async with语句可以确保会话在使用完毕后被正确关闭。session.get(url): 返回一个协程,需要用await等待。async with确保响应对象被正确处理。response.json(): 这也是一个异步操作,因为它需要读取响应体,所以也需要await。- 性能对比: 如果用传统的
requests库同步抓取这5个URL,总耗时将是1+2+1+3+0.1≈7秒以上。而异步版本仅用了约3秒(由最长的请求决定),效率提升了超过一倍。
第四部分:进阶应用 - 生产者消费者模式
在处理大量数据时,一个常见的模式是生产者-消费者模式。我们可以用asyncio.Queue来实现一个高效的异步管道。
4.1 实战案例:异步数据处理管道
设想一个场景:我们有一个URL列表,需要先抓取原始数据(生产者),然后对数据进行处理(消费者),并且生产者和消费者的处理速度可能不一致。
async_pipeline.py
import asyncio
import aiohttp
import time
import random
async def producer(queue, urls):
"""生产者:从URL列表获取数据,并放入队列"""
async with aiohttp.ClientSession() as session:
for url in urls:
print(f"[生产者] 正在抓取 {url}...")
async with session.get(url) as response:
data = await response.text()
# 将抓取到的数据放入队列
await queue.put({"url": url, "raw_data": data})
print(f"[生产者] 已将 {url} 的数据放入队列")
# 模拟抓取间隔
await asyncio.sleep(random.uniform(0.1, 0.5))
# 生产完毕,放入一个哨兵信号
await queue.put(None)
async def consumer(queue, worker_id):
"""消费者:从队列取出数据,进行处理"""
while True:
# 从队列中获取数据
item = await queue.get()
# 如果收到哨兵信号,则退出
if item is None:
# 重新放回哨兵,以便其他消费者也能收到
await queue.put(None)
break
url = item['url']
raw_data = item['raw_data']
print(f"[消费者-{worker_id}] 开始处理 {url} 的数据...")
# 模拟数据处理耗时
await asyncio.sleep(random.uniform(0.5, 1.5))
processed_size = len(raw_data)
print(f"[消费者-{worker_id}] 处理完成 {url}, 数据大小: {processed_size}")
async def main():
urls = [
"https://httpbin.org/bytes/100",
"https://httpbin.org/bytes/200",
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml",
"https://httpbin.org/robots.txt",
]
# 创建一个队列,用于生产者和消费者之间传递数据
# maxsize参数可以限制队列大小,防止内存溢出
queue = asyncio.Queue(maxsize=3)
# 创建生产者和消费者任务
producer_task = asyncio.create_task(producer(queue, urls))
# 创建多个消费者任务,模拟并发处理
consumer_tasks = []
for i in range(2): # 创建2个消费者
consumer_task = asyncio.create_task(consumer(queue, i+1))
consumer_tasks.append(consumer_task)
# 等待生产者和所有消费者完成
await producer_task
await asyncio.gather(*consumer_tasks)
print("\n所有任务完成!")
if __name__ == "__main__":
start = time.time()
asyncio.run(main())
end = time.time()
print(f"总耗时: {end - start:.2f} 秒")
案例解析:
asyncio.Queue: 一个线程安全、协程安全的队列。生产者put数据,消费者get数据。maxsize参数可以背压(backpressure),防止生产者过快导致内存撑爆。asyncio.create_task(): 将协程包装成一个任务(Task),使其可以被并发调度。- 模式优势: 生产者可以持续抓取数据而不必等待处理完成,消费者则可以从容处理队列中的数据。两者解耦,各自按自己的节奏运行,最大化了效率。
第五部分:最佳实践与注意事项
- 何时使用异步: 异步编程主要适用于I/O密集型任务(网络请求、文件读写、数据库查询)。对于CPU密集型任务(如大量数学计算),由于Python GIL的存在,多进程通常比异步更有效。
- 错误处理: 异步代码的错误处理同样重要。务必使用
try...except来捕获await操作可能抛出的异常。 - 避免阻塞调用: 在异步代码中,切勿调用任何会阻塞线程的同步函数(如
time.sleep()或requests.get()),这会使整个事件循环停止。 - 资源管理: 始终使用
async with来管理需要异步初始化和清理的资源,如aiohttp.ClientSession、数据库连接等。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)