From 48579d713a4a164f7754159f622a6d55c2a8f7bc Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 5 Feb 2024 03:19:48 -0800 Subject: [PATCH 1/8] Add Channelz staus page exporting GRPC channelz data --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- .../worker/status/ChannelzServlet.java | 245 ++++++++++++++++++ .../worker/status/WorkerStatusPages.java | 4 + .../grpc/stubs/WindmillChannelFactory.java | 2 + 4 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 10a7ba7f1c84..164ea0eda3e6 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -903,7 +903,7 @@ class BeamModulePlugin implements Plugin { testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version", truth : "com.google.truth:truth:1.1.5", threetenbp : "org.threeten:threetenbp:1.6.8", - vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.1", + vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2", vendored_guava_32_1_2_jre : "org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1", vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2", woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1", diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java new file mode 100644 index 000000000000..a3a781228167 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java @@ -0,0 +1,245 @@ +package org.apache.beam.runners.dataflow.worker.status; + +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** Respond to /channelz with the GRPC channelz data. 
*/ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { + private static final String PATH = "/channelz"; + private static final Logger LOG = LoggerFactory.getLogger(ChannelzServlet.class); + private static final int MAX_TOP_CHANNELS_TO_RETURN = 100; + + private final ChannelzService channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); + + public ChannelzServlet() { + super(PATH); + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + response.setStatus(HttpServletResponse.SC_OK); + PrintWriter writer = response.getWriter(); + captureData(writer); + } + + @Override + public String pageName() { + return PATH; + } + + @Override + public void captureData(PrintWriter writer) { + writer.println(""); + writer.println("

<h1>Channelz</h1>
"); + appendTopChannels(writer); + writer.println(""); + } + + // channelz proto says there may not be cycles in the ref graph + // we track visited ids to prevent any accidental cycles, + static class VisitedSets { + Set channels = new HashSet<>(); + Set subchannels = new HashSet<>(); + } + + private void appendTopChannels(PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + channelzService.getTopChannels( + GetTopChannelsRequest.newBuilder() + .build(), getStreamObserver(future)); + GetTopChannelsResponse topChannelsResponse; + try { + topChannelsResponse = future.get(); + } catch (Exception e) { + String msg = "Failed to get channelz: " + e.getMessage(); + LOG.warn(msg, e); + writer.println(msg); + return; + } + writer.println("

<h2>Top Level Channels</h2>");
+    writer.println("<table>");
+    VisitedSets visitedSets = new VisitedSets();
+    for (Channel channel : topChannelsResponse.getChannelList()) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("TopChannelId: " + channel.getRef().getChannelId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendChannel(channel, writer, visitedSets);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+    writer.println("</table>
"); + } + + + + private void appendChannels(List channelRefs, PrintWriter writer, VisitedSets visitedSets) { + for (ChannelRef channelRef : channelRefs) { + writer.println(""); + writer.println(""); + writer.println("Channel: " + channelRef.getChannelId()); + writer.println(""); + writer.println(""); + appendChannel(channelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.channels.contains(channelRef.getChannelId())) { + String msg = "Duplicate Channel Id: " + channelRef; + LOG.warn(msg); + writer.println(msg); + return; + } + visitedSets.channels.add(channelRef.getChannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getChannel( + GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(), + getStreamObserver(future)); + Channel channel; + try { + channel = future.get().getChannel(); + } catch (Exception e) { + String msg = "Failed to get Channel: " + channelRef; + LOG.warn(msg, e); + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + appendChannel(channel, writer, visitedSets); + } + + private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) { + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(channel.getChannelRefList(), writer, visitedSets); + appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets); + appendSockets(channel.getSocketRefList(), writer); + writer.println("
"); + writer.println("ChannelId: " + channel.getRef().getChannelId()); + writer.println("
" + channel);
+        writer.println("
"); + } + + private void appendSubChannels(List subchannelRefList, PrintWriter writer, VisitedSets visitedSets) { + for (SubchannelRef subchannelRef : subchannelRefList) { + writer.println(""); + writer.println(""); + writer.println("Sub Channel: " + subchannelRef.getSubchannelId()); + writer.println(""); + writer.println(""); + appendSubchannel(subchannelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendSubchannel(SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) { + String msg = "Duplicate Subchannel Id: " + subchannelRef; + LOG.warn(msg); + writer.println(msg); + return; + } + visitedSets.subchannels.add(subchannelRef.getSubchannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getSubchannel( + GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()) + .build(), + getStreamObserver(future)); + Subchannel subchannel; + try { + subchannel = future.get().getSubchannel(); + } catch (Exception e) { + String msg = "Failed to get Subchannel: " + subchannelRef; + LOG.warn(msg, e); + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(subchannel.getChannelRefList(), writer, visitedSets); + appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets); + appendSockets(subchannel.getSocketRefList(), writer); + writer.println("
SubchannelId: " + subchannelRef.getSubchannelId());
+        writer.println("<pre>" + subchannel.toString() + "</pre>");
+        writer.println("
"); + } + + private void appendSockets(List socketRefList, PrintWriter writer) { + for (SocketRef socketRef : socketRefList) { + writer.println(""); + writer.println(""); + writer.println("Socket: " + socketRef.getSocketId()); + writer.println(""); + writer.println(""); + appendSocket(socketRef, writer); + writer.println(""); + writer.println(""); + } + } + + private void appendSocket(SocketRef socketRef, PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + channelzService.getSocket( + GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()) + .build(), + getStreamObserver(future)); + Socket socket; + try { + socket = future.get().getSocket(); + } catch (Exception e) { + String msg = "Failed to get Socket: " + socketRef; + LOG.warn(msg, e); + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + writer.println("
" + socket + "
"); + } + + private StreamObserver getStreamObserver(SettableFuture future) { + return new StreamObserver() { + T response = null; + + @Override + public void onNext(T message) { + response = message; + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onCompleted() { + future.set(response); + } + }; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java index 08515a1fb33e..91c6f6f96ca1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java @@ -47,8 +47,10 @@ public class WorkerStatusPages { private final List capturePages; private final StatuszServlet statuszServlet = new StatuszServlet(); private final ThreadzServlet threadzServlet = new ThreadzServlet(); + private final ChannelzServlet channelzServlet = new ChannelzServlet(); private final ServletHandler servletHandler = new ServletHandler(); + @VisibleForTesting WorkerStatusPages(Server server, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) { this.statusServer = server; @@ -57,6 +59,7 @@ public class WorkerStatusPages { // Install the default servlets (threadz, healthz, heapz, jfrz, statusz) addServlet(threadzServlet); + addServlet(channelzServlet); addServlet(new HealthzServlet(healthyIndicator)); addServlet(new HeapzServlet(memoryMonitor)); if (Environments.getJavaVersion() != Environments.JavaVersion.java8) { @@ -67,6 +70,7 @@ public class WorkerStatusPages { // Add default capture pages (threadz, statusz) this.capturePages.add(threadzServlet); this.capturePages.add(statuszServlet); + this.capturePages.add(channelzServlet); // Add some status pages addStatusDataProvider("resources", "Resources", memoryMonitor); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java index 484d433ca6e7..bd1c5c8bc213 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java @@ -34,6 +34,7 @@ public final class WindmillChannelFactory { public static final String LOCALHOST = "localhost"; private static final int DEFAULT_GRPC_PORT = 443; + private static final int MAX_REMOTE_TRACE_EVENTS = 100; private WindmillChannelFactory() {} @@ -111,6 +112,7 @@ private static Channel createRemoteChannel( .flowControlWindow(10 * 1024 * 1024) .maxInboundMessageSize(Integer.MAX_VALUE) .maxInboundMetadataSize(1024 * 1024) + .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS) .negotiationType(NegotiationType.TLS) // Set ciphers(null) to not use GCM, which is disabled for Dataflow // due to it being horribly slow. 
From 3e65e335e886eb2e283788ad65e4ae78b9fd627b Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 7 Feb 2024 06:46:30 -0800 Subject: [PATCH 2/8] Address review comments --- .../dataflow/worker/BatchDataflowWorker.java | 2 +- .../worker/StreamingDataflowWorker.java | 3 +- .../StreamingDataflowWorkerOptions.java | 5 + .../worker/status/ChannelzServlet.java | 245 --------------- .../worker/status/WorkerStatusPages.java | 27 +- .../worker/windmill/WindmillServerBase.java | 6 + .../worker/windmill/WindmillServerStub.java | 6 + .../windmill/client/grpc/ChannelzServlet.java | 284 ++++++++++++++++++ .../client/grpc/GrpcDispatcherClient.java | 22 +- .../client/grpc/GrpcWindmillServer.java | 5 + .../dataflow/worker/FakeWindmillServer.java | 22 +- .../worker/status/WorkerStatusPagesTest.java | 10 +- .../client/grpc/ChannelzServletTest.java | 103 +++++++ 13 files changed, 471 insertions(+), 269 deletions(-) delete mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java index 7407c97619b4..a94a50d35758 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java @@ -146,7 +146,7 @@ protected BatchDataflowWorker( this.memoryMonitor = MemoryMonitor.fromOptions(options); this.statusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true); + WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true); if (!DataflowRunner.hasExperiment(options, "disable_debug_capture")) { this.debugCaptureManager = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 3ba27bd852fc..2f6ce56ecd50 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -314,7 +314,8 @@ public class StreamingDataflowWorker { this.executorSupplier = executorSupplier; this.windmillServiceEnabled = options.isEnableStreamingEngine(); this.memoryMonitor = MemoryMonitor.fromOptions(options); - this.statusPages = WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor, () -> true); + this.statusPages = + WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, memoryMonitor, () -> true); if (windmillServiceEnabled) { this.debugCaptureManager = new DebugCapture.Manager(options, statusPages.getDebugCapturePages()); diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java index bacfa1eef63b..0bcbe7cd81c8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java @@ -130,6 +130,11 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt void setWindmillMessagesBetweenIsReadyChecks(int value); + @Default.Boolean(true) + boolean getChannelzShowOnlyWindmillServiceChannels(); + + void setChannelzShowOnlyWindmillServiceChannels(boolean value); + /** * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for * backwards compatibility. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java deleted file mode 100644 index a3a781228167..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ChannelzServlet.java +++ /dev/null @@ -1,245 +0,0 @@ -package org.apache.beam.runners.dataflow.worker.status; - -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** Respond to /channelz with the GRPC channelz data. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { - private static final String PATH = "/channelz"; - private static final Logger LOG = LoggerFactory.getLogger(ChannelzServlet.class); - private static final int MAX_TOP_CHANNELS_TO_RETURN = 100; - - private final ChannelzService channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); - - public ChannelzServlet() { - super(PATH); - } - - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - response.setStatus(HttpServletResponse.SC_OK); - PrintWriter writer = response.getWriter(); - captureData(writer); - } - - @Override - public String pageName() { - return PATH; - } - - @Override - public void captureData(PrintWriter writer) { - writer.println(""); - writer.println("

<h1>Channelz</h1>
"); - appendTopChannels(writer); - writer.println(""); - } - - // channelz proto says there may not be cycles in the ref graph - // we track visited ids to prevent any accidental cycles, - static class VisitedSets { - Set channels = new HashSet<>(); - Set subchannels = new HashSet<>(); - } - - private void appendTopChannels(PrintWriter writer) { - SettableFuture future = SettableFuture.create(); - channelzService.getTopChannels( - GetTopChannelsRequest.newBuilder() - .build(), getStreamObserver(future)); - GetTopChannelsResponse topChannelsResponse; - try { - topChannelsResponse = future.get(); - } catch (Exception e) { - String msg = "Failed to get channelz: " + e.getMessage(); - LOG.warn(msg, e); - writer.println(msg); - return; - } - writer.println("

<h2>Top Level Channels</h2>");
-    writer.println("<table>");
-    VisitedSets visitedSets = new VisitedSets();
-    for (Channel channel : topChannelsResponse.getChannelList()) {
-      writer.println("<tr>");
-      writer.println("<td>");
-      writer.println("TopChannelId: " + channel.getRef().getChannelId());
-      writer.println("</td>");
-      writer.println("<td>");
-      appendChannel(channel, writer, visitedSets);
-      writer.println("</td>");
-      writer.println("</tr>");
-    }
-    writer.println("</table>
"); - } - - - - private void appendChannels(List channelRefs, PrintWriter writer, VisitedSets visitedSets) { - for (ChannelRef channelRef : channelRefs) { - writer.println(""); - writer.println(""); - writer.println("Channel: " + channelRef.getChannelId()); - writer.println(""); - writer.println(""); - appendChannel(channelRef, writer, visitedSets); - writer.println(""); - writer.println(""); - } - } - - private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) { - if (visitedSets.channels.contains(channelRef.getChannelId())) { - String msg = "Duplicate Channel Id: " + channelRef; - LOG.warn(msg); - writer.println(msg); - return; - } - visitedSets.channels.add(channelRef.getChannelId()); - SettableFuture future = SettableFuture.create(); - channelzService.getChannel( - GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(), - getStreamObserver(future)); - Channel channel; - try { - channel = future.get().getChannel(); - } catch (Exception e) { - String msg = "Failed to get Channel: " + channelRef; - LOG.warn(msg, e); - writer.println(msg + " Exception: " + e.getMessage()); - return; - } - appendChannel(channel, writer, visitedSets); - } - - private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) { - writer.println(""); - writer.println(""); - writer.println(""); - writer.println(""); - writer.println(""); - appendChannels(channel.getChannelRefList(), writer, visitedSets); - appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets); - appendSockets(channel.getSocketRefList(), writer); - writer.println("
"); - writer.println("ChannelId: " + channel.getRef().getChannelId()); - writer.println("
" + channel);
-        writer.println("
"); - } - - private void appendSubChannels(List subchannelRefList, PrintWriter writer, VisitedSets visitedSets) { - for (SubchannelRef subchannelRef : subchannelRefList) { - writer.println(""); - writer.println(""); - writer.println("Sub Channel: " + subchannelRef.getSubchannelId()); - writer.println(""); - writer.println(""); - appendSubchannel(subchannelRef, writer, visitedSets); - writer.println(""); - writer.println(""); - } - } - - private void appendSubchannel(SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) { - if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) { - String msg = "Duplicate Subchannel Id: " + subchannelRef; - LOG.warn(msg); - writer.println(msg); - return; - } - visitedSets.subchannels.add(subchannelRef.getSubchannelId()); - SettableFuture future = SettableFuture.create(); - channelzService.getSubchannel( - GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()) - .build(), - getStreamObserver(future)); - Subchannel subchannel; - try { - subchannel = future.get().getSubchannel(); - } catch (Exception e) { - String msg = "Failed to get Subchannel: " + subchannelRef; - LOG.warn(msg, e); - writer.println(msg + " Exception: " + e.getMessage()); - return; - } - - writer.println(""); - writer.println(""); - writer.println(""); - writer.println(""); - writer.println(""); - appendChannels(subchannel.getChannelRefList(), writer, visitedSets); - appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets); - appendSockets(subchannel.getSocketRefList(), writer); - writer.println("
SubchannelId: " + subchannelRef.getSubchannelId());
-        writer.println("<pre>" + subchannel.toString() + "</pre>");
-        writer.println("
"); - } - - private void appendSockets(List socketRefList, PrintWriter writer) { - for (SocketRef socketRef : socketRefList) { - writer.println(""); - writer.println(""); - writer.println("Socket: " + socketRef.getSocketId()); - writer.println(""); - writer.println(""); - appendSocket(socketRef, writer); - writer.println(""); - writer.println(""); - } - } - - private void appendSocket(SocketRef socketRef, PrintWriter writer) { - SettableFuture future = SettableFuture.create(); - channelzService.getSocket( - GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()) - .build(), - getStreamObserver(future)); - Socket socket; - try { - socket = future.get().getSocket(); - } catch (Exception e) { - String msg = "Failed to get Socket: " + socketRef; - LOG.warn(msg, e); - writer.println(msg + " Exception: " + e.getMessage()); - return; - } - writer.println("
" + socket + "
"); - } - - private StreamObserver getStreamObserver(SettableFuture future) { - return new StreamObserver() { - T response = null; - - @Override - public void onNext(T message) { - response = message; - } - - @Override - public void onError(Throwable throwable) { - future.setException(throwable); - } - - @Override - public void onCompleted() { - future.set(response); - } - }; - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java index 91c6f6f96ca1..b78a9d8e225c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java @@ -26,8 +26,11 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletHandler; @@ -47,19 +50,21 @@ public class WorkerStatusPages { private final List capturePages; private final StatuszServlet statuszServlet = new StatuszServlet(); private final ThreadzServlet threadzServlet = new ThreadzServlet(); - private final ChannelzServlet channelzServlet = new ChannelzServlet(); private final ServletHandler servletHandler = new ServletHandler(); - @VisibleForTesting - WorkerStatusPages(Server server, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) { + WorkerStatusPages( + DataflowWorkerHarnessOptions options, + Server server, + MemoryMonitor memoryMonitor, + BooleanSupplier healthyIndicator) { this.statusServer = server; this.capturePages = new ArrayList<>(); this.statusServer.setHandler(servletHandler); // Install the default servlets (threadz, healthz, heapz, jfrz, statusz) addServlet(threadzServlet); - addServlet(channelzServlet); + addServlet(new HealthzServlet(healthyIndicator)); addServlet(new HeapzServlet(memoryMonitor)); if (Environments.getJavaVersion() != Environments.JavaVersion.java8) { @@ -70,18 +75,26 @@ public class WorkerStatusPages { // Add default capture pages (threadz, statusz) this.capturePages.add(threadzServlet); this.capturePages.add(statuszServlet); - this.capturePages.add(channelzServlet); + if (options.isStreaming()) { + ChannelzServlet channelzServlet = + new ChannelzServlet(options.as(StreamingDataflowWorkerOptions.class)); + addServlet(channelzServlet); + this.capturePages.add(channelzServlet); + } // Add some status pages addStatusDataProvider("resources", "Resources", memoryMonitor); } public static WorkerStatusPages create( - int defaultStatusPort, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) { + DataflowWorkerHarnessOptions options, + int defaultStatusPort, + MemoryMonitor memoryMonitor, + BooleanSupplier healthyIndicator) { 
int statusPort = defaultStatusPort; if (System.getProperties().containsKey("status_port")) { statusPort = Integer.parseInt(System.getProperty("status_port")); } - return new WorkerStatusPages(new Server(statusPort), memoryMonitor, healthyIndicator); + return new WorkerStatusPages(options, new Server(statusPort), memoryMonitor, healthyIndicator); } /** Start the server. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java index 8caa79cd3f76..a1160b4f98d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** @@ -53,6 +54,11 @@ public void setWindmillServiceEndpoints(Set endpoints) throws IOExc // This class is used for windmill appliance and local runner tests. } + @Override + public ImmutableSet getWindmillServiceEndpoints() { + return ImmutableSet.of(); + } + @Override public boolean isReady() { return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java index 25581bee2089..19022fda1c75 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java @@ -28,6 +28,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** Stub for communicating with a Windmill server. */ @@ -43,6 +44,11 @@ public abstract class WindmillServerStub implements StatusDataProvider { */ public abstract void setWindmillServiceEndpoints(Set endpoints) throws IOException; + /* + * Returns the windmill service endpoints set by setWindmillServiceEndpoints + */ + public abstract ImmutableSet getWindmillServiceEndpoints(); + /** Returns true iff this WindmillServerStub is ready for making API calls. 
*/ public abstract boolean isReady(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java new file mode 100644 index 000000000000..30c4df774459 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; +import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; +import org.apache.beam.runners.dataflow.worker.status.DebugCapture; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; + +/** Respond to /channelz with the GRPC channelz data. 
*/ +public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { + private static final String PATH = "/channelz"; + private static final int MAX_TOP_CHANNELS_TO_RETURN = 100; + + private final ChannelzService channelzService; + private final WindmillServerStub windmillServerStub; + private final boolean showOnlyWindmillServiceChannels; + + public ChannelzServlet(StreamingDataflowWorkerOptions options) { + super(PATH); + channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); + windmillServerStub = options.getWindmillServerStub(); + showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels(); + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + response.setStatus(HttpServletResponse.SC_OK); + PrintWriter writer = response.getWriter(); + captureData(writer); + } + + @Override + public String pageName() { + return PATH; + } + + @Override + public void captureData(PrintWriter writer) { + writer.println(""); + writer.println("

<h1>Channelz</h1>
"); + appendTopChannels(writer); + writer.println(""); + } + + // channelz proto says there won't be cycles in the ref graph. + // we track visited ids to be defensive and prevent any accidental cycles. + static class VisitedSets { + Set channels = new HashSet<>(); + Set subchannels = new HashSet<>(); + } + + private void appendTopChannels(PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + channelzService.getTopChannels( + GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future)); + GetTopChannelsResponse topChannelsResponse; + try { + topChannelsResponse = future.get(); + } catch (Exception e) { + String msg = "Failed to get channelz: " + e.getMessage(); + writer.println(msg); + return; + } + + List topChannels = topChannelsResponse.getChannelList(); + if (showOnlyWindmillServiceChannels) { + topChannels = filterWindmillChannels(topChannels); + } + writer.println("

<h2>Top Level Channels</h2>");
+    writer.println("<table>");
+    VisitedSets visitedSets = new VisitedSets();
+    for (Channel channel : topChannels) {
+      writer.println("<tr>");
+      writer.println("<td>");
+      writer.println("TopChannelId: " + channel.getRef().getChannelId());
+      writer.println("</td>");
+      writer.println("<td>");
+      appendChannel(channel, writer, visitedSets);
+      writer.println("</td>");
+      writer.println("</tr>");
+    }
+    writer.println("</table>
"); + } + + private List filterWindmillChannels(List channels) { + ImmutableSet windmillServiceEndpoints = + windmillServerStub.getWindmillServiceEndpoints(); + Set windmillServiceHosts = + windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet()); + List windmillChannels = new ArrayList<>(); + for (Channel channel : channels) { + for (String windmillServiceHost : windmillServiceHosts) { + if (channel.getData().getTarget().contains(windmillServiceHost)) { + windmillChannels.add(channel); + break; + } + } + } + return windmillChannels; + } + + private void appendChannels( + List channelRefs, PrintWriter writer, VisitedSets visitedSets) { + for (ChannelRef channelRef : channelRefs) { + writer.println(""); + writer.println(""); + writer.println("Channel: " + channelRef.getChannelId()); + writer.println(""); + writer.println(""); + appendChannel(channelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.channels.contains(channelRef.getChannelId())) { + String msg = "Duplicate Channel Id: " + channelRef; + writer.println(msg); + return; + } + visitedSets.channels.add(channelRef.getChannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getChannel( + GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(), + getStreamObserver(future)); + Channel channel; + try { + channel = future.get().getChannel(); + } catch (Exception e) { + String msg = "Failed to get Channel: " + channelRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + appendChannel(channel, writer, visitedSets); + } + + private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) { + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(channel.getChannelRefList(), writer, visitedSets); + appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets); + appendSockets(channel.getSocketRefList(), writer); + writer.println("
"); + writer.println("ChannelId: " + channel.getRef().getChannelId()); + writer.println("
" + channel);
+    writer.println("
"); + } + + private void appendSubChannels( + List subchannelRefList, PrintWriter writer, VisitedSets visitedSets) { + for (SubchannelRef subchannelRef : subchannelRefList) { + writer.println(""); + writer.println(""); + writer.println("Sub Channel: " + subchannelRef.getSubchannelId()); + writer.println(""); + writer.println(""); + appendSubchannel(subchannelRef, writer, visitedSets); + writer.println(""); + writer.println(""); + } + } + + private void appendSubchannel( + SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) { + if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) { + String msg = "Duplicate Subchannel Id: " + subchannelRef; + writer.println(msg); + return; + } + visitedSets.subchannels.add(subchannelRef.getSubchannelId()); + SettableFuture future = SettableFuture.create(); + channelzService.getSubchannel( + GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(), + getStreamObserver(future)); + Subchannel subchannel; + try { + subchannel = future.get().getSubchannel(); + } catch (Exception e) { + String msg = "Failed to get Subchannel: " + subchannelRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println(""); + appendChannels(subchannel.getChannelRefList(), writer, visitedSets); + appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets); + appendSockets(subchannel.getSocketRefList(), writer); + writer.println("
SubchannelId: " + subchannelRef.getSubchannelId());
+    writer.println("<pre>" + subchannel.toString() + "</pre>");
+    writer.println("
"); + } + + private void appendSockets(List socketRefList, PrintWriter writer) { + for (SocketRef socketRef : socketRefList) { + writer.println(""); + writer.println(""); + writer.println("Socket: " + socketRef.getSocketId()); + writer.println(""); + writer.println(""); + appendSocket(socketRef, writer); + writer.println(""); + writer.println(""); + } + } + + private void appendSocket(SocketRef socketRef, PrintWriter writer) { + SettableFuture future = SettableFuture.create(); + channelzService.getSocket( + GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(), + getStreamObserver(future)); + Socket socket; + try { + socket = future.get().getSocket(); + } catch (Exception e) { + String msg = "Failed to get Socket: " + socketRef; + writer.println(msg + " Exception: " + e.getMessage()); + return; + } + writer.println("
" + socket + "
"); + } + + private StreamObserver getStreamObserver(SettableFuture future) { + return new StreamObserver() { + @Nullable T response = null; + + @Override + public void onNext(T message) { + response = message; + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onCompleted() { + future.set(response); + } + }; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index ef9156f9c050..a4e70f540dda 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -21,7 +21,6 @@ import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; @@ -48,7 +47,7 @@ class GrpcDispatcherClient { private final List dispatcherStubs; @GuardedBy("this") - private final Set dispatcherEndpoints; + private ImmutableSet dispatcherEndpoints; @GuardedBy("this") private final Random rand; @@ -56,7 +55,7 @@ class GrpcDispatcherClient { private GrpcDispatcherClient( WindmillStubFactory windmillStubFactory, List dispatcherStubs, - Set dispatcherEndpoints, + ImmutableSet dispatcherEndpoints, Random rand) { this.windmillStubFactory = windmillStubFactory; this.dispatcherStubs = dispatcherStubs; @@ -66,7 +65,7 @@ private GrpcDispatcherClient( static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { return new GrpcDispatcherClient( - windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random()); + windmillStubFactory, new ArrayList<>(), ImmutableSet.of(), new Random()); } @VisibleForTesting @@ -76,7 +75,10 @@ static GrpcDispatcherClient forTesting( Set dispatcherEndpoints) { Preconditions.checkArgument(dispatcherEndpoints.size() == dispatcherStubs.size()); return new GrpcDispatcherClient( - windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new Random()); + windmillGrpcStubFactory, + dispatcherStubs, + ImmutableSet.copyOf(dispatcherEndpoints), + new Random()); } synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() { @@ -88,6 +90,10 @@ synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() { : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size()))); } + synchronized ImmutableSet getDispatcherEndpoints() { + return dispatcherEndpoints; + } + synchronized boolean isReady() { return !dispatcherStubs.isEmpty(); } @@ -114,10 +120,8 @@ private synchronized void resetDispatcherEndpoints( ImmutableSet newDispatcherEndpoints) { LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", newDispatcherEndpoints); this.dispatcherStubs.clear(); - this.dispatcherEndpoints.clear(); - this.dispatcherEndpoints.addAll(newDispatcherEndpoints); - - dispatcherEndpoints.stream() + this.dispatcherEndpoints = ImmutableSet.copyOf(newDispatcherEndpoints); + this.dispatcherEndpoints.stream() .map(this::createDispatcherStubForWindmillService) .forEach(dispatcherStubs::add); } diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 64ee5ae0f969..3465b2e0b9c4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -246,6 +246,11 @@ public void setWindmillServiceEndpoints(Set endpoints) { dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); } + @Override + public ImmutableSet getWindmillServiceEndpoints() { + return dispatcherClient.getDispatcherEndpoints(); + } + @Override public boolean isReady() { return dispatcherClient.isReady(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 2cfec6d3139a..bb85ba39039c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationCommitWorkRequest; @@ -61,6 +61,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; @@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory; /** An in-memory Windmill server that offers provided work and data. 
*/ -class FakeWindmillServer extends WindmillServerStub { +public class FakeWindmillServer extends WindmillServerStub { private static final Logger LOG = LoggerFactory.getLogger(FakeWindmillServer.class); private final ResponseQueue workToOffer; private final ResponseQueue dataToOffer; @@ -88,6 +89,9 @@ class FakeWindmillServer extends WindmillServerStub { private boolean dropStreamingCommits = false; private Consumer> processHeartbeatResponses; + @GuardedBy("this") + private ImmutableSet dispatcherEndpoints; + public FakeWindmillServer(ErrorCollector errorCollector) { workToOffer = new ResponseQueue() @@ -476,8 +480,18 @@ public ArrayList getStatsReceived() { } @Override - public void setWindmillServiceEndpoints(Set endpoints) throws IOException { - isReady = true; + public void setWindmillServiceEndpoints(Set endpoints) { + synchronized (this) { + this.dispatcherEndpoints = ImmutableSet.copyOf(endpoints); + isReady = true; + } + } + + @Override + public ImmutableSet getWindmillServiceEndpoints() { + synchronized (this) { + return dispatcherEndpoints; + } } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java index bea39bd7822e..9cda88f29531 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java @@ -21,7 +21,9 @@ import static org.hamcrest.Matchers.containsString; import java.util.function.BooleanSupplier; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.Server; import org.junit.After; @@ -45,7 +47,9 @@ public class WorkerStatusPagesTest { @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - wsp = new WorkerStatusPages(server, mockMemoryMonitor, mockHealthyIndicator); + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class); + wsp = new WorkerStatusPages(options, server, mockMemoryMonitor, mockHealthyIndicator); server.addConnector(connector); wsp.start(); } @@ -74,9 +78,11 @@ public void testHealthzHealthy() throws Exception { @Test public void testHealthzUnhealthy() throws Exception { + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class); // set up WorkerStatusPages that respond unhealthy status on "healthz" wsp.stop(); - wsp = new WorkerStatusPages(server, mockMemoryMonitor, () -> false); + wsp = new WorkerStatusPages(options, server, mockMemoryMonitor, () -> false); wsp.start(); String response = getPage("/healthz"); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java new file mode 100644 index 000000000000..212ae1707a4b --- /dev/null +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.junit.Assert.*; + +import java.io.*; +import org.apache.beam.runners.dataflow.worker.FakeWindmillServer; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.junit.Test; +import org.junit.rules.ErrorCollector; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChannelzServletTest { + + @Test + public void testRendersAllChannels() throws UnsupportedEncodingException { + String windmill1 = "WindmillHost1"; + String windmill2 = "WindmillHost2"; + String nonWindmill1 = "NonWindmillHost1"; + String someOtherHost1 = "SomeOtherHost2"; + ManagedChannel[] unusedChannels = + new ManagedChannel[] { + InProcessChannelBuilder.forName(windmill1).build(), + InProcessChannelBuilder.forName(windmill2).build(), + InProcessChannelBuilder.forName(nonWindmill1).build(), + InProcessChannelBuilder.forName(someOtherHost1).build() + }; + StreamingDataflowWorkerOptions options = + PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); + FakeWindmillServer fakeWindmillServer = new FakeWindmillServer(new ErrorCollector()); + fakeWindmillServer.setWindmillServiceEndpoints( + ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); + options.setWindmillServerStub(fakeWindmillServer); + options.setChannelzShowOnlyWindmillServiceChannels(false); + ChannelzServlet channelzServlet = new ChannelzServlet(options); + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + channelzServlet.captureData(writer); + writer.flush(); + String channelzData = stringWriter.toString(); + assertTrue(channelzData.contains(windmill1)); + assertTrue(channelzData.contains(windmill2)); + assertTrue(channelzData.contains(nonWindmill1)); + assertTrue(channelzData.contains(someOtherHost1)); + } + + @Test + public void testRendersOnlyWindmillChannels() throws UnsupportedEncodingException { + String windmill1 = "WindmillHost1"; + String windmill2 = "WindmillHost2"; + String nonWindmill1 = "NonWindmillHost1"; + String 
someOtherHost1 = "SomeOtherHost2"; + ManagedChannel[] unusedChannels = + new ManagedChannel[] { + InProcessChannelBuilder.forName(windmill1).build(), + InProcessChannelBuilder.forName(windmill2).build(), + InProcessChannelBuilder.forName(nonWindmill1).build(), + InProcessChannelBuilder.forName(someOtherHost1).build() + }; + StreamingDataflowWorkerOptions options = + PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); + FakeWindmillServer fakeWindmillServer = new FakeWindmillServer(new ErrorCollector()); + fakeWindmillServer.setWindmillServiceEndpoints( + ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); + options.setWindmillServerStub(fakeWindmillServer); + options.setChannelzShowOnlyWindmillServiceChannels(true); + ChannelzServlet channelzServlet = new ChannelzServlet(options); + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + channelzServlet.captureData(writer); + writer.flush(); + String channelzData = stringWriter.toString(); + assertTrue(channelzData.contains(windmill1)); + assertTrue(channelzData.contains(windmill2)); + // The logic does a substring match on the target + // NonWindmillHost1 matches since it contains WindmillHost1 which is a windmill host + assertTrue(channelzData.contains(nonWindmill1)); + assertFalse(channelzData.contains(someOtherHost1)); + } +} From 5acf3ac7092b71066432e9844607c4fd9933e250 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 7 Feb 2024 06:49:49 -0800 Subject: [PATCH 3/8] doc fix --- .../dataflow/worker/options/StreamingDataflowWorkerOptions.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java index 0bcbe7cd81c8..cac77ed34098 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java @@ -130,6 +130,7 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt void setWindmillMessagesBetweenIsReadyChecks(int value); + @Description("If true, will only show windmill service channels on /channelz") @Default.Boolean(true) boolean getChannelzShowOnlyWindmillServiceChannels(); From 991fc86b816f99070b750bd15849b47be90a4e1d Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 15 Feb 2024 22:59:54 -0800 Subject: [PATCH 4/8] cleanups --- .../beam/runners/dataflow/worker/BatchDataflowWorker.java | 2 +- .../runners/dataflow/worker/StreamingDataflowWorker.java | 3 +-- .../runners/dataflow/worker/status/WorkerStatusPages.java | 7 +------ .../worker/windmill/client/grpc/GrpcDispatcherClient.java | 2 +- .../dataflow/worker/status/WorkerStatusPagesTest.java | 6 ------ 5 files changed, 4 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java index a94a50d35758..7407c97619b4 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java @@ -146,7 +146,7 @@ protected BatchDataflowWorker( this.memoryMonitor = MemoryMonitor.fromOptions(options); this.statusPages = - WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true); + WorkerStatusPages.create(DEFAULT_STATUS_PORT, this.memoryMonitor, () -> true); if (!DataflowRunner.hasExperiment(options, "disable_debug_capture")) { this.debugCaptureManager = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index a55bf4227fd0..06c05bfcc5bb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -322,8 +322,7 @@ public class StreamingDataflowWorker { this.executorSupplier = executorSupplier; this.windmillServiceEnabled = options.isEnableStreamingEngine(); this.memoryMonitor = MemoryMonitor.fromOptions(options); - this.statusPages = - WorkerStatusPages.create(options, DEFAULT_STATUS_PORT, memoryMonitor, () -> true); + this.statusPages = WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor, () -> true); if (windmillServiceEnabled) { this.debugCaptureManager = new DebugCapture.Manager(options, statusPages.getDebugCapturePages()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java index 5407d198dc60..aaa6cd6d6d36 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java @@ -25,7 +25,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.sdk.util.construction.Environments; @@ -58,7 +57,6 @@ public class WorkerStatusPages { // Install the default servlets (threadz, healthz, heapz, jfrz, statusz) addServlet(threadzServlet); - addServlet(new HealthzServlet(healthyIndicator)); addServlet(new HeapzServlet(memoryMonitor)); if (Environments.getJavaVersion() != Environments.JavaVersion.java8) { @@ -74,10 +72,7 @@ public class WorkerStatusPages { } public static WorkerStatusPages create( - DataflowWorkerHarnessOptions options, - int defaultStatusPort, - MemoryMonitor memoryMonitor, - BooleanSupplier healthyIndicator) { + int defaultStatusPort, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) { int statusPort = defaultStatusPort; if (System.getProperties().containsKey("status_port")) { statusPort = 
Integer.parseInt(System.getProperty("status_port")); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 1ada9938f6e8..845d54588e79 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -97,7 +97,7 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() { : randomlySelectNextStub(windmillServiceStubs)); } - synchronized ImmutableSet getDispatcherEndpoints() { + ImmutableSet getDispatcherEndpoints() { return dispatcherStubs.get().dispatcherEndpoints(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java index 75d814ad3d76..bea39bd7822e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPagesTest.java @@ -21,9 +21,7 @@ import static org.hamcrest.Matchers.containsString; import java.util.function.BooleanSupplier; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.Server; import org.junit.After; @@ -47,8 +45,6 @@ public class WorkerStatusPagesTest { @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - DataflowWorkerHarnessOptions options = - PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class); wsp = new WorkerStatusPages(server, mockMemoryMonitor, mockHealthyIndicator); server.addConnector(connector); wsp.start(); @@ -78,8 +74,6 @@ public void testHealthzHealthy() throws Exception { @Test public void testHealthzUnhealthy() throws Exception { - DataflowWorkerHarnessOptions options = - PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class); // set up WorkerStatusPages that respond unhealthy status on "healthz" wsp.stop(); wsp = new WorkerStatusPages(server, mockMemoryMonitor, () -> false); From 10453f7130877a6e05a38fdb49c97bf6707962f2 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 16 Feb 2024 02:32:18 -0800 Subject: [PATCH 5/8] Address comments --- .../dataflow/worker/StreamingDataflowWorker.java | 4 +++- .../windmill/client/grpc/ChannelzServlet.java | 13 +++++++------ .../windmill/client/grpc/ChannelzServletTest.java | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 06c05bfcc5bb..2e0156bae778 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -212,6 +212,7 @@ public class StreamingDataflowWorker { private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5); private static final Random clientIdGenerator = new Random(); + private static final String CHANNELZ_PATH = "/channelz"; final WindmillStateCache stateCache; // Maps from computation ids to per-computation state. private final ConcurrentMap computationMap; @@ -738,7 +739,7 @@ public void startStatusPages() { } if (windmillServiceEnabled) { - ChannelzServlet channelzServlet = new ChannelzServlet(options, windmillServer); + ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, options, windmillServer); statusPages.addServlet(channelzServlet); statusPages.addCapturePage(channelzServlet); } @@ -2089,6 +2090,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro } private class MetricsDataProvider implements StatusDataProvider { + @Override public void appendSummaryHtml(PrintWriter writer) { writer.println(workUnitExecutor.summaryHtml()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java index 7e41335fd5b5..6b28b3ab6d7f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -39,18 +39,18 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; -/** Respond to /channelz with the GRPC channelz data. */ +/** Respond to /path with the GRPC channelz data. */ public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { - private static final String PATH = "/channelz"; - private static final int MAX_TOP_CHANNELS_TO_RETURN = 100; + + private static final int MAX_TOP_CHANNELS_TO_RETURN = 500; private final ChannelzService channelzService; private final WindmillServerStub windmillServerStub; private final boolean showOnlyWindmillServiceChannels; public ChannelzServlet( - StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) { - super(PATH); + String path, StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) { + super(path); channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); this.windmillServerStub = windmillServerStub; showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels(); @@ -66,7 +66,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) @Override public String pageName() { - return PATH; + return getPath(); } @Override @@ -80,6 +80,7 @@ public void captureData(PrintWriter writer) { // channelz proto says there won't be cycles in the ref graph. // we track visited ids to be defensive and prevent any accidental cycles. 
static class VisitedSets { + Set channels = new HashSet<>(); Set subchannels = new HashSet<>(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java index dd3f31b69075..3ec951d9c142 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java @@ -56,7 +56,7 @@ public void testRendersAllChannels() throws UnsupportedEncodingException { fakeWindmillServer.setWindmillServiceEndpoints( ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); options.setChannelzShowOnlyWindmillServiceChannels(false); - ChannelzServlet channelzServlet = new ChannelzServlet(options, fakeWindmillServer); + ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); channelzServlet.captureData(writer); @@ -88,7 +88,7 @@ public void testRendersOnlyWindmillChannels() throws UnsupportedEncodingExceptio fakeWindmillServer.setWindmillServiceEndpoints( ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); options.setChannelzShowOnlyWindmillServiceChannels(true); - ChannelzServlet channelzServlet = new ChannelzServlet(options, fakeWindmillServer); + ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); channelzServlet.captureData(writer); From d10b43b008d1eaa40cf75a3085a78856e764e4f4 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 16 Feb 2024 03:03:55 -0800 Subject: [PATCH 6/8] doc fix --- .../dataflow/worker/windmill/client/grpc/ChannelzServlet.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java index 6b28b3ab6d7f..0e1e524e6dff 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -87,6 +87,10 @@ static class VisitedSets { private void appendTopChannels(PrintWriter writer) { SettableFuture future = SettableFuture.create(); + // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels + // in the worker, we might not return all the windmill channels. If we run into + // such situations, this logic can be modified to loop till we see an empty + // GetTopChannelsResponse response with the end bit set. 
channelzService.getTopChannels( GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future)); GetTopChannelsResponse topChannelsResponse; From d70e4e9cd57f77da2dedbe9fe4c982eae6959aea Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 16 Feb 2024 03:08:44 -0800 Subject: [PATCH 7/8] make VisitedSets private --- .../dataflow/worker/windmill/client/grpc/ChannelzServlet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java index 0e1e524e6dff..9b315ec49aad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -79,7 +79,7 @@ public void captureData(PrintWriter writer) { // channelz proto says there won't be cycles in the ref graph. // we track visited ids to be defensive and prevent any accidental cycles. - static class VisitedSets { + private static class VisitedSets { Set channels = new HashSet<>(); Set subchannels = new HashSet<>(); From 631a96eb0be59f83e31c387ac89a1dde3233611b Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 16 Feb 2024 03:12:07 -0800 Subject: [PATCH 8/8] mark ChannelzServlet as Internal --- .../dataflow/worker/windmill/client/grpc/ChannelzServlet.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java index 9b315ec49aad..9ab02788603e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; import org.apache.beam.runners.dataflow.worker.status.DebugCapture; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; @@ -40,6 +41,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; /** Respond to /path with the GRPC channelz data. */ +@Internal public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable { private static final int MAX_TOP_CHANNELS_TO_RETURN = 500;
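
Patch 6/8 adds an IDEA comment in appendTopChannels() pointing out that a worker with more than MAX_TOP_CHANNELS_TO_RETURN top channels may not have all of its windmill channels returned by a single GetTopChannels call, and that the logic could be extended to loop until a GetTopChannelsResponse arrives with the end bit set. Below is a minimal sketch of that loop, assuming it lives inside ChannelzServlet, reuses the servlet's channelzService field and the getStreamObserver(SettableFuture) helper already used by appendTopChannels(), and has the usual java.util and channelz proto imports available; the method name getAllTopChannels and the exception handling are illustrative only, not part of the patch:

    // Sketch: page through GetTopChannels until the server reports the end of the list.
    private List<Channel> getAllTopChannels() {
      List<Channel> allChannels = new ArrayList<>();
      long startChannelId = 0;
      while (true) {
        SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
        channelzService.getTopChannels(
            GetTopChannelsRequest.newBuilder().setStartChannelId(startChannelId).build(),
            getStreamObserver(future));
        GetTopChannelsResponse page;
        try {
          page = future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e);
        }
        allChannels.addAll(page.getChannelList());
        // The channelz service sets `end` on the page containing the highest channel id.
        if (page.getEnd() || page.getChannelCount() == 0) {
          return allChannels;
        }
        // Resume just past the largest channel id already returned.
        startChannelId =
            page.getChannel(page.getChannelCount() - 1).getRef().getChannelId() + 1;
      }
    }

With such a helper, appendTopChannels() could render the accumulated list instead of a single GetTopChannelsResponse, leaving the per-channel HTML rendering unchanged.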