客户端选择

Redis 常用的 Java 连接客户端有三个:

  1. Jedis:是老牌的Redis的Java实现客户端,提供了比较全面的Redis命令的支持。
  2. Redisson:实现了分布式和可扩展的Java数据结构。
  3. Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。

spring data redis

  1. 如果未指定 redis client 则 spring-boot-autoconfigure.jar 默认取 LettuceConnectionConfiguration中的LettuceConnectionFactory
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration

	/**
	 * Registers the default {@code RedisTemplate<Object, Object>} bean.
	 * Only applies when no bean named {@code redisTemplate} is already defined.
	 *
	 * @param redisConnectionFactory connection factory the template is bound to
	 * @return a template wired to the given connection factory
	 * @throws UnknownHostException declared by the signature; not thrown by the visible body
	 */
	@Bean
	@ConditionalOnMissingBean(name = "redisTemplate")
	public RedisTemplate<Object, Object> redisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		final RedisTemplate<Object, Object> defaultTemplate = new RedisTemplate<>();
		defaultTemplate.setConnectionFactory(redisConnectionFactory);
		return defaultTemplate;
	}

org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration

	/**
	 * Registers a {@code LettuceConnectionFactory} as the {@code RedisConnectionFactory}
	 * when the application has not defined one itself.
	 *
	 * @param clientResources Lettuce client resources passed into the client configuration
	 * @return the connection factory built from the Lettuce client configuration
	 * @throws UnknownHostException declared by the signature; not thrown by the visible body
	 */
	@Bean
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	public LettuceConnectionFactory redisConnectionFactory(
			ClientResources clientResources) throws UnknownHostException {
		// Build the client configuration from the shared resources and the
		// configured Lettuce pool settings, then hand it straight to the factory builder.
		return createLettuceConnectionFactory(
				getLettuceClientConfiguration(clientResources, this.properties.getLettuce().getPool()));
	}


spring redis Sentinel Lettuce 实现

先看 io.lettuce.core.RedisClient

 /**
  * Opens a Redis Sentinel connection for the given {@code redisURI}.
  * <p>
  * If the URI lists no sentinels but carries a host or socket, a single connection attempt is made
  * against that endpoint. Otherwise every entry of {@code redisURI.getSentinels()} is tried in
  * iteration order until the first one connects. When no sentinel can be reached the connection
  * object is closed and a {@code RedisConnectionException} is thrown, carrying the exception of the
  * last failed attempt as its cause.
  *
  * @param codec codec used by the resulting connection; must not be {@code null}
  * @param redisURI URI describing either a direct endpoint or a list of sentinels
  * @param timeout timeout handed to the newly created connection object
  * @return the connected sentinel connection
  */
 private <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI,
            Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        // The builder receives a copy of the client options together with the shared client resources.
        ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
        connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
        connectionBuilder.clientResources(clientResources);

        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);

        StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout);

        logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());

        connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
                .connection(connection);
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            connectionBuilder.enablePingBeforeConnect();
        }

        // Direct mode: no sentinels listed, but a host or a socket is present on the URI itself.
        if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
            channelType(connectionBuilder, redisURI);
            try {
                getConnection(initializeChannelAsync(connectionBuilder));
            } catch (RuntimeException e) {
                // Release the half-initialized connection before propagating the failure.
                connection.close();
                throw e;
            }
        } else {

            // Sentinel-list mode: try each sentinel in turn and stop at the first success.
            boolean connected = false;
            boolean first = true;
            Exception causingException = null;
            validateUrisAreOfSameConnectionType(redisURI.getSentinels());

            for (RedisURI uri : redisURI.getSentinels()) {
                // The channel type is configured once, from the first sentinel URI only.
                if (first) {
                    channelType(connectionBuilder, uri);
                    first = false;
                }
                connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));

                // Address resolution here is for logging only, hence guarded by the debug check.
                if (logger.isDebugEnabled()) {
                    SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver());
                    logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
                }
                try {
                    getConnection(initializeChannelAsync(connectionBuilder));
                    connected = true;
                    break;
                } catch (Exception e) {
                    // Remember the failure and move on to the next sentinel.
                    logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString());
                    causingException = e;
                }
            }

            if (!connected) {
                // Every sentinel failed: clean up and report, keeping only the last failure as cause.
                connection.close();
                throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),
                        causingException);
            }
        }

        // Apply the client name from the URI when one is configured.
        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            connection.setClientName(redisURI.getClientName());
        }

        return connection;
    }

