Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): sub_blocks, watch_contract_event #5

Merged
merged 14 commits into from
Mar 25, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "MIT OR Apache-2.0"
homepage = "https://github.com/alloy-rs/examples"
repository = "https://github.com/alloy-rs/examples"
publish = false
exclude = ["examples/"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be removed as publishing is disabled


[workspace.dependencies]
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e7dfb4f", default-features = false }
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ cargo run --example mnemonic_signer
- [ ] Paginated logs
- [ ] UniswapV2 pair
- [ ] Transactions
- [ ] Subscriptions
- [ ] Watch blocks
- [ ] Subscribe events by type
- [ ] Subscribe logs
- [x] Subscriptions
- [x] [Subscribe and watch blocks](./examples/subscriptions/examples/subscribe_blocks.rs)
- [x] [Subscribe contract events and watch logs](./examples/subscriptions/examples/watch_contract_event.rs)
- [ ] Transactions
- [ ] Call override
- [ ] Create raw transaction
Expand Down
27 changes: 27 additions & 0 deletions examples/subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "examples-subscriptions"

publish.workspace = true
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dev-dependencies]
alloy-contract.workspace = true
alloy-network.workspace = true
alloy-node-bindings.workspace = true
alloy-provider = { workspace = true, features = ["pubsub", "ws"] }
alloy-pubsub.workspace = true
alloy-primitives.workspace = true
alloy-rpc-client.workspace = true
alloy-rpc-types.workspace = true
alloy-sol-types = { workspace = true }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the new Alloy namespace (see: #11)


eyre.workspace = true
futures-util = "0.3"
reqwest.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
121 changes: 121 additions & 0 deletions examples/subscriptions/examples/event_multiplexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//! Example of multiplexing watching event logs.

use std::str::FromStr;

use alloy_network::Ethereum;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_primitives::I256;
use alloy_provider::RootProvider;
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use alloy_sol_types::{sol, SolEvent};
use eyre::Result;
use futures_util::StreamExt;

sol!(
#[derive(Debug)]
#[sol(rpc, bytecode = "0x608060405234801561001057600080fd5b50610485806100206000396000f3fe608060405234801561001057600080fd5b506004361061004c5760003560e01c80634350913814610051578063a5f3c23b1461006d578063adefc37b14610089578063bbe93d91146100a5575b600080fd5b61006b60048036038101906100669190610248565b6100c1565b005b61008760048036038101906100829190610248565b610114565b005b6100a3600480360381019061009e9190610248565b610167565b005b6100bf60048036038101906100ba9190610248565b6101ba565b005b80826100cd91906102e6565b3373ffffffffffffffffffffffffffffffffffffffff167f1c1e8bbe327890ea8d3f5b22370a56c3fcef7ff82f306161f64647fe5d28588160405160405180910390a35050565b80826101209190610350565b3373ffffffffffffffffffffffffffffffffffffffff167f6da406ea462447ed7804b4a4dc69c67b53d3d45a50381ae3e9cf878c9d7c23df60405160405180910390a35050565b80826101739190610394565b3373ffffffffffffffffffffffffffffffffffffffff167f32e913bf2ad35da1e845597618bb9f3f80642a68dd39f30a093a7838aa61fb2760405160405180910390a35050565b80826101c691906103d7565b3373ffffffffffffffffffffffffffffffffffffffff167fd7a123d4c8e44db3186e04b9c96c102287276929c930f2e8abcaa555ef5dcacc60405160405180910390a35050565b600080fd5b6000819050919050565b61022581610212565b811461023057600080fd5b50565b6000813590506102428161021c565b92915050565b6000806040838503121561025f5761025e61020d565b5b600061026d85828601610233565b925050602061027e85828601610233565b9150509250929050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601260045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006102f182610212565b91506102fc83610212565b92508261030c5761030b610288565b5b600160000383147f800000000000000000000000000000000000000000000000000000000000000083141615610345576103446102b7565b5b828205905092915050565b600061035b82610212565b915061036683610212565b92508282019050828112156000831216838212600084121516171561038e5761038d6102b7565b5b92915050565b600061039f82610212565b91506103aa83610212565b92508282039050818112600084121682821360008512151617156103d1576103d06102b7565b5b92915050565b60006103e282610212565b91506103ed83610212565b92508282026103fb81610212565b91507f80000000000000000000000000000000000000000000000000000000000000008414600084121615610433576104326102b7565b5b8282058414831517610448576104476102b7565b5b509291505056fea2646970667358221220386c6c77ebc5f1bae50f37d123c5a510f2f678b30900c2d5ebf09f68c9353f4b64736f6c63430008180033")]
contract EventMultiplexer {
event Add(address indexed sender, int256 indexed value);
event Sub(address indexed sender, int256 indexed value);
event Mul(address indexed sender, int256 indexed value);
event Div(address indexed sender, int256 indexed value);

function add(int256 a, int256 b) public {
emit Add(msg.sender, a + b);
}

function sub(int256 a, int256 b) public {
emit Sub(msg.sender, a - b);
}

function mul(int256 a, int256 b) public {
emit Mul(msg.sender, a * b);
}

function div(int256 a, int256 b) public {
emit Div(msg.sender, a / b);
}
}
);

#[tokio::main]
async fn main() -> Result<()> {
let (provider, _anvil) = init().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also flatten, no function, not useful for examples


let deployed_contract = EventMultiplexer::deploy(provider.clone()).await?;

println!("Deployed contract at: {:?}", deployed_contract.address());

let add_filter = deployed_contract.Add_filter().watch().await?;
let sub_filter = deployed_contract.Sub_filter().watch().await?;
let mul_filter = deployed_contract.Mul_filter().watch().await?;
let div_filter = deployed_contract.Div_filter().watch().await?;

let a = I256::from_str("1").unwrap();
let b = I256::from_str("1").unwrap();
// Build calls
let add_call = deployed_contract.add(a, b);
let sub_call = deployed_contract.sub(a, b);
let mul_call = deployed_contract.mul(a, b);
let div_call = deployed_contract.div(a, b);

// Send calls
let _ = add_call.send().await?;
let _ = sub_call.send().await?;
let _ = mul_call.send().await?;
let _ = div_call.send().await?;

let mut add_stream = add_filter.into_stream();
let mut sub_stream = sub_filter.into_stream();
let mut mul_stream = mul_filter.into_stream();
let mut div_stream = div_filter.into_stream();

let add_log = &EventMultiplexer::Add::SIGNATURE_HASH;
let sub_log = &EventMultiplexer::Sub::SIGNATURE_HASH;
let mul_log = &EventMultiplexer::Mul::SIGNATURE_HASH;
let div_log = &EventMultiplexer::Div::SIGNATURE_HASH;

// Use tokio::select! to multiplex the streams and capture the log
// tokio::select! will return the first event that arrives from any of the streams
// The for loop helps capture all the logs
for _ in 0..4 {
let log = tokio::select! {
Some(log) = add_stream.next() => {
log.unwrap().1
}
Some(log) = sub_stream.next() => {
log.unwrap().1
}
Some(log) = mul_stream.next() => {
log.unwrap().1
}
Some(log) = div_stream.next() => {
log.unwrap().1
}
};

let topic = &log.topics[0];

if topic == add_log {
println!("Received Add: {:?}", log);
} else if topic == sub_log {
println!("Received Sub: {:?}", log);
} else if topic == mul_log {
println!("Received Mul: {:?}", log);
} else if topic == div_log {
println!("Received Div: {:?}", log);
}
}

Ok(())
}

async fn init() -> (RootProvider<Ethereum, PubSubFrontend>, AnvilInstance) {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

(provider, anvil)
}
39 changes: 39 additions & 0 deletions examples/subscriptions/examples/subscribe_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Example of subscribing to blocks and watching blocks.

use alloy_network::Ethereum;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use eyre::Result;
use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
let (provider, _anvil) = init().await;

let sub = provider.subscribe_blocks().await?;
let mut stream = sub.into_stream().take(2);
Comment on lines +15 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always call into_stream()? Should we return that in the subscribe_blocks return type instead @DaniPopes by impl Stream on the type?


while let Some(block) = stream.next().await {
println!("Subscribed Block: {:?}", block.header.number);
}

let poller = provider.watch_blocks().await?;
let mut stream = poller.into_stream().flat_map(stream::iter).take(2);

while let Some(block_hash) = stream.next().await {
println!("Watched Block: {:?}", block_hash);
}

Ok(())
}

