kafka java 今天来点猛的!Kafka万亿级消息实战
kafka java 今天来点猛的!Kafka万亿级消息实战
1.2.1 间数据迁移
不指定数据目录
//未指定迁移目录的迁移计划
{
"version":1,
"partitions":[
{"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
{"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
{"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
]
}
指定数据目录
//指定迁移目录的迁移计划
{
"version":1,
"partitions":[
{"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
{"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
{"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
]
}
1.2.2 内部磁盘间数据迁移
生产环境的服务器一般都是挂载多块硬盘,比如4块/12块等;那么可能出现在Kafka集群内部,各间流量比较均衡,但是在内部,各磁盘间流量不均衡,导致部分磁盘过载,从而影响集群性能和稳定,也没有较好的利用硬件资源。在这种情况下,我们就需要对内部多块磁盘的流量做负载均衡,让流量更均匀的分布到各磁盘上。
1.2.3 并发数据迁移
当前Kafka开源版本(2.1.1版本)提供的副本迁移工具“
bin/kafka–.sh”在同一个集群内只能实现迁移任务的串行。对于集群内已经实现多个资源组物理隔离的情况,由于各资源组不会相互影响,但是却不能友好的进行并行的提交迁移任务,迁移效率有点低下,这种不足直到2.6.0版本才得以解决。如果需要实现并发数据迁移,可以选择升级Kafka版本或者修改Kafka源码。
1.2.4 终止数据迁移
当前Kafka开源版本(2.1.1版本)提供的副本迁移工具“
bin/kafka–.sh”在启动迁移任务后,无法终止迁移。当迁移任务对集群的稳定性或者性能有影响时,将变得束手无策,只能等待迁移任务执行完毕(成功或者失败),这种不足直到2.6.0版本才得以解决。如果需要实现终止数据迁移,可以选择升级Kafka版本或者修改Kafka源码。
1.3 流量限制1.3.1 生产消费流量限制
经常会出现一些突发的,不可预测的异常生产或者消费流量会对集群的IO等资源产生巨大压力,最终影响整个集群的稳定与性能。那么我们可以对用户的生产、消费、副本间数据同步进行流量限制,这个限流机制并不是为了限制用户,而是避免突发的流量影响集群的稳定和性能,给用户可以更好的服务。
如下图所示,节点入流量由140MB/s左右突增到250MB/s,而出流量则从400MB/s左右突增至800MB/s。如果没有限流机制kafka java,那么集群的多个节点将有被这些异常流量打挂的风险,甚至造成集群雪崩。
对于生产者和消费者的流量限制,官网提供了以下几种维度组合进行限制(当然,下面限流机制存在一定缺陷,后面在“Kafka开源版本功能缺陷”我们将提到):
/config/users//clients/ //根据用户和客户端ID组合限流
/config/users//clients/
/config/users///根据用户限流 这种限流方式是我们最常用的方式
/config/users//clients/
/config/users//clients/
/config/users/
/config/clients/
/config/clients/
在启动Kafka的服务时需要开启JMX参数配置,方便通过其他应用程序采集Kafka的各项JMX指标进行服务监控。当用户需要调整限流阈值时,根据单个所能承受的流量进行智能评估,无需人工干预判断是否可以调整;对于用户流量限制,主要需要参考的指标包括以下两个:
(1)消费流量指标:ObjectName:kafka.server:type=Fetch,user=acl认证用户名称 属性:byte-rate(用户在当前broker的出流量)、throttle-time(用户在当前broker的出流量被限制时间)
(2)生产流量指标:ObjectName:kafka.server:type=Produce,user=acl认证用户名称 属性:byte-rate(用户在当前broker的入流量)、throttle-time(用户在当前broker的入流量被限制时间)
1.3.2 同步/数据迁移流量限制
副本迁移/数据同步流量限制官网地址:链接
涉及参数如下:
//副本同步限流配置共涉及以下4个参数
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas
辅助指标如下:
(1)副本同步出流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
1.4 监控告警
关于Kafka的监控有一些开源的工具可用使用,比如下面这几种:
Kafka ;
Kafka Eagle;
Kafka ;
;
我们已经把Kafka 作为我们查看一些基本指标的工具嵌入平台,然而这些开源工具不能很好的融入到我们自己的业务系统或者平台上。所以,我们需要自己去实现一套粒度更细、监控更智能、告警更精准的系统。其监控覆盖范围应该包括基础硬件、操作系统(操作系统偶尔出现系统进程hang住情况,导致假死kafka java 今天来点猛的!Kafka万亿级消息实战,无法正常提供服务)、Kafka的服务、Kafka客户端应用程序、集群、上下游全链路监控。
1.4.1 硬件监控
网络监控:
核心指标包括网络入流量、网络出流量、网络丢包、网络重传、处于TIME.WAIT的TCP连接数、交换机、机房带宽、DNS服务器监控(如果DNS服务器异常,可能出现流量黑洞,引起大面积业务故障)等。
磁盘监控:
核心指标包括监控磁盘write、磁盘read(如果消费时没有延时,或者只有少量延时,一般都没有磁盘read操作)、磁盘、磁盘(这个指标如果过高说明磁盘负载较大)、磁盘存储空间、磁盘坏盘、磁盘坏块/坏道(坏道或者坏块将导致处于半死不活状态,由于有crc校验,消费者将被卡住)等。
CPU监控:
监控CPU空闲率/负载,主板故障等,通常CPU使用率比较低不是Kafka的瓶颈。
内存/交换区监控:
内存使用率,内存故障。一般情况下,服务器上除了启动Kafka的时分配的堆内存以外,其他内存基本全部被用来做。
缓存命中率监控:
由于是否读磁盘对Kafka的性能影响很大kafka java,所以我们需要监控Linux的缓存命中率,如果缓存命中率高,则说明消费者基本命中缓存。
系统日志:
我们需要对操作系统的错误日志进行监控告警,及时发现一些硬件故障。
1.4.2 服务监控
服务的监控,主要是通过在服务启动时指定JMX端口,然后通过实现一套指标采集程序去采集JMX指标。(服务端指标官网地址)
级监控
进程、入流量字节大小/记录数、出流量字节大小/记录数、副本同步入流量、副本同步出流量、间流量偏差、连接数、请求队列数、网络空闲率、生产延时、消费延时、生产请求数、消费请求数、上分布个数、上分布副本个数、上各磁盘流量、 GC等。
topic级监控
topic入流量字节大小/记录数、topic出流量字节大小/记录数、无流量topic、topic流量突变(突增/突降)、topic消费延时。
级监控
分区入流量字节大小/记录数、分区出流量字节大小/记录数、topic分区副本缺失、分区消费延迟记录、分区切换、分区数据倾斜(生产消息时,如果指定了消息的key容易造成数据倾斜,这严重影响Kafka的服务性能)、分区存储大小(可以治理单分区过大的topic)。
用户级监控
用户出/入流量字节大小、用户出/入流量被限制时间、用户流量突变(突增/突降)。
服务日志监控
对端打印的错误日志进行监控告警,及时发现服务异常。
1.4.3.客户端监控
客户端监控主要是自己实现一套指标上报程序,这个程序需要实现
org.apache.kafka.common.metrics.MetricsReporter 接口。然后在生产者或者消费者的配置中加入配置项 metric.reporters,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//ClientMetricsReporter类实现org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...
客户端指标官网地址:
_node
\
\
客户端监控流程架构如下图所示:
1.4.3.1 生产者客户端监控
维度:用户名称、客户端ID、客户端IP、topic名称、集群名称、;
指标:连接数、IO等待时间、生产流量大小、生产记录数、请求次数、请求延时、发送错误/重试次数等。
1.4.3.2 消费者客户端监控
维度:用户名称、客户端ID、客户端IP、topic名称、集群名称、消费组、、topic分区;
指标:连接数、io等待时间、消费流量大小、消费记录数、消费延时、topic分区消费延迟记录等。
1.4.4 监控
进程监控;
的切换监控;
服务的错误日志监控;
1.4.5 全链路监控
当数据链路非常长的时候(比如:业务应用->埋点SDk->数据采集->Kafka->实时计算->业务应用),我们定位问题通常需要经过多个团队反复沟通与排查才能发现问题到底出现在哪个环节,这样排查问题效率比较低下。在这种情况下,我们就需要与上下游一起梳理整个链路的监控。出现问题时,第一时间定位问题出现在哪个环节,缩短问题定位与故障恢复时间。
1.5 资源隔离1.5.1 相同集群不同业务资源物理隔离
我们对所有集群中不同对业务进行资源组物理隔离,避免各业务之间相互影响。在这里,我们假设集群有4个节点(
///),2个业务(业务A/业务B),他们分别拥有topic分区分布如下图所示,两个业务topic都分散在集群的各个上,并且在磁盘层面也存在交叉。
试想一下,如果我们其中一个业务异常,比如流量突增,导致节点异常或者被打挂。那么这时候另外一个业务也将受到影响,这样将大大的影响了我们服务的可用性,造成故障,扩大了故障影响范围。
针对这些痛点,我们可以对集群中的业务进行物理资源隔离,各业务独享资源,进行资源组划分(这里把4各划分为和两个资源组)如下图所示,不同业务的topic分布在自己的资源组内,当其中一个业务异常时,不会波及另外一个业务,这样就可以有效的缩小我们的故障范围,提高服务可用性。
1.6 集群归类
我们把集群根据业务特点进行拆分为日志集群、监控集群、计费集群、搜索集群、离线集群、在线集群等,不同场景业务放在不同集群,避免不同业务相互影响。
1.7 扩容/缩容1.7.1 topic扩容分区
随着topic数据量增长,我们最初创建的topic指定的分区个数可能已经无法满足数量流量要求,所以我们需要对topic的分区进行扩展。扩容分区时需要考虑一下几点:
必须保证topic分区与轮询的分布在资源组内所有上,让流量分布更加均衡,同时需要考虑相同分区不同副本跨机架分布以提高容灾能力;
当topic分区个数除以资源组节点个数有余数时,需要把余数分区优先考虑放入流量较低的。
1.7.2 上线
随着业务量增多,数据量不断增大,我们的集群也需要进行节点扩容。关于扩容,我们需要实现以下几点:
扩容智能评估:根据集群负载,把是否需要扩容评估程序化、智能化;
智能扩容:当评估需要扩容后,把扩容流程以及流量均衡平台化。
1.7.3 下线
某些场景下,我们需要下线我们的,主要包括以下几个场景:
一些老化的服务器需要下线,实现节点下线平台化;
服务器故障,故障无法恢复,我们需要下线故障服务器,实现节点下线平台化;
有更优配置的服务器替换已有节点,实现下线节点平台化。
1.8 负载均衡
我们为什么需要负载均衡呢?首先,我们来看第一张图,下图是我们集群某个资源组刚扩容后的流量分布情况,流量无法自动的分摊到我们新扩容后的节点上。那么这个时候需要我们手动去触发数据迁移,把部分副本迁移至新节点上才能实现流量均衡。
下面,我们来看一下第二张图。这张图我们可以看出流量分布非常不均衡,最低和最高流量偏差数倍以上。这和Kafka的架构特点有关,当集群规模与数据量达到一定量后,必然出现当问题。这种情况下,我们也需要进行负载均衡。
我们再来看看第三张图。这里我们可以看出出流量只有部分节点突增,这就是topic分区在集群内部不够分散,集中分布到了某几个导致,这种情况我们也需要进行扩容分区和均衡。
我们比较理想的流量分布应该如下图所示,各节点间流量偏差非常小,这种情况下,既可以增强集群扛住流量异常突增的能力又可以提升集群整体资源利用率和服务稳定性,降低成本。
负载均衡我们需要实现以下效果:
1)生成副本迁移计划以及执行迁移任务平台化、自动化、智能化;
2)执行均衡后间流量比较均匀,且单个topic分区均匀分布在所有节点上;
3)执行均衡后内部多块磁盘间流量比较均衡;
要实现这个效果,我们需要开发一套自己的负载均衡工具,如对开源的 进行二次开发;此工具的核心主要在生成迁移计划的策略,迁移计划的生成方案直接影响到最后集群负载均衡的效果。参考内容:
/- to Kafka REST API
架构图如下:
在生成迁移计划时,我们需要考虑以下几点:
1)选择核心指标作为生成迁移计划的依据,比如出流量、入流量、机架、单topic分区分散性等;
2)优化用来生成迁移计划的指标样本,比如过滤流量突增/突降/掉零等异常样本;
3)各资源组的迁移计划需要使用的样本全部为资源组内部样本,不涉及其他资源组,无交叉;
4)治理单分区过大topic,让topic分区分布更分散,流量不集中在部分,让topic单分区数据量更小,这样可以减少迁移的数据量kafka java 今天来点猛的!Kafka万亿级消息实战,提升迁移速度;
5)已经均匀分散在资源组内的topic,加入迁移黑名单,不做迁移,这样可以减少迁移的数据量,提升迁移速度;
6)做topic治理,排除长期无流量topic对均衡的干扰;
7)新建topic或者topic分区扩容时,应让所有分区轮询分布在所有节点,轮询后余数分区优先分布流量较低的;
8)扩容节点后开启负载均衡时,优先把同一分配了同一大流量(流量大而不是存储空间大,这里可以认为是每秒的吞吐量)topic多个分区的,迁移一部分到新节点;
9)提交迁移任务时,同一批迁移计划中的分区数据大小偏差应该尽可能小,这样可以避免迁移任务中小分区迁移完成后长时间等待大分区的迁移,造成任务倾斜;
1.9 安全认证
是不是我们的集群所有人都可以随意访问呢?当然不是,为了集群的安全,我们需要进行权限认证kafka java,屏蔽非法操作。主要包括以下几个方面需要做安全认证:
(1)生产者权限认证;
(2)消费者权限认证;
(3)指定数据目录迁移安全认证;
官网地址:
1.10 集群容灾
跨机架容灾:
官网地址:
跨集群/机房容灾:如果有异地双活等业务场景时,可以参考.7版本的 2.0。
1. 本站所有资源来源于用户上传和网络,如有侵权请联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系站长处理!
6. 本站不售卖代码,资源标价只是站长收集整理的辛苦费!如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。
7. 站长QQ号码 2205675299
资源库 - 资源分享下载网 » kafka java 今天来点猛的!Kafka万亿级消息实战
常见问题FAQ
- 关于资源售价和售后服务的说明?
- 代码有没有售后服务和技术支持?
- 有没有搭建服务?
- 链接地址失效了怎么办?
- 关于解压密码