UDP分片实战
·
前言
完整代码和演示demo在最后面
通过本文的学习,将能够(可能):
- 解UDP与TCP的核心差异及其适用场景
- 掌握NIO Reactor模型的设计原理和实现方法
- 构建UDP通信框架
- 实现分片协议和心跳机制
一、UDP和TCP的差异
| 维度 | TCP | UDP |
|---|---|---|
| 连接 | 有连接,一个 Channel 对应一个客户端 | 无连接,一个 Channel 收发所有设备 |
| 读写 | 字节流,需处理粘包/半包 | 数据报,一个包就是一个完整单元 |
| 流控 | 自带滑动窗口 | 无流控,send 满则静默丢包 |
| 服务端 | ServerSocketChannel + SocketChannel | 只需一个 DatagramChannel |
关键认知:UDP 没有“连接”,服务端不维护连接状态。一个DatagramChannel可以收发任意设备的数据,设备地址从receive()的返回值获取
二、NIO Reactor模型
UDP的Reactor骨架和TCP类似,都是绑定注册创建selector,劫持select。。。
channel = DatagramChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(PORT));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
new Thread(this::iterateSelector, "selector-thread").start();
private void iterateSelector(){
while(!Thread.currentThread().isInterrupted()){
try {
if (selector.select(100) == 0 ){
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
..........
}
}
}catch(IOException e){
throw new RuntimeException(e);
}
}
}
三、客户端模板
这里展现的是简单的udp客户端模板,包含守护线程接受消息,主动发送包装好的消息给服务端
private static final int PORT = 8888;
private static final String HOST = "127.0.0.1";
private static final int BUFFER_SIZE = 1024;
// 定义分包大小
private static final int MTU_SIZE = 500;
// 定义消息头大小
private static final int HEADER_SIZE = 7;
// 定义负载body大小
private static final int MAX_PAYLOAD = MTU_SIZE - HEADER_SIZE;
// 全局消息ID
private static int THE_MESSAGE_ID = 0;
public static void main(String[] args) throws IOException {
// 创建UDP通道(非阻塞模式)
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
// 连接服务端(UDP的connect只是设置默认目标地址,不建立真正连接)
channel.connect(new InetSocketAddress(HOST, PORT));
System.out.println("UDP客户端启动成功,准备发送数据...");
// 启动读线程:接收服务端响应
Thread readThread = new Thread(() -> {
ByteBuffer readBuf = ByteBuffer.allocate(BUFFER_SIZE);
try {
while (!Thread.currentThread().isInterrupted()) {
readBuf.clear();
// 接收数据(非阻塞,立即返回)
InetSocketAddress sender = (InetSocketAddress) channel.receive(readBuf);
if (sender != null && readBuf.position() > 0) {
readBuf.flip();
byte[] data = new byte[readBuf.remaining()];
readBuf.get(data);
String msg = new String(data, StandardCharsets.UTF_8);
System.out.println("\n[收到响应] 来自: " + sender + " | 内容: " + msg);
System.out.print("[发送消息] ");
}
// 短暂休眠,避免CPU空转
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
} catch (IOException e) {
System.err.println("读取异常: " + e.getMessage());
}
}, "udp-read-thread");
readThread.setDaemon(true);
readThread.start();
// 主线程负责写:读取用户输入并发送
Scanner scanner = new Scanner(System.in);
System.out.print("[发送消息] ");
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (line.isEmpty()) {
System.out.print("[发送消息] ");
continue;
}
// 退出命令
if ("exit".equalsIgnoreCase(line) || "quit".equalsIgnoreCase(line)) {
System.out.println("正在关闭客户端...");
break;
}
try {
// 复制为字节数组
byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
int byLen = bytes.length;
int totalChunk = (int) Math.ceil((double) byLen / MAX_PAYLOAD);
int offset = 0;
if(byLen <= MTU_SIZE){
ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + byLen);
buf.putShort((short)THE_MESSAGE_ID); // 唯一消息号
buf.putShort((short) 0); // 当前分片号
buf.putShort((short) totalChunk);// 总分片数
buf.put((byte) 1); // 结束标记
buf.put(bytes);
buf.flip();
int zSend = channel.write(buf);
System.out.println("[直接发送] 总分片:"+totalChunk+" 共"+zSend+"字节");
}else{
int seq = 0;
while(offset < byLen){
// 计算本次切片所需长度
int nowLen = Math.min(MAX_PAYLOAD,byLen - offset);
// 初始化本次切片
byte[] chunk = new byte[nowLen];
System.arraycopy(bytes,offset,chunk,0,nowLen);
// 修改偏移量
offset += nowLen;
// 发送切片数据
ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + nowLen);
buf.putShort((short)THE_MESSAGE_ID);
buf.putShort((short) seq);
buf.putShort((short) totalChunk);
byte isLast = (offset >= byLen) ? (byte) 1 : 0;
buf.put(isLast);
buf.put(chunk);
buf.flip();
channel.write(buf);
System.out.println("[分片发送] 序号:"+seq+" 长度:" + nowLen + " 字节");
seq++;
}
// 统计是否完成发送,共发送数据
System.out.println("[发送完成] 总计发送 " + byLen + " 字节");
}
THE_MESSAGE_ID ++;
} catch (IOException e) {
System.err.println("发送失败: " + e.getMessage());
}
}
// 清理资源
scanner.close();
readThread.interrupt();
channel.close();
System.out.println("客户端已关闭");
}
四、心跳处理
服务端心跳处理是为了表明设备还活着。在UDP中,心跳不是为了检测连接状态,而是让服务端维护一个“活跃设备列表”
// 客户地址记录
private static final ConcurrentHashMap<Object, Long> clientAddresses = new ConcurrentHashMap<>();
// 在服务端处理SelectionKey中
clientAddresses.put(receive, nowTime);
//在服务端启动后,创建定时线程
ScheduledExecutorService cleanPool = new ScheduledThreadPoolExecutor(
1,
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
long timeout = 10 * 1000;
cleanPool.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
clientAddresses.entrySet().removeIf(entry -> {
Long ctx = entry.getValue();
System.out.println("对象:"+ entry.getKey()+ "已连接了" + (now - ctx)/1000 + "秒");
return (now - ctx) > timeout;
});
// 这里先不管,是后面的清理分包缓存
server.udpGroupServer.clearTimeout();
}, 0, 60, TimeUnit.SECONDS);
五、分片协议设计
5.1 协议头
[消息ID 2B][分片号 2B][总分片数 2B][结束标记 1B][数据体 xB]
- 消息ID:客户端持有会话时,每次消息都会有个新的消息ID。服务端可以拼接消息ID和客户端地址得到唯一字段。
- 分片号:从 0 开始
- 总分片数:每片都携带,服务端可预分配缓存
- 结束标记:0=中间片,1=最后一片
5.2 客户端分片
byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
int byLen = bytes.length;
int totalChunk = (int) Math.ceil((double) byLen / MAX_PAYLOAD);
int offset = 0;
if(byLen <= MTU_SIZE){
ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + byLen);
buf.putShort((short)THE_MESSAGE_ID); // 唯一消息号
buf.putShort((short) 0); // 当前分片号
buf.putShort((short) totalChunk);// 总分片数
buf.put((byte) 1); // 结束标记
buf.put(bytes);
buf.flip();
int zSend = channel.write(buf);
}
5.3 服务端重组
public void handleReceive(InetSocketAddress sender, int clientLocalMsgId, short totalChunk, short seq, byte[] body) {
// 1. 构建唯一Key
if (body == null || body.length == 0) {
return;
}
ClientMsgKey key = new ClientMsgKey(sender, clientLocalMsgId);
// 2. 获取、创建外层分组对象
UDPChunkGroup group = globalMsgCache.computeIfAbsent(key, k -> new UDPChunkGroup(k, totalChunk));
UDPCombinationDTO dto = group.getCombination();
// 3. 存入分片
dto.getChunkMap().put(seq, body);
// 4. 判断是否收齐
if (dto.getChunkMap().size() == dto.getTotalChunk()) {
group.setStatus(1); // 标记已完成
Map<Short, byte[]> chunkMap = dto.getChunkMap();
StringBuilder fullMsg = new StringBuilder();
for (short i = 0; i < totalChunk; i++) {
byte[] part = chunkMap.get(i);
if (part != null) {
String partStr = new String(part, StandardCharsets.UTF_8);
fullMsg.append(partStr);
}
}
String completeMsg = fullMsg.toString();
System.out.printf("【消息重组完成】来源:%s | 内容:%s%n", sender, completeMsg);
globalMsgCache.remove(key);
}
}
- 用 InetSocketAddress + msgId 做复合键
- computeIfAbsent 保证同一消息的分片进入同一缓存
- 判断收齐:chunkMap.size() == 总分片数
六、超时处理
public void clearTimeout() {
long now = System.currentTimeMillis();
globalMsgCache.entrySet().removeIf(entry -> {
UDPChunkGroup group = entry.getValue();
UDPCombinationDTO dto = group.getCombination();
boolean isTimeout = now - dto.getCreateTs() > TIMEOUT;
boolean isCollecting = group.getStatus() == 0;
if (isTimeout && isCollecting) {
group.setStatus(2); // 标记为超时丢弃
System.out.printf("【消息超时丢弃】客户端:%s%n", entry.getKey().getAddress());
return true;
}
return false;
});
}
七、ACK回复
UDP发送消息给客户端很简单,不需要写像TCP那样复杂的写队列。但是由于UDP本身的特性,发送消息是取决于业务的,不是强制的
ByteBuffer ack = ByteBuffer.allocate(5);
ack.putShort(seq);
ack.put((byte) 0x01);
ack.flip();
channel.send(ack, sender);
- sender是先前receive()获取的客户端地址
八、完整架构
UDP包 → Selector线程 → 心跳 → 更新活跃表
→ 数据 → 解析协议头 → 分片缓存
↓
size == totalChunk ?
↓
组装完整消息
↓
策略模式 → 业务处理
九、其他
- UDP 无连接:服务端只维护“活跃时间”记录
- 心跳和数据分开处理:心跳在 IO 线程直接处理,不进队列
- 分片缓存必须有超时清理:否则设备断线导致内存泄漏
- 多厂家用策略模式适配:IO 层解析,消费者只处理标准消息
- 是否发 ACK 看业务:重要数据才需要,UDP 发 ACK 不费力
十、完整代码
public class UDPGroupServer {
public static class ClientMsgKey{
@Getter
private final InetSocketAddress address;
private final int id;
public ClientMsgKey(InetSocketAddress address, int id) {
this.address = address;
this.id = id;
}
@Override
public int hashCode() {
return Objects.hash(address, id);
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ClientMsgKey other = (ClientMsgKey) obj;
return Objects.equals(address, other.address) && id == other.id;
}
}
public static class UDPCombinationDTO {
@Getter
private final Short totalChunk;
@Getter
private final Map<Short, byte[]> chunkMap;
@Getter
private final long createTs;
public UDPCombinationDTO(Short total) {
this.totalChunk = total;
this.chunkMap = new ConcurrentHashMap<>();
this.createTs = System.currentTimeMillis();
}
}
// 总合包
public static class UDPChunkGroup {
// 唯一标识Key
private final ClientMsgKey msgKey;
// 分片聚合主体
private final UDPCombinationDTO combination;
// 重传次数(丢包时记录)
private int retryCount = 0;
// 消息状态:0-收集中 1-已完成 2-已超时丢弃
@Setter
private int status = 0;
public UDPChunkGroup(ClientMsgKey key, Short totalChunk) {
this.msgKey = key;
this.combination = new UDPCombinationDTO(totalChunk);
}
public ClientMsgKey getMsgKey() { return msgKey; }
public UDPCombinationDTO getCombination() { return combination; }
public int getRetryCount() { return retryCount; }
public int getStatus() { return status; }
// 状态/重试次数修改方法(可控修改,比直接set更安全)
public void incRetryCount() {
this.retryCount++;
}
}
private final Map<ClientMsgKey, UDPChunkGroup> globalMsgCache = new ConcurrentHashMap<>();
// 超时阈值
private static final long TIMEOUT = 3000;
public void handleReceive(InetSocketAddress sender, int clientLocalMsgId, short totalChunk, short seq, byte[] body) {
// 1. 构建唯一Key
if (body == null || body.length == 0) {
return;
}
ClientMsgKey key = new ClientMsgKey(sender, clientLocalMsgId);
// 2. 获取、创建外层分组对象
UDPChunkGroup group = globalMsgCache.computeIfAbsent(key, k -> new UDPChunkGroup(k, totalChunk));
UDPCombinationDTO dto = group.getCombination();
// 3. 存入分片
dto.getChunkMap().put(seq, body);
// 4. 判断是否收齐
if (dto.getChunkMap().size() == dto.getTotalChunk()) {
group.setStatus(1); // 标记已完成
Map<Short, byte[]> chunkMap = dto.getChunkMap();
StringBuilder fullMsg = new StringBuilder();
for (short i = 0; i < totalChunk; i++) {
byte[] part = chunkMap.get(i);
if (part != null) {
String partStr = new String(part, StandardCharsets.UTF_8);
fullMsg.append(partStr);
}
}
String completeMsg = fullMsg.toString();
System.out.printf("【消息重组完成】来源:%s | 内容:%s%n", sender, completeMsg);
globalMsgCache.remove(key);
}
}
public void clearTimeout() {
long now = System.currentTimeMillis();
globalMsgCache.entrySet().removeIf(entry -> {
UDPChunkGroup group = entry.getValue();
UDPCombinationDTO dto = group.getCombination();
boolean isTimeout = now - dto.getCreateTs() > TIMEOUT;
boolean isCollecting = group.getStatus() == 0;
if (isTimeout && isCollecting) {
group.setStatus(2); // 标记为超时丢弃
System.out.printf("【消息超时丢弃】客户端:%s%n", entry.getKey().getAddress());
return true;
}
return false;
});
}
}
public class UDPNIOServerDemo {
private static final int PORT = 8888;
private static final String HOST = "127.0.0.1";
private static final int BUFFER_SIZE = 1024;
private final ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
private DatagramChannel channel;
private Selector selector;
private final ExecutorService businessPool;
private final UDPGroupServer udpGroupServer;
// 客户地址记录
private static final ConcurrentHashMap<Object, Long> clientAddresses = new ConcurrentHashMap<>();
UDPNIOServerDemo() {
this.udpGroupServer = new UDPGroupServer();
this.businessPool = new ThreadPoolExecutor(
2,
5,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy()
);
// 预创建所有核心线程,避免首次任务提交时的线程创建开销
((ThreadPoolExecutor) businessPool).prestartAllCoreThreads();
}
private void start() throws IOException {
channel = DatagramChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(PORT));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
new Thread(this::iterateSelector, "selector-thread").start();
}
private void iterateSelector(){
while(!Thread.currentThread().isInterrupted()){
try {
if (selector.select(100) == 0 ){
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
buf.clear();
SocketAddress receive = channel.receive(buf);
buf.flip();
short mesId = buf.getShort();
short seq = buf.getShort();
short totalChunk = buf.getShort();
byte isLast = buf.get();
// 复制数据到新的byte数组,避免多线程共享缓冲区
byte[] data = new byte[buf.remaining()];
buf.get(data);
String content = new String(data, StandardCharsets.UTF_8);
Long nowTime = System.currentTimeMillis();
clientAddresses.put(receive, nowTime);
// 使用execute(),因为不需要返回值
businessPool.execute(() -> {
udpGroupServer.handleReceive((InetSocketAddress) receive, mesId,totalChunk, seq, data);
});
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
UDPNIOServerDemo server = new UDPNIOServerDemo();
try {
server.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
ScheduledExecutorService cleanPool = new ScheduledThreadPoolExecutor(
1,
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
long timeout = 10 * 1000;
cleanPool.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
clientAddresses.entrySet().removeIf(entry -> {
Long ctx = entry.getValue();
System.out.println("对象:"+ entry.getKey()+ "已连接了" + (now - ctx)/1000 + "秒");
return (now - ctx) > timeout;
});
server.udpGroupServer.clearTimeout();
}, 0, 60, TimeUnit.SECONDS);
}
}
26.6.11更新: 完整springboot UDP Demo
https://gitee.com/Xia-cuan/udp-fragment-reassembly-demo
https://github.com/Xiacqi1/UDP-Fragment-Reassembly-Demo-Project
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)