Skip to content

Commit

Permalink
apacheGH-37434: [C++] IO: Refactor BufferedInputStream::Read for smal…
Browse files Browse the repository at this point in the history
…l input (apache#37460)

### Rationale for this change

If we Set BufferSize == 100k, and read 3k bytes per IO. When we read the 34 times, the IO would be (99k, 102k]

In Read, it will read buffered (99k, 100k], issue IO for (100k, 102k]. Rather than (100k, 200k]. 

### What changes are included in this PR?

Refactor `BufferedInputStream::Read` to optimize small IO.

### Are these changes tested?

Already has tests?

### Are there any user-facing changes?

User might get io-pattern changed. It can be optimization or downgrade.

* Closes: apache#37434

Lead-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
mapleFU and pitrou authored Aug 31, 2023
1 parent 3c2e74c commit 84583d6
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 46 deletions.
88 changes: 49 additions & 39 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,23 +342,28 @@ class BufferedInputStream::Impl : public BufferedBase {
buffer_pos_ = bytes_buffered_ = 0;
}

Status BufferIfNeeded() {
if (bytes_buffered_ == 0) {
// Fill buffer
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Status DoBuffer() {
// Fill buffer
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}

int64_t bytes_to_buffer = buffer_size_;
if (raw_read_bound_ >= 0) {
bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - raw_read_total_);
}
ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, buffer_data_));
buffer_pos_ = 0;
raw_read_total_ += bytes_buffered_;
int64_t bytes_to_buffer = buffer_size_;
if (raw_read_bound_ >= 0) {
bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - raw_read_total_);
}
ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, buffer_data_));
buffer_pos_ = 0;
raw_read_total_ += bytes_buffered_;

// Do not make assumptions about the raw stream position
raw_pos_ = -1;
// Do not make assumptions about the raw stream position
raw_pos_ = -1;
return Status::OK();
}

Status BufferIfNeeded() {
if (bytes_buffered_ == 0) {
return DoBuffer();
}
return Status::OK();
}
Expand All @@ -373,33 +378,38 @@ class BufferedInputStream::Impl : public BufferedBase {
return Status::Invalid("Bytes to read must be positive. Received:", nbytes);
}

if (nbytes < buffer_size_) {
// Pre-buffer for small reads
RETURN_NOT_OK(BufferIfNeeded());
// 1. First consume pre-buffered data.
int64_t pre_buffer_copy_bytes = std::min(nbytes, bytes_buffered_);
if (pre_buffer_copy_bytes > 0) {
memcpy(out, buffer_data_ + buffer_pos_, pre_buffer_copy_bytes);
ConsumeBuffer(pre_buffer_copy_bytes);
}

if (nbytes > bytes_buffered_) {
// Copy buffered bytes into out, then read rest
memcpy(out, buffer_data_ + buffer_pos_, bytes_buffered_);

int64_t bytes_to_read = nbytes - bytes_buffered_;
if (raw_read_bound_ >= 0) {
bytes_to_read = std::min(bytes_to_read, raw_read_bound_ - raw_read_total_);
}
ARROW_ASSIGN_OR_RAISE(
int64_t bytes_read,
raw_->Read(bytes_to_read, reinterpret_cast<uint8_t*>(out) + bytes_buffered_));
int64_t remaining_bytes = nbytes - pre_buffer_copy_bytes;
if (raw_read_bound_ >= 0) {
remaining_bytes = std::min(remaining_bytes, raw_read_bound_ - raw_read_total_);
}
if (remaining_bytes == 0) {
return pre_buffer_copy_bytes;
}
DCHECK_EQ(0, bytes_buffered_);

// 2. Read from storage.
if (remaining_bytes >= buffer_size_) {
// 2.1. If read is larger than buffer size, read directly from storage.
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
raw_->Read(remaining_bytes, reinterpret_cast<uint8_t*>(out) +
pre_buffer_copy_bytes));
raw_read_total_ += bytes_read;

// Do not make assumptions about the raw stream position
raw_pos_ = -1;
bytes_read += bytes_buffered_;
RewindBuffer();
return bytes_read;
return pre_buffer_copy_bytes + bytes_read;
} else {
memcpy(out, buffer_data_ + buffer_pos_, nbytes);
ConsumeBuffer(nbytes);
return nbytes;
// 2.2. If read is smaller than buffer size, fill buffer and copy from buffer.
RETURN_NOT_OK(DoBuffer());
int64_t bytes_copy_after_buffer = std::min(bytes_buffered_, remaining_bytes);
memcpy(reinterpret_cast<uint8_t*>(out) + pre_buffer_copy_bytes,
buffer_data_ + buffer_pos_, bytes_copy_after_buffer);
ConsumeBuffer(bytes_copy_after_buffer);
return pre_buffer_copy_bytes + bytes_copy_after_buffer;
}
}

Expand Down Expand Up @@ -432,7 +442,7 @@ class BufferedInputStream::Impl : public BufferedBase {
BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
MemoryPool* pool,
int64_t raw_total_bytes_bound) {
impl_.reset(new Impl(std::move(raw), pool, raw_total_bytes_bound));
impl_ = std::make_unique<Impl>(std::move(raw), pool, raw_total_bytes_bound);
}

BufferedInputStream::~BufferedInputStream() { internal::CloseFromDestructor(this); }
Expand Down
Loading

0 comments on commit 84583d6

Please sign in to comment.