feat: packed reader and writer new api #172

Merged · 8 commits · Feb 17, 2025
Changes from 5 commits
18 changes: 10 additions & 8 deletions cpp/benchmark/benchmark_packed.cpp
@@ -51,27 +51,28 @@ class S3Fixture : public benchmark::Fixture {
const char* secret_key = std::getenv(kEnvSecretKey);
const char* endpoint_url = std::getenv(kEnvS3EndpointUrl);
const char* file_path = std::getenv(kEnvFilePath);
auto conf = StorageConfig();
auto conf = ArrowFileSystemConfig();
conf.storage_type = "local";
conf.uri = "file:///tmp/";
if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
conf.uri = std::string(endpoint_url);
conf.access_key_id = std::string(access_key);
conf.access_key_value = std::string(secret_key);
conf.file_path = std::string(file_path);
conf.storage_type = "remote";
}
storage_config_ = std::move(conf);
config_ = std::move(conf);

auto base = std::string();
auto factory = std::make_shared<FileSystemFactory>();
auto result = factory->BuildFileSystem(conf, &base);
ArrowFileSystemSingleton::GetInstance().Init(conf);
ArrowFileSystemPtr fs = ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem();
if (!result.ok()) {
state.SkipWithError("Failed to build file system!");
}
fs_ = std::move(result).value();
}

std::shared_ptr<arrow::fs::FileSystem> fs_;
StorageConfig storage_config_;
ArrowFileSystemConfig config_;
};

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
@@ -135,9 +136,10 @@ static void PackedWrite(benchmark::State& st,

for (auto _ : st) {
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.use_custom_part_upload = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
PackedRecordBatchWriter writer(fs, path, schema, conf, column_groups, buffer_size);
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
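For context, a minimal sketch of how the updated writer call above fits together; it assumes fs, schema, record_batch, path, and buffer_size are prepared as in the benchmark fixture, and it only exercises the Write() call shown in this hunk.

// Sketch only: fs, schema, record_batch, path and buffer_size are assumed to be
// set up as in the benchmark fixture above.
auto conf = StorageConfig();
conf.use_custom_part_upload = true;   // renamed from use_custom_part_upload_size
conf.part_size = 30 * 1024 * 1024;    // 30 MB multipart upload parts

// Column groups are now passed to the writer explicitly:
// column 2 forms one group, columns 0 and 1 form another.
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
PackedRecordBatchWriter writer(fs, path, schema, conf, column_groups, buffer_size);

auto status = writer.Write(record_batch);
if (!status.ok()) {
  // handle the failed write, e.g. report the error and stop, as the benchmark does
}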
21 changes: 2 additions & 19 deletions cpp/include/milvus-storage/common/config.h
@@ -30,29 +30,12 @@ static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE +

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
static constexpr int64_t DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB

struct StorageConfig {
std::string uri = "";
std::string bucket_name = "";
std::string access_key_id = "";
std::string access_key_value = "";
std::string file_path = "";
std::string root_path = "";
std::string cloud_provider = "";
std::string region = "";
bool use_custom_part_upload_size = false;
int64_t part_size = 0;
parquet::WriterProperties writer_props = *parquet::default_writer_properties();

std::string ToString() const {
std::stringstream ss;
ss << "[uri=" << uri << ", bucket_name=" << bucket_name << ", root_path=" << root_path
<< ", cloud_provider=" << cloud_provider << ", region=" << region
<< ", use_custom_part_upload_size=" << use_custom_part_upload_size << "]";

return ss.str();
}
};

} // namespace milvus_storage
11 changes: 5 additions & 6 deletions cpp/include/milvus-storage/filesystem/azure/azure_fs.h
@@ -27,14 +27,13 @@ class AzureFileSystemProducer : public FileSystemProducer {
public:
AzureFileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(config.uri));

arrow::fs::AzureOptions options;
auto account = storage_config.access_key_id;
auto key = storage_config.access_key_value;
auto account = config.access_key_id;
auto key = config.access_key_value;
if (account.empty() || key.empty()) {
return Status::InvalidArgument("Please provide azure storage account and azure secret key");
}
@@ -43,7 +42,7 @@ class AzureFileSystemProducer : public FileSystemProducer {

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
fs->CreateDir(*out_path);
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}
};

80 changes: 75 additions & 5 deletions cpp/include/milvus-storage/filesystem/fs.h
@@ -23,12 +23,33 @@

namespace milvus_storage {

using ArrowFileSystemPtr = std::shared_ptr<arrow::fs::FileSystem>;

struct ArrowFileSystemConfig {
std::string uri = "";
std::string storage_type = "local"; // [local, remote]
std::string bucket_name = "";
std::string access_key_id = "";
std::string access_key_value = "";
std::string cloud_provider = ""; // [aws, azure, gcp, tencent, aliyun]
std::string region = "";
bool use_custom_part_upload = false;

std::string ToString() const {
std::stringstream ss;
ss << "[uri=" << uri << ", storage_type=" << storage_type << ", bucket_name=" << bucket_name
<< ", cloud_provider=" << cloud_provider << ", region=" << region
<< ", use_custom_part_upload=" << use_custom_part_upload << "]";

return ss.str();
}
};

class FileSystemProducer {
public:
virtual ~FileSystemProducer() = default;

virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) = 0;
virtual Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) = 0;

std::string UriToPath(const std::string& uri) {
arrow::util::Uri uri_parser;
@@ -41,10 +62,59 @@ class FileSystemProducer {
}
};

class FileSystemFactory {
class ArrowFileSystemSingleton {
private:
ArrowFileSystemSingleton(){};

public:
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const StorageConfig& storage_config,
std::string* out_path);
ArrowFileSystemSingleton(const ArrowFileSystemSingleton&) = delete;
ArrowFileSystemSingleton& operator=(const ArrowFileSystemSingleton&) = delete;

static ArrowFileSystemSingleton& GetInstance() {
static ArrowFileSystemSingleton instance;
return instance;
}

void Init(const ArrowFileSystemConfig& config) {
if (afs_ == nullptr) {
afs_ = createArrowFileSystem(config).value();
}
}

void Release() {}

ArrowFileSystemPtr GetArrowFileSystem() { return afs_; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why bother creating a ArrowFileSystemSingleton instance, if we have the afs_ instance already?

Additionally, we should assert the afs_ exist in such method.


private:
Result<ArrowFileSystemPtr> createArrowFileSystem(const ArrowFileSystemConfig& config);

private:
ArrowFileSystemPtr afs_ = nullptr;
};

enum class StorageType {
None = 0,
Local = 1,
Minio = 2,
Remote = 3,
};

enum class CloudProviderType : int8_t {
UNKNOWN = 0,
AWS = 1,
GCP = 2,
ALIYUN = 3,
AZURE = 4,
TENCENTCLOUD = 5,
};

static std::map<std::string, StorageType> StorageType_Map = {{"local", StorageType::Local},
{"remote", StorageType::Remote}};

static std::map<std::string, CloudProviderType> CloudProviderType_Map = {{"aws", CloudProviderType::AWS},
{"gcp", CloudProviderType::GCP},
{"aliyun", CloudProviderType::ALIYUN},
{"azure", CloudProviderType::AZURE},
{"tencent", CloudProviderType::TENCENTCLOUD}};

} // namespace milvus_storage
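As a quick orientation, a minimal sketch of the intended call sequence for the new singleton, mirroring the benchmark fixture earlier in this diff; the local URI is an illustrative assumption. Init() only constructs the filesystem while afs_ is still null, so repeated calls keep the existing instance.

ArrowFileSystemConfig conf;
conf.storage_type = "local";   // or "remote" together with cloud_provider, region, credentials
conf.uri = "file:///tmp/";     // illustrative local URI

// The first Init builds the filesystem; later Init calls are no-ops.
ArrowFileSystemSingleton::GetInstance().Init(conf);
ArrowFileSystemPtr fs = ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem();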
17 changes: 8 additions & 9 deletions cpp/include/milvus-storage/filesystem/s3/s3_fs.h
@@ -28,10 +28,9 @@ class S3FileSystemProducer : public FileSystemProducer {
public:
S3FileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
Result<ArrowFileSystemPtr> Make(const ArrowFileSystemConfig& config, std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(config.uri));

if (!arrow::fs::IsS3Initialized()) {
arrow::fs::S3GlobalOptions global_options;
@@ -46,19 +45,19 @@

arrow::fs::S3Options options;
options.endpoint_override = uri_parser.ToString();
options.ConfigureAccessKey(storage_config.access_key_id, storage_config.access_key_value);
options.ConfigureAccessKey(config.access_key_id, config.access_key_value);

if (!storage_config.region.empty()) {
options.region = storage_config.region;
if (!config.region.empty()) {
options.region = config.region;
}

if (storage_config.use_custom_part_upload_size) {
if (config.use_custom_part_upload) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, MultiPartUploadS3FS::Make(options));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return ArrowFileSystemPtr(fs);
}
};

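A hedged example of an ArrowFileSystemConfig that would take the custom-multipart branch in Make() above; the endpoint, region, and credentials are placeholders, and the routing of a "remote"/"aws" config to this producer is assumed rather than shown in this diff.

ArrowFileSystemConfig conf;
conf.storage_type = "remote";
conf.cloud_provider = "aws";
conf.uri = "https://s3.us-west-2.amazonaws.com";  // placeholder endpoint
conf.access_key_id = "<access-key-id>";           // placeholder credentials
conf.access_key_value = "<secret-key>";
conf.region = "us-west-2";
conf.use_custom_part_upload = true;               // routes Make() to MultiPartUploadS3FS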
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
@@ -27,7 +27,7 @@ namespace milvus_storage {
class ParquetFileWriter : public FileWriter {
public:
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
const StorageConfig& storage_config);

@@ -47,7 +47,7 @@ class ParquetFileWriter : public FileWriter {
Status Close() override;

private:
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<arrow::Schema> schema_;
const std::string file_path_;
const StorageConfig& storage_config_;
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/column_group.h
@@ -61,15 +61,15 @@ class ColumnGroup {

Status Clear();

int GetTotalRows() const { return total_rows_; }
int64_t GetTotalRows() const { return total_rows_; }

private:
GroupId group_id_;
std::vector<size_t> batch_memory_usage_;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
size_t memory_usage_;
std::vector<int> origin_column_indices_;
int total_rows_;
int64_t total_rows_;
};

struct ColumnGroupState {
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/column_group_writer.h
@@ -30,7 +30,7 @@ class ColumnGroupWriter {
public:
ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
const StorageConfig& storage_config,
const std::vector<int>& origin_column_indices);
@@ -49,7 +49,7 @@ ColumnGroup column_group_;
ColumnGroup column_group_;
int flushed_batches_;
int flushed_count_;
int flushed_rows_;
int64_t flushed_rows_;
};

} // namespace milvus_storage
31 changes: 16 additions & 15 deletions cpp/include/milvus-storage/packed/reader.h
@@ -44,16 +44,16 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
* @brief Open a packed reader to read needed columns in the specified path.
*
* @param fs Arrow file system.
* @param path The root path of the packed files to read.
* @param origin_schema The original schema of data.
* @param paths Paths of the packed files to read.
* @param schema The schema of data to read.
* @param needed_columns The needed columns to read from the original schema.
* @param buffer_size The max buffer size of the packed reader.
*/
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> schema,
std::set<int>& needed_columns,
int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

/**
* @brief Return the schema of needed columns.
@@ -75,22 +75,24 @@

private:
void init(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> origin_schema,
const std::set<int>& needed_columns,
const int64_t buffer_size);
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> origin_schema,
std::set<int>& needed_columns,
int64_t buffer_size);

Status initNeededSchema(const std::set<int>& needed_columns, const std::shared_ptr<arrow::Schema> origin_schema);
Status initNeededSchema(std::set<int>& needed_columns, std::shared_ptr<arrow::Schema> origin_schema);

Status initColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
Status initColumnOffsets(arrow::fs::FileSystem& fs,
std::set<int>& needed_columns,
size_t num_fields,
std::vector<std::string>& paths);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();

std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;

private:
std::shared_ptr<arrow::Schema> needed_schema_;
std::shared_ptr<arrow::Schema> origin_schema_;

size_t memory_limit_;
size_t buffer_available_;
Expand All @@ -102,9 +104,8 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::set<int> needed_paths_;
std::set<std::string> needed_paths_;
std::vector<std::vector<size_t>> row_group_sizes_;
const std::string file_path_;
int read_count_;
};

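A minimal sketch of the new reader signature in use; the file paths and column indices are illustrative assumptions, schema is the original schema of the written data, and fs is assumed to come from the singleton shown earlier (dereferenced because the constructor takes an arrow::fs::FileSystem&).

std::vector<std::string> paths = {"/tmp/packed/0", "/tmp/packed/1"};  // illustrative paths
std::set<int> needed_columns = {0, 2};                                // illustrative projection
PackedRecordBatchReader reader(*fs, paths, schema, needed_columns, DEFAULT_READ_BUFFER_SIZE);

std::shared_ptr<arrow::RecordBatch> batch;
// ReadNext comes from arrow::RecordBatchReader; a null batch signals end of stream.
while (reader.ReadNext(&batch).ok() && batch != nullptr) {
  // consume batch
}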