1.首先pom引入websocket

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

SpringBoot和WebSocket是没有token鉴权的,需要自己定义实现WebSocket建立连接时的鉴权,websocket的实现方式就和一般博客看到的不一样了,本文自定义websocket配置:WebSocketConfig implements WebSocketConfigurer,

websocket的拦截器:CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor,

websocket的Handler:CustomWebSocketHandler extends TextWebSocketHandler

2.配置WebSocketConfig

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private CustomWebsocketInterceptor customWebsocketInterceptor;

    @Resource
    private CustomWebSocketHandler customWebSocketHandler;


    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                // 设置处理器处理/custom/**
                .addHandler(customWebSocketHandler,"/wsTest/websocket")
                // 允许跨越
                .setAllowedOrigins("*")
                // 设置监听器
                .addInterceptors(customWebsocketInterceptor);
    }

}

建立连接的url
wss://websocket.test.com/wsTest/websocket
注意,使用wss调用需要在nginx配置ssl证书,否则部署到服务器是无法建立websocket连接的。

3.Nginx配置如下,

upstream wsTestServer {
	ip_hash;
	server 10.*.*.92:8008;
	server 10.*.*.92:8008;
}

server {
        # cloudflare
		listen 80;
        listen       443 ssl;
        server_name  websocket.test.com;
        # 证书路径:
        ssl_certificate      /etc/nginx/cert/ssl-c.crt;
        ssl_certificate_key  /etc/nginx/cert/ssl-k.key;

        ssl_session_cache    shared:SSL:1m;
        ssl_session_timeout  5m;

        ssl_ciphers  HIGH:!aNULL:!MD5;
        ssl_prefer_server_ciphers  on;
		
		location  /wsTest/websocket {
			proxy_pass http://wsTestServer;
			
			proxy_set_header  Host $host;
			proxy_http_version 1.1;
			proxy_set_header X-Client-IP $remote_addr;
			proxy_set_header Upgrade $http_upgrade;
			proxy_set_header Connection "upgrade";
			proxy_set_header X-Real-IP $remote_addr;
			proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $http_host;
            proxy_set_header Upgrade $http_upgrade;
			proxy_connect_timeout 60;
			proxy_read_timeout 600;

			#允许跨域请求的域,* 代表所有
			add_header 'Access-Control-Allow-Origin' *;
			#允许带上cookie请求
			add_header 'Access-Control-Allow-Credentials' 'true';
			#允许请求的方法,比如 GET/POST/PUT/DELETE
			add_header 'Access-Control-Allow-Methods' *;
			#允许请求的header
			add_header 'Access-Control-Allow-Headers' *;
		}
}

其中要监听listen 80端口,尤其注意的是要注意这下面2个配置必须加上,否则建立连接成功立刻就会断掉。
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;

从nginx配置看,进行了负载均衡,服务进行了集群部署,项目部署在多台机器上,如此,当一个客户端用户和一台机器建立了链接,客户端自然是不会再去和其他机器建立连接,同时一般建立连接成功时,还会断开旧的连接,这样就不能和新的机器建立连接。
解决方案
1、使用消息中间件解决websocket session共享问题。
2、使用redis的发布订阅模式解决
我采用了方案2,

收到客户端发送到服务器发给接收者的消息时,

调用:

JSONObject obj = new JSONObject(); 
obj.put("title", "我是标题");
obj.put("smsContent","我是内容");
customWebSocketHandler.dealNodeSession(userId, obj);

    public void dealNodeSession(String uid, JSONObject message){
        boolean isSendMessage = sendMessage(uid, message.toJSONString());
        // 如果当前服务发送成功就不再同志其他服务订阅发送
        if (isSendMessage){
            log.info("连接对象【{}】在本服务发送消息",uid);
            return;
        }
        log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userId",uid);
            jsonObject.put("message",message);
            redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
        } catch (Exception e) {
           log.error("dealNodeSession Exception: {}",e);
        }
    }

4.Redis发布订阅的配置

@Configuration
public class MessageConfig {

    /**
     * 配置 RedisMessageListenerContainer 监听器容器
     *
     * @param connectionFactory 连接工厂
     * @param listenerAdapter   消息侦听器适配器
     * @return
     */

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 消息主题列表
        List<Topic> topicList = new ArrayList<>();
        // 订阅 “wsTopic” 频道
        topicList.add(new ChannelTopic(RedisConstants.WETECH_REDIS_CHANNEL));
        // 订阅 “wsTopic2” 模式
        // topicList.add(new PatternTopic((RedisConstants.REDIS_CHANNEL2));
        // 将消息侦听器添加到容器中
        container.addMessageListener(listenerAdapter, topicList);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        // 消息监听适配器
        return new MessageListenerAdapter(receiver, "onMessage");
    }
}

