# kafka consumer

# 学习目标

如何给consumer分配对应的partition来消费

rebalance

offset commit管理

# kafka consumer使用

boolean stop = false;
Map<String, Object> consumerProperties = new HashMap<>();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
kafkaConsumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned " + partitions);
    }
});
while (!stop) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    System.out.println("Polled records " + consumerRecords);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# KafkaConsumer实现原理

ConsumerCoordinator Fetcher ConsumerNetworkClient ConsumerMetadata SubscriptionState ConsumerPartitionAssignor OffsetResetStrategy IsolationLevel

创建KafkaConsumer实例

subscribe订阅

发送metadata请求 发送FindCoordinator请求

和coordinator建立连接

向coordinator 发送join group请求 join group返回结果。generation

assignment strategy RangeCoordinator

send leader SyncGroup 请求

assignment assigned partitions

fetching committed offsets

向partition发送请求

发送OFFSET_FOR_LEADER_EPOCH请求

发送READ_UNCOMMITTED FullFetchRequest

构建incremental fetch

接收Fetch Response

发送OFFSET_COMMIT请求

partition分配:

offset 自动提交: 默认情况(enable.auto.commit)下consumer拉取完消息后,会定时(auto.commit.interval.ms默认5秒)发送offset到broker。

第一次拉取消息前,consumer需要向coordinator发送一个OFFSET_FOR_LEADER_EPOCH请求 offset本地保存

TopicPartitionState

线程模型: 默认情况下KafkaConsumer只有一个单线程,负责io交互,一般情况下建议拉取完消息后,把消息交给另外一个线程池处理,提高系统的消息消费能力。

# KafkaConsumer构建

TODO 补充时序图

# subscribe方法

subscribe方法用于配置要订阅的topic,不是增加修改而是全量配置,会对之前的subscribe订阅topics进行覆盖。

如果传入的topics集合为空,则要取消注册的topics。

否则将topics和listener设置到subscriptions对象属性中

然后调用Metadata.requestUpdateForNewTopics。

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        if (topics == null)
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
        } else {
            for (String topic : topics) {
                if (Utils.isBlank(topic))
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }

            throwIfNoAssignorsConfigured();
            // 清理不再最新的subscribe的topics中的Fetch的缓存数据
            fetcher.clearBufferedDataForUnassignedTopics(topics);
            log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            // 调用subscriptions.subscribe,会更新subscriptions中的数据,包含topics和listener
            if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                // 调用metadata.requestUpdateForNewTopics,更新metadata中的needPartialUpdate=true, lastRefreshMs修改为0,requestVersion++,好让后台线程Sender去刷新metadata获取最新的topics的metadata信息
                metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# KafkaConsumer poll拉取消息数据

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}
1
2
3

includeMetadataInTimeout参数的含义是: poll的超时时间包含了metadata的获取时间

调用poll(Duration timeout)时updateAssignmentMetadataIfNeeded参数设置的是true。

会走到updateAssignmentMetadataIfNeeded方法,获取metadata

获取完metadata后,调用pollForFetches方法通过Fetcher向broker拉取消息记录。

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                // try to update assignment metadata BUT do not need to block on the timer for join group
                updateAssignmentMetadataIfNeeded(timer, false);
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Fetch<K, V> fetch = pollForFetches(timer);
            if (!fetch.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                if (fetch.records().isEmpty()) {
                    log.trace("Returning empty records from `poll()` "
                            + "since the consumer's position has advanced for at least one topic partition");
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# updateAssignmentMetadataIfNeeded

Consumer请求metadata信息,调用ConsumerCoordinator的poll方法,获取ConsumerCoordinator。

coordinator获取成功后,调用updateFetchPosition向coordinator查询TopicPartition的offset,保存offset信息到本地subscriptions状态中

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }

    // 调用updateFetchPositions从coordinator查询当前要消费的partition的offset
    return updateFetchPositions(timer);
}
1
2
3
4
5
6
7
8

# ConsumerCoordinator.poll获取coordinator

public boolean poll(Timer timer, boolean waitForJoinGroup) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    // subscriptionType是AUTO_TOPICS或AUTO_PATTERN时hasAutoAssignedPartitions是true
    if (subscriptions.hasAutoAssignedPartitions()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        pollHeartbeat(timer.currentTimeMs());
        // coordinatorUnknownAndUnready方法中,如果发现当前没有Coordinator,会向broker发送一个FindCoordinator请求,查找当前的consumer group的coordinator节点。
        if (coordinatorUnknownAndUnready(timer)) {
            return false;
        }

        // coordinator准备好后,会调用rejoinNeededOrPending判断是否要rejoin,consumer第一次调用时需要rejoin,因为AbstractCoordinator中的rejoinNeeded初始值是true
        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            // if not wait for join group, we would just use a timer of 0
            if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
                // since we may use a different timer in the callee, we'd still need
                // to update the original timer's current time after the call
                timer.update(time.milliseconds());

                return false;
            }
        }
    } else {
        // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
        if (coordinatorUnknownAndUnready(timer)) {
            return false;
        }
    }

    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