async fn init() -> (RootProvider<Ethereum, PubSubFrontend>, AnvilInstance) {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

(provider, anvil)
}
93 changes: 93 additions & 0 deletions examples/subscriptions/examples/watch_contract_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Example of watching contract events.

use alloy_network::Ethereum;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_provider::RootProvider;
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use alloy_sol_types::sol;
use eyre::Result;
use futures_util::StreamExt;

sol!(
#[sol(rpc, bytecode = "0x60806040526000805534801561001457600080fd5b50610260806100246000396000f3fe608060405234801561001057600080fd5b50600436106100415760003560e01c80632baeceb71461004657806361bc221a14610050578063d09de08a1461006e575b600080fd5b61004e610078565b005b6100586100d9565b6040516100659190610159565b60405180910390f35b6100766100df565b005b600160008082825461008a91906101a3565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167fdc69c403b972fc566a14058b3b18e1513da476de6ac475716e489fae0cbe4a2660405160405180910390a3565b60005481565b60016000808282546100f191906101e6565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167ff6d1d8d205b41f9fb9549900a8dba5d669d68117a3a2b88c1ebc61163e8117ba60405160405180910390a3565b6000819050919050565b61015381610140565b82525050565b600060208201905061016e600083018461014a565b92915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101ae82610140565b91506101b983610140565b92508282039050818112600084121682821360008512151617156101e0576101df610174565b5b92915050565b60006101f182610140565b91506101fc83610140565b92508282019050828112156000831216838212600084121516171561022457610223610174565b5b9291505056fea26469706673582212208d0d34c26bfd2938ff07dd54c3fcc2bc4509e4ae654edff58101e5e7ab8cf18164736f6c63430008180033")]
contract EventExample {
Comment on lines +10 to +11
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we ship compiler it'll be interesting to see if we can bundle it with the sol macro

int256 public counter = 0;

event Increment(address indexed by, int256 indexed value);
event Decrement(address indexed by, int256 indexed value);

function increment() public {
counter += 1;
emit Increment(msg.sender, counter);
}

function decrement() public {
counter -= 1;
emit Decrement(msg.sender, counter);
}
}
);

