什么是asyncio

产生背景

Python在处理I/O密集型任务时遇到的问题。传统的多线程编程模型在处理大量I/O操作时,由于线程创建和管理的开销,效率较低。为了解决这个问题,Python从3.4版本开始引入了asyncio库,它能够在单个线程内实现并发,避免了多线程带来的开销,从而提升了性能并降低了复杂度。

如何使用

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    # 网络IO请求:下载一张图片
    yield from asyncio.sleep(2)  # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    # 网络IO请求:下载一张图片
    yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(4)


tasks = [
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

核心内容

事件循环

async用于执行未完成任务,并在任务完成后,从任务列表中移除。
这步骤为

import asyncio

# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到`任务列表`
loop.run_until_complete(任务)

uvloop

uvloop是 asyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高。事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言。

使用
  1. 安装uvloop

    pip3 install uvloop
    
  2. 在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做就行。

    import asyncio
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 编写asyncio的代码,与之前写的代码一致。
    
    # 内部的事件循环自动化会变为uvloop
    asyncio.run(...)
    

注意:知名的asgi uvicorn内部就是使用的uvloop的事件循环。

协程函数的编写

协程函数编写后,需要实例化并加入事件循环中的任务列表,函数内部的逻辑才会执行

import asyncio 

async def func():
    print("快来搞我吧!")

result = func()
loop = asyncio.get_event_loop()
loop.run_until_complete(result)

python3.7之后,可以通过使用asyncio.run替代上面注册任务到事件循环,简化了代码的编写

import asyncio 

async def func():
    print("快来搞我吧!")

result = func()
asyncio.run(result)

await

使用:await+可等待对象
await在接收到可等待对象的返回值之后,才会继续执行后续的逻辑。

import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print("执行协程函数内部代码")

    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
    response1 = await others()
    print("IO请求结束,结果为:", response1)
    
    response2 = await others()
    print("IO请求结束,结果为:", response2)
    
asyncio.run( func() )

可等待对象

Future对象

asyncio中的Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。( Task 是 Futrue的子类 )
Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)。

简单使用
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # # 创建一个任务(Future对象),这个任务什么都不干。
    fut = loop.create_future()

    # 等待任务最终结果(Future对象),没有结果则会一直等下去。
    await fut

asyncio.run(main())

因为await需要得到可等待对象返回值才会走后续逻辑,而Future对象不绑定任何行为,是不会自动返回值的,所以想要让事件循环获取Future的结果,则需要手动设置。

import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
    fut = loop.create_future()

    # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
    # 即手动设置future任务的最终结果,那么fut就可以结束了。
    await loop.create_task(set_after(fut))

    # 等待 Future对象获取 最终结果,否则一直等下去
    data = await fut
    print(data)

asyncio.run(main())

而Task对象继承了Future对象,其实就对Future进行扩展,他可以实现在对应绑定的函数执行完成之后,自动执行set_result,从而实现自动结束。

python中另一个Future对象

在Python的concurrent.futures模块中也有一个Future对象,这个对象是基于线程池和进程池实现异步操作时使用的对象。
两个Future对象是不同的,他们是为不同的应用场景而设计,例如:concurrent.futures.Future不支持await语法等。

为何python提供这个功能

其实,一般在程序开发中我们要么统一使用 asyncio 的协程实现异步操作,要么都使用进程池和线程池实现异步操作。但如果 协程的异步进程池/线程池的异步 混搭时,那么就会用到此功能了。

简单使用:

import time
import asyncio
import concurrent.futures

def func1():
    # 某个耗时操作
    time.sleep(2)
    return "SB"

async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
    # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
    # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom thread pool', result)

    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom process pool', result)

asyncio.run(main())

应用场景:
比如一个系统80%都是用asyncio实现异步编程,但是可能剩下的20%涉及到其他第三方接口,它不支持asyncio,那么要支持它进行异步操作,就可以使用python提供的线程池、进程池来实现。

import asyncio
import requests


async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
    print("开始下载:", url)

    loop = asyncio.get_event_loop()
    # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
    # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行requests.get函数,并返回一个concurrent.futures.Future对象
    # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
    # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
    future = loop.run_in_executor(None, requests.get, url)

    response = await future
    print('下载完成')
    # 图片保存到本地文件
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]

    tasks = [download_image(url) for url in url_list]

    loop = asyncio.get_event_loop()
    loop.run_until_complete( asyncio.wait(tasks) )
如何知道第三方接口是否支持asyncio编程
  • await调用第三方接口,看是否有报错,报错就代表不支持
  • 查看源码中是否有提及asyncio的适配
如果第三方接口或者使用了旧版同步接口(不支持asyncio),怎么使用asyncio保证其不会阻塞线程
Task对象(常用)

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

作用:

更高效地添加多个协程对象到事件循环中。
官方说明:

Tasks are used to schedule coroutines concurrently.
When a coroutine is wrapped into a Task with functions like asyncio.create_task()
the coroutine is automatically scheduled to run soon。
翻译:Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。

多个task对象执行完后,怎么知道哪个结果是哪个函数的呢?

