「Kafka」Broker篇
主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。
Kafka Broker 总体工作流程
Zookeeper 存储的 Kafka 信息
- 启动 Zookeeper 客户端:
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
- 通过 ls 命令可以查看 kafka 相关信息:
[zk: localhost:2181(CONNECTED) 2] ls /kafka
Kafka Broker 总体工作流程
模拟 Kafka 上下线,Zookeeper 中数据变化:
- 查看
/kafka/brokers/ids
路径上的节点: - 查看
/kafka/controller
路径上的数据: - 查看
/kafka/brokers/topics/first/partitions/0/state
路径上的数据: - 停止 hadoop104 上的 kafka:
- 再次查看
/kafka/brokers/ids
路径上的节点 - 再次查看
/kafka/controller
路径上的数据 - 再次查看
/kafka/brokers/topics/first/partitions/0/state
路径上的数据 - 启动 hadoop104 上的 kafka
- 再次观察 1、2、3 步骤中的内容。
Broker 重要参数
生产经验—节点服役和退役
服役新节点
新节点准备
- 关闭 hadoop104,并右键执行克隆操作
- 开启 hadoop105,并修改 IP 地址
- 在 hadoop105 上,修改主机名称为 hadoop105
- 重新启动 hadoop104、hadoop105
- 修改 haodoop105 中 kafka 的 broker.id 为
3
,保证唯一
。[atguigu@hadoop105 config]$ vim server.properties
- 删除 hadoop105 中 kafka 下的 datas 和 logs
[atguigu@hadoop105 kafka]$ rm -rf datas/* logs/*
- 启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群
[atguigu@hadoop102 ~]$ zk.sh start [atguigu@hadoop102 ~]$ kf.sh start
- 单独启动 hadoop105 中的 kafka
[atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
我们先来看一下 first 主题的信息:
目前 first 主题的信息仍然只存在 broker0、1、2上,但 broker3 并没有帮助分担历史数据,所以我们需要负载均衡的操作。
执行负载均衡操作
- 创建一个要均衡的主题:
[atguigu@hadoop102 kafka]$ vim topics-to-move.json { "topics": [ { "topic": "first"} ], "version": 1 }
- 生成一个负载均衡的计划
- 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入以下内容(刚生成的计划):
{ "version":1,"partitions":[{ "topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{ "topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{ "topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
- 执行副本存储计划:
- 验证副本存储计划:
退役旧节点
执行负载均衡操作
先按照退役一台节点,生成
执行计划
,然后按照服役时操作流程执行负载均衡
。把要退役节点的数据导入到其他节点上。
- 创建一个要均衡的主题
[atguigu@hadoop102 kafka]$ vim topics-to-move.json { "topics": [ { "topic": "first"} ], "version": 1 }
- 创建执行计划
- 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json { "version":1,"partitions":[{ "topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{ "topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{ "topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
- 执行副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
- 验证副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first
执行停止命令
在 hadoop105 上执行停止命令即可:
[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
Kafka 副本
副本基本信息
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;
- 太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka 中副本分为:Leader 和 Follower。
- Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
A R = I S R + O S R AR = ISR + OSR AR=ISR+OSR
I S R ISR ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms
参数设定,默认 30s
。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
O S R OSR OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
Leader 选举会按照 AR 的顺序进行选取,就是下图中的 Replicas
顺序:
Leader 和 Follower 故障处理细节
Follower 故障处理细节
消费者可见的数据最大 offset 就是 4, H W − 1 HW - 1 HW−1。
该 Follower 先被踢出 ISR 队列,然后其余的 Leader、Follower继续接受数据。如果该 Follower 恢复了,会读取本地磁盘上次记录的 HW,并裁剪掉 高于 HW 的数据,从 HW 开始向 Leader 进行同步数据。
待该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader,就可以重新加入 ISR 了。
Leader 故障处理细节
broker0 一开始是 Leader,然后挂掉了,选举 broker1 为新的 Leader,然后其余的 Follower 会把各自 log 文件高于 HW 的部分裁剪掉,然后从新的 Leader 同步数据。
分区副本分配
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?
创建 16 分区,3 个副本
- 创建一个新的 topic,名称为 second
- 查看分区和副本情况:
依次错开,让每一个副本负载均衡,均匀分配,也可以保证数据的可靠性。
生产经验—手动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的topic,4个分区,两个副本,名称为 three。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。
手动调整分区副本存储的步骤如下:
生产经验—Leader Partition 负载平衡
真正生产环境建议关闭,或设置 percentage 为 20%、30%,不要频繁的触发自平衡,浪费集群大量性能。
生产经验—增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
- 创建 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
- 手动增加副本存储
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入如下内容:
{ "version":1,"partitions":[{ "topic":"four","partition":0,"replicas":[0,1,2]},{ "topic":"four","partition":1,"replicas":[0,1,2]},{ "topic":"four","partition":2,"replicas":[0,1,2]}]}
- 执行副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
文件存储
文件存储机制
Topic 数据的存储机制
kafka 中默认数据保存 7 天,通过 .timeindex
文件判断日志保存多久,过期会定时清理对应的数据,详情参考下方的 - 文件清理策略。
思考:Topic 数据到底存储在什么位置?
index 文件和 log 文件详解
.index 文件面试考点:存储的索引是对每一条数据都进行存储吗?
答:这里用的是稀疏索引
,log 文件每存储4kb
的数据,就会往 index 文件中写入一条索引。
说明:日志存储参数配置
文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:
log.retention.hours
,最低优先级,小时,默认 7 天。log.retention.minutes
,分钟。log.retention.ms
,最高优先级,毫秒。log.retention.check.interval.ms
,负责设置检查周期,默认 5 分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
1)delete 日志删除:将过期数据删除
log.cleanup.policy = delete
所有数据启用删除策略(默认)- 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
log.retention.bytes
,默认等于-1
,表示无穷大,其实就是关闭掉了。
思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
以 segment 中所有记录中的最大时间戳作为该文件时间戳,进行删除。
也就是只要这个 segment 中有数据还未过期,就不进行删除操作。
2)compact 日志压缩
高效读写数据
分布式集群
Kafka 本身是分布式集群,可以采用分区技术,并行度高。
稀疏索引
读数据采用稀疏索引,可以快速定位要消费的数据。
顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
页缓存 + 零拷贝技术
今天的文章 「Kafka」Broker篇分享到此就结束了,感谢您的阅读。笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/85394.html