diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index b840e638..9d657a77 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -299,6 +299,20 @@ function(resolve_zlib_dependency) endfunction() +# ---------------------------------------------------------------------- +# Zstd + +function(resolve_zstd_dependency) + find_package(zstd CONFIG) + if(zstd_FOUND) + list(APPEND ICEBERG_SYSTEM_DEPENDENCIES zstd) + message(STATUS "Found zstd, version: ${zstd_VERSION}") + set(ICEBERG_SYSTEM_DEPENDENCIES + ${ICEBERG_SYSTEM_DEPENDENCIES} + PARENT_SCOPE) + endif() +endfunction() + resolve_zlib_dependency() resolve_nanoarrow_dependency() resolve_nlohmann_json_dependency() @@ -306,4 +320,5 @@ resolve_nlohmann_json_dependency() if(ICEBERG_BUILD_BUNDLE) resolve_arrow_dependency() resolve_avro_dependency() + resolve_zstd_dependency() endif() diff --git a/example/demo_example.cc b/example/demo_example.cc index 79e5b99e..aa63826b 100644 --- a/example/demo_example.cc +++ b/example/demo_example.cc @@ -19,13 +19,16 @@ #include -#include "iceberg/arrow/demo_arrow.h" -#include "iceberg/avro/demo_avro.h" -#include "iceberg/demo.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/file_reader.h" int main() { - std::cout << iceberg::Demo().print() << std::endl; - std::cout << iceberg::arrow::DemoArrow().print() << std::endl; - std::cout << iceberg::avro::DemoAvro().print() << std::endl; + iceberg::avro::AvroReader::Register(); + auto open_result = iceberg::ReaderFactoryRegistry::Open( + iceberg::FileFormatType::kAvro, {.path = "non-existing-file.avro"}); + if (!open_result.has_value()) { + std::cerr << "Failed to open avro file" << std::endl; + return 1; + } return 0; } diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index a3a6cf56..4fb0d542 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,7 +20,6 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_internal.cc catalog/in_memory_catalog.cc - demo.cc expression/expression.cc expression/literal.cc file_reader.cc @@ -94,9 +93,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES - arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc - avro/demo_avro.cc avro/avro_data_util.cc avro/avro_reader.cc avro/avro_schema_util.cc diff --git a/src/iceberg/arrow/demo_arrow.cc b/src/iceberg/arrow/demo_arrow.cc deleted file mode 100644 index 1e5de284..00000000 --- a/src/iceberg/arrow/demo_arrow.cc +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/arrow/demo_arrow.h" - -#include -#include - -#include "iceberg/demo.h" - -namespace iceberg::arrow { - -std::string DemoArrow::print() const { - return Demo().print() + ", Arrow version: " + ::arrow::GetBuildInfo().version_string + - ", Parquet version: " + CREATED_BY_VERSION; -} - -} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/demo_arrow.h b/src/iceberg/arrow/demo_arrow.h deleted file mode 100644 index c50429e8..00000000 --- a/src/iceberg/arrow/demo_arrow.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/demo.h" -#include "iceberg/iceberg_bundle_export.h" - -namespace iceberg::arrow { - -class ICEBERG_BUNDLE_EXPORT DemoArrow : public Demo { - public: - DemoArrow() = default; - ~DemoArrow() override = default; - std::string print() const override; -}; - -} // namespace iceberg::arrow diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index e3b4b063..16ac41a6 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -419,14 +419,13 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, const SchemaField& projected_field, ::arrow::ArrayBuilder* array_builder) { if (avro_node->type() == ::avro::AVRO_UNION) { - const auto& union_datum = avro_datum.value<::avro::GenericUnion>(); - size_t branch = union_datum.currentBranch(); + size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); return {}; } else { - return AppendFieldToBuilder(avro_node->leafAt(branch), union_datum.datum(), - projection, projected_field, array_builder); + return AppendFieldToBuilder(avro_node->leafAt(branch), avro_datum, projection, + projected_field, array_builder); } } diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 5b663299..0ec6d086 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -76,9 +76,14 @@ struct ReadContext { // 1. prune the reader schema based on the projection // 2. read key-value metadata from the avro file // 3. collect basic reader metrics -class AvroBatchReader::Impl { +class AvroReader::Impl { public: Status Open(const ReaderOptions& options) { + // TODO(gangwu): perhaps adding a ReaderOptions::Validate() method + if (options.projection == nullptr) { + return InvalidArgument("Projected schema is required by Avro reader"); + } + batch_size_ = options.batch_size; read_schema_ = options.projection; @@ -106,11 +111,10 @@ class AvroBatchReader::Impl { // Project the read schema on top of the file schema. // TODO(gangwu): support pruning source fields - ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(), + ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(), /*prune_source=*/false)); - base_reader->init(file_schema); reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( - std::move(base_reader)); + std::move(base_reader), file_schema); if (options.split) { reader_->sync(options.split->offset); @@ -119,7 +123,7 @@ class AvroBatchReader::Impl { return {}; } - Result Next() { + Result> Next() { if (!context_) { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } @@ -148,6 +152,19 @@ class AvroBatchReader::Impl { return {}; } + Result Schema() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + ArrowSchema arrow_schema; + auto export_result = ::arrow::ExportSchema(*context_->arrow_schema_, &arrow_schema); + if (!export_result.ok()) { + return InvalidSchema("Failed to export the arrow schema: {}", + export_result.message()); + } + return arrow_schema; + } + private: Status InitReadContext() { context_ = std::make_unique(); @@ -174,9 +191,9 @@ class AvroBatchReader::Impl { return {}; } - Result ConvertBuilderToArrowArray() { + Result> ConvertBuilderToArrowArray() { if (context_->builder_->length() == 0) { - return {}; + return std::nullopt; } auto builder_result = context_->builder_->Finish(); @@ -201,7 +218,7 @@ class AvroBatchReader::Impl { // The end of the split to read and used to terminate the reading. std::optional split_end_; // The schema to read. - std::shared_ptr read_schema_; + std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. SchemaProjection projection_; // The avro reader to read the data into a datum. @@ -210,13 +227,21 @@ class AvroBatchReader::Impl { std::unique_ptr context_; }; -Result AvroBatchReader::Next() { return impl_->Next(); } +Result> AvroReader::Next() { return impl_->Next(); } -Status AvroBatchReader::Open(const ReaderOptions& options) { +Result AvroReader::Schema() { return impl_->Schema(); } + +Status AvroReader::Open(const ReaderOptions& options) { impl_ = std::make_unique(); return impl_->Open(options); } -Status AvroBatchReader::Close() { return impl_->Close(); } +Status AvroReader::Close() { return impl_->Close(); } + +void AvroReader::Register() { + static ReaderFactoryRegistry avro_reader_register( + FileFormatType::kAvro, + []() -> Result> { return std::make_unique(); }); +} } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_reader.h b/src/iceberg/avro/avro_reader.h index dc4cf458..9427f542 100644 --- a/src/iceberg/avro/avro_reader.h +++ b/src/iceberg/avro/avro_reader.h @@ -25,19 +25,22 @@ namespace iceberg::avro { /// \brief A reader that reads ArrowArray from Avro files. -class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader { +class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader { public: - AvroBatchReader() = default; + AvroReader() = default; - ~AvroBatchReader() override = default; + ~AvroReader() override = default; Status Open(const ReaderOptions& options) final; Status Close() final; - Result Next() final; + Result> Next() final; - DataLayout data_layout() const final { return DataLayout::kArrowArray; } + Result Schema() final; + + /// \brief Register this Avro reader implementation. + static void Register(); private: class Impl; diff --git a/src/iceberg/avro/demo_avro.cc b/src/iceberg/avro/demo_avro.cc deleted file mode 100644 index 679b4966..00000000 --- a/src/iceberg/avro/demo_avro.cc +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/avro/demo_avro.h" - -#include - -#include "avro/Compiler.hh" -#include "avro/ValidSchema.hh" -#include "iceberg/demo.h" - -namespace iceberg::avro { - -std::string DemoAvro::print() const { - std::string input = - "{\n\ - \"type\": \"record\",\n\ - \"name\": \"testrecord\",\n\ - \"fields\": [\n\ - {\n\ - \"name\": \"testbytes\",\n\ - \"type\": \"bytes\",\n\ - \"default\": \"\"\n\ - }\n\ - ]\n\ -}\n\ -"; - - ::avro::ValidSchema schema = ::avro::compileJsonSchemaFromString(input); - std::ostringstream actual; - schema.toJson(actual); - - return actual.str(); -} - -Result DemoAvroReader::Next() { return std::monostate(); } - -Reader::DataLayout DemoAvroReader::data_layout() const { - return Reader::DataLayout::kStructLike; -} - -Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; } - -Status DemoAvroReader::Close() { return {}; } - -ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result> { - return std::make_unique(); -}); - -} // namespace iceberg::avro diff --git a/src/iceberg/avro/demo_avro.h b/src/iceberg/avro/demo_avro.h deleted file mode 100644 index 14cb2fa8..00000000 --- a/src/iceberg/avro/demo_avro.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/avro.h" -#include "iceberg/file_reader.h" -#include "iceberg/iceberg_bundle_export.h" - -namespace iceberg::avro { - -class ICEBERG_BUNDLE_EXPORT DemoAvro : public Avro { - public: - DemoAvro() = default; - ~DemoAvro() override = default; - std::string print() const override; -}; - -class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader { - public: - DemoAvroReader() = default; - ~DemoAvroReader() override = default; - Status Open(const ReaderOptions& options) override; - Status Close() override; - Result Next() override; - DataLayout data_layout() const override; -}; - -} // namespace iceberg::avro diff --git a/src/iceberg/demo.cc b/src/iceberg/demo.cc deleted file mode 100644 index aa1835b2..00000000 --- a/src/iceberg/demo.cc +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/demo.h" - -#include "iceberg/avro.h" // include to export symbols -#include "iceberg/catalog.h" -#include "iceberg/file_io.h" -#include "iceberg/location_provider.h" -#include "iceberg/table.h" -#include "iceberg/transaction.h" - -namespace iceberg { - -std::string Demo::print() const { return "Demo"; } - -} // namespace iceberg diff --git a/src/iceberg/demo.h b/src/iceberg/demo.h deleted file mode 100644 index 7e810f9c..00000000 --- a/src/iceberg/demo.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/iceberg_export.h" - -namespace iceberg { - -class ICEBERG_EXPORT Demo { - public: - Demo() = default; - virtual ~Demo() = default; - - virtual std::string print() const; -}; - -} // namespace iceberg diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc index a56c30cc..7804dc3b 100644 --- a/src/iceberg/file_reader.cc +++ b/src/iceberg/file_reader.cc @@ -59,13 +59,4 @@ Result> ReaderFactoryRegistry::Open( return reader; } -StructLikeReader::StructLikeReader(std::unique_ptr reader) - : reader_(std::move(reader)) {} - -Result StructLikeReader::Next() { return NotImplemented(""); } - -BatchReader::BatchReader(std::unique_ptr reader) : reader_(std::move(reader)) {} - -Result BatchReader::Next() { return NotImplemented(""); } - } // namespace iceberg diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index ca46cdcc..6fc75225 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -25,7 +25,6 @@ #include #include #include -#include #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" @@ -50,66 +49,11 @@ class ICEBERG_EXPORT Reader { /// \brief Read next data from file. /// - /// \return std::monostate if the reader has no more data, otherwise `ArrowArray` or - /// `StructLike` depending on the data layout by the reader implementation. - using Data = - std::variant>; - virtual Result Next() = 0; + /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. + virtual Result> Next() = 0; - enum class DataLayout { kArrowArray, kStructLike }; - - /// \brief Get the data layout returned by `Next()` of the reader. - virtual DataLayout data_layout() const = 0; -}; - -/// \brief Wrapper of `Reader` to always return `StructLike`. -/// -/// If the data layout of the wrapped reader is `ArrowArray`, the data will be converted -/// to `StructLike`; otherwise, the data will be returned as is without any cost. -class ICEBERG_EXPORT StructLikeReader : public Reader { - public: - explicit StructLikeReader(std::unique_ptr reader); - - ~StructLikeReader() override = default; - - /// \brief Always read data into `StructLike` or monostate if no more data. - Result Next() final; - - DataLayout data_layout() const final { return DataLayout::kStructLike; } - - Status Open(const struct ReaderOptions& options) final { - return reader_->Open(options); - } - - Status Close() final { return reader_->Close(); } - - private: - std::unique_ptr reader_; -}; - -/// \brief Wrapper of `Reader` to always return `ArrowArray`. -/// -/// If the data layout of the wrapped reader is `StructLike`, the data will be converted -/// to `ArrowArray`; otherwise, the data will be returned as is without any cost. -class ICEBERG_EXPORT BatchReader : public Reader { - public: - explicit BatchReader(std::unique_ptr reader); - - ~BatchReader() override = default; - - /// \brief Always read data into `ArrowArray` or monostate if no more data. - Result Next() final; - - DataLayout data_layout() const final { return DataLayout::kArrowArray; } - - Status Open(const struct ReaderOptions& options) final { - return reader_->Open(options); - } - - Status Close() final { return reader_->Close(); } - - private: - std::unique_ptr reader_; + /// \brief Get the schema of the data. + virtual Result Schema() = 0; }; /// \brief A split of the file to read. @@ -122,6 +66,8 @@ struct ICEBERG_EXPORT Split { /// \brief Options for creating a reader. struct ICEBERG_EXPORT ReaderOptions { + static constexpr int64_t kDefaultBatchSize = 4096; + /// \brief The path to the file to read. std::string path; /// \brief The total length of the file. @@ -130,12 +76,12 @@ struct ICEBERG_EXPORT ReaderOptions { std::optional split; /// \brief The batch size to read. Only applies to implementations that support /// batching. - int64_t batch_size; + int64_t batch_size = kDefaultBatchSize; /// \brief FileIO instance to open the file. Reader implementations should down cast it /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses /// `ArrowFileSystemFileIO` as the default implementation. std::shared_ptr io; - /// \brief The projection schema to read from the file. + /// \brief The projection schema to read from the file. This field is required. std::shared_ptr projection; /// \brief The filter to apply to the data. Reader implementations may ignore this if /// the file format does not support filtering. @@ -160,9 +106,4 @@ struct ICEBERG_EXPORT ReaderFactoryRegistry { const ReaderOptions& options); }; -/// \brief Macro to register a reader factory for a specific file format. -#define ICEBERG_REGISTER_READER_FACTORY(format_type, reader_factory) \ - static ::iceberg::ReaderFactoryRegistry register_reader_factory_##format_type( \ - ::iceberg::FileFormatType::k##format_type, reader_factory); - } // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 6b81eb9b..8827ec79 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -37,7 +37,7 @@ class ICEBERG_EXPORT ManifestReader { virtual Result>> Entries() const = 0; private: - std::unique_ptr reader_; + std::unique_ptr reader_; }; /// \brief Read manifest files from a manifest list file. @@ -46,7 +46,7 @@ class ICEBERG_EXPORT ManifestListReader { virtual Result>> Files() const = 0; private: - std::unique_ptr reader_; + std::unique_ptr reader_; }; } // namespace iceberg diff --git a/test/avro_test.cc b/test/avro_test.cc index 01ac8211..ff39ebbf 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -17,43 +17,147 @@ * under the License. */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include -#include +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/schema.h" +#include "iceberg/type.h" #include "matchers.h" +#include "temp_file_test_base.h" namespace iceberg::avro { -TEST(AVROTest, TestDemoAvro) { - std::string expected = - "{\n\ - \"type\": \"record\",\n\ - \"name\": \"testrecord\",\n\ - \"fields\": [\n\ - {\n\ - \"name\": \"testbytes\",\n\ - \"type\": \"bytes\",\n\ - \"default\": \"\"\n\ - }\n\ - ]\n\ -}\n\ -"; - - auto avro = iceberg::avro::DemoAvro(); - EXPECT_EQ(avro.print(), expected); +class AvroReaderTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { AvroReader::Register(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + file_io_ = std::make_shared(local_fs_); + temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); + } + + void CreateSimpleAvroFile() { + const std::string avro_schema_json = R"({ + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "id", "type": "int", "field-id": 1}, + {"name": "name", "type": ["null", "string"], "field-id": 2} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + const std::vector> test_data = { + {1, "Alice"}, {2, "Bob"}, {3, "Charlie"}}; + + ::avro::DataFileWriter<::avro::GenericDatum> writer(temp_avro_file_.c_str(), + avro_schema); + for (const auto& [id, name] : test_data) { + ::avro::GenericDatum datum(avro_schema.root()); + auto& record = datum.value<::avro::GenericRecord>(); + record.fieldAt(0).value() = id; + record.fieldAt(1).selectBranch(1); // non-null + record.fieldAt(1).value() = name; + writer.write(datum); + } + writer.close(); + } + + void VerifyNextBatch(Reader& reader, std::string_view expected_json) { + // Boilerplate to get Arrow schema + auto schema_result = reader.Schema(); + ASSERT_THAT(schema_result, IsOk()); + auto arrow_c_schema = std::move(schema_result.value()); + auto import_schema_result = ::arrow::ImportType(&arrow_c_schema); + auto arrow_schema = import_schema_result.ValueOrDie(); + + // Boilerplate to get Arrow array + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_TRUE(data.value().has_value()); + auto arrow_c_array = data.value().value(); + auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema); + auto arrow_array = data_result.ValueOrDie(); + + // Verify data + auto expected_array = + ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie(); + ASSERT_TRUE(arrow_array->Equals(*expected_array)); + } + + void VerifyExhausted(Reader& reader) { + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_FALSE(data.value().has_value()); + } + + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; + std::string temp_avro_file_; +}; + +TEST_F(AvroReaderTest, ReadTwoFields) { + CreateSimpleAvroFile(); + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared())}); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +TEST_F(AvroReaderTest, ReadReorderedFieldsWithNulls) { + CreateSimpleAvroFile(); + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", std::make_shared()), + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(3, "score", std::make_shared())}); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch( + *reader, R"([["Alice", 1, null], ["Bob", 2, null], ["Charlie", 3, null]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } -TEST(AVROTest, TestDemoAvroReader) { - auto result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, {}); - ASSERT_THAT(result, IsOk()); +TEST_F(AvroReaderTest, ReadWithBatchSize) { + CreateSimpleAvroFile(); + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared())}); - auto reader = std::move(result.value()); - ASSERT_EQ(reader->data_layout(), Reader::DataLayout::kStructLike); + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .batch_size = 2, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); - auto data = reader->Next(); - ASSERT_THAT(data, IsOk()); - ASSERT_TRUE(std::holds_alternative(data.value())); + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[1], [2]])")); + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, R"([[3]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } } // namespace iceberg::avro