Skip to content

Commit

Permalink
Factor out common Authenticate logic
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 30, 2023
1 parent 0a4b6ad commit fadb6eb
Showing 1 changed file with 29 additions and 39 deletions.
68 changes: 29 additions & 39 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,52 +732,16 @@ class GrpcClientImpl : public internal::ClientTransport {
std::unique_ptr<ClientAuthHandler> auth_handler) override {
auth_handler_ = std::move(auth_handler);
ClientRpc rpc(options);
std::shared_ptr<
::grpc::ClientReaderWriter<pb::HandshakeRequest, pb::HandshakeResponse>>
stream = stub_->Handshake(&rpc.context);
GrpcClientAuthSender outgoing{stream};
GrpcClientAuthReader incoming{stream};
RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
// Explicitly close our side of the connection
bool finished_writes = stream->WritesDone();
if (!finished_writes) {
return MakeFlightError(FlightStatusCode::Internal,
"Could not finish writing before closing");
}
// Drain the read side, as otherwise gRPC Finish() will hang. We
// only call Finish() when the client closes the writer or the
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side.
pb::HandshakeResponse response;
while (stream->Read(&response)) {
}
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
return Status::OK();
return AuthenticateInternal(rpc);
}

arrow::Result<std::pair<std::string, std::string>> AuthenticateBasicToken(
const FlightCallOptions& options, const std::string& username,
const std::string& password) override {
// Add basic auth headers to outgoing headers.
ClientRpc rpc(options);
// Add basic auth headers to outgoing headers.
AddBasicAuthHeaders(&rpc.context, username, password);
std::shared_ptr<
::grpc::ClientReaderWriter<pb::HandshakeRequest, pb::HandshakeResponse>>
stream = stub_->Handshake(&rpc.context);
// Explicitly close our side of the connection.
bool finished_writes = stream->WritesDone();
if (!finished_writes) {
return MakeFlightError(FlightStatusCode::Internal,
"Could not finish writing before closing");
}
// Drain the read side, as otherwise gRPC Finish() will hang. We
// only call Finish() when the client closes the writer or the
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side.
pb::HandshakeResponse response;
while (stream->Read(&response)) {
}
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
RETURN_NOT_OK(AuthenticateInternal(rpc));
// Grab bearer token from incoming headers.
return GetBearerTokenHeader(rpc.context);
}
Expand Down Expand Up @@ -907,6 +871,32 @@ class GrpcClientImpl : public internal::ClientTransport {
}

private:
Status AuthenticateInternal(ClientRpc& rpc) {
std::shared_ptr<
::grpc::ClientReaderWriter<pb::HandshakeRequest, pb::HandshakeResponse>>
stream = stub_->Handshake(&rpc.context);
if (auth_handler_) {
GrpcClientAuthSender outgoing{stream};
GrpcClientAuthReader incoming{stream};
RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
}
// Explicitly close our side of the connection
bool finished_writes = stream->WritesDone();
if (!finished_writes) {
return MakeFlightError(FlightStatusCode::Internal,
"Could not finish writing before closing");
}
// Drain the read side, as otherwise gRPC Finish() will hang. We
// only call Finish() when the client closes the writer or the
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side.
pb::HandshakeResponse response;
while (stream->Read(&response)) {
}
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
return Status::OK();
}

std::unique_ptr<pb::FlightService::Stub> stub_;
std::shared_ptr<ClientAuthHandler> auth_handler_;
#if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS) && \
Expand Down

0 comments on commit fadb6eb

Please sign in to comment.