springcloud(nacos)集成Kafka
nacos
Nacos是由阿里巴巴开源的服务治理中间件,集成了动态服务发现、配置管理和服务元数据管理功能,广泛应用于微服务架构中,简化服务治理过程。
项目地址:https://gitcode.com/gh_mirrors/na/nacos
免费下载资源
·
官网:http://kafka.apache.org/
1.kafka安装前的配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper
# 查看本地镜像
docker images
# 检索ZooKeeper 镜像
docker search zookeeper
# 拉取ZooKeeper镜像最新版本
docker pull zookeeper:latest
# 我使用的版本
docker pull zookeeper:3.4.14
创建容器
docker run -d \
--privileged=true \
--name zookeeper \
--restart=always \
-p 2181:2181 zookeeper:3.4.14
参数说明:
-e TZ="Asia/Shanghai" # 指定上海时区
-d # 表示在一直在后台运行容器
-p 2181:2181 # 对端口进行映射,将本地2181端口映射到容器内部的2181端口
--name # 设置创建的容器名称
-v # 将本地目录(文件)挂载到容器指定目录;
--restart always #始终重新启动zookeeper,看需求设置不设置自启动
Docker安装kafka
1.拉取镜像
docker pull wurstmeister/kafka:2.12-2.3.1
2.创建容器
docker run -d \
--privileged=true \
--name kafka \
--restart=always \
--env KAFKA_ADVERTISED_HOST_NAME=你自己的端口号 \
--env KAFKA_ZOOKEEPER_CONNECT=你自己的端口号:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你自己的端口号:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
创建一个关于kafka的项目
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
导入yml的文件
# Tomcat
server:
port: 9004
# Spring
spring:
application:
# 应用名称
name: bxh-kafka
profiles:
# 环境配置
active: dev
main:
# 允许使用循环引用
allow-circular-references: true
# 允许定义相同的bean对象 去覆盖原有的
allow-bean-definition-overriding: true
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: 自己的端口号:8848
config:
# 配置中心地址
server-addr: 自己的端口号:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
nacos中新加配置( 哪一个微服务要用kafka就在那一个微服务nacos上面加上这个配置文件 )
spring:
kafka:
topic: -1
bootstrap-servers: 自己的端口号:9092
producer:
retries: 10
acks: all
linger.ms: 1
batch.size: 16384
buffer.memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
auto-commit-interval: 1S
auto-offset-reset: earliest
enable-auto-commit: true
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者发送消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 类说明
*
* @author zhuwenqiang
* @date 2023/3/1
*/
public class ProducerQuickStart {
public static void main(String[] args) {
// 1; 构建 Properties 对象 存放 kafka 生产者配置信息
Properties properties = new Properties();
// 设置 kafka 连接地址
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "106.14.71.185:9092");
// 设置 键值 序列化
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 2; 构建 Kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 3: 发送消息
// 封装发送消息对象
ProducerRecord<String, String> record = new ProducerRecord<String, String>("bwie-topic", "10001", "hello kafka~~");
kafkaProducer.send(record);
// 4; 关系消息通道
kafkaProducer.close();
}
}
消费者接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 类说明
*
* @author zhuwenqiang
* @date 2023/3/1
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
// 构建kafka消费者配置
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "106.14.71.185:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
// 构建 Kafka 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
kafkaConsumer.subscribe(Collections.singleton("bwie-topic"));
// 获取消息
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
}
}
}
}
总结
-
生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)同一个分组
-
生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)消费者设置不同的分组
kafka发送消息 ( 消息为对象 )
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* kafka发送手机短信的方法
*/
@PostMapping("/sendCode/{tel}")
public Result sendCode(@PathVariable("tel") String tel){
log.info("功能名称:发生验证码的方法,请求路径:[{}],请求方法:[{}],请求参数:[{}]",request.getRequestURI()
,request.getMethod(),JSON.toJSONString(tel));
// 获取短信验证码
String numbers = RandomUtil.randomNumbers(4);
// 存入redis
redisTemplate.opsForValue().set(tel,numbers,5, TimeUnit.MINUTES);
CodeRequest codeRequest = new CodeRequest();
codeRequest.setTel(tel);
codeRequest.setCode(numbers);
// 异步发生kak
kafkaTemplate.send("code",JSON.toJSONString(codeRequest));
Result result = Result.success();
log.info("功能名称:发生验证码的方法,请求路径:[{}],请求方法:[{}],响应参数:[{}]",request.getRequestURI()
,request.getMethod(),JSON.toJSONString(result));
return result;
}
kafka接收消息
@KafkaListener(topics = "code")
public void testcode(String code){
if (!StringUtils.isEmpty(code)){
CodeRequest codeRequest = JSON.parseObject(code, CodeRequest.class);
TelSmsUtils.sendSms(codeRequest.getTel(),"10001",new HashMap<String,String>(){{
put("code",codeRequest.getCode());
}});
System.out.println("kafka消费成功.....................................................");
}
}
kafka发送消息 ( 消息为字符串 )
package com.heima.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("itcast-topic","黑马程序员");
return "ok";
}
}
kafka接收消息
package com.heima.kafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class HelloListener {
@KafkaListener(topics = "itcast-topic")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}
}
}
GitHub 加速计划 / na / nacos
29.83 K
12.75 K
下载
Nacos是由阿里巴巴开源的服务治理中间件,集成了动态服务发现、配置管理和服务元数据管理功能,广泛应用于微服务架构中,简化服务治理过程。
最近提交(Master分支:3 个月前 )
4334cd16
* Support custom client configuration timeout.(#12748)
* Add UT.(#12748) 12 天前
b04d2266
16 天前
更多推荐
已为社区贡献4条内容
所有评论(0)