Skip to content

Commit

Permalink
fix: do not double increment clientpool wg (#4005)
Browse files Browse the repository at this point in the history
* fix: do not double increment clientpool wg

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: verify that client is successfully closed

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: test has no data race

Signed-off-by: Calum Murray <cmurray@redhat.com>

* address review comments

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jul 25, 2024
1 parent 9c96bce commit 961961c
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 6 deletions.
14 changes: 11 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, sec
type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
return nil, err
}

client.incrementCallers()
return client, nil
}

func (cp *ClientPool) getClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (*client, error) {
// (bootstrapServers, secret) uniquely identifies a sarama client config with the options we allow users to configure
key := makeClusterAdminKey(bootstrapServers, secret)

Expand All @@ -76,7 +86,6 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string,
// if a corresponding connection already exists, lets use it
if val, ok := cp.cache.Get(key); ok && val.hasCorrectSecretVersion(secret) {
logger.Debug("successfully got a client from the clientpool")
val.incrementCallers()
return val, nil
}
logger.Debug("failed to get an existing client, going to create one")
Expand Down Expand Up @@ -108,13 +117,12 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string,

logger.Debug("Closing client")

if err := value.client.Close(); !errors.Is(err, sarama.ErrClosedClient) {
if err := value.client.Close(); err != nil && !errors.Is(err, sarama.ErrClosedClient) {
logger.Errorw("Failed to close client", zap.Error(err))
}
}()
})

val.incrementCallers()
return val, nil
}

Expand Down
43 changes: 43 additions & 0 deletions control-plane/pkg/kafka/clientpool/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
Expand Down Expand Up @@ -117,6 +118,48 @@ func TestGetClusterAdmin(t *testing.T) {
cancel()
}

func TestClientCloses(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)

cache := prober.NewLocalExpiringCache[clientKey, *client, struct{}](ctx, time.Second*1)
clientClosed := atomic.NewBool(false)
adminClosed := atomic.NewBool(false)

clients := &ClientPool{
cache: cache,
newSaramaClient: func(_ []string, _ *sarama.Config) (sarama.Client, error) {
return &kafkatesting.MockKafkaClient{OnClose: func() {
clientClosed.Toggle()
}}, nil
},
newClusterAdminFromClient: func(c sarama.Client) (sarama.ClusterAdmin, error) {
return &kafkatesting.MockKafkaClusterAdmin{ExpectedTopics: []string{"topic1"}, OnClose: func() {
c.Close()
adminClosed.Toggle()
}}, nil
},
}

client1, err := clients.GetClient(ctx, []string{"localhost:9092"}, nil)
assert.NoError(t, err)

clusterAdmin, err := clients.GetClusterAdmin(ctx, []string{"localhost:9092"}, nil)
assert.NoError(t, err)

clusterAdmin.Close()
client1.Close()

time.Sleep(time.Second * 2)

// the client should have been closed successfully now
assert.True(t, clientClosed.Load())
assert.True(t, adminClosed.Load())

cancel()
}

func TestMakeClientKey(t *testing.T) {
key1 := makeClusterAdminKey([]string{"localhost:9090", "localhost:9091", "localhost:9092"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}})
key2 := makeClusterAdminKey([]string{"localhost:9092", "localhost:9091", "localhost:9090"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}})
Expand Down
4 changes: 1 addition & 3 deletions control-plane/pkg/kafka/clientpool/clusteradmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func clusterAdminFromClient(saramaClient sarama.Client, makeClusterAdmin kafka.N
return nil, err
}

c.incrementCallers()

return &clusterAdmin{
client: c,
clusterAdmin: ca,
Expand Down Expand Up @@ -291,5 +289,5 @@ func (a *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstan
}

func (a *clusterAdmin) Close() error {
return a.client.Close()
return a.clusterAdmin.Close()
}
5 changes: 5 additions & 0 deletions control-plane/pkg/kafka/testing/admin_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type MockKafkaClusterAdmin struct {

ErrorOnDeleteConsumerGroup error

OnClose func()

T *testing.T
}

Expand Down Expand Up @@ -301,5 +303,8 @@ func (m *MockKafkaClusterAdmin) RemoveMemberFromConsumerGroup(groupId string, gr

func (m *MockKafkaClusterAdmin) Close() error {
m.ExpectedClose = true
if m.OnClose != nil {
m.OnClose()
}
return m.ExpectedCloseError
}
4 changes: 4 additions & 0 deletions control-plane/pkg/kafka/testing/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type MockKafkaClient struct {
ShouldFailRefreshMetadata bool
ShouldFailRefreshBrokers bool
ShouldFailBrokenPipe bool
OnClose func()
}

var _ sarama.Client = &MockKafkaClient{}
Expand Down Expand Up @@ -195,6 +196,9 @@ func (m MockKafkaClient) LeastLoadedBroker() *sarama.Broker {

func (m *MockKafkaClient) Close() error {
m.IsClosed = true
if m.OnClose != nil {
m.OnClose()
}
return m.CloseError
}

Expand Down

0 comments on commit 961961c

Please sign in to comment.