Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a metric to track the number of bytes we're sending to Kusto #598

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion ingestor/adx/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (n *uploader) upload(ctx context.Context) error {
}
}

mr := io.MultiReader(readers...)
mr := newCountingReader(io.MultiReader(readers...))

now := time.Now()
if err := n.uploadReader(mr, database, table, mapping); err != nil {
Expand All @@ -310,6 +310,8 @@ func (n *uploader) upload(ctx context.Context) error {
metrics.LogsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
}
}

metrics.IngestorUploadedBytes.WithLabelValues(database, table).Set(float64(mr.Count()))
}()

}
Expand Down Expand Up @@ -388,3 +390,32 @@ type clusterDetails struct {
ClockDescription string `kusto:"ClockDescription"`
RuntimeDescription string `kusto:"RuntimeDescription"`
}

// countingReader wraps an io.Reader and counts bytes read without allocations.
type countingReader struct {
reader io.Reader
count int64
}

// Read implements io.Reader without allocations.
func (c *countingReader) Read(p []byte) (int, error) {
n, err := c.reader.Read(p)
c.count += int64(n)
return n, err
}

// Count returns the total bytes read.
func (c *countingReader) Count() int64 {
return c.count
}

// newCountingReader returns a countingReader wrapping the provided reader.
func newCountingReader(r io.Reader) *countingReader {
return &countingReader{reader: r}
}

// Reset allows reusing countingReader by resetting its internal state.
func (c *countingReader) Reset(r io.Reader) {
c.reader = r
c.count = 0
}
19 changes: 19 additions & 0 deletions ingestor/adx/uploader_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package adx

import (
"bytes"
"context"
"io"
"testing"

"github.com/Azure/adx-mon/pkg/testutils"
Expand Down Expand Up @@ -32,3 +34,20 @@ func TestClusterRequiresDirectIngest(t *testing.T) {
require.NoError(t, err)
require.True(t, requiresDirectIngest)
}

func BenchmarkCountingReader(b *testing.B) {
data := bytes.Repeat([]byte("testdata"), 1024)
reader := bytes.NewReader(data)
countReader := newCountingReader(reader)

b.ReportAllocs()
for i := 0; i < b.N; i++ {
reader.Reset(data)
countReader.Reset(reader)

_, _ = io.Copy(io.Discard, countReader)
if countReader.Count() != int64(len(data)) {
b.Fatalf("expected %d bytes read, got %d", len(data), countReader.Count())
}
}
}
7 changes: 7 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ var (
Help: "Counter of the number of logs uploaded to Kusto",
}, []string{"database", "table"})

IngestorUploadedBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will quite do what we want. This gauge will get its value overwritten every time it uploads a segment to whatever size a given segment is, so we'll get a lot of different values for this gauge that don't really correspond to any consistent view of how much we're uploading.

I think if we want to keep track of the bytes being uploaded, this needs to be a Counter that we always append to. This way we're keeping track of the total as time goes on, and the multiple goroutines that write the metric won't step on each other and overwrite each other's value.

Namespace: Namespace,
Subsystem: "ingestor",
Name: "uploaded_size_bytes",
Help: "Gauge for the size of uploaded data to Kusto in bytes",
}, []string{"database", "table"})

InvalidLogsDropped = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "ingestor",
Expand Down