# consumer offset
consumer offset是一个consumer group对于一个topic的某个partition的消费位置的记录,
# offset的查询
consumer在完成FindCoordinator、JoinGroup、SyncGroup请求后(关于kafka rebalance参考我的另一篇文章),拿到了自己要消费的partition。 然后在poll时,会通过OffsetFetch请求查询对应的partition的offset。
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}
return updateFetchPositions(timer);
}
2
3
4
5
6
7
updateFetchPositions中会判断当前是否有FetchPositions,如果没有调用refreshCommittedOffsetsIfNeeded
private boolean updateFetchPositions(final Timer timer) {
// If any partitions have been truncated due to a leader change, we need to validate the offsets
fetcher.validateOffsetsIfNeeded();
cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHasAllFetchPositions) return true;
// If there are any partitions which do not have a valid position and are not
// awaiting reset, then we need to fetch committed offsets. We will only do a
// coordinator lookup if there are partitions which have missing positions, so
// a consumer with manually assigned partitions can avoid a coordinator dependence
// by always ensuring that assigned partitions have an initial position.
if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
2
3
4
5
6
7
8
9
10
11
12
13
refreshCommittedOffsetsIfNeeded方法向coordinator发送OffsetFetch请求,得到结果后保存到内存SubscriptionState中
public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(initializingPartitions, timer);
2
3
# offset的提交
offset由consumer提交,分为自动提交(默认)和手动提交(commitSync/commitAsync)两种方式。 自动提交由consumer在poll的时候检查是否超过了auto.commit.interval.ms,如果超过,发送一次异步的提交请求。
入口还是在poll方法中,调用updateAssignmentMetadataIfNeeded,在coordinator.poll方法会调用maybeAutoCommitOffsetsAsync提交commit offset。
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}
return updateFetchPositions(timer);
}
2
3
4
5
6
7
public boolean poll(Timer timer, boolean waitForJoinGroup) {
// ...
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
2
3
4
5
maybeAutoCommitOffsetsAsync判断距离上次commit时间是否超过了autoCommitIntervalMs,超过了 发送OffsetCommit请求
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
autoCommitOffsetsAsync();
}
}
}
2
3
4
5
6
7
8
9
# broker中offset的存储
# group coordinator和__consumer_offsets
kafka broker内部通过一个特殊的topic __consumer_offsets来记录保存消费的offset。 这个topic有若干(offsets.topic.num.partitions默认50)个partition。
每个kafka broker都会启动一个group coordinator,每个coordinator各自负责一部分group。实现offset管理、rebalance等功能。
在FindCoordinator请求的过程是,用group name的hash(取正整数)对offsets.topic.num.partitions取模,得到对应的partition,然后这个partition的 leader节点就是负责该group的coordinator。
consumer获得自己的group对应的coordinator后,会向coordinator发起JoinGroup、SyncGroup请求,获取能够消费的TopicPartition(关于kafka rebalance的细节查看另一篇rebalance的文章)。 得到partition信息后,在下次poll请求时,就会通过OffsetFetch请求向coordinator查询对应的TopicPartition的offset。
broker在启动时会启动一个group coordinator,coordinator负责一部分group的rebalance以及offset管理。
如果有broker挂掉或新broker加入,__consumer_offsets的各个topic的leader replica重新选举leader后,新的leader会负责对应的位置上的 group(按照前面介绍的取模规则)的offset管理、rebalance。
group coordinator会在内存中维护自己管理的consumer group下所有的topic partition的offset信息,查询时直接从内存中读取返回。 写入时,会先写入到内存中,然后通过ReplicaManager写入到__consumer_offsets消息中,这个Topic消息和普通消息一样也会有replica复制。
group coordinator在成为__consumer_offsets的partition的leader(因为启动或broker变更重新选举)后,会对__consumer_offsets的消息数据进行加载到内存中, 加载完成后,就可以响应offset的查询、存储请求了,加载之前会返回正在加载的异常,consumer收到异常会稍后重试。
# __consumer_offsets 的清理
__consumer_offsets日志文件随着offset提交会越来越大,kafka给__consumer_offsets配置的清理策略是compact压缩, 通过压缩,能够将一个group下某个topic partition最后的offset保存下来。