Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Add Channelz staus page exporting GRPC channelz data #30211

Merged
merged 9 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version",
truth : "com.google.truth:truth:1.1.5",
threetenbp : "org.threeten:threetenbp:1.6.8",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
vendored_guava_32_1_2_jre : "org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ public Dataflow create(PipelineOptions options) {

void setWindmillGetDataStreamCount(int value);

@Description("If true, will only show windmill service channels on /channelz")
@Default.Boolean(true)
boolean getChannelzShowOnlyWindmillServiceChannels();

void setChannelzShowOnlyWindmillServiceChannels(boolean value);

/**
* The amount of time before UnboundedReaders are considered idle and closed during streaming
* execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -735,6 +736,13 @@ public void startStatusPages() {
if (debugCaptureManager != null) {
debugCaptureManager.start();
}

if (windmillServiceEnabled) {
ChannelzServlet channelzServlet = new ChannelzServlet(options, windmillServer);
statusPages.addServlet(channelzServlet);
statusPages.addCapturePage(channelzServlet);
}

statusPages.addServlet(stateCache.statusServlet());
statusPages.addServlet(new SpecsServlet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

/**
Expand Down Expand Up @@ -53,6 +54,11 @@ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws IOExc
// This class is used for windmill appliance and local runner tests.
}

@Override
public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
return ImmutableSet.of();
}

@Override
public boolean isReady() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

/** Stub for communicating with a Windmill server. */
Expand All @@ -40,6 +41,11 @@ public abstract class WindmillServerStub implements StatusDataProvider {
*/
public abstract void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws IOException;

/*
* Returns the windmill service endpoints set by setWindmillServiceEndpoints
*/
public abstract ImmutableSet<HostAndPort> getWindmillServiceEndpoints();

/** Returns true iff this WindmillServerStub is ready for making API calls. */
public abstract boolean isReady();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;

/** Respond to /channelz with the GRPC channelz data. */
public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable {
private static final String PATH = "/channelz";
private static final int MAX_TOP_CHANNELS_TO_RETURN = 100;

private final ChannelzService channelzService;
private final WindmillServerStub windmillServerStub;
private final boolean showOnlyWindmillServiceChannels;

public ChannelzServlet(
StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) {
super(PATH);
channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);
this.windmillServerStub = windmillServerStub;
showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels();
}

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
response.setStatus(HttpServletResponse.SC_OK);
PrintWriter writer = response.getWriter();
captureData(writer);
}

@Override
public String pageName() {
return PATH;
}

@Override
public void captureData(PrintWriter writer) {
writer.println("<html>");
writer.println("<h1>Channelz</h1>");
appendTopChannels(writer);
writer.println("</html>");
}

// channelz proto says there won't be cycles in the ref graph.
// we track visited ids to be defensive and prevent any accidental cycles.
static class VisitedSets {
Set<Long> channels = new HashSet<>();
Set<Long> subchannels = new HashSet<>();
}

private void appendTopChannels(PrintWriter writer) {
SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
channelzService.getTopChannels(
GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
GetTopChannelsResponse topChannelsResponse;
try {
topChannelsResponse = future.get();
} catch (Exception e) {
String msg = "Failed to get channelz: " + e.getMessage();
writer.println(msg);
return;
}

List<Channel> topChannels = topChannelsResponse.getChannelList();
if (showOnlyWindmillServiceChannels) {
topChannels = filterWindmillChannels(topChannels);
}
writer.println("<h2>Top Level Channels</h2>");
writer.println("<table border='1'>");
VisitedSets visitedSets = new VisitedSets();
for (Channel channel : topChannels) {
writer.println("<tr>");
writer.println("<td>");
writer.println("TopChannelId: " + channel.getRef().getChannelId());
writer.println("</td>");
writer.println("<td>");
appendChannel(channel, writer, visitedSets);
writer.println("</td>");
writer.println("</tr>");
}
writer.println("</table>");
}

private List<Channel> filterWindmillChannels(List<Channel> channels) {
ImmutableSet<HostAndPort> windmillServiceEndpoints =
windmillServerStub.getWindmillServiceEndpoints();
Set<String> windmillServiceHosts =
windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet());
List<Channel> windmillChannels = new ArrayList<>();
for (Channel channel : channels) {
for (String windmillServiceHost : windmillServiceHosts) {
if (channel.getData().getTarget().contains(windmillServiceHost)) {
windmillChannels.add(channel);
break;
}
}
}
return windmillChannels;
}

