Mismatch in Data Types Between Partial and Final Stages in Aggregations #1287

andygrove opened this issue Jan 14, 2025 · 1 comment
Labels: bug

Describe the bug

An issue has been identified where the data type of a column in the final stage of an aggregation does not match the data type in the partial stage.

Here is a summary of a discussion between @rluvaton and me.

This problem arises because:

  • Partial Stage: The column data type is derived from the schema of the first batch during ScanExec. For instance, the type might be a dictionary of UTF-8.
  • Final Stage: The column data type is determined by converting the Spark type to an Arrow type, resulting in a plain UTF-8 type (see the sketch below).
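
To make the mismatch concrete, here is a minimal sketch (using arrow-rs types; this is illustrative, not Comet's actual code) of the two types the stages end up with for the same logical column:

```rust
use arrow::datatypes::DataType;

fn main() {
    // Partial stage: the type comes from the schema of the first batch
    // produced by ScanExec, which may be dictionary-encoded.
    let partial_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Final stage: the type comes from converting the Spark type
    // (StringType) to an Arrow type, which yields plain Utf8.
    let final_type = DataType::Utf8;

    // The two stages disagree on the input type for the same column.
    assert_ne!(partial_type, final_type);
}
```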

This mismatch causes failures in scenarios where the accumulator expects the same data type in both stages. Specifically:

  • The partial stage may use a special implementation (e.g., for dictionaries of UTF-8) that is incompatible with the final stage.
  • The final stage receives the state fields but applies the wrong accumulator due to mismatched data types.

Steps to Reproduce

  • Use an aggregation function like max on a string column where the data type is dictionary-encoded in the partial stage.
  • Observe that the final stage fails because it expects a plain UTF-8 type (a sketch of this failure mode follows the list).
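
A minimal sketch of the failure mode, assuming an accumulator written against plain Utf8 input (the array construction and the max_string call are illustrative, not Comet's actual accumulator code):

```rust
use arrow::array::{Array, AsArray, DictionaryArray, StringArray};
use arrow::compute::{cast, max_string};
use arrow::datatypes::{DataType, Int32Type};

fn main() {
    // A dictionary-encoded string batch, as the partial stage might see it.
    let dict: DictionaryArray<Int32Type> =
        vec!["b", "a", "c"].into_iter().collect();

    // An accumulator planned for plain Utf8 would downcast to StringArray;
    // on a dictionary-encoded batch that downcast fails.
    assert!(dict.as_any().downcast_ref::<StringArray>().is_none());

    // Unpacking the dictionary first makes the plain-Utf8 path work.
    let unpacked = cast(&dict, &DataType::Utf8).unwrap();
    assert_eq!(max_string(unpacked.as_string::<i32>()), Some("c"));
}
```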

Possible Solutions Discussed

  • Always unpack string dictionaries during ScanExec (a sketch of this option follows the list). While benchmarks show no significant performance regression, this limits future optimizations.
  • Inject a CopyExec to unpack dictionaries before the final aggregation. However, this approach is infeasible because inputs may include nested types (e.g., lists of strings, structs).
  • Save additional metadata in shuffle files to track dictionary encoding.
  • Add schema inference earlier in the planning phase to account for dictionary encoding.
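
For the first option, a hypothetical sketch of what normalizing dictionary-of-UTF-8 columns at the scan boundary could look like (the function name and structure are illustrative assumptions, not Comet's API):

```rust
use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Rewrite a batch so every Dictionary(_, Utf8) column becomes plain Utf8,
/// giving both aggregation stages the same view of the column type.
fn unpack_string_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
    let schema = batch.schema();
    let mut fields = Vec::with_capacity(batch.num_columns());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for (field, col) in schema.fields().iter().zip(batch.columns()) {
        match field.data_type() {
            DataType::Dictionary(_, value) if value.as_ref() == &DataType::Utf8 => {
                columns.push(cast(col, &DataType::Utf8)?);
                fields.push(Field::new(
                    field.name().clone(),
                    DataType::Utf8,
                    field.is_nullable(),
                ));
            }
            _ => {
                columns.push(col.clone());
                fields.push(field.as_ref().clone());
            }
        }
    }
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```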

Challenges

  • Dictionary encoding varies across batches and files (e.g., in Parquet), and schema inference currently relies on the first batch (see the sketch below).
  • The final aggregation stage uses unbound columns based on the original child, not the state fields, leading to type mismatches.
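
To illustrate the first challenge, a minimal sketch (with made-up data, not read from an actual Parquet file) of the same logical column arriving with different encodings in consecutive batches:

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Batch 1 delivers the column dictionary-encoded...
    let batch1_col: ArrayRef = Arc::new(
        vec!["a", "b"].into_iter().collect::<DictionaryArray<Int32Type>>(),
    );
    // ...while batch 2 delivers the same logical column as plain strings.
    let batch2_col: ArrayRef = Arc::new(StringArray::from(vec!["c", "d"]));

    // A schema inferred from batch 1 alone does not describe batch 2.
    assert_ne!(batch1_col.data_type(), batch2_col.data_type());
}
```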

Action Items

  • Create a minimal reproduction case to isolate the issue.
  • Investigate integrating schema metadata into Spark plans earlier during planning.
  • Evaluate the feasibility of using DataFusion's catalog or schema inference mechanisms to improve type consistency.


rluvaton commented Jan 14, 2025

> Inject a CopyExec to unpack dictionaries before the final aggregation. However, this approach is infeasible because inputs may include nested types (e.g., lists of strings, structs).

This won't solve the issue: CopyExec operates on the actual data, so the data type of the unbound column in the final stage would still be the same.

Also, I don't recommend changing the accumulator's state field data types between stages, as those are the accumulator's responsibility; the state type might not be derived from the input but used for its own calculation, and changing it can lead to code that is harder to reason about.
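
A minimal sketch (plain Rust, simplified and hypothetical, not Comet's code) of why the state type is the accumulator's own contract: the partial stage ships the output of state(), and the final stage merges that state rather than raw input, so the state type should stay fixed regardless of how the input column was encoded:

```rust
/// Illustrative max-of-strings accumulator.
struct MaxStringAccumulator {
    max: Option<String>,
}

impl MaxStringAccumulator {
    fn new() -> Self {
        Self { max: None }
    }

    // Partial stage: fold raw input values.
    fn update(&mut self, value: &str) {
        if self.max.as_deref().map_or(true, |m| value > m) {
            self.max = Some(value.to_string());
        }
    }

    // The state shipped from the partial to the final stage is always a
    // plain String here, even if the input column was dictionary-encoded.
    fn state(&self) -> Option<String> {
        self.max.clone()
    }

    // Final stage: merge another accumulator's state, not raw input.
    fn merge(&mut self, other_state: Option<String>) {
        if let Some(v) = other_state {
            self.update(&v);
        }
    }
}

fn main() {
    let mut partial = MaxStringAccumulator::new();
    partial.update("a");
    partial.update("c");

    let mut final_stage = MaxStringAccumulator::new();
    final_stage.merge(partial.state());
    assert_eq!(final_stage.state(), Some("c".to_string()));
}
```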


Note that Spark's implementation ignores the child expression in the final stage.
