diff --git a/.gitignore b/.gitignore index 4933a2d..06787ee 100644 --- a/.gitignore +++ b/.gitignore @@ -35,9 +35,11 @@ .idea /build +/build-ex /cmake-build-* /.build /.venv /.vcpkg /.build* +/vcpkg_installed diff --git a/CMakeLists.txt b/CMakeLists.txt index 58ea9e1..31a6fd3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_POSITION_INDEPENDENT_CODE ON) +set(CMAKE_EXPERIMENTAL_CXX_MODULE_CMAKE_API ON) project(cpp-jam VERSION 0.0.1 @@ -75,6 +76,12 @@ find_package(Boost.DI CONFIG REQUIRED) find_package(qtils CONFIG REQUIRED) find_package(prometheus-cpp CONFIG REQUIRED) +if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + add_compile_options(-fmodules-ts) +elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang|AppleClang") + add_compile_options(-fmodules) +endif() + add_library(headers INTERFACE) target_include_directories(headers INTERFACE $ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08f0356..87a038b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -24,3 +24,8 @@ add_subdirectory(metrics) # Clocks and time subsystem add_subdirectory(clock) +# Subscription Engine subsystem +add_subdirectory(se) + +# Modules subsystem +add_subdirectory(modules) diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index df1bc36..023e182 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -28,7 +28,9 @@ add_library(build_version add_library(app_configuration SHARED configuration.cpp) target_link_libraries(app_configuration - Boost::boost) + Boost::boost + fmt::fmt +) add_library(app_configurator SHARED configurator.cpp) target_link_libraries(app_configurator diff --git a/src/app/configuration.hpp b/src/app/configuration.hpp index 380dff5..2132356 100644 --- a/src/app/configuration.hpp +++ b/src/app/configuration.hpp @@ -5,6 +5,7 @@ #pragma once +#include #include #include @@ -22,6 +23,7 @@ namespace jam::app { [[nodiscard]] std::string nodeVersion() const; [[nodiscard]] std::string nodeName() const; + [[nodiscard]] std::optional metricsEndpoint() const; private: diff --git a/src/app/configurator.cpp b/src/app/configurator.cpp index 3963bdc..32cc532 100644 --- a/src/app/configurator.cpp +++ b/src/app/configurator.cpp @@ -3,20 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include "app/configuration.hpp" +#include "app/configurator.hpp" #include #include #include +#include #include #include #include #include "app/build_version.hpp" -#include "app/configurator.hpp" - -#include +#include "app/configuration.hpp" using Endpoint = boost::asio::ip::tcp::endpoint; @@ -110,7 +109,7 @@ namespace jam::app { .add(metrics_options); } - outcome::result Configurator::step1() { + outcome::result Configurator::step1() { // read min cli-args and config namespace po = boost::program_options; namespace fs = std::filesystem; @@ -120,7 +119,7 @@ namespace jam::app { po::variables_map vm; - // first-run parse to read only general options and to lookup for "help", + // first-run parse to read-only general options and to lookup for "help", // "config" and "version". all the rest options are ignored try { po::parsed_options parsed = po::command_line_parser(argc_, argv_) @@ -200,6 +199,7 @@ namespace jam::app { } outcome::result Configurator::initGeneralConfig() { + // Init by config-file if (config_file_.has_value()) { auto section = (*config_file_)["general"]; if (section.IsDefined()) { diff --git a/src/app/configurator.hpp b/src/app/configurator.hpp index 8fd1a8e..7e76e2b 100644 --- a/src/app/configurator.hpp +++ b/src/app/configurator.hpp @@ -5,6 +5,8 @@ #pragma once +#include + #include #include #include diff --git a/src/app/impl/application_impl.cpp b/src/app/impl/application_impl.cpp index 2f29cb1..ee2ad68 100644 --- a/src/app/impl/application_impl.cpp +++ b/src/app/impl/application_impl.cpp @@ -16,16 +16,24 @@ #include "log/logger.hpp" #include "metrics/histogram_timer.hpp" #include "metrics/metrics.hpp" +#include "se/impl/subscription_manager.hpp" namespace jam::app { + SeHolder::SeHolder(se_ptr se) : se_(std::move(se)) {} + + SeHolder::~SeHolder() { + se_->dispose(); + } + ApplicationImpl::ApplicationImpl( std::shared_ptr logsys, std::shared_ptr config, std::shared_ptr state_manager, std::shared_ptr watchdog, std::shared_ptr metrics_exposer, - std::shared_ptr system_clock) + std::shared_ptr system_clock, + std::shared_ptr) : logger_(logsys->getLogger("Application", "application")), app_config_(std::move(config)), state_manager_(std::move(state_manager)), diff --git a/src/app/impl/application_impl.hpp b/src/app/impl/application_impl.hpp index 43a4d38..9ab0fb2 100644 --- a/src/app/impl/application_impl.hpp +++ b/src/app/impl/application_impl.hpp @@ -6,12 +6,13 @@ #pragma once -#include "app/application.hpp" - #include #include +#include "app/application.hpp" +#include "se/subscription_fwd.hpp" + namespace jam { class Watchdog; } // namespace jam @@ -41,6 +42,14 @@ namespace jam::metrics { namespace jam::app { + struct SeHolder final { + using se_ptr = std::shared_ptr; + se_ptr se_; + + SeHolder(se_ptr se); + ~SeHolder(); + }; + class ApplicationImpl final : public Application { public: ApplicationImpl(std::shared_ptr logsys, @@ -48,7 +57,8 @@ namespace jam::app { std::shared_ptr state_manager, std::shared_ptr watchdog, std::shared_ptr metrics_exposer, - std::shared_ptr system_clock); + std::shared_ptr system_clock, + std::shared_ptr); void run() override; diff --git a/src/app/impl/state_manager_impl.hpp b/src/app/impl/state_manager_impl.hpp index 13f5f26..1965e3f 100644 --- a/src/app/impl/state_manager_impl.hpp +++ b/src/app/impl/state_manager_impl.hpp @@ -6,13 +6,12 @@ #pragma once -#include "app/state_manager.hpp" - #include #include #include #include +#include "app/state_manager.hpp" #include "utils/ctor_limiters.hpp" namespace soralog { diff --git a/src/executable/CMakeLists.txt b/src/executable/CMakeLists.txt index 392671f..0a0234c 100644 --- a/src/executable/CMakeLists.txt +++ b/src/executable/CMakeLists.txt @@ -35,6 +35,7 @@ else () add_executable(jam_node jam_node.cpp) target_link_libraries(jam_node ${LIBRARIES}) endif () +add_dependencies(jam_node all_modules) #if (BACKWARD) # add_backward(jam_node) diff --git a/src/executable/jam_node.cpp b/src/executable/jam_node.cpp index af58cdf..23f0649 100644 --- a/src/executable/jam_node.cpp +++ b/src/executable/jam_node.cpp @@ -4,18 +4,24 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include +#include +#include +#include +#include #include #include -#include - #include "app/application.hpp" #include "app/configuration.hpp" #include "app/configurator.hpp" #include "injector/node_injector.hpp" +#include "loaders/impl/example_loader.hpp" #include "log/logger.hpp" +#include "modules/module_loader.hpp" +#include "se/subscription.hpp" using std::string_view_literals::operator""sv; @@ -35,11 +41,29 @@ namespace { int run_node(std::shared_ptr logsys, std::shared_ptr appcfg) { auto injector = std::make_unique(logsys, appcfg); + // Load modules + std::deque> loaders; + { + auto logger = logsys->getLogger("Modules", "jam"); + const std::string path("modules"); + + jam::modules::ModuleLoader module_loader(path); + auto modules = module_loader.get_modules(); + if (modules.has_error()) { + SL_CRITICAL(logger, "Failed to load modules from path: {}", path); + return EXIT_FAILURE; + } + + for (const auto &module : modules.value()) { + auto loader = injector->register_loader(module); + if (loader) { + loaders.emplace_back(std::move(loader)); + } + } + } auto logger = logsys->getLogger("Main", jam::log::defaultGroupName); - auto app = injector->injectApplication(); - SL_INFO(logger, "Node started. Version: {} ", appcfg->nodeVersion()); app->run(); diff --git a/src/injector/CMakeLists.txt b/src/injector/CMakeLists.txt index 7030c72..3b6979b 100644 --- a/src/injector/CMakeLists.txt +++ b/src/injector/CMakeLists.txt @@ -15,4 +15,6 @@ target_link_libraries(node_injector application metrics clock + se_async + modules ) diff --git a/src/injector/node_injector.cpp b/src/injector/node_injector.cpp index 48ef7d5..11fd608 100644 --- a/src/injector/node_injector.cpp +++ b/src/injector/node_injector.cpp @@ -9,8 +9,14 @@ #include "injector/node_injector.hpp" +#include +#include +#include +#include + #include #include +#include #include "app/configuration.hpp" #include "app/impl/application_impl.hpp" @@ -18,9 +24,13 @@ #include "app/impl/watchdog.hpp" #include "clock/impl/clock_impl.hpp" #include "injector/bind_by_lambda.hpp" +#include "loaders/loader.hpp" #include "log/logger.hpp" #include "metrics/impl/exposer_impl.hpp" #include "metrics/impl/prometheus/handler_impl.hpp" +#include "modules/module.hpp" +#include "se/impl/async_dispatcher_impl.hpp" +#include "se/subscription_fwd.hpp" namespace { namespace di = boost::di; @@ -29,7 +39,7 @@ namespace { template auto useConfig(C c) { - return boost::di::bind >().to( + return boost::di::bind>().to( std::move(c))[boost::di::override]; } @@ -50,6 +60,7 @@ namespace { di::bind.to(logsys), di::bind.to(), di::bind.to(), + di::bind.to>(), di::bind.to([](const auto &injector) { return metrics::Exposer::Configuration{ {boost::asio::ip::address_v4::from_string("127.0.0.1"), 7777} @@ -92,10 +103,44 @@ namespace jam::injector { NodeInjector::NodeInjector(std::shared_ptr logsys, std::shared_ptr config) : pimpl_{std::make_unique( - makeNodeInjector(std::move(logsys), std::move(config)))} {} + makeNodeInjector(std::move(logsys), std::move(config)))} {} std::shared_ptr NodeInjector::injectApplication() { return pimpl_->injector_ - .template create >(); + .template create>(); + } + + std::shared_ptr NodeInjector::getSE() { + return pimpl_->injector_.template create>(); + } + + std::unique_ptr NodeInjector::register_loader( + std::shared_ptr module) { + auto logsys = pimpl_->injector_ + .template create>(); + auto logger = logsys->getLogger("Modules", "jam"); + + if ("ExampleLoader" == module->get_loader_id()) { + auto loader = + pimpl_->injector_ + .template create>(); + loader->start(module); + + if (auto info = loader->module_info()) { + SL_INFO(logger, "> Module: {} [{}]", *info, module->get_path()); + } else { + SL_ERROR(logger, + "> No module info for: {} [{}]", + module->get_loader_id(), + module->get_path()); + } + return std::unique_ptr(loader.release()); + } + + SL_CRITICAL(logger, + "> No loader found for: {} [{}]", + module->get_loader_id(), + module->get_path()); + return {}; } } // namespace jam::injector diff --git a/src/injector/node_injector.hpp b/src/injector/node_injector.hpp index 6be0473..8c44252 100644 --- a/src/injector/node_injector.hpp +++ b/src/injector/node_injector.hpp @@ -8,6 +8,8 @@ #include +#include "se/subscription.hpp" + namespace jam::log { class LoggingSystem; } // namespace jam::log @@ -17,6 +19,15 @@ namespace jam::app { class Application; } // namespace jam::app +namespace jam::loaders { + class Loader; + class ExampleLoader; +} // namespace jam::loaders + +namespace jam::modules { + class Module; +} // namespace jam::modules + namespace jam::injector { /** @@ -29,6 +40,9 @@ namespace jam::injector { std::shared_ptr configuration); std::shared_ptr injectApplication(); + std::shared_ptr getSE(); + std::unique_ptr register_loader( + std::shared_ptr module); protected: std::shared_ptr pimpl_; diff --git a/src/loaders/README b/src/loaders/README new file mode 100644 index 0000000..31be259 --- /dev/null +++ b/src/loaders/README @@ -0,0 +1 @@ +# loaders are locating here \ No newline at end of file diff --git a/src/loaders/impl/example_loader.hpp b/src/loaders/impl/example_loader.hpp new file mode 100644 index 0000000..3085577 --- /dev/null +++ b/src/loaders/impl/example_loader.hpp @@ -0,0 +1,67 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include + +#include "loaders/loader.hpp" +#include "log/logger.hpp" +#include "modules/example/example.hpp" +#include "se/subscription.hpp" + +namespace jam::loaders { + + class ExampleLoader final + : public std::enable_shared_from_this, + public Loader, + public modules::ExampleModuleLoader { + struct __T {}; + std::shared_ptr logsys_; + + using InitCompleteSubscriber = BaseSubscriber<__T>; + std::shared_ptr on_init_complete_; + + public: + ExampleLoader(std::shared_ptr logsys, + std::shared_ptr se_manager) + : Loader(std::move(logsys), std::move(se_manager)) {} + + ExampleLoader(const ExampleLoader &) = delete; + ExampleLoader &operator=(const ExampleLoader &) = delete; + + ~ExampleLoader() = default; + + void start(std::shared_ptr module) { + set_module(module); + auto function = + get_module() + ->getFunctionFromLibrary< + std::weak_ptr, + std::shared_ptr, + std::shared_ptr>("query_module_instance"); + + auto se_manager = get_se_manager(); + if (function) { + auto module_internal = (*function)(shared_from_this(), logsys_); + on_init_complete_ = se::SubscriberCreator<__T>::template create< + EventTypes::kOnTestOperationComplete>( + *se_manager, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &) { + if (auto m = module_internal.lock()) { + m->on_loaded_success(); + } + }); + + se_manager->notify(jam::EventTypes::kOnTestOperationComplete); + } + } + }; +} // namespace jam::loaders diff --git a/src/loaders/loader.hpp b/src/loaders/loader.hpp new file mode 100644 index 0000000..6b99e81 --- /dev/null +++ b/src/loaders/loader.hpp @@ -0,0 +1,73 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +#include "modules/module.hpp" +#include "se/subscription_fwd.hpp" + +namespace jam::log { + class LoggingSystem; +} // namespace jam::log + +namespace jam::modules { + class Module; +} // namespace jam::modules + +namespace jam::loaders { + + class Loader { + public: + Loader(const Loader &) = delete; + Loader &operator=(const Loader &) = delete; + + Loader(std::shared_ptr logsys, + std::shared_ptr se_manager) + : logsys_(std::move(logsys)), se_manager_(std::move(se_manager)) {} + + virtual ~Loader() = default; + virtual void start(std::shared_ptr) = 0; + + std::optional module_info() { + if (module_) { + auto result = + module_->getFunctionFromLibrary("module_info"); + if (result) { + return (*result)(); + } + } + return std::nullopt; + } + + std::shared_ptr get_logsys() { + return logsys_; + } + + std::shared_ptr get_module() { + if (!module_) { + throw std::runtime_error("Module not set"); + } + return module_; + } + + void set_module(std::shared_ptr module) { + module_ = module; + } + + std::shared_ptr get_se_manager() { + return se_manager_; + } + + private: + std::shared_ptr module_; + std::shared_ptr logsys_; + std::shared_ptr se_manager_; + }; +} // namespace jam::loaders diff --git a/src/log/logger.hpp b/src/log/logger.hpp index 5cce5fd..c35952e 100644 --- a/src/log/logger.hpp +++ b/src/log/logger.hpp @@ -9,19 +9,22 @@ #include #include -#include #include +#include +#include #include #include #include #include -#include "utils/ctor_limiters.hpp" #include "injector/dont_inject.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::log { using soralog::Level; + using Logger = qtils::StrictSharedPtr; + enum class Error : uint8_t { WRONG_LEVEL = 1, WRONG_GROUP, WRONG_LOGGER }; outcome::result str2lvl(std::string_view str); @@ -58,23 +61,22 @@ namespace jam::log { return logging_system_->getLogger(logger_name, group_name, level); } - [[nodiscard]] - bool setLevelOfGroup(const std::string &group_name, Level level) const { + [[nodiscard]] bool setLevelOfGroup(const std::string &group_name, + Level level) const { return logging_system_->setLevelOfGroup(group_name, level); } - [[nodiscard]] - bool resetLevelOfGroup(const std::string &group_name) const { + [[nodiscard]] bool resetLevelOfGroup(const std::string &group_name) const { return logging_system_->resetLevelOfGroup(group_name); } - [[nodiscard]] - bool setLevelOfLogger(const std::string &logger_name, Level level) const { + [[nodiscard]] bool setLevelOfLogger(const std::string &logger_name, + Level level) const { return logging_system_->setLevelOfLogger(logger_name, level); } - [[nodiscard]] - bool resetLevelOfLogger(const std::string &logger_name) const { + [[nodiscard]] bool resetLevelOfLogger( + const std::string &logger_name) const { return logging_system_->resetLevelOfLogger(logger_name); } diff --git a/src/metrics/histogram_timer.hpp b/src/metrics/histogram_timer.hpp index e907969..446cc78 100644 --- a/src/metrics/histogram_timer.hpp +++ b/src/metrics/histogram_timer.hpp @@ -27,6 +27,12 @@ namespace jam::metrics { registry_->registerGaugeFamily(name, help); metric_ = registry_->registerGaugeMetric(name); } + GaugeHelper(const std::string &name, + const std::string &help, + const std::map &labels) { + registry_->registerGaugeFamily(name, help); + metric_ = registry_->registerGaugeMetric(name, labels); + } auto *operator->() const { return metric_; diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt new file mode 100644 index 0000000..5e5f336 --- /dev/null +++ b/src/modules/CMakeLists.txt @@ -0,0 +1,93 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +if(NOT TARGET all_modules) + add_custom_target(all_modules) +endif() + +function(add_jam_module NAME) + set(MODULE_NAME ${NAME}) + + set(MODULE "${MODULE_NAME}_module") + + # Parse named arguments + cmake_parse_arguments( + # Prefix for parsed argument variables + MODULE + # List of flags (boolean arguments without values) + "" + # List of named arguments with a single value + "" + # List of named arguments with multiple values + "SOURCE;INCLUDE_DIRS;LIBRARIES;DEFINITIONS" + # Input arguments + ${ARGN} + ) + + if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/module.cpp) + message(FATAL_ERROR "Not found `module.cpp` file (main file of module)") + endif () + if (NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.hpp" OR NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.cpp") + message(FATAL_ERROR "Not found `${MODULE_NAME}.hpp` nor `${MODULE_NAME}.cpp` file (class of module)") + endif () + + # Create a shared module library + add_library(${MODULE} MODULE # or SHARED + module.cpp + ${MODULE_NAME}.cpp + ${MODULE_SOURCE} + ) + + # Set exported symbols visibility + set_target_properties(${MODULE} PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN ON + ) + + # Set include directories + if (MODULE_INCLUDE_DIRS) + target_include_directories(${MODULE} PRIVATE + ${MODULE_INCLUDE_DIRS} + ) + endif () + + # Set definitions specified for module + if (MODULE_DEFINITIONS) + target_compile_definitions(${MODULE} PRIVATE + ${MODULE_DEFINITIONS} + ) + endif () + + # Link with libs + if (MODULE_LIBRARIES) + target_link_libraries(${MODULE} + ${MODULE_LIBRARIES} + ) + endif () + + # Set C++ standard + target_compile_features(${MODULE} PRIVATE + cxx_std_20 + ) + + add_dependencies(all_modules ${MODULE}) + +endfunction() + +# -------------- Core-part of module subsystem -------------- + +add_library(modules + module_loader.cpp +) + +target_link_libraries(modules + qtils::qtils +) + +# -------------- Modules -------------- + +# Example module +add_subdirectory(example) \ No newline at end of file diff --git a/src/modules/README b/src/modules/README new file mode 100644 index 0000000..a7c5cfe --- /dev/null +++ b/src/modules/README @@ -0,0 +1 @@ +# modules are locating here \ No newline at end of file diff --git a/src/modules/example/CMakeLists.txt b/src/modules/example/CMakeLists.txt new file mode 100644 index 0000000..7ff734f --- /dev/null +++ b/src/modules/example/CMakeLists.txt @@ -0,0 +1,17 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_jam_module(example + SOURCE + example.cpp + INCLUDE_DIRS + ${CMAKE_SOURCE_DIR}/src + DEFINITIONS + SOME_FLAG=1 + LIBRARIES + qtils::qtils + soralog::soralog +) \ No newline at end of file diff --git a/src/modules/example/example.cpp b/src/modules/example/example.cpp new file mode 100644 index 0000000..5eb1c50 --- /dev/null +++ b/src/modules/example/example.cpp @@ -0,0 +1,18 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/example/example.hpp" + +namespace jam::modules { + // ExampleModule::ExampleModule( + // qtils::StrictSharedPtr loader, + // qtils::StrictSharedPtr logging_system) + // : loader_(loader), + // logger_(logging_system->getLogger("ExampleModule", "example_module")) + + // {} + +} // namespace jam::modules diff --git a/src/modules/example/example.hpp b/src/modules/example/example.hpp new file mode 100644 index 0000000..ad5fe69 --- /dev/null +++ b/src/modules/example/example.hpp @@ -0,0 +1,41 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace jam::modules { + class ExampleModule; + struct ExampleModuleLoader { + virtual ~ExampleModuleLoader() = default; + }; + + struct ExampleModule { + virtual ~ExampleModule() = default; + virtual void on_loaded_success() = 0; + }; +} // namespace jam::modules + +// class BlockTree; + +namespace jam::modules { + + // class ExampleModule : public Singleton { + // public: + // static std::shared_ptr instance; + // CREATE_SHARED_METHOD(ExampleModule); + + // ExampleModule(qtils::StrictSharedPtr loader, + // qtils::StrictSharedPtr logging_system); + + // qtils::StrictSharedPtr loader_; + // log::Logger logger_; + // }; + +} // namespace jam::modules diff --git a/src/modules/example/module.cpp b/src/modules/example/module.cpp new file mode 100644 index 0000000..7120918 --- /dev/null +++ b/src/modules/example/module.cpp @@ -0,0 +1,49 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#define MODULE_C_API extern "C" __attribute__((visibility("default"))) +#define MODULE_API __attribute__((visibility("default"))) + +MODULE_C_API const char *loader_id() { + return "ExampleLoader"; +} + +MODULE_C_API const char *module_info() { + return "ExampleModule v1.0"; +} + +class ExampleModuleImpl final : public jam::modules::ExampleModule { + std::shared_ptr loader_; + std::shared_ptr logger_; + + public: + ExampleModuleImpl(std::shared_ptr loader, + std::shared_ptr logger) + : loader_(std::move(loader)), logger_(std::move(logger)) {} + + void on_loaded_success() override { + auto l = logger_->getLogger("ExampleModule", "jam"); + SL_INFO(l, "Loaded success"); + } +}; +static std::shared_ptr exmpl_mod; + + +MODULE_C_API std::weak_ptr query_module_instance( + std::shared_ptr loader, + std::shared_ptr logger) { + if (!exmpl_mod) { + exmpl_mod = std::make_shared(std::move(loader), + std::move(logger)); + } + return exmpl_mod; +} + +MODULE_C_API void release_module_instance() { + exmpl_mod.reset(); +} diff --git a/src/modules/module.hpp b/src/modules/module.hpp new file mode 100644 index 0000000..496a4fe --- /dev/null +++ b/src/modules/module.hpp @@ -0,0 +1,67 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace jam::modules { + + class Module final : public std::enable_shared_from_this { + public: + Module(Module &&) = default; + + // Static method for Module object creation + static std::shared_ptr create( + const std::string &path, + std::unique_ptr handle, + const std::string &loader_id) { + return std::shared_ptr( + new Module(path, std::move(handle), loader_id)); + } + + // Getter for library path + const std::string &get_path() const { + return path_; + } + + // Getter for loader Id + const std::string &get_loader_id() const { + return loader_id_; + } + + // Get function address from library + template + std::optional getFunctionFromLibrary( + const char *funcName) { + void *funcAddr = dlsym(handle_.get(), funcName); + if (!funcAddr) { + return std::nullopt; + } + return reinterpret_cast(funcAddr); + } + + private: + Module(const std::string &path, + std::unique_ptr handle, + const std::string &loader_id) + : path_(path), handle_(std::move(handle)), loader_id_(loader_id) {} + + std::string path_; // Library path + std::unique_ptr handle_; // Library handle + std::string loader_id_; // Loader ID + + Module(const Module &) = delete; + Module &operator=(const Module &) = delete; + Module &operator=(Module &&) = delete; + }; + +} // namespace jam::modules diff --git a/src/modules/module_loader.cpp b/src/modules/module_loader.cpp new file mode 100644 index 0000000..dbd7f3b --- /dev/null +++ b/src/modules/module_loader.cpp @@ -0,0 +1,80 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/module_loader.hpp" + +#define COMPONENT_NAME "ModuleLoader" + +OUTCOME_CPP_DEFINE_CATEGORY(jam::modules, ModuleLoader::Error, e) { + using E = jam::modules::ModuleLoader::Error; + switch (e) { + case E::PathIsNotADir: + return COMPONENT_NAME ": path is not a directory"; + case E::OpenLibraryFailed: + return COMPONENT_NAME ": open library failed"; + case E::NoLoaderIdExport: + return COMPONENT_NAME ": library doesn't provide loader_id function"; + case E::UnexpectedLoaderId: + return COMPONENT_NAME ": unexpected loader id"; + } + return COMPONENT_NAME ": unknown error"; +} + +namespace jam::modules { + + Result ModuleLoader::recursive_search( + const fs::path &dir_path, std::deque> &modules) { + if (!fs::exists(dir_path) || !fs::is_directory(dir_path)) { + return Error::PathIsNotADir; + } + + for (const auto &entry : fs::directory_iterator(dir_path)) { + const auto &entry_path = entry.path(); + const auto &entry_name = entry.path().filename().string(); + + if (entry_name[0] == '.' || entry_name[0] == '_') { + continue; + } + + if (fs::is_directory(entry)) { + OUTCOME_TRY(recursive_search(entry_path, modules)); + } else if (fs::is_regular_file(entry) + && entry_path.extension() == ".so") { + OUTCOME_TRY(load_module(entry_path.string(), modules)); + } + } + return outcome::success(); + } + + Result ModuleLoader::load_module( + const std::string &module_path, + std::deque> &modules) { + std::unique_ptr handle( + dlopen(module_path.c_str(), RTLD_LAZY), dlclose); + if (!handle) { + return Error::OpenLibraryFailed; + } + + typedef const char *(*LoaderIdFunc)(); + LoaderIdFunc loader_id_func = + (LoaderIdFunc)dlsym(handle.get(), "loader_id"); + + if (!loader_id_func) { + return Error::NoLoaderIdExport; + } + + const char *loader_id = loader_id_func(); + if (!loader_id) { + return Error::UnexpectedLoaderId; + } + + auto module = Module::create(module_path, std::move(handle), loader_id); + modules.push_back(module); + return outcome::success(); + } + + +} // namespace jam::modules diff --git a/src/modules/module_loader.hpp b/src/modules/module_loader.hpp new file mode 100644 index 0000000..daae0d8 --- /dev/null +++ b/src/modules/module_loader.hpp @@ -0,0 +1,56 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "modules/module.hpp" + +namespace fs = std::filesystem; +template +using Result = outcome::result; + +namespace jam::modules { + + class ModuleLoader { + public: + enum class Error : uint8_t { + PathIsNotADir, + OpenLibraryFailed, + NoLoaderIdExport, + UnexpectedLoaderId, + }; + + explicit ModuleLoader(const std::string &dir_path) : dir_path_(dir_path) {} + + Result>> get_modules() { + std::deque> modules; + OUTCOME_TRY(recursive_search(fs::path(dir_path_), modules)); + return modules; + } + + private: + std::string dir_path_; + + Result recursive_search(const fs::path &dir_path, + std::deque> &modules); + Result load_module(const std::string &module_path, + std::deque> &modules); + }; + +} // namespace jam::modules + +OUTCOME_HPP_DECLARE_ERROR(jam::modules, ModuleLoader::Error); diff --git a/src/se/CMakeLists.txt b/src/se/CMakeLists.txt new file mode 100644 index 0000000..fe524f6 --- /dev/null +++ b/src/se/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_library(se_async + async_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_async + ) + +add_library(se_sync + sync_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_sync + ) diff --git a/src/se/async_dispatcher.cpp b/src/se/async_dispatcher.cpp new file mode 100644 index 0000000..a742632 --- /dev/null +++ b/src/se/async_dispatcher.cpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include "impl/async_dispatcher_impl.hpp" +#include "subscription.hpp" + +namespace jam::se { + + std::shared_ptr getDispatcher() { + return std::make_shared< + AsyncDispatcher>(); + } + +} // namespace jam::se diff --git a/src/se/impl/async_dispatcher_impl.hpp b/src/se/impl/async_dispatcher_impl.hpp new file mode 100644 index 0000000..983dd63 --- /dev/null +++ b/src/se/impl/async_dispatcher_impl.hpp @@ -0,0 +1,166 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "common.hpp" +#include "dispatcher.hpp" +#include "thread_handler.hpp" + +namespace jam::se { + + template + class AsyncDispatcher final : public IDispatcher { + public: + // Disable copying + AsyncDispatcher(const AsyncDispatcher &) = delete; + AsyncDispatcher &operator=(const AsyncDispatcher &) = delete; + + // Disable moving + AsyncDispatcher(AsyncDispatcher &&) = delete; + AsyncDispatcher &operator=(AsyncDispatcher &&) = delete; + + static constexpr uint32_t kHandlersCount = kCount; + static constexpr uint32_t kPoolThreadsCount = kPoolSize; + + private: + using Parent = IDispatcher; + + struct SchedulerContext { + /// Scheduler to execute tasks + std::shared_ptr handler; + }; + + SchedulerContext handlers_[kHandlersCount]; + SchedulerContext pool_[kPoolThreadsCount]; + + std::atomic_int64_t temporary_handlers_tasks_counter_; + std::atomic is_disposed_; + + struct BoundContexts { + typename Parent::Tid next_tid_offset = 0u; + std::unordered_map contexts; + }; + utils::ReadWriteObject bound_; + + void uploadToHandler(const typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) { + assert(tid != kExecuteInPool || !pred); + if (is_disposed_.load()) { + return; + } + + if (tid < kHandlersCount) { + pred ? handlers_[tid].handler->repeat( + timeout, std::move(task), std::move(pred)) + : handlers_[tid].handler->addDelayed(timeout, std::move(task)); + return; + } + + if (auto context = + bound_.sharedAccess([tid](const BoundContexts &bound) + -> std::optional { + if (auto it = bound.contexts.find(tid); + it != bound.contexts.end()) { + return it->second; + } + return std::nullopt; + })) { + pred ? context->handler->repeat( + timeout, std::move(task), std::move(pred)) + : context->handler->addDelayed(timeout, std::move(task)); + return; + } + + std::optional opt_task = std::move(task); + for (auto &handler : pool_) { + if (opt_task = + handler.handler->uploadIfFree(timeout, std::move(*opt_task)); + !opt_task) { + return; + } + } + + auto h = std::make_shared(); + ++temporary_handlers_tasks_counter_; + h->addDelayed(timeout, [this, h, task{std::move(*opt_task)}]() mutable { + if (!is_disposed_.load()) { + task(); + } + --temporary_handlers_tasks_counter_; + h->dispose(false); + }); + } + + public: + AsyncDispatcher() { + temporary_handlers_tasks_counter_.store(0); + is_disposed_ = false; + for (auto &h : handlers_) { + h.handler = std::make_shared(); + } + for (auto &h : pool_) { + h.handler = std::make_shared(); + } + } + + void dispose() override { + is_disposed_ = true; + for (auto &h : handlers_) { + h.handler->dispose(); + } + for (auto &h : pool_) { + h.handler->dispose(); + } + + while (temporary_handlers_tasks_counter_.load() != 0) { + std::this_thread::sleep_for(std::chrono::microseconds(0ull)); + } + } + + void add(typename Parent::Tid tid, typename Parent::Task &&task) override { + uploadToHandler( + tid, std::chrono::microseconds(0ull), std::move(task), nullptr); + } + + void addDelayed(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task) override { + uploadToHandler(tid, timeout, std::move(task), nullptr); + } + + void repeat(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) override { + uploadToHandler(tid, timeout, std::move(task), std::move(pred)); + } + + std::optional bind(std::shared_ptr scheduler) override { + if (!scheduler) { + return std::nullopt; + } + + return bound_.exclusiveAccess( + [scheduler(std::move(scheduler))](BoundContexts &bound) { + const auto execution_tid = kHandlersCount + bound.next_tid_offset; + assert(bound.contexts.find(execution_tid) == bound.contexts.end()); + bound.contexts[execution_tid] = SchedulerContext{scheduler}; + ++bound.next_tid_offset; + return execution_tid; + }); + } + + bool unbind(Tid tid) override { + return bound_.exclusiveAccess([tid](BoundContexts &bound) { + return bound.contexts.erase(tid) == 1; + }); + } + }; + +} // namespace jam::se diff --git a/src/se/impl/compile-time_murmur2.hpp b/src/se/impl/compile-time_murmur2.hpp new file mode 100644 index 0000000..4aa1beb --- /dev/null +++ b/src/se/impl/compile-time_murmur2.hpp @@ -0,0 +1,102 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace jam::se::utils { + + class Hasher { + static constexpr /* h */ uint32_t __init__(uint32_t len) { + return 0 ^ len; + } + + template + static constexpr uint32_t __load__(__T &data, uint32_t offset) { + return data[offset + 0] | (data[offset + 1] << 8) + | (data[offset + 2] << 16) | (data[offset + 3] << 24); + } + + static constexpr uint32_t __mul__(uint32_t val1, uint32_t val2) { + return val1 * val2; + } + + static constexpr uint32_t __sl__(uint32_t value, uint32_t count) { + return (value << count); + } + + static constexpr uint32_t __sr__(uint32_t value, uint32_t count) { + return (value >> count); + } + + static constexpr uint32_t __xor__(uint32_t h, uint32_t k) { + return h ^ k; + } + + static constexpr uint32_t __xor_with_sr__(uint32_t k, uint32_t r) { + return __xor__(k, __sr__(k, r)); + } + + template + static constexpr /* h */ uint32_t __proc__(__Type &data, + uint32_t len, + uint32_t offset, + uint32_t h, + uint32_t m, + uint32_t r) { + return len >= 4 + ? __proc__( + data, + len - 4, + offset + 4, + __xor__(__mul__(h, m), + __mul__(__xor_with_sr__( + __mul__(__load__(data, offset), m), r), + m)), + m, + r) + : len == 3 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 2], 16)), + m, + r) + : len == 2 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 1], 8)), + m, + r) + : len == 1 + ? __proc__( + data, len - 1, offset, __xor__(h, data[offset]) * m, m, r) + : __xor__(__mul__(__xor_with_sr__(h, 13), m), + __sr__(__mul__(__xor_with_sr__(h, 13), m), 15)); + } + + public: + template + static constexpr uint32_t murmur2(__Type &data, uint32_t len) { + return __proc__(data, len, 0, __init__(len), 0x5bd1e995, 24); + } + }; + +} // namespace jam::se::utils + +#ifndef CT_MURMUR2 +#define CT_MURMUR2(x) \ + ::jam::se::utils::Hasher::murmur2(x, (sizeof(x) / sizeof(x[0])) - 1) +#endif // CT_MURMUR2 + +static_assert(CT_MURMUR2("Called the One Ring, or the Ruling Ring.") + == 1333588607); +static_assert( + CT_MURMUR2("Fashioned by Sauron a decade after the making of the Elven " + "rings in the fires of Mount Doom in Mordor and which") + == 1319897327); +static_assert(CT_MURMUR2("could only be destroyed in that same fire.") + == 702138758); diff --git a/src/se/impl/dispatcher.hpp b/src/se/impl/dispatcher.hpp new file mode 100644 index 0000000..0483c62 --- /dev/null +++ b/src/se/impl/dispatcher.hpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include "scheduler.hpp" + +namespace jam::se { + + struct IDispatcher { + using Tid = uint32_t; + using Task = IScheduler::Task; + using Predicate = IScheduler::Predicate; + + static constexpr Tid kExecuteInPool = std::numeric_limits::max(); + + virtual ~IDispatcher() = default; + + virtual std::optional bind(std::shared_ptr scheduler) = 0; + virtual bool unbind(Tid tid) = 0; + + virtual void dispose() = 0; + virtual void add(Tid tid, Task &&task) = 0; + virtual void addDelayed(Tid tid, + std::chrono::microseconds timeout, + Task &&task) = 0; + virtual void repeat(Tid tid, + std::chrono::microseconds timeout, + Task &&task, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler.hpp b/src/se/impl/scheduler.hpp new file mode 100644 index 0000000..9f1a07c --- /dev/null +++ b/src/se/impl/scheduler.hpp @@ -0,0 +1,42 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace jam::se { + + class IScheduler { + public: + using Task = std::function; + using Predicate = std::function; + virtual ~IScheduler() {} + + /// Stops sheduler work and tasks execution + virtual void dispose(bool wait_for_release = true) = 0; + + /// Checks if current scheduler executes task + virtual bool isBusy() const = 0; + + /// If scheduller is not busy it takes task for execution. Otherwise it + /// returns it back. + virtual std::optional uploadIfFree(std::chrono::microseconds timeout, + Task &&task) = 0; + + /// Adds delayed task to execution queue + virtual void addDelayed(std::chrono::microseconds timeout, Task &&t) = 0; + + /// Adds task that will be periodicaly called with timeout period after + /// timeout, until predicate return true + virtual void repeat(std::chrono::microseconds timeout, + Task &&t, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler_impl.hpp b/src/se/impl/scheduler_impl.hpp new file mode 100644 index 0000000..c8ad7c2 --- /dev/null +++ b/src/se/impl/scheduler_impl.hpp @@ -0,0 +1,193 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.hpp" +#include "scheduler.hpp" + +namespace jam::se { + + class SchedulerBase : public IScheduler { + private: + using Time = std::chrono::high_resolution_clock; + using Timepoint = std::chrono::time_point