Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (h *Harvester) open() *os.File {

func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_timeout time.Duration) (*string, int, error) {
var is_partial bool = true
var over_line_limit = false
var newline_length int = 1
start_time := time.Now()

Expand All @@ -143,8 +144,16 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
}
}

// TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
buffer.Write(segment)
if options.maxLineBytes > 0 && buffer.Len() + len(segment) > options.maxLineBytes {
if !over_line_limit {
emit("harvest: max line length reached, ignoring rest of line.")
over_line_limit = true
}
newline_length = 0
buffer.Write(segment[:options.maxLineBytes-buffer.Len()])
} else {
buffer.Write(segment)
}
}

if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions logstash-forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var options = &struct {
useSyslog bool
tailOnRotate bool
quiet bool
maxLineBytes int
version bool
}{
spoolSize: 1024,
Expand All @@ -42,8 +43,9 @@ func emitOptions() {
emit("\tharvester-buff-size: %d\n", options.harvesterBufferSize)
emit("\t--- flags ---------\n")
emit("\ttail (on-rotation): %t\n", options.tailOnRotate)
emit("\tlog-to-syslog: %t\n", options.useSyslog)
emit("\tquiet: %t\n", options.quiet)
emit("\tlog-to-syslog: %t\n", options.useSyslog)
emit("\tquiet: %t\n", options.quiet)
emit("\tmax-line-bytes: %d\n", options.maxLineBytes)
if runProfiler() {
emit("\t--- profile run ---\n")
emit("\tcpu-profile-file: %s\n", options.cpuProfileFile)
Expand Down Expand Up @@ -79,6 +81,7 @@ func init() {

flag.BoolVar(&options.quiet, "quiet", options.quiet, "operate in quiet mode - only emit errors to log")
flag.BoolVar(&options.version, "version", options.version, "output the version of this program")
flag.IntVar(&options.maxLineBytes, "max-line-bytes", options.maxLineBytes, "max number of bytes forwarded per line - note: set to 0 for unlimited.")
}

func init() {
Expand Down
11 changes: 8 additions & 3 deletions spec/lumberjack_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Lumberjack::Server.new(:ssl_certificate => ssl_certificate, :ssl_key => ssl_key, :port => port)
end

let(:max_line_bytes) { 32 }
let(:logstash_forwarder_config) do
<<-CONFIG
{
Expand Down Expand Up @@ -68,7 +69,9 @@
# TODO(sissel): Refactor this once we figure out a good way to do
# multi-component integration tests and property tests.
fd = File.new(input_file, "wb")
lines = [ "Hello world", "Fancy Pants", "Some Unicode Emoji: 👍 💗 " ]
lines = [ "Hello world", "Fancy Pants", "Some Unicode Emoji: 👍 💗 ",
"A really long line that should get truncated",
"X" * (2 ** 12) ]
lines.each { |l| fd.write(l + "\n") }
fd.flush
fd.close
Expand All @@ -86,7 +89,7 @@
lines.zip(events).each do |line, event|
# TODO(sissel): Resolve the need for this hack.
event["line"].force_encoding("UTF-8")
expect(event["line"]).to(eq(line))
expect(event["line"]).to(eq(line[0...max_line_bytes]))
expect(event[random_field]).to(eq(random_value))
end
end
Expand All @@ -97,7 +100,9 @@
context "when compiled from source" do
let(:lsf) do
# Start the process, return the pid
IO.popen(["./logstash-forwarder", "-config", config_file, "-quiet"])
IO.popen(["./logstash-forwarder", "-config", config_file, "-quiet",
"-max-line-bytes", "#{max_line_bytes}",
"-harvest-buffer-size", "#{max_line_bytes-1}"])
end
let(:host) { "localhost" }
it_behaves_like "logstash-forwarder"
Expand Down