Skip to content

Commit

Permalink
Merge pull request #739 from primitivefinance/arbiter-engine/world-co…
Browse files Browse the repository at this point in the history
…nnection

arbiter-engine: worlds and messaging layer
  • Loading branch information
0xJepsen authored Dec 8, 2023
2 parents 59e7577 + 427743a commit 7b0ce75
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 38 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ proc-macro2 = { version = "=1.0.69" }
tokio = { version = "1.34.0", features = ["macros", "full"] }
arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "=0.5.8" }
futures-util = { version = "=0.3.29" }
async-trait = { version = "0.1.74" }

# Dependencies for the release build
[dependencies]
Expand Down
5 changes: 2 additions & 3 deletions arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ serde.workspace = true
serde_json.workspace = true

# Concurrency/async
# tokio = { version = "1.32.0", features = ["macros", "full"] }
tokio.workspace = true
async-trait = { version = "0.1.74" }
async-trait.workspace = true
crossbeam-channel = { version = "=0.5.8" }
futures-timer = { version = "=3.0.2" }
futures-locks = { version = "=0.7.1" }
Expand All @@ -37,7 +36,7 @@ statrs = { version = "=0.16.0" }
thiserror.workspace = true

# Logging
futures-util = { version = "=0.3.29" }
futures-util.workspace = true
tracing = "0.1.40"

# File types
Expand Down
14 changes: 14 additions & 0 deletions arbiter-core/src/middleware/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ pub struct Connection {
pub(crate) filter_receivers: Arc<Mutex<HashMap<ethers::types::U256, FilterReceiver>>>,
}

impl From<&Environment> for Connection {
fn from(environment: &Environment) -> Self {
let instruction_sender = &Arc::clone(&environment.socket.instruction_sender);
let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
Self {
instruction_sender: Arc::downgrade(instruction_sender),
outcome_sender,
outcome_receiver,
event_broadcaster: Arc::clone(&environment.socket.event_broadcaster),
filter_receivers: Arc::new(Mutex::new(HashMap::new())),
}
}
}

