背景

接手了一个系统,该使用了HAProxy + Sentinel +redis方案,该方案在redis发生主从切换后,因为应用层启用了连接池,老连接连的仍然是redis的“老主节点”。当应用层获取到这些老连接进行写操作的时候,会抛出异常。
org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: READONLY You can’t write against a read only replica.

该方案如下

在这里插入图片描述

redis使用使用哨兵模式进行组网,哨兵负责主节点的故障转移。
HAProxy作为Redis集群的代理(HAProxy工作在TCP层),屏蔽底层redis的组网细节,对上层应用来看就是单节点的redis。
从下面的配置可以看到,每次应用层创建新连接时,HAproxy会轮训所有节点,探测到主节点后,代应用层发起连接。

defaults
  mode tcp

frontend redis
bind *:16382 name redis
  default_backend redis
  
backend redis
option tcp-check
tcp-check connect
tcp-check send AUTH\ xxxxxxxxx\r\n
tcp-check expect string +OK
tcp-check send PING\r\n
tcp-check expect string +PONG
tcp-check send info\ replication\r\n
tcp-check expect string role:master
tcp-check send QUIT\r\n
tcp-check expect string +OK
server redis_6382_1 xx.xx.xx.xx:6382 check inter 1s
server redis_6382_2 xx.xx.xx.xx:6382 check inter 1s
server redis_6382_3 xx.xx.xx.xx:6382 check inter 1s
server redis_6382_4 xx.xx.xx.xx:6382 check inter 1s

该方案的问题

不像http协议连接被设计成无状态。redis的协议较简单,且是有状态的。在执行操作前得先认证(认证是可选的),认证完了后,可以长期保持连接,进行交互,除非服务端主动断开连接(不深究,可能服务端设置了tcp参数,长期无响应的连接会被服务端关闭)。
为了减少创建连接带来的消耗,jedis设计了连接池,创建出来的连接只要能够正常访问,连接每次用完后会被回收到连接池中进行复用。
哨兵模式通过心跳检测redis主节点是否发生故障,然后进行故障转移(主从切换)。发生主从切换时,并不一定是redis挂了,有时候可能redis的负债过重,无法及时响应哨兵的PING命令。
此时虽然哨兵认为redis挂了,但应用层之前建立的连接还是能够正常连通的,jedis认为该连接仍然可用回收的时候不会进行销毁。不过老连接进行写操作时会导致异常,因为老连接连的是“老主节点”,发生主从切换后,“老主节点”变成从节点且只读。

为什么主从切换后jedis没有销毁老的连接

一般连接池都会对连接进行有效性校验,防止应用层拿到了不可用的连接。
该校验可以在连接回收的时候,也可以在应用层获取连接前。
开启连接池后,获取连接的代码,可以追溯到redis.clients.jedis.JedisPool#getResource

  @Override
  public Jedis getResource() {
    Jedis jedis = super.getResource();
    jedis.setDataSource(this);
    return jedis;
  }

跟踪super.getResource()最终可以定位到org.apache.commons.pool2.impl.GenericObjectPool#borrowObject()

    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
         ... ...
         省略无关代码
                
                # 可以看到对连接进行有效性校验是个可选项,配置后开会开启
                if (p != null && this.getTestOnBorrow()) {
                    boolean validate = false;
                    Throwable validationThrowable = null;

                    try {
                        validate = this.factory.validateObject(p);
                    }
                ... ...
                }
                ... ...
    }

factory.validateObject()追溯到redis.clients.jedis.JedisFactory#validateObject
可以看到仅仅只是通过PONG命令探测连接是否连通,所以即使发生了主从切换,从jedis的角度来看连接仍然是有效的,这些连接不会被主动销毁。

  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }

问题复现

人工模拟哨兵的主从切换。

  1. 去掉Sentinel的监控(kill掉Sentinel进程)
  2. 找一个从节点,执行slaveof no one
  3. 剩余的节点执行,slaveof 新节点

方案选型

Jedis目前支持哨兵模式了,不过我们系统无法容忍长时间的主备变更信息延时通知到应用层,因此需要审视jedis的源码,确认集群发生主从切换后,是否能够快速通知到应用层并清理老连接。
测试代码如下:

	public static void main(String[] args) throws JsonProcessingException {
		// SpringApplication.run(DemoApplication.class, args);
		RedisTemplate<String,String> redisTemplate = new RedisTemplate();
		Set<String> setRedisNode = new HashSet<>();
		# 哨兵的IP:Port
		setRedisNode.add("xx.xx.xx.xx:26379");
		setRedisNode.add("xx.xx.xx.xx:26381");
		setRedisNode.add("xx.xx.xx.xx:26382");
		setRedisNode.add("xx.xx.xx.xx:26383");
		# "mymaster" 对应sentinel.conf中"sentinel monitor mymaster xx.xx.xx.xx 6383 2"
		# 一组哨兵是可以同时监控N组redis集群的,"mymaster"代表的某一组集群主节点的别名,
		# RedisTemplate只能操作单组集群,所以RedisSentinelConfiguration需要指定"mymaster"
		RedisSentinelConfiguration redisSentinelConfiguration = new RedisSentinelConfiguration("mymaster", setRedisNode);
		JedisPoolConfig config = new JedisPoolConfig();
		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(redisSentinelConfiguration,config);
		connectionFactory.afterPropertiesSet();
		redisTemplate.setConnectionFactory(connectionFactory);
		redisTemplate.afterPropertiesSet();
		redisTemplate.opsForValue().set("aaa","bbb");
		while(true){}
	}

