From f45c76f14b11bbb98a40b7a2b216a640d809a7af Mon Sep 17 00:00:00 2001 From: GiantKing Date: Mon, 14 Mar 2022 20:32:31 +0800 Subject: [PATCH] feat(direct_io): download files from hdfs with direct I/O (#1069) --- src/block_service/directio_writable_file.cpp | 132 +++++++++++++++++++ src/block_service/directio_writable_file.h | 53 ++++++++ src/block_service/hdfs/CMakeLists.txt | 6 +- src/block_service/hdfs/hdfs_service.cpp | 39 ++++-- 4 files changed, 221 insertions(+), 9 deletions(-) create mode 100644 src/block_service/directio_writable_file.cpp create mode 100644 src/block_service/directio_writable_file.h diff --git a/src/block_service/directio_writable_file.cpp b/src/block_service/directio_writable_file.cpp new file mode 100644 index 0000000000..c0cf6e6b68 --- /dev/null +++ b/src/block_service/directio_writable_file.cpp @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include // posix_memalign +#include +#include +#include +#include // getpagesize + +#include +#include + +#include "block_service/directio_writable_file.h" + +namespace dsn { +namespace dist { +namespace block_service { + +DSN_DEFINE_uint32("replication", + direct_io_buffer_pages, + 64, + "Number of pages we need to set to direct io buffer"); +DSN_TAG_VARIABLE(direct_io_buffer_pages, FT_MUTABLE); + +DSN_DEFINE_bool("replication", + enable_direct_io, + false, + "Whether to enable direct I/O when download files"); +DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE); + +const uint32_t g_page_size = getpagesize(); + +direct_io_writable_file::direct_io_writable_file(const std::string &file_path) + : _file_path(file_path), + _fd(-1), + _file_size(0), + _buffer(nullptr), + _buffer_size(FLAGS_direct_io_buffer_pages * g_page_size), + _offset(0) +{ +} + +direct_io_writable_file::~direct_io_writable_file() +{ + if (!_buffer || _fd < 0) { + return; + } + // Here is an ensurance, users shuold call finalize manually + dassert(_offset == 0, "finalize() should be called before destructor"); + + free(_buffer); + close(_fd); +} + +bool direct_io_writable_file::initialize() +{ + if (posix_memalign(&_buffer, g_page_size, _buffer_size) != 0) { + derror_f("Allocate memaligned buffer failed, errno = {}", errno); + return false; + } + + int flag = O_WRONLY | O_TRUNC | O_CREAT | O_DIRECT; + _fd = open(_file_path.c_str(), flag, S_IRUSR | S_IWUSR | S_IRGRP); + if (_fd < 0) { + derror_f("Failed to open {} with flag {}, errno = {}", _file_path, flag, errno); + free(_buffer); + _buffer = nullptr; + return false; + } + return true; +} + +bool direct_io_writable_file::finalize() +{ + dassert(_buffer && _fd >= 0, "Initialize the instance first"); + + if (_offset > 0) { + if (::write(_fd, _buffer, _buffer_size) != _buffer_size) { + derror_f("Failed to write last chunk, filie_path = {}, errno = {}", _file_path, errno); + return false; + } + _offset = 0; + ftruncate(_fd, _file_size); + } + return true; +} + +bool direct_io_writable_file::write(const char *s, size_t n) +{ + dassert(_buffer && _fd >= 0, "Initialize the instance first"); + + uint32_t remaining = n; + while (remaining > 0) { + uint32_t bytes = std::min((_buffer_size - _offset), remaining); + memcpy((char *)_buffer + _offset, s, bytes); + _offset += bytes; + remaining -= bytes; + s += bytes; + // buffer is full, flush to file + if (_offset == _buffer_size) { + if (::write(_fd, _buffer, _buffer_size) != _buffer_size) { + derror_f("Failed to write to direct_io_writable_file, errno = {}", errno); + return false; + } + // reset offset + _offset = 0; + } + } + _file_size += n; + return true; +} + +} // namespace block_service +} // namespace dist +} // namespace dsn diff --git a/src/block_service/directio_writable_file.h b/src/block_service/directio_writable_file.h new file mode 100644 index 0000000000..b690f13f67 --- /dev/null +++ b/src/block_service/directio_writable_file.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace dsn { +namespace dist { +namespace block_service { + +class direct_io_writable_file +{ +public: + explicit direct_io_writable_file(const std::string &file_path); + ~direct_io_writable_file(); + + bool initialize(); + bool write(const char *s, size_t n); + bool finalize(); + +private: + DISALLOW_COPY_AND_ASSIGN(direct_io_writable_file); + + std::string _file_path; + int _fd; + uint32_t _file_size; + + // page size aligned buffer + void *_buffer; + // buffer size + uint32_t _buffer_size; + // buffer offset + uint32_t _offset; +}; + +} // namespace block_service +} // namespace dist +} // namespace dsn diff --git a/src/block_service/hdfs/CMakeLists.txt b/src/block_service/hdfs/CMakeLists.txt index fdbbafa919..324c20486c 100644 --- a/src/block_service/hdfs/CMakeLists.txt +++ b/src/block_service/hdfs/CMakeLists.txt @@ -1,8 +1,12 @@ set(MY_PROJ_NAME dsn.block_service.hdfs) +set(DIRECTIO_SRC + ../directio_writable_file.cpp + ) + #Source files under CURRENT project directory will be automatically included. #You can manually set MY_PROJ_SRC to include source files under other directories. -set(MY_PROJ_SRC "") +set(MY_PROJ_SRC "${DIRECTIO_SRC}") #Search mode for source files under CURRENT project directory ? #"GLOB_RECURSE" for recursive search diff --git a/src/block_service/hdfs/hdfs_service.cpp b/src/block_service/hdfs/hdfs_service.cpp index fd118edd71..e7985e9302 100644 --- a/src/block_service/hdfs/hdfs_service.cpp +++ b/src/block_service/hdfs/hdfs_service.cpp @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include "hdfs_service.h" - #include #include @@ -31,6 +29,9 @@ #include #include +#include "hdfs_service.h" +#include "block_service/directio_writable_file.h" + namespace dsn { namespace dist { namespace block_service { @@ -55,6 +56,8 @@ DSN_DEFINE_uint64("replication", "hdfs write batch size, the default value is 64MB"); DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE); +DSN_DECLARE_bool(enable_direct_io); + hdfs_service::hdfs_service() { _read_token_bucket.reset(new folly::DynamicTokenBucket()); @@ -497,13 +500,33 @@ dsn::task_ptr hdfs_file_object::download(const download_request &req, resp.err = read_data_in_batches(req.remote_pos, req.remote_length, read_buffer, read_length); if (resp.err == ERR_OK) { - std::ofstream out(req.output_local_name, - std::ios::binary | std::ios::out | std::ios::trunc); - if (out.is_open()) { - out.write(read_buffer.c_str(), read_length); - out.close(); - resp.downloaded_size = read_length; + bool write_succ = false; + if (FLAGS_enable_direct_io) { + auto dio_file = std::make_unique(req.output_local_name); + do { + if (!dio_file->initialize()) { + break; + } + bool wr_ret = dio_file->write(read_buffer.c_str(), read_length); + if (!wr_ret) { + break; + } + if (dio_file->finalize()) { + resp.downloaded_size = read_length; + write_succ = true; + } + } while (0); } else { + std::ofstream out(req.output_local_name, + std::ios::binary | std::ios::out | std::ios::trunc); + if (out.is_open()) { + out.write(read_buffer.c_str(), read_length); + out.close(); + resp.downloaded_size = read_length; + write_succ = true; + } + } + if (!write_succ) { derror_f("HDFS download failed: fail to open localfile {} when download {}, " "error: {}", req.output_local_name,