Skip to content

Commit

Permalink
Add support for CancelQuery() method
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 23, 2023
1 parent 28cd13b commit 5d10166
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions go/arrow/flight/flightsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ type ActionCancelQueryRequest interface {
GetInfo() *flight.FlightInfo
}

type cancelQueryRequest struct {
info *flight.FlightInfo
}

func (c *cancelQueryRequest) GetInfo() *flight.FlightInfo { return c.info }

type cancelQueryServer interface {
CancelQuery(context.Context, ActionCancelQueryRequest) (CancelResult, error)
}

type ActionEndTransactionRequest interface {
GetTransactionId() []byte
GetAction() EndTransactionRequestType
Expand Down Expand Up @@ -937,6 +947,21 @@ func (f *flightSqlServer) ListActions(_ *flight.Empty, stream flight.FlightServi
return nil
}

func cancelResultToCancelStatus(result CancelResult) flight.CancelStatus {
switch result {
case CancelResultUnspecified:
return flight.CancelStatusUnspecified
case CancelResultCancelled:
return flight.CancelStatusCancelled
case CancelResultCancelling:
return flight.CancelStatusCancelling
case CancelResultNotCancellable:
return flight.CancelStatusNotCancellable
default:
return flight.CancelStatusUnspecified
}
}

func cancelStatusToCancelResult(status flight.CancelStatus) CancelResult {
switch status {
case flight.CancelStatusUnspecified:
Expand Down Expand Up @@ -967,8 +992,17 @@ func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightServi
return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelFlightInfo: %s", err.Error())
}

if result, err = f.srv.CancelFlightInfo(stream.Context(), &info); err != nil {
return err
if cancel, ok := f.srv.(cancelQueryServer); ok {
cancelResult, err := cancel.CancelQuery(stream.Context(), &cancelQueryRequest{&info})
if err != nil {
return err
}
result.Status = cancelResultToCancelStatus(cancelResult)
} else {
result, err = f.srv.CancelFlightInfo(stream.Context(), &info)
if err != nil {
return err
}
}

out := &pb.Result{}
Expand Down Expand Up @@ -1081,11 +1115,18 @@ func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightServi
return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelQuery: %s", err)
}

cancelFlightInfoResult, err := f.srv.CancelFlightInfo(stream.Context(), &info)
if err != nil {
return err
if cancel, ok := f.srv.(cancelQueryServer); ok {
result.Result, err = cancel.CancelQuery(stream.Context(), &cancelQueryRequest{&info})
if err != nil {
return err
}
} else {
cancelFlightInfoResult, err := f.srv.CancelFlightInfo(stream.Context(), &info)
if err != nil {
return err
}
result.Result = cancelStatusToCancelResult(cancelFlightInfoResult.Status)
}
result.Result = cancelStatusToCancelResult(cancelFlightInfoResult.Status)

out, err := packActionResult(&result)
if err != nil {
Expand Down

0 comments on commit 5d10166

Please sign in to comment.