diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 0307ffdbf7b1..567e769d1304 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -716,6 +716,9 @@ class QueryConfig { static constexpr const char* kRowSizeTrackingEnabled = "row_size_tracking_enabled"; + static constexpr const char* kDisableCrc32ForShuffle = + "disable_crc32_for_shuffle"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -1291,6 +1294,10 @@ class QueryConfig { return get(kClientTags, ""); } + bool isDisableCrc32ForShuffleEnabled() const { + return get(kDisableCrc32ForShuffle, false); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 974cfbb3ec8f..91bc4b73c899 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -30,6 +30,7 @@ std::unique_ptr getVectorSerdeOptions( options->compressionKind = common::stringToCompressionKind(queryConfig.shuffleCompressionKind()); options->minCompressionRatio = PartitionedOutput::minCompressionRatio(); + options->disableCrc32c = queryConfig.isDisableCrc32ForShuffleEnabled(); return options; } } // namespace @@ -126,7 +127,12 @@ BlockingReason Destination::flush( // Upper limit of message size with no columns. constexpr int32_t kMinMessageSize = 128; - auto listener = bufferManager.newListener(); + + std::unique_ptr listener(nullptr); + if (!serdeOptions_->disableCrc32c) { + listener = bufferManager.newListener(); + } + IOBufOutputStream stream( *current_->pool(), listener.get(), diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 2217c2eed226..2a558b68783a 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -247,9 +247,14 @@ void PrestoVectorSerde::serializeSingleColumn( Scratch scratch; serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch); - PrestoOutputStreamListener listener; - OStreamOutputStream outputStream(output, &listener); - stream->flush(&outputStream); + if (opts->disableCrc32c) { + OStreamOutputStream outputStream(output, nullptr); + stream->flush(&outputStream); + } else { + PrestoOutputStreamListener listener; + OStreamOutputStream outputStream(output, &listener); + stream->flush(&outputStream); + } } // static diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index a079e232b8a5..2cd8d7556302 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -57,8 +57,12 @@ class PrestoVectorSerde : public VectorSerde { common::CompressionKind _compressionKind, float _minCompressionRatio = 0.8, bool _nullsFirst = false, - bool _preserveEncodings = false) - : VectorSerde::Options(_compressionKind, _minCompressionRatio), + bool _preserveEncodings = false, + bool _disableCrc32c = false) + : VectorSerde::Options( + _compressionKind, + _minCompressionRatio, + _disableCrc32c), useLosslessTimestamp(_useLosslessTimestamp), nullsFirst(_nullsFirst), preserveEncodings(_preserveEncodings) {} diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 5bcfae91c3d4..b588ace02813 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -77,7 +77,7 @@ class CompactRowSerializerTest : public ::testing::Test, appendRow_ = GetParam().appendRow; compressionKind_ = GetParam().compressionKind; microBatchDeserialize_ = GetParam().microBatchDeserialize; - options_ = std::make_unique(compressionKind_, 0.8); + options_ = std::make_unique(compressionKind_, 0.8, false); } void TearDown() override { diff --git a/velox/serializers/tests/SerializedPageFileTest.cpp b/velox/serializers/tests/SerializedPageFileTest.cpp index 52a0feb3a487..61fe389c9342 100644 --- a/velox/serializers/tests/SerializedPageFileTest.cpp +++ b/velox/serializers/tests/SerializedPageFileTest.cpp @@ -101,7 +101,8 @@ class SerializedPageFileTest : public ::testing::TestWithParam, false); // preserveEncodings case SerdeType::kCompactRow: case SerdeType::kUnsafeRow: - return std::make_unique(compressionKind_, 0.8); + return std::make_unique( + compressionKind_, 0.8, false); } return nullptr; } diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index 36642caf993e..0ae2ee13044f 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -63,7 +63,8 @@ class UnsafeRowSerializerTest : public ::testing::Test, VectorSerde::Kind::kUnsafeRow); appendRow_ = GetParam().appendRow; compressionKind_ = GetParam().compressionKind; - options_ = std::make_unique(compressionKind_, 0.8); + options_ = + std::make_unique(compressionKind_, 0.8, false); } void TearDown() override { diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index a11733c803b9..43feb15fc681 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -219,9 +219,11 @@ class VectorSerde { Options( common::CompressionKind _compressionKind, - float _minCompressionRatio) + float _minCompressionRatio, + bool _disableCrc32c) : compressionKind(_compressionKind), - minCompressionRatio(_minCompressionRatio) {} + minCompressionRatio(_minCompressionRatio), + disableCrc32c(_disableCrc32c) {} virtual ~Options() = default; @@ -231,6 +233,7 @@ class VectorSerde { /// than this causes subsequent compression attempts to be skipped. The more /// times compression misses the target the less frequently it is tried. float minCompressionRatio{0.8}; + bool disableCrc32c{false}; }; Kind kind() const {