Skip to content

Commit

Permalink
Moves concurrency limit to backend level and optimize backend respons…
Browse files Browse the repository at this point in the history
…e handler
  • Loading branch information
ksysoev committed Jun 1, 2024
1 parent 8bab7df commit 4fa4d46
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 13 deletions.
34 changes: 29 additions & 5 deletions backend/http.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package backend

import (
"bytes"
"fmt"
"io"
"net/http"
"time"

"github.com/ksysoev/wasabi"
)

const defaultTimeout = 30 * time.Second
const defaultMaxReqPerHost = 50

// HTTPBackend represents an HTTP backend for handling requests.
type HTTPBackend struct {
Expand All @@ -18,6 +20,7 @@ type HTTPBackend struct {

type httpBackendConfig struct {
defaultTimeout time.Duration
maxReqPerHost int
}

type HTTPBackendOption func(*httpBackendConfig)
Expand All @@ -26,6 +29,7 @@ type HTTPBackendOption func(*httpBackendConfig)
func NewBackend(factory RequestFactory, options ...HTTPBackendOption) *HTTPBackend {
httpBackendConfig := &httpBackendConfig{
defaultTimeout: defaultTimeout,
maxReqPerHost: defaultMaxReqPerHost,
}

for _, option := range options {
Expand All @@ -36,10 +40,15 @@ func NewBackend(factory RequestFactory, options ...HTTPBackendOption) *HTTPBacke
factory: factory,
client: &http.Client{
Timeout: httpBackendConfig.defaultTimeout,
Transport: &http.Transport{
MaxConnsPerHost: httpBackendConfig.maxReqPerHost,
},
},
}
}

// Handle handles the incoming request by sending it to the HTTP server and returning the response.
// It takes a connection and a request as parameters and returns an error if any.
func (b *HTTPBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {
httpReq, err := b.factory(r)
if err != nil {
Expand All @@ -55,18 +64,33 @@ func (b *HTTPBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {

defer resp.Body.Close()

respBody := bytes.NewBuffer(make([]byte, 0))
length := 0
if resp.ContentLength > 0 {
length = int(resp.ContentLength)
} else {
return fmt.Errorf("response content length is unknown")
}

_, err = respBody.ReadFrom(resp.Body)
if err != nil {
body := make([]byte, length)
_, err = resp.Body.Read(body)

if err != nil && err != io.EOF {
return err
}

return conn.Send(wasabi.MsgTypeText, respBody.Bytes())
return conn.Send(wasabi.MsgTypeText, body)
}

// WithTimeout sets the default timeout for the HTTP client.
func WithTimeout(timeout time.Duration) HTTPBackendOption {
return func(cfg *httpBackendConfig) {
cfg.defaultTimeout = timeout
}
}

// WithMaxRequestsPerHost sets the maximum number of requests per host.
func WithMaxRequestsPerHost(maxReqPerHost int) HTTPBackendOption {
return func(cfg *httpBackendConfig) {
cfg.maxReqPerHost = maxReqPerHost
}
}
17 changes: 17 additions & 0 deletions backend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,20 @@ func TestHTTPBackend_Handle_TimeoutRequestByContext(t *testing.T) {
t.Errorf("Expected error to be %v, but got %v", context.DeadlineExceeded, err)
}
}
func TestWithMaxRequestsPerHost(t *testing.T) {
maxReqPerHost := 100

backend := NewBackend(nil, WithMaxRequestsPerHost(maxReqPerHost))

if backend.client.Transport.(*http.Transport).MaxConnsPerHost != maxReqPerHost {
t.Errorf("Expected MaxConnsPerHost to be %v, but got %v", maxReqPerHost, backend.client.Transport.(*http.Transport).MaxConnsPerHost)
}
}

func TestWithMaxRequestsPerHost_DefaultValue(t *testing.T) {
backend := NewBackend(nil)

if backend.client.Transport.(*http.Transport).MaxConnsPerHost != defaultMaxReqPerHost {
t.Errorf("Expected MaxConnsPerHost to be %v, but got %v", defaultMaxReqPerHost, backend.client.Transport.(*http.Transport).MaxConnsPerHost)
}
}
8 changes: 0 additions & 8 deletions examples/http_backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ksysoev/wasabi/backend"
"github.com/ksysoev/wasabi/channel"
"github.com/ksysoev/wasabi/dispatch"
"github.com/ksysoev/wasabi/middleware/request"
"github.com/ksysoev/wasabi/server"
)

Expand All @@ -31,16 +30,9 @@ func main() {
return httpReq, nil
})

ErrHandler := request.NewErrorHandlingMiddleware(func(conn wasabi.Connection, req wasabi.Request, err error) error {
conn.Send(wasabi.MsgTypeText, []byte("Failed to process request: "+err.Error()))
return nil
})

dispatcher := dispatch.NewRouterDispatcher(backend, func(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) wasabi.Request {
return dispatch.NewRawRequest(conn.Context(), msgType, data)
})
dispatcher.Use(ErrHandler)
dispatcher.Use(request.NewTrottlerMiddleware(100))

channel := channel.NewChannel("/", dispatcher, channel.NewConnectionRegistry(), channel.WithOriginPatterns("*"))

Expand Down

0 comments on commit 4fa4d46

Please sign in to comment.