Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 6, 2024
1 parent d74b1ae commit 4a07e56
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 36 deletions.
21 changes: 20 additions & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema requests
apt-get -y install jq

echo "--- e2e, inline test"
Expand Down Expand Up @@ -154,3 +154,22 @@ risedev slt './e2e_test/source_legacy/basic/old_row_format_syntax/*.slt'
echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
risedev slt './e2e_test/ch_benchmark/streaming/*.slt'

risedev ci-kill
echo "--- cluster killed "

echo "--- starting risingwave cluster for webhook source test"
risedev ci-start ci-1cn-1fe-with-recovery
sleep 5
# check results
risedev slt "e2e_test/webhook/webhook_source.slt"

risedev kill

risedev dev ci-1cn-1fe-with-recovery
echo "--- wait for cluster recovery finish"
sleep 20
risedev slt "e2e_test/webhook/webhook_source_recovery.slt"

risedev ci-kill
echo "--- cluster killed "
Empty file removed e2e_test/webhook/check.slt.part
Empty file.
9 changes: 9 additions & 0 deletions e2e_test/webhook/check_1.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
11 changes: 11 additions & 0 deletions e2e_test/webhook/check_2.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
13 changes: 13 additions & 0 deletions e2e_test/webhook/check_3.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256
github sha256
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
github sha1
7 changes: 7 additions & 0 deletions e2e_test/webhook/drop_table.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

statement ok
DROP TABLE github_sha256;

statement ok
DROP TABLE github_sha1;

88 changes: 88 additions & 0 deletions e2e_test/webhook/sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import argparse
import requests
import json
import sys
import hmac
import hashlib

message = {
"event": "order.created",
"source": "placeholder",
"auth_algo": "placeholder",
"data": {
"order_id": 1234,
"customer_name": "Alice",
"amount": 99.99,
"currency": "USD"
},
"timestamp": 1639581841
}

SERVER_URL = "http://127.0.0.1:8080/message/root/dev/public/"


def generate_signature_hmac(secret, payload, auth_algo):
secret_bytes = bytes(secret, 'utf-8')
payload_bytes = bytes(payload, 'utf-8')
signature = ""
if auth_algo == "sha1":
signature = "sha1=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha1).hexdigest()
elif auth_algo == "sha256":
signature = "sha256=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha256).hexdigest()
else:
print("Unsupported auth type")
sys.exit(1)
return signature


def send_webhook(url, headers, payload_json):
response = requests.post(url, headers=headers, data=payload_json)

# Check response status and exit on failure
if response.status_code == 200:
print("Webhook sent successfully:", response)
else:
print(f"Webhook failed to send, Status Code: {response.status_code}, Response: {response.text}")
sys.exit(1) # Exit the program with an error


def send_github_sha1(secret):
payload = message
payload['source'] = "github"
payload['auth_algo'] = "sha1"
url = SERVER_URL + "github_sha1"

payload_json = json.dumps(payload)
signature = generate_signature_hmac(secret, payload_json, 'sha1')
# Webhook message headers
headers = {
"Content-Type": "application/json",
"X-Hub-Signature": signature # Custom signature header
}
send_webhook(url, headers, payload_json)


def send_github_sha256(secret):
payload = message
payload['source'] = "github"
payload['auth_algo'] = "sha256"
url = SERVER_URL + "github_sha256"

payload_json = json.dumps(payload)
signature = generate_signature_hmac(secret, payload_json, 'sha256')
# Webhook message headers
headers = {
"Content-Type": "application/json",
"X-Hub-Signature-256": signature # Custom signature header
}
send_webhook(url, headers, payload_json)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simulate sending Webhook messages")
parser.add_argument("--secret", required=True, help="Secret key for generating signature")
args = parser.parse_args()
secret = args.secret
# send data
send_github_sha1(secret)
send_github_sha256(secret)
25 changes: 24 additions & 1 deletion e2e_test/webhook/webhook_source.slt
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
# Simulation test for table with webhook source

control substitution on

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE SECRET test_secret WITH ( backend = 'meta') AS 'TEST_WEBHOOK';

include ../create_table.slt.part
include ./create_table.slt.part

# insert once
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

sleep 3s

include ./check_1.slt.part

# insert again
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

sleep 3s

include ./check_2.slt.part
19 changes: 19 additions & 0 deletions e2e_test/webhook/webhook_source_recovery.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Simulation test for table with webhook source after recovery

control substitution on

statement ok
SET RW_IMPLICIT_FLUSH TO true;

# insert once
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

sleep 3s

include ./check_3.slt.part

include ./drop_table.slt.part

