From ff60c5b88dba3dc04ed445f7c44501c1d2ff5ff3 Mon Sep 17 00:00:00 2001 From: "xiangfeng.xzc" Date: Tue, 21 Nov 2023 15:07:55 +0800 Subject: [PATCH] fix: discard lines containing \0 --- pkg/collectconfig/executor/consumer.go | 4 ++++ pkg/collectconfig/executor/logstream/logstream.go | 2 ++ .../executor/logstream/logstream_file.go | 15 +++++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pkg/collectconfig/executor/consumer.go b/pkg/collectconfig/executor/consumer.go index e2f73e0..bbb55e9 100644 --- a/pkg/collectconfig/executor/consumer.go +++ b/pkg/collectconfig/executor/consumer.go @@ -119,6 +119,7 @@ type ( AggWhereError int32 // error count when select SelectError int32 + ZeroBytes int } ParsedConf struct { @@ -307,6 +308,7 @@ func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err e if err != nil { c.stat.IoError++ } + c.stat.ZeroBytes += resp.ZeroBytes fileNotExists := os.IsNotExist(err) if fileNotExists { @@ -772,6 +774,7 @@ func (c *Consumer) createStatEvent(stat ConsumerStat) *pb2.ReportEventRequest_Ev "f_where": int64(stat.FilterWhere), "f_delay": int64(stat.FilterDelay), "f_multiline": int64(stat.FilterMultiline), + "f_zerobytes": int64(stat.ZeroBytes), "out_emit": int64(stat.Emit), "out_error": int64(stat.EmitError), @@ -815,6 +818,7 @@ func (c *Consumer) printStat() { zap.Bool("miss", stat.Miss), zap.Bool("broken", stat.Broken), zap.Int32("processed", stat.Processed), + zap.Int("zerobytes", stat.ZeroBytes), zap.Int32("fwhere", stat.FilterWhere), zap.Int32("fbwhere", stat.FilterBeforeParseWhere), zap.Int32("flogparse", stat.FilterLogParseError), diff --git a/pkg/collectconfig/executor/logstream/logstream.go b/pkg/collectconfig/executor/logstream/logstream.go index 39e965b..bc0b0cb 100644 --- a/pkg/collectconfig/executor/logstream/logstream.go +++ b/pkg/collectconfig/executor/logstream/logstream.go @@ -80,6 +80,8 @@ type ( Count int // Use a string to describe the scope of this read. Range string + // Count of \u0000 of this read + ZeroBytes int decodeMutex sync.Mutex decodedCache map[string][]string diff --git a/pkg/collectconfig/executor/logstream/logstream_file.go b/pkg/collectconfig/executor/logstream/logstream_file.go index 4f50144..561c4a7 100644 --- a/pkg/collectconfig/executor/logstream/logstream_file.go +++ b/pkg/collectconfig/executor/logstream/logstream_file.go @@ -5,6 +5,7 @@ package logstream import ( + "bytes" "encoding/gob" "errors" "fmt" @@ -13,12 +14,15 @@ import ( "go.uber.org/zap" "io" "os" + "strings" "sync/atomic" "time" ) const ( - expireTimeout = 3 * time.Minute + expireTimeout = 3 * time.Minute + DiscardLineWithZeroBytes = true + DiscardZeroBytesThreshold = 4096 ) type ( @@ -231,6 +235,7 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error { n, err := f.file.ReadAt(buf, f.offset) resp.IOEndTime = time.Now() buf = buf[:n] + resp.ZeroBytes = bytes.Count(buf, []byte{0}) if err != nil && err != io.EOF { f.closeFile() logger.Errorz("[logstream] read error", zap.String("path", f.config.Path), zap.Error(err)) @@ -240,7 +245,13 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error { resp.HasMore = f.offset < fileLength var lines []string - if f.consumeBytes(buf[:n], func(line string) { lines = append(lines, line) }) { + if f.consumeBytes(buf[:n], func(line string) { + if DiscardLineWithZeroBytes && strings.Count(line, "\u0000") >= DiscardZeroBytesThreshold { + resp.HasBroken = true + } else { + lines = append(lines, line) + } + }) { resp.HasBroken = true } resp.Lines = lines