Kafka之消费与激情

副标题#e#

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

public class JConsumerSubscribe extends Thread {  

    public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址  

        props.put("group.id", "ke");// 指定消费者组  

        props.put("enable.auto.commit", "true");// 开启自动提交  

        props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔  

        // 反序列化消息主键        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  

        // 反序列化消费记录        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  

        return props;  

    }    /** 实现一个单线程消费者. */    @Override    public void run() {        // 创建一个消费者实例对象        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());        // 订阅消费主题集合        consumer.subscribe(Arrays.asList("test_kafka_topic"));  

        // 实时消费标识        boolean flag = true;  

        while (flag) {  

            // 获取主题消息数据            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  

            for (ConsumerRecord<String, String> record : records)  

                // 循环打印消息记录                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  

        }        // 出现异常关闭消费者对象        consumer.close();  

    }}  

上述代码我们就可以非常便捷地拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下:

org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {  

        acquireAndEnsureOpen();        try {  

#p#副标题#e#

            if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");  

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {  

#p#副标题#e##p#分页标题#e#

                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");  

            }            // poll for new data until the timeout expires  

            long elapsedTime = 0L;  

            do {  

                client.maybeTriggerWakeup();                final long metadataEnd;                if (includeMetadataInTimeout) {  

                    final long metadataStart = time.milliseconds();                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {  

                        return ConsumerRecords.empty();  

                    }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd – metadataStart;                } else {  

                    while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {  

                        log.warn("Still waiting for metadata");  

                    }                    metadataEnd = time.milliseconds();                }                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                if (!records.isEmpty()) {  

                    // before returning the fetched records, we can send off the next round of fetches  

#p#副标题#e#

                    // and avoid block waiting for their responses to enable pipelining while the user  

                    // is handling the fetched records.  

                    //  

                    // NOTE: since the consumed position has already been updated, we must not allow  

                    // wakeups or any other errors to be triggered prior to returning the fetched records.  

#p#副标题#e##p#分页标题#e#

                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {  

                        client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords<>(records));  

                }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd – metadataEnd;            } while (elapsedTime < timeoutMs);  

            return ConsumerRecords.empty();  

        } finally {  

            release();        }    }  

上述代码中有个方法pollForFetches,它的实现逻辑如下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {  

        final long startMs = time.milliseconds();  

        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);  

        // if data is available already, return it immediately  

        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();  

        if (!records.isEmpty()) {  

            return records;  

        }  

        // send any new fetches (won't resend pending fetches)  

        fetcher.sendFetches();  

#p#副标题#e#

        // We do not want to be stuck blocking in poll if we are missing some positions  

        // since the offset lookup may be backing off after a failure  

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call  

        // updateAssignmentMetadataIfNeeded before this method.  

        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {  

            pollTimeout = retryBackoffMs;  

        }  

        client.poll(pollTimeout, startMs, () -> {  

            // since a fetch might be completed by the background thread, we need this poll condition  

            // to ensure that we do not block unnecessarily in poll()  

            return !fetcher.hasCompletedFetches();  

        });  

#p#副标题#e##p#分页标题#e#

        // after the long poll, we should check whether the group needs to rebalance  

        // prior to returning data so that the group can stabilize faster  

        if (coordinator.rejoinNeededOrPending()) {  

            return Collections.emptyMap();  

        }  

        return fetcher.fetchedRecords();  

    }  

上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过poll方法,先调用fetcher中的fetchedRecords函数,如果获取不到数据,就会发起一个新的sendFetches请求。而在消费数据的时候,每个批次从Kafka Broker Server中拉取数据是有最大数据量限制,默认是500条,由属性(max.poll.records)控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。

提示:这里需要注意的是,max.poll.records返回的是一个poll请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取Topic的数据的总条数不会超过max.poll.records所设置的值。

而在Fetcher的类中,在sendFetches方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认1MB。可能会有这样一个场景,当满足max.partition.fetch.bytes限制条件,如果需要Fetch出10000条记录,每次默认500条,那么我们需要执行20次才能将这一次通过网络发起的请求全部Fetch完毕。

这里,可能有同学有疑问,我们不能将默认的max.poll.records属性值调到10000吗?可以调,但是还有个属性需要一起配合才可以,这个就是每次poll的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到10000,当你每条记录的容量很大时,超时时间还是100ms,那么可能拉取的数据少于10000条。

相关文章

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注