From 3fa1547e1125ec388c6e04718eba33c45bd4d421 Mon Sep 17 00:00:00 2001 From: hamidr Date: Sun, 8 Jan 2017 16:48:59 +0330 Subject: [PATCH 1/3] Add asio support #19 --- CMakeLists.txt | 20 ++-- includes/connection.hpp | 19 ++-- includes/monitor.hpp | 21 +++-- includes/network/async_socket.hpp | 75 --------------- includes/network/tcp_socket.hpp | 18 ---- includes/network/unix_socket.hpp | 17 ---- includes/redis_client.hpp | 7 +- includes/sentinel.hpp | 14 ++- src/connection.cpp | 78 ++++++++-------- src/monitor.cpp | 86 ++++++++--------- src/network/async_socket.cpp | 147 ------------------------------ src/network/tcp_socket.cpp | 43 --------- src/network/unix_socket.cpp | 45 --------- src/redis_client.cpp | 13 +-- src/sentinel.cpp | 24 ++--- test/main.cpp | 46 ++++------ 16 files changed, 159 insertions(+), 514 deletions(-) delete mode 100644 includes/network/async_socket.hpp delete mode 100644 includes/network/tcp_socket.hpp delete mode 100644 includes/network/unix_socket.hpp delete mode 100644 src/network/async_socket.cpp delete mode 100644 src/network/tcp_socket.cpp delete mode 100644 src/network/unix_socket.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b5bd49d..b151453 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,21 +23,12 @@ set(PROJECT_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/includes) include_directories(${PROJECT_INCLUDE_DIR}) include_directories("/usr/include/") -include_directories("/usr/local/include") +include_directories(${CMAKE_SOURCE_DIR}/libs/asio/asio/include/) link_directories("/usr/lib") link_directories("/usr/local/lib") -add_library(event_loop - ${PROJECT_SOURCE_DIR}/event_loop/socket_watcher.cpp - ${PROJECT_SOURCE_DIR}/event_loop/event_loop_ev.cpp) - -add_library(network - ${PROJECT_SOURCE_DIR}/network/async_socket.cpp - ${PROJECT_SOURCE_DIR}/network/unix_socket.cpp - ${PROJECT_SOURCE_DIR}/network/tcp_socket.cpp) - add_library(parser ${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp ${PROJECT_SOURCE_DIR}/parser/array_parser.cpp @@ -59,18 +50,19 @@ if(CMAKE_COMPILER_IS_GNUCXX) set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary endif() -target_link_libraries(event_loop ev) -target_link_libraries(async_redis event_loop parser network) +target_link_libraries(async_redis parser pthread) -install(TARGETS event_loop +install(TARGETS async_redis LIBRARY DESTINATION /usr/local/lib/ - ARCHIVE DESTINATION /usr/local/lib/) + ARCHIVE DESTINATION /usr/local/lib/ + ) install(TARGETS parser LIBRARY DESTINATION /usr/local/lib/ ARCHIVE DESTINATION /usr/local/lib/ ) + install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include) add_executable (a1.out ${CMAKE_SOURCE_DIR}/test/main.cpp) diff --git a/includes/connection.hpp b/includes/connection.hpp index f284d2c..c9d9ad2 100644 --- a/includes/connection.hpp +++ b/includes/connection.hpp @@ -2,26 +2,28 @@ #include #include -#include #include +#include +#include + #include -#include namespace async_redis { class connection { - using async_socket = network::async_socket; + connection(const connection&) = delete; + connection& operator = (const connection&) = delete; public: + using connect_handler_t = std::function; using parser_t = parser::base_resp_parser::parser; using reply_cb_t = std::function; - connection(event_loop::event_loop_ev& event_loop); + connection(asio::io_context&); - void connect(async_socket::connect_handler_t handler, const std::string& ip, int port); - void connect(async_socket::connect_handler_t handler, const std::string& path); + void connect(connect_handler_t handler, const std::string& ip, int port); bool is_connected() const; void disconnect(); @@ -30,13 +32,12 @@ namespace async_redis private: void do_read(); - void reply_received(ssize_t len); + void reply_received(const asio::error_code& ec, size_t len); private: - std::unique_ptr socket_; + asio::ip::tcp::socket socket_; std::queue> req_queue_; - event_loop::event_loop_ev& event_loop_; enum { max_data_size = 1024 }; char data_[max_data_size]; }; diff --git a/includes/monitor.hpp b/includes/monitor.hpp index 23e9844..c738260 100644 --- a/includes/monitor.hpp +++ b/includes/monitor.hpp @@ -1,19 +1,24 @@ #pragma once -#include #include #include #include #include +#include +#include + using std::string; namespace async_redis { class monitor { - using async_socket = network::async_socket; + monitor(const monitor&) = delete; + monitor& operator = (const monitor&) = delete; + + using connect_handler_t = std::function; public: enum EventState { @@ -26,10 +31,8 @@ namespace async_redis using parser_t = parser::base_resp_parser::parser; using watcher_cb_t = std::function; - monitor(event_loop::event_loop_ev &event_loop); - - void connect(async_socket::connect_handler_t handler, const std::string& ip, int port); - void connect(async_socket::connect_handler_t handler, const std::string& path); + monitor(asio::io_context &event_loop); + void connect(connect_handler_t handler, const std::string& ip, int port); bool is_connected() const; bool is_watching() const; @@ -41,6 +44,7 @@ namespace async_redis bool punsubscribe(const std::list& channels, watcher_cb_t&& cb); private: + void do_read(); bool send_and_receive(string&& data); void handle_message_event(parser_t& channel, parser_t& value); void handle_subscribe_event(parser_t& channel, parser_t& clients); @@ -51,15 +55,14 @@ namespace async_redis void handle_event(parser_t&& request); void report_disconnect(); - void stream_received(ssize_t len); + void stream_received(const asio::error_code& ec, size_t len); private: parser_t parser_; std::unordered_map watchers_; std::unordered_map pwatchers_; - std::unique_ptr socket_; - event_loop::event_loop_ev &io_; + asio::ip::tcp::socket socket_; enum { max_data_size = 1024 }; char data_[max_data_size]; bool is_watching_ = false; diff --git a/includes/network/async_socket.hpp b/includes/network/async_socket.hpp deleted file mode 100644 index 2b47a36..0000000 --- a/includes/network/async_socket.hpp +++ /dev/null @@ -1,75 +0,0 @@ -#pragma once - -#include - -#include -#include -#include - -namespace async_redis { - namespace network - { - using std::string; - - class socket_excetion : std::exception {}; - //TODO: NAMING? More of a permission socket issue than a connect one! - class connect_socket_exception : socket_excetion {}; - class nonblocking_socket_exception : socket_excetion {}; - - class async_socket - { - public: - using socket_t = struct sockaddr; - - using socket_identifier_t = event_loop::event_loop_ev::socket_identifier_t; - using recv_cb_t = std::function; - using ready_cb_t = std::function; - using connect_handler_t = std::function; - - async_socket(event_loop::event_loop_ev& io); - - ~async_socket(); - - bool is_valid(); - ssize_t send(const string& data); - ssize_t send(const char *data, size_t len); - ssize_t receive(char *data, size_t len); - bool listen(int backlog = 0); - int accept(); - bool close(); - bool async_write(const string& data, ready_cb_t cb); - bool async_read(char *buffer, int max_len, recv_cb_t cb); - void async_accept(const std::function)>& cb); - - bool is_connected() const; - - protected: - void set_fd_socket(int fd); - - template - void async_connect(int timeout, connect_handler_t handler, Args... args) - { - if (timeout == 10) // is equal to 1 second - return handler(false); - - io_.async_timeout(0.1, [this, timeout, args..., handler]() { - - if (-1 == static_cast(*this).connect(args...)) - return this->async_connect(timeout+1, handler, args...); - - handler(is_connected()); - }); - } - - void create_socket(int domain); - int connect_to(socket_t* socket_addr, int len); - int bind_to(socket_t* socket_addr, int len); - - private: - bool is_connected_ = false; - event_loop::event_loop_ev& io_; - socket_identifier_t id_; - int fd_ = -1; - }; - } -} diff --git a/includes/network/tcp_socket.hpp b/includes/network/tcp_socket.hpp deleted file mode 100644 index 0315783..0000000 --- a/includes/network/tcp_socket.hpp +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include "async_socket.hpp" - -namespace async_redis { - namespace network - { - class tcp_socket : public async_socket - { - public: - tcp_socket(event_loop::event_loop_ev& io); - - void async_connect(const string& ip, int port, connect_handler_t handler); - bool bind(const string& host, int port); - int connect(const string& host, int port); - }; - } -} diff --git a/includes/network/unix_socket.hpp b/includes/network/unix_socket.hpp deleted file mode 100644 index 82466cb..0000000 --- a/includes/network/unix_socket.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include "async_socket.hpp" - -namespace async_redis { - namespace network - { - class unix_socket : public async_socket - { - public: - unix_socket(event_loop::event_loop_ev &io); - void async_connect(const string& path, connect_handler_t handler); - int connect(const string& path); - bool bind(const string& path); - }; - } -} diff --git a/includes/redis_client.hpp b/includes/redis_client.hpp index fd68805..a035cfd 100644 --- a/includes/redis_client.hpp +++ b/includes/redis_client.hpp @@ -12,14 +12,17 @@ namespace async_redis class redis_client { + redis_client(const redis_client&) = delete; + redis_client& operator = (const redis_client&) = delete; + using reply_cb_t = connection::reply_cb_t; - using connect_cb_t = network::async_socket::connect_handler_t; + using connect_cb_t = connection::connect_handler_t; public: class connect_exception : std::exception {}; using parser_t = connection::parser_t; - redis_client(event_loop::event_loop_ev &eventIO, int n = 1); + redis_client(asio::io_context &io, uint n = 1); bool is_connected() const; template diff --git a/includes/sentinel.hpp b/includes/sentinel.hpp index 487efb7..76c704d 100644 --- a/includes/sentinel.hpp +++ b/includes/sentinel.hpp @@ -1,17 +1,15 @@ #pragma once -#include #include -#include #include + +#include #include -// #include namespace async_redis { class sentinel { - using socket_t = ::async_redis::network::async_socket; - using connect_cb_t = socket_t::connect_handler_t; + using connect_cb_t = connection::connect_handler_t; public: using parser_t = parser::base_resp_parser::parser; @@ -21,7 +19,7 @@ namespace async_redis { Watching }; - sentinel(event_loop::event_loop_ev &event_loop); + sentinel(asio::io_context &io); bool is_connected() const; bool connect(const string& ip, int port, connect_cb_t&& connector); @@ -54,7 +52,7 @@ namespace async_redis { private: int connected_ = 0; - std::unique_ptr stream_; - std::unique_ptr conn_; + monitor stream_; + connection conn_; }; } diff --git a/src/connection.cpp b/src/connection.cpp index ec5c7d1..4c6adb4 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -6,44 +6,33 @@ #include #include -#include -#include - +#include namespace async_redis { -using tcp_socket = network::tcp_socket; -using unix_socket = network::unix_socket; - -connection::connection(event_loop::event_loop_ev& event_loop) - : event_loop_(event_loop) { -} +connection::connection(asio::io_context& io) + : socket_(io) +{ } -void connection::connect(async_socket::connect_handler_t handler, const std::string& ip, int port) +void connection::connect(connect_handler_t handler, const std::string& ip, int port) { - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(event_loop_); + asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port); - static_cast(socket_.get())->async_connect(ip, port, handler); -} - -void connection::connect(async_socket::connect_handler_t handler, const std::string& path) -{ - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(event_loop_); - - static_cast(socket_.get())->async_connect(path, handler); + socket_.async_connect(endpoint, [handler](const asio::error_code &ec) { + // std::cout << ec << std::endl; + handler(!ec); + }); } bool connection::is_connected() const { - return socket_ && socket_->is_connected(); + return socket_.is_open(); } void connection::disconnect() { - socket_->close(); + socket_.close(); //TODO: check the policy! Should we free queue or retry again? decltype(req_queue_) free_me; free_me.swap(req_queue_); @@ -54,9 +43,10 @@ bool connection::pipelined_send(std::string&& pipelined_cmds, std::vectorasync_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) { - if (sent_chunk_len == 0) + socket_.async_send(asio::buffer(pipelined_cmds.data(), pipelined_cmds.length()), + [this, cbs = std::move(callbacks)](const asio::error_code &ec, size_t len) + { + if (ec) return disconnect(); if (!req_queue_.size() && cbs.size()) @@ -64,7 +54,10 @@ bool connection::pipelined_send(std::string&& pipelined_cmds, std::vectorasync_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) { - if (sent_chunk_len == 0) + socket_.async_send(asio::buffer(command.data(), command.length()), + [this, read_it](const asio::error_code &ec, size_t len) + { + // std::cout << ec << std::endl; + if (ec) return disconnect(); - if (read_it) - do_read(); - }); + if (read_it) + do_read(); + } + ); + return true; } void connection::do_read() { - socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1)); + socket_.async_read_some( + asio::buffer(data_, max_data_size), + std::bind( + &connection::reply_received, + this, + std::placeholders::_1, + std::placeholders::_2 + ) + ); } -void connection::reply_received(ssize_t len) +void connection::reply_received(const asio::error_code& ec, size_t len) { - if (len == 0) + // std::cout << ec << std::endl; + if (ec) return disconnect(); ssize_t acc = 0; diff --git a/src/monitor.cpp b/src/monitor.cpp index a49f39f..fb175dc 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -2,43 +2,35 @@ #include #include -#include "../includes/network/tcp_socket.hpp" -#include "../includes/network/unix_socket.hpp" namespace async_redis { -using tcp_socket = network::tcp_socket; -using unix_socket = network::unix_socket; -monitor::monitor(event_loop::event_loop_ev &event_loop) - : io_(event_loop) +monitor::monitor(asio::io_context &io) + : socket_(io) {} -void monitor::connect(async_socket::connect_handler_t handler, const std::string& ip, int port) +void monitor::connect(connect_handler_t handler, const std::string& ip, int port) { - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(io_); + if (socket_.is_open()) + socket_.close(); - static_cast(socket_.get())->async_connect(ip, port, handler); -} - -void monitor::connect(async_socket::connect_handler_t handler, const std::string& path) -{ - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(io_); + asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port); - static_cast(socket_.get())->async_connect(path, handler); + socket_.async_connect(endpoint, [handler](const asio::error_code &ec) { + handler(!ec); + }); } bool monitor::is_connected() const -{ return socket_->is_connected(); } +{ return socket_.is_open(); } bool monitor::is_watching() const -{ return this->is_connected() && is_watching_; } +{ return is_connected() && is_watching_; } void monitor::disconnect() { - socket_->close(); + socket_.close(); pwatchers_.clear(); watchers_.clear(); @@ -103,25 +95,32 @@ bool monitor::send_and_receive(string&& data) if (!is_connected()) return false; - socket_->async_write(data, [this](ssize_t sent_chunk_len) { - if (is_watching_) - return; - - this->socket_->async_read( - this->data_, - this->max_data_size, - std::bind( - &monitor::stream_received, - this, - std::placeholders::_1 - ) - ); - - this->is_watching_ = true; - }); + socket_.async_send(asio::buffer(data.data(), data.length()), + [this](const asio::error_code& ec, size_t sent_chunk_len) + { + if (is_watching_) + return; + + do_read(); + + this->is_watching_ = true; + }); return true; } +void monitor::do_read() +{ + this->socket_.async_read_some( + asio::buffer(this->data_, this->max_data_size), + std::bind( + &monitor::stream_received, + this, + std::placeholders::_1, + std::placeholders::_2 + ) + ); +} + void monitor::handle_message_event(parser_t& channel, parser_t& value) { const string& ch_key = channel->to_string(); @@ -216,9 +215,9 @@ void monitor::report_disconnect() disconnect(); } -void monitor::stream_received(ssize_t len) + void monitor::stream_received(const asio::error_code& ec, size_t len) { - if (len == 0) + if (ec) return report_disconnect(); ssize_t acc = 0; @@ -238,21 +237,12 @@ void monitor::stream_received(ssize_t len) } } - // if (!(watchers_.size() || pwatchers_.size())) if (!watchers_.size() && !pwatchers_.size()) { is_watching_ = false; return; } - this->socket_->async_read( - this->data_, - this->max_data_size, - std::bind( - &monitor::stream_received, - this, - std::placeholders::_1 - ) - ); + do_read(); } } diff --git a/src/network/async_socket.cpp b/src/network/async_socket.cpp deleted file mode 100644 index a1792f7..0000000 --- a/src/network/async_socket.cpp +++ /dev/null @@ -1,147 +0,0 @@ -#include "../../includes/network/async_socket.hpp" - -#include // fcntl -#include // close - -namespace async_redis { -namespace network { - -async_socket::async_socket(event_loop::event_loop_ev& io) - : io_(io) -{ } - -async_socket::~async_socket() { - close(); -} - -bool async_socket::is_valid() { - return fd_ != -1; -} - -ssize_t async_socket::send(const string& data) { - return send(data.data(), data.size()); -} - -ssize_t async_socket::send(const char *data, size_t len) { - return ::send(fd_, data, len, 0); -} - -ssize_t async_socket::receive(char *data, size_t len) { - return ::recv(fd_, data, len, 0); -} - -bool async_socket::listen(int backlog) { - return ::listen(fd_, backlog) == 0; -} - -int async_socket::accept() { - return ::accept(fd_, nullptr, nullptr); -} - -bool async_socket::close() -{ - if (!is_connected_) - return true; - - if(id_) - io_.unwatch(id_); - - auto res = ::close(fd_) == 0; - is_connected_ = false; - fd_ = -1; - return res; -} - -bool async_socket::async_write(const string& data, ready_cb_t fn) -{ - if (!is_connected() || !data.size()) - return false; - - io_.async_write(id_, [this, data, cb{std::move(fn)}]() -> void { - auto sent_chunk = send(data); - - if(sent_chunk == 0) - close(); - - if (sent_chunk < data.size() && sent_chunk != -1) { - async_write(data.substr(sent_chunk, data.size()), std::move(cb)); - return; - } - - cb(sent_chunk); - }); - - return true; -} - -bool async_socket::async_read(char *buffer, int max_len, recv_cb_t cb) -{ - if (!is_connected()) - return false; - - io_.async_read(id_, [&, buffer, max_len, cb{std::move(cb)}]() -> void { - auto l = receive(buffer, max_len); - if (l == 0) - close(); - - cb(l); - }); - - return true; -} - -void async_socket::async_accept(const std::function)>& cb) -{ - return io_.async_read(id_, - [this, cb]() - { - int fd = this->accept(); - auto s = std::make_shared(io_); - s->set_fd_socket(fd); - cb(s); - this->async_accept(cb); - } - ); -} - -bool async_socket::is_connected() const { - return is_connected_; -} - -void async_socket::set_fd_socket(int fd) -{ - fd_ = fd; - is_connected_ = true; - - id_ = io_.watch(fd_); -} - - -void async_socket::create_socket(int domain) { - if (-1 == (fd_ = socket(domain, SOCK_STREAM, 0))) - throw connect_socket_exception(); - - if (-1 == fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL) | O_NONBLOCK)) - throw nonblocking_socket_exception(); - - id_ = io_.watch(fd_); -} - -int async_socket::connect_to(async_socket::socket_t* socket_addr, int len) -{ - int ret = ::connect(fd_, socket_addr, len); - if (!ret) - is_connected_ = true; - - return ret; -} - -int async_socket::bind_to(async_socket::socket_t* socket_addr, int len) { - int b = ::bind(fd_, socket_addr, len); - if (!b) - is_connected_ = true; - - return b; -} - -}} diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp deleted file mode 100644 index aba11c8..0000000 --- a/src/network/tcp_socket.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include "../../includes/network/tcp_socket.hpp" - -#include - -namespace async_redis { -namespace network { - -tcp_socket::tcp_socket(event_loop::event_loop_ev& io) - : async_socket(io) -{ - this->create_socket(AF_INET); -} - -void tcp_socket::async_connect(const string& ip, int port, connect_handler_t handler) -{ - async_socket::template async_connect(0, handler, ip, port); -} - -bool tcp_socket::bind(const string& host, int port) -{ - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(host.data()); - - return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0; -} - -int tcp_socket::connect(const string& host, int port) -{ - //TODO: - // setsockopt (fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)); - - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(host.data()); - - return this->connect_to((socket_t *)&addr, sizeof(addr)); -} - -} -} diff --git a/src/network/unix_socket.cpp b/src/network/unix_socket.cpp deleted file mode 100644 index ee7de36..0000000 --- a/src/network/unix_socket.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "../../includes/network/unix_socket.hpp" - -#include -#include // close -#include - -namespace async_redis { -namespace network { - -unix_socket::unix_socket(event_loop::event_loop_ev& io) - : async_socket(io) -{ - this->create_socket(AF_UNIX); -} - -void unix_socket::async_connect(const string& path, connect_handler_t handler) -{ - async_socket::template async_connect(0, handler, path); -} - -bool unix_socket::bind(const string& path) -{ - ::unlink(path.data()); - - struct sockaddr_un addr = {0}; - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, path.data()); - - auto len = strlen(addr.sun_path) + sizeof(addr.sun_family); - - return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0; -} - -int unix_socket::connect(const string& path) -{ - struct sockaddr_un addr = {0}; - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, path.data()); - auto len = strlen(addr.sun_path) + sizeof(addr.sun_family); - - return this->connect_to((socket_t *)&addr, sizeof(addr)); -} - -} -} diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 9eaa1dc..be86d57 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -1,7 +1,7 @@ #include "../includes/redis_client.hpp" -#include -#include +// #include +// #include #include #include "connection.hpp" @@ -9,12 +9,12 @@ namespace async_redis { -redis_client::redis_client(event_loop::event_loop_ev &eventIO, int n) +redis_client::redis_client(asio::io_context &io, uint n) { conn_pool_.reserve(n); for (int i = 0; i < conn_pool_.capacity(); ++i) - conn_pool_.push_back(std::make_unique(eventIO)); + conn_pool_.push_back(std::make_unique(io)); } bool redis_client::is_connected() const @@ -118,13 +118,14 @@ void redis_client::send(const std::vector&& elems, const reply_cb_t& rep pipelined_cbs_.push_back(std::move(reply)); } + connection& redis_client::get_connection() { - auto &con = conn_pool_[con_rr_ctr_++]; + con_rr_ctr_++; if (con_rr_ctr_ == conn_pool_.size()) con_rr_ctr_ = 0; - return *con.get(); + return *conn_pool_[con_rr_ctr_]; } void redis_client::check_conn_connected(const connect_cb_t& handler, bool res) diff --git a/src/sentinel.cpp b/src/sentinel.cpp index ba38eba..c979145 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -5,14 +5,15 @@ namespace async_redis { -sentinel::sentinel(event_loop::event_loop_ev &event_loop) - : conn_(std::make_unique(event_loop)), - stream_(std::make_unique(event_loop)) + +sentinel::sentinel(asio::io_context &io) + : conn_(io), + stream_(io) { } bool sentinel::is_connected() const { - return stream_->is_connected() && conn_->is_connected(); + return stream_.is_connected() && conn_.is_connected(); } bool sentinel::connect(const string& ip, int port, connect_cb_t&& connector) @@ -24,9 +25,10 @@ bool sentinel::connect(const string& ip, int port, connect_cb_t&& connector) return true; } -void sentinel::disconnect() { - stream_->disconnect(); - conn_->disconnect(); +void sentinel::disconnect() +{ + stream_.disconnect(); + conn_.disconnect(); } bool sentinel::failover(const string& clustername, connection::reply_cb_t&& reply) @@ -53,7 +55,7 @@ bool sentinel::watch_master_change(cb_watch_master_change_t&& fn) [&]()-> bool { using State = monitor::EventState; - return stream_->subscribe({"+switch-master"}, + return stream_.subscribe({"+switch-master"}, [this, fn = std::move(fn)](const string& channel, parser_t event, State state) -> void { switch(state) @@ -147,8 +149,8 @@ void sentinel::connect_all(const string& ip, int port, const connect_cb_t& conne { auto cb = std::bind(&sentinel::check_connected, this, connector, std::placeholders::_1); - conn_->connect(cb, ip, port); - stream_->connect(cb, ip, port); + conn_.connect(cb, ip, port); + stream_.connect(cb, ip, port); } void sentinel::check_connected(const connect_cb_t& connector, bool res) @@ -171,7 +173,7 @@ bool sentinel::send(std::list&& words, connection::reply_cb_t&& reply) cmd += " " + w; cmd += "\r\n"; - if (!conn_->send(std::move(cmd), std::move(reply))) { + if (!conn_.send(std::move(cmd), std::move(reply))) { disconnect(); return false; } diff --git a/test/main.cpp b/test/main.cpp index 07fc251..64d3699 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -1,15 +1,9 @@ -#include #include #include -#include -#include #include #include -#include "../examples/tcp_server.hpp" - - -typedef async_redis::event_loop::event_loop_ev event_loop_ev; +#include struct monitor_test { @@ -20,23 +14,23 @@ struct monitor_test using redis_client_t = async_redis::redis_client; - monitor_test(event_loop_ev &loop, int n = 100) - : my_monitor(std::make_unique(loop)), - my_redis(std::make_unique(loop, 2)), + monitor_test(asio::io_context &loop, int n = 100) + : my_monitor(loop), + my_redis(loop, 2), n_ping(n) { start(); } void start() { - my_redis->connect(std::bind(&monitor_test::check_redis_connected, this, std::placeholders::_1), "127.0.0.1", 6379); + my_redis.connect(std::bind(&monitor_test::check_redis_connected, this, std::placeholders::_1), "127.0.0.1", 6379); } void check_redis_connected(bool status) { if (status) { std::cout << "RedisClient connected! \n"; - my_monitor->connect(std::bind(&monitor_test::check_monitor_connected, this, std::placeholders::_1), "127.0.0.1", 6379); + my_monitor.connect(std::bind(&monitor_test::check_monitor_connected, this, std::placeholders::_1), "127.0.0.1", 6379); } else { std::cout << "REDIS DIDNT CONNECT!" << std::endl; } @@ -45,10 +39,10 @@ struct monitor_test void check_monitor_connected(bool status) { if (status) { std::cout << "Monitor connected!" << std::endl; - my_monitor->subscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + my_monitor.subscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } else { std::cout << "MONITOR DIDNT CONNECT!" << std::endl; - my_redis->disconnect(); + my_redis.disconnect(); } } @@ -63,15 +57,15 @@ struct monitor_test break; case State::Stream: - std::cout << "watch EventStream" << std::endl; + // std::cout << "watch EventStream" << std::endl; if (play_with_event(event)) return; - my_monitor->unsubscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + my_monitor.unsubscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); break; case State::Disconnected: std::cout << "Are we fucked" << std::endl; - my_redis->disconnect(); + my_redis.disconnect(); start(); break; @@ -83,18 +77,18 @@ struct monitor_test void send_ping(long n) { - std::cout << "Pinging" << std::endl; - my_redis->publish("ping", std::to_string(n), + // std::cout << "Pinging" << std::endl; + my_redis.publish("ping", std::to_string(n), [](parsed_t res) { - std::cout << "Pinged " << res->to_string() << " connections." << std::endl; + // std::cout << "Pinged " << res->to_string() << " connections." << std::endl; } ); } bool play_with_event(const parsed_t& event) { - std::cout << "Ponged by " << event->to_string() << std::endl; + // std::cout << "Ponged by " << event->to_string() << std::endl; int n = 0; long x = std::stol(event->to_string()); @@ -106,8 +100,8 @@ struct monitor_test } private: - std::unique_ptr my_monitor; - std::unique_ptr my_redis; + monitor_t my_monitor; + redis_client_t my_redis; const long n_ping; }; @@ -117,7 +111,7 @@ struct sentinel_test using sentinel_t = async_redis::sentinel; using parser_t = sentinel_t::parser_t; - sentinel_test(event_loop_ev& ev) + sentinel_test(asio::io_context& ev) : io_(ev), my_sen1(std::make_unique(ev)) { @@ -158,12 +152,12 @@ struct sentinel_test private: std::unique_ptr my_sen1; - event_loop_ev &io_; + asio::io_context &io_; }; int main(int argc, char** args) { - event_loop_ev loop; + asio::io_context loop; monitor_test monitor_mock(loop, 100000); // async_redis::tcp_server::tcp_server server(loop); From 39ff11fb3c32358fcd1543e9e8a4208e318dd737 Mon Sep 17 00:00:00 2001 From: Hamid Date: Sun, 15 Mar 2020 01:35:01 +0100 Subject: [PATCH 2/3] Makes sure this compiles --- .gitignore | 1 + CMakeLists.txt | 3 +- build.sh | 6 ++ .../async_redis/parser/base_resp_parser.h | 10 +- includes/async_redis/redis_client.hpp | 27 ++---- src/parser/base_resp_parser.cpp | 35 ++----- src/redis_client.cpp | 93 +++---------------- test/main.cpp | 2 +- 8 files changed, 43 insertions(+), 134 deletions(-) create mode 100755 build.sh diff --git a/.gitignore b/.gitignore index 026dc27..58a3f62 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ # build directory build/ +.vscode/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 761d5b6..8db97a5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,8 +15,9 @@ else() SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${GCC_COVERAGE_LINK_FLAGS} -g" ) endif() -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +add_definitions(-DASIO_STANDALONE) set(PROJECT_SOURCE_DIR src) set(PROJECT_INCLUDE_DIR includes) diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..8225c5d --- /dev/null +++ b/build.sh @@ -0,0 +1,6 @@ +git submodule init +git submodule update +mkdir -p build && cd build +cmake .. +make +cd .. diff --git a/includes/async_redis/parser/base_resp_parser.h b/includes/async_redis/parser/base_resp_parser.h index 572d88b..da67fde 100644 --- a/includes/async_redis/parser/base_resp_parser.h +++ b/includes/async_redis/parser/base_resp_parser.h @@ -30,11 +30,11 @@ namespace async_redis { virtual int parse_append(const char*, ssize_t, bool&) = 0; virtual std::string to_string() const = 0; virtual void map(const caller_t &fn); - bool is_array() const; - bool is_number() const; - bool is_string() const; - bool is_enum() const; - bool is_error() const; + bool is_array() const noexcept; + bool is_number() const noexcept; + bool is_string() const noexcept; + bool is_enum() const noexcept; + bool is_error() const noexcept; void print(); }; diff --git a/includes/async_redis/redis_client.hpp b/includes/async_redis/redis_client.hpp index 13ffa60..a8de275 100644 --- a/includes/async_redis/redis_client.hpp +++ b/includes/async_redis/redis_client.hpp @@ -22,13 +22,12 @@ namespace async_redis class connect_exception : std::exception {}; using parser_t = connection::parser_t; - redis_client(asio::io_context &io, uint n = 1); - bool is_connected() const; + redis_client(asio::io_context &io) noexcept; + bool is_connected() const noexcept; template void connect(const connect_cb_t& handler, Args... args) { - for(auto &conn : conn_pool_) - conn->connect(std::bind(&redis_client::check_conn_connected, this, handler, std::placeholders::_1), args...); + conn_.connect(handler, args...); } void disconnect(); @@ -42,27 +41,13 @@ namespace async_redis void ping(reply_cb_t reply); void publish(const string& channel, const string& msg, reply_cb_t&& reply); void sort(const string& hash_name, std::vector&& fields, reply_cb_t&& reply); - - //TODO: wtf?! doesnt make sense with multiple connections! - // void select(uint catalog, reply_cb_t&& reply) { - // send({"select", std::to_string(catalog)}, reply); - // } - - void commit_pipeline(); - redis_client& pipeline_on(); - redis_client& pipeline_off(); + void select(uint catalog, reply_cb_t&& reply); private: - void send(const std::vector&& elems, const reply_cb_t& reply); - connection& get_connection(); - void check_conn_connected(const connect_cb_t& handler, bool res); + void send(std::vector&& elems, const reply_cb_t& reply); private: - std::string pipeline_buffer_; - bool pipelined_state_ = false; - std::vector pipelined_cbs_; - std::vector> conn_pool_; - int con_rr_ctr_ = 0; + connection conn_; int connected_called_ = 0; bool is_connected_ = false; }; diff --git a/src/parser/base_resp_parser.cpp b/src/parser/base_resp_parser.cpp index e3e1af0..219faa6 100644 --- a/src/parser/base_resp_parser.cpp +++ b/src/parser/base_resp_parser.cpp @@ -51,35 +51,20 @@ base_resp_parser::append_chunk(base_resp_parser::parser& data, const char* chunk return data->parse_append(chunk, length, is_finished); } -bool -base_resp_parser::is_array() const -{ - this->type() == RespType::Arr; -} +bool base_resp_parser::is_array() const noexcept +{ return this->type() == RespType::Arr; } -bool -base_resp_parser::is_number() const -{ - this->type() == RespType::Num; -} +bool base_resp_parser::is_number() const noexcept +{ return this->type() == RespType::Num; } -bool -base_resp_parser::is_error() const -{ - this->type() == RespType::Err; -} +bool base_resp_parser::is_error() const noexcept +{ return this->type() == RespType::Err; } -bool -base_resp_parser::is_string() const -{ - this->type() == RespType::BulkStr; -} +bool base_resp_parser::is_string() const noexcept +{ return this->type() == RespType::BulkStr; } -bool -base_resp_parser::is_enum() const -{ - this->type() == RespType::Str; -} +bool base_resp_parser::is_enum() const noexcept +{ return this->type() == RespType::Str; } void base_resp_parser::print() diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 482b66d..ebf7146 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -5,21 +5,15 @@ namespace async_redis { -redis_client::redis_client(asio::io_context &io, uint n) -{ - conn_pool_.reserve(n); - - for (int i = 0; i < conn_pool_.capacity(); ++i) - conn_pool_.push_back(std::make_unique(io)); -} - -bool redis_client::is_connected() const -{ return is_connected_; } +redis_client::redis_client(asio::io_context &io) noexcept + : conn_(io) +{} +bool redis_client::is_connected() const noexcept +{ return conn_.is_connected(); } void redis_client::disconnect() { - for(auto &conn : conn_pool_) - conn->disconnect(); + conn_.disconnect(); } void redis_client::set(const string& key, const string& value, reply_cb_t reply) { @@ -58,10 +52,9 @@ void redis_client::ping(reply_cb_t reply) { send({"ping"}, reply); } -//TODO: wtf?! doesnt make sense with multiple connections! -// void select(uint catalog, reply_cb_t&& reply) { -// send({"select", std::to_string(catalog)}, reply); -// } +void redis_client::select(uint catalog, reply_cb_t&& reply) { + send({"select", std::to_string(catalog)}, reply); +} void redis_client::publish(const string& channel, const string& msg, reply_cb_t&& reply) { send({"publish", channel, msg}, reply); @@ -77,33 +70,9 @@ redis_client::sort(const string& hash_name, std::vector&& fields, reply_ send({"sort " + hash_name + " by nosort", req}, reply); } - -void redis_client::commit_pipeline() { - string buffer; - std::swap(pipeline_buffer_, buffer); - std::vector cbs; - pipelined_cbs_.swap(cbs); - - is_connected_ = get_connection().pipelined_send(std::move(buffer), std::move(cbs)); - if (!is_connected_) { - disconnect(); - throw connect_exception(); - } -} - -redis_client& redis_client::pipeline_on() { - pipelined_state_ = true; - return *this; -} - -redis_client& redis_client::pipeline_off() { - pipelined_state_ = false; - return *this; -} - -void redis_client::send(const std::vector&& elems, const reply_cb_t& reply) +void redis_client::send(std::vector&& elems, const reply_cb_t& reply) { - if (!is_connected_) + if (!is_connected()) throw connect_exception(); string cmd; @@ -112,45 +81,7 @@ void redis_client::send(const std::vector&& elems, const reply_cb_t& rep cmd += "\r\n"; - if (!pipelined_state_) { - is_connected_ = get_connection().send(std::move(cmd), reply); - if(!is_connected_) { - disconnect(); - throw connect_exception(); - } - return; - } - - pipeline_buffer_ += cmd; - pipelined_cbs_.push_back(std::move(reply)); -} - - -connection& redis_client::get_connection() -{ - if (con_rr_ctr_ == conn_pool_.size()) - con_rr_ctr_ = 0; - - return *conn_pool_[con_rr_ctr_++]; -} - -void redis_client::check_conn_connected(const connect_cb_t& handler, bool res) -{ - if (++connected_called_ != conn_pool_.size()) - return; - - bool val = true; - for(auto &con : conn_pool_) - val &= con->is_connected(); - - if (!val) { - for(auto &con : conn_pool_) - con->disconnect(); - } - - connected_called_ = 0; - is_connected_ = val; - return handler(val); + conn_.send(std::move(cmd), reply); } } diff --git a/test/main.cpp b/test/main.cpp index 7700558..fd9416b 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -16,7 +16,7 @@ struct monitor_test monitor_test(asio::io_context &loop, int n = 100) : my_monitor(loop), - my_redis(loop, 2), + my_redis(loop), n_ping(n) { start(); From be8067f254025de59681729ca253ce4d4b541653 Mon Sep 17 00:00:00 2001 From: Hamid Date: Sun, 15 Mar 2020 01:41:43 +0100 Subject: [PATCH 3/3] Removes the exception --- includes/async_redis/redis_client.hpp | 3 +-- src/redis_client.cpp | 7 ++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/includes/async_redis/redis_client.hpp b/includes/async_redis/redis_client.hpp index a8de275..487d59f 100644 --- a/includes/async_redis/redis_client.hpp +++ b/includes/async_redis/redis_client.hpp @@ -19,7 +19,6 @@ namespace async_redis using connect_cb_t = connection::connect_handler_t; public: - class connect_exception : std::exception {}; using parser_t = connection::parser_t; redis_client(asio::io_context &io) noexcept; @@ -44,7 +43,7 @@ namespace async_redis void select(uint catalog, reply_cb_t&& reply); private: - void send(std::vector&& elems, const reply_cb_t& reply); + bool send(std::vector&& elems, const reply_cb_t& reply) noexcept; private: connection conn_; diff --git a/src/redis_client.cpp b/src/redis_client.cpp index ebf7146..77fa3c8 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -70,18 +70,15 @@ redis_client::sort(const string& hash_name, std::vector&& fields, reply_ send({"sort " + hash_name + " by nosort", req}, reply); } -void redis_client::send(std::vector&& elems, const reply_cb_t& reply) +bool redis_client::send(std::vector&& elems, const reply_cb_t& reply) noexcept { - if (!is_connected()) - throw connect_exception(); - string cmd; for (auto &s : elems) cmd += s + " "; cmd += "\r\n"; - conn_.send(std::move(cmd), reply); + return conn_.send(std::move(cmd), reply); } }