From 441a2bff7f48ce0cb468307d5bbf8a4a1a585d89 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:53:24 -0400 Subject: [PATCH 01/10] feat(subscriptions): sub_blocks, watch_contract_event --- Cargo.toml | 1 + examples/subscriptions/Cargo.toml | 32 +++++++++ .../examples/subscribe_blocks.rs | 35 +++++++++ .../examples/watch_contract_event.rs | 72 +++++++++++++++++++ 4 files changed, 140 insertions(+) create mode 100644 examples/subscriptions/Cargo.toml create mode 100644 examples/subscriptions/examples/subscribe_blocks.rs create mode 100644 examples/subscriptions/examples/watch_contract_event.rs diff --git a/Cargo.toml b/Cargo.toml index 72c79195..d027b9b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ license = "MIT OR Apache-2.0" homepage = "https://github.com/alloy-rs/examples" repository = "https://github.com/alloy-rs/examples" publish = false +exclude = ["examples/"] [workspace.dependencies] alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "d5967ab", default-features = false } diff --git a/examples/subscriptions/Cargo.toml b/examples/subscriptions/Cargo.toml new file mode 100644 index 00000000..2b686cc8 --- /dev/null +++ b/examples/subscriptions/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "examples-subscriptions" +version = "0.0.0" +publish = false + +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dev-dependencies] +alloy-primitives.workspace = true +alloy-sol-types = { workspace = true, features = ["json"] } +alloy-provider = { workspace = true, features = ["pubsub", "ws"] } +alloy-contract.workspace = true +alloy-pubsub.workspace = true +alloy-rpc-types.workspace = true +alloy-rpc-client.workspace = true +alloy-rpc-trace-types.workspace = true +alloy-node-bindings.workspace = true +alloy-transport.workspace = true +alloy-transport-http.workspace = true +alloy-network.workspace = true +tokio = { version = "1.36.0", features = ["rt-multi-thread", "macros"] } +eyre = "0.6.12" +reqwest = "0.11.26" +futures-util = "0.3" diff --git a/examples/subscriptions/examples/subscribe_blocks.rs b/examples/subscriptions/examples/subscribe_blocks.rs new file mode 100644 index 00000000..c9e3bb41 --- /dev/null +++ b/examples/subscriptions/examples/subscribe_blocks.rs @@ -0,0 +1,35 @@ +use alloy_network::Ethereum; +use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy_provider::{Provider, RootProvider}; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::RpcClient; +use eyre::Result; +use futures_util::{stream, StreamExt}; +#[tokio::main] +async fn main() -> Result<()> { + let (provider, _anvil) = init().await; + + let sub = provider.subscribe_blocks().await?; + let mut stream = sub.into_stream().take(2); + while let Some(block) = stream.next().await { + println!("Subscribed Block: {:?}", block.header.number); + } + + // Watch Blocks + + let poller = provider.watch_blocks().await?; + let mut stream = poller.into_stream().flat_map(stream::iter).take(2); + while let Some(block_hash) = stream.next().await { + println!("Watched Block: {:?}", block_hash); + } + Ok(()) +} + +async fn init() -> (RootProvider, AnvilInstance) { + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let client = RpcClient::connect_pubsub(ws).await.unwrap(); + let provider = RootProvider::::new(client); + + (provider, anvil) +} diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs new file mode 100644 index 00000000..576b5532 --- /dev/null +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -0,0 +1,72 @@ +use alloy_network::Ethereum; +use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy_provider::{Provider, RootProvider}; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::RpcClient; +use alloy_rpc_types::Filter; +use alloy_sol_types::{sol, SolEvent}; +use eyre::Result; +use futures_util::{stream, StreamExt}; + +sol!( + #[sol(rpc, bytecode = "0x60806040526000805534801561001457600080fd5b50610260806100246000396000f3fe608060405234801561001057600080fd5b50600436106100415760003560e01c80632baeceb71461004657806361bc221a14610050578063d09de08a1461006e575b600080fd5b61004e610078565b005b6100586100d9565b6040516100659190610159565b60405180910390f35b6100766100df565b005b600160008082825461008a91906101a3565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167fdc69c403b972fc566a14058b3b18e1513da476de6ac475716e489fae0cbe4a2660405160405180910390a3565b60005481565b60016000808282546100f191906101e6565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167ff6d1d8d205b41f9fb9549900a8dba5d669d68117a3a2b88c1ebc61163e8117ba60405160405180910390a3565b6000819050919050565b61015381610140565b82525050565b600060208201905061016e600083018461014a565b92915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101ae82610140565b91506101b983610140565b92508282039050818112600084121682821360008512151617156101e0576101df610174565b5b92915050565b60006101f182610140565b91506101fc83610140565b92508282019050828112156000831216838212600084121516171561022457610223610174565b5b9291505056fea26469706673582212208d0d34c26bfd2938ff07dd54c3fcc2bc4509e4ae654edff58101e5e7ab8cf18164736f6c63430008180033")] + contract EventExample { + int256 public counter = 0; + + event Increment(address indexed by, int256 indexed value); + // event Decrement(address indexed by, int256 indexed value); // Uncommenting this line would cause the duplicate definitions for EventExampleInstance_filter + + function increment() public { + counter += 1; + emit Increment(msg.sender, counter); + } + + // function decrement() public { + // counter -= 1; + // emit Decrement(msg.sender, counter); + // } + } +); + +#[tokio::main] +async fn main() -> Result<()> { + let (provider, _anvil) = init().await; + + let deployed_contract = EventExample::deploy(provider.clone()).await?; + + println!("Deployed contract at: {:?}", deployed_contract.address()); + + let filter = Filter::new() + .address(deployed_contract.address().to_owned()) + .event_signature(EventExample::Increment::SIGNATURE_HASH); + + let poller = provider.watch_logs(&filter).await?; + + println!("Watching for events..."); + println!("every {:?}", poller.poll_interval()); // Default 250ms + + let mut stream = poller.into_stream().flat_map(stream::iter).take(5); + + // Build a call to increment the counter + let increment_call = deployed_contract.increment(); + + // Send the increment call 5 times + for _ in 0..5 { + let _ = increment_call.send().await?; + } + + while let Some(log) = stream.next().await { + println!("Received log: {:?}", log); + } + + Ok(()) +} + +async fn init() -> (RootProvider, AnvilInstance) { + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let client = RpcClient::connect_pubsub(ws).await.unwrap(); + let provider = RootProvider::::new(client); + + (provider, anvil) +} From 381b7ac17c7de3b06d40e828b10058c7b86b460b Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:54:51 -0400 Subject: [PATCH 02/10] update: watch_contract_event --- .../examples/watch_contract_event.rs | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index 576b5532..feaa8fac 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -14,17 +14,17 @@ sol!( int256 public counter = 0; event Increment(address indexed by, int256 indexed value); - // event Decrement(address indexed by, int256 indexed value); // Uncommenting this line would cause the duplicate definitions for EventExampleInstance_filter + event Decrement(address indexed by, int256 indexed value); function increment() public { counter += 1; emit Increment(msg.sender, counter); } - // function decrement() public { - // counter -= 1; - // emit Decrement(msg.sender, counter); - // } + function decrement() public { + counter -= 1; + emit Decrement(msg.sender, counter); + } } ); @@ -36,27 +36,43 @@ async fn main() -> Result<()> { println!("Deployed contract at: {:?}", deployed_contract.address()); - let filter = Filter::new() + let increment_filter = Filter::new() .address(deployed_contract.address().to_owned()) .event_signature(EventExample::Increment::SIGNATURE_HASH); - let poller = provider.watch_logs(&filter).await?; + let increment_poller = provider.watch_logs(&increment_filter).await?; + + let decrement_filter = Filter::new() + .address(deployed_contract.address().to_owned()) + .event_signature(EventExample::Decrement::SIGNATURE_HASH); + let decrement_poller = provider.watch_logs(&decrement_filter).await?; println!("Watching for events..."); - println!("every {:?}", poller.poll_interval()); // Default 250ms + println!("every {:?}", increment_poller.poll_interval()); // Default 250ms for local connection else 7s - let mut stream = poller.into_stream().flat_map(stream::iter).take(5); + let mut increment_stream = increment_poller.into_stream().flat_map(stream::iter).take(2); + let mut decrement_stream = decrement_poller.into_stream().flat_map(stream::iter).take(2); // Build a call to increment the counter let increment_call = deployed_contract.increment(); - - // Send the increment call 5 times - for _ in 0..5 { + // Send the increment call 2 times + for _ in 0..2 { let _ = increment_call.send().await?; } - while let Some(log) = stream.next().await { - println!("Received log: {:?}", log); + // Build a call to decrement the counter + let decrement_call = deployed_contract.decrement(); + // Send the decrement call 2 times + for _ in 0..2 { + let _ = decrement_call.send().await?; + } + + while let Some(log) = increment_stream.next().await { + println!("Received Increment: {:?}", log); + } + + while let Some(log) = decrement_stream.next().await { + println!("Received Decrement: {:?}", log); } Ok(()) From b218d4a8086e9022e464bbf57fa03dbb3fd1d1f9 Mon Sep 17 00:00:00 2001 From: zerosnacks Date: Fri, 15 Mar 2024 10:46:04 +0000 Subject: [PATCH 03/10] clean up examples --- README.md | 7 ++--- examples/subscriptions/Cargo.toml | 30 ++++++++----------- .../{subscribe_blocks.rs => watch_blocks.rs} | 8 +++-- ...{watch_contract_event.rs => watch_logs.rs} | 6 ++++ 4 files changed, 27 insertions(+), 24 deletions(-) rename examples/subscriptions/examples/{subscribe_blocks.rs => watch_blocks.rs} (94%) rename examples/subscriptions/examples/{watch_contract_event.rs => watch_logs.rs} (98%) diff --git a/README.md b/README.md index 3d3de244..d6f1a174 100644 --- a/README.md +++ b/README.md @@ -63,10 +63,9 @@ cargo run --example mnemonic_signer - [ ] Paginated logs - [ ] UniswapV2 pair - [ ] Transactions -- [ ] Subscriptions - - [ ] Watch blocks - - [ ] Subscribe events by type - - [ ] Subscribe logs +- [x] Subscriptions + - [x] [Watch blocks](./examples/subscriptions/examples/watch_blocks.rs) + - [x] [Watch logs](./examples/subscriptions/examples/watch_logs.rs) - [ ] Transactions - [ ] Call override - [ ] Create raw transaction diff --git a/examples/subscriptions/Cargo.toml b/examples/subscriptions/Cargo.toml index 2b686cc8..6505044a 100644 --- a/examples/subscriptions/Cargo.toml +++ b/examples/subscriptions/Cargo.toml @@ -1,32 +1,26 @@ [package] name = "examples-subscriptions" -version = "0.0.0" -publish = false -authors.workspace = true +publish.workspace = true +version.workspace = true edition.workspace = true rust-version.workspace = true +authors.workspace = true license.workspace = true homepage.workspace = true repository.workspace = true -exclude.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dev-dependencies] -alloy-primitives.workspace = true -alloy-sol-types = { workspace = true, features = ["json"] } -alloy-provider = { workspace = true, features = ["pubsub", "ws"] } alloy-contract.workspace = true +alloy-network.workspace = true +alloy-node-bindings.workspace = true +alloy-provider = { workspace = true, features = ["pubsub", "ws"] } alloy-pubsub.workspace = true -alloy-rpc-types.workspace = true +alloy-sol-types = { workspace = true } alloy-rpc-client.workspace = true -alloy-rpc-trace-types.workspace = true -alloy-node-bindings.workspace = true -alloy-transport.workspace = true -alloy-transport-http.workspace = true -alloy-network.workspace = true -tokio = { version = "1.36.0", features = ["rt-multi-thread", "macros"] } -eyre = "0.6.12" -reqwest = "0.11.26" +alloy-rpc-types.workspace = true + +eyre.workspace = true futures-util = "0.3" +reqwest.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/examples/subscriptions/examples/subscribe_blocks.rs b/examples/subscriptions/examples/watch_blocks.rs similarity index 94% rename from examples/subscriptions/examples/subscribe_blocks.rs rename to examples/subscriptions/examples/watch_blocks.rs index c9e3bb41..95216ee3 100644 --- a/examples/subscriptions/examples/subscribe_blocks.rs +++ b/examples/subscriptions/examples/watch_blocks.rs @@ -1,3 +1,5 @@ +//! Example of subscribing to blocks and watching blocks. + use alloy_network::Ethereum; use alloy_node_bindings::{Anvil, AnvilInstance}; use alloy_provider::{Provider, RootProvider}; @@ -5,23 +7,25 @@ use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; use eyre::Result; use futures_util::{stream, StreamExt}; + #[tokio::main] async fn main() -> Result<()> { let (provider, _anvil) = init().await; let sub = provider.subscribe_blocks().await?; let mut stream = sub.into_stream().take(2); + while let Some(block) = stream.next().await { println!("Subscribed Block: {:?}", block.header.number); } - // Watch Blocks - let poller = provider.watch_blocks().await?; let mut stream = poller.into_stream().flat_map(stream::iter).take(2); + while let Some(block_hash) = stream.next().await { println!("Watched Block: {:?}", block_hash); } + Ok(()) } diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_logs.rs similarity index 98% rename from examples/subscriptions/examples/watch_contract_event.rs rename to examples/subscriptions/examples/watch_logs.rs index feaa8fac..623f15d9 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_logs.rs @@ -1,3 +1,5 @@ +//! Example of subscribing to contract events and watching contract events. + use alloy_network::Ethereum; use alloy_node_bindings::{Anvil, AnvilInstance}; use alloy_provider::{Provider, RootProvider}; @@ -47,14 +49,17 @@ async fn main() -> Result<()> { .event_signature(EventExample::Decrement::SIGNATURE_HASH); let decrement_poller = provider.watch_logs(&decrement_filter).await?; + println!("Watching for events..."); println!("every {:?}", increment_poller.poll_interval()); // Default 250ms for local connection else 7s let mut increment_stream = increment_poller.into_stream().flat_map(stream::iter).take(2); let mut decrement_stream = decrement_poller.into_stream().flat_map(stream::iter).take(2); + // Build a call to increment the counter let increment_call = deployed_contract.increment(); + // Send the increment call 2 times for _ in 0..2 { let _ = increment_call.send().await?; @@ -62,6 +67,7 @@ async fn main() -> Result<()> { // Build a call to decrement the counter let decrement_call = deployed_contract.decrement(); + // Send the decrement call 2 times for _ in 0..2 { let _ = decrement_call.send().await?; From 27b5c1f6fd45699e8956736bf2ccdcca1cf097cf Mon Sep 17 00:00:00 2001 From: zerosnacks Date: Fri, 15 Mar 2024 12:47:13 +0000 Subject: [PATCH 04/10] rename --- .../examples/{watch_blocks.rs => subscribe_blocks.rs} | 0 .../examples/{watch_logs.rs => watch_contract_event.rs} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename examples/subscriptions/examples/{watch_blocks.rs => subscribe_blocks.rs} (100%) rename examples/subscriptions/examples/{watch_logs.rs => watch_contract_event.rs} (100%) diff --git a/examples/subscriptions/examples/watch_blocks.rs b/examples/subscriptions/examples/subscribe_blocks.rs similarity index 100% rename from examples/subscriptions/examples/watch_blocks.rs rename to examples/subscriptions/examples/subscribe_blocks.rs diff --git a/examples/subscriptions/examples/watch_logs.rs b/examples/subscriptions/examples/watch_contract_event.rs similarity index 100% rename from examples/subscriptions/examples/watch_logs.rs rename to examples/subscriptions/examples/watch_contract_event.rs From 2f4690976d4469f725fa1119e527a70c98ca41b7 Mon Sep 17 00:00:00 2001 From: zerosnacks Date: Fri, 15 Mar 2024 12:48:44 +0000 Subject: [PATCH 05/10] clean up --- README.md | 4 ++-- examples/subscriptions/examples/watch_contract_event.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d6f1a174..8b326121 100644 --- a/README.md +++ b/README.md @@ -64,8 +64,8 @@ cargo run --example mnemonic_signer - [ ] UniswapV2 pair - [ ] Transactions - [x] Subscriptions - - [x] [Watch blocks](./examples/subscriptions/examples/watch_blocks.rs) - - [x] [Watch logs](./examples/subscriptions/examples/watch_logs.rs) + - [x] [Subscribe and watch blocks](./examples/subscriptions/examples/subscribe_blocks.rs) + - [x] [Subscribe contract events and watch logs](./examples/subscriptions/examples/watch_contract_event.rs) - [ ] Transactions - [ ] Call override - [ ] Create raw transaction diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index 623f15d9..ccf00e2f 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -1,4 +1,4 @@ -//! Example of subscribing to contract events and watching contract events. +//! Example of subscribing to contract events and watching logs. use alloy_network::Ethereum; use alloy_node_bindings::{Anvil, AnvilInstance}; From 886ffea9fd869f262ce17faeb33d43e0c62a5fb5 Mon Sep 17 00:00:00 2001 From: zerosnacks Date: Fri, 15 Mar 2024 12:59:34 +0000 Subject: [PATCH 06/10] reorder deps to be alphabetic --- examples/subscriptions/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/subscriptions/Cargo.toml b/examples/subscriptions/Cargo.toml index 6505044a..1d9b6eab 100644 --- a/examples/subscriptions/Cargo.toml +++ b/examples/subscriptions/Cargo.toml @@ -16,9 +16,9 @@ alloy-network.workspace = true alloy-node-bindings.workspace = true alloy-provider = { workspace = true, features = ["pubsub", "ws"] } alloy-pubsub.workspace = true -alloy-sol-types = { workspace = true } alloy-rpc-client.workspace = true alloy-rpc-types.workspace = true +alloy-sol-types = { workspace = true } eyre.workspace = true futures-util = "0.3" From c6fd9bb8717bb399f4cc3457b4950a5444cbaf6b Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Sun, 17 Mar 2024 21:39:08 -0400 Subject: [PATCH 07/10] use event_filter from ConstractInstance --- .../examples/watch_contract_event.rs | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index ccf00e2f..4ea22794 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -5,8 +5,7 @@ use alloy_node_bindings::{Anvil, AnvilInstance}; use alloy_provider::{Provider, RootProvider}; use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; -use alloy_rpc_types::Filter; -use alloy_sol_types::{sol, SolEvent}; +use alloy_sol_types::sol; use eyre::Result; use futures_util::{stream, StreamExt}; @@ -38,15 +37,11 @@ async fn main() -> Result<()> { println!("Deployed contract at: {:?}", deployed_contract.address()); - let increment_filter = Filter::new() - .address(deployed_contract.address().to_owned()) - .event_signature(EventExample::Increment::SIGNATURE_HASH); + let increment_filter = deployed_contract.Increment_filter().filter; let increment_poller = provider.watch_logs(&increment_filter).await?; - let decrement_filter = Filter::new() - .address(deployed_contract.address().to_owned()) - .event_signature(EventExample::Decrement::SIGNATURE_HASH); + let decrement_filter = deployed_contract.Decrement_filter().filter; let decrement_poller = provider.watch_logs(&decrement_filter).await?; @@ -59,17 +54,11 @@ async fn main() -> Result<()> { // Build a call to increment the counter let increment_call = deployed_contract.increment(); - - // Send the increment call 2 times - for _ in 0..2 { - let _ = increment_call.send().await?; - } - // Build a call to decrement the counter let decrement_call = deployed_contract.decrement(); - - // Send the decrement call 2 times + // Send the call 2 times for _ in 0..2 { + let _ = increment_call.send().await?; let _ = decrement_call.send().await?; } From cddd6391faa787c655ce13b265e0229189bc8803 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Sun, 17 Mar 2024 21:55:14 -0400 Subject: [PATCH 08/10] use .watch() from ContractInstance --- .../examples/watch_contract_event.rs | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index 4ea22794..24d495bb 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -2,12 +2,12 @@ use alloy_network::Ethereum; use alloy_node_bindings::{Anvil, AnvilInstance}; -use alloy_provider::{Provider, RootProvider}; +use alloy_provider::RootProvider; use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; use alloy_sol_types::sol; use eyre::Result; -use futures_util::{stream, StreamExt}; +use futures_util::StreamExt; sol!( #[sol(rpc, bytecode = "0x60806040526000805534801561001457600080fd5b50610260806100246000396000f3fe608060405234801561001057600080fd5b50600436106100415760003560e01c80632baeceb71461004657806361bc221a14610050578063d09de08a1461006e575b600080fd5b61004e610078565b005b6100586100d9565b6040516100659190610159565b60405180910390f35b6100766100df565b005b600160008082825461008a91906101a3565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167fdc69c403b972fc566a14058b3b18e1513da476de6ac475716e489fae0cbe4a2660405160405180910390a3565b60005481565b60016000808282546100f191906101e6565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167ff6d1d8d205b41f9fb9549900a8dba5d669d68117a3a2b88c1ebc61163e8117ba60405160405180910390a3565b6000819050919050565b61015381610140565b82525050565b600060208201905061016e600083018461014a565b92915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101ae82610140565b91506101b983610140565b92508282039050818112600084121682821360008512151617156101e0576101df610174565b5b92915050565b60006101f182610140565b91506101fc83610140565b92508282019050828112156000831216838212600084121516171561022457610223610174565b5b9291505056fea26469706673582212208d0d34c26bfd2938ff07dd54c3fcc2bc4509e4ae654edff58101e5e7ab8cf18164736f6c63430008180033")] @@ -37,20 +37,8 @@ async fn main() -> Result<()> { println!("Deployed contract at: {:?}", deployed_contract.address()); - let increment_filter = deployed_contract.Increment_filter().filter; - - let increment_poller = provider.watch_logs(&increment_filter).await?; - - let decrement_filter = deployed_contract.Decrement_filter().filter; - - let decrement_poller = provider.watch_logs(&decrement_filter).await?; - - println!("Watching for events..."); - println!("every {:?}", increment_poller.poll_interval()); // Default 250ms for local connection else 7s - - let mut increment_stream = increment_poller.into_stream().flat_map(stream::iter).take(2); - - let mut decrement_stream = decrement_poller.into_stream().flat_map(stream::iter).take(2); + let increment_filter = deployed_contract.Increment_filter().watch().await?; + let decrement_filter = deployed_contract.Decrement_filter().watch().await?; // Build a call to increment the counter let increment_call = deployed_contract.increment(); @@ -62,13 +50,35 @@ async fn main() -> Result<()> { let _ = decrement_call.send().await?; } - while let Some(log) = increment_stream.next().await { - println!("Received Increment: {:?}", log); - } - - while let Some(log) = decrement_stream.next().await { - println!("Received Decrement: {:?}", log); - } + increment_filter + .into_stream() + .take(2) + .for_each(|log| async { + match log { + Ok((_event, log)) => { + println!("Received Increment: {:?}", log); + } + Err(e) => { + println!("Error: {:?}", e); + } + } + }) + .await; + + decrement_filter + .into_stream() + .take(2) + .for_each(|log| async { + match log { + Ok((_event, log)) => { + println!("Received Decrement: {:?}", log); + } + Err(e) => { + println!("Error: {:?}", e); + } + } + }) + .await; Ok(()) } From c1e2e38bf373a901876cba68acf23d8ea35c2011 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Sun, 17 Mar 2024 23:08:27 -0400 Subject: [PATCH 09/10] feat(sub): event multiplexer --- examples/subscriptions/Cargo.toml | 1 + .../examples/event_multiplexer.rs | 121 ++++++++++++++++++ .../examples/watch_contract_event.rs | 2 +- 3 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 examples/subscriptions/examples/event_multiplexer.rs diff --git a/examples/subscriptions/Cargo.toml b/examples/subscriptions/Cargo.toml index 1d9b6eab..87856568 100644 --- a/examples/subscriptions/Cargo.toml +++ b/examples/subscriptions/Cargo.toml @@ -16,6 +16,7 @@ alloy-network.workspace = true alloy-node-bindings.workspace = true alloy-provider = { workspace = true, features = ["pubsub", "ws"] } alloy-pubsub.workspace = true +alloy-primitives.workspace = true alloy-rpc-client.workspace = true alloy-rpc-types.workspace = true alloy-sol-types = { workspace = true } diff --git a/examples/subscriptions/examples/event_multiplexer.rs b/examples/subscriptions/examples/event_multiplexer.rs new file mode 100644 index 00000000..d2ff9a12 --- /dev/null +++ b/examples/subscriptions/examples/event_multiplexer.rs @@ -0,0 +1,121 @@ +//! Example of multiplexing watching event logs. + +use std::str::FromStr; + +use alloy_network::Ethereum; +use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy_primitives::I256; +use alloy_provider::RootProvider; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::RpcClient; +use alloy_sol_types::{sol, SolEvent}; +use eyre::Result; +use futures_util::StreamExt; + +sol!( + #[derive(Debug)] + #[sol(rpc, bytecode = "0x608060405234801561001057600080fd5b50610485806100206000396000f3fe608060405234801561001057600080fd5b506004361061004c5760003560e01c80634350913814610051578063a5f3c23b1461006d578063adefc37b14610089578063bbe93d91146100a5575b600080fd5b61006b60048036038101906100669190610248565b6100c1565b005b61008760048036038101906100829190610248565b610114565b005b6100a3600480360381019061009e9190610248565b610167565b005b6100bf60048036038101906100ba9190610248565b6101ba565b005b80826100cd91906102e6565b3373ffffffffffffffffffffffffffffffffffffffff167f1c1e8bbe327890ea8d3f5b22370a56c3fcef7ff82f306161f64647fe5d28588160405160405180910390a35050565b80826101209190610350565b3373ffffffffffffffffffffffffffffffffffffffff167f6da406ea462447ed7804b4a4dc69c67b53d3d45a50381ae3e9cf878c9d7c23df60405160405180910390a35050565b80826101739190610394565b3373ffffffffffffffffffffffffffffffffffffffff167f32e913bf2ad35da1e845597618bb9f3f80642a68dd39f30a093a7838aa61fb2760405160405180910390a35050565b80826101c691906103d7565b3373ffffffffffffffffffffffffffffffffffffffff167fd7a123d4c8e44db3186e04b9c96c102287276929c930f2e8abcaa555ef5dcacc60405160405180910390a35050565b600080fd5b6000819050919050565b61022581610212565b811461023057600080fd5b50565b6000813590506102428161021c565b92915050565b6000806040838503121561025f5761025e61020d565b5b600061026d85828601610233565b925050602061027e85828601610233565b9150509250929050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601260045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006102f182610212565b91506102fc83610212565b92508261030c5761030b610288565b5b600160000383147f800000000000000000000000000000000000000000000000000000000000000083141615610345576103446102b7565b5b828205905092915050565b600061035b82610212565b915061036683610212565b92508282019050828112156000831216838212600084121516171561038e5761038d6102b7565b5b92915050565b600061039f82610212565b91506103aa83610212565b92508282039050818112600084121682821360008512151617156103d1576103d06102b7565b5b92915050565b60006103e282610212565b91506103ed83610212565b92508282026103fb81610212565b91507f80000000000000000000000000000000000000000000000000000000000000008414600084121615610433576104326102b7565b5b8282058414831517610448576104476102b7565b5b509291505056fea2646970667358221220386c6c77ebc5f1bae50f37d123c5a510f2f678b30900c2d5ebf09f68c9353f4b64736f6c63430008180033")] + contract EventMultiplexer { + event Add(address indexed sender, int256 indexed value); + event Sub(address indexed sender, int256 indexed value); + event Mul(address indexed sender, int256 indexed value); + event Div(address indexed sender, int256 indexed value); + + function add(int256 a, int256 b) public { + emit Add(msg.sender, a + b); + } + + function sub(int256 a, int256 b) public { + emit Sub(msg.sender, a - b); + } + + function mul(int256 a, int256 b) public { + emit Mul(msg.sender, a * b); + } + + function div(int256 a, int256 b) public { + emit Div(msg.sender, a / b); + } + } +); + +#[tokio::main] +async fn main() -> Result<()> { + let (provider, _anvil) = init().await; + + let deployed_contract = EventMultiplexer::deploy(provider.clone()).await?; + + println!("Deployed contract at: {:?}", deployed_contract.address()); + + let add_filter = deployed_contract.Add_filter().watch().await?; + let sub_filter = deployed_contract.Sub_filter().watch().await?; + let mul_filter = deployed_contract.Mul_filter().watch().await?; + let div_filter = deployed_contract.Div_filter().watch().await?; + + let a = I256::from_str("1").unwrap(); + let b = I256::from_str("1").unwrap(); + // Build calls + let add_call = deployed_contract.add(a, b); + let sub_call = deployed_contract.sub(a, b); + let mul_call = deployed_contract.mul(a, b); + let div_call = deployed_contract.div(a, b); + + // Send calls + let _ = add_call.send().await?; + let _ = sub_call.send().await?; + let _ = mul_call.send().await?; + let _ = div_call.send().await?; + + let mut add_stream = add_filter.into_stream(); + let mut sub_stream = sub_filter.into_stream(); + let mut mul_stream = mul_filter.into_stream(); + let mut div_stream = div_filter.into_stream(); + + let add_log = &EventMultiplexer::Add::SIGNATURE_HASH; + let sub_log = &EventMultiplexer::Sub::SIGNATURE_HASH; + let mul_log = &EventMultiplexer::Mul::SIGNATURE_HASH; + let div_log = &EventMultiplexer::Div::SIGNATURE_HASH; + + // Use tokio::select! to multiplex the streams and capture the log + // tokio::select! will return the first event that arrives from any of the streams + // The for loop helps capture all the logs + for _ in 0..4 { + let log = tokio::select! { + Some(log) = add_stream.next() => { + log.unwrap().1 + } + Some(log) = sub_stream.next() => { + log.unwrap().1 + } + Some(log) = mul_stream.next() => { + log.unwrap().1 + } + Some(log) = div_stream.next() => { + log.unwrap().1 + } + }; + + let topic = &log.topics[0]; + + if topic == add_log { + println!("Received Add: {:?}", log); + } else if topic == sub_log { + println!("Received Sub: {:?}", log); + } else if topic == mul_log { + println!("Received Mul: {:?}", log); + } else if topic == div_log { + println!("Received Div: {:?}", log); + } + } + + Ok(()) +} + +async fn init() -> (RootProvider, AnvilInstance) { + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let client = RpcClient::connect_pubsub(ws).await.unwrap(); + let provider = RootProvider::::new(client); + + (provider, anvil) +} diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index 24d495bb..9a8d998a 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -1,4 +1,4 @@ -//! Example of subscribing to contract events and watching logs. +//! Example of watching contract events. use alloy_network::Ethereum; use alloy_node_bindings::{Anvil, AnvilInstance}; From 2d12d27d0005bb5214eb484f8a1f0f52fa408926 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 25 Mar 2024 12:27:50 -0400 Subject: [PATCH 10/10] fix: use alloy namespace and temp dependency fix for features --- examples/queries/Cargo.toml | 6 ----- examples/subscriptions/Cargo.toml | 27 ++++++++++++------- .../examples/event_multiplexer.rs | 22 ++++----------- .../examples/subscribe_blocks.rs | 17 +++--------- .../examples/watch_contract_event.rs | 18 +++---------- 5 files changed, 31 insertions(+), 59 deletions(-) diff --git a/examples/queries/Cargo.toml b/examples/queries/Cargo.toml index d631f3b5..87c67db2 100644 --- a/examples/queries/Cargo.toml +++ b/examples/queries/Cargo.toml @@ -12,12 +12,6 @@ repository.workspace = true [dev-dependencies] alloy.workspace = true -# alloy-network.workspace = true -# alloy-primitives.workspace = true -# alloy-provider = { workspace = true, features = ["pubsub", "ws"] } -# alloy-rpc-types.workspace = true -# alloy-rpc-client.workspace = true -# alloy-transport-http.workspace = true eyre.workspace = true futures-util = "0.3" diff --git a/examples/subscriptions/Cargo.toml b/examples/subscriptions/Cargo.toml index 87856568..e37af35e 100644 --- a/examples/subscriptions/Cargo.toml +++ b/examples/subscriptions/Cargo.toml @@ -11,15 +11,24 @@ homepage.workspace = true repository.workspace = true [dev-dependencies] -alloy-contract.workspace = true -alloy-network.workspace = true -alloy-node-bindings.workspace = true -alloy-provider = { workspace = true, features = ["pubsub", "ws"] } -alloy-pubsub.workspace = true -alloy-primitives.workspace = true -alloy-rpc-client.workspace = true -alloy-rpc-types.workspace = true -alloy-sol-types = { workspace = true } +alloy.workspace = true +# Temp fix for enabling features. Ref: https://github.com/alloy-rs/examples/pull/3/#discussion_r1537842062 +alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "fd8f065", features = [ + "pubsub", + "ws", +] } +alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "fd8f065", features = [ + "pubsub", +] } +# alloy-contract.workspace = true +# alloy-network.workspace = true +# alloy-node-bindings.workspace = true +# alloy-provider = { workspace = true, features = ["pubsub", "ws"] } +# alloy-pubsub.workspace = true +# alloy-primitives.workspace = true +# alloy-rpc-client.workspace = true +# alloy-rpc-types.workspace = true +# alloy-sol-types = { workspace = true } eyre.workspace = true futures-util = "0.3" diff --git a/examples/subscriptions/examples/event_multiplexer.rs b/examples/subscriptions/examples/event_multiplexer.rs index d2ff9a12..e0522ea7 100644 --- a/examples/subscriptions/examples/event_multiplexer.rs +++ b/examples/subscriptions/examples/event_multiplexer.rs @@ -1,16 +1,11 @@ //! Example of multiplexing watching event logs. -use std::str::FromStr; - -use alloy_network::Ethereum; -use alloy_node_bindings::{Anvil, AnvilInstance}; -use alloy_primitives::I256; +use alloy::{network::Ethereum, node_bindings::Anvil, primitives::I256, sol, sol_types::SolEvent}; use alloy_provider::RootProvider; -use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; -use alloy_sol_types::{sol, SolEvent}; use eyre::Result; use futures_util::StreamExt; +use std::str::FromStr; sol!( #[derive(Debug)] @@ -41,7 +36,9 @@ sol!( #[tokio::main] async fn main() -> Result<()> { - let (provider, _anvil) = init().await; + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let provider = RootProvider::::new(RpcClient::connect_pubsub(ws).await?); let deployed_contract = EventMultiplexer::deploy(provider.clone()).await?; @@ -110,12 +107,3 @@ async fn main() -> Result<()> { Ok(()) } - -async fn init() -> (RootProvider, AnvilInstance) { - let anvil = Anvil::new().block_time(1).spawn(); - let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); - let client = RpcClient::connect_pubsub(ws).await.unwrap(); - let provider = RootProvider::::new(client); - - (provider, anvil) -} diff --git a/examples/subscriptions/examples/subscribe_blocks.rs b/examples/subscriptions/examples/subscribe_blocks.rs index 95216ee3..878967ad 100644 --- a/examples/subscriptions/examples/subscribe_blocks.rs +++ b/examples/subscriptions/examples/subscribe_blocks.rs @@ -1,16 +1,16 @@ //! Example of subscribing to blocks and watching blocks. -use alloy_network::Ethereum; -use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy::{network::Ethereum, node_bindings::Anvil}; use alloy_provider::{Provider, RootProvider}; -use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; use eyre::Result; use futures_util::{stream, StreamExt}; #[tokio::main] async fn main() -> Result<()> { - let (provider, _anvil) = init().await; + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let provider = RootProvider::::new(RpcClient::connect_pubsub(ws).await?); let sub = provider.subscribe_blocks().await?; let mut stream = sub.into_stream().take(2); @@ -28,12 +28,3 @@ async fn main() -> Result<()> { Ok(()) } - -async fn init() -> (RootProvider, AnvilInstance) { - let anvil = Anvil::new().block_time(1).spawn(); - let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); - let client = RpcClient::connect_pubsub(ws).await.unwrap(); - let provider = RootProvider::::new(client); - - (provider, anvil) -} diff --git a/examples/subscriptions/examples/watch_contract_event.rs b/examples/subscriptions/examples/watch_contract_event.rs index 9a8d998a..936051f6 100644 --- a/examples/subscriptions/examples/watch_contract_event.rs +++ b/examples/subscriptions/examples/watch_contract_event.rs @@ -1,11 +1,8 @@ //! Example of watching contract events. -use alloy_network::Ethereum; -use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy::{network::Ethereum, node_bindings::Anvil, sol}; use alloy_provider::RootProvider; -use alloy_pubsub::PubSubFrontend; use alloy_rpc_client::RpcClient; -use alloy_sol_types::sol; use eyre::Result; use futures_util::StreamExt; @@ -31,7 +28,9 @@ sol!( #[tokio::main] async fn main() -> Result<()> { - let (provider, _anvil) = init().await; + let anvil = Anvil::new().block_time(1).spawn(); + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let provider = RootProvider::::new(RpcClient::connect_pubsub(ws).await?); let deployed_contract = EventExample::deploy(provider.clone()).await?; @@ -82,12 +81,3 @@ async fn main() -> Result<()> { Ok(()) } - -async fn init() -> (RootProvider, AnvilInstance) { - let anvil = Anvil::new().block_time(1).spawn(); - let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); - let client = RpcClient::connect_pubsub(ws).await.unwrap(); - let provider = RootProvider::::new(client); - - (provider, anvil) -}