
Commit 3567b04

feat: supports upload log details to server
xzchaoo committed Oct 24, 2023
1 parent bcd122a commit 3567b04
Showing 9 changed files with 233 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/collectconfig/executor/consumer.go
@@ -1185,6 +1185,10 @@ func (c *Consumer) LoadState(state *consumerStateObj) error {
    return nil
}

func (c *Consumer) maybeFlush() {
    c.sub.MaybeFlush()
}

func removeZeroNumbers(event *pb2.ReportEventRequest_Event) {
    for key, value := range event.Numbers {
        if key != "ok" && value == 0 {
1 change: 1 addition & 0 deletions pkg/collectconfig/executor/consumer_log_sub.go
@@ -14,5 +14,6 @@ type (
        // Returns true if data is not empty
        Emit(expectedTs int64) bool
        init()
        MaybeFlush()
    }
)
3 changes: 3 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_analysis.go
@@ -26,6 +26,9 @@ type (
    }
)

func (c *logAnalysisSubConsumer) MaybeFlush() {
}

func init() {
    gob.Register(&logAnalysisSubConsumerState{})
}
200 changes: 200 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_detail.go
@@ -0,0 +1,200 @@
package executor

import (
    "context"
    "github.com/traas-stack/holoinsight-agent/pkg/collectconfig/executor/agg"
    "github.com/traas-stack/holoinsight-agent/pkg/collectconfig/executor/dryrun/event"
    "github.com/traas-stack/holoinsight-agent/pkg/logger"
    "github.com/traas-stack/holoinsight-agent/pkg/model"
    "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway"
    "github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb"
    "github.com/traas-stack/holoinsight-agent/pkg/util"
    "go.uber.org/zap"
    "time"
)

type (
    detailConsumer struct {
        parent *Consumer
        table  *model.Table
    }
)

// MaybeFlush sends the buffered detail rows to the server and then clears the buffer.
func (c *detailConsumer) MaybeFlush() {
    if c.table == nil {
        return
    }

    tr := &pb.WriteMetricsRequestV4_TaskResult{
        Key:           c.parent.key,
        RefCollectKey: c.parent.ct.Config.Key,
        RefTargetKey:  c.parent.ct.Target.Key,
        Table: &pb.WriteMetricsRequestV4_Table{
            Timestamp: 0,
            Header: &pb.WriteMetricsRequestV4_Header{
                MetricName: c.parent.metricName,
                TagKeys:    c.table.Header.TagKeys,
                ValueKeys:  c.table.Header.FieldKeys,
            },
        },
        Extension: map[string]string{
            "details": "1",
        },
        Timestamp:    0,
        Completeness: nil,
    }
    for _, row := range c.table.Rows {
        pbValueValues := make([]*pb.DataNode, len(row.FieldValues))
        for i, fieldValue := range row.FieldValues {
            pbValueValues[i] = &pb.DataNode{
                Type:  0,
                Count: 0,
                Value: fieldValue,
                Bytes: nil,
            }
        }
        tr.Table.Rows = append(tr.Table.Rows, &pb.WriteMetricsRequestV4_Row{
            Timestamp:   row.Timestamp,
            TagValues:   row.TagValues,
            ValueValues: pbValueValues,
        })
    }

    begin := time.Now()
    err := gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}})
    sendCost := time.Since(begin)
    logger.Infoz("detail", zap.Int("count", len(c.table.Rows)), zap.Duration("sendCost", sendCost), zap.Error(err))
    c.table = nil
}

func (c *detailConsumer) setParent(parent *Consumer) {
    c.parent = parent
}

func (c *detailConsumer) Update(f func()) {
    f()
}

// ProcessGroup runs the configured parse/filter steps for one log group and, on
// success, buffers the elected tag and field values as one detail row.
func (c *detailConsumer) ProcessGroup(iw *inputWrapper, ctx *LogContext, maxTs *int64) {

    var processGroupEvent *event.Event
    if c.parent.debugEvent != nil {
        processGroupEvent = c.parent.debugEvent.AddChild("process group").Set("lines", util.CopyStringSlice(ctx.log.Lines))
        ctx.event = processGroupEvent
    }

    // execute before parse filter
    if processGroupEvent != nil {
        ctx.whereEvent = &event.WhereEvent{}
        ctx.event.Set("beforeParseWhere", ctx.whereEvent)
    }
    if !c.parent.executeBeforeParseWhere(ctx) {
        // already handled internally
        return
    }
    ctx.whereEvent = nil

    // execute log parse
    if !c.parent.executeLogParse(ctx) {
        if processGroupEvent != nil {
            processGroupEvent.Info("logParse false, break")
        }
        return
    }

    if !c.parent.executeVarsProcess(ctx) {
        if processGroupEvent != nil {
            processGroupEvent.Info("parseVars error, break")
        }
        return
    }

    // execute time parse
    ts, b := c.parent.executeTimeParse(ctx)
    if !b {
        if processGroupEvent != nil {
            processGroupEvent.Info("parseTime false, break")
        }
        return
    }

    intervalMs := c.parent.Window.Interval.Milliseconds()
    alignTs := ts / intervalMs * intervalMs
    if *maxTs < ts {
        *maxTs = ts
    }

    periodStatus := c.parent.getOrCreatePeriodStatusWithoutLock(alignTs)
    periodStatus.Stat.Broken = periodStatus.Stat.Broken || c.parent.stat.Broken
    periodStatus.Stat.Groups++
    ctx.periodStatus = periodStatus

    if processGroupEvent != nil {
        processGroupEvent.Set("timestamp", ts)
    }

    // execute where
    if processGroupEvent != nil {
        ctx.whereEvent = &event.WhereEvent{}
        ctx.event.Set("where", ctx.whereEvent)
    }
    if !c.parent.executeWhere(ctx) {
        if processGroupEvent != nil {
            processGroupEvent.Info("where false, break")
        }
        return
    }
    ctx.whereEvent = nil

    c.parent.stat.Processed++
    periodStatus.Stat.Processed++

    groupValues, ok := c.parent.executeGroupBy(ctx)
    if !ok {
        return
    }
    xs := c.parent.Select.(*xSelect)

    if c.table == nil {
        c.table = &model.Table{
            Name: c.parent.metricName,
            Header: &model.Header{
                TagKeys:   c.parent.GroupBy.GroupNames(),
                FieldKeys: xs.valueNames,
            },
            Rows: nil,
        }
    }

    row := &model.Row{
        Timestamp:   ts,
        TagValues:   make([]string, len(c.table.Header.TagKeys)),
        FieldValues: make([]float64, len(c.table.Header.FieldKeys)),
    }

    for i := 0; i < len(groupValues); i++ {
        row.TagValues[i] = groupValues[i]
    }

    for i, item := range xs.values {
        if item.agg == agg.AggCount {
            row.FieldValues[i] = 1
            continue
        }
        if item.elect == nil {
            continue
        }
        f64, _ := item.elect.ElectNumber(ctx)
        row.FieldValues[i] = f64
    }

    c.table.Rows = append(c.table.Rows, row)

}

// Emit reports nothing in detail mode; buffered rows are sent by MaybeFlush instead.
func (c *detailConsumer) Emit(expectedTs int64) bool {
    return true
}

func (c *detailConsumer) init() {
}
3 changes: 3 additions & 0 deletions pkg/collectconfig/executor/consumer_log_sub_stat.go
@@ -21,6 +21,9 @@ type (
    }
)

func (c *logStatSubConsumer) MaybeFlush() {
}

func (c *logStatSubConsumer) init() {
}

2 changes: 2 additions & 0 deletions pkg/collectconfig/executor/consumer_parser.go
@@ -108,6 +108,8 @@ func parseConsumer(st *api.SubTask) (*Consumer, error) {
        if err != nil {
            return nil, err
        }
    } else if task.GroupBy.Details != nil && task.GroupBy.Details.Enabled {
        sub = &detailConsumer{}
    } else {
        sub = &logStatSubConsumer{}
    }
1 change: 1 addition & 0 deletions pkg/collectconfig/executor/pipeline.go
@@ -494,6 +494,7 @@ func (p *LogPipeline) pullAndConsume() {
            // give up scheduling
            runtime.Gosched()
        }
        p.consumer.maybeFlush()
    }
}

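Taken together, the changes above form a buffer-and-flush cycle: detailConsumer.ProcessGroup buffers one row per log group, and the pipeline's pull-and-consume loop calls Consumer.maybeFlush, which delegates to the sub consumer's MaybeFlush. Below is a minimal, self-contained sketch of that pattern only; the bufferingConsumer type and its printed output are illustrative stand-ins, not the agent's actual code.

```go
package main

import "fmt"

// bufferingConsumer mirrors the detailConsumer pattern in this commit:
// ProcessGroup buffers rows, MaybeFlush sends the whole batch and clears it.
type bufferingConsumer struct {
    rows []string
}

func (c *bufferingConsumer) ProcessGroup(line string) {
    c.rows = append(c.rows, line)
}

func (c *bufferingConsumer) MaybeFlush() {
    if len(c.rows) == 0 {
        return // nothing buffered in this cycle
    }
    // In the real code this is a gateway WriteV4 call; here we just print.
    fmt.Printf("flushing %d detail row(s)\n", len(c.rows))
    c.rows = nil
}

func main() {
    c := &bufferingConsumer{}
    // One pull-and-consume cycle: drain pending log groups, then flush once.
    for _, line := range []string{"log group 1", "log group 2"} {
        c.ProcessGroup(line)
    }
    c.MaybeFlush()
}
```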
5 changes: 5 additions & 0 deletions pkg/collectconfig/type.go
@@ -77,6 +77,7 @@ type (
        Groups      []*Group         `json:"groups"`
        MaxKeySize  int              `json:"maxKeySize"`
        LogAnalysis *LogAnalysisConf `json:"logAnalysis"`
        Details     *Details         `json:"details"`
    }
    Window struct {
        // 5s 5000
@@ -314,6 +315,10 @@
        Name       string `json:"name"`
        MetricType string `json:"metricType"`
    }
    Details struct {
        // If Enabled is true, the elected values will be reported as details
        Enabled bool `json:"enabled"`
    }
)

var (
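As a rough illustration of the new knob, the sketch below marshals the Details struct added above; setting enabled under the group-by's details field is what routes a task to detailConsumer (see the consumer_parser.go change). The import path is an assumption inferred from this repository's layout.

```go
package main

import (
    "encoding/json"
    "fmt"

    // Assumed import path, inferred from pkg/collectconfig in this repository.
    "github.com/traas-stack/holoinsight-agent/pkg/collectconfig"
)

func main() {
    // With Enabled set, parseConsumer picks the new detailConsumer for the task.
    d := &collectconfig.Details{Enabled: true}
    b, _ := json.Marshal(d)
    fmt.Println(string(b)) // prints: {"enabled":true}
}
```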
14 changes: 14 additions & 0 deletions pkg/model/data.go
@@ -43,6 +43,20 @@ type (
        Ip   string
        Port int
    }
    Table struct {
        Name   string  `json:"name"`
        Header *Header `json:"header"`
        Rows   []*Row  `json:"rows"`
    }
    Header struct {
        TagKeys   []string `json:"tagKeys"`
        FieldKeys []string `json:"fieldKeys"`
    }
    Row struct {
        Timestamp   int64     `json:"timestamp"`
        TagValues   []string  `json:"tagValues"`
        FieldValues []float64 `json:"fieldValues"`
    }
)

func (a Addr) String() string {
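The Table/Header/Row types added here are the in-memory buffer that detailConsumer fills before flushing. A small sketch of how such a table is shaped; the metric name and values are made up for illustration, and the import path is the one used by consumer_log_sub_detail.go above.

```go
package main

import (
    "fmt"

    // Import path as used by consumer_log_sub_detail.go in this commit.
    "github.com/traas-stack/holoinsight-agent/pkg/model"
)

func main() {
    // One tag column and one field column; one row per processed log group.
    t := &model.Table{
        Name: "example_log_detail",
        Header: &model.Header{
            TagKeys:   []string{"host"},
            FieldKeys: []string{"cost"},
        },
    }
    t.Rows = append(t.Rows, &model.Row{
        Timestamp:   1698115200000, // millisecond timestamp of the log group
        TagValues:   []string{"host-1"},
        FieldValues: []float64{12.5},
    })
    fmt.Printf("buffered %d row(s) for table %s\n", len(t.Rows), t.Name)
}
```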
