
@tmosleyIII (Contributor)

This pull request introduces several new configuration files and code changes that enhance the flexibility, reliability, and usability of the pipeline workflow for Stellar/Soroban RPC event processing. The main improvements include support for batch and streaming modes with checkpointing, new pipelines for contract events and USDC transfers, real-time WebSocket streaming, and a test client for WebSocket integration. Additionally, the codebase is updated to support these features, including improved RPC handling and checkpoint management.

Pipeline configuration enhancements:

  • Added batch mode and streaming mode example configurations (rpc-batch-mode.yaml, rpc-streaming-mode.yaml) supporting checkpointing for crash recovery and historical backfills.
  • Introduced new pipelines for contract events (contract_events_rpc.yaml), USDC transfer events (usdc_transfers_rpc.yaml), and real-time WebSocket streaming (websocket_streaming.yaml).

WebSocket integration:

  • Added a Python test client (test_websocket_client.py) for verifying WebSocket server functionality and event streaming.

Codebase improvements for RPC adapter:

  • Implemented checkpointing support in the source adapter for batch and streaming modes, including a new Checkpoint struct for crash recovery (a sketch of the struct follows this list).
  • Updated the GetLedgers RPC method to use new parameters and response structures, improving compatibility and clarity.
  • Added logic to select between event and ledger processing loops based on the configured RPC method.
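
For orientation, here is a minimal sketch of what such a Checkpoint struct might look like. The two counters mirror the fields referenced in the reviews below; the JSON tags and the timestamp field are assumptions, not the PR's exact layout:

```go
package checkpoint

import "time"

// Checkpoint captures resume state for crash recovery.
// Sketch only; the actual struct in source_adapter_rpc.go may differ.
type Checkpoint struct {
    LastProcessedLedger uint64    `json:"last_processed_ledger"` // last ledger handled (see reviews on exact semantics)
    TotalProcessed      uint64    `json:"total_processed"`       // running count of processed ledgers
    SavedAt             time.Time `json:"saved_at"`              // assumed bookkeeping field
}
```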

General improvements:

  • Improved request/response handling in the RPC adapter for better error management and logging.
  • Updated import statements to support new features such as checkpointing and XDR decoding.

@tmosleyIII requested a review from Copilot November 11, 2025 12:47
Copilot finished reviewing on behalf of tmosleyIII November 11, 2025 12:50
@chatgpt-codex-connector (bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines +685 to +688
if checkpoint != nil && checkpoint.LastProcessedLedger > 0 {
    log.Printf("Resuming from checkpoint: ledger %d", checkpoint.LastProcessedLedger)
    currentLedger = checkpoint.LastProcessedLedger + 1
    totalProcessed = checkpoint.TotalProcessed


P1: Avoid skipping ledger when resuming from checkpoint

After each batch the loop sets currentLedger = batchEnd and later calls saveCheckpoint(currentLedger, …) so the checkpoint stores the next ledger to fetch. On restart this block resumes with currentLedger = checkpoint.LastProcessedLedger + 1, so the first ledger of the next batch is skipped entirely. A crash or manual restart will therefore lose one ledger of data. Persist the last processed ledger (e.g. batchEnd-1) or resume without adding one.
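
One way to keep the save and resume sides consistent, sketched under the assumption that the checkpoint is defined to store the next ledger to fetch (reusing the Checkpoint shape sketched in the PR description above):

```go
// saveCheckpoint(currentLedger, ...) persists currentLedger = batchEnd after a
// batch, i.e. the next ledger to fetch, so resume must not add one.
func resumeLedger(checkpoint *Checkpoint, defaultStart uint64) uint64 {
    if checkpoint != nil && checkpoint.LastProcessedLedger > 0 {
        return checkpoint.LastProcessedLedger // no +1: value is already the next ledger
    }
    return defaultStart
}
```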



claude bot commented Nov 11, 2025

Code Review: PR #26 - Update RPC Source Adapter

Executive Summary

This PR introduces significant enhancements to the RPC source adapter including checkpointing support, dual-mode operation (streaming/batch), and comprehensive documentation. The implementation shows good architectural design but contains several critical bugs that must be fixed before merging.

Recommendation: 🔴 Request Changes - Critical data integrity issues identified


🎯 What This PR Does Well

Excellent Documentation: The README files (README_RPC_EVENTS.md, README_WEBSOCKET.md) are comprehensive with clear examples
Dual-Mode Architecture: Clean separation between streaming and batch processing modes
Atomic Checkpointing: Uses temp file + rename pattern for crash-safe checkpoint writes (sketched after this list)
Graceful Shutdown: Proper context handling for clean termination
Configuration Examples: Multiple YAML examples covering common use cases
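
The temp file + rename pattern noted above, as a self-contained sketch (the ledgers.json file name matches the review discussion; permissions are an assumption):

```go
package checkpoint

import (
    "encoding/json"
    "os"
    "path/filepath"
)

// Save marshals v, writes it to a temp file, then renames it into place.
// Rename is atomic on POSIX filesystems, so readers never see a partial file.
func Save(dir string, v interface{}) error {
    data, err := json.Marshal(v)
    if err != nil {
        return err
    }
    final := filepath.Join(dir, "ledgers.json")
    tmp := final + ".tmp"
    if err := os.WriteFile(tmp, data, 0600); err != nil {
        return err
    }
    return os.Rename(tmp, final)
}
```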


🚨 Critical Issues (MUST FIX)

1. Data Loss Risk: Incorrect Ledger Advancement Logic

Location: source_adapter_rpc.go:377-379, source_adapter_rpc.go:811

```go
// In GetLedgers()
limit := to - from
if limit <= 0 {
    limit = 1
}

// Later in loop
currentLedger = batchEnd // ❌ Assumes all ledgers returned
```

Problem: If `GetLedgers(100, 105)` requests 5 ledgers but RPC returns only 3, the code still advances `currentLedger` to 105, skipping ledgers 103-104 without processing them.

Impact: Silent data loss - ledgers can be permanently skipped
Fix Required: Use actual returned ledger count to advance position:
```go
if len(ledgers) > 0 {
    lastLedgerSeq := getLastLedgerSequence(ledgers)
    currentLedger = lastLedgerSeq + 1
} else {
    currentLedger = batchEnd // Only if truly no ledgers exist
}
```

2. Silent Ledger Skipping Without Tracking

Location: source_adapter_rpc.go:778-801

```go
ledgerMap, ok := ledger.(map[string]interface{})
if !ok {
    log.Printf("WARNING: Ledger is not a map, skipping: %T", ledger)
    continue // ❌ Silently skips, no tracking
}
```

Problem: Malformed ledgers from RPC are silently discarded with only a log message. No tracking of which ledgers were skipped.

Impact: Data integrity violations without visibility
Fix Required: Implement skip tracking and expose metrics:
```go
var skippedLedgers []uint64
// ...inside the loop, when a ledger cannot be parsed or processed:
skippedLedgers = append(skippedLedgers, ledgerSeq)
// ...after the batch:
if len(skippedLedgers) > 0 {
    log.Printf("Skipped ledgers: %v", skippedLedgers)
}
```

3. Checkpoint Resume Off-By-One Risk

Location: source_adapter_rpc.go:684-689

```go
if checkpoint != nil && checkpoint.LastProcessedLedger > 0 {
    currentLedger = checkpoint.LastProcessedLedger + 1
    totalProcessed = checkpoint.TotalProcessed
}
```

Problem: Semantics unclear - is `LastProcessedLedger` the last fully processed ledger or the last one attempted? If checkpoint is saved mid-batch during a crash, this could lead to duplicate processing or skipped ledgers.

Impact: Data integrity violations on recovery
Fix Required: Clarify checkpoint semantics and document:

  • Either save "next ledger to process" (no +1 needed)
  • Or guarantee checkpoint only saved after full batch success

4. Unsafe Type Conversions Without Validation

Location: source_adapter_rpc.go:219-223

```go
if slFloat, ok := config["start_ledger"].(float64); ok {
    startLedger = uint64(slFloat) // ❌ No validation
}
```

Problem: No validation that `slFloat` is positive or within uint64 bounds. A negative value like `-1000` would convert to `18446744073709550616`.

Impact: Invalid ledger numbers causing RPC failures
Fix Required: Add validation:
```go
if slFloat, ok := config["start_ledger"].(float64); ok {
    if slFloat < 0 || slFloat > float64(math.MaxUint64) {
        return fmt.Errorf("invalid start_ledger: %f", slFloat)
    }
    startLedger = uint64(slFloat)
}
```


⚠️ High Priority Issues (SHOULD FIX)

5. Context Handling Bug

Location: source_adapter_rpc.go:878

```go
latest, err := s.GetLatestLedger(context.Background()) // ❌ Ignores parent context
```

Problem: Uses `context.Background()` instead of passed `ctx`, ignoring timeouts and cancellation signals.

Impact: Resource leaks, hanging requests
Fix: `latest, err := s.GetLatestLedger(ctx)`

6. Processor Errors Silently Ignored

Location: source_adapter_rpc.go:796-801

```go
if err := proc.Process(ctx, msg); err != nil {
log.Printf("Processor %T failed: %v", proc, err)
// ❌ Continues processing next ledgers
}
```

Problem: Critical processor failures don't stop the pipeline, potentially corrupting downstream data.

Impact: Data corruption with only log indication
Recommendation: Add configurable fail-fast mode for critical processors
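
A sketch of what a configurable fail-fast mode could look like; the failFast flag and the minimal Processor interface here are illustrative, not the project's actual types:

```go
package pipeline

import (
    "context"
    "fmt"
    "log"
)

type Processor interface {
    Process(ctx context.Context, msg interface{}) error
}

// runProcessors propagates the first error when failFast is set;
// otherwise it logs and continues, matching the current behavior.
func runProcessors(ctx context.Context, procs []Processor, msg interface{}, failFast bool) error {
    for _, proc := range procs {
        if err := proc.Process(ctx, msg); err != nil {
            if failFast {
                return fmt.Errorf("processor %T failed: %w", proc, err)
            }
            log.Printf("Processor %T failed (continuing): %v", proc, err)
        }
    }
    return nil
}
```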

7. Checkpoint Directory Fallback Issues

Location: source_adapter_rpc.go:583-586

```go
if err := os.MkdirAll(dir, 0755); err != nil {
log.Printf("WARNING: Failed to create checkpoint directory: %v, using /tmp", err)
dir = "/tmp" // ❌ No validation
}
```

Problems:

  • Falls back to `/tmp` without checking if writable
  • Multiple pipelines would write to same `/tmp/ledgers.json`, causing collisions
  • No guarantee `/tmp` exists or has space

Fix: Validate alternative path or fail explicitly


📊 Code Quality Observations

Strengths:

  • Well-documented: READMEs are extremely helpful with practical examples
  • Modular design: Clean separation of concerns
  • Config flexibility: Supports multiple RPC providers and authentication methods
  • WebSocket integration: Good addition for real-time streaming use cases

Weaknesses:

  • No test coverage: source_adapter_rpc.go has zero test files
  • No retry logic: Single RPC failure stops entire pipeline (no exponential backoff)
  • Weak error handling: Many errors logged but not acted upon
  • No metrics/observability: Missing counters for processed/failed/skipped ledgers
  • Type confusion: Two different `GetEventsResponse` structs in codebase

🔒 Security Review

✅ Good Practices:

  • Template configs use placeholders like `YOUR_API_KEY` instead of real credentials
  • Auth headers properly documented for different providers
  • No hardcoded secrets in code

⚠️ Minor Concerns:

  • Checkpoint files created with `0755` permissions (readable by all users)
  • Consider using `0750` or `0700` for checkpoint directory
  • WebSocket server has no TLS configuration examples (should document wss:// setup)

🧪 Test Coverage Assessment

Current State: ZERO tests for the modified RPC adapter

Missing Tests:

  • ✗ Unit tests for checkpoint save/load/resume logic
  • ✗ Integration tests with mock RPC server
  • ✗ Error scenario tests (network failures, malformed responses)
  • ✗ Concurrent access tests for checkpoint files
  • ✗ Ledger skip/duplicate detection tests

Recommendation: Add at minimum:

  1. Checkpoint logic unit tests
  2. Ledger advancement logic tests with various RPC response scenarios
  3. Configuration validation tests

🎯 Performance Considerations

Potential Issues:

  1. Memory usage: Large `batch_size` values (e.g., 10000) could consume significant RAM for ledger buffers
  2. No rate limiting: Could overwhelm public RPC endpoints
  3. Single-threaded: No concurrent ledger processing (may be intentional for ordering)

Recommendations:

  • Document recommended `batch_size` ranges per RPC provider
  • Consider implementing request rate limiting (see the sketch after this list)
  • Add memory usage metrics
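
One way to add client-side rate limiting, sketched with golang.org/x/time/rate (the wrapper is illustrative, not part of the PR):

```go
package rpclimit

import (
    "context"

    "golang.org/x/time/rate"
)

// CallWithLimit blocks until the limiter grants a token (or ctx is cancelled),
// then runs the RPC call. rate.NewLimiter(rate.Limit(10), 1) would cap traffic
// at roughly 10 requests per second.
func CallWithLimit(ctx context.Context, lim *rate.Limiter, call func(context.Context) error) error {
    if err := lim.Wait(ctx); err != nil {
        return err // cancelled or deadline exceeded while waiting for a token
    }
    return call(ctx)
}
```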

📝 Documentation Review

README_RPC_EVENTS.md ⭐️

Excellent quality - comprehensive examples, troubleshooting section, clear use cases

README_WEBSOCKET.md ⭐️

Outstanding - includes client code examples in multiple languages (Python, JS, Go)

Config Examples ✅

Well-commented YAML files with clear inline documentation

Suggestion: Add a migration guide for users currently using the old RPC adapter


✅ Pre-Merge Checklist

  • CRITICAL: Fix ledger advancement logic to use actual returned count
  • CRITICAL: Implement ledger skip tracking and reporting
  • CRITICAL: Clarify and document checkpoint semantics
  • HIGH: Add type conversion validation
  • HIGH: Fix context.Background() usage
  • MEDIUM: Add basic test coverage for checkpoint logic
  • MEDIUM: Implement retry logic with exponential backoff
  • LOW: Add metrics for monitoring (processed/failed/skipped counts)

🏁 Conclusion

This is a significant and valuable feature addition with excellent documentation. However, the critical data integrity bugs make it unsuitable for production use without fixes. The ledger advancement logic in particular poses a real risk of silent data loss.

Recommended Actions:

  1. Fix the 4 critical issues before merge
  2. Add test coverage for checkpoint and ledger processing logic
  3. Consider implementing retry logic for production resilience

Once these issues are addressed, this will be a solid enhancement to the pipeline. Great work on the documentation and architecture! 👍


Reviewed by: Claude Code
Review Date: 2025-11-11
Files Changed: 10 files (+1958, -8)

Copilot AI left a comment


Pull Request Overview

This pull request significantly enhances the RPC source adapter by introducing checkpointing for crash recovery, supporting both batch and streaming modes for ledger processing, and adding comprehensive pipeline configurations and documentation for WebSocket streaming and contract event processing.

Key Changes

  • Implemented checkpoint-based crash recovery system with save/load functionality and configurable intervals
  • Added batch mode (with defined start/end ledgers) and streaming mode (continuous processing) with automatic mode detection (a sketch of a plausible detection rule follows this list)
  • Introduced new pipeline configurations for various use cases including USDC transfers, contract events, and WebSocket streaming
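
A plausible shape for the automatic mode detection, assuming streaming is selected when no end ledger is configured (the PR's exact rule may differ):

```go
// isStreamingMode: batch mode requires an explicit end_ledger;
// a zero value means no end was configured, so run continuously.
func isStreamingMode(endLedger uint64) bool {
    return endLedger == 0
}
```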

Reviewed Changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 11 comments.

Summary per file:

  • source_adapter_rpc.go - Core implementation of checkpointing, batch/streaming modes, and getLedgers loop with XDR decoding
  • config/examples/rpc-batch-mode.yaml - Batch mode example configuration with historical backfill settings and checkpoint configuration
  • config/examples/rpc-streaming-mode.yaml - Streaming mode example with continuous processing and frequent checkpoint saves
  • config/base/websocket_streaming.yaml - Real-time WebSocket streaming pipeline configuration for contract events
  • config/base/usdc_transfers_rpc.yaml - USDC transfer monitoring pipeline with mainnet configuration
  • config/base/contract_events_rpc.yaml - Generic contract events pipeline with multiple filter examples
  • config/base/test_websocket_client.py - Python test client for WebSocket server integration testing
  • config/base/README_WEBSOCKET.md - Comprehensive WebSocket consumer documentation with client examples
  • config/base/README_RPC_EVENTS.md - Complete guide for RPC event filtering and pipeline configuration
  • .gitignore - Added new documentation file to ignore list


"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
Copilot AI Nov 11, 2025

The ioutil package is deprecated since Go 1.16. Use os.ReadFile and os.WriteFile instead.

Replace:

  • ioutil.ReadFile(path) with os.ReadFile(path)
  • ioutil.WriteFile(tmpPath, data, 0644) with os.WriteFile(tmpPath, data, 0644)

Then remove the io/ioutil import.

Suggested change:
- "io/ioutil"


for {
    // In batch mode, check if we've reached the end
    if !streamingMode && currentLedger >= endLedger {
Copilot AI Nov 11, 2025

The condition currentLedger >= endLedger is incorrect for batch mode. If endLedger is meant to be exclusive (which is typical), this condition is correct. However, if endLedger is meant to be inclusive (the last ledger to process), this should be currentLedger > endLedger.

Additionally, when currentLedger == endLedger, the loop would exit without processing endLedger, which may not be the intended behavior. Please clarify the semantics of endLedger and adjust the condition accordingly.

Suggested change:
- if !streamingMode && currentLedger >= endLedger {
+ if !streamingMode && currentLedger > endLedger {

Comment on lines +818 to +819
} else if totalProcessed%checkpointInterval == 0 {
    // In batch mode, save based on interval
Copilot AI Nov 11, 2025

The checkpoint save logic in batch mode has a potential issue. The condition totalProcessed%checkpointInterval == 0 will trigger when totalProcessed is an exact multiple of checkpointInterval, but this may not align with ledger boundaries as intended.

For example, if checkpointInterval=1000 and you process batches of 100 ledgers, the checkpoint would save at totalProcessed = 1000, 2000, etc. However, this means the checkpoint is saved after processing exactly 1000 ledgers total, regardless of when that occurs within a batch.

Consider whether the checkpoint should be based on ledger numbers instead (e.g., save every N ledgers by checking currentLedger % checkpointInterval == 0) for more predictable behavior.

Suggested change:
- } else if totalProcessed%checkpointInterval == 0 {
-     // In batch mode, save based on interval
+ } else if checkpointInterval > 0 && currentLedger%checkpointInterval == 0 {
+     // In batch mode, save checkpoint at ledger boundaries

Comment on lines +837 to +838
// Rate limiting
if s.config.PollInterval > 0 {
Copilot AI Nov 11, 2025

The rate limiting sleep is applied unconditionally on every iteration, even in batch mode when processing historical data. This can significantly slow down backfills unnecessarily.

Consider only applying rate limiting in streaming mode or when explicitly configured:

// Rate limiting - only in streaming mode or when poll_interval > 0
if streamingMode || s.config.PollInterval > 0 {
    time.Sleep(s.config.PollInterval)
}

This allows batch mode to run at full speed (when poll_interval: 0 is set) while still supporting rate limiting when needed.

Suggested change:
- // Rate limiting
- if s.config.PollInterval > 0 {
+ // Rate limiting - only in streaming mode or when poll_interval > 0
+ if s.isStreamingMode() || s.config.PollInterval > 0 {

Comment on lines 376 to 380
// Calculate limit as the number of ledgers in the range
limit := to - from
if limit <= 0 {
    limit = 1
}
Copilot AI Nov 11, 2025

The limit calculation is incorrect. The current logic calculates to - from, but this represents the number of ledgers minus 1 (since the range is inclusive on both ends). For example, if from=100 and to=102, you have 3 ledgers (100, 101, 102), but the calculation yields 102-100=2.

Additionally, the comparison limit <= 0 won't work correctly when to < from, which should be validated earlier.

Suggested fix:

// Validate range
if to < from {
    return nil, fmt.Errorf("invalid range: to (%d) must be >= from (%d)", to, from)
}

// Calculate limit as number of ledgers (inclusive range)
limit := to - from + 1
Suggested change:
- // Calculate limit as the number of ledgers in the range
- limit := to - from
- if limit <= 0 {
-     limit = 1
- }
+ // Validate range
+ if to < from {
+     return nil, fmt.Errorf("invalid range: to (%d) must be >= from (%d)", to, from)
+ }
+ // Calculate limit as number of ledgers (inclusive range)
+ limit := to - from + 1

} else {
    // In batch mode, empty response is unexpected
    log.Printf("WARNING: No ledgers returned for range %d-%d", currentLedger, batchEnd)
    break
Copilot AI Nov 11, 2025

In streaming mode, when no ledgers are returned and you break out of the inner if-else (line 771), the code continues to line 774 which tries to access ledgersResp.Result.Ledgers but we just confirmed it's empty (line 745). This will cause the loop to continue with ledgers := [] and skip the processing block, but then attempt to update counters with len(ledgers) = 0 (line 805).

While this doesn't cause an error, the logic flow is confusing. After the continue statement on line 759 or the context cancellation return on line 765, the code after line 772 should never execute. However, the break on line 770 exits only the inner if statement, not the loop, so execution continues to line 774 with an empty result.

This should be restructured to use continue instead of break on line 770 to skip to the next iteration of the main loop.

Suggested change:
- break
+ continue

batch_size: 10 # Fetch 10 ledgers per batch

# Polling configuration
poll_interval: 5 # Poll every 5 seconds when caught up (minimum 1s)
Copilot AI Nov 11, 2025

The comment states "minimum 1s" for poll_interval, but this constraint is only mentioned in the code (line 709-711 in source_adapter_rpc.go) and enforced with a warning. Users might set a value less than 1 second expecting it to work as configured. Consider adding a more prominent note about this requirement:

poll_interval: 5  # Poll every 5 seconds when caught up (REQUIRED: minimum 1s in streaming mode)
Suggested change:
- poll_interval: 5 # Poll every 5 seconds when caught up (minimum 1s)
+ poll_interval: 5 # Poll every 5 seconds when caught up (REQUIRED: minimum 1s in streaming mode)

- "CCW67TSZV3SSS2HXMBQ5JFGCKJNXKZM7UQUWUZPUTHXSTZLEO7SJMI75"
topics:
# Transfer event signature: transfer(from: Address, to: Address, amount: i128)
# Base64 encoded "transfer"
Copilot AI Nov 11, 2025

The comment describes the topic as "Transfer event signature: transfer(from: Address, to: Address, amount: i128)" and states "Base64 encoded 'transfer'", but the actual value shown is the full XDR encoding: AAAADwAAAAh0cmFuc2Zlcg==.

The simpler base64 encoding of just "transfer" would be dHJhbnNmZXI= (as correctly shown in README_RPC_EVENTS.md line 87-88). The XDR encoding includes additional bytes for type information.

Update the comment to clarify this is the XDR-encoded topic, not just base64:

# Transfer event signature: transfer(from: Address, to: Address, amount: i128)
# XDR-encoded topic (not just base64 "transfer")
- ["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]
Suggested change:
- # Base64 encoded "transfer"
+ # XDR-encoded topic (not just base64 "transfer")

)

# Run forever (blocking call)
ws.run_forever(ping_interval=30, ping_timeout=10)
Copilot AI Nov 11, 2025

The ping_interval and ping_timeout parameters are passed to run_forever(), but the WebSocket library's ping/pong mechanism needs careful handling. If the server doesn't respond to pings within the timeout period, the connection will be closed.

Consider adding error handling and reconnection logic, especially since this is meant to be a test client. Additionally, document these parameters in the file docstring so users understand their purpose:

# Run forever with keepalive settings
# ping_interval: Send ping every 30 seconds to keep connection alive
# ping_timeout: Wait up to 10 seconds for pong response
ws.run_forever(ping_interval=30, ping_timeout=10)

import websocket
import json
import sys
import time
Copilot AI Nov 11, 2025

Import of 'time' is not used.

Suggested change:
- import time


claude bot commented Nov 11, 2025

Pull Request Review: Update RPC Source Adapter

This PR introduces significant improvements for Stellar/Soroban RPC event processing with batch/streaming modes and checkpointing.


✅ Strengths

  1. Excellent Documentation - README_RPC_EVENTS.md (446 lines) and README_WEBSOCKET.md (668 lines) with clear examples
  2. Robust Checkpointing - Atomic saves, crash recovery, handles corrupted checkpoints
  3. Dual Mode Operation - Batch and Streaming modes with intelligent polling
  4. Comprehensive RPC Support - 11 RPC methods supported

⚠️ Critical Issues

1. Missing Stellar SDK Helper Methods (HIGH PRIORITY)

Per CLAUDE.md lines 11-24, code MUST use Stellar Go SDK helper methods. This PR manually decodes XDR at source_adapter_rpc.go:655-676.

Recommendation: Review Stellar SDK for helper methods that handle V1/V2/V3/V4 protocol differences automatically.

2. Error Handling Issues

a) Context cancellation checked only at end of loop (line 871) - long RPC calls cannot be interrupted
b) No retry logic - single RPC failure terminates pipeline (lines 744-748)

Recommendation: Implement exponential backoff retry for transient failures.
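
A self-contained sketch of such a retry helper; the names and backoff constants are illustrative:

```go
package rpcretry

import (
    "context"
    "time"
)

// WithBackoff retries op with exponential backoff (1s, 2s, 4s, ...),
// stops early if ctx is cancelled, and returns the last error on exhaustion.
func WithBackoff(ctx context.Context, maxAttempts int, op func() error) error {
    var err error
    delay := time.Second
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = op(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2 // consider capping the delay and adding jitter in production
        }
    }
    return err
}
```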

3. Configuration Validation

Issue: auth_header is required but should be optional (lines 196-198) for public RPC endpoints.

Recommendation: Make auth_header optional, default to empty string.

4. Type Safety

Issue: Extensive use of interface{} and unsafe type assertions (lines 790-794)

Recommendation: Define typed structs for RPC responses.

5. Test Coverage

Issue: No test files for checkpointing, batch/streaming modes, or error recovery.

Recommendation: Add unit tests and integration tests with mock RPC server.


🔧 Code Quality Improvements

  1. Magic Numbers - HTTP timeout hardcoded to 30s (line 233) - make configurable
  2. Code Duplication - Type conversion repeated at lines 218-222, 528-530, 545-550, 798-805
  3. Logging - Inconsistent formats, use structured logging

🔒 Security

  1. ✅ API keys use placeholders
  2. ⚠️ Checkpoint dir permissions 0755 (world-readable) - use 0700
  3. ⚠️ No rate limiting - poll_interval=0 allows rapid-fire requests

📊 Performance

  1. Memory - All ledgers loaded into memory simultaneously
  2. I/O - Checkpoint saved after EVERY batch in streaming mode

Recommendation: Add minimum time interval between checkpoints (e.g., 30s).
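
A sketch of time-gating checkpoint writes; the 30s threshold and names are illustrative:

```go
package checkpoint

import "time"

// Throttle suppresses saves that arrive within MinInterval of the last one.
type Throttle struct {
    lastSave    time.Time
    MinInterval time.Duration // e.g. 30 * time.Second
}

// ShouldSave reports whether enough time has passed to persist another checkpoint.
func (t *Throttle) ShouldSave(now time.Time) bool {
    if now.Sub(t.lastSave) < t.MinInterval {
        return false
    }
    t.lastSave = now
    return true
}
```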


🎯 Recommendations Summary

High Priority:

  1. Use Stellar SDK helper methods per CLAUDE.md
  2. Add retry logic with exponential backoff
  3. Make auth_header optional
  4. Add unit tests

Medium Priority:

  5. Proper context cancellation in RPC calls
  6. Define typed structs instead of interface{}
  7. Add input validation for ledger ranges
  8. Extract type conversion helper

Low Priority:

  9. Structured logging
  10. Configurable RPC timeout
  11. Rate limiting
  12. Reduce checkpoint I/O frequency


✨ Overall Assessment

Pros: Excellent docs, robust checkpointing, clean dual-mode design
Cons: Missing SDK helpers (critical), limited error recovery, no tests

Recommendation: Request changes to address critical issues before merging. Foundation is strong but needs production-ready improvements.

Great work on documentation and checkpointing!

@tmosleyIII merged commit eb6272b into main Nov 11, 2025
1 check passed
@tmosleyIII deleted the feature/tmosley/update-rpc-source-adapter branch November 11, 2025 18:49

claude bot commented Nov 11, 2025

Pull Request Review: Update RPC Source Adapter

Overview

This PR introduces significant enhancements to the RPC source adapter, adding support for batch/streaming modes with checkpointing, new pipeline configurations, and comprehensive documentation. Overall, this is a substantial improvement to the pipeline's reliability and flexibility.

🎯 Strengths

1. Excellent Documentation

  • Comprehensive README files for RPC events and WebSocket streaming
  • Clear examples with practical use cases
  • Well-commented configuration files
  • Helpful troubleshooting sections

2. Robust Checkpointing System

  • Crash recovery with atomic file writes (.tmp + rename pattern)
  • Proper state tracking with Checkpoint struct
  • Streaming mode saves after every batch (good for real-time pipelines)
  • Batch mode uses configurable intervals (efficient for backfills)

3. Good Architecture

  • Clear separation between batch and streaming modes
  • Generic RPC call support via CallRPC()
  • Proper XDR decoding with helpful comments explaining SDK usage
  • Well-structured configuration parsing

4. Production-Ready Features

  • Context cancellation support throughout
  • Proper error handling and logging
  • HTTP timeout configuration (30s)
  • Poll interval validation for streaming mode

🐛 Issues Found

CRITICAL: Potential Data Loss in Checkpoint Logic

Location: source_adapter_rpc.go:846-855

// Advance to next ledger based on what was actually processed
if processedCount > 0 {
    // Resume from the ledger after the last one we successfully processed
    currentLedger = lastProcessedLedger + 1
} else {
    // No ledgers processed - in streaming mode this means caught up, in batch mode skip the range
    if !streamingMode {
        log.Printf("WARNING: No ledgers processed in range %d-%d, advancing past range", currentLedger, batchEnd)
    }
    currentLedger = batchEnd  // ⚠️ PROBLEM: Skips ledgers in batch mode
}

Problem: In batch mode, when processedCount == 0 (e.g., all ledgers failed to decode), the code advances currentLedger = batchEnd, skipping an entire batch of potentially valid ledgers. This could lead to data loss.

Scenario:

  1. Batch fetches ledgers 1000-1100
  2. All 100 ledgers fail to decode (network issue, bad data, etc.)
  3. Code sets currentLedger = 1100, skipping 100 ledgers forever
  4. Next iteration starts at 1100, never retrying 1000-1099

Recommended Fix:

} else {
    if !streamingMode {
        log.Printf("ERROR: No ledgers processed in range %d-%d, retrying...", currentLedger, batchEnd)
        // Don't advance - retry the same range
        // Add exponential backoff or max retries to prevent infinite loops
        time.Sleep(time.Duration(retryCount) * s.config.PollInterval)
    }
    currentLedger = batchEnd  // Only advance in streaming mode
}

MAJOR: Race Condition in Stats Tracking

Location: source_adapter_rpc.go:48-54, 183

type EventStats struct {
    ProcessedEvents  uint64
    SuccessfulEvents uint64
    FailedEvents     uint64
    LastLedger       uint64
    LastEventTime    time.Time  // ⚠️ Not safe for concurrent access
}

type SorobanSourceAdapter struct {
    // ...
    stats      EventStats  // ⚠️ No mutex protection
}

Problem: EventStats is accessed from multiple goroutines (processors) without synchronization. While not currently causing issues (single-threaded processing), this could break if processors become concurrent.

Recommended Fix:

type SorobanSourceAdapter struct {
    config     SorobanConfig
    processors []cdpProcessor.Processor
    client     *http.Client
    lastLedger uint64
    stats      EventStats
    statsMux   sync.RWMutex  // Add mutex for thread-safe stats access
}

MODERATE: Configuration Type Assertion Issues

Location: source_adapter_rpc.go:196-199

authHeader, ok := config["auth_header"].(string)
if !ok {
    return nil, errors.New("auth_header must be specified")
}

Problem: Requires auth_header even when using public RPC endpoints. The error message says "must be specified" but documentation shows it can be empty "".

Recommended Fix:

authHeader := ""
if ah, ok := config["auth_header"].(string); ok {
    authHeader = ah
}

MODERATE: Checkpoint Race Condition on Shutdown

Location: source_adapter_rpc.go:779-784, 884-888

Problem: When context is cancelled, checkpoint is saved immediately. However, if another iteration also tries to save a checkpoint concurrently, there could be a race condition.

Scenario:

  1. Thread 1: Context cancelled at line 884
  2. Thread 2: Checkpoint interval reached at line 867
  3. Both threads call saveCheckpoint() simultaneously
  4. .tmp file might be corrupted or overwritten

Recommended Fix:
Add a mutex to protect checkpoint operations:

type SorobanSourceAdapter struct {
    // ...
    checkpointMux sync.Mutex
}

func (s *SorobanSourceAdapter) saveCheckpoint(ledger, totalProcessed uint64) error {
    s.checkpointMux.Lock()
    defer s.checkpointMux.Unlock()
    // ... existing code
}

MINOR: Hardcoded Values

Location: source_adapter_rpc.go:233, 266

client:     &http.Client{Timeout: 30 * time.Second},  // Hardcoded timeout

Issue: HTTP timeout is hardcoded. For large batches or slow RPC endpoints, 30s might be insufficient.

Recommendation: Make configurable:

timeout := 30 * time.Second
if t, ok := config["rpc_timeout"].(int); ok {
    timeout = time.Duration(t) * time.Second
}
client: &http.Client{Timeout: timeout},

MINOR: Incomplete Error Context

Location: source_adapter_rpc.go:822-824

ledgerCloseMeta, err := decodeLedgerXDR(ledgerMap)
if err != nil {
    log.Printf("ERROR: Failed to decode ledger XDR for sequence %d: %v", ledgerSeq, err)
    continue  // ⚠️ Silently skips ledger
}

Issue: When XDR decoding fails, the ledger is skipped without incrementing any error counter or saving state. If this happens frequently, it's hard to detect.

Recommendation:

if err != nil {
    log.Printf("ERROR: Failed to decode ledger XDR for sequence %d: %v", ledgerSeq, err)
    s.stats.FailedEvents++  // Track failures
    continue
}

MINOR: Documentation Inconsistency

Location: config/base/README_RPC_EVENTS.md:88

echo -n "transfer" | base64
# Output: dHJhbnNmZXI=
# Full XDR encoding: AAAADwAAAAh0cmFuc2Zlcg==

Issue: The comment says "Full XDR encoding" but doesn't explain how to go from dHJhbnNmZXI= to AAAADwAAAAh0cmFuc2Zlcg==. This could confuse users.

Recommendation: Add a note explaining this is the XDR-encoded SCVal representation, or provide a link to Stellar docs.
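
For such a note, a sketch of deriving the XDR-encoded topic from the bare symbol using the stellar/go xdr package (assuming its generated MarshalBinary helper):

```go
package main

import (
    "encoding/base64"
    "fmt"

    "github.com/stellar/go/xdr"
)

func main() {
    // The topic is an ScVal of type SCV_SYMBOL wrapping "transfer":
    // a 4-byte type tag (15) plus a length-prefixed symbol, not a bare base64 string.
    sym := xdr.ScSymbol("transfer")
    val := xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &sym}

    bin, err := val.MarshalBinary()
    if err != nil {
        panic(err)
    }
    fmt.Println(base64.StdEncoding.EncodeToString(bin)) // AAAADwAAAAh0cmFuc2Zlcg==
}
```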


🔒 Security Considerations

1. API Key Exposure

The example configs have placeholders like Api-Key YOUR_API_KEY. Good practice, but add a note:

# IMPORTANT: Never commit real API keys to version control!
# Use environment variables or secret management instead
auth_header: "${OBSRVR_API_KEY}"  # Example: read from env var

2. File Path Traversal

Location: source_adapter_rpc.go:581-595

dir := "/tmp/checkpoints/soroban-source"
if checkpointDir, ok := s.config.Extra["checkpoint_dir"].(string); ok {
    dir = checkpointDir  // ⚠️ No validation
}

Issue: User-supplied checkpoint_dir is used directly without path validation. A malicious config could write to /etc/passwd or other sensitive locations.

Recommendation:

if checkpointDir, ok := s.config.Extra["checkpoint_dir"].(string); ok {
    // Validate path doesn't escape intended directory
    cleaned := filepath.Clean(checkpointDir)
    if !filepath.IsAbs(cleaned) || strings.Contains(cleaned, "..") {
        log.Printf("WARNING: Invalid checkpoint_dir: %s, using default", checkpointDir)
    } else {
        dir = cleaned
    }
}

🧪 Test Coverage

Current State: No test files (*_test.go) exist for the new functionality.

Recommended Tests:

  1. Unit tests:

    • TestCheckpointSaveLoad() - Verify checkpoint persistence
    • TestDecodeLedgerXDR() - Test XDR decoding with various inputs
    • TestBatchVsStreamingMode() - Verify mode detection logic
    • TestConfigParsing() - Test type assertions and edge cases
  2. Integration tests:

    • Mock RPC server for testing GetLedgers flow
    • Test crash recovery (save checkpoint, restart, verify resume)
    • Test empty batch handling

Example:

func TestCheckpointSaveLoad(t *testing.T) {
    adapter := &SorobanSourceAdapter{
        config: SorobanConfig{Extra: map[string]interface{}{}},
    }
    
    err := adapter.saveCheckpoint(12345, 100)
    require.NoError(t, err)
    
    checkpoint, err := adapter.loadCheckpoint()
    require.NoError(t, err)
    assert.Equal(t, uint64(12345), checkpoint.LastProcessedLedger)
}

📊 Performance Considerations

1. Memory Usage in Large Batches

ledgers := ledgersResp.Result.Ledgers  // Entire batch loaded into memory

With batch_size: 500, this could consume significant memory for ledgers with many transactions. Consider adding a max memory limit or warning.

2. No Connection Pooling

client: &http.Client{Timeout: 30 * time.Second},

The HTTP client doesn't configure MaxIdleConns or MaxConnsPerHost. For high-throughput scenarios, this could lead to connection exhaustion.

Recommendation:

client: &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxConnsPerHost:     10,
        IdleConnTimeout:     90 * time.Second,
    },
}

✅ Follows CLAUDE.md Guidelines

Good Examples:

  1. Line 655-668: Excellent comment explaining why direct XDR access is acceptable here (RPC response decoding, not transaction processing)
  2. Documentation: Well-structured docs match the project's style
  3. Configuration: Follows existing YAML patterns in config/base/

Suggestions:

  • Consider adding migration notes if existing users have custom RPC configurations
  • Add example showing integration with Stellar SDK helpers in downstream processors

🎓 Best Practices Followed

✅ Atomic file writes for checkpoints (.tmp + rename)
✅ Context cancellation support throughout
✅ Comprehensive logging with appropriate levels
✅ Clear separation of concerns (RPC calls, checkpoint logic, processing loop)
✅ Helpful error messages with context
✅ Configuration validation with defaults


📝 Recommendations Summary

Must Fix Before Merge:

  1. ❗ Fix data loss issue in batch mode when processedCount == 0 (line 846-855)
  2. ❗ Make auth_header optional (line 196-199)
  3. ❗ Add path validation for checkpoint_dir (security)

Should Fix:

  1. Add mutex protection for stats and checkpoints (concurrency safety)
  2. Add basic unit tests for checkpoint logic
  3. Make HTTP timeout configurable
  4. Track failed ledger decoding in stats

Nice to Have:

  1. Add environment variable support for API keys in examples
  2. Configure HTTP connection pooling
  3. Add XDR encoding explanation in docs
  4. Add memory usage warnings for large batches

🎉 Conclusion

This is a well-designed, production-quality PR that significantly enhances the pipeline's capabilities. The checkpointing system is solid, documentation is excellent, and the streaming/batch mode distinction is clear.

The critical issues are straightforward to fix, and once addressed, this will be a valuable addition to the codebase.

Recommendation: Approve with changes requested for the critical issues.

Great work! 👏
