Skip to content

Commit

Permalink
Remove CloseFlightInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 24, 2023
1 parent 2ef69be commit 7eb5aca
Show file tree
Hide file tree
Showing 19 changed files with 0 additions and 243 deletions.
9 changes: 0 additions & 9 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,6 @@ arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
return std::move(renewed_endpoint);
}

Status FlightClient::CloseFlightInfo(const FlightCallOptions& options,
const FlightInfo& info) {
ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString());
Action action{ActionType::kCloseFlightInfo.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_RETURN_NOT_OK(stream->Drain());
return Status::OK();
}

arrow::Result<std::vector<ActionType>> FlightClient::ListActions(
const FlightCallOptions& options) {
std::vector<ActionType> actions;
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,6 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return CancelFlightInfo({}, request);
}

/// \brief Perform the CloseFlightInfo action
///
/// \param[in] options Per-RPC options
/// \param[in] info The FlightInfo to be closed
/// \return Arrow status
Status CloseFlightInfo(const FlightCallOptions& options, const FlightInfo& info);
Status CloseFlightInfo(const FlightInfo& info) { return CloseFlightInfo({}, info); }

/// \brief Perform the RenewFlightEndpoint action, returning a renewed
/// FlightEndpoint
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ TEST(FlightIntegration, ExpirationTimeCancelFlightInfo) {
ASSERT_OK(RunScenario("expiration_time:cancel_flight_info"));
}

TEST(FlightIntegration, ExpirationTimeCloseFlightInfo) {
ASSERT_OK(RunScenario("expiration_time:close_flight_info"));
}

TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}
Expand Down
52 changes: 0 additions & 52 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,6 @@ class OrderedScenario : public Scenario {
/// a returned FlightInfo by pre-defined RenewFlightEndpoint
/// action. The client can read data from endpoints multiple times
/// within more 10 seconds after the action.
///
/// The client can close a returned FlightInfo explicitly by
/// pre-defined CloseFlightInfo action. The client can't read data
/// from endpoints even within 3 seconds after the action.
class ExpirationTimeServer : public FlightServerBase {
private:
struct EndpointStatus {
Expand All @@ -449,7 +445,6 @@ class ExpirationTimeServer : public FlightServerBase {
std::optional<Timestamp> expiration_time;
uint32_t num_gets = 0;
bool cancelled = false;
bool closed = false;
};

public:
Expand All @@ -476,9 +471,6 @@ class ExpirationTimeServer : public FlightServerBase {
std::unique_ptr<FlightDataStream>* stream) override {
ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket));
auto& status = statuses_[index];
if (status.closed) {
return Status::KeyError("Invalid flight: closed: ", request.ticket);
}
if (status.cancelled) {
return Status::KeyError("Invalid flight: canceled: ", request.ticket);
}
Expand Down Expand Up @@ -532,17 +524,6 @@ class ExpirationTimeServer : public FlightServerBase {
auto cancel_result = CancelFlightInfoResult{cancel_status};
ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
results.push_back(Result{Buffer::FromString(std::move(serialized))});
} else if (action.type == ActionType::kCloseFlightInfo.type) {
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Deserialize(std::string_view(*action.body)));
for (const auto& endpoint : info->endpoints()) {
auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket);
if (!index_result.ok()) {
continue;
}
auto index = *index_result;
statuses_[index].closed = true;
}
} else if (action.type == ActionType::kRenewFlightEndpoint.type) {
ARROW_ASSIGN_OR_RAISE(auto request, RenewFlightEndpointRequest::Deserialize(
std::string_view(*action.body)));
Expand All @@ -567,7 +548,6 @@ class ExpirationTimeServer : public FlightServerBase {
std::vector<ActionType>* actions) override {
*actions = {
ActionType::kCancelFlightInfo,
ActionType::kCloseFlightInfo,
ActionType::kRenewFlightEndpoint,
};
return Status::OK();
Expand Down Expand Up @@ -680,7 +660,6 @@ class ExpirationTimeListActionsScenario : public Scenario {
std::sort(actual_action_types.begin(), actual_action_types.end());
std::vector<std::string> expected_action_types = {
"CancelFlightInfo",
"CloseFlightInfo",
"RenewFlightEndpoint",
};
if (actual_action_types != expected_action_types) {
Expand Down Expand Up @@ -727,34 +706,6 @@ class ExpirationTimeCancelFlightInfoScenario : public Scenario {
}
};

/// \brief The expiration time scenario - CloseFlightInfo.
///
/// This tests that the client can close a FlightInfo explicitly and
/// the server returns an error for DoGet against endpoints in the
/// closed FlightInfo.
class ExpirationTimeCloseFlightInfoScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<ExpirationTimeServer>();
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("expiration")));
ARROW_RETURN_NOT_OK(client->CloseFlightInfo(*info));
for (const auto& endpoint : info->endpoints()) {
auto reader = client->DoGet(endpoint.ticket);
if (reader.ok()) {
return Status::Invalid("DoGet after CloseFlightInfo must be failed");
}
}
return Status::OK();
}
};

