From 845f6f3466c7df9bf4b4db5d96ab76f6358d9084 Mon Sep 17 00:00:00 2001 From: Pablo Carranza Velez Date: Fri, 7 Feb 2025 10:32:25 -0300 Subject: [PATCH] refactor: changes to dips to better support payment collection in indexer-agent --- ...ddc1646e68d6db58124b6e0d780c037f73513.json | 12 + ...3850617919fa84bfaecb0518fd6d4ab70bec.json} | 4 +- ...81e457931b65eca471efed650b10d70d2282a.json | 12 + Cargo.lock | 3 + crates/dips/Cargo.toml | 3 + crates/dips/build.rs | 2 - crates/dips/package.json | 2 +- crates/dips/proto/gateway.proto | 2 +- crates/dips/proto/indexer.proto | 2 +- .../database/dips.rs => dips/src/database.rs} | 70 ++--- crates/dips/src/lib.rs | 19 +- .../src/proto/graphprotocol.gateway.dips.rs | 263 +++++++++++++++++- .../src/proto/graphprotocol.indexer.dips.rs | 194 +++++++++++-- crates/dips/src/server.rs | 7 +- crates/dips/src/store.rs | 49 ++-- crates/service/src/database/mod.rs | 1 - crates/service/src/service.rs | 15 +- migrations/20241030141929_dips.up.sql | 4 +- 18 files changed, 558 insertions(+), 106 deletions(-) rename .sqlx/{query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json => query-5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec.json} (86%) rename crates/{service/src/database/dips.rs => dips/src/database.rs} (86%) diff --git a/.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json b/.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json index efc3a3d0..96e66a3d 100644 --- a/.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json +++ b/.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json @@ -112,6 +112,16 @@ "ordinal": 21, "name": "current_allocation_id", "type_info": "Bpchar" + }, + { + "ordinal": 22, + "name": "last_allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 23, + "name": "last_payment_collected_at", + "type_info": "Timestamptz" } ], "parameters": { @@ -141,6 +151,8 @@ false, true, true, + true, + true, true ] }, diff --git a/.sqlx/query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json b/.sqlx/query-5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec.json similarity index 86% rename from .sqlx/query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json rename to .sqlx/query-5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec.json index b9a20515..7de223ff 100644 --- a/.sqlx/query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json +++ b/.sqlx/query-5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)", + "query": "INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)", "describe": { "columns": [], "parameters": { @@ -28,5 +28,5 @@ }, "nullable": [] }, - "hash": "cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa" + "hash": "5c973791bb27465b198eafbab7d63850617919fa84bfaecb0518fd6d4ab70bec" } diff --git a/.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json b/.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json index 5e94b7cb..0733b172 100644 --- a/.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json +++ b/.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json @@ -112,6 +112,16 @@ "ordinal": 21, "name": "current_allocation_id", "type_info": "Bpchar" + }, + { + "ordinal": 22, + "name": "last_allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 23, + "name": "last_payment_collected_at", + "type_info": "Timestamptz" } ], "parameters": { @@ -141,6 +151,8 @@ false, true, true, + true, + true, true ] }, diff --git a/Cargo.lock b/Cargo.lock index 9fb90bd0..8b3d2f93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3697,9 +3697,12 @@ dependencies = [ "alloy-rlp", "anyhow", "async-trait", + "axum", "base64 0.22.1", + "build-info", "prost", "prost-types", + "sqlx", "thegraph-core", "thiserror 1.0.69", "tokio", diff --git a/crates/dips/Cargo.toml b/crates/dips/Cargo.toml index 049fd23b..4323f35e 100644 --- a/crates/dips/Cargo.toml +++ b/crates/dips/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +axum.workspace = true +build-info.workspace = true thiserror.workspace = true anyhow.workspace = true alloy-rlp = "0.3.10" @@ -15,6 +17,7 @@ prost-types.workspace = true uuid.workspace = true base64.workspace = true tokio.workspace = true +sqlx.workspace = true [build-dependencies] tonic-build = { workspace = true } diff --git a/crates/dips/build.rs b/crates/dips/build.rs index ee392624..68c74e07 100644 --- a/crates/dips/build.rs +++ b/crates/dips/build.rs @@ -6,7 +6,6 @@ fn main() { tonic_build::configure() .out_dir("src/proto") .include_file("indexer.rs") - .build_client(false) .protoc_arg("--experimental_allow_proto3_optional") .compile_protos(&["proto/indexer.proto"], &["proto/"]) .expect("Failed to compile DIPs indexer RPC proto(s)"); @@ -14,7 +13,6 @@ fn main() { tonic_build::configure() .out_dir("src/proto") .include_file("gateway.rs") - .build_server(false) .protoc_arg("--experimental_allow_proto3_optional") .compile_protos(&["proto/gateway.proto"], &["proto"]) .expect("Failed to compile DIPs gateway RPC proto(s)"); diff --git a/crates/dips/package.json b/crates/dips/package.json index 82198ec1..43913f9d 100644 --- a/crates/dips/package.json +++ b/crates/dips/package.json @@ -1,6 +1,6 @@ { "name": "@graphprotocol/dips-proto", - "version": "0.1.0", + "version": "0.2.0", "main": "generated/index.js", "types": "generated/index.d.ts", "files": [ diff --git a/crates/dips/proto/gateway.proto b/crates/dips/proto/gateway.proto index f310b9ab..42b0f9f7 100644 --- a/crates/dips/proto/gateway.proto +++ b/crates/dips/proto/gateway.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package graphprotocol.gateway.dips; -service DipsService { +service DipperService { /** * Cancel an _indexing agreement_. * diff --git a/crates/dips/proto/indexer.proto b/crates/dips/proto/indexer.proto index c29b729e..dc97e82e 100644 --- a/crates/dips/proto/indexer.proto +++ b/crates/dips/proto/indexer.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package graphprotocol.indexer.dips; -service DipsService { +service IndexerDipsService { /** * Propose a new _indexing agreement_ to an _indexer_. * diff --git a/crates/service/src/database/dips.rs b/crates/dips/src/database.rs similarity index 86% rename from crates/service/src/database/dips.rs rename to crates/dips/src/database.rs index 2784a374..81174b8c 100644 --- a/crates/service/src/database/dips.rs +++ b/crates/dips/src/database.rs @@ -5,14 +5,16 @@ use std::str::FromStr; use axum::async_trait; use build_info::chrono::{DateTime, Utc}; -use indexer_dips::{ - store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher, - SubgraphIndexingVoucherMetadata, -}; use sqlx::{types::BigDecimal, PgPool}; use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType}; use uuid::Uuid; +use crate::{ + store::{AgreementStore, StoredIndexingAgreement}, + DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher, + SubgraphIndexingVoucherMetadata, +}; + #[derive(Debug)] pub struct PsqlAgreementStore { pub pool: PgPool, @@ -25,10 +27,7 @@ fn uint256_to_bigdecimal(value: &uint256, field: &str) -> Result Result, DipsError> { + async fn get_by_id(&self, id: Uuid) -> Result, DipsError> { let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,) .fetch_one(&self.pool) .await; @@ -41,8 +40,18 @@ impl AgreementStore for PsqlAgreementStore { let signed = SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true) .map_err(|e| DipsError::AbiDecoding(e.to_string()))?; + let metadata = + SubgraphIndexingVoucherMetadata::abi_decode(signed.voucher.metadata.as_ref(), true) + .map_err(|e| DipsError::AbiDecoding(e.to_string()))?; let cancelled = item.cancelled_at.is_some(); - Ok(Some((signed, cancelled))) + Ok(Some(StoredIndexingAgreement { + voucher: signed, + metadata, + cancelled, + current_allocation_id: item.current_allocation_id, + last_allocation_id: item.last_allocation_id, + last_payment_collected_at: item.last_payment_collected_at, + })) } async fn create_agreement( &self, @@ -72,7 +81,7 @@ impl AgreementStore for PsqlAgreementStore { let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into(); let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into(); sqlx::query!( - "INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)", + "INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)", id, agreement.signature.as_ref(), bs, @@ -126,15 +135,15 @@ pub(crate) mod test { use std::sync::Arc; use build_info::chrono::Duration; - use indexer_dips::{CancellationRequest, IndexingAgreementVoucher}; use sqlx::PgPool; use thegraph_core::alloy::{ primitives::{ruint::aliases::U256, Address}, - sol_types::{SolType, SolValue}, + sol_types::SolValue, }; use uuid::Uuid; use super::*; + use crate::{CancellationRequest, IndexingAgreementVoucher}; #[sqlx::test(migrations = "../../migrations")] async fn test_store_agreement(pool: PgPool) { @@ -226,19 +235,15 @@ pub(crate) mod test { .unwrap(); // Retrieve agreement - let (retrieved_signed_voucher, cancelled) = store.get_by_id(id).await.unwrap().unwrap(); - - let retrieved_voucher = &retrieved_signed_voucher.voucher; - let retrieved_metadata = - ::abi_decode( - retrieved_voucher.metadata.as_ref(), - true, - ) - .unwrap(); + let stored_agreement = store.get_by_id(id).await.unwrap().unwrap(); + + let retrieved_voucher = &stored_agreement.voucher; + let retrieved_metadata = stored_agreement.metadata; + // Verify retrieved agreement matches original - assert_eq!(retrieved_signed_voucher.signature, agreement.signature); + assert_eq!(retrieved_voucher.signature, agreement.signature); assert_eq!( - retrieved_voucher.durationEpochs, + retrieved_voucher.voucher.durationEpochs, agreement.voucher.durationEpochs ); assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork); @@ -247,26 +252,29 @@ pub(crate) mod test { retrieved_metadata.subgraphDeploymentId, metadata.subgraphDeploymentId ); - assert_eq!(retrieved_voucher.payer, agreement.voucher.payer); - assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient); - assert_eq!(retrieved_voucher.service, agreement.voucher.service); + assert_eq!(retrieved_voucher.voucher.payer, agreement.voucher.payer); + assert_eq!( + retrieved_voucher.voucher.recipient, + agreement.voucher.recipient + ); + assert_eq!(retrieved_voucher.voucher.service, agreement.voucher.service); assert_eq!( - retrieved_voucher.maxInitialAmount, + retrieved_voucher.voucher.maxInitialAmount, agreement.voucher.maxInitialAmount ); assert_eq!( - retrieved_voucher.maxOngoingAmountPerEpoch, + retrieved_voucher.voucher.maxOngoingAmountPerEpoch, agreement.voucher.maxOngoingAmountPerEpoch ); assert_eq!( - retrieved_voucher.maxEpochsPerCollection, + retrieved_voucher.voucher.maxEpochsPerCollection, agreement.voucher.maxEpochsPerCollection ); assert_eq!( - retrieved_voucher.minEpochsPerCollection, + retrieved_voucher.voucher.minEpochsPerCollection, agreement.voucher.minEpochsPerCollection ); - assert!(!cancelled); + assert!(!stored_agreement.cancelled); } #[sqlx::test(migrations = "../../migrations")] diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs index ea00ef06..8dd0666c 100644 --- a/crates/dips/src/lib.rs +++ b/crates/dips/src/lib.rs @@ -11,6 +11,7 @@ use thegraph_core::alloy::{ sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue}, }; +pub mod database; pub mod proto; pub mod server; pub mod store; @@ -310,11 +311,11 @@ pub async fn validate_and_cancel_agreement( decoded_request.request.agreement_id.into(), )) .await?; - let (agreement, cancelled) = result.ok_or(DipsError::AgreementNotFound)?; - if cancelled { + let stored_agreement = result.ok_or(DipsError::AgreementNotFound)?; + if stored_agreement.cancelled { return Err(DipsError::AgreementCancelled); } - let expected_signer = agreement.voucher.payer; + let expected_signer = stored_agreement.voucher.voucher.payer; let id = Uuid::from_bytes(decoded_request.request.agreement_id.into()); decoded_request.validate(domain, &expected_signer)?; @@ -391,11 +392,10 @@ mod test { .unwrap(); assert_eq!(actual_id, id); - let actual = store.get_by_id(actual_id).await.unwrap(); + let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap(); - let (actual_voucher, actual_cancelled) = actual.unwrap(); - assert_eq!(voucher, actual_voucher); - assert!(!actual_cancelled); + assert_eq!(voucher, stored_agreement.voucher); + assert!(!stored_agreement.cancelled); Ok(()) } @@ -596,9 +596,8 @@ mod test { assert_eq!(agreement_id, cancelled_id); // Verify agreement is cancelled - let result = store.get_by_id(agreement_id).await?; - let (_, cancelled) = result.ok_or(DipsError::AgreementNotFound)?; - assert!(cancelled); + let stored_agreement = store.get_by_id(agreement_id).await?.unwrap(); + assert!(stored_agreement.cancelled); Ok(()) } diff --git a/crates/dips/src/proto/graphprotocol.gateway.dips.rs b/crates/dips/src/proto/graphprotocol.gateway.dips.rs index e7bccb44..7c543988 100644 --- a/crates/dips/src/proto/graphprotocol.gateway.dips.rs +++ b/crates/dips/src/proto/graphprotocol.gateway.dips.rs @@ -86,7 +86,7 @@ impl CollectPaymentStatus { } } /// Generated client implementations. -pub mod dips_service_client { +pub mod dipper_service_client { #![allow( unused_variables, dead_code, @@ -97,10 +97,10 @@ pub mod dips_service_client { use tonic::codegen::*; use tonic::codegen::http::Uri; #[derive(Debug, Clone)] - pub struct DipsServiceClient { + pub struct DipperServiceClient { inner: tonic::client::Grpc, } - impl DipsServiceClient { + impl DipperServiceClient { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where @@ -111,7 +111,7 @@ pub mod dips_service_client { Ok(Self::new(conn)) } } - impl DipsServiceClient + impl DipperServiceClient where T: tonic::client::GrpcService, T::Error: Into, @@ -129,7 +129,7 @@ pub mod dips_service_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> DipsServiceClient> + ) -> DipperServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -143,7 +143,7 @@ pub mod dips_service_client { http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { - DipsServiceClient::new(InterceptedService::new(inner, interceptor)) + DipperServiceClient::new(InterceptedService::new(inner, interceptor)) } /// Compress requests with the given encoding. /// @@ -198,13 +198,13 @@ pub mod dips_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.gateway.dips.DipsService/CancelAgreement", + "/graphprotocol.gateway.dips.DipperService/CancelAgreement", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "graphprotocol.gateway.dips.DipsService", + "graphprotocol.gateway.dips.DipperService", "CancelAgreement", ), ); @@ -232,13 +232,13 @@ pub mod dips_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.gateway.dips.DipsService/CollectPayment", + "/graphprotocol.gateway.dips.DipperService/CollectPayment", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "graphprotocol.gateway.dips.DipsService", + "graphprotocol.gateway.dips.DipperService", "CollectPayment", ), ); @@ -246,3 +246,246 @@ pub mod dips_service_client { } } } +/// Generated server implementations. +pub mod dipper_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with DipperServiceServer. + #[async_trait] + pub trait DipperService: std::marker::Send + std::marker::Sync + 'static { + /// * + /// Cancel an _indexing agreement_. + /// + /// This method allows the indexer to notify the DIPs gateway that the agreement + /// should be canceled. + async fn cancel_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// * + /// Collect payment for an _indexing agreement_. + /// + /// This method allows the indexer to report the work completed to the DIPs gateway + /// and receive payment for the indexing work done. + async fn collect_payment( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct DipperServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl DipperServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for DipperServiceServer + where + T: DipperService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/graphprotocol.gateway.dips.DipperService/CancelAgreement" => { + #[allow(non_camel_case_types)] + struct CancelAgreementSvc(pub Arc); + impl< + T: DipperService, + > tonic::server::UnaryService + for CancelAgreementSvc { + type Response = super::CancelAgreementResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::cancel_agreement(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CancelAgreementSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.gateway.dips.DipperService/CollectPayment" => { + #[allow(non_camel_case_types)] + struct CollectPaymentSvc(pub Arc); + impl< + T: DipperService, + > tonic::server::UnaryService + for CollectPaymentSvc { + type Response = super::CollectPaymentResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::collect_payment(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CollectPaymentSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for DipperServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "graphprotocol.gateway.dips.DipperService"; + impl tonic::server::NamedService for DipperServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/crates/dips/src/proto/graphprotocol.indexer.dips.rs b/crates/dips/src/proto/graphprotocol.indexer.dips.rs index d4dd2007..455f4239 100644 --- a/crates/dips/src/proto/graphprotocol.indexer.dips.rs +++ b/crates/dips/src/proto/graphprotocol.indexer.dips.rs @@ -71,8 +71,165 @@ impl ProposalResponse { } } } +/// Generated client implementations. +pub mod indexer_dips_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct IndexerDipsServiceClient { + inner: tonic::client::Grpc, + } + impl IndexerDipsServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl IndexerDipsServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> IndexerDipsServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + IndexerDipsServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// * + /// Propose a new _indexing agreement_ to an _indexer_. + /// + /// The _indexer_ can `ACCEPT` or `REJECT` the agreement. + pub async fn submit_agreement_proposal( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.IndexerDipsService/SubmitAgreementProposal", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.IndexerDipsService", + "SubmitAgreementProposal", + ), + ); + self.inner.unary(req, path, codec).await + } + /// * + /// Request to cancel an existing _indexing agreement_. + pub async fn cancel_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.IndexerDipsService/CancelAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.IndexerDipsService", + "CancelAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} /// Generated server implementations. -pub mod dips_service_server { +pub mod indexer_dips_service_server { #![allow( unused_variables, dead_code, @@ -81,9 +238,9 @@ pub mod dips_service_server { clippy::let_unit_value, )] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with DipsServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with IndexerDipsServiceServer. #[async_trait] - pub trait DipsService: std::marker::Send + std::marker::Sync + 'static { + pub trait IndexerDipsService: std::marker::Send + std::marker::Sync + 'static { /// * /// Propose a new _indexing agreement_ to an _indexer_. /// @@ -106,14 +263,14 @@ pub mod dips_service_server { >; } #[derive(Debug)] - pub struct DipsServiceServer { + pub struct IndexerDipsServiceServer { inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - impl DipsServiceServer { + impl IndexerDipsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -164,9 +321,9 @@ pub mod dips_service_server { self } } - impl tonic::codegen::Service> for DipsServiceServer + impl tonic::codegen::Service> for IndexerDipsServiceServer where - T: DipsService, + T: IndexerDipsService, B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { @@ -181,11 +338,11 @@ pub mod dips_service_server { } fn call(&mut self, req: http::Request) -> Self::Future { match req.uri().path() { - "/graphprotocol.indexer.dips.DipsService/SubmitAgreementProposal" => { + "/graphprotocol.indexer.dips.IndexerDipsService/SubmitAgreementProposal" => { #[allow(non_camel_case_types)] - struct SubmitAgreementProposalSvc(pub Arc); + struct SubmitAgreementProposalSvc(pub Arc); impl< - T: DipsService, + T: IndexerDipsService, > tonic::server::UnaryService for SubmitAgreementProposalSvc { type Response = super::SubmitAgreementProposalResponse; @@ -201,7 +358,7 @@ pub mod dips_service_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::submit_agreement_proposal( + ::submit_agreement_proposal( &inner, request, ) @@ -232,11 +389,11 @@ pub mod dips_service_server { }; Box::pin(fut) } - "/graphprotocol.indexer.dips.DipsService/CancelAgreement" => { + "/graphprotocol.indexer.dips.IndexerDipsService/CancelAgreement" => { #[allow(non_camel_case_types)] - struct CancelAgreementSvc(pub Arc); + struct CancelAgreementSvc(pub Arc); impl< - T: DipsService, + T: IndexerDipsService, > tonic::server::UnaryService for CancelAgreementSvc { type Response = super::CancelAgreementResponse; @@ -250,7 +407,8 @@ pub mod dips_service_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::cancel_agreement(&inner, request).await + ::cancel_agreement(&inner, request) + .await }; Box::pin(fut) } @@ -297,7 +455,7 @@ pub mod dips_service_server { } } } - impl Clone for DipsServiceServer { + impl Clone for IndexerDipsServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -310,8 +468,8 @@ pub mod dips_service_server { } } /// Generated gRPC service name - pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.DipsService"; - impl tonic::server::NamedService for DipsServiceServer { + pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.IndexerDipsService"; + impl tonic::server::NamedService for IndexerDipsServiceServer { const NAME: &'static str = SERVICE_NAME; } } diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs index 0feb35c3..293eb5f6 100644 --- a/crates/dips/src/server.rs +++ b/crates/dips/src/server.rs @@ -9,8 +9,9 @@ use tonic::{Request, Response, Status}; use crate::{ proto::indexer::graphprotocol::indexer::dips::{ - dips_service_server::DipsService, CancelAgreementRequest, CancelAgreementResponse, - ProposalResponse, SubmitAgreementProposalRequest, SubmitAgreementProposalResponse, + indexer_dips_service_server::IndexerDipsService, CancelAgreementRequest, + CancelAgreementResponse, ProposalResponse, SubmitAgreementProposalRequest, + SubmitAgreementProposalResponse, }, store::AgreementStore, validate_and_cancel_agreement, validate_and_create_agreement, @@ -25,7 +26,7 @@ pub struct DipsServer { } #[async_trait] -impl DipsService for DipsServer { +impl IndexerDipsService for DipsServer { async fn submit_agreement_proposal( &self, request: Request, diff --git a/crates/dips/src/store.rs b/crates/dips/src/store.rs index e437f7cc..1a987732 100644 --- a/crates/dips/src/store.rs +++ b/crates/dips/src/store.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use build_info::chrono::{DateTime, Utc}; use uuid::Uuid; use crate::{ @@ -11,12 +12,19 @@ use crate::{ SubgraphIndexingVoucherMetadata, }; +#[derive(Debug, Clone)] +pub struct StoredIndexingAgreement { + pub voucher: SignedIndexingAgreementVoucher, + pub metadata: SubgraphIndexingVoucherMetadata, + pub cancelled: bool, + pub current_allocation_id: Option, + pub last_allocation_id: Option, + pub last_payment_collected_at: Option>, +} + #[async_trait] pub trait AgreementStore: Sync + Send + std::fmt::Debug { - async fn get_by_id( - &self, - id: Uuid, - ) -> Result, DipsError>; + async fn get_by_id(&self, id: Uuid) -> Result, DipsError>; async fn create_agreement( &self, agreement: SignedIndexingAgreementVoucher, @@ -30,15 +38,12 @@ pub trait AgreementStore: Sync + Send + std::fmt::Debug { #[derive(Default, Debug)] pub struct InMemoryAgreementStore { - pub data: tokio::sync::RwLock>, + pub data: tokio::sync::RwLock>, } #[async_trait] impl AgreementStore for InMemoryAgreementStore { - async fn get_by_id( - &self, - id: Uuid, - ) -> Result, DipsError> { + async fn get_by_id(&self, id: Uuid) -> Result, DipsError> { Ok(self .data .try_read() @@ -49,15 +54,21 @@ impl AgreementStore for InMemoryAgreementStore { async fn create_agreement( &self, agreement: SignedIndexingAgreementVoucher, - _medatadata: SubgraphIndexingVoucherMetadata, + metadata: SubgraphIndexingVoucherMetadata, ) -> Result<(), DipsError> { + let id = Uuid::from_bytes(agreement.voucher.agreement_id.into()); + let stored_agreement = StoredIndexingAgreement { + voucher: agreement, + metadata, + cancelled: false, + current_allocation_id: None, + last_allocation_id: None, + last_payment_collected_at: None, + }; self.data .try_write() .map_err(|e| DipsError::UnknownError(e.into()))? - .insert( - Uuid::from_bytes(agreement.voucher.agreement_id.into()), - (agreement.clone(), false), - ); + .insert(id, stored_agreement); Ok(()) } @@ -67,7 +78,7 @@ impl AgreementStore for InMemoryAgreementStore { ) -> Result { let id = Uuid::from_bytes(signed_cancellation.request.agreement_id.into()); - let agreement = { + let mut agreement = { let read_lock = self .data .try_read() @@ -78,11 +89,17 @@ impl AgreementStore for InMemoryAgreementStore { .ok_or(DipsError::AgreementNotFound)? }; + if agreement.cancelled { + return Err(DipsError::AgreementCancelled); + } + + agreement.cancelled = true; + let mut write_lock = self .data .try_write() .map_err(|e| DipsError::UnknownError(e.into()))?; - write_lock.insert(id, (agreement.0, true)); + write_lock.insert(id, agreement); Ok(id) } diff --git a/crates/service/src/database/mod.rs b/crates/service/src/database/mod.rs index e97d4137..b5d102c9 100644 --- a/crates/service/src/database/mod.rs +++ b/crates/service/src/database/mod.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub mod cost_model; -pub mod dips; use std::time::Duration; diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index fea102ec..c56883dd 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -8,8 +8,9 @@ use axum::{extract::Request, serve, ServiceExt}; use clap::Parser; use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig}; use indexer_dips::{ - proto::indexer::graphprotocol::indexer::dips::dips_service_server::{ - DipsService, DipsServiceServer, + database::PsqlAgreementStore, + proto::indexer::graphprotocol::indexer::dips::indexer_dips_service_server::{ + IndexerDipsService, IndexerDipsServiceServer, }, server::DipsServer, }; @@ -21,11 +22,7 @@ use tokio::{net::TcpListener, signal}; use tower_http::normalize_path::NormalizePath; use tracing::info; -use crate::{ - cli::Cli, - database::{self, dips::PsqlAgreementStore}, - metrics::serve_metrics, -}; +use crate::{cli::Cli, database, metrics::serve_metrics}; mod release; mod router; @@ -161,9 +158,9 @@ pub async fn run() -> anyhow::Result<()> { .with_graceful_shutdown(shutdown_handler()) .await?) } -async fn start_dips_server(addr: SocketAddr, service: impl DipsService) { +async fn start_dips_server(addr: SocketAddr, service: impl IndexerDipsService) { tonic::transport::Server::builder() - .add_service(DipsServiceServer::new(service)) + .add_service(IndexerDipsServiceServer::new(service)) .serve(addr) .await .expect("unable to start dips grpc"); diff --git a/migrations/20241030141929_dips.up.sql b/migrations/20241030141929_dips.up.sql index d517138e..b06422ff 100644 --- a/migrations/20241030141929_dips.up.sql +++ b/migrations/20241030141929_dips.up.sql @@ -28,7 +28,9 @@ CREATE TABLE IF NOT EXISTS indexing_agreements ( cancelled_at TIMESTAMP WITH TIME ZONE, signed_cancellation_payload BYTEA, - current_allocation_id CHAR(40) + current_allocation_id CHAR(40), + last_allocation_id CHAR(40), + last_payment_collected_at TIMESTAMP WITH TIME ZONE ); CREATE UNIQUE INDEX IX_UNIQ_SIGNATURE_AGREEMENT on indexing_agreements(signature);