【Python从入门到精通】第 017 篇:并发编程(二)——asyncio 异步编程实战
上一篇【第 016 篇】并发编程(一)——多线程与多进程
下一篇【第 018 篇】文件与IO操作——读写CSV、JSON、Excel
系列说明:本系列共 30 篇,全面介绍 Python 编程从零基础到软件工程师的完整路径。本文为第 017 篇,深入讲解 Python asyncio 异步编程——asyncio 是 Python 3.4 引入的标准库(async/await 语法自 Python 3.5 起可用),专门用于以协程方式编写单线程并发代码,是处理大量网络连接和 I/O 密集型任务的利器。
摘要
asyncio 是 Python 异步编程的核心模块,它基于事件循环和协程机制,实现了单线程内的并发执行。相比多线程,asyncio 避免了锁竞争和上下文切换开销,特别适合 I/O 密集型任务如网络请求、文件操作、数据库访问等(其中普通文件读写在 asyncio 中没有原生异步接口,通常需要借助线程池,见第十节)。本文涵盖 async/await 语法、事件循环、协程、任务管理、信号量与锁、流处理、异常处理,以及 asyncio 与其他异步库的集成。
一、为什么需要 asyncio
1.1 I/O 密集型问题的本质
# 同步方式:顺序执行,总耗时 = sum(各请求时间)
import requests
import time
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
]

# Each request blocks until its response arrives, so the waits add up.
start = time.time()
for url in urls:
    response = requests.get(url)
    print(f"收到响应:{response.status_code}")
print(f"同步总耗时:{time.time() - start:.2f}s")
# Roughly 4 seconds: 1 + 2 + 1.
1.2 asyncio 的解决思路
asyncio 采用协程(Coroutine)机制,在等待 I/O 时切换到其他任务,避免阻塞:
import asyncio
import aiohttp
import time
async def fetch(session, url):
    """Request ``url`` through ``session`` and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
        return body


async def main():
    """Issue three requests concurrently and report how many responses came back."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        # gather() drives all coroutines at once and collects their results.
        responses = await asyncio.gather(*(fetch(session, url) for url in urls))
        print(f"收到 {len(responses)} 个响应")


start = time.time()
asyncio.run(main())
print(f"异步总耗时:{time.time() - start:.2f}s")
# Roughly 2 seconds: bounded by the slowest request.
二、async/await 语法
2.1 定义协程函数
import asyncio


# ``async def`` defines a coroutine function.
async def hello():
    """Return a fixed greeting; exists only to demonstrate coroutine objects."""
    return "Hello, async!"


# Calling a coroutine function does not run its body; it returns a
# coroutine object.
coro = hello()
print(type(coro))  # <class 'coroutine'>

# The body only executes once an event loop drives the coroutine.
result = asyncio.run(coro)
print(result)  # Hello, async!

# A bare ``hello()`` call therefore never produces the return value by
# itself: the coroutine object must be awaited or handed to asyncio.run().
2.2 await 关键字
import asyncio
async def get_data():
    """Simulate an asynchronous I/O call that takes one second."""
    await asyncio.sleep(1)  # stands in for real I/O latency
    payload = "数据"
    return payload


async def main():
    """Await get_data() and print what it produced."""
    # ``await`` suspends main() until the coroutine finishes and then
    # evaluates to its return value.
    result = await get_data()
    print(f"获取到:{result}")


asyncio.run(main())
2.3 awaitable 对象
await 可以用于以下对象:协程(Coroutine)、任务(Task)、未来对象(Future)。
import asyncio
async def coro():
    """Trivial coroutine returning a fixed string."""
    return "result"


async def main():
    """Await each kind of awaitable: a coroutine, a Task and a Future.

    Returns:
        A tuple ``(result1, result2, result3)`` with the value obtained from
        the coroutine, the Task and the Future respectively.
    """
    # 1. A coroutine object.
    result1 = await coro()

    # 2. A Task, which schedules the coroutine on the running loop.
    task = asyncio.create_task(coro())
    result2 = await task

    # 3. A low-level Future. Inside a coroutine the loop is obtained with
    #    get_running_loop(); get_event_loop() is not meant for this.
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The future must be resolved, otherwise awaiting it would never return.
    future.set_result("done")
    result3 = await future

    print(result1, result2)
    return result1, result2, result3


if __name__ == "__main__":
    asyncio.run(main())
