-
Notifications
You must be signed in to change notification settings - Fork 40
feat: basic table scan planning #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
src/iceberg/manifest_reader.h
Outdated
/// \param file_path Path to the manifest list file. | ||
/// \return A Result containing the reader or an error. | ||
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader( | ||
const std::string& file_path) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not enough. At least we need extra parameters like table_format_version
and file_io
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, should we use std::string_view
for path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to provide a ManifestListReaderBuilder/ManifestReaderBuilder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know yet. It depends on how we will use them. BTW @dongxiao1198 will work on manifest reading.
src/iceberg/manifest_reader.h
Outdated
/// \param file_path Path to the manifest list file. | ||
/// \return A Result containing the reader or an error. | ||
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader( | ||
const std::string& file_path) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know yet. It depends on how we will use them. BTW @dongxiao1198 will work on manifest reading.
}; | ||
|
||
/// \brief Represents a task to scan a portion of a data file. | ||
class ICEBERG_EXPORT FileScanTask : public ScanTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some thoughts about FileScanTask
:
- Should we remove
ScanTask
abstraction above? If we remove the abstraction, we can directly use aggregate initialization to create a task. Otherwise we may need to expand the constructor every time a new parameter is required. - If we do (1) above, is it possible also to make it a simple struct by removing all functions (as they are all trivial accessors).
- Should we add fields (a.k.a. spec and partition_value) from Java
PartitionScanTask
to support partitioning? We can add them later but a TODO comment is desirable. - Should we combine
start
andlength
, and wrap them bystd::optional
? I believe they are not required at all times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I initially expected it to just be a struct, but since the previous comments suggested doing an abstraction, I referred to the design in iceberg-java/iceberg-python.
- Partition spec and value can be obtained from DataFile and Snapshot, and we can add these interfaces when needed for subsequent PR
- Sure, I will modify it to optional, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partition spec and value can be obtained from DataFile and Snapshot
That's a good point
src/iceberg/table_scan.h
Outdated
/// \brief Sets the schema to use for the scan. | ||
/// \param schema The schema to use. | ||
/// \return Reference to the builder. | ||
TableScanBuilder& WithSchema(std::shared_ptr<Schema> schema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need this. We just need schema of a specific snapshot id which can be obtained via table_metadata. Did I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used to specify the projected schema without specifying the column name, and I have modified it to WithProjectedSchema.
/// \brief snapshot ID to scan, if specified. | ||
std::optional<int64_t> snapshot_id_; | ||
/// \brief Context for the scan, including snapshot, schema, and filter. | ||
TableScanContext context_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the Java version of TableScanContext
, column_names_
and snapshot_id_
are also stored in it. Should we follow the same pattern? If we do this, it seems that TableScanBuilder
is indeed a TableScanContextBuilder
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally expected that TableScanContext would be context information retained after converting various input parameters, and that what was no longer needed in the subsequent file scanning process would be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean you will remove TableScanContext
? It depends on whether you will reuse it to plan files for metadata tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TableScanContext will be relied on during subsequent plan files
data_entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber); | ||
for (auto it = sequence_index.lower_bound(data_sequence_number); | ||
it != sequence_index.end(); ++it) { | ||
// Additional filtering logic here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the additional filtering logic? Did you mean to further check if the delete files can be filtered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataFiles only need to retain DeleteFiles with a sequence greater than their own?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This differs per equality and positional deletes. I think there is a pretty good overview here: https://iceberg.apache.org/spec/#scan-planning
src/iceberg/table_scan.cc
Outdated
return sizeInBytes; | ||
} | ||
|
||
int32_t FileScanTask::files_count() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if we need to rename it to FilesCount()
. @lidavidm suggestion?
}; | ||
|
||
/// \brief Represents a task to scan a portion of a data file. | ||
class ICEBERG_EXPORT FileScanTask : public ScanTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partition spec and value can be obtained from DataFile and Snapshot
That's a good point
/// \brief snapshot ID to scan, if specified. | ||
std::optional<int64_t> snapshot_id_; | ||
/// \brief Context for the scan, including snapshot, schema, and filter. | ||
TableScanContext context_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean you will remove TableScanContext
? It depends on whether you will reuse it to plan files for metadata tables.
|
||
/// \brief Plans the scan tasks by resolving manifests and data files. | ||
/// \return A Result containing scan tasks or an error. | ||
virtual Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
virtual Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const = 0; | |
virtual Result<std::vector<std::shared_ptr<ScanTask>>> PlanTasks() const = 0; |
I remember that @lishuxu has commented to rename it to PlanTasks
. Do we need to modify the signature as above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In iceberg-java, the result of planFiles is a one-to-one correspondence between the entire file and FileScanTask. planTask is the result of splitting the result of planFiles. Each scanTask may contain multiple files, or only a part of one file. I am not sure if the split feature is needed now, so I did not use PlanTask.
src/iceberg/table_scan.h
Outdated
}; | ||
|
||
/// \brief A scan that reads data files and applies delete files to filter rows. | ||
class ICEBERG_EXPORT DataScan : public TableScan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little bit confused about the name of Scan and ScanTask across different implementations. Should this be DataTableScan
which produces FileScanTask
? For DataScan
, I think it should produce a group of DataTask
which contains rows of FileScanTask
.
Simply put:
Scan -> ScanTask
TableScan -> FileScanTask
DataScan -> DataTask
DataScan inherits TableScan inherits Scan
DataTask inherits FileScanTask inherits ScanTask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I think we can constantly evolve this design because APIs can be unstable before the 1.0.0 release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At PyIceberg we (tried to) copied the Java structure, but in the end I think it is too much OOP for Python. Maybe good to start small in C++ as well. While we can change APIs until 1.0.0, I think it is important to get this one right pretty early on, since this is the main integration point for query engines.
src/iceberg/table_scan.cc
Outdated
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file, | ||
std::vector<std::shared_ptr<DataFile>> delete_files, | ||
int64_t start, int64_t length, | ||
std::shared_ptr<Expression> residual) | ||
: data_file_(std::move(file)), | ||
delete_files_(std::move(delete_files)), | ||
start_(start), | ||
length_(length), | ||
residual_(std::move(residual)) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this FileScanTask
is inspired by PyIceberg, while I think it might be better to follow Java: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/FileScanTask.java
The name FileScanTask
implies to me, that it will read a full file, and then the start
and end
do not make sense. This is in the case you want to split up the work by row-group, rather than file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the start and end are not required, I will remove them from FileScanTask.
src/iceberg/manifest_reader.h
Outdated
/// \param file_path Path to the manifest file. | ||
/// \return A Result containing the reader or an error. | ||
Result<std::unique_ptr<ManifestReader>> CreateManifestReader( | ||
const std::string_view& file_path) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
private: | ||
/// \brief Index by sequence number for quick filtering | ||
std::multimap<int64_t, ManifestEntry*> sequence_index; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::multimap<int64_t, ManifestEntry*> sequence_index; | |
std::multimap<int64_t, ManifestEntry*> sequence_index_; |
For private variables? Besides, why not std::map<int64_t, std::vector<const ManifestEntry*>>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is for the convenience of searching by sequence as mentioned earlier.
for (auto it = sequence_index.lower_bound(data_sequence_number); | ||
it != sequence_index.end(); ++it) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this incorrect? Since it find the lowerbound and traverse all the sequence numbers above data_sequence_number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the meaning here is to find all DeleteFiles corresponding to this DataFile. Only those with a sequence number higher than the DataFile need to be read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Higher or equal for positional deletes: https://iceberg.apache.org/spec/#scan-planning
src/iceberg/table_scan.cc
Outdated
column_names_.reserve(column_names.size()); | ||
column_names_ = std::move(column_names); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need reserve before move. std::move
usally exchange memory between column_names_
and column_names
.
src/iceberg/table_scan.cc
Outdated
return InvalidArgument("Schema {} in snapshot {} is not found", | ||
*snapshot->schema_id, snapshot->snapshot_id); | ||
} | ||
auto schema = *it; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto schema = *it; | |
const auto& schema = *it; |
src/iceberg/table_scan.cc
Outdated
auto matched_deletes = GetMatchedDeletes(*data_entry, delete_file_index); | ||
const auto& data_file = data_entry->data_file; | ||
tasks.emplace_back(std::make_shared<FileScanTask>( | ||
data_file, std::move(matched_deletes), std::move(residual))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
data_file, std::move(matched_deletes), std::move(residual))); | |
data_file, std::move(matched_deletes), residual)); |
Seems that we cannot move residual
because it's used multiple times in the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this has not been implemented yet, and it is expected that this residual will be shared among all tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just have a comment w.r.t. the table scan name. Elsewhere LGTM.
src/iceberg/table_scan.h
Outdated
}; | ||
|
||
/// \brief A scan that reads data files and applies delete files to filter rows. | ||
class ICEBERG_EXPORT DataScan : public TableScan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class ICEBERG_EXPORT DataScan : public TableScan { | |
class ICEBERG_EXPORT DataTableScan : public TableScan { |
I think DataScan
is ambiguous and people may assume that it will return rows of data instead of the file list. DataTable
is better to differentiate from MetadataTable
.
|
||
namespace { | ||
/// \brief Use indexed data structures for efficient lookups | ||
class DeleteFileIndex { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In intuition this can build once ( push all element and sort one), then when querying, return a slice for it. But this also looks good to me
} | ||
|
||
int64_t FileScanTask::SizeBytes() const { | ||
int64_t sizeInBytes = data_file_->file_size_in_bytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int64_t sizeInBytes = data_file_->file_size_in_bytes; | |
int64_t size_in_bytes = data_file_->file_size_in_bytes; |
table_metadata->table_uuid); | ||
} | ||
auto iter = std::ranges::find_if(table_metadata->snapshots, | ||
[&snapshot_id](const auto& snapshot) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the way below to eval *snapshot_id
only once
[&snapshot_id](const auto& snapshot) { | |
[id = *snapshot_id](const auto& snapshot) { |
} | ||
|
||
const auto& schemas = table_metadata->schemas; | ||
const auto it = std::ranges::find_if(schemas, [&schema_id](const auto& schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
// TODO(gty404): support case-insensitive column names | ||
auto field_opt = schema->GetFieldByName(column_name); | ||
if (!field_opt) { | ||
return InvalidArgument("Column {} not found in schema", column_name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add schema to output?
@@ -107,4 +108,8 @@ const std::vector<SnapshotLogEntry>& Table::history() const { | |||
|
|||
const std::shared_ptr<FileIO>& Table::io() const { return io_; } | |||
|
|||
std::unique_ptr<TableScanBuilder> Table::NewScan() const { | |||
return std::make_unique<TableScanBuilder>(metadata_, io_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about passing in the Table
instead? That has all the metadata, and also the io
class DeleteFileIndex { | ||
public: | ||
/// \brief Build the index from a list of manifest entries. | ||
explicit DeleteFileIndex(const std::vector<std::unique_ptr<ManifestEntry>>& entries) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to add this right away, or defer this to a later PR? Previously at PyIceberg we threw an exception when we encountered a positional or equality delete.
// TODO(gty404): check if the delete entry contains the data entry's file path | ||
matched_deletes.emplace_back(delete_entry->data_file); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this filter, it will likely explode the number of relevant entries for each of the data files.
data_entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber); | ||
for (auto it = sequence_index.lower_bound(data_sequence_number); | ||
it != sequence_index.end(); ++it) { | ||
// Additional filtering logic here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This differs per equality and positional deletes. I think there is a pretty good overview here: https://iceberg.apache.org/spec/#scan-planning
for (auto it = sequence_index.lower_bound(data_sequence_number); | ||
it != sequence_index.end(); ++it) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Higher or equal for positional deletes: https://iceberg.apache.org/spec/#scan-planning
Introducing basic scan table data interface