Skip to content

Commit

Permalink
feat: add uncompressed flushed bytes metric (#161)
Browse files Browse the repository at this point in the history
* feat: track uncompressed bytes
  • Loading branch information
kyungeunni authored Apr 30, 2024
1 parent 934efbe commit 582f708
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 90 deletions.
62 changes: 37 additions & 25 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ var (
// server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests.
type Appender struct {
// legacy metrics for Stats()
bulkRequests int64
docsAdded int64
docsActive int64
docsFailed int64
docsFailedClient int64
docsFailedServer int64
docsIndexed int64
tooManyRequests int64
bytesTotal int64
availableBulkRequests int64
activeCreated int64
activeDestroyed int64
bulkRequests int64
docsAdded int64
docsActive int64
docsFailed int64
docsFailedClient int64
docsFailedServer int64
docsIndexed int64
tooManyRequests int64
bytesTotal int64
bytesUncompressedTotal int64
availableBulkRequests int64
activeCreated int64
activeDestroyed int64

scalingInfo atomic.Value

Expand Down Expand Up @@ -226,19 +227,20 @@ func (a *Appender) Close(ctx context.Context) error {
// Stats returns the bulk indexing stats.
func (a *Appender) Stats() Stats {
return Stats{
Added: atomic.LoadInt64(&a.docsAdded),
Active: atomic.LoadInt64(&a.docsActive),
BulkRequests: atomic.LoadInt64(&a.bulkRequests),
Failed: atomic.LoadInt64(&a.docsFailed),
FailedClient: atomic.LoadInt64(&a.docsFailedClient),
FailedServer: atomic.LoadInt64(&a.docsFailedServer),
Indexed: atomic.LoadInt64(&a.docsIndexed),
TooManyRequests: atomic.LoadInt64(&a.tooManyRequests),
BytesTotal: atomic.LoadInt64(&a.bytesTotal),
AvailableBulkRequests: atomic.LoadInt64(&a.availableBulkRequests),
IndexersActive: a.scalingInformation().activeIndexers,
IndexersCreated: atomic.LoadInt64(&a.activeCreated),
IndexersDestroyed: atomic.LoadInt64(&a.activeDestroyed),
Added: atomic.LoadInt64(&a.docsAdded),
Active: atomic.LoadInt64(&a.docsActive),
BulkRequests: atomic.LoadInt64(&a.bulkRequests),
Failed: atomic.LoadInt64(&a.docsFailed),
FailedClient: atomic.LoadInt64(&a.docsFailedClient),
FailedServer: atomic.LoadInt64(&a.docsFailedServer),
Indexed: atomic.LoadInt64(&a.docsIndexed),
TooManyRequests: atomic.LoadInt64(&a.tooManyRequests),
BytesTotal: atomic.LoadInt64(&a.bytesTotal),
BytesUncompressedTotal: atomic.LoadInt64(&a.bytesUncompressedTotal),
AvailableBulkRequests: atomic.LoadInt64(&a.availableBulkRequests),
IndexersActive: a.scalingInformation().activeIndexers,
IndexersCreated: atomic.LoadInt64(&a.activeCreated),
IndexersDestroyed: atomic.LoadInt64(&a.activeDestroyed),
}
}

Expand Down Expand Up @@ -333,6 +335,11 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
if flushed := bulkIndexer.BytesFlushed(); flushed > 0 {
a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal)
}
// Record the BulkIndexer uncompressed bytes written to the buffer
// as the bytesUncompressedTotal metric after the request has been flushed.
if flushed := bulkIndexer.BytesUncompressedFlushed(); flushed > 0 {
a.addCount(int64(flushed), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal)
}
if err != nil {
atomic.AddInt64(&a.docsFailed, int64(n))
logger.Error("bulk indexing request failed", zap.Error(err))
Expand Down Expand Up @@ -740,6 +747,11 @@ type Stats struct {
// which counts bytes at the transport level.
BytesTotal int64

// BytesUncompressedTotal represents the total number of bytes written to
// the request body before compression.
// The number of bytes written will be equal to BytesTotal if compression is disabled.
BytesUncompressedTotal int64

// AvailableBulkRequests represents the number of bulk indexers
// available for making bulk index requests.
AvailableBulkRequests int64
Expand Down
107 changes: 65 additions & 42 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ import (

func TestAppender(t *testing.T) {
var bytesTotal int64
var bytesUncompressed int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, result := docappendertest.DecodeBulkRequest(r)
_, result, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressed += stat.UncompressedBytes
result.HasErrors = true
// Respond with an error for the first two items, with one indicating
// "too many requests". These will be recorded as failures in indexing
Expand Down Expand Up @@ -128,16 +130,17 @@ loop:
stats := indexer.Stats()
failed := int64(3)
assert.Equal(t, docappender.Stats{
Added: N,
Active: 0,
BulkRequests: 1,
Failed: failed,
FailedClient: 1,
FailedServer: 1,
Indexed: N - failed,
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
Added: N,
Active: 0,
BulkRequests: 1,
Failed: failed,
FailedClient: 1,
FailedServer: 1,
Indexed: N - failed,
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressed,
}, stats)

var rm metricdata.ResourceMetrics
Expand Down Expand Up @@ -187,6 +190,8 @@ loop:
assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
case "elasticsearch.flushed.bytes":
assertCounter(m, stats.BytesTotal, indexerAttrs)
case "elasticsearch.flushed.uncompressed.bytes":
assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs)
case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
// expect this metric name but no assertions done
// as it's histogram and it's checked elsewhere
Expand All @@ -196,7 +201,7 @@ loop:
})

assert.Empty(t, unexpectedMetrics)
assert.Equal(t, int64(6), asserted.Load())
assert.Equal(t, int64(7), asserted.Load())
assert.Equal(t, 4, processedAsserted)
}

Expand Down Expand Up @@ -236,7 +241,8 @@ func TestAppenderAvailableAppenders(t *testing.T) {
err = indexer.Close(context.Background())
require.NoError(t, err)
stats = indexer.Stats()
stats.BytesTotal = 0 // Asserted elsewhere.
stats.BytesTotal = 0 // Asserted elsewhere.
stats.BytesUncompressedTotal = 0 // Asserted elsewhere.
assert.Equal(t, docappender.Stats{
Added: N,
BulkRequests: N,
Expand Down Expand Up @@ -284,9 +290,11 @@ func TestAppenderEncoding(t *testing.T) {

func TestAppenderCompressionLevel(t *testing.T) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, result := docappendertest.DecodeBulkRequest(r)
_, result, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal += stat.UncompressedBytes
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.New(client, docappender.Config{
Expand All @@ -303,14 +311,15 @@ func TestAppenderCompressionLevel(t *testing.T) {
require.NoError(t, err)
stats := indexer.Stats()
assert.Equal(t, docappender.Stats{
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 0,
Indexed: 1,
TooManyRequests: 0,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 0,
Indexed: 1,
TooManyRequests: 0,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}, stats)
}

Expand Down Expand Up @@ -504,8 +513,11 @@ func TestAppenderFlushBytes(t *testing.T) {

func TestAppenderServerError(t *testing.T) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, _, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal += stat.UncompressedBytes
w.WriteHeader(http.StatusInternalServerError)
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
Expand All @@ -519,21 +531,25 @@ func TestAppenderServerError(t *testing.T) {
require.EqualError(t, err, "flush failed: [500 Internal Server Error] ")
stats := indexer.Stats()
assert.Equal(t, docappender.Stats{
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}, stats)
}

func TestAppenderServerErrorTooManyRequests(t *testing.T) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
// Set the r.ContentLength rather than sum it since 429s will be
// retried by the go-elasticsearch transport.
bytesTotal = r.ContentLength
_, _, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal = stat.UncompressedBytes
w.WriteHeader(http.StatusTooManyRequests)
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
Expand All @@ -547,13 +563,14 @@ func TestAppenderServerErrorTooManyRequests(t *testing.T) {
require.EqualError(t, err, "flush failed: [429 Too Many Requests] ")
stats := indexer.Stats()
assert.Equal(t, docappender.Stats{
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}, stats)
}

Expand Down Expand Up @@ -1108,9 +1125,11 @@ func TestAppenderCloseBusyIndexer(t *testing.T) {
// This test ensures that all the channel items are consumed and indexed
// when the indexer is closed.
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, result := docappendertest.DecodeBulkRequest(r)
_, result, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal = stat.UncompressedBytes
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.New(client, docappender.Config{})
Expand All @@ -1125,12 +1144,13 @@ func TestAppenderCloseBusyIndexer(t *testing.T) {
assert.NoError(t, indexer.Close(context.Background()))

assert.Equal(t, docappender.Stats{
Added: N,
Indexed: N,
BulkRequests: 1,
BytesTotal: bytesTotal,
AvailableBulkRequests: 10,
IndexersActive: 0}, indexer.Stats())
Added: N,
Indexed: N,
BulkRequests: 1,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
AvailableBulkRequests: 10,
IndexersActive: 0}, indexer.Stats())
}

func TestAppenderPipeline(t *testing.T) {
Expand Down Expand Up @@ -1252,6 +1272,7 @@ func TestAppenderScaling(t *testing.T) {
waitForScaleDown(t, indexer, 1)
stats := indexer.Stats()
stats.BytesTotal = 0
stats.BytesUncompressedTotal = 0
assert.Equal(t, docappender.Stats{
Active: 0,
Added: docs,
Expand Down Expand Up @@ -1293,6 +1314,7 @@ func TestAppenderScaling(t *testing.T) {

stats := indexer.Stats()
stats.BytesTotal = 0
stats.BytesUncompressedTotal = 0
assert.Equal(t, docappender.Stats{
Active: 0,
Added: docs,
Expand Down Expand Up @@ -1334,6 +1356,7 @@ func TestAppenderScaling(t *testing.T) {
assert.NoError(t, indexer.Close(ctx))
stats := indexer.Stats()
stats.BytesTotal = 0
stats.BytesUncompressedTotal = 0
assert.Equal(t, docappender.Stats{
Active: 0,
Added: docs,
Expand Down
Loading

0 comments on commit 582f708

Please sign in to comment.