diff --git a/.gitignore b/.gitignore index 026dc27..58a3f62 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ # build directory build/ +.vscode/ diff --git a/.gitmodules b/.gitmodules index 3b7fecf..c5d4608 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "libs/libevpp"] - path = libs/libevpp - url = https://github.com/hamidr/libevpp +[submodule "libs/asio"] + path = libs/asio + url = https://github.com/chriskohlhoff/asio diff --git a/CMakeLists.txt b/CMakeLists.txt index 54e9dce..8db97a5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,8 +15,9 @@ else() SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${GCC_COVERAGE_LINK_FLAGS} -g" ) endif() -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +add_definitions(-DASIO_STANDALONE) set(PROJECT_SOURCE_DIR src) set(PROJECT_INCLUDE_DIR includes) @@ -25,13 +26,7 @@ include_directories(${PROJECT_INCLUDE_DIR}) include_directories(/usr/include/) include_directories(/usr/local/include) -link_directories(/usr/lib) -link_directories(/usr/local/lib) - -add_subdirectory(libs/libevpp/) - -include_directories(libs/libevpp/includes) - +include_directories(libs/asio/asio/include/) add_library(parser ${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp @@ -54,7 +49,20 @@ if(CMAKE_COMPILER_IS_GNUCXX) set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary endif() -target_link_libraries(async_redis network parser) +target_link_libraries(async_redis parser pthread) + +install(TARGETS async_redis + LIBRARY DESTINATION /usr/local/lib/ + ARCHIVE DESTINATION /usr/local/lib/ + ) + +install(TARGETS parser + LIBRARY DESTINATION /usr/local/lib/ + ARCHIVE DESTINATION /usr/local/lib/ + ) + + +install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include) add_executable (a2.out test/main.cpp) target_link_libraries(a2.out async_redis) diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..8225c5d --- /dev/null +++ b/build.sh @@ -0,0 +1,6 @@ +git submodule init +git submodule update +mkdir -p build && cd build +cmake .. +make +cd .. diff --git a/includes/async_redis/connection.hpp b/includes/async_redis/connection.hpp index 772ea2b..10cba54 100644 --- a/includes/async_redis/connection.hpp +++ b/includes/async_redis/connection.hpp @@ -2,29 +2,27 @@ #include #include -#include #include +#include +#include #include -#include -#include - -using namespace libevpp; namespace async_redis { class connection { - using async_socket = libevpp::network::async_socket; + connection(const connection&) = delete; + connection& operator = (const connection&) = delete; public: + using connect_handler_t = std::function; using parser_t = parser::base_resp_parser::parser; using reply_cb_t = std::function; - connection(event_loop::event_loop_ev& event_loop); + connection(asio::io_context&); - void connect(async_socket::connect_handler_t handler, const std::string& ip, int port); - void connect(async_socket::connect_handler_t handler, const std::string& path); + void connect(connect_handler_t handler, const std::string& ip, int port); bool is_connected() const; void disconnect(); @@ -33,14 +31,14 @@ namespace async_redis private: void do_read(); - void reply_received(ssize_t len); + void reply_received(const asio::error_code& ec, size_t len); private: - std::unique_ptr socket_; + asio::ip::tcp::socket socket_; std::queue> req_queue_; - event_loop::event_loop_ev& event_loop_; enum { max_data_size = 1024 }; char data_[max_data_size]; + bool is_connected_ = false; }; } diff --git a/includes/async_redis/monitor.hpp b/includes/async_redis/monitor.hpp index 8868ff5..64dc766 100644 --- a/includes/async_redis/monitor.hpp +++ b/includes/async_redis/monitor.hpp @@ -1,20 +1,24 @@ #pragma once -#include #include #include #include #include +#include +#include + using std::string; -using namespace libevpp; namespace async_redis { class monitor { - using async_socket = network::async_socket; + monitor(const monitor&) = delete; + monitor& operator = (const monitor&) = delete; + + using connect_handler_t = std::function; public: enum EventState { @@ -27,10 +31,8 @@ namespace async_redis using parser_t = parser::base_resp_parser::parser; using watcher_cb_t = std::function; - monitor(event_loop::event_loop_ev &event_loop); - - void connect(async_socket::connect_handler_t handler, const std::string& ip, int port); - void connect(async_socket::connect_handler_t handler, const std::string& path); + monitor(asio::io_context &event_loop); + void connect(connect_handler_t handler, const std::string& ip, int port); bool is_connected() const; bool is_watching() const; @@ -42,6 +44,7 @@ namespace async_redis bool punsubscribe(const std::list& channels, watcher_cb_t&& cb); private: + void do_read(); bool send_and_receive(string&& data); void handle_message_event(parser_t& channel, parser_t& value); void handle_subscribe_event(parser_t& channel, parser_t& clients); @@ -52,18 +55,18 @@ namespace async_redis void handle_event(parser_t&& request); void report_disconnect(); - void stream_received(ssize_t len); + void stream_received(const asio::error_code& ec, size_t len); private: parser_t parser_; std::unordered_map watchers_; std::unordered_map pwatchers_; - std::unique_ptr socket_; - event_loop::event_loop_ev &io_; + asio::ip::tcp::socket socket_; enum { max_data_size = 1024 }; char data_[max_data_size]; bool is_watching_ = false; + bool is_connected_ = false; }; } diff --git a/includes/async_redis/parser/base_resp_parser.h b/includes/async_redis/parser/base_resp_parser.h index 572d88b..da67fde 100644 --- a/includes/async_redis/parser/base_resp_parser.h +++ b/includes/async_redis/parser/base_resp_parser.h @@ -30,11 +30,11 @@ namespace async_redis { virtual int parse_append(const char*, ssize_t, bool&) = 0; virtual std::string to_string() const = 0; virtual void map(const caller_t &fn); - bool is_array() const; - bool is_number() const; - bool is_string() const; - bool is_enum() const; - bool is_error() const; + bool is_array() const noexcept; + bool is_number() const noexcept; + bool is_string() const noexcept; + bool is_enum() const noexcept; + bool is_error() const noexcept; void print(); }; diff --git a/includes/async_redis/redis_client.hpp b/includes/async_redis/redis_client.hpp index aa5324a..487d59f 100644 --- a/includes/async_redis/redis_client.hpp +++ b/includes/async_redis/redis_client.hpp @@ -12,20 +12,21 @@ namespace async_redis class redis_client { + redis_client(const redis_client&) = delete; + redis_client& operator = (const redis_client&) = delete; + using reply_cb_t = connection::reply_cb_t; - using connect_cb_t = network::async_socket::connect_handler_t; + using connect_cb_t = connection::connect_handler_t; public: - class connect_exception : std::exception {}; using parser_t = connection::parser_t; - redis_client(event_loop::event_loop_ev &eventIO, int n = 1); - bool is_connected() const; + redis_client(asio::io_context &io) noexcept; + bool is_connected() const noexcept; template void connect(const connect_cb_t& handler, Args... args) { - for(auto &conn : conn_pool_) - conn->connect(std::bind(&redis_client::check_conn_connected, this, handler, std::placeholders::_1), args...); + conn_.connect(handler, args...); } void disconnect(); @@ -39,27 +40,13 @@ namespace async_redis void ping(reply_cb_t reply); void publish(const string& channel, const string& msg, reply_cb_t&& reply); void sort(const string& hash_name, std::vector&& fields, reply_cb_t&& reply); - - //TODO: wtf?! doesnt make sense with multiple connections! - // void select(uint catalog, reply_cb_t&& reply) { - // send({"select", std::to_string(catalog)}, reply); - // } - - void commit_pipeline(); - redis_client& pipeline_on(); - redis_client& pipeline_off(); + void select(uint catalog, reply_cb_t&& reply); private: - void send(const std::vector&& elems, const reply_cb_t& reply); - connection& get_connection(); - void check_conn_connected(const connect_cb_t& handler, bool res); + bool send(std::vector&& elems, const reply_cb_t& reply) noexcept; private: - std::string pipeline_buffer_; - bool pipelined_state_ = false; - std::vector pipelined_cbs_; - std::vector> conn_pool_; - int con_rr_ctr_ = 0; + connection conn_; int connected_called_ = 0; bool is_connected_ = false; }; diff --git a/includes/async_redis/sentinel.hpp b/includes/async_redis/sentinel.hpp index 0cb3a5f..29fde6d 100644 --- a/includes/async_redis/sentinel.hpp +++ b/includes/async_redis/sentinel.hpp @@ -1,18 +1,14 @@ #pragma once -#include #include -#include #include +#include #include -using namespace libevpp; - namespace async_redis { class sentinel { - using socket_t = libevpp::network::async_socket; - using connect_cb_t = socket_t::connect_handler_t; + using connect_cb_t = connection::connect_handler_t; public: using parser_t = parser::base_resp_parser::parser; @@ -22,7 +18,7 @@ namespace async_redis { Watching }; - sentinel(event_loop::event_loop_ev &event_loop); + sentinel(asio::io_context &io); bool is_connected() const; bool connect(const string& ip, int port, connect_cb_t&& connector); diff --git a/libs/asio b/libs/asio new file mode 160000 index 0000000..14db637 --- /dev/null +++ b/libs/asio @@ -0,0 +1 @@ +Subproject commit 14db6371b338339383aacaed29b0fa352259645a diff --git a/libs/libevpp b/libs/libevpp deleted file mode 160000 index 906c1f6..0000000 --- a/libs/libevpp +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 906c1f66767da69d6fa5cd3a76458f6a5472bdda diff --git a/src/connection.cpp b/src/connection.cpp index ce2c502..2975a23 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -1,52 +1,37 @@ #include "../includes/async_redis/connection.hpp" -#include -#include -#include -#include #include -#include -#include - namespace async_redis { -using tcp_socket = network::tcp_socket; -using unix_socket = network::unix_socket; - -connection::connection(event_loop::event_loop_ev& event_loop) - : event_loop_(event_loop) { -} +connection::connection(asio::io_context& io) + : socket_(io) +{ } -void connection::connect(async_socket::connect_handler_t handler, const std::string& ip, int port) +void connection::connect(connect_handler_t handler, const std::string& ip, int port) { - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(event_loop_); + asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port); - static_cast(socket_.get())->async_connect(ip, port, handler); -} - -void connection::connect(async_socket::connect_handler_t handler, const std::string& path) -{ - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(event_loop_); - - static_cast(socket_.get())->async_connect(path, handler); + socket_.async_connect(endpoint, [handler, this](const asio::error_code &ec) { + is_connected_ = !ec; + handler(!ec); + }); } bool connection::is_connected() const { - return socket_ && socket_->is_connected(); + return is_connected_; } void connection::disconnect() { - socket_->close(); + socket_.close(); //TODO: check the policy! Should we free queue or retry again? decltype(req_queue_) free_me; free_me.swap(req_queue_); + is_connected_ = false; } bool connection::pipelined_send(std::string&& pipelined_cmds, std::vector&& callbacks) @@ -54,9 +39,10 @@ bool connection::pipelined_send(std::string&& pipelined_cmds, std::vectorasync_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) { - if (sent_chunk_len == 0) + socket_.async_send(asio::buffer(pipelined_cmds.data(), pipelined_cmds.length()), + [this, cbs = std::move(callbacks)](const asio::error_code &ec, size_t len) + { + if (ec) return disconnect(); if (!req_queue_.size() && cbs.size()) @@ -64,7 +50,10 @@ bool connection::pipelined_send(std::string&& pipelined_cmds, std::vectorasync_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) { - if (sent_chunk_len == 0) + socket_.async_send(asio::buffer(command.data(), command.length()), + [this, read_it](const asio::error_code &ec, size_t len) + { + // std::cout << ec << std::endl; + if (ec) return disconnect(); - if (read_it) - do_read(); - }); + if (read_it) + do_read(); + } + ); + return true; } void connection::do_read() { - socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1)); + socket_.async_read_some( + asio::buffer(data_, max_data_size), + std::bind( + &connection::reply_received, + this, + std::placeholders::_1, + std::placeholders::_2 + ) + ); } -void connection::reply_received(ssize_t len) +void connection::reply_received(const asio::error_code& ec, size_t len) { - if (len == 0) + if (ec) return disconnect(); ssize_t acc = 0; diff --git a/src/monitor.cpp b/src/monitor.cpp index 2c3ace3..a41ee9f 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -2,47 +2,41 @@ #include #include -#include -#include namespace async_redis { -using tcp_socket = network::tcp_socket; -using unix_socket = network::unix_socket; -monitor::monitor(event_loop::event_loop_ev &event_loop) - : io_(event_loop) +monitor::monitor(asio::io_context &io) + : socket_(io) {} -void monitor::connect(async_socket::connect_handler_t handler, const std::string& ip, int port) +void monitor::connect(connect_handler_t handler, const std::string& ip, int port) { - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(io_); + if (socket_.is_open()) + socket_.close(); - static_cast(socket_.get())->async_connect(ip, port, handler); -} - -void monitor::connect(async_socket::connect_handler_t handler, const std::string& path) -{ - if (!socket_ || !socket_->is_valid()) - socket_ = std::make_unique(io_); + asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port); - static_cast(socket_.get())->async_connect(path, handler); + socket_.async_connect(endpoint, [handler, this](const asio::error_code &ec) { + is_connected_ = !ec; + handler(!ec); + }); } bool monitor::is_connected() const -{ return socket_->is_connected(); } +{ return is_connected_; } bool monitor::is_watching() const -{ return this->is_connected() && is_watching_; } +{ return is_connected() && is_watching_; } void monitor::disconnect() { - socket_->close(); + socket_.close(); pwatchers_.clear(); watchers_.clear(); is_watching_ = false; + is_connected_ = false; } bool monitor::psubscribe(const std::list& channels, watcher_cb_t&& cb) @@ -103,25 +97,32 @@ bool monitor::send_and_receive(string&& data) if (!is_connected()) return false; - socket_->async_write(data, [this](ssize_t sent_chunk_len) { - if (is_watching_) - return; - - this->socket_->async_read( - this->data_, - this->max_data_size, - std::bind( - &monitor::stream_received, - this, - std::placeholders::_1 - ) - ); - - this->is_watching_ = true; - }); + socket_.async_send(asio::buffer(data.data(), data.length()), + [this](const asio::error_code& ec, size_t sent_chunk_len) + { + if (is_watching_) + return; + + do_read(); + + this->is_watching_ = true; + }); return true; } +void monitor::do_read() +{ + this->socket_.async_read_some( + asio::buffer(this->data_, this->max_data_size), + std::bind( + &monitor::stream_received, + this, + std::placeholders::_1, + std::placeholders::_2 + ) + ); +} + void monitor::handle_message_event(parser_t& channel, parser_t& value) { const string& ch_key = channel->to_string(); @@ -217,9 +218,9 @@ void monitor::report_disconnect() disconnect(); } -void monitor::stream_received(ssize_t len) + void monitor::stream_received(const asio::error_code& ec, size_t len) { - if (len == 0) + if (ec) return report_disconnect(); ssize_t acc = 0; @@ -244,15 +245,7 @@ void monitor::stream_received(ssize_t len) return; } - this->socket_->async_read( - this->data_, - this->max_data_size, - std::bind( - &monitor::stream_received, - this, - std::placeholders::_1 - ) - ); + do_read(); } } diff --git a/src/parser/base_resp_parser.cpp b/src/parser/base_resp_parser.cpp index e3e1af0..219faa6 100644 --- a/src/parser/base_resp_parser.cpp +++ b/src/parser/base_resp_parser.cpp @@ -51,35 +51,20 @@ base_resp_parser::append_chunk(base_resp_parser::parser& data, const char* chunk return data->parse_append(chunk, length, is_finished); } -bool -base_resp_parser::is_array() const -{ - this->type() == RespType::Arr; -} +bool base_resp_parser::is_array() const noexcept +{ return this->type() == RespType::Arr; } -bool -base_resp_parser::is_number() const -{ - this->type() == RespType::Num; -} +bool base_resp_parser::is_number() const noexcept +{ return this->type() == RespType::Num; } -bool -base_resp_parser::is_error() const -{ - this->type() == RespType::Err; -} +bool base_resp_parser::is_error() const noexcept +{ return this->type() == RespType::Err; } -bool -base_resp_parser::is_string() const -{ - this->type() == RespType::BulkStr; -} +bool base_resp_parser::is_string() const noexcept +{ return this->type() == RespType::BulkStr; } -bool -base_resp_parser::is_enum() const -{ - this->type() == RespType::Str; -} +bool base_resp_parser::is_enum() const noexcept +{ return this->type() == RespType::Str; } void base_resp_parser::print() diff --git a/src/redis_client.cpp b/src/redis_client.cpp index 9e918c6..77fa3c8 100644 --- a/src/redis_client.cpp +++ b/src/redis_client.cpp @@ -1,27 +1,19 @@ #include "../includes/async_redis/redis_client.hpp" -#include -#include #include namespace async_redis { -redis_client::redis_client(event_loop::event_loop_ev &eventIO, int n) -{ - conn_pool_.reserve(n); - - for (int i = 0; i < conn_pool_.capacity(); ++i) - conn_pool_.push_back(std::make_unique(eventIO)); -} - -bool redis_client::is_connected() const -{ return is_connected_; } +redis_client::redis_client(asio::io_context &io) noexcept + : conn_(io) +{} +bool redis_client::is_connected() const noexcept +{ return conn_.is_connected(); } void redis_client::disconnect() { - for(auto &conn : conn_pool_) - conn->disconnect(); + conn_.disconnect(); } void redis_client::set(const string& key, const string& value, reply_cb_t reply) { @@ -60,10 +52,9 @@ void redis_client::ping(reply_cb_t reply) { send({"ping"}, reply); } -//TODO: wtf?! doesnt make sense with multiple connections! -// void select(uint catalog, reply_cb_t&& reply) { -// send({"select", std::to_string(catalog)}, reply); -// } +void redis_client::select(uint catalog, reply_cb_t&& reply) { + send({"select", std::to_string(catalog)}, reply); +} void redis_client::publish(const string& channel, const string& msg, reply_cb_t&& reply) { send({"publish", channel, msg}, reply); @@ -79,81 +70,15 @@ redis_client::sort(const string& hash_name, std::vector&& fields, reply_ send({"sort " + hash_name + " by nosort", req}, reply); } - -void redis_client::commit_pipeline() { - string buffer; - std::swap(pipeline_buffer_, buffer); - std::vector cbs; - pipelined_cbs_.swap(cbs); - - is_connected_ = get_connection().pipelined_send(std::move(buffer), std::move(cbs)); - if (!is_connected_) { - disconnect(); - throw connect_exception(); - } -} - -redis_client& redis_client::pipeline_on() { - pipelined_state_ = true; - return *this; -} - -redis_client& redis_client::pipeline_off() { - pipelined_state_ = false; - return *this; -} - -void redis_client::send(const std::vector&& elems, const reply_cb_t& reply) +bool redis_client::send(std::vector&& elems, const reply_cb_t& reply) noexcept { - if (!is_connected_) - throw connect_exception(); - string cmd; for (auto &s : elems) cmd += s + " "; cmd += "\r\n"; - if (!pipelined_state_) { - is_connected_ = get_connection().send(std::move(cmd), reply); - if(!is_connected_) { - disconnect(); - throw connect_exception(); - } - return; - } - - pipeline_buffer_ += cmd; - pipelined_cbs_.push_back(std::move(reply)); -} - -connection& redis_client::get_connection() -{ - auto &con = conn_pool_[con_rr_ctr_++]; - if (con_rr_ctr_ == conn_pool_.size()) - con_rr_ctr_ = 0; - - return *con.get(); -} - -void redis_client::check_conn_connected(const connect_cb_t& handler, bool res) -{ - ++connected_called_; - if (connected_called_ != conn_pool_.size()) - return; - - bool val = true; - for(auto &con : conn_pool_) - val &= con->is_connected(); - - if (!val) { - for(auto &con : conn_pool_) - con->disconnect(); - } - - connected_called_ = 0; - is_connected_ = val; - return handler(val); + return conn_.send(std::move(cmd), reply); } } diff --git a/src/sentinel.cpp b/src/sentinel.cpp index c36dc3d..3f55786 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -5,9 +5,9 @@ namespace async_redis { -sentinel::sentinel(event_loop::event_loop_ev &event_loop) - : conn_(event_loop), - stream_(event_loop) +sentinel::sentinel(asio::io_context &io) + : conn_(io), + stream_(io) { } bool sentinel::is_connected() const @@ -24,7 +24,8 @@ bool sentinel::connect(const string& ip, int port, connect_cb_t&& connector) return true; } -void sentinel::disconnect() { +void sentinel::disconnect() +{ stream_.disconnect(); conn_.disconnect(); } diff --git a/test/main.cpp b/test/main.cpp index 076b62b..fd9416b 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -1,15 +1,9 @@ -#include - #include #include -#include -#include #include #include -#include "../examples/tcp_server.hpp" - -typedef libevpp::event_loop::event_loop_ev event_loop_ev; +#include struct monitor_test { @@ -20,23 +14,23 @@ struct monitor_test using redis_client_t = async_redis::redis_client; - monitor_test(event_loop_ev &loop, int n = 100) - : my_monitor(std::make_unique(loop)), - my_redis(std::make_unique(loop, 2)), + monitor_test(asio::io_context &loop, int n = 100) + : my_monitor(loop), + my_redis(loop), n_ping(n) { start(); } void start() { - my_redis->connect(std::bind(&monitor_test::check_redis_connected, this, std::placeholders::_1), "127.0.0.1", 6379); + my_redis.connect(std::bind(&monitor_test::check_redis_connected, this, std::placeholders::_1), "127.0.0.1", 6379); } void check_redis_connected(bool status) { if (status) { std::cout << "RedisClient connected! \n"; - my_monitor->connect(std::bind(&monitor_test::check_monitor_connected, this, std::placeholders::_1), "127.0.0.1", 6379); + my_monitor.connect(std::bind(&monitor_test::check_monitor_connected, this, std::placeholders::_1), "127.0.0.1", 6379); } else { std::cout << "REDIS DIDNT CONNECT!" << std::endl; } @@ -45,10 +39,10 @@ struct monitor_test void check_monitor_connected(bool status) { if (status) { std::cout << "Monitor connected!" << std::endl; - my_monitor->subscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + my_monitor.subscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } else { std::cout << "MONITOR DIDNT CONNECT!" << std::endl; - my_redis->disconnect(); + my_redis.disconnect(); } } @@ -63,15 +57,15 @@ struct monitor_test break; case State::Stream: - std::cout << "watch EventStream" << std::endl; + // std::cout << "watch EventStream" << std::endl; if (play_with_event(event)) return; - my_monitor->unsubscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + my_monitor.unsubscribe({"ping"}, std::bind(&monitor_test::watch_hello_msgs, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); break; case State::Disconnected: std::cout << "Are we fucked" << std::endl; - my_redis->disconnect(); + my_redis.disconnect(); start(); break; @@ -83,18 +77,18 @@ struct monitor_test void send_ping(long n) { - std::cout << "Pinging" << std::endl; - my_redis->publish("ping", std::to_string(n), + // std::cout << "Pinging" << std::endl; + my_redis.publish("ping", std::to_string(n), [](parsed_t res) { - std::cout << "Pinged " << res->to_string() << " connections." << std::endl; + // std::cout << "Pinged " << res->to_string() << " connections." << std::endl; } ); } bool play_with_event(const parsed_t& event) { - std::cout << "Ponged by " << event->to_string() << std::endl; + // std::cout << "Ponged by " << event->to_string() << std::endl; int n = 0; long x = std::stol(event->to_string()); @@ -106,8 +100,8 @@ struct monitor_test } private: - std::unique_ptr my_monitor; - std::unique_ptr my_redis; + monitor_t my_monitor; + redis_client_t my_redis; const long n_ping; }; @@ -117,7 +111,7 @@ struct sentinel_test using sentinel_t = async_redis::sentinel; using parser_t = sentinel_t::parser_t; - sentinel_test(event_loop_ev& ev) + sentinel_test(asio::io_context& ev) : io_(ev), my_sen1(std::make_unique(ev)) { @@ -158,12 +152,12 @@ struct sentinel_test private: std::unique_ptr my_sen1; - event_loop_ev &io_; + asio::io_context &io_; }; int main(int argc, char** args) { - event_loop_ev loop; + asio::io_context loop; monitor_test monitor_mock(loop, 100000); // async_redis::tcp_server::tcp_server server(loop);