Skip to content

feat: avro support applying field-ids based on name mapping #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -96,16 +97,33 @@ class AvroReader::Impl {
// Create a base reader without setting reader schema to enable projection.
auto base_reader =
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
::avro::ValidSchema file_schema = base_reader->dataSchema();

// Validate field ids in the file schema.
HasIdVisitor has_id_visitor;
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));

if (has_id_visitor.HasNoIds()) {
// TODO(gangwu): support applying field-ids based on name mapping
return NotImplemented("Avro file schema has no field IDs");
}
if (!has_id_visitor.AllHaveIds()) {
// Apply field IDs based on name mapping if available
if (options.name_mapping) {
MappedField mapped_field;
// Convert NameMapping to MappedFields for nested mapping
mapped_field.nested_mapping =
std::make_shared<MappedFields>(options.name_mapping->AsMappedFields());
ICEBERG_ASSIGN_OR_RAISE(
auto new_root_node,
CreateAvroNodeWithFieldIds(file_schema.root(), mapped_field));

// Create a new schema with the updated root node
auto new_schema = ::avro::ValidSchema(new_root_node);

// Update the file schema to use the new schema with field IDs
file_schema = new_schema;
} else {
return InvalidSchema(
"Avro file schema has no field IDs and no name mapping provided");
}
} else if (!has_id_visitor.AllHaveIds()) {
return InvalidSchema("Not all fields in the Avro file schema have field IDs");
}

Expand Down
223 changes: 223 additions & 0 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <avro/ValidSchema.hh>

#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/constants.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema.h"
#include "iceberg/schema_util_internal.h"
#include "iceberg/util/formatter.h"
Expand Down Expand Up @@ -783,4 +785,225 @@ Result<SchemaProjection> Project(const Schema& expected_schema,
return SchemaProjection{std::move(field_projection.children)};
}

