Python 并发编程基础

标签: #Python #并发编程 #多线程 #多进程 #asyncio #GIL
学习周期:2 天 | 核心目标:掌握多线程、多进程、异步IO的基础用法,理解 GIL 限制,能根据任务类型选择合适的并发方式


3.5 并发编程基础

并发编程指在同一时间内执行多个任务,提高程序效率。Python 提供了三种主要的并发方式:

方式 模块 适用场景 特点
多线程 threading IO 密集型任务(网络请求、文件读写) 受 GIL 限制,不能利用多核 CPU
多进程 multiprocessing CPU 密集型任务(计算、加密) 绕过 GIL,利用多核,但进程间通信开销大
异步 IO asyncio 高并发 IO 任务(Web 服务器、爬虫) 单线程异步,高效,需配合异步库

3.5.1 多线程(threading 模块)

GIL(全局解释器锁)

Python 的 CPython 解释器有一个 GIL,同一时刻只允许一个线程执行 Python 字节码。因此多线程不能利用多核 CPU 提升计算性能,但对于 IO 密集型任务(等待网络、磁盘),线程可以在阻塞时让出 GIL,实现并发。

基本用法(两种创建方式)

方式1:直接使用 threading.Thread

import threading
import time

def task(name, delay):
    for i in range(3):
        print(f"线程 {name} 执行第 {i+1} 次,时间:{time.ctime()}")
        time.sleep(delay)   # 模拟 IO 等待

t1 = threading.Thread(target=task, args=("A", 1))
t2 = threading.Thread(target=task, args=("B", 2))

t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程执行完毕")

方式2:继承 threading.Thread 类

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay
    def run(self):
        for i in range(3):
            print(f"线程 {self.name} 执行第 {i+1} 次")
            time.sleep(self.delay)

