百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

0748-5.14.4-Kafka的扩容和缩容

csdh11 2025-03-03 18:11 24 浏览

?文档编写目的

在Kafka集群资源使用已超出系统配置的资源时,或者有大量资源闲置造成资源浪费的时候,需要分别通过扩容Kafka和缩容Kafka来进行调整。本篇文章Fayson主要介绍如何进行Kafka的扩容和缩容,以及变更后的Kafka集群如何进行负载均衡的操作。

  • 测试环境:

1.Redhat7.2

2.采用root用户操作

3.CM为5.16.2,CDH为5.14.4

4.Kafka版本为0.10.2

5.集群启用了Kerberos,Kafka未启用Kerberos和Sentry


Kafka集群的扩容

2.1 当前Kafka集群状态


集群中有3个kafka broker


有2个topic,分别为test和test1,情况如下



2.2 扩容前准备

新扩容的机器要先加入集群中,通过CM管理,按照下面的步骤进行操作

1.修改新添加的机器的hostname

[root@hadoop6?~]#?hostnamectl?set-hostname?cdh04.hadoop.com


2.修改/etc/hosts文件并同步到所有节点,这里用脚本来实现。

192.168.0.204?cdh01.hadoop.com?cdh01
192.168.0.205?cdh02.hadoop.com?cdh02
192.168.0.206?cdh03.hadoop.com?cdh03
192.168.0.195?cdh04.hadoop.com?cdh04


3.新添加的节点关闭防火墙,设置开机自动关闭

[root@cdh04?~]#?systemctl?stop?firewalld
[root@cdh04?~]#?systemctl?disable?firewalld


4.新添加的节点禁用SELinux,并修改修改/etc/selinux/config

[root@cdh04?~]#?setenforce?0
setenforce:?SELinux?is?disabled
[root@cdh04?~]#?vim?/etc/selinux/config


5.新添加的节点关闭透明大页面,并且在/etc/rc.d/rc.local 里面加入脚本,设置自动开机关闭。

[root@cdh04?~]#?echo?never?>?/sys/kernel/mm/transparent_hugepage/defrag
[root@cdh04?~]#?echo?never?>?/sys/kernel/mm/transparent_hugepage/enabled
[root@cdh04?~]#?vim?/etc/rc.d/rc.local
if?test?-f?/sys/kernel/mm/transparent_hugepage/defrag
then?echo?never?>?/sys/kernel/mm/transparent_hugepage/defrag
fi
if?test?-f?/sys/kernel/mm/transparent_hugepage/enabled
then?echo?never?>?/sys/kernel/mm/transparent_hugepage/enabled
fi


[root@cdh04?~]#?chmod?+x?/etc/rc.d/rc.local


6.新添加的节点设置SWAP,并且设置开机自动更改

[root@cdh04?~]#?sysctl?vm.swappiness=1
vm.swappiness?=?1
[root@cdh04?~]#?echo?vm.swappiness?=?1??>>?/etc/sysctl.conf


7.新添加的节点配置时钟同步,先在所有服务器卸载chrony,再安装ntp,再修改配置把时钟同步跟其他kafka broker节点保持一致

[root@cdh04?~]#?yum?-y?remove?chrony
[root@cdh04?~]#?yum?-y?install?ntp
vim?/etc/ntp.conf

启动ntp服务,并设置开机启动

[root@cdh04?java]#?systemctl?start?ntpd[root@cdh04 java]# systemctl enable ntpd


8.新添加的节点安装kerberos客户端,并进行配置

[root@cdh04?~]#?yum?-y?install?krb5-libs?krb5-workstation

从其他节点拷贝/etc/krb5.conf文件到新节点


9.从其他节点拷贝集群使用的JDK到新节点/usr/java目录下

[root@cdh01?java]#?scp?-r?jdk1.8.0_131/?cdh04:/usr/java/jdk1.8.0_131/


10.在CM编辑主机模板kafka,只添加kafka broker角色


2.3 扩容Kafka

1.添加新的节点到集群,从CM主页点击Add Hosts,如下图所示


2.点击继续


3.搜索主机,输入主机名,点击搜索并继续


4.输入自定义存储库地址,并继续


5.点击安装JDK并继续


6.输入主机密码并继续


7.正在进行安装,安装成功后点击继续

激活完成,点击继续


8.点击继续,进入主机检查,添加主机完成


9.应用主机模板kafka,扩容完成


扩容完成


扩容后平衡

在扩容完成后,可以通过自带的命令来生成topic的平衡策略和执行平衡的操作。

3.1 生成平衡策略

1.查询当前创建的topic

[root@cdh01?~]#?kafka-topics?--list?--zookeeper?cdh01.hadoop.com:2181


2.按照查询到的topic来创建文件topics-to-move.json 格式如下

{"topics":?[{"topic":?"test"},
{"topic":?"test1"}
????????????],
"version":1
}



3.在CM查询kafka broker的ID


4.用命令生成迁移方案,下面画红框的就是生成的平衡方案

kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--topics-to-move-json-file?topics-to-move.json??--broker-list?"121,124,125,126"?--generate


5.把Proposed partition reassignment configuration下的内容复制保存为json文件newkafka.json,平衡方案生成完成。


3.2 执行平衡策略

1.执行下面的命令来进行平衡

[root@cdh01?~]#?kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--execute


2.用下面命令来进行查看执行的进度

kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--verify


从结果可以看到已经执行完成。,并且在新添加的节点上也可以看到有topic的副本存过来了。

缩容前的准备

1.编辑之前创建的topics-to-move.json文件,添加上系统自动生成的__consumer_offsets

[root@cdh01?~]#?vim?topics-to-move.json?
{"topics":?[{"topic":?"test"},{"topic":?"__consumer_offsets"},
{"topic":?"test1"}
????????????],
"version":1
}


2.再次使用命令生成迁移计划,这里只选取121,124,126这三个broker,然后把生成计划中的126替换成125进行保存,这样就把126上的数据全部迁移到了125上。

kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--topics-to-move-json-file?topics-to-move.json??--broker-list?"121,124,126"?--generate


3.执行迁移命令,进行迁移

kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--execute


4.进行查询,迁移完成

kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--verify


5.在要删除的broker上也可以看到,topic数据已经迁移走


Kafka集群的缩容

在完成上诉缩容前的准备后,现在可以进行kafka集群的缩容。

1.从CM进入Kafka的实例界面


2.勾选要删除的broker,先停止该broker


3.停止完成后,进行删除


删除完成。

总结

1.Kafka集群的扩容和缩容可以通过CM来进行添加broker和删除broker来进行。


2.在Kafka集群扩容后,已有topic的partition不会自动均衡到新的磁盘上。可以通过kafka-reassign-partitions命令来进行数据平衡,先用命令生成平衡方案,再执行。也可以手动编辑迁移方案来进行执行。


3.新建topic的partition, 会以磁盘为单位,按照partition数量最少的来落盘。


4.在Kafka缩容前,需要把要删除的broker上的topic数据迁出,也可以通过kafka-reassign-partitions来进行迁移,手动编辑迁移方案,再通过命令执行即可。



5.kafka-reassign-partitions这个命令,只有指定了broker id上的topic才会参与partition的reassign。根据我们的需求,可以手动来编写和修改迁移计划。

相关推荐

探索Java项目中日志系统最佳实践:从入门到精通

探索Java项目中日志系统最佳实践:从入门到精通在现代软件开发中,日志系统如同一位默默无闻却至关重要的管家,它记录了程序运行中的各种事件,为我们排查问题、监控性能和优化系统提供了宝贵的依据。在Java...

用了这么多年的java日志框架,你真的弄懂了吗?

在项目开发过程中,有一个必不可少的环节就是记录日志,相信只要是个程序员都用过,可是咱们自问下,用了这么多年的日志框架,你确定自己真弄懂了日志框架的来龙去脉嘛?下面笔者就详细聊聊java中常用日志框架的...

物理老师教你学Java语言(中篇)(物理专业学编程)

第四章物质的基本结构——类与对象...

一文搞定!Spring Boot3 定时任务操作全攻略

各位互联网大厂的后端开发小伙伴们,在使用SpringBoot3开发项目时,你是否遇到过定时任务实现的难题呢?比如任务调度时间不准确,代码报错却找不到方向,是不是特别头疼?如今,随着互联网业务规模...

你还不懂java的日志系统吗 ?(java的日志类)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

谈谈枚举的新用法--java(java枚举的作用与好处)

问题的由来前段时间改游戏buff功能,干了一件愚蠢的事情,那就是把枚举和运算集合在一起,然后运行一段时间后buff就出现各种问题,我当时懵逼了!事情是这样的,做过游戏的都知道,buff,需要分类型,且...

你还不懂java的日志系统吗(javaw 日志)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

Java 8之后的那些新特性(三):Java System Logger

去年12月份log4j日志框架的一个漏洞,给Java整个行业造成了非常大的影响。这个事情也顺带把log4j这个日志框架推到了争议的最前线。在Java领域,log4j可能相对比较流行。而在log4j之外...

Java开发中的日志管理:让程序“开口说话”

Java开发中的日志管理:让程序“开口说话”日志是程序员的朋友,也是程序的“嘴巴”。它能让程序在运行过程中“开口说话”,告诉我们它的状态、行为以及遇到的问题。在Java开发中,良好的日志管理不仅能帮助...

吊打面试官(十二)--Java语言中ArrayList类一文全掌握

导读...

OS X 效率启动器 Alfred 详解与使用技巧

问:为什么要在Mac上使用效率启动器类应用?答:在非特殊专业用户的环境下,(每天)用户一般可以在系统中进行上百次操作,可以是点击,也可以是拖拽,但这些只是过程,而我们的真正目的是想获得结果,也就是...

Java中 高级的异常处理(java中异常处理的两种方式)

介绍异常处理是软件开发的一个关键方面,尤其是在Java中,这种语言以其稳健性和平台独立性而闻名。正确的异常处理不仅可以防止应用程序崩溃,还有助于调试并向用户提供有意义的反馈。...

【性能调优】全方位教你定位慢SQL,方法介绍下!

1.使用数据库自带工具...

全面了解mysql锁机制(InnoDB)与问题排查

MySQL/InnoDB的加锁,一直是一个常见的话题。例如,数据库如果有高并发请求,如何保证数据完整性?产生死锁问题如何排查并解决?下面是不同锁等级的区别表级锁:开销小,加锁快;不会出现死锁;锁定粒度...

看懂这篇文章,你就懂了数据库死锁产生的场景和解决方法

一、什么是死锁加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制。任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获...