# consumer offset

consumer offset是一个consumer group对于一个topic的某个partition的消费位置的记录,

# offset的查询

consumer在完成FindCoordinator、JoinGroup、SyncGroup请求后(关于kafka rebalance参考我的另一篇文章),拿到了自己要消费的partition。 然后在poll时,会通过OffsetFetch请求查询对应的partition的offset。

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }

    return updateFetchPositions(timer);
}
1
2
3
4
5
6
7

updateFetchPositions中会判断当前是否有FetchPositions,如果没有调用refreshCommittedOffsetsIfNeeded

private boolean updateFetchPositions(final Timer timer) {
    // If any partitions have been truncated due to a leader change, we need to validate the offsets
    fetcher.validateOffsetsIfNeeded();

    cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
    if (cachedSubscriptionHasAllFetchPositions) return true;

    // If there are any partitions which do not have a valid position and are not
    // awaiting reset, then we need to fetch committed offsets. We will only do a
    // coordinator lookup if there are partitions which have missing positions, so
    // a consumer with manually assigned partitions can avoid a coordinator dependence
    // by always ensuring that assigned partitions have an initial position.
    if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
1
2
3
4
5
6
7
8
9
10
11
12
13

refreshCommittedOffsetsIfNeeded方法向coordinator发送OffsetFetch请求,得到结果后保存到内存SubscriptionState中

public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
    final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
    final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(initializingPartitions, timer);
1
2
3

# offset的提交

offset由consumer提交,分为自动提交(默认)和手动提交(commitSync/commitAsync)两种方式。 自动提交由consumer在poll的时候检查是否超过了auto.commit.interval.ms,如果超过,发送一次异步的提交请求。

入口还是在poll方法中,调用updateAssignmentMetadataIfNeeded,在coordinator.poll方法会调用maybeAutoCommitOffsetsAsync提交commit offset。

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }

    return updateFetchPositions(timer);
}
1
2
3
4
5
6
7
public boolean poll(Timer timer, boolean waitForJoinGroup) {
    // ...
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
1
2
3
4
5

maybeAutoCommitOffsetsAsync判断距离上次commit时间是否超过了autoCommitIntervalMs,超过了 发送OffsetCommit请求

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            autoCommitOffsetsAsync();
        }
    }
}
1
2
3
4
5
6
7
8
9

# broker中offset的存储

# group coordinator和__consumer_offsets

kafka broker内部通过一个特殊的topic __consumer_offsets来记录保存消费的offset。 这个topic有若干(offsets.topic.num.partitions默认50)个partition。

每个kafka broker都会启动一个group coordinator,每个coordinator各自负责一部分group。实现offset管理、rebalance等功能。

在FindCoordinator请求的过程是,用group name的hash(取正整数)对offsets.topic.num.partitions取模,得到对应的partition,然后这个partition的 leader节点就是负责该group的coordinator。

consumer获得自己的group对应的coordinator后,会向coordinator发起JoinGroup、SyncGroup请求,获取能够消费的TopicPartition(关于kafka rebalance的细节查看另一篇rebalance的文章)。 得到partition信息后,在下次poll请求时,就会通过OffsetFetch请求向coordinator查询对应的TopicPartition的offset。

broker在启动时会启动一个group coordinator,coordinator负责一部分group的rebalane以及offset管理。

如果有broker挂掉或新broker加入,__consumer_offsets的各个topic的leader replica重新选举leader后,新的leader会负责对应的位置上的 group(按照前面介绍的取模规则)的offset管理、rebalance。

group coordinator会在内存中维护自己管理的consumer group下所有的topic partition的offset信息,查询时直接从内存中读取返回。 写入时,会先写入到内存中,然后通过ReplicaManager写入到__consumer_offsets消息中,这个Topic消息和普通消息一样也会有replica复制。

group coordinator在成为__consumer_offsets的partition的leader(因为启动或broker变更重新选举)后,会对__consumer_offsets的消息数据进行加载到内存中, 加载完成后,就可以响应offset的查询、存储请求了,加载之前会返回正在加载的异常,consumer收到异常会稍后重试。

# __consumer_offsets 的清理

__consumer_offsets日志文件随着offset提交会越来越大,kafka给__consumer_offsets配置的清理策略是compact压缩, 通过压缩,能够将一个group下某个topic partition最后的offset保存下来。

img.png