Skip to content

Commit b667f4f

Browse files
authored
pyroscope(v2): query-backend tenant isolation check (#4184)
* pyroscope(v2): query-backend tenant isolation check * asserting error type * remove unresolved conflict * fix tests * skip invalid, log and metric * just filter out invalid datasets * improved metric, rename function, remove allocation, check tenant size * metric rename
1 parent 509d97f commit b667f4f

File tree

4 files changed

+102
-3
lines changed

4 files changed

+102
-3
lines changed

pkg/experiment/query_backend/block_reader.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@ package query_backend
33
import (
44
"context"
55
"fmt"
6+
"strings"
67

78
"github.com/go-kit/log"
9+
"github.com/go-kit/log/level"
10+
"github.com/grafana/dskit/multierror"
11+
"github.com/grafana/dskit/tracing"
812
"github.com/opentracing/opentracing-go"
13+
"github.com/prometheus/client_golang/prometheus"
914
"github.com/prometheus/common/model"
1015
"github.com/prometheus/prometheus/model/labels"
1116
"github.com/prometheus/prometheus/promql/parser"
1217
"golang.org/x/sync/errgroup"
1318
"google.golang.org/grpc/codes"
1419
"google.golang.org/grpc/status"
1520

21+
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
1622
queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1"
1723
"github.com/grafana/pyroscope/pkg/experiment/block"
1824
"github.com/grafana/pyroscope/pkg/objstore"
@@ -41,6 +47,8 @@ type BlockReader struct {
4147
log log.Logger
4248
storage objstore.Bucket
4349

50+
metrics *metrics
51+
4452
// TODO:
4553
// - Use a worker pool instead of the errgroup.
4654
// - Reusable query context.
@@ -49,10 +57,11 @@ type BlockReader struct {
4957
// Instead, they should share the processing pipeline, if possible.
5058
}
5159

52-
func NewBlockReader(logger log.Logger, storage objstore.Bucket) *BlockReader {
60+
func NewBlockReader(logger log.Logger, storage objstore.Bucket, reg prometheus.Registerer) *BlockReader {
5361
return &BlockReader{
5462
log: logger,
5563
storage: storage,
64+
metrics: newMetrics(reg),
5665
}
5766
}
5867

@@ -71,7 +80,21 @@ func (b *BlockReader) Invoke(
7180
g, ctx := errgroup.WithContext(ctx)
7281
agg := newAggregator(req)
7382

83+
tenantMap := make(map[string]struct{})
84+
for _, tenant := range req.Tenant {
85+
tenantMap[tenant] = struct{}{}
86+
}
87+
7488
for _, md := range req.QueryPlan.Root.Blocks {
89+
md.Datasets, err = filterNotOwnedDatasets(md, tenantMap)
90+
if err != nil {
91+
b.metrics.datasetTenantIsolationFailure.Inc()
92+
traceId, _ := tracing.ExtractTraceID(ctx)
93+
level.Error(b.log).Log("msg", "trying to query datasets of other tenants", "valid-tenant", strings.Join(req.Tenant, ","), "block", md.Id, "err", err, "traceId", traceId)
94+
}
95+
if len(md.Datasets) == 0 {
96+
continue
97+
}
7598
obj := block.NewObject(b.storage, md)
7699
g.Go(util.RecoverPanic((&blockContext{
77100
ctx: ctx,
@@ -120,6 +143,9 @@ func validateRequest(req *queryv1.InvokeRequest) (*request, error) {
120143
if req.QueryPlan == nil || len(req.QueryPlan.Root.Blocks) == 0 {
121144
return nil, fmt.Errorf("no blocks to query")
122145
}
146+
if len(req.Tenant) == 0 {
147+
return nil, fmt.Errorf("no tenant provided")
148+
}
123149
matchers, err := parser.ParseMetricSelector(req.LabelSelector)
124150
if err != nil {
125151
return nil, fmt.Errorf("label selection is invalid: %w", err)
@@ -132,3 +158,20 @@ func validateRequest(req *queryv1.InvokeRequest) (*request, error) {
132158
}
133159
return &r, nil
134160
}
161+
162+
// While the metastore is expected to already filter datasets of other tenants, we do an additional check to avoid
163+
// processing blocks or datasets belonging to the wrong tenant.
164+
func filterNotOwnedDatasets(b *metastorev1.BlockMeta, tenantMap map[string]struct{}) ([]*metastorev1.Dataset, error) {
165+
errs := multierror.New()
166+
datasets := make([]*metastorev1.Dataset, 0)
167+
for _, dataset := range b.Datasets {
168+
datasetTenant := b.StringTable[dataset.Tenant]
169+
_, ok := tenantMap[datasetTenant]
170+
if ok {
171+
datasets = append(datasets, dataset)
172+
} else {
173+
errs.Add(fmt.Errorf(`dataset "%s" belongs to tenant "%s"`, b.StringTable[dataset.Name], datasetTenant))
174+
}
175+
}
176+
return datasets, errs.Err()
177+
}

pkg/experiment/query_backend/block_reader_test.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type testSuite struct {
3737
reader *BlockReader
3838
meta []*metastorev1.BlockMeta
3939
plan *queryv1.QueryPlan
40+
tenant []string
4041
}
4142

4243
func (s *testSuite) SetupSuite() {
@@ -47,13 +48,19 @@ func (s *testSuite) SetupSuite() {
4748
func (s *testSuite) SetupTest() {
4849
s.ctx = context.Background()
4950
s.logger = test.NewTestingLogger(s.T())
50-
s.reader = NewBlockReader(s.logger, &objstore.ReaderAtBucket{Bucket: s.bucket})
51+
s.reader = NewBlockReader(s.logger, &objstore.ReaderAtBucket{Bucket: s.bucket}, nil)
5152
s.meta = make([]*metastorev1.BlockMeta, len(s.blocks))
5253
for i, b := range s.blocks {
5354
s.meta[i] = b.CloneVT()
5455
}
5556
s.sanitizeMetadata()
5657
s.plan = query_plan.Build(s.meta, 10, 10)
58+
s.tenant = make([]string, 0)
59+
for _, b := range s.plan.Root.Blocks {
60+
for _, d := range b.Datasets {
61+
s.tenant = append(s.tenant, b.StringTable[d.Tenant])
62+
}
63+
}
5764
}
5865

5966
func (s *testSuite) loadFromDir(dir string) {
@@ -119,6 +126,7 @@ func (s *testSuite) Test_QueryTree_All() {
119126
QueryType: queryv1.QueryType_QUERY_TREE,
120127
Tree: &queryv1.TreeQuery{MaxNodes: 16},
121128
}},
129+
Tenant: s.tenant,
122130
})
123131

124132
s.Require().NoError(err)
@@ -142,6 +150,7 @@ func (s *testSuite) Test_QueryTree_Filter() {
142150
QueryType: queryv1.QueryType_QUERY_TREE,
143151
Tree: &queryv1.TreeQuery{MaxNodes: 16},
144152
}},
153+
Tenant: s.tenant,
145154
})
146155

147156
s.Require().NoError(err)
@@ -163,6 +172,7 @@ func (s *testSuite) Test_QueryPprof_Metadata() {
163172
QueryType: queryv1.QueryType_QUERY_PPROF,
164173
Pprof: &queryv1.PprofQuery{},
165174
}},
175+
Tenant: s.tenant,
166176
})
167177

168178
s.Require().NoError(err)
@@ -193,6 +203,7 @@ func (s *testSuite) Test_DatasetIndex_SeriesLabels_GroupBy() {
193203
LabelNames: []string{"service_name", "__profile_type__"},
194204
},
195205
}},
206+
Tenant: s.tenant,
196207
})
197208

198209
s.Require().NoError(err)
@@ -215,6 +226,7 @@ func (s *testSuite) Test_SeriesLabels() {
215226
QueryType: queryv1.QueryType_QUERY_SERIES_LABELS,
216227
SeriesLabels: &queryv1.SeriesLabelsQuery{},
217228
}},
229+
Tenant: s.tenant,
218230
})
219231

