业务需要使用到一个websocket
于时创建了如下的websocket接口
在这里插入图片描述

使用测试工具测试
测试工具

Debug

本地测试正常,于时打到了服务器上,发现请求没有响应;
原来是没有配置nginx反向代理,
但是请求还是不通,再次debug,
发现请求被spring拦截了,不对啊。websocket怎么会进拦截器呢,查询得知在转发ws请求的时候需要把ws对http的升级给传递下去

然后对服务器上的nginx配置进行了修改,
如下

        location /infoCar/ {
            client_max_body_size  300m;
            proxy_connect_timeout 300s;
            proxy_send_timeout 300s;
            proxy_read_timeout 300s;
            proxy_http_version 1.1; // 添加
            proxy_set_header Upgrade "websocket"; // 添加
#           proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade"; // 添加
            proxy_pass   http://192.168.1.9:18082;
                }
  }

发起请求:
会话成功进了请求,,流程也走下来了,但是,请求走到方法末尾的时候就显示websocket断开连接并且响应的消息也无法到达,同时断开连接的时候报错EOFF这是一个通讯流意外终止的异常;

后来才想到我这个请求经过了两个nginx的反向代理,在入口nginx的时候也需要配置http升级协议请求头;否则在响应的时候,经过入口nginx的反向代理又把里面加上的升级请求头给清除了,所以响应的时候就找不到连接造成了这个错误,
所以在入口nginx重新添加了请求头,
在这里插入图片描述

成功访问
在这里插入图片描述

websocket部分的代码笔记

/**
 * @author guochao
 */
@ServerEndpoint("/infoCard/{token}")
@Component
@Lazy
public class WebSocketServer {

    static Log log = LogFactory.get(WebSocketServer.class);
    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    private final WebSocketService webSocketService = SpringUtil.getBean(WebSocketService.class);
    private final CheckToken checkToken = SpringUtil.getBean(CheckToken.class);

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token) {
        User user = checkToken.queryUserByToken(token);
        if (StringUtils.isBlank(token) || ObjectUtils.isEmpty(user)) {
            throw new RuntimeException("token无效");
        }
        this.userId = String.valueOf(user.getId());
        this.session = session;
        log.info("连接成功,用户:{}", userId);
        System.out.println();
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            webSocketMap.put(userId, this);
            //加入set中
        } else {
            webSocketMap.put(userId, this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }
        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
        //连接成功时
        String msgJson = JSONObject.toJSONString(new WebSocketMsgView(true, 20000,
                WsRespTypeEnum.OPEN_SUCCESS.getType(), "open_success"));
        try {
            sendMessage(msgJson);
            //sendInfo(userId,msgJson);
        } catch (IOException e) {
            log.error("用户:" + userId + ",网络异常!!!!!!");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        System.out.println("连接关闭");
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token") String token) throws IOException {
        log.info("用户消息:" + userId + ",报文:" + message);
        //可以群发消息
        //消息保存到数据库、redis
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析发送的报文,封装为ws来参obj
                WebSocketParam webSocketParam = JSON.parseObject(message).toJavaObject(WebSocketParam.class);
                User user = checkToken.queryUserByToken(webSocketParam.getToken());
                //根据类型获取对应的响应信息
                WsRequest wsRequest = webSocketService.wsController(user, webSocketParam);
                //编辑接收人
                String recUser = wsRequest.getRecUser().toString();
                //生成json
                String jsonResult = JSONObject.toJSONString(wsRequest.getWebSocketMsgView());
                //发送json
                if (StringUtils.isNotBlank(recUser) && webSocketMap.containsKey(recUser)) {
                    webSocketMap.get(recUser).sendMessage(jsonResult);
                } else {
                    sendInfo(userId,JSON.toJSONString(webSocketService.userUnOnline(Long.parseLong(userId)).getWebSocketMsgView()));
                    log.error("请求的userId:" + recUser + "不在该服务器上");
                }
            } catch (JSONException e) {
                WebSocketMsgView webSocketMsgView = webSocketService.badParam(Long.parseLong(userId)).getWebSocketMsgView();
                sendInfo(userId,JSON.toJSONString(webSocketMsgView));
                log.error("报文解析异常,报文:{},会话:{},异常:",message,session,e);
            } catch (Exception e){
                log.error("websocketError:",e);
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 发送自定义消息
     */
    public static void sendInfo(@PathParam("userId") String userId,String message) throws IOException {
        log.info("发送消息到:" + userId + ",报文:" + message);
        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessage(message);
        } else {
            log.error("用户" + userId + ",不在线!");
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

config


/**
 * 开启WebSocket支持
 * @author guochao
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

业务类



@Service
@Slf4j
public class WebSocketService {


    /**
     * ws消息分发处理器
     *
     * @return
     */
    public WsRequest wsController(User user,WebSocketParam webSocketParam) {
        try {
            WsRequest wsRequest=null;
            //请求分发
            switch (webSocketParam.getType()) {
                //类型4,关闭QR信息
                case 4:
                    wsRequest = closeQr(webSocketParam);
                    break;
                //类型2,更新针对指定用户的权限
                case 3:
                    wsRequest = updateAuth(webSocketParam);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + webSocketParam.getType());
            }
            return wsRequest;
        } catch (IllegalStateException e) {
            log.error("有客户端传输了错误的类型匹配");
            return badParam(webSocketParam.getRecUserId());
        }
    }

    /**
     * 更新权限并刷新页面消息
     * @param webSocketParam
     * @return
     */
    protected WsRequest updateAuth(WebSocketParam webSocketParam){
        WebSocketMsgView webSocketMsgView = new WebSocketMsgView();
        webSocketMsgView.setCode(20000);
        webSocketMsgView.setFlag(true);
        webSocketMsgView.setType(WsRespTypeEnum.UPDATE_MSG.getType());
        webSocketMsgView.setMsg("updateAuth");
        return new WsRequest(webSocketParam.getRecUserId(),webSocketMsgView);
    }


    /**
     * 关闭二维码消息
     * @param webSocketParam
     * @return
     */
    protected WsRequest closeQr(WebSocketParam webSocketParam){
        WebSocketMsgView webSocketMsgView = new WebSocketMsgView();
        webSocketMsgView.setCode(20000);
        webSocketMsgView.setFlag(true);
        webSocketMsgView.setType(WsRespTypeEnum.CLOSE_QR.getType());
        webSocketMsgView.setMsg("closeQr");
       return new WsRequest(webSocketParam.getRecUserId(),webSocketMsgView);
    }

    /**
     * 参数异常响应消息
     * @param userId
     * @return
     */
    public WsRequest badParam(Long userId){
        WebSocketMsgView webSocketMsgView = new WebSocketMsgView();
        webSocketMsgView.setCode(50000);
        webSocketMsgView.setFlag(false);
        webSocketMsgView.setType(WsRespTypeEnum.BAD_PARAM.getType());
        webSocketMsgView.setMsg("badParam");
        return new WsRequest(userId,webSocketMsgView);
    }

    /**
     * 用户不在线响应消息
     * @param userId
     * @return
     */
    public WsRequest userUnOnline(Long userId){
        WebSocketMsgView webSocketMsgView = new WebSocketMsgView();
        webSocketMsgView.setCode(50000);
        webSocketMsgView.setFlag(false);
        webSocketMsgView.setType(WsRespTypeEnum.USER_UN_ONLINE.getType());
        webSocketMsgView.setMsg("userUnOnline");
        return new WsRequest(userId,webSocketMsgView);
    }
}

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