Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions src/class/DataSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ interface queryFromDistributorParameters {
endCycle?: number
}

// Sync tracker interface for data synchronization based on cycles
interface SyncTracker {
lastSavedCycle: number
updateLastSavedCycle(cycle: number): void
}

export const queryFromDistributor = async (
type: DataType,
queryParameters: queryFromDistributorParameters
Expand Down Expand Up @@ -71,6 +77,31 @@ export const queryFromDistributor = async (
}
}

// Tracker instances for data synchronization
export const cycleTracker: SyncTracker = {
lastSavedCycle: 0,
updateLastSavedCycle(cycle: number): void {
console.log(`Updating lastSavedCycle from ${this.lastSavedCycle} to ${cycle}`)
this.lastSavedCycle = cycle
},
}

export const receiptTracker: SyncTracker = {
lastSavedCycle: 0,
updateLastSavedCycle(cycle: number): void {
console.log(`Updating lastSavedReceiptCycle from ${this.lastSavedCycle} to ${cycle}`)
this.lastSavedCycle = cycle
},
}

export const originalTxTracker: SyncTracker = {
lastSavedCycle: 0,
updateLastSavedCycle(cycle: number): void {
console.log(`Updating lastSavedOriginalTxCycle from ${this.lastSavedCycle} to ${cycle}`)
this.lastSavedCycle = cycle
},
}

export async function compareWithOldReceiptsData(
lastStoredReceiptCycle = 0
): Promise<{ success: boolean; matchedCycle: number }> {
Expand Down Expand Up @@ -541,6 +572,160 @@ export async function downloadOriginalTxsDataByCycle(
}
}

/**
* Calculate the safe sync cycle with buffer
* If new cycle is 9, we can safely sync up to cycle 7 (9 - 2)
* This maintains a 1-cycle buffer for safety
* @param newCycle - The current/latest cycle number
* @returns The cycle number that is safe to sync up to
*/
export function getSafeSyncCycle(newCycle: number): number {
const CYCLE_BUFFER = 2
return Math.max(0, newCycle - CYCLE_BUFFER)
}
/**
* Validate and synchronize receipts data based on cycle tracker
* @param newCycle - The current/latest cycle number from incoming data
*/
export async function validateAndSyncReceipts(newCycle: number): Promise<void> {
const targetCycle = getSafeSyncCycle(newCycle)

// Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle
// This prevents trying to sync from cycle 0 on first run
if (receiptTracker.lastSavedCycle === 0) {
receiptTracker.lastSavedCycle = targetCycle
if (config.verbose) {
console.log(
`Initialized receipt sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})`
)
}
}

const startCycle = receiptTracker.lastSavedCycle + 1
const shouldSyncCheck = targetCycle > receiptTracker.lastSavedCycle

if (!shouldSyncCheck) {
if (config.verbose) console.log('Receipts are already synchronized')
return
}
try {
// Compare and validate receipts count between cycles
const unmatchedCycles = await compareReceiptsCountByCycles(startCycle, targetCycle)

if (unmatchedCycles && unmatchedCycles.length > 0) {
console.log(`Found ${unmatchedCycles.length} cycles with mismatched receipt counts`, unmatchedCycles)
await downloadReceiptsByCycle(unmatchedCycles)
}
} catch (error) {
console.error('Error sync checking receipts:', newCycle, targetCycle, error)
}
// Update the tracker after sync check
receiptTracker.updateLastSavedCycle(targetCycle)
console.log(`✅ Receipts synchronized up to cycle ${targetCycle}`)
}

/**
* Validate and synchronize original transactions data based on cycle tracker
* @param newCycle - The current/latest cycle number from incoming data
*/
export async function validateAndSyncOriginalTxs(newCycle: number): Promise<void> {
const targetCycle = getSafeSyncCycle(newCycle)

// Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle
// This prevents trying to sync from cycle 0 on first run
if (originalTxTracker.lastSavedCycle === 0) {
originalTxTracker.lastSavedCycle = targetCycle
if (config.verbose) {
console.log(
`Initialized originalTx sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})`
)
}
}

const startCycle = originalTxTracker.lastSavedCycle + 1
const shouldSyncCheck = targetCycle > originalTxTracker.lastSavedCycle

if (!shouldSyncCheck) {
if (config.verbose) console.log('OriginalTxs are already synchronized')
return
}
console.log(`Syncing originalTxs from cycle ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`)
try {
// Compare and validate originalTxs count between cycles
const unmatchedCycles = await compareOriginalTxsCountByCycles(startCycle, targetCycle)

if (unmatchedCycles && unmatchedCycles.length > 0) {
console.log(`Found ${unmatchedCycles.length} cycles with mismatched originalTx counts`, unmatchedCycles)
await downloadOriginalTxsDataByCycle(unmatchedCycles)
}
} catch (error) {
console.error('Error sync checking originalTxs:', newCycle, targetCycle, error)
}
// Update the tracker after sync check
originalTxTracker.updateLastSavedCycle(targetCycle)
console.log(`✅ OriginalTxs synchronized up to cycle ${targetCycle}`)
}

