AI实验室2026_第8篇_Python多线程与并发编程教程
更新时间:2026-05-20 | 阅读时长:15分钟⚡
前言
你有没有遇到过这种情况?
- 下载100张图片,一张一张下,要等半天
- 爬取数据,程序卡在等待网络响应上
- 自动化处理文件,CPU跑满了却发现大部分时间在干等
问题在哪?程序太"老实"了,一次只干一件事。
今天这篇文章,教你让Python"一心多用"——同时下载、同时爬取、同时处理。效率提升5到10倍不是梦。
一、为什么需要并发?
先看一个真实的场景:
任务:下载10张图片,每张3秒
python
复制
import time
import requests
def download_image(i):
"""下载一张图片,耗时3秒"""
print(f"开始下载第{i}张图片...")
time.sleep(3) # 模拟网络请求
print(f"第{i}张图片下载完成!")
# 单线程:逐一下载
start = time.time()
for i in range(1, 11):
download_image(i)
end = time.time()
print(f"\n总耗时:{end - start:.2f}秒")
运行结果:
开始下载第1张图片...
第1张图片下载完成!
开始下载第2张图片...
...
总耗时:30.00秒
10张图片花了30秒,因为它们在"排队"。
现在用并发,同样的任务:
python
复制
import concurrent.futures
import time
# 多线程:同时下载
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(download_image, i) for i in range(1, 11)]
for f in concurrent.futures.as_completed(futures):
pass
end = time.time()
print(f"\n总耗时:{end - start:.2f}秒")
运行结果:
开始下载第1张图片...
开始下载第2张图片...
...
总耗时:3.05秒
从30秒降到3秒,效率提升10倍!
这就是并发的威力。
二、Python并发三剑客
Python提供三种并发方式,各有适用场景:
| 方式 | 适用场景 | 特点 |
|---|---|---|
| 多线程(Threading) | I/O密集型任务(网络请求、文件读写) | 简单易用,受GIL限制 |
| 多进程(Multiprocessing) | CPU密集型任务(计算、图像处理) | 真正并行,资源消耗大 |
| 异步(Asyncio) | 超高并发I/O任务(聊天服务器、爬虫) | 效率最高,代码风格特殊 |
三、多线程:threading模块
3.1 基本用法
python
复制
import threading
import time
def task(name, seconds):
"""模拟耗时任务"""
print(f"[{name}] 开始执行...")
time.sleep(seconds)
print(f"[{name}] 执行完成!(耗时{seconds}秒)")
# 创建线程
t1 = threading.Thread(target=task, args=("任务A", 2))
t2 = threading.Thread(target=task, args=("任务B", 3))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("\n所有任务完成!")
运行结果:
[任务A] 开始执行...
[任务B] 开始执行...
[任务A] 执行完成!(耗时2秒)
[任务B] 执行完成!(耗时3秒)
所有任务完成!
3.2 线程池:concurrent.futures
上面的方式需要手动管理线程。更推荐用线程池:
python
复制
import concurrent.futures
import time
def fetch_url(url):
"""模拟网络请求"""
time.sleep(1) # 假设每个请求耗时1秒
return f"成功获取:{url}"
# 要请求的URL列表
urls = [
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
"https://api.example.com/data4",
"https://api.example.com/data5",
]
# 使用线程池并发请求
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# 获取结果(按完成顺序)
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
print(result)
except Exception as e:
print(f"请求失败:{url}, 错误:{e}")
end = time.time()
print(f"\n总耗时:{end - start:.2f}秒(串行需要{len(urls)}秒)")
3.3 带返回值的批量处理
python
复制
import concurrent.futures
import math
def calculate_square(n):
"""计算平方"""
return n, n ** 2
# 批量计算
numbers = list(range(1, 21))
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(calculate_square, numbers))
print("计算结果:")
for num, square in results:
print(f" {num}² = {square}")
四、多进程:multiprocessing模块
4.1 为什么需要多进程?
Python有个叫**GIL(全局解释器锁)**的东西,它限制了同一时刻只有一个线程执行Python字节码。
对于CPU密集型任务(数学计算、图像处理、机器学习),多线程反而比单线程慢。这时候要用多进程。
python
复制
import multiprocessing
import time
def cpu_task(n):
"""CPU密集型任务:计算素数"""
count = 0
for i in range(2, n):
is_prime = True
for j in range(2, int(i**0.5) + 1):
if i % j == 0:
is_prime = False
break
if is_prime:
count += 1
return count
# 单进程
start = time.time()
result1 = cpu_task(100000)
end = time.time()
print(f"单进程耗时:{end - start:.2f}秒,找到{result1}个素数")
# 多进程
start = time.time()
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(cpu_task, [25000] * 4)
end = time.time()
print(f"多进程耗时:{end - start:.2f}秒,找到{sum(results)}个素数")
4.2 多进程池
python
复制
import multiprocessing as mp
def process_file(filename):
"""处理单个文件"""
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
return filename, len(content)
# 文件列表
files = [f"data_{i}.txt" for i in range(1, 21)]
# 使用进程池处理
with mp.Pool(processes=mp.cpu_count()) as pool:
results = pool.map(process_file, files)
print("文件处理结果:")
for fname, size in results:
print(f" {fname}: {size}字符")
五、异步编程:asyncio
asyncio是Python最强大的并发方式,特别适合I/O密集型高并发场景。
5.1 基本概念
python
复制
import asyncio
async def say_hello(name, seconds):
"""异步函数"""
print(f"[{name}] 开始...")
await asyncio.sleep(seconds) # 模拟异步I/O操作
print(f"[{name}] 完成!")
async def main():
"""异步主函数"""
# 同时执行3个任务
await asyncio.gather(
say_hello("任务1", 2),
say_hello("任务2", 3),
say_hello("任务3", 1),
)
# 运行
asyncio.run(main())
5.2 异步HTTP请求
python
复制
import asyncio
import aiohttp
async def fetch(session, url):
"""异步获取URL"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
import time
start = time.time()
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
end = time.time()
print(f"并发请求{len(urls)}个URL,耗时:{end - start:.2f}秒")
print(f"串行需要:约{len(urls)}秒")
asyncio.run(main())
5.3 异步文件操作
python
复制
import asyncio
async def read_file(path):
"""异步读取文件"""
await asyncio.to_thread(read_file_sync, path)
def read_file_sync(path):
"""同步读取(在新线程中运行)"""
with open(path, 'r', encoding='utf-8') as f:
return f.read()
async def main():
# 同时读取多个文件
files = ['file1.txt', 'file2.txt', 'file3.txt']
tasks = [asyncio.to_thread(read_file_sync, f) for f in files]
contents = await asyncio.gather(*tasks)
for f, content in zip(files, contents):
print(f"{f}: {len(content)}字符")
asyncio.run(main())
六、实战:并发爬虫
把并发技术用到爬虫上,效率翻倍!
6.1 基础版:多线程爬虫
python
复制
import concurrent.futures
import requests
from bs4 import BeautifulSoup
import time
def crawl_page(url):
"""爬取单个页面"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
try:
response = requests.get(url, headers=headers, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.select_one('h3.title').get_text(strip=True)
score = soup.select_one('span.score').get_text(strip=True)
return {'url': url, 'title': title, 'score': score}
except Exception as e:
return {'url': url, 'error': str(e)}
# 待爬取的URL列表
urls = [f'https://example.com/item/{i}' for i in range(1, 101)]
print(f"开始爬取{len(urls)}个页面...")
start = time.time()
# 使用20个线程并发爬取
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
results = list(executor.map(crawl_page, urls))
# 统计结果
success = sum(1 for r in results if 'error' not in r)
failed = len(results) - success
end = time.time()
print(f"\n爬取完成!成功:{success},失败:{failed}")
print(f"总耗时:{end - start:.2f}秒(串行约需{len(urls) * 2}秒)")
6.2 进阶版:异步爬虫
python
复制
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
async def crawl_async(session, url):
"""异步爬取单个页面"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.select_one('h3.title').get_text(strip=True)
return {'url': url, 'title': title}
except Exception as e:
return {'url': url, 'error': str(e)}
async def main_async(urls):
"""异步主函数"""
async with aiohttp.ClientSession() as session:
tasks = [crawl_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 待爬取的URL列表
urls = [f'https://example.com/item/{i}' for i in range(1, 101)]
print(f"开始异步爬取{len(urls)}个页面...")
start = time.time()
results = asyncio.run(main_async(urls))
end = time.time()
success = sum(1 for r in results if 'error' not in r)
print(f"\n爬取完成!成功:{success},失败:{len(results) - success}")
print(f"总耗时:{end - start:.2f}秒")
七、实战:并发文件处理
批量处理文件,同样可以用并发加速:
python
复制
import concurrent.futures
import os
import time
def process_image(filename):
"""模拟图片处理:缩放、加水印、压缩"""
time.sleep(0.5) # 模拟耗时处理
return f"处理完成:{filename}"
def process_batch(image_files, max_workers=10):
"""批量处理图片"""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_image, f): f for f in image_files}
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
# 模拟文件列表
image_files = [f"image_{i}.jpg" for i in range(1, 51)]
print(f"开始处理{len(image_files)}张图片...")
start = time.time()
results = process_batch(image_files, max_workers=10)
end = time.time()
print(f"\n处理完成!共{len(results)}张")
print(f"总耗时:{end - start:.2f}秒(串行需{len(image_files) * 0.5}秒)")
八、线程安全与锁
多个线程同时访问同一资源时,可能出现数据竞争问题:
python
复制
import threading
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1 # 非原子操作,可能出错
# 创建10个线程
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"预期结果:1000000,实际结果:{counter}")
结果往往不是1000000!需要加锁:
python
复制
import threading
counter = 0
lock = threading.Lock()
def increment_safe():
global counter
for _ in range(100000):
with lock: # 加锁保护
counter += 1
threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"预期结果:1000000,实际结果:{counter}")
九、最佳实践与注意事项
9.1 选择合适的并发方式
python
复制
# I/O密集型:用多线程或异步
if task_type == "io_bound":
# 方案1:多线程(简单)
with ThreadPoolExecutor(max_workers=20) as executor:
results = executor.map(do_io_task, items)
# 方案2:异步(高效,推荐)
results = asyncio.run(async_io_tasks(items))
# CPU密集型:用多进程
else:
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
results = executor.map(do_cpu_task, items)
9.2 控制并发数量
不要无限制创建线程,会耗尽系统资源:
python
复制
# ✅ 合理控制线程数
with ThreadPoolExecutor(max_workers=20) as executor:
# 分批处理
for batch in chunks(items, 100):
executor.map(task, batch)
# ❌ 无限制创建(危险!)
for item in huge_list:
threading.Thread(target=task, args=(item,)).start()
9.3 异常处理
python
复制
import concurrent.futures
def safe_task(item):
try:
return process(item)
except Exception as e:
return {'error': str(e), 'item': item}
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(safe_task, items)
for result in results:
if 'error' in result:
print(f"处理失败:{result}")
总结
今天这篇文章,涵盖了Python并发的核心技能:
| 方式 | 工具 | 适用场景 |
|---|---|---|
| 多线程 | threading, ThreadPoolExecutor |
I/O密集型:网络请求、文件读写 |
| 多进程 | multiprocessing, ProcessPoolExecutor |
CPU密集型:计算、图像处理 |
| 异步 | asyncio, aiohttp |
超高并发I/O:爬虫、服务器 |
整个思路就是:识别I/O等待时间,用并发填满等待空隙。
结合前面学过的爬虫、自动化办公,你现在可以让这些任务"同时进行",效率提升立竿见影。
👍 如果这篇文章对你有帮助,记得点赞! 在评论区说说,你项目中哪个场景最需要并发优化?
我是AI实验室2026,专注AI大模型与Python技术,我们下篇见!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)