diff --git a/src/app/api/workers/resync-failed-files/route.ts b/src/app/api/workers/resync-failed-files/route.ts new file mode 100644 index 0000000..18767af --- /dev/null +++ b/src/app/api/workers/resync-failed-files/route.ts @@ -0,0 +1,6 @@ +import { resyncFailedFiles } from '@/features/workers/resync-failed-files/api/resync-failed-files.controller' +import { withErrorHandler } from '@/utils/withErrorHandler' + +export const maxDuration = 300 + +export const GET = withErrorHandler(resyncFailedFiles) diff --git a/src/config/server.env.ts b/src/config/server.env.ts index 44d0b7e..cd2d54d 100644 --- a/src/config/server.env.ts +++ b/src/config/server.env.ts @@ -12,6 +12,7 @@ const ServerEnvSchema = z.object({ DROPBOX_SCOPES: z.string().min(1), DROPBOX_API_URL: z.url(), TRIGGER_MACHINE: TriggerMachineSchema, + CRON_SECRET: z.string().min(1), }) const env = ServerEnvSchema.parse(process.env) diff --git a/src/features/workers/resync-failed-files/api/resync-failed-files.controller.ts b/src/features/workers/resync-failed-files/api/resync-failed-files.controller.ts new file mode 100644 index 0000000..a765f0f --- /dev/null +++ b/src/features/workers/resync-failed-files/api/resync-failed-files.controller.ts @@ -0,0 +1,16 @@ +import httpStatus from 'http-status' +import { type NextRequest, NextResponse } from 'next/server' +import env from '@/config/server.env' +import APIError from '@/errors/APIError' +import { ResyncService } from '@/features/workers/resync-failed-files/lib/resync-failed-files.service' + +export const resyncFailedFiles = async (request: NextRequest) => { + const authHeader = request.headers.get('authorization') + if (authHeader !== `Bearer ${env.CRON_SECRET}`) { + throw new APIError('Unauthorized', httpStatus.UNAUTHORIZED) + } + + const resyncService = new ResyncService() + await resyncService.resyncFailedFiles() + return NextResponse.json({ message: 'Succeslyfully trigger re syncing of files.' }) +} diff --git a/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts b/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts new file mode 100644 index 0000000..808aa1e --- /dev/null +++ b/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts @@ -0,0 +1,126 @@ +import { and, eq } from 'drizzle-orm' +import z from 'zod' +import env from '@/config/server.env' +import db from '@/db' +import { type FileSyncSelectType, fileFolderSync } from '@/db/schema/fileFolderSync.schema' +import APIError from '@/errors/APIError' +import { SyncService } from '@/features/sync/lib/Sync.service' +import { DropboxFileListFolderSingleEntrySchema } from '@/features/sync/types' +import { CopilotAPI } from '@/lib/copilot/CopilotAPI' +import { generateToken } from '@/lib/copilot/generateToken' +import User from '@/lib/copilot/models/User.model' +import { DropboxClient } from '@/lib/dropbox/DropboxClient' + +export const syncFailedFilesToAssembly = async ( + portalId: string, + failedSyncs: FileSyncSelectType[], +) => { + const dropboxConnection = await getDropboxConnection(portalId) + if (!dropboxConnection) { + console.warn('Dropbox account not found for portal:', portalId) + return null + } + + const { user, copilotApi, dbxClient, connectionToken, syncService } = + await initializeSyncDependencies(dropboxConnection, portalId) + + for (const failedSync of failedSyncs) { + await processFailedSync(failedSync, copilotApi, dbxClient, syncService, user, connectionToken) + } +} + +const getDropboxConnection = async (portalId: string) => { + const connection = await db.query.dropboxConnections.findFirst({ + where: (dropboxConnections, { eq }) => + and(eq(dropboxConnections.portalId, portalId), eq(dropboxConnections.status, true)), + }) + + if (!connection?.refreshToken || !connection?.accountId) { + console.error('⚠️ Dropbox connection not found for portal:', portalId) + return null + } + + return connection +} + +const initializeSyncDependencies = async ( + dropboxConnection: NonNullable>>, + portalId: string, +) => { + const { refreshToken, rootNamespaceId, accountId, initiatedBy } = dropboxConnection + + if (!refreshToken || !accountId || !rootNamespaceId) { + throw new APIError(`Dropbox connection not found for portal: ${portalId}`, 404) + } + + const token = generateToken(env.COPILOT_API_KEY, { + workspaceId: portalId, + internalUserId: initiatedBy, + }) + + const user = await User.authenticate(token) + const copilotApi = new CopilotAPI(token) + const dbxClient = new DropboxClient(refreshToken, rootNamespaceId) + + const connectionToken = { + refreshToken, + accountId, + rootNamespaceId, + } + + const syncService = new SyncService(user, connectionToken) + + return { user, copilotApi, dbxClient, connectionToken, syncService } +} + +const processFailedSync = async ( + failedSync: FileSyncSelectType, + copilotApi: CopilotAPI, + dbxClient: DropboxClient, + syncService: SyncService, + user: Awaited>, + connectionToken: { refreshToken: string; accountId: string; rootNamespaceId: string }, +) => { + const fileId = z.string().parse(failedSync.assemblyFileId) + const file = await copilotApi.retrieveFile(fileId) + + // Only proceed if file is missing or pending in Assembly + if (file && file.status !== 'pending') return + + const fileInDropbox = await getFileFromDropbox(dbxClient, failedSync.dbxFileId ?? '') + if (!fileInDropbox) return + + const channelSync = await db.query.channelSync.findFirst({ + where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId), + }) + + if (!channelSync) return + + // Sync file from Dropbox to Assembly + const payload = { + entry: DropboxFileListFolderSingleEntrySchema.parse(fileInDropbox), + opts: { + dbxRootPath: channelSync.dbxRootPath, + assemblyChannelId: channelSync.assemblyChannelId, + channelSyncId: channelSync.id, + user, + connectionToken, + }, + } + + await syncService.syncDropboxFilesToAssembly(payload) + await db.delete(fileFolderSync).where(eq(fileFolderSync.id, failedSync.id)) +} + +const getFileFromDropbox = async (dbxClient: DropboxClient, dropboxFileId: string) => { + if (!dropboxFileId) return null + + const dropboxClient = dbxClient.getDropboxClient() + + try { + const fileMetadata = await dropboxClient.filesGetMetadata({ path: dropboxFileId }) + return fileMetadata.result + } catch (_err) { + return null + } +} diff --git a/src/features/workers/resync-failed-files/lib/resync-failed-files.service.ts b/src/features/workers/resync-failed-files/lib/resync-failed-files.service.ts new file mode 100644 index 0000000..7b5c839 --- /dev/null +++ b/src/features/workers/resync-failed-files/lib/resync-failed-files.service.ts @@ -0,0 +1,44 @@ +import { and, isNull } from 'drizzle-orm' +import db from '@/db' +import { ObjectType } from '@/db/constants' +import { resyncFailedFilesInAssembly } from '@/trigger/processFileSync' +import type { FailedSyncWorkspaceMap } from '../utils/types' + +export class ResyncService { + async resyncFailedFiles() { + const failedSyncs = await db.query.fileFolderSync.findMany({ + where: (fileFolderSync, { eq }) => + and( + isNull(fileFolderSync.contentHash), + eq(fileFolderSync.object, ObjectType.FILE), + isNull(fileFolderSync.deletedAt), + ), + }) + + console.info('Total number of failed syncs: ', failedSyncs.length) + const failedSyncWorkspaceMap: FailedSyncWorkspaceMap = failedSyncs.reduce( + (acc: FailedSyncWorkspaceMap, failedSync) => { + const portalId = failedSync.portalId + + if (!acc[portalId]) { + acc[portalId] = [] + } + + acc[portalId].push(failedSync) + return acc + }, + {}, + ) + + for (const portalId in failedSyncWorkspaceMap) { + const failedSyncsForPortal = failedSyncWorkspaceMap[portalId] + resyncFailedFilesInAssembly.trigger({ + portalId, + failedSyncs: failedSyncsForPortal, + }) + console.info( + `Enqueued resync job for portal: ${portalId} with ${failedSyncsForPortal.length} files`, + ) + } + } +} diff --git a/src/features/workers/resync-failed-files/utils/types.ts b/src/features/workers/resync-failed-files/utils/types.ts new file mode 100644 index 0000000..fa68e84 --- /dev/null +++ b/src/features/workers/resync-failed-files/utils/types.ts @@ -0,0 +1,3 @@ +import type { FileSyncSelectType } from '@/db/schema/fileFolderSync.schema' + +export type FailedSyncWorkspaceMap = Record diff --git a/src/lib/copilot/CopilotAPI.ts b/src/lib/copilot/CopilotAPI.ts index b43b31d..ac51919 100644 --- a/src/lib/copilot/CopilotAPI.ts +++ b/src/lib/copilot/CopilotAPI.ts @@ -23,6 +23,8 @@ import { CopilotFileCreateSchema, type CopilotFileList, CopilotFileListSchema, + CopilotFileRetrieve, + CopilotFileRetrieveSchema, type CopilotListArgs, type CopilotPrice, CopilotPriceSchema, @@ -223,6 +225,11 @@ export class CopilotAPI { return CopilotFileListSchema.parse(list) } + async _retrieveFile(id: string): Promise { + const file = await this.copilot.retrieveFile({ id }) + return CopilotFileRetrieveSchema.parse(file) + } + async _retrieveFileChannel(id: string) { const fileChannel = await this.copilot.retrieveFileChannel({ id }) return CopilotFileChannelRetrieveSchema.parse(fileChannel) @@ -260,6 +267,7 @@ export class CopilotAPI { uploadFile = this.wrapWithRetry(this._uploadFile) deleteFile = this.wrapWithRetry(this._deleteFile) listFiles = this.wrapWithRetry(this._listFiles) + retrieveFile = this.wrapWithRetry(this._retrieveFile) retrieveFileChannel = this.wrapWithRetry(this._retrieveFileChannel) listFileChannels = this.wrapWithRetry(this._listFileChannels) } diff --git a/src/trigger/processFileSync.ts b/src/trigger/processFileSync.ts index d3ea855..dadf47e 100644 --- a/src/trigger/processFileSync.ts +++ b/src/trigger/processFileSync.ts @@ -3,7 +3,7 @@ import { and, eq, isNotNull } from 'drizzle-orm' import z from 'zod' import env from '@/config/server.env' import type { DropboxConnectionTokens } from '@/db/schema/dropboxConnections.schema' -import { fileFolderSync } from '@/db/schema/fileFolderSync.schema' +import { type FileSyncSelectType, fileFolderSync } from '@/db/schema/fileFolderSync.schema' import { MAX_FILES_LIMIT } from '@/features/sync/constant' import { MapFilesService } from '@/features/sync/lib/MapFiles.service' import { SyncService } from '@/features/sync/lib/Sync.service' @@ -15,6 +15,7 @@ import { type WhereClause, } from '@/features/sync/types' import { DropboxWebhook } from '@/features/webhook/dropbox/lib/webhook.service' +import { syncFailedFilesToAssembly } from '@/features/workers/resync-failed-files/helper/resync-failed-files.helper' import { CopilotAPI } from '@/lib/copilot/CopilotAPI' import type User from '@/lib/copilot/models/User.model' import { DropboxAuthClient } from '@/lib/dropbox/DropboxAuthClient' @@ -399,3 +400,19 @@ export const updateAssemblyFileInDropbox = task({ await syncAssemblyFileToDropbox.trigger(payload) }, }) + +export const resyncFailedFilesInAssembly = task({ + id: 'resync-failed-files-in-assembly', + machine, + queue: { + name: 'resync-failed-files-in-assembly', + concurrencyLimit: 5, + }, + retry: { + maxAttempts: 3, + }, + run: async (payload: { portalId: string; failedSyncs: FileSyncSelectType[] }) => { + const { portalId, failedSyncs } = payload + await syncFailedFilesToAssembly(portalId, failedSyncs) + }, +}) diff --git a/vercel.json b/vercel.json new file mode 100644 index 0000000..de818b3 --- /dev/null +++ b/vercel.json @@ -0,0 +1,11 @@ +{ + "$schema": "https://openapi.vercel.sh/vercel.json", + "regions": ["iad1", "pdx1"], + "buildCommand": "./scripts/build/build.sh", + "crons": [ + { + "path": "/api/workers/resync-failed-files", + "schedule": "0 0 * * *" + } + ] +}