🔎大家好,我是ZTLJQ,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流

📝个人主页-ZTLJQ的主页

🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​📣系列果你对这个系列感兴趣的话

专栏 - ​​​​​​Python从零到企业级应用:短时间成为市场抢手的程序员

✔说明⇢本人讲解主要包括Python爬虫、JS逆向、Python的企业级应用

如果你对这个系列感兴趣的话,可以关注订阅哟👋

从“等待”到“并行”

在传统的同步编程世界里,代码像一条单行道,顺序执行。一旦遇到一个耗时的操作,比如一个网络请求或文件读写,整个程序就会“卡住”,直到这个操作完成才继续。这对于需要处理大量I/O(输入/输出)操作的应用(如Web服务器、网络爬虫)来说,效率是极其低下的。

协程(Coroutine)与异步编程(Asynchronous Programming)为我们开辟了一条全新的道路。它不是让程序傻傻地等待,而是提供了一种“协作式”的方式,让程序在等待一个慢操作的时候,可以主动让出控制权去处理其他任务。这种方式在单个线程内实现了高并发,极大地提升了I/O密集型应用的吞吐量。

本篇博客将带你深入Python异步编程的核心——asyncio库,通过详细的理论讲解和丰富的实战案例,让你彻底掌握这门强大的技术。


第一部分:理解协程与异步编程
1.1 核心概念
  • 协程 (Coroutine): 可以理解为一个“可暂停”的函数。它不像普通函数那样从头跑到尾,而是可以在某个点(await)暂停执行,并在稍后恢复。它是异步编程的基础单元。
  • async def: 用于定义一个协程函数。调用这个函数并不会立即执行函数体,而是返回一个协程对象(Coroutine Object)。
  • await: 关键字,用于等待另一个协程的完成。它只能在async def函数内部使用。await会暂停当前协程的执行,将控制权交还给事件循环,去执行其他准备就绪的协程,直到被等待的协程返回结果。
  • 事件循环 (Event Loop): 异步程序的“心脏”和“调度器”。它不断地运行,查找并执行那些已经准备好运行的协程。asyncio库的核心就是事件循环。
1.2 协程 vs. 线程 vs. 进程
特性 普通函数 (同步) 线程 (Threading) 协程 (Asyncio)
执行方式 顺序执行,遇到I/O就阻塞 操作系统调度,多线程并发 事件循环调度,协程协作式并发
资源开销 高 (线程创建、切换开销大) 极低 (单线程内切换)
GIL影响 受GIL限制,无法真正并行CPU任务 无 (单线程,天然避开了GIL)
适用场景 CPU密集型、简单任务 I/O密集型,但开销较大 I/O密集型,高并发场景 (首选)

第二部分:asyncio 核心与实战

asyncio是Python标准库中用于编写并发代码的模块,专为异步I/O、事件循环、协程和任务而设计。

2.1 实战案例:基础异步函数与并发执行

让我们从一个最简单的例子开始,感受异步的魅力。

basic_async_concurrency.py

import asyncio
import time

# 定义一个模拟耗时I/O操作的协程
async def fetch_data(data_id, delay):
    print(f"任务 {data_id} 开始执行...")
    # await asyncio.sleep(delay) 是一个非阻塞的延迟模拟
    # 它会暂停当前协程,并允许事件循环去执行其他任务
    await asyncio.sleep(delay)
    print(f"任务 {data_id} 执行完成,耗时 {delay} 秒")
    return f"结果_{data_id}"

# 主协程函数
async def main():
    print("=== 并发执行多个任务 ===")
    start_time = time.time()

    # 创建三个协程对象
    task1 = fetch_data("A", 2)
    task2 = fetch_data("B", 1)
    task3 = fetch_data("C", 3)

    # asyncio.gather() 用于并发运行多个协程,并等待它们全部完成
    # 它会返回一个包含所有结果的列表
    results = await asyncio.gather(task1, task2, task3)

    end_time = time.time()
    
    print("\n所有任务的返回结果:")
    for result in results:
        print(f"  - {result}")
    
    print(f"\n总耗时: {end_time - start_time:.2f} 秒")

# 运行主协程
# asyncio.run() 会自动创建事件循环并运行main()
if __name__ == "__main__":
    asyncio.run(main())

输出结果:

=== 并发执行多个任务 ===
任务 A 开始执行...
任务 B 开始执行...
任务 C 开始执行...
任务 B 执行完成,耗时 1 秒
任务 A 执行完成,耗时 2 秒
任务 C 执行完成,耗时 3 秒

