Kafka根本原理

源码 2024-10-5 17:15:59 62 0 来自 中国
官方文档:https://kafka.apache.org/24/documentation.html#brokerconfigs
1.Kafka适用场景

日记网络:一个公司可以用Kafka网络各种服务的log,通过kafka以同一接口服务的方式开放给各种consumer,比方hadoop、Hbase、Solr等。
消息体系:解耦和生产者和消耗者、缓存消息等。
用户活动跟踪:Kafka经常被用来记载web用户大概app用户的各种活动,如欣赏网页、搜刮、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做及时的监控分析,大概装载到hadoop、数据堆栈中做离线分析和发掘。
运营指标:Kafka也经常用来记载运营监控数据。包罗网络各种分布式应用的数据,生产各种操纵的会合反馈,比如报警和陈诉。
2.Kafka根本概念

3.生产者

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载平衡。也可以根据消息中的某一个关键字来举行区分。通常第二种方式使用的更多。
1)写入方式
producer 采取 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于次序写磁盘(次序写磁盘服从比随机写内存要高,保障 kafka 吞吐率)。
2)消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
  • 指定了 patition,则直接使用;
  • 未指定 patition 但指定 key,通过对 key 的 value 举行hash 选出一个 patition
  • patition 和 key 都未指定,使用轮询选出一个 patition。
3)写入流程
  • producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
  • producer 将消息发送给该 leader
  • leader 将消息写入当地 log
  • followers 从 leader pull 消息,写入当地 log 后 向leader 发送 ACK
  • leader 收到全部 ISR 中的 replica 的 ACK 后,增长 HW(high watermark,末了 commit 的 offset) 并向 producer 发送 ACK
4.Broker

4.1 Topic

可以明确Topic是一个种别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区(Partition)日记文件:
  • Partition是一个有序的message序列,这些message按次序添加到一个叫做commit log的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。
  • 每个partition,都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是差别的partition中的message的offset可能是相同的。
  • kafka一样寻常不会删除消息,不管这些消息有没有被消耗。只会根据配置的日记保存时间(log.retention.hours)确认消息多久被删除,默认保存迩来一周的日记消息。kafka的性能与保存的消息数据量巨细没有关系,因此生存大量的数据消息日记信息不会有什么影响。
  • 每个consumer是基于自己在commit log中的消耗进度(offset)来举行工作的。在kafka中,消耗offset由consumer自己来维护;一样寻常情况下我们按照次序逐条消耗commit log中的消息,固然我可以通过指定offset来重复消耗某些消息,大概跳过某些消息。
  • 这意味kafka中的consumer对集群的影响黑白常小的,添加一个大概淘汰一个consumer,对于集群大概其他consumer来说,都是没有影响的,由于每个consumer维护各自的消耗offset。
Topic的情况:
  • leader节点负责给定partition的全部读写请求,同一个主题差别分区leader副本一样寻常不一样(为了容灾)
  • replicas 体现某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,以致这个节点挂了,也会列出。
  • isr 是replicas的一个子集,它只列出当前还存在世的,而且已同步备份了该partition的节点。
4.2 HW与LEO详解

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消耗到HW所在的位置。别的每个replica都有HW,leader和follower各自尊责更新自己的HW的状态。对于leader新写入的消息,consumer不能立即消耗,leader会期待该消息被全部ISR中的replicas同步后更新HW,此时消息才气被consumer消耗。如许就包管了如果leader所在的broker失效,该消息仍然可以重新推举的leader中获取。对于来自内部broker的读取请求,没有HW的限定。
5.png 由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求全部能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被以为已经commit,这种情况下如果follower都还没有复制完,落伍于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的平衡了确保数据不丢失以及吞吐率。
4.3 日记分段存储

Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号定名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限定目标是为了方便把 log 文件加载到内存去操纵
# 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记载一条当前消息的offset到index文件,# 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找详细消息00000000000000000000.index# 消息存储文件,重要存offset和消息体00000000000000000000.log# 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记载一条当前消息的发送时间戳与对应的offset到timeindex文件,# 如果必要按照时间来定位消息的offset,会先在这个文件里查找00000000000000000000.timeindex00000000000005367851.index00000000000005367851.log00000000000005367851.timeindex00000000000009936472.index00000000000009936472.log00000000000009936472.timeindex这个 9936472 之类的数字,就是代表了这个日记段文件里包罗的起始 Offset,也就分析这个分区里至少都写入了靠近 1000 万条数据了。
Kafka Broker 有一个参数,log.segment.bytes,限定了每个日记段文件的巨细,最大就是 1GB。
一个日记段文件满了,就自动开一个新的日记段文件来写入,制止单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的谁人日记段文件,叫做 active log segment。
4.4 Controller推举以及副本推举

