Skip to content

Commit

Permalink
Merge branch 'traas-stack:main' into kelei_develop
Browse files Browse the repository at this point in the history
  • Loading branch information
wangsiyuan-code authored Nov 22, 2023
2 parents dd59fdf + 6f99674 commit 74b0d69
Show file tree
Hide file tree
Showing 37 changed files with 2,218 additions and 1,765 deletions.
3 changes: 2 additions & 1 deletion pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func (b *AgentBootstrap) setupDaemonAgent() error {
if err := b.setupCRI(); err != nil {
return err
}
b.AddStopComponents(ioc.Crii)

// system metrics
{
Expand Down Expand Up @@ -355,7 +356,7 @@ func (b *AgentBootstrap) setupDaemonAgent() error {
bsm := bistream.NewManager(ioc.RegistryService, bizbistream.GetBiStreamHandlerRegistry())

b.TM = manager.NewTransferManager(b.PM, b.LSM)
b.TM.AddStopComponents(b.httpServerComponent, ctm, bsm, b.AM)
b.TM.AddStopComponents(b.httpServerComponent, ctm, bsm, b.AM, ioc.Crii)
if err := b.TM.Transfer(); err != nil {
logger.Errorz("[transfer] error", zap.Error(err))
}
Expand Down
27 changes: 18 additions & 9 deletions pkg/collectconfig/executor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type (
AggWhereError int32
// error count when select
SelectError int32
ZeroBytes int
}

ParsedConf struct {
Expand Down Expand Up @@ -247,14 +248,6 @@ func (c *Consumer) getTargetHostname() string {
}

func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData) {
if len(datum) == 0 {
c.updatePeriodStatus(expectedTs, func(status *PeriodStatus) {
status.EmitSuccess = true
c.reportUpEvent(expectedTs, status)
})
return
}

if !c.addCommonTags(datum) {
logger.Errorz("[consumer] [log] fail to add common tags to metrics", zap.String("key", c.key))
return
Expand All @@ -271,7 +264,16 @@ func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailDa
}
}

err := c.output.WriteBatchV4(c.ct.Config.Key, c.ct.Target.Key, c.metricName, datum)
ps := c.getOrCreatePeriodStatusWithoutLock(expectedTs)
ok := ps.Watermark >= expectedTs+c.Window.Interval.Milliseconds() &&
c.firstIOSuccessTime < expectedTs

pc := &output.PeriodCompleteness{
TS: expectedTs,
OK: ok,
Target: c.ct.Target.Meta,
}
err := c.output.WriteBatchV4(c.ct.Config.Key, c.ct.Target.Key, c.metricName, datum, pc)
c.runInLock(func() {
c.updatePeriodStatus(expectedTs, func(status *PeriodStatus) {
if err != nil {
Expand Down Expand Up @@ -306,6 +308,7 @@ func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err e
if err != nil {
c.stat.IoError++
}
c.stat.ZeroBytes += resp.ZeroBytes

fileNotExists := os.IsNotExist(err)
if fileNotExists {
Expand Down Expand Up @@ -771,6 +774,7 @@ func (c *Consumer) createStatEvent(stat ConsumerStat) *pb2.ReportEventRequest_Ev
"f_where": int64(stat.FilterWhere),
"f_delay": int64(stat.FilterDelay),
"f_multiline": int64(stat.FilterMultiline),
"f_zerobytes": int64(stat.ZeroBytes),

"out_emit": int64(stat.Emit),
"out_error": int64(stat.EmitError),
Expand Down Expand Up @@ -814,6 +818,7 @@ func (c *Consumer) printStat() {
zap.Bool("miss", stat.Miss),
zap.Bool("broken", stat.Broken),
zap.Int32("processed", stat.Processed),
zap.Int("zerobytes", stat.ZeroBytes),
zap.Int32("fwhere", stat.FilterWhere),
zap.Int32("fbwhere", stat.FilterBeforeParseWhere),
zap.Int32("flogparse", stat.FilterLogParseError),
Expand Down Expand Up @@ -1184,6 +1189,10 @@ func (c *Consumer) LoadState(state *consumerStateObj) error {
return nil
}

func (c *Consumer) maybeFlush() {
c.sub.MaybeFlush()
}

func removeZeroNumbers(event *pb2.ReportEventRequest_Event) {
for key, value := range event.Numbers {
if key != "ok" && value == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/collectconfig/executor/consumer_log_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ type (
// Returns true if data is not empty
Emit(expectedTs int64) bool
init()
MaybeFlush()
}
)
3 changes: 3 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
}
)

func (c *logAnalysisSubConsumer) MaybeFlush() {
}

func init() {
gob.Register(&logAnalysisSubConsumerState{})
}
Expand Down
204 changes: 204 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_detail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package executor

import (
"context"
"github.com/traas-stack/holoinsight-agent/pkg/collectconfig/executor/agg"
"github.com/traas-stack/holoinsight-agent/pkg/collectconfig/executor/dryrun/event"
"github.com/traas-stack/holoinsight-agent/pkg/logger"
"github.com/traas-stack/holoinsight-agent/pkg/model"
"github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway"
"github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb"
"github.com/traas-stack/holoinsight-agent/pkg/util"
"go.uber.org/zap"
"time"
)

type (
detailConsumer struct {
parent *Consumer
table *model.Table
}
)

func (c *detailConsumer) MaybeFlush() {
if c.table == nil {
return
}

tr := &pb.WriteMetricsRequestV4_TaskResult{
Key: c.parent.key,
RefCollectKey: c.parent.ct.Config.Key,
RefTargetKey: c.parent.ct.Target.Key,
Table: &pb.WriteMetricsRequestV4_Table{
Timestamp: 0,
Header: &pb.WriteMetricsRequestV4_Header{
MetricName: c.parent.metricName,
TagKeys: c.table.Header.TagKeys,
ValueKeys: c.table.Header.FieldKeys,
},
},
Extension: map[string]string{
"details": "1",
},
Timestamp: 0,
Completeness: nil,
}
for _, row := range c.table.Rows {
pbValueValues := make([]*pb.DataNode, len(row.FieldValues))
for i, fieldValue := range row.FieldValues {
pbValueValues[i] = &pb.DataNode{
Type: 0,
Count: 0,
Value: fieldValue,
Bytes: nil,
}
}
tr.Table.Rows = append(tr.Table.Rows, &pb.WriteMetricsRequestV4_Row{
Timestamp: row.Timestamp,
TagValues: row.TagValues,
ValueValues: nil,
})
}

begin := time.Now()
err := gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}})
sendCost := time.Since(begin)
logger.Infoz("detail", zap.Int("count", len(c.table.Rows)), zap.Duration("sendCost", sendCost), zap.Error(err))
c.table = nil
}

func (c *detailConsumer) setParent(parent *Consumer) {
c.parent = parent
}

func (c *detailConsumer) Update(f func()) {
f()
}

func (c *detailConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, maxTs *int64) {

var processGroupEvent *event.Event
if c.parent.debugEvent != nil {
processGroupEvent = c.parent.debugEvent.AddChild("process group").Set("lines", util.CopyStringSlice(ctx.log.Lines))
ctx.event = processGroupEvent
}

// execute before parse filter
if processGroupEvent != nil {
ctx.whereEvent = &event.WhereEvent{}
ctx.event.Set("beforeParseWhere", ctx.whereEvent)
}
if !c.parent.executeBeforeParseWhere(ctx) {
// 在内部处理了
return
}
ctx.whereEvent = nil

// execute log parse
if !c.parent.executeLogParse(ctx) {
if processGroupEvent != nil {
processGroupEvent.Info("logParse false, break")
}
return
}

if !c.parent.executeVarsProcess(ctx) {
if processGroupEvent != nil {
processGroupEvent.Info("parseVars error, break")
}
return
}

// execute time parse
ts, b := c.parent.executeTimeParse(ctx)
if !b {
if processGroupEvent != nil {
processGroupEvent.Info("parseTime false, break")
}
return
}

intervalMs := c.parent.Window.Interval.Milliseconds()
alignTs := ts / intervalMs * intervalMs
if *maxTs < ts {
*maxTs = ts
}

periodStatus := c.parent.getOrCreatePeriodStatusWithoutLock(alignTs)
periodStatus.Stat.Broken = periodStatus.Stat.Broken || c.parent.stat.Broken
periodStatus.Stat.Groups++
ctx.periodStatus = periodStatus

if processGroupEvent != nil {
processGroupEvent.Set("timestamp", ts)
}

// execute where
if processGroupEvent != nil {
ctx.whereEvent = &event.WhereEvent{}
ctx.event.Set("where", ctx.whereEvent)
}
if !c.parent.executeWhere(ctx) {
if processGroupEvent != nil {
processGroupEvent.Info("where false, break")
}
return
}
ctx.whereEvent = nil

c.parent.stat.Processed++
periodStatus.Stat.Processed++

groupValues, ok := c.parent.executeGroupBy(ctx)
if !ok {
return
}
xs := c.parent.Select.(*xSelect)

if c.table == nil {
c.table = &model.Table{
Name: c.parent.metricName,
Header: &model.Header{
TagKeys: c.parent.GroupBy.GroupNames(),
FieldKeys: xs.valueNames,
},
Rows: nil,
}
}

row := &model.Row{
Timestamp: ts,
TagValues: make([]string, len(c.table.Header.TagKeys)),
FieldValues: make([]float64, len(c.table.Header.FieldKeys)),
}

for i := 0; i < len(groupValues); i++ {
row.TagValues[i] = groupValues[i]
}

for i, item := range xs.values {
if item.agg == agg.AggCount {
row.FieldValues[i] = 1
continue
}
if item.elect == nil {
continue
}
f64, _ := item.elect.ElectNumber(ctx)
row.FieldValues[i] = f64
}

c.table.Rows = append(c.table.Rows, row)

}

func (c *detailConsumer) Emit(expectedTs int64) bool {
return true
}

func (c *detailConsumer) init() {
}
3 changes: 3 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type (
}
)

func (c *logStatSubConsumer) MaybeFlush() {
}

func (c *logStatSubConsumer) init() {
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/collectconfig/executor/consumer_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func parseConsumer(st *api.SubTask) (*Consumer, error) {
if err != nil {
return nil, err
}
} else if task.GroupBy.Details != nil && task.GroupBy.Details.Enabled {
sub = &detailConsumer{}
} else {
sub = &logStatSubConsumer{}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/collectconfig/executor/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type (
Count int
// Use a string to describe the scope of this read.
Range string
// Count of \u0000 of this read
ZeroBytes int

decodeMutex sync.Mutex
decodedCache map[string][]string
Expand Down
15 changes: 13 additions & 2 deletions pkg/collectconfig/executor/logstream/logstream_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package logstream

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
Expand All @@ -13,12 +14,15 @@ import (
"go.uber.org/zap"
"io"
"os"
"strings"
"sync/atomic"
"time"
)

const (
expireTimeout = 3 * time.Minute
expireTimeout = 3 * time.Minute
DiscardLineWithZeroBytes = true
DiscardZeroBytesThreshold = 4096
)

type (
Expand Down Expand Up @@ -231,6 +235,7 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error {
n, err := f.file.ReadAt(buf, f.offset)
resp.IOEndTime = time.Now()
buf = buf[:n]
resp.ZeroBytes = bytes.Count(buf, []byte{0})
if err != nil && err != io.EOF {
f.closeFile()
logger.Errorz("[logstream] read error", zap.String("path", f.config.Path), zap.Error(err))
Expand All @@ -240,7 +245,13 @@ func (f *fileSubLogStream) Read(resp *ReadResponse) error {
resp.HasMore = f.offset < fileLength

var lines []string
if f.consumeBytes(buf[:n], func(line string) { lines = append(lines, line) }) {
if f.consumeBytes(buf[:n], func(line string) {
if DiscardLineWithZeroBytes && strings.Count(line, "\u0000") >= DiscardZeroBytesThreshold {
resp.HasBroken = true
} else {
lines = append(lines, line)
}
}) {
resp.HasBroken = true
}
resp.Lines = lines
Expand Down
1 change: 1 addition & 0 deletions pkg/collectconfig/executor/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (p *LogPipeline) pullAndConsume() {
// give up scheduling
runtime.Gosched()
}
p.consumer.maybeFlush()
}
}

Expand Down
Loading

0 comments on commit 74b0d69

Please sign in to comment.