Skip to content

Commit e8d6035

Browse files
committed
fix: no need for collection in tap-agent after all
1 parent c008d3e commit e8d6035

File tree

13 files changed

+9
-141
lines changed

13 files changed

+9
-141
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/config/maximal-config-example.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,3 @@ max_receipts_per_request = 10000
153153
host = "0.0.0.0"
154154
port = "7601"
155155
allowed_payers = ["0x3333333333333333333333333333333333333333"]
156-
dipper_grpc_url = "https://dipper.thegraph.com/"

crates/config/src/config.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,6 @@ pub struct DipsConfig {
390390
pub host: String,
391391
pub port: String,
392392
pub allowed_payers: Vec<Address>,
393-
pub dipper_grpc_url: String,
394393
}
395394

396395
impl Default for DipsConfig {
@@ -399,7 +398,6 @@ impl Default for DipsConfig {
399398
host: "0.0.0.0".to_string(),
400399
port: "7601".to_string(),
401400
allowed_payers: vec![],
402-
dipper_grpc_url: "".to_string(),
403401
}
404402
}
405403
}
@@ -464,7 +462,6 @@ mod tests {
464462
allowed_payers: vec![Address(
465463
FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(),
466464
)],
467-
dipper_grpc_url: "https://dipper.thegraph.com/".to_string(),
468465
..Default::default()
469466
});
470467

