From 38320191e559f8b928c6e951a9b4a6207240bfc1 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Fri, 28 Feb 2025 17:19:18 -0600 Subject: [PATCH] Wrap sync semantics (#3585) * deprecate blobs & tooBig * add sync event, deprecate handle & tombstone * fix up tests * small tidy * add test for sync account on account activation * use new sync event in another place * remove deprecated events from lexicons * formatting * pr cleanup * changeset --- .changeset/late-eyes-promise.md | 6 + lexicons/com/atproto/sync/subscribeRepos.json | 44 +------ packages/api/src/client/lexicons.ts | 65 ----------- .../types/com/atproto/sync/subscribeRepos.ts | 56 --------- packages/bsky/src/lexicon/lexicons.ts | 81 ------------- .../types/com/atproto/repo/listRecords.ts | 4 - .../types/com/atproto/sync/getRecord.ts | 2 - .../types/com/atproto/sync/subscribeRepos.ts | 59 ---------- packages/ozone/src/lexicon/lexicons.ts | 65 ----------- .../types/com/atproto/sync/subscribeRepos.ts | 59 ---------- packages/pds/src/actor-store/repo/reader.ts | 11 ++ .../pds/src/actor-store/repo/transactor.ts | 19 ++- packages/pds/src/actor-store/repo/util.ts | 22 ---- .../api/com/atproto/admin/deleteAccount.ts | 3 +- .../com/atproto/admin/updateAccountHandle.ts | 1 - .../api/com/atproto/identity/updateHandle.ts | 1 - .../api/com/atproto/server/activateAccount.ts | 22 +--- .../api/com/atproto/server/deleteAccount.ts | 3 +- .../api/com/atproto/sync/subscribeRepos.ts | 11 +- packages/pds/src/lexicon/lexicons.ts | 65 ----------- .../types/com/atproto/sync/subscribeRepos.ts | 59 ---------- packages/pds/src/repo/types.ts | 9 +- packages/pds/src/scripts/rebuild-repo.ts | 5 +- packages/pds/src/sequencer/db/schema.ts | 9 +- packages/pds/src/sequencer/events.ts | 102 +++++----------- packages/pds/src/sequencer/sequencer.ts | 32 ++--- packages/pds/tests/account-deletion.test.ts | 8 +- packages/pds/tests/sequencer.test.ts | 45 +++---- .../pds/tests/sync/subscribe-repos.test.ts | 110 +++++++++++------- 29 files changed, 177 insertions(+), 801 deletions(-) create mode 100644 .changeset/late-eyes-promise.md delete mode 100644 packages/pds/src/actor-store/repo/util.ts diff --git a/.changeset/late-eyes-promise.md b/.changeset/late-eyes-promise.md new file mode 100644 index 00000000000..889d26b4be5 --- /dev/null +++ b/.changeset/late-eyes-promise.md @@ -0,0 +1,6 @@ +--- +"@atproto/api": patch +"@atproto/pds": patch +--- + +Wrap sync v1.1 semantics. Add #sync event to subscribeRepos and deprecate #handle and #tombstone events diff --git a/lexicons/com/atproto/sync/subscribeRepos.json b/lexicons/com/atproto/sync/subscribeRepos.json index b52a70d31f8..373b050f2b8 100644 --- a/lexicons/com/atproto/sync/subscribeRepos.json +++ b/lexicons/com/atproto/sync/subscribeRepos.json @@ -17,16 +17,7 @@ "message": { "schema": { "type": "union", - "refs": [ - "#commit", - "#sync", - "#identity", - "#account", - "#handle", - "#migrate", - "#tombstone", - "#info" - ] + "refs": ["#commit", "#sync", "#identity", "#account", "#info"] } }, "errors": [ @@ -186,39 +177,6 @@ } } }, - "handle": { - "type": "object", - "description": "DEPRECATED -- Use #identity event instead", - "required": ["seq", "did", "handle", "time"], - "properties": { - "seq": { "type": "integer" }, - "did": { "type": "string", "format": "did" }, - "handle": { "type": "string", "format": "handle" }, - "time": { "type": "string", "format": "datetime" } - } - }, - "migrate": { - "type": "object", - "description": "DEPRECATED -- Use #account event instead", - "required": ["seq", "did", "migrateTo", "time"], - "nullable": ["migrateTo"], - "properties": { - "seq": { "type": "integer" }, - "did": { "type": "string", "format": "did" }, - "migrateTo": { "type": "string" }, - "time": { "type": "string", "format": "datetime" } - } - }, - "tombstone": { - "type": "object", - "description": "DEPRECATED -- Use #account event instead", - "required": ["seq", "did", "time"], - "properties": { - "seq": { "type": "integer" }, - "did": { "type": "string", "format": "did" }, - "time": { "type": "string", "format": "datetime" } - } - }, "info": { "type": "object", "required": ["name"], diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 6463e7c6ec3..9384e884c23 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -3991,9 +3991,6 @@ export const schemaDict = { 'lex:com.atproto.sync.subscribeRepos#sync', 'lex:com.atproto.sync.subscribeRepos#identity', 'lex:com.atproto.sync.subscribeRepos#account', - 'lex:com.atproto.sync.subscribeRepos#handle', - 'lex:com.atproto.sync.subscribeRepos#migrate', - 'lex:com.atproto.sync.subscribeRepos#tombstone', 'lex:com.atproto.sync.subscribeRepos#info', ], }, @@ -4197,68 +4194,6 @@ export const schemaDict = { }, }, }, - handle: { - type: 'object', - description: 'DEPRECATED -- Use #identity event instead', - required: ['seq', 'did', 'handle', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - handle: { - type: 'string', - format: 'handle', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - migrate: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'migrateTo', 'time'], - nullable: ['migrateTo'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - migrateTo: { - type: 'string', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - tombstone: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, info: { type: 'object', required: ['name'], diff --git a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts index c46173b458d..1d3b628179a 100644 --- a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts @@ -122,62 +122,6 @@ export function validateAccount(v: V) { return validate(v, id, hashAccount) } -/** DEPRECATED -- Use #identity event instead */ -export interface Handle { - $type?: 'com.atproto.sync.subscribeRepos#handle' - seq: number - did: string - handle: string - time: string -} - -const hashHandle = 'handle' - -export function isHandle(v: V) { - return is$typed(v, id, hashHandle) -} - -export function validateHandle(v: V) { - return validate(v, id, hashHandle) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Migrate { - $type?: 'com.atproto.sync.subscribeRepos#migrate' - seq: number - did: string - migrateTo: string | null - time: string -} - -const hashMigrate = 'migrate' - -export function isMigrate(v: V) { - return is$typed(v, id, hashMigrate) -} - -export function validateMigrate(v: V) { - return validate(v, id, hashMigrate) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Tombstone { - $type?: 'com.atproto.sync.subscribeRepos#tombstone' - seq: number - did: string - time: string -} - -const hashTombstone = 'tombstone' - -export function isTombstone(v: V) { - return is$typed(v, id, hashTombstone) -} - -export function validateTombstone(v: V) { - return validate(v, id, hashTombstone) -} - export interface Info { $type?: 'com.atproto.sync.subscribeRepos#info' name: 'OutdatedCursor' | (string & {}) diff --git a/packages/bsky/src/lexicon/lexicons.ts b/packages/bsky/src/lexicon/lexicons.ts index 2468340216a..8df4d9c35c4 100644 --- a/packages/bsky/src/lexicon/lexicons.ts +++ b/packages/bsky/src/lexicon/lexicons.ts @@ -2035,16 +2035,6 @@ export const schemaDict = { cursor: { type: 'string', }, - rkeyStart: { - type: 'string', - description: - 'DEPRECATED: The lowest sort-ordered rkey to start from (exclusive)', - }, - rkeyEnd: { - type: 'string', - description: - 'DEPRECATED: The highest sort-ordered rkey to stop at (exclusive)', - }, reverse: { type: 'boolean', description: 'Flag to reverse the order of the returned records.', @@ -3585,12 +3575,6 @@ export const schemaDict = { description: 'Record Key', format: 'record-key', }, - commit: { - type: 'string', - format: 'cid', - description: - 'DEPRECATED: referenced a repo commit by CID, and retrieved record as of that commit', - }, }, }, output: { @@ -4007,9 +3991,6 @@ export const schemaDict = { 'lex:com.atproto.sync.subscribeRepos#sync', 'lex:com.atproto.sync.subscribeRepos#identity', 'lex:com.atproto.sync.subscribeRepos#account', - 'lex:com.atproto.sync.subscribeRepos#handle', - 'lex:com.atproto.sync.subscribeRepos#migrate', - 'lex:com.atproto.sync.subscribeRepos#tombstone', 'lex:com.atproto.sync.subscribeRepos#info', ], }, @@ -4213,68 +4194,6 @@ export const schemaDict = { }, }, }, - handle: { - type: 'object', - description: 'DEPRECATED -- Use #identity event instead', - required: ['seq', 'did', 'handle', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - handle: { - type: 'string', - format: 'handle', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - migrate: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'migrateTo', 'time'], - nullable: ['migrateTo'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - migrateTo: { - type: 'string', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - tombstone: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, info: { type: 'object', required: ['name'], diff --git a/packages/bsky/src/lexicon/types/com/atproto/repo/listRecords.ts b/packages/bsky/src/lexicon/types/com/atproto/repo/listRecords.ts index 573d34d70a8..df001bdb54f 100644 --- a/packages/bsky/src/lexicon/types/com/atproto/repo/listRecords.ts +++ b/packages/bsky/src/lexicon/types/com/atproto/repo/listRecords.ts @@ -20,10 +20,6 @@ export interface QueryParams { /** The number of records to return. */ limit: number cursor?: string - /** DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) */ - rkeyStart?: string - /** DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) */ - rkeyEnd?: string /** Flag to reverse the order of the returned records. */ reverse?: boolean } diff --git a/packages/bsky/src/lexicon/types/com/atproto/sync/getRecord.ts b/packages/bsky/src/lexicon/types/com/atproto/sync/getRecord.ts index 1c7a509b1b4..89051c859fa 100644 --- a/packages/bsky/src/lexicon/types/com/atproto/sync/getRecord.ts +++ b/packages/bsky/src/lexicon/types/com/atproto/sync/getRecord.ts @@ -19,8 +19,6 @@ export interface QueryParams { collection: string /** Record Key */ rkey: string - /** DEPRECATED: referenced a repo commit by CID, and retrieved record as of that commit */ - commit?: string } export type InputSchema = undefined diff --git a/packages/bsky/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/bsky/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index decfb210b25..318d096aa83 100644 --- a/packages/bsky/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/bsky/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -22,9 +22,6 @@ export type OutputSchema = | $Typed | $Typed | $Typed - | $Typed - | $Typed - | $Typed | $Typed | { $type: string } export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'> @@ -150,62 +147,6 @@ export function validateAccount(v: V) { return validate(v, id, hashAccount) } -/** DEPRECATED -- Use #identity event instead */ -export interface Handle { - $type?: 'com.atproto.sync.subscribeRepos#handle' - seq: number - did: string - handle: string - time: string -} - -const hashHandle = 'handle' - -export function isHandle(v: V) { - return is$typed(v, id, hashHandle) -} - -export function validateHandle(v: V) { - return validate(v, id, hashHandle) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Migrate { - $type?: 'com.atproto.sync.subscribeRepos#migrate' - seq: number - did: string - migrateTo: string | null - time: string -} - -const hashMigrate = 'migrate' - -export function isMigrate(v: V) { - return is$typed(v, id, hashMigrate) -} - -export function validateMigrate(v: V) { - return validate(v, id, hashMigrate) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Tombstone { - $type?: 'com.atproto.sync.subscribeRepos#tombstone' - seq: number - did: string - time: string -} - -const hashTombstone = 'tombstone' - -export function isTombstone(v: V) { - return is$typed(v, id, hashTombstone) -} - -export function validateTombstone(v: V) { - return validate(v, id, hashTombstone) -} - export interface Info { $type?: 'com.atproto.sync.subscribeRepos#info' name: 'OutdatedCursor' | (string & {}) diff --git a/packages/ozone/src/lexicon/lexicons.ts b/packages/ozone/src/lexicon/lexicons.ts index 6463e7c6ec3..9384e884c23 100644 --- a/packages/ozone/src/lexicon/lexicons.ts +++ b/packages/ozone/src/lexicon/lexicons.ts @@ -3991,9 +3991,6 @@ export const schemaDict = { 'lex:com.atproto.sync.subscribeRepos#sync', 'lex:com.atproto.sync.subscribeRepos#identity', 'lex:com.atproto.sync.subscribeRepos#account', - 'lex:com.atproto.sync.subscribeRepos#handle', - 'lex:com.atproto.sync.subscribeRepos#migrate', - 'lex:com.atproto.sync.subscribeRepos#tombstone', 'lex:com.atproto.sync.subscribeRepos#info', ], }, @@ -4197,68 +4194,6 @@ export const schemaDict = { }, }, }, - handle: { - type: 'object', - description: 'DEPRECATED -- Use #identity event instead', - required: ['seq', 'did', 'handle', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - handle: { - type: 'string', - format: 'handle', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - migrate: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'migrateTo', 'time'], - nullable: ['migrateTo'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - migrateTo: { - type: 'string', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - tombstone: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, info: { type: 'object', required: ['name'], diff --git a/packages/ozone/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/ozone/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index decfb210b25..318d096aa83 100644 --- a/packages/ozone/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/ozone/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -22,9 +22,6 @@ export type OutputSchema = | $Typed | $Typed | $Typed - | $Typed - | $Typed - | $Typed | $Typed | { $type: string } export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'> @@ -150,62 +147,6 @@ export function validateAccount(v: V) { return validate(v, id, hashAccount) } -/** DEPRECATED -- Use #identity event instead */ -export interface Handle { - $type?: 'com.atproto.sync.subscribeRepos#handle' - seq: number - did: string - handle: string - time: string -} - -const hashHandle = 'handle' - -export function isHandle(v: V) { - return is$typed(v, id, hashHandle) -} - -export function validateHandle(v: V) { - return validate(v, id, hashHandle) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Migrate { - $type?: 'com.atproto.sync.subscribeRepos#migrate' - seq: number - did: string - migrateTo: string | null - time: string -} - -const hashMigrate = 'migrate' - -export function isMigrate(v: V) { - return is$typed(v, id, hashMigrate) -} - -export function validateMigrate(v: V) { - return validate(v, id, hashMigrate) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Tombstone { - $type?: 'com.atproto.sync.subscribeRepos#tombstone' - seq: number - did: string - time: string -} - -const hashTombstone = 'tombstone' - -export function isTombstone(v: V) { - return is$typed(v, id, hashTombstone) -} - -export function validateTombstone(v: V) { - return validate(v, id, hashTombstone) -} - export interface Info { $type?: 'com.atproto.sync.subscribeRepos#info' name: 'OutdatedCursor' | (string & {}) diff --git a/packages/pds/src/actor-store/repo/reader.ts b/packages/pds/src/actor-store/repo/reader.ts index 6a327a62629..ebfccbb814d 100644 --- a/packages/pds/src/actor-store/repo/reader.ts +++ b/packages/pds/src/actor-store/repo/reader.ts @@ -1,4 +1,5 @@ import { BlobStore } from '@atproto/repo' +import { SyncEvtData } from '../../repo' import { BlobReader } from '../blob/reader' import { ActorDb } from '../db' import { RecordReader } from '../record/reader' @@ -17,4 +18,14 @@ export class RepoReader { this.record = new RecordReader(db) this.storage = new SqlRepoReader(db) } + + async getSyncEventData(): Promise { + const root = await this.storage.getRootDetailed() + const { blocks } = await this.storage.getBlocks([root.cid]) + return { + cid: root.cid, + rev: root.rev, + blocks, + } + } } diff --git a/packages/pds/src/actor-store/repo/transactor.ts b/packages/pds/src/actor-store/repo/transactor.ts index 06357ccd96d..f227f040078 100644 --- a/packages/pds/src/actor-store/repo/transactor.ts +++ b/packages/pds/src/actor-store/repo/transactor.ts @@ -18,7 +18,6 @@ import { ActorDb } from '../db' import { RecordTransactor } from '../record/transactor' import { RepoReader } from './reader' import { SqlRepoTransactor } from './sql-repo-transactor' -import { blobCidsFromWrites, commitOpsFromCreates } from './util' export class RepoTransactor extends RepoReader { blob: BlobTransactor @@ -61,10 +60,14 @@ export class RepoTransactor extends RepoReader { this.indexWrites(writes, commit.rev), this.blob.processWriteBlobs(commit.rev, writes), ]) + const ops = writes.map((w) => ({ + action: 'create' as const, + path: formatDataKey(w.uri.collection, w.uri.rkey), + cid: w.cid, + })) return { ...commit, - ops: commitOpsFromCreates(writes), - blobs: blobCidsFromWrites(writes), + ops, prevData: null, } } @@ -74,7 +77,16 @@ export class RepoTransactor extends RepoReader { swapCommitCid?: CID, ): Promise { this.db.assertTransaction() + if (writes.length > 200) { + throw new InvalidRequestError('Too many writes. Max: 200') + } + const commit = await this.formatCommit(writes, swapCommitCid) + // Do not allow commits > 2MB + if (commit.relevantBlocks.byteSize > 2000000) { + throw new InvalidRequestError('Too many writes. Max event size: 2MB') + } + await Promise.all([ // persist the commit to repo storage this.storage.applyCommit(commit), @@ -166,7 +178,6 @@ export class RepoTransactor extends RepoReader { return { ...commit, ops: commitOps, - blobs: blobCidsFromWrites(writes), prevData, } } diff --git a/packages/pds/src/actor-store/repo/util.ts b/packages/pds/src/actor-store/repo/util.ts deleted file mode 100644 index 6d2c7799a47..00000000000 --- a/packages/pds/src/actor-store/repo/util.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { CidSet, formatDataKey } from '@atproto/repo' -import { CommitOp, PreparedCreate, PreparedWrite } from '../../repo' - -export const blobCidsFromWrites = (writes: PreparedWrite[]): CidSet => { - const blobCids = new CidSet() - for (const w of writes) { - if (w.action === 'create' || w.action === 'update') { - for (const blob of w.blobs) { - blobCids.add(blob.cid) - } - } - } - return blobCids -} - -export const commitOpsFromCreates = (writes: PreparedCreate[]): CommitOp[] => { - return writes.map((w) => ({ - action: 'create' as const, - path: formatDataKey(w.uri.collection, w.uri.rkey), - cid: w.cid, - })) -} diff --git a/packages/pds/src/api/com/atproto/admin/deleteAccount.ts b/packages/pds/src/api/com/atproto/admin/deleteAccount.ts index 33251eabb97..20aa9fee9be 100644 --- a/packages/pds/src/api/com/atproto/admin/deleteAccount.ts +++ b/packages/pds/src/api/com/atproto/admin/deleteAccount.ts @@ -9,12 +9,11 @@ export default function (server: Server, ctx: AppContext) { const { did } = input.body await ctx.actorStore.destroy(did) await ctx.accountManager.deleteAccount(did) - const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did) const accountSeq = await ctx.sequencer.sequenceAccountEvt( did, AccountStatus.Deleted, ) - await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq]) + await ctx.sequencer.deleteAllForUser(did, [accountSeq]) }, }) } diff --git a/packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts b/packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts index 4b8beea2aa5..86f8b0ae588 100644 --- a/packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts +++ b/packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts @@ -44,7 +44,6 @@ export default function (server: Server, ctx: AppContext) { } try { - await ctx.sequencer.sequenceHandleUpdate(did, handle) await ctx.sequencer.sequenceIdentityEvt(did, handle) } catch (err) { httpLogger.error( diff --git a/packages/pds/src/api/com/atproto/identity/updateHandle.ts b/packages/pds/src/api/com/atproto/identity/updateHandle.ts index 61c42bae53e..af9ce0a8239 100644 --- a/packages/pds/src/api/com/atproto/identity/updateHandle.ts +++ b/packages/pds/src/api/com/atproto/identity/updateHandle.ts @@ -79,7 +79,6 @@ export default function (server: Server, ctx: AppContext) { } try { - await ctx.sequencer.sequenceHandleUpdate(requester, handle) await ctx.sequencer.sequenceIdentityEvt(requester, handle) } catch (err) { httpLogger.error( diff --git a/packages/pds/src/api/com/atproto/server/activateAccount.ts b/packages/pds/src/api/com/atproto/server/activateAccount.ts index d3c73d077d7..63342b264a6 100644 --- a/packages/pds/src/api/com/atproto/server/activateAccount.ts +++ b/packages/pds/src/api/com/atproto/server/activateAccount.ts @@ -1,4 +1,3 @@ -import { CidSet } from '@atproto/repo' import { INVALID_HANDLE } from '@atproto/syntax' import { InvalidRequestError } from '@atproto/xrpc-server' import { AppContext } from '../../../../context' @@ -31,22 +30,9 @@ export default function (server: Server, ctx: AppContext) { await ctx.accountManager.activateAccount(requester) - const commitData = await ctx.actorStore.read(requester, async (store) => { - const root = await store.repo.storage.getRootDetailed() - const blocks = await store.repo.storage.getBlocks([root.cid]) - return { - cid: root.cid, - rev: root.rev, - since: null, - prev: null, - newBlocks: blocks.blocks, - relevantBlocks: blocks.blocks, - removedCids: new CidSet(), - ops: [], - blobs: new CidSet(), - prevData: null, - } - }) + const syncData = await ctx.actorStore.read(requester, (store) => + store.repo.getSyncEventData(), + ) // @NOTE: we're over-emitting for now for backwards compatibility, can reduce this in the future const status = await ctx.accountManager.getAccountStatus(requester) @@ -55,7 +41,7 @@ export default function (server: Server, ctx: AppContext) { requester, account.handle ?? INVALID_HANDLE, ) - await ctx.sequencer.sequenceCommit(requester, commitData) + await ctx.sequencer.sequenceSyncEvt(requester, syncData) }, }) } diff --git a/packages/pds/src/api/com/atproto/server/deleteAccount.ts b/packages/pds/src/api/com/atproto/server/deleteAccount.ts index c7aa20da9f4..9b3ed017d82 100644 --- a/packages/pds/src/api/com/atproto/server/deleteAccount.ts +++ b/packages/pds/src/api/com/atproto/server/deleteAccount.ts @@ -48,8 +48,7 @@ export default function (server: Server, ctx: AppContext) { did, AccountStatus.Deleted, ) - const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did) - await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq]) + await ctx.sequencer.deleteAllForUser(did, [accountSeq]) }, }) } diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index cfa5e75ecc4..c2418d343aa 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -45,9 +45,9 @@ export default function (server: Server, ctx: AppContext) { time: evt.time, ...evt.evt, } - } else if (evt.type === 'handle') { + } else if (evt.type === 'sync') { yield { - $type: '#handle', + $type: '#sync', seq: evt.seq, time: evt.time, ...evt.evt, @@ -66,13 +66,6 @@ export default function (server: Server, ctx: AppContext) { time: evt.time, ...evt.evt, } - } else if (evt.type === 'tombstone') { - yield { - $type: '#tombstone', - seq: evt.seq, - time: evt.time, - ...evt.evt, - } } } }) diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 6463e7c6ec3..9384e884c23 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -3991,9 +3991,6 @@ export const schemaDict = { 'lex:com.atproto.sync.subscribeRepos#sync', 'lex:com.atproto.sync.subscribeRepos#identity', 'lex:com.atproto.sync.subscribeRepos#account', - 'lex:com.atproto.sync.subscribeRepos#handle', - 'lex:com.atproto.sync.subscribeRepos#migrate', - 'lex:com.atproto.sync.subscribeRepos#tombstone', 'lex:com.atproto.sync.subscribeRepos#info', ], }, @@ -4197,68 +4194,6 @@ export const schemaDict = { }, }, }, - handle: { - type: 'object', - description: 'DEPRECATED -- Use #identity event instead', - required: ['seq', 'did', 'handle', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - handle: { - type: 'string', - format: 'handle', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - migrate: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'migrateTo', 'time'], - nullable: ['migrateTo'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - migrateTo: { - type: 'string', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, - tombstone: { - type: 'object', - description: 'DEPRECATED -- Use #account event instead', - required: ['seq', 'did', 'time'], - properties: { - seq: { - type: 'integer', - }, - did: { - type: 'string', - format: 'did', - }, - time: { - type: 'string', - format: 'datetime', - }, - }, - }, info: { type: 'object', required: ['name'], diff --git a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index decfb210b25..318d096aa83 100644 --- a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -22,9 +22,6 @@ export type OutputSchema = | $Typed | $Typed | $Typed - | $Typed - | $Typed - | $Typed | $Typed | { $type: string } export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'> @@ -150,62 +147,6 @@ export function validateAccount(v: V) { return validate(v, id, hashAccount) } -/** DEPRECATED -- Use #identity event instead */ -export interface Handle { - $type?: 'com.atproto.sync.subscribeRepos#handle' - seq: number - did: string - handle: string - time: string -} - -const hashHandle = 'handle' - -export function isHandle(v: V) { - return is$typed(v, id, hashHandle) -} - -export function validateHandle(v: V) { - return validate(v, id, hashHandle) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Migrate { - $type?: 'com.atproto.sync.subscribeRepos#migrate' - seq: number - did: string - migrateTo: string | null - time: string -} - -const hashMigrate = 'migrate' - -export function isMigrate(v: V) { - return is$typed(v, id, hashMigrate) -} - -export function validateMigrate(v: V) { - return validate(v, id, hashMigrate) -} - -/** DEPRECATED -- Use #account event instead */ -export interface Tombstone { - $type?: 'com.atproto.sync.subscribeRepos#tombstone' - seq: number - did: string - time: string -} - -const hashTombstone = 'tombstone' - -export function isTombstone(v: V) { - return is$typed(v, id, hashTombstone) -} - -export function validateTombstone(v: V) { - return validate(v, id, hashTombstone) -} - export interface Info { $type?: 'com.atproto.sync.subscribeRepos#info' name: 'OutdatedCursor' | (string & {}) diff --git a/packages/pds/src/repo/types.ts b/packages/pds/src/repo/types.ts index ccf61a787c8..6eb77d79ad9 100644 --- a/packages/pds/src/repo/types.ts +++ b/packages/pds/src/repo/types.ts @@ -1,6 +1,6 @@ import { CID } from 'multiformats/cid' import { RepoRecord } from '@atproto/lexicon' -import { CidSet, CommitData, WriteOpAction } from '@atproto/repo' +import { BlockMap, CommitData, WriteOpAction } from '@atproto/repo' import { AtUri } from '@atproto/syntax' export type ValidationStatus = 'valid' | 'unknown' | undefined @@ -51,12 +51,17 @@ export type CommitOp = { export type CommitDataWithOps = CommitData & { ops: CommitOp[] - blobs: CidSet prevData: CID | null } export type PreparedWrite = PreparedCreate | PreparedUpdate | PreparedDelete +export type SyncEvtData = { + cid: CID + rev: string + blocks: BlockMap +} + export class InvalidRecordError extends Error {} export class BadCommitSwapError extends Error { diff --git a/packages/pds/src/scripts/rebuild-repo.ts b/packages/pds/src/scripts/rebuild-repo.ts index ccb4ca92769..6b8f147d736 100644 --- a/packages/pds/src/scripts/rebuild-repo.ts +++ b/packages/pds/src/scripts/rebuild-repo.ts @@ -77,7 +77,10 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => { } }) await ctx.accountManager.updateRepoRoot(did, commit.cid, rev) - await ctx.sequencer.sequenceCommit(did, commit) + const syncData = await ctx.actorStore.read(did, (store) => + store.repo.getSyncEventData(), + ) + await ctx.sequencer.sequenceSyncEvt(did, syncData) } const promptContinue = async (): Promise => { diff --git a/packages/pds/src/sequencer/db/schema.ts b/packages/pds/src/sequencer/db/schema.ts index 815125c39ca..cc9294a878b 100644 --- a/packages/pds/src/sequencer/db/schema.ts +++ b/packages/pds/src/sequencer/db/schema.ts @@ -1,13 +1,6 @@ import { Generated, GeneratedAlways, Insertable, Selectable } from 'kysely' -export type RepoSeqEventType = - | 'append' - | 'rebase' - | 'handle' - | 'migrate' - | 'identity' - | 'account' - | 'tombstone' +export type RepoSeqEventType = 'append' | 'sync' | 'identity' | 'account' export interface RepoSeq { seq: GeneratedAlways diff --git a/packages/pds/src/sequencer/events.ts b/packages/pds/src/sequencer/events.ts index 536d874c196..b3be05b07e9 100644 --- a/packages/pds/src/sequencer/events.ts +++ b/packages/pds/src/sequencer/events.ts @@ -2,7 +2,7 @@ import { z } from 'zod' import { cborEncode, noUndefinedVals, schema } from '@atproto/common' import { BlockMap, blocksToCarFile } from '@atproto/repo' import { AccountStatus } from '../account-manager' -import { CommitDataWithOps } from '../repo' +import { CommitDataWithOps, SyncEvtData } from '../repo' import { RepoSeqInsert } from './db' export const formatSeqCommit = async ( @@ -13,41 +13,18 @@ export const formatSeqCommit = async ( blocksToSend.addMap(commitData.newBlocks) blocksToSend.addMap(commitData.relevantBlocks) - let evt: CommitEvt - - // If event is too big (max 200 ops or 1MB of data) - if (commitData.ops.length > 200 || blocksToSend.byteSize > 1000000) { - const justRoot = new BlockMap() - const rootBlock = blocksToSend.get(commitData.cid) - if (rootBlock) { - justRoot.set(commitData.cid, rootBlock) - } - - evt = { - rebase: false, - tooBig: true, - repo: did, - commit: commitData.cid, - rev: commitData.rev, - since: commitData.since, - blocks: await blocksToCarFile(commitData.cid, justRoot), - ops: [], - blobs: [], - prevData: commitData.prevData ?? undefined, - } - } else { - evt = { - rebase: false, - tooBig: false, - repo: did, - commit: commitData.cid, - rev: commitData.rev, - since: commitData.since, - blocks: await blocksToCarFile(commitData.cid, blocksToSend), - ops: commitData.ops, - blobs: commitData.blobs.toList(), - prevData: commitData.prevData ?? undefined, - } + const evt = { + repo: did, + commit: commitData.cid, + rev: commitData.rev, + since: commitData.since, + blocks: await blocksToCarFile(commitData.cid, blocksToSend), + ops: commitData.ops, + prevData: commitData.prevData ?? undefined, + // deprecated (but still required) fields + rebase: false, + tooBig: false, + blobs: [], } return { @@ -58,17 +35,19 @@ export const formatSeqCommit = async ( } } -export const formatSeqHandleUpdate = async ( +export const formatSeqSyncEvt = async ( did: string, - handle: string, + data: SyncEvtData, ): Promise => { - const evt: HandleEvt = { + const blocks = await blocksToCarFile(data.cid, data.blocks) + const evt: SyncEvt = { did, - handle, + rev: data.rev, + blocks, } return { did, - eventType: 'handle', + eventType: 'sync', event: cborEncode(evt), sequencedAt: new Date().toISOString(), } @@ -112,20 +91,6 @@ export const formatSeqAccountEvt = async ( } } -export const formatSeqTombstone = async ( - did: string, -): Promise => { - const evt: TombstoneEvt = { - did, - } - return { - did, - eventType: 'tombstone', - event: cborEncode(evt), - sequencedAt: new Date().toISOString(), - } -} - export const commitEvtOp = z.object({ action: z.union([ z.literal('create'), @@ -152,11 +117,12 @@ export const commitEvt = z.object({ }) export type CommitEvt = z.infer -export const handleEvt = z.object({ +export const syncEvt = z.object({ did: z.string(), - handle: z.string(), + blocks: schema.bytes, + rev: z.string(), }) -export type HandleEvt = z.infer +export type SyncEvt = z.infer export const identityEvt = z.object({ did: z.string(), @@ -178,22 +144,17 @@ export const accountEvt = z.object({ }) export type AccountEvt = z.infer -export const tombstoneEvt = z.object({ - did: z.string(), -}) -export type TombstoneEvt = z.infer - type TypedCommitEvt = { type: 'commit' seq: number time: string evt: CommitEvt } -type TypedHandleEvt = { - type: 'handle' +type TypedSyncEvt = { + type: 'sync' seq: number time: string - evt: HandleEvt + evt: SyncEvt } type TypedIdentityEvt = { type: 'identity' @@ -207,15 +168,8 @@ type TypedAccountEvt = { time: string evt: AccountEvt } -type TypedTombstoneEvt = { - type: 'tombstone' - seq: number - time: string - evt: TombstoneEvt -} export type SeqEvt = | TypedCommitEvt - | TypedHandleEvt + | TypedSyncEvt | TypedIdentityEvt | TypedAccountEvt - | TypedTombstoneEvt diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 5da563795b3..c9a53d702dc 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -4,7 +4,7 @@ import { SECOND, cborDecode, wait } from '@atproto/common' import { AccountStatus } from '../account-manager/helpers/account' import { Crawlers } from '../crawlers' import { seqLogger as log } from '../logger' -import { CommitDataWithOps } from '../repo' +import { CommitDataWithOps, SyncEvtData } from '../repo' import { RepoSeqEntry, RepoSeqInsert, @@ -15,15 +15,13 @@ import { import { AccountEvt, CommitEvt, - HandleEvt, IdentityEvt, SeqEvt, - TombstoneEvt, + SyncEvt, formatSeqAccountEvt, formatSeqCommit, - formatSeqHandleUpdate, formatSeqIdentityEvt, - formatSeqTombstone, + formatSeqSyncEvt, } from './events' export * from './events' @@ -135,19 +133,19 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { continue } const evt = cborDecode(row.event) - if (row.eventType === 'append' || row.eventType === 'rebase') { + if (row.eventType === 'append') { seqEvts.push({ type: 'commit', seq: row.seq, time: row.sequencedAt, evt: evt as CommitEvt, }) - } else if (row.eventType === 'handle') { + } else if (row.eventType === 'sync') { seqEvts.push({ - type: 'handle', + type: 'sync', seq: row.seq, time: row.sequencedAt, - evt: evt as HandleEvt, + evt: evt as SyncEvt, }) } else if (row.eventType === 'identity') { seqEvts.push({ @@ -163,13 +161,6 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { time: row.sequencedAt, evt: evt as AccountEvt, }) - } else if (row.eventType === 'tombstone') { - seqEvts.push({ - type: 'tombstone', - seq: row.seq, - time: row.sequencedAt, - evt: evt as TombstoneEvt, - }) } } @@ -222,8 +213,8 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { return await this.sequenceEvt(evt) } - async sequenceHandleUpdate(did: string, handle: string): Promise { - const evt = await formatSeqHandleUpdate(did, handle) + async sequenceSyncEvt(did: string, data: SyncEvtData) { + const evt = await formatSeqSyncEvt(did, data) return await this.sequenceEvt(evt) } @@ -240,11 +231,6 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { return await this.sequenceEvt(evt) } - async sequenceTombstone(did: string): Promise { - const evt = await formatSeqTombstone(did) - return await this.sequenceEvt(evt) - } - async deleteAllForUser(did: string, excludingSeqs: number[] = []) { await this.db.executeWithRetry( this.db.db diff --git a/packages/pds/tests/account-deletion.test.ts b/packages/pds/tests/account-deletion.test.ts index b2fe0a41143..be0d048ac70 100644 --- a/packages/pds/tests/account-deletion.test.ts +++ b/packages/pds/tests/account-deletion.test.ts @@ -151,14 +151,12 @@ describe('account deletion', () => { expect( updatedDbContents.repoSeqs .filter((row) => row.did === carol.did) - .every( - (row) => row.eventType === 'tombstone' || row.eventType === 'account', - ), + .every((row) => row.eventType === 'account'), ).toBe(true) - // check we do have a tombstone for this did + // check we do have a account (deletion) event for this did expect( updatedDbContents.repoSeqs.filter( - (row) => row.did === carol.did && row.eventType === 'tombstone', + (row) => row.did === carol.did && row.eventType === 'account', ).length, ).toEqual(1) expect(updatedDbContents.appPasswords).toEqual( diff --git a/packages/pds/tests/sequencer.test.ts b/packages/pds/tests/sequencer.test.ts index b8e64fcd08c..4be1e57593b 100644 --- a/packages/pds/tests/sequencer.test.ts +++ b/packages/pds/tests/sequencer.test.ts @@ -7,9 +7,8 @@ import { import { randomStr } from '@atproto/crypto' import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env' import { readCarWithRoot } from '@atproto/repo' -import { repoPrepare, sequencer } from '../../pds' -import { ids } from '../src/lexicon/lexicons' -import { SeqEvt, Sequencer, formatSeqCommit } from '../src/sequencer' +import { sequencer } from '../../pds' +import { SeqEvt, Sequencer, formatSeqSyncEvt } from '../src/sequencer' import { Outbox } from '../src/sequencer/outbox' import userSeed from './seeds/users' @@ -220,35 +219,27 @@ describe('sequencer', () => { lastSeen = results[0].at(-1)?.seq ?? lastSeen }) - it('root block must be returned in tooBig seq commit', async () => { - // Create good records to exceed the event limit (the current limit is 200 events) - // it creates events completely locally, so it doesn't need to be in the network - const eventsToCreate = 250 - const createPostRecord = () => - repoPrepare.prepareCreate({ - did: sc.dids.alice, - collection: ids.AppBskyFeedPost, - record: { text: 'valid', createdAt: new Date().toISOString() }, - }) - const writesPromises = Array.from( - { length: eventsToCreate }, - createPostRecord, - ) - const writes = await Promise.all(writesPromises) - // just format commit without processing writes - const writeCommit = await network.pds.ctx.actorStore.transact( + it('root block must be returned in sync event', async () => { + const syncData = await network.pds.ctx.actorStore.read( sc.dids.alice, - (store) => store.repo.formatCommit(writes), + async (store) => { + const root = await store.repo.storage.getRootDetailed() + const { blocks } = await store.repo.storage.getBlocks([root.cid]) + return { + cid: root.cid, + rev: root.rev, + blocks, + } + }, ) - const repoSeqInsert = await formatSeqCommit(sc.dids.alice, writeCommit) - - const evt = cborDecode(repoSeqInsert.event) - expect(evt.tooBig).toBe(true) - + const dbEvt = await formatSeqSyncEvt(sc.dids.alice, syncData) + const evt = cborDecode(dbEvt.event) + expect(evt.did).toBe(sc.dids.alice) const car = await readCarWithRoot(evt.blocks) - expect(car.root.toString()).toBe(writeCommit.cid.toString()) + expect(car.root.toString()).toBe(syncData.cid.toString()) // in the case of tooBig, the blocks must contain the root block only expect(car.blocks.size).toBe(1) + expect(car.blocks.has(syncData.cid)).toBeTruthy() }) }) diff --git a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index 1fe81c7fa03..dfceac52cd4 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -18,9 +18,8 @@ import { AccountStatus } from '../../src/account-manager' import { Account as AccountEvt, Commit as CommitEvt, - Handle as HandleEvt, Identity as IdentityEvt, - Tombstone as TombstoneEvt, + Sync as SyncEvt, } from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' import basicSeed from '../seeds/basic' @@ -74,10 +73,12 @@ describe('repo subscribe repos', () => { if ( (frame.header.t === '#commit' && (frame.body as CommitEvt).repo === userDid) || - (frame.header.t === '#handle' && - (frame.body as HandleEvt).did === userDid) || - (frame.header.t === '#tombstone' && - (frame.body as TombstoneEvt).did === userDid) + (frame.header.t === '#sync' && + (frame.body as SyncEvt).did === userDid) || + (frame.header.t === '#identity' && + (frame.body as IdentityEvt).did === userDid) || + (frame.header.t === '#account' && + (frame.body as AccountEvt).did === userDid) ) { types.push(frame.body) } @@ -96,6 +97,10 @@ describe('repo subscribe repos', () => { return evts } + const getSyncEvts = (frames: Frame[]): SyncEvt[] => { + return getEventType(frames, '#sync') + } + const getAccountEvts = (frames: Frame[]): AccountEvt[] => { return getEventType(frames, '#account') } @@ -104,14 +109,6 @@ describe('repo subscribe repos', () => { return getEventType(frames, '#identity') } - const getHandleEvts = (frames: Frame[]): HandleEvt[] => { - return getEventType(frames, '#handle') - } - - const getTombstoneEvts = (frames: Frame[]): TombstoneEvt[] => { - return getEventType(frames, '#tombstone') - } - const getCommitEvents = (frames: Frame[]): CommitEvt[] => { return getEventType(frames, '#commit') } @@ -127,13 +124,6 @@ describe('repo subscribe repos', () => { expect(evt.handle).toEqual(handle) } - const verifyHandleEvent = (evt: HandleEvt, did: string, handle: string) => { - expect(typeof evt.seq).toBe('number') - expect(evt.did).toBe(did) - expect(evt.handle).toBe(handle) - expect(typeof evt.time).toBe('string') - } - const verifyAccountEvent = ( evt: AccountEvt, did: string, @@ -147,10 +137,20 @@ describe('repo subscribe repos', () => { expect(evt.status).toBe(status) } - const verifyTombstoneEvent = (evt: unknown, did: string) => { - expect(evt?.['did']).toBe(did) - expect(typeof evt?.['time']).toBe('string') - expect(typeof evt?.['seq']).toBe('number') + const verifySyncEvent = async ( + evt: SyncEvt, + did: string, + commit: CID, + rev: string, + ) => { + expect(typeof evt.seq).toBe('number') + expect(evt.did).toBe(did) + expect(typeof evt.time).toBe('string') + expect(evt.rev).toBe(rev) + const car = await repo.readCarWithRoot(evt.blocks) + expect(car.root.equals(commit)).toBe(true) + expect(car.blocks.size).toBe(1) + expect(car.blocks.has(car.root)).toBe(true) } const verifyCommitEvents = async (frames: Frame[]) => { @@ -329,7 +329,7 @@ describe('repo subscribe repos', () => { } }) - it('syncs handle changes', async () => { + it('syncs handle changes (identity evts)', async () => { await sc.updateHandle(alice, 'alice2.test') await sc.updateHandle(bob, 'bob2.test') await sc.updateHandle(bob, 'bob2.test') // idempotent update re-sends @@ -344,20 +344,14 @@ describe('repo subscribe repos', () => { await verifyCommitEvents(evts) - const handleEvts = getHandleEvts(evts.slice(-6)) - expect(handleEvts.length).toBe(3) - verifyHandleEvent(handleEvts[0], alice, 'alice2.test') - verifyHandleEvent(handleEvts[1], bob, 'bob2.test') - verifyHandleEvent(handleEvts[2], bob, 'bob2.test') - - const identityEvts = getIdentityEvts(evts.slice(-6)) + const identityEvts = getIdentityEvts(evts.slice(-3)) expect(identityEvts.length).toBe(3) verifyIdentityEvent(identityEvts[0], alice, 'alice2.test') verifyIdentityEvent(identityEvts[1], bob, 'bob2.test') verifyIdentityEvent(identityEvts[2], bob, 'bob2.test') }) - it('resends handle events on idempotent updates', async () => { + it('resends identity events on idempotent updates', async () => { const update = sc.updateHandle(bob, 'bob2.test') const ws = new WebSocket( @@ -368,8 +362,8 @@ describe('repo subscribe repos', () => { const evts = await readTillCaughtUp(gen, update) ws.terminate() - const handleEvts = getHandleEvts(evts.slice(-2)) - verifyHandleEvent(handleEvts[0], bob, 'bob2.test') + const identityEvts = getIdentityEvts(evts.slice(-1)) + verifyIdentityEvent(identityEvts[0], bob, 'bob2.test') }) it('syncs account events', async () => { @@ -487,7 +481,35 @@ describe('repo subscribe repos', () => { verifyAccountEvent(accountEvts[3], alice, true) }) - it('syncs tombstones', async () => { + it('emits sync event on account activation', async () => { + await agent.api.com.atproto.server.deactivateAccount( + {}, + { + encoding: 'application/json', + headers: sc.getHeaders(alice), + }, + ) + await agent.api.com.atproto.server.activateAccount(undefined, { + headers: sc.getHeaders(alice), + }) + + const ws = new WebSocket( + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, + ) + + const gen = byFrame(ws) + const evts = await readTillCaughtUp(gen) + ws.terminate() + + const syncEvts = getSyncEvts(evts.slice(-1)) + expect(syncEvts.length).toBe(1) + const root = await ctx.actorStore.read(alice, (store) => + store.repo.storage.getRootDetailed(), + ) + await verifySyncEvent(syncEvts[0], alice, root.cid, root.rev) + }) + + it('syncs account deletions (account evt)', async () => { const baddie1 = ( await sc.createAccount('baddie1.test', { email: 'baddie1@test.com', @@ -529,12 +551,7 @@ describe('repo subscribe repos', () => { const evts = await readTillCaughtUp(gen) ws.terminate() - const tombstoneEvts = getTombstoneEvts(evts.slice(-4)) - expect(tombstoneEvts.length).toBe(2) - verifyTombstoneEvent(tombstoneEvts[0], baddie1) - verifyTombstoneEvent(tombstoneEvts[1], baddie2) - - const accountEvts = getAccountEvts(evts.slice(-4)) + const accountEvts = getAccountEvts(evts.slice(-2)) expect(accountEvts.length).toBe(2) verifyAccountEvent(accountEvts[0], baddie1, false, AccountStatus.Deleted) verifyAccountEvent(accountEvts[1], baddie2, false, AccountStatus.Deleted) @@ -571,7 +588,12 @@ describe('repo subscribe repos', () => { const didEvts = getAllEvents(baddie3, evts) expect(didEvts.length).toBe(1) - verifyTombstoneEvent(didEvts[0], baddie3) + verifyAccountEvent( + didEvts[0] as AccountEvt, + baddie3, + false, + AccountStatus.Deleted, + ) }) it('sends info frame on out of date cursor', async () => {