官网:http://kafka.apache.org/  

1.kafka安装前的配置

 Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

 Docker安装zookeeper

# 查看本地镜像
docker images
# 检索ZooKeeper 镜像
docker search zookeeper
# 拉取ZooKeeper镜像最新版本
docker pull zookeeper:latest
# 我使用的版本
docker pull zookeeper:3.4.14

 创建容器

docker run -d \
--privileged=true \
--name zookeeper \
--restart=always \
-p 2181:2181 zookeeper:3.4.14

参数说明:

-e TZ="Asia/Shanghai" # 指定上海时区 
-d # 表示在一直在后台运行容器
-p 2181:2181 # 对端口进行映射,将本地2181端口映射到容器内部的2181端口
--name # 设置创建的容器名称
-v # 将本地目录(文件)挂载到容器指定目录;
--restart always #始终重新启动zookeeper,看需求设置不设置自启动

Docker安装kafka

1.拉取镜像

docker pull wurstmeister/kafka:2.12-2.3.1

2.创建容器

docker run -d \
--privileged=true \
--name kafka \
--restart=always \
--env KAFKA_ADVERTISED_HOST_NAME=你自己的端口号 \
--env KAFKA_ZOOKEEPER_CONNECT=你自己的端口号:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你自己的端口号:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

创建一个关于kafka的项目

导入依赖

     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
 <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

导入yml的文件

# Tomcat
server:
  port: 9004
# Spring
spring:
  application:
    # 应用名称
    name: bxh-kafka
  profiles:
    # 环境配置
    active: dev
  main:
    # 允许使用循环引用
    allow-circular-references: true
    # 允许定义相同的bean对象 去覆盖原有的
    allow-bean-definition-overriding: true
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: 自己的端口号:8848
      config:
        # 配置中心地址
        server-addr: 自己的端口号:8848
        # 配置文件格式
        file-extension: yml
        # 共享配置
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos中新加配置(  哪一个微服务要用kafka就在那一个微服务nacos上面加上这个配置文件 )

spring:
  kafka:
    topic: -1
    bootstrap-servers: 自己的端口号:9092
    producer:
      retries: 10
      acks: all
      linger.ms: 1
      batch.size: 16384
      buffer.memory: 33554432 
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      auto-commit-interval: 1S
      auto-offset-reset: earliest
      enable-auto-commit: true
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者发送消息

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**

 * 类说明

 *

 * @author zhuwenqiang

 * @date 2023/3/1

 */

public class ProducerQuickStart {

    public static void main(String[] args) {

        // 1; 构建 Properties 对象  存放 kafka 生产者配置信息

        Properties properties = new Properties();

        // 设置 kafka 连接地址

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "106.14.71.185:9092");

        // 设置 键值 序列化

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 2; 构建 Kafka 生产者对象

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 3: 发送消息

        // 封装发送消息对象

        ProducerRecord<String, String> record = new ProducerRecord<String, String>("bwie-topic", "10001", "hello kafka~~");

        kafkaProducer.send(record);

        // 4; 关系消息通道

        kafkaProducer.close();

    }

}

消费者接收消息

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Collections;

import java.util.Properties;

/**

 * 类说明

 *

 * @author zhuwenqiang

 * @date 2023/3/1

 */

public class ConsumerQuickStart {

    public static void main(String[] args) {

        // 构建kafka消费者配置

        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "106.14.71.185:9092");

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");

        // 构建 Kafka 消费者对象

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题

        kafkaConsumer.subscribe(Collections.singleton("bwie-topic"));

        // 获取消息

        while(true) {

            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(record.key());

                System.out.println(record.value());

            }

        }

    }

}

总结

  1. 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)同一个分组

  2. 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)消费者设置不同的分组

kafka发送消息 (  消息为对象 )

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;


    /**
     * kafka发送手机短信的方法
     */
    @PostMapping("/sendCode/{tel}")
    public Result sendCode(@PathVariable("tel") String tel){
        log.info("功能名称:发生验证码的方法,请求路径:[{}],请求方法:[{}],请求参数:[{}]",request.getRequestURI()
                ,request.getMethod(),JSON.toJSONString(tel));
        // 获取短信验证码
        String numbers = RandomUtil.randomNumbers(4);
        // 存入redis
        redisTemplate.opsForValue().set(tel,numbers,5, TimeUnit.MINUTES);
        CodeRequest codeRequest = new CodeRequest();
        codeRequest.setTel(tel);
        codeRequest.setCode(numbers);
        // 异步发生kak
        kafkaTemplate.send("code",JSON.toJSONString(codeRequest));
        Result result = Result.success();
        log.info("功能名称:发生验证码的方法,请求路径:[{}],请求方法:[{}],响应参数:[{}]",request.getRequestURI()
                ,request.getMethod(),JSON.toJSONString(result));
        return  result;
    }

kafka接收消息

  @KafkaListener(topics = "code")
    public void testcode(String code){
        if (!StringUtils.isEmpty(code)){
            CodeRequest codeRequest = JSON.parseObject(code, CodeRequest.class);
            TelSmsUtils.sendSms(codeRequest.getTel(),"10001",new HashMap<String,String>(){{
                put("code",codeRequest.getCode());
            }});
            System.out.println("kafka消费成功.....................................................");
        }
    }

kafka发送消息 (  消息为字符串 )

package com.heima.kafka.controller;
​
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
​
@RestController
public class HelloController {
​
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
​
    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("itcast-topic","黑马程序员");
        return "ok";
    }
}

kafka接收消息

package com.heima.kafka.listener;
​
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
​
@Component
public class HelloListener {
​
    @KafkaListener(topics = "itcast-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }
​
    }
}

GitHub 加速计划 / na / nacos
29.83 K
12.75 K
下载
Nacos是由阿里巴巴开源的服务治理中间件,集成了动态服务发现、配置管理和服务元数据管理功能,广泛应用于微服务架构中,简化服务治理过程。
最近提交(Master分支:3 个月前 )
4334cd16 * Support custom client configuration timeout.(#12748) * Add UT.(#12748) 12 天前
b04d2266 16 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