diff --git a/backend.go b/backend.go index 659d26b..d57b086 100644 --- a/backend.go +++ b/backend.go @@ -2,47 +2,56 @@ package wasabi import ( "bytes" - "log/slog" "net/http" ) -type Backend interface { - Handle(conn Connection, r Request) error -} +type RequestFactory func(req Request) (*http.Request, error) type HTTPBackend struct { - endpoint string + factory RequestFactory + client *http.Client + sem chan struct{} } -func NewBackend(endpoint string) *HTTPBackend { - return &HTTPBackend{endpoint: endpoint} +const ( + MaxConcurrentRequests = 50 +) + +func NewBackend(factory RequestFactory) *HTTPBackend { + return &HTTPBackend{ + factory: factory, + client: &http.Client{}, + sem: make(chan struct{}, MaxConcurrentRequests), + } } func (b *HTTPBackend) Handle(conn Connection, r Request) error { - req := string(r.Data()) - body := bytes.NewBufferString(req) - httpReq, err := http.NewRequest("POST", b.endpoint, body) - + httpReq, err := b.factory(r) if err != nil { return err } - httpReq.Header.Set("Content-Type", "application/json") + ctx := r.Context() + + select { + case <-ctx.Done(): + return ctx.Err() + case b.sem <- struct{}{}: + } - client := &http.Client{} - resp, err := client.Do(httpReq) + resp, err := b.client.Do(httpReq) + <-b.sem if err != nil { - slog.Error("Error sending request", "error", err) return err } + defer resp.Body.Close() respBody := bytes.NewBuffer(make([]byte, 0)) _, err = respBody.ReadFrom(resp.Body) if err != nil { - slog.Error("Error reading response body", "error", err) return err } diff --git a/channel.go b/channel.go index abf8ff3..87d9b20 100644 --- a/channel.go +++ b/channel.go @@ -7,11 +7,9 @@ import ( "golang.org/x/net/websocket" ) -// Channel is interface for channels -type Channel interface { - Path() string - SetContext(ctx context.Context) - Handler() http.Handler +// Dispatcher is interface for dispatchers +type Dispatcher interface { + Dispatch(conn Connection, data []byte) } // Middlewere is interface for middlewares diff --git a/cmd/web/main.go b/cmd/web/main.go index 6a66a22..b3ea33c 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -1,8 +1,10 @@ package main import ( + "bytes" "context" "log/slog" + "net/http" "os" "github.com/ksysoev/wasabi" @@ -15,7 +17,18 @@ const ( func main() { slog.LogAttrs(context.Background(), slog.LevelDebug, "") - backend := wasabi.NewBackend("http://localhost:8081") + backend := wasabi.NewBackend(func(req wasabi.Request) (*http.Request, error) { + bodyReader := bytes.NewBufferString(string(req.Data())) + httpReq, err := http.NewRequest("GET", "http://localhost:8080/", bodyReader) + if err != nil { + return nil, err + } + + httpReq.Header.Set("Content-Type", "application/json") + + return httpReq, nil + }) + connRegistry := wasabi.NewDefaultConnectionRegistry() dispatcher := wasabi.NewPipeDispatcher(backend) server := wasabi.NewServer(Port) diff --git a/dispatcher.go b/dispatcher.go index b4f57ca..1a8a2e0 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -2,9 +2,8 @@ package wasabi import "log/slog" -// Dispatcher is interface for dispatchers -type Dispatcher interface { - Dispatch(conn Connection, data []byte) +type Backend interface { + Handle(conn Connection, r Request) error } // PipeDispatcher is a dispatcher that does not support any routing of requests diff --git a/server.go b/server.go index 048e7ce..02cf9b8 100644 --- a/server.go +++ b/server.go @@ -9,6 +9,13 @@ import ( "golang.org/x/exp/slog" ) +// Channel is interface for channels +type Channel interface { + Path() string + SetContext(ctx context.Context) + Handler() http.Handler +} + const ( ReadHeaderTimeout = 3 * time.Second ReadTimeout = 30 * time.Second