Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options giving more isolation between grpc streaming rpcs. #30233

Merged
merged 3 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ applyJavaNature(
"org/slf4j/impl/**"
],
generatedClassPatterns: [
/^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/
/^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/,
/^org\.apache\.beam\.runners\.dataflow\.worker\..*AutoBuilder.*/,
],
shadowClosure: {
// Each included dependency must also include all of its necessary transitive dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoBuilder;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -31,6 +32,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand All @@ -50,7 +52,6 @@ public class MetricTrackingWindmillServerStub {

private static final int MAX_READS_PER_BATCH = 60;
private static final int MAX_ACTIVE_READS = 10;
private static final int NUM_STREAMS = 1;
private static final Duration STREAM_TIMEOUT = Duration.standardSeconds(30);
private final AtomicInteger activeSideInputs = new AtomicInteger();
private final AtomicInteger activeStateReads = new AtomicInteger();
Expand All @@ -59,28 +60,67 @@ public class MetricTrackingWindmillServerStub {
private final MemoryMonitor gcThrashingMonitor;
private final boolean useStreamingRequests;

private final WindmillStreamPool<GetDataStream> getDataStreamPool;

// This may be the same instance as getDataStreamPool based upon options.
private final WindmillStreamPool<GetDataStream> heartbeatStreamPool;

@GuardedBy("this")
private final List<ReadBatch> pendingReadBatches;

@GuardedBy("this")
private int activeReadThreads = 0;

private WindmillStreamPool<GetDataStream> streamPool;
@Internal
@AutoBuilder(ofClass = MetricTrackingWindmillServerStub.class)
public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);

abstract Builder setNumGetDataStreams(int numGetDataStreams);

abstract MetricTrackingWindmillServerStub build();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
return new AutoBuilder_MetricTrackingWindmillServerStub_Builder()
.setServer(server)
.setGcThrashingMonitor(gcThrashingMonitor)
.setUseStreamingRequests(false)
.setUseSeparateHeartbeatStreams(false)
.setNumGetDataStreams(1);
}

public MetricTrackingWindmillServerStub(
WindmillServerStub server, MemoryMonitor gcThrashingMonitor, boolean useStreamingRequests) {
MetricTrackingWindmillServerStub(
WindmillServerStub server,
MemoryMonitor gcThrashingMonitor,
boolean useStreamingRequests,
boolean useSeparateHeartbeatStreams,
int numGetDataStreams) {
this.server = server;
this.gcThrashingMonitor = gcThrashingMonitor;
// This is used as a queue but is expected to be less than 10 batches.
this.pendingReadBatches = new ArrayList<>();
this.useStreamingRequests = useStreamingRequests;
}

public void start() {
if (useStreamingRequests) {
streamPool =
WindmillStreamPool.create(NUM_STREAMS, STREAM_TIMEOUT, this.server::getDataStream);
getDataStreamPool =
WindmillStreamPool.create(
Math.max(1, numGetDataStreams), STREAM_TIMEOUT, this.server::getDataStream);
if (useSeparateHeartbeatStreams) {
heartbeatStreamPool =
WindmillStreamPool.create(1, STREAM_TIMEOUT, this.server::getDataStream);
} else {
heartbeatStreamPool = getDataStreamPool;
}
} else {
getDataStreamPool = heartbeatStreamPool = null;
}
// This is used as a queue but is expected to be less than 10 batches.
this.pendingReadBatches = new ArrayList<>();
}

// Adds the entry to a read batch for sending to the windmill server. If a non-null batch is
Expand Down Expand Up @@ -193,11 +233,11 @@ public Windmill.KeyedGetDataResponse getStateData(

try {
if (useStreamingRequests) {
GetDataStream stream = streamPool.getStream();
GetDataStream stream = getDataStreamPool.getStream();
try {
return stream.requestKeyedData(computation, request);
} finally {
streamPool.releaseStream(stream);
getDataStreamPool.releaseStream(stream);
}
} else {
SettableFuture<Windmill.KeyedGetDataResponse> response = SettableFuture.create();
Expand All @@ -219,11 +259,11 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
activeSideInputs.getAndIncrement();
try {
if (useStreamingRequests) {
GetDataStream stream = streamPool.getStream();
GetDataStream stream = getDataStreamPool.getStream();
try {
return stream.requestGlobalData(request);
} finally {
streamPool.releaseStream(stream);
getDataStreamPool.releaseStream(stream);
}
} else {
return server
Expand All @@ -240,18 +280,19 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)

/** Tells windmill processing is ongoing for the given keys. */
public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
if (heartbeats.isEmpty()) {
return;
}
activeHeartbeats.set(heartbeats.size());
try {
if (useStreamingRequests) {
// With streaming requests, always send the request even when it is empty, to ensure that
// we trigger health checks for the stream even when it is idle.
GetDataStream stream = streamPool.getStream();
GetDataStream stream = heartbeatStreamPool.getStream();
try {
stream.refreshActiveWork(heartbeats);
} finally {
streamPool.releaseStream(stream);
heartbeatStreamPool.releaseStream(stream);
}
} else if (!heartbeats.isEmpty()) {
} else {
// This code path is only used by appliance which sends heartbeats (used to refresh active
// work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
// KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,11 @@ public void run() {
this.windmillServer = options.getWindmillServerStub();
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
this.metricTrackingWindmillServer =
new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
this.metricTrackingWindmillServer.start();
MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
.setUseStreamingRequests(windmillServiceEnabled)
.setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
.setNumGetDataStreams(options.getWindmillGetDataStreamCount())
.build();
this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer, options);
this.clientId = clientIdGenerator.nextLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,25 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt

void setWindmillMessagesBetweenIsReadyChecks(int value);

@Description("If true, a most a single active rpc will be used per channel.")
@Default.Boolean(false)
boolean getUseWindmillIsolatedChannels();

void setUseWindmillIsolatedChannels(boolean value);

@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
@Default.Boolean(false)
boolean getUseSeparateWindmillHeartbeatStreams();

void setUseSeparateWindmillHeartbeatStreams(boolean value);

@Description("The number of streams to use for GetData requests.")
@Default.Integer(1)
int getWindmillGetDataStreamCount();

void setWindmillGetDataStreamCount(int value);

/**
* Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for
* backwards compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public static GrpcWindmillServer create(StreamingDataflowWorkerOptions workerOpt
GrpcDispatcherClient.create(
WindmillStubFactory.remoteStubFactory(
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
workerOptions.getGcpCredential())));
workerOptions.getGcpCredential(),
workerOptions.getUseWindmillIsolatedChannels())));
if (workerOptions.getWindmillServiceEndpoint() != null) {
grpcWindmillServer.configureWindmillServiceEndpoints();
} else if (!workerOptions.isEnableStreamingEngine()
Expand Down
Loading
Loading