Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Add Subscription prober (#433) (#481)
Browse files Browse the repository at this point in the history
* Add Subscription prober

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix endpoints informer in cons. KafkaChannel controller

* Fix unittests after adding status prober

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Format and order go imports in cons. channel controller

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Rename import alias and remove unused variable

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Add dispatcher prober test for tesitng a single pod

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Support probing dispatchers for multiple partitions kafka channels

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Update deps

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix conumer handler test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* remove unused hashes from status probing test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Apply review comments and add a prober test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Remove old comment

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix fake status manager

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Return error if IsReady returns an error

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Change probing to be partition based and fix some corner cases of channel deletion

* Change cleanup logic to clean ready subscriptions only

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Remove cleanup to avaid consumers race

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>
  • Loading branch information
devguyio authored Mar 25, 2021
1 parent 19cc51b commit 3d0fa6d
Show file tree
Hide file tree
Showing 51 changed files with 2,374 additions and 857 deletions.
2 changes: 2 additions & 0 deletions config/channel/consolidated/deployments/dispatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ spec:
template:
metadata:
labels:
# Do not change. Used by the controller for probing.
messaging.knative.dev/channel: kafka-channel
# Do not change. Used by the controller for probing.
messaging.knative.dev/role: dispatcher
kafka.eventing.knative.dev/release: devel
spec:
Expand Down
16 changes: 2 additions & 14 deletions config/channel/consolidated/roles/controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ rules:
- apiGroups:
- "" # Core API group.
resources:
- services
- configmaps
- secrets
verbs:
Expand All @@ -51,6 +50,7 @@ rules:
- "" # Core API group.
resources:
- services
- serviceaccounts
verbs: &everything
- get
- list
Expand All @@ -74,18 +74,11 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- "" # Core API group.
resources:
- endpoints
- pods
verbs:
- get
- list
Expand All @@ -96,11 +89,6 @@ rules:
- deployments
- deployments/status
verbs: *everything
- apiGroups:
- "" # Core API group.
resources:
- serviceaccounts
verbs: *everything
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.opencensus.io v0.22.6
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
Expand All @@ -33,7 +34,8 @@ require (
k8s.io/apimachinery v0.19.7
k8s.io/client-go v0.19.7
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
knative.dev/eventing v0.21.0
knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf
knative.dev/pkg v0.0.0-20210216013737-584933f8280b
knative.dev/eventing v0.21.3
knative.dev/hack v0.0.0-20210305150220-f99a25560134
knative.dev/networking v0.0.0-20210304153916-f813b5904943
knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d
)
19 changes: 16 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -512,6 +514,7 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg=
Expand Down Expand Up @@ -690,6 +693,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
Expand Down Expand Up @@ -1040,6 +1044,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200616133436-c1934b75d054/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
Expand All @@ -1048,6 +1053,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818 h1:u2ssHESKr0HP2d1wlnjMKH+V/22Vg1lGCVuXmOYU1qA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1244,13 +1251,19 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6 h1:+WnxoVtG8TMiudHBSEtrVL
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K8Hf8whTseBgJcg=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/eventing v0.21.0 h1:oZoL0lXXslUClNvoi5mLO2YI6zn2C8qg80mz8Y6Vmkg=
knative.dev/eventing v0.21.0/go.mod h1:JjbVEOTJJHqo9CTxbTfrMn018hG8fOr3UfBoCJ7KWaA=
knative.dev/eventing v0.21.3 h1:H4hDHhlyScnU90Ns/qQfC69KCygPHX36Z0EM4i2fAwk=
knative.dev/eventing v0.21.3/go.mod h1:JjbVEOTJJHqo9CTxbTfrMn018hG8fOr3UfBoCJ7KWaA=
knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf h1:u4cY4jr2LYvhoz/1HBWEPsMiLkm0HMdDTfmmw1RE8zE=
knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20210305150220-f99a25560134 h1:lUllAp28TkevQIgWrsjow8ZLnXJy3AraRzGFm/ffD2c=
knative.dev/hack v0.0.0-20210305150220-f99a25560134/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/networking v0.0.0-20210304153916-f813b5904943 h1:EEAnCZzqVoTNNPMYyONXqOD3e/45OPVahA4jm8ET4/g=
knative.dev/networking v0.0.0-20210304153916-f813b5904943/go.mod h1:G+KCelFuLocMrnfayHoxqsFG+IYX4t8To1celZes77k=
knative.dev/pkg v0.0.0-20210215165523-84c98f3c3e7a/go.mod h1:TJSdebQOWX5N2bszohOYVi0H1QtXbtlYLuMghAFBMhY=
knative.dev/pkg v0.0.0-20210216013737-584933f8280b h1:2v+GBBenjPXhtk3KX/YqxCQVBU6o/AaRkkqE58lgFRQ=
knative.dev/pkg v0.0.0-20210216013737-584933f8280b/go.mod h1:TJSdebQOWX5N2bszohOYVi0H1QtXbtlYLuMghAFBMhY=
knative.dev/pkg v0.0.0-20210303192215-8fbab7ebb77b/go.mod h1:TJSdebQOWX5N2bszohOYVi0H1QtXbtlYLuMghAFBMhY=
knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d h1:2Uc3qyLRLIYOqJrGGKFkJc69X+cxlhoH3jk7p4b4KFM=
knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d/go.mod h1:fP690UCcs5x+qQVhjJxNcm97OWIiUdFC1dqbD3Gsp64=
knative.dev/reconciler-test v0.0.0-20210216030508-77f50054d024/go.mod h1:RP/K5xJylB72Go6eAsXYEsQHp4zCCNMNjmsqhvq7wko=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
Expand Down
84 changes: 84 additions & 0 deletions pkg/channel/consolidated/dispatcher/consumer_message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatcher

import (
"context"
"errors"

"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"go.uber.org/zap"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
)

type consumerMessageHandler struct {
logger *zap.SugaredLogger
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
consumerGroup string
}

var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil)

func (c consumerMessageHandler) GetConsumerGroup() string {
return c.consumerGroup
}

func (c consumerMessageHandler) SetReady(partition int32, ready bool) {
c.kafkaSubscription.SetReady(c.sub.UID, partition, ready)
}

func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
defer func() {
if r := recover(); r != nil {
c.logger.Warn("Panic happened while handling a message",
zap.String("topic", consumerMessage.Topic),
zap.Any("panic value", r),
)
}
}()
message := protocolkafka.NewMessageFromConsumerMessage(consumerMessage)
if message.ReadEncoding() == binding.EncodingUnknown {
return false, errors.New("received a message with unknown encoding")
}

c.logger.Debug("Going to dispatch the message",
zap.String("topic", consumerMessage.Topic),
zap.String("subscription", c.sub.String()),
)

ctx, span := tracing.StartTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

_, err := c.dispatcher.DispatchMessageWithRetries(
ctx,
message,
nil,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
)

// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
Loading

0 comments on commit 3d0fa6d

Please sign in to comment.