From 9d9b90d45e9d5d8c039c1157723c9470694f226c Mon Sep 17 00:00:00 2001 From: gmbronco <83549293+gmbronco@users.noreply.github.com> Date: Thu, 12 Sep 2024 11:12:17 +0200 Subject: [PATCH] Subgraph fallbacks (#922) --- .changeset/smart-berries-watch.md | 5 + apps/env.ts | 4 + config/arbitrum.ts | 5 +- config/avalanche.ts | 4 +- config/base.ts | 2 +- config/fantom.ts | 5 +- config/fraxtal.ts | 3 +- config/gnosis.ts | 4 +- config/mainnet.ts | 4 +- config/mode.ts | 2 +- config/optimism.ts | 5 +- config/polygon.ts | 4 +- config/sepolia.ts | 2 +- config/zkevm.ts | 2 +- env.local | 1 + modules/network/network-config-types.ts | 2 +- modules/protocol/protocol.service.ts | 11 +- modules/sources/subgraphs/retry-on-failure.ts | 49 +++ .../subgraphs/subgraph-service-base.ts | 26 ++ .../balancer-subgraph.service.ts | 414 +++++++++++------- modules/subgraphs/balancer-subgraph/index.ts | 14 +- modules/token/latest-fx-price.ts | 12 +- 22 files changed, 391 insertions(+), 189 deletions(-) create mode 100644 .changeset/smart-berries-watch.md create mode 100644 modules/sources/subgraphs/retry-on-failure.ts create mode 100644 modules/sources/subgraphs/subgraph-service-base.ts diff --git a/.changeset/smart-berries-watch.md b/.changeset/smart-berries-watch.md new file mode 100644 index 000000000..c2eeffd40 --- /dev/null +++ b/.changeset/smart-berries-watch.md @@ -0,0 +1,5 @@ +--- +'backend': minor +--- + +Subgraph client will fallback to another URL on failure diff --git a/apps/env.ts b/apps/env.ts index 361b31201..2d2942f98 100644 --- a/apps/env.ts +++ b/apps/env.ts @@ -33,6 +33,10 @@ export const schema = { optional: true, type: String, }, + SATSUMA_API_KEY: { + optional: true, + type: String, + }, WORKER_QUEUE_URL: { optional: true, type: String, diff --git a/config/arbitrum.ts b/config/arbitrum.ts index b732c6c46..15142bcb9 100644 --- a/config/arbitrum.ts +++ b/config/arbitrum.ts @@ -12,7 +12,10 @@ export default { }, subgraphs: { startDate: '2021-08-23', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmPbjY6L1NhPjpBv7wDTfG9EPx5FpCuBqeg1XxByzBTLcs`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmPbjY6L1NhPjpBv7wDTfG9EPx5FpCuBqeg1XxByzBTLcs`, + `https://subgraph.satsuma-prod.com/${env.SATSUMA_API_KEY}/balancer/balancer-v2-fantom/api`, + ], cowAmm: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmTSU862YAXb5XMhGsE7JCajuvf5FPiZjrdvC9nnbzd86x`, beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/arbitrum-blocks/version/latest', diff --git a/config/avalanche.ts b/config/avalanche.ts index 040f09540..eaa06f158 100644 --- a/config/avalanche.ts +++ b/config/avalanche.ts @@ -12,7 +12,9 @@ export default { }, subgraphs: { startDate: '2023-06-06', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmeJY1ZjmuJVPvmVghZSuiSxEx2a9kmpKnjr4Qw5hNdpLU`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmeJY1ZjmuJVPvmVghZSuiSxEx2a9kmpKnjr4Qw5hNdpLU`, + ], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/avalanche-blocks/version/latest', gauge: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmYCJJToWTY31LgsJG2vZTNkKWpQ7i91cTVYgPUBsJ5nQZ`, diff --git a/config/base.ts b/config/base.ts index 68c926872..29025adeb 100644 --- a/config/base.ts +++ b/config/base.ts @@ -12,7 +12,7 @@ export default { }, subgraphs: { startDate: '2023-07-10', - balancer: `https://api.studio.thegraph.com/query/24660/balancer-base-v2/version/latest`, + balancer: [`https://api.studio.thegraph.com/query/24660/balancer-base-v2/version/latest`], beetsBar: '', blocks: 'https://api.studio.thegraph.com/query/48427/bleu-base-blocks/version/latest', gauge: `https://api.studio.thegraph.com/query/24660/balancer-gauges-base/version/latest`, diff --git a/config/fantom.ts b/config/fantom.ts index 22aeafa76..29fcae05b 100644 --- a/config/fantom.ts +++ b/config/fantom.ts @@ -13,7 +13,10 @@ export default { }, subgraphs: { startDate: '2021-10-08', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_FANTOM}/deployments/id/QmYN8qV7PEokFeQvhhWMinYD5wsspP1Sc87pGKEvAmjSCJ`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_FANTOM}/deployments/id/QmYN8qV7PEokFeQvhhWMinYD5wsspP1Sc87pGKEvAmjSCJ`, + `https://subgraph.satsuma-prod.com/${env.SATSUMA_API_KEY}/balancer/balancer-v2-arbitrum/api`, + ], beetsBar: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_FANTOM}/deployments/id/QmXcxzZioHXV5ts2UcG6gNNEayoaZ9ip7D9JvPS88K2HXe`, blocks: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_FANTOM}/subgraphs/id/3drjZDpA9hAuYGA19ttEkhW432mVe2XHy5YarBDVYHbz`, masterchef: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_FANTOM}/deployments/id/QmZQJu1rxMEDwzZb5TSqDXjcFiS1DN8BRKCLKRv6ifEBhA`, diff --git a/config/fraxtal.ts b/config/fraxtal.ts index de55859a0..2da5b2f50 100644 --- a/config/fraxtal.ts +++ b/config/fraxtal.ts @@ -11,8 +11,9 @@ export default { }, subgraphs: { startDate: '2024-05-22', - balancer: + balancer: [ 'https://api.goldsky.com/api/public/project_clwhu1vopoigi01wmbn514m1z/subgraphs/balancer-fraxtal-v2/latest/gn', + ], beetsBar: '', blocks: 'https://api.goldsky.com/api/public/project_clwhu1vopoigi01wmbn514m1z/subgraphs/fraxtal-blocks/1.0.0/gn', gauge: 'https://api.goldsky.com/api/public/project_clwhu1vopoigi01wmbn514m1z/subgraphs/balancer-gauges-fraxtal/latest/gn', diff --git a/config/gnosis.ts b/config/gnosis.ts index c48d29877..4ee5d5477 100644 --- a/config/gnosis.ts +++ b/config/gnosis.ts @@ -12,7 +12,9 @@ export default { }, subgraphs: { startDate: '2021-08-23', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmXXSKeLh14DnJgR1ncHhAHciqacfRshcHKXasAGy7LP4Y`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmXXSKeLh14DnJgR1ncHhAHciqacfRshcHKXasAGy7LP4Y`, + ], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/gnosis-blocks/version/latest', gauge: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/Qme9hQY1NZ8ReVDSSQb893s2fGpeLkgfwXd3YU5rndACaP`, diff --git a/config/mainnet.ts b/config/mainnet.ts index 820d28b64..1c0d835f4 100644 --- a/config/mainnet.ts +++ b/config/mainnet.ts @@ -22,7 +22,9 @@ export default { subgraphs: { startDate: '2019-04-20', cowAmm: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmUvfS6hqU3nGQxFFzxoMBkufYJ7Jh3cYdTUM64hucgqe7`, - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmQ5TT2yYBZgoUxsat3bKmNe5Fr9LW9YAtDs8aeuc1BRhj`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmQ5TT2yYBZgoUxsat3bKmNe5Fr9LW9YAtDs8aeuc1BRhj`, + ], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/ethereum-blocks/version/latest', gauge: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmdmQBHbBtwD6wNypHbuGKB1uKHpHNVuSHbo9FsvrMhXSn`, diff --git a/config/mode.ts b/config/mode.ts index 40bda93db..e2ac3cb98 100644 --- a/config/mode.ts +++ b/config/mode.ts @@ -11,7 +11,7 @@ export default { }, subgraphs: { startDate: '2024-05-22', - balancer: `https://api.studio.thegraph.com/query/75376/balancer-mode-v2/version/latest`, + balancer: [`https://api.studio.thegraph.com/query/75376/balancer-mode-v2/version/latest`], beetsBar: '', blocks: 'https://api.studio.thegraph.com/query/48427/bleu-mode-blocks/version/latest', gauge: `https://api.studio.thegraph.com/query/75376/balancer-gauges-mode/version/latest`, diff --git a/config/optimism.ts b/config/optimism.ts index cafde6ae3..f48874ad0 100644 --- a/config/optimism.ts +++ b/config/optimism.ts @@ -13,7 +13,10 @@ export default { }, subgraphs: { startDate: '2022-01-01', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmWUgkiUM5c3BW1Z51DUkZfnyQfyfesE8p3BRnEtA9vyPL`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmWUgkiUM5c3BW1Z51DUkZfnyQfyfesE8p3BRnEtA9vyPL`, + `https://subgraph.satsuma-prod.com/${env.SATSUMA_API_KEY}/balancer/balancer-v2-optimism/api`, + ], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/optimism-blocks/version/latest', gauge: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/Qmdtj1ix1nUCRtSoiyF7a3oKMSvrKT8KTEFJdep53EHtRy`, diff --git a/config/polygon.ts b/config/polygon.ts index 7acdcec53..05447e55c 100644 --- a/config/polygon.ts +++ b/config/polygon.ts @@ -12,7 +12,9 @@ export default { }, subgraphs: { startDate: '2021-06-16', - balancer: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmUqS6BAVQgvstEsVrxuwsu1DwQdfAdj3Q6gz2j3DbUYQ9`, + balancer: [ + `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmUqS6BAVQgvstEsVrxuwsu1DwQdfAdj3Q6gz2j3DbUYQ9`, + ], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/polygon-blocks/version/latest', gauge: `https://gateway-arbitrum.network.thegraph.com/api/${env.THEGRAPH_API_KEY_BALANCER}/deployments/id/QmewSgLJf9TZt8trr61dECJhEGGyHxKFWbNQ3AnNZAdYyU`, diff --git a/config/sepolia.ts b/config/sepolia.ts index 6b4fd524d..8cdefd021 100644 --- a/config/sepolia.ts +++ b/config/sepolia.ts @@ -13,7 +13,7 @@ export default { subgraphs: { startDate: '2023-05-03', cowAmm: 'https://api.studio.thegraph.com/proxy/75376/balancer-cow-amm-sepolia/version/latest', - balancer: 'https://api.studio.thegraph.com/query/24660/balancer-sepolia-v2/version/latest', + balancer: ['https://api.studio.thegraph.com/query/24660/balancer-sepolia-v2/version/latest'], balancerV3: 'https://api.studio.thegraph.com/query/31386/balancer-v3-sepolia-8th/version/latest', balancerPoolsV3: 'https://api.studio.thegraph.com/query/31386/balancer-pools-v3-sepolia-8th/version/latest', beetsBar: 'https://', diff --git a/config/zkevm.ts b/config/zkevm.ts index 9e6ff0721..17e0d6448 100644 --- a/config/zkevm.ts +++ b/config/zkevm.ts @@ -12,7 +12,7 @@ export default { }, subgraphs: { startDate: '2023-05-17', - balancer: `https://api.studio.thegraph.com/query/24660/balancer-polygon-zk-v2/version/latest`, + balancer: [`https://api.studio.thegraph.com/query/24660/balancer-polygon-zk-v2/version/latest`], beetsBar: 'https://', blocks: 'https://api.studio.thegraph.com/query/48427/bleu-polygon-zkevm-blocks/version/latest', gauge: `https://api.studio.thegraph.com/query/24660/balancer-gauges-polygon-zk/version/latest`, diff --git a/env.local b/env.local index b082f9e1c..3a93406ed 100644 --- a/env.local +++ b/env.local @@ -4,6 +4,7 @@ DEPLOYMENT_ENV=canary # RPCs ALCHEMY_API_KEY= +SATSUMA_API_KEY= # Database DATABASE_URL=postgresql://backend:let-me-in@localhost:5431/database?schema=public diff --git a/modules/network/network-config-types.ts b/modules/network/network-config-types.ts index 8561ba099..0dfadf191 100644 --- a/modules/network/network-config-types.ts +++ b/modules/network/network-config-types.ts @@ -61,7 +61,7 @@ export interface NetworkData { }; subgraphs: { startDate: string; - balancer: string; + balancer: string[]; balancerV3?: string; balancerPoolsV3?: string; blocks: string; diff --git a/modules/protocol/protocol.service.ts b/modules/protocol/protocol.service.ts index a4023da83..6db5af2f5 100644 --- a/modules/protocol/protocol.service.ts +++ b/modules/protocol/protocol.service.ts @@ -4,12 +4,13 @@ import { Cache } from 'memory-cache'; import { Chain, PrismaLastBlockSyncedCategory, PrismaUserBalanceType } from '@prisma/client'; import _ from 'lodash'; import { networkContext } from '../network/network-context.service'; -import { AllNetworkConfigs, AllNetworkConfigsKeyedOnChain } from '../network/network-config'; +import { AllNetworkConfigs, AllNetworkConfigsKeyedOnChain, chainToIdMap } from '../network/network-config'; import { GqlProtocolMetricsAggregated, GqlProtocolMetricsChain } from '../../schema'; import { GraphQLClient } from 'graphql-request'; import { getSdk } from '../subgraphs/balancer-subgraph/generated/balancer-subgraph-types'; import axios from 'axios'; import { tokenService } from '../token/token.service'; +import { getV2SubgraphClient } from '../subgraphs/balancer-subgraph'; interface LatestSyncedBlocks { userWalletSyncBlock: string; @@ -69,10 +70,12 @@ export class ProtocolService { public async cacheProtocolMetrics(chain: Chain): Promise { const oneDayAgo = moment().subtract(24, 'hours').unix(); - const client = new GraphQLClient(AllNetworkConfigsKeyedOnChain[chain].data.subgraphs.balancer); - const subgraphClient = getSdk(client); + const client = getV2SubgraphClient( + AllNetworkConfigsKeyedOnChain[chain].data.subgraphs.balancer, + Number(chainToIdMap[chain]), + ); - const { balancers } = await subgraphClient.BalancerProtocolData({}); + const { balancers } = await client.BalancerProtocolData({}); const { totalSwapFee, totalSwapVolume, poolCount } = balancers[0]; const pools = await prisma.prismaPool.findMany({ diff --git a/modules/sources/subgraphs/retry-on-failure.ts b/modules/sources/subgraphs/retry-on-failure.ts new file mode 100644 index 000000000..394c1b4d9 --- /dev/null +++ b/modules/sources/subgraphs/retry-on-failure.ts @@ -0,0 +1,49 @@ +export async function retryOnFailureWithRotation( + sdkClients: any[], + fn: (sdk: any) => Promise, + retries: number = 3, +): Promise { + let attempts = 0; + let currentSdkIndex = 0; + + while (attempts < retries) { + try { + const sdk = sdkClients[currentSdkIndex]; // Get the current SDK client + return await fn(sdk); // Try the operation using the current SDK + } catch (error) { + attempts += 1; + console.error(`Subgraph failed ${new URL(sdkClients[currentSdkIndex].url).host}:`, error); + + if (attempts < retries) { + // Rotate to the next SDK client + currentSdkIndex = (currentSdkIndex + 1) % sdkClients.length; + console.log(`Retrying with ${new URL(sdkClients[currentSdkIndex].url).host}...`); + } else { + throw new Error('All SDK clients failed after retries.'); + } + } + } + + throw new Error('Unexpected failure, retries exhausted.'); +} + +export function wrapSdkWithRetryAndRotation(sdkClients: T[], retries: number = 3): T { + const wrappedSdk: Partial = {}; + + // Use one SDK as a template for wrapping all functions + const sdk = sdkClients[0]; + + for (const key of Object.keys(sdk)) { + const value = (sdk as any)[key]; + + if (typeof value === 'function') { + // Wrap each SDK method to use retry with rotation + wrappedSdk[key as keyof T] = ((...args: any[]) => + retryOnFailureWithRotation(sdkClients, (sdk) => value.apply(sdk, args), retries)) as T[keyof T]; + } else { + wrappedSdk[key as keyof T] = value; + } + } + + return wrappedSdk as T; +} diff --git a/modules/sources/subgraphs/subgraph-service-base.ts b/modules/sources/subgraphs/subgraph-service-base.ts new file mode 100644 index 000000000..2e7c875cc --- /dev/null +++ b/modules/sources/subgraphs/subgraph-service-base.ts @@ -0,0 +1,26 @@ +import { GraphQLClient } from 'graphql-request'; +import { Cache, CacheClass } from 'memory-cache'; +import { retryOnFailureWithRotation } from './retry-on-failure'; + +export class SubgraphServiceBase { + protected cache: CacheClass; + private sdks: TSdk[]; + + constructor( + subgraphUrl: string | string[], + protected chainId: number, + private getSdk: (client: GraphQLClient) => TSdk, + ) { + this.cache = new Cache(); + + if (Array.isArray(subgraphUrl)) { + this.sdks = subgraphUrl.map((url) => this.getSdk(new GraphQLClient(url))); + } else { + this.sdks = [this.getSdk(new GraphQLClient(subgraphUrl))]; + } + } + + protected async retryOnFailure(fn: (sdk: TSdk) => Promise, retries: number = 3): Promise { + return retryOnFailureWithRotation(this.sdks, fn, retries); + } +} diff --git a/modules/subgraphs/balancer-subgraph/balancer-subgraph.service.ts b/modules/subgraphs/balancer-subgraph/balancer-subgraph.service.ts index ebedfb45c..a9c784b1a 100644 --- a/modules/subgraphs/balancer-subgraph/balancer-subgraph.service.ts +++ b/modules/subgraphs/balancer-subgraph/balancer-subgraph.service.ts @@ -1,4 +1,3 @@ -import { GraphQLClient } from 'graphql-request'; import { Balancer, BalancerAmpUpdateFragment, @@ -37,75 +36,88 @@ import { BalancerUsersQueryVariables, getSdk, OrderDirection, + Pool_OrderBy, + PoolBalancesFragment, + PoolBalancesQueryVariables, PoolShare_OrderBy, + PoolSnapshot_OrderBy, Swap_OrderBy, } from './generated/balancer-subgraph-types'; import { subgraphLoadAll } from '../subgraph-util'; -import { Cache, CacheClass } from 'memory-cache'; import { fiveMinutesInMs, twentyFourHoursInMs } from '../../common/time'; import { BalancerUserPoolShare } from './balancer-subgraph-types'; +import { SubgraphServiceBase } from '../../sources/subgraphs/subgraph-service-base'; const ALL_POOLS_CACHE_KEY = `balance-subgraph_all-pools`; const PORTFOLIO_POOLS_CACHE_KEY = `balance-subgraph_portfolio-pools`; -export class BalancerSubgraphService { - private cache: CacheClass; - private sdk: ReturnType; - - constructor(subgraphUrl: string, private chainId: number) { - this.cache = new Cache(); - this.sdk = getSdk(new GraphQLClient(subgraphUrl)); +export class BalancerSubgraphService extends SubgraphServiceBase> { + constructor(subgraphUrl: string | string[], chainId: number) { + super(subgraphUrl, chainId, getSdk); } public async getMetadata() { - const { meta } = await this.sdk.BalancerGetMeta(); - - if (!meta) { - throw new Error('Missing meta data'); - } - - return meta; + return this.retryOnFailure(async (sdk) => { + const { meta } = await sdk.BalancerGetMeta(); + if (!meta) { + throw new Error('Missing meta data'); + } + return meta; + }); } public async getProtocolData(args: BalancerProtocolDataQueryVariables): Promise { - const { balancers } = await this.sdk.BalancerProtocolData(args); - - if (balancers.length === 0) { - throw new Error('Missing protocol data'); - } - - //There is only ever one - return balancers[0] as Balancer; + return this.retryOnFailure(async (sdk) => { + const { balancers } = await sdk.BalancerProtocolData(args); + if (balancers.length === 0) { + throw new Error('Missing protocol data'); + } + return balancers[0] as Balancer; + }); } public async getTokenPrices(args: BalancerTokenPricesQueryVariables): Promise { - return this.sdk.BalancerTokenPrices(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerTokenPrices(args); + }); } public async getTokens(args: BalancerTokensQueryVariables): Promise { - return this.sdk.BalancerTokens(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerTokens(args); + }); } public async getPoolSnapshots(args: BalancerPoolSnapshotsQueryVariables): Promise { - return this.sdk.BalancerPoolSnapshots(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerPoolSnapshots(args); + }); } public async getAllPoolSnapshots( args: BalancerPoolSnapshotsQueryVariables, ): Promise { - return subgraphLoadAll(this.sdk.BalancerPoolSnapshots, 'poolSnapshots', args); + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll(sdk.BalancerPoolSnapshots, 'poolSnapshots', args); + }); } public async getPools(args: BalancerPoolsQueryVariables): Promise { - return this.sdk.BalancerPools(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerPools(args); + }); } public async getSwaps(args: BalancerSwapsQueryVariables): Promise { - return this.sdk.BalancerSwaps(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerSwaps(args); + }); } public async getAllSwaps(args: BalancerSwapsQueryVariables): Promise { - return subgraphLoadAll(this.sdk.BalancerSwaps, 'swaps', args); + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll(sdk.BalancerSwaps, 'swaps', args); + }); } public async getAllSwapsWithPaging({ @@ -115,208 +127,290 @@ export class BalancerSubgraphService { }: Pick & { startTimestamp: number }): Promise< BalancerSwapFragment[] > { - const limit = 1000; - let timestamp = startTimestamp; - let hasMore = true; - let swaps: BalancerSwapFragment[] = []; - - while (hasMore) { - const response = await this.sdk.BalancerSwaps({ - where: { ...where, timestamp_gt: timestamp }, - block, - orderBy: Swap_OrderBy.Timestamp, - orderDirection: OrderDirection.Asc, - first: limit, - }); - - swaps = [...swaps, ...response.swaps]; - - if (response.swaps.length < limit) { - hasMore = false; - } else { - timestamp = response.swaps[response.swaps.length - 1].timestamp; + return this.retryOnFailure(async (sdk) => { + const limit = 1000; + let timestamp = startTimestamp; + let hasMore = true; + let swaps: BalancerSwapFragment[] = []; + + while (hasMore) { + const response = await sdk.BalancerSwaps({ + where: { ...where, timestamp_gt: timestamp }, + block, + orderBy: Swap_OrderBy.Timestamp, + orderDirection: OrderDirection.Asc, + first: limit, + }); + + swaps = [...swaps, ...response.swaps]; + if (response.swaps.length < limit) { + hasMore = false; + } else { + timestamp = response.swaps[response.swaps.length - 1].timestamp; + } } - } - return swaps; + return swaps; + }); } public async getAllGradualWeightUpdates( args: BalancerGradualWeightUpdatesQueryVariables, ): Promise { - return subgraphLoadAll( - this.sdk.BalancerGradualWeightUpdates, - 'gradualWeightUpdates', - args, - ); + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll( + sdk.BalancerGradualWeightUpdates, + 'gradualWeightUpdates', + args, + ); + }); } public async getAllAmpUpdates(args: BalancerAmpUpdatesQueryVariables): Promise { - return subgraphLoadAll(this.sdk.BalancerAmpUpdates, 'ampUpdates', args); + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll(sdk.BalancerAmpUpdates, 'ampUpdates', args); + }); } public async getPool(args: BalancerPoolQueryVariables): Promise { - return this.sdk.BalancerPool(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerPool(args); + }); } public async getPortfolioData(id: string, previousBlockNumber: number): Promise { - return this.sdk.BalancerPortfolioData({ id, previousBlockNumber }); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerPortfolioData({ id, previousBlockNumber }); + }); } public async getUser(userAddress: string): Promise { - const { users } = await this.sdk.BalancerUsers({ where: { id: userAddress.toLowerCase() }, first: 1 }); - - if (users.length === 0) { - return null; - } - - return this.normalizeBalancerUser(users[0]); + return this.retryOnFailure(async (sdk) => { + const { users } = await sdk.BalancerUsers({ where: { id: userAddress.toLowerCase() }, first: 1 }); + if (users.length === 0) { + return null; + } + return this.normalizeBalancerUser(users[0]); + }); } public async getAllUsers(args: BalancerUsersQueryVariables): Promise { - const users = await subgraphLoadAll(this.sdk.BalancerUsers, 'users', args); - - return users.map((user) => this.normalizeBalancerUser(user)); + return this.retryOnFailure(async (sdk) => { + const users = await subgraphLoadAll(sdk.BalancerUsers, 'users', args); + return users.map((user) => this.normalizeBalancerUser(user)); + }); } public async getPoolShares(args: BalancerPoolSharesQueryVariables): Promise { - const { poolShares } = await this.sdk.BalancerPoolShares(args); - - return poolShares.map((shares) => ({ - ...shares, - //ensure the user balance isn't negative, unsure how the subgraph ever allows this to happen - balance: parseFloat(shares.balance) < 0 ? '0' : shares.balance, - poolId: shares.poolId.id, - poolAddress: shares.id.split('-')[0], - userAddress: shares.id.split('-')[1], - })); + return this.retryOnFailure(async (sdk) => { + const { poolShares } = await sdk.BalancerPoolShares(args); + return poolShares.map((shares) => ({ + ...shares, + balance: parseFloat(shares.balance) < 0 ? '0' : shares.balance, + poolId: shares.poolId.id, + poolAddress: shares.id.split('-')[0], + userAddress: shares.id.split('-')[1], + })); + }); } public async getAllPoolSharesWithBalance( poolIds: string[], excludedAddresses: string[], ): Promise { - const allPoolShares: BalancerPoolShareFragment[] = []; - let hasMore = true; - let id = `0`; - const pageSize = 1000; - - while (hasMore) { - const shares = await this.sdk.BalancerPoolShares({ - where: { - id_gt: id, - poolId_in: poolIds.length > 0 ? poolIds : undefined, - userAddress_not_in: excludedAddresses, - balance_gt: '0', - }, - orderBy: PoolShare_OrderBy.Id, - orderDirection: OrderDirection.Asc, - first: pageSize, - }); - - if (shares.poolShares.length === 0) { - break; + return this.retryOnFailure(async (sdk) => { + const allPoolShares: BalancerPoolShareFragment[] = []; + let hasMore = true; + let id = `0`; + const pageSize = 1000; + + while (hasMore) { + const shares = await sdk.BalancerPoolShares({ + where: { + id_gt: id, + poolId_in: poolIds.length > 0 ? poolIds : undefined, + userAddress_not_in: excludedAddresses, + balance_gt: '0', + }, + orderBy: PoolShare_OrderBy.Id, + orderDirection: OrderDirection.Asc, + first: pageSize, + }); + + if (shares.poolShares.length === 0) { + break; + } + + if (shares.poolShares.length < pageSize) { + hasMore = false; + } + + allPoolShares.push(...shares.poolShares); + id = shares.poolShares[shares.poolShares.length - 1].id; } - if (shares.poolShares.length < pageSize) { - hasMore = false; - } - - allPoolShares.push(...shares.poolShares); - id = shares.poolShares[shares.poolShares.length - 1].id; - } - - return allPoolShares.map((shares) => ({ - ...shares, - //ensure the user balance isn't negative, unsure how the subgraph ever allows this to happen - balance: parseFloat(shares.balance) < 0 ? '0' : shares.balance, - poolId: shares.poolId.id, - poolAddress: shares.id.split('-')[0], - userAddress: shares.id.split('-')[1], - })); + return allPoolShares.map((shares) => ({ + ...shares, + balance: parseFloat(shares.balance) < 0 ? '0' : shares.balance, + poolId: shares.poolId.id, + poolAddress: shares.id.split('-')[0], + userAddress: shares.id.split('-')[1], + })); + }); } public async getLatestPrices(args: BalancerLatestPricesQueryVariables): Promise { - return this.sdk.BalancerLatestPrices(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerLatestPrices(args); + }); } public async getLatestPrice(id: string): Promise { - const { latestPrice } = await this.sdk.BalancerLatestPrice({ id }); - - return latestPrice || null; + return this.retryOnFailure(async (sdk) => { + const { latestPrice } = await sdk.BalancerLatestPrice({ id }); + return latestPrice || null; + }); } public async getAllTokenPrices(args: BalancerTokenPricesQueryVariables): Promise { - return subgraphLoadAll(this.sdk.BalancerTokenPrices, 'tokenPrices', args); + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll(sdk.BalancerTokenPrices, 'tokenPrices', args); + }); } - // we dont sync linear pools anymore public async getAllPools( args: BalancerPoolsQueryVariables, applyTotalSharesFilter = true, ): Promise { - return subgraphLoadAll(this.sdk.BalancerPools, 'pools', { - ...args, - where: { - totalShares_not: applyTotalSharesFilter ? '0.00000000001' : undefined, - poolType_not_contains_nocase: 'linear', - ...args.where, - }, + return this.retryOnFailure(async (sdk) => { + return subgraphLoadAll(sdk.BalancerPools, 'pools', { + ...args, + where: { + totalShares_not: applyTotalSharesFilter ? '0.00000000001' : undefined, + poolType_not_contains_nocase: 'linear', + ...args.where, + }, + }); }); } public async getPoolJoinExits(args: BalancerJoinExitsQueryVariables): Promise { - return this.sdk.BalancerJoinExits(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerJoinExits(args); + }); } public async getPortfolioPoolsData(previousBlockNumber: number): Promise { - const cached = this.cache.get( - `${PORTFOLIO_POOLS_CACHE_KEY}:${this.chainId}`, - ) as BalancerPortfolioPoolsDataQuery | null; - - if (cached) { - return cached; - } + return this.retryOnFailure(async (sdk) => { + const cached = this.cache.get( + `${PORTFOLIO_POOLS_CACHE_KEY}:${this.chainId}`, + ) as BalancerPortfolioPoolsDataQuery | null; - const portfolioPools = await this.sdk.BalancerPortfolioPoolsData({ previousBlockNumber }); + if (cached) { + return cached; + } - this.cache.put(`${PORTFOLIO_POOLS_CACHE_KEY}:${this.chainId}`, portfolioPools, fiveMinutesInMs); + const portfolioPools = await sdk.BalancerPortfolioPoolsData({ previousBlockNumber }); + this.cache.put(`${PORTFOLIO_POOLS_CACHE_KEY}:${this.chainId}`, portfolioPools, fiveMinutesInMs); - return portfolioPools; + return portfolioPools; + }); } public async getAllPoolsAtBlock(block: number): Promise { - const cached = this.cache.get(`${ALL_POOLS_CACHE_KEY}:${this.chainId}:${block}`) as - | BalancerPoolFragment[] - | null; - - if (cached) { - return cached; - } - - const { pools } = await this.sdk.BalancerPools({ - first: 1000, - where: { totalShares_gt: '0' }, - block: { number: block }, - }); + return this.retryOnFailure(async (sdk) => { + const cached = this.cache.get(`${ALL_POOLS_CACHE_KEY}:${this.chainId}:${block}`) as + | BalancerPoolFragment[] + | null; - this.cache.put(`${ALL_POOLS_CACHE_KEY}:${this.chainId}:${block}`, pools, twentyFourHoursInMs); + if (cached) { + return cached; + } - return pools; + const { pools } = await sdk.BalancerPools({ + first: 1000, + where: { totalShares_gt: '0' }, + block: { number: block }, + }); + + this.cache.put(`${ALL_POOLS_CACHE_KEY}:${this.chainId}:${block}`, pools, twentyFourHoursInMs); + return pools; + }); } public async getTradePairSnapshots( args: BalancerTradePairSnapshotsQueryVariables, ): Promise { - return this.sdk.BalancerTradePairSnapshots(args); + return this.retryOnFailure(async (sdk) => { + return sdk.BalancerTradePairSnapshots(args); + }); } public async getPoolsWithActiveUpdates(timestamp: number): Promise { - const { ampUpdates, gradualWeightUpdates } = await this.sdk.BalancerGetPoolsWithActiveUpdates({ - timestamp: `${timestamp}`, + return this.retryOnFailure(async (sdk) => { + const { ampUpdates, gradualWeightUpdates } = await sdk.BalancerGetPoolsWithActiveUpdates({ + timestamp: `${timestamp}`, + }); + + return [...ampUpdates, ...gradualWeightUpdates].map((item) => item.poolId.id); }); + } - return [...ampUpdates, ...gradualWeightUpdates].map((item) => item.poolId.id); + public async getSnapshotsForTimestamp(timestamp: number): Promise { + return this.retryOnFailure(async (sdk) => { + const limit = 1000; + let hasMore = true; + let id = `0x`; + let snapshots: BalancerPoolSnapshotFragment[] = []; + + while (hasMore) { + const response = await sdk.BalancerPoolSnapshots({ + where: { timestamp, id_gt: id }, + orderBy: PoolSnapshot_OrderBy.Id, + orderDirection: OrderDirection.Asc, + first: limit, + }); + + snapshots = [...snapshots, ...response.poolSnapshots]; + + if (response.poolSnapshots.length < limit) { + hasMore = false; + } else { + id = snapshots[snapshots.length - 1].id; + } + } + + return snapshots; + }); + } + + public async getAllPoolBalances({ where, block }: PoolBalancesQueryVariables): Promise { + return this.retryOnFailure(async (sdk) => { + const limit = 1000; + let hasMore = true; + let id = `0x`; + let data: PoolBalancesFragment[] = []; + + while (hasMore) { + const response = await sdk.PoolBalances({ + where: { ...where, id_gt: id }, + orderBy: Pool_OrderBy.Id, + orderDirection: OrderDirection.Asc, + first: limit, + block, + }); + + data = [...data, ...response.pools]; + + if (response.pools.length < limit) { + hasMore = false; + } else { + id = response.pools[response.pools.length - 1].id; + } + } + + return data; + }); } private normalizeBalancerUser(user: BalancerUserFragment): BalancerUserFragment { diff --git a/modules/subgraphs/balancer-subgraph/index.ts b/modules/subgraphs/balancer-subgraph/index.ts index ea704ce9f..a6ba45174 100644 --- a/modules/subgraphs/balancer-subgraph/index.ts +++ b/modules/subgraphs/balancer-subgraph/index.ts @@ -9,15 +9,17 @@ import { getSdk, } from './generated/balancer-subgraph-types'; import { BalancerSubgraphService } from './balancer-subgraph.service'; +import { wrapSdkWithRetryAndRotation } from '../../sources/subgraphs/retry-on-failure'; export type V2SubgraphClient = ReturnType; -export function getV2SubgraphClient(url: string, chainId: number) { - const sdk = getSdk(new GraphQLClient(url)); +export function getV2SubgraphClient(urls: string[], chainId: number) { + const sdkClients = urls.map((url) => getSdk(new GraphQLClient(url))); + const sdkWithRetryAndRotation = wrapSdkWithRetryAndRotation(sdkClients); return { - ...sdk, - legacyService: new BalancerSubgraphService(url, chainId), + ...sdkWithRetryAndRotation, + legacyService: new BalancerSubgraphService(urls, chainId), async getSnapshotsForTimestamp(timestamp: number): Promise { const limit = 1000; let hasMore = true; @@ -25,7 +27,7 @@ export function getV2SubgraphClient(url: string, chainId: number) { let snapshots: BalancerPoolSnapshotFragment[] = []; while (hasMore) { - const response = await sdk.BalancerPoolSnapshots({ + const response = await sdkWithRetryAndRotation.BalancerPoolSnapshots({ where: { timestamp, id_gt: id }, orderBy: PoolSnapshot_OrderBy.Id, orderDirection: OrderDirection.Asc, @@ -50,7 +52,7 @@ export function getV2SubgraphClient(url: string, chainId: number) { let data: PoolBalancesFragment[] = []; while (hasMore) { - const response = await sdk.PoolBalances({ + const response = await sdkWithRetryAndRotation.PoolBalances({ where: { ...where, id_gt: id }, orderBy: Pool_OrderBy.Id, orderDirection: OrderDirection.Asc, diff --git a/modules/token/latest-fx-price.ts b/modules/token/latest-fx-price.ts index 2eb090661..5571ea1ad 100644 --- a/modules/token/latest-fx-price.ts +++ b/modules/token/latest-fx-price.ts @@ -1,7 +1,7 @@ -import { GraphQLClient } from 'graphql-request'; -import { getSdk } from '../subgraphs/balancer-subgraph/generated/balancer-subgraph-types'; import { prisma } from '../../prisma/prisma-client'; import { Chain } from '@prisma/client'; +import { getV2SubgraphClient } from '../subgraphs/balancer-subgraph'; +import { chainToIdMap } from '../network/network-config'; /** * 'Latest FX Price' is relevant only to FX pools. It is sourced from offchain platforms, like Chainlink. @@ -11,8 +11,8 @@ import { Chain } from '@prisma/client'; * * Note: 'LatestFXPrice' is a dependency of SORv2. */ -export const syncLatestFXPrices = async (subgraphUrl: string, chain: Chain) => { - const { pools } = await fetchFxPools(subgraphUrl); +export const syncLatestFXPrices = async (subgraphUrls: string[], chain: Chain) => { + const { pools } = await fetchFxPools(subgraphUrls, chain); for (const pool of pools) { const { tokens } = pool; @@ -40,8 +40,8 @@ export const syncLatestFXPrices = async (subgraphUrl: string, chain: Chain) => { return true; }; -const fetchFxPools = (subgraphUrl: string) => { - const sdk = getSdk(new GraphQLClient(subgraphUrl)); +const fetchFxPools = (subgraphUrls: string[], chain: Chain) => { + const sdk = getV2SubgraphClient(subgraphUrls, Number(chainToIdMap[chain])); return sdk.BalancerPools({ where: { poolType: 'FX' },