所有任务的返回结果:
  - 结果_A
  - 结果_B
  - 结果_C

总耗时: 3.01 秒

案例解析:

  • 并发而非并行: 注意,程序总耗时约为3秒,而不是2+1+3=6秒。这是因为三个任务是并发执行的。当task1(延时2秒)在await asyncio.sleep(2)时,事件循环会立即切换到执行task2task3
  • asyncio.gather(): 这是并发执行多个协程的最常用方法之一。它接收多个协程作为参数,并发地启动它们,然后等待所有协程都完成。
  • asyncio.run(): 这是Python 3.7+推荐的启动顶级异步程序的方式,它封装了事件循环的创建、启动和关闭过程。
2.2 实战案例:异步网络请求

网络请求是I/O密集型任务的典型代表。aiohttp是Python中最流行的异步HTTP客户端库。让我们看看如何用它实现高效的并发请求。

async_http_requests.py

import asyncio
import aiohttp
import time

# pip install aiohttp

API_ENDPOINTS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/json",
    "https://httpbin.org/uuid"
]

async def fetch_url(session, url):
    """异步获取单个URL的数据"""
    try:
        print(f"正在请求 {url}...")
        # async with 用于确保资源被正确释放
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json() # 异步读取响应体
                print(f"✅ 成功获取 {url}")
                return {"url": url, "status": "success", "data_type": type(data).__name__}
            else:
                print(f"❌ 请求失败 {url}, 状态码: {response.status}")
                return {"url": url, "status": "error", "code": response.status}
    except Exception as e:
        print(f"❌ 请求异常 {url}: {str(e)}")
        return {"url": url, "status": "exception", "error": str(e)}

async def main():
    start_time = time.time()
    
    # 创建一个aiohttp客户端会话
    # 会话可以复用底层TCP连接,提高效率
    async with aiohttp.ClientSession() as session:
        # 创建一个任务列表,每个任务都是一个fetch_url协程
        tasks = [fetch_url(session, url) for url in API_ENDPOINTS]
        
        # 并发执行所有任务
        results = await asyncio.gather(*tasks)

    end_time = time.time()

    print("\n--- 抓取结果汇总 ---")
    for res in results:
        print(f"  - {res['url']:<35} | Status: {res['status']}")

    print(f"\n并发请求总耗时: {end_time - start_time:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

案例解析:

  • aiohttp.ClientSession(): 一个异步的HTTP会话对象。使用async with语句可以确保会话在使用完毕后被正确关闭和清理。
  • session.get(url): 这个操作本身返回一个协程,需要用await等待。async with确保响应对象被正确处理。
  • response.json(): 读取JSON响应体也是一个I/O操作,所以同样需要用await等待。
  • 性能对比: 如果用同步的requests库逐个请求这5个URL,总耗时将是1+2+1+0.1+0.1≈4.2秒以上。而异步版本利用了并发,总耗时约为2秒(由最慢的delay/2决定),效率提升了超过一倍。

第三部分:进阶应用 - 生产者消费者模式

在处理大量数据流时,生产者-消费者模式是一种非常经典的设计。我们可以使用asyncio.Queue来实现一个高效的异步管道。

3.1 实战案例:异步数据处理管道

设想一个场景:我们有一批URL需要抓取,抓取后需要对数据进行处理。抓取和处理是两个独立的步骤,它们的速度可能不匹配。我们希望抓取器能持续工作,而处理器能从一个缓冲区(队列)中拿数据处理。

async_producer_consumer.py

import asyncio
import aiohttp
import time
import random

async def producer(queue, urls):
    """生产者:抓取URL数据,并放入队列"""
    print("[Producer] 开始抓取数据...")
    async with aiohttp.ClientSession() as session:
        for url in urls:
            print(f"  [Producer] 正在抓取 {url}...")
            async with session.get(url) as response:
                raw_data = await response.text()
                # 将抓取到的数据(或其他标识)放入队列
                await queue.put({"url": url, "data": raw_data[:100]}) # 只存前100个字符
                print(f"  [Producer] {url} 的数据已放入队列")
                # 模拟抓取间隔,避免过于频繁
                await asyncio.sleep(random.uniform(0.1, 0.3))
    
    # 生产完毕,放入一个哨兵信号通知消费者停止
    await queue.put(None)
    print("[Producer] 抓取任务完成")

async def consumer(queue, worker_id):
    """消费者:从队列取出数据,进行处理"""
    print(f"[Consumer-{worker_id}] 开始处理数据...")
    while True:
        # 从队列中获取数据
        item = await queue.get()
        
        # 如果收到哨兵信号,则退出
        if item is None:
            print(f"[Consumer-{worker_id}] 收到结束信号,准备退出")
            # 重新放回哨兵,以便其他消费者也能收到
            await queue.put(None)
            break
        
        url = item['url']
        data_preview = item['data']
        
        print(f"  [Consumer-{worker_id}] 正在处理 {url} 的数据...")
        # 模拟数据处理耗时
        await asyncio.sleep(random.uniform(0.5, 1.0))
        print(f"  [Consumer-{worker_id}] 处理完成 {url}, 数据预览: '{data_preview}...'")

async def main():
    urls = [
        "https://httpbin.org/bytes/100",
        "https://httpbin.org/bytes/200",
        "https://httpbin.org/html",
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
    ]

    # 创建一个队列,用于生产者和消费者之间传递数据
    # maxsize参数可以限制队列大小,防止内存溢出
    queue = asyncio.Queue(maxsize=2)

    # 创建生产者任务
    producer_task = asyncio.create_task(producer(queue, urls))
    
    # 创建多个消费者任务,模拟并发处理
    consumer_tasks = []
    for i in range(2): # 创建2个消费者
        consumer_task = asyncio.create_task(consumer(queue, i+1))
        consumer_tasks.append(consumer_task)

    # 等待生产者和所有消费者任务完成
    await producer_task
    await asyncio.gather(*consumer_tasks)

    print("\n所有生产者和消费者任务均已结束!")

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f"\n管道总耗时: {end - start:.2f} 秒")

