Skip to content

Commit

Permalink
[exporter/sumologic]: add sticky session (#33011)
Browse files Browse the repository at this point in the history
**Description:**

Adds support for sticky session in order to better support AWS LB. This
code is moved from Sumo Logic repository

**Link to tracking Issue:**

#32315

**Testing:**

Tested manually

**Documentation:**

N/A

---------

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
  • Loading branch information
sumo-drosiek authored May 13, 2024
1 parent d43a904 commit 5e41c6b
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 25 deletions.
27 changes: 27 additions & 0 deletions .chloggen/drosiek-sumo-sticky-session.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sumologicexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add sticky session support

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32315]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 4 additions & 0 deletions exporter/sumologicexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ exporters:
# maximum connection timeout is 55s, default = 5s
timeout: <timeout>

# defines if sticky session support is enable.
# default=false
sticky_session_enabled: {true, false}

# for below described queueing and retry related configuration please refer to:
# https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md#configuration

Expand Down
10 changes: 8 additions & 2 deletions exporter/sumologicexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Config struct {
SourceHost string `mapstructure:"source_host"`
// Name of the client
Client string `mapstructure:"client"`

// StickySessionEnabled defines if sticky session support is enable.
// By default this is false.
StickySessionEnabled bool `mapstructure:"sticky_session_enabled"`
}

// createDefaultClientConfig returns default http client settings
Expand Down Expand Up @@ -132,8 +136,10 @@ const (
DefaultClient string = "otelcol"
// DefaultLogKey defines default LogKey value
DefaultLogKey string = "log"
// DefaultGraphiteTemplate defines default template for Graphite
DefaultGraphiteTemplate string = "%{_metric_}"
// DefaultDropRoutingAttribute defines default DropRoutingAttribute
DefaultDropRoutingAttribute string = ""
// DefaultStickySessionEnabled defines default StickySessionEnabled value
DefaultStickySessionEnabled bool = false
)

func (cfg *Config) Validate() error {
Expand Down
28 changes: 28 additions & 0 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type sumologicexporter struct {
foundSumologicExtension bool
sumologicExtension *sumologicextension.SumologicExtension

stickySessionCookieLock sync.RWMutex
stickySessionCookie string

id component.ID
}

Expand Down Expand Up @@ -289,6 +292,8 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

Expand Down Expand Up @@ -366,6 +371,8 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
metricsURL,
logsURL,
tracesURL,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

Expand Down Expand Up @@ -406,6 +413,27 @@ func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs
}
}

func (se *sumologicexporter) StickySessionCookie() string {
if se.foundSumologicExtension {
return se.sumologicExtension.StickySessionCookie()
}

se.stickySessionCookieLock.RLock()
defer se.stickySessionCookieLock.RUnlock()
return se.stickySessionCookie
}

func (se *sumologicexporter) SetStickySessionCookie(stickySessionCookie string) {
if se.foundSumologicExtension {
se.sumologicExtension.SetStickySessionCookie(stickySessionCookie)
return
}

se.stickySessionCookieLock.Lock()
se.stickySessionCookie = stickySessionCookie
se.stickySessionCookieLock.Unlock()
}

// get the destination url for a given signal type
// this mostly adds signal-specific suffixes if the format is otlp
func getSignalURL(oCfg *Config, endpointURL string, signal component.DataType) (string, error) {
Expand Down
7 changes: 4 additions & 3 deletions exporter/sumologicexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func createDefaultConfig() component.Config {
SourceHost: DefaultSourceHost,
Client: DefaultClient,

ClientConfig: createDefaultClientConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: qs,
ClientConfig: createDefaultClientConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: qs,
StickySessionEnabled: DefaultStickySessionEnabled,
}
}

Expand Down
80 changes: 60 additions & 20 deletions exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,18 @@ func (b *bodyBuilder) toCountingReader() *countingReader {
}

type sender struct {
logger *zap.Logger
config *Config
client *http.Client
filter filter
sources sourceFormats
prometheusFormatter prometheusFormatter
dataURLMetrics string
dataURLLogs string
dataURLTraces string
id component.ID
logger *zap.Logger
config *Config
client *http.Client
filter filter
sources sourceFormats
prometheusFormatter prometheusFormatter
dataURLMetrics string
dataURLLogs string
dataURLTraces string
stickySessionCookieFunc func() string
setStickySessionCookieFunc func(string)
id component.ID
}

const (
Expand All @@ -140,6 +142,7 @@ const (
contentTypeLogs string = "application/x-www-form-urlencoded"
contentTypePrometheus string = "application/vnd.sumologic.prometheus"
contentTypeOTLP string = "application/x-protobuf"
stickySessionKey string = "AWSALB"
)

func newSender(
Expand All @@ -152,19 +155,23 @@ func newSender(
metricsURL string,
logsURL string,
tracesURL string,
stickySessionCookieFunc func() string,
setStickySessionCookieFunc func(string),
id component.ID,
) *sender {
return &sender{
logger: logger,
config: cfg,
client: cl,
filter: f,
sources: s,
prometheusFormatter: pf,
dataURLMetrics: metricsURL,
dataURLLogs: logsURL,
dataURLTraces: tracesURL,
id: id,
logger: logger,
config: cfg,
client: cl,
filter: f,
sources: s,
prometheusFormatter: pf,
dataURLMetrics: metricsURL,
dataURLLogs: logsURL,
dataURLTraces: tracesURL,
stickySessionCookieFunc: stickySessionCookieFunc,
setStickySessionCookieFunc: setStickySessionCookieFunc,
id: id,
}
}

Expand All @@ -181,6 +188,10 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *counti
return err
}

if s.config.StickySessionEnabled {
s.addStickySessionCookie(req)
}

s.logger.Debug("Sending data",
zap.String("pipeline", string(pipeline)),
zap.Any("headers", req.Header),
Expand All @@ -200,6 +211,10 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *counti
}

func (s *sender) handleReceiverResponse(resp *http.Response) error {
if s.config.StickySessionEnabled {
s.updateStickySessionCookie(resp)
}

// API responds with a 200 or 204 with ConentLength set to 0 when all data
// has been successfully ingested.
if resp.ContentLength == 0 && (resp.StatusCode == 200 || resp.StatusCode == 204) {
Expand Down Expand Up @@ -684,3 +699,28 @@ func (s *sender) recordMetrics(duration time.Duration, count int64, req *http.Re
s.logger.Debug("error for recording metric for sent request", zap.Error(err))
}
}

func (s *sender) addStickySessionCookie(req *http.Request) {
currectCookieValue := s.stickySessionCookieFunc()
if currectCookieValue != "" {
cookie := &http.Cookie{
Name: stickySessionKey,
Value: currectCookieValue,
}
req.AddCookie(cookie)
}
}

func (s *sender) updateStickySessionCookie(resp *http.Response) {
cookies := resp.Cookies()
if len(cookies) > 0 {
for _, cookie := range cookies {
if cookie.Name == stickySessionKey {
if cookie.Value != s.stickySessionCookieFunc() {
s.setStickySessionCookieFunc(cookie.Value)
}
return
}
}
}
}
2 changes: 2 additions & 0 deletions exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func prepareSenderTest(t *testing.T, compression configcompression.Type, cb []fu
testServer.URL,
testServer.URL,
testServer.URL,
func() string { return "" },
func(string) {},
component.ID{},
),
}
Expand Down

0 comments on commit 5e41c6b

Please sign in to comment.