Skip to content

Commit

Permalink
Merge branch 'main' into feature-support-custom-service
Browse files Browse the repository at this point in the history
  • Loading branch information
sharang authored Feb 15, 2025
2 parents 2edb13b + d9c2893 commit 9f59b73
Show file tree
Hide file tree
Showing 27 changed files with 743 additions and 7 deletions.
33 changes: 33 additions & 0 deletions server/common/module_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/deepflowio/deepflow/server/libs/eventapi"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/queue"
"github.com/deepflowio/deepflow/server/libs/tracetree"
logging "github.com/op/go-logging"
Expand Down Expand Up @@ -84,6 +85,7 @@ func ExportersEnabled(configPath string) bool {

type OrgHanderInterface interface {
DropOrg(orgId uint16) error
UpdateNativeTag(nativetag.NativeTagOP, uint16, *nativetag.NativeTag) error
}

var ingesterOrgHanders []OrgHanderInterface
Expand Down Expand Up @@ -124,3 +126,34 @@ func DropOrg(orgId uint16) error {
}
return nil
}

// When starting, you need to call the interface
func PushNativeTags(orgId uint16, nativeTags []nativetag.NativeTag) {
if len(nativeTags) == 0 {
return
}
for i := range nativeTags {
log.Infof("orgId %d update native tag: %+v", orgId, nativeTags[i])
nativetag.UpdateNativeTag(nativetag.NATIVE_TAG_ADD, orgId, &nativeTags[i])
}
return
}

