Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7568,6 +7568,35 @@ THE SOFTWARE.



--------------------------------------------------------------------------------
Dependency : github.com/brianvoe/gofakeit
Version: v3.18.0+incompatible
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/brianvoe/gofakeit@v3.18.0+incompatible/LICENSE.txt:

The MIT License (MIT)

Copyright (c) [year] [fullname]

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/cavaliergopher/rpm
Version: v1.2.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: >-
[Filestream] ensure harvester always restarts if the file has not been fully ingested.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/47107

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/beats/issues/46923
11 changes: 6 additions & 5 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (
type config struct {
Reader readerConfig `config:",inline"`

ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher *conf.Namespace `config:"prospector"`
FileIdentity *conf.Namespace `config:"file_identity"`
ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher fileWatcherConfig `config:"prospector.scanner"`
FileIdentity *conf.Namespace `config:"file_identity"`

// -1 means that registry will never be cleaned
CleanInactive time.Duration `config:"clean_inactive" validate:"min=-1"`
Expand Down Expand Up @@ -114,6 +114,7 @@ func defaultConfig() config {
CleanRemoved: true,
HarvesterLimit: 0,
IgnoreOlder: 0,
FileWatcher: defaultFileWatcherConfig(), // Config key: prospector.scanner
}
}

Expand Down
128 changes: 64 additions & 64 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package filestream

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -42,19 +42,19 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
libbeatinteg "github.com/elastic/beats/v7/libbeat/tests/integration"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
logger *logp.Logger
loggerBuffer *bytes.Buffer
t *testing.T
workingDir string
stateStore statestore.States
pipeline *mockPipelineConnector
monitoring beat.Monitoring
testLogger *logptest.Logger
t *testing.T
workingDir string
stateStore statestore.States
pipeline *mockPipelineConnector
monitoring beat.Monitoring

pluginInitOnce sync.Once
plugin v2.Plugin
Expand All @@ -71,38 +71,19 @@ type registryEntry struct {
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
// logp.NewInMemoryLocal will always use a console encoder, passing a
// JSONEncoderConfig will only change the keys, not the final encoding.
logger, buff := logp.NewInMemoryLocal("", logp.ConsoleEncoderConfig())

t.Cleanup(func() {
if t.Failed() {
f, err := os.CreateTemp("", t.Name()+"-*")
if err != nil {
t.Errorf("cannot create temp file for logs: %s", err)
}

defer f.Close()

data := buff.Bytes()
t.Logf("Debug Logs:%s\n", string(data))
t.Logf("Logs written to %s", f.Name())
if _, err := f.Write(data); err != nil {
t.Logf("could not write log file for debugging: %s", err)
}

return
}
})
tempDir := libbeatinteg.CreateTempDir(t, filepath.Join("..", "..", "build", "integration-tests"))
logger := logptest.NewFileLogger(
t,
tempDir,
)

return &inputTestingEnvironment{
logger: logger,
loggerBuffer: buff,
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
monitoring: beat.NewMonitoring(),
testLogger: logger,
t: t,
workingDir: tempDir,
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
monitoring: beat.NewMonitoring(),
}
}

Expand Down Expand Up @@ -134,7 +115,7 @@ func (e *inputTestingEnvironment) createInput(config map[string]any) (v2.Input,

func (e *inputTestingEnvironment) getManager() v2.InputManager {
e.pluginInitOnce.Do(func() {
e.plugin = Plugin(e.logger, e.stateStore)
e.plugin = Plugin(e.testLogger.Logger, e.stateStore)
})
return e.plugin.Manager
}
Expand All @@ -145,7 +126,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, id string, inp
defer wg.Done()
defer func() { _ = grp.Stop() }()

logger, _ := logp.NewDevelopmentLogger("")
logger := e.testLogger.Named("metrics-registry")
reg := inputmon.NewMetricsRegistry(
id, inp.Name(), e.monitoring.InputsRegistry(), logger)
defer inputmon.CancelMetricsRegistry(
Expand All @@ -158,7 +139,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, id string, inp
Cancelation: ctx,
StatusReporter: nil,
MetricsRegistry: reg,
Logger: e.logger,
Logger: e.testLogger.Named("input.filestream"),
}
_ = inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
Expand All @@ -168,12 +149,15 @@ func (e *inputTestingEnvironment) waitUntilInputStops() {
e.wg.Wait()
}

func (e *inputTestingEnvironment) mustWriteToFile(filename string, data []byte) {
// mustWriteToFile writes data to file and returns the full path
func (e *inputTestingEnvironment) mustWriteToFile(filename string, data []byte) string {
path := e.abspath(filename)
err := os.WriteFile(path, data, 0o644)
if err != nil {
e.t.Fatalf("failed to write file '%s': %+v", path, err)
}

return path
}

func (e *inputTestingEnvironment) mustAppendToFile(filename string, data []byte) {
Expand Down Expand Up @@ -266,34 +250,37 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri

// requireMetaInRegistry checks if the expected metadata is saved to the registry.
func (e *inputTestingEnvironment) waitUntilMetaInRegistry(filename, inputID string, expectedMeta fileMeta) {
for {
require.EventuallyWithT(e.t, func(t *assert.CollectT) {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
continue
t.Errorf("cannot stat file: %s", err)
return
}

id := getIDFromPath(filepath, inputID, fi)
entry, err := e.getRegistryState(id)
if err != nil {
continue
t.Errorf("cannot get registry state: %s", err)
return
}

if entry.Meta == nil {
continue
t.Errorf("entry metadata cannot be nil")
return
}

var meta fileMeta
err = typeconv.Convert(&meta, entry.Meta)
if err != nil {
e.t.Fatalf("cannot convert: %+v", err)
t.Errorf("cannot convert: %+v", err)
}

if requireMetadataEquals(expectedMeta, meta) {
break
if !requireMetadataEquals(expectedMeta, meta) {
t.Errorf("Metadata is not equal. Expecting:\n%#v\nGot:\n%#v", expectedMeta, meta)
}
time.Sleep(10 * time.Millisecond)
}

}, 30*time.Second, time.Second, "Metadata differs expected")
}

func requireMetadataEquals(one, other fileMeta) bool {
Expand Down Expand Up @@ -386,7 +373,6 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e
keys = append(keys, key)
return false, nil
})
e.t.Logf("keys in store: %v", keys)

return registryEntry{},
fmt.Errorf("error when getting expected key '%s' from store: %w",
Expand Down Expand Up @@ -467,13 +453,15 @@ func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, co

// waitUntilAtLeastEventCount waits until at least count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilAtLeastEventCount(count int) {
for {
sum := len(e.pipeline.GetAllEvents())
if count <= sum {
return
}
time.Sleep(10 * time.Millisecond)
}
require.EventuallyWithTf(
e.t,
func(t *assert.CollectT) {
sum := len(e.pipeline.GetAllEvents())
require.GreaterOrEqualf(t, sum, count, "got %d events", sum)
},
30*time.Second,
200*time.Millisecond,
"found less than %d events", count)
}

// waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
Expand All @@ -482,7 +470,7 @@ func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
require.Eventually(
e.t,
func() bool {
return e.pipeline.clients[len(e.pipeline.clients)-1].closed
return e.pipeline.clients[len(e.pipeline.clients)-1].closed.Load()
},
time.Second*10,
time.Millisecond*10,
Expand Down Expand Up @@ -564,6 +552,18 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
require.True(e.t, selectedEvent.Timestamp.Equal(tm), "got: %s, expected: %s", selectedEvent.Timestamp.String(), tm.String())
}

// logContains ensures s is a sub string on any log line.
// If s is not found, the test fails
func (e *inputTestingEnvironment) logContains(s string) {
e.t.Helper()
e.testLogger.LogContains(e.t, s)
}

func (e *inputTestingEnvironment) WaitLogsContains(s string, timeout time.Duration, msgAndArgs ...any) {
e.t.Helper()
e.testLogger.WaitLogsContains(e.t, s, timeout, msgAndArgs...)
}

var _ statestore.States = (*testInputStore)(nil)

type testInputStore struct {
Expand Down Expand Up @@ -592,7 +592,7 @@ type mockClient struct {
publishing []beat.Event
published []beat.Event
ackHandler beat.EventListener
closed bool
closed atomic.Bool
mtx sync.Mutex
canceler context.CancelFunc
}
Expand Down Expand Up @@ -635,11 +635,11 @@ func (c *mockClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()

if c.closed {
if c.closed.Load() {
return fmt.Errorf("mock client already closed")
}

c.closed = true
c.closed.Store(true)
return nil
}

Expand Down
Loading
Loading