前言

在Python后端开发中,并发编程和网络通信是两大核心技能。无论是构建高并发的Web服务,还是实现实时通信的聊天应用,都离不开对进程、线程、协程以及Socket编程的深入理解。本文将基于实际代码案例,系统梳理Python并发编程与网络通信的知识体系,助你从理论到实战全面掌握。

一、网络通信基石:TCP与UDP

1.1 核心概念速览

概念 说明
IP地址 网络中设备的唯一标识,分为IPv4(4字节)和IPv6(8字节)
端口号 标识进程的逻辑地址,范围0-65535,其中1024-65535为动态端口
TCP 面向连接、可靠传输,需三次握手建立连接,适用于文件下载、网页浏览
UDP 无连接、不可靠但高效,适用于视频直播、语音通话等实时场景
Socket 进程间通信的工具,封装了TCP/UDP协议细节

1.2 TCP服务端与客户端实战

服务端核心步骤:

python

import socket

# 1. 创建socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 2. 绑定IP和端口
server_socket.bind(('0.0.0.0', 8888))

# 3. 设置监听
server_socket.listen(10)
print("服务端已启动,等待客户端连接...")

# 4. 接受客户端连接
client_socket, client_addr = server_socket.accept()
print(f"客户端连接成功:{client_addr}")

# 5. 收发数据
client_socket.send(b'Hello Client')
data = client_socket.recv(1024).decode('utf-8')
print("客户端发来的消息:", data)

# 6. 关闭连接
client_socket.close()
server_socket.close()

客户端核心步骤:

python

import socket

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('服务器IP', 8888))

data = client_socket.recv(1024).decode('utf-8')
print("服务器发来:", data)

client_socket.send('我是客户端'.encode('utf-8'))
client_socket.close()

1.3 UDP群聊系统实现

UDP无需建立连接,适合广播式通信。以下是一个简易群聊服务器的核心逻辑:

python

import socket
from threading import Thread, Lock

online_clients = set()
lock = Lock()

def handle_client(client_data, client_addr):
    message = client_data.decode().strip()
    with lock:
        if message == '[上线]':
            online_clients.add(client_addr)
            broadcast(f'服务器提示:{client_addr} 进入群聊', client_addr)
        elif message == '[下线]':
            online_clients.discard(client_addr)
            broadcast(f'服务器提示:{client_addr} 退出群聊', client_addr)
        else:
            broadcast(f'{client_addr}: {message}', client_addr)

def broadcast(message, sender_addr):
    for client in list(online_clients):
        if client != sender_addr:
            socket_obj.sendto(message.encode(), client)

# 主循环接收数据并为每个客户端创建处理线程

二、多进程编程:数据隔离与通信

2.1 进程创建与基本方法

python

from multiprocessing import Process
import os

def worker(name):
    print(f'子进程 {name},PID: {os.getpid()},父进程PID: {os.getppid()}')

if __name__ == '__main__':
    p = Process(target=worker, args=('Task1',))
    p.start()
    p.join()  # 等待子进程结束

关键方法解析:

  • start():启动子进程

  • join([timeout]):阻塞主进程等待子进程结束

  • terminate():强制终止子进程

  • daemon=True:设置守护进程,主进程结束时子进程自动终止

2.2 进程锁:解决资源竞争

python

from multiprocessing import Process, Lock

def printer(lock, msg):
    with lock:  # 自动获取和释放锁
        for char in msg:
            print(char, end='')
        print()

if __name__ == '__main__':
    lock = Lock()
    p1 = Process(target=printer, args=(lock, 'Hello'))
    p2 = Process(target=printer, args=(lock, 'World'))
    p1.start()
    p2.start()

Lock vs RLock:

  • Lock:不可重入,同一进程只能获取一次

  • RLock:可重入锁,允许同一进程多次获取,需对应多次释放

2.3 进程间通信:Queue与Pipe

使用Queue实现生产者-消费者:

python

from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(f'产品{i}')
        print(f'生产:产品{i}')

def consumer(q):
    while True:
        item = q.get()
        print(f'消费:{item}')

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

Queue方法注意:

  • put(item, timeout):队列满时阻塞等待,超时抛出异常

  • get(timeout):队列空时阻塞等待

  • put_nowait() / get_nowait():非阻塞版本

Pipe实现双向通信:

python

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send('Hello from sender')
    conn.close()

