[feat]: packed reader partial fields read #170

Merged
merged 2 commits on Feb 7, 2025
41 changes: 33 additions & 8 deletions cpp/include/milvus-storage/packed/reader.h
@@ -40,32 +40,57 @@ using RowOffsetMinHeap =

 class PackedRecordBatchReader : public arrow::RecordBatchReader {
  public:
+  /**
+   * @brief Open a packed reader to read the needed columns under the specified path.
+   *
+   * @param fs Arrow file system.
+   * @param file_path The root path of the packed files to read.
+   * @param origin_schema The original schema of the data.
+   * @param needed_columns The columns to read from the original schema.
+   * @param buffer_size The max buffer size of the packed reader.
+   */
   PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                           const std::string& file_path,
-                          const std::shared_ptr<arrow::Schema> schema,
+                          const std::shared_ptr<arrow::Schema> origin_schema,
                           const std::set<int>& needed_columns,
                           const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

+  /**
+   * @brief Return the schema of the needed columns.
+   */
   std::shared_ptr<arrow::Schema> schema() const override;
 
+  /**
+   * @brief Read the next arrow record batch into the specified pointer.
+   * When the data is drained, *batch is set to nullptr.
+   *
+   * @param batch The record batch pointer to read into.
+   */
   arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;
 
+  /**
+   * @brief Close the reader and clean up resources.
+   */
   arrow::Status Close() override;

  private:
-  void initialize(arrow::fs::FileSystem& fs,
-                  const std::string& file_path,
-                  const std::shared_ptr<arrow::Schema> schema,
-                  const std::set<int>& needed_columns,
-                  const int64_t buffer_size);
+  void init(arrow::fs::FileSystem& fs,
+            const std::string& file_path,
+            const std::shared_ptr<arrow::Schema> origin_schema,
+            const std::set<int>& needed_columns,
+            const int64_t buffer_size);
 
-  Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
+  Status initNeededSchema(const std::set<int>& needed_columns, const std::shared_ptr<arrow::Schema> origin_schema);
+
+  Status initColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
   // Advance buffer to fill the expected buffer size
   arrow::Status advanceBuffer();
 
   std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;
 
  private:
-  std::shared_ptr<arrow::Schema> schema_;
+  std::shared_ptr<arrow::Schema> needed_schema_;
+  std::shared_ptr<arrow::Schema> origin_schema_;
 
   size_t memory_limit_;
   size_t buffer_available_;
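To make the new API concrete, here is a minimal usage sketch (not part of this PR). It assumes a local filesystem and placeholder names such as origin_schema and file_path; any arrow::fs::FileSystem implementation would do:

#include <memory>
#include <set>
#include <arrow/filesystem/localfs.h>
#include "milvus-storage/packed/reader.h"

// Open a packed reader that materializes only columns 0 and 2 of the
// original schema, then drain it batch by batch. Per the diff, the
// constructor throws std::runtime_error if initialization fails.
arrow::Status ReadPartialColumns(const std::shared_ptr<arrow::Schema>& origin_schema,
                                 const std::string& file_path) {
  arrow::fs::LocalFileSystem fs;
  std::set<int> needed_columns = {0, 2};
  milvus_storage::PackedRecordBatchReader reader(fs, file_path, origin_schema, needed_columns);

  // reader.schema() now reflects only the needed columns.
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // data drained
    // ... consume the partial batch ...
  }
  return reader.Close();
}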
53 changes: 39 additions & 14 deletions cpp/src/packed/reader.cpp
@@ -30,28 +30,37 @@ namespace milvus_storage {

 PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                                                  const std::string& file_path,
-                                                 const std::shared_ptr<arrow::Schema> schema,
+                                                 const std::shared_ptr<arrow::Schema> origin_schema,
                                                  const std::set<int>& needed_columns,
                                                  const int64_t buffer_size)
     : file_path_(file_path),
-      schema_(schema),
+      origin_schema_(origin_schema),
       buffer_available_(buffer_size),
       memory_limit_(buffer_size),
       row_limit_(0),
       absolute_row_position_(0),
       read_count_(0) {
-  initialize(fs, file_path_, schema_, needed_columns, buffer_size);
+  init(fs, file_path_, origin_schema_, needed_columns, buffer_size);
 }
 
-void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
-                                         const std::string& file_path,
-                                         const std::shared_ptr<arrow::Schema> schema,
-                                         const std::set<int>& needed_columns,
-                                         const int64_t buffer_size) {
-  auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
+void PackedRecordBatchReader::init(arrow::fs::FileSystem& fs,
+                                   const std::string& file_path,
+                                   const std::shared_ptr<arrow::Schema> origin_schema,
+                                   const std::set<int>& needed_columns,
+                                   const int64_t buffer_size) {
+  // init needed schema
+  auto status = initNeededSchema(needed_columns, origin_schema);
   if (!status.ok()) {
     throw std::runtime_error(status.ToString());
   }
 
+  // init column offsets
+  status = initColumnOffsets(fs, needed_columns, origin_schema->num_fields());
+  if (!status.ok()) {
+    throw std::runtime_error(status.ToString());
+  }
+
+  // init arrow file readers
   for (auto i : needed_paths_) {
     auto result = MakeArrowFileReader(fs, ConcatenateFilePath(file_path_, std::to_string(i)));
     if (!result.ok()) {
@@ -61,6 +70,7 @@ void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
     file_readers_.emplace_back(std::move(result.value()));
   }
 
+  // init uncompressed row group sizes from metadata
   for (int i = 0; i < file_readers_.size(); ++i) {
     auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata();
 
@@ -80,9 +90,9 @@ void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
   tables_.resize(needed_paths_.size(), std::queue<std::shared_ptr<arrow::Table>>());
 }
 
-Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs,
-                                                        const std::set<int>& needed_columns,
-                                                        size_t num_fields) {
+Status PackedRecordBatchReader::initColumnOffsets(arrow::fs::FileSystem& fs,
+                                                  const std::set<int>& needed_columns,
+                                                  size_t num_fields) {
   std::string path = ConcatenateFilePath(file_path_, std::to_string(0));
   auto reader = MakeArrowFileReader(fs, path);
   if (!reader.ok()) {
@@ -108,7 +118,22 @@ Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs,
   return Status::OK();
 }
 
-std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return schema_; }
+Status PackedRecordBatchReader::initNeededSchema(const std::set<int>& needed_columns,
+                                                 const std::shared_ptr<arrow::Schema> schema) {
+  std::vector<std::shared_ptr<arrow::Field>> needed_fields;
+
+  for (int col : needed_columns) {
+    if (col < 0 || col >= schema->num_fields()) {
+      return Status::ReaderError("Specified column index " + std::to_string(col) + " is out of bounds. Schema has " +
+                                 std::to_string(schema->num_fields()) + " fields.");
+    }
+    needed_fields.push_back(schema->field(col));
+  }
+  needed_schema_ = std::make_shared<arrow::Schema>(needed_fields);
+  return Status::OK();
+}
+
+std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return needed_schema_; }
 
 arrow::Status PackedRecordBatchReader::advanceBuffer() {
   std::vector<std::vector<int>> rgs_to_read(file_readers_.size());
@@ -208,7 +233,7 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
   // Determine the maximum contiguous slice across all tables
   auto batch_data = chunk_manager_->SliceChunksByMaxContiguousSlice(row_limit_ - absolute_row_position_, tables_);
   absolute_row_position_ += chunk_manager_->GetChunkSize();
-  *out = arrow::RecordBatch::Make(schema_, chunk_manager_->GetChunkSize(), std::move(batch_data));
+  *out = arrow::RecordBatch::Make(needed_schema_, chunk_manager_->GetChunkSize(), std::move(batch_data));
   return arrow::Status::OK();
 }
 
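The projection done by initNeededSchema can be illustrated with plain Arrow. A self-contained sketch follows (ProjectSchema is a hypothetical name, and it uses arrow::Status::IndexError where the project uses its own Status::ReaderError):

#include <memory>
#include <set>
#include <vector>
#include <arrow/api.h>

// Build a sub-schema holding only the fields selected by index, rejecting
// out-of-range indices up front. Because std::set<int> iterates in ascending
// order, the projected fields keep the original schema's relative order no
// matter how the caller filled the set.
arrow::Result<std::shared_ptr<arrow::Schema>> ProjectSchema(
    const std::shared_ptr<arrow::Schema>& origin_schema, const std::set<int>& needed_columns) {
  std::vector<std::shared_ptr<arrow::Field>> fields;
  fields.reserve(needed_columns.size());
  for (int col : needed_columns) {
    if (col < 0 || col >= origin_schema->num_fields()) {
      return arrow::Status::IndexError("column index ", col, " is out of bounds; schema has ",
                                       origin_schema->num_fields(), " fields");
    }
    fields.push_back(origin_schema->field(col));
  }
  return arrow::schema(std::move(fields));
}

That ordering property is what lets the test below assert that fields 0 and 2 come back in exactly that order.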
17 changes: 17 additions & 0 deletions cpp/test/packed/packed_integration_test.cpp
@@ -54,4 +54,21 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
   ValidateTableData(table);
 }
 
+TEST_F(PackedIntegrationTest, TestPartialField) {
+  int batch_size = 1000;
+
+  PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_);
+  for (int i = 0; i < batch_size; ++i) {
+    EXPECT_TRUE(writer.Write(record_batch_).ok());
+  }
+  EXPECT_TRUE(writer.Close().ok());
+
+  std::set<int> needed_columns = {0, 2};
+  PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
+  ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
+  ASSERT_EQ(table->fields()[0], schema_->field(0));
+  ASSERT_EQ(table->fields()[1], schema_->field(2));
+  ASSERT_EQ(table->schema(), pr.schema());
+}
+
 } // namespace milvus_storage
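A possible extension of this test (not part of the PR) would cross-check the partial read against a full read of the same files. A sketch, assuming the fixture's schema_ has exactly three fields, as the {0, 2} selection above suggests:

// Hypothetical follow-up assertions for TestPartialField: the projected
// columns must match the corresponding columns of a full read.
std::set<int> all_columns = {0, 1, 2};  // assumes schema_ has three fields
PackedRecordBatchReader full(*fs_, file_path_, schema_, all_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto full_table, full.ToTable());
ASSERT_TRUE(table->column(0)->Equals(full_table->column(0)));
ASSERT_TRUE(table->column(1)->Equals(full_table->column(2)));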