From c7648cb89881cf587b8974870ce4f5efda952bfc Mon Sep 17 00:00:00 2001 From: Mohsin Reza Date: Tue, 23 Jun 2020 09:35:06 +0200 Subject: [PATCH] AS-576: migrated tailer from antispam --- go.sum | 4 ++ tailer/tailer.go | 90 +++++++++++++++++++++++++++++-------------- tailer/tailer_test.go | 29 +++++++------- 3 files changed, 79 insertions(+), 44 deletions(-) diff --git a/go.sum b/go.sum index 7be9cfa..32fc5f7 100644 --- a/go.sum +++ b/go.sum @@ -10,11 +10,13 @@ cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO cloud.google.com/go v0.53.0 h1:MZQCQQaRwOrAcuKjiHWHrgKykt4fZyuwF2dtiG3fGW8= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc= +cloud.google.com/go v0.56.0 h1:WRz29PgAsVEyPSDHyk+0fpEkwEFyfhHn+JbksT6gIL4= cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0 h1:xE3CPsOgttP4ACBePh79zTKALtXwn/Edhcr16R5hMWU= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= +cloud.google.com/go/bigquery v1.5.0 h1:K2NyuHRuv15ku6eUpe0DQk5ZykPMnSOnvuVf6IHcjaE= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= @@ -110,6 +112,7 @@ github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -286,6 +289,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 h1:MsuvTghUPjX762sGLnGsxC3HM golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/tailer/tailer.go b/tailer/tailer.go index 86a888b..b0f67a3 100644 --- a/tailer/tailer.go +++ b/tailer/tailer.go @@ -3,15 +3,31 @@ package tailer import ( "errors" "fmt" + "log" "sync" "github.com/Shopify/sarama" + "github.com/lovoo/goka" ) -type TailMessageHook func(message *sarama.ConsumerMessage) +// TailMessageHook is called on every message being added to the tailer +type TailMessageHook func(item *TailerItem) + +// TailerVisiter is called on reverseiterate for every message +type TailerVisiter func(item *TailerItem) error + +// TailerItem represents a decoded message in the tailer's ring buffer +type TailerItem struct { + // Key is the key of the original message + Key string + // Value is the decoded 
value. This will not be nil, as the tailer ignores nils + Value interface{} + // Offset is the message's offset + Offset int64 +} -// Tailer retrieves the last n messages from a given topic +// Tailer updates messages from a topic and keeps them in a ring buffer for reverse iteration type Tailer struct { size int64 topic string @@ -23,11 +39,11 @@ type Tailer struct { m sync.RWMutex numItems int64 - items []*sarama.ConsumerMessage + items []*TailerItem codec goka.Codec } -func defaultTailHook(*sarama.ConsumerMessage) {} +func defaultTailHook(item *TailerItem) {} // Stop the tailer func (t *Tailer) Stop() { @@ -66,7 +82,7 @@ func NewTailer(brokers []string, topic string, size int, codec goka.Codec) (*Tai }, nil } -// RegisterConsumeHook registers a TailMessageHook hook +// RegisterConsumeHook sets the callback that will be called on every message being added to the tailer func (t *Tailer) RegisterConsumeHook(tailHook TailMessageHook) { t.tailHook = tailHook } @@ -114,22 +130,41 @@ func (t *Tailer) Start() error { func (t *Tailer) tail(partConsumer sarama.PartitionConsumer) { for msg := range partConsumer.Messages() { - t.addMessage(msg) + err := t.addMessage(msg) + if err != nil { + log.Printf("Error decoding message: %v", err) + } } } -func (t *Tailer) addMessage(msg *sarama.ConsumerMessage) { +func (t *Tailer) addMessage(msg *sarama.ConsumerMessage) error { t.m.Lock() defer t.m.Unlock() - t.tailHook(msg) + if msg.Value == nil { + return nil + } + + decoded, err := t.codec.Decode(msg.Value) + if err != nil { + return fmt.Errorf("Error decoding message %+v with codec %v: %v", msg, t.codec, err) + } + + item := &TailerItem{ + Key: string(msg.Key), + Offset: msg.Offset, + Value: decoded, + } + + t.tailHook(item) if t.numItems < int64(t.size) { - t.items = append(t.items, msg) + t.items = append(t.items, item) } else { - t.items[t.numItems%int64(t.size)] = msg + t.items[t.numItems%int64(t.size)] = item } t.numItems++ + return nil } // EndOffset specifies the largest 
offset. It is used to tell IterateReverse to @@ -139,17 +174,18 @@ const ( ) var ( - StopErr = errors.New("Iteration stopped.") + // ErrStop indicates the iteration should be stopped. This can be returned from the iterate-Visitor + ErrStop = errors.New("iteration stopped") ) // IterateReverse iterates over all items ignoring items having bigger offset than maxOffset // (or all, if EndOffset is given) -func (t *Tailer) IterateReverse(maxOffset int64, visit func(item interface{}, kafkaOffset int64) error) error { +func (t *Tailer) IterateReverse(maxOffset int64, visit TailerVisiter) error { t.m.RLock() defer t.m.RUnlock() if t.numItems == 0 { - return nil + return fmt.Errorf("kafka topic does not have msg") } numIterations := t.numItems @@ -166,21 +202,21 @@ func (t *Tailer) IterateReverse(maxOffset int64, visit func(item interface{}, ka if maxOffset > EndOffset && item.Offset > maxOffset { continue } - decoded, err := t.codec.Decode(item.Value) - if err != nil { - return fmt.Errorf("Error decoding message %+v with codec %v: %v", item, t.codec, err) - } - if err := visit(decoded, item.Offset); err != nil { - if err == StopErr { + if err := visit(item); err != nil { + if err == ErrStop { return nil + } else { + return err } - return err } } return nil } -func (t *Tailer) Read(num int64, offset int64) ([]interface{}, error) { +// AllItems indicates all items from the tailer should be read +const AllItems = -1 + +func (t *Tailer) Read(num int64, offset int64) ([]*TailerItem, error) { t.m.RLock() defer t.m.RUnlock() if offset < 0 { @@ -226,20 +262,16 @@ func (t *Tailer) Read(num int64, offset int64) ([]interface{}, error) { } // prepare the list with appropriate capacity - items := make([]interface{}, 0, lastIndex-firstIndex) + items := make([]*TailerItem, 0, lastIndex-firstIndex) + + //fmt.Printf("num=%d, offset=%d, numItems=%d, first=%d, last=%d, items-len=%d, items-cap=%d\n", num, offset, numItems, firstIndex, lastIndex, len(items), cap(items)) // iterate from 
lastIndex to firstindex for idx := lastIndex; idx > firstIndex; idx-- { // get the real index by shifting + modulo it's size. // that way we iterate backwards through the ring ringIdx := (idx + idxShift) % t.size - - // decode the element and add it - decoded, err := t.codec.Decode(t.items[ringIdx].Value) - if err != nil { - return nil, fmt.Errorf("Error decoding item while reading from the Tailer: %v", err) - } - items = append(items, decoded) + items = append(items, t.items[ringIdx]) } return items, nil diff --git a/tailer/tailer_test.go b/tailer/tailer_test.go index de99023..bc84e51 100644 --- a/tailer/tailer_test.go +++ b/tailer/tailer_test.go @@ -8,21 +8,20 @@ import ( "github.com/lovoo/goka/codec" ) -const allItems = -1 - func TestTailer_addMessage(t *testing.T) { tailer := &Tailer{ tailHook: defaultTailHook, + codec: new(codec.String), size: 3, } // add items until we're full ensure.True(t, tailer.numItems == 0) - tailer.addMessage(&sarama.ConsumerMessage{Offset: 1}) + tailer.addMessage(&sarama.ConsumerMessage{Offset: 1, Value: []byte("asdf")}) ensure.True(t, tailer.numItems == 1) - tailer.addMessage(&sarama.ConsumerMessage{Offset: 2}) + tailer.addMessage(&sarama.ConsumerMessage{Offset: 2, Value: []byte("asdf")}) ensure.True(t, tailer.numItems == 2) - tailer.addMessage(&sarama.ConsumerMessage{Offset: 3}) + tailer.addMessage(&sarama.ConsumerMessage{Offset: 3, Value: []byte("asdf")}) ensure.True(t, tailer.numItems == 3) // check correct order ensure.True(t, tailer.items[0].Offset == 1) @@ -30,7 +29,7 @@ func TestTailer_addMessage(t *testing.T) { ensure.True(t, tailer.items[2].Offset == 3) // add one more - tailer.addMessage(&sarama.ConsumerMessage{Offset: 4}) + tailer.addMessage(&sarama.ConsumerMessage{Offset: 4, Value: []byte("asdf")}) ensure.True(t, tailer.numItems == 4) ensure.True(t, tailer.items[0].Offset == 4) ensure.True(t, tailer.items[1].Offset == 2) @@ -53,7 +52,7 @@ func TestTailer_Read(t *testing.T) { ensure.DeepEqual(t, len(items), 
len(expectedResults), "case", caseNum) for idx, expected := range expectedResults { - ensure.DeepEqual(t, items[idx].(string), expected, "case", caseNum) + ensure.DeepEqual(t, items[idx].Value.(string), expected, "case", caseNum) } } @@ -61,7 +60,7 @@ func TestTailer_Read(t *testing.T) { testRead(0, 0, []string{}, "1") testRead(1, 0, []string{}, "2") testRead(1, 1, []string{}, "3") - testRead(allItems, 0, []string{}, "4") + testRead(AllItems, 0, []string{}, "4") // add one message, and test again tailer.addMessage(&sarama.ConsumerMessage{Offset: 1, Value: []byte("1")}) @@ -71,7 +70,7 @@ func TestTailer_Read(t *testing.T) { testRead(1, 1, []string{}, "7") testRead(2, 0, []string{"1"}, "8") testRead(1, 1, []string{}, "9") - testRead(allItems, 0, []string{"1"}, "10") + testRead(AllItems, 0, []string{"1"}, "10") // add more than the tailer can save and test again tailer.addMessage(&sarama.ConsumerMessage{Offset: 2, Value: []byte("2")}) @@ -82,9 +81,9 @@ func TestTailer_Read(t *testing.T) { testRead(1, 1, []string{"3"}, "13") testRead(2, 0, []string{"4", "3"}, "14") testRead(1, 3, []string{}, "15") - testRead(allItems, 0, []string{"4", "3", "2"}, "16") - testRead(allItems, 1, []string{"3", "2"}, "17") - testRead(allItems, 2, []string{"2"}, "18") + testRead(AllItems, 0, []string{"4", "3", "2"}, "16") + testRead(AllItems, 1, []string{"3", "2"}, "17") + testRead(AllItems, 2, []string{"2"}, "18") } func TestTailer_IterateReverse(t *testing.T) { @@ -96,15 +95,15 @@ func TestTailer_IterateReverse(t *testing.T) { iterateWithOffset := func(offset int) []string { var read []string - tailer.IterateReverse(int64(offset), func(item interface{}, kafkaOffset int64) error { - read = append(read, item.(string)) + tailer.IterateReverse(int64(offset), func(item *TailerItem) error { + read = append(read, item.Value.(string)) return nil }) return read } iterate := func() []string { - return iterateWithOffset(allItems) + return iterateWithOffset(AllItems) } var offset int64