Kafka核心总控制器Controller
在Kafka集群中会有一个大概多个broker,此中有一个broker会被推举为控制器(Kafka Controller),它负责管理整个集群中全部分区和副本的状态。
  • 当某个分区的leader副本出现故障时,由控制器负责为该分区推举新的leader副本。
  • 当检测到某个分区的ISR聚集发生厘革时,由控制器负责关照全部broker更新其元数据信息。
  • 当使用kafka-topics.sh脚本为某个topic增长分区数目时,同样还是由控制器负责让新分区被其他节点感知到。
Controller推举机制
  • 在kafka集群启动的时间,会自动推举一台broker作为controller来管理整个集群,推举的过程是集群中每个broker都会实行在zookeeper上创建一个 /controller 临时节点,zookeeper会包管有且仅有一个broker能创建乐成,这个broker就会成为集群的总控器controller。
  • 当这个controller脚色的broker宕机了,此时zookeeper临时节点会消散,集群里其他broker会不绝监听这个临时节点,发现临时节点消散了,就竞争再次创建临时节点,就是我们上面说的推举机制,zookeeper又会包管有一个broker成为新的controller。
具备控制器身份的broker必要比其他平凡的broker多一份职责,详细细节如下:
  • 监听broker相干的厘革。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理处罚broker增减的厘革。
  • 监听topic相干的厘革。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理处罚topic增减的厘革;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理处罚删除topic的动作。
  • 从Zookeeper中读取获取当前全部与topic、partition以及broker有关的信息并举行相应的管理。对于全部topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配厘革。
  • 更新集群的元数据信息,同步到其他平凡的broker节点中。
Partition副本推举Leader机制
controller感知到分区leader所在的broker挂了(controller监听了许多zk节点可以感知到broker存活),controller会从ISR列表(参数unclean.leader.election.enable=false的条件下)里挑第一个broker作为leader(第一个broker开始放进ISR列表,可能是同步数据最多的副本),如果参数unclean.leader.election.enable为true,代表在ISR列表里全部副本都挂了的时间可以在ISR列表以外的副本中选leader,这种设置,可以进步可用性,但是选出的新leader有可能数据少许多。
副本进入ISR列表有两个条件:
  • 副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
  • 副本能复制leader上的全部写操纵,而且不能落伍太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,凌驾这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
5.消耗者

5.1 消耗模式

传统的消息通报模式有2种:队列( queue) 和(publish-subscribe)
  • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
  • publish-subscribe模式:消息会被广播给全部的consumer。
Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。
  • queue模式:全部的consumer都位于同一个consumer group 下。
  • publish-subscribe模式:全部的consumer都有着自己唯一的consumer group。
5.2 消耗次序

消耗次序
  • 一个partition同一个时候在一个consumer group中只能有一个consumer instance在消耗,从而包管消耗次序。
  • consumer group中的consumer instance的数目不能比一个Topic中的partition的数目多,否则,多出来的consumer消耗不到消息。
  • Kafka只在partition的范围内包管消息消耗的局部次序性,不能在同一个topic中的多个partition中包管总的消耗次序性。
  • 如果有在总体上包管消耗次序的需求,那么我们可以通过将topic的partition数目设置为1,将consumer group中的consumer instance数目也设置为1,但是如许会影响性能,以是kafka的次序消耗很少用。
5.3 消耗者消耗消息的offset记载机制

每个consumer会定期将自己消耗分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时间,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清算topic里的消息,末了就保存最新的那条数据。
由于__consumer_offsets可能会吸收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),如许可以通过加呆板的方式抗大并发。
通过如下公式可以选出consumer消耗的offset要提交到__consumer_offsets的哪个分区
公式:hash(consumerGroupId)  %  __consumer_offsets主题的分区数
5.4 消耗者Rebalance机制

