不使用binlog,canal,kafka等,只用java+mybatis拦截器来实现项目中的异步双写主从数据库,代码逻辑全整理
项目中因为要迁库,所以我要在原项目中接入我的双写逻辑,确保新旧两个库都有数据写入,假如新库写入失败,旧库数据也能写入,这就确保了重要数据不能丢失。
一开始考虑的方案是使用数据同步工具,像是canal或是DTS等,但是环境这块卡的比较死,没有其他花里胡哨的工具,只能纯靠java改写代码来实现了,期间排了不少坑,这里做个人踩坑记录
实现效果,批量双写全部报200,自测下来还算成功
首先列一下要实现类的目录,因为我们门户,任务,和开放接口都是单独一套springboot然后共用common包的形式,所以这套package每个springboot都要引入
这里整理的比较匆忙,但重点其实是这几个类
aop中TargetDataSource之前说过了,还包括DataSourceConfig,DataSourceUtil和DynamicDataSource的配置,可以看我之前写的这一篇:mybatis动态数据源配置(附自定义注解实现数据源切换)
另外一个注解没有用上,是做多数据源事务切换的作用,想看代码和使用场景的,完全参考如下:配置多个数据源的事务
现在开始讲重点,为了追踪mysql中实时变化的数据,就要用到mybatis拦截器,这是我之前参考的博客,但不能完全适用于我的业务场景:用mybatis 拦截器 为insert update操作填充字段
说一下我的写法
进入AutoFillInterceptor类,也就是我的拦截器
因为我要引入我的DoubleWriteService做我的双写逻辑,这里会发生第一个坑,因为service加载速度是在拦截器后面的,直接Autowired启动会报错,这里用到了懒加载机制,确保启动顺利
之后进入拦截器的时候先判空,再去SpringUtils取bean,就能调用了
附上工具类SpringUtils
@Component
public class SpringUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
if (SpringUtils.applicationContext == null) {
SpringUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
//根据name
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
//根据类型
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
接着往下走方法,这里遇到的第一个坑就是无限循环调用导致的stackoverflow,看过前面那篇博客的都知道,这个mybatis拦截器的作用其实就是去监控各类dao增改的操作,如果走到我的dao里,就会导致再次进入拦截器,然后再进入我的dao,循环往复导致了爆栈,所以我的操作很简单,就是取MappedStatement中的id,也就是完整的包路径+Dao类名,如果是就直接返回
接下来就是处理我的第一个arg,这个args是如何产生的呢,开头我们声明的就是args,第一个就肯定是MappedStatement了,第二个Object是我们的传参,可以是实体类或是map,这个之后会讲
取MappedStatement的用处其实有很多,自己debug的话可以看到许多mybatis分装的参数,我这里就取id就够了,一般常用的就是取sql字符串了,这里注释的就是取sql并且替换占位符?的逻辑。我看了好几篇博客,都写的一样:SpringBoot通过MyBatis拦截器打印完整SQL语句(无问号) 但我自己跑的时候却发现获取不到参数,所以我的双写不用直接sql的形式做
这里我就用到了第二个args,也就是增改他可以传entity类,可以传map,我这里做了类判断和新增修改判断,然后分别进入我相应的方法里。我的service入口传参简单明了,就是传表名和map/entity的形式
顺便说一下枚举的作用,因为我拦截器只能获得sqlID,为了获取表名tableName,这里枚举其实用到了包含,其实就是如下效果,假如表名交lzq_test1,包路径名叫com.lzq.common.dao.lzqTest1Dao
拦截器获取的sqlId一般都叫com.lzq.common.dao.lzqTest1Dao.insert,所以这里用contains去获取
枚举讲完了,之后就进入我的service做主要逻辑了
可以看到insert和update各做了map和obj类的传参,确保都能顺利执行写入,先讲一下各个方法的作用:
- void dealWithSql(String sql); 直接传sql执行sql,因为拦截器那一步获取不到sql,这个方法没用上
- void update(String tableName, Object o); 更新逻辑入口,支持obj和map
- void insert(String tableName, Object o);新增逻辑入口,支持obj和map
- String processUpdateBinLog(Map<String, Object> map,String tableName,String isUpdate); 切换数据源执行的入口,因为他是要上注解,同类调用会让注解失效,所以要设成public外部调取
- List getRedisFailList(String tableName,String isUpdate); 我会把dao执行失败的传参放入redis里,这里统一获取失败后的传参全是jsonstring格式
- void handleTransactionals(List tableNameList, List objList,List isUpdateList); 处理事务的方法,表名列表,对象列表和新增修改列表,一一对应,统一成功或失败,这里没用上
进入实现类,所有依赖如图所示,事务的注掉了因为发现不好用,同类调用避免依赖问题再次加上@Lazy
线程池根据应用自己配
@Data
@Configuration
@ConfigurationProperties(prefix = "executor")
@EnableConfigurationProperties(ExecutorConfig.class)
public class ExecutorConfig {
private int corePoolSize = 5;//2
private int maxPoolSize = 10;//4
private int keepAliveSeconds = 60;
private int queueCapacity = 200;
@Bean
public TraceThreadPoolExecutor traceThreadPoolExecutor() {
return new TraceThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity, rejectedExecutionHandler());
}
RejectedExecutionHandler rejectedExecutionHandler() {
return new ThreadPoolExecutor.CallerRunsPolicy();
}
}
双写开关根据配置文件实现
新增/更新方法如下
再往下走,根据需要自己配
public <V> String processUpdateBinLogData(String tableName, Map<String, Object> param,String isUpdate) { //, int threadCount
String result;
if (null == tableName) {
return null;
}
result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
/** 请求数据超过二十,才使用多线程,否则就是单线程 */
// int threadCount = 2;//requestList.size() > 20 ? 20 : 1;
// try {
// result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
// if (threadCount > 1) {
// /* 多线程处理 */
// result = procUpdateBinLogByMultiThread(enumByTable,param, threadCount);
// } else {
// /* 直接处理 */
// result = processUpdateBinLog(param, enumByTable);
// }
// } catch (Exception e) {
//
// }
return result;
}
private <V> String procUpdateBinLogByMultiThread(String tableName,Map<String, Object> param,String isUpdate) {
//List<String> result = new ArrayList<>();
if (!param.isEmpty()) {
processUpdateBinLogDataByThread(param, tableName,isUpdate);
}
// if (!param.isEmpty() && null != threadCount) {
// List<List<V>> splitList = ListUtils.averageSplit(requestList, threadCount);
// if (CollectionUtils.isNotEmpty(splitList)){
// List<Future<List<String>>> futureList = new ArrayList<>();
// for (List<V> r : splitList) {
// Future<List<String>> future = processUpdateBinLogDataByThread(param, enumByTable);
// if (null != future){
// futureList.add(future);
// }
// }
// /** 获取多线程返回结果 */
// for (Future<List<String>> r : futureList) {
// result.addAll(r.get());
// }
// }
// result.addAll(future.get());
// }
return null;
}
这里用Future类和线程池去走异步
public <T> Future<String> processUpdateBinLogDataByThread(Map<String, Object> param, String tableName,String isUpdate) {
//Future<List<String>> result = null;
Future<String> result = null;
//if (CollectionUtils.isNotEmpty(requestList)) {
if (!param.isEmpty()) {
result = threadPoolExecutor.submit(new TraceAsyncCallableTask<String>() {
private final Map<String, Object> paramIn = param;
private final String tableNameIn = tableName;
private final String isUpdateIn = isUpdate;
private Object res;
@Override
public String getMethod() {
return "processUpdateBinLogDataByThread";
}
@Override
public String[] getParams() {
String[] paraArr = new String[1];
return paraArr;
}
@Override
public String getService() {
return "DoubleWriteService";
}
@Override
public Object getRes() {
return res;
}
@Override
public String call() {
//同类调用才能生效注解
return doubleWriteService.processUpdateBinLog(paramIn, tableNameIn,isUpdateIn);
}
});
}
return result;
}
注意这里,用到了切换数据源的注解
@Override
@TargetDataSource(connName = "dbTwo") //异步重新生效不注解,换成同类调用
public <V> String processUpdateBinLog(Map<String, Object> map, String tableName,String isUpdate) {
//List<String> result = new ArrayList<>();
//String result = "fail";
int isSucc = 0;
Map<String, Object> queryMap = new HashMap<>();
//String str;
if (!map.isEmpty()) {
try {
//报错测试
//int i = 1;i = i /0;
map = humpToUnderline(map);
queryMap.put("tableName", tableName);
queryMap.put("fieldsMap", map);
queryMap.put("queryMap", map);
if (DoubleWriteConstants.UPDATE.equals(isUpdate)){
isSucc = dataMigrationDao.updateTableListDynamic(queryMap);
//更新直接去map的主键
logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate, map.get("id"));
}else {
isSucc = dataMigrationDao.insertTableListDynamic(queryMap);
//新增取mybatis返回主键
logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate, queryMap.get("id"));
}
}catch (Exception e){
String jsonStr = JSONObject.toJSONString(map);
//key格式:前缀+新增/更新+表名
cacheClient.lpush( DoubleWriteConstants.REDIS_KEY_PREFIX + isUpdate + tableName,jsonStr);
logger.error("{}表双写{}错误,json->{},异常->{}" ,tableName,isUpdate,jsonStr, e.getMessage());
}
}
return null;
}
驼峰转下划线逻辑,这里用hutool的逻辑复制过来的
/**
* 把 map 中的 key 由驼峰命名转为下划线,使用LinkedHashMap确保字段顺序一致性
*/
private HashMap<String, Object> humpToUnderline(Map<String, Object> map) {
//使用LinkedHashMap确保字段顺序一致性
HashMap<String, Object> transitionMap = new LinkedHashMap<>(16);
map.forEach((k, v) -> transitionMap.put(toUnderlineCase(k), v));
return transitionMap;
}
public static String toUnderlineCase(CharSequence str) {
return toSymbolCase(str, '_');
}
public static String toSymbolCase(CharSequence str, char symbol) {
if (str == null) {
return null;
} else {
int length = str.length();
StringBuilder sb = new StringBuilder();
for(int i = 0; i < length; ++i) {
char c = str.charAt(i);
Character preChar = i > 0 ? str.charAt(i - 1) : null;
if (Character.isUpperCase(c)) {
Character nextChar = i < str.length() - 1 ? str.charAt(i + 1) : null;
if (null != preChar && Character.isUpperCase(preChar)) {
sb.append(c);
} else if (null != nextChar && Character.isUpperCase(nextChar)) {
if (null != preChar && symbol != preChar) {
sb.append(symbol);
}
sb.append(c);
} else {
if (null != preChar && symbol != preChar) {
sb.append(symbol);
}
sb.append(Character.toLowerCase(c));
}
} else {
if (sb.length() > 0 && Character.isUpperCase(sb.charAt(sb.length() - 1)) && symbol != c) {
sb.append(symbol);
}
sb.append(c);
}
}
return sb.toString();
}
}
最后是beanToMap逻辑,注意有时候会有转化失败问题,会影响到入mybatis的传参,自己处理
public static Map<String,Object> beanToMap(Object object){
Map<String,Object> map = null;
try {
map = new HashMap<String, Object>();
BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor property : propertyDescriptors) {
String key = property.getName();
if (key.compareToIgnoreCase("class") == 0) {
continue;
}
Method getter = property.getReadMethod();
Object value = getter!=null ? getter.invoke(object) : null;
map.put(key, value);
}
//key 可能会把自己的class 和hashcode编进去,直接去掉
map.remove("class");
} catch (Exception e) {
e.printStackTrace();
return new HashMap<>();
}
Set<String> set = map.keySet();
Iterator<String> it = set.iterator();
while (it.hasNext()){
String key = it.next();
if (map.get(key)==null || map.get(key)==""){
map.remove(key);
set = map.keySet();
it = set.iterator();
}
}
if ("false".equals(map.get("emtpy"))){
logger.error("{}双写前obj转化失败",object);
}
return map;
}
最后讲一下dao,上篇博客也整理过了,就是动态新增修改
作为消费端使用很方便,只要传表名和map就完全能用,这里注意insert要加上useGeneratedKeys = “true” keyProperty = "id"来获取自增主键,这会自动注入到当前map里
<insert id="insertTableListDynamic" parameterType="java.util.HashMap" useGeneratedKeys = "true" keyProperty = "id">
insert into
${map.tableName}
(
<foreach collection="map.fieldsMap" index="key" item="value"
separator=",">
`${key}`
</foreach>
)
values
(
<foreach collection="map.queryMap" index="key" item="value"
separator=",">
#{value}
</foreach>
)
</insert>
<update id="updateTableListDynamic" parameterType="java.util.HashMap">
update ${map.tableName}
<trim prefix="set" suffixOverrides=",">
<foreach collection="map.queryMap.entrySet()" item="value" index="key" separator=",">
<choose>
<when test="key != 'operation_date' and key != 'out_date' and key != 'return_date' and key != 'sign_date'
and key != 'audit_date' and key != 'complaint_date' and key != 'close_date' and key != 'create_date'
and key != 'modify_date' and key != 'handle_date' and key != 'create_time' and key != 'update_time'
and key != 'update_date' ">
<if test="value != null and value !=''">
${key}= #{value}
</if>
</when>
<otherwise>
<if test="value!= null ">
${key}= #{value}
</if>
</otherwise>
</choose>
</foreach>
</trim>
where id = #{map.queryMap.id}
</update>
更多推荐
所有评论(0)