作者
cxuan
责编
刘静
Kafka系列的阶段性总结(万字长文,做好准备,建议先收藏再看)
初识Kafka
什么是Kafka
Kafka是由Linkedin公司开发的,它是一个分布式的,支持多分区、多副本,基于Zookeeper的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
Kafka的基本术语
消息:Kafka中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率,消息会分批次写入Kafka,批次就代指的是一组消息。
主题:消息的种类称为主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现kafka的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序
生产者:向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(ConsumerGroup)指的就是由一个或多个消费者组成的群体。
偏移量:偏移量(ConsumerOffset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
broker:一个独立的Kafka服务器就被称为broker,broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker集群:broker是集群的组成部分,broker集群由一个或多个broker组成,每个集群都有一个broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka中消息的备份又叫做副本(Replica),副本的数量是可以配置的,Kafka定义了两类副本:领导者副本(LeaderReplica)和追随者副本(FollowerReplica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
Kafka的特性(设计原则)
高吞吐、低延迟:kakfa最大的特点就是收发消息非常快,kafka每秒可以处理几十万条消息,它的最低延迟只有几毫秒;高伸缩性:每个主题(topic)包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中;持久性、可靠性:Kafka能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka底层的数据存储是基于Zookeeper存储的,Zookeeper我们知道它的数据能够持久存储;容错性:允许集群中的节点失败,某个节点宕机,Kafka集群能够正常工作;高并发:支持数千个客户端同时读写。Kafka的使用场景
活动跟踪:Kafka可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到Kafka,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给Kafka,这样就可以生成报告,可以做智能推荐,购买喜好等;传递消息:Kafka另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的;度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;日志记录:Kafka的基本概念来源于提交日志,比如我们可以把数据库的更新发送到Kafka上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等;流式处理:流式处理是有一个能够提供多种应用程序的领域;限流削峰:Kafka多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka中,避免直接请求后端程序导致服务崩溃。Kafka的消息队列
Kafka的消息队列一般分为两种模式:点对点模式和发布订阅模式
Kafka是支持消费者群组的,也就是说Kafka中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式
点对点模式的消息队列
如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列
发布-订阅模式的消息队列
Kafka系统架构
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干ConsumerGroup,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在ConsumerGroup发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
核心API
Kafka有四个核心API,它们分别是
ProducerAPI,它允许应用程序向一个或多个topics上发送消息记录;ConsumerAPI,允许应用程序订阅一个或多个topics并处理为其生成的记录流;StreamsAPI,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流;ConnectorAPI,它允许构建和运行将Kafka主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改。
Kafka为何如此之快
Kafka实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka可以将数据记录分批发送,从生产者到文件系统(Kafka主题日志)到消费者,可以端到端的查看这些批次的数据。
批处理能够进行更有效的数据压缩并减少I/O延迟,Kafka采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅程序员需要了解的硬核知识之磁盘。
总结一下其实就是四个要点
顺序读写;零拷贝;消息压缩;分批发送。
Kafka安装和重要配置
Kafka安装我在Kafka系列第一篇应该比较详细了,详情见带你涨姿势的认识一下kafka这篇文章。
那我们还是主要来说一下Kafka中的重要参数配置吧,这些参数对Kafka来说是非常重要的。
broker端配置
broker.id每个kafkabroker都有一个唯一的标识来表示,这个唯一的标识符即是broker.id,它的默认值是0。这个值在kafka集群中必须是唯一的,这个值可以任意设定,
port如果使用配置样本来启动kafka,它会监听端口。修改port配置参数可以把它设置成任意的端口。要注意,如果使用以下的端口,需要使用root权限启动kakfa。
zookeeper.connect用于保存broker元数据的Zookeeper地址是通过zookeeper.connect来指定的。比如我可以这么指定localhost:表示这个Zookeeper是运行在本地端口上的。我们也可以通过比如我们可以通过zk1:,zk2:,zk3:来指定zookeeper.connect的多个参数值。该配置参数是用冒号分割的一组hostname:port/path列表,其含义如下
hostname是Zookeeper服务器的机器名或者ip地址。
port是Zookeeper客户端的端口号
/path是可选择的Zookeeper路径,Kafka路径是使用了chroot环境,如果不指定默认使用跟路径。
如果你有两套Kafka集群,假设分别叫它们kafka1和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:,zk2:,zk3:/kafka1和zk1:,zk2:,zk3:/kafka2
log.dirsKafka把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过log.dirs来制定的,它是用一组逗号来分割的本地系统路径,log.dirs是没有默认值的,你必须手动指定他的默认值。其实还有一个参数是log.dir,如你所知,这个配置是没有s的,默认情况下只用配置log.dirs就好了,比如你可以通过/home/kafka1,/home/kafka2,/home/kafka3这样来配置这个参数的值。
num.recovery.threads.per.data.dir对于如下3种情况,Kafka会使用可配置的线程池来处理日志片段。
服务器正常启动,用于打开每个分区的日志片段;
服务器崩溃后重启,用于检查和截断每个分区的日志片段;
服务器正常关闭,用于关闭日志片段。
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。也就是说,如果num.recovery.threads.per.data.dir被设为8,并且log.dir指定了3个路径,那么总共需要24个线程。
auto.create.topics.enable默认情况下,kafka会使用三种方式来自动创建主题,下面是三种情况:
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户端向主题发送元数据请求时
auto.create.topics.enable参数我建议最好设置成false,即不允许自动创建Topic。在我们的线上环境里面有很多名字稀奇古怪的Topic,我想大概都是因为该参数被设置成了true的缘故。
主题默认配置
Kafka为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数
num.partitionsnum.partitions参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。
default.replication.factor这个参数比较简单,它表示kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor的默认值为1,这个参数在你启用了主题自动创建功能后有效。
log.retention.msKafka通常根据时间来决定数据可以保留多久。默认使用log.retention.hours参数来配置时间,默认是个小时,也就是一周。除此之外,还有两个参数log.retention.minutes和log.retentiion.ms。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用log.retention.ms。
log.retention.bytes另一种保留消息的方式是判断消息是否过期。它的值通过参数log.retention.bytes来指定,作用在每一个分区上。也就是说,如果有一个包含8个分区的主题,并且log.retention.bytes被设置为1GB,那么这个主题最多可以保留8GB数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。
log.segment.bytes上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达broker时,它们被追加到分区的当前日志片段上,当日志片段大小到达log.segment.bytes指定上限(默认为1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。
log.segment.ms上面提到日志片段经关闭后需等待过期,那么log.segment.ms这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms和log.retention.bytes也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。
message.max.bytesbroker通过设置message.max.bytes参数来限制单个消息的大小,默认是,也就是1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于mesage.max.bytes,那么消息的实际大小可以大于这个值
这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响IO吞吐量。
retention.ms规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉Broker端的全局参数值。
retention.bytesretention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。
JVM参数配置
JDK版本一般推荐直接使用JDK1.8,这个版本也是现在中国大部分程序员的首选版本。
说到JVM端设置,就绕不开堆这个话题,业界最推崇的一种设置方式就是直接将JVM堆大小设置为6GB,这样会避免很多Bug出现。
JVM端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的GC设置。如果你依然在使用Java7,那么可以根据以下法则选择合适的垃圾回收器:
如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。当然了,如果你已经在使用Java8了,那么就用默认的G1收集器就好了。在没有任何调优的情况下,G1表现得要比CMS出色,主要体现在更少的FullGC,需要调整的参数更少等,所以使用G1就好了。
一般G1的调整只需要这两个参数即可
MaxGCPauseMillis该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是ms,也就是说,每一轮垃圾回收大概需要ms的时间;
InitiatingHeapOccupancyPercent该参数指定了G1启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45,这就表明G1在堆使用率到达45之前不会启用垃圾回收。这个百分比包括新生代和老年代。
KafkaProducer
在Kafka中,我们把产生消息的那一方称为生产者,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到Kafka后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给Kafka后台,然后淘宝会根据你的爱好做智能推荐,致使你的钱包从来都禁不住诱惑,那么这些生产者产生的消息是怎么传到Kafka应用程序的呢?发送过程是怎么样的呢?
尽管消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图
我们从创建一个ProducerRecord对象开始,ProducerRecord是Kafka中的一个核心类,它代表了一组Kafka需要发送的key/value键值对,它由记录要发送到的主题名称(TopicName),可选的分区号(PartitionNumber)以及可选的键值对构成。
在发送ProducerRecord时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器。
如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key的hash函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了。
ProducerRecord还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka最终使用的时间戳取决于topic主题配置的时间戳类型。
如果将主题配置为使用CreateTime,则生产者记录中的时间戳将由broker使用。如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由broker重写。然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到KafkaBroker上。
KafkaBroker在收到消息时会返回一个响应,如果写入成功,会返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。
创建Kafka生产者
要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性
bootstrap.servers该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker信息。不过建议至少要提供两个broker信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
key.serializerbroker需要接收到序列化之后的key/value值,所以生产者发送的消息需要经过序列化之后才传递给KafkaBroker。生产者需要知道采用何种方式把Java对象转换为字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.