Skip to content

Commit

Permalink
feat(source): support webhook source table
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 7, 2024
1 parent a945f52 commit 5ac5bc9
Show file tree
Hide file tree
Showing 48 changed files with 908 additions and 15 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema
# `requests` (imported by e2e_test/webhook/sender.py) is already the first package in this list, so it is not repeated.
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema
apt-get -y install jq

echo "--- e2e, inline test"
Expand Down Expand Up @@ -154,3 +154,22 @@ risedev slt './e2e_test/source_legacy/basic/old_row_format_syntax/*.slt'
echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
risedev slt './e2e_test/ch_benchmark/streaming/*.slt'

# Stop the cluster used by the preceding tests before the webhook source tests start.
risedev ci-kill
echo "--- cluster killed "

echo "--- starting risingwave cluster for webhook source test"
risedev ci-start ci-1cn-1fe-with-recovery
# Fixed pause before the first queries — presumably to let the cluster finish starting; TODO confirm.
sleep 5
# check results
risedev slt "e2e_test/webhook/webhook_source.slt"

# Stop and relaunch the same profile so the recovery test below runs against a restarted cluster.
# NOTE(review): this uses `kill` / `dev` instead of `ci-kill` / `ci-start` — presumably so the
# tables and secret created by webhook_source.slt survive the restart; confirm.
risedev kill

risedev dev ci-1cn-1fe-with-recovery
echo "--- wait for cluster recovery finish"
sleep 20
# Sends one more batch of webhooks, expects three rows per table, then drops the tables and the secret.
risedev slt "e2e_test/webhook/webhook_source_recovery.slt"

risedev ci-kill
echo "--- cluster killed "
9 changes: 9 additions & 0 deletions e2e_test/webhook/check_1.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
11 changes: 11 additions & 0 deletions e2e_test/webhook/check_2.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
13 changes: 13 additions & 0 deletions e2e_test/webhook/check_3.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha256;
----
github sha256
github sha256
github sha256

query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
github sha1
21 changes: 21 additions & 0 deletions e2e_test/webhook/create_table.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

# Webhook table for SHA-1 signed requests: validation compares the `X-Hub-Signature`
# request header with hmac(test_secret, data, 'sha1').
# The secret `test_secret` must already exist when this part is included.
statement ok
create table github_sha1 (
data JSONB
) WITH (
connector = 'webhook',
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'X-Hub-Signature',
hmac(test_secret, data, 'sha1')
);

# Webhook table for SHA-256 signed requests: same validation as above, but using the
# `X-Hub-Signature-256` header and the 'sha256' digest.
statement ok
create table github_sha256 (
data JSONB
) WITH (
connector = 'webhook',
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'X-Hub-Signature-256',
hmac(test_secret, data, 'sha256')
);

7 changes: 7 additions & 0 deletions e2e_test/webhook/drop_table.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

statement ok
DROP TABLE github_sha256;

statement ok
DROP TABLE github_sha1;

88 changes: 88 additions & 0 deletions e2e_test/webhook/sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import argparse
import requests
import json
import sys
import hmac
import hashlib

message = {
"event": "order.created",
"source": "placeholder",
"auth_algo": "placeholder",
"data": {
"order_id": 1234,
"customer_name": "Alice",
"amount": 99.99,
"currency": "USD"
},
"timestamp": 1639581841
}

SERVER_URL = "http://127.0.0.1:8080/message/root/dev/public/"


def generate_signature_hmac(secret, payload, auth_algo):
    """Build an ``<algo>=<hex digest>`` HMAC signature for a payload.

    Args:
        secret: Shared secret (str), encoded as UTF-8 to form the HMAC key.
        payload: Message text (str) to sign, encoded as UTF-8.
        auth_algo: Digest name, either ``"sha1"`` or ``"sha256"``.

    Returns:
        The signature string, e.g. ``"sha256=<64 hex chars>"``.

    Any other ``auth_algo`` prints a message and exits the process with status 1.
    """
    key = bytes(secret, 'utf-8')
    body = bytes(payload, 'utf-8')

    # Pick the digest constructor first; the signature is then built in one place.
    if auth_algo == "sha1":
        digest = hashlib.sha1
    elif auth_algo == "sha256":
        digest = hashlib.sha256
    else:
        print("Unsupported auth type")
        sys.exit(1)

    mac = hmac.new(key, body, digestmod=digest)
    return f"{auth_algo}={mac.hexdigest()}"


