Preface
This chapter analyzes how Kafka consumers rebalance, covering:
1) how the Coordinator is located: FindCoordinatorRequest;
2) how consumers form a group: JoinGroupRequest;
3) how consumers receive their partition assignment: SyncGroupRequest;
4) what the COOPERATIVE protocol is for;
5) what static membership is for.
Note: this article is based on Kafka 2.6, without KRaft.
Usage example
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// default enable.auto.commit=true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// default auto.commit.interval.ms=5000
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// default session.timeout.ms=10000; a member that sends no heartbeat within this time is removed from the group
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
for (int i = 0; i < 10; i++) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.close();
}
1. Overview
1-1. Overview of the poll method
Consumers pull messages via KafkaConsumer#poll, which has two overloads.
1) The Duration overload really is the maximum time poll may block;
2) The long overload is not the real time spent blocked in poll: it waits indefinitely for metadata to become ready, which violates the timeout semantics, so it has been deprecated.
public ConsumerRecords<K, V> poll(final long timeoutMs) {
return poll(time.timer(timeoutMs), false);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
On another note, KafkaConsumer is not thread-safe: poll and the eventual calls to the broker all happen on the caller's thread, so multi-threaded consumption has to be built by the user.
* The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
* making the call. It is the responsibility of the user to ensure that multi-threaded access
* is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
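Since the consumer is confined to a single thread, the usual workaround is simply one KafkaConsumer per thread. Below is a minimal sketch of that pattern (the thread count, topic and group names are reused or invented for illustration, not prescribed by Kafka); each thread owns its consumer exclusively, so no extra synchronization is needed.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OneConsumerPerThread {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "test-group");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                // the consumer never leaves this thread, so thread-safety is not a concern
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("topic1"));
                    while (!Thread.currentThread().isInterrupted()) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                        for (ConsumerRecord<String, String> record : records)
                            System.out.println(Thread.currentThread().getName() + " -> " + record.value());
                    }
                }
            });
        }
    }
}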
KafkaConsumer#poll: poll consists of two steps: 1) prepare metadata; 2) fetch messages.
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
try {
do {
// 1. prepare metadata
if (includeMetadataInTimeout) {
// poll(Duration)
updateAssignmentMetadataIfNeeded(timer, false);
} else {
// deprecated path, poll(long)
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}
// 2. fetch messages
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
KafkaConsumer#updateAssignmentMetadataIfNeeded: preparing metadata covers two things: polling the coordinator and updating fetch positions.
private final ConsumerCoordinator coordinator;
boolean updateAssignmentMetadataIfNeeded(final Timer timer,
final boolean waitForJoinGroup/*false*/) {
// 1. ConsumerCoordinator.poll
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}
// 2. update fetch positions
return updateFetchPositions(timer);
}
The rest of this analysis focuses on ConsumerCoordinator#poll.
1-2. Rebalance overview
For contrast, consider how RocketMQ rebalances. (POP consumption can be thought of as not needing a rebalance at all.)
In RocketMQ, rebalance is performed independently by every member of the consumer group.
1) Each consumer sends heartbeats, containing its clientId, to the master broker of every subscribed topic;
2) It loops over each topic and rebalances it;
3) It asks the nameserver how many queues the topic has;
4) Since the topic's n queues live on different brokers, it picks any one broker and fetches the list of group members (clientIds);
5) It uses the AllocateMessageQueueAveragely strategy to assign queues to itself, based on the index of its own clientId within the sorted clientIds. For example, with 8 queues and 3 clients: the client at index=0 gets q0-q2, index=1 gets q3-q5, index=2 gets q6-q7 (a sketch of this arithmetic follows below);
Throughout this process all broker nodes and all consumer instances are peers; there are no special roles.
Reaching a consistent rebalance relies on two things:
1) the subscriptions within a group are identical (one group subscribes to the same topics and tags), and every broker hosting the group's topics receives heartbeats from all members, so any single broker can reveal the full clientIds list;
2) the clientIds are sorted, and each consumer assigns itself queues by averaging, based on its own index in that sorted list;
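To make point 5 above concrete, here is a small illustration of the index-based averaging; it is not the real AllocateMessageQueueAveragely source, just the same arithmetic applied to the 8-queue / 3-client example.
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationSketch {
    // queues owned by the client at position `index` (0-based, in the sorted clientIds)
    static List<Integer> allocate(int queueCount, int clientCount, int index) {
        int mod = queueCount % clientCount;
        // clients before `mod` get one extra queue
        int size = queueCount / clientCount + (index < mod ? 1 : 0);
        int start = index * (queueCount / clientCount) + Math.min(index, mod);
        List<Integer> queues = new ArrayList<>();
        for (int i = start; i < start + size; i++)
            queues.add(i);
        return queues;
    }

    public static void main(String[] args) {
        // 8 queues, 3 clients: prints [0, 1, 2], [3, 4, 5], [6, 7]
        for (int index = 0; index < 3; index++)
            System.out.println(allocate(8, 3, index));
    }
}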
Kafka's rebalance is completely different and introduces three roles.
Coordinator: for each consumer group, one broker acts as that group's Coordinator; the coordinator role is spread across brokers for load balancing. It is responsible for:
1) member registration and discovery for the group;
2) managing the group's consumption offsets;
3) coordinating the group's partition assignment (it does not compute the assignment itself).
Group Leader: each consumer group has one leader member whose main job is to compute the partition assignment.
Group Follower: receives the partition assignment.
2. Related components
2-1. SubscriptionState
The subscription state. assignment = the state of the partitions that the rebalance ultimately assigned to this consumer.
public class SubscriptionState {
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}
// subscription mode
private SubscriptionType subscriptionType;
// AUTO_PATTERN: pattern subscription
private Pattern subscribedPattern;
// AUTO_TOPICS: explicit topic subscription
private Set<String> subscription;
// the full set of topics subscribed by the whole group
private Set<String> groupSubscription;
// assigned partitions -> their state
private final PartitionStates<TopicPartitionState> assignment;
// strategy for initializing the offset when the group is brand new
// auto.offset.reset=latest
private final OffsetResetStrategy defaultResetStrategy;
// the user can pass a ConsumerRebalanceListener as the second argument of subscribe
private ConsumerRebalanceListener rebalanceListener;
// whenever a rebalance changes the assigned partitions, assignmentId++
private int assignmentId = 0;
}
There are three subscription modes.
KafkaConsumer#subscribe → SubscriptionState#subscribe: AUTO_TOPICS, an explicit subscription to n topics.
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
return changeSubscription(topics);
}
private boolean changeSubscription(Set<String> topicsToSubscribe) {
if (subscription.equals(topicsToSubscribe))
return false;
subscription = topicsToSubscribe;
return true;
}
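The listener argument is the user's hook into rebalance. A minimal usage sketch (it reuses the consumer and topic from the example at the top of this article; the classes come from org.apache.kafka.clients.consumer and org.apache.kafka.common):
consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before the old assignment is taken away, e.g. commit offsets here
        System.out.println("revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after the new assignment arrives via SyncGroupResponse
        System.out.println("assigned: " + partitions);
    }
});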
KafkaConsumer#subscribe → SubscriptionState#subscribe: AUTO_PATTERN, subscribe to topics matching a regular expression;
public synchronized void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_PATTERN);
this.subscribedPattern = pattern;
}
KafkaConsumer#assign → SubscriptionState#assignFromUser: USER_ASSIGNED, the user assigns partitions directly, so the consumer group rebalance is not used. (Not covered further.)
public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {
setSubscriptionType(SubscriptionType.USER_ASSIGNED);
if (this.assignment.partitionSet().equals(partitions))
return false;
// ...
}
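For completeness, a minimal sketch of the USER_ASSIGNED mode (the topic name, partition numbers and starting offset are invented): the consumer names its partitions explicitly, never joins a group, and therefore never takes part in rebalance.
// assumes `consumer` was created as in the usage example above; no group membership is involved
consumer.assign(Arrays.asList(
        new TopicPartition("topic1", 0),
        new TopicPartition("topic1", 1)));
// optionally start from a specific offset instead of the committed one
consumer.seek(new TopicPartition("topic1", 0), 100L);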
SubscriptionState#assignment: maps each partition the group leader assigned to this consumer to its per-partition state.
public class PartitionStates<S> {
private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());
}
private static class TopicPartitionState {
// fetch state
private FetchState fetchState;
// fetch position
private FetchPosition position;
// high watermark
private Long highWatermark;
// log start offset
private Long logStartOffset;
// last stable offset (LSO)
private Long lastStableOffset;
// paused by the user
private boolean paused;
private OffsetResetStrategy resetStrategy;
private Long nextRetryTimeMs;
private Integer preferredReadReplica;
private Long preferredReadReplicaExpireTimeMs;
}
2-2. ConsumerMetadata
ConsumerMetadata builds on Metadata and handles MetadataRequest/Response differently depending on the subscription mode:
an explicit subscription fetches metadata only for the subscribed topics; a pattern subscription has to fetch all topics and then filter them with the regular expression.
public class ConsumerMetadata extends Metadata {
// whether internal topics are included, false by default
private final boolean includeInternalTopics;
// automatic topic creation is allowed by default
private final boolean allowAutoTopicCreation;
// subscription state
private final SubscriptionState subscription;
// ignored here
private final Set<String> transientTopics;
public ConsumerMetadata(
// retry.backoff.ms=100ms
long refreshBackoffMs,
// metadata.max.age.ms=5min
long metadataExpireMs,
// !(exclude.internal.topics=true) = false
boolean includeInternalTopics,
// allow.auto.create.topics=true
boolean allowAutoTopicCreation,
SubscriptionState subscription, // subscription state
LogContext logContext,
ClusterResourceListeners clusterResourceListeners) {
super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
this.includeInternalTopics = includeInternalTopics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.subscription = subscription;
this.transientTopics = new HashSet<>();
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
if (subscription.hasPatternSubscription())
// pattern subscription needs the metadata of all topics
return MetadataRequest.Builder.allTopics();
// explicit subscription only needs the subscribed topics (plus whatever the group leader assigned)
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
// decides whether a topic from a MetadataResponse should be retained
@Override
protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
// explicitly subscribed
return true;
if (isInternal && !includeInternalTopics)
// internal topics are excluded by default
return false;
// pattern subscription
return subscription.matchesSubscribedPattern(topic);
}
}
2-3. ConsumerNetworkClient
A higher-level wrapper around the consumer's network client.
public class ConsumerNetworkClient implements Closeable {
// the underlying network client
private final KafkaClient client;
// requests waiting to be sent
private final UnsentRequests unsent = new UnsentRequests();
// responses waiting to be processed
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
private final static class UnsentRequests {
// although the value is a queue, in practice a node only ever has one pending request;
// the upper layer enforces this, and the underlying KafkaChannel can only buffer one Send at a time
private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
}
}
The consumer communicates in a way similar to the producer.
ConsumerNetworkClient#send: the upper layer first buffers the request in unsent.
public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder,
int requestTimeoutMs) {
long now = time.milliseconds();
// response callback
RequestFutureCompletionHandler completionHandler =
new RequestFutureCompletionHandler();
// build the request
ClientRequest clientRequest =
client.newClientRequest(node.idString(), requestBuilder, now, true,
requestTimeoutMs, completionHandler);
// buffer the request
unsent.put(node, clientRequest);
// nioSelector wakeup
client.wakeup();
// hand the response future back to the caller
return completionHandler.future;
}
ConsumerNetworkClient#poll: this actually performs the RPC, sending requests and receiving responses.
1) trySend: turn the requests buffered in unsent into Sends buffered on the KafkaChannel;
2) client.poll: call the underlying network client to do the actual I/O;
3) firePendingCompletedRequests: process the responses;
private final KafkaClient client;
public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
// 1. process queued responses in pendingCompletion
firePendingCompletedRequests();
lock.lock();
try {
handlePendingDisconnects();
// 2. process queued requests: unsent -> KafkaChannel.send
// if the connection is not established yet, this is where it gets created
long pollDelayMs = trySend(timer.currentTimeMs());
// 3. do the I/O: send requests, receive responses
if (pendingCompletion.isEmpty() && (pollCondition == null
|| pollCondition.shouldBlock())) {
long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
if (client.inFlightRequestCount() == 0)
pollTimeout = Math.min(pollTimeout, retryBackoffMs);
client.poll(pollTimeout, timer.currentTimeMs());
} else {
client.poll(0, timer.currentTimeMs());
}
// ...
} finally {
lock.unlock();
}
// 4. process responses in pendingCompletion again
firePendingCompletedRequests();
metadata.maybeThrowAnyException();
}
long trySend(long now) {
// maxPollTimeoutMs, 3000ms by default (heartbeat.interval.ms)
long pollDelayMs = maxPollTimeoutMs;
for (Node node : unsent.nodes()) {
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
if (iterator.hasNext())
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
// if the connection is not established, ready() will try to create it
if (client.ready(node, now)) {
// convert the Request into a Send and buffer it on the KafkaChannel
client.send(request, now);
iterator.remove();
} else {
break;
}
}
}
return pollDelayMs;
}
RequestFutureCompletionHandler is an inner class of ConsumerNetworkClient; it is wrapped in the ClientRequest and passed down to the underlying client. When a response arrives, the handler is added to the pendingCompletion queue.
private class RequestFutureCompletionHandler implements RequestCompletionHandler {
private final RequestFuture<ClientResponse> future;
private ClientResponse response;
private RuntimeException e;
private RequestFutureCompletionHandler() {
this.future = new RequestFuture<>();
}
public void fireCompletion() {
// complete the future
if (e != null) {
future.raise(e);
} else if (response.authenticationException() != null) {
future.raise(response.authenticationException());
} else if (response.wasDisconnected()) {
log.debug("Cancelled request with header {} due to node {} being disconnected",
response.requestHeader(), response.destination());
future.raise(DisconnectException.INSTANCE);
} else if (response.versionMismatch() != null) {
future.raise(response.versionMismatch());
} else {
future.complete(response);
}
}
public void onFailure(RuntimeException e) {
this.e = e;
pendingCompletion.add(this);
}
@Override
public void onComplete(ClientResponse response) {
this.response = response;
pendingCompletion.add(this);
}
}
ConsumerNetworkClient#firePendingCompletedRequests: loop over all pending responses and complete their futures.
private void firePendingCompletedRequests() {
boolean completedRequestsFired = false;
for (;;) {
RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
if (completionHandler == null)
break;
completionHandler.fireCompletion();
completedRequestsFired = true;
}
if (completedRequestsFired)
client.wakeup();
}
2-4. ConsumerGroupMetadata
ConsumerGroupMetadata: the consumer group metadata, updated after the member joins the group.
public class ConsumerGroupMetadata {
// group.id = the consumer group id
final private String groupId;
// UNKNOWN_GENERATION_ID = -1
final private int generationId;
// UNKNOWN_MEMBER_ID = ""
final private String memberId;
// group.instance.id = static member id (default null)
final private Optional<String> groupInstanceId;
}
2-5. ConsumerPartitionAssignor
ConsumerPartitionAssignor: the assignor, i.e. the partition assignment algorithm; given the cluster metadata plus the members' subscriptions, it decides which partitions each group member consumes.
public interface ConsumerPartitionAssignor {
// compute the partition assignment
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
// rebalance protocol, EAGER by default
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}
}
final class GroupSubscription {
private final Map<String/*member id*/, Subscription> subscriptions;
}
final class Subscription {
// topics subscribed by this consumer instance
private final List<String> topics;
// partitions currently owned by this consumer instance
private final List<TopicPartition> ownedPartitions;
// static member id
private Optional<String> groupInstanceId;
}
final class GroupAssignment {
private final Map<String/*member id*/, Assignment> assignments;
}
final class Assignment {
private List<TopicPartition> partitions;
}
partition.assignment.strategy configures which ConsumerPartitionAssignor implementations are used.
In 2.6 the default strategy is RangeAssignor; newer versions default to RangeAssignor + CooperativeStickyAssignor.
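For reference, switching the strategy is just a client config. A minimal sketch (the chosen assignor is only an example); on 2.6, picking CooperativeStickyAssignor is also how the COOPERATIVE protocol mentioned in the introduction gets enabled:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// one or more assignor class names; all members of a group must share at least one strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");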
2-6. ConsumerCoordinator
ConsumerCoordinator: the core client-side component that talks to the server-side coordinator.
public final class ConsumerCoordinator extends AbstractCoordinator {
// rebalance configuration
private final GroupRebalanceConfig rebalanceConfig;
// partition assignment algorithms used during rebalance
private final List<ConsumerPartitionAssignor> assignors;
// consumer metadata
private final ConsumerMetadata metadata;
// subscription state
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
// whether auto-commit is enabled
private final boolean autoCommitEnabled;
// auto-commit interval
private final int autoCommitIntervalMs;
// consumer interceptors (an extension point exposed to users)
private final ConsumerInterceptors<?, ?> interceptors;
private final AtomicInteger pendingAsyncCommits;
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
// whether this consumer is the group leader
private boolean isLeader = false;
private Set<String> joinedSubscription;
// metadata snapshots
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
// auto-commit timer
private Timer nextAutoCommitTimer;
private AtomicBoolean asyncCommitFenced;
// consumer group metadata
private ConsumerGroupMetadata groupMetadata;
private final boolean throwOnFetchStableOffsetsUnsupported;
// rebalance protocol
private final RebalanceProtocol protocol;
}
AbstractCoordinator:
public abstract class AbstractCoordinator implements Closeable {
// heartbeat state
private final Heartbeat heartbeat;
// rebalance configuration
private final GroupRebalanceConfig rebalanceConfig;
// network client
protected final ConsumerNetworkClient client;
// the coordinator node
private Node coordinator = null;
// whether a JoinGroup is needed to (re)join the group
private boolean rejoinNeeded = true;
// whether onJoinPrepare still needs to run
private boolean needsJoinPrepare = true;
// heartbeat thread
private HeartbeatThread heartbeatThread = null;
// future of the in-flight JoinGroup
private RequestFuture<ByteBuffer> joinFuture = null;
// future of the in-flight FindCoordinator
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
// generation
private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;
private long lastTimeOfConnectionMs = -1L;
// this member's state within the group
protected MemberState state = MemberState.UNJOINED;
}
ConsumerCoordinator#poll: the core method of this analysis.
1) ensureCoordinatorReady: discover the Coordinator;
2) ensureActiveGroup: join the consumer group and complete the rebalance;
3) maybeAutoCommitOffsetsAsync: if auto-commit is enabled, the coordinator performs the commit here; (not covered in this analysis)
public boolean poll(Timer timer, boolean waitForJoinGroup) {
maybeUpdateSubscriptionMetadata();
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.hasAutoAssignedPartitions()) {
// case 1: explicit or pattern subscription
// wake up the kafka-coordinator-heartbeat-thread
// record the time of this poll on the Heartbeat,
// staying within max.poll.interval.ms=5min so no rebalance is triggered
pollHeartbeat(timer.currentTimeMs());
// Step 1: discover the Coordinator via FindCoordinatorRequest and make sure the connection is established
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}
// Step 2: rejoin the group if needed
if (rejoinNeededOrPending()) {
// join the group: JoinGroupRequest + SyncGroupRequest
if (!ensureActiveGroup(waitForJoinGroup ?
timer : time.timer(0L))) {
timer.update(time.milliseconds());
return false;
}
}
} else {
// case 2: the user assigned partitions manually, only the metadata needs updating
if (metadata.updateRequested() &&
!client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
// auto-commit offsets
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
3. Discovering the coordinator
3-1. The consumer notices the coordinator is unavailable
AbstractCoordinator#coordinatorUnknown: if the connection to the coordinator node is detected to be broken here, coordinator is set to null, meaning the coordinator has to be rediscovered.
private Node coordinator = null;
public boolean coordinatorUnknown() {
return checkAndGetCoordinator() == null;
}
protected synchronized Node checkAndGetCoordinator() {
if (coordinator != null && client.isUnavailable(coordinator)) {
markCoordinatorUnknown(true);
return null;
}
return this.coordinator;
}
protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
if (this.coordinator != null) {
log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
Node oldCoordinator = this.coordinator;
this.coordinator = null;
if (!isDisconnected)
client.disconnectAsync(oldCoordinator);
lastTimeOfConnectionMs = time.milliseconds();
}
}
3-2. The consumer sends FindCoordinatorRequest
AbstractCoordinator#ensureCoordinatorReady: send the FindCoordinatorRequest.
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
if (!coordinatorUnknown())
return true;
do {
if (fatalFindCoordinatorException != null) {
final RuntimeException fatalException = fatalFindCoordinatorException;
fatalFindCoordinatorException = null;
throw fatalException;
}
// buffer a FindCoordinatorRequest into ConsumerNetworkClient.unsent
final RequestFuture<Void> future = lookupCoordinator();
// let the network layer send the FindCoordinatorRequest and wait for the result
client.poll(future, timer);
// if the FindCoordinatorRequest timed out, just break out
if (!future.isDone()) {
break;
}
RuntimeException fatalException = null;
if (future.failed()) {
if (future.isRetriable()) {
client.awaitMetadataUpdate(timer);
} else {
fatalException = future.exception();
}
} else if (coordinator != null && client.isUnavailable(coordinator)) {
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
clearFindCoordinatorFuture();
if (fatalException != null)
throw fatalException;
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
AbstractCoordinator#lookupCoordinator: pick the least-loaded broker and ask it for the Coordinator of this consumer group. "Load" is measured by the number of inFlightRequests (requests in progress) between the client and each broker.
private RequestFuture<Void> findCoordinatorFuture = null;
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
Node node = this.client.leastLoadedNode();
if (node == null) {
return RequestFuture.noBrokersAvailable();
} else {
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
}
return findCoordinatorFuture;
}
AbstractCoordinator#sendFindCoordinatorRequest: the request carries the consumer group id.
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
3-3. The broker handles FindCoordinatorRequest
KafkaApis#handleFindCoordinatorRequest: the main flow for handling a FindCoordinatorRequest.
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
case CoordinatorType.GROUP =>
// Step 1: compute the __consumer_offsets partition this consumer group belongs to
val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
// Step 2: create the topic __consumer_offsets if it does not exist yet
val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
(partition, metadata)
}
def createResponse(requestThrottleMs: Int): AbstractResponse = {
// build the FindCoordinatorResponse
def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setErrorCode(error.code)
.setErrorMessage(error.message)
// brokerId
.setNodeId(node.id)
// host
.setHost(node.host)
// port
.setPort(node.port)
.setThrottleTimeMs(requestThrottleMs))
}
val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
// Step 3: find the endpoint of the leader replica of that partition
val coordinatorEndpoint = topicMetadata.partitions.asScala
.find(_.partitionIndex == partition)
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
.flatMap(_.getNode(request.context.listenerName))
.filterNot(_.isEmpty)
coordinatorEndpoint match {
case Some(endpoint) =>
createFindCoordinatorResponse(Errors.NONE, endpoint)
case _ =>
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
}
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
GroupCoordinator#partitionFor: Step 1.
The Coordinator of a consumer group is determined by the number of partitions of the internal topic __consumer_offsets (50 by default):
coordinator node = leaderOf(hash(groupId) % number of __consumer_offsets partitions). A small example follows the code below.
class GroupCoordinator {
def partitionFor(group: String): Int = groupManager.partitionFor(group)
}
// GroupMetadataManager#partitionFor
private val groupMetadataTopicPartitionCount =
getGroupMetadataTopicPartitionCount
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode)
% groupMetadataTopicPartitionCount
private def getGroupMetadataTopicPartitionCount: Int = {
zkClient.getTopicPartitionCount("__consumer_offsets")
.getOrElse(config.offsetsTopicNumPartitions/*50*/)
}
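As a concrete illustration, the group id from the usage example maps to a partition roughly like this (Math.abs stands in for Kafka's Utils.abs, which masks the sign bit instead; 50 is the assumed default partition count):
// same arithmetic as GroupMetadataManager#partitionFor
String groupId = "test-group";
int numPartitions = 50;
int partition = Math.abs(groupId.hashCode()) % numPartitions;
// the coordinator is the broker hosting the leader replica of __consumer_offsets-<partition>
System.out.println("__consumer_offsets partition for " + groupId + " = " + partition);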
KafkaApis#getOrCreateInternalTopic: Step 2; if the topic is not in the metadata cache, it is created automatically.
private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}
KafkaApis#createInternalTopic: __consumer_offsets is created with 50 partitions, 3 replicas, a 100MB segment size, and cleanup.policy=compact.
private def createInternalTopic(topic: String): MetadataResponseTopic = {
// number of live brokers
val aliveBrokers = metadataCache.getAliveBrokers
topic match {
case "__consumer_offsets" =>
if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
// live brokers < offsets.topic.replication.factor (3): return COORDINATOR_NOT_AVAILABLE
// note that server.properties may set this to 1, adjust accordingly
metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE,
topic, true, util.Collections.emptyList())
} else {
// create the internal topic __consumer_offsets with
// partitions = offsets.topic.num.partitions = 50
// replicationFactor = offsets.topic.replication.factor = 3
createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
groupCoordinator.offsetsTopicConfigs)
}
}
}
// the other configs of __consumer_offsets
def offsetsTopicConfigs: Properties = {
val props = new Properties
// cleanup.policy=compact
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
// segment.bytes=100MB (a regular segment defaults to 1G)
props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
props
}
3-4. The consumer handles FindCoordinatorResponse
FindCoordinatorResponseHandler processes the FindCoordinatorResponse and establishes the connection to the coordinator.
The coordinator both manages the consumer group and serves messages like any other broker, so the client keeps separate connections for the two roles: the coordinator connection uses id = Integer.MAX_VALUE - brokerId (e.g. brokerId 1 becomes connection id 2147483646), while the ordinary broker connection uses the brokerId itself.
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
FindCoordinatorResponse findCoordinatorResponse =
(FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
// the coordinator connectionId = Integer.MAX_VALUE - brokerId
// a regular broker connectionId = brokerId
// so two separate channels are used
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
// establish the connection
client.tryConnect(coordinator);
// restart the session.timeout.ms=10s (45s in newer versions) timer used to detect a dead coordinator
heartbeat.resetSessionTimeout();
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
if (!(e instanceof RetriableException)) {
fatalFindCoordinatorException = e;
}
super.onFailure(e, future);
}
}
4. Consumer rebalance
4-1. Does the consumer need to rejoin the group?
ConsumerCoordinator#rejoinNeededOrPending: a rejoin is needed when:
1) the partition count of a subscribed topic changed; only the leader watches for this;
2) the subscription of this consumer instance changed;
3) rejoinNeeded=true, e.g. the consumer just started;
public boolean rejoinNeededOrPending() {
if (!subscriptions.hasAutoAssignedPartitions())
// the user assigned partitions manually, no rebalance needed
return false;
if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
// the partition count of some topic changed (only the leader has an assignmentSnapshot)
requestRejoin();
return true;
}
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
// the subscription of this consumer instance changed
requestRejoin();
return true;
}
return super.rejoinNeededOrPending();
}
// whether a JoinGroup is needed to (re)join the group
private boolean rejoinNeeded = true;
// whether onJoinPrepare still needs to run
private boolean needsJoinPrepare = true;
protected synchronized boolean rejoinNeededOrPending() {
return rejoinNeeded || joinFuture != null;
}
4-2. The consumer sends JoinGroupRequest
AbstractCoordinator#ensureActiveGroup: start the heartbeat thread and join the group.
boolean ensureActiveGroup(final Timer timer) {
// make sure the coordinator is ready
if (!ensureCoordinatorReady(timer)) {
return false;
}
// start the heartbeat thread
startHeartbeatThreadIfNeeded();
// join the group
return joinGroupIfNeeded(timer);
}
private HeartbeatThread heartbeatThread = null;
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
}
}
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
// the heartbeat thread is created, but it does not send heartbeats yet; the consumer has to join the group first
private boolean enabled = false;
}
AbstractCoordinator#joinGroupIfNeeded: run onJoinPrepare and send the JoinGroupRequest.
boolean joinGroupIfNeeded(final Timer timer) {
// still needs to (re)join
while (rejoinNeededOrPending()) {
// make sure the coordinator connection is established
if (!ensureCoordinatorReady(timer)) {
return false;
}
// clean up the previous assignment state
if (needsJoinPrepare) {
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
}
// send JoinGroupRequest, then SyncGroupRequest
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// timed out
return false;
}
// handle the result of the future
// ...
}
return true;
}
AbstractCoordinator.Generation: on the client side, Generation tracks which round of joining the consumer is in.
protected static class Generation {
public static final Generation NO_GENERATION = new Generation(
// -1
OffsetCommitRequest.DEFAULT_GENERATION_ID,
// ""
JoinGroupRequest.UNKNOWN_MEMBER_ID,
null);
public final int generationId; // which join round
public final String memberId; // this member's id
public final String protocolName; // which assignment strategy is used
}
ConsumerCoordinator#onJoinPrepare:
1) if auto-commit is enabled, commit offsets first;
2) if generation is NO_GENERATION, drop all partitions assigned to this consumer;
3) otherwise, drop the assigned partitions according to the rebalance protocol;
4) clear the leader flag;
protected void onJoinPrepare(int generation, String memberId) {
// if auto-commit is enabled, commit offsets first
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
Exception exception = null;
final Set<TopicPartition> revokedPartitions;
if (generation == Generation.NO_GENERATION.generationId &&
memberId.equals(Generation.NO_GENERATION.memberId)) {
// either the initial state, or the heartbeat thread reset generation to NO_GENERATION:
// revoke every partition currently assigned to this consumer
revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
if (!revokedPartitions.isEmpty()) {
exception = invokePartitionsLost(revokedPartitions);
subscriptions.assignFromSubscribed(Collections.emptySet());
}
} else {
switch (protocol) { // EAGER by default
case EAGER:
// revoke all partitions assigned to this consumer
revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
exception = invokePartitionsRevoked(revokedPartitions);
subscriptions.assignFromSubscribed(Collections.emptySet());
break;
case COOPERATIVE:
// only revoke partitions whose topic this consumer no longer subscribes to
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions = ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet());
if (!revokedPartitions.isEmpty()) {
exception = invokePartitionsRevoked(revokedPartitions);
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions);
}
break;
}
}
// no longer mark this consumer as the leader
isLeader = false;
// clear the group-wide topic subscription
subscriptions.resetGroupSubscription();
if (exception != null) {
throw new KafkaException("User rebalance callback throws an error", exception);
}
}
AbstractCoordinator#initiateJoinGroup: build the JoinGroupRequest; the request timeout is max(request.timeout.ms=30s, rebalance.timeout.ms=5min + 5s). Its fields are:
groupId: the consumer group id;
sessionTimeoutMs: session.timeout.ms=10s (45s in newer versions), the heartbeat timeout;
memberId: this member's id, initially the empty string;
groupInstanceId: the static member id, empty by default;
protocolType: always "consumer";
protocols: the subscribed topics plus the partitions currently assigned to this consumer;
rebalanceTimeoutMs: max.poll.interval.ms=5min, how long the rebalance may wait;
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
// UNJOINED -> PREPARING_REBALANCE
state = MemberState.PREPARING_REBALANCE;
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest();
}
return joinFuture;
}
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
// request timeout = max(request.timeout.ms=30s, rebalance.timeout.ms=5min + 5s)
int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler(generation));
}
protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
this.joinedSubscription = subscriptions.subscription();
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
List<String> topics = new ArrayList<>(joinedSubscription);
// loop over all supported assignors, RangeAssignor by default
for (ConsumerPartitionAssignor assignor : assignors) {
Subscription subscription = new Subscription(
// the subscribed topics
topics,
// ignored here
assignor.subscriptionUserData(joinedSubscription),
// the partitions currently assigned to this consumer
subscriptions.assignedPartitionsList());
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName(assignor.name())
.setMetadata(Utils.toArray(metadata)));
}
return protocolSet;
}
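Since groupInstanceId appears here: a consumer becomes a static member simply by configuring group.instance.id. A minimal sketch (the instance id value is invented); with a static id, a restart that finishes within session.timeout.ms does not trigger a rebalance:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// a stable, unique id per consumer instance, e.g. derived from the host name
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
// static members usually pair this with a larger session timeout so restarts fit inside it
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");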
4-3. The coordinator handles JoinGroupRequest
4-3-1. Assigning a memberId
GroupCoordinator#doUnknownJoinGroup: when a member has just come online and sends its first JoinGroupRequest, its memberId is empty; the coordinator assigns a memberId and responds with MEMBER_ID_REQUIRED.
private def doUnknownJoinGroup(group: GroupMetadata,
groupInstanceId: Option[String],
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
// response callback
responseCallback: JoinCallback): Unit = {
group.inLock {
//...
// generate a memberId
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
if (group.hasStaticMember(groupInstanceId)) {
// static member, ignored for now
updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
} else if (requireKnownMemberId) {
// mark the memberId as pending
group.addPendingMember(newMemberId)
// delayed task of sessionTimeoutMs=10s (45s in newer versions); if no JoinGroup arrives in time, the memberId is removed from pendingMembers
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
// return the assigned memberId together with a MEMBER_ID_REQUIRED error
responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
}
}
}
GroupMetadata#generateMemberId: memberId = clientId-{uuid}, where clientId is generated by the client; by default clientId = consumer-{groupId}-{client-side sequence number}.
def generateMemberId(clientId: String,
groupInstanceId: Option[String]): String = {
groupInstanceId match {
case None =>
// {client.id}-{uuid}
// the default client-generated client.id is consumer-{groupId}-{client-side sequence number}
clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
case Some(instanceId) =>
// static member (ignored): group.instance.id-{uuid}
instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
}
}
GroupMetadata#pendingMembers: this memberId is stored temporarily in pendingMembers; if the client does not join within sessionTimeoutMs, it is removed.
private val pendingMembers = new mutable.HashSet[String]
def addPendingMember(memberId: String) = pendingMembers.add(memberId)
AbstractCoordinator#joinGroupIfNeeded: back on the consumer, the generation's memberId is updated and the JoinGroupRequest is sent again.
private class JoinGroupResponseHandler extends
CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
// ...
if (error == Errors.MEMBER_ID_REQUIRED) {
String memberId = joinResponse.data().memberId();
synchronized (AbstractCoordinator.this) {
// update the generation's memberId to the one returned by the server
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
}
}
}
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
// ...
// send the JoinGroupRequest
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
// ...
if (future.succeeded()) {
// ...
} else {
final RuntimeException exception = future.exception();
resetJoinGroupFuture();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException ||
// a MemberIdRequiredException was received
exception instanceof MemberIdRequiredException)
continue;
// ...
}
}
return true;
}
4-3-2. A new member joins the group
The newly assigned memberId above is marked as pending, and the new member sends the JoinGroupRequest again.
GroupCoordinator#doJoinGroup: wrap the JoinGroupRequest into a MemberMetadata, add it to the GroupMetadata, and increase numMembersAwaitingJoin (the number of members waiting for a JoinGroupResponse) by 1.
private def doJoinGroup(group: GroupMetadata,
memberId: String,
groupInstanceId: Option[String],
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
group.inLock {
if (group.isPendingMember(memberId)) {
// a pending member formally joins the group
addMemberAndRebalance(rebalanceTimeoutMs,
sessionTimeoutMs, memberId, groupInstanceId,
clientId, clientHost, protocolType,
protocols, group, responseCallback)
}
}
}
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
memberId: String,
groupInstanceId: Option[String],
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): Unit = {
val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
member.isNew = true
if (group.is(PreparingRebalance) && group.generationId == 0)
// first rebalance of the group: mark newMemberAdded=true
group.newMemberAdded = true
// join the group, numMembersAwaitingJoin++
group.add(member, callback)
// schedule a fixed 5-minute DelayedHeartbeat task; if nothing follows before it expires, the member is removed from the group
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
if (member.isStaticMember) {
group.addStaticMember(groupInstanceId, memberId)
} else {
group.removePendingMember(memberId)
}
// trigger a rebalance
maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
// GroupMetadata#add
def add(member: MemberMetadata, callback: JoinCallback = null): Unit = {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
if (leaderId.isEmpty)
// the first member to join becomes the leader
leaderId = Some(member.memberId)
members.put(member.memberId, member)
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
member.awaitingJoinCallback = callback
if (member.isAwaitingJoin)
numMembersAwaitingJoin += 1
}
GroupCoordinator#maybePrepareRebalance: each group has one DelayedJoin task; try to complete it immediately, otherwise register the DelayedJoin as a delayed task.
private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
group.inLock { // take the group lock
if (group.canRebalance)
prepareRebalance(group, reason)
}
}
private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
if (group.is(CompletingRebalance)) {
// the group already finished the join phase and sent JoinGroupResponses,
// but has not distributed the assignment yet; send everyone an empty SyncGroupResult
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
}
val delayedRebalance = if (group.is(Empty)) {
// the group is empty (just created): use the InitialDelayedJoin strategy
new InitialDelayedJoin(this,
joinPurgatory,
group,
groupConfig.groupInitialRebalanceDelayMs,
// group.initial.rebalance.delay.ms=3s
groupConfig.groupInitialRebalanceDelayMs,
// max.poll.interval.ms=5min - 3s
max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
}
else {
// the group already has members: use DelayedJoin, delayed by max.poll.interval.ms=5min
new DelayedJoin(this, group, group.rebalanceTimeoutMs)
}
// transition to PreparingRebalance
group.transitionTo(PreparingRebalance)
val groupKey = GroupKey(group.groupId)
// for this group, try to complete the DelayedJoin now, otherwise register it as a delayed task
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
4-3-3. An existing member joins the group
On the broker side a group has four important states (see the GroupState javadoc for details):
1) Empty: the group has no members;
2) PreparingRebalance: JoinGroupRequests have been received and the rebalance is in progress, but JoinGroupResponses have not been sent yet;
3) CompletingRebalance: JoinGroupResponses have been sent, the join phase is complete;
4) Stable: the leader's SyncGroupRequest carrying the partition assignment has been received;
GroupCoordinator#doJoinGroup: an existing member's JoinGroup is handled according to the state and the metadata:
1) PreparingRebalance: go through updateMemberAndRebalance;
2) CompletingRebalance: if another JoinGroupRequest arrives after the JoinGroupResponse was already sent (e.g. the client decided the rebalance timed out), the request's protocols are compared with the member's metadata on the broker (i.e. whether its subscribed topics changed); if they changed go through updateMemberAndRebalance, otherwise just respond with another JoinGroupResponse;
3) Stable: if the request came from the leader or the protocols changed, go through updateMemberAndRebalance; otherwise just respond with another JoinGroupResponse;
private def doJoinGroup(group: GroupMetadata,
memberId: String,
groupInstanceId: Option[String],
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
group.inLock {
if (group.isPendingMember(memberId)) {
// a new member joins (covered above)
} else {
// a non-pending member joins
val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId) || groupInstanceIdNotFound) {
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
val member = group.get(memberId)
group.currentState match {
case PreparingRebalance =>
// JoinGroup received, JoinGroupResponse not sent yet
updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)
case CompletingRebalance =>
// JoinGroupResponse sent, waiting for SyncGroupRequest
if (member.matches(protocols)) {
responseCallback(JoinGroupResult(
members = if (group.isLeader(memberId)) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE))
} else {
updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
}
case Stable =>
// the leader's SyncGroupRequest has been received
val member = group.get(memberId)
if (group.isLeader(memberId)) {
updateMemberAndRebalance(group, member, protocols, s"leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback)
} else if (!member.matches(protocols)) {
updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
} else {
responseCallback(JoinGroupResult(
members = List.empty,
memberId = memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
case Empty | Dead =>
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
}
}
}
}
}
GroupCoordinator#updateMemberAndRebalance: update the MemberMetadata, increase numMembersAwaitingJoin by 1, try to complete the DelayedJoin, otherwise register the DelayedJoin as a delayed task.
private def updateMemberAndRebalance(group: GroupMetadata,
member: MemberMetadata,
protocols: List[(String, Array[Byte])],
reason: String,
callback: JoinCallback): Unit = {
group.updateMember(member, protocols, callback)
maybePrepareRebalance(group, reason)
}
// same as for a new member: try to complete the DelayedJoin, otherwise register it as a delayed task
private def maybePrepareRebalance(group: GroupMetadata,
reason: String): Unit = {
group.inLock {
if (group.canRebalance)
prepareRebalance(group, reason)
}
}
def updateMember(member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback): Unit = {
member.supportedProtocols = protocols
if (callback != null && !member.isAwaitingJoin) {
numMembersAwaitingJoin += 1
}
member.awaitingJoinCallback = callback
}
4-3-4. Completing the join phase
During PreparingRebalance, each group has exactly one DelayedJoin.
Every JoinGroupRequest triggers DelayedJoin#tryComplete, which checks whether the group can finish the join phase; if not, the DelayedJoin stays registered with a rebalanceTimeoutMs=5min expiration.
When the condition is met or the timeout fires, DelayedJoin#onComplete runs.
private[group] class DelayedJoin(coordinator: GroupCoordinator,
group: GroupMetadata,
rebalanceTimeout: Long)
extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
// every member's JoinGroupRequest triggers tryComplete, attempting to finish the join phase
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
// runs when tryComplete succeeds or the timeout fires
override def onComplete() = coordinator.onCompleteJoin(group)
}
GroupCoordinator#tryCompleteJoin: if the member count equals the number of members awaiting a JoinGroupResponse (numMembersAwaitingJoin), and there are no pending members (those that were assigned a memberId but have not sent a JoinGroupRequest yet), the join phase completes.
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group.inLock {
if (group.hasAllMembersJoined) {
// run onComplete
forceComplete()
} else false
}
}
// GroupMetadata
def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty
The broker computes the expiration in one of two ways, as the timeline below illustrates.
DelayedJoin: when a non-empty group rebalances, rebalanceTimeoutMs=5min.
InitialDelayedJoin: when an empty group rebalances, the join phase completes quickly if no new member joins within 3s; if a new member did join within those 3s, the decision is postponed by another 3s, up to at most rebalanceTimeoutMs=5min.
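A hypothetical timeline with the default group.initial.rebalance.delay.ms=3s: at t=0s the first consumer joins an empty group and a 3s InitialDelayedJoin is scheduled; at t=2s a second consumer joins, so newMemberAdded=true; at t=3s the task fires, sees newMemberAdded, and schedules another 3s InitialDelayedJoin (with the remaining budget of 5min minus 3s); at t=6s no further member has joined, so onComplete runs and the group moves to CompletingRebalance. Starting a whole group at once therefore costs roughly one extra 3s delay instead of one rebalance per joining member.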
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
purgatory: DelayedOperationPurgatory[DelayedJoin],
group: GroupMetadata,
// group.initial.rebalance.delay.ms=3s
configuredRebalanceDelay: Int,
delayMs: Int,
// max.poll.interval.ms=5min - group.initial.rebalance.delay.ms=3s
remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
override def tryComplete(): Boolean = false
// triggered automatically when the delay expires
override def onComplete(): Unit = {
group.inLock {
// newMemberAdded is set to true when a new member joined during the delay
// remainingMs = the remaining rebalance timeout budget > 0
if (group.newMemberAdded && remainingMs != 0) {
group.newMemberAdded = false
val delay = min(configuredRebalanceDelay, remainingMs)
val remaining = max(remainingMs - delayMs, 0)
purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
purgatory,
group,
configuredRebalanceDelay,
delay,
remaining
), Seq(GroupKey(group.groupId)))
} else
// no new member joined within 3s, or the 5min rebalance timeout was exceeded: fall through to DelayedJoin
super.onComplete()
}
}
}
GroupCoordinator#onCompleteJoin: completing the join phase
1) remove members that did not send a JoinGroup within the timeout;
2) elect the leader;
3) generation + 1;
4) send every member a JoinGroupResponse; only the leader receives the metadata of all members (JoinGroupRequest.protocols);
5) start the heartbeat timeout checks, using JoinGroupRequest.sessionTimeout (session.timeout.ms=10s, 45s in newer versions);
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock {
// 1. remove members that never sent a JoinGroupRequest
// (if anyone is removed here, this completion was usually triggered by the rebalance timeout)
group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
group.removeStaticMember(failedMember.groupInstanceId)
}
if (
// 2. elect the leader
!group.maybeElectNewJoinedLeader()
&& group.allMembers.nonEmpty) {
joinPurgatory.tryCompleteElseWatch(
new DelayedJoin(this, group, group.rebalanceTimeoutMs),
Seq(GroupKey(group.groupId)))
} else {
// 3. generation++
group.initNextGeneration()
for (member <- group.allMemberMetadata) {
val joinResult = JoinGroupResult(
members = if (group.isLeader(member.memberId)) {
// only the leader receives the metadata of all members (JoinGroupRequest.protocols)
group.currentMemberMetadata
} else {
List.empty
},
memberId = member.memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE)
// 4. send the JoinGroupResponse
group.maybeInvokeJoinCallback(member, joinResult)
// 5. schedule the heartbeat expiration task
completeAndScheduleNextHeartbeatExpiration(group, member)
member.isNew = false
}
}
}
}
// GroupMetadata
def initNextGeneration(): Unit = {
if (members.nonEmpty) {
generationId += 1
protocolName = Some(selectProtocol)
// the union of all topics subscribed by group members
subscribedTopics = computeSubscribedTopics()
transitionTo(CompletingRebalance)
} else {
generationId += 1
protocolName = None
subscribedTopics = computeSubscribedTopics()
transitionTo(Empty)
}
receivedConsumerOffsetCommits = false
receivedTransactionalOffsetCommits = false
}
GroupMetadata#maybeElectNewJoinedLeader: elect the leader. If the old leader sent its JoinGroup in time during the rebalance, it stays leader; otherwise any member that did send a JoinGroup becomes the leader.
def maybeElectNewJoinedLeader(): Boolean = {
leaderId.exists { currentLeaderId =>
val currentLeader = get(currentLeaderId)
if (!currentLeader.isAwaitingJoin) {
// the old leader did not send a JoinGroup
members.find(_._2.isAwaitingJoin) match {
// pick any member that did send a JoinGroup as the new leader
case Some((anyJoinedMemberId, anyJoinedMember)) =>
leaderId = Option(anyJoinedMemberId)
true
case None =>
false
}
} else {
// the old leader sent its JoinGroup, so it stays leader
true
}
}
}
4-4. The consumer handles JoinGroupResponse and sends SyncGroupRequest
After the JoinGroup phase, the coordinator knows the group members and their subscriptions (GroupMetadata) and sends every member a JoinGroupResponse.
JoinGroupResponseHandler#handle: handle the JoinGroupResponse.
1) allow the heartbeat thread to start sending heartbeats;
2) the coordinator handed out a new generation; the consumer updates its in-memory generation;
3) leader and follower take different SyncGroup paths; only after that finishes (it is chained) can the outer join future complete;
private class JoinGroupResponseHandler extends
CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
if (state != MemberState.PREPARING_REBALANCE) {
future.raise(new UnjoinedGroupException());
} else {
state = MemberState.COMPLETING_REBALANCE;
// allow heartbeats to be sent
if (heartbeatThread != null)
heartbeatThread.enable();
// update the generation
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
// handle leader and follower differently
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
}
}
}
AbstractCoordinator#onJoinFollower: a follower replies with a simple SyncGroupRequest carrying the freshly assigned generation and no assignments.
private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
// note: empty for a follower
.setAssignments(Collections.emptyList()));
return sendSyncGroupRequest(requestBuilder);
}
private RequestFuture<ByteBuffer> sendSyncGroupRequest(
SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder)
.compose(new SyncGroupResponseHandler(generation));
}
AbstractCoordinator#onJoinLeader: the leader receives the subscriptions of all members (members) from the coordinator, assigns partitions to every member, and puts the result (assignments) into its SyncGroupRequest.
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// run the assignment: give every member its partitions
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(
new SyncGroupRequestData.SyncGroupRequestAssignment()
.setMemberId(assignment.getKey())
.setAssignment(Utils.toArray(assignment.getValue()))
);
}
// send the SyncGroupRequest to the coordinator
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
// ... the other fields are the same as for a follower
.setAssignments(groupAssignmentList)
);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
ConsumerCoordinator#performAssignment:
1) isLeader=true, mark this consumer as the leader;
2) assignor.assign: given the cluster metadata (Cluster) and the members' subscriptions (memberId + Subscription), compute each member's partitions (memberId + Assignment);
3) assignmentSnapshot = metadataSnapshot: remember the partition counts of the relevant topics at assignment time; later polls check whether they changed, and if so the leader must send a JoinGroup and start another rebalance;
4) convert each Assignment into a ByteBuffer;
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
// the assignor (RangeAssignor)
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
// the union of topics subscribed across the group
Set<String> allSubscribedTopics = new HashSet<>();
// memberId -> subscription
Map<String, Subscription> subscriptions = new HashMap<>();
// memberId -> currently owned partitions
Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
// populate the three collections above
for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
}
// the leader refreshes metadata for all subscribed topics and caches the group-wide topic set
updateGroupSubscription(allSubscribedTopics);
// mark this consumer as the leader
isLeader = true;
// returns memberId -> list of partitions
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
// ...
// remember the metadata snapshot at assignment time (how many partitions each topic has);
// ConsumerCoordinator#rejoinNeededOrPending checks on every poll whether it has changed,
// and if so the leader sends a JoinGroupRequest to start a new rebalance
assignmentSnapshot = metadataSnapshot;
log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
// convert each Assignment into a ByteBuffer
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
return groupAssignment;
}
The default assignor is RangeAssignor; an assignor must implement the assign method.
Its inputs and outputs look like this:
public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor {
public abstract Map<String/*member id*/, List<TopicPartition>/*assigned partitions*/>
assign
(Map<String/*topic*/, Integer/*partition count*/> partitionsPerTopic,
Map<String/*member id*/, Subscription/*subscription*/> subscriptions);
}
RangeAssignor: loop over each topic and the members subscribed to that topic; Kafka allows members of the same group to have different subscriptions.
For example, consumers C1 and C2 both subscribe to topics T1 and T2, each with 3 partitions. C1 sorts first and gets partitions 0-1 of each topic, C2 sorts later and gets partition 2 of each topic. The final result: C1 = T1P0, T1P1, T2P0, T2P1; C2 = T1P2, T2P2. A runnable sketch of this example follows the code below.
public class RangeAssignor extends AbstractPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// topic -> members subscribed to that topic
Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
// result map
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<>());
// loop over each topic and the members subscribed to it
for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<MemberInfo> consumersForTopic = topicEntry.getValue();
// number of partitions of this topic
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
// members with a groupInstanceId sort first, then by memberId;
// roughly, sort by memberId
Collections.sort(consumersForTopic);
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
// wrap topic + partition id into TopicPartition objects
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i +
Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer +
(i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i).memberId)
.addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}
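To make the T1/T2 example above concrete, a small sketch that drives RangeAssignor directly (member ids and topic names come from the example; this bypasses the group protocol entirely and just exercises the assignment math):
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;

public class RangeAssignorExample {
    public static void main(String[] args) {
        RangeAssignor assignor = new RangeAssignor();
        // T1 and T2 each have 3 partitions
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("T1", 3);
        partitionsPerTopic.put("T2", 3);
        // C1 and C2 both subscribe to T1 and T2
        Map<String, Subscription> subscriptions = new HashMap<>();
        subscriptions.put("C1", new Subscription(Arrays.asList("T1", "T2")));
        subscriptions.put("C2", new Subscription(Arrays.asList("T1", "T2")));
        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
        // prints C1=[T1-0, T1-1, T2-0, T2-1] and C2=[T1-2, T2-2]
        assignment.forEach((member, partitions) -> System.out.println(member + "=" + partitions));
    }
}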
4-5. The coordinator handles SyncGroupRequest
KafkaApis#handleSyncGroupRequest: both follower and leader send a SyncGroupRequest, but only the leader's carries the assignment result (the assignments field).
def handleSyncGroupRequest(request: RequestChannel.Request): Unit = {
val syncGroupRequest = request.body[SyncGroupRequest]
// send the SyncGroupResponse
def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(syncGroupResult.error.code)
.setProtocolType(syncGroupResult.protocolType.orNull)
.setProtocolName(syncGroupResult.protocolName.orNull)
// the partition assignment
.setAssignment(syncGroupResult.memberAssignment)
.setThrottleTimeMs(requestThrottleMs)
))
}
// only the leader sends assignments
val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
syncGroupRequest.data.assignments.forEach { assignment =>
assignmentMap += (assignment.memberId -> assignment.assignment)
}
groupCoordinator.handleSyncGroup(
syncGroupRequest.data.groupId,
syncGroupRequest.data.generationId,
syncGroupRequest.data.memberId,
Option(syncGroupRequest.data.protocolType),
Option(syncGroupRequest.data.protocolName),
Option(syncGroupRequest.data.groupInstanceId),
assignmentMap.result,
sendResponseCallback
)
}
GroupCoordinator#doSyncGroup: from a follower's point of view, the group is usually in one of two states.
If the leader has not sent its SyncGroup yet, the group is in CompletingRebalance: the follower's SyncGroup is parked here, its response callback cached in awaitingSyncCallback, and it is answered once the leader's SyncGroup arrives.
If the leader has already sent its SyncGroup and storeGroup has completed, the group is Stable: the follower's SyncGroup is answered immediately, because the coordinator already knows the group's partition assignment.
private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
group.inLock { // take the group lock
if (!group.has(memberId)) {
// the member timed out on syncgroup and was removed from the group
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
} else if (generationId != group.generationId) {
// different generation: another joingroup round has already happened
responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
}
// ... other validations
else {
group.currentState match {
case Empty =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case CompletingRebalance =>
// for a follower's SyncGroupRequest, only the response callback is cached
group.get(memberId).awaitingSyncCallback = responseCallback
if (group.isLeader(memberId)) {
// the leader's SyncGroupRequest arrived
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
val missing = group.allMembers.diff(groupAssignment.keySet)
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// persist the groupMetadata
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
if (group.is(CompletingRebalance)
&& generationId == group.generationId) {
if (error != Errors.NONE) {
// persisting failed: start another rebalance, move back to PreparingRebalance,
// and answer the SyncGroupResponses with the error
resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
} else {
// 存储成功,更新成员分配到内存,响应所有成员SyncGroupResponse
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}
})
groupCompletedRebalanceSensor.record()
}
case Stable =>
// 如果已经stable,收到syncgroup,直接响应分配结果
// leader的SyncGroup先到,follower后到的情况
val memberMetadata = group.get(memberId)
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
}
}
}
}
private def setAndPropagateAssignment(group: GroupMetadata,
assignment: Map[String, Array[Byte]]): Unit = {
// 缓存分区分配方案
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
propagateAssignment(group, Errors.NONE)
}
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
// ...
for (member <- group.allMemberMetadata) {
// 向所有发送SyncGroup的成员发送响应,分配消费分区
if (group.maybeInvokeSyncCallback(member,
SyncGroupResult(protocolType, protocolName,
member.assignment, error))) {
// 开启下一轮心跳超时检测
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
GroupMetadataManager#storeGroup:对于leader的SyncGroup,将组元数据GroupMetadata以消息的方式存储,topic=__consumer_offsets,partition=该group对应的分区(该分区leader副本所在broker即该组的协调者)。
def storeGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
Seq(new SimpleRecord(timestamp, key, value)).asJava))
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
builder.append(timestamp, key, value)
builder.build()
}
// topic=__consumer_offsets
// partition=当初为group分配协调者的leader分区
val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val groupMetadataRecords = Map(groupMetadataPartition -> records)
val generationId = group.generationId
appendForGroup(group, groupMetadataRecords, putCacheCallback)
case None =>
responseCallback(Errors.NOT_COORDINATOR)
None
}
}
GroupMetadataManager#groupMetadataValue:消息key=groupId,消息value中包含GroupMetadata所有属性,包括每个成员的订阅和分配。
def groupMetadataValue(groupMetadata: GroupMetadata,
assignment: Map[String, Array[Byte]],
apiVersion: ApiVersion): Array[Byte] = {
val (version, value) = {
//...
(3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
}
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
value.set(GENERATION_KEY, groupMetadata.generationId)
value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
value.set(LEADER_KEY, groupMetadata.leaderOrNull)
value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
val memberStruct = value.instance(MEMBERS_KEY)
memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
val protocol = groupMetadata.protocolName.orNull
val metadata = memberMetadata.metadata(protocol)
// 订阅
memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
// 分配
val memberAssignment = assignment(memberMetadata.memberId)
memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
memberStruct
}
value.set(MEMBERS_KEY, memberArray.toArray)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(version)
value.writeTo(byteBuffer)
byteBuffer.array()
}
4-6、consumer收到SyncGroupResponse
SyncGroupResponseHandler:收到SyncGroupResponse,所有成员得到leader分配给自己的分区。如果coordinator返回异常,标记需要重新JoinGroup。
private class SyncGroupResponseHandler extends
CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
// ...
log.info("Successfully synced group in generation {}", generation);
state = MemberState.STABLE;
rejoinNeeded = false;
future.complete(
// 分配给自己的分区
ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
// 标记需要重新JoinGroup,rejoinNeeded=true
requestRejoin();
// ...异常
future.raise(...);
}
}
}
AbstractCoordinator#joinGroupIfNeeded:至此initiateJoinGroup完成,future中包含本轮rebalance后当前成员分配到的消费分区。
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
// 确保Coordinator连接建立
if (!ensureCoordinatorReady(timer)) {
return false;
}
// 清理之前的订阅状态
if (needsJoinPrepare) {
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
}
// 发送JoinGroupRequest、SyncGroupRequest
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// timer超时
return false;
}
if (future.succeeded()) {
// SyncGroupRequest完成
Generation generationSnapshot;
MemberState stateSnapshot;
// 更新本轮rebalance的Generation和MemberState
synchronized (AbstractCoordinator.this) {
generationSnapshot = this.generation;
stateSnapshot = this.state;
}
if (generationSnapshot != Generation.NO_GENERATION
&& stateSnapshot == MemberState.STABLE) {
// 处理收到assignment
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generationSnapshot.generationId,
generationSnapshot.memberId,
generationSnapshot.protocolName,
memberAssignment);
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetStateAndRejoin();
resetJoinGroupFuture();
}
}
// ...future失败时的异常处理和退避重试,略
}
return true;
}
ConsumerCoordinator#onJoinComplete:非leader成员清空assignmentSnapshot,因此不会因感知到元数据变更(如topic分区数量变化)而发起rebalance;随后将leader下发的assignment更新到内存订阅状态。
private final SubscriptionState subscriptions;
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
if (!isLeader)
// follower,清空assignmentSnapshot,不感知元数据变更发起rebalance
assignmentSnapshot = null;
groupMetadata = new ConsumerGroupMetadata(
rebalanceConfig.groupId, generation, memberId,
rebalanceConfig.groupInstanceId);
Assignment assignment =
ConsumerProtocol.deserializeAssignment(assignmentBuffer);
// 新分配给自己的分区
Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
maybeUpdateJoinedSubscription(assignedPartitions);
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
// 将新分配的分区更新到内存
subscriptions.assignFromSubscribed(assignedPartitions);
//...
}
五、心跳
5-1、consumer发送HeartbeatRequest
KafkaConsumer只有两个线程,用户poll线程和心跳线程。
只有当收到JoinGroup响应之后(状态=COMPLETING_REBALANCE或STABLE),心跳线程才会运行。
1)如果发现协调者下线,发起FindCoordinatorRequest;
2)如果超时未收到心跳响应(session.timeout.ms=10s(高版本45s)),标记协调者下线;
3)如果用户代码consumer.poll两次间隔超时(max.poll.interval.ms=5分钟),主动离组,发送LeaveGroupRequest,state=>UNJOINED,generation=NO_GENERATION;
4)如果没到心跳时间(heartbeat.interval.ms=3s),等一会再执行(retry.backoff.ms=100ms);
5)不满足上述条件,发送心跳请求,收到心跳响应更新心跳时间(heartbeat.receiveHeartbeat);
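上面提到的几个时间参数都是消费者端配置,下面给出一个示意性的配置片段(取值仅为示例,与前述默认值一致):
// 心跳间隔:心跳线程每到该时间发送一次HeartbeatRequest,默认3s
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
// 会话超时:协调者超过该时间收不到心跳,会把成员移出消费组,本文版本默认10s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
// 两次poll最大间隔:超过该时间心跳线程会主动发送LeaveGroupRequest离组,默认5分钟
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");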
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
// 是否允许发送心跳
private boolean enabled = false;
public void enable() {
synchronized (AbstractCoordinator.this) {
this.enabled = true;
heartbeat.resetTimeouts();
AbstractCoordinator.this.notify();
}
}
public void disable() {
synchronized (AbstractCoordinator.this) {
this.enabled = false;
}
}
public void run() {
while (true) {
synchronized (AbstractCoordinator.this) { // 协调者锁
if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}
// 未进组(状态为UNJOINED或PREPARING_REBALANCE),停止工作
if (state.hasNotJoinedGroup() || hasFailed()) {
disable();
continue;
}
// 通讯客户端 读写请求响应
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
// 1. coordinator下线,心跳线程也会主动发起FindCoordinatorRequest
if (findCoordinatorFuture != null) {
clearFindCoordinatorFuture();
AbstractCoordinator.this
.wait(rebalanceConfig.retryBackoffMs);
} else {
lookupCoordinator();
}
} else if (heartbeat.sessionTimeoutExpired(now)) {
// 2. 超时没有收到心跳响应,认为协调者下线,session.timeout.ms=10s(高版本45s)
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// 3. consumer.poll超时,发送LeaveGroupRequest,state=>UNJOINED,generation=NO_GENERATION
maybeLeaveGroup(leaveReason);
} else if (!heartbeat.shouldHeartbeat(now)) {
// 4. 没到心跳时间(3s),跳过
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
// 5. 到心跳时间(3s),发送心跳
heartbeat.sentHeartbeat(now);
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
// 6. 更新上次心跳时间
heartbeat.receiveHeartbeat();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// 6. 更新上次心跳时间
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
heartbeatThread.failed.set(e);
} else {
heartbeat.failHeartbeat();
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
}
}
AbstractCoordinator#sendHeartbeatRequest:请求参数=groupId+memberId+generationId。
synchronized RequestFuture<Void> sendHeartbeatRequest() {
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler(generation));
}
5-2、coordinator处理HeartbeatRequest
GroupCoordinator#handleHeartbeat:
1)如果成员已经不存在(比如协调者通过心跳超时sessionTimeout移除这个成员),响应UNKNOWN_MEMBER_ID;
2)如果group处于CompletingRebalance/Stable,代表消费组处于稳定状态,开启下一轮心跳超时检测延迟任务completeAndScheduleNextHeartbeatExpiration;
3)如果group处于PreparingRebalance,代表有人触发了rebalance(如发送了JoinGroupRequest),则响应REBALANCE_IN_PROGRESS,通知发送心跳的成员发送JoinGroupRequest;
def handleHeartbeat(groupId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
responseCallback: Errors => Unit): Unit = {
groupManager.getGroup(groupId) match {
case None =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) => group.inLock {
if (group.is(Dead)) {
responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
} else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {
responseCallback(Errors.FENCED_INSTANCE_ID)
} else if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION)
} else {
group.currentState match {
case CompletingRebalance =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
case PreparingRebalance =>
// 有成员发起了JoinGroup,开始rebalance了,响应REBALANCE_IN_PROGRESS
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS)
case Stable =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
}
}
}
}
}
5-3、consumer收到HeartbeatResponse
HeartbeatResponseHandler:如果协调者返回部分异常情况,将触发poll线程发起JoinGroupRequest参与新一轮rebalance。
1)REBALANCE_IN_PROGRESS,标记rejoinNeeded=true;
2)UNKNOWN_MEMBER_ID,重置状态UNJOINED,重置generation=NO_GENERATION;
private class HeartbeatResponseHandler extends
CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
Errors error = heartbeatResponse.error();
if (error == Errors.NONE) {
future.complete(null);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
coordinator());
markCoordinatorUnknown();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
if (state == MemberState.STABLE) {
// this.rejoinNeeded = true;
// poll主线程发现需要rejoin,会发起JoinGroupRequest
requestRejoin();
future.raise(error);
} else {
future.complete(null);
}
} else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
error == Errors.FENCED_INSTANCE_ID) {
if (generationUnchanged()) {
log.info("Attempt to heartbeat with {} and group instance id {} failed due to {}, resetting generation",
sentGeneration, rebalanceConfig.groupInstanceId, error);
// state = MemberState.UNJOINED;
// generation = Generation.NO_GENERATION;
resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
future.raise(error);
} else {
future.complete(null);
}
}
// ...
}
}
5-4、coordinator发现成员心跳超时
GroupCoordinator#completeAndScheduleNextExpiration:每次收到心跳请求,协调者会开启新一轮心跳延迟任务。
private def completeAndScheduleNextExpiration(group: GroupMetadata,
member: MemberMetadata,
timeoutMs: Long): Unit = {
val memberKey = MemberKey(member.groupId, member.memberId)
// 标记成员心跳收到,完成这一轮心跳延迟任务
member.heartbeatSatisfied = true
heartbeatPurgatory.checkAndComplete(memberKey)
// 标记成员心跳未收到,开启新一轮心跳延迟任务,timeoutMs=sessionTimeout
member.heartbeatSatisfied = false
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
DelayedHeartbeat:心跳超时检测延迟任务。
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
group: GroupMetadata,
memberId: String,
isPending: Boolean,
timeoutMs: Long)
extends DelayedOperation(timeoutMs, Some(group.lock)) {
// 心跳超时时回调onExpireHeartbeat
override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending)
override def onComplete() = coordinator.onCompleteHeartbeat()
}
// GroupCoordinator#onExpireHeartbeat
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
group.inLock {
// ...
val member = group.get(memberId)
if (!member.hasSatisfiedHeartbeat) {
// 超时没收到心跳
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
// 出组
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
GroupCoordinator#removeMemberAndUpdateGroup:
1)如果Stable或CompletingRebalance,即稳定状态,maybePrepareRebalance触发rebalance,组状态变为PreparingRebalance;
2)如果PreparingRebalance,代表还在rebalance等待所有成员进组,可能由于这个老成员心跳超时退出而满足成组条件,响应组内成员JoinGroupResponse;
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
// 出组
group.remove(member.memberId)
group.removeStaticMember(member.groupInstanceId)
group.currentState match {
case Dead | Empty =>
// 已经成组了,但是现在有成员离组,触发rebalance,=>PreparingRebalance
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
// 收到JoinGroup,正在等待所有成员进组
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
GroupMetadata#remove:如果是leader出组,会直接选下一个memberId成为leader。
def remove(memberId: String): Unit = {
members.remove(memberId).foreach { member =>
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
if (member.isAwaitingJoin)
numMembersAwaitingJoin -= 1
}
if (isLeader(memberId))
leaderId = members.keys.headOption
}
六、COOPERATIVE协议rebalance
COOPERATIVE协议是为了解决rebalance期间Stop The World的问题,即rebalance期间消费组无法消费的问题。
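从使用角度看,开启COOPERATIVE协议通常只需把分配器换成CooperativeStickyAssignor(Kafka 2.4+自带),下面是一个示意配置片段:
// 使用CooperativeStickyAssignor后,rebalance时成员只撤销真正需要迁移走的分区,
// 其余分区可以继续消费,从而缓解EAGER协议下的Stop The World
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");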
七、静态成员
静态成员也是为了避免非必要的rebalance。
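静态成员通过消费者配置group.instance.id标识,同一实例只要在session.timeout.ms内重启并重新进组,就不会触发rebalance。下面是一个示意配置片段(取值仅为示例):
// 为该消费者实例设置固定的group.instance.id,使其成为静态成员
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");
// 静态成员通常配合较大的session.timeout.ms,给实例重启留出时间
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");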
总结
发现协调者
每个broker都能处理FindCoordinatorRequest。
broker通过topic=__consumer_offsets的分区数,定位消费组对应的协调者节点。
协调者节点=leaderOfPartition(hash(groupId)%分区数),分区分布是否均衡决定了broker在协调者分配上是否负载均衡。
如果topic不存在会自动创建,默认__consumer_offsets的配置为50分区+3副本,segment大小100MB(正常topic是1G),清理策略=compact。注意:server.properties中副本数offsets.topic.replication.factor可能被设置为1。
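协调者定位的计算可以用一小段示意代码表达(思路与GroupMetadataManager#partitionFor一致,50为__consumer_offsets默认分区数;这里用Math.abs仅作示意,源码中为Utils.abs):
// 示意:groupId哈希取模定位__consumer_offsets的分区,该分区leader所在broker即协调者
int offsetsTopicPartitionCount = 50; // offsets.topic.num.partitions默认50
String groupId = "test-group";
int partition = Math.abs(groupId.hashCode()) % offsetsTopicPartitionCount;
System.out.println("group " + groupId + " -> __consumer_offsets-" + partition);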
Rebalance
Step1,发送JoinGroupRequest。
rebalance前需要提交一次offset,默认EAGER协议下清空分配分区,rebalance期间无法消费(Stop The World)。
成员刚启动 或 老成员没及时发送心跳被移除,memberId失效,协调者分配新memberId响应MEMBER_ID_REQUIRED,消费者重新发送JoinGroupRequest。
消费者携带memberId进组。
如果组内没有leader,第一个进组member成为leader。
协调者针对每个group开启一个延迟任务DelayedJoin,在rebalanceTimeout=消费者max.poll.interval.ms配置,默认5分钟(如果消费者配置不同取最大的)后会超时。如果是个新组延迟任务会采用InitialDelayedJoin,每3秒检测一次成组条件,最终超时时间也是5分钟。
成组条件1 = 所有老成员进组 + 没有pending成员(分配了memberId但是还没来得及第二次发送JoinGroup的成员);成组条件2 = rebalance超时,移除超时没进组的老成员。
成组后对每个成员开启心跳超时延迟任务,心跳超时时间=sessionTimeout=消费者配置session.timeout.ms=默认10s(高版本45s) ,响应JoinGroupResponse,包括leaderId、新generationId等,对于leader会额外携带所有consumer的protocols(协议包含订阅topic和当前分配的分区-用于渐进式rebalance)。
Step2,发送SyncGroupRequest,消费者此时都会开始发送心跳。
消费组Follower简单发送一个SyncGroupRequest,包含groupId+memberId+generationId等,用于获取分区分配方案。
消费组Leader从JoinGroupResponse中收到所有消费者的订阅情况,使用默认RangeAssignor为组内成员分配分区,发送SyncGroupRequest,额外包含分区分配方案assignments。
协调者收到Leader的分配方案,将其更新到内存GroupMetadata,并持久化到topic=__consumer_offsets中该group对应的分区(即当初为该group定位协调者的那个分区),然后响应所有成员SyncGroupResponse,包含分区分配方案。如果Follower在时序上晚于Leader发送SyncGroupRequest也没关系,协调者会直接响应SyncGroupResponse。
所有消费者收到分区分配,更新到内存,进入后续fetch阶段。
消费者心跳
心跳线程是消费者除poll应用线程以外唯一的独立线程,职责包括:
1)发现协调者下线,主动发起FindCoordinatorRequest;
2)超时未收到服务端心跳响应,session.timeout.ms=10s(高版本45s),标记协调者下线;
3)如果用户consumer.poll两次间隔超时(max.poll.interval.ms=5分钟),主动离组,发送LeaveGroupRequest;
4)满足心跳时间heartbeat.interval.ms=3s,发送心跳请求HeartbeatRequest;
协调者处理HeartbeatRequest:
1)如果成员不存在,代表心跳超时,已经出组,响应UNKNOWN_MEMBER_ID,成员需要重新JoinGroup;
2)如果消费组处于稳定状态,完成心跳延迟定时器并开启下一轮;
3)如果消费组处于PreparingRebalance,代表有人发了JoinGroup开启新一轮rebalance,响应REBALANCE_IN_PROGRESS,通知该成员进入rebalance,发送JoinGroup;
协调者的心跳延迟定时器超时:
1)成员出组;
2)如果消费组处于稳定状态,变更状态PreparingRebalance,后续其他成员发送心跳感知后,重新JoinGroup;
3)如果消费组处于PreparingRebalance,代表正在rebalance等待成员JoinGroup,判断是否满足成组条件;