Skip to content

Commit

Permalink
feat: Use of configurable concurrency for extracting assets from BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ravisuhag committed Oct 31, 2024
1 parent 4c67f9c commit 3772032
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -56,6 +57,7 @@ type Config struct {
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"`
Concurrency int `mapstructure:"concurrency" default:"10"`
}

type Exclude struct {
Expand Down Expand Up @@ -122,6 +124,7 @@ type Extractor struct {
policyTagClient *datacatalog.PolicyTagManagerClient
newClient NewClientFunc
randFn randFn
eg *errgroup.Group

datasetsDurn metric.Int64Histogram
tablesDurn metric.Int64Histogram
Expand Down Expand Up @@ -204,13 +207,17 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
e.logger.Error("failed to create policy tag manager client", "err", err)
}

e.eg = &errgroup.Group{}
e.eg.SetLimit(e.config.Concurrency)

return nil
}

// Extract pages through the BigQuery datasets, skips excluded ones, and extracts the tables of every remaining dataset.
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
pageSize := pickFirstNonZero(e.config.DatasetPageSize, e.config.MaxPageSize, 10)

wg := sync.WaitGroup{}
// Fetch and iterate over datasets
pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
for {
Expand All @@ -227,14 +234,24 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
continue
}
e.extractTable(ctx, ds, emit)
wg.Add(1)
go func(ds *bigquery.Dataset) {
defer wg.Done()
e.extractTable(ctx, ds, emit)
}(ds)
}

if !hasNext {
break
}
}

wg.Wait()
if err := e.eg.Wait(); err != nil {
e.logger.Error("error extracting bigquery tables", "err", err)
return err
}

return nil
}

Expand Down Expand Up @@ -311,22 +328,23 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
continue
}

asset, err := e.buildAsset(ctx, table, tmd)
if err != nil {
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
continue
}

emit(models.NewRecord(asset))
table := table
e.eg.Go(func() error {
tableFQN := table.FullyQualifiedName()
e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
return nil
}
asset, err := e.buildAsset(ctx, table, tmd)
if err != nil {
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
return nil
}
emit(models.NewRecord(asset))
return nil
})
}

if !hasNext {
Expand Down

0 comments on commit 3772032

Please sign in to comment.