可以通过在create_task中设置参数name,然后在返回的列表中查询Task对象获取结果

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main开始")

    task_list = [
        asyncio.create_task(func(), name='n1'),
        asyncio.create_task(func(), name='n2')
    ]

    print("main结束")

    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)

asyncio.run(main())

异步迭代器

什么是异步迭代器

实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

什么是异步可迭代对象?

可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。

import asyncio


class Reader(object):
    """ 自定义异步迭代器(同时也是异步可迭代对象) """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val


async def func():
    # 创建异步可迭代对象
    async_iter = Reader()
    # async for 必须要放在async def函数内,否则语法错误。
    async for item in async_iter:
        print(item)

asyncio.run(func())

异步迭代器其实没什么太大的作用,只是支持了async for语法而已。

异步上下文管理器(常用)

此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

import asyncio


class AsyncContextManager:
	def __init__(self):
        self.conn = conn
        
    async def do_something(self):
        # 异步操作数据库
        return 666

    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库链接
		await asyncio.sleep(1)


async def func():
	# async with 不能单独使用,必须在async函数内调用
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)


asyncio.run(func())

这个异步的上下文管理器还是比较有用的,平时在开发过程中 打开、处理、关闭 操作时,就可以用这种方式来处理。

to_thread(python3.9+)

  • 作用:将同步阻塞 I/O 函数(如 time.sleep、requests.get、pymysql旧库)丢到线程池中运行,解放事件循环线程。
  • 它是run_in_executor的语法糖
import asyncio
import time

# 一个同步的阻塞函数
def blocking_io_task(name, duration):
    print(f"{name}: 开始执行,将阻塞 {duration} 秒")
    time.sleep(duration)  # 模拟I/O阻塞
    print(f"{name}: 执行完成")
    return f"{name}的结果"

async def main():
    # 使用to_thread在单独的线程中运行阻塞函数
    task1 = asyncio.to_thread(blocking_io_task, "任务1", 2)
    task2 = asyncio.to_thread(blocking_io_task, "任务2", 1)
    
    # 并发执行
    results = await asyncio.gather(task1, task2)
    print(f"结果: {results}")

# 运行
asyncio.run(main())

与run_in_executor的区别

在这里插入图片描述

import asyncio
import concurrent.futures
from functools import partial

def compute_intensive_task(x, y):
    # 模拟计算密集型任务
    result = 0
    for i in range(x, y):
        result += i
    return result

async def thread_pool_comparison():
    # 使用to_thread - 自动使用默认线程池
    tasks_to_thread = []
    for i in range(3):
        task = asyncio.to_thread(compute_intensive_task, i*1000, (i+1)*1000)
        tasks_to_thread.append(task)
    
    # 使用run_in_executor - 可以自定义线程池
    loop = asyncio.get_running_loop()
    
    # 创建自定义线程池
    custom_pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=2,
        thread_name_prefix="CustomThread"
    )
    
    tasks_run_in_executor = []
    for i in range(3, 6):
        task = loop.run_in_executor(
            custom_pool,
            partial(compute_intensive_task, i*1000, (i+1)*1000)
        )
        tasks_run_in_executor.append(task)
    
    # 并发执行
    results1 = await asyncio.gather(*tasks_to_thread)
    results2 = await asyncio.gather(*tasks_run_in_executor)
    
    print(f"to_thread结果: {sum(results1)}")
    print(f"run_in_executor结果: {sum(results2)}")
    
    # 清理自定义线程池
    custom_pool.shutdown(wait=True)

场景

对于CPU密集型阻塞(如复杂计算、图像处理、非线程安全的C++库):

  • 方案:使用 loop.run_in_executor()配合 ProcessPoolExecutor,将调用丢到进程池。

  • 原理:每个进程有独立的 Python 解释器和内存空间,彻底绕过 GIL,实现真正的并行计算。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# 1. 模拟一个同步的、耗时的I/O操作(如旧版requests.get)
def blocking_io(url):
    time.sleep(2)  # 模拟网络I/O等待
    return f"Data from {url}"

# 2. 模拟一个CPU密集型计算
def cpu_bound_task(n):
    return sum(i * i for i in range(n))  # 纯计算,无I/O

async def main():
    loop = asyncio.get_event_loop()
    
    # 场景A:调用同步I/O函数,用线程池
    print("处理I/O阻塞...")
    # 方法1 (Python 3.9+): 使用默认线程池
    result_io = await asyncio.to_thread(blocking_io, "http://example.com")
    print(result_io)
    
    # 方法2: 使用自定义线程池(可控制大小)
    with ThreadPoolExecutor(max_workers=5) as thread_pool:
        result_io2 = await loop.run_in_executor(thread_pool, blocking_io, "http://test.com")
        print(result_io2)
    
    # 场景B:调用CPU密集型函数,用进程池
    print("处理CPU计算...")
    with ProcessPoolExecutor() as process_pool:
        result_cpu = await loop.run_in_executor(process_pool, cpu_bound_task, 10**7)
        print(f"CPU计算结果长度: {len(str(result_cpu))}")

# 运行
asyncio.run(main())
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