diff --git a/channel/connection_registry.go b/channel/connection_registry.go index 46b6bd2..f5119dc 100644 --- a/channel/connection_registry.go +++ b/channel/connection_registry.go @@ -128,3 +128,12 @@ func WithMaxFrameLimit(limit int64) ConnectionRegistryOption { r.frameSizeLimit = limit } } + +// WithConcurrencyLimit sets the maximum number of concurrent requests that can be handled by a connection. +// The default concurrency limit is 25. +// When the concurrency limit is exceeded, the connection stops reading messages until the number of concurrent requests decreases. +func WithConcurrencyLimit(limit uint) ConnectionRegistryOption { + return func(r *ConnectionRegistry) { + r.concurrencyLimit = limit + } +} diff --git a/channel/connection_registry_test.go b/channel/connection_registry_test.go index f3da683..74e6f03 100644 --- a/channel/connection_registry_test.go +++ b/channel/connection_registry_test.go @@ -124,3 +124,16 @@ func TestConnectionRegistry_Shutdown(t *testing.T) { t.Error("Expected registry to be closed") } } +func TestConnectionRegistry_WithConcurrencyLimit(t *testing.T) { + registry := NewConnectionRegistry() + + if registry.concurrencyLimit != concurencyLimitPerConnection { + t.Errorf("Unexpected concurrency limit: got %d, expected %d", registry.concurrencyLimit, concurencyLimitPerConnection) + } + + registry = NewConnectionRegistry(WithConcurrencyLimit(10)) + + if registry.concurrencyLimit != 10 { + t.Errorf("Unexpected concurrency limit: got %d, expected %d", registry.concurrencyLimit, 10) + } +}