/// \brief The expiration time scenario - RenewFlightEndpoint.
///
/// This tests that the client can renew a FlightEndpoint and read
Expand Down Expand Up @@ -1871,9 +1822,6 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:cancel_flight_info") {
*out = std::make_shared<ExpirationTimeCancelFlightInfoScenario>();
return Status::OK();
} else if (scenario_name == "expiration_time:close_flight_info") {
*out = std::make_shared<ExpirationTimeCloseFlightInfoScenario>();
return Status::OK();
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
Expand Down
9 changes: 0 additions & 9 deletions cpp/src/arrow/flight/sql/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,6 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
::arrow::Result<CancelResult> CancelQuery(const FlightCallOptions& options,
const FlightInfo& info);

/// \brief Explicitly close a FlightInfo.
///
/// \param[in] options RPC-layer hints for this call.
/// \param[in] info The FlightInfo to close.
/// \return Arrow status
Status CloseFlightInfo(const FlightCallOptions& options, const FlightInfo& info) {
return impl_->CloseFlightInfo(options, info);
}

/// \brief Extends the expiration of a FlightEndpoint.
///
/// \param[in] options RPC-layer hints for this call.
Expand Down
10 changes: 0 additions & 10 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,6 @@ Status FlightSqlServerBase::ListActions(const ServerCallContext& context,
std::vector<ActionType>* actions) {
*actions = {
ActionType::kCancelFlightInfo,
ActionType::kCloseFlightInfo,
ActionType::kRenewFlightEndpoint,
FlightSqlServerBase::kBeginSavepointActionType,
FlightSqlServerBase::kBeginTransactionActionType,
Expand All @@ -786,10 +785,6 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));

results.push_back(std::move(packed_result));
} else if (action.type == ActionType::kCloseFlightInfo.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto info, FlightInfo::Deserialize(body));
ARROW_RETURN_NOT_OK(CloseFlightInfo(context, *info));
} else if (action.type == ActionType::kRenewFlightEndpoint.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, RenewFlightEndpointRequest::Deserialize(body));
Expand Down Expand Up @@ -1097,11 +1092,6 @@ arrow::Result<CancelResult> FlightSqlServerBase::CancelQuery(
return static_cast<CancelResult>(result.status);
}

Status FlightSqlServerBase::CloseFlightInfo(const ServerCallContext& context,
const FlightInfo& info) {
return Status::NotImplemented("CloseFlightInfo not implemented");
}

