redisson是一个用于连接redis的java客户端工作,相对于jedis,是一个采用异步模型,大量使用netty promise编程的客户端框架。

0     代码示例
//创建配置信息
        Config config = new Config();
        config.useSingleServer().setAddress("localhost:6379").setConnectionPoolSize(5);

        Redisson redisson = Redisson.create(config);

        //测试 list
        List<String> strList = redisson.getList("strList");
        strList.clear(); //清除所有数据
        strList.add("测试中文1");
        strList.add("test2");

        redisson.shutdown();

从代码上来看,其基本的使用非常简单,在最后的使用当中。除与redisson打交道之外(获取各种数据结构),完全感觉不到与redis的信息连接。甚至于返回于上层直接不需要考虑下层的实现,一切均由redisson进行了封装。

1     创建初始连接结构

1.1     配置信息config

config信息由4种不同的连接配置和其它属性进行组合,如下所示:

private SentinelServersConfig sentinelServersConfig;//基本哨兵server的配置
    private MasterSlaveServersConfig masterSlaveServersConfig;//基于主从的配置
    private SingleServerConfig singleServerConfig;//基于单台连接的配置
    private ClusterServersConfig clusterServersConfig;//基于集群的配置

    //这里即在整个命令处理过程中,总共的线程数,可以理解为所有的底层运行都是由这些线程来运行(而不是用户线程)
    private int threads = 0; // 0 = current_processors_amount * 2 

    private RedissonCodec codec;//用于实现对象编码的实现(从object->byte[],或byte[]->object) ,默认为jackson

针对上面的4种config,又是基于同一个继承体系,如下所示:

如上所示,在baseConfig中定义了基本的属性,然后除单机器配置之外,其它的都基于一个主从的配置,类似多节点地址的配置信息。在后面的实现中,我们将看到更多这种设计,即通过继承来完成多种不同场景的实现。

然后,通过 Redisson.create来完成一个类似于client的创建,可以认为创建的对象是一个单例。在后面的调用中,都使用此实例即可。其内部结构如下所示:

public class Redisson implements RedissonClient {

    private final ConnectionManager connectionManager;
    private final Config config;
}

其中config即刚才我们定义的配置对象,而connectionManager即可以理解为通过刚才的不同类型的连接配置实现了不同的连接管理器,与配置信息一致,相对象的连接管理器也是按照继承体系来实现的,如下所示:

上面的实现与配置一致,不过这里采用了主从管理的基本实现,其它实现通过override相应的方法来提供不同的子实现。其中单机实现,可以理解为没有slaver的连接处理。连接管理器,实现了连接信息管理的机制,并且实现了如何选择合适的连接对象来执行不同的操作的语义。即向外封装了connection的调用,入口都通过connectionManager来处理。正因为redisson向外提供的是数据结构语义,因此也不需要暴露connection信息,因此这种实现是值得的。

connectionManager的定义如下所示:

//获取相应值,一个封装调用
  <V> V get(Future<V> future);
/** --------------------- 基本同步的操作实现,不同的调用方法 最终都返回具体的值 start -------------- */
  <V, R> R read(String key, SyncOperation<V, R> operation);
  <V, R> R read(SyncOperation<V, R> operation);
  <V, R> R write(String key, SyncOperation<V, R> operation);
  <V, R> R write(SyncOperation<V, R> operation);
  <V, R> R write(String key, AsyncOperation<V, R> asyncOperation);
  <V, R> R write(AsyncOperation<V, R> asyncOperation);
  <V, T> Future<T> writeAllAsync(AsyncOperation<V, T> asyncOperation);
  <V, T> T read(String key, AsyncOperation<V, T> asyncOperation);
  <V, T> T read(AsyncOperation<V, T> asyncOperation);
/** --------------------- 基本同步的操作实现,不同的调用方法 最终都返回具体的值 end -------------- */
/** --------------------- 基本异步的操作实现,返回future 而业务自行决定如何处理 start -------------- */
  <V, T> Future<T> readAsync(String key, AsyncOperation<V, T> asyncOperation);
  <V, T> Future<T> readAsync(AsyncOperation<V, T> asyncOperation);
  <V, T> Future<T> writeAsync(String key, AsyncOperation<V, T> asyncOperation);
  <V, T> Future<T> writeAsync(AsyncOperation<V, T> asyncOperation);
/** --------------------- 基本异步的操作实现,返回future 而业务自行决定如何处理 end -------------- */
/** 创建一个新的连接对象(用于读操作),即实现连接池语义,写操作由子类具体实现 */
  <K, V> RedisConnection<K, V> connectionReadOp(int slot);
2     创建数据连接

创建连接根据当前操作是读还是写来进行,因为如果是读操作,可以通过主从由从机器进行连接。这里仅考虑创建写操作的连接,其在主机器上进行。因此,我们直接查看相应的实现。

