Skip to content

Commit

Permalink
skip handled event
Browse files Browse the repository at this point in the history
  • Loading branch information
nanne007 committed Sep 25, 2023
1 parent e023c42 commit 44346bc
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 13 deletions.
4 changes: 2 additions & 2 deletions crates/aptos-events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ impl AggerQueries {
}
}

pub fn get_query_stream(self) -> impl Stream<Item = AptosResult<UserQuery>> {
pub fn get_query_stream(self, start: u64) -> impl Stream<Item = AptosResult<UserQuery>> {
stream! {
let mut cur = 0;
let mut cur = start;
loop {
let event = self.get_event(cur).await;
match event {
Expand Down
12 changes: 9 additions & 3 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use agger_node::{open_db, proof_responder::ProofResponder};
use agger_prove_dispatcher::{ProveTask, ProvingTaskDispatcher};
use agger_storage::{UserQueryKey, UserQuerySchema, UserQueryValue};
use agger_storage::{AggerStore, UserQueryKey, UserQuerySchema, UserQueryValue};
use aptos_events::{AggerQueries, AptosAccountAddress, AptosBaseUrl};
use clap::Parser;
use futures_util::{pin_mut, StreamExt, TryFutureExt, TryStreamExt};
Expand Down Expand Up @@ -66,7 +66,8 @@ async fn run_server(
agger_address: AptosAccountAddress,
store_path: PathBuf,
) -> anyhow::Result<()> {
let store = Arc::new(open_db(&store_path)?);
let store = open_db(&store_path)?;
let store = Arc::new(AggerStore::new(store));
let proof_responder = ProofResponder::new(store.clone());

let prover_threads = threadpool::Builder::new()
Expand All @@ -82,9 +83,14 @@ async fn run_server(

let event_manager = AggerQueries::new(parse_aptos_url(&aptos_rpc)?, agger_address);

//skip proved event
let query_event_from = store
.last_proved_event_number()?
.map(|x| x + 1)
.unwrap_or(0);
let new_query_event_stream = event_manager
.clone()
.get_query_stream()
.get_query_stream(query_event_from)
.map_err(anyhow::Error::new)
.and_then(|s| {
query_function_resolver
Expand Down
6 changes: 3 additions & 3 deletions crates/node/src/proof_responder.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use agger_contract_types::UserQuery;
use agger_storage::{schemadb::DB, UserQueryProofSchema};
use agger_storage::{AggerStore, UserQueryProofSchema};
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;

/// Responder read proof from store or from message bus, and send it to chain.
pub struct ProofResponder {
db: Arc<DB>,
db: Arc<AggerStore>,
}

impl ProofResponder {
pub fn new(db: Arc<DB>) -> Self {
pub fn new(db: Arc<AggerStore>) -> Self {
Self { db }
}

Expand Down
2 changes: 1 addition & 1 deletion crates/prove-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn run_task(
param,
}: ProveTask,
) -> Result<Vec<u8>> {
let witness = witness(query.clone(), modules, config)?;
let witness = witness(query, modules, config)?;
prove(witness, param, vk)
}

Expand Down
4 changes: 2 additions & 2 deletions crates/prove-dispatcher/src/witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn witness(
let entry_module_address =
move_core_types::account_address::AccountAddress::from_bytes(&query.query.module_address)?;
let entry_module_name = Identifier::from_utf8(query.query.module_name.clone())?;
let entry_function_name = Identifier::from_utf8(query.query.function_name.clone())?;
let entry_function_name = Identifier::from_utf8(query.query.function_name)?;
let entry_module_id = ModuleId::new(entry_module_address, entry_module_name);
let traces = rt
.execute_entry_function(
Expand All @@ -59,7 +59,7 @@ pub fn witness(
if args.is_empty() {
None
} else {
Some(ScriptArguments::new(args.clone()))
Some(ScriptArguments::new(args))
},
&mut state,
)
Expand Down
2 changes: 1 addition & 1 deletion crates/query-module-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl AggerModuleResolver {

let mut reqs: Vec<_> = reqs
.iter()
.map(|req| self.client.view(&req, Some(version)))
.map(|req| self.client.view(req, Some(version)))
.collect();

let (param, vk, config) = try_join!(
Expand Down
36 changes: 35 additions & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,43 @@ use agger_contract_types::UserQuery;
pub use aptos_schemadb as schemadb;
use aptos_schemadb::{
schema::{KeyCodec, Schema, ValueCodec},
ColumnFamilyName,
ColumnFamilyName, ReadOptions, DB,
};
use serde::{Deserialize, Serialize};
use std::ops::Deref;

#[derive(Debug)]
pub struct AggerStore {
db: DB,
}

impl Deref for AggerStore {
type Target = DB;

fn deref(&self) -> &Self::Target {
&self.db
}
}

impl AggerStore {
pub fn new(db: DB) -> Self {
Self { db }
}
pub fn last_proved_event_number(&self) -> anyhow::Result<Option<u64>> {
let mut iters = self
.db
.iter::<UserQueryProofSchema>(ReadOptions::default())?;
iters.seek_to_last();
let value = iters.next().transpose()?.map(|(k, _)| k.sequence_number);
Ok(value)
}
pub fn last_seen_event(&self) -> anyhow::Result<Option<UserQuery>> {
let mut iters = self.db.iter::<UserQuerySchema>(ReadOptions::default())?;
iters.seek_to_last();
let value = iters.next().transpose()?.map(|(_k, v)| v.query);
Ok(value)
}
}

pub const QUERY_COLUMN_FAMILY_NAME: &str = "queries";
pub const PROOF_COLUMN_FAMILY_NAME: &str = "proofs";
Expand Down

0 comments on commit 44346bc

Please sign in to comment.