5.Redis订阅监听

/**
 * redis消息监听对象,接收订阅消息
 */
@Component
@Slf4j
public class RedisReceiver implements MessageListener {

    @Autowired
    private CustomWebSocketHandler customWebSocketHandler;

    /**
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
       // log.info("redisReceiver onMessage message: {}",message);
        String channel = new String(message.getChannel());// 订阅的频道名称
        String msg = "";
        try {
            msg = new String(message.getBody(), Constants.UTF8);//注意与发布消息编码一致,否则会乱码
            if (!StringUtils.isEmpty(msg)) {
                if (RedisConstants.WETECH_REDIS_CHANNEL.equals(channel)) {  // 最新消息
                   String replaceAll = msg.replaceAll("\\\\", "");

                    JSONObject jsonObject = JSONObject.parseObject(replaceAll);
                    if (jsonObject == null) {
                        log.info("redisReceiver onMessage redisMessage is null");
                        return;
                    }
                     String userId = (String) jsonObject.get("userId");
                    JSONObject obj=jsonObject.getJSONObject("message");
                    log.info("redisReceiver onMessage userId: {}",userId);
                    customWebSocketHandler.sendMessage(userId, obj.toJSONString());
                } else {
                    // 其他订阅的消息处理
                }
            } else {
                log.info("redisReceiver onMessage msg is null");
            }
        } catch (Exception e) {
            log.error("redisReceiver exception:" + e.toString());
        }
    }
}

6.websocket的拦截器:CustomWebsocketInterceptor


/**
 * @author wangchengfeng
 * 用来处理webscocket拦截器
 */
@Component
@Slf4j
public class CustomWebsocketInterceptor extends HttpSessionHandshakeInterceptor {

    @Autowired
    private UserService userService;

