Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): sub_blocks, watch_contract_event #5

Merged
merged 14 commits into from
Mar 25, 2024
Merged
43 changes: 23 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Example code using [alloy](https://github.com/alloy-rs/alloy) and [alloy-core](https://github.com/alloy-rs/core).

These examples demonstrate the main features of [Alloy](https://github.com/alloy-rs/alloy) and how to use them.
These examples demonstrate the main features of [Alloy](https://github.com/alloy-rs/alloy) and how to use them.
To run an example, use the command `cargo run --example <Example>`.

```sh
Expand All @@ -14,35 +14,38 @@ cargo run --example mnemonic_signer
## Table of Contents

- [x] Anvil
- [x] [Deploy contract](./examples/anvil/examples/deploy_contract_anvil.rs)
- [x] [Fork](./examples/anvil/examples/fork_anvil.rs)
- [x] [Local](./examples/anvil/examples/local_anvil.rs)
- [x] [Deploy contract](./examples/anvil/examples/deploy_contract_anvil.rs)
- [x] [Fork](./examples/anvil/examples/fork_anvil.rs)
- [x] [Local](./examples/anvil/examples/local_anvil.rs)
- [x] Big numbers
- [x] [Comparison and equivalence](./examples/big-numbers/examples/comparison_equivalence.rs)
- [x] [Conversion](./examples/big-numbers/examples/conversion.rs)
- [x] [Creating Instances](./examples/big-numbers/examples/create_instances.rs)
- [x] [Math operations](./examples/big-numbers/examples/math_operations.rs)
- [x] [Math utilities](./examples/big-numbers/examples/math_utilities.rs)
- [x] [Comparison and equivalence](./examples/big-numbers/examples/comparison_equivalence.rs)
- [x] [Conversion](./examples/big-numbers/examples/conversion.rs)
- [x] [Creating Instances](./examples/big-numbers/examples/create_instances.rs)
- [x] [Math operations](./examples/big-numbers/examples/math_operations.rs)
- [x] [Math utilities](./examples/big-numbers/examples/math_utilities.rs)
- [x] Contracts
- [x] [Deploy from artifact](./examples/contracts/examples/deploy_from_artifact.rs)
- [x] [Deploy from contract](./examples/contracts/examples/deploy_from_contract.rs)
- [x] [Generate](./examples/contracts/examples/generate.rs)
- [x] [Deploy from artifact](./examples/contracts/examples/deploy_from_artifact.rs)
- [x] [Deploy from contract](./examples/contracts/examples/deploy_from_contract.rs)
- [x] [Generate](./examples/contracts/examples/generate.rs)
- [x] [Deploy from artifact](./examples/contracts/examples/deploy_from_artifact.rs)
- [x] [Deploy from contract](./examples/contracts/examples/deploy_from_contract.rs)
- [x] [Generate](./examples/contracts/examples/generate.rs)
- [x] Layers
- [x] [Nonce manager](./examples/layers/examples/nonce_layer.rs)
- [x] [Signature manager](./examples/layers/examples/signer_layer.rs)
- [x] Subscriptions
- [x] [Subscribe and watch blocks](./examples/subscriptions/examples/subscribe_blocks.rs)
- [x] [Subscribe contract events and watch logs](./examples/subscriptions/examples/watch_contract_event.rs)
- [x] [Event multiplexer](./examples/subscriptions/examples/event_multiplexer.rs)
- [x] Providers
- [x] [Builder](./examples/providers/examples/builder.rs)
- [x] [HTTP](./examples/providers/examples/http.rs)
- [x] [IPC](./examples/providers/examples/ipc.rs)
- [x] [WS](./examples/providers/examples/ws.rs)
- [x] Layers
- [x] [Nonce manager](./examples/layers/examples/nonce_layer.rs)
- [x] [Signature manager](./examples/layers/examples/signer_layer.rs)
- [x] Queries
- [x] [Contract storage](./examples/queries/examples/query_contract_storage.rs)
- [x] [Contract deployed bytecode](./examples/queries/examples/query_deployed_bytecode.rs)
- [x] [Logs](./examples/queries/examples/query_logs.rs)
- [ ] Subscriptions
- [ ] Watch blocks
- [ ] Subscribe events by type
- [ ] Subscribe logs
- [x] Transactions
- [x] [Decode input](./examples/transactions/examples/decode_input.rs)
- [x] [Get gas price in USD](./examples/transactions/examples/gas_price_usd.rs)
Expand All @@ -60,4 +63,4 @@ cargo run --example mnemonic_signer
- [x] [Sign permit hash](./examples/wallets/examples/sign_permit_hash.rs)
- [x] [Trezor signer](./examples/wallets/examples/trezor_signer.rs)
- [x] [Yubi signer](./examples/wallets/examples/yubi_signer.rs)
- [ ] Keystore signer
- [ ] Keystore signer
6 changes: 0 additions & 6 deletions examples/queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ repository.workspace = true

[dev-dependencies]
alloy.workspace = true
# alloy-network.workspace = true
# alloy-primitives.workspace = true
# alloy-provider = { workspace = true, features = ["pubsub", "ws"] }
# alloy-rpc-types.workspace = true
# alloy-rpc-client.workspace = true
# alloy-transport-http.workspace = true

eyre.workspace = true
futures-util = "0.3"
Expand Down
36 changes: 36 additions & 0 deletions examples/subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "examples-subscriptions"

publish.workspace = true
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dev-dependencies]
alloy.workspace = true
# Temp fix for enabling features. Ref: https://github.com/alloy-rs/examples/pull/3/#discussion_r1537842062
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "fd8f065", features = [
"pubsub",
"ws",
] }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "fd8f065", features = [
"pubsub",
] }
# alloy-contract.workspace = true
# alloy-network.workspace = true
# alloy-node-bindings.workspace = true
# alloy-provider = { workspace = true, features = ["pubsub", "ws"] }
# alloy-pubsub.workspace = true
# alloy-primitives.workspace = true
# alloy-rpc-client.workspace = true
# alloy-rpc-types.workspace = true
# alloy-sol-types = { workspace = true }

eyre.workspace = true
futures-util = "0.3"
reqwest.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
109 changes: 109 additions & 0 deletions examples/subscriptions/examples/event_multiplexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! Example of multiplexing watching event logs.

use alloy::{network::Ethereum, node_bindings::Anvil, primitives::I256, sol, sol_types::SolEvent};
use alloy_provider::RootProvider;
use alloy_rpc_client::RpcClient;
use eyre::Result;
use futures_util::StreamExt;
use std::str::FromStr;

sol!(
#[derive(Debug)]
#[sol(rpc, bytecode = "0x608060405234801561001057600080fd5b50610485806100206000396000f3fe608060405234801561001057600080fd5b506004361061004c5760003560e01c80634350913814610051578063a5f3c23b1461006d578063adefc37b14610089578063bbe93d91146100a5575b600080fd5b61006b60048036038101906100669190610248565b6100c1565b005b61008760048036038101906100829190610248565b610114565b005b6100a3600480360381019061009e9190610248565b610167565b005b6100bf60048036038101906100ba9190610248565b6101ba565b005b80826100cd91906102e6565b3373ffffffffffffffffffffffffffffffffffffffff167f1c1e8bbe327890ea8d3f5b22370a56c3fcef7ff82f306161f64647fe5d28588160405160405180910390a35050565b80826101209190610350565b3373ffffffffffffffffffffffffffffffffffffffff167f6da406ea462447ed7804b4a4dc69c67b53d3d45a50381ae3e9cf878c9d7c23df60405160405180910390a35050565b80826101739190610394565b3373ffffffffffffffffffffffffffffffffffffffff167f32e913bf2ad35da1e845597618bb9f3f80642a68dd39f30a093a7838aa61fb2760405160405180910390a35050565b80826101c691906103d7565b3373ffffffffffffffffffffffffffffffffffffffff167fd7a123d4c8e44db3186e04b9c96c102287276929c930f2e8abcaa555ef5dcacc60405160405180910390a35050565b600080fd5b6000819050919050565b61022581610212565b811461023057600080fd5b50565b6000813590506102428161021c565b92915050565b6000806040838503121561025f5761025e61020d565b5b600061026d85828601610233565b925050602061027e85828601610233565b9150509250929050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601260045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006102f182610212565b91506102fc83610212565b92508261030c5761030b610288565b5b600160000383147f800000000000000000000000000000000000000000000000000000000000000083141615610345576103446102b7565b5b828205905092915050565b600061035b82610212565b915061036683610212565b92508282019050828112156000831216838212600084121516171561038e5761038d6102b7565b5b92915050565b600061039f82610212565b91506103aa83610212565b92508282039050818112600084121682821360008512151617156103d1576103d06102b7565b5b92915050565b60006103e282610212565b91506103ed83610212565b92508282026103fb81610212565b91507f80000000000000000000000000000000000000000000000000000000000000008414600084121615610433576104326102b7565b5b8282058414831517610448576104476102b7565b5b509291505056fea2646970667358221220386c6c77ebc5f1bae50f37d123c5a510f2f678b30900c2d5ebf09f68c9353f4b64736f6c63430008180033")]
contract EventMultiplexer {
event Add(address indexed sender, int256 indexed value);
event Sub(address indexed sender, int256 indexed value);
event Mul(address indexed sender, int256 indexed value);
event Div(address indexed sender, int256 indexed value);

function add(int256 a, int256 b) public {
emit Add(msg.sender, a + b);
}

function sub(int256 a, int256 b) public {
emit Sub(msg.sender, a - b);
}

function mul(int256 a, int256 b) public {
emit Mul(msg.sender, a * b);
}

function div(int256 a, int256 b) public {
emit Div(msg.sender, a / b);
}
}
);

#[tokio::main]
async fn main() -> Result<()> {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let provider = RootProvider::<Ethereum, _>::new(RpcClient::connect_pubsub(ws).await?);

let deployed_contract = EventMultiplexer::deploy(provider.clone()).await?;

println!("Deployed contract at: {:?}", deployed_contract.address());

let add_filter = deployed_contract.Add_filter().watch().await?;
let sub_filter = deployed_contract.Sub_filter().watch().await?;
let mul_filter = deployed_contract.Mul_filter().watch().await?;
let div_filter = deployed_contract.Div_filter().watch().await?;

let a = I256::from_str("1").unwrap();
let b = I256::from_str("1").unwrap();
// Build calls
let add_call = deployed_contract.add(a, b);
let sub_call = deployed_contract.sub(a, b);
let mul_call = deployed_contract.mul(a, b);
let div_call = deployed_contract.div(a, b);

// Send calls
let _ = add_call.send().await?;
let _ = sub_call.send().await?;
let _ = mul_call.send().await?;
let _ = div_call.send().await?;

let mut add_stream = add_filter.into_stream();
let mut sub_stream = sub_filter.into_stream();
let mut mul_stream = mul_filter.into_stream();
let mut div_stream = div_filter.into_stream();

let add_log = &EventMultiplexer::Add::SIGNATURE_HASH;
let sub_log = &EventMultiplexer::Sub::SIGNATURE_HASH;
let mul_log = &EventMultiplexer::Mul::SIGNATURE_HASH;
let div_log = &EventMultiplexer::Div::SIGNATURE_HASH;

// Use tokio::select! to multiplex the streams and capture the log
// tokio::select! will return the first event that arrives from any of the streams
// The for loop helps capture all the logs
for _ in 0..4 {
let log = tokio::select! {
Some(log) = add_stream.next() => {
log.unwrap().1
}
Some(log) = sub_stream.next() => {
log.unwrap().1
}
Some(log) = mul_stream.next() => {
log.unwrap().1
}
Some(log) = div_stream.next() => {
log.unwrap().1
}
};

let topic = &log.topics[0];

if topic == add_log {
println!("Received Add: {:?}", log);
} else if topic == sub_log {
println!("Received Sub: {:?}", log);
} else if topic == mul_log {
println!("Received Mul: {:?}", log);
} else if topic == div_log {
println!("Received Div: {:?}", log);
}
}

Ok(())
}
30 changes: 30 additions & 0 deletions examples/subscriptions/examples/subscribe_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Example of subscribing to blocks and watching blocks.

use alloy::{network::Ethereum, node_bindings::Anvil};
use alloy_provider::{Provider, RootProvider};
use alloy_rpc_client::RpcClient;
use eyre::Result;
use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let provider = RootProvider::<Ethereum, _>::new(RpcClient::connect_pubsub(ws).await?);

let sub = provider.subscribe_blocks().await?;
let mut stream = sub.into_stream().take(2);
Comment on lines +15 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always call into_stream()? Should we return that in the subscribe_blocks return type instead @DaniPopes by impl Stream on the type?


while let Some(block) = stream.next().await {
println!("Subscribed Block: {:?}", block.header.number);
}

let poller = provider.watch_blocks().await?;
let mut stream = poller.into_stream().flat_map(stream::iter).take(2);

while let Some(block_hash) = stream.next().await {
println!("Watched Block: {:?}", block_hash);
}

Ok(())
}
83 changes: 83 additions & 0 deletions examples/subscriptions/examples/watch_contract_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Example of watching contract events.

use alloy::{network::Ethereum, node_bindings::Anvil, sol};
use alloy_provider::RootProvider;
use alloy_rpc_client::RpcClient;
use eyre::Result;
use futures_util::StreamExt;

sol!(
#[sol(rpc, bytecode = "0x60806040526000805534801561001457600080fd5b50610260806100246000396000f3fe608060405234801561001057600080fd5b50600436106100415760003560e01c80632baeceb71461004657806361bc221a14610050578063d09de08a1461006e575b600080fd5b61004e610078565b005b6100586100d9565b6040516100659190610159565b60405180910390f35b6100766100df565b005b600160008082825461008a91906101a3565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167fdc69c403b972fc566a14058b3b18e1513da476de6ac475716e489fae0cbe4a2660405160405180910390a3565b60005481565b60016000808282546100f191906101e6565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167ff6d1d8d205b41f9fb9549900a8dba5d669d68117a3a2b88c1ebc61163e8117ba60405160405180910390a3565b6000819050919050565b61015381610140565b82525050565b600060208201905061016e600083018461014a565b92915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101ae82610140565b91506101b983610140565b92508282039050818112600084121682821360008512151617156101e0576101df610174565b5b92915050565b60006101f182610140565b91506101fc83610140565b92508282019050828112156000831216838212600084121516171561022457610223610174565b5b9291505056fea26469706673582212208d0d34c26bfd2938ff07dd54c3fcc2bc4509e4ae654edff58101e5e7ab8cf18164736f6c63430008180033")]
contract EventExample {
Comment on lines +10 to +11
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we ship compiler it'll be interesting to see if we can bundle it with the sol macro

int256 public counter = 0;

event Increment(address indexed by, int256 indexed value);
event Decrement(address indexed by, int256 indexed value);

function increment() public {
counter += 1;
emit Increment(msg.sender, counter);
}

function decrement() public {
counter -= 1;
emit Decrement(msg.sender, counter);
}
}
);

#[tokio::main]
async fn main() -> Result<()> {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let provider = RootProvider::<Ethereum, _>::new(RpcClient::connect_pubsub(ws).await?);

let deployed_contract = EventExample::deploy(provider.clone()).await?;

println!("Deployed contract at: {:?}", deployed_contract.address());

let increment_filter = deployed_contract.Increment_filter().watch().await?;
let decrement_filter = deployed_contract.Decrement_filter().watch().await?;

// Build a call to increment the counter
let increment_call = deployed_contract.increment();
// Build a call to decrement the counter
let decrement_call = deployed_contract.decrement();
// Send the call 2 times
for _ in 0..2 {
let _ = increment_call.send().await?;
let _ = decrement_call.send().await?;
}

increment_filter
.into_stream()
.take(2)
.for_each(|log| async {
match log {
Ok((_event, log)) => {
println!("Received Increment: {:?}", log);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
})
.await;

decrement_filter
.into_stream()
.take(2)
.for_each(|log| async {
match log {
Ok((_event, log)) => {
println!("Received Decrement: {:?}", log);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
})
.await;

Ok(())
}
Loading