private void appendChannels(
List<ChannelRef> channelRefs, PrintWriter writer, VisitedSets visitedSets) {
for (ChannelRef channelRef : channelRefs) {
writer.println("<tr>");
writer.println("<td>");
writer.println("Channel: " + channelRef.getChannelId());
writer.println("</td>");
writer.println("<td>");
appendChannel(channelRef, writer, visitedSets);
writer.println("</td>");
writer.println("</tr>");
}
}

private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) {
if (visitedSets.channels.contains(channelRef.getChannelId())) {
String msg = "Duplicate Channel Id: " + channelRef;
writer.println(msg);
return;
}
visitedSets.channels.add(channelRef.getChannelId());
SettableFuture<GetChannelResponse> future = SettableFuture.create();
channelzService.getChannel(
GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(),
getStreamObserver(future));
Channel channel;
try {
channel = future.get().getChannel();
} catch (Exception e) {
String msg = "Failed to get Channel: " + channelRef;
writer.println(msg + " Exception: " + e.getMessage());
return;
}
appendChannel(channel, writer, visitedSets);
}

private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) {
writer.println("<table border='1'>");
writer.println("<tr>");
writer.println("<td>");
writer.println("ChannelId: " + channel.getRef().getChannelId());
writer.println("</td>");
writer.println("<td><pre>" + channel);
writer.println("</pre></td>");
writer.println("</tr>");
appendChannels(channel.getChannelRefList(), writer, visitedSets);
appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets);
appendSockets(channel.getSocketRefList(), writer);
writer.println("</table>");
}

private void appendSubChannels(
List<SubchannelRef> subchannelRefList, PrintWriter writer, VisitedSets visitedSets) {
for (SubchannelRef subchannelRef : subchannelRefList) {
writer.println("<tr>");
writer.println("<td>");
writer.println("Sub Channel: " + subchannelRef.getSubchannelId());
writer.println("</td>");
writer.println("<td>");
appendSubchannel(subchannelRef, writer, visitedSets);
writer.println("</td>");
writer.println("</tr>");
}
}

private void appendSubchannel(
SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) {
if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) {
String msg = "Duplicate Subchannel Id: " + subchannelRef;
writer.println(msg);
return;
}
visitedSets.subchannels.add(subchannelRef.getSubchannelId());
SettableFuture<GetSubchannelResponse> future = SettableFuture.create();
channelzService.getSubchannel(
GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(),
getStreamObserver(future));
Subchannel subchannel;
try {
subchannel = future.get().getSubchannel();
} catch (Exception e) {
String msg = "Failed to get Subchannel: " + subchannelRef;
writer.println(msg + " Exception: " + e.getMessage());
return;
}

writer.println("<table border='1'>");
writer.println("<tr>");
writer.println("<td>SubchannelId: " + subchannelRef.getSubchannelId());
writer.println("</td>");
writer.println("<td><pre>" + subchannel.toString());
writer.println("</pre></td>");
writer.println("</tr>");
appendChannels(subchannel.getChannelRefList(), writer, visitedSets);
appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets);
appendSockets(subchannel.getSocketRefList(), writer);
writer.println("</table>");
}

private void appendSockets(List<SocketRef> socketRefList, PrintWriter writer) {
for (SocketRef socketRef : socketRefList) {
writer.println("<tr>");
writer.println("<td>");
writer.println("Socket: " + socketRef.getSocketId());
writer.println("</td>");
writer.println("<td>");
appendSocket(socketRef, writer);
writer.println("</td>");
writer.println("</tr>");
}
}

private void appendSocket(SocketRef socketRef, PrintWriter writer) {
SettableFuture<GetSocketResponse> future = SettableFuture.create();
channelzService.getSocket(
GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(),
getStreamObserver(future));
Socket socket;
try {
socket = future.get().getSocket();
} catch (Exception e) {
String msg = "Failed to get Socket: " + socketRef;
writer.println(msg + " Exception: " + e.getMessage());
return;
}
writer.println("<pre>" + socket + "</pre>");
}

private <T> StreamObserver<T> getStreamObserver(SettableFuture<T> future) {
return new StreamObserver<T>() {
@Nullable T response = null;

@Override
public void onNext(T message) {
response = message;
}

@Override
public void onError(Throwable throwable) {
future.setException(throwable);
}

@Override
public void onCompleted() {
future.set(response);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
: randomlySelectNextStub(windmillServiceStubs));
}

ImmutableSet<HostAndPort> getDispatcherEndpoints() {
return dispatcherStubs.get().dispatcherEndpoints();
}

CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs =
dispatcherStubs.get().windmillMetadataServiceStubs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
}

@Override
public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
return dispatcherClient.getDispatcherEndpoints();
}

@Override
public boolean isReady() {
return dispatcherClient.hasInitializedEndpoints();
Expand Down
Loading
Loading