t1 = MyThread("A", 1)
t2 = MyThread("B", 2)
t1.start()
t2.start()
t1.join()
t2.join()
GIL 限制演示(CPU 密集型任务)
def cpu_intensive(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

# 单线程
start = time.time()
cpu_intensive(10**8)
print(f"单线程:{time.time() - start:.2f} 秒")

# 多线程(因 GIL,几乎无提升)
def multi_thread():
    t1 = threading.Thread(target=cpu_intensive, args=(5_000_000,))
    t2 = threading.Thread(target=cpu_intensive, args=(5_000_000,))
    t1.start(); t2.start()
    t1.join(); t2.join()

start = time.time()
multi_thread()
print(f"多线程:{time.time() - start:.2f} 秒")
线程安全与锁
counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)   # 应为 500000
线程池(concurrent.futures.ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(1)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for f in futures:
        print(f.result())

3.5.2 多进程(multiprocessing 模块)

多进程绕过 GIL,每个进程有独立的内存空间,适合 CPU 密集型任务。

基本用法(两种创建方式)

方式1:直接使用 multiprocessing.Process

import multiprocessing

def cpu_task(n):
    total = sum(i*i for i in range(n))
    print(f"进程 {multiprocessing.current_process().name} 完成")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=cpu_task, args=(5_000_000,))
    p2 = multiprocessing.Process(target=cpu_task, args=(5_000_000,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

方式2:继承 multiprocessing.Process

class MyProcess(multiprocessing.Process):
    def __init__(self, n):
        super().__init__()
        self.n = n
    def run(self):
        total = sum(i*i for i in range(self.n))
        print(f"进程 {self.name} 完成")

if __name__ == "__main__":
    p1 = MyProcess(5_000_000)
    p2 = MyProcess(5_000_000)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
多进程 vs 多线程(CPU 密集型任务对比)
# 单线程:14.6 秒
# 多线程:13.9 秒(几乎无提升)
# 多进程:7.1 秒(接近翻倍,利用多核)
进程间通信(Queue)
def producer(q):
    for i in range(5):
        q.put(f"数据{i}")
    q.put(None)

def consumer(q):
    while True:
        data = q.get()
        if data is None:
            break
        print(f"消费:{data}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
进程池(concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(square, range(10))
    print(list(results))

3.5.3 异步 IO(asyncio

异步 IO 在单线程内通过事件循环实现并发,适合高并发 IO 密集型任务(如爬虫、API 服务)。

基本用法
import asyncio

async def async_task(name, delay):
    print(f"{name} 开始")
    await asyncio.sleep(delay)   # 模拟异步 IO
    print(f"{name} 结束")
    return f"{name} 完成"

async def main():
    tasks = [
        asyncio.create_task(async_task("任务1", 2)),
        asyncio.create_task(async_task("任务2", 1)),
    ]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
异步网络请求(需安装 aiohttp
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def main():
    urls = ["https://www.python.org", "https://www.github.com"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, html in zip(urls, results):
            print(f"{url} 长度:{len(html)}")

asyncio.run(main())
异步与同步混合(在线程池中运行阻塞任务)
def sync_blocking():
    time.sleep(2)
    return "同步结果"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, sync_blocking)
    print(result)

asyncio.run(main())

3.5.4 三者对比与选择指南

场景 推荐方案 原因
CPU 密集型(图像处理、大量计算) 多进程 绕过 GIL,利用多核
IO 密集型(文件、网络、数据库) 多线程 或 asyncio 线程轻量,asyncio 更高效(万级并发)
高并发 Web 服务器/爬虫 asyncio + aiohttp 单线程处理海量连接,资源占用低
简单并发任务 concurrent.futures API 简洁,自动管理线程/进程池

🎯 练习项目:日志分析脚本(结合生成器、装饰器)

项目需求

  1. 生成器逐行读取大日志文件,避免内存溢出。
  2. 统计日志中不同级别(INFO、ERROR、WARNING)的数量。
  3. 装饰器计算脚本执行时间。
  4. (可选)用多进程优化超大文件的统计效率。

完整代码实现

import time
import functools
from collections import defaultdict

# 装饰器:计时
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} 执行耗时:{end - start:.6f} 秒")
        return result
    return wrapper

# 生成器:逐行读取大文件
def read_lines(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            yield line.strip()

# 统计日志级别
@timer
def count_log_levels(file_path):
    level_counts = defaultdict(int)
    for line in read_lines(file_path):
        # 假设日志格式:... INFO ... / ... ERROR ... / ... WARNING ...
        if " INFO " in line:
            level_counts["INFO"] += 1
        elif " ERROR " in line:
            level_counts["ERROR"] += 1
        elif " WARNING " in line:
            level_counts["WARNING"] += 1
    print("日志统计结果:")
    for level, count in level_counts.items():
        print(f"  {level}: {count}")
    return level_counts

# 生成测试日志(可选)
def generate_test_log(file_path, lines=100000):
    import random
    with open(file_path, "w", encoding="utf-8") as f:
        levels = ["INFO", "ERROR", "WARNING"]
        for i in range(lines):
            level = random.choice(levels)
            f.write(f"[2024-01-01 12:00:{i:02d}] {level}: 测试日志 {i}\n")
    print(f"测试日志已生成:{file_path},共 {lines} 行")

if __name__ == "__main__":
    # 生成测试日志(首次运行取消注释)
    # generate_test_log("sample.log", 1000000)

    # 统计日志
    count_log_levels("sample.log")

扩展要求

  • 使用多进程并发分析多个日志文件。
  • 支持命令行参数指定文件路径(使用 argparse)。
  • 将结果输出为 JSON 文件。

📚 学习资料(Obsidian 可直接收藏)


🎯 学习建议(2 天计划)

  1. 第 1 天:理解 GIL,练习多线程(锁、线程池)和多进程(进程池、队列),对比 CPU 密集型任务的性能差异。
  2. 第 2 天:学习 asyncio 基本语法(async/await、事件循环),尝试用 aiohttp 写简单异步爬虫,完成日志分析项目。

✅ 核心要点总结

  1. GIL:CPython 的全局解释器锁,导致多线程无法利用多核 CPU。
  2. 多线程:适合 IO 密集型任务,注意线程安全(使用锁),可用 ThreadPoolExecutor
  3. 多进程:适合 CPU 密集型任务,绕过 GIL,进程间通信使用 Queue 或 Pipe,可用 ProcessPoolExecutor
  4. 异步 IO:单线程高并发,使用 asyncio + 异步库(如 aiohttp),代码复杂但效率极高。
  5. 选择原则:CPU 密集型 → 多进程;IO 密集型 → 多线程或 asyncio(高并发选 asyncio)。
  6. 实战技巧:生成器处理大文件,装饰器计时,多进程可进一步优化日志分析效率。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