From 7eb5aca0597f9399e8d88c1e5f041b9651167bf5 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Sun, 25 Jun 2023 06:05:51 +0900 Subject: [PATCH] Remove CloseFlightInfo --- cpp/src/arrow/flight/client.cc | 9 --- cpp/src/arrow/flight/client.h | 8 --- .../flight_integration_test.cc | 4 -- .../integration_tests/test_integration.cc | 52 ----------------- cpp/src/arrow/flight/sql/client.h | 9 --- cpp/src/arrow/flight/sql/server.cc | 10 ---- cpp/src/arrow/flight/sql/server.h | 7 --- cpp/src/arrow/flight/sql/server_test.cc | 6 -- cpp/src/arrow/flight/types.cc | 5 -- cpp/src/arrow/flight/types.h | 1 - dev/archery/archery/integration/runner.py | 6 -- docs/source/format/Flight.rst | 6 -- go/arrow/flight/client.go | 15 ----- go/arrow/flight/flightsql/client.go | 4 -- go/arrow/flight/flightsql/client_test.go | 16 ------ go/arrow/flight/flightsql/server.go | 18 ------ go/arrow/flight/flightsql/server_test.go | 9 --- go/arrow/flight/server.go | 1 - .../internal/flight_integration/scenario.go | 57 ------------------- 19 files changed, 243 deletions(-) diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 0f0727e929dbb..e5e9f141aa62b 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -593,15 +593,6 @@ arrow::Result FlightClient::RenewFlightEndpoint( return std::move(renewed_endpoint); } -Status FlightClient::CloseFlightInfo(const FlightCallOptions& options, - const FlightInfo& info) { - ARROW_ASSIGN_OR_RAISE(auto body, info.SerializeToString()); - Action action{ActionType::kCloseFlightInfo.type, Buffer::FromString(body)}; - ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action)); - ARROW_RETURN_NOT_OK(stream->Drain()); - return Status::OK(); -} - arrow::Result> FlightClient::ListActions( const FlightCallOptions& options) { std::vector actions; diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h index 3db14adad0995..730605c25848b 100644 --- a/cpp/src/arrow/flight/client.h +++ b/cpp/src/arrow/flight/client.h @@ -259,14 +259,6 @@ class ARROW_FLIGHT_EXPORT FlightClient { return CancelFlightInfo({}, request); } - /// \brief Perform the CloseFlightInfo action - /// - /// \param[in] options Per-RPC options - /// \param[in] info The FlightInfo to be closed - /// \return Arrow status - Status CloseFlightInfo(const FlightCallOptions& options, const FlightInfo& info); - Status CloseFlightInfo(const FlightInfo& info) { return CloseFlightInfo({}, info); } - /// \brief Perform the RenewFlightEndpoint action, returning a renewed /// FlightEndpoint /// diff --git a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc index a5c05b13a7934..d2f14fb01b0a8 100644 --- a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc +++ b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc @@ -67,10 +67,6 @@ TEST(FlightIntegration, ExpirationTimeCancelFlightInfo) { ASSERT_OK(RunScenario("expiration_time:cancel_flight_info")); } -TEST(FlightIntegration, ExpirationTimeCloseFlightInfo) { - ASSERT_OK(RunScenario("expiration_time:close_flight_info")); -} - TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) { ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint")); } diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc index dfbef0f18c250..2d75d16edce5a 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc @@ -436,10 +436,6 @@ class OrderedScenario : public Scenario { /// a returned FlightInfo by pre-defined RenewFlightEndpoint /// action. The client can read data from endpoints multiple times /// within more 10 seconds after the action. -/// -/// The client can close a returned FlightInfo explicitly by -/// pre-defined CloseFlightInfo action. The client can't read data -/// from endpoints even within 3 seconds after the action. class ExpirationTimeServer : public FlightServerBase { private: struct EndpointStatus { @@ -449,7 +445,6 @@ class ExpirationTimeServer : public FlightServerBase { std::optional expiration_time; uint32_t num_gets = 0; bool cancelled = false; - bool closed = false; }; public: @@ -476,9 +471,6 @@ class ExpirationTimeServer : public FlightServerBase { std::unique_ptr* stream) override { ARROW_ASSIGN_OR_RAISE(auto index, ExtractIndexFromTicket(request.ticket)); auto& status = statuses_[index]; - if (status.closed) { - return Status::KeyError("Invalid flight: closed: ", request.ticket); - } if (status.cancelled) { return Status::KeyError("Invalid flight: canceled: ", request.ticket); } @@ -532,17 +524,6 @@ class ExpirationTimeServer : public FlightServerBase { auto cancel_result = CancelFlightInfoResult{cancel_status}; ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString()); results.push_back(Result{Buffer::FromString(std::move(serialized))}); - } else if (action.type == ActionType::kCloseFlightInfo.type) { - ARROW_ASSIGN_OR_RAISE(auto info, - FlightInfo::Deserialize(std::string_view(*action.body))); - for (const auto& endpoint : info->endpoints()) { - auto index_result = ExtractIndexFromTicket(endpoint.ticket.ticket); - if (!index_result.ok()) { - continue; - } - auto index = *index_result; - statuses_[index].closed = true; - } } else if (action.type == ActionType::kRenewFlightEndpoint.type) { ARROW_ASSIGN_OR_RAISE(auto request, RenewFlightEndpointRequest::Deserialize( std::string_view(*action.body))); @@ -567,7 +548,6 @@ class ExpirationTimeServer : public FlightServerBase { std::vector* actions) override { *actions = { ActionType::kCancelFlightInfo, - ActionType::kCloseFlightInfo, ActionType::kRenewFlightEndpoint, }; return Status::OK(); @@ -680,7 +660,6 @@ class ExpirationTimeListActionsScenario : public Scenario { std::sort(actual_action_types.begin(), actual_action_types.end()); std::vector expected_action_types = { "CancelFlightInfo", - "CloseFlightInfo", "RenewFlightEndpoint", }; if (actual_action_types != expected_action_types) { @@ -727,34 +706,6 @@ class ExpirationTimeCancelFlightInfoScenario : public Scenario { } }; -/// \brief The expiration time scenario - CloseFlightInfo. -/// -/// This tests that the client can close a FlightInfo explicitly and -/// the server returns an error for DoGet against endpoints in the -/// closed FlightInfo. -class ExpirationTimeCloseFlightInfoScenario : public Scenario { - Status MakeServer(std::unique_ptr* server, - FlightServerOptions* options) override { - *server = std::make_unique(); - return Status::OK(); - } - - Status MakeClient(FlightClientOptions* options) override { return Status::OK(); } - - Status RunClient(std::unique_ptr client) override { - ARROW_ASSIGN_OR_RAISE(auto info, - client->GetFlightInfo(FlightDescriptor::Command("expiration"))); - ARROW_RETURN_NOT_OK(client->CloseFlightInfo(*info)); - for (const auto& endpoint : info->endpoints()) { - auto reader = client->DoGet(endpoint.ticket); - if (reader.ok()) { - return Status::Invalid("DoGet after CloseFlightInfo must be failed"); - } - } - return Status::OK(); - } -}; - /// \brief The expiration time scenario - RenewFlightEndpoint. /// /// This tests that the client can renew a FlightEndpoint and read @@ -1871,9 +1822,6 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr* } else if (scenario_name == "expiration_time:cancel_flight_info") { *out = std::make_shared(); return Status::OK(); - } else if (scenario_name == "expiration_time:close_flight_info") { - *out = std::make_shared(); - return Status::OK(); } else if (scenario_name == "expiration_time:renew_flight_endpoint") { *out = std::make_shared(); return Status::OK(); diff --git a/cpp/src/arrow/flight/sql/client.h b/cpp/src/arrow/flight/sql/client.h index f03095cfb7fe5..357aeddf6ce12 100644 --- a/cpp/src/arrow/flight/sql/client.h +++ b/cpp/src/arrow/flight/sql/client.h @@ -350,15 +350,6 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient { ::arrow::Result CancelQuery(const FlightCallOptions& options, const FlightInfo& info); - /// \brief Explicitly close a FlightInfo. - /// - /// \param[in] options RPC-layer hints for this call. - /// \param[in] info The FlightInfo to close. - /// \return Arrow status - Status CloseFlightInfo(const FlightCallOptions& options, const FlightInfo& info) { - return impl_->CloseFlightInfo(options, info); - } - /// \brief Extends the expiration of a FlightEndpoint. /// /// \param[in] options RPC-layer hints for this call. diff --git a/cpp/src/arrow/flight/sql/server.cc b/cpp/src/arrow/flight/sql/server.cc index cf7e330c09bed..ba13d81d7d478 100644 --- a/cpp/src/arrow/flight/sql/server.cc +++ b/cpp/src/arrow/flight/sql/server.cc @@ -761,7 +761,6 @@ Status FlightSqlServerBase::ListActions(const ServerCallContext& context, std::vector* actions) { *actions = { ActionType::kCancelFlightInfo, - ActionType::kCloseFlightInfo, ActionType::kRenewFlightEndpoint, FlightSqlServerBase::kBeginSavepointActionType, FlightSqlServerBase::kBeginTransactionActionType, @@ -786,10 +785,6 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context, ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result))); results.push_back(std::move(packed_result)); - } else if (action.type == ActionType::kCloseFlightInfo.type) { - std::string_view body(*action.body); - ARROW_ASSIGN_OR_RAISE(auto info, FlightInfo::Deserialize(body)); - ARROW_RETURN_NOT_OK(CloseFlightInfo(context, *info)); } else if (action.type == ActionType::kRenewFlightEndpoint.type) { std::string_view body(*action.body); ARROW_ASSIGN_OR_RAISE(auto request, RenewFlightEndpointRequest::Deserialize(body)); @@ -1097,11 +1092,6 @@ arrow::Result FlightSqlServerBase::CancelQuery( return static_cast(result.status); } -Status FlightSqlServerBase::CloseFlightInfo(const ServerCallContext& context, - const FlightInfo& info) { - return Status::NotImplemented("CloseFlightInfo not implemented"); -} - arrow::Result FlightSqlServerBase::RenewFlightEndpoint( const ServerCallContext& context, const RenewFlightEndpointRequest& request) { return Status::NotImplemented("RenewFlightEndpoint not implemented"); diff --git a/cpp/src/arrow/flight/sql/server.h b/cpp/src/arrow/flight/sql/server.h index e848225577641..360677c078c81 100644 --- a/cpp/src/arrow/flight/sql/server.h +++ b/cpp/src/arrow/flight/sql/server.h @@ -614,13 +614,6 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase { virtual arrow::Result CancelQuery( const ServerCallContext& context, const ActionCancelQueryRequest& request); - /// \brief Attempt to explicitly close a FlightInfo. - /// \param[in] context The call context. - /// \param[in] info The FlightInfo to close. - /// \return The close status. - virtual Status CloseFlightInfo(const ServerCallContext& context, - const FlightInfo& info); - /// \brief Attempt to explicitly renew a FlightEndpoint. /// \param[in] context The call context. /// \param[in] request The RenewFlightEndpointRequest. diff --git a/cpp/src/arrow/flight/sql/server_test.cc b/cpp/src/arrow/flight/sql/server_test.cc index e06736f3a0ddd..42e0c7d7a1f12 100644 --- a/cpp/src/arrow/flight/sql/server_test.cc +++ b/cpp/src/arrow/flight/sql/server_test.cc @@ -775,12 +775,6 @@ TEST_F(TestFlightSqlServer, CancelQuery) { ARROW_UNSUPPRESS_DEPRECATION_WARNING } -TEST_F(TestFlightSqlServer, CloseFlightInfo) { - // Not supported - ASSERT_OK_AND_ASSIGN(auto flight_info, sql_client->GetSqlInfo({}, {})); - ASSERT_RAISES(NotImplemented, sql_client->CloseFlightInfo({}, *flight_info)); -} - TEST_F(TestFlightSqlServer, RenewFlightEndpoint) { // Not supported ASSERT_OK_AND_ASSIGN(auto flight_info, sql_client->GetSqlInfo({}, {})); diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 7070455f65ad5..20fda6939150e 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -596,11 +596,6 @@ const ActionType ActionType::kCancelFlightInfo = "Explicitly cancel a running FlightInfo.\n" "Request Message: CancelFlightInfoRequest\n" "Response Message: CancelFlightInfoResult"}; -const ActionType ActionType::kCloseFlightInfo = - ActionType{"CloseFlightInfo", - "Close the given FlightInfo explicitly.\n" - "Request Message: FlightInfo to be closed\n" - "Response Message: N/A"}; const ActionType ActionType::kRenewFlightEndpoint = ActionType{"RenewFlightEndpoint", "Extend expiration time of the given FlightEndpoint.\n" diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 4f06fd622b75e..9978dea408db1 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -179,7 +179,6 @@ struct ARROW_FLIGHT_EXPORT ActionType { static arrow::Result Deserialize(std::string_view serialized); static const ActionType kCancelFlightInfo; - static const ActionType kCloseFlightInfo; static const ActionType kRenewFlightEndpoint; }; diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index fc0bcfb6c2a88..af092b92e89a8 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -454,12 +454,6 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, "CancelFlightInfo are working as expected."), skip={"JS", "C#", "Rust"}, ), - Scenario( - "expiration_time:close_flight_info", - description=("Ensure FlightEndpoint.expiration_time and " - "CloseFlightInfo are working as expected."), - skip={"JS", "C#", "Rust"}, - ), Scenario( "expiration_time:renew_flight_endpoint", description=("Ensure FlightEndpoint.expiration_time and " diff --git a/docs/source/format/Flight.rst b/docs/source/format/Flight.rst index e50487f2d1316..97818a2e17bf7 100644 --- a/docs/source/format/Flight.rst +++ b/docs/source/format/Flight.rst @@ -151,12 +151,6 @@ A client that wishes to download the data would: ``CancelFlightInfo`` action. The client need to use ``DoAction`` with ``CancelFlightInfo`` action type to cancel the ``FlightInfo``. - The client may be able to close the returned ``FlightInfo`` - explicitly by ``CloseFlightInfo`` action. The client need to use - ``DoAction`` with ``CloseFlightInfo`` action type to cancel the - ``FlightInfo``. In general, the client don't need to close the - returned ``FlightInfo`` explicitly. - .. _google.protobuf.Timestamp: https://protobuf.dev/reference/protobuf/google.protobuf/#timestamp Uploading Data diff --git a/go/arrow/flight/client.go b/go/arrow/flight/client.go index dbeced4384ca9..31ffc26cfd35a 100644 --- a/go/arrow/flight/client.go +++ b/go/arrow/flight/client.go @@ -68,7 +68,6 @@ type Client interface { AuthenticateBasicToken(ctx context.Context, username string, password string, opts ...grpc.CallOption) (context.Context, error) CancelFlightInfo(ctx context.Context, request *CancelFlightInfoRequest, opts ...grpc.CallOption) (CancelFlightInfoResult, error) Close() error - CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) error RenewFlightEndpoint(ctx context.Context, request *RenewFlightEndpointRequest, opts ...grpc.CallOption) (*FlightEndpoint, error) // join the interface from the FlightServiceClient instead of re-defining all // the endpoints here. @@ -395,20 +394,6 @@ func (c *client) Close() error { return nil } -func (c *client) CloseFlightInfo(ctx context.Context, info *FlightInfo, opts ...grpc.CallOption) (err error) { - var action flight.Action - action.Type = CloseFlightInfoActionType - action.Body, err = proto.Marshal(info) - if err != nil { - return - } - stream, err := c.DoAction(ctx, &action, opts...) - if err != nil { - return - } - return ReadUntilEOF(stream) -} - func (c *client) RenewFlightEndpoint(ctx context.Context, request *RenewFlightEndpointRequest, opts ...grpc.CallOption) (*FlightEndpoint, error) { var err error var action flight.Action diff --git a/go/arrow/flight/flightsql/client.go b/go/arrow/flight/flightsql/client.go index 4b1c95d78b182..76c9f6fb01d32 100644 --- a/go/arrow/flight/flightsql/client.go +++ b/go/arrow/flight/flightsql/client.go @@ -538,10 +538,6 @@ func (c *Client) CancelFlightInfo(ctx context.Context, request *flight.CancelFli return c.Client.CancelFlightInfo(ctx, request, opts...) } -func (c *Client) CloseFlightInfo(ctx context.Context, info *flight.FlightInfo, opts ...grpc.CallOption) error { - return c.Client.CloseFlightInfo(ctx, info, opts...) -} - func (c *Client) RenewFlightEndpoint(ctx context.Context, request *flight.RenewFlightEndpointRequest, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) { return c.Client.RenewFlightEndpoint(ctx, request, opts...) } diff --git a/go/arrow/flight/flightsql/client_test.go b/go/arrow/flight/flightsql/client_test.go index cfb017c73f1e8..2b57596fb188c 100644 --- a/go/arrow/flight/flightsql/client_test.go +++ b/go/arrow/flight/flightsql/client_test.go @@ -65,10 +65,6 @@ func (m *FlightServiceClientMock) CancelFlightInfo(ctx context.Context, request return args.Get(0).(flight.CancelFlightInfoResult), args.Error(1) } -func (m *FlightServiceClientMock) CloseFlightInfo(ctx context.Context, info *flight.FlightInfo, opts ...grpc.CallOption) error { - return m.Called(info, opts).Error(0) -} - func (m *FlightServiceClientMock) RenewFlightEndpoint(ctx context.Context, request *flight.RenewFlightEndpointRequest, opts ...grpc.CallOption) (*flight.FlightEndpoint, error) { args := m.Called(request, opts) return args.Get(0).(*flight.FlightEndpoint), args.Error(1) @@ -634,18 +630,6 @@ func (s *FlightSqlClientSuite) TestCancelFlightInfo() { s.Equal(mockedCancelResult, cancelResult) } -func (s *FlightSqlClientSuite) TestCloseFlightInfo() { - query := "SELECT * FROM data" - cmd := &pb.CommandStatementQuery{Query: query} - desc := getDesc(cmd) - s.mockClient.On("GetFlightInfo", desc.Type, desc.Cmd, s.callOpts).Return(&emptyFlightInfo, nil) - info, err := s.sqlClient.Execute(context.Background(), query, s.callOpts...) - s.NoError(err) - s.Equal(&emptyFlightInfo, info) - s.mockClient.On("CloseFlightInfo", info, s.callOpts).Return(nil) - s.NoError(s.sqlClient.CloseFlightInfo(context.TODO(), info, s.callOpts...)) -} - func (s *FlightSqlClientSuite) TestRenewFlightEndpoint() { query := "SELECT * FROM data" cmd := &pb.CommandStatementQuery{Query: query} diff --git a/go/arrow/flight/flightsql/server.go b/go/arrow/flight/flightsql/server.go index 319142727770a..f54b2b999cfa2 100644 --- a/go/arrow/flight/flightsql/server.go +++ b/go/arrow/flight/flightsql/server.go @@ -520,10 +520,6 @@ func (BaseServer) CancelFlightInfo(context.Context, *flight.CancelFlightInfoRequ status.Error(codes.Unimplemented, "CancelFlightInfo not implemented") } -func (BaseServer) CloseFlightInfo(context.Context, *flight.FlightInfo) error { - return status.Error(codes.Unimplemented, "CloseFlightInfo not implemented") -} - func (BaseServer) RenewFlightEndpoint(context.Context, *flight.RenewFlightEndpointRequest) (*flight.FlightEndpoint, error) { return nil, status.Error(codes.Unimplemented, "RenewFlightEndpoint not implemented") } @@ -654,8 +650,6 @@ type Server interface { EndTransaction(context.Context, ActionEndTransactionRequest) error // CancelFlightInfo attempts to explicitly cancel a FlightInfo CancelFlightInfo(context.Context, *flight.CancelFlightInfoRequest) (flight.CancelFlightInfoResult, error) - // CloseFlightInfo attempts to explicitly close a FlightInfo - CloseFlightInfo(context.Context, *flight.FlightInfo) error // RenewFlightEndpoint attempts to extend the expiration of a FlightEndpoint RenewFlightEndpoint(context.Context, *flight.RenewFlightEndpointRequest) (*flight.FlightEndpoint, error) @@ -927,7 +921,6 @@ func (f *flightSqlServer) DoPut(stream flight.FlightService_DoPutServer) error { func (f *flightSqlServer) ListActions(_ *flight.Empty, stream flight.FlightService_ListActionsServer) error { actions := []string{ flight.CancelFlightInfoActionType, - flight.CloseFlightInfoActionType, flight.RenewFlightEndpointActionType, CreatePreparedStatementActionType, ClosePreparedStatementActionType, @@ -1011,17 +1004,6 @@ func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightServi return err } return stream.Send(out) - case flight.CloseFlightInfoActionType: - var ( - info flight.FlightInfo - err error - ) - - if err = proto.Unmarshal(cmd.Body, &info); err != nil { - return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CloseFlightInfo: %s", err.Error()) - } - - return f.srv.CloseFlightInfo(stream.Context(), &info) case flight.RenewFlightEndpointActionType: var ( request flight.RenewFlightEndpointRequest diff --git a/go/arrow/flight/flightsql/server_test.go b/go/arrow/flight/flightsql/server_test.go index cd8fcc27e5d6d..9ced8e0ed6cdf 100644 --- a/go/arrow/flight/flightsql/server_test.go +++ b/go/arrow/flight/flightsql/server_test.go @@ -371,15 +371,6 @@ func (s *UnimplementedFlightSqlServerSuite) TestCancelFlightInfo() { s.Equal("CancelFlightInfo not implemented", st.Message()) } -func (s *UnimplementedFlightSqlServerSuite) TestCloseFlightInfo() { - info := flight.FlightInfo{} - err := s.cl.CloseFlightInfo(context.TODO(), &info) - st, ok := status.FromError(err) - s.True(ok) - s.Equal(codes.Unimplemented, st.Code()) - s.Equal("CloseFlightInfo not implemented", st.Message()) -} - func (s *UnimplementedFlightSqlServerSuite) TestRenewFlightEndpoint() { endpoint := flight.FlightEndpoint{} request := flight.RenewFlightEndpointRequest{Endpoint: &endpoint} diff --git a/go/arrow/flight/server.go b/go/arrow/flight/server.go index b469f069be94a..1dd02d0defaed 100644 --- a/go/arrow/flight/server.go +++ b/go/arrow/flight/server.go @@ -59,7 +59,6 @@ type ( // Constants for Action types const ( CancelFlightInfoActionType = "CancelFlightInfo" - CloseFlightInfoActionType = "CloseFlightInfo" RenewFlightEndpointActionType = "RenewFlightEndpoint" ) diff --git a/go/arrow/internal/flight_integration/scenario.go b/go/arrow/internal/flight_integration/scenario.go index 12a1754e5bfe1..3cc325db1066f 100644 --- a/go/arrow/internal/flight_integration/scenario.go +++ b/go/arrow/internal/flight_integration/scenario.go @@ -66,8 +66,6 @@ func GetScenario(name string, args ...string) Scenario { return &expirationTimeListActionsScenarioTester{} case "expiration_time:cancel_flight_info": return &expirationTimeCancelFlightInfoScenarioTester{} - case "expiration_time:close_flight_info": - return &expirationTimeCloseFlightInfoScenarioTester{} case "expiration_time:renew_flight_endpoint": return &expirationTimeRenewFlightEndpointScenarioTester{} case "flight_sql": @@ -826,7 +824,6 @@ func (tester *expirationTimeScenarioTester) DoGet(tkt *flight.Ticket, fs flight. func (tester *expirationTimeScenarioTester) ListActions(_ *flight.Empty, stream flight.FlightService_ListActionsServer) error { actions := []string{ flight.CancelFlightInfoActionType, - flight.CloseFlightInfoActionType, flight.RenewFlightEndpointActionType, } @@ -884,23 +881,6 @@ func (tester *expirationTimeScenarioTester) DoAction(cmd *flight.Action, stream return err } return nil - case flight.CloseFlightInfoActionType: - var info flight.FlightInfo - if err := proto.Unmarshal(cmd.Body, &info); err != nil { - return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error()) - } - - for _, ep := range info.Endpoint { - ticket := string(ep.Ticket.Ticket) - index, err := tester.ExtractIndexFromTicket(ticket) - if err != nil { - continue - } - st := tester.statuses[index] - st.closed = true - tester.statuses[index] = st - } - return nil case flight.RenewFlightEndpointActionType: var request flight.RenewFlightEndpointRequest if err := proto.Unmarshal(cmd.Body, &request); err != nil { @@ -1059,7 +1039,6 @@ func (tester *expirationTimeListActionsScenarioTester) RunClient(addr string, op sort.Strings(actionTypeNames) expectedActionTypeNames := []string{ "CancelFlightInfo", - "CloseFlightInfo", "RenewFlightEndpoint", } if !reflect.DeepEqual(actionTypeNames, expectedActionTypeNames) { @@ -1115,42 +1094,6 @@ func (tester *expirationTimeCancelFlightInfoScenarioTester) RunClient(addr strin return nil } -type expirationTimeCloseFlightInfoScenarioTester struct { - expirationTimeScenarioTester -} - -func (tester *expirationTimeCloseFlightInfoScenarioTester) RunClient(addr string, opts ...grpc.DialOption) error { - client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...) - if err != nil { - return err - } - defer client.Close() - - ctx := context.Background() - info, err := client.GetFlightInfo(ctx, &flight.FlightDescriptor{Type: flight.DescriptorCMD, Cmd: []byte("expiration_time")}) - if err != nil { - return err - } - - err = client.CloseFlightInfo(ctx, info) - if err != nil && !errors.Is(err, io.EOF) { - return err - } - for _, ep := range info.Endpoint { - stream, err := client.DoGet(ctx, ep.Ticket) - if err != nil { - return err - } - rdr, err := flight.NewRecordReader(stream) - if err == nil { - rdr.Release() - return fmt.Errorf("DoGet after CloseFlightInfo must be failed") - } - } - - return nil -} - type expirationTimeRenewFlightEndpointScenarioTester struct { expirationTimeScenarioTester }