Skip to content

Commit

Permalink
rebuilding agents and filter request
Browse files Browse the repository at this point in the history
Tests pass again and cleaned up some of the set up process.
  • Loading branch information
Autoparallel committed Aug 1, 2023
1 parent 574152d commit 7605f42
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 103 deletions.
65 changes: 39 additions & 26 deletions core/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,33 @@ use std::sync::Arc;

use ethers::providers::Middleware;

use crate::{environment::Connection, middleware::RevmMiddleware};
use crate::middleware::RevmMiddleware;

pub struct Agent<M: Middleware> {
pub name: String,
pub client: Arc<M>,
pub behaviors: Vec<Box<dyn Behavior>>,
pub trait Attached {
type Client;
}

impl<M: Middleware> std::fmt::Debug for Agent<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Agent")
.field("name", &self.name)
.field("client", &self.client)
.finish()
}
pub struct IsAttached<M: Middleware> {
marker: std::marker::PhantomData<M>,
}
pub struct NotAttached {}
impl<M: Middleware> Attached for IsAttached<M> {
type Client = Arc<M>;
}
impl Attached for NotAttached {
type Client = ();
}

impl Agent<RevmMiddleware> {
pub(crate) fn new_simulation_agent(name: String, connection: Connection) -> Self {
Self {
name: name.clone(),
client: Arc::new(RevmMiddleware::new(connection, name)),
behaviors: vec![],
}
}
pub struct Agent<A: Attached> {
pub name: String,
pub client: A::Client,
pub behaviors: Vec<Box<dyn Behavior>>,
}

impl<M: Middleware> Agent<M> {
pub fn new(name: String, middleware: M) -> Self {
impl Agent<NotAttached> {
pub(crate) fn new<S: Into<String>>(name: S) -> Self {
Self {
name,
client: Arc::new(middleware),
name: name.into(),
client: (),
behaviors: vec![],
}
}
Expand All @@ -47,6 +42,24 @@ impl<M: Middleware> Agent<M> {
{
self.behaviors.push(Box::new(behavior));
}

pub fn attach_to_client<M: Middleware>(self, client: Arc<M>) -> Agent<IsAttached<M>> {
Agent::<IsAttached<M>> {
name: self.name,
client,
behaviors: self.behaviors,
}
}

pub fn attach_to_environment(self, environment: &mut crate::environment::Environment) {
let middleware = RevmMiddleware::new(&self, environment);
let agent = Agent::<IsAttached<RevmMiddleware>> {
name: self.name,
client: Arc::new(middleware),
behaviors: self.behaviors,
};
environment.add_agent(agent);
}
}

// TODO: Note -- Artemis uses a `process_event` function that returns an `Option<Action>` for something to happen.
Expand Down Expand Up @@ -95,7 +108,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn agent_behavior() {
let name = TEST_AGENT_NAME.to_string();
let mut agent = Agent::new(name, TestMiddleware {});
let mut agent = Agent::new(name);

// Add a behavior of the first type.
let data = TEST_BEHAVIOR_DATA.to_string();
Expand Down
75 changes: 54 additions & 21 deletions core/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
#![warn(unsafe_code)]

use crossbeam_channel::{unbounded, Receiver, Sender};
use ethers::providers::{JsonRpcClient, ProviderError};
use revm::{
db::{CacheDB, EmptyDB},
primitives::{ExecutionResult, Log, TxEnv, U256},
EVM,
};
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
sync::{Arc, Mutex},
thread,
};

use crate::utils::revm_logs_to_ethers_logs;
use crate::{utils::revm_logs_to_ethers_logs, agent::IsAttached};
use crate::{agent::Agent, middleware::RevmMiddleware};

/// Type Aliases for the event channel.
Expand All @@ -36,52 +38,83 @@ pub struct Environment {
pub label: String,
pub(crate) state: State,
pub(crate) evm: EVM<CacheDB<EmptyDB>>,
pub connection: Connection,
pub(crate) tx_sender: TxEnvSender,
tx_receiver: TxEnvReceiver,
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
/// Clients (Agents) in the environment
pub clients: Vec<Arc<Agent<RevmMiddleware>>>,
pub agents: Vec<Agent<IsAttached<RevmMiddleware>>>,
// pub deployed_contracts: HashMap<String, Contract<RevmMiddleware>>,
}

#[derive(Debug, Clone)]
pub struct Connection {
// TODO: If the provider holds the connection then this can work better.
pub struct RevmProvider {
pub(crate) tx_sender: TxEnvSender,
tx_receiver: TxEnvReceiver,
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
pub(crate) result_sender: crossbeam_channel::Sender<ExecutionResult>,
pub(crate) result_receiver: crossbeam_channel::Receiver<ExecutionResult>,
pub(crate) event_receiver: crossbeam_channel::Receiver<Vec<ethers::types::Log>>,
}

impl Debug for RevmProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RevmProvider").finish()
}
}

#[async_trait::async_trait]
impl JsonRpcClient for RevmProvider {
type Error = ProviderError;

async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
&self,
method: &str,
params: T,
) -> Result<R, ProviderError> {
match method {
"eth_getFilterChanges" => {
let logs = self.event_receiver.recv().unwrap();
println!("logs: {:?}", logs);
let logs_str = serde_json::to_string(&logs).unwrap();
let logs_deserializeowned: R = serde_json::from_str(&logs_str)?;
return Ok(logs_deserializeowned);
// return Ok(serde::to_value(self.event_receiver.recv().ok()).unwrap())
},
_ => {
unimplemented!("We don't cover this case yet.")
}
}
}
}

impl Environment {
pub(crate) fn new(label: String) -> Self {
pub(crate) fn new<S: Into<String>>(label: S) -> Self {
let mut evm = EVM::new();
let db = CacheDB::new(EmptyDB {});
evm.database(db);
evm.env.cfg.limit_contract_code_size = Some(0x100000); // This is a large contract size limit, beware!
evm.env.block.gas_limit = U256::MAX;
let transaction_channel = unbounded::<(ToTransact, TxEnv, Sender<ExecutionResult>)>();
let connection = Connection {
tx_sender: transaction_channel.0,
tx_receiver: transaction_channel.1,
event_broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())),
};
let (tx_sender, tx_receiver) = unbounded::<(ToTransact, TxEnv, Sender<ExecutionResult>)>();
Self {
label,
label: label.into(),
state: State::Stopped,
evm,
connection,
clients: vec![],
tx_sender,
tx_receiver,
event_broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())),
agents: vec![],
}
}
// TODO: We need to make this the way to add agents to the environment.
// in `agent.rs` we have `new_simulation_agent` which should probably just be called from this function instead.
// OR agents can be created (without a connection?) and then added to the environment where they will gain a connection?
pub fn add_agent(&mut self, agent: Agent<RevmMiddleware>) {
self.clients.push(Arc::new(agent));
pub fn add_agent(&mut self, agent: Agent<IsAttached<RevmMiddleware>>) {
self.agents.push(agent);
}

// TODO: Run should now run the agents as well as the evm.
pub(crate) fn run(&mut self) {
let tx_receiver = self.connection.tx_receiver.clone();
let tx_receiver = self.tx_receiver.clone();
let mut evm = self.evm.clone();
let event_broadcaster = self.connection.event_broadcaster.clone();
let event_broadcaster = self.event_broadcaster.clone();
self.state = State::Running;

//give all agents their own thread and let them start watching for their evnts
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ pub mod environment;
pub mod manager;
pub mod math;
pub mod middleware;
#[cfg(test)] //TODO: UNCOMMENT THIS LATER
// #[cfg(test)] //TODO: UNCOMMENT THIS LATER
pub mod tests;
pub mod utils;
80 changes: 48 additions & 32 deletions core/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,68 @@
//! Most of the middleware is essentially a placeholder, but it is necessary to have a middleware to work with bindings more efficiently.

use ethers::prelude::pending_transaction::PendingTxState;
use ethers::providers::{PendingTransaction, Provider, FilterKind};
use ethers::providers::{FilterKind, JsonRpcClient, JsonRpcError, PendingTransaction, Provider};
use ethers::{
prelude::k256::{ecdsa::SigningKey,sha2::{Digest, Sha256}},
prelude::k256::{
ecdsa::SigningKey,
sha2::{Digest, Sha256},
},
prelude::ProviderError,
providers::{FilterWatcher, Middleware, MockProvider},
signers::{Signer, Wallet},
types::{transaction::eip2718::TypedTransaction, Address, BlockId, Bytes, Filter, Log},
};
use rand::rngs::StdRng;
use rand::SeedableRng;
use revm::primitives::{CreateScheme, ExecutionResult, Output, TransactTo, TxEnv, B160, U256};
use revm::primitives::{
result, CreateScheme, ExecutionResult, Output, TransactTo, TxEnv, B160, U256,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::time::Duration;

use crate::environment::Connection;
use crate::agent::{Agent, NotAttached};
use crate::environment::{Environment, RevmProvider};
use crate::utils::{recast_address, revm_logs_to_ethers_logs};

// TODO: Refactor the connection and channels slightly to be more intuitive. For instance, the middleware may not really need to own a connection, but input one to set up everything else?
#[derive(Debug)]
pub struct RevmMiddleware {
provider: Provider<MockProvider>,
connection: Connection,
provider: Provider<RevmProvider>,
wallet: Wallet<SigningKey>,
result_sender: crossbeam_channel::Sender<ExecutionResult>,
result_receiver: crossbeam_channel::Receiver<ExecutionResult>,
event_receiver: crossbeam_channel::Receiver<Vec<Log>>,
}

impl RevmMiddleware {
pub fn new(connection: Connection, name: String) -> Self {
let provider = Provider::new(MockProvider::new());
let mut hasher = Sha256::new();
hasher.update(name.as_bytes());
let seed = hasher.finalize();
let mut rng = StdRng::from_seed(seed.into());
let wallet = Wallet::new(&mut rng);
let (result_sender, result_receiver) = crossbeam_channel::unbounded();
pub fn new(agent: &Agent<NotAttached>, environment: &Environment) -> Self {
let (event_sender, event_receiver) = crossbeam_channel::unbounded();
connection
environment
.event_broadcaster
.lock()
.unwrap()
.add_sender(event_sender);
Self {
provider,
connection,
wallet,
let tx_sender = environment.tx_sender.clone();
let (result_sender, result_receiver) = crossbeam_channel::unbounded();
let revm_provider = RevmProvider {
tx_sender,
result_sender,
result_receiver,
event_receiver,
}
};
let provider = Provider::new(revm_provider);
let mut hasher = Sha256::new();
hasher.update(agent.name.as_bytes());
let seed = hasher.finalize();
let mut rng = StdRng::from_seed(seed.into());
let wallet = Wallet::new(&mut rng);
Self { provider, wallet }
}
}

#[async_trait::async_trait]
impl Middleware for RevmMiddleware {
/// The JSON-RPC client type at the bottom of the stack
type Provider = MockProvider;
type Provider = RevmProvider;
/// Error type returned by most operations
type Error = ProviderError; //RevmMiddlewareError;
/// The next-lower middleware in the middleware stack
Expand Down Expand Up @@ -106,11 +110,16 @@ impl Middleware for RevmMiddleware {
nonce: None,
access_list: Vec::new(),
};
self.connection
self.provider
.as_ref()
.tx_sender
.send((true, tx_env.clone(), self.result_sender.clone()))
.send((
true,
tx_env.clone(),
self.provider.as_ref().result_sender.clone(),
))
.unwrap();
let result = self.result_receiver.recv().unwrap();
let result = self.provider.as_ref().result_receiver.recv().unwrap();
let (output, revm_logs) = match result.clone() {
ExecutionResult::Success { output, logs, .. } => (output, logs),
ExecutionResult::Revert { output, .. } => panic!("Failed due to revert: {:?}", output),
Expand Down Expand Up @@ -165,11 +174,16 @@ impl Middleware for RevmMiddleware {
access_list: Vec::new(),
};
// TODO: Modify this to work for calls/deploys
self.connection
self.provider
.as_ref()
.tx_sender
.send((false, tx_env.clone(), self.result_sender.clone()))
.send((
false,
tx_env.clone(),
self.provider.as_ref().result_sender.clone(),
))
.unwrap();
let result = self.result_receiver.recv().unwrap();
let result = self.provider.as_ref().result_receiver.recv().unwrap();
let output = match result.clone() {
ExecutionResult::Success { output, .. } => output,
ExecutionResult::Revert { output, .. } => panic!("Failed due to revert: {:?}", output),
Expand All @@ -189,11 +203,13 @@ impl Middleware for RevmMiddleware {
todo!("we should be able to get logs.")
}


// NOTES: It might be good to have individual channels for the EVM to send events to so that an agent can install a filter and the logs can be filtered by the EVM itself.
// This could be handled similarly to how broadcasts are done now and maybe nothing there needs to change except for attaching a filter to the event channels.
// It would be good to also pass to a separate thread to do broadcasting if we aren't already doing that so that the EVM can process while events are being sent out.
async fn new_filter(&self, filter: FilterKind<'_>) -> Result<ethers::types::U256, ProviderError> {
async fn new_filter(
&self,
filter: FilterKind<'_>,
) -> Result<ethers::types::U256, ProviderError> {
todo!()
// let (method, args) = match filter {
// FilterKind::NewBlocks => unimplemented!("We will need to implement this."),
Expand Down
Loading

0 comments on commit 7605f42

Please sign in to comment.