#[async_trait::async_trait]
impl JsonRpcClient for Connection {
type Error = ProviderError;
Expand Down
24 changes: 11 additions & 13 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ impl RevmMiddleware {
environment: &Environment,
seed_and_label: Option<&str>,
) -> Result<Arc<Self>, RevmMiddlewareError> {
let instruction_sender = &Arc::clone(&environment.socket.instruction_sender);
let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
let connection = Connection::from(environment);
let wallet = if let Some(seed) = seed_and_label {
let mut hasher = Sha256::new();
hasher.update(seed);
Expand All @@ -159,24 +158,23 @@ impl RevmMiddleware {
let mut rng = rand::thread_rng();
Wallet::new(&mut rng)
};
instruction_sender
connection
.instruction_sender
.upgrade()
.ok_or(errors::RevmMiddlewareError::Send(
"Environment is offline!".to_string(),
))?
.send(Instruction::AddAccount {
address: wallet.address(),
outcome_sender: outcome_sender.clone(),
outcome_sender: connection.outcome_sender.clone(),
})
.map_err(|e| RevmMiddlewareError::Send(e.to_string()))?;
outcome_receiver.recv()??;
connection.outcome_receiver.recv()??;

let connection = Connection {
instruction_sender: Arc::downgrade(instruction_sender),
outcome_sender,
outcome_receiver: outcome_receiver.clone(),
event_broadcaster: Arc::clone(&environment.socket.event_broadcaster),
filter_receivers: Arc::new(Mutex::new(HashMap::new())),
};
let provider = Provider::new(connection);
info!(
"Created new `RevmMiddleware` instance attached to environment labeled: {:?}",
"Created new `RevmMiddleware` instance attached to environment labeled:
{:?}",
environment.parameters.label
);
Ok(Arc::new(Self {
Expand Down
8 changes: 6 additions & 2 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git"}
crossbeam-channel.workspace = true

[dev-dependencies]
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
anyhow = { version = "=1.0.75" }
async-stream = "0.3.5"
67 changes: 47 additions & 20 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,38 @@
// NOTES: Each agent essentially has its own engine. We can collect all of the
// engines together into a world.
#![warn(missing_docs)]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TODO: Notes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// We may need traits for Events and Actions (e.g., "Event" and "Action"
// which have a method like "parse()" and "produce()" or something.).
// Need an init signal or something.
// We can give agents a "calculator" evm to send "Actions" to when they are just
// doing compute so they aren't blocking the main tx thread.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

// AGENT SHOULD BE A STRUCT WITH A STRATEGY

// CAN GIVE AGENT A CALCULATOR EVM TOO!

// Can probably use the MempoolExecutor from artemis
//! The agent module contains the core agent abstraction for the Arbiter Engine.
use artemis_core::{
engine::Engine,
types::{Collector, Executor},
};
use crossbeam_channel::{Receiver, Sender};

struct Instruction(String);

/// An agent is an entity capable of processing events and producing actions.
/// These are the core actors in simulations or in onchain systems.
/// Agents can be connected of other agents either as a dependent, or a
/// dependency.
pub struct Agent<E, A> {
engine: Engine<E, A>,
_dependencies: Vec<Receiver<Instruction>>,
_dependents: Vec<Sender<Instruction>>,
/// Identifier for this agent.
/// Used for routing messages.
_id: String,

/// The engine that this agent uses to process events and produce actions.
engine: Engine<E, A>, /* Note, agent shouldn't NEED a client as a field as the engine can
* handle this. */

/// Agents that this agent depends on.
dependencies: Vec<String>,

/// Agents that depend on this agent.
dependents: Vec<String>,
}

impl<E, A> Agent<E, A>
Expand All @@ -27,21 +41,37 @@ where
A: Send + Clone + 'static + std::fmt::Debug,
{
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
/// Produces a new agent with the given identifier.
pub fn new(id: &str) -> Self {
Self {
_id: id.to_owned(),
engine: Engine::new(),
_dependencies: vec![],
_dependents: vec![],
dependencies: vec![],
dependents: vec![],
}
}

/// Adds a collector to the agent's engine.
pub fn add_collector(&mut self, collector: impl Collector<E> + 'static) {
self.engine.add_collector(Box::new(collector));
}

/// Adds an executor to the agent's engine.
pub fn add_executor(&mut self, executor: impl Executor<A> + 'static) {
self.engine.add_executor(Box::new(executor));
}

/// Adds a dependency to the agent.
/// Dependencies are agents that this agent depends on.
pub fn add_dependency(&mut self, dependency: &str) {
self.dependencies.push(dependency.to_owned());
}

/// Adds a dependent to the agent.
/// Dependents are agents that depend on this agent.
pub fn add_dependent(&mut self, dependent: &str) {
self.dependents.push(dependent.to_owned());
}
}

#[cfg(test)]
Expand Down Expand Up @@ -71,7 +101,7 @@ mod tests {
.unwrap();

// Build the agent
let mut agent = Agent::new();
let mut agent = Agent::new("test");
let collector = LogCollector::new(client.clone(), arb.transfer_filter().filter);
agent.add_collector(collector);
let executor = MempoolExecutor::new(client.clone());
Expand All @@ -82,8 +112,5 @@ mod tests {
tx,
gas_bid_info: None,
};

// TODO: We should write a test that runs the agent's engine in some
// meaningful way.
}
}
115 changes: 115 additions & 0 deletions arbiter-engine/src/examples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#![warn(missing_docs)]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TODO: Notes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Create a BlockAdmin and a TokenAdmin.
// Potentially create an `Orchestrator`` that sends instructions to both
// BlockAdmin and TokenAdmin.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

//! The examples module contains example strategies.
use std::{collections::HashMap, sync::Arc};

use arbiter_bindings::bindings::arbiter_token::ArbiterToken;
use arbiter_core::middleware::RevmMiddleware;
use artemis_core::{
executors::mempool_executor::SubmitTxToMempool,
types::{Executor, Strategy},
};
use ethers::types::{Address, U256};

use super::*;
use crate::messager::Message;

/// A block executor that updates the block number and timestamp in the
/// database.
pub struct BlockExecutor {
client: Arc<RevmMiddleware>,
}

/// Used as an action to set new block number and timestamp.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NewBlock {
timestamp: u64,
number: u64,
}

// TODO: Consider replacing this with a cheatcode executor.
#[async_trait::async_trait]
impl Executor<NewBlock> for BlockExecutor {
async fn execute(&self, new_block: NewBlock) -> Result<()> {
let _receipt_data = self
.client
.update_block(new_block.number, new_block.timestamp)?;
Ok(())
}
}

// TODO: This may not be necessary in this way.
/// The block admin is responsible for sending new block events to the block
/// executor.
pub struct BlockAdmin {
/// The identifier of the block admin.
pub id: String, // TODO: The strategies should not really need an ID.
}

#[async_trait::async_trait]
impl Strategy<Message, NewBlock> for BlockAdmin {
async fn sync_state(&mut self) -> Result<()> {
Ok(())
}

async fn process_event(&mut self, event: Message) -> Vec<NewBlock> {
if event.to == self.id {
let new_block: NewBlock = serde_json::from_str(&event.data).unwrap();
vec![new_block]
} else {
vec![]
}
}
}

/// The token admin is responsible for handling token minting requests.
pub struct TokenAdmin {
/// The identifier of the token admin.
pub id: String, // TODO: The strategies should not really need an ID.

/// The tokens that the token admin has control over.
pub tokens: HashMap<String, ArbiterToken<RevmMiddleware>>,
}

/// Used as an action to mint tokens.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TokenRequest {
/// The token to mint.
pub token: String,

/// The address to mint to.
pub mint_to: Address,

/// The amount to mint.
pub mint_amount: u64,
}

#[async_trait::async_trait]
impl Strategy<Message, SubmitTxToMempool> for TokenAdmin {
async fn sync_state(&mut self) -> Result<()> {
Ok(())
}

async fn process_event(&mut self, event: Message) -> Vec<SubmitTxToMempool> {
if event.to == self.id {
let token_request: TokenRequest = serde_json::from_str(&event.data).unwrap();
let token = self.tokens.get(&token_request.token).unwrap();
let tx = SubmitTxToMempool {
tx: token
.mint(token_request.mint_to, U256::from(token_request.mint_amount))
.tx,
gas_bid_info: None,
};
vec![tx]
} else {
vec![]
}
}
}
8 changes: 8 additions & 0 deletions arbiter-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
use std::collections::HashMap;

use anyhow::Result;
use serde::{Deserialize, Serialize};

pub mod agent;
pub mod examples;
pub mod messager;
pub mod world;
Loading

0 comments on commit 7b0ce75

Please sign in to comment.