Skip to content

Commit 330b472

Browse files
committed
move stats to central place
1 parent 5b79689 commit 330b472

File tree

17 files changed

+204
-293
lines changed

17 files changed

+204
-293
lines changed

pkg/dataobj/internal/dataset/read_stats.go

Lines changed: 0 additions & 41 deletions
This file was deleted.

pkg/dataobj/internal/dataset/reader.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
7575
}
7676
}
7777

78-
r.region.Record(StatReadCalls.Observe(1))
78+
r.region.Record(xcap.StatDatasetReadCalls.Observe(1))
7979
ctx = xcap.ContextWithRegion(ctx, r.region)
8080

8181
// Our Read implementation works by:
@@ -149,8 +149,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
149149
primaryColumnBytes += s[i].Size()
150150
}
151151

152-
r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead)))
153-
r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes))
152+
r.region.Record(xcap.StatPrimaryRowsRead.Observe(int64(rowsRead)))
153+
r.region.Record(xcap.StatPrimaryRowBytes.Observe(primaryColumnBytes))
154154
} else {
155155
rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize])
156156
if err != nil {
@@ -182,8 +182,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
182182
totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes)
183183
}
184184

185-
r.region.Record(StatSecondaryRowsRead.Observe(int64(count)))
186-
r.region.Record(StatSecondaryRowBytes.Observe(totalBytesFilled))
185+
r.region.Record(xcap.StatSecondaryRowsRead.Observe(int64(count)))
186+
r.region.Record(xcap.StatSecondaryRowBytes.Observe(totalBytesFilled))
187187
}
188188

189189
// We only advance r.row after we successfully read and filled rows. This
@@ -268,8 +268,8 @@ func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int,
268268
readSize = passCount
269269
}
270270

271-
r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead)))
272-
r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes))
271+
r.region.Record(xcap.StatPrimaryRowsRead.Observe(int64(rowsRead)))
272+
r.region.Record(xcap.StatPrimaryRowBytes.Observe(primaryColumnBytes))
273273

