SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

  • 启动MySQL环境,并开启binlog
  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

预先在model实体中准备

短信实体

@Data
@ApiModel(description = "短信实体")
public class MsmVo{
	
	@ApiModelProperty(value="phone")
	private String phone;

	@ApiModelProperty(value = "短信模板code")
	private  String templateCode;

	@ApiModelProperty(value="短信模板参数")
	private Map<String,Object> param;
}

排班实体

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{
	
	@ApiModelProperty(value="可预约数")
	private Integer reserverdNumber

	@ApiModelProperty(value = "剩余预约数")
	private Integer availableNumber;

	@ApiModelProperty(value = "排班id")
	private String scheduleId;

	@ApiModelProperty(value = "短信实体")
	private MsmVo msmVo;
}

一、安装RabbitMQ

docker pull rabbitmq:nanagemnet
docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement

访问:http://IP:15672
在这里插入图片描述

二、rabbit-util模块封装

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
</dependency>

创建一个RabbitService用来发送消息

@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	//发送消息
	public boolean sendMessage(String exchange,String routingKey,Object message){
		rabbitTemplate.convertAndSend(exchange,routingKey,message);
		return true;
	}
}

创建mq消息转化器

@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}

添加常量配置类

public class MqConst{
	
	//预约下单
	public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
	public static final String ROUTING_ORDER = "order";
	//队列
	public static final String QUEUE_ORDER = "queue.order";

	//短信
	public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
	public static final String ROUTING_MSM_ITEM = "msm.item";
	pulib static final String Queue_MSM_item = "queue.msm.item";
}

三、短信模块service-sms

将二中的模块依赖引入

<dependency>
	<groupId>com.michael</groupId>
	<artifactId>rabbit_util</artifactId>
	<version>xxx</version>
</dependency>

配置文件application.properties

spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbit.uername=guest
spring.rabbitmq.password=guest

Service发送消息

public interface MsmService{
	//发送手机验证码
	boolean send(String phone,String code);
	
	//MQ使用发送短信的接口
	boolean send(MsmVo msmVo);
}
@Service
public class MsmServiceImpl implements MsmService{

	@Override
	public boolean send(String phone,String code){
		//判断手机号是否为空
		if(StringUtils.isEmpty(phone)){
			return false;
		}

		//整合阿里云相关参数,短信服务
		DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
			ConstantPropertiesUtils.ACCESS_KEY_ID,
			ConstantPropertiesUtils.SECRET
		);
		IAcsClient client = new DefaultAcsClient(profile);
		CommonRequest request = new CommonRequest();

		request.setMethod(MethodType.POST);
		request.setDomain("dysmsapi.aliyuncs.com");
		request.setVersion("2018-08-08");
		request.setAction("SendSms");

		//手机号
		request.putQueryParameter("PhoneNumbers",phone);
		//签名名称
		request.putQueryParameter("SignName","我的网站");
		//模板
		request.putQueryParameter("TemplateCode","SMS_180051135");
		//验证码使用json格式{"code":"123456"}
		Map<String,Object> param = new HashMap();
		param.put("code",code);
		request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
		
		//调用方法进行短信发送
		try{
			CommonResponse response = client.getCommonResponse(request);
			System.out.println(response.getData());
			return response.getHttpResponse().isSuccess();
		}catch(ServerException e){
			e.printStackTrace();
		}catch(ClientException e){
			e.printStackTrace();
		}
		return false;
	}
	
	@Override
	public boolean send(MsmVo msmVo){
		
		if(!StringUtils.isEmpty(msmVO.getPhone())){
			String code = (String)msmVo.getParam().get("code");
			boolean isSend = this.send(msmVo.getPhone(),code);
			return isSend;
		}
		return false;
	}
}

创建mq监控器

@Component
public class MsmReceiver{
	
	@Autowired
	private MsmService msmService;

	//监听
	@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
		exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
		key = {MqConst.ROUTING_MSM_ITEM}
	))
	public void send(MsmVo msmVo,Message message,Channel channel){
		msmService.ssend(msmVo);
	}
}

四、业务类

生成订单之后,发送短信并更新数量

①、业务模块中引入依赖

rabbit-util

②、添加配置

spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

③、service接口以及实现类

@Override
public void update(Schedule schedule){
	schedule.setUpdata(new Date());
	scheduleRepository.save(schedule);
}

④、receiver包中创建MQ监听器

@Component
public class HospitalReceiver{
	
	@Autowired
	private ScheduleService scheduleService;

	@Autowired
	private RabbitService rabbitService;

	//监听
	@RabbitListener(
		bindings = @QueueBinding(
			value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
			exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
			key = {MqConst.ROUTING_ORDER}
		)
	)
	public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{
		//下单成功,更新数据
		Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
		schedule.setReservedNumber(orderMqVo.getReservedNumber());
		schedule.setAvailableNumber(orderMqVo.getAvailableNumber);
		scheduleService.update(schedule);

		//发送短信
		MsmVo msmVo = orderMqVo.getMsmVo();
		if(null != msmVo){
			rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);
		}
	}
}
GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