Skip to content

Commit

Permalink
[ingester] deepflow stats store to different database by org
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Jun 4, 2024
1 parent 59b1abd commit 577d080
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 59 deletions.
4 changes: 4 additions & 0 deletions server/ingester/datasource/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const (
EVENT_ALARM_EVENT = "event.alarm_event"
PROFILE = "profile.in_process"
APPLOG = "application_log.log"
DEEPFLOW_TENANT = "deepflow_tenant"
DEEPFLOW_ADMIN = "deepflow_admin"
)

var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{
Expand All @@ -70,6 +72,8 @@ var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{
EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}},
PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}},
APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}},
DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}},
DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}},
}

func (ds DatasourceModifiedOnly) DatasourceInfo() DatasourceInfo {
Expand Down
28 changes: 20 additions & 8 deletions server/ingester/ext_metrics/dbwriter/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,31 @@ type ExtMetrics struct {
}

func (m *ExtMetrics) DatabaseName() string {
if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS || m.MsgType == datatype.MESSAGE_TYPE_SERVER_DFSTATS {
return DEEPFLOW_SYSTEM_DB
} else {
switch m.MsgType {
case datatype.MESSAGE_TYPE_DFSTATS:
return DEEPFLOW_TENANT_DB
case datatype.MESSAGE_TYPE_SERVER_DFSTATS:
if ckdb.IsValidOrgID(m.OrgId) {
return DEEPFLOW_TENANT_DB
} else {
return DEEPFLOW_ADMIN_DB
}
default:
return EXT_METRICS_DB
}
}

func (m *ExtMetrics) TableName() string {
if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS {
return DEEPFLOW_SYSTEM_AGENT_TABLE
} else if m.MsgType == datatype.MESSAGE_TYPE_SERVER_DFSTATS {
return DEEPFLOW_SYSTEM_SERVER_TABLE
} else {
switch m.MsgType {
case datatype.MESSAGE_TYPE_DFSTATS:
return DEEPFLOW_TENANT_COLLECTOR_TABLE
case datatype.MESSAGE_TYPE_SERVER_DFSTATS:
if ckdb.IsValidOrgID(m.OrgId) {
return DEEPFLOW_TENANT_COLLECTOR_TABLE
} else {
return DEEPFLOW_ADMIN_SERVER_TABLE
}
default:
return EXT_METRICS_TABLE
}
}
Expand Down
39 changes: 32 additions & 7 deletions server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,36 @@ import (
var log = logging.MustGetLogger("ext_metrics.dbwriter")

const (
QUEUE_BATCH_SIZE = 1024
EXT_METRICS_DB = "ext_metrics"
EXT_METRICS_TABLE = "metrics"
DEEPFLOW_SYSTEM_DB = "deepflow_system"
DEEPFLOW_SYSTEM_SERVER_TABLE = "deepflow_system_server"
DEEPFLOW_SYSTEM_AGENT_TABLE = "deepflow_system_agent"
QUEUE_BATCH_SIZE = 1024
EXT_METRICS_DB = "ext_metrics"
EXT_METRICS_TABLE = "metrics"

DEEPFLOW_ADMIN_DB = "deepflow_admin"
DEEPFLOW_ADMIN_SERVER_TABLE = "deepflow_server"

DEEPFLOW_TENANT_DB = "deepflow_tenant"
DEEPFLOW_TENANT_COLLECTOR_TABLE = "deepflow_collector"
)

type WriterDBID uint8

const (
EXT_METRICS_DB_ID WriterDBID = iota
DEEPFLOW_TENANT_DB_ID
DEEPFLOW_ADMIN_DB_ID
MAX_DB_ID
)

var writerDbStrings = []string{
EXT_METRICS_DB_ID: EXT_METRICS_DB,
DEEPFLOW_TENANT_DB_ID: DEEPFLOW_TENANT_DB,
DEEPFLOW_ADMIN_DB_ID: DEEPFLOW_ADMIN_DB,
}

func (t WriterDBID) String() string {
return writerDbStrings[t]
}

type ClusterNode struct {
Addr string
Port uint16
Expand Down Expand Up @@ -178,6 +200,9 @@ func NewExtMetricsWriter(
s := AcquireExtMetrics()
s.Timestamp = uint32(time.Now().Unix())
s.MsgType = msgType
if flowTagTablePrefix == DEEPFLOW_TENANT_DB {
s.OrgId = ckdb.DEFAULT_ORG_ID
}
table := s.GenCKTable(w.ckdbCluster, w.ckdbStoragePolicy, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, s.DatabaseName(), s.TableName()))
ckwriter, err := ckwriter.NewCKWriter(
w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword,
Expand All @@ -190,6 +215,6 @@ func NewExtMetricsWriter(
w.ckWriter = ckwriter
w.ckWriter.Run()

common.RegisterCountableForIngester("ext_metrics_writer", w, stats.OptionStatTags{"msg": msgType.String(), "decoder_index": strconv.Itoa(decoderIndex)})
common.RegisterCountableForIngester("ext_metrics_writer", w, stats.OptionStatTags{"msg": msgType.String(), "decoder_index": strconv.Itoa(decoderIndex), "table": flowTagTablePrefix})
return w, nil
}
45 changes: 25 additions & 20 deletions server/ingester/ext_metrics/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ type Counter struct {
}

type Decoder struct {
index int
msgType datatype.MessageType
platformData *grpc.PlatformInfoTable
inQueue queue.QueueReader
extMetricsWriter *dbwriter.ExtMetricsWriter
debugEnabled bool
config *config.Config
index int
msgType datatype.MessageType
platformData *grpc.PlatformInfoTable
inQueue queue.QueueReader
extMetricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter
debugEnabled bool
config *config.Config

// universal tag cache
podNameToUniversalTag [grpc.MAX_ORG_COUNT]map[string]*flow_metrics.UniversalTag
Expand All @@ -80,18 +80,18 @@ func NewDecoder(
index int, msgType datatype.MessageType,
platformData *grpc.PlatformInfoTable,
inQueue queue.QueueReader,
extMetricsWriter *dbwriter.ExtMetricsWriter,
extMetricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter,
config *config.Config,
) *Decoder {
d := &Decoder{
index: index,
msgType: msgType,
platformData: platformData,
inQueue: inQueue,
debugEnabled: log.IsEnabledFor(logging.DEBUG),
extMetricsWriter: extMetricsWriter,
config: config,
counter: &Counter{},
index: index,
msgType: msgType,
platformData: platformData,
inQueue: inQueue,
debugEnabled: log.IsEnabledFor(logging.DEBUG),
extMetricsWriters: extMetricsWriters,
config: config,
counter: &Counter{},
}
for i := 0; i < grpc.MAX_ORG_COUNT; i++ {
d.podNameToUniversalTag[i] = make(map[string]*flow_metrics.UniversalTag)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (d *Decoder) sendTelegraf(vtapID uint16, point models.Point) {
d.counter.ErrMetrics++
return
}
d.extMetricsWriter.Write(extMetrics)
d.extMetricsWriters[int(dbwriter.EXT_METRICS_DB_ID)].Write(extMetrics)
d.counter.OutCount++
}

Expand All @@ -201,12 +201,13 @@ func (d *Decoder) handleDeepflowStats(vtapID uint16, decoder *codec.SimpleDecode
if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv deepflow stats: %v", d.index, vtapID, pbStats)
}
d.extMetricsWriter.Write(d.StatsToExtMetrics(vtapID, pbStats))
metrics, dbId := d.StatsToExtMetrics(vtapID, pbStats)
d.extMetricsWriters[dbId].Write(metrics)
d.counter.OutCount++
}
}

func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) *dbwriter.ExtMetrics {
func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) (*dbwriter.ExtMetrics, dbwriter.WriterDBID) {
m := dbwriter.AcquireExtMetrics()
m.Timestamp = uint32(s.Timestamp)
m.UniversalTag.VTAPID = vtapID
Expand All @@ -216,18 +217,22 @@ func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) *dbwriter.ExtMet
m.TagValues = s.TagValues
m.MetricsFloatNames = s.MetricsFloatNames
m.MetricsFloatValues = s.MetricsFloatValues
var writerDBID dbwriter.WriterDBID
// if OrgId is set, the set OrgId will be used first.
if s.OrgId != 0 {
m.OrgId, m.TeamID = uint16(s.OrgId), uint16(s.TeamId)
writerDBID = dbwriter.DEEPFLOW_TENANT_DB_ID
} else { // OrgId not set
// from deepflow_server, OrgId set default
if vtapID == 0 {
m.OrgId, m.TeamID = ckdb.DEFAULT_ORG_ID, ckdb.DEFAULT_TEAM_ID
writerDBID = dbwriter.DEEPFLOW_ADMIN_DB_ID
} else { // from deepflow_agent, OrgId Get from header first, then from vtapID
m.OrgId, m.TeamID = d.orgId, d.teamId
writerDBID = dbwriter.DEEPFLOW_TENANT_DB_ID
}
}
return m
return m, writerDBID
}

func (d *Decoder) fillExtMetricsBase(m *dbwriter.ExtMetrics, vtapID uint16, podName string, fillWithVtapId bool) {
Expand Down
42 changes: 23 additions & 19 deletions server/ingester/ext_metrics/ext_metrics/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,44 @@ const (
)

type ExtMetrics struct {
Config *config.Config
Telegraf *Metricsor
DeepflowAgentStats *Metricsor
DeepflowServerStats *Metricsor
Config *config.Config
Telegraf *Metricsor
DeepflowAgentStats *Metricsor
DeepflowStats *Metricsor
}

type Metricsor struct {
Config *config.Config
Decoders []*decoder.Decoder
PlatformDataEnabled bool
PlatformDatas []*grpc.PlatformInfoTable
Writer *dbwriter.ExtMetricsWriter
Writers [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter
}

func NewExtMetrics(config *config.Config, recv *receiver.Receiver, platformDataManager *grpc.PlatformDataManager) (*ExtMetrics, error) {
manager := dropletqueue.NewManager(ingesterctl.INGESTERCTL_EXTMETRICS_QUEUE)

telegraf, err := NewMetricsor(datatype.MESSAGE_TYPE_TELEGRAF, dbwriter.EXT_METRICS_DB, config, platformDataManager, manager, recv, true)
telegraf, err := NewMetricsor(datatype.MESSAGE_TYPE_TELEGRAF, []dbwriter.WriterDBID{dbwriter.EXT_METRICS_DB_ID}, config, platformDataManager, manager, recv, true)
if err != nil {
return nil, err
}
deepflowAgentStats, err := NewMetricsor(datatype.MESSAGE_TYPE_DFSTATS, dbwriter.DEEPFLOW_SYSTEM_AGENT_TABLE, config, platformDataManager, manager, recv, false)
deepflowAgentStats, err := NewMetricsor(datatype.MESSAGE_TYPE_DFSTATS, []dbwriter.WriterDBID{dbwriter.DEEPFLOW_TENANT_DB_ID}, config, platformDataManager, manager, recv, false)
if err != nil {
return nil, err
}
deepflowServerStats, err := NewMetricsor(datatype.MESSAGE_TYPE_SERVER_DFSTATS, dbwriter.DEEPFLOW_SYSTEM_SERVER_TABLE, config, platformDataManager, manager, recv, false)
deepflowStats, err := NewMetricsor(datatype.MESSAGE_TYPE_SERVER_DFSTATS, []dbwriter.WriterDBID{dbwriter.DEEPFLOW_ADMIN_DB_ID, dbwriter.DEEPFLOW_TENANT_DB_ID}, config, platformDataManager, manager, recv, false)
if err != nil {
return nil, err
}
return &ExtMetrics{
Config: config,
Telegraf: telegraf,
DeepflowAgentStats: deepflowAgentStats,
DeepflowServerStats: deepflowServerStats,
Config: config,
Telegraf: telegraf,
DeepflowAgentStats: deepflowAgentStats,
DeepflowStats: deepflowStats,
}, nil
}

func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefix string, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) {
func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefixs []dbwriter.WriterDBID, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) {
queueCount := config.DecoderQueueCount
decodeQueues := manager.NewQueues(
"1-receive-to-decode-"+msgType.String(),
Expand All @@ -102,16 +102,20 @@ func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefix string, confi
return nil, err
}
}
metricsWriter, err := dbwriter.NewExtMetricsWriter(i, msgType, flowTagTablePrefix, config)
if err != nil {
return nil, err
var metricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter
for _, tableId := range flowTagTablePrefixs {
metricsWriter, err := dbwriter.NewExtMetricsWriter(i, msgType, tableId.String(), config)
if err != nil {
return nil, err
}
metricsWriters[tableId] = metricsWriter
}
decoders[i] = decoder.NewDecoder(
i,
msgType,
platformDatas[i],
queue.QueueReader(decodeQueues.FixedMultiQueue[i]),
metricsWriter,
metricsWriters,
config,
)
}
Expand Down Expand Up @@ -146,12 +150,12 @@ func (m *Metricsor) Close() {
func (s *ExtMetrics) Start() {
s.Telegraf.Start()
s.DeepflowAgentStats.Start()
s.DeepflowServerStats.Start()
s.DeepflowStats.Start()
}

func (s *ExtMetrics) Close() error {
s.Telegraf.Close()
s.DeepflowAgentStats.Close()
s.DeepflowServerStats.Close()
s.DeepflowStats.Close()
return nil
}
5 changes: 0 additions & 5 deletions server/ingester/pkg/ckwriter/ckwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ func initTable(conn clickhouse.Conn, timeZone string, t *ckdb.Table, orgID uint1
if err := ExecSQL(conn, t.MakeOrgGlobalTableCreateSQL(orgID)); err != nil {
return err
}
for _, view := range t.MakeViewsCreateSQLForDeepflowSystem(orgID) {
if err := ExecSQL(conn, view); err != nil {
return err
}
}

for _, c := range t.Columns {
for _, table := range []string{t.GlobalName, t.LocalName} {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated
time , time , time , time , , Timestamp , 111 , 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Name , DisplayName , Description
time , 时间 ,
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Name , DisplayName , Description
time , Time ,

0 comments on commit 577d080

Please sign in to comment.