/**
* Validate and synchronize cycle data based on cycle tracker
* @param newCycle - The current/latest cycle number from incoming data
*/
export async function validateAndSyncCycles(newCycle: number): Promise<void> {
const targetCycle = getSafeSyncCycle(newCycle)

// Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle
// This prevents trying to sync from cycle 0 on first run
if (cycleTracker.lastSavedCycle === 0) {
cycleTracker.lastSavedCycle = targetCycle
if (config.verbose) {
console.log(
`Initialized cycle sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})`
)
}
}

const startCycle = cycleTracker.lastSavedCycle + 1
const shouldSyncCheck = targetCycle > cycleTracker.lastSavedCycle

if (!shouldSyncCheck) {
if (config.verbose) console.log('Cycles are already synchronized')
return
}

const isInSync = targetCycle === startCycle

if (!isInSync) {
try {
console.log(`Syncing cycles from ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`)
await downloadCyclcesBetweenCycles(startCycle, targetCycle, true)
} catch (error) {
console.error('Error syncing cycles:', error)
}
} else {
if (config.verbose) console.log('Cycles are already synchronized')
}

// Update the tracker after sync check
cycleTracker.updateLastSavedCycle(targetCycle)
console.log(`✅ Cycles synchronized up to cycle ${targetCycle}`)
}

/**
* Check and synchronize all data types based on the current cycle
* This should be called when new cycle data is inserted/updated
* @param newCycle - The current cycle number from newly inserted/updated cycle
*/
export async function checkAndSyncDataByCycle(newCycle: number): Promise<void> {
if (config.verbose) console.log(`Checking and syncing data for cycle: ${newCycle}`)

// Run all sync operations in parallel
await Promise.all([
validateAndSyncCycles(newCycle),
validateAndSyncReceipts(newCycle),
validateAndSyncOriginalTxs(newCycle),
])
}

export const downloadCyclcesBetweenCycles = async (
startCycle: number,
totalCyclesToSync: number,
Expand Down
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import { ValidatorStats } from './stats/validatorStats'
import { TransactionStats, convertBaseTxStatsAsArray } from './stats/transactionStats'
import { DailyTransactionStats } from './stats/dailyTransactionStats'
import { DailyAccountStats } from './stats/dailyAccountStats'
import { DailyCoinStats, DailyCoinStatsSummary } from './stats/dailyCoinStats'
import { DailyCoinStats } from './stats/dailyCoinStats'

if (config.env == envEnum.DEV) {
//default debug mode
Expand Down
16 changes: 10 additions & 6 deletions src/storage/cycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ import { config } from '../config/index'
import { cleanOldReceiptsMap } from './receipt'
import { cleanOldOriginalTxsMap } from './originalTxData'
import { Utils as StringUtils } from '@shardus/types'
import { checkAndSyncDataByCycle } from '../class/DataSync'

type DbCycle = Cycle & {
cycleRecord: string
}

const CYCLE_COLUMNS: readonly (keyof Cycle)[] = [
'cycleMarker',
'counter',
'start',
'cycleRecord',
] as const
const CYCLE_COLUMNS: readonly (keyof Cycle)[] = ['cycleMarker', 'counter', 'start', 'cycleRecord'] as const

export function isCycle(obj: Cycle): obj is Cycle {
return (obj as Cycle).cycleRecord !== undefined && (obj as Cycle).cycleMarker !== undefined
Expand Down Expand Up @@ -110,6 +106,14 @@ export async function insertOrUpdateCycle(cycle: Cycle): Promise<void> {
cleanOldReceiptsMap(CLEAN_UP_TIMESTMAP_MS)
cleanOldOriginalTxsMap(CLEAN_UP_TIMESTMAP_MS)
}

// Trigger cycle-based synchronization check when new cycle data is received
if (cycleInfo.counter > 0) {
// Run sync in background
checkAndSyncDataByCycle(cycleInfo.counter).catch((error) => {
console.error('Error in checkAndSyncDataByCycle:', error)
})
}
} else {
console.log('No cycleRecord or cycleMarker in cycle,', cycle)
}
Expand Down