AI_Python基础-8.并发编程基础
·
Python 并发编程基础
标签: #Python #并发编程 #多线程 #多进程 #asyncio #GIL
学习周期:2 天 | 核心目标:掌握多线程、多进程、异步IO的基础用法,理解 GIL 限制,能根据任务类型选择合适的并发方式
3.5 并发编程基础
并发编程指在同一时间内执行多个任务,提高程序效率。Python 提供了三种主要的并发方式:
| 方式 | 模块 | 适用场景 | 特点 |
|---|---|---|---|
| 多线程 | threading |
IO 密集型任务(网络请求、文件读写) | 受 GIL 限制,不能利用多核 CPU |
| 多进程 | multiprocessing |
CPU 密集型任务(计算、加密) | 绕过 GIL,利用多核,但进程间通信开销大 |
| 异步 IO | asyncio |
高并发 IO 任务(Web 服务器、爬虫) | 单线程异步,高效,需配合异步库 |
3.5.1 多线程(threading 模块)
GIL(全局解释器锁)
Python 的 CPython 解释器有一个 GIL,同一时刻只允许一个线程执行 Python 字节码。因此多线程不能利用多核 CPU 提升计算性能,但对于 IO 密集型任务(等待网络、磁盘),线程可以在阻塞时让出 GIL,实现并发。
基本用法(两种创建方式)
方式1:直接使用 threading.Thread
import threading
import time
def task(name, delay):
for i in range(3):
print(f"线程 {name} 执行第 {i+1} 次,时间:{time.ctime()}")
time.sleep(delay) # 模拟 IO 等待
t1 = threading.Thread(target=task, args=("A", 1))
t2 = threading.Thread(target=task, args=("B", 2))
t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程执行完毕")
方式2:继承 threading.Thread 类
class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__()
self.name = name
self.delay = delay
def run(self):
for i in range(3):
print(f"线程 {self.name} 执行第 {i+1} 次")
time.sleep(self.delay)
t1 = MyThread("A", 1)
t2 = MyThread("B", 2)
t1.start()
t2.start()
t1.join()
t2.join()
GIL 限制演示(CPU 密集型任务)
def cpu_intensive(n):
total = 0
for i in range(n):
total += i * i
return total
# 单线程
start = time.time()
cpu_intensive(10**8)
print(f"单线程:{time.time() - start:.2f} 秒")
# 多线程(因 GIL,几乎无提升)
def multi_thread():
t1 = threading.Thread(target=cpu_intensive, args=(5_000_000,))
t2 = threading.Thread(target=cpu_intensive, args=(5_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
start = time.time()
multi_thread()
print(f"多线程:{time.time() - start:.2f} 秒")
线程安全与锁
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 应为 500000
线程池(concurrent.futures.ThreadPoolExecutor)
from concurrent.futures import ThreadPoolExecutor
def task(n):
time.sleep(1)
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for f in futures:
print(f.result())
3.5.2 多进程(multiprocessing 模块)
多进程绕过 GIL,每个进程有独立的内存空间,适合 CPU 密集型任务。
基本用法(两种创建方式)
方式1:直接使用 multiprocessing.Process
import multiprocessing
def cpu_task(n):
total = sum(i*i for i in range(n))
print(f"进程 {multiprocessing.current_process().name} 完成")
if __name__ == "__main__":
p1 = multiprocessing.Process(target=cpu_task, args=(5_000_000,))
p2 = multiprocessing.Process(target=cpu_task, args=(5_000_000,))
p1.start()
p2.start()
p1.join()
p2.join()
方式2:继承 multiprocessing.Process
class MyProcess(multiprocessing.Process):
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
total = sum(i*i for i in range(self.n))
print(f"进程 {self.name} 完成")
if __name__ == "__main__":
p1 = MyProcess(5_000_000)
p2 = MyProcess(5_000_000)
p1.start()
p2.start()
p1.join()
p2.join()
多进程 vs 多线程(CPU 密集型任务对比)
# 单线程:14.6 秒
# 多线程:13.9 秒(几乎无提升)
# 多进程:7.1 秒(接近翻倍,利用多核)
进程间通信(Queue)
def producer(q):
for i in range(5):
q.put(f"数据{i}")
q.put(None)
def consumer(q):
while True:
data = q.get()
if data is None:
break
print(f"消费:{data}")
if __name__ == "__main__":
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
进程池(concurrent.futures.ProcessPoolExecutor)
from concurrent.futures import ProcessPoolExecutor
def square(n):
return n * n
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(square, range(10))
print(list(results))
3.5.3 异步 IO(asyncio)
异步 IO 在单线程内通过事件循环实现并发,适合高并发 IO 密集型任务(如爬虫、API 服务)。
基本用法
import asyncio
async def async_task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay) # 模拟异步 IO
print(f"{name} 结束")
return f"{name} 完成"
async def main():
tasks = [
asyncio.create_task(async_task("任务1", 2)),
asyncio.create_task(async_task("任务2", 1)),
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
异步网络请求(需安装 aiohttp)
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = ["https://www.python.org", "https://www.github.com"]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, html in zip(urls, results):
print(f"{url} 长度:{len(html)}")
asyncio.run(main())
异步与同步混合(在线程池中运行阻塞任务)
def sync_blocking():
time.sleep(2)
return "同步结果"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, sync_blocking)
print(result)
asyncio.run(main())
3.5.4 三者对比与选择指南
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| CPU 密集型(图像处理、大量计算) | 多进程 | 绕过 GIL,利用多核 |
| IO 密集型(文件、网络、数据库) | 多线程 或 asyncio | 线程轻量,asyncio 更高效(万级并发) |
| 高并发 Web 服务器/爬虫 | asyncio + aiohttp | 单线程处理海量连接,资源占用低 |
| 简单并发任务 | concurrent.futures |
API 简洁,自动管理线程/进程池 |
🎯 练习项目:日志分析脚本(结合生成器、装饰器)
项目需求
- 用生成器逐行读取大日志文件,避免内存溢出。
- 统计日志中不同级别(INFO、ERROR、WARNING)的数量。
- 用装饰器计算脚本执行时间。
- (可选)用多进程优化超大文件的统计效率。
完整代码实现
import time
import functools
from collections import defaultdict
# 装饰器:计时
def timer(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__} 执行耗时:{end - start:.6f} 秒")
return result
return wrapper
# 生成器:逐行读取大文件
def read_lines(file_path):
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
yield line.strip()
# 统计日志级别
@timer
def count_log_levels(file_path):
level_counts = defaultdict(int)
for line in read_lines(file_path):
# 假设日志格式:... INFO ... / ... ERROR ... / ... WARNING ...
if " INFO " in line:
level_counts["INFO"] += 1
elif " ERROR " in line:
level_counts["ERROR"] += 1
elif " WARNING " in line:
level_counts["WARNING"] += 1
print("日志统计结果:")
for level, count in level_counts.items():
print(f" {level}: {count}")
return level_counts
# 生成测试日志(可选)
def generate_test_log(file_path, lines=100000):
import random
with open(file_path, "w", encoding="utf-8") as f:
levels = ["INFO", "ERROR", "WARNING"]
for i in range(lines):
level = random.choice(levels)
f.write(f"[2024-01-01 12:00:{i:02d}] {level}: 测试日志 {i}\n")
print(f"测试日志已生成:{file_path},共 {lines} 行")
if __name__ == "__main__":
# 生成测试日志(首次运行取消注释)
# generate_test_log("sample.log", 1000000)
# 统计日志
count_log_levels("sample.log")
扩展要求
- 使用多进程并发分析多个日志文件。
- 支持命令行参数指定文件路径(使用
argparse)。 - 将结果输出为 JSON 文件。
📚 学习资料(Obsidian 可直接收藏)
🎯 学习建议(2 天计划)
- 第 1 天:理解 GIL,练习多线程(锁、线程池)和多进程(进程池、队列),对比 CPU 密集型任务的性能差异。
- 第 2 天:学习
asyncio基本语法(async/await、事件循环),尝试用aiohttp写简单异步爬虫,完成日志分析项目。
✅ 核心要点总结
- GIL:CPython 的全局解释器锁,导致多线程无法利用多核 CPU。
- 多线程:适合 IO 密集型任务,注意线程安全(使用锁),可用
ThreadPoolExecutor。 - 多进程:适合 CPU 密集型任务,绕过 GIL,进程间通信使用
Queue或Pipe,可用ProcessPoolExecutor。 - 异步 IO:单线程高并发,使用
asyncio+ 异步库(如aiohttp),代码复杂但效率极高。 - 选择原则:CPU 密集型 → 多进程;IO 密集型 → 多线程或 asyncio(高并发选 asyncio)。
- 实战技巧:生成器处理大文件,装饰器计时,多进程可进一步优化日志分析效率。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)