Skip to content

Commit cfb8358

Browse files
committed
refactor: changes to dips to better support payment collection in indexer-agent
1 parent abcd7fe commit cfb8358

18 files changed

+558
-105
lines changed

.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json renamed to .sqlx/query-5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/dips/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
axum.workspace = true
8+
build-info.workspace = true
79
thiserror.workspace = true
810
anyhow.workspace = true
911
alloy-rlp = "0.3.10"
@@ -15,6 +17,7 @@ prost-types.workspace = true
1517
uuid.workspace = true
1618
base64.workspace = true
1719
tokio.workspace = true
20+
sqlx.workspace = true
1821

1922
[build-dependencies]
2023
tonic-build = { workspace = true }

crates/dips/build.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ fn main() {
66
tonic_build::configure()
77
.out_dir("src/proto")
88
.include_file("indexer.rs")
9-
.build_client(false)
109
.protoc_arg("--experimental_allow_proto3_optional")
1110
.compile_protos(&["proto/indexer.proto"], &["proto/"])
1211
.expect("Failed to compile DIPs indexer RPC proto(s)");
1312

1413
tonic_build::configure()
1514
.out_dir("src/proto")
1615
.include_file("gateway.rs")
17-
.build_server(false)
1816
.protoc_arg("--experimental_allow_proto3_optional")
1917
.compile_protos(&["proto/gateway.proto"], &["proto"])
2018
.expect("Failed to compile DIPs gateway RPC proto(s)");

crates/dips/proto/gateway.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package graphprotocol.gateway.dips;
44

5-
service DipsService {
5+
service DipperService {
66
/**
77
* Cancel an _indexing agreement_.
88
*

crates/dips/proto/indexer.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package graphprotocol.indexer.dips;
44

5-
service DipsService {
5+
service IndexerDipsService {
66
/**
77
* Propose a new _indexing agreement_ to an _indexer_.
88
*

crates/service/src/database/dips.rs renamed to crates/dips/src/database.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ use std::str::FromStr;
55

66
use axum::async_trait;
77
use build_info::chrono::{DateTime, Utc};
8-
use indexer_dips::{
9-
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
10-
SubgraphIndexingVoucherMetadata,
11-
};
128
use sqlx::{types::BigDecimal, PgPool};
139
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
1410
use uuid::Uuid;
1511

12+
use crate::{
13+
store::{AgreementStore, StoredIndexingAgreement},
14+
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
15+
SubgraphIndexingVoucherMetadata,
16+
};
17+
1618
#[derive(Debug)]
1719
pub struct PsqlAgreementStore {
1820
pub pool: PgPool,
@@ -25,10 +27,7 @@ fn uint256_to_bigdecimal(value: &uint256, field: &str) -> Result<BigDecimal, Dip
2527

2628
#[async_trait]
2729
impl AgreementStore for PsqlAgreementStore {
28-
async fn get_by_id(
29-
&self,
30-
id: Uuid,
31-
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError> {
30+
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError> {
3231
let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,)
3332
.fetch_one(&self.pool)
3433
.await;
@@ -41,8 +40,18 @@ impl AgreementStore for PsqlAgreementStore {
4140

4241
let signed = SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
4342
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
43+
let metadata =
44+
SubgraphIndexingVoucherMetadata::abi_decode(signed.voucher.metadata.as_ref(), true)
45+
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
4446
let cancelled = item.cancelled_at.is_some();
45-
Ok(Some((signed, cancelled)))
47+
Ok(Some(StoredIndexingAgreement {
48+
voucher: signed,
49+
metadata,
50+
cancelled,
51+
current_allocation_id: item.current_allocation_id,
52+
last_allocation_id: item.last_allocation_id,
53+
last_payment_collected_at: item.last_payment_collected_at,
54+
}))
4655
}
4756
async fn create_agreement(
4857
&self,
@@ -72,7 +81,7 @@ impl AgreementStore for PsqlAgreementStore {
7281
let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into();
7382
let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into();
7483
sqlx::query!(
75-
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)",
84+
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)",
7685
id,
7786
agreement.signature.as_ref(),
7887
bs,
@@ -126,15 +135,15 @@ pub(crate) mod test {
126135
use std::sync::Arc;
127136

128137
use build_info::chrono::Duration;
129-
use indexer_dips::{CancellationRequest, IndexingAgreementVoucher};
130138
use sqlx::PgPool;
131139
use thegraph_core::alloy::{
132140
primitives::{ruint::aliases::U256, Address},
133-
sol_types::{SolType, SolValue},
141+
sol_types::SolValue,
134142
};
135143
use uuid::Uuid;
136144

137145
use super::*;
146+
use crate::{CancellationRequest, IndexingAgreementVoucher};
138147

139148
#[sqlx::test(migrations = "../../migrations")]
140149
async fn test_store_agreement(pool: PgPool) {
@@ -226,19 +235,15 @@ pub(crate) mod test {
226235
.unwrap();
227236

228237
// Retrieve agreement
229-
let (retrieved_signed_voucher, cancelled) = store.get_by_id(id).await.unwrap().unwrap();
230-
231-
let retrieved_voucher = &retrieved_signed_voucher.voucher;
232-
let retrieved_metadata =
233-
<indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(
234-
retrieved_voucher.metadata.as_ref(),
235-
true,
236-
)
237-
.unwrap();
238+
let stored_agreement = store.get_by_id(id).await.unwrap().unwrap();
239+
240+
let retrieved_voucher = &stored_agreement.voucher;
241+
let retrieved_metadata = stored_agreement.metadata;
242+
238243
// Verify retrieved agreement matches original
239-
assert_eq!(retrieved_signed_voucher.signature, agreement.signature);
244+
assert_eq!(retrieved_voucher.signature, agreement.signature);
240245
assert_eq!(
241-
retrieved_voucher.durationEpochs,
246+
retrieved_voucher.voucher.durationEpochs,
242247
agreement.voucher.durationEpochs
243248
);
244249
assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork);
@@ -247,26 +252,29 @@ pub(crate) mod test {
247252
retrieved_metadata.subgraphDeploymentId,
248253
metadata.subgraphDeploymentId
249254
);
250-
assert_eq!(retrieved_voucher.payer, agreement.voucher.payer);
251-
assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient);
252-
assert_eq!(retrieved_voucher.service, agreement.voucher.service);
255+
assert_eq!(retrieved_voucher.voucher.payer, agreement.voucher.payer);
256+
assert_eq!(
257+
retrieved_voucher.voucher.recipient,
258+
agreement.voucher.recipient
259+
);
260+
assert_eq!(retrieved_voucher.voucher.service, agreement.voucher.service);
253261
assert_eq!(
254-
retrieved_voucher.maxInitialAmount,
262+
retrieved_voucher.voucher.maxInitialAmount,
255263
agreement.voucher.maxInitialAmount
256264
);
257265
assert_eq!(
258-
retrieved_voucher.maxOngoingAmountPerEpoch,
266+
retrieved_voucher.voucher.maxOngoingAmountPerEpoch,
259267
agreement.voucher.maxOngoingAmountPerEpoch
260268
);
261269
assert_eq!(
262-
retrieved_voucher.maxEpochsPerCollection,
270+
retrieved_voucher.voucher.maxEpochsPerCollection,
263271
agreement.voucher.maxEpochsPerCollection
264272
);
265273
assert_eq!(
266-
retrieved_voucher.minEpochsPerCollection,
274+
retrieved_voucher.voucher.minEpochsPerCollection,
267275
agreement.voucher.minEpochsPerCollection
268276
);
269-
assert!(!cancelled);
277+
assert!(!stored_agreement.cancelled);
270278
}
271279

272280
#[sqlx::test(migrations = "../../migrations")]

crates/dips/src/lib.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use thegraph_core::alloy::{
1111
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
1212
};
1313

14+
pub mod database;
1415
pub mod proto;
1516
pub mod server;
1617
pub mod store;
@@ -310,11 +311,11 @@ pub async fn validate_and_cancel_agreement(
310311
decoded_request.request.agreement_id.into(),
311312
))
312313
.await?;
313-
let (agreement, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
314-
if cancelled {
314+
let stored_agreement = result.ok_or(DipsError::AgreementNotFound)?;
315+
if stored_agreement.cancelled {
315316
return Err(DipsError::AgreementCancelled);
316317
}
317-
let expected_signer = agreement.voucher.payer;
318+
let expected_signer = stored_agreement.voucher.voucher.payer;
318319
let id = Uuid::from_bytes(decoded_request.request.agreement_id.into());
319320
decoded_request.validate(domain, &expected_signer)?;
320321

@@ -391,11 +392,10 @@ mod test {
391392
.unwrap();
392393
assert_eq!(actual_id, id);
393394

394-
let actual = store.get_by_id(actual_id).await.unwrap();
395+
let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap();
395396

396-
let (actual_voucher, actual_cancelled) = actual.unwrap();
397-
assert_eq!(voucher, actual_voucher);
398-
assert!(!actual_cancelled);
397+
assert_eq!(voucher, stored_agreement.voucher);
398+
assert!(!stored_agreement.cancelled);
399399
Ok(())
400400
}
401401

@@ -596,9 +596,8 @@ mod test {
596596
assert_eq!(agreement_id, cancelled_id);
597597

598598
// Verify agreement is cancelled
599-
let result = store.get_by_id(agreement_id).await?;
600-
let (_, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
601-
assert!(cancelled);
599+
let stored_agreement = store.get_by_id(agreement_id).await?.unwrap();
600+
assert!(stored_agreement.cancelled);
602601

603602
Ok(())
604603
}

0 commit comments

Comments
 (0)