// When adding or removing native_tag, you need to call the interface
func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativetag.NativeTag) error {
log.Infof("orgId %d %s native tag: %+v", orgId, op, nativeTag)
if ingesterOrgHanders == nil {
err := fmt.Errorf("ingester is not ready, update native tag failed")
log.Error(err)
return err
}
for _, ingesterOrgHander := range ingesterOrgHanders {
err := ingesterOrgHander.UpdateNativeTag(op, orgId, nativeTag)
if err != nil {
log.Error(err)
return err
}
}
nativetag.UpdateNativeTag(op, orgId, nativeTag)
return nil
}
3 changes: 3 additions & 0 deletions server/controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/http/router"
"github.com/deepflowio/deepflow/server/controller/manager"
"github.com/deepflowio/deepflow/server/controller/monitor"
"github.com/deepflowio/deepflow/server/controller/native_field"
"github.com/deepflowio/deepflow/server/controller/prometheus"
"github.com/deepflowio/deepflow/server/controller/recorder"
"github.com/deepflowio/deepflow/server/controller/report"
Expand Down Expand Up @@ -163,6 +164,8 @@ func Start(ctx context.Context, configPath, serverLogFile string, shared *server
controllerCheck := monitor.NewControllerCheck(cfg, ctx)
analyzerCheck := monitor.NewAnalyzerCheck(cfg, ctx)
go checkAndStartMasterFunctions(cfg, ctx, controllerCheck, analyzerCheck)
// native field
native_field.Refresh()

router.SetInitStageForHealthChecker("Register routers init")
httpServer.SetControllerChecker(controllerCheck)
Expand Down
3 changes: 3 additions & 0 deletions server/controller/native_field/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/deepflowio/deepflow/server/controller/native_field

go 1.18
20 changes: 20 additions & 0 deletions server/controller/native_field/native_field.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package native_field

func Refresh() {
}
2 changes: 2 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ replace (
github.com/deepflowio/deepflow/server/controller/http/service/agentlicense => ./controller/http/service/agentlicense
github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration
github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license
github.com/deepflowio/deepflow/server/controller/native_field => ./controller/native_field
github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import => ./ingester/flow_log/log_data/dd_import
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import
Expand Down Expand Up @@ -110,6 +111,7 @@ require (
github.com/bytedance/sonic v1.12.5
github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/native_field v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f
Expand Down
5 changes: 5 additions & 0 deletions server/ingester/app_log/dbwriter/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/exporters/config"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
)

Expand Down Expand Up @@ -114,6 +115,10 @@ type ApplicationLogStore struct {
MetricsValues []float64 `json:"metrics_values" category:"$metrics" data_type:"[]float64"`
}

func (l *ApplicationLogStore) NativeTagVersion() uint32 {
return nativetag.GetTableNativeTagsVersion(l.OrgId, nativetag.APPLICATION_LOG)
}

func (l *ApplicationLogStore) OrgID() uint16 {
return l.OrgId
}
Expand Down
18 changes: 16 additions & 2 deletions server/ingester/app_log/dbwriter/log_column_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/ClickHouse/ch-go/proto"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

type LogBlock struct {
Expand Down Expand Up @@ -61,6 +62,7 @@ type LogBlock struct {
ColAttributeValues *proto.ColArr[string]
ColMetricsNames *proto.ColArr[string]
ColMetricsValues *proto.ColArr[float64]
*nativetag.NativeTagsBlock
}

func (b *LogBlock) Reset() {
Expand Down Expand Up @@ -102,10 +104,13 @@ func (b *LogBlock) Reset() {
b.ColAttributeValues.Reset()
b.ColMetricsNames.Reset()
b.ColMetricsValues.Reset()
if b.NativeTagsBlock != nil {
b.NativeTagsBlock.Reset()
}
}

func (b *LogBlock) ToInput(input proto.Input) proto.Input {
return append(input,
input = append(input,
proto.InputColumn{Name: ckdb.COLUMN_TIME, Data: &b.ColTime},
proto.InputColumn{Name: ckdb.COLUMN_TIMESTAMP, Data: &b.ColTimestamp},
proto.InputColumn{Name: ckdb.COLUMN__ID, Data: &b.ColId},
Expand Down Expand Up @@ -145,16 +150,22 @@ func (b *LogBlock) ToInput(input proto.Input) proto.Input {
proto.InputColumn{Name: ckdb.COLUMN_METRICS_NAMES, Data: b.ColMetricsNames},
proto.InputColumn{Name: ckdb.COLUMN_METRICS_VALUES, Data: b.ColMetricsValues},
)
if b.NativeTagsBlock != nil {
input = b.NativeTagsBlock.ToInput(input)
}
return input
}

func (n *ApplicationLogStore) NewColumnBlock() ckdb.CKColumnBlock {
return &LogBlock{
block := &LogBlock{
ColAppService: new(proto.ColStr).LowCardinality(),
ColAttributeNames: new(proto.ColStr).LowCardinality().Array(),
ColAttributeValues: new(proto.ColStr).Array(),
ColMetricsNames: new(proto.ColStr).LowCardinality().Array(),
ColMetricsValues: new(proto.ColFloat64).Array(),
NativeTagsBlock: nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.APPLICATION_LOG),
}
return block
}

func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
Expand Down Expand Up @@ -197,4 +208,7 @@ func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
block.ColAttributeValues.Append(n.AttributeValues)
block.ColMetricsNames.Append(n.MetricsNames)
block.ColMetricsValues.Append(n.MetricsValues)
if block.NativeTagsBlock != nil {
block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, n.MetricsNames, n.MetricsValues)
}
}
21 changes: 21 additions & 0 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/datasource"
"github.com/deepflowio/deepflow/server/libs/ckdb"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

var log = logging.MustGetLogger("issu")
Expand Down Expand Up @@ -1338,19 +1339,29 @@ func (i *Issu) Start() error {

var err error
orgIDPrefixs := make([][]string, len(i.Connections))
nativeTags := nativetag.GetAllNativeTags()
// update default organization databases first
for index, connect := range i.Connections {
err = i.startOrg(index, "", connect)
if err != nil {
log.Error(err)
return err
}
for _, nativeTag := range nativeTags[ckdb.DEFAULT_ORG_ID] {
if nativeTag == nil {
continue
}
if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, ckdb.DEFAULT_ORG_ID, nativeTag); e != nil {
log.Error(err)
}
}
orgIDPrefixs[index], err = i.getOrgIDPrefixsWithoutDefault(connect)
if err != nil {
return fmt.Errorf("get orgIDs failed, err: %s", err)
}
}

// update other organization databases
var wg sync.WaitGroup
for index, prefixes := range orgIDPrefixs {
orgCount := len(prefixes)
Expand Down Expand Up @@ -1389,7 +1400,17 @@ func (i *Issu) Start() error {
log.Error(err)
errCount++
}
orgId := parseOrgId(orgIDPrefix + "event")
for _, nativeTag := range nativeTags[orgId] {
if nativeTag == nil {
continue
}
if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, orgId, nativeTag); e != nil {
log.Error(err)
}
}
}

log.Infof("end ckissu %+v", orgPrefixs)
}(prefixes[minIndex:maxIndex])
}
Expand Down
4 changes: 4 additions & 0 deletions server/ingester/event/dbwriter/alert_event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (e *AlertEventStore) Release() {
ReleaseAlertEventStore(e)
}

func (e *AlertEventStore) NativeTagVersion() uint32 {
return 0
}

func (e *AlertEventStore) OrgID() uint16 {
return e.OrgId
}
Expand Down
8 changes: 8 additions & 0 deletions server/ingester/event/dbwriter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
utag "github.com/deepflowio/deepflow/server/ingester/exporters/universal_tag"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
"github.com/deepflowio/deepflow/server/libs/utils"
)
Expand Down Expand Up @@ -106,6 +107,13 @@ type EventStore struct {
Duration uint64 `json:"duration" category:"$metrics" sub:"delay"`
}

