Skip to content

Commit

Permalink
Fix retried docs to not be in FailedDocs (#170)
Browse files Browse the repository at this point in the history
A document should either be retried or failed in a request.
  • Loading branch information
carsonip authored May 15, 2024
1 parent 5ac128f commit f0b5680
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
9 changes: 8 additions & 1 deletion appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,13 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}
return err
}
var docsFailed, docsIndexed, tooManyRequests, clientFailed, serverFailed int64
var (
docsFailed, docsIndexed,
// breakdown of failed docs:
tooManyRequests, // failed after document retries (if it applies) and final status is 429
clientFailed, // failed after document retries (if it applies) and final status is 400s excluding 429
serverFailed int64 // failed after document retries (if it applies) and final status is 500s
)
docsIndexed = resp.Indexed
var failedCount map[BulkIndexerResponseItem]int
if len(resp.FailedDocs) > 0 {
Expand Down Expand Up @@ -401,6 +407,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
atomic.AddInt64(&a.docsFailed, docsFailed)
}
if resp.RetriedDocs > 0 {
// docs are scheduled to be retried but not yet failed due to retry limit
a.addCount(resp.RetriedDocs,
nil,
a.metrics.docsRetried,
Expand Down
7 changes: 4 additions & 3 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,10 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
}
}

if len(tmp) > 0 {
resp.FailedDocs = tmp
}
// FailedDocs contain responses of
// - non-retriable errors
// - retriable errors that reached the retry limit
resp.FailedDocs = tmp
}

return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestBulkIndexer(t *testing.T) {
stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(0), stat.Indexed)
require.Equal(t, itemCount, len(stat.FailedDocs))
require.Len(t, stat.FailedDocs, 0)
require.Equal(t, int64(itemCount), stat.RetriedDocs)

// all the flushed bytes are now in the buffer again to be retried
Expand Down

0 comments on commit f0b5680

Please sign in to comment.