namespace {

Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
auto new_record_node = std::make_shared<::avro::NodeRecord>();
new_record_node->setName(original_node->name());

for (size_t i = 0; i < original_node->leaves(); ++i) {
if (i >= original_node->names()) {
return InvalidSchema("Index {} is out of bounds for names (size: {})", i,
original_node->names());
}
const std::string& field_name = original_node->nameAt(i);
if (i >= original_node->leaves()) {
return InvalidSchema("Index {} is out of bounds for leaves (size: {})", i,
original_node->leaves());
}
::avro::NodePtr field_node = original_node->leafAt(i);

// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
// Try to find nested field by name
const MappedField* nested_field = nullptr;
if (field.nested_mapping) {
auto fields_span = field.nested_mapping->fields();
for (const auto& f : fields_span) {
if (f.names.find(field_name) != f.names.end()) {
nested_field = &f;
break;
}
}
}
Comment on lines +809 to +818
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const MappedField* nested_field = nullptr;
if (field.nested_mapping) {
auto fields_span = field.nested_mapping->fields();
for (const auto& f : fields_span) {
if (f.names.find(field_name) != f.names.end()) {
nested_field = &f;
break;
}
}
}
const MappedField* nested_field = nullptr;
auto field_id_opt = field.Id(field_name);
if (field_id_opt.has_value()) {
nested_field = &(field.Field(field_id_opt.value()).value().get());
}

This is not a required suggestion but I just found that the refactoring above does the same thing. Perhaps we should extend MappedFields by adding a new function to return a std::optional<MappedFieldConstRef> by finding a field name.


if (nested_field) {
// Check if field_id is present
if (!nested_field->field_id.has_value()) {
return InvalidSchema("Field ID is missing for field '{}' in nested mapping",
field_name);
}

// Preserve existing custom attributes for this field
::avro::CustomAttributes attributes;
if (i < original_node->customAttributes()) {
// Copy all existing attributes from the original node
const auto& original_attrs = original_node->customAttributesAt(i);
const auto& existing_attrs = original_attrs.attributes();
for (const auto& attr_pair : existing_attrs) {
Comment on lines +831 to +833
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const auto& original_attrs = original_node->customAttributesAt(i);
const auto& existing_attrs = original_attrs.attributes();
for (const auto& attr_pair : existing_attrs) {
for (const auto& attr_pair : original_node->customAttributesAt(i).attributes()) {

attributes.addAttribute(attr_pair.first, attr_pair.second, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
attributes.addAttribute(attr_pair.first, attr_pair.second, /*addQuote=*/false);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better readability.

}
}

// Add field ID attribute to the new node (preserving existing attributes)
attributes.addAttribute(std::string(kFieldIdProp),
std::to_string(nested_field->field_id.value()), false);

new_record_node->addCustomAttributesForField(attributes);

// Recursively apply field IDs to nested fields
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
CreateAvroNodeWithFieldIds(field_node, *nested_field));
new_record_node->addName(field_name);
new_record_node->addLeaf(new_nested_node);
} else {
// If no nested field found, this is an error
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is much better to return error immediately after we fail to find the field id. Then we don't need the large if branch at line 817.

}
}

return new_record_node;
}

Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
if (original_node->leaves() != 1) {
return InvalidSchema("Array type must have exactly one leaf");
}

auto new_array_node = std::make_shared<::avro::NodeArray>();

// Check if this is a map represented as array
if (HasMapLogicalType(original_node)) {
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
CreateAvroNodeWithFieldIds(original_node->leafAt(0), field));
new_array_node->addLeaf(new_element_node);
return new_array_node;
}

// For regular arrays, try to find element field ID from nested mapping
const MappedField* element_field = nullptr;
if (field.nested_mapping) {
auto fields_span = field.nested_mapping->fields();
for (const auto& f : fields_span) {
if (f.names.find(std::string(kElement)) != f.names.end()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need to find the field name of a map or array? IIUC, the field name mapping should only apply to struct fields. We can blindly use the child nested_mapping.

element_field = &f;
break;
}
}
}

if (element_field) {
// Check if field_id is present
if (!element_field->field_id.has_value()) {
return InvalidSchema("Field ID is missing for element field in array");
}

ICEBERG_ASSIGN_OR_RAISE(
auto new_element_node,
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
new_array_node->addLeaf(new_element_node);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we add element_field->field_id to the custom attributes?

} else {
// If no element field found, this is an error
return InvalidSchema("Element field not found in nested mapping for array");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

}

return new_array_node;
}

Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
if (original_node->leaves() != 2) {
return InvalidSchema("Map type must have exactly two leaves");
}

auto new_map_node = std::make_shared<::avro::NodeMap>();

// For map types, we use fixed field IDs for key and value
// Key field gets field ID 0, value field gets field ID 1
constexpr int32_t kMapKeyFieldId = 0;
constexpr int32_t kMapValueFieldId = 1;

// Create key field with fixed field ID
MappedField key_field;
key_field.field_id = kMapKeyFieldId;
key_field.nested_mapping =
field.nested_mapping; // Pass through nested mapping for complex key types

// Create value field with fixed field ID
MappedField value_field;
value_field.field_id = kMapValueFieldId;
value_field.nested_mapping =
field.nested_mapping; // Pass through nested mapping for complex value types

// Add key and value nodes
ICEBERG_ASSIGN_OR_RAISE(
auto new_key_node, CreateAvroNodeWithFieldIds(original_node->leafAt(0), key_field));
ICEBERG_ASSIGN_OR_RAISE(
auto new_value_node,
CreateAvroNodeWithFieldIds(original_node->leafAt(1), value_field));
new_map_node->addLeaf(new_key_node);
new_map_node->addLeaf(new_value_node);

return new_map_node;
}

Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& field) {
if (original_node->leaves() != 2) {
return InvalidSchema("Union type must have exactly two branches");
}

const auto& branch_0 = original_node->leafAt(0);
const auto& branch_1 = original_node->leafAt(1);

bool branch_0_is_null = (branch_0->type() == ::avro::AVRO_NULL);
bool branch_1_is_null = (branch_1->type() == ::avro::AVRO_NULL);

if (branch_0_is_null && !branch_1_is_null) {
// branch_0 is null, branch_1 is not null
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
CreateAvroNodeWithFieldIds(branch_1, field));
auto new_union_node = std::make_shared<::avro::NodeUnion>();
new_union_node->addLeaf(branch_0); // null branch
new_union_node->addLeaf(new_branch_1);
return new_union_node;
} else if (!branch_0_is_null && branch_1_is_null) {
// branch_0 is not null, branch_1 is null
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
CreateAvroNodeWithFieldIds(branch_0, field));
auto new_union_node = std::make_shared<::avro::NodeUnion>();
new_union_node->addLeaf(new_branch_0);
new_union_node->addLeaf(branch_1); // null branch
return new_union_node;
} else if (branch_0_is_null && branch_1_is_null) {
// Both branches are null - this is invalid
return InvalidSchema("Union type cannot have two null branches");
} else {
// Neither branch is null - this is invalid
return InvalidSchema("Union type must have exactly one null branch");
}
}

} // namespace

Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& mapped_field) {
switch (original_node->type()) {
case ::avro::AVRO_RECORD:
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
case ::avro::AVRO_ARRAY:
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
case ::avro::AVRO_MAP:
return CreateMapNodeWithFieldIds(original_node, mapped_field);
case ::avro::AVRO_UNION:
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
case ::avro::AVRO_BOOL:
case ::avro::AVRO_INT:
case ::avro::AVRO_LONG:
case ::avro::AVRO_FLOAT:
case ::avro::AVRO_DOUBLE:
case ::avro::AVRO_STRING:
case ::avro::AVRO_BYTES:
case ::avro::AVRO_FIXED:
// For primitive types, just return a copy
return original_node;
case ::avro::AVRO_NULL:
case ::avro::AVRO_ENUM:
default:
return InvalidSchema("Unsupported Avro type for field ID application: {}",
ToString(original_node));
}
}

} // namespace iceberg::avro
8 changes: 8 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <avro/Node.hh>

#include "iceberg/name_mapping.h"
#include "iceberg/result.h"
#include "iceberg/schema_util.h"
#include "iceberg/type.h"
Expand Down Expand Up @@ -144,4 +145,11 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
/// \return True if the node has a map logical type, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node);

/// \brief Create a new Avro node with field IDs from name mapping.
/// \param original_node The original Avro node to copy.
/// \param mapped_field The mapped field to apply field IDs from.
/// \return A new Avro node with field IDs applied, or an error.
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const MappedField& mapped_field);

} // namespace iceberg::avro
34 changes: 34 additions & 0 deletions src/iceberg/avro/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <string_view>

namespace iceberg::avro {

// Avro logical type constants
constexpr std::string_view kMapLogicalType = "map";

// Name mapping field constants
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";

} // namespace iceberg::avro
3 changes: 3 additions & 0 deletions src/iceberg/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ struct ICEBERG_EXPORT ReaderOptions {
/// \brief The filter to apply to the data. Reader implementations may ignore this if
/// the file format does not support filtering.
std::shared_ptr<class Expression> filter;
/// \brief Name mapping for schema evolution compatibility. Used when reading files
/// that may have different field names than the current schema.
std::shared_ptr<class NameMapping> name_mapping;
/// \brief Format-specific or implementation-specific properties.
std::unordered_map<std::string, std::string> properties;
};
Expand Down
7 changes: 4 additions & 3 deletions src/iceberg/name_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,23 @@ const std::unordered_map<int32_t, MappedFieldConstRef>& MappedFields::LazyIdToFi
NameMapping::NameMapping(std::unique_ptr<MappedFields> mapping)
: mapping_(std::move(mapping)) {}

std::optional<MappedFieldConstRef> NameMapping::Find(int32_t id) {
std::optional<MappedFieldConstRef> NameMapping::Find(int32_t id) const {
const auto& fields_by_id = LazyFieldsById();
if (auto iter = fields_by_id.find(id); iter != fields_by_id.cend()) {
return iter->second;
}
return std::nullopt;
}

std::optional<MappedFieldConstRef> NameMapping::Find(std::span<const std::string> names) {
std::optional<MappedFieldConstRef> NameMapping::Find(
std::span<const std::string> names) const {
if (names.empty()) {
return std::nullopt;
}
return Find(JoinByDot(names));
}

std::optional<MappedFieldConstRef> NameMapping::Find(const std::string& name) {
std::optional<MappedFieldConstRef> NameMapping::Find(const std::string& name) const {
const auto& fields_by_name = LazyFieldsByName();
if (auto iter = fields_by_name.find(name); iter != fields_by_name.cend()) {
return iter->second;
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/name_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ class ICEBERG_EXPORT NameMapping {
static std::unique_ptr<NameMapping> MakeEmpty();

/// \brief Find a field by its ID.
std::optional<MappedFieldConstRef> Find(int32_t id);
std::optional<MappedFieldConstRef> Find(int32_t id) const;

/// \brief Find a field by its unconcatenated names.
std::optional<MappedFieldConstRef> Find(std::span<const std::string> names);
std::optional<MappedFieldConstRef> Find(std::span<const std::string> names) const;

/// \brief Find a field by its (concatenated) name.
std::optional<MappedFieldConstRef> Find(const std::string& name);
std::optional<MappedFieldConstRef> Find(const std::string& name) const;

/// \brief Get the underlying MappedFields instance.
const MappedFields& AsMappedFields() const;
Expand Down
Loading
Loading