Skip to content

Commit

Permalink
feat(source): support webhook source table
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 6, 2024
1 parent a945f52 commit 45db053
Show file tree
Hide file tree
Showing 39 changed files with 732 additions and 44 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ message StreamSourceInfo {
map<string, secret.SecretRef> format_encode_secret_refs = 16;
}

message WebhookSourceInfo {
secret.SecretRef secret_ref = 1;
string header_key = 2;
expr.ExprNode signature_expr = 3;
}

message Source {
// For shared source, this is the same as the job id.
// For non-shared source and table with connector, this is a different oid.
Expand Down Expand Up @@ -433,6 +439,8 @@ message Table {
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

optional WebhookSourceInfo webhook_info = 41;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
48 changes: 29 additions & 19 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ message ExprNode {
message NowRexNode {}
// TODO: move this into `FunctionCall`.
enum Type {
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the viriant of `rex_node`.
// Their types are therefore deprecated and should be `UNSPECIFIED` instead.
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the
// viriant of `rex_node`. Their types are therefore deprecated and should be
// `UNSPECIFIED` instead.
reserved 1, 2, 3000;
reserved "INPUT_REF", "CONSTANT_VALUE", "UDF";

Expand Down Expand Up @@ -127,7 +128,7 @@ message ExprNode {
LPAD = 238;
RPAD = 239;
REVERSE = 240;
STRPOS = 241 [deprecated = true]; // duplicated with POSITION
STRPOS = 241 [ deprecated = true ]; // duplicated with POSITION
TO_ASCII = 242;
TO_HEX = 243;
QUOTE_IDENT = 244;
Expand Down Expand Up @@ -197,6 +198,7 @@ message ExprNode {
INET_NTOA = 329;
QUOTE_LITERAL = 330;
QUOTE_NULLABLE = 331;
HMAC = 332;

// Unary operators
NEG = 401;
Expand Down Expand Up @@ -329,7 +331,8 @@ message ExprNode {
// EXTERNAL
ICEBERG_TRANSFORM = 2201;
}
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
// Only use this field for function call. For other types of expression, it
// should be UNSPECIFIED.
Type function_type = 1;
data.DataType return_type = 3;
oneof rex_node {
Expand Down Expand Up @@ -390,8 +393,8 @@ message Constant {

// The items which can occur in the select list of `ProjectSet` operator.
//
// When there are table functions in the SQL query `SELECT ...`, it will be planned as `ProjectSet`.
// Otherwise it will be planned as `Project`.
// When there are table functions in the SQL query `SELECT ...`, it will be
// planned as `ProjectSet`. Otherwise it will be planned as `Project`.
//
// # Examples
//
Expand Down Expand Up @@ -421,9 +424,7 @@ message ProjectSetSelectItem {
}
}

message FunctionCall {
repeated ExprNode children = 1;
}
message FunctionCall { repeated ExprNode children = 1; }

// Aggregate Function Calls for Aggregation
message AggCall {
Expand Down Expand Up @@ -460,7 +461,8 @@ message AggCall {

// user defined aggregate function
USER_DEFINED = 100;
// wraps a scalar function that takes a list as input as an aggregate function.
// wraps a scalar function that takes a list as input as an aggregate
// function.
WRAP_SCALAR = 101;
}
Kind kind = 1;
Expand Down Expand Up @@ -494,7 +496,8 @@ message WindowFrame {
enum Type {
TYPE_UNSPECIFIED = 0;

TYPE_ROWS_LEGACY = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
TYPE_ROWS_LEGACY = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.

TYPE_ROWS = 5;
TYPE_RANGE = 10;
Expand Down Expand Up @@ -554,8 +557,10 @@ message WindowFrame {

Type type = 1;

Bound start = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
Bound start = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.

Exclusion exclusion = 4;

Expand Down Expand Up @@ -589,8 +594,9 @@ message WindowFunction {
WindowFrame frame = 5;
}

// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall,
// while UserDefinedFunctionMetadata is embedded as a field in TableFunction and AggCall.
// Note: due to historic reasons, UserDefinedFunction is a oneof variant
// parallel to FunctionCall, while UserDefinedFunctionMetadata is embedded as a
// field in TableFunction and AggCall.

message UserDefinedFunction {
repeated ExprNode children = 1;
Expand All @@ -601,18 +607,22 @@ message UserDefinedFunction {
// The link to the external function service.
optional string link = 5;
// An unique identifier to the function.
// - If `link` is not empty, the name of the function in the external function service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file.
// - If `link` is not empty, the name of the function in the external function
// service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm
// binary file.
// - If `language` is `javascript`, the name of the function.
optional string identifier = 6;
// - If `language` is `javascript`, the source code of the function.
optional string body = 7;
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime used when javascript is used as the language. Could be
// "quickjs" or "deno".
optional string runtime = 11;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
// The function type, which is used to execute the function. Could be "sync",
// "async", "generator" or "async_generator"
optional string function_type = 12;
}

Expand Down
24 changes: 24 additions & 0 deletions src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ impl LocalSecretManager {
Ok(options)
}

pub fn fill_secret(&self, secret_ref: PbSecretRef) -> SecretResult<String> {
let secret_guard = self.secrets.read();
let secret_id = secret_ref.secret_id;
let pb_secret_bytes = secret_guard
.get(&secret_id)
.ok_or(SecretError::ItemNotFound(secret_id))?;
let secret_value_bytes = Self::get_secret_value(pb_secret_bytes)?;
match secret_ref.ref_as() {
RefAsType::Text => {
// We converted the secret string from sql to bytes using `as_bytes` in frontend.
// So use `from_utf8` here to convert it back to string.
return Ok(String::from_utf8(secret_value_bytes.clone())?);
}
RefAsType::File => {
let path_str =
self.get_or_init_secret_file(secret_id, secret_value_bytes.clone())?;
return Ok(path_str);
}
RefAsType::Unspecified => {
return Err(SecretError::UnspecifiedRefType(secret_id));
}
}
}

/// Get the secret file for the given secret id and return the path string. If the file does not exist, create it.
/// WARNING: This method should be called only when the secret manager is locked.
fn get_or_init_secret_file(
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
const PREFIXES: &[&str] = &[
"schema.registry",
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
ginepro = "0.8"
hex = "0.4"
hmac = "0.12"
icelake = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
Expand Down
78 changes: 78 additions & 0 deletions src/expr/impl/src/scalar/hmac.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance witmuth the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use hex::encode;
use hmac::{Hmac, Mac};
use risingwave_expr::function;
use sha1::Sha1;
use sha2::Sha256;

#[function("hmac(varchar, bytea, varchar) -> bytea")]
pub fn hmac<'a>(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> {
if sha_type == "sha1" {
sha1_hmac(secret, payload)
} else if sha_type == "sha256" {
sha256_hmac(secret, payload)
} else {
panic!("Unsupported SHA type: {}", sha_type)
}
}

fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");

mac.update(payload);

let result = mac.finalize();
let code_bytes = result.into_bytes();
let computed_signature = format!("sha256={}", encode(code_bytes));
computed_signature.as_bytes().into()
}

fn sha1_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha1>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");

mac.update(payload);

let result = mac.finalize();
let code_bytes = result.into_bytes();
let computed_signature = format!("sha1={}", encode(code_bytes));
computed_signature.as_bytes().into()
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn test_verify_signature_hmac_sha256() -> anyhow::Result<()> {
let secret = "your_secret_key";
let payload = b"your_webhook_payload";
let signature = b"sha256=cef8b98a91902c492b85d97f049aa4bfc5e7e3f9b8b7bf7cb49c5f829d2dac85"; // 替换为
assert!(*sha256_hmac(secret, payload) == *signature);
Ok(())
}

#[tokio::test]
async fn test_verify_signature_hmac_sha1() -> anyhow::Result<()> {
let secret = "your_secret_key";
let payload = b"your_webhook_payload";
let signature = b"sha1=65cb920a4b8c6ab8e2eab861a096a7bc2c05d8ba"; // 替换为
assert!(*sha1_hmac(secret, payload) == *signature);
Ok(())
}
}
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod extract;
mod field;
mod format;
mod format_type;
mod hmac;
mod in_;
mod int256;
mod jsonb_access;
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-recursion = "1.1.0"
async-trait = "0.1"
auto_enums = { workspace = true }
auto_impl = "1"
axum = { workspace = true }
base64 = "0.22"
bk-tree = "0.5.0"
bytes = "1"
Expand All @@ -36,6 +37,8 @@ fancy-regex = "0.14.0"
fixedbitset = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
hmac = "0.12"
iana-time-zone = "0.1"
iceberg = { workspace = true }
icelake = { workspace = true }
Expand Down Expand Up @@ -79,6 +82,7 @@ risingwave_variables = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha1 = "0.10.6"
sha2 = "0.10.7"
smallvec = { version = "1.13.1", features = ["serde"] }
speedate = "0.15.0"
Expand All @@ -97,10 +101,17 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.6", features = [
"add-extension",
"cors",
"compression-gzip",
] }
tracing = "0.1"
uuid = "1"
zstd = { version = "0.13", default-features = false }


[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl TestCase {
cdc_table_info,
include_column_options,
wildcard_idx,
webhook_info,
..
} => {
let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());
Expand All @@ -453,6 +454,7 @@ impl TestCase {
with_version_column,
cdc_table_info,
include_column_options,
webhook_info,
)
.await?;
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/function/builtin_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl Binder {
("sha512", raw_call(ExprType::Sha512)),
("encrypt", raw_call(ExprType::Encrypt)),
("decrypt", raw_call(ExprType::Decrypt)),
("hmac", raw_call(ExprType::Hmac)),
("left", raw_call(ExprType::Left)),
("right", raw_call(ExprType::Right)),
("inet_aton", raw_call(ExprType::InetAton)),
Expand Down
Loading

0 comments on commit 45db053

Please sign in to comment.