Skip to content

Commit abcd7fe

Browse files
authored
docs: complete tap-agent docs (#605)
* chore: add missing_docs lint to clippy Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add docs to adaptative_concurrency Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add tracker docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add tap docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add agent docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add backoff docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add cli docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add database docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add metrics docs Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: add final documentation to lib Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> * docs: fix typos and grammar Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai> --------- Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
1 parent 93833c9 commit abcd7fe

22 files changed

+316
-19
lines changed

crates/tap-agent/src/adaptative_concurrency.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,43 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! # Adaptative concurrency
5+
//! This module provides [AdaptiveLimiter] as a tool to allow concurrency.
6+
//! It's implemented with an Additive increase, Multiplicative decrease
7+
//! ([AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease))
8+
//! strategy.
9+
//!
10+
//!
11+
//!
12+
//! This allows us to have a big number of rav requests running
13+
//! concurrently, but if any of them fails we limit
14+
//! the following requests until the aggregator recovers.
15+
//!
16+
//! ## Behaviour
17+
//! On every request, the caller acquires a slot by calling [AdaptiveLimiter::acquire()].
18+
//! This will increment the number of in_flight connections.
19+
//!
20+
//! If we receive a successful response, we increment our limit to be able to process
21+
//! one more request concurrently.
22+
//!
23+
//! If we receive a failed response, we decrement our limit by half to quickly
24+
//! relieve the pressure in the system.
25+
426
use std::ops::Range;
527

28+
/// Simple struct that keeps track of concurrent requests
29+
///
30+
/// More information on [crate::adaptative_concurrency]
631
pub struct AdaptiveLimiter {
732
range: Range<usize>,
833
current_limit: usize,
934
in_flight: usize,
1035
}
1136

1237
impl AdaptiveLimiter {
38+
/// Creates an instance of [AdaptiveLimiter] with an `initial_limit`
39+
/// and a `range` that contains the minimum and maximum of concurrent
40+
/// requests
1341
pub fn new(initial_limit: usize, range: Range<usize>) -> Self {
1442
Self {
1543
range,
@@ -18,24 +46,33 @@ impl AdaptiveLimiter {
1846
}
1947
}
2048

49+
/// Acquires a slot in our limiter, returning `bool`
50+
/// representing if we had limit available or not
2151
pub fn acquire(&mut self) -> bool {
2252
self.has_limit() && {
2353
self.in_flight += 1;
2454
true
2555
}
2656
}
2757

58+
/// Returns if there're slots available
2859
pub fn has_limit(&self) -> bool {
2960
self.in_flight < self.current_limit
3061
}
3162

63+
/// Callback function that removes in_flight counter
64+
/// and if the current limit is lower than the provided
65+
/// limit, increase the current limit by 1.
3266
pub fn on_success(&mut self) {
3367
self.in_flight -= 1;
3468
if self.current_limit < self.range.end {
3569
self.current_limit += 1; // Additive Increase
3670
}
3771
}
3872

73+
/// Callback function that removes in_flight counter
74+
/// and decreasing the current limit by half, with
75+
/// minimum value to configured value.
3976
pub fn on_failure(&mut self) {
4077
// Multiplicative Decrease
4178
self.in_flight -= 1;

crates/tap-agent/src/agent.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,40 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! # agent
5+
//!
6+
//! The agent is a set of 3 actors:
7+
//! - [sender_accounts_manager::SenderAccountsManager]
8+
//! - [sender_account::SenderAccount]
9+
//! - [sender_allocation::SenderAllocation]
10+
//!
11+
//! They run under a supervision tree and it goes like the following:
12+
//!
13+
//! [sender_accounts_manager::SenderAccountsManager] monitors allocations provided
14+
//! by the subgraph via a [Watcher](::indexer_watcher). Every time it detects a
15+
//! new escrow account created, it automatically spawns a [sender_account::SenderAccount].
16+
//!
17+
//! Manager is also responsible for spawning an pgnotify task that monitors new receipts.
18+
//!
19+
//! [sender_account::SenderAccount] is then responsible for keeping track of all fees
20+
//! distributed across different allocations and also spawning [sender_allocation::SenderAllocation]s
21+
//! that are going to process receipts and RAV requests.
22+
//!
23+
//! [sender_allocation::SenderAllocation] receives notifications from the spawned task and then
24+
//! it updates its state an notifies its parent actor.
25+
//!
26+
//! Once [sender_account::SenderAccount] gets enough receipts, it uses its tracker to decide
27+
//! what is the allocation with the most amount of fees and send a message to trigger a RavRequest.
28+
//!
29+
//! When the allocation is closed by the indexer, [sender_allocation::SenderAllocation] is
30+
//! responsible for triggering the last rav, that will flush all pending receipts and mark the rav
31+
//! as last to be redeemed by indexer-agent.
32+
//!
33+
//! ## Actors
34+
//! Actors are implemented using the [ractor] library and contain their own message queue.
35+
//! They process one message at a time and that's why concurrent primitives like
36+
//! [std::sync::Mutex]s aren't needed.
37+
438
use indexer_config::{
539
Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig,
640
SubgraphConfig, SubgraphsConfig, TapConfig,
@@ -15,11 +49,19 @@ use crate::{
1549
database, CONFIG, EIP_712_DOMAIN,
1650
};
1751

52+
/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_account::SenderAccount]
1853
pub mod sender_account;
54+
/// Actor, Arguments, State, Messages and implementation for
55+
/// [crate::agent::sender_accounts_manager::SenderAccountsManager]
1956
pub mod sender_accounts_manager;
57+
/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_allocation::SenderAllocation]
2058
pub mod sender_allocation;
59+
/// Unaggregated receipts containing total value and last id stored in the table
2160
pub mod unaggregated_receipts;
2261

62+
/// This is the main entrypoint for starting up tap-agent
63+
///
64+
/// It uses the static [crate::CONFIG] to configure the agent.
2365
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
2466
let Config {
2567
indexer: IndexerConfig {

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ type Balance = U256;
103103
/// Information for Ravs that are abstracted away from the SignedRav itself
104104
#[derive(Debug, Default, PartialEq, Eq)]
105105
pub struct RavInformation {
106+
/// Allocation Id of a Rav
106107
pub allocation_id: Address,
108+
/// Value Aggregate of a Rav
107109
pub value_aggregate: u128,
108110
}
109111

@@ -160,7 +162,7 @@ pub enum ReceiptFees {
160162
Retry,
161163
}
162164

163-
/// Enum containing all types of messages that a SenderAccount can receive
165+
/// Enum containing all types of messages that a [SenderAccount] can receive
164166
#[derive(Debug)]
165167
pub enum SenderAccountMessage {
166168
/// Updates the sender balance and
@@ -205,6 +207,7 @@ pub struct SenderAccount;
205207

206208
/// Arguments received in startup while spawing [SenderAccount] actor
207209
pub struct SenderAccountArgs {
210+
/// Configuration derived from config.toml
208211
pub config: &'static SenderAccountConfig,
209212

210213
/// Connection to database
@@ -318,20 +321,32 @@ pub struct State {
318321
config: &'static SenderAccountConfig,
319322
}
320323

324+
/// Configuration derived from config.toml
321325
pub struct SenderAccountConfig {
326+
/// Buffer used for the receipts
322327
pub rav_request_buffer: Duration,
328+
/// Maximum amount is willing to lose
323329
pub max_amount_willing_to_lose_grt: u128,
330+
/// What value triggers a new Rav request
324331
pub trigger_value: u128,
325332

326333
// allocation config
334+
/// Timeout config for rav requests
327335
pub rav_request_timeout: Duration,
336+
/// Limit of receipts sent in a Rav Request
328337
pub rav_request_receipt_limit: u64,
338+
/// Current indexer address
329339
pub indexer_address: Address,
340+
/// Polling interval for escrow subgraph
330341
pub escrow_polling_interval: Duration,
342+
/// Timeout used while creating [SenderAccount]
343+
///
344+
/// This is reached if the database is too slow
331345
pub tap_sender_timeout: Duration,
332346
}
333347

334348
impl SenderAccountConfig {
349+
/// Creates a [SenderAccountConfig] by getting a reference of [indexer_config::Config]
335350
pub fn from_config(config: &indexer_config::Config) -> Self {
336351
Self {
337352
rav_request_buffer: config.tap.rav_request.timestamp_buffer_secs,
@@ -1232,6 +1247,7 @@ impl Actor for SenderAccount {
12321247
}
12331248

12341249
impl SenderAccount {
1250+
/// Deny sender by giving `sender` [Address]
12351251
pub async fn deny_sender(pool: &PgPool, sender: Address) {
12361252
sqlx::query!(
12371253
r#"
@@ -1248,6 +1264,7 @@ impl SenderAccount {
12481264

12491265
#[cfg(test)]
12501266
pub mod tests {
1267+
#![allow(missing_docs)]
12511268
use std::{
12521269
collections::{HashMap, HashSet},
12531270
sync::atomic::AtomicU32,
@@ -1326,6 +1343,7 @@ pub mod tests {
13261343
}
13271344
}
13281345

1346+
/// Prefix shared between tests so we don't have conflicts in the global registry
13291347
pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0);
13301348
const ESCROW_VALUE: u128 = 1000;
13311349
const BUFFER_DURATION: Duration = Duration::from_millis(100);

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,24 @@ lazy_static! {
3535
.unwrap();
3636
}
3737

38+
/// Notification received by pgnotify
39+
///
40+
/// This contains a list of properties that are sent by postgres when a receipt is inserted
3841
#[derive(Deserialize, Debug, PartialEq, Eq)]
3942
pub struct NewReceiptNotification {
43+
/// id inside the table
4044
pub id: u64,
45+
/// address of the allocation
4146
pub allocation_id: Address,
47+
/// address of wallet that signed this receipt
4248
pub signer_address: Address,
49+
/// timestamp of the receipt
4350
pub timestamp_ns: u64,
51+
/// value of the receipt
4452
pub value: u128,
4553
}
4654

55+
/// Manager Actor
4756
pub struct SenderAccountsManager;
4857

4958
/// Wrapped AllocationId Address with two possible variants
@@ -53,7 +62,9 @@ pub struct SenderAccountsManager;
5362
/// Rav and Receipt types
5463
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
5564
pub enum AllocationId {
65+
/// Legacy allocation
5666
Legacy(Address),
67+
/// New Subgraph DataService allocation
5768
Horizon(Address),
5869
}
5970

@@ -72,25 +83,42 @@ impl Display for AllocationId {
7283
}
7384
}
7485

86+
/// Enum containing all types of messages that a [SenderAccountsManager] can receive
7587
#[derive(Debug)]
7688
pub enum SenderAccountsManagerMessage {
89+
/// Spawn and Stop [SenderAccount]s that were added or removed
90+
/// in comparison with it current state and updates the state
7791
UpdateSenderAccounts(HashSet<Address>),
7892
}
7993

94+
/// Arguments received in startup while spawing [SenderAccount] actor
8095
pub struct SenderAccountsManagerArgs {
96+
/// Config forwarded to [SenderAccount]
8197
pub config: &'static SenderAccountConfig,
98+
/// Domain separator used for tap
8299
pub domain_separator: Eip712Domain,
83100

101+
/// Database connection
84102
pub pgpool: PgPool,
103+
/// Watcher that returns a map of open and recently closed allocation ids
85104
pub indexer_allocations: Receiver<HashMap<Address, Allocation>>,
105+
/// Watcher containing the escrow accounts
86106
pub escrow_accounts: Receiver<EscrowAccounts>,
107+
/// SubgraphClient of the escrow subgraph
87108
pub escrow_subgraph: &'static SubgraphClient,
109+
/// SubgraphClient of the network subgraph
88110
pub network_subgraph: &'static SubgraphClient,
111+
/// Map containing all endpoints for senders provided in the config
89112
pub sender_aggregator_endpoints: HashMap<Address, Url>,
90113

114+
/// Prefix used to bypass limitations of global actor registry (used for tests)
91115
pub prefix: Option<String>,
92116
}
93117

118+
/// State for [SenderAccountsManager] actor
119+
///
120+
/// This is a separate instance that makes it easier to have mutable
121+
/// reference, for more information check ractor library
94122
pub struct State {
95123
sender_ids: HashSet<Address>,
96124
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,27 @@ lazy_static! {
7777
/// This is used to give better error messages to users so they have a better understanding
7878
#[derive(Error, Debug)]
7979
pub enum RavError {
80+
/// Database Errors
8081
#[error(transparent)]
8182
Sqlx(#[from] sqlx::Error),
8283

84+
/// Tap Core lib errors
8385
#[error(transparent)]
8486
TapCore(#[from] tap_core::Error),
8587

88+
/// Errors while aggregating
8689
#[error(transparent)]
8790
AggregationError(#[from] AggregationError),
8891

92+
/// Errors with gRPC client
8993
#[error(transparent)]
9094
Grpc(#[from] tonic::Status),
9195

96+
/// All receipts are invalid
9297
#[error("All receipts are invalid")]
9398
AllReceiptsInvalid,
9499

100+
/// Other kind of error
95101
#[error(transparent)]
96102
Other(#[from] anyhow::Error),
97103
}
@@ -151,15 +157,21 @@ pub struct SenderAllocationState<T: NetworkVersion> {
151157
rav_request_receipt_limit: u64,
152158
}
153159

160+
/// Configuration derived from config.toml
154161
#[derive(Clone)]
155162
pub struct AllocationConfig {
163+
/// Buffer used for the receipts
156164
pub timestamp_buffer_ns: u64,
165+
/// Limit of receipts sent in a Rav Request
157166
pub rav_request_receipt_limit: u64,
167+
/// Current indexer address
158168
pub indexer_address: Address,
169+
/// Polling interval for escrow subgraph
159170
pub escrow_polling_interval: Duration,
160171
}
161172

162173
impl AllocationConfig {
174+
/// Creates a [SenderAccountConfig] by getting a reference of [super::sender_account::SenderAccountConfig]
163175
pub fn from_sender_config(config: &SenderAccountConfig) -> Self {
164176
Self {
165177
timestamp_buffer_ns: config.rav_request_buffer.as_nanos() as u64,
@@ -199,11 +211,18 @@ pub struct SenderAllocationArgs<T: NetworkVersion> {
199211
pub config: AllocationConfig,
200212
}
201213

214+
/// Enum containing all types of messages that a [SenderAllocation] can receive
202215
#[derive(Debug)]
203216
pub enum SenderAllocationMessage {
217+
/// New receipt message, sent by the task spawned by
218+
/// [super::sender_accounts_manager::SenderAccountsManager]
204219
NewReceipt(NewReceiptNotification),
220+
/// Triggers a Rav Request for the current allocation
221+
///
222+
/// It notifies its parent with the response
205223
TriggerRavRequest,
206224
#[cfg(any(test, feature = "test"))]
225+
/// Return the internal state (used for tests)
207226
GetUnaggregatedReceipts(ractor::RpcReplyPort<UnaggregatedReceipts>),
208227
}
209228

@@ -756,6 +775,7 @@ where
756775
}
757776
}
758777

778+
/// Sends a database query and mark the allocation rav as last
759779
pub async fn mark_rav_last(&self) -> anyhow::Result<()> {
760780
tracing::info!(
761781
sender = %self.sender,
@@ -932,6 +952,7 @@ where
932952

933953
#[cfg(test)]
934954
pub mod tests {
955+
#![allow(missing_docs)]
935956
use std::{
936957
collections::HashMap,
937958
future::Future,

0 commit comments

Comments
 (0)