Skip to content

Add segment logging to the cli #25950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ public class ClientOptions
@Option(names = "--decimal-data-size", description = "Show data size and rate in base 10 rather than base 2")
public boolean decimalDataSize;

@Option(names = "--segment-logging", description = "Enable spooling protocol segment logging")
public boolean segmentLoggingEnabled;

public enum OutputFormat
{
AUTO,
Expand Down Expand Up @@ -346,6 +349,7 @@ public ClientSession toClientSession(TrinoUri uri)
.toClientSessionBuilder()
.source(uri.getSource().orElse(SOURCE_DEFAULT))
.encoding(encoding)
.segmentLoggingEnabled(segmentLoggingEnabled)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ private boolean isCliSpecificOptions(String name)
case "editingMode":
case "disableAutoSuggestion":
case "decimalDataSize":
case "segmentLoggingEnabled":
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ClientSession
private final boolean compressionDisabled;
private final Optional<String> encoding;
private final Duration heartbeatInterval;
private final boolean segmentLoggingEnabled;

public static Builder builder()
{
Expand Down Expand Up @@ -102,7 +103,8 @@ private ClientSession(
Duration clientRequestTimeout,
boolean compressionDisabled,
Optional<String> encoding,
Duration heartbeatInterval)
Duration heartbeatInterval,
boolean segmentLoggingEnabled)
{
this.server = requireNonNull(server, "server is null");
this.user = requireNonNull(user, "user is null");
Expand All @@ -128,6 +130,7 @@ private ClientSession(
this.compressionDisabled = compressionDisabled;
this.encoding = requireNonNull(encoding, "encoding is null");
this.heartbeatInterval = requireNonNull(heartbeatInterval, "heartbeatInterval is null");
this.segmentLoggingEnabled = segmentLoggingEnabled;

for (String clientTag : clientTags) {
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
Expand Down Expand Up @@ -286,6 +289,11 @@ public Duration getHeartbeatInterval()
return heartbeatInterval;
}

public boolean isSegmentLoggingEnabled()
{
return segmentLoggingEnabled;
}

@Override
public String toString()
{
Expand All @@ -310,6 +318,7 @@ public String toString()
.add("compressionDisabled", compressionDisabled)
.add("encoding", encoding)
.add("heartbeatInterval", heartbeatInterval)
.add("segmentLoggingEnabled", segmentLoggingEnabled)
.omitNullValues()
.toString();
}
Expand Down Expand Up @@ -340,6 +349,7 @@ public static final class Builder
private boolean compressionDisabled;
private Optional<String> encoding = Optional.empty();
private Duration heartbeatInterval = new Duration(30, SECONDS);
private boolean segmentLoggingEnabled;

private Builder() {}

Expand Down Expand Up @@ -369,6 +379,7 @@ private Builder(ClientSession clientSession)
clientRequestTimeout = clientSession.getClientRequestTimeout();
compressionDisabled = clientSession.isCompressionDisabled();
encoding = clientSession.getEncoding();
segmentLoggingEnabled = clientSession.isSegmentLoggingEnabled();
}

public Builder server(URI server)
Expand Down Expand Up @@ -515,6 +526,12 @@ public Builder heartbeatInterval(Duration heartbeatInterval)
return this;
}

public Builder segmentLoggingEnabled(boolean segmentLoggingEnabled)
{
this.segmentLoggingEnabled = segmentLoggingEnabled;
return this;
}

public ClientSession build()
{
return new ClientSession(
Expand All @@ -541,7 +558,8 @@ public ClientSession build()
clientRequestTimeout,
compressionDisabled,
encoding,
heartbeatInterval);
heartbeatInterval,
segmentLoggingEnabled);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.client;

import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.client.spooling.EncodedQueryData;
import io.trino.client.spooling.InlineSegment;
import io.trino.client.spooling.Segment;

import java.util.List;
import java.util.Optional;

import static io.trino.client.spooling.DataAttribute.ROW_OFFSET;
import static io.trino.client.spooling.DataAttribute.UNCOMPRESSED_SIZE;
import static java.lang.Math.round;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class LoggingResultRowsDecoder
extends ResultRowsDecoder
{
private final ResultRowsDecoder delegate;
private long start = -1;

public LoggingResultRowsDecoder(ResultRowsDecoder delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
System.err.println("Segment logging is enabled, segment details will be logged to stderr");
}

@Override
public ResultRows toRows(QueryResults results)
{
return toRows(results.getColumns(), results.getData());
}

@Override
public ResultRows toRows(List<Column> columns, QueryData data)
{
if (start == -1) {
// Start the timer only once, when the first data frame is processed (could be empty)
start = System.nanoTime();
}
Duration elapsed = new Duration(System.nanoTime() - start, NANOSECONDS).convertToMostSuccinctTimeUnit();
if (data instanceof EncodedQueryData) {
EncodedQueryData encodedData = (EncodedQueryData) data;

for (Segment segment : encodedData.getSegments()) {
long start = 1 + segment.getRequiredAttribute(ROW_OFFSET, Long.class);
long rows = segment.getRowsCount();
long end = start + rows - 1;
boolean isInline = segment instanceof InlineSegment;

int size = segment.getSegmentSize();
long uncompressedSize = segment.getAttribute(UNCOMPRESSED_SIZE, Integer.class)
.orElse(size);

boolean isCompressed = size != uncompressedSize;
double compressionRatio = isCompressed ? round(10000.0d * size / uncompressedSize) / 100.0d : 1.0;

System.err.printf("+ %9s %s rows %6d to %6d size: %9s uncompressed size: %9s compression ratio: %.02f%%%n",
elapsed,
isInline ? "Inlined" : "Spooled",
start,
end,
DataSize.ofBytes(size).succinct(),
DataSize.ofBytes(uncompressedSize).succinct(),
compressionRatio);
}
}

if (data instanceof JsonQueryData) {
JsonQueryData jsonData = (JsonQueryData) data;
System.err.printf("+ %9s Direct rows %6d%n", elapsed, jsonData.getRowsCount());
}

if (data == null || data.isNull()) {
System.err.printf("+ %9s Rows are not ready%n", elapsed);
}

return delegate.toRows(columns, data);
}

@Override
public Optional<String> getEncoding()
{
return delegate.getEncoding();
}

@Override
public void close()
throws Exception
{
System.err.printf("+ %9s No more data %n", new Duration(System.nanoTime() - start, NANOSECONDS).convertToMostSuccinctTimeUnit());
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.net.URI;
Expand Down Expand Up @@ -143,7 +142,7 @@ public StatementClientV1(Call.Factory httpCallFactory, Call.Factory segmentHttpC
this.compressionDisabled = session.isCompressionDisabled();
this.heartbeatInterval = session.getHeartbeatInterval().toMillis() * 1_000_000;

this.resultRowsDecoder = new ResultRowsDecoder(new OkHttpSegmentLoader(requireNonNull(segmentHttpCallFactory, "segmentHttpCallFactory is null")));
this.resultRowsDecoder = createResultRowsDecoder(segmentHttpCallFactory, session.isSegmentLoggingEnabled());

Request request = buildQueryRequest(session, query, session.getEncoding());
// Pass empty as materializedJsonSizeLimit to always materialize the first response
Expand Down Expand Up @@ -647,9 +646,10 @@ public void close()
// releasing all resources and pruning remote segments
try {
Closeables.close(currentRows.get(), false);
resultRowsDecoder.close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -737,4 +737,13 @@ protected List<Object> computeNext()
};
}
}

private static ResultRowsDecoder createResultRowsDecoder(Call.Factory segmentHttpCallFactory, boolean segmentLoggingEnabled)
{
ResultRowsDecoder decoder = new ResultRowsDecoder(new OkHttpSegmentLoader(requireNonNull(segmentHttpCallFactory, "segmentHttpCallFactory is null")));
if (segmentLoggingEnabled) {
return new LoggingResultRowsDecoder(decoder);
}
return decoder;
}
}
Loading