From f686e5b184556009fede394ea908a891ddef12ab Mon Sep 17 00:00:00 2001 From: sophatvathana Date: Mon, 1 Dec 2025 19:03:06 +0700 Subject: [PATCH] feat(adapter): introduce AWS adapter support for SQS and EventBridge, refactor existing SQS adapter, and update configuration handling --- Cargo.lock | 373 +++++++- Cargo.toml | 5 +- .../{adapter-sqs => adapter-aws}/Cargo.toml | 4 +- .../rohas-adapters/adapter-aws/src/common.rs | 87 ++ .../adapter-aws/src/eventbridge.rs | 844 ++++++++++++++++++ crates/rohas-adapters/adapter-aws/src/lib.rs | 255 ++++++ crates/rohas-adapters/adapter-aws/src/sqs.rs | 362 ++++++++ crates/rohas-adapters/adapter-sqs/src/lib.rs | 13 - crates/rohas-cli/src/commands/codegen.rs | 2 +- crates/rohas-cli/src/commands/dev.rs | 4 +- crates/rohas-cli/src/commands/init.rs | 2 +- crates/rohas-cli/src/commands/validate.rs | 2 +- crates/rohas-dev-server/src/lib.rs | 14 +- crates/rohas-dev-server/src/ts_compiler.rs | 8 +- crates/rohas-engine/Cargo.toml | 1 + crates/rohas-engine/src/adapter.rs | 110 +++ crates/rohas-engine/src/api.rs | 4 +- crates/rohas-engine/src/config.rs | 38 +- crates/rohas-engine/src/engine.rs | 41 +- crates/rohas-engine/src/error.rs | 2 +- crates/rohas-engine/src/event.rs | 83 +- crates/rohas-engine/src/lib.rs | 1 + crates/rohas-parser/src/ast.rs | 2 + crates/rohas-parser/src/parser.rs | 23 +- crates/rohas-parser/src/rohas.pest | 1 + crates/rohas-runtime/src/node_runtime.rs | 40 +- crates/rohas-runtime/src/python_runtime.rs | 14 +- examples/hello-world/config/rohas.toml | 9 +- examples/hello-world/pyproject.toml | 2 +- examples/hello-world/schema/api/test_ws.ro | 2 +- examples/hello-world/schema/api/user_api.ro | 4 +- .../schema/events/timeline_events.ro | 3 + examples/hello-world/src/handlers/api/test.py | 2 +- .../handlers/events/bottleneck_detected.py | 1 + .../{logging.py => request_logging.py} | 12 +- 35 files changed, 2255 insertions(+), 115 deletions(-) rename crates/rohas-adapters/{adapter-sqs => adapter-aws}/Cargo.toml (84%) 
create mode 100644 crates/rohas-adapters/adapter-aws/src/common.rs create mode 100644 crates/rohas-adapters/adapter-aws/src/eventbridge.rs create mode 100644 crates/rohas-adapters/adapter-aws/src/lib.rs create mode 100644 crates/rohas-adapters/adapter-aws/src/sqs.rs delete mode 100644 crates/rohas-adapters/adapter-sqs/src/lib.rs create mode 100644 crates/rohas-engine/src/adapter.rs rename examples/hello-world/src/middlewares/{logging.py => request_logging.py} (84%) diff --git a/Cargo.lock b/Cargo.lock index 30a7858..65376ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adapter-aws" +version = "0.1.0" +dependencies = [ + "async-trait", + "aws-config", + "aws-sdk-eventbridge", + "aws-sdk-sqs", + "serde", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "adapter-kafka" version = "0.1.0" @@ -68,19 +83,6 @@ dependencies = [ "tokio-test", ] -[[package]] -name = "adapter-sqs" -version = "0.1.0" -dependencies = [ - "async-trait", - "aws-sdk-sqs", - "serde", - "serde_json", - "thiserror 2.0.17", - "tokio", - "tracing", -] - [[package]] name = "adler2" version = "2.0.1" @@ -456,6 +458,36 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0149602eeaf915158e14029ba0c78dedb8c08d554b024d54c8f239aab46511d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.4.0", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + [[package]] name = 
"aws-credential-types" version = "1.2.10" @@ -514,6 +546,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-eventbridge" +version = "1.97.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39c5f843b5331e2afcf7be9bed5fba5f9edf137c3f34b540c8c62b4bc53a21cb" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sqs" version = "1.90.0" @@ -536,6 +590,73 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-sso" +version = "1.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f18e53542c522459e757f81e274783a78f8c81acdfc8d1522ee8a18b5fb1c66" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.92.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532f4d866012ffa724a4385c82e8dd0e59f0ca0e600f3f22d4c03b6824b34e4a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.94.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be6fbbfa1a57724788853a623378223fe828fc4c09b146c992f0c95b6256174" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", 
+ "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sigv4" version = "1.3.6" @@ -547,15 +668,20 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", + "crypto-bigint 0.5.5", "form_urlencoded", "hex", "hmac", "http 0.2.12", "http 1.4.0", + "p256", "percent-encoding", + "ring", "sha2", + "subtle", "time", "tracing", + "zeroize", ] [[package]] @@ -638,6 +764,16 @@ dependencies = [ "aws-smithy-runtime-api", ] +[[package]] +name = "aws-smithy-query" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d28a63441360c477465f80c7abac3b9c4d075ca638f982e605b7dc2a2c7156c9" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + [[package]] name = "aws-smithy-runtime" version = "1.9.4" @@ -705,6 +841,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "aws-smithy-xml" +version = "0.60.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab77cdd036b11056d2a30a7af7b775789fb024bf216acc13884c6c97752ae56" +dependencies = [ + "xmlparser", +] + [[package]] name = "aws-types" version = "1.3.10" @@ -783,6 +928,12 @@ dependencies = [ "fastrand", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.21.7" @@ -1093,8 +1244,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b77c319abfd5219629c45c34c89ba945ed3c5e49fcde9d16b6c3885f118a730" dependencies = [ "const-oid", - "der", - "spki", + "der 0.7.10", + "spki 0.7.3", "x509-cert", ] @@ -1214,6 +1365,28 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.7" @@ -1256,6 +1429,16 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "der" version = "0.7.10" @@ -1379,13 +1562,25 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve", + "rfc6979", + "signature 1.6.4", +] + [[package]] name = "ed25519" version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ - "signature", + "signature 2.2.0", ] [[package]] @@ -1397,7 +1592,7 @@ dependencies = [ "curve25519-dalek", "ed25519", "sha2", - "signature", + "signature 2.2.0", "subtle", ] @@ 
-1407,6 +1602,26 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff", + "generic-array", + "group", + "pkcs8 0.9.0", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1468,6 +1683,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "fiat-crypto" version = "0.2.9" @@ -1722,6 +1947,17 @@ dependencies = [ "walkdir", ] +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "gzip-header" version = "1.0.0" @@ -2684,7 +2920,7 @@ dependencies = [ "base64 0.22.1", "cbc", "cms", - "der", + "der 0.7.10", "des", "hex", "hmac", @@ -2698,6 +2934,17 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking" version = "2.2.1" @@ -2899,9 +3146,9 @@ checksum = 
"695b3df3d3cc1015f12d70235e35b6b79befc5fa7a9b95b951eab1dd07c9efc2" dependencies = [ "cms", "const-oid", - "der", + "der 0.7.10", "digest", - "spki", + "spki 0.7.3", "x509-cert", "zeroize", ] @@ -2914,11 +3161,21 @@ checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" dependencies = [ "aes", "cbc", - "der", + "der 0.7.10", "pbkdf2", "scrypt", "sha2", - "spki", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -2927,8 +3184,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.10", + "spki 0.7.3", ] [[package]] @@ -3298,6 +3555,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "ring" version = "0.17.14" @@ -3395,6 +3663,7 @@ dependencies = [ name = "rohas-engine" version = "0.1.0" dependencies = [ + "adapter-aws", "adapter-memory", "adapter-rocksdb", "anyhow", @@ -3741,6 +4010,20 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3930,12 +4213,22 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" dependencies = [ - "pkcs8", + "pkcs8 0.10.2", "rand_core 0.6.4", - "signature", + "signature 2.2.0", "zeroize", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "signature" version = "2.2.0" @@ -4003,6 +4296,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -4010,7 +4313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.10", ] [[package]] @@ -4661,6 +4964,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -5148,8 +5457,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94" dependencies = [ "const-oid", - "der", - "spki", + "der 0.7.10", + "spki 0.7.3", ] [[package]] @@ -5169,6 +5478,12 @@ dependencies = [ "time", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index b11f9ce..b242c9c 100644 --- a/Cargo.toml +++ 
b/Cargo.toml @@ -13,7 +13,7 @@ members = [ "crates/rohas-adapters/adapter-nats", "crates/rohas-adapters/adapter-kafka", "crates/rohas-adapters/adapter-rabbitmq", - "crates/rohas-adapters/adapter-sqs", + "crates/rohas-adapters/adapter-aws", "crates/rohas-adapters/adapter-rocksdb", ] @@ -70,6 +70,7 @@ async-nats = "0.45.0" rdkafka = { version = "0.38.0", features = ["cmake-build"] } lapin = "3.7.2" aws-sdk-sqs = "1.9" +aws-sdk-eventbridge = "1.9" # Watching / Hot reload notify = "8.2.0" @@ -95,7 +96,7 @@ adapter-memory = { path = "crates/rohas-adapters/adapter-memory" } adapter-nats = { path = "crates/rohas-adapters/adapter-nats" } adapter-kafka = { path = "crates/rohas-adapters/adapter-kafka" } adapter-rabbitmq = { path = "crates/rohas-adapters/adapter-rabbitmq" } -adapter-sqs = { path = "crates/rohas-adapters/adapter-sqs" } +adapter-aws = { path = "crates/rohas-adapters/adapter-aws" } adapter-rocksdb = { path = "crates/rohas-adapters/adapter-rocksdb" } rohas-telemetry = { path = "crates/rohas-telemetry" } diff --git a/crates/rohas-adapters/adapter-sqs/Cargo.toml b/crates/rohas-adapters/adapter-aws/Cargo.toml similarity index 84% rename from crates/rohas-adapters/adapter-sqs/Cargo.toml rename to crates/rohas-adapters/adapter-aws/Cargo.toml index 431e6d9..0c7f7c3 100644 --- a/crates/rohas-adapters/adapter-sqs/Cargo.toml +++ b/crates/rohas-adapters/adapter-aws/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "adapter-sqs" +name = "adapter-aws" version = { workspace = true } edition = { workspace = true } authors = { workspace = true } @@ -8,6 +8,8 @@ license = { workspace = true } [dependencies] tokio = { workspace = true } aws-sdk-sqs = { workspace = true } +aws-sdk-eventbridge = "1.9" +aws-config = "1.1" serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/crates/rohas-adapters/adapter-aws/src/common.rs b/crates/rohas-adapters/adapter-aws/src/common.rs new file mode 100644 index 0000000..b530a04 --- /dev/null +++ 
b/crates/rohas-adapters/adapter-aws/src/common.rs @@ -0,0 +1,87 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Error, Debug)] +pub enum AdapterError { + #[error("AWS SQS error: {0}")] + AwsSqs(String), + + #[error("AWS EventBridge error: {0}")] + AwsEventBridge(String), + + #[error("Queue not found: {0}")] + QueueNotFound(String), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Invalid message format: {0}")] + InvalidMessage(String), + + #[error("Configuration error: {0}")] + Configuration(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Message { + pub topic: String, + pub payload: serde_json::Value, + pub timestamp: String, + pub metadata: HashMap, +} + +impl Message { + pub fn new(topic: impl Into, payload: serde_json::Value) -> Self { + use std::time::SystemTime; + Self { + topic: topic.into(), + payload, + timestamp: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + .to_string(), + metadata: HashMap::new(), + } + } + + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } +} + +#[async_trait] +pub trait MessageHandler: Send + Sync { + async fn handle(&self, message: Message) -> Result<()>; +} + +#[derive(Debug, Clone)] +pub struct AwsConfig { + pub region: String, + pub queue_prefix: Option, // For SQS + pub event_bus_name: Option, // For EventBridge (default: "default") + pub source: Option, // For EventBridge (default: "rohas") + pub visibility_timeout_seconds: Option, // For SQS + pub message_retention_seconds: Option, // For SQS + pub receive_wait_time_seconds: Option, // For SQS (long polling) +} + +impl Default for AwsConfig { + fn default() -> Self { + Self { + region: "us-east-1".to_string(), + queue_prefix: 
Some("rohas-".to_string()), + event_bus_name: None, // Use default event bus + source: Some("rohas".to_string()), + visibility_timeout_seconds: Some(30), + message_retention_seconds: Some(345600), // 4 days + receive_wait_time_seconds: Some(20), // Long polling + } + } +} + diff --git a/crates/rohas-adapters/adapter-aws/src/eventbridge.rs b/crates/rohas-adapters/adapter-aws/src/eventbridge.rs new file mode 100644 index 0000000..6377386 --- /dev/null +++ b/crates/rohas-adapters/adapter-aws/src/eventbridge.rs @@ -0,0 +1,844 @@ +use crate::common::{AdapterError, Message, MessageHandler, Result}; +use aws_sdk_eventbridge::Client as EventBridgeClient; +use aws_sdk_sqs::Client as SqsClient; +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +#[derive(Debug, Clone)] +pub struct EventBridgeConfig { + pub region: String, + pub event_bus_name: Option, + pub source: Option, +} + +impl Default for EventBridgeConfig { + fn default() -> Self { + Self { + region: "us-east-1".to_string(), + event_bus_name: None, // Use default event bus + source: Some("rohas".to_string()), + } + } +} + +pub struct EventBridgeAdapter { + client: EventBridgeClient, + sqs_client: SqsClient, + #[allow(dead_code)] + config: EventBridgeConfig, + event_bus_name: String, + source: String, + published_topics: Arc>>, + queue_urls: Arc>>, // topic -> queue_url + rule_names: Arc>>, // topic -> rule_name +} + +impl EventBridgeAdapter { + + pub async fn new(config: EventBridgeConfig) -> Result { + let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_eventbridge::config::Region::new(config.region.clone())) + .load() + .await; + + let client = EventBridgeClient::new(&aws_config); + let sqs_client = SqsClient::new(&aws_config); + + let event_bus_name = config.event_bus_name.clone().unwrap_or_else(|| "default".to_string()); + let source = 
config.source.clone().unwrap_or_else(|| "rohas".to_string()); + + info!( + "Initialized EventBridge adapter for region: {}, event_bus: {}, source: {}", + config.region, event_bus_name, source + ); + + Ok(Self { + client, + sqs_client, + config, + event_bus_name, + source, + published_topics: Arc::new(RwLock::new(HashMap::new())), + queue_urls: Arc::new(RwLock::new(HashMap::new())), + rule_names: Arc::new(RwLock::new(HashMap::new())), + }) + } + + pub async fn publish( + &self, + topic: impl Into, + payload: serde_json::Value, + ) -> Result<()> { + let topic = topic.into(); + let message = Message::new(topic.clone(), payload.clone()); + + { + let mut topics = self.published_topics.write().await; + topics.insert(topic.clone(), ()); + } + + let detail = serde_json::to_string(&message) + .map_err(|e| AdapterError::Serialization(e))?; + + info!("Publishing EventBridge event - source: '{}', detail-type: '{}', detail length: {} bytes", + self.source, topic, detail.len()); + debug!("EventBridge event detail content: {}", detail); + + let mut event_builder = aws_sdk_eventbridge::types::PutEventsRequestEntry::builder() + .source(&self.source) + .detail_type(&topic) + .detail(&detail); + + if self.event_bus_name != "default" { + event_builder = event_builder.event_bus_name(&self.event_bus_name); + } + + let event = event_builder.build(); + + let send_result = self + .client + .put_events() + .set_entries(Some(vec![event])) + .send() + .await; + + match send_result { + Ok(response) => { + let entries = response.entries(); + if !entries.is_empty() { + if let Some(entry) = entries.first() { + if let Some(error_code) = entry.error_code() { + error!( + "EventBridge publish failed for topic '{}': {} - {}", + topic, + error_code, + entry.error_message().unwrap_or("Unknown error") + ); + return Err(AdapterError::AwsEventBridge(format!( + "Failed to publish event: {} - {}", + error_code, + entry.error_message().unwrap_or("Unknown error") + ))); + } + } + } + info!( + "Published 
message to EventBridge topic: {} (event_bus: {}, source: {})", + topic, self.event_bus_name, self.source + ); + Ok(()) + } + Err(e) => { + error!("Failed to send message to EventBridge '{}': {}", topic, e); + Err(AdapterError::AwsEventBridge(format!( + "Failed to send event: {}", + e + ))) + } + } + } + + async fn get_or_create_queue(&self, topic: &str) -> Result { + { + let queue_urls = self.queue_urls.read().await; + if let Some(url) = queue_urls.get(topic) { + return Ok(url.clone()); + } + } + + let queue_name = format!("rohas-eb-{}", topic) + .chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect::(); + + info!("Checking if SQS queue '{}' exists...", queue_name); + let get_queue_result = self + .sqs_client + .get_queue_url() + .queue_name(&queue_name) + .send() + .await; + + let queue_url = match get_queue_result { + Ok(response) => { + if let Some(url) = response.queue_url() { + info!("Found existing SQS queue for EventBridge topic '{}': {}", topic, url); + url.to_string() + } else { + error!("Queue URL not returned for '{}'", queue_name); + return Err(AdapterError::AwsEventBridge(format!( + "Queue URL not returned for '{}'", + queue_name + ))); + } + } + Err(e) => { + warn!("SQS queue '{}' not found (error: {}), creating new queue...", queue_name, e); + info!("Creating SQS queue for EventBridge topic '{}': {}", topic, queue_name); + let create_result = self + .sqs_client + .create_queue() + .queue_name(&queue_name) + .send() + .await + .map_err(|e| { + error!("Failed to create SQS queue '{}': {}", queue_name, e); + AdapterError::AwsEventBridge(format!("Failed to create queue '{}': {}", queue_name, e)) + })?; + + if let Some(url) = create_result.queue_url() { + info!("Created SQS queue for EventBridge topic '{}': {}", topic, url); + url.to_string() + } else { + error!("Queue created but no URL returned for '{}'", queue_name); + return Err(AdapterError::AwsEventBridge(format!( + "Queue created but no 
URL returned for '{}'", + queue_name + ))); + } + } + }; + + { + let mut queue_urls = self.queue_urls.write().await; + queue_urls.insert(topic.to_string(), queue_url.clone()); + } + + Ok(queue_url) + } + + async fn get_or_create_rule(&self, topic: &str, queue_arn: &str) -> Result { + { + let rule_names = self.rule_names.read().await; + if let Some(rule_name) = rule_names.get(topic) { + return Ok(rule_name.clone()); + } + } + + let rule_name = format!("rohas-rule-{}", topic) + .chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect::(); + + let event_pattern = serde_json::json!({ + "source": [self.source], + "detail-type": [topic] + }); + + info!("EventBridge rule pattern for topic '{}': {}", topic, event_pattern.to_string()); + info!("Expected event format: source='{}', detail-type='{}'", self.source, topic); + + let get_rule_result = self + .client + .describe_rule() + .name(&rule_name) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await; + + let target_id = format!("sqs-target-{}", topic); + let target = aws_sdk_eventbridge::types::Target::builder() + .id(&target_id) + .arn(queue_arn) + .build() + .map_err(|e| { + AdapterError::AwsEventBridge(format!("Failed to build target: {}", e)) + })?; + + match get_rule_result { + Ok(rule_desc) => { + info!("Found existing EventBridge rule for topic '{}': {}", topic, rule_name); + if let Some(state) = rule_desc.state() { + match state { + aws_sdk_eventbridge::types::RuleState::Enabled => { + info!("EventBridge rule '{}' is ENABLED", rule_name); + } + aws_sdk_eventbridge::types::RuleState::Disabled => { + warn!("EventBridge rule '{}' is DISABLED - enabling it now...", rule_name); + match self + .client + .enable_rule() + .name(&rule_name) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await + { + 
Ok(_) => { + info!("EventBridge rule '{}' has been enabled", rule_name); + } + Err(e) => { + error!("Failed to enable EventBridge rule '{}': {}", rule_name, e); + return Err(AdapterError::AwsEventBridge(format!( + "Failed to enable rule '{}': {}", + rule_name, e + ))); + } + } + } + _ => { + warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state); + } + } + } + info!("Ensuring SQS queue target is added to existing rule '{}'", rule_name); + } + Err(e) => { + warn!("EventBridge rule '{}' not found (error: {}), creating new rule...", rule_name, e); + info!("Creating EventBridge rule for topic '{}': {}", topic, rule_name); + + let put_rule_result = self + .client + .put_rule() + .name(&rule_name) + .event_pattern(event_pattern.to_string()) + .state(aws_sdk_eventbridge::types::RuleState::Enabled) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await + .map_err(|e| { + error!("Failed to create EventBridge rule '{}': {}", rule_name, e); + AdapterError::AwsEventBridge(format!("Failed to create rule '{}': {}", rule_name, e)) + })?; + + info!("Created EventBridge rule '{}' (arn: {:?})", rule_name, put_rule_result.rule_arn()); + } + } + + info!("Adding SQS queue as target to EventBridge rule '{}'", rule_name); + info!("Target details: ID='{}', ARN='{}'", target_id, queue_arn); + let put_targets_result = self + .client + .put_targets() + .rule(&rule_name) + .set_targets(Some(vec![target])) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await + .map_err(|e| { + error!("Failed to add target to EventBridge rule '{}': {}", rule_name, e); + AdapterError::AwsEventBridge(format!("Failed to add target to rule '{}': {}", rule_name, e)) + })?; + + let failed_entries = put_targets_result.failed_entries(); + if !failed_entries.is_empty() { + error!("Failed to add target to EventBridge rule '{}': {:?}", 
rule_name, failed_entries); + for entry in failed_entries { + error!(" - Error Code: {}, Error Message: {}", + entry.error_code().unwrap_or("unknown"), + entry.error_message().unwrap_or("unknown")); + } + return Err(AdapterError::AwsEventBridge(format!( + "Failed to add target to rule '{}': {:?}", + rule_name, failed_entries + ))); + } + + info!("Successfully added SQS queue as target to EventBridge rule '{}'", rule_name); + info!("Target configuration: Queue ARN='{}', Target ID='{}'", queue_arn, target_id); + + let list_targets_result = self + .client + .list_targets_by_rule() + .rule(&rule_name) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await; + + if let Ok(targets_response) = list_targets_result { + let targets = targets_response.targets(); + if targets.is_empty() { + error!("CRITICAL: EventBridge rule '{}' has NO TARGETS configured!", rule_name); + return Err(AdapterError::AwsEventBridge(format!( + "Rule '{}' has no targets configured", + rule_name + ))); + } else { + info!("Verified: EventBridge rule '{}' has {} target(s) configured", rule_name, targets.len()); + for target in targets { + let target_arn = target.arn(); + if target_arn == queue_arn { + info!("Target verified: SQS queue ARN '{}' is configured as target", queue_arn); + } else { + warn!("Found target with different ARN: {} (expected: {})", target_arn, queue_arn); + } + } + } + } else { + warn!("Could not list targets for rule '{}'", rule_name); + } + + let verify_result = self + .client + .describe_rule() + .name(&rule_name) + .set_event_bus_name(if self.event_bus_name != "default" { + Some(self.event_bus_name.clone()) + } else { + None + }) + .send() + .await; + + if let Ok(rule_desc) = verify_result { + if let Some(state) = rule_desc.state() { + match state { + aws_sdk_eventbridge::types::RuleState::Enabled => { + info!("Verified: EventBridge rule '{}' is ENABLED and ready", rule_name); + } + 
aws_sdk_eventbridge::types::RuleState::Disabled => { + error!("CRITICAL: EventBridge rule '{}' is still DISABLED after setup!", rule_name); + return Err(AdapterError::AwsEventBridge(format!( + "Rule '{}' is disabled and could not be enabled", + rule_name + ))); + } + _ => { + warn!("EventBridge rule '{}' has unknown state: {:?}", rule_name, state); + } + } + } + } else { + warn!("Could not verify rule state after setup"); + } + + { + let mut rule_names = self.rule_names.write().await; + rule_names.insert(topic.to_string(), rule_name.clone()); + } + + Ok(rule_name) + } + + async fn get_queue_arn(&self, queue_url: &str) -> Result { + let queue_name = queue_url + .split('/') + .last() + .ok_or_else(|| { + error!("Invalid queue URL format: {}", queue_url); + AdapterError::AwsEventBridge(format!("Invalid queue URL: {}", queue_url)) + })?; + + info!("Retrieving ARN for SQS queue '{}'...", queue_name); + + let attributes_result = self + .sqs_client + .get_queue_attributes() + .queue_url(queue_url) + .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn) + .send() + .await + .map_err(|e| { + error!("Failed to get queue attributes for '{}': {}", queue_name, e); + AdapterError::AwsEventBridge(format!("Failed to get queue attributes for '{}': {}", queue_name, e)) + })?; + + if let Some(attributes) = attributes_result.attributes() { + if let Some(arn) = attributes.get(&aws_sdk_sqs::types::QueueAttributeName::QueueArn) { + info!("Retrieved ARN for queue '{}': {}", queue_name, arn); + return Ok(arn.clone()); + } + } + + error!("Queue ARN not found in attributes for queue '{}'", queue_name); + Err(AdapterError::AwsEventBridge(format!( + "Queue ARN not found for queue '{}'", + queue_name + ))) + } + + pub async fn subscribe_fn(&self, topic: impl Into, handler: F) -> Result<()> + where + F: Fn(Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let topic = topic.into(); + info!("=== Setting up EventBridge subscription for topic: 
{} ===", topic); + + info!("Step 1: Creating/getting SQS queue for topic '{}'...", topic); + let queue_url = match self.get_or_create_queue(&topic).await { + Ok(url) => { + info!("SQS queue ready for EventBridge topic '{}': {}", topic, url); + url + } + Err(e) => { + error!("Failed to create/get SQS queue for topic '{}': {}", topic, e); + return Err(e); + } + }; + + info!("Step 2: Getting SQS queue ARN for topic '{}'...", topic); + let queue_arn = match self.get_queue_arn(&queue_url).await { + Ok(arn) => { + info!("SQS queue ARN for topic '{}': {}", topic, arn); + arn + } + Err(e) => { + error!("Failed to get SQS queue ARN for topic '{}': {}", topic, e); + return Err(e); + } + }; + + info!("Step 3: Creating/getting EventBridge rule for topic '{}'...", topic); + let rule_name = match self.get_or_create_rule(&topic, &queue_arn).await { + Ok(name) => { + info!("EventBridge rule ready for topic '{}': {}", topic, name); + name + } + Err(e) => { + error!("Failed to create/get EventBridge rule for topic '{}': {}", topic, e); + return Err(e); + } + }; + + let account_id = queue_arn + .split(':') + .nth(4) + .unwrap_or("*"); + + // EventBridge rule ARN format: + // - Default bus: arn:aws:events:region:account-id:rule/rule-name + // - Custom bus: arn:aws:events:region:account-id:rule/event-bus-name/rule-name + let rule_arn = if self.event_bus_name == "default" { + format!( + "arn:aws:events:{}:{}:rule/{}", + self.config.region, + account_id, + rule_name + ) + } else { + format!( + "arn:aws:events:{}:{}:rule/{}/{}", + self.config.region, + account_id, + self.event_bus_name, + rule_name + ) + }; + + info!("Step 4: Setting up SQS queue policy for EventBridge access (rule ARN: {})...", rule_arn); + + let policy = serde_json::json!({ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": { + "Service": "events.amazonaws.com" + }, + "Action": "sqs:SendMessage", + "Resource": queue_arn, + "Condition": { + "ArnEquals": { + "aws:SourceArn": rule_arn + } + } 
+ }] + }); + + match self + .sqs_client + .set_queue_attributes() + .queue_url(&queue_url) + .attributes( + aws_sdk_sqs::types::QueueAttributeName::Policy, + policy.to_string(), + ) + .send() + .await + { + Ok(_) => { + info!("Successfully set SQS queue policy for EventBridge access"); + } + Err(e) => { + warn!("Failed to set SQS queue policy (this may be okay if policy already exists): {}", e); + } + } + + info!("Step 5: Starting SQS queue polling for topic '{}'...", topic); + let sqs_client = self.sqs_client.clone(); + let topic_clone = topic.clone(); + let queue_url_clone = queue_url.clone(); + let queue_arn_clone = queue_arn.clone(); + let rule_name_clone = rule_name.clone(); + + struct ClosureHandler + where + F: Fn(Message) -> Fut + Send + Sync, + Fut: std::future::Future> + Send, + { + func: F, + } + + #[async_trait] + impl MessageHandler for ClosureHandler + where + F: Fn(Message) -> Fut + Send + Sync, + Fut: std::future::Future> + Send, + { + async fn handle(&self, message: Message) -> Result<()> { + (self.func)(message).await + } + } + + let handler = Arc::new(ClosureHandler { func: handler }); + + tokio::spawn(async move { + info!("EventBridge subscription polling loop started for topic '{}' (queue: {})", topic_clone, queue_url); + let mut poll_count = 0u64; + loop { + poll_count += 1; + if poll_count % 10 == 0 { + info!("EventBridge polling loop still active for topic '{}' (poll #{}), queue: {}", topic_clone, poll_count, queue_url); + } + if poll_count == 1 || poll_count % 5 == 0 { + info!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count); + } else { + debug!("Polling SQS queue for EventBridge topic '{}' (poll #{})...", topic_clone, poll_count); + } + let receive_result = sqs_client + .receive_message() + .queue_url(&queue_url) + .max_number_of_messages(10) + .wait_time_seconds(20) + .send() + .await; + + match receive_result { + Ok(response) => { + let messages = response.messages(); + if !messages.is_empty() { + 
info!("Received {} message(s) from EventBridge queue for topic '{}'", messages.len(), topic_clone); + for sqs_message in messages { + if let Some(body) = sqs_message.body() { + info!("Raw SQS message body for topic '{}': {}", topic_clone, body); + + debug!("Attempting to parse EventBridge message for topic '{}'", topic_clone); + let message_result = { + debug!("Trying to parse as array of events..."); + if let Ok(events_array) = serde_json::from_str::>(body) { + debug!("Successfully parsed as array with {} event(s)", events_array.len()); + if let Some(event) = events_array.first() { + debug!("First event structure: {:?}", event); + if let Some(detail_str) = event.get("detail").and_then(|d| d.as_str()) { + debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str); + match serde_json::from_str::(detail_str) { + Ok(msg) => { + debug!("Successfully parsed Message from detail string"); + Some(msg) + } + Err(e) => { + debug!("Failed to parse Message from detail string: {}", e); + None + } + } + } else if let Some(detail_obj) = event.get("detail") { + debug!("Found 'detail' field as object: {:?}", detail_obj); + match serde_json::from_value::(detail_obj.clone()) { + Ok(msg) => { + debug!("Successfully parsed Message from detail object"); + Some(msg) + } + Err(e) => { + debug!("Failed to parse Message from detail object: {}", e); + None + } + } + } else { + debug!("No 'detail' field found in event object"); + None + } + } else { + debug!("Array is empty"); + None + } + } else { + debug!("Not an array, trying as single event object..."); + None + } + }.or_else(|| { + debug!("Trying to parse as single event object..."); + if let Ok(event_obj) = serde_json::from_str::(body) { + debug!("Successfully parsed as event object"); + if let Some(detail_str) = event_obj.get("detail").and_then(|d| d.as_str()) { + debug!("Found 'detail' field as string (length: {}): {}", detail_str.len(), detail_str); + match serde_json::from_str::(detail_str) { + Ok(msg) 
=> { + debug!("Successfully parsed Message from detail string"); + Some(msg) + } + Err(e) => { + debug!("Failed to parse Message from detail string: {}", e); + None + } + } + } else if let Some(detail_obj) = event_obj.get("detail") { + debug!("Found 'detail' field as object: {:?}", detail_obj); + match serde_json::from_value::(detail_obj.clone()) { + Ok(msg) => { + debug!("Successfully parsed Message from detail object"); + Some(msg) + } + Err(e) => { + debug!("Failed to parse Message from detail object: {}", e); + None + } + } + } else { + debug!("No 'detail' field found in event object"); + None + } + } else { + debug!("Not a valid JSON object, trying direct Message parse..."); + None + } + }).or_else(|| { + debug!("Trying to parse body directly as Message..."); + match serde_json::from_str::(body) { + Ok(msg) => { + debug!("Successfully parsed body directly as Message"); + Some(msg) + } + Err(e) => { + debug!("Failed to parse body directly as Message: {}", e); + None + } + } + }); + + let message_result = message_result.ok_or_else(|| { + let last_error = serde_json::from_str::(body) + .map_err(|e| e) + .unwrap_err(); + debug!("All parsing attempts failed. 
Last error: {}", last_error); + last_error + }); + + match message_result { + Ok(message) => { + info!("Successfully parsed EventBridge message for topic '{}'", topic_clone); + info!("Message topic: {}, payload: {:?}", message.topic, message.payload); + info!("Calling handler for EventBridge message..."); + if let Err(e) = handler.handle(message).await { + error!("Handler error for EventBridge topic '{}': {}", topic_clone, e); + } else { + info!("Handler completed successfully for EventBridge topic '{}'", topic_clone); + } + + if let Some(receipt_handle) = sqs_message.receipt_handle() { + if let Err(e) = sqs_client + .delete_message() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .send() + .await + { + warn!( + "Failed to delete message from EventBridge queue '{}': {}", + queue_url, e + ); + } + } + } + Err(e) => { + error!( + "Failed to deserialize EventBridge message for topic '{}': {}. Body: {}", + topic_clone, e, body + ); + if let Some(receipt_handle) = sqs_message.receipt_handle() { + let _ = sqs_client + .delete_message() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .send() + .await; + } + } + } + } + } + } else { + debug!("No messages received from EventBridge queue for topic '{}' (this is normal, continuing to poll...)", topic_clone); + } + } + Err(e) => { + error!( + "Error receiving messages from EventBridge queue '{}' for topic '{}': {}. 
Retrying in 5 seconds...", + queue_url, topic_clone, e + ); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + } + } + }); + + info!("=== EventBridge subscription set up successfully for topic: {} ===", topic); + info!("Summary:"); + info!(" - SQS Queue URL: {}", queue_url_clone); + info!(" - SQS Queue ARN: {}", queue_arn_clone); + info!(" - EventBridge Rule: {}", rule_name_clone); + info!(" - Event Pattern: source='{}', detail-type='{}'", self.source, topic); + info!(" - Event Bus: {}", self.event_bus_name); + info!(" - Target: SQS queue '{}' (ARN: {})", queue_url_clone, queue_arn_clone); + info!(" - Polling: Active (long polling enabled, 20s wait time)"); + info!(" - Next: Events matching the pattern will be routed to the SQS queue"); + let rule_arn_final = if self.event_bus_name == "default" { + format!( + "arn:aws:events:{}:{}:rule/{}", + self.config.region, + queue_arn_clone.split(':').nth(4).unwrap_or("unknown"), + rule_name_clone + ) + } else { + format!( + "arn:aws:events:{}:{}:rule/{}/{}", + self.config.region, + queue_arn_clone.split(':').nth(4).unwrap_or("unknown"), + self.event_bus_name, + rule_name_clone + ) + }; + info!(" - Rule ARN: {}", rule_arn_final); + info!(" - To verify: Check AWS EventBridge console for rule '{}' and ensure it has the SQS queue as a target", rule_name_clone); + Ok(()) + } + + pub async fn list_topics(&self) -> Vec { + let topics = self.published_topics.read().await; + topics.keys().cloned().collect() + } +} + diff --git a/crates/rohas-adapters/adapter-aws/src/lib.rs b/crates/rohas-adapters/adapter-aws/src/lib.rs new file mode 100644 index 0000000..655ab2d --- /dev/null +++ b/crates/rohas-adapters/adapter-aws/src/lib.rs @@ -0,0 +1,255 @@ +pub mod sqs; +pub mod eventbridge; +pub mod common; + +pub use common::{AwsConfig, Message, Result}; +pub use sqs::SqsAdapter; +pub use eventbridge::EventBridgeAdapter; + +use serde_json::Value; +use std::sync::Arc; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum 
AwsAdapterType { + Sqs, + EventBridge, +} + +pub enum AwsAdapter { + Sqs(Arc), + EventBridge(Arc), + Both { + sqs: Arc, + eventbridge: Arc, + default_type: AwsAdapterType, + }, +} + +impl AwsAdapter { + pub async fn new( + adapter_type: AwsAdapterType, + config: AwsConfig, + ) -> common::Result { + match adapter_type { + AwsAdapterType::Sqs => { + let sqs_config = sqs::SqsConfig { + region: config.region.clone(), + queue_prefix: config.queue_prefix.clone(), + visibility_timeout_seconds: config.visibility_timeout_seconds, + message_retention_seconds: config.message_retention_seconds, + receive_wait_time_seconds: config.receive_wait_time_seconds, + }; + Ok(AwsAdapter::Sqs(Arc::new( + SqsAdapter::new(sqs_config).await? + ))) + } + AwsAdapterType::EventBridge => { + let eb_config = eventbridge::EventBridgeConfig { + region: config.region.clone(), + event_bus_name: config.event_bus_name.clone(), + source: config.source.clone(), + }; + Ok(AwsAdapter::EventBridge(Arc::new( + EventBridgeAdapter::new(eb_config).await? 
+ ))) + } + } + } + + pub async fn new_with_both( + default_type: AwsAdapterType, + config: AwsConfig, + ) -> common::Result { + tracing::info!( + "AwsAdapter::new_with_both: Initializing with default_type: {:?}, region: {}, queue_prefix: {:?}", + default_type, + config.region, + config.queue_prefix + ); + + let sqs_config = sqs::SqsConfig { + region: config.region.clone(), + queue_prefix: config.queue_prefix.clone(), + visibility_timeout_seconds: config.visibility_timeout_seconds, + message_retention_seconds: config.message_retention_seconds, + receive_wait_time_seconds: config.receive_wait_time_seconds, + }; + tracing::info!("AwsAdapter::new_with_both: Creating SQS adapter..."); + let sqs_adapter = Arc::new(SqsAdapter::new(sqs_config).await.map_err(|e| { + tracing::error!("AwsAdapter::new_with_both: Failed to create SQS adapter: {}", e); + e + })?); + tracing::info!("AwsAdapter::new_with_both: SQS adapter created successfully"); + + let eb_config = eventbridge::EventBridgeConfig { + region: config.region.clone(), + event_bus_name: config.event_bus_name.clone(), + source: config.source.clone(), + }; + tracing::info!("AwsAdapter::new_with_both: Creating EventBridge adapter..."); + let eb_adapter = Arc::new(EventBridgeAdapter::new(eb_config).await.map_err(|e| { + tracing::error!("AwsAdapter::new_with_both: Failed to create EventBridge adapter: {}", e); + e + })?); + tracing::info!("AwsAdapter::new_with_both: EventBridge adapter created successfully"); + + tracing::info!( + "AwsAdapter::new_with_both: Both adapters initialized successfully with default_type: {:?}", + default_type + ); + + Ok(AwsAdapter::Both { + sqs: sqs_adapter, + eventbridge: eb_adapter, + default_type, + }) + } + + pub async fn publish( + &self, + topic: impl Into, + payload: Value, + ) -> common::Result<()> { + match self { + AwsAdapter::Sqs(adapter) => adapter.publish(topic, payload).await, + AwsAdapter::EventBridge(adapter) => adapter.publish(topic, payload).await, + AwsAdapter::Both { sqs, 
eventbridge: _, default_type: _ } => { + sqs.publish(topic, payload).await + } + } + } + + pub async fn publish_with_type( + &self, + topic: impl Into, + payload: Value, + adapter_type: Option<&str>, + ) -> common::Result<()> { + let topic_str = topic.into(); + match self { + AwsAdapter::Sqs(adapter) => { + tracing::info!("AwsAdapter::publish_with_type: Using SQS adapter for topic: {}", topic_str); + adapter.publish(topic_str, payload).await + } + AwsAdapter::EventBridge(adapter) => { + tracing::info!("AwsAdapter::publish_with_type: Using EventBridge adapter for topic: {}", topic_str); + adapter.publish(topic_str, payload).await + } + AwsAdapter::Both { sqs, eventbridge, default_type } => { + let use_type = adapter_type + .map(|s| s.to_lowercase()) + .unwrap_or_else(|| match default_type { + AwsAdapterType::Sqs => "sqs".to_string(), + AwsAdapterType::EventBridge => "eventbridge".to_string(), + }); + + tracing::info!( + "AwsAdapter::publish_with_type: Both mode - requested: {:?}, using: {}, topic: {}", + adapter_type, + use_type, + topic_str + ); + + match use_type.as_str() { + "sqs" => { + tracing::info!("AwsAdapter::publish_with_type: Routing to SQS for topic: {}", topic_str); + sqs.publish(topic_str, payload).await + } + "eventbridge" => { + tracing::info!("AwsAdapter::publish_with_type: Routing to EventBridge for topic: {}", topic_str); + eventbridge.publish(topic_str, payload).await + } + _ => { + tracing::warn!( + "AwsAdapter::publish_with_type: Unknown adapter type '{}', falling back to default for topic: {}", + use_type, + topic_str + ); + match default_type { + AwsAdapterType::Sqs => sqs.publish(topic_str, payload).await, + AwsAdapterType::EventBridge => eventbridge.publish(topic_str, payload).await, + } + } + } + } + } + } + + pub async fn subscribe_fn(&self, topic: impl Into, handler: F) -> common::Result<()> + where + F: Fn(common::Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + self.subscribe_with_type(topic, 
handler, None).await + } + + pub async fn subscribe_with_type( + &self, + topic: impl Into, + handler: F, + adapter_type: Option<&str>, + ) -> common::Result<()> + where + F: Fn(common::Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let topic_str = topic.into(); + match self { + AwsAdapter::Sqs(adapter) => adapter.subscribe_fn(topic_str, handler).await, + AwsAdapter::EventBridge(adapter) => adapter.subscribe_fn(topic_str, handler).await, + AwsAdapter::Both { sqs, eventbridge, default_type } => { + let use_type = adapter_type + .map(|s| s.to_lowercase()) + .unwrap_or_else(|| match default_type { + AwsAdapterType::Sqs => "sqs".to_string(), + AwsAdapterType::EventBridge => "eventbridge".to_string(), + }); + + tracing::info!( + "AwsAdapter::subscribe_with_type: Both mode - requested: {:?}, using: {}, topic: {}", + adapter_type, + use_type, + topic_str + ); + + match use_type.as_str() { + "sqs" => { + tracing::info!("AwsAdapter::subscribe_with_type: Using SQS for subscription - topic: {}", topic_str); + sqs.subscribe_fn(topic_str, handler).await + } + "eventbridge" => { + tracing::info!("AwsAdapter::subscribe_with_type: Using EventBridge for subscription - topic: {}", topic_str); + eventbridge.subscribe_fn(topic_str, handler).await + } + _ => { + tracing::warn!( + "AwsAdapter::subscribe_with_type: Unknown adapter type '{}', falling back to default for topic: {}", + use_type, + topic_str + ); + match default_type { + AwsAdapterType::Sqs => sqs.subscribe_fn(topic_str, handler).await, + AwsAdapterType::EventBridge => eventbridge.subscribe_fn(topic_str, handler).await, + } + } + } + } + } + } + + pub async fn list_topics(&self) -> Vec { + match self { + AwsAdapter::Sqs(adapter) => adapter.list_topics().await, + AwsAdapter::EventBridge(adapter) => adapter.list_topics().await, + AwsAdapter::Both { sqs, eventbridge, default_type: _ } => { + let mut topics = sqs.list_topics().await; + let mut eb_topics = 
eventbridge.list_topics().await; + topics.append(&mut eb_topics); + topics.sort(); + topics.dedup(); + topics + } + } + } +} + diff --git a/crates/rohas-adapters/adapter-aws/src/sqs.rs b/crates/rohas-adapters/adapter-aws/src/sqs.rs new file mode 100644 index 0000000..f083466 --- /dev/null +++ b/crates/rohas-adapters/adapter-aws/src/sqs.rs @@ -0,0 +1,362 @@ +use crate::common::{AdapterError, Message, MessageHandler, Result}; +use aws_sdk_sqs::{ + types::{MessageAttributeValue, QueueAttributeName}, + Client as SqsClient, +}; +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +#[derive(Debug, Clone)] +pub struct SqsConfig { + pub region: String, + pub queue_prefix: Option, + pub visibility_timeout_seconds: Option, + pub message_retention_seconds: Option, + pub receive_wait_time_seconds: Option, // Long polling wait time +} + +impl Default for SqsConfig { + fn default() -> Self { + Self { + region: "us-east-1".to_string(), + queue_prefix: Some("rohas-".to_string()), + visibility_timeout_seconds: Some(30), + message_retention_seconds: Some(345600), // 4 days + receive_wait_time_seconds: Some(20), // Long polling + } + } +} + +pub struct SqsAdapter { + client: SqsClient, + config: SqsConfig, + queue_urls: Arc>>, // topic -> queue_url +} + +impl SqsAdapter { + pub async fn new(config: SqsConfig) -> Result { + let aws_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(aws_sdk_sqs::config::Region::new(config.region.clone())) + .load() + .await; + + let client = SqsClient::new(&aws_config); + + info!( + "Initialized SQS adapter for region: {}", + config.region + ); + + Ok(Self { + client, + config, + queue_urls: Arc::new(RwLock::new(HashMap::new())), + }) + } + + async fn get_or_create_queue(&self, topic: &str) -> Result { + { + let queue_urls = self.queue_urls.read().await; + if let Some(url) = queue_urls.get(topic) { + return Ok(url.clone()); + 
} + } + + let queue_name = if let Some(prefix) = &self.config.queue_prefix { + format!("{}{}", prefix, topic) + } else { + topic.to_string() + }; + + let queue_name = queue_name + .chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect::(); + + let get_queue_result = self + .client + .get_queue_url() + .queue_name(&queue_name) + .send() + .await; + + let queue_url = match get_queue_result { + Ok(response) => { + if let Some(url) = response.queue_url() { + info!("Found existing queue for topic '{}': {}", topic, url); + url.to_string() + } else { + return Err(AdapterError::QueueNotFound(queue_name)); + } + } + Err(_) => { + debug!("Queue '{}' not found, creating...", queue_name); + + let mut create_request = self.client.create_queue().queue_name(&queue_name); + + let mut attributes = HashMap::new(); + if let Some(visibility) = self.config.visibility_timeout_seconds { + attributes.insert( + QueueAttributeName::VisibilityTimeout, + visibility.to_string(), + ); + } + if let Some(retention) = self.config.message_retention_seconds { + attributes.insert( + QueueAttributeName::MessageRetentionPeriod, + retention.to_string(), + ); + } + if let Some(wait_time) = self.config.receive_wait_time_seconds { + attributes.insert( + QueueAttributeName::ReceiveMessageWaitTimeSeconds, + wait_time.to_string(), + ); + } + + if !attributes.is_empty() { + create_request = create_request.set_attributes(Some(attributes)); + } + + let create_result = create_request.send().await.map_err(|e| { + AdapterError::AwsSqs(format!("Failed to create queue '{}': {}", queue_name, e)) + })?; + + if let Some(url) = create_result.queue_url() { + info!("Created queue for topic '{}': {}", topic, url); + url.to_string() + } else { + return Err(AdapterError::AwsSqs(format!( + "Queue created but no URL returned for '{}'", + queue_name + ))); + } + } + }; + + { + let mut queue_urls = self.queue_urls.write().await; + queue_urls.insert(topic.to_string(), 
queue_url.clone()); + } + + Ok(queue_url) + } + + pub async fn publish( + &self, + topic: impl Into, + payload: serde_json::Value, + ) -> Result<()> { + let topic = topic.into(); + tracing::info!("SqsAdapter::publish: Starting publish for topic: {}", topic); + + let message = Message::new(topic.clone(), payload); + + let message_body = serde_json::to_string(&message) + .map_err(|e| { + tracing::error!("SqsAdapter::publish: Serialization error for topic {}: {}", topic, e); + AdapterError::Serialization(e) + })?; + + tracing::debug!("SqsAdapter::publish: Message serialized, getting/creating queue for topic: {}", topic); + + let queue_url = self.get_or_create_queue(&topic).await.map_err(|e| { + tracing::error!("SqsAdapter::publish: Failed to get/create queue for topic {}: {}", topic, e); + e + })?; + + tracing::info!("SqsAdapter::publish: Queue URL obtained: {} for topic: {}", queue_url, topic); + + let mut attributes = HashMap::new(); + attributes.insert( + "topic".to_string(), + MessageAttributeValue::builder() + .data_type("String") + .string_value(&topic) + .build() + .map_err(|e| AdapterError::AwsSqs(format!("Failed to build attribute: {}", e)))?, + ); + attributes.insert( + "timestamp".to_string(), + MessageAttributeValue::builder() + .data_type("String") + .string_value(&message.timestamp) + .build() + .map_err(|e| AdapterError::AwsSqs(format!("Failed to build attribute: {}", e)))?, + ); + + let send_result = self + .client + .send_message() + .queue_url(&queue_url) + .message_body(&message_body) + .set_message_attributes(Some(attributes)) + .send() + .await; + + match send_result { + Ok(response) => { + if let Some(message_id) = response.message_id() { + info!("Published message to SQS topic: {} (queue: {}, message_id: {})", topic, queue_url, message_id); + } else { + info!("Published message to SQS topic: {} (queue: {})", topic, queue_url); + } + Ok(()) + } + Err(e) => { + error!("Failed to send message to SQS queue '{}' for topic '{}': {}", queue_url, topic, 
e); + let error_msg = format!( + "Failed to send message to queue '{}': {}", + queue_url, e + ); + tracing::error!("SqsAdapter::publish: Error details - {}", error_msg); + Err(AdapterError::AwsSqs(error_msg)) + } + } + } + + pub async fn subscribe(&self, topic: impl Into, handler: Arc) -> Result<()> + where + H: MessageHandler + 'static, + { + let topic = topic.into(); + let queue_url = self.get_or_create_queue(&topic).await?; + + info!("Subscribing to topic: {} (queue: {})", topic, queue_url); + + let client = self.client.clone(); + let handler = handler.clone(); + let topic_clone = topic.clone(); + + tokio::spawn(async move { + info!("SQS subscription polling loop started for topic '{}' (queue: {})", topic_clone, queue_url); + let mut poll_count = 0u64; + loop { + poll_count += 1; + if poll_count % 5 == 0 { + info!("SQS polling loop still active for topic '{}' (poll #{}), queue: {}", topic_clone, poll_count, queue_url); + } else if poll_count <= 3 { + info!("SQS polling loop active for topic '{}' (poll #{}), queue: {}", topic_clone, poll_count, queue_url); + } else { + debug!("Polling SQS queue for topic '{}' (poll #{})...", topic_clone, poll_count); + } + let receive_result = client + .receive_message() + .queue_url(&queue_url) + .max_number_of_messages(10) + .wait_time_seconds(20) + .send() + .await; + + match receive_result { + Ok(response) => { + let messages = response.messages(); + if !messages.is_empty() { + info!("Received {} message(s) from SQS queue for topic '{}'", messages.len(), topic_clone); + for sqs_message in messages { + if let Some(body) = sqs_message.body() { + info!("Raw SQS message body for topic '{}': {}", topic_clone, body); + match serde_json::from_str::(body) { + Ok(message) => { + info!("Successfully parsed SQS message for topic '{}'", topic_clone); + info!("Message topic: {}, payload: {:?}", message.topic, message.payload); + info!("Calling handler for SQS message..."); + if let Err(e) = handler.handle(message).await { + 
error!("Handler error for SQS topic '{}': {}", topic_clone, e); + } else { + info!("Handler completed successfully for SQS topic '{}'", topic_clone); + } + + if let Some(receipt_handle) = sqs_message.receipt_handle() { + if let Err(e) = client + .delete_message() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .send() + .await + { + warn!( + "Failed to delete message from queue '{}': {}", + queue_url, e + ); + } + } + } + Err(e) => { + error!( + "Failed to deserialize SQS message for topic '{}': {}. Body: {}", + topic_clone, e, body + ); + if let Some(receipt_handle) = sqs_message.receipt_handle() { + let _ = client + .delete_message() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .send() + .await; + } + } + } + } + } + } else { + debug!("No messages received from SQS queue for topic '{}' (this is normal, continuing to poll...)", topic_clone); + } + } + Err(e) => { + error!( + "Error receiving messages from SQS queue '{}' for topic '{}': {}. Retrying in 5 seconds...", + queue_url, topic_clone, e + ); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + } + } + }); + + Ok(()) + } + + pub async fn subscribe_fn(&self, topic: impl Into, handler: F) -> Result<()> + where + F: Fn(Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + struct ClosureHandler + where + F: Fn(Message) -> Fut + Send + Sync, + Fut: std::future::Future> + Send, + { + func: F, + } + + #[async_trait] + impl MessageHandler for ClosureHandler + where + F: Fn(Message) -> Fut + Send + Sync, + Fut: std::future::Future> + Send, + { + async fn handle(&self, message: Message) -> Result<()> { + (self.func)(message).await + } + } + + let handler = Arc::new(ClosureHandler { func: handler }); + self.subscribe(topic, handler).await + } + + pub async fn list_topics(&self) -> Vec { + let queue_urls = self.queue_urls.read().await; + queue_urls.keys().cloned().collect() + } +} + diff --git 
a/crates/rohas-adapters/adapter-sqs/src/lib.rs b/crates/rohas-adapters/adapter-sqs/src/lib.rs deleted file mode 100644 index 0f381d9..0000000 --- a/crates/rohas-adapters/adapter-sqs/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -pub struct SqsAdapter; - -impl SqsAdapter { - pub fn new() -> Self { - Self - } -} - -impl Default for SqsAdapter { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/rohas-cli/src/commands/codegen.rs b/crates/rohas-cli/src/commands/codegen.rs index ab67943..c46e3ce 100644 --- a/crates/rohas-cli/src/commands/codegen.rs +++ b/crates/rohas-cli/src/commands/codegen.rs @@ -77,7 +77,7 @@ pub async fn execute( // Generate code generate(&schema, &output_path, language)?; - info!("✓ Code generation completed successfully!"); + info!("Code generation completed successfully!"); info!(" Output directory: {}", output_path.display()); Ok(()) diff --git a/crates/rohas-cli/src/commands/dev.rs b/crates/rohas-cli/src/commands/dev.rs index c6c553c..168fae7 100644 --- a/crates/rohas-cli/src/commands/dev.rs +++ b/crates/rohas-cli/src/commands/dev.rs @@ -108,7 +108,7 @@ async fn find_or_init_workbench() -> Result { match init_workbench().await { Ok(workbench_dir) => { if workbench_dir.exists() && workbench_dir.join("package.json").exists() { - info!("✓ Workbench initialized successfully"); + info!("Workbench initialized successfully"); return Ok(workbench_dir); } } @@ -223,7 +223,7 @@ async fn start_workbench( anyhow::bail!("Failed to build workbench"); } - info!("✓ Workbench built successfully"); + info!("Workbench built successfully"); info!("Starting workbench server on port {}...", workbench_port); diff --git a/crates/rohas-cli/src/commands/init.rs b/crates/rohas-cli/src/commands/init.rs index bc21fe9..816de62 100644 --- a/crates/rohas-cli/src/commands/init.rs +++ b/crates/rohas-cli/src/commands/init.rs @@ -144,7 +144,7 @@ Rohas project initialized with {} handlers. 
fs::write(project_dir.join("README.md"), readme)?; - info!("✓ Project '{}' created successfully!", name); + info!("Project '{}' created successfully!", name); info!(" Run 'cd {}' to enter the project directory", name); info!(" Run 'rohas codegen' to generate code"); info!(" Run 'rohas dev' to start the development server"); diff --git a/crates/rohas-cli/src/commands/validate.rs b/crates/rohas-cli/src/commands/validate.rs index fd4baa5..3bb098a 100644 --- a/crates/rohas-cli/src/commands/validate.rs +++ b/crates/rohas-cli/src/commands/validate.rs @@ -17,7 +17,7 @@ pub async fn execute(schema_path: PathBuf) -> Result<()> { schema.validate()?; - info!("✓ Schema validation passed!"); + info!("Schema validation passed!"); info!(" - {} models", schema.models.len()); info!(" - {} inputs", schema.inputs.len()); info!(" - {} APIs", schema.apis.len()); diff --git a/crates/rohas-dev-server/src/lib.rs b/crates/rohas-dev-server/src/lib.rs index 028f85b..b435585 100644 --- a/crates/rohas-dev-server/src/lib.rs +++ b/crates/rohas-dev-server/src/lib.rs @@ -136,7 +136,7 @@ impl DevServer { if let Err(e) = rohas_engine::tracing_log::register_tracing_log_layer(layer) { warn!("Failed to register tracing log layer: {}", e); } else { - info!("✓ Tracing log layer registered"); + info!("Tracing log layer registered"); } engine.initialize().await?; @@ -144,7 +144,7 @@ impl DevServer { let mut engine_lock = self.engine.write().await; *engine_lock = Some(engine); - info!("✓ Engine loaded and initialized"); + info!("Engine loaded and initialized"); Ok(()) } @@ -286,7 +286,7 @@ impl DevServer { match reload_result { Ok(_) => { - info!("✓ Engine reloaded successfully"); + info!("Engine reloaded successfully"); info!("Starting new HTTP server with updated schema..."); let engine = self.engine.clone(); @@ -306,7 +306,7 @@ impl DevServer { if !new_handle.is_finished() { server_handle = Some(new_handle); - info!("✓ HTTP server restarted with new schema on port {}", port); + info!("HTTP server 
restarted with new schema on port {}", port); } else { match new_handle.await { Ok(_) => { @@ -336,7 +336,7 @@ impl DevServer { if let Err(e) = eng.clear_handler_cache().await { error!("Failed to clear handler cache: {}", e); } else { - info!("✓ Handler cache cleared"); + info!("Handler cache cleared"); } } } else { @@ -344,7 +344,7 @@ impl DevServer { match self.reload_typescript_handler().await { Ok(_) => { - info!("✓ Handler reloaded successfully"); + info!("Handler reloaded successfully"); } Err(e) => { error!("Failed to reload handler: {}", e); @@ -394,7 +394,7 @@ impl DevServer { rohas_codegen::generate(schema, &output_dir, lang)?; - info!("✓ Codegen completed"); + info!("Codegen completed"); Ok(()) } diff --git a/crates/rohas-dev-server/src/ts_compiler.rs b/crates/rohas-dev-server/src/ts_compiler.rs index 2733dbb..00f383c 100644 --- a/crates/rohas-dev-server/src/ts_compiler.rs +++ b/crates/rohas-dev-server/src/ts_compiler.rs @@ -30,7 +30,7 @@ impl TypeScriptCompiler { anyhow::bail!("TypeScript compilation failed"); } - info!("✓ TypeScript compilation completed"); + info!("TypeScript compilation completed"); Ok(()) } @@ -47,7 +47,7 @@ impl TypeScriptCompiler { .stderr(Stdio::inherit()) .spawn()?; - info!("✓ TypeScript watch mode started"); + info!("TypeScript watch mode started"); self.watch_process = Some(child); Ok(()) } @@ -57,7 +57,7 @@ impl TypeScriptCompiler { info!("Stopping TypeScript watch mode..."); child.kill()?; child.wait()?; - info!("✓ TypeScript watch mode stopped"); + info!("TypeScript watch mode stopped"); } Ok(()) } @@ -97,7 +97,7 @@ impl TypeScriptCompiler { anyhow::bail!("npm install failed"); } - info!("✓ npm dependencies installed"); + info!("npm dependencies installed"); Ok(()) } diff --git a/crates/rohas-engine/Cargo.toml b/crates/rohas-engine/Cargo.toml index 070fe89..2f19759 100644 --- a/crates/rohas-engine/Cargo.toml +++ b/crates/rohas-engine/Cargo.toml @@ -11,6 +11,7 @@ rohas-runtime = { workspace = true } rohas-cron = { workspace 
= true } rohas-codegen = { workspace = true } adapter-memory = { workspace = true } +adapter-aws = { workspace = true } rohas-telemetry = { workspace = true } adapter-rocksdb = { workspace = true } diff --git a/crates/rohas-engine/src/adapter.rs b/crates/rohas-engine/src/adapter.rs new file mode 100644 index 0000000..6268050 --- /dev/null +++ b/crates/rohas-engine/src/adapter.rs @@ -0,0 +1,110 @@ +use crate::error::Result; +use serde_json::Value; +use std::sync::Arc; + +/// Enum wrapper for different adapter types +pub enum Adapter { + Memory(Arc), + Aws(Arc), +} + +impl Adapter { + /// Publish a message to a topic + pub async fn publish(&self, topic: impl Into, payload: Value) -> Result<()> { + self.publish_with_type(topic, payload, None).await + } + + /// Publish a message to a topic with optional adapter type override + pub async fn publish_with_type( + &self, + topic: impl Into, + payload: Value, + adapter_type: Option<&str>, + ) -> Result<()> { + let topic_str = topic.into(); + match self { + Adapter::Memory(adapter) => { + tracing::debug!("Publishing to Memory adapter - topic: {}", topic_str); + adapter.publish(topic_str, payload) + .await + .map_err(|e| crate::error::EngineError::Adapter(e.to_string())) + } + Adapter::Aws(adapter) => { + let topic_clone = topic_str.clone(); + if let Some(adapter_type) = adapter_type { + tracing::info!("Publishing to AWS adapter (type: {}) - topic: {}", adapter_type, topic_str); + } else { + tracing::info!("Publishing to AWS adapter - topic: {}", topic_str); + } + adapter.publish_with_type(topic_str, payload, adapter_type) + .await + .map_err(|e| { + tracing::error!("AWS adapter publish failed for topic {}: {}", topic_clone, e); + crate::error::EngineError::Adapter(e.to_string()) + }) + } + } + } + + /// Subscribe to a topic with a closure handler + pub async fn subscribe_fn(&self, topic: impl Into, handler: F) -> Result<()> + where + F: Fn(adapter_memory::Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + 
Send + 'static, + { + self.subscribe_with_type(topic, handler, None).await + } + + /// Subscribe to a topic with a closure handler and optional adapter type + pub async fn subscribe_with_type( + &self, + topic: impl Into, + handler: F, + adapter_type: Option<&str>, + ) -> Result<()> + where + F: Fn(adapter_memory::Message) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + match self { + Adapter::Memory(adapter) => { + adapter.subscribe_fn(topic, move |msg| { + let fut = handler(msg); + async move { + fut.await.map_err(|e| { + adapter_memory::AdapterError::ChannelError(e.to_string()) + }) + } + }) + .await + .map_err(|e| crate::error::EngineError::Adapter(e.to_string())) + } + Adapter::Aws(adapter) => { + // Convert adapter_memory::Message to adapter_aws::Message + adapter.subscribe_with_type(topic, move |aws_msg| { + let fut = handler(adapter_memory::Message { + topic: aws_msg.topic, + payload: aws_msg.payload, + timestamp: aws_msg.timestamp, + metadata: aws_msg.metadata, + }); + async move { + fut.await.map_err(|e| { + adapter_aws::common::AdapterError::AwsSqs(e.to_string()) + }) + } + }, adapter_type) + .await + .map_err(|e| crate::error::EngineError::Adapter(e.to_string())) + } + } + } + + /// Get list of all topics + pub async fn list_topics(&self) -> Vec { + match self { + Adapter::Memory(adapter) => adapter.list_topics().await, + Adapter::Aws(adapter) => adapter.list_topics().await, + } + } +} diff --git a/crates/rohas-engine/src/api.rs b/crates/rohas-engine/src/api.rs index a2fede9..e2737b7 100644 --- a/crates/rohas-engine/src/api.rs +++ b/crates/rohas-engine/src/api.rs @@ -316,7 +316,7 @@ async fn api_handler( &api_name, ) .await; - + if let Err(e) = middleware_result { state .trace_store @@ -324,7 +324,7 @@ async fn api_handler( .await; return Err(ApiError::BadRequest(e)); } - + let (final_payload, final_query_params) = middleware_result.unwrap(); let result = execute_handler( diff --git a/crates/rohas-engine/src/config.rs 
b/crates/rohas-engine/src/config.rs index fd4f289..01fa1d8 100644 --- a/crates/rohas-engine/src/config.rs +++ b/crates/rohas-engine/src/config.rs @@ -106,7 +106,14 @@ pub enum AdapterType { Nats { url: String }, Kafka { brokers: String }, RabbitMQ { url: String }, - Sqs, + Aws { + region: String, + #[serde(rename = "aws_type")] + aws_type: String, // "sqs" or "eventbridge" + queue_prefix: Option, // For SQS + event_bus_name: Option, // For EventBridge + source: Option, // For EventBridge + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -225,6 +232,13 @@ struct TomlAdapter { #[serde(rename = "type")] adapter_type: String, buffer_size: usize, + // AWS-specific fields + region: Option, + #[serde(rename = "aws_type")] + aws_type: Option, // "sqs" or "eventbridge" + queue_prefix: Option, // For SQS + event_bus_name: Option, // For EventBridge + source: Option, // For EventBridge } #[derive(Debug, Deserialize)] @@ -255,6 +269,28 @@ impl TomlConfig { let adapter_type = match self.adapter.adapter_type.to_lowercase().as_str() { "memory" => AdapterType::Memory, + "aws" => { + let aws_type = self.adapter.aws_type.as_deref() + .unwrap_or("sqs") + .to_lowercase(); + if aws_type != "sqs" && aws_type != "eventbridge" { + anyhow::bail!("Unsupported AWS adapter type: {}. 
Must be 'sqs' or 'eventbridge'", aws_type); + } + AdapterType::Aws { + region: self.adapter.region.unwrap_or_else(|| "us-east-1".to_string()), + aws_type, + queue_prefix: self.adapter.queue_prefix, + event_bus_name: self.adapter.event_bus_name, + source: self.adapter.source, + } + } + "sqs" => AdapterType::Aws { + region: self.adapter.region.unwrap_or_else(|| "us-east-1".to_string()), + aws_type: "sqs".to_string(), + queue_prefix: self.adapter.queue_prefix, + event_bus_name: None, + source: None, + }, _ => anyhow::bail!("Unsupported adapter type: {}", self.adapter.adapter_type), }; diff --git a/crates/rohas-engine/src/engine.rs b/crates/rohas-engine/src/engine.rs index 4d75953..777a2c4 100644 --- a/crates/rohas-engine/src/engine.rs +++ b/crates/rohas-engine/src/engine.rs @@ -1,9 +1,9 @@ +use crate::adapter::Adapter; use crate::api; use crate::config::EngineConfig; use crate::error::{EngineError, Result}; use crate::event::EventBus; use crate::router; -use adapter_memory::MemoryAdapter; use rohas_cron::{JobConfig, Scheduler}; use rohas_parser::{Parser, Schema}; use rohas_runtime::{Executor, RuntimeConfig}; @@ -20,7 +20,7 @@ pub struct Engine { executor: Arc, event_bus: Arc, scheduler: Arc, - adapter: Arc, + adapter: Arc, trace_store: Arc, tracing_log_store: Arc, telemetry: Arc, @@ -78,7 +78,42 @@ impl Engine { let trace_store = Arc::new(crate::telemetry::TraceStore::new(telemetry.clone())); let tracing_log_store = Arc::new(crate::tracing_log::TracingLogStore::new(1000)); // Keep last 1000 logs - let adapter = Arc::new(MemoryAdapter::new(config.adapter.buffer_size)); + // Create adapter based on configuration + let adapter = Arc::new(match &config.adapter.adapter_type { + crate::config::AdapterType::Memory => { + info!("Using Memory adapter for event bus"); + Adapter::Memory(Arc::new(adapter_memory::MemoryAdapter::new(config.adapter.buffer_size))) + } + crate::config::AdapterType::Aws { region, aws_type, queue_prefix, event_bus_name, source } => { + 
info!("Initializing AWS adapter - region: {}, default type: {}", region, aws_type); + let adapter_type = match aws_type.as_str() { + "sqs" => adapter_aws::AwsAdapterType::Sqs, + "eventbridge" => adapter_aws::AwsAdapterType::EventBridge, + _ => return Err(EngineError::Initialization(format!("Unsupported AWS adapter type: {}", aws_type))), + }; + let aws_config = adapter_aws::AwsConfig { + region: region.clone(), + queue_prefix: queue_prefix.clone(), + event_bus_name: event_bus_name.clone(), + source: source.clone(), + ..Default::default() + }; + let aws_adapter = adapter_aws::AwsAdapter::new_with_both(adapter_type, aws_config) + .await + .map_err(|e| EngineError::Initialization(format!("Failed to initialize AWS adapter: {}", e)))?; + info!("AWS adapter (both SQS and EventBridge) initialized successfully with default type: {}", aws_type); + Adapter::Aws(Arc::new(aws_adapter)) + } + crate::config::AdapterType::Nats { .. } => { + return Err(EngineError::Initialization("NATS adapter not yet implemented".to_string())); + } + crate::config::AdapterType::Kafka { .. } => { + return Err(EngineError::Initialization("Kafka adapter not yet implemented".to_string())); + } + crate::config::AdapterType::RabbitMQ { .. 
} => { + return Err(EngineError::Initialization("RabbitMQ adapter not yet implemented".to_string())); + } + }); let event_bus = Arc::new(EventBus::new( adapter.clone(), diff --git a/crates/rohas-engine/src/error.rs b/crates/rohas-engine/src/error.rs index cef4156..21590c2 100644 --- a/crates/rohas-engine/src/error.rs +++ b/crates/rohas-engine/src/error.rs @@ -14,7 +14,7 @@ pub enum EngineError { Cron(#[from] rohas_cron::CronError), #[error("Adapter error: {0}")] - Adapter(#[from] adapter_memory::AdapterError), + Adapter(String), #[error("Configuration error: {0}")] Config(String), diff --git a/crates/rohas-engine/src/event.rs b/crates/rohas-engine/src/event.rs index 1a227dd..f745935 100644 --- a/crates/rohas-engine/src/event.rs +++ b/crates/rohas-engine/src/event.rs @@ -1,14 +1,14 @@ +use crate::adapter::Adapter; use crate::error::{EngineError, Result}; use crate::trace::{TraceEntryType, TraceStatus, TriggeredEventInfo}; use crate::telemetry::TraceStore; -use adapter_memory::MemoryAdapter; use rohas_parser::{Event as SchemaEvent, Schema}; use rohas_runtime::Executor; use std::sync::Arc; -use tracing::{debug, info}; +use tracing::{debug, error, info, warn}; pub struct EventBus { - adapter: Arc, + adapter: Arc, executor: Arc, schema: Arc, trace_store: Arc, @@ -16,7 +16,7 @@ pub struct EventBus { impl EventBus { pub fn new( - adapter: Arc, + adapter: Arc, executor: Arc, schema: Arc, trace_store: Arc, @@ -31,9 +31,19 @@ impl EventBus { pub async fn initialize(&self) -> Result<()> { info!("Initializing event bus"); + info!("Total events in schema: {}", self.schema.events.len()); for event in &self.schema.events { - self.subscribe_event(event).await?; + info!("Processing event: {} (adapter_type: {:?})", event.name, event.adapter_type); + match self.subscribe_event(event).await { + Ok(_) => { + info!("Successfully subscribed to event: {}", event.name); + } + Err(e) => { + error!("Failed to subscribe to event '{}': {}", event.name, e); + return Err(e); + } + } } info!( @@ 
-51,11 +61,19 @@ impl EventBus { let executor = self.executor.clone(); let adapter = self.adapter.clone(); let trace_store = self.trace_store.clone(); + let schema = self.schema.clone(); + + let adapter_type = event.adapter_type.as_deref(); - debug!("Subscribing to event: {}", event_name); + if let Some(adapter_type) = adapter_type { + info!("Subscribing to event: {} (via {})", event_name, adapter_type); + } else { + debug!("Subscribing to event: {}", event_name); + } + let adapter_type_clone = adapter_type; self.adapter - .subscribe_fn(event_name.clone(), move |msg| { + .subscribe_with_type(event_name.clone(), move |msg: adapter_memory::Message| { let handlers = handlers.clone(); let triggers = triggers.clone(); let executor = executor.clone(); @@ -63,6 +81,7 @@ impl EventBus { let event_name = event_name.clone(); let event_payload_type = event_payload_type.clone(); let trace_store = trace_store.clone(); + let schema = schema.clone(); async move { let span = tracing::info_span!( @@ -71,7 +90,9 @@ impl EventBus { ); let _enter = span.enter(); - info!("Received event: {}", event_name); + info!("=== Received event: {} ===", event_name); + info!("Event payload: {:?}", msg.payload); + info!("Event handlers to execute: {:?}", handlers); let mut metadata = std::collections::HashMap::new(); metadata.insert("event".to_string(), event_name.clone()); @@ -90,7 +111,7 @@ impl EventBus { ); let _enter = handler_span.enter(); - debug!("Executing handler: {}", handler_name); + info!("Executing handler: {} for event: {}", handler_name, event_name); let mut handler_context = rohas_runtime::HandlerContext::new(handler_name, msg.payload.clone()); @@ -155,14 +176,25 @@ impl EventBus { let mut triggered_events: Vec = Vec::new(); for trigger in &triggers { - debug!("Triggering downstream event: {}", trigger); + info!("Triggering downstream event: {}", trigger); let trigger_start = std::time::Instant::now(); - let publish_result = adapter.publish(trigger, msg.payload.clone()).await; + 
let trigger_event = schema.events.iter().find(|e| e.name == *trigger); + let adapter_type = trigger_event.and_then(|e| e.adapter_type.as_deref()); + let publish_result = adapter.publish_with_type(trigger, msg.payload.clone(), adapter_type).await; let trigger_duration = trigger_start.elapsed().as_millis() as u64; let trigger_timestamp = chrono::Utc::now().to_rfc3339(); - if let Err(e) = publish_result { - tracing::error!("Failed to trigger event {}: {}", trigger, e); + match publish_result { + Ok(_) => { + if let Some(adapter_type) = adapter_type { + info!("Successfully triggered event: {} (via {})", trigger, adapter_type); + } else { + info!("Successfully triggered event: {}", trigger); + } + } + Err(e) => { + tracing::error!("Failed to trigger event {}: {}", trigger, e); + } } triggered_events.push(TriggeredEventInfo { @@ -199,7 +231,7 @@ impl EventBus { Ok(()) } - }) + }, adapter_type_clone) .await?; Ok(()) @@ -211,13 +243,24 @@ impl EventBus { payload: serde_json::Value, ) -> Result<()> { let event_name = event_name.into(); - debug!("Emitting event: {}", event_name); + info!("Emitting event: {}", event_name); - self.adapter - .publish(event_name.clone(), payload) - .await - .map_err(|e| EngineError::EventDispatch(format!("Failed to emit event: {}", e)))?; + let event = self.schema.events.iter().find(|e| e.name == event_name); + let adapter_type = event.and_then(|e| e.adapter_type.as_deref()); - Ok(()) + match self.adapter.publish_with_type(event_name.clone(), payload, adapter_type).await { + Ok(_) => { + if let Some(adapter_type) = adapter_type { + info!("Successfully emitted event: {} (via {})", event_name, adapter_type); + } else { + info!("Successfully emitted event: {}", event_name); + } + Ok(()) + } + Err(e) => { + tracing::error!("Failed to emit event {}: {}", event_name, e); + Err(EngineError::EventDispatch(format!("Failed to emit event: {}", e))) + } + } } } diff --git a/crates/rohas-engine/src/lib.rs b/crates/rohas-engine/src/lib.rs index 
7fd6bf2..21e96ed 100644 --- a/crates/rohas-engine/src/lib.rs +++ b/crates/rohas-engine/src/lib.rs @@ -1,3 +1,4 @@ +pub mod adapter; pub mod api; pub mod config; pub mod engine; diff --git a/crates/rohas-parser/src/ast.rs b/crates/rohas-parser/src/ast.rs index 94c15f4..6ead403 100644 --- a/crates/rohas-parser/src/ast.rs +++ b/crates/rohas-parser/src/ast.rs @@ -195,6 +195,8 @@ pub struct Event { pub payload: String, pub handlers: Vec, pub triggers: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub adapter_type: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/crates/rohas-parser/src/parser.rs b/crates/rohas-parser/src/parser.rs index cf14bc2..db8e936 100644 --- a/crates/rohas-parser/src/parser.rs +++ b/crates/rohas-parser/src/parser.rs @@ -230,18 +230,34 @@ impl Parser { let mut payload = String::new(); let mut handlers = Vec::new(); let mut triggers = Vec::new(); + let mut adapter_type = None; for prop in inner { if prop.as_rule() == Rule::event_property { + let prop_text = prop.as_str(); let mut prop_inner = prop.into_inner(); if let Some(value) = prop_inner.next() { match value.as_rule() { - Rule::ident => payload = value.as_str().to_string(), + Rule::ident => { + if prop_text.starts_with("payload:") { + payload = value.as_str().to_string(); + } else if prop_text.starts_with("type:") { + let type_str = value.as_str().to_lowercase(); + if type_str == "sqs" || type_str == "eventbridge" { + adapter_type = Some(type_str); + } else { + return Err(ParseError::InvalidEvent(format!( + "Invalid adapter type '{}' for event '{}'. 
Must be 'sqs' or 'eventbridge'", + value.as_str(), name + ))); + } + } + } Rule::handler_list | Rule::trigger_list => { let items = Self::parse_string_list(value)?; - if handlers.is_empty() { + if prop_text.starts_with("handler:") { handlers = items; - } else { + } else if prop_text.starts_with("triggers:") { triggers = items; } } @@ -256,6 +272,7 @@ impl Parser { payload, handlers, triggers, + adapter_type, }) } diff --git a/crates/rohas-parser/src/rohas.pest b/crates/rohas-parser/src/rohas.pest index 3ddd164..55be8a4 100644 --- a/crates/rohas-parser/src/rohas.pest +++ b/crates/rohas-parser/src/rohas.pest @@ -48,6 +48,7 @@ event_property = { ("payload:" ~ ident) | ("handler:" ~ handler_list) | ("triggers:" ~ trigger_list) + | ("type:" ~ ident) // AWS adapter type: "sqs" or "eventbridge" } handler_list = { "[" ~ ident ~ ("," ~ ident)* ~ "]" } diff --git a/crates/rohas-runtime/src/node_runtime.rs b/crates/rohas-runtime/src/node_runtime.rs index e2bf426..77403e2 100644 --- a/crates/rohas-runtime/src/node_runtime.rs +++ b/crates/rohas-runtime/src/node_runtime.rs @@ -238,6 +238,9 @@ impl NodeRuntime { let handler_name_escaped = handler_name .replace('\\', "\\\\") .replace('\'', "\\'"); + let handle_name_escaped = format!("handle_{}", handler_name) + .replace('\\', "\\\\") + .replace('\'', "\\'"); format!( r#" @@ -341,23 +344,38 @@ impl NodeRuntime { }} else if (module.exports && typeof module.exports === 'object') {{ // Try CommonJS exports (exports.handler or exports.handleXxx) const exportKeys = Object.keys(module.exports); - - // Look for any exported function (handleXxx, handler, default) - for (const key of exportKeys) {{ - if (typeof module.exports[key] === 'function') {{ - handlerFn = module.exports[key]; - break; + + // For event handlers, try "handle_" first, then any exported function + const handleName = '{}'; + if (module.exports[handleName] && typeof module.exports[handleName] === 'function') {{ + handlerFn = module.exports[handleName]; + }} else {{ + // 
Look for any exported function (handleXxx, handler, default) + for (const key of exportKeys) {{ + if (typeof module.exports[key] === 'function') {{ + handlerFn = module.exports[key]; + break; + }} }} }} }} - // Fallback to global handler function - if (!handlerFn && typeof handler !== 'undefined') {{ - handlerFn = handler; + // Fallback to global handler function (try handle_ first, then handler) + if (!handlerFn) {{ + const handleName = '{}'; + if (typeof window !== 'undefined' && typeof window[handleName] === 'function') {{ + handlerFn = window[handleName]; + }} else if (typeof global !== 'undefined' && typeof global[handleName] === 'function') {{ + handlerFn = global[handleName]; + }} else if (typeof {} !== 'undefined') {{ + handlerFn = {}; + }} else if (typeof handler !== 'undefined') {{ + handlerFn = handler; + }} }} if (!handlerFn) {{ - throw new Error('Handler not found: No exported function or global handler'); + throw new Error('Handler not found: No exported function or global handler. 
Tried: {}, handler, and module.exports'); }} // Execute handler - pass state if handler accepts 2 parameters @@ -390,7 +408,7 @@ impl NodeRuntime { }} }})() "#, - handler_code, context_escaped, handler_name_escaped + handler_code, context_escaped, handler_name_escaped, handle_name_escaped, handle_name_escaped, handle_name_escaped, handle_name_escaped, handle_name_escaped ) } diff --git a/crates/rohas-runtime/src/python_runtime.rs b/crates/rohas-runtime/src/python_runtime.rs index fa36ee0..737a6ba 100644 --- a/crates/rohas-runtime/src/python_runtime.rs +++ b/crates/rohas-runtime/src/python_runtime.rs @@ -173,7 +173,19 @@ impl PythonRuntime { .unwrap_or(false); let function_name = if is_event_handler || is_websocket_handler { - handler_name.to_string() + let direct_name = handler_name.to_string(); + let handle_name = format!("handle_{}", handler_name); + + match module.hasattr(handle_name.as_str()) { + Ok(true) => { + debug!("Using function name '{}' for handler '{}'", handle_name, handler_name); + handle_name + } + _ => { + debug!("Using function name '{}' for handler '{}' (handle_ variant not found)", direct_name, handler_name); + direct_name + } + } } else if is_middleware { format!("{}_middleware", templates::to_snake_case(handler_name)) } else { diff --git a/examples/hello-world/config/rohas.toml b/examples/hello-world/config/rohas.toml index 77daa40..3125d92 100644 --- a/examples/hello-world/config/rohas.toml +++ b/examples/hello-world/config/rohas.toml @@ -9,8 +9,15 @@ port = 4400 enable_cors = true [adapter] -type = "memory" +type = "aws" buffer_size = 1000 +region = "ap-southeast-1" +aws_type = "sqs" # or "eventbridge" +queue_prefix = "rohas-hello-world" +# For EventBridge: +# event_bus_name = "my-event-bus" # Optional, defaults to "default" +# source = "rohas" # Optional, defaults to "rohas" + [telemetry] # Telemetry adapter type: rocksdb (default), prometheus, influxdb, timescaledb diff --git a/examples/hello-world/pyproject.toml 
b/examples/hello-world/pyproject.toml index 273b3f2..54f2878 100644 --- a/examples/hello-world/pyproject.toml +++ b/examples/hello-world/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "rohas-app" +name = "hello-world" version = "0.1.0" description = "Rohas event-driven application" requires-python = ">=3.9" diff --git a/examples/hello-world/schema/api/test_ws.ro b/examples/hello-world/schema/api/test_ws.ro index c7e26c4..e2fbc0e 100644 --- a/examples/hello-world/schema/api/test_ws.ro +++ b/examples/hello-world/schema/api/test_ws.ro @@ -6,5 +6,5 @@ ws HelloWorld { onDisconnect: [on_disconnect_handler] triggers: [UserCreated] broadcast: true - middlewares: [auth, rate_limit, logging] + middlewares: [auth, rate_limit, request_logging] } diff --git a/examples/hello-world/schema/api/user_api.ro b/examples/hello-world/schema/api/user_api.ro index 40db2a5..57c40e3 100644 --- a/examples/hello-world/schema/api/user_api.ro +++ b/examples/hello-world/schema/api/user_api.ro @@ -15,8 +15,8 @@ api Test { method: GET path: "/test" response: String - triggers: [UserCreated] - middlewares: [auth, rate_limit, logging] + triggers: [BottleneckDetected] + middlewares: [auth, rate_limit, request_logging] } diff --git a/examples/hello-world/schema/events/timeline_events.ro b/examples/hello-world/schema/events/timeline_events.ro index a684a4d..02260c9 100644 --- a/examples/hello-world/schema/events/timeline_events.ro +++ b/examples/hello-world/schema/events/timeline_events.ro @@ -1,11 +1,13 @@ event FastCompleted { payload: Json handler: [fast_completed] + type: sqs } event SlowCompleted { payload: Json handler: [slow_completed] + type: sqs } event VerySlowCompleted { @@ -16,6 +18,7 @@ event VerySlowCompleted { event BottleneckDetected { payload: Json handler: [bottleneck_detected] + type: sqs } event MajorBottleneckDetected { diff --git a/examples/hello-world/src/handlers/api/test.py b/examples/hello-world/src/handlers/api/test.py index d1b29de..decace8 100644 --- 
a/examples/hello-world/src/handlers/api/test.py +++ b/examples/hello-world/src/handlers/api/test.py @@ -32,4 +32,4 @@ async def handle_test(req: TestRequest, state: State) -> TestResponse: state.logger.debug('Hello, world!') state.logger.trace('Hello, world!') - return TestResponse(data="Hello, world!") + return TestResponse(data="Hello, world!") diff --git a/examples/hello-world/src/handlers/events/bottleneck_detected.py b/examples/hello-world/src/handlers/events/bottleneck_detected.py index 3ca1b91..49c05a4 100644 --- a/examples/hello-world/src/handlers/events/bottleneck_detected.py +++ b/examples/hello-world/src/handlers/events/bottleneck_detected.py @@ -5,6 +5,7 @@ async def handle_bottleneck_detected(event: BottleneckDetected, state: State) -> None: """Event handler for bottleneck detection - logs the bottleneck""" await asyncio.sleep(0.1) # 100ms + print(f'Bottleneck detected: {event.payload}') state.logger.warning(f'Bottleneck detected: {event.payload}') state.trigger_event('BottleneckLogged', {'operation': event.payload.get('operation', 'unknown')}) diff --git a/examples/hello-world/src/middlewares/logging.py b/examples/hello-world/src/middlewares/request_logging.py similarity index 84% rename from examples/hello-world/src/middlewares/logging.py rename to examples/hello-world/src/middlewares/request_logging.py index 3137b65..8158fe8 100644 --- a/examples/hello-world/src/middlewares/logging.py +++ b/examples/hello-world/src/middlewares/request_logging.py @@ -1,9 +1,9 @@ from typing import Dict, Any, Optional from generated.state import State -async def logging_middleware(context: Dict[str, Any], state: State) -> Optional[Dict[str, Any]]: +async def request_logging_middleware(context: Dict[str, Any], state: State) -> Optional[Dict[str, Any]]: """ - Middleware function for logging. + Middleware function for request_logging. 
Args: context: Request context containing: @@ -19,22 +19,22 @@ async def logging_middleware(context: Dict[str, Any], state: State) -> Optional[ Optional[Dict[str, Any]]: Modified context with 'payload' and/or 'query_params' keys, or None to pass through unchanged. Return a dict with 'error' key to reject the request. - To reject the request, raise an exception + To reject the request, raise an exception """ # TODO: Implement middleware logic # Example: Validate authentication # Example: Rate limiting # Example: Logging # Example: Modify payload/query_params - # + # # To modify the request: # return { # 'payload': modified_payload, # 'query_params': modified_query_params # } - # + # # To reject the request: # raise Exception('Access denied') - + # Pass through unchanged return None