三、事件循环
3.1 运行事件循环
import asyncio
# Option 1: asyncio.run() -- the recommended top-level entry point.
async def main(delay=1):
    """Print "Hello", wait ``delay`` seconds, then print "World".

    Args:
        delay: Pause between the two prints, in seconds (default 1).
    """
    print("Hello")
    await asyncio.sleep(delay)
    print("World")


if __name__ == "__main__":
    asyncio.run(main())

    # Option 2: manage a loop by hand with run_until_complete().
    # asyncio.run() closes its loop and clears the thread's current loop, so
    # asyncio.get_event_loop() would raise RuntimeError at this point.
    # Create a fresh loop explicitly and close it when finished.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
# Option 3: code that already runs inside an event loop awaits other
# coroutines directly instead of calling asyncio.run() again.
async def nested():
    """Return a constant; stands in for any awaitable unit of work."""
    return 42


async def outer():
    """Await nested() from inside the running loop and print its result."""
    result = await nested()
    print(f"结果:{result}")


def blocking_compute():
    """Synchronous counterpart of nested(), suitable for a worker thread."""
    return 42


async def main_with_existing_loop():
    """Run a synchronous callable in the default thread pool and return its result.

    run_in_executor() expects a plain callable. Passing a coroutine function
    such as nested() would merely create a coroutine object in the worker
    thread and never execute its body.
    """
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_compute)
    return result
3.2 事件循环的生命周期
import asyncio
import time
async def slow_task(name, duration):
    """Log start, sleep ``duration`` seconds, log completion and return ``name``."""
    print(f"[{name}] 开始")
    await asyncio.sleep(duration)
    print(f"[{name}] 完成")
    return name


async def main():
    """Run three slow tasks concurrently and report results and elapsed time."""
    # perf_counter() is monotonic, so the measurement cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    # gather() runs the coroutines concurrently. Its result list follows the
    # order of the ARGUMENTS, not the order in which the tasks finished.
    results = await asyncio.gather(
        slow_task("A", 1),
        slow_task("B", 2),
        slow_task("C", 1),
    )
    print(f"结果:{results}")  # ['A', 'B', 'C'] (argument order)
    print(f"总耗时:{time.perf_counter() - start:.2f}s")  # ~2s (the longest task)


if __name__ == "__main__":
    asyncio.run(main())
3.3 asyncio.gather vs asyncio.create_task
import asyncio
import time
async def task(name, duration):
    """Log start, sleep for ``duration`` seconds, log completion, return ``name``."""
    print(f"[{name}] 开始")
    await asyncio.sleep(delay=duration)
    print(f"[{name}] 完成")
    return name
# gather(): wait until every awaitable has finished.
async def using_gather():
    """Run tasks A (1s) and B (2s) via gather() and return both results."""
    return await asyncio.gather(task("A", 1), task("B", 2))


# create_task() + gather(): tasks are scheduled as soon as they are created.
async def using_create_task():
    """Create both Tasks up front, then collect their results with gather()."""
    scheduled = [
        asyncio.create_task(task(label, seconds))
        for label, seconds in (("A", 1), ("B", 2))
    ]
    # Both tasks are already running concurrently at this point.
    return await asyncio.gather(*scheduled)


# create_task() + awaiting each Task on its own.
async def using_task_independently():
    """Await task A, report it, then await task B and return B's result."""
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 2))

    result1 = await t1
    print(f"Task A 完成:{result1}")

    # t2 kept running while t1 was awaited.
    return await t2
四、任务管理
4.1 asyncio.Task
Task 是 Future 的子类,用于调度协程执行:
import asyncio
async def fetch(url):
    """Pretend to download ``url``: sleep one second and return a canned string."""
    await asyncio.sleep(1)
    return f"数据 from {url}"


async def main():
    """Start two fetches as Tasks so they overlap, then print both results."""
    # create_task() wraps each coroutine in a Task and schedules it at once.
    task1, task2 = (
        asyncio.create_task(fetch(address))
        for address in ("https://example.com", "https://python.org")
    )
    # Both Tasks are already running, so awaiting them in turn does not
    # serialise the two sleeps.
    result1 = await task1
    result2 = await task2
    print(result1, result2)


asyncio.run(main())
4.2 asyncio.wait
import asyncio
async def task(name, duration):
    """Sleep for ``duration`` seconds and return a completion message for ``name``."""
    await asyncio.sleep(duration)
    return f"{name} 完成"


