Skip to content

Commit

Permalink
fix: Replace deprecated grpc.Dial with grpc.NewClient (#745)
Browse files Browse the repository at this point in the history
* Replace usages of grpc.Dial with grpc.NewClient

* Propagate errors in rlp log client streaming

Propagates errors received from the rlp via gRPC when sending requests.
Now that we don't block on dialing the server, we need to handle errors
at the point where requests are sent.

* chore: Make pool tests pass

Fixes the failing pool tests in `plumbing` and `rlp/internal/ingress`
following the change to use `grpc.NewClient` instead of `grpc.Dial`.
This is because they were testing that connections were initiated when a
new doppler was added. However, with the change to NewClient,
connections won't be made until the resulting grpc client connections
are used for RPC. This also means that dopplers may be added to the pool
map when in the past they would not be added due to the connection on
Dial failing.

One approach to getting the existing tests to pass would have been to
call `Connect` on the grpc client connections to force them to leave
idle mode. However, `Connect` is experimental, and really gRPC is
encouraging us not to care about the connection state when creating
client connections.

To fix the tests we ended up just asserting on the pool size. One
downside of this was that we couldn't see a nice way to assert that
`Close` was called on the gRPC client connection when `Close` was called
for a doppler address. We could have replaced the connection creation
with an interface and mocked that but it didn't seem worth it.

---------

Signed-off-by: Andrew Crump <andrew.crump@broadcom.com>
Signed-off-by: Carson Long <12767276+ctlong@users.noreply.github.com>
Signed-off-by: Rebecca Roberts <rebecca.roberts@broadcom.com>
  • Loading branch information
ctlong authored Apr 17, 2024
1 parent ed1b5fe commit a5b858b
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 168 deletions.
8 changes: 4 additions & 4 deletions src/integration_tests/fakes/doppler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func DopplerEgressV1Client(addr string) (func(), plumbing.DopplerClient) {
)
Expect(err).ToNot(HaveOccurred())

out, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
out, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
return func() {
_ = out.Close()
Expand All @@ -37,7 +37,7 @@ func DopplerEgressV2Client(addr string) (func(), loggregator_v2.EgressClient) {
)
Expect(err).ToNot(HaveOccurred())

out, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
out, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
return func() {
_ = out.Close()
Expand All @@ -53,7 +53,7 @@ func DopplerIngressV1Client(addr string) (func(), plumbing.DopplerIngestor_Pushe
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
client := plumbing.NewDopplerIngestorClient(conn)

Expand Down Expand Up @@ -91,7 +91,7 @@ func DopplerIngressV2Client(addr string) (func(), loggregator_v2.Ingress_SenderC
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
client := loggregator_v2.NewIngressClient(conn)

Expand Down
2 changes: 1 addition & 1 deletion src/metricemitter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewClient(addr string, opts ...ClientOption) (*Client, error) {
opt(client)
}

conn, err := grpc.Dial(addr, client.dialOpts...)
conn, err := grpc.NewClient(addr, client.dialOpts...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion src/plumbing/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *Pool) connectToDoppler(addr string) {
for {
log.Printf("adding doppler %s", addr)

conn, err := grpc.Dial(addr, p.dialOpts...)
conn, err := grpc.NewClient(addr, p.dialOpts...)
if err != nil {
// TODO: We don't yet understand how this could happen, we should.
// TODO: Replace with exponential backoff.
Expand All @@ -79,3 +79,9 @@ func (p *Pool) connectToDoppler(addr string) {
return
}
}

// Size reports how many doppler entries the pool currently holds. The count
// is read while holding the pool's mutex.
func (p *Pool) Size() int {
	p.mu.Lock()
	n := len(p.dopplers)
	p.mu.Unlock()

	return n
}
78 changes: 15 additions & 63 deletions src/plumbing/pool_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package plumbing_test

import (
"net"

"code.cloudfoundry.org/loggregator-release/src/plumbing"

"golang.org/x/net/context"
Expand All @@ -16,62 +14,33 @@ import (
var _ = Describe("Pool", func() {
var (
pool *plumbing.Pool

listeners []net.Listener
servers []*grpc.Server
)

BeforeEach(func() {
pool = plumbing.NewPool(grpc.WithTransportCredentials(insecure.NewCredentials()))
})

AfterEach(func() {
for _, lis := range listeners {
lis.Close()
}
listeners = nil

for _, server := range servers {
server.Stop()
}
servers = nil
})

Describe("Register() & Close()", func() {
var (
lis1, lis2 net.Listener
accepter1, accepter2 chan bool
)

Describe("Register()", func() {
BeforeEach(func() {
lis1, accepter1 = accepter(startListener("127.0.0.1:0"))
lis2, accepter2 = accepter(startListener("127.0.0.1:0"))
listeners = append(listeners, lis1, lis2)
pool.RegisterDoppler("192.0.2.10:8080")
pool.RegisterDoppler("192.0.2.11:8080")
})

Describe("Register()", func() {
It("fills pool with connections to each doppler", func() {
pool.RegisterDoppler(lis1.Addr().String())
pool.RegisterDoppler(lis2.Addr().String())

Eventually(accepter1).Should(HaveLen(1))
Eventually(accepter2).Should(HaveLen(1))
})
It("adds entries to the pool", func() {
Eventually(pool.Size).Should(Equal(2))
})
})

Describe("Close()", func() {
BeforeEach(func() {
pool.RegisterDoppler(lis1.Addr().String())
})

It("stops the gRPC connections", func() {
pool.Close(lis1.Addr().String())
lis1.Close()
Describe("Close()", func() {
BeforeEach(func() {
pool.RegisterDoppler("192.0.2.10:8080")
pool.RegisterDoppler("192.0.2.11:8080")
})

// Drain the channel
Eventually(accepter1, 5).ShouldNot(Receive())
Consistently(accepter1).Should(HaveLen(0))
})
It("removes entries from the pool", func() {
Eventually(pool.Size).Should(Equal(2))
pool.Close("192.0.2.11:8080")
Eventually(pool.Size).Should(Equal(1))
})
})

Expand Down Expand Up @@ -173,20 +142,3 @@ func fetchRx(
Eventually(f).ShouldNot(HaveOccurred())
return rx
}

// accepter starts a goroutine that accepts connections on lis until Accept
// returns an error. Every accepted connection is signalled by sending true on
// the returned channel, which is buffered to 100 entries. The listener is
// handed back unchanged so callers can capture both values in one assignment.
func accepter(lis net.Listener) (net.Listener, chan bool) {
	accepted := make(chan bool, 100)

	acceptLoop := func() {
		// Accepted connections are collected here and never closed by this
		// helper; presumably this keeps them referenced for as long as the
		// goroutine runs.
		var held []net.Conn
		for {
			conn, err := lis.Accept()
			if err != nil {
				// Accept failed (for example the listener was closed): stop.
				return
			}

			held = append(held, conn) //nolint: staticcheck
			accepted <- true
		}
	}
	go acceptLoop()

	return lis, accepted
}
8 changes: 4 additions & 4 deletions src/rlp-gateway/internal/ingress/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ type LogClient struct {

// NewLogClient creates a gRPC client for the logs provider and returns a new log client.
func NewLogClient(creds credentials.TransportCredentials, logsProviderAddr string) *LogClient {
conn, err := grpc.Dial(logsProviderAddr,
conn, err := grpc.NewClient(logsProviderAddr,
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)),
grpc.WithBlock(),
)
if err != nil {
log.Fatalf("failed to dial logs provider: %s", err)
Expand All @@ -34,13 +33,14 @@ func NewLogClient(creds credentials.TransportCredentials, logsProviderAddr strin
}

// Stream opens a new stream on the log client.
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
receiver, err := c.c.BatchedReceiver(ctx, req)
if err != nil {
log.Printf("failed to open stream from logs provider: %s", err)
return nil, err
}

return receiver.Recv
return receiver.Recv, nil
}

func (c *LogClient) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion src/rlp-gateway/internal/web/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Receiver func() (*loggregator_v2.EnvelopeBatch, error)
// LogsProvider defines the interface for opening streams to the
// logs provider
type LogsProvider interface {
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) Receiver
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (Receiver, error)
}

// Handler defines a struct for serving http endpoints
Expand Down
1 change: 1 addition & 0 deletions src/rlp-gateway/internal/web/json_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
errCounterNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_counter_name", "counter.name is invalid without value")
errGaugeNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_gauge_name", "gauge.name is invalid without value")
errStreamingUnsupported = newJSONError(http.StatusInternalServerError, "streaming_unsupported", "request does not support streaming")
errStreamingUnavailable = newJSONError(http.StatusServiceUnavailable, "streaming_unavailable", "streaming is temporarily unavailable")
errNotFound = newJSONError(http.StatusNotFound, "not_found", "resource not found")
)

Expand Down
18 changes: 11 additions & 7 deletions src/rlp-gateway/internal/web/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ func ReadHandler(
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

flusher.Flush()

recv := lp.Stream(
recv, err := lp.Stream(
ctx,
&loggregator_v2.EgressBatchRequest{
ShardId: query.Get("shard_id"),
Expand All @@ -62,6 +56,16 @@ func ReadHandler(
Selectors: s,
},
)
if err != nil {
errStreamingUnavailable.Write(w)
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

flusher.Flush()

data := make(chan *loggregator_v2.EnvelopeBatch)
errs := make(chan error, 1)
Expand Down
54 changes: 39 additions & 15 deletions src/rlp-gateway/internal/web/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ = Describe("Read", func() {

BeforeEach(func() {
lp = newStubLogsProvider()
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
lp.resp = &loggregator_v2.EnvelopeBatch{
Batch: []*loggregator_v2.Envelope{
{
SourceId: "source-id-a",
Expand Down Expand Up @@ -150,7 +150,7 @@ var _ = Describe("Read", func() {
})

It("contains zero values for gauge metrics", func() {
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
lp.resp = &loggregator_v2.EnvelopeBatch{
Batch: []*loggregator_v2.Envelope{
{
SourceId: "source-id-a",
Expand Down Expand Up @@ -310,9 +310,28 @@ var _ = Describe("Read", func() {
}).Should(Equal(io.EOF))
})

It("returns service unavailable when unable to stream from the logs provider", func() {
lp.streamErr = errors.New("streaming unavailable")
req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
Expect(err).ToNot(HaveOccurred())

req = req.WithContext(ctx)

resp, err := server.Client().Do(req)
Expect(err).ToNot(HaveOccurred())
body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

Expect(resp.StatusCode).To(Equal(http.StatusServiceUnavailable))
Expect(body).To(MatchJSON(`{
"error": "streaming_unavailable",
"message": "streaming is temporarily unavailable"
}`))
})

It("closes the SSE stream if the envelope stream returns any error", func() {
lp._batchResponse = nil
lp._errorResponse = errors.New("an error")
lp.resp = nil
lp.respErr = errors.New("an error")

req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -379,38 +398,43 @@ var _ = Describe("Read", func() {
})

type stubLogsProvider struct {
mu sync.Mutex
_requests []*loggregator_v2.EgressBatchRequest
_batchResponse *loggregator_v2.EnvelopeBatch
_errorResponse error
block bool
mu sync.Mutex
reqs []*loggregator_v2.EgressBatchRequest
resp *loggregator_v2.EnvelopeBatch
respErr error
block bool
streamErr error
}

// newStubLogsProvider returns a pointer to a zero-valued stubLogsProvider.
func newStubLogsProvider() *stubLogsProvider {
	return new(stubLogsProvider)
}

func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
s.mu.Lock()
defer s.mu.Unlock()
s._requests = append(s._requests, req)
s.reqs = append(s.reqs, req)

if s.streamErr != nil {
return nil, s.streamErr
}

return func() (*loggregator_v2.EnvelopeBatch, error) {
if s.block {
var block chan int
<-block
}

return s._batchResponse, s._errorResponse
}
return s.resp, s.respErr
}, nil
}

func (s *stubLogsProvider) requests() []*loggregator_v2.EgressBatchRequest {
s.mu.Lock()
defer s.mu.Unlock()

result := make([]*loggregator_v2.EgressBatchRequest, len(s._requests))
copy(result, s._requests)
result := make([]*loggregator_v2.EgressBatchRequest, len(s.reqs))
copy(result, s.reqs)

return result
}
Expand Down
2 changes: 1 addition & 1 deletion src/rlp/app/rlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func setupRLPClient(egressAddr string, testCerts *testservers.TestCerts) (loggre
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(
conn, err := grpc.NewClient(
egressAddr,
grpc.WithTransportCredentials(ingressTLSCredentials),
)
Expand Down
8 changes: 7 additions & 1 deletion src/rlp/internal/ingress/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p *Pool) fetchClient(clients []unsafe.Pointer) loggregator_v2.EgressClient

func (p *Pool) connectToDoppler(addr string, clients []unsafe.Pointer, idx int) {
for {
conn, err := grpc.Dial(addr, p.dialOpts...)
conn, err := grpc.NewClient(addr, p.dialOpts...)
if err != nil {
// TODO: We don't yet understand how this could happen, we should.
// TODO: Replace with exponential backoff.
Expand All @@ -123,3 +123,9 @@ func (p *Pool) connectToDoppler(addr string, clients []unsafe.Pointer, idx int)
return
}
}

// Size returns the number of entries currently stored in the pool's dopplers
// collection. The pool's mutex is held while the length is read.
func (p *Pool) Size() int {
	p.mu.Lock()
	count := len(p.dopplers)
	p.mu.Unlock()

	return count
}
Loading

0 comments on commit a5b858b

Please sign in to comment.