diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt
index 7cbe2e7a2a65..0ebedc55551c 100644
--- a/programs/CMakeLists.txt
+++ b/programs/CMakeLists.txt
@@ -13,6 +13,7 @@ option (ENABLE_CLICKHOUSE_COPIER "Enable clickhouse-copier" ${ENABLE_CLICKHOUSE_
 option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ALL})
 option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL})
 option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL})
+option (ENABLE_CLICKHOUSE_UDF_MANAGER "Enable clickhouse-udf-manager" ${ENABLE_CLICKHOUSE_ALL})
 
 if(NOT (MAKE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES))
     set(CLICKHOUSE_ONE_SHARED 1)
@@ -80,21 +81,22 @@ add_subdirectory (compressor)
 add_subdirectory (copier)
 add_subdirectory (format)
 add_subdirectory (obfuscator)
+add_subdirectory (UserDefinedFunctionsManager)
 
 if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
     add_subdirectory (odbc-bridge)
 endif ()
 
 if (CLICKHOUSE_ONE_SHARED)
-    add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
-    target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK})
-    target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE})
+    add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_UDF_MANAGER_SOURCES})
+    target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_UDF_MANAGER_LINK})
+    target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_UDF_MANAGER_INCLUDE})
     set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "")
     install (TARGETS clickhouse-lib LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT clickhouse)
 endif()
 
 if (CLICKHOUSE_SPLIT_BINARY)
-    set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-obfuscator clickhouse-copier)
+    set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-obfuscator clickhouse-copier clickhouse-udf-manager)
 
     if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
         list
(APPEND CLICKHOUSE_ALL_TARGETS clickhouse-odbc-bridge)
@@ -138,6 +140,9 @@ else ()
     if (ENABLE_CLICKHOUSE_OBFUSCATOR)
         clickhouse_target_link_split_lib(clickhouse obfuscator)
     endif ()
+    if (ENABLE_CLICKHOUSE_UDF_MANAGER)
+        clickhouse_target_link_split_lib(clickhouse udf-manager)
+    endif ()
 
     set (CLICKHOUSE_BUNDLE)
     if (ENABLE_CLICKHOUSE_SERVER)
@@ -188,6 +193,11 @@ else ()
     if(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
         list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
     endif()
+    if (ENABLE_CLICKHOUSE_UDF_MANAGER)
+        add_custom_target (clickhouse-udf-manager ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-udf-manager DEPENDS clickhouse)
+        install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-udf-manager DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
+        list(APPEND CLICKHOUSE_BUNDLE clickhouse-udf-manager)
+    endif ()
 
     install (TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
 
diff --git a/programs/UserDefinedFunctionsManager/CMakeLists.txt b/programs/UserDefinedFunctionsManager/CMakeLists.txt
new file mode 100644
index 000000000000..058697089196
--- /dev/null
+++ b/programs/UserDefinedFunctionsManager/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(CLICKHOUSE_UDF_MANAGER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/UDFManager.cpp)
+set(CLICKHOUSE_UDF_MANAGER_LINK PRIVATE dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
+
+clickhouse_program_add(udf-manager)
diff --git a/programs/UserDefinedFunctionsManager/UDFManager.cpp b/programs/UserDefinedFunctionsManager/UDFManager.cpp
new file mode 100644
index 000000000000..47ed17f1eddf
--- /dev/null
+++ b/programs/UserDefinedFunctionsManager/UDFManager.cpp
@@ -0,0 +1,51 @@
+#include
+
+#include
+
+#include
+#include
+#include
+
+
+/// Entry point of the clickhouse-udf-manager program.
+/// Handles --help and --uid (switches the process user id with setuid), then runs UDFManager.
+/// Returns the result of UDFManager::run(), 1 when setuid fails, or the code of a caught exception (1 if that code is 0).
+int mainEntryClickHouseUDFManager(int argc, char ** argv)
+{
+    try
+    {
+        using namespace DB;
+        namespace po = boost::program_options;
+
+        po::options_description description = createOptionsDescription("Options", getTerminalWidth());
+        description.add_options()("help", "produce help message")(
+            "uid", po::value(), "Linux user id");
+
+        po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
+        po::variables_map options;
+        po::store(parsed, options);
+
+        if (options.count("help"))
+        {
+            std::cout << "Usage: " << argv[0] << " [options]\n" << description << "\n";
+            return 0;
+        }
+
+        if (options.count("uid"))
+        {
+            if (setuid(options["uid"].as()) != 0)
+            {
+                std::cerr << "Can not set uid " << options["uid"].as() << "\n";
+                return 1;
+            }
+        }
+
+        return UDFManager().run();
+    }
+    catch (...)
+    {
+        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
+        auto code = DB::getCurrentExceptionCode();
+        return code ? code : 1;
+    }
+}
diff --git a/programs/main.cpp b/programs/main.cpp
index 3c16fb64a571..3c60677a0c4d 100644
--- a/programs/main.cpp
+++ b/programs/main.cpp
@@ -52,6 +52,9 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv);
 #if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
 int mainEntryClickHouseObfuscator(int argc, char ** argv);
 #endif
+#if ENABLE_CLICKHOUSE_UDF_MANAGER || !defined(ENABLE_CLICKHOUSE_UDF_MANAGER)
+int mainEntryClickHouseUDFManager(int argc, char ** argv);
+#endif
 
 
 namespace
@@ -90,6 +93,9 @@ std::pair clickhouse_applications[] =
 #if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
     {"obfuscator", mainEntryClickHouseObfuscator},
 #endif
+#if ENABLE_CLICKHOUSE_UDF_MANAGER || !defined(ENABLE_CLICKHOUSE_UDF_MANAGER)
+    {"udf-manager", mainEntryClickHouseUDFManager},
+#endif
 };
 
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 68f1beadb2b5..20684c4f6f50 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -261,6 +261,10 @@ list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/Table
 list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp)
 list (APPEND dbms_headers Dictionaries/DictionaryFactory.h
Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h)
 
