diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index cb0c7a2852af4..76df1cb20d34d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1018,8 +1018,9 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader, } Result ReadNext() override { - auto collect_listener = static_cast(raw_listener()); - while (collect_listener->num_record_batches() == 0) { + auto collect_listener = checked_cast(raw_listener()); + while (collect_listener->num_record_batches() == 0 && + state() != StreamDecoderInternal::State::EOS) { ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage()); if (!message) { // End of stream if (state() == StreamDecoderInternal::State::INITIAL_DICTIONARIES) { diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 4b4e0b9057d67..edc25608542f1 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -313,15 +313,17 @@ class ARROW_EXPORT CollectListener : public Listener { std::shared_ptr schema() const { return schema_; } /// \return the all decoded record batches - std::vector> record_batches() const { + const std::vector>& record_batches() const { return record_batches_; } /// \return the all decoded metadatas - std::vector> metadatas() const { return metadatas_; } + const std::vector>& metadatas() const { + return metadatas_; + } /// \return the number of collected record batches - size_t num_record_batches() const { return record_batches_.size(); } + int64_t num_record_batches() const { return record_batches_.size(); } /// \return the last decoded record batch and remove it from /// record_batches