KAFKA-17439: Make polling for new records an explicit action/event in the new consumer (#17035)

Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans <[email protected]>
kirktrue authored Oct 28, 2024
1 parent 5f92f60 commit 9e42475
Showing 9 changed files with 303 additions and 31 deletions.
@@ -50,6 +50,7 @@
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -708,6 +709,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
updateAssignmentMetadataIfNeeded(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
            // Before returning the fetched records, we can send off the next round of fetches
            // without blocking to wait for their responses, which enables pipelining while the user
            // is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
sendPrefetches(timer);

if (fetch.records().isEmpty()) {
log.trace("Returning empty records from `poll()` "
+ "since the consumer's position has advanced for at least one topic partition");
@@ -1519,6 +1528,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
return fetch;
}

// send any new fetches (won't resend pending fetches)
sendFetches(timer);

// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

@@ -1606,6 +1618,63 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}

/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
*
* <p/>
*
* This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
* of the same name:
*
* <ul>
* <li>
* The method will wait for confirmation of the request creation before continuing.
* </li>
* <li>
* The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
* </li>
* <li>
* The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
* Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
* That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
     *         Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests},
     *         which can handle requests that are created after the timeout.
* </li>
* </ul>
*
* @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
* is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
*/
private void sendFetches(Timer timer) {
try {
applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException e) {
// Can be ignored, per above comments.
}
}

/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
* pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread
* will not wait for confirmation of the request creation before continuing.
*
* <p/>
*
* At the point this method is called, {@link KafkaConsumer#poll(Duration)} has data ready to return to the user,
* which means the consumed position was already updated. In order to prevent potential gaps in records, this
* method is designed to suppress all exceptions.
*
* @param timer Provides an upper bound for the event and its {@link CompletableFuture future}
*/
private void sendPrefetches(Timer timer) {
try {
applicationEventHandler.add(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
} catch (Throwable t) {
// Any unexpected errors will be logged for troubleshooting, but not thrown.
log.warn("An unexpected error occurred while pre-fetching data in Consumer.poll(), but was suppressed", t);
}
}

@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();
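Taken together, sendFetches() and sendPrefetches() give poll() a pipelined shape: block for the current batch, then fire off the next fetch before handing the records back, so the next network round trip overlaps with the caller's processing. A minimal, self-contained sketch of that flow follows; all names here are illustrative stand-ins, not the actual AsyncKafkaConsumer internals.

import java.util.concurrent.CompletableFuture;

// Toy model of the pipelined poll() flow; fetchAsync() stands in for the
// background thread's fetch path.
public class PrefetchSketch {

    private CompletableFuture<String> inFlight = fetchAsync();

    public String poll() {
        // Block for the outstanding fetch (pollForFetches() in the real consumer).
        String batch = inFlight.join();

        // Pre-fetch: start the next round *before* returning. Errors are
        // suppressed so the batch we are about to return is never lost,
        // mirroring sendPrefetches().
        try {
            inFlight = fetchAsync();
        } catch (RuntimeException e) {
            // best-effort; the real consumer logs and continues
        }
        return batch;
    }

    private CompletableFuture<String> fetchAsync() {
        return CompletableFuture.supplyAsync(() -> "records");
    }
}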
@@ -19,8 +19,10 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
@@ -29,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -41,6 +44,7 @@
public class FetchRequestManager extends AbstractFetch implements RequestManager {

private final NetworkClientDelegate networkClientDelegate;
private CompletableFuture<Void> pendingFetchRequestFuture;

FetchRequestManager(final LogContext logContext,
final Time time,
@@ -65,15 +69,42 @@ protected void maybeThrowAuthFailure(Node node) {
networkClientDelegate.maybeThrowAuthFailure(node);
}

/**
     * Signals that the {@link Consumer} wants requests to be created for the broker nodes to fetch the next
     * batch of records.
     *
     * @return Future on which the caller can wait to ensure that the requests have been created
     * @see CreateFetchRequestsEvent
*/
public CompletableFuture<Void> createFetchRequests() {
CompletableFuture<Void> future = new CompletableFuture<>();

if (pendingFetchRequestFuture != null) {
// In this case, we have an outstanding fetch request, so chain the newly created future to be
// completed when the "pending" future is completed.
pendingFetchRequestFuture.whenComplete((value, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(value);
}
});
} else {
pendingFetchRequestFuture = future;
}

return future;
}

/**
* {@inheritDoc}
*/
@Override
public PollResult poll(long currentTimeMs) {
return pollInternal(
-                prepareFetchRequests(),
-                this::handleFetchSuccess,
-                this::handleFetchFailure
+                this::prepareFetchRequests,
+                this::handleFetchSuccess,
+                this::handleFetchFailure
);
}

@@ -82,9 +113,12 @@ public PollResult poll(long currentTimeMs) {
*/
@Override
public PollResult pollOnClose(long currentTimeMs) {
+        // There needs to be a pending fetch request for pollInternal to create the requests.
+        createFetchRequests();
+
         // TODO: move the logic to poll to handle signal close
         return pollInternal(
-                prepareCloseFetchSessionRequests(),
+                this::prepareCloseFetchSessionRequests,
this::handleCloseFetchSessionSuccess,
this::handleCloseFetchSessionFailure
);
@@ -94,28 +128,56 @@ public PollResult pollOnClose(long currentTimeMs) {
* Creates the {@link PollResult poll result} that contains a list of zero or more
* {@link FetchRequest.Builder fetch requests}.
*
-     * @param fetchRequests  {@link Map} of {@link Node nodes} to their {@link FetchSessionHandler.FetchRequestData}
-     * @param successHandler {@link ResponseHandler Handler for successful responses}
-     * @param errorHandler   {@link ResponseHandler Handler for failure responses}
+     * @param fetchRequestPreparer {@link FetchRequestPreparer} to generate a {@link Map} of {@link Node nodes}
+     *                             to their {@link FetchSessionHandler.FetchRequestData}
+     * @param successHandler       {@link ResponseHandler Handler for successful responses}
+     * @param errorHandler         {@link ResponseHandler Handler for failure responses}
      * @return {@link PollResult}
      */
-    private PollResult pollInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests,
+    private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
ResponseHandler<ClientResponse> successHandler,
ResponseHandler<Throwable> errorHandler) {
-        List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
-            final Node fetchTarget = entry.getKey();
-            final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
-            final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
-                if (error != null)
-                    errorHandler.handle(fetchTarget, data, error);
-                else
-                    successHandler.handle(fetchTarget, data, clientResponse);
-            };
-
-            return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
-        }).collect(Collectors.toList());
-
-        return new PollResult(requests);
+        if (pendingFetchRequestFuture == null) {
+            // If no explicit request for creating fetch requests was issued, just short-circuit.
+            return PollResult.EMPTY;
+        }
+
+        try {
+            Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = fetchRequestPreparer.prepare();
+
+            List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = entry.getValue();
+                final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
+                    if (error != null)
+                        errorHandler.handle(fetchTarget, data, error);
+                    else
+                        successHandler.handle(fetchTarget, data, clientResponse);
+                };
+
+                return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
+            }).collect(Collectors.toList());
+
+            pendingFetchRequestFuture.complete(null);
+            return new PollResult(requests);
+        } catch (Throwable t) {
+            // A "dummy" poll result is returned here rather than rethrowing the error because any error
+            // that is thrown from any RequestManager.poll() method interrupts the polling of the other
+            // request managers.
+            pendingFetchRequestFuture.completeExceptionally(t);
+            return PollResult.EMPTY;
+        } finally {
+            pendingFetchRequestFuture = null;
+        }
}

+    /**
+     * Simple functional interface to allow passing in a method reference for improved readability.
+     */
+    @FunctionalInterface
+    protected interface FetchRequestPreparer {
+
+        Map<Node, FetchSessionHandler.FetchRequestData> prepare();
+    }
}
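The pendingFetchRequestFuture bookkeeping above amounts to a coalescing signal: a caller that arrives while a request-creation signal is already outstanding piggy-backs on it instead of queueing duplicate work, and the poll path performs the work once, completes the future, and clears it. Below is a standalone sketch of that pattern with hypothetical names, assuming (as the event-based design implies) that a single background thread invokes both methods.

import java.util.concurrent.CompletableFuture;

// Illustrative coalescing signal; not the actual FetchRequestManager API.
final class CoalescingSignal {

    private CompletableFuture<Void> pending;

    // Request that work be done; requests arriving while one is outstanding
    // share the pending future, as in createFetchRequests().
    CompletableFuture<Void> signal() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (pending != null) {
            pending.whenComplete((value, error) -> {
                if (error != null)
                    future.completeExceptionally(error);
                else
                    future.complete(value);
            });
        } else {
            pending = future;
        }
        return future;
    }

    // Perform the work at most once per signal, then reset, as in pollInternal().
    void drain(Runnable work) {
        if (pending == null)
            return; // nothing was requested; short-circuit
        try {
            work.run();
            pending.complete(null);
        } catch (Throwable t) {
            pending.completeExceptionally(t);
        } finally {
            pending = null;
        }
    }
}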
@@ -32,7 +32,7 @@ public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
-    COMMIT_ON_CLOSE,
+    COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
@@ -124,6 +124,10 @@ public void process(ApplicationEvent event) {
process((CommitOnCloseEvent) event);
return;

case CREATE_FETCH_REQUESTS:
process((CreateFetchRequestsEvent) event);
return;

case SHARE_FETCH:
process((ShareFetchEvent) event);
return;
@@ -176,6 +180,11 @@ private void process(final PollEvent event) {
}
}

private void process(final CreateFetchRequestsEvent event) {
CompletableFuture<Void> future = requestManagers.fetchRequestManager.createFetchRequests();
future.whenComplete(complete(event.future()));
}

private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;
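The complete(event.future()) call in process(CreateFetchRequestsEvent) above relies on a helper that is not shown in this diff; presumably it adapts the event's CompletableFuture into the BiConsumer callback shape that whenComplete() expects. A hedged sketch of such an adapter:

    // Assumed shape of the complete() helper used above; it forwards a
    // (value, exception) callback pair into the event's own future.
    // (Uses java.util.concurrent.CompletableFuture and java.util.function.BiConsumer.)
    private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> target) {
        return (value, exception) -> {
            if (exception != null)
                target.completeExceptionally(exception);
            else
                target.complete(value);
        };
    }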
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;

/**
* {@code CreateFetchRequestsEvent} signals that the {@link Consumer} wants to issue fetch requests to the nodes
* for the partitions to which the consumer is currently subscribed. The event is completed when the
* {@link FetchRequestManager} has finished <em>creating</em> (i.e. not enqueuing, sending, or receiving)
* fetch requests (if any) to send to the broker nodes.
*/
public class CreateFetchRequestsEvent extends CompletableApplicationEvent<Void> {

public CreateFetchRequestsEvent(final long deadlineMs) {
super(Type.CREATE_FETCH_REQUESTS, deadlineMs);
}
}
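On the application-thread side, this event is either enqueued fire-and-forget (add() in sendPrefetches()) or enqueued and awaited up to a deadline (addAndGet() in sendFetches()). The following is a minimal model of that handoff; the names are stand-ins, not the real ApplicationEventHandler API.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative application-thread/background-thread handoff.
final class HandoffSketch {

    static final class Event {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    // Fire-and-forget, like add() in sendPrefetches(): the caller returns
    // immediately and never observes request-creation errors.
    void add(Event event) {
        queue.offer(event);
    }

    // Enqueue and block until the background thread completes the event's
    // future, like addAndGet() in sendFetches(); the TimeoutException here is
    // the one sendFetches() deliberately suppresses.
    void addAndGet(Event event, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        queue.offer(event);
        event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}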
@@ -2566,11 +2566,6 @@ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol)
consumer.assign(singleton(tp0));
consumer.seek(tp0, 50L);

-        // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in background thread.
-        // Wait for the first fetch request to avoid ListOffsetResponse mismatch.
-        TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
-            "No fetch request sent");

client.prepareResponse(request -> request instanceof ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0)));
// correct lag result should be returned as well
@@ -40,6 +40,7 @@
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -1711,6 +1712,7 @@ public void testEnsurePollEventSentOnConsumerPoll() {
consumer.subscribe(singletonList("topic1"));
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
}

private Properties requiredConsumerConfigAndGroupId(final String groupId) {