diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index ebfd47669ce8..106d8b981701 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -39,10 +39,12 @@ public long rtHandle() { public static native long cloneHashTable(long hashTableData); - public static native long nativeBuild( + public native long nativeBuild( String buildHashTableId, long[] batchHandlers, String[] joinKeys, + String[] filterBuildColumns, + boolean filterPropagatesNulls, int joinType, boolean hasMixedFiltCondition, boolean isExistenceJoin, diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index f34b3b6e625c..88e70cea85be 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -585,6 +585,22 @@ object VeloxConfig extends ConfigRegistry { .intConf .createWithDefault(100000) + val VELOX_MIN_TABLE_ROWS_FOR_PARALLEL_JOIN_BUILD = + buildConf("spark.gluten.velox.minTableRowsForParallelJoinBuild") + .experimental() + .doc("Experimental: the minimum number of table rows that can trigger " + + "the parallel hash join table build.") + .intConf + .createWithDefault(1000) + + val VELOX_JOIN_BUILD_VECTOR_HASHER_MAX_NUM_DISTINCT = + buildConf("spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct") + .experimental() + .doc("Experimental: maximum number of distinct values to keep when " + + "merging vector hashers in join HashBuild.") + .intConf + .createWithDefault(1000000) + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT = buildConf("spark.gluten.velox.abandonDedupHashMap.minPct") .experimental() diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index ba014134d036..1554c4ddd3e5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -17,6 +17,8 @@ package org.apache.gluten.execution import org.apache.gluten.config.VeloxConfig +import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.rdd.RDD import org.apache.spark.rpc.GlutenDriverEndpoint @@ -148,6 +150,20 @@ case class BroadcastHashJoinExecTransformer( } else { -1 } + + val (filterBuildColumns: Array[String], filterPropagatesNulls: Boolean) = condition match { + case Some(expr) => + val buildOutputSet = buildPlan.outputSet + val cols: Array[String] = expr.references.toSeq.collect { + case a: Attribute if buildOutputSet.contains(a) => + ConverterUtils.genColumnNameWithExprId(a) + }.toArray + val propagates = SparkShimLoader.getSparkShims.isNullIntolerant(expr) + (cols, propagates) + case None => + (Array.empty[String], false) + } + val context = BroadcastHashJoinContext( buildKeyExprs, @@ -156,6 +172,8 @@ case class BroadcastHashJoinExecTransformer( condition.isDefined, joinType.isInstanceOf[ExistenceJoin], buildPlan.output, + filterBuildColumns, + filterPropagatesNulls, buildBroadcastTableId, isNullAwareAntiJoin, bloomFilterPushdownSize, @@ -174,6 +192,8 @@ case class BroadcastHashJoinContext( hasMixedFiltCondition: Boolean, isExistenceJoin: Boolean, buildSideStructure: Seq[Attribute], + filterBuildColumns: Array[String], + filterPropagatesNulls: Boolean, buildHashTableId: String, isNullAwareAntiJoin: Boolean = false, bloomFilterPushdownSize: Long, diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 1b21ef7b425c..fea9f149745a 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -207,12 +207,16 @@ case class ColumnarBuildSideRelation( ConverterUtils.genColumnNameWithExprId(attr) }.toArray + val hashJoinBuilder = HashJoinBuilder.create(runtime) + // Build the hash table - hashTableData = HashJoinBuilder + hashTableData = hashJoinBuilder .nativeBuild( broadcastContext.buildHashTableId, batchArray.toArray, joinKeys, + broadcastContext.filterBuildColumns, + broadcastContext.filterPropagatesNulls, broadcastContext.substraitJoinType.ordinal(), broadcastContext.hasMixedFiltCondition, broadcastContext.isExistenceJoin, diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index 26195d48af0e..fbc329f36060 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -177,12 +177,16 @@ class UnsafeColumnarBuildSideRelation( ConverterUtils.genColumnNameWithExprId(attr) }.toArray + val hashJoinBuilder = HashJoinBuilder.create(runtime) + // Build the hash table - hashTableData = HashJoinBuilder + hashTableData = hashJoinBuilder .nativeBuild( broadcastContext.buildHashTableId, batchArray.toArray, joinKeys, + broadcastContext.filterBuildColumns, + broadcastContext.filterPropagatesNulls, broadcastContext.substraitJoinType.ordinal(), broadcastContext.hasMixedFiltCondition, broadcastContext.isExistenceJoin, diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h 
index 728cc46c92a4..2cb75c5a124c 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -112,6 +112,10 @@ class VeloxRuntime final : public Runtime { return veloxPlan_; } + const std::shared_ptr& veloxCfg() const { + return veloxCfg_; + } + bool debugModeEnabled() const { return debugModeEnabled_; } diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index cabe3252827d..cc277991f6f3 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -63,8 +63,16 @@ const std::string kAbandonPartialAggregationMinRows = "spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows"; // hashmap build +const std::string kMinTableRowsForParallelJoinBuild = "spark.gluten.velox.minTableRowsForParallelJoinBuild"; +const uint32_t kMinTableRowsForParallelJoinBuildDefault = 1'000; + +const std::string kJoinBuildVectorHasherMaxNumDistinct = "spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct"; +const uint32_t kJoinBuildVectorHasherMaxNumDistinctDefault = 1'000'000; + const std::string kAbandonDedupHashMapMinRows = "spark.gluten.velox.abandonDedupHashMap.minRows"; +const uint32_t kAbandonDedupHashMapMinRowsDefault = 100'000; const std::string kAbandonDedupHashMapMinPct = "spark.gluten.velox.abandonDedupHashMap.minPct"; +const uint32_t kAbandonDedupHashMapMinPctDefault = 0; // execution const std::string kSparkBloomFilterExpectedNumItems = "spark.sql.optimizer.runtime.bloomFilter.expectedNumItems"; diff --git a/cpp/velox/jni/JniHashTable.cc b/cpp/velox/jni/JniHashTable.cc index 8af60a5534e7..11873471575b 100644 --- a/cpp/velox/jni/JniHashTable.cc +++ b/cpp/velox/jni/JniHashTable.cc @@ -18,6 +18,7 @@ #include #include +#include #include "JniHashTable.h" #include "folly/String.h" #include "memory/ColumnarBatch.h" @@ -57,6 +58,8 @@ jlong JniHashTableContext::callJavaGet(const std::string& id) const { // Return the velox's hash table. 
std::shared_ptr nativeHashTableBuild( const std::vector& joinKeys, + const std::vector& filterBuildColumns, + bool filterPropagatesNulls, std::vector names, std::vector veloxTypeList, int joinType, @@ -64,6 +67,10 @@ std::shared_ptr nativeHashTableBuild( bool isExistenceJoin, bool isNullAwareAntiJoin, int64_t bloomFilterPushdownSize, + uint32_t minTableRowsForParallelJoinBuild, + uint32_t joinBuildVectorHasherMaxNumDistinct, + uint32_t abandonHashBuildDedupMinRows, + uint32_t abandonHashBuildDedupMinPct, std::vector>& batches, std::shared_ptr memoryPool) { auto rowType = std::make_shared(std::move(names), std::move(veloxTypeList)); @@ -115,18 +122,38 @@ std::shared_ptr nativeHashTableBuild( std::make_shared(rowType->findChild(name), name)); } + std::vector filterInputChannels; + filterInputChannels.reserve(filterBuildColumns.size()); + for (const auto& name : filterBuildColumns) { + if (const auto idx = rowType->getChildIdxIfExists(name)) { + filterInputChannels.push_back(*idx); + } + } + std::sort(filterInputChannels.begin(), filterInputChannels.end()); + filterInputChannels.erase( + std::unique(filterInputChannels.begin(), filterInputChannels.end()), filterInputChannels.end()); + auto hashTableBuilder = std::make_shared( vJoin, isNullAwareAntiJoin, hasMixedJoinCondition, bloomFilterPushdownSize, joinKeyTypes, + filterInputChannels, + filterPropagatesNulls, rowType, - memoryPool.get()); + memoryPool.get(), + minTableRowsForParallelJoinBuild, + joinBuildVectorHasherMaxNumDistinct, + abandonHashBuildDedupMinRows, + abandonHashBuildDedupMinPct); for (auto i = 0; i < batches.size(); i++) { auto rowVector = VeloxColumnarBatch::from(memoryPool.get(), batches[i])->getRowVector(); hashTableBuilder->addInput(rowVector); + if (hashTableBuilder->noMoreInput()) { + break; + } } return hashTableBuilder; diff --git a/cpp/velox/jni/JniHashTable.h b/cpp/velox/jni/JniHashTable.h index 51d9a8e25bdf..47f89d179968 100644 --- a/cpp/velox/jni/JniHashTable.h +++ 
b/cpp/velox/jni/JniHashTable.h @@ -73,6 +73,8 @@ class JniHashTableContext { // Return the hash table builder address. std::shared_ptr nativeHashTableBuild( const std::vector& joinKeys, + const std::vector& filterBuildColumns, + bool filterPropagatesNulls, std::vector names, std::vector veloxTypeList, int joinType, @@ -80,6 +82,10 @@ std::shared_ptr nativeHashTableBuild( bool isExistenceJoin, bool isNullAwareAntiJoin, int64_t bloomFilterPushdownSize, + uint32_t minTableRowsForParallelJoinBuild, + uint32_t joinBuildVectorHasherMaxNumDistinct, + uint32_t abandonHashBuildDedupMinRows, + uint32_t abandonHashBuildDedupMinPct, std::vector>& batches, std::shared_ptr memoryPool); diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 51e14fde33cc..2435d9f641b2 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -30,6 +30,7 @@ #include "compute/VeloxBackend.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" +#include "config/VeloxConfig.h" #include "jni/JniError.h" #include "jni/JniFileSystem.h" #include "jni/JniHashTable.h" @@ -938,10 +939,12 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrappe JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_nativeBuild( // NOLINT JNIEnv* env, - jclass, + jobject wrapper, jstring tableId, jlongArray batchHandles, jobjectArray joinKeys, + jobjectArray filterBuildColumns, + jboolean filterPropagatesNulls, jint joinType, jboolean hasMixedJoinCondition, jboolean isExistenceJoin, @@ -950,6 +953,18 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native jlong bloomFilterPushdownSize, jint numThreads) { JNI_METHOD_START + auto ctx = getRuntime(env, wrapper); + auto* runtime = dynamic_cast(ctx); + GLUTEN_CHECK(runtime != nullptr, "Not a Velox runtime"); + const auto& queryConf = *(runtime->veloxCfg()); + const auto minTableRowsForParallelJoinBuild = + 
queryConf.get(kMinTableRowsForParallelJoinBuild, kMinTableRowsForParallelJoinBuildDefault); + const auto joinBuildVectorHasherMaxNumDistinct = + queryConf.get(kJoinBuildVectorHasherMaxNumDistinct, kJoinBuildVectorHasherMaxNumDistinctDefault); + const auto abandonHashBuildDedupMinRows = + queryConf.get(kAbandonDedupHashMapMinRows, kAbandonDedupHashMapMinRowsDefault); + const auto abandonHashBuildDedupMinPct = + queryConf.get(kAbandonDedupHashMapMinPct, kAbandonDedupHashMapMinPctDefault); const auto hashTableId = jStringToCString(env, tableId); // Convert Java String array to C++ vector @@ -961,6 +976,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native hashJoinKeys.emplace_back(jStringToCString(env, jkey)); } + std::vector filterColumns; + if (filterBuildColumns != nullptr) { + jsize filterColumnsCount = env->GetArrayLength(filterBuildColumns); + filterColumns.reserve(filterColumnsCount); + for (jsize i = 0; i < filterColumnsCount; ++i) { + jstring jcol = (jstring)env->GetObjectArrayElement(filterBuildColumns, i); + filterColumns.emplace_back(jStringToCString(env, jcol)); + } + } + const auto inputType = gluten::getByteArrayElementsSafe(env, namedStruct); std::string structString{ reinterpret_cast(inputType.elems()), static_cast(inputType.length())}; @@ -990,6 +1015,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native if (numThreads == 1) { auto builder = nativeHashTableBuild( hashJoinKeys, + filterColumns, + filterPropagatesNulls, names, veloxTypeList, joinType, @@ -997,6 +1024,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native isExistenceJoin, isNullAwareAntiJoin, bloomFilterPushdownSize, + minTableRowsForParallelJoinBuild, + joinBuildVectorHasherMaxNumDistinct, + abandonHashBuildDedupMinRows, + abandonHashBuildDedupMinPct, cb, defaultLeafVeloxMemoryPool()); @@ -1004,7 +1035,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native 
mainTable->prepareJoinTable( {}, facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit, - 1'000'000, + builder->joinBuildVectorHasherMaxNumDistinct(), builder->dropDuplicates(), nullptr); builder->setHashTable(std::move(mainTable)); @@ -1027,12 +1058,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native // Submit task to thread pool auto future = folly::via(executor, [&, t, start, end]() { std::vector> threadBatches; + threadBatches.reserve(end - start); for (size_t i = start; i < end; ++i) { threadBatches.push_back(cb[i]); } auto builder = nativeHashTableBuild( hashJoinKeys, + filterColumns, + filterPropagatesNulls, names, veloxTypeList, joinType, @@ -1040,6 +1074,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native isExistenceJoin, isNullAwareAntiJoin, bloomFilterPushdownSize, + minTableRowsForParallelJoinBuild, + joinBuildVectorHasherMaxNumDistinct, + abandonHashBuildDedupMinRows, + abandonHashBuildDedupMinPct, threadBatches, defaultLeafVeloxMemoryPool()); @@ -1067,7 +1105,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native mainTable->prepareJoinTable( std::move(tables), facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit, - 1'000'000, + hashTableBuilders[0]->joinBuildVectorHasherMaxNumDistinct(), hashTableBuilders[0]->dropDuplicates(), allowParallelJoinBuild ? 
VeloxBackend::get()->executor() : nullptr); diff --git a/cpp/velox/operators/hashjoin/HashTableBuilder.cc b/cpp/velox/operators/hashjoin/HashTableBuilder.cc index 2dcc098dadee..363edea4eccc 100644 --- a/cpp/velox/operators/hashjoin/HashTableBuilder.cc +++ b/cpp/velox/operators/hashjoin/HashTableBuilder.cc @@ -16,19 +16,25 @@ */ #include "operators/hashjoin/HashTableBuilder.h" + +#include + +#include + #include "velox/exec/OperatorUtils.h" namespace gluten { namespace { facebook::velox::RowTypePtr hashJoinTableType( const std::vector& joinKeys, - const facebook::velox::RowTypePtr& inputType) { + const facebook::velox::RowTypePtr& inputType, + bool includeDependents) { const auto numKeys = joinKeys.size(); std::vector names; - names.reserve(inputType->size()); + names.reserve(includeDependents ? inputType->size() : numKeys); std::vector types; - types.reserve(inputType->size()); + types.reserve(includeDependents ? inputType->size() : numKeys); std::unordered_set keyChannelSet; keyChannelSet.reserve(inputType->size()); @@ -40,6 +46,10 @@ facebook::velox::RowTypePtr hashJoinTableType( types.emplace_back(inputType->childAt(channel)); } + if (!includeDependents) { + return ROW(std::move(names), std::move(types)); + } + for (auto i = 0; i < inputType->size(); ++i) { if (keyChannelSet.find(i) == keyChannelSet.end()) { names.emplace_back(inputType->nameOf(i)); @@ -62,15 +72,28 @@ HashTableBuilder::HashTableBuilder( bool withFilter, int64_t bloomFilterPushdownSize, const std::vector& joinKeys, + const std::vector& filterInputChannels, + bool filterPropagatesNulls, const facebook::velox::RowTypePtr& inputType, - facebook::velox::memory::MemoryPool* pool) + facebook::velox::memory::MemoryPool* pool, + uint32_t minTableRowsForParallelJoinBuild, + uint32_t joinBuildVectorHasherMaxNumDistinct, + uint32_t abandonHashBuildDedupMinRows, + uint32_t abandonHashBuildDedupMinPct) : joinType_{joinType}, nullAware_{nullAware}, withFilter_(withFilter), keyChannelMap_(joinKeys.size()), 
inputType_(inputType), bloomFilterPushdownSize_(bloomFilterPushdownSize), - pool_(pool) { + pool_(pool), + minTableRowsForParallelJoinBuild_(minTableRowsForParallelJoinBuild), + joinBuildVectorHasherMaxNumDistinct_(joinBuildVectorHasherMaxNumDistinct), + abandonHashBuildDedupMinRows_(abandonHashBuildDedupMinRows), + abandonHashBuildDedupMinPct_(abandonHashBuildDedupMinPct), + filterPropagatesNulls_(filterPropagatesNulls) { + dropDuplicates_ = + !withFilter_ && (isLeftSemiFilterJoin(joinType_) || isLeftSemiProjectJoin(joinType_) || isAntiJoin(joinType_)); const auto numKeys = joinKeys.size(); keyChannels_.reserve(numKeys); @@ -82,25 +105,84 @@ HashTableBuilder::HashTableBuilder( } // Identify the non-key build side columns and make a decoder for each. - const int32_t numDependents = inputType_->size() - numKeys; - if (numDependents > 0) { - // Number of join keys (numKeys) may be less then number of input columns - // (inputType->size()). In this case numDependents is negative and cannot be - // used to call 'reserve'. This happens when we join different probe side - // keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 = - // u.k AND t.k2 = u.k. - dependentChannels_.reserve(numDependents); - decoders_.reserve(numDependents); - } - for (auto i = 0; i < inputType->size(); ++i) { - if (keyChannelMap_.find(i) == keyChannelMap_.end()) { - dependentChannels_.emplace_back(i); - decoders_.emplace_back(std::make_unique()); + if (!dropDuplicates_) { + const int32_t numDependents = inputType_->size() - numKeys; + if (numDependents > 0) { + // Number of join keys (numKeys) may be more than the number of input columns + // (inputType->size()). In this case numDependents is negative and cannot + // be used to call 'reserve'. This happens when we join different probe + // side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON + // t.k1 = u.k AND t.k2 = u.k. 
+ dependentChannels_.reserve(numDependents); + decoders_.reserve(numDependents); + } + for (auto i = 0; i < inputType->size(); ++i) { + if (keyChannelMap_.find(i) == keyChannelMap_.end()) { + dependentChannels_.emplace_back(i); + decoders_.emplace_back(std::make_unique()); + } } } - tableType_ = hashJoinTableType(joinKeys, inputType); + tableType_ = hashJoinTableType(joinKeys, inputType, !dropDuplicates_); setupTable(); + + if (isAntiJoin(joinType_) && withFilter_ && filterPropagatesNulls_) { + setupFilterForAntiJoins(filterInputChannels); + } +} + +void HashTableBuilder::setupFilterForAntiJoins(const std::vector& filterInputChannels) { + VELOX_DCHECK(std::is_sorted(dependentChannels_.begin(), dependentChannels_.end())); + + for (auto channel : filterInputChannels) { + auto keyIter = keyChannelMap_.find(channel); + if (keyIter != keyChannelMap_.end()) { + keyFilterChannels_.push_back(keyIter->second); + continue; + } + + auto dependentIter = std::lower_bound(dependentChannels_.begin(), dependentChannels_.end(), channel); + if (dependentIter == dependentChannels_.end() || *dependentIter != channel) { + continue; + } + dependentFilterChannels_.push_back(dependentIter - dependentChannels_.begin()); + } +} + +void HashTableBuilder::removeInputRowsForAntiJoinFilter() { + bool changed = false; + auto* rawActiveRows = activeRows_.asMutableRange().bits(); + + auto removeNulls = [&](facebook::velox::DecodedVector& decoded) { + if (decoded.mayHaveNulls()) { + changed = true; + facebook::velox::bits::andBits(rawActiveRows, decoded.nulls(&activeRows_), 0, activeRows_.end()); + } + }; + + for (auto channel : keyFilterChannels_) { + removeNulls(uniqueTable_->hashers()[channel]->decodedVector()); + } + for (auto channel : dependentFilterChannels_) { + removeNulls(*decoders_[channel]); + } + + if (changed) { + activeRows_.updateBounds(); + } +} + +bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t numDistinct) const { + VELOX_CHECK(dropDuplicates_); + return 
numHashInputRows_ > abandonHashBuildDedupMinRows_ && + (100 * numDistinct / numHashInputRows_) >= abandonHashBuildDedupMinPct_; +} + +void HashTableBuilder::abandonHashBuildDedup() { + abandonHashBuildDedup_ = true; + uniqueTable_->setAllowDuplicates(true); + lookup_.reset(); } // Invoked to set up hash table to build. @@ -128,14 +210,9 @@ void HashTableBuilder::setupTable() { true, // allowDuplicates true, // hasProbedFlag false, // hasCountFlag - 1'000, // operatorCtx_->driverCtx()->queryConfig().minTableRowsForParallelJoinBuild() - pool_, - true); + minTableRowsForParallelJoinBuild_, + pool_); } else { - // (Left) semi and anti join with no extra filter only needs to know whether - // there is a match. Hence, no need to store entries with duplicate keys. - dropDuplicates_ = - !withFilter_ && (isLeftSemiFilterJoin(joinType_) || isLeftSemiProjectJoin(joinType_) || isAntiJoin(joinType_)); // Right semi join needs to tag build rows that were probed. const bool needProbedFlag = isRightSemiFilterJoin(joinType_); const bool hasCountFlag = facebook::velox::core::isCountingJoin(joinType_); @@ -148,9 +225,8 @@ void HashTableBuilder::setupTable() { !dropDuplicates_, // allowDuplicates needProbedFlag, // hasProbedFlag hasCountFlag, // hasCountFlag - 1'000, // operatorCtx_->driverCtx()->queryConfig().minTableRowsForParallelJoinBuild() - pool_, - true); + minTableRowsForParallelJoinBuild_, + pool_); } else { // Ignore null keys uniqueTable_ = facebook::velox::exec::HashTable::createForJoin( @@ -159,12 +235,20 @@ void HashTableBuilder::setupTable() { !dropDuplicates_, // allowDuplicates needProbedFlag, // hasProbedFlag hasCountFlag, // hasCountFlag - 1'000, // operatorCtx_->driverCtx()->queryConfig().minTableRowsForParallelJoinBuild() + minTableRowsForParallelJoinBuild_, pool_, bloomFilterPushdownSize_); } } analyzeKeys_ = uniqueTable_->hashMode() != facebook::velox::exec::BaseHashTable::HashMode::kHash; + + if (dropDuplicates_) { + if 
(!facebook::velox::core::isCountingJoin(joinType_) && abandonHashBuildDedupMinPct_ == 0) { + abandonHashBuildDedup(); + return; + } + lookup_ = std::make_unique(uniqueTable_->hashers(), pool_); + } } void HashTableBuilder::addInput(facebook::velox::RowVectorPtr input) { @@ -178,9 +262,6 @@ void HashTableBuilder::addInput(facebook::velox::RowVectorPtr input) { hashers[i]->decode(*key, activeRows_); } - deselectRowsWithNulls(hashers, activeRows_); - activeRows_.setAll(); - if (!isRightJoin(joinType_) && !isFullJoin(joinType_) && !isRightSemiProjectJoin(joinType_) && !isLeftNullAwareJoinWithFilter(joinType_, nullAware_, withFilter_)) { deselectRowsWithNulls(hashers, activeRows_); @@ -200,10 +281,55 @@ void HashTableBuilder::addInput(facebook::velox::RowVectorPtr input) { } } + if (isAntiJoin(joinType_) && nullAware_ && joinHasNullKeys_ && !withFilter_) { + noMoreInput_ = true; + return; + } + + if (!activeRows_.hasSelections()) { + return; + } + + if (dropDuplicates_ && !abandonHashBuildDedup_) { + // Counting joins must not abandon dedup — accurate counts are required. + VELOX_CHECK_NOT_NULL(lookup_); + const bool abandonEarly = + !facebook::velox::core::isCountingJoin(joinType_) && abandonHashBuildDedupEarly(uniqueTable_->numDistinct()); + if (!abandonEarly) { + numHashInputRows_ += activeRows_.countSelected(); + uniqueTable_->prepareForGroupProbe( + *lookup_, input, activeRows_, facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit); + if (lookup_->rows.empty()) { + return; + } + uniqueTable_->groupProbe(*lookup_, facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit); + + // For counting joins, increment the count for duplicate rows. + // New rows are initialized with count = 1 by initializeRow. + // Increment count for all rows, then decrement for new rows to + // correct the over-counting. 
+ if (facebook::velox::core::isCountingJoin(joinType_)) { + auto* rows = uniqueTable_->rows(); + for (auto row : lookup_->rows) { + rows->incrementCount(lookup_->hits[row]); + } + for (auto newRow : lookup_->newGroups) { + rows->decrementCount(lookup_->hits[newRow]); + } + } + return; + } + abandonHashBuildDedup(); + } + for (auto i = 0; i < dependentChannels_.size(); ++i) { decoders_[i]->decode(*input->childAt(dependentChannels_[i])->loadedVector(), activeRows_); } + if (isAntiJoin(joinType_) && withFilter_ && filterPropagatesNulls_) { + removeInputRowsForAntiJoinFilter(); + } + if (!activeRows_.hasSelections()) { return; } diff --git a/cpp/velox/operators/hashjoin/HashTableBuilder.h b/cpp/velox/operators/hashjoin/HashTableBuilder.h index 83c90b411009..98e459b0141e 100644 --- a/cpp/velox/operators/hashjoin/HashTableBuilder.h +++ b/cpp/velox/operators/hashjoin/HashTableBuilder.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include "velox/exec/HashJoinBridge.h" #include "velox/exec/HashTable.h" @@ -35,8 +36,14 @@ class HashTableBuilder { bool withFilter, int64_t bloomFilterPushdownSize, const std::vector& joinKeys, + const std::vector& filterInputChannels, + bool filterPropagatesNulls, const facebook::velox::RowTypePtr& inputType, - facebook::velox::memory::MemoryPool* pool); + facebook::velox::memory::MemoryPool* pool, + uint32_t minTableRowsForParallelJoinBuild, + uint32_t joinBuildVectorHasherMaxNumDistinct, + uint32_t abandonHashBuildDedupMinRows, + uint32_t abandonHashBuildDedupMinPct); void addInput(facebook::velox::RowVectorPtr input); @@ -45,6 +52,7 @@ class HashTableBuilder { } std::unique_ptr uniqueTable() { + lookup_.reset(); return std::move(uniqueTable_); } @@ -63,10 +71,24 @@ class HashTableBuilder { return dropDuplicates_; } + bool noMoreInput() const { + return noMoreInput_; + } + + uint32_t joinBuildVectorHasherMaxNumDistinct() const { + return joinBuildVectorHasherMaxNumDistinct_; + } + private: // Invoked to set up hash table to build. 
void setupTable(); + void setupFilterForAntiJoins(const std::vector& filterInputChannels); + void removeInputRowsForAntiJoinFilter(); + + bool abandonHashBuildDedupEarly(int64_t numDistinct) const; + void abandonHashBuildDedup(); + const facebook::velox::core::JoinType joinType_; const bool nullAware_; @@ -99,6 +121,8 @@ class HashTableBuilder { // Set of active rows during addInput(). facebook::velox::SelectivityVector activeRows_; + std::unique_ptr lookup_; + // True if this is a build side of an anti or left semi project join and has // at least one entry with null join keys. bool joinHasNullKeys_{false}; @@ -118,6 +142,14 @@ class HashTableBuilder { facebook::velox::memory::MemoryPool* pool_; bool dropDuplicates_{false}; + bool abandonHashBuildDedup_{false}; + bool noMoreInput_{false}; + uint64_t numHashInputRows_{0}; + uint32_t minTableRowsForParallelJoinBuild_{1'000}; + uint32_t joinBuildVectorHasherMaxNumDistinct_{1'000'000}; + uint32_t abandonHashBuildDedupMinRows_{100'000}; + uint32_t abandonHashBuildDedupMinPct_{0}; + bool filterPropagatesNulls_{false}; }; } // namespace gluten diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index b01e343c1011..2202fed3d5bc 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -16,7 +16,6 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. 
Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | | spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout for asynchronous execution when task is being stopped in Velox backend. It's recommended to set to a number larger than network connection timeout that the possible aysnc tasks are relying on. | -| spark.gluten.sql.columnar.backend.velox.broadcastHashTableBuildThreads | 1 | The number of threads used to build the broadcast hash table. If not set or set to 0, it will use the default number of threads (available processors). | | spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | | spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | @@ -48,7 +47,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created | | spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level | | spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run | -| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. | +| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. 
| | spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size | | spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. | | spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | @@ -78,6 +77,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | | spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | | spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | +| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | | spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | ## Gluten Velox backend *experimental* configurations @@ -86,5 +86,7 @@ nav_order: 16 |----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------| | spark.gluten.velox.abandonDedupHashMap.minPct | 0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. Value is integer based and range is [0, 100]. 
| | spark.gluten.velox.abandonDedupHashMap.minRows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. | +| spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct | 1000000 | Experimental: maximum number of distinct values to keep when merging vector hashers in join HashBuild. | +| spark.gluten.velox.minTableRowsForParallelJoinBuild | 1000 | Experimental: the minimum number of table rows that can trigger the parallel hash join table build. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. | diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 2f5350f38a08..4d1fd804a9cc 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -212,6 +212,8 @@ trait SparkShims { def withAnsiEvalMode(expr: Expression): Boolean = false + def isNullIntolerant(expr: Expression): Boolean + def createParquetFilters( conf: SQLConf, schema: MessageType, diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 0ea3d7d9b8fe..c78b26627066 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -84,6 +84,8 @@ class Spark33Shims extends SparkShims { ) } + override def isNullIntolerant(expr: Expression): Boolean = expr.isInstanceOf[NullIntolerant] + override def generateFileScanRDD( sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 61bc3bc94568..1bf51e488c7d 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -102,6 +102,8 @@ class Spark34Shims extends SparkShims { ) } + override def isNullIntolerant(expr: Expression): Boolean = expr.isInstanceOf[NullIntolerant] + override def generateFileScanRDD( sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 7e31af9b672c..36ba645ae196 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -106,6 +106,8 @@ class Spark35Shims extends SparkShims { ) } + override def isNullIntolerant(expr: Expression): Boolean = expr.isInstanceOf[NullIntolerant] + override def generateFileScanRDD( sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala index 0c3b163ce533..0653cf04c0fb 100644 --- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala +++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala @@ -111,6 +111,8 @@ class Spark40Shims extends SparkShims { ) } + override def isNullIntolerant(expr: Expression): Boolean = expr.nullIntolerant + override def generateFileScanRDD( sparkSession: SparkSession, readFunction: PartitionedFile => 
Iterator[InternalRow], diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 5ff9d51c71db..cb953b659d66 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -110,6 +110,8 @@ class Spark41Shims extends SparkShims { ) } + override def isNullIntolerant(expr: Expression): Boolean = expr.nullIntolerant + override def generateFileScanRDD( sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow],