Skip to content

Commit

Permalink
Merge pull request #11 from lovoo/AS-576-migrated-tailer
Browse files Browse the repository at this point in the history
AS-576: migrated tailer from antispam
  • Loading branch information
mmreza79 authored Jun 23, 2020
2 parents 6f7e1dc + c7648cb commit 259e256
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 44 deletions.
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO
cloud.google.com/go v0.53.0 h1:MZQCQQaRwOrAcuKjiHWHrgKykt4fZyuwF2dtiG3fGW8=
cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M=
cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc=
cloud.google.com/go v0.56.0 h1:WRz29PgAsVEyPSDHyk+0fpEkwEFyfhHn+JbksT6gIL4=
cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0 h1:xE3CPsOgttP4ACBePh79zTKALtXwn/Edhcr16R5hMWU=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
cloud.google.com/go/bigquery v1.5.0 h1:K2NyuHRuv15ku6eUpe0DQk5ZykPMnSOnvuVf6IHcjaE=
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
Expand Down Expand Up @@ -110,6 +112,7 @@ github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -286,6 +289,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 h1:MsuvTghUPjX762sGLnGsxC3HM
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
90 changes: 61 additions & 29 deletions tailer/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,31 @@ package tailer
import (
"errors"
"fmt"
"log"
"sync"

"github.com/Shopify/sarama"

"github.com/lovoo/goka"
)

type TailMessageHook func(message *sarama.ConsumerMessage)
// TailMessageHook is called on every message being added to the tailer
type TailMessageHook func(item *TailerItem)

// TailerVisiter is called on reverseiterate for every message
type TailerVisiter func(item *TailerItem) error

// TailerItem represents a decoded messagei n the tailer's ring buffer
type TailerItem struct {
// Key is the key of the original message
Key string
// Value is the decoded value. This will not be nil, as the tailer ignores nils
Value interface{}
// Offset is the message's offset
Offset int64
}

