Skip to content

Commit

Permalink
refactor: changes to dips to better support payment collection in ind…
Browse files Browse the repository at this point in the history
…exer-agent
  • Loading branch information
pcarranzav committed Feb 7, 2025
1 parent abcd7fe commit 845f6f3
Show file tree
Hide file tree
Showing 18 changed files with 558 additions and 106 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/dips/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum.workspace = true
build-info.workspace = true
thiserror.workspace = true
anyhow.workspace = true
alloy-rlp = "0.3.10"
Expand All @@ -15,6 +17,7 @@ prost-types.workspace = true
uuid.workspace = true
base64.workspace = true
tokio.workspace = true
sqlx.workspace = true

[build-dependencies]
tonic-build = { workspace = true }
2 changes: 0 additions & 2 deletions crates/dips/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ fn main() {
tonic_build::configure()
.out_dir("src/proto")
.include_file("indexer.rs")
.build_client(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/indexer.proto"], &["proto/"])
.expect("Failed to compile DIPs indexer RPC proto(s)");

tonic_build::configure()
.out_dir("src/proto")
.include_file("gateway.rs")
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/gateway.proto"], &["proto"])
.expect("Failed to compile DIPs gateway RPC proto(s)");
Expand Down
2 changes: 1 addition & 1 deletion crates/dips/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@graphprotocol/dips-proto",
"version": "0.1.0",
"version": "0.2.0",
"main": "generated/index.js",
"types": "generated/index.d.ts",
"files": [
Expand Down
2 changes: 1 addition & 1 deletion crates/dips/proto/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

package graphprotocol.gateway.dips;

service DipsService {
service DipperService {
/**
* Cancel an _indexing agreement_.
*
Expand Down
2 changes: 1 addition & 1 deletion crates/dips/proto/indexer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

package graphprotocol.indexer.dips;

service DipsService {
service IndexerDipsService {
/**
* Propose a new _indexing agreement_ to an _indexer_.
*
Expand Down
70 changes: 39 additions & 31 deletions crates/service/src/database/dips.rs → crates/dips/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::str::FromStr;

use axum::async_trait;
use build_info::chrono::{DateTime, Utc};
use indexer_dips::{
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};
use sqlx::{types::BigDecimal, PgPool};
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
use uuid::Uuid;

use crate::{
store::{AgreementStore, StoredIndexingAgreement},
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};

#[derive(Debug)]
pub struct PsqlAgreementStore {
pub pool: PgPool,
Expand All @@ -25,10 +27,7 @@ fn uint256_to_bigdecimal(value: &uint256, field: &str) -> Result<BigDecimal, Dip

#[async_trait]
impl AgreementStore for PsqlAgreementStore {
async fn get_by_id(
&self,
id: Uuid,
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError> {
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError> {
let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,)
.fetch_one(&self.pool)
.await;
Expand All @@ -41,8 +40,18 @@ impl AgreementStore for PsqlAgreementStore {

let signed = SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let metadata =
SubgraphIndexingVoucherMetadata::abi_decode(signed.voucher.metadata.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let cancelled = item.cancelled_at.is_some();
Ok(Some((signed, cancelled)))
Ok(Some(StoredIndexingAgreement {
voucher: signed,
metadata,
cancelled,
current_allocation_id: item.current_allocation_id,
last_allocation_id: item.last_allocation_id,
last_payment_collected_at: item.last_payment_collected_at,
}))
}
async fn create_agreement(
&self,
Expand Down Expand Up @@ -72,7 +81,7 @@ impl AgreementStore for PsqlAgreementStore {
let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into();
let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into();
sqlx::query!(
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)",
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)",
id,
agreement.signature.as_ref(),
bs,
Expand Down Expand Up @@ -126,15 +135,15 @@ pub(crate) mod test {
use std::sync::Arc;

use build_info::chrono::Duration;
use indexer_dips::{CancellationRequest, IndexingAgreementVoucher};
use sqlx::PgPool;
use thegraph_core::alloy::{
primitives::{ruint::aliases::U256, Address},
sol_types::{SolType, SolValue},
sol_types::SolValue,
};
use uuid::Uuid;

use super::*;
use crate::{CancellationRequest, IndexingAgreementVoucher};

#[sqlx::test(migrations = "../../migrations")]
async fn test_store_agreement(pool: PgPool) {
Expand Down Expand Up @@ -226,19 +235,15 @@ pub(crate) mod test {
.unwrap();

// Retrieve agreement
let (retrieved_signed_voucher, cancelled) = store.get_by_id(id).await.unwrap().unwrap();

let retrieved_voucher = &retrieved_signed_voucher.voucher;
let retrieved_metadata =
<indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(
retrieved_voucher.metadata.as_ref(),
true,
)
.unwrap();
let stored_agreement = store.get_by_id(id).await.unwrap().unwrap();

let retrieved_voucher = &stored_agreement.voucher;
let retrieved_metadata = stored_agreement.metadata;

// Verify retrieved agreement matches original
assert_eq!(retrieved_signed_voucher.signature, agreement.signature);
assert_eq!(retrieved_voucher.signature, agreement.signature);
assert_eq!(
retrieved_voucher.durationEpochs,
retrieved_voucher.voucher.durationEpochs,
agreement.voucher.durationEpochs
);
assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork);
Expand All @@ -247,26 +252,29 @@ pub(crate) mod test {
retrieved_metadata.subgraphDeploymentId,
metadata.subgraphDeploymentId
);
assert_eq!(retrieved_voucher.payer, agreement.voucher.payer);
assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient);
assert_eq!(retrieved_voucher.service, agreement.voucher.service);
assert_eq!(retrieved_voucher.voucher.payer, agreement.voucher.payer);
assert_eq!(
retrieved_voucher.voucher.recipient,
agreement.voucher.recipient
);
assert_eq!(retrieved_voucher.voucher.service, agreement.voucher.service);
assert_eq!(
retrieved_voucher.maxInitialAmount,
retrieved_voucher.voucher.maxInitialAmount,
agreement.voucher.maxInitialAmount
);
assert_eq!(
retrieved_voucher.maxOngoingAmountPerEpoch,
retrieved_voucher.voucher.maxOngoingAmountPerEpoch,
agreement.voucher.maxOngoingAmountPerEpoch
);
assert_eq!(
retrieved_voucher.maxEpochsPerCollection,
retrieved_voucher.voucher.maxEpochsPerCollection,
agreement.voucher.maxEpochsPerCollection
);
assert_eq!(
retrieved_voucher.minEpochsPerCollection,
retrieved_voucher.voucher.minEpochsPerCollection,
agreement.voucher.minEpochsPerCollection
);
assert!(!cancelled);
assert!(!stored_agreement.cancelled);
}

#[sqlx::test(migrations = "../../migrations")]
Expand Down
19 changes: 9 additions & 10 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use thegraph_core::alloy::{
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
};

pub mod database;
pub mod proto;
pub mod server;
pub mod store;
Expand Down Expand Up @@ -310,11 +311,11 @@ pub async fn validate_and_cancel_agreement(
decoded_request.request.agreement_id.into(),
))
.await?;
let (agreement, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
if cancelled {
let stored_agreement = result.ok_or(DipsError::AgreementNotFound)?;
if stored_agreement.cancelled {
return Err(DipsError::AgreementCancelled);
}
let expected_signer = agreement.voucher.payer;
let expected_signer = stored_agreement.voucher.voucher.payer;
let id = Uuid::from_bytes(decoded_request.request.agreement_id.into());
decoded_request.validate(domain, &expected_signer)?;

Expand Down Expand Up @@ -391,11 +392,10 @@ mod test {
.unwrap();
assert_eq!(actual_id, id);

let actual = store.get_by_id(actual_id).await.unwrap();
let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap();

let (actual_voucher, actual_cancelled) = actual.unwrap();
assert_eq!(voucher, actual_voucher);
assert!(!actual_cancelled);
assert_eq!(voucher, stored_agreement.voucher);
assert!(!stored_agreement.cancelled);
Ok(())
}

Expand Down Expand Up @@ -596,9 +596,8 @@ mod test {
assert_eq!(agreement_id, cancelled_id);

// Verify agreement is cancelled
let result = store.get_by_id(agreement_id).await?;
let (_, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
assert!(cancelled);
let stored_agreement = store.get_by_id(agreement_id).await?.unwrap();
assert!(stored_agreement.cancelled);

Ok(())
}
Expand Down
Loading

0 comments on commit 845f6f3

Please sign in to comment.