rebalance就是说如果消耗组里的消耗者数目有厘革或消耗的分区数有厘革,kafka会重新分配消耗者消耗分区的关系。比如consumer group中某个消耗者挂了,此时会自动把分配给他的分区交给其他的消耗者,如果他又重启了,那么又会把一些分区重新交还给他。
注意:rebalance只针对subscribe这种不指定分区消耗的情况,如果通过assign这种消耗方式指定了分区,kafka不会举行rebanlance。
如下情况可能会触发消耗者rebalance
  • 消耗组里的consumer增长或淘汰了
  • 动态给topic增长了分区
  • 消耗组订阅了更多的topic
rebalance过程中,消耗者无法从kafka消耗消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,那重平衡可能会耗时极多,以是应只管制止在体系高峰期的重平衡发生。
消耗者Rebalance分区分配战略:
重要有三种rebalance的战略:range、round-robin、sticky。
Kafka 提供了消耗者客户端参数partition.assignment.strategy 来设置消耗者与订阅主题之间的分区分配战略。默认情况为range分配战略。
假设一个主题有10个分区(0-9),现在有三个consumer消耗:
  • range战略就是按照分区序号排序,假设 n=分区数/消耗者数目 = 3, m=分区数%消耗者数目 = 1,那么前 m 个消耗者每个分配 n+1 个分区,背面的(消耗者数目-m )个消耗者每个分配 n 个分区。
    比如分区03给一个consumer,分区46给一个consumer,分区7~9给一个consumer。
  • round-robin战略就是轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、8给一个consumer
  • sticky战略初始时分配战略与round-robin类似,但是在rebalance的时间,必要包管如下两个原则。
    1)分区的分配要尽可能匀称 。
    2)分区的分配尽可能与前次分配的保持相同。
    当两者发生辩说时,第一个目标优先于第二个目标 。如许可以最大程度维持原来的分区分配的战略。
    比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky战略分配的结果如下:
    consumer1除了原有的0~3,会再分配一个7
    consumer2除了原有的4~6,会再分配8和9
第一阶段:选择组和谐器
组和谐器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组和谐器coordinator,负责监控这个消耗组里的全部消耗者的心跳,以及判断是否宕机,然后开启消耗者rebalance。
consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组和谐器GroupCoordinator,并跟其创建网络毗连。
组和谐器选择方式:
consumer消耗的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer group的coordinator
第二阶段:到场消耗组JOIN GROUP
在乐成找到消耗组所对应的 GroupCoordinator 之后就进入到场消耗组的阶段,在此阶段的消耗者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理处罚相应。然后GroupCoordinator 从一个consumer group中选择第一个到场group的consumer作为leader(消耗组和谐器),把consumer group情况发送给这个leader,接着这个leader会负责订定分区方案。
第三阶段( SYNC GROUP)
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker举行网络毗连以及消息消耗。
6.Zookeeper

7.一些问题及办理方案

