Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/app/api/workers/resync-failed-files/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { resyncFailedFiles } from '@/features/workers/resync-failed-files/api/resync-failed-files.controller'
import { withErrorHandler } from '@/utils/withErrorHandler'

export const maxDuration = 300

export const GET = withErrorHandler(resyncFailedFiles)
1 change: 1 addition & 0 deletions src/config/server.env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const ServerEnvSchema = z.object({
DROPBOX_SCOPES: z.string().min(1),
DROPBOX_API_URL: z.url(),
TRIGGER_MACHINE: TriggerMachineSchema,
CRON_SECRET: z.string().min(1),
})

const env = ServerEnvSchema.parse(process.env)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import httpStatus from 'http-status'
import { type NextRequest, NextResponse } from 'next/server'
import env from '@/config/server.env'
import APIError from '@/errors/APIError'
import { ResyncService } from '@/features/workers/resync-failed-files/lib/resync-failed-files.service'

export const resyncFailedFiles = async (request: NextRequest) => {
const authHeader = request.headers.get('authorization')
if (authHeader !== `Bearer ${env.CRON_SECRET}`) {
throw new APIError('Unauthorized', httpStatus.UNAUTHORIZED)
}

const resyncService = new ResyncService()
await resyncService.resyncFailedFiles()
return NextResponse.json({ message: 'Succeslyfully trigger re syncing of files.' })
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { and, eq } from 'drizzle-orm'
import z from 'zod'
import env from '@/config/server.env'
import db from '@/db'
import { type FileSyncSelectType, fileFolderSync } from '@/db/schema/fileFolderSync.schema'
import APIError from '@/errors/APIError'
import { SyncService } from '@/features/sync/lib/Sync.service'
import { DropboxFileListFolderSingleEntrySchema } from '@/features/sync/types'
import { CopilotAPI } from '@/lib/copilot/CopilotAPI'
import { generateToken } from '@/lib/copilot/generateToken'
import User from '@/lib/copilot/models/User.model'
import { DropboxClient } from '@/lib/dropbox/DropboxClient'

export const syncFailedFilesToAssembly = async (
portalId: string,
failedSyncs: FileSyncSelectType[],
) => {
const dropboxConnection = await getDropboxConnection(portalId)
if (!dropboxConnection) {
console.warn('Dropbox account not found for portal:', portalId)
return null
}

const { user, copilotApi, dbxClient, connectionToken, syncService } =
await initializeSyncDependencies(dropboxConnection, portalId)

for (const failedSync of failedSyncs) {
await processFailedSync(failedSync, copilotApi, dbxClient, syncService, user, connectionToken)
}
}

const getDropboxConnection = async (portalId: string) => {
const connection = await db.query.dropboxConnections.findFirst({
where: (dropboxConnections, { eq }) =>
and(eq(dropboxConnections.portalId, portalId), eq(dropboxConnections.status, true)),
})

if (!connection?.refreshToken || !connection?.accountId) {
console.error('⚠️ Dropbox connection not found for portal:', portalId)
return null
}

return connection
}

const initializeSyncDependencies = async (
dropboxConnection: NonNullable<Awaited<ReturnType<typeof getDropboxConnection>>>,
portalId: string,
) => {
const { refreshToken, rootNamespaceId, accountId, initiatedBy } = dropboxConnection

if (!refreshToken || !accountId || !rootNamespaceId) {
throw new APIError(`Dropbox connection not found for portal: ${portalId}`, 404)
}

const token = generateToken(env.COPILOT_API_KEY, {
workspaceId: portalId,
internalUserId: initiatedBy,
})

const user = await User.authenticate(token)
const copilotApi = new CopilotAPI(token)
const dbxClient = new DropboxClient(refreshToken, rootNamespaceId)

const connectionToken = {
refreshToken,
accountId,
rootNamespaceId,
}

const syncService = new SyncService(user, connectionToken)

return { user, copilotApi, dbxClient, connectionToken, syncService }
}

const processFailedSync = async (
failedSync: FileSyncSelectType,
copilotApi: CopilotAPI,
dbxClient: DropboxClient,
syncService: SyncService,
user: Awaited<ReturnType<typeof User.authenticate>>,
connectionToken: { refreshToken: string; accountId: string; rootNamespaceId: string },
) => {
const fileId = z.string().parse(failedSync.assemblyFileId)
const file = await copilotApi.retrieveFile(fileId)

// Only proceed if file is missing or pending in Assembly
if (file && file.status !== 'pending') return

const fileInDropbox = await getFileFromDropbox(dbxClient, failedSync.dbxFileId ?? '')
if (!fileInDropbox) return

const channelSync = await db.query.channelSync.findFirst({
where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId),
})

if (!channelSync) return

// Sync file from Dropbox to Assembly
const payload = {
entry: DropboxFileListFolderSingleEntrySchema.parse(fileInDropbox),
opts: {
dbxRootPath: channelSync.dbxRootPath,
assemblyChannelId: channelSync.assemblyChannelId,
channelSyncId: channelSync.id,
user,
connectionToken,
},
}

await syncService.syncDropboxFilesToAssembly(payload)
await db.delete(fileFolderSync).where(eq(fileFolderSync.id, failedSync.id))
}