def send_webhook(url, headers, payload_json, timeout=10):
    """POST a webhook payload and exit the process if delivery fails.

    Args:
        url: Endpoint URL to post to.
        headers: HTTP headers to send with the request (including the signature header).
        payload_json: Request body string.
        timeout: Seconds to wait for the server before giving up (default 10).

    Exits with status 1 when the request cannot be completed (connection
    error, timeout, ...) or when the response status is not 200.
    """
    try:
        # Without a timeout, requests waits indefinitely, so an unresponsive
        # endpoint would hang the calling test instead of failing it.
        response = requests.post(url, headers=headers, data=payload_json, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Webhook failed to send, Error: {exc}")
        sys.exit(1)  # Exit the program with an error

    # Check response status and exit on failure
    if response.status_code == 200:
        print("Webhook sent successfully:", response)
    else:
        print(f"Webhook failed to send, Status Code: {response.status_code}, Response: {response.text}")
        sys.exit(1)  # Exit the program with an error


def send_github_sha1(secret):
    """Send one webhook signed with HMAC-SHA1 to the ``github_sha1`` endpoint.

    The payload is the module-level ``message`` template with ``source`` set to
    ``"github"`` and ``auth_algo`` set to ``"sha1"``; the signature is carried in
    the ``X-Hub-Signature`` header.

    Args:
        secret: Shared secret used as the HMAC key.
    """
    # Work on a shallow copy so the shared `message` template is not modified
    # by this call (only top-level keys are overwritten, so a shallow copy suffices).
    payload = dict(message)
    payload['source'] = "github"
    payload['auth_algo'] = "sha1"
    url = SERVER_URL + "github_sha1"

    # Serialize once: the exact string that is signed is the one that is sent.
    payload_json = json.dumps(payload)
    signature = generate_signature_hmac(secret, payload_json, 'sha1')
    # Webhook message headers
    headers = {
        "Content-Type": "application/json",
        "X-Hub-Signature": signature  # Custom signature header
    }
    send_webhook(url, headers, payload_json)


def send_github_sha256(secret):
    """Send one webhook signed with HMAC-SHA256 to the ``github_sha256`` endpoint.

    The payload is the module-level ``message`` template with ``source`` set to
    ``"github"`` and ``auth_algo`` set to ``"sha256"``; the signature is carried
    in the ``X-Hub-Signature-256`` header.

    Args:
        secret: Shared secret used as the HMAC key.
    """
    # Work on a shallow copy so the shared `message` template is not modified
    # by this call (only top-level keys are overwritten, so a shallow copy suffices).
    payload = dict(message)
    payload['source'] = "github"
    payload['auth_algo'] = "sha256"
    url = SERVER_URL + "github_sha256"

    # Serialize once: the exact string that is signed is the one that is sent.
    payload_json = json.dumps(payload)
    signature = generate_signature_hmac(secret, payload_json, 'sha256')
    # Webhook message headers
    headers = {
        "Content-Type": "application/json",
        "X-Hub-Signature-256": signature  # Custom signature header
    }
    send_webhook(url, headers, payload_json)


if __name__ == "__main__":
    # Command-line entry point: read the signing secret, then send one webhook
    # per signature flavour (SHA-1 first, then SHA-256).
    cli = argparse.ArgumentParser(description="Simulate sending Webhook messages")
    cli.add_argument("--secret", required=True, help="Secret key for generating signature")
    signing_secret = cli.parse_args().secret
    # send data
    for send in (send_github_sha1, send_github_sha256):
        send(signing_secret)
27 changes: 27 additions & 0 deletions e2e_test/webhook/webhook_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Simulation test for table with webhook source
#
# Flow: create the secret and the two webhook tables, post one signed message to each
# table via sender.py, verify one row per table, then post again and verify two rows.
# The tables and the secret are intentionally left in place; webhook_source_recovery.slt
# drops them.

control substitution on

statement ok
SET RW_IMPLICIT_FLUSH TO true;

# The secret value must match the `--secret` passed to sender.py below.
statement ok
CREATE SECRET test_secret WITH ( backend = 'meta') AS 'TEST_WEBHOOK';

include ./create_table.slt.part

# insert once
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

# Fixed wait before checking — presumably gives the posted rows time to become visible; TODO confirm.
sleep 3s

include ./check_1.slt.part

# insert again
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

sleep 3s

include ./check_2.slt.part
19 changes: 19 additions & 0 deletions e2e_test/webhook/webhook_source_recovery.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Simulation test for table with webhook source after recovery
#
# Depends on state left behind by webhook_source.slt: the secret `test_secret`, the tables
# `github_sha1` / `github_sha256`, and the two rows already stored in each of them.
# This script posts one more message per table, expects three rows, and then cleans up.

control substitution on

statement ok
SET RW_IMPLICIT_FLUSH TO true;

# insert once
system ok
python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK

sleep 3s

include ./check_3.slt.part

# Clean up: drop the tables first, then the secret they reference.
include ./drop_table.slt.part

statement ok
DROP SECRET test_secret
9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ message StreamSourceInfo {
map<string, secret.SecretRef> format_encode_secret_refs = 16;
}

// Settings a webhook source uses to validate incoming data.
message WebhookSourceInfo {
// Reference to the secret involved in validation.
secret.SecretRef secret_ref = 1;
// NOTE(review): presumably the name of the request header that carries the signature — confirm.
string header_key = 2;
// Expression describing the signature check.
// NOTE(review): presumably the expression given in the table's VALIDATE clause — confirm.
expr.ExprNode signature_expr = 3;
}

message Source {
// For shared source, this is the same as the job id.
// For non-shared source and table with connector, this is a different oid.
Expand Down Expand Up @@ -433,6 +439,9 @@ message Table {
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

// The information used by webhook source to validate the incoming data.
optional WebhookSourceInfo webhook_info = 41;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ message ExprNode {
INET_NTOA = 329;
QUOTE_LITERAL = 330;
QUOTE_NULLABLE = 331;
HMAC = 332;

// Unary operators
NEG = 401;
Expand Down
22 changes: 22 additions & 0 deletions src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,28 @@ impl LocalSecretManager {
Ok(options)
}

/// Resolves a single secret reference to a string.
///
/// The secret is looked up by `secret_ref.secret_id` and its stored payload is decoded with
/// `get_secret_value`. Depending on `secret_ref.ref_as()`, the result is either the secret
/// text itself (`Text`) or the path string returned by `get_or_init_secret_file` (`File`).
///
/// # Errors
///
/// - `SecretError::ItemNotFound` if no secret with that id is held by this manager.
/// - `SecretError::UnspecifiedRefType` if the reference type is `Unspecified`.
/// - Any error from `get_secret_value`, UTF-8 decoding, or `get_or_init_secret_file`.
pub fn fill_secret(&self, secret_ref: PbSecretRef) -> SecretResult<String> {
    let id = secret_ref.secret_id;
    // The read guard stays alive for the whole call, including the secret-file lookup below.
    let secrets = self.secrets.read();
    let stored = secrets.get(&id).ok_or(SecretError::ItemNotFound(id))?;
    let value = Self::get_secret_value(stored)?;

    let resolved = match secret_ref.ref_as() {
        RefAsType::Unspecified => return Err(SecretError::UnspecifiedRefType(id)),
        RefAsType::File => self.get_or_init_secret_file(id, value.clone())?,
        // The frontend turned the SQL secret string into bytes with `as_bytes`,
        // so `from_utf8` is the matching conversion back to a string.
        RefAsType::Text => String::from_utf8(value.clone())?,
    };
    Ok(resolved)
}

/// Get the secret file for the given secret id and return the path string. If the file does not exist, create it.
/// WARNING: This method should be called only when the secret manager is locked.
fn get_or_init_secret_file(
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;

/// Connector name for webhook tables, i.e. the value used in `connector = 'webhook'`.
pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
const PREFIXES: &[&str] = &[
"schema.registry",
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
ginepro = "0.8"
hex = "0.4"
hmac = "0.12"
icelake = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
Expand Down
Loading

0 comments on commit 5ac5bc9

Please sign in to comment.