statement ok
DROP SECRET test_secret
3 changes: 2 additions & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ message Table {
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

// The information used by webhook source to validate the incoming data.
optional WebhookSourceInfo webhook_info = 41;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
47 changes: 19 additions & 28 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ message ExprNode {
message NowRexNode {}
// TODO: move this into `FunctionCall`.
enum Type {
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the
// viriant of `rex_node`. Their types are therefore deprecated and should be
// `UNSPECIFIED` instead.
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the viriant of `rex_node`.
// Their types are therefore deprecated and should be `UNSPECIFIED` instead.
reserved 1, 2, 3000;
reserved "INPUT_REF", "CONSTANT_VALUE", "UDF";

Expand Down Expand Up @@ -128,7 +127,7 @@ message ExprNode {
LPAD = 238;
RPAD = 239;
REVERSE = 240;
STRPOS = 241 [ deprecated = true ]; // duplicated with POSITION
STRPOS = 241 [deprecated = true]; // duplicated with POSITION
TO_ASCII = 242;
TO_HEX = 243;
QUOTE_IDENT = 244;
Expand Down Expand Up @@ -331,8 +330,7 @@ message ExprNode {
// EXTERNAL
ICEBERG_TRANSFORM = 2201;
}
// Only use this field for function call. For other types of expression, it
// should be UNSPECIFIED.
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
Type function_type = 1;
data.DataType return_type = 3;
oneof rex_node {
Expand Down Expand Up @@ -393,8 +391,8 @@ message Constant {

// The items which can occur in the select list of `ProjectSet` operator.
//
// When there are table functions in the SQL query `SELECT ...`, it will be
// planned as `ProjectSet`. Otherwise it will be planned as `Project`.
// When there are table functions in the SQL query `SELECT ...`, it will be planned as `ProjectSet`.
// Otherwise it will be planned as `Project`.
//
// # Examples
//
Expand Down Expand Up @@ -424,7 +422,9 @@ message ProjectSetSelectItem {
}
}

message FunctionCall { repeated ExprNode children = 1; }
message FunctionCall {
repeated ExprNode children = 1;
}

// Aggregate Function Calls for Aggregation
message AggCall {
Expand Down Expand Up @@ -461,8 +461,7 @@ message AggCall {

// user defined aggregate function
USER_DEFINED = 100;
// wraps a scalar function that takes a list as input as an aggregate
// function.
// wraps a scalar function that takes a list as input as an aggregate function.
WRAP_SCALAR = 101;
}
Kind kind = 1;
Expand Down Expand Up @@ -496,8 +495,7 @@ message WindowFrame {
enum Type {
TYPE_UNSPECIFIED = 0;

TYPE_ROWS_LEGACY = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.
TYPE_ROWS_LEGACY = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.

TYPE_ROWS = 5;
TYPE_RANGE = 10;
Expand Down Expand Up @@ -557,10 +555,8 @@ message WindowFrame {

Type type = 1;

Bound start = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.
Bound start = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.

Exclusion exclusion = 4;

Expand Down Expand Up @@ -594,9 +590,8 @@ message WindowFunction {
WindowFrame frame = 5;
}

// Note: due to historic reasons, UserDefinedFunction is a oneof variant
// parallel to FunctionCall, while UserDefinedFunctionMetadata is embedded as a
// field in TableFunction and AggCall.
// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall,
// while UserDefinedFunctionMetadata is embedded as a field in TableFunction and AggCall.

message UserDefinedFunction {
repeated ExprNode children = 1;
Expand All @@ -607,22 +602,18 @@ message UserDefinedFunction {
// The link to the external function service.
optional string link = 5;
// An unique identifier to the function.
// - If `link` is not empty, the name of the function in the external function
// service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm
// binary file.
// - If `link` is not empty, the name of the function in the external function service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file.
// - If `language` is `javascript`, the name of the function.
optional string identifier = 6;
// - If `language` is `javascript`, the source code of the function.
optional string body = 7;
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;
// The runtime used when javascript is used as the language. Could be
// "quickjs" or "deno".
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
optional string runtime = 11;
// The function type, which is used to execute the function. Could be "sync",
// "async", "generator" or "async_generator"
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
optional string function_type = 12;
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Binder {
);
return self.bind_array_transform(arg_list.args);
}

let mut args: Vec<_> = arg_list
.args
.iter()
Expand Down
4 changes: 0 additions & 4 deletions src/frontend/src/webhook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ pub(super) mod handlers {
Path((user, database, schema, table)): Path<(String, String, String, String)>,
body: Bytes,
) -> Result<()> {
println!(
"WKXLOG receive something: {:?}, database: {}, table: {}",
body, database, table
);
let session_mgr = SESSION_MANAGER
.get()
.expect("session manager has been initialized");
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/compaction_catalog_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ mod tests {
created_at_cluster_version: None,
cdc_table_id: None,
maybe_vnode_count: None,
webhook_info: None,
}
}

Expand Down

0 comments on commit 4a07e56

Please sign in to comment.