From bf008116cac4da190703df2823287551d35d4e6b Mon Sep 17 00:00:00 2001 From: Philipp Andronov Date: Mon, 19 Oct 2020 21:02:15 +0300 Subject: [PATCH 1/3] Dirty full duplex impl --- .../code/dirty-full-duplex/echo.c | 253 ++++++++++++++++++ .../code/dirty-full-duplex/echo.h | 25 ++ 2 files changed, 278 insertions(+) create mode 100644 materials/05-non-blocking-network/code/dirty-full-duplex/echo.c create mode 100644 materials/05-non-blocking-network/code/dirty-full-duplex/echo.h diff --git a/materials/05-non-blocking-network/code/dirty-full-duplex/echo.c b/materials/05-non-blocking-network/code/dirty-full-duplex/echo.c new file mode 100644 index 000000000..23a19d3d6 --- /dev/null +++ b/materials/05-non-blocking-network/code/dirty-full-duplex/echo.c @@ -0,0 +1,253 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "echo.h" + +int epollfd; + +void modifyEpollContext(int epollfd, int operation, int fd, uint32_t events, void *data) { + struct epoll_event server_listen_event; + server_listen_event.events = events; + server_listen_event.data.ptr = data; + + if (-1 == epoll_ctl(epollfd, operation, fd, &server_listen_event)) { + printf("Failed to add an event for socket%d Error:%s", fd, strerror(errno)); + exit(1); + } +} +void *handle(void *ptr) { + uint32_t new_event = 0; + + std::string data; + data.resize(4096); + struct EchoEvent *echoEvent = static_cast(ptr); + + if (EPOLLIN == echoEvent->event) { + int n = read(echoEvent->fd, &data[0], 4096); + if (0 == n) { + /* + * Client closed connection. + */ + printf("\nClient closed connection.\n"); + close(echoEvent->fd); + free(echoEvent); + } else if (-1 == n) { + close(echoEvent->fd); + free(echoEvent); + } else { + // echoEvent->length = n; + // printf("\nRead data:%s Length:%d, Adding write event.\n", data, n); + data.resize(n); + if (echoEvent->output.empty()) { + new_event |= EPOLLOUT; + } + + echoEvent->output.push_back(std::move(data)); + if (echoEvent->output.size() < 100) { + new_event &= ~EPOLLIN; + } + } + } + + if (EPOLLOUT == echoEvent->event) { + assert(!echoEvent->output.empty()); + + // QUEUE: + // [xxxxxxxxxxx] + // [xxxxxxxxxxxxxxxxxxxxxxxxx] + // [xxxxxx] + int ret; + auto it = echoEvent->output.begin(); + do { + std::string &qhead = *it; + ret = write(echoEvent->fd, &qhead[0] + echoEvent->head_offset, qhead.size() - echoEvent->head_offset); + + if (ret > 0) { + echoEvent->head_offset += ret; + if (echoEvent->head_offset >= it->size()) { + it++; + echoEvent->head_offset = 0; + } + } + } while (ret > 0 && it != echoEvent->output.end()); + + it--; + echoEvent->output.erase(echoEvent->output.begin(), it); + + if (-1 == ret) { + /* + * Some other error occured. + */ + close(echoEvent->fd); + free(echoEvent); + } + + if (echoEvent->output.size() < 100) { + new_event |= EPOLLIN; + } + + if (!echoEvent->output.empty()) { + new_event |= EPOLLOUT; + } + } + + if (echoEvent->event != new_event) { + echoEvent->event = new_event; + /* + * We have read the data. Add an write event so that we can + * write data whenever the socket is ready to be written. + */ + modifyEpollContext(epollfd, EPOLL_CTL_MOD, echoEvent->fd, echoEvent->event, echoEvent); + } +} + +void makeSocketNonBlocking(int fd) { + int flags; + + flags = fcntl(fd, F_GETFL, NULL); + if (-1 == flags) { + printf("fcntl F_GETFL failed.%s", strerror(errno)); + exit(1); + } + + flags |= O_NONBLOCK; + if (-1 == fcntl(fd, F_SETFL, flags)) { + printf("fcntl F_SETFL failed.%s", strerror(errno)); + exit(1); + } +} + +int main(int argc, char **argv) { + + int serverfd; + struct sockaddr_in server_addr; + struct sockaddr_in clientaddr; + socklen_t clientlen = sizeof(clientaddr); + + /* + * Create server socket. Specify the nonblocking socket option. + * + */ + serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (-1 == serverfd) { + printf("Failed to create socket.%s", strerror(errno)); + exit(1); + } + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(SERVERPORT); + server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + /* + * Bind the server socket to the required ip-address and port. + * + */ + if (-1 == bind(serverfd, (struct sockaddr *)&server_addr, sizeof(server_addr))) { + printf("Failed to bind.%s", strerror(errno)); + exit(1); + } + + /* + * Mark the server socket has a socket that will be used to . + * accept incoming connections. + */ + if (-1 == listen(serverfd, MAXCONN)) { + printf("Failed to listen.%s", strerror(errno)); + exit(1); + } + + /* + * Create epoll context. + */ + + epollfd = epoll_create(MAXCONN); + if (-1 == epollfd) { + printf("Failed to create epoll context.%s", strerror(errno)); + exit(1); + } + + /* + * Create read event for server socket. + */ + modifyEpollContext(epollfd, EPOLL_CTL_ADD, serverfd, EPOLLIN, &serverfd); + + /* + * Main loop that listens for event. + */ + struct epoll_event *events = calloc(MAXEVENTS, sizeof(struct epoll_event)); + while (1) { + int n = epoll_wait(epollfd, events, MAXEVENTS, -1); + if (-1 == n) { + printf("Failed to wait.%s", strerror(errno)); + exit(1); + } + + int i; + for (i = 0; i < n; i++) { + if (events[i].data.ptr == &serverfd) { + if (events[i].events & EPOLLHUP || events[i].events & EPOLLERR) { + /* + * EPOLLHUP and EPOLLERR are always monitored. + */ + close(serverfd); + exit(1); + } + + /* + * New client connection is available. Call accept. + * Make connection socket non blocking. + * Add read event for the connection socket. + */ + int connfd = accept(serverfd, (struct sockaddr *)&clientaddr, &clientlen); + if (-1 == connfd) { + printf("Accept failed.%s", strerror(errno)); + exit(1); + } else { + printf("Accepted connection, adding a read event"); + makeSocketNonBlocking(connfd); + + struct EchoEvent *echoEvent = calloc(1, sizeof(struct EchoEvent)); + echoEvent->fd = connfd; + echoEvent->event = EPOLLIN; + + /* + * Add a read event. + */ + modifyEpollContext(epollfd, EPOLL_CTL_ADD, echoEvent->fd, echoEvent->event, echoEvent); + } + } else { + /* + *A event has happend for one of the connection sockets. + *Remove the connection socket from the epoll context. + * When the event is handled by handle() function , + *it will add the required event to listen for this + *connection socket again to epoll + *context + */ + if (events[i].events & EPOLLHUP || events[i].events & EPOLLERR) { + struct EchoEvent *echoEvent = (struct EchoEvent *)events[i].data.ptr; + printf("\nClosing connection socket\n"); + close(echoEvent->fd); + free(echoEvent); + } else { + struct EchoEvent *echoEvent = (struct EchoEvent *)events[i].data.ptr; + handle(echoEvent); + } + } + } + } + + free(events); + exit(0); +} diff --git a/materials/05-non-blocking-network/code/dirty-full-duplex/echo.h b/materials/05-non-blocking-network/code/dirty-full-duplex/echo.h new file mode 100644 index 000000000..d450df976 --- /dev/null +++ b/materials/05-non-blocking-network/code/dirty-full-duplex/echo.h @@ -0,0 +1,25 @@ +#ifndef _ECHO_H +#define _ECHO_H + +#include + +#include +#include + +#define SERVERPORT 8080 +#define MAXCONN 200 +#define MAXEVENTS 100 +#define MAXLEN 255 + +struct EchoEvent { + int fd; + uint32_t event; + char data[MAXLEN]; + int length; + int offset; + + std::size_t head_offset; + std::vector output; +}; + +#endif // _ECHO_H From 6c0e4ef22303064c550e2fdc40f032df635699f9 Mon Sep 17 00:00:00 2001 From: Alexey Navolotsky Date: Sat, 21 Nov 2020 20:33:31 +0300 Subject: [PATCH 2/3] hw7: coroutine --- src/coroutine/Engine.cpp | 62 +++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/src/coroutine/Engine.cpp b/src/coroutine/Engine.cpp index 00bdbe50a..25953207b 100644 --- a/src/coroutine/Engine.cpp +++ b/src/coroutine/Engine.cpp @@ -1,19 +1,67 @@ #include -#include -#include -#include +#include +#include +#include namespace Afina { namespace Coroutine { -void Engine::Store(context &ctx) {} +void Engine::Store(context &ctx) { + char currentStack; -void Engine::Restore(context &ctx) {} + ctx.Hight = ctx.Low = StackBottom; + if (¤tStack > StackBottom) { + ctx.Hight = ¤tStack; + } else { + ctx.Low = ¤tStack; + } -void Engine::yield() {} + auto stackSize = ctx.Hight - ctx.Low; + if (stackSize > std::get<1>(ctx.Stack) || stackSize * 2 < std::get<1>(ctx.Stack)) { + delete[] std::get<0>(ctx.Stack); + std::get<0>(ctx.Stack) = new char[stackSize]; + std::get<1>(ctx.Stack) = stackSize; + } -void Engine::sched(void *routine_) {} + std::memcpy(std::get<0>(ctx.Stack), ctx.Low, stackSize); +} + +void Engine::Restore(context &ctx) { + char currentStack; + while (ctx.Low <= ¤tStack && ¤tStack <= ctx.Hight) { + Restore(ctx); + } + + std::memcpy(ctx.Low, std::get<0>(ctx.Stack), ctx.Hight - ctx.Low); + std::longjmp(ctx.Environment, 1); +} + +void Engine::yield() { + auto it = alive; + while (it && it == cur_routine) { + it = it->next; + } + + if (it) { + sched(it); + } +} + +void Engine::sched(void *routine_) { + if (!routine_ || routine_ == cur_routine) { + return; + } + + if (cur_routine) { + Store(*cur_routine); + if (setjmp(cur_routine->Environment)) { + return; + } + } + cur_routine = (context *)routine_; + Restore(*(context *)routine_); +} } // namespace Coroutine } // namespace Afina From ee816244a378405d71d3af691ba4cf19d8eab71e Mon Sep 17 00:00:00 2001 From: Alexey Navolotsky Date: Mon, 30 Nov 2020 21:52:41 +0300 Subject: [PATCH 3/3] hw8: epoll coroutine --- include/afina/coroutine/Engine.h | 27 +++-- src/coroutine/Engine.cpp | 55 +++++++++- src/network/st_coroutine/Connection.cpp | 134 +++++++++++++++++++++++- src/network/st_coroutine/Connection.h | 34 +++++- src/network/st_coroutine/ServerImpl.cpp | 34 ++++-- src/network/st_coroutine/ServerImpl.h | 8 ++ 6 files changed, 264 insertions(+), 28 deletions(-) diff --git a/include/afina/coroutine/Engine.h b/include/afina/coroutine/Engine.h index 28ca67f32..78dff0007 100644 --- a/include/afina/coroutine/Engine.h +++ b/include/afina/coroutine/Engine.h @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -18,9 +19,8 @@ namespace Coroutine { */ class Engine final { public: - using unblocker_func = std::function; + using unblocker_func = std::function; -private: /** * A single coroutine instance which could be scheduled for execution * should be allocated on heap @@ -42,18 +42,21 @@ class Engine final { // To include routine in the different lists, such as "alive", "blocked", e.t.c struct context *prev = nullptr; struct context *next = nullptr; - } context; - /** - * Where coroutines stack begins - */ - char *StackBottom; + bool blocked{false}; + } context; /**const int& * Current coroutine */ context *cur_routine; +private: + /** + * Where coroutines stack begins + */ + char *StackBottom; + /** * List of routines ready to be scheduled. Note that suspended routine ends up here as well */ @@ -88,8 +91,8 @@ class Engine final { static void null_unblocker(Engine &) {} public: - Engine(unblocker_func unblocker = null_unblocker) - : StackBottom(0), cur_routine(nullptr), alive(nullptr), _unblocker(unblocker) {} + Engine(unblocker_func unblocker) + : StackBottom(0), cur_routine(nullptr), alive(nullptr), _unblocker(std::move(unblocker)) {} Engine(Engine &&) = delete; Engine(const Engine &) = delete; @@ -123,7 +126,7 @@ class Engine final { /** * Put coroutine back to list of alive, so that it could be scheduled later */ - void unblock(void *coro); + void unblock(void *routine_); /** * Entry point into the engine. Prepare all internal mechanics and starts given function which is @@ -146,7 +149,7 @@ class Engine final { idle_ctx = new context(); if (setjmp(idle_ctx->Environment) > 0) { if (alive == nullptr) { - _unblocker(*this); + _unblocker(); } // Here: correct finish of the coroutine section @@ -226,6 +229,8 @@ class Engine final { return pc; } + + bool is_blocked() { return blocked && !alive; } }; } // namespace Coroutine diff --git a/src/coroutine/Engine.cpp b/src/coroutine/Engine.cpp index 25953207b..46c37a58f 100644 --- a/src/coroutine/Engine.cpp +++ b/src/coroutine/Engine.cpp @@ -1,7 +1,6 @@ #include #include -#include #include namespace Afina { @@ -63,5 +62,59 @@ void Engine::sched(void *routine_) { Restore(*(context *)routine_); } +void Engine::block(void *routine_) { + auto routine = cur_routine; + if (routine_) { + routine = (context *)routine_; + } + if (routine && !routine->blocked) { + routine->blocked = true; + // Remove from alive + if (routine->prev) { + routine->prev->next = routine->next; + } + if (routine->next) { + routine->next->prev = routine->prev; + } + // Move to blocked + if (blocked) { + blocked->prev = routine; + } + routine->next = blocked; + routine->prev = nullptr; + blocked = routine; + if (routine == cur_routine) { + if (cur_routine && cur_routine != idle_ctx) { + if (setjmp(cur_routine->Environment) > 0) { + return; + } + Store(*cur_routine); + } + cur_routine = nullptr; + Restore(*idle_ctx); + } + } +} + +void Engine::unblock(void *routine_) { + auto routine = (context *)routine_; + if (routine && routine->blocked) { + // Remove from blocked + if (routine->prev) { + routine->prev->next = routine->next; + } + if (routine->next) { + routine->next->prev = routine->prev; + } + // Move to alive + if (alive) { + alive->prev = routine; + } + routine->next = alive; + routine->prev = nullptr; + alive = routine; + } +} + } // namespace Coroutine } // namespace Afina diff --git a/src/network/st_coroutine/Connection.cpp b/src/network/st_coroutine/Connection.cpp index 6208e0b75..6bba8b945 100644 --- a/src/network/st_coroutine/Connection.cpp +++ b/src/network/st_coroutine/Connection.cpp @@ -1,25 +1,149 @@ #include "Connection.h" +#include #include +#include +#include namespace Afina { namespace Network { namespace STcoroutine { // See Connection.h -void Connection::Start() { std::cout << "Start" << std::endl; } +void Connection::Start() { + _event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + _event.data.ptr = this; + _logger->debug("Started connection on socket {}", _socket); +} // See Connection.h -void Connection::OnError() { std::cout << "OnError" << std::endl; } +void Connection::OnError() { + _running = false; + _logger->error("Error on socket {}", _socket); +} // See Connection.h -void Connection::OnClose() { std::cout << "OnClose" << std::endl; } +void Connection::OnClose() { + _running = false; + _logger->debug("Closed connection on socket {}", _socket); +} // See Connection.h -void Connection::DoRead() { std::cout << "DoRead" << std::endl; } +void Connection::DoRead() { + try { + int readed_bytes = -1; + while ((readed_bytes = read(_socket, _client_buffer, sizeof(_client_buffer))) > 0) { + _logger->debug("Got {} bytes from socket", readed_bytes); + + // Single block of data readed from the socket could trigger inside actions a multiple times, + // for example: + // - read#0: [] + // - read#1: [ ... ] + while (readed_bytes > 0) { + _logger->debug("Process {} bytes", readed_bytes); + // There is no command yet + if (!_command_to_execute) { + std::size_t parsed = 0; + if (_parser.Parse(_client_buffer, readed_bytes, parsed)) { + // There is no command to be launched, continue to parse input stream + // Here we are, current chunk finished some command, process it + _logger->debug("Found new command: {} in {} bytes", _parser.Name(), parsed); + _command_to_execute = _parser.Build(_arg_remains); + if (_arg_remains > 0) { + _arg_remains += 2; + } + } + + // Parsed might fails to consume any bytes from input stream. In real life that could happens, + // for example, because we are working with UTF-16 chars and only 1 byte left in stream + if (parsed == 0) { + break; + } else { + std::memmove(_client_buffer, _client_buffer + parsed, readed_bytes - parsed); + readed_bytes -= parsed; + } + } + + // There is command, but we still wait for argument to arrive... + if (_command_to_execute && _arg_remains > 0) { + _logger->debug("Fill argument: {} bytes of {}", readed_bytes, _arg_remains); + // There is some parsed command, and now we are reading argument + std::size_t to_read = std::min(_arg_remains, std::size_t(readed_bytes)); + _argument_for_command.append(_client_buffer, to_read); + + std::memmove(_client_buffer, _client_buffer + to_read, readed_bytes - to_read); + _arg_remains -= to_read; + readed_bytes -= to_read; + } + + // Thre is command & argument - RUN! + if (_command_to_execute && _arg_remains == 0) { + _logger->debug("Start command execution"); + + std::string result; + if (!_argument_for_command.empty()) { + _argument_for_command.resize(_argument_for_command.size() - 2); + } + _command_to_execute->Execute(*_pStorage, _argument_for_command, result); + + // Send response + result += "\r\n"; + auto was_empty = _output.empty(); + _output.push_back(result); + if (was_empty) { + _event.events |= EPOLLOUT; + } + + // Prepare for the next command + _command_to_execute.reset(); + _argument_for_command.resize(0); + _parser.Reset(); + } + } // while (readed_bytes) + } + + // socket is gonna block! + if (errno == EWOULDBLOCK) { + epoll_ctl(_epoll_descr, EPOLL_CTL_ADD, _socket, &_event); + _engine->block(ctx_read); + } + + if (readed_bytes == 0) { + _logger->debug("Connection closed"); + } else { + throw std::runtime_error(std::string(strerror(errno))); + } + } catch (std::runtime_error &ex) { + _logger->error("Failed to read connection on descriptor {}: {}", _socket, ex.what()); + } +} // See Connection.h -void Connection::DoWrite() { std::cout << "DoWrite" << std::endl; } +void Connection::DoWrite() { + size_t sent_total = 0; + _logger->debug("Writing on socket {}", _socket); + while (!_output.empty()) { + auto result = _output.front(); + auto sent = send(_socket, result.data() + sent_total, result.size() - sent_total, 0); + if (sent > 0) { + sent_total += sent; + if (result.size() == sent_total) { + _output.pop_front(); + sent_total = 0; + } + } else { + // socket is gonna block! + if (errno == EWOULDBLOCK) { + epoll_ctl(_epoll_descr, EPOLL_CTL_ADD, _socket, &_event); + _engine->block(ctx_write); + } + break; + } + } + if (_output.empty()) { + _event.events &= !EPOLLOUT; + } +} } // namespace STcoroutine } // namespace Network diff --git a/src/network/st_coroutine/Connection.h b/src/network/st_coroutine/Connection.h index c968848dd..0acb5757c 100644 --- a/src/network/st_coroutine/Connection.h +++ b/src/network/st_coroutine/Connection.h @@ -2,24 +2,36 @@ #define AFINA_NETWORK_ST_COROUTINE_CONNECTION_H #include - +#include +#include #include +#include "protocol/Parser.h" +#include +#include +#include +#include + namespace Afina { namespace Network { namespace STcoroutine { class Connection { public: - Connection(int s) : _socket(s) { + Connection(int s, int epoll_descr, Afina::Coroutine::Engine *engine, std::shared_ptr &logger_, + std::shared_ptr &pStorage_) + : _epoll_descr(epoll_descr), _engine(engine), _socket(s), _logger{logger_}, _pStorage{pStorage_} { std::memset(&_event, 0, sizeof(struct epoll_event)); _event.data.ptr = this; } - inline bool isAlive() const { return true; } + inline bool isAlive() const { return _running; } void Start(); + Afina::Coroutine::Engine::context *ctx_read{nullptr}; + Afina::Coroutine::Engine::context *ctx_write{nullptr}; + protected: void OnError(); void OnClose(); @@ -31,10 +43,24 @@ class Connection { int _socket; struct epoll_event _event; + bool _running{true}; + int _epoll_descr; + Afina::Coroutine::Engine *_engine; + + std::shared_ptr _logger; + std::shared_ptr _pStorage; + + std::size_t _arg_remains; + Protocol::Parser _parser; + std::string _argument_for_command; + std::unique_ptr _command_to_execute; + char _client_buffer[4096]; + std::deque _output; + bool _eof{false}; }; } // namespace STcoroutine } // namespace Network } // namespace Afina -#endif // AFINA_NETWORK_ST_COROUTINE_CONNECTION_H +#endif // AFINA_NETWORK_ST_COROUTINE_CONNECTION_H \ No newline at end of file diff --git a/src/network/st_coroutine/ServerImpl.cpp b/src/network/st_coroutine/ServerImpl.cpp index 352dd453e..a58a4e176 100644 --- a/src/network/st_coroutine/ServerImpl.cpp +++ b/src/network/st_coroutine/ServerImpl.cpp @@ -7,9 +7,9 @@ #include #include +#include #include #include -#include #include #include #include @@ -29,7 +29,8 @@ namespace Network { namespace STcoroutine { // See Server.h -ServerImpl::ServerImpl(std::shared_ptr ps, std::shared_ptr pl) : Server(ps, pl) {} +ServerImpl::ServerImpl(std::shared_ptr ps, std::shared_ptr pl) + : Server(ps, pl), _engine([this] { this->unblocker(); }) {} // See Server.h ServerImpl::~ServerImpl() {} @@ -37,7 +38,7 @@ ServerImpl::~ServerImpl() {} // See Server.h void ServerImpl::Start(uint16_t port, uint32_t n_acceptors, uint32_t n_workers) { _logger = pLogging->select("network"); - _logger->info("Start st_nonblocking network service"); + _logger->info("Start st_coroutine network service"); sigset_t sig_mask; sigemptyset(&sig_mask); @@ -80,7 +81,8 @@ void ServerImpl::Start(uint16_t port, uint32_t n_acceptors, uint32_t n_workers) throw std::runtime_error("Failed to create epoll file descriptor: " + std::string(strerror(errno))); } - _work_thread = std::thread(&ServerImpl::OnRun, this); + _work_thread = std::thread( + [this] { this->_engine.start(static_cast([](ServerImpl *s) { s->OnRun(); }), this); }); } // See Server.h @@ -149,10 +151,10 @@ void ServerImpl::OnRun() { } else { // Depends on what connection wants... if (current_event.events & EPOLLIN) { - pc->DoRead(); + _engine.sched(pc->ctx_read); } if (current_event.events & EPOLLOUT) { - pc->DoWrite(); + _engine.sched(pc->ctx_write); } } @@ -207,13 +209,18 @@ void ServerImpl::OnNewConnection(int epoll_descr) { } // Register the new FD to be monitored by epoll. - Connection *pc = new (std::nothrow) Connection(infd); + Connection *pc = new (std::nothrow) Connection(infd, epoll_descr, &_engine, _logger, pStorage); if (pc == nullptr) { throw std::runtime_error("Failed to allocate connection"); } // Register connection in worker's epoll pc->Start(); + pc->ctx_read = static_cast( + _engine.run(static_cast([](Connection *pc) { pc->DoRead(); }), (Connection *)pc)); + pc->ctx_write = static_cast( + _engine.run(static_cast([](Connection *pc) { pc->DoWrite(); }), (Connection *)pc)); + if (pc->isAlive()) { if (epoll_ctl(epoll_descr, EPOLL_CTL_ADD, pc->_socket, &pc->_event)) { pc->OnError(); @@ -223,6 +230,19 @@ void ServerImpl::OnNewConnection(int epoll_descr) { } } +void ServerImpl::unblocker() { + std::array mod_list{}; + while (_engine.is_blocked()) { + int nmod = epoll_wait(_epoll_descr, &mod_list[0], mod_list.size(), -1); + for (auto i = 0; i < nmod; ++i) { + struct epoll_event ¤t_event = mod_list[i]; + auto *pc = (Connection *)(current_event.data.ptr); + _engine.unblock(pc->ctx_read); + _engine.unblock(pc->ctx_write); + } + } +} + } // namespace STcoroutine } // namespace Network } // namespace Afina diff --git a/src/network/st_coroutine/ServerImpl.h b/src/network/st_coroutine/ServerImpl.h index 13410e92c..575c70df9 100644 --- a/src/network/st_coroutine/ServerImpl.h +++ b/src/network/st_coroutine/ServerImpl.h @@ -5,6 +5,7 @@ #include #include +#include namespace spdlog { class logger; @@ -53,9 +54,16 @@ class ServerImpl : public Server { // Curstom event "device" used to wakeup workers int _event_fd; + int _epoll_descr; // IO thread std::thread _work_thread; + + // Coroutine engine + Afina::Coroutine::Engine _engine; + + // Function to run when there is no more alive coroutines + void unblocker(); }; } // namespace STcoroutine