From c7f4bcdf1ffc11243d4a45e9b608677fe5e68445 Mon Sep 17 00:00:00 2001 From: jairajdev Date: Thu, 30 Oct 2025 23:09:02 +0800 Subject: [PATCH 1/2] feat: Add cycle-based data synchronization mechanism - Implement SyncTracker interface for tracking lastSavedCycle across data types - Add cycleTracker, receiptTracker, and originalTxTracker instances - Create sync validation functions with 1-cycle buffer logic ( current cycle - 2 ) - Auto-initialize trackers on first use to prevent syncing from cycle 0 - Integrate checkAndSyncDataByCycle into insertOrUpdateCycle - Sync runs in background on new cycle data from websocket distributor --- src/class/DataSync.ts | 189 ++++++++++++++++++++++++++++++++++++++++++ src/server.ts | 2 +- src/storage/cycle.ts | 16 ++-- 3 files changed, 200 insertions(+), 7 deletions(-) diff --git a/src/class/DataSync.ts b/src/class/DataSync.ts index 92e7d9d..b671af7 100644 --- a/src/class/DataSync.ts +++ b/src/class/DataSync.ts @@ -23,6 +23,12 @@ interface queryFromDistributorParameters { endCycle?: number } +// Sync tracker interface for data synchronization based on cycles +interface SyncTracker { + lastSavedCycle: number + updateLastSavedCycle(cycle: number): void +} + export const queryFromDistributor = async ( type: DataType, queryParameters: queryFromDistributorParameters @@ -71,6 +77,31 @@ export const queryFromDistributor = async ( } } +// Tracker instances for data synchronization +export const cycleTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + +export const receiptTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedReceiptCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + +export const originalTxTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedOriginalTxCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + export async function compareWithOldReceiptsData( lastStoredReceiptCycle = 0 ): Promise<{ success: boolean; matchedCycle: number }> { @@ -541,6 +572,164 @@ export async function downloadOriginalTxsDataByCycle( } } +/** + * Calculate the safe sync cycle with buffer + * If new cycle is 9, we can safely sync up to cycle 7 (9 - 2) + * This maintains a 1-cycle buffer for safety + * @param newCycle - The current/latest cycle number + * @returns The cycle number that is safe to sync up to + */ +export function getSafeSyncCycle(newCycle: number): number { + const CYCLE_BUFFER = 2 + return Math.max(0, newCycle - CYCLE_BUFFER) +} + +/** + * Check if data synchronization should be performed for a given data type + * @param tracker - The sync tracker for the data type + * @param newCycle - The current/latest cycle number + * @returns Object with shouldSync flag and targetCycle to sync up to + */ +export function shouldSyncData( + tracker: SyncTracker, + newCycle: number +): { shouldSync: boolean; targetCycle: number; startCycle: number } { + const targetCycle = getSafeSyncCycle(newCycle) + + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (tracker.lastSavedCycle === 0) { + tracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log(`Initialized tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})`) + } + } + + const startCycle = tracker.lastSavedCycle + 1 + const shouldSync = targetCycle > tracker.lastSavedCycle + + if (config.verbose) { + console.log( + `Sync check: lastSaved=${tracker.lastSavedCycle}, current=${newCycle}, target=${targetCycle}, shouldSync=${shouldSync}` + ) + } + + return { + shouldSync, + targetCycle, + startCycle, + } +} + +/** + * Validate and synchronize receipts data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncReceipts(newCycle: number): Promise { + const syncInfo = shouldSyncData(receiptTracker, newCycle) + + if (!syncInfo.shouldSync) { + if (config.verbose) console.log('Receipts are already synchronized') + return + } + + console.log( + `Syncing receipts from cycle ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${newCycle})` + ) + + try { + // Compare and validate receipts count between cycles + const unmatchedCycles = await compareReceiptsCountByCycles(syncInfo.startCycle, syncInfo.targetCycle) + + if (unmatchedCycles && unmatchedCycles.length > 0) { + console.log(`Found ${unmatchedCycles.length} cycles with mismatched receipt counts`) + await downloadReceiptsByCycle(unmatchedCycles) + } + + // Update the tracker after successful sync + receiptTracker.updateLastSavedCycle(syncInfo.targetCycle) + console.log(`✅ Receipts synchronized up to cycle ${syncInfo.targetCycle}`) + } catch (error) { + console.error('Error syncing receipts:', error) + } +} + +/** + * Validate and synchronize original transactions data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncOriginalTxs(newCycle: number): Promise { + const syncInfo = shouldSyncData(originalTxTracker, newCycle) + + if (!syncInfo.shouldSync) { + if (config.verbose) console.log('OriginalTxs are already synchronized') + return + } + + console.log( + `Syncing originalTxs from cycle ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${newCycle})` + ) + + try { + // Compare and validate originalTxs count between cycles + const unmatchedCycles = await compareOriginalTxsCountByCycles(syncInfo.startCycle, syncInfo.targetCycle) + + if (unmatchedCycles && unmatchedCycles.length > 0) { + console.log(`Found ${unmatchedCycles.length} cycles with mismatched originalTx counts`) + await downloadOriginalTxsDataByCycle(unmatchedCycles) + } + + // Update the tracker after successful sync + originalTxTracker.updateLastSavedCycle(syncInfo.targetCycle) + console.log(`✅ OriginalTxs synchronized up to cycle ${syncInfo.targetCycle}`) + } catch (error) { + console.error('Error syncing originalTxs:', error) + } +} + +/** + * Validate and synchronize cycle data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncCycles(newCycle: number): Promise { + const syncInfo = shouldSyncData(cycleTracker, newCycle) + + if (!syncInfo.shouldSync) { + if (config.verbose) console.log('Cycles are already synchronized') + return + } + + console.log( + `Syncing cycles from ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${currentCycle})` + ) + + try { + await downloadCyclcesBetweenCycles(syncInfo.startCycle, syncInfo.targetCycle, true) + + // Update the tracker after successful sync + cycleTracker.updateLastSavedCycle(syncInfo.targetCycle) + console.log(`✅ Cycles synchronized up to cycle ${syncInfo.targetCycle}`) + } catch (error) { + console.error('Error syncing cycles:', error) + } +} + +/** + * Check and synchronize all data types based on the current cycle + * This should be called when new cycle data is inserted/updated + * @param currentCycle - The current cycle number from newly inserted/updated cycle + */ +export async function checkAndSyncDataByCycle(currentCycle: number): Promise { + if (config.verbose) console.log(`Checking and syncing data for cycle: ${currentCycle}`) + + // Run all sync operations in parallel + await Promise.all([ + validateAndSyncCycles(currentCycle), + validateAndSyncReceipts(currentCycle), + validateAndSyncOriginalTxs(currentCycle), + ]) +} + export const downloadCyclcesBetweenCycles = async ( startCycle: number, totalCyclesToSync: number, diff --git a/src/server.ts b/src/server.ts index bf578c6..39cb68d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -54,7 +54,7 @@ import { ValidatorStats } from './stats/validatorStats' import { TransactionStats, convertBaseTxStatsAsArray } from './stats/transactionStats' import { DailyTransactionStats } from './stats/dailyTransactionStats' import { DailyAccountStats } from './stats/dailyAccountStats' -import { DailyCoinStats, DailyCoinStatsSummary } from './stats/dailyCoinStats' +import { DailyCoinStats } from './stats/dailyCoinStats' if (config.env == envEnum.DEV) { //default debug mode diff --git a/src/storage/cycle.ts b/src/storage/cycle.ts index c28f0d4..6ddc964 100644 --- a/src/storage/cycle.ts +++ b/src/storage/cycle.ts @@ -5,17 +5,13 @@ import { config } from '../config/index' import { cleanOldReceiptsMap } from './receipt' import { cleanOldOriginalTxsMap } from './originalTxData' import { Utils as StringUtils } from '@shardus/types' +import { checkAndSyncDataByCycle } from '../class/DataSync' type DbCycle = Cycle & { cycleRecord: string } -const CYCLE_COLUMNS: readonly (keyof Cycle)[] = [ - 'cycleMarker', - 'counter', - 'start', - 'cycleRecord', -] as const +const CYCLE_COLUMNS: readonly (keyof Cycle)[] = ['cycleMarker', 'counter', 'start', 'cycleRecord'] as const export function isCycle(obj: Cycle): obj is Cycle { return (obj as Cycle).cycleRecord !== undefined && (obj as Cycle).cycleMarker !== undefined @@ -110,6 +106,14 @@ export async function insertOrUpdateCycle(cycle: Cycle): Promise { cleanOldReceiptsMap(CLEAN_UP_TIMESTMAP_MS) cleanOldOriginalTxsMap(CLEAN_UP_TIMESTMAP_MS) } + + // Trigger cycle-based synchronization check when new cycle data is received + if (cycleInfo.counter > 0) { + // Run sync in background + checkAndSyncDataByCycle(cycleInfo.counter).catch((error) => { + console.error('Error in checkAndSyncDataByCycle:', error) + }) + } } else { console.log('No cycleRecord or cycleMarker in cycle,', cycle) } From 945d9a22b8d8c215c4a3228b264f71da5e7ffc8f Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 31 Oct 2025 12:24:36 +0800 Subject: [PATCH 2/2] refactor: Optimize cycle-based sync implementation - Remove shouldSyncData helper and inline logic into each validateAndSync function - Move tracker updates outside try-catch to ensure progression on errors - Add isInSync optimization in validateAndSyncCycles to skip redundant downloads - Improve error logging with newCycle and targetCycle context - Log unmatched cycles array for better debugging visibility - Rename currentCycle parameter to newCycle for consistency --- src/class/DataSync.ts | 156 ++++++++++++++++++++---------------------- 1 file changed, 76 insertions(+), 80 deletions(-) diff --git a/src/class/DataSync.ts b/src/class/DataSync.ts index b671af7..d6f4de5 100644 --- a/src/class/DataSync.ts +++ b/src/class/DataSync.ts @@ -583,75 +583,45 @@ export function getSafeSyncCycle(newCycle: number): number { const CYCLE_BUFFER = 2 return Math.max(0, newCycle - CYCLE_BUFFER) } - /** - * Check if data synchronization should be performed for a given data type - * @param tracker - The sync tracker for the data type - * @param newCycle - The current/latest cycle number - * @returns Object with shouldSync flag and targetCycle to sync up to + * Validate and synchronize receipts data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data */ -export function shouldSyncData( - tracker: SyncTracker, - newCycle: number -): { shouldSync: boolean; targetCycle: number; startCycle: number } { +export async function validateAndSyncReceipts(newCycle: number): Promise { const targetCycle = getSafeSyncCycle(newCycle) // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle // This prevents trying to sync from cycle 0 on first run - if (tracker.lastSavedCycle === 0) { - tracker.lastSavedCycle = targetCycle + if (receiptTracker.lastSavedCycle === 0) { + receiptTracker.lastSavedCycle = targetCycle if (config.verbose) { - console.log(`Initialized tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})`) + console.log( + `Initialized receipt sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) } } - const startCycle = tracker.lastSavedCycle + 1 - const shouldSync = targetCycle > tracker.lastSavedCycle - - if (config.verbose) { - console.log( - `Sync check: lastSaved=${tracker.lastSavedCycle}, current=${newCycle}, target=${targetCycle}, shouldSync=${shouldSync}` - ) - } - - return { - shouldSync, - targetCycle, - startCycle, - } -} - -/** - * Validate and synchronize receipts data based on cycle tracker - * @param newCycle - The current/latest cycle number from incoming data - */ -export async function validateAndSyncReceipts(newCycle: number): Promise { - const syncInfo = shouldSyncData(receiptTracker, newCycle) + const startCycle = receiptTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > receiptTracker.lastSavedCycle - if (!syncInfo.shouldSync) { + if (!shouldSyncCheck) { if (config.verbose) console.log('Receipts are already synchronized') return } - - console.log( - `Syncing receipts from cycle ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${newCycle})` - ) - try { // Compare and validate receipts count between cycles - const unmatchedCycles = await compareReceiptsCountByCycles(syncInfo.startCycle, syncInfo.targetCycle) + const unmatchedCycles = await compareReceiptsCountByCycles(startCycle, targetCycle) if (unmatchedCycles && unmatchedCycles.length > 0) { - console.log(`Found ${unmatchedCycles.length} cycles with mismatched receipt counts`) + console.log(`Found ${unmatchedCycles.length} cycles with mismatched receipt counts`, unmatchedCycles) await downloadReceiptsByCycle(unmatchedCycles) } - - // Update the tracker after successful sync - receiptTracker.updateLastSavedCycle(syncInfo.targetCycle) - console.log(`✅ Receipts synchronized up to cycle ${syncInfo.targetCycle}`) } catch (error) { - console.error('Error syncing receipts:', error) + console.error('Error sync checking receipts:', newCycle, targetCycle, error) } + // Update the tracker after sync check + receiptTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ Receipts synchronized up to cycle ${targetCycle}`) } /** @@ -659,32 +629,41 @@ export async function validateAndSyncReceipts(newCycle: number): Promise { * @param newCycle - The current/latest cycle number from incoming data */ export async function validateAndSyncOriginalTxs(newCycle: number): Promise { - const syncInfo = shouldSyncData(originalTxTracker, newCycle) + const targetCycle = getSafeSyncCycle(newCycle) - if (!syncInfo.shouldSync) { - if (config.verbose) console.log('OriginalTxs are already synchronized') - return + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (originalTxTracker.lastSavedCycle === 0) { + originalTxTracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log( + `Initialized originalTx sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) + } } - console.log( - `Syncing originalTxs from cycle ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${newCycle})` - ) + const startCycle = originalTxTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > originalTxTracker.lastSavedCycle + if (!shouldSyncCheck) { + if (config.verbose) console.log('OriginalTxs are already synchronized') + return + } + console.log(`Syncing originalTxs from cycle ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`) try { // Compare and validate originalTxs count between cycles - const unmatchedCycles = await compareOriginalTxsCountByCycles(syncInfo.startCycle, syncInfo.targetCycle) + const unmatchedCycles = await compareOriginalTxsCountByCycles(startCycle, targetCycle) if (unmatchedCycles && unmatchedCycles.length > 0) { - console.log(`Found ${unmatchedCycles.length} cycles with mismatched originalTx counts`) + console.log(`Found ${unmatchedCycles.length} cycles with mismatched originalTx counts`, unmatchedCycles) await downloadOriginalTxsDataByCycle(unmatchedCycles) } - - // Update the tracker after successful sync - originalTxTracker.updateLastSavedCycle(syncInfo.targetCycle) - console.log(`✅ OriginalTxs synchronized up to cycle ${syncInfo.targetCycle}`) } catch (error) { - console.error('Error syncing originalTxs:', error) + console.error('Error sync checking originalTxs:', newCycle, targetCycle, error) } + // Update the tracker after sync check + originalTxTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ OriginalTxs synchronized up to cycle ${targetCycle}`) } /** @@ -692,41 +671,58 @@ export async function validateAndSyncOriginalTxs(newCycle: number): Promise { - const syncInfo = shouldSyncData(cycleTracker, newCycle) + const targetCycle = getSafeSyncCycle(newCycle) - if (!syncInfo.shouldSync) { + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (cycleTracker.lastSavedCycle === 0) { + cycleTracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log( + `Initialized cycle sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) + } + } + + const startCycle = cycleTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > cycleTracker.lastSavedCycle + + if (!shouldSyncCheck) { if (config.verbose) console.log('Cycles are already synchronized') return } - console.log( - `Syncing cycles from ${syncInfo.startCycle} to ${syncInfo.targetCycle} (current cycle: ${currentCycle})` - ) - - try { - await downloadCyclcesBetweenCycles(syncInfo.startCycle, syncInfo.targetCycle, true) + const isInSync = targetCycle === startCycle - // Update the tracker after successful sync - cycleTracker.updateLastSavedCycle(syncInfo.targetCycle) - console.log(`✅ Cycles synchronized up to cycle ${syncInfo.targetCycle}`) - } catch (error) { - console.error('Error syncing cycles:', error) + if (!isInSync) { + try { + console.log(`Syncing cycles from ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`) + await downloadCyclcesBetweenCycles(startCycle, targetCycle, true) + } catch (error) { + console.error('Error syncing cycles:', error) + } + } else { + if (config.verbose) console.log('Cycles are already synchronized') } + + // Update the tracker after sync check + cycleTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ Cycles synchronized up to cycle ${targetCycle}`) } /** * Check and synchronize all data types based on the current cycle * This should be called when new cycle data is inserted/updated - * @param currentCycle - The current cycle number from newly inserted/updated cycle + * @param newCycle - The current cycle number from newly inserted/updated cycle */ -export async function checkAndSyncDataByCycle(currentCycle: number): Promise { - if (config.verbose) console.log(`Checking and syncing data for cycle: ${currentCycle}`) +export async function checkAndSyncDataByCycle(newCycle: number): Promise { + if (config.verbose) console.log(`Checking and syncing data for cycle: ${newCycle}`) // Run all sync operations in parallel await Promise.all([ - validateAndSyncCycles(currentCycle), - validateAndSyncReceipts(currentCycle), - validateAndSyncOriginalTxs(currentCycle), + validateAndSyncCycles(newCycle), + validateAndSyncReceipts(newCycle), + validateAndSyncOriginalTxs(newCycle), ]) }