最近在用消息队列处理可能会时间较长的任务,用户在提交后可以先做别的,处理的过程会异步完成,完成后及时通知用户。如何实现?

首先分析一下我们的需求:(1)即时通信,我不知道什么时候需要发送消息,所以希望双方能提前建立连接或者保持长时间的连接(2)需要服务器端主动向客户端发送消息。

1 什么是websocket?

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。非常适合作为服务器推送的技术。

参考SpringBoot2.0集成WebSocket,实现后台向前端推送信息_springboot集成websocket-CSDN博客

Spring官网关于websocket的guide

Getting Started | Using WebSocket to build an interactive web application (spring.io)

2 为什么需要 WebSocket?

HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。HTTP 在每次请求结束后都会主动释放连接,因此HTTP连接是一种“短连接”。要保持客户端程序的在线状态,就需要向服务器轮询,即不断地向服务器发起连接请求,举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此WebSocket 就是这样发明的。

3 WebSocket的特点

1)客户端发起的连接建立在 TCP 协议之上,服务器端的实现比较容易。 2)与 HTTP 协议有着良好的兼容性,握手阶段采用 HTTP 协议。做一个握手的动作浏览器和服务器之间就创建了持久性的连接,两者之间就直接可以进行双向数据传输。 3)数据格式比较轻量,性能开销小,通信高效。 4)可以发送文本,也可以发送二进制数据。 5)没有同源限制,客户端可以与任意服务器通信。 6)协议标识符是 ws(如果加密,则为wss),服务器网址就是 URL。

4 与其他交互技术的对比

1)Http轮询 长轮询就是客户端按照一个固定的时间定期向服务器发送请求,通常这个时间间隔的长度受到服务端的更新频率和客户端处理更新数据时间的影响。这种方式缺点很明显,就是浏览器要不断发请求到服务器以获取最新信息,造成服务器压力过大,占用宽带资源。 2)使用 streaming AJAX streaming ajax是一种通过 ajax 实现的长连接维持机制。主要目的就是在数据传输过程中对返回的数据进行读取,并且不会关闭连接。 3)iframe方式 iframe 可以在页面中嵌套一个子页面,通过将 iframe 的 src 指向一个长连接的请求地址,服务端就能不断往客户端传输数据。

很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出 HTTP 请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而 HTTP 请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

参考:WebSocket 学习_websocket流程-CSDN博客

4 落地实现

后端:

(1)依赖包

Spring提供的websocket启动包

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-websocket</artifactId>  
</dependency> 

(2)配置ServerEndpointExportor,ServerEndpointExporter是由Spring官方提供的标准实现,用于扫描配置类ServerEndpointCOnfig和@ServerEndpoint注解实例, 让客户端能连上你的服务端。

@Configuration  
public class WebSocketConfig {  
	
    @Bean  
    public ServerEndpointExporter serverEndpointExporter() {  
        return new ServerEndpointExporter();  
    }  
  
}

(3)后端建立Server

@ServerEndpoint("/wsServer/{userId}")// 客户端连接到服务端的地址,并且可以传递传输
@Component
@Slf4j
public class WebSocketServer {
}

属性:

1 OnlineCount:【静态变量】记录当前在线连接数(线程安全的)

private static AtomicInteger onlineCount = new AtomicInteger(0);

2 webSocketMap:【静态变量】记录用户ID与对应的客户端Socket对象(线程安全的)

private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

3 其他属性:session、userId建立连接时传入

private String userId;
private Session session;// 用于向客户端发送消息

方法:

