Skip to content

Commit

Permalink
Adapt to ASIO
Browse files Browse the repository at this point in the history
  • Loading branch information
hamidr committed Jan 12, 2017
2 parents b0bc663 + 3fa1547 commit 255fd5f
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 168 deletions.
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "libs/libevpp"]
path = libs/libevpp
url = https://github.com/hamidr/libevpp
[submodule "libs/asio"]
path = libs/asio
url = https://github.com/chriskohlhoff/asio
23 changes: 15 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@ include_directories(${PROJECT_INCLUDE_DIR})
include_directories(/usr/include/)
include_directories(/usr/local/include)

link_directories(/usr/lib)
link_directories(/usr/local/lib)

add_subdirectory(libs/libevpp/)

include_directories(libs/libevpp/includes)

include_directories(libs/asio/asio/include/)

add_library(parser
${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp
Expand All @@ -54,7 +48,20 @@ if(CMAKE_COMPILER_IS_GNUCXX)
set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary
endif()

target_link_libraries(async_redis network parser)
target_link_libraries(async_redis parser pthread)

install(TARGETS async_redis
LIBRARY DESTINATION /usr/local/lib/
ARCHIVE DESTINATION /usr/local/lib/
)

install(TARGETS parser
LIBRARY DESTINATION /usr/local/lib/
ARCHIVE DESTINATION /usr/local/lib/
)


install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)

add_executable (a2.out test/main.cpp)
target_link_libraries(a2.out async_redis)
22 changes: 10 additions & 12 deletions includes/async_redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@

#include <queue>
#include <functional>
#include <memory>
#include <tuple>

#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <async_redis/parser/base_resp_parser.h>
#include <libevpp/event_loop/event_loop_ev.h>
#include <libevpp/network/async_socket.hpp>

using namespace libevpp;

namespace async_redis
{
class connection
{
using async_socket = libevpp::network::async_socket;
connection(const connection&) = delete;
connection& operator = (const connection&) = delete;

public:
using connect_handler_t = std::function<void(bool)>;
using parser_t = parser::base_resp_parser::parser;
using reply_cb_t = std::function<void (parser_t&)>;

connection(event_loop::event_loop_ev& event_loop);
connection(asio::io_context&);

void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
void connect(async_socket::connect_handler_t handler, const std::string& path);
void connect(connect_handler_t handler, const std::string& ip, int port);

bool is_connected() const;
void disconnect();
Expand All @@ -33,14 +31,14 @@ namespace async_redis

private:
void do_read();
void reply_received(ssize_t len);
void reply_received(const asio::error_code& ec, size_t len);

private:
std::unique_ptr<async_socket> socket_;
asio::ip::tcp::socket socket_;
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;

event_loop::event_loop_ev& event_loop_;
enum { max_data_size = 1024 };
char data_[max_data_size];
bool is_connected_ = false;
};
}
23 changes: 13 additions & 10 deletions includes/async_redis/monitor.hpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
#pragma once

#include <libevpp/network/async_socket.hpp>
#include <async_redis/parser/base_resp_parser.h>

#include <unordered_map>
#include <list>
#include <string>

#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>

using std::string;
using namespace libevpp;

namespace async_redis
{
class monitor
{
using async_socket = network::async_socket;
monitor(const monitor&) = delete;
monitor& operator = (const monitor&) = delete;

using connect_handler_t = std::function<void(bool)>;

public:
enum EventState {
Expand All @@ -27,10 +31,8 @@ namespace async_redis
using parser_t = parser::base_resp_parser::parser;
using watcher_cb_t = std::function<void (const string&, parser_t&, EventState)>;

monitor(event_loop::event_loop_ev &event_loop);

void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
void connect(async_socket::connect_handler_t handler, const std::string& path);
monitor(asio::io_context &event_loop);
void connect(connect_handler_t handler, const std::string& ip, int port);

bool is_connected() const;
bool is_watching() const;
Expand All @@ -42,6 +44,7 @@ namespace async_redis
bool punsubscribe(const std::list<string>& channels, watcher_cb_t&& cb);

private:
void do_read();
bool send_and_receive(string&& data);
void handle_message_event(parser_t& channel, parser_t& value);
void handle_subscribe_event(parser_t& channel, parser_t& clients);
Expand All @@ -52,18 +55,18 @@ namespace async_redis
void handle_event(parser_t&& request);
void report_disconnect();

void stream_received(ssize_t len);
void stream_received(const asio::error_code& ec, size_t len);

private:
parser_t parser_;
std::unordered_map<std::string, watcher_cb_t> watchers_;
std::unordered_map<std::string, watcher_cb_t> pwatchers_;

std::unique_ptr<async_socket> socket_;
event_loop::event_loop_ev &io_;
asio::ip::tcp::socket socket_;
enum { max_data_size = 1024 };
char data_[max_data_size];
bool is_watching_ = false;
bool is_connected_ = false;
};

}
7 changes: 5 additions & 2 deletions includes/async_redis/redis_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ namespace async_redis

class redis_client
{
redis_client(const redis_client&) = delete;
redis_client& operator = (const redis_client&) = delete;

using reply_cb_t = connection::reply_cb_t;
using connect_cb_t = network::async_socket::connect_handler_t;
using connect_cb_t = connection::connect_handler_t;

public:
class connect_exception : std::exception {};
using parser_t = connection::parser_t;

redis_client(event_loop::event_loop_ev &eventIO, int n = 1);
redis_client(asio::io_context &io, uint n = 1);
bool is_connected() const;

template <typename ...Args>
Expand Down
10 changes: 3 additions & 7 deletions includes/async_redis/sentinel.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
#pragma once
#include <libevpp/network/async_socket.hpp>
#include <async_redis/parser/base_resp_parser.h>

#include <async_redis/monitor.hpp>
#include <functional>
#include <async_redis/monitor.hpp>
#include <async_redis/connection.hpp>

using namespace libevpp;

namespace async_redis {
class sentinel
{
using socket_t = libevpp::network::async_socket;
using connect_cb_t = socket_t::connect_handler_t;
using connect_cb_t = connection::connect_handler_t;

public:
using parser_t = parser::base_resp_parser::parser;
Expand All @@ -22,7 +18,7 @@ namespace async_redis {
Watching
};

sentinel(event_loop::event_loop_ev &event_loop);
sentinel(asio::io_context &io);

bool is_connected() const;
bool connect(const string& ip, int port, connect_cb_t&& connector);
Expand Down
1 change: 1 addition & 0 deletions libs/asio
Submodule asio added at 14db63
1 change: 0 additions & 1 deletion libs/libevpp
Submodule libevpp deleted from 906c1f
81 changes: 41 additions & 40 deletions src/connection.cpp
Original file line number Diff line number Diff line change
@@ -1,70 +1,59 @@
#include "../includes/async_redis/connection.hpp"

#include <queue>
#include <functional>
#include <memory>
#include <tuple>

#include <async_redis/parser/base_resp_parser.h>
#include <libevpp/network/tcp_socket.hpp>
#include <libevpp/network/unix_socket.hpp>


namespace async_redis
{

using tcp_socket = network::tcp_socket;
using unix_socket = network::unix_socket;

connection::connection(event_loop::event_loop_ev& event_loop)
: event_loop_(event_loop) {
}
connection::connection(asio::io_context& io)
: socket_(io)
{ }

void connection::connect(async_socket::connect_handler_t handler, const std::string& ip, int port)
void connection::connect(connect_handler_t handler, const std::string& ip, int port)
{
if (!socket_ || !socket_->is_valid())
socket_ = std::make_unique<tcp_socket>(event_loop_);
asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port);

static_cast<tcp_socket*>(socket_.get())->async_connect(ip, port, handler);
}

void connection::connect(async_socket::connect_handler_t handler, const std::string& path)
{
if (!socket_ || !socket_->is_valid())
socket_ = std::make_unique<unix_socket>(event_loop_);

static_cast<unix_socket*>(socket_.get())->async_connect(path, handler);
socket_.async_connect(endpoint, [handler, this](const asio::error_code &ec) {
is_connected_ = !ec;
handler(!ec);
});
}

bool connection::is_connected() const
{
return socket_ && socket_->is_connected();
return is_connected_;
}

void connection::disconnect()
{
socket_->close();
socket_.close();
//TODO: check the policy! Should we free queue or retry again?
decltype(req_queue_) free_me;
free_me.swap(req_queue_);
is_connected_ = false;
}

bool connection::pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
{
if (!is_connected())
return false;

return
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
if (sent_chunk_len == 0)
socket_.async_send(asio::buffer(pipelined_cmds.data(), pipelined_cmds.length()),
[this, cbs = std::move(callbacks)](const asio::error_code &ec, size_t len)
{
if (ec)
return disconnect();

if (!req_queue_.size() && cbs.size())
do_read();

for(auto &&cb : cbs)
req_queue_.emplace(std::move(cb), nullptr);
});
}
);

return true;
}

bool connection::send(const std::string&& command, const reply_cb_t& reply_cb)
Expand All @@ -75,24 +64,36 @@ bool connection::send(const std::string&& command, const reply_cb_t& reply_cb)
bool read_it = !req_queue_.size();
req_queue_.emplace(reply_cb, nullptr);

return
socket_->async_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) {
if (sent_chunk_len == 0)
socket_.async_send(asio::buffer(command.data(), command.length()),
[this, read_it](const asio::error_code &ec, size_t len)
{
// std::cout << ec << std::endl;
if (ec)
return disconnect();

if (read_it)
do_read();
});
if (read_it)
do_read();
}
);
return true;
}

void connection::do_read()
{
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
socket_.async_read_some(
asio::buffer(data_, max_data_size),
std::bind(
&connection::reply_received,
this,
std::placeholders::_1,
std::placeholders::_2
)
);
}

void connection::reply_received(ssize_t len)
void connection::reply_received(const asio::error_code& ec, size_t len)
{
if (len == 0)
if (ec)
return disconnect();

ssize_t acc = 0;
Expand Down
Loading

0 comments on commit 255fd5f

Please sign in to comment.