[Bug]: Managed IO of Iceberg appends NULL / Unprintable characters for string type columns. #33963

Open
2 of 17 tasks
saathwik-tk opened this issue Feb 12, 2025 · 8 comments
Labels
bug IcebergIO IcebergIO: can only be used through ManagedIO io java P2

Comments

@saathwik-tk

saathwik-tk commented Feb 12, 2025

What happened?

Data is ingested via:

pipeline.apply(<Source>)
              .apply(JsonToRow.withSchema(mySchema))
              .apply(Managed.write(Managed.ICEBERG).withConfig(myConfig))

The issue: after the data is ingested, the queries below return no rows, even though many rows have field_name = 'value1':

  • SELECT * FROM catalog_name.namespace.table_name WHERE field_name = 'value1';
  • SELECT * FROM catalog_name.namespace.table_name WHERE field_name like 'value1';

However, the queries below do return the data:

  • SELECT * FROM catalog_name.namespace.table_name WHERE TRIM(field_name) = 'value1'
  • SELECT * FROM catalog_name.namespace.table_name WHERE field_name like '%value1'
  • SELECT * FROM catalog_name.namespace.table_name WHERE field_name like 'value1%'
  • SELECT * FROM catalog_name.namespace.table_name WHERE field_name like '%value1%'
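To reason about which hidden-character placements would be consistent with these results, here is a minimal stdlib-only Java sketch of SQL LIKE matching. It is a simplification (no ESCAPE clause, and _ matches one UTF-16 char rather than one code point), not Trino's implementation; the class name LikeSketch and the padded candidate value are hypothetical.

```java
import java.util.regex.Pattern;

// A simplified SQL LIKE matcher: '%' matches any run of characters, '_' matches
// exactly one char, and everything else is matched literally. No ESCAPE support.
public class LikeSketch {

    static boolean like(String value, String pattern) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote literal characters so regex metacharacters like '.' stay literal.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        // DOTALL lets '.' also match line terminators, so hidden newlines are covered.
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(value).matches();
    }

    public static void main(String[] args) {
        String[] patterns = {"value1", "%value1", "value1%", "%value1%"};
        String candidate = "value1\u0000"; // hypothetical value with a trailing NUL
        for (String p : patterns) {
            System.out.println(p + " -> " + like(candidate, p));
        }
    }
}
```

Swapping in different candidate values (padding at the start, the end, or both) shows which of the four predicates each placement would satisfy, which can then be compared against the observed query results.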

I also tried the approach below, which trims every String value inside the pipeline before the write, and saw the same issue. Because the values are trimmed in the pipeline, this rules out the source data as the cause:

pipeline.apply(<Source>)
              .apply(JsonToRow.withSchema(mySchema)).setCoder(RowCoder.of(mySchema))
              .apply(ParDo.of(new DoFn<Row, Row>() {
                    @ProcessElement
                    public void processFn(@Element Row row, OutputReceiver<Row> out){
                        // Trim every String field; values of other types pass through unchanged.
                        List<Object> cleanedValues = mySchema.getFields().stream()
                                .map(field -> {
                                    Object value = row.getValue(field.getName());
                                    if (value instanceof String) {
                                        return ((String) value).trim();
                                    }
                                    return value;
                                })
                                .collect(Collectors.toList());
                        Row trimmedRow = Row.withSchema(mySchema)
                                .addValues(cleanedValues)
                                .build();
                        out.output(trimmedRow);
                    }
                })).setCoder(RowCoder.of(mySchema))
              .apply(Managed.write(Managed.ICEBERG).withConfig(myConfig));

However, the issue does not affect every string value: most values are fine and only a few are affected. Affected values include strings such as '1234567890', '2025-02-12', and other dates in that format stored as strings.
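For context on the trimming DoFn above: Java's String.trim() removes every leading and trailing character whose code point is at or below U+0020, which includes NUL and the other ASCII control characters, not just spaces. So if the source strings carried that kind of padding, the DoFn should have removed it before the write. A small sketch contrasting trim() with strip(); the class name TrimVsStrip and the padded value are hypothetical.

```java
// Compares String.trim() and String.strip() (Java 11+) on a hypothetical value
// padded with NUL (U+0000) characters and a trailing space.
public class TrimVsStrip {

    // trim() drops leading/trailing chars with code point <= U+0020:
    // the space character plus all ASCII control characters, including NUL.
    static String viaTrim(String s) {
        return s.trim();
    }

    // strip() drops only chars for which Character.isWhitespace is true;
    // NUL is not whitespace by that definition, so it survives.
    static String viaStrip(String s) {
        return s.strip();
    }

    public static void main(String[] args) {
        String padded = "\u0000value1\u0000 ";
        System.out.println(viaTrim(padded).equals("value1"));  // true
        System.out.println(viaStrip(padded).equals("value1")); // false
    }
}
```

Neither method removes characters such as U+00A0 (no-break space) or U+200B (zero-width space), so a trimmed value can still contain invisible characters of that kind.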

NOTE:
Use the same reproduction as this
Beam Version: 2.62.0
Iceberg Version: 1.4.2

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad added the IcebergIO label (IcebergIO: can only be used through ManagedIO) and removed the awaiting triage label on Feb 12, 2025
@saathwik-tk
Author

I also tried using Gson in place of JsonToRow.withSchema() to rule out ObjectMapper as the cause.

@ahmedabu98 Do you have any inputs here?

@ahmedabu98
Contributor

Hey @saathwik-tk, I'm having trouble reproducing this one. Here's what I tried:

  1. Create an Iceberg table with BigQueryMetastoreCatalog using this partition spec:
PartitionSpec partitionSpec =
    PartitionSpec.builderFor(ICEBERG_SCHEMA)
        .identity("bool")
        .hour("datetime")
        .truncate("str", "value_x".length())
        .build();
  2. Write rows to the Iceberg table
  3. Execute a BigQuery query:
BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
String q =
    String.format(
        "SELECT * FROM `%s.%s` where str = 'value_123'", OPTIONS.getProject(), tableId());
List<TableRow> rowList = bqClient.queryUnflattened(q, OPTIONS.getProject(), true, true);

As expected, I get one record where str = value_123.

I also tried the same without partitioning on "str", and it works as well.
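For reference, the truncate partition transform used above groups string values by prefix. Below is a minimal sketch of the string semantics described in the Iceberg table spec (keep at most W Unicode code points); it is not Iceberg's own code, and TruncateSketch is a hypothetical name.

```java
// Illustrates the string "truncate[W]" transform semantics: keep at most W code points.
public class TruncateSketch {

    static String truncate(String value, int width) {
        // Count code points rather than UTF-16 chars so surrogate pairs stay intact.
        if (value.codePointCount(0, value.length()) <= width) {
            return value;
        }
        int end = value.offsetByCodePoints(0, width);
        return value.substring(0, end);
    }

    public static void main(String[] args) {
        int width = "value_x".length(); // 7, matching the partition spec above
        System.out.println(truncate("value_123", width));  // value_1
        System.out.println(truncate("2025-02-14", width)); // 2025-02
    }
}
```

With W = 7, a value like value_123 followed by an invisible character would still fall in the value_1 partition, but it would not equal 'value_123' in an exact-match filter.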

@ahmedabu98
Contributor

Can you try running a simple pipeline with one or more fixed rows? i.e., something like:

writePipeline
        .apply(Create.of(Row.withSchema(...).addValues(...).build()))
        .apply(Managed.write(Managed.ICEBERG).withConfig(...));

@ahmedabu98
Contributor

Also, are you seeing this only for String types?

@saathwik-tk
Author

Yes, I see it only for string types.

@saathwik-tk
Copy link
Author

saathwik-tk commented Feb 14, 2025

pipeline.apply(Create.of(Row.withSchema(schemaTest).addValues("2025-02-14",1).build())).setCoder(RowCoder.of(schemaTest))
             .apply(Managed.write(Managed.ICEBERG).withConfig(config));

I just tried the above and got the same result:
  • SELECT * FROM table where id=1 works.
  • SELECT * FROM table where date='2025-02-14' doesn't return any data.

@ahmedabu98
Try strings of this particular kind, such as '2025-02-14', '2025-02-13', and so on.
Just FYI, I'm using the Hive catalog and Trino as the query engine.

Schema schemaTest = Schema.builder()
                .addStringField("date")
                .addNullableInt32Field("id")
                .build();
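One way to see what is actually stored for an affected value such as '2025-02-14' is to read it back (for example with a Java client) and print its code points. A minimal stdlib-only sketch; CodePointDump, dump, and hasHiddenChars are hypothetical names, and the suspect value in main is illustrative only.

```java
import java.util.stream.Collectors;

// Renders each code point as U+XXXX so invisible characters become visible.
public class CodePointDump {

    static String dump(String s) {
        return s.codePoints()
                .mapToObj(cp -> String.format("U+%04X", cp))
                .collect(Collectors.joining(" "));
    }

    // True if the string contains an ISO control character (such as NUL) or a code
    // point in the Unicode "format" category (such as U+200B ZERO WIDTH SPACE).
    static boolean hasHiddenChars(String s) {
        return s.codePoints().anyMatch(cp ->
                Character.isISOControl(cp) || Character.getType(cp) == Character.FORMAT);
    }

    public static void main(String[] args) {
        String suspect = "2025-02-14\u0000"; // hypothetical value read back from the table
        System.out.println(dump(suspect));
        System.out.println(hasHiddenChars(suspect));
    }
}
```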

@ahmedabu98
Copy link
Contributor

Thanks for doing that. Can you also paste the Schema you're using?

@saathwik-tk
Copy link
Author

Schema schemaTest = Schema.builder()
                .addStringField("date")
                .addNullableInt32Field("id")
                .build();

Ignore the fact that I made id nullable (it doesn't actually matter).


3 participants