案例解析:

  • asyncio.Queue: 一个线程安全、协程安全的异步队列。生产者向队列put数据,消费者从队列get数据。maxsize参数可以实现背压(backpressure),防止生产者过快导致内存撑爆。
  • asyncio.create_task(): 将一个协程包装成一个任务(Task),使其可以被事件循环并发调度。
  • 模式优势: 生产者可以持续抓取数据而不必等待处理完成,消费者则可以从容处理队列中的数据。两者解耦,各自按自己的节奏运行,最大化了整体吞吐量。

第四部分:异步与同步的交互

有时候,我们不可避免地需要在异步代码中调用一些旧的、同步的阻塞函数(如某些计算库或文件操作)。直接调用它们会阻塞整个事件循环。asyncio提供了run_in_executor来解决这个问题,它会将同步函数放到一个线程池或进程池中执行,避免阻塞事件循环。

async_with_sync.py

import asyncio
import time

def blocking_cpu_task(n):
    """一个耗时的同步CPU任务"""
    print(f"CPU任务开始 (n={n})...")
    total = 0
    for i in range(n):
        total += i * i
    print(f"CPU任务完成,结果: {total}")
    return total

async def main():
    print("主协程开始...")
    
    # 模拟一些异步工作
    await asyncio.sleep(1)
    print("异步工作1完成")

    # 将同步任务放到默认的线程池中执行
    # 这样就不会阻塞事件循环
    sync_result = await asyncio.get_event_loop().run_in_executor(
        None, # None表示使用默认的ThreadPoolExecutor
        blocking_cpu_task, # 要执行的函数
        5000000 # 函数的参数
    )
    print(f"从同步任务得到结果: {sync_result}")

    await asyncio.sleep(1)
    print("异步工作2完成")

    print("主协程结束")

if __name__ == "__main__":
    asyncio.run(main())

第五部分:最佳实践与注意事项

  1. 区分场景: 异步编程主要用于I/O密集型任务。对于纯粹的CPU密集型任务,由于Python GIL的存在,多进程(multiprocessing)通常是更好的选择。
  2. 避免阻塞: 绝对不要在async def函数中调用任何会阻塞线程的同步函数(如time.sleep()requests.get()),这会使整个事件循环停滞。必须使用对应的异步版本(如await asyncio.sleep()aiohttp)。
  3. 错误处理: 异步代码的错误处理同样重要。务必使用try...except来捕获await操作可能引发的异常。
  4. 资源管理: 始终使用async with语句来管理需要异步初始化和清理的资源,如aiohttp.ClientSession、数据库连接等,确保资源被正确释放。
结语

Python的异步编程模型,通过asyncio库和async/await关键字,为我们提供了一种强大而优雅的方式来处理高并发的I/O密集型任务。它能在单线程内实现惊人的吞吐量,是构建高性能网络应用、API客户端和数据采集工具的利器。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