/**
* 连接建立成功调用的方法
* */
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
    this.session = session;
    this.userId=userId;
    if(webSocketMap.containsKey(userId)){
        webSocketMap.remove(userId);
        webSocketMap.put(userId,this);
        //加入map中
    }else{
        webSocketMap.put(userId,this);
        //加入set中
        addOnlineCount();
        //在线数加1
    }
    log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());

    try {
        sendMessage("连接成功");
    } catch (IOException e) {
        log.error("用户:"+userId+",网络异常!!!!!!");
    }
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
    if(webSocketMap.containsKey(userId)){
        webSocketMap.remove(userId);
        //从set中删除
        subOnlineCount();
    }
    log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
    log.info("用户消息:"+userId+",报文:"+message);
    //可以群发消息
    //消息保存到数据库、redis
    if(StringUtils.isNotBlank(message)){
        try {
            //解析发送的报文
            JSONObject jsonObject = JSON.parseObject(message);
            //追加发送人(防止串改)
            jsonObject.put("fromUserId",this.userId);
            String toUserId=jsonObject.getString("toUserId");
            //传送给对应toUserId用户的websocket
            if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
            }else{
                log.error("请求的userId:"+toUserId+"不在该服务器上");
                //否则不在这个服务器上,发送到mysql或者redis
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
    log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
    error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
    this.session.getBasicRemote().sendText(message);
}

/**
* 群发自定义消息
*/
public void sendMessage(String message, String userId){
    log.info("推送消息到客户端 " + userId + ",推送内容:" + message);
    try {
        WebSocketServer webSocketServer = webSocketMap.get(userId);
        if (webSocketServer != null) {
            webSocketServer.sendMessage(message);
        } else {
            log.error("请求的userId:" + userId + "不在该服务器上");
        }
    } catch (IOException e) {
        throw new BusinessException(ErrorCode.SYSTEM_ERROR,"推送消息失败");
    }
}

/**
* 发送自定义消息
* */
public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
    log.info("发送消息到:"+userId+",报文:"+message);
    if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
        webSocketMap.get(userId).sendMessage(message);
    }else{
        log.error("用户"+userId+",不在线!");
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount.get();
}

public static synchronized void addOnlineCount() {
    WebSocketServer.onlineCount.getAndAdd(1);
}

public static synchronized void subOnlineCount() {
    WebSocketServer.onlineCount.getAndAdd(-1);
}

前端:(Typescript react)

import { message } from 'antd';
import {useEffect} from 'react';

const WebSocketComponent = (userId: any) => {

  useEffect(() => {
    const id = userId.userId;
    console.log(userId)
    // 创建WebSocket连接
    const url = 'ws://127.0.0.1:6848/wsServer'+id
    console.log(url)
    const socket = new WebSocket(url);
    if(!socket){
        alert("您的浏览器不支持WebSocket协议!");
    }
    // 处理连接成功事件
    socket.onopen = () => {
      console.log('WebSocket连接已打开');
      socket.send('Hello, WebSocket!'); // 发送一条消息
      console.log('已发送消息');
    };

    // 处理接收到消息事件
    socket.onmessage = (event) => {
      const messageContent = event.data;
      console.log('收到消息:', messageContent);
      // 使用Ant Design的message组件显示消息
      message.success(messageContent,5)
      // 在此处处理接收到的消息
      // return <Notification message={"您的图表生成完成"} description={"请前往`我的图表页面进行查看详情`"}/>
    };

    // 处理连接关闭事件
    socket.onclose = () => {
      socket.send('Hello, WebSocket!'); // 发送一条消息
      console.log('已发送消息');
      console.log('WebSocket连接已关闭');
    };

    // 处理错误事件
    socket.onerror = (error) => {
      console.error('WebSocket发生错误:', error);
    };

    // 在组件卸载时关闭WebSocket连接
    return () => {
      socket.close();
    };
  }, [userId]);

  return;
}

export default WebSocketComponent;

5 心跳保活

任何客户端-服务端模式的长连接通信都需要确认对方的状态,是处于长时间空闲还是挂了,挂了就需要关闭连接避免长时间占用造成资源的浪费,或者及时重连。心跳保活是常用的一种方式,一般是一方每隔一段时间发送一段空数据,如果能接收到对方的回复则重置心跳时间,如果超时就需要断开连接或重连。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