async def main():
    """Wait for the first of three tasks to finish, then cancel the rest."""
    specs = (("A", 2), ("B", 1), ("C", 3))
    tasks = [asyncio.create_task(task(label, seconds)) for label, seconds in specs]

    # asyncio.wait() returns two sets, (done, pending). ``return_when`` may be
    # FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成:{[t.result() for t in done]}")
    print(f"待完成:{len(pending)} 个")

    # Request cancellation of everything that is still running ...
    for unfinished in pending:
        unfinished.cancel()
    # ... and wait for it to take effect. return_exceptions=True keeps the
    # resulting CancelledError from propagating out of gather().
    await asyncio.gather(*pending, return_exceptions=True)


asyncio.run(main())
4.3 asyncio.as_completed
import asyncio
async def task(name, duration):
    """Sleep for ``duration`` seconds and return a completion message for ``name``."""
    await asyncio.sleep(duration)
    return f"{name} 完成"


async def main():
    """Print each task's result as soon as that task finishes."""
    tasks = [
        asyncio.create_task(task(label, seconds))
        for label, seconds in (("A", 2), ("B", 1), ("C", 3))
    ]
    # as_completed() hands back awaitables in completion order rather than
    # submission order.
    for completed in asyncio.as_completed(tasks):
        result = await completed
        print(f"收到结果:{result}")


asyncio.run(main())
# Printed order: B 完成 -> A 完成 -> C 完成 (shortest sleep first)
五、同步原语与协程间通信
5.1 asyncio.Lock
import asyncio
class AsyncCounter:
    """Integer counter whose increments are serialised by an asyncio.Lock."""

    def __init__(self):
        self.lock = asyncio.Lock()
        self.count = 0

    async def increment(self):
        """Add one to ``count`` while holding the lock and return the new value."""
        async with self.lock:
            updated = self.count + 1
            self.count = updated
            return updated
async def worker(counter, worker_id):
    """Increment ``counter`` 100 times, then report that this worker is done."""
    remaining = 100
    while remaining:
        await counter.increment()
        remaining -= 1
    print(f"Worker {worker_id} 完成")


async def main():
    """Run ten workers against one shared counter and print the final count."""
    counter = AsyncCounter()
    await asyncio.gather(*(worker(counter, i) for i in range(10)))
    print(f"最终计数:{counter.count}")  # expected: 1000 (10 workers x 100)


asyncio.run(main())
5.2 asyncio.Semaphore
import asyncio
async def limited_task(semaphore, task_id, duration=1):
    """Run one unit of simulated work while holding a semaphore slot.

    Args:
        semaphore: Semaphore bounding how many tasks may run at once.
        task_id: Identifier printed in the log lines and returned.
        duration: Seconds of simulated work (default 1, the original value).

    Returns:
        ``task_id``.
    """
    async with semaphore:
        print(f"任务 {task_id} 开始")
        await asyncio.sleep(duration)
        print(f"任务 {task_id} 完成")
        return task_id


