From f90ef88fe4da9f77723adf5af2d49d45d40f45a3 Mon Sep 17 00:00:00 2001 From: Kirill Sysoev Date: Sun, 7 Apr 2024 14:15:43 +0800 Subject: [PATCH] Moves throttiling logic from backend to middleware --- backend.go | 16 ---------------- examples/http_backend/main.go | 1 + middleware/request/throttler.go | 22 ++++++++++++++++++++++ 3 files changed, 23 insertions(+), 16 deletions(-) create mode 100644 middleware/request/throttler.go diff --git a/backend.go b/backend.go index d57b086..bbf7ed3 100644 --- a/backend.go +++ b/backend.go @@ -10,18 +10,12 @@ type RequestFactory func(req Request) (*http.Request, error) type HTTPBackend struct { factory RequestFactory client *http.Client - sem chan struct{} } -const ( - MaxConcurrentRequests = 50 -) - func NewBackend(factory RequestFactory) *HTTPBackend { return &HTTPBackend{ factory: factory, client: &http.Client{}, - sem: make(chan struct{}, MaxConcurrentRequests), } } @@ -31,17 +25,7 @@ func (b *HTTPBackend) Handle(conn Connection, r Request) error { return err } - ctx := r.Context() - - select { - case <-ctx.Done(): - return ctx.Err() - case b.sem <- struct{}{}: - } - resp, err := b.client.Do(httpReq) - <-b.sem - if err != nil { return err } diff --git a/examples/http_backend/main.go b/examples/http_backend/main.go index 726eb4e..fcc00da 100644 --- a/examples/http_backend/main.go +++ b/examples/http_backend/main.go @@ -47,6 +47,7 @@ func main() { connRegistry := wasabi.NewDefaultConnectionRegistry() dispatcher := wasabi.NewPipeDispatcher(backend) dispatcher.Use(ErrHandler) + dispatcher.Use(request.NewTrottlerMiddleware(10)) server := wasabi.NewServer(Port) channel := wasabi.NewDefaultChannel("/", dispatcher, connRegistry) diff --git a/middleware/request/throttler.go b/middleware/request/throttler.go new file mode 100644 index 0000000..1d86e19 --- /dev/null +++ b/middleware/request/throttler.go @@ -0,0 +1,22 @@ +package request + +import "github.com/ksysoev/wasabi" + +type token struct{} + +func NewTrottlerMiddleware(limit uint) func(next wasabi.RequestHandler) wasabi.RequestHandler { + sem := make(chan token, limit) + + return func(next wasabi.RequestHandler) wasabi.RequestHandler { + return wasabi.RequestHandlerFunc(func(conn wasabi.Connection, req wasabi.Request) error { + + select { + case sem <- token{}: + defer func() { <-sem }() + return next.Handle(conn, req) + case <-conn.Context().Done(): + return conn.Context().Err() + } + }) + } +}