Kafka 2.0 Consumer Coordinator Source Code Analysis

Consumer groups and consumers

  1. A consumer group and its consumers have a one-to-many relationship: one group contains many consumers.
  2. A consumer in a group can consume multiple partitions, but each partition is consumed exclusively by one consumer in that group.
  3. The partition assignment policy is defined by the PartitionAssignor interface. Three policies are built in, RangeAssignor, RoundRobinAssignor and StickyAssignor, and custom policies are also supported.
  4. Different consumer groups can consume the same partitions independently, without interfering with each other.
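As an illustration of items 2 and 3, here is a minimal sketch of range-style assignment for a single topic, loosely following the idea behind RangeAssignor: sort the consumers, give each one a contiguous block of partitions, and hand the remainder to the first consumers one by one. The class name RangeAssignmentSketch is hypothetical; it does not implement the PartitionAssignor interface, and the real RangeAssignor repeats this per topic across the group's subscriptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified range assignment for ONE topic (not Kafka's RangeAssignor class).
// Every partition ends up owned by exactly one consumer of the group.
final class RangeAssignmentSketch {

    /** Returns consumerId -> owned partition numbers for a topic with numPartitions partitions. */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (consumers.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // deterministic order, independent of the input order

        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size(); // the first `withExtra` consumers get one more
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p); // contiguous range [start, start + length)
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }
}
```

For consumers c1, c2, c3 and seven partitions, c1 owns 0 to 2, c2 owns 3 and 4, and c3 owns 5 and 6, so no partition is shared inside the group. A second group would run its own assignment over the same seven partitions.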

Consumer Coordinator and Group Coordinator

  1. The ConsumerCoordinator on the client side and the GroupCoordinator on the server side communicate continuously through heartbeats.
  2. Before consuming, a consumer must make sure the coordinator is ready and that it has joined the group:
    1. Select the least-loaded node, that is, the node with the fewest in-flight requests (InFlightRequests).
    2. Send a FindCoordinator request to this node; it is sent in a similar way to a fetch request.
    3. Send a JoinGroup request to the coordinator node that was found; the heartbeat thread is disabled during this step.
    4. The JoinGroupResponseHandler processes the response, which contains generationId, memberId, leaderId and protocol.
    5. If the consumer is the leader (memberId == leaderId), it computes the partition assignment using the assignment strategy named by protocol.
    6. The leader wraps the assignment result in a SyncGroup request and sends it to the coordinator.
    7. The SyncGroupResponseHandler handles the response to this request.
    8. If step 5 finds that the consumer is a follower, it also sends a SyncGroup request to the coordinator, but with no assignment attached; it receives its own assignment from the group coordinator in the response.
    9. Once the consumer has joined the group successfully, the heartbeat thread is enabled again.
    10. The locally cached partition assignment is updated, and the consumer rebalance listener (ConsumerRebalanceListener) is invoked.
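The leader/follower split in steps 5 to 8 can be modelled without any networking. The sketch below is a toy, in-memory model under stated assumptions: the names JoinSyncSketch, JoinResult, Coordinator and leaderAssign are hypothetical, the leader is simply the first member to join, the leader uses a plain round-robin split, and the coordinator is assumed to receive the leader's SyncGroup before the followers' (a real coordinator holds follower responses until the leader's assignment arrives).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one JoinGroup/SyncGroup round. No networking,
// error handling or multiple generations; all class and method names are made up.
final class JoinSyncSketch {

    /** What each member learns from the JoinGroup response. */
    static final class JoinResult {
        final int generationId;
        final String memberId;
        final String leaderId;
        final List<String> members; // the full member list is only given to the leader

        JoinResult(int generationId, String memberId, String leaderId, List<String> members) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.leaderId = leaderId;
            this.members = members;
        }

        boolean isLeader() {
            return memberId.equals(leaderId); // leader check: memberId == leaderId
        }
    }

    /** Toy coordinator; assumes the leader's SyncGroup arrives before the followers'. */
    static final class Coordinator {
        private final List<String> joined = new ArrayList<>();
        private final Map<String, List<Integer>> assignment = new HashMap<>();
        private int generationId = 0;

        void join(String memberId) {
            joined.add(memberId);
        }

        /** Ends the join phase; in this sketch the first member to join becomes leader. */
        List<JoinResult> completeJoin() {
            generationId++;
            String leader = joined.get(0);
            List<JoinResult> results = new ArrayList<>();
            for (String m : joined) {
                List<String> members = m.equals(leader) ? new ArrayList<String>(joined) : Collections.<String>emptyList();
                results.add(new JoinResult(generationId, m, leader, members));
            }
            return results;
        }

        /** SyncGroup: the leader uploads the whole plan, followers send an empty map. */
        List<Integer> sync(String memberId, Map<String, List<Integer>> plan) {
            assignment.putAll(plan);         // no-op for followers
            return assignment.get(memberId); // each member receives only its own partitions
        }
    }

    /** Leader-side assignment: round-robin partitions over the member list. */
    static Map<String, List<Integer>> leaderAssign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> plan = new HashMap<>();
        for (String m : members) {
            plan.put(m, new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            plan.get(members.get(p % members.size())).add(p);
        }
        return plan;
    }
}
```

With two members and four partitions, the leader m1 computes the plan and gets partitions 0 and 2; the follower m2 sends an empty map and still receives partitions 1 and 3 from the coordinator.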

Consumer States

  • UNJOINED: The initial state; the consumer is not a member of the consumer group.
  • REBALANCING: Just before the consumer sends a JoinGroup request to the coordinator, the state changes to REBALANCING, indicating that a rebalance is in progress.
  • STABLE: When the response listener reports that the join succeeded, the state changes to STABLE. If the join fails, the state is reset to UNJOINED.
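The three states and the transitions between them fit in a small state holder. This is a sketch only: MemberStateSketch and its methods are hypothetical names, although the enum values match the states described above, and acceptsJoinResponse mirrors the REBALANCING check made when the JoinGroup response arrives.

```java
// Hypothetical sketch of the three member states and their transitions.
// The state names match the text; the class and method names are illustrative only.
final class MemberStateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    private MemberState state = MemberState.UNJOINED; // initial state: not in the group

    MemberState state() {
        return state;
    }

    /** Just before the JoinGroup request is sent. */
    void onJoinStart() {
        state = MemberState.REBALANCING;
    }

    /** A JoinGroup response is only usable while still rebalancing. */
    boolean acceptsJoinResponse() {
        return state == MemberState.REBALANCING;
    }

    /** Response listener: the join succeeded. */
    void onJoinSucceeded() {
        state = MemberState.STABLE;
    }

    /** Response listener: the join failed. */
    void onJoinFailed() {
        state = MemberState.UNJOINED;
    }

    /** The heartbeat thread only sends heartbeats in STABLE. */
    boolean canSendHeartbeat() {
        return state == MemberState.STABLE;
    }
}
```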

Heartbeat Thread

  1. When a consumer joins a consumer group, a heartbeat thread is started to keep communicating with the group coordinator.
  2. If the consumer state is not STABLE, no heartbeat is sent.
  3. If the group coordinator is unknown, wait for a while and try again.
  4. If the heartbeat session times out, mark the coordinator node as unknown.
  5. If the poll timeout expires (the application has not called poll() within max.poll.interval.ms), send a LeaveGroup request.
  6. If no heartbeat is due yet, wait for a while and try again.
  7. Otherwise, send a heartbeat and register a response listener; when the response arrives, record the receive time and move on to the next heartbeat.
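One pass of the heartbeat loop can be sketched as a function that applies the checks in the order listed above. All names here (HeartbeatLoopSketch, Action and the timestamp fields) are hypothetical, and the timeout checks are simplified compared with Kafka's internal Heartbeat class; the three timeouts play the roles of session.timeout.ms, heartbeat.interval.ms and max.poll.interval.ms.

```java
// Hypothetical sketch of one iteration of the heartbeat loop; not Kafka's Heartbeat class.
final class HeartbeatLoopSketch {
    enum Action { SKIP_NOT_STABLE, RETRY_COORDINATOR_UNKNOWN, MARK_COORDINATOR_UNKNOWN, LEAVE_GROUP, WAIT, SEND_HEARTBEAT }

    private final long sessionTimeoutMs;
    private final long heartbeatIntervalMs;
    private final long maxPollIntervalMs;
    long lastHeartbeatSendMs;
    long lastHeartbeatReceiveMs;
    long lastPollMs;

    HeartbeatLoopSketch(long sessionTimeoutMs, long heartbeatIntervalMs, long maxPollIntervalMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastHeartbeatSendMs = nowMs;
        this.lastHeartbeatReceiveMs = nowMs;
        this.lastPollMs = nowMs;
    }

    /** Decides what the heartbeat thread does at nowMs; checks follow items 2 to 7 above. */
    Action decide(boolean stable, boolean coordinatorKnown, long nowMs) {
        if (!stable) return Action.SKIP_NOT_STABLE;                                                      // item 2
        if (!coordinatorKnown) return Action.RETRY_COORDINATOR_UNKNOWN;                                  // item 3
        if (nowMs - lastHeartbeatReceiveMs > sessionTimeoutMs) return Action.MARK_COORDINATOR_UNKNOWN;   // item 4
        if (nowMs - lastPollMs > maxPollIntervalMs) return Action.LEAVE_GROUP;                           // item 5
        if (nowMs - lastHeartbeatSendMs < heartbeatIntervalMs) return Action.WAIT;                       // item 6
        lastHeartbeatSendMs = nowMs;                                                                     // item 7
        return Action.SEND_HEARTBEAT;
    }

    /** Called from the application's poll(); resets the poll timer. */
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** Response listener: record the receive time so the session timer restarts. */
    void onHeartbeatResponse(long nowMs) {
        lastHeartbeatReceiveMs = nowMs;
    }
}
```

With a 10 s session timeout and a 3 s heartbeat interval, a call at 1 s returns WAIT and a call at 3 s returns SEND_HEARTBEAT; once more than 10 s pass after the last recorded response, the next call returns MARK_COORDINATOR_UNKNOWN.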


Fetch Offsets

  1. For the partitions that still need a fetch position, the consumer coordinator fetches a map from partition to committed offset from the group coordinator and caches it in SubscriptionState.
  2. Partitions without a committed offset fall back to the offset reset policy: LATEST, EARLIEST or NONE.
  3. The consumption positions are then updated asynchronously.
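The choice between a committed offset and the reset policy can be sketched as a small function. OffsetResolutionSketch, resolve and the log start/end parameters are hypothetical names; the sketch throws IllegalStateException for NONE, whereas the real consumer reports this case with NoOffsetForPartitionException.

```java
// Hypothetical sketch: pick a starting position for one partition from its committed
// offset, falling back to a reset policy. Not Kafka's Fetcher or SubscriptionState code.
final class OffsetResolutionSketch {
    enum ResetPolicy { LATEST, EARLIEST, NONE }

    static long resolve(Long committedOffset, ResetPolicy policy, long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            return committedOffset; // resume from the committed offset cached for this partition
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset; // oldest available message
            case LATEST:
                return logEndOffset;   // only messages produced from now on
            default:
                throw new IllegalStateException("no committed offset and reset policy is NONE");
        }
    }
}
```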

Commit Offsets

  1. The ConsumerCoordinator gets the current coordinator node.
  2. It sends an OffsetCommit request to this node and returns a future (RequestFuture) for the result.
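The two commit steps can be sketched with CompletableFuture standing in for Kafka's internal RequestFuture. CommitSketch is hypothetical, the "broker" is just an in-memory map keyed by node name, and the sketch simply returns an already-failed future when no coordinator is known instead of looking one up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "get the coordinator, send OffsetCommit, return a future".
// No real network I/O: the commit is applied to a map and the future completes at once.
final class CommitSketch {
    private Optional<String> coordinator = Optional.empty();
    private final Map<String, Map<String, Long>> brokerCommits = new HashMap<>(); // node -> (partition -> offset)

    void setCoordinator(String node) {
        coordinator = Optional.of(node);
    }

    CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (!coordinator.isPresent()) {
            // step 1 failed: without a coordinator there is nowhere to send the request
            future.completeExceptionally(new IllegalStateException("coordinator unknown"));
            return future;
        }
        // step 2: "send" the request; completing here stands in for the network response
        brokerCommits.computeIfAbsent(coordinator.get(), n -> new HashMap<>()).putAll(offsets);
        future.complete(null);
        return future;
    }

    Long committed(String node, String partition) {
        Map<String, Long> m = brokerCommits.get(node);
        return m == null ? null : m.get(partition);
    }
}
```

Returning a future lets the caller decide whether to block on it (a synchronous commit) or attach a callback (an asynchronous commit).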

Join Group Process

Source Code Analysis of the Consumer Join Group Process

```java
// KafkaConsumer
boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
    final long startMs = time.milliseconds();
    if (!coordinator.poll(timeoutMs)) { // Find the coordinator and join the group if needed
        return false;
    }
    // Update fetch positions with whatever is left of the timeout
    return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
```
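The remainingTimeAtLeastZero helper used throughout these methods is not shown in the excerpt. Assuming it just subtracts the elapsed time from the timeout and clamps the result at zero, a standalone version looks like this (TimeBudgetSketch is a hypothetical name).

```java
// Hypothetical sketch of the timeout-budget helper: each later phase receives what is
// left of the caller's timeout, clamped so it is never negative.
final class TimeBudgetSketch {
    static long remainingTimeAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }
}
```

With a 1000 ms timeout, if coordinator.poll took 300 ms, updateFetchPositions is given 700 ms; if it overran to 1500 ms, it is given 0 ms rather than a negative timeout.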
```java
// ConsumerCoordinator: find the coordinator and join the group
public boolean poll(final long timeoutMs) {
    final long startTime = time.milliseconds();
    long currentTime = startTime;
    long elapsed = 0L;

    if (subscriptions.partitionsAutoAssigned()) { // Partitions assigned by the group (subscribe())
        // Update the last poll time used by the heartbeat thread
        pollHeartbeat(currentTime);

        if (coordinatorUnknown()) { // Coordinator unknown
            // Ensure the coordinator is ready
            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
            currentTime = time.milliseconds();
            elapsed = currentTime - startTime;
        }

        if (rejoinNeededOrPending()) { // Need to (re)join the consumer group
            // Join the group, then sync the group
            if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
            currentTime = time.milliseconds();
        }
    } else { // Partitions assigned manually (assign())
        if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) { // No ready nodes
            // Block waiting for a metadata update
            final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
            if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                return false; // Failed to update metadata
            }
            currentTime = time.milliseconds();
        }
    }

    maybeAutoCommitOffsetsAsync(currentTime); // Asynchronous auto-commit of offsets
    return true;
}
```
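The body of maybeAutoCommitOffsetsAsync(currentTime) is not shown either. A plausible shape, assuming an interval-based deadline driven by the enable.auto.commit and auto.commit.interval.ms settings, is sketched below; AutoCommitSketch and its fields are hypothetical, and commitsTriggered merely counts the commits that would be sent.

```java
// Hypothetical sketch of an interval-based auto-commit check: at most one asynchronous
// commit per interval, and none at all when auto-commit is disabled.
final class AutoCommitSketch {
    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadlineMs;
    int commitsTriggered = 0; // stands in for sending asynchronous OffsetCommit requests

    AutoCommitSketch(boolean enabled, long intervalMs, long nowMs) {
        this.autoCommitEnabled = enabled;
        this.autoCommitIntervalMs = intervalMs;
        this.nextAutoCommitDeadlineMs = nowMs + intervalMs;
    }

    void maybeAutoCommitOffsetsAsync(long nowMs) {
        if (autoCommitEnabled && nowMs >= nextAutoCommitDeadlineMs) {
            nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs; // schedule the next window
            commitsTriggered++;
        }
    }
}
```

Because poll() calls this check on every invocation, the commit frequency is bounded by the interval rather than by how often the application polls.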
```java
// AbstractCoordinator: ensure the coordinator is ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
    final long startTimeMs = time.milliseconds();
    long elapsedTime = 0L;

    while (coordinatorUnknown()) { // While the coordinator is unknown
        // Send a FindCoordinator request to the node with the fewest in-flight requests
        final RequestFuture<Void> future = lookupCoordinator();
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            break; // Ran out of time before the response arrived
        }
        // ... failure handling, retries and backoff (which update elapsedTime) omitted in this excerpt ...
    }

    return !coordinatorUnknown();
}
```
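lookupCoordinator() sends its FindCoordinator request to the node with the fewest in-flight requests. A minimal sketch of that selection, assuming only the in-flight request counts matter, follows; LeastLoadedNodeSketch is hypothetical, and Kafka's own NetworkClient.leastLoadedNode() also takes connection state into account.

```java
import java.util.Map;

// Hypothetical sketch: choose the node with the fewest in-flight requests.
final class LeastLoadedNodeSketch {
    static String leastLoadedNode(Map<String, Integer> inFlightByNode) {
        String best = null;
        int fewest = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : inFlightByNode.entrySet()) {
            if (e.getValue() < fewest) { // strict '<' keeps the first node seen on ties
                fewest = e.getValue();
                best = e.getKey();
            }
        }
        return best; // null when there are no nodes
    }
}
```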
```java
// AbstractCoordinator: join the group, then sync the group
boolean ensureActiveGroup(long timeoutMs) {
    return ensureActiveGroup(timeoutMs, time.milliseconds());
}

boolean ensureActiveGroup(long timeoutMs, long startMs) {
    startHeartbeatThreadIfNeeded(); // Start the heartbeat thread
    long joinStartMs = time.milliseconds();
    long joinTimeoutMs = remainingTimeAtLeastZero(timeoutMs, joinStartMs - startMs);
    return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}

boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
    long elapsedTime = 0L;

    while (rejoinNeededOrPending()) {
        // Send the JoinGroup request
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) { // Joined: handle the assignment and update the cached partitions
            ByteBuffer memberAssignment = future.value().duplicate();
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
        }
        // ... future reset, failure handling and elapsedTime update omitted in this excerpt ...
    }
    return true;
}
```
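joinGroupIfNeeded hands onJoinComplete a duplicate() of the assignment buffer rather than the buffer itself. A duplicate shares the same bytes but keeps its own position, so consuming it leaves the original readable again, which matters if the completion step has to be retried. The sketch below (DuplicateBufferSketch is a hypothetical name) shows that behaviour with plain java.nio.

```java
import java.nio.ByteBuffer;

// Sketch: reading from a duplicate advances only the duplicate's position.
final class DuplicateBufferSketch {
    /** Reads the first int through a duplicate, leaving the original buffer's position untouched. */
    static int readFirstIntFromCopy(ByteBuffer original) {
        ByteBuffer copy = original.duplicate(); // same content, independent position and limit
        return copy.getInt();
    }
}
```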
```java
// AbstractCoordinator: send the JoinGroup request
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) { // Reuse a pending join instead of sending a second request
        disableHeartbeatThread(); // Pause the heartbeat thread during the rebalance

        state = MemberState.REBALANCING; // State changes to REBALANCING
        joinFuture = sendJoinGroupRequest(); // Send the JoinGroup request to the coordinator
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // Response listener
            @Override
            public void onSuccess(ByteBuffer value) { // Success
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.STABLE; // State changes to STABLE
                    rejoinNeeded = false; // No need to rejoin

                    if (heartbeatThread != null)
                        heartbeatThread.enable(); // Resume the paused heartbeat thread
                }
            }

            @Override
            public void onFailure(RuntimeException e) { // Failure
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // State changes to UNJOINED
                }
            }
        });
    }
    return joinFuture;
}
```
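initiateJoinGroup only sends a request when joinFuture is null; otherwise it hands back the pending future, so a caller that retries while a join is in flight does not start a second one. The sketch below reproduces that pattern with CompletableFuture in place of RequestFuture. JoinFutureSketch, sendCount and resetJoinFuture are hypothetical names; the excerpt does not show where joinFuture is cleared, so resetJoinFuture is a stand-in for that step.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "reuse the pending join" pattern from initiateJoinGroup().
// CompletableFuture stands in for Kafka's RequestFuture; no request is really sent.
final class JoinFutureSketch {
    enum State { UNJOINED, REBALANCING, STABLE }

    volatile State state = State.UNJOINED;
    int sendCount = 0; // number of (simulated) JoinGroup requests sent
    private CompletableFuture<String> joinFuture = null;

    synchronized CompletableFuture<String> initiateJoinGroup() {
        if (joinFuture == null) {        // only start a join if none is pending
            state = State.REBALANCING;
            sendCount++;                 // a JoinGroup request would be sent here
            joinFuture = new CompletableFuture<>();
            joinFuture.whenComplete((assignment, error) -> {
                synchronized (JoinFutureSketch.this) {
                    state = (error == null) ? State.STABLE : State.UNJOINED;
                }
            });
        }
        return joinFuture;               // a pending join is handed back unchanged
    }

    /** Clears the cached future so that the next call starts a fresh join. */
    synchronized void resetJoinFuture() {
        joinFuture = null;
    }
}
```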
```java
// AbstractCoordinator: send the JoinGroup request to the coordinator
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            /* ... constructor arguments elided in this excerpt ... */);

    // Rebalance timeout plus 5 seconds of slack for small delays
    int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler()); // Handle the response asynchronously
}
```
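The JoinGroup request gets a longer timeout than ordinary requests because the coordinator can hold it for up to the rebalance timeout while the other members rejoin. The expression Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000) is simply rebalanceTimeoutMs + 5000 for ordinary values; a side effect of the max is that if the sum overflows int, the result falls back to rebalanceTimeoutMs. JoinTimeoutSketch is a hypothetical wrapper used to check this arithmetic.

```java
// Sketch of the JoinGroup request timeout arithmetic from sendJoinGroupRequest().
final class JoinTimeoutSketch {
    static int joinGroupTimeoutMs(int rebalanceTimeoutMs) {
        // x + 5000 normally; if x + 5000 overflows to a negative int, Math.max returns x
        return Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    }
}
```

For a rebalance timeout of 300000 ms, the request timeout is 305000 ms.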
```java
// AbstractCoordinator: asynchronous JoinGroup response handler
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // Not REBALANCING: the consumer may already have left the group
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
                    if (joinResponse.isLeader()) { // This consumer is the group leader
                        onJoinLeader(joinResponse).chain(future);
                    } else { // This consumer is a follower
                        onJoinFollower().chain(future);
                    }
                }
            }
        }
        // ... handling of the other error codes omitted in this excerpt ...
    }
}

// Leader: send a SyncGroup request carrying the whole group's assignment
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // Assign partitions to all members using the strategy (protocol) chosen by the coordinator
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());

        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

// Follower: send a SyncGroup request without any assignment
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap()); // Empty: the follower gets its assignment from the coordinator
    return sendSyncGroupRequest(requestBuilder);
}
```

