参考博客:
看完这篇Kafka,你也许就会了Kafka

一、kafka安装使用

步骤:

brew install zookeeper
brew install kafka

安装位置在/usr/local/Cellar,安装完成后会有两个文件夹zookeeperkafka
配置文件在/usr/local/etc/kafka,里面有server.propertieszookeeper.properties
启动zookeeper和kafaka:

#第一种方式,使用 brew services 插件启动
brew services start zookeeper
brew services start kafka

#第二种方式,直接启动
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
kafka-server-start /usr/local/etc/kafka/server.properties

创建topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

查看topic:

kafka-topics --list --zookeeper localhost:2181

生产者消息

kafka-console-producer --broker-list localhost:9092 --topic test1

新建一个shell窗口,作为消费者消息窗口

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning

效果:生产者消息窗口输入什么,消费者消息窗口输出什么。
kafka命令大全:

OptionDescription
–alterAlter the number of partitions, eplica assignment, and/or configuration for the topic.
–at-min-isr-partitionsif set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.
–bootstrap-server <String: server to connect to>REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won’t be required.
–command-config <String: command config property file>Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
–config <String: name=value>A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate # retention.bytes # retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full etails on the topic configs. It is supported only in combination with --create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).
–createCreate a new topic.
–deleteDelete a topic
–delete-config <String: name>A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
–describeList details for the given topics.
–disable-rack-awareDisable rack aware replica assignment
–exclude-internalexclude internal topics when running list or describe command. The internal topics will be listed by default
–forceSuppress console prompts
–helpPrint usage information.
–if-existsif set when altering or deleting or describing topics, the action will only execute if the topic exists.
–if-not-existsif set when creating topics, the action will only execute if the topic does not already exist.
–listList all available topics.
–partitions <Integer: # of partitions>The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
–replica-assignment <String:
broker_id_for_part1_replica1:
broker_id_for_part1_replica2,
broker_id_for_part2_replica1: broker_id_for_part2_replica2,
…>
A list of manual partition-to-broker assignments for the topic being created or altered.
–replication-factor <Integer: replication factor>The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
–topic <String: topic>The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the ‘’ prefix to escape regular expression symbols; e.g. “test.topic”.
–topics-with-overridesif set when describing topics, only show topics that have overridden configs
–unavailable-partitionsif set when describing topics, only show partitions whose leader is not available
–under-min-isr-partitionsif set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.
–under-replicated-partitionsif set when describing topics, only show under replicated partitions
–versionDisplay Kafka version.
–zookeeper <String: hosts>DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.

使用--delete --topic demoName命令删除的时候会出现一条这样的消息:

Topic demoName is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

所以只是被标记删除,并没有真正地删除,要真正删除,需要在config/server.properties中设置delete.topic.enable=true

二、kafka简介

kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要用于大数据实时处理领域。
kafka基础架构主要存在生产者Producer、kafka集群Broker、消费者Consumer、注册消息Zookeeper。

  • Producer:消息生产者,向kafka中发布消息的角色。
  • Consumer:消息消费者,即从kafka中拉取消息消费的客户端。
  • Consumer Group:消费者组,一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区的消息,消费者之间互不影响,所有的消费者都属于某个消费组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够被一个消费者组中的一个消费者所消费
  • Broker:经纪人,一台kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个topic
  • Topic:主题,可以理解为一个队列。生产者和消费者都是面向一个topic。
  • Partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序不能保证全局有序)。
  • Replication:副本,为保证集群中某个节点发生故障,节点上Partition数据不丢失,kafka可以正常地工作,kafka提供了副本机制,一个topic的每个Partition有若干个副本,一个leader和多个follower。
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象。
  • Follower:每个分区多个副本的从角色,实时地从Leader中同步数据,leader发生故障的时候,某个follower会成为新的leader。

  • kafka工作流程:Topic是逻辑上的改变,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件的末端,且每条数据都有自己的offset,Consumer组中的每个Consumer都会实时记录自己消费到了哪个offset,以便于出错恢复的时候可以从上次的位置接续消费。
  • kafka的文件存储:一个Topic分为多个Partition,一个Partition分为多个segment,一个segment对应两个文件.log.index
    其中.index文件存储消息的offset+真实的起始偏移量,.log存储的是真实的数据。
    为快速定位大文件中消息位置,kafka采用了分片索引的机制来加速定位。
  • 生产者分区策略
    生产者ISR:为保证producer发送的数据能够可靠地发送到指定的topic中,topic中的每个Partition收到producer发送的数据后,都需要向producer发送ack,即acknowledgment,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。
  • 生产者分区分配策略
    • 生产者ISR(in-sync replica set)【速率和leader相差低于10s的follower集合
      • 副本数据同步策略:全部follower同步完成才发送ack。这种方式比半数follower同步完成即发送ack的容错率要高。虽然这样网络延迟较高,但网络延迟对kafka的影响较小,而方案二会造成大量数据冗余。
    • 生产者ack机制
      • kafka为用户提供了三种可靠性级别,用户可根据可靠性和延迟的要求进行权衡选择不同的配置。
        参数0:producer不等待Broker的ack,提供了最低的延迟,Broker接收到还没有写入磁盘就已经返回,但Broker故障时可能丢失数据
        参数1:producer等待Broker的ack,Partition的leader落盘成功后返回ack,如果follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)。
        参数-1(all):producer等待Broker的ack、Partition的leader和ISR的follower全部落盘成功才返回ack,但如果是在follower同步完成后,Broker发送ack之前,此时leader发生故障的话会造成数据重复
        总结:producer返回ack,0无落盘直接返,1只leader落盘返,-1全部落盘然后返。
    • 数据一致性问题
      • LEO(Log End Offset):每个副本最后的一个offset
      • HW(High Watermark):高水位,指消费者能见到的最大的offset,ISR队列中最小的LEO。
      • follower故障:follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步,等待该follower的LEO≥该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
      • leader故障:leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
      • 这只能保证副本之间的数据一致性,并不能保证数据不丢失或数据重复
    • Exactly Once(精确的一次)
      • ack级别设置为-1,保证producer答server之间不会丢失数据,即At Least Once,至少一次语义,不能保证数据不重复。ack级别设置为0,保证生产者每条消息只会被发送一次,即At Most Once,至多一次语义,保证数据不重复却不能保证数据不丢失。对于重要的数据,要求数据不重复也不丢失,即Exactly Once
        0.11版本的kafka引入了幂等性幂等性指代Producer无论向Server发送了多少次重复数据,Server端都只会持久化一条数据。启用幂等性,即在Producer的参数中设置enable.idempotence=true即可。但幂等性无法保证跨分区会话的Exactly Once。
  • 消费者分区分配策略
    • 消费方式:Consumer采用pull拉的方式从Broker中读取数据。
      push推的方式很难适应消费速率不同的消费者,典型问题是拒绝服务以及网络堵塞。而pull的方式可以让Consumer根据自己的消费处理能力以适当的速度消费消息。
    • kafka的两种分配策略:
      Round-Robin:主要采用轮询的方式分配所有的分区,会导致每个Consumer所承载的Partition数量不一致,从而导致各个Consumer压力不均。
      Range:采用重分配方式,首先计算各个Consumer将会承载的Partition数量,然后将指定数量的Partition分配给该Consumer。按照range的方式进行分配,本质是以此遍历每个topic,然后将这个topic按照其订阅的Consumer数量进行平均分配,多出来的则按照Consumer的字典序挨个分配,这样会导致在前面的Consumer得到更多的Partition,导致各个Consumer压力不均。
    • 消费者offset的存储:kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,0.9版本之后默认保存在kafka一个内置的topic中,为__consumer_offsets
    • 同一个消费者组中的消费者,同一时刻只能有一个消费者消费
  • 高效读写 & zookeeper的作用
    • kafka的高效读写
      • 顺序读磁盘:kafka的Producer生产数据写入log文件中时采用的是顺序写的方式。顺序写之所以快和磁盘的机械结构有关,省去了大量的磁头寻址的时间。
      • 零拷贝技术:这种方式只用将磁盘文件的数据复制到页面缓存一次,然后将数据从页面缓存直接发送到网络中,从而避免了重复复制的操作。
    • kafka中zookeeper的作用
      • kafka集群中有一个Broker会被选举为Controller,负责管理Broker的上下线、所有topic的分区副本分配和leader的选举(leader选举策略为先到先得)等工作。controller的工作管理依赖于zookeeper。
  • 事务

kafka从0.11版本开始引入事务支持,事务可以保证kafka在Exactly Once语义的基础上,生产和消费的跨分区会话,要么全部成功,要么全部失败。

  • Producer事务
GitHub 加速计划 / br / brew
40.4 K
9.47 K
下载
🍺 The missing package manager for macOS (or Linux)
最近提交(Master分支:30 天前 )
90a90b30 2 个月前
47b1cab7 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