Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 25, 2023
1 parent 2df8da8 commit 2b6c56c
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 35 deletions.
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ class ExpirationTimeServer : public FlightServerBase {
cancel_status = CancelStatus::kNotCancellable;
}
}
auto cancel_result = CancelFlightInfoResult{cancel_status};
CancelFlightInfoResult cancel_result{cancel_status};
ARROW_ASSIGN_OR_RAISE(auto serialized, cancel_result.SerializeToString());
results.push_back(Result{Buffer::FromString(std::move(serialized))});
} else if (action.type == ActionType::kRenewFlightEndpoint.type) {
Expand Down Expand Up @@ -1653,9 +1653,9 @@ class FlightSqlExtensionScenario : public FlightSqlScenario {
ARROW_RETURN_NOT_OK(ValidateSchema(GetQuerySchema(), *schema));

ARROW_ASSIGN_OR_RAISE(info, sql_client->ExecuteSubstrait({}, kSubstraitPlan));
// TODO: Use CancelQuery() instead of CancelFlightInfo() here
// because some Flight SQL implementations still don't support
// CancelFlightInfo yet.
// TODO: Use CancelFLightInfo() instead of CancelQuery() here. We
// use CancelQuery() here for now because some Flight SQL
// implementations still don't support CancelFlightInfo yet.
ARROW_SUPPRESS_DEPRECATION_WARNING
ARROW_ASSIGN_OR_RAISE(auto cancel_result, sql_client->CancelQuery({}, *info));
ARROW_UNSUPPRESS_DEPRECATION_WARNING
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,6 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {
return Status::OK();
}

// CancelFlightInfoRequest

Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request) {
FlightInfo::Data data;
RETURN_NOT_OK(FromProto(pb_request.info(), &data));
request->info = std::make_unique<FlightInfo>(std::move(data));
return Status::OK();
}

Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* basic_auth) {
basic_auth->password = pb_basic_auth.password();
basic_auth->username = pb_basic_auth.username();
Expand Down Expand Up @@ -297,6 +287,16 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
return Status::OK();
}

// CancelFlightInfoRequest

Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request) {
FlightInfo::Data data;
RETURN_NOT_OK(FromProto(pb_request.info(), &data));
request->info = std::make_unique<FlightInfo>(std::move(data));
return Status::OK();
}

Status ToProto(const CancelFlightInfoRequest& request,
pb::CancelFlightInfoRequest* pb_request) {
RETURN_NOT_OK(ToProto(*request.info, pb_request->mutable_info()));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/sql/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
/// \brief Explicitly cancel a FlightInfo.
///
/// \param[in] options RPC-layer hints for this call.
/// \param[in] info The FlightInfo to cancel.
/// \param[in] request The CancelFlightInfoRequest.
/// \return Arrow result with a canceled result.
::arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ arrow::Result<CancelFlightInfoResult> CancelFlightInfoResult::Deserialize(
google::protobuf::io::ArrayInputStream input(serialized.data(),
static_cast<int>(serialized.size()));
if (!pb_result.ParseFromZeroCopyStream(&input)) {
return Status::Invalid("Not a valid ActionCancelFlightInfoResult");
return Status::Invalid("Not a valid CancelFlightInfoResult");
}
CancelFlightInfoResult out;
RETURN_NOT_OK(internal::FromProto(pb_result, &out));
Expand Down
3 changes: 3 additions & 0 deletions format/FlightSql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,9 @@ message ActionCancelQueryRequest {
* The result of cancelling a query.
*
* The result should be wrapped in a google.protobuf.Any message.
*
* This command is deprecated since 13.0.0. Use the "CancelFlightInfo"
* action with DoAction instead.
*/
message ActionCancelQueryResult {
option deprecated = true;
Expand Down
18 changes: 5 additions & 13 deletions go/arrow/flight/flightsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,25 +977,17 @@ func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightServi
case flight.CancelFlightInfoActionType:
var (
request flight.CancelFlightInfoRequest
result flight.CancelFlightInfoResult
err error
result flight.CancelFlightInfoResult
err error
)

if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal CancelFlightInfoRequest for CancelFlightInfo: %s", err.Error())
}

if cancel, ok := f.srv.(cancelQueryServer); ok {
cancelResult, err := cancel.CancelQuery(stream.Context(), &cancelQueryRequest{request.Info})
if err != nil {
return err
}
result.Status = cancelResultToCancelStatus(cancelResult)
} else {
result, err = f.srv.CancelFlightInfo(stream.Context(), &request)
if err != nil {
return err
}
result, err = f.srv.CancelFlightInfo(stream.Context(), &request)
if err != nil {
return err
}

out := &pb.Result{}
Expand Down
6 changes: 0 additions & 6 deletions go/arrow/internal/flight_integration/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,6 @@ type expirationTimeEndpointStatus struct {
expirationTime *time.Time
numGets uint32
cancelled bool
closed bool
}

type expirationTimeScenarioTester struct {
Expand Down Expand Up @@ -734,7 +733,6 @@ func (tester *expirationTimeScenarioTester) AppendGetFlightInfo(endpoints []*fli
expirationTime: expirationTime,
numGets: 0,
cancelled: false,
closed: false,
}
return endpoints
}
Expand Down Expand Up @@ -781,10 +779,6 @@ func (tester *expirationTimeScenarioTester) DoGet(tkt *flight.Ticket, fs flight.
return err
}
st := tester.statuses[index]
if st.closed {
return status.Errorf(codes.InvalidArgument,
"Invalid flight: closed: %s", ticket)
}
if st.cancelled {
return status.Errorf(codes.InvalidArgument,
"Invalid flight: cancelled: %s", ticket)
Expand Down

0 comments on commit 2b6c56c

Please sign in to comment.