feat(v2): metadata string interning (#3744)
kolesnikovae authored Dec 10, 2024
1 parent 1886f2c commit a5ef3f9
Showing 84 changed files with 2,694 additions and 2,221 deletions.
220 changes: 115 additions & 105 deletions api/gen/proto/go/metastore/v1/types.pb.go

Large diffs are not rendered by default.

625 changes: 343 additions & 282 deletions api/gen/proto/go/metastore/v1/types_vtproto.pb.go

Large diffs are not rendered by default.

33 changes: 19 additions & 14 deletions api/metastore/v1/types.proto
@@ -2,31 +2,35 @@ syntax = "proto3";

package metastore.v1;

import "types/v1/types.proto";

message BlockList {
string tenant = 1;
uint32 shard = 2;
repeated string blocks = 3;
}

message BlockMeta {
uint64 format_version = 1;
uint32 format_version = 1;
// Block ID is a unique identifier for the block.
// This is the only field that is not included into
// the string table.
string id = 2;
int64 min_time = 3;
int64 max_time = 4;
uint32 shard = 5;
uint32 compaction_level = 6;
// Optional. Empty if compaction level is 0.
string tenant_id = 7;
repeated Dataset datasets = 8;
int32 tenant = 3;
uint32 shard = 4;
uint32 compaction_level = 5;
int64 min_time = 6;
int64 max_time = 7;
int32 created_by = 8;
uint64 size = 9;
string created_by = 10;
repeated Dataset datasets = 10;
// String table contains strings of the block.
// By convention, the first string is always an empty string.
repeated string string_table = 11;
}

message Dataset {
string tenant_id = 1;
string name = 2;
int32 tenant = 1;
int32 name = 2;
int64 min_time = 3;
int64 max_time = 4;

@@ -44,6 +48,7 @@ message Dataset {

// TODO: Delete. Use labels instead.
// Profile types present in the tenant service data.
repeated string profile_types = 7;
repeated types.v1.Labels labels = 8;
repeated int32 profile_types = 7;
// Length prefixed label key-value pairs.
repeated int32 labels = 8;
}
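
The schema change above replaces inline metadata strings with int32 references into a per-block string_table. A minimal sketch of the interning convention in Go, assuming an append-only table with a reverse-lookup map and the empty string at index 0 (hypothetical helper; the commit's actual MetadataStrings type is not shown in this diff):

package main

import "fmt"

// metadataStrings is a minimal stand-in for a block string interner:
// index 0 always holds the empty string, and Put returns the index of
// an existing entry or appends a new one.
type metadataStrings struct {
	strings []string
	lookup  map[string]int32
}

func newMetadataStrings() *metadataStrings {
	return &metadataStrings{
		strings: []string{""}, // by convention, the first string is empty
		lookup:  map[string]int32{"": 0},
	}
}

// Put interns s and returns its index in the table.
func (t *metadataStrings) Put(s string) int32 {
	if i, ok := t.lookup[s]; ok {
		return i
	}
	i := int32(len(t.strings))
	t.strings = append(t.strings, s)
	t.lookup[s] = i
	return i
}

func main() {
	t := newMetadataStrings()
	tenant := t.Put("tenant-a")  // 1
	name := t.Put("dataset-1")   // 2
	again := t.Put("tenant-a")   // 1, deduplicated
	fmt.Println(tenant, name, again, t.strings)
}

With this scheme, repeated values such as tenant IDs, dataset names, and label strings are stored once per block, and the remaining metadata fields carry only small indices.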
62 changes: 38 additions & 24 deletions api/openapiv2/gen/phlare.swagger.json
@@ -602,19 +602,17 @@
"type": "object",
"properties": {
"formatVersion": {
"type": "string",
"format": "uint64"
"type": "integer",
"format": "int64"
},
"id": {
"type": "string"
},
"minTime": {
"type": "string",
"format": "int64"
"description": "Block ID is a unique identifier for the block.\nThis is the only field that is not included into\nthe string table."
},
"maxTime": {
"type": "string",
"format": "int64"
"tenant": {
"type": "integer",
"format": "int32",
"description": "Optional. Empty if compaction level is 0."
},
"shard": {
"type": "integer",
@@ -624,9 +622,21 @@
"type": "integer",
"format": "int64"
},
"tenantId": {
"minTime": {
"type": "string",
"description": "Optional. Empty if compaction level is 0."
"format": "int64"
},
"maxTime": {
"type": "string",
"format": "int64"
},
"createdBy": {
"type": "integer",
"format": "int32"
},
"size": {
"type": "string",
"format": "uint64"
},
"datasets": {
"type": "array",
Expand All @@ -635,12 +645,12 @@
"$ref": "#/definitions/v1Dataset"
}
},
"size": {
"type": "string",
"format": "uint64"
},
"createdBy": {
"type": "string"
"stringTable": {
"type": "array",
"items": {
"type": "string"
},
"description": "String table contains strings of the block.\nBy convention, the first string is always an empty string."
}
}
},
@@ -824,11 +834,13 @@
"v1Dataset": {
"type": "object",
"properties": {
"tenantId": {
"type": "string"
"tenant": {
"type": "integer",
"format": "int32"
},
"name": {
"type": "string"
"type": "integer",
"format": "int32"
},
"minTime": {
"type": "string",
Expand All @@ -854,16 +866,18 @@
"profileTypes": {
"type": "array",
"items": {
"type": "string"
"type": "integer",
"format": "int32"
},
"description": "TODO: Delete. Use labels instead.\nProfile types present in the tenant service data."
},
"labels": {
"type": "array",
"items": {
"type": "object",
"$ref": "#/definitions/v1Labels"
}
"type": "integer",
"format": "int32"
},
"description": "Length prefixed label key-value pairs."
}
}
},
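Both the proto schema and the generated OpenAPI definition above describe labels as "length prefixed label key-value pairs" that reference the string table. A hypothetical decoder, assuming each group is laid out as [pair count, k1, v1, …, kN, vN] (the exact encoding is not spelled out in this diff):

package main

import "fmt"

// decodeLabels interprets a flat slice of string-table indices as
// length-prefixed label sets: each group starts with the number of
// key-value pairs, followed by that many (key, value) index pairs.
// The layout is an assumption; the schema comment only says
// "length prefixed label key-value pairs".
func decodeLabels(refs []int32, table []string) [][][2]string {
	var sets [][][2]string
	for i := 0; i < len(refs); {
		n := int(refs[i])
		i++
		pairs := make([][2]string, 0, n)
		for j := 0; j < n; j++ {
			if i+1 >= len(refs) {
				return sets // truncated input; stop early
			}
			pairs = append(pairs, [2]string{table[refs[i]], table[refs[i+1]]})
			i += 2
		}
		sets = append(sets, pairs)
	}
	return sets
}

func main() {
	table := []string{"", "service_name", "api", "__profile_type__", "cpu"}
	// One label set with two pairs: service_name=api, __profile_type__=cpu.
	refs := []int32{2, 1, 2, 3, 4}
	fmt.Println(decodeLabels(refs, table))
}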
2 changes: 1 addition & 1 deletion go.mod
@@ -75,7 +75,7 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/valyala/bytebufferpool v1.0.0
github.com/xlab/treeprint v1.2.0
go.etcd.io/bbolt v1.3.10
go.etcd.io/bbolt v1.3.11
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/proto/otlp v1.1.0
go.uber.org/atomic v1.11.0
4 changes: 2 additions & 2 deletions go.sum
@@ -760,8 +760,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.etcd.io/etcd/api/v3 v3.5.7 h1:sbcmosSVesNrWOJ58ZQFitHMdncusIifYcrBfwrlJSY=
go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA=
go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg=
@@ -2,7 +2,6 @@ package block

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
@@ -12,7 +11,6 @@ import (
"sync"

"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
@@ -115,18 +113,17 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
}
level++

// Assuming that the first block in the job is the oldest one.
timestamp := ulid.MustParse(r.meta.Id).Time()
g := NewULIDGenerator(objects)
m := make(map[string]*CompactionPlan)
for _, obj := range objects {
for _, s := range obj.meta.Datasets {
tm, ok := m[s.TenantId]
tm, ok := m[obj.meta.StringTable[s.Tenant]]
if !ok {
tm = newBlockCompaction(timestamp, s.TenantId, r.meta.Shard, level)
m[s.TenantId] = tm
tm = newBlockCompaction(g.ULID().String(), obj.meta.StringTable[s.Tenant], r.meta.Shard, level)
m[obj.meta.StringTable[s.Tenant]] = tm
}
sm := tm.addDataset(s)
// Bind objects to datasets.
sm := tm.addDataset(obj.meta, s)
sm.append(NewDataset(s, obj))
}
}
@@ -135,44 +132,49 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
for _, tm := range m {
ordered = append(ordered, tm)
slices.SortFunc(tm.datasets, func(a, b *datasetCompaction) int {
return strings.Compare(a.meta.Name, b.meta.Name)
return strings.Compare(a.name, b.name)
})
}
slices.SortFunc(ordered, func(a, b *CompactionPlan) int {
return strings.Compare(a.tenantID, b.tenantID)
return strings.Compare(a.tenant, b.tenant)
})

return ordered, nil
}

type CompactionPlan struct {
tenantID string
datasetMap map[string]*datasetCompaction
tenant string
path string
datasetMap map[int32]*datasetCompaction
datasets []*datasetCompaction
meta *metastorev1.BlockMeta
strings *MetadataStrings
}

func newBlockCompaction(unixMilli uint64, tenantID string, shard uint32, compactionLevel uint32) *CompactionPlan {
return &CompactionPlan{
tenantID: tenantID,
datasetMap: make(map[string]*datasetCompaction),
meta: &metastorev1.BlockMeta{
FormatVersion: 1,
// TODO(kolesnikovae): Make it deterministic?
Id: ulid.MustNew(unixMilli, rand.Reader).String(),
TenantId: tenantID,
Shard: shard,
CompactionLevel: compactionLevel,
Datasets: nil,
MinTime: 0,
MaxTime: 0,
Size: 0,
},
}
func newBlockCompaction(
id string,
tenant string,
shard uint32,
compactionLevel uint32,
) *CompactionPlan {
p := &CompactionPlan{
tenant: tenant,
datasetMap: make(map[int32]*datasetCompaction),
strings: NewMetadataStringTable(),
}
p.path = BuildObjectPath(tenant, shard, compactionLevel, id)
p.meta = &metastorev1.BlockMeta{
FormatVersion: 1,
Id: id,
Tenant: p.strings.Put(tenant),
Shard: shard,
CompactionLevel: compactionLevel,
}
return p
}

func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, ObjectPath(b.meta), tmpdir)
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
@@ -187,14 +189,17 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
return nil, fmt.Errorf("flushing block writer: %w", err)
}
b.meta.Size = w.Offset()
b.meta.StringTable = b.strings.Strings
return b.meta, nil
}

func (b *CompactionPlan) addDataset(s *metastorev1.Dataset) *datasetCompaction {
sm, ok := b.datasetMap[s.Name]
func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Dataset) *datasetCompaction {
name := b.strings.Put(md.StringTable[s.Name])
tenant := b.strings.Put(md.StringTable[s.Tenant])
sm, ok := b.datasetMap[name]
if !ok {
sm = newDatasetCompaction(s.TenantId, s.Name)
b.datasetMap[s.Name] = sm
sm = b.newDatasetCompaction(tenant, name)
b.datasetMap[name] = sm
b.datasets = append(b.datasets, sm)
}
if b.meta.MinTime == 0 || s.MinTime < b.meta.MinTime {
@@ -207,8 +212,12 @@ func (b *CompactionPlan) addDataset(s *metastorev1.Dataset) *datasetCompaction {
}

type datasetCompaction struct {
// Dataset name.
name string
parent *CompactionPlan

meta *metastorev1.Dataset
ptypes map[string]struct{}
ptypes map[int32]struct{}
path string // Set at open.

datasets []*Dataset
@@ -224,12 +233,14 @@ type datasetCompaction struct {
flushOnce sync.Once
}

func newDatasetCompaction(tenantID, name string) *datasetCompaction {
func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
return &datasetCompaction{
ptypes: make(map[string]struct{}, 10),
name: b.strings.Strings[name],
parent: b,
ptypes: make(map[int32]struct{}, 10),
meta: &metastorev1.Dataset{
TenantId: tenantID,
Name: name,
Tenant: tenant,
Name: name,
// Updated at append.
MinTime: 0,
MaxTime: 0,
Expand All @@ -250,7 +261,8 @@ func (m *datasetCompaction) append(s *Dataset) {
m.meta.MaxTime = s.meta.MaxTime
}
for _, pt := range s.meta.ProfileTypes {
m.ptypes[pt] = struct{}{}
ptn := m.parent.strings.Put(s.obj.meta.StringTable[pt])
m.ptypes[ptn] = struct{}{}
}
}

@@ -388,11 +400,10 @@ func (m *datasetCompaction) writeTo(w *Writer) (err error) {
return err
}
m.meta.Size = w.Offset() - off
m.meta.ProfileTypes = make([]string, 0, len(m.ptypes))
m.meta.ProfileTypes = make([]int32, 0, len(m.ptypes))
for pt := range m.ptypes {
m.meta.ProfileTypes = append(m.meta.ProfileTypes, pt)
}
sort.Strings(m.meta.ProfileTypes)
return nil
}

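The compaction changes above resolve dataset references against the source block's string table and re-intern them into the destination block's own table (b.strings.Put(md.StringTable[s.Name])). A simplified, self-contained sketch of that rewrite step, using stand-in structs in place of metastorev1.BlockMeta and metastorev1.Dataset:

package main

import "fmt"

// Simplified stand-ins for metastorev1.Dataset and metastorev1.BlockMeta,
// keeping only the fields relevant to re-interning.
type dataset struct {
	Tenant int32
	Name   int32
}

type blockMeta struct {
	Datasets    []*dataset
	StringTable []string
}

// put interns s into table (index 0 is the empty string) and returns its index.
func put(table *[]string, lookup map[string]int32, s string) int32 {
	if i, ok := lookup[s]; ok {
		return i
	}
	i := int32(len(*table))
	*table = append(*table, s)
	lookup[s] = i
	return i
}

func main() {
	src := &blockMeta{
		Datasets:    []*dataset{{Tenant: 1, Name: 2}},
		StringTable: []string{"", "tenant-a", "dataset-1"},
	}

	// The destination block starts with its own table; references from the
	// source block are resolved against src.StringTable and re-interned.
	dst := &blockMeta{StringTable: []string{""}}
	lookup := map[string]int32{"": 0}
	for _, s := range src.Datasets {
		dst.Datasets = append(dst.Datasets, &dataset{
			Tenant: put(&dst.StringTable, lookup, src.StringTable[s.Tenant]),
			Name:   put(&dst.StringTable, lookup, src.StringTable[s.Name]),
		})
	}
	fmt.Println(*dst.Datasets[0], dst.StringTable)
}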