Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asio support #19 #21

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@

# build directory
build/
.vscode/
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "libs/libevpp"]
path = libs/libevpp
url = https://github.com/hamidr/libevpp
[submodule "libs/asio"]
path = libs/asio
url = https://github.com/chriskohlhoff/asio
26 changes: 17 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ else()
SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${GCC_COVERAGE_LINK_FLAGS} -g" )
endif()

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_definitions(-DASIO_STANDALONE)

set(PROJECT_SOURCE_DIR src)
set(PROJECT_INCLUDE_DIR includes)
Expand All @@ -25,13 +26,7 @@ include_directories(${PROJECT_INCLUDE_DIR})
include_directories(/usr/include/)
include_directories(/usr/local/include)

link_directories(/usr/lib)
link_directories(/usr/local/lib)

add_subdirectory(libs/libevpp/)

include_directories(libs/libevpp/includes)

include_directories(libs/asio/asio/include/)

add_library(parser
${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp
Expand All @@ -54,7 +49,20 @@ if(CMAKE_COMPILER_IS_GNUCXX)
set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary
endif()

target_link_libraries(async_redis network parser)
target_link_libraries(async_redis parser pthread)

install(TARGETS async_redis
LIBRARY DESTINATION /usr/local/lib/
ARCHIVE DESTINATION /usr/local/lib/
)

install(TARGETS parser
LIBRARY DESTINATION /usr/local/lib/
ARCHIVE DESTINATION /usr/local/lib/
)


install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)

add_executable (a2.out test/main.cpp)
target_link_libraries(a2.out async_redis)
6 changes: 6 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
git submodule init
git submodule update
mkdir -p build && cd build
cmake ..
make
cd ..
22 changes: 10 additions & 12 deletions includes/async_redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@

#include <queue>
#include <functional>
#include <memory>
#include <tuple>

#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <async_redis/parser/base_resp_parser.h>
#include <libevpp/event_loop/event_loop_ev.h>
#include <libevpp/network/async_socket.hpp>

using namespace libevpp;

namespace async_redis
{
class connection
{
using async_socket = libevpp::network::async_socket;
connection(const connection&) = delete;
connection& operator = (const connection&) = delete;

public:
using connect_handler_t = std::function<void(bool)>;
using parser_t = parser::base_resp_parser::parser;
using reply_cb_t = std::function<void (parser_t&)>;

connection(event_loop::event_loop_ev& event_loop);
connection(asio::io_context&);

void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
void connect(async_socket::connect_handler_t handler, const std::string& path);
void connect(connect_handler_t handler, const std::string& ip, int port);

bool is_connected() const;
void disconnect();
Expand All @@ -33,14 +31,14 @@ namespace async_redis

private:
void do_read();
void reply_received(ssize_t len);
void reply_received(const asio::error_code& ec, size_t len);

private:
std::unique_ptr<async_socket> socket_;
asio::ip::tcp::socket socket_;
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;

event_loop::event_loop_ev& event_loop_;
enum { max_data_size = 1024 };
char data_[max_data_size];
bool is_connected_ = false;
};
}
23 changes: 13 additions & 10 deletions includes/async_redis/monitor.hpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
#pragma once

#include <libevpp/network/async_socket.hpp>
#include <async_redis/parser/base_resp_parser.h>

#include <unordered_map>
#include <list>
#include <string>

#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>

using std::string;
using namespace libevpp;

namespace async_redis
{
class monitor
{
using async_socket = network::async_socket;
monitor(const monitor&) = delete;
monitor& operator = (const monitor&) = delete;

using connect_handler_t = std::function<void(bool)>;

public:
enum EventState {
Expand All @@ -27,10 +31,8 @@ namespace async_redis
using parser_t = parser::base_resp_parser::parser;
using watcher_cb_t = std::function<void (const string&, parser_t&, EventState)>;

monitor(event_loop::event_loop_ev &event_loop);

void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
void connect(async_socket::connect_handler_t handler, const std::string& path);
monitor(asio::io_context &event_loop);
void connect(connect_handler_t handler, const std::string& ip, int port);

bool is_connected() const;
bool is_watching() const;
Expand All @@ -42,6 +44,7 @@ namespace async_redis
bool punsubscribe(const std::list<string>& channels, watcher_cb_t&& cb);

private:
void do_read();
bool send_and_receive(string&& data);
void handle_message_event(parser_t& channel, parser_t& value);
void handle_subscribe_event(parser_t& channel, parser_t& clients);
Expand All @@ -52,18 +55,18 @@ namespace async_redis
void handle_event(parser_t&& request);
void report_disconnect();

void stream_received(ssize_t len);
void stream_received(const asio::error_code& ec, size_t len);

private:
parser_t parser_;
std::unordered_map<std::string, watcher_cb_t> watchers_;
std::unordered_map<std::string, watcher_cb_t> pwatchers_;

std::unique_ptr<async_socket> socket_;
event_loop::event_loop_ev &io_;
asio::ip::tcp::socket socket_;
enum { max_data_size = 1024 };
char data_[max_data_size];
bool is_watching_ = false;
bool is_connected_ = false;
};

}
10 changes: 5 additions & 5 deletions includes/async_redis/parser/base_resp_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ namespace async_redis {
virtual int parse_append(const char*, ssize_t, bool&) = 0;
virtual std::string to_string() const = 0;
virtual void map(const caller_t &fn);
bool is_array() const;
bool is_number() const;
bool is_string() const;
bool is_enum() const;
bool is_error() const;
bool is_array() const noexcept;
bool is_number() const noexcept;
bool is_string() const noexcept;
bool is_enum() const noexcept;
bool is_error() const noexcept;

void print();
};
Expand Down
33 changes: 10 additions & 23 deletions includes/async_redis/redis_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ namespace async_redis

class redis_client
{
redis_client(const redis_client&) = delete;
redis_client& operator = (const redis_client&) = delete;

using reply_cb_t = connection::reply_cb_t;
using connect_cb_t = network::async_socket::connect_handler_t;
using connect_cb_t = connection::connect_handler_t;

public:
class connect_exception : std::exception {};
using parser_t = connection::parser_t;

redis_client(event_loop::event_loop_ev &eventIO, int n = 1);
bool is_connected() const;
redis_client(asio::io_context &io) noexcept;
bool is_connected() const noexcept;

template <typename ...Args>
void connect(const connect_cb_t& handler, Args... args) {
for(auto &conn : conn_pool_)
conn->connect(std::bind(&redis_client::check_conn_connected, this, handler, std::placeholders::_1), args...);
conn_.connect(handler, args...);
}

void disconnect();
Expand All @@ -39,27 +40,13 @@ namespace async_redis
void ping(reply_cb_t reply);
void publish(const string& channel, const string& msg, reply_cb_t&& reply);
void sort(const string& hash_name, std::vector<string>&& fields, reply_cb_t&& reply);

//TODO: wtf?! doesnt make sense with multiple connections!
// void select(uint catalog, reply_cb_t&& reply) {
// send({"select", std::to_string(catalog)}, reply);
// }

void commit_pipeline();
redis_client& pipeline_on();
redis_client& pipeline_off();
void select(uint catalog, reply_cb_t&& reply);

private:
void send(const std::vector<string>&& elems, const reply_cb_t& reply);
connection& get_connection();
void check_conn_connected(const connect_cb_t& handler, bool res);
bool send(std::vector<string>&& elems, const reply_cb_t& reply) noexcept;

private:
std::string pipeline_buffer_;
bool pipelined_state_ = false;
std::vector<reply_cb_t> pipelined_cbs_;
std::vector<std::unique_ptr<connection>> conn_pool_;
int con_rr_ctr_ = 0;
connection conn_;
int connected_called_ = 0;
bool is_connected_ = false;
};
Expand Down
10 changes: 3 additions & 7 deletions includes/async_redis/sentinel.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
#pragma once
#include <libevpp/network/async_socket.hpp>
#include <async_redis/parser/base_resp_parser.h>

#include <async_redis/monitor.hpp>
#include <functional>
#include <async_redis/monitor.hpp>
#include <async_redis/connection.hpp>

using namespace libevpp;

namespace async_redis {
class sentinel
{
using socket_t = libevpp::network::async_socket;
using connect_cb_t = socket_t::connect_handler_t;
using connect_cb_t = connection::connect_handler_t;

public:
using parser_t = parser::base_resp_parser::parser;
Expand All @@ -22,7 +18,7 @@ namespace async_redis {
Watching
};

sentinel(event_loop::event_loop_ev &event_loop);
sentinel(asio::io_context &io);

bool is_connected() const;
bool connect(const string& ip, int port, connect_cb_t&& connector);
Expand Down
1 change: 1 addition & 0 deletions libs/asio
Submodule asio added at 14db63
1 change: 0 additions & 1 deletion libs/libevpp
Submodule libevpp deleted from 906c1f
Loading