Skip to content

Commit

Permalink
fix: discard lines containing \0 (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Nov 21, 2023
1 parent 834f746 commit 6f99674
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pkg/collectconfig/executor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type (
AggWhereError int32
// error count when select
SelectError int32
ZeroBytes int
}

ParsedConf struct {
Expand Down Expand Up @@ -307,6 +308,7 @@ func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err e
if err != nil {
c.stat.IoError++
}
c.stat.ZeroBytes += resp.ZeroBytes

fileNotExists := os.IsNotExist(err)
if fileNotExists {
Expand Down Expand Up @@ -772,6 +774,7 @@ func (c *Consumer) createStatEvent(stat ConsumerStat) *pb2.ReportEventRequest_Ev
"f_where": int64(stat.FilterWhere),
"f_delay": int64(stat.FilterDelay),
"f_multiline": int64(stat.FilterMultiline),
"f_zerobytes": int64(stat.ZeroBytes),

"out_emit": int64(stat.Emit),
"out_error": int64(stat.EmitError),
Expand Down Expand Up @@ -815,6 +818,7 @@ func (c *Consumer) printStat() {
zap.Bool("miss", stat.Miss),
zap.Bool("broken", stat.Broken),
zap.Int32("processed", stat.Processed),
zap.Int("zerobytes", stat.ZeroBytes),
zap.Int32("fwhere", stat.FilterWhere),
zap.Int32("fbwhere", stat.FilterBeforeParseWhere),
zap.Int32("flogparse", stat.FilterLogParseError),
Expand Down
2 changes: 2 additions & 0 deletions pkg/collectconfig/executor/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type (
Count int
// Use a string to describe the scope of this read.
Range string
// Count of \u0000 of this read
ZeroBytes int

decodeMutex sync.Mutex
decodedCache map[string][]string
Expand Down
15 changes: 13 additions & 2 deletions pkg/collectconfig/executor/logstream/logstream_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package logstream

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
Expand All @@ -13,12 +14,15 @@ import (
"go.uber.org/zap"
"io"
"os"
"strings"
"sync/atomic"
"time"
)

const (
expireTimeout = 3 * time.Minute
expireTimeout = 3 * time.Minute
DiscardLineWithZeroBytes = true
DiscardZeroBytesThreshold = 4096
)

type (
Expand Down Expand Up @@ -231,6 +235,7 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error {
n, err := f.file.ReadAt(buf, f.offset)
resp.IOEndTime = time.Now()
buf = buf[:n]
resp.ZeroBytes = bytes.Count(buf, []byte{0})
if err != nil && err != io.EOF {
f.closeFile()
logger.Errorz("[logstream] read error", zap.String("path", f.config.Path), zap.Error(err))
Expand All @@ -240,7 +245,13 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error {
resp.HasMore = f.offset < fileLength

var lines []string
if f.consumeBytes(buf[:n], func(line string) { lines = append(lines, line) }) {
if f.consumeBytes(buf[:n], func(line string) {
if DiscardLineWithZeroBytes && strings.Count(line, "\u0000") >= DiscardZeroBytesThreshold {
resp.HasBroken = true
} else {
lines = append(lines, line)
}
}) {
resp.HasBroken = true
}
resp.Lines = lines
Expand Down

0 comments on commit 6f99674

Please sign in to comment.