Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Add Channelz status page exporting GRPC channelz data #30211

Merged
merged 9 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version",
truth : "com.google.truth:truth:1.1.5",
threetenbp : "org.threeten:threetenbp:1.6.8",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
vendored_guava_32_1_2_jre : "org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package org.apache.beam.runners.dataflow.worker.status;

import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Respond to /channelz with the GRPC channelz data.
 *
 * <p>The page is rendered as nested HTML tables: one row per top-level channel, and inside each
 * channel one row per referenced child channel, subchannel and socket. Every entity is fetched
 * from the in-process {@link ChannelzService} and printed using its protobuf text representation.
 *
 * <p>All dynamic text (protobuf dumps and exception messages) is HTML-escaped before it is
 * written, so characters such as {@code <} or {@code &} in channel targets or error messages
 * cannot corrupt or inject markup into the status page.
 */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable {
  private static final String PATH = "/channelz";
  private static final Logger LOG = LoggerFactory.getLogger(ChannelzServlet.class);
  // Passed to ChannelzService.newInstance; presumably the maximum number of top-level channels
  // returned per GetTopChannels call -- confirm against the ChannelzService documentation.
  private static final int MAX_TOP_CHANNELS_TO_RETURN = 100;

  private final ChannelzService channelzService =
      ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);

  public ChannelzServlet() {
    super(PATH);
  }

  /**
   * Serves the channelz page.
   *
   * @param request the incoming request (not inspected)
   * @param response the response the HTML page is written to
   * @throws IOException if the response writer cannot be obtained
   */
  @Override
  protected void doGet(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    // The body is HTML; declare it (and the charset) before the writer is obtained so the
    // encoding applies to the writer.
    response.setContentType("text/html;charset=utf-8");
    response.setStatus(HttpServletResponse.SC_OK);
    PrintWriter writer = response.getWriter();
    captureData(writer);
  }

  @Override
  public String pageName() {
    return PATH;
  }

  /**
   * Writes the full channelz HTML document to {@code writer}.
   *
   * @param writer destination for the rendered page
   */
  @Override
  public void captureData(PrintWriter writer) {
    writer.println("<html>");
    writer.println("<h1>Channelz</h1>");
    appendTopChannels(writer);
    writer.println("</html>");
  }

  // channelz proto says there may not be cycles in the ref graph
  // we track visited ids to prevent any accidental cycles,
  static class VisitedSets {
    final Set<Long> channels = new HashSet<>();
    final Set<Long> subchannels = new HashSet<>();
  }

  /** Fetches the top-level channels and renders each of them, with its children, as a table row. */
  private void appendTopChannels(PrintWriter writer) {
    SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
    channelzService.getTopChannels(
        GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
    GetTopChannelsResponse topChannelsResponse;
    try {
      topChannelsResponse = future.get();
    } catch (Exception e) {
      restoreInterruptIfNeeded(e);
      String msg = "Failed to get channelz: " + e.getMessage();
      LOG.warn(msg, e);
      writer.println(escapeHtml(msg));
      return;
    }
    writer.println("<h2>Top Level Channels</h2>");
    writer.println("<table border='1'>");
    VisitedSets visitedSets = new VisitedSets();
    for (Channel channel : topChannelsResponse.getChannelList()) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("TopChannelId: " + channel.getRef().getChannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendChannel(channel, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
    writer.println("</table>");
  }

  /** Renders one table row per referenced child channel. */
  private void appendChannels(
      List<ChannelRef> channelRefs, PrintWriter writer, VisitedSets visitedSets) {
    for (ChannelRef channelRef : channelRefs) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Channel: " + channelRef.getChannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendChannel(channelRef, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /**
   * Resolves {@code channelRef} through the channelz service and renders the channel. A channel
   * id that was already visited is reported as a duplicate instead of being rendered again.
   */
  private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) {
    // Set.add returns false when the id is already present, i.e. the channel was visited before.
    if (!visitedSets.channels.add(channelRef.getChannelId())) {
      String msg = "Duplicate Channel Id: " + channelRef;
      LOG.warn(msg);
      writer.println(escapeHtml(msg));
      return;
    }
    SettableFuture<GetChannelResponse> future = SettableFuture.create();
    channelzService.getChannel(
        GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(),
        getStreamObserver(future));
    Channel channel;
    try {
      channel = future.get().getChannel();
    } catch (Exception e) {
      restoreInterruptIfNeeded(e);
      String msg = "Failed to get Channel: " + channelRef;
      LOG.warn(msg, e);
      writer.println(escapeHtml(msg + " Exception: " + e.getMessage()));
      return;
    }
    appendChannel(channel, writer, visitedSets);
  }

  /** Renders a channel as a table: its own data first, then its channels, subchannels, sockets. */
  private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) {
    writer.println("<table border='1'>");
    writer.println("<tr>");
    writer.println("<td>");
    writer.println("ChannelId: " + channel.getRef().getChannelId());
    writer.println("</td>");
    writer.println("<td><pre>" + escapeHtml(channel));
    writer.println("</pre></td>");
    writer.println("</tr>");
    appendChannels(channel.getChannelRefList(), writer, visitedSets);
    appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets);
    appendSockets(channel.getSocketRefList(), writer);
    writer.println("</table>");
  }

  /** Renders one table row per referenced subchannel. */
  private void appendSubChannels(
      List<SubchannelRef> subchannelRefList, PrintWriter writer, VisitedSets visitedSets) {
    for (SubchannelRef subchannelRef : subchannelRefList) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Sub Channel: " + subchannelRef.getSubchannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendSubchannel(subchannelRef, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /**
   * Resolves {@code subchannelRef} through the channelz service and renders the subchannel
   * together with its children. An already visited subchannel id is reported as a duplicate.
   */
  private void appendSubchannel(
      SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) {
    if (!visitedSets.subchannels.add(subchannelRef.getSubchannelId())) {
      String msg = "Duplicate Subchannel Id: " + subchannelRef;
      LOG.warn(msg);
      writer.println(escapeHtml(msg));
      return;
    }
    SettableFuture<GetSubchannelResponse> future = SettableFuture.create();
    channelzService.getSubchannel(
        GetSubchannelRequest.newBuilder()
            .setSubchannelId(subchannelRef.getSubchannelId())
            .build(),
        getStreamObserver(future));
    Subchannel subchannel;
    try {
      subchannel = future.get().getSubchannel();
    } catch (Exception e) {
      restoreInterruptIfNeeded(e);
      String msg = "Failed to get Subchannel: " + subchannelRef;
      LOG.warn(msg, e);
      writer.println(escapeHtml(msg + " Exception: " + e.getMessage()));
      return;
    }

    writer.println("<table border='1'>");
    writer.println("<tr>");
    writer.println("<td>SubchannelId: " + subchannelRef.getSubchannelId());
    writer.println("</td>");
    writer.println("<td><pre>" + escapeHtml(subchannel));
    writer.println("</pre></td>");
    writer.println("</tr>");
    appendChannels(subchannel.getChannelRefList(), writer, visitedSets);
    appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets);
    appendSockets(subchannel.getSocketRefList(), writer);
    writer.println("</table>");
  }

  /** Renders one table row per referenced socket. */
  private void appendSockets(List<SocketRef> socketRefList, PrintWriter writer) {
    for (SocketRef socketRef : socketRefList) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Socket: " + socketRef.getSocketId());
      writer.println("</td>");
      writer.println("<td>");
      appendSocket(socketRef, writer);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /** Resolves {@code socketRef} through the channelz service and prints the socket data. */
  private void appendSocket(SocketRef socketRef, PrintWriter writer) {
    SettableFuture<GetSocketResponse> future = SettableFuture.create();
    channelzService.getSocket(
        GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(),
        getStreamObserver(future));
    Socket socket;
    try {
      socket = future.get().getSocket();
    } catch (Exception e) {
      restoreInterruptIfNeeded(e);
      String msg = "Failed to get Socket: " + socketRef;
      LOG.warn(msg, e);
      writer.println(escapeHtml(msg + " Exception: " + e.getMessage()));
      return;
    }
    writer.println("<pre>" + escapeHtml(socket) + "</pre>");
  }

  /**
   * Adapts a unary channelz call to a future: the last message passed to {@code onNext} becomes
   * the future's value on completion, and an error fails the future.
   *
   * <p>If the call completes without ever delivering a message, the future fails with an {@link
   * IllegalStateException} rather than completing with {@code null}, so callers see the problem
   * in their existing failure path instead of hitting a {@code NullPointerException} later.
   */
  private <T> StreamObserver<T> getStreamObserver(SettableFuture<T> future) {
    return new StreamObserver<T>() {
      private T response = null;

      @Override
      public void onNext(T message) {
        response = message;
      }

      @Override
      public void onError(Throwable throwable) {
        future.setException(throwable);
      }

      @Override
      public void onCompleted() {
        if (response == null) {
          future.setException(
              new IllegalStateException("channelz call completed without a response"));
          return;
        }
        future.set(response);
      }
    };
  }

  /**
   * Re-asserts the thread's interrupt flag when {@code e} is an {@link InterruptedException}, so
   * that catching it broadly while waiting on a future does not hide the interruption from the
   * caller.
   */
  private static void restoreInterruptIfNeeded(Exception e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Returns the string form of {@code value} with the HTML-significant characters {@code &},
   * {@code <}, {@code >}, {@code "} and {@code '} replaced by character references.
   */
  private static String escapeHtml(Object value) {
    String text = String.valueOf(value);
    StringBuilder escaped = new StringBuilder(text.length() + 16);
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      switch (c) {
        case '&':
          escaped.append("&amp;");
          break;
        case '<':
          escaped.append("&lt;");
          break;
        case '>':
          escaped.append("&gt;");
          break;
        case '"':
          escaped.append("&quot;");
          break;
        case '\'':
          escaped.append("&#39;");
          break;
        default:
          escaped.append(c);
      }
    }
    return escaped.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ public class WorkerStatusPages {
private final List<Capturable> capturePages;
private final StatuszServlet statuszServlet = new StatuszServlet();
private final ThreadzServlet threadzServlet = new ThreadzServlet();
private final ChannelzServlet channelzServlet = new ChannelzServlet();
private final ServletHandler servletHandler = new ServletHandler();


@VisibleForTesting
WorkerStatusPages(Server server, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) {
this.statusServer = server;
Expand All @@ -57,6 +59,7 @@ public class WorkerStatusPages {

// Install the default servlets (threadz, channelz, healthz, heapz, jfrz, statusz)
addServlet(threadzServlet);
addServlet(channelzServlet);
addServlet(new HealthzServlet(healthyIndicator));
addServlet(new HeapzServlet(memoryMonitor));
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
Expand All @@ -67,6 +70,7 @@ public class WorkerStatusPages {
// Add default capture pages (threadz, statusz, channelz)
this.capturePages.add(threadzServlet);
this.capturePages.add(statuszServlet);
this.capturePages.add(channelzServlet);
// Add some status pages
addStatusDataProvider("resources", "Resources", memoryMonitor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public final class WindmillChannelFactory {
public static final String LOCALHOST = "localhost";
private static final int DEFAULT_GRPC_PORT = 443;
private static final int MAX_REMOTE_TRACE_EVENTS = 100;

private WindmillChannelFactory() {}

Expand Down Expand Up @@ -111,6 +112,7 @@ private static Channel createRemoteChannel(
.flowControlWindow(10 * 1024 * 1024)
.maxInboundMessageSize(Integer.MAX_VALUE)
.maxInboundMetadataSize(1024 * 1024)
.maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
.negotiationType(NegotiationType.TLS)
// Set ciphers(null) to not use GCM, which is disabled for Dataflow
// due to it being horribly slow.
Expand Down
Loading