Skip to content

Commit

Permalink
Notification cursor delay (#3573)
Browse files Browse the repository at this point in the history
* Use ISO 8601 datetime string as notif list cursor

* Refactor pagination functions to methods

* Implement configurable notification delay

* Add comment

* Apply PR suggestions

* Implement suggestions

* Properly synchronize notification delay test

* Simplify date validation logic
  • Loading branch information
rafaelbsky authored Feb 28, 2025
1 parent dc6e4ec commit be80036
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 63 deletions.
34 changes: 28 additions & 6 deletions packages/bsky/src/api/app/bsky/notification/listNotifications.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { mapDefined } from '@atproto/common'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { ServerConfig } from '../../../../config'
import { AppContext } from '../../../../context'
import { HydrateCtx, Hydrator } from '../../../../hydration/hydrator'
import { Server } from '../../../../lexicon'
Expand All @@ -15,7 +16,7 @@ import {
import { Notification } from '../../../../proto/bsky_pb'
import { uriToDid as didFromUri } from '../../../../util/uris'
import { Views } from '../../../../views'
import { clearlyBadCursor, resHeaders } from '../../../util'
import { resHeaders } from '../../../util'

export default function (server: Server, ctx: AppContext) {
const listNotifications = createPipeline(
Expand Down Expand Up @@ -93,24 +94,44 @@ const paginateNotifications = async (opts: {
}
}

/**
* Applies a configurable delay to the datetime string of a cursor,
* effectively allowing for a delay on listing the notifications.
* This is useful to allow time for services to process notifications
* before they are listed to the user.
*/
export const delayCursor = (
cursorStr: string | undefined,
delayMs: number,
): string => {
const nowMinusDelay = Date.now() - delayMs
if (cursorStr === undefined) return new Date(nowMinusDelay).toISOString()
const cursor = new Date(cursorStr).getTime()
if (isNaN(cursor)) return cursorStr
return new Date(Math.min(cursor, nowMinusDelay)).toISOString()
}

const skeleton = async (
input: SkeletonFnInput<Context, Params>,
): Promise<SkeletonState> => {
const { params, ctx } = input
if (params.seenAt) {
throw new InvalidRequestError('The seenAt parameter is unsupported')
}

const originalCursor = params.cursor
const delayedCursor = delayCursor(
originalCursor,
ctx.cfg.notificationsDelayMs,
)
const viewer = params.hydrateCtx.viewer
const priority = params.priority ?? (await getPriority(ctx, viewer))
if (clearlyBadCursor(params.cursor)) {
return { notifs: [], priority }
}
const [res, lastSeenRes] = await Promise.all([
paginateNotifications({
ctx,
priority,
reasons: params.reasons,
cursor: params.cursor,
cursor: delayedCursor,
limit: params.limit,
viewer,
}),
Expand All @@ -122,7 +143,7 @@ const skeleton = async (
// @NOTE for the first page of results if there's no last-seen time, consider top notification unread
// rather than all notifications. bit of a hack to be more graceful when seen times are out of sync.
let lastSeenDate = lastSeenRes.timestamp?.toDate()
if (!lastSeenDate && !params.cursor) {
if (!lastSeenDate && !originalCursor) {
lastSeenDate = res.notifications.at(0)?.timestamp?.toDate()
}
return {
Expand Down Expand Up @@ -210,6 +231,7 @@ const presentation = (
type Context = {
hydrator: Hydrator
views: Views
cfg: ServerConfig
}

type Params = QueryParams & {
Expand Down
11 changes: 11 additions & 0 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export interface ServerConfigValues {
bigThreadUris: Set<string>
bigThreadDepth?: number
maxThreadDepth?: number
// notifications
notificationsDelayMs?: number
// client config
clientCheckEmailConfirmed?: boolean
topicsEnabled?: boolean
Expand Down Expand Up @@ -170,6 +172,10 @@ export class ServerConfig {
? parseInt(process.env.BSKY_MAX_THREAD_DEPTH || '', 10)
: undefined

const notificationsDelayMs = process.env.BSKY_NOTIFICATIONS_DELAY_MS
? parseInt(process.env.BSKY_NOTIFICATIONS_DELAY_MS || '', 10)
: 0

const disableSsrfProtection = process.env.BSKY_DISABLE_SSRF_PROTECTION
? process.env.BSKY_DISABLE_SSRF_PROTECTION === 'true'
: debugMode
Expand Down Expand Up @@ -231,6 +237,7 @@ export class ServerConfig {
bigThreadUris,
bigThreadDepth,
maxThreadDepth,
notificationsDelayMs,
disableSsrfProtection,
proxyAllowHTTP2,
proxyHeadersTimeout,
Expand Down Expand Up @@ -426,6 +433,10 @@ export class ServerConfig {
return this.cfg.maxThreadDepth
}

get notificationsDelayMs() {
return this.cfg.notificationsDelayMs ?? 0
}

get disableSsrfProtection(): boolean {
return this.cfg.disableSsrfProtection ?? false
}
Expand Down
193 changes: 158 additions & 35 deletions packages/bsky/src/data-plane/server/db/pagination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { sql } from 'kysely'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AnyQb, DbRef } from './util'

export type Cursor = { primary: string; secondary: string }
export type LabeledResult = {
type KeysetCursor = { primary: string; secondary: string }
type KeysetLabeledResult = {
primary: string | number
secondary: string | number
}
Expand All @@ -22,14 +22,14 @@ export type LabeledResult = {
* Result -*-> LabeledResult <-*-> Cursor <--> packed/string cursor
* ↳ SQL Condition
*/
export abstract class GenericKeyset<R, LR extends LabeledResult> {
export abstract class GenericKeyset<R, LR extends KeysetLabeledResult> {
constructor(
public primary: DbRef,
public secondary: DbRef,
) {}
abstract labelResult(result: R): LR
abstract labeledResultToCursor(labeled: LR): Cursor
abstract cursorToLabeledResult(cursor: Cursor): LR
abstract labeledResultToCursor(labeled: LR): KeysetCursor
abstract cursorToLabeledResult(cursor: KeysetCursor): LR
packFromResult(results: R | R[]): string | undefined {
const result = Array.isArray(results) ? results.at(-1) : results
if (!result) return
Expand All @@ -45,11 +45,11 @@ export abstract class GenericKeyset<R, LR extends LabeledResult> {
if (!cursor) return
return this.cursorToLabeledResult(cursor)
}
packCursor(cursor?: Cursor): string | undefined {
packCursor(cursor?: KeysetCursor): string | undefined {
if (!cursor) return
return `${cursor.primary}__${cursor.secondary}`
}
unpackCursor(cursorStr?: string): Cursor | undefined {
unpackCursor(cursorStr?: string): KeysetCursor | undefined {
if (!cursorStr) return
const result = cursorStr.split('__')
const [primary, secondary, ...others] = result
Expand Down Expand Up @@ -79,10 +79,43 @@ export abstract class GenericKeyset<R, LR extends LabeledResult> {
}
}
}
paginate<QB extends AnyQb>(
qb: QB,
opts: {
limit?: number
cursor?: string
direction?: 'asc' | 'desc'
tryIndex?: boolean
// By default, pg does nullsFirst
nullsLast?: boolean
},
): QB {
const { limit, cursor, direction = 'desc', tryIndex, nullsLast } = opts
const keysetSql = this.getSql(this.unpack(cursor), direction, tryIndex)
return qb
.if(!!limit, (q) => q.limit(limit as number))
.if(!nullsLast, (q) =>
q.orderBy(this.primary, direction).orderBy(this.secondary, direction),
)
.if(!!nullsLast, (q) =>
q
.orderBy(
direction === 'asc'
? sql`${this.primary} asc nulls last`
: sql`${this.primary} desc nulls last`,
)
.orderBy(
direction === 'asc'
? sql`${this.secondary} asc nulls last`
: sql`${this.secondary} desc nulls last`,
),
)
.if(!!keysetSql, (qb) => (keysetSql ? qb.where(keysetSql) : qb)) as QB
}
}

type SortAtCidResult = { sortAt: string; cid: string }
type TimeCidLabeledResult = Cursor
type TimeCidLabeledResult = KeysetCursor

export class TimeCidKeyset<
TimeCidResult = SortAtCidResult,
Expand All @@ -97,7 +130,7 @@ export class TimeCidKeyset<
secondary: labeled.secondary,
}
}
cursorToLabeledResult(cursor: Cursor) {
cursorToLabeledResult(cursor: KeysetCursor) {
const primaryDate = new Date(parseInt(cursor.primary, 10))
if (isNaN(primaryDate.getTime())) {
throw new InvalidRequestError('Malformed cursor')
Expand Down Expand Up @@ -127,6 +160,9 @@ export class IndexedAtDidKeyset extends TimeCidKeyset<{
}
}

/**
* This is being deprecated. Use {@link GenericKeyset#paginate} instead.
*/
export const paginate = <
QB extends AnyQb,
K extends GenericKeyset<unknown, any>,
Expand All @@ -142,32 +178,119 @@ export const paginate = <
nullsLast?: boolean
},
): QB => {
const {
limit,
cursor,
keyset,
direction = 'desc',
tryIndex,
nullsLast,
} = opts
const keysetSql = keyset.getSql(keyset.unpack(cursor), direction, tryIndex)
return qb
.if(!!limit, (q) => q.limit(limit as number))
.if(!nullsLast, (q) =>
q.orderBy(keyset.primary, direction).orderBy(keyset.secondary, direction),
)
.if(!!nullsLast, (q) =>
q
.orderBy(
direction === 'asc'
? sql`${keyset.primary} asc nulls last`
: sql`${keyset.primary} desc nulls last`,
)
.orderBy(
return opts.keyset.paginate(qb, opts)
}

type SingleKeyCursor = {
primary: string
}

type SingleKeyLabeledResult = {
primary: string | number
}

/**
* GenericSingleKey is similar to {@link GenericKeyset} but for a single key cursor.
*/
export abstract class GenericSingleKey<R, LR extends SingleKeyLabeledResult> {
constructor(public primary: DbRef) {}
abstract labelResult(result: R): LR
abstract labeledResultToCursor(labeled: LR): SingleKeyCursor
abstract cursorToLabeledResult(cursor: SingleKeyCursor): LR
packFromResult(results: R | R[]): string | undefined {
const result = Array.isArray(results) ? results.at(-1) : results
if (!result) return
return this.pack(this.labelResult(result))
}
pack(labeled?: LR): string | undefined {
if (!labeled) return
const cursor = this.labeledResultToCursor(labeled)
return this.packCursor(cursor)
}
unpack(cursorStr?: string): LR | undefined {
const cursor = this.unpackCursor(cursorStr)
if (!cursor) return
return this.cursorToLabeledResult(cursor)
}
packCursor(cursor?: SingleKeyCursor): string | undefined {
if (!cursor) return
return cursor.primary
}
unpackCursor(cursorStr?: string): SingleKeyCursor | undefined {
if (!cursorStr) return
const result = cursorStr.split('__')
const [primary, ...others] = result
if (!primary || others.length > 0) {
throw new InvalidRequestError('Malformed cursor')
}
return {
primary,
}
}
getSql(labeled?: LR, direction?: 'asc' | 'desc') {
if (labeled === undefined) return
if (direction === 'asc') {
return sql`${this.primary} > ${labeled.primary}`
}
return sql`${this.primary} < ${labeled.primary}`
}
paginate<QB extends AnyQb>(
qb: QB,
opts: {
limit?: number
cursor?: string
direction?: 'asc' | 'desc'
// By default, pg does nullsFirst
nullsLast?: boolean
},
): QB {
const { limit, cursor, direction = 'desc', nullsLast } = opts
const keySql = this.getSql(this.unpack(cursor), direction)
return qb
.if(!!limit, (q) => q.limit(limit as number))
.if(!nullsLast, (q) => q.orderBy(this.primary, direction))
.if(!!nullsLast, (q) =>
q.orderBy(
direction === 'asc'
? sql`${keyset.secondary} asc nulls last`
: sql`${keyset.secondary} desc nulls last`,
? sql`${this.primary} asc nulls last`
: sql`${this.primary} desc nulls last`,
),
)
.if(!!keysetSql, (qb) => (keysetSql ? qb.where(keysetSql) : qb)) as QB
)
.if(!!keySql, (qb) => (keySql ? qb.where(keySql) : qb)) as QB
}
}

type SortAtResult = { sortAt: string }
type TimeLabeledResult = SingleKeyCursor

export class IsoTimeKey<TimeResult = SortAtResult> extends GenericSingleKey<
TimeResult,
TimeLabeledResult
> {
labelResult(result: TimeResult): TimeLabeledResult
labelResult<TimeResult extends SortAtResult>(result: TimeResult) {
return { primary: result.sortAt }
}
labeledResultToCursor(labeled: TimeLabeledResult) {
return {
primary: new Date(labeled.primary).toISOString(),
}
}
cursorToLabeledResult(cursor: SingleKeyCursor) {
const primaryDate = new Date(cursor.primary)
if (isNaN(primaryDate.getTime())) {
throw new InvalidRequestError('Malformed cursor')
}
return {
primary: primaryDate.toISOString(),
}
}
}

export class IsoSortAtKey extends IsoTimeKey<{
sortAt: string
}> {
labelResult(result: { sortAt: string }) {
return { primary: result.sortAt }
}
}
Loading

0 comments on commit be80036

Please sign in to comment.