Skip to content

Commit

Permalink
feat: packed reader and writer new api (#172)
Browse files Browse the repository at this point in the history
Main features of this patch:
* Column groups and file paths are explicitly defined by users. The
packed writer no longer splits column groups by memory usage; it follows
the user-defined column groups to split arrow records and writes them
into the corresponding file paths.
* Change the factory-based arrow filesystem to a singleton arrow
filesystem to support Milvus core initialization.
* Separate arrow fs config from storage config.
related: #171

---------

Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang authored Feb 17, 2025
1 parent 1a5bbad commit 58df0bb
Show file tree
Hide file tree
Showing 30 changed files with 378 additions and 582 deletions.
18 changes: 10 additions & 8 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,28 @@ class S3Fixture : public benchmark::Fixture {
const char* secret_key = std::getenv(kEnvSecretKey);
const char* endpoint_url = std::getenv(kEnvS3EndpointUrl);
const char* file_path = std::getenv(kEnvFilePath);
auto conf = StorageConfig();
auto conf = ArrowFileSystemConfig();
conf.storage_type = "local";
conf.uri = "file:///tmp/";
if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
conf.uri = std::string(endpoint_url);
conf.access_key_id = std::string(access_key);
conf.access_key_value = std::string(secret_key);
conf.file_path = std::string(file_path);
conf.storage_type = "remote";
}
storage_config_ = std::move(conf);
config_ = std::move(conf);

auto base = std::string();
auto factory = std::make_shared<FileSystemFactory>();
auto result = factory->BuildFileSystem(conf, &base);
ArrowFileSystemSingleton::GetInstance().Init(conf);
ArrowFileSystemPtr fs = ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem();
if (!result.ok()) {
state.SkipWithError("Failed to build file system!");
}
fs_ = std::move(result).value();
}

std::shared_ptr<arrow::fs::FileSystem> fs_;
StorageConfig storage_config_;
ArrowFileSystemConfig config_;
};

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
Expand Down Expand Up @@ -135,9 +136,10 @@ static void PackedWrite(benchmark::State& st,

for (auto _ : st) {
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.use_custom_part_upload = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
PackedRecordBatchWriter writer(fs, path, schema, conf, column_groups, buffer_size);
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
Expand Down
21 changes: 2 additions & 19 deletions cpp/include/milvus-storage/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,12 @@ static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE +

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
static constexpr int64_t DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB

struct StorageConfig {
std::string uri = "";
std::string bucket_name = "";
std::string access_key_id = "";
std::string access_key_value = "";
std::string file_path = "";
std::string root_path = "";
std::string cloud_provider = "";
std::string region = "";
bool use_custom_part_upload_size = false;
int64_t part_size = 0;
parquet::WriterProperties writer_props = *parquet::default_writer_properties();

std::string ToString() const {
std::stringstream ss;
ss << "[uri=" << uri << ", bucket_name=" << bucket_name << ", root_path=" << root_path
<< ", cloud_provider=" << cloud_provider << ", region=" << region
<< ", use_custom_part_upload_size=" << use_custom_part_upload_size << "]";

return ss.str();
}
};

} // namespace milvus_storage
11 changes: 5 additions & 6 deletions cpp/include/milvus-storage/filesystem/azure/azure_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ class AzureFileSystemProducer : public FileSystemProducer {
public:
AzureFileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(config.uri));

arrow::fs::AzureOptions options;
auto account = storage_config.access_key_id;
auto key = storage_config.access_key_value;
auto account = config.access_key_id;
auto key = config.access_key_value;
if (account.empty() || key.empty()) {
return Status::InvalidArgument("Please provide azure storage account and azure secret key");
}
Expand All @@ -43,7 +42,7 @@ class AzureFileSystemProducer : public FileSystemProducer {

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
fs->CreateDir(*out_path);
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}
};

Expand Down
91 changes: 86 additions & 5 deletions cpp/include/milvus-storage/filesystem/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,39 @@
#include <arrow/util/uri.h>
#include <memory>
#include <string>
#include <mutex>
#include "milvus-storage/common/result.h"
#include "milvus-storage/common/config.h"

namespace milvus_storage {

using ArrowFileSystemPtr = std::shared_ptr<arrow::fs::FileSystem>;

/// User-facing configuration for building the shared arrow filesystem.
/// Split out of the old StorageConfig so fs settings are independent of
/// per-write storage options.
struct ArrowFileSystemConfig {
  std::string uri = "";
  std::string storage_type = "local";  // [local, remote]
  std::string bucket_name = "";
  std::string access_key_id = "";
  std::string access_key_value = "";
  std::string cloud_provider = "";  // [aws, azure, gcp, tencent, aliyun]
  std::string region = "";
  bool use_custom_part_upload = false;

