WebSocket 实现长连接及通过WebSocket获取客户端IP

WebSocket 是一种支持双向通讯的网络通信协议。

实现过程:

1 添加ServerEndpointExporter配置bean

@Configuration
public class WebSocketConfig {

    // 自动注册使用了@ServerEndpoint**注解声明的Websocket endpoint
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

2 实现过程

需求是通过WebSocket,建立长连接,并获取当前在线的人数。通过Websocket 不断发送消息,建立长连接,给Session续命。我是通过MAC地址,区分不同的设备,因为我的需求中需要一个账号能够登录多台机器。所以我通过MAC地址用于标识不同的设备信息。(若是一个账号只能登陆一次,采用用户ID)

1 . 添加配置

@ServerEndpoint(value = "/websocket/onlineAme/{Mac}")

2. 主要方法

  • @OnOpen

    • 首次建立连接时,运行该注解下的方法。
    • 在此方法中可以获取websocket的session
    • 并将该用户的Mac 以及 Session存放到
    private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
    
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {
        log.info("【Ame websocket 链接成功】,Ame mac:"+ mac);
        session.setMaxIdleTimeout(sessionTimeout);
        // 获取客户端的Ip
        if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){
            log.error("并未上传设备信息");
        }
        setMap(session,mac);
    }
    
    private void setMap(Session session,String mac){
        sessionMap.put(mac,session);
        log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size());
    }
    
  • @OnMessage

    • 该方法是客户端与服务端进行通讯。

    • 每次客户端与服务端建立通讯时,会给Session续命,延长Session的时常

      @OnMessage
      public void onMessage(Session session, String msg)  {
              session.setMaxIdleTimeout(sessionTimeout);
              if(StringUtils.isBlank(msg)){
                  return;
              }
              // 判断 MAC 地址 是否 是正在上线
              String mac = getMACBySession(session);
              if(StringUtils.isBlank(mac)){
                  return;
              }
              // 将上传的msg转化为 AmeServicePack
      		handleAmeMsg(mac,ame);
      }
      private String getMACBySession(Session session){
          String mac = getUserIdBySession(session);
          if(ObjectUtil.isNull(mac)){
              return null;
          }
          return mac;
      }
      private String getUserIdBySession(Session session){
          for (String mac : sessionMap.keySet()) {
              /*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比  */
              if(sessionMap.get(mac).getId().equals(session.getId())){
                  return mac;
              }
          }
          return null;
      }
      
  • @OnClose

    @OnClose
    public void onClose(Session session,@PathParam(value = "Mac") String mac) {
        removeMap(session);
        log.info("【websocket退出成功】该设备退出:"+mac);
    }
    
    private void removeMap(Session session){
        String mac = getUserIdBySession(session);
        if(ObjectUtil.isNull(mac)){
            return;
        }
        sessionMap.remove(mac);
        //userMap.remove(userId);
        removeAme(mac);
    }
    
    private void removeAme(String mac){
        ameHashMap.remove(mac);
        sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);
    }
    
  • @OnError

    @OnError
    public void onError(Session session,Throwable throwable) {
        log.error("websocket: 发生了错误");
        removeMap(session);
        throwable.printStackTrace();
    }
    
  • 向某一个用户发送消息

     /*
       发送自定义消息
         向某一个用户发送,消息*/
    public static void sendInfo(String message,String toMac){
        log.info("发送消息:{},内容是:{}",message,toMac);
        if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){
            log.error("消息不完整");
            return;
        }
        // 包含就发送
        //System.out.println(sessionMap.containsKey(toUserId));
        if(sessionMap.containsKey(toMac)){
            try {
                sendMessage(sessionMap.get(toMac),message);
            }catch (Exception e){
                log.error("发送给用户{}的消息出错",toMac);
            }
        }
        // 用户不在线
        else {
            log.error("设备{}不在线",toMac);
        }
    
    }
    
    public static void sendMessage(Session session,String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }
    

3 通过WebSocket获取 请求Ip地址