#[tokio::main]
async fn main() -> Result<()> {
let (provider, _anvil) = init().await;

let deployed_contract = EventExample::deploy(provider.clone()).await?;

println!("Deployed contract at: {:?}", deployed_contract.address());

let increment_filter = deployed_contract.Increment_filter().watch().await?;
let decrement_filter = deployed_contract.Decrement_filter().watch().await?;

// Build a call to increment the counter
let increment_call = deployed_contract.increment();
// Build a call to decrement the counter
let decrement_call = deployed_contract.decrement();
// Send the call 2 times
for _ in 0..2 {
let _ = increment_call.send().await?;
let _ = decrement_call.send().await?;
}

increment_filter
.into_stream()
.take(2)
.for_each(|log| async {
match log {
Ok((_event, log)) => {
println!("Received Increment: {:?}", log);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
})
.await;

decrement_filter
.into_stream()
.take(2)
.for_each(|log| async {
match log {
Ok((_event, log)) => {
println!("Received Decrement: {:?}", log);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
})
.await;

Ok(())
}

async fn init() -> (RootProvider<Ethereum, PubSubFrontend>, AnvilInstance) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaniPopes what's a good way to type erase here? impl Provider? wonder if we can make functions that return providers easy to use basically so that they don't need to manually write the network and the transport type

let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

(provider, anvil)
}
Loading