  /// Render a human-readable summary of this config.
  /// Credentials (access_key_id / access_key_value) are deliberately omitted.
  std::string ToString() const {
    std::string repr("[uri=");
    repr += uri;
    repr += ", storage_type=";
    repr += storage_type;
    repr += ", bucket_name=";
    repr += bucket_name;
    repr += ", cloud_provider=";
    repr += cloud_provider;
    repr += ", region=";
    repr += region;
    repr += ", use_custom_part_upload=";
    repr += (use_custom_part_upload ? "1" : "0");  // matches ostream bool output
    repr += "]";
    return repr;
  }
};

class FileSystemProducer {
public:
virtual ~FileSystemProducer() = default;

virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) = 0;
virtual Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) = 0;

std::string UriToPath(const std::string& uri) {
arrow::util::Uri uri_parser;
Expand All @@ -41,10 +63,69 @@ class FileSystemProducer {
}
};

class FileSystemFactory {
/// Process-wide owner of the shared arrow filesystem.
/// Replaces the old FileSystemFactory so that Milvus core init can
/// configure the filesystem once and share it everywhere.
/// All public methods are guarded by mutex_ and are safe to call concurrently.
class ArrowFileSystemSingleton {
 private:
  ArrowFileSystemSingleton(){};

 public:
  ArrowFileSystemSingleton(const ArrowFileSystemSingleton&) = delete;
  ArrowFileSystemSingleton& operator=(const ArrowFileSystemSingleton&) = delete;

  /// Meyers singleton: function-local static gives thread-safe lazy
  /// construction since C++11.
  static ArrowFileSystemSingleton& GetInstance() {
    static ArrowFileSystemSingleton instance;
    return instance;
  }

  /// Build the filesystem from `config` on first call; subsequent calls
  /// are no-ops until Release() is called.
  /// NOTE(review): createArrowFileSystem(config).value() surfaces no error
  /// status to the caller — if creation fails this presumably throws or
  /// aborts. TODO: confirm and consider returning a Status from Init.
  void Init(const ArrowFileSystemConfig& config) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (afs_ == nullptr) {
      afs_ = createArrowFileSystem(config).value();
    }
  }

  /// Drop the shared filesystem (e.g. at shutdown). Init() may be called
  /// again afterwards.
  void Release() {
    std::lock_guard<std::mutex> lock(mutex_);
    afs_.reset();  // reset() on an empty shared_ptr is a harmless no-op
  }

  /// Returns the shared filesystem, or nullptr if Init() has not run yet.
  ArrowFileSystemPtr GetArrowFileSystem() {
    std::lock_guard<std::mutex> lock(mutex_);
    return afs_;
  }

 private:
  /// Builds the concrete arrow filesystem for `config` — presumably
  /// dispatching on storage_type / cloud_provider; defined out of line.
  Result<ArrowFileSystemPtr> createArrowFileSystem(const ArrowFileSystemConfig& config);

 private:
  ArrowFileSystemPtr afs_ = nullptr;
  std::mutex mutex_;
};

// Storage backend kinds selectable via ArrowFileSystemConfig::storage_type.
enum class StorageType {
  None = 0,
  Local = 1,
  Minio = 2,
  Remote = 3,
};

// Cloud vendors selectable via ArrowFileSystemConfig::cloud_provider.
enum class CloudProviderType : int8_t {
  UNKNOWN = 0,
  AWS = 1,
  GCP = 2,
  ALIYUN = 3,
  AZURE = 4,
  TENCENTCLOUD = 5,
};

// Lookup tables mapping config strings to enum values.
// `inline` (C++17) shares a single instance across all translation units,
// instead of the former `static`, which created one mutable copy per TU
// including this header.
// NOTE(review): "minio" has no entry even though StorageType::Minio exists —
// confirm whether minio should be mapped here.
inline const std::map<std::string, StorageType> StorageType_Map = {
    {"local", StorageType::Local}, {"remote", StorageType::Remote}};

