Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions programs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ option (ENABLE_CLICKHOUSE_COPIER "Enable clickhouse-copier" ${ENABLE_CLICKHOUSE_
option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_UDF_MANAGER "Enable clickhouse-udf-manager" ${ENABLE_CLICKHOUSE_ALL})

if(NOT (MAKE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES))
set(CLICKHOUSE_ONE_SHARED 1)
Expand Down Expand Up @@ -80,21 +81,22 @@ add_subdirectory (compressor)
add_subdirectory (copier)
add_subdirectory (format)
add_subdirectory (obfuscator)
add_subdirectory (UserDefinedFunctionsManager)

if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()

if (CLICKHOUSE_ONE_SHARED)
add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK})
target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE})
add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_UDF_MANAGER_SOURCES})
target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_UDF_MANAGER_LINK})
target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_UDF_MANAGER_INCLUDE})
set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "")
install (TARGETS clickhouse-lib LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT clickhouse)
endif()

if (CLICKHOUSE_SPLIT_BINARY)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-obfuscator clickhouse-copier)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-obfuscator clickhouse-copier clickhouse-udf-manager)

if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-odbc-bridge)
Expand Down Expand Up @@ -138,6 +140,9 @@ else ()
if (ENABLE_CLICKHOUSE_OBFUSCATOR)
clickhouse_target_link_split_lib(clickhouse obfuscator)
endif ()
if (ENABLE_CLICKHOUSE_UDF_MANAGER)
clickhouse_target_link_split_lib(clickhouse udf-manager)
endif ()

set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SERVER)
Expand Down Expand Up @@ -188,6 +193,11 @@ else ()
if(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
endif()
if (ENABLE_CLICKHOUSE_UDF_MANAGER)
add_custom_target (clickhouse-udf-manager ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-udf-manager DEPENDS clickhouse)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-udf-manager DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-udf-manager)
endif ()

install (TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

Expand Down
4 changes: 4 additions & 0 deletions programs/UserDefinedFunctionsManager/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(CLICKHOUSE_UDF_MANAGER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/UDFManager.cpp)
set(CLICKHOUSE_UDF_MANAGER_LINK PRIVATE dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})

clickhouse_program_add(udf-manager)
48 changes: 48 additions & 0 deletions programs/UserDefinedFunctionsManager/UDFManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include <iostream>

#include <boost/program_options.hpp>

#include <Functions/UserDefinedFunctions/UDFManager.h>
#include <Common/TerminalSize.h>
#include <Common/Exception.h>


int mainEntryClickHouseUDFManager(int argc, char ** argv)
{
try
{
using namespace DB;
namespace po = boost::program_options;

po::options_description description = createOptionsDescription("Options", getTerminalWidth());
description.add_options()("help", "produce help message")(
"uid", po::value<unsigned>(), "Linux user id");

po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
po::variables_map options;
po::store(parsed, options);

if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options]\n";
return 0;
}

if (options.count("uid"))
{
if (setuid(options["uid"].as<unsigned>()) != 0)
{
std::cerr << "Can not sed uid " << options["uid"].as<unsigned>() << "\n";
return 1;
}
}

return UDFManager().run();
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}
6 changes: 6 additions & 0 deletions programs/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_UDF || !defined(ENABLE_CLICKHOUSE_UDF)
int mainEntryClickHouseUDFManager(int argc, char ** argv);
#endif


namespace
Expand Down Expand Up @@ -90,6 +93,9 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
#if ENABLE_CLICKHOUSE_UDF_MANAGER || !defined(ENABLE_CLICKHOUSE_UDF_MANAGER)
{"udf-manager", mainEntryClickHouseUDFManager},
#endif
};


Expand Down
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/Table
list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers Dictionaries/DictionaryFactory.h Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h)

list (APPEND dbms_sources Functions/UserDefinedFunctions/UDFManager.cpp Functions/UserDefinedFunctions/UDFConnector.cpp Functions/UserDefinedFunctions/UDFSharedMemory.cpp Functions/UserDefinedFunctions/UDFLib.cpp)
list (APPEND dbms_headers Functions/UserDefinedFunctions/UDFManager.h Functions/UserDefinedFunctions/UDFAPI.h Functions/UserDefinedFunctions/UDFConnector.h Functions/UserDefinedFunctions/UDFLib.h)
list (APPEND dbms_headers Functions/UserDefinedFunctions/UDFSharedMemory.h Functions/UserDefinedFunctions/UDFControlCommand.h Functions/UserDefinedFunctions/FunctionUDF.h)

if (NOT ENABLE_SSL)
list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp)
list (REMOVE_ITEM clickhouse_common_io_headers Common/OpenSSLHelpers.h)
Expand Down
26 changes: 22 additions & 4 deletions src/Functions/FunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,34 @@ void FunctionFactory::registerFunction(const
Creator creator,
CaseSensitiveness case_sensitiveness)
{
if (!functions.emplace(name, creator).second)
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);

String function_name_lowercase = Poco::toLower(name);
if (isAlias(name) || isAlias(function_name_lowercase))
throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
ErrorCodes::LOGICAL_ERROR);

if (!functions.emplace(name, creator).second)
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);

