diff --git a/src/main/cpp/main/CMakeLists.txt b/src/main/cpp/main/CMakeLists.txt index b9f6de2614..059f086b7e 100644 --- a/src/main/cpp/main/CMakeLists.txt +++ b/src/main/cpp/main/CMakeLists.txt @@ -5,7 +5,6 @@ set(VELOX4J_SOURCES velox4j/jni/JniError.cc velox4j/jni/JniLoader.cc velox4j/jni/JniWrapper.cc - velox4j/jni/StaticJniWrapper.cc velox4j/iterator/UpIterator.cc velox4j/iterator/DownIterator.cc velox4j/lifecycle/ObjectStore.cc diff --git a/src/main/cpp/main/velox4j/jni/JniLoader.cc b/src/main/cpp/main/velox4j/jni/JniLoader.cc index 0917e83f18..81e8dd2ea6 100644 --- a/src/main/cpp/main/velox4j/jni/JniLoader.cc +++ b/src/main/cpp/main/velox4j/jni/JniLoader.cc @@ -19,7 +19,6 @@ #include "velox4j/jni/JniCommon.h" #include "velox4j/jni/JniError.h" #include "velox4j/jni/JniWrapper.h" -#include "velox4j/jni/StaticJniWrapper.h" #include "velox4j/memory/JavaAllocationListener.h" // The JNI entrypoint. @@ -31,7 +30,6 @@ JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM* jvm, void*) { } velox4j::getJniErrorState()->ensureInitialized(env); - velox4j::jniClassRegistry()->add(env, new velox4j::StaticJniWrapper(env)); velox4j::jniClassRegistry()->add(env, new velox4j::JniWrapper(env)); velox4j::jniClassRegistry()->add( env, new velox4j::DownIteratorJniWrapper(env)); diff --git a/src/main/cpp/main/velox4j/jni/JniWrapper.cc b/src/main/cpp/main/velox4j/jni/JniWrapper.cc index 4b71b7feec..bd975d1841 100644 --- a/src/main/cpp/main/velox4j/jni/JniWrapper.cc +++ b/src/main/cpp/main/velox4j/jni/JniWrapper.cc @@ -23,13 +23,17 @@ #include #include "velox4j/arrow/Arrow.h" +#include "velox4j/config/Config.h" #include "velox4j/connector/ExternalStream.h" #include "velox4j/eval/Evaluator.h" +#include "velox4j/init/Init.h" #include "velox4j/iterator/BlockingQueue.h" #include "velox4j/iterator/DownIterator.h" +#include "velox4j/iterator/UpIterator.h" #include "velox4j/jni/JniCommon.h" #include "velox4j/jni/JniError.h" #include "velox4j/lifecycle/Session.h" +#include "velox4j/memory/JavaAllocationListener.h" #include "velox4j/query/QueryExecutor.h" #include "velox4j/vector/Vectors.h" @@ -39,6 +43,37 @@ using namespace facebook::velox; namespace { const char* kClassName = "org/boostscale/velox4j/jni/JniWrapper"; +void initialize0(JNIEnv* env, jclass clazz, jstring globalConfJson) { + JNI_METHOD_START + spotify::jni::JavaString jGlobalConfJson{env, globalConfJson}; + auto dynamic = folly::parseJson(jGlobalConfJson.get()); + auto confArray = ConfigArray::create(dynamic); + initialize(confArray); + JNI_METHOD_END() +} + +jlong createMemoryManager(JNIEnv* env, jclass clazz, jobject jListener) { + JNI_METHOD_START + auto listener = std::make_unique( + std::make_unique(env, jListener), 8 << 10 << 10); + auto mm = std::make_shared(std::move(listener)); + return ObjectStore::global()->save(mm); + JNI_METHOD_END(-1L) +} + +jlong createSession(JNIEnv* env, jclass clazz, long memoryManagerId) { + JNI_METHOD_START + auto mm = ObjectStore::retrieve(memoryManagerId); + return ObjectStore::global()->save(std::make_shared(mm.get())); + JNI_METHOD_END(-1L) +} + +void releaseCppObject(JNIEnv* env, jclass clazz, jlong objId) { + JNI_METHOD_START + ObjectStore::release(objId); + JNI_METHOD_END() +} + /// Get the Velox4J session object that is associated with the current /// JniWrapper. Session* sessionOf(JNIEnv* env, jobject javaThis) { @@ -107,6 +142,20 @@ jlong queryExecutorExecute( JNI_METHOD_END(-1L) } +jint upIteratorAdvance(JNIEnv* env, jclass clazz, jlong itrId) { + JNI_METHOD_START + auto itr = ObjectStore::retrieve(itrId); + return static_cast(itr->advance()); + JNI_METHOD_END(-1) +} + +void upIteratorWait(JNIEnv* env, jclass clazz, jlong itrId) { + JNI_METHOD_START + auto itr = ObjectStore::retrieve(itrId); + itr->wait(); + JNI_METHOD_END() +} + jlong upIteratorGet(JNIEnv* env, jobject javaThis, jlong itrId) { JNI_METHOD_START auto itr = ObjectStore::retrieve(itrId); @@ -114,6 +163,21 @@ jlong upIteratorGet(JNIEnv* env, jobject javaThis, jlong itrId) { JNI_METHOD_END(-1L) } +void blockingQueuePut(JNIEnv* env, jclass clazz, jlong queueId, jlong rvId) { + JNI_METHOD_START + auto queue = ObjectStore::retrieve(queueId); + auto rv = ObjectStore::retrieve(rvId); + queue->put(rv); + JNI_METHOD_END() +} + +void blockingQueueNoMoreInput(JNIEnv* env, jclass clazz, jlong queueId) { + JNI_METHOD_START + auto queue = ObjectStore::retrieve(queueId); + queue->noMoreInput(); + JNI_METHOD_END() +} + jlong createExternalStreamFromDownIterator( JNIEnv* env, jobject javaThis, @@ -131,6 +195,106 @@ jlong createBlockingQueue(JNIEnv* env, jobject javaThis) { JNI_METHOD_END(-1L) } +void serialTaskAddSplit( + JNIEnv* env, + jclass clazz, + jlong stId, + jstring planNodeId, + jint groupId, + jstring connectorSplitJson) { + JNI_METHOD_START + auto serialTask = ObjectStore::retrieve(stId); + spotify::jni::JavaString jPlanNodeId{env, planNodeId}; + spotify::jni::JavaString jConnectorSplitJson{env, connectorSplitJson}; + auto jConnectorSplitDynamic = folly::parseJson(jConnectorSplitJson.get()); + auto connectorSplit = std::const_pointer_cast( + ISerializable::deserialize( + jConnectorSplitDynamic)); + serialTask->addSplit(jPlanNodeId.get(), groupId, connectorSplit); + JNI_METHOD_END() +} + +void serialTaskNoMoreSplits( + JNIEnv* env, + jclass clazz, + jlong stId, + jstring planNodeId) { + JNI_METHOD_START + auto serialTask = ObjectStore::retrieve(stId); + spotify::jni::JavaString jPlanNodeId{env, planNodeId}; + serialTask->noMoreSplits(jPlanNodeId.get()); + JNI_METHOD_END() +} + +jstring serialTaskCollectStats(JNIEnv* env, jclass clazz, jlong stId) { + JNI_METHOD_START + auto serialTask = ObjectStore::retrieve(stId); + const auto stats = serialTask->collectStats(); + const auto statsDynamic = stats->toJson(); + const auto statsJson = folly::toPrettyJson(statsDynamic); + return env->NewStringUTF(statsJson.data()); + JNI_METHOD_END(nullptr) +} + +jstring variantInferType(JNIEnv* env, jclass clazz, jstring json) { + JNI_METHOD_START + spotify::jni::JavaString jJson{env, json}; + auto dynamic = folly::parseJson(jJson.get()); + auto deserialized = variant::create(dynamic); + auto type = deserialized.inferType(); + auto serializedDynamic = type->serialize(); + auto typeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(typeJson.data()); + JNI_METHOD_END(nullptr); +} + +jstring variantAsJava(JNIEnv* env, jclass clazz, jlong id) { + JNI_METHOD_START + auto v = ObjectStore::retrieve(id); + auto serializedDynamic = v->serialize(); + auto serializeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(serializeJson.data()); + JNI_METHOD_END(nullptr) +} + +jlong variantAsCpp(JNIEnv* env, jobject javaThis, jstring json) { + JNI_METHOD_START + auto session = sessionOf(env, javaThis); + spotify::jni::JavaString jJson{env, json}; + auto dynamic = folly::parseJson(jJson.get()); + auto deserialized = variant::create(dynamic); + return session->objectStore()->save(std::make_shared(deserialized)); + JNI_METHOD_END(-1) +} + +jlong variantToVector( + JNIEnv* env, + jobject javaThis, + jstring typeJson, + jstring variantJson) { + JNI_METHOD_START + auto session = sessionOf(env, javaThis); + auto vectorPool = session->memoryManager()->getVeloxPool( + "BaseVector Memory Pool", memory::MemoryPool::Kind::kLeaf); + spotify::jni::JavaString jTypeJson{env, typeJson}; + spotify::jni::JavaString jVariantJson{env, variantJson}; + auto type = Type::create(folly::parseJson(jTypeJson.get())); + auto variant = variant::create(folly::parseJson(jVariantJson.get())); + auto variantVector = + facebook::velox::variantToVector(type, variant, vectorPool); + return session->objectStore()->save(variantVector); + JNI_METHOD_END(-1) +} + +jstring arrowToType(JNIEnv* env, jclass clazz, jlong cSchema) { + JNI_METHOD_START + auto type = fromArrowToType(reinterpret_cast(cSchema)); + auto serializedDynamic = type->serialize(); + auto typeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(typeJson.data()); + JNI_METHOD_END(nullptr) +} + void typeToArrow( JNIEnv* env, jobject javaThis, @@ -148,6 +312,99 @@ void typeToArrow( JNI_METHOD_END() } +void baseVectorToArrow( + JNIEnv* env, + jclass clazz, + jlong vid, + jlong cSchema, + jlong cArray) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(vid); + fromBaseVectorToArrow( + vector, + reinterpret_cast(cSchema), + reinterpret_cast(cArray)); + JNI_METHOD_END() +} + +jstring baseVectorSerialize(JNIEnv* env, jclass clazz, jlongArray vids) { + JNI_METHOD_START + std::ostringstream out; + auto safeArray = getLongArrayElementsSafe(env, vids); + for (int i = 0; i < safeArray.length(); ++i) { + const jlong& vid = safeArray.elems()[i]; + auto vector = ObjectStore::retrieve(vid); + saveVector(*vector, out); + } + auto serializedData = out.str(); + auto encoded = + encoding::Base64::encode(serializedData.data(), serializedData.size()); + return env->NewStringUTF(encoded.data()); + JNI_METHOD_END(nullptr) +} + +jbyteArray +baseVectorSerializeToBuf(JNIEnv* env, jclass clazz, jlongArray vids) { + JNI_METHOD_START + std::ostringstream out; + auto safeArray = getLongArrayElementsSafe(env, vids); + for (int i = 0; i < safeArray.length(); ++i) { + const jlong& vid = safeArray.elems()[i]; + auto vector = ObjectStore::retrieve(vid); + saveVector(*vector, out); + } + auto serializedData = out.str(); + jbyteArray byteArray = env->NewByteArray(serializedData.size()); + env->SetByteArrayRegion( + byteArray, + 0, + serializedData.size(), + reinterpret_cast(serializedData.data())); + return byteArray; + JNI_METHOD_END(nullptr) +} + +jstring baseVectorGetType(JNIEnv* env, jclass clazz, jlong vid) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(vid); + auto serializedDynamic = vector->type()->serialize(); + auto serializeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(serializeJson.data()); + JNI_METHOD_END(nullptr) +} + +jint baseVectorGetSize(JNIEnv* env, jclass clazz, jlong vid) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(vid); + return static_cast(vector->size()); + JNI_METHOD_END(-1) +} + +jstring baseVectorGetEncoding(JNIEnv* env, jclass clazz, jlong vid) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(vid); + auto name = VectorEncoding::mapSimpleToName(vector->encoding()); + return env->NewStringUTF(name.data()); + JNI_METHOD_END(nullptr) +} + +void baseVectorAppend(JNIEnv* env, jclass clazz, jlong vid, jlong toAppendVid) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(vid); + auto toAppend = ObjectStore::retrieve(toAppendVid); + vector->append(toAppend.get()); + JNI_METHOD_END() +} + +jboolean +selectivityVectorIsValid(JNIEnv* env, jclass clazz, jlong svId, jint idx) { + JNI_METHOD_START + auto vector = ObjectStore::retrieve(svId); + auto valid = vector->isValid(static_cast(idx)); + return static_cast(valid); + JNI_METHOD_END(false) +} + jlong createEmptyBaseVector(JNIEnv* env, jobject javaThis, jstring typeJson) { JNI_METHOD_START auto session = sessionOf(env, javaThis); @@ -466,6 +723,15 @@ jlong createSelectivityVector(JNIEnv* env, jobject javaThis, jint length) { JNI_METHOD_END(-1) } +jstring tableWriteTraitsOutputType(JNIEnv* env, jclass clazz) { + JNI_METHOD_START + auto type = exec::TableWriteTraits::outputType(std::nullopt); + auto serializedDynamic = type->serialize(); + auto typeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(typeJson.data()); + JNI_METHOD_END(nullptr) +} + jstring tableWriteTraitsOutputTypeFromColumnStatsSpec( JNIEnv* env, jobject javaThis, @@ -484,6 +750,30 @@ jstring tableWriteTraitsOutputTypeFromColumnStatsSpec( JNI_METHOD_END(nullptr) } +jstring planNodeToString( + JNIEnv* env, + jclass clazz, + jstring planNodeJson, + jboolean detailed, + jboolean recursive) { + JNI_METHOD_START + spotify::jni::JavaString jJson{env, planNodeJson}; + auto dynamic = folly::parseJson(jJson.get()); + auto planNode = ISerializable::deserialize(dynamic); + auto str = planNode->toString(detailed, recursive); + return env->NewStringUTF(str.data()); + JNI_METHOD_END(nullptr) +} + +jstring iSerializableAsJava(JNIEnv* env, jclass clazz, jlong id) { + JNI_METHOD_START + auto iSerializable = ObjectStore::retrieve(id); + auto serializedDynamic = iSerializable->serialize(); + auto serializeJson = folly::toPrettyJson(serializedDynamic); + return env->NewStringUTF(serializeJson.data()); + JNI_METHOD_END(nullptr) +} + jlong iSerializableAsCpp(JNIEnv* env, jobject javaThis, jstring json) { JNI_METHOD_START auto session = sessionOf(env, javaThis); @@ -497,35 +787,6 @@ jlong iSerializableAsCpp(JNIEnv* env, jobject javaThis, jstring json) { JNI_METHOD_END(-1) } -jlong variantAsCpp(JNIEnv* env, jobject javaThis, jstring json) { - JNI_METHOD_START - auto session = sessionOf(env, javaThis); - spotify::jni::JavaString jJson{env, json}; - auto dynamic = folly::parseJson(jJson.get()); - auto deserialized = variant::create(dynamic); - return session->objectStore()->save(std::make_shared(deserialized)); - JNI_METHOD_END(-1) -} - -jlong variantToVector( - JNIEnv* env, - jobject javaThis, - jstring typeJson, - jstring variantJson) { - JNI_METHOD_START - auto session = sessionOf(env, javaThis); - auto vectorPool = session->memoryManager()->getVeloxPool( - "BaseVector Memory Pool", memory::MemoryPool::Kind::kLeaf); - spotify::jni::JavaString jTypeJson{env, typeJson}; - spotify::jni::JavaString jVariantJson{env, variantJson}; - auto type = Type::create(folly::parseJson(jTypeJson.get())); - auto variant = variant::create(folly::parseJson(jVariantJson.get())); - auto variantVector = - facebook::velox::variantToVector(type, variant, vectorPool); - return session->objectStore()->save(variantVector); - JNI_METHOD_END(-1) -} - class ExternalStreamAsUpIterator : public UpIterator { public: explicit ExternalStreamAsUpIterator(const std::shared_ptr& es) @@ -590,7 +851,26 @@ const char* JniWrapper::getCanonicalName() const { void JniWrapper::initialize(JNIEnv* env) { JavaClass::setClass(env); + // Caches the sessionId Java method. cacheMethod(env, "sessionId", kTypeLong, nullptr); + + // All native method definitions. + addNativeMethod( + "initialize", (void*)initialize0, kTypeVoid, kTypeString, nullptr); + addNativeMethod( + "createMemoryManager", + (void*)createMemoryManager, + kTypeLong, + "org/boostscale/velox4j/memory/AllocationListener", + nullptr); + addNativeMethod( + "createSession", (void*)createSession, kTypeLong, kTypeLong, nullptr); + addNativeMethod( + "releaseCppObject", + (void*)releaseCppObject, + kTypeVoid, + kTypeLong, + nullptr); addNativeMethod( "createEvaluator", (void*)createEvaluator, @@ -617,8 +897,29 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeLong, kTypeLong, nullptr); + addNativeMethod( + "upIteratorAdvance", + (void*)upIteratorAdvance, + kTypeInt, + kTypeLong, + nullptr); + addNativeMethod( + "upIteratorWait", (void*)upIteratorWait, kTypeVoid, kTypeLong, nullptr); addNativeMethod( "upIteratorGet", (void*)upIteratorGet, kTypeLong, kTypeLong, nullptr); + addNativeMethod( + "blockingQueuePut", + (void*)blockingQueuePut, + kTypeVoid, + kTypeLong, + kTypeLong, + nullptr); + addNativeMethod( + "blockingQueueNoMoreInput", + (void*)blockingQueueNoMoreInput, + kTypeVoid, + kTypeLong, + nullptr); addNativeMethod( "createExternalStreamFromDownIterator", (void*)createExternalStreamFromDownIterator, @@ -628,11 +929,46 @@ void JniWrapper::initialize(JNIEnv* env) { addNativeMethod( "createBlockingQueue", (void*)createBlockingQueue, kTypeLong, nullptr); addNativeMethod( - "createEmptyBaseVector", - (void*)createEmptyBaseVector, + "serialTaskAddSplit", + (void*)serialTaskAddSplit, + kTypeVoid, + kTypeLong, + kTypeString, + kTypeInt, + kTypeString, + nullptr); + addNativeMethod( + "serialTaskNoMoreSplits", + (void*)serialTaskNoMoreSplits, + kTypeVoid, + kTypeLong, + kTypeString, + nullptr); + addNativeMethod( + "serialTaskCollectStats", + (void*)serialTaskCollectStats, + kTypeString, kTypeLong, + nullptr); + addNativeMethod( + "variantInferType", + (void*)variantInferType, + kTypeString, kTypeString, nullptr); + addNativeMethod( + "variantAsJava", (void*)variantAsJava, kTypeString, kTypeLong, nullptr); + addNativeMethod( + "variantAsCpp", (void*)variantAsCpp, kTypeLong, kTypeString, nullptr); + addNativeMethod( + "variantToVector", + (void*)variantToVector, + kTypeLong, + kTypeString, + kTypeString, + nullptr); + addNativeMethod( + "arrowToType", (void*)arrowToType, kTypeString, kTypeLong, nullptr); addNativeMethod( "typeToArrow", (void*)typeToArrow, @@ -640,6 +976,12 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeString, kTypeLong, nullptr); + addNativeMethod( + "createEmptyBaseVector", + (void*)createEmptyBaseVector, + kTypeLong, + kTypeString, + nullptr); addNativeMethod( "arrowToBaseVector", (void*)arrowToBaseVector, @@ -647,6 +989,58 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeLong, kTypeLong, nullptr); + addNativeMethod( + "baseVectorToArrow", + (void*)baseVectorToArrow, + kTypeVoid, + kTypeLong, + kTypeLong, + kTypeLong, + nullptr); + addNativeMethod( + "baseVectorSerialize", + (void*)baseVectorSerialize, + kTypeString, + kTypeArray(kTypeLong), + nullptr); + addNativeMethod( + "baseVectorSerializeToBuf", + (void*)baseVectorSerializeToBuf, + kTypeArray(kTypeByte), + kTypeArray(kTypeLong), + nullptr); + addNativeMethod( + "baseVectorGetType", + (void*)baseVectorGetType, + kTypeString, + kTypeLong, + nullptr); + addNativeMethod( + "baseVectorGetSize", + (void*)baseVectorGetSize, + kTypeInt, + kTypeLong, + nullptr); + addNativeMethod( + "baseVectorGetEncoding", + (void*)baseVectorGetEncoding, + kTypeString, + kTypeLong, + nullptr); + addNativeMethod( + "baseVectorAppend", + (void*)baseVectorAppend, + kTypeVoid, + kTypeLong, + kTypeLong, + nullptr); + addNativeMethod( + "selectivityVectorIsValid", + (void*)selectivityVectorIsValid, + kTypeBool, + kTypeLong, + kTypeInt, + nullptr); addNativeMethod( "baseVectorDeserialize", (void*)baseVectorDeserialize, @@ -718,6 +1112,11 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeLong, kTypeInt, nullptr); + addNativeMethod( + "tableWriteTraitsOutputType", + (void*)tableWriteTraitsOutputType, + kTypeString, + nullptr); addNativeMethod( "tableWriteTraitsOutputTypeFromColumnStatsSpec", (void*)tableWriteTraitsOutputTypeFromColumnStatsSpec, @@ -725,19 +1124,24 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeString, nullptr); addNativeMethod( - "iSerializableAsCpp", - (void*)iSerializableAsCpp, - kTypeLong, + "planNodeToString", + (void*)planNodeToString, + kTypeString, kTypeString, + kTypeBool, + kTypeBool, nullptr); addNativeMethod( - "variantAsCpp", (void*)variantAsCpp, kTypeLong, kTypeString, nullptr); + "iSerializableAsJava", + (void*)iSerializableAsJava, + kTypeString, + kTypeLong, + nullptr); addNativeMethod( - "variantToVector", - (void*)variantToVector, + "iSerializableAsCpp", + (void*)iSerializableAsCpp, kTypeLong, kTypeString, - kTypeString, nullptr); addNativeMethod( "createUpIteratorWithExternalStream", @@ -746,6 +1150,7 @@ void JniWrapper::initialize(JNIEnv* env) { kTypeLong, nullptr); + // Registers all native methods. registerNativeMethods(env); } diff --git a/src/main/cpp/main/velox4j/jni/StaticJniWrapper.cc b/src/main/cpp/main/velox4j/jni/StaticJniWrapper.cc deleted file mode 100644 index d6d3a7b315..0000000000 --- a/src/main/cpp/main/velox4j/jni/StaticJniWrapper.cc +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox4j/jni/StaticJniWrapper.h" -#include -#include -#include -#include -#include - -#include "velox4j/arrow/Arrow.h" -#include "velox4j/config/Config.h" -#include "velox4j/init/Init.h" -#include "velox4j/iterator/BlockingQueue.h" -#include "velox4j/iterator/UpIterator.h" -#include "velox4j/jni/JniCommon.h" -#include "velox4j/jni/JniError.h" -#include "velox4j/lifecycle/Session.h" -#include "velox4j/memory/JavaAllocationListener.h" -#include "velox4j/query/QueryExecutor.h" - -namespace velox4j { -using namespace facebook::velox; - -namespace { -const char* kClassName = "org/boostscale/velox4j/jni/StaticJniWrapper"; - -void initialize0(JNIEnv* env, jobject javaThis, jstring globalConfJson) { - JNI_METHOD_START - spotify::jni::JavaString jGlobalConfJson{env, globalConfJson}; - auto dynamic = folly::parseJson(jGlobalConfJson.get()); - auto confArray = ConfigArray::create(dynamic); - initialize(confArray); - JNI_METHOD_END() -} - -jlong createMemoryManager(JNIEnv* env, jobject javaThis, jobject jListener) { - JNI_METHOD_START - auto listener = std::make_unique( - std::make_unique(env, jListener), 8 << 10 << 10); - auto mm = std::make_shared(std::move(listener)); - return ObjectStore::global()->save(mm); - JNI_METHOD_END(-1L) -} - -jlong createSession(JNIEnv* env, jobject javaThis, long memoryManagerId) { - JNI_METHOD_START - auto mm = ObjectStore::retrieve(memoryManagerId); - return ObjectStore::global()->save(std::make_shared(mm.get())); - JNI_METHOD_END(-1L) -} - -void releaseCppObject(JNIEnv* env, jobject javaThis, jlong objId) { - JNI_METHOD_START - ObjectStore::release(objId); - JNI_METHOD_END() -} - -jint upIteratorAdvance(JNIEnv* env, jobject javaThis, jlong itrId) { - JNI_METHOD_START - auto itr = ObjectStore::retrieve(itrId); - return static_cast(itr->advance()); - JNI_METHOD_END(-1) -} - -void upIteratorWait(JNIEnv* env, jobject javaThis, jlong itrId) { - JNI_METHOD_START - auto itr = ObjectStore::retrieve(itrId); - itr->wait(); - JNI_METHOD_END() -} - -void blockingQueuePut( - JNIEnv* env, - jobject javaThis, - jlong queueId, - jlong rvId) { - JNI_METHOD_START - auto queue = ObjectStore::retrieve(queueId); - auto rv = ObjectStore::retrieve(rvId); - queue->put(rv); - JNI_METHOD_END() -} - -void blockingQueueNoMoreInput(JNIEnv* env, jobject javaThis, jlong queueId) { - JNI_METHOD_START - auto queue = ObjectStore::retrieve(queueId); - queue->noMoreInput(); - JNI_METHOD_END() -} - -void serialTaskAddSplit( - JNIEnv* env, - jobject javaThis, - jlong stId, - jstring planNodeId, - jint groupId, - jstring connectorSplitJson) { - JNI_METHOD_START - auto serialTask = ObjectStore::retrieve(stId); - spotify::jni::JavaString jPlanNodeId{env, planNodeId}; - spotify::jni::JavaString jConnectorSplitJson{env, connectorSplitJson}; - auto jConnectorSplitDynamic = folly::parseJson(jConnectorSplitJson.get()); - auto connectorSplit = std::const_pointer_cast( - ISerializable::deserialize( - jConnectorSplitDynamic)); - serialTask->addSplit(jPlanNodeId.get(), groupId, connectorSplit); - JNI_METHOD_END() -} - -void serialTaskNoMoreSplits( - JNIEnv* env, - jobject javaThis, - jlong stId, - jstring planNodeId) { - JNI_METHOD_START - auto serialTask = ObjectStore::retrieve(stId); - spotify::jni::JavaString jPlanNodeId{env, planNodeId}; - serialTask->noMoreSplits(jPlanNodeId.get()); - JNI_METHOD_END() -} - -jstring serialTaskCollectStats(JNIEnv* env, jobject javaThis, jlong stId) { - JNI_METHOD_START - auto serialTask = ObjectStore::retrieve(stId); - const auto stats = serialTask->collectStats(); - const auto statsDynamic = stats->toJson(); - const auto statsJson = folly::toPrettyJson(statsDynamic); - return env->NewStringUTF(statsJson.data()); - JNI_METHOD_END(nullptr) -} - -jstring variantInferType(JNIEnv* env, jobject javaThis, jstring json) { - JNI_METHOD_START - spotify::jni::JavaString jJson{env, json}; - auto dynamic = folly::parseJson(jJson.get()); - auto deserialized = variant::create(dynamic); - auto type = deserialized.inferType(); - auto serializedDynamic = type->serialize(); - auto typeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(typeJson.data()); - JNI_METHOD_END(nullptr); -} - -jstring arrowToType(JNIEnv* env, jobject javaThis, jlong cSchema) { - JNI_METHOD_START - auto type = fromArrowToType(reinterpret_cast(cSchema)); - auto serializedDynamic = type->serialize(); - auto typeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(typeJson.data()); - JNI_METHOD_END(nullptr) -} - -void baseVectorToArrow( - JNIEnv* env, - jobject javaThis, - jlong vid, - jlong cSchema, - jlong cArray) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(vid); - fromBaseVectorToArrow( - vector, - reinterpret_cast(cSchema), - reinterpret_cast(cArray)); - JNI_METHOD_END() -} - -jstring baseVectorSerialize(JNIEnv* env, jobject javaThis, jlongArray vids) { - JNI_METHOD_START - std::ostringstream out; - auto safeArray = getLongArrayElementsSafe(env, vids); - for (int i = 0; i < safeArray.length(); ++i) { - const jlong& vid = safeArray.elems()[i]; - auto vector = ObjectStore::retrieve(vid); - saveVector(*vector, out); - } - auto serializedData = out.str(); - auto encoded = - encoding::Base64::encode(serializedData.data(), serializedData.size()); - return env->NewStringUTF(encoded.data()); - JNI_METHOD_END(nullptr) -} - -// Serialize vectors to a raw byte array (no Base64 encoding). -// More efficient than baseVectorSerialize() for binary transport protocols. -jbyteArray -baseVectorSerializeToBuf(JNIEnv* env, jobject javaThis, jlongArray vids) { - JNI_METHOD_START - std::ostringstream out; - auto safeArray = getLongArrayElementsSafe(env, vids); - for (int i = 0; i < safeArray.length(); ++i) { - const jlong& vid = safeArray.elems()[i]; - auto vector = ObjectStore::retrieve(vid); - saveVector(*vector, out); - } - auto serializedData = out.str(); - jbyteArray byteArray = env->NewByteArray(serializedData.size()); - env->SetByteArrayRegion( - byteArray, - 0, - serializedData.size(), - reinterpret_cast(serializedData.data())); - return byteArray; - JNI_METHOD_END(nullptr) -} - -jstring baseVectorGetType(JNIEnv* env, jobject javaThis, jlong vid) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(vid); - auto serializedDynamic = vector->type()->serialize(); - auto serializeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(serializeJson.data()); - JNI_METHOD_END(nullptr) -} - -jint baseVectorGetSize(JNIEnv* env, jobject javaThis, jlong vid) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(vid); - return static_cast(vector->size()); - JNI_METHOD_END(-1) -} - -jstring baseVectorGetEncoding(JNIEnv* env, jobject javaThis, jlong vid) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(vid); - auto name = VectorEncoding::mapSimpleToName(vector->encoding()); - return env->NewStringUTF(name.data()); - JNI_METHOD_END(nullptr) -} - -void baseVectorAppend( - JNIEnv* env, - jobject javaThis, - jlong vid, - jlong toAppendVid) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(vid); - auto toAppend = ObjectStore::retrieve(toAppendVid); - vector->append(toAppend.get()); - JNI_METHOD_END() -} - -jboolean -selectivityVectorIsValid(JNIEnv* env, jobject javaThis, jlong svId, jint idx) { - JNI_METHOD_START - auto vector = ObjectStore::retrieve(svId); - auto valid = vector->isValid(static_cast(idx)); - return static_cast(valid); - JNI_METHOD_END(false) -} - -jstring planNodeToString( - JNIEnv* env, - jobject javaThis, - jstring planNodeJson, - jboolean detailed, - jboolean recursive) { - JNI_METHOD_START - spotify::jni::JavaString jJson{env, planNodeJson}; - auto dynamic = folly::parseJson(jJson.get()); - auto planNode = ISerializable::deserialize(dynamic); - auto str = planNode->toString(detailed, recursive); - return env->NewStringUTF(str.data()); - JNI_METHOD_END(nullptr) -} - -jstring iSerializableAsJava(JNIEnv* env, jobject javaThis, jlong id) { - JNI_METHOD_START - auto iSerializable = ObjectStore::retrieve(id); - auto serializedDynamic = iSerializable->serialize(); - auto serializeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(serializeJson.data()); - JNI_METHOD_END(nullptr) -} - -jstring variantAsJava(JNIEnv* env, jobject javaThis, jlong id) { - JNI_METHOD_START - auto v = ObjectStore::retrieve(id); - auto serializedDynamic = v->serialize(); - auto serializeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(serializeJson.data()); - JNI_METHOD_END(nullptr) -} - -jstring -deserializeAndSerializeVariant(JNIEnv* env, jobject javaThis, jstring json) { - JNI_METHOD_START - spotify::jni::JavaString jJson{env, json}; - auto dynamic = folly::parseJson(jJson.get()); - auto deserialized = variant::create(dynamic); - auto serializedDynamic = deserialized.serialize(); - auto serializeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(serializeJson.data()); - JNI_METHOD_END(nullptr) -} - -jstring tableWriteTraitsOutputType(JNIEnv* env, jobject javaThis) { - JNI_METHOD_START - auto type = exec::TableWriteTraits::outputType(std::nullopt); - auto serializedDynamic = type->serialize(); - auto typeJson = folly::toPrettyJson(serializedDynamic); - return env->NewStringUTF(typeJson.data()); - JNI_METHOD_END(nullptr) -} - -} // namespace - -const char* StaticJniWrapper::getCanonicalName() const { - return kClassName; -} - -void StaticJniWrapper::initialize(JNIEnv* env) { - JavaClass::setClass(env); - - addNativeMethod( - "initialize", (void*)initialize0, kTypeVoid, kTypeString, nullptr); - addNativeMethod( - "createMemoryManager", - (void*)createMemoryManager, - kTypeLong, - "org/boostscale/velox4j/memory/AllocationListener", - nullptr); - addNativeMethod( - "createSession", (void*)createSession, kTypeLong, kTypeLong, nullptr); - addNativeMethod( - "releaseCppObject", - (void*)releaseCppObject, - kTypeVoid, - kTypeLong, - nullptr); - addNativeMethod( - "upIteratorAdvance", - (void*)upIteratorAdvance, - kTypeInt, - kTypeLong, - nullptr); - addNativeMethod( - "upIteratorWait", (void*)upIteratorWait, kTypeVoid, kTypeLong, nullptr); - addNativeMethod( - "blockingQueuePut", - (void*)blockingQueuePut, - kTypeVoid, - kTypeLong, - kTypeLong, - nullptr); - addNativeMethod( - "blockingQueueNoMoreInput", - (void*)blockingQueueNoMoreInput, - kTypeVoid, - kTypeLong, - nullptr); - addNativeMethod( - "serialTaskAddSplit", - (void*)serialTaskAddSplit, - kTypeVoid, - kTypeLong, - kTypeString, - kTypeInt, - kTypeString, - nullptr); - addNativeMethod( - "serialTaskNoMoreSplits", - (void*)serialTaskNoMoreSplits, - kTypeVoid, - kTypeLong, - kTypeString, - nullptr); - addNativeMethod( - "serialTaskCollectStats", - (void*)serialTaskCollectStats, - kTypeString, - kTypeLong, - nullptr); - addNativeMethod( - "variantInferType", - (void*)variantInferType, - kTypeString, - kTypeString, - nullptr); - addNativeMethod( - "arrowToType", (void*)arrowToType, kTypeString, kTypeLong, nullptr); - addNativeMethod( - "baseVectorToArrow", - (void*)baseVectorToArrow, - kTypeVoid, - kTypeLong, - kTypeLong, - kTypeLong, - nullptr); - addNativeMethod( - "baseVectorSerialize", - (void*)baseVectorSerialize, - kTypeString, - kTypeArray(kTypeLong), - nullptr); - addNativeMethod( - "baseVectorSerializeToBuf", - (void*)baseVectorSerializeToBuf, - kTypeArray(kTypeByte), - kTypeArray(kTypeLong), - nullptr); - addNativeMethod( - "baseVectorGetType", - (void*)baseVectorGetType, - kTypeString, - kTypeLong, - nullptr); - addNativeMethod( - "baseVectorGetSize", - (void*)baseVectorGetSize, - kTypeInt, - kTypeLong, - nullptr); - addNativeMethod( - "baseVectorGetEncoding", - (void*)baseVectorGetEncoding, - kTypeString, - kTypeLong, - nullptr); - addNativeMethod( - "baseVectorAppend", - (void*)baseVectorAppend, - kTypeVoid, - kTypeLong, - kTypeLong, - nullptr); - addNativeMethod( - "selectivityVectorIsValid", - (void*)selectivityVectorIsValid, - kTypeBool, - kTypeLong, - kTypeInt, - nullptr); - addNativeMethod( - "tableWriteTraitsOutputType", - (void*)tableWriteTraitsOutputType, - kTypeString, - nullptr); - addNativeMethod( - "planNodeToString", - (void*)planNodeToString, - kTypeString, - kTypeString, - kTypeBool, - kTypeBool, - nullptr); - addNativeMethod( - "iSerializableAsJava", - (void*)iSerializableAsJava, - kTypeString, - kTypeLong, - nullptr); - addNativeMethod( - "variantAsJava", (void*)variantAsJava, kTypeString, kTypeLong, nullptr); - - registerNativeMethods(env); -} - -void StaticJniWrapper::mapFields() {} -} // namespace velox4j diff --git a/src/main/cpp/main/velox4j/jni/StaticJniWrapper.h b/src/main/cpp/main/velox4j/jni/StaticJniWrapper.h deleted file mode 100644 index 6c2b74a0c6..0000000000 --- a/src/main/cpp/main/velox4j/jni/StaticJniWrapper.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace velox4j { -/// A static JNI wrapper that is independent to any JNI sessions. -/// All the JNI methods defined in the static JNI wrapper are globally -/// available without having to create a session first. -class StaticJniWrapper final : public spotify::jni::JavaClass { - public: - explicit StaticJniWrapper(JNIEnv* env) : JavaClass(env) { - StaticJniWrapper::initialize(env); - } - - const char* getCanonicalName() const override; - - void initialize(JNIEnv* env) override; - - void mapFields() override; -}; -} // namespace velox4j diff --git a/src/main/java/org/boostscale/velox4j/Velox4j.java b/src/main/java/org/boostscale/velox4j/Velox4j.java index b0104e470e..4d1a7c8168 100644 --- a/src/main/java/org/boostscale/velox4j/Velox4j.java +++ b/src/main/java/org/boostscale/velox4j/Velox4j.java @@ -22,9 +22,9 @@ import org.boostscale.velox4j.config.Config; import org.boostscale.velox4j.exception.VeloxException; +import org.boostscale.velox4j.jni.JniApi; import org.boostscale.velox4j.jni.JniLibLoader; import org.boostscale.velox4j.jni.JniWorkspace; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.memory.AllocationListener; import org.boostscale.velox4j.memory.MemoryManager; import org.boostscale.velox4j.serializable.ISerializableRegistry; @@ -79,12 +79,12 @@ public static void initialize() { * listen on all the native memory allocations. */ public static MemoryManager newMemoryManager(AllocationListener listener) { - return StaticJniApi.get().createMemoryManager(listener); + return JniApi.createMemoryManager(listener); } /** Creates a new Velox4J session. */ public static Session newSession(MemoryManager memoryManager) { - return StaticJniApi.get().createSession(memoryManager); + return JniApi.createSession(memoryManager); } private static void initialize0() { @@ -93,7 +93,7 @@ private static void initialize0() { JniLibLoader.loadAll(JniWorkspace.getDefault().getSubDir("lib")); VariantRegistry.registerAll(); ISerializableRegistry.registerAll(); - StaticJniApi.get().initialize(globalConf); + JniApi.initialize(globalConf); } } } diff --git a/src/main/java/org/boostscale/velox4j/arrow/Arrow.java b/src/main/java/org/boostscale/velox4j/arrow/Arrow.java index e58078477e..f1e4b33623 100644 --- a/src/main/java/org/boostscale/velox4j/arrow/Arrow.java +++ b/src/main/java/org/boostscale/velox4j/arrow/Arrow.java @@ -25,7 +25,6 @@ import org.boostscale.velox4j.data.BaseVector; import org.boostscale.velox4j.data.RowVector; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.type.RowType; import org.boostscale.velox4j.type.Type; @@ -47,7 +46,7 @@ public Schema toArrowSchema(BufferAllocator alloc, RowType type) { public static RowType fromArrowSchema(BufferAllocator alloc, Schema schema) { try (final ArrowSchema cSchema = ArrowSchema.allocateNew(alloc)) { Data.exportSchema(alloc, schema, null, cSchema); - final RowType type = (RowType) StaticJniApi.get().arrowToRowType(cSchema); + final RowType type = (RowType) JniApi.arrowToRowType(cSchema); return type; } } @@ -63,7 +62,7 @@ public Field toArrowField(BufferAllocator alloc, Type type) { public static Type fromArrowField(BufferAllocator alloc, Field field) { try (final ArrowSchema cSchema = ArrowSchema.allocateNew(alloc)) { Data.exportField(alloc, field, null, cSchema); - final Type type = StaticJniApi.get().arrowToRowType(cSchema); + final Type type = JniApi.arrowToRowType(cSchema); return type; } } @@ -71,7 +70,7 @@ public static Type fromArrowField(BufferAllocator alloc, Field field) { public static FieldVector toArrowVector(BufferAllocator alloc, BaseVector vector) { try (final ArrowSchema cSchema = ArrowSchema.allocateNew(alloc); final ArrowArray cArray = ArrowArray.allocateNew(alloc)) { - StaticJniApi.get().baseVectorToArrow(vector, cSchema, cArray); + JniApi.baseVectorToArrow(vector, cSchema, cArray); final FieldVector fv = Data.importVector(alloc, cArray, cSchema, null); return fv; } @@ -89,7 +88,7 @@ public BaseVector fromArrowVector(BufferAllocator alloc, FieldVector arrowVector public static VectorSchemaRoot toArrowVectorSchemaRoot(BufferAllocator alloc, RowVector vector) { try (final ArrowSchema cSchema = ArrowSchema.allocateNew(alloc); final ArrowArray cArray = ArrowArray.allocateNew(alloc)) { - StaticJniApi.get().baseVectorToArrow(vector, cSchema, cArray); + JniApi.baseVectorToArrow(vector, cSchema, cArray); final VectorSchemaRoot vsr = Data.importVectorSchemaRoot(alloc, cArray, cSchema, null); return vsr; } diff --git a/src/main/java/org/boostscale/velox4j/connector/ExternalStreams.java b/src/main/java/org/boostscale/velox4j/connector/ExternalStreams.java index 161be6c9f6..2f46031249 100644 --- a/src/main/java/org/boostscale/velox4j/connector/ExternalStreams.java +++ b/src/main/java/org/boostscale/velox4j/connector/ExternalStreams.java @@ -16,7 +16,6 @@ import org.boostscale.velox4j.data.RowVector; import org.boostscale.velox4j.iterator.DownIterator; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; /** A factory for creating {@link ExternalStream} instances. */ public class ExternalStreams { @@ -65,11 +64,11 @@ public long id() { } public void put(RowVector rowVector) { - StaticJniApi.get().blockingQueuePut(this, rowVector); + JniApi.blockingQueuePut(this, rowVector); } public void noMoreInput() { - StaticJniApi.get().blockingQueueNoMoreInput(this); + JniApi.blockingQueueNoMoreInput(this); } } } diff --git a/src/main/java/org/boostscale/velox4j/data/BaseVector.java b/src/main/java/org/boostscale/velox4j/data/BaseVector.java index 62c2cf3ec6..74531c81e8 100644 --- a/src/main/java/org/boostscale/velox4j/data/BaseVector.java +++ b/src/main/java/org/boostscale/velox4j/data/BaseVector.java @@ -22,7 +22,6 @@ import org.boostscale.velox4j.exception.VeloxException; import org.boostscale.velox4j.jni.CppObject; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.type.Type; public class BaseVector implements CppObject { @@ -50,7 +49,7 @@ public long id() { } public Type getType() { - return StaticJniApi.get().baseVectorGetType(this); + return JniApi.baseVectorGetType(this); } public VectorEncoding getEncoding() { @@ -58,7 +57,7 @@ public VectorEncoding getEncoding() { } public int getSize() { - return StaticJniApi.get().baseVectorGetSize(this); + return JniApi.baseVectorGetSize(this); } public BaseVector wrapInConstant(int length, int index) { @@ -70,7 +69,7 @@ public BaseVector slice(int offset, int length) { } public void append(BaseVector toAppend) { - StaticJniApi.get().baseVectorAppend(this, toAppend); + JniApi.baseVectorAppend(this, toAppend); } public BaseVector flattenedVector() { diff --git a/src/main/java/org/boostscale/velox4j/data/BaseVectors.java b/src/main/java/org/boostscale/velox4j/data/BaseVectors.java index 6882e6543c..456a0037fe 100644 --- a/src/main/java/org/boostscale/velox4j/data/BaseVectors.java +++ b/src/main/java/org/boostscale/velox4j/data/BaseVectors.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableList; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.type.Type; public class BaseVectors { @@ -34,7 +33,7 @@ public BaseVector createEmpty(Type type) { } public static String serializeOne(BaseVector vector) { - return StaticJniApi.get().baseVectorSerialize(ImmutableList.of(vector)); + return JniApi.baseVectorSerialize(ImmutableList.of(vector)); } public BaseVector deserializeOne(String serialized) { @@ -45,19 +44,19 @@ public BaseVector deserializeOne(String serialized) { } public static String serializeAll(List vectors) { - return StaticJniApi.get().baseVectorSerialize(vectors); + return JniApi.baseVectorSerialize(vectors); } /** Serialize a single vector to raw binary bytes (no Base64). Efficient for network transport. */ public static byte[] serializeOneToBuf(BaseVector vector) { - return StaticJniApi.get().baseVectorSerializeToBuf(ImmutableList.of(vector)); + return JniApi.baseVectorSerializeToBuf(ImmutableList.of(vector)); } /** * Serialize multiple vectors to raw binary bytes (no Base64). Efficient for network transport. */ public static byte[] serializeAllToBuf(List vectors) { - return StaticJniApi.get().baseVectorSerializeToBuf(vectors); + return JniApi.baseVectorSerializeToBuf(vectors); } public List deserializeAll(String serialized) { diff --git a/src/main/java/org/boostscale/velox4j/data/SelectivityVector.java b/src/main/java/org/boostscale/velox4j/data/SelectivityVector.java index e315b67d1f..deac6b8e55 100644 --- a/src/main/java/org/boostscale/velox4j/data/SelectivityVector.java +++ b/src/main/java/org/boostscale/velox4j/data/SelectivityVector.java @@ -14,7 +14,7 @@ package org.boostscale.velox4j.data; import org.boostscale.velox4j.jni.CppObject; -import org.boostscale.velox4j.jni.StaticJniApi; +import org.boostscale.velox4j.jni.JniApi; public class SelectivityVector implements CppObject { private final long id; @@ -29,6 +29,6 @@ public long id() { } public boolean isValid(int idx) { - return StaticJniApi.get().selectivityVectorIsValid(this, idx); + return JniApi.selectivityVectorIsValid(this, idx); } } diff --git a/src/main/java/org/boostscale/velox4j/iterator/GenericUpIterator.java b/src/main/java/org/boostscale/velox4j/iterator/GenericUpIterator.java index 985ee7d151..48ae60d098 100644 --- a/src/main/java/org/boostscale/velox4j/iterator/GenericUpIterator.java +++ b/src/main/java/org/boostscale/velox4j/iterator/GenericUpIterator.java @@ -15,7 +15,6 @@ import org.boostscale.velox4j.data.RowVector; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; public class GenericUpIterator implements UpIterator { private final JniApi jniApi; @@ -28,12 +27,12 @@ public GenericUpIterator(JniApi jniApi, long id) { @Override public State advance() { - return StaticJniApi.get().upIteratorAdvance(this); + return JniApi.upIteratorAdvance(this); } @Override public void waitFor() { - StaticJniApi.get().upIteratorWait(this); + JniApi.upIteratorWait(this); } @Override diff --git a/src/main/java/org/boostscale/velox4j/jni/CppObject.java b/src/main/java/org/boostscale/velox4j/jni/CppObject.java index 5a0cd576f0..5ccb58fbd5 100644 --- a/src/main/java/org/boostscale/velox4j/jni/CppObject.java +++ b/src/main/java/org/boostscale/velox4j/jni/CppObject.java @@ -27,6 +27,6 @@ public interface CppObject extends AutoCloseable { */ @Override default void close() { - StaticJniApi.get().releaseCppObject(this); + JniApi.releaseCppObject(this); } } diff --git a/src/main/java/org/boostscale/velox4j/jni/JniApi.java b/src/main/java/org/boostscale/velox4j/jni/JniApi.java index 4a71ef08c9..6cd5b307ea 100644 --- a/src/main/java/org/boostscale/velox4j/jni/JniApi.java +++ b/src/main/java/org/boostscale/velox4j/jni/JniApi.java @@ -21,6 +21,8 @@ import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; +import org.boostscale.velox4j.config.Config; +import org.boostscale.velox4j.connector.ConnectorSplit; import org.boostscale.velox4j.connector.ExternalStream; import org.boostscale.velox4j.connector.ExternalStreams; import org.boostscale.velox4j.data.*; @@ -29,11 +31,15 @@ import org.boostscale.velox4j.iterator.DownIterator; import org.boostscale.velox4j.iterator.GenericUpIterator; import org.boostscale.velox4j.iterator.UpIterator; +import org.boostscale.velox4j.memory.AllocationListener; +import org.boostscale.velox4j.memory.MemoryManager; import org.boostscale.velox4j.partition.PartitionFunction; import org.boostscale.velox4j.partition.PartitionFunctionSpec; +import org.boostscale.velox4j.plan.PlanNode; import org.boostscale.velox4j.query.Query; import org.boostscale.velox4j.query.QueryExecutor; import org.boostscale.velox4j.query.SerialTask; +import org.boostscale.velox4j.query.SerialTaskStats; import org.boostscale.velox4j.serde.Serde; import org.boostscale.velox4j.serializable.ISerializable; import org.boostscale.velox4j.serializable.ISerializableCo; @@ -55,6 +61,26 @@ public final class JniApi { this.jni = jni; } + // Global initialization. + public static void initialize(Config globalConf) { + JniWrapper.initialize(Serde.toPrettyJson(globalConf)); + } + + // Memory. + public static MemoryManager createMemoryManager(AllocationListener listener) { + return new MemoryManager(JniWrapper.createMemoryManager(listener)); + } + + // Lifecycle. + public static LocalSession createSession(MemoryManager memoryManager) { + return new LocalSession(JniWrapper.createSession(memoryManager.id())); + } + + public static void releaseCppObject(CppObject obj) { + JniWrapper.releaseCppObject(obj.id()); + } + + // Expression evaluation. public Evaluator createEvaluator(Evaluation evaluation) { final String evalJson = Serde.toPrettyJson(evaluation); return new Evaluator(this, jni.createEvaluator(evalJson)); @@ -64,6 +90,7 @@ public BaseVector evaluatorEval(Evaluator evaluator, SelectivityVector sv, RowVe return baseVectorWrap(jni.evaluatorEval(evaluator.id(), sv.id(), input.id())); } + // Plan execution. public QueryExecutor createQueryExecutor(Query query) { final String queryJson = Serde.toPrettyJson(query); return new QueryExecutor(this, jni.createQueryExecutor(queryJson)); @@ -78,10 +105,28 @@ public SerialTask queryExecutorExecute(QueryExecutor executor) { return new SerialTask(this, jni.queryExecutorExecute(executor.id())); } + // UpIterator. + public static UpIterator.State upIteratorAdvance(UpIterator itr) { + return UpIterator.State.get(JniWrapper.upIteratorAdvance(itr.id())); + } + + public static void upIteratorWait(UpIterator itr) { + JniWrapper.upIteratorWait(itr.id()); + } + public RowVector upIteratorGet(UpIterator itr) { return baseVectorWrap(jni.upIteratorGet(itr.id())).asRowVector(); } + // DownIterator. + public static void blockingQueuePut(ExternalStreams.BlockingQueue queue, RowVector rowVector) { + JniWrapper.blockingQueuePut(queue.id(), rowVector.id()); + } + + public static void blockingQueueNoMoreInput(ExternalStreams.BlockingQueue queue) { + JniWrapper.blockingQueueNoMoreInput(queue.id()); + } + public ExternalStream createExternalStreamFromDownIterator(DownIterator itr) { return new ExternalStreams.GenericExternalStream(jni.createExternalStreamFromDownIterator(itr)); } @@ -90,11 +135,95 @@ public ExternalStreams.BlockingQueue createBlockingQueue() { return new ExternalStreams.BlockingQueue(jni.createBlockingQueue()); } + // SerialTask. + public static void serialTaskAddSplit( + SerialTask serialTask, String planNodeId, int groupId, ConnectorSplit split) { + final String splitJson = Serde.toJson(split); + JniWrapper.serialTaskAddSplit(serialTask.id(), planNodeId, groupId, splitJson); + } + + public static void serialTaskNoMoreSplits(SerialTask serialTask, String planNodeId) { + JniWrapper.serialTaskNoMoreSplits(serialTask.id(), planNodeId); + } + + public static SerialTaskStats serialTaskCollectStats(SerialTask serialTask) { + final String statsJson = JniWrapper.serialTaskCollectStats(serialTask.id()); + return SerialTaskStats.fromJson(statsJson); + } + + // Variant. + public static Type variantInferType(Variant variant) { + final String variantJson = Serde.toJson(variant); + final String typeJson = JniWrapper.variantInferType(variantJson); + return Serde.fromJson(typeJson, Type.class); + } + + public static Variant variantAsJava(VariantCo co) { + final String json = JniWrapper.variantAsJava(co.id()); + return Serde.fromJson(json, Variant.class); + } + + public VariantCo variantAsCpp(Variant variant) { + final String json = Serde.toPrettyJson(variant); + return new VariantCo(jni.variantAsCpp(json)); + } + + public BaseVector variantToVector(Type type, Variant variant) { + final String typeJson = Serde.toJson(type); + final String variantJson = Serde.toPrettyJson(variant); + return baseVectorWrap(jni.variantToVector(typeJson, variantJson)); + } + + // Type. + public static Type arrowToRowType(ArrowSchema schema) { + try { + final String typeJson = JniWrapper.arrowToType(schema.memoryAddress()); + return Serde.fromJson(typeJson, Type.class); + } finally { + schema.close(); + } + } + public void typeToArrow(Type type, ArrowSchema schema) { final String typeJson = Serde.toJson(type); jni.typeToArrow(typeJson, schema.memoryAddress()); } + // BaseVector / RowVector / SelectivityVector. + public static void baseVectorToArrow(BaseVector vector, ArrowSchema schema, ArrowArray array) { + JniWrapper.baseVectorToArrow(vector.id(), schema.memoryAddress(), array.memoryAddress()); + } + + public static String baseVectorSerialize(List vector) { + return JniWrapper.baseVectorSerialize(vector.stream().mapToLong(BaseVector::id).toArray()); + } + + /** Serialize vectors to raw binary bytes (no Base64 encoding). */ + public static byte[] baseVectorSerializeToBuf(List vector) { + return JniWrapper.baseVectorSerializeToBuf(vector.stream().mapToLong(BaseVector::id).toArray()); + } + + public static Type baseVectorGetType(BaseVector vector) { + final String typeJson = JniWrapper.baseVectorGetType(vector.id()); + return Serde.fromJson(typeJson, Type.class); + } + + public static int baseVectorGetSize(BaseVector vector) { + return JniWrapper.baseVectorGetSize(vector.id()); + } + + public static VectorEncoding baseVectorGetEncoding(BaseVector vector) { + return VectorEncoding.valueOf(JniWrapper.baseVectorGetEncoding(vector.id())); + } + + public static void baseVectorAppend(BaseVector vector, BaseVector toAppend) { + JniWrapper.baseVectorAppend(vector.id(), toAppend.id()); + } + + public static boolean selectivityVectorIsValid(SelectivityVector vector, int idx) { + return JniWrapper.selectivityVectorIsValid(vector.id(), idx); + } + public BaseVector createEmptyBaseVector(Type type) { final String typeJson = Serde.toJson(type); return baseVectorWrap(jni.createEmptyBaseVector(typeJson)); @@ -173,6 +302,13 @@ public SelectivityVector createSelectivityVector(int length) { return new SelectivityVector(jni.createSelectivityVector(length)); } + // TableWrite. + public static RowType tableWriteTraitsOutputType() { + final String typeJson = JniWrapper.tableWriteTraitsOutputType(); + final RowType type = Serde.fromJson(typeJson, RowType.class); + return type; + } + public RowType tableWriteTraitsOutputTypeFromColumnStatsSpec(ColumnStatsSpec columnStatsSpec) { final String columnStatsSpecJson = Serde.toJson(columnStatsSpec); final String typeJson = jni.tableWriteTraitsOutputTypeFromColumnStatsSpec(columnStatsSpecJson); @@ -180,20 +316,20 @@ public RowType tableWriteTraitsOutputTypeFromColumnStatsSpec(ColumnStatsSpec col return type; } - public ISerializableCo iSerializableAsCpp(ISerializable iSerializable) { - final String json = Serde.toPrettyJson(iSerializable); - return new ISerializableCo(jni.iSerializableAsCpp(json)); + // PlanNode. + public static String planNodeToString(PlanNode planNode, boolean detailed, boolean recursive) { + return JniWrapper.planNodeToString(Serde.toPrettyJson(planNode), detailed, recursive); } - public VariantCo variantAsCpp(Variant variant) { - final String json = Serde.toPrettyJson(variant); - return new VariantCo(jni.variantAsCpp(json)); + // Serde. + public static ISerializable iSerializableAsJava(ISerializableCo co) { + final String json = JniWrapper.iSerializableAsJava(co.id()); + return Serde.fromJson(json, ISerializable.class); } - public BaseVector variantToVector(Type type, Variant variant) { - final String typeJson = Serde.toJson(type); - final String variantJson = Serde.toPrettyJson(variant); - return baseVectorWrap(jni.variantToVector(typeJson, variantJson)); + public ISerializableCo iSerializableAsCpp(ISerializable iSerializable) { + final String json = Serde.toPrettyJson(iSerializable); + return new ISerializableCo(jni.iSerializableAsCpp(json)); } @VisibleForTesting @@ -202,8 +338,7 @@ public UpIterator createUpIteratorWithExternalStream(ExternalStream es) { } private BaseVector baseVectorWrap(long id) { - final VectorEncoding encoding = - VectorEncoding.valueOf(StaticJniWrapper.get().baseVectorGetEncoding(id)); + final VectorEncoding encoding = VectorEncoding.valueOf(JniWrapper.baseVectorGetEncoding(id)); return BaseVector.wrap(this, id, encoding); } } diff --git a/src/main/java/org/boostscale/velox4j/jni/JniWrapper.java b/src/main/java/org/boostscale/velox4j/jni/JniWrapper.java index 7ede093af3..c0f728d186 100644 --- a/src/main/java/org/boostscale/velox4j/jni/JniWrapper.java +++ b/src/main/java/org/boostscale/velox4j/jni/JniWrapper.java @@ -16,14 +16,9 @@ import com.google.common.annotations.VisibleForTesting; import org.boostscale.velox4j.iterator.DownIterator; +import org.boostscale.velox4j.memory.AllocationListener; -/** - * A dynamic JniWrapper that includes the JNI methods that are session-aware. Which means, the - * sanity of these methods usually rely on certain objects that were stored in the current session. - * For example, an API that turns a Velox vector into another, then returns it to Java - this method - * will read and write objects from and to the current JNI session storage. So the method will be - * defined in the (dynamic) JniWrapper. - */ +/** JNI native method declarations for both global and session-aware APIs. */ final class JniWrapper { private final long sessionId; @@ -36,6 +31,17 @@ public long sessionId() { return sessionId; } + // Global initialization. + static native void initialize(String globalConfJson); + + // For Memory. + static native long createMemoryManager(AllocationListener listener); + + // For Lifecycle. + static native long createSession(long memoryManagerId); + + static native void releaseCppObject(long objectId); + // Expression evaluation. native long createEvaluator(String evalJson); @@ -47,17 +53,61 @@ public long sessionId() { native long queryExecutorExecute(long id); // For UpIterator. + static native int upIteratorAdvance(long id); + + static native void upIteratorWait(long id); + native long upIteratorGet(long id); // For DownIterator. + static native void blockingQueuePut(long id, long rvId); + + static native void blockingQueueNoMoreInput(long id); + native long createExternalStreamFromDownIterator(DownIterator itr); native long createBlockingQueue(); + // For SerialTask. + static native void serialTaskAddSplit( + long id, String planNodeId, int groupId, String connectorSplitJson); + + static native void serialTaskNoMoreSplits(long id, String planNodeId); + + static native String serialTaskCollectStats(long id); + + // For Variant. + static native String variantInferType(String json); + + static native String variantAsJava(long id); + + native long variantAsCpp(String json); + + native long variantToVector(String typeJson, String variantJson); + // For Type. + static native String arrowToType(long cSchema); + native void typeToArrow(String typeJson, long cSchema); // For BaseVector / RowVector / SelectivityVector. + static native void baseVectorToArrow(long rvid, long cSchema, long cArray); + + static native String baseVectorSerialize(long[] id); + + // Serialize vectors to raw binary (no Base64 encoding). More efficient for network transport. + static native byte[] baseVectorSerializeToBuf(long[] id); + + static native String baseVectorGetType(long id); + + static native int baseVectorGetSize(long id); + + static native String baseVectorGetEncoding(long id); + + static native void baseVectorAppend(long id, long toAppendId); + + static native boolean selectivityVectorIsValid(long id, int idx); + native long createEmptyBaseVector(String typeJson); native long arrowToBaseVector(long cSchema, long cArray); @@ -84,14 +134,17 @@ public long sessionId() { native long createSelectivityVector(int length); // For TableWrite. + static native String tableWriteTraitsOutputType(); + native String tableWriteTraitsOutputTypeFromColumnStatsSpec(String columnStatsSpecJson); - // For serde. - native long iSerializableAsCpp(String json); + // For PlanNode. + static native String planNodeToString(String planNodeJson, boolean detailed, boolean recursive); - native long variantAsCpp(String json); + // For serde. + static native String iSerializableAsJava(long id); - native long variantToVector(String typeJson, String variantJson); + native long iSerializableAsCpp(String json); @VisibleForTesting native long createUpIteratorWithExternalStream(long id); diff --git a/src/main/java/org/boostscale/velox4j/jni/StaticJniApi.java b/src/main/java/org/boostscale/velox4j/jni/StaticJniApi.java deleted file mode 100644 index 59c17f7115..0000000000 --- a/src/main/java/org/boostscale/velox4j/jni/StaticJniApi.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.boostscale.velox4j.jni; - -import java.util.List; - -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.ArrowSchema; - -import org.boostscale.velox4j.config.Config; -import org.boostscale.velox4j.connector.ConnectorSplit; -import org.boostscale.velox4j.connector.ExternalStreams; -import org.boostscale.velox4j.data.BaseVector; -import org.boostscale.velox4j.data.RowVector; -import org.boostscale.velox4j.data.SelectivityVector; -import org.boostscale.velox4j.data.VectorEncoding; -import org.boostscale.velox4j.iterator.UpIterator; -import org.boostscale.velox4j.memory.AllocationListener; -import org.boostscale.velox4j.memory.MemoryManager; -import org.boostscale.velox4j.plan.PlanNode; -import org.boostscale.velox4j.query.SerialTask; -import org.boostscale.velox4j.query.SerialTaskStats; -import org.boostscale.velox4j.serde.Serde; -import org.boostscale.velox4j.serializable.ISerializable; -import org.boostscale.velox4j.serializable.ISerializableCo; -import org.boostscale.velox4j.type.RowType; -import org.boostscale.velox4j.type.Type; -import org.boostscale.velox4j.variant.Variant; -import org.boostscale.velox4j.variant.VariantCo; - -/** The higher-level JNI-based API over {@link StaticJniWrapper}. */ -public class StaticJniApi { - private static final StaticJniApi INSTANCE = new StaticJniApi(); - - public static StaticJniApi get() { - return INSTANCE; - } - - private final StaticJniWrapper jni = StaticJniWrapper.get(); - - private StaticJniApi() {} - - public void initialize(Config globalConf) { - jni.initialize(Serde.toPrettyJson(globalConf)); - } - - public MemoryManager createMemoryManager(AllocationListener listener) { - return new MemoryManager(jni.createMemoryManager(listener)); - } - - public LocalSession createSession(MemoryManager memoryManager) { - return new LocalSession(jni.createSession(memoryManager.id())); - } - - public void releaseCppObject(CppObject obj) { - jni.releaseCppObject(obj.id()); - } - - public UpIterator.State upIteratorAdvance(UpIterator itr) { - return UpIterator.State.get(jni.upIteratorAdvance(itr.id())); - } - - public void upIteratorWait(UpIterator itr) { - jni.upIteratorWait(itr.id()); - } - - public void blockingQueuePut(ExternalStreams.BlockingQueue queue, RowVector rowVector) { - jni.blockingQueuePut(queue.id(), rowVector.id()); - } - - public void blockingQueueNoMoreInput(ExternalStreams.BlockingQueue queue) { - jni.blockingQueueNoMoreInput(queue.id()); - } - - public void serialTaskAddSplit( - SerialTask serialTask, String planNodeId, int groupId, ConnectorSplit split) { - final String splitJson = Serde.toJson(split); - jni.serialTaskAddSplit(serialTask.id(), planNodeId, groupId, splitJson); - } - - public void serialTaskNoMoreSplits(SerialTask serialTask, String planNodeId) { - jni.serialTaskNoMoreSplits(serialTask.id(), planNodeId); - } - - public SerialTaskStats serialTaskCollectStats(SerialTask serialTask) { - final String statsJson = jni.serialTaskCollectStats(serialTask.id()); - return SerialTaskStats.fromJson(statsJson); - } - - public Type variantInferType(Variant variant) { - final String variantJson = Serde.toJson(variant); - final String typeJson = jni.variantInferType(variantJson); - return Serde.fromJson(typeJson, Type.class); - } - - public Type arrowToRowType(ArrowSchema schema) { - try { - final String typeJson = jni.arrowToType(schema.memoryAddress()); - return Serde.fromJson(typeJson, Type.class); - } finally { - schema.close(); - } - } - - public void baseVectorToArrow(BaseVector vector, ArrowSchema schema, ArrowArray array) { - jni.baseVectorToArrow(vector.id(), schema.memoryAddress(), array.memoryAddress()); - } - - public String baseVectorSerialize(List vector) { - return jni.baseVectorSerialize(vector.stream().mapToLong(BaseVector::id).toArray()); - } - - /** Serialize vectors to raw binary bytes (no Base64 encoding). */ - public byte[] baseVectorSerializeToBuf(List vector) { - return jni.baseVectorSerializeToBuf(vector.stream().mapToLong(BaseVector::id).toArray()); - } - - public Type baseVectorGetType(BaseVector vector) { - final String typeJson = jni.baseVectorGetType(vector.id()); - return Serde.fromJson(typeJson, Type.class); - } - - public int baseVectorGetSize(BaseVector vector) { - return jni.baseVectorGetSize(vector.id()); - } - - public VectorEncoding baseVectorGetEncoding(BaseVector vector) { - return VectorEncoding.valueOf(jni.baseVectorGetEncoding(vector.id())); - } - - public void baseVectorAppend(BaseVector vector, BaseVector toAppend) { - jni.baseVectorAppend(vector.id(), toAppend.id()); - } - - public boolean selectivityVectorIsValid(SelectivityVector vector, int idx) { - return jni.selectivityVectorIsValid(vector.id(), idx); - } - - public RowType tableWriteTraitsOutputType() { - final String typeJson = jni.tableWriteTraitsOutputType(); - final RowType type = Serde.fromJson(typeJson, RowType.class); - return type; - } - - public String planNodeToString(PlanNode planNode, boolean detailed, boolean recursive) { - return jni.planNodeToString(Serde.toPrettyJson(planNode), detailed, recursive); - } - - public ISerializable iSerializableAsJava(ISerializableCo co) { - final String json = jni.iSerializableAsJava(co.id()); - return Serde.fromJson(json, ISerializable.class); - } - - public Variant variantAsJava(VariantCo co) { - final String json = jni.variantAsJava(co.id()); - return Serde.fromJson(json, Variant.class); - } -} diff --git a/src/main/java/org/boostscale/velox4j/jni/StaticJniWrapper.java b/src/main/java/org/boostscale/velox4j/jni/StaticJniWrapper.java deleted file mode 100644 index 2b293d38df..0000000000 --- a/src/main/java/org/boostscale/velox4j/jni/StaticJniWrapper.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.boostscale.velox4j.jni; - -import org.boostscale.velox4j.memory.AllocationListener; - -/** - * A static JNI wrapper that is independent to any JNI sessions. All the JNI methods defined in the - * static JNI wrapper are globally available without having to create a session first. - */ -public class StaticJniWrapper { - private static final StaticJniWrapper INSTANCE = new StaticJniWrapper(); - - static StaticJniWrapper get() { - return INSTANCE; - } - - private StaticJniWrapper() {} - - // Global initialization. - native void initialize(String globalConfJson); - - // Memory. - native long createMemoryManager(AllocationListener listener); - - // Lifecycle. - native long createSession(long memoryManagerId); - - native void releaseCppObject(long objectId); - - // For UpIterator. - native int upIteratorAdvance(long id); - - native void upIteratorWait(long id); - - // For DownIterator. - native void blockingQueuePut(long id, long rvId); - - native void blockingQueueNoMoreInput(long id); - - // For SerialTask. - native void serialTaskAddSplit( - long id, String planNodeId, int groupId, String connectorSplitJson); - - native void serialTaskNoMoreSplits(long id, String planNodeId); - - native String serialTaskCollectStats(long id); - - // For Variant. - native String variantInferType(String json); - - // For Type. - native String arrowToType(long cSchema); - - // For BaseVector / RowVector / SelectivityVector. - native void baseVectorToArrow(long rvid, long cSchema, long cArray); - - native String baseVectorSerialize(long[] id); - - // Serialize vectors to raw binary (no Base64 encoding). More efficient for network transport. - native byte[] baseVectorSerializeToBuf(long[] id); - - native String baseVectorGetType(long id); - - native int baseVectorGetSize(long id); - - native String baseVectorGetEncoding(long id); - - native void baseVectorAppend(long id, long toAppendId); - - native boolean selectivityVectorIsValid(long id, int idx); - - // For TableWrite. - native String tableWriteTraitsOutputType(); - - // For PlanNode. - native String planNodeToString(String planNodeJson, boolean detailed, boolean recursive); - - // For serde. - native String iSerializableAsJava(long id); - - native String variantAsJava(long id); -} diff --git a/src/main/java/org/boostscale/velox4j/plan/PlanNode.java b/src/main/java/org/boostscale/velox4j/plan/PlanNode.java index c731bd097d..b95559c754 100644 --- a/src/main/java/org/boostscale/velox4j/plan/PlanNode.java +++ b/src/main/java/org/boostscale/velox4j/plan/PlanNode.java @@ -18,7 +18,7 @@ import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonInclude; -import org.boostscale.velox4j.jni.StaticJniApi; +import org.boostscale.velox4j.jni.JniApi; import org.boostscale.velox4j.serializable.ISerializable; public abstract class PlanNode extends ISerializable { @@ -45,6 +45,6 @@ public String getId() { * @param recursive if true, includes the entire subtree; otherwise just this node */ public String toFormatString(boolean detailed, boolean recursive) { - return StaticJniApi.get().planNodeToString(this, detailed, recursive); + return JniApi.planNodeToString(this, detailed, recursive); } } diff --git a/src/main/java/org/boostscale/velox4j/query/SerialTask.java b/src/main/java/org/boostscale/velox4j/query/SerialTask.java index 55605e121f..559c2ac22a 100644 --- a/src/main/java/org/boostscale/velox4j/query/SerialTask.java +++ b/src/main/java/org/boostscale/velox4j/query/SerialTask.java @@ -17,7 +17,6 @@ import org.boostscale.velox4j.data.RowVector; import org.boostscale.velox4j.iterator.UpIterator; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; /** * An up-iterator implementation that is backed by a Velox task that runs in serial execution mode. @@ -33,12 +32,12 @@ public SerialTask(JniApi jniApi, long id) { @Override public State advance() { - return StaticJniApi.get().upIteratorAdvance(this); + return JniApi.upIteratorAdvance(this); } @Override public void waitFor() { - StaticJniApi.get().upIteratorWait(this); + JniApi.upIteratorWait(this); } @Override @@ -52,14 +51,14 @@ public long id() { } public void addSplit(String planNodeId, ConnectorSplit split) { - StaticJniApi.get().serialTaskAddSplit(this, planNodeId, -1, split); + JniApi.serialTaskAddSplit(this, planNodeId, -1, split); } public void noMoreSplits(String planNodeId) { - StaticJniApi.get().serialTaskNoMoreSplits(this, planNodeId); + JniApi.serialTaskNoMoreSplits(this, planNodeId); } public SerialTaskStats collectStats() { - return StaticJniApi.get().serialTaskCollectStats(this); + return JniApi.serialTaskCollectStats(this); } } diff --git a/src/main/java/org/boostscale/velox4j/serializable/ISerializableCo.java b/src/main/java/org/boostscale/velox4j/serializable/ISerializableCo.java index db2786b1ad..d0b492784a 100644 --- a/src/main/java/org/boostscale/velox4j/serializable/ISerializableCo.java +++ b/src/main/java/org/boostscale/velox4j/serializable/ISerializableCo.java @@ -14,7 +14,7 @@ package org.boostscale.velox4j.serializable; import org.boostscale.velox4j.jni.CppObject; -import org.boostscale.velox4j.jni.StaticJniApi; +import org.boostscale.velox4j.jni.JniApi; /** Binds a CPP ISerializable object. */ public class ISerializableCo implements CppObject { @@ -30,6 +30,6 @@ public long id() { } public ISerializable asJava() { - return StaticJniApi.get().iSerializableAsJava(this); + return JniApi.iSerializableAsJava(this); } } diff --git a/src/main/java/org/boostscale/velox4j/variant/VariantCo.java b/src/main/java/org/boostscale/velox4j/variant/VariantCo.java index c625a73726..a28fcf367a 100644 --- a/src/main/java/org/boostscale/velox4j/variant/VariantCo.java +++ b/src/main/java/org/boostscale/velox4j/variant/VariantCo.java @@ -14,7 +14,7 @@ package org.boostscale.velox4j.variant; import org.boostscale.velox4j.jni.CppObject; -import org.boostscale.velox4j.jni.StaticJniApi; +import org.boostscale.velox4j.jni.JniApi; /** Binds a CPP variant object. */ public class VariantCo implements CppObject { @@ -30,6 +30,6 @@ public long id() { } public Variant asJava() { - return StaticJniApi.get().variantAsJava(this); + return JniApi.variantAsJava(this); } } diff --git a/src/main/java/org/boostscale/velox4j/variant/Variants.java b/src/main/java/org/boostscale/velox4j/variant/Variants.java index 449e5cc94e..86bd2904c0 100644 --- a/src/main/java/org/boostscale/velox4j/variant/Variants.java +++ b/src/main/java/org/boostscale/velox4j/variant/Variants.java @@ -17,7 +17,6 @@ import org.boostscale.velox4j.data.BaseVector; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.type.Type; public class Variants { @@ -36,7 +35,7 @@ public BaseVector toVector(Type type, Variant variant) { } public Type inferType(Variant variant) { - return StaticJniApi.get().variantInferType(variant); + return JniApi.variantInferType(variant); } public static void checkSameType(List variants) { diff --git a/src/main/java/org/boostscale/velox4j/write/TableWriteTraits.java b/src/main/java/org/boostscale/velox4j/write/TableWriteTraits.java index 2fdf053338..671882f6f5 100644 --- a/src/main/java/org/boostscale/velox4j/write/TableWriteTraits.java +++ b/src/main/java/org/boostscale/velox4j/write/TableWriteTraits.java @@ -14,7 +14,6 @@ package org.boostscale.velox4j.write; import org.boostscale.velox4j.jni.JniApi; -import org.boostscale.velox4j.jni.StaticJniApi; import org.boostscale.velox4j.type.RowType; public class TableWriteTraits { @@ -25,7 +24,7 @@ public TableWriteTraits(JniApi jniApi) { } public static RowType outputType() { - return StaticJniApi.get().tableWriteTraitsOutputType(); + return JniApi.tableWriteTraitsOutputType(); } public RowType outputType(ColumnStatsSpec columnStatsSpec) { diff --git a/src/test/java/org/boostscale/velox4j/jni/JniApiTest.java b/src/test/java/org/boostscale/velox4j/jni/JniApiTest.java index e623f2b6c2..34849dd1d4 100644 --- a/src/test/java/org/boostscale/velox4j/jni/JniApiTest.java +++ b/src/test/java/org/boostscale/velox4j/jni/JniApiTest.java @@ -61,7 +61,7 @@ public static void beforeClass() throws Exception { Velox4jTests.ensureInitialized(); arrowAlloc = new RootAllocator(Long.MAX_VALUE); allocationListener = new BytesAllocationListener(); - memoryManager = StaticJniApi.get().createMemoryManager(allocationListener); + memoryManager = JniApi.createMemoryManager(allocationListener); } @AfterClass @@ -140,10 +140,10 @@ public void testExecuteQueryTwice() { public void testVectorSerdeEmpty() { final LocalSession session = createLocalSession(memoryManager); final JniApi jniApi = getJniApi(session); - final String serialized = StaticJniApi.get().baseVectorSerialize(Collections.emptyList()); + final String serialized = JniApi.baseVectorSerialize(Collections.emptyList()); final List deserialized = jniApi.baseVectorDeserialize(serialized); Assert.assertTrue(deserialized.isEmpty()); - final String serializedSecond = StaticJniApi.get().baseVectorSerialize(deserialized); + final String serializedSecond = JniApi.baseVectorSerialize(deserialized); Assert.assertEquals(serialized, serializedSecond); session.close(); ; @@ -158,7 +158,7 @@ public void testVectorSerdeSingle() { final UpIterator itr = queryExecutor.execute(); final RowVector vector = UpIteratorTests.collectSingleVector(itr); final List vectors = ImmutableList.of(vector); - final String serialized = StaticJniApi.get().baseVectorSerialize(vectors); + final String serialized = JniApi.baseVectorSerialize(vectors); final List deserialized = jniApi.baseVectorDeserialize(serialized); BaseVectorTests.assertEquals(vectors, deserialized); session.close(); @@ -174,7 +174,7 @@ public void testVectorSerdeMultiple() { final UpIterator itr = queryExecutor.execute(); final RowVector vector = UpIteratorTests.collectSingleVector(itr); final List vectors = ImmutableList.of(vector, vector); - final String serialized = StaticJniApi.get().baseVectorSerialize(vectors); + final String serialized = JniApi.baseVectorSerialize(vectors); final List deserialized = jniApi.baseVectorDeserialize(serialized); BaseVectorTests.assertEquals(vectors, deserialized); session.close(); @@ -197,11 +197,9 @@ public void testArrowRoundTrip() { @Test public void testVariantInferType() { - Assert.assertTrue( - StaticJniApi.get().variantInferType(new IntegerValue(5)) instanceof IntegerType); - Assert.assertTrue(StaticJniApi.get().variantInferType(new RealValue(4.6f)) instanceof RealType); - Assert.assertTrue( - StaticJniApi.get().variantInferType(new DoubleValue(4.6d)) instanceof DoubleType); + Assert.assertTrue(JniApi.variantInferType(new IntegerValue(5)) instanceof IntegerType); + Assert.assertTrue(JniApi.variantInferType(new RealValue(4.6f)) instanceof RealType); + Assert.assertTrue(JniApi.variantInferType(new DoubleValue(4.6d)) instanceof DoubleType); } @Test diff --git a/src/test/java/org/boostscale/velox4j/jni/JniApiTests.java b/src/test/java/org/boostscale/velox4j/jni/JniApiTests.java index fed0a481bb..023b66f53f 100644 --- a/src/test/java/org/boostscale/velox4j/jni/JniApiTests.java +++ b/src/test/java/org/boostscale/velox4j/jni/JniApiTests.java @@ -19,7 +19,7 @@ public final class JniApiTests { private JniApiTests() {} public static LocalSession createLocalSession(MemoryManager memoryManager) { - return StaticJniApi.get().createSession(memoryManager); + return JniApi.createSession(memoryManager); } public static JniApi getJniApi(LocalSession session) {