Skip to content

Commit

Permalink
recovery db + fix some errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Mar 6, 2025
1 parent 9612287 commit 6741e42
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 42 deletions.
2 changes: 1 addition & 1 deletion packages/pds/src/scripts/publish-identity.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from 'node:fs/promises'
import { wait } from '@atproto/common'
import { Sequencer } from '../sequencer'
import { parseIntArg } from './util'
import { wait } from '@atproto/common'

type Context = {
sequencer: Sequencer
Expand Down
9 changes: 5 additions & 4 deletions packages/pds/src/scripts/sequencer-recovery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ImageUrlBuilder } from '../../image/image-url-builder'
import { Sequencer } from '../../sequencer'
import { parseIntArg } from '../util'
import { Recoverer, RecovererContext } from './recoverer'
import { getAndMigrateRecoveryDb } from './recovery-db'

export const sequencerRecovery = async (
ctx: RecovererContext,
Expand All @@ -15,10 +16,9 @@ export const sequencerRecovery = async (
const concurrency = args[1] ? parseIntArg(args[1]) : 10

const recover = new Recoverer(ctx, {
cursor,
concurrency,
})
await recover.run()
await recover.run(cursor)
}

const run = async () => {
Expand All @@ -45,8 +45,9 @@ const run = async () => {
{} as any,
'',
)
const ctx = { sequencer, accountManager, actorStore }
return sequencerRecovery(ctx, ['14775345'])
const recoveryDb = await getAndMigrateRecoveryDb('./backup/recovery.sqlite')
const ctx = { sequencer, accountManager, actorStore, recoveryDb }
return sequencerRecovery(ctx, [])
}

run()
108 changes: 71 additions & 37 deletions packages/pds/src/scripts/sequencer-recovery/recoverer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,70 @@ import {
import { AccountManager, AccountStatus } from '../../account-manager'
import { ActorStore } from '../../actor-store/actor-store'
import { ActorStoreTransactor } from '../../actor-store/actor-store-transactor'
import { countAll } from '../../db'
import {
PreparedWrite,
prepareCreate,
prepareDelete,
prepareUpdate,
} from '../../repo'
import { AccountEvt, CommitEvt, SeqEvt, Sequencer } from '../../sequencer'
import { RecoveryDb } from './recovery-db'
import { UserQueues } from './user-queues'

export type RecovererContext = {
recoveryDb: RecoveryDb
sequencer: Sequencer
accountManager: AccountManager
actorStore: ActorStore
}

const PAGE_SIZE = 5000

export class Recoverer {
cursor: number
queues: UserQueues
failed: Set<string>

constructor(
public ctx: RecovererContext,
opts: { cursor: number; concurrency: number },
opts: { concurrency: number },
) {
this.cursor = opts.cursor
this.queues = new UserQueues(opts.concurrency)
this.failed = new Set()
}

async run() {
let done = false
while (!done) {
done = await this.loadNextPage()
async run(startCursor = 0) {
const failed = await this.ctx.recoveryDb.db
.selectFrom('failed')
.select('did')
.execute()
for (const row of failed) {
this.failed.add(row.did)
}

const totalRes = await this.ctx.sequencer.db.db
.selectFrom('repo_seq')
.select(countAll.as('count'))
.executeTakeFirstOrThrow()
const totalEvts = totalRes.count
let completed = 0

let cursor: number | undefined = startCursor
while (cursor !== undefined) {
const page = await this.ctx.sequencer.requestSeqRange({
earliestSeq: cursor,
limit: PAGE_SIZE,
})
page.forEach((evt) => this.processEvent(evt))
cursor = page.at(-1)?.seq

await this.queues.onEmpty()

completed += PAGE_SIZE
const percentComplete = (completed / totalEvts) * 100
console.log(`${percentComplete.toFixed(2)}% - ${cursor}`)
}

await this.queues.processAll()
}

Expand All @@ -56,22 +87,6 @@ export class Recoverer {
await this.queues.destroy()
}

private async loadNextPage(): Promise<boolean> {
const page = await this.ctx.sequencer.requestSeqRange({
earliestSeq: this.cursor,
limit: 5000,
})
console.log('PAGE: ', page.at(-1)?.seq)
page.forEach((evt) => this.processEvent(evt))
const lastEvt = page.at(-1)
if (!lastEvt) {
return true
} else {
this.cursor = lastEvt.seq
return false
}
}

processEvent(evt: SeqEvt) {
// only need to process commits & tombstones
if (evt.type === 'account') {
Expand All @@ -85,6 +100,9 @@ export class Recoverer {
processCommit(evt: CommitEvt) {
const did = evt.repo
this.queues.addToUser(did, async () => {
if (this.failed.has(did)) {
return
}
const { writes, blocks } = await parseCommitEvt(evt)
if (evt.since === null) {
const actorExists = await this.ctx.actorStore.exists(did)
Expand All @@ -109,10 +127,8 @@ export class Recoverer {
this.trackBlobs(actorTxn, writes),
])
})
.catch((err) => {
console.log(evt.repo)
console.log(writes)
throw err
.catch(async (err) => {
await this.trackFailure(did, err)
})
})
}
Expand All @@ -132,7 +148,6 @@ export class Recoverer {
}
}
}

async processRepoCreation(
evt: CommitEvt,
writes: PreparedWrite[],
Expand All @@ -157,7 +172,7 @@ export class Recoverer {
actorTxn.repo.blob.processWriteBlobs(commit.rev, writes),
]),
)
console.log(`created repo and keypair for ${did}`)
await this.trackNewAccount(did)
}

processAccountEvt(evt: AccountEvt) {
Expand All @@ -167,16 +182,35 @@ export class Recoverer {
}
const did = evt.did
this.queues.addToUser(did, async () => {
try {
const { directory } = await this.ctx.actorStore.getLocation(did)
await rmIfExists(directory, true)
await this.ctx.accountManager.deleteAccount(did)
} catch (err) {
console.log('DID:', did)
throw err
}
const { directory } = await this.ctx.actorStore.getLocation(did)
await rmIfExists(directory, true)
await this.ctx.accountManager.deleteAccount(did)
})
}

async trackFailure(did: string, err: unknown) {
this.failed.add(did)
await this.ctx.recoveryDb.db
.insertInto('failed')
.values({
did,
error: err?.toString(),
fixed: 0,
})
.onConflict((oc) => oc.doNothing())
.execute()
}

async trackNewAccount(did: string) {
await this.ctx.recoveryDb.db
.insertInto('new_account')
.values({
did,
published: 0,
})
.onConflict((oc) => oc.doNothing())
.execute()
}
}

const parseCommitEvt = async (
Expand Down
56 changes: 56 additions & 0 deletions packages/pds/src/scripts/sequencer-recovery/recovery-db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Kysely } from 'kysely'
import { Database, Migrator } from '../../db'

export interface NewAccount {
did: string
published: 0 | 1
}

export interface Failed {
did: string
error: string | null
fixed: 0 | 1
}

export type RecoveryDbSchema = {
new_account: NewAccount
failed: Failed
}

export type RecoveryDb = Database<RecoveryDbSchema>

export const getAndMigrateRecoveryDb = async (
location: string,
disableWalAutoCheckpoint = false,
): Promise<RecoveryDb> => {
const pragmas: Record<string, string> = disableWalAutoCheckpoint
? { wal_autocheckpoint: '0' }
: {}
const db = Database.sqlite(location, pragmas)
const migrator = new Migrator(db.db, migrations)
await migrator.migrateToLatestOrThrow()
return db
}

const migrations = {
'001': {
up: async (db: Kysely<unknown>) => {
await db.schema
.createTable('new_account')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('published', 'int2', (col) => col.notNull())
.execute()

await db.schema
.createTable('failed')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('error', 'varchar')
.addColumn('fixed', 'int2', (col) => col.notNull())
.execute()
},
down: async (db: Kysely<unknown>) => {
await db.schema.dropTable('new_account').execute()
await db.schema.dropTable('failed').execute()
},
},
}

0 comments on commit 6741e42

Please sign in to comment.