+list (APPEND dbms_sources Functions/UserDefinedFunctions/UDFManager.cpp Functions/UserDefinedFunctions/UDFConnector.cpp Functions/UserDefinedFunctions/UDFSharedMemory.cpp Functions/UserDefinedFunctions/UDFLib.cpp)
+list (APPEND dbms_headers Functions/UserDefinedFunctions/UDFManager.h Functions/UserDefinedFunctions/UDFAPI.h Functions/UserDefinedFunctions/UDFConnector.h Functions/UserDefinedFunctions/UDFLib.h)
+list (APPEND dbms_headers Functions/UserDefinedFunctions/UDFSharedMemory.h Functions/UserDefinedFunctions/UDFControlCommand.h Functions/UserDefinedFunctions/FunctionUDF.h)
+
 if (NOT ENABLE_SSL)
     list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp)
     list (REMOVE_ITEM clickhouse_common_io_headers Common/OpenSSLHelpers.h)
diff --git a/src/Functions/FunctionFactory.cpp b/src/Functions/FunctionFactory.cpp
index 6ebfbc208cb2..15995463ad0e 100644
--- a/src/Functions/FunctionFactory.cpp
+++ b/src/Functions/FunctionFactory.cpp
@@ -23,21 +23,43 @@ void FunctionFactory::registerFunction(const
     Creator creator,
     CaseSensitiveness case_sensitiveness)
 {
-    if (!functions.emplace(name, creator).second)
-        throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
-            ErrorCodes::LOGICAL_ERROR);
-
     String function_name_lowercase = Poco::toLower(name);
     if (isAlias(name) || isAlias(function_name_lowercase))
         throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
             ErrorCodes::LOGICAL_ERROR);
 
+    if (!functions.emplace(name, creator).second)
+        throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
+            ErrorCodes::LOGICAL_ERROR);
+
     if (case_sensitiveness == CaseInsensitive
         && !case_insensitive_functions.emplace(function_name_lowercase, creator).second)
         throw Exception("FunctionFactory: the case insensitive function name '" + name + "' is not unique",
             ErrorCodes::LOGICAL_ERROR);
 }
 