跟踪调用链,发现JedisConnectionFactory (afterPropertiesSet方法)在初始化的过程中会做以下动作:

  1. 向哨兵进程查询当前主节点
  2. 使用异步线程监听哨兵发过来的事件,如果是主从切换事件,则立马更新主节点,并清理连接池。
 # 代码有删减
  public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
      final JedisClientConfig sentinelClientConfig) {
    # 查询当前主节点
    HostAndPort master = initSentinels(sentinels, masterName);
    # 设置当前主节点
    initMaster(master);
  }

JedisSentinelPool#initSentinels方法详解,代码有删减

  private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {
    HostAndPort master = null;
    boolean sentinelAvailable = false;
    
    # 防止部分哨兵不可用
    for (HostAndPort sentinel : sentinels) {
      Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) 
      # 连接哨兵进程并通过"SENTINEL get-master-addr-by-name mymaster"命令查询当前主节点
      List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
      sentinelAvailable = true;
      if (masterAddr == null || masterAddr.size() != 2) {
          continue;
      }
      master = toHostAndPort(masterAddr);
      break;
    }
    # MasterListenerThread的子类
    # 是通过异步线程去检测主从切换事件,然后及时更新主节点,清理连接池
    for (HostAndPort sentinel : sentinels) {
      MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }
    return master;
  }

MasterListener 的run方法,代码有删减

MasterListener是JedisSentinelPool内部类,因此能够操作JedisSentinelPool的变量(masterName)和方法(initMaster)
感兴趣的同学可以继续深入JedisPubSub代码,可以发现如下:

  • 主从切换事件是哨兵进程主动推送过来的,所以能够保证实时性
    因为redis协议是双工的,所以服务端可以主动推数据给客户端。
    监听事件的过程就是,创建socket连接,然后读取数据进行解析。
    不发生主从切换事件时,没有数据推送过来,线程会阻塞在read操作上面。所以虽然run方法是死循环,但是并不会占用cpu时间。
    public void run() {
      while (running.get()) {
          final HostAndPort hostPort = new HostAndPort(host, port);
          # 连接本MasterListener关注的哨兵进程
          j = new Jedis(hostPort, sentinelClientConfig);
          
          # 监听+switch-master事件
          j.subscribe(new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
              String[] switchMasterMsg = message.split(" ");
              if (masterName.equals(switchMasterMsg[0])) {
                initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); 
              } 
            }
          }, "+switch-master");

  }

JedisSentinelPool#initMaster

如果和当前主节点信息不一致,这更新主节点信息,并清理连接池

  private void initMaster(HostAndPort master) {
    synchronized (initPoolLock) {
      if (!master.equals(currentHostMaster)) {
        currentHostMaster = master;
        factory.setHostAndPort(currentHostMaster);
        clearInternalPool();
      }
    }
  }

jedis的哨兵模式并没有实现读写分离,仅仅只与主节点建立连接。所以没办法完全挖掘集群的性能。

可以通过redisTemplate.opsForValue().set(“aaa”,“bbb”);去跟踪新连接的创建过程(非复用连接池中的连接)。
最终会追踪到JedisConnectionFactory#fetchJedisConnector方法

	protected Jedis fetchJedisConnector() {
	# jedis会开启连接池
	    if (getUsePool() && pool != null) {
			return pool.getResource();
		}
		Jedis jedis = createJedis();
		jedis.connect();
		return jedis;
	}

JedisSentinelPool负责连接的创建,JedisSentinelPool持有主节点信息

  public Jedis getResource() {
    while (true) {
      Jedis jedis = super.getResource();
      jedis.setDataSource(this);
      final HostAndPort master = currentHostMaster;
      final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient()
          .getPort());

      if (master.equals(connection)) {
        return jedis;
      } else {
        returnBrokenResource(jedis);
      }
    }
  }
GitHub 加速计划 / sentine / Sentinel
22.24 K
7.98 K
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:2 个月前 )
195150bc * fix issue 2485 which occur oom when using async servlet request. * optimize imports * 1. fix the same issue in the webmvc-v6x 2. improve based on review comments 2 个月前
b78b09d3 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