Commit

fix: Split sync health job into 10 min chunks to limit memory usage (#2345)

## Why is this change needed?

Hubs might be running into memory pressure because `divergingSyncIds` examines too many messages at once. Chunk the sync health job into 10-minute intervals to reduce memory usage.
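The core idea is to split one large `[startTime, stopTime)` window into fixed-size chunks so each pass only holds one chunk's worth of messages in memory. A minimal sketch of that splitting logic (the `splitIntoChunks` helper is hypothetical, not part of this PR; the actual change inlines the loop):

```typescript
// 10 minutes in milliseconds, matching the interval chosen in this PR.
const TEN_MINUTES_MS = 10 * 60 * 1000;

// Split a [startTime, stopTime) range into fixed-size chunks.
// The final chunk is clamped so it never extends past stopTime.
function splitIntoChunks(
  startTime: number,
  stopTime: number,
  interval: number = TEN_MINUTES_MS,
): Array<[number, number]> {
  const chunks: Array<[number, number]> = [];
  for (let chunkStart = startTime; chunkStart < stopTime; chunkStart += interval) {
    const chunkStop = Math.min(chunkStart + interval, stopTime);
    chunks.push([chunkStart, chunkStop]);
  }
  return chunks;
}

// A 25-minute window yields chunks of 10, 10, and 5 minutes.
const chunks = splitIntoChunks(0, 25 * 60 * 1000);
```

Peak memory then scales with the chunk size rather than with the full window, at the cost of more (smaller) sync comparisons.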


## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.


<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on optimizing the `syncHealthJob` by splitting the
processing of sync health messages into 10-minute intervals to reduce
memory usage.

### Detailed summary
- Introduced a loop to process `syncHealthMessageStats` in 10-minute
chunks.
- Adjusted the handling of `syncHealthMessageStats` and error logging.
- Updated the process for pushing diverging sync IDs and logging
results.
- Improved logging details for computed sync health stats.
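A consequence of the per-chunk loop is error isolation: a failure in one chunk is logged and skipped with `continue`, so the remaining chunks still run instead of the whole job aborting. A sketch of that control flow, using a simplified stand-in `Result` type and hypothetical helper names (`computeStats`, `processChunks`) rather than the real neverthrow-style API:

```typescript
// Simplified stand-in for a neverthrow-style Result.
type Result<T> = { ok: true; value: T } | { ok: false; error: string };

// Stand-in for computeSyncHealthMessageStats; fails for one chunk here.
function computeStats(chunkStart: number): Result<number> {
  return chunkStart === 10
    ? { ok: false, error: "peer timeout" }
    : { ok: true, value: chunkStart * 2 };
}

// Process each chunk independently; a failed chunk is skipped, not fatal.
function processChunks(chunkStarts: number[]): number[] {
  const processed: number[] = [];
  for (const chunkStart of chunkStarts) {
    const stats = computeStats(chunkStart);
    if (!stats.ok) {
      // In the real job this is a log.info call before continuing.
      continue;
    }
    processed.push(stats.value);
  }
  return processed;
}
```

Before this PR, a single error computing stats or pushing diverging sync IDs ended the whole peer measurement; with chunking, only the affected 10-minute window is lost.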


<!-- end pr-codex -->
sanjayprabhu authored Oct 1, 2024
1 parent 6408496 commit dbf1f15
Showing 2 changed files with 55 additions and 45 deletions.
5 changes: 5 additions & 0 deletions .changeset/gentle-cobras-jam.md
```diff
@@ -0,0 +1,5 @@
+---
+"@farcaster/hubble": patch
+---
+
+fix: Split sync health job into 10 min chunks to limit memory usage
```
95 changes: 50 additions & 45 deletions apps/hubble/src/network/sync/syncHealthJob.ts
```diff
@@ -213,58 +213,63 @@ export class MeasureSyncHealthJobScheduler {
 
     const syncHealthProbe = new SyncHealthProbe(this._metadataRetriever, peerMetadataRetriever);
 
-    const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats(
-      new Date(startTime),
-      new Date(stopTime),
-    );
+    // Split the start and stop time into 10 minute intervals, so we don't have to process too many messages at once
+    const interval = 10 * 60 * 1000; // 10 minutes in milliseconds
+    for (let chunkStartTime = startTime; chunkStartTime < stopTime; chunkStartTime += interval) {
+      const chunkStopTime = Math.min(chunkStartTime + interval, stopTime);
+      const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats(
+        new Date(chunkStartTime),
+        new Date(chunkStopTime),
+      );
 
-    if (syncHealthMessageStats.isErr()) {
-      const contactInfo = this.contactInfoForLogs(peer);
-      log.info(
-        {
-          peerId: peer.identifier,
-          err: syncHealthMessageStats.error,
-          contactInfo,
-        },
-        `Error computing SyncHealth: ${syncHealthMessageStats.error}.`,
-      );
-      continue;
-    }
+      if (syncHealthMessageStats.isErr()) {
+        const contactInfo = this.contactInfoForLogs(peer);
+        log.info(
+          {
+            peerId: peer.identifier,
+            err: syncHealthMessageStats.error,
+            contactInfo,
+          },
+          `Error computing SyncHealth: ${syncHealthMessageStats.error}.`,
+        );
+        continue;
+      }
 
-    const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds(
-      new Date(startTime),
-      new Date(stopTime),
-      "FromPeer",
-    );
+      const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds(
+        new Date(chunkStartTime),
+        new Date(chunkStopTime),
+        "FromPeer",
+      );
 
-    if (resultsPushingToUs.isErr()) {
-      log.info(
-        { peerId: peer.identifier, err: resultsPushingToUs.error },
-        `Error pushing new messages to ourself ${resultsPushingToUs.error}`,
-      );
-      continue;
-    }
+      if (resultsPushingToUs.isErr()) {
+        log.info(
+          { peerId: peer.identifier, err: resultsPushingToUs.error },
+          `Error pushing new messages to ourself ${resultsPushingToUs.error}`,
+        );
+        continue;
+      }
 
-    const processedResults = await this.processSumbitResults(
-      resultsPushingToUs.value,
-      peer.identifier,
-      startTime,
-      stopTime,
-    );
+      const processedResults = await this.processSumbitResults(
+        resultsPushingToUs.value,
+        peer.identifier,
+        chunkStartTime,
+        chunkStopTime,
+      );
 
-    log.info(
-      {
-        ourNumMessages: syncHealthMessageStats.value.primaryNumMessages,
-        theirNumMessages: syncHealthMessageStats.value.peerNumMessages,
-        syncHealth: syncHealthMessageStats.value.computeDiff(),
-        syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(),
-        resultsPushingToUs: processedResults,
-        peerId: peer.identifier,
-        startTime,
-        stopTime,
-      },
-      "Computed SyncHealth stats for peer",
-    );
+      log.info(
+        {
+          ourNumMessages: syncHealthMessageStats.value.primaryNumMessages,
+          theirNumMessages: syncHealthMessageStats.value.peerNumMessages,
+          syncHealth: syncHealthMessageStats.value.computeDiff(),
+          syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(),
+          resultsPushingToUs: processedResults,
+          peerId: peer.identifier,
+          startTime: chunkStartTime,
+          stopTime: chunkStopTime,
+        },
+        "Computed SyncHealth stats for peer",
+      );
+    }
   }
 }
```
