Skip to content

Commit

Permalink
feat: instrument bigquery extractor with OpenTelemetry (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudo-suhas authored Aug 31, 2023
1 parent dd1ba54 commit d3b9647
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 18 deletions.
52 changes: 43 additions & 9 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"fmt"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/logging/logadmin"
"github.com/goto/meteor/plugins"
"github.com/goto/salt/log"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
auditpb "google.golang.org/genproto/googleapis/cloud/audit"
Expand All @@ -23,20 +28,34 @@ type Config struct {
UsageProjectIDs []string
}

const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s" AND %s`
const (
	// advancedFilterTemplate is the Cloud Logging advanced filter used to
	// select BigQuery "jobservice.jobcompleted" audit entries, excluding
	// queries that reference INFORMATION_SCHEMA or __TABLES__. The three
	// placeholders are: start timestamp, end timestamp, and a filter
	// clause built from the table ID (see buildFilter).
	advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
		`resource.type="bigquery_resource" AND NOT ` +
		`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
		`timestamp >= "%s" AND timestamp < "%s" AND %s`

	// metricTableDurn names the histogram that records the duration, in
	// milliseconds, of collecting audit logs for a single table.
	metricTableDurn = "meteor.bq.client.table.duration"
)

// AuditLog reads BigQuery job-completed entries from Cloud Logging and
// aggregates them into per-table usage stats (see Collect).
type AuditLog struct {
	logger log.Logger
	client *logadmin.Client
	config Config

	// histogram records Collect call durations in milliseconds
	// (metricTableDurn), tagged with project/dataset attributes and,
	// on failure, the BigQuery error reason.
	histogram metric.Int64Histogram
}

// New returns an AuditLog that logs through the given logger. It also
// registers the histogram used to time Collect calls; an instrument
// creation failure is reported to the global OpenTelemetry handler
// rather than returned.
func New(logger log.Logger) *AuditLog {
	meter := otel.Meter("github.com/goto/meteor/plugins/extractors/bigquery")
	hist, err := meter.Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
	if err != nil {
		otel.Handle(err)
	}

	return &AuditLog{
		logger: logger,

		histogram: hist,
	}
}

Expand Down Expand Up @@ -69,20 +88,35 @@ func (l *AuditLog) createClient(ctx context.Context) (*logadmin.Client, error) {
return logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
}

func (l *AuditLog) Collect(ctx context.Context, tableID string) (*TableStats, error) {
func (l *AuditLog) Collect(ctx context.Context, tbl *bigquery.Table) (stats *TableStats, err error) {
defer func(start time.Time) {
attrs := []attribute.KeyValue{
attribute.String("bq.operation", "table.audit_logs"),
attribute.String("bq.project_id", tbl.ProjectID),
attribute.String("bq.dataset_id", tbl.DatasetID),
}
if err != nil {
attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
}

l.histogram.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
)
}(time.Now())

if l.client == nil {
return nil, errors.New("auditlog client is nil")
}

filter := l.buildFilter(tableID)
filter := l.buildFilter(tbl.TableID)
it := l.client.Entries(ctx,
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))

l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
l.logger.Info("getting logs with the filter", "filter", filter)

tableStats := NewTableStats()
stats = NewTableStats()
for {
entry, err := it.Next()
if errors.Is(err, iterator.Done) {
Expand All @@ -98,12 +132,12 @@ func (l *AuditLog) Collect(ctx context.Context, tableID string) (*TableStats, er
continue
}

if errF := tableStats.Populate(logData); errF != nil {
if errF := stats.Populate(logData); errF != nil {
l.logger.Warn("error populating logdata", "err", errF)
continue
}
}
return tableStats, nil
return stats, nil
}

func (l *AuditLog) buildFilter(tableID string) string {
Expand Down
131 changes: 122 additions & 9 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"strings"
"sync"
"time"

"cloud.google.com/go/bigquery"
datacatalog "cloud.google.com/go/datacatalog/apiv1"
Expand All @@ -22,6 +23,9 @@ import (
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -60,6 +64,12 @@ type Exclude struct {

const (
	// maxPageSizeDefault is the default page size used when paging
	// through datasets and tables (see getMaxPageSize).
	maxPageSizeDefault = 100

	// OpenTelemetry instrument names. "Durn" metrics are duration
	// histograms recorded in milliseconds; "Ctr" metrics are counters
	// of datasets/tables skipped via the Exclude config.
	metricDatasetsDurn       = "meteor.bq.client.datasets.duration"
	metricTablesDurn         = "meteor.bq.client.tables.duration"
	metricTableDurn          = "meteor.bq.client.table.duration"
	metricExcludedDatasetCtr = "meteor.bq.dataset.excluded"
	metricExcludedTableCtr   = "meteor.bq.table.excluded"
)

var sampleConfig = `
Expand Down Expand Up @@ -108,20 +118,49 @@ type Extractor struct {
policyTagClient *datacatalog.PolicyTagManagerClient
newClient NewClientFunc
randFn randFn

datasetsDurn metric.Int64Histogram
tablesDurn metric.Int64Histogram
tableDurn metric.Int64Histogram
excludedDatasetCtr metric.Int64Counter
excludedTableCtr metric.Int64Counter
}

type randFn func(rndSeed int64) func(int64) int64

type NewClientFunc func(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error)

func New(logger log.Logger, newClient NewClientFunc, randFn randFn) *Extractor {
meter := otel.Meter("github.com/goto/meteor/plugins/extractors/bigquery")

datasetsDurn, err := meter.Int64Histogram(metricDatasetsDurn, metric.WithUnit("ms"))
handleOtelErr(err)

tablesDurn, err := meter.Int64Histogram(metricTablesDurn, metric.WithUnit("ms"))
handleOtelErr(err)

tableDurn, err := meter.Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
handleOtelErr(err)

excludedDatasetCtr, err := meter.Int64Counter(metricExcludedDatasetCtr)
handleOtelErr(err)

excludedTableCtr, err := meter.Int64Counter(metricExcludedTableCtr)
handleOtelErr(err)

galc := auditlog.New(logger)

e := &Extractor{
logger: logger,
galClient: galc,
newClient: newClient,
randFn: randFn,

datasetsDurn: datasetsDurn,
tablesDurn: tablesDurn,
tableDurn: tableDurn,
excludedDatasetCtr: excludedDatasetCtr,
excludedTableCtr: excludedTableCtr,
}
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
e.ScopeNotRequired = true
Expand Down Expand Up @@ -171,28 +210,49 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
// Fetch and iterate over datasets
pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
for {
var datasets []*bigquery.Dataset
nextToken, err := pager.NextPage(&datasets)
datasets, hasNext, err := e.fetchDatasetsNextPage(ctx, pager)
if err != nil {
return fmt.Errorf("fetch dataset: %w", err)
return err
}

for _, ds := range datasets {
if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.excludedDatasetCtr.Add(
ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
)
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
continue
}
e.extractTable(ctx, ds, emit)
}

if nextToken == "" {
if !hasNext {
break
}
}

return nil
}

// fetchDatasetsNextPage fetches the next page of datasets from pager and
// reports whether more pages remain. The elapsed time is recorded to the
// datasets-duration histogram, tagged with the project ID and, on
// failure, the BigQuery error reason.
func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
	start := time.Now()
	defer func() {
		attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}
		e.datasetsDurn.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...))
	}()

	token, err := pager.NextPage(&datasets)
	if err != nil {
		return nil, false, fmt.Errorf("fetch dataset: %w", err)
	}

	// An empty token means this was the last page.
	return datasets, token != "", nil
}

