Skip to content

Commit

Permalink
apacheGH-34063: [C++] Avoid waste in GcsFileSystem::ReadAt() (apache#34065)
Browse files Browse the repository at this point in the history

With this change, `GcsFileSystem::ReadAt()` will only request *exactly* the bytes it needs.

Fixes apache#34063.

### Are these changes tested?

There are existing tests for `ReadAt()`; this is just a performance change.

### Are there any user-facing changes?

No.

* Closes: apache#34063

Authored-by: Carlos O'Ryan <coryan@google.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
coryan authored and sjperkins committed Feb 10, 2023
1 parent 1896406 commit a080977
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class GcsOutputStream : public arrow::io::OutputStream {
};

using InputStreamFactory = std::function<Result<std::shared_ptr<GcsInputStream>>(
gcs::Generation, gcs::ReadFromOffset)>;
gcs::Generation, gcs::ReadRange, gcs::ReadFromOffset)>;

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
Expand Down Expand Up @@ -298,31 +298,34 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
if (closed()) return Status::Invalid("Cannot read from closed file");
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
gcs::ReadRange(position, position + nbytes),
gcs::ReadFromOffset()));
return stream->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot read from closed file");
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream, factory_(gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
gcs::ReadRange(position, position + nbytes),
gcs::ReadFromOffset()));
return stream->Read(nbytes);
}
//@}

// from Seekable
Status Seek(int64_t position) override {
if (closed()) return Status::Invalid("Cannot seek in a closed file");
ARROW_ASSIGN_OR_RAISE(stream_, factory_(gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset(position)));
ARROW_ASSIGN_OR_RAISE(
stream_, factory_(gcs::Generation(metadata_.generation()), gcs::ReadRange(),
gcs::ReadFromOffset(position)));
return Status::OK();
}

private:
Status InitializeStream() const {
if (!stream_) {
ARROW_ASSIGN_OR_RAISE(stream_, factory_(gcs::Generation(metadata_.generation()),
gcs::ReadFromOffset()));
gcs::ReadRange(), gcs::ReadFromOffset()));
}
return Status::OK();
}
Expand Down Expand Up @@ -632,8 +635,9 @@ class GcsFileSystem::Impl {

Result<std::shared_ptr<GcsInputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation generation,
gcs::ReadRange range,
gcs::ReadFromOffset offset) {
auto stream = client_.ReadObject(path.bucket, path.object, generation, offset);
auto stream = client_.ReadObject(path.bucket, path.object, generation, range, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream), path, generation, client_);
}
Expand Down Expand Up @@ -921,7 +925,8 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const std::string& path) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
Expand All @@ -932,7 +937,8 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
}
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
Expand All @@ -941,9 +947,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
auto impl = impl_;
auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, offset);
auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, range, offset);
};

return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
Expand All @@ -960,8 +966,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, offset);
auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, range, offset);
};
return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata));
Expand Down

0 comments on commit a080977

Please sign in to comment.