新西兰服务器

Kafka集群优化的方法是什么


Kafka集群优化的方法是什么

发布时间:2022-01-11 16:16:02 来源:高防服务器网 阅读:76 作者:iii 栏目:网络管理

这篇文章主要介绍了Kafka集群优化的方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Kafka集群优化的方法是什么文章都会有所收获,下面我们一起来看看吧。

背景

个推作为专业的数据智能服务商,已经成功服务了数十万APP,每日的消息下发量达百亿级别,由此产生了海量日志数据。为了应对业务上的各种需求,我们需要采集并集中化日志进行计算,为此个推选用了高可用的、高可靠的、分布式的Flume系统以对海量日志进行采集、聚合和传输。此外,个推也不断对Flume进行迭代升级,以实现自己对日志的特定需求。

原有的异地机房日志汇聚方式,整个流程相对来说比较简单,A机房业务产生的日志通过多种方式写入该机房Kafka集群,然后B机房的Flume通过网络专线实时消费A机房Kafka的日志数据后写入本机房的Kafka集群,所有机房的数据就是通过相同方式在B机房Kakfa集群中集中化管理。如图一所示:

图一:原有异地日志传输模式

但是随着业务量的不断增加,日志数据在逐渐增多的过程中对带宽要求变高,带宽的瓶颈问题日益凸显。按照1G的专线带宽成本2~3w/月来计算,一个异地机房一年仅专线带宽扩容成本就高达30w以上。对此,如何找到一种成本更加低廉且符合当前业务预期的传输方案呢?Avro有快速压缩的二进制数据形式,并能有效节约数据存储空间和网络传输带宽,从而成为优选方案。

优化思路

Avro简介

Avro是一个数据序列化系统。它是Hadoop的一个子项目,也是Apache的一个独立的项目,其主要特点如下:

● 丰富的数据结构;

● 可压缩、快速的二进制数据类型;

● 可持久化存储的文件类型;

● 远程过程调用(RPC);

● 提供的机制使动态语言可以方便地处理数据。

具体可参考官方网站:http://avro.apache.org/

Flume Avro方案

Flume的RPC Source是Avro Source,它被设计为高扩展的RPC服务端,能从其他Flume Agent 的Avro Sink或者Flume SDK客户端,接收数据到Flume Agent中,具体流程如图二所示:

图二:Avro Source流程

针对该模式,我们的日志传输方案计划变更为A机房部署Avro Sink用以消费该机房Kafka集群的日志数据,压缩后发送到B机房的Avro Source,然后解压写入B机房的Kafka集群,具体的传输模式如图三所示:

图三:Flume Avro传输模式

可能存在的问题

我们预估可能存在的问题主要有以下三点:

● 当专线故障的时候,数据是否能保证完整性;

● 该模式下CPU和内存等硬件的消耗评估;

● 传输性能问题。

验证情况

针对以上的几个问题,我们做了几项对比实验。

环境准备情况说明:

1. 两台服务器192.168.10.81和192.168.10.82,以及每台服务器上对应一个Kakfa集群,模拟A机房和B机房;

2.   两个Kafka集群中对应topicA(源端)和topicB(目标端)。在topicA中写入合计大小11G的日志数据用来模拟原始端日志数据。

3.   192.168.10.82上部署一个Flume,模拟原有传输方式。

4.  192.168.10.81服务器部署Avro Sink,192.168.10.82部署Avro Source,模拟Flume Avro传输模式。

原有Flume模式验证(非Avro)

 监控Kafka消费情况:

 81流量统计:

  82流量统计:

消费全部消息耗时:20min

消费总日志条数统计:129,748,260

总流量:13.5G

Avro模式验证

 配置说明:

Avro Sink配置:

#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多个sink提高压缩传输效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//与source的rpc端口一一对应kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//压缩模式kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level =6//压缩率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一个kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive =60

Avro Source配置:

#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口绑定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//压缩模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =1000

监控Kafka消费情况

81流量统计:

82流量统计:

消费全部消息耗时:26min

消费总日志条数统计:129,748,260

总流量:1.69G

故障模拟

1.   模拟专线故障,在A、B两机房不通的情况下,Avro Sink报错如下:

2. 监控Kafka消费情况,发现消费者已停止消费:

3. 故障处理恢复后继续消费剩余日志,经统计,总日志条数为:129,747,255。

结论

1.   当专线发生故障时,正在网络传输中的通道外数据可能会有少部分丢失,其丢失原因为网络原因,与Avro模式无关;故障后停止消费的数据不会有任何的丢失问题,由于网络原因丢失的数据需要评估其重要性以及是否需要补传。

2.   流量压缩率达80%以上,同时我们也测试了等级为1~9的压缩率,6跟9非常接近,CPU和内存的使用率与原有传输模式相差不大,带宽的优化效果比较明显。

3.   传输性能由于压缩的原因适当变弱,单Sink由原先20分钟延长至26分钟,可适当增加Sink的个数来提高传输速率。

生产环境实施结果

实施结果如下:

1.   由于还有其它业务的带宽占用,总带宽使用率节省了50%以上,现阶段高峰期带宽速率不超过400Mbps;

2.   每个Sink传输速率的极限大概是3000条每秒,压缩传输速率问题通过增加Sink的方式解决,但会适当增加CPU和内存的损耗。

关于“Kafka集群优化的方法是什么”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Kafka集群优化的方法是什么”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注高防服务器网行业资讯频道。

[微信提示:高防服务器能助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。

[图文来源于网络,不代表本站立场,如有侵权,请联系高防服务器网删除]
[