// Tailer retrieves the last n messages from a given topic
// Tailer updates messages from a topic and keeps them in a ring buffer for reverse iteration
type Tailer struct {
size int64
topic string
Expand All @@ -23,11 +39,11 @@ type Tailer struct {

m sync.RWMutex
numItems int64
items []*sarama.ConsumerMessage
items []*TailerItem
codec goka.Codec
}

func defaultTailHook(*sarama.ConsumerMessage) {}
func defaultTailHook(item *TailerItem) {}

// Stop the tailer
func (t *Tailer) Stop() {
Expand Down Expand Up @@ -66,7 +82,7 @@ func NewTailer(brokers []string, topic string, size int, codec goka.Codec) (*Tai
}, nil
}

// RegisterConsumeHook registers a TailMessageHook hook
// RegisterConsumeHook sets the callback that will be called on every message being added to the tailer
func (t *Tailer) RegisterConsumeHook(tailHook TailMessageHook) {
t.tailHook = tailHook
}
Expand Down Expand Up @@ -114,22 +130,41 @@ func (t *Tailer) Start() error {

func (t *Tailer) tail(partConsumer sarama.PartitionConsumer) {
for msg := range partConsumer.Messages() {
t.addMessage(msg)
err := t.addMessage(msg)
if err != nil {
log.Printf("Error decoding message: %v", err)
}
}
}

func (t *Tailer) addMessage(msg *sarama.ConsumerMessage) {
func (t *Tailer) addMessage(msg *sarama.ConsumerMessage) error {
t.m.Lock()
defer t.m.Unlock()

t.tailHook(msg)
if msg.Value == nil {
return nil
}

decoded, err := t.codec.Decode(msg.Value)
if err != nil {
return fmt.Errorf("Error decoding message %+v with codec %v: %v", msg, t.codec, err)
}

item := &TailerItem{
Key: string(msg.Key),
Offset: msg.Offset,
Value: decoded,
}

t.tailHook(item)

if t.numItems < int64(t.size) {
t.items = append(t.items, msg)
t.items = append(t.items, item)
} else {
t.items[t.numItems%int64(t.size)] = msg
t.items[t.numItems%int64(t.size)] = item
}
t.numItems++
return nil
}

// EndOffset specifies the largest offset. It is used to tell IterateReverse to
Expand All @@ -139,17 +174,18 @@ const (
)

var (
StopErr = errors.New("Iteration stopped.")
// ErrStop indicates the iteration should be stopped. This can be returned from the iterate-Visitor
ErrStop = errors.New("iteration stopped")
)

// IterateReverse iterates over all items ignoring items having bigger offset than maxOffset
// (or all, if EndOffset is given)
func (t *Tailer) IterateReverse(maxOffset int64, visit func(item interface{}, kafkaOffset int64) error) error {
func (t *Tailer) IterateReverse(maxOffset int64, visit TailerVisiter) error {
t.m.RLock()
defer t.m.RUnlock()

if t.numItems == 0 {
return nil
return fmt.Errorf("kafka topic does not have msg")
}

numIterations := t.numItems
Expand All @@ -166,21 +202,21 @@ func (t *Tailer) IterateReverse(maxOffset int64, visit func(item interface{}, ka
if maxOffset > EndOffset && item.Offset > maxOffset {
continue
}
decoded, err := t.codec.Decode(item.Value)
if err != nil {
return fmt.Errorf("Error decoding message %+v with codec %v: %v", item, t.codec, err)
}
if err := visit(decoded, item.Offset); err != nil {
if err == StopErr {
if err := visit(item); err != nil {
if err == ErrStop {
return nil
} else {
return err
}
return err
}
}
return nil
}

func (t *Tailer) Read(num int64, offset int64) ([]interface{}, error) {
// AllItems indicates all items from the tailer should be read
const AllItems = -1

func (t *Tailer) Read(num int64, offset int64) ([]*TailerItem, error) {
t.m.RLock()
defer t.m.RUnlock()
if offset < 0 {
Expand Down Expand Up @@ -226,20 +262,16 @@ func (t *Tailer) Read(num int64, offset int64) ([]interface{}, error) {
}

// prepare the list with appropriate capacity
items := make([]interface{}, 0, lastIndex-firstIndex)
items := make([]*TailerItem, 0, lastIndex-firstIndex)

//fmt.Printf("num=%d, offset=%d, numItems=%d, first=%d, last=%d, items-len=%d, items-cap=%d\n", num, offset, numItems, firstIndex, lastIndex, len(items), cap(items))

// iterate from lastIndex to firstindex
for idx := lastIndex; idx > firstIndex; idx-- {
// get the real index by shifting + modulo it's size.
// that way we iterate backwards through the ring
ringIdx := (idx + idxShift) % t.size

// decode the element and add it
decoded, err := t.codec.Decode(t.items[ringIdx].Value)
if err != nil {
return nil, fmt.Errorf("Error decoding item while reading from the Tailer: %v", err)
}
items = append(items, decoded)
items = append(items, t.items[ringIdx])
}

return items, nil
Expand Down
29 changes: 14 additions & 15 deletions tailer/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,28 @@ import (
"github.com/lovoo/goka/codec"
)

const allItems = -1

func TestTailer_addMessage(t *testing.T) {
tailer := &Tailer{
tailHook: defaultTailHook,
codec: new(codec.String),
size: 3,
}

// add items until we're full
ensure.True(t, tailer.numItems == 0)
tailer.addMessage(&sarama.ConsumerMessage{Offset: 1})
tailer.addMessage(&sarama.ConsumerMessage{Offset: 1, Value: []byte("asdf")})
ensure.True(t, tailer.numItems == 1)
tailer.addMessage(&sarama.ConsumerMessage{Offset: 2})
tailer.addMessage(&sarama.ConsumerMessage{Offset: 2, Value: []byte("asdf")})
ensure.True(t, tailer.numItems == 2)
tailer.addMessage(&sarama.ConsumerMessage{Offset: 3})
tailer.addMessage(&sarama.ConsumerMessage{Offset: 3, Value: []byte("asdf")})
ensure.True(t, tailer.numItems == 3)
// check correct order
ensure.True(t, tailer.items[0].Offset == 1)
ensure.True(t, tailer.items[1].Offset == 2)
ensure.True(t, tailer.items[2].Offset == 3)

// add one more
tailer.addMessage(&sarama.ConsumerMessage{Offset: 4})
tailer.addMessage(&sarama.ConsumerMessage{Offset: 4, Value: []byte("asdf")})
ensure.True(t, tailer.numItems == 4)
ensure.True(t, tailer.items[0].Offset == 4)
ensure.True(t, tailer.items[1].Offset == 2)
Expand All @@ -53,15 +52,15 @@ func TestTailer_Read(t *testing.T) {
ensure.DeepEqual(t, len(items), len(expectedResults), "case", caseNum)

for idx, expected := range expectedResults {
ensure.DeepEqual(t, items[idx].(string), expected, "case", caseNum)
ensure.DeepEqual(t, items[idx].Value.(string), expected, "case", caseNum)
}
}

// read on empty tailer
testRead(0, 0, []string{}, "1")
testRead(1, 0, []string{}, "2")
testRead(1, 1, []string{}, "3")
testRead(allItems, 0, []string{}, "4")
testRead(AllItems, 0, []string{}, "4")

// add one message, and test again
tailer.addMessage(&sarama.ConsumerMessage{Offset: 1, Value: []byte("1")})
Expand All @@ -71,7 +70,7 @@ func TestTailer_Read(t *testing.T) {
testRead(1, 1, []string{}, "7")
testRead(2, 0, []string{"1"}, "8")
testRead(1, 1, []string{}, "9")
testRead(allItems, 0, []string{"1"}, "10")
testRead(AllItems, 0, []string{"1"}, "10")

// add more than the tailer can save and test again
tailer.addMessage(&sarama.ConsumerMessage{Offset: 2, Value: []byte("2")})
Expand All @@ -82,9 +81,9 @@ func TestTailer_Read(t *testing.T) {
testRead(1, 1, []string{"3"}, "13")
testRead(2, 0, []string{"4", "3"}, "14")
testRead(1, 3, []string{}, "15")
testRead(allItems, 0, []string{"4", "3", "2"}, "16")
testRead(allItems, 1, []string{"3", "2"}, "17")
testRead(allItems, 2, []string{"2"}, "18")
testRead(AllItems, 0, []string{"4", "3", "2"}, "16")
testRead(AllItems, 1, []string{"3", "2"}, "17")
testRead(AllItems, 2, []string{"2"}, "18")
}

func TestTailer_IterateReverse(t *testing.T) {
Expand All @@ -96,15 +95,15 @@ func TestTailer_IterateReverse(t *testing.T) {

iterateWithOffset := func(offset int) []string {
var read []string
tailer.IterateReverse(int64(offset), func(item interface{}, kafkaOffset int64) error {
read = append(read, item.(string))
tailer.IterateReverse(int64(offset), func(item *TailerItem) error {
read = append(read, item.Value.(string))
return nil
})
return read
}

iterate := func() []string {
return iterateWithOffset(allItems)
return iterateWithOffset(AllItems)
}

var offset int64
Expand Down

0 comments on commit 259e256

Please sign in to comment.