1、消息丢失情况:
消息发送端:
(1)acks=0: 体现producer不必要期待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最轻易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要期待leader已经乐成将数据写入当地log,但是不必要期待全部follower是否乐成写入。就可以继续发送下一条消息。这种情况下,如果follower没有乐成备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader必要期待全部备份(min.insync.replicas配置的备份个数)都乐成写入日记,这种战略会包管只要有一个备份存活就不会丢失数据。这是最强的数据包管。一样寻常除非是金融级别,或跟钱打交道的场景才会使用这种配置。固然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
消息消耗端:
如果消耗这边配置的是自动提交,万一消耗到数据还没处理处罚完,就自动提交offset了,但是此时你consumer直接宕机了,未处理处罚完的数据丢失了,下次也消耗不到了。
2、消息重复消耗
消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经吸收到消息,但发送方会重新发送消息
消息消耗端:
如果消耗这边配置的是自动提交,刚拉取了一批数据处理处罚了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理处罚
一样寻常消耗端都是要做消耗幂等处理处罚的。
3、消息乱序
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送乐成才去发送下一条消息,如许可能会出现,发送了1,2,3条消息,第一条超时了,背面两条发送乐成,再重试发送第1条消息,这时消息在broker端的次序就是2,3,1了
以是,是否肯定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,固然acks不能设置为0,如许也能包管消息发送的有序。
kafka包管全链路消息次序消耗,必要从发送端开始,将全部有序消息发送到同一个分区,然后用一个消耗者去消耗,但是这种性能比力低,可以在消耗者端吸收到消息后将必要包管次序消耗的几条消耗发到内存队列(可以搞多个),一个内存队列开启一个线程次序处理处罚消息。
包管消息有序:
1)发送端:用同步发送的模式去发消息,固然acks不能设置为0,如许也能包管消息发送的有序。
2)broker端:将全部有序消息发送到同一个分区
3)消耗端:用一个消耗者去消耗;也可以在消耗者端吸收到消息后将必要包管次序消耗的几条消息发到内存队列(可以搞多个),一个内存队列开启一个线程次序处理处罚消息。
4、消息积蓄
1)线上偶然由于发送方发送消息速率过快,大概消耗方处理处罚消息过慢,可能会导致broker积蓄大量未消耗消息。
此种情况如果积蓄了上百万未消耗消息必要告急处理处罚,可以修改消耗端步调,让其将收到的消息快速转发到其他topic(可以设置许多分区),然后再启动多个消耗者同时消耗新主题的差别分区。
2)由于消息数据格式变动或消耗者步调有bug,导致消耗者不绝消耗不乐成,也可能导致broker积蓄大量未消耗消息。
此种情况可以将这些消耗不乐成的消息转发到别的队列里去(类似死信队列),背面再渐渐分析死信队列里的消息处理处罚问题。
5、延时队列
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消耗者立即获取,而是期待特定的时间后,消耗者才气获取这个消息举行消耗,延时队列的使用场景有许多, 比如 :
1)在订单体系中, 一个用户下单之后通常有 30 分钟的时间举行支付,如果 30 分钟之内没有支付乐成,那么这个订单将举行非常处理处罚,这时就可以使用延时队列来处理处罚这些订单了。
2)订单完成1小时后关照用户举行评价。
实现思绪:发送延时消息时先把消息按照差别的耽误时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一样寻常不能支持恣意时间段的延时),然后通过定时器举行轮训消耗这些topic,检察消息是否到期,如果到期就把这个消息发送到详细业务处理处罚的topic中,队列中消息越靠前的到期时间越早,详细来说就是定时器在一次消耗过程中,对消息的发送时间做判断,看下是否耽误到对应时间了,如果到了就转发,如果还没到这一次定时使命就可以提前竣事了。
6、消息回溯
如果某段时间对已消耗消息盘算的结果以为有问题,可能是由于步调bug导致的盘算错误,当步调bug修复后,这时可能必要对之前已消耗的消息重新消耗,可以指定从多久之前的消息回溯消耗,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消耗
7、分区数越多吞吐量越高吗
从压测结果来看,分区数到达某个值吞吐量反而开始降落,实际上许多事变都会有一个临界值,当凌驾这个临界值之后,许多本来符合既定逻辑的走向又会变得差别。一样寻常情况分区数跟集群呆板数目相当就差不多了。
固然吞吐量的数值和走势还会和磁盘、文件体系、 I/O调理战略等因素相干。
8、消息通报保障
at most once(消耗者最多收到一次消息,0--1次):acks = 0 可以实现。
at least once(消耗者至少收到一次消息,1--多次):ack = all 可以实现。
exactly once(消耗者刚好收到一次消息):at least once 加上消耗者幂等性可以实现,还可以用kafka生产者的幂等性来实现。
kafka生产者的幂等性:由于发送端重试导致的消息重复发送问题,kafka的幂等性可以包管重复发送的消息只吸收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
详细实现原理是,kafka每次发送消息会天生PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会查抄PID和Sequence Number,如果相同不会再吸收。
PID:每个新的 Producer 在初始化的时间会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会天生新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。
9、kafka的事件
Kafka的事件差别于Rocketmq,Rocketmq是保障当地事件(比如数据库)与mq消息发送的事件一致性,Kafka的事件重要是保障一次发送多条消息的事件一致性(要么同时乐成要么同时失败),一样寻常在kafka的流式盘算场景用得多一点,比如,kafka必要对一个topic里的消息做差别的流式盘算处理处罚,处理处罚完分别发到差别的topic里,这些topic分别被差别的鄙俚体系消耗(比如hbase,redis,es等),这种我们肯定渴望体系发送到多个topic的数据保持事件一致性。
10、kafka高性能的缘故因由
  • 磁盘次序读写:kafka消息不能修改以及不会从文件中心删除包管了磁盘次序读,kafka的消息写入文件都是追加在文件末端,不会写入文件中的某个位置(随机写)包管了磁盘次序写。
  • 数据传输的零拷贝
  • 读写数据的批量batch处理处罚以及压缩传输
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-11-22 02:08, Processed in 0.178929 second(s), 37 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表