Skip to content

Commit

Permalink
Refresh -> Renew
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 23, 2023
1 parent 61754f3 commit 9f80e60
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 127 deletions.
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,16 @@ arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(
return std::move(cancel_result);
}

arrow::Result<FlightEndpoint> FlightClient::RefreshFlightEndpoint(
arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
const FlightCallOptions& options, const FlightEndpoint& endpoint) {
ARROW_ASSIGN_OR_RAISE(auto body, endpoint.SerializeToString());
Action action{ActionType::kRefreshFlightEndpoint.type, Buffer::FromString(body)};
Action action{ActionType::kRenewFlightEndpoint.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto refreshed_endpoint,
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
FlightEndpoint::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return std::move(refreshed_endpoint);
return std::move(renewed_endpoint);
}

Status FlightClient::CloseFlightInfo(const FlightCallOptions& options,
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,16 @@ class ARROW_FLIGHT_EXPORT FlightClient {
Status CloseFlightInfo(const FlightCallOptions& options, const FlightInfo& info);
Status CloseFlightInfo(const FlightInfo& info) { return CloseFlightInfo({}, info); }

/// \brief Perform the RefreshFlightEndpoint action, returning a refreshed
/// \brief Perform the RenewFlightEndpoint action, returning a renewed
/// FlightEndpoint
///
/// \param[in] options Per-RPC options
/// \param[in] endpoint The FlightEndpoint to be refreshed
/// \return Arrow result with a refreshed FlightEndpoint
arrow::Result<FlightEndpoint> RefreshFlightEndpoint(const FlightCallOptions& options,
const FlightEndpoint& endpoint);
arrow::Result<FlightEndpoint> RefreshFlightEndpoint(const FlightEndpoint& endpoint) {
return RefreshFlightEndpoint({}, endpoint);
/// \param[in] endpoint The FlightEndpoint to be renewed
/// \return Arrow result with a renewed FlightEndpoint
arrow::Result<FlightEndpoint> RenewFlightEndpoint(const FlightCallOptions& options,
const FlightEndpoint& endpoint);
arrow::Result<FlightEndpoint> RenewFlightEndpoint(const FlightEndpoint& endpoint) {
return RenewFlightEndpoint({}, endpoint);
}

/// \brief Retrieve a list of available Action types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ TEST(FlightIntegration, ExpirationTimeCloseFlightInfo) {
ASSERT_OK(RunScenario("expiration_time:close_flight_info"));
}

TEST(FlightIntegration, ExpirationTimeRefreshFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:refresh_flight_endpoint"));
TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }
Expand Down
44 changes: 22 additions & 22 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class OrderedScenario : public Scenario {
/// even within 3 seconds after the action.
///
/// The client can extend the expiration time of a FlightEndpoint in
/// a returned FlightInfo by pre-defined RefreshFlightEndpoint
/// a returned FlightInfo by pre-defined RenewFlightEndpoint
/// action. The client can read data from endpoints multiple times
/// within more 10 seconds after the action.
///
Expand Down Expand Up @@ -543,14 +543,14 @@ class ExpirationTimeServer : public FlightServerBase {
auto index = *index_result;
statuses_[index].closed = true;
}
} else if (action.type == ActionType::kRefreshFlightEndpoint.type) {
} else if (action.type == ActionType::kRenewFlightEndpoint.type) {
ARROW_ASSIGN_OR_RAISE(auto endpoint,
FlightEndpoint::Deserialize(std::string_view(*action.body)));
ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(endpoint.ticket.ticket));
if (statuses_[index].cancelled) {
return Status::Invalid("Invalid flight: canceled: ", endpoint.ticket.ticket);
}
endpoint.ticket.ticket += ": refreshed (+ 10 seconds)";
endpoint.ticket.ticket += ": renewed (+ 10 seconds)";
endpoint.expiration_time = Timestamp::clock::now() + std::chrono::seconds{10};
statuses_[index].expiration_time = endpoint.expiration_time.value();
ARROW_ASSIGN_OR_RAISE(auto serialized, endpoint.SerializeToString());
Expand All @@ -567,7 +567,7 @@ class ExpirationTimeServer : public FlightServerBase {
*actions = {
ActionType::kCancelFlightInfo,
ActionType::kCloseFlightInfo,
ActionType::kRefreshFlightEndpoint,
ActionType::kRenewFlightEndpoint,
};
return Status::OK();
}
Expand Down Expand Up @@ -680,7 +680,7 @@ class ExpirationTimeListActionsScenario : public Scenario {
std::vector<std::string> expected_action_types = {
"CancelFlightInfo",
"CloseFlightInfo",
"RefreshFlightEndpoint",
"RenewFlightEndpoint",
};
if (actual_action_types != expected_action_types) {
return Status::Invalid(
Expand Down Expand Up @@ -752,12 +752,12 @@ class ExpirationTimeCloseFlightInfoScenario : public Scenario {
}
};

/// \brief The expiration time scenario - RefreshFlightEndpoint.
/// \brief The expiration time scenario - RenewFlightEndpoint.
///
/// This tests that the client can refresh a FlightEndpoint and read
/// data in refreshed expiration time even when the original
/// This tests that the client can renew a FlightEndpoint and read
/// data in renewed expiration time even when the original
/// expiration time is over.
class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<ExpirationTimeServer>();
Expand All @@ -769,23 +769,23 @@ class ExpirationTimeRefreshFlightEndpointScenario : public Scenario {
Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("expiration")));
// Refresh all endpoints that have expiration time
// Renew all endpoints that have expiration time
for (const auto& endpoint : info->endpoints()) {
if (!endpoint.expiration_time.has_value()) {
continue;
}
const auto& expiration_time = endpoint.expiration_time.value();
ARROW_ASSIGN_OR_RAISE(auto refreshed_endpoint,
client->RefreshFlightEndpoint(endpoint));
if (!refreshed_endpoint.expiration_time.has_value()) {
return Status::Invalid("Refreshed endpoint must have expiration time: ",
refreshed_endpoint.ToString());
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
client->RenewFlightEndpoint(endpoint));
if (!renewed_endpoint.expiration_time.has_value()) {
return Status::Invalid("Renewed endpoint must have expiration time: ",
renewed_endpoint.ToString());
}
const auto& refreshed_expiration_time = refreshed_endpoint.expiration_time.value();
if (refreshed_expiration_time <= expiration_time) {
return Status::Invalid("Refreshed endpoint must have newer expiration time\n",
"Original:\n", endpoint.ToString(), "Refreshed:\n",
refreshed_endpoint.ToString());
const auto& renewed_expiration_time = renewed_endpoint.expiration_time.value();
if (renewed_expiration_time <= expiration_time) {
return Status::Invalid("Renewed endpoint must have newer expiration time\n",
"Original:\n", endpoint.ToString(), "Renewed:\n",
renewed_endpoint.ToString());
}
}
return Status::OK();
Expand Down Expand Up @@ -1870,8 +1870,8 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:close_flight_info") {
*out = std::make_shared<ExpirationTimeCloseFlightInfoScenario>();
return Status::OK();
} else if (scenario_name == "expiration_time:refresh_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRefreshFlightEndpointScenario>();
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/flight/sql/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
/// \brief Extends the expiration of a FlightEndpoint.
///
/// \param[in] options RPC-layer hints for this call.
/// \param[in] endpoint The FlightEndpoint to refresh.
/// \return Arrow result with a refreshed FlightEndpoint
::arrow::Result<FlightEndpoint> RefreshFlightEndpoint(const FlightCallOptions& options,
const FlightEndpoint& endpoint) {
return impl_->RefreshFlightEndpoint(options, endpoint);
/// \param[in] endpoint The FlightEndpoint to renew.
/// \return Arrow result with a renewed FlightEndpoint
::arrow::Result<FlightEndpoint> RenewFlightEndpoint(const FlightCallOptions& options,
const FlightEndpoint& endpoint) {
return impl_->RenewFlightEndpoint(options, endpoint);
}

/// \brief Explicitly shut down and clean up the client.
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ Status FlightSqlServerBase::ListActions(const ServerCallContext& context,
*actions = {
ActionType::kCancelFlightInfo,
ActionType::kCloseFlightInfo,
ActionType::kRefreshFlightEndpoint,
ActionType::kRenewFlightEndpoint,
FlightSqlServerBase::kBeginSavepointActionType,
FlightSqlServerBase::kBeginTransactionActionType,
FlightSqlServerBase::kCancelQueryActionType,
Expand Down Expand Up @@ -790,12 +790,12 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto info, FlightInfo::Deserialize(body));
ARROW_RETURN_NOT_OK(CloseFlightInfo(context, *info));
} else if (action.type == ActionType::kRefreshFlightEndpoint.type) {
} else if (action.type == ActionType::kRenewFlightEndpoint.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto endpoint, FlightEndpoint::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto refreshed_result,
RefreshFlightEndpoint(context, endpoint));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(refreshed_result));
ARROW_ASSIGN_OR_RAISE(auto renewed_result,
RenewFlightEndpoint(context, endpoint));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(renewed_result));

results.push_back(std::move(packed_result));
} else {
Expand Down Expand Up @@ -1101,9 +1101,9 @@ Status FlightSqlServerBase::CloseFlightInfo(const ServerCallContext& context,
return Status::NotImplemented("CloseFlightInfo not implemented");
}

arrow::Result<FlightEndpoint> FlightSqlServerBase::RefreshFlightEndpoint(
arrow::Result<FlightEndpoint> FlightSqlServerBase::RenewFlightEndpoint(
const ServerCallContext& context, const FlightEndpoint& info) {
return Status::NotImplemented("CancelFlightEndpoint not implemented");
return Status::NotImplemented("RenewFlightEndpoint not implemented");
}

arrow::Result<ActionCreatePreparedStatementResult>
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/sql/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
virtual Status CloseFlightInfo(const ServerCallContext& context,
const FlightInfo& info);

/// \brief Attempt to explicitly refresh a FlightEndpoint.
/// \brief Attempt to explicitly renew a FlightEndpoint.
/// \param[in] context The call context.
/// \param[in] endpoint The FlightEndpoint to refresh.
/// \return The refresh result.
virtual arrow::Result<FlightEndpoint> RefreshFlightEndpoint(
/// \param[in] endpoint The FlightEndpoint to renew.
/// \return The renew result.
virtual arrow::Result<FlightEndpoint> RenewFlightEndpoint(
const ServerCallContext& context, const FlightEndpoint& endpoint);

/// @}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/sql/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -780,11 +780,11 @@ TEST_F(TestFlightSqlServer, CloseFlightInfo) {
ASSERT_RAISES(NotImplemented, sql_client->CloseFlightInfo({}, *flight_info));
}

TEST_F(TestFlightSqlServer, RefreshFlightEndpoint) {
TEST_F(TestFlightSqlServer, RenewFlightEndpoint) {
// Not supported
ASSERT_OK_AND_ASSIGN(auto flight_info, sql_client->GetSqlInfo({}, {}));
ASSERT_RAISES(NotImplemented,
sql_client->RefreshFlightEndpoint({}, flight_info->endpoints()[0]));
sql_client->RenewFlightEndpoint({}, flight_info->endpoints()[0]));
}

TEST_F(TestFlightSqlServer, Transactions) {
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,11 @@ const ActionType ActionType::kCloseFlightInfo =
"Close the given FlightInfo explicitly.\n"
"Request Message: FlightInfo to be closed\n"
"Response Message: N/A"};
const ActionType ActionType::kRefreshFlightEndpoint =
ActionType{"RefreshFlightEndpoint",
const ActionType ActionType::kRenewFlightEndpoint =
ActionType{"RenewFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n"
"Request Message: FlightEndpoint to be refreshed\n"
"Response Message: Refreshed FlightEndpoint"};
"Request Message: FlightEndpoint to be renewed\n"
"Response Message: Renewed FlightEndpoint"};

bool ActionType::Equals(const ActionType& other) const {
return type == other.type && description == other.description;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ struct ARROW_FLIGHT_EXPORT ActionType {

static const ActionType kCancelFlightInfo;
static const ActionType kCloseFlightInfo;
static const ActionType kRefreshFlightEndpoint;
static const ActionType kRenewFlightEndpoint;
};

/// \brief Opaque selection criteria for ListFlights RPC
Expand Down
4 changes: 2 additions & 2 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:refresh_flight_endpoint",
"expiration_time:renew_flight_endpoint",
description=("Ensure FlightEndpoint.expiration_time and "
"RefreshFlightEndpoint are working as expected."),
"RenewFlightEndpoint are working as expected."),
skip={"JS", "C#", "Rust"},
),
Scenario(
Expand Down
12 changes: 6 additions & 6 deletions go/arrow/flight/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Client interface {
CancelFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (CancelFlightInfoResult, error)
Close() error
CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) error
RefreshFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error)
RenewFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error)
// join the interface from the FlightServiceClient instead of re-defining all
// the endpoints here.
FlightServiceClient
Expand Down Expand Up @@ -409,10 +409,10 @@ func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...
return ReadUntilEOF(stream)
}

func (c *client) RefreshFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error) {
func (c *client) RenewFlightEndpoint(ctx context.Context, endpoint *FlightEndpoint, opts ...grpc.CallOption) (*FlightEndpoint, error) {
var err error
var action flight.Action
action.Type = RefreshFlightEndpointActionType
action.Type = RenewFlightEndpointActionType
action.Body, err = proto.Marshal(endpoint)
if err != nil {
return nil, err
Expand All @@ -425,14 +425,14 @@ func (c *client) RefreshFlightEndpoint(ctx context.Context, endpoint *FlightEndp
if err != nil {
return nil, err
}
var refreshedEndpoint FlightEndpoint
err = proto.Unmarshal(res.Body, &refreshedEndpoint)
var renewedEndpoint FlightEndpoint
err = proto.Unmarshal(res.Body, &renewedEndpoint)
if err != nil {
return nil, err
}
err = ReadUntilEOF(stream)
if err != nil {
return nil, err
}
return &refreshedEndpoint, nil
return &renewedEndpoint, nil
}
4 changes: 2 additions & 2 deletions go/arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ func (c *Client) CloseFlightInfo(ctx context.Context, info *flight.FlightInfo, o
return c.Client.CloseFlightInfo(ctx, info, opts...)
}

func (c *Client) RefreshFlightEndpoint(ctx context.Context, endpoint *flight.FlightEndpoint, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
return c.Client.RefreshFlightEndpoint(ctx, endpoint, opts...)
func (c *Client) RenewFlightEndpoint(ctx context.Context, endpoint *flight.FlightEndpoint, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
return c.Client.RenewFlightEndpoint(ctx, endpoint, opts...)
}

func (c *Client) BeginTransaction(ctx context.Context, opts ...grpc.CallOption) (*Txn, error) {
Expand Down
12 changes: 6 additions & 6 deletions go/arrow/flight/flightsql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *FlightServiceClientMock) CloseFlightInfo(ctx context.Context, info *fli
return m.Called(info, opts).Error(0)
}

func (m *FlightServiceClientMock) RefreshFlightEndpoint(ctx context.Context, endpoint *flight.FlightEndpoint, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
func (m *FlightServiceClientMock) RenewFlightEndpoint(ctx context.Context, endpoint *flight.FlightEndpoint, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
args := m.Called(endpoint, opts)
return args.Get(0).(*flight.FlightEndpoint), args.Error(1)
}
Expand Down Expand Up @@ -645,7 +645,7 @@ func (s *FlightSqlClientSuite) TestCloseFlightInfo() {
s.NoError(s.sqlClient.CloseFlightInfo(context.TODO(), info, s.callOpts...))
}

func (s *FlightSqlClientSuite) TestRefreshFlightEndpoint() {
func (s *FlightSqlClientSuite) TestRenewFlightEndpoint() {
query := "SELECT * FROM data"
cmd := &pb.CommandStatementQuery{Query: query}
desc := getDesc(cmd)
Expand All @@ -657,11 +657,11 @@ func (s *FlightSqlClientSuite) TestRefreshFlightEndpoint() {
info, err := s.sqlClient.Execute(context.Background(), query, s.callOpts...)
s.NoError(err)
s.Equal(&mockedInfo, info)
var mockedRefreshedEndpoint flight.FlightEndpoint
s.mockClient.On("RefreshFlightEndpoint", &mockedEndpoint, s.callOpts).Return(&mockedRefreshedEndpoint, nil)
refreshedEndpoint, err := s.sqlClient.RefreshFlightEndpoint(context.TODO(), info.Endpoint[0], s.callOpts...)
var mockedRenewedEndpoint flight.FlightEndpoint
s.mockClient.On("RenewFlightEndpoint", &mockedEndpoint, s.callOpts).Return(&mockedRenewedEndpoint, nil)
renewedEndpoint, err := s.sqlClient.RenewFlightEndpoint(context.TODO(), info.Endpoint[0], s.callOpts...)
s.NoError(err)
s.Equal(&mockedRefreshedEndpoint, refreshedEndpoint)
s.Equal(&mockedRenewedEndpoint, renewedEndpoint)
}

func TestFlightSqlClient(t *testing.T) {
Expand Down
Loading

0 comments on commit 9f80e60

Please sign in to comment.