Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: detail log write to sls #106

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pkg/collectconfig/executor/consumer_log_sub_detail.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/traas-stack/holoinsight-agent/pkg/logger"
"github.com/traas-stack/holoinsight-agent/pkg/model"
"github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway"
"github.com/traas-stack/holoinsight-agent/pkg/plugin/output/sls"
"github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb"
"github.com/traas-stack/holoinsight-agent/pkg/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -42,7 +43,9 @@ func (c *detailConsumer) MaybeFlush() {
},
},
Extension: map[string]string{
"details": "1",
// for ceresdb4
"details": "1",
"configKey": c.parent.ct.Config.Key,
},
Timestamp: 0,
Completeness: nil,
Expand All @@ -65,7 +68,17 @@ func (c *detailConsumer) MaybeFlush() {
}

begin := time.Now()
err := gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}})

var err error
switch c.parent.task.Output.Type {
case "gateway":
err = gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}})
case "sls":
err = sls.GetOutPut().WriteToSLS(c.parent.ct.Config.Key, c.parent.ct.Target.Key, c.table, c.parent.task.Output.Sls)
default:
logger.Warnf("detail output type %s is not support!", c.parent.task.Output.Type)
}

sendCost := time.Since(begin)
logger.Infoz("detail", zap.String("key", c.parent.key), zap.Int("count", len(c.table.Rows)), zap.Duration("sendCost", sendCost), zap.Error(err))
c.table = nil
Expand Down
70 changes: 70 additions & 0 deletions pkg/collectconfig/executor/log-to-sls.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"json": {
"select": {
},
"from": {
"type": "log",
"log": {
"path": [
{
"type": "sls"
}
]
}
},
"groupBy": {
"groups": [
{
"name": "value",
"elect": {
"type": "line"
}
}
],
"details": {
"enabled": true
}
},
"window": {
"interval": 60000
},
"output": {
"type": "sls",
"sls": {
"endpoint": "",
"project": "",
"logstore": "",
"ak": "",
"sk": ""
}
}
},
"collectRange": {
"type": "cloudmonitor",
"cloudmonitor": {
"table": "sls_shard",
"condition": [
{
"endpoint": [
""
],
"sk": [
""
],
"project": [
""
],
"ak": [
""
],
"logstore": [
""
]
}
]
}
},
"executeRule": {
"fixedRate": 0
}
}
12 changes: 10 additions & 2 deletions pkg/collectconfig/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,16 @@ type (
Interval interface{} `json:"interval"`
}
Output struct {
Type string `json:"type"`
Gateway *Gateway `json:"gateway"`
Type string `json:"type"`
Gateway *Gateway `json:"gateway"`
Sls *SlsConfig `json:"sls"`
}
SlsConfig struct {
Endpoint string `json:"endpoint"`
AK string `json:"ak"`
SK string `json:"sk"`
Project string `json:"project"`
Logstore string `json:"logstore"`
}
Gateway struct {
// 用户可以覆盖, 否则默认使用 tableName
Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/standard/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (

type (
Output struct {
Tenant string
O output.Output
Tenant string
O output.Output
ConfigKey string
}
)

func (o *Output) Write(metrics []*model.Metric) {
oe := output.Extension{
Tenant: o.Tenant,
Tenant: o.Tenant,
ConfigKey: o.ConfigKey,
}
o.O.WriteMetricsV1(metrics, oe)
}
5 changes: 3 additions & 2 deletions pkg/pipeline/standard/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func ParsePipeline(task *collecttask.CollectTask) (*Pipeline, error) {

tenant := task.Target.GetTenant()
to := &Output{
Tenant: tenant,
O: out,
Tenant: tenant,
ConfigKey: task.Config.Key,
O: out,
}

return NewPipeline(task, baseConf, i, to)
Expand Down
3 changes: 0 additions & 3 deletions pkg/plugin/input/thread/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import (
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"github.com/traas-stack/holoinsight-agent/pkg/cri/criutils"
"github.com/traas-stack/holoinsight-agent/pkg/ioc"
"github.com/traas-stack/holoinsight-agent/pkg/logger"
"github.com/traas-stack/holoinsight-agent/pkg/model"
"github.com/traas-stack/holoinsight-agent/pkg/plugin/api"
"go.uber.org/zap"
)

type (
Expand Down Expand Up @@ -51,7 +49,6 @@ func (i *input) Collect(a api.Accumulator) error {
return err
}
data := resp.Data.(map[string]interface{})
logger.Infoz("test thread count ", zap.Any("result", data))
for metric, value := range data {
a.AddMetric(&model.Metric{
Name: metric,
Expand Down
1 change: 1 addition & 0 deletions pkg/plugin/output/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ package all
import (
_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/console"
_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway"
_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/sls"
)
9 changes: 5 additions & 4 deletions pkg/plugin/output/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ func (c *gatewayOutput) WriteMetricsV1(metrics []*model.Metric, oe output.Extens
NoMerge: false,
}

request.Extension = make(map[string]string)
if oe.Tenant != "" {

request.Extension = map[string]string{
"tenant": oe.Tenant,
}
request.Extension["tenant"] = oe.Tenant
request.NoMerge = true
}
if oe.ConfigKey != "" {
request.Extension["configKey"] = oe.ConfigKey
}

err := GetWriteService().WriteV1(context.Background(), request)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/plugin/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import "github.com/traas-stack/holoinsight-agent/pkg/model"

type (
Extension struct {
Tenant string
Tenant string
ConfigKey string
}
Output interface {
WriteMetricsV1([]*model.Metric, Extension)
Expand Down
13 changes: 13 additions & 0 deletions pkg/plugin/output/sls/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package sls

import "github.com/traas-stack/holoinsight-agent/pkg/plugin/output"

func init() {
output.Register("sls", func(config output.Config) (output.Output, error) {
return NewSLSOutput()
})
}
124 changes: 124 additions & 0 deletions pkg/plugin/output/sls/sls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package sls

import (
"fmt"
aliyunsls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/traas-stack/holoinsight-agent/pkg/collectconfig"
"github.com/traas-stack/holoinsight-agent/pkg/logger"
"github.com/traas-stack/holoinsight-agent/pkg/model"
"github.com/traas-stack/holoinsight-agent/pkg/plugin/output"
"github.com/traas-stack/holoinsight-agent/pkg/util"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"net/http"
"time"
)

var (
dnsCacheHelper *util.DnsCacheHelper
slsHttpclient *http.Client
slsOutput = &SLSOutput{clientCache: make(map[string]*aliyunsls.Client)}
)

func init() {
dnsCacheHelper = util.NewDnsCacheHelper()
dnsCacheHelper.Start()
slsHttpclient = dnsCacheHelper.NewHttpClient()
}

type (
SLSOutput struct {
clientCache map[string]*aliyunsls.Client
}
)

func GetOutPut() *SLSOutput {
return slsOutput
}

func (c *SLSOutput) WriteMetricsV1(metrics []*model.Metric, extension output.Extension) {
}

func NewSLSOutput() (output.Output, error) {
return slsOutput, nil
}

func (c *SLSOutput) WriteBatchAsync(configKey, targetKey, metricName string, array []*model.DetailData) error {
return nil
}

func (c *SLSOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, _ *output.PeriodCompleteness) error {
return nil
}

func (c *SLSOutput) Start() {
}

func (c *SLSOutput) Stop() {
}

func (c *SLSOutput) WriteToSLS(configKey, targetKey string, table *model.Table, slsConfig *collectconfig.SlsConfig) error {
if slsConfig == nil {
return nil
}
client := c.getSLSClient(slsConfig)

// Create a log group
logGroup := &aliyunsls.LogGroup{}

//Create log contents
for _, row := range table.Rows {
contents := make([]*aliyunsls.LogContent, 0)
for i, value := range row.TagValues {
content := &aliyunsls.LogContent{
Key: proto.String(table.Header.TagKeys[i]),
Value: proto.String(value),
}
contents = append(contents, content)
}
// Create a log entry
log := &aliyunsls.Log{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: contents,
}

// Add log to log group
logGroup.Logs = append(logGroup.Logs, log)
// PUT log to SLS
err := client.PutLogs(slsConfig.Project, slsConfig.Logstore, logGroup)
if err != nil {
logger.Errorz("detail log write to sls error", zap.String("configKey", configKey), zap.String("targetKey", targetKey), zap.Error(err))
return err
}
}

logger.Infoz("detail log write to sls success", zap.String("configKey", configKey), zap.String("targetKey", targetKey), zap.Int("count", len(table.Rows)))
return nil
}

func (c *SLSOutput) buildKey(endpoint, project, logstore string) string {
return fmt.Sprintf("%s/%s/%s", endpoint, project, logstore)
}

func (c *SLSOutput) getSLSClient(slsConfig *collectconfig.SlsConfig) *aliyunsls.Client {
key := c.buildKey(slsConfig.Endpoint, slsConfig.Project, slsConfig.Logstore)
var client *aliyunsls.Client
if v, ok := c.clientCache[key]; ok {
client = v
} else {
client = &aliyunsls.Client{
Endpoint: slsConfig.Endpoint,
AccessKeyID: slsConfig.AK,
AccessKeySecret: slsConfig.SK,
RequestTimeOut: 5 * time.Second,
RetryTimeOut: 5 * time.Second,
HTTPClient: slsHttpclient,
}
c.clientCache[key] = client
}
return client
}
Loading