Updated: 2026-05-20 | Reading time: 15 minutes


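Preface

Have you ever run into situations like these?

  • Downloading 100 images one at a time and waiting forever
  • Scraping data while the program sits stuck waiting for network responses
  • Automating file processing, only to find the program spends most of its time waiting rather than working

What is the problem? The program is too "well-behaved": it does only one thing at a time.

This article shows how to make Python multitask: download, crawl, and process at the same time. For I/O-bound work, a 5x to 10x speedup is realistic.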


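1. Why Do We Need Concurrency?

Let's start with a concrete scenario.

Task: download 10 images, 3 seconds each

python

import time

def download_image(i):
    """Download one image (simulated; takes 3 seconds)."""
    print(f"Starting download of image {i}...")
    time.sleep(3)  # stand-in for a network request
    print(f"Image {i} downloaded!")

# Single-threaded: download one at a time
start = time.time()
for i in range(1, 11):
    download_image(i)
end = time.time()

print(f"\nTotal time: {end - start:.2f}s")

Output:

Starting download of image 1...
Image 1 downloaded!
Starting download of image 2...
...
Total time: 30.00s

The 10 images take 30 seconds because they are waiting in line.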

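Now the same task with concurrency (reusing download_image from above):

python

import concurrent.futures
import time

# Multithreaded: download all images at the same time
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(download_image, i) for i in range(1, 11)]
    for f in concurrent.futures.as_completed(futures):
        f.result()  # re-raises any exception from the worker
end = time.time()

print(f"\nTotal time: {end - start:.2f}s")

Output:

Starting download of image 1...
Starting download of image 2...
...
Total time: 3.05s

From 30 seconds down to about 3: a 10x speedup!

That is the power of concurrency. It works here because each task spends its time waiting, not computing.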


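2. Python's Three Concurrency Tools

Python offers three ways to run work concurrently, each suited to different scenarios:

| Approach | Best for | Characteristics |
| --- | --- | --- |
| Multithreading (threading) | I/O-bound tasks (network requests, file reads and writes) | Simple to use; limited by the GIL |
| Multiprocessing (multiprocessing) | CPU-bound tasks (computation, image processing) | True parallelism; higher resource cost |
| Async (asyncio) | Very high-concurrency I/O (chat servers, crawlers) | Very low per-task overhead; distinct coding style |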

3. Multithreading: the threading Module

3.1 Basic usage

python

import threading
import time

def task(name, seconds):
    """Simulate a slow task."""
    print(f"[{name}] started...")
    time.sleep(seconds)
    print(f"[{name}] finished! (took {seconds}s)")

# Create the threads
t1 = threading.Thread(target=task, args=("Task A", 2))
t2 = threading.Thread(target=task, args=("Task B", 3))

# Start the threads
t1.start()
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()

print("\nAll tasks finished!")

Output:

[Task A] started...
[Task B] started...
[Task A] finished! (took 2s)
[Task B] finished! (took 3s)

All tasks finished!
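
One thing the basic example does not show is how to get a value back from a thread, since Thread ignores the return value of its target. A minimal sketch, assuming a queue.Queue is used as the hand-off point (the extra `out` parameter and the short delays are illustrative choices, not part of the original example):

```python
import queue
import threading
import time

def task(name, seconds, out):
    """Sleep to simulate work, then put (name, seconds) on the queue."""
    time.sleep(seconds)
    out.put((name, seconds))

results = queue.Queue()  # thread-safe container for return values
threads = [
    threading.Thread(target=task, args=("Task A", 0.2, results)),
    threading.Thread(target=task, args=("Task B", 0.1, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue: items appear in the order the threads put them.
finished = []
while not results.empty():
    finished.append(results.get())
print(finished)
```

This bookkeeping is exactly what a thread pool takes care of for you, which is why the pool is usually the more convenient choice.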

3.2 Thread pools: concurrent.futures

The approach above requires managing threads by hand. A thread pool is the recommended alternative:

python

import concurrent.futures
import time

def fetch_url(url):
    """Simulate a network request."""
    time.sleep(1)  # assume each request takes 1 second
    return f"Fetched: {url}"

# URLs to request
urls = [
    "https://api.example.com/data1",
    "https://api.example.com/data2",
    "https://api.example.com/data3",
    "https://api.example.com/data4",
    "https://api.example.com/data5",
]

# Issue the requests concurrently through a thread pool
start = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit every task, remembering which future belongs to which URL
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}

    # Collect results in completion order
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"Request failed: {url}, error: {e}")

end = time.time()
print(f"\nTotal time: {end - start:.2f}s (serial would take {len(urls)}s)")

3.3 Batch processing with return values

python

import concurrent.futures

def calculate_square(n):
    """Return n together with its square."""
    return n, n ** 2

# Batch computation. Squaring is too cheap to benefit from threads;
# the point here is only to show how map() returns values.
numbers = list(range(1, 21))

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(calculate_square, numbers))

print("Results:")
for num, square in results:
    print(f"  {num}² = {square}")
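
The two collection styles used so far differ in ordering: executor.map yields results in input order, while as_completed yields futures as they finish. A small sketch to make that visible, using a hypothetical slow_echo helper whose delays are chosen so that later inputs finish sooner:

```python
import concurrent.futures
import time

def slow_echo(n):
    """Sleep longer for smaller n, so later inputs tend to finish first."""
    time.sleep((4 - n) * 0.1)
    return n

numbers = [1, 2, 3]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, regardless of finish time.
    in_input_order = list(executor.map(slow_echo, numbers))

    # as_completed() yields each future as soon as it is done.
    futures = [executor.submit(slow_echo, n) for n in numbers]
    in_finish_order = [f.result() for f in concurrent.futures.as_completed(futures)]

print("map order:         ", in_input_order)
print("as_completed order:", in_finish_order)
```

Use map when results must line up with inputs, and as_completed when you want to react to each result as early as possible.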

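4. Multiprocessing: the multiprocessing Module

4.1 Why multiprocessing?

CPython has a **GIL (Global Interpreter Lock)** that allows only one thread at a time to execute Python bytecode.

For CPU-bound tasks written in pure Python (math, image processing, machine learning), multithreading therefore brings little or no speedup, and the extra lock contention and context switching can make it slower than a single thread. This is where multiple processes come in.

python

import multiprocessing
import time

def cpu_task(start, end):
    """CPU-bound task: count the primes in [start, end)."""
    count = 0
    for i in range(max(start, 2), end):
        is_prime = True
        for j in range(2, int(i**0.5) + 1):
            if i % j == 0:
                is_prime = False
                break
        if is_prime:
            count += 1
    return count

# The guard is required on platforms that start workers with "spawn"
# (Windows, macOS); without it each worker re-runs this script.
if __name__ == "__main__":
    # Single process
    start = time.time()
    result1 = cpu_task(2, 100000)
    end = time.time()
    print(f"Single process: {end - start:.2f}s, found {result1} primes")

    # Multiple processes: split the same range into 4 chunks
    chunks = [(0, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(cpu_task, chunks)
    end = time.time()
    print(f"Multiprocess: {end - start:.2f}s, found {sum(results)} primes")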
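
To see the GIL claim for yourself, you can run the same kind of CPU-bound work serially and through a thread pool and compare the timings. This is a rough sketch rather than a benchmark: count_primes and the chunk boundaries are illustrative, and the timings depend on your machine and interpreter build. On a standard CPython build with the GIL, expect the threaded version to be no faster.

```python
import concurrent.futures
import time

def count_primes(start, end):
    """Count the primes in [start, end) by trial division."""
    count = 0
    for i in range(max(start, 2), end):
        if all(i % j for j in range(2, int(i ** 0.5) + 1)):
            count += 1
    return count

chunks = [(0, 5000), (5000, 10000), (10000, 15000), (15000, 20000)]

# Serial baseline
t0 = time.perf_counter()
serial = sum(count_primes(a, b) for a, b in chunks)
serial_time = time.perf_counter() - t0

# Same chunks on 4 threads: same answer, but the threads take turns
# holding the GIL instead of running in parallel.
t0 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    threaded = sum(executor.map(lambda ab: count_primes(*ab), chunks))
threaded_time = time.perf_counter() - t0

print(f"serial:   {serial} primes in {serial_time:.3f}s")
print(f"threaded: {threaded} primes in {threaded_time:.3f}s")
```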

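4.2 Process pools

python

import multiprocessing as mp

def process_file(filename):
    """Process a single file: return its name and character count."""
    with open(filename, 'r', encoding='utf-8') as f:
        content = f.read()
    return filename, len(content)

if __name__ == "__main__":  # required when workers are started with "spawn"
    # File list (assumes data_1.txt ... data_20.txt exist in the working directory)
    files = [f"data_{i}.txt" for i in range(1, 21)]

    # Process the files with a process pool
    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(process_file, files)

    print("File processing results:")
    for fname, size in results:
        print(f"  {fname}: {size} characters")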

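5. Asynchronous Programming: asyncio

asyncio runs many I/O tasks cooperatively on a single thread, which makes it especially well suited to I/O-bound workloads with very high concurrency.

5.1 Basic concepts

python

import asyncio

async def say_hello(name, seconds):
    """A coroutine (async function)."""
    print(f"[{name}] started...")
    await asyncio.sleep(seconds)  # simulate an asynchronous I/O operation
    print(f"[{name}] finished!")

async def main():
    """Async entry point."""
    # Run 3 tasks concurrently
    await asyncio.gather(
        say_hello("Task 1", 2),
        say_hello("Task 2", 3),
        say_hello("Task 3", 1),
    )

# Run the event loop
asyncio.run(main())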
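
asyncio.gather also collects return values, and it returns them in the order the awaitables were passed in, not the order they finished. A minimal sketch, using a hypothetical double_after coroutine:

```python
import asyncio

async def double_after(value, delay):
    """Wait `delay` seconds, then return value * 2."""
    await asyncio.sleep(delay)
    return value * 2

async def main():
    # The second call finishes first, but results keep argument order.
    return await asyncio.gather(
        double_after(1, 0.3),
        double_after(2, 0.1),
        double_after(3, 0.2),
    )

results = asyncio.run(main())
print(results)  # [2, 4, 6]
```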

5.2 Asynchronous HTTP requests

python

import asyncio
import time

import aiohttp  # third-party: pip install aiohttp

async def fetch(session, url):
    """Fetch a URL asynchronously and return the response body."""
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        start = time.time()

        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        end = time.time()
        print(f"Fetched {len(urls)} URLs concurrently in {end - start:.2f}s")
        # The server-side delays add up to 1 + 2 + 1 + 2 + 1 = 7 seconds
        print("Serial would take: at least 7s")

asyncio.run(main())
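
By default, asyncio.gather raises the first exception it sees, so one failed request hides the results of the others. Passing return_exceptions=True returns exceptions as ordinary values instead. The sketch below uses a hypothetical fake_fetch coroutine in place of a real HTTP call so it runs without network access:

```python
import asyncio

async def fake_fetch(url):
    """Stand-in for a real request: fails for URLs containing 'bad'."""
    await asyncio.sleep(0.05)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"body of {url}"

async def main(urls):
    # Exceptions are returned in place of results instead of being raised.
    outcomes = await asyncio.gather(
        *(fake_fetch(u) for u in urls), return_exceptions=True
    )
    ok = [o for o in outcomes if not isinstance(o, Exception)]
    failed = [o for o in outcomes if isinstance(o, Exception)]
    return ok, failed

ok, failed = asyncio.run(main(["a", "bad", "c"]))
print(f"{len(ok)} succeeded, {len(failed)} failed")
```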

5.3 Asynchronous file operations

python

import asyncio

def read_file_sync(path):
    """Blocking read (runs in a worker thread)."""
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()

async def read_file(path):
    """Read a file without blocking the event loop (needs Python 3.9+)."""
    return await asyncio.to_thread(read_file_sync, path)

async def main():
    # Read several files concurrently (the file names are placeholders)
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    contents = await asyncio.gather(*(read_file(f) for f in files))

    for f, content in zip(files, contents):
        print(f"{f}: {len(content)} characters")

asyncio.run(main())

6. In Practice: A Concurrent Crawler

Applying these techniques to a crawler can cut its total run time substantially.

6.1 Basic version: a multithreaded crawler

python

import concurrent.futures
import time

import requests
from bs4 import BeautifulSoup

def crawl_page(url):
    """Crawl a single page and extract its title and score."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # treat HTTP 4xx/5xx as failures
        soup = BeautifulSoup(response.text, 'html.parser')

        # Placeholder selectors: adjust them to the target page's markup
        title = soup.select_one('h3.title').get_text(strip=True)
        score = soup.select_one('span.score').get_text(strip=True)

        return {'url': url, 'title': title, 'score': score}
    except Exception as e:
        return {'url': url, 'error': str(e)}

# URLs to crawl (placeholders)
urls = [f'https://example.com/item/{i}' for i in range(1, 101)]

print(f"Crawling {len(urls)} pages...")
start = time.time()

# Crawl concurrently with 20 threads
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(crawl_page, urls))

# Tally the results
success = sum(1 for r in results if 'error' not in r)
failed = len(results) - success

end = time.time()
print(f"\nDone! Succeeded: {success}, failed: {failed}")
print(f"Total time: {end - start:.2f}s (serial: about {len(urls) * 2}s, assuming ~2s per page)")

6.2 Advanced version: an asynchronous crawler

python

import asyncio
import time

import aiohttp
from bs4 import BeautifulSoup

async def crawl_async(session, url):
    """Crawl a single page asynchronously."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
            response.raise_for_status()  # treat HTTP 4xx/5xx as failures
            html = await response.text()
            soup = BeautifulSoup(html, 'html.parser')
            # Placeholder selector: adjust it to the target page's markup
            title = soup.select_one('h3.title').get_text(strip=True)
            return {'url': url, 'title': title}
    except Exception as e:
        return {'url': url, 'error': str(e)}

async def main_async(urls):
    """Async entry point: crawl every URL over one shared session."""
    async with aiohttp.ClientSession() as session:
        tasks = [crawl_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# URLs to crawl (placeholders)
urls = [f'https://example.com/item/{i}' for i in range(1, 101)]

print(f"Crawling {len(urls)} pages asynchronously...")
start = time.time()

results = asyncio.run(main_async(urls))

end = time.time()
success = sum(1 for r in results if 'error' not in r)
print(f"\nDone! Succeeded: {success}, failed: {len(results) - success}")
print(f"Total time: {end - start:.2f}s")
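
The async crawler above starts every request at once, which can overwhelm a small site or trigger rate limiting. One common way to cap the number of requests in flight is asyncio.Semaphore. The sketch below is an assumption-laden illustration: fake_crawl stands in for the real crawl_async so that it runs offline, and MAX_CONCURRENT is an arbitrary limit.

```python
import asyncio

MAX_CONCURRENT = 5  # hypothetical cap on simultaneous requests
active = 0          # requests currently in flight
peak = 0            # highest value `active` ever reached

async def fake_crawl(sem, url):
    """Stand-in for crawl_async: holds a semaphore slot while 'fetching'."""
    global active, peak
    async with sem:                # waits here if all slots are taken
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # simulated network wait
        active -= 1
        return url

async def main(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(fake_crawl(sem, u) for u in urls))

urls = [f"https://example.com/item/{i}" for i in range(1, 31)]
results = asyncio.run(main(urls))
print(f"crawled {len(results)} pages, peak concurrency {peak}")
```

In the real crawler, the same `async with sem:` line would wrap the session.get call.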

7. In Practice: Concurrent File Processing

Batch file processing can be accelerated with concurrency in the same way:

python

import concurrent.futures
import time

def process_image(filename):
    """Simulate image processing: resize, watermark, compress."""
    # time.sleep stands in for the work. Real image processing is
    # CPU-bound, so a ProcessPoolExecutor would be the better fit.
    time.sleep(0.5)
    return f"Processed: {filename}"

def process_batch(image_files, max_workers=10):
    """Process a batch of images on a thread pool."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_image, f): f for f in image_files}
        results = []
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results

# Simulated file list
image_files = [f"image_{i}.jpg" for i in range(1, 51)]

print(f"Processing {len(image_files)} images...")
start = time.time()

results = process_batch(image_files, max_workers=10)

end = time.time()
print(f"\nDone! {len(results)} images processed")
print(f"Total time: {end - start:.2f}s (serial would take {len(image_files) * 0.5}s)")

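8. Thread Safety and Locks

When several threads access the same resource at the same time, a data race can occur:

python

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # read-modify-write, not atomic: updates can be lost

# Create 10 threads
threads = [threading.Thread(target=increment) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 1000000, actual: {counter}")

The result can come out below 1000000, because a thread switch between the read and the write loses an update. How often this shows up depends on the interpreter version (some recent CPython releases rarely trigger it in this exact loop), but the code is not thread-safe either way. Protect the update with a lock:

python

import threading

counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:  # only one thread at a time may update the counter
            counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 1000000, actual: {counter}")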
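
Taking the lock on every iteration is correct but slow, since the threads spend most of their time queuing for it. One option, sketched here with a hypothetical increment_batched function, is to accumulate in a local variable and take the lock once per thread:

```python
import threading

counter = 0
lock = threading.Lock()

def increment_batched():
    """Count locally, then merge into the shared counter under the lock."""
    global counter
    local = 0
    for _ in range(100000):
        local += 1      # local variable: no other thread can touch it
    with lock:          # a single locked update per thread
        counter += local

threads = [threading.Thread(target=increment_batched) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 1000000, actual: {counter}")
```

The general rule is to keep the locked region as small and as infrequent as the logic allows.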

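9. Best Practices and Caveats

9.1 Choose the right concurrency model

python

import os
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def do_io_task(item):   # placeholder I/O-bound task
    time.sleep(0.1)
    return item

def do_cpu_task(item):  # placeholder CPU-bound task
    return sum(i * i for i in range(item * 1000))

def run(task_type, items):
    # I/O-bound: use threads (simple) or asyncio (more scalable)
    if task_type == "io_bound":
        with ThreadPoolExecutor(max_workers=20) as executor:
            return list(executor.map(do_io_task, items))
    # CPU-bound: use processes
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        return list(executor.map(do_cpu_task, items))

if __name__ == "__main__":
    print(run("io_bound", range(10)))
    print(run("cpu_bound", range(10)))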

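9.2 Limit the amount of concurrency

Do not create threads without limit; doing so can exhaust system resources:

python

from concurrent.futures import ThreadPoolExecutor

def task(item):                      # placeholder task
    return item * 2

def chunks(seq, size):               # yield successive slices of seq
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

items = list(range(1000))            # placeholder work list
# ✅ Bounded: at most 20 threads, fed in batches of 100
with ThreadPoolExecutor(max_workers=20) as executor:
    for batch in chunks(items, 100):
        results = list(executor.map(task, batch))

# ❌ Unbounded: one thread per item (dangerous!)
# for item in huge_list:
#     threading.Thread(target=task, args=(item,)).start()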
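
Note that ThreadPoolExecutor caps the number of threads, but its internal work queue is unbounded, so submitting a huge list at once still creates one pending future per item. If memory is a concern, one possible approach is to gate submissions with a semaphore. The submit_bounded helper and MAX_IN_FLIGHT value below are hypothetical names for illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_IN_FLIGHT = 8  # hypothetical cap on queued plus running tasks
slots = threading.BoundedSemaphore(MAX_IN_FLIGHT)

def task(item):
    """Placeholder task."""
    return item * 2

def submit_bounded(executor, fn, item):
    """Submit fn(item), blocking while MAX_IN_FLIGHT tasks are pending."""
    slots.acquire()                                      # wait for a free slot
    future = executor.submit(fn, item)
    future.add_done_callback(lambda f: slots.release())  # free it when done
    return future

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [submit_bounded(executor, task, i) for i in range(100)]

results = [f.result() for f in futures]
print(results[:5])
```

The producer loop now slows down to the pace of the workers instead of racing ahead of them.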

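9.3 Exception handling

python

from concurrent.futures import ThreadPoolExecutor

def process(item):
    """Placeholder for real work; fails on negative input."""
    if item < 0:
        raise ValueError("negative input")
    return {'item': item, 'value': item * 2}

def safe_task(item):
    """Catch errors inside the worker so one failure does not stop the batch."""
    try:
        return process(item)
    except Exception as e:
        return {'error': str(e), 'item': item}

items = [1, -2, 3]  # placeholder inputs

with ThreadPoolExecutor(max_workers=10) as executor:
    results = executor.map(safe_task, items)
    for result in results:
        if 'error' in result:
            print(f"Failed: {result}")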
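
An alternative to wrapping every task in try/except is to let the exception propagate and inspect each future afterwards. Future.exception() returns the raised exception, or None if the call succeeded. A sketch with a hypothetical risky function that fails on multiples of 3:

```python
import concurrent.futures

def risky(n):
    """Fail on multiples of 3, otherwise return n * 10."""
    if n % 3 == 0:
        raise ValueError(f"bad input: {n}")
    return n * 10

items = [1, 2, 3, 4, 5, 6]
succeeded, failed = {}, {}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_item = {executor.submit(risky, n): n for n in items}
    for future in concurrent.futures.as_completed(future_to_item):
        item = future_to_item[future]
        exc = future.exception()  # None if the call succeeded
        if exc is None:
            succeeded[item] = future.result()
        else:
            failed[item] = exc

print("succeeded:", succeeded)
print("failed:", sorted(failed))
```

This keeps the original exception object, including its type, which is lost when the error is flattened into a string.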

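Summary

This article covered the core skills of Python concurrency:

| Approach | Tools | Best for |
| --- | --- | --- |
| Multithreading | threading, ThreadPoolExecutor | I/O-bound: network requests, file reads and writes |
| Multiprocessing | multiprocessing, ProcessPoolExecutor | CPU-bound: computation, image processing |
| Async | asyncio, aiohttp | Very high-concurrency I/O: crawlers, servers |

The core idea: find where the program waits on I/O, and fill that waiting time with other work.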
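
A quick way to tell whether a piece of code is waiting or computing is to compare wall-clock time with CPU time. The sketch below uses time.perf_counter and time.process_time from the standard library; io_like, cpu_like, and measure are hypothetical helpers. A large gap between the two numbers suggests waiting that concurrency could fill.

```python
import time

def io_like():
    """Simulated I/O wait: the process sleeps and uses almost no CPU."""
    time.sleep(0.2)

def cpu_like():
    """Simulated computation: the CPU is busy the whole time."""
    sum(i * i for i in range(200000))

def measure(fn):
    """Return (wall_seconds, cpu_seconds) spent running fn()."""
    wall0, cpu0 = time.perf_counter(), time.process_time()
    fn()
    return time.perf_counter() - wall0, time.process_time() - cpu0

io_wall, io_cpu = measure(io_like)
cpu_wall, cpu_cpu = measure(cpu_like)

print(f"io_like:  wall {io_wall:.3f}s, cpu {io_cpu:.3f}s")
print(f"cpu_like: wall {cpu_wall:.3f}s, cpu {cpu_cpu:.3f}s")
```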

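Combined with the crawling and office-automation techniques from earlier articles, you can now run those tasks concurrently and see the speedup right away.


👍 If this article helped you, remember to give it a like! Tell us in the comments which part of your project most needs concurrency.

I'm AI实验室2026, focused on large AI models and Python. See you in the next article!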