# AbstractCoordinator的ensureActiveGroup方法

# 开启向发送coordinator心跳的线程
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // 确保coordinator目前已经ready,因为要给coordinator发送心跳,
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    // 开启向coordinator发送心跳的线程
    startHeartbeatThreadIfNeeded();
    // 向coordinator发送joinGroup请求加入到consumer group中
    return joinGroupIfNeeded(timer);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
# joinGroup

joinGroup方法将当前的consumer加入到对应的Topic的consumer group中。

boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        // 检查下coordinator的状态
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        // needsJoinPrepare初始值为true
        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        if (needsJoinPrepare) {
            // need to set the flag before calling onJoinPrepare since the user callback may throw
            // exception, in which case upon retry we should not retry onJoinPrepare either.
            needsJoinPrepare = false;
            // return false when onJoinPrepare is waiting for committing offset
            // 调用onJoinPrepare方法,ConsumerCoordinator会调用maybeAutoCommitOffsetsAsync提交offset,这时给consumer 因为超时等原因rejoin重新加入的时候使用的
            if (!onJoinPrepare(generation.generationId, generation.memberId)) {
                needsJoinPrepare = true;
                //should not initiateJoinGroup if needsJoinPrepare still is true
                return false;
            }
        }

        // 发送join group请求,请求参数包含topics和consumer group名称
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) {
            Generation generationSnapshot;
            MemberState stateSnapshot;

            // Generation data maybe concurrently cleared by Heartbeat thread.
            // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
            // and shouldn't block heartbeat thread.
            // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
            // 从join group返回结果中获取到generation, 保存generationSnapshot
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
                stateSnapshot = this.state;
            }

            if (!hasGenerationReset(generationSnapshot) && stateSnapshot == MemberState.STABLE) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();

                // 调用onJoinComplete
                onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);

                // Generally speaking we should always resetJoinGroupFuture once the future is done, but here
                // we can only reset the join group future after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            } else {
                final String reason = String.format("rebalance failed since the generation/state was " +
                        "modified by heartbeat thread to %s/%s before the rebalance callback triggered",
                        generationSnapshot, stateSnapshot);

                resetStateAndRejoin(reason, true);
                resetJoinGroupFuture();
            }
        } else {
            final RuntimeException exception = future.exception();

            resetJoinGroupFuture();
            synchronized (AbstractCoordinator.this) {
                rejoinReason = String.format("rebalance failed due to '%s' (%s)", exception.getMessage(), exception.getClass().getSimpleName());
                rejoinNeeded = true;
            }

            if (exception instanceof UnknownMemberIdException ||
                exception instanceof IllegalGenerationException ||
                exception instanceof RebalanceInProgressException ||
                exception instanceof MemberIdRequiredException)
                continue;
            else if (!future.isRetriable())
                throw exception;

            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    }
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# partition assign

joinGroup返回结果中,会返回当前的consumer是否是这个group的leader,leader负责对partition的消费进行分配,默认的分配方式是RangeAssigner,按照范围分配。 分配完成后,会调用SyncGroup发送给coordinator,SyncGroup的结果会返回属于当前consumer的partition(assignment),然后保存到本地,准备后面fetch拉取消息。

由consumer leader进行分配是为了提高可扩展性,开发者可以实现自定义的Assigner控制分配分配逻辑。

# consumer group leader

leader

private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = onLeaderElected(
            joinResponse.data().leader(),
            joinResponse.data().protocolName(),
            joinResponse.data().members(),
            joinResponse.data().skipAssignment()
        );

        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                    .setMemberId(assignment.getKey())
                    .setAssignment(Utils.toArray(assignment.getValue()))
            );
        }

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(rebalanceConfig.groupId)
                                .setMemberId(generation.memberId)
                                .setProtocolType(protocolType())
                                .setProtocolName(generation.protocolName)
                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                .setAssignments(groupAssignmentList)
                );
        log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# maybeAutoCommitOffsetsAsync

maybeAutoCommitOffsetsAsync方法负责异步提交本地offset

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            autoCommitOffsetsAsync();
        }
    }
}
1
2
3
4
5
6
7
8
9

# 拉取消息记录

获取到coordinator和TopicPartition的offset后,就可以通过pollForFetches方法拉取消息记录了

final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
    // before returning the fetched records, we can send off the next round of fetches
    // and avoid block waiting for their responses to enable pipelining while the user
    // is handling the fetched records.
    //
    // NOTE: since the consumed position has already been updated, we must not allow
    // wakeups or any other errors to be triggered prior to returning the fetched records.
    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
        client.transmitSends();
    }

    if (fetch.records().isEmpty()) {
        log.trace("Returning empty records from `poll()` "
                + "since the consumer's position has advanced for at least one topic partition");
    }

    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

