From 6741e420052fd8455f1d5984ec28e5db7b1b8b6f Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 6 Mar 2025 13:15:38 -0600 Subject: [PATCH] recovery db + fix some errors --- packages/pds/src/scripts/publish-identity.ts | 2 +- .../src/scripts/sequencer-recovery/index.ts | 9 +- .../scripts/sequencer-recovery/recoverer.ts | 108 ++++++++++++------ .../scripts/sequencer-recovery/recovery-db.ts | 56 +++++++++ 4 files changed, 133 insertions(+), 42 deletions(-) create mode 100644 packages/pds/src/scripts/sequencer-recovery/recovery-db.ts diff --git a/packages/pds/src/scripts/publish-identity.ts b/packages/pds/src/scripts/publish-identity.ts index 01da70773e6..cfef76a9bc9 100644 --- a/packages/pds/src/scripts/publish-identity.ts +++ b/packages/pds/src/scripts/publish-identity.ts @@ -1,7 +1,7 @@ import fs from 'node:fs/promises' +import { wait } from '@atproto/common' import { Sequencer } from '../sequencer' import { parseIntArg } from './util' -import { wait } from '@atproto/common' type Context = { sequencer: Sequencer diff --git a/packages/pds/src/scripts/sequencer-recovery/index.ts b/packages/pds/src/scripts/sequencer-recovery/index.ts index cfa62de0af0..d7b4cf69cac 100644 --- a/packages/pds/src/scripts/sequencer-recovery/index.ts +++ b/packages/pds/src/scripts/sequencer-recovery/index.ts @@ -6,6 +6,7 @@ import { ImageUrlBuilder } from '../../image/image-url-builder' import { Sequencer } from '../../sequencer' import { parseIntArg } from '../util' import { Recoverer, RecovererContext } from './recoverer' +import { getAndMigrateRecoveryDb } from './recovery-db' export const sequencerRecovery = async ( ctx: RecovererContext, @@ -15,10 +16,9 @@ export const sequencerRecovery = async ( const concurrency = args[1] ? parseIntArg(args[1]) : 10 const recover = new Recoverer(ctx, { - cursor, concurrency, }) - await recover.run() + await recover.run(cursor) } const run = async () => { @@ -45,8 +45,9 @@ const run = async () => { {} as any, '', ) - const ctx = { sequencer, accountManager, actorStore } - return sequencerRecovery(ctx, ['14775345']) + const recoveryDb = await getAndMigrateRecoveryDb('./backup/recovery.sqlite') + const ctx = { sequencer, accountManager, actorStore, recoveryDb } + return sequencerRecovery(ctx, []) } run() diff --git a/packages/pds/src/scripts/sequencer-recovery/recoverer.ts b/packages/pds/src/scripts/sequencer-recovery/recoverer.ts index d083e5bf370..10dd5764340 100644 --- a/packages/pds/src/scripts/sequencer-recovery/recoverer.ts +++ b/packages/pds/src/scripts/sequencer-recovery/recoverer.ts @@ -12,6 +12,7 @@ import { import { AccountManager, AccountStatus } from '../../account-manager' import { ActorStore } from '../../actor-store/actor-store' import { ActorStoreTransactor } from '../../actor-store/actor-store-transactor' +import { countAll } from '../../db' import { PreparedWrite, prepareCreate, @@ -19,32 +20,62 @@ import { prepareUpdate, } from '../../repo' import { AccountEvt, CommitEvt, SeqEvt, Sequencer } from '../../sequencer' +import { RecoveryDb } from './recovery-db' import { UserQueues } from './user-queues' export type RecovererContext = { + recoveryDb: RecoveryDb sequencer: Sequencer accountManager: AccountManager actorStore: ActorStore } +const PAGE_SIZE = 5000 + export class Recoverer { - cursor: number queues: UserQueues + failed: Set constructor( public ctx: RecovererContext, - opts: { cursor: number; concurrency: number }, + opts: { concurrency: number }, ) { - this.cursor = opts.cursor this.queues = new UserQueues(opts.concurrency) + this.failed = new Set() } - async run() { - let done = false - while (!done) { - done = await this.loadNextPage() + async run(startCursor = 0) { + const failed = await this.ctx.recoveryDb.db + .selectFrom('failed') + .select('did') + .execute() + for (const row of failed) { + this.failed.add(row.did) + } + + const totalRes = await this.ctx.sequencer.db.db + .selectFrom('repo_seq') + .select(countAll.as('count')) + .executeTakeFirstOrThrow() + const totalEvts = totalRes.count + let completed = 0 + + let cursor: number | undefined = startCursor + while (cursor !== undefined) { + const page = await this.ctx.sequencer.requestSeqRange({ + earliestSeq: cursor, + limit: PAGE_SIZE, + }) + page.forEach((evt) => this.processEvent(evt)) + cursor = page.at(-1)?.seq + await this.queues.onEmpty() + + completed += PAGE_SIZE + const percentComplete = (completed / totalEvts) * 100 + console.log(`${percentComplete.toFixed(2)}% - ${cursor}`) } + await this.queues.processAll() } @@ -56,22 +87,6 @@ export class Recoverer { await this.queues.destroy() } - private async loadNextPage(): Promise { - const page = await this.ctx.sequencer.requestSeqRange({ - earliestSeq: this.cursor, - limit: 5000, - }) - console.log('PAGE: ', page.at(-1)?.seq) - page.forEach((evt) => this.processEvent(evt)) - const lastEvt = page.at(-1) - if (!lastEvt) { - return true - } else { - this.cursor = lastEvt.seq - return false - } - } - processEvent(evt: SeqEvt) { // only need to process commits & tombstones if (evt.type === 'account') { @@ -85,6 +100,9 @@ export class Recoverer { processCommit(evt: CommitEvt) { const did = evt.repo this.queues.addToUser(did, async () => { + if (this.failed.has(did)) { + return + } const { writes, blocks } = await parseCommitEvt(evt) if (evt.since === null) { const actorExists = await this.ctx.actorStore.exists(did) @@ -109,10 +127,8 @@ export class Recoverer { this.trackBlobs(actorTxn, writes), ]) }) - .catch((err) => { - console.log(evt.repo) - console.log(writes) - throw err + .catch(async (err) => { + await this.trackFailure(did, err) }) }) } @@ -132,7 +148,6 @@ export class Recoverer { } } } - async processRepoCreation( evt: CommitEvt, writes: PreparedWrite[], @@ -157,7 +172,7 @@ export class Recoverer { actorTxn.repo.blob.processWriteBlobs(commit.rev, writes), ]), ) - console.log(`created repo and keypair for ${did}`) + await this.trackNewAccount(did) } processAccountEvt(evt: AccountEvt) { @@ -167,16 +182,35 @@ export class Recoverer { } const did = evt.did this.queues.addToUser(did, async () => { - try { - const { directory } = await this.ctx.actorStore.getLocation(did) - await rmIfExists(directory, true) - await this.ctx.accountManager.deleteAccount(did) - } catch (err) { - console.log('DID:', did) - throw err - } + const { directory } = await this.ctx.actorStore.getLocation(did) + await rmIfExists(directory, true) + await this.ctx.accountManager.deleteAccount(did) }) } + + async trackFailure(did: string, err: unknown) { + this.failed.add(did) + await this.ctx.recoveryDb.db + .insertInto('failed') + .values({ + did, + error: err?.toString(), + fixed: 0, + }) + .onConflict((oc) => oc.doNothing()) + .execute() + } + + async trackNewAccount(did: string) { + await this.ctx.recoveryDb.db + .insertInto('new_account') + .values({ + did, + published: 0, + }) + .onConflict((oc) => oc.doNothing()) + .execute() + } } const parseCommitEvt = async ( diff --git a/packages/pds/src/scripts/sequencer-recovery/recovery-db.ts b/packages/pds/src/scripts/sequencer-recovery/recovery-db.ts new file mode 100644 index 00000000000..c32c0068c3d --- /dev/null +++ b/packages/pds/src/scripts/sequencer-recovery/recovery-db.ts @@ -0,0 +1,56 @@ +import { Kysely } from 'kysely' +import { Database, Migrator } from '../../db' + +export interface NewAccount { + did: string + published: 0 | 1 +} + +export interface Failed { + did: string + error: string | null + fixed: 0 | 1 +} + +export type RecoveryDbSchema = { + new_account: NewAccount + failed: Failed +} + +export type RecoveryDb = Database + +export const getAndMigrateRecoveryDb = async ( + location: string, + disableWalAutoCheckpoint = false, +): Promise => { + const pragmas: Record = disableWalAutoCheckpoint + ? { wal_autocheckpoint: '0' } + : {} + const db = Database.sqlite(location, pragmas) + const migrator = new Migrator(db.db, migrations) + await migrator.migrateToLatestOrThrow() + return db +} + +const migrations = { + '001': { + up: async (db: Kysely) => { + await db.schema + .createTable('new_account') + .addColumn('did', 'varchar', (col) => col.primaryKey()) + .addColumn('published', 'int2', (col) => col.notNull()) + .execute() + + await db.schema + .createTable('failed') + .addColumn('did', 'varchar', (col) => col.primaryKey()) + .addColumn('error', 'varchar') + .addColumn('fixed', 'int2', (col) => col.notNull()) + .execute() + }, + down: async (db: Kysely) => { + await db.schema.dropTable('new_account').execute() + await db.schema.dropTable('failed').execute() + }, + }, +}