Skip to content

Commit 5b4c45a

Browse files
committed
[client] Refactor code to avoid doing projection for rows twice
This simplifies the code and reduces CPU costs as well.
1 parent 8b62653 commit 5b4c45a

File tree

5 files changed

+79
-148
lines changed

5 files changed

+79
-148
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java

+7-22
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,13 @@
2626
import com.alibaba.fluss.record.LogRecordReadContext;
2727
import com.alibaba.fluss.row.GenericRow;
2828
import com.alibaba.fluss.row.InternalRow;
29-
import com.alibaba.fluss.row.ProjectedRow;
3029
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
3130
import com.alibaba.fluss.rpc.protocol.ApiError;
3231
import com.alibaba.fluss.utils.CloseableIterator;
33-
import com.alibaba.fluss.utils.Projection;
3432

3533
import org.slf4j.Logger;
3634
import org.slf4j.LoggerFactory;
3735

38-
import javax.annotation.Nullable;
39-
4036
import java.io.Closeable;
4137
import java.util.ArrayList;
4238
import java.util.Collections;
@@ -61,8 +57,7 @@ abstract class CompletedFetch {
6157
private final Iterator<LogRecordBatch> batches;
6258
private final LogScannerStatus logScannerStatus;
6359
protected final LogRecordReadContext readContext;
64-
@Nullable protected final Projection projection;
65-
protected final InternalRow.FieldGetter[] fieldGetters;
60+
protected final InternalRow.FieldGetter[] selectedFieldGetters;
6661

6762
private LogRecordBatch currentBatch;
6863
private LogRecord lastRecord;
@@ -92,30 +87,20 @@ public CompletedFetch(
9287
this.readContext = readContext;
9388
this.isCheckCrcs = isCheckCrcs;
9489
this.logScannerStatus = logScannerStatus;
95-
this.projection = readContext.getProjection();
9690
this.nextFetchOffset = fetchOffset;
97-
this.fieldGetters = readContext.getProjectedFieldGetters();
91+
this.selectedFieldGetters = readContext.getSelectedFieldGetters();
9892
}
9993

10094
// TODO: optimize this to avoid deep copying the record.
10195
// refactor #fetchRecords to return an iterator which lazily deserialize
10296
// from underlying record stream and arrow buffer.
10397
ScanRecord toScanRecord(LogRecord record) {
104-
GenericRow newRow = new GenericRow(fieldGetters.length);
98+
GenericRow newRow = new GenericRow(selectedFieldGetters.length);
10599
InternalRow internalRow = record.getRow();
106-
for (int i = 0; i < fieldGetters.length; i++) {
107-
newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
108-
}
109-
if (projection != null && projection.isReorderingNeeded()) {
110-
return new ScanRecord(
111-
record.logOffset(),
112-
record.timestamp(),
113-
record.getRowKind(),
114-
ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow));
115-
} else {
116-
return new ScanRecord(
117-
record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
100+
for (int i = 0; i < selectedFieldGetters.length; i++) {
101+
newRow.setField(i, selectedFieldGetters[i].getFieldOrNull(internalRow));
118102
}
103+
return new ScanRecord(record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
119104
}
120105

121106
boolean isConsumed() {
@@ -243,7 +228,7 @@ private LogRecord nextFetchedRecord() throws Exception {
243228

244229
private void maybeEnsureValid(LogRecordBatch batch) {
245230
if (isCheckCrcs) {
246-
if (projection != null) {
231+
if (readContext.isProjectionPushDowned()) {
247232
LOG.debug("Skipping CRC check for column projected log record batch.");
248233
return;
249234
}

fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.alibaba.fluss.config.Configuration;
2828
import com.alibaba.fluss.exception.InvalidMetadataException;
2929
import com.alibaba.fluss.fs.FsPath;
30-
import com.alibaba.fluss.metadata.LogFormat;
3130
import com.alibaba.fluss.metadata.PhysicalTablePath;
3231
import com.alibaba.fluss.metadata.TableBucket;
3332
import com.alibaba.fluss.metadata.TableInfo;
@@ -85,8 +84,8 @@ public class LogFetcher implements Closeable {
8584
private final boolean isPartitioned;
8685
private final LogRecordReadContext readContext;
8786
// TODO this context can be merge with readContext. Introduce it only because log remote read
88-
// currently can only do project when generate scanRecord instead of doing project while read
89-
// bytes from remote file.
87+
// currently can only do project when generate scanRecord instead of doing project while read
88+
// bytes from remote file.
9089
private final LogRecordReadContext remoteReadContext;
9190
@Nullable private final Projection projection;
9291
private final RpcClient rpcClient;
@@ -97,7 +96,6 @@ public class LogFetcher implements Closeable {
9796
private final LogFetchBuffer logFetchBuffer;
9897
private final LogFetchCollector logFetchCollector;
9998
private final RemoteLogDownloader remoteLogDownloader;
100-
private final LogFormat logFormat;
10199

102100
@GuardedBy("this")
103101
private final Set<Integer> nodesWithPendingFetchRequests;
@@ -119,7 +117,6 @@ public LogFetcher(
119117
RemoteFileDownloader remoteFileDownloader) {
120118
this.tablePath = tableInfo.getTablePath();
121119
this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned();
122-
this.logFormat = tableInfo.getTableDescriptor().getLogFormat();
123120
this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection);
124121
this.remoteReadContext =
125122
LogRecordReadContext.createReadContext(tableInfo, true, projection);
@@ -317,6 +314,8 @@ private synchronized void handleFetchLogResponse(
317314
fetchResultForBucket,
318315
readContext,
319316
logScannerStatus,
317+
// skip the CRC check if projection is pushed down, as
318+
// the data is pruned on the server side
320319
isCheckCrcs,
321320
fetchOffset);
322321
logFetchBuffer.add(completedFetch);
@@ -423,7 +422,8 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
423422
.setMaxBytes(maxFetchBytes);
424423
PbFetchLogReqForTable reqForTable =
425424
new PbFetchLogReqForTable().setTableId(finalTableId);
426-
if (projectionPushDownEnable()) {
425+
if (readContext.isProjectionPushDowned()) {
426+
assert projection != null;
427427
reqForTable
428428
.setProjectionPushdownEnabled(true)
429429
.setProjectedFields(projection.getProjectionInOrder());
@@ -449,13 +449,6 @@ private List<TableBucket> fetchableBuckets() {
449449
return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket));
450450
}
451451

452-
private boolean projectionPushDownEnable() {
453-
// Currently, only ARROW log format supports projection push down to server. Other log
454-
// formats will do project in client, see DefaultCompletedFetch#toScanRecord() for more
455-
// details.
456-
return projection != null && logFormat == LogFormat.ARROW;
457-
}
458-
459452
private Integer getTableBucketLeader(TableBucket tableBucket) {
460453
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
461454
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {

0 commit comments

Comments
 (0)