Skip to content

Commit

Permalink
refactor: send completeness info to server (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Oct 16, 2023
1 parent b62baeb commit a73190a
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 156 deletions.
19 changes: 10 additions & 9 deletions pkg/collectconfig/executor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,6 @@ func (c *Consumer) getTargetHostname() string {
}

func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData) {
if len(datum) == 0 {
c.updatePeriodStatus(expectedTs, func(status *PeriodStatus) {
status.EmitSuccess = true
c.reportUpEvent(expectedTs, status)
})
return
}

if !c.addCommonTags(datum) {
logger.Errorz("[consumer] [log] fail to add common tags to metrics", zap.String("key", c.key))
return
Expand All @@ -271,7 +263,16 @@ func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailDa
}
}

err := c.output.WriteBatchV4(c.ct.Config.Key, c.ct.Target.Key, c.metricName, datum)
ps := c.getOrCreatePeriodStatusWithoutLock(expectedTs)
ok := ps.Watermark >= expectedTs+c.Window.Interval.Milliseconds() &&
c.firstIOSuccessTime < expectedTs

pc := &output.PeriodCompleteness{
TS: expectedTs,
OK: ok,
Target: c.ct.Target.Meta,
}
err := c.output.WriteBatchV4(c.ct.Config.Key, c.ct.Target.Key, c.metricName, datum, pc)
c.runInLock(func() {
c.updatePeriodStatus(expectedTs, func(status *PeriodStatus) {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/output/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *ConsoleOutput) WriteBatchAsync(configKey, targetKey, metricName string,
return nil
}

func (c *ConsoleOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData) error {
func (c *ConsoleOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, _ *output.PeriodCompleteness) error {
return nil
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/plugin/output/gateway/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb"
"github.com/traas-stack/holoinsight-agent/pkg/util/stat"
"go.uber.org/zap"
"strings"
"time"
)

Expand Down Expand Up @@ -101,7 +102,19 @@ func (b *batchConsumerV4) Consume(a []interface{}) {
}
}
begin := time.Now()
resp, err := b.gw.WriteMetrics(ctx, taskResults)

var err error
var resp *pb.WriteMetricsResponse

for i := 0; i < 3; i++ {
resp, err = b.gw.WriteMetrics(ctx, taskResults)
if err != nil && strings.Contains(err.Error(), "connection refused") {
time.Sleep(300 * time.Millisecond)
continue
}
break
}

cost := time.Now().Sub(begin)
if err == nil && resp.Header.Code != 0 {
err = fmt.Errorf("server error %+v", resp.Header)
Expand Down
27 changes: 22 additions & 5 deletions pkg/plugin/output/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func (c *gatewayOutput) Stop() {
}

func (c *gatewayOutput) WriteBatchAsync(configKey, targetKey, metricName string, array []*model.DetailData) error {
batch := convertToTaskResult2(configKey, targetKey, metricName, array)
batch := convertToTaskResult2(configKey, targetKey, metricName, array, nil)
go GetWriteService().WriteV4(context.Background(), &WriteV4Request{Batch: batch})
return nil
}

func (c *gatewayOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData) error {
batch := convertToTaskResult2(configKey, targetKey, metricName, array)
func (c *gatewayOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, completeness *output.PeriodCompleteness) error {
batch := convertToTaskResult2(configKey, targetKey, metricName, array, completeness)
return GetWriteService().WriteV4(context.Background(), &WriteV4Request{Batch: batch})
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func getOrCreate(configKey, targetKey, metricName string, taskResultByValueName
return r
}

func convertToTaskResult2(configKey, targetKey, metricName string, array []*model.DetailData) []*pb.WriteMetricsRequestV4_TaskResult {
func convertToTaskResult2(configKey, targetKey, metricName string, array []*model.DetailData, completeness *output.PeriodCompleteness) []*pb.WriteMetricsRequestV4_TaskResult {
// datum in array must have same tag keys

taskResultByValueName := make(map[string]*pb.WriteMetricsRequestV4_TaskResult)
Expand All @@ -148,10 +148,27 @@ func convertToTaskResult2(configKey, targetKey, metricName string, array []*mode
}
}

a := make([]*pb.WriteMetricsRequestV4_TaskResult, 0, len(taskResultByValueName))
a := make([]*pb.WriteMetricsRequestV4_TaskResult, 0, len(taskResultByValueName)+1)
for _, r := range taskResultByValueName {
a = append(a, r)
}

if completeness != nil {
r := &pb.WriteMetricsRequestV4_TaskResult{
Key: configKey + "/" + targetKey,
RefCollectKey: configKey,
RefTargetKey: targetKey,
Table: &pb.WriteMetricsRequestV4_Table{
Timestamp: completeness.TS,
},
Timestamp: completeness.TS,
Completeness: &pb.WriteMetricsRequestV4_Completeness{
Ok: completeness.OK,
},
}
a = append(a, r)
}

return a
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/plugin/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ type (
Output interface {
WriteMetricsV1([]*model.Metric, Extension)

WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData) error
WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, c *PeriodCompleteness) error
}
composite struct {
array []Output
}
PeriodCompleteness struct {
Valid bool
TS int64
OK bool
Target map[string]string
}
)

func (c *composite) WriteMetricsV1(metrics []*model.Metric, extension Extension) {
Expand All @@ -26,9 +32,9 @@ func (c *composite) WriteMetricsV1(metrics []*model.Metric, extension Extension)
}
}

func (c *composite) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData) error {
func (c *composite) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, pc *PeriodCompleteness) error {
for _, output := range c.array {
output.WriteBatchV4(configKey, targetKey, metricName, array)
output.WriteBatchV4(configKey, targetKey, metricName, array, pc)
}
return nil
}
Expand Down
Loading

0 comments on commit a73190a

Please sign in to comment.