diff --git a/.run/CLIENT rabbitClientL3cmd.run.xml b/.run/CLIENT rabbitClientL3cmd.run.xml new file mode 100644 index 0000000..c58c56b --- /dev/null +++ b/.run/CLIENT rabbitClientL3cmd.run.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.run/SERVER rabbitServerL3cmd.run.xml b/.run/SERVER rabbitServerL3cmd.run.xml new file mode 100644 index 0000000..265e880 --- /dev/null +++ b/.run/SERVER rabbitServerL3cmd.run.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.run/WORKER1 rabbitWorkerL3cmd.run.xml b/.run/WORKER1 rabbitWorkerL3cmd.run.xml new file mode 100644 index 0000000..c81a5d6 --- /dev/null +++ b/.run/WORKER1 rabbitWorkerL3cmd.run.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 5cefc6b..bf19b93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,7 @@ CPMAddPackage( find_package(SqliteOrm REQUIRED) find_package(argparse REQUIRED) +find_package(nlohmann_json REQUIRED) # Include directories include_directories(src) diff --git a/INTERNAL_API.md b/INTERNAL_API.md new file mode 100644 index 0000000..1f0a8ad --- /dev/null +++ b/INTERNAL_API.md @@ -0,0 +1,10 @@ +# Описание внутреннего взаимодействия API + +API передает сообщение используя JSON формат. +Каждое сообщение содержит следующие поля: + + +## Описание примеров взаимодействия + +### Воркер + diff --git a/conandata.yml b/conandata.yml index f117d5e..69a26cb 100644 --- a/conandata.yml +++ b/conandata.yml @@ -2,5 +2,6 @@ # To keep your changes, remove these comment lines, but the plugin won't be able to modify your requirements requirements: + - "nlohmann_json/3.11.3" - "argparse/3.0" - "sqlite_orm/1.8.2" \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 16df930..55be0e8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,22 +2,33 @@ # Add subdirectory for protocol add_subdirectory(protocol) - +include_directories(DataModel) # Source files set(SOURCE_FILES server/STIPServer.cpp client/STIPClient.cpp + services/TaskService/TaskService.cpp + services/UserDBService/UserDBService.cpp + services/TaskQueue/TaskQueue.h ) # Function to create executable targets function(add_rabbit_executable target_name) add_executable(${target_name} ${ARGN} ${SOURCE_FILES}) target_compile_features(${target_name} PRIVATE cxx_std_17) - target_link_libraries(${target_name} PUBLIC STIPProtocol argparse::argparse Boost::asio ws2_32) + target_link_libraries(${target_name} PUBLIC + STIPProtocol + argparse::argparse + Boost::asio + ws2_32 + nlohmann_json::nlohmann_json + ) endfunction() # Define executables add_rabbit_executable(rabbitServerL2Example ${CMAKE_CURRENT_SOURCE_DIR}/../examples/server.cpp) add_rabbit_executable(rabbitClientL2Example ${CMAKE_CURRENT_SOURCE_DIR}/../examples/client.cpp) + add_rabbit_executable(rabbitServerL3cmd cmd/serverMain.cpp rabbitCore/server/RabbitServer.cpp) -target_link_libraries(rabbitServerL3cmd PRIVATE sqlite_orm::sqlite_orm) +add_rabbit_executable(rabbitClientL3cmd cmd/clientMain.cpp rabbitCore/client/RabbitClient.cpp) +add_rabbit_executable(rabbitWorkerL3cmd cmd/workerMain.cpp rabbitCore/worker/RabbitWorker.cpp) diff --git a/src/DataModel/Client.h b/src/DataModel/Client.h new file mode 100644 index 0000000..c569c9d --- /dev/null +++ b/src/DataModel/Client.h @@ -0,0 +1,27 @@ +// +// Created by Potato on 12.04.24. +// + +#ifndef RABBIT_USER_H +#define RABBIT_USER_H + +#include +#include +#include "protocol/Connection.h" + +using json = nlohmann::json; + +struct Client { + std::string id; + STIP::Connection *connection; +}; + +inline void to_json(json &j, const Client &c) { + j = json{{"id", c.id}}; +} + +inline void from_json(const json &j, Client &c) { + j.at("id").get_to(c.id); +} + +#endif //RABBIT_USER_H diff --git a/src/DataModel/Message.h b/src/DataModel/Message.h new file mode 100644 index 0000000..f3f99ba --- /dev/null +++ b/src/DataModel/Message.h @@ -0,0 +1,48 @@ +// +// Created by Potato on 12.04.24. +// + +#ifndef RABBIT_MESSAGE_H +#define RABBIT_MESSAGE_H + +#include +#include + +using json = nlohmann::json; + +// TODO: обернуть все нижестоящие объекты в этот высооуровневый класс + +// example enum type declaration +enum MessageType { + RegisterClient, + RegisterWorker, + TaskRequest, + TaskResult, + Invalid = -1 +}; + +// map TaskState values to JSON as strings +NLOHMANN_JSON_SERIALIZE_ENUM(MessageType, { + { RegisterClient, "registerClient" }, + { RegisterWorker, "registerWorker" }, + { TaskRequest, "request" }, + { TaskResult, "result" }, + { Invalid, nullptr }, +}) + +struct Message { + MessageType action; + std::string data; +}; + +inline void to_json(nlohmann::json &j, const Message &m) { + j = json{{"action", m.action}, + {"data", m.data}}; +} + +inline void from_json(const nlohmann::json &j, Message &m) { + j.at("action").get_to(m.action); + j.at("data").get_to(m.data); +} + +#endif //RABBIT_MESSAGE_H diff --git a/src/DataModel/Task.h b/src/DataModel/Task.h new file mode 100644 index 0000000..7a20479 --- /dev/null +++ b/src/DataModel/Task.h @@ -0,0 +1,64 @@ +// +// Created by Serge on 26.03.2024. +// + +#ifndef RABBIT_TASKS_H +#define RABBIT_TASKS_H + +#include +#include + +using json = nlohmann::json; + +// example enum type declaration +enum TaskStatus { + Created, + Queued, + SentToWorker, + Ready, + Failed, +}; + +// map TaskState values to JSON as strings +NLOHMANN_JSON_SERIALIZE_ENUM(TaskStatus, { + { Created, "created" }, + { Queued, "queued" }, + { SentToWorker, "sentToWorker" }, + { Ready, "ready" }, + { Failed, "failed" }, +}) + +struct Task { + std::string id; + std::string func; + std::string input; + std::string output; + int cores; + TaskStatus status; + std::string worker_hash_id; + std::string client_hash_id; +}; + +inline void to_json(json &j, const Task &t) { + j = json{{"id", t.id}, + {"func", t.func}, + {"input", t.input}, + {"output", t.output}, + {"cores", t.cores}, + {"status", t.status}, + {"worker_hash_id", t.worker_hash_id}, + {"client_hash_id", t.client_hash_id}}; +} + +inline void from_json(const json &j, Task &t) { + j.at("id").get_to(t.id); + j.at("func").get_to(t.func); + j.at("input").get_to(t.input); + j.at("output").get_to(t.output); + j.at("cores").get_to(t.cores); + j.at("status").get_to(t.status); + j.at("worker_hash_id").get_to(t.worker_hash_id); + j.at("client_hash_id").get_to(t.client_hash_id); +} + +#endif //RABBIT_TASKS_H diff --git a/src/DataModel/TaskRequest.h b/src/DataModel/TaskRequest.h new file mode 100644 index 0000000..454ca7e --- /dev/null +++ b/src/DataModel/TaskRequest.h @@ -0,0 +1,34 @@ +// +// Created by Potato on 11.04.24. +// + +#ifndef RABBIT_TASKREQUEST_H +#define RABBIT_TASKREQUEST_H + +#include +#include + +using json = nlohmann::json; + +struct TaskRequest { + std::string id; + std::string func; + std::string data; + int cores; +}; + +inline void to_json(json &j, const struct TaskRequest &tr) { + j = json{{"id", tr.id}, + {"func", tr.func}, + {"data", tr.data}, + {"cores", tr.cores}}; +} + +inline void from_json(const json &j, struct TaskRequest &tr) { + j.at("id").get_to(tr.id); + j.at("func").get_to(tr.func); + j.at("data").get_to(tr.data); + j.at("cores").get_to(tr.cores); +} + +#endif //RABBIT_TASKREQUEST_H diff --git a/src/DataModel/TaskResult.h b/src/DataModel/TaskResult.h new file mode 100644 index 0000000..14e8acc --- /dev/null +++ b/src/DataModel/TaskResult.h @@ -0,0 +1,31 @@ +// +// Created by Potato on 11.04.24. +// + +#ifndef RABBIT_TASKRESULT_H +#define RABBIT_TASKRESULT_H + +#include +#include + +using json = nlohmann::json; + +struct TaskResult { + std::string id; + std::string data; + int status; +}; + +inline void to_json(json &j, const struct TaskResult &tr) { + j = json{{"id", tr.id}, + {"data", tr.data}, + {"status", tr.status}}; +} + +inline void from_json(const json &j, struct TaskResult &tr) { + j.at("id").get_to(tr.id); + j.at("data").get_to(tr.data); + j.at("status").get_to(tr.status); +} + +#endif //RABBIT_TASKRESULT_H diff --git a/src/DataModel/Tasks.h b/src/DataModel/Tasks.h deleted file mode 100644 index e1fd7eb..0000000 --- a/src/DataModel/Tasks.h +++ /dev/null @@ -1,21 +0,0 @@ -// -// Created by Serge on 26.03.2024. -// - -#ifndef RABBIT_TASKS_H -#define RABBIT_TASKS_H - -#include - -struct Task { - int id; - std::string queue; - std::string message; - std::string result_message; - int internal_status; - std::string worker_hash_id; - std::string client_hash_id; -}; - - -#endif //RABBIT_TASKS_H diff --git a/src/DataModel/Worker.h b/src/DataModel/Worker.h new file mode 100644 index 0000000..7820f23 --- /dev/null +++ b/src/DataModel/Worker.h @@ -0,0 +1,33 @@ +// +// Created by Potato on 12.04.24. +// + +#ifndef RABBIT_WORKER_H +#define RABBIT_WORKER_H + +#include +#include +#include "protocol/Connection.h" + +using json = nlohmann::json; + +struct Worker { + std::string id; + int cores; + int usedCores; + STIP::Connection *connection; +}; + +inline void to_json(json &j, const Worker &w) { + j = json{{"id", w.id}, + {"cores", w.cores}, + {"usedCores", w.usedCores}}; +} + +inline void from_json(const json &j, Worker &w) { + j.at("id").get_to(w.id); + j.at("cores").get_to(w.cores); + j.at("usedCores").get_to(w.usedCores); +} + +#endif //RABBIT_WORKER_H diff --git a/src/client/STIPClient.cpp b/src/client/STIPClient.cpp index a1b2da9..ccefc02 100644 --- a/src/client/STIPClient.cpp +++ b/src/client/STIPClient.cpp @@ -19,7 +19,6 @@ namespace STIP { this->connectionManager = new ConnectionManager(socket); } - void STIPClient::receiveProcess() { std::cout << "Start listen" << std::endl; while (isRunning) { diff --git a/src/client/STIPClient.h b/src/client/STIPClient.h index 46aca31..23181d2 100644 --- a/src/client/STIPClient.h +++ b/src/client/STIPClient.h @@ -33,7 +33,6 @@ namespace STIP { void receiveProcess(); }; - } #endif //RABBIT_STIPCLIENT_H diff --git a/src/cmd/clientMain.cpp b/src/cmd/clientMain.cpp new file mode 100644 index 0000000..7f02374 --- /dev/null +++ b/src/cmd/clientMain.cpp @@ -0,0 +1,231 @@ +// +// Created by Serge on 23.03.2024. +// + +#include "rabbitCore/client/RabbitClient.h" +#include "DataModel/TaskRequest.h" +#include +#include +#include +#include + +std::string promptSimpleMathTask() { + json data; + int num; + + std::cout << "Enter number one:" << std::endl; + std::cout << "> "; + std::cin >> num; + if (std::cin.fail()) { + std::cin.clear(); // Clear the error flag + std::cin.ignore(std::numeric_limits::max(), '\n'); // Ignore invalid input + throw std::runtime_error("Invalid input for number one."); + } + data["a"] = num; + + std::cout << "Enter number two:" << std::endl; + std::cout << "> "; + std::cin >> num; + if (std::cin.fail()) { + std::cin.clear(); + std::cin.ignore(std::numeric_limits::max(), '\n'); + throw std::runtime_error("Invalid input for number two."); + } + data["b"] = num; + + return data.dump(); +} + +std::string promptMatrixMultiplicationTask() { + int matrixSize; + std::cout << "Enter matrix size:" << std::endl; + std::cout << "> "; + std::cin >> matrixSize; + + // Optimized random matrix generation using pre-allocated memory + std::vector randomNumbers(matrixSize * matrixSize * 2); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(0, 99); + std::generate(randomNumbers.begin(), randomNumbers.end(), [&]() { return distrib(gen); }); + + json data = json::array(); + int index = 0; + for (int i = 0; i < 2; ++i) { + json matrixJson = json::array(); + for (int j = 0; j < matrixSize; ++j) { + json rowJson = json::array(); + for (int k = 0; k < matrixSize; ++k) { + rowJson.emplace_back(randomNumbers[index++]); + } + matrixJson.emplace_back(rowJson); + } + data.emplace_back(matrixJson); + } + + return data.dump(); +} + + +std::string promptMatrixDeterminantTask() { + int matrixSize, matrixCount; + std::cout << "Enter matrix size:" << std::endl; + std::cout << "> "; + std::cin >> matrixSize; + std::cout << "Enter matrix count:" << std::endl; + std::cout << "> "; + std::cin >> matrixCount; + + // Optimized random matrix generation using pre-allocated memory + std::vector randomNumbers(matrixSize * matrixSize * matrixCount); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(0, 99); + std::generate(randomNumbers.begin(), randomNumbers.end(), [&]() { return distrib(gen); }); + + json data = json::array(); + int index = 0; + std::cout << "Start randoming " << std::endl; + for (int i = 0; i < matrixCount; ++i) { + json matrixJson = json::array(); + for (int j = 0; j < matrixSize; ++j) { + json rowJson = json::array(); + for (int k = 0; k < matrixSize; ++k) { + rowJson.emplace_back(randomNumbers[index++]); + } + matrixJson.emplace_back(rowJson); + } + data.emplace_back(matrixJson); + } + + std::cout << "End randoming" << std::endl; + return data.dump(); +} + +void *receiverThread(void *arg) { + auto *client = static_cast(arg); + client->receiveResutls(); + return nullptr; +} + + +void *senderThread(void *arg) { + auto *client = static_cast(arg); + while (true) { + std::cout << "Enter task number:" << std::endl; + std::cout << "1 - simple math" << std::endl; + std::cout << "2 - matrix determinant" << std::endl; + std::cout << "3 - matrix multiplication" << std::endl; + std::cout << "0 - exit console" << std::endl; + std::cout << "> "; + + int taskNum; + std::cin >> taskNum; + if (std::cin.fail()) { + std::cin.clear(); + std::cin.ignore(std::numeric_limits::max(), '\n'); + std::cout << "Invalid input. Please enter a valid task number." << std::endl; + continue; + } + + std::string requestFunc; + std::string requestParams; + // enter cores + int cores; + std::cout << "Enter cores count:" << std::endl; + std::cout << "> "; + std::cin >> cores; + + try { + switch (taskNum) { + case 1: + requestParams = promptSimpleMathTask(); + requestFunc = "simpleMath"; + break; + + case 2: + requestParams = promptMatrixDeterminantTask(); + requestFunc = "determinant"; + break; + + case 3: + requestParams = promptMatrixMultiplicationTask(); + requestFunc = "matrixMultiplication"; + break; + +// int t, tasksCount, delay; +// std::cout << "Enter tasks count:" << std::endl; +// std::cout << "> "; +// std::cin >> tasksCount; +// std::cout << "Enter delay between tasks:" << std::endl; +// std::cout << "> "; +// std::cin >> delay; +// for (t = 0; t < tasksCount; t++) { +// json data; +// data["a"] = rand() % 100; +// data["b"] = rand() % 100; +// int taskId = rand(); +// client->sendTask(TaskRequest{std::to_string(taskId), "simpleMath", data.dump(), cores}); +// std::cout << "Task " << taskId << " sent" << std::endl; +// std::this_thread::sleep_for(std::chrono::milliseconds(delay)); +// } +// +// requestParams = promptMatrixDeterminantTask(); +// requestFunc = "determinant"; +// break; + + case 0: + return nullptr; + + default: + std::cout << "Unknown task number" << std::endl; + continue; + } + + TaskRequest tr{std::to_string(rand()), requestFunc, requestParams, cores}; + client->sendTask(tr); + } catch (const std::runtime_error &e) { + std::cout << "Error: " << e.what() << std::endl; + } + } + return nullptr; +} + +int main(int argc, const char *argv[]) { + argparse::ArgumentParser program("RabbitClient"); + program.add_argument("-i", "--id") + .help("Worker ID") + .default_value("1234"); + program.add_argument("-h", "--host") + .help("Server Host") + .default_value("localhost"); + program.add_argument("-p", "--port") + .help("Server Port") + .default_value(12345) + .action([](const std::string &value) { return std::stoi(value); }); + program.add_description("RabbitClient"); + program.add_epilog("RabbitClient is a client for Rabbit project"); + + try { + program.parse_args(argc, argv); + } catch (const std::runtime_error &err) { + std::cout << err.what() << std::endl; + std::cout << program; + return 0; + } + + auto id = program.get("--id"); + auto host = program.get("--host"); + int port = program.get("--port"); + RabbitClient client(id, host, port); + client.init(); + + pthread_t receiverThreadId, senderThreadId; + pthread_create(&receiverThreadId, nullptr, receiverThread, &client); + pthread_create(&senderThreadId, nullptr, senderThread, &client); + + pthread_join(receiverThreadId, nullptr); + pthread_join(senderThreadId, nullptr); + + return 0; +} diff --git a/src/cmd/serverMain.cpp b/src/cmd/serverMain.cpp index f81f176..984bf9d 100644 --- a/src/cmd/serverMain.cpp +++ b/src/cmd/serverMain.cpp @@ -7,7 +7,6 @@ #include int main(int argc, const char *argv[]) { -// std::cout << "Hello, World!" << std::endl; argparse::ArgumentParser program("RabbitServer"); program.add_argument("-p", "--port") .help("Port to listen") @@ -15,7 +14,7 @@ int main(int argc, const char *argv[]) { .action([](const std::string &value) { return std::stoi(value); }); program.add_description("RabbitServer"); program.add_epilog("RabbitServer is a server for Rabbit project"); -// program.parse_args(argc, argv); + try { program.parse_args(argc, argv); } catch (const std::runtime_error &err) { @@ -25,7 +24,7 @@ int main(int argc, const char *argv[]) { } int port = program.get("--port"); - RabbitServer server(12345); + RabbitServer server(port); server.init(); server.startPolling(); return 0; diff --git a/src/cmd/workerMain.cpp b/src/cmd/workerMain.cpp new file mode 100644 index 0000000..5f886bb --- /dev/null +++ b/src/cmd/workerMain.cpp @@ -0,0 +1,45 @@ +// +// Created by Serge on 23.03.2024. +// + +#include "rabbitCore/worker/RabbitWorker.h" +#include +#include + +int main(int argc, const char *argv[]) { + argparse::ArgumentParser program("RabbitWorker"); + program.add_argument("-i", "--id") + .help("Worker ID") + .default_value("1234"); + program.add_argument("-h", "--host") + .help("Server Host") + .default_value("localhost"); + program.add_argument("-p", "--port") + .help("Server Port") + .default_value(12345) + .action([](const std::string &value) { return std::stoi(value); }); + program.add_argument("-c", "--cores") + .help("Max cores") + .default_value(4) + .action([](const std::string &value) { return std::stoi(value); }); + program.add_description("RabbitWorker"); + program.add_epilog("RabbitWorker is a worker for Rabbit project"); + + try { + program.parse_args(argc, argv); + } catch (const std::runtime_error &err) { + std::cout << err.what() << std::endl; + std::cout << program; + return 0; + } + + auto id = program.get("--id"); + auto host = program.get("--host"); + int port = program.get("--port"); + int cores = program.get("--cores"); + RabbitWorker worker(id, host, port, cores); + worker.init(); + std::cout << "Worker [" << id << "] started on [" << host << ":" << port << "]" << std::endl; + worker.startPolling(); + return 0; +} \ No newline at end of file diff --git a/src/protocol/Connection.cpp b/src/protocol/Connection.cpp index 42d921a..f43ce5f 100644 --- a/src/protocol/Connection.cpp +++ b/src/protocol/Connection.cpp @@ -352,4 +352,3 @@ namespace STIP { return nullptr; } } - diff --git a/src/rabbitCore/client/RabbitClient.cpp b/src/rabbitCore/client/RabbitClient.cpp index 1c55858..952b797 100644 --- a/src/rabbitCore/client/RabbitClient.cpp +++ b/src/rabbitCore/client/RabbitClient.cpp @@ -3,3 +3,121 @@ // #include "RabbitClient.h" + +#include +#include "DataModel/TaskResult.h" +#include "protocol/STIP.h" +#include "client/STIPClient.h" +#include "protocol/Connection.h" +#include "DataModel/Client.h" +#include "Message.h" +#include + +using namespace STIP; + +using boost::asio::ip::udp; + +RabbitClient::RabbitClient(std::string id, std::string host, int port) { + this->id = std::move(id); + this->host = std::move(host); + this->port = port; +} + +void RabbitClient::init() { + resolver = new udp::resolver(io_context); + auto endpoints = resolver->resolve(udp::v4(), host, std::to_string(port)); + server_endpoint = new udp::endpoint(*endpoints.begin()); + + server_socket = new udp::socket(io_context); + server_socket->open(udp::v4()); + + client = new STIP::STIPClient(*server_socket); + client->startListen(); + + connection = client->connect(*server_endpoint); + + // Register client + if (connection) { + // Create Client object to send + Client clientInfo = { + id, + connection + }; + + nlohmann::json clientJson; + to_json(clientJson, clientInfo); + + Message message = { + MessageType::RegisterClient, + clientJson.dump() + }; + + nlohmann::json messageJson; + to_json(messageJson, message); + + std::string msg = messageJson.dump(); + connection->sendMessage(msg); + } else { + std::cerr << "Error: Failed to connect to server." << std::endl; + } + +//////////// + +// +// try { +// Message test = { +// MessageType::Invalid, +// "test" +// }; +// json testJson = test; +// connection->sendMessage(testJson.dump()); +// } catch (std::exception e) { +// std::cerr << "Error sending Test message 2: " << e.what() << std::endl; +// return; +// } +} + +void RabbitClient::receiveResutls() { + for (;;) { + STIP::ReceiveMessageSession *received = connection->receiveMessage(); + auto message = json::parse(received->getDataAsString()).template get(); + auto item = json::parse(message.data); + struct TaskResult result; + try { + result = item.template get(); + } catch (json::exception &e) { + std::cerr << "Error parsing task: " << e.what() << std::endl; + continue; + } + + json data = json::parse(result.data); + std::string pretty = data.dump(2); + int count = std::count(pretty.begin(), pretty.end(), '\n'); + +#define PRETTY_MAX_LINES_THR 35 + if (count > PRETTY_MAX_LINES_THR) { + pretty = data.dump(); + } + + std::cout << pretty << std::endl; + +// if (data.is_array()) { +// for (auto &row: data) { +// std::cout << row << std::endl; +// } +// } else { +// std::cout << data << std::endl; +// +// } + } +} + +void RabbitClient::sendTask(struct TaskRequest t) { + json task = t; + Message m = { + MessageType::TaskRequest, + task.dump() + }; + json message = m; + connection->sendMessage(message.dump()); +} diff --git a/src/rabbitCore/client/RabbitClient.h b/src/rabbitCore/client/RabbitClient.h index cae6ebe..14f0653 100644 --- a/src/rabbitCore/client/RabbitClient.h +++ b/src/rabbitCore/client/RabbitClient.h @@ -2,13 +2,45 @@ // Created by Serge on 23.03.2024. // -#ifndef RABBIT_RABBITCLIENT_H -#define RABBIT_RABBITCLIENT_H +#ifndef RABBIT_RabbitClient_H +#define RABBIT_RabbitClient_H +#include +#include "protocol/Connection.h" +#include +#include "DataModel/TaskRequest.h" +#include "client/STIPClient.h" + +using boost::asio::ip::udp; +using json = nlohmann::json; class RabbitClient { +public: + RabbitClient(std::string id, std::string host, int port); -}; + void init(); + void receiveResutls(); + void sendTask(TaskRequest t); + + ~RabbitClient() { + delete server_socket; + } + + void testMessage(std::string msg); +private: + STIP::STIPClient *client; + + std::string id; + std::string host; + int port; + STIP::Connection *connection{}; + udp::socket *server_socket{}; + + udp::resolver *resolver; + udp::endpoint *server_endpoint; + + boost::asio::io_context io_context; +}; -#endif //RABBIT_RABBITCLIENT_H +#endif //RABBIT_RabbitClient_H diff --git a/src/rabbitCore/proto.md b/src/rabbitCore/proto.md new file mode 100644 index 0000000..c7a360e --- /dev/null +++ b/src/rabbitCore/proto.md @@ -0,0 +1,56 @@ +task request +```json +{ + "id": "id-123345", + "queue": "calcMatrixMult", + "message": "data:{'matrixA':[[1,2],[3,4]], 'matrixB':[[1,2],[3,4]]}", + "result_message": "", + "internal_status": "", + "worker_hash_id": "", + "client_hash_id": "" +} +``` + +simple math task +```json +{ + "a": 1, + "b": 2 +} +``` + +matrix determinant +```json +[ + [[1, 2], [3, 4]], + [[1, 2], [3, 4]], + [[1, 2], [3, 4]] +] +``` + +Actions: +- 0 - auth +- 1 - task request +- 2 - task result + +```json +{ + "action": 1, + "data": { // data related only to this action + "field": "value" +} +} +``` + +## Register + +```json +{ + "action": "register", + "data": { + "hash_id": "hash_id", + "type": "worker", + "queue": "calcMatrixMult" + } +} +``` \ No newline at end of file diff --git a/src/rabbitCore/server/RabbitServer.cpp b/src/rabbitCore/server/RabbitServer.cpp index d578b84..54579cd 100644 --- a/src/rabbitCore/server/RabbitServer.cpp +++ b/src/rabbitCore/server/RabbitServer.cpp @@ -4,67 +4,379 @@ #include "protocol/STIP.h" #include "server/STIPServer.h" #include "protocol/Connection.h" +#include "DataModel/Message.h" #include "RabbitServer.h" +#include "TaskRequest.h" +#include "TaskResult.h" +#include +#include // For std::put_time using namespace STIP; using boost::asio::ip::udp; - - RabbitServer::RabbitServer(int port) { this->port = port; + taskService = TaskService(); } void RabbitServer::init() { - - - // Создаем UDP сокет для приема запросов на порту 12345 server_socket = new udp::socket(io_context, udp::endpoint(udp::v4(), port)); -// server_socket = new udp::socket(io_context, udp::endpoint(udp::v4(), port)); - - auto storage = make_storage("db.sqlite", - make_table("users", - make_column("id", - &User::id, - autoincrement(), - primary_key()), - make_column("first_name", - &User::firstName), - make_column("last_name", - &User::lastName), - make_column("birth_date", - &User::birthDate), - make_column("image_url", - &User::imageUrl), - make_column("type_id", - &User::typeId)), - make_table("user_types", - make_column("id", - &UserType::id, - autoincrement(), - primary_key()), - make_column("name", - &UserType::name, - default_value("name_placeholder")))); } void RabbitServer::startPolling() { STIPServer server(*server_socket); + std::cout << "Server started on port " << port << std::endl; + std::vector threadsProcessors; for (;;) { Connection *connection = server.acceptConnection(); + if (connection == nullptr) break; std::cout << "Connection accepted\n\n" << std::endl; - std::thread(&RabbitServer::processConnection, this, connection).detach(); + + + //std::thread(&RabbitServer::processConnection, this, connection).detach(); + // as in up string but with addding to vector + + threadsProcessors.emplace_back(&RabbitServer::processConnection, this, connection); } + + std::cout << "Server thread finished\n\n" << std::endl; + // hard stopping + server_socket->close(); + server_socket->shutdown(udp::socket::shutdown_both); + + // exit program + exit(0); } void RabbitServer::processConnection(STIP::Connection *connection) { - std::cout << "Connection accepted\n\n" << std::endl; + auto receiveMessage = connection->receiveMessage(); + json request = json::parse(receiveMessage->getDataAsString()); + +#ifdef SERVER_ARCH_DEBUG + std::cout << "Received message: " << request.dump() << std::endl; +#endif + Message message; + json data; + + try { + message = request.template get(); + data = json::parse(message.data); + } catch (json::exception &e) { + std::cerr << "Error parsing message: " << e.what() << std::endl; + return; + } + + switch (message.action) { + case MessageType::RegisterClient: { + std::cout << logTime() << "Received Client registration request\n"; + Client client; + try { + client = data.template get(); + client.connection = connection; + std::cout << logTime() << "Client registered: " << client.id << std::endl; + } catch (json::exception &e) { + std::cerr << logTime() << "Error parsing client: " << e.what() << std::endl; + break; + } + + userDBService.addClient(client); + + try { + processClient(client); + } catch (std::exception &e) { + std::cerr << logTime() << "Error processing client: " << e.what() << std::endl; + } + + userDBService.removeClient(client); + std::cout << logTime() << "Client disconnected: " << client.id << std::endl; + break; + } + + case MessageType::RegisterWorker: { + std::cout << logTime() << "Received Worker registration request\n"; + Worker worker; + try { + worker = data.template get(); + worker.connection = connection; + std::cout << logTime() << "Worker registered: " << worker.id << " (Cores: " << worker.cores << ")\n"; + } catch (json::exception &e) { + std::cerr << logTime() << "Error parsing worker: " << e.what() << std::endl; + break; + } + + userDBService.addWorker(worker); + + try { + processWorker(worker); + } catch (std::exception &e) { + std::cerr << "Error processing worker: " << e.what() << std::endl; + } + + userDBService.removeWorker(worker); + std::cout << logTime() << "Worker disconnected: " << worker.id << std::endl; + break; + } + + default: + std::cerr << logTime() << "Unknown action: " << message.action << std::endl; + break; + } + +// connection->kill(); + delete receiveMessage; + delete connection; +} + +void RabbitServer::processWorker(Worker &worker) { + checkTaskQueue(worker); + + for (;;) { + auto receiveMessage = worker.connection->receiveMessage(); +// std::cout << "Received message: " << receiveMessage->getDataAsString() << std::endl; + json request = json::parse(receiveMessage->getDataAsString()); // TODO change parse +#ifdef SERVER_ARCH_DEBUG + std::cout << "Received message: " << request.dump() << std::endl; +#endif + Message message; + json data; + try { + message = request.template get(); + data = json::parse(message.data); + } catch (json::exception &e) { + std::cerr << "Error parsing message: " << e.what() << std::endl; + continue; + } + + switch (message.action) { + case MessageType::TaskResult: { + std::cout << logTime() << "Received TaskResult from worker: " << worker.id << std::endl; + struct TaskResult result; + try { + result = data.template get(); + } catch (json::exception &e) { + std::cerr << logTime() << "Error parsing task: " << e.what() << std::endl; + continue; + } + + Task task = taskService.findTaskByID(result.id); + +#ifdef SERVER_ARCH_DEBUG + std::cout << json(task).dump() << std::endl; +#endif + + task.status = TaskStatus::Ready; + std::cout << logTime() << "Task marked as Ready: " << result.id << std::endl; + task.output = result.data; + + taskService.updateTask(task); + userDBService.modifyWorkerUsedCores(task.worker_hash_id, task.cores, false); + + Client client = userDBService.findClientByID(task.client_hash_id); + + Message result_message = { + MessageType::TaskResult, + message.data + }; + + try { + client.connection->sendMessage(json(result_message).dump()); + } catch (json::exception &e) { + std::cerr << logTime() << "Error sending task result to client: " << e.what() << std::endl; + continue; + } - std::string message = "Hello, I'm Ilya"; - connection->sendMessage(message); + std::cout << logTime() << "Sent TaskResult to client: " << client.id << std::endl; -// ReceiveMessageSession *received = connection->receiveMessage(); -// std::cout << "Received message: " << received->getDataAsString() << std::endl; + checkTaskQueue(worker); + break; + } + + default: + std::cerr << logTime() << "Unknown action: " << message.action << std::endl; + break; + } + } +} + +void RabbitServer::processClient(Client &client) { + std::vector threads; + for (;;) { + auto receiveMessage = client.connection->receiveMessage(); +#ifdef SERVER_ARCH_DEBUG + std::cout << "Received message: " << receiveMessage->getDataAsString() << std::endl; +#endif +// json request = json::parse(receiveMessage->getDataAsString()); +// std::cout << "Received message: " << request.dump() << std::endl; + std::cout << "start converting" << std::endl; + auto rawMessage = receiveMessage->getData(); + char *payload = static_cast(rawMessage.first); + std::cout << "start parsing" << std::endl; + json request = json::parse(payload, payload + rawMessage.second); + + Message message; + json data; + try { + std::cout << "Loaded to request" << std::endl; + message = request.template get(); + data = json::parse(message.data); + std::cout << "Loaded to data1" << std::endl; + } catch (json::exception &e) { + std::cerr << "Error parsing message: " << e.what() << std::endl; + continue; + } + + switch (message.action) { + case MessageType::TaskRequest: { + std::cout << logTime() << "Received TaskRequest from client: " << client.id << std::endl; + struct TaskRequest taskRequest; + try { +// std::cout << "Task data: " << data.dump() << std::endl; + std::cout << "Loaded to data2" << std::endl; + taskRequest = data.template get(); + std::cout << "Loaded to taskRequest" << std::endl; + std::cout << logTime() << "Task added: " << taskRequest.id << " (Cores: " << taskRequest.cores + << ")" + << std::endl; + } catch (json::exception &e) { + std::cerr << logTime() << "Error parsing task: " << e.what() << std::endl; + break; + } + Task task = { + taskRequest.id, + taskRequest.func, + taskRequest.data, + "", + taskRequest.cores, + TaskStatus::Queued, + "", + client.id + }; + try { + taskService.addTask(task); +// std::thread(&RabbitServer::processTask, this, std::ref(task)).detach(); + threads.emplace_back([this, task]() mutable { + this->processTask(task); + }); + } catch (std::exception &e) { + std::cerr << logTime() << "Error processing task: " << e.what() << std::endl; + } + break; + } + + default: + std::cerr << logTime() << "Unknown action: " << message.action << std::endl; + break; + } + } + for (auto &thread: threads) { + if (thread.joinable()) { + thread.join(); + } + } } + +void RabbitServer::checkTaskQueue(Worker &worker) { + userDBService.printLog(); + std::cout << logTime() << "Checking task queue for worker " << worker.id << std::endl; + std::cout << logTime() << "Worker " << worker.id << " has " << worker.cores - worker.usedCores + << " free cores " << "(cores: " << worker.cores << ", used: " << worker.usedCores << ")\n"; + + Task pendingTask; + if (pendingTasks.tryDequeue(pendingTask, worker.cores - worker.usedCores)) { + std::cout << logTime() << "Assigning pending task " << pendingTask.id << " to worker " + << worker.id << "\n"; + + struct TaskRequest taskRequest = { + pendingTask.id, + pendingTask.func, + pendingTask.input, + pendingTask.cores + }; + + Message message = { + MessageType::TaskRequest, + json(taskRequest).dump() + }; + + // update worker + userDBService.modifyWorkerUsedCores(worker.id, pendingTask.cores, true); + +// worker.usedCores += pendingTask.cores; +// std::cout << logTime() << "Update cores: increase " << pendingTask.cores << std::endl; +// userDBService.updateWorker(worker); +// std::cout << logTime() << "Update cores: Worker " << worker.id << " used cores: " << worker.usedCores +// << std::endl; + + pendingTask.worker_hash_id = worker.id; + pendingTask.status = TaskStatus::SentToWorker; + taskService.updateTask(pendingTask); + + try { + json message_json = message; + std::string strToSend = json(message_json).dump(); + worker.connection->sendMessage(strToSend); + } catch (json::exception &e) { + std::cerr << logTime() << "[checkTaskQueue] Error sending task to worker: " << e.what() << std::endl; + return; + } + + std::cout << logTime() << "Task " << pendingTask.id << " sent to worker " << worker.id << std::endl; + } +} + + +void RabbitServer::processTask(Task &task) { + userDBService.printLog(); + Worker worker = userDBService.findMostFreeWorker(task.cores); + std::cout << logTime() << "Found worker: " << worker.id << " (Cores: " << worker.cores << ")\n"; + + if (worker.id.empty()) { + pendingTasks.enqueue(task); + std::cout << logTime() << "Task " << task.id << " added to queue.\n"; + return; + } + + task.worker_hash_id = worker.id; + task.status = TaskStatus::SentToWorker; + taskService.updateTask(task); + + // update worker + userDBService.modifyWorkerUsedCores(worker.id, task.cores, true); + +// worker.usedCores += task.cores; +// std::cout << logTime() << "Update cores: increase " << task.cores << std::endl; +// userDBService.updateWorker(worker); +// std::cout << logTime() << "Update cores: Worker " << worker.id << " used cores: " << worker.usedCores << std::endl; + + struct TaskRequest taskRequest = { + task.id, + task.func, + task.input, + task.cores + }; + + Message message = { + MessageType::TaskRequest, + json(taskRequest).dump() + }; + + try { + std::string strToSend = json(message).dump(); + worker.connection->sendMessage(strToSend); + } catch (json::exception &e) { + std::cerr << logTime() << "[processTask] Error sending task to worker: " << e.what() << std::endl; + return; + } + std::cout << logTime() << "Task " << task.id << " sent to worker " << worker.id << std::endl; +} + +std::string RabbitServer::logTime() { + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %X: "); + return ss.str(); +} \ No newline at end of file diff --git a/src/rabbitCore/server/RabbitServer.h b/src/rabbitCore/server/RabbitServer.h index 6fdfa66..d5b37b3 100644 --- a/src/rabbitCore/server/RabbitServer.h +++ b/src/rabbitCore/server/RabbitServer.h @@ -6,29 +6,50 @@ #define RABBIT_RABBITSERVER_H #include -#include #include "protocol/Connection.h" +#include "services/TaskService/TaskService.h" +#include "services/UserDBService/UserDBService.h" +#include "services/TaskQueue/TaskQueue.h" +#include using boost::asio::ip::udp; -using namespace sqlite_orm; +using json = nlohmann::json; class RabbitServer { - public: - RabbitServer(int port); +public: + RabbitServer(int port); - void init(); - void startPolling(); + void init(); - void processConnection(STIP::Connection *connection); + void startPolling(); - ~RabbitServer() { - delete server_socket; - } - private: - int port; - boost::asio::io_context io_context; - udp::socket* server_socket; + void processConnection(STIP::Connection *connection); + ~RabbitServer() { + delete server_socket; + } + +private: + int port; + boost::asio::io_context io_context; + udp::socket *server_socket{}; + + TaskService taskService; + UserDBService userDBService; + TaskQueue pendingTasks; + + udp::resolver *resolver; + udp::endpoint *server_endpoint; + + void processWorker(Worker &worker); + + void processClient(Client &client); + + void processTask(Task &task); + + void checkTaskQueue(Worker &worker); + + std::string logTime(); }; diff --git a/src/rabbitCore/worker/RabbitWorker.cpp b/src/rabbitCore/worker/RabbitWorker.cpp new file mode 100644 index 0000000..f835143 --- /dev/null +++ b/src/rabbitCore/worker/RabbitWorker.cpp @@ -0,0 +1,279 @@ +// +// Created by Serge on 23.03.2024. +// + +#include "RabbitWorker.h" + +#include +#include "protocol/STIP.h" +#include "server/STIPServer.h" +#include "protocol/Connection.h" +#include "client/STIPClient.h" +#include "DataModel/TaskResult.h" +#include "DataModel/Worker.h" +#include "DataModel/Message.h" +#include "TaskRequest.h" + +using boost::asio::ip::udp; + +//typedef void (RabbitWorker::*func_type)(void); +//typedef std::map func_map_type; + +RabbitWorker::RabbitWorker(std::string id, std::string host, int port, int cores) { + std::cout << "RabbitWorker::RabbitWorker - Initializing with id: " << id << ", host: " << host << ", port: " << port + << ", cores: " << cores << std::endl; + this->id = std::move(id); + this->host = std::move(host); + this->port = port; + this->cores = cores; +} + +void RabbitWorker::init() { + std::cout << "RabbitWorker::init - Initialization started" << std::endl; + resolver = new udp::resolver(io_context); + auto endpoints = resolver->resolve(udp::v4(), host, std::to_string(port)); + server_endpoint = new udp::endpoint(*endpoints.begin()); + + server_socket = new udp::socket(io_context); + server_socket->open(udp::v4()); + + client = new STIP::STIPClient(*server_socket); + client->startListen(); + + connection = client->connect(*server_endpoint); + + mapping["simpleMath"] = &RabbitWorker::simpleMathHandler; + mapping["determinant"] = &RabbitWorker::determinantHandler; + mapping["matrixMultiplication"] = &RabbitWorker::matrixMultiplicationHandler; + + // Register worker + if (connection) { + std::cout << "RabbitWorker::init - Connection established" << std::endl; + // Create Worker object to send + Worker worker = { + id, + cores, + 0, + nullptr + }; + + nlohmann::json workerJson; + to_json(workerJson, worker); + + Message message = { + MessageType::RegisterWorker, + workerJson.dump() + }; + + nlohmann::json messageJson; + to_json(messageJson, message); + + std::string msg = messageJson.dump(); + connection->sendMessage(msg); + std::cout << "RabbitWorker::init - Worker registered with server" << std::endl; + } else { + std::cerr << "RabbitWorker::init - Error: Failed to connect to server." << std::endl; + } +} + +void RabbitWorker::startPolling() { + std::cout << "RabbitWorker::startPolling - Polling started" << std::endl; + for (;;) { + STIP::ReceiveMessageSession *received = connection->receiveMessage(); +// std::cout << "RabbitWorker::startPolling - Received message: " << received->getDataAsString() << std::endl; + json request = json::parse(received->getDataAsString()); +// std::cout << "RabbitWorker::startPolling - Received message: " << request.dump() << std::endl; + + Message message = request.get(); + json messageData = json::parse(message.data); + switch (message.action) { + case MessageType::TaskRequest: { + std::cout << "RabbitWorker::startPolling - Received task request" << std::endl; + struct TaskRequest task = messageData.get(); + json data = json::parse(task.data); + + if (mapping.find(task.func) != mapping.end()) { + std::cout << "RabbitWorker::startPolling - Executing handler for func: " << task.func << ", id: " + << task.id << std::endl; + (this->*mapping[task.func])(task.id, data, task.cores); + } else { + std::cout << "RabbitWorker::startPolling - Function not found: " << task.func << std::endl; + } + break; + } + default: + std::cout << "RabbitWorker::startPolling - Unknown message type" << std::endl; + break; + } + } +} + +// worker function implementations + +void RabbitWorker::doWait(int seconds) { + std::cout << "RabbitWorker::doWait - Waiting for " << seconds << " seconds" << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(seconds)); +} + +int RabbitWorker::simpleMath(int a, int b) { + std::cout << "RabbitWorker::simpleMath - Adding " << a << " + " << b << std::endl; + int c = a + b; + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::cout << "RabbitWorker::simpleMath - Result is " << c << std::endl; + return c; +} + +int RabbitWorker::determinant(std::vector> matrix, int n) { + std::cout << "RabbitWorker::determinant - Calculating determinant for matrix of size " << n << std::endl; + int det = 1; + + for (int i = 0; i < n; i++) { + for (int j = i + 1; j < n; j++) { + while (matrix[j][i] != 0) { + int ratio = matrix[i][i] / matrix[j][i]; + for (int k = i; k < n; k++) { + matrix[i][k] -= ratio * matrix[j][k]; + } + std::swap(matrix[i], matrix[j]); + det *= -1; + } + } + det *= matrix[i][i]; + } + + std::cout << "RabbitWorker::determinant - Determinant result is " << det << std::endl; + return det; +} + +void RabbitWorker::matrixMultiplication(const std::vector> &matrixA, + const std::vector> &matrixB, + std::vector> &resultMatrix, int row, int col) { + int colsA = matrixA[0].size(); + for (int k = 0; k < colsA; ++k) { + resultMatrix[row][col] += matrixA[row][k] * matrixB[k][col]; + } +} + +void RabbitWorker::simpleMathHandler(const std::string &request_id, json data, int taskCores) { + std::cout << "RabbitWorker::simpleMathHandler - Handling simpleMath for request_id: " << request_id << std::endl; + int a, b, result; + try { + a = data["a"]; + b = data["b"]; + result = simpleMath(a, b); + } catch (json::exception &e) { + std::cerr << "RabbitWorker::simpleMathHandler - Error parsing data: " << e.what() << std::endl; + return; + } + + struct TaskResult taskResult = { + request_id, + json({{"result", result}}).dump(), + 1 + }; + + Message message = { + MessageType::TaskResult, + json(taskResult).dump() + }; + + connection->sendMessage(json(message).dump()); + std::cout << "RabbitWorker::simpleMathHandler - Result sent for request_id: " << request_id << std::endl; +} + +void RabbitWorker::determinantHandler(const std::string &request_id, json data, int taskCores) { + std::cout << "RabbitWorker::determinantHandler - Handling determinant for request_id: " << id << std::endl; + std::vector threads; + threads.reserve(taskCores); + + std::vector>> matrices = data.get>>>(); + + std::vector results(matrices.size()); + + for (int i = 0; i < matrices.size(); ++i) { + threads.emplace_back([this, &matrices, &results, i] { + std::cout << "RabbitWorker::determinantHandler - Starting thread for matrix " << i << std::endl; + results[i] = this->determinant(matrices[i], matrices[i].size()); + }); + + if (threads.size() == taskCores || i == matrices.size() - 1) { + for (auto &t: threads) { + t.join(); + } + threads.clear(); + } + } + + struct TaskResult taskResult = { + request_id, + json(results).dump(), + 1 + }; + + Message message = { + MessageType::TaskResult, + json(taskResult).dump() + }; + + connection->sendMessage(json(message).dump()); + std::cout << "RabbitWorker::determinantHandler - Results sent for request_id: " << request_id << std::endl; +} + +void RabbitWorker::matrixMultiplicationHandler(const std::string &request_id, json data, int taskCores) { + std::cout << "RabbitWorker::matrixMultiplicationHandler - Handling matrix multiplication for request_id: " + << request_id << std::endl; + std::vector threads; + threads.reserve(taskCores); + + // засекаем время + auto start = std::chrono::high_resolution_clock::now(); + + std::vector>> matrices = data.get>>>(); + std::vector> matrixA = matrices[0]; + std::vector> matrixB = matrices[1]; + + if (matrixA[0].size() != matrixB.size()) { + std::cerr << "RabbitWorker::matrixMultiplicationHandler - Matrix dimensions do not match for multiplication" + << std::endl; + return; + } + + int rowsA = matrixA.size(); + int colsB = matrixB[0].size(); + std::vector> resultMatrix(rowsA, std::vector(colsB, 0)); + + for (int row = 0; row < rowsA; ++row) { + for (int col = 0; col < colsB; ++col) { + threads.emplace_back(&RabbitWorker::matrixMultiplication, this, std::ref(matrixA), std::ref(matrixB), + std::ref(resultMatrix), row, col); + + if (threads.size() == taskCores || (row == rowsA - 1 && col == colsB - 1)) { + for (auto &t: threads) { + t.join(); + } + threads.clear(); + } + } + } + + struct TaskResult taskResult = { + request_id, + json(resultMatrix).dump(), + 1 + }; + + Message message = { + MessageType::TaskResult, + json(taskResult).dump() + }; + + connection->sendMessage(json(message).dump()); + std::cout << "RabbitWorker::matrixMultiplicationHandler - Results sent for request_id: " << request_id << std::endl; + + // считаем время выполнения + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = end - start; + std::cout << "RabbitWorker::matrixMultiplicationHandler - Execution time: " << elapsed.count() << "s" << std::endl; +} + + diff --git a/src/rabbitCore/worker/RabbitWorker.h b/src/rabbitCore/worker/RabbitWorker.h new file mode 100644 index 0000000..f2e9a62 --- /dev/null +++ b/src/rabbitCore/worker/RabbitWorker.h @@ -0,0 +1,75 @@ +// +// Created by Serge on 23.03.2024. +// + +#ifndef RABBIT_RabbitWorker_H +#define RABBIT_RabbitWorker_H + +#include +#include "protocol/Connection.h" +#include "client/STIPClient.h" +#include +#include +#include +#include +#include +#include + +using boost::asio::ip::udp; +using json = nlohmann::json; + +class RabbitWorker; + +typedef void (RabbitWorker::*func_type)(const std::string&, json, int); + +typedef std::map func_map_type; + +class RabbitWorker { +public: + RabbitWorker(std::string id, std::string host, int port, int cores); + + void init(); + + void startPolling(); + + ~RabbitWorker() { + delete server_socket; + } + +private: + STIP::STIPClient *client; + + std::string id; + std::string host; + int port; + int cores; + STIP::Connection *connection{}; + + udp::resolver *resolver; + udp::endpoint *server_endpoint; + + boost::asio::io_context io_context; + udp::socket *server_socket{}; + + // functions (demo for now) + + int simpleMath(int a, int b); + + int determinant(std::vector> matrix, int n); + + void matrixMultiplication(const std::vector> &matrixA, const std::vector> &matrixB, + std::vector> &resultMatrix, int row, int col); + + static void doWait(int seconds); + + void simpleMathHandler(const std::string& request_id, json data, int taskCores); + + void determinantHandler(const std::string& id, json data, int taskCores); + + void matrixMultiplicationHandler(const std::string& id, json data, int taskCores); + + func_map_type mapping; +}; + + +#endif //RABBIT_RabbitWorker_H diff --git a/src/server/STIPServer.cpp b/src/server/STIPServer.cpp index 6bd5f40..f0acd0c 100644 --- a/src/server/STIPServer.cpp +++ b/src/server/STIPServer.cpp @@ -17,6 +17,7 @@ namespace STIP { udp::endpoint remote_endpoint; size_t length = this->socket->receive_from(boost::asio::buffer(packet), remote_endpoint, 0, error); if (error && error != boost::asio::error::message_size || length == 0) { + std::cerr << "Receive failed in acceptConnection: " << error.message() << std::endl; return nullptr; // throw boost::system::system_error(error); } diff --git a/src/services/TaskQueue/TaskQueue.h b/src/services/TaskQueue/TaskQueue.h new file mode 100644 index 0000000..76c99f9 --- /dev/null +++ b/src/services/TaskQueue/TaskQueue.h @@ -0,0 +1,70 @@ +// +// Created by Potato on 22.05.24. +// + +#ifndef RABBIT_TASKQUEUE_H +#define RABBIT_TASKQUEUE_H + +#include "protocol/STIP.h" +#include "server/STIPServer.h" +#include "protocol/Connection.h" +#include "DataModel/Message.h" +#include "Task.h" +#include +#include +#include +#include +#include + +class TaskQueue { +public: + void enqueue(const Task &task) { + std::lock_guard lock(queueMutex); + taskQueue.push(task); + saveStateAsMarkdown("tasksQueue.md"); + } + + // Try to dequeue a task that meets the core requirement. + // Returns true and the task reference if successful, + // otherwise false and the task reference remains unchanged. + bool tryDequeue(Task &task, int requiredCores) { + std::lock_guard lock(queueMutex); + if (!taskQueue.empty() && taskQueue.front().cores <= requiredCores) { + task = taskQueue.front(); + taskQueue.pop(); + saveStateAsMarkdown("tasksQueue.md"); + return true; + } + return false; + } + + void saveStateAsMarkdown(const std::string &filename) { +// std::lock_guard lock(queueMutex); + std::ofstream file(filename); + if (!file.is_open()) { + throw std::runtime_error("Unable to open file"); + } + + // Write the markdown table header + file << "| Task ID | Cores Required | Description |\n"; + file << "|---------|----------------|-------------|\n"; + + // Write each task as a row in the markdown table + std::queue tempQueue = taskQueue; // Copy the queue for iteration + while (!tempQueue.empty()) { + Task task = tempQueue.front(); + tempQueue.pop(); + + // Assuming Task has getId(), getCores(), and getDescription() methods + file << "| " << task.id << " | " << task.cores << " | " << task.func << " |\n"; + } + + file.close(); + } + +private: + std::queue taskQueue; + std::mutex queueMutex; +}; + +#endif //RABBIT_TASKQUEUE_H diff --git a/src/services/TaskService.cpp b/src/services/TaskService.cpp deleted file mode 100644 index dbe2487..0000000 --- a/src/services/TaskService.cpp +++ /dev/null @@ -1,5 +0,0 @@ -// -// Created by Serge on 26.03.2024. -// - -#include "TaskService.h" diff --git a/src/services/TaskService.h b/src/services/TaskService.h deleted file mode 100644 index 24c39b1..0000000 --- a/src/services/TaskService.h +++ /dev/null @@ -1,14 +0,0 @@ -// -// Created by Serge on 26.03.2024. -// - -#ifndef RABBIT_TASKSERVICE_H -#define RABBIT_TASKSERVICE_H - - -class TaskService { - -}; - - -#endif //RABBIT_TASKSERVICE_H diff --git a/src/services/TaskService/TaskService.cpp b/src/services/TaskService/TaskService.cpp new file mode 100644 index 0000000..6648cd1 --- /dev/null +++ b/src/services/TaskService/TaskService.cpp @@ -0,0 +1,76 @@ +// +// Created by Serge on 26.03.2024. +// + +#include "TaskService.h" +#include +#include + + +TaskService::TaskService() { + tasks = std::vector(); + // storage.sync_schema(); -- remove DB for now +} + +void TaskService::addTask(Task task) { + if (task.id.empty()) { + task.id = newTaskID(); + } + task.status = TaskStatus::Queued; + tasks.push_back(task); + saveTasksToFile("tasks.md"); + +} + +void TaskService::updateTask(const Task &task) { + for (auto &t: tasks) { + if (t.id == task.id) { + t = task; + saveTasksToFile("tasks.md"); + return; + } + } + throw std::runtime_error("Task not found"); +} + +void TaskService::changeTaskStatus(const std::string &id, TaskStatus status) { + for (auto &task: tasks) { + if (task.id == id) { + task.status = status; + saveTasksToFile("tasks.md"); + return; + } + } + throw std::runtime_error("Task not found"); +} + +std::string TaskService::newTaskID() { + return "task-" + std::to_string(tasks.size()); +} + +Task TaskService::findTaskByID(std::string id) { + for (auto &task: tasks) { + if (task.id == id) { + return task; + } + } + throw std::runtime_error("Task not found"); +} + +void TaskService::saveTasksToFile(const std::string &filename) { + std::ofstream file(filename); + if (!file.is_open()) { + throw std::runtime_error("Unable to open file"); + } + + // Write the header + file << "| Task ID | Status | Description | Worker |\n"; + file << "|---------|--------|-------------|--------|\n"; + + // Write each task as a row + for (const auto &task: tasks) { + file << "| " << task.id << " | " << static_cast(task.status) << " | " << task.func << " | " << task.worker_hash_id << " |\n"; + } + + file.close(); +} diff --git a/src/services/TaskService/TaskService.h b/src/services/TaskService/TaskService.h new file mode 100644 index 0000000..0aa0335 --- /dev/null +++ b/src/services/TaskService/TaskService.h @@ -0,0 +1,52 @@ +// +// Created by Serge on 26.03.2024. +// + +#ifndef RABBIT_TASKSERVICE_H +#define RABBIT_TASKSERVICE_H + +#include "DataModel/Task.h" +#include + +class TaskService { +public: + TaskService(); + + void addTask(Task task); + + void changeTaskStatus(const std::string &id, TaskStatus status); + + void updateTask(const Task &task); + + Task findTaskByID(std::string id); + + void saveTasksToFile(const std::string &filename); + +private: + std::vector tasks; + + std::string newTaskID(); + +}; + + +#endif //RABBIT_TASKSERVICE_H + + + +//#include +//using namespace sqlite_orm; -- remone DB for now +/* +auto storage = make_storage( + "db.sqlite", + make_table("tasks", + make_column("id", &Task::id, primary_key().autoincrement()), + make_column("queue", &Task::queue), + make_column("message", &Task::message), + make_column("result_message", &Task::result_message), + make_column("internal_status", &Task::internal_status), + make_column("worker_hash_id", &Task::worker_hash_id), + make_column("client_hash_id", &Task::client_hash_id) + ) +); +*/ diff --git a/src/services/UserDBService/UserDBService.cpp b/src/services/UserDBService/UserDBService.cpp new file mode 100644 index 0000000..8288b3b --- /dev/null +++ b/src/services/UserDBService/UserDBService.cpp @@ -0,0 +1,137 @@ +// +// Created by Potato on 12.04.24. +// + +#include "UserDBService.h" + +#include +#include + + +void UserDBService::addClient(const Client &client) { + clients.push_back(client); + saveStateToFile("users.md"); +}; + +void UserDBService::addWorker(const Worker &worker) { + workers.push_back(worker); + saveStateToFile("users.md"); +}; + +void UserDBService::removeClient(const Client &client) { + for (auto it = clients.begin(); it != clients.end(); it++) { + if (it->id == client.id) { + clients.erase(it); + saveStateToFile("users.md"); + return; + } + } +} + +void UserDBService::removeWorker(const Worker &worker) { + for (auto it = workers.begin(); it != workers.end(); it++) { + if (it->id == worker.id) { + workers.erase(it); + saveStateToFile("users.md"); + return; + } + } +} + +Worker UserDBService::findMostFreeWorker(int requiredCores) { + Worker mostFreeWorker; + int maxFreeCores = -1; // Start with -1 to ensure any valid worker with free cores will be considered. + + for (auto &worker: workers) { + int freeCores = worker.cores - worker.usedCores; + if (freeCores >= requiredCores && freeCores > maxFreeCores) { + mostFreeWorker = worker; + maxFreeCores = freeCores; + } + } + + return mostFreeWorker; // Will return default Worker if no suitable worker is found. +} + + +Client UserDBService::findClientByID(const std::string &id) { + for (auto &client: clients) { + if (client.id == id) { + saveStateToFile("users.md"); + return client; + } + } + throw std::runtime_error("Client not found"); +} + +Worker UserDBService::findWorkerByID(const std::string &id) { + for (auto &worker: workers) { + if (worker.id == id) { + saveStateToFile("users.md"); + return worker; + } + } + throw std::runtime_error("Worker not found"); +} + +void UserDBService::updateWorker(const Worker &worker) { + for (auto &w: workers) { + if (w.id == worker.id) { + w = worker; + saveStateToFile("users.md"); + + return; + } + } + throw std::runtime_error("Worker not found"); +} + +void UserDBService::modifyWorkerUsedCores(const std::string &id, int cores, bool increase) { + for (auto &worker: workers) { + if (worker.id == id) { + std::cout << "Update cores: Worker before has used cores: " << worker.usedCores << std::endl; + if (increase) { + worker.usedCores += cores; + } else { + worker.usedCores -= cores; + } + std::cout << "Update cores: Worker " << worker.id << " used cores: " << worker.usedCores << std::endl; + if (worker.usedCores < 0) { + std::cerr << "[!] Worker " << worker.id << " used cores is negative. Resetting to 0." << std::endl; + worker.usedCores = 0; + } + saveStateToFile("users.md"); + return; + } + } + throw std::runtime_error("Worker not found"); +} + +void UserDBService::printLog() { + // print clients, workers and worker used cores + std::cout << "Clients: " << std::endl; + for (auto &client: clients) { + std::cout << "Client: " << client.id << std::endl; + } + + std::cout << "Workers: " << std::endl; + for (auto &worker: workers) { + std::cout << "Worker: " << worker.id << ", Cores: " << worker.cores << ", Used cores: " << worker.usedCores + << std::endl; + } +} + +void UserDBService::saveStateToFile(const std::string &filename) { + std::ofstream file(filename); + if (!file.is_open()) { + throw std::runtime_error("Unable to open file"); + } + + file << "| ID | Cores | Used Cores |\n"; + file << "|----|-------|------------|\n"; + for (const auto &worker: workers) { + file << "| " << worker.id << " | " << worker.cores << " | " << worker.usedCores << " |\n"; + } + + file.close(); +} diff --git a/src/services/UserDBService/UserDBService.h b/src/services/UserDBService/UserDBService.h new file mode 100644 index 0000000..f73f4e5 --- /dev/null +++ b/src/services/UserDBService/UserDBService.h @@ -0,0 +1,45 @@ +// +// Created by Potato on 12.04.24. +// + +#ifndef RABBIT_USERDBSERVICE_H +#define RABBIT_USERDBSERVICE_H + +#include "DataModel/Client.h" +#include "DataModel/Worker.h" + +class UserDBService { +public: + UserDBService() { + clients = std::vector(); + workers = std::vector(); + }; + + void addClient(const Client &client); + + void addWorker(const Worker &worker); + + void removeClient(const Client &client); + + void removeWorker(const Worker &worker); + + void updateWorker(const Worker &worker); + + Client findClientByID(const std::string &id); + + Worker findWorkerByID(const std::string &id); + + Worker findMostFreeWorker(int cores); + + void modifyWorkerUsedCores(const std::string &id, int cores, bool increase); + + void printLog(); + +private: + std::vector clients; + std::vector workers; + + void saveStateToFile(const std::string &filename); +}; + +#endif //RABBIT_USERDBSERVICE_H diff --git a/tests/testPacketsLoss.cpp b/tests/testPacketsLoss.cpp index f57f45e..2fbc547 100644 --- a/tests/testPacketsLoss.cpp +++ b/tests/testPacketsLoss.cpp @@ -162,7 +162,7 @@ TEST(Protocol, MessageTransferingWithProxyDropDataPackets) { } cout << "Server thread finished" << endl; - }); // TODO: fix + }); // sleep 200ms std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/web/requirements.txt b/web/requirements.txt new file mode 100644 index 0000000..045ac6c --- /dev/null +++ b/web/requirements.txt @@ -0,0 +1,5 @@ +Flask +watchdog +markdown +flask-socketio +# pip install Flask watchdog markdown flask-socketio \ No newline at end of file diff --git a/web/templates/index.html b/web/templates/index.html new file mode 100644 index 0000000..ac1676b --- /dev/null +++ b/web/templates/index.html @@ -0,0 +1,63 @@ + + + + + + Rabbit Stats + + + + +
+

Rabbit logger

+
+
{{ table1|safe }}
+
+
+
{{ table2|safe }}
+
+
+ + + + + diff --git a/web/web.py b/web/web.py new file mode 100644 index 0000000..6bc2550 --- /dev/null +++ b/web/web.py @@ -0,0 +1,68 @@ +# app.py +from flask import Flask, render_template +from flask_socketio import SocketIO +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +import markdown +import os +import threading +import sys + +app = Flask(__name__) +app.config['SECRET_KEY'] = 'secret!' +socketio = SocketIO(app) + +# Обратите внимание на использование os.path.join и raw строки для путей +md_files = [ + fr"{sys.argv[1]}\tasks.md", + fr"{sys.argv[1]}\users.md" +] +data1 = "" +data2 = "" + +class MDFileHandler(FileSystemEventHandler): + def on_modified(self, event): + try: + # Проверяем, что путь совпадает с любым из наших файлов + if event.src_path.replace("\\", "/") in [f.replace("\\", "/") for f in md_files]: + print(f"File modified: {event.src_path}") + update_data() + socketio.emit('update', {'data1': data1, 'data2': data2}) + except Exception as e: + print(f"Error in on_modified: {e}", file=sys.stderr) + +def update_data(): + global data1, data2 + combined_content1 = "" + combined_content2 = "" + for idx, file in enumerate(md_files): + try: + with open(file, 'r', encoding='utf-8') as f: + if idx == 0: + combined_content1 += f.read() + "\n\n" + elif idx == 1: + combined_content2 += f.read() + "\n\n" + except Exception as e: + print(f"Error reading {file}: {e}", file=sys.stderr) + data1 = markdown.markdown(combined_content1, extensions=['tables']) + data2 = markdown.markdown(combined_content2, extensions=['tables']) + print("Data updated") + +@app.route('/') +def index(): + return render_template('index.html', table1=data1, table2=data2) + +if __name__ == "__main__": + try: + update_data() # Initial load + event_handler = MDFileHandler() + observer = Observer() + for file in md_files: + observer.schedule(event_handler, path=os.path.dirname(file), recursive=False) + observer_thread = threading.Thread(target=observer.start) + observer_thread.daemon = True + observer_thread.start() + + socketio.run(app, debug=True, use_reloader=False) + except Exception as e: + print(f"Error in main: {e}", file=sys.stderr)