前言

完整代码和演示demo在最后面

通过本文的学习,将能够(可能):

  1. 解UDP与TCP的核心差异及其适用场景
  2. 掌握NIO Reactor模型的设计原理和实现方法
  3. 构建UDP通信框架
  4. 实现分片协议和心跳机制

一、UDP和TCP的差异

维度 TCP UDP
连接 有连接,一个 Channel 对应一个客户端 无连接,一个 Channel 收发所有设备
读写 字节流,需处理粘包/半包 数据报,一个包就是一个完整单元
流控 自带滑动窗口 无流控,send 满则静默丢包
服务端 ServerSocketChannel + SocketChannel 只需一个 DatagramChannel

关键认知:UDP 没有“连接”,服务端不维护连接状态。一个DatagramChannel可以收发任意设备的数据,设备地址从receive()的返回值获取

二、NIO Reactor模型

UDP的Reactor骨架和TCP类似,都是绑定注册创建selector,劫持select。。。

 channel = DatagramChannel.open();
 channel.configureBlocking(false);
 channel.bind(new InetSocketAddress(PORT));
 selector = Selector.open();
 channel.register(selector, SelectionKey.OP_READ);
 new Thread(this::iterateSelector, "selector-thread").start();
 
private void iterateSelector(){
        while(!Thread.currentThread().isInterrupted()){
            try {
                if (selector.select(100) == 0 ){
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                    ..........
                    }
                   }
                  }catch(IOException e){
                throw new RuntimeException(e);
            }
        }
    }

三、客户端模板

这里展现的是简单的udp客户端模板,包含守护线程接受消息,主动发送包装好的消息给服务端

    private static final int PORT = 8888;
    private static final String HOST = "127.0.0.1";
    private static final int BUFFER_SIZE = 1024;
    // 定义分包大小
    private static final int MTU_SIZE = 500;
    // 定义消息头大小
    private static final int HEADER_SIZE = 7;
    // 定义负载body大小
    private static final int MAX_PAYLOAD = MTU_SIZE - HEADER_SIZE;
    // 全局消息ID
    private static int THE_MESSAGE_ID = 0;
public static void main(String[] args) throws IOException {
        // 创建UDP通道(非阻塞模式)
        DatagramChannel channel = DatagramChannel.open();
        channel.configureBlocking(false);
        // 连接服务端(UDP的connect只是设置默认目标地址,不建立真正连接)
        channel.connect(new InetSocketAddress(HOST, PORT));
        System.out.println("UDP客户端启动成功,准备发送数据...");
        
        // 启动读线程:接收服务端响应
        Thread readThread = new Thread(() -> {
            ByteBuffer readBuf = ByteBuffer.allocate(BUFFER_SIZE);
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    readBuf.clear();
                    // 接收数据(非阻塞,立即返回)
                    InetSocketAddress sender = (InetSocketAddress) channel.receive(readBuf);
                    
                    if (sender != null && readBuf.position() > 0) {
                        readBuf.flip();
                        byte[] data = new byte[readBuf.remaining()];
                        readBuf.get(data);
                        String msg = new String(data, StandardCharsets.UTF_8);
                        System.out.println("\n[收到响应] 来自: " + sender + " | 内容: " + msg);
                        System.out.print("[发送消息] ");
                    }
                    
                    // 短暂休眠,避免CPU空转
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (IOException e) {
                System.err.println("读取异常: " + e.getMessage());
            }
        }, "udp-read-thread");
        readThread.setDaemon(true);
        readThread.start();
        
        // 主线程负责写:读取用户输入并发送
        Scanner scanner = new Scanner(System.in);
        System.out.print("[发送消息] ");
        
        while (scanner.hasNextLine()) {
            String line = scanner.nextLine();
            if (line.isEmpty()) {
                System.out.print("[发送消息] ");
                continue;
            }
            
            // 退出命令
            if ("exit".equalsIgnoreCase(line) || "quit".equalsIgnoreCase(line)) {
                System.out.println("正在关闭客户端...");
                break;
            }

            try {
                // 复制为字节数组
                byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
                int byLen = bytes.length;
                int totalChunk = (int) Math.ceil((double) byLen / MAX_PAYLOAD);
                int offset = 0;
                if(byLen <= MTU_SIZE){
                    ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + byLen);
                    buf.putShort((short)THE_MESSAGE_ID); // 唯一消息号
                    buf.putShort((short) 0);         // 当前分片号
                    buf.putShort((short) totalChunk);// 总分片数
                    buf.put((byte) 1);               // 结束标记
                    buf.put(bytes);
                    buf.flip();
                    int zSend = channel.write(buf);
                    System.out.println("[直接发送] 总分片:"+totalChunk+" 共"+zSend+"字节");
                }else{
                    int seq = 0;
                    while(offset < byLen){
                        // 计算本次切片所需长度
                        int nowLen = Math.min(MAX_PAYLOAD,byLen - offset);
                        // 初始化本次切片
                        byte[] chunk = new byte[nowLen];
                        System.arraycopy(bytes,offset,chunk,0,nowLen);
                        // 修改偏移量
                        offset += nowLen;
                        // 发送切片数据
                        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + nowLen);
                        buf.putShort((short)THE_MESSAGE_ID);
                        buf.putShort((short) seq);
                        buf.putShort((short) totalChunk);
                        byte isLast = (offset >= byLen) ? (byte) 1 : 0;
                        buf.put(isLast);
                        buf.put(chunk);
                        buf.flip();
                        channel.write(buf);
                        System.out.println("[分片发送] 序号:"+seq+" 长度:" + nowLen + " 字节");
                        seq++;
                    }
                    // 统计是否完成发送,共发送数据
                    System.out.println("[发送完成] 总计发送 " + byLen + " 字节");
                }
                THE_MESSAGE_ID ++;
            } catch (IOException e) {
                System.err.println("发送失败: " + e.getMessage());
            }
        }
        
        // 清理资源
        scanner.close();
        readThread.interrupt();
        channel.close();
        System.out.println("客户端已关闭");
    }