async def main():
    """Run ten tasks with at most three executing at the same time."""
    semaphore = asyncio.Semaphore(3)
    tasks = [limited_task(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    # gather() returns results in submission order, so this list is always
    # [0, 1, ..., 9]; the real completion order is visible in the log lines.
    print(f"返回结果(按提交顺序):{results}")


if __name__ == "__main__":
    asyncio.run(main())
5.3 asyncio.Queue
import asyncio
async def producer(queue, count):
    """Put ``count`` items (item-0, item-1, ...) on ``queue``, pausing 0.5s after each."""
    for item in (f"item-{i}" for i in range(count)):
        await queue.put(item)
        print(f"生产:{item}")
        await asyncio.sleep(0.5)


async def consumer(queue, name):
    """Process items from ``queue`` until none arrives within two seconds."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2)
        except asyncio.TimeoutError:
            # Nothing arrived in time: treat the queue as drained and stop.
            print(f"消费者 {name} 超时,退出")
            break
        print(f"消费者 {name} 处理:{item}")
        await asyncio.sleep(0.3)
        queue.task_done()


async def main():
    """Run one producer and two consumers against a shared queue."""
    queue = asyncio.Queue()
    consumers = [consumer(queue, label) for label in ("C1", "C2")]
    await asyncio.gather(producer(queue, 5), *consumers)


asyncio.run(main())
5.4 asyncio.Event
import asyncio
async def waiter(event, name):
    """Wait until ``event`` is set, logging before and after."""
    print(f"[{name}] 等待事件...")
    await event.wait()
    print(f"[{name}] 事件已触发!")


async def setter(event):
    """Set ``event`` after a two-second delay."""
    await asyncio.sleep(2)
    print("[Setter] 设置事件")
    event.set()


async def main():
    """Run two waiters and one setter against a shared Event."""
    event = asyncio.Event()
    coroutines = (waiter(event, "W1"), waiter(event, "W2"), setter(event))
    tasks = [asyncio.create_task(coroutine) for coroutine in coroutines]
    await asyncio.gather(*tasks)


asyncio.run(main())
# Both waiters stay suspended until setter() calls event.set().
六、异步上下文管理器
6.1 async with 语法
import asyncio
class AsyncResource:
    """Stand-in for a resource with asynchronous setup and teardown (e.g. a DB connection)."""

    async def __aenter__(self):
        print("获取资源...")
        await asyncio.sleep(0.5)  # simulated connection setup
        print("资源已就绪")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源...")
        await asyncio.sleep(0.5)  # simulated connection teardown
        print("资源已释放")
        # A falsy return value lets an exception raised in the body propagate.
        return False


async def use_resource():
    """Acquire an AsyncResource, hold it for one second, then release it."""
    async with AsyncResource():
        print("使用资源中...")
        await asyncio.sleep(1)
    # __aexit__ has already run by the time control reaches this point.


asyncio.run(use_resource())
6.2 __aenter__ 和 __aexit__ 的返回值
class ManagedResource:
    """Async context manager showing what ends up bound to the ``as`` target."""

    async def __aenter__(self):
        # Whatever __aenter__ returns is what ``as`` binds.
        bound_value = "返回值将绑定到 as 后的变量"
        return bound_value

    async def __aexit__(self, *args):
        # Returning None (falsy) means exceptions are not suppressed.
        return None


async def main():
    """Enter the context manager and print the value bound by ``as``."""
    async with ManagedResource() as resource:
        print(resource)  # "返回值将绑定到 as 后的变量"
七、异步迭代器与生成器
7.1 async for
import asyncio
class AsyncRange:
    """Asynchronous iterator yielding 0 .. max_num - 1, sleeping 0.1s before each step."""

    def __init__(self, max_num):
        self.max_num = max_num
        self.current = 0

    def __aiter__(self):
        # The object is its own iterator.
        return self

    async def __anext__(self):
        await asyncio.sleep(0.1)  # simulated asynchronous work
        if self.current < self.max_num:
            value = self.current
            self.current = value + 1
            return value
        # Signals the end of iteration to ``async for``.
        raise StopAsyncIteration
async def main():
    """Iterate AsyncRange(5) with ``async for`` and print every value."""
    numbers = AsyncRange(5)
    async for i in numbers:
        print(f"值:{i}")


asyncio.run(main())
7.2 异步生成器
import asyncio
async def async_range(max_num):
    """Async generator yielding 0 .. max_num - 1, sleeping 0.1s before each value."""
    for value in range(max_num):
        await asyncio.sleep(0.1)  # simulated asynchronous work
        yield value
async def main():
    """Consume async_range(5) with ``async for`` and print every value."""
    numbers = async_range(5)
    async for i in numbers:
        print(f"值:{i}")


asyncio.run(main())
7.3 异步 map / filter(标准库中没有 aitertools 模块,需手动实现或使用第三方库)
import asyncio

# NOTE: the standard library has no ``aitertools`` module in any Python
# release, so importing it fails unless a third-party package of that name
# is installed. The helpers below give the same map/filter behaviour using
# only built-in async generators and async comprehensions.


async def process_item(item):
    """Return ``item * 2`` after a short simulated I/O wait."""
    await asyncio.sleep(0.1)
    return item * 2


async def async_map(func, iterable):
    """Yield ``await func(x)`` for each ``x`` of a regular iterable, in order.

    Args:
        func: Coroutine function applied to every element.
        iterable: Ordinary (synchronous) iterable of inputs.
    """
    for element in iterable:
        yield await func(element)


async def async_filter(predicate, iterable):
    """Yield the elements of a regular iterable for which ``predicate`` is truthy.

    Args:
        predicate: Synchronous callable returning a truth value.
        iterable: Ordinary (synchronous) iterable of inputs.
    """
    for element in iterable:
        if predicate(element):
            yield element


async def main():
    """Demonstrate asynchronous mapping and filtering over a small list."""
    items = [1, 2, 3, 4, 5]

    # Asynchronous map.
    doubled = [x async for x in async_map(process_item, items)]
    print(f"加倍结果:{doubled}")  # [2, 4, 6, 8, 10]

    # Asynchronous filter.
    filtered = [x async for x in async_filter(lambda x: x > 2, items)]
    print(f"过滤结果:{filtered}")  # [3, 4, 5]
八、实战演练
8.1 异步 HTTP 客户端
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class FetchResult:
    """Outcome of a single HTTP request made by fetch()."""

    url: str
    status: int  # HTTP status code; 0 when the request failed
    content: str  # first 100 characters of the body; "" on failure
    duration: float  # elapsed seconds measured on the event-loop clock
    error: Optional[str] = None  # str(exception) on failure, otherwise None


async def fetch(session, url) -> FetchResult:
    """Request ``url`` through ``session`` and describe the outcome.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with ``status`` and an awaitable ``text()``.
        url: Address to request.

    Returns:
        A FetchResult. Failures are reported through ``status == 0`` and
        ``error`` instead of being raised.

    Raises:
        asyncio.CancelledError: Cancellation is never converted into a result.
    """
    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; its time() is a monotonic clock.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        async with session.get(url) as response:
            content = await response.text()
            return FetchResult(
                url=url,
                status=response.status,
                content=content[:100],  # keep only the first 100 characters
                duration=loop.time() - start,
            )
    except asyncio.CancelledError:
        # On Python 3.7 CancelledError derives from Exception; re-raise so the
        # handler below can never swallow a cancellation.
        raise
    except Exception as e:
        # Deliberate catch-all: one failed URL must not abort the whole batch.
        return FetchResult(
            url=url,
            status=0,
            content="",
            duration=loop.time() - start,
            error=str(e),
        )
async def fetch_all(urls: List[str]) -> List[FetchResult]:
    """Fetch every URL concurrently over one shared aiohttp session.

    Results are returned in the same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch(session, url) for url in urls)
        results = await asyncio.gather(*pending)
        return results
async def main():
    """Fetch four httpbin URLs concurrently and print one summary line per URL."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/get",
        "https://httpbin.org/uuid",
    ]
    print("开始并发请求...")
    results = await fetch_all(urls)
    print(f"\n请求完成,共 {len(results)} 个响应:")
    for r in results:
        # A status of 0 is falsy and marks a failed request.
        if r.status:
            status = f"HTTP {r.status}"
        else:
            status = f"错误: {r.error}"
        print(f" {r.url}: {status} ({r.duration:.2f}s)")


if __name__ == "__main__":
    asyncio.run(main())
8.2 异步 WebSocket 客户端
import asyncio
import json
import websockets
from typing import Callable, Optional
class AsyncWebSocketClient:
    """Thin asynchronous WebSocket client that exchanges JSON messages."""

    def __init__(self, uri: str):
        self.uri = uri
        # Stays None until connect() succeeds.
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None

    async def connect(self):
        """Open the connection to ``self.uri``."""
        self.websocket = await websockets.connect(self.uri)
        print(f"已连接到 {self.uri}")

    async def send(self, message: dict):
        """Serialise ``message`` as JSON and send it; does nothing when not connected."""
        if not self.websocket:
            return
        await self.websocket.send(json.dumps(message))
        print(f"发送:{message}")

    async def receive(self) -> dict:
        """Receive one message and decode it from JSON.

        Returns None when no connection is open.
        """
        if not self.websocket:
            return None
        message = await self.websocket.recv()
        data = json.loads(message)
        print(f"收到:{data}")
        return data

    async def close(self):
        """Close the connection if one is open."""
        if not self.websocket:
            return
        await self.websocket.close()
        print("连接已关闭")
async def echo_handler(uri: str):
    """Exercise an echo server: send five messages, then read five replies."""
    client = AsyncWebSocketClient(uri)
    message_count = 5
    try:
        await client.connect()

        # Send phase: one message every half second.
        for i in range(message_count):
            await client.send({"message": f"Hello {i}", "id": i})
            await asyncio.sleep(0.5)

        # Receive phase: allow up to five seconds for each reply.
        for _ in range(message_count):
            response = await asyncio.wait_for(client.receive(), timeout=5)
            print(f"响应:{response}")
    except asyncio.TimeoutError:
        print("接收超时")
    except websockets.exceptions.ConnectionClosed:
        print("连接已关闭")
    finally:
        # Always release the connection, whichever path was taken.
        await client.close()


# Usage (requires an echo server to be running):
# asyncio.run(echo_handler("ws://localhost:8080"))
8.3 异步限流器
import asyncio
import time
from typing import List, Callable, Any, Awaitable
class AsyncRateLimiter:
    """Token-bucket rate limiter for coroutines.

    On average at most ``rate`` permits are granted per ``per`` seconds, with
    bursts of up to ``rate`` permits while the bucket is full.
    """

    def __init__(self, rate: float, per: float):
        """
        Args:
            rate: Permits allowed per time window; also the bucket capacity.
            per: Length of the time window in seconds.

        Raises:
            ValueError: If ``rate`` or ``per`` is not positive.
        """
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must both be positive")
        self.rate = rate
        self.per = per
        self.allowance = rate  # start with a full bucket
        # Monotonic clock: unaffected by wall-clock adjustments.
        self.last_check = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Take one permit, sleeping first when the bucket is empty.

        The lock is held while sleeping, so waiting callers are served one
        at a time.
        """
        refill_rate = self.rate / self.per  # tokens regained per second
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_check
            self.last_check = now
            # Refill for the time that passed, capped at the bucket capacity.
            self.allowance = min(self.rate, self.allowance + elapsed * refill_rate)

            if self.allowance < 1.0:
                # Sleep just long enough for the missing fraction of a token.
                await asyncio.sleep((1.0 - self.allowance) / refill_rate)
                # The sleep paid for exactly one token, which is consumed now.
                # Restart the clock so the time spent sleeping is not credited
                # again to the next caller.
                self.allowance = 0.0
                self.last_check = time.monotonic()
            else:
                self.allowance -= 1.0
async def rate_limited_task(limiter, task_id):
    """Wait for a permit from ``limiter``, then do 0.1s of simulated work."""
    await limiter.acquire()
    print(f"任务 {task_id} 执行")
    await asyncio.sleep(0.1)


async def main():
    """Push 15 tasks through a limiter built with rate=5, per=1.0 and time the run."""
    limiter = AsyncRateLimiter(rate=5, per=1.0)
    jobs = [rate_limited_task(limiter, i) for i in range(15)]

    start = time.time()
    await asyncio.gather(*jobs)
    duration = time.time() - start

    print(f"总耗时:{duration:.2f}s(理论上应约 3 秒)")


asyncio.run(main())
九、异常处理
9.1 try/except 在 async 中
import asyncio
async def risky_task():
    """Sleep one second, then fail with ValueError."""
    await asyncio.sleep(1)
    raise ValueError("出错了!")


async def main():
    """Show that try/except/finally works around ``await`` as in synchronous code."""
    try:
        await risky_task()
    except ValueError as e:
        print(f"捕获异常:{e}")
    finally:
        # Runs whether or not the awaited coroutine raised.
        print("清理代码")


asyncio.run(main())
9.2 gather 中的异常
import asyncio
async def task(id, should_fail=False):
    """Sleep 0.5s, then return a success message or raise ValueError.

    Args:
        id: Label interpolated into the message.
        should_fail: When true, raise instead of returning.
    """
    await asyncio.sleep(0.5)
    if not should_fail:
        return f"Task {id} 完成"
    raise ValueError(f"Task {id} 失败")


async def main():
    """Gather three tasks, one of which fails, and report each outcome."""
    # With return_exceptions=True gather() does not raise; exceptions are
    # placed in the result list at the position of the task that raised.
    results = await asyncio.gather(
        task(1),
        task(2, should_fail=True),
        task(3),
        return_exceptions=True,
    )
    for i, result in enumerate(results, 1):
        if not isinstance(result, Exception):
            print(f"Task {i} 结果:{result}")
        else:
            print(f"Task {i} 异常:{result}")


asyncio.run(main())
9.3 超时处理
import asyncio
async def slow_task():
    """Sleep ten seconds and return a completion marker."""
    await asyncio.sleep(10)
    return "完成"


async def with_timeout():
    """Give slow_task() three seconds; report a timeout when it overruns."""
    try:
        result = await asyncio.wait_for(slow_task(), timeout=3.0)
    except asyncio.TimeoutError:
        print("任务超时!")
    else:
        print(f"结果:{result}")


async def shield_from_cancellation():
    """Keep slow_task() running even if the awaiting coroutine is cancelled."""
    task = asyncio.create_task(slow_task())
    try:
        # shield() absorbs a cancellation aimed at this await: the caller sees
        # CancelledError, but the inner task is left running.
        result = await asyncio.shield(task)
    except asyncio.CancelledError:
        print("主任务被取消,但保护的任务继续运行...")
        # The inner task is still alive, so it can be awaited to completion.
        result = await task
    # NOTE(review): the original indentation was lost; this print is assumed
    # to run on both paths -- confirm against the author's source.
    print(f"任务完成:{result}")


asyncio.run(with_timeout())
十、asyncio 与同步代码的集成
10.1 run_in_executor 运行同步代码
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def cpu_bound_sync(n=10_000_000):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``.

    A deliberately CPU-heavy synchronous job.

    Args:
        n: Number of terms; defaults to the original fixed workload.
    """
    return sum(i * i for i in range(n))


async def main():
    """Run cpu_bound_sync() in a thread pool so the event loop is not blocked."""
    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    # NOTE: a worker thread keeps the loop responsive, but on GIL builds of
    # CPython it does not speed up pure-Python computation; a
    # ProcessPoolExecutor is the usual choice for real CPU parallelism.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_sync)
    print(f"计算结果:{result}")


if __name__ == "__main__":
    asyncio.run(main())
10.2 to_thread(Python 3.9+)
import asyncio
import time  # used by blocking_io(); the snippet did not import it


def blocking_io(delay=1):
    """Block the calling thread for ``delay`` seconds and return a status string.

    Args:
        delay: Seconds to sleep (default 1, the original fixed value).
    """
    time.sleep(delay)
    return "同步 I/O 完成"


async def main():
    """Run the blocking call in a worker thread via asyncio.to_thread()."""
    # to_thread() keeps the event loop free while blocking_io() sleeps.
    result = await asyncio.to_thread(blocking_io)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
十一、常见问题与注意事项
11.1 忘记 await
async def wrong():
    """Anti-pattern: returns an un-awaited coroutine object instead of its result."""
    return some_async_function()  # missing ``await``


async def correct():
    """Await the call so the caller receives the actual result."""
    return await some_async_function()
11.2 在同步代码中调用异步代码
import asyncio
# Calling asyncio.run() unconditionally from a synchronous function works
# only while no event loop is running in the current thread; inside a running
# loop it raises RuntimeError. The wrapper below handles both situations.
def sync_wrapper():
    """Run async_function() to completion from synchronous code.

    Returns:
        Whatever async_function() returns.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(async_function())

    # A loop is already running here. Run the coroutine on a fresh loop in a
    # worker thread and block until it finishes.
    # NOTE: this blocks the calling thread, and with it the running loop;
    # prefer ``await async_function()`` whenever the caller can be a coroutine.
    import concurrent.futures

    def _run_in_new_loop():
        # Create the coroutine in the thread that will actually run it.
        return asyncio.run(async_function())

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_in_new_loop).result()
11.3 事件循环与嵌套
import asyncio
# asyncio.run() cannot be nested: it must not be called while another event
# loop is running in the same thread.
async def inner():
    """Sleep one second and return the string "inner"."""
    value = await asyncio.sleep(1, result="inner")
    return value


async def outer():
    """Obtain inner()'s result by awaiting it, never via a nested asyncio.run()."""
    # Wrong here: asyncio.run(inner()) -- a loop is already running.
    return await inner()


asyncio.run(outer())
十二、总结
asyncio 是 Python 处理高并发 I/O 密集型任务的首选方案,它通过事件循环和协程机制,在单线程内实现了高效的并发执行。
本文的核心要点:async/await 是 asyncio 的核心语法,定义了协程和等待机制。事件循环是 asyncio 的调度中心,管理所有协程的执行。Task 是 Future 的子类,用于调度和追踪协程的执行状态。asyncio 提供了 Lock、Semaphore、Queue、Event 等同步原语,用于协程间协调。gather、wait、as_completed 提供了不同的任务组合方式。asyncio 与同步代码可以通过 run_in_executor 或 to_thread 集成。
上一篇【第 016 篇】并发编程(一)——多线程与多进程
下一篇【第 018 篇】文件与IO操作——读写CSV、JSON、Excel
参考资料
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)