// CreateClient creates a bigquery client
func CreateClient(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error) {
if config.ServiceAccountBase64 == "" && config.ServiceAccountJSON == "" {
Expand Down Expand Up @@ -227,8 +287,7 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
for {
var tables []*bigquery.Table
nextToken, err := pager.NextPage(&tables)
tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
break
Expand All @@ -240,14 +299,18 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

for _, table := range tables {
if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
attribute.String("bq.project_id", e.config.ProjectID),
attribute.String("bq.dataset_id", ds.DatasetID),
))
e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := table.Metadata(ctx)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
continue
Expand All @@ -262,19 +325,44 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
emit(models.NewRecord(asset))
}

if nextToken == "" {
if !hasNext {
break
}
}
}

// fetchTablesNextPage fetches the next page of tables in the given
// dataset from pager and reports whether more pages remain. The elapsed
// time is recorded to the tables-duration histogram, tagged with the
// project and dataset IDs and, on failure, the BigQuery error reason.
func (e *Extractor) fetchTablesNextPage(
	ctx context.Context, datasetID string, pager *iterator.Pager,
) (tables []*bigquery.Table, hasNext bool, err error) {
	defer func(start time.Time) {
		attrs := []attribute.KeyValue{
			attribute.String("bq.project_id", e.config.ProjectID),
			attribute.String("bq.dataset_id", datasetID),
		}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}

		e.tablesDurn.Record(
			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
		)
	}(time.Now())

	nextToken, err := pager.NextPage(&tables)
	if err != nil {
		// Wrap with context for consistency with fetchDatasetsNextPage;
		// %w keeps the caller's errors.Is checks on context.Canceled and
		// context.DeadlineExceeded working through the wrap.
		return nil, false, fmt.Errorf("fetch tables: %w", err)
	}

	// An empty token means this was the last page.
	return tables, nextToken != "", nil
}

