Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Add Channelz staus page exporting GRPC channelz data #30211

Merged
merged 9 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public class StreamingDataflowWorker {

private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5);
private static final Random clientIdGenerator = new Random();
private static final String CHANNELZ_PATH = "/channelz";
final WindmillStateCache stateCache;
// Maps from computation ids to per-computation state.
private final ConcurrentMap<String, ComputationState> computationMap;
Expand Down Expand Up @@ -738,7 +739,7 @@ public void startStatusPages() {
}

if (windmillServiceEnabled) {
ChannelzServlet channelzServlet = new ChannelzServlet(options, windmillServer);
ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, options, windmillServer);
statusPages.addServlet(channelzServlet);
statusPages.addCapturePage(channelzServlet);
}
Expand Down Expand Up @@ -2089,6 +2090,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
}

private class MetricsDataProvider implements StatusDataProvider {

@Override
public void appendSummaryHtml(PrintWriter writer) {
writer.println(workUnitExecutor.summaryHtml());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;

/** Respond to /channelz with the GRPC channelz data. */
/** Respond to /path with the GRPC channelz data. */
public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable {
private static final String PATH = "/channelz";
private static final int MAX_TOP_CHANNELS_TO_RETURN = 100;

private static final int MAX_TOP_CHANNELS_TO_RETURN = 500;

private final ChannelzService channelzService;
private final WindmillServerStub windmillServerStub;
private final boolean showOnlyWindmillServiceChannels;

public ChannelzServlet(
StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) {
super(PATH);
String path, StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) {
super(path);
channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);
this.windmillServerStub = windmillServerStub;
showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels();
Expand All @@ -66,7 +66,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)

@Override
public String pageName() {
return PATH;
return getPath();
}

@Override
Expand All @@ -80,12 +80,17 @@ public void captureData(PrintWriter writer) {
// channelz proto says there won't be cycles in the ref graph.
// we track visited ids to be defensive and prevent any accidental cycles.
static class VisitedSets {

Set<Long> channels = new HashSet<>();
Set<Long> subchannels = new HashSet<>();
}

private void appendTopChannels(PrintWriter writer) {
SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
// IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels
// in the worker, we might not return all the windmill channels. If we run into
// such situations, this logic can be modified to loop till we see an empty
// GetTopChannelsResponse response with the end bit set.
channelzService.getTopChannels(
GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
GetTopChannelsResponse topChannelsResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testRendersAllChannels() throws UnsupportedEncodingException {
fakeWindmillServer.setWindmillServiceEndpoints(
ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2)));
options.setChannelzShowOnlyWindmillServiceChannels(false);
ChannelzServlet channelzServlet = new ChannelzServlet(options, fakeWindmillServer);
ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer);
StringWriter stringWriter = new StringWriter();
PrintWriter writer = new PrintWriter(stringWriter);
channelzServlet.captureData(writer);
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testRendersOnlyWindmillChannels() throws UnsupportedEncodingExceptio
fakeWindmillServer.setWindmillServiceEndpoints(
ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2)));
options.setChannelzShowOnlyWindmillServiceChannels(true);
ChannelzServlet channelzServlet = new ChannelzServlet(options, fakeWindmillServer);
ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer);
StringWriter stringWriter = new StringWriter();
PrintWriter writer = new PrintWriter(stringWriter);
channelzServlet.captureData(writer);
Expand Down
Loading