Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions convex/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ crons.interval(

crons.interval(
'skill-stat-events',
{ minutes: 5 },
internal.skillStatEvents.processSkillStatEventsInternal,
{ batchSize: 100 },
{ minutes: 15 },
internal.skillStatEvents.processSkillStatEventsAction,
{},
)

export default crons
7 changes: 7 additions & 0 deletions convex/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ const skillStatEvents = defineTable({
.index('by_unprocessed', ['processedAt'])
.index('by_skill', ['skillId'])

// Cursor table for incremental skill-stat-event processing.
// One row per logical stream, keyed by a string (currently only
// 'skill_stat_events' — see CURSOR_KEY in convex/skillStatEvents.ts).
const skillStatUpdateCursors = defineTable({
// Logical stream identifier; looked up via the by_key index below.
key: v.string(),
// _creationTime high-water mark of the last processed event.
// Optional: absent until the first processing run completes.
cursorCreationTime: v.optional(v.number()),
// Wall-clock time of the last cursor write (Date.now()).
updatedAt: v.number(),
}).index('by_key', ['key'])

const soulEmbeddings = defineTable({
soulId: v.id('souls'),
versionId: v.id('soulVersions'),
Expand Down Expand Up @@ -450,6 +456,7 @@ export default defineSchema({
skillLeaderboards,
skillStatBackfillState,
skillStatEvents,
skillStatUpdateCursors,
comments,
skillReports,
soulComments,
Expand Down
8 changes: 8 additions & 0 deletions convex/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ export const searchSkills: ReturnType<typeof action> = action({
},
})

/**
 * Internal query: resolve badge maps for a set of skills.
 *
 * Maps are not serializable across Convex function boundaries, so the
 * result is flattened into an array of [skillId, badgeMap] pairs that
 * the caller can rebuild into a Map if needed.
 */
export const getBadgeMapsForSkills = internalQuery({
  args: { skillIds: v.array(v.id('skills')) },
  handler: async (ctx, args): Promise<Array<[Id<'skills'>, SkillBadgeMap]>> => {
    const badgesBySkill = await getSkillBadgeMaps(ctx, args.skillIds)
    return [...badgesBySkill.entries()]
  },
})

export const hydrateResults = internalQuery({
args: { embeddingIds: v.array(v.id('skillEmbeddings')) },
handler: async (ctx, args): Promise<HydratedEntry[]> => {
Expand Down
290 changes: 289 additions & 1 deletion convex/skillStatEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { v } from 'convex/values'
import { internal } from './_generated/api'
import type { Doc, Id } from './_generated/dataModel'
import type { MutationCtx } from './_generated/server'
import { internalMutation } from './_generated/server'
import { internalAction, internalMutation, internalQuery } from './_generated/server'
import { applySkillStatDeltas, bumpDailySkillStats } from './lib/skillStats'

/**
Expand Down Expand Up @@ -278,3 +278,291 @@ export const processSkillStatEventsInternal = internalMutation({
return { processed: events.length }
},
})

// ============================================================================
// Action-based processing (cursor-based, runs outside transaction window)
// ============================================================================

const CURSOR_KEY = 'skill_stat_events'
const EVENT_BATCH_SIZE = 500
const MAX_SKILLS_PER_RUN = 500

/**
 * Fetch a batch of events at or after the given cursor (by _creationTime).
 * Returns events sorted by _creationTime ascending.
 *
 * Uses .gte() (greater than or equal) rather than .gt(): multiple events can
 * share a _creationTime (same-millisecond writes), and a strictly-greater
 * cursor would permanently skip events whose timestamp group was split by
 * take(limit) at a batch boundary.
 *
 * Tradeoff: events exactly at the cursor timestamp may be re-fetched after
 * they were already applied, which double-counts their deltas. That is rarer
 * and less harmful than silent permanent loss; callers that cannot tolerate
 * any double application should additionally dedupe by _id at the boundary
 * timestamp (TODO: consider persisting boundary event ids with the cursor).
 */
export const getUnprocessedEventBatch = internalQuery({
  args: {
    // _creationTime to resume from; undefined means "from the beginning".
    cursorCreationTime: v.optional(v.number()),
    // Max events to return; defaults to EVENT_BATCH_SIZE.
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const limit = args.limit ?? EVENT_BATCH_SIZE
    const cursor = args.cursorCreationTime

    // Query events at or after the cursor using the built-in creation time
    // index; with no cursor, scan from the start of the table.
    const events = await ctx.db
      .query('skillStatEvents')
      .withIndex('by_creation_time', (q) =>
        cursor !== undefined ? q.gte('_creationTime', cursor) : q,
      )
      .take(limit)
    return events
  },
})

/**
 * Read the persisted processing cursor for skill stat events.
 *
 * Returns the stored _creationTime high-water mark, or undefined when no
 * cursor row exists yet (i.e. processing has never completed a run).
 */
export const getStatEventCursor = internalQuery({
  args: {},
  handler: async (ctx) => {
    const row = await ctx.db
      .query('skillStatUpdateCursors')
      .withIndex('by_key', (q) => q.eq('key', CURSOR_KEY))
      .unique()
    return row?.cursorCreationTime
  },
})

/**
 * Validator for skill deltas passed to the mutation.
 * One entry per skill, carrying signed counter deltas aggregated across all
 * of that skill's events in the batch, plus per-event timestamps needed to
 * bump the correct day's stats.
 */
const skillDeltaValidator = v.object({
skillId: v.id('skills'),
// Signed deltas — may be negative (e.g. unstar, install_deactivate).
downloads: v.number(),
stars: v.number(),
installsAllTime: v.number(),
installsCurrent: v.number(),
// occurredAt timestamps, one per download / new-install event, so daily
// stats land in the day each event actually happened.
downloadEvents: v.array(v.number()),
installNewEvents: v.array(v.number()),
})

/**
 * Apply aggregated stats to skills and update the cursor.
 *
 * Runs as one atomic mutation so that the skill counter updates, the daily
 * stat bumps, and the cursor advance all commit together (or not at all):
 * 1. Patches each affected skill with its aggregated deltas
 * 2. Bumps daily stats for trending
 * 3. Upserts the cursor row to the new position
 */
export const applyAggregatedStatsAndUpdateCursor = internalMutation({
  args: {
    skillDeltas: v.array(skillDeltaValidator),
    newCursor: v.number(),
  },
  handler: async (ctx, args) => {
    const now = Date.now()

    for (const delta of args.skillDeltas) {
      const skill = await ctx.db.get(delta.skillId)
      // Skill was deleted since the events were recorded — nothing to update.
      if (!skill) {
        continue
      }

      const hasCounterChange =
        delta.downloads !== 0 ||
        delta.stars !== 0 ||
        delta.installsAllTime !== 0 ||
        delta.installsCurrent !== 0

      // Only patch the document when at least one counter actually moved.
      if (hasCounterChange) {
        const patch = applySkillStatDeltas(skill, {
          downloads: delta.downloads,
          stars: delta.stars,
          installsAllTime: delta.installsAllTime,
          installsCurrent: delta.installsCurrent,
        })
        await ctx.db.patch(skill._id, { ...patch, updatedAt: now })
      }

      // Daily buckets feed trending/leaderboards: bump once per event, at the
      // event's own occurredAt, so each lands in the correct day.
      for (const occurredAt of delta.downloadEvents) {
        await bumpDailySkillStats(ctx, { skillId: delta.skillId, now: occurredAt, downloads: 1 })
      }
      for (const occurredAt of delta.installNewEvents) {
        await bumpDailySkillStats(ctx, { skillId: delta.skillId, now: occurredAt, installs: 1 })
      }
    }

    // Upsert the cursor row for this stream.
    const cursorRow = await ctx.db
      .query('skillStatUpdateCursors')
      .withIndex('by_key', (q) => q.eq('key', CURSOR_KEY))
      .unique()

    if (cursorRow) {
      await ctx.db.patch(cursorRow._id, {
        cursorCreationTime: args.newCursor,
        updatedAt: now,
      })
    } else {
      await ctx.db.insert('skillStatUpdateCursors', {
        key: CURSOR_KEY,
        cursorCreationTime: args.newCursor,
        updatedAt: now,
      })
    }

    return { skillsUpdated: args.skillDeltas.length }
  },
})

/**
 * Action that processes skill stat events in batches outside the transaction window.
 *
 * Algorithm:
 * 1. Get current cursor position
 * 2. Fetch events in batches of 500, aggregating as we go
 * 3. Stop when we have >= 500 unique skills OR run out of events
 * 4. Call mutation to apply all deltas and update cursor atomically
 * 5. Self-schedule if we stopped due to skill limit (not exhaustion)
 */
export const processSkillStatEventsAction = internalAction({
args: {},
handler: async (ctx) => {
// Get current cursor position (convert null to undefined for consistency)
const cursorResult = await ctx.runQuery(internal.skillStatEvents.getStatEventCursor)
let cursor: number | undefined = cursorResult ?? undefined

console.log(`[STAT-AGG] Starting aggregation, cursor=${cursor ?? 'none'}`)

// Aggregated deltas per skill
const aggregatedBySkill = new Map<
Id<'skills'>,
{
downloads: number
stars: number
installsAllTime: number
installsCurrent: number
downloadEvents: number[]
installNewEvents: number[]
}
>()

// Highest event _creationTime seen so far; becomes the new persisted cursor.
let maxCreationTime: number | undefined = cursor
let exhausted = false
let totalEventsFetched = 0

// Fetch and aggregate until we have enough skills or run out of events.
// Capping by unique-skill count bounds the size of the apply mutation.
while (aggregatedBySkill.size < MAX_SKILLS_PER_RUN) {
const events = await ctx.runQuery(internal.skillStatEvents.getUnprocessedEventBatch, {
cursorCreationTime: cursor,
limit: EVENT_BATCH_SIZE,
})

if (events.length === 0) {
exhausted = true
break
}

totalEventsFetched += events.length
const skillsBefore = aggregatedBySkill.size

// Aggregate events into per-skill deltas
for (const event of events) {
let skillDelta = aggregatedBySkill.get(event.skillId)
if (!skillDelta) {
skillDelta = {
downloads: 0,
stars: 0,
installsAllTime: 0,
installsCurrent: 0,
downloadEvents: [],
installNewEvents: [],
}
aggregatedBySkill.set(event.skillId, skillDelta)
}

// Apply event to aggregated deltas.
// Counter deltas are signed; occurredAt timestamps are collected for
// download / install_new so daily stats hit the correct day later.
switch (event.kind) {
case 'download':
skillDelta.downloads += 1
skillDelta.downloadEvents.push(event.occurredAt)
break
case 'star':
skillDelta.stars += 1
break
case 'unstar':
skillDelta.stars -= 1
break
case 'install_new':
skillDelta.installsAllTime += 1
skillDelta.installsCurrent += 1
skillDelta.installNewEvents.push(event.occurredAt)
break
case 'install_reactivate':
skillDelta.installsCurrent += 1
break
case 'install_deactivate':
skillDelta.installsCurrent -= 1
break
case 'install_clear':
// install_clear carries a precomputed delta payload (may be absent).
if (event.delta) {
skillDelta.installsAllTime += event.delta.allTime
skillDelta.installsCurrent += event.delta.current
}
break
}

// Track highest _creationTime seen
if (maxCreationTime === undefined || event._creationTime > maxCreationTime) {
maxCreationTime = event._creationTime
}
}

// Update cursor for next batch fetch
// NOTE(review): if take(limit) cut in the middle of a group of events
// sharing this _creationTime, a strictly-greater (.gt) fetch in
// getUnprocessedEventBatch will skip the remainder of that group — see
// the review comment below for the proposed .gte() fix and its tradeoff.
cursor = events[events.length - 1]._creationTime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Events with the same _creationTime can be lost due to cursor update logic. The code updates the working cursor to the last event's creation time, but when events share the same creation time timestamp, the next run will skip them because the query uses .gt() (strictly greater than) instead of inclusive comparison.

View Details
📝 Patch Details
diff --git a/convex/skillStatEvents.ts b/convex/skillStatEvents.ts
index 03d6722..03f2ddc 100644
--- a/convex/skillStatEvents.ts
+++ b/convex/skillStatEvents.ts
@@ -290,6 +290,15 @@ const MAX_SKILLS_PER_RUN = 500
 /**
  * Fetch a batch of events after the given cursor (by _creationTime).
  * Returns events sorted by _creationTime ascending.
+ *
+ * Uses .gte() (greater than or equal) instead of .gt() to ensure we don't skip
+ * events that share the same _creationTime as the cursor. Since events can be
+ * created within the same millisecond, using .gt() would lose events at the
+ * boundary when batches are split by skill limits.
+ *
+ * To avoid reprocessing: callers must use cursorCreationTime from a previous
+ * complete run (exhausted = true), or must be prepared to re-aggregate events
+ * with the same skillId (which is safe since aggregation is idempotent).
  */
 export const getUnprocessedEventBatch = internalQuery({
   args: {
@@ -300,11 +309,13 @@ export const getUnprocessedEventBatch = internalQuery({
     const limit = args.limit ?? EVENT_BATCH_SIZE
     const cursor = args.cursorCreationTime
 
-    // Query events after the cursor using the built-in creation time index
+    // Query events at or after the cursor using the built-in creation time index.
+    // We use .gte() (>=) instead of .gt() (>) to handle the case where multiple
+    // events share the same _creationTime but are split across batch boundaries.
     const events = await ctx.db
       .query('skillStatEvents')
       .withIndex('by_creation_time', (q) =>
-        cursor !== undefined ? q.gt('_creationTime', cursor) : q,
+        cursor !== undefined ? q.gte('_creationTime', cursor) : q,
       )
       .take(limit)
     return events

Analysis

Cursor-based pagination loses events with duplicate _creationTime timestamps

What fails: The processSkillStatEventsAction function in convex/skillStatEvents.ts can permanently lose unprocessed skill stat events when multiple events share the same _creationTime value and batch processing is interrupted by the skill limit.

How to reproduce:

  1. Insert stat events where multiple events have the same _creationTime timestamp (possible in Convex since events can be created within the same millisecond)
  2. Ensure enough events exist at that timestamp to span across batch boundaries
  3. Configure MAX_SKILLS_PER_RUN small enough that processing stops mid-batch (e.g., 2-3 unique skills)
  4. Run the action - it processes the first batch and hits the skill limit
  5. The cursor is saved as the maximum _creationTime seen
  6. Run the action again - the query uses .gt() (strictly greater than) to fetch the next batch
  7. Events with _creationTime equal to the saved cursor are permanently skipped

Result: Events that were never processed are lost, resulting in permanently missing stat updates for those events.

Expected: All events should be processed exactly once, even when they share the same _creationTime timestamp.

Root cause: The getUnprocessedEventBatch query at line 307 uses .gt('_creationTime', cursor) which performs a strictly greater-than comparison. When events are batched and the batch boundary falls in the middle of a group of events with the same _creationTime, subsequent queries will skip those boundary events.

Fix applied: Changed the query operator from .gt() to .gte() (greater than or equal) in the getUnprocessedEventBatch query function. This ensures events at the cursor timestamp are included in subsequent batches. Note that the aggregation is not strictly idempotent — re-fetching an already-applied boundary event increments its counters a second time — so this change trades rare permanent event loss for rare double-counting of events at the cursor timestamp. A follow-up could eliminate both by deduplicating boundary events by _id (e.g. persisting the ids of processed events at the cursor timestamp alongside the cursor).

See: Convex index documentation on comparison operators - .gte() performs greater-than-or-equal comparison as opposed to .gt()'s strictly greater-than behavior.


console.log(
`[STAT-AGG] Fetched ${events.length} events, ${aggregatedBySkill.size - skillsBefore} new skills (${aggregatedBySkill.size} total)`,
)

// If we got fewer than requested, we've exhausted the events
if (events.length < EVENT_BATCH_SIZE) {
exhausted = true
break
}
}

// If we have nothing to process, we're done
// NOTE(review): this early return includes a `processed` field that the
// final return below omits — confirm callers don't rely on it.
if (aggregatedBySkill.size === 0 || maxCreationTime === undefined) {
console.log('[STAT-AGG] No events to process, done')
return { processed: 0, skillsUpdated: 0, exhausted: true }
}

// Convert map to array for mutation
const skillDeltas = Array.from(aggregatedBySkill.entries()).map(([skillId, delta]) => ({
skillId,
...delta,
}))

console.log(
`[STAT-AGG] Running mutation for ${skillDeltas.length} skills (${totalEventsFetched} total events)`,
)

// Apply all deltas and update cursor atomically — a single mutation so the
// stat writes and the cursor advance commit together.
await ctx.runMutation(internal.skillStatEvents.applyAggregatedStatsAndUpdateCursor, {
skillDeltas,
newCursor: maxCreationTime,
})

// Self-schedule if we stopped because of skill limit, not exhaustion.
// runAfter(0) continues immediately in a fresh action invocation.
if (!exhausted) {
console.log('[STAT-AGG] More events remaining, self-scheduling')
await ctx.scheduler.runAfter(0, internal.skillStatEvents.processSkillStatEventsAction, {})
} else {
console.log('[STAT-AGG] All events processed, done')
}

return {
skillsUpdated: skillDeltas.length,
exhausted,
}
},
})
Loading