Skip to content

Commit

Permalink
[pkg/stanza] Fix nil pointer dereference (#27882)
Browse files Browse the repository at this point in the history
**Description:** 
Copying reader with empty FlushState will cause a nil pointer
dereference.
  • Loading branch information
haoqixu authored Oct 20, 2023
1 parent e347ff3 commit da24c2b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
13 changes: 3 additions & 10 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
Offset: old.Offset,
FileAttributes: util.MapCopy(old.FileAttributes),
HeaderFinalized: old.HeaderFinalized,
FlushState: &flush.State{
LastDataChange: old.FlushState.LastDataChange,
LastDataLength: old.FlushState.LastDataLength,
},
FlushState: old.FlushState.Copy(),
})
}

Expand All @@ -71,12 +68,8 @@ func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) {
decoder: decode.New(f.Encoding),
}

if m.FlushState == nil {
r.lineSplitFunc = trim.WithFunc(trim.ToLength(f.SplitFunc, f.Config.MaxLogSize), f.TrimFunc)
} else {
flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc)
}
flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc)

if !f.FromBeginning {
if err = r.offsetToEnd(); err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestCopyReaderWithoutFlusher(t *testing.T) {
f, _ := testReaderFactory(t, split.Config{}, defaultMaxLogSize, 0)

temp := openTemp(t, t.TempDir())
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)

r, err := f.NewReader(temp, fp)
require.NoError(t, err)

// A copy of the reader should not panic
_, err = f.Copy(r, temp)
assert.NoError(t, err)
}

func TestPersistFlusher(t *testing.T) {
flushPeriod := 100 * time.Millisecond
f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod)
Expand Down
12 changes: 11 additions & 1 deletion pkg/stanza/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@ type State struct {
LastDataLength int
}

func (s *State) Copy() *State {
if s == nil {
return nil
}
return &State{
LastDataChange: s.LastDataChange,
LastDataLength: s.LastDataLength,
}
}

// Func wraps a bufio.SplitFunc with a timer.
// When the timer expires, an incomplete token may be returned.
// The timer will reset any time the data parameter changes.
func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
if period <= 0 {
if s == nil || period <= 0 {
return splitFunc
}

Expand Down

0 comments on commit da24c2b

Please sign in to comment.