Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions libbeat/processors/add_cloud_metadata/add_cloud_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package add_cloud_metadata

import (
"context"
"fmt"
"sync"
"time"
Expand All @@ -44,11 +45,13 @@ func init() {
}

type addCloudMetadata struct {
initOnce sync.Once
initData *initData
initDone chan struct{}
metadata mapstr.M
logger *logp.Logger
baseCtx context.Context
baseCtxCancel context.CancelFunc
initOnce sync.Once
initData *initData
initDone chan struct{}
metadata mapstr.M
logger *logp.Logger
}

type initData struct {
Expand All @@ -75,15 +78,18 @@ func New(c *cfg.C, log *logp.Logger) (beat.Processor, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
p := &addCloudMetadata{
initData: &initData{
fetchers: fetchers,
timeout: config.Timeout,
tlsConfig: tlsConfig,
overwrite: config.Overwrite,
},
initDone: make(chan struct{}),
logger: log.Named("add_cloud_metadata"),
initDone: make(chan struct{}),
logger: log.Named("add_cloud_metadata"),
baseCtx: ctx,
baseCtxCancel: cancel,
}

go p.init()
Expand All @@ -98,7 +104,7 @@ func (r result) String() string {
func (p *addCloudMetadata) init() {
p.initOnce.Do(func() { // fetch metadata only once
defer close(p.initDone) // signal that init() completed
result := p.fetchMetadata()
result := p.fetchMetadata(p.baseCtx)
if result == nil {
p.logger.Info("add_cloud_metadata: hosting provider type not detected.")
return
Expand Down Expand Up @@ -138,6 +144,12 @@ func (p *addCloudMetadata) String() string {
return "add_cloud_metadata=" + metadataStr
}

func (p *addCloudMetadata) Close() error {
p.baseCtxCancel()
p.initOnce.Do(func() {})
return nil
}

func (p *addCloudMetadata) addMeta(event *beat.Event, meta mapstr.M) error {
for key, metaVal := range meta {
// If key exists in event already and overwrite flag is set to false, this processor will not overwrite the
Expand Down
4 changes: 2 additions & 2 deletions libbeat/processors/add_cloud_metadata/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func setupFetchers(providers map[string]provider, c *conf.C, logger *logp.Logger
// hosting providers supported by this processor. It will wait for the results to
// be returned or for a timeout to occur then returns the first result that
// completed in time.
func (p *addCloudMetadata) fetchMetadata() *result {
func (p *addCloudMetadata) fetchMetadata(ctx context.Context) *result {
p.logger.Debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", p.initData.timeout)
start := time.Now()
defer func() {
Expand All @@ -174,7 +174,7 @@ func (p *addCloudMetadata) fetchMetadata() *result {
}

// Create context to enable explicit cancellation of the http requests.
ctx, cancel := context.WithTimeout(context.TODO(), p.initData.timeout)
ctx, cancel := context.WithTimeout(ctx, p.initData.timeout)
defer cancel()

results := make(chan result)
Expand Down
Loading