kafka书籍重读
Kafka客户端kafka 客户端设计有2个方面,应用层的编程模型和API,也就是主线程涉及到的事情;
kafka-producer;主要是send dataproducerINterceptors,主要是发送和收到ack之后的一些列回调逻辑,丰富了客户端功能;序列化器,可以选择不同的序列化方式partitioner,自己控制数据在集群中的分区,可以精细化控制
[*]消息累加器,维护了多个 Deque队列,以parititionId 作为 Key; 比如我们有3个分区,那么就有三个对应的队列;
可以看到kafka高性能客户端,也是一个生产消费模型, 主线程发送数据到 Deque中,每个Deque的元素是一个ProducerBatch,
包含多个消息record;producerBatch是发送到网络端的最小元素, 对于batch大小的控制,也可以控制我们的一些吞吐量;
在producerBatch 的管理上,使用了 ByteBufferPool,复用mermoryPool,也可以提高内存管理效率;另外,应该也可以利用一些内存映射技术,做零拷贝,把数据自己映射到内核,进一步提高性能;
另外一个线程就是sender线程,专门处理网络相关的工作,是客户端的消费者; sender线程从消息pool里面不断的去拉去数据,发给集群。 sender线程维护了整个集群的meta信息,他会选择一台负载最低的Node,发送metaRequest来交换信息,node会和zookeeper通信,把controller,分区,主体的信息返回来; 那么 客户端sender 线程,是如何找出来最低负载的Node了。
sender线程维护了一个 inFlightRequest队列;对于每个分区,他使用pipeline的方式,发送request 给Node节点,并且为每个node维护,inFlightRequest; 这个类似于tcp的滑动窗口,先一股脑发送出去,然后等response,等待ack的request越多,认为负载最大;inflight机制,在高性能客户端中很常见,比如raft-pipeline,cassandra-driver;是一种很好的性能优化机制,又可以进行重试,只不过需要多浪费一点内存;是某种程度上的,空间换时间;
有了整个模型之后,就是客户端的一系列的参数配置,主要就是上述各个模块的参数,
比如Ack的配置;消息累计器Pool 的最大大小,sendBuffer等等;
kafka 消费者
[*]消费者与 partition对应关系, 一个parition 对应一个 消费着,消费者可以消费多个partition;
一个partition对应的消费者,可以是一个进程,也可以是一个线程,本质是一个consumer对象就行;kafka内部topic存储,每个消费者,与partition 对应的offset关系; 新的消费者第一次找不到上一次offset,就按照offset.rest. 的设置, latest,earliest;consumer,可以通过seek 方法来指定消费起始点,kafka提供两种方式,指定offset,或者是按照时间来指定;如果seek 的offset超过了现有的范围,那么就会用offset.rest 的配置
[*]提交offset 是kafka的一个难点,我们需要很清楚自己的代码的逻辑,以及异常处理的问题; kafka分为自动提交,和手动提交,
手动提交分为同步和异步; 自动提交是 poll 的时候会去判断,是不是可以提交(检查用户配置的提交interval);
poll 之后,在客户端会缓存这个offset,如果发生了异常,下次还是重offset去拉取,和服务端的commitedOffset无关;只有consumer退出才会去服务器端拉取offset;
对于手动提交,我们需要非常清楚知道我们的是不是允许重复消费,或者还是精准一次消费;其实,如果处理不好,手动还是自动,都可能出现丢失数据【未消费,已经提交】和重复消费的问题;rebalance的问题,partition 与consumer 关系发生重置, 比如从消费者a 到B,如果a 还没来得及提交,就会被b重复消费;所以应该避免rebalance的发生;rebalance的触发,比如新的消费者加入,或者离开,也或者心跳信息检测消费者失去响应,;rebalance的时候消费者停止,kafka提供 consumerRebalace listener,可以供用户处理相应的逻辑;
主题和分区
主题和分区的设定,通常需要我们对业务load进行一个估计,然后配置perf test 进行参数设置; 经验来看, partition 的增加会增加吞吐,知道到达一个机器的零界值,partition增加反而会影响性能,他不是一个线性的关系;kafka tool 提供了很多cmd 来管理主题和分区,比如rebalance,尽可能的让leader 的压力分散到集群中的各个节点, 所以leader的负载均衡分布式比较重要的;这对于运维来说是一个细活;
日志的存储
kafka 的日志存储室从设计模型和实现细节上都做了很多优化,来提高大数据量的吞吐量;
设计模型上,文件的顺序写,只能追加日志,不能删除修改过往数据, 使用 LogSegment 存储日志,辅佐以两个index文件,一个offset-index,一个是 timestamp-index,倒排存储 数据文件的 offset,来加快随机读写的性能;时间索引 的 offset,实际指向的是 offset-index;这是一个稀疏索引问见,为了使减少索引文件的大小; log 文件通常被设计成1G的大小,也是未来随机读取时候的性能考虑;消息存储上,序列化的格式经历过V0-V2,一方面是增加字段,丰富功能,另外一方面是使用更加优秀的序列化方式,目前使用的V2版本,字段增加了,但是使用了类似protobuf的可变存储技术,使得在大规模数据集的时候,存储大小还能降低,未来在设计序列化的场景时,可以去借鉴这样的方式;另外kafka的实现上,还利用了很多操作系统的优化, 比如大量利用 pagecache, 写入磁盘的数据,先到pagechache,在flush到磁盘,自己管理memory pool;写入数据直接directmemory 映射到内核内存,不经过用户态;设计到网络的发送的时候,也考虑使用令拷贝技术,直接 文件的内核 memeroy,映射到socket 的buffer,由网络设备发送,而不必经过用户态,这些依赖于操作系统的sendFile技术,对应java里面Nio的transferToChannel等API;
kafka 服务端
客户端与服务端通信有 N个协议,每个协议都有N个字段,基本上就是可以配置的项目都在协议中有所体现;服务端与客户端建立session 机制,在一个session中可以 省略很多重复信息的发送,类似于tcp 建立连接;是kafka优化的一个措施kafka 中大量依赖 定时任务;定时任务随着数据量的增加,可能达到100万级别,如果使用java 的 delayQueue,logN的插入消耗也会影响性能; kafka 使用了时间轮的设计,类似设计在netty 等中也可以找到,比如3个不同interval 的时间轮,每个 bucket 用列表管理相同的任务, 那么插入到delayqueue的 为bucket,数量大大减少,思路就是时间轮管理任务的插入,delayqueue推进 时间向前走,是不错的设计;server 端重度依赖zookeeper,zookeeper 作为哨兵的角色, 一开始竞争选举一个controller, controller 管理整个集群的meta data,负责各个partition 的leader /replica 的选举;controller 自身的选举是一个竞争式的,随选注册随先得, 而leader 选举由controller 控制, 原则,从 AR 选举一个 replica,并且replica在 ISR队列中, 并不是我们理解的直接冲ISR队列中获取;
kafka 客户端深入
parition assigner ,有 range 模式,有 roundRobin的,有sticky模式的,对于 一个consumer,消费多个 topics的时候,如何分配有不一样的效果,用户可以自己实现 assigner,甚至可以给一个 parition分配到多个consumer,但是无法保证消费的顺序性;range 和robin 都有自己的问题,sticky模式 比较复杂,但可以尽量保证均匀分布在consumer上,且 rebalancer的时候, 尽可能的保持原有的assignment,避免不必要的抖动;consumer, 也有一个leader,那么就是kafka中出现的第三个leader了。leader的作用,比如一个group中,不同的consumer配置了多个分区分配策略,那么需要leader去协调一个大家能接受的; 另外在 分区发送 rebalance,reassign的时候,也需要有 consumerLeader 进行协调,他与server端的 groupCoorindator进行交互;groupCoorindator 依赖与 consumer_offset, 大概50个分区,存了不同group,不同consumer,对不同topic消费的情况,也就是kafka里面,提交offset 的一些信息;consumer 消费 partition,被reassign 大概有2种情况; 内部独立的heartbeat线程,心跳超时;另外一个是poll.interval 超时,我们在现实应用中,最常遇见的就是poll.interval超时,因为我们在一个polll后,会处理任务,如果发送一些积压,就会超时;客户端的幂等性,主要是保证producer端的消息一次性;使用客户端的producer—id,加seq ,保证在一个session,一个分区里面,消息不会重复;而transactionId,则是保证的是跨session,跨分区的消息写入的 原子性;transaction的机制实现的很复杂,有一个transaction_state 主题,专门来保存每个 transaction的与各个parition的状态;另外,producer-id 是用transactionId 来回去的,保证了跨session 的同一个producer-Id,同时允许一个transactionId 复用到多个transaction,用producer-id+ epoch 来区分;transaction 机制起作用的前提是开启 幂等性;
页:
[1]