Skip to content

Commit

Permalink
apacheGH-35500: [C++][Go][Java][FlightRPC] Add support for result set…
Browse files Browse the repository at this point in the history
… expiration (apache#36009)

### Rationale for this change

Currently, it is undefined whether a client can call DoGet more than once. Clients may want to retry requests, and servers may not want to persist a query result forever.

### What changes are included in this PR?

Add an expiration time to FlightEndpoint. If present, clients may assume they can retry DoGet requests. Otherwise, clients should avoid retrying DoGet requests.

This is not a full retry protocol.

Also, add "pre-defined" actions to Flight RPC for working with result sets. These are pre-defined Protobuf messages with standardized encodings for use with DoAction:

  * CancelFlightInfo: Asynchronously cancel the execution of a distributed query. (Replaces the equivalent Flight SQL action.)
  * RefreshFlightEndpoint: Request an extension of the expiration of a FlightEndpoint.

This lets the ADBC/JDBC/ODBC drivers for Flight SQL explicitly manage result set lifetimes. These can be used with Flight SQL as regular actions.

#### Backward compatibility

Flight SQL's CancelQuery is deprecated by Flight RPC's CancelFlightInfo. But some clients may not be able to migrate to CancelFlightInfo entirely.

If a client can assume that a server requires 13.0.0 or later, the client can always use CancelFlightInfo. Otherwise, the client need to use CancelQuery (for old server) and/or CancelFlightInfo (for newer server).

A server needs to implement only CancelFlightInfo. Because Flight SQL server libraries provide the default CancelQuery implementation that delegates to CancelFlightInfo. Clients can use both of Flight SQL's CancelQuery and Flight RPC's CancelFLightInfo by this feature.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* Closes: apache#35500

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou committed Jul 3, 2023
1 parent 575b095 commit 0b7bd74
Show file tree
Hide file tree
Showing 54 changed files with 4,391 additions and 1,231 deletions.
24 changes: 24 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,30 @@ Status FlightClient::DoAction(const FlightCallOptions& options, const Action& ac
return DoAction(options, action).Value(results);
}

arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto cancel_result, CancelFlightInfoResult::Deserialize(
std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return std::move(cancel_result);
}

arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kRenewFlightEndpoint.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
FlightEndpoint::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return std::move(renewed_endpoint);
}

arrow::Result<std::vector<ActionType>> FlightClient::ListActions(
const FlightCallOptions& options) {
std::vector<ActionType> actions;
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return DoAction({}, action).Value(results);
}

/// \brief Perform the CancelFlightInfo action, returning a
/// CancelFlightInfoResult
///
/// \param[in] options Per-RPC options
/// \param[in] request The CancelFlightInfoRequest
/// \return Arrow result with a CancelFlightInfoResult
arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request);
arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const CancelFlightInfoRequest& request) {
return CancelFlightInfo({}, request);
}

/// \brief Perform the RenewFlightEndpoint action, returning a renewed
/// FlightEndpoint
///
/// \param[in] options Per-RPC options
/// \param[in] request The RenewFlightEndpointRequest
/// \return Arrow result with a renewed FlightEndpoint
arrow::Result<FlightEndpoint> RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request);
arrow::Result<FlightEndpoint> RenewFlightEndpoint(
const RenewFlightEndpointRequest& request) {
return RenewFlightEndpoint({}, request);
}

/// \brief Retrieve a list of available Action types
/// \param[in] options Per-RPC options
/// \return Arrow result with the available actions
Expand Down
59 changes: 37 additions & 22 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,27 +178,42 @@ TEST(FlightTypes, FlightDescriptor) {
TEST(FlightTypes, FlightEndpoint) {
ASSERT_OK_AND_ASSIGN(auto location1, Location::ForGrpcTcp("localhost", 1024));
ASSERT_OK_AND_ASSIGN(auto location2, Location::ForGrpcTls("localhost", 1024));
// 2023-06-19 03:14:06.004330100
// We must use microsecond resolution here for portability.
// std::chrono::system_clock::time_point may not provide nanosecond
// resolution on some platforms such as Windows.
const auto expiration_time_duration =
std::chrono::seconds{1687144446} + std::chrono::nanoseconds{4339000};
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<FlightEndpoint> values = {
{{""}, {}},
{{"foo"}, {}},
{{"bar"}, {}},
{{"foo"}, {location1}},
{{"bar"}, {location1}},
{{"foo"}, {location2}},
{{"foo"}, {location1, location2}},
{{""}, {}, std::nullopt},
{{"foo"}, {}, std::nullopt},
{{"bar"}, {}, std::nullopt},
{{"foo"}, {}, expiration_time},
{{"foo"}, {location1}, std::nullopt},
{{"bar"}, {location1}, std::nullopt},
{{"foo"}, {location2}, std::nullopt},
{{"foo"}, {location1, location2}, std::nullopt},
};
std::vector<std::string> reprs = {
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations=[]>",
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations=[] "
"expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=2023-06-19 03:14:06.004339000>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024]>",
"[grpc+tcp://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations="
"[grpc+tcp://localhost:1024]>",
"[grpc+tcp://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tls://localhost:1024]>",
"[grpc+tls://localhost:1024] expiration_time=null>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024, grpc+tls://localhost:1024]>",
"[grpc+tcp://localhost:1024, grpc+tls://localhost:1024] "
"expiration_time=null>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightEndpoint>(values, reprs));
Expand All @@ -210,8 +225,8 @@ TEST(FlightTypes, FlightInfo) {
Schema schema2({});
auto desc1 = FlightDescriptor::Command("foo");
auto desc2 = FlightDescriptor::Command("bar");
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}, std::nullopt};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
Expand All @@ -227,13 +242,13 @@ TEST(FlightTypes, FlightInfo) {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
"total_records=-1 total_bytes=42 ordered=true>",
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>] total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
"ordered=false>",
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>, <FlightEndpoint ticket=<Ticket ticket='foo'> "
"locations=[grpc+tcp://localhost:1234] expiration_time=null>] "
"total_records=64 total_bytes=-1 ordered=false>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }

TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }

TEST(FlightIntegration, ExpirationTimeDoGet) {
ASSERT_OK(RunScenario("expiration_time:do_get"));
}

TEST(FlightIntegration, ExpirationTimeListActions) {
ASSERT_OK(RunScenario("expiration_time:list_actions"));
}

TEST(FlightIntegration, ExpirationTimeCancelFlightInfo) {
ASSERT_OK(RunScenario("expiration_time:cancel_flight_info"));
}

TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
Expand Down
Loading

0 comments on commit 0b7bd74

Please sign in to comment.