Skip to content

consume flow control settings updates from job settings #34539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;

import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
Expand Down Expand Up @@ -87,7 +87,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
Expand Down Expand Up @@ -243,10 +242,11 @@ private StreamingDataflowWorker(
@Nullable ChannelzServlet channelzServlet = null;
Consumer<PrintWriter> getDataStatusProvider;
Supplier<Long> currentActiveCommitBytesProvider;

ChannelCache channelCache = null;
if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
// Direct path pipelines.
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
channelCache = createChannelCache(options, configFetcher);
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
Expand All @@ -273,7 +273,7 @@ private StreamingDataflowWorker(
processingContext,
getWorkStreamLatencies);
}),
createFanOutStubFactory(options),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
GetWorkBudgetDistributors.distributeEvenly(),
Preconditions.checkNotNull(dispatcherClient),
commitWorkStream ->
Expand Down Expand Up @@ -384,6 +384,7 @@ private StreamingDataflowWorker(
.setChannelzServlet(channelzServlet)
.setGetDataStatusProvider(getDataStatusProvider)
.setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
.setChannelCache(channelCache)
.build();

LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled());
Expand Down Expand Up @@ -620,19 +621,28 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options)
StreamingDataflowWorker.class.getSimpleName());
}

private static ChannelCachingStubFactory createFanOutStubFactory(
DataflowWorkerHarnessOptions workerOptions) {
return ChannelCachingRemoteStubFactory.create(
workerOptions.getGcpCredential(),
private static ChannelCache createChannelCache(
DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) {
ChannelCache channelCache =
ChannelCache.create(
serviceAddress ->
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress.
IsolationChannel.create(
() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
(currentFlowControlSettings, serviceAddress) -> {
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress.
return IsolationChannel.create(
() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
});
configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
config ->
channelCache.consumeFlowControlSettings(
config.userWorkerJobSettings().getFlowControlSettings()));
return channelCache;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.runners.dataflow.worker.streaming.config;

import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.annotations.Internal;

Expand All @@ -33,5 +32,5 @@ public interface StreamingGlobalConfigHandle {
* Subscribe to config updates by registering a callback. Callback should be called the first time
* with settings, if any. The callback could execute inline before the method returns.
*/
void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback);
void registerConfigObserver(Consumer<StreamingGlobalConfig> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -79,6 +80,7 @@ public final class StreamingWorkerStatusPages {
private final @Nullable GrpcWindmillStreamFactory windmillStreamFactory;
private final DebugCapture.@Nullable Manager debugCapture;
private final @Nullable ChannelzServlet channelzServlet;
private final @Nullable ChannelCache channelCache;

private final AtomicReference<StreamingGlobalConfig> globalConfig = new AtomicReference<>();

Expand All @@ -96,7 +98,8 @@ public final class StreamingWorkerStatusPages {
Consumer<PrintWriter> getDataStatusProvider,
BoundedQueueExecutor workUnitExecutor,
ScheduledExecutorService statusPageDumper,
StreamingGlobalConfigHandle globalConfigHandle) {
StreamingGlobalConfigHandle globalConfigHandle,
@Nullable ChannelCache channelCache) {
this.clock = clock;
this.clientId = clientId;
this.isRunning = isRunning;
Expand All @@ -111,6 +114,7 @@ public final class StreamingWorkerStatusPages {
this.workUnitExecutor = workUnitExecutor;
this.statusPageDumper = statusPageDumper;
globalConfigHandle.registerConfigObserver(globalConfig::set);
this.channelCache = channelCache;
}

public static StreamingWorkerStatusPages.Builder builder() {
Expand Down Expand Up @@ -168,6 +172,9 @@ private void addStreamingEngineStatusPages() {
}
writer.println(config.userWorkerJobSettings().toString());
});
if (channelCache != null) {
statusPages.addStatusDataProvider("ChannelCache", "ChannelCache", channelCache);
}
}

private boolean isStreamingEngine() {
Expand Down Expand Up @@ -276,6 +283,8 @@ public interface Builder {

Builder setGlobalConfigHandle(StreamingGlobalConfigHandle globalConfigHandle);

Builder setChannelCache(@Nullable ChannelCache channelCache);

StreamingWorkerStatusPages build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
import org.apache.beam.sdk.annotations.Internal;
Expand All @@ -36,7 +37,7 @@ public static WindmillConnection from(

windmillEndpoint.workerToken().ifPresent(windmillWorkerConnection::setBackendWorkerToken);
windmillEndpoint.directEndpoint().ifPresent(windmillWorkerConnection::setDirectEndpoint);
windmillWorkerConnection.setStub(endpointToStubFn.apply(windmillEndpoint));
windmillWorkerConnection.setStubSupplier(() -> endpointToStubFn.apply(windmillEndpoint));

return windmillWorkerConnection.build();
}
Expand All @@ -48,17 +49,22 @@ public static Builder builder() {

public abstract String backendWorkerToken();

public abstract Optional<WindmillServiceAddress> directEndpoint();
abstract Optional<WindmillServiceAddress> directEndpoint();

public abstract CloudWindmillServiceV1Alpha1Stub stub();
abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubSupplier();

public final CloudWindmillServiceV1Alpha1Stub currentStub() {
return stubSupplier().get();
}

@AutoValue.Builder
public abstract static class Builder {
abstract Builder setBackendWorkerToken(String backendWorkerToken);

public abstract Builder setDirectEndpoint(WindmillServiceAddress value);
abstract Builder setDirectEndpoint(WindmillServiceAddress value);

public abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub);
public abstract Builder setStubSupplier(
Supplier<CloudWindmillServiceV1Alpha1Stub> stubSupplier);

public abstract WindmillConnection build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.LOCALHOST;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel;

import com.google.auto.value.AutoValue;
import java.util.List;
Expand Down Expand Up @@ -164,15 +164,6 @@ private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
return stubs.get(rand.nextInt(stubs.size()));
}

/**
* Returns whether the {@link DispatcherStubs} have been set. Once initially set, {@link
* #dispatcherStubs} will always have a value as empty updates will trigger an {@link
* IllegalStateException}.
*/
public boolean hasInitializedEndpoints() {
return dispatcherStubs.get().hasInitializedEndpoints();
}

public void onJobConfig(StreamingGlobalConfig config) {
if (config.windmillServiceEndpoints().isEmpty()) {
LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
Expand Down Expand Up @@ -282,14 +273,6 @@ private static CloudWindmillMetadataServiceV1Alpha1Stub createWindmillMetadataSe
WindmillServiceAddress.create(endpoint));
}

private int size() {
return dispatcherEndpoints().size();
}

private boolean hasInitializedEndpoints() {
return size() > 0;
}

abstract ImmutableSet<HostAndPort> dispatcherEndpoints();

abstract ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.LOCALHOST;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.inProcessChannel;
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public GetWorkStream createDirectGetWorkStream(
WorkItemScheduler workItemScheduler) {
return GrpcDirectGetWorkStream.create(
connection.backendWorkerToken(),
responseObserver -> connection.stub().getWorkStream(responseObserver),
responseObserver -> connection.currentStub().getWorkStream(responseObserver),
request,
grpcBackOff.get(),
newStreamObserverFactory(),
Expand Down Expand Up @@ -265,7 +265,7 @@ public GetDataStream createDirectGetDataStream(
WindmillConnection connection, ThrottleTimer getDataThrottleTimer) {
return GrpcGetDataStream.create(
connection.backendWorkerToken(),
responseObserver -> connection.stub().getDataStream(responseObserver),
responseObserver -> connection.currentStub().getDataStream(responseObserver),
grpcBackOff.get(),
newStreamObserverFactory(),
streamRegistry,
Expand Down Expand Up @@ -297,7 +297,7 @@ public CommitWorkStream createDirectCommitWorkStream(
WindmillConnection connection, ThrottleTimer commitWorkThrottleTimer) {
return GrpcCommitWorkStream.create(
connection.backendWorkerToken(),
responseObserver -> connection.stub().commitWorkStream(responseObserver),
responseObserver -> connection.currentStub().commitWorkStream(responseObserver),
grpcBackOff.get(),
newStreamObserverFactory(),
streamRegistry,
Expand Down
Loading
Loading