diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index fe80b826c564..2376a2c9bbce 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin { testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version", truth : "com.google.truth:truth:1.1.5", threetenbp : "org.threeten:threetenbp:1.6.8", - vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.1", + vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2", vendored_guava_32_1_2_jre : "org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1", vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2", woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 888b0d3f0b68..9b06fa9b7e27 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -311,6 +311,12 @@ public Dataflow create(PipelineOptions options) { void setWindmillGetDataStreamCount(int value); + @Description("If true, will only show windmill service channels on /channelz") + @Default.Boolean(true) + boolean getChannelzShowOnlyWindmillServiceChannels(); + + void setChannelzShowOnlyWindmillServiceChannels(boolean value); + /** * The amount of time before UnboundedReaders are considered idle and closed during streaming * execution. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 825c3fb78c7d..2e0156bae778 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -114,6 +114,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -211,6 +212,7 @@ public class StreamingDataflowWorker { private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5); private static final Random clientIdGenerator = new Random(); + private static final String CHANNELZ_PATH = "/channelz"; final WindmillStateCache stateCache; // Maps from computation ids to per-computation state. private final ConcurrentMap computationMap; @@ -735,6 +737,13 @@ public void startStatusPages() { if (debugCaptureManager != null) { debugCaptureManager.start(); } + + if (windmillServiceEnabled) { + ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, options, windmillServer); + statusPages.addServlet(channelzServlet); + statusPages.addCapturePage(channelzServlet); + } + statusPages.addServlet(stateCache.statusServlet()); statusPages.addServlet(new SpecsServlet()); @@ -2081,6 +2090,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro } private class MetricsDataProvider implements StatusDataProvider { + @Override public void appendSummaryHtml(PrintWriter writer) { writer.println(workUnitExecutor.summaryHtml()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java index 8caa79cd3f76..a1160b4f98d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** @@ -53,6 +54,11 @@ public void setWindmillServiceEndpoints(Set endpoints) throws IOExc // This class is used for windmill appliance and local runner tests. } + @Override + public ImmutableSet getWindmillServiceEndpoints() { + return ImmutableSet.of(); + } + @Override public boolean isReady() { return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java index c327e68d7e91..34461ab471fd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** Stub for communicating with a Windmill server. */ @@ -40,6 +41,11 @@ public abstract class WindmillServerStub implements StatusDataProvider { */ public abstract void setWindmillServiceEndpoints(Set endpoints) throws IOException; + /* + * Returns the windmill service endpoints set by setWindmillServiceEndpoints + */ + public abstract ImmutableSet getWindmillServiceEndpoints(); + /** Returns true iff this WindmillServerStub is ready for making API calls. */ public abstract boolean isReady(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java new file mode 100644 index 000000000000..9ab02788603e --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; +import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; +import org.apache.beam.runners.dataflow.worker.status.DebugCapture; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; + +/** Respond to /path with the GRPC channelz data. */ +@Internal +public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { + + private static final int MAX_TOP_CHANNELS_TO_RETURN = 500; + + private final ChannelzService channelzService; + private final WindmillServerStub windmillServerStub; + private final boolean showOnlyWindmillServiceChannels; + + public ChannelzServlet( + String path, StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) { + super(path); + channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); + this.windmillServerStub = windmillServerStub; + showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels(); + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + response.setStatus(HttpServletResponse.SC_OK); + PrintWriter writer = response.getWriter(); + captureData(writer); + } + + @Override + public String pageName() { + return getPath(); + } + + @Override + public void captureData(PrintWriter writer) { + writer.println(""); + writer.println("

Channelz

"); + appendTopChannels(writer); + writer.println(""); + } + + // channelz proto says there won't be cycles in the ref graph. + // we track visited ids to be defensive and prevent any accidental cycles. + private static class VisitedSets { + + Set channels = new HashSet<>(); + Set subchannels = new HashSet<>(); + } + + private void appendTopChannels(PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels + // in the worker, we might not return all the windmill channels. If we run into + // such situations, this logic can be modified to loop till we see an empty + // GetTopChannelsResponse response with the end bit set. + channelzService.getTopChannels( + GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future)); + GetTopChannelsResponse topChannelsResponse; + try { + topChannelsResponse = future.get(); + } catch (Exception e) { + String msg = "Failed to get channelz: " + e.getMessage(); + writer.println(msg); + return; + } + + List topChannels = topChannelsResponse.getChannelList(); + if (showOnlyWindmillServiceChannels) { + topChannels = filterWindmillChannels(topChannels); + } + writer.println("

Top Level Channels

"); + writer.println(""); + VisitedSets visitedSets = new VisitedSets(); + for (Channel channel : topChannels) { + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + } + writer.println("
"); + writer.println("TopChannelId: " + channel.getRef().getChannelId()); + writer.println(""); + appendChannel(channel, writer, visitedSets); + writer.println("
"); + } + + private List filterWindmillChannels(List channels) { + ImmutableSet windmillServiceEndpoints = + windmillServerStub.getWindmillServiceEndpoints(); + Set windmillServiceHosts = + windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet()); + List windmillChannels = new ArrayList<>(); + for (Channel channel : channels) { + for (String windmillServiceHost : windmillServiceHosts) { + if (channel.getData().getTarget().contains(windmillServiceHost)) { + windmillChannels.add(channel); + break; + } + } + } + return windmillChannels; + } + + private void appendChannels( + List channelRefs, PrintWriter writer, VisitedSets visitedSets) { + for (ChannelRef channelRef : channelRefs) { + writer.println(""); + writer.println(""); + writer.println("Channel: " + channelRef.getChannelId()); + writer.println(""); + writer.println(""); + appendChannel(channelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.channels.contains(channelRef.getChannelId())) { + String msg = "Duplicate Channel Id: " + channelRef; + writer.println(msg); + return; + } + visitedSets.channels.add(channelRef.getChannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getChannel( + GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(), + getStreamObserver(future)); + Channel channel; + try { + channel = future.get().getChannel(); + } catch (Exception e) { + String msg = "Failed to get Channel: " + channelRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + appendChannel(channel, writer, visitedSets); + } + + private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) { + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(channel.getChannelRefList(), writer, visitedSets); + appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets); + appendSockets(channel.getSocketRefList(), writer); + writer.println("
"); + writer.println("ChannelId: " + channel.getRef().getChannelId()); + writer.println("
" + channel);
+    writer.println("
"); + } + + private void appendSubChannels( + List subchannelRefList, PrintWriter writer, VisitedSets visitedSets) { + for (SubchannelRef subchannelRef : subchannelRefList) { + writer.println(""); + writer.println(""); + writer.println("Sub Channel: " + subchannelRef.getSubchannelId()); + writer.println(""); + writer.println(""); + appendSubchannel(subchannelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendSubchannel( + SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) { + String msg = "Duplicate Subchannel Id: " + subchannelRef; + writer.println(msg); + return; + } + visitedSets.subchannels.add(subchannelRef.getSubchannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getSubchannel( + GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(), + getStreamObserver(future)); + Subchannel subchannel; + try { + subchannel = future.get().getSubchannel(); + } catch (Exception e) { + String msg = "Failed to get Subchannel: " + subchannelRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(subchannel.getChannelRefList(), writer, visitedSets); + appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets); + appendSockets(subchannel.getSocketRefList(), writer); + writer.println("
SubchannelId: " + subchannelRef.getSubchannelId()); + writer.println("
" + subchannel.toString());
+    writer.println("
"); + } + + private void appendSockets(List socketRefList, PrintWriter writer) { + for (SocketRef socketRef : socketRefList) { + writer.println(""); + writer.println(""); + writer.println("Socket: " + socketRef.getSocketId()); + writer.println(""); + writer.println(""); + appendSocket(socketRef, writer); + writer.println(""); + writer.println(""); + } + } + + private void appendSocket(SocketRef socketRef, PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + channelzService.getSocket( + GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(), + getStreamObserver(future)); + Socket socket; + try { + socket = future.get().getSocket(); + } catch (Exception e) { + String msg = "Failed to get Socket: " + socketRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + writer.println("
" + socket + "
"); + } + + private StreamObserver getStreamObserver(SettableFuture future) { + return new StreamObserver() { + @Nullable T response = null; + + @Override + public void onNext(T message) { + response = message; + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onCompleted() { + future.set(response); + } + }; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index aa15e0a5e1a6..845d54588e79 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -97,6 +97,10 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() { : randomlySelectNextStub(windmillServiceStubs)); } + ImmutableSet getDispatcherEndpoints() { + return dispatcherStubs.get().dispatcherEndpoints(); + } + CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() { ImmutableList windmillMetadataServiceStubs = dispatcherStubs.get().windmillMetadataServiceStubs(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 858aeb159856..f94fc09ac537 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -267,6 +267,11 @@ public void setWindmillServiceEndpoints(Set endpoints) { dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); } + @Override + public ImmutableSet getWindmillServiceEndpoints() { + return dispatcherClient.getDispatcherEndpoints(); + } + @Override public boolean isReady() { return dispatcherClient.hasInitializedEndpoints(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java index cf31436d3647..d8e4c064e974 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java @@ -37,6 +37,7 @@ public final class WindmillChannelFactory { public static final String LOCALHOST = "localhost"; private static final int DEFAULT_GRPC_PORT = 443; + private static final int MAX_REMOTE_TRACE_EVENTS = 100; private WindmillChannelFactory() {} @@ -139,6 +140,7 @@ private static > T withDefaultChannelOpti return channelBuilder .maxInboundMessageSize(Integer.MAX_VALUE) + .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS) .maxInboundMetadataSize(1024 * 1024); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 069fcac07c80..e4985193d1cf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -43,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; @@ -64,6 +64,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; @@ -73,7 +74,7 @@ import org.slf4j.LoggerFactory; /** An in-memory Windmill server that offers provided work and data. */ -final class FakeWindmillServer extends WindmillServerStub { +public class FakeWindmillServer extends WindmillServerStub { private static final Logger LOG = LoggerFactory.getLogger(FakeWindmillServer.class); private final ResponseQueue workToOffer; private final ResponseQueue dataToOffer; @@ -91,6 +92,9 @@ final class FakeWindmillServer extends WindmillServerStub { private boolean dropStreamingCommits = false; private final Consumer> processHeartbeatResponses; + @GuardedBy("this") + private ImmutableSet dispatcherEndpoints; + public FakeWindmillServer( ErrorCollector errorCollector, Function> computationStateFetcher) { @@ -475,8 +479,18 @@ public ArrayList getStatsReceived() { } @Override - public void setWindmillServiceEndpoints(Set endpoints) throws IOException { - isReady = true; + public void setWindmillServiceEndpoints(Set endpoints) { + synchronized (this) { + this.dispatcherEndpoints = ImmutableSet.copyOf(endpoints); + isReady = true; + } + } + + @Override + public ImmutableSet getWindmillServiceEndpoints() { + synchronized (this) { + return dispatcherEndpoints; + } } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java new file mode 100644 index 000000000000..3ec951d9c142 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.junit.Assert.*; + +import java.io.*; +import java.util.Optional; +import org.apache.beam.runners.dataflow.worker.FakeWindmillServer; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.junit.Test; +import org.junit.rules.ErrorCollector; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChannelzServletTest { + + @Test + public void testRendersAllChannels() throws UnsupportedEncodingException { + String windmill1 = "WindmillHost1"; + String windmill2 = "WindmillHost2"; + String nonWindmill1 = "NonWindmillHost1"; + String someOtherHost1 = "SomeOtherHost2"; + ManagedChannel[] unusedChannels = + new ManagedChannel[] { + InProcessChannelBuilder.forName(windmill1).build(), + InProcessChannelBuilder.forName(windmill2).build(), + InProcessChannelBuilder.forName(nonWindmill1).build(), + InProcessChannelBuilder.forName(someOtherHost1).build() + }; + StreamingDataflowWorkerOptions options = + PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); + FakeWindmillServer fakeWindmillServer = + new FakeWindmillServer(new ErrorCollector(), s -> Optional.empty()); + fakeWindmillServer.setWindmillServiceEndpoints( + ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); + options.setChannelzShowOnlyWindmillServiceChannels(false); + ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + channelzServlet.captureData(writer); + writer.flush(); + String channelzData = stringWriter.toString(); + assertTrue(channelzData.contains(windmill1)); + assertTrue(channelzData.contains(windmill2)); + assertTrue(channelzData.contains(nonWindmill1)); + assertTrue(channelzData.contains(someOtherHost1)); + } + + @Test + public void testRendersOnlyWindmillChannels() throws UnsupportedEncodingException { + String windmill1 = "WindmillHost1"; + String windmill2 = "WindmillHost2"; + String nonWindmill1 = "NonWindmillHost1"; + String someOtherHost1 = "SomeOtherHost2"; + ManagedChannel[] unusedChannels = + new ManagedChannel[] { + InProcessChannelBuilder.forName(windmill1).build(), + InProcessChannelBuilder.forName(windmill2).build(), + InProcessChannelBuilder.forName(nonWindmill1).build(), + InProcessChannelBuilder.forName(someOtherHost1).build() + }; + StreamingDataflowWorkerOptions options = + PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); + FakeWindmillServer fakeWindmillServer = + new FakeWindmillServer(new ErrorCollector(), s -> Optional.empty()); + fakeWindmillServer.setWindmillServiceEndpoints( + ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); + options.setChannelzShowOnlyWindmillServiceChannels(true); + ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + channelzServlet.captureData(writer); + writer.flush(); + String channelzData = stringWriter.toString(); + assertTrue(channelzData.contains(windmill1)); + assertTrue(channelzData.contains(windmill2)); + // The logic does a substring match on the target + // NonWindmillHost1 matches since it contains WindmillHost1 which is a windmill host + assertTrue(channelzData.contains(nonWindmill1)); + assertFalse(channelzData.contains(someOtherHost1)); + } +}