From 577d080102fa4581d5d02e5ca3ec4806f76c63cb Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Mon, 3 Jun 2024 19:48:36 +0800 Subject: [PATCH] [ingester] deepflow stats store to different database by org --- server/ingester/datasource/handle.go | 4 ++ .../ext_metrics/dbwriter/ext_metrics.go | 28 ++++++++---- .../dbwriter/ext_metrics_writer.go | 39 +++++++++++++--- .../ingester/ext_metrics/decoder/decoder.go | 45 ++++++++++--------- .../ext_metrics/ext_metrics/ext_metrics.go | 42 +++++++++-------- server/ingester/pkg/ckwriter/ckwriter.go | 5 --- .../deepflow_server_common} | 0 .../deepflow_server_common.ch} | 0 .../deepflow_server_common.en} | 0 .../deepflow_tenant/deepflow_collector_common | 2 + .../deepflow_collector_common.ch | 2 + .../deepflow_collector_common.en | 2 + 12 files changed, 110 insertions(+), 59 deletions(-) rename server/querier/db_descriptions/clickhouse/tag/{deepflow_system/deepflow_system_common => deepflow_admin/deepflow_server_common} (100%) rename server/querier/db_descriptions/clickhouse/tag/{deepflow_system/deepflow_system_common.ch => deepflow_admin/deepflow_server_common.ch} (100%) rename server/querier/db_descriptions/clickhouse/tag/{deepflow_system/deepflow_system_common.en => deepflow_admin/deepflow_server_common.en} (100%) create mode 100644 server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common create mode 100644 server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.ch create mode 100644 server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.en diff --git a/server/ingester/datasource/handle.go b/server/ingester/datasource/handle.go index f0b51c804609..5983cdf8f189 100644 --- a/server/ingester/datasource/handle.go +++ b/server/ingester/datasource/handle.go @@ -55,6 +55,8 @@ const ( EVENT_ALARM_EVENT = "event.alarm_event" PROFILE = "profile.in_process" APPLOG = "application_log.log" + DEEPFLOW_TENANT = "deepflow_tenant" + DEEPFLOW_ADMIN = "deepflow_admin" ) var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{ @@ -70,6 +72,8 @@ var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{ EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}}, PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}}, APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}}, + DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}}, + DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}}, } func (ds DatasourceModifiedOnly) DatasourceInfo() DatasourceInfo { diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics.go b/server/ingester/ext_metrics/dbwriter/ext_metrics.go index 903ac71b2293..3a7efa46f784 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics.go @@ -51,19 +51,31 @@ type ExtMetrics struct { } func (m *ExtMetrics) DatabaseName() string { - if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS || m.MsgType == datatype.MESSAGE_TYPE_SERVER_DFSTATS { - return DEEPFLOW_SYSTEM_DB - } else { + switch m.MsgType { + case datatype.MESSAGE_TYPE_DFSTATS: + return DEEPFLOW_TENANT_DB + case datatype.MESSAGE_TYPE_SERVER_DFSTATS: + if ckdb.IsValidOrgID(m.OrgId) { + return DEEPFLOW_TENANT_DB + } else { + return DEEPFLOW_ADMIN_DB + } + default: return EXT_METRICS_DB } } func (m *ExtMetrics) TableName() string { - if m.MsgType == datatype.MESSAGE_TYPE_DFSTATS { - return DEEPFLOW_SYSTEM_AGENT_TABLE - } else if m.MsgType == datatype.MESSAGE_TYPE_SERVER_DFSTATS { - return DEEPFLOW_SYSTEM_SERVER_TABLE - } else { + switch m.MsgType { + case datatype.MESSAGE_TYPE_DFSTATS: + return DEEPFLOW_TENANT_COLLECTOR_TABLE + case datatype.MESSAGE_TYPE_SERVER_DFSTATS: + if ckdb.IsValidOrgID(m.OrgId) { + return DEEPFLOW_TENANT_COLLECTOR_TABLE + } else { + return DEEPFLOW_ADMIN_SERVER_TABLE + } + default: return EXT_METRICS_TABLE } } diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go index e22493408f91..f44b9aa58749 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go @@ -38,14 +38,36 @@ import ( var log = logging.MustGetLogger("ext_metrics.dbwriter") const ( - QUEUE_BATCH_SIZE = 1024 - EXT_METRICS_DB = "ext_metrics" - EXT_METRICS_TABLE = "metrics" - DEEPFLOW_SYSTEM_DB = "deepflow_system" - DEEPFLOW_SYSTEM_SERVER_TABLE = "deepflow_system_server" - DEEPFLOW_SYSTEM_AGENT_TABLE = "deepflow_system_agent" + QUEUE_BATCH_SIZE = 1024 + EXT_METRICS_DB = "ext_metrics" + EXT_METRICS_TABLE = "metrics" + + DEEPFLOW_ADMIN_DB = "deepflow_admin" + DEEPFLOW_ADMIN_SERVER_TABLE = "deepflow_server" + + DEEPFLOW_TENANT_DB = "deepflow_tenant" + DEEPFLOW_TENANT_COLLECTOR_TABLE = "deepflow_collector" +) + +type WriterDBID uint8 + +const ( + EXT_METRICS_DB_ID WriterDBID = iota + DEEPFLOW_TENANT_DB_ID + DEEPFLOW_ADMIN_DB_ID + MAX_DB_ID ) +var writerDbStrings = []string{ + EXT_METRICS_DB_ID: EXT_METRICS_DB, + DEEPFLOW_TENANT_DB_ID: DEEPFLOW_TENANT_DB, + DEEPFLOW_ADMIN_DB_ID: DEEPFLOW_ADMIN_DB, +} + +func (t WriterDBID) String() string { + return writerDbStrings[t] +} + type ClusterNode struct { Addr string Port uint16 @@ -178,6 +200,9 @@ func NewExtMetricsWriter( s := AcquireExtMetrics() s.Timestamp = uint32(time.Now().Unix()) s.MsgType = msgType + if flowTagTablePrefix == DEEPFLOW_TENANT_DB { + s.OrgId = ckdb.DEFAULT_ORG_ID + } table := s.GenCKTable(w.ckdbCluster, w.ckdbStoragePolicy, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, s.DatabaseName(), s.TableName())) ckwriter, err := ckwriter.NewCKWriter( w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword, @@ -190,6 +215,6 @@ func NewExtMetricsWriter( w.ckWriter = ckwriter w.ckWriter.Run() - common.RegisterCountableForIngester("ext_metrics_writer", w, stats.OptionStatTags{"msg": msgType.String(), "decoder_index": strconv.Itoa(decoderIndex)}) + common.RegisterCountableForIngester("ext_metrics_writer", w, stats.OptionStatTags{"msg": msgType.String(), "decoder_index": strconv.Itoa(decoderIndex), "table": flowTagTablePrefix}) return w, nil } diff --git a/server/ingester/ext_metrics/decoder/decoder.go b/server/ingester/ext_metrics/decoder/decoder.go index 4bcaaf39182b..c89e06bb8849 100644 --- a/server/ingester/ext_metrics/decoder/decoder.go +++ b/server/ingester/ext_metrics/decoder/decoder.go @@ -56,13 +56,13 @@ type Counter struct { } type Decoder struct { - index int - msgType datatype.MessageType - platformData *grpc.PlatformInfoTable - inQueue queue.QueueReader - extMetricsWriter *dbwriter.ExtMetricsWriter - debugEnabled bool - config *config.Config + index int + msgType datatype.MessageType + platformData *grpc.PlatformInfoTable + inQueue queue.QueueReader + extMetricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter + debugEnabled bool + config *config.Config // universal tag cache podNameToUniversalTag [grpc.MAX_ORG_COUNT]map[string]*flow_metrics.UniversalTag @@ -80,18 +80,18 @@ func NewDecoder( index int, msgType datatype.MessageType, platformData *grpc.PlatformInfoTable, inQueue queue.QueueReader, - extMetricsWriter *dbwriter.ExtMetricsWriter, + extMetricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter, config *config.Config, ) *Decoder { d := &Decoder{ - index: index, - msgType: msgType, - platformData: platformData, - inQueue: inQueue, - debugEnabled: log.IsEnabledFor(logging.DEBUG), - extMetricsWriter: extMetricsWriter, - config: config, - counter: &Counter{}, + index: index, + msgType: msgType, + platformData: platformData, + inQueue: inQueue, + debugEnabled: log.IsEnabledFor(logging.DEBUG), + extMetricsWriters: extMetricsWriters, + config: config, + counter: &Counter{}, } for i := 0; i < grpc.MAX_ORG_COUNT; i++ { d.podNameToUniversalTag[i] = make(map[string]*flow_metrics.UniversalTag) @@ -175,7 +175,7 @@ func (d *Decoder) sendTelegraf(vtapID uint16, point models.Point) { d.counter.ErrMetrics++ return } - d.extMetricsWriter.Write(extMetrics) + d.extMetricsWriters[int(dbwriter.EXT_METRICS_DB_ID)].Write(extMetrics) d.counter.OutCount++ } @@ -201,12 +201,13 @@ func (d *Decoder) handleDeepflowStats(vtapID uint16, decoder *codec.SimpleDecode if d.debugEnabled { log.Debugf("decoder %d vtap %d recv deepflow stats: %v", d.index, vtapID, pbStats) } - d.extMetricsWriter.Write(d.StatsToExtMetrics(vtapID, pbStats)) + metrics, dbId := d.StatsToExtMetrics(vtapID, pbStats) + d.extMetricsWriters[dbId].Write(metrics) d.counter.OutCount++ } } -func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) *dbwriter.ExtMetrics { +func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) (*dbwriter.ExtMetrics, dbwriter.WriterDBID) { m := dbwriter.AcquireExtMetrics() m.Timestamp = uint32(s.Timestamp) m.UniversalTag.VTAPID = vtapID @@ -216,18 +217,22 @@ func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) *dbwriter.ExtMet m.TagValues = s.TagValues m.MetricsFloatNames = s.MetricsFloatNames m.MetricsFloatValues = s.MetricsFloatValues + var writerDBID dbwriter.WriterDBID // if OrgId is set, the set OrgId will be used first. if s.OrgId != 0 { m.OrgId, m.TeamID = uint16(s.OrgId), uint16(s.TeamId) + writerDBID = dbwriter.DEEPFLOW_TENANT_DB_ID } else { // OrgId not set // from deepflow_server, OrgId set default if vtapID == 0 { m.OrgId, m.TeamID = ckdb.DEFAULT_ORG_ID, ckdb.DEFAULT_TEAM_ID + writerDBID = dbwriter.DEEPFLOW_ADMIN_DB_ID } else { // from deepflow_agent, OrgId Get from header first, then from vtapID m.OrgId, m.TeamID = d.orgId, d.teamId + writerDBID = dbwriter.DEEPFLOW_TENANT_DB_ID } } - return m + return m, writerDBID } func (d *Decoder) fillExtMetricsBase(m *dbwriter.ExtMetrics, vtapID uint16, podName string, fillWithVtapId bool) { diff --git a/server/ingester/ext_metrics/ext_metrics/ext_metrics.go b/server/ingester/ext_metrics/ext_metrics/ext_metrics.go index 5cce3c59e5e0..34855746b7be 100644 --- a/server/ingester/ext_metrics/ext_metrics/ext_metrics.go +++ b/server/ingester/ext_metrics/ext_metrics/ext_metrics.go @@ -41,10 +41,10 @@ const ( ) type ExtMetrics struct { - Config *config.Config - Telegraf *Metricsor - DeepflowAgentStats *Metricsor - DeepflowServerStats *Metricsor + Config *config.Config + Telegraf *Metricsor + DeepflowAgentStats *Metricsor + DeepflowStats *Metricsor } type Metricsor struct { @@ -52,33 +52,33 @@ type Metricsor struct { Decoders []*decoder.Decoder PlatformDataEnabled bool PlatformDatas []*grpc.PlatformInfoTable - Writer *dbwriter.ExtMetricsWriter + Writers [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter } func NewExtMetrics(config *config.Config, recv *receiver.Receiver, platformDataManager *grpc.PlatformDataManager) (*ExtMetrics, error) { manager := dropletqueue.NewManager(ingesterctl.INGESTERCTL_EXTMETRICS_QUEUE) - telegraf, err := NewMetricsor(datatype.MESSAGE_TYPE_TELEGRAF, dbwriter.EXT_METRICS_DB, config, platformDataManager, manager, recv, true) + telegraf, err := NewMetricsor(datatype.MESSAGE_TYPE_TELEGRAF, []dbwriter.WriterDBID{dbwriter.EXT_METRICS_DB_ID}, config, platformDataManager, manager, recv, true) if err != nil { return nil, err } - deepflowAgentStats, err := NewMetricsor(datatype.MESSAGE_TYPE_DFSTATS, dbwriter.DEEPFLOW_SYSTEM_AGENT_TABLE, config, platformDataManager, manager, recv, false) + deepflowAgentStats, err := NewMetricsor(datatype.MESSAGE_TYPE_DFSTATS, []dbwriter.WriterDBID{dbwriter.DEEPFLOW_TENANT_DB_ID}, config, platformDataManager, manager, recv, false) if err != nil { return nil, err } - deepflowServerStats, err := NewMetricsor(datatype.MESSAGE_TYPE_SERVER_DFSTATS, dbwriter.DEEPFLOW_SYSTEM_SERVER_TABLE, config, platformDataManager, manager, recv, false) + deepflowStats, err := NewMetricsor(datatype.MESSAGE_TYPE_SERVER_DFSTATS, []dbwriter.WriterDBID{dbwriter.DEEPFLOW_ADMIN_DB_ID, dbwriter.DEEPFLOW_TENANT_DB_ID}, config, platformDataManager, manager, recv, false) if err != nil { return nil, err } return &ExtMetrics{ - Config: config, - Telegraf: telegraf, - DeepflowAgentStats: deepflowAgentStats, - DeepflowServerStats: deepflowServerStats, + Config: config, + Telegraf: telegraf, + DeepflowAgentStats: deepflowAgentStats, + DeepflowStats: deepflowStats, }, nil } -func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefix string, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) { +func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefixs []dbwriter.WriterDBID, config *config.Config, platformDataManager *grpc.PlatformDataManager, manager *dropletqueue.Manager, recv *receiver.Receiver, platformDataEnabled bool) (*Metricsor, error) { queueCount := config.DecoderQueueCount decodeQueues := manager.NewQueues( "1-receive-to-decode-"+msgType.String(), @@ -102,16 +102,20 @@ func NewMetricsor(msgType datatype.MessageType, flowTagTablePrefix string, confi return nil, err } } - metricsWriter, err := dbwriter.NewExtMetricsWriter(i, msgType, flowTagTablePrefix, config) - if err != nil { - return nil, err + var metricsWriters [dbwriter.MAX_DB_ID]*dbwriter.ExtMetricsWriter + for _, tableId := range flowTagTablePrefixs { + metricsWriter, err := dbwriter.NewExtMetricsWriter(i, msgType, tableId.String(), config) + if err != nil { + return nil, err + } + metricsWriters[tableId] = metricsWriter } decoders[i] = decoder.NewDecoder( i, msgType, platformDatas[i], queue.QueueReader(decodeQueues.FixedMultiQueue[i]), - metricsWriter, + metricsWriters, config, ) } @@ -146,12 +150,12 @@ func (m *Metricsor) Close() { func (s *ExtMetrics) Start() { s.Telegraf.Start() s.DeepflowAgentStats.Start() - s.DeepflowServerStats.Start() + s.DeepflowStats.Start() } func (s *ExtMetrics) Close() error { s.Telegraf.Close() s.DeepflowAgentStats.Close() - s.DeepflowServerStats.Close() + s.DeepflowStats.Close() return nil } diff --git a/server/ingester/pkg/ckwriter/ckwriter.go b/server/ingester/pkg/ckwriter/ckwriter.go index e4c188a4d9d7..f8cebb1c9f3b 100644 --- a/server/ingester/pkg/ckwriter/ckwriter.go +++ b/server/ingester/pkg/ckwriter/ckwriter.go @@ -98,11 +98,6 @@ func initTable(conn clickhouse.Conn, timeZone string, t *ckdb.Table, orgID uint1 if err := ExecSQL(conn, t.MakeOrgGlobalTableCreateSQL(orgID)); err != nil { return err } - for _, view := range t.MakeViewsCreateSQLForDeepflowSystem(orgID) { - if err := ExecSQL(conn, view); err != nil { - return err - } - } for _, c := range t.Columns { for _, table := range []string{t.GlobalName, t.LocalName} { diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common b/server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common similarity index 100% rename from server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common rename to server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common.ch b/server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common.ch similarity index 100% rename from server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common.ch rename to server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common.ch diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common.en b/server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common.en similarity index 100% rename from server/querier/db_descriptions/clickhouse/tag/deepflow_system/deepflow_system_common.en rename to server/querier/db_descriptions/clickhouse/tag/deepflow_admin/deepflow_server_common.en diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common new file mode 100644 index 000000000000..939e898f469b --- /dev/null +++ b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common @@ -0,0 +1,2 @@ +# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated +time , time , time , time , , Timestamp , 111 , 0 diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.ch b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.ch new file mode 100644 index 000000000000..9987444c08fb --- /dev/null +++ b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.ch @@ -0,0 +1,2 @@ +# Name , DisplayName , Description +time , 时间 , diff --git a/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.en b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.en new file mode 100644 index 000000000000..470e724631e3 --- /dev/null +++ b/server/querier/db_descriptions/clickhouse/tag/deepflow_tenant/deepflow_collector_common.en @@ -0,0 +1,2 @@ +# Name , DisplayName , Description +time , Time ,