func (e *EventStore) NativeTagVersion() uint32 {
if e.HasMetrics {
return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_PERF_EVENT)
}
return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_EVENT)
}

func (e *EventStore) OrgID() uint16 {
return e.OrgId
}
Expand Down
20 changes: 19 additions & 1 deletion server/ingester/event/dbwriter/event_column_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ClickHouse/ch-go/proto"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

type EventBlock struct {
Expand Down Expand Up @@ -63,6 +64,8 @@ type EventBlock struct {
HasMetrics bool
ColBytes proto.ColUInt32
ColDuration proto.ColUInt64

*nativetag.NativeTagsBlock
}

func (b *EventBlock) Reset() {
Expand Down Expand Up @@ -103,6 +106,9 @@ func (b *EventBlock) Reset() {
b.ColAttributeValues.Reset()
b.ColBytes.Reset()
b.ColDuration.Reset()
if b.NativeTagsBlock != nil {
b.NativeTagsBlock.Reset()
}
}

func (b *EventBlock) ToInput(input proto.Input) proto.Input {
Expand Down Expand Up @@ -149,16 +155,25 @@ func (b *EventBlock) ToInput(input proto.Input) proto.Input {
proto.InputColumn{Name: ckdb.COLUMN_DURATION, Data: &b.ColDuration},
)
}
if b.NativeTagsBlock != nil {
input = b.NativeTagsBlock.ToInput(input)
}
return input
}

func (n *EventStore) NewColumnBlock() ckdb.CKColumnBlock {
return &EventBlock{
b := &EventBlock{
HasMetrics: n.HasMetrics,
ColEventType: new(proto.ColStr).LowCardinality(),
ColAttributeNames: new(proto.ColStr).LowCardinality().Array(),
ColAttributeValues: new(proto.ColStr).Array(),
}
if n.HasMetrics {
b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_PERF_EVENT)
} else {
b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_EVENT)
}
return b
}

func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
Expand Down Expand Up @@ -200,4 +215,7 @@ func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
block.ColAttributeValues.Append(n.AttributeValues)
block.ColBytes.Append(n.Bytes)
block.ColDuration.Append(n.Duration)
if block.NativeTagsBlock != nil {
block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, nil, nil)
}
}
16 changes: 16 additions & 0 deletions server/ingester/ext_metrics/dbwriter/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/datatype"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
)

Expand Down Expand Up @@ -89,6 +90,21 @@ func (m *ExtMetrics) VirtualTableName() string {
return m.VTableName
}

func (m *ExtMetrics) NativeTagVersion() uint32 {
switch m.MsgType {
case datatype.MESSAGE_TYPE_DFSTATS:
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT)
case datatype.MESSAGE_TYPE_SERVER_DFSTATS:
if ckdb.IsValidOrgID(m.RawOrgId) {
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT)
} else {
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_ADMIN)
}
default:
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.EXT_METRICS)
}
}

func (m *ExtMetrics) OrgID() uint16 {
return m.OrgId
}
Expand Down
Loading

0 comments on commit 9f59b73

Please sign in to comment.