Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 41 additions & 30 deletions src/cache/blockcache/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,59 +28,70 @@

#include <ostream>

#include "cache/common/closure.h"
#include "cache/common/context.h"
#include "common/io_buffer.h"
#include "common/status.h"

namespace dingofs {
namespace cache {

struct Aio : public Closure {
class Aio {
public:
struct Attr {
ContextSPtr ctx;
int fd;
off_t offset;
size_t length;
char* buffer;
int buf_index;
bool for_read;
};

struct Result {
Status status;
};

Aio(ContextSPtr ctx, int fd, off_t offset, size_t length, char* buffer,
int buf_index, bool for_read)
: ctx(ctx),
fd(fd),
offset(offset),
length(length),
buffer(buffer),
buf_index(buf_index),
for_read(for_read) {
: attr_({ctx, fd, offset, length, buffer, buf_index, for_read}),
result_({Status::Unknown("unknown error")}) {
CHECK_GE(fd, 0);
CHECK_GE(offset, 0);
CHECK_GT(length, 0);
CHECK_NOTNULL(buffer);
}

const struct Attr& Attr() const { return attr_; }
const struct Result& Result() const { return result_; }
struct Result& Result() { return result_; }

void Wait() {
std::unique_lock<bthread::Mutex> lk(mutex);
while (!finish) {
cond.wait(lk);
std::unique_lock<bthread::Mutex> lk(mutex_);
while (!finish_) {
cond_.wait(lk);
}
}

void Run() override {
std::lock_guard<bthread::Mutex> lk(mutex);
finish = true;
cond.notify_one();
void Run() {
std::lock_guard<bthread::Mutex> lk(mutex_);
finish_ = true;
cond_.notify_one();
}

ContextSPtr ctx;
int fd;
off_t offset;
size_t length;
char* buffer;
int buf_index;
bool for_read;

bool finish{false};
bthread::Mutex mutex;
bthread::ConditionVariable cond;
private:
const struct Attr attr_;
struct Result result_;
bool finish_{false};
bthread::Mutex mutex_;
bthread::ConditionVariable cond_;
};

inline std::ostream& operator<<(std::ostream& os, const Aio& aio) {
os << "Aio{" << (aio.for_read ? "read" : "write") << " fd=" << aio.fd
<< " offset=" << aio.offset << " length=" << aio.length
<< " buf_index=" << aio.buf_index << "}";
const auto& attr = aio.Attr();
os << "Aio{op=" << (attr.for_read ? "read" : "write") << " fd=" << attr.fd
<< " offset=" << attr.offset << " length=" << attr.length
<< " buffer=" << static_cast<void*>(attr.buffer)
<< " buf_index=" << attr.buf_index << "}";
return os;
}

Expand Down
9 changes: 5 additions & 4 deletions src/cache/blockcache/aio_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ DEFINE_uint32(iodepth, 128, "aio queue maximum iodepth");
AioQueue::AioQueue(const std::vector<iovec>& fixed_write_buffers,
const std::vector<iovec>& fixed_read_buffers)
: running_(false),
io_uring_(
std::make_unique<IOUring>(fixed_write_buffers, fixed_read_buffers)),
io_uring_(std::make_unique<IOUring>(
IOUringOptions{.fixed_write_buffers = fixed_write_buffers,
.fixed_read_buffers = fixed_read_buffers})),
prep_io_queue_id_({0}) {}

Status AioQueue::Start() {
Expand Down Expand Up @@ -155,12 +156,12 @@ void AioQueue::BackgroundWait() {

void AioQueue::OnError(Aio* aio, Status status) {
LOG(ERROR) << "Fail to run " << *aio;
aio->status() = status;
aio->Result().status = std::move(status);
RunClosure(aio);
}

void AioQueue::OnComplete(Aio* aio) {
if (!aio->status().ok()) {
if (!aio->Result().status.ok()) {
LOG(ERROR) << "Fail to run " << *aio;
}
RunClosure(aio);
Expand Down
187 changes: 119 additions & 68 deletions src/cache/blockcache/io_uring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,30 @@

#include "cache/blockcache/io_uring.h"

#include <absl/strings/str_format.h>
#include <butil/memory/aligned_memory.h>
#include <butil/memory/scope_guard.h>
#include <glog/logging.h>
#include <liburing.h>
#include <unistd.h>

#include <atomic>
#include <cstdint>
#include <cstring>

#include "cache/blockcache/aio.h"
#include "cache/common/macro.h"
#include "common/io_buffer.h"
#include "common/options/cache.h"
#include "common/status.h"

namespace dingofs {
namespace cache {

IOUring::IOUring(const std::vector<iovec>& fixed_write_buffers,
const std::vector<iovec>& fixed_read_buffers)
: running_(false), io_uring_(), epoll_fd_(-1) {
// NOTE: only call register_buffers once, so we need to merge the buffers
fixed_buffers_.reserve(fixed_write_buffers.size() +
fixed_read_buffers.size());
fixed_buffers_.insert(fixed_buffers_.end(), fixed_write_buffers.begin(),
fixed_write_buffers.end());
fixed_buffers_.insert(fixed_buffers_.end(), fixed_read_buffers.begin(),
fixed_read_buffers.end());
write_buf_index_offset_ = 0;
read_buf_index_offset_ = fixed_write_buffers.size();
}
IOUring::IOUring(IOUringOptions options)
: options_(std::move(options)),
running_(false),
io_uring_(),
fixed_buffers_(std::make_unique<FixedBuffers>(
std::move(options_.fixed_write_buffers),
std::move(options_.fixed_read_buffers))),
epoll_fd_(-1) {}

bool IOUring::Supported() {
struct io_uring ring;
int rc = io_uring_queue_init(16, &ring, 0);
if (rc < 0) {
LOG(ERROR) << "Fail to init io_uring_queue: " << strerror(-rc);
return false;
}

io_uring_queue_exit(&ring);
return true;
}
IOUring::~IOUring() = default;

Status IOUring::Start() {
if (running_.load(std::memory_order_relaxed)) {
Expand All @@ -79,73 +60,137 @@ Status IOUring::Start() {
return Status::NotSupport("not support io_uring");
}

int flags = IORING_SETUP_SQPOLL;
int rc = io_uring_queue_init(FLAGS_iodepth * 2, &io_uring_, flags);
Status status = InitIOUring();
if (!status.ok()) {
return status;
}

BRPC_SCOPE_EXIT {
if (!status.ok()) {
Cleanup();
}
};

status = RegisterBuffers();
if (!status.ok()) {
return status;
}

status = SetupEpoll();
if (!status.ok()) {
return status;
}

running_.store(true, std::memory_order_relaxed);
LOG(INFO) << "Successfully started " << *this;
return Status::OK();
}

Status IOUring::Shutdown() {
if (!running_.load(std::memory_order_relaxed)) {
LOG(WARNING) << "IOUring already shutdown";
return Status::OK();
}

LOG(INFO) << "IOUring is shutting down...";

Cleanup();

running_.store(false, std::memory_order_relaxed);
LOG(INFO) << "Successfully shutdown IOUring";
return Status::OK();
}

bool IOUring::Supported() {
struct io_uring ring;
int rc = io_uring_queue_init(16, &ring, 0);
if (rc < 0) {
LOG(ERROR) << "Fail to init io_uring_queue: " << strerror(-rc);
return false;
}

io_uring_queue_exit(&ring);
return true;
}

Status IOUring::InitIOUring() {
int flags = options_.use_sqpoll ? IORING_SETUP_SQPOLL : 0;

int rc = io_uring_queue_init(options_.entries, &io_uring_, flags);
if (rc != 0) {
LOG(ERROR) << "Fail to init io_uring queue: " << strerror(-rc);
return Status::Internal("init io_uring failed");
}
return Status::OK();
}

Status IOUring::RegisterBuffers() {
auto& buffers = fixed_buffers_->buffers;
if (buffers.empty()) {
return Status::OK();
}

rc = io_uring_register_buffers(&io_uring_, fixed_buffers_.data(),
fixed_buffers_.size());
int rc =
io_uring_register_buffers(&io_uring_, buffers.data(), buffers.size());
if (rc < 0) {
LOG(ERROR) << "Fail to register buffers: " << strerror(-rc);
return Status::Internal("register buffers failed");
}
return Status::OK();
}

Status IOUring::SetupEpoll() {
epoll_fd_ = epoll_create1(0);
if (epoll_fd_ <= 0) {
if (epoll_fd_ < 0) {
PLOG(ERROR) << "Fail to create epoll";
return Status::Internal("create epoll failed");
}

struct epoll_event ev;
ev.events = EPOLLIN;
rc = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, io_uring_.ring_fd, &ev);
ev.data.fd = io_uring_.ring_fd;
int rc = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, io_uring_.ring_fd, &ev);
if (rc != 0) {
PLOG(ERROR) << "Fail to add epoll event";
return Status::Internal("add epoll event failed");
}

running_.store(true, std::memory_order_relaxed);
LOG(INFO) << "IOUring{iodepth=" << FLAGS_iodepth << "} is up";
return Status::OK();
}

Status IOUring::Shutdown() {
if (!running_.load(std::memory_order_relaxed)) {
LOG(WARNING) << "IOUring already shutdown";
return Status::OK();
void IOUring::Cleanup() {
if (!fixed_buffers_->buffers.empty()) {
io_uring_unregister_buffers(&io_uring_);
}

LOG(INFO) << "IOUring is shutting down...";

io_uring_queue_exit(&io_uring_);
io_uring_unregister_buffers(&io_uring_);
epoll_fd_ = -1;

running_.store(false, std::memory_order_relaxed);
LOG(INFO) << "IOUring is down";
return Status::OK();
if (epoll_fd_ >= 0) {
close(epoll_fd_);
epoll_fd_ = -1;
}
}

void IOUring::PrepWrite(io_uring_sqe* sqe, Aio* aio) const {
if (aio->buf_index >= 0) {
io_uring_prep_write_fixed(sqe, aio->fd, aio->buffer, aio->length,
aio->offset,
aio->buf_index + write_buf_index_offset_);
const auto& attr = aio->Attr();
if (attr.buf_index >= 0) {
int idx = fixed_buffers_->GetIndex(false, attr.buf_index);
CHECK_GE(idx, 0) << "Invalid write buffer index " << attr.buf_index;
io_uring_prep_write_fixed(sqe, attr.fd, attr.buffer, attr.length,
attr.offset, idx);
} else {
io_uring_prep_write(sqe, aio->fd, aio->buffer, aio->length, aio->offset);
io_uring_prep_write(sqe, attr.fd, attr.buffer, attr.length, attr.offset);
}
}

void IOUring::PrepRead(io_uring_sqe* sqe, Aio* aio) const {
if (aio->buf_index >= 0) {
io_uring_prep_read_fixed(sqe, aio->fd, aio->buffer, aio->length,
aio->offset,
aio->buf_index + read_buf_index_offset_);
const auto& attr = aio->Attr();
if (attr.buf_index >= 0) {
int idx = fixed_buffers_->GetIndex(true, attr.buf_index);
CHECK_GE(idx, 0) << "Invalid read buffer index " << attr.buf_index;
io_uring_prep_read_fixed(sqe, attr.fd, attr.buffer, attr.length,
attr.offset, idx);
} else {
io_uring_prep_read(sqe, aio->fd, aio->buffer, aio->length, aio->offset);
io_uring_prep_read(sqe, attr.fd, attr.buffer, attr.length, attr.offset);
}
}

Expand All @@ -155,7 +200,7 @@ Status IOUring::PrepareIO(Aio* aio) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&io_uring_);
CHECK_NOTNULL(sqe);

if (aio->for_read) {
if (aio->Attr().for_read) {
PrepRead(sqe, aio);
} else {
PrepWrite(sqe, aio);
Expand Down Expand Up @@ -202,20 +247,26 @@ int IOUring::WaitIO(uint64_t timeout_ms, Aio* completed_aios[]) {
}

void IOUring::OnComplete(Aio* aio, int result) {
const auto& attr = aio->Attr();
Status status;
if (result < 0) {
status = Status::IoError(strerror(-result));
LOG(ERROR) << "Fail to execute " << aio << ": " << strerror(-result);
} else if (result != aio->length) {
status = Status::IoError("unexpected io length");
LOG(ERROR) << "Fail to execute " << aio << ", want "
<< (aio->for_read ? "read" : "write") << aio->length
<< " bytes but got " << result << " bytes";
LOG(ERROR) << *aio << " failed: " << strerror(-result);
} else if (result != static_cast<int>(attr.length)) {
status = Status::IoError("short io");
LOG(ERROR) << *aio << " incomplete: expected " << attr.length
<< " bytes, got " << result;
} else {
status = Status::OK();
VLOG(3) << *aio << " completed successfully";
}
aio->Result().status = std::move(status);
}

aio->status() = status;
std::ostream& operator<<(std::ostream& os, const IOUring& r) {
return os << "IOUring{entries=" << r.options_.entries
<< " sqpoll=" << r.options_.use_sqpoll
<< " fixed_buffers=" << r.fixed_buffers_->buffers.size() << "}";
}

} // namespace cache
Expand Down
Loading
Loading