io.lettuce.core.protocol.ConnectionWatchdog

 /**
  * Handles the channel becoming inactive. When the watchdog is armed, the cached channel
  * reference is cleared and, if listening is enabled and reconnects are not suspended,
  * a reconnect is scheduled. The event is then passed on to the superclass handler.
  * An unarmed watchdog returns early without forwarding the event.
  *
  * @param ctx the channel handler context of the inactive channel
  * @throws Exception propagated from the superclass handler
  */
 @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.debug("{} channelInactive()", logPrefix());
        if (!armed) {
            logger.debug("{} ConnectionWatchdog not armed", logPrefix());
            return;
        }

        // Drop the stale channel reference before deciding whether to reconnect.
        channel = null;

        if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
            scheduleReconnect();
        } else {
            // The format string has a single placeholder, so only the prefix is passed
            // (the original also passed ctx, which had no matching placeholder).
            logger.debug("{} Reconnect scheduling disabled", logPrefix());
        }

        super.channelInactive(ctx);
    }

    /**
     * Arms this {@link ConnectionWatchdog}: marks it as armed and turns on listening
     * for channel-inactive events.
     */
    void arm() {
        armed = true;
        setListenOnChannelInactive(true);
    }

    /**
     * Schedule reconnect if channel is not available/not active.
     * <p>
     * Nothing is scheduled when the event loop group is not active or when listening for
     * channel-inactive events is disabled. Otherwise a timer task is registered with a delay
     * obtained from {@code reconnectDelay} for the current attempt number; when the timer fires,
     * the actual reconnect ({@code run(attempt)}) is submitted to {@code reconnectWorkers}.
     */
    public void scheduleReconnect() {

        logger.debug("{} scheduleReconnect()", logPrefix());

        if (!isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }

        if (!isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }

        // Two conditions must hold: there is no active channel, and this caller wins the
        // compare-and-set on reconnectSchedulerSync (false -> true).
        // NOTE(review): the flag is presumably reset by run(); that code is not visible here.
        if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {

            attempts++;
            final int attempt = attempts;
            int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
            logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);

            this.reconnectScheduleTimeout = timer.newTimeout(it -> {

                // The timer has fired, so the pending-timeout handle is no longer needed.
                reconnectScheduleTimeout = null;

                if (!isEventLoopGroupActive()) {
                    logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
                    return;
                }

                // The reconnect itself is submitted to the reconnect workers rather than run in the timer callback.
                reconnectWorkers.submit(() -> {
                    ConnectionWatchdog.this.run(attempt);
                    return null;
                });
            }, timeout, TimeUnit.MILLISECONDS);

            // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
            if (!reconnectSchedulerSync.get()) {
                reconnectScheduleTimeout = null;
            }
        } else {
            // This branch is also taken when the compare-and-set above fails,
            // not only when an active channel exists as the message suggests.
            logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
        }
    }

通过以上源码可以得出如下初始化流程:

  1. 遍历Sentinel节点集合,找到一个可用的Sentinel节点,如果找不到就从Sentinel节点集合中去找下一个;如果都找不到直接抛出异常给客户端:
  2. 找到一个可用的Sentinel节点, 执行sentinelGetMasterAddrByName(masterName),通过主节点名称(masterName)找到对应主节点的地址信息
  3. 依靠 ConnectionWatchdog 的重连机制完成故障转移:当 netty handler 中的 channelInactive 被触发时(与远程主机的连接意外断开、尝试连接远程主机但连接失败、或关闭当前通道时),会重新去拉取 Sentinel 中对应的主节点信息

spring redis Sentinel Jedis 实现

redis.clients.jedis.JedisSentinelPool

/**
 * Creates a sentinel-backed pool: resolves the current master through the given sentinels
 * and initializes the pool against that master.
 * In this excerpt only {@code masterName} and {@code sentinels} are used by the body.
 *
 * @param masterName name of the master as monitored by the sentinels
 * @param sentinels sentinel addresses to query
 */
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
    // Resolve the master via the sentinels first, then build the pool on top of it.
    this.initPool(this.initSentinels(sentinels, masterName));
}