def receiver(conn):
    msg = conn.recv()
    print('收到:', msg)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(parent_conn,))
    p2 = Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()

2.4 进程池:高效管理进程

python

from concurrent.futures import ProcessPoolExecutor
import os

def task(n):
    print(f'任务{n} 在进程 {os.getpid()} 执行')
    return n * 2

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for f in futures:
            print('结果:', f.result())

三、多线程编程:数据共享与锁

3.1 线程创建与数据共享

python

from threading import Thread, Lock

counter = 0
lock = Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = [Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 正确输出200000

线程 vs 进程数据共享:

  • 线程间共享全局变量(需加锁保护)

  • 进程间内存独立,需借助Queue/Pipe等机制通信

3.2 线程锁的必要性

不加锁时,多个线程同时修改共享变量会导致数据错乱(GIL并不能保证原子性)。with lock语句确保临界区代码串行执行。

四、协程:异步IO的利器

4.1 协程基础概念

协程是单线程内的任务调度机制,通过async/await语法实现非阻塞IO。当遇到IO操作时,事件循环会挂起当前任务,转而执行其他任务,从而大幅提升IO密集型任务的效率。

4.2 协程函数与事件循环

python

import asyncio

async def work():
    print('开始工作')
    await asyncio.sleep(1)  # 模拟IO操作
    print('工作结束')
    return 'result'

# 运行协程
result = asyncio.run(work())
print(result)

核心要点:

  • async def 定义协程函数,调用返回协程对象(不执行)

  • await 挂起当前协程,等待可等待对象(协程/Task/Future)完成

  • asyncio.run() 创建事件循环并运行协程

4.3 并发执行多个协程

python

import asyncio
import time

async def task(name, delay):
    await asyncio.sleep(delay)
    return f'{name}完成'

async def main():
    start = time.time()
    # 并发执行三个任务
    results = await asyncio.gather(
        task('A', 2),
        task('B', 2),
        task('C', 2)
    )
    print(results)
    print(f'耗时: {time.time()-start:.2f}秒')  # 约2秒而非6秒

asyncio.run(main())

4.4 实战:异步下载图片对比

同步下载(耗时累加):

python

import requests

def download(url):
    resp = requests.get(url)
    with open(url[-10:], 'wb') as f:
        f.write(resp.content)

for url in url_list:
    download(url)  # 总耗时 = 每张图片下载时间之和

异步下载(并发执行):

python

import aiohttp
import asyncio

async def download(session, url):
    async with session.get(url) as resp:
        content = await resp.read()
        with open(url[-10:], 'wb') as f:
            f.write(content)

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [download(session, url) for url in url_list]
        await asyncio.gather(*tasks)

asyncio.run(main())  # 总耗时 ≈ 最慢的一张图片下载时间

五、并发模型对比总结

维度 多进程 多线程 协程
内存隔离 独立内存空间,数据不共享 共享进程内存 共享线程内存
切换开销 大(系统调用) 中等 极小(用户态切换)
适用场景 CPU密集型 IO密集型(需注意GIL) 高并发IO密集型
通信方式 Queue、Pipe、共享内存 共享变量 + 锁 无需特殊机制(单线程)
Python实现 multiprocessing threading asyncio

六、常见问题与最佳实践

  1. GIL的影响:Python多线程由于全局解释器锁的存在,无法利用多核执行CPU密集型任务,此时应改用多进程。

  2. 避免死锁:使用with lock语句自动管理锁的获取与释放;尽量使用RLock避免嵌套锁问题。

  3. 队列阻塞处理:设置合理的timeout参数,防止程序永久阻塞。

  4. 协程注意事项:不要在协程中调用同步阻塞函数(如time.sleep),应使用asyncio.sleep;网络请求需使用支持异步的库(如aiohttp)。

  5. 进程池大小:一般设置为CPU核心数 + 1,过多进程会导致上下文切换开销增大。

结语

从传统的多进程、多线程到现代的协程,Python为我们提供了丰富的并发编程工具。理解它们的底层原理和适用场景,是构建高性能应用的关键。本文的代码案例涵盖了从基础Socket通信到高级异步并发的完整链路,希望能帮助你在实际项目中灵活选用最合适的方案。

如果你觉得本文有帮助,欢迎点赞、收藏、转发,也欢迎在评论区交流你的并发编程经验

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