arrow::Result<FlightEndpoint> FlightSqlServerBase::RenewFlightEndpoint(
const ServerCallContext& context, const RenewFlightEndpointRequest& request) {
return Status::NotImplemented("RenewFlightEndpoint not implemented");
Expand Down
7 changes: 0 additions & 7 deletions cpp/src/arrow/flight/sql/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,6 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
virtual arrow::Result<CancelResult> CancelQuery(
const ServerCallContext& context, const ActionCancelQueryRequest& request);

/// \brief Attempt to explicitly close a FlightInfo.
/// \param[in] context The call context.
/// \param[in] info The FlightInfo to close.
/// \return The close status.
virtual Status CloseFlightInfo(const ServerCallContext& context,
const FlightInfo& info);

/// \brief Attempt to explicitly renew a FlightEndpoint.
/// \param[in] context The call context.
/// \param[in] request The RenewFlightEndpointRequest.
Expand Down
6 changes: 0 additions & 6 deletions cpp/src/arrow/flight/sql/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -775,12 +775,6 @@ TEST_F(TestFlightSqlServer, CancelQuery) {
ARROW_UNSUPPRESS_DEPRECATION_WARNING
}

TEST_F(TestFlightSqlServer, CloseFlightInfo) {
// Not supported
ASSERT_OK_AND_ASSIGN(auto flight_info, sql_client->GetSqlInfo({}, {}));
ASSERT_RAISES(NotImplemented, sql_client->CloseFlightInfo({}, *flight_info));
}

TEST_F(TestFlightSqlServer, RenewFlightEndpoint) {
// Not supported
ASSERT_OK_AND_ASSIGN(auto flight_info, sql_client->GetSqlInfo({}, {}));
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,6 @@ const ActionType ActionType::kCancelFlightInfo =
"Explicitly cancel a running FlightInfo.\n"
"Request Message: CancelFlightInfoRequest\n"
"Response Message: CancelFlightInfoResult"};
const ActionType ActionType::kCloseFlightInfo =
ActionType{"CloseFlightInfo",
"Close the given FlightInfo explicitly.\n"
"Request Message: FlightInfo to be closed\n"
"Response Message: N/A"};
const ActionType ActionType::kRenewFlightEndpoint =
ActionType{"RenewFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n"
Expand Down
1 change: 0 additions & 1 deletion cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ struct ARROW_FLIGHT_EXPORT ActionType {
static arrow::Result<ActionType> Deserialize(std::string_view serialized);

static const ActionType kCancelFlightInfo;
static const ActionType kCloseFlightInfo;
static const ActionType kRenewFlightEndpoint;
};

Expand Down
6 changes: 0 additions & 6 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,6 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
"CancelFlightInfo are working as expected."),
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:close_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CloseFlightInfo are working as expected."),
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:renew_flight_endpoint",
description=("Ensure FlightEndpoint.expiration_time and "
Expand Down
6 changes: 0 additions & 6 deletions docs/source/format/Flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@ A client that wishes to download the data would:
``CancelFlightInfo`` action. The client need to use ``DoAction``
with ``CancelFlightInfo`` action type to cancel the ``FlightInfo``.

The client may be able to close the returned ``FlightInfo``
explicitly by ``CloseFlightInfo`` action. The client need to use
``DoAction`` with ``CloseFlightInfo`` action type to cancel the
``FlightInfo``. In general, the client don't need to close the
returned ``FlightInfo`` explicitly.

.. _google.protobuf.Timestamp: https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp

Uploading Data
Expand Down
15 changes: 0 additions & 15 deletions go/arrow/flight/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type Client interface {
AuthenticateBasicToken(ctx context.Context, username string, password string, opts ...grpc.CallOption) (context.Context, error)
CancelFlightInfo(ctx context.Context, request *CancelFlightInfoRequest, opts ...grpc.CallOption) (CancelFlightInfoResult, error)
Close() error
CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) error
RenewFlightEndpoint(ctx context.Context, request *RenewFlightEndpointRequest, opts ...grpc.CallOption) (*FlightEndpoint, error)
// join the interface from the FlightServiceClient instead of re-defining all
// the endpoints here.
Expand Down Expand Up @@ -395,20 +394,6 @@ func (c *client) Close() error {
return nil
}

func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) {
var action flight.Action
action.Type = CloseFlightInfoActionType
action.Body, err = proto.Marshal(info)
if err != nil {
return
}
stream, err := c.DoAction(ctx, &action, opts...)
if err != nil {
return
}
return ReadUntilEOF(stream)
}

func (c *client) RenewFlightEndpoint(ctx context.Context, request *RenewFlightEndpointRequest, opts ...grpc.CallOption) (*FlightEndpoint, error) {
var err error
var action flight.Action
Expand Down
4 changes: 0 additions & 4 deletions go/arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,6 @@ func (c *Client) CancelFlightInfo(ctx context.Context, request *flight.CancelFli
return c.Client.CancelFlightInfo(ctx, request, opts...)
}

func (c *Client) CloseFlightInfo(ctx context.Context, info *flight.FlightInfo, opts ...grpc.CallOption) error {
return c.Client.CloseFlightInfo(ctx, info, opts...)
}

func (c *Client) RenewFlightEndpoint(ctx context.Context, request *flight.RenewFlightEndpointRequest, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
return c.Client.RenewFlightEndpoint(ctx, request, opts...)
}
Expand Down
16 changes: 0 additions & 16 deletions go/arrow/flight/flightsql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func (m *FlightServiceClientMock) CancelFlightInfo(ctx context.Context, request
return args.Get(0).(flight.CancelFlightInfoResult), args.Error(1)
}

func (m *FlightServiceClientMock) CloseFlightInfo(ctx context.Context, info *flight.FlightInfo, opts ...grpc.CallOption) error {
return m.Called(info, opts).Error(0)
}

func (m *FlightServiceClientMock) RenewFlightEndpoint(ctx context.Context, request *flight.RenewFlightEndpointRequest, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) {
args := m.Called(request, opts)
return args.Get(0).(*flight.FlightEndpoint), args.Error(1)
Expand Down Expand Up @@ -634,18 +630,6 @@ func (s *FlightSqlClientSuite) TestCancelFlightInfo() {
s.Equal(mockedCancelResult, cancelResult)
}

func (s *FlightSqlClientSuite) TestCloseFlightInfo() {
query := "SELECT * FROM data"
cmd := &pb.CommandStatementQuery{Query: query}
desc := getDesc(cmd)
s.mockClient.On("GetFlightInfo", desc.Type, desc.Cmd, s.callOpts).Return(&emptyFlightInfo, nil)
info, err := s.sqlClient.Execute(context.Background(), query, s.callOpts...)
s.NoError(err)
s.Equal(&emptyFlightInfo, info)
s.mockClient.On("CloseFlightInfo", info, s.callOpts).Return(nil)
s.NoError(s.sqlClient.CloseFlightInfo(context.TODO(), info, s.callOpts...))
}

func (s *FlightSqlClientSuite) TestRenewFlightEndpoint() {
query := "SELECT * FROM data"
cmd := &pb.CommandStatementQuery{Query: query}
Expand Down
18 changes: 0 additions & 18 deletions go/arrow/flight/flightsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,6 @@ func (BaseServer) CancelFlightInfo(context.Context, *flight.CancelFlightInfoRequ
status.Error(codes.Unimplemented, "CancelFlightInfo not implemented")
}

func (BaseServer) CloseFlightInfo(context.Context, *flight.FlightInfo) error {
return status.Error(codes.Unimplemented, "CloseFlightInfo not implemented")
}

func (BaseServer) RenewFlightEndpoint(context.Context, *flight.RenewFlightEndpointRequest) (*flight.FlightEndpoint, error) {
return nil, status.Error(codes.Unimplemented, "RenewFlightEndpoint not implemented")
}
Expand Down Expand Up @@ -654,8 +650,6 @@ type Server interface {
EndTransaction(context.Context, ActionEndTransactionRequest) error
// CancelFlightInfo attempts to explicitly cancel a FlightInfo
CancelFlightInfo(context.Context, *flight.CancelFlightInfoRequest) (flight.CancelFlightInfoResult, error)
// CloseFlightInfo attempts to explicitly close a FlightInfo
CloseFlightInfo(context.Context, *flight.FlightInfo) error
// RenewFlightEndpoint attempts to extend the expiration of a FlightEndpoint
RenewFlightEndpoint(context.Context, *flight.RenewFlightEndpointRequest) (*flight.FlightEndpoint, error)

Expand Down Expand Up @@ -927,7 +921,6 @@ func (f *flightSqlServer) DoPut(stream flight.FlightService_DoPutServer) error {
func (f *flightSqlServer) ListActions(_ *flight.Empty, stream flight.FlightService_ListActionsServer) error {
actions := []string{
flight.CancelFlightInfoActionType,
flight.CloseFlightInfoActionType,
flight.RenewFlightEndpointActionType,
CreatePreparedStatementActionType,
ClosePreparedStatementActionType,
Expand Down Expand Up @@ -1011,17 +1004,6 @@ func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightServi
return err
}
return stream.Send(out)
case flight.CloseFlightInfoActionType:
var (
info flight.FlightInfo
err error
)

if err = proto.Unmarshal(cmd.Body, &info); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CloseFlightInfo: %s", err.Error())
}

return f.srv.CloseFlightInfo(stream.Context(), &info)
case flight.RenewFlightEndpointActionType:
var (
request flight.RenewFlightEndpointRequest
Expand Down
Loading

0 comments on commit 7eb5aca

Please sign in to comment.