/**
 * Asks the given sentinels, in iteration order, for the address of {@code masterName}
 * and starts one daemon {@code MasterListener} per sentinel once the master is known.
 *
 * @param sentinels sentinel addresses in the form accepted by {@code HostAndPort.parseString}
 * @param masterName name of the monitored master
 * @return the address of the current master
 * @throws JedisException if a sentinel answered but none returned a usable address for the master
 * @throws JedisConnectionException if no sentinel could be queried at all
 */
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort resolvedMaster = null;
    boolean anySentinelAnswered = false;

    log.info("Trying to find master from available Sentinels...");

    for (String sentinelAddress : sentinels) {
      final HostAndPort sentinelNode = HostAndPort.parseString(sentinelAddress);

      log.fine("Connecting to Sentinel " + sentinelNode);

      Jedis sentinelClient = null;
      try {
        sentinelClient = new Jedis(sentinelNode.getHost(), sentinelNode.getPort());

        List<String> reply = sentinelClient.sentinelGetMasterAddrByName(masterName);

        // The query returned without an exception, so this sentinel is reachable.
        anySentinelAnswered = true;

        if (reply != null && reply.size() == 2) {
          resolvedMaster = toHostAndPort(reply);
          log.fine("Found Redis master at " + resolvedMaster);
          break;
        }

        // Reply is missing or is not a two-element list: fall through to the next sentinel.
        log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + sentinelNode
            + ".");
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        log.warning("Cannot get master address from sentinel running @ " + sentinelNode + ". Reason: " + e
            + ". Trying next one.");
      } finally {
        // The probing client is always closed, whether the query succeeded or not.
        if (sentinelClient != null) {
          sentinelClient.close();
        }
      }
    }

    if (resolvedMaster == null) {
      if (!anySentinelAnswered) {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
      // At least one sentinel answered, but none returned an address for this master name.
      throw new JedisException("Can connect to sentinel, but " + masterName
          + " seems to be not monitored...");
    }

    log.info("Redis master running at " + resolvedMaster + ", starting Sentinel listeners...");

    for (String sentinelAddress : sentinels) {
      final HostAndPort sentinelNode = HostAndPort.parseString(sentinelAddress);
      MasterListener listener =
          new MasterListener(masterName, sentinelNode.getHost(), sentinelNode.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      listener.setDaemon(true);
      masterListeners.add(listener);
      listener.start();
    }

    return resolvedMaster;
  }

通过以上源码可以得出如下初始化流程:

  1. initSentinels用sentinels和masterName,也就是Sentinel节点的集合和Redis主节点的名字来初始化,获取到Redis的主节点,同时为每个Sentinel创建了一个监听线程(MasterListener)。
  2. MasterListener是一个线程,为每个Sentinel节点创建一个线程,来订阅channel:+switch-master,从而当主节点发生切换的时候,可以及时感知,修改Sentinel的客户端缓冲池JedisSentinelPool。

spring redis Sentinel redisson 实现


/**
* Redisson constructor.
* @param config for Redisson
*/
protected Redisson(Config config) {
    // Keep a reference to the config passed in by the caller.
    this.config = config;
    // Create a copy of the incoming config.
    Config configCopy = new Config(config);

    // Create the connection manager from the copied config. According to the original note, the
    // initialization differs by config type (master/slave, single server, sentinel, cluster,
    // Amazon cloud and Microsoft cloud modes).
    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    // Eviction scheduler (original note: scheduler that reclaims connection-pool objects).
    evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
    // Redisson's codec provider (object encoding).
    codecProvider = configCopy.getCodecProvider();
    // Redisson's ResolverProvider; per the original note it defaults to org.redisson.liveobject.provider.DefaultResolverProvider.
    resolverProvider = configCopy.getResolverProvider();

在这里插入图片描述
跟进去看哨兵模式下的 ConnectionManager
org.redisson.connection.SentinelConnectionManager

/**
 * Builds a connection manager for sentinel mode.
 * <p>
 * The constructor validates the configuration, then walks the configured sentinel addresses
 * until one of them can be queried. From that sentinel it reads the current master address,
 * the slave list and the list of other sentinels, and registers the sentinels. It then
 * initializes the single master/slave entry and schedules the periodic change check.
 *
 * @param cfg sentinel-specific configuration (master name, sentinel addresses, ...)
 * @param config general Redisson configuration, passed to the superclass
 * @param id identifier passed to the superclass
 * @throws IllegalArgumentException if the master name is missing or no sentinel address is configured
 * @throws RedisConnectionException if the master cannot be determined, or the sentinel-list check fails
 */
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
        super(config, id);
        
        if (cfg.getMasterName() == null) {
            throw new IllegalArgumentException("masterName parameter is not defined!");
        }
        if (cfg.getSentinelAddresses().isEmpty()) {
            throw new IllegalArgumentException("At least one sentinel node should be defined!");
        }

        this.config = create(cfg);
        this.sentinelPassword = cfg.getSentinelPassword();
        initTimer(this.config);

        this.natMapper = cfg.getNatMapper();

        this.sentinelResolver = resolverGroup.getResolver(getGroup().next());

        // First pass over the configured addresses: record the scheme and collect into sentinelHosts
        // those whose host is neither "localhost" nor accepted by
        // NetUtil.createByteArrayFromIpAddressString (presumably: hosts given as DNS names - verify).
        for (String address : cfg.getSentinelAddresses()) {
            RedisURI addr = new RedisURI(address);
            scheme = addr.getScheme();
            addr = applyNatMap(addr);
            if (NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null && !addr.getHost().equals("localhost")) {
                sentinelHosts.add(addr);
            }
        }

        checkAuth(cfg);

        // Second pass: try each sentinel until one yields the topology; stop at the first success.
        Throwable lastException = null;
        for (String address : cfg.getSentinelAddresses()) {
            RedisURI addr = new RedisURI(address);
            addr = applyNatMap(addr);

            RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null);
            try {
                RedisConnection connection = null;
                try {
                    connection = client.connect();
                    if (!connection.isActive()) {
                        // Inactive connection: skip this sentinel (the finally block below still runs).
                        continue;
                    }
                } catch (RedisConnectionException e) {
                    // A connect failure is not fatal here; just move on to the next sentinel.
                    continue;
                }
                
                // Ask the sentinel for the current master address.
                InetSocketAddress master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
                if (master == null) {
                    // Caught by the outer RedisConnectionException handler, which stops threads and rethrows.
                    throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
                }

                RedisURI masterHost = toURI(master.getHostString(), String.valueOf(master.getPort()));
                this.config.setMasterAddress(masterHost.toString());
                currentMaster.set(masterHost);
                log.info("master: {} added", masterHost);

                // Register every slave reported by the sentinel; slaves flagged as down are additionally remembered.
                List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
                for (Map<String, String> map : sentinelSlaves) {
                    if (map.isEmpty()) {
                        continue;
                    }

                    String ip = map.get("ip");
                    String port = map.get("port");
                    String flags = map.getOrDefault("flags", "");

                    RedisURI host = toURI(ip, port);

                    this.config.addSlaveAddress(host.toString());
                    log.debug("slave {} state: {}", host, map);
                    log.info("slave: {} added", host);

                    if (flags.contains("s_down") || flags.contains("disconnected")) {
                        disconnectedSlaves.add(host);
                        log.warn("slave: {} is down", host);
                    }
                }
                
                // Register the other sentinels reported by this sentinel, then the sentinel being queried.
                List<Map<String, String>> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
                List<RFuture<Void>> connectionFutures = new ArrayList<>(sentinelSentinels.size());
                for (Map<String, String> map : sentinelSentinels) {
                    if (map.isEmpty()) {
                        continue;
                    }

                    String ip = map.get("ip");
                    String port = map.get("port");

                    RedisURI sentinelAddr = toURI(ip, port);
                    RFuture<Void> future = registerSentinel(sentinelAddr, this.config, null);
                    connectionFutures.add(future);
                }

                RFuture<Void> f = registerSentinel(addr, this.config, null);
                connectionFutures.add(f);

                // Wait for each registration, bounded by the connect timeout per future.
                for (RFuture<Void> future : connectionFutures) {
                    future.awaitUninterruptibly(this.config.getConnectTimeout());
                }

                // Topology obtained from this sentinel; no need to query the remaining ones.
                break;
            } catch (RedisConnectionException e) {
                stopThreads();
                throw e;
            } catch (Exception e) {
                // Any other failure: remember it, log it and try the next sentinel.
                lastException = e;
                log.warn(e.getMessage());
            } finally {
                // The bootstrap client is shut down on every path out of the try block.
                client.shutdownAsync();
            }
        }

        // Optional check on the number of registered sentinels; can be disabled via checkSentinelsList.
        if (cfg.isCheckSentinelsList()) {
            if (sentinels.isEmpty()) {
                stopThreads();
                throw new RedisConnectionException("SENTINEL SENTINELS command returns empty result! Set checkSentinelsList = false to avoid this check.", lastException);
            } else if (sentinels.size() < 2) {
                stopThreads();
                throw new RedisConnectionException("SENTINEL SENTINELS command returns less than 2 nodes! At least two sentinels should be defined in Redis configuration. Set checkSentinelsList = false to avoid this check.", lastException);
            }
        }
        
        // No sentinel produced a master address.
        if (currentMaster.get() == null) {
            stopThreads();
            throw new RedisConnectionException("Can't connect to servers!", lastException);
        }
        if (this.config.getReadMode() != ReadMode.MASTER && this.config.getSlaveAddresses().isEmpty()) {
            log.warn("ReadMode = " + this.config.getReadMode() + ", but slave nodes are not found!");
        }
        
        initSingleEntry();
        
        // Start the periodic check; a null iterator makes it build a fresh sentinel list.
        scheduleChangeCheck(cfg, null);
    }

