Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ repos:
- id: spotless-apply
name: spotless (apply)
language: system
entry: mvn --batch-mode spotless:apply
entry: mvn -q --batch-mode spotless:apply
pass_filenames: false
3 changes: 1 addition & 2 deletions src/main/cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ set(VELOX4J_SOURCES
velox4j/eval/Evaluation.cc
velox4j/eval/Evaluator.cc
velox4j/iterator/BlockingQueue.cc
velox4j/vector/Vectors.cc
velox4j/shuffle/HashPartitioner.cc)
velox4j/vector/Vectors.cc)
set(VELOX4J_INCLUDES ${CMAKE_CURRENT_LIST_DIR} ${velox_SOURCE_DIR}
${JniHelpersLib_SOURCE_DIR})
set(VELOX4J_DEPENDENCIES velox JniHelpers JNI::JNI)
Expand Down
146 changes: 122 additions & 24 deletions src/main/cpp/main/velox4j/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "velox4j/jni/JniError.h"
#include "velox4j/lifecycle/Session.h"
#include "velox4j/query/QueryExecutor.h"
#include "velox4j/shuffle/HashPartitioner.h"
#include "velox4j/vector/Vectors.h"

namespace velox4j {
Expand Down Expand Up @@ -340,32 +339,68 @@ jlongArray rowVectorPartitionByKeys(
JNI_METHOD_END(nullptr)
}

jlongArray rowVectorPartitionByKeyHashes(
jlongArray baseVectorWrapPartitions(
JNIEnv* env,
jobject javaThis,
jlong vid,
jintArray jKeyChannels,
jlong vectorId,
jintArray jPartitions,
jint numPartitions) {
JNI_METHOD_START
auto session = sessionOf(env, javaThis);
auto pool = session->memoryManager()->getVeloxPool(
"Hash Partition Memory Pool", memory::MemoryPool::Kind::kLeaf);
const auto inputRowVector = ObjectStore::retrieve<RowVector>(vid);
"Wrap Partitions Memory Pool", memory::MemoryPool::Kind::kLeaf);
VectorPtr vector = ObjectStore::retrieve<BaseVector>(vectorId);
flattenVector(vector, vector->size());
const auto inputNumRows = vector->size();
auto safeArray = getIntArrayElementsSafe(env, jPartitions);

auto safeArray = getIntArrayElementsSafe(env, jKeyChannels);
std::vector<column_index_t> keyChannels(safeArray.length());
for (jsize i = 0; i < safeArray.length(); ++i) {
keyChannels[i] = safeArray.elems()[i];
std::vector<jlong> outVector(numPartitions, 0);
VELOX_USER_CHECK_EQ(
safeArray.length(),
inputNumRows,
"Expected one partition id per input row");

std::vector<vector_size_t> partitionSizes(numPartitions);
std::vector<BufferPtr> partitionRows(numPartitions);
std::vector<vector_size_t*> rawPartitionRows(numPartitions);
std::fill(partitionSizes.begin(), partitionSizes.end(), 0);

for (int row = 0; row < inputNumRows; ++row) {
const auto partitionId = static_cast<int>(safeArray.elems()[row]);
VELOX_USER_CHECK_GE(partitionId, 0, "partition id must be non-negative");
VELOX_USER_CHECK_LT(
partitionId,
numPartitions,
"partition id {} is out of range for {} partitions",
partitionId,
numPartitions);
++partitionSizes[partitionId];
}

for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
partitionRows[partitionId] =
allocateIndices(partitionSizes[partitionId], pool);
rawPartitionRows[partitionId] =
partitionRows[partitionId]->asMutable<vector_size_t>();
}

HashPartitioner partitioner(std::move(keyChannels), numPartitions, pool);
auto partitions = partitioner.partition(inputRowVector);
std::vector<vector_size_t> partitionNextRowOffset(numPartitions);
std::fill(partitionNextRowOffset.begin(), partitionNextRowOffset.end(), 0);
for (int row = 0; row < inputNumRows; ++row) {
const auto partitionId = static_cast<int>(safeArray.elems()[row]);
rawPartitionRows[partitionId][partitionNextRowOffset[partitionId]] = row;
++partitionNextRowOffset[partitionId];
}

std::vector<jlong> outVector(numPartitions, 0);
for (int pid = 0; pid < numPartitions; ++pid) {
if (partitions[pid] != nullptr) {
outVector[pid] = session->objectStore()->save(partitions[pid]);
for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
const vector_size_t partitionSize = partitionSizes[partitionId];
if (partitionSize == 0) {
continue;
}
VectorPtr partitionVector = partitionSize == inputNumRows
? vector
: wrapInDictionary(partitionSize, partitionRows[partitionId], vector);
outVector[partitionId] = session->objectStore()->save(partitionVector);
}

const jlongArray out = env->NewLongArray(outVector.size());
Expand All @@ -375,6 +410,54 @@ jlongArray rowVectorPartitionByKeyHashes(
JNI_METHOD_END(nullptr)
}

jlong createPartitionFunction(
JNIEnv* env,
jobject javaThis,
jstring specJson,
jint numPartitions,
jboolean localExchange) {
JNI_METHOD_START
auto session = sessionOf(env, javaThis);
auto serdePool = session->memoryManager()->getVeloxPool(
"Partition Function Serde Memory Pool", memory::MemoryPool::Kind::kLeaf);
spotify::jni::JavaString jSpecJson{env, specJson};
auto dynamic = folly::parseJson(jSpecJson.get());
auto spec = ISerializable::deserialize<core::PartitionFunctionSpec>(
dynamic, serdePool);
auto function = std::shared_ptr<core::PartitionFunction>(
spec->create(numPartitions, static_cast<bool>(localExchange)).release());
return session->objectStore()->save(function);
JNI_METHOD_END(-1)
}

jintArray partitionFunctionPartition(
JNIEnv* env,
jobject javaThis,
jlong partitionFunctionId,
jlong rowVectorId) {
JNI_METHOD_START
auto function =
ObjectStore::retrieve<core::PartitionFunction>(partitionFunctionId);
const auto inputRowVector = ObjectStore::retrieve<RowVector>(rowVectorId);

std::vector<uint32_t> partitions;
auto singlePartition = function->partition(*inputRowVector, partitions);
std::vector<jint> outVector;
if (singlePartition.has_value()) {
outVector.assign(
inputRowVector->size(), static_cast<jint>(singlePartition.value()));
} else {
outVector.reserve(partitions.size());
for (const auto partition : partitions) {
outVector.push_back(static_cast<jint>(partition));
}
}
const jintArray out = env->NewIntArray(outVector.size());
env->SetIntArrayRegion(out, 0, outVector.size(), outVector.data());
return out;
JNI_METHOD_END(nullptr)
}

jlong createSelectivityVector(JNIEnv* env, jobject javaThis, jint length) {
JNI_METHOD_START
auto vector =
Expand Down Expand Up @@ -606,6 +689,29 @@ void JniWrapper::initialize(JNIEnv* env) {
kTypeArray(kTypeInt),
kTypeInt,
nullptr);
addNativeMethod(
"baseVectorWrapPartitions",
(void*)baseVectorWrapPartitions,
kTypeArray(kTypeLong),
kTypeLong,
kTypeArray(kTypeInt),
kTypeInt,
nullptr);
addNativeMethod(
"createPartitionFunction",
(void*)createPartitionFunction,
kTypeLong,
kTypeString,
kTypeInt,
kTypeBool,
nullptr);
addNativeMethod(
"partitionFunctionPartition",
(void*)partitionFunctionPartition,
kTypeArray(kTypeInt),
kTypeLong,
kTypeLong,
nullptr);
addNativeMethod(
"createSelectivityVector",
(void*)createSelectivityVector,
Expand Down Expand Up @@ -633,14 +739,6 @@ void JniWrapper::initialize(JNIEnv* env) {
kTypeString,
kTypeString,
nullptr);
addNativeMethod(
"rowVectorPartitionByKeyHashes",
(void*)rowVectorPartitionByKeyHashes,
kTypeArray(kTypeLong),
kTypeLong,
kTypeArray(kTypeInt),
kTypeInt,
nullptr);
addNativeMethod(
"createUpIteratorWithExternalStream",
(void*)createUpIteratorWithExternalStream,
Expand Down
12 changes: 12 additions & 0 deletions src/main/cpp/main/velox4j/vector/Vectors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "velox4j/vector/Vectors.h"

#include <velox/exec/OperatorUtils.h>
#include <velox/vector/ComplexVector.h>
#include <velox/vector/LazyVector.h>

Expand Down Expand Up @@ -75,4 +76,15 @@ void flattenVector(VectorPtr& vector, vector_size_t targetSize) {
vector = vector->slice(0, targetSize);
}
}

VectorPtr wrapInDictionary(
vector_size_t size,
const BufferPtr& indices,
const VectorPtr& vector) {
VELOX_CHECK_NOT_NULL(vector);
if (auto rowVector = std::dynamic_pointer_cast<RowVector>(vector)) {
return exec::wrap(size, indices, rowVector);
}
return BaseVector::wrapInDictionary(nullptr, indices, size, vector);
}
} // namespace velox4j
5 changes: 5 additions & 0 deletions src/main/cpp/main/velox4j/vector/Vectors.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ namespace velox4j {
void flattenVector(
facebook::velox::VectorPtr& vector,
facebook::velox::vector_size_t targetSize);

facebook::velox::VectorPtr wrapInDictionary(
facebook::velox::vector_size_t size,
const facebook::velox::BufferPtr& indices,
const facebook::velox::VectorPtr& vector);
} // namespace velox4j
32 changes: 16 additions & 16 deletions src/main/java/org/boostscale/velox4j/data/RowVectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
package org.boostscale.velox4j.data;

import java.util.List;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;

import org.boostscale.velox4j.jni.JniApi;
import org.boostscale.velox4j.plan.partition.HashPartitionFunctionSpec;
import org.boostscale.velox4j.plan.partition.PartitionFunctionSpec;
import org.boostscale.velox4j.partition.PartitionFunction;
import org.boostscale.velox4j.partition.PartitionFunctionSpec;

public class RowVectors {
private final JniApi jniApi;
Expand All @@ -36,28 +37,27 @@ public List<RowVector> partitionByKeys(RowVector rowVector, List<Integer> keyCha
return jniApi.rowVectorPartitionByKeys(rowVector, keyChannels, 128);
}

/**
* Partitions the input RowVector into a list of RowVectors where each one has the same keys
* defined by the key indices of `keyChannels`, with a configurable maximum number of partitions.
*/
public List<RowVector> partitionByKeys(
RowVector rowVector, List<Integer> keyChannels, int maxPartitions) {
Preconditions.checkArgument(
maxPartitions > 0, "maxPartitions must be positive, got %s", maxPartitions);
return jniApi.rowVectorPartitionByKeys(rowVector, keyChannels, maxPartitions);
}

/**
* Partitions a RowVector into numPartitions groups using the given partition function spec.
* Returns a list of size numPartitions where index i contains rows for partition i (null if
* empty).
*
* <p>Currently only {@link HashPartitionFunctionSpec} is supported.
*/
public List<RowVector> partitionBySpec(
RowVector rowVector, PartitionFunctionSpec spec, int numPartitions) {
Preconditions.checkArgument(
numPartitions > 0, "numPartitions must be positive, got %s", numPartitions);
return jniApi.rowVectorPartitionBySpec(rowVector, spec, numPartitions);
try (PartitionFunction partitionFunction =
jniApi.createPartitionFunction(spec, numPartitions, false)) {
final int[] partitions = jniApi.partitionFunctionPartition(partitionFunction, rowVector);
return jniApi.baseVectorWrapPartitions(rowVector, partitions, numPartitions).stream()
.map(
vector -> {
if (vector == null) {
return null;
}
return vector.asRowVector();
})
.collect(Collectors.toList());
}
}
}
36 changes: 18 additions & 18 deletions src/main/java/org/boostscale/velox4j/jni/JniApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;

Expand All @@ -30,8 +29,8 @@
import org.boostscale.velox4j.iterator.DownIterator;
import org.boostscale.velox4j.iterator.GenericUpIterator;
import org.boostscale.velox4j.iterator.UpIterator;
import org.boostscale.velox4j.plan.partition.HashPartitionFunctionSpec;
import org.boostscale.velox4j.plan.partition.PartitionFunctionSpec;
import org.boostscale.velox4j.partition.PartitionFunction;
import org.boostscale.velox4j.partition.PartitionFunctionSpec;
import org.boostscale.velox4j.query.Query;
import org.boostscale.velox4j.query.QueryExecutor;
import org.boostscale.velox4j.query.SerialTask;
Expand Down Expand Up @@ -130,10 +129,6 @@ public BaseVector baseVectorSlice(BaseVector vector, int offset, int length) {
return baseVectorWrap(jni.baseVectorSlice(vector.id(), offset, length));
}

public List<RowVector> rowVectorPartitionByKeys(RowVector vector, List<Integer> keyChannels) {
return rowVectorPartitionByKeys(vector, keyChannels, 128);
}

public List<RowVector> rowVectorPartitionByKeys(
RowVector vector, List<Integer> keyChannels, int maxPartitions) {
final int[] keyChannelArray = keyChannels.stream().mapToInt(i -> i).toArray();
Expand All @@ -144,23 +139,28 @@ public List<RowVector> rowVectorPartitionByKeys(
.collect(Collectors.toList());
}

public List<RowVector> rowVectorPartitionBySpec(
RowVector vector, PartitionFunctionSpec spec, int numPartitions) {
Preconditions.checkArgument(
spec instanceof HashPartitionFunctionSpec,
"Only HashPartitionFunctionSpec is supported, got %s",
spec.getClass().getSimpleName());
HashPartitionFunctionSpec hashSpec = (HashPartitionFunctionSpec) spec;
final int[] keyChannelArray = hashSpec.getKeyChannels().stream().mapToInt(i -> i).toArray();
final long[] vids =
jni.rowVectorPartitionByKeyHashes(vector.id(), keyChannelArray, numPartitions);
public PartitionFunction createPartitionFunction(
PartitionFunctionSpec spec, int numPartitions, boolean localExchange) {
final String specJson = Serde.toPrettyJson(spec);
return new PartitionFunction(
this, jni.createPartitionFunction(specJson, numPartitions, localExchange));
}

public int[] partitionFunctionPartition(
PartitionFunction partitionFunction, RowVector rowVector) {
return jni.partitionFunctionPartition(partitionFunction.id(), rowVector.id());
}

public List<BaseVector> baseVectorWrapPartitions(
BaseVector vector, int[] partitions, int numPartitions) {
final long[] vids = jni.baseVectorWrapPartitions(vector.id(), partitions, numPartitions);
return Arrays.stream(vids)
.mapToObj(
vid -> {
if (vid == 0) {
return null;
}
return baseVectorWrap(vid).asRowVector();
return baseVectorWrap(vid);
})
.collect(Collectors.toList());
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/boostscale/velox4j/jni/JniWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public long sessionId() {

native long[] rowVectorPartitionByKeys(long id, int[] keyChannels, int maxPartitions);

native long[] rowVectorPartitionByKeyHashes(long id, int[] keyChannels, int numPartitions);
native long createPartitionFunction(String specJson, int numPartitions, boolean localExchange);

native int[] partitionFunctionPartition(long partitionFunctionId, long rowVectorId);

native long[] baseVectorWrapPartitions(long vectorId, int[] partitions, int numPartitions);

native long createSelectivityVector(int length);

Expand Down
Loading
Loading