if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_functions.emplace(function_name_lowercase, creator).second)
throw Exception("FunctionFactory: the case insensitive function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}

void FunctionFactory::registerUserDefinedFunction(const std::string & name,
Creator creator)
{
std::unique_lock lock(udf_mutex);
String function_name_lowercase = Poco::toLower(name);
if (isAlias(name) || isAlias(function_name_lowercase))
throw Exception("FunctionFactory: the function name '" + name + "' is already registered as alias",
ErrorCodes::LOGICAL_ERROR);

if (!user_defined_functions.emplace(name, creator).second)
throw Exception("FunctionFactory: the function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}

FunctionOverloadResolverImplPtr FunctionFactory::getImpl(
const std::string & name,
Expand Down Expand Up @@ -77,6 +90,11 @@ FunctionOverloadResolverImplPtr FunctionFactory::tryGetImpl(
if (case_insensitive_functions.end() != it)
return it->second(context);

std::unique_lock lock(udf_mutex);
it = user_defined_functions.find(name);
if (user_defined_functions.end() != it)
return it->second(context);

return {};
}

Expand Down
7 changes: 7 additions & 0 deletions src/Functions/FunctionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class FunctionFactory : private boost::noncopyable, public IFactoryWithAliases<s
registerFunction(name, &Function::create, case_sensitiveness);
}

/// UDF are case sensitive
void registerUserDefinedFunction(const std::string & name, Creator creator);

/// Throws an exception if not found.
FunctionOverloadResolverPtr get(const std::string & name, const Context & context) const;

Expand All @@ -54,6 +57,10 @@ class FunctionFactory : private boost::noncopyable, public IFactoryWithAliases<s

Functions functions;
Functions case_insensitive_functions;
Functions user_defined_functions;

/// For dynamic UDF usage
mutable std::mutex udf_mutex;

template <typename Function>
static FunctionOverloadResolverImplPtr createDefaultFunction(const Context & context)
Expand Down
56 changes: 56 additions & 0 deletions src/Functions/UserDefinedFunctions/FunctionUDF.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#ifndef CLICKHOUSE_FUNCTIONUDF_H
#define CLICKHOUSE_FUNCTIONUDF_H

#include <Functions/IFunction.h>

#include "UDFConnector.h"

namespace DB
{

class FunctionUDF : public IFunction
{
public:
explicit FunctionUDF(std::string name_, UDFConnector &connector_) : name(name_), connector(connector_) {}

std::string getName() const override { return name; }

bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }

DataTypePtr getReturnTypeImpl(const DataTypes & /* arguments */) const override
{
connector.getReturnTypeCall(name); /// @TODO Igr
// DataTypeFactory::instance();
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; } /// @TODO Igr

void executeImpl(Block & block, const ColumnNumbers & /* arguments */, size_t result, size_t input_rows_count) override
{
connector.execCall(name); /// @TODO Igr

auto column = block.getByPosition(result).column->cloneResized(input_rows_count);

if (column->isFixedAndContiguous())
{
auto to_alloc = column->getRawData().size;
++to_alloc;
/// SharedMemory.alloc(to_alloc)
}

block.getByPosition(result).column = std::move(column);

throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

private:
std::string name;
UDFConnector &connector;
};

}

#endif //CLICKHOUSE_FUNCTIONUDF_H
47 changes: 47 additions & 0 deletions src/Functions/UserDefinedFunctions/UDFAPI.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef CLICKHOUSE_UDFAPI_H
#define CLICKHOUSE_UDFAPI_H

#include <unistd.h>

struct UserDefinedFunctionResult
{
const char * data_type;
void * data;
size_t size;
};

/// All allocated memory will automatically free after ExecImpl return
/// Allocated memory should be used in UserDefinedFunctionResult as data pointer
typedef void * (*AllocateImpl)(size_t size);

/// Return one column of result data
typedef struct UserDefinedFunctionResult (*ExecImpl)(UserDefinedFunctionResult* input, AllocateImpl allocate);
/// Return type of function result (data should be nullptr)
typedef struct UserDefinedFunctionResult (*GetReturnTypeImpl)(UserDefinedFunctionResult* input);

struct UserDefinedFunction
{
/// Name of UDF
const char * name;

/// Accept null terminated list of input columns types (data is nullptr)
/// Returns type of data
GetReturnTypeImpl get_return_type_impl;

/// Accept null terminated list of input columns
/// Returns one column
ExecImpl exec_impl;
};

struct UDFList
{
/// Name of UDF lib
const char * name;
/// Null terminated list
UserDefinedFunction * functions;
};

typedef struct UDFList (*UDFInit)();
typedef void (*UDFListFree)(struct UDFList);

#endif //CLICKHOUSE_UDFAPI_H
59 changes: 59 additions & 0 deletions src/Functions/UserDefinedFunctions/UDFConnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "UDFConnector.h"

#include "FunctionUDF.h"


namespace DB
{

void UDFConnector::run()
{
{
std::unique_lock lock(mutex);
if (!manager)
{
throw Exception("No manager", 0); /// @TODO Igr
}
}
while (active)
{
UDFControlCommandResult cmd_result;
cmd_result.read(manager->out);
std::unique_lock lock(mutex);

auto it = result_waiters.find(cmd_result.request_id);
if (it == result_waiters.end())
; /// @TODO Igr Log Here
else
{
it->second.result = std::move(cmd_result);
it->second.done = true;
it->second.cv.notify_all();
}
}
}

void UDFConnector::load(std::string_view filename)
{
UDFControlCommand cmd;
cmd.name = "InitLib";
cmd.args = std::vector<std::string>{std::string(filename)};
auto result = run_command(cmd);
if (!result.isSuccess())
throw Exception(result.message, result.code);

std::string_view funcs(result.message);
while (!funcs.empty())
{
auto pos = funcs.find(' ');
auto name = std::string(funcs.substr(0, pos));
funcs = funcs.substr(pos + 1 ? pos != std::string::npos : pos);
if (!name.empty())
FunctionFactory::instance().registerUserDefinedFunction(name, [=, this](const Context &) {
return std::make_unique<DefaultOverloadResolver>(std::make_unique<FunctionUDF>(name, *this));
});
}
}

}

Loading