启动一个定时任务,每隔 scanInterval(cfg.getScanInterval(),单位毫秒,默认 1000 毫秒,即每秒一次)调度一次,检查是否发生过故障转移
scheduleChangeCheck

    /**
     * Schedules one run of the sentinel state check after {@code cfg.getScanInterval()} milliseconds
     * and stores the resulting future in {@code monitorFuture}.
     *
     * @param cfg sentinel configuration providing the scan interval
     * @param iterator sentinel clients still to be tried, or {@code null} to start over with a
     *                 freshly shuffled list of all known sentinels
     */
    private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
        Runnable stateCheck = () -> {
            AtomicReference<Throwable> lastException = new AtomicReference<>();
            Iterator<RedisClient> remaining = iterator;
            if (remaining == null) {
                // Shuffle the list so all clients don't prefer the same sentinel
                List<RedisClient> shuffled = new ArrayList<>(sentinels.values());
                Collections.shuffle(shuffled);
                remaining = shuffled.iterator();
            }
            checkState(cfg, remaining, lastException);
        };
        monitorFuture = group.schedule(stateCheck, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
    }
/**
 * Tries the next sentinel from {@code iterator}. On a successful connection the state is
 * refreshed through {@code updateState}; on failure the error is recorded and the next
 * sentinel is tried. When the iterator is exhausted, the last recorded error (if any) is
 * logged, the sentinel DNS check is run and a new check cycle is scheduled.
 *
 * @param cfg sentinel configuration
 * @param iterator sentinel clients still to be tried in this cycle
 * @param lastException holder for the most recent connection failure of this cycle
 */
private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
        if (!iterator.hasNext()) {
            Throwable failure = lastException.get();
            if (failure != null) {
                log.error("Can't update cluster state", failure);
            }
            performSentinelDNSCheck(null);
            scheduleChangeCheck(cfg, null);
            return;
        }

        // Do nothing further when the shutdown latch cannot be acquired.
        if (!getShutdownLatch().acquire()) {
            return;
        }

        RedisClient sentinelClient = iterator.next();
        RedisURI sentinelUri = getIpAddr(sentinelClient.getAddr());
        connectToNode(cfg, sentinelUri, null).onComplete((connection, error) -> {
            if (error == null) {
                updateState(cfg, connection, iterator);
                return;
            }

            // Connection failed: remember why, release the latch and move on to the next sentinel.
            lastException.set(error);
            getShutdownLatch().release();
            checkState(cfg, iterator, lastException);
        });

    }

    /**
     * Refreshes the known topology through one sentinel connection by issuing up to three
     * asynchronous SENTINEL queries: master address, slaves (unless slave initialization is
     * skipped) and sentinels. A shared listener counts the completions; once the counter
     * reaches zero it releases the shutdown latch and schedules the next check.
     *
     * @param cfg sentinel configuration (provides the master name)
     * @param connection open connection to the sentinel being queried
     * @param iterator remaining sentinels of the current cycle, reused when a command has failed
     */
    private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
        // Starts at 2 for the master and sentinels queries; incremented below if the slaves query is issued.
        AtomicInteger commands = new AtomicInteger(2);
        BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {
            
            private final AtomicBoolean failed = new AtomicBoolean();
            
            @Override
            public void accept(Object t, Throwable u) {
                // Last completion: release the latch and reschedule. After a failure the cycle continues
                // with the remaining sentinels, otherwise a fresh cycle is started (null iterator).
                // NOTE(review): 'failed' is read here before this invocation's own error is recorded
                // below, so a failure of the last completing command appears not to be seen by this
                // check - confirm whether that is intended.
                if (commands.decrementAndGet() == 0) {
                    getShutdownLatch().release();
                    if (failed.get()) {
                        scheduleChangeCheck(cfg, iterator);
                    } else {
                        scheduleChangeCheck(cfg, null);
                    }
                }
                // Only the first failure logs and closes the connection.
                if (u != null && failed.compareAndSet(false, true)) {
                    log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), u);
                    closeNodeConnection(connection);
                }
            }
        };
        
        // Query 1: current master address.
        RFuture<InetSocketAddress> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
        masterFuture.onComplete((master, e) -> {
            if (e != null) {
                return;
            }

            RedisURI current = currentMaster.get();
            RedisURI newMaster = toURI(master.getHostString(), String.valueOf(master.getPort()));
            // Switch only if the address differs and this callback wins the compare-and-set on currentMaster.
            if (!newMaster.equals(current)
                    && currentMaster.compareAndSet(current, newMaster)) {
                RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
                changeFuture.onComplete((res, ex) -> {
                    // Roll the recorded master back if the switch failed.
                    if (ex != null) {
                        currentMaster.compareAndSet(newMaster, current);
                    }
                });
            }
        });
        masterFuture.onComplete(commonListener);
        
        // Query 2 (optional): slave list.
        if (!config.checkSkipSlavesInit()) {
            RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
            commands.incrementAndGet();
            slavesFuture.onComplete((slavesMap, e) -> {
                if (e != null) {
                    return;
                }
                
                Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
                List<RFuture<Void>> futures = new ArrayList<>();
                for (Map<String, String> map : slavesMap) {
                    if (map.isEmpty()) {
                        continue;
                    }
                    
                    String ip = map.get("ip");
                    String port = map.get("port");
                    String flags = map.getOrDefault("flags", "");
                    String masterHost = map.get("master-host");
                    String masterPort = map.get("master-port");

                    RedisURI slaveAddr = toURI(ip, port);
                    // Slaves flagged as down or disconnected are marked down and not (re)added.
                    if (flags.contains("s_down") || flags.contains("disconnected")) {
                        slaveDown(slaveAddr);
                        continue;
                    }
                    // Skip slaves whose master host is reported as "?" or that fail the isUseSameMaster check.
                    if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
                        continue;
                    }

                    currentSlaves.add(slaveAddr);
                    RFuture<Void> slaveFuture = addSlave(slaveAddr);
                    futures.add(slaveFuture);
                }
                
                // On success: every entry that is neither in the reported slave set nor the
                // current master is marked as down.
                CountableListener<Void> listener = new CountableListener<Void>() {
                    @Override
                    protected void onSuccess(Void value) {
                        MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
                        entry.getAllEntries().stream()
                                .map(e -> e.getClient().getAddr())
                                .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
                                .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
                                .forEach(a -> slaveDown(a));
                    };
                };
                
                listener.setCounter(futures.size());
                for (RFuture<Void> f : futures) {
                    f.onComplete(listener);
                }
            });
            slavesFuture.onComplete(commonListener);
        }
                
        // Query 3: list of sentinels.
        RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
        sentinelsFuture.onComplete((list, e) -> {
            if (e != null || list.isEmpty()) {
                return;
            }
            
            // Keep only non-empty entries that are not flagged as disconnected or s_down.
            Set<RedisURI> newUris = list.stream().filter(m -> {
                String flags = m.getOrDefault("flags", "");
                if (!m.isEmpty() && !flags.contains("disconnected") && !flags.contains("s_down")) {
                    return true;
                }
                return false;
            }).map(m -> {
                String ip = m.get("ip");
                String port = m.get("port");
                return toURI(ip, port);
            }).collect(Collectors.toSet());
            
            // The sentinel being queried is added explicitly to the new set.
            InetSocketAddress addr = connection.getRedisClient().getAddr();
            RedisURI currentAddr = getIpAddr(addr);
            newUris.add(currentAddr);
            
            updateSentinels(newUris);
        });
        sentinelsFuture.onComplete(commonListener);
    }

通过以上源码可以得出如下初始化流程:

scheduleChangeCheck 定时触发对 Sentinel 节点状态的检测
checkState 依次连接 Sentinel 节点,updateState 通过 SENTINEL GET-MASTER-ADDR-BY-NAME 检测主节点是否发生变更

sentinel 查询 Master 的地址,如果发现 Master 变更了,则删除旧的 masterEntry,重建一个新的 masterEntry;

GitHub 加速计划 / sentine / Sentinel
22.25 K
7.98 K
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:3 个月前 )
195150bc * fix issue 2485 which occur oom when using async servlet request. * optimize imports * 1. fix the same issue in the webmvc-v6x 2. improve based on review comments 2 个月前
b78b09d3 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