[Server] add deepflow_admin, deepflow_tenant to data source
roryye committed Jun 6, 2024
1 parent 23c4d1d commit 393fa49
Showing 8 changed files with 58 additions and 17 deletions.
5 changes: 5 additions & 0 deletions server/controller/db/mysql/migration/rawsql/default_init.sql
@@ -100,3 +100,8 @@ INSERT INTO alarm_policy(user_id, sub_view_type, tag_conditions, query_condition
app_type, sub_type, contrast_type, target_line_uid, target_line_name, target_field, data_level, agg, delay,
threshold_critical, lcuuid)
values(1, 1, "过滤项: N/A | 分组项: *", "", "/v1/alarm/license-0days/", "{}", "[{\"OPERATOR\": {\"return_field\": \"sysalarm_value\", \"return_field_description\": \"至少一个授权文件剩余有效期\", \"return_field_unit\": \"天\"}}]", "DeepFlow 授权过期", 2, 1, 1, 24, 1, "", "", "{\"displayName\":\"sysalarm_value\", \"unit\": \"天\"}", "1d", 1, 0, "{\"OP\":\"<=\",\"VALUE\":0}", @lcuuid);

+ set @lcuuid = (select uuid());
+ INSERT INTO data_source (display_name, data_table_collection, `interval`, retention_time, lcuuid)
+ VALUES ('管理侧监控数据', 'deepflow_admin.*', 0, 7*24, @lcuuid);
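
The new default data source gives management-side telemetry (deepflow_admin.*, display name '管理侧监控数据', "management-side monitoring data") its own entry; retention_time is stored in hours, so 7*24 = 168 hours, i.e. 7 days. A quick sanity check after init, assuming a MySQL shell on the controller database (a sketch, not part of this commit):

-- verify both the new admin row and the renamed tenant row exist
SELECT display_name, data_table_collection, `interval`, retention_time
FROM data_source
WHERE data_table_collection IN ('deepflow_admin.*', 'deepflow_tenant.*');
-- expect retention_time = 168 for the deepflow_admin.* row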

4 changes: 2 additions & 2 deletions server/controller/db/mysql/migration/rawsql/init.sql
@@ -2297,7 +2297,7 @@ INSERT INTO data_source (id, display_name, data_table_collection, `interval`, retention_time, lcuuid)
VALUES (11, '网络-PCAP 数据', 'flow_log.l7_packet', 0, 3*24, @lcuuid);
set @lcuuid = (select uuid());
INSERT INTO data_source (id, display_name, data_table_collection, `interval`, retention_time, lcuuid)
- VALUES (12, '系统监控数据', 'deepflow_system.*', 0, 7*24, @lcuuid);
+ VALUES (12, '租户侧监控数据', 'deepflow_tenant.*', 0, 7*24, @lcuuid);
set @lcuuid = (select uuid());
INSERT INTO data_source (id, display_name, data_table_collection, `interval`, retention_time, lcuuid)
VALUES (13, '外部指标数据', 'ext_metrics.*', 0, 7*24, @lcuuid);
@@ -2321,7 +2321,7 @@ INSERT INTO data_source (id, display_name, data_table_collection, `interval`, retention_time, lcuuid)
VALUES (19, '网络-网络策略', 'flow_metrics.traffic_policy', 60, 3*24, @lcuuid);
set @lcuuid = (select uuid());
INSERT INTO data_source (id, display_name, data_table_collection, `interval`, retention_time, lcuuid)
- VALUES (20, '日志数据', 'application_log.log', 1, 7*24, @lcuuid);
+ VALUES (20, '日志-日志数据', 'application_log.log', 1, 30*24, @lcuuid);


CREATE TABLE IF NOT EXISTS license (
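
Both hunks follow the data_source unit convention: `interval` is in seconds and retention_time in hours. The renamed '租户侧监控数据' (tenant-side monitoring data) keeps 7*24 = 168 hours, while '日志-日志数据' (log data) grows from 7*24 = 168 to 30*24 = 720 hours, i.e. 30 days, matching the retention_time=720 written by the upgrade script below. A hedged spot check against the controller database:

SELECT display_name, retention_time, retention_time / 24 AS retention_days
FROM data_source
WHERE data_table_collection = 'application_log.log';
-- expect 720 hours (30 days) after this commit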
32 changes: 32 additions & 0 deletions server/controller/db/mysql/migration/rawsql/issu/6.5.1.42.sql
@@ -0,0 +1,32 @@
+ -- modify start, add upgrade sql
+ DROP PROCEDURE IF EXISTS update_data_sources;
+
+ CREATE PROCEDURE update_data_sources()
+ BEGIN
+     DECLARE current_db_name VARCHAR(255);
+
+     START TRANSACTION;
+
+     UPDATE data_source SET display_name='日志-日志数据', retention_time=720 WHERE data_table_collection='application_log.log';
+     UPDATE data_source SET display_name='租户侧监控数据', data_table_collection='deepflow_tenant.*' WHERE data_table_collection='deepflow_system.*';
+
+     -- do migration in default db
+
+     SELECT DATABASE() INTO current_db_name;
+     IF @defaultDatabaseName = current_db_name THEN
+         IF NOT EXISTS (SELECT 1 FROM data_source WHERE data_table_collection = 'deepflow_admin.*') THEN
+             SET @lcuuid = (SELECT UUID());
+             INSERT INTO data_source (display_name, data_table_collection, `interval`, retention_time, lcuuid)
+             VALUES ('管理侧监控数据', 'deepflow_admin.*', 0, 7*24, @lcuuid);
+         END IF;
+     END IF;
+
+     COMMIT;
+
+ END;
+
+ CALL update_data_sources();
+
+ -- whether default db or not, update db_version to latest; remember to update DB_VERSION_EXPECTED in migration/version.go
+ UPDATE db_version SET version='6.5.1.42';
+ -- modify end
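
The procedure is written to be idempotent: the deepflow_admin insert is guarded by IF NOT EXISTS, and it only runs when the current schema matches @defaultDatabaseName, a session variable the migration runner is assumed to set before sourcing this file. A minimal post-upgrade check (a sketch, not part of the commit):

SELECT version FROM db_version;  -- expect '6.5.1.42'
SELECT display_name, data_table_collection, retention_time
FROM data_source
WHERE data_table_collection IN ('deepflow_tenant.*', 'deepflow_admin.*', 'application_log.log');
-- expect the renamed tenant row, the admin row (default DB only),
-- and retention_time = 720 for application logs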
2 changes: 1 addition & 1 deletion server/controller/db/mysql/migration/version.go
@@ -18,5 +18,5 @@ package migration

const (
DB_VERSION_TABLE = "db_version"
- DB_VERSION_EXPECTED = "6.5.1.41"
+ DB_VERSION_EXPECTED = "6.5.1.42"
)
16 changes: 10 additions & 6 deletions server/controller/http/service/data_source.go
@@ -65,15 +65,16 @@ var DEFAULT_DATA_SOURCE_DISPLAY_NAMES = []string{
"应用-调用日志", // flow_log.l7_flow_log
"网络-TCP 时序数据", // flow_log.l4_packet
"网络-PCAP 数据", // flow_log.l7_packet
- "系统监控数据", // deepflow_system.*
+ "租户侧监控数据", // deepflow_tenant.*
+ "管理侧监控数据", // deepflow_admin.*
"外部指标数据", // ext_metrics.*
"Prometheus 数据", // prometheus.*
"事件-资源变更事件", // event.event
"事件-IO 事件", // event.perf_event
"事件-告警事件", // event.alarm_event
"应用-性能剖析", // profile.in_process
"网络-网络策略", // flow_metrics.traffic_policy
- "日志数据", // application_log.log
+ "日志-日志数据", // application_log.log
}

func (d *DataSource) GetDataSources(orgID int, filter map[string]interface{}, specCfg *config.Specification) (resp []model.DataSource, err error) {
@@ -106,8 +107,10 @@ func (d *DataSource) GetDataSources(orgID int, filter map[string]interface{}, specCfg *config.Specification) (resp []model.DataSource, err error) {
collection = "flow_metrics.application*"
case "network":
collection = "flow_metrics.network*"
- case "deepflow_system":
- collection = "deepflow_system.*"
+ case "deepflow_tenant":
+ collection = "deepflow_tenant.*"
+ case "deepflow_admin":
+ collection = "deepflow_admin.*"
case "ext_metrics":
collection = "ext_metrics.*"
case "prometheus":
@@ -159,7 +162,7 @@
dataSourceResp.IsDefault = false
}
if specCfg != nil {
- if dataSource.DataTableCollection == "deepflow_system.*" {
+ if dataSource.DataTableCollection == "deepflow_tenant.*" ||
+ dataSource.DataTableCollection == "deepflow_admin.*" {
dataSourceResp.Interval = common.DATA_SOURCE_DEEPFLOW_SYSTEM_INTERVAL
}
if dataSource.DataTableCollection == "ext_metrics.*" {
@@ -561,7 +565,7 @@ func getName(interval int, collection string) (string, error) {
case 0:
// return value: flow_log.l4_flow_log, flow_log.l7_flow_log,
// flow_log.l4_packet, flow_log.l7_packet,
- // deepflow_system, ext_metrics, prometheus,
+ // deepflow_tenant, deepflow_admin, ext_metrics, prometheus,
// event.event, event.perf_event, event.alarm_event
return strings.TrimSuffix(collection, ".*"), nil
case 1: // one second
2 changes: 1 addition & 1 deletion server/controller/http/service/rebalance/traffic.go
@@ -493,7 +493,7 @@ func (q *Query) GetAgentDispatcher(orgDB *mysql.DB, domainPrefix string, dataDur
}
queryURL := fmt.Sprintf("http://%sdeepflow-server:%d/v1/query", domainPrefix, config.Cfg.ListenPort)
values := url.Values{}
- db := "deepflow_system"
+ db := "deepflow_tenant"
now := time.Now()
before := now.UTC().Add(time.Second * -1 * time.Duration(dataDuration))
sql := fmt.Sprintf("SELECT `tag.host`, Sum(`metrics.tx-bytes`) AS `tx-bps` FROM deepflow_agent_collect_sender"+
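
With agent self-metrics moved under the tenant database, the rebalance query now targets db = deepflow_tenant instead of deepflow_system. The Go format string is truncated in this hunk, so the full statement is not shown; the following is only a sketch of the query shape it builds, where the time bound and GROUP BY are hypothetical additions for illustration:

SELECT `tag.host`, Sum(`metrics.tx-bytes`) AS `tx-bps`
FROM deepflow_agent_collect_sender
WHERE time >= now() - 300  -- hypothetical: the 'before' bound computed from dataDuration
GROUP BY `tag.host`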
2 changes: 1 addition & 1 deletion server/go.mod
@@ -58,7 +58,7 @@ require (
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721
github.com/olivere/elastic v6.2.37+incompatible
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
- github.com/openshift/api v0.0.0-20210422150128-d8a48168c81c // indirect
+ github.com/openshift/api v0.0.0-20210422150128-d8a48168c81c
github.com/openshift/client-go v0.0.0-20210422153130-25c8450d1535
github.com/pebbe/zmq4 v1.2.9
github.com/pkg/errors v0.9.1
12 changes: 6 additions & 6 deletions server/querier/engine/clickhouse/clickhouse_test.go
@@ -183,7 +183,7 @@ var (
db: "ext_metrics",
}, {
input: "select Sum(`metrics.pending`) from `deepflow_server.queue`",
- output: "SELECT SUM(if(indexOf(metrics_float_names, 'pending')=0,null,metrics_float_values[indexOf(metrics_float_names, 'pending')])) AS `Sum(metrics.pending)` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_server.queue') LIMIT 10000",
+ output: "SELECT SUM(if(indexOf(metrics_float_names, 'pending')=0,null,metrics_float_values[indexOf(metrics_float_names, 'pending')])) AS `Sum(metrics.pending)` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_server.queue') LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "select `k8s.label_0` from l7_flow_log",
@@ -255,23 +255,23 @@ var (
db: "flow_metrics",
}, {
input: "SELECT time(time,5,1,0) as toi, AAvg(`metrics.dropped`) AS `AAvg(metrics.dropped)` FROM `deepflow_agent_collect_sender` GROUP BY toi ORDER BY toi desc",
- output: "WITH toStartOfInterval(_time, toIntervalSecond(5)) + toIntervalSecond(arrayJoin([0]) * 5) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, AVG(`_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])`) AS `AAvg(metrics.dropped)` FROM (WITH toStartOfInterval(time, toIntervalSecond(1)) AS `_time` SELECT _time, SUM(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')])) AS `_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `_time`) GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
+ output: "WITH toStartOfInterval(_time, toIntervalSecond(5)) + toIntervalSecond(arrayJoin([0]) * 5) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, AVG(`_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])`) AS `AAvg(metrics.dropped)` FROM (WITH toStartOfInterval(time, toIntervalSecond(1)) AS `_time` SELECT _time, SUM(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')])) AS `_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `_time`) GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "SELECT time(time,5,1,0) as toi, Avg(`metrics.dropped`) AS `Avg(metrics.dropped)` FROM `deepflow_agent_collect_sender` GROUP BY toi ORDER BY toi desc",
- output: "WITH toStartOfInterval(time, toIntervalSecond(5)) + toIntervalSecond(arrayJoin([0]) * 5) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(5/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
+ output: "WITH toStartOfInterval(time, toIntervalSecond(5)) + toIntervalSecond(arrayJoin([0]) * 5) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(5/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "SELECT time(time,120,1,0) as toi, AAvg(`metrics.dropped`) AS `AAvg(metrics.dropped)` FROM `deepflow_agent_collect_sender` GROUP BY toi ORDER BY toi desc",
- output: "WITH toStartOfInterval(_time, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, AVG(`_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])`) AS `AAvg(metrics.dropped)` FROM (WITH toStartOfInterval(time, toIntervalSecond(1)) AS `_time` SELECT _time, SUM(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')])) AS `_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `_time`) GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
+ output: "WITH toStartOfInterval(_time, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, AVG(`_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])`) AS `AAvg(metrics.dropped)` FROM (WITH toStartOfInterval(time, toIntervalSecond(1)) AS `_time` SELECT _time, SUM(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')])) AS `_sum_if(indexOf(metrics_float_names, dropped)=0,null,metrics_float_values[indexOf(metrics_float_names, dropped)])` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `_time`) GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "SELECT time(time,120,1,0) as toi, Avg(`metrics.dropped`) AS `Avg(metrics.dropped)` FROM `deepflow_agent_collect_sender` GROUP BY toi ORDER BY toi desc",
- output: "WITH toStartOfInterval(time, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(120/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
+ output: "WITH toStartOfInterval(time, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(120/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "SELECT time(time,120,1,0,30) as toi, Avg(`metrics.dropped`) AS `Avg(metrics.dropped)` FROM `deepflow_agent_collect_sender` GROUP BY toi ORDER BY toi desc",
- output: "WITH toStartOfInterval(time-30, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) + 30 AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(120/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` WHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
+ output: "WITH toStartOfInterval(time-30, toIntervalSecond(120)) + toIntervalSecond(arrayJoin([0]) * 120) + 30 AS `_toi` SELECT toUnixTimestamp(`_toi`) AS `toi`, sum(if(indexOf(metrics_float_names, 'dropped')=0,null,metrics_float_values[indexOf(metrics_float_names, 'dropped')]))/(120/1) AS `Avg(metrics.dropped)` FROM deepflow_tenant.`deepflow_collector` PREWHERE (virtual_table_name='deepflow_agent_collect_sender') GROUP BY `toi` ORDER BY `toi` desc LIMIT 10000",
db: "deepflow_tenant",
}, {
input: "SELECT chost_id_0 from l4_flow_log WHERE NOT exist(chost_0) LIMIT 1",
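
Beyond the deepflow_system to deepflow_tenant rename, every expected statement swaps WHERE for PREWHERE on the virtual_table_name filter. PREWHERE is a ClickHouse optimization: the engine reads only the columns named in the PREWHERE condition, filters, and then reads the remaining columns just for the surviving rows, which pays off for selective filters such as a virtual-table name. An illustrative contrast (a sketch, not one of the test cases):

-- WHERE: all referenced columns are read, then rows are filtered
SELECT SUM(metrics_float_values[indexOf(metrics_float_names, 'pending')])
FROM deepflow_tenant.`deepflow_collector`
WHERE virtual_table_name = 'deepflow_server.queue';

-- PREWHERE: virtual_table_name is read and filtered first; the metric
-- arrays are fetched only for matching rows
SELECT SUM(metrics_float_values[indexOf(metrics_float_names, 'pending')])
FROM deepflow_tenant.`deepflow_collector`
PREWHERE virtual_table_name = 'deepflow_server.queue';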
