Kafka Source Code (5): Consumer Rebalance

Java Tutorial · 2025-09-23

Preface

This chapter analyzes how the Kafka consumer rebalances, covering:

1) how the coordinator is located: FindCoordinatorRequest;

2) how consumers form a group: JoinGroupRequest;

3) how consumers obtain the partition assignment: SyncGroupRequest;

4) what the COOPERATIVE protocol does;

5) what static membership does.

Note: this article is based on Kafka 2.6, without KRaft.

Usage Example

public static void main(String[] args) {
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
  // default enable.auto.commit=true
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  // default auto.commit.interval.ms=5000
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  // default session.timeout.ms=10000; a member that misses heartbeats past this timeout is removed from the group
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("topic1"));
  for (int i = 0; i < 10; i++) {
      try {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 * 5));
          for (ConsumerRecord<String, String> record : records) {
              System.out.println(record.value());
          }
      } catch (Exception e) {
          e.printStackTrace();
      }
  }
  consumer.close();
}

1. Overview

1-1. Overview of the Poll Method

Consumers pull messages via KafkaConsumer#poll, which has two overloads:

1) the Duration overload genuinely bounds how long poll may block;

2) the long overload does not truly bound the time spent blocked in poll: it waits indefinitely for metadata to become ready, violating the timeout semantics, and is therefore deprecated.

public ConsumerRecords<K, V> poll(final long timeoutMs) {
    return poll(time.timer(timeoutMs), false);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

Also note that KafkaConsumer is not thread-safe: poll and the eventual broker calls run on the same thread, so multi-threaded consumption must be implemented by the user.

 * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
 * making the call. It is the responsibility of the user to ensure that multi-threaded access
 * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.

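Since all I/O happens on the calling thread, the one call that is safe from another thread is KafkaConsumer#wakeup, which makes a blocked poll throw WakeupException. A minimal shutdown sketch (reusing the props from the usage example above):

// wakeup() is the only thread-safe method on KafkaConsumer; calling it from
// another thread makes the blocked poll() throw WakeupException
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> System.out.println(r.value()));
    }
} catch (WakeupException e) {
    // expected during shutdown
} finally {
    // close() must run on the polling thread
    consumer.close();
}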
KafkaConsumer#poll has two steps: 1) prepare metadata; 2) fetch messages.

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
  try {
      do {
          // 1. prepare metadata
          if (includeMetadataInTimeout) {
              // poll(Duration)
              updateAssignmentMetadataIfNeeded(timer, false);
          } else {
              // deprecated poll(long)
              while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                  log.warn("Still waiting for metadata");
              }
          }
          // 2. fetch messages
          final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
          if (!records.isEmpty()) {
              if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                  client.transmitSends();
              }
              return this.interceptors.onConsume(new ConsumerRecords<>(records));
          }
      } while (timer.notExpired());
      return ConsumerRecords.empty();
  } finally {
      release();
  }
}

KafkaConsumer#updateAssignmentMetadataIfNeeded: preparing metadata means polling the coordinator and updating fetch positions.

private final ConsumerCoordinator coordinator; 
boolean updateAssignmentMetadataIfNeeded(final Timer timer, 
                  final boolean waitForJoinGroup/*false*/) {
    // 1. ConsumerCoordinator.poll
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }
    // 2. update fetch positions
    return updateFetchPositions(timer);
}

This analysis focuses on ConsumerCoordinator#poll.

1-2. Rebalance Overview

For comparison, consider RocketMQ's rebalance first. (POP consumption can be thought of as consumption that needs no rebalance.)


RocketMQ's rebalance is carried out independently by each member of the consumer group:

1) the consumer sends heartbeats carrying its clientId to the master broker of every subscribed topic;

2) it loops over each topic and rebalances it;

3) it fetches the topic's queue count from the nameserver;

4) since the topic's n queues live on different brokers, it picks any one broker and fetches the group's member list (clientIds);

5) it uses the AllocateMessageQueueAveragely strategy to assign queues to itself, based on the index of its own clientId within the sorted clientIds. For example, with 8 queues and 3 clients: the client at index 0 gets q0-q2, index 1 gets q3-q5, index 2 gets q6-q7.

Throughout the process, all broker nodes and all consumer instances are peers; there is no role differentiation.

Rebalance converges on a consistent result thanks to two properties:

1) the subscriptions within a group are identical (one group subscribes to the same topics and tags), and every broker involved receives heartbeats from all members, so all member clientIds can be discovered from any one broker;

2) the clientIds are sorted, and each consumer assigns itself an equal share of queues based on its own index within the sorted list (see the sketch below).
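A minimal sketch of the averaging idea from step 5 (an illustrative re-implementation, not RocketMQ's actual AllocateMessageQueueAveragely code):

// Average allocation by client index: with 8 queues and 3 clients,
// index 0 -> q0..q2, index 1 -> q3..q5, index 2 -> q6..q7.
static List<Integer> allocateAveragely(int queueCount, int clientCount, int index) {
    int base = queueCount / clientCount;       // 8 / 3 = 2
    int remainder = queueCount % clientCount;  // 8 % 3 = 2
    // the first `remainder` clients each take one extra queue
    int size = base + (index < remainder ? 1 : 0);
    int start = index * base + Math.min(index, remainder);
    List<Integer> queues = new ArrayList<>();
    for (int q = start; q < start + size; q++) {
        queues.add(q);
    }
    return queues;
}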

Kafka's rebalance is completely different; it introduces three roles.


Coordinator: each consumer group is assigned a coordinator on one of the brokers (note the load-balancing implications). It is responsible for:

1) member registration and discovery;

2) managing the group's consumption offsets;

3) coordinating the group's partition assignment (it does not compute the assignment itself);

Group Leader: each consumer group has one leader, whose main job is to compute the partition assignment;

Group Follower: receives the partition assignment.

2. Related Components

2-1. SubscriptionState

Subscription state. assignment holds the state of the partitions that rebalance ultimately assigned to this consumer.

public class SubscriptionState {
    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }
    // subscription type
    private SubscriptionType subscriptionType;
    // AUTO_PATTERN: regex subscription
    private Pattern subscribedPattern;
    // AUTO_TOPICS: explicit topic subscription
    private Set<String> subscription;
    // union of all topics subscribed across the group
    private Set<String> groupSubscription;
    // assigned partition -> partition state
    private final PartitionStates<TopicPartitionState> assignment;
    // strategy for initializing offsets when the group is first created
    // auto.offset.reset=latest
    private final OffsetResetStrategy defaultResetStrategy;
    // the optional second argument users can pass to subscribe
    private ConsumerRebalanceListener rebalanceListener;
    // incremented whenever rebalance changes the assigned partitions
    private int assignmentId = 0;
}

There are three subscription modes.

KafkaConsumer#subscribe → SubscriptionState#subscribe: AUTO_TOPICS, explicit subscription to a fixed set of topics.

public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
      registerRebalanceListener(listener);
      setSubscriptionType(SubscriptionType.AUTO_TOPICS);
      return changeSubscription(topics);
}
  private boolean changeSubscription(Set<String> topicsToSubscribe) {
      if (subscription.equals(topicsToSubscribe))
          return false;
      subscription = topicsToSubscribe;
      return true;
  }

KafkaConsumer#subscribe → SubscriptionState#subscribe: AUTO_PATTERN, topics matched by a regex.

public synchronized void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    registerRebalanceListener(listener);
    setSubscriptionType(SubscriptionType.AUTO_PATTERN);
    this.subscribedPattern = pattern;
}

KafkaConsumer#assign → SubscriptionState#assignFromUser: USER_ASSIGNED, the user assigns partitions directly and group rebalance is bypassed. (Ignored here.)

public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {
    setSubscriptionType(SubscriptionType.USER_ASSIGNED);
    if (this.assignment.partitionSet().equals(partitions))
        return false;
    // ...
}
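The ConsumerRebalanceListener accepted by both subscribe overloads is the user-facing hook around rebalances; a minimal sketch (the callback bodies are illustrative):

consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // invoked before partitions are taken away (under EAGER: all owned partitions);
        // a common use is committing offsets for the revoked partitions
        System.out.println("revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked once the new assignment arrives via the SyncGroupResponse
        System.out.println("assigned: " + partitions);
    }
});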

SubscriptionState#assignment: maps each partition assigned to this consumer by the group leader to its partition state.

public class PartitionStates<S> {
    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
    private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());
}
private static class TopicPartitionState {
    // fetch state
    private FetchState fetchState;
    // fetch position
    private FetchPosition position;
    // high watermark
    private Long highWatermark;
    // log start offset
    private Long logStartOffset;
    // LSO (last stable offset)
    private Long lastStableOffset;
    // paused by the user
    private boolean paused;
    private OffsetResetStrategy resetStrategy;
    private Long nextRetryTimeMs;
    private Integer preferredReadReplica;
    private Long preferredReadReplicaExpireTimeMs;
}

2-2. ConsumerMetadata

ConsumerMetadata builds on Metadata, handling MetadataRequest/Response differently depending on the subscription mode.

With explicit subscription, only the subscribed topics' metadata is fetched; with regex subscription, all topics must be fetched and then filtered by the pattern.

public class ConsumerMetadata extends Metadata {
    // whether to include internal topics, default false
    private final boolean includeInternalTopics;
    // automatic topic creation is allowed by default
    private final boolean allowAutoTopicCreation;
    // subscription state
    private final SubscriptionState subscription;
    // ignored here
    private final Set<String> transientTopics;

    public ConsumerMetadata(
       // retry.backoff.ms=100ms
      long refreshBackoffMs,
       // metadata.max.age.ms=5min
      long metadataExpireMs,
       // !(exclude.internal.topics=true) = false
       boolean includeInternalTopics,
       // allow.auto.create.topics=true
      boolean allowAutoTopicCreation,
      SubscriptionState subscription, // subscription state
      LogContext logContext,
      ClusterResourceListeners clusterResourceListeners) {
        super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
        this.includeInternalTopics = includeInternalTopics;
        this.allowAutoTopicCreation = allowAutoTopicCreation;
        this.subscription = subscription;
        this.transientTopics = new HashSet<>();
    }

    public boolean allowAutoTopicCreation() {
        return allowAutoTopicCreation;
    }

    @Override
    public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
        if (subscription.hasPatternSubscription())
             // regex subscription: fetch metadata for all topics
            return MetadataRequest.Builder.allTopics();
        // explicit subscription: fetch only the subscribed topics (plus those assigned by the group leader)
        List<String> topics = new ArrayList<>();
        topics.addAll(subscription.metadataTopics());
        topics.addAll(transientTopics);
        return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
    }
    // whether a topic from the MetadataResponse should have its metadata retained
    @Override
    protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
        if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
            // explicit subscription
            return true;
        if (isInternal && !includeInternalTopics)
            // internal topics excluded by default
            return false;
       // regex match
        return subscription.matchesSubscribedPattern(topic);
    }
}

2-3. ConsumerNetworkClient

A high-level wrapper around the consumer's network client.

public class ConsumerNetworkClient implements Closeable {
    // underlying network client
    private final KafkaClient client;
    // requests waiting to be sent
    private final UnsentRequests unsent = new UnsentRequests();
    // completed responses waiting to be processed
    private final ConcurrentLinkedQueue<RequestFutureCompletionHandler>
      pendingCompletion = new ConcurrentLinkedQueue<>();
    private final static class UnsentRequests {
        // although this is a queue, in practice a node only ever has one pending request;
        // the upper layer enforces this, and the underlying KafkaChannel buffers only one Send at a time
        private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
    }
}

Kafka's consumer networking resembles the producer's.

ConsumerNetworkClient#send: the upper layer first buffers the request into unsent.

public RequestFuture<ClientResponse> send(Node node,
          AbstractRequest.Builder<?> requestBuilder,
          int requestTimeoutMs) {
  long now = time.milliseconds();
  // response callback
  RequestFutureCompletionHandler completionHandler =
        new RequestFutureCompletionHandler();
  // build the request
  ClientRequest clientRequest =
    client.newClientRequest(node.idString(), requestBuilder, now, true,
  requestTimeoutMs, completionHandler);
  // buffer the request
  unsent.put(node, clientRequest);
  // nioSelector wakeup
  client.wakeup();
  // return the response future to the caller
  return completionHandler.future;
}

ConsumerNetworkClient#poll: performs the actual RPC, sending requests and collecting responses.

1) trySend: converts requests buffered in unsent into Sends cached on the KafkaChannel;

2) client.poll: lets the underlying network client perform the I/O;

3) firePendingCompletedRequests: processes responses.

private final KafkaClient client;
public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
    // 1. process completed responses (pendingCompletion)
    firePendingCompletedRequests();
    lock.lock();
    try {
        handlePendingDisconnects();
        // 2. process requests: unsent -> KafkaChannel.send
        // if the connection is not yet established, this triggers it
        long pollDelayMs = trySend(timer.currentTimeMs());

        // 3. perform I/O: send requests, receive responses
        if (pendingCompletion.isEmpty() && (pollCondition == null
             || pollCondition.shouldBlock())) {
            long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
            if (client.inFlightRequestCount() == 0)
                pollTimeout = Math.min(pollTimeout, retryBackoffMs);
            client.poll(pollTimeout, timer.currentTimeMs());
        } else {
            client.poll(0, timer.currentTimeMs());
        }
        // ...
    } finally {
        lock.unlock();
    }
    // 4. process completed responses again (pendingCompletion)
    firePendingCompletedRequests();
    metadata.maybeThrowAnyException();
}
long trySend(long now) {
    // 3000
    long pollDelayMs = maxPollTimeoutMs;
    for (Node node : unsent.nodes()) {
        Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        if (iterator.hasNext())
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            // if no connection exists yet, ready() attempts to establish one
            if (client.ready(node, now)) {
                // convert the request into a Send and buffer it on the KafkaChannel
                client.send(request, now);
                iterator.remove();
            } else {
                break;
            }
        }
    }
    return pollDelayMs;
}

RequestFutureCompletionHandler is an inner class of ConsumerNetworkClient, wrapped into the ClientRequest handed to the underlying client. When a response arrives, the handler is added to the pendingCompletion queue.

private class RequestFutureCompletionHandler implements RequestCompletionHandler {
    private final RequestFuture<ClientResponse> future;
    private ClientResponse response;
    private RuntimeException e;

    private RequestFutureCompletionHandler() {
        this.future = new RequestFuture<>();
    }

    public void fireCompletion() {
        // complete the future
        if (e != null) {
            future.raise(e);
        } else if (response.authenticationException() != null) {
            future.raise(response.authenticationException());
        } else if (response.wasDisconnected()) {
            log.debug("Cancelled request with header {} due to node {} being disconnected",
                    response.requestHeader(), response.destination());
            future.raise(DisconnectException.INSTANCE);
        } else if (response.versionMismatch() != null) {
            future.raise(response.versionMismatch());
        } else {
            future.complete(response);
        }
    }
    public void onFailure(RuntimeException e) {
        this.e = e;
        pendingCompletion.add(this);
    }
    @Override
    public void onComplete(ClientResponse response) {
        this.response = response;
        pendingCompletion.add(this);
    }
}

ConsumerNetworkClient#firePendingCompletedRequests: drains all pending responses and completes their futures.

private void firePendingCompletedRequests() {
    boolean completedRequestsFired = false;
    for (;;) {
        RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
        if (completionHandler == null)
            break;
        completionHandler.fireCompletion();
        completedRequestsFired = true;
    }
    if (completedRequestsFired)
        client.wakeup();
}

2-4. ConsumerGroupMetadata

ConsumerGroupMetadata: consumer group metadata, updated after the member joins the group.

public class ConsumerGroupMetadata {
    // group.id = consumer group id
    final private String groupId;
    // UNKNOWN_GENERATION_ID = -1
    final private int generationId;
    // UNKNOWN_MEMBER_ID = ""
    final private String memberId;
    // group.instance.id = static member id (default null)
    final private Optional<String> groupInstanceId;
}
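As a preview of static membership (item 5 in the preface): enabling it only requires giving each instance a group.instance.id that stays stable across restarts. A configuration sketch (the values are illustrative):

// group.instance.id (2.3+) turns this consumer into a static member:
// a restart within session.timeout.ms does not trigger a rebalance
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");
// static members are typically paired with a longer session timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");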

2-5. ConsumerPartitionAssignor

ConsumerPartitionAssignor: the assignor, i.e. the partition-assignment algorithm; given the cluster metadata plus each member's subscription, it assigns partitions to the group members.

public interface ConsumerPartitionAssignor {
  // assign partitions
  GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
  // rebalance protocol, default EAGER
   default List<RebalanceProtocol> supportedProtocols() {
      return Collections.singletonList(RebalanceProtocol.EAGER);
  }
}
final class GroupSubscription {
   private final Map<String /* member id */, Subscription> subscriptions;
}
final class Subscription {
  // topics this consumer instance subscribes to
  private final List<String> topics;
  // partitions currently owned by this consumer instance
  private final List<TopicPartition> ownedPartitions;
  // static member id
  private Optional<String> groupInstanceId;
}
final class GroupAssignment {
  private final Map<String /* member id */, Assignment> assignments;
}
final class Assignment {
  private List<TopicPartition> partitions;
}

The ConsumerPartitionAssignor implementation is configurable via partition.assignment.strategy.

In 2.6 the default is RangeAssignor; newer versions default to RangeAssignor + CooperativeStickyAssignor (see the configuration sketch below).
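For example, opting into the cooperative protocol on a 2.4+ client looks like this (a configuration sketch; listing both assignors is the documented path for rolling upgrades):

// partition.assignment.strategy takes a list of assignor class names;
// CooperativeStickyAssignor (2.4+) is what enables the COOPERATIVE protocol
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        Arrays.asList(RangeAssignor.class.getName(),
                      CooperativeStickyAssignor.class.getName()));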

2-6. ConsumerCoordinator

ConsumerCoordinator: the core client-side component; it talks to the server-side coordinator.

public final class ConsumerCoordinator extends AbstractCoordinator {
    // rebalance config
    private final GroupRebalanceConfig rebalanceConfig;
    // partition-assignment algorithms for rebalance
    private final List<ConsumerPartitionAssignor> assignors;
    // consumer metadata
    private final ConsumerMetadata metadata;
    // subscription state
    private final SubscriptionState subscriptions;
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    // whether auto-commit is enabled
    private final boolean autoCommitEnabled;
    // auto-commit interval
    private final int autoCommitIntervalMs;
    // consumer interceptors (a user extension point)
    private final ConsumerInterceptors<?, ?> interceptors;
    private final AtomicInteger pendingAsyncCommits;
    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
    // whether this member is the group leader
    private boolean isLeader = false;
    private Set<String> joinedSubscription;
    // metadata snapshots
    private MetadataSnapshot metadataSnapshot;
    private MetadataSnapshot assignmentSnapshot;
    // auto-commit timer
    private Timer nextAutoCommitTimer;
    private AtomicBoolean asyncCommitFenced;
    // consumer group metadata
    private ConsumerGroupMetadata groupMetadata;
    private final boolean throwOnFetchStableOffsetsUnsupported;
    // rebalance protocol
    private final RebalanceProtocol protocol;
}

AbstractCoordinator:

public abstract class AbstractCoordinator implements Closeable {
    // heartbeat state
    private final Heartbeat heartbeat;
    // rebalance config
    private final GroupRebalanceConfig rebalanceConfig;
    // network client
    protected final ConsumerNetworkClient client;
    // coordinator node
    private Node coordinator = null;
    // whether a JoinGroup is needed
    private boolean rejoinNeeded = true;
    // whether onJoinPrepare still needs to run
    private boolean needsJoinPrepare = true;
    // heartbeat thread
    private HeartbeatThread heartbeatThread = null;
    // JoinGroup future
    private RequestFuture<ByteBuffer> joinFuture = null;
    // FindCoordinator future
    private RequestFuture<Void> findCoordinatorFuture = null;
    private volatile RuntimeException fatalFindCoordinatorException = null;
    // generation
    private Generation generation = Generation.NO_GENERATION;
    private long lastRebalanceStartMs = -1L;
    private long lastRebalanceEndMs = -1L;
    private long lastTimeOfConnectionMs = -1L;
    // this member's join state
    protected MemberState state = MemberState.UNJOINED;
}

ConsumerCoordinator#poll: the central method of this analysis.

1) ensureCoordinatorReady: discover the coordinator;

2) ensureActiveGroup: join the group and complete the rebalance;

3) maybeAutoCommitOffsetsAsync: with auto-commit enabled, offsets are committed inside the coordinator. (Ignored in this analysis.)

public boolean poll(Timer timer, boolean waitForJoinGroup) {
    maybeUpdateSubscriptionMetadata();
    invokeCompletedOffsetCommitCallbacks();
    if (subscriptions.hasAutoAssignedPartitions()) {
        // case 1: explicit or regex subscription
        // wakes the kafka-coordinator-heartbeat-thread and
        // records the last poll time on Heartbeat,
        // staying within max.poll.interval.ms=5m to avoid triggering a rebalance
        pollHeartbeat(timer.currentTimeMs());
        // Step 1: discover the coordinator (FindCoordinatorRequest) and ensure the connection is up
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }
        // Step 2: rejoin the group if needed
        if (rejoinNeededOrPending()) {
            // join the group: JoinGroupRequest + SyncGroupRequest
            if (!ensureActiveGroup(waitForJoinGroup ?
                                   timer : time.timer(0L))) {
                timer.update(time.milliseconds());

                return false;
            }
        }
    } else {
        // case 2: user manually assigned partitions; only metadata needs updating
        if (metadata.updateRequested() &&
            !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }
    // auto-commit offsets
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

3. Discovering the Coordinator

3-1. The Consumer Detects the Coordinator Is Unavailable

AbstractCoordinator#coordinatorUnknown: if the coordinator connection is found to be broken here, coordinator is set to null, signalling that the coordinator must be rediscovered.

private Node coordinator = null;
public boolean coordinatorUnknown() {
    return checkAndGetCoordinator() == null;
}
protected synchronized Node checkAndGetCoordinator() {
    if (coordinator != null && client.isUnavailable(coordinator)) {
        markCoordinatorUnknown(true);
        return null;
    }
    return this.coordinator;
}
protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
    if (this.coordinator != null) {
        log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
        Node oldCoordinator = this.coordinator;
        this.coordinator = null;
        if (!isDisconnected)
            client.disconnectAsync(oldCoordinator);
        lastTimeOfConnectionMs = time.milliseconds();
    }
}

3-2. The Consumer Sends FindCoordinatorRequest

AbstractCoordinator#ensureCoordinatorReady: sends the FindCoordinatorRequest.

protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
  if (!coordinatorUnknown())
      return true;
  do {
      if (fatalFindCoordinatorException != null) {
          final RuntimeException fatalException = fatalFindCoordinatorException;
          fatalFindCoordinatorException = null;
          throw fatalException;
      }
      // buffer the FindCoordinatorRequest into ConsumerNetworkClient.unsent
      final RequestFuture<Void> future = lookupCoordinator();
      // the network layer sends the request; wait for the outcome
      client.poll(future, timer);
      // return immediately if the request timed out
      if (!future.isDone()) {
          break;
      }
      }
      RuntimeException fatalException = null;
      if (future.failed()) {
          if (future.isRetriable()) {
              client.awaitMetadataUpdate(timer);
          } else {
              fatalException = future.exception();
          }
      } else if (coordinator != null && client.isUnavailable(coordinator)) {
          markCoordinatorUnknown();
          timer.sleep(rebalanceConfig.retryBackoffMs);
      }
      clearFindCoordinatorFuture();
      if (fatalException != null)
          throw fatalException;
  } while (coordinatorUnknown() && timer.notExpired());
  return !coordinatorUnknown();
}

AbstractCoordinator#lookupCoordinator: picks the least-loaded broker and asks it for this consumer group's coordinator. Load is measured by the number of in-flight requests between the client and each broker.

private RequestFuture<Void> findCoordinatorFuture = null;
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        } else {
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}

AbstractCoordinator#sendFindCoordinatorRequest: the request carries the current consumer group id.

private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(
                    new FindCoordinatorRequestData()
                        .setKeyType(CoordinatorType.GROUP.id())
                        .setKey(this.rebalanceConfig.groupId));
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}

3-3. The Broker Handles FindCoordinatorRequest

KafkaApis#handleFindCoordinatorRequest: the main flow for handling FindCoordinatorRequest.

def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
    val findCoordinatorRequest = request.body[FindCoordinatorRequest]
    val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
      case CoordinatorType.GROUP =>
        // Step 1: compute the partition this consumer group maps to
        val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
        // Step 2: get or create the internal topic __consumer_offsets
        val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
        (partition, metadata)
    }
  def createResponse(requestThrottleMs: Int): AbstractResponse = {
     // build the FindCoordinatorResponse
    def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
      new FindCoordinatorResponse(
          new FindCoordinatorResponseData()
            .setErrorCode(error.code)
            .setErrorMessage(error.message)
         // brokerId
            .setNodeId(node.id)
        // host
            .setHost(node.host)
        // port
            .setPort(node.port)
            .setThrottleTimeMs(requestThrottleMs))
    }
    val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
      createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
    } else {
      // Step 3: find the endpoint of that partition's leader
      val coordinatorEndpoint = topicMetadata.partitions.asScala
        .find(_.partitionIndex == partition)
        .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
        .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
        .flatMap(_.getNode(request.context.listenerName))
        .filterNot(_.isEmpty)
      coordinatorEndpoint match {
        case Some(endpoint) =>
          createFindCoordinatorResponse(Errors.NONE, endpoint)
        case _ =>
          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
      }
    }
    responseBody
  }
  sendResponseMaybeThrottle(request, createResponse)
}

GroupCoordinator#partitionFor: Step 1.

Which coordinator a consumer group gets depends on the partition count of the internal topic __consumer_offsets (default 50).

Coordinator node = leaderOf(abs(hash(groupId)) % number of __consumer_offsets partitions).

class GroupCoordinator {
  def partitionFor(group: String): Int = groupManager.partitionFor(group)
}
// GroupMetadataManager#partitionFor
private val groupMetadataTopicPartitionCount = 
              getGroupMetadataTopicPartitionCount
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode)
            % groupMetadataTopicPartitionCount
private def getGroupMetadataTopicPartitionCount: Int = {
  zkClient.getTopicPartitionCount("__consumer_offsets")
        .getOrElse(config.offsetsTopicNumPartitions/*50*/)
}
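A quick client-side sketch of the same formula, useful for predicting which __consumer_offsets partition (and hence which coordinator) a group maps to, assuming the default 50 partitions:

// mirrors broker-side Utils.abs(groupId.hashCode) % partitionCount;
// Kafka's Utils.abs masks the sign bit rather than calling Math.abs
static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitions) {
    return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
}
// e.g. coordinatorPartitionFor("test-group", 50) names the partition whose
// leader broker serves as this group's coordinator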

KafkaApis#getOrCreateInternalTopic: Step 2; if the topic is not in the metadata cache, it is created automatically.

private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
  val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
  topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}

KafkaApis#createInternalTopic: __consumer_offsets is created with 50 partitions, 3 replicas, 100 MB segments, and cleanup policy compact.

private def createInternalTopic(topic: String): MetadataResponseTopic = {
  // number of live brokers
  val aliveBrokers = metadataCache.getAliveBrokers
  topic match {
    case "__consumer_offsets" =>
      if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
        // live brokers < offsets.topic.replication.factor (3): return COORDINATOR_NOT_AVAILABLE
        // in server.properties this may be set to 1; adjust with care
        metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE,
                              topic, true, util.Collections.emptyList())
      } else {
        // create the internal topic __consumer_offsets:
        // partitions = offsets.topic.num.partitions = 50
        // replicationFactor = offsets.topic.replication.factor = 3
        createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
          groupCoordinator.offsetsTopicConfigs)
      }
  }
}
// other configs of __consumer_offsets
def offsetsTopicConfigs: Properties = {
  val props = new Properties
  // cleanup.policy=compact
  props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
  // segment.bytes=100MB (regular segments default to 1GB)
  props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
  props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
  props
}

3-4. The Consumer Handles FindCoordinatorResponse

FindCoordinatorResponseHandler processes the FindCoordinatorResponse and establishes a connection to the coordinator.

A coordinator both coordinates the group and serves messages as an ordinary broker; the client maintains separate connections for the two roles, using connection id = Integer.MAX_VALUE - brokerId for the coordinator.

private class FindCoordinatorResponseHandler extends
              RequestFutureAdapter<ClientResponse, Void> {
  @Override
  public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
      FindCoordinatorResponse findCoordinatorResponse =
        (FindCoordinatorResponse) resp.responseBody();
      Errors error = findCoordinatorResponse.error();
      if (error == Errors.NONE) {
          synchronized (AbstractCoordinator.this) {
              // coordinator connectionId = Integer.MAX_VALUE - brokerId
              // ordinary broker connectionId = brokerId
              // so two separate channels are used
              int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();

              AbstractCoordinator.this.coordinator = new Node(
                      coordinatorConnectionId,
                      findCoordinatorResponse.data().host(),
                      findCoordinatorResponse.data().port());
              log.info("Discovered group coordinator {}", coordinator);
              // establish the connection
              client.tryConnect(coordinator);
              // restart the session.timeout.ms=10s (45s in newer versions) window for detecting coordinator failure
              heartbeat.resetSessionTimeout();
          }
          future.complete(null);
      } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
          future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
      } else {
          log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
          future.raise(error);
      }
  }
  @Override
  public void onFailure(RuntimeException e, RequestFuture<Void> future) {
      if (!(e instanceof RetriableException)) {
          fatalFindCoordinatorException = e;
      }
      super.onFailure(e, future);
  }
}

4. Consumer Rebalance

4-1. Does the Consumer Need to Rejoin?

ConsumerCoordinator#rejoinNeededOrPending: a rejoin is needed when:

1) a topic's partition count changed (only the leader watches for this);

2) this consumer instance's subscription changed;

3) rejoinNeeded=true, e.g. right after startup.

public boolean rejoinNeededOrPending() {
    if (!subscriptions.hasAutoAssignedPartitions())
        // user manually assigned partitions: no rebalance needed
        return false;
    if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
        // a topic's partition count changed (only the leader has assignmentSnapshot)
        requestRejoin();
        return true;
    }
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
        // this consumer instance's subscription changed
        requestRejoin();
        return true;
    }
    return super.rejoinNeededOrPending();
}
// whether a JoinGroup is needed
private boolean rejoinNeeded = true;
// whether onJoinPrepare still needs to run
private boolean needsJoinPrepare = true;
protected synchronized boolean rejoinNeededOrPending() {
    return rejoinNeeded || joinFuture != null;
}

4-2. The Consumer Sends JoinGroupRequest

AbstractCoordinator#ensureActiveGroup: starts the heartbeat thread and performs the join.

boolean ensureActiveGroup(final Timer timer) {
    // make sure the coordinator is ready
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }
    // start the heartbeat thread
    startHeartbeatThreadIfNeeded();
    // join the group
    return joinGroupIfNeeded(timer);
}
private HeartbeatThread heartbeatThread = null;
private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
    // the heartbeat thread is created but sends no heartbeats yet; the member must join first
    private boolean enabled = false;
}

AbstractCoordinator#joinGroupIfNeeded: runs onJoinPrepare and sends the JoinGroupRequest.

boolean joinGroupIfNeeded(final Timer timer) {
    // loop while a (re)join is needed
    while (rejoinNeededOrPending()) {
        // make sure the coordinator connection is up
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }
        // clean up the previous subscription state
        if (needsJoinPrepare) {
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }
        // send JoinGroupRequest and SyncGroupRequest
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // timed out
            return false;
        }
        // handle the future's result
        // ...
    }
    return true;
}

AbstractCoordinator.Generation: the client tracks the current join round with Generation.

protected static class Generation {
    public static final Generation NO_GENERATION = new Generation(
            // -1
            OffsetCommitRequest.DEFAULT_GENERATION_ID,
            // ""
            JoinGroupRequest.UNKNOWN_MEMBER_ID,
            null);
    public final int generationId; // which join round
    public final String memberId; // this member's id
    public final String protocolName; // which assignment strategy is used
}

ConsumerCoordinator#onJoinPrepare:

1) with auto-commit enabled, commit offsets first;

2) if generation is NO_GENERATION (e.g. the initial state), revoke all partitions assigned to this consumer;

3) otherwise, revoke assigned partitions from the subscription state according to the rebalance protocol;

4) clear the leader flag.

protected void onJoinPrepare(int generation, String memberId) {
  // with auto-commit enabled, commit offsets first
  maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
  Exception exception = null;
  final Set<TopicPartition> revokedPartitions;
  if (generation == Generation.NO_GENERATION.generationId &&
      memberId.equals(Generation.NO_GENERATION.memberId)) {
      // initial state, or generation was reset to NO_GENERATION by the heartbeat thread:
      // revoke all partitions assigned to this consumer
      revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
      if (!revokedPartitions.isEmpty()) {
          exception = invokePartitionsLost(revokedPartitions);
          subscriptions.assignFromSubscribed(Collections.emptySet());
      }
  } else {
      switch (protocol) { // default EAGER
          case EAGER:
               // revoke all partitions assigned to this consumer
              revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
              exception = invokePartitionsRevoked(revokedPartitions);
              subscriptions.assignFromSubscribed(Collections.emptySet());
              break;
          case COOPERATIVE:
              // revoke only partitions of topics no longer in this member's subscription
              Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
              revokedPartitions = ownedPartitions.stream()
                  .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                  .collect(Collectors.toSet());
              if (!revokedPartitions.isEmpty()) {
                  exception = invokePartitionsRevoked(revokedPartitions);
                  ownedPartitions.removeAll(revokedPartitions);
                  subscriptions.assignFromSubscribed(ownedPartitions);
              }
              break;
      }
  }
  // mark this member as non-leader
  isLeader = false;
  // reset the group's subscribed-topic set
  subscriptions.resetGroupSubscription();
  if (exception != null) {
      throw new KafkaException("User rebalance callback throws an error", exception);
  }
}

AbstractCoordinator#initiateJoinGroup: builds the JoinGroupRequest; the request timeout = max(request.timeout.ms=30s, rebalance.timeout.ms=5min + 5s).

groupId: the consumer group id;

sessionTimeoutMs: session.timeout.ms=10s (45s in newer versions), the heartbeat timeout;

memberId: this member's id, initially the empty string;

groupInstanceId: the static member id, empty by default;

protocolType: always "consumer";

protocols: the subscribed topics and the partitions currently assigned to this member;

rebalanceTimeoutMs: max.poll.interval.ms=5min, how long the rebalance may wait.

 private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        // UNJOINED -> PREPARING_REBALANCE
        state = MemberState.PREPARING_REBALANCE;
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        joinFuture = sendJoinGroupRequest();
    }
    return joinFuture;
}
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
  log.info("(Re-)joining group");
  JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
          new JoinGroupRequestData()
    .setGroupId(rebalanceConfig.groupId)
    .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
    .setMemberId(this.generation.memberId)
    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
    .setProtocolType(protocolType())
    .setProtocols(metadata())
    .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
  );
  // request timeout = max(request.timeout.ms=30s, rebalance.timeout.ms=5min + 5s)
  int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
      rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
  return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
          .compose(new JoinGroupResponseHandler(generation));
}

protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
    this.joinedSubscription = subscriptions.subscription();
    JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet =
      new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
    List<String> topics = new ArrayList<>(joinedSubscription);
    // loop over all supported assignors, default RangeAssignor
    for (ConsumerPartitionAssignor assignor : assignors) {
        Subscription subscription = new Subscription(
         // subscribed topics
        topics,
        // ignored here
        assignor.subscriptionUserData(joinedSubscription),
         // currently assigned partitions
        subscriptions.assignedPartitionsList());
        ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
        protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
                .setName(assignor.name())
                .setMetadata(Utils.toArray(metadata)));
    }
    return protocolSet;
}

4-3. The Coordinator Handles JoinGroupRequest

4-3-1. Assigning a memberId

GroupCoordinator#doUnknownJoinGroup: when a member first comes online and sends its first JoinGroupRequest, memberId is empty; the coordinator assigns a memberId and responds with MEMBER_ID_REQUIRED.

private def doUnknownJoinGroup(group: GroupMetadata,
       groupInstanceId: Option[String],
       requireKnownMemberId: Boolean,
       clientId: String,
       clientHost: String,
       rebalanceTimeoutMs: Int,
       sessionTimeoutMs: Int,
       protocolType: String,
       protocols: List[(String, Array[Byte])],
       // response callback
       responseCallback: JoinCallback): Unit = {
  group.inLock {
    //...
    // generate a memberId
    val newMemberId = group.generateMemberId(clientId, groupInstanceId)
    if (group.hasStaticMember(groupInstanceId)) {
      // static member, ignored for now
      updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
    } else if (requireKnownMemberId) {
      // mark the memberId as pending
      group.addPendingMember(newMemberId)
      // delayed task of sessionTimeoutMs=10s (45s in newer versions): if no JoinGroup
      // request arrives within it, the memberId is removed from pendingMembers
      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
      // return the assigned memberId together with a MEMBER_ID_REQUIRED error
      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    }
    }
  }
}

GroupMetadata#generateMemberId: memberId = clientId-{uuid}, where clientId is generated on the client side; by default clientId = consumer-{groupId}-{client-side sequence number}.

def generateMemberId(clientId: String,
                       groupInstanceId: Option[String]): String = {
    groupInstanceId match {
      case None =>
        // {client.id}-{uuid}
        // default client-side client.id rule: consumer-{groupId}-{sequence number}
        clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
      case Some(instanceId) =>
        // static member (ignored here): {group.instance.id}-{uuid}
        instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
    }
  }

GroupMetadata#pendingMembers: the memberId is held temporarily in pendingMembers; if the client fails to join within sessionTimeoutMs, it is removed.

private val pendingMembers = new mutable.HashSet[String]
def addPendingMember(memberId: String) = pendingMembers.add(memberId)

AbstractCoordinator#joinGroupIfNeeded: back on the consumer, the generation's memberId is updated to the server-assigned one and the JoinGroupRequest is sent again.

private class JoinGroupResponseHandler extends
    CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        // ...
        if (error == Errors.MEMBER_ID_REQUIRED) {
            String memberId = joinResponse.data().memberId();
            synchronized (AbstractCoordinator.this) {
                 // update the generation's memberId to the server-assigned one
                AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
            }
            future.raise(error);
        }
    }
}
boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        // ...
        // send the JoinGroupRequest
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        // ...
        if (future.succeeded()) {
           // ...
        } else {
            final RuntimeException exception = future.exception();
            resetJoinGroupFuture();
            if (exception instanceof UnknownMemberIdException ||
                exception instanceof RebalanceInProgressException ||
                exception instanceof IllegalGenerationException ||
                // MemberIdRequiredException received: retry the loop
                exception instanceof MemberIdRequiredException)
                continue;
            // ...
        }
    }
    return true;
}

4-3-2. A New Member Joins

The newly assigned memberId above is marked pending; the new member sends the JoinGroupRequest again.

GroupCoordinator#doJoinGroup: wraps the JoinGroupRequest into a MemberMetadata, adds it to the GroupMetadata, and increments numMembersAwaitingJoin, the count of members awaiting a JoinGroupResponse.

private def doJoinGroup(group: GroupMetadata,
                          memberId: String,
                          groupInstanceId: Option[String],
                          clientId: String,
                          clientHost: String,
                          rebalanceTimeoutMs: Int,
                          sessionTimeoutMs: Int,
                          protocolType: String,
                          protocols: List[(String, Array[Byte])],
                          responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.isPendingMember(memberId)) {
      // a pending member formally joins the group
      addMemberAndRebalance(rebalanceTimeoutMs, 
        sessionTimeoutMs, memberId, groupInstanceId,
        clientId, clientHost, protocolType,
        protocols, group, responseCallback)
    }
  }
}
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                    sessionTimeoutMs: Int,
                                    memberId: String,
                                    groupInstanceId: Option[String],
                                    clientId: String,
                                    clientHost: String,
                                    protocolType: String,
                                    protocols: List[(String, Array[Byte])],
                                    group: GroupMetadata,
                                    callback: JoinCallback): Unit = {
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
  member.isNew = true
  if (group.is(PreparingRebalance) && group.generationId == 0)
    // the group's first rebalance: mark newMemberAdded = true
    group.newMemberAdded = true
  // join the group; numMembersAwaitingJoin++
  group.add(member, callback)
  // schedule a fixed 5-minute DelayedHeartbeat task; if nothing follows before it expires, the member leaves the group
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
  if (member.isStaticMember) {
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    group.removePendingMember(memberId)
  }
  // trigger a rebalance
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
// GroupMetadata#add
def add(member: MemberMetadata, callback: JoinCallback = null): Unit = {
  if (members.isEmpty)
    this.protocolType = Some(member.protocolType)
  if (leaderId.isEmpty)
    // the first member to join becomes the leader
    leaderId = Some(member.memberId)
  members.put(member.memberId, member)
  member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
  member.awaitingJoinCallback = callback
  if (member.isAwaitingJoin)
    numMembersAwaitingJoin += 1
}

GroupCoordinator#maybePrepareRebalance: each group corresponds to one DelayedJoin task; the coordinator tries to complete the join immediately, otherwise registers the DelayedJoin as a delayed task.

private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
  group.inLock { // group lock
    if (group.canRebalance)
      prepareRebalance(group, reason)
  }
}

private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  if (group.is(CompletingRebalance)) {
    // the group already finished the join flow and sent JoinGroupResponses,
    // but has not propagated the assignment yet: propagate an empty
    // SyncGroupResult with REBALANCE_IN_PROGRESS
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  }

  val delayedRebalance = if (group.is(Empty)) {
    // the group has no members yet (just created): use the InitialDelayedJoin strategy
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
       // group.initial.rebalance.delay.ms=3s
      groupConfig.groupInitialRebalanceDelayMs,
       // max.poll.interval.ms=5min - 3s
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  }
  else {
    // the group already has members: use DelayedJoin with delay max.poll.interval.ms=5min
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  }
  // transition to PreparingRebalance
  group.transitionTo(PreparingRebalance)
  val groupKey = GroupKey(group.groupId)
  // try to complete the group's DelayedJoin now, otherwise register it as a delayed task
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

4-3-3. An Existing Member Rejoins

On the broker side a group has four important states (see the GroupState javadoc for details):

1) Empty: the group has no members;

2) PreparingRebalance: JoinGroupRequests received, rebalance in progress, JoinGroupResponses not yet sent;

3) CompletingRebalance: JoinGroupResponses sent; the group-formation condition has been met;

4) Stable: the leader's SyncGroupRequest carrying the partition assignment has been received.

GroupCoordinator#doJoinGroup: an existing member's JoinGroup is handled according to the group state and metadata:

1) PreparingRebalance: go through updateMemberAndRebalance;

2) CompletingRebalance: if a JoinGroupRequest arrives after the JoinGroupResponse was already sent (e.g. the client saw the rebalance time out), the request's protocols are compared with the member's broker-side metadata (the member's subscribed topics may have changed); on a change, go through updateMemberAndRebalance, otherwise resend the JoinGroupResponse;

3) Stable: if the request comes from the leader or the protocols changed, go through updateMemberAndRebalance; otherwise resend the JoinGroupResponse.

private def doJoinGroup(group: GroupMetadata,
        memberId: String,
        groupInstanceId: Option[String],
        clientId: String,
        clientHost: String,
        rebalanceTimeoutMs: Int,
        sessionTimeoutMs: Int,
        protocolType: String,
        protocols: List[(String, Array[Byte])],
        responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.isPendingMember(memberId)) {
      // a new member joins (handled above)
    } else {
      // a non-pending member joins
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
      if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
        responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
      } else if (!group.has(memberId) || groupInstanceIdNotFound) {
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      } else {
        val member = group.get(memberId)
        group.currentState match {
          case PreparingRebalance =>
            // JoinGroup received, JoinGroupResponse not yet sent
            updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)
          case CompletingRebalance =>
            // JoinGroupResponse sent, awaiting SyncGroupRequest
            if (member.matches(protocols)) {
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
            }
          case Stable =>
            // the leader's SyncGroupRequest (with the assignment) has been received
            val member = group.get(memberId)
            if (group.isLeader(memberId)) {
              updateMemberAndRebalance(group, member, protocols, s"leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback)
            } else if (!member.matches(protocols)) {
              updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
            } else {
              responseCallback(JoinGroupResult(
                members = List.empty,
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }
          case Empty | Dead =>
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      }
    }
  }
}

GroupCoordinator#updateMemberAndRebalance: updates the MemberMetadata, increments numMembersAwaitingJoin, and tries to complete the DelayedJoin immediately, otherwise registers it as a delayed task.

 private def updateMemberAndRebalance(group: GroupMetadata,
       member: MemberMetadata,
       protocols: List[(String, Array[Byte])],
       reason: String,
       callback: JoinCallback): Unit = {
  group.updateMember(member, protocols, callback)
  maybePrepareRebalance(group, reason)
}
// same as a new member joining: try to complete the DelayedJoin, otherwise register it as a delayed task
private def maybePrepareRebalance(group: GroupMetadata, 
                                  reason: String): Unit = {
  group.inLock {
    if (group.canRebalance)
      prepareRebalance(group, reason)
  }
}

def updateMember(member: MemberMetadata,
                   protocols: List[(String, Array[Byte])],
                   callback: JoinCallback): Unit = {
  member.supportedProtocols = protocols

  if (callback != null && !member.isAwaitingJoin) {
    numMembersAwaitingJoin += 1
  } 
  member.awaitingJoinCallback = callback
}

4-3-4. Group Formation

During PreparingRebalance, each group corresponds to exactly one DelayedJoin.

Every JoinGroupRequest triggers DelayedJoin#tryComplete, which checks whether the group-formation condition is met; if not, a delayed task with rebalanceTimeoutMs=5min is registered.

When the condition is met, or the timeout fires, DelayedJoin#onComplete runs.

private[group] class DelayedJoin(coordinator: GroupCoordinator,
   group: GroupMetadata,
   rebalanceTimeout: Long) 
  extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
  // every member's JoinGroupRequest triggers tryComplete, attempting to finish the join
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  // runs when tryComplete succeeds or the timeout fires
  override def onComplete() = coordinator.onCompleteJoin(group)
}

GroupCoordinator#tryCompleteJoin: if member count == numMembersAwaitingJoin (members awaiting a JoinGroupResponse) and no pending members remain (members that received a memberId but have not re-sent JoinGroupRequest), the group forms.

def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    if (group.hasAllMembersJoined) {
      // run onComplete
      forceComplete()
    } else false
  }
}
// GroupMetadata
def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty

The broker has two strategies for computing this timeout:

DelayedJoin: when a non-empty group rebalances, rebalanceTimeoutMs=5min.

InitialDelayedJoin: when an empty group rebalances: if no new member joins within 3s, DelayedJoin#onComplete runs and the group forms quickly; if a new member does join within 3s, the check is postponed another 3s, bounded overall by rebalanceTimeoutMs=5min. For example, if two members join a brand-new group at t=0s and t=2s, the first 3s check is extended once and the group forms at roughly t=6s instead of waiting the full 5 minutes.

private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
  purgatory: DelayedOperationPurgatory[DelayedJoin],
  group: GroupMetadata,
   // group.initial.rebalance.delay.ms=3s   
  configuredRebalanceDelay: Int,
  delayMs: Int,
  // max.poll.interval.ms=5min - group.initial.rebalance.delay.ms=3s
  remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
  override def tryComplete(): Boolean = false
  // triggered automatically on expiration
  override def onComplete(): Unit = {
    group.inLock {
      // newMemberAdded is set to true when a new member joins
      // remainingMs = the remaining rebalance timeout > 0
      if (group.newMemberAdded && remainingMs != 0) {
        group.newMemberAdded = false
        val delay = min(configuredRebalanceDelay, remainingMs)
        val remaining = max(remainingMs - delayMs, 0)
        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
          purgatory,
          group,
          configuredRebalanceDelay,
          delay,
          remaining
        ), Seq(GroupKey(group.groupId)))
      } else
        // no new member within 3s, or the 5-minute rebalance timeout exhausted: fall through to DelayedJoin
        super.onComplete()
    }
  }
}

GroupCoordinator#onCompleteJoin: forming the group.

1) remove members that did not send JoinGroup within the timeout;

2) elect the leader;

3) generation + 1;

4) send a JoinGroupResponse to every member; only the leader's response carries all members' metadata (JoinGroupRequest.protocols);

5) start heartbeat expiration checks based on JoinGroupRequest.sessionTimeout (session.timeout.ms=10s; 45s in newer versions).

def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // 1. remove members that never (re)sent a JoinGroupRequest
    // (if anything is removed here, the join usually completed via the rebalance timeout)
    group.notYetRejoinedMembers.filterNot(_.isStaticMember)
    .foreach { failedMember =>
      removeHeartbeatForLeavingMember(group, failedMember)
      group.remove(failedMember.memberId)
      group.removeStaticMember(failedMember.groupInstanceId)
    }
    if (
     // 2. elect the leader
     !group.maybeElectNewJoinedLeader()
     && group.allMembers.nonEmpty) {
      joinPurgatory.tryCompleteElseWatch(
        new DelayedJoin(this, group, group.rebalanceTimeoutMs),
        Seq(GroupKey(group.groupId)))
    } else {
      // 3. generation++
      group.initNextGeneration()
      for (member <- group.allMemberMetadata) {
        val joinResult = JoinGroupResult(
          members = if (group.isLeader(member.memberId)) {
            // only the leader receives all members' metadata (JoinGroupRequest.protocols)
            group.currentMemberMetadata
          } else {
            List.empty
          },
          memberId = member.memberId,
          generationId = group.generationId,
          protocolType = group.protocolType,
          protocolName = group.protocolName,
          leaderId = group.leaderOrNull,
          error = Errors.NONE)
        // 4. send the JoinGroupResponse
        group.maybeInvokeJoinCallback(member, joinResult)
        // 5. schedule the heartbeat expiration task
        completeAndScheduleNextHeartbeatExpiration(group, member)
        member.isNew = false
      }
    }
  }
}
// GroupMetadata
def initNextGeneration(): Unit = {
  if (members.nonEmpty) {
    generationId += 1
    protocolName = Some(selectProtocol)
    // union of all members' subscribed topics
    subscribedTopics = computeSubscribedTopics()
    transitionTo(CompletingRebalance)
  } else {
    generationId += 1
    protocolName = None
    subscribedTopics = computeSubscribedTopics()
    transitionTo(Empty)
  }
  receivedConsumerOffsetCommits = false
  receivedTransactionalOffsetCommits = false
}

GroupMetadata#maybeElectNewJoinedLeader: elects the leader. If the old leader sent JoinGroup in time during the rebalance, it remains leader; otherwise any member that did send JoinGroup becomes the leader.

def maybeElectNewJoinedLeader(): Boolean = {
  leaderId.exists { currentLeaderId =>
    val currentLeader = get(currentLeaderId)
    if (!currentLeader.isAwaitingJoin) {
      // the old leader did not send JoinGroup
      members.find(_._2.isAwaitingJoin) match {
        // pick any member that did send JoinGroup as the new leader
        case Some((anyJoinedMemberId, anyJoinedMember)) =>
          leaderId = Option(anyJoinedMemberId)
          true
        case None =>
          false
      }
    } else {
      // the old leader sent JoinGroup and stays leader
      true
    }
  }
}

4-4. The Consumer Handles JoinGroupResponse and Sends SyncGroupRequest

After JoinGroup, the coordinator knows the group members and their subscriptions (GroupMetadata) and responds to every member with a JoinGroupResponse.

JoinGroupResponseHandler#handle: processes the JoinGroupResponse.

1) allow the heartbeat thread to start sending heartbeats;

2) the coordinator hands down a new generation; the consumer updates its in-memory generation;

3) leader and follower take different SyncGroup paths; only after SyncGroup completes (chained) does the final join future complete.

private class JoinGroupResponseHandler extends
  CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
  @Override
  public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
      Errors error = joinResponse.error();
      if (error == Errors.NONE) {
        synchronized (AbstractCoordinator.this) {
            if (state != MemberState.PREPARING_REBALANCE) {
                future.raise(new UnjoinedGroupException());
            } else {
                state = MemberState.COMPLETING_REBALANCE;
                // allow heartbeats to be sent
                if (heartbeatThread != null)
                    heartbeatThread.enable();
                // update the generation
                AbstractCoordinator.this.generation = new Generation(
                    joinResponse.data().generationId(),
                    joinResponse.data().memberId(), joinResponse.data().protocolName());
                log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
                // leader and follower take different paths
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            }
        }
    }
  }
}

AbstractCoordinator#onJoinFollower: a follower replies with a simple SyncGroupRequest carrying the freshly assigned generation and no assignments.

private RequestFuture onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(
                    new SyncGroupRequestData()
    .setGroupId(rebalanceConfig.groupId)
    .setMemberId(generation.memberId)
    .setProtocolType(protocolType())
    .setProtocolName(generation.protocolName)
    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
    .setGenerationId(generation.generationId)
    // note: assignments is empty for a follower
    .setAssignments(Collections.emptyList()));
    return sendSyncGroupRequest(requestBuilder);
}
private RequestFuture<ByteBuffer> sendSyncGroupRequest(
  SyncGroupRequest.Builder requestBuilder) {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    return client.send(coordinator, requestBuilder)
            .compose(new SyncGroupResponseHandler(generation));
}

AbstractCoordinator#onJoinLeader: the leader receives the subscriptions of all members (members) from the coordinator, assigns partitions to the group's members, and includes the assignment result (assignments) in its SyncGroupRequest.

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
       // perform the assignment: compute each member's partitions
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                joinResponse.data().members());
        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(
          new SyncGroupRequestData.SyncGroupRequestAssignment()
                .setMemberId(assignment.getKey())
                .setAssignment(Utils.toArray(assignment.getValue()))
            );
        }
       // send the SyncGroupRequest to the coordinator
        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                  new SyncGroupRequestData()
                            // ... other fields are the same as a follower's
                          .setAssignments(groupAssignmentList)
                );
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

ConsumerCoordinator#performAssignment:

1) isLeader = true: mark itself as the leader;

2) assignor.assign: given the cluster metadata (Cluster) and member subscription metadata (memberId + Subscription), produce each member's assigned partitions (memberId + Assignment);

3) assignmentSnapshot = metadataSnapshot: record the partition counts of the relevant topics at assignment time; later polls check whether they changed, and if they did, the leader proactively initiates a JoinGroup to start a new round of rebalance;

4) convert each Assignment into a ByteBuffer (a round-trip sketch follows the method below);

protected Map<String, ByteBuffer> performAssignment(String leaderId,
         String assignmentStrategy,
         List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
    // the assignor (RangeAssignor by default)
    ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    // union of all topics subscribed within the group
    Set<String> allSubscribedTopics = new HashSet<>();
    // memberId -> subscription
    Map<String, Subscription> subscriptions = new HashMap<>();
    // memberId -> currently owned partitions
    Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
    // populate the three collections above
    for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
        Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
        subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
        subscriptions.put(memberSubscription.memberId(), subscription);
        allSubscribedTopics.addAll(subscription.topics());
        ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
    }
    // the leader refreshes metadata for all subscribed topics and caches the group's full topic set
    updateGroupSubscription(allSubscribedTopics);
    // mark self as leader
    isLeader = true;
    // returns memberId -> List<TopicPartition>
    Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
    // ...
    // record a metadata snapshot at assignment time (partition count per topic);
    // ConsumerCoordinator#rejoinNeededOrPending compares it on each poll,
    // and if the metadata changed, the leader re-initiates a rebalance via JoinGroupRequest
    assignmentSnapshot = metadataSnapshot;
    log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
    // serialize each Assignment into a ByteBuffer
    Map<String, ByteBuffer> groupAssignment = new HashMap<>();
    for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
        groupAssignment.put(assignmentEntry.getKey(), buffer);
    }
    return groupAssignment;
}

The default assignor implementation is RangeAssignor, which must implement the assign method.

Its input and output look like this:

public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor {
    public abstract Map<String/*member id*/, List<TopicPartition>/*assigned partitions*/> 
      assign
     (Map<String/*topic*/, Integer/*partition count*/> partitionsPerTopic,
      Map<String/*member id*/, Subscription/*subscription*/> subscriptions);
}

RangeAssignor: iterates over each topic and the members subscribed to that topic; Kafka allows members of the same group to have different subscriptions.

For example, consumers C1 and C2 both subscribe to topics T1 and T2, each with 3 partitions. C1 sorts first and gets partitions 0-1 of each topic; C2 sorts later and gets partition 2 of each topic. Final assignment: C1 = T1P0, T1P1, T2P0, T2P1; C2 = T1P2, T2P2 (a runnable sketch follows the class below).

public class RangeAssignor extends AbstractPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(
      Map<String, Integer> partitionsPerTopic,
      Map<String, Subscription> subscriptions) {
        // topic -> members subscribed to this topic
        Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
        // result map
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<>());
        // for each topic and the members subscribed to it
        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<MemberInfo> consumersForTopic = topicEntry.getValue();
            // number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            // members with a groupInstanceId sort first (by groupInstanceId, then memberId);
            // roughly: sorted by memberId
            Collections.sort(consumersForTopic);
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
            // wrap topic + partition id into TopicPartition
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + 
                  Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + 
                  (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i).memberId)
                  .addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
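
The C1/C2 example above can be reproduced by calling RangeAssignor directly. A minimal runnable sketch (the member ids C1/C2 and topics T1/T2 are made-up names):

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;

public class RangeAssignorDemo {
    public static void main(String[] args) {
        RangeAssignor assignor = new RangeAssignor();
        // T1 and T2 each have 3 partitions
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("T1", 3);
        partitionsPerTopic.put("T2", 3);
        // C1 and C2 both subscribe to T1 and T2
        Map<String, Subscription> subscriptions = new HashMap<>();
        subscriptions.put("C1", new Subscription(Arrays.asList("T1", "T2")));
        subscriptions.put("C2", new Subscription(Arrays.asList("T1", "T2")));
        Map<String, List<TopicPartition>> assignment =
                assignor.assign(partitionsPerTopic, subscriptions);
        // expected: C1 -> [T1-0, T1-1, T2-0, T2-1], C2 -> [T1-2, T2-2]
        assignment.forEach((memberId, tps) -> System.out.println(memberId + " -> " + tps));
    }
}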

4-5、The coordinator handles SyncGroupRequest

KafkaApis#handleSyncGroupRequest: both followers and the leader send SyncGroupRequest, but only the leader's request carries the partition assignment result (the assignments field).

def handleSyncGroupRequest(request: RequestChannel.Request): Unit = {
    val syncGroupRequest = request.body[SyncGroupRequest]

    // send back the SyncGroupResponse
    def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new SyncGroupResponse(
          new SyncGroupResponseData()
            .setErrorCode(syncGroupResult.error.code)
            .setProtocolType(syncGroupResult.protocolType.orNull)
            .setProtocolName(syncGroupResult.protocolName.orNull)
            // the partition assignment
            .setAssignment(syncGroupResult.memberAssignment)
            .setThrottleTimeMs(requestThrottleMs)
        ))
    }

    // only the leader sends assignments
    val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
    syncGroupRequest.data.assignments.forEach { assignment =>
      assignmentMap += (assignment.memberId -> assignment.assignment)
    }

    groupCoordinator.handleSyncGroup(
      syncGroupRequest.data.groupId,
      syncGroupRequest.data.generationId,
      syncGroupRequest.data.memberId,
      Option(syncGroupRequest.data.protocolType),
      Option(syncGroupRequest.data.protocolName),
      Option(syncGroupRequest.data.groupInstanceId),
      assignmentMap.result,
      sendResponseCallback
    )
}

GroupCoordinator#doSyncGroup: from a follower's point of view the group is usually in one of two states.

If the leader has not yet sent its SyncGroup, the group is in CompletingRebalance; the follower's SyncGroup is parked here by caching its response callback in awaitingSyncCallback, and the follower is answered once the leader's SyncGroup arrives.

If the leader has already sent its SyncGroup and storeGroup has finished, the group is Stable; the follower's SyncGroup is answered immediately, since the coordinator already knows the group's partition assignment.

private def doSyncGroup(group: GroupMetadata,
                          generationId: Int,
                          memberId: String,
                          protocolType: Option[String],
                          protocolName: Option[String],
                          groupInstanceId: Option[String],
                          groupAssignment: Map[String, Array[Byte]],
                          responseCallback: SyncCallback): Unit = {
  group.inLock { // group lock
    if (!group.has(memberId)) {
      // the member timed out during SyncGroup and was removed from the group
      responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
    } else if (generationId != group.generationId) {
      // different generation: another JoinGroup round has already happened
      responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
    } 
    // ... other validations
    else {
      group.currentState match {
      case Empty =>
        responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
      case PreparingRebalance =>
        responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
      case CompletingRebalance =>
         // a follower's SyncGroupRequest only caches the response callback here
        group.get(memberId).awaitingSyncCallback = responseCallback
        if (group.isLeader(memberId)) {
          // the leader's SyncGroupRequest has arrived
          info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
          val missing = group.allMembers.diff(groupAssignment.keySet)
          val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
          // persist the group metadata
          groupManager.storeGroup(group, assignment, (error: Errors) => {
            group.inLock {
              if (group.is(CompletingRebalance) 
                  && generationId == group.generationId) {
                if (error != Errors.NONE) {
                  // storage failed: start another rebalance (PreparingRebalance)
                  // and answer the SyncGroupResponse with the error
                  resetAndPropagateAssignmentError(group, error)
                  maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
                } else {
                  // storage succeeded: update members' assignments in memory and answer every member's SyncGroupResponse
                  setAndPropagateAssignment(group, assignment)
                  group.transitionTo(Stable)
                }
              }
            }
          })
          groupCompletedRebalanceSensor.record()
        }
      case Stable =>
        // already Stable: answer directly with the assignment
        // (the leader's SyncGroup arrived before this follower's)
        val memberMetadata = group.get(memberId)
        responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
        completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
      }
    }
  }
}
private def setAndPropagateAssignment(group: GroupMetadata, 
    assignment: Map[String, Array[Byte]]): Unit = {
  // cache the partition assignment per member
  group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
  propagateAssignment(group, Errors.NONE)
}
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
  // ...
  for (member <- group.allMemberMetadata) {
    // answer every member that sent SyncGroup, delivering its partitions
    if (group.maybeInvokeSyncCallback(member, 
          SyncGroupResult(protocolType, protocolName, 
                          member.assignment, error))) {
      // schedule the next round of heartbeat-expiration checking
      completeAndScheduleNextHeartbeatExpiration(group, member)
    }
  }
}

GroupMetadataManager#storeGroup: for the leader's SyncGroup, the group metadata (GroupMetadata) is persisted as a message with topic = __consumer_offsets and partition = the leader partition that maps this group to its coordinator.

def storeGroup(group: GroupMetadata,
                 groupAssignment: Map[String, Array[Byte]],
                 responseCallback: Errors => Unit): Unit = {
  getMagic(partitionFor(group.groupId)) match {
    case Some(magicValue) =>
      val timestampType = TimestampType.CREATE_TIME
      val timestamp = time.milliseconds()
      val key = GroupMetadataManager.groupMetadataKey(group.groupId)
      val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
      val records = {
        val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
          Seq(new SimpleRecord(timestamp, key, value)).asJava))
        val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
        builder.append(timestamp, key, value)
        builder.build()
      }
      // topic = __consumer_offsets
      // partition = the partition that determined this group's coordinator
      val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
      val groupMetadataRecords = Map(groupMetadataPartition -> records)
      val generationId = group.generationId
      // putCacheCallback (defined inside storeGroup in the original source, elided here) forwards the result to responseCallback
      appendForGroup(group, groupMetadataRecords, putCacheCallback)
    case None =>
      responseCallback(Errors.NOT_COORDINATOR)
      None
  }
}

GroupMetadataManager#groupMetadataValue: the message key is the groupId; the message value contains all GroupMetadata attributes, including each member's subscription and assignment.

def groupMetadataValue(groupMetadata: GroupMetadata,
                         assignment: Map[String, Array[Byte]],
                         apiVersion: ApiVersion): Array[Byte] = {

    val (version, value) = {
      //...
      (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
    }
    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
    value.set(GENERATION_KEY, groupMetadata.generationId)
    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
    value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
      val memberStruct = value.instance(MEMBERS_KEY)
      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
      memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
      memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
      val protocol = groupMetadata.protocolName.orNull
      val metadata = memberMetadata.metadata(protocol)
      // subscription
      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
      // assignment
      val memberAssignment = assignment(memberMetadata.memberId)
      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
      memberStruct
    }
    value.set(MEMBERS_KEY, memberArray.toArray)
    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
    byteBuffer.putShort(version)
    value.writeTo(byteBuffer)
    byteBuffer.array()
  }

4-6、The consumer receives SyncGroupResponse

SyncGroupResponseHandler: on receiving the SyncGroupResponse, every member obtains the partitions the leader assigned to it. If the coordinator returned an error, the member marks itself as needing to re-join (JoinGroup).

private class SyncGroupResponseHandler extends 
  CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

  @Override
  public void handle(SyncGroupResponse syncResponse,
                     RequestFuture<ByteBuffer> future) {
      Errors error = syncResponse.error();
      if (error == Errors.NONE) {
        // ...
        log.info("Successfully synced group in generation {}", generation);
        state = MemberState.STABLE;
        rejoinNeeded = false;
        future.complete(
          // the partitions assigned to this member
          ByteBuffer.wrap(syncResponse.data.assignment()));
      } else {
          // mark that a re-join is needed: rejoinNeeded = true
          requestRejoin();
          // ... raise the corresponding exception
          future.raise(...);
      }
  }
}

AbstractCoordinator#joinGroupIfNeeded: at this point initiateJoinGroup has completed, and the future holds the partitions assigned to this member in this round of rebalance.

boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        // ensure the connection to the coordinator is established
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }
        // clean up the previous subscription state
        if (needsJoinPrepare) {
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }
        // send JoinGroupRequest, then SyncGroupRequest
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // the timer expired
            return false;
        }
        if (future.succeeded()) {
            // SyncGroupRequest completed
            Generation generationSnapshot;
            MemberState stateSnapshot;
           // snapshot this round's Generation and MemberState
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
                stateSnapshot = this.state;
            }
            if (generationSnapshot != Generation.NO_GENERATION
                && stateSnapshot == MemberState.STABLE) {
                // process the received assignment
                ByteBuffer memberAssignment = future.value().duplicate();
                onJoinComplete(generationSnapshot.generationId,
                               generationSnapshot.memberId, 
                               generationSnapshot.protocolName, 
                               memberAssignment);
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            } else {
                resetStateAndRejoin();
                resetJoinGroupFuture();
            }
        }
        // ... (failure handling elided)
    }
    return true;
}

ConsumerCoordinator#onJoinComplete: non-leaders clear assignmentSnapshot, so they do not react to metadata changes (such as a topic's partition count changing) by initiating a rebalance. The assignment handed down by the leader is applied to the in-memory subscription state.

private final SubscriptionState subscriptions;
protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
    if (!isLeader)
        // follower: clear assignmentSnapshot so metadata changes do not trigger a rebalance from here
        assignmentSnapshot = null;
    groupMetadata = new ConsumerGroupMetadata
      (rebalanceConfig.groupId, generation, memberId, 
         rebalanceConfig.groupInstanceId);
    Assignment assignment = 
      ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    // the partitions newly assigned to this member
    Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
    maybeUpdateJoinedSubscription(assignedPartitions);
    if (autoCommitEnabled)
        this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
    // apply the newly assigned partitions to the in-memory subscription state
    subscriptions.assignFromSubscribed(assignedPartitions);
    //...
}

五、Heartbeat

5-1、The consumer sends HeartbeatRequest

KafkaConsumer has only two threads: the user poll thread and the heartbeat thread.

The heartbeat thread only runs once a JoinGroup response has been received (state = COMPLETING_REBALANCE or STABLE).

1) if the coordinator is found to be offline, initiate a FindCoordinatorRequest;

2) if no heartbeat response arrives within the timeout (session.timeout.ms = 10s; 45s in newer versions), mark the coordinator offline;

3) if the interval between two user consumer.poll calls exceeds max.poll.interval.ms (5 minutes), leave the group proactively by sending LeaveGroupRequest; state => UNJOINED, generation = NO_GENERATION;

4) if the heartbeat interval (heartbeat.interval.ms = 3s) has not yet elapsed, wait a bit before retrying (retry.backoff.ms = 100ms);

5) otherwise, send a heartbeat request, and update the heartbeat time when the response arrives (heartbeat.receiveHeartbeat);

private class HeartbeatThread extends KafkaThread implements AutoCloseable {
    // whether sending heartbeats is enabled
    private boolean enabled = false;
    public void enable() {
        synchronized (AbstractCoordinator.this) {
            this.enabled = true;
            heartbeat.resetTimeouts();
            AbstractCoordinator.this.notify();
        }
    }
    public void disable() {
        synchronized (AbstractCoordinator.this) {
            this.enabled = false;
        }
    }
    public void run() {
      while (true) {
          synchronized (AbstractCoordinator.this) { // coordinator lock
            if (!enabled) {
                AbstractCoordinator.this.wait();
                continue;
            }
            // not in a group yet (UNJOINED or PREPARING_REBALANCE): stop working
            if (state.hasNotJoinedGroup() || hasFailed()) {
                disable();
                continue;
            }
            // network client: send pending requests and read responses
            client.pollNoWakeup();
            long now = time.milliseconds();
            if (coordinatorUnknown()) {
                // 1. the coordinator is offline; the heartbeat thread also initiates FindCoordinatorRequest
                if (findCoordinatorFuture != null) {
                    clearFindCoordinatorFuture();
                    AbstractCoordinator.this
                      .wait(rebalanceConfig.retryBackoffMs);
                } else {
                    lookupCoordinator();
                }
            } else if (heartbeat.sessionTimeoutExpired(now)) {
                // 2. no heartbeat response within session.timeout.ms (10s; 45s in newer versions): mark the coordinator offline
                markCoordinatorUnknown();
            } else if (heartbeat.pollTimeoutExpired(now)) {
                // 3. consumer.poll interval expired: send LeaveGroupRequest, state => UNJOINED, generation = NO_GENERATION
                maybeLeaveGroup(leaveReason);
            } else if (!heartbeat.shouldHeartbeat(now)) {
                // 4. heartbeat interval (3s) not reached yet: back off and retry
                AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
            } else {
                // 5. heartbeat interval (3s) reached: send the heartbeat
                heartbeat.sentHeartbeat(now);
                final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
                heartbeatFuture.addListener(new RequestFutureListener<Void>() {
                    @Override
                    public void onSuccess(Void value) {
                        synchronized (AbstractCoordinator.this) {
                           // 6. record the time of the last successful heartbeat
                            heartbeat.receiveHeartbeat();
                        }
                    }
                    @Override
                    public void onFailure(RuntimeException e) {
                        synchronized (AbstractCoordinator.this) {
                            if (e instanceof RebalanceInProgressException) {
                               // 6. record the time of the last successful heartbeat
                                heartbeat.receiveHeartbeat();
                            } else if (e instanceof FencedInstanceIdException) {
                                heartbeatThread.failed.set(e);
                            } else {
                                heartbeat.failHeartbeat();
                                AbstractCoordinator.this.notify();
                            }
                        }
                    }
                });
            }
          }
      }
    }
}

AbstractCoordinator#sendHeartbeatRequest: the request carries groupId + memberId + generationId.

synchronized RequestFuture<Void> sendHeartbeatRequest() {
  HeartbeatRequest.Builder requestBuilder =
          new HeartbeatRequest.Builder(new HeartbeatRequestData()
                  .setGroupId(rebalanceConfig.groupId)
                  .setMemberId(this.generation.memberId)
                  .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                  .setGenerationId(this.generation.generationId));
  return client.send(coordinator, requestBuilder)
          .compose(new HeartbeatResponseHandler(generation));
}

5-2、The coordinator handles HeartbeatRequest

GroupCoordinator#handleHeartbeat:

1) if the member no longer exists (e.g. the coordinator removed it on session timeout), respond UNKNOWN_MEMBER_ID;

2) if the group is in CompletingRebalance/Stable, i.e. a settled state, schedule the next heartbeat-expiration delayed task via completeAndScheduleNextHeartbeatExpiration;

3) if the group is in PreparingRebalance, someone has triggered a rebalance (e.g. sent a JoinGroupRequest); respond REBALANCE_IN_PROGRESS so the heartbeating member sends a JoinGroupRequest;

  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
    groupManager.getGroup(groupId) match {
      case None =>
        responseCallback(Errors.UNKNOWN_MEMBER_ID)
      case Some(group) => group.inLock {
        if (group.is(Dead)) {
          responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
        } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {
          responseCallback(Errors.FENCED_INSTANCE_ID)
        } else if (!group.has(memberId)) {
          responseCallback(Errors.UNKNOWN_MEMBER_ID)
        } else if (generationId != group.generationId) {
          responseCallback(Errors.ILLEGAL_GENERATION)
        } else {
          group.currentState match {
            case CompletingRebalance =>
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.NONE)
            case PreparingRebalance =>
                // a member initiated JoinGroup and a rebalance has started: respond REBALANCE_IN_PROGRESS
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS)
            case Stable =>
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE)
          }
        }
      }
    }
  }

5-3、The consumer receives HeartbeatResponse

HeartbeatResponseHandler: certain error responses from the coordinator cause the poll thread to send a JoinGroupRequest and join a new round of rebalance.

1) REBALANCE_IN_PROGRESS: mark rejoinNeeded = true;

2) UNKNOWN_MEMBER_ID: reset state to UNJOINED and generation to NO_GENERATION;

private class HeartbeatResponseHandler extends 
  CoordinatorResponseHandler<HeartbeatResponse, Void> {
    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        Errors error = heartbeatResponse.error();
        if (error == Errors.NONE) {
            future.complete(null);
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR) {
            log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
                    coordinator());
            markCoordinatorUnknown();
            future.raise(error);
        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
            if (state == MemberState.STABLE) {
                 // this.rejoinNeeded = true;
                // the poll thread sees the rejoin flag and initiates a JoinGroupRequest
                requestRejoin();
                future.raise(error);
            } else {
                future.complete(null);
            }
        } else if (error == Errors.ILLEGAL_GENERATION ||
                   error == Errors.UNKNOWN_MEMBER_ID ||
                   error == Errors.FENCED_INSTANCE_ID) {
            if (generationUnchanged()) {
                log.info("Attempt to heartbeat with {} and group instance id {} failed due to {}, resetting generation",
                    sentGeneration, rebalanceConfig.groupInstanceId, error);
                // state = MemberState.UNJOINED;
                // generation = Generation.NO_GENERATION;
                resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
                future.raise(error);
            } else {
                future.complete(null);
            }
        } 
         // ...
    }
}

5-4、The coordinator detects member heartbeat expiration

GroupCoordinator#completeAndScheduleNextExpiration: on every heartbeat request, the coordinator schedules a new heartbeat-expiration delayed task.

private def completeAndScheduleNextExpiration(group: GroupMetadata, 
                    member: MemberMetadata, 
                    timeoutMs: Long): Unit = {
  val memberKey = MemberKey(member.groupId, member.memberId)
  // mark the member's heartbeat as received, completing the current delayed task
  member.heartbeatSatisfied = true
  heartbeatPurgatory.checkAndComplete(memberKey)
  // reset to not-received and schedule the next delayed task, timeoutMs = sessionTimeout
  member.heartbeatSatisfied = false
  val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
  heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}

DelayedHeartbeat: the heartbeat-expiration delayed task.

private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                      group: GroupMetadata,
                                      memberId: String,
                                      isPending: Boolean,
                                      timeoutMs: Long)
  extends DelayedOperation(timeoutMs, Some(group.lock)) {
  // heartbeat timed out: expiration invokes onExpireHeartbeat (below)
  override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending)
  override def onComplete() = coordinator.onCompleteHeartbeat()
}
// GroupCoordinator#onExpireHeartbeat
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
    group.inLock {
      // ...
      val member = group.get(memberId)
      if (!member.hasSatisfiedHeartbeat) {
        // no heartbeat received before the timeout
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        // remove the member from the group
        removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
      }
    }
  }

GroupCoordinator#removeMemberAndUpdateGroup:

If the group is Stable or CompletingRebalance (a settled state), maybePrepareRebalance triggers a rebalance and the group transitions to PreparingRebalance;

If the group is PreparingRebalance, it is still waiting for all members to join; the departure of this timed-out old member may now satisfy the join conditions, in which case the remaining members are answered with JoinGroupResponse;

  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
    // remove the member from the group
    group.remove(member.memberId)
    group.removeStaticMember(member.groupInstanceId)
    group.currentState match {
      case Dead | Empty =>
      // the group was settled, but a member just left: trigger a rebalance => PreparingRebalance
      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
      // a JoinGroup was received and the group is waiting for all members to join
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
  }

GroupMetadata#remove: if the leader leaves the group, the next memberId simply becomes the leader.

  def remove(memberId: String): Unit = {
    members.remove(memberId).foreach { member =>
      member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
      if (member.isAwaitingJoin)
        numMembersAwaitingJoin -= 1
    }
    if (isLeader(memberId))
      leaderId = members.keys.headOption
  }

六、Rebalance under the COOPERATIVE protocol

The COOPERATIVE protocol exists to solve the Stop-The-World problem of rebalancing, i.e. the consumer group being unable to consume while a rebalance is in progress.
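
From the application side, opting into the cooperative protocol is just a matter of configuring a cooperative-capable assignor; the client negotiates COOPERATIVE only when every configured assignor supports it. A minimal sketch (broker address and group id are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class CooperativeConfigSketch {
    public static Properties cooperativeProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // CooperativeStickyAssignor supports the COOPERATIVE rebalance protocol,
        // so members keep consuming the partitions they did not lose during a rebalance
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        return props;
    }
}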

七、Static members

Static membership likewise exists to avoid unnecessary rebalances.
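
Enabling static membership only requires giving each consumer instance a stable group.instance.id. A minimal sketch (the instance id value is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMemberConfigSketch {
    public static Properties staticMemberProps() {
        Properties props = new Properties();
        // a stable, unique id per instance (e.g. a pod or host name)
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");
        // a restart that completes within session.timeout.ms keeps the member's
        // identity and assignment, so no rebalance is triggered
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return props;
    }
}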

Summary

Finding the coordinator


Every broker can handle FindCoordinatorRequest.

A broker locates a consumer group's coordinator node via the partition count of topic = __consumer_offsets.

Coordinator node = leaderOfPartition(hash(groupId) % partitionCount); how evenly those partitions are distributed determines how evenly coordinator duty is balanced across brokers.
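
A minimal Java sketch of that mapping, mirroring GroupMetadataManager#partitionFor (the 50 below is the default partition count of __consumer_offsets):

public class CoordinatorPartitionDemo {
    // non-negative hash of the groupId modulo the __consumer_offsets partition count
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        int h = groupId.hashCode();
        int nonNegative = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h); // same guard as Utils.abs
        return nonNegative % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // the coordinator of "test-group" is the leader of this partition
        System.out.println(partitionFor("test-group", 50));
    }
}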

If the topic does not exist, it is created automatically. By default __consumer_offsets has 50 partitions and 3 replicas, a 100MB segment size (vs. 1GB for a normal topic), and cleanup policy = compact. Note: offsets.topic.replication.factor in server.properties may be set to 1.

Rebalance

Step 1: send JoinGroupRequest.


Before a rebalance the consumer commits offsets once; under the default EAGER protocol it then clears its assigned partitions, so nothing can be consumed for the duration of the rebalance (Stop The World).


When a member has just started, or an old member was removed for missing heartbeats so that its memberId is no longer valid, the coordinator assigns a new memberId and responds MEMBER_ID_REQUIRED; the consumer then re-sends the JoinGroupRequest.


The consumer joins the group carrying that memberId.

If the group has no leader yet, the first member to join becomes the leader.

The coordinator opens one delayed task (DelayedJoin) per group, which expires after rebalanceTimeout = the consumers' max.poll.interval.ms, 5 minutes by default (if members are configured differently, the maximum wins). For a brand-new group an InitialDelayedJoin is used instead, re-checking the join conditions every 3 seconds, with the same overall 5-minute limit.

Join condition 1 = all old members have joined and there is no pending member (one that was assigned a memberId but has not yet sent its second JoinGroup); join condition 2 = the rebalance timed out, removing old members that failed to join in time.

Once the group forms, the coordinator starts a heartbeat-expiration delayed task per member (timeout = sessionTimeout = the consumer's session.timeout.ms, 10s by default; 45s in newer versions) and responds with JoinGroupResponse, containing the leaderId, the new generationId, etc.; the leader additionally receives every consumer's protocols (which include the subscribed topics and currently assigned partitions, used for incremental rebalancing).

Step 2: send SyncGroupRequest; by this point all consumers have started sending heartbeats.


A group follower simply sends a SyncGroupRequest containing groupId + memberId + generationId, etc., to fetch the partition assignment.

The group leader, having received all consumers' subscriptions in its JoinGroupResponse, assigns partitions to the group's members using the default RangeAssignor and sends a SyncGroupRequest that additionally carries the assignment result (assignments).

The coordinator takes the leader's assignment, updates the in-memory GroupMetadata, persists it to topic = __consumer_offsets (in the partition that maps this group to its coordinator), and answers every member's SyncGroupResponse with the partition assignment. If a follower's SyncGroupRequest arrives after the leader's, it is simply answered with a SyncGroupResponse immediately.

All consumers receive their partition assignments, update them in memory, and move on to the fetch phase.

Consumer heartbeats

The heartbeat thread is the consumer's only independent thread besides the application's poll thread. Its duties include:

1) detecting that the coordinator is offline and proactively sending a FindCoordinatorRequest;

2) marking the coordinator offline if no heartbeat response arrives within session.timeout.ms (10s; 45s in newer versions);

3) leaving the group proactively, via LeaveGroupRequest, if the interval between two user consumer.poll calls exceeds max.poll.interval.ms (5 minutes);

4) sending a HeartbeatRequest once heartbeat.interval.ms (3s) has elapsed (see the config sketch after this list);
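
The timing knobs in the list above map to standard consumer configs; a minimal sketch tuning them together (the values are the defaults discussed in this article):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class HeartbeatConfigSketch {
    public static Properties heartbeatProps() {
        Properties props = new Properties();
        // cadence of the background heartbeat thread (item 4)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // the coordinator evicts a member that misses this window (item 2)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // max gap between poll() calls before the member leaves the group (item 3)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        return props;
    }
}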

The coordinator handles HeartbeatRequest:

1) if the member no longer exists (its heartbeat timed out and it was removed from the group), respond UNKNOWN_MEMBER_ID; the member must re-join via JoinGroup;

2) if the group is in a settled state, complete the heartbeat delayed timer and start the next round;

3) if the group is in PreparingRebalance, someone sent a JoinGroup and started a new round of rebalance: respond REBALANCE_IN_PROGRESS so this member enters the rebalance and sends a JoinGroup;

When the coordinator's heartbeat delayed timer expires:

1) the member is removed from the group;

2) if the group was in a settled state, it transitions to PreparingRebalance; other members learn of it through subsequent heartbeats and re-join via JoinGroup;

3) if the group was already in PreparingRebalance, i.e. mid-rebalance waiting for members' JoinGroups, check whether the join conditions are now satisfied;
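
From the application's point of view, these coordinator-driven transitions surface through ConsumerRebalanceListener: onPartitionsRevoked fires before re-joining, onPartitionsAssigned after SyncGroup completes. A minimal sketch (the topic name is a placeholder):

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerSketch {
    public static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("topic1"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // EAGER: all owned partitions; COOPERATIVE: only the partitions being lost
                System.out.println("revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // invoked once the SyncGroupResponse assignment has been applied
                System.out.println("assigned: " + partitions);
            }
        });
    }
}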