四、心跳处理

服务端心跳处理是为了表明设备还活着。在UDP中,心跳不是为了检测连接状态,而是让服务端维护一个“活跃设备列表”

    // 客户地址记录
    private static final ConcurrentHashMap<Object, Long> clientAddresses = new ConcurrentHashMap<>();
    // 在服务端处理SelectionKey中
    clientAddresses.put(receive, nowTime);
    //在服务端启动后,创建定时线程
    ScheduledExecutorService cleanPool = new ScheduledThreadPoolExecutor(
                1,
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );
     long timeout = 10 * 1000;
        cleanPool.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            clientAddresses.entrySet().removeIf(entry -> {

                Long ctx = entry.getValue();
                System.out.println("对象:"+ entry.getKey()+ "已连接了" + (now - ctx)/1000 + "秒");
                return (now - ctx) > timeout;
            });
            // 这里先不管,是后面的清理分包缓存
            server.udpGroupServer.clearTimeout();
        }, 0, 60, TimeUnit.SECONDS);

五、分片协议设计

5.1 协议头

[消息ID 2B][分片号 2B][总分片数 2B][结束标记 1B][数据体 xB]
  • 消息ID:客户端持有会话时,每次消息都会有个新的消息ID。服务端可以拼接消息ID和客户端地址得到唯一字段。
  • 分片号:从 0 开始
  • 总分片数:每片都携带,服务端可预分配缓存
  • 结束标记:0=中间片,1=最后一片

5.2 客户端分片

  byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
  int byLen = bytes.length;
  int totalChunk = (int) Math.ceil((double) byLen / MAX_PAYLOAD);
  int offset = 0;
  if(byLen <= MTU_SIZE){
      ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + byLen);
      buf.putShort((short)THE_MESSAGE_ID); // 唯一消息号
      buf.putShort((short) 0);         // 当前分片号
      buf.putShort((short) totalChunk);// 总分片数
      buf.put((byte) 1);               // 结束标记
      buf.put(bytes);
      buf.flip();
      int zSend = channel.write(buf);
  }

5.3 服务端重组

public void handleReceive(InetSocketAddress sender, int clientLocalMsgId, short totalChunk, short seq, byte[] body) {
        // 1. 构建唯一Key
        if (body == null || body.length == 0) {
            return;
        }

        ClientMsgKey key = new ClientMsgKey(sender, clientLocalMsgId);

        // 2. 获取、创建外层分组对象
        UDPChunkGroup group = globalMsgCache.computeIfAbsent(key, k -> new UDPChunkGroup(k, totalChunk));
        UDPCombinationDTO dto = group.getCombination();

        // 3. 存入分片
        dto.getChunkMap().put(seq, body);

        // 4. 判断是否收齐
        if (dto.getChunkMap().size() == dto.getTotalChunk()) {
            group.setStatus(1); // 标记已完成
            Map<Short, byte[]> chunkMap = dto.getChunkMap();
            StringBuilder fullMsg = new StringBuilder();

            for (short i = 0; i < totalChunk; i++) {
                byte[] part = chunkMap.get(i);
                if (part != null) {
                    String partStr = new String(part, StandardCharsets.UTF_8);
                    fullMsg.append(partStr);
                }
            }
            String completeMsg = fullMsg.toString();
            System.out.printf("【消息重组完成】来源:%s | 内容:%s%n", sender, completeMsg);

            globalMsgCache.remove(key);
        }
    }
  • 用 InetSocketAddress + msgId 做复合键
  • computeIfAbsent 保证同一消息的分片进入同一缓存
  • 判断收齐:chunkMap.size() == 总分片数