    /**
     * 建立连接时
     *
     * @param request    the current request
     * @param response   the current response
     * @param wsHandler  the target WebSocket handler
     * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
     *                   session; the provided attributes are copied, the original map is not used.
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler
            , Map<String, Object> attributes) {
        try {
            ServletServerHttpRequest req = (ServletServerHttpRequest) request;
            ServletServerHttpResponse res = (ServletServerHttpResponse) response;
            String token = req.getServletRequest().getHeader("Sec-WebSocket-Protocol");
            if (StringUtils.isBlank(token)) {
                log.error("CustomWebsocketInterceptor beforeHandshake token is empty");
                return false;
            }

            UserInfo model = userService.queryUserInfoByToken(token);
            if (model == null) {
                log.error("CustomWebsocketInterceptor beforeHandshake userInfoModel is empty");
                return false;
            }

            attributes.put("userId", model.getUid());
            log.info("attributes:{}", attributes);
            //在后端握手时设置一下请求头(Sec-WebSocket-Protocol),前端发来什么授权值,这里就设置什么值,不设置会报错导致建立连接成功后立即被关闭
            res.getServletResponse().setHeader("Sec-WebSocket-Protocol", token);

            /**
             * 鉴权: return false 不通过
             *  response.setStatusCode(HttpStatus.UNAUTHORIZED);
             *  return false;
             */
            super.setCreateSession(true);
            return super.beforeHandshake(request, response, wsHandler, attributes);
        } catch (Exception e) {
            log.info("beforeHandshake Exception:{}", e);
        }
        return false;
    }

    /**
     * 成功建立连接后
     *
     * @param request   the current request
     * @param response  the current response
     * @param wsHandler the target WebSocket handler
     * @param exception an exception raised during the handshake, or {@code null} if none
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                               Exception exception) {
        log.info("连接成功....");
        //其他业务代码
        super.afterHandshake(request, response, wsHandler, exception);
    }
}

7.websocket的处理器CustomWebSocketHandler

@Slf4j
@Component
public class CustomWebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private RedisService redisService;

    /**
     * 当前websocket连接集合
     */
    public static final ConcurrentHashMap<String, WebSocketSession> WEB_SOCKET_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * 收到客户端消息时触发的回调
     *
     * @param session 连接对象
     * @param message 消息体
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        log.info("接受到消息【{}】的消息:{}", session.getId(), message.getPayload());
        String messagePayload = message.getPayload();
        JSONObject jsonObject = JSONObject.parseObject(messagePayload);
        if (jsonObject != null && jsonObject.get("wsMsgType").equals("heartBeat")) {
            String uid = (String) jsonObject.get("uid");
            JSONObject obj = new JSONObject();
            obj.put("heartBeatResult", "done");
            dealNodeSession(uid, obj);
        }
    }

    /**
     * 建立连接后触发的回调
     *
     * @param session 连接对象
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        try {
            String sessionId = getSessionId(session);
            if (StringUtils.isBlank(sessionId)){
                log.error("建立了连接时 sessionId is null");
                return;
            }
            // 如果存在则断开连接
            if (WEB_SOCKET_SESSION_MAP.containsKey(sessionId)) {
                WEB_SOCKET_SESSION_MAP.get(sessionId).close();
                WEB_SOCKET_SESSION_MAP.remove(sessionId);
            }
            // 将新连接添加
            WEB_SOCKET_SESSION_MAP.put(sessionId, session);
            log.debug("与【{}】建立了连接", sessionId);
            //sendMessage(sessionId, sessionId);
            log.debug("attributes:{}", session.getAttributes());
        } catch (IOException e) {
            log.error("建立连接后触发的回调【{}】,status:{}", getSessionId(session));
        }
    }

    /**
     * 断开连接后触发的回调
     *
     * @param session 连接对象
     * @param status  状态
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        log.info("连接对象【{}】断开连接,status:{}", getSessionId(session), status.getCode());
        try {
            String sessionId = getSessionId(session);
            if (StringUtils.isBlank(sessionId)){
                return;
            }
            // 关闭连接
            session.close(CloseStatus.SERVER_ERROR);
            // 删除对象
            WEB_SOCKET_SESSION_MAP.remove(sessionId);
        } catch (IOException e) {
            log.error("连接对象【{}】断开连接,IOException :{}", e);
        }
    }

    /**
     * 传输消息出错时触发的回调
     *
     * @param session   连接对象
     * @param exception 异常
     * @throws Exception 异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
          log.info("连接对象【{}】发生错误,exception:{}", session.getId(), exception.getMessage());
        // 如果发送异常,则断开连接
        String sessionId = getSessionId(session);
        if (StringUtils.isBlank(sessionId)){
            log.error("建立了连接时 sessionId is null");
            return;
        }
        try {
            if (session.isOpen()) {
                session.close();
                WEB_SOCKET_SESSION_MAP.remove(sessionId);
            }
        } catch (IOException e) {
            log.error("连接对象【{}】发生错误,IOException :{}", e);
        }
    }

    /**
     * 自定义判断 sessionId
     *
     * @param session 连接对象
     * @return sessionId
     */
    private String getSessionId(WebSocketSession session) {
        return (String) session.getAttributes().get("userId");
    }

    public void dealNodeSession(String uid, JSONObject message){
        boolean isSendMessage = sendMessage(uid, message.toJSONString());
        // 如果当前服务发送成功就不再同志其他服务订阅发送
        if (isSendMessage){
            log.info("连接对象【{}】在本服务发送消息",uid);
            return;
        }
        log.info("连接对象【{}】在本服务未建立连接,通知其他服务发送消息",uid);
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userId",uid);
            jsonObject.put("message",message);
            redisService.convertAndSend(RedisConstants.WETECH_REDIS_CHANNEL, jsonObject);
        } catch (Exception e) {
           log.error("dealNodeSession Exception: {}",e);
        }
    }

    /**
     * 发送消息
     *
     * @param sessionId 对象id
     * @param message   消息
     * @throws IOException IO
     */
    public boolean sendMessage(String sessionId, String message) {
        WebSocketSession webSocketSession = WEB_SOCKET_SESSION_MAP.get(sessionId);
        if (webSocketSession == null) {
            log.error("连接对象【{}】未在该服务建立连接,不发送消息{}", sessionId,message);
            return false;
        }
        if (!webSocketSession.isOpen()) {
            log.error("连接对象【{}】已关闭 无法送消息:{}", sessionId, message);
            return false;
        } else {
            try {
                webSocketSession.sendMessage(new TextMessage(message));
                log.info("sendMessage:向{}发送消息", sessionId);
                return true;
            } catch (IOException e) {
                log.error("sendMessage IOException:向{}发送消息:{}", sessionId, message);
                return false;
            }
        }
    }

    /**
     * 获取所有的连接对象ID
     *
     * @return ids
     */
    public List<String> getSessionIds() {
        Enumeration<String> keys = WEB_SOCKET_SESSION_MAP.keys();
        List<String> ks = new ArrayList<>();
        while (keys.hasMoreElements()) {
            ks.add(keys.nextElement());
        }
        return ks;
    }
}

8.向接收者发送websocket通知

@RestController
@RequestMapping("/chat")
@Slf4j
public class ChatController {

    @Resource
    private CustomWebSocketHandler customWebSocketHandler;

    /**
     * 添加聊天记录,web
     */
    @PostMapping("/add/record")
    public void addChatRecord(@RequestBody AddChatRecordDto dto) {
        JSONObject obj = new JSONObject();
        obj.put("chatDialogResult", dto);
        customWebSocketHandler.dealNodeSession(dto.getUid(), obj);
    }
}

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