+/// Registers a function provided by a user library. Unlike registerFunction it takes udf_mutex
+/// and stores the creator in user_defined_functions. Throws LOGICAL_ERROR when the name is
+/// an alias, a built-in function name or an already registered user defined function.
+void FunctionFactory::registerUserDefinedFunction(const std::string & name,
+    Creator creator)
+{
+    std::unique_lock lock(udf_mutex);
+    String function_name_lowercase = Poco::toLower(name);
+    if (isAlias(name) || isAlias(function_name_lowercase))
+        throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
+            ErrorCodes::LOGICAL_ERROR);
+
+    /// The built-in lookups in tryGetImpl run before the user_defined_functions lookup, so a clashing UDF would never be reachable.
+    if (functions.find(name) != functions.end()
+        || case_insensitive_functions.find(function_name_lowercase) != case_insensitive_functions.end())
+        throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
+            ErrorCodes::LOGICAL_ERROR);
+
+    if (!user_defined_functions.emplace(name, creator).second)
+        throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
+            ErrorCodes::LOGICAL_ERROR);
+}
 
 FunctionOverloadResolverImplPtr FunctionFactory::getImpl(
     const std::string & name,
@@ -77,6 +99,11 @@ FunctionOverloadResolverImplPtr FunctionFactory::tryGetImpl(
     if (case_insensitive_functions.end() != it)
         return it->second(context);
 
+    std::unique_lock lock(udf_mutex);
+    it = user_defined_functions.find(name);
+    if (user_defined_functions.end() != it)
+        return it->second(context);
+
     return {};
 }
 
diff --git a/src/Functions/FunctionFactory.h b/src/Functions/FunctionFactory.h
index 18847006d085..caf63892a1d5 100644
--- a/src/Functions/FunctionFactory.h
+++ b/src/Functions/FunctionFactory.h
@@ -39,6 +39,9 @@ class FunctionFactory : private boost::noncopyable, public IFactoryWithAliases
 static FunctionOverloadResolverImplPtr createDefaultFunction(const Context & context)
diff --git a/src/Functions/UserDefinedFunctions/FunctionUDF.h b/src/Functions/UserDefinedFunctions/FunctionUDF.h
new file mode 100644
index 000000000000..86e6f13d6922
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/FunctionUDF.h
@@ -0,0 +1,58 @@
+#ifndef CLICKHOUSE_FUNCTIONUDF_H
+#define CLICKHOUSE_FUNCTIONUDF_H
+
+#include
+
+#include "UDFConnector.h"
+
+namespace DB
+{
+
+/// Function backed by a user library and reached through UDFConnector.
+/// Work in progress: getReturnTypeImpl and executeImpl both end by throwing NOT_IMPLEMENTED.
+class FunctionUDF : public IFunction
+{
+public:
+    explicit FunctionUDF(std::string name_, UDFConnector &connector_) : name(std::move(name_)), connector(connector_) {}
+
+    std::string getName() const override { return name; }
+
+    bool isVariadic() const override { return true; }
+    size_t getNumberOfArguments() const override { return 0; }
+
+    DataTypePtr getReturnTypeImpl(const DataTypes & /* arguments */) const override
+    {
+        connector.getReturnTypeCall(name); /// @TODO Igr
+        // DataTypeFactory::instance();
+        throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
+    }
+
+    bool useDefaultImplementationForConstants() const override { return true; }
+    ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; } /// @TODO Igr
+
+    void executeImpl(Block & block, const ColumnNumbers & /* arguments */, size_t result, size_t input_rows_count) override
+    {
+        connector.execCall(name); /// @TODO Igr
+
+        auto column = block.getByPosition(result).column->cloneResized(input_rows_count);
+
+        if (column->isFixedAndContiguous())
+        {
+            auto to_alloc = column->getRawData().size;
+            ++to_alloc;
+            /// SharedMemory.alloc(to_alloc)
+        }
+
+        block.getByPosition(result).column = std::move(column);
+
+        throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
+    }
+
+private:
+    std::string name;
+    UDFConnector &connector;
+};
+
+}
+
+#endif //CLICKHOUSE_FUNCTIONUDF_H
diff --git a/src/Functions/UserDefinedFunctions/UDFAPI.h b/src/Functions/UserDefinedFunctions/UDFAPI.h
new file mode 100644
index 000000000000..9d56e5cf401e
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFAPI.h
@@ -0,0 +1,47 @@
+#ifndef CLICKHOUSE_UDFAPI_H
+#define CLICKHOUSE_UDFAPI_H
+
+#include
+
+struct UserDefinedFunctionResult
+{
+    const char * data_type;
+    void * data;
+    size_t size;
+};
+
+/// All allocated memory will automatically free after ExecImpl return
+/// Allocated memory should be used in UserDefinedFunctionResult as data pointer
+typedef void * (*AllocateImpl)(size_t size);
+
+/// Return one column of result data
+typedef struct UserDefinedFunctionResult (*ExecImpl)(UserDefinedFunctionResult* input, AllocateImpl allocate);
+/// Return type of function result (data should be nullptr)
+typedef struct
UserDefinedFunctionResult (*GetReturnTypeImpl)(UserDefinedFunctionResult* input);
+
+struct UserDefinedFunction
+{
+    /// Name of UDF
+    const char * name;
+
+    /// Accept null terminated list of input columns types (data is nullptr)
+    /// Returns type of data
+    GetReturnTypeImpl get_return_type_impl;
+
+    /// Accept null terminated list of input columns
+    /// Returns one column
+    ExecImpl exec_impl;
+};
+
+struct UDFList
+{
+    /// Name of UDF lib
+    const char * name;
+    /// Null terminated list
+    UserDefinedFunction * functions;
+};
+
+typedef struct UDFList (*UDFInit)();
+typedef void (*UDFListFree)(struct UDFList);
+
+#endif //CLICKHOUSE_UDFAPI_H
diff --git a/src/Functions/UserDefinedFunctions/UDFConnector.cpp b/src/Functions/UserDefinedFunctions/UDFConnector.cpp
new file mode 100644
index 000000000000..8a5300a102e2
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFConnector.cpp
@@ -0,0 +1,69 @@
+#include "UDFConnector.h"
+
+#include "FunctionUDF.h"
+
+
+namespace DB
+{
+
+/// Body of the reader thread: receives replies from the manager process and hands each
+/// one over to the run_command() call that waits for the same request id.
+void UDFConnector::run()
+{
+    {
+        std::unique_lock lock(mutex);
+        if (!manager)
+        {
+            throw Exception("No manager", 0); /// @TODO Igr
+        }
+    }
+    while (true)
+    {
+        UDFControlCommandResult cmd_result;
+        cmd_result.read(manager->out);
+        std::unique_lock lock(mutex);
+
+        auto it = result_waiters.find(cmd_result.request_id);
+        if (it == result_waiters.end())
+            ; /// @TODO Igr Log Here
+        else
+        {
+            it->second.result = std::move(cmd_result);
+            it->second.done = true;
+            it->second.cv.notify_all();
+        }
+
+        /// `active` is modified under `mutex`, so it is only inspected while holding it.
+        if (!active)
+            break;
+    }
+}
+
+/// Asks the manager to load the library `filename` and registers every function it reports.
+/// Throws Exception carrying the manager's message and code when loading fails.
+void UDFConnector::load(std::string_view filename)
+{
+    UDFControlCommand cmd;
+    cmd.name = UDFControlCommand::InitLib;
+    cmd.args = std::vector{std::string(filename)};
+    auto result = run_command(cmd);
+    if (!result.isSuccess())
+        throw Exception(result.message, result.code);
+
+    /// On success the message holds the function names, each followed by a space.
+    std::string_view funcs(result.message);
+    while (!funcs.empty())
+    {
+        auto pos = funcs.find(' ');
+        auto name = std::string(funcs.substr(0, pos));
+        /// Skip the name just taken together with its separator; nothing is left after the last name.
+        funcs = pos == std::string_view::npos ? std::string_view{} : funcs.substr(pos + 1);
+        if (!name.empty())
+            FunctionFactory::instance().registerUserDefinedFunction(name, [=, this](const Context &) {
+                return std::make_unique(std::make_unique(name, *this));
+            });
+    }
+}
+
+}
+
diff --git a/src/Functions/UserDefinedFunctions/UDFConnector.h b/src/Functions/UserDefinedFunctions/UDFConnector.h
new file mode 100644
index 000000000000..6fbdfbe2118e
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFConnector.h
@@ -0,0 +1,119 @@
+#ifndef CLICKHOUSE_UDFCONNECTOR_H
+#define CLICKHOUSE_UDFCONNECTOR_H
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+#include "UDFAPI.h"
+#include "UDFControlCommand.h"
+
+namespace DB
+{
+
+/// Server side endpoint of the UDF manager process: starts clickhouse-udf-manager, sends it
+/// UDFControlCommand requests and matches the replies to the callers by request id.
+class UDFConnector : private boost::noncopyable
+{
+public:
+    /// Starts the manager process (passing --uid when a uid is given) and the reader thread.
+    explicit UDFConnector(std::optional uid_) : uid(uid_)
+    {
+        std::unique_lock lock(mutex);
+        std::vector args;
+        if (uid)
+            args.push_back("--uid=" + std::to_string(uid.value()));
+        manager = ShellCommand::executeDirect("./clickhouse-udf-manager", args, true);
+        thread = std::thread([this]() { this->run(); });
+    }
+
+    /// Cancels pending requests, asks the manager to stop and joins the reader thread.
+    ~UDFConnector()
+    {
+        {
+            std::unique_lock lock(mutex);
+            active = false;
+            for (auto && waiter : result_waiters)
+            {
+                waiter.second.done = waiter.second.canceled = true;
+                waiter.second.cv.notify_all();
+            }
+            try
+            {
+                /// The reply to Stop is what wakes the reader thread from its blocking read.
+                UDFControlCommand stop;
+                stop.name = UDFControlCommand::Stop;
+                stop.request_id = last_id++;
+                stop.write(manager->in);
+            }
+            catch (...)
+            {
+                /// A destructor must not throw.
+                /// NOTE(review): if Stop cannot be delivered the reader thread may stay blocked - confirm.
+            }
+        }
+        /// Join without holding the mutex: the reader thread locks it for every reply.
+        thread.join();
+    }
+
+    void load(std::string_view filename);
+    void getReturnTypeCall(std::string_view) {}
+    void execCall(std::string_view) {}
+
+private:
+    void run();
+
+    /// Sends `cmd` to the manager and blocks until the reader thread delivers the reply
+    /// with the same request id, or until the connector is destroyed ("Canceled").
+    UDFControlCommandResult run_command(UDFControlCommand & cmd)
+    {
+        std::unique_lock lock(mutex);
+        if (!active)
+            return UDFControlCommandResult{1, "Aborted", cmd.request_id};
+
+        cmd.request_id = last_id++;
+        auto &waiter = result_waiters[cmd.request_id];
+        cmd.write(manager->in);
+        while (!waiter.done)
+            waiter.cv.wait(lock);
+
+        auto result = std::move(waiter.result);
+        if (waiter.canceled)
+        {
+            result_waiters.erase(cmd.request_id);
+            return UDFControlCommandResult{1, "Canceled", cmd.request_id};
+        }
+        result_waiters.erase(cmd.request_id);
+        lock.unlock();
+        return result;
+    }
+
+    std::thread thread;
+    std::optional uid;
+    bool active = true;
+
+    std::mutex mutex;
+    unsigned last_id = 0;
+
+    struct Unit
+    {
+        std::condition_variable cv;
+        UDFControlCommandResult result;
+        bool done = false;
+        bool canceled = false;
+    };
+    std::map result_waiters;
+
+    std::unique_ptr manager;
+};
+
+}
+
+#endif //CLICKHOUSE_UDFCONNECTOR_H
diff --git a/src/Functions/UserDefinedFunctions/UDFControlCommand.h b/src/Functions/UserDefinedFunctions/UDFControlCommand.h
new file mode 100644
index 000000000000..d2d625f4d663
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFControlCommand.h
@@ -0,0 +1,78 @@
+#ifndef CLICKHOUSE_UDFCONTROLCOMMAND_H
+#define CLICKHOUSE_UDFCONTROLCOMMAND_H
+
+#include
+
+#include
+#include
+
+namespace DB
+{
+
+/// Request sent to the manager process: request id, command name, then the list of string arguments.
+struct UDFControlCommand
+{
+    static constexpr auto InitLib = "InitLib";
+    static constexpr auto ExecFunc = "ExecFunc";
+    static constexpr auto GetReturnType = "GetReturnType";
+    static constexpr auto Stop = "Stop";
+
+
+    std::string name;
+    std::vector args;
+    unsigned request_id = 0;
+
+    void read(ReadBuffer & in)
+    {
+        readVarUInt(request_id, in);
+        readStringBinary(name, in);
+        size_t args_num;
+        readVarUInt(args_num, in);
+        args.resize(args_num);
+        for (auto && arg : args)
+        {
+            readStringBinary(arg, in);
+        }
+    }
+
+    void write(WriteBuffer & out)
+    {
+        writeVarUInt(request_id, out);
+        writeStringBinary(name, out);
+        writeVarUInt(args.size(), out);
+        for (auto && arg : args)
+        {
+            writeStringBinary(arg, out);
+        }
+        out.sync();
+    }
+};
+
+/// Reply of the manager process: request id, result code (0 means success) and a message.
+struct UDFControlCommandResult
+{
+    unsigned code = 0;
+    std::string message; // Contains error message on error
+    unsigned request_id = 0;
+
+    bool isSuccess() const { return code == 0; }
+
+    void read(ReadBuffer & in)
+    {
+        readVarUInt(request_id, in);
+        readVarUInt(code, in);
+        readStringBinary(message, in);
+    }
+
+    void write(WriteBuffer & out)
+    {
+        writeVarUInt(request_id, out);
+        writeVarUInt(code, out);
+
writeStringBinary(message, out);
+        out.sync();
+    }
+};
+
+}
+
+#endif //CLICKHOUSE_UDFCONTROLCOMMAND_H
diff --git a/src/Functions/UserDefinedFunctions/UDFLib.cpp b/src/Functions/UserDefinedFunctions/UDFLib.cpp
new file mode 100644
index 000000000000..dfc54b31e346
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFLib.cpp
@@ -0,0 +1 @@
+#include "UDFLib.h"
diff --git a/src/Functions/UserDefinedFunctions/UDFLib.h b/src/Functions/UserDefinedFunctions/UDFLib.h
new file mode 100644
index 000000000000..717a8bd910cf
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFLib.h
@@ -0,0 +1,168 @@
+#ifndef CLICKHOUSE_UDFLIB_H
+#define CLICKHOUSE_UDFLIB_H
+
+#include
+
+#include
+
+#include "UDFControlCommand.h"
+#include "UDFAPI.h"
+
+namespace DB
+{
+
+/// One user library opened with dlopen, together with the functions its UDFInit entry point reported.
+class UDFLib
+{
+public:
+    explicit UDFLib(std::string_view fname) : filename(fname) { }
+
+    UDFLib(UDFLib && other) : filename(std::move(other.filename)), name(std::move(other.name)), exec_funcs(std::move(other.exec_funcs)), get_return_type_funcs(std::move(other.get_return_type_funcs)), dl_handle(other.dl_handle)
+    {
+        other.dl_handle = nullptr;
+    }
+
+    ~UDFLib() { unload(); }
+
+    /// Opens the library and collects its functions.
+    /// On success the result message lists libname_funcname entries, each followed by a space;
+    /// on failure code is 1, the message describes the problem and the library is unloaded again.
+    UDFControlCommandResult load()
+    {
+        UDFControlCommandResult res;
+        dl_handle = dlopen(filename.data(), RTLD_LAZY);
+        if (!dl_handle)
+        {
+            res.code = 1;
+            res.message = std::string("Can not load lib ") + filename;
+            return res;
+        }
+        constexpr auto init_function_name = "UDFInit";
+        UDFInit init_func = reinterpret_cast(dlsym(dl_handle, init_function_name));
+        if (init_func == nullptr)
+        {
+            res.code = 1;
+            res.message = std::string("Can not find init function ") + init_function_name + " in file " + filename;
+            unload();
+            return res;
+        }
+        constexpr auto free_function_name = "UDFListFree";
+        UDFListFree free_func = reinterpret_cast(dlsym(dl_handle, free_function_name));
+        if (free_func == nullptr)
+        {
+            res.code = 1;
+            res.message = std::string("Can not find free function ") + free_function_name + " in file " + filename;
+            unload();
+            return res;
+        }
+        res.code = 0;
+        res.message.clear();
+        {
+            UDFList info = init_func();
+            /// free_func lives inside the library, so the guard has to fire before any unload().
+            auto guard = ext::make_scope_guard([=]() { free_func(info); });
+            if (info.name == nullptr)
+            {
+                res.code = 1;
+                res.message = "Null pointer for lib name in file " + filename;
+            }
+            else
+                name = info.name;
+
+            /// NOTE(review): UDFAPI.h only says "Null terminated list"; an entry whose name is
+            /// nullptr is taken as the terminator here - confirm against the intended API.
+            for (auto * ptr = info.functions; res.isSuccess() && ptr != nullptr && ptr->name != nullptr; ++ptr)
+            {
+                if (ptr->exec_impl == nullptr)
+                {
+                    res.code = 1;
+                    res.message = "Null pointer for function " + std::string(ptr->name) + " in lib " + name;
+                    break;
+                }
+                exec_funcs[ptr->name] = ptr->exec_impl;
+                get_return_type_funcs[ptr->name] = ptr->get_return_type_impl;
+
+                /// Full function name is libname + '_' + funcname
+                res.message.append(name);
+                res.message.push_back('_');
+                res.message.append(ptr->name);
+                res.message.push_back(' ');
+            }
+        }
+        if (!res.isSuccess())
+            unload();
+        return res;
+    }
+
+    void unload()
+    {
+        if (dl_handle == nullptr)
+        {
+            return;
+        }
+        exec_funcs.clear();
+        get_return_type_funcs.clear();
+        dlclose(dl_handle); /// @TODO Igr Errors ignored
+        dl_handle = nullptr;
+    }
+
+    bool isInited() { return dl_handle != nullptr; }
+
+    std::string_view getName() const { return name; }
+
+    /// Calls the exec entry point registered under `func_name`, loading the library first if needed.
+    UDFControlCommandResult exec(std::string_view func_name)
+    {
+        if (!isInited())
+        {
+            auto res = load();
+            if (!res.isSuccess())
+                return res;
+        }
+        UDFControlCommandResult res;
+        auto func = exec_funcs.find(std::string(func_name));
+        if (func == exec_funcs.end())
+        {
+            res.code = 1;
+            res.message = std::string("No such function ") + std::string(func_name);
+            return res;
+        }
+        func->second(nullptr, nullptr); /// @TODO Igr args
+        res.code = 0;
+        return res;
+    }
+
+    /// Calls the return type entry point registered under `func_name`, loading the library first if needed.
+    UDFControlCommandResult get_return_type(std::string_view func_name)
+    {
+        if (!isInited())
+        {
+            auto res = load();
+            if (!res.isSuccess())
+                return res;
+        }
+        UDFControlCommandResult res;
+        auto func = get_return_type_funcs.find(std::string(func_name));
+        /// load() does not require get_return_type_impl to be set, so the stored pointer may be null.
+        if (func == get_return_type_funcs.end() || func->second == nullptr)
+        {
+            res.code = 1;
+            res.message = std::string("No such function ") + std::string(func_name);
+            return res;
+        }
+        func->second(nullptr); /// @TODO Igr args
+        res.code = 0;
+        return res;
+    }
+
+private:
+    std::string filename;
+    std::string name;
+    std::unordered_map exec_funcs;
+    std::unordered_map get_return_type_funcs;
+    void * dl_handle = nullptr;
+};
+
+}
+
+#endif //CLICKHOUSE_UDFLIB_H
diff --git a/src/Functions/UserDefinedFunctions/UDFManager.cpp b/src/Functions/UserDefinedFunctions/UDFManager.cpp
new file mode 100644
index 000000000000..6a2af7efcb05
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFManager.cpp
@@ -0,0 +1,71 @@
+#include "UDFManager.h"
+
+namespace DB
+{
+
+/// Loads the library `filename` and stores it in `libs` under the name the library reports.
+/// Fails with code 1 when a library with the same name is already stored.
+UDFControlCommandResult UDFManager::initLib(const std::string & filename)
+{
+    UDFLib lib(filename);
+    UDFControlCommandResult result = lib.load();
+    if (result.isSuccess())
+    {
+        std::string name = std::string(lib.getName());
+        /// `libs` is keyed by library name, not by file name.
+        if (libs.find(name) != libs.end())
+        {
+            result.code = 1;
+            result.message = "Lib " + name + " already exists";
+        }
+        else
+            libs.insert({std::move(name), std::move(lib)});
+    }
+    return result;
+}
+
+/// Command loop of the manager process: reads commands from `in` and writes one reply per command
+/// to `out` until Stop is received. Returns 0.
+int UDFManager::run()
+{
+    UDFControlCommand command;
+    UDFControlCommandResult result;
+    bool active = true;
+    while (active)
+    {
+        result.message.clear();
+        result.code = 0;
+        command.read(in);
+        if (command.name == UDFControlCommand::InitLib)
+        {
+            /// Arg is filename
+            if (command.args.size() != 1)
+            {
+                result.code = 1;
+                result.message = "Wrong arguments";
+            }
+            else
+                result = initLib(command.args[0]);
+        }
+        else if (command.name == UDFControlCommand::ExecFunc)
+        {
+            /// Args is data_pointer and library_name + "_" + function_name
+            ;
+        }
+        else if (command.name == UDFControlCommand::Stop)
+        {
+            active = false;
+        }
+        else
+        {
+            /// Do not report success for a command that was not executed.
+            result.code = 1;
+            result.message = "Unknown command " + command.name;
+        }
+        result.request_id = command.request_id;
+        result.write(out);
+    }
+    return 0;
+}
+
+}
diff --git a/src/Functions/UserDefinedFunctions/UDFManager.h b/src/Functions/UserDefinedFunctions/UDFManager.h
new file mode 100644
index 000000000000..30974276b0f8
--- /dev/null
+++
b/src/Functions/UserDefinedFunctions/UDFManager.h
@@ -0,0 +1,46 @@
+#ifndef CLICKHOUSE_UDFMANAGER_H
+#define CLICKHOUSE_UDFMANAGER_H
+
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+#include "UDFControlCommand.h"
+#include "UDFLib.h"
+
+
+namespace DB
+{
+
+/// Command loop of the clickhouse-udf-manager process: its buffers wrap std::cin and std::cout,
+/// and `libs` keeps the loaded libraries.
+class UDFManager : private boost::noncopyable
+{
+public:
+    UDFManager() : in(std::cin, 16), out(std::cout, 16) { }
+    ~UDFManager() = default;
+
+    int run();
+
+private:
+    UDFControlCommandResult initLib(const std::string & filename);
+
+    ReadBufferFromIStream in;
+    WriteBufferFromOStream out;
+    std::map libs;
+};
+
+}
+
+#endif //CLICKHOUSE_UDFMANAGER_H
diff --git a/src/Functions/UserDefinedFunctions/UDFSharedMemory.cpp b/src/Functions/UserDefinedFunctions/UDFSharedMemory.cpp
new file mode 100644
index 000000000000..caebcc486bb9
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFSharedMemory.cpp
@@ -0,0 +1 @@
+#include "UDFSharedMemory.h"
diff --git a/src/Functions/UserDefinedFunctions/UDFSharedMemory.h b/src/Functions/UserDefinedFunctions/UDFSharedMemory.h
new file mode 100644
index 000000000000..5f654de7978a
--- /dev/null
+++ b/src/Functions/UserDefinedFunctions/UDFSharedMemory.h
@@ -0,0 +1,21 @@
+#ifndef CLICKHOUSE_UDFSHAREDMEMORY_H
+#define CLICKHOUSE_UDFSHAREDMEMORY_H
+
+#include
+#include <string>
+
+
+/// Owns a boost::interprocess::shared_memory_object.
+class UDFSharedMemory
+{
+public:
+    /// Boost expects (open tag, zero terminated name, access mode), in that order.
+    template
+    UDFSharedMemory(std::string_view name, OpenTag openTag, AccessTag accessTag) : memory(openTag, std::string(name).c_str(), accessTag) { }
+
+private:
+    boost::interprocess::shared_memory_object memory;
+};
+
+
+#endif //CLICKHOUSE_UDFSHAREDMEMORY_H
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 59861631544d..715ba1325799 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -58,6 +58,7 @@
 #include
 #include
 #include
+#include
 
 namespace ProfileEvents
 {
@@ -339,6 +340,8 @@ struct ContextShared
     /// Storage policy chooser for MergeTree engines
     mutable std::shared_ptr merge_tree_storage_policy_selector;
 
+    UDFManager user_defined_functions_manager; /// Manager of user defined functions. NOTE(review): UDFManager wraps std::cin/std::cout; the server side class looks like UDFConnector - confirm.
+
     std::optional merge_tree_settings; /// Settings of MergeTree* engines.
     std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
     std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
diff --git a/src/Storages/System/StorageSystemFunctions.cpp b/src/Storages/System/StorageSystemFunctions.cpp
index e46b7007dc23..44e2be03f689 100644
--- a/src/Storages/System/StorageSystemFunctions.cpp
+++ b/src/Storages/System/StorageSystemFunctions.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 