[VL] shuffle: TypeAwareCompress(tac) for column-wise data compression like (U)INT64#11894
guowangy wants to merge 2 commits into apache:main
We have enabled this in Gazelle. @marin-ma do you still remember why we didn't introduce it in Gluten? |
@FelixYBW In Gazelle we used the Arrow compression and IPC payload APIs and added FastPFor compression (https://github.com/fast-pack/FastPFOR) for integer column types; it was also the default compression method for integers in Gazelle. Because adding a new compression algorithm to the Arrow API requires extra patches, we removed it, mainly due to maintenance concerns. Using the default LZ4 algorithm did not result in a significant performance regression. |
|
@marin-ma We have also investigated the FastPFor library, but it only supports the int32 type. This version aims to optimize the int64 data type, which can achieve a better compression ratio and speed than int32 types.
What changes are proposed in this pull request?
Introduces TypeAwareCompress (TAC) — a column-wise compression layer for shuffle that selects
an algorithm based on each buffer's data type, applied per-buffer alongside the existing LZ4/ZSTD
codec path.
For INT64/UINT64 columns the values are often clustered in a small range, making Frame-of-Reference + Bit-Packing (FFOR) significantly more effective than generic byte-level compression. TAC exploits this by encoding 8-byte integer buffers with a 4-lane FFOR codec before the standard codec sees them.
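To illustrate why clustered 64-bit values pack well, here is a minimal scalar sketch of frame-of-reference plus bit-packing. It is not the PR's `ffor.hpp`: the `FforBlock` struct, the function names, and the single-stream layout are assumptions made for illustration, and the 4-lane interleaving mentioned above is deliberately left out.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical container for one encoded block; not the PR's wire format.
struct FforBlock {
  uint64_t reference = 0;        // minimum value, the "frame of reference"
  uint8_t bitWidth = 0;          // bits kept per value after subtracting reference
  std::size_t count = 0;         // number of encoded values
  std::vector<uint64_t> packed;  // bit-packed deltas, least significant bits first
};

// Number of bits needed to represent v (0 when v == 0).
inline uint8_t bitsNeeded(uint64_t v) {
  uint8_t bits = 0;
  while (v != 0) {
    ++bits;
    v >>= 1;
  }
  return bits;
}

inline FforBlock fforEncode(const std::vector<uint64_t>& values) {
  FforBlock block;
  block.count = values.size();
  if (values.empty()) return block;
  const auto mm = std::minmax_element(values.begin(), values.end());
  block.reference = *mm.first;
  block.bitWidth = bitsNeeded(*mm.second - *mm.first);
  if (block.bitWidth == 0) return block;  // all values equal: nothing to pack
  block.packed.assign((values.size() * block.bitWidth + 63) / 64, 0);
  std::size_t bitPos = 0;
  for (uint64_t v : values) {
    const uint64_t delta = v - block.reference;
    const std::size_t word = bitPos / 64;
    const unsigned offset = static_cast<unsigned>(bitPos % 64);
    block.packed[word] |= delta << offset;
    if (offset + block.bitWidth > 64) {
      // The delta straddles two words; spill its high bits into the next one.
      block.packed[word + 1] |= delta >> (64 - offset);
    }
    bitPos += block.bitWidth;
  }
  return block;
}

inline std::vector<uint64_t> fforDecode(const FforBlock& block) {
  std::vector<uint64_t> out(block.count, block.reference);
  if (block.bitWidth == 0) return out;
  const uint64_t mask = block.bitWidth == 64
      ? ~uint64_t{0}
      : (uint64_t{1} << block.bitWidth) - 1;
  std::size_t bitPos = 0;
  for (std::size_t i = 0; i < block.count; ++i) {
    const std::size_t word = bitPos / 64;
    const unsigned offset = static_cast<unsigned>(bitPos % 64);
    uint64_t delta = block.packed[word] >> offset;
    if (offset + block.bitWidth > 64) {
      delta |= block.packed[word + 1] << (64 - offset);
    }
    out[i] += delta & mask;
    bitPos += block.bitWidth;
  }
  return out;
}
```

With this sketch, the values 1000, 1003, 1001, 1015, 1000 are stored as the reference 1000 plus five 4-bit deltas, so they fit in a single 64-bit word instead of five.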
Here is the performance data on TPCH/TPCDS:
New files
cpp/core/utils/tac/ffor.hpp: uint64_t
cpp/core/utils/tac/FForCodec.{h,cc}: ffor.hpp
cpp/core/utils/tac/TypeAwareCompressCodec.{h,cc}
cpp/velox/shuffle/VeloxTypeAwareCompress.h: TypeKind → TacDataType (BIGINT → kUInt64)
Shuffle integration
Payload.cc/h: BlockPayload::fromBuffers accepts an optional bufferTypes vector. Per buffer: if TypeAwareCompressCodec::support(type) is true, use TAC; otherwise fall back to LZ4/ZSTD. A new wire marker kTypeAwareBuffer = -3 is added; decompression in readCompressedBuffer is self-describing. If the TAC compressed size ≥ original, falls back to kUncompressedBuffer.
Options.h: adds enableTypeAwareCompress (default false) to LocalPartitionWriterOptions.
VeloxHashShuffleWriter: populates bufferTypes from the schema when TAC is enabled.
GlutenConfig.scala: new config spark.gluten.sql.columnar.shuffle.typeAwareCompress.enabled (default false).
ColumnarShuffleWriter / LocalPartitionWriterJniWrapper: forward the new option to native.
Disabled by default; no behaviour changes for existing deployments.
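The per-buffer decision described under Payload.cc/h can be sketched roughly as below. Only the marker name kTypeAwareBuffer and its value -3, the name kUncompressedBuffer, and the fallback rules come from the description above; the value given to kUncompressedBuffer, the kGenericCodecPath marker, the reduced TacDataType enum, and both function names are hypothetical stand-ins rather than the PR's actual code.

```cpp
#include <cstddef>
#include <cstdint>

// Reduced, hypothetical version of the type tag; the PR maps BIGINT to kUInt64.
enum class TacDataType { kUnsupported, kUInt64 };

constexpr int64_t kTypeAwareBuffer = -3;     // value stated in the PR description
constexpr int64_t kUncompressedBuffer = -2;  // assumed value, for illustration only
constexpr int64_t kGenericCodecPath = 1;     // hypothetical stand-in for the LZ4/ZSTD path

// Stand-in for TypeAwareCompressCodec::support(type).
inline bool tacSupports(TacDataType type) {
  return type == TacDataType::kUInt64;
}

// Decide which marker a writer would emit for one buffer.
// tacCompressedSize is the size TAC produced for this buffer; it is only
// consulted when TAC is enabled and supports the buffer's type.
inline int64_t chooseMarker(
    bool tacEnabled,
    TacDataType type,
    std::size_t originalSize,
    std::size_t tacCompressedSize) {
  if (!tacEnabled || !tacSupports(type)) {
    return kGenericCodecPath;  // unchanged behaviour: generic codec
  }
  if (tacCompressedSize >= originalSize) {
    return kUncompressedBuffer;  // TAC did not shrink the buffer; store it raw
  }
  return kTypeAwareBuffer;
}
```

Because the marker is written with each buffer, a reader can pick the matching decode path from the stream alone, which is what makes decompression self-describing.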
How was this patch tested?
cpp/core/tests/FForCodecTest.cc covers: maxCompressedLength boundary checks
cpp/velox/tests/VeloxShuffleWriterTest.cc: extended to exercise the TAC path end-to-end through VeloxHashShuffleWriter.
Was this patch authored or co-authored using generative AI tooling?
Co-authored-by: Claude Sonnet 4.6