274274
return rowsRead, passCount, nil
275275
}
@@ -555,11 +555,11 @@ func (r *Reader) initDownloader(ctx context.Context) error {
555555

556556
if primary {
557557
r.primaryColumnIndexes = append(r.primaryColumnIndexes, i)
558-
r.region.Record(StatPrimaryColumns.Observe(1))
559-
r.region.Record(StatPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
558+
r.region.Record(xcap.StatPrimaryColumns.Observe(1))
559+
r.region.Record(xcap.StatPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
560560
} else {
561-
r.region.Record(StatSecondaryColumns.Observe(1))
562-
r.region.Record(StatSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
561+
r.region.Record(xcap.StatSecondaryColumns.Observe(1))
562+
r.region.Record(xcap.StatSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
563563
}
564564
}
565565

@@ -593,8 +593,8 @@ func (r *Reader) initDownloader(ctx context.Context) error {
593593
rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
594594
}
595595

596-
r.region.Record(StatMaxRows.Observe(int64(rowsCount)))
597-
r.region.Record(StatRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))
596+
r.region.Record(xcap.StatMaxRows.Observe(int64(rowsCount)))
597+
r.region.Record(xcap.StatRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))
598598

599599
return nil
600600
}

pkg/dataobj/internal/dataset/reader_downloader.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,13 @@ func (dl *readerDownloader) downloadBatch(ctx context.Context, requestor *reader
232232
if region := xcap.RegionFromContext(ctx); region != nil {
233233
for _, page := range batch {
234234
if page.column.primary {
235-
region.Record(StatPrimaryPagesDownloaded.Observe(1))
236-
region.Record(StatPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
237-
region.Record(StatPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
235+
region.Record(xcap.StatPrimaryPagesDownloaded.Observe(1))
236+
region.Record(xcap.StatPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
237+
region.Record(xcap.StatPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
238238
} else {
239-
region.Record(StatSecondaryPagesDownloaded.Observe(1))
240-
region.Record(StatSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
241-
region.Record(StatSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
239+
region.Record(xcap.StatSecondaryPagesDownloaded.Observe(1))
240+
region.Record(xcap.StatSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
241+
region.Record(xcap.StatSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
242242
}
243243
}
244244
}
@@ -589,13 +589,13 @@ func (page *readerPage) PageDesc() *PageDesc {
589589

590590
func (page *readerPage) ReadPage(ctx context.Context) (PageData, error) {
591591
region := xcap.RegionFromContext(ctx)
592-
region.Record(StatPagesScanned.Observe(1))
592+
region.Record(xcap.StatPagesScanned.Observe(1))
593593
if page.data != nil {
594-
region.Record(StatPagesFoundInCache.Observe(1))
594+
region.Record(xcap.StatPagesFoundInCache.Observe(1))
595595
return page.data, nil
596596
}
597597

598-
region.Record(StatPageDownloadRequests.Observe(1))
598+
region.Record(xcap.StatPageDownloadRequests.Observe(1))
599599
if err := page.column.dl.downloadBatch(ctx, page); err != nil {
600600
return nil, err
601601
}

pkg/dataobj/internal/dataset/reader_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -917,14 +917,14 @@ func Test_Reader_Stats(t *testing.T) {
917917
obsMap[obs.Statistic.Name()] = obs.Value.(int64)
918918
}
919919

920-
require.Equal(t, int64(2), obsMap[StatReadCalls.Name()])
921-
require.Equal(t, int64(2), obsMap[StatPrimaryColumns.Name()])
922-
require.Equal(t, int64(2), obsMap[StatSecondaryColumns.Name()])
923-
require.Equal(t, int64(5), obsMap[StatPrimaryColumnPages.Name()])
924-
require.Equal(t, int64(8), obsMap[StatSecondaryColumnPages.Name()])
925-
926-
require.Equal(t, int64(len(basicReaderTestData)), obsMap[StatMaxRows.Name()])
927-
require.Equal(t, int64(3), obsMap[StatRowsAfterPruning.Name()])
928-
require.Equal(t, int64(3), obsMap[StatPrimaryRowsRead.Name()])
929-
require.Equal(t, int64(1), obsMap[StatSecondaryRowsRead.Name()])
920+
require.Equal(t, int64(2), obsMap[xcap.StatDatasetReadCalls.Name()])
921+
require.Equal(t, int64(2), obsMap[xcap.StatPrimaryColumns.Name()])
922+
require.Equal(t, int64(2), obsMap[xcap.StatSecondaryColumns.Name()])
923+
require.Equal(t, int64(5), obsMap[xcap.StatPrimaryColumnPages.Name()])
924+
require.Equal(t, int64(8), obsMap[xcap.StatSecondaryColumnPages.Name()])
925+
926+
require.Equal(t, int64(len(basicReaderTestData)), obsMap[xcap.StatMaxRows.Name()])
927+
require.Equal(t, int64(3), obsMap[xcap.StatRowsAfterPruning.Name()])
928+
require.Equal(t, int64(3), obsMap[xcap.StatPrimaryRowsRead.Name()])
929+
require.Equal(t, int64(1), obsMap[xcap.StatSecondaryRowsRead.Name()])
930930
}

pkg/dataobj/metastore/object.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,6 @@ const (
3939
metastoreWindowSize = 12 * time.Hour
4040
)
4141

42-
var (
43-
statIndexObjects = xcap.NewStatisticInt64("metastore.index.objects", xcap.AggregationTypeSum)
44-
statResolvedSections = xcap.NewStatisticInt64("metastore.resolved.sections", xcap.AggregationTypeSum)
45-
)
46-
4742
type ObjectMetastore struct {
4843
bucket objstore.Bucket
4944
parallelism int
@@ -193,7 +188,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
193188
}
194189

195190
m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
196-
region.Record(statIndexObjects.Observe(int64(len(indexPaths))))
191+
region.Record(xcap.StatIndexObjects.Observe(int64(len(indexPaths))))
197192

198193
// Return early if no index files are found
199194
if len(indexPaths) == 0 {
@@ -250,7 +245,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
250245
duration := sectionsTimer.ObserveDuration()
251246
m.metrics.resolvedSectionsTotal.Observe(float64(len(streamSectionPointers)))
252247
m.metrics.resolvedSectionsRatio.Observe(float64(len(streamSectionPointers)) / float64(initialSectionPointersCount))
253-
region.Record(statResolvedSections.Observe(int64(len(streamSectionPointers))))
248+
region.Record(xcap.StatResolvedSections.Observe(int64(len(streamSectionPointers))))
254249

255250
level.Debug(utillog.WithContext(ctx, m.logger)).Log(
256251
"msg", "resolved sections",

pkg/dataobj/sections/internal/columnar/decoder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (dec *Decoder) Pages(ctx context.Context, columns []*datasetmd.ColumnDesc)
7979
region := xcap.RegionFromContext(ctx)
8080
startTime := time.Now()
8181
defer func() {
82-
region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
82+
region.Record(xcap.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
8383
}()
8484

8585
ranges := make([]rangeio.Range, 0, len(columns))
@@ -135,7 +135,7 @@ func (dec *Decoder) ReadPages(ctx context.Context, pages []*datasetmd.PageDesc)
135135
region := xcap.RegionFromContext(ctx)
136136
startTime := time.Now()
137137
defer func() {
138-
region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
138+
region.Record(xcap.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
139139
}()
140140

141141
ranges := make([]rangeio.Range, 0, len(pages))

pkg/engine/internal/executor/compat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
8181
return cmp.Compare(a.name, b.name)
8282
})
8383

84-
region.Record(statCompatCollisionFound.Observe(true))
84+
region.Record(xcap.StatCompatCollisionFound.Observe(true))
8585

8686
// Next, update the schema with the new columns that have the _extracted suffix.
8787
newSchema := batch.Schema()

pkg/engine/internal/executor/pipeline.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,17 +337,17 @@ func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error)
337337
start := time.Now()
338338

339339
if p.region != nil {
340-
p.region.Record(statReadCalls.Observe(1))
340+
p.region.Record(xcap.StatReadCalls.Observe(1))
341341
}
342342

343343
rec, err := p.inner.Read(ctx)
344344

345345
if p.region != nil {
346346
if rec != nil {
347-
p.region.Record(statRowsOut.Observe(rec.NumRows()))
347+
p.region.Record(xcap.StatRowsOut.Observe(rec.NumRows()))
348348
}
349349

350-
p.region.Record(statReadDuration.Observe(time.Since(start).Nanoseconds()))
350+
p.region.Record(xcap.StatReadDuration.Observe(time.Since(start).Nanoseconds()))
351351
}
352352

353353
return rec, err

pkg/engine/internal/executor/stats.go

Lines changed: 0 additions & 14 deletions
This file was deleted.

pkg/storage/bucket/xcap_bucket.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,6 @@ import (
99
"github.com/grafana/loki/v3/pkg/xcap"
1010
)
1111

12-
// Statistics for tracking bucket operation counts.
13-
var (
14-
statBucketGet = xcap.NewStatisticInt64("bucket.get", xcap.AggregationTypeSum)
15-
statBucketGetRange = xcap.NewStatisticInt64("bucket.getrange", xcap.AggregationTypeSum)
16-
statBucketIter = xcap.NewStatisticInt64("bucket.iter", xcap.AggregationTypeSum)
17-
statBucketAttributes = xcap.NewStatisticInt64("bucket.attributes", xcap.AggregationTypeSum)
18-
)
19-
2012
// XCapBucket wraps an objstore.Bucket and records request counts to the xcap
2113
// Region found in the context. If no Region is present in the context, the
2214
// wrapper simply delegates to the underlying bucket without recording.
@@ -51,13 +43,13 @@ func (b *XCapBucket) Close() error {
5143

5244
// Iter calls f for each entry in the given directory (not recursive.).
5345
func (b *XCapBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
54-
recordOp(ctx, statBucketIter)
46+
recordOp(ctx, xcap.StatBucketIter)
5547
return b.bkt.Iter(ctx, dir, f, options...)
5648
}
5749

5850
// IterWithAttributes calls f for each entry in the given directory similar to Iter.
5951
func (b *XCapBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
60-
recordOp(ctx, statBucketIter)
52+
recordOp(ctx, xcap.StatBucketIter)
6153
return b.bkt.IterWithAttributes(ctx, dir, f, options...)
6254
}
6355

@@ -68,13 +60,13 @@ func (b *XCapBucket) SupportedIterOptions() []objstore.IterOptionType {
6860

6961
// Get returns a reader for the given object name.
7062
func (b *XCapBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
71-
recordOp(ctx, statBucketGet)
63+
recordOp(ctx, xcap.StatBucketGet)
7264
return b.bkt.Get(ctx, name)
7365
}
7466

7567
// GetRange returns a new range reader for the given object name and range.
7668
func (b *XCapBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
77-
recordOp(ctx, statBucketGetRange)
69+
recordOp(ctx, xcap.StatBucketGetRange)
7870
return b.bkt.GetRange(ctx, name, off, length)
7971
}
8072

@@ -100,7 +92,7 @@ func (b *XCapBucket) IsAccessDeniedErr(err error) bool {
10092

10193
// Attributes returns information about the specified object.
10294
func (b *XCapBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
103-
recordOp(ctx, statBucketAttributes)
95+
recordOp(ctx, xcap.StatBucketAttributes)
10496
return b.bkt.Attributes(ctx, name)
10597
}
10698

0 commit comments

Comments
 (0)