分析和设计

如果需要动态数据源,实现一个DataSource代理,在获取Connection时做手脚。原理参考:Mybatis.调用链路(源码:SqlSession-Executor-Transaction-DataSource-Connection)_闲猫的博客-CSDN博客

说明:一个请求进来,数据源对象不能变的,获取的Connection是每次都需要获取的。搞一个数据源代理对象,然后在getConnection方法返回不同数据源Connection。

AbstractRoutingDataSource

Spring提供的版本

 追getConnection方法:

看看protected DataSource determineTargetDataSource()

 追下去看看resolvedDataSources和resolvedDefaultDataSource 怎么来的

是这两个方法设置的,bing go,只需要通过这两个方法设置即可,注意Map的线程安全哦

动态数据源思路

1. 一个请求过来,需要Connection,根据DefaultDataSourceProxy来getConnection

2. 根据不同规则解析出需要请求的DsKey -- DSProperties

3. 根据DsKey获取DS对象,如果获取到则调用Ds.getConnection返回去即可

4. 如果没有获取到

 - 根据DsProperties构建一个DS对象

 - 将DS对象放在缓存内,让DefaultDataSourceProxy管理即可,注意线程安全

 - 继续第三步。

实现:

可以借助AbstractRoutingDataSource,也可以自定义。

借助AbstractRoutingDataSource:

1. 集成AbstractRoutingDataSource类

2. 加载默认数据源,设置resolvedDefaultDataSource

3. resolvedDataSources设置为空CurrentHashMap即可,这里是多线程,不是分布式锁

4. DsKey不是默认租户的,则获取DSProperties,然后构建DS对象,put进去CurrentHashmap中。 然后返回DS对象

5. AbstractRoutingDataSource根据determineCurrentLookupKey的返回值作为key从CurrentHashMap中get

6. 获取到的DS.getConnection对象return即可。

  动态加载和切换DS Demo版本

application.yml

参数完全跟单数据源一致。

mutidatasource:
    default-data-source:
        name: ****
        url: jdbc:mysql://****:3306/demo1?museUnicode=true&characterEncoding=utf-8
        username: root
        password: ****
        # 使用druid数据源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        filters: stat
        maxActive: 20
        initialSize: 1
        maxWait: 60000
        minIdle: 1
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: select 'x'
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
        maxOpenPreparedStatements: 20

DataSourceProxyProperties

import lombok.Data;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "mutidatasource")
public class DataSourceProxyProperties  implements InitializingBean{
	
	/**
	 * 默认数据源
	 */
	@Autowired
	private DataSourceProperties defaultDataSource;

	@Override
	public void afterPropertiesSet() throws Exception {
		System.out.println("===>"+defaultDataSource.getUrl());
	}
}

DataSourceProxyConfiguration

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnBean(DataSourceProxyProperties.class)
public class DataSourceProxyConfiguration {
	
	@Autowired
	DataSourceProxyProperties dataSourceProxyProperties;
	
	@Bean
	public DataSource getDataSource(){
		DataSource defaultDatasource = dataSourceProxyProperties.getDefaultDataSource()
				.initializeDataSourceBuilder().build();
		
		return new DefaultDataSourceProxy(defaultDatasource);
	}

}

DefaultDataSourceProxy

参考:org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource


import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.jdbc.datasource.AbstractDataSource;

public class DefaultDataSourceProxy extends AbstractDataSource  {


	@Autowired
	DataSourceProxyProperties dataSourceProxyProperties;
	
	private Map<Long,String> map = new HashMap<Long, String>();

	private DataSource defaultDatasource;

	private Map<Long, DataSource> dataSourceCache = new ConcurrentHashMap<Long, DataSource>();

	public DefaultDataSourceProxy(DataSource defaultDatasource) {
		this.defaultDatasource = defaultDatasource;
		map.put(2l, "jdbc:mysql://****:3306/demo2?museUnicode=true&characterEncoding=utf-8");
		map.put(3l, "jdbc:mysql://****:3306/demo3?museUnicode=true&characterEncoding=utf-8");
	}

	@Override
	public Connection getConnection() throws SQLException {
		return determineTargetDataSource().getConnection();
	}

	@Override
	public Connection getConnection(String username, String password) throws SQLException {
		return determineTargetDataSource().getConnection(username, password);
	}

	protected DataSource determineTargetDataSource() {

		// 获取租户ID - 0 就是虚拟的默认租户
		Long tenantId = 0 // 获取租户逻辑;
		if(tenantId == null || tenantId.intValue() == 0){
			return this.defaultDatasource;
		}

		// 获取数据源
		DataSource dataSource = this.dataSourceCache.get(tenantId);
		if(dataSource == null){
			// 加载
			DataSourceProperties properties = dataSourceProxyProperties.getDefaultDataSource();
			
			// 获取配置,可以从redis 获取缓存数据
			properties.setUrl(map.get(tenantId));

			// 创建数据源对象
			dataSource = properties.initializeDataSourceBuilder().build();
			this.dataSourceCache.put(tenantId, dataSource);
		}
		
		return dataSource;
	}

	@Override
	@SuppressWarnings("unchecked")
	public <T> T unwrap(Class<T> iface) throws SQLException {
		if (iface.isInstance(this)) {
			return (T) this;
		}
		return determineTargetDataSource().unwrap(iface);
	}

	@Override
	public boolean isWrapperFor(Class<?> iface) throws SQLException {
		return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
	}

}

测试验证

http://localhost:9092/test1?AccessTenant=3

0 默认

1. 没有

2/3 分别对应2/3 的DB配置

优化点

在加点安全控制 和 autoconfig 就算封装完成

两种接口提供, 1. 自动 懒加载;   2. 提供API手动加载, 并迁移Flyway(提供单独接口初始化)


end

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