
New Optimized Data Sync Manager with Parallel Data Sync Feature #5

Merged
jairajdev merged 14 commits into dev from parallel-data-sync
Nov 17, 2025
Conversation

@jairajdev
Contributor

No description provided.

@jairajdev jairajdev changed the base branch from main to feat/cycle-based-data-sync November 17, 2025 14:07
Base automatically changed from feat/cycle-based-data-sync to dev November 17, 2025 14:08
**Parallel Sync Implementation:**
- Add ParallelCycleSync class with 10x+ performance improvement
- Implement composite cursor pagination (cycle + timestamp + ID) to prevent data loss
- Multi-cycle batching: fetch 10-200 cycles per HTTP request (vs 1 cycle previously)
- Prefetching: overlap network fetch with database writes
- Automatic checkpoint/resume from database (ParallelSyncCheckpointManager)
- Configurable concurrency, batch size, retry attempts via env vars
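The composite cursor above can be sketched as a plain ordering check. This is a hypothetical illustration (the `CompositeCursor` interface and `isAfterCursor` helper are not from the PR): records are ordered by (cycle, timestamp, id), and a page resumes strictly after the last cursor seen, so ties on cycle or timestamp alone cannot drop rows.

```typescript
// Hypothetical sketch of the composite-cursor ordering described above.
interface CompositeCursor {
  cycle: number
  timestamp: number
  id: string
}

// True if `record` sorts strictly after `cursor` in (cycle, timestamp, id) order.
function isAfterCursor(record: CompositeCursor, cursor: CompositeCursor): boolean {
  if (record.cycle !== cursor.cycle) return record.cycle > cursor.cycle
  if (record.timestamp !== cursor.timestamp) return record.timestamp > cursor.timestamp
  return record.id > cursor.id
}
```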

**Client-Side JSON Optimization:**
- Configure axios with StringUtils.safeStringify for request serialization
- Configure axios with StringUtils.safeJsonParse for response parsing
- Add timing measurements for stringify/parse operations
- Use Content-Length header for size (eliminates expensive re-stringify)
- HTTP connection pooling with keep-alive agents (maxSockets: concurrency * 2)
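The keep-alive pooling above might look like the following sketch (variable names are illustrative, not taken from the PR): agents are created once with `maxSockets` sized to `concurrency * 2` and then handed to the axios instance, so batch requests reuse TCP connections instead of paying a handshake per request.

```typescript
import http from 'node:http'
import https from 'node:https'

// Hypothetical sketch of the keep-alive agent setup described above.
const concurrency = Number(process.env.PARALLEL_SYNC_CONCURRENCY ?? 10)

// Keep-alive agents reuse TCP connections; maxSockets = concurrency * 2.
const httpAgent = new http.Agent({ keepAlive: true, maxSockets: concurrency * 2 })
const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: concurrency * 2 })

// These would then be passed to the client, e.g.
// axios.create({ httpAgent, httpsAgent, ... })
```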

**Database Enhancements:**
- Add composite indexes for cursor-based pagination:
  - receipts: (cycle ASC, timestamp ASC, receiptId ASC)
  - originalTxsData: (cycle ASC, timestamp ASC, txId ASC)
- Add SQLite lock contention diagnostics (queueMs vs engineMs)
- Track query timing with registerQuery/cleanupQuery pattern
- Warn on queueMs > 250ms or totalMs > 1000ms
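A minimal sketch of the index DDL and the warning thresholds above (index names and the `shouldWarn` helper are assumptions for illustration): `queueMs` measures time spent waiting for the database lock, `engineMs` time spent executing, and a warning fires when queueing exceeds 250 ms or the total exceeds 1000 ms.

```typescript
// Hypothetical DDL matching the composite indexes listed above.
const INDEX_DDL = [
  'CREATE INDEX IF NOT EXISTS idx_receipts_cycle_ts_id ON receipts (cycle ASC, timestamp ASC, receiptId ASC)',
  'CREATE INDEX IF NOT EXISTS idx_originalTxsData_cycle_ts_id ON originalTxsData (cycle ASC, timestamp ASC, txId ASC)',
]

interface QueryTiming {
  queueMs: number // time waiting for the SQLite lock
  engineMs: number // time spent executing in the engine
}

// Mirrors the thresholds above: queueMs > 250ms signals lock contention,
// totalMs > 1000ms signals a slow query overall.
function shouldWarn(t: QueryTiming): boolean {
  return t.queueMs > 250 || t.queueMs + t.engineMs > 1000
}
```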

**Configuration:**
- Add PARALLEL_SYNC_CONCURRENCY env var (default: 10)
- Add USE_PARALLEL_SYNC env var (default: true)
- Add CYCLES_PER_BATCH env var (default: 10)
- Add ENABLE_PREFETCH env var (default: true)
- Add SYNC_RETRY_ATTEMPTS env var (default: 3)
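The env vars above could be read with small helpers like these (the `envNumber`/`envBool` names and the `syncConfig` object are illustrative, not the PR's actual code); each value falls back to the stated default when the variable is unset or unparsable.

```typescript
// Hypothetical sketch of reading the sync configuration from env vars.
function envNumber(name: string, fallback: number): number {
  const raw = process.env[name]
  const parsed = Number(raw)
  return raw !== undefined && Number.isFinite(parsed) ? parsed : fallback
}

function envBool(name: string, fallback: boolean): boolean {
  const raw = process.env[name]
  return raw === undefined ? fallback : raw !== 'false'
}

const syncConfig = {
  parallelSyncConcurrency: envNumber('PARALLEL_SYNC_CONCURRENCY', 10),
  useParallelSync: envBool('USE_PARALLEL_SYNC', true),
  cyclesPerBatch: envNumber('CYCLES_PER_BATCH', 10),
  enablePrefetch: envBool('ENABLE_PREFETCH', true),
  syncRetryAttempts: envNumber('SYNC_RETRY_ATTEMPTS', 3),
}
```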

**Collector Entry Point:**
- Auto-select parallel vs legacy sync based on USE_PARALLEL_SYNC flag
- Add downloadTxsDataAndCyclesParallel() function in DataSync.ts
- Maintain backward compatibility with legacy sequential sync
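The flag-based routing above reduces to a one-line selection; this sketch is hypothetical (the `selectSync` helper and `SyncFn` type are not from the PR) and just shows how the entry point can keep both paths alive while picking one at startup.

```typescript
// Hypothetical sketch of the USE_PARALLEL_SYNC routing in the entry point.
type SyncFn = () => Promise<void>

// Returns the parallel path when the flag is set, otherwise the
// legacy sequential path kept for backward compatibility.
function selectSync(useParallelSync: boolean, parallel: SyncFn, legacy: SyncFn): SyncFn {
  return useParallelSync ? parallel : legacy
}
```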

**Error Handling:**
- Exponential backoff retry for ECONNRESET/ETIMEDOUT/ECONNREFUSED/EPIPE
- Detailed error logging with cycle ranges and attempt numbers
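The retry policy above can be sketched as a wrapper like this (a minimal illustration, assuming the retryable-code set and base delay; `withRetry` is not the PR's actual function name): only the listed socket errors are retried, the delay doubles per attempt, and the error is rethrown once attempts are exhausted.

```typescript
// Hypothetical sketch of the exponential-backoff retry described above.
const RETRYABLE = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EPIPE'])

async function withRetry<T>(fn: () => Promise<T>, attempts = 3, baseDelayMs = 500): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      const code = (err as { code?: string }).code ?? ''
      // Rethrow non-retryable errors, or retryable ones past the limit.
      if (attempt >= attempts || !RETRYABLE.has(code)) throw err
      const delay = baseDelayMs * 2 ** (attempt - 1)
      console.warn(`attempt ${attempt} failed with ${code}; retrying in ${delay}ms`)
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }
}
```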

**Package Updates:**
- Add p-queue for work queue management
- Track response payload sizes (compressed and uncompressed) via socket bytesRead
- Calculate compression ratio and savings for API responses
- Standardize log naming to match Fastify: payload (compressed), payloadUncompressed (uncompressed)
- Add response interceptor to capture actual bytes transferred over network
- Update receipts and originalTxs fetch logs with consistent format
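The compression metrics above boil down to simple arithmetic over the two sizes; this sketch is illustrative (the `compressionStats` helper is an assumption), with `payload` being the compressed bytes read from the socket and `payloadUncompressed` the decoded body length, matching the Fastify-style log names.

```typescript
// Hypothetical sketch of the compression ratio/savings calculation above.
function compressionStats(payload: number, payloadUncompressed: number): { ratio: number; savingsPct: number } {
  // ratio: how many times larger the uncompressed body is than the wire bytes
  const ratio = payload > 0 ? payloadUncompressed / payload : 1
  // savingsPct: fraction of bytes not sent over the network, as a percentage
  const savingsPct = payloadUncompressed > 0
    ? ((payloadUncompressed - payload) / payloadUncompressed) * 100
    : 0
  return { ratio, savingsPct }
}
```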
…ync architecture

- Rename ParallelCycleSync class to ParallelDataSync for better clarity
- Remove batchSize from ParallelSyncConfig, use config limits instead
- Replace composite cursor approach with simpler timestamp + txId pagination
- Simplify sync flow by removing separate cycle metadata fetching step
- Update method names: syncCycleRange → startSyncing for better semantics
- Remove legacy downloadTxsDataAndCyclesParallel function from DataSync
- Streamline API endpoints to use /cycle instead of /multi-cycle-cursor
- Add SyncTxDataByCycleRange interface for cleaner parameter passing
Implements DataSyncManager to handle missing data identification and recovery
for parallel sync operations with comprehensive gap detection and verification.

Key features:
- Automatic gap detection across cycle ranges
- Data anomaly detection (validates last 15 cycles before sync)
- Lookback verification window (cyclesPerBatch × parallelSyncConcurrency)
- Recovery orchestration using ParallelDataSync for all scenarios
- Fail-fast validation before websocket connection
- Fresh start vs resume from interruption routing

Handles complex scenarios:
- Multiple interruption points during parallel sync
- Incremental data (received through websocket) gaps during process restarts
- Data integrity verification and mismatch detection
- Unified recovery strategy using cycle-batch parallel sync
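The gap detection described above can be sketched as a scan over the sorted cycle counters already in the database (the `CycleGap` interface name comes from the PR; the `findCycleGaps` function and its signature are assumptions for illustration):

```typescript
interface CycleGap {
  start: number
  end: number
}

// Hypothetical sketch: given the sorted cycle counters present in the DB,
// report every missing inclusive [start, end] range inside [rangeStart, rangeEnd].
function findCycleGaps(present: number[], rangeStart: number, rangeEnd: number): CycleGap[] {
  const gaps: CycleGap[] = []
  let expected = rangeStart
  for (const cycle of present) {
    if (cycle > expected) gaps.push({ start: expected, end: cycle - 1 })
    expected = cycle + 1
  }
  // Trailing gap after the last stored cycle, if any.
  if (expected <= rangeEnd) gaps.push({ start: expected, end: rangeEnd })
  return gaps
}
```

In the PR this is done more efficiently in SQL (see the later commit moving `CycleGap` into storage/cycle.ts), but the contract is the same: each gap becomes a cycle range handed to ParallelDataSync for recovery.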
…finding

- Move CycleGap interface to storage/cycle.ts and implement efficient SQL-based gap detection
- Enhance anomaly detection with better error handling and validation logic
- Add sync summary functionality to DataSyncManager with database statistics
- Remove ParallelSyncCheckpoint dependency and simplify ParallelDataSync
- Improve logging and error messages throughout sync process
…tch processing

- Replace separate fetch methods with unified fetchDataFromDistributor method
- Refactor startSyncing to accept pre-created cycle batches instead of range parameters
- Add createCycleBatches method for better batch management separation
- Optimize account/transaction processing by removing individual existence checks
- Add batch querying for account timestamps to reduce database calls
- Improve SQLite performance with increased cache size and memory-mapped I/O
- Add network timing and compression metrics logging for better observability
- Update DataSyncManager to use new batch-based sync approach
- Increase default cyclesPerBatch from 10 to 100 for better throughput
- Separate totalCyclesToSync from totalCycles in stats
- Calculate throughput based on all record types (cycles + receipts + originalTxs)
- Track actual cycle records inserted, not just cycle ranges processed
- Update progress reporting to show data cycles vs total records
- Change throughput label from "receipts/sec" to "records/sec"
- Implement parallel batch fetching with configurable concurrency (up to 100)
- Fix batch boundaries to prevent overlapping cycles and double-counting
- Add detailed mismatch reporting with formatted table output
- Improve type safety with TallyItem and MismatchResult interfaces
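The batch-boundary fix above is easiest to see in a sketch of batch creation (the `createCycleBatches` name appears in the commit; this particular implementation is an assumed illustration): boundaries are inclusive and consecutive batches never share a cycle, so no cycle is fetched or counted twice.

```typescript
interface CycleBatch {
  startCycle: number
  endCycle: number
}

// Hypothetical sketch of non-overlapping, inclusive cycle batches.
function createCycleBatches(startCycle: number, endCycle: number, cyclesPerBatch: number): CycleBatch[] {
  const batches: CycleBatch[] = []
  for (let start = startCycle; start <= endCycle; start += cyclesPerBatch) {
    batches.push({ startCycle: start, endCycle: Math.min(start + cyclesPerBatch - 1, endCycle) })
  }
  return batches
}
```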

feat: add distributor_tally_verifier script

Create comprehensive verification tool to compare distributor endpoints:
- Tally mode: Compare tally endpoint vs cycle-based pagination
- Full mode: Compare full data endpoint vs cycle-based pagination
- Track transaction IDs to debug count discrepancies
- Support page-based pagination for full data endpoint
- Display detailed mismatch analysis with ID-level comparison
…allel sync

- Track deserialization time for API responses and log when > 50ms
- Add explicit deserialization timing for receipts and originalTxs processing
- Improve parallel sync error handling with Promise.allSettled for better failure reporting
- Fix cycle count calculation (add +1 for inclusive range)
- Rename syncCyclesByCycleRange to syncCycleRecordsByCycleRange for clarity
- Refactor database timing functions and move to bottom of file
- Export deserializeDbReceipt and deserializeDbOriginalTxData functions
- Optimize deserialization calls using forEach instead of for loops
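The >50 ms deserialization timing above might be implemented with a small wrapper like this (the `timeDeserialize` helper is an assumption, not the PR's code): it times the callback and logs a warning only when the threshold is exceeded, so the hot path stays quiet.

```typescript
// Hypothetical sketch of the deserialization timing check described above.
function timeDeserialize<T>(label: string, fn: () => T, warnAtMs = 50): T {
  const start = process.hrtime.bigint()
  const result = fn()
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6
  if (elapsedMs > warnAtMs) {
    console.warn(`${label} deserialization took ${elapsedMs.toFixed(1)}ms`)
  }
  return result
}
```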
…rallel sync

- Add accumulation buffers (1000 record threshold) to batch DB writes and reduce contention
- Implement serialized write queue with transaction support to prevent concurrent write conflicts
- Optimize receipt processing by pre-fetching existing IDs to avoid N+1 query problem
- Increase retry attempts to 5 with exponential backoff for better collector recovery
- Add mutex locks to prevent race conditions during buffer flushes
- Configure WAL checkpoint frequency and database pragmas for high-throughput operations
- Make manual WAL checkpointing (every 10 buffer flushes) configurable to prevent unbounded WAL growth
- Reduce concurrency from 10 to 5 workers to balance throughput with DB pressure
- Add chunked receipt deserialization (20 per chunk) to prevent event loop blocking
- Make the write-queue infrastructure for serialized database operations configurable
- Add processData flag for debugging and performance testing
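The accumulation buffer above can be sketched as a small class (a hypothetical illustration under the stated 1000-record threshold; `WriteBuffer` is not the PR's actual class name): records collect in memory and are flushed as one batch once the threshold is hit, turning many small transactions into a few large ones and reducing SQLite lock contention.

```typescript
// Hypothetical sketch of the threshold-based write buffer described above.
class WriteBuffer<T> {
  private buffer: T[] = []

  constructor(
    private readonly flushFn: (records: T[]) => Promise<void>, // e.g. one DB transaction
    private readonly threshold = 1000
  ) {}

  async add(record: T): Promise<void> {
    this.buffer.push(record)
    if (this.buffer.length >= this.threshold) await this.flush()
  }

  // Swap the buffer out before writing so new records can keep accumulating.
  async flush(): Promise<void> {
    if (this.buffer.length === 0) return
    const batch = this.buffer
    this.buffer = []
    await this.flushFn(batch)
  }
}
```

In the real implementation the `flushFn` side would go through the serialized write queue and mutex described above, so concurrent workers never interleave partial batches.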
@jairajdev jairajdev merged commit 2478408 into dev Nov 17, 2025