// Build the bigquery table metadata
func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) (*v1beta2.Asset, error) {
var tableStats *auditlog.TableStats
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
var errL error
tableStats, errL = e.galClient.Collect(ctx, t.TableID)
tableStats, errL = e.galClient.Collect(ctx, t)
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
Expand Down Expand Up @@ -637,6 +725,25 @@ func (e *Extractor) getMaxPageSize() int {
return maxPageSizeDefault
}

// fetchTableMetadata loads the metadata of tbl, recording the call
// duration (ms) to the table-duration histogram with operation, project
// and dataset attributes, plus the BigQuery error reason on failure.
func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table) (md *bigquery.TableMetadata, err error) {
	start := time.Now()
	defer func() {
		attrs := []attribute.KeyValue{
			attribute.String("bq.operation", "table.metadata"),
			attribute.String("bq.project_id", tbl.ProjectID),
			attribute.String("bq.dataset_id", tbl.DatasetID),
		}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}
		e.tableDurn.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...))
	}()

	return tbl.Metadata(ctx)
}

// Register the extractor to catalog
func init() {
if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
Expand All @@ -661,3 +768,9 @@ func pickFirstNonZero(ints ...int) int {
}
return 0
}

// handleOtelErr forwards a non-nil instrument-creation error to the
// global OpenTelemetry error handler; nil is a no-op.
func handleOtelErr(err error) {
	if err == nil {
		return
	}

	otel.Handle(err)
}
11 changes: 11 additions & 0 deletions plugins/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/go-playground/validator/v10"
"github.com/googleapis/gax-go/v2/apierror"
"github.com/goto/meteor/models"
"github.com/mcuadros/go-defaults"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -124,6 +125,16 @@ func DrainBody(resp *http.Response) {
_ = resp.Body.Close()
}

// BQErrReason returns the reason code of the Google API error wrapped
// by err, or "UNKNOWN" when err does not wrap an *apierror.APIError.
func BQErrReason(err error) string {
	var apiErr *apierror.APIError
	if errors.As(err, &apiErr) {
		return apiErr.Reason()
	}

	return "UNKNOWN"
}

func parseBQTableFQN(fqn string) (projectID, datasetID, tableID string, err error) {
// fqn is the ID of the table in projectID:datasetID.tableID format.
if !strings.ContainsRune(fqn, ':') || strings.IndexRune(fqn, '.') < strings.IndexRune(fqn, ':') {
Expand Down

0 comments on commit d3b9647

Please sign in to comment.