inline const std::map<std::string, CloudProviderType> CloudProviderType_Map = {
    {"aws", CloudProviderType::AWS},
    {"gcp", CloudProviderType::GCP},
    {"aliyun", CloudProviderType::ALIYUN},
    {"azure", CloudProviderType::AZURE},
    {"tencent", CloudProviderType::TENCENTCLOUD}};

} // namespace milvus_storage
17 changes: 8 additions & 9 deletions cpp/include/milvus-storage/filesystem/s3/s3_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ class S3FileSystemProducer : public FileSystemProducer {
public:
S3FileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(config.uri));

if (!arrow::fs::IsS3Initialized()) {
arrow::fs::S3GlobalOptions global_options;
Expand All @@ -46,19 +45,19 @@ class S3FileSystemProducer : public FileSystemProducer {

arrow::fs::S3Options options;
options.endpoint_override = uri_parser.ToString();
options.ConfigureAccessKey(storage_config.access_key_id, storage_config.access_key_value);
options.ConfigureAccessKey(config.access_key_id, config.access_key_value);

if (!storage_config.region.empty()) {
options.region = storage_config.region;
if (!config.region.empty()) {
options.region = config.region;
}

if (storage_config.use_custom_part_upload_size) {
if (config.use_custom_part_upload) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, MultiPartUploadS3FS::Make(options));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/format/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FileRecordBatchReader : public arrow::RecordBatchReader {
* @param row_group_offset The starting row group index to read.
* @param row_group_num The number of row groups to read.
*/
FileRecordBatchReader(arrow::fs::FileSystem& fs,
FileRecordBatchReader(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE,
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace milvus_storage {
class ParquetFileWriter : public FileWriter {
public:
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
const StorageConfig& storage_config);

Expand All @@ -47,7 +47,7 @@ class ParquetFileWriter : public FileWriter {
Status Close() override;

private:
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<arrow::Schema> schema_;
const std::string file_path_;
const StorageConfig& storage_config_;
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ class ColumnGroup {

Status Clear();

int GetTotalRows() const { return total_rows_; }
int64_t GetTotalRows() const { return total_rows_; }

private:
GroupId group_id_;
std::vector<size_t> batch_memory_usage_;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
size_t memory_usage_;
std::vector<int> origin_column_indices_;
int total_rows_;
int64_t total_rows_;
};

struct ColumnGroupState {
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/column_group_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ColumnGroupWriter {
public:
ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
const StorageConfig& storage_config,
const std::vector<int>& origin_column_indices);
Expand All @@ -49,7 +49,7 @@ class ColumnGroupWriter {
ColumnGroup column_group_;
int flushed_batches_;
int flushed_count_;
int flushed_rows_;
int64_t flushed_rows_;
};

} // namespace milvus_storage
39 changes: 20 additions & 19 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
* @brief Open a packed reader to read needed columns in the specified path.
*
* @param fs Arrow file system.
* @param path The root path of the packed files to read.
* @param origin_schema The original schema of data.
* @param paths Paths of the packed files to read.
* @param schema The schema of data to read.
* @param needed_columns The needed columns to read from the original schema.
* @param buffer_size The max buffer size of the packed reader.
*/
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
PackedRecordBatchReader(std::shared_ptr<arrow::fs::FileSystem> fs,
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> schema,
std::set<int>& needed_columns,
int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

/**
* @brief Return the schema of needed columns.
Expand All @@ -74,23 +74,25 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
arrow::Status Close() override;

private:
void init(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size);

Status initNeededSchema(const std::set<int>& needed_columns, const std::shared_ptr<arrow::Schema> origin_schema);

Status initColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
void init(std::shared_ptr<arrow::fs::FileSystem> fs,
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> origin_schema,
std::set<int>& needed_columns,
int64_t buffer_size);

Status initNeededSchema(std::set<int>& needed_columns, std::shared_ptr<arrow::Schema> origin_schema);

Status initColumnOffsets(std::shared_ptr<arrow::fs::FileSystem> fs,
std::set<int>& needed_columns,
size_t num_fields,
std::vector<std::string>& paths);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();

std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;

private:
std::shared_ptr<arrow::Schema> needed_schema_;
std::shared_ptr<arrow::Schema> origin_schema_;

size_t memory_limit_;
size_t buffer_available_;
Expand All @@ -102,9 +104,8 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::set<int> needed_paths_;
std::set<std::string> needed_paths_;
std::vector<std::vector<size_t>> row_group_sizes_;
const std::string file_path_;
int read_count_;
};

Expand Down
Loading

0 comments on commit 58df0bb

Please sign in to comment.