diff --git a/background-jobs/update-rays-cron-function/src/index.ts b/background-jobs/update-rays-cron-function/src/index.ts index caa745372b..dce0d4eecd 100644 --- a/background-jobs/update-rays-cron-function/src/index.ts +++ b/background-jobs/update-rays-cron-function/src/index.ts @@ -6,7 +6,7 @@ import { getSummerPointsSubgraphClient } from '@summerfi/summer-events-subgraph' import { ChainId } from '@summerfi/serverless-shared' import { PositionPoints, SummerPointsService } from './point-accrual' import { positionIdResolver } from './position-id-resolver' -import { Kysely } from 'kysely' +import { Kysely, Transaction } from 'kysely' const logger = new Logger({ serviceName: 'update-rays-cron-function' }) @@ -17,32 +17,57 @@ const FOURTEEN_DAYS_IN_MILLISECONDS = 14 * 24 * 60 * 60 * 1000 const SIXTY_DAYS_IN_MILLISECONDS = 60 * 24 * 60 * 60 const THIRTY_DAYS_IN_MILLISECONDS = 30 * 24 * 60 * 60 * 1000 -enum EligibilityConditionType { +enum UserMultiplier { + PROTOCOL_BOOST = 'PROTOCOL_BOOST', + SWAP = 'SWAP', +} + +enum RetroPointDistribution { + SNAPSHOT_GENERAL = 'Snapshot_General', + SNAPSHOT_DEFI = 'Snapshot_Defi', + SNAPSHOT_SUMMER = 'Snapshot_Summer', + SNAPSHOT_SUMMER_POWER = 'Snapshot_SummerPower', +} + +enum OngoingPointDistribution { + OPEN_POSITION = 'OPEN_POSITION', + MIGRATION = 'MIGRATION', + SWAP = 'SWAP', +} + +enum PositionMultiplier { + TIME_OPEN = 'TIME_OPEN', + AUTOMATION = 'AUTOMATION', + LAZY_VAULT = 'LAZY_VAULT', +} + +enum EligibilityCondition { POSITION_OPEN_TIME = 'POSITION_OPEN_TIME', POINTS_EXPIRED = 'POINTS_EXPIRED', BECOME_SUMMER_USER = 'BECOME_SUMMER_USER', } + type Eligibility = { - type: EligibilityConditionType + type: EligibilityCondition description: string metadata: Record> } -const eligibilityConditions: Record = { - [EligibilityConditionType.POSITION_OPEN_TIME]: { - type: EligibilityConditionType.POSITION_OPEN_TIME, +const eligibilityConditions: Record = { + [EligibilityCondition.POSITION_OPEN_TIME]: { + type: EligibilityCondition.POSITION_OPEN_TIME, description: 'The position must be open for at least 30 days', metadata: { minDays: 30, }, }, - [EligibilityConditionType.POINTS_EXPIRED]: { - type: EligibilityConditionType.POINTS_EXPIRED, + [EligibilityCondition.POINTS_EXPIRED]: { + type: EligibilityCondition.POINTS_EXPIRED, description: 'The points have expired', metadata: {}, }, - [EligibilityConditionType.BECOME_SUMMER_USER]: { - type: EligibilityConditionType.BECOME_SUMMER_USER, + [EligibilityCondition.BECOME_SUMMER_USER]: { + type: EligibilityCondition.BECOME_SUMMER_USER, description: 'Must use summer for at least 14 days with at least 500 USD', metadata: { minDays: 14, @@ -165,67 +190,94 @@ export const handler = async ( return } - const points = await pointAccuralService.accruePoints(startTimestamp, endTimestamp) - const sortedPoints = points.sort((a, b) => a.positionId.localeCompare(b.positionId)) + const accruedPointsFromSnapshot = await pointAccuralService.accruePoints( + startTimestamp, + endTimestamp, + ) + const sortedAccruedPointsFromSnapshot = accruedPointsFromSnapshot.sort((a, b) => + a.positionId.localeCompare(b.positionId), + ) - await checkMigrationEligibility(db, sortedPoints) - await checkOpenedPositionEligibility(db, sortedPoints) - await insertAllMissingUsers(sortedPoints, db) + await checkMigrationEligibility(db, sortedAccruedPointsFromSnapshot) + await checkOpenedPositionEligibility(db, sortedAccruedPointsFromSnapshot) + await insertAllMissingUsers(sortedAccruedPointsFromSnapshot, db) - const chunkedPoints: PositionPoints[] = createChunksOfUserPointsDistributions(sortedPoints, 30) - await db.transaction().execute(async (transaction) => { - for (let i = 0; i < chunkedPoints.length; i++) { - logger.info(`Processing: Chunk ${i} of ${chunkedPoints.length}`) - const chunk = chunkedPoints[i] - const addressesForChunk = chunk.map((c) => c.user) - const positionsForChunk = chunk.map((c) => c.positionId) + const chunkedPoints: PositionPoints[] = createChunksOfUserPointsDistributions( + sortedAccruedPointsFromSnapshot, + 30, + ) - const userAddresses = await db - .selectFrom('userAddress') - .where('address', 'in', addressesForChunk) - .selectAll() - .execute() - const positions = await db - .selectFrom('position') - .where('externalId', 'in', positionsForChunk) - .selectAll() - .execute() + // Get all unique addresses and positions from all chunks + const uniqueAddressesFromSnapshot = Array.from( + new Set(chunkedPoints.flatMap((chunk) => chunk.map((c) => c.user))), + ) + const allPositions = Array.from( + new Set(chunkedPoints.flatMap((chunk) => chunk.map((c) => c.positionId))), + ) - const usersMultipliers = await db - .selectFrom('multiplier') - .innerJoin('userAddress', 'multiplier.userAddressId', 'userAddress.id') - .where('userAddress.address', 'in', addressesForChunk) - .select([ - 'multiplier.value', - 'multiplier.type', - 'multiplier.id', - 'multiplier.userAddressId', - 'multiplier.positionId', - ]) - .execute() + // Fetch all necessary data for all chunks at once + const uniqueUserAddressesFromDatabase = await db + .selectFrom('userAddress') + .where('address', 'in', uniqueAddressesFromSnapshot) + .selectAll() + .execute() + + const positionsFromDatabase = await db + .selectFrom('position') + .where('externalId', 'in', allPositions) + .selectAll() + .execute() + + const usersMultipliersFromDatabase = await db + .selectFrom('multiplier') + .innerJoin('userAddress', 'multiplier.userAddressId', 'userAddress.id') + .where('userAddress.address', 'in', uniqueAddressesFromSnapshot) + .select([ + 'multiplier.value', + 'multiplier.type', + 'multiplier.id', + 'multiplier.userAddressId', + 'multiplier.positionId', + ]) + .execute() + const positionsMultipliersFromDatabase = await db + .selectFrom('multiplier') + .innerJoin('position', 'multiplier.positionId', 'position.id') + .where('position.externalId', 'in', allPositions) + .select([ + 'multiplier.value', + 'multiplier.type', + 'multiplier.id', + 'multiplier.userAddressId', + 'multiplier.positionId', + ]) + .execute() - const positionsMultipliers = await db - .selectFrom('multiplier') - .innerJoin('position', 'multiplier.positionId', 'position.id') - .where('position.externalId', 'in', positionsForChunk) - .select([ - 'multiplier.value', - 'multiplier.type', - 'multiplier.id', - 'multiplier.userAddressId', - 'multiplier.positionId', - ]) - .execute() + await db.transaction().execute(async (transaction) => { + await addOrUpdateUserMultipliers( + uniqueAddressesFromSnapshot, + uniqueUserAddressesFromDatabase, + accruedPointsFromSnapshot, + usersMultipliersFromDatabase, + transaction, + ) + + for (let i = 0; i < chunkedPoints.length; i++) { + const startTime = process.hrtime() + logger.info(`Processing: Chunk ${i + 1} of ${chunkedPoints.length}`) + const chunk = chunkedPoints[i] for (const record of chunk) { - const userAddress = userAddresses.find((ua) => ua.address === record.user) + const userAddress = uniqueUserAddressesFromDatabase.find( + (ua) => ua.address === record.user, + ) if (!userAddress) { throw new Error('User address not found') } const positionId = positionIdResolver(record.positionId) - let position = positions.find((p) => p.externalId === record.positionId) + let position = positionsFromDatabase.find((p) => p.externalId === record.positionId) if (!position) { position = await transaction .insertInto('position') @@ -250,7 +302,7 @@ export const handler = async ( description: 'Points for opening a position', points: record.points.openPositionsPoints, positionId: position.id, - type: 'OPEN_POSITION', + type: OngoingPointDistribution.OPEN_POSITION, }) .executeTakeFirstOrThrow() } @@ -277,7 +329,7 @@ export const handler = async ( description: 'Points for migrations', points: record.points.migrationPoints, positionId: position.id, - type: 'MIGRATION', + type: OngoingPointDistribution.MIGRATION, eligibilityConditionId: eligibilityCondition.id, }) .executeTakeFirstOrThrow() @@ -290,7 +342,7 @@ export const handler = async ( description: 'Points for swap', points: record.points.swapPoints, positionId: position.id, - type: 'SWAP', + type: OngoingPointDistribution.SWAP, }) .executeTakeFirstOrThrow() } @@ -299,62 +351,23 @@ export const handler = async ( // protocolBoostMultiplier: -> user multiplier -> type = 'PROTOCOL_BOOST' // swapMultiplier: number -> user multiplier -> type = 'SWAP' // timeOpenMultiplier: number -> position multiplier -> type = 'TIME_OPEN' - // automationProtectionMultiplier: number -> position multiplier -> type = 'AUTOMATION' + // automationProtectionMultiplier: number -> position multiplier -> type = PositionMultiplier.AUTOMATION // lazyVaultMultiplier: number -> position multiplier -> type = 'LAZY_VAULT' - const userMultipliers = usersMultipliers.filter((m) => m.userAddressId === userAddress.id) - const positionMultipliers = positionsMultipliers.filter( + const positionMultipliers = positionsMultipliersFromDatabase.filter( (m) => m.positionId === position.id, ) - let procotolBoostMultiplier = userMultipliers.find((m) => m.type === 'PROTOCOL_BOOST') - - if (!procotolBoostMultiplier) { - procotolBoostMultiplier = await transaction - .insertInto('multiplier') - .values({ - userAddressId: userAddress.id, - type: 'PROTOCOL_BOOST', - value: record.multipliers.protocolBoostMultiplier, - }) - .returningAll() - .executeTakeFirstOrThrow() - } else { - await transaction - .updateTable('multiplier') - .set('value', record.multipliers.protocolBoostMultiplier) - .where('id', '=', procotolBoostMultiplier.id) - .execute() - } - - let swapMultiplier = userMultipliers.find((m) => m.type === 'SWAP') - - if (!swapMultiplier) { - swapMultiplier = await transaction - .insertInto('multiplier') - .values({ - userAddressId: userAddress.id, - type: 'SWAP', - value: record.multipliers.swapMultiplier, - }) - .returningAll() - .executeTakeFirstOrThrow() - } else { - await transaction - .updateTable('multiplier') - .set('value', record.multipliers.swapMultiplier) - .where('id', '=', swapMultiplier.id) - .execute() - } - - let timeOpenMultiplier = positionMultipliers.find((m) => m.type === 'TIME_OPEN') + let timeOpenMultiplier = positionMultipliers.find( + (m) => m.type === PositionMultiplier.TIME_OPEN, + ) if (!timeOpenMultiplier) { timeOpenMultiplier = await transaction .insertInto('multiplier') .values({ positionId: position.id, - type: 'TIME_OPEN', + type: PositionMultiplier.TIME_OPEN, value: record.multipliers.timeOpenMultiplier, }) .returningAll() @@ -368,7 +381,7 @@ export const handler = async ( } let automationProtectionMultiplier = positionMultipliers.find( - (m) => m.type === 'AUTOMATION', + (m) => m.type === PositionMultiplier.AUTOMATION, ) if (!automationProtectionMultiplier) { @@ -376,7 +389,7 @@ export const handler = async ( .insertInto('multiplier') .values({ positionId: position.id, - type: 'AUTOMATION', + type: PositionMultiplier.AUTOMATION, value: record.multipliers.automationProtectionMultiplier, }) .returningAll() @@ -389,13 +402,15 @@ export const handler = async ( .execute() } - const lazyVaultMultiplier = positionMultipliers.find((m) => m.type === 'LAZY_VAULT') + const lazyVaultMultiplier = positionMultipliers.find( + (m) => m.type === PositionMultiplier.LAZY_VAULT, + ) if (!lazyVaultMultiplier) { await transaction .insertInto('multiplier') .values({ positionId: position.id, - type: 'LAZY_VAULT', + type: PositionMultiplier.LAZY_VAULT, value: record.multipliers.lazyVaultMultiplier, }) .execute() @@ -416,7 +431,7 @@ export const handler = async ( endTimestamp: new Date(endTimestamp * 1000), startTimestamp: new Date(startTimestamp * 1000), metadata: { - positions: points.length, + positions: accruedPointsFromSnapshot.length, }, }) .executeTakeFirstOrThrow() @@ -426,6 +441,10 @@ export const handler = async ( .set('lastTimestamp', new Date(endTimestamp * 1000)) .where('id', '=', LAST_RUN_ID) .execute() + const endTime = process.hrtime(startTime) + logger.info( + `Chunk ${i} of ${chunkedPoints.length} took ${endTime[0]}s ${endTime[1] / 1000000}ms`, + ) } }) } catch (e) { @@ -443,6 +462,117 @@ export const handler = async ( export default handler +/** + * The `addOrUpdateUserMultipliers` function is an asynchronous function that updates or inserts multipliers for users. + * + * @param addressesFromSnapshot - An array of strings, where each string is an Ethereum or Solana address. + * + * @param uniqueUserAddressesFromDatabase - An array of objects, where each object represents a user address (from DB) . Each object has the following properties: + * - `address`: A string representing the Ethereum or Solana address of the user. + * - `createdAt`: A Date object representing when the user address was created. + * - `id`: A number representing the unique ID of the user address. + * - `type`: A string that can be either 'ETH' or 'SOL', representing the type of the address. + * - `updatedAt`: A Date object representing when the user address was last updated. + * - `userId`: A number representing the unique ID of the user. + * + * @param pointDistributions - An object representing the points of a position. The structure of this object depends on the `PositionPoints` type. + * + * @param usersMultipliersFromDatabase - An array of objects, where each object represents a multiplier. Each object has the following properties: + * - `userAddressId`: A number or null, representing the unique ID of the user address associated with the multiplier. If null, the multiplier is not associated with any user address. + * - `id`: A number representing the unique ID of the multiplier. + * - `type`: A string representing the type of the multiplier. + * - `positionId`: A number or null, representing the unique ID of the position associated with the multiplier. If null, the multiplier is not associated with any position. + * - `value`: A string representing the value of the multiplier. + * + * @param transaction - A Transaction object representing a database transaction. This transaction is used to execute the database operations. + * + * The function works as follows: + * 1. It iterates over each address in the `addressesFromSnapshot` array. + * 2. For each address, it finds the corresponding user address and points distribution. + * 3. If no user address or points distribution is found, it throws an error. + * 4. It then finds the multipliers associated with the user address. + * 5. For each type of multiplier (protocol boost and swap), it checks if a multiplier already exists. + * 6. If a multiplier does not exist, it inserts a new multiplier into the 'multiplier' table with the value from the points distribution. + * 7. If a multiplier does exist, it updates the value of the multiplier in the 'multiplier' table with the value from the points distribution. + */ +async function addOrUpdateUserMultipliers( + uniqueAddressesFromSnapshot: string[], + uniqueUserAddressesFromDatabase: { + address: string + createdAt: Date + id: number + type: 'ETH' | 'SOL' + updatedAt: Date + userId: number + }[], + pointDistributionsFromSnapshot: PositionPoints, + usersMultipliersFromDatabase: { + userAddressId: number | null + id: number + type: string + positionId: number | null + value: string + }[], + transaction: Transaction, +) { + for (const user of uniqueAddressesFromSnapshot) { + const userAddress = uniqueUserAddressesFromDatabase.find((ua) => ua.address === user) + // any points distribution attached to a specific user hold the same multipliers, hence we can take the first one + const userPointsDistribution = pointDistributionsFromSnapshot.find((p) => p.user === user) + + if (!userPointsDistribution) { + throw new Error('User points distribution not found') + } + if (!userAddress) { + throw new Error('User address not found') + } + + const userMultipliers = usersMultipliersFromDatabase.filter( + (m) => m.userAddressId === userAddress.id, + ) + let procotolBoostMultiplier = userMultipliers.find( + (m) => m.type === UserMultiplier.PROTOCOL_BOOST, + ) + if (!procotolBoostMultiplier) { + procotolBoostMultiplier = await transaction + .insertInto('multiplier') + .values({ + userAddressId: userAddress.id, + type: UserMultiplier.PROTOCOL_BOOST, + value: userPointsDistribution.multipliers.protocolBoostMultiplier, + }) + .returningAll() + .executeTakeFirstOrThrow() + } else { + await transaction + .updateTable('multiplier') + .set('value', userPointsDistribution.multipliers.protocolBoostMultiplier) + .where('id', '=', procotolBoostMultiplier.id) + .execute() + } + + let swapMultiplier = userMultipliers.find((m) => m.type === UserMultiplier.SWAP) + + if (!swapMultiplier) { + swapMultiplier = await transaction + .insertInto('multiplier') + .values({ + userAddressId: userAddress.id, + type: UserMultiplier.SWAP, + value: userPointsDistribution.multipliers.swapMultiplier, + }) + .returningAll() + .executeTakeFirstOrThrow() + } else { + await transaction + .updateTable('multiplier') + .set('value', userPointsDistribution.multipliers.swapMultiplier) + .where('id', '=', swapMultiplier.id) + .execute() + } + } +} + /** * Inserts missing users into the database. * @@ -452,7 +582,7 @@ export default handler */ async function insertAllMissingUsers(sortedPoints: PositionPoints, db: Kysely) { const uniqueUsers = new Set(sortedPoints.map((p) => p.user)) - const userAddresses = await db + const uniqueUserAddressesFromDatabase = await db .selectFrom('userAddress') .where('address', 'in', Array.from(uniqueUsers)) .selectAll() @@ -460,7 +590,7 @@ async function insertAllMissingUsers(sortedPoints: PositionPoints, db: Kysely { for (const user of uniqueUsers) { - const userAddress = userAddresses.find((ua) => ua.address === user) + const userAddress = uniqueUserAddressesFromDatabase.find((ua) => ua.address === user) if (!userAddress) { const result = await transaction .insertInto('blockchainUser') @@ -516,7 +646,7 @@ function createChunksOfUserPointsDistributions(sortedPoints: PositionPoints, chu * @param positionPoints - The position points. */ async function checkMigrationEligibility(db: Kysely, positionPoints: PositionPoints) { - const existingPointDistributionsWithEligibilityCondition = await db + const existingOngoingPointDistributionsWithEligibilityCondition = await db .selectFrom('pointsDistribution') .select(['pointsDistribution.id as pointsId']) .innerJoin( @@ -526,13 +656,13 @@ async function checkMigrationEligibility(db: Kysely, positionPoints: P ) .innerJoin('position', 'position.id', 'pointsDistribution.positionId') .where('eligibilityCondition.type', '=', eligibilityConditions.POSITION_OPEN_TIME.type) - .where('pointsDistribution.type', '=', 'MIGRATION') + .where('pointsDistribution.type', '=', OngoingPointDistribution.MIGRATION) .selectAll() .execute() - if (existingPointDistributionsWithEligibilityCondition.length > 0) { + if (existingOngoingPointDistributionsWithEligibilityCondition.length > 0) { await db.transaction().execute(async (transaction) => { - for (const point of existingPointDistributionsWithEligibilityCondition) { + for (const point of existingOngoingPointDistributionsWithEligibilityCondition) { if (point.dueDate && point.dueDate < new Date()) { const positionInSnapshot = positionPoints.find((p) => p.positionId === point.externalId) if (!positionInSnapshot || positionInSnapshot.netValue <= 0) { @@ -591,6 +721,13 @@ async function checkOpenedPositionEligibility( 'pointsDistribution.eligibilityConditionId', ) .leftJoin('userAddress', 'userAddress.id', 'pointsDistribution.userAddressId') + .where((eb) => + eb('pointsDistribution.type', '=', RetroPointDistribution.SNAPSHOT_GENERAL).or( + 'pointsDistribution.type', + '=', + RetroPointDistribution.SNAPSHOT_DEFI, + ), + ) .where('eligibilityCondition.type', '=', eligibilityConditions.BECOME_SUMMER_USER.type) .selectAll() .execute() @@ -600,15 +737,18 @@ async function checkOpenedPositionEligibility( for (const user of existingUsersWithEligibilityCondition) { if (user.dueDate && user.dueDate >= new Date()) { const eligiblePositionsFromPointsAccrual = positionPoints - .filter( - (p) => - p.netValue >= 500 && - p.positionCreated * 1000 < Date.now() - FOURTEEN_DAYS_IN_MILLISECONDS, - ) .filter((p) => p.user === user.address) + .filter((p) => { + console.log('p', p, user.id) + return ( + Number(p.netValue) >= 500 && + p.positionCreated * 1000 < Date.now() - FOURTEEN_DAYS_IN_MILLISECONDS + ) + }) .sort((a, b) => a.positionCreated - b.positionCreated) + if (eligiblePositionsFromPointsAccrual.length == 0) { - return + continue } else { const oldestEligiblePosition = eligiblePositionsFromPointsAccrual[0] const becomeSummerUserMultiplier = getBecomeSummerUserMultiplier( @@ -617,9 +757,16 @@ async function checkOpenedPositionEligibility( const pointsDistributions = await transaction .selectFrom('pointsDistribution') .where('userAddressId', '=', user.id) - .where((eb) => eb('type', '=', 'Snapshot_General').or('type', '=', 'Snapshot_Defi')) + .where((eb) => + eb('type', '=', RetroPointDistribution.SNAPSHOT_GENERAL).or( + 'type', + '=', + RetroPointDistribution.SNAPSHOT_DEFI, + ), + ) .selectAll() .execute() + logger.info(`pointsDistributions for ${user.id}`, JSON.stringify(pointsDistributions)) for (const pointsDistribution of pointsDistributions) { // update points distribution await transaction