Skip to content

Commit

Permalink
RetryInfo -> PollInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Aug 9, 2023
1 parent f74e634 commit 419d8cf
Show file tree
Hide file tree
Showing 30 changed files with 313 additions and 304 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,9 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightClient::GetFlightInfo(
return info;
}

arrow::Result<std::unique_ptr<RetryInfo>> FlightClient::PollFlightInfo(
arrow::Result<std::unique_ptr<PollInfo>> FlightClient::PollFlightInfo(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
std::unique_ptr<RetryInfo> info;
std::unique_ptr<PollInfo> info;
RETURN_NOT_OK(CheckOpen());
RETURN_NOT_OK(transport_->PollFlightInfo(options, descriptor, &info));
return info;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ class ARROW_FLIGHT_EXPORT FlightClient {
/// \param[in] options Per-RPC options
/// \param[in] descriptor the dataset request or a descriptor returned by a
/// prioir PollFlightInfo call
/// \return Arrow result with the RetryInfo describing the status of
/// \return Arrow result with the PollInfo describing the status of
/// the requested query
arrow::Result<std::unique_ptr<RetryInfo>> PollFlightInfo(
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo(
const FlightCallOptions& options, const FlightDescriptor& descriptor);
arrow::Result<std::unique_ptr<RetryInfo>> PollFlightInfo(
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo(
const FlightDescriptor& descriptor) {
return PollFlightInfo({}, descriptor);
}
Expand Down
22 changes: 11 additions & 11 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void TestRoundtrip(const std::vector<FlightType>& values,
ASSERT_OK_AND_ASSIGN(std::string serialized, values[i].SerializeToString());
ASSERT_OK_AND_ASSIGN(auto deserialized, FlightType::Deserialize(serialized));
if constexpr (std::is_same_v<FlightType, FlightInfo> ||
std::is_same_v<FlightType, RetryInfo>) {
std::is_same_v<FlightType, PollInfo>) {
ARROW_SCOPED_TRACE("Deserialized = ", deserialized->ToString());
EXPECT_EQ(values[i], *deserialized);
} else {
Expand Down Expand Up @@ -260,7 +260,7 @@ TEST(FlightTypes, FlightInfo) {
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
}

TEST(FlightTypes, RetryInfo) {
TEST(FlightTypes, PollInfo) {
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 1234));
Schema schema({field("ints", int64())});
auto desc = FlightDescriptor::Command("foo");
Expand All @@ -274,22 +274,22 @@ TEST(FlightTypes, RetryInfo) {
std::chrono::seconds{1687144446} + std::chrono::nanoseconds{4339000};
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<RetryInfo> values = {
RetryInfo{std::make_unique<FlightInfo>(info), std::nullopt, std::nullopt,
std::nullopt},
RetryInfo{std::make_unique<FlightInfo>(info), FlightDescriptor::Command("retry"),
0.1, expiration_time},
std::vector<PollInfo> values = {
PollInfo{std::make_unique<FlightInfo>(info), std::nullopt, std::nullopt,
std::nullopt},
PollInfo{std::make_unique<FlightInfo>(info), FlightDescriptor::Command("poll"), 0.1,
expiration_time},
};
std::vector<std::string> reprs = {
"<RetryInfo info=" + info.ToString() +
"<PollInfo info=" + info.ToString() +
" descriptor=null "
"progress=null expiration_time=null>",
"<RetryInfo info=" + info.ToString() +
" descriptor=<FlightDescriptor cmd='retry'> "
"<PollInfo info=" + info.ToString() +
" descriptor=<FlightDescriptor cmd='poll'> "
"progress=0.1 expiration_time=2023-06-19 03:14:06.004339000>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::RetryInfo>(values, reprs));
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::PollInfo>(values, reprs));
}

TEST(FlightTypes, Result) {
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -751,20 +751,20 @@ class PollFlightInfoServer : public FlightServerBase {

Status PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* result) override {
std::unique_ptr<PollInfo>* result) override {
auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
std::vector<FlightEndpoint> endpoints = {
FlightEndpoint{{"long-running query"}, {}, std::nullopt}};
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
if (descriptor == FlightDescriptor::Command("retry")) {
*result = std::make_unique<RetryInfo>(std::make_unique<FlightInfo>(std::move(info)),
std::nullopt, 1.0, std::nullopt);
if (descriptor == FlightDescriptor::Command("poll")) {
*result = std::make_unique<PollInfo>(std::make_unique<FlightInfo>(std::move(info)),
std::nullopt, 1.0, std::nullopt);
} else {
*result =
std::make_unique<RetryInfo>(std::make_unique<FlightInfo>(std::move(info)),
FlightDescriptor::Command("retry"), 0.1,
Timestamp::clock::now() + std::chrono::seconds{10});
std::make_unique<PollInfo>(std::make_unique<FlightInfo>(std::move(info)),
FlightDescriptor::Command("poll"), 0.1,
Timestamp::clock::now() + std::chrono::seconds{10});
}
return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
return Status::OK();
}

// RetryInfo
// PollInfo

Status FromProto(const pb::RetryInfo& pb_info, RetryInfo* info) {
Status FromProto(const pb::PollInfo& pb_info, PollInfo* info) {
FlightInfo::Data data;
RETURN_NOT_OK(FromProto(pb_info.info(), &data));
info->info = std::make_unique<FlightInfo>(std::move(data));
Expand All @@ -326,7 +326,7 @@ Status FromProto(const pb::RetryInfo& pb_info, RetryInfo* info) {
return Status::OK();
}

Status ToProto(const RetryInfo& info, pb::RetryInfo* pb_info) {
Status ToProto(const PollInfo& info, pb::PollInfo* pb_info) {
RETURN_NOT_OK(ToProto(*info.info, pb_info->mutable_info()));
if (info.descriptor) {
RETURN_NOT_OK(ToProto(*info.descriptor, pb_info->mutable_flight_descriptor()));
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/serialization_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
Status FromProto(const pb::RenewFlightEndpointRequest& pb_request,
RenewFlightEndpointRequest* request);
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
Status FromProto(const pb::RetryInfo& pb_info, RetryInfo* info);
Status FromProto(const pb::PollInfo& pb_info, PollInfo* info);
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request);
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
Expand All @@ -73,7 +73,7 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint);
Status ToProto(const RenewFlightEndpointRequest& request,
pb::RenewFlightEndpointRequest* pb_request);
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const RetryInfo& info, pb::RetryInfo* pb_info);
Status ToProto(const PollInfo& info, pb::PollInfo* pb_info);
Status ToProto(const CancelFlightInfoRequest& request,
pb::CancelFlightInfoRequest* pb_request);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ Status FlightServerBase::GetFlightInfo(const ServerCallContext& context,

Status FlightServerBase::PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<RetryInfo>* info) {
std::unique_ptr<PollInfo>* info) {
return Status::NotImplemented("NYI");
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class ARROW_FLIGHT_EXPORT FlightServerBase {
/// \return Status
virtual Status PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<RetryInfo>* info);
std::unique_ptr<PollInfo>* info);

/// \brief Retrieve the schema for the indicated descriptor
/// \param[in] context The call context.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Status ClientTransport::GetFlightInfo(const FlightCallOptions& options,
}
Status ClientTransport::PollFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* info) {
std::unique_ptr<PollInfo>* info) {
return Status::NotImplemented("PollFlightInfo for this transport");
}
arrow::Result<std::unique_ptr<SchemaResult>> ClientTransport::GetSchema(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class ARROW_FLIGHT_EXPORT ClientTransport {
std::unique_ptr<FlightInfo>* info);
virtual Status PollFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* info);
std::unique_ptr<PollInfo>* info);
virtual arrow::Result<std::unique_ptr<SchemaResult>> GetSchema(
const FlightCallOptions& options, const FlightDescriptor& descriptor);
virtual Status ListFlights(const FlightCallOptions& options, const Criteria& criteria,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -806,9 +806,9 @@ class GrpcClientImpl : public internal::ClientTransport {

Status PollFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* info) override {
std::unique_ptr<PollInfo>* info) override {
pb::FlightDescriptor pb_descriptor;
pb::RetryInfo pb_response;
pb::PollInfo pb_response;

RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor));

Expand All @@ -818,7 +818,7 @@ class GrpcClientImpl : public internal::ClientTransport {
stub_->PollFlightInfo(&rpc.context, pb_descriptor, &pb_response), &rpc.context);
RETURN_NOT_OK(s);

info->reset(new RetryInfo());
info->reset(new PollInfo());
RETURN_NOT_OK(internal::FromProto(pb_response, info->get()));
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/transport/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status PollFlightInfo(ServerContext* context,
const pb::FlightDescriptor* request,
pb::RetryInfo* response) {
pb::PollInfo* response) {
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
CheckAuth(FlightMethod::PollFlightInfo, context, flight_context));
Expand All @@ -424,7 +424,7 @@ class GrpcServiceHandler final : public FlightService::Service {
FlightDescriptor descr;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &descr));

std::unique_ptr<RetryInfo> info;
std::unique_ptr<PollInfo> info;
SERVICE_RETURN_NOT_OK(flight_context,
impl_->base()->PollFlightInfo(flight_context, descr, &info));

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/transport/ucx/ucx_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {

Status PollFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* info) override {
std::unique_ptr<PollInfo>* info) override {
ARROW_ASSIGN_OR_RAISE(auto connection, CheckoutConnection(options));
UcpCallDriver* driver = connection.driver();

Expand All @@ -618,7 +618,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport {
ARROW_ASSIGN_OR_RAISE(auto incoming_message, driver->ReadNextFrame());
if (incoming_message->type == FrameType::kBuffer) {
ARROW_ASSIGN_OR_RAISE(
*info, RetryInfo::Deserialize(std::string_view(*incoming_message->buffer)));
*info, PollInfo::Deserialize(std::string_view(*incoming_message->buffer)));
ARROW_ASSIGN_OR_RAISE(incoming_message, driver->ReadNextFrame());
}
RETURN_NOT_OK(driver->ExpectFrameType(*incoming_message, FrameType::kHeaders));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/transport/ucx/ucx_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport {
FlightDescriptor::Deserialize(std::string_view(*frame->buffer))
.Value(&descriptor));

std::unique_ptr<RetryInfo> info;
std::unique_ptr<PollInfo> info;
std::string response;
SERVER_RETURN_NOT_OK(driver, base_->PollFlightInfo(context, descriptor, &info));
SERVER_RETURN_NOT_OK(driver, info->SerializeToString().Value(&response));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct FlightClientOptions;
struct FlightDescriptor;
struct FlightEndpoint;
class FlightInfo;
class RetryInfo;
class PollInfo;
class FlightListing;
class FlightMetadataReader;
class FlightMetadataWriter;
Expand Down
24 changes: 12 additions & 12 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,36 +336,36 @@ bool FlightInfo::Equals(const FlightInfo& other) const {
data_.ordered == other.data_.ordered;
}

arrow::Result<std::string> RetryInfo::SerializeToString() const {
pb::RetryInfo pb_info;
arrow::Result<std::string> PollInfo::SerializeToString() const {
pb::PollInfo pb_info;
RETURN_NOT_OK(internal::ToProto(*this, &pb_info));

std::string out;
if (!pb_info.SerializeToString(&out)) {
return Status::IOError("Serialized RetryInfo exceeded 2 GiB limit");
return Status::IOError("Serialized PollInfo exceeded 2 GiB limit");
}
return out;
}

arrow::Result<std::unique_ptr<RetryInfo>> RetryInfo::Deserialize(
arrow::Result<std::unique_ptr<PollInfo>> PollInfo::Deserialize(
std::string_view serialized) {
pb::RetryInfo pb_info;
pb::PollInfo pb_info;
if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
return Status::Invalid("Serialized RetryInfo size should not exceed 2 GiB");
return Status::Invalid("Serialized PollInfo size should not exceed 2 GiB");
}
google::protobuf::io::ArrayInputStream input(serialized.data(),
static_cast<int>(serialized.size()));
if (!pb_info.ParseFromZeroCopyStream(&input)) {
return Status::Invalid("Not a valid RetryInfo");
return Status::Invalid("Not a valid PollInfo");
}
RetryInfo info;
PollInfo info;
RETURN_NOT_OK(internal::FromProto(pb_info, &info));
return std::make_unique<RetryInfo>(std::move(info));
return std::make_unique<PollInfo>(std::move(info));
}

std::string RetryInfo::ToString() const {
std::string PollInfo::ToString() const {
std::stringstream ss;
ss << "<RetryInfo info=" << info->ToString();
ss << "<PollInfo info=" << info->ToString();
ss << " descriptor=";
if (descriptor) {
ss << descriptor->ToString();
Expand Down Expand Up @@ -394,7 +394,7 @@ std::string RetryInfo::ToString() const {
return ss.str();
}

bool RetryInfo::Equals(const RetryInfo& other) const {
bool PollInfo::Equals(const PollInfo& other) const {
if ((info.get() != nullptr) != (other.info.get() != nullptr)) {
return false;
}
Expand Down
26 changes: 13 additions & 13 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
};

/// \brief The information to process a long-running query.
class ARROW_FLIGHT_EXPORT RetryInfo {
class ARROW_FLIGHT_EXPORT PollInfo {
public:
/// The currently available results so far.
std::unique_ptr<FlightInfo> info = NULLPTR;
Expand All @@ -662,26 +662,26 @@ class ARROW_FLIGHT_EXPORT RetryInfo {
/// monotonic or nondecreasing. If unknown, do not set.
std::optional<double> progress = std::nullopt;
/// Expiration time for this request. After this passes, the server
/// might not accept the retry descriptor anymore (and the query may
/// might not accept the poll descriptor anymore (and the query may
/// be cancelled). This may be updated on a call to PollFlightInfo.
std::optional<Timestamp> expiration_time = std::nullopt;

RetryInfo()
PollInfo()
: info(NULLPTR),
descriptor(std::nullopt),
progress(std::nullopt),
expiration_time(std::nullopt) {}

explicit RetryInfo(std::unique_ptr<FlightInfo> info,
std::optional<FlightDescriptor> descriptor,
std::optional<double> progress,
std::optional<Timestamp> expiration_time)
explicit PollInfo(std::unique_ptr<FlightInfo> info,
std::optional<FlightDescriptor> descriptor,
std::optional<double> progress,
std::optional<Timestamp> expiration_time)
: info(std::move(info)),
descriptor(std::move(descriptor)),
progress(progress),
expiration_time(expiration_time) {}

explicit RetryInfo(const RetryInfo& other)
explicit PollInfo(const PollInfo& other)
: info(other.info ? std::make_unique<FlightInfo>(*other.info) : NULLPTR),
descriptor(other.descriptor),
progress(other.progress),
Expand All @@ -697,20 +697,20 @@ class ARROW_FLIGHT_EXPORT RetryInfo {
///
/// Useful when interoperating with non-Flight systems (e.g. REST
/// services) that may want to return Flight types.
static arrow::Result<std::unique_ptr<RetryInfo>> Deserialize(
static arrow::Result<std::unique_ptr<PollInfo>> Deserialize(
std::string_view serialized);

std::string ToString() const;

/// Compare two RetryInfo for equality. This will compare the
/// Compare two PollInfo for equality. This will compare the
/// serialized schema representations, NOT the logical equality of
/// the schemas.
bool Equals(const RetryInfo& other) const;
bool Equals(const PollInfo& other) const;

friend bool operator==(const RetryInfo& left, const RetryInfo& right) {
friend bool operator==(const PollInfo& left, const PollInfo& right) {
return left.Equals(right);
}
friend bool operator!=(const RetryInfo& left, const RetryInfo& right) {
friend bool operator!=(const PollInfo& left, const PollInfo& right) {
return !(left == right);
}
};
Expand Down
Loading

0 comments on commit 419d8cf

Please sign in to comment.