Skip to content

Commit

Permalink
fix(Raw) Fixed count option and ensure records are flushed before exit.
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfeidau committed Nov 10, 2017
1 parent fe96175 commit 27547af
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
15 changes: 11 additions & 4 deletions cmd/kinesis-tail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func processRawData(svc kinesisiface.KinesisAPI, stream string, timeout int64, c

timer1 := time.NewTimer(time.Duration(timeout) * time.Millisecond)

if count > 0 {
logger.WithField("count", count).Debug("waiting for records")
}

LOOP:
for {

Expand All @@ -172,16 +176,19 @@ LOOP:
msgResults = append(msgResults, msg)
}

messageSorter.PushBatch(msgResults)

recordCount = recordCount + len(result.Records)

if count != 0 {
if recordCount == count {
if recordCount >= count {
messageSorter.Flush()

logger.WithField("recordCount", recordCount).Info("reached count exit")
break LOOP
}
}

recordCount++
messageSorter.PushBatch(msgResults)

case <-timer1.C:
logger.Info("timer expired exit")
break LOOP
Expand Down
18 changes: 12 additions & 6 deletions pkg/sorter/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ func (lms *MessageSorter) PushBatch(logMessageBatch []*ktail.LogMessage) bool {
return lms.flushCheck()
}

func (lms *MessageSorter) flushCheck() bool {
lms.current++

if lms.current != lms.batchSize {
return false
}
// Flush force a flush of messages
func (lms *MessageSorter) Flush() {
sort.Sort(ktail.ByTimestamp(lms.cache))

for _, msg := range lms.cache {
Expand All @@ -48,6 +44,16 @@ func (lms *MessageSorter) flushCheck() bool {

lms.cache = []*ktail.LogMessage{}
lms.current = 0
}

func (lms *MessageSorter) flushCheck() bool {
lms.current++

if lms.current != lms.batchSize {
return false
}

lms.Flush()

return true
}

0 comments on commit 27547af

Please sign in to comment.