diff --git a/pkg/plugin/response.go b/pkg/plugin/response.go index 6c6e214..1c4b1ac 100644 --- a/pkg/plugin/response.go +++ b/pkg/plugin/response.go @@ -42,23 +42,25 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse { lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0) lineField.Name = gLineField - sc := bufio.NewReader(reader) + // aligns with -insert.maxLineSizeBytes + br := bufio.NewReaderSize(reader, 256*1024) + var parser fastjson.Parser + var finishedReading bool - finishedReading := false - n := -1 - for !finishedReading { + for n := 0; !finishedReading; n++ { n++ - b, err := sc.ReadBytes('\n') + b, err := br.ReadBytes('\n') if err != nil { - if !errors.Is(err, io.EOF) && !errors.Is(err, bufio.ErrBufferFull) { - return newResponseError(fmt.Errorf("cannot read line in response: %s", err), backend.StatusInternal) - } if errors.Is(err, bufio.ErrBufferFull) { - // Skip the line if it's too long. backend.Logger.Info("skipping line number #%d: line too long", n) continue } - finishedReading = true + if errors.Is(err, io.EOF) { + // b can be != nil when EOF is returned, so we need to process it + finishedReading = true + } else { + return newResponseError(fmt.Errorf("cannot read line in response: %s", err), backend.StatusInternal) + } } if len(b) == 0 { @@ -66,7 +68,7 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse { } b = bytes.Trim(b, "\n") - value, err := fastjson.ParseBytes(b) + value, err := parser.ParseBytes(b) if err != nil { return newResponseError(fmt.Errorf("error decode response: %s", err), backend.StatusInternal) }