Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ public long rtHandle() {

public static native long cloneHashTable(long hashTableData);

public static native long nativeBuild(
public native long nativeBuild(
Comment thread
JkSelf marked this conversation as resolved.
String buildHashTableId,
long[] batchHandlers,
String[] joinKeys,
String[] filterBuildColumns,
boolean filterPropagatesNulls,
int joinType,
boolean hasMixedFiltCondition,
boolean isExistenceJoin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,22 @@ object VeloxConfig extends ConfigRegistry {
.intConf
.createWithDefault(100000)

// Experimental knob: hash-join build tables with fewer rows than this are
// built single-threaded; larger tables may use the parallel build path.
val VELOX_MIN_TABLE_ROWS_FOR_PARALLEL_JOIN_BUILD =
  buildConf("spark.gluten.velox.minTableRowsForParallelJoinBuild")
    .experimental()
    .doc(
      "Experimental: the minimum number of table rows that can trigger the " +
        "parallel hash join table build.")
    .intConf
    .createWithDefault(1000)

// Experimental knob: cap on distinct values tracked per vector hasher while
// merging hashers during the join HashBuild stage.
val VELOX_JOIN_BUILD_VECTOR_HASHER_MAX_NUM_DISTINCT =
  buildConf("spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct")
    .experimental()
    .doc(
      "Experimental: maximum number of distinct values to keep when merging " +
        "vector hashers in join HashBuild.")
    .intConf
    .createWithDefault(1000000)

val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT =
buildConf("spark.gluten.velox.abandonDedupHashMap.minPct")
.experimental()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.gluten.execution

import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.GlutenDriverEndpoint
Expand Down Expand Up @@ -148,6 +150,20 @@ case class BroadcastHashJoinExecTransformer(
} else {
-1
}

// Extract, from the join's residual filter condition (if any), the build-side
// columns it references and whether the filter is null-intolerant, so the
// native hash-table build can use them.
// NOTE(review): `filterPropagatesNulls` is taken from isNullIntolerant(expr);
// confirm this polarity matches what the native side expects by
// "propagates nulls" (null input => null/false output).
val (filterBuildColumns: Array[String], filterPropagatesNulls: Boolean) = condition match {
case Some(expr) =>
// Only attributes that belong to the build plan's output are relevant;
// stream-side references are ignored here.
val buildOutputSet = buildPlan.outputSet
val cols: Array[String] = expr.references.toSeq.collect {
case a: Attribute if buildOutputSet.contains(a) =>
// Use the exprId-suffixed name so it matches the native row type.
ConverterUtils.genColumnNameWithExprId(a)
}.toArray
val propagates = SparkShimLoader.getSparkShims.isNullIntolerant(expr)
(cols, propagates)
case None =>
// No filter condition: nothing to forward to the native build.
(Array.empty[String], false)
}

val context =
BroadcastHashJoinContext(
buildKeyExprs,
Expand All @@ -156,6 +172,8 @@ case class BroadcastHashJoinExecTransformer(
condition.isDefined,
joinType.isInstanceOf[ExistenceJoin],
buildPlan.output,
filterBuildColumns,
filterPropagatesNulls,
buildBroadcastTableId,
isNullAwareAntiJoin,
bloomFilterPushdownSize,
Expand All @@ -174,6 +192,8 @@ case class BroadcastHashJoinContext(
hasMixedFiltCondition: Boolean,
isExistenceJoin: Boolean,
buildSideStructure: Seq[Attribute],
filterBuildColumns: Array[String],
filterPropagatesNulls: Boolean,
buildHashTableId: String,
isNullAwareAntiJoin: Boolean = false,
bloomFilterPushdownSize: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,16 @@ case class ColumnarBuildSideRelation(
ConverterUtils.genColumnNameWithExprId(attr)
}.toArray

val hashJoinBuilder = HashJoinBuilder.create(runtime)

// Build the hash table
hashTableData = HashJoinBuilder
hashTableData = hashJoinBuilder
.nativeBuild(
broadcastContext.buildHashTableId,
batchArray.toArray,
joinKeys,
broadcastContext.filterBuildColumns,
broadcastContext.filterPropagatesNulls,
broadcastContext.substraitJoinType.ordinal(),
broadcastContext.hasMixedFiltCondition,
broadcastContext.isExistenceJoin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,16 @@ class UnsafeColumnarBuildSideRelation(
ConverterUtils.genColumnNameWithExprId(attr)
}.toArray

val hashJoinBuilder = HashJoinBuilder.create(runtime)

// Build the hash table
hashTableData = HashJoinBuilder
hashTableData = hashJoinBuilder
.nativeBuild(
broadcastContext.buildHashTableId,
batchArray.toArray,
joinKeys,
broadcastContext.filterBuildColumns,
broadcastContext.filterPropagatesNulls,
broadcastContext.substraitJoinType.ordinal(),
broadcastContext.hasMixedFiltCondition,
broadcastContext.isExistenceJoin,
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class VeloxRuntime final : public Runtime {
return veloxPlan_;
}

// Read-only accessor for this runtime's Velox query config, exposing
// session-level tuning knobs (e.g. to JNI entry points).
const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg() const {
return veloxCfg_;
}

// Whether debug mode was enabled for this runtime at construction time.
bool debugModeEnabled() const {
return debugModeEnabled_;
}
Expand Down
8 changes: 8 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,16 @@ const std::string kAbandonPartialAggregationMinRows =
"spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows";

// hashmap build
// Minimum build-side row count required before the hash join table build is
// parallelized across threads.
const std::string kMinTableRowsForParallelJoinBuild = "spark.gluten.velox.minTableRowsForParallelJoinBuild";
const uint32_t kMinTableRowsForParallelJoinBuildDefault = 1'000;

// Cap on distinct values kept when merging vector hashers in join HashBuild.
const std::string kJoinBuildVectorHasherMaxNumDistinct = "spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct";
const uint32_t kJoinBuildVectorHasherMaxNumDistinctDefault = 1'000'000;

// Thresholds for abandoning the dedup hash map during build: minimum rows
// observed, and a minimum percentage (0 disables the pct-based check —
// TODO confirm against the consumer of this knob).
const std::string kAbandonDedupHashMapMinRows = "spark.gluten.velox.abandonDedupHashMap.minRows";
const uint32_t kAbandonDedupHashMapMinRowsDefault = 100'000;
const std::string kAbandonDedupHashMapMinPct = "spark.gluten.velox.abandonDedupHashMap.minPct";
const uint32_t kAbandonDedupHashMapMinPctDefault = 0;

// execution
const std::string kSparkBloomFilterExpectedNumItems = "spark.sql.optimizer.runtime.bloomFilter.expectedNumItems";
Expand Down
29 changes: 28 additions & 1 deletion cpp/velox/jni/JniHashTable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <arrow/c/abi.h>

#include <jni/JniCommon.h>
#include <algorithm>
#include "JniHashTable.h"
#include "folly/String.h"
#include "memory/ColumnarBatch.h"
Expand Down Expand Up @@ -57,13 +58,19 @@ jlong JniHashTableContext::callJavaGet(const std::string& id) const {
// Builds and returns the Velox hash table builder for the given batches.
std::shared_ptr<HashTableBuilder> nativeHashTableBuild(
const std::vector<std::string>& joinKeys,
const std::vector<std::string>& filterBuildColumns,
bool filterPropagatesNulls,
std::vector<std::string> names,
std::vector<facebook::velox::TypePtr> veloxTypeList,
int joinType,
bool hasMixedJoinCondition,
bool isExistenceJoin,
bool isNullAwareAntiJoin,
int64_t bloomFilterPushdownSize,
uint32_t minTableRowsForParallelJoinBuild,
uint32_t joinBuildVectorHasherMaxNumDistinct,
uint32_t abandonHashBuildDedupMinRows,
uint32_t abandonHashBuildDedupMinPct,
std::vector<std::shared_ptr<ColumnarBatch>>& batches,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool) {
auto rowType = std::make_shared<facebook::velox::RowType>(std::move(names), std::move(veloxTypeList));
Expand Down Expand Up @@ -115,18 +122,38 @@ std::shared_ptr<HashTableBuilder> nativeHashTableBuild(
std::make_shared<facebook::velox::core::FieldAccessTypedExpr>(rowType->findChild(name), name));
}

std::vector<uint32_t> filterInputChannels;
filterInputChannels.reserve(filterBuildColumns.size());
for (const auto& name : filterBuildColumns) {
if (const auto idx = rowType->getChildIdxIfExists(name)) {
filterInputChannels.push_back(*idx);
}
}
std::sort(filterInputChannels.begin(), filterInputChannels.end());
filterInputChannels.erase(
std::unique(filterInputChannels.begin(), filterInputChannels.end()), filterInputChannels.end());

auto hashTableBuilder = std::make_shared<HashTableBuilder>(
vJoin,
isNullAwareAntiJoin,
hasMixedJoinCondition,
bloomFilterPushdownSize,
joinKeyTypes,
filterInputChannels,
filterPropagatesNulls,
rowType,
memoryPool.get());
memoryPool.get(),
minTableRowsForParallelJoinBuild,
joinBuildVectorHasherMaxNumDistinct,
abandonHashBuildDedupMinRows,
abandonHashBuildDedupMinPct);

for (auto i = 0; i < batches.size(); i++) {
auto rowVector = VeloxColumnarBatch::from(memoryPool.get(), batches[i])->getRowVector();
hashTableBuilder->addInput(rowVector);
if (hashTableBuilder->noMoreInput()) {
break;
}
}

return hashTableBuilder;
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/jni/JniHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,19 @@ class JniHashTableContext {
// Builds a Velox hash table from the given columnar batches and returns the
// HashTableBuilder that owns it (a shared_ptr, not a raw address).
std::shared_ptr<HashTableBuilder> nativeHashTableBuild(
const std::vector<std::string>& joinKeys, // build-side join key column names
const std::vector<std::string>& filterBuildColumns, // build-side columns referenced by the join filter
bool filterPropagatesNulls,
std::vector<std::string> names, // build-side column names (consumed to form the row type)
std::vector<facebook::velox::TypePtr> veloxTypeList, // matching column types (consumed)
int joinType,
bool hasMixedJoinCondition,
bool isExistenceJoin,
bool isNullAwareAntiJoin,
int64_t bloomFilterPushdownSize,
uint32_t minTableRowsForParallelJoinBuild,
uint32_t joinBuildVectorHasherMaxNumDistinct,
uint32_t abandonHashBuildDedupMinRows,
uint32_t abandonHashBuildDedupMinPct,
std::vector<std::shared_ptr<ColumnarBatch>>& batches, // input batches fed into the builder
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool);

Expand Down
44 changes: 41 additions & 3 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "jni/JniError.h"
#include "jni/JniFileSystem.h"
#include "jni/JniHashTable.h"
Expand Down Expand Up @@ -938,10 +939,12 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrappe

JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_nativeBuild( // NOLINT
JNIEnv* env,
jclass,
jobject wrapper,
jstring tableId,
jlongArray batchHandles,
jobjectArray joinKeys,
jobjectArray filterBuildColumns,
jboolean filterPropagatesNulls,
jint joinType,
jboolean hasMixedJoinCondition,
jboolean isExistenceJoin,
Expand All @@ -950,6 +953,18 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
jlong bloomFilterPushdownSize,
jint numThreads) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto* runtime = dynamic_cast<VeloxRuntime*>(ctx);
GLUTEN_CHECK(runtime != nullptr, "Not a Velox runtime");
const auto& queryConf = *(runtime->veloxCfg());
const auto minTableRowsForParallelJoinBuild =
queryConf.get<uint32_t>(kMinTableRowsForParallelJoinBuild, kMinTableRowsForParallelJoinBuildDefault);
const auto joinBuildVectorHasherMaxNumDistinct =
queryConf.get<uint32_t>(kJoinBuildVectorHasherMaxNumDistinct, kJoinBuildVectorHasherMaxNumDistinctDefault);
const auto abandonHashBuildDedupMinRows =
queryConf.get<uint32_t>(kAbandonDedupHashMapMinRows, kAbandonDedupHashMapMinRowsDefault);
const auto abandonHashBuildDedupMinPct =
queryConf.get<uint32_t>(kAbandonDedupHashMapMinPct, kAbandonDedupHashMapMinPctDefault);
const auto hashTableId = jStringToCString(env, tableId);

// Convert Java String array to C++ vector<string>
Expand All @@ -961,6 +976,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
hashJoinKeys.emplace_back(jStringToCString(env, jkey));
}

std::vector<std::string> filterColumns;
if (filterBuildColumns != nullptr) {
jsize filterColumnsCount = env->GetArrayLength(filterBuildColumns);
filterColumns.reserve(filterColumnsCount);
for (jsize i = 0; i < filterColumnsCount; ++i) {
jstring jcol = (jstring)env->GetObjectArrayElement(filterBuildColumns, i);
filterColumns.emplace_back(jStringToCString(env, jcol));
}
}

const auto inputType = gluten::getByteArrayElementsSafe(env, namedStruct);
std::string structString{
reinterpret_cast<const char*>(inputType.elems()), static_cast<std::string::size_type>(inputType.length())};
Expand Down Expand Up @@ -990,21 +1015,27 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
if (numThreads == 1) {
auto builder = nativeHashTableBuild(
hashJoinKeys,
filterColumns,
filterPropagatesNulls,
names,
veloxTypeList,
joinType,
hasMixedJoinCondition,
isExistenceJoin,
isNullAwareAntiJoin,
bloomFilterPushdownSize,
minTableRowsForParallelJoinBuild,
joinBuildVectorHasherMaxNumDistinct,
abandonHashBuildDedupMinRows,
abandonHashBuildDedupMinPct,
cb,
defaultLeafVeloxMemoryPool());

auto mainTable = builder->uniqueTable();
mainTable->prepareJoinTable(
{},
facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit,
1'000'000,
builder->joinBuildVectorHasherMaxNumDistinct(),
builder->dropDuplicates(),
nullptr);
builder->setHashTable(std::move(mainTable));
Expand All @@ -1027,19 +1058,26 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
// Submit task to thread pool
auto future = folly::via(executor, [&, t, start, end]() {
std::vector<std::shared_ptr<gluten::ColumnarBatch>> threadBatches;
threadBatches.reserve(end - start);
for (size_t i = start; i < end; ++i) {
threadBatches.push_back(cb[i]);
}

auto builder = nativeHashTableBuild(
hashJoinKeys,
filterColumns,
filterPropagatesNulls,
names,
veloxTypeList,
joinType,
hasMixedJoinCondition,
isExistenceJoin,
isNullAwareAntiJoin,
bloomFilterPushdownSize,
minTableRowsForParallelJoinBuild,
joinBuildVectorHasherMaxNumDistinct,
abandonHashBuildDedupMinRows,
abandonHashBuildDedupMinPct,
threadBatches,
defaultLeafVeloxMemoryPool());

Expand Down Expand Up @@ -1067,7 +1105,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
mainTable->prepareJoinTable(
std::move(tables),
facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit,
1'000'000,
hashTableBuilders[0]->joinBuildVectorHasherMaxNumDistinct(),
hashTableBuilders[0]->dropDuplicates(),
allowParallelJoinBuild ? VeloxBackend::get()->executor() : nullptr);

Expand Down
Loading
Loading