protected <K, V> RedisConnection<K, V> connectionWriteOp(int slot) {
        return getEntry(slot).connectionWriteOp();
    }

转交由entry,即masterSlaveEntry来处理,entry可以理解为一个具体的表示单个机器的连接配置对象。

public <K, V> RedisConnection<K, V> connectionWriteOp() {
//这里通过计数信号量来简单进行判定连接数是否已达上线,如果已达上线,则强制阻塞当前线程
  acquireMasterConnection();
//从已创建好的队列中获取,如未创建,则进行创建
  RedisConnection<K, V> conn = masterEntry.getConnections().poll();
  if (conn != null) {
      return conn;
  }
//具体的创建过程
      conn = masterEntry.getClient().connect(codec);
......
      return conn;
    }

从具体的connect方法来看,即创建一个 RedisAsyncConnection 对象,通过注册入连接远程端口的channel中,通过相应的active事件完成对channel的绑定。后续的操作均通过对channel发送事件来完成。具体的代码如下所示:

connect = bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog, handler, connection);
                    }
                }).connect();
            }
            connect.sync();

从上可以看出,在连接的时候指定了3个处理器,即IN OUT的处理器

watchlog负责处理连接断开的时候是否进行重连

handler负责处理最终进行命令的底层发送和接收

connection则负责进行命令的转发,包括各项对redis命令的封装调用。可以理解为handler工作在网络层,而connection则工作在应用层。

3     发送调用命令

3.1     发送的准备操作

如在redissonList中的size操作,即通过调用connection的dispatch命令来操作。如下所示:

public synchronized <T> Promise<T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args)

type表示指定的指令,如LLEN

output表示在数据返回之后,由commandHandler将相应的结果反序列化入output中

args即表示指令具体的参数

因为在output数据返回之后,需要通知到外转的信息,因此需要有一个相对应的promise,其被作为listner加入到execute(connection)中的结果回调当中。

3.2     数据命令发送

相应的代码在connection中的dispatch中,主要由如下代码构成:

Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null, promise);
        queue.put(cmd);
        channel.writeAndFlush(cmd);

然后,这里需要由commandHander来负责具体的数据发送操作(因为,它是注册了channel的相应处理器), 具体代码如下所示:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
  ByteBuf buf = ctx.alloc().heapBuffer();
  cmd.encode(buf);
  ctx.write(buf, promise);
}

3.3     数据结果的解析

接下来的操作,由commandHandler来接收相应的数据,如下所示:

//由channelRead转向decode
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
    while (true) {
  Command<K, V, ?> cmd = queue.peek();
  if (cmd == null
    || !rsm.decode(buffer, cmd.getOutput())) {
      break;
  }
  cmd = queue.take();
  cmd.complete();
    }
}

这里,因为在发送命令时,将每个命令相应的顺序入queue,因此在结果时,也是按照queue相对应的结果对应起来。在这里,直接就认为最先的结果对应于最先的cmd。通过rsm.decode,负责将相应buffer的结果反序列化output中,最后调用cmd.complete来通知进一步的promise。如下所示:

public void complete() {
  completeAmount--;
  if (completeAmount == 0) {
    Object res = output.get();
    ...... 略去错误处理
      promise.setSuccess((T)res);
    }
  }
}

3.4     数据结构的反向处理

在上一步的处理中,当相应cmd的complete的处理中,之前传递的promise被设置了result,因此在其上绑定的promise也会进一步设置数据值。如果在最上面,我们使用如下的代码时:

mainPromise.awaitUninterruptibly().getNow()

就会最终获取到相应的结果,其值最终体现在业务代码中。

4     与netty的整合

在上面,我们可以看到,整个redisson和netty的协议进行了完全的整合,包括handler的介入,eventGroup的使用等,都使用了全套的netty体系。然后,由于netty提供了promise功能,这里也大量使用了相应的异步模型来进行数据处理。

5     总结

在redisson中,各个部分均采用了最新的一些技术栈,包括java 5线程语义,Promise编程语义,在技术的学习上有很高的学习意义。相比jedis,其支持的特性并不是很高,但对于日常的使用还是没有问题的。其对集合的封装,编解码的处理,都达到了一个开箱即用的目的。相比jedis,仅完成了一个基本的redis网络实现,可以理解为redisson是一个完整的框架,而jedis即完成了语言层的适配。其次,redisson在设计模式,以及编码上,都有完整的测试示例,代码可读性也非常好,很值得进行源码级学习。

如果在项目中已经使用了netty,那么如果需要集成redis,那么使用redisson是最好的选择了,都不需要另外增加依赖信息。

GitHub 加速计划 / re / redisson
23.06 K
5.31 K
下载
Redisson - Easy Redis Java client with features of In-Memory Data Grid. Sync/Async/RxJava/Reactive API. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache ...
最近提交(Master分支:2 个月前 )
15bd94ed - 2 个月前
d220f2a8 - 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