Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Replace deprecated grpc.Dial with grpc.NewClient #745

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/integration_tests/fakes/doppler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func DopplerEgressV1Client(addr string) (func(), plumbing.DopplerClient) {
)
Expect(err).ToNot(HaveOccurred())

out, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
out, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
return func() {
_ = out.Close()
Expand All @@ -37,7 +37,7 @@ func DopplerEgressV2Client(addr string) (func(), loggregator_v2.EgressClient) {
)
Expect(err).ToNot(HaveOccurred())

out, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
out, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
return func() {
_ = out.Close()
Expand All @@ -53,7 +53,7 @@ func DopplerIngressV1Client(addr string) (func(), plumbing.DopplerIngestor_Pushe
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
client := plumbing.NewDopplerIngestorClient(conn)

Expand Down Expand Up @@ -91,7 +91,7 @@ func DopplerIngressV2Client(addr string) (func(), loggregator_v2.Ingress_SenderC
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
Expect(err).ToNot(HaveOccurred())
client := loggregator_v2.NewIngressClient(conn)

Expand Down
2 changes: 1 addition & 1 deletion src/metricemitter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewClient(addr string, opts ...ClientOption) (*Client, error) {
opt(client)
}

conn, err := grpc.Dial(addr, client.dialOpts...)
conn, err := grpc.NewClient(addr, client.dialOpts...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion src/plumbing/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *Pool) connectToDoppler(addr string) {
for {
log.Printf("adding doppler %s", addr)

conn, err := grpc.Dial(addr, p.dialOpts...)
conn, err := grpc.NewClient(addr, p.dialOpts...)
if err != nil {
// TODO: We don't yet understand how this could happen, we should.
// TODO: Replace with exponential backoff.
Expand All @@ -79,3 +79,9 @@ func (p *Pool) connectToDoppler(addr string) {
return
}
}

func (p *Pool) Size() int {
p.mu.Lock()
defer p.mu.Unlock()
return len(p.dopplers)
}
78 changes: 15 additions & 63 deletions src/plumbing/pool_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package plumbing_test

import (
"net"

"code.cloudfoundry.org/loggregator-release/src/plumbing"

"golang.org/x/net/context"
Expand All @@ -16,62 +14,33 @@ import (
var _ = Describe("Pool", func() {
var (
pool *plumbing.Pool

listeners []net.Listener
servers []*grpc.Server
)

BeforeEach(func() {
pool = plumbing.NewPool(grpc.WithTransportCredentials(insecure.NewCredentials()))
})

AfterEach(func() {
for _, lis := range listeners {
lis.Close()
}
listeners = nil

for _, server := range servers {
server.Stop()
}
servers = nil
})

Describe("Register() & Close()", func() {
var (
lis1, lis2 net.Listener
accepter1, accepter2 chan bool
)

Describe("Register()", func() {
BeforeEach(func() {
lis1, accepter1 = accepter(startListener("127.0.0.1:0"))
lis2, accepter2 = accepter(startListener("127.0.0.1:0"))
listeners = append(listeners, lis1, lis2)
pool.RegisterDoppler("192.0.2.10:8080")
pool.RegisterDoppler("192.0.2.11:8080")
})

Describe("Register()", func() {
It("fills pool with connections to each doppler", func() {
pool.RegisterDoppler(lis1.Addr().String())
pool.RegisterDoppler(lis2.Addr().String())

Eventually(accepter1).Should(HaveLen(1))
Eventually(accepter2).Should(HaveLen(1))
})
It("adds entries to the pool", func() {
Eventually(pool.Size).Should(Equal(2))
})
})

Describe("Close()", func() {
BeforeEach(func() {
pool.RegisterDoppler(lis1.Addr().String())
})

It("stops the gRPC connections", func() {
pool.Close(lis1.Addr().String())
lis1.Close()
Describe("Close()", func() {
BeforeEach(func() {
pool.RegisterDoppler("192.0.2.10:8080")
pool.RegisterDoppler("192.0.2.11:8080")
})

// Drain the channel
Eventually(accepter1, 5).ShouldNot(Receive())
Consistently(accepter1).Should(HaveLen(0))
})
It("removes entries from the pool", func() {
Eventually(pool.Size).Should(Equal(2))
pool.Close("192.0.2.11:8080")
Eventually(pool.Size).Should(Equal(1))
})
})

Expand Down Expand Up @@ -173,20 +142,3 @@ func fetchRx(
Eventually(f).ShouldNot(HaveOccurred())
return rx
}

func accepter(lis net.Listener) (net.Listener, chan bool) {
c := make(chan bool, 100)
go func() {
var dontGC []net.Conn
for {
conn, err := lis.Accept()
if err != nil {
return
}

dontGC = append(dontGC, conn) //nolint: staticcheck
c <- true
}
}()
return lis, c
}
8 changes: 4 additions & 4 deletions src/rlp-gateway/internal/ingress/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ type LogClient struct {

// NewClient dials the logs provider and returns a new log client.
func NewLogClient(creds credentials.TransportCredentials, logsProviderAddr string) *LogClient {
conn, err := grpc.Dial(logsProviderAddr,
conn, err := grpc.NewClient(logsProviderAddr,
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)),
grpc.WithBlock(),
)
if err != nil {
log.Fatalf("failed to dial logs provider: %s", err)
Expand All @@ -34,13 +33,14 @@ func NewLogClient(creds credentials.TransportCredentials, logsProviderAddr strin
}

// Stream opens a new stream on the log client.
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
func (c *LogClient) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
receiver, err := c.c.BatchedReceiver(ctx, req)
if err != nil {
log.Printf("failed to open stream from logs provider: %s", err)
return nil, err
}

return receiver.Recv
return receiver.Recv, nil
}

func (c *LogClient) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion src/rlp-gateway/internal/web/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Receiver func() (*loggregator_v2.EnvelopeBatch, error)
// LogsProvder defines the interface for opening streams to the
// logs provider
type LogsProvider interface {
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) Receiver
Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (Receiver, error)
}

// Handler defines a struct for servering http endpoints
Expand Down
1 change: 1 addition & 0 deletions src/rlp-gateway/internal/web/json_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
errCounterNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_counter_name", "counter.name is invalid without value")
errGaugeNamePresentButEmpty = newJSONError(http.StatusBadRequest, "missing_gauge_name", "gauge.name is invalid without value")
errStreamingUnsupported = newJSONError(http.StatusInternalServerError, "streaming_unsupported", "request does not support streaming")
errStreamingUnavailable = newJSONError(http.StatusServiceUnavailable, "streaming_unavailable", "streaming is temporarily unavailable")
errNotFound = newJSONError(http.StatusNotFound, "not_found", "resource not found")
)

Expand Down
18 changes: 11 additions & 7 deletions src/rlp-gateway/internal/web/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ func ReadHandler(
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

flusher.Flush()

recv := lp.Stream(
recv, err := lp.Stream(
ctx,
&loggregator_v2.EgressBatchRequest{
ShardId: query.Get("shard_id"),
Expand All @@ -62,6 +56,16 @@ func ReadHandler(
Selectors: s,
},
)
if err != nil {
errStreamingUnavailable.Write(w)
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

flusher.Flush()

data := make(chan *loggregator_v2.EnvelopeBatch)
errs := make(chan error, 1)
Expand Down
54 changes: 39 additions & 15 deletions src/rlp-gateway/internal/web/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ = Describe("Read", func() {

BeforeEach(func() {
lp = newStubLogsProvider()
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
lp.resp = &loggregator_v2.EnvelopeBatch{
Batch: []*loggregator_v2.Envelope{
{
SourceId: "source-id-a",
Expand Down Expand Up @@ -150,7 +150,7 @@ var _ = Describe("Read", func() {
})

It("contains zero values for gauge metrics", func() {
lp._batchResponse = &loggregator_v2.EnvelopeBatch{
lp.resp = &loggregator_v2.EnvelopeBatch{
Batch: []*loggregator_v2.Envelope{
{
SourceId: "source-id-a",
Expand Down Expand Up @@ -310,9 +310,28 @@ var _ = Describe("Read", func() {
}).Should(Equal(io.EOF))
})

It("returns service unavailable when unable to stream from the logs provider", func() {
lp.streamErr = errors.New("streaming unavailable")
req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
Expect(err).ToNot(HaveOccurred())

req = req.WithContext(ctx)

resp, err := server.Client().Do(req)
Expect(err).ToNot(HaveOccurred())
body, err := io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())

Expect(resp.StatusCode).To(Equal(http.StatusServiceUnavailable))
Expect(body).To(MatchJSON(`{
"error": "streaming_unavailable",
"message": "streaming is temporarily unavailable"
}`))
})

It("closes the SSE stream if the envelope stream returns any error", func() {
lp._batchResponse = nil
lp._errorResponse = errors.New("an error")
lp.resp = nil
lp.respErr = errors.New("an error")

req, err := http.NewRequest(http.MethodGet, server.URL+"/v2/read?log", nil)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -379,38 +398,43 @@ var _ = Describe("Read", func() {
})

type stubLogsProvider struct {
mu sync.Mutex
_requests []*loggregator_v2.EgressBatchRequest
_batchResponse *loggregator_v2.EnvelopeBatch
_errorResponse error
block bool
mu sync.Mutex
reqs []*loggregator_v2.EgressBatchRequest
resp *loggregator_v2.EnvelopeBatch
respErr error
block bool
streamErr error
}

func newStubLogsProvider() *stubLogsProvider {
return &stubLogsProvider{}
}

func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) web.Receiver {
func (s *stubLogsProvider) Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) (web.Receiver, error) {
s.mu.Lock()
defer s.mu.Unlock()
s._requests = append(s._requests, req)
s.reqs = append(s.reqs, req)

if s.streamErr != nil {
return nil, s.streamErr
}

return func() (*loggregator_v2.EnvelopeBatch, error) {
if s.block {
var block chan int
<-block
}

return s._batchResponse, s._errorResponse
}
return s.resp, s.respErr
}, nil
}

func (s *stubLogsProvider) requests() []*loggregator_v2.EgressBatchRequest {
s.mu.Lock()
defer s.mu.Unlock()

result := make([]*loggregator_v2.EgressBatchRequest, len(s._requests))
copy(result, s._requests)
result := make([]*loggregator_v2.EgressBatchRequest, len(s.reqs))
copy(result, s.reqs)

return result
}
Expand Down
2 changes: 1 addition & 1 deletion src/rlp/app/rlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func setupRLPClient(egressAddr string, testCerts *testservers.TestCerts) (loggre
)
Expect(err).ToNot(HaveOccurred())

conn, err := grpc.Dial(
conn, err := grpc.NewClient(
egressAddr,
grpc.WithTransportCredentials(ingressTLSCredentials),
)
Expand Down
8 changes: 7 additions & 1 deletion src/rlp/internal/ingress/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p *Pool) fetchClient(clients []unsafe.Pointer) loggregator_v2.EgressClient

func (p *Pool) connectToDoppler(addr string, clients []unsafe.Pointer, idx int) {
for {
conn, err := grpc.Dial(addr, p.dialOpts...)
conn, err := grpc.NewClient(addr, p.dialOpts...)
if err != nil {
// TODO: We don't yet understand how this could happen, we should.
// TODO: Replace with exponential backoff.
Expand All @@ -123,3 +123,9 @@ func (p *Pool) connectToDoppler(addr string, clients []unsafe.Pointer, idx int)
return
}
}

func (p *Pool) Size() int {
p.mu.Lock()
defer p.mu.Unlock()
return len(p.dopplers)
}
Loading
Loading