diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index ea75c22f14d43..a1d0e3266b4e6 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -732,52 +732,16 @@ class GrpcClientImpl : public internal::ClientTransport { std::unique_ptr auth_handler) override { auth_handler_ = std::move(auth_handler); ClientRpc rpc(options); - std::shared_ptr< - ::grpc::ClientReaderWriter> - stream = stub_->Handshake(&rpc.context); - GrpcClientAuthSender outgoing{stream}; - GrpcClientAuthReader incoming{stream}; - RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming)); - // Explicitly close our side of the connection - bool finished_writes = stream->WritesDone(); - if (!finished_writes) { - return MakeFlightError(FlightStatusCode::Internal, - "Could not finish writing before closing"); - } - // Drain the read side, as otherwise gRPC Finish() will hang. We - // only call Finish() when the client closes the writer or the - // reader finishes, so it's OK to assume the client no longer - // wants to read and drain the read side. - pb::HandshakeResponse response; - while (stream->Read(&response)) { - } - RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context)); - return Status::OK(); + return AuthenticateInternal(rpc); } arrow::Result> AuthenticateBasicToken( const FlightCallOptions& options, const std::string& username, const std::string& password) override { - // Add basic auth headers to outgoing headers. ClientRpc rpc(options); + // Add basic auth headers to outgoing headers. AddBasicAuthHeaders(&rpc.context, username, password); - std::shared_ptr< - ::grpc::ClientReaderWriter> - stream = stub_->Handshake(&rpc.context); - // Explicitly close our side of the connection. - bool finished_writes = stream->WritesDone(); - if (!finished_writes) { - return MakeFlightError(FlightStatusCode::Internal, - "Could not finish writing before closing"); - } - // Drain the read side, as otherwise gRPC Finish() will hang. We - // only call Finish() when the client closes the writer or the - // reader finishes, so it's OK to assume the client no longer - // wants to read and drain the read side. - pb::HandshakeResponse response; - while (stream->Read(&response)) { - } - RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context)); + RETURN_NOT_OK(AuthenticateInternal(rpc)); // Grab bearer token from incoming headers. return GetBearerTokenHeader(rpc.context); } @@ -907,6 +871,32 @@ class GrpcClientImpl : public internal::ClientTransport { } private: + Status AuthenticateInternal(ClientRpc& rpc) { + std::shared_ptr< + ::grpc::ClientReaderWriter> + stream = stub_->Handshake(&rpc.context); + if (auth_handler_) { + GrpcClientAuthSender outgoing{stream}; + GrpcClientAuthReader incoming{stream}; + RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming)); + } + // Explicitly close our side of the connection + bool finished_writes = stream->WritesDone(); + if (!finished_writes) { + return MakeFlightError(FlightStatusCode::Internal, + "Could not finish writing before closing"); + } + // Drain the read side, as otherwise gRPC Finish() will hang. We + // only call Finish() when the client closes the writer or the + // reader finishes, so it's OK to assume the client no longer + // wants to read and drain the read side. + pb::HandshakeResponse response; + while (stream->Read(&response)) { + } + RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context)); + return Status::OK(); + } + std::unique_ptr stub_; std::shared_ptr auth_handler_; #if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS) && \