crates/dips/src/database.rs

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,49 +50,9 @@ impl AgreementStore for PsqlAgreementStore {
5050
cancelled,
5151
current_allocation_id: item.current_allocation_id,
5252
last_allocation_id: item.last_allocation_id,
53+
last_payment_collected_at: item.last_payment_collected_at,
5354
}))
5455
}
55-
async fn get_by_last_allocation_id(
56-
&self,
57-
allocation_id: String,
58-
) -> Result<Vec<StoredIndexingAgreement>, DipsError> {
59-
let items = sqlx::query!(
60-
"SELECT * FROM indexing_agreements WHERE last_allocation_id=$1",
61-
allocation_id,
62-
)
63-
.fetch_all(&self.pool)
64-
.await;
65-
66-
let items = match items {
67-
Ok(items) => items,
68-
Err(sqlx::Error::RowNotFound) => return Ok(vec![]),
69-
Err(err) => return Err(DipsError::UnknownError(err.into())),
70-
};
71-
72-
// Note: we discard any agreements that fail to decode
73-
let agreements = items
74-
.into_iter()
75-
.map(|item| {
76-
let signed =
77-
SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
78-
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
79-
let metadata = SubgraphIndexingVoucherMetadata::abi_decode(
80-
signed.voucher.metadata.as_ref(),
81-
true,
82-
)
83-
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
84-
Ok(StoredIndexingAgreement {
85-
voucher: signed,
86-
metadata,
87-
cancelled: item.cancelled_at.is_some(),
88-
current_allocation_id: item.current_allocation_id,
89-
last_allocation_id: item.last_allocation_id,
90-
})
91-
})
92-
.filter_map(|agreement: Result<StoredIndexingAgreement, DipsError>| agreement.ok())
93-
.collect();
94-
Ok(agreements)
95-
}
9656
async fn create_agreement(
9757
&self,
9858
agreement: SignedIndexingAgreementVoucher,
@@ -121,7 +81,7 @@ impl AgreementStore for PsqlAgreementStore {
12181
let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into();
12282
let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into();
12383
sqlx::query!(
124-
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null)",
84+
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)",
12585
id,
12686
agreement.signature.as_ref(),
12787
bs,

crates/dips/src/lib.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ pub mod store;
2020

2121
use store::AgreementStore;
2222
use thiserror::Error;
23-
use tonic::transport::Channel;
2423
use uuid::Uuid;
2524

2625
/// The Arbitrum One (mainnet) chain ID (eip155).
@@ -153,8 +152,6 @@ pub enum DipsError {
153152
AgreementCancelled,
154153
#[error("invalid voucher: {0}")]
155154
InvalidVoucher(String),
156-
#[error("connection error: {0}")]
157-
ConnectionError(#[from] tonic::transport::Error),
158155
}
159156

160157
// TODO: send back messages
@@ -329,25 +326,6 @@ pub async fn validate_and_cancel_agreement(
329326
Ok(id)
330327
}
331328

332-
pub async fn collect_indexing_agreements(
333-
store: Arc<dyn AgreementStore>,
334-
allocation_id: Address,
335-
_dipper_client: &DipperServiceClient<Channel>,
336-
) -> Result<(), DipsError> {
337-
let agreements = store
338-
.get_by_last_allocation_id(allocation_id.encode_hex())
339-
.await?;
340-
341-
for _agreement in agreements {
342-
// TODO get the entities count for the deployment from graph node
343-
// TODO create collection request
344-
// TODO sign collection request
345-
// TODO send collection request to the dipper via grpc
346-
}
347-
348-
Ok(())
349-
}
350-
351329
#[cfg(test)]
352330
mod test {
353331
use std::{

crates/dips/src/store.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::collections::HashMap;
55

66
use async_trait::async_trait;
7+
use build_info::chrono::{DateTime, Utc};
78
use uuid::Uuid;
89

910
use crate::{
@@ -18,15 +19,12 @@ pub struct StoredIndexingAgreement {
1819
pub cancelled: bool,
1920
pub current_allocation_id: Option<String>,
2021
pub last_allocation_id: Option<String>,
22+
pub last_payment_collected_at: Option<DateTime<Utc>>,
2123
}
2224

2325
#[async_trait]
2426
pub trait AgreementStore: Sync + Send + std::fmt::Debug {
2527
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError>;
26-
async fn get_by_last_allocation_id(
27-
&self,
28-
allocation_id: String,
29-
) -> Result<Vec<StoredIndexingAgreement>, DipsError>;
3028
async fn create_agreement(
3129
&self,
3230
agreement: SignedIndexingAgreementVoucher,
@@ -53,19 +51,6 @@ impl AgreementStore for InMemoryAgreementStore {
5351
.get(&id)
5452
.cloned())
5553
}
56-
async fn get_by_last_allocation_id(
57-
&self,
58-
allocation_id: String,
59-
) -> Result<Vec<StoredIndexingAgreement>, DipsError> {
60-
Ok(self
61-
.data
62-
.try_read()
63-
.map_err(|e| DipsError::UnknownError(e.into()))?
64-
.values()
65-
.filter(|agreement| agreement.last_allocation_id == Some(allocation_id.clone()))
66-
.cloned()
67-
.collect())
68-
}
6954
async fn create_agreement(
7055
&self,
7156
agreement: SignedIndexingAgreementVoucher,
@@ -78,6 +63,7 @@ impl AgreementStore for InMemoryAgreementStore {
7863
cancelled: false,
7964
current_allocation_id: None,
8065
last_allocation_id: None,
66+
last_payment_collected_at: None,
8167
};
8268
self.data
8369
.try_write()

crates/query/graphql/closed_allocations.query.graphql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ query ClosedAllocations(
1919
}
2020
) {
2121
id
22+
poi
2223
}
2324
}

crates/service/src/service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ pub async fn run() -> anyhow::Result<()> {
124124
host,
125125
port,
126126
allowed_payers,
127-
dipper_grpc_url: _,
128127
} = dips;
129128

130129
let addr = format!("{}:{}", host, port)

crates/tap-agent/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ indexer-monitor = { path = "../monitor" }
1616
indexer-watcher = { path = "../watcher" }
1717
indexer-allocation = { path = "../allocation" }
1818
indexer-config = { path = "../config" }
19-
indexer-dips = { path = "../dips" }
2019
indexer-query = { path = "../query" }
2120
indexer-receipt = { path = "../indexer-receipt" }
2221
anyhow.workspace = true

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,12 @@
44
use std::{
55
collections::{HashMap, HashSet},
66
str::FromStr,
7-
sync::Arc,
87
time::Duration,
98
};
109

1110
use anyhow::Context;
1211
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
1312
use futures::{stream, StreamExt};
14-
use indexer_dips::{
15-
collect_indexing_agreements, database::PsqlAgreementStore,
16-
proto::gateway::graphprotocol::gateway::dips::dipper_service_client::DipperServiceClient,
17-
store::AgreementStore, DipsError,
18-
};
1913
use indexer_monitor::{EscrowAccounts, SubgraphClient};
2014
use indexer_query::{
2115
closed_allocations::{self, ClosedAllocations},
@@ -53,6 +47,7 @@ use crate::{
5347
tap::context::{Horizon, Legacy},
5448
tracker::{SenderFeeTracker, SimpleFeeTracker},
5549
};
50+
5651
lazy_static! {
5752
static ref SENDER_DENIED: IntGaugeVec =
5853
register_int_gauge_vec!("tap_sender_denied", "Sender is denied", &["sender"]).unwrap();
@@ -324,10 +319,6 @@ pub struct State {
324319

325320
// Config forwarded to [SenderAllocation]
326321
config: &'static SenderAccountConfig,
327-
/// Indexing agreement store
328-
agreement_store: Arc<dyn AgreementStore>,
329-
/// Client for the dipper grpc server
330-
dipper_client: Option<DipperServiceClient<Channel>>,
331322
}
332323

333324
/// Configuration derived from config.toml
@@ -352,8 +343,6 @@ pub struct SenderAccountConfig {
352343
///
353344
/// This is reached if the database is too slow
354345
pub tap_sender_timeout: Duration,
355-
/// URL of the dipper grpc server
356-
pub dipper_grpc_url: Option<String>,
357346
}
358347

359348
impl SenderAccountConfig {
@@ -368,10 +357,6 @@ impl SenderAccountConfig {
368357
trigger_value: config.tap.get_trigger_value(),
369358
rav_request_timeout: config.tap.rav_request.request_timeout_secs,
370359
tap_sender_timeout: config.tap.sender_timeout_secs,
371-
dipper_grpc_url: config
372-
.dips
373-
.as_ref()
374-
.map(|dips_config| dips_config.dipper_grpc_url.clone()),
375360
}
376361
}
377362
}
@@ -836,19 +821,6 @@ impl Actor for SenderAccount {
836821
// wiremock_grpc used for tests doesn't support Zstd compression
837822
#[cfg(not(test))]
838823
let aggregator_v2 = aggregator_v2.send_compressed(tonic::codec::CompressionEncoding::Zstd);
839-
840-
let agreement_store = Arc::new(PsqlAgreementStore {
841-
pool: pgpool.clone(),
842-
});
843-
844-
let dipper_client = if let Some(ref url) = config.dipper_grpc_url {
845-
let endpoint = tonic::transport::Endpoint::from_str(url)
846-
.map_err(DipsError::ConnectionError)?
847-
.connect_lazy();
848-
Some(DipperServiceClient::new(endpoint))
849-
} else {
850-
None
851-
};
852824
let state = State {
853825
prefix,
854826
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
@@ -870,8 +842,6 @@ impl Actor for SenderAccount {
870842
aggregator_v2,
871843
backoff_info: BackoffInfo::default(),
872844
config,
873-
agreement_store,
874-
dipper_client,
875845
};
876846

877847
stream::iter(allocation_ids)
@@ -1083,24 +1053,6 @@ impl Actor for SenderAccount {
10831053
if let Some(sender_handle) = ActorRef::<SenderAllocationMessage>::where_is(
10841054
state.format_sender_allocation(&allocation_id.address()),
10851055
) {
1086-
if let Some(ref dipper_client) = state.dipper_client {
1087-
tracing::trace!(%allocation_id, "SenderAccount checking for indexing agreements to collect");
1088-
match collect_indexing_agreements(
1089-
state.agreement_store.clone(),
1090-
allocation_id.address(),
1091-
dipper_client,
1092-
)
1093-
.await
1094-
{
1095-
Ok(_) => {
1096-
tracing::info!(%allocation_id, "Indexing agreements collected")
1097-
}
1098-
Err(e) => {
1099-
tracing::warn!(%allocation_id, error = %e, "Error collecting indexing agreements")
1100-
}
1101-
}
1102-
}
1103-
11041056
tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation");
11051057
// we can not send a rav request to this allocation
11061058
// because it's gonna trigger the last rav

crates/tap-agent/src/test.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
8181
indexer_address: INDEXER.1,
8282
escrow_polling_interval: ESCROW_POLLING_INTERVAL,
8383
tap_sender_timeout: Duration::from_secs(63),
84-
dipper_grpc_url: None,
8584
}))
8685
}
8786

@@ -111,7 +110,6 @@ pub async fn create_sender_account(
111110
indexer_address: INDEXER.1,
112111
escrow_polling_interval: Duration::default(),
113112
tap_sender_timeout: TAP_SENDER_TIMEOUT,
114-
dipper_grpc_url: None,
115113
}));
116114

117115
let network_subgraph = Box::leak(Box::new(

crates/tap-agent/tests/tap_agent_test.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ pub async fn start_agent(
8787
indexer_address: INDEXER_ADDRESS,
8888
escrow_polling_interval: Duration::from_secs(10),
8989
tap_sender_timeout: Duration::from_secs(30),
90-
dipper_grpc_url: None,
9190
}));
9291

9392
let args = SenderAccountsManagerArgs {

migrations/20241030141929_dips.up.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS indexing_agreements (
2929
signed_cancellation_payload BYTEA,
3030

3131
current_allocation_id CHAR(40),
32-
last_allocation_id CHAR(40)
32+
last_allocation_id CHAR(40),
33+
last_payment_collected_at TIMESTAMP WITH TIME ZONE
3334
);
3435

3536
CREATE UNIQUE INDEX IX_UNIQ_SIGNATURE_AGREEMENT on indexing_agreements(signature);

0 commit comments

Comments
 (0)