diff --git a/package-lock.json b/package-lock.json index 6b8b23e..07d8fe3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -42,6 +42,7 @@ "next": "13.3.4", "node-cron": "3.0.2", "node-sass": "7.0.3", + "p-queue": "^9.0.0", "qs": "6.11.0", "react": "18.2.0", "react-dom": "18.2.0", @@ -10508,6 +10509,32 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-9.0.0.tgz", + "integrity": "sha512-KO1RyxstL9g1mK76530TExamZC/S2Glm080Nx8PE5sTd7nlduDQsAfEl4uXX+qZjLiwvDauvzXavufy3+rJ9zQ==", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^7.0.0" + }, + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + "version": "7.0.1", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-7.0.1.tgz", + "integrity": "sha512-AxTM2wDGORHGEkPCt8yqxOTMgpfbEHqF51f/5fJCmwFC3C/zNcGT63SymH2ttOAaiIws2zVg4+izQCjrakcwHg==", + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", @@ -14077,4 +14104,4 @@ } } } -} +} \ No newline at end of file diff --git a/package.json b/package.json index e8698db..c6b62b6 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "next": "13.3.4", "node-cron": "3.0.2", "node-sass": "7.0.3", + "p-queue": "^9.0.0", "qs": "6.11.0", "react": "18.2.0", "react-dom": "18.2.0", @@ -109,4 +110,4 @@ "publishConfig": { "access": "public" } -} +} \ No newline at end of file diff --git a/scripts/data_sync_checker.ts b/scripts/data_sync_checker.ts index 8d21656..78a471d 100644 --- a/scripts/data_sync_checker.ts +++ b/scripts/data_sync_checker.ts @@ -13,53 +13,201 @@ const endCycle = 0 const saveToFile = false const data_type: any = DataType.RECEIPT // DataType.RECEIPT // 
DataType.CYCLE // DataType.ORIGINALTX -const api_url = data_type === DataType.RECEIPT ? 'receipt' : data_type === DataType.CYCLE ? 'cycleinfo' : 'originalTx' +const api_url = + data_type === DataType.RECEIPT ? 'receipt' : data_type === DataType.CYCLE ? 'cycleinfo' : 'originalTx' -const runProgram = async (): Promise => { - const limit = 100 - let distributor_responses: any = [] - let api_responses: any = [] - let nextEnd = startCycle + limit - for (let i = startCycle; i < endCycle;) { - console.log(`Start Cycle ${i} End Cycle ${nextEnd}`) - const distributor_data = data_type === DataType.CYCLE ? { - start: i, - end: nextEnd - } : { - startCycle: i, - endCycle: nextEnd, - type: 'tally' +interface MismatchResult { + cycle: number + distributorCount: number + collectorCount: number +} + +interface TallyItem { + cycle: number + receipts?: number + originalTxsData?: number + originalTxs?: number +} + +const fetchBatch = async ( + cycleStart: number, + cycleEnd: number +): Promise<{ distributor: TallyItem[]; api: TallyItem[] }> => { + const distributor_data = + data_type === DataType.CYCLE + ? { + start: cycleStart, + end: cycleEnd, } - const api_data = data_type === DataType.CYCLE ? 
`?start=${i}&end=${nextEnd}` : `?startCycle=${i}&endCycle=${nextEnd}&tally=true` - - const res1 = await queryFromDistributor(data_type, distributor_data) - // console.log(res1.data) - - const res2 = await axios.get(`${API_SERVER_URL}/api/${api_url}${api_data}`) - // console.log(res2.data) - - switch (data_type) { - case DataType.RECEIPT: - distributor_responses = [...distributor_responses, ...res1.data.receipts] - api_responses = [...api_responses, ...res2.data.totalReceipts] - break - case DataType.CYCLE: - distributor_responses = [...distributor_responses, ...res1.data.cycleInfo] - api_responses = [...api_responses, ...res2.data.cycles] - break - case DataType.ORIGINALTX: - distributor_responses = [...distributor_responses, ...res1.data.originalTxs] - api_responses = [...api_responses, ...res2.data.totalOriginalTxs] - break + : { + startCycle: cycleStart, + endCycle: cycleEnd, + type: 'tally', } - i = nextEnd + 1 - nextEnd += limit + const api_data = + data_type === DataType.CYCLE + ? 
`?start=${cycleStart}&end=${cycleEnd}` + : `?startCycle=${cycleStart}&endCycle=${cycleEnd}&tally=true` + + const [res1, res2] = await Promise.all([ + queryFromDistributor(data_type, distributor_data), + axios.get(`${API_SERVER_URL}/api/${api_url}${api_data}`), + ]) + + let distributorData: TallyItem[] = [] + let apiData: TallyItem[] = [] + + switch (data_type) { + case DataType.RECEIPT: + distributorData = res1.data.receipts || [] + apiData = res2.data.totalReceipts || [] + break + case DataType.CYCLE: + distributorData = res1.data.cycleInfo || [] + apiData = res2.data.cycles || [] + break + case DataType.ORIGINALTX: + distributorData = res1.data.originalTxs || [] + apiData = res2.data.totalOriginalTxs || [] + break + } + + return { distributor: distributorData, api: apiData } +} + +const chunkArray = (array: T[], chunkSize: number): T[][] => { + const chunks: T[][] = [] + for (let i = 0; i < array.length; i += chunkSize) { + chunks.push(array.slice(i, i + chunkSize)) + } + return chunks +} + +const runProgram = async (): Promise => { + const limit = 100 + const concurrency = 100 + + const batches: Array<{ start: number; end: number }> = [] + + // Create batches without overlapping boundaries + let currentStart = startCycle + while (currentStart <= endCycle) { + const batchEnd = Math.min(currentStart + limit - 1, endCycle) + batches.push({ start: currentStart, end: batchEnd }) + currentStart = batchEnd + 1 + } + + console.log(`Fetching ${batches.length} batches in parallel (concurrency: ${concurrency})...`) + + // Process batches in chunks to limit concurrency + const batchChunks = chunkArray(batches, concurrency) + const allResults: Array<{ distributor: TallyItem[]; api: TallyItem[] }> = [] + + for (const chunk of batchChunks) { + console.log(`Processing ${chunk.length} batches in parallel...`) + const chunkResults = await Promise.all( + chunk.map((batch) => { + console.log(`Fetching cycles ${batch.start} to ${batch.end}`) + return fetchBatch(batch.start, 
batch.end) + }) + ) + allResults.push(...chunkResults) + } + + // Combine results + let distributor_responses: TallyItem[] = [] + let api_responses: TallyItem[] = [] + + for (const result of allResults) { + distributor_responses = [...distributor_responses, ...result.distributor] + api_responses = [...api_responses, ...result.api] + } + + console.log( + '\nDISTRIBUTOR RESPONSES:', + distributor_responses.length, + 'API SERVER RESPONSES:', + api_responses.length + ) + + // Compare and find mismatches + const mismatches: MismatchResult[] = [] + + if (data_type === DataType.RECEIPT || data_type === DataType.ORIGINALTX) { + // Create maps for easy lookup + const distributorMap = new Map() + const apiMap = new Map() + + for (const item of distributor_responses) { + const count = + data_type === DataType.RECEIPT + ? item.receipts ?? 0 + : data_type === DataType.ORIGINALTX + ? item.originalTxsData ?? item.originalTxs ?? 0 + : 0 + distributorMap.set(item.cycle, count) + } + + for (const item of api_responses) { + const count = + data_type === DataType.RECEIPT + ? item.receipts ?? 0 + : data_type === DataType.ORIGINALTX + ? item.originalTxsData ?? item.originalTxs ?? 
0 + : 0 + apiMap.set(item.cycle, count) } - console.log('DISTRIBUTOR RESPONSES', distributor_responses.length, 'API SERVER RESPONSES', api_responses.length) - console.log(isDeepStrictEqual(distributor_responses, api_responses)) - // console.dir(distributor_responses, { depth: null }) - // console.dir(api_responses, { depth: null }) - // save to file + + // Find all unique cycles + const allCycles = new Set([...distributorMap.keys(), ...apiMap.keys()]) + + for (const cycle of allCycles) { + const distributorCount = distributorMap.get(cycle) || 0 + const apiCount = apiMap.get(cycle) || 0 + + if (distributorCount !== apiCount) { + mismatches.push({ + cycle, + distributorCount, + collectorCount: apiCount, + }) + } + } + + // Sort mismatches by cycle + mismatches.sort((a, b) => a.cycle - b.cycle) + } + + // Print mismatches + if (mismatches.length > 0) { + console.log(`\n${'='.repeat(70)}`) + console.log(`Found ${mismatches.length} mismatched cycles:`) + console.log(`${'='.repeat(70)}`) + console.log( + `${'Cycle'.padEnd(10)} | ${'Distributor'.padEnd(15)} | ${'Collector'.padEnd(15)} | ${'Difference'}` + ) + console.log(`${'-'.repeat(70)}`) + + for (const mismatch of mismatches) { + const diff = mismatch.collectorCount - mismatch.distributorCount + console.log( + `${String(mismatch.cycle).padEnd(10)} | ${String(mismatch.distributorCount).padEnd(15)} | ${String( + mismatch.collectorCount + ).padEnd(15)} | ${diff > 0 ? '+' : ''}${diff}` + ) + } + console.log(`${'='.repeat(70)}\n`) + } else { + console.log('\nāœ… No mismatches found! All cycles match.') + } + + // Deep comparison for cycles + if (data_type === DataType.CYCLE) { + const isEqual = isDeepStrictEqual(distributor_responses, api_responses) + console.log('\nDeep comparison result:', isEqual ? 
'āœ… MATCH' : 'āŒ MISMATCH') + } + + // Save to file if (saveToFile) { writeFileSync( `distributor_${data_type}_${startCycle}_${endCycle}.json`, @@ -69,6 +217,7 @@ const runProgram = async (): Promise => { `api_server_${data_type}_${startCycle}_${endCycle}.json`, JSON.stringify(api_responses, null, 4) ) + console.log('\nšŸ“ Results saved to files') } } runProgram() diff --git a/scripts/distributor_tally_verifier.ts b/scripts/distributor_tally_verifier.ts new file mode 100644 index 0000000..be0f2a5 --- /dev/null +++ b/scripts/distributor_tally_verifier.ts @@ -0,0 +1,470 @@ +import axios from 'axios' +import * as crypto from '@shardus/crypto-utils' +import { config, DISTRIBUTOR_URL } from '../src/config' +import { queryFromDistributor, DataType } from '../src/class/DataSync' +crypto.init(config.hashKey) + +const startCycle = 0 +const endCycle = 0 + +// Choose data type to verify +const data_type: DataType = DataType.RECEIPT // DataType.RECEIPT or DataType.ORIGINALTX + +// Choose comparison mode: +// 'tally' - Compare tally endpoint vs cycle-based pagination +// 'full' - Compare full data endpoint vs cycle-based pagination +const comparisonMode: 'tally' | 'full' = 'tally' + +interface TallyItem { + cycle: number + receipts?: number + originalTxsData?: number + originalTxs?: number +} + +interface MismatchResult { + cycle: number + tallyCount: number + actualCount: number +} + +interface TransactionIdDetails { + cycle: number + fullDataIds: string[] + cycleBasedIds: string[] +} + +/** + * Fetch tally counts from distributor (aggregated counts per cycle) + */ +const fetchTallyCounts = async ( + cycleStart: number, + cycleEnd: number +): Promise> => { + const tallyMap = new Map() + + const response = await queryFromDistributor(data_type, { + startCycle: cycleStart, + endCycle: cycleEnd, + type: 'tally', + }) + + if (!response?.data) { + console.warn(`No tally data returned for cycles ${cycleStart}-${cycleEnd}`) + return tallyMap + } + + const tallyData: TallyItem[] = + 
data_type === DataType.RECEIPT ? response.data.receipts || [] : response.data.originalTxs || [] + + for (const item of tallyData) { + const count = + data_type === DataType.RECEIPT + ? item.receipts ?? 0 + : item.originalTxsData ?? item.originalTxs ?? 0 + tallyMap.set(item.cycle, count) + } + + return tallyMap +} + +/** + * Fetch full data from distributor without tally (fetches actual records and counts them) + * Uses pagination to fetch all data across multiple pages + */ +const fetchFullDataCounts = async ( + cycleStart: number, + cycleEnd: number +): Promise<{ counts: Map; ids: Map }> => { + const countsMap = new Map() + const idsMap = new Map() + + let page = 1 + let hasMorePages = true + const maxLimit = config.requestLimits.MAX_RECEIPTS_PER_REQUEST + + while (hasMorePages) { + const response = await queryFromDistributor(data_type, { + startCycle: cycleStart, + endCycle: cycleEnd, + page: page, + // No 'type: tally' - fetch actual data + }) + + if (!response?.data) { + console.warn(`No data returned for cycles ${cycleStart}-${cycleEnd} page ${page}`) + break + } + + const items = + data_type === DataType.RECEIPT ? response.data.receipts || [] : response.data.originalTxs || [] + + if (items.length === 0) { + break // No more data + } + + // Count items per cycle and collect IDs + for (const item of items) { + const cycle = item.cycle + const txId = data_type === DataType.RECEIPT ? 
item.receiptId : item.txId + + countsMap.set(cycle, (countsMap.get(cycle) || 0) + 1) + + if (!idsMap.has(cycle)) { + idsMap.set(cycle, []) + } + idsMap.get(cycle)!.push(txId) + } + + console.log( + `Fetched page ${page} for cycles ${cycleStart}-${cycleEnd}: ${items.length} items, total cycles tracked: ${countsMap.size}` + ) + + // Check if we need to fetch more pages + if (items.length < maxLimit) { + hasMorePages = false + } else { + page++ + } + } + + return { counts: countsMap, ids: idsMap } +} + +/** + * Fetch actual data from distributor using cycle-based pagination + * (Same method used in ParallelDataSync) + */ +const fetchActualDataCounts = async ( + cycleStart: number, + cycleEnd: number +): Promise<{ counts: Map; ids: Map }> => { + const actualCountsMap = new Map() + const actualIdsMap = new Map() + + let currentCycle = cycleStart + let afterTimestamp = 0 + let afterTxId = '' + const limit = config.requestLimits.MAX_RECEIPTS_PER_REQUEST + + const url = + data_type === DataType.RECEIPT + ? `${DISTRIBUTOR_URL}/receipt/cycle` + : `${DISTRIBUTOR_URL}/originalTx/cycle` + + while (currentCycle <= cycleEnd) { + const requestData = { + startCycle: currentCycle, + endCycle: cycleEnd, + afterTimestamp, + afterTxId, + limit, + sender: config.collectorInfo.publicKey, + sign: undefined, + } + + crypto.signObj(requestData, config.collectorInfo.secretKey, config.collectorInfo.publicKey) + + const response = await axios.post(url, requestData) + + const items = + data_type === DataType.RECEIPT + ? response.data?.receipts || [] + : response.data?.originalTxs || [] + + if (items.length === 0) { + break // No more data + } + + // Count items per cycle and collect IDs + for (const item of items) { + const cycle = item.cycle + const txId = data_type === DataType.RECEIPT ? 
item.receiptId : item.txId + + actualCountsMap.set(cycle, (actualCountsMap.get(cycle) || 0) + 1) + + if (!actualIdsMap.has(cycle)) { + actualIdsMap.set(cycle, []) + } + actualIdsMap.get(cycle)!.push(txId) + } + + // Update pagination cursors + const lastItem = items[items.length - 1] + currentCycle = lastItem.cycle + afterTimestamp = lastItem.timestamp + afterTxId = data_type === DataType.RECEIPT ? lastItem.receiptId : lastItem.txId + + console.log( + `Fetched ${items.length} items, last in cycle ${currentCycle}, total cycles tracked: ${actualCountsMap.size}` + ) + + // If we got less than limit, we've exhausted the range + if (items.length < limit) { + break + } + } + + return { counts: actualCountsMap, ids: actualIdsMap } +} + +const chunkArray = (array: T[], chunkSize: number): T[][] => { + const chunks: T[][] = [] + for (let i = 0; i < array.length; i += chunkSize) { + chunks.push(array.slice(i, i + chunkSize)) + } + return chunks +} + +const runProgram = async (): Promise => { + const limit = 100 + const concurrency = 10 + + const batches: Array<{ start: number; end: number }> = [] + + // Create batches without overlapping boundaries + let currentStart = startCycle + while (currentStart <= endCycle) { + const batchEnd = Math.min(currentStart + limit - 1, endCycle) + batches.push({ start: currentStart, end: batchEnd }) + currentStart = batchEnd + 1 + } + + const dataTypeName = data_type === DataType.RECEIPT ? 'Receipts' : 'OriginalTxs' + const modeName = + comparisonMode === 'tally' + ? 
'Tally vs Cycle-Based Pagination' + : 'Full Data vs Cycle-Based Pagination' + + console.log(`\n${'='.repeat(70)}`) + console.log(`Distributor Verifier - ${dataTypeName}`) + console.log(`${'='.repeat(70)}`) + console.log(`Comparison Mode: ${modeName}`) + console.log(`Cycle Range: ${startCycle} to ${endCycle}`) + console.log(`Batches: ${batches.length}`) + console.log(`Concurrency: ${concurrency}`) + console.log(`${'='.repeat(70)}\n`) + + const batchChunks = chunkArray(batches, concurrency) + + // Step 1: Fetch first dataset (tally or full data) + const firstDataLabel = comparisonMode === 'tally' ? 'tally' : 'full data' + console.log(`Fetching ${firstDataLabel} counts from distributor...`) + + const firstDataCountsMap = new Map() + const firstDataIdsMap = new Map() + + if (comparisonMode === 'tally') { + // Tally mode: only fetch counts (no IDs available) + const tallyMaps: Map[] = [] + for (const chunk of batchChunks) { + const chunkResults = await Promise.all( + chunk.map((batch) => { + console.log(`Fetching ${firstDataLabel} for cycles ${batch.start} to ${batch.end}`) + return fetchTallyCounts(batch.start, batch.end) + }) + ) + tallyMaps.push(...chunkResults) + } + // Merge tally counts + for (const map of tallyMaps) { + for (const [cycle, count] of map.entries()) { + firstDataCountsMap.set(cycle, (firstDataCountsMap.get(cycle) || 0) + count) + } + } + } else { + // Full data mode: fetch counts and IDs + const fullDataResults: Array<{ counts: Map; ids: Map }> = [] + for (const chunk of batchChunks) { + const chunkResults = await Promise.all( + chunk.map((batch) => { + console.log(`Fetching ${firstDataLabel} for cycles ${batch.start} to ${batch.end}`) + return fetchFullDataCounts(batch.start, batch.end) + }) + ) + fullDataResults.push(...chunkResults) + } + // Merge full data counts and IDs + for (const result of fullDataResults) { + for (const [cycle, count] of result.counts.entries()) { + firstDataCountsMap.set(cycle, (firstDataCountsMap.get(cycle) || 0) + count) 
+ } + for (const [cycle, ids] of result.ids.entries()) { + if (!firstDataIdsMap.has(cycle)) { + firstDataIdsMap.set(cycle, []) + } + firstDataIdsMap.get(cycle)!.push(...ids) + } + } + } + + console.log(`\n${firstDataLabel} counts fetched: ${firstDataCountsMap.size} cycles\n`) + + // Step 2: Fetch cycle-based pagination data (always with IDs) + console.log('Fetching data using cycle-based pagination...') + const cycleBasedResults: Array<{ counts: Map; ids: Map }> = [] + + for (const chunk of batchChunks) { + const chunkResults = await Promise.all( + chunk.map((batch) => { + console.log(`Fetching cycle-based data for cycles ${batch.start} to ${batch.end}`) + return fetchActualDataCounts(batch.start, batch.end) + }) + ) + cycleBasedResults.push(...chunkResults) + } + + // Merge cycle-based counts and IDs + const cycleBasedCountsMap = new Map() + const cycleBasedIdsMap = new Map() + + for (const result of cycleBasedResults) { + for (const [cycle, count] of result.counts.entries()) { + cycleBasedCountsMap.set(cycle, (cycleBasedCountsMap.get(cycle) || 0) + count) + } + for (const [cycle, ids] of result.ids.entries()) { + if (!cycleBasedIdsMap.has(cycle)) { + cycleBasedIdsMap.set(cycle, []) + } + cycleBasedIdsMap.get(cycle)!.push(...ids) + } + } + + console.log(`\nCycle-based pagination counts fetched: ${cycleBasedCountsMap.size} cycles\n`) + + // Compare first dataset vs cycle-based counts + const mismatches: MismatchResult[] = [] + const allCycles = new Set([...firstDataCountsMap.keys(), ...cycleBasedCountsMap.keys()]) + + for (const cycle of allCycles) { + const firstDataCount = firstDataCountsMap.get(cycle) || 0 + const cycleBasedCount = cycleBasedCountsMap.get(cycle) || 0 + + if (firstDataCount !== cycleBasedCount) { + mismatches.push({ + cycle, + tallyCount: firstDataCount, + actualCount: cycleBasedCount, + }) + } + } + + // Sort mismatches by cycle + mismatches.sort((a, b) => a.cycle - b.cycle) + + // Print results + const firstColumnLabel = comparisonMode === 
'tally' ? 'Tally Count' : 'Full Data Count' + const secondColumnLabel = 'Cycle-Based Count' + + console.log(`\n${'='.repeat(70)}`) + console.log(`Verification Results - ${dataTypeName}`) + console.log(`${'='.repeat(70)}`) + console.log(`Comparison Mode: ${modeName}`) + console.log(`Total cycles checked: ${allCycles.size}`) + console.log(`Cycles with ${firstDataLabel} data: ${firstDataCountsMap.size}`) + console.log(`Cycles with cycle-based data: ${cycleBasedCountsMap.size}`) + console.log(`Mismatches found: ${mismatches.length}`) + console.log(`${'='.repeat(70)}\n`) + + if (mismatches.length > 0) { + console.log(`\n${'='.repeat(70)}`) + console.log(`Mismatched Cycles:`) + console.log(`${'='.repeat(70)}`) + console.log( + `${'Cycle'.padEnd(10)} | ${firstColumnLabel.padEnd(18)} | ${secondColumnLabel.padEnd(18)} | ${'Difference'}` + ) + console.log(`${'-'.repeat(70)}`) + + for (const mismatch of mismatches) { + const diff = mismatch.actualCount - mismatch.tallyCount + console.log( + `${String(mismatch.cycle).padEnd(10)} | ${String(mismatch.tallyCount).padEnd(18)} | ${String( + mismatch.actualCount + ).padEnd(18)} | ${diff > 0 ? '+' : ''}${diff}` + ) + } + console.log(`${'='.repeat(70)}\n`) + } else { + console.log(`āœ… All cycles match! 
${firstDataLabel} and cycle-based data are consistent.\n`) + } + + // Calculate total counts + let totalFirstData = 0 + let totalCycleBased = 0 + for (const count of firstDataCountsMap.values()) { + totalFirstData += count + } + for (const count of cycleBasedCountsMap.values()) { + totalCycleBased += count + } + + console.log(`Total ${dataTypeName} from ${firstDataLabel}: ${totalFirstData}`) + console.log(`Total ${dataTypeName} from cycle-based: ${totalCycleBased}`) + console.log(`Difference: ${totalCycleBased - totalFirstData}`) + + // Display transaction IDs for mismatched cycles (if available in full mode) + if (mismatches.length > 0 && firstDataIdsMap.size > 0) { + console.log(`\n${'='.repeat(70)}`) + console.log(`Transaction IDs for Mismatched Cycles:`) + console.log(`${'='.repeat(70)}\n`) + + for (const mismatch of mismatches.slice(0, 10)) { + // Show first 10 mismatches + console.log(`Cycle ${mismatch.cycle}:`) + + const fullDataIds = firstDataIdsMap.get(mismatch.cycle) || [] + const cycleBasedIds = cycleBasedIdsMap.get(mismatch.cycle) || [] + + console.log(` Full Data IDs (${fullDataIds.length}):`) + if (fullDataIds.length > 0) { + fullDataIds.slice(0, 5).forEach((id) => console.log(` - ${id}`)) + if (fullDataIds.length > 5) { + console.log(` ... and ${fullDataIds.length - 5} more`) + } + } else { + console.log(` (none)`) + } + + console.log(` Cycle-Based IDs (${cycleBasedIds.length}):`) + if (cycleBasedIds.length > 0) { + cycleBasedIds.slice(0, 5).forEach((id) => console.log(` - ${id}`)) + if (cycleBasedIds.length > 5) { + console.log(` ... 
and ${cycleBasedIds.length - 5} more`) + } + } else { + console.log(` (none)`) + } + + // Find IDs that are in one set but not the other + const fullDataSet = new Set(fullDataIds) + const cycleBasedSet = new Set(cycleBasedIds) + + const onlyInFullData = fullDataIds.filter((id) => !cycleBasedSet.has(id)) + const onlyInCycleBased = cycleBasedIds.filter((id) => !fullDataSet.has(id)) + + if (onlyInFullData.length > 0) { + console.log(` Only in Full Data (${onlyInFullData.length}):`) + onlyInFullData.slice(0, 3).forEach((id) => console.log(` - ${id}`)) + if (onlyInFullData.length > 3) { + console.log(` ... and ${onlyInFullData.length - 3} more`) + } + } + + if (onlyInCycleBased.length > 0) { + console.log(` Only in Cycle-Based (${onlyInCycleBased.length}):`) + onlyInCycleBased.slice(0, 3).forEach((id) => console.log(` - ${id}`)) + if (onlyInCycleBased.length > 3) { + console.log(` ... and ${onlyInCycleBased.length - 3} more`) + } + } + + console.log() + } + + if (mismatches.length > 10) { + console.log(`... 
and ${mismatches.length - 10} more mismatched cycles\n`) + } + } +} + +runProgram() diff --git a/src/class/DataSync.ts b/src/class/DataSync.ts index d6f4de5..4ad76cf 100644 --- a/src/class/DataSync.ts +++ b/src/class/DataSync.ts @@ -15,6 +15,7 @@ export enum DataType { } interface queryFromDistributorParameters { + count?: number start?: number end?: number page?: number @@ -64,6 +65,7 @@ export const queryFromDistributor = async ( const response = await axios.post(url, data, { headers: { 'Content-Type': 'application/json', + 'Accept-Encoding': 'gzip, deflate', // Request compressed responses }, timeout: 45000, transformResponse: (res) => { diff --git a/src/class/DataSyncManager.ts b/src/class/DataSyncManager.ts new file mode 100644 index 0000000..e8077f3 --- /dev/null +++ b/src/class/DataSyncManager.ts @@ -0,0 +1,809 @@ +import { P2P } from '@shardus/types' +import { CycleDB, ReceiptDB, OriginalTxDataDB, AccountDB, TransactionDB } from '../storage' +import { CycleGap } from '../storage/cycle' +import { config } from '../config' +import { queryFromDistributor, DataType, downloadAndSyncGenesisAccounts } from './DataSync' +import { ParallelDataSync } from './ParallelDataSync' + +/** + * Represents a cycle with mismatched transaction data + */ +export interface MismatchedCycle { + cycle: number + localReceipts: number + distributorReceipts: number + localOriginalTxs: number + distributorOriginalTxs: number + receiptsMismatch: boolean + originalTxsMismatch: boolean +} + +/** + * Comprehensive recovery plan for data synchronization + */ +export interface DataSyncRecoveryPlan { + currentDistributorCycle: number + lastLocalCycle: number + missingCycleRanges: CycleGap[] + mismatchedCycles: MismatchedCycle[] + lookbackVerificationRanges: CycleGap[] + totalMissingCycles: number + totalMismatchedCycles: number + recoveryNeeded: boolean +} + +/** + * DataSyncManager + * + * Orchestrates intelligent data synchronization with automatic gap detection and recovery. 
+ * + * Key Features: + * - Anomaly detection: Validates data integrity before sync + * - Gap identification: Detects missing cycle ranges in local database + * - Data reconciliation: Compares local vs distributor data + * - Recovery orchestration: Patches gaps and mismatched cycles + * - Intelligent routing: Fresh start vs resume from interruption + * + * Example Scenario: + * 1. Parallel sync stops at cycle 150000 (target was 300000) + * 2. WebSocket saves incremental data from 300001 to 300100 + * 3. Process restarts at cycle 300105 + * + * Manager identifies and recovers: + * - Missing range: 150000 to 300001 (parallel sync interruption) + * - Missing range: 300100 to 300105 (websocket gap during restart) + * - Mismatched data in lookback window (e.g., 149900-150000) + * + * Handles multiple interruption points automatically. + */ +export class DataSyncManager { + private lookbackCycles: number + + constructor() { + console.log('\n') + console.log('='.repeat(60)) + console.log('INITIALIZING DATA SYNC MANAGER') + console.log('='.repeat(60)) + console.log('DataSyncManager provides intelligent data synchronization with:') + console.log(' • Early data anomaly detection before sync operations') + console.log(' • Automatic gap detection and recovery') + console.log(' • Lookback verification window for data integrity') + console.log(' • Parallel multi-cycle-based sync (10x+ performance improvement)') + console.log('='.repeat(60)) + console.log('\n') + + // Calculate lookback window: cyclesPerBatch * parallelSyncConcurrency + const cyclesPerBatch = config.cyclesPerBatch || 10 + const concurrency = config.parallelSyncConcurrency || 10 + this.lookbackCycles = cyclesPerBatch * concurrency + + console.log(`DataSyncManager initialized with lookback window: ${this.lookbackCycles} cycles`) + } + + /** + * Main entry point for intelligent data synchronization + * Handles both fresh start and recovery from interruptions + */ + async syncData(): Promise { + const 
latestDistributorCycle = await this.getLatestCycleFromDistributor() + if (!latestDistributorCycle) { + throw new Error('Failed to fetch latest cycle from distributor') + } + const lastLocalCycles = await CycleDB.queryLatestCycleRecords(1) + const lastLocalCycle = lastLocalCycles.length > 0 ? lastLocalCycles[0].counter : -1 + + // Check if this is a fresh start + const isFreshStart = lastLocalCycle === -1 || lastLocalCycle === 0 + + if (isFreshStart) { + // Fresh start - no checkpoint needed, just sync from beginning + console.log('šŸ†• Fresh start detected - syncing from cycle 0') + // Always sync genesis accounts first + console.log('Syncing genesis accounts...') + await downloadAndSyncGenesisAccounts() + + const parallelDataSync = new ParallelDataSync({ + concurrency: config.parallelSyncConcurrency, + cyclesPerBatch: config.cyclesPerBatch, + retryAttempts: config.syncRetryAttempts, + enablePrefetch: config.enablePrefetch, + }) + + const cycleBatches = await parallelDataSync.createCycleBatches(0, latestDistributorCycle) + + await parallelDataSync.startSyncing(cycleBatches) + + // Print final database summary + await this.printSyncSummary() + } else { + // Existing data - use DataSyncManager to identify and patch gaps/mismatches + console.log('šŸ“Š Existing data detected - running recovery analysis') + const recoveryPlan = await this.generateRecoveryPlan(latestDistributorCycle) + + // Execute the complete sync (recovery + normal sync) + await this.executeSyncWithRecovery(recoveryPlan) + } + } + + /** + * Detect data anomalies by verifying last 10-15 cycles against distributor + * Throws error if critical anomalies are found + * Fetches local cycle data internally + */ + async detectDataAnomalies(): Promise { + // Fetch local cycle data + const lastLocalCycles = await CycleDB.queryLatestCycleRecords(1) + const lastLocalCycle = lastLocalCycles.length > 0 ? 
lastLocalCycles[0].counter : -1 + if (lastLocalCycle === -1) { + console.log('No local data found, skipping anomaly detection') + return + } + + console.log('\nšŸ“Š Running data anomaly detection...') + + const currentDistributorCycle = await this.getLatestCycleFromDistributor() + if (!currentDistributorCycle) { + throw new Error('Failed to fetch latest cycle from distributor') + } + + console.log(`Last local cycle: ${lastLocalCycle}`) + console.log(`Current distributor cycle: ${currentDistributorCycle}`) + + // Anomaly 1: Local DB has more cycles than distributor + if (lastLocalCycle > currentDistributorCycle) { + throw new Error( + `Local DB has newer cycle than distributor (Local: ${lastLocalCycle}, Distributor: ${currentDistributorCycle})` + ) + } + + const verificationCycles = 15 + + // Anomaly 2: Verify last 15 cycles match with distributor + let startCycle = lastLocalCycle - verificationCycles + 1 + if (startCycle < 0) { + startCycle = 0 + } + const endCycle = lastLocalCycle + + console.log( + `Verifying last ${verificationCycles} cycles (${startCycle} to ${endCycle}) against distributor...` + ) + + try { + // Compare cycles data + console.log('\nComparing cycles data...') + console.log('CycleNumber', 'Local-Marker', ' Distributor-Marker') + const localCycles = await CycleDB.queryCycleRecordsBetween(startCycle, endCycle) + const distributorResponse = await queryFromDistributor(DataType.CYCLE, { + start: startCycle, + end: endCycle, + }) + + if (distributorResponse?.data?.cycleInfo) { + const distributorCycles = distributorResponse.data.cycleInfo + + // Verify each cycle's marker matches + for (let i = 0; i < localCycles.length; i++) { + /* eslint-disable security/detect-object-injection */ + const localCycle = localCycles[i] + /* eslint-enable security/detect-object-injection */ + const distributorCycle = distributorCycles.find( + (c: { counter: number; marker: string }) => c.counter === localCycle.counter + ) + console.log(localCycle.counter, 
localCycle.cycleMarker, distributorCycle?.marker) + if (!distributorCycle) { + throw new Error(`Cycle ${localCycle.counter} exists locally but not in distributor`) + } else if (localCycle.cycleMarker !== distributorCycle.marker) { + throw new Error( + `Cycle ${localCycle.counter} marker mismatch: ` + + `Local ${localCycle.cycleMarker} vs Distributor ${distributorCycle.marker}` + ) + } + } + } + + // Compare receipts count + console.log('\nComparing receipts count...') + console.log('CycleNumber', 'Local-Receipts', 'Distributor-Receipts') + const receiptsResponse = await queryFromDistributor(DataType.RECEIPT, { + startCycle, + endCycle, + type: 'tally', + }) + + if (receiptsResponse?.data?.receipts) { + const distributorReceipts: { cycle: number; receipts: number }[] = receiptsResponse.data.receipts + const localReceiptsCount = await ReceiptDB.queryReceiptCountByCycles(startCycle, endCycle) + for (const distReceipt of distributorReceipts) { + const localReceipt = localReceiptsCount.find((r) => r.cycle === distReceipt.cycle) + console.log(distReceipt.cycle, localReceipt?.receipts, distReceipt.receipts) + if (localReceipt && localReceipt.receipts > distReceipt.receipts) { + throw new Error( + `Receipts count in local DB has more in cycle ${distReceipt.cycle}: ` + + `Local has ${localReceipt.receipts}, Distributor has ${distReceipt.receipts}` + ) + } + } + } + + // Compare originalTxs count + console.log('\nComparing originalTxs count...') + console.log('CycleNumber', 'Local-OriginalTxs', 'Distributor-OriginalTxs') + const originalTxsResponse = await queryFromDistributor(DataType.ORIGINALTX, { + startCycle, + endCycle, + type: 'tally', + }) + + if (originalTxsResponse?.data?.originalTxs) { + const distributorOriginalTxs: { cycle: number; originalTxsData: number }[] = + originalTxsResponse.data.originalTxs + const localOriginalTxsCount = await OriginalTxDataDB.queryOriginalTxDataCountByCycles( + startCycle, + endCycle + ) + + for (const distTx of distributorOriginalTxs) 
{ + const localTx = localOriginalTxsCount.find((t) => t.cycle === distTx.cycle) + console.log(distTx.cycle, localTx?.originalTxsData, distTx.originalTxsData) + if (localTx && localTx.originalTxsData > distTx.originalTxsData) { + throw new Error( + `OriginalTxs count mismatch in cycle ${distTx.cycle}: ` + + `Local has ${localTx.originalTxsData}, Distributor has ${distTx.originalTxsData}` + ) + } + } + } + console.log('āœ… No data anomalies detected') + } catch (error) { + throw Error( + `Data anomalies detected! Local database may be corrupted or out of sync. ` + + `Please patch the database or clear the database and restart the server. ` + + `Error: ${error}` + ) + } + + console.log('āœ… Data anomaly check passed - proceeding with sync') + } + + /** + * Fetch latest cycle from distributor + */ + private async getLatestCycleFromDistributor(): Promise { + const response: { data: { cycleInfo: P2P.CycleCreatorTypes.CycleRecord[] } } = await queryFromDistributor( + DataType.CYCLE, + { count: 1 } + ) + if (!response?.data || response.data?.cycleInfo?.[0]?.counter === undefined) { + return null + } + return response.data.cycleInfo[0].counter + } + + /** + * Fetch total data count from distributor + */ + private async getTotalDataFromDistributor(): Promise<{ + totalCycles: number + totalAccounts: number + totalReceipts: number + totalOriginalTxs: number + } | null> { + const response = await queryFromDistributor(DataType.TOTALDATA, {}) + if (!response?.data || response.data.totalCycles === undefined) { + return null + } + return response.data + } + + /** + * Identify all missing cycle ranges by finding gaps in the cycles DB + * Uses efficient LEFT JOIN-based SQL query to find ranges directly - O(N) complexity + * + * Example: + * - DB has cycles: 0-149999, 300001-300099, 300106-300200 + * - Missing ranges: 150000-300000, 300100-300105 + * - Returns gaps: [{150000, 300000}, {300100, 300105}] + */ + private async identifyMissingCycleRanges(targetCycle: number): Promise { + 
try { + console.log(`\n${'='.repeat(60)}`) + console.log(`Identifying missing cycle ranges up to cycle ${targetCycle}`) + console.log(`${'='.repeat(60)}`) + + // Get missing cycle ranges directly from SQL using LEFT JOIN + const gaps = await CycleDB.queryMissingCycleRanges(targetCycle) + + // Handle case where no cycles exist in DB + if (gaps.length === 0) { + const cycleCount = await CycleDB.queryCycleCount() + if (cycleCount === 0) { + // No cycles in DB, entire range is missing + console.log('No cycles found in DB, entire range is missing') + return [ + { + startCycle: 0, + endCycle: targetCycle, + gapSize: targetCycle + 1, + }, + ] + } else { + // All cycles present + console.log('āœ… No missing cycles - database is complete up to target cycle') + return [] + } + } + + // Log results + console.log(`\nTotal gaps found: ${gaps.length}`) + for (const gap of gaps) { + console.log(` Gap: ${gap.startCycle} to ${gap.endCycle} (${gap.gapSize} cycles)`) + } + const totalMissing = gaps.reduce((sum, gap) => sum + gap.gapSize, 0) + console.log(`Total missing cycles: ${totalMissing}`) + + return gaps + } catch (error) { + console.error('Error identifying missing cycle ranges:', error) + throw error + } + } + + /** + * Verify data integrity with lookback window before each gap + * + * For each gap, check cyclesPerBatch * parallelSyncConcurrency cycles before the gap + * to ensure transaction data matches the distributor. 
+ * + * Example: Gap at 150000, lookback 100 cycles -> verify 149900-150000 + */ + private async verifyDataIntegrityWithLookback(gaps: CycleGap[]): Promise { + try { + console.log(`\n${'='.repeat(60)}`) + console.log(`Verifying data integrity with ${this.lookbackCycles}-cycle lookback window`) + console.log(`${'='.repeat(60)}`) + + const allMismatchedCycles: MismatchedCycle[] = [] + const verificationRanges: CycleGap[] = [] + + // Build verification ranges for each gap + for (const gap of gaps) { + const lookbackStart = Math.max(0, gap.startCycle - this.lookbackCycles) + const lookbackEnd = gap.startCycle - 1 + + // Only verify if there's a valid lookback range + if (lookbackEnd >= lookbackStart && lookbackEnd >= 0) { + verificationRanges.push({ + startCycle: lookbackStart, + endCycle: lookbackEnd, + gapSize: lookbackEnd - lookbackStart + 1, + }) + console.log( + `Verification range for gap at ${gap.startCycle}: cycles ${lookbackStart}-${lookbackEnd}` + ) + } + } + + // Deduplicate overlapping verification ranges + const mergedRanges = this.mergeOverlappingRanges(verificationRanges) + console.log(`Merged into ${mergedRanges.length} verification ranges`) + + // Verify each range + for (const range of mergedRanges) { + console.log(`\nVerifying cycles ${range.startCycle} to ${range.endCycle}...`) + + const mismatched = await this.compareCycleDataWithDistributor(range.startCycle, range.endCycle) + allMismatchedCycles.push(...mismatched) + } + + if (allMismatchedCycles.length > 0) { + console.log(`\nāš ļø Found ${allMismatchedCycles.length} cycles with mismatched data:`) + for (const mismatch of allMismatchedCycles) { + console.log( + ` Cycle ${mismatch.cycle}: ` + + `Receipts (local: ${mismatch.localReceipts}, distributor: ${mismatch.distributorReceipts}), ` + + `OriginalTxs (local: ${mismatch.localOriginalTxs}, distributor: ${mismatch.distributorOriginalTxs})` + ) + } + } else { + console.log(`\nāœ… All verified cycles have matching data`) + } + + return 
allMismatchedCycles + } catch (error) { + console.error('Error verifying data integrity:', error) + throw error + } + } + + /** + * Compare cycle data counts between local DB and distributor + * Queries in batches to respect MAX_CYCLES_PER_REQUEST limit + */ + private async compareCycleDataWithDistributor( + startCycle: number, + endCycle: number + ): Promise { + const mismatched: MismatchedCycle[] = [] + + try { + // Split into batches if range is larger than max allowed + const batches: { start: number; end: number }[] = [] + for (let i = startCycle; i <= endCycle; i += config.requestLimits.MAX_CYCLES_PER_REQUEST) { + const batchEnd = Math.min(i + config.requestLimits.MAX_CYCLES_PER_REQUEST, endCycle) + batches.push({ start: i, end: batchEnd }) + } + + // Fetch all distributor data in batches + const allDistributorReceipts: { cycle: number; receipts: number }[] = [] + const allDistributorOriginalTxs: { cycle: number; originalTxsData: number }[] = [] + + for (const batch of batches) { + const [receiptsResponse, originalTxsResponse] = await Promise.all([ + queryFromDistributor(DataType.RECEIPT, { + startCycle: batch.start, + endCycle: batch.end, + type: 'tally', + }), + queryFromDistributor(DataType.ORIGINALTX, { + startCycle: batch.start, + endCycle: batch.end, + type: 'tally', + }), + ]) + + if (receiptsResponse?.data?.receipts) { + allDistributorReceipts.push(...receiptsResponse.data.receipts) + } + if (originalTxsResponse?.data?.originalTxs) { + allDistributorOriginalTxs.push(...originalTxsResponse.data.originalTxs) + } + } + + // Sort distributor data by cycle + allDistributorReceipts.sort((a, b) => a.cycle - b.cycle) + allDistributorOriginalTxs.sort((a, b) => a.cycle - b.cycle) + + // Fetch counts from local DB (single query for entire range) + const [localReceipts, localOriginalTxs] = await Promise.all([ + ReceiptDB.queryReceiptCountByCycles(startCycle, endCycle), + OriginalTxDataDB.queryOriginalTxDataCountByCycles(startCycle, endCycle), + ]) + + 
console.log( + `Comparing cycles ${startCycle} to ${endCycle} with ${allDistributorReceipts.length} distributor receipts and ${allDistributorOriginalTxs.length} distributor originalTxs` + ) + + for (let cycle = startCycle; cycle <= endCycle; cycle++) { + const distReceipts = allDistributorReceipts.find((r) => r.cycle === cycle)?.receipts || 0 + const distOriginalTxs = allDistributorOriginalTxs.find((t) => t.cycle === cycle)?.originalTxsData || 0 + + const localReceiptsCount = localReceipts.find((r) => r.cycle === cycle)?.receipts || 0 + const localOriginalTxsCount = localOriginalTxs.find((t) => t.cycle === cycle)?.originalTxsData || 0 + + const receiptsMismatch = localReceiptsCount !== distReceipts + const originalTxsMismatch = localOriginalTxsCount !== distOriginalTxs + + if (receiptsMismatch || originalTxsMismatch) { + mismatched.push({ + cycle, + localReceipts: localReceiptsCount, + distributorReceipts: distReceipts, + localOriginalTxs: localOriginalTxsCount, + distributorOriginalTxs: distOriginalTxs, + receiptsMismatch, + originalTxsMismatch, + }) + } + } + + return mismatched + } catch (error) { + console.error(`Error comparing data for cycles ${startCycle}-${endCycle}:`, error) + return mismatched + } + } + + /** + * Merge overlapping or adjacent ranges to minimize API calls + */ + private mergeOverlappingRanges(ranges: CycleGap[]): CycleGap[] { + if (ranges.length === 0) return [] + + // Sort by start cycle + const sorted = [...ranges].sort((a, b) => a.startCycle - b.startCycle) + const merged: CycleGap[] = [sorted[0]] + + for (let i = 1; i < sorted.length; i++) { + const current = sorted[i] + const last = merged[merged.length - 1] + + // If current range overlaps or is adjacent to last range, merge them + if (current.startCycle <= last.endCycle + 1) { + last.endCycle = Math.max(last.endCycle, current.endCycle) + last.gapSize = last.endCycle - last.startCycle + 1 + } else { + merged.push(current) + } + } + + return merged + } + + /** + * Generate 
comprehensive recovery plan + * + * Orchestrates gap detection and data verification to create a complete recovery strategy. + * NOTE: This should only be called when there's existing data in DB (not fresh start) + */ + async generateRecoveryPlan(currentDistributorCycle: number): Promise { + try { + const lastLocalCycles = await CycleDB.queryLatestCycleRecords(1) + const lastLocalCycle = lastLocalCycles.length > 0 ? lastLocalCycles[0].counter : -1 + + console.log(`\n${'='.repeat(70)}`) + console.log(`GENERATING DATA SYNC RECOVERY PLAN`) + console.log(`${'='.repeat(70)}`) + console.log(`Current distributor cycle: ${currentDistributorCycle}`) + console.log(`Last local cycle: ${lastLocalCycle}`) + + // Step 1: Identify missing cycle ranges + const missingCycleRanges = await this.identifyMissingCycleRanges(currentDistributorCycle) + + // Step 2: Verify data integrity with lookback (only if there are gaps) + const mismatchedCycles = + missingCycleRanges.length > 0 ? await this.verifyDataIntegrityWithLookback(missingCycleRanges) : [] + + // Calculate lookback ranges for reporting + const lookbackVerificationRanges: CycleGap[] = [] + for (const gap of missingCycleRanges) { + const lookbackStart = Math.max(0, gap.startCycle - this.lookbackCycles) + const lookbackEnd = gap.startCycle - 1 + if (lookbackEnd >= lookbackStart && lookbackEnd >= 0) { + lookbackVerificationRanges.push({ + startCycle: lookbackStart, + endCycle: lookbackEnd, + gapSize: lookbackEnd - lookbackStart + 1, + }) + } + } + + const totalMissingCycles = missingCycleRanges.reduce((sum, gap) => sum + gap.gapSize, 0) + const recoveryNeeded = missingCycleRanges.length > 0 || mismatchedCycles.length > 0 + + const plan: DataSyncRecoveryPlan = { + currentDistributorCycle, + lastLocalCycle, + missingCycleRanges, + mismatchedCycles, + lookbackVerificationRanges, + totalMissingCycles, + totalMismatchedCycles: mismatchedCycles.length, + recoveryNeeded, + } + + this.printRecoveryPlan(plan) + + return plan + } catch 
(error) { + console.error('Error generating recovery plan:', error) + throw error + } + } + + /** + * Execute comprehensive sync with recovery + * + * Combines all sync needs (mismatched cycles + missing ranges) and uses ParallelDataSync + * for everything. No distinction between "patching" and "syncing" - both use the same mechanism. + */ + async executeSyncWithRecovery(recoveryPlan: DataSyncRecoveryPlan): Promise { + console.log(`\n${'='.repeat(70)}`) + console.log(`EXECUTING DATA SYNC WITH RECOVERY`) + console.log(`${'='.repeat(70)}`) + + try { + // Combine mismatched cycles and missing ranges into unified sync plan + const allRangesToSync: CycleGap[] = [] + + // Step 1: Add mismatched cycles (convert to ranges) + if (recoveryPlan.mismatchedCycles.length > 0) { + console.log(`\nšŸ“ Identified ${recoveryPlan.mismatchedCycles.length} mismatched cycles to patch`) + const patchRanges = this.groupCyclesIntoRanges(recoveryPlan.mismatchedCycles.map((m) => m.cycle)) + allRangesToSync.push(...patchRanges) + } + + // Step 2: Add missing cycle ranges + if (recoveryPlan.missingCycleRanges.length > 0) { + console.log(`\nšŸ“„ Identified ${recoveryPlan.missingCycleRanges.length} missing cycle ranges to sync`) + allRangesToSync.push(...recoveryPlan.missingCycleRanges) + } + + // Step 3: Merge and deduplicate ranges + const mergedRanges = this.mergeOverlappingRanges(allRangesToSync) + console.log(`\nMerged into ${mergedRanges.length} sync ranges`) + + // Step 4: Execute ParallelDataSync for all ranges + if (mergedRanges.length > 0) { + console.log('\nšŸ“” Starting data sync with recovery plan') + + const parallelDataSync = new ParallelDataSync({ + concurrency: config.parallelSyncConcurrency, + cyclesPerBatch: config.cyclesPerBatch, + retryAttempts: config.syncRetryAttempts, + enablePrefetch: config.enablePrefetch, + }) + + const cycleBatches = [] + // For each range, create cycle batches and merge them into one + console.log('\nPreparing cycle batches for the following 
ranges:') + for (const range of mergedRanges) { + console.log(` - range: ${range.startCycle} to ${range.endCycle} (${range.gapSize} cycles)`) + const cycleBatch = parallelDataSync.createCycleBatches(range.startCycle, range.endCycle) + cycleBatches.push(...cycleBatch) + } + + await parallelDataSync.startSyncing(cycleBatches) + + console.log('\nāœ… Data sync with recovery completed successfully') + } else { + console.log('\nāœ… No data to sync, database is up to date') + } + + console.log(`\n${'='.repeat(70)}`) + console.log(`āœ… DATA SYNC COMPLETED SUCCESSFULLY`) + console.log(`${'='.repeat(70)}\n`) + + // Print final database summary + await this.printSyncSummary() + } catch (error) { + console.error('Error executing sync with recovery:', error) + throw error + } + } + + /** + * Group individual cycles into consecutive ranges + */ + private groupCyclesIntoRanges(cycles: number[]): CycleGap[] { + if (cycles.length === 0) return [] + + const sorted = [...cycles].sort((a, b) => a - b) + const ranges: CycleGap[] = [] + let rangeStart = sorted[0] + let rangeEnd = sorted[0] + + for (let i = 1; i < sorted.length; i++) { + if (sorted[i] === rangeEnd + 1) { + // Consecutive cycle, extend range + rangeEnd = sorted[i] + } else { + // Gap found, save current range and start new one + ranges.push({ + startCycle: rangeStart, + endCycle: rangeEnd, + gapSize: rangeEnd - rangeStart + 1, + }) + rangeStart = sorted[i] + rangeEnd = sorted[i] + } + } + + // Add last range + ranges.push({ + startCycle: rangeStart, + endCycle: rangeEnd, + gapSize: rangeEnd - rangeStart + 1, + }) + + return ranges + } + + /** + * Print recovery plan summary + */ + private printRecoveryPlan(plan: DataSyncRecoveryPlan): void { + console.log(`\n${'='.repeat(70)}`) + console.log(`RECOVERY PLAN SUMMARY`) + console.log(`${'='.repeat(70)}`) + console.log(`Current distributor cycle: ${plan.currentDistributorCycle}`) + console.log(`Last local cycle: ${plan.lastLocalCycle}`) + console.log(`Recovery needed: 
${plan.recoveryNeeded ? 'āš ļø YES' : 'āœ… NO'}`) + console.log(``) + console.log(`Missing Cycle Ranges: ${plan.missingCycleRanges.length}`) + console.log(`Total missing cycles: ${plan.totalMissingCycles}`) + if (plan.missingCycleRanges.length > 0) { + for (const gap of plan.missingCycleRanges) { + console.log(` - Cycles ${gap.startCycle} to ${gap.endCycle} (${gap.gapSize} cycles)`) + } + } + console.log(``) + console.log(`Mismatched Cycles: ${plan.totalMismatchedCycles}`) + if (plan.mismatchedCycles.length > 0) { + for (const mismatch of plan.mismatchedCycles.slice(0, 10)) { + // Show first 10 + console.log( + ` - Cycle ${mismatch.cycle}: ` + + `Receipts ${mismatch.localReceipts}→${mismatch.distributorReceipts}, ` + + `OriginalTxs ${mismatch.localOriginalTxs}→${mismatch.distributorOriginalTxs}` + ) + } + if (plan.mismatchedCycles.length > 10) { + console.log(` ... and ${plan.mismatchedCycles.length - 10} more`) + } + } + console.log(``) + console.log(`Lookback Verification:`) + for (const range of plan.lookbackVerificationRanges) { + console.log(` - Verified cycles ${range.startCycle} to ${range.endCycle}`) + } + console.log(`${'='.repeat(70)}\n`) + } + + /** + * Get overall sync statistics from database + */ + async getSyncStats(): Promise<{ + totalCycles: number + totalAccounts: number + totalReceipts: number + totalOriginalTxs: number + totalTransactions: number + }> { + try { + const [cycleCount, accountCount, receiptCount, originalTxCount, transactionCount] = await Promise.all([ + CycleDB.queryCycleCount(), + AccountDB.queryAccountCount(), + ReceiptDB.queryReceiptCount(), + OriginalTxDataDB.queryOriginalTxDataCount(), + TransactionDB.queryTransactionCount(), + ]) + + return { + totalCycles: cycleCount || 0, + totalAccounts: accountCount || 0, + totalReceipts: receiptCount || 0, + totalOriginalTxs: originalTxCount || 0, + totalTransactions: transactionCount || 0, + } + } catch (error) { + console.error('Error getting sync stats:', error) + return { + 
totalCycles: 0, + totalAccounts: 0, + totalReceipts: 0, + totalOriginalTxs: 0, + totalTransactions: 0, + } + } + } + + /** + * Print sync summary + */ + async printSyncSummary(): Promise { + const stats = await this.getSyncStats() + const distributorData = await this.getTotalDataFromDistributor() + + console.log('='.repeat(60)) + console.log('Sync Summary:') + console.log('\nLocal Database:') + console.log(` Total Cycles: ${stats.totalCycles}`) + console.log(` Total Accounts: ${stats.totalAccounts}`) + console.log(` Total Receipts: ${stats.totalReceipts}`) + console.log(` Total OriginalTxs: ${stats.totalOriginalTxs}`) + console.log(` Total Transactions: ${stats.totalTransactions}`) + + if (distributorData) { + console.log('\nDistributor:') + console.log(` Total Cycles: ${distributorData.totalCycles}`) + console.log(` Total Accounts: ${distributorData.totalAccounts}`) + console.log(` Total Receipts: ${distributorData.totalReceipts}`) + console.log(` Total OriginalTxs: ${distributorData.totalOriginalTxs}`) + } else { + console.log('\nDistributor: Failed to fetch data') + } + + console.log('='.repeat(60)) + } +} diff --git a/src/class/ParallelDataSync.ts b/src/class/ParallelDataSync.ts new file mode 100644 index 0000000..1af311f --- /dev/null +++ b/src/class/ParallelDataSync.ts @@ -0,0 +1,1082 @@ +import PQueue from 'p-queue' +import * as crypto from '@shardus/crypto-utils' +import { P2P, Utils as StringUtils } from '@shardus/types' +import { config, DISTRIBUTOR_URL } from '../config' +import { DataType } from './DataSync' +import { + CycleDB, + ReceiptDB, + OriginalTxDataDB, + receiptDatabase, + originalTxDataDatabase, + cycleDatabase, +} from '../storage' +import { Cycle, Receipt, OriginalTxData } from '../types' +import axios, { AxiosInstance } from 'axios' +import http from 'http' +import https from 'https' +import { useManualCheckPoint, checkpointWAL } from '../storage/sqlite3storage' + +// For Debugging Purpose - Set to false to skip processing data and saving 
to DB +const processData = true + +const DESERIALIZE_RECEIPTS_CHUNK_SIZE = 20 // Number of receipts to deserialize at a time + +/** + * Configuration for parallel sync + */ +export interface ParallelSyncConfig { + concurrency: number // Number of parallel workers + retryAttempts: number // Retry failed requests + retryDelayMs: number // Delay between retries + cyclesPerBatch: number // Number of cycles to batch together (default: 10) + enablePrefetch: boolean // Enable prefetching (default: true) + prefetchDepth: number // Number of batches to prefetch ahead (default: 1) +} + +/** + * Statistics for sync operation + */ +export interface SyncStats { + startTime: number + endTime?: number + totalCyclesToSync: number + completedCycles: number + totalCycles: number + totalReceipts: number + totalOriginalTxs: number + errors: number +} + +/** + * Response size metadata attached by transformResponse and interceptor + */ +interface ResponseSizeMetadata { + decompressedBytes: number + decompressedKB: string + compressedBytes?: number + compressedKB?: string + compressionRatio?: number + compressionSavings?: string +} + +interface ResponseDataWithMetadata { + __responseSize?: ResponseSizeMetadata + __networkElapsed?: number + _deserializedTime?: number + [key: string]: unknown +} + +/** + * Sync receipts and originalTxs data by cycle range with timestamp pagination + * Uses both timestamp and ID to handle timestamp collisions and prevent data loss + */ +export interface SyncTxDataByCycleRange { + startCycle: number + endCycle: number + afterTimestamp?: number + afterTxId?: string // receiptId or txId + limit?: number +} + +/** + * Parallel sync orchestrator using cycle-based partitioning with timestamp + txId pagination + * Implements the optimal sync strategy with: + * - Cycle-level parallelization + * - Composite cursor (timestamp + txId ) to prevent data loss + * - Work queue for load balancing + */ +export class ParallelDataSync { + private queue: PQueue + private 
syncConfig: ParallelSyncConfig + private stats: SyncStats + private httpAgent: http.Agent + private httpsAgent: https.Agent + private axiosInstance: AxiosInstance + + // Accumulation buffers for batching DB writes - only write when threshold is reached + private receiptBuffer: Receipt[] = [] + private originalTxBuffer: OriginalTxData[] = [] + private cycleBuffer: Cycle[] = [] + private readonly ACCUMULATION_THRESHOLD = 1000 // Write to DB when buffer reaches this size + + // Mutex locks to prevent concurrent buffer access (race conditions) + private receiptBufferLock = false + private originalTxBufferLock = false + private cycleBufferLock = false + + // WAL checkpoint tracking + private flushCount = 0 // Total number of buffer flushes + private readonly CHECKPOINT_FREQUENCY = 10 // Run WAL checkpoint every N flushes to prevent WAL from growing too large + + constructor(syncConfig?: Partial) { + this.syncConfig = { + concurrency: syncConfig?.concurrency || config.parallelSyncConcurrency || 5, + cyclesPerBatch: syncConfig?.cyclesPerBatch || config.cyclesPerBatch || 100, + retryAttempts: syncConfig?.retryAttempts || config.syncRetryAttempts || 5, + retryDelayMs: syncConfig?.retryDelayMs || 1000, + enablePrefetch: syncConfig?.enablePrefetch ?? config.enablePrefetch ?? 
true, + prefetchDepth: syncConfig?.prefetchDepth || 1, + } + + // Create HTTP agents with keep-alive to reuse connections + this.httpAgent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: 30000, + maxSockets: this.syncConfig.concurrency * 2, + maxFreeSockets: this.syncConfig.concurrency, + }) + + this.httpsAgent = new https.Agent({ + keepAlive: true, + keepAliveMsecs: 30000, + maxSockets: this.syncConfig.concurrency * 2, + maxFreeSockets: this.syncConfig.concurrency, + }) + + // Create axios instance with keep-alive agents and custom JSON serialization with timing + this.axiosInstance = axios.create({ + httpAgent: this.httpAgent, + httpsAgent: this.httpsAgent, + timeout: 45000, + headers: { + 'Content-Type': 'application/json', + 'Accept-Encoding': 'gzip, deflate', // Request compressed responses + }, + transformRequest: [ + (data) => { + // Use custom stringify for request body + const startTime = Date.now() + const result = StringUtils.safeStringify(data) + const elapsed = Date.now() - startTime + if (config.verbose && elapsed > 10) { + console.log( + `[Client] Request stringify: ${elapsed}ms, size: ${(result.length / 1024).toFixed(2)}KB` + ) + } + return result + }, + ], + transformResponse: [ + (res) => { + // Use custom parse for response with timing + const startTime = Date.now() + const result = typeof res === 'string' ? StringUtils.safeJsonParse(res) : res + const deserializedTime = Date.now() - startTime + + // Calculate decompressed size from raw response string + const decompressedBytes = typeof res === 'string' ? 
Buffer.byteLength(res) : 0 + const sizeKB = (decompressedBytes / 1024).toFixed(2) + + // Attach size metadata to result for later use + if (result && typeof result === 'object') { + Object.defineProperty(result, '__responseSize', { + value: { + decompressedBytes, + decompressedKB: sizeKB, + }, + enumerable: false, // Hidden from JSON.stringify and iteration + configurable: true, + }) + // Attach deserialization time + ;(result as ResponseDataWithMetadata)._deserializedTime = deserializedTime + } + + if (config.verbose && deserializedTime > 50) { + console.log(`[Client] Response deserialization: ${deserializedTime}ms, size: ${sizeKB}KB`) + } + return result + }, + ], + }) + + // Add response interceptor to capture compressed size from socket bytesRead + this.axiosInstance.interceptors.response.use( + (response) => { + // Get Content-Length header for fallback + const contentLength = response.headers['content-length'] + + // Get socket from the request object + const socket = response.request?.socket + + let compressedBytes: number | undefined + + // Try to calculate compressed size from socket bytesRead (most accurate) + // We track cumulative bytesRead on the socket across requests (due to keep-alive) + if (socket && typeof socket.bytesRead === 'number') { + const currentBytesRead = socket.bytesRead + const lastBytesRead = (socket as { _lastBytesRead?: number })._lastBytesRead + + if (lastBytesRead !== undefined) { + const rawBytes = currentBytesRead - lastBytesRead + + // Subtract estimated header size (HTTP response headers + status line) + // Typical: "HTTP/1.1 200 OK\r\n" + headers + "\r\n\r\n" ā‰ˆ 200-400 bytes + const estimatedHeaderSize = 250 + if (rawBytes > estimatedHeaderSize) { + compressedBytes = rawBytes - estimatedHeaderSize + } + } + + // Update last bytesRead for next request on this socket + ;(socket as { _lastBytesRead?: number })._lastBytesRead = currentBytesRead + } + + // Fallback: Use Content-Length header if socket method didn't work + if 
(!compressedBytes && contentLength) { + compressedBytes = parseInt(contentLength, 10) + } + + // Get existing metadata from transformResponse + const existingMetadata = (response.data as ResponseDataWithMetadata)?.__responseSize + + // Merge compressed size with existing decompressed size metadata + if (existingMetadata && response.data && typeof response.data === 'object') { + const decompressedBytes = existingMetadata.decompressedBytes + + // Calculate compression metrics if both sizes are available + const compressionRatio = + compressedBytes && decompressedBytes > 0 + ? +(compressedBytes / decompressedBytes).toFixed(3) + : undefined + + const compressionSavings = + compressionRatio && compressionRatio < 1 + ? `${((1 - compressionRatio) * 100).toFixed(1)}%` + : undefined + + // Update the metadata with compressed size info + Object.defineProperty(response.data, '__responseSize', { + value: { + ...existingMetadata, + compressedBytes, + compressedKB: compressedBytes ? (compressedBytes / 1024).toFixed(2) : undefined, + compressionRatio, + compressionSavings, + }, + enumerable: false, + configurable: true, + }) + } + + return response + }, + (error) => Promise.reject(error) + ) + + // Add interval between tasks to prevent overwhelming the distributor + this.queue = new PQueue({ + concurrency: this.syncConfig.concurrency, + interval: 100, // 100ms between batches + intervalCap: this.syncConfig.concurrency, + }) + + this.stats = { + startTime: Date.now(), + totalCyclesToSync: 0, + completedCycles: 0, + totalCycles: 0, + totalReceipts: 0, + totalOriginalTxs: 0, + errors: 0, + } + + console.log( + `Parallel Sync initialized:` + + ` concurrency=${this.syncConfig.concurrency},` + + ` cyclesPerBatch=${this.syncConfig.cyclesPerBatch},` + + ` prefetch=${this.syncConfig.enablePrefetch ? 'enabled' : 'disabled'},` + + ` retryAttempts=${this.syncConfig.retryAttempts}` + ) + } + + /** + * Creates batches of cycles for parallel processing. 
+ * This is a preparatory step before calling startSyncing, which expects these batches. + * @param startCycle The starting cycle number. + * @param endCycle The ending cycle number. + * @returns An array of cycle batches, each with a start and end cycle. + */ + public createCycleBatches( + startCycle: number, + endCycle: number + ): { startCycle: number; endCycle: number }[] { + const cycleBatches: { startCycle: number; endCycle: number }[] = [] + + for (let i = startCycle; i <= endCycle; i += this.syncConfig.cyclesPerBatch) { + const batchEndCycle = Math.min(i + this.syncConfig.cyclesPerBatch - 1, endCycle) + cycleBatches.push({ startCycle: i, endCycle: batchEndCycle }) + } + + return cycleBatches + } + + /** + * Main entry point for parallel sync + */ + async startSyncing(cycleBatches: { startCycle: number; endCycle: number }[]): Promise { + if (!cycleBatches || cycleBatches.length === 0) { + console.log('No cycle batches provided for syncing.') + return + } + + const startCycle = cycleBatches[0].startCycle + const endCycle = cycleBatches[cycleBatches.length - 1].endCycle + + console.log(`\n${'='.repeat(60)}`) + console.log(`Starting Parallel Cycle Sync: ${startCycle} → ${endCycle}`) + console.log(`Concurrency: ${this.syncConfig.concurrency} workers`) + console.log(`${'='.repeat(60)}\n`) + + this.stats.startTime = Date.now() + this.stats.totalCyclesToSync = endCycle - startCycle + 1 + + try { + console.log( + `Syncing ${cycleBatches.length} cycle batches created with ${this.syncConfig.cyclesPerBatch} cycles per batch` + ) + + // Three-phase approach for optimal performance: + // Phase 1: Use main queue (concurrency: 5) for parallel API fetching + // Phase 2: Buffer data in memory until ACCUMULATION_THRESHOLD (1000) is reached + // Phase 3: DB writes are batched and serialized via write queue + // This combines parallel I/O with batched, serialized DB writes to minimize contention + const tasks = cycleBatches.map((batch) => + this.queue.add(() => 
this.syncDataByCycleRange(batch.startCycle, batch.endCycle)) + ) + + console.log(`Waiting for ${tasks.length} tasks to complete...`) + + // Wait for all tasks to complete (even if some fail) + const results = await Promise.allSettled(tasks) + + console.log('All tasks completed, flushing remaining buffers...') + + // Flush any remaining buffered data to database + await this.flushAllBuffers() + + this.stats.endTime = Date.now() + + // Count successful and failed tasks + const successful = results.filter((r) => r.status === 'fulfilled').length + const failed = results.filter((r) => r.status === 'rejected').length + + console.log(`Tasks completed: ${successful} successful, ${failed} failed`) + + // Log failed task errors + if (failed > 0) { + console.error(`\n${failed} tasks failed with errors:`) + results.forEach((result, index) => { + if (result.status === 'rejected') { + const batch = cycleBatches[index] + console.error( + ` Batch ${index} (cycles ${batch.startCycle}-${batch.endCycle}): ${ + result.reason?.message || result.reason + }` + ) + } + }) + } + + console.log('Printing summary...') + // Summary + await this.printSummary(startCycle, endCycle) + + console.log('Summary printed successfully') + + // Throw if there were any failures so the caller knows sync was incomplete + if (failed > 0) { + throw new Error(`Parallel sync completed with ${failed} failed batches out of ${tasks.length} total`) + } + } catch (error) { + console.error('Fatal error in parallel sync:', error) + this.stats.errors++ + // Try to flush buffers even on error to preserve data + try { + await this.flushAllBuffers() + } catch (flushError) { + console.error('Error flushing buffers during error handling:', flushError) + } + throw error + } + } + + /** + * Sync data in parallel using adaptive multi-cycle fetching with prefetching on endpoints + * Adaptively handles partial cycle completion (e.g., if requesting cycles 1-10 but only get data from 1-5, then sends next request for 5-10) + */ + 
private async syncDataByCycleRange(startCycle: number, endCycle: number): Promise { + // Sync all data types in parallel with individual error tracking + const results = await Promise.allSettled([ + this.syncCycleRecordsByCycleRange(startCycle, endCycle), + this.syncReceiptsByCycleRange(startCycle, endCycle), + this.syncOriginalTxsByCycleRange(startCycle, endCycle), + ]) + + const dataTypes = ['Cycle Records', 'Receipts', 'OriginalTxs'] + const failedTypes: string[] = [] + const errors: unknown[] = [] + + results.forEach((result, index) => { + if (result.status === 'rejected') { + failedTypes.push(dataTypes[index]) + errors.push(result.reason) + } + }) + + if (failedTypes.length > 0) { + console.error( + `Error syncing cycle batch ${startCycle}-${endCycle}: Failed data types: ${failedTypes.join(', ')}` + ) + errors.forEach((error, index) => { + const errorMessage = error instanceof Error ? error.message : String(error) + console.error(` ${failedTypes[index]}: ${errorMessage}`) + }) + this.stats.errors++ + throw new Error( + `Failed to sync ${ + failedTypes.length + } data type(s) for batch ${startCycle}-${endCycle}: ${failedTypes.join(', ')}` + ) + } + + this.stats.completedCycles += endCycle - startCycle + 1 + + const progress = ((this.stats.completedCycles / this.stats.totalCyclesToSync) * 100).toFixed(1) + console.log( + `Progress: ${this.stats.completedCycles}/${this.stats.totalCyclesToSync} cycles (${progress}%) [batch: ${startCycle}-${endCycle}]` + ) + } + + /** + * Sync cycle records across a batch of cycles using multi-cycle fetching + */ + private async syncCycleRecordsByCycleRange(startCycle: number, endCycle: number): Promise { + try { + const response = await this.fetchDataFromDistributor( + DataType.CYCLE, + startCycle, + endCycle, + this.signData({ start: startCycle, end: endCycle }) + ) + + const cycles = response?.data?.cycleInfo || [] + + // Get size metadata from transformResponse and interceptor + const sizeMetadata = (response.data as 
ResponseDataWithMetadata)?.__responseSize + const decompressedKB = sizeMetadata?.decompressedKB || '0.00' + const compressedKB = sizeMetadata?.compressedKB + const compressionRatio = sizeMetadata?.compressionRatio + const compressionSavings = sizeMetadata?.compressionSavings + const networkElapsed = (response.data as ResponseDataWithMetadata)?.__networkElapsed || 0 + const deserializedTime = (response.data as ResponseDataWithMetadata)?._deserializedTime || 0 + + if (config.verbose || networkElapsed > 1000) { + // Build log message with compression info if available + let logMessage = + `[API Timing] Cycle Records fetch (cycles ${startCycle}-${endCycle}): ${networkElapsed}ms, ` + + `deserialization: ${deserializedTime}ms, ` + + `records: ${cycles.length}` + + // Only show compression metrics if compression actually reduced the size (ratio < 1) + if (compressedKB !== undefined && compressionRatio !== undefined && compressionRatio < 1) { + logMessage += `, payload: ${compressedKB}KB, payloadUncompressed: ${decompressedKB}KB, ratio: ${compressionRatio}, savings: ${compressionSavings}` + } else { + // No compression or not effective, just show uncompressed size + logMessage += `, payload: ${decompressedKB}KB` + } + + logMessage += + (cycles.length === 0 && response.data ? ', response.data exists but empty' : '') + + (!response.data ? ', response.data is null/undefined!' 
: '') + + console.log(logMessage) + } + + if (!response || !response.data || !response.data.cycleInfo) { + console.error(`Error fetching cycle records for cycle batch ${startCycle}-${endCycle}:`, response) + return // Couldn't fetch any cycles + } + + if (cycles.length === 0) { + return // No more originalTxs in this cycle range + } + const cycleRecords = cycles.map((cycleRecord: Cycle['cycleRecord']) => ({ + counter: cycleRecord.counter, + cycleRecord, + start: cycleRecord.start, + cycleMarker: cycleRecord.marker, + })) + + // Add cycles to buffer - will flush to DB when buffer reaches threshold + await this.addToBuffer('cycle', cycleRecords) + + // Update stats + this.stats.totalCycles += cycleRecords.length + + if (config.verbose) { + console.log(`[Cycles ${startCycle}-${endCycle}] Cycle Records: +${cycleRecords.length}`) + } + } catch (error) { + console.error(`Error fetching cycle records for cycle batch ${startCycle}-${endCycle}:`, error) + throw error + } + } + + /** + * Sync receipts across a batch of cycles using adaptive multi-cycle fetching with prefetching + * Adaptively handles partial cycle completion (e.g., if requesting cycles 1-10 but only get data from 1-5, then sends next request for 5-10) + */ + private async syncReceiptsByCycleRange(startCycle: number, endCycle: number): Promise { + let currentCycle = startCycle + let afterTimestamp = 0 + let afterTxId = '' + let totalFetched = 0 + + const route = `receipt/cycle` + + // Prefetch: Start fetching first batch immediately + let nextFetchPromise: Promise | null = this.syncConfig.enablePrefetch + ? this.fetchDataFromDistributor( + route, + currentCycle, + endCycle, + this.signData({ + startCycle: currentCycle, + endCycle, + afterTimestamp, + afterTxId, + limit: config.requestLimits.MAX_RECEIPTS_PER_REQUEST, + }) + ) + : null + + while (currentCycle <= endCycle) { + try { + // Get the data (either from prefetch or fetch now) + const response = nextFetchPromise + ? 
await nextFetchPromise + : await this.fetchDataFromDistributor( + route, + currentCycle, + endCycle, + this.signData({ + startCycle: currentCycle, + endCycle, + afterTimestamp, + afterTxId, + limit: config.requestLimits.MAX_RECEIPTS_PER_REQUEST, + }) + ) + + const receipts = response?.data?.receipts || [] + + // Get size metadata from transformResponse and interceptor + const sizeMetadata = (response.data as ResponseDataWithMetadata)?.__responseSize + const decompressedKB = sizeMetadata?.decompressedKB || '0.00' + const compressedKB = sizeMetadata?.compressedKB + const compressionRatio = sizeMetadata?.compressionRatio + const compressionSavings = sizeMetadata?.compressionSavings + const networkElapsed = (response.data as ResponseDataWithMetadata)?.__networkElapsed || 0 + const deserializedTime = (response.data as ResponseDataWithMetadata)?._deserializedTime || 0 + + if (config.verbose || networkElapsed > 1000) { + // Build log message with compression info if available + let logMessage = + `[API Timing] Receipts fetch (cycles ${startCycle}-${endCycle}): ${networkElapsed}ms, ` + + `deserialization: ${deserializedTime}ms, ` + + `records: ${receipts.length}` + + // Only show compression metrics if compression actually reduced the size (ratio < 1) + if (compressedKB !== undefined && compressionRatio !== undefined && compressionRatio < 1) { + logMessage += `, payload: ${compressedKB}KB, payloadUncompressed: ${decompressedKB}KB, ratio: ${compressionRatio}, savings: ${compressionSavings}` + } else { + // No compression or not effective, just show uncompressed size + logMessage += `, payload: ${decompressedKB}KB` + } + + logMessage += + (receipts.length === 0 && response.data ? ', response.data exists but empty' : '') + + (!response.data ? ', response.data is null/undefined!' 
: '') + + console.log(logMessage) + } + + if (!response || !response.data || !response.data.receipts) { + console.error(`Error fetching receipts for cycle batch ${startCycle}-${endCycle}:`, response) + break // Couldn't fetch any receipts + } + + if (receipts.length === 0) { + break // No more originalTxs in this cycle range + } + + // Update after timestamp and txId based on last receipt BEFORE starting next fetch + const lastReceipt = receipts[receipts.length - 1] + currentCycle = lastReceipt.cycle + afterTimestamp = lastReceipt.timestamp + afterTxId = lastReceipt.receiptId + + // Prefetch next batch while processing current batch + if ( + this.syncConfig.enablePrefetch && + receipts.length >= config.requestLimits.MAX_RECEIPTS_PER_REQUEST + ) { + nextFetchPromise = this.fetchDataFromDistributor( + route, + currentCycle, + endCycle, + this.signData({ + startCycle: currentCycle, + endCycle, + afterTimestamp, + afterTxId, + limit: config.requestLimits.MAX_RECEIPTS_PER_REQUEST, + }) + ) + } else { + nextFetchPromise = null + } + + // Add receipts to buffer - will flush to DB when buffer reaches threshold + await this.addToBuffer('receipt', receipts) + + totalFetched += receipts.length + this.stats.totalReceipts += receipts.length + + if (config.verbose) { + console.log( + `[Cycles ${startCycle}-${endCycle}] Receipts: +${receipts.length} (total: ${totalFetched}), ` + + `last in cycle ${currentCycle}` + + (this.syncConfig.enablePrefetch ? 
' [prefetch]' : '') + ) + } + + // If we got less than the max receipts size, we've exhausted this cycle range + if (receipts.length < config.requestLimits.MAX_RECEIPTS_PER_REQUEST) { + break + } + } catch (error) { + console.error(`Error fetching receipts for cycle batch ${startCycle}-${endCycle}:`, error) + throw error + } + } + } + + /** + * Sync originalTxs across a batch of cycles using adaptive multi-cycle fetching with prefetching + * Adaptively handles partial cycle completion (e.g., if requesting cycles 1-10 but only get data from 1-5, then sends next request for 5-10) + */ + private async syncOriginalTxsByCycleRange(startCycle: number, endCycle: number): Promise { + let currentCycle = startCycle + let afterTimestamp = 0 + let afterTxId = '' + let totalFetched = 0 + + const route = `originalTx/cycle` + + // Prefetch: Start fetching first batch immediately + let nextFetchPromise: Promise | null = this.syncConfig.enablePrefetch + ? this.fetchDataFromDistributor( + route, + currentCycle, + endCycle, + this.signData({ + startCycle: currentCycle, + endCycle, + afterTimestamp, + afterTxId, + limit: config.requestLimits.MAX_ORIGINAL_TXS_PER_REQUEST, + }) + ) + : null + + while (currentCycle <= endCycle) { + try { + // Get the data (either from prefetch or fetch now) + const response = nextFetchPromise + ? 
await nextFetchPromise + : await this.fetchDataFromDistributor( + route, + currentCycle, + endCycle, + this.signData({ + startCycle: currentCycle, + endCycle, + afterTimestamp, + afterTxId, + limit: config.requestLimits.MAX_ORIGINAL_TXS_PER_REQUEST, + }) + ) + + const originalTxs = response?.data?.originalTxs || [] + + // Get size metadata from transformResponse and interceptor + const sizeMetadata = (response.data as ResponseDataWithMetadata)?.__responseSize + const decompressedKB = sizeMetadata?.decompressedKB || '0.00' + const compressedKB = sizeMetadata?.compressedKB + const compressionRatio = sizeMetadata?.compressionRatio + const compressionSavings = sizeMetadata?.compressionSavings + const networkElapsed = (response.data as ResponseDataWithMetadata)?.__networkElapsed || 0 + const deserializedTime = (response.data as ResponseDataWithMetadata)?._deserializedTime || 0 + + if (config.verbose || networkElapsed > 1000) { + // Build log message with compression info if available + let logMessage = + `[API Timing] OriginalTxs fetch (cycles ${startCycle}-${endCycle}): ${networkElapsed}ms, ` + + `deserialization: ${deserializedTime}ms, ` + + `records: ${originalTxs.length}` + + // Only show compression metrics if compression actually reduced the size (ratio < 1) + if (compressedKB !== undefined && compressionRatio !== undefined && compressionRatio < 1) { + logMessage += `, payload: ${compressedKB}KB, payloadUncompressed: ${decompressedKB}KB, ratio: ${compressionRatio}, savings: ${compressionSavings}` + } else { + // No compression or not effective, just show uncompressed size + logMessage += `, payload: ${decompressedKB}KB` + } + + logMessage += + (originalTxs.length === 0 && response.data ? ', response.data exists but empty' : '') + + (!response.data ? ', response.data is null/undefined!' 
: '')
+
+        console.log(logMessage)
+        }
+
+        if (!response || !response.data || !response.data.originalTxs) {
+          console.error(`Error fetching originalTxs for cycle batch ${startCycle}-${endCycle}:`, response)
+          break // Couldn't fetch any originalTxs
+        }
+
+        if (originalTxs.length === 0) {
+          break // No more originalTxs in this cycle range
+        }
+
+        // Update after timestamp and txId based on last tx BEFORE starting next fetch
+        const lastTx = originalTxs[originalTxs.length - 1]
+        currentCycle = lastTx.cycle
+        afterTimestamp = lastTx.timestamp
+        afterTxId = lastTx.txId
+
+        // Prefetch next batch while processing current batch
+        if (
+          this.syncConfig.enablePrefetch &&
+          originalTxs.length >= config.requestLimits.MAX_ORIGINAL_TXS_PER_REQUEST
+        ) {
+          nextFetchPromise = this.fetchDataFromDistributor(
+            route,
+            currentCycle,
+            endCycle,
+            this.signData({
+              startCycle: currentCycle,
+              endCycle,
+              afterTimestamp,
+              afterTxId,
+              limit: config.requestLimits.MAX_ORIGINAL_TXS_PER_REQUEST,
+            })
+          )
+        } else {
+          nextFetchPromise = null
+        }
+
+        const startTime = Date.now()
+        // Deserialize originalTxs
+        originalTxs.forEach((originalTx) => {
+          OriginalTxDataDB.deserializeDbOriginalTxData(originalTx)
+        })
+        const elapsed = Date.now() - startTime
+        if (elapsed > 100) {
+          console.log(`Deserializing ${originalTxs.length} originalTxs took ${elapsed}ms`)
+        }
+
+        // Add originalTxs to buffer - will flush to DB when buffer reaches threshold
+        await this.addToBuffer('originalTx', originalTxs)
+
+        totalFetched += originalTxs.length
+        this.stats.totalOriginalTxs += originalTxs.length
+
+        if (config.verbose) {
+          console.log(
+            `[Cycles ${startCycle}-${endCycle}] OriginalTxs: +${originalTxs.length} (total: ${totalFetched}), ` +
+              `last in cycle ${currentCycle}` +
+              (this.syncConfig.enablePrefetch ?
' [prefetch]' : '') + ) + } + + // If we got less than the max originalTxs size, we've exhausted this cycle range + if (originalTxs.length < config.requestLimits.MAX_ORIGINAL_TXS_PER_REQUEST) { + break + } + } catch (error) { + console.error(`Error fetching originalTxs for cycle batch ${startCycle}-${endCycle}:`, error) + throw error + } + } + } + + /** + * Fetch data by multi-cycle range with retry logic + */ + private async fetchDataFromDistributor( + route: string, + startCycle: number, + endCycle: number, + data: any + ): Promise { + const url = `${DISTRIBUTOR_URL}/${route}` + + // Retry with exponential backoff + for (let attempt = 0; attempt <= this.syncConfig.retryAttempts; attempt++) { + try { + const startTime = Date.now() + const response = await this.axiosInstance.post(url, data) + const networkElapsed = Date.now() - startTime + if (response && response.data) { + ;(response.data as ResponseDataWithMetadata).__networkElapsed = networkElapsed + } + return response + } catch (error: any) { + const isLastAttempt = attempt === this.syncConfig.retryAttempts + + // Retry ALL errors (network errors, socket hang up, timeouts, etc.) + // This gives the collector time to recover when overloaded + if (!isLastAttempt) { + // Exponential backoff with longer delays to give collector time to recover + const delay = this.syncConfig.retryDelayMs * Math.pow(2, attempt) + const errorCode = error.code || error.cause?.code || 'UNKNOWN' + const errorMsg = error.message || 'Unknown error' + console.warn( + `Error (${errorCode}: ${errorMsg}) on ${route} fetch (cycles ${startCycle}-${endCycle}), ` + + `attempt ${attempt + 1}/${this.syncConfig.retryAttempts}, ` + + `retrying in ${delay}ms... 
(Giving collector time to process DB writes)` + ) + await this.sleep(delay) + continue + } + + // Last attempt failed - throw error + console.error( + `Error fetching ${route} for (cycles ${startCycle}-${endCycle}) after ${ + this.syncConfig.retryAttempts + 1 + } attempts:`, + error.message + ) + throw error + } + } + + return null + } + + /** + * Sign data + */ + private signData(obj: SyncTxDataByCycleRange | { start: number; end: number }): P2P.P2PTypes.SignedObject { + const data = { + ...obj, + sender: config.collectorInfo.publicKey, + sign: undefined, + } + crypto.signObj(data, config.collectorInfo.secretKey, config.collectorInfo.publicKey) + return data + } + + /** + * Sleep helper for retry delays + */ + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + /** + * Print sync summary + */ + private async printSummary(startCycle: number, endCycle: number): Promise { + const elapsedMs = (this.stats.endTime || Date.now()) - this.stats.startTime + const elapsedSec = (elapsedMs / 1000).toFixed(2) + const elapsedMin = (elapsedMs / 60000).toFixed(2) + + const totalRecords = this.stats.totalCycles + this.stats.totalReceipts + this.stats.totalOriginalTxs + const throughput = (totalRecords / (elapsedMs / 1000)).toFixed(0) + + console.log(`\n${'='.repeat(60)}`) + console.log('Parallel Sync Complete!') + console.log(`${'='.repeat(60)}`) + console.log(` Cycle Range: ${startCycle} → ${endCycle}`) + console.log(` Data Cycles Synced: ${this.stats.completedCycles}/${this.stats.totalCyclesToSync}`) + console.log(` Cycles Synced: ${this.stats.totalCycles}`) + console.log(` Receipts Synced: ${this.stats.totalReceipts}`) + console.log(` OriginalTxs Synced: ${this.stats.totalOriginalTxs}`) + console.log(` Total Records: ${totalRecords}`) + console.log(` Errors: ${this.stats.errors}`) + console.log(` Time Elapsed: ${elapsedSec}s (${elapsedMin} min)`) + console.log(` Throughput: ${throughput} records/sec`) + 
console.log(`${'='.repeat(60)}\n`) + } + + /** + * Generic function to add data to buffer and flush if threshold reached + * Handles all buffer types (receipts, originalTxs, cycles) + */ + private async addToBuffer( + type: 'receipt' | 'originalTx' | 'cycle', + data: Receipt[] | OriginalTxData[] | Cycle[] + ): Promise { + if (type === 'receipt') { + // Wait for lock to be released (prevents concurrent modification during flush) + while (this.receiptBufferLock) { + await new Promise((resolve) => setTimeout(resolve, 10)) + } + + // Add data to buffer + this.receiptBuffer.push(...(data as Receipt[])) + + // Check if buffer reached threshold + if (this.receiptBuffer.length >= this.ACCUMULATION_THRESHOLD) { + await this.flushBuffer('receipt') + } + } else if (type === 'originalTx') { + // Wait for lock to be released (prevents concurrent modification during flush) + while (this.originalTxBufferLock) { + await new Promise((resolve) => setTimeout(resolve, 10)) + } + + // Add data to buffer + this.originalTxBuffer.push(...(data as OriginalTxData[])) + + // Check if buffer reached threshold + if (this.originalTxBuffer.length >= this.ACCUMULATION_THRESHOLD) { + await this.flushBuffer('originalTx') + } + } else { + // Wait for lock to be released (prevents concurrent modification during flush) + while (this.cycleBufferLock) { + await new Promise((resolve) => setTimeout(resolve, 10)) + } + + // Add data to buffer + this.cycleBuffer.push(...(data as Cycle[])) + + // Check if buffer reached threshold + if (this.cycleBuffer.length >= this.ACCUMULATION_THRESHOLD) { + await this.flushBuffer('cycle') + } + } + } + + /** + * Generic function to flush buffer to database + * Handles all buffer types with adaptive delay and locking (adaptive cooling only for receipts) + */ + private async flushBuffer(type: 'receipt' | 'originalTx' | 'cycle'): Promise { + if (type === 'receipt') { + if (this.receiptBuffer.length === 0) return + + this.receiptBufferLock = true + try { + const toFlush = 
[...this.receiptBuffer] + this.receiptBuffer = [] + + const startTime = Date.now() + // Deserialize receipts in chunks to prevent event loop blocking + for (let i = 0; i < toFlush.length; i += DESERIALIZE_RECEIPTS_CHUNK_SIZE) { + const end = Math.min(i + DESERIALIZE_RECEIPTS_CHUNK_SIZE, toFlush.length) + // Deserialize chunk of receipts + for (let j = i; j < end; j++) { + // eslint-disable-next-line security/detect-object-injection + ReceiptDB.deserializeDbReceipt(toFlush[j] as any) + } + // Yield to event loop after each chunk (except the last one) + if (end < toFlush.length) { + await new Promise((resolve) => setImmediate(resolve)) + } + } + const elapsed = Date.now() - startTime + if (elapsed > 100) { + console.log(`Deserializing ${toFlush.length} receipts took: ${elapsed}ms`) + } + console.log(`[Buffer Flush] Flushing ${toFlush.length} receipts to database`) + if (processData) await ReceiptDB.processReceiptData(toFlush, false, false) + + if (useManualCheckPoint) { + // Increment flush count and potentially checkpoint WAL + this.flushCount++ + await this.maybeCheckpointWAL() + } + } finally { + this.receiptBufferLock = false + } + } else if (type === 'originalTx') { + if (this.originalTxBuffer.length === 0) return + + // If another worker is already locking, return immediately (it will flush our data too) + if (this.originalTxBufferLock) { + return + } + + this.originalTxBufferLock = true + try { + const toFlush = [...this.originalTxBuffer] + this.originalTxBuffer = [] + console.log(`[Buffer Flush] Flushing ${toFlush.length} originaltxs to database`) + if (processData) await OriginalTxDataDB.processOriginalTxData(toFlush) + } finally { + this.originalTxBufferLock = false + } + } else { + if (this.cycleBuffer.length === 0) return + + // If another worker is already locking, return immediately (it will flush our data too) + if (this.cycleBufferLock) { + return + } + + this.cycleBufferLock = true + try { + const toFlush = [...this.cycleBuffer] + this.cycleBuffer = 
[] + console.log(`[Buffer Flush] Flushing ${toFlush.length} cycles to database`) + if (processData) await CycleDB.bulkInsertCycles(toFlush) + } finally { + this.cycleBufferLock = false + } + } + } + + /** + * Flush all buffers (call at end of sync) + */ + private async flushAllBuffers(): Promise { + await this.flushBuffer('receipt') + await this.flushBuffer('originalTx') + await this.flushBuffer('cycle') + } + + /** + * Conditionally checkpoint WAL files if enough flushes have occurred + * This prevents WAL files from growing too large during long sync operations + */ + private async maybeCheckpointWAL(): Promise { + if (!useManualCheckPoint) return + if (this.flushCount % this.CHECKPOINT_FREQUENCY === 0) { + console.log( + `[WAL Checkpoint] Running periodic checkpoint after ${this.flushCount} buffer flushes (~${ + this.flushCount * this.ACCUMULATION_THRESHOLD + } records)` + ) + // Run checkpoints on all three databases in parallel + // Use PASSIVE mode to avoid blocking readers + await Promise.all([ + checkpointWAL(receiptDatabase, 'PASSIVE'), + checkpointWAL(originalTxDataDatabase, 'PASSIVE'), + checkpointWAL(cycleDatabase, 'PASSIVE'), + ]) + } + } + + /** + * Get current statistics + */ + getStats(): SyncStats { + return { ...this.stats } + } +} diff --git a/src/collector.ts b/src/collector.ts index 1232126..c27f139 100644 --- a/src/collector.ts +++ b/src/collector.ts @@ -28,6 +28,7 @@ import RMQCyclesConsumer from './collectors/rmq/cycles' import RMQOriginalTxsConsumer from './collectors/rmq/original_txs' import RMQReceiptsConsumer from './collectors/rmq/receipts' import { setupCollectorSocketServer } from './collectorServer' +import { DataSyncManager } from './class/DataSyncManager' const DistributorFirehoseEvent = 'FIREHOSE' let ws: WebSocket @@ -78,6 +79,7 @@ if (config.env == envEnum.DEV) { } export const checkAndSyncData = async (): Promise<() => Promise> => { + console.log('Using legacy sequential sync strategy') // Check if there is any existing data in 
the db let lastStoredReceiptCount = await ReceiptDB.queryReceiptCount() let lastStoredOriginalTxDataCount = await OriginalTxDataDB.queryOriginalTxDataCount() @@ -224,6 +226,7 @@ export const checkAndSyncData = async (): Promise<() => Promise> => { const syncData = async (): Promise => { // If there is already some data in the db, we can assume that the genesis accounts data has been synced already if (lastStoredCycleCount === 0) await downloadAndSyncGenesisAccounts() // To sync accounts data that are from genesis accounts/accounts data that the network start with + // Sync receipts and originalTxsData data first if there is old data if ( lastStoredReceiptCycle > 0 && @@ -271,7 +274,7 @@ const connectToDistributor = (): void => { ws = new WebSocket(URL) ws.onopen = () => { console.log( - `āœ… Socket connected to the Distributor @ ${config.distributorInfo.ip}:${config.distributorInfo.port}}` + `āœ… Socket connected to the Distributor @ ${config.distributorInfo.ip}:${config.distributorInfo.port}` ) connected = true reconnecting = false @@ -282,7 +285,7 @@ const connectToDistributor = (): void => { try { validateData(StringUtils.safeJsonParse(data)) } catch (e) { - console.log('Error in processing received data!', e) + console.log('Error in processing received data!', data, e) } }) ws.onerror = (error) => { @@ -372,7 +375,15 @@ const startServer = async (): Promise => { await Storage.initializeDB() addExitListeners() - const syncData = await checkAndSyncData() + let dataSyncManager = null + + if (config.useParallelSync) { + // Run anomaly detection BEFORE connecting to websocket + // This fails fast if there are data corruption issues + dataSyncManager = new DataSyncManager() + await dataSyncManager.detectDataAnomalies() + } + const syncData = !config.useParallelSync && (await checkAndSyncData()) if (config.dataLogWrite) await initDataLogWriter() addSigListeners() @@ -394,6 +405,11 @@ const startServer = async (): Promise => { } } + if (config.useParallelSync) { + 
await dataSyncManager.syncData() + return + } + await syncData() } diff --git a/src/config/index.ts b/src/config/index.ts index 2ddb2d5..76cb9ab 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -94,6 +94,11 @@ export interface Config { MAX_ACCOUNT_HISTORY_STATES_PER_REQUEST: number MAX_STATS_PER_REQUEST: number } + parallelSyncConcurrency: number // Number of parallel workers for cycle sync + useParallelSync: boolean // Enable parallel sync with composite cursor + cyclesPerBatch: number // Number of cycles to batch together in multi-cycle requests (default: 10) + enablePrefetch: boolean // Enable prefetching of next batch while processing current batch (default: true) + syncRetryAttempts: number // Number of retry attempts for failed requests (default: 3) dexScreenerAPI: string // Dex Screener API URL for Liberdus token dexScreenerLink: string // Dex Screener Link for Liberdus token } @@ -184,6 +189,11 @@ let config: Config = { MAX_ACCOUNT_HISTORY_STATES_PER_REQUEST: 100, MAX_STATS_PER_REQUEST: 1000000, }, + parallelSyncConcurrency: Number(process.env.PARALLEL_SYNC_CONCURRENCY) || 5, // 5 parallel sync fetches + useParallelSync: process.env.USE_PARALLEL_SYNC !== 'false', // Enable by default + cyclesPerBatch: Number(process.env.CYCLES_PER_BATCH) || 100, // Batch 100 cycles together ( matching MAX_BETWEEN_CYCLES_PER_REQUEST, can be lower if needed ) + enablePrefetch: process.env.ENABLE_PREFETCH !== 'false', // Enable prefetch by default + syncRetryAttempts: Number(process.env.SYNC_RETRY_ATTEMPTS) || 5, // Retry failed requests 5 times dexScreenerAPI: 'https://api.dexscreener.com/latest/dex/search?q=0x693ed886545970F0a3ADf8C59af5cCdb6dDF0a76', dexScreenerLink: 'https://dexscreener.com/polygon/0x041e48a5b11c29fdbd92498eb05573c52728398c', diff --git a/src/storage/account.ts b/src/storage/account.ts index d189fa9..2d1fbef 100644 --- a/src/storage/account.ts +++ b/src/storage/account.ts @@ -79,7 +79,8 @@ export async function bulkInsertAccounts(accounts: 
Account[]): Promise { ${keepNewerData('accountType')}, ${keepNewerData('isGlobal')}, createdTimestamp = MIN(accounts.createdTimestamp, excluded.createdTimestamp)` - await db.run(accountDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(accountDatabase, sql, values) console.log('Successfully bulk inserted Accounts', accounts.length) } catch (e) { console.log(e) @@ -204,6 +205,47 @@ export async function queryAccountByAccountId(accountId: string): Promise { + try { + const sql = `SELECT timestamp, createdTimestamp FROM accounts WHERE accountId=?` + const dbAccount = (await db.get(accountDatabase, sql, [accountId])) as DbAccount + if (dbAccount) return { timestamp: dbAccount.timestamp, createdTimestamp: dbAccount.createdTimestamp } + return null + } catch (e) { + console.log(e) + return null + } +} + +export async function queryAccountTimestampsBatch( + accountIds: string[] +): Promise> { + const resultMap = new Map() + if (accountIds.length === 0) return resultMap + + try { + // Create placeholders for IN clause + const placeholders = accountIds.map(() => '?').join(', ') + const sql = `SELECT accountId, timestamp, createdTimestamp FROM accounts WHERE accountId IN (${placeholders})` + const accounts = (await db.all(accountDatabase, sql, accountIds)) as DbAccount[] + + for (const account of accounts) { + resultMap.set(account.accountId, { + timestamp: account.timestamp, + createdTimestamp: account.createdTimestamp, + }) + } + + if (config.verbose) console.log('Batch queried accounts', accounts.length, 'of', accountIds.length) + } catch (e) { + console.log('Error in queryAccountTimestampsBatch', e) + } + + return resultMap +} + export async function processAccountData(accounts: AccountsCopy[]): Promise { console.log('accounts size', accounts.length) if (accounts && accounts.length <= 0) return [] diff --git a/src/storage/accountHistoryState.ts b/src/storage/accountHistoryState.ts 
index c517687..b0e1723 100644 --- a/src/storage/accountHistoryState.ts +++ b/src/storage/accountHistoryState.ts @@ -70,7 +70,8 @@ export async function bulkInsertAccountHistoryStates( ) const sql = `INSERT OR REPLACE INTO accountHistoryState ${fields} VALUES ${allPlaceholders}` - await db.run(accountHistoryStateDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(accountHistoryStateDatabase, sql, values) console.log('Successfully bulk inserted AccountHistoryStates', accountHistoryStates.length) } catch (e) { console.log(e) diff --git a/src/storage/cycle.ts b/src/storage/cycle.ts index 6ddc964..1bb1f43 100644 --- a/src/storage/cycle.ts +++ b/src/storage/cycle.ts @@ -62,7 +62,8 @@ export async function bulkInsertCycles(cycles: Cycle[]): Promise { ) const sql = `INSERT OR REPLACE INTO cycles ${fields} VALUES ${allPlaceholders}` - await db.run(cycleDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(cycleDatabase, sql, values) console.log('Successfully bulk inserted Cycles', cycles.length) } catch (e) { console.log(e) @@ -234,3 +235,94 @@ export async function queryCycleRecordsByTimestamp( return [] } } + +export interface CycleGap { + startCycle: number + endCycle: number + gapSize: number +} + +/** + * Efficiently query for missing cycle ranges + * Returns ranges of missing cycles from 0 to targetCycle + * Uses LEFT JOIN to find gaps between consecutive cycles - O(N) complexity + */ +export async function queryMissingCycleRanges(targetCycle: number): Promise { + try { + // Get first and last cycle for edge gap detection + const firstCycleResult = (await db.get( + cycleDatabase, + 'SELECT MIN(counter) as first_cycle FROM cycles', + [] + )) as { first_cycle: number } | undefined + + const lastCycleResult = (await db.get( + cycleDatabase, + 'SELECT MAX(counter) as last_cycle FROM cycles WHERE 
counter <= ?', + [targetCycle] + )) as { last_cycle: number } | undefined + + const firstCycle = firstCycleResult?.first_cycle ?? 0 + const lastCycle = lastCycleResult?.last_cycle ?? -1 + + const ranges: CycleGap[] = [] + + // Check for gap at the beginning (0 to firstCycle - 1) + if (firstCycle > 0) { + ranges.push({ + startCycle: 0, + endCycle: firstCycle - 1, + gapSize: firstCycle, + }) + } + + // Find gaps in the middle using LEFT JOIN + // For each cycle c1, check if the next cycle (c1.counter + 1) exists + // If not, find where the gap ends by looking for the next existing cycle + const sql = ` + SELECT + c1.counter + 1 AS startCycle, + (SELECT MIN(c2.counter) - 1 + FROM cycles c2 + WHERE c2.counter > c1.counter AND c2.counter <= ?) AS endCycle + FROM cycles c1 + WHERE NOT EXISTS ( + SELECT 1 FROM cycles c3 + WHERE c3.counter = c1.counter + 1 + ) + AND c1.counter < ? + ORDER BY c1.counter + ` + + const middleGaps = (await db.all(cycleDatabase, sql, [targetCycle, targetCycle])) as { + startCycle: number + endCycle: number + }[] + + // Add middle gaps with calculated gapSize (filter out null endCycle values) + for (const gap of middleGaps) { + if (gap.endCycle !== null && gap.endCycle >= gap.startCycle) { + ranges.push({ + startCycle: gap.startCycle, + endCycle: gap.endCycle, + gapSize: gap.endCycle - gap.startCycle + 1, + }) + } + } + + // Check for gap at the end (lastCycle + 1 to targetCycle) + if (lastCycle >= 0 && lastCycle < targetCycle) { + ranges.push({ + startCycle: lastCycle + 1, + endCycle: targetCycle, + gapSize: targetCycle - lastCycle, + }) + } + + if (config.verbose) console.log(`Found ${ranges.length} missing cycle ranges`) + return ranges + } catch (e) { + console.log('Error querying missing cycle ranges:', e) + throw e + } +} diff --git a/src/storage/index.ts b/src/storage/index.ts index cc314d6..92f0793 100644 --- a/src/storage/index.ts +++ b/src/storage/index.ts @@ -147,6 +147,11 @@ export const initializeDB = async (): Promise => { 
receiptDatabase, 'CREATE INDEX if not exists `receipts_cycle_timestamp` ON `receipts` (`cycle` DESC, `timestamp` DESC)' ) + // Composite index for cursor-based pagination (optimal for parallel sync) + await runCreate( + receiptDatabase, + 'CREATE INDEX if not exists `receipts_cycle_timestamp_receiptId` ON `receipts` (`cycle` ASC, `timestamp` ASC, `receiptId` ASC)' + ) // be sure to adjust the data types of `transactionType`, `txFrom`, `txTo` as needed await runCreate( originalTxDataDatabase, @@ -173,6 +178,11 @@ export const initializeDB = async (): Promise => { originalTxDataDatabase, 'CREATE INDEX if not exists `originalTxsData_cycle_timestamp` ON `originalTxsData` (`cycle` DESC, `timestamp` DESC)' ) + // Composite index for cursor-based pagination (optimal for parallel sync) + await runCreate( + originalTxDataDatabase, + 'CREATE INDEX if not exists `originalTxsData_cycle_timestamp_txId` ON `originalTxsData` (`cycle` ASC, `timestamp` ASC, `txId` ASC)' + ) await runCreate( originalTxDataDatabase, 'CREATE INDEX if not exists `originalTxsData_txType` ON `originalTxsData` (`transactionType`)' diff --git a/src/storage/originalTxData.ts b/src/storage/originalTxData.ts index 137a5a1..50f37b2 100644 --- a/src/storage/originalTxData.ts +++ b/src/storage/originalTxData.ts @@ -60,7 +60,8 @@ export async function bulkInsertOriginalTxsData(originalTxsData: OriginalTxData[ ) const sql = `INSERT OR REPLACE INTO originalTxsData ${fields} VALUES ${allPlaceholders}` - await db.run(originalTxDataDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(originalTxDataDatabase, sql, values) console.log(`Successfully bulk inserted OriginalTxsData`, originalTxsData.length) } catch (e) { console.log(e) @@ -142,7 +143,7 @@ export async function queryOriginalTxDataCount( } if (startCycle || endCycle) { sql = db.updateSqlStatementClause(sql, values) - sql += `cycleNumber BETWEEN ? 
AND ?` + sql += `cycle BETWEEN ? AND ?` values.push(startCycle, endCycle) } if (afterTimestamp) { @@ -176,7 +177,7 @@ export async function queryOriginalTxsData(query: QueryOriginalTxsDataParams): P } if (startCycle || endCycle) { sql = db.updateSqlStatementClause(sql, values) - sql += `cycleNumber BETWEEN ? AND ?` + sql += `cycle BETWEEN ? AND ?` values.push(startCycle, endCycle) } if (afterTimestamp) { @@ -196,9 +197,7 @@ export async function queryOriginalTxsData(query: QueryOriginalTxsDataParams): P sql += ` OFFSET ${skip}` } originalTxsData = (await db.all(originalTxDataDatabase, sql, values)) as DbOriginalTxData[] - for (const originalTxData of originalTxsData) { - originalTxData.originalTxData = StringUtils.safeJsonParse(originalTxData.originalTxData) - } + originalTxsData.forEach((originalTxData: DbOriginalTxData) => deserializeDbOriginalTxData(originalTxData)) } catch (e) { console.log(e) } @@ -210,9 +209,7 @@ export async function queryOriginalTxDataByTxId(txId: string): Promise { ) const sql = `INSERT OR REPLACE INTO receipts ${fields} VALUES ${allPlaceholders}` - await db.run(receiptDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(receiptDatabase, sql, values) console.log('Successfully bulk inserted receipts', receipts.length) } catch (e) { console.log(e) @@ -83,14 +84,28 @@ export async function bulkInsertReceipts(receipts: Receipt[]): Promise { export async function processReceiptData( receipts: Receipt[], saveOnlyNewData = false, + filterExistingAccounts = true, // When true, queries DB to filter out older account data before insert forwardToSubscribers = false ): Promise { if (receipts && receipts.length <= 0) return - const bucketSize = 1000 + const bucketSize = 2000 + const bucketSizeForReceipts = 1000 // Receipts size can be big, better to save less than the bucket size let combineReceipts: Receipt[] = [] let combineAccounts: Account[] = [] // For new 
accounts to bulk insert; Not for accounts that are already stored in database let combineTransactions: Transaction[] = [] let accountHistoryStateList: AccountHistoryStateDB.AccountHistoryState[] = [] + + // Optimization: If saveOnlyNewData is true, batch query existing receipt IDs BEFORE the loop + // to avoid N+1 query problem (individual SELECTs for each receipt) + let existingReceiptIds: Set = new Set() + if (saveOnlyNewData && receipts.length > 0) { + const receiptIds = receipts.map((r) => r.tx.txId) + const placeholders = receiptIds.map(() => '?').join(', ') + const sql = `SELECT receiptId FROM receipts WHERE receiptId IN (${placeholders})` + const existingReceipts = (await db.all(receiptDatabase, sql, receiptIds)) as { receiptId: string }[] + existingReceiptIds = new Set(existingReceipts.map((r) => r.receiptId)) + } + for (const receiptObj of receipts) { const { afterStates, @@ -118,8 +133,10 @@ export async function processReceiptData( applyTimestamp: applyTimestamp ?? calculatedApplyTimestamp, } if (saveOnlyNewData) { - const receiptExist = await queryReceiptByReceiptId(tx.txId) - if (!receiptExist) combineReceipts.push(modifiedReceiptObj as unknown as Receipt) + // Check against pre-fetched set instead of querying database for each receipt + if (!existingReceiptIds.has(tx.txId)) { + combineReceipts.push(modifiedReceiptObj as unknown as Receipt) + } } else combineReceipts.push(modifiedReceiptObj as unknown as Receipt) const txReceipt = appReceiptData receiptsMap.set(tx.txId, tx.timestamp) @@ -128,8 +145,7 @@ export async function processReceiptData( forwardData(receiptObj) } - // Receipts size can be big, better to save per 100 - if (combineReceipts.length >= 100) { + if (combineReceipts.length >= bucketSizeForReceipts) { await bulkInsertReceipts(combineReceipts) combineReceipts = [] } @@ -158,18 +174,19 @@ export async function processReceiptData( combineAccounts.push(accObj) } } else { - const accountExist = await 
AccountDB.queryAccountByAccountId(accObj.accountId) - if (config.verbose) console.log('accountExist', accountExist) - if (!accountExist) { - combineAccounts.push(accObj) - } else { - if (accountExist.timestamp < accObj.timestamp) { - await AccountDB.updateAccount(accObj) - } - if (accObj.createdTimestamp < accountExist.createdTimestamp) { - await AccountDB.updateCreatedTimestamp(accObj.accountId, accObj.createdTimestamp) - } - } + // const accountExist = await AccountDB.queryAccountTimestamp(accObj.accountId) + // if (config.verbose) console.log('accountExist', accountExist) + // if (accountExist) { + // if (accountExist.timestamp < accObj.timestamp) { + // await AccountDB.updateAccount(accObj) + // // combineAccounts.push(accObj) + // } + // if (accObj.createdTimestamp < accountExist.createdTimestamp) { + // await AccountDB.updateCreatedTimestamp(accObj.accountId, accObj.createdTimestamp) + // } + // } else { + combineAccounts.push(accObj) + // } } // if tx receipt is saved as an account, create tx object from the account and save it @@ -229,13 +246,13 @@ export async function processReceiptData( } txObj.data = {} } - const transactionExist = await TransactionDB.queryTransactionByTxId(tx.txId) - if (config.verbose) console.log('transactionExist', transactionExist) - if (!transactionExist) { - combineTransactions.push(txObj) - } else if (transactionExist.timestamp < txObj.timestamp) { - await TransactionDB.insertTransaction(txObj) - } + // const transactionExist = await TransactionDB.queryTransactionByTxId(tx.txId) + // if (config.verbose) console.log('transactionExist', transactionExist) + // if (!transactionExist) { + combineTransactions.push(txObj) + // } else if (transactionExist.timestamp < txObj.timestamp) { + // await TransactionDB.insertTransaction(txObj) + // } if (config.saveAccountHistoryState) { // Note: This has to be changed once we change the way the global modification tx consensus is updated if ( @@ -286,8 +303,40 @@ export async function 
processReceiptData( accountHistoryStateList = [] } } + + // Optimization: The bulkInsertAccounts SQL already handles: + // 1. Keeping newer data via CASE WHEN excluded.timestamp > accounts.timestamp + // 2. Preserving oldest createdTimestamp via MIN(accounts.createdTimestamp, excluded.createdTimestamp) + // When filterExistingAccounts is false, we skip the batch query and individual updates - just bulk insert everything + + if (filterExistingAccounts) { + // Legacy path: Batch query all collected account IDs once and filter before insert + const accountIdsToQuery = combineAccounts.map((acc) => acc.accountId) + const existingAccounts = await AccountDB.queryAccountTimestampsBatch(accountIdsToQuery) + for (const accObj of combineAccounts) { + const accountExist = existingAccounts.get(accObj.accountId) + if (accountExist) { + if (accountExist.timestamp > accObj.timestamp) { + // await AccountDB.updateAccount(accObj) + // Remove the account from the list + combineAccounts = combineAccounts.filter((acc) => acc.accountId !== accObj.accountId) + } + if (accountExist.createdTimestamp > accObj.createdTimestamp) { + await AccountDB.updateCreatedTimestamp(accObj.accountId, accObj.createdTimestamp) + } + } + } + } + + // Insert the combined accounts in bucketSize + if (combineAccounts.length > 0) { + for (let i = 0; i < combineAccounts.length; i += bucketSize) { + const accounts = combineAccounts.slice(i, i + bucketSize) + await AccountDB.bulkInsertAccounts(accounts) + } + } + + if (combineReceipts.length > 0) await bulkInsertReceipts(combineReceipts) - if (combineAccounts.length > 0) await AccountDB.bulkInsertAccounts(combineAccounts) if (combineTransactions.length > 0) await TransactionDB.bulkInsertTransactions(combineTransactions) if (accountHistoryStateList.length > 0) await AccountHistoryStateDB.bulkInsertAccountHistoryStates(accountHistoryStateList) @@ -374,7 +423,10 @@ export async function queryReceiptCountByCycles( let receipts: { cycle: number; 'COUNT(*)': number
}[] = [] try { const sql = `SELECT cycle, COUNT(*) FROM receipts GROUP BY cycle HAVING cycle BETWEEN ? AND ? ORDER BY cycle ASC` - receipts = (await db.all(receiptDatabase, sql, [start, end])) as { cycle: number; 'COUNT(*)': number }[] + receipts = (await db.all(receiptDatabase, sql, [start, end])) as { + cycle: number + 'COUNT(*)': number + }[] } catch (e) { console.log(e) } @@ -388,7 +440,7 @@ export async function queryReceiptCountByCycles( }) } -function deserializeDbReceipt(receipt: DbReceipt): void { +export function deserializeDbReceipt(receipt: DbReceipt): void { receipt.tx &&= StringUtils.safeJsonParse(receipt.tx) receipt.beforeStates &&= StringUtils.safeJsonParse(receipt.beforeStates) receipt.afterStates &&= StringUtils.safeJsonParse(receipt.afterStates) diff --git a/src/storage/sqlite3storage.ts b/src/storage/sqlite3storage.ts index 36988e4..2b673de 100644 --- a/src/storage/sqlite3storage.ts +++ b/src/storage/sqlite3storage.ts @@ -1,6 +1,32 @@ import { Utils as StringUtils } from '@shardus/types' import { Database } from 'sqlite3' +const enableWritingQueue = false + +// Simple write queue using Promise chain - serializes all database writes +// This prevents write contention while allowing parallel reads (SELECTs) +// Only INSERT/UPDATE/DELETE operations should use this queue +let writeQueueTail: Promise = Promise.resolve() + +// Control whether to use manual WAL checkpoints +export const useManualCheckPoint = false + +interface QueryTiming { + id: number + sql: string + startMs: number + engineMs?: number +} + +const SQL_LOG_MAX_LENGTH = 200 +const SQL_ENGINE_WARN_THRESHOLD_MS = 500 +const SQL_QUEUE_WARN_THRESHOLD_MS = 250 +const SQL_TOTAL_WARN_THRESHOLD_MS = 1000 + +let queryIdSequence = 0 +const pendingQueries = new Map() +const queuedBySql = new Map() + export const createDB = async (dbPath: string, dbName: string): Promise => { console.log('dbName', dbName, 'dbPath', dbPath) const db = new Database(dbPath, (err) => { @@ -12,19 +38,110 @@ export 
const createDB = async (dbPath: string, dbName: string): Promise { - if (time > 500 && time < 1000) { - console.log('SLOW QUERY', process.pid, sql, time) - } else if (time > 1000) { - console.log('VERY SLOW QUERY', process.pid, sql, time) + const engineMs = typeof time === 'number' ? time : Number(time) + const queue = queuedBySql.get(sql) + const id = queue && queue.length > 0 ? queue[0] : undefined + if (id === undefined) { + printQueryTimingLog('profile event without pending query', { + engineMs, + sql: formatSqlForLog(sql), + }) + return + } + const entry = pendingQueries.get(id) + if (!entry) { + printQueryTimingLog('profile missing pending entry', { + engineMs, + sql: formatSqlForLog(sql), + }) + return + } + entry.engineMs = engineMs + if (engineMs > SQL_ENGINE_WARN_THRESHOLD_MS) { + console.warn(`[DB Engine] Slow Query: ${engineMs} ms for SQL: ${formatSqlForLog(sql)}`) } }) console.log(`Database ${dbName} Initialized!`) return db } +/** + * Create read-only database connection optimized for SELECT queries + * - Shorter busy_timeout (reads shouldn't block in WAL mode) + * - No synchronous writes (read-only) + * - Large cache and mmap for fast reads + */ +export const createReadDB = async (dbPath: string, dbName: string): Promise => { + console.log('dbName (Read)', dbName, 'dbPath', dbPath) + const db = new Database(dbPath, (err) => { + if (err) { + console.log('Error opening read database:', err) + throw err + } + }) + await run(db, 'PRAGMA journal_mode=WAL') // WAL mode allows concurrent reads with writes + await run(db, 'PRAGMA synchronous = OFF') // Read-only connection doesn't need sync + await run(db, 'PRAGMA temp_store = MEMORY') + await run(db, 'PRAGMA cache_size = -128000') // 128MB cache (smaller than write connection) + await run(db, 'PRAGMA mmap_size = 536870912') // 512MB memory-mapped I/O for faster reads + await run(db, 'PRAGMA busy_timeout = 5000') // Shorter timeout - reads shouldn't block in WAL mode + await run(db, 'PRAGMA threads = 4') // 
Use up to 4 threads for parallel operations + await run(db, 'PRAGMA query_only = ON') // Enforce read-only mode at SQLite level + db.on('profile', (sql, time) => { + const engineMs = typeof time === 'number' ? time : Number(time) + const queue = queuedBySql.get(sql) + const id = queue && queue.length > 0 ? queue[0] : undefined + if (id === undefined) { + printQueryTimingLog('profile event without pending query (read)', { + engineMs, + sql: formatSqlForLog(sql), + }) + return + } + const entry = pendingQueries.get(id) + if (!entry) { + printQueryTimingLog('profile missing pending entry (read)', { + engineMs, + sql: formatSqlForLog(sql), + }) + return + } + entry.engineMs = engineMs + if (engineMs > SQL_ENGINE_WARN_THRESHOLD_MS) { + console.warn(`[DB Engine Read] Slow Query: ${engineMs} ms for SQL: ${formatSqlForLog(sql)}`) + } + }) + console.log(`Read Database ${dbName} Initialized!`) + return db +} + +/** + * Manually checkpoint the WAL file to prevent it from growing too large + * Uses PASSIVE mode which won't block readers + * Call this periodically during long-running sync operations + */ +export async function checkpointWAL( + db: Database, + mode: 'PASSIVE' | 'FULL' | 'RESTART' = 'PASSIVE' +): Promise { + try { + await run(db, `PRAGMA wal_checkpoint(${mode})`) + console.log(`[WAL Checkpoint] Executed ${mode} checkpoint`) + } catch (error) { + console.error('[WAL Checkpoint] Failed to checkpoint WAL:', error) + } +} + /** * Close Database Connections Gracefully */ @@ -57,13 +174,22 @@ export async function run( sql: string, params: unknown[] | object = [] ): Promise<{ id: number }> { + const entry = registerQuery(sql) return new Promise((resolve, reject) => { + const finalize = (): void => { + setImmediate(() => { + logTiming('run', entry) + cleanupQuery(entry) + }) + } db.run(sql, params, function (err: Error) { if (err) { console.log('Error running sql ' + sql) console.log(err) + finalize() reject(err) } else { + finalize() resolve({ id: this.lastID }) } }) 
@@ -71,13 +197,22 @@ export async function run( } export async function get(db: Database, sql: string, params = []): Promise { + const entry = registerQuery(sql) return new Promise((resolve, reject) => { + const finalize = (rows?: number): void => { + setImmediate(() => { + logTiming('get', entry, rows) + cleanupQuery(entry) + }) + } db.get(sql, params, (err: Error, result: T) => { if (err) { console.log('Error running sql: ' + sql) console.log(err) + finalize() reject(err) } else { + finalize(result ? 1 : 0) resolve(result) } }) @@ -85,19 +220,112 @@ export async function get(db: Database, sql: string, params = []): Promise } export async function all(db: Database, sql: string, params = []): Promise { + const entry = registerQuery(sql) return new Promise((resolve, reject) => { + const finalize = (rowsCount?: number): void => { + setImmediate(() => { + logTiming('all', entry, rowsCount) + cleanupQuery(entry) + }) + } db.all(sql, params, (err: Error, rows: T[]) => { if (err) { console.log('Error running sql: ' + sql) console.log(err) + finalize() reject(err) } else { + finalize(rows ? 
rows.length : 0) resolve(rows) } }) }) } +/** + * Executes a database write operation through the shared write queue + * Use this for INSERT/UPDATE/DELETE operations to prevent write contention + * Do NOT use for SELECT queries - they can run in parallel + */ +export async function executeDbWrite(writeOperation: () => Promise): Promise { + const enqueuedAt = Date.now() + + // Wait for previous write to finish, ignoring errors to prevent propagation + const myTurn = writeQueueTail.catch(() => undefined) + + // Create and chain the new write operation + const currentWrite = myTurn.then(async () => { + const startedAt = Date.now() + const promiseQueueMs = startedAt - enqueuedAt + + // Log if we waited a long time in the Promise queue + if (promiseQueueMs > 100) { + console.log(`[Promise Queue] Waited ${promiseQueueMs}ms in Promise queue before starting DB operation`) + } + + const value = await writeOperation() + const completedAt = Date.now() + const executionMs = completedAt - startedAt + + // Log slow DB operations (includes transaction + SQLite busy_timeout) + if (executionMs > 500) { + console.log( + `[DB Operation] Total: ${executionMs}ms (Promise queue: ${promiseQueueMs}ms, DB execution+waiting: ${executionMs}ms)` + ) + } + + return value + }) + + // Update queue tail to current write (for next operation to wait on) + writeQueueTail = currentWrite.catch(() => undefined) + + // Return the actual operation result + return currentWrite +} + +/** + * Execute work within a database transaction + * Uses BEGIN (deferred) since our write queue already serializes writes + * This reduces lock contention compared to BEGIN IMMEDIATE + * @param db Database instance + * @param work Async function containing the work to execute within the transaction + * @returns Result of the work function + */ +export async function executeInTransaction(db: Database, work: () => Promise): Promise { + await run(db, 'BEGIN') // Deferred transaction - acquires RESERVED lock on first write, not 
at BEGIN + try { + const result = await work() + await run(db, 'COMMIT') + return result + } catch (error) { + await run(db, 'ROLLBACK') + throw error + } +} + +export async function executeDbWriteWithTransaction( + db: Database, + sql: string, + params: unknown[] | object = [] +): Promise { + // Use write queue if enabled + if (enableWritingQueue) { + // Serialize write through promise queue + await executeDbWrite(() => + executeInTransaction(db, async () => { + await run(db, sql, params) + }) + ) + return + } + + // Use transaction directly + await executeInTransaction(db, async () => { + await run(db, sql, params) + }) +} + export function extractValues(object: object): string[] { try { const inputs: string[] = [] @@ -134,3 +362,56 @@ export function updateSqlStatementClause(sql: string, inputs: any[]): string { else sql += ' WHERE ' return sql } + +function registerQuery(sql: string): QueryTiming { + const entry: QueryTiming = { + id: ++queryIdSequence, + sql, + startMs: Date.now(), + } + pendingQueries.set(entry.id, entry) + let queue = queuedBySql.get(sql) + if (!queue) { + queue = [] + queuedBySql.set(sql, queue) + } + queue.push(entry.id) + return entry +} + +function cleanupQuery(entry: QueryTiming): void { + pendingQueries.delete(entry.id) + const queue = queuedBySql.get(entry.sql) + if (!queue) return + const index = queue.indexOf(entry.id) + if (index !== -1) queue.splice(index, 1) + if (queue.length === 0) queuedBySql.delete(entry.sql) +} + +function printQueryTimingLog(message: string, payload: object): void { + console.warn(`[DB Timing] ${message}`, JSON.stringify(payload)) +} + +function logTiming(operation: string, entry: QueryTiming, rows?: number): void { + const totalMs = Date.now() - entry.startMs + const engineMs = entry.engineMs ??
0 + const queueMs = Math.max(0, totalMs - engineMs) + const payload = { + operation, + totalMs: Number(totalMs.toFixed(2)), + queueMs: Number(queueMs.toFixed(2)), + engineMs: Number(engineMs.toFixed(2)), + sql: formatSqlForLog(entry.sql), + rows, + } + + if (totalMs > SQL_TOTAL_WARN_THRESHOLD_MS || queueMs > SQL_QUEUE_WARN_THRESHOLD_MS) { + printQueryTimingLog('', payload) + } +} + +function formatSqlForLog(sql: string): string { + const normalized = sql.replace(/\s+/g, ' ').trim() + if (normalized.length <= SQL_LOG_MAX_LENGTH) return normalized + return `${normalized.slice(0, SQL_LOG_MAX_LENGTH - 3)}...` +} diff --git a/src/storage/transaction.ts b/src/storage/transaction.ts index f46238b..5779cfa 100644 --- a/src/storage/transaction.ts +++ b/src/storage/transaction.ts @@ -61,7 +61,8 @@ export async function bulkInsertTransactions(transactions: Transaction[]): Promi ) const sql = `INSERT OR REPLACE INTO transactions ${fields} VALUES ${allPlaceholders}` - await db.run(transactionDatabase, sql, values) + // Serialize write through storage-level queue + transaction for atomicity + await db.executeDbWriteWithTransaction(transactionDatabase, sql, values) console.log('Successfully bulk inserted transactions', transactions.length) } catch (e) { console.log(e) @@ -378,7 +379,9 @@ export async function queryActiveAccountsCountByTxFee( WHERE timestamp < ? AND timestamp > ? ${excludeZeroFeeTxs ? ' AND txFee > 0' : ''} ` const values = [beforeTimestamp, afterTimestamp] - activeAccounts = (await db.get(transactionDatabase, sql, values)) as { 'COUNT(DISTINCT txFrom)': number } + activeAccounts = (await db.get(transactionDatabase, sql, values)) as { + 'COUNT(DISTINCT txFrom)': number + } } catch (e) { console.log('Error querying active accounts by txFee:', e) }