六、超时处理

public void clearTimeout() {
        long now = System.currentTimeMillis();
        globalMsgCache.entrySet().removeIf(entry -> {
            UDPChunkGroup group = entry.getValue();
            UDPCombinationDTO dto = group.getCombination();
            boolean isTimeout = now - dto.getCreateTs() > TIMEOUT;
            boolean isCollecting = group.getStatus() == 0;
            if (isTimeout && isCollecting) {
                group.setStatus(2); // 标记为超时丢弃
                System.out.printf("【消息超时丢弃】客户端:%s%n", entry.getKey().getAddress());
                return true;
            }
            return false;
        });
    }

七、ACK回复

UDP发送消息给客户端很简单,不需要写像TCP那样复杂的写队列。但是由于UDP本身的特性,发送消息是取决于业务的,不是强制的

ByteBuffer ack = ByteBuffer.allocate(5);
ack.putShort(seq);
ack.put((byte) 0x01);
ack.flip();
channel.send(ack, sender);
  • sender是先前receive()获取的客户端地址

八、完整架构

UDP包 → Selector线程 → 心跳 → 更新活跃表
                     → 数据 → 解析协议头 → 分片缓存
                                            ↓
                                      size == totalChunk ?
                                            ↓
                                      组装完整消息
                                            ↓
                                    策略模式 → 业务处理

九、其他

  1. UDP 无连接:服务端只维护“活跃时间”记录
  2. 心跳和数据分开处理:心跳在 IO 线程直接处理,不进队列
  3. 分片缓存必须有超时清理:否则设备断线导致内存泄漏
  4. 多厂家用策略模式适配:IO 层解析,消费者只处理标准消息
  5. 是否发 ACK 看业务:重要数据才需要,UDP 发 ACK 不费力

十、完整代码

