diff --git a/src/components/modals/AddModal/index.tsx b/src/components/modals/AddModal/index.tsx index 0f8d36284..2acf91e6d 100644 --- a/src/components/modals/AddModal/index.tsx +++ b/src/components/modals/AddModal/index.tsx @@ -34,6 +34,7 @@ import useGetColor from '../../../hooks/useColor'; import network from '../../../network'; import analytics, { DriveAnalyticsEvent } from '../../../services/AnalyticsService'; import { constants } from '../../../services/AppService'; +import { uploadQueueService } from '../../../services/drive/file/uploadQueue.service'; import { createUploadingFiles, handleDuplicateFiles, @@ -55,7 +56,7 @@ import AppText from '../../AppText'; import BottomModal from '../BottomModal'; import CreateFolderModal from '../CreateFolderModal'; -const MAX_FILES_BULK_UPLOAD = 25; +const MAX_FILES_BULK_UPLOAD = 50; function AddModal(): JSX.Element { const tailwind = useTailwind(); @@ -368,37 +369,47 @@ function AddModal(): JSX.Element { const { filesToUpload, filesExcluded } = validateAndFilterFiles(documents); showFileSizeAlert(filesExcluded); - const filesToProcess = await handleDuplicateFiles(filesToUpload, focusedFolder.uuid); - if (filesToProcess.length === 0) { + + if (filesToUpload.length === 0) { dispatch(uiActions.setShowUploadFileModal(false)); return; } - const preparedFiles = await prepareUploadFiles(filesToProcess, focusedFolder.uuid); - const formattedFiles = createUploadingFiles(preparedFiles, focusedFolder); + const batchId = `batch-${Date.now()}`; + const targetFolder = focusedFolder; - initializeUploads(formattedFiles, dispatch); + return uploadQueueService.enqueue(batchId, async () => { + const filesToProcess = await handleDuplicateFiles(filesToUpload, targetFolder.uuid); + if (filesToProcess.length === 0) { + return; + } - const processedFileIds: number[] = []; + const preparedFiles = await prepareUploadFiles(filesToProcess, targetFolder.uuid); + const formattedFiles = createUploadingFiles(preparedFiles, targetFolder); - for (const file of formattedFiles) { - try { - logger.info(`User from redux when upload: ${user?.username}, bucket: ${user?.bucket}`); - await uploadSingleFile(file, dispatch, uploadFile, uploadSuccess, user); - } catch (error) { - logger.error(`File ${file.name} failed to upload:`, error); + initializeUploads(formattedFiles, dispatch); - notificationsService.show({ - type: NotificationType.Error, - text1: strings.formatString(strings.errors.uploadFile, (error as Error).message) as string, - }); - } finally { - processedFileIds.push(file.id); + const processedFileIds: number[] = []; + const batchFileIds = formattedFiles.map((f) => f.id); + + for (const file of formattedFiles) { + try { + logger.info(`User from redux when upload: ${user?.username}, bucket: ${user?.bucket}`); + await uploadSingleFile(file, dispatch, uploadFile, uploadSuccess, user); + } catch (error) { + logger.error(`File ${file.name} failed to upload:`, error); + notificationsService.show({ + type: NotificationType.Error, + text1: strings.formatString(strings.errors.uploadFile, (error as Error).message) as string, + }); + } finally { + processedFileIds.push(file.id); + } } - } - cleanupStuckUploads(processedFileIds, formattedFiles); - dispatch(driveActions.clearUploadedFiles()); + cleanupStuckUploads(processedFileIds, formattedFiles); + dispatch(driveActions.clearBatchFiles(batchFileIds)); + }); } /** @@ -487,7 +498,7 @@ function AddModal(): JSX.Element { if (!fileSize) { try { const fileInfo = fileSystemService.getFileInfo(cleanUri); - fileSize = fileInfo.exists ? fileInfo.size ?? 0 : 0; + fileSize = fileInfo.exists ? (fileInfo.size ?? 0) : 0; } catch (error) { logger.warn('The file size could not be obtained:', error); fileSize = 0; @@ -580,7 +591,7 @@ function AddModal(): JSX.Element { if (!fileSize) { try { const fileInfo = fileSystemService.getFileInfo(cleanUri); - fileSize = fileInfo.exists ? fileInfo.size ?? 0 : 0; + fileSize = fileInfo.exists ? (fileInfo.size ?? 0) : 0; } catch (error) { logger.warn('The file size could not be obtained:', error); fileSize = 0; @@ -679,7 +690,7 @@ function AddModal(): JSX.Element { const fileInfo = fileSystemService.getFileInfo(assetToUpload.uri); const formatInfo = detectImageFormat(assetToUpload); const name = drive.file.removeExtension(assetToUpload.uri.split('/').pop() as string); - const size = fileInfo.exists ? fileInfo.size ?? 0 : 0; + const size = fileInfo.exists ? (fileInfo.size ?? 0) : 0; const file: UploadingFile = { id: new Date().getTime(), diff --git a/src/services/drive/file/uploadQueue.service.spec.ts b/src/services/drive/file/uploadQueue.service.spec.ts new file mode 100644 index 000000000..700ba4e1d --- /dev/null +++ b/src/services/drive/file/uploadQueue.service.spec.ts @@ -0,0 +1,313 @@ +import { UploadQueueService } from './uploadQueue.service'; + +jest.mock('@internxt-mobile/services/common', () => ({ + logger: { + info: jest.fn(), + error: jest.fn(), + }, +})); + +type Deferred = { + promise: Promise; + resolve: () => void; + reject: (error: Error) => void; +}; + +const createDeferred = (): Deferred => { + let resolve!: () => void; + let reject!: (error: Error) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +}; + +describe('Upload Queue Service', () => { + let sut: UploadQueueService; + + beforeEach(() => { + sut = new UploadQueueService(); + }); + + describe('When enqueueing a single job', () => { + it('when a job is enqueued, then it executes immediately', async () => { + const jobExecuted = jest.fn(); + + await sut.enqueue('batch-1', async () => { + jobExecuted(); + }); + + expect(jobExecuted).toHaveBeenCalledTimes(1); + }); + + it('when a job completes successfully, then the enqueue promise resolves', async () => { + const result = sut.enqueue('batch-1', async () => { + // no-op + }); + + await expect(result).resolves.toBeUndefined(); + }); + + it('when a job throws an error, then the enqueue promise rejects with that error', async () => { + const error = new Error('upload failed'); + + const result = sut.enqueue('batch-1', async () => { + throw error; + }); + + await expect(result).rejects.toBe(error); + }); + }); + + describe('When enqueueing multiple jobs', () => { + it('when multiple jobs are enqueued, then they execute in FIFO order', async () => { + const executionOrder: string[] = []; + + const promise1 = sut.enqueue('batch-1', async () => { + executionOrder.push('first'); + }); + + const promise2 = sut.enqueue('batch-2', async () => { + executionOrder.push('second'); + }); + + const promise3 = sut.enqueue('batch-3', async () => { + executionOrder.push('third'); + }); + + await Promise.all([promise1, promise2, promise3]); + + expect(executionOrder).toEqual(['first', 'second', 'third']); + }); + + it('when a job is running, then subsequent jobs wait until it completes', async () => { + const deferred = createDeferred(); + const executionOrder: string[] = []; + + const promise1 = sut.enqueue('batch-1', async () => { + executionOrder.push('first-start'); + await deferred.promise; + executionOrder.push('first-end'); + }); + + const promise2 = sut.enqueue('batch-2', async () => { + executionOrder.push('second'); + }); + + // At this point, first job is running but blocked, second is waiting + // Give microtasks time to settle + await Promise.resolve(); + + expect(executionOrder).toEqual(['first-start']); + expect(sut.isBusy()).toBe(true); + + // Unblock the first job + deferred.resolve(); + await Promise.all([promise1, promise2]); + + expect(executionOrder).toEqual(['first-start', 'first-end', 'second']); + }); + + it('when a job fails, then the next job still executes', async () => { + const error = new Error('batch-1 failed'); + const secondJobExecuted = jest.fn(); + + const promise1 = sut.enqueue('batch-1', async () => { + throw error; + }); + + const promise2 = sut.enqueue('batch-2', async () => { + secondJobExecuted(); + }); + + await expect(promise1).rejects.toBe(error); + await promise2; + + expect(secondJobExecuted).toHaveBeenCalledTimes(1); + }); + + it('when multiple jobs fail, then all subsequent jobs still execute', async () => { + const executionOrder: string[] = []; + + const promise1 = sut.enqueue('batch-1', async () => { + executionOrder.push('first'); + throw new Error('fail-1'); + }); + + const promise2 = sut.enqueue('batch-2', async () => { + executionOrder.push('second'); + throw new Error('fail-2'); + }); + + const promise3 = sut.enqueue('batch-3', async () => { + executionOrder.push('third'); + }); + + await promise1.catch(() => undefined); + await promise2.catch(() => undefined); + await promise3; + + expect(executionOrder).toEqual(['first', 'second', 'third']); + }); + }); + + describe('When checking queue state', () => { + it('when no jobs are running, then isBusy returns false', () => { + expect(sut.isBusy()).toBe(false); + }); + + it('when a job is running, then isBusy returns true', () => { + const deferred = createDeferred(); + + sut.enqueue('batch-1', () => deferred.promise); + + expect(sut.isBusy()).toBe(true); + + deferred.resolve(); + }); + + it('when all jobs complete, then isBusy returns false', async () => { + await sut.enqueue('batch-1', async () => { return; }); + + expect(sut.isBusy()).toBe(false); + }); + + it('when no jobs are pending, then getPendingCount returns 0', () => { + expect(sut.getPendingCount()).toBe(0); + }); + + it('when jobs are waiting behind an active job, then getPendingCount returns the correct count', () => { + const deferred = createDeferred(); + + sut.enqueue('batch-1', () => deferred.promise); + sut.enqueue('batch-2', async () => { return; }); + sut.enqueue('batch-3', async () => { return; }); + + // batch-1 is active, batch-2 and batch-3 are pending + expect(sut.getPendingCount()).toBe(2); + + deferred.resolve(); + }); + + it('when all jobs complete, then getPendingCount returns 0', async () => { + const promise1 = sut.enqueue('batch-1', async () => { return; }); + const promise2 = sut.enqueue('batch-2', async () => { return; }); + + await Promise.all([promise1, promise2]); + + expect(sut.getPendingCount()).toBe(0); + }); + }); + + describe('When clearing the pending queue', () => { + it('when clearPending is called, then waiting jobs are removed from the queue', () => { + const deferred = createDeferred(); + + sut.enqueue('batch-1', () => deferred.promise); + sut.enqueue('batch-2', async () => { return; }); + sut.enqueue('batch-3', async () => { return; }); + + expect(sut.getPendingCount()).toBe(2); + + sut.clearPending(); + + expect(sut.getPendingCount()).toBe(0); + + deferred.resolve(); + }); + + it('when clearPending is called, then the active job continues running to completion', async () => { + const deferred = createDeferred(); + const activeJobCompleted = jest.fn(); + + const promise1 = sut.enqueue('batch-1', async () => { + await deferred.promise; + activeJobCompleted(); + }); + + sut.enqueue('batch-2', async () => { return; }); + + sut.clearPending(); + + // Active job should still be running + expect(sut.isBusy()).toBe(true); + + deferred.resolve(); + await promise1; + + expect(activeJobCompleted).toHaveBeenCalledTimes(1); + }); + + it('when clearPending is called, then cleared jobs never execute', async () => { + const deferred = createDeferred(); + const clearedJobExecuted = jest.fn(); + + const promise1 = sut.enqueue('batch-1', () => deferred.promise); + + // These will be cleared + sut.enqueue('batch-2', async () => { + clearedJobExecuted(); + }); + sut.enqueue('batch-3', async () => { + clearedJobExecuted(); + }); + + sut.clearPending(); + + deferred.resolve(); + await promise1; + + // Give microtasks time to settle in case something would try to run + await new Promise((r) => setTimeout(r, 50)); + + expect(clearedJobExecuted).not.toHaveBeenCalled(); + }); + }); + + describe('When handling concurrent enqueue calls', () => { + it('when two jobs are enqueued simultaneously, then both complete and resolve independently', async () => { + const deferred1 = createDeferred(); + const deferred2 = createDeferred(); + + let result1Resolved = false; + let result2Resolved = false; + + const promise1 = sut.enqueue('batch-1', () => deferred1.promise).then(() => { + result1Resolved = true; + }); + + const promise2 = sut.enqueue('batch-2', () => deferred2.promise).then(() => { + result2Resolved = true; + }); + + // Complete first job + deferred1.resolve(); + await promise1; + + expect(result1Resolved).toBe(true); + expect(result2Resolved).toBe(false); + + // Complete second job + deferred2.resolve(); + await promise2; + + expect(result2Resolved).toBe(true); + }); + + it('when a failing job is followed by a succeeding job, then each promise settles correctly', async () => { + const error = new Error('first failed'); + + const promise1 = sut.enqueue('batch-1', async () => { + throw error; + }); + + const promise2 = sut.enqueue('batch-2', async () => { + // success + }); + + await expect(promise1).rejects.toBe(error); + await expect(promise2).resolves.toBeUndefined(); + }); + }); +}); diff --git a/src/services/drive/file/uploadQueue.service.ts b/src/services/drive/file/uploadQueue.service.ts new file mode 100644 index 000000000..1b300ef26 --- /dev/null +++ b/src/services/drive/file/uploadQueue.service.ts @@ -0,0 +1,92 @@ +import { logger } from '@internxt-mobile/services/common'; + +type BatchJob = () => Promise; + +interface QueueEntry { + job: BatchJob; + resolve: () => void; + reject: (error: unknown) => void; + batchId: string; +} + +export class UploadQueueService { + private queue: QueueEntry[] = []; + private isProcessing = false; + + /** + * Enqueue a batch upload job. Jobs execute serially — each batch + * waits for the previous one to fully complete before starting. + * This ensures duplicate checks see all previously uploaded files. + */ + enqueue(batchId: string, job: BatchJob): Promise { + logger.info( + `[UploadQueue] enqueue called - batchId: ${batchId}, isProcessing: ${this.isProcessing}, pendingCount: ${this.queue.length}`, + ); + return new Promise((resolve, reject) => { + this.queue.push({ job, resolve, reject, batchId }); + logger.info(`[UploadQueue] Entry added to queue, new queue length: ${this.queue.length}`); + this.processNext(); + }); + } + + private async processNext(): Promise { + logger.info( + `[UploadQueue] processNext called - isProcessing: ${this.isProcessing}, queueLength: ${this.queue.length}`, + ); + + if (this.isProcessing) { + logger.info('[UploadQueue] Already processing, skipping'); + return; + } + + const entry = this.queue.shift(); + if (!entry) { + logger.info('[UploadQueue] Queue empty, nothing to process'); + return; + } + + this.isProcessing = true; + logger.info( + `[UploadQueue] Starting batch: ${entry.batchId}, remaining in queue: ${this.queue.length}`, + ); + + try { + await entry.job(); + logger.info(`[UploadQueue] Batch ${entry.batchId} job completed successfully, calling resolve`); + entry.resolve(); + } catch (error) { + logger.error( + `[UploadQueue] Batch ${entry.batchId} job threw error:`, + JSON.stringify(error), + (error as Error)?.message, + (error as Error)?.stack, + ); + entry.reject(error); + } finally { + logger.info( + `[UploadQueue] Finally block for batch: ${entry.batchId}, setting isProcessing=false, queueLength: ${this.queue.length}`, + ); + this.isProcessing = false; + this.processNext(); + } + } + + getPendingCount(): number { + return this.queue.length; + } + + isBusy(): boolean { + return this.isProcessing; + } + + /** + * Clear the pending queue (does NOT abort the active batch). + * Used on logout or critical error recovery. + */ + clearPending(): void { + this.queue = []; + logger.info('[UploadQueue] Pending queue cleared'); + } +} + +export const uploadQueueService = new UploadQueueService(); diff --git a/src/store/slices/drive/index.ts b/src/store/slices/drive/index.ts index b06699fbd..7acdb83a5 100644 --- a/src/store/slices/drive/index.ts +++ b/src/store/slices/drive/index.ts @@ -417,6 +417,10 @@ export const driveSlice = createSlice({ clearUploadedFiles(state) { state.uploadingFiles = []; }, + clearBatchFiles(state, action: PayloadAction) { + const batchFileIds = new Set(action.payload); + state.uploadingFiles = state.uploadingFiles.filter((file) => !batchFileIds.has(file.id)); + }, uploadFileFinished(state) { state.isLoading = false; state.isUploading = false;