Introduction

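Asynchronous programming is one of the core skills of modern Python backend development. As a developer who moved from Python to Rust, I have come to appreciate asyncio's design philosophy and performance characteristics. This article digs into asyncio's internals, from the event loop to coroutine scheduling, to help you master asynchronous programming in Python.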

1. asyncio Architecture Basics

1.1 The Event Loop

The event loop is the core component of asyncio: it schedules coroutines, handles I/O events and manages tasks:

import asyncio

async def main():
    print("Hello, asyncio!")
    
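# Low-level style: create an event loop explicitly and drive it.
# (Calling asyncio.get_event_loop() when no loop is running is deprecated in recent Python versions.)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Python 3.7+: the recommended high-level entry point
asyncio.run(main())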
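To make "scheduling" concrete, here is a minimal sketch of two primitives the loop exposes, assuming Python 3.9+: loop.call_soon() queues a callback for the next loop iteration in FIFO order, and loop.call_later() schedules one after a delay. The demo() function and its labels are illustrative only.

```python
import asyncio


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    order: list[str] = []
    done = loop.create_future()

    # Registered first, but runs last: it waits 50 ms
    loop.call_later(0.05, order.append, "later")
    # call_soon callbacks run on the next iteration, in FIFO order
    loop.call_soon(order.append, "soon-1")
    loop.call_soon(order.append, "soon-2")
    # Resolve the future after the delayed callback has fired
    loop.call_later(0.1, done.set_result, None)

    await done
    return order


print(asyncio.run(demo()))  # ['soon-1', 'soon-2', 'later']
```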

1.2 Coroutines

A coroutine is a lightweight concurrency primitive: its execution can be suspended at an await and resumed later:

import asyncio

async def fetch_data(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)  # simulate an I/O operation
    return f"Data from {url}"

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())
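One consequence of this design is that calling an async function does not run it: it only creates a coroutine object, which does nothing until the event loop drives it. A small sketch (the log list and the zero-length sleep are illustrative additions):

```python
import asyncio
import inspect

log: list[str] = []


async def fetch_data(url: str) -> str:
    log.append(f"start {url}")
    await asyncio.sleep(0)  # yield to the loop once instead of sleeping 1 s
    return f"Data from {url}"


coro = fetch_data("https://api.example.com")
print(type(coro).__name__, log)         # coroutine []  (the body has not run)
print(inspect.getcoroutinestate(coro))  # CORO_CREATED

result = asyncio.run(coro)              # the event loop drives the coroutine
print(result)                           # Data from https://api.example.com
print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
```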

1.3 Tasks

A task wraps a coroutine and schedules it on the event loop, so several tasks can run concurrently:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 completed"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 completed"

async def main():
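    # Create the tasks; they start running once main() yields to the loop
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    
    # Wait for all tasks; the total time is about 2 s, not 3 s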
    results = await asyncio.gather(t1, t2)
    print(results)  # ['Task 1 completed', 'Task 2 completed']

asyncio.run(main())
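The benefit of tasks shows up in wall-clock time. In this sketch (delays shortened to 0.1 s and 0.2 s so it runs quickly; work() is a hypothetical helper), awaiting the coroutines one after another takes roughly the sum of the delays, while two tasks awaited together take roughly the longest one:

```python
import asyncio
import time


async def work(delay: float, name: str) -> str:
    await asyncio.sleep(delay)
    return f"{name} completed"


async def sequential() -> float:
    start = time.perf_counter()
    await work(0.1, "Task 1")  # the second call starts only after this one ends
    await work(0.2, "Task 2")
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    t1 = asyncio.create_task(work(0.1, "Task 1"))
    t2 = asyncio.create_task(work(0.2, "Task 2"))
    await asyncio.gather(t1, t2)  # both sleeps overlap
    return time.perf_counter() - start


seq = asyncio.run(sequential())
con = asyncio.run(concurrent())
print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")  # roughly 0.30s vs 0.20s
```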

2. The Event Loop in Depth

2.1 How the Event Loop Works

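The event loop handles I/O events using the Reactor pattern: on Unix it asks a selector (epoll, kqueue and so on) which sockets are ready, then resumes the coroutines waiting on them. The TCP server below is driven entirely by this mechanism: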

import asyncio

async def handle_client(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    
    print(f"Received {message!r} from {addr!r}")
    
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888
    )
    
    async with server:
        await server.serve_forever()

asyncio.run(main())
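Because serve_forever() never returns, the server above is awkward to try out. Here is a self-contained sketch, assuming a variant handler that echoes the data back upper-cased (a change from the original, which only prints it): it binds to port 0 so the OS picks a free port, then talks to itself with asyncio.open_connection(). round_trip() is an illustrative name.

```python
import asyncio


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    data = await reader.read(100)
    writer.write(data.upper())  # echo the message back, upper-cased
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def round_trip(message: bytes) -> bytes:
    # Port 0: the OS picks a free port, so the sketch cannot collide with 8888
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:  # closes the server on exit
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message)
        await writer.drain()
        reply = await reader.read(100)
        writer.close()
        await writer.wait_closed()
    return reply


print(asyncio.run(round_trip(b"hello")))  # b'HELLO'
```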

2.2 Event Loop Implementations Compared

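Implementation     Platform         Characteristics
SelectorEventLoop  Cross-platform   Uses select/poll/epoll/kqueue via the selectors module; default on Unix
ProactorEventLoop  Windows          Uses IOCP; default on Windows since Python 3.8
uvloop             Linux/macOS      High-performance third-party implementation built on libuv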
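To check which implementation a program actually runs on, you can inspect the running loop; the selectors module also reports which OS mechanism a selector-based loop uses by default. Exact class names vary by platform and Python version, so treat the comments as examples (describe_loop() is an illustrative name):

```python
import asyncio
import selectors


async def describe_loop() -> str:
    return type(asyncio.get_running_loop()).__name__


name = asyncio.run(describe_loop())
print(name)  # e.g. _UnixSelectorEventLoop on Linux/macOS, ProactorEventLoop on Windows

# The OS mechanism a selector-based loop picks by default
print(selectors.DefaultSelector.__name__)  # e.g. EpollSelector on Linux, KqueueSelector on macOS
```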

2.3 Boosting Performance with uvloop

import asyncio
import uvloop

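# Replace the default event loop with uvloop (Unix only; newer uvloop
# releases also offer uvloop.run(main()) as the preferred entry point)
uvloop.install()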

async def main():
    print("Running on uvloop")

asyncio.run(main())
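Since uvloop is an optional third-party dependency and does not support Windows, a common pattern is to fall back to the default loop when it is missing. A sketch, assuming Python 3.11+ for asyncio.Runner and its loop_factory parameter; run() is a hypothetical helper name:

```python
import asyncio


async def main() -> str:
    # Report which module the running loop's class comes from
    return type(asyncio.get_running_loop()).__module__


def run(coro):
    """Run coro on uvloop when it is installed, else on the default loop."""
    try:
        import uvloop  # optional third-party dependency
    except ImportError:
        return asyncio.run(coro)
    # uvloop.new_event_loop creates a uvloop-based loop for the Runner
    with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
        return runner.run(coro)


print(run(main()))  # e.g. "uvloop" if installed, otherwise an asyncio.* module name
```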

3. Coroutine Scheduling

3.1 Task and Coroutine States

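A task (asyncio.Task, a subclass of asyncio.Future) goes through three states:

  • PENDING: initial state; a pending task is either running or suspended at an await
  • FINISHED: completed, with a result or an exception
  • CANCELLED: cancelled before it completed

"Running" is not a separate state of the task object; task.done() and task.cancelled() expose the state through the public API. The wrapped coroutine object, by contrast, reports CORO_CREATED, CORO_RUNNING, CORO_SUSPENDED or CORO_CLOSED via inspect.getcoroutinestate().
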
import asyncio

async def my_coroutine():
    try:
        while True:
            await asyncio.sleep(1)
            print("Running...")
    except asyncio.CancelledError:
        print("Cancelled!")
        raise

async def main():
    task = asyncio.create_task(my_coroutine())
    await asyncio.sleep(3)
    task.cancel()  # request cancellation: CancelledError is raised inside the coroutine
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
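These states can be observed from outside the task. This sketch (worker(), snapshot() and observe() are illustrative names) keeps a reference to each coroutine object and records task.done(), task.cancelled() and the coroutine state at each stage:

```python
import asyncio
import inspect


async def worker(gate: asyncio.Event) -> str:
    await gate.wait()  # suspend until the gate opens
    return "ok"


def snapshot(task: asyncio.Task, coro) -> tuple[bool, bool, str]:
    return task.done(), task.cancelled(), inspect.getcoroutinestate(coro)


async def observe() -> list[tuple[bool, bool, str]]:
    gate = asyncio.Event()
    coro = worker(gate)
    task = asyncio.create_task(coro)
    states = [snapshot(task, coro)]  # scheduled, coroutine not started yet

    await asyncio.sleep(0)           # let the task run up to gate.wait()
    states.append(snapshot(task, coro))

    gate.set()
    await task                       # finishes normally
    states.append(snapshot(task, coro))

    victim_coro = worker(asyncio.Event())  # this gate never opens
    victim = asyncio.create_task(victim_coro)
    await asyncio.sleep(0)
    victim.cancel()
    try:
        await victim
    except asyncio.CancelledError:
        pass
    states.append(snapshot(victim, victim_coro))
    return states


for state in asyncio.run(observe()):
    print(state)
# (False, False, 'CORO_CREATED')
# (False, False, 'CORO_SUSPENDED')
# (True, False, 'CORO_CLOSED')
# (True, True, 'CORO_CLOSED')
```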

3.2 Scheduling Strategy

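asyncio uses cooperative scheduling: a coroutine keeps control until it voluntarily yields at an await that suspends. Code that never awaits, such as a CPU-bound loop, starves every other task: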

import asyncio

async def compute():
    # CPU-bound loop with no await: it blocks the event loop until it finishes
    result = 0
    for i in range(10**7):
        result += i
    return result

async def io_task():
    print("Starting IO task")
    await asyncio.sleep(0.1)
    print("IO task completed")

async def main():
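    # compute() runs first and blocks the IO task until it returns
    compute_task = asyncio.create_task(compute())
    # Not named io_task: assigning to that name here would shadow the function (UnboundLocalError)
    io = asyncio.create_task(io_task())
    
    await compute_task
    await io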

asyncio.run(main())
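A common fix for a long loop that must stay on the event loop is to yield periodically with await asyncio.sleep(0), which hands control back to the scheduler without actually sleeping. A sketch, with a hypothetical chunk size and a shortened loop; unlike the blocking version above, the IO task starts right away instead of after compute() returns:

```python
import asyncio

events: list[str] = []


async def compute_cooperative(n: int, chunk: int) -> int:
    total = 0
    for i in range(n):
        total += i
        if i % chunk == 0:
            await asyncio.sleep(0)  # yield: lets other ready tasks run, no actual delay
    events.append("compute done")
    return total


async def io_task() -> None:
    events.append("io started")
    await asyncio.sleep(0.001)
    events.append("io done")


async def main() -> int:
    compute = asyncio.create_task(compute_cooperative(200_000, chunk=1_000))
    io = asyncio.create_task(io_task())
    result = await compute
    await io
    return result


result = asyncio.run(main())
print(result, events)
```

Yielding more often improves responsiveness at the cost of some throughput; for heavy CPU work, offloading to an executor is usually the better choice.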

3.3 Offloading CPU-Bound Work

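import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_compute():
    result = 0
    for i in range(10**7):
        result += i
    return result

async def compute():
    loop = asyncio.get_running_loop()
    # Pure-Python CPU-bound code holds the GIL, so a process pool gives real parallelism;
    # a ThreadPoolExecutor frees the event loop but cannot run this code in parallel
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_compute)
    return result

async def main():
    result = await compute()
    print(f"Result: {result}")

if __name__ == "__main__":  # required for process pools on spawn-based platforms
    asyncio.run(main())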

4. Concurrency Patterns

4.1 Running Multiple Tasks Concurrently

import asyncio

async def fetch(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return f"Result from {url}"

async def main():
    urls = [
        "https://api.example.com/1",
        "https://api.example.com/2",
        "https://api.example.com/3"
    ]
    
    # gather runs the coroutines concurrently and returns results in input order
    results = await asyncio.gather(*[fetch(url) for url in urls])
    print(results)

asyncio.run(main())
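By default, the first exception raised by any awaitable propagates out of gather(). Passing return_exceptions=True instead puts exceptions in the result list, at the same position as their inputs. A sketch in which the second URL fails on purpose (the failure is simulated):

```python
import asyncio


async def fetch(url: str) -> str:
    await asyncio.sleep(0.01)
    if url.endswith("/2"):
        raise ValueError(f"bad response from {url}")  # simulated failure
    return f"Result from {url}"


async def main() -> list:
    urls = [f"https://api.example.com/{i}" for i in (1, 2, 3)]
    # Exceptions are returned in place instead of propagating out of gather()
    return await asyncio.gather(*(fetch(url) for url in urls), return_exceptions=True)


results = asyncio.run(main())
for item in results:
    print(repr(item))
```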

4.2 Running a Task with a Timeout

import asyncio

async def long_running_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())

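4.3 Task Priority Control

asyncio has no built-in task priorities: ready callbacks run in first-in, first-out order. The example below only controls start order; the "high-priority" task finishes first simply because it sleeps for a shorter time: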

import asyncio

async def high_priority():
    print("High priority task started")
    await asyncio.sleep(0.1)
    print("High priority task completed")

async def low_priority():
    print("Low priority task started")
    await asyncio.sleep(1)
    print("Low priority task completed")

async def main():
    # Start the low-priority task first
    low = asyncio.create_task(low_priority())
    await asyncio.sleep(0.01)  # let the low-priority task start
    
    # Then start the high-priority task
    high = asyncio.create_task(high_priority())
    
    await high
    await low

asyncio.run(main())
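If you need real priorities, one option is to route work through an asyncio.PriorityQueue consumed by a worker, so the lowest number is served first. A minimal sketch with a single worker; the job names and priority values are made up:

```python
import asyncio


async def worker(queue: asyncio.PriorityQueue, done: list[str]) -> None:
    while True:
        _priority, name = await queue.get()  # smallest priority value first
        await asyncio.sleep(0)  # stand-in for real work
        done.append(name)
        queue.task_done()


async def main() -> list[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    done: list[str] = []
    # Lower number = higher priority; all jobs are queued before the worker starts
    for priority, name in [(5, "low"), (1, "high"), (3, "medium")]:
        queue.put_nowait((priority, name))

    consumer = asyncio.create_task(worker(queue, done))
    await queue.join()  # wait until every job is marked done
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return done


print(asyncio.run(main()))  # ['high', 'medium', 'low']
```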

5. Asynchronous I/O Operations

5.1 File Operations

import asyncio

async def read_file(path):
    loop = asyncio.get_running_loop()
    with open(path, 'r') as f:
        # Run the blocking read in the default thread pool (executor=None)
        content = await loop.run_in_executor(None, f.read)
    return content

async def main():
    content = await read_file('example.txt')
    print(content)

asyncio.run(main())
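On Python 3.9+, asyncio.to_thread() is a shorter way to run blocking file I/O in the default thread pool. This sketch also moves open() into the worker thread and uses a temporary file so it is self-contained; the file contents are arbitrary:

```python
import asyncio
import tempfile
from pathlib import Path


async def read_file(path: str) -> str:
    # Path.read_text opens, reads and closes the file, all inside the worker thread
    return await asyncio.to_thread(Path(path).read_text, encoding="utf-8")


async def main() -> str:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "example.txt"
        path.write_text("hello from a worker thread\n", encoding="utf-8")
        return await read_file(str(path))


content = asyncio.run(main())
print(content, end="")
```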

5.2 Network Operations

import asyncio
import aiohttp

async def fetch_with_aiohttp(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    content = await fetch_with_aiohttp('https://www.python.org')
    print(content[:500])

asyncio.run(main())

5.3 Database Operations

import asyncio
import asyncpg

async def query_database():
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    try:
        return await conn.fetch('SELECT * FROM users LIMIT 10')
    finally:
        await conn.close()  # close the connection even if the query fails

async def main():
    users = await query_database()
    print(users)

asyncio.run(main())

6. Hands-On: Building an Async Web Service

6.1 Using FastAPI

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def heavy_computation():
    await asyncio.sleep(1)
    return {"result": 42}

@app.get("/")
async def read_root():
    return {"message": "Hello World"}

@app.get("/compute")
async def compute():
    return await heavy_computation()

6.2 WebSocket Support

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
            await asyncio.sleep(0.1)
    except WebSocketDisconnect:
        pass  # the client closed the connection; leave the loop cleanly

6.3 Implementing Middleware

import asyncio
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    # Use the running loop's monotonic clock to time the request
    start_time = asyncio.get_running_loop().time()
    response = await call_next(request)
    duration = asyncio.get_running_loop().time() - start_time
    print(f"Request took {duration:.2f} seconds")
    return response

7. Performance Tips

7.1 Avoid Blocking Calls

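import asyncio
import time

# Wrong: time.sleep() blocks the whole event loop
async def bad_example():
    time.sleep(1)  # blocking!

# Right: use the async equivalent
async def good_example():
    await asyncio.sleep(1)  # non-blocking; other tasks run meanwhile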
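The cost of a blocking call can be measured with a heartbeat task that records a timestamp every 10 ms. In this sketch (heartbeat() and measure() are illustrative helpers), time.sleep() freezes the loop so only the first tick is recorded, while asyncio.to_thread() (Python 3.9+) moves the same blocking call to a worker thread and the heartbeat keeps running:

```python
import asyncio
import time


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)  # one tick every ~10 ms while the loop is free


async def measure(blocking: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if blocking:
        time.sleep(0.2)  # blocks the loop thread: no ticks while it runs
    else:
        await asyncio.to_thread(time.sleep, 0.2)  # same call, in a worker thread
    stop.set()
    await hb
    return len(ticks)


blocking_ticks = asyncio.run(measure(blocking=True))
threaded_ticks = asyncio.run(measure(blocking=False))
print(f"blocking: {blocking_ticks} tick(s), to_thread: {threaded_ticks} ticks")
```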

7.2 Batch Processing

async def process_items(items):
    # Process items in fixed-size batches (process_item is an async per-item function defined elsewhere)
    batch_size = 100
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
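A drawback of fixed batches is that each batch waits for its slowest item before the next batch starts. An alternative is an asyncio.Semaphore that caps how many items are in flight at once while letting a new item start as soon as a slot frees up. A sketch, with a hypothetical process_item() that also records peak concurrency:

```python
import asyncio

active = 0
peak = 0


async def process_item(item: int) -> int:
    """Hypothetical stand-in for real per-item work."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.001)
    active -= 1
    return item * 2


async def process_items(items: list[int], limit: int = 100) -> list[int]:
    sem = asyncio.Semaphore(limit)

    async def bounded(item: int) -> int:
        async with sem:  # at most `limit` items in flight at any moment
            return await process_item(item)

    return await asyncio.gather(*(bounded(item) for item in items))


results = asyncio.run(process_items(list(range(1_000)), limit=50))
print(len(results), peak)
```

Note that gather() still creates one task per item up front; for very large inputs, the batched version keeps fewer objects alive at once.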

7.3 Connection Pool Management

import asyncpg

class DatabasePool:
    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None
    
    async def init(self):
        self.pool = await asyncpg.create_pool(
            dsn=self.dsn,
            min_size=self.min_size,
            max_size=self.max_size
        )
    
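    async def query(self, sql: str, *args):
        # Borrow a connection and return it to the pool when done
        async with self.pool.acquire() as conn:
            return await conn.fetch(sql, *args)

    async def close(self):
        # Gracefully close every connection in the pool
        await self.pool.close()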

8. Migrating Async Code from Python to Rust

8.1 asyncio vs. Tokio

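Feature            Python asyncio               Rust Tokio
Runtime            Single-threaded event loop   Multi-threaded work-stealing scheduler
Concurrency model  Coroutines                   Async tasks + thread pool
Performance        Good                         Close to native
Memory safety      Checked at runtime           Guaranteed at compile time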

8.2 Code Comparison

Python version:

import asyncio

async def fetch(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3")
    )
    print(results)

asyncio.run(main())

Rust version:

// Cargo.toml: tokio = { version = "1", features = ["full"] }

async fn fetch(url: &str) -> String {
    println!("Fetching {}", url);
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    format!("Data from {}", url)
}

#[tokio::main]
async fn main() {
    let results = tokio::join!(
        fetch("url1"),
        fetch("url2"),
        fetch("url3")
    );
    println!("{:?}", results);
}

9. Common Problems and Solutions

9.1 Blocking the Event Loop

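import asyncio
from concurrent.futures import ProcessPoolExecutor

# Problem: a CPU-bound loop blocks the event loop
async def blocking_task():
    result = 0
    for i in range(10**8):
        result += i
    return result

def heavy_compute():
    result = 0
    for i in range(10**8):
        result += i
    return result

# Solution: offload the work to an executor (a process pool for pure-Python CPU-bound code)
async def non_blocking_task():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_compute)
    return result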

9.2 Leaked Tasks

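# Problem: the task is created but never awaited
async def main():
    task = asyncio.create_task(long_running())
    # main() returns immediately: asyncio.run() cancels the unfinished task, and a task
    # with no remaining references can even be garbage-collected mid-execution

# Solution: keep a reference to every task and await it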
async def main():
    task = asyncio.create_task(long_running())
    await task

9.3 Recursion Depth Limits

# Problem: each recursive await adds a frame, so deep recursion raises RecursionError
async def recursive(n):
    if n == 0:
        return 0
    return n + await recursive(n - 1)

# Solution: replace the recursion with a loop
async def iterative(n):
    result = 0
    for i in range(n + 1):
        result += i
    return result

10. Summary

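asyncio is Python's core library for asynchronous programming; it achieves efficient concurrency through the event loop, coroutines and tasks. Key takeaways: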

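  1. Event loop: schedules coroutines and handles I/O events
  2. Coroutines: lightweight concurrency primitives
  3. Tasks: wrappers that run coroutines concurrently
  4. Concurrency patterns: gather and wait_for in asyncio, join! in Tokio
  5. Performance: avoid blocking calls, offload work to executors, process items in batches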

Mastering asyncio lets you build high-performance asynchronous applications and lays a solid foundation for learning async programming in Rust.

