diff --git a/harvester.go b/harvester.go index ad09b931..fe4fee5f 100644 --- a/harvester.go +++ b/harvester.go @@ -126,6 +126,7 @@ func (h *Harvester) open() *os.File { func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_timeout time.Duration) (*string, int, error) { var is_partial bool = true + var over_line_limit = false var newline_length int = 1 start_time := time.Now() @@ -143,8 +144,16 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim } } - // TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it? - buffer.Write(segment) + if options.maxLineBytes > 0 && buffer.Len() + len(segment) > options.maxLineBytes { + if !over_line_limit { + emit("harvest: max line length reached, ignoring rest of line.") + over_line_limit = true + } + newline_length = 0 + buffer.Write(segment[:options.maxLineBytes-buffer.Len()]) + } else { + buffer.Write(segment) + } } if err != nil { diff --git a/logstash-forwarder.go b/logstash-forwarder.go index cb521650..a7ca9621 100644 --- a/logstash-forwarder.go +++ b/logstash-forwarder.go @@ -27,6 +27,7 @@ var options = &struct { useSyslog bool tailOnRotate bool quiet bool + maxLineBytes int version bool }{ spoolSize: 1024, @@ -42,8 +43,9 @@ func emitOptions() { emit("\tharvester-buff-size: %d\n", options.harvesterBufferSize) emit("\t--- flags ---------\n") emit("\ttail (on-rotation): %t\n", options.tailOnRotate) - emit("\tlog-to-syslog: %t\n", options.useSyslog) - emit("\tquiet: %t\n", options.quiet) + emit("\tlog-to-syslog: %t\n", options.useSyslog) + emit("\tquiet: %t\n", options.quiet) + emit("\tmax-line-bytes: %d\n", options.maxLineBytes) if runProfiler() { emit("\t--- profile run ---\n") emit("\tcpu-profile-file: %s\n", options.cpuProfileFile) @@ -79,6 +81,7 @@ func init() { flag.BoolVar(&options.quiet, "quiet", options.quiet, "operate in quiet mode - only emit errors to log") flag.BoolVar(&options.version, "version", options.version, "output the version of this program") + flag.IntVar(&options.maxLineBytes, "max-line-bytes", options.maxLineBytes, "max number of bytes forwarded per line - note: set to 0 for unlimited.") } func init() { diff --git a/spec/lumberjack_spec.rb b/spec/lumberjack_spec.rb index 5d8a16ac..16aa8256 100644 --- a/spec/lumberjack_spec.rb +++ b/spec/lumberjack_spec.rb @@ -23,6 +23,7 @@ Lumberjack::Server.new(:ssl_certificate => ssl_certificate, :ssl_key => ssl_key, :port => port) end + let(:max_line_bytes) { 32 } let(:logstash_forwarder_config) do <<-CONFIG { @@ -68,7 +69,9 @@ # TODO(sissel): Refactor this once we figure out a good way to do # multi-component integration tests and property tests. fd = File.new(input_file, "wb") - lines = [ "Hello world", "Fancy Pants", "Some Unicode Emoji: 👍 💗 " ] + lines = [ "Hello world", "Fancy Pants", "Some Unicode Emoji: 👍 💗 ", + "A really long line that should get truncated", + "X" * (2 ** 12) ] lines.each { |l| fd.write(l + "\n") } fd.flush fd.close @@ -86,7 +89,7 @@ lines.zip(events).each do |line, event| # TODO(sissel): Resolve the need for this hack. event["line"].force_encoding("UTF-8") - expect(event["line"]).to(eq(line)) + expect(event["line"]).to(eq(line[0...max_line_bytes])) expect(event[random_field]).to(eq(random_value)) end end @@ -97,7 +100,9 @@ context "when compiled from source" do let(:lsf) do # Start the process, return the pid - IO.popen(["./logstash-forwarder", "-config", config_file, "-quiet"]) + IO.popen(["./logstash-forwarder", "-config", config_file, "-quiet", + "-max-line-bytes", "#{max_line_bytes}", + "-harvest-buffer-size", "#{max_line_bytes-1}"]) end let(:host) { "localhost" } it_behaves_like "logstash-forwarder"