From 871763ff0f368abe45c19bc9adfe8ee5febcc55c Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Mon, 6 Nov 2023 20:44:36 +0000
Subject: [PATCH] Bump google.golang.org/grpc from 1.57.0 to 1.57.1

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.57.0 to 1.57.1.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.57.0...v1.57.1)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
---
 go.mod                                        |  2 +-
 go.sum                                        |  4 +-
 .../grpc/internal/transport/http2_server.go   | 11 +--
 vendor/google.golang.org/grpc/server.go       | 69 +++++++++++++------
 vendor/google.golang.org/grpc/version.go      |  2 +-
 vendor/modules.txt                            |  2 +-
 6 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/go.mod b/go.mod
index 4fe4351..f6f40e6 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,6 @@ require (
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/api v0.134.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230731193218-e0aa005b6bdf // indirect
-	google.golang.org/grpc v1.57.0 // indirect
+	google.golang.org/grpc v1.57.1 // indirect
 	google.golang.org/protobuf v1.31.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 3d5902f..cbaf4e7 100644
--- a/go.sum
+++ b/go.sum
@@ -180,8 +180,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
 google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
 google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
-google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
-google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
+google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg=
+google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index f960640..0bc7a7d 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -171,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
 		ID:  http2.SettingMaxFrameSize,
 		Val: http2MaxFrameLen,
 	}}
-	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
-	// permitted in the HTTP2 spec.
-	maxStreams := config.MaxStreams
-	if maxStreams == 0 {
-		maxStreams = math.MaxUint32
-	} else {
+	if config.MaxStreams != math.MaxUint32 {
 		isettings = append(isettings, http2.Setting{
 			ID:  http2.SettingMaxConcurrentStreams,
-			Val: maxStreams,
+			Val: config.MaxStreams,
 		})
 	}
 	dynamicWindow := true
@@ -258,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
 		framer:            framer,
 		readerDone:        make(chan struct{}),
 		writerDone:        make(chan struct{}),
-		maxStreams:        maxStreams,
+		maxStreams:        config.MaxStreams,
 		inTapHandle:       config.InTapHandle,
 		fc:                &trInFlow{limit: uint32(icwz)},
 		state:             reachable,
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index e076ec7..b44c7e8 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -115,12 +115,6 @@ type serviceInfo struct {
 	mdata       interface{}
 }
 
-type serverWorkerData struct {
-	st     transport.ServerTransport
-	wg     *sync.WaitGroup
-	stream *transport.Stream
-}
-
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
 	opts serverOptions
@@ -145,7 +139,7 @@ type Server struct {
 	channelzID *channelz.Identifier
 	czData     *channelzData
 
-	serverWorkerChannel chan *serverWorkerData
+	serverWorkerChannel chan func()
 }
 
 type serverOptions struct {
@@ -178,6 +172,7 @@ type serverOptions struct {
 }
 
 var defaultServerOptions = serverOptions{
+	maxConcurrentStreams:  math.MaxUint32,
 	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 	connectionTimeout:     120 * time.Second,
@@ -389,6 +384,9 @@ func MaxSendMsgSize(m int) ServerOption {
 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 // of concurrent streams to each ServerTransport.
 func MaxConcurrentStreams(n uint32) ServerOption {
+	if n == 0 {
+		n = math.MaxUint32
+	}
 	return newFuncServerOption(func(o *serverOptions) {
 		o.maxConcurrentStreams = n
 	})
@@ -590,24 +588,19 @@ const serverWorkerResetThreshold = 1 << 16
 // [1] https://github.com/golang/go/issues/18138
 func (s *Server) serverWorker() {
 	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
-		data, ok := <-s.serverWorkerChannel
+		f, ok := <-s.serverWorkerChannel
 		if !ok {
 			return
 		}
-		s.handleSingleStream(data)
+		f()
 	}
 	go s.serverWorker()
 }
 
-func (s *Server) handleSingleStream(data *serverWorkerData) {
-	defer data.wg.Done()
-	s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-}
-
 // initServerWorkers creates worker goroutines and a channel to process incoming
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
-	s.serverWorkerChannel = make(chan *serverWorkerData)
+	s.serverWorkerChannel = make(chan func())
 	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 		go s.serverWorker()
 	}
@@ -966,21 +959,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close(errors.New("finished serving streams for the server transport"))
 	var wg sync.WaitGroup
 
+	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
+
+		streamQuota.acquire()
+		f := func() {
+			defer streamQuota.release()
+			defer wg.Done()
+			s.handleStream(st, stream, s.traceInfo(st, stream))
+		}
+
 		if s.opts.numServerWorkers > 0 {
-			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
 			select {
-			case s.serverWorkerChannel <- data:
+			case s.serverWorkerChannel <- f:
 				return
 			default:
 				// If all stream workers are busy, fallback to the default code path.
 			}
 		}
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
+		go f()
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx
@@ -2075,3 +2073,32 @@ func validateSendCompressor(name, clientCompressors string) error {
 	}
 	return fmt.Errorf("client does not support compressor %q", name)
 }
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+	n    int64 // accessed atomically
+	wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+	if atomic.AddInt64(&q.n, -1) < 0 {
+		// We ran out of quota.  Block until a release happens.
+		<-q.wait
+	}
+}
+
+func (q *atomicSemaphore) release() {
+	// N.B. the "<= 0" check below should allow for this to work with multiple
+	// concurrent calls to acquire, but also note that with synchronous calls to
+	// acquire, as our system does, n will never be less than -1.  There are
+	// fairness issues (queuing) to consider if this was to be generalized.
+	if atomic.AddInt64(&q.n, -1) <= 0 {
+		// An acquire was waiting on us.  Unblock it.
+		q.wait <- struct{}{}
+	}
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+	return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)}
+}
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 353cfd5..12a5a9d 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
 package grpc
 
 // Version is the current grpc version.
-const Version = "1.57.0"
+const Version = "1.57.1"
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 81d0b29..991cd41 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -85,7 +85,7 @@ google.golang.org/api/internal/third_party/uritemplates
 google.golang.org/genproto/googleapis/rpc/code
 google.golang.org/genproto/googleapis/rpc/errdetails
 google.golang.org/genproto/googleapis/rpc/status
-# google.golang.org/grpc v1.57.0
+# google.golang.org/grpc v1.57.1
 ## explicit; go 1.17
 google.golang.org/grpc
 google.golang.org/grpc/attributes