diff --git a/convex/cleanup.ts b/convex/cleanup.ts index 5fcfcd655..3ef9b914f 100644 --- a/convex/cleanup.ts +++ b/convex/cleanup.ts @@ -1,7 +1,7 @@ import { v } from "convex/values"; import { internalMutation, internalQuery, type MutationCtx } from "./_generated/server"; import { internal } from "./_generated/api"; -import type { Id } from "./_generated/dataModel"; +import type { Doc, Id } from "./_generated/dataModel"; const delayInMs = parseFloat(process.env.DEBUG_FILE_CLEANUP_DELAY_MS ?? "500"); const debugFileCleanupBatchSize = parseInt(process.env.DEBUG_FILE_CLEANUP_BATCH_SIZE ?? "100"); @@ -363,3 +363,142 @@ export const getReferencedFiles = internalQuery({ } }, }); + +const maxMembersToProcess = 1000; +export const deleteDataForDeletedMembers = internalMutation({ + args: {}, + handler: async (ctx) => { + const deletedMembers = await ctx.db + .query("convexMembers") + .withIndex("byIsDeletedAccount", (q) => q.eq("isDeletedAccount", true)) + .take(maxMembersToProcess); + + console.log(`Processing ${deletedMembers.length} soft-deleted members`); + + for (const deletedMember of deletedMembers) { + await ctx.scheduler.runAfter(calculateJitteredDelay(50, 0.2), internal.cleanup.deleteDataForMember, { + memberId: deletedMember._id, + }); + } + }, +}); + +export const deleteDataForMember = internalMutation({ + args: { memberId: v.id("convexMembers") }, + handler: async (ctx, { memberId }) => { + const member: Doc<"convexMembers"> = (await ctx.db.get(memberId)) as Doc<"convexMembers">; + if (!member) { + throw new Error(`Member ${memberId} not found`); + } + if (!member.isDeletedAccount) { + throw new Error(`Member ${memberId} is not soft deleted`); + } + console.log(`Processing soft-deleted member ${memberId}`); + const sessions = await ctx.db + .query("sessions") + .withIndex("byMemberId", (q) => q.eq("memberId", memberId)) + .collect(); + + if (sessions.length === 0) { + console.log(`No sessions found for member ${memberId}, deleting member data`); + const projectCredentials = await ctx.db + .query("convexProjectCredentials") + .withIndex("byMemberId", (q) => q.eq("memberId", memberId)) + .collect(); + for (const projectCredential of projectCredentials) { + await ctx.db.delete(projectCredential._id); + } + const openaiTokens = await ctx.db + .query("memberOpenAITokens") + .withIndex("byMemberId", (q) => q.eq("memberId", memberId)) + .collect(); + for (const openaiToken of openaiTokens) { + await ctx.db.delete(openaiToken._id); + } + + const resendTokens = await ctx.db + .query("resendTokens") + .withIndex("byMemberId", (q) => q.eq("memberId", memberId)) + .collect(); + for (const resendToken of resendTokens) { + await ctx.db.delete(resendToken._id); + } + + const convexAdmins = await ctx.db + .query("convexAdmins") + .withIndex("byConvexMemberId", (q) => q.eq("memberId", memberId)) + .collect(); + for (const convexAdmin of convexAdmins) { + await ctx.db.delete(convexAdmin._id); + } + + await ctx.db.delete(memberId); + return; + } + + for (const session of sessions) { + const chats = await ctx.db + .query("chats") + .withIndex("byCreatorAndId", (q) => q.eq("creatorId", session._id)) + .collect(); + + if (chats.length === 0) { + console.log(`No chats found for session ${session._id}, deleting session`); + await ctx.db.delete(session._id); + return; + } + + for (const chat of chats) { + ctx.scheduler.runAfter(calculateJitteredDelay(50, 0.2), internal.cleanup.deleteChat, { + chatId: chat._id, + }); + } + } + }, +}); + +export const deleteChat = internalMutation({ + args: { chatId: v.id("chats") }, + handler: async (ctx, { chatId }) => { + const chat: Doc<"chats"> = (await ctx.db.get(chatId)) as Doc<"chats">; + if (!chat) { + throw new Error(`Chat ${chatId} not found`); + } + + const messages = await ctx.db + .query("chatMessagesStorageState") + .withIndex("byChatId", (q) => q.eq("chatId", chat._id)) + .collect(); + + if (messages.length === 0) { + console.log(`No messages found for chat ${chat._id}, deleting chat`); + await ctx.db.delete(chat._id); + return; + } + + const debugLogs = await ctx.db + .query("debugChatApiRequestLog") + .withIndex("byChatId", (q) => q.eq("chatId", chat._id)) + .collect(); + + for (const log of debugLogs) { + await ctx.db.delete(log._id); + } + + for (const message of messages) { + if (message.storageId) { + await ctx.storage.delete(message.storageId); + } + // also snapshot id + // also delete shares + await ctx.db.delete(message._id); + } + }, +}); + +function calculateJitteredDelay(baseDelayMs: number, jitterFactor: number) { + // Generate a random number between -jitterFactor and +jitterFactor + const randomJitter = (Math.random() * 2 - 1) * jitterFactor; + // Apply the jitter to the base delay + return baseDelayMs * (1 + randomJitter); +} diff --git a/convex/crons.ts b/convex/crons.ts index 04a6a0ef6..d05b0f319 100644 --- a/convex/crons.ts +++ b/convex/crons.ts @@ -10,4 +10,8 @@ crons.daily( { forReal: true, shouldScheduleNext: true, daysInactive: 14 }, ); +crons.( + "delete inactive users and chats" +) + export default crons; diff --git a/convex/schema.ts b/convex/schema.ts index a7e593fb5..235299aed 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -40,6 +40,7 @@ export default defineSchema({ apiKey: v.optional(apiKeyValidator), convexMemberId: v.optional(v.string()), softDeletedForWorkOSMerge: v.optional(v.boolean()), + isDeletedAccount: v.optional(v.boolean()), // Not authoritative, just a cache of the user's profile from WorkOS/provision host. cachedProfile: v.optional( v.object({ @@ -51,7 +52,8 @@ export default defineSchema({ ), }) .index("byTokenIdentifier", ["tokenIdentifier"]) - .index("byConvexMemberId", ["convexMemberId", "softDeletedForWorkOSMerge"]), + .index("byConvexMemberId", ["convexMemberId", "softDeletedForWorkOSMerge"]) + .index("byIsDeletedAccount", ["isDeletedAccount"]), /* * Admin status means being on the convex team on the provision host. @@ -115,7 +117,9 @@ export default defineSchema({ teamSlug: v.string(), memberId: v.optional(v.id("convexMembers")), projectDeployKey: v.string(), - }).index("bySlugs", ["teamSlug", "projectSlug"]), + }) + .index("bySlugs", ["teamSlug", "projectSlug"]) + .index("byMemberId", ["memberId"]), chatMessagesStorageState: defineTable({ chatId: v.id("chats"), storageId: v.union(v.id("_storage"), v.null()),