A Deep Dive into Python asyncio: From the Event Loop to Coroutine Scheduling
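Introduction
Asynchronous programming is a core skill in modern Python backend development. As a developer moving from Python to Rust, I have come to appreciate both the design ideas behind asyncio and its performance advantages for IO-bound workloads. This article walks through asyncio's internals, from the event loop to coroutine scheduling, to give you a solid working understanding of asynchronous programming in Python.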
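1. asyncio Architecture Basics
1.1 Event Loop
The event loop is the core component of asyncio. It schedules coroutines, dispatches IO events, and manages tasks:
import asyncio

async def main():
    print("Hello, asyncio!")

# Recommended since Python 3.7: creates a loop, runs main(), then closes the loop
asyncio.run(main())

# Legacy equivalent that manages the loop by hand
# (calling asyncio.get_event_loop() with no running loop is deprecated)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()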
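To make the loop's scheduling role concrete, here is a minimal sketch that bypasses coroutines and hands plain callbacks to the loop. The function name `run_demo` and the labels are illustrative only. It shows that callbacks registered with `call_soon` run in FIFO order, and that timers registered with `call_later` fire in deadline order regardless of registration order.

```python
import asyncio

def run_demo():
    order = []
    loop = asyncio.new_event_loop()
    try:
        # Timers are registered out of order on purpose
        loop.call_later(0.02, order.append, "timer 20 ms")
        loop.call_later(0.01, order.append, "timer 10 ms")
        # Ready callbacks run before any timer, in registration order
        loop.call_soon(order.append, "ready 1")
        loop.call_soon(order.append, "ready 2")
        # Stop the loop once both timers have had time to fire
        loop.call_later(0.05, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return order

if __name__ == "__main__":
    print(run_demo())
```

Coroutines and tasks are built on this same mechanism: each step of a task is a callback that the loop runs when the task is ready to continue.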
1.2 Coroutine
A coroutine is a lightweight concurrency primitive whose execution can be suspended and resumed:
import asyncio

async def fetch_data(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)  # simulate an IO operation
    return f"Data from {url}"

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())
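Suspension and resumption can be observed without any event loop by driving a coroutine by hand. The sketch below is for illustration only (application code should not call `send` itself). `Suspend`, `two_steps`, and `drive` are hypothetical names. `Suspend` is a tiny awaitable that yields once, which is what pauses the coroutine.

```python
import inspect

class Suspend:
    """Hypothetical awaitable that pauses the awaiting coroutine exactly once."""
    def __await__(self):
        yield "paused"

async def two_steps(log):
    log.append("step 1")
    await Suspend()          # control returns to whoever is driving the coroutine
    log.append("step 2")
    return "done"

def drive():
    log = []
    coro = two_steps(log)    # calling an async function only creates the coroutine object
    log.append(inspect.getcoroutinestate(coro))
    coro.send(None)          # run until the first suspension point
    log.append(inspect.getcoroutinestate(coro))
    try:
        coro.send(None)      # resume; the coroutine finishes and raises StopIteration
    except StopIteration as stop:
        log.append(stop.value)
    log.append(inspect.getcoroutinestate(coro))
    return log

if __name__ == "__main__":
    print(drive())
```

The event loop plays the role of `drive` here: it resumes each coroutine when the thing it is waiting on becomes ready.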
1.3 Task
A task wraps a coroutine and schedules it on the event loop, so several coroutines can make progress concurrently:
import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 completed"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 completed"

async def main():
    # Create the tasks; both start running as soon as main() yields
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    # Wait for all tasks to finish
    results = await asyncio.gather(t1, t2)
    print(results)  # ['Task 1 completed', 'Task 2 completed']

asyncio.run(main())
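A quick way to see what wrapping coroutines in tasks buys you is to time both approaches. The following sketch uses illustrative names and short delays: awaiting two coroutines one after the other takes roughly the sum of their delays, while running them as tasks takes roughly the longer of the two.

```python
import asyncio
import time

async def work(delay):
    await asyncio.sleep(delay)
    return delay

async def sequential():
    start = time.perf_counter()
    await work(0.1)          # the second call starts only after the first finishes
    await work(0.2)
    return time.perf_counter() - start

async def concurrent():
    start = time.perf_counter()
    t1 = asyncio.create_task(work(0.1))   # both tasks are scheduled immediately
    t2 = asyncio.create_task(work(0.2))
    await t1
    await t2
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential()):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent()):.2f}s")
```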
2. Inside the Event Loop
2.1 How the Event Loop Works
The selector-based event loop follows the Reactor pattern: it waits for IO readiness events and dispatches each one to the handler waiting for it. A minimal TCP server built on top of it:
import asyncio

async def handle_client(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"Received {message!r} from {addr!r}")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888
    )
    async with server:
        await server.serve_forever()

asyncio.run(main())
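The Reactor pattern mentioned above can be sketched in a few lines with the standard `selectors` module, which asyncio's selector loop is built on. This is a simplified illustration, not asyncio's actual implementation. `reactor_demo` and `on_readable` are hypothetical names, and a `socketpair` stands in for a real network connection.

```python
import selectors
import socket

def reactor_demo():
    sel = selectors.DefaultSelector()
    left, right = socket.socketpair()
    left.setblocking(False)
    right.setblocking(False)
    received = []

    def on_readable(sock):
        # Handler invoked by the dispatch loop once the socket has data
        received.append(sock.recv(1024))
        sel.unregister(sock)

    # Register interest: "call on_readable when `right` becomes readable"
    sel.register(right, selectors.EVENT_READ, on_readable)
    left.sendall(b"ping")

    # Dispatch loop: wait for readiness events, then call the stored handler.
    # Bounded to a few iterations so the sketch can never spin forever.
    for _ in range(3):
        if not sel.get_map():
            break
        for key, _mask in sel.select(timeout=1):
            key.data(key.fileobj)

    sel.close()
    left.close()
    right.close()
    return received

if __name__ == "__main__":
    print(reactor_demo())
```

In asyncio the handler would not be a plain function but a callback that resumes the coroutine suspended in `await reader.read(...)`.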
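2.2 Event Loop Implementations Compared
| Implementation | Platform | Characteristics |
|---|---|---|
| SelectorEventLoop | Cross-platform (default on Unix) | Built on select/poll/epoll/kqueue |
| ProactorEventLoop | Windows (default since Python 3.8) | Built on IOCP |
| uvloop | Linux/macOS | High-performance third-party implementation built on libuv |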
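To check which implementation from the table is active on a given machine, you can inspect the running loop. A small sketch (the helper name `loop_class_name` is made up for this example):

```python
import asyncio

async def loop_class_name():
    # get_running_loop() returns the loop that is executing this coroutine
    return type(asyncio.get_running_loop()).__name__

if __name__ == "__main__":
    print(asyncio.run(loop_class_name()))
```

The exact class name depends on the platform and Python version, so treat the output as diagnostic information rather than something to branch on.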
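2.3 Improving Performance with uvloop
import asyncio
import uvloop  # third-party package: pip install uvloop

# Replace the default event loop with uvloop
# (recent uvloop releases also offer uvloop.run(main()) as an alternative entry point)
uvloop.install()

async def main():
    print("Running on uvloop")

asyncio.run(main())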
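3. Coroutine Scheduling
3.1 Coroutine State Machine
A coroutine wrapped in a Task moves through the following states:
- PENDING: the initial state; the task is scheduled but has not finished
- RUNNING: the task is currently executing (asyncio does not store this as a separate state; a running task still reports as pending)
- FINISHED: the task completed with a result or an exception
- CANCELLED: the task was cancelled
import asyncio

async def my_coroutine():
    try:
        while True:
            await asyncio.sleep(1)
            print("Running...")
    except asyncio.CancelledError:
        print("Cancelled!")
        raise  # re-raise so the task is actually marked as cancelled

async def main():
    task = asyncio.create_task(my_coroutine())
    await asyncio.sleep(3)
    task.cancel()  # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())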
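These states are visible through the public `Task` API. The sketch below, with illustrative names `sleeper` and `observe`, records `done()` and `cancelled()` before and after cancelling a task.

```python
import asyncio

async def sleeper():
    await asyncio.sleep(10)

async def observe():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)                 # let the task start and suspend
    before = (task.done(), task.cancelled())
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    after = (task.done(), task.cancelled())
    return before, after

if __name__ == "__main__":
    print(asyncio.run(observe()))
```

A pending task reports `done() == False`; once cancelled it reports both `done()` and `cancelled()` as `True`.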
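3.2 Task Scheduling Strategy
asyncio uses cooperative scheduling: a coroutine must give up control explicitly, at an await, before any other task can run:
import asyncio

async def compute():
    # A CPU-bound loop with no await blocks the event loop
    result = 0
    for i in range(10**7):
        result += i
    return result

async def io_task():
    print("Starting IO task")
    await asyncio.sleep(0.1)
    print("IO task completed")

async def main():
    # The IO task cannot start until compute() has finished
    compute_task = asyncio.create_task(compute())
    io = asyncio.create_task(io_task())  # a distinct name; reusing io_task would shadow the function
    await compute_task
    await io

asyncio.run(main())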
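When the computation can be split into chunks, one option is to yield to the loop between chunks with `await asyncio.sleep(0)`. The sketch below uses illustrative names (`chunked_sum`, `quick`) and shows that a second task gets to run before the computation finishes.

```python
import asyncio

async def chunked_sum(n, chunk, log):
    total = 0
    for start in range(0, n, chunk):
        total += sum(range(start, min(start + chunk, n)))
        await asyncio.sleep(0)   # hand control back to the loop after each chunk
    log.append("compute done")
    return total

async def quick(log):
    log.append("quick ran")

async def main():
    log = []
    compute_task = asyncio.create_task(chunked_sum(1_000_000, 100_000, log))
    quick_task = asyncio.create_task(quick(log))
    total = await compute_task
    await quick_task
    return total, log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

This keeps the loop responsive but does not make the computation faster; each chunk still runs on the loop's thread.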
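3.3 Keeping CPU-Bound Work from Blocking the Loop
import asyncio
from concurrent.futures import ThreadPoolExecutor

def heavy_compute():
    result = 0
    for i in range(10**7):
        result += i
    return result

async def compute():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # Run the CPU-bound function in a worker thread so the event loop stays responsive.
        # Under the GIL this does not execute Python bytecode in parallel;
        # a ProcessPoolExecutor is the usual choice when true parallelism is needed.
        result = await loop.run_in_executor(pool, heavy_compute)
    return result

async def main():
    result = await compute()
    print(f"Result: {result}")

asyncio.run(main())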
4. Concurrency Patterns
4.1 Running Multiple Tasks Concurrently
import asyncio

async def fetch(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return f"Result from {url}"

async def main():
    urls = [
        "https://api.example.com/1",
        "https://api.example.com/2",
        "https://api.example.com/3"
    ]
    # gather runs the coroutines concurrently and returns results in input order
    results = await asyncio.gather(*[fetch(url) for url in urls])
    print(results)

asyncio.run(main())
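By default `gather` propagates the first exception raised by any of its awaitables. When some failures are expected, `return_exceptions=True` collects exceptions alongside results instead. A sketch in which the `fetch` stub and the "bad" URL marker are made up for illustration:

```python
import asyncio

async def fetch(url):
    await asyncio.sleep(0.01)
    if "bad" in url:                      # hypothetical failure condition
        raise ValueError(f"cannot fetch {url}")
    return f"Result from {url}"

async def fetch_all(urls):
    results = await asyncio.gather(
        *(fetch(url) for url in urls),
        return_exceptions=True,           # exceptions are returned, not raised
    )
    ok = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return ok, failed

if __name__ == "__main__":
    print(asyncio.run(fetch_all(["a", "bad", "c"])))
```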
4.2 Running a Task with a Timeout
import asyncio

async def long_running_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        # wait_for cancels the inner task when the timeout expires
        result = await asyncio.wait_for(long_running_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())
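For a group of tasks that share one deadline, `asyncio.wait` with a `timeout` is an alternative worth knowing. Unlike `wait_for`, it does not raise and does not cancel anything; it returns the sets of finished and unfinished tasks and leaves the cleanup to the caller. A sketch with illustrative names and delays:

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def collect_within(timeout):
    tasks = [
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 5)),
    ]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    # asyncio.wait leaves unfinished tasks running, so cancel them explicitly
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(task.result() for task in done), len(pending)

if __name__ == "__main__":
    print(asyncio.run(collect_within(0.2)))
```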
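4.3 Controlling Task Priority
asyncio has no built-in notion of task priority; ready tasks run in FIFO order. The example below only shows that a short task started later can finish before a long task started earlier:
import asyncio

async def high_priority():
    print("High priority task started")
    await asyncio.sleep(0.1)
    print("High priority task completed")

async def low_priority():
    print("Low priority task started")
    await asyncio.sleep(1)
    print("Low priority task completed")

async def main():
    # Start the low-priority task first
    low = asyncio.create_task(low_priority())
    await asyncio.sleep(0.01)  # give the low-priority task a chance to start
    # Then start the high-priority task
    high = asyncio.create_task(high_priority())
    await high
    await low

asyncio.run(main())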
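When work really does need to be processed in priority order, one common approach is to put jobs on an `asyncio.PriorityQueue` and let a worker pull them off. This is a sketch, not a feature of the scheduler itself. The job names and priority numbers are invented, and lower numbers are served first.

```python
import asyncio

async def worker(queue, handled):
    while True:
        _priority, name = await queue.get()   # always returns the smallest item
        handled.append(name)
        queue.task_done()

async def run_jobs():
    queue = asyncio.PriorityQueue()
    for item in [(5, "cleanup"), (1, "payment"), (3, "email")]:
        queue.put_nowait(item)
    handled = []
    worker_task = asyncio.create_task(worker(queue, handled))
    await queue.join()                        # wait until every job is processed
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return handled

if __name__ == "__main__":
    print(asyncio.run(run_jobs()))
```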
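5. Asynchronous IO Operations
5.1 File Operations
asyncio has no native asynchronous file API, so blocking file IO is handed to a worker thread:
import asyncio

def _read_text(path):
    # open() and read() both block, so both run in the worker thread
    with open(path, 'r') as f:
        return f.read()

async def read_file(path):
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool executor
    return await loop.run_in_executor(None, _read_text, path)

async def main():
    content = await read_file('example.txt')
    print(content)

asyncio.run(main())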
5.2 Network Operations
import asyncio
import aiohttp  # third-party package: pip install aiohttp

async def fetch_with_aiohttp(url):
    # The session owns the connection pool; reuse one session for many requests
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    content = await fetch_with_aiohttp('https://www.python.org')
    print(content[:500])

asyncio.run(main())
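For experiments that should not depend on a third-party package or on external network access, the standard library's stream API is enough. The sketch below starts a throwaway server on a port chosen by the OS and talks to it from the same process. `echo_upper` and `round_trip` are hypothetical names, and the one-line protocol is invented for the example.

```python
import asyncio

async def echo_upper(reader, writer):
    # Server side: read one line, reply with it upper-cased
    data = await reader.readline()
    writer.write(data.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def round_trip(message):
    # Port 0 asks the OS for any free port
    server = await asyncio.start_server(echo_upper, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message.encode() + b"\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply.decode().strip()

if __name__ == "__main__":
    print(asyncio.run(round_trip("hello")))
```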
5.3 Database Operations
import asyncio
import asyncpg  # third-party package: pip install asyncpg

async def query_database():
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    try:
        return await conn.fetch('SELECT * FROM users LIMIT 10')
    finally:
        # Close the connection even if the query fails
        await conn.close()

async def main():
    users = await query_database()
    print(users)

asyncio.run(main())
6. In Practice: Building an Asynchronous Web Service
6.1 Using FastAPI
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def heavy_computation():
    # Stands in for a slow asynchronous operation
    await asyncio.sleep(1)
    return {"result": 42}

@app.get("/")
async def read_root():
    return {"message": "Hello World"}

@app.get("/compute")
async def compute():
    return await heavy_computation()
6.2 WebSocket Support
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        # The client closed the connection; leave the loop quietly
        pass
6.3 Implementing Middleware
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    # perf_counter is a monotonic clock suited to measuring durations
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    print(f"Request took {duration:.2f} seconds")
    return response
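7. Performance Tips
7.1 Avoid Blocking Calls
import asyncio
import time

# Wrong: blocks the event loop
async def bad_example():
    time.sleep(1)  # blocks every other task for a full second

# Right: use the asynchronous equivalent
async def good_example():
    await asyncio.sleep(1)  # suspends only this coroutine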
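The effect of a blocking call can be measured with a "heartbeat" task that ticks while another job runs. In this sketch, `ticks_while`, `blocking_job`, and `threaded_job` are illustrative names, and `asyncio.to_thread` requires Python 3.9 or later. The heartbeat never gets to run during `time.sleep`, but keeps ticking when the same sleep is moved to a worker thread.

```python
import asyncio
import time

async def ticks_while(job):
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    await job()                 # run the job while the heartbeat is scheduled
    hb.cancel()
    await asyncio.gather(hb, return_exceptions=True)
    return ticks

async def blocking_job():
    time.sleep(0.2)             # holds the loop's thread; nothing else can run

async def threaded_job():
    await asyncio.to_thread(time.sleep, 0.2)   # same sleep, off the loop's thread

if __name__ == "__main__":
    print("blocking:", asyncio.run(ticks_while(blocking_job)))
    print("threaded:", asyncio.run(ticks_while(threaded_job)))
```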
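7.2 Batching Operations
import asyncio

async def process_item(item):
    # Hypothetical placeholder for the real per-item work
    await asyncio.sleep(0)
    return item

async def process_items(items):
    # Process in batches so at most batch_size coroutines are in flight at once
    batch_size = 100
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)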
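Fixed batches make every item wait for the slowest item in its batch. A common alternative is to cap concurrency with an `asyncio.Semaphore`, so a new item starts as soon as any slot frees up. This is a sketch: `bounded_gather` and the `handler` stub are hypothetical, and the `peak` counter exists only to show that the limit holds.

```python
import asyncio

async def bounded_gather(items, limit, handler):
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        async with semaphore:            # at most `limit` handlers run at once
            return await handler(item)

    return await asyncio.gather(*(guarded(item) for item in items))

async def demo():
    running = 0
    peak = 0

    async def handler(item):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)        # record the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1
        return item * 2

    results = await bounded_gather(range(10), 3, handler)
    return results, peak

if __name__ == "__main__":
    print(asyncio.run(demo()))
```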
7.3 Connection Pool Management
import asyncpg  # third-party package: pip install asyncpg

class DatabasePool:
    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def init(self):
        # Must be awaited once before query() is used
        self.pool = await asyncpg.create_pool(
            dsn=self.dsn,
            min_size=self.min_size,
            max_size=self.max_size
        )

    async def query(self, sql: str, *args):
        # Borrow a connection for one query; it returns to the pool on exit
        async with self.pool.acquire() as conn:
            return await conn.fetch(sql, *args)

    async def close(self):
        await self.pool.close()
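To see what an `acquire()` context manager does conceptually, here is a toy pool built on `asyncio.Queue`. It is not how asyncpg implements its pool. `TinyPool` and the string "connections" are stand-ins for illustration: callers wait when every resource is checked out, and a resource goes back into the queue when the `async with` block exits.

```python
import asyncio
from contextlib import asynccontextmanager

class TinyPool:
    def __init__(self, resources):
        self._free = asyncio.Queue()
        for resource in resources:
            self._free.put_nowait(resource)

    @asynccontextmanager
    async def acquire(self):
        resource = await self._free.get()     # waits while the pool is empty
        try:
            yield resource
        finally:
            self._free.put_nowait(resource)   # always hand the resource back

async def demo():
    pool = TinyPool(["conn-a", "conn-b"])
    used = []

    async def job():
        async with pool.acquire() as conn:
            used.append(conn)
            await asyncio.sleep(0.01)

    # Five jobs share two "connections"
    await asyncio.gather(*(job() for _ in range(5)))
    return used

if __name__ == "__main__":
    print(asyncio.run(demo()))
```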
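8. Migrating Async Code from Python to Rust
8.1 asyncio vs. Tokio
| Feature | Python asyncio | Rust Tokio |
|---|---|---|
| Runtime | Single-threaded event loop | Multi-threaded, work-stealing scheduler |
| Concurrency model | Coroutines | Async tasks (stackless coroutines) + thread pool |
| Performance | Good for IO-bound workloads | Close to native |
| Memory safety | Checked at runtime | Guaranteed at compile time |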
8.2 Code Comparison
Python version:
import asyncio

async def fetch(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # gather returns a list of results in argument order
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3")
    )
    print(results)

asyncio.run(main())
Rust version:
use tokio::time::{sleep, Duration};

async fn fetch(url: &str) -> String {
    println!("Fetching {}", url);
    sleep(Duration::from_secs(1)).await;
    format!("Data from {}", url)
}

#[tokio::main]
async fn main() {
    // join! awaits all three futures concurrently and returns a tuple
    let results = tokio::join!(
        fetch("url1"),
        fetch("url2"),
        fetch("url3")
    );
    println!("{:?}", results);
}
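9. Common Problems and Solutions
9.1 Blocked Event Loop
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Problem: a CPU-bound coroutine blocks the event loop
async def blocking_task():
    result = 0
    for i in range(10**8):
        result += i
    return result

# Fix: move the work to a thread pool (heavy_compute is the function from section 3.3)
async def non_blocking_task():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_compute)
    return result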
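9.2 Leaked Tasks
# Problem: the task is created but never awaited (long_running is a placeholder coroutine)
async def main():
    task = asyncio.create_task(long_running())
    # main() returns immediately; asyncio.run() then cancels the unfinished task,
    # so its work is lost and any resources it held may not be released cleanly

# Fix: make sure every task is awaited
async def main():
    task = asyncio.create_task(long_running())
    await task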
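For fire-and-forget work that cannot be awaited right where it is created, the asyncio documentation recommends keeping a strong reference to each task, because the event loop itself only holds weak references. A sketch of that pattern follows. `spawn`, `job`, and the module-level `background_tasks` set are illustrative names.

```python
import asyncio

background_tasks = set()

def spawn(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)  # drop the reference once it finishes
    return task

async def job(n):
    await asyncio.sleep(0.01)
    return n * n

async def main():
    for n in range(3):
        spawn(job(n))
    tracked = len(background_tasks)
    # Before shutting down, wait for whatever is still in flight
    results = await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)                            # give done callbacks a chance to run
    return tracked, sorted(results), len(background_tasks)

if __name__ == "__main__":
    print(asyncio.run(main()))
```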
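9.3 Recursion Depth Limit
# Problem: deeply recursive coroutines hit Python's recursion limit (RecursionError)
async def recursive(n):
    if n == 0:
        return 0
    return n + await recursive(n - 1)

# Fix: replace the recursion with a loop
async def iterative(n):
    result = 0
    for i in range(n + 1):
        result += i
    return result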
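10. Summary
asyncio is the core library for asynchronous programming in Python. It achieves efficient concurrency through the event loop, coroutines, and tasks. Key takeaways:
- Event loop: schedules coroutines and dispatches IO events
- Coroutines: lightweight concurrency primitives
- Tasks: wrappers that run coroutines concurrently
- Concurrency patterns: gather, wait_for, create_task, and so on (join! is the Tokio counterpart of gather)
- Performance: avoid blocking calls, offload to thread pools, batch operations
Once you are comfortable with asyncio, you can build high-performance asynchronous applications, and you will have a solid foundation for learning asynchronous programming in Rust.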
References:
- asyncio official documentation: https://docs.python.org/3/library/asyncio.html
- FastAPI documentation: https://fastapi.tiangolo.com/
- uvloop documentation: https://uvloop.readthedocs.io/