pollForFetches方法发送fetcher.sendFetches()后,等待结果返回,然后fetcher.collectFetch()获取消息记录

private Fetch<K, V> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // if data is available already, return it immediately
    final Fetch<K, V> fetch = fetcher.collectFetch();
    if (!fetch.isEmpty()) {
        return fetch;
    }

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    log.trace("Polling for fetches with timeout {}", pollTimeout);

    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());

    return fetcher.collectFetch();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

sendFetches方法获取offset构建成FetchRequest对象。

prepareFetchRequests()方法会对TopicPartition在subscriptions状态对象中查找本地的offset(初始值是前面通过coordinator查询到的offset), 然后构建FetchRequest请求并发送。

public synchronized int sendFetches() {
    // Update metrics in case there was an assignment change
    sensors.maybeUpdateAssignment(subscriptions);

    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        final short maxVersion;
        if (!data.canUseTopicIds()) {
            maxVersion = (short) 12;
        } else {
            maxVersion = ApiKeys.FETCH.latestVersion();
        }
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend())
                .isolationLevel(isolationLevel)
                .setMaxBytes(this.maxBytes)
                .metadata(data.metadata())
                .removed(data.toForget())
                .replaced(data.toReplace())
                .rackId(clientRackId);

        if (log.isDebugEnabled()) {
            log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
        }
        RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
        // We add the node to the set of nodes with pending fetch requests before adding the
        // listener because the future may have been fulfilled on another thread (e.g. during a
        // disconnection being handled by the heartbeat thread) which will mean the listener
        // will be invoked synchronously.
        this.nodesWithPendingFetchRequests.add(entry.getKey().id());
        future.addListener(new RequestFutureListener<ClientResponse>() {
            @Override
            public void onSuccess(ClientResponse resp) {
                synchronized (Fetcher.this) {
                    try {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                        if (handler == null) {
                            log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                    fetchTarget.id());
                            return;
                        }
                        if (!handler.handleResponse(response, resp.requestHeader().apiVersion())) {
                            if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                                metadata.requestUpdate();
                            }
                            return;
                        }

                        Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), resp.requestHeader().apiVersion());
                        Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                            TopicPartition partition = entry.getKey();
                            FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
                            if (requestData == null) {
                                String message;
                                if (data.metadata().isFull()) {
                                    message = MessageFormatter.arrayFormat(
                                            "Response for missing full request partition: partition={}; metadata={}",
                                            new Object[]{partition, data.metadata()}).getMessage();
                                } else {
                                    message = MessageFormatter.arrayFormat(
                                            "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}",
                                            new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage();
                                }

                                // Received fetch response for missing session partition
                                throw new IllegalStateException(message);
                            } else {
                                long fetchOffset = requestData.fetchOffset;
                                FetchResponseData.PartitionData partitionData = entry.getValue();

                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                        isolationLevel, fetchOffset, partition, partitionData);

                                Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
                                short responseVersion = resp.requestHeader().apiVersion();

                                completedFetches.add(new CompletedFetch(partition, partitionData,
                                        metricAggregator, batches, fetchOffset, responseVersion));
                            }
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                    } finally {
                        nodesWithPendingFetchRequests.remove(fetchTarget.id());
                    }
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (Fetcher.this) {
                    try {
                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                        if (handler != null) {
                            handler.handleError(e);
                        }
                    } finally {
                        nodesWithPendingFetchRequests.remove(fetchTarget.id());
                    }
                }
            }
        });

    }
    return fetchRequestMap.size();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

发送完FetchRequest后,KafkaConsumer通过fetcher.collectFetch获取结果。 在前面的sendFetches方法中,收到返回结果后,会将结果放到completedFetches字段中(ConcurrentLinkedQueue<CompletedFetch>类型) collectFetch方法在最后调用fetchRecords,会将subscription的offset更新为fetchRecords中的nextOffset。 然后在后面的KafkaConsumer.poll方法调用ConsumerCoordinator.poll方法中,会定时异步发送本地的offset给coordinator,coordinator同步保存到kafka broker中。

# offset commit

# kafka rebalance

如何进行rebalance,避免消息不丢失,可能产生重复消费吗? 心跳,如果consumer实例挂掉,其他实例如何发现处理

consumer group如何维护

# group rebalance

# 疑问

consumerMetadata的作用

coordinator是谁?是partition的leader吗?

epoch的作用

abstract coordinator中generationId的作用

拉取消息是从leader拉取吗?是否可以从ISR中拉取

transaction的作用是什么,有哪些应用场景?

leader切换怎么处理的

partition中的highWatermark, lastStableOffset是什么

# 为什么由consumer leader进行partition assignment,而不是broker?

放在consumer中,可以在使用时实现自己的定制化的assign策略。

# 不同的线程消费模型的优缺点

# Kafka Rebalance Protocol

# 参考