diff --git a/udp/client/.gitkeep b/udp/client/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/udp/client/CMakeLists.txt b/udp/client/CMakeLists.txt new file mode 100644 index 0000000..5f080c9 --- /dev/null +++ b/udp/client/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 2.8) +project(calculator) +set(CMAKE_CXX_STANDARD 17) + +file(GLOB SOURCE_FILES "src/*.cpp") + +add_executable(calculator ${SOURCE_FILES}) + +target_include_directories(calculator PRIVATE include) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(calculator PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(calculator "${CMAKE_THREAD_LIBS_INIT}") +endif() diff --git a/udp/client/include/CalcuatorServerDriver.hpp b/udp/client/include/CalcuatorServerDriver.hpp new file mode 100644 index 0000000..c451052 --- /dev/null +++ b/udp/client/include/CalcuatorServerDriver.hpp @@ -0,0 +1,56 @@ +#include + +#pragma once + +#include +#include +#include +#include +#include +#include "requests.hpp" +#include "socketUtils.hpp" +#include "ConcurrentQueue.hpp" +#include "ConcurrentMap.hpp" + +class CalcuatorServerDriver { +public: + CalcuatorServerDriver(std::string host, uint16_t port) : m_host_str(std::move(host)), m_port(port) {} + + ~CalcuatorServerDriver(); + + void connect(); + + bool hasResult(); + + CalculatorResponse getResult(); + + void factorial(uint32_t id, int64_t arg); + + void sqrt(uint32_t id, int64_t arg); + + CalculatorResponse plus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse minus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse multiply(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse divide(uint32_t id, int64_t arg1, int64_t arg2); + + void results(); + +private: + void sendRequest(CalculatorRequest const &request); + + CalculatorResponse getResponse(CalculatorRequest const &request); + + void readingThreadTask(); + + ConcurrentQueue m_longResults; + ConcurrentMap m_instantResults; + int m_socket = 0; + std::string m_host_str; + sockaddr_in m_host; + uint16_t m_port; + std::thread m_readingThread; + std::unordered_set m_longOperations; +}; diff --git a/udp/client/include/CalculatorApp.hpp b/udp/client/include/CalculatorApp.hpp new file mode 100644 index 0000000..84dca42 --- /dev/null +++ b/udp/client/include/CalculatorApp.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include "CalcuatorServerDriver.hpp" +#include "socketUtils.hpp" + +class CalculatorApp { +public: + CalculatorApp(std::string const &host, uint16_t port) : m_driver(host, port) {} + + void start(); + +private: + void printPrompt(uint32_t computationId); + + void processInput(std::string &line); + + template + void printResult(uint32_t id, T const &value) { + std::cout << "Out [" << id << "]: " << value << std::endl; + } + + void printEntryMessage(); + + void printLine(std::string const &line = ""); + + uint32_t m_currentComputation = 0; + CalcuatorServerDriver m_driver; + + void printResponse(const CalculatorResponse &response); +}; \ No newline at end of file diff --git a/udp/client/include/ConcurrentMap.hpp b/udp/client/include/ConcurrentMap.hpp new file mode 100644 index 0000000..bfc36c9 --- /dev/null +++ b/udp/client/include/ConcurrentMap.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include + +template +class ConcurrentMap { +public: + void put(Key const &key, Value const &value) { + m_setMutex.lock(); + m_map[key] = value; + m_setMutex.unlock(); + } + + bool contains(Key const &key) { + m_setMutex.lock(); + + if (m_map.find(key) != m_map.end()) { + m_setMutex.unlock(); + return true; + } + + m_setMutex.unlock(); + return false; + } + + Value pop(Key const &key) { + m_setMutex.lock(); + auto result = m_map[key]; + m_map.erase(key); + m_setMutex.unlock(); + return result; + } + +private: + std::mutex m_setMutex; + std::unordered_map m_map; +}; diff --git a/udp/client/include/ConcurrentQueue.hpp b/udp/client/include/ConcurrentQueue.hpp new file mode 100644 index 0000000..16639f8 --- /dev/null +++ b/udp/client/include/ConcurrentQueue.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + +template +class ConcurrentQueue { +public: + void push(T const &response) { + m_queueMutex.lock(); + m_resultsQueue.push(response); + m_queueMutex.unlock(); + } + + T pop() { + m_queueMutex.lock(); + auto result = m_resultsQueue.front(); + m_resultsQueue.pop(); + m_queueMutex.unlock(); + return result; + } + + bool empty() { + m_queueMutex.lock(); + bool isEmpty = m_resultsQueue.empty(); + m_queueMutex.unlock(); + return isEmpty; + } + +private: + std::mutex m_queueMutex; + std::queue m_resultsQueue; +}; diff --git a/udp/client/include/requests.hpp b/udp/client/include/requests.hpp new file mode 100644 index 0000000..1f552ed --- /dev/null +++ b/udp/client/include/requests.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +#pragma pack(push, 1) +struct CalculatorResponse { + uint8_t errorCode; + uint8_t operationType; + uint32_t computationId; + int64_t result; +}; +#pragma pack(pop) + +enum OperationType { + FAST = 1, + SLOW +}; + +enum ErrorCode { + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, +}; + +std::string errorCodeToString(uint8_t code); + +#pragma pack(push, 1) +struct CalculatorRequest { + uint8_t type; + uint32_t computationId; + int64_t firstOperand; + int64_t secondOperand; +}; +#pragma pack(pop) + +enum RequestType { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT, + LONG_OP_RESULT +}; diff --git a/udp/client/include/socketUtils.hpp b/udp/client/include/socketUtils.hpp new file mode 100644 index 0000000..2f1e4dd --- /dev/null +++ b/udp/client/include/socketUtils.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +void writeToSocket(sockaddr_in &host, int socket, uint8_t const *data, size_t size); + +void readFromSocket(sockaddr_in &host, int socket, uint8_t *data, size_t size); + +void printError(const std::string &s); + +void error(const std::string &type, const std::string &s); + +int socketInit(); + +sockaddr_in hostInit(std::string const &hostname, uint16_t port); + +template +void writeObject(sockaddr_in &host, int socket, const T &object) { + writeToSocket(host, socket, (uint8_t const *) &object, sizeof(T)); +} + +template +T readObject(sockaddr_in &host, int socket) { + T object; + readFromSocket(host, socket, (uint8_t *) &object, sizeof(T)); + + return object; +} diff --git a/udp/client/src/CalcuatorServerDriver.cpp b/udp/client/src/CalcuatorServerDriver.cpp new file mode 100644 index 0000000..af6de15 --- /dev/null +++ b/udp/client/src/CalcuatorServerDriver.cpp @@ -0,0 +1,93 @@ +#include "CalcuatorServerDriver.hpp" + +CalcuatorServerDriver::~CalcuatorServerDriver() { + shutdown(m_socket, SHUT_RDWR); + close(m_socket); +} + +void CalcuatorServerDriver::connect() { + m_socket = socketInit(); + m_host = hostInit(m_host_str, m_port); + m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); +} + +void CalcuatorServerDriver::readingThreadTask() { + while (true) { + auto response = readObject(m_host, m_socket); + if (response.errorCode != WAIT_FOR_RESULT && response.operationType == SLOW) { + m_longResults.push(response); + } + m_instantResults.put(response.computationId, response); + } +} + +bool CalcuatorServerDriver::hasResult() { + return !m_longResults.empty(); +} + +CalculatorResponse CalcuatorServerDriver::getResult() { + auto response = m_longResults.pop(); + m_longOperations.erase(response.computationId); + return response; +} + +void CalcuatorServerDriver::factorial(uint32_t id, int64_t arg) { + getResponse({FACT, id, arg, 0}); + m_longOperations.insert(id); +} + +void CalcuatorServerDriver::sqrt(uint32_t id, int64_t arg) { + getResponse({SQRT, id, arg, 0}); + m_longOperations.insert(id); +} + +CalculatorResponse CalcuatorServerDriver::plus(uint32_t id, int64_t arg1, int64_t arg2) { + return getResponse({PLUS, id, arg1, arg2}); +} + +CalculatorResponse CalcuatorServerDriver::minus(uint32_t id, int64_t arg1, int64_t arg2) { + return getResponse({MINUS, id, arg1, arg2}); +} + +CalculatorResponse CalcuatorServerDriver::multiply(uint32_t id, int64_t arg1, int64_t arg2) { + return getResponse({MULT, id, arg1, arg2}); +} + +CalculatorResponse CalcuatorServerDriver::divide(uint32_t id, int64_t arg1, int64_t arg2) { + return getResponse({DIV, id, arg1, arg2}); +} + +void CalcuatorServerDriver::results() { + for (uint32_t id : m_longOperations) { + getResponse({LONG_OP_RESULT, id, 0, 0}); + } +} + +void CalcuatorServerDriver::sendRequest(CalculatorRequest const &request) { + writeObject(m_host, m_socket, request); +} + +CalculatorResponse CalcuatorServerDriver::getResponse(CalculatorRequest const &request) { + sendRequest(request); + int i = 0; + int resend = 0; + + while (!m_instantResults.contains(request.computationId)) { + if (i == 10) { + if (resend == 5) { + error("get response", "Connection error."); + } + sendRequest(request); + resend++; + i = 0; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + i++; + } + + return m_instantResults.pop(request.computationId); +} + + + diff --git a/udp/client/src/CalculatorApp.cpp b/udp/client/src/CalculatorApp.cpp new file mode 100644 index 0000000..e28d1b3 --- /dev/null +++ b/udp/client/src/CalculatorApp.cpp @@ -0,0 +1,105 @@ +#include "CalculatorApp.hpp" + +void CalculatorApp::start() { + m_driver.connect(); + + printEntryMessage(); + + std::string line; + printPrompt(m_currentComputation); + + while (std::getline(std::cin, line)) { + processInput(line); + ++m_currentComputation; + printPrompt(m_currentComputation); + } +} + +void CalculatorApp::printPrompt(uint32_t computationId) { + std::cout << "In [" << computationId << "]: "; +} + +void CalculatorApp::processInput(std::string &line) { + std::istringstream iss(line); + std::vector results((std::istream_iterator(iss)), + std::istream_iterator()); + + if (results.size() <= 3) { + if (results.size() == 2) { + if (results[0] == "fact") { + m_driver.factorial(m_currentComputation, static_cast(std::stoull(results[1]))); + } else if (results[0] == "sqrt") { + m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); + } else { + std::cout << "Syntax error." << std::endl; + } + } else if (results.size() == 1) { + if (results[0] == "rslt") { + m_driver.results(); + } else { + std::cout << "Syntax error." << std::endl; + } + } else if (results.size() == 3 && results[0].length() == 1) { + CalculatorResponse response{}; + switch (results[0][0]) { + case '+': + response = m_driver.plus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '-': + response = m_driver.minus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '*': + response = m_driver.multiply(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '/': + response = m_driver.divide(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + default: + std::cout << "Syntax error." << std::endl; + break; + } + + printResponse(response); + } else { + std::cout << "Syntax error." << std::endl; + } + } else { + std::cout << "Syntax error." << std::endl; + } + + printLine(); + + while (m_driver.hasResult()) { + printResponse(m_driver.getResult()); + printLine(); + } +} + +void CalculatorApp::printResponse(const CalculatorResponse &response) { + if (response.errorCode != OK) { + printResult(response.computationId, errorCodeToString(response.errorCode)); + } else { + printResult(response.computationId, response.result); + } +} + +void CalculatorApp::printEntryMessage() { + std::cout + << "Welcome!\n" + << "\tOnline calculator 1.0\n\n" + << "\tSupported operations: + - * / fact sqrt rslt quit\n" + << "\tUse prefix notation (e.g + 2 3).\n" + << std::endl; +} + +void CalculatorApp::printLine(const std::string &line) { + std::cout << line << std::endl; +} diff --git a/udp/client/src/main.cpp b/udp/client/src/main.cpp new file mode 100644 index 0000000..ccc4cf7 --- /dev/null +++ b/udp/client/src/main.cpp @@ -0,0 +1,11 @@ +#include +#include + +int main(int argc, char *argv[]) { + if (argc != 3) { + error("Invalid argumets", "Usage: calculator "); + } + + CalculatorApp app(std::string(argv[1]), static_cast(atoi(argv[2]))); + app.start(); +} \ No newline at end of file diff --git a/udp/client/src/requests.cpp b/udp/client/src/requests.cpp new file mode 100644 index 0000000..defabe9 --- /dev/null +++ b/udp/client/src/requests.cpp @@ -0,0 +1,18 @@ +#include "requests.hpp" + +std::string errorCodeToString(uint8_t code) { + switch (code) { + case OK: + return "OK"; + case OVERFLOW: + return "Integer overflow"; + case DIV_BY_ZERO: + return "Zero division error"; + case FACT_OF_NEGATIVE: + return "Factorial of negative number"; + case SQRT_OF_NEGATIVE: + return "Sqrt of negative number"; + default: + return "Unknown error"; + } +} \ No newline at end of file diff --git a/udp/client/src/socketUtils.cpp b/udp/client/src/socketUtils.cpp new file mode 100644 index 0000000..ab03fa7 --- /dev/null +++ b/udp/client/src/socketUtils.cpp @@ -0,0 +1,51 @@ +#include "socketUtils.hpp" + +void writeToSocket(sockaddr_in &host, int socket, uint8_t const *data, size_t size) { + ssize_t n = sendto(socket, data, size, 0, (const sockaddr *) &host, sizeof(host)); + if (size != n) { + error("write", "failed to write to socket!"); + } +} + +void readFromSocket(sockaddr_in &host, int socket, uint8_t *data, size_t size) { + socklen_t len = sizeof(host); + ssize_t n = recvfrom(socket, (void *) data, size, 0, (sockaddr *) &host, &len); + if (size != n) { + error("read", "failed to read from socket!"); + } +} + +void printError(const std::string &s) { + std::cerr << s << std::endl; +} + +void error(const std::string &type, const std::string &s) { + printError("Error " + type + ": " + s); + exit(0); +} + +int socketInit() { + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + + if (sockfd < 0) { + error("Init socket", "Network problem."); + } + + return sockfd; +} + +sockaddr_in hostInit(std::string const &hostname, uint16_t port) { + hostent *server = gethostbyname(hostname.c_str()); + + if (server == nullptr) { + error("Host init", "Incorrect host."); + } + + sockaddr_in serv_addr{}; + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy(server->h_addr, (char *) &serv_addr.sin_addr.s_addr, (size_t) server->h_length); + serv_addr.sin_port = htons(port); + + return serv_addr; +} diff --git a/udp/server/.gitkeep b/udp/server/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/udp/server/CMakeLists.txt b/udp/server/CMakeLists.txt new file mode 100644 index 0000000..e932937 --- /dev/null +++ b/udp/server/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.10) +project(server) + +set(CMAKE_CXX_STANDARD 14) + +set(SOURCE_FILES + src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h include/util.hpp src/pools.cpp include/pools.h) + +include_directories(include) + +add_executable(server ${SOURCE_FILES}) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(server PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(server "${CMAKE_THREAD_LIBS_INIT}") +endif() \ No newline at end of file diff --git a/udp/server/include/pools.h b/udp/server/include/pools.h new file mode 100644 index 0000000..eae46f1 --- /dev/null +++ b/udp/server/include/pools.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "response.hpp" + +class socket_int_pool { +public: + void insert(int socket_descriptor, int id, std::thread* thread); + void remove(int socket_descriptor, int id); + void clear(); +private: + std::mutex lock; + std::map, std::thread*> pool; +}; + +bool operator <(const struct sockaddr_in& x, const struct sockaddr_in& y); + +class long_computation_response_pool { +public: + void insert(struct sockaddr_in client_addr, int id, int type, dctp_response_header response); + dctp_response_header get(struct sockaddr_in client_addr, int id, int type); + bool contains(struct sockaddr_in client_addr, int id, int type); + void clear(); +private: + std::mutex lock; + std::map, dctp_response_header> pool; +}; diff --git a/udp/server/include/request.hpp b/udp/server/include/request.hpp new file mode 100644 index 0000000..2682e83 --- /dev/null +++ b/udp/server/include/request.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include + +#pragma pack(push, 1) + +struct dctp_request_header { + uint8_t type; + uint32_t id; + int64_t first_operand; + int64_t second_operand; +}; + +#pragma pack(pop) + +enum request_type { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT, + LONG_COMPUTATION_RESULT +}; + +std::string request_to_string(struct dctp_request_header request) { + return "Id: " + std::to_string(request.id) + + "; type: " + std::to_string(request.type) + + "; first operand: " + std::to_string(request.first_operand) + + "; second operand: " + std::to_string(request.second_operand); +} \ No newline at end of file diff --git a/udp/server/include/response.hpp b/udp/server/include/response.hpp new file mode 100644 index 0000000..ae7ac9b --- /dev/null +++ b/udp/server/include/response.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include + +#pragma pack(push, 1) + +struct dctp_response_header { + uint8_t return_code; + uint8_t operation_type; + uint32_t id; + int64_t result; +}; + +#pragma pack(pop) + +enum return_code { + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, + UNKNOWN_OPERATION +}; + +enum operation_type { + FAST = 1, + SLOW +}; + +inline std::string response_to_string(struct dctp_response_header response) { + return "Return code: " + std::to_string(response.return_code) + + "; type: " + std::to_string(response.operation_type) + + "; id: " + std::to_string(response.id) + + "; result: " + std::to_string(response.result); +} diff --git a/udp/server/include/server.h b/udp/server/include/server.h new file mode 100644 index 0000000..99bfe0b --- /dev/null +++ b/udp/server/include/server.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include "pools.h" + +class server { +public: + explicit server(uint16_t port); + + ~server(); + + void wait_for_clients(); + + void client_handler(struct dctp_request_header request); + dctp_response_header handle_new_request(struct dctp_request_header request); + struct dctp_response_header handle_slow_function( + struct dctp_request_header request, + void(server::* process)(int, struct dctp_request_header) + ); + + void process_sqrt(int socket_descriptor, struct dctp_request_header request); + + void process_fact(int socket_descriptor, struct dctp_request_header request); +private: + uint16_t port_number; + int socket_descriptor; + sockaddr_in server_addr{}; + sockaddr_in client_addr{}; + bool isInitialized = false; + bool isTerminated = false; + + socket_int_pool slow_ops_pool; + long_computation_response_pool computation_response_pool; +}; + diff --git a/udp/server/include/util.hpp b/udp/server/include/util.hpp new file mode 100644 index 0000000..222a9e2 --- /dev/null +++ b/udp/server/include/util.hpp @@ -0,0 +1,72 @@ +#pragma once + +#include + +void log(const std::string &s) { + std::cout << s << std::endl; +} + +void log_error(const std::string &s) { + std::cerr << "Error: " << s << std::endl; +} + +ssize_t socket_write_response(int socket_descriptor, struct dctp_response_header& response, sockaddr_in& client_addr) { + socklen_t client_size = sizeof(client_addr); + ssize_t sent_bytes = sendto( + socket_descriptor, + &response, + sizeof(response), + 0, + (struct sockaddr *) &client_addr, + client_size + ); + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(sent_bytes) + "): " + + response_to_string(response)); + + if (sent_bytes < 0) { + log_error("can't write to socket"); + return -1; + } + if (sent_bytes == 0) { + log_error("socket has closed."); + return -1; + } + if (sent_bytes > 0 && sent_bytes != sizeof(response)) { + log_error("data was not written fully."); + return -1; + } + + return sent_bytes; +} + +ssize_t socket_read_request(int socket_descriptor, struct dctp_request_header& request, sockaddr_in& client_addr) { + socklen_t client_size = sizeof(client_addr); + ssize_t read_bytes = recvfrom( + socket_descriptor, + (void *) &request, + sizeof(request), + 0, + (struct sockaddr *) &client_addr, + &client_size + ); + + log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); + + if (read_bytes < 0) { + log_error("error while reading request"); + return -1; + } + if (read_bytes == 0) { + log_error("socket has closed."); + return -1; + } + if (read_bytes > 0 && read_bytes != sizeof(request)) { + log_error("data was not read fully (read " + std::to_string(read_bytes) + " bytes)."); + return -1; + } + + return read_bytes; +} \ No newline at end of file diff --git a/udp/server/src/main.cpp b/udp/server/src/main.cpp new file mode 100644 index 0000000..5c72320 --- /dev/null +++ b/udp/server/src/main.cpp @@ -0,0 +1,27 @@ +#include +#include + +#include "server.h" + +server* server_pointer; +void signal_handler( int signum ) { + std::cout << "Interrupt signal (" << signum << ") received.\n"; + + delete server_pointer; + exit(signum); +} + +int main(int argc, char **argv) { + std::signal(SIGINT, signal_handler); + std::signal(SIGTERM, signal_handler); + + uint16_t port_number = 22229; + if (argc > 1) { + port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) + } + + server_pointer = new server(port_number); + server_pointer->wait_for_clients(); + + return 0; +} \ No newline at end of file diff --git a/udp/server/src/pools.cpp b/udp/server/src/pools.cpp new file mode 100644 index 0000000..e31e1e2 --- /dev/null +++ b/udp/server/src/pools.cpp @@ -0,0 +1,58 @@ +#include + +#include +#include +#include + +void socket_int_pool::insert(int socket_descriptor, int id, std::thread *thread) { + lock.lock(); + pool[{socket_descriptor, id}] = thread; + lock.unlock(); +} + +void socket_int_pool::remove(int socket_descriptor, int id) { + lock.lock(); + pool.erase({socket_descriptor, id}); + lock.unlock(); +} + +void socket_int_pool::clear() { + for (auto socket_id_thread : this->pool) { + auto socket_and_id = socket_id_thread.first; + close(socket_and_id.first); + std::thread* slow_op_thread = socket_id_thread.second; + slow_op_thread->join(); + } + this->pool.clear(); +} + + +bool operator <(const struct sockaddr_in& x, const struct sockaddr_in& y) { + return x.sin_addr.s_addr < y.sin_addr.s_addr || x.sin_port < y.sin_port; +} + +void long_computation_response_pool::insert(sockaddr_in client_addr, int id, int type, dctp_response_header response) { + lock.lock(); + pool[{client_addr, id, type}] = response; + lock.unlock(); +} + +dctp_response_header long_computation_response_pool::get(struct sockaddr_in client_addr, int id, int type) { + dctp_response_header response{}; + lock.lock(); + response = pool[{client_addr, id, type}]; + lock.unlock(); + return response; +} + +void long_computation_response_pool::clear() { + this->pool.clear(); +} + +bool long_computation_response_pool::contains(struct sockaddr_in client_addr, int id, int type) { + bool result; + lock.lock(); + result = pool.find(std::make_tuple(client_addr, id, type)) != pool.end(); + lock.unlock(); + return result; +} diff --git a/udp/server/src/server.cpp b/udp/server/src/server.cpp new file mode 100644 index 0000000..8100924 --- /dev/null +++ b/udp/server/src/server.cpp @@ -0,0 +1,172 @@ +#include "server.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + + +server::server(uint16_t port_number) : port_number(port_number) { + log("Server: initialization begin."); + + socket_descriptor = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_descriptor < 0) { + log_error("can't open socket."); + return; + } + int k = 1; + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &k, sizeof(int)) < 0) { + log_error("setsockopt(SO_REUSEADDR) failed"); + } + + bzero((char *) &server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port_number); + + if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { + log_error("can't bind."); + return; + } + isInitialized = true; + + log("Server: initialization success."); +} + +server::~server() { + log("Server: destruction begin."); + this->isTerminated = true; + close(socket_descriptor); + + this->slow_ops_pool.clear(); + this->computation_response_pool.clear(); + + log("Server: destruction success."); +} + +void server::wait_for_clients() { + if (!this->isInitialized) { + return; + } + + while (!this->isTerminated) { + dctp_request_header request{}; + if (socket_read_request(socket_descriptor, request, client_addr) < 0) { + continue; + } + + log("Server: new client connected. " + std::to_string(client_addr.sin_addr.s_addr)); + client_handler(request); + log("Server: stopped work with client. " + std::to_string(client_addr.sin_addr.s_addr)); + } +} + +void server::client_handler(struct dctp_request_header request) { + struct dctp_response_header response{}; + if (computation_response_pool.contains(client_addr, request.id, request.type)) { + response = computation_response_pool.get(client_addr, request.id, request.type); + } else { + response = handle_new_request(request); + computation_response_pool.insert(client_addr, request.id, request.type, response); + } + + socket_write_response(socket_descriptor, response, client_addr); +} + +dctp_response_header server::handle_new_request(struct dctp_request_header request) { + struct dctp_response_header response{}; + int64_t result; + switch (request.type) { + case PLUS: + result = request.first_operand + request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MINUS: + result = request.first_operand - request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MULT: + result = request.first_operand * request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case DIV: + if (request.second_operand == 0) { + response = {DIV_BY_ZERO, FAST, request.id, 0}; + } else { + result = request.first_operand / request.second_operand; + response = {OK, FAST, request.id, result}; + } + break; + case FACT: + response = handle_slow_function(request, &server::process_fact); + break; + case SQRT: + response = handle_slow_function(request, &server::process_sqrt); + break; + case LONG_COMPUTATION_RESULT: + // If it's not in computation pool, then it doesn't exists. + response = {UNKNOWN_OPERATION, SLOW, request.id, 0}; + break; + default: + response = {UNKNOWN_OPERATION, FAST, request.id, 0}; + break; + } + + return response; +} + +struct dctp_response_header server::handle_slow_function( + struct dctp_request_header request, + void(server::* process)(int, struct dctp_request_header) + ) { + struct dctp_response_header response{WAIT_FOR_RESULT, SLOW, request.id, 0}; + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); + + std::thread* thread = new std::thread(process, this, socket_descriptor, request); + slow_ops_pool.insert(socket_descriptor, request.id, thread); + return response; +} + +void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { + sleep(3); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {SQRT_OF_NEGATIVE, SLOW, request.id, 0}; + } else { + auto result = static_cast(sqrt(request.first_operand)); + response = {OK, SLOW, request.id, result}; + } + + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); + slow_ops_pool.remove(socket_descriptor, request.id); +} + +void server::process_fact(int socket_descriptor, struct dctp_request_header request) { + sleep(3); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; + } else if (request.first_operand > 20) { + response = {OVERFLOW, SLOW, request.id, 0}; + } else { + int64_t result = 1; + for (int i = 1; i <= request.first_operand; i++) { + result *= i; + } + response = {OK, SLOW, request.id, result}; + } + + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); + slow_ops_pool.remove(socket_descriptor, request.id); +} +