220232
s.Require().NoError(err)
@@ -242,6 +254,7 @@ func (s *testSuite) Test_QueryTimeSeries() {
242254
Query: []*queryv1.Query{query},
243255
QueryPlan: s.plan,
244256
LabelSelector: "{}",
257+
Tenant: s.tenant,
245258
}
246259

247260
resp, err := s.reader.Invoke(s.ctx, req)
@@ -250,3 +263,24 @@ func (s *testSuite) Test_QueryTimeSeries() {
250263
s.Require().Len(resp.Reports, 1)
251264
s.Require().NotNil(resp.Reports[0].TimeSeries)
252265
}
266+
267+
func (s *testSuite) Test_QueryTree_All_Tenant_Isolation() {
268+
queryTenant := "some-tenant"
269+
270+
s.Require().NotContains(s.tenant, queryTenant)
271+
272+
resp, err := s.reader.Invoke(s.ctx, &queryv1.InvokeRequest{
273+
EndTime: time.Now().UnixMilli(),
274+
LabelSelector: "{}",
275+
QueryPlan: s.plan,
276+
Query: []*queryv1.Query{{
277+
QueryType: queryv1.QueryType_QUERY_TREE,
278+
Tree: &queryv1.TreeQuery{MaxNodes: 16},
279+
}},
280+
Tenant: []string{queryTenant},
281+
})
282+
283+
s.Require().NoError(err)
284+
s.Require().NotNil(resp)
285+
s.Require().Len(resp.Reports, 0)
286+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package query_backend
2+
3+
import "github.com/prometheus/client_golang/prometheus"
4+
5+
type metrics struct {
6+
datasetTenantIsolationFailure prometheus.Counter
7+
}
8+
9+
func newMetrics(reg prometheus.Registerer) *metrics {
10+
m := &metrics{
11+
datasetTenantIsolationFailure: prometheus.NewCounter(
12+
prometheus.CounterOpts{
13+
Namespace: "pyroscope",
14+
Subsystem: "query_backend",
15+
Name: "dataset_tenant_isolation_failure_total",
16+
}),
17+
}
18+
if reg != nil {
19+
reg.MustRegister(m.datasetTenantIsolationFailure)
20+
}
21+
return m
22+
}

pkg/phlare/modules_experimental.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func (f *Phlare) initQueryBackend() (services.Service, error) {
329329
logger,
330330
f.reg,
331331
f.queryBackendClient,
332-
querybackend.NewBlockReader(f.logger, f.storageBucket),
332+
querybackend.NewBlockReader(f.logger, f.storageBucket, f.reg),
333333
)
334334
if err != nil {
335335
return nil, err

0 commit comments

Comments
 (0)