diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e9a2417d92d6..5eb7e3c14df1 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -399,24 +399,30 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { + bool hashTableBuildOncePerExecutorEnabled = + SubstraitParser::configSetInOptimization( + sJoin.advanced_extension(), "isHashTableBuildOncePerExecutor="); + std::string hashTableId = sJoin.hashtableid(); std::shared_ptr opaqueSharedHashTable = nullptr; bool joinHasNullKeys = false; - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; + if (hashTableBuildOncePerExecutorEnabled) { + try { + auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); + joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); + auto originalShared = hashTableBuilder->hashTable(); + opaqueSharedHashTable = std::shared_ptr( + originalShared, reinterpret_cast(originalShared.get())); + + LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; + } catch (const std::exception& e) { + throw GlutenException( + "Error retrieving HashTable from ObjectStore: " + std::string(e.what()) + + " You can set spark.gluten.velox.buildHashTableOncePerExecutor.enabled" + " to false as workaround."); + } } // Create HashJoinNode node diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729eb..629d7c9958c1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.expression._ import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.sql.shims.SparkShimLoader @@ -281,6 +282,12 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { // isBHJ: 0 for SHJ, 1 for BHJ // isNullAwareAntiJoin: 0 for false, 1 for true // buildHashTableId: the unique id for the hash table of build plan + val isHashTableBuildOncePerExecutor = + if ( + BackendsApiManager.getSettings.enableHashTableBuildOncePerExecutor() && + GlutenConfig.get.enableColumnarBroadcastExchange + ) { 1 } + else 0 joinParametersStr .append("isBHJ=") .append(isBHJ) @@ -291,6 +298,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { .append("buildHashTableId=") .append(buildHashTableId) .append("\n") + .append("isHashTableBuildOncePerExecutor=") + .append(isHashTableBuildOncePerExecutor) + .append("\n") .append("isExistenceJoin=") .append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0) .append("\n")