Websocket 中的request中并没有header 中并没有客户端的Ip地址,但是在SpringCloud中,是通过网关,路由转发。在网关中的请求的request中存在Ip地址,可以通过拦截器,获取网关的ip然后将request放到websocket的request中。

3.1 拦截器
package com.mam.gateway.filter;

import jdk.nashorn.internal.runtime.regexp.joni.Config;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.net.InetSocketAddress;
import java.util.Objects;

/**
 * 获取 WebSocket 上传Session的 Ip信息
 */
@Component
public class SessionFilter extends AbstractGatewayFilterFactory<SessionFilter.Config> {
    public SessionFilter()
    {
        super(SessionFilter.Config.class);
    }

    @Override
    public String name()
    {
        return "SessionFilter";
    }

    @Override
    public GatewayFilter apply(SessionFilter.Config config) {
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(exchange.getRequest()).build();
                return chain.filter(mutatedServerWebExchange);
            }
        };
    }
    static class Config
    {
        private Integer order;
        public Integer getOrder()
        {
            return order;
        }
        public void setOrder(Integer order)
        {
            this.order = order;
        }
    }
}

3.2 ServerEndpointConfig 的配置
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator{
    private static final Logger log = LoggerFactory.getLogger(WebSocketConfigurator.class);

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        Map<String, Object> attributes = sec.getUserProperties();
        try{
            String clientIp = IpUtils.getIpAddrByHandshakeRequest(request.getHeaders());
            attributes.put("clientIp",clientIp);
            log.info("websocker拦截器X-Real_IP{}header{}",request.getHeaders().get("X-Real_IP"),request.getHeaders().toString());
        }catch (Exception e){
            e.printStackTrace();
        }
        super.modifyHandshake(sec,request,response);
    }
}
public static String getIpAddrByHandshakeRequest(Map<String, List<String>> map)
{
    if (map == null)
    {
        return null;
    }

    String ip = null;

    // X-Forwarded-For:Squid 服务代理
    String ipAddresses = Convert.toStr(map.get("X-Forwarded-For"));
    if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
    {
        // Proxy-Client-IP:apache 服务代理
        ipAddresses = Convert.toStr(map.get("Proxy-Client-IP"));
    }else {
        ipAddresses = ipAddresses.substring(1,ipAddresses.length()-1);
    }
    if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
    {
        // WL-Proxy-Client-IP:weblogic 服务代理
        ipAddresses = Convert.toStr(map.get("WL-Proxy-Client-IP"));
    }
    if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
    {
        // HTTP_CLIENT_IP:有些代理服务器
        ipAddresses = Convert.toStr(map.get("HTTP_CLIENT_IP"));
    }
    if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
    {
        // X-Real-IP:nginx服务代理
        ipAddresses = Convert.toStr(map.get("X-Real-IP"));
    }

    // 有些网络通过多层代理,那么获取到的ip就会有多个,一般都是通过逗号(,)分割开来,并且第一个ip为客户端的真实IP
    if (ipAddresses != null && ipAddresses.length() != 0)
    {
        ip = ipAddresses.split(",")[0];
    }
    return ip.equals("0:0:0:0:0:0:0:1") ? "127.0.0.1" : ip;
}

3.3 WebSocket 获取Ip
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");

4 完整代码

@Component
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}",configurator = WebSocketConfigurator.class)
public class AmeLoginWebSocket {

    static Logger log = LoggerFactory.getLogger(AmeLoginWebSocket.class);

    /* 存储session */
    private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
    /* 存储 在线 ame服务 信息*/
    private  ConcurrentHashMap<String,AmeServicePack> ameHashMap = new ConcurrentHashMap<>();
    /* 存储 Ip  */

    private static final long sessionTimeout = 600000;


    /** 链接成功后发送消息**/
    @OnMessage
    public void onMessage(Session session, String msg)  {
        session.setMaxIdleTimeout(sessionTimeout);
        log.info("【websocket 接收成功】内容为"+msg);
        if(StringUtils.isBlank(msg)){
            return;
        }
        // 判断 MAC 地址 是否 是正在上线
        String mac = getMACBySession(session);
        if(StringUtils.isBlank(mac)){
            return;
        }
        AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
        Map<String, Object> userProperties = session.getUserProperties();
        String clientip = (String) userProperties.get("clientIp");
        // 将上传的msg转化为 AmeServicePack
        handleAmeMsg(mac,ame);
        
    }

    private void handleAmeMsg(String mac, AmeServicePack ameInfo) {

        log.info("Ame:MAC{}Ip{}:",mac,ameInfo.getAmeip());
        if(ameHashMap.containsKey(mac)){
            log.info("该设备{}Ip{}已上线",mac,ameInfo.getAmeip());
            sendInfo("该用户Ip"+ameInfo.getAmeip()+"已存在",mac);
        }else {
            ameHashMap.put(mac,ameInfo);
        }
    }

    private boolean updateOnline(AmeServicePack ameInfo){
        AjaxResult isonline = SpringUtils.getBean(IAmePackService.class).update(ameInfo,SecurityConstants.INNER);
        if((Integer)isonline.get("code")== 200){
            return true;
        }else {
            return false;
        }
    }

    /**
     * ameIp 为上线 但 任务表中仍有 正在运行的任务,并将其修改为 -2
     * @param ameIp
     */
    private void updateErrorTaskStatus(String ameIp){
        SpringUtils.getBean(IAmePackService.class).updateErrorAMEStatus(ameIp,SecurityConstants.INNER);
    }



    private void handlePCMsg(LoginUser loginUser, String msg) {
        log.info("系统用户:{},消息{}:",loginUser.getUsername(),msg);
    }
    private String getMACBySession(Session session){
        String mac = getUserIdBySession(session);
        if(ObjectUtil.isNull(mac)){
            return null;
        }
        return mac;
    }
    /*
     *成功建立连接后调用
     * @param [session, username]
     * @return void
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {
        log.info("【Ame websocket 链接成功】,Ame mac:"+ mac);
        session.setMaxIdleTimeout(sessionTimeout);
        // 获取客户端的Ip


        if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){
            log.error("并未上传设备信息");
        }
        setMap(session,mac);
    }
    private void setMap(Session session,String mac){
        sessionMap.put(mac,session);
        log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size());
    }


    /*
     *关闭连接时调用
     * @param [userId]
     * @return void
     */
    @OnClose
    public void onClose(Session session,@PathParam(value = "Mac") String mac) {
        removeMap(session);
        log.info("【websocket退出成功】该设备退出:"+mac);
    }

    private void removeMap(Session session){
        String mac = getUserIdBySession(session);
        if(ObjectUtil.isNull(mac)){
            return;
        }
        sessionMap.remove(mac);
        //userMap.remove(userId);
        removeAme(mac);
    }
    private String getUserIdBySession(Session session){
        for (String mac : sessionMap.keySet()) {
            /*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比  */
            if(sessionMap.get(mac).getId().equals(session.getId())){
                return mac;
            }
        }
        return null;
    }
    private void removeAme(String mac){
        ameHashMap.remove(mac);
        log.info("{}:下线成功",ameInfo.getAmeip());
        sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);
    }
    /*
     *发生错误时调用
     * @param [session, throwable]
     * @return void
     */
    @OnError
    public void onError(Session session,Throwable throwable) {
        log.error("websocket: 发生了错误");
        removeMap(session);
        throwable.printStackTrace();
    }

    /*
     发送自定义消息
     向某一个用户发送,消息
    */
    public static void sendInfo(String message,String toMac){
        log.info("发送消息:{},内容是:{}",message,toMac);
        if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){
            log.error("消息不完整");
            return;
        }
        // 包含就发送
        //System.out.println(sessionMap.containsKey(toUserId));
        if(sessionMap.containsKey(toMac)){
            try {
                sendMessage(sessionMap.get(toMac),message);
            }catch (Exception e){
                log.error("发送给用户{}的消息出错",toMac);
            }
        }
        // 用户不在线
        else {
            log.error("设备{}不在线",toMac);
        }

    }

    public static void sendMessage(Session session,String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }
}

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