Skip to content

Commit

Permalink
fix: Add wrapper around libbeat instrumentation to propagate missing …
Browse files Browse the repository at this point in the history
…tracer config (#13653)

Add wrapper to libbeat instrumentation to propagate missing tracer
config via apm environmental variables.

(cherry picked from commit 0a57bbe)
  • Loading branch information
1pkg authored and mergify[bot] committed Jul 11, 2024
1 parent 0c22dc6 commit e4b833d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
2 changes: 1 addition & 1 deletion changelogs/8.15.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ https://github.com/elastic/apm-server/compare/v8.14.2\...v8.15.0[View commits]
- Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326]
- Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196]
- Add require data stream to bulk index requests {pull}13398[13398]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] {pull}13653[13653]
- Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472]
53 changes: 52 additions & 1 deletion internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *Runner) Run(ctx context.Context) error {
}
}

instrumentation, err := instrumentation.New(s.rawConfig, "apm-server", version.Version)
instrumentation, err := newInstrumentation(s.rawConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -537,6 +537,57 @@ func (s *Runner) Run(ctx context.Context) error {
return errors.Join(result, closeErr)
}

// newInstrumentation is a thin wrapper around libbeat instrumentation that
// sets missing tracer configuration from elastic agent.
func newInstrumentation(rawConfig *agentconfig.C) (instrumentation.Instrumentation, error) {
// This config struct contains missing fields from elastic agent APMConfig
// https://github.com/elastic/elastic-agent/blob/main/internal/pkg/core/monitoring/config/config.go#L127
// that are not directly handled by libbeat instrumentation below.
//
// Note that original config keys were additionally marshalled by
// https://github.com/elastic/elastic-agent/blob/main/pkg/component/runtime/apm_config_mapper.go#L18
// that's why the keys are different from the original APMConfig struct.
var apmCfg struct {
GlobalLabels string `config:"globallabels"`
TLS struct {
SkipVerify bool `config:"skipverify"`
ServerCertificate string `config:"servercert"`
ServerCA string `config:"serverca"`
} `config:"tls"`
}
cfg, err := rawConfig.Child("instrumentation", -1)
if err != nil {
// Fallback to instrumentation.New if the configs are not present.
return instrumentation.New(rawConfig, "apm-server", version.Version)
}
if err := cfg.Unpack(&apmCfg); err != nil {
return nil, err
}
const (
envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT"
envServerCert = "ELASTIC_APM_SERVER_CERT"
envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE"
envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS"
)
if apmCfg.TLS.SkipVerify {
os.Setenv(envVerifyServerCert, "false")
defer os.Unsetenv(envVerifyServerCert)
}
if apmCfg.TLS.ServerCertificate != "" {
os.Setenv(envServerCert, apmCfg.TLS.ServerCertificate)
defer os.Unsetenv(envServerCert)
}
if apmCfg.TLS.ServerCA != "" {
os.Setenv(envCACert, apmCfg.TLS.ServerCA)
defer os.Unsetenv(envCACert)
}
if len(apmCfg.GlobalLabels) > 0 {
os.Setenv(envGlobalLabels, apmCfg.GlobalLabels)
defer os.Unsetenv(envGlobalLabels)
}
return instrumentation.New(rawConfig, "apm-server", version.Version)
}

func maxConcurrentDecoders(memLimitGB float64) uint {
// Allow 128 concurrent decoders for each 1GB memory, limited to at most 2048.
const max = 2048
Expand Down
44 changes: 44 additions & 0 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
package beater

import (
"compress/zlib"
"context"
"encoding/json"
"encoding/pem"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -227,3 +231,43 @@ func TestRunnerNewDocappenderConfig(t *testing.T) {
})
}
}

func TestNewInstrumentation(t *testing.T) {
labels := make(chan map[string]string, 1)
defer close(labels)
s := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/intake/v2/events" {
var b struct {
Metadata struct {
Labels map[string]string `json:"labels"`
} `json:"metadata"`
}
zr, _ := zlib.NewReader(r.Body)
_ = json.NewDecoder(zr).Decode(&b)
labels <- b.Metadata.Labels
}
w.WriteHeader(http.StatusOK)
}))
defer s.Close()
certPath := filepath.Join(t.TempDir(), "cert.pem")
f, err := os.Create(certPath)
assert.NoError(t, err)
err = pem.Encode(f, &pem.Block{Type: "CERTIFICATE", Bytes: s.Certificate().Raw})
assert.NoError(t, err)
cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{
"instrumentation": map[string]interface{}{
"enabled": true,
"hosts": []string{s.URL},
"tls": map[string]interface{}{
"servercert": certPath,
},
"globallabels": "k1=val,k2=new val",
},
})
i, err := newInstrumentation(cfg)
require.NoError(t, err)
tracer := i.Tracer()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)
assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels)
}

0 comments on commit e4b833d

Please sign in to comment.