Skip to content

Commit

Permalink
Sync event handling (#3612)
Browse files Browse the repository at this point in the history
* pds: add sync event to account creation

* changeset

* fix bsky subscription handler

* add sync events to @atproto/sync package

* more sync package fixup

* fix sequencer test

* fix sync tests

* clarify firehose event type

---------

Co-authored-by: dholms <dtholmgren@gmail.com>
  • Loading branch information
devinivy and dholms authored Mar 7, 2025
1 parent 8827ff4 commit eab9c00
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 97 deletions.
5 changes: 5 additions & 0 deletions .changeset/slimy-chicken-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Emit sync event on account creation
9 changes: 7 additions & 2 deletions packages/bsky/src/data-plane/server/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IdResolver } from '@atproto/identity'
import { WriteOpAction } from '@atproto/repo'
import { Firehose, MemoryRunner } from '@atproto/sync'
import { Event as FirehoseEvent, Firehose, MemoryRunner } from '@atproto/sync'
import { subLogger as log } from '../../logger'
import { BackgroundQueue } from './background'
import { Database } from './db'
Expand Down Expand Up @@ -70,7 +70,7 @@ const createFirehose = (opts: {
unauthenticatedHandles: true, // indexing service handles these
unauthenticatedCommits: true, // @TODO there seems to be a very rare issue where the authenticator thinks a block is missing in deletion ops
onError: (err) => log.error({ err }, 'error in subscription'),
handleEvent: async (evt) => {
handleEvent: async (evt: FirehoseEvent) => {
if (evt.event === 'identity') {
await indexingSvc.indexHandle(evt.did, evt.time, true)
} else if (evt.event === 'account') {
Expand All @@ -79,6 +79,11 @@ const createFirehose = (opts: {
} else {
await indexingSvc.updateActorStatus(evt.did, evt.active, evt.status)
}
} else if (evt.event === 'sync') {
await Promise.all([
indexingSvc.setCommitLastSeen(evt.did, evt.cid, evt.rev),
indexingSvc.indexHandle(evt.did, evt.time),
])
} else {
const indexFn =
evt.event === 'delete'
Expand Down
5 changes: 5 additions & 0 deletions packages/pds/src/api/com/atproto/server/createAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AppContext } from '../../../../context'
import { baseNormalizeAndValidate } from '../../../../handle'
import { Server } from '../../../../lexicon'
import { InputSchema as CreateAccountInput } from '../../../../lexicon/types/com/atproto/server/createAccount'
import { syncEvtDataFromCommit } from '../../../../sequencer'
import { safeResolveDidDoc } from './util'

export default function (server: Server, ctx: AppContext) {
Expand Down Expand Up @@ -75,6 +76,10 @@ export default function (server: Server, ctx: AppContext) {
await ctx.sequencer.sequenceIdentityEvt(did, handle)
await ctx.sequencer.sequenceAccountEvt(did, AccountStatus.Active)
await ctx.sequencer.sequenceCommit(did, commit)
await ctx.sequencer.sequenceSyncEvt(
did,
syncEvtDataFromCommit(commit),
)
}
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.actorStore.clearReservedKeypair(signingKey.did(), did)
Expand Down
18 changes: 18 additions & 0 deletions packages/pds/src/sequencer/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from 'node:assert'
import { z } from 'zod'
import { cborEncode, noUndefinedVals, schema } from '@atproto/common'
import { BlockMap, blocksToCarFile } from '@atproto/repo'
Expand Down Expand Up @@ -53,6 +54,23 @@ export const formatSeqSyncEvt = async (
}
}

export const syncEvtDataFromCommit = (
commitData: CommitDataWithOps,
): SyncEvtData => {
const { blocks, missing } = commitData.relevantBlocks.getMany([
commitData.cid,
])
assert(
!missing.length,
'commit block was not found, could not build sync event',
)
return {
rev: commitData.rev,
cid: commitData.cid,
blocks,
}
}

export const formatSeqIdentityEvt = async (
did: string,
handle?: string,
Expand Down
4 changes: 2 additions & 2 deletions packages/pds/tests/sequencer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ describe('sequencer', () => {
await userSeed(sc)
alice = sc.dids.alice
bob = sc.dids.bob
// 14 events in userSeed
totalEvts = 14
// 18 events in userSeed
totalEvts = 18
})

beforeEach(async () => {
Expand Down
22 changes: 22 additions & 0 deletions packages/pds/tests/sync/subscribe-repos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,28 @@ describe('repo subscribe repos', () => {
return readFromGenerator(gen, isDone, waitFor)
}

it('emits sync event on account creation, matching temporary commit event.', async () => {
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
)

const gen = byFrame(ws)
const evts = await readTillCaughtUp(gen)
ws.terminate()

const syncEvts = getSyncEvts(evts)
const commitEvts = getCommitEvents(evts).slice(0, 4)
expect(syncEvts.length).toBe(4)

let i = 0
for (const did of [alice, bob, carol, dan]) {
const syncEvt = syncEvts[i]
const commitEvt = commitEvts[i]
await verifySyncEvent(syncEvt, did, commitEvt.commit, commitEvt.rev)
i++
}
})

it('sync backfilled events', async () => {
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
Expand Down
12 changes: 11 additions & 1 deletion packages/sync/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { RepoRecord } from '@atproto/lexicon'
import { BlockMap } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'

export type Event = CommitEvt | IdentityEvt | AccountEvt
export type Event = CommitEvt | SyncEvt | IdentityEvt | AccountEvt

export type CommitMeta = {
seq: number
Expand Down Expand Up @@ -36,6 +36,16 @@ export type Delete = CommitMeta & {
event: 'delete'
}

export type SyncEvt = {
seq: number
time: string
event: 'sync'
did: string
cid: CID
rev: string
blocks: BlockMap
}

export type IdentityEvt = {
seq: number
time: string
Expand Down
22 changes: 22 additions & 0 deletions packages/sync/src/firehose/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
formatDataKey,
parseDataKey,
readCar,
readCarWithRoot,
verifyProofs,
} from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
Expand All @@ -23,6 +24,7 @@ import {
CommitMeta,
Event,
IdentityEvt,
SyncEvt,
} from '../events'
import { EventRunner } from '../runner'
import { didAndSeqForEvt } from '../util'
Expand All @@ -32,9 +34,11 @@ import {
type Identity,
type RepoEvent,
RepoOp,
type Sync,
isAccount,
isCommit,
isIdentity,
isSync,
isValidRepoEvent,
} from './lexicons'

Expand All @@ -57,6 +61,7 @@ export type FirehoseOptions = ClientOptions & {
excludeIdentity?: boolean
excludeAccount?: boolean
excludeCommit?: boolean
excludeSync?: boolean
}

export class Firehose {
Expand Down Expand Up @@ -147,6 +152,9 @@ export class Firehose {
this.opts.unauthenticatedHandles,
)
return parsed ? [parsed] : []
} else if (isSync(evt) && !this.opts.excludeSync) {
const parsed = await parseSync(evt)
return parsed ? [parsed] : []
} else {
return []
}
Expand Down Expand Up @@ -279,6 +287,20 @@ const formatCommitOps = async (evt: Commit, ops: RepoOp[]) => {
return evts
}

export const parseSync = async (evt: Sync): Promise<SyncEvt | null> => {
const car = await readCarWithRoot(evt.blocks)

return {
event: 'sync',
seq: evt.seq,
time: evt.time,
did: evt.did,
cid: car.root,
rev: evt.rev,
blocks: car.blocks,
}
}

export const parseIdentity = async (
idResolver: IdResolver,
evt: Identity,
Expand Down
Loading

0 comments on commit eab9c00

Please sign in to comment.