From e971cc4711d244764e379bf591880c7fe0d73135 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 14:43:17 +0800 Subject: [PATCH 01/23] feat: basic table scan planning --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/result.h | 6 +- src/iceberg/table.h | 11 ++-- src/iceberg/table_scan.cc | 122 +++++++++++++++++++++++++++++++++++++ src/iceberg/table_scan.h | 117 +++++++++++++++++++++++++++++++++++ src/iceberg/type_fwd.h | 6 +- 6 files changed, 254 insertions(+), 9 deletions(-) create mode 100644 src/iceberg/table_scan.cc create mode 100644 src/iceberg/table_scan.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0ea47f8a..9b6b2e8c 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -38,6 +38,7 @@ set(ICEBERG_SOURCES sort_order.cc statistics_file.cc table_metadata.cc + table_scan.cc transform.cc transform_function.cc type.cc diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 38d9e381..faee0dd0 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -31,6 +31,7 @@ namespace iceberg { enum class ErrorKind { kAlreadyExists, kCommitStateUnknown, + kDataInvalid, kInvalidArgument, kInvalidExpression, kInvalidSchema, @@ -65,14 +66,15 @@ using Status = Result; /// \brief Macro to define error creation functions #define DEFINE_ERROR_FUNCTION(name) \ template \ - inline auto name(const std::format_string fmt, Args&&... args) \ - -> unexpected { \ + inline auto name(const std::format_string fmt, \ + Args&&... args) -> unexpected { \ return unexpected( \ {ErrorKind::k##name, std::format(fmt, std::forward(args)...)}); \ } DEFINE_ERROR_FUNCTION(AlreadyExists) DEFINE_ERROR_FUNCTION(CommitStateUnknown) +DEFINE_ERROR_FUNCTION(DataInvalid) DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 11a9fc98..a8448183 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -88,10 +88,11 @@ class ICEBERG_EXPORT Table { /// \return a vector of history entries virtual const std::vector>& history() const = 0; - /// \brief Create a new table scan for this table + /// \brief Create a new table scan builder for this table /// - /// Once a table scan is created, it can be refined to project columns and filter data. - virtual std::unique_ptr NewScan() const = 0; + /// Once a table scan builder is created, it can be refined to project columns and + /// filter data. + virtual std::unique_ptr NewScan() const = 0; /// \brief Create a new append API to add files to this table and commit virtual std::shared_ptr NewAppend() = 0; @@ -99,10 +100,8 @@ class ICEBERG_EXPORT Table { /// \brief Create a new transaction API to commit multiple table operations at once virtual std::unique_ptr NewTransaction() = 0; - /// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an - /// IO-less design in the core library. // /// \brief Returns a FileIO to read and write table data and metadata files - // virtual std::shared_ptr io() const = 0; + virtual std::shared_ptr io() const = 0; /// \brief Returns a LocationProvider to provide locations for new data files virtual std::unique_ptr location_provider() const = 0; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc new file mode 100644 index 00000000..cb05234f --- /dev/null +++ b/src/iceberg/table_scan.cc @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/table_scan.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {} + +TableScanBuilder& TableScanBuilder::WithColumnNames( + const std::vector& column_names) { + column_names_ = column_names; + return *this; +} + +TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) { + snapshot_id_ = snapshot_id; + return *this; +} + +TableScanBuilder& TableScanBuilder::WithFilter( + const std::shared_ptr& filter) { + filter_ = filter; + return *this; +} + +Result> TableScanBuilder::Build() { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, snapshot_id_ ? table_.snapshot(*snapshot_id_) + : Result>( + table_.current_snapshot())); + + auto ResolveSchema = [&]() -> Result> { + if (snapshot->schema_id) { + const auto& schemas = table_.schemas(); + if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { + return it->second; + } + return DataInvalid("Schema {} in snapshot {} is not found", *snapshot->schema_id, + snapshot->snapshot_id); + } + return table_.schema(); + }; + + ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSchema()); + + std::vector field_ids; + field_ids.reserve(column_names_.size()); + for (const auto& column_name : column_names_) { + auto field_opt = schema->GetFieldByName(column_name); + if (!field_opt) { + return InvalidArgument("Column {} not found in schema", column_name); + } + field_ids.emplace_back(field_opt.value().get().field_id()); + } + + auto context = std::make_unique( + std::move(snapshot), std::move(schema), std::move(field_ids), std::move(filter_)); + return std::make_unique(std::move(context), table_.io()); +} + +TableScan::TableScan(std::unique_ptr context, + std::shared_ptr file_io) + : context_(std::move(context)), file_io_(std::move(file_io)) {} + +Result>> TableScan::PlanFiles() const { + ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, + CreateManifestListReader(context_->snapshot_->manifest_list)); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); + + std::vector> tasks; + for (const auto& manifest_file : manifest_files) { + ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, + CreateManifestReader(manifest_file->manifest_path)); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); + + for (const auto& manifest : manifests) { + const auto& data_file = manifest->data_file; + tasks.emplace_back(std::make_unique( + data_file.file_path, 0, data_file.file_size_in_bytes, data_file.record_count, + data_file.content, data_file.file_format, context_->schema_, + context_->field_ids_, context_->filter_)); + } + } + return tasks; +} + +Result> TableScan::CreateManifestListReader( + const std::string& file_path) const { + return NotImplemented("manifest list reader"); +} + +Result> TableScan::CreateManifestReader( + const std::string& file_path) const { + return NotImplemented("manifest reader"); +} + +} // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h new file mode 100644 index 00000000..50105537 --- /dev/null +++ b/src/iceberg/table_scan.h @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Builder class for creating TableScan instances. +class ICEBERG_EXPORT TableScanBuilder { + public: + /// \brief Constructs a TableScanBuilder for the given table. + /// \param table Reference to the table to scan. + explicit TableScanBuilder(const Table& table); + + /// \brief Sets the snapshot ID to scan. + /// \param snapshot_id The ID of the snapshot. + /// \return Reference to the builder. + TableScanBuilder& WithSnapshotId(int64_t snapshot_id); + + /// \brief Selects columns to include in the scan. + /// \param column_names A list of column names. + /// \return Reference to the builder. + TableScanBuilder& WithColumnNames(const std::vector& column_names); + + /// \brief Applies a filter expression to the scan. + /// \param filter Filter expression to use. + /// \return Reference to the builder. + TableScanBuilder& WithFilter(const std::shared_ptr& filter); + + /// \brief Builds and returns a TableScan instance. + /// \return A Result containing the TableScan or an error. + Result> Build(); + + private: + const Table& table_; + std::vector column_names_; + std::optional snapshot_id_; + std::shared_ptr filter_; +}; + +/// \brief Represents a configured scan operation on a table. +class ICEBERG_EXPORT TableScan { + public: + /// \brief Scan context holding snapshot and scan-specific metadata. + struct ScanContext { + std::shared_ptr snapshot_; ///< Snapshot to scan. + std::shared_ptr schema_; ///< Projected schema. + std::vector field_ids_; ///< Field IDs of selected columns. + std::shared_ptr filter_; ///< Filter expression to apply. + }; + + /// \brief Constructs a TableScan with the given context and file I/O. + /// \param context Scan context including snapshot, schema, and filter. + /// \param file_io File I/O instance for reading manifests and data files. + TableScan(std::unique_ptr context, std::shared_ptr file_io); + + /// \brief Plans the scan tasks by resolving manifests and data files. + /// + /// Returns a list of file scan tasks if successful. + /// \return A Result containing scan tasks or an error. + Result>> PlanFiles() const; + + private: + /// \brief Creates a reader for the manifest list. + /// \param file_path Path to the manifest list file. + /// \return A Result containing the reader or an error. + Result> CreateManifestListReader( + const std::string& file_path) const; + + /// \brief Creates a reader for a manifest file. + /// \param file_path Path to the manifest file. + /// \return A Result containing the reader or an error. + Result> CreateManifestReader( + const std::string& file_path) const; + + std::unique_ptr context_; + std::shared_ptr file_io_; +}; + +/// \brief Represents a task to scan a portion of a data file. +struct ICEBERG_EXPORT FileScanTask { + std::string file_path_; ///< Path to the data file. + uint64_t start_; ///< Start byte offset. + uint64_t length_; ///< Length in bytes to scan. + std::optional record_count_; ///< Optional number of records. + DataFile::Content file_content_; ///< Type of file content. + FileFormatType file_format_; ///< Format of the data file. + std::shared_ptr schema_; ///< Projected schema. + std::vector field_ids_; ///< Field IDs to project. + std::shared_ptr filter_; ///< Filter expression to apply. +}; + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 9fc6bd6c..0c95ea6e 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -91,6 +91,9 @@ class LocationProvider; class SortField; class SortOrder; class Table; +class TableScan; +struct FileScanTask; +class TableScanBuilder; class Transaction; class Transform; class TransformFunction; @@ -119,11 +122,12 @@ class MetadataUpdate; class UpdateRequirement; class AppendFiles; -class TableScan; struct DataFile; struct ManifestEntry; struct ManifestFile; +class ManifestReader; struct ManifestList; +class ManifestListReader; } // namespace iceberg From 5fc697159aea7c6a337a86f918259bd29e7b104a Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 15:05:36 +0800 Subject: [PATCH 02/23] fix cpp lint --- src/iceberg/result.h | 4 ++-- src/iceberg/type_fwd.h | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index faee0dd0..c5177885 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -66,8 +66,8 @@ using Status = Result; /// \brief Macro to define error creation functions #define DEFINE_ERROR_FUNCTION(name) \ template \ - inline auto name(const std::format_string fmt, \ - Args&&... args) -> unexpected { \ + inline auto name(const std::format_string fmt, Args&&... args) \ + -> unexpected { \ return unexpected( \ {ErrorKind::k##name, std::format(fmt, std::forward(args)...)}); \ } diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0c95ea6e..0b185aae 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -93,6 +93,7 @@ class SortOrder; class Table; class TableScan; struct FileScanTask; +class FileIO; class TableScanBuilder; class Transaction; class Transform; From 6a2cb7449df9451af29c4b8936825c7b9228e810 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 15:37:17 +0800 Subject: [PATCH 03/23] fix build fail on windows --- src/iceberg/table_scan.cc | 6 +++--- src/iceberg/table_scan.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index cb05234f..4cee9f53 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -87,12 +87,12 @@ TableScan::TableScan(std::unique_ptr context, std::shared_ptr file_io) : context_(std::move(context)), file_io_(std::move(file_io)) {} -Result>> TableScan::PlanFiles() const { +Result>> TableScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, CreateManifestListReader(context_->snapshot_->manifest_list)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); - std::vector> tasks; + std::vector> tasks; for (const auto& manifest_file : manifest_files) { ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, CreateManifestReader(manifest_file->manifest_path)); @@ -100,7 +100,7 @@ Result>> TableScan::PlanFiles() const for (const auto& manifest : manifests) { const auto& data_file = manifest->data_file; - tasks.emplace_back(std::make_unique( + tasks.emplace_back(std::make_shared( data_file.file_path, 0, data_file.file_size_in_bytes, data_file.record_count, data_file.content, data_file.file_format, context_->schema_, context_->field_ids_, context_->filter_)); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 50105537..78f6d42e 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -82,7 +82,7 @@ class ICEBERG_EXPORT TableScan { /// /// Returns a list of file scan tasks if successful. /// \return A Result containing scan tasks or an error. - Result>> PlanFiles() const; + Result>> PlanFiles() const; private: /// \brief Creates a reader for the manifest list. From d71c26ab069f34c9f55aaca14e869bea8cf055df Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 15:41:26 +0800 Subject: [PATCH 04/23] fix lint --- src/iceberg/table_scan.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 78f6d42e..d2ea0e3c 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -103,15 +103,15 @@ class ICEBERG_EXPORT TableScan { /// \brief Represents a task to scan a portion of a data file. struct ICEBERG_EXPORT FileScanTask { - std::string file_path_; ///< Path to the data file. - uint64_t start_; ///< Start byte offset. - uint64_t length_; ///< Length in bytes to scan. - std::optional record_count_; ///< Optional number of records. - DataFile::Content file_content_; ///< Type of file content. - FileFormatType file_format_; ///< Format of the data file. - std::shared_ptr schema_; ///< Projected schema. - std::vector field_ids_; ///< Field IDs to project. - std::shared_ptr filter_; ///< Filter expression to apply. + std::string file_path; ///< Path to the data file. + uint64_t start; ///< Start byte offset. + uint64_t length; ///< Length in bytes to scan. + std::optional record_count; ///< Optional number of records. + DataFile::Content file_content; ///< Type of file content. + FileFormatType file_format; ///< Format of the data file. + std::shared_ptr schema; ///< Projected schema. + std::vector field_ids; ///< Field IDs to project. + std::shared_ptr filter; ///< Filter expression to apply. }; } // namespace iceberg From c6c1a1f9def07099d0d5778c1b554e296a6f4d02 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 16:18:58 +0800 Subject: [PATCH 05/23] fix some comments --- src/iceberg/result.h | 8 ++++---- src/iceberg/table_scan.cc | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index c5177885..18f4e679 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -31,8 +31,8 @@ namespace iceberg { enum class ErrorKind { kAlreadyExists, kCommitStateUnknown, - kDataInvalid, kInvalidArgument, + kInvalidData, kInvalidExpression, kInvalidSchema, kIOError, @@ -66,15 +66,15 @@ using Status = Result; /// \brief Macro to define error creation functions #define DEFINE_ERROR_FUNCTION(name) \ template \ - inline auto name(const std::format_string fmt, Args&&... args) \ - -> unexpected { \ + inline auto name(const std::format_string fmt, \ + Args&&... args) -> unexpected { \ return unexpected( \ {ErrorKind::k##name, std::format(fmt, std::forward(args)...)}); \ } DEFINE_ERROR_FUNCTION(AlreadyExists) DEFINE_ERROR_FUNCTION(CommitStateUnknown) -DEFINE_ERROR_FUNCTION(DataInvalid) +DEFINE_ERROR_FUNCTION(InvalidData) DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 4cee9f53..37309f74 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -60,7 +60,7 @@ Result> TableScanBuilder::Build() { if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { return it->second; } - return DataInvalid("Schema {} in snapshot {} is not found", *snapshot->schema_id, + return InvalidData("Schema {} in snapshot {} is not found", *snapshot->schema_id, snapshot->snapshot_id); } return table_.schema(); From cd07a0cab5237f3f02e77519d28958971722f9e9 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 27 May 2025 16:22:25 +0800 Subject: [PATCH 06/23] fix clang format --- src/iceberg/result.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 18f4e679..b0e1b94c 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -66,8 +66,8 @@ using Status = Result; /// \brief Macro to define error creation functions #define DEFINE_ERROR_FUNCTION(name) \ template \ - inline auto name(const std::format_string fmt, \ - Args&&... args) -> unexpected { \ + inline auto name(const std::format_string fmt, Args&&... args) \ + -> unexpected { \ return unexpected( \ {ErrorKind::k##name, std::format(fmt, std::forward(args)...)}); \ } From b7becc2ab9f7ef986f073989abd92326d864fc11 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 29 May 2025 10:07:37 +0800 Subject: [PATCH 07/23] fix some comments --- src/iceberg/table_scan.cc | 45 +++++++++++++++++++++------------------ src/iceberg/table_scan.h | 17 +++++++-------- src/iceberg/type_fwd.h | 9 +++++--- 3 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 37309f74..ea86025d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -50,23 +50,25 @@ TableScanBuilder& TableScanBuilder::WithFilter( } Result> TableScanBuilder::Build() { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, snapshot_id_ ? table_.snapshot(*snapshot_id_) - : Result>( - table_.current_snapshot())); - - auto ResolveSchema = [&]() -> Result> { - if (snapshot->schema_id) { - const auto& schemas = table_.schemas(); - if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { - return it->second; - } + std::shared_ptr snapshot; + if (snapshot_id_) { + ICEBERG_ASSIGN_OR_RAISE(snapshot, table_.snapshot(*snapshot_id_)); + } else { + snapshot = table_.current_snapshot(); + } + + std::shared_ptr schema; + if (snapshot->schema_id) { + const auto& schemas = table_.schemas(); + if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { + schema = it->second; + } else { return InvalidData("Schema {} in snapshot {} is not found", *snapshot->schema_id, snapshot->snapshot_id); } - return table_.schema(); - }; - - ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSchema()); + } else { + schema = table_.schema(); + } std::vector field_ids; field_ids.reserve(column_names_.size()); @@ -78,18 +80,19 @@ Result> TableScanBuilder::Build() { field_ids.emplace_back(field_opt.value().get().field_id()); } - auto context = std::make_unique( - std::move(snapshot), std::move(schema), std::move(field_ids), std::move(filter_)); + TableScan::ScanContext context{.snapshot = std::move(snapshot), + .schema = std::move(schema), + .field_ids = std::move(field_ids), + .filter = std::move(filter_)}; return std::make_unique(std::move(context), table_.io()); } -TableScan::TableScan(std::unique_ptr context, - std::shared_ptr file_io) +TableScan::TableScan(ScanContext context, std::shared_ptr file_io) : context_(std::move(context)), file_io_(std::move(file_io)) {} Result>> TableScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, - CreateManifestListReader(context_->snapshot_->manifest_list)); + CreateManifestListReader(context_.snapshot->manifest_list)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); std::vector> tasks; @@ -102,8 +105,8 @@ Result>> TableScan::PlanFiles() const const auto& data_file = manifest->data_file; tasks.emplace_back(std::make_shared( data_file.file_path, 0, data_file.file_size_in_bytes, data_file.record_count, - data_file.content, data_file.file_format, context_->schema_, - context_->field_ids_, context_->filter_)); + data_file.content, data_file.file_format, context_.schema, context_.field_ids, + context_.filter)); } } return tasks; diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index d2ea0e3c..9076cec4 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -22,8 +22,6 @@ #include #include -#include "iceberg/expression/expression.h" -#include "iceberg/file_io.h" #include "iceberg/manifest_entry.h" #include "iceberg/type_fwd.h" @@ -42,6 +40,7 @@ class ICEBERG_EXPORT TableScanBuilder { TableScanBuilder& WithSnapshotId(int64_t snapshot_id); /// \brief Selects columns to include in the scan. + /// Defaults to none which means select all columns /// \param column_names A list of column names. /// \return Reference to the builder. TableScanBuilder& WithColumnNames(const std::vector& column_names); @@ -67,16 +66,16 @@ class ICEBERG_EXPORT TableScan { public: /// \brief Scan context holding snapshot and scan-specific metadata. struct ScanContext { - std::shared_ptr snapshot_; ///< Snapshot to scan. - std::shared_ptr schema_; ///< Projected schema. - std::vector field_ids_; ///< Field IDs of selected columns. - std::shared_ptr filter_; ///< Filter expression to apply. + std::shared_ptr snapshot; ///< Snapshot to scan. + std::shared_ptr schema; ///< Table schema. + std::vector field_ids; ///< Field IDs of selected columns. + std::shared_ptr filter; ///< Filter expression to apply. }; /// \brief Constructs a TableScan with the given context and file I/O. /// \param context Scan context including snapshot, schema, and filter. /// \param file_io File I/O instance for reading manifests and data files. - TableScan(std::unique_ptr context, std::shared_ptr file_io); + TableScan(ScanContext context, std::shared_ptr file_io); /// \brief Plans the scan tasks by resolving manifests and data files. /// @@ -97,7 +96,7 @@ class ICEBERG_EXPORT TableScan { Result> CreateManifestReader( const std::string& file_path) const; - std::unique_ptr context_; + ScanContext context_; std::shared_ptr file_io_; }; @@ -109,7 +108,7 @@ struct ICEBERG_EXPORT FileScanTask { std::optional record_count; ///< Optional number of records. DataFile::Content file_content; ///< Type of file content. FileFormatType file_format; ///< Format of the data file. - std::shared_ptr schema; ///< Projected schema. + std::shared_ptr schema; ///< Table schema. std::vector field_ids; ///< Field IDs to project. std::shared_ptr filter; ///< Filter expression to apply. }; diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0b185aae..0d9c8982 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -91,10 +91,7 @@ class LocationProvider; class SortField; class SortOrder; class Table; -class TableScan; -struct FileScanTask; class FileIO; -class TableScanBuilder; class Transaction; class Transform; class TransformFunction; @@ -112,6 +109,12 @@ class NameMapping; enum class SnapshotRefType; enum class TransformType; +class Expression; + +struct FileScanTask; +class TableScan; +class TableScanBuilder; + /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- From 28043b1e6fd41ede1ad1e3c5535eac6bf980e2cf Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Sat, 14 Jun 2025 14:08:38 +0800 Subject: [PATCH 08/23] Update src/iceberg/table_scan.h Co-authored-by: Gang Wu --- src/iceberg/table_scan.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 9076cec4..d68d4121 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -40,8 +40,7 @@ class ICEBERG_EXPORT TableScanBuilder { TableScanBuilder& WithSnapshotId(int64_t snapshot_id); /// \brief Selects columns to include in the scan. - /// Defaults to none which means select all columns - /// \param column_names A list of column names. + /// \param column_names A list of column names. If empty, all columns will be selected. /// \return Reference to the builder. TableScanBuilder& WithColumnNames(const std::vector& column_names); From fa2589136fc750cc1f8085014019d1c7fb455290 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Sat, 14 Jun 2025 14:09:14 +0800 Subject: [PATCH 09/23] Update src/iceberg/table_scan.h Co-authored-by: Gang Wu --- src/iceberg/table_scan.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index d68d4121..edbe9568 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -47,7 +47,7 @@ class ICEBERG_EXPORT TableScanBuilder { /// \brief Applies a filter expression to the scan. /// \param filter Filter expression to use. /// \return Reference to the builder. - TableScanBuilder& WithFilter(const std::shared_ptr& filter); + TableScanBuilder& WithFilter(std::shared_ptr filter); /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. From 812a545aa742f63aaa558e4ec378730310010c45 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Sat, 14 Jun 2025 14:49:44 +0800 Subject: [PATCH 10/23] fix comments --- src/iceberg/manifest_entry.h | 2 +- src/iceberg/manifest_reader.h | 18 +++++++++++++++ src/iceberg/table_scan.cc | 43 ++++++++++++++--------------------- src/iceberg/table_scan.h | 33 ++++++--------------------- src/iceberg/type_fwd.h | 3 ++- 5 files changed, 45 insertions(+), 54 deletions(-) diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h index 43db00ae..6fd2eb28 100644 --- a/src/iceberg/manifest_entry.h +++ b/src/iceberg/manifest_entry.h @@ -291,7 +291,7 @@ struct ICEBERG_EXPORT ManifestEntry { std::optional file_sequence_number; /// Field id: 2 /// File path, partition tuple, metrics, ... - DataFile data_file; + std::shared_ptr data_file; inline static const SchemaField kStatus = SchemaField::MakeRequired(0, "status", std::make_shared()); diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 6b81eb9b..d417ee8f 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -34,6 +34,7 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ICEBERG_EXPORT ManifestReader { public: + virtual ~ManifestReader() = default; virtual Result>> Entries() const = 0; private: @@ -43,10 +44,27 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Read manifest files from a manifest list file. class ICEBERG_EXPORT ManifestListReader { public: + virtual ~ManifestListReader() = default; virtual Result>> Files() const = 0; private: std::unique_ptr reader_; }; +/// \brief Creates a reader for the manifest list. +/// \param file_path Path to the manifest list file. +/// \return A Result containing the reader or an error. +Result> CreateManifestListReader( + const std::string& file_path) { + return NotImplemented("CreateManifestListReader is not implemented yet."); +} + +/// \brief Creates a reader for a manifest file. +/// \param file_path Path to the manifest file. +/// \return A Result containing the reader or an error. +Result> CreateManifestReader( + const std::string& file_path) { + return NotImplemented("CreateManifestReader is not implemented yet."); +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index ea86025d..147d4f5b 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -33,8 +33,8 @@ namespace iceberg { TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {} TableScanBuilder& TableScanBuilder::WithColumnNames( - const std::vector& column_names) { - column_names_ = column_names; + std::vector column_names) { + column_names_ = std::move(column_names); return *this; } @@ -43,9 +43,8 @@ TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) { return *this; } -TableScanBuilder& TableScanBuilder::WithFilter( - const std::shared_ptr& filter) { - filter_ = filter; +TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr filter) { + filter_ = std::move(filter); return *this; } @@ -56,6 +55,9 @@ Result> TableScanBuilder::Build() { } else { snapshot = table_.current_snapshot(); } + if (snapshot == nullptr) { + return InvalidArgument("No snapshot found for table {}", table_.name()); + } std::shared_ptr schema; if (snapshot->schema_id) { @@ -63,26 +65,27 @@ Result> TableScanBuilder::Build() { if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { schema = it->second; } else { - return InvalidData("Schema {} in snapshot {} is not found", *snapshot->schema_id, - snapshot->snapshot_id); + return InvalidArgument("Schema {} in snapshot {} is not found", + *snapshot->schema_id, snapshot->snapshot_id); } } else { schema = table_.schema(); } - std::vector field_ids; - field_ids.reserve(column_names_.size()); + std::vector projected_fields; + projected_fields.reserve(column_names_.size()); for (const auto& column_name : column_names_) { auto field_opt = schema->GetFieldByName(column_name); if (!field_opt) { return InvalidArgument("Column {} not found in schema", column_name); } - field_ids.emplace_back(field_opt.value().get().field_id()); + projected_fields.emplace_back(field_opt.value().get()); } + auto projected_schema = + std::make_shared(std::move(projected_fields), schema->schema_id()); TableScan::ScanContext context{.snapshot = std::move(snapshot), - .schema = std::move(schema), - .field_ids = std::move(field_ids), + .projected_schema = std::move(projected_schema), .filter = std::move(filter_)}; return std::make_unique(std::move(context), table_.io()); } @@ -103,23 +106,11 @@ Result>> TableScan::PlanFiles() const for (const auto& manifest : manifests) { const auto& data_file = manifest->data_file; - tasks.emplace_back(std::make_shared( - data_file.file_path, 0, data_file.file_size_in_bytes, data_file.record_count, - data_file.content, data_file.file_format, context_.schema, context_.field_ids, - context_.filter)); + tasks.emplace_back( + std::make_shared(data_file, 0, data_file->file_size_in_bytes)); } } return tasks; } -Result> TableScan::CreateManifestListReader( - const std::string& file_path) const { - return NotImplemented("manifest list reader"); -} - -Result> TableScan::CreateManifestReader( - const std::string& file_path) const { - return NotImplemented("manifest reader"); -} - } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index edbe9568..01d3e11b 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -42,7 +42,7 @@ class ICEBERG_EXPORT TableScanBuilder { /// \brief Selects columns to include in the scan. /// \param column_names A list of column names. If empty, all columns will be selected. /// \return Reference to the builder. - TableScanBuilder& WithColumnNames(const std::vector& column_names); + TableScanBuilder& WithColumnNames(std::vector column_names); /// \brief Applies a filter expression to the scan. /// \param filter Filter expression to use. @@ -65,10 +65,9 @@ class ICEBERG_EXPORT TableScan { public: /// \brief Scan context holding snapshot and scan-specific metadata. struct ScanContext { - std::shared_ptr snapshot; ///< Snapshot to scan. - std::shared_ptr schema; ///< Table schema. - std::vector field_ids; ///< Field IDs of selected columns. - std::shared_ptr filter; ///< Filter expression to apply. + std::shared_ptr snapshot; ///< Snapshot to scan. + std::shared_ptr projected_schema; ///< Projected schema. + std::shared_ptr filter; ///< Filter expression to apply. }; /// \brief Constructs a TableScan with the given context and file I/O. @@ -83,33 +82,15 @@ class ICEBERG_EXPORT TableScan { Result>> PlanFiles() const; private: - /// \brief Creates a reader for the manifest list. - /// \param file_path Path to the manifest list file. - /// \return A Result containing the reader or an error. - Result> CreateManifestListReader( - const std::string& file_path) const; - - /// \brief Creates a reader for a manifest file. - /// \param file_path Path to the manifest file. - /// \return A Result containing the reader or an error. - Result> CreateManifestReader( - const std::string& file_path) const; - ScanContext context_; std::shared_ptr file_io_; }; /// \brief Represents a task to scan a portion of a data file. struct ICEBERG_EXPORT FileScanTask { - std::string file_path; ///< Path to the data file. - uint64_t start; ///< Start byte offset. - uint64_t length; ///< Length in bytes to scan. - std::optional record_count; ///< Optional number of records. - DataFile::Content file_content; ///< Type of file content. - FileFormatType file_format; ///< Format of the data file. - std::shared_ptr schema; ///< Table schema. - std::vector field_ids; ///< Field IDs to project. - std::shared_ptr filter; ///< Filter expression to apply. + std::shared_ptr data_file; ///< Data file metadata. + uint64_t start; ///< Start byte offset. + uint64_t length; ///< Length in bytes to scan. }; } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index b337f29b..8a7ea663 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -131,8 +131,9 @@ class AppendFiles; struct DataFile; struct ManifestEntry; struct ManifestFile; -class ManifestReader; struct ManifestList; + +class ManifestReader; class ManifestListReader; } // namespace iceberg From 85802e92a39b7a022df187fb63e5fe31ef7bc8a7 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 23 Jun 2025 15:23:28 +0800 Subject: [PATCH 11/23] Abstract TableScan and ScanTask --- src/iceberg/snapshot.h | 1 + src/iceberg/table_scan.cc | 199 +++++++++++++++++++++++++++++++------- src/iceberg/table_scan.h | 135 ++++++++++++++++++++++---- src/iceberg/type_fwd.h | 2 +- 4 files changed, 282 insertions(+), 55 deletions(-) diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index a16522a8..feba7c8f 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -222,6 +222,7 @@ struct ICEBERG_EXPORT DataOperation { /// Snapshots are created by table operations. struct ICEBERG_EXPORT Snapshot { static constexpr int64_t kInvalidSnapshotId = -1; + static constexpr int64_t kInitialSequenceNumber = 0; /// A unique long ID. int64_t snapshot_id; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 147d4f5b..fa923239 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -30,7 +30,52 @@ namespace iceberg { -TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {} +// implement FileScanTask +FileScanTask::FileScanTask(std::shared_ptr file, + std::vector> delete_files, + int64_t start, int64_t length, + std::shared_ptr residual) + : data_file_(std::move(file)), + delete_files_(std::move(delete_files)), + start_(start), + length_(length), + residual_(std::move(residual)) {} + +const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } + +const std::vector>& FileScanTask::delete_files() const { + return delete_files_; +} + +int64_t FileScanTask::start() const { return start_; } + +int64_t FileScanTask::length() const { return length_; } + +int64_t FileScanTask::size_bytes() const { + int64_t sizeInBytes = length_; + std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) { + sizeInBytes += delete_file->file_size_in_bytes; + }); + return sizeInBytes; +} + +int32_t FileScanTask::files_count() const { + return static_cast(delete_files_.size() + 1); +} + +int64_t FileScanTask::estimated_row_count() const { + const double scannedFileFraction = + static_cast(length_) / data_file_->file_size_in_bytes; + return static_cast(scannedFileFraction * data_file_->record_count); +} + +const std::shared_ptr& FileScanTask::residual() const { return residual_; } + +TableScanBuilder::TableScanBuilder(const Table& table, + std::shared_ptr table_metadata) + : table_(table) { + context_.table_metadata = std::move(table_metadata); +} TableScanBuilder& TableScanBuilder::WithColumnNames( std::vector column_names) { @@ -38,13 +83,33 @@ TableScanBuilder& TableScanBuilder::WithColumnNames( return *this; } +TableScanBuilder& TableScanBuilder::WithSchema(std::shared_ptr schema) { + context_.projected_schema = std::move(schema); + return *this; +} + TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) { snapshot_id_ = snapshot_id; return *this; } TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr filter) { - filter_ = std::move(filter); + context_.filter = std::move(filter); + return *this; +} + +TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) { + context_.case_sensitive = case_sensitive; + return *this; +} + +TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) { + context_.options[std::move(property)] = std::move(value); + return *this; +} + +TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) { + context_.limit = limit; return *this; } @@ -58,59 +123,127 @@ Result> TableScanBuilder::Build() { if (snapshot == nullptr) { return InvalidArgument("No snapshot found for table {}", table_.name()); } - - std::shared_ptr schema; - if (snapshot->schema_id) { - const auto& schemas = table_.schemas(); - if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { - schema = it->second; + context_.snapshot = std::move(snapshot); + + if (!context_.projected_schema) { + std::shared_ptr schema; + if (snapshot->schema_id) { + const auto& schemas = table_.schemas(); + if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { + schema = it->second; + } else { + return InvalidArgument("Schema {} in snapshot {} is not found", + *snapshot->schema_id, snapshot->snapshot_id); + } } else { - return InvalidArgument("Schema {} in snapshot {} is not found", - *snapshot->schema_id, snapshot->snapshot_id); + schema = table_.schema(); } - } else { - schema = table_.schema(); - } - std::vector projected_fields; - projected_fields.reserve(column_names_.size()); - for (const auto& column_name : column_names_) { - auto field_opt = schema->GetFieldByName(column_name); - if (!field_opt) { - return InvalidArgument("Column {} not found in schema", column_name); + // TODO(gty404): collect touched columns from filter expression + std::vector projected_fields; + projected_fields.reserve(column_names_.size()); + for (const auto& column_name : column_names_) { + // TODO(gty404): support case-insensitive column names + auto field_opt = schema->GetFieldByName(column_name); + if (!field_opt) { + return InvalidArgument("Column {} not found in schema", column_name); + } + projected_fields.emplace_back(field_opt.value().get()); } - projected_fields.emplace_back(field_opt.value().get()); + + context_.projected_schema = + std::make_shared(std::move(projected_fields), schema->schema_id()); } - auto projected_schema = - std::make_shared(std::move(projected_fields), schema->schema_id()); - TableScan::ScanContext context{.snapshot = std::move(snapshot), - .projected_schema = std::move(projected_schema), - .filter = std::move(filter_)}; - return std::make_unique(std::move(context), table_.io()); + return std::make_unique(std::move(context_), table_.io()); } -TableScan::TableScan(ScanContext context, std::shared_ptr file_io) +TableScan::TableScan(TableScanContext context, std::shared_ptr file_io) : context_(std::move(context)), file_io_(std::move(file_io)) {} -Result>> TableScan::PlanFiles() const { +const std::shared_ptr& TableScan::snapshot() const { return context_.snapshot; } + +const std::shared_ptr& TableScan::projection() const { + return context_.projected_schema; +} + +const TableScanContext& TableScan::context() const { return context_; } + +const std::shared_ptr& TableScan::io() const { return file_io_; } + +DataScan::DataScan(TableScanContext context, std::shared_ptr file_io) + : TableScan(std::move(context), std::move(file_io)) {} + +Result>> DataScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, CreateManifestListReader(context_.snapshot->manifest_list)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); - std::vector> tasks; + std::vector> data_entries; + std::vector> positional_delete_entries; for (const auto& manifest_file : manifest_files) { ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, CreateManifestReader(manifest_file->manifest_path)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); - for (const auto& manifest : manifests) { - const auto& data_file = manifest->data_file; - tasks.emplace_back( - std::make_shared(data_file, 0, data_file->file_size_in_bytes)); + // TODO(gty404): filter manifests using partition spec and filter expression + + for (auto& manifest_entry : manifests) { + const auto& data_file = manifest_entry->data_file; + switch (data_file->content) { + case DataFile::Content::kData: + data_entries.push_back(std::move(manifest_entry)); + break; + case DataFile::Content::kPositionDeletes: + // TODO(gty404): check if the sequence number is greater than or equal to the + // minimum sequence number of all manifest entries + positional_delete_entries.push_back(std::move(manifest_entry)); + break; + case DataFile::Content::kEqualityDeletes: + return NotSupported("Equality deletes are not supported in data scan"); + } } } + + // TODO(gty404): build residual expression from filter + std::shared_ptr residual; + + std::vector> tasks; + for (const auto& data_entry : data_entries) { + auto matched_deletes = GetMatchedDeletes(*data_entry, positional_delete_entries); + const auto& data_file = data_entry->data_file; + tasks.emplace_back(std::make_shared( + data_file, matched_deletes, 0, data_file->file_size_in_bytes, residual)); + } return tasks; } +std::vector> DataScan::GetMatchedDeletes( + const ManifestEntry& data_entry, + const std::vector>& positional_delete_entries) const { + auto data_sequence_number = + data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); + std::vector relevant_entries; + // TODO(gty404): consider using a more efficient data structure + for (const auto& delete_entry : positional_delete_entries) { + const int64_t delete_sequence_number = + delete_entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); + if (delete_sequence_number >= data_sequence_number) { + relevant_entries.push_back(delete_entry.get()); + } + } + + std::vector> matched_deletes; + if (relevant_entries.empty()) { + return matched_deletes; + } + + matched_deletes.reserve(relevant_entries.size()); + for (const auto& delete_entry : relevant_entries) { + // TODO(gty404): check if the delete entry contains the data entry's file path + matched_deletes.emplace_back(delete_entry->data_file); + } + return matched_deletes; +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 01d3e11b..3f3117d6 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -27,12 +27,76 @@ namespace iceberg { +class ICEBERG_EXPORT ScanTask { + public: + virtual ~ScanTask() = default; + + /// \brief The number of bytes that should be read by this scan task. + virtual int64_t size_bytes() const = 0; + + /// \brief The number of files that should be read by this scan task. + virtual int32_t files_count() const = 0; + + /// \brief The number of rows that should be read by this scan task. + virtual int64_t estimated_row_count() const = 0; +}; + +/// \brief Represents a task to scan a portion of a data file. +class ICEBERG_EXPORT FileScanTask : public ScanTask { + public: + FileScanTask(std::shared_ptr file, + std::vector> delete_files, int64_t start, + int64_t length, std::shared_ptr residual); + + /// \brief The data file that should be read by this scan task. + virtual const std::shared_ptr& data_file() const; + + /// \brief The delete files that should be read by this scan task. + const std::vector>& delete_files() const; + + /// \brief The byte offset in the data file where the scan should start. + int64_t start() const; + + /// \brief The length in bytes to scan from the start offset. + int64_t length() const; + + /// \brief The residual expression to apply after scanning the data file. + const std::shared_ptr& residual() const; + + int64_t size_bytes() const override; + int32_t files_count() const override; + int64_t estimated_row_count() const override; + + private: + std::shared_ptr data_file_; ///< Data file metadata. + std::vector> delete_files_; ///< Delete files metadata. + + int64_t start_; ///< Start byte offset. + int64_t length_; ///< Length in bytes to scan. + + std::shared_ptr residual_; ///< Residual expression to apply. +}; + +/// \brief Scan context holding snapshot and scan-specific metadata. +struct TableScanContext { + std::shared_ptr table_metadata; ///< Table metadata. + std::shared_ptr snapshot; ///< Snapshot to scan. + std::shared_ptr projected_schema; ///< Projected schema. + std::shared_ptr filter; ///< Filter expression to apply. + bool case_sensitive = false; ///< Whether the scan is case-sensitive. + std::unordered_map + options; ///< Additional options for the scan. + std::optional limit; ///< Optional limit on the number of rows to scan. +}; + /// \brief Builder class for creating TableScan instances. class ICEBERG_EXPORT TableScanBuilder { public: /// \brief Constructs a TableScanBuilder for the given table. - /// \param table Reference to the table to scan. - explicit TableScanBuilder(const Table& table); + /// \param table The table to scan. + /// \param table_metadata The metadata of the table to scan. + explicit TableScanBuilder(const Table& table, + std::shared_ptr table_metadata); /// \brief Sets the snapshot ID to scan. /// \param snapshot_id The ID of the snapshot. @@ -44,53 +108,82 @@ class ICEBERG_EXPORT TableScanBuilder { /// \return Reference to the builder. TableScanBuilder& WithColumnNames(std::vector column_names); + /// \brief Sets the schema to use for the scan. + /// \param schema The schema to use. + /// \return Reference to the builder. + TableScanBuilder& WithSchema(std::shared_ptr schema); + /// \brief Applies a filter expression to the scan. /// \param filter Filter expression to use. /// \return Reference to the builder. TableScanBuilder& WithFilter(std::shared_ptr filter); + /// \brief Sets whether the scan should be case-sensitive. + /// \param case_sensitive Whether the scan is case-sensitive. + /// /return Reference to the builder. + TableScanBuilder& WithCaseSensitive(bool case_sensitive); + + /// \brief Sets an option for the scan. + /// \param property The name of the option. + /// \param value The value of the option. + /// \return Reference to the builder. + TableScanBuilder& WithOption(std::string property, std::string value); + + /// \brief Sets an optional limit on the number of rows to scan. + /// \param limit Optional limit on the number of rows. + /// \return Reference to the builder. + TableScanBuilder& WithLimit(std::optional limit); + /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. Result> Build(); private: - const Table& table_; + const Table& table_; ///< Reference to the table to scan. std::vector column_names_; std::optional snapshot_id_; - std::shared_ptr filter_; + TableScanContext context_; ///< Context for the scan. }; /// \brief Represents a configured scan operation on a table. class ICEBERG_EXPORT TableScan { public: - /// \brief Scan context holding snapshot and scan-specific metadata. - struct ScanContext { - std::shared_ptr snapshot; ///< Snapshot to scan. - std::shared_ptr projected_schema; ///< Projected schema. - std::shared_ptr filter; ///< Filter expression to apply. - }; + virtual ~TableScan() = default; /// \brief Constructs a TableScan with the given context and file I/O. /// \param context Scan context including snapshot, schema, and filter. /// \param file_io File I/O instance for reading manifests and data files. - TableScan(ScanContext context, std::shared_ptr file_io); + TableScan(TableScanContext context, std::shared_ptr file_io); + + const std::shared_ptr& snapshot() const; + + const std::shared_ptr& projection() const; + + const TableScanContext& context() const; + + const std::shared_ptr& io() const; /// \brief Plans the scan tasks by resolving manifests and data files. - /// - /// Returns a list of file scan tasks if successful. /// \return A Result containing scan tasks or an error. - Result>> PlanFiles() const; + virtual Result>> PlanFiles() const = 0; - private: - ScanContext context_; + protected: + const TableScanContext context_; std::shared_ptr file_io_; }; -/// \brief Represents a task to scan a portion of a data file. -struct ICEBERG_EXPORT FileScanTask { - std::shared_ptr data_file; ///< Data file metadata. - uint64_t start; ///< Start byte offset. - uint64_t length; ///< Length in bytes to scan. +class ICEBERG_EXPORT DataScan : public TableScan { + public: + DataScan(TableScanContext context, std::shared_ptr file_io); + + /// \brief Plans the scan tasks by resolving manifests and data files. + /// \return A Result containing scan tasks or an error. + Result>> PlanFiles() const override; + + private: + std::vector> GetMatchedDeletes( + const ManifestEntry& data_entry, + const std::vector>& positional_delete_entries) const; }; } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 8a7ea663..27592201 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -112,7 +112,7 @@ enum class TransformType; class Expression; -struct FileScanTask; +class FileScanTask; class TableScan; class TableScanBuilder; From c7621b33e282fcdf95236d1fa954d8addd0378c9 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 23 Jun 2025 15:59:19 +0800 Subject: [PATCH 12/23] fix lint --- src/iceberg/table_scan.cc | 52 ++++++++++++++++++++++++++++----------- src/iceberg/table_scan.h | 21 +++++++++++++--- 2 files changed, 56 insertions(+), 17 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index fa923239..5b1949e8 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,6 +19,8 @@ #include "iceberg/table_scan.h" +#include + #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" @@ -64,6 +66,9 @@ int32_t FileScanTask::files_count() const { } int64_t FileScanTask::estimated_row_count() const { + if (data_file_->file_size_in_bytes == 0) { + return 0; + } const double scannedFileFraction = static_cast(length_) / data_file_->file_size_in_bytes; return static_cast(scannedFileFraction * data_file_->record_count); @@ -79,6 +84,7 @@ TableScanBuilder::TableScanBuilder(const Table& table, TableScanBuilder& TableScanBuilder::WithColumnNames( std::vector column_names) { + column_names_.reserve(column_names.size()); column_names_ = std::move(column_names); return *this; } @@ -205,34 +211,52 @@ Result>> DataScan::PlanFiles() const { } } + DeleteFileIndex delete_file_index; + delete_file_index.BuildIndex(positional_delete_entries); + // TODO(gty404): build residual expression from filter std::shared_ptr residual; - std::vector> tasks; for (const auto& data_entry : data_entries) { - auto matched_deletes = GetMatchedDeletes(*data_entry, positional_delete_entries); + auto matched_deletes = GetMatchedDeletes(*data_entry, delete_file_index); const auto& data_file = data_entry->data_file; tasks.emplace_back(std::make_shared( - data_file, matched_deletes, 0, data_file->file_size_in_bytes, residual)); + data_file, std::move(matched_deletes), 0, data_file->file_size_in_bytes, + std::move(residual))); } return tasks; } -std::vector> DataScan::GetMatchedDeletes( - const ManifestEntry& data_entry, - const std::vector>& positional_delete_entries) const { +void DataScan::DeleteFileIndex::BuildIndex( + const std::vector>& entries) { + sequence_index.clear(); + + for (const auto& entry : entries) { + const int64_t seq_num = + entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); + sequence_index.emplace(seq_num, entry.get()); + } +} + +std::vector DataScan::DeleteFileIndex::FindRelevantEntries( + const ManifestEntry& data_entry) const { + std::vector relevant_deletes; + + // Use lower_bound for efficient range search auto data_sequence_number = data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); - std::vector relevant_entries; - // TODO(gty404): consider using a more efficient data structure - for (const auto& delete_entry : positional_delete_entries) { - const int64_t delete_sequence_number = - delete_entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); - if (delete_sequence_number >= data_sequence_number) { - relevant_entries.push_back(delete_entry.get()); - } + for (auto it = sequence_index.lower_bound(data_sequence_number); + it != sequence_index.end(); ++it) { + // Additional filtering logic here + relevant_deletes.push_back(it->second); } + return relevant_deletes; +} + +std::vector> DataScan::GetMatchedDeletes( + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) const { + const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); std::vector> matched_deletes; if (relevant_entries.empty()) { return matched_deletes; diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 3f3117d6..875a64e4 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -49,7 +49,7 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int64_t length, std::shared_ptr residual); /// \brief The data file that should be read by this scan task. - virtual const std::shared_ptr& data_file() const; + const std::shared_ptr& data_file() const; /// \brief The delete files that should be read by this scan task. const std::vector>& delete_files() const; @@ -155,12 +155,20 @@ class ICEBERG_EXPORT TableScan { /// \param file_io File I/O instance for reading manifests and data files. TableScan(TableScanContext context, std::shared_ptr file_io); + /// \brief Returns the snapshot being scanned. + /// \return A shared pointer to the snapshot. const std::shared_ptr& snapshot() const; + /// \brief Returns the projected schema for the scan. + /// \return A shared pointer to the projected schema. const std::shared_ptr& projection() const; + /// \brief Returns the scan context. + /// \return A reference to the TableScanContext. const TableScanContext& context() const; + /// \brief Returns the file I/O instance used for reading manifests and data files. + /// \return A shared pointer to the FileIO instance. const std::shared_ptr& io() const; /// \brief Plans the scan tasks by resolving manifests and data files. @@ -181,9 +189,16 @@ class ICEBERG_EXPORT DataScan : public TableScan { Result>> PlanFiles() const override; private: + // Use indexed data structures for efficient lookups + struct DeleteFileIndex { + // Index by sequence number for quick filtering + std::multimap sequence_index; + void BuildIndex(const std::vector>& entries); + std::vector FindRelevantEntries( + const ManifestEntry& data_entry) const; + }; std::vector> GetMatchedDeletes( - const ManifestEntry& data_entry, - const std::vector>& positional_delete_entries) const; + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) const; }; } // namespace iceberg From e1267fcadb781fd62b22acaaf3c3703eac75f4f9 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 23 Jun 2025 16:05:14 +0800 Subject: [PATCH 13/23] fix lint --- src/iceberg/table_scan.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 5b1949e8..4c54a76e 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,6 +19,7 @@ #include "iceberg/table_scan.h" +#include #include #include "iceberg/manifest_entry.h" From 5248e2240a3a5d5a35bb6628ad9e3e3202ff961b Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 23 Jun 2025 16:24:06 +0800 Subject: [PATCH 14/23] fix lint --- src/iceberg/table_scan.cc | 11 +++++------ src/iceberg/table_scan.h | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 4c54a76e..16bb37b6 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -121,19 +121,18 @@ TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) { } Result> TableScanBuilder::Build() { - std::shared_ptr snapshot; if (snapshot_id_) { - ICEBERG_ASSIGN_OR_RAISE(snapshot, table_.snapshot(*snapshot_id_)); + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.snapshot(*snapshot_id_)); } else { - snapshot = table_.current_snapshot(); + context_.snapshot = table_.current_snapshot(); } - if (snapshot == nullptr) { + if (context_.snapshot == nullptr) { return InvalidArgument("No snapshot found for table {}", table_.name()); } - context_.snapshot = std::move(snapshot); if (!context_.projected_schema) { std::shared_ptr schema; + const auto& snapshot = context_.snapshot; if (snapshot->schema_id) { const auto& schemas = table_.schemas(); if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { @@ -256,7 +255,7 @@ std::vector DataScan::DeleteFileIndex::FindRelevantEntries( } std::vector> DataScan::GetMatchedDeletes( - const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) const { + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) { const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); std::vector> matched_deletes; if (relevant_entries.empty()) { diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 875a64e4..1801d47c 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -197,8 +197,8 @@ class ICEBERG_EXPORT DataScan : public TableScan { std::vector FindRelevantEntries( const ManifestEntry& data_entry) const; }; - std::vector> GetMatchedDeletes( - const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) const; + static std::vector> GetMatchedDeletes( + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index); }; } // namespace iceberg From 29e886574a5660f25c155fc7182868853e2ea4d6 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 30 Jun 2025 10:26:40 +0800 Subject: [PATCH 15/23] resolve some comments --- src/iceberg/table.cc | 5 ++ src/iceberg/table.h | 2 +- src/iceberg/table_scan.cc | 132 +++++++++++++++++++++----------------- src/iceberg/table_scan.h | 64 +++++++++--------- 4 files changed, 113 insertions(+), 90 deletions(-) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index c79f3786..bbc77e13 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -23,6 +23,7 @@ #include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_scan.h" namespace iceberg { @@ -107,4 +108,8 @@ const std::vector& Table::history() const { const std::shared_ptr& Table::io() const { return io_; } +std::unique_ptr Table::NewScan() const { + return std::make_unique(*this, metadata_); +} + } // namespace iceberg diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 8174ac0e..9a89057b 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -108,7 +108,7 @@ class ICEBERG_EXPORT Table { /// /// Once a table scan builder is created, it can be refined to project columns and /// filter data. - virtual std::unique_ptr NewScan() const = 0; + virtual std::unique_ptr NewScan() const; /// \brief Returns a FileIO to read and write table data and metadata files const std::shared_ptr& io() const; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 16bb37b6..19b7b099 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -33,6 +33,58 @@ namespace iceberg { +namespace { +/// \brief Use indexed data structures for efficient lookups +struct DeleteFileIndex { + /// \brief Index by sequence number for quick filtering + std::multimap sequence_index; + + /// \brief Build the index from a list of manifest entries. + void BuildIndex(const std::vector>& entries) { + sequence_index.clear(); + + for (const auto& entry : entries) { + const int64_t seq_num = + entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); + sequence_index.emplace(seq_num, entry.get()); + } + } + + /// \brief Find delete files that match the sequence number of a data entry. + std::vector FindRelevantEntries(const ManifestEntry& data_entry) const { + std::vector relevant_deletes; + + // Use lower_bound for efficient range search + auto data_sequence_number = + data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); + for (auto it = sequence_index.lower_bound(data_sequence_number); + it != sequence_index.end(); ++it) { + // Additional filtering logic here + relevant_deletes.push_back(it->second); + } + + return relevant_deletes; + } +}; + +/// \brief Get matched delete files for a given data entry. +std::vector> GetMatchedDeletes( + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) { + const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); + std::vector> matched_deletes; + if (relevant_entries.empty()) { + return matched_deletes; + } + + matched_deletes.reserve(relevant_entries.size()); + for (const auto& delete_entry : relevant_entries) { + // TODO(gty404): check if the delete entry contains the data entry's file path + matched_deletes.emplace_back(delete_entry->data_file); + } + return matched_deletes; +} +} // namespace + // implement FileScanTask FileScanTask::FileScanTask(std::shared_ptr file, std::vector> delete_files, @@ -122,19 +174,19 @@ TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) { Result> TableScanBuilder::Build() { if (snapshot_id_) { - ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.snapshot(*snapshot_id_)); + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.SnapshotById(*snapshot_id_)); } else { - context_.snapshot = table_.current_snapshot(); + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.current_snapshot()); } if (context_.snapshot == nullptr) { - return InvalidArgument("No snapshot found for table {}", table_.name()); + return InvalidArgument("No snapshot found for table {}", table_.name().name); } if (!context_.projected_schema) { std::shared_ptr schema; const auto& snapshot = context_.snapshot; if (snapshot->schema_id) { - const auto& schemas = table_.schemas(); + const auto& schemas = *table_.schemas(); if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { schema = it->second; } else { @@ -142,23 +194,26 @@ Result> TableScanBuilder::Build() { *snapshot->schema_id, snapshot->snapshot_id); } } else { - schema = table_.schema(); + ICEBERG_ASSIGN_OR_RAISE(schema, table_.schema()); } - // TODO(gty404): collect touched columns from filter expression - std::vector projected_fields; - projected_fields.reserve(column_names_.size()); - for (const auto& column_name : column_names_) { - // TODO(gty404): support case-insensitive column names - auto field_opt = schema->GetFieldByName(column_name); - if (!field_opt) { - return InvalidArgument("Column {} not found in schema", column_name); + if (column_names_.empty()) { + context_.projected_schema = schema; + } else { + // TODO(gty404): collect touched columns from filter expression + std::vector projected_fields; + projected_fields.reserve(column_names_.size()); + for (const auto& column_name : column_names_) { + // TODO(gty404): support case-insensitive column names + auto field_opt = schema->GetFieldByName(column_name); + if (!field_opt) { + return InvalidArgument("Column {} not found in schema", column_name); + } + projected_fields.emplace_back(field_opt.value().get()); } - projected_fields.emplace_back(field_opt.value().get()); + context_.projected_schema = + std::make_shared(std::move(projected_fields), schema->schema_id()); } - - context_.projected_schema = - std::make_shared(std::move(projected_fields), schema->schema_id()); } return std::make_unique(std::move(context_), table_.io()); @@ -227,47 +282,4 @@ Result>> DataScan::PlanFiles() const { return tasks; } -void DataScan::DeleteFileIndex::BuildIndex( - const std::vector>& entries) { - sequence_index.clear(); - - for (const auto& entry : entries) { - const int64_t seq_num = - entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); - sequence_index.emplace(seq_num, entry.get()); - } -} - -std::vector DataScan::DeleteFileIndex::FindRelevantEntries( - const ManifestEntry& data_entry) const { - std::vector relevant_deletes; - - // Use lower_bound for efficient range search - auto data_sequence_number = - data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); - for (auto it = sequence_index.lower_bound(data_sequence_number); - it != sequence_index.end(); ++it) { - // Additional filtering logic here - relevant_deletes.push_back(it->second); - } - - return relevant_deletes; -} - -std::vector> DataScan::GetMatchedDeletes( - const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) { - const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); - std::vector> matched_deletes; - if (relevant_entries.empty()) { - return matched_deletes; - } - - matched_deletes.reserve(relevant_entries.size()); - for (const auto& delete_entry : relevant_entries) { - // TODO(gty404): check if the delete entry contains the data entry's file path - matched_deletes.emplace_back(delete_entry->data_file); - } - return matched_deletes; -} - } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 1801d47c..6a89cd1a 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -27,6 +27,7 @@ namespace iceberg { +/// \brief Represents a task to scan a table or a portion of it. class ICEBERG_EXPORT ScanTask { public: virtual ~ScanTask() = default; @@ -68,25 +69,34 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { int64_t estimated_row_count() const override; private: - std::shared_ptr data_file_; ///< Data file metadata. - std::vector> delete_files_; ///< Delete files metadata. - - int64_t start_; ///< Start byte offset. - int64_t length_; ///< Length in bytes to scan. - - std::shared_ptr residual_; ///< Residual expression to apply. + /// \brief Data file metadata. + std::shared_ptr data_file_; + /// \brief Delete files metadata. + std::vector> delete_files_; + /// \brief Start byte offset. + int64_t start_; + /// \brief Length in bytes to scan. + int64_t length_; + /// \brief Residual expression to apply. + std::shared_ptr residual_; }; /// \brief Scan context holding snapshot and scan-specific metadata. struct TableScanContext { - std::shared_ptr table_metadata; ///< Table metadata. - std::shared_ptr snapshot; ///< Snapshot to scan. - std::shared_ptr projected_schema; ///< Projected schema. - std::shared_ptr filter; ///< Filter expression to apply. - bool case_sensitive = false; ///< Whether the scan is case-sensitive. - std::unordered_map - options; ///< Additional options for the scan. - std::optional limit; ///< Optional limit on the number of rows to scan. + /// \brief Table metadata. + std::shared_ptr table_metadata; + /// \brief Snapshot to scan. + std::shared_ptr snapshot; + /// \brief Projected schema. + std::shared_ptr projected_schema; + /// \brief Filter expression to apply. + std::shared_ptr filter; + /// \brief Whether the scan is case-sensitive. + bool case_sensitive = false; + /// \brief Additional options for the scan. + std::unordered_map options; + /// \brief Optional limit on the number of rows to scan. + std::optional limit; }; /// \brief Builder class for creating TableScan instances. @@ -139,10 +149,14 @@ class ICEBERG_EXPORT TableScanBuilder { Result> Build(); private: - const Table& table_; ///< Reference to the table to scan. + /// \brief Reference to the table to scan. + const Table& table_; + /// \brief column names to project in the scan. std::vector column_names_; + /// \brief snapshot ID to scan, if specified. std::optional snapshot_id_; - TableScanContext context_; ///< Context for the scan. + /// \brief Context for the scan, including snapshot, schema, and filter. + TableScanContext context_; }; /// \brief Represents a configured scan operation on a table. @@ -176,29 +190,21 @@ class ICEBERG_EXPORT TableScan { virtual Result>> PlanFiles() const = 0; protected: + /// \brief context for the scan, including snapshot, schema, and filter. const TableScanContext context_; + /// \brief File I/O instance for reading manifests and data files. std::shared_ptr file_io_; }; +/// \brief A scan that reads data files and applies delete files to filter rows. class ICEBERG_EXPORT DataScan : public TableScan { public: + /// \brief Constructs a DataScan with the given context and file I/O. DataScan(TableScanContext context, std::shared_ptr file_io); /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. Result>> PlanFiles() const override; - - private: - // Use indexed data structures for efficient lookups - struct DeleteFileIndex { - // Index by sequence number for quick filtering - std::multimap sequence_index; - void BuildIndex(const std::vector>& entries); - std::vector FindRelevantEntries( - const ManifestEntry& data_entry) const; - }; - static std::vector> GetMatchedDeletes( - const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index); }; } // namespace iceberg From ae560f39a6ba24ffc5c5b3e3872980e8f203be13 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 30 Jun 2025 18:01:11 +0800 Subject: [PATCH 16/23] remove Snapshot::kInitialSequenceNumber --- src/iceberg/snapshot.h | 1 - src/iceberg/table_scan.cc | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 1a76262e..c52feefb 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -225,7 +225,6 @@ struct ICEBERG_EXPORT DataOperation { /// Snapshots are created by table operations. struct ICEBERG_EXPORT Snapshot { static constexpr int64_t kInvalidSnapshotId = -1; - static constexpr int64_t kInitialSequenceNumber = 0; /// A unique long ID. int64_t snapshot_id; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 19b7b099..eb79490d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -29,6 +29,7 @@ #include "iceberg/schema_field.h" #include "iceberg/snapshot.h" #include "iceberg/table.h" +#include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -45,7 +46,7 @@ struct DeleteFileIndex { for (const auto& entry : entries) { const int64_t seq_num = - entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); + entry->sequence_number.value_or(TableMetadata::kInitialSequenceNumber); sequence_index.emplace(seq_num, entry.get()); } } @@ -56,7 +57,7 @@ struct DeleteFileIndex { // Use lower_bound for efficient range search auto data_sequence_number = - data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); + data_entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber); for (auto it = sequence_index.lower_bound(data_sequence_number); it != sequence_index.end(); ++it) { // Additional filtering logic here From 0ff952bb8d3836365cb37bd7342a864f9b493d23 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Tue, 1 Jul 2025 23:02:11 +0800 Subject: [PATCH 17/23] resolve some comments --- src/iceberg/table.cc | 2 +- src/iceberg/table_scan.cc | 78 +++++++++++++++++++++------------------ src/iceberg/table_scan.h | 28 +++++++------- 3 files changed, 58 insertions(+), 50 deletions(-) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index bbc77e13..a71e218b 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -109,7 +109,7 @@ const std::vector& Table::history() const { const std::shared_ptr& Table::io() const { return io_; } std::unique_ptr Table::NewScan() const { - return std::make_unique(*this, metadata_); + return std::make_unique(metadata_, io_); } } // namespace iceberg diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index eb79490d..16b31091 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -28,7 +28,6 @@ #include "iceberg/schema.h" #include "iceberg/schema_field.h" #include "iceberg/snapshot.h" -#include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" @@ -36,14 +35,10 @@ namespace iceberg { namespace { /// \brief Use indexed data structures for efficient lookups -struct DeleteFileIndex { - /// \brief Index by sequence number for quick filtering - std::multimap sequence_index; - +class DeleteFileIndex { + public: /// \brief Build the index from a list of manifest entries. - void BuildIndex(const std::vector>& entries) { - sequence_index.clear(); - + explicit DeleteFileIndex(const std::vector>& entries) { for (const auto& entry : entries) { const int64_t seq_num = entry->sequence_number.value_or(TableMetadata::kInitialSequenceNumber); @@ -66,6 +61,10 @@ struct DeleteFileIndex { return relevant_deletes; } + + private: + /// \brief Index by sequence number for quick filtering + std::multimap sequence_index; }; /// \brief Get matched delete files for a given data entry. @@ -107,7 +106,7 @@ int64_t FileScanTask::start() const { return start_; } int64_t FileScanTask::length() const { return length_; } -int64_t FileScanTask::size_bytes() const { +int64_t FileScanTask::SizeBytes() const { int64_t sizeInBytes = length_; std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) { sizeInBytes += delete_file->file_size_in_bytes; @@ -115,11 +114,11 @@ int64_t FileScanTask::size_bytes() const { return sizeInBytes; } -int32_t FileScanTask::files_count() const { +int32_t FileScanTask::FilesCount() const { return static_cast(delete_files_.size() + 1); } -int64_t FileScanTask::estimated_row_count() const { +int64_t FileScanTask::EstimatedRowCount() const { if (data_file_->file_size_in_bytes == 0) { return 0; } @@ -130,9 +129,9 @@ int64_t FileScanTask::estimated_row_count() const { const std::shared_ptr& FileScanTask::residual() const { return residual_; } -TableScanBuilder::TableScanBuilder(const Table& table, - std::shared_ptr table_metadata) - : table_(table) { +TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, + std::shared_ptr file_io) + : file_io_(std::move(file_io)) { context_.table_metadata = std::move(table_metadata); } @@ -143,7 +142,7 @@ TableScanBuilder& TableScanBuilder::WithColumnNames( return *this; } -TableScanBuilder& TableScanBuilder::WithSchema(std::shared_ptr schema) { +TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr schema) { context_.projected_schema = std::move(schema); return *this; } @@ -174,29 +173,39 @@ TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) { } Result> TableScanBuilder::Build() { - if (snapshot_id_) { - ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.SnapshotById(*snapshot_id_)); - } else { - ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.current_snapshot()); + const auto& table_metadata = context_.table_metadata; + auto snapshot_id = snapshot_id_ ? snapshot_id_ : table_metadata->current_snapshot_id; + if (!snapshot_id) { + return InvalidArgument("No snapshot ID specified for table {}", + table_metadata->table_uuid); } - if (context_.snapshot == nullptr) { - return InvalidArgument("No snapshot found for table {}", table_.name().name); + auto iter = std::ranges::find_if(table_metadata->snapshots, + [&snapshot_id](const auto& snapshot) { + return snapshot->snapshot_id == *snapshot_id; + }); + if (iter == table_metadata->snapshots.end() || *iter == nullptr) { + return NotFound("Snapshot with ID {} is not found", *snapshot_id); } + context_.snapshot = *iter; if (!context_.projected_schema) { - std::shared_ptr schema; const auto& snapshot = context_.snapshot; - if (snapshot->schema_id) { - const auto& schemas = *table_.schemas(); - if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { - schema = it->second; - } else { - return InvalidArgument("Schema {} in snapshot {} is not found", - *snapshot->schema_id, snapshot->snapshot_id); - } - } else { - ICEBERG_ASSIGN_OR_RAISE(schema, table_.schema()); + auto schema_id = + snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id; + if (!schema_id) { + return InvalidArgument("No schema ID found in snapshot {} for table {}", + snapshot->snapshot_id, table_metadata->table_uuid); + } + + const auto& schemas = table_metadata->schemas; + const auto it = std::ranges::find_if(schemas, [&schema_id](const auto& schema) { + return schema->schema_id() == *schema_id; + }); + if (it == schemas.end()) { + return InvalidArgument("Schema {} in snapshot {} is not found", + *snapshot->schema_id, snapshot->snapshot_id); } + auto schema = *it; if (column_names_.empty()) { context_.projected_schema = schema; @@ -217,7 +226,7 @@ Result> TableScanBuilder::Build() { } } - return std::make_unique(std::move(context_), table_.io()); + return std::make_unique(std::move(context_), file_io_); } TableScan::TableScan(TableScanContext context, std::shared_ptr file_io) @@ -267,8 +276,7 @@ Result>> DataScan::PlanFiles() const { } } - DeleteFileIndex delete_file_index; - delete_file_index.BuildIndex(positional_delete_entries); + DeleteFileIndex delete_file_index(positional_delete_entries); // TODO(gty404): build residual expression from filter std::shared_ptr residual; diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 6a89cd1a..9611e9eb 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -27,22 +27,22 @@ namespace iceberg { -/// \brief Represents a task to scan a table or a portion of it. +/// \brief An abstract scan task. class ICEBERG_EXPORT ScanTask { public: virtual ~ScanTask() = default; /// \brief The number of bytes that should be read by this scan task. - virtual int64_t size_bytes() const = 0; + virtual int64_t SizeBytes() const = 0; /// \brief The number of files that should be read by this scan task. - virtual int32_t files_count() const = 0; + virtual int32_t FilesCount() const = 0; /// \brief The number of rows that should be read by this scan task. - virtual int64_t estimated_row_count() const = 0; + virtual int64_t EstimatedRowCount() const = 0; }; -/// \brief Represents a task to scan a portion of a data file. +/// \brief Task representing a data file and its corresponding delete files. class ICEBERG_EXPORT FileScanTask : public ScanTask { public: FileScanTask(std::shared_ptr file, @@ -64,9 +64,9 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { /// \brief The residual expression to apply after scanning the data file. const std::shared_ptr& residual() const; - int64_t size_bytes() const override; - int32_t files_count() const override; - int64_t estimated_row_count() const override; + int64_t SizeBytes() const override; + int32_t FilesCount() const override; + int64_t EstimatedRowCount() const override; private: /// \brief Data file metadata. @@ -103,10 +103,10 @@ struct TableScanContext { class ICEBERG_EXPORT TableScanBuilder { public: /// \brief Constructs a TableScanBuilder for the given table. - /// \param table The table to scan. /// \param table_metadata The metadata of the table to scan. - explicit TableScanBuilder(const Table& table, - std::shared_ptr table_metadata); + /// \param file_io The FileIO instance for reading manifests and data files. + explicit TableScanBuilder(std::shared_ptr table_metadata, + std::shared_ptr file_io); /// \brief Sets the snapshot ID to scan. /// \param snapshot_id The ID of the snapshot. @@ -121,7 +121,7 @@ class ICEBERG_EXPORT TableScanBuilder { /// \brief Sets the schema to use for the scan. /// \param schema The schema to use. /// \return Reference to the builder. - TableScanBuilder& WithSchema(std::shared_ptr schema); + TableScanBuilder& WithProjectedSchema(std::shared_ptr schema); /// \brief Applies a filter expression to the scan. /// \param filter Filter expression to use. @@ -149,8 +149,8 @@ class ICEBERG_EXPORT TableScanBuilder { Result> Build(); private: - /// \brief Reference to the table to scan. - const Table& table_; + /// \brief the file I/O instance for reading manifests and data files. + std::shared_ptr file_io_; /// \brief column names to project in the scan. std::vector column_names_; /// \brief snapshot ID to scan, if specified. From 1b5d123c572b90479e9d568aad21b331633e8926 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Thu, 3 Jul 2025 17:04:33 +0800 Subject: [PATCH 18/23] resolve some comments --- src/iceberg/manifest_reader.h | 4 ++-- src/iceberg/table_scan.cc | 18 +++++------------- src/iceberg/table_scan.h | 14 ++------------ 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index d417ee8f..4f098d12 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -55,7 +55,7 @@ class ICEBERG_EXPORT ManifestListReader { /// \param file_path Path to the manifest list file. /// \return A Result containing the reader or an error. Result> CreateManifestListReader( - const std::string& file_path) { + const std::string_view& file_path) { return NotImplemented("CreateManifestListReader is not implemented yet."); } @@ -63,7 +63,7 @@ Result> CreateManifestListReader( /// \param file_path Path to the manifest file. /// \return A Result containing the reader or an error. Result> CreateManifestReader( - const std::string& file_path) { + const std::string_view& file_path) { return NotImplemented("CreateManifestReader is not implemented yet."); } diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 16b31091..23d6d409 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -88,12 +88,9 @@ std::vector> GetMatchedDeletes( // implement FileScanTask FileScanTask::FileScanTask(std::shared_ptr file, std::vector> delete_files, - int64_t start, int64_t length, std::shared_ptr residual) : data_file_(std::move(file)), delete_files_(std::move(delete_files)), - start_(start), - length_(length), residual_(std::move(residual)) {} const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } @@ -102,12 +99,8 @@ const std::vector>& FileScanTask::delete_files() const return delete_files_; } -int64_t FileScanTask::start() const { return start_; } - -int64_t FileScanTask::length() const { return length_; } - int64_t FileScanTask::SizeBytes() const { - int64_t sizeInBytes = length_; + int64_t sizeInBytes = data_file_->file_size_in_bytes; std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) { sizeInBytes += delete_file->file_size_in_bytes; }); @@ -122,8 +115,9 @@ int64_t FileScanTask::EstimatedRowCount() const { if (data_file_->file_size_in_bytes == 0) { return 0; } + const auto sizeInBytes = data_file_->file_size_in_bytes; const double scannedFileFraction = - static_cast(length_) / data_file_->file_size_in_bytes; + static_cast(sizeInBytes) / data_file_->file_size_in_bytes; return static_cast(scannedFileFraction * data_file_->record_count); } @@ -276,17 +270,15 @@ Result>> DataScan::PlanFiles() const { } } - DeleteFileIndex delete_file_index(positional_delete_entries); - // TODO(gty404): build residual expression from filter std::shared_ptr residual; std::vector> tasks; + DeleteFileIndex delete_file_index(positional_delete_entries); for (const auto& data_entry : data_entries) { auto matched_deletes = GetMatchedDeletes(*data_entry, delete_file_index); const auto& data_file = data_entry->data_file; tasks.emplace_back(std::make_shared( - data_file, std::move(matched_deletes), 0, data_file->file_size_in_bytes, - std::move(residual))); + data_file, std::move(matched_deletes), std::move(residual))); } return tasks; } diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 9611e9eb..e4a38d93 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -46,8 +46,8 @@ class ICEBERG_EXPORT ScanTask { class ICEBERG_EXPORT FileScanTask : public ScanTask { public: FileScanTask(std::shared_ptr file, - std::vector> delete_files, int64_t start, - int64_t length, std::shared_ptr residual); + std::vector> delete_files, + std::shared_ptr residual); /// \brief The data file that should be read by this scan task. const std::shared_ptr& data_file() const; @@ -55,12 +55,6 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { /// \brief The delete files that should be read by this scan task. const std::vector>& delete_files() const; - /// \brief The byte offset in the data file where the scan should start. - int64_t start() const; - - /// \brief The length in bytes to scan from the start offset. - int64_t length() const; - /// \brief The residual expression to apply after scanning the data file. const std::shared_ptr& residual() const; @@ -73,10 +67,6 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { std::shared_ptr data_file_; /// \brief Delete files metadata. std::vector> delete_files_; - /// \brief Start byte offset. - int64_t start_; - /// \brief Length in bytes to scan. - int64_t length_; /// \brief Residual expression to apply. std::shared_ptr residual_; }; From 3dc2b385270c48465f884fcd858c08d8f575dce7 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Fri, 4 Jul 2025 08:41:31 +0800 Subject: [PATCH 19/23] resolve some comments --- src/iceberg/manifest_reader.h | 5 ++--- src/iceberg/table_scan.cc | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 4f098d12..48a6b2d2 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -55,15 +55,14 @@ class ICEBERG_EXPORT ManifestListReader { /// \param file_path Path to the manifest list file. /// \return A Result containing the reader or an error. Result> CreateManifestListReader( - const std::string_view& file_path) { + std::string_view file_path) { return NotImplemented("CreateManifestListReader is not implemented yet."); } /// \brief Creates a reader for a manifest file. /// \param file_path Path to the manifest file. /// \return A Result containing the reader or an error. -Result> CreateManifestReader( - const std::string_view& file_path) { +Result> CreateManifestReader(std::string_view file_path) { return NotImplemented("CreateManifestReader is not implemented yet."); } diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 23d6d409..c9ddde17 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -131,7 +131,6 @@ TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata TableScanBuilder& TableScanBuilder::WithColumnNames( std::vector column_names) { - column_names_.reserve(column_names.size()); column_names_ = std::move(column_names); return *this; } @@ -199,7 +198,7 @@ Result> TableScanBuilder::Build() { return InvalidArgument("Schema {} in snapshot {} is not found", *snapshot->schema_id, snapshot->snapshot_id); } - auto schema = *it; + const auto& schema = *it; if (column_names_.empty()) { context_.projected_schema = schema; @@ -277,8 +276,8 @@ Result>> DataScan::PlanFiles() const { for (const auto& data_entry : data_entries) { auto matched_deletes = GetMatchedDeletes(*data_entry, delete_file_index); const auto& data_file = data_entry->data_file; - tasks.emplace_back(std::make_shared( - data_file, std::move(matched_deletes), std::move(residual))); + tasks.emplace_back( + std::make_shared(data_file, std::move(matched_deletes), residual)); } return tasks; } From 702d0f444bd124b68bcec0d1a495376b1bf7f9dc Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Fri, 4 Jul 2025 16:45:02 +0800 Subject: [PATCH 20/23] resolve comments --- src/iceberg/table_scan.cc | 6 +++--- src/iceberg/table_scan.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index c9ddde17..54c933bc 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -219,7 +219,7 @@ Result> TableScanBuilder::Build() { } } - return std::make_unique(std::move(context_), file_io_); + return std::make_unique(std::move(context_), file_io_); } TableScan::TableScan(TableScanContext context, std::shared_ptr file_io) @@ -235,10 +235,10 @@ const TableScanContext& TableScan::context() const { return context_; } const std::shared_ptr& TableScan::io() const { return file_io_; } -DataScan::DataScan(TableScanContext context, std::shared_ptr file_io) +DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr file_io) : TableScan(std::move(context), std::move(file_io)) {} -Result>> DataScan::PlanFiles() const { +Result>> DataTableScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, CreateManifestListReader(context_.snapshot->manifest_list)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index e4a38d93..3297e4ba 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -187,10 +187,10 @@ class ICEBERG_EXPORT TableScan { }; /// \brief A scan that reads data files and applies delete files to filter rows. -class ICEBERG_EXPORT DataScan : public TableScan { +class ICEBERG_EXPORT DataTableScan : public TableScan { public: /// \brief Constructs a DataScan with the given context and file I/O. - DataScan(TableScanContext context, std::shared_ptr file_io); + DataTableScan(TableScanContext context, std::shared_ptr file_io); /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. From 4342887e9bee068861926a5c1646cccfccffc13f Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 7 Jul 2025 09:26:58 +0800 Subject: [PATCH 21/23] resolve some comments --- src/iceberg/table_scan.cc | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 54c933bc..86a8fb92 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -100,11 +100,11 @@ const std::vector>& FileScanTask::delete_files() const } int64_t FileScanTask::SizeBytes() const { - int64_t sizeInBytes = data_file_->file_size_in_bytes; - std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) { - sizeInBytes += delete_file->file_size_in_bytes; + int64_t size_in_bytes = data_file_->file_size_in_bytes; + std::ranges::for_each(delete_files_, [&size_in_bytes](const auto& delete_file) { + size_in_bytes += delete_file->file_size_in_bytes; }); - return sizeInBytes; + return size_in_bytes; } int32_t FileScanTask::FilesCount() const { @@ -115,9 +115,9 @@ int64_t FileScanTask::EstimatedRowCount() const { if (data_file_->file_size_in_bytes == 0) { return 0; } - const auto sizeInBytes = data_file_->file_size_in_bytes; + const auto size_in_bytes = data_file_->file_size_in_bytes; const double scannedFileFraction = - static_cast(sizeInBytes) / data_file_->file_size_in_bytes; + static_cast(size_in_bytes) / data_file_->file_size_in_bytes; return static_cast(scannedFileFraction * data_file_->record_count); } @@ -172,10 +172,9 @@ Result> TableScanBuilder::Build() { return InvalidArgument("No snapshot ID specified for table {}", table_metadata->table_uuid); } - auto iter = std::ranges::find_if(table_metadata->snapshots, - [&snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == *snapshot_id; - }); + auto iter = std::ranges::find_if( + table_metadata->snapshots, + [id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; }); if (iter == table_metadata->snapshots.end() || *iter == nullptr) { return NotFound("Snapshot with ID {} is not found", *snapshot_id); } @@ -191,8 +190,8 @@ Result> TableScanBuilder::Build() { } const auto& schemas = table_metadata->schemas; - const auto it = std::ranges::find_if(schemas, [&schema_id](const auto& schema) { - return schema->schema_id() == *schema_id; + const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) { + return schema->schema_id() == id; }); if (it == schemas.end()) { return InvalidArgument("Schema {} in snapshot {} is not found", @@ -210,7 +209,8 @@ Result> TableScanBuilder::Build() { // TODO(gty404): support case-insensitive column names auto field_opt = schema->GetFieldByName(column_name); if (!field_opt) { - return InvalidArgument("Column {} not found in schema", column_name); + return InvalidArgument("Column {} not found in schema '{}'", column_name, + *schema_id); } projected_fields.emplace_back(field_opt.value().get()); } From 0d9b89e5948e4ced042276490989f4e4e4f7a2bb Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 7 Jul 2025 09:58:32 +0800 Subject: [PATCH 22/23] Trigger CI From e4af0e70ff004b2040f487c1121d2fb92e0fd28f Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 7 Jul 2025 10:27:03 +0800 Subject: [PATCH 23/23] Trigger CI