Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++][Parquet] Add variant type #45375

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,629 changes: 1,819 additions & 810 deletions cpp/src/generated/parquet_types.cpp

Large diffs are not rendered by default.

1,050 changes: 151 additions & 899 deletions cpp/src/generated/parquet_types.h

Large diffs are not rendered by default.

523 changes: 288 additions & 235 deletions cpp/src/generated/parquet_types.tcc

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ set(PARQUET_SRCS
arrow/reader_internal.cc
arrow/schema.cc
arrow/schema_internal.cc
arrow/variant.cc
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
Expand Down Expand Up @@ -399,7 +400,8 @@ add_parquet_test(arrow-test
arrow/arrow_metadata_test.cc
arrow/arrow_reader_writer_test.cc
arrow/arrow_schema_test.cc
arrow/arrow_statistics_test.cc)
arrow/arrow_statistics_test.cc
arrow/variant_test.cc)

add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
arrow/reconstruct_internal_test.cc)
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
::arrow::int64()},
{"json", LogicalType::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::utf8()},
{"bson", LogicalType::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"variant", LogicalType::Variant(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"interval", LogicalType::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12,
::arrow::fixed_size_binary(12)},
{"uuid", LogicalType::UUID(), ParquetType::FIXED_LEN_BYTE_ARRAY, 16,
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ Result<std::shared_ptr<ArrowType>> FromByteArray(
case LogicalType::Type::NONE:
case LogicalType::Type::ENUM:
case LogicalType::Type::BSON:
case LogicalType::Type::VARIANT:
return ::arrow::binary();
case LogicalType::Type::JSON:
if (reader_properties.get_arrow_extensions_enabled()) {
Expand Down
84 changes: 84 additions & 0 deletions cpp/src/parquet/arrow/variant.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/variant.h"

#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/logging.h"

namespace parquet::arrow {

using ::arrow::Status;
using ::arrow::Type;

bool isBinary(Type::type type) {
return type == Type::BINARY || type == Type::LARGE_BINARY;
}

bool VariantExtensionType::ExtensionEquals(const ExtensionType& other) const {
// TODO(neilechao)
return false;
}

Result<std::shared_ptr<DataType>> VariantExtensionType::Deserialize(
std::shared_ptr<DataType> storage_type, const std::string& serialized) const {
return VariantExtensionType::Make(std::move(storage_type));
}

std::string VariantExtensionType::Serialize() const { return ""; }

std::shared_ptr<Array> VariantExtensionType::MakeArray(
std::shared_ptr<ArrayData> data) const {
DCHECK_EQ(data->type->id(), Type::EXTENSION);
DCHECK_EQ("parquet.variant",
::arrow::internal::checked_cast<const ExtensionType&>(*data->type)
.extension_name());
return std::make_shared<::arrow::ExtensionArray>(data);
}

bool VariantExtensionType::IsSupportedStorageType(
std::shared_ptr<DataType> storage_type) {
if (storage_type->id() == Type::STRUCT) {
// TODO(neilechao) assertions for binary types, and non-nullable first field for
// metadata
return storage_type->num_fields() >= 2;
}

return false;
}

Result<std::shared_ptr<DataType>> VariantExtensionType::Make(
std::shared_ptr<DataType> storage_type) {
if (!IsSupportedStorageType(storage_type)) {
return Status::Invalid(
"Invalid storage type for VariantExtensionType, must be struct with binary "
"metadata, value, and typed_value fields: ",
storage_type->ToString());
}
return std::make_shared<VariantExtensionType>(std::move(storage_type));
}

std::shared_ptr<DataType> variant(std::shared_ptr<DataType> storage_type) {
return VariantExtensionType::Make(std::move(storage_type)).ValueOrDie();
}

} // namespace parquet::arrow
64 changes: 64 additions & 0 deletions cpp/src/parquet/arrow/variant.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdexcept>
#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"

namespace parquet::arrow {

using ::arrow::Array;
using ::arrow::ArrayData;
using ::arrow::DataType;
using ::arrow::ExtensionType;
using ::arrow::Result;

class ARROW_EXPORT VariantExtensionType : public ExtensionType {
public:
explicit VariantExtensionType(const std::shared_ptr<DataType>& storage_type)
: ExtensionType(storage_type), storage_type_(storage_type) {}

std::string extension_name() const override { return "parquet.variant"; }

bool ExtensionEquals(const ExtensionType& other) const override;

Result<std::shared_ptr<DataType>> Deserialize(
std::shared_ptr<DataType> storage_type,
const std::string& serialized_data) const override;

std::string Serialize() const override;

std::shared_ptr<Array> MakeArray(std::shared_ptr<ArrayData> data) const override;

static Result<std::shared_ptr<DataType>> Make(std::shared_ptr<DataType> storage_type);

static bool IsSupportedStorageType(std::shared_ptr<DataType> storage_type);

private:
std::shared_ptr<DataType> storage_type_;
};

/// \brief Return a VariantExtensionType instance.
ARROW_EXPORT std::shared_ptr<DataType> variant(std::shared_ptr<DataType> storage_type);

} // namespace parquet::arrow
34 changes: 34 additions & 0 deletions cpp/src/parquet/arrow/variant_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/variant.h"

#include "arrow/array/validate.h"
#include "arrow/ipc/test_common.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "parquet/exception.h"

namespace arrow {

using arrow::ipc::test::RoundtripBatch;

class TestVariantExtensionType : public ::testing::Test {};

TEST_F(TestVariantExtensionType, VariantRoundtrip) { ASSERT_TRUE(false); }

} // namespace arrow
10 changes: 10 additions & 0 deletions cpp/src/parquet/parquet.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ struct JsonType {
struct BsonType {
}

/**
* Embedded Variant logical type annotation
*
* Allowed for physical types: BINARY
*/
struct VariantType {
}

/**
* LogicalType annotations to replace ConvertedType.
*
Expand Down Expand Up @@ -404,6 +412,7 @@ union LogicalType {
13: BsonType BSON // use ConvertedType BSON
14: UUIDType UUID // no compatible ConvertedType
15: Float16Type FLOAT16 // no compatible ConvertedType
16: VariantType VARIANT // no compatible ConvertedType
}

/**
Expand Down Expand Up @@ -952,6 +961,7 @@ union ColumnOrder {
* ENUM - unsigned byte-wise comparison
* LIST - undefined
* MAP - undefined
* VARIANT - undefined
*
* In the absence of logical types, the sort order is determined by the physical type:
* BOOLEAN - false, true
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/parquet/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,9 @@ TEST(TestLogicalTypeConstruction, NewTypeIncompatibility) {
auto check_is_float16 = [](const std::shared_ptr<const LogicalType>& logical_type) {
return logical_type->is_float16();
};
auto check_is_variant = [](const std::shared_ptr<const LogicalType>& logical_type) {
return logical_type->is_variant();
};
auto check_is_null = [](const std::shared_ptr<const LogicalType>& logical_type) {
return logical_type->is_null();
};
Expand All @@ -1163,6 +1166,7 @@ TEST(TestLogicalTypeConstruction, NewTypeIncompatibility) {
std::vector<ConfirmNewTypeIncompatibilityArguments> cases = {
{LogicalType::UUID(), check_is_UUID},
{LogicalType::Float16(), check_is_float16},
{LogicalType::Variant(), check_is_variant},
{LogicalType::Null(), check_is_null},
{LogicalType::Time(false, LogicalType::TimeUnit::MILLIS), check_is_time},
{LogicalType::Time(false, LogicalType::TimeUnit::MICROS), check_is_time},
Expand Down Expand Up @@ -1247,6 +1251,7 @@ TEST(TestLogicalTypeOperation, LogicalTypeProperties) {
{BSONLogicalType::Make(), false, true, true},
{UUIDLogicalType::Make(), false, true, true},
{Float16LogicalType::Make(), false, true, true},
{VariantLogicalType::Make(), false, true, true},
{NoLogicalType::Make(), false, false, true},
};

Expand Down Expand Up @@ -1544,6 +1549,7 @@ TEST(TestLogicalTypeOperation, LogicalTypeRepresentation) {
{LogicalType::BSON(), "BSON", R"({"Type": "BSON"})"},
{LogicalType::UUID(), "UUID", R"({"Type": "UUID"})"},
{LogicalType::Float16(), "Float16", R"({"Type": "Float16"})"},
{LogicalType::Variant(), "Variant", R"({"Type": "Variant"})"},
{LogicalType::None(), "None", R"({"Type": "None"})"},
};

Expand Down Expand Up @@ -1594,6 +1600,7 @@ TEST(TestLogicalTypeOperation, LogicalTypeSortOrder) {
{LogicalType::BSON(), SortOrder::UNSIGNED},
{LogicalType::UUID(), SortOrder::UNSIGNED},
{LogicalType::Float16(), SortOrder::SIGNED},
{LogicalType::Variant(), SortOrder::UNKNOWN},
{LogicalType::None(), SortOrder::UNKNOWN}};

for (const ExpectedSortOrder& c : cases) {
Expand Down Expand Up @@ -1735,6 +1742,14 @@ TEST(TestSchemaNodeCreation, FactoryExceptions) {
Float16LogicalType::Make(),
Type::FIXED_LEN_BYTE_ARRAY, 3));

// Incompatible primitive type ...
ASSERT_ANY_THROW(PrimitiveNode::Make("variant", Repetition::REQUIRED,
VariantLogicalType::Make(), Type::DOUBLE));
// Incompatible primitive type ...
ASSERT_ANY_THROW(PrimitiveNode::Make("variant", Repetition::REQUIRED,
VariantLogicalType::Make(),
Type::FIXED_LEN_BYTE_ARRAY, 2));

// Non-positive length argument for fixed length binary ...
ASSERT_ANY_THROW(PrimitiveNode::Make("negative_length", Repetition::REQUIRED,
NoLogicalType::Make(), Type::FIXED_LEN_BYTE_ARRAY,
Expand Down Expand Up @@ -1928,6 +1943,8 @@ TEST_F(TestSchemaElementConstruction, SimpleCases) {
{"float16", LogicalType::Float16(), Type::FIXED_LEN_BYTE_ARRAY, 2, false,
ConvertedType::NA, true,
[this]() { return element_->logicalType.__isset.FLOAT16; }},
{"variant", LogicalType::Variant(), Type::BYTE_ARRAY, -1, false, ConvertedType::NA,
true, [this]() { return element_->logicalType.__isset.VARIANT; }},
{"none", LogicalType::None(), Type::INT64, -1, false, ConvertedType::NA, false,
check_nothing}};

Expand Down Expand Up @@ -2265,6 +2282,7 @@ TEST(TestLogicalTypeSerialization, Roundtrips) {
{LogicalType::BSON(), Type::BYTE_ARRAY, -1},
{LogicalType::UUID(), Type::FIXED_LEN_BYTE_ARRAY, 16},
{LogicalType::Float16(), Type::FIXED_LEN_BYTE_ARRAY, 2},
{LogicalType::Variant(), Type::BYTE_ARRAY, -1},
{LogicalType::None(), Type::BOOLEAN, -1}};

for (const AnnotatedPrimitiveNodeFactoryArguments& c : cases) {
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ std::shared_ptr<const LogicalType> LogicalType::FromThrift(
return UUIDLogicalType::Make();
} else if (type.__isset.FLOAT16) {
return Float16LogicalType::Make();
} else if (type.__isset.VARIANT) {
return VariantLogicalType::Make();
} else {
throw ParquetException("Metadata contains Thrift LogicalType that is not recognized");
}
Expand Down Expand Up @@ -536,6 +538,10 @@ std::shared_ptr<const LogicalType> LogicalType::Float16() {
return Float16LogicalType::Make();
}

std::shared_ptr<const LogicalType> LogicalType::Variant() {
return VariantLogicalType::Make();
}

std::shared_ptr<const LogicalType> LogicalType::None() { return NoLogicalType::Make(); }

/*
Expand Down Expand Up @@ -618,6 +624,7 @@ class LogicalType::Impl {
class BSON;
class UUID;
class Float16;
class Variant;
class No;
class Undefined;

Expand Down Expand Up @@ -690,6 +697,9 @@ bool LogicalType::is_UUID() const { return impl_->type() == LogicalType::Type::U
bool LogicalType::is_float16() const {
return impl_->type() == LogicalType::Type::FLOAT16;
}
bool LogicalType::is_variant() const {
return impl_->type() == LogicalType::Type::VARIANT;
}
bool LogicalType::is_none() const { return impl_->type() == LogicalType::Type::NONE; }
bool LogicalType::is_valid() const {
return impl_->type() != LogicalType::Type::UNDEFINED;
Expand Down Expand Up @@ -1619,6 +1629,22 @@ class LogicalType::Impl::Float16 final : public LogicalType::Impl::Incompatible,

GENERATE_MAKE(Float16)

class LogicalType::Impl::Variant final : public LogicalType::Impl::Incompatible,
public LogicalType::Impl::SimpleApplicable {
public:
friend class VariantLogicalType;

OVERRIDE_TOSTRING(Variant)
OVERRIDE_TOTHRIFT(VariantType, VARIANT)

private:
Variant()
: LogicalType::Impl(LogicalType::Type::VARIANT, SortOrder::UNKNOWN),
LogicalType::Impl::SimpleApplicable(parquet::Type::BYTE_ARRAY) {}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wgtmac @emkornfield - I initially had Variant inherit from SimpleApplicable(BYTE_ARRAY), but Variant should actually be composed of two separate byte arrays - one for metadata (the dict) and one for values. This muddies the applicability of VariantLogicalType to a single parquet type.

parquet column-size variant_basic.parquet VARIANT_COL.value-> Size In Bytes: 69 Size In Ratio: 0.52671754 VARIANT_COL.metadata-> Size In Bytes: 62 Size In Ratio: 0.47328246

  1. One possibility is to create separate VariantMetadataLogicalType and VariantValueLogicalType, with VariantLogicalType containing both as class members. The pros are that this reflects the storage in Parquet, where metadata and values are stored in separate columns, and the cons are that this diverges from parquet.thrift and potentially the other language implementations
  2. Other options would be to have VariantMetadata and VariantValue present but not as logical types

What are your thoughts on these approaches?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, VariantLogicalType should be similar to MapLogicalType and ListLogicalType which annotate group type. Though it (the unshredded form) is composed of two separate byte arrays, these two types are under the same group type as below. Can we model it as a struct<binary,binary>?

optional group variant_name (VARIANT) {
  required binary metadata;
  required binary value;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wgtmac - I updated the thrift definition of variant to include required metadata and value binary members, and I passed the metadata and value into VariantLogicalType / LogicalType::Impl::Variant to populate the required helper methods.

Afterwards, I saw that Maps and Lists take a different approach - they appear to have barebones MapLogicalType and ListLogicalTypes respectively, and I don't see their structure defined clearly in parquet.thrift. It looks like their structure is listed in LogicalTypes.md and referencing again in some tests.

What's the difference between defining the members in the parquet.thrift strict versus using GroupNodes built on top of PrimitiveNodes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that I may have not explained it clearly.

Parquet has two different kind of nodes: primitive and group. primitive node is usually for a primitive physical type (e.g. int64, double, binary, etc.) while group is for a complex type (e.g. struct, map, list, etc.). Whatever the node type is, it occupies a SchemaElement in the Parquet schema: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L502

LogicalType can be assigned to both primitive and group node. Most logical types annotate a primitive node, examples are DECIMAL, TIMESTAMP, etc. However, List/Map/Variant are logical types that must annotate a group node.

For List and Map, their group structures are dynamic because of different subtypes (the element type of a List or the key/value types of a Map). For Variant, its group structure is also dynamic depending on whether it is shredded and shredded value types.

I updated the thrift definition of variant to include required metadata and value binary members

Based on above explanation, we cannot modify parquet.thrift. To facilitate the current development, maybe we can define a VariantExtensionType similar to implementations at https://github.com/apache/arrow/tree/01e3f1e6829d6fcc9021ac47aebb6350590ca134/cpp/src/arrow/extension with a storage type of struct<metadata:binary,value:binary>. Once stable, we can make it canonical following the procedure at https://arrow.apache.org/docs/format/CanonicalExtensions.html

Do you have any opinion? @emkornfield @pitrou @mapleFU

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I create a VariantExtensionType with storage type struct<metadata:binary, value:binary>, will the Parquet Reader and Writer break down the struct members (metadata and value) into separate columns for reading and writing?

Just double checking, since the documentation on Arrow Extension type writing Parquet files says "An Arrow Extension type is written out as its storage type", and I want to make sure that separate columns for metadata and binary are read and written.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, these two binary columns are processed individually by the Parquet writer and reader. You might want to implement a VariantExtensionArray on top of the StructArray to restore the variant-typed values.

};

GENERATE_MAKE(Variant)

class LogicalType::Impl::No final : public LogicalType::Impl::SimpleCompatible,
public LogicalType::Impl::UniversalApplicable {
public:
Expand Down
Loading