public class UDPGroupServer {
    public static class ClientMsgKey{
        @Getter
        private final InetSocketAddress address;
        private final int id;
        public ClientMsgKey(InetSocketAddress address, int id) {
            this.address = address;
            this.id = id;
        }
        @Override
        public int hashCode() {
            return Objects.hash(address, id);
        }
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            ClientMsgKey other = (ClientMsgKey) obj;
            return Objects.equals(address, other.address) && id == other.id;
        }
    }
    public static class UDPCombinationDTO {
        @Getter
        private final Short totalChunk;
        @Getter
        private final Map<Short, byte[]> chunkMap;
        @Getter
        private final long createTs;

        public UDPCombinationDTO(Short total) {
            this.totalChunk = total;
            this.chunkMap = new ConcurrentHashMap<>();
            this.createTs = System.currentTimeMillis();
        }
    }
    // 总合包
    public static class UDPChunkGroup {
        // 唯一标识Key
        private final ClientMsgKey msgKey;
        // 分片聚合主体
        private final UDPCombinationDTO combination;

        // 重传次数(丢包时记录)
        private int retryCount = 0;
        // 消息状态:0-收集中 1-已完成 2-已超时丢弃
        @Setter
        private int status = 0;

        public UDPChunkGroup(ClientMsgKey key, Short totalChunk) {
            this.msgKey = key;
            this.combination = new UDPCombinationDTO(totalChunk);
        }

        public ClientMsgKey getMsgKey() { return msgKey; }
        public UDPCombinationDTO getCombination() { return combination; }
        public int getRetryCount() { return retryCount; }
        public int getStatus() { return status; }

        // 状态/重试次数修改方法(可控修改,比直接set更安全)
        public void incRetryCount() {
            this.retryCount++;
        }

    }
    private final Map<ClientMsgKey, UDPChunkGroup> globalMsgCache = new ConcurrentHashMap<>();
    // 超时阈值
    private static final long TIMEOUT = 3000;
    public void handleReceive(InetSocketAddress sender, int clientLocalMsgId, short totalChunk, short seq, byte[] body) {
        // 1. 构建唯一Key
        if (body == null || body.length == 0) {
            return;
        }

        ClientMsgKey key = new ClientMsgKey(sender, clientLocalMsgId);

        // 2. 获取、创建外层分组对象
        UDPChunkGroup group = globalMsgCache.computeIfAbsent(key, k -> new UDPChunkGroup(k, totalChunk));
        UDPCombinationDTO dto = group.getCombination();

        // 3. 存入分片
        dto.getChunkMap().put(seq, body);

        // 4. 判断是否收齐
        if (dto.getChunkMap().size() == dto.getTotalChunk()) {
            group.setStatus(1); // 标记已完成
            Map<Short, byte[]> chunkMap = dto.getChunkMap();
            StringBuilder fullMsg = new StringBuilder();

            for (short i = 0; i < totalChunk; i++) {
                byte[] part = chunkMap.get(i);
                if (part != null) {
                    String partStr = new String(part, StandardCharsets.UTF_8);
                    fullMsg.append(partStr);
                }
            }
            String completeMsg = fullMsg.toString();
            System.out.printf("【消息重组完成】来源:%s | 内容:%s%n", sender, completeMsg);

            globalMsgCache.remove(key);
        }
    }
    public void clearTimeout() {
        long now = System.currentTimeMillis();
        globalMsgCache.entrySet().removeIf(entry -> {
            UDPChunkGroup group = entry.getValue();
            UDPCombinationDTO dto = group.getCombination();
            boolean isTimeout = now - dto.getCreateTs() > TIMEOUT;
            boolean isCollecting = group.getStatus() == 0;
            if (isTimeout && isCollecting) {
                group.setStatus(2); // 标记为超时丢弃
                System.out.printf("【消息超时丢弃】客户端:%s%n", entry.getKey().getAddress());
                return true;
            }
            return false;
        });
    }
}
public class UDPNIOServerDemo {
    private static final int PORT = 8888;
    private static final String HOST = "127.0.0.1";
    private static final int BUFFER_SIZE = 1024;
    private final ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
    private DatagramChannel channel;
    private Selector selector;
    private final ExecutorService businessPool;
    private final UDPGroupServer udpGroupServer;
    // 客户地址记录
    private static final ConcurrentHashMap<Object, Long> clientAddresses = new ConcurrentHashMap<>();
    UDPNIOServerDemo() {
        this.udpGroupServer = new UDPGroupServer();
        this.businessPool = new ThreadPoolExecutor(
                2,
                5,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy()
        );
        // 预创建所有核心线程,避免首次任务提交时的线程创建开销
        ((ThreadPoolExecutor) businessPool).prestartAllCoreThreads();
    }
    private void start() throws IOException {
        channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(PORT));
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_READ);
        new Thread(this::iterateSelector, "selector-thread").start();
    }
    private void iterateSelector(){
        while(!Thread.currentThread().isInterrupted()){
            try {
                if (selector.select(100) == 0 ){
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        buf.clear();
                        SocketAddress receive = channel.receive(buf);
                        buf.flip();
                        short mesId = buf.getShort();
                        short seq = buf.getShort();
                        short totalChunk = buf.getShort();
                        byte isLast = buf.get();


                        // 复制数据到新的byte数组,避免多线程共享缓冲区
                        byte[] data = new byte[buf.remaining()];
                        buf.get(data);
                        String content = new String(data, StandardCharsets.UTF_8);
                        Long nowTime = System.currentTimeMillis();
                        clientAddresses.put(receive, nowTime);
                        // 使用execute(),因为不需要返回值
                        businessPool.execute(() -> {
                            udpGroupServer.handleReceive((InetSocketAddress) receive,  mesId,totalChunk, seq, data);
                        });
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        UDPNIOServerDemo server = new UDPNIOServerDemo();
        try {
            server.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        ScheduledExecutorService cleanPool = new ScheduledThreadPoolExecutor(
                1,
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );
        long timeout = 10 * 1000;
        cleanPool.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            clientAddresses.entrySet().removeIf(entry -> {

                Long ctx = entry.getValue();
                System.out.println("对象:"+ entry.getKey()+ "已连接了" + (now - ctx)/1000 + "秒");
                return (now - ctx) > timeout;
            });
            server.udpGroupServer.clearTimeout();
        }, 0, 60, TimeUnit.SECONDS);
    }
}

26.6.11更新: 完整springboot UDP Demo
https://gitee.com/Xia-cuan/udp-fragment-reassembly-demo
https://github.com/Xiacqi1/UDP-Fragment-Reassembly-Demo-Project

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