[feat]: packed reader partial fields read (#170)
related: #169

---------

Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang authored Feb 7, 2025
1 parent 7475494 commit 1a5bbad
Showing 3 changed files with 89 additions and 22 deletions.
41 changes: 33 additions & 8 deletions cpp/include/milvus-storage/packed/reader.h
@@ -40,32 +40,57 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
/**
* @brief Open a packed reader to read the needed columns from the packed files under the specified path.
*
* @param fs Arrow file system.
* @param file_path The root path of the packed files to read.
* @param origin_schema The original schema of the data.
* @param needed_columns The indices of the columns to read from the original schema.
* @param buffer_size The max buffer size of the packed reader.
*/
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

/**
* @brief Return the schema of needed columns.
*/
std::shared_ptr<arrow::Schema> schema() const override;

/**
* @brief Read the next record batch into the specified pointer.
* If the data is drained, the batch is set to nullptr.
*
* @param batch The record batch pointer to read into.
*/
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

/**
* @brief Close the reader and clean up resources.
*/
arrow::Status Close() override;

private:
void initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
const int64_t buffer_size);
void init(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size);

Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
Status initNeededSchema(const std::set<int>& needed_columns, const std::shared_ptr<arrow::Schema> origin_schema);

Status initColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();

std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;

private:
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::Schema> needed_schema_;
std::shared_ptr<arrow::Schema> origin_schema_;

size_t memory_limit_;
size_t buffer_available_;
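To illustrate the new API surface, here is a minimal caller-side sketch of a partial-columns read. Only the PackedRecordBatchReader signature above is taken from the header; the filesystem, root path, field names, and consumption loop are hypothetical placeholders, not part of this change.

// Sketch: read only columns 0 and 2 from a packed dataset (paths and schema are hypothetical).
#include <memory>
#include <set>
#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include "milvus-storage/packed/reader.h"

arrow::Status ReadPartialColumns(const std::string& root_path) {
  // The schema the data was originally written with (placeholder fields).
  auto origin_schema = arrow::schema(
      {arrow::field("id", arrow::int64()), arrow::field("ts", arrow::int64()), arrow::field("text", arrow::utf8())});

  arrow::fs::LocalFileSystem fs;
  std::set<int> needed_columns = {0, 2};  // project "id" and "text"

  // May throw std::runtime_error if initialization fails (e.g. a column index is out of bounds).
  milvus_storage::PackedRecordBatchReader reader(fs, root_path, origin_schema, needed_columns);

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // data drained
    // ... consume batch; its schema matches reader.schema(), i.e. only the needed columns ...
  }
  return reader.Close();
}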
53 changes: 39 additions & 14 deletions cpp/src/packed/reader.cpp
@@ -30,28 +30,37 @@ namespace milvus_storage {

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size)
: file_path_(file_path),
schema_(schema),
origin_schema_(origin_schema),
buffer_available_(buffer_size),
memory_limit_(buffer_size),
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
initialize(fs, file_path_, schema_, needed_columns, buffer_size);
init(fs, file_path_, origin_schema_, needed_columns, buffer_size);
}

void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
const int64_t buffer_size) {
auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
void PackedRecordBatchReader::init(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size) {
// init needed schema
auto status = initNeededSchema(needed_columns, origin_schema);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}

// init column offsets
status = initColumnOffsets(fs, needed_columns, origin_schema->num_fields());
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}

// init arrow file readers
for (auto i : needed_paths_) {
auto result = MakeArrowFileReader(fs, ConcatenateFilePath(file_path_, std::to_string(i)));
if (!result.ok()) {
@@ -61,6 +70,7 @@ void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
file_readers_.emplace_back(std::move(result.value()));
}

// init uncompressed row group sizes from metadata
for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata();

@@ -80,9 +90,9 @@ void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
tables_.resize(needed_paths_.size(), std::queue<std::shared_ptr<arrow::Table>>());
}

Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs,
const std::set<int>& needed_columns,
size_t num_fields) {
Status PackedRecordBatchReader::initColumnOffsets(arrow::fs::FileSystem& fs,
const std::set<int>& needed_columns,
size_t num_fields) {
std::string path = ConcatenateFilePath(file_path_, std::to_string(0));
auto reader = MakeArrowFileReader(fs, path);
if (!reader.ok()) {
@@ -108,7 +118,22 @@ Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& f
return Status::OK();
}

std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return schema_; }
Status PackedRecordBatchReader::initNeededSchema(const std::set<int>& needed_columns,
const std::shared_ptr<arrow::Schema> schema) {
std::vector<std::shared_ptr<arrow::Field>> needed_fields;

for (int col : needed_columns) {
if (col < 0 || col >= schema->num_fields()) {
return Status::ReaderError("Specified column index " + std::to_string(col) + " is out of bounds. Schema has " +
std::to_string(schema->num_fields()) + " fields.");
}
needed_fields.push_back(schema->field(col));
}
needed_schema_ = std::make_shared<arrow::Schema>(needed_fields);
return Status::OK();
}

std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return needed_schema_; }

arrow::Status PackedRecordBatchReader::advanceBuffer() {
std::vector<std::vector<int>> rgs_to_read(file_readers_.size());
@@ -208,7 +233,7 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat
// Determine the maximum contiguous slice across all tables
auto batch_data = chunk_manager_->SliceChunksByMaxContiguousSlice(row_limit_ - absolute_row_position_, tables_);
absolute_row_position_ += chunk_manager_->GetChunkSize();
*out = arrow::RecordBatch::Make(schema_, chunk_manager_->GetChunkSize(), std::move(batch_data));
*out = arrow::RecordBatch::Make(needed_schema_, chunk_manager_->GetChunkSize(), std::move(batch_data));
return arrow::Status::OK();
}

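The core of the change is the projection: the reader keeps the original schema for locating column-group files, while schema() and the emitted record batches use a schema built from just the requested field indices. A simplified standalone sketch of that projection step follows; it mirrors initNeededSchema but is not the committed code and assumes only Arrow.

// Sketch of the projection performed by initNeededSchema, written as a free function.
#include <memory>
#include <set>
#include <vector>
#include <arrow/api.h>

arrow::Result<std::shared_ptr<arrow::Schema>> ProjectSchema(const std::shared_ptr<arrow::Schema>& origin_schema,
                                                            const std::set<int>& needed_columns) {
  std::vector<std::shared_ptr<arrow::Field>> fields;
  for (int col : needed_columns) {
    if (col < 0 || col >= origin_schema->num_fields()) {
      return arrow::Status::Invalid("column index ", col, " is out of bounds");
    }
    fields.push_back(origin_schema->field(col));
  }
  // std::set iterates in ascending order, so the projected fields keep their original relative order.
  return std::make_shared<arrow::Schema>(fields);
}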
17 changes: 17 additions & 0 deletions cpp/test/packed/packed_integration_test.cpp
@@ -54,4 +54,21 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
ValidateTableData(table);
}

TEST_F(PackedIntegrationTest, TestPartialField) {
int batch_size = 1000;

PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());

std::set<int> needed_columns = {0, 2};
PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
ASSERT_EQ(table->fields()[0], schema_->field(0));
ASSERT_EQ(table->fields()[1], schema_->field(2));
ASSERT_EQ(table->schema(), pr.schema());
}

} // namespace milvus_storage
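A complementary test one might add (a sketch only, not part of this commit): an out-of-range column index makes initNeededSchema return a ReaderError, which init() rethrows, so construction should fail with std::runtime_error. The fixture members are the same ones used by TestPartialField above, and the test would live inside the milvus_storage namespace alongside it.

TEST_F(PackedIntegrationTest, TestPartialFieldOutOfRange) {
  PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_);
  EXPECT_TRUE(writer.Write(record_batch_).ok());
  EXPECT_TRUE(writer.Close().ok());

  // schema_ has far fewer fields than index 100, so the reader's constructor should throw.
  std::set<int> needed_columns = {0, 100};
  EXPECT_THROW(PackedRecordBatchReader(*fs_, file_path_, schema_, needed_columns, reader_memory_),
               std::runtime_error);
}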
