kafka集群存储原理poll实现解析


为何使用MQ

• 异步通信(调用解耦、故障隔离)

• 流量的削峰(防止流量压垮)

• 消息的持久化(可重试,可重复消费)


基本概念

Broker:即Kafka服务部署的一个节点,多个broker节点可以构成一个Kafka集群。

消息:消息是Kafka中最基本的消息单元。由一串字节组成,主要由key和value构成,key和value都是字节数组。key的主要作用是根据一定的策略,将这个消息路由到指定的分区中,这样就可以保证包含同一个key的消息全部写入同一个分区

Topic:Topic是用于存储消息的逻辑概念。每个Topic可以有多个生产者向其中push消息,也可以有多个消费者向其中pull消息。

分区(partition):每一个Topic都可以划分成多个分区(每一个Topic都至少有一个分区),不同的分区会分配在不同的Broker上以对Kafka进行水平扩展从而增加Kafka的并行处理能力。同一个Topic下的不同分区包含的消息是不同的。

Log:分区在逻辑上对应着一个Log,当生产者将消息写入分区的时候,实际上就是写入到了一个Log中。Log是一个逻辑概念,对应的是一个磁盘上的文件夹。Log由多个Segment组成,每一个Segment又对应着一个日志文件和一个索引文件。

副本:Kafka对消息进行了冗余备份,每一个分区都可以有多个副本,每一个副本中包含的消息是相同的(但不保证同一时刻下完全相同)。副本的类型分为Leader和Follower,当分区只有一个副本的时候,该副本属于Leader,没有 Follower。Kafka的副本具有一定的同步机制,在每个副本集合中,都会选举出一个副本作为Leader副本,Kafka在不同的场景中会采用不同的选举策略。Kafka中所有的读写请求都由选举出的Leader副本处理,其他的都作为Follower副本,Follower副本仅仅是从Leader副本中把数据拉取到本地之后,同步更新到自己的Log中。

生产者:生产者主要是生产消息,并将消息按照一定的规则推送到Topic的分区中

消费者:消费者主要是从Topic中拉取消息,并对消息进行消费。

ISR集合:ISR集合表示的是目前可用(alive)且消息量与Leader相差不多的副本集合,即整个副本集合的子集。ISR集合中副本所在的节点都与ZK保持着连接,此外,副本的最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超出指定的阈值。每一个分区的Leader副本都维护此分区的ISR集合。


kafka存储原理

kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。划分partition可以使得多个consumer并行消费。

image.png

Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

• offset

• MessageSize

• data


Kafka的三层模型

image.png

• 第一层:主题Topic。主题是承载消息的逻辑容器,物理上通过多个分区来实现。

• 第二层:分区Partition。一个主题的消息按规则分散(比如轮询、哈希)存储在多个分区,单个分区内的消息是有序的,分区间的消息没有顺序关系。分区还分为leader和follower,leader才对外提供服务(producer写入、consumer消费)并记录消息位移offset,follower用于灾备。消费者以一个组(consumer group)的方式消费多个分区的数据,分配每个消费者消费哪些分区leader的过程称为rebalance,每个消费者自行记录单个分区的消费位移(consumer offset)。

• 第三层:消息record。存储在分区内的最小单元信息。


kafka的poll()方法解析

kafka的消息拉取入口,会从上次消费的位置拉取消息,同时也可以手动指定消费位置。入参为阻塞时长,如果有消息将会立即返回,否则会阻塞到超时,如果没有数据则返回空的数据集合。

首先通过acquireAndEnsureOpen()确保本对象是单线程进入,之后检查是否订阅了topic,在主循环中通过pollForFetches()拉取消息。先检查是否经存在拉取过的未加工消息,如果没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。如果上一步拉取到消息,并不会立即返回,而是再一次触发消息拉取,并且使用的是非阻塞方式,调用client.pollNoWakeup()。这样做的目的是,提前网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样做到了并行处理,提高了效率。

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}


评论区
Rick ©2018