Writing Stream Smoke Tests
The stream smoke test framework scans all test suite files under the path given by the --test_folders parameter of ci_runner.py. It reads the test suites specified by the --test_suites parameter of ci_runner.py and runs them in parallel across multiple processes. When a test suite process starts, a query executor process is started as well. The test suite process checks test_suite_config, executes the setup statements or inputs in it, and then runs the test cases one by one in sequence.

For each test case, the test suite process reads id, tags, name, description, steps and expected_results, and then executes the steps. There are two types of step: statements and inputs. A statements step executes SQL, sends REST requests or runs command lines. When it starts, the test suite process sends each statement to the query executor one by one. An inputs step inserts data into the Proton instance under test via REST, and the test suite process simply executes the inputs in sequence. Note that the query executor starts a new process for each stream query and then moves on to the next statement immediately, whereas a table query must finish before the next statement is executed. After all steps are completed, the query executor sends the query results back to the test suite process. The test suite process then checks them against the expected results specified in the test case. Finally, the framework consolidates all test suite results into a unified test report in HTML format.
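The execution flow above can be illustrated with a minimal Python sketch. Stream queries run in the background and do not block the next statement, table queries block, and results are checked against expected_results by query_id. This is not the framework's code. `run_query` is a hypothetical stand-in for the query executor, and threads stand in for the separate processes it really starts. `duration` and `fake_rows` are invented fields used only to simulate a query. The cell-comparison rule (numeric compare when both sides parse as numbers, else string compare, row order significant) is an assumption, not something this page states.

```python
import math
import threading
import time

def run_query(stmt, results, order):
    """Hypothetical stand-in for the query executor: sleep to simulate the
    query running, then record the rows it 'returned' under its query_id."""
    time.sleep(stmt.get("duration", 0))
    results[str(stmt["query_id"])] = stmt.get("fake_rows", [])
    order.append(str(stmt["query_id"]))

def run_statements(statements):
    """Stream queries start in the background and do not block the next
    statement; table queries finish before the next statement runs."""
    results, order, workers = {}, [], []
    for stmt in statements:
        if stmt["query_type"] == "stream":
            # The real executor starts a separate process; a thread keeps the sketch small.
            worker = threading.Thread(target=run_query, args=(stmt, results, order))
            worker.start()
            workers.append(worker)
        else:
            run_query(stmt, results, order)
    for worker in workers:
        worker.join()  # all steps done: wait for stream queries before checking
    return results, order

def cells_match(actual, expected):
    """Assumed rule: compare numerically when both sides parse as numbers,
    otherwise compare their string forms (so None matches "None")."""
    try:
        return math.isclose(float(actual), float(expected), rel_tol=1e-6)
    except (TypeError, ValueError):
        return str(actual) == str(expected)

def check_results(results, expected_results):
    """Return the query_ids whose rows differ from the expected rows (order-sensitive)."""
    failed = []
    for case in expected_results:
        rows = results.get(str(case["query_id"]), [])
        expected = case["expected_results"]
        same = len(rows) == len(expected) and all(
            len(a) == len(e) and all(cells_match(x, y) for x, y in zip(a, e))
            for a, e in zip(rows, expected)
        )
        if not same:
            failed.append(case["query_id"])
    return failed
```

For example, a stream statement that takes 0.2 s followed by a table statement finishes in the order table first, then stream. This happens because the stream query does not hold up the statement after it.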
All stream smoke test suite files are under proton/tests/stream/test_stream_smoke. A test suite file can be written in JSON (ending in .json) or YAML (ending in .yaml/.yml); all examples and descriptions here use JSON. A test suite consists of test_suite_name, tag, test_suite_config (which holds setup and tests_2_run), comments, and tests. A JSON test suite file follows the convention below:
{
"test_suite_name": "<test_suite_name>",
"tag":"<tag>",
"test_suite_config":{
"setup": {
"statements": [
{<statement>},
{<statement>},
...
{<statement>}
]
},
"tests_2_run": {"ids_2_run": ["All"|"<test_id>", "<test_id>", ...], "tags_2_run":["<tag>", ...], "tags_2_skip":{"<setting>":["<tag>", "<tag>", ... "<tag>"],"<setting>": ["<tag>", "<tag>", ... "<tag>"]}}
},
"comments": "Tests covering the stream query smoke cases.",
"tests": [
{...},
{...},
...
{...}]
}
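As a sketch of how a suite file with this shape could be loaded and filtered, here is a small Python example using only the standard json module (YAML files would need a YAML parser such as PyYAML, not shown). The selection rules are our reading of the field names, not documented behavior. It treats "all" (any case) in ids_2_run as every test, an empty tags_2_run as no tag filter, and tags_2_skip as the tags to skip under a given setting such as "default" or "cluster". `load_suite` and `select_tests` are hypothetical helpers.

```python
import json

REQUIRED_KEYS = ("test_suite_name", "test_suite_config", "tests")

def load_suite(text):
    """Parse a JSON test suite and check that the top-level keys are present."""
    suite = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in suite]
    if missing:
        raise ValueError(f"test suite is missing keys: {missing}")
    return suite

def select_tests(suite, setting="default"):
    """Return the ids of the tests to run under `setting` (assumed semantics)."""
    rules = suite["test_suite_config"].get("tests_2_run", {})
    ids_2_run = {str(i).lower() for i in rules.get("ids_2_run", ["all"])}
    tags_2_run = set(rules.get("tags_2_run", []))
    tags_2_skip = set(rules.get("tags_2_skip", {}).get(setting, []))
    selected = []
    for test in suite["tests"]:
        tags = set(test.get("tags", []))
        if "all" not in ids_2_run and str(test["id"]) not in ids_2_run:
            continue  # not listed in ids_2_run
        if tags_2_run and not (tags & tags_2_run):
            continue  # an empty tags_2_run is treated as "no tag filter"
        if tags & tags_2_skip:
            continue  # skipped for this setting
        selected.append(test["id"])
    return selected
```

With "tags_2_skip": {"default": ["todo"], "cluster": ["view"]}, a test tagged "view" would run under the default setting but be skipped under cluster.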
test_suite_config is used to run setup statements or create streams when the test suite starts. It consists of setup, which contains statements, and table_schemas, a list of stream schema definitions. In setup you can write inputs to insert data into a stream (done by the test framework via REST) and statements to run SQL (done by the test framework via the proton Python client over the native protocol). You can also let the smoke test framework create streams via the REST API by providing a table_schemas list in test_suite_config. Alternatively, you can create streams with setup statements.
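A common setup pattern drops a stream and recreates it with setup statements, as in the multishards_stream example further down this page. That pattern can be generated programmatically; the Python sketch below shows one way. `make_setup` is a hypothetical helper, and the wait values simply mirror that example. Only "client":"python" is used because that is the only client supported in setup.

```python
import json

def make_setup(stream_name, create_sql, exist_wait=2):
    """Return a test_suite_config["setup"] fragment that drops and recreates a stream."""
    return {
        "statements": [
            # drop first so that reruns start from a clean state
            {"client": "python", "query_type": "table", "wait": 1,
             "query": f"drop stream if exists {stream_name}"},
            # "exist" asks the framework to wait until the new stream is queryable
            {"client": "python", "query_type": "table", "wait": 2,
             "exist": stream_name, "exist_wait": exist_wait, "query": create_sql},
        ]
    }

config = {"setup": make_setup(
    "multishards_stream",
    "create stream if not exists multishards_stream(i int) settings shards=3")}
print(json.dumps(config, indent=2))
```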
A statements step is a list of statements. Each statement consists of client, query_type, terminate, query_end_timer, wait, depends_on, depends_on_stream, the Act Of Done fields, and query. Statements let the smoke framework run SQL, REST requests or commands against the Proton instance under test. Statements can appear in setup and in each test, but in setup only "client":"python" is supported.
- client: specifies which type of client executes the statement: python uses a Python Proton client to run SQL, rest uses a REST API client, and exec uses a command-line client.
- query_type: there are two query types, stream and table. For a stream query, the query executor starts a new process to run the statement. The moment at which a stream query is killed is critical to its result, so kill and kill_wait are provided in Act Of Done to make cases more stable.
- terminate: specifies whether the query in the statement is terminated automatically by the query executor or manually by an explicit kill query statement (the examples below use "manual").
- query_end_timer: applies to stream-query statements. It makes the query executor start a timer of query_end_timer seconds right after the stream query process starts, and when the timer fires, the query executor kills the query.
- wait: the statement is executed after sleeping for the number of seconds specified in wait.
- depends_on: the smoke test framework keeps checking for the query id specified by depends_on and executes the statement once that query id is found. If the query id still cannot be found after retries, the framework throws an exception.
- depends_on_stream: similar to depends_on; the statement is executed once the stream specified by depends_on_stream is found.
- Act Of Done: see the Act Of Done section.
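- query: the query to be executed for the statement.
- query_id: the id of the query; it is what depends_on, kill and the query_id entries in expected_results refer to.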
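The fields above can be captured in a small data model that rejects obviously invalid statements before a run. The Python sketch below does this under stated assumptions. The `Statement` class is hypothetical, and requiring query_type and query on every statement is our simplification (rest and exec statements may look different in practice). It encodes three rules from this page: the three client types, the two query types, and python-only clients in setup. It also encodes one reading of query_end_timer being meant for stream queries. Ids are normalized to strings because the examples write them both as 1001 and "1001".

```python
from dataclasses import dataclass
from typing import Optional

VALID_CLIENTS = {"python", "rest", "exec"}
VALID_QUERY_TYPES = {"stream", "table"}

@dataclass
class Statement:
    client: str
    query_type: str
    query: str
    query_id: Optional[str] = None
    terminate: Optional[str] = None
    query_end_timer: Optional[float] = None
    wait: float = 0
    depends_on: Optional[str] = None
    depends_on_stream: Optional[str] = None

    @classmethod
    def from_dict(cls, raw, in_setup=False):
        def as_str(value):
            # ids appear both as numbers (1001) and strings ("1001") in the examples
            return None if value is None else str(value)
        stmt = cls(
            client=raw["client"],
            query_type=raw["query_type"],
            query=raw["query"],
            query_id=as_str(raw.get("query_id")),
            terminate=raw.get("terminate"),
            query_end_timer=raw.get("query_end_timer"),
            wait=raw.get("wait", 0),
            depends_on=as_str(raw.get("depends_on")),
            depends_on_stream=raw.get("depends_on_stream"),
        )
        stmt.validate(in_setup)
        return stmt

    def validate(self, in_setup=False):
        if self.client not in VALID_CLIENTS:
            raise ValueError(f"unknown client: {self.client}")
        if self.query_type not in VALID_QUERY_TYPES:
            raise ValueError(f"unknown query_type: {self.query_type}")
        if in_setup and self.client != "python":
            raise ValueError('only "client":"python" is supported in setup')
        if self.query_end_timer is not None and self.query_type != "stream":
            raise ValueError("query_end_timer only applies to stream queries")
```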
An inputs step is a list of inputs, and each input consists of table_name, depends_on, wait, the Act Of Done fields, and data.
- table_name: the stream the data is inserted into.
- data: a list of rows to insert, in the same format as the data field of a REST post.
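As a rough illustration of what an input turns into, the Python sketch below builds (but does not send) a REST POST from an input entry using only the standard library. The endpoint host, port and path in `INGEST_URL` are assumptions based on Proton's REST ingest API and should be checked against the Proton version under test. The optional columns list is also an assumption, since the inputs on this page carry only rows. `build_ingest_request` is a hypothetical helper.

```python
import json
import urllib.request

# Assumption: layout of the Proton REST ingest endpoint; verify for your deployment.
INGEST_URL = "http://localhost:3218/proton/v1/ingest/streams/{table_name}"

def build_ingest_request(input_batch, columns=None):
    """Turn one entry of an inputs step into an unsent POST request."""
    body = {"data": input_batch["data"]}
    if columns is not None:
        body["columns"] = columns  # assumed optional column list
    return urllib.request.Request(
        INGEST_URL.format(table_name=input_batch["table_name"]),
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned request to `urllib.request.urlopen` would send it. The sketch stops short of that so it can run without a Proton server.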
To make writing cases easier, the CI smoke framework supports several acts of done. An act of done is an action that the framework calls when a statement (or, for some acts, an input batch) is done.
exist, exist_wait in statements (the smoke framework only scans statements that create a stream/view for this act, and does not scan inputs for it)
- When to use? When we create a stream, view or materialized view and want to check that it exists right after the creating SQL is called. The smoke framework scans each statement whose query contains 'create' for "exist": "<stream/view>", "exist_wait": <seconds>. After the query of the statement is executed, the framework waits for exist_wait seconds, then checks, and keeps waiting, until <stream/view> can be queried by 'show stream where name == <stream/view>'. The internal timeout of exist is about 20s; if <stream/view> cannot be found within 20s, an exception is thrown. Once you have an exist check right after creating a stream/view, you don't need depends_on_stream in the following test cases to check that the stream/view exists.
- Usage example of exist and exist_wait in statements, in the setup phase: here we create stream 'multishards_stream' in setup and want the smoke framework to check and wait for the existence of multishards_stream. After doing this, you don't need depends_on_stream in each case.
"test_suite_config":{
"setup": {
"statements": [
{"client":"python", "query_type": "table","wait":1, "query":"drop stream if exists multishards_stream"},
{"client":"python", "query_type": "table", "wait":2,"exist":"multishards_stream", "exist_wait":2, "query":"create stream if not exists multishards_stream(i int) settings shards=3"}
]
},
"tests_2_run": {"ids_2_run": ["all"], "tags_2_run":[], "tags_2_skip":{"default":["todo", "to_support", "change", "bug", "sample"],"cluster": ["view"]}}
},
In a test case:
{
"id": 64,
"tags": ["materialized view","tumble window"],
"name": "table tail from materialized view as 5s tumble window aggregation by sum and group by id, designate eventtime colume in tumble sql",
"description": "create materialized view as materialized view as 5s tumble window aggregation by sum and group by id, select id, sum_value from table(mv), designated timestamp as eventtime column in tumble sql",
"steps":[
{"statements": [
{"client":"python", "query_type": "table", "query":"drop view if exists test_mv_stream_tumble_v"},
{"client":"python", "query_type": "table", "query":"drop stream if exists test_mv"},
{"client":"python", "query_type": "table","exist":"test_mv","exist_wait":1, "query":"create stream if not exists test_mv (`id` string, `location` string, `value` float32, `json` string, `timestamp` datetime64(3) DEFAULT now64(3))"},
{"client":"python","query_id":"300","wait":2, "query_type": "table", "exist":"test_mv_stream_tumble_v", "exist_wait": 1, "query":"create materialized view if not exists test_mv_stream_tumble_v as select id, sum(value) as sum_value, window_start as s, window_end as e from tumble(test_mv, timestamp, interval 5 second) group by id, window_start, window_end"},
{"client":"python", "query_id":"301", "run_mode":"table","wait":5,"query_type": "table", "query":"select id, sum_value from table(test_mv_stream_tumble_v)"}
]},
{"inputs": [
{"table_name":"test_mv", "wait":1, "depends_on_done":"301", "data": [["dev1", "ca", 57.3, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:00"],
["dev1", "ca", 66, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:01"],
["dev2", "ca", 58.3, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:02"]]},
{"table_name":"test_mv", "data": [["dev2", "ca", 59, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:03"],
["dev8", "ca", 67, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:02"],
["dev8", "ca", 77, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:08"]]},
{"table_name":"test_mv", "data": [["dev3", "ca", 57.3, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:00"],
["dev3", "ca", 66, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:01"],
["dev2", "ca", 76, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:05"]]},
{"table_name":"test_mv", "data": [["dev2", "ca", 80, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:03"],
["dev8", "ca", 67, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:02"],
["dev8", "ca", 77, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:08"]]}
]},
{"statements": [
{"client":"python", "query_id":"302", "wait": 5, "run_mode":"table", "query_type": "table","drop_view":"test_mv_stream_tumble_v", "drop_view_wait":1, "query":"select id, sum_value from table(test_mv_stream_tumble_v)"}
]
}
],
"expected_results": [
{"query_id":"301", "expected_results":[
]},
{"query_id":"302", "expected_results":[
["dev1", "123.3"],
["dev2", "117.3"],
["dev8", "67"]
]}
]
}
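The exist behavior described above (sleep exist_wait, then poll until the stream/view is visible, giving up after roughly 20s) is a standard poll-with-timeout loop. The same shape fits depends_on and depends_on_stream, which retry and then throw. The Python sketch below is not the framework's implementation. `list_streams` is a hypothetical callable standing in for running 'show stream where name == <stream/view>', and the 0.5 s poll interval is an assumption.

```python
import time

def wait_until_exists(name, list_streams, exist_wait=0, timeout=20.0, interval=0.5):
    """Sleep exist_wait seconds, then poll until `name` is visible or timeout expires.

    list_streams is a hypothetical callable returning the stream/view names
    currently visible (a stand-in for 'show stream where name == <name>').
    """
    time.sleep(exist_wait)
    deadline = time.monotonic() + timeout
    while True:
        if name in list_streams():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{name} not found within {timeout}s")
        time.sleep(interval)
```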
kill, kill_wait in inputs and statements
- When to use? When we want to kill a stream query right after an input batch or a query statement is done. With "kill":"<query_id>","kill_wait": <seconds>, once the input batch or query statement is done, the smoke framework waits for <kill_wait> seconds and then kills <query_id>. Several query ids can be given as a comma-separated string, for example "101,102".
- Usage example of kill and kill_wait in an input batch:
{
"id": 0,
"tags": ["tumble window"],
"name": "stream tumble window max(value) aggregate group by id, window_start, window_end",
"description": "tumble window aggregate by max and group by id, window_start, window_end",
"steps":[
{"statements": [
{"client":"python", "query_id": 101, "depends_on_stream":"test", "run_mode":"process", "query_type": "stream", "terminate":"manual", "query":"select id, max(value), window_start, window_end from tumble(test, timestamp, interval 5 second) group by id, window_start, window_end emit stream"},
{"client":"python", "query_id": 102, "depends_on_stream":"test", "run_mode":"process", "query_type": "stream", "terminate":"manual", "query":"select id, max(value), window_start, window_end from tumble(test, timestamp, interval 5 second) group by id, window_start, window_end emit stream"}
]},
{"inputs": [
{"table_name": "test", "depends_on":"102", "data": [
["dev1", "ca", 57.3, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:00:00"],
["dev2", "ca", 58.3, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:00:03"]
]},
{"table_name": "test", "data": [["dev6", "ca", 66, "\"create_time\":\"2021-11-02 20:00:11\"", "2020-02-02 20:00:04"]]},
{"table_name": "test","kill":"101,102", "kill_wait":1, "data": [["dev8", "ca", 67, "\"create_time\":\"2021-11-02 20:00:11\"", "2020-02-02 20:00:10"]]}
]},
{"statements": [
{"client":"python", "query_type": "table", "query":"select id, max(value) from table(test) group by id, location"}
]
}
],
"expected_results": [
{"query_id": "101",
"expected_results":
[["dev2", "58.3", "2020-02-02 20:00:00", "2020-02-02 20:00:05"],
["dev6", "66.0", "2020-02-02 20:00:00", "2020-02-02 20:00:05"],
["dev1", "57.3", "2020-02-02 20:00:00", "2020-02-02 20:00:05"]]
},
{"query_id": "102",
"expected_results":
[["dev2", "58.3", "2020-02-02 20:00:00", "2020-02-02 20:00:05"],
["dev6", "66.0", "2020-02-02 20:00:00", "2020-02-02 20:00:05"],
["dev1", "57.3", "2020-02-02 20:00:00", "2020-02-02 20:00:05"]]
}
]
},
- Usage example of kill and kill_wait in a query statement:
{
"id": 0,
"tags": ["stream_join_stream"],
"name": "date_diff_within-inner-join",
"description": "stream to stream inner join by using date_diff_within",
"steps":[
{
"statements": [
{"client":"python","query_id":"1001", "depends_on_stream":"left_stream,right_stream","wait":1, "terminate":"manual", "query_type": "stream", "query":"select s, ss from left_stream inner join right_stream on i=ii and date_diff_within(10s, ts, tts)"},
{"client":"python", "query_type": "table","depends_on":1001, "query": "insert into left_stream(i, s, ts) values (1, 's', '2022-05-23 15:45:10')"},
{"client":"python", "query_type": "table", "wait":1,"kill": 1001, "kill_wait":1, "query": "insert into right_stream(ii, ss, tts) values (1, 's1', '2022-05-23 15:44:59'), (1, 's2', '2022-05-23 15:45:00'), (1, 's3', '2022-05-23 15:45:09'), (1, 's4', '2022-05-23 15:45:10'), (1, 's5', '2022-05-23 15:45:15'), (1, 's6', '2022-05-23 15:45:19'), (1, 's7', '2022-05-23 15:45:20'), (1, 's8', '2022-05-23 15:45:21')"
}
]
}
],
"expected_results": [
{
"query_id":"1001",
"expected_results":[
["s", "s2"], ["s", "s3"], ["s", "s4"], ["s", "s5"], ["s", "s6"], ["s", "s7"]
]
}
]
},
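The kill act in both examples can be sketched in a few lines of Python: parse the kill value (a single id such as 1001, or a comma-separated string such as "101,102"), sleep kill_wait, then issue one kill per query id. The SQL form mirrors the `kill query where query_id = '301'` statement used elsewhere on this page. Whether the framework issues exactly this SQL internally is an assumption. `execute` is a hypothetical callable that runs one SQL statement.

```python
import time

def parse_query_ids(value):
    """kill may be a single id (1001) or a comma-separated string ("101,102")."""
    return [part.strip() for part in str(value).split(",") if part.strip()]

def kill_statements(act):
    """Build one kill statement per query id named in the act."""
    return [f"kill query where query_id = '{qid}'" for qid in parse_query_ids(act["kill"])]

def run_kill_act(act, execute):
    """After the input batch or statement is done: sleep kill_wait, then kill each query."""
    time.sleep(act.get("kill_wait", 0))
    for sql in kill_statements(act):
        execute(sql)
```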
drop_view, drop_view_wait in inputs and statements
- When to use? When we want to drop a view after an input batch or a query statement is done. With "drop_view":"<view_name>","drop_view_wait": <seconds>, once the input batch or query statement is done, the smoke framework waits for <drop_view_wait> seconds and then drops <view_name>.
- Usage example of drop_view and drop_view_wait in a query statement: in the following case, we want drop_view to be executed right after the stream query (query_id "301") is done.
{
"id": 63,
"tags": ["materialized view", "tumble window"],
"name": "streaming tail from materialized view as 5s tumble window aggregation by sum and group by id",
"description": "create materialized view as materialized view as 5s tumble window aggregation by sum and group by id, select id, sum_value from mv, timestamp designated as event_time_column when creating table, response should use utc time.",
"steps":[
{"statements": [
{"client":"python", "query_type": "table", "query":"drop view if exists test_mv_stream_tumble_v"},
{"client":"python", "query_type": "table","wait":1, "query":"create materialized view test_mv_stream_tumble_v as select id, sum(value) as sum_value, window_start as win_start, window_end as win_end from tumble(test_mv, timestamp, interval 5 second) group by id, win_start, win_end"},
{"client":"python", "query_id":"301", "query_type": "stream","depends_on_stream":"test_mv_stream_tumble_v","wait":5, "terminate":"manual","drop_view":"test_mv_stream_tumble_v", "drop_view_wait":1, "query":"select id, sum_value, win_start, win_end from test_mv_stream_tumble_v"}]},
{"inputs": [
{"table_name":"test_mv", "wait":1, "depends_on":"301", "data": [["dev1", "ca", 57.3, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:00"],
["dev1", "ca", 66, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:01"],
["dev2", "ca", 58.3, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:02"]]},
{"table_name":"test_mv", "data": [["dev2", "ca", 59, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:03"],
["dev8", "ca", 67, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:02"],
["dev8", "ca", 77, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:08"]]},
{"table_name":"test_mv", "data": [["dev3", "ca", 57.3, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:00"],
["dev3", "ca", 66, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:01"],
["dev2", "ca", 76, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:05"]]},
{"table_name":"test_mv", "data": [["dev2", "ca", 80, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:03"],
["dev8", "ca", 67, "\"create_time\":\"2021-11-02 20:00:01\"", "2020-02-02 20:01:02"],
["dev8", "ca", 77, "\"create_time\":\"2021-11-02 20:00:10\"", "2020-02-02 20:01:08"]]}
]},
{"statements": [
{"client":"python", "wait": 3, "run_mode":"table", "depends":{"query_id":301}, "query_type": "table", "query":"kill query where query_id = '301'"}
]
}
],
"expected_results": [{"query_id":"301", "expected_results":[
["dev1", "123.3","2020-02-02 20:01:00", "2020-02-02 20:01:05"],
["dev2", "117.3", "2020-02-02 20:01:00", "2020-02-02 20:01:05"],
["dev8", "67", "2020-02-02 20:01:00", "2020-02-02 20:01:05"]
]}
]
}
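Both drop acts, drop_view here and drop_stream below, follow the same shape: an act key naming the object and a matching wait key. The Python sketch below maps each act key to its wait key and a drop statement and runs them in order. The SQL templates reuse the "drop view if exists" / "drop stream if exists" forms seen in the examples. Whether the framework itself uses "if exists" is an assumption. `pending_drop_acts`, `run_drop_acts` and `execute` are hypothetical.

```python
import time

DROP_ACTS = {
    # act key -> (wait key, SQL template), using the drop forms from the examples
    "drop_view": ("drop_view_wait", "drop view if exists {name}"),
    "drop_stream": ("drop_stream_wait", "drop stream if exists {name}"),
}

def pending_drop_acts(statement):
    """Return (wait_seconds, sql) pairs for drop acts attached to a statement or input batch."""
    acts = []
    for key, (wait_key, template) in DROP_ACTS.items():
        if key in statement:
            acts.append((statement.get(wait_key, 0), template.format(name=statement[key])))
    return acts

def run_drop_acts(statement, execute):
    """Once the statement or batch is done, wait and then run each drop."""
    for wait_seconds, sql in pending_drop_acts(statement):
        time.sleep(wait_seconds)
        execute(sql)
```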
drop_stream, drop_stream_wait in inputs and statements
- When to use? When we want to drop a stream after an input batch or a query statement is done. With "drop_stream":"<stream_name>","drop_stream_wait": <seconds>, once the input batch or query statement is done, the smoke framework waits for <drop_stream_wait> seconds and then drops <stream_name>.
- Usage example of drop_stream and drop_stream_wait in a query statement: in the following case, we want drop_stream to be executed right after the stream query (query_id "801-1") is done.
{
"id": 0,
"tags": ["type"],
"name": "default type in one table and injest",
"description": "create a stream test_type, 1 column for 1 type and ingest data with same or different column sequence of show create",
"steps":[
{"statements": [
{"client":"python", "query_type": "table","wait":1, "query":"drop stream if exists test_type"},
{"client":"python", "query_type": "table","wait":2, "query":"create stream test_type (uuid uuid, int int, uint uint8, string string, float float, decimal decimal32(3), date date, datetime datetime, enum enum('a'=1, 'b'=2, 'z'=26), tuple tuple(s string, i int), ipv4 ipv4, ipv6 ipv6, map map(string, int), nullable nullable(datetime64), timestamp datetime64(3) default now64(3))"},
{"client":"python", "query_type": "table", "depends_on_stream":"test_type", "wait":1, "query":"insert into test_type (uuid, int, uint, string, float, decimal, date, datetime, enum, tuple, ipv4, ipv6, map, nullable, timestamp) values ('4c6a2a19-4f9f-456f-b076-c43ef97255a7', -1234567, 1234567, '{@string}', 12345.123, 123456.123, to_date('2022-03-24'), to_datetime('2022-03-24 17:17:17'), 'a', ('a', 0), '192.168.168.168', '2a02:aa08:e000:3100::20', {'key1':111}, null ,'2020-01-01 11:11:11')"},
{"client":"python","query_id":"801-1", "query_end_timer":2,"drop_stream":"test_type","drop_stream_wait":2, "query_type": "stream", "query":"select uuid, int, uint, string, float, decimal, date, datetime, enum, tuple, ipv4, ipv6, map, nullable from test_type settings seek_to = 'earliest'"}
]}
],
"expected_results": [{"query_id":"801-1", "expected_results":[
["4c6a2a19-4f9f-456f-b076-c43ef97255a7", -1234567, 135, "{@string}", 12345.123, "123456.123", "2022-03-24", "2022-03-24 17:17:17", "a", "('a', 0)", "192.168.168.168", "2a02:aa08:e000:3100::20", "{'key1': 111}", "None"]]}
]
}
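In the example above, the stream query 801-1 ends because of query_end_timer, and drop_stream fires after it is done. The Python sketch below shows that sequence with `threading.Timer`. It arms a timer right after the stream query starts, kills the query when the timer fires, then waits drop_stream_wait seconds and drops the stream. This is an illustration, not the framework's code. `kill` and `execute` are hypothetical callables, and the "drop stream if exists" form is borrowed from the examples.

```python
import threading
import time

def arm_query_end_timer(stmt, kill, execute):
    """Arm a timer right after a stream query starts: after query_end_timer
    seconds kill the query, then run the drop_stream act if one is attached."""
    def on_timer():
        kill(stmt["query_id"])
        if "drop_stream" in stmt:
            time.sleep(stmt.get("drop_stream_wait", 0))
            execute(f"drop stream if exists {stmt['drop_stream']}")
    timer = threading.Timer(stmt["query_end_timer"], on_timer)
    timer.start()
    return timer
```

Calling `join()` on the returned timer waits until both the kill and the drop have run.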