const getFileFromDropbox = async (dbxClient: DropboxClient, dropboxFileId: string) => {
if (!dropboxFileId) return null

const dropboxClient = dbxClient.getDropboxClient()

try {
const fileMetadata = await dropboxClient.filesGetMetadata({ path: dropboxFileId })
return fileMetadata.result
} catch (_err) {
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { and, isNull } from 'drizzle-orm'
import db from '@/db'
import { ObjectType } from '@/db/constants'
import { resyncFailedFilesInAssembly } from '@/trigger/processFileSync'
import type { FailedSyncWorkspaceMap } from '../utils/types'

export class ResyncService {
async resyncFailedFiles() {
const failedSyncs = await db.query.fileFolderSync.findMany({
where: (fileFolderSync, { eq }) =>
and(
isNull(fileFolderSync.contentHash),
eq(fileFolderSync.object, ObjectType.FILE),
isNull(fileFolderSync.deletedAt),
),
})

console.info('Total number of failed syncs: ', failedSyncs.length)
const failedSyncWorkspaceMap: FailedSyncWorkspaceMap = failedSyncs.reduce(
(acc: FailedSyncWorkspaceMap, failedSync) => {
const portalId = failedSync.portalId

if (!acc[portalId]) {
acc[portalId] = []
}

acc[portalId].push(failedSync)
return acc
},
{},
)

for (const portalId in failedSyncWorkspaceMap) {
const failedSyncsForPortal = failedSyncWorkspaceMap[portalId]
resyncFailedFilesInAssembly.trigger({
portalId,
failedSyncs: failedSyncsForPortal,
})
console.info(
`Enqueued resync job for portal: ${portalId} with ${failedSyncsForPortal.length} files`,
)
}
}
}
3 changes: 3 additions & 0 deletions src/features/workers/resync-failed-files/utils/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import type { FileSyncSelectType } from '@/db/schema/fileFolderSync.schema'

export type FailedSyncWorkspaceMap = Record<string, FileSyncSelectType[]>
8 changes: 8 additions & 0 deletions src/lib/copilot/CopilotAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
CopilotFileCreateSchema,
type CopilotFileList,
CopilotFileListSchema,
CopilotFileRetrieve,
CopilotFileRetrieveSchema,
type CopilotListArgs,
type CopilotPrice,
CopilotPriceSchema,
Expand Down Expand Up @@ -223,6 +225,11 @@ export class CopilotAPI {
return CopilotFileListSchema.parse(list)
}

async _retrieveFile(id: string): Promise<CopilotFileRetrieve> {
const file = await this.copilot.retrieveFile({ id })
return CopilotFileRetrieveSchema.parse(file)
}

async _retrieveFileChannel(id: string) {
const fileChannel = await this.copilot.retrieveFileChannel({ id })
return CopilotFileChannelRetrieveSchema.parse(fileChannel)
Expand Down Expand Up @@ -260,6 +267,7 @@ export class CopilotAPI {
uploadFile = this.wrapWithRetry(this._uploadFile)
deleteFile = this.wrapWithRetry(this._deleteFile)
listFiles = this.wrapWithRetry(this._listFiles)
retrieveFile = this.wrapWithRetry(this._retrieveFile)
retrieveFileChannel = this.wrapWithRetry(this._retrieveFileChannel)
listFileChannels = this.wrapWithRetry(this._listFileChannels)
}
19 changes: 18 additions & 1 deletion src/trigger/processFileSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { and, eq, isNotNull } from 'drizzle-orm'
import z from 'zod'
import env from '@/config/server.env'
import type { DropboxConnectionTokens } from '@/db/schema/dropboxConnections.schema'
import { fileFolderSync } from '@/db/schema/fileFolderSync.schema'
import { type FileSyncSelectType, fileFolderSync } from '@/db/schema/fileFolderSync.schema'
import { MAX_FILES_LIMIT } from '@/features/sync/constant'
import { MapFilesService } from '@/features/sync/lib/MapFiles.service'
import { SyncService } from '@/features/sync/lib/Sync.service'
Expand All @@ -15,6 +15,7 @@ import {
type WhereClause,
} from '@/features/sync/types'
import { DropboxWebhook } from '@/features/webhook/dropbox/lib/webhook.service'
import { syncFailedFilesToAssembly } from '@/features/workers/resync-failed-files/helper/resync-failed-files.helper'
import { CopilotAPI } from '@/lib/copilot/CopilotAPI'
import type User from '@/lib/copilot/models/User.model'
import { DropboxAuthClient } from '@/lib/dropbox/DropboxAuthClient'
Expand Down Expand Up @@ -399,3 +400,19 @@ export const updateAssemblyFileInDropbox = task({
await syncAssemblyFileToDropbox.trigger(payload)
},
})

export const resyncFailedFilesInAssembly = task({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider defining machine here since single portal could have many files and can be long running as well. We have TRIGGER_MACHINE env that you can import from server.env.ts file and use that here.

id: 'resync-failed-files-in-assembly',
machine,
queue: {
name: 'resync-failed-files-in-assembly',
concurrencyLimit: 5,
},
retry: {
maxAttempts: 3,
},
run: async (payload: { portalId: string; failedSyncs: FileSyncSelectType[] }) => {
const { portalId, failedSyncs } = payload
await syncFailedFilesToAssembly(portalId, failedSyncs)
},
})
11 changes: 11 additions & 0 deletions vercel.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"$schema": "https://openapi.vercel.sh/vercel.json",
"regions": ["iad1", "pdx1"],
"buildCommand": "./scripts/build/build.sh",
"crons": [
{
"path": "/api/workers/resync-failed-files",
"schedule": "0 0 * * *"
}
]
}