diff --git a/src/apps/renderer/pages/Widget/Item.tsx b/src/apps/renderer/pages/Widget/Item.tsx index f6600616cb..2c2bda30ab 100644 --- a/src/apps/renderer/pages/Widget/Item.tsx +++ b/src/apps/renderer/pages/Widget/Item.tsx @@ -7,16 +7,17 @@ import { fileIcon } from '../../assets/icons/getIcon'; export function Item({ name, action, progress }: DriveOperationInfo) { const { translate } = useTranslationContext(); - const progressDisplay = progress ? `${Math.ceil(progress * 100)}%` : ''; + const hasProgress = progress !== undefined && progress !== null; + const progressDisplay = hasProgress ? `${Math.ceil(progress * 100)}%` : ''; let description = ''; if (action === 'DOWNLOADING') { - description = progress + description = hasProgress ? translate('widget.body.activity.operation.downloading') : translate('widget.body.activity.operation.decrypting'); } else if (action === 'UPLOADING') { - description = progress + description = hasProgress ? translate('widget.body.activity.operation.uploading') : translate('widget.body.activity.operation.encrypting'); } else if (action === 'DOWNLOADED') { diff --git a/src/apps/shared/fs/ReadStreamToBuffer.ts b/src/apps/shared/fs/ReadStreamToBuffer.ts deleted file mode 100644 index a8155923fb..0000000000 --- a/src/apps/shared/fs/ReadStreamToBuffer.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Readable, Writable, pipeline } from 'stream'; -import { promisify } from 'util'; -const promisifiedPipeline = promisify(pipeline); - -export class ReadStreamToBuffer { - static async read(stream: Readable): Promise { - const bufferArray: any[] = []; - - const bufferWriter = new Writable({ - write: (chunk, _, callback) => { - bufferArray.push(chunk); - callback(); - }, - }); - - await promisifiedPipeline(stream, bufferWriter); - - return Buffer.concat(bufferArray); - } -} diff --git a/src/apps/shared/fs/read-stream-to-buffer.test.ts b/src/apps/shared/fs/read-stream-to-buffer.test.ts new file mode 100644 index 0000000000..d36eeffbd1 --- /dev/null +++ b/src/apps/shared/fs/read-stream-to-buffer.test.ts @@ -0,0 +1,48 @@ +import { Readable } from 'stream'; +import { readStreamToBuffer } from './read-stream-to-buffer'; +import { calls } from 'tests/vitest/utils.helper'; + +describe('readStreamToBuffer', () => { + it('returns the full buffer and reports progress', async () => { + const onProgress = vi.fn(); + const stream = Readable.from([Buffer.from('he'), Buffer.from('llo')]); + + const result = await readStreamToBuffer({ stream, onProgress }); + + expect(result).toEqual(Buffer.from('hello')); + calls(onProgress).toHaveLength(2); + calls(onProgress).toMatchObject([2, 5]); + }); + + it('returns an empty buffer and does not call onProgress when stream is empty', async () => { + const onProgress = vi.fn(); + const stream = Readable.from([]); + + const result = await readStreamToBuffer({ stream, onProgress }); + + expect(result).toEqual(Buffer.alloc(0)); + calls(onProgress).toHaveLength(0); + }); + + it('handles a single chunk correctly', async () => { + const onProgress = vi.fn(); + const stream = Readable.from([Buffer.from('world')]); + + const result = await readStreamToBuffer({ stream, onProgress }); + + expect(result).toEqual(Buffer.from('world')); + calls(onProgress).toHaveLength(1); + calls(onProgress).toMatchObject([5]); + }); + + it('rejects when the stream emits an error', async () => { + const onProgress = vi.fn(); + const stream = new Readable({ + read() { + this.destroy(new Error('stream failure')); + }, + }); + + await expect(readStreamToBuffer({ stream, onProgress })).rejects.toThrow('stream failure'); + }); +}); diff --git a/src/apps/shared/fs/read-stream-to-buffer.ts b/src/apps/shared/fs/read-stream-to-buffer.ts new file mode 100644 index 0000000000..5625d66d0e --- /dev/null +++ b/src/apps/shared/fs/read-stream-to-buffer.ts @@ -0,0 +1,28 @@ +import { Readable, Writable, pipeline } from 'stream'; +import { promisify } from 'util'; + +const promisifiedPipeline = promisify(pipeline); + +type Props = { + stream: Readable; + onProgress: (bytesWritten: number) => void; +}; + +export async function readStreamToBuffer({ stream, onProgress }: Props) { + const bufferArray: any[] = []; + let bytesWritten = 0; + + const bufferWriter = new Writable({ + write: (chunk, _, callback) => { + bufferArray.push(chunk); + bytesWritten += chunk.length; + onProgress(bytesWritten); + + callback(); + }, + }); + + await promisifiedPipeline(stream, bufferWriter); + + return Buffer.concat(bufferArray); +} diff --git a/src/apps/shared/fs/write-readable-to-file.test.ts b/src/apps/shared/fs/write-readable-to-file.test.ts new file mode 100644 index 0000000000..2186d09987 --- /dev/null +++ b/src/apps/shared/fs/write-readable-to-file.test.ts @@ -0,0 +1,52 @@ +import fs from 'fs'; +import { Readable, PassThrough } from 'stream'; +import { writeReadableToFile } from './write-readable-to-file'; +import { calls } from 'tests/vitest/utils.helper'; + +vi.mock('fs'); + +const mockedFS = vi.mocked(fs, true); + +describe('writeReadableToFile', () => { + it('writes the readable to the given path and reports progress', async () => { + const onProgress = vi.fn(); + const writable = new PassThrough(); + mockedFS.createWriteStream.mockReturnValue(writable as unknown as fs.WriteStream); + + const readable = Readable.from([Buffer.from('he'), Buffer.from('llo')]); + + const promise = writeReadableToFile({ + readable, + path: '/tmp/test-file.txt', + onProgress, + }); + + await promise; + + expect(mockedFS.createWriteStream).toHaveBeenCalledWith('/tmp/test-file.txt'); + calls(onProgress).toHaveLength(2); + calls(onProgress).toMatchObject([2, 5]); + }); + + it('rejects when the writable stream emits an error', async () => { + const onProgress = vi.fn(); + const writable = new PassThrough(); + mockedFS.createWriteStream.mockReturnValue(writable as unknown as fs.WriteStream); + + const readable = new Readable({ + read() {}, + }); + + const promise = writeReadableToFile({ + readable, + path: '/tmp/fail.txt', + onProgress, + }); + + const error = new Error('disk full'); + writable.destroy(error); + + await expect(promise).rejects.toThrow('disk full'); + calls(onProgress).toHaveLength(0); + }); +}); diff --git a/src/apps/shared/fs/write-readable-to-file.ts b/src/apps/shared/fs/write-readable-to-file.ts index 857cc509f5..9c4e2ba467 100644 --- a/src/apps/shared/fs/write-readable-to-file.ts +++ b/src/apps/shared/fs/write-readable-to-file.ts @@ -1,15 +1,26 @@ import fs, { PathLike } from 'fs'; import { Readable } from 'stream'; -export class WriteReadableToFile { - static write(readable: Readable, path: PathLike): Promise { - const writableStream = fs.createWriteStream(path); +type Props = { + readable: Readable; + path: PathLike; + onProgress: (bytesWritten: number) => void; +}; - readable.pipe(writableStream); +export function writeReadableToFile({ readable, path, onProgress }: Props) { + const writableStream = fs.createWriteStream(path); - return new Promise((resolve, reject) => { - writableStream.on('finish', resolve); - writableStream.on('error', reject); - }); - } + let bytesWritten = 0; + + readable.on('data', (chunk: Buffer) => { + bytesWritten += chunk.length; + onProgress(bytesWritten); + }); + + readable.pipe(writableStream); + + return new Promise((resolve, reject) => { + writableStream.on('finish', resolve); + writableStream.on('error', reject); + }); } diff --git a/src/context/shared/domain/DownloadProgressTracker.ts b/src/context/shared/domain/DownloadProgressTracker.ts index 3a07c6c18f..6b137d56ef 100644 --- a/src/context/shared/domain/DownloadProgressTracker.ts +++ b/src/context/shared/domain/DownloadProgressTracker.ts @@ -1,5 +1,5 @@ export abstract class DownloadProgressTracker { - abstract downloadStarted(name: string, extension: string, size: number): Promise; + abstract downloadStarted(name: string, extension: string): Promise; abstract downloadUpdate( name: string, @@ -9,13 +9,6 @@ export abstract class DownloadProgressTracker { percentage: number; }, ): Promise; - abstract downloadFinished( - name: string, - extension: string, - size: number, - progress: { - elapsedTime: number; - }, - ): Promise; + abstract downloadFinished(name: string, extension: string): Promise; abstract error(name: string, extension: string): Promise; } diff --git a/src/context/shared/infrastructure/MainProcess/MainProcessDownloadProgressTracker.ts b/src/context/shared/infrastructure/MainProcess/MainProcessDownloadProgressTracker.ts index 0d5249fced..c686ed3a66 100644 --- a/src/context/shared/infrastructure/MainProcess/MainProcessDownloadProgressTracker.ts +++ b/src/context/shared/infrastructure/MainProcess/MainProcessDownloadProgressTracker.ts @@ -6,13 +6,12 @@ import { Service } from 'diod'; @Service() export class MainProcessDownloadProgressTracker extends SyncMessenger implements DownloadProgressTracker { - async downloadStarted(name: string, extension: string, size: number): Promise { + async downloadStarted(name: string, extension: string): Promise { setTrayStatus('SYNCING'); broadcastToWindows('sync-info-update', { action: 'DOWNLOADING', name: this.nameWithExtension(name, extension), - progress: 0, }); } @@ -28,14 +27,7 @@ export class MainProcessDownloadProgressTracker extends SyncMessenger implements }); } - async downloadFinished( - name: string, - extension: string, - size: number, - progress: { - elapsedTime: number; - }, - ): Promise { + async downloadFinished(name: string, extension: string) { const nameWithExtension = this.nameWithExtension(name, extension); setTrayStatus('IDLE'); diff --git a/src/context/storage/StorageFiles/__mocks__/DownloadProgressTrackerMock.ts b/src/context/storage/StorageFiles/__mocks__/DownloadProgressTrackerMock.ts index 401d430169..417d26b116 100644 --- a/src/context/storage/StorageFiles/__mocks__/DownloadProgressTrackerMock.ts +++ b/src/context/storage/StorageFiles/__mocks__/DownloadProgressTrackerMock.ts @@ -1,28 +1,8 @@ import { DownloadProgressTracker } from '../../../shared/domain/DownloadProgressTracker'; export class DownloadProgressTrackerMock implements DownloadProgressTracker { - private downloadStartedMock = vi.fn(); - private downloadUpdateMock = vi.fn(); - private downloadFinishedMock = vi.fn(); - private errorMock = vi.fn(); - - downloadStarted(name: string, extension: string, size: number): Promise { - return this.downloadStartedMock(name, extension, size); - } - - downloadUpdate( - name: string, - extension: string, - progress: { elapsedTime: number; percentage: number }, - ): Promise { - return this.downloadUpdateMock(name, extension, progress); - } - - downloadFinished(name: string, extension: string, size: number, progress: { elapsedTime: number }): Promise { - return this.downloadFinishedMock(name, extension, size, progress); - } - - error(name: string, extension: string): Promise { - return this.errorMock(name, extension); - } + downloadStarted = vi.fn(); + downloadUpdate = vi.fn(); + downloadFinished = vi.fn(); + error = vi.fn(); } diff --git a/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.test.ts b/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.test.ts index 9f2f0b4854..77b75cdb6b 100644 --- a/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.test.ts +++ b/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.test.ts @@ -34,31 +34,14 @@ describe('StorageFileDownloader', () => { }); describe('registerEvents', () => { - it('should handle start download', async () => { - await sut.run(file, metadata); - expect(downloaderHandler.on).toHaveBeenCalledWith('start', expect.any(Function)); - }); - - it('should handle download progress', async () => { - await sut.run(file, metadata); - - expect(downloaderHandler.on).toHaveBeenCalledWith('progress', expect.any(Function)); - }); - it('should handle download errors', async () => { await sut.run(file, metadata); expect(downloaderHandler.on).toHaveBeenCalledWith('error', expect.any(Function)); }); - - it('should handle download finish', async () => { - await sut.run(file, metadata); - - expect(downloaderHandler.on).toHaveBeenCalledWith('finish', expect.any(Function)); - }); }); - it('should successfully download a file', async () => { + it('should successfully download a file and return download result', async () => { const mockStream = new Readable({ read() { this.push('mock data'); @@ -68,9 +51,11 @@ describe('StorageFileDownloader', () => { downloaderHandler.download.mockResolvedValue(mockStream); - const stream = await sut.run(file, metadata); + const result = await sut.run(file, metadata); - expect(stream).toBeInstanceOf(Readable); + expect(result.stream).toBeInstanceOf(Readable); + expect(result.metadata).toEqual(metadata); + expect(result.handler).toBe(downloaderHandler); expect(downloaderHandler.download).toHaveBeenCalledWith(file); }); }); diff --git a/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.ts b/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.ts index 2369c3baf3..c43617cb26 100644 --- a/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.ts +++ b/src/context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader.ts @@ -13,32 +13,6 @@ export class StorageFileDownloader { private readonly tracker: DownloadProgressTracker, ) {} - private async registerEvents( - handler: DownloaderHandler, - { name, type, size }: { name: string; type: string; size: number }, - ) { - handler.on('start', () => { - this.tracker.downloadStarted(name, type, size); - }); - - handler.on('progress', (progress: number, elapsedTime: number) => { - this.tracker.downloadUpdate(name, type, { - elapsedTime, - percentage: progress, - }); - }); - - handler.on('error', () => { - this.tracker.error(name, type); - }); - - handler.on('finish', () => { - this.tracker.downloadFinished(name, type, size, { - elapsedTime: handler.elapsedTime(), - }); - }); - } - async run( file: StorageFile, metadata: { @@ -46,10 +20,10 @@ export class StorageFileDownloader { type: string; size: number; }, - ): Promise { + ): Promise<{ stream: Readable; metadata: typeof metadata; handler: DownloaderHandler }> { const downloader = this.managerFactory.downloader(); - await this.registerEvents(downloader, metadata); + downloader.on('error', () => this.tracker.error(metadata.name, metadata.type)); const stream = await downloader.download(file); @@ -57,6 +31,6 @@ export class StorageFileDownloader { msg: `stream created "${metadata.name}.${metadata.type}" with ${file.id.value}`, }); - return stream; + return { stream, metadata, handler: downloader }; } } diff --git a/src/context/storage/StorageFiles/application/download/__test-helpers__/StorageFileDownloaderTestClass.ts b/src/context/storage/StorageFiles/application/download/__test-helpers__/StorageFileDownloaderTestClass.ts index 5e082e6426..4302ecd019 100644 --- a/src/context/storage/StorageFiles/application/download/__test-helpers__/StorageFileDownloaderTestClass.ts +++ b/src/context/storage/StorageFiles/application/download/__test-helpers__/StorageFileDownloaderTestClass.ts @@ -3,6 +3,7 @@ import { StorageFileDownloader } from '../StorageFileDownloader/StorageFileDownl import { StorageFile } from '../../../domain/StorageFile'; import { DownloadProgressTrackerMock } from '../../../__mocks__/DownloadProgressTrackerMock'; import { DownloaderHandlerFactoryMock } from '../../../domain/download/__mocks__/DownloaderHandlerFactoryMock'; +import { DownloaderHandler } from '../../../domain/download/DownloaderHandler'; export class StorageFileDownloaderTestClass extends StorageFileDownloader { private mock = vi.fn(); @@ -13,12 +14,22 @@ export class StorageFileDownloaderTestClass extends StorageFileDownloader { super(factory, tracker); } - run(file: StorageFile, metadata: { name: string; type: string; size: number }): Promise { + run( + file: StorageFile, + metadata: { name: string; type: string; size: number }, + ): Promise<{ stream: Readable; metadata: typeof metadata; handler: DownloaderHandler }> { return this.mock(file, metadata); } returnsAReadable() { - this.mock.mockResolvedValue(Readable.from('Hello world!')); + const factory = new DownloaderHandlerFactoryMock(); + const handler = factory.downloader(); + (handler.elapsedTime as any).mockReturnValue(1000); + this.mock.mockResolvedValue({ + stream: Readable.from('Hello world!'), + metadata: { name: 'test', type: 'txt', size: 12 }, + handler, + }); } assertHasBeenCalled() { diff --git a/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.test.ts b/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.test.ts new file mode 100644 index 0000000000..fc95118b65 --- /dev/null +++ b/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.test.ts @@ -0,0 +1,64 @@ +import { Readable } from 'stream'; +import { downloadWithProgressTracking } from './download-with-progress-tracking'; +import { DownloadProgressTrackerMock } from '../../__mocks__/DownloadProgressTrackerMock'; +import { FileMother } from '../../../../virtual-drive/files/domain/__test-helpers__/FileMother'; +import { StorageFilesRepositoryMock } from '../../__mocks__/StorageFilesRepositoryMock'; +import { StorageFile } from '../../domain/StorageFile'; +import { call, calls } from 'tests/vitest/utils.helper'; + +describe('downloadWithProgressTracking', () => { + const elapsedTime = 123; + + let tracker: DownloadProgressTrackerMock; + let downloader: { run: ReturnType }; + let repository: StorageFilesRepositoryMock; + + beforeEach(() => { + tracker = new DownloadProgressTrackerMock(); + downloader = { + run: vi.fn(), + }; + repository = new StorageFilesRepositoryMock(); + }); + + it('tracks progress, stores the file, and returns the storage file', async () => { + const virtualFile = FileMother.fromPartial({ + size: 100, + path: 'folder/test-file.txt', + }); + + const handler = { elapsedTime: vi.fn(() => elapsedTime) }; + const stream = Readable.from('hello'); + const metadata = { name: virtualFile.name, type: virtualFile.type, size: virtualFile.size }; + + downloader.run.mockResolvedValue({ stream, metadata, handler }); + const storeSpy = vi.fn(async (_file: StorageFile, _readable: Readable, onProgress: (bytes: number) => void) => { + [20, 200].forEach((bytes) => onProgress(bytes)); + }); + (repository as any).store = storeSpy; + + const result = await downloadWithProgressTracking({ + virtualFile, + tracker, + downloader: downloader as any, + repository, + }); + + call(tracker.downloadStarted).toMatchObject([virtualFile.name, virtualFile.type]); + calls(tracker.downloadUpdate).toHaveLength(2); + calls(tracker.downloadUpdate).toMatchObject([ + [metadata.name, metadata.type, { percentage: 0.2, elapsedTime }], + [metadata.name, metadata.type, { percentage: 1, elapsedTime }], + ]); + call(tracker.downloadFinished).toMatchObject([metadata.name, metadata.type]); + + call(downloader.run).toMatchObject([expect.any(StorageFile), virtualFile]); + call(storeSpy).toMatchObject([expect.any(StorageFile), stream, expect.any(Function)]); + + expect(result.attributes()).toEqual({ + id: virtualFile.contentsId, + virtualId: virtualFile.uuid, + size: virtualFile.size, + }); + }); +}); diff --git a/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.ts b/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.ts new file mode 100644 index 0000000000..b2a897d4d6 --- /dev/null +++ b/src/context/storage/StorageFiles/application/download/download-with-progress-tracking.ts @@ -0,0 +1,32 @@ +import { DownloadProgressTracker } from '../../../../shared/domain/DownloadProgressTracker'; +import { StorageFileDownloader } from '../../application/download/StorageFileDownloader/StorageFileDownloader'; +import { StorageFilesRepository } from '../../domain/StorageFilesRepository'; +import { File } from '../../../../../context/virtual-drive/files/domain/File'; +import { StorageFile } from '../../domain/StorageFile'; + +type Props = { + virtualFile: File; + tracker: DownloadProgressTracker; + downloader: StorageFileDownloader; + repository: StorageFilesRepository; +}; + +export async function downloadWithProgressTracking({ virtualFile, tracker, downloader, repository }: Props) { + const storage = StorageFile.from({ + id: virtualFile.contentsId, + virtualId: virtualFile.uuid, + size: virtualFile.size, + }); + + tracker.downloadStarted(virtualFile.name, virtualFile.type); + const { stream, metadata, handler } = await downloader.run(storage, virtualFile); + + await repository.store(storage, stream, (bytesWritten) => { + const percentage = Math.min(bytesWritten / virtualFile.size, 1); + tracker.downloadUpdate(metadata.name, metadata.type, { percentage, elapsedTime: handler.elapsedTime() }); + }); + + tracker.downloadFinished(metadata.name, metadata.type); + + return storage; +} diff --git a/src/context/storage/StorageFiles/application/offline/CacheStorageFile.test.ts b/src/context/storage/StorageFiles/application/offline/CacheStorageFile.test.ts index d79a56027c..f06f7a387d 100644 --- a/src/context/storage/StorageFiles/application/offline/CacheStorageFile.test.ts +++ b/src/context/storage/StorageFiles/application/offline/CacheStorageFile.test.ts @@ -4,6 +4,7 @@ import { SingleFileMatchingFinderTestClass } from '../../../../virtual-drive/fil import { FileMother } from '../../../../virtual-drive/files/domain/__test-helpers__/FileMother'; import { StorageFileCacheMock } from '../../__mocks__/StorageFileCacheMock'; import { StorageFileDownloaderTestClass } from '../download/__test-helpers__/StorageFileDownloaderTestClass'; +import { DownloadProgressTrackerMock } from '../../__mocks__/DownloadProgressTrackerMock'; describe('Cache Storage File', () => { let SUT: CacheStorageFile; @@ -11,13 +12,15 @@ describe('Cache Storage File', () => { let cache: StorageFileCacheMock; let finder: SingleFileMatchingFinderTestClass; let downloader: StorageFileDownloaderTestClass; + let tracker: DownloadProgressTrackerMock; beforeAll(() => { cache = new StorageFileCacheMock(); finder = new SingleFileMatchingFinderTestClass(); downloader = new StorageFileDownloaderTestClass(); + tracker = new DownloadProgressTrackerMock(); - SUT = new CacheStorageFile(finder, cache, downloader); + SUT = new CacheStorageFile(finder, cache, downloader, tracker); }); beforeEach(() => { diff --git a/src/context/storage/StorageFiles/application/offline/CacheStorageFile.ts b/src/context/storage/StorageFiles/application/offline/CacheStorageFile.ts index eace9801fd..0a74d5ceda 100644 --- a/src/context/storage/StorageFiles/application/offline/CacheStorageFile.ts +++ b/src/context/storage/StorageFiles/application/offline/CacheStorageFile.ts @@ -6,6 +6,9 @@ import { StorageFile } from '../../domain/StorageFile'; import { StorageFileId } from '../../domain/StorageFileId'; import { StorageFileDownloader } from '../download/StorageFileDownloader/StorageFileDownloader'; import { StorageFileCache } from '../../domain/StorageFileCache'; +import { DownloadProgressTracker } from '../../../../shared/domain/DownloadProgressTracker'; +import { getDownloadLock, setDownloadLock } from './download-lock'; +import { File } from '../../../../../context/virtual-drive/files/domain/File'; @Service() export class CacheStorageFile { @@ -13,6 +16,7 @@ export class CacheStorageFile { private readonly virtualFileFinder: SingleFileMatchingFinder, private readonly cache: StorageFileCache, private readonly downloader: StorageFileDownloader, + private readonly tracker: DownloadProgressTracker, ) {} async run(path: string) { @@ -31,19 +35,36 @@ export class CacheStorageFile { const id = new StorageFileId(virtual.contentsId); const alreadyExists = await this.cache.has(id); + if (alreadyExists) return; - if (alreadyExists) { - return; + let downloadPromise = getDownloadLock(id.value); + if (!downloadPromise) { + downloadPromise = this.performDownload(id, virtual); + setDownloadLock(id.value, downloadPromise); } + await downloadPromise; + } + + private async performDownload(id: StorageFileId, virtual: File) { const storage = StorageFile.from({ id: virtual.contentsId, virtualId: virtual.uuid, size: virtual.size, }); - const readable = await this.downloader.run(storage, virtual); - await this.cache.pipe(id, readable); + this.tracker.downloadStarted(virtual.name, virtual.type); + const { stream, metadata, handler } = await this.downloader.run(storage, virtual); + + await this.cache.pipe(id, stream, (bytesWritten) => { + const progress = Math.min(bytesWritten / virtual.size, 1); + this.tracker.downloadUpdate(metadata.name, metadata.type, { + percentage: progress, + elapsedTime: handler.elapsedTime(), + }); + }); + + this.tracker.downloadFinished(metadata.name, metadata.type); logger.debug({ msg: `File "${virtual.nameWithExtension}" with ${storage.id.value} is cached`, diff --git a/src/context/storage/StorageFiles/application/offline/MakeStorageFileAvaliableOffline.ts b/src/context/storage/StorageFiles/application/offline/MakeStorageFileAvaliableOffline.ts index 50cbbe9ee9..75f6a6785f 100644 --- a/src/context/storage/StorageFiles/application/offline/MakeStorageFileAvaliableOffline.ts +++ b/src/context/storage/StorageFiles/application/offline/MakeStorageFileAvaliableOffline.ts @@ -2,10 +2,11 @@ import { Service } from 'diod'; import { logger } from '@internxt/drive-desktop-core/build/backend'; import { SingleFileMatchingFinder } from '../../../../virtual-drive/files/application/SingleFileMatchingFinder'; import { FileStatuses } from '../../../../virtual-drive/files/domain/FileStatus'; -import { StorageFile } from '../../domain/StorageFile'; import { StorageFileId } from '../../domain/StorageFileId'; import { StorageFilesRepository } from '../../domain/StorageFilesRepository'; import { StorageFileDownloader } from '../download/StorageFileDownloader/StorageFileDownloader'; +import { DownloadProgressTracker } from '../../../../shared/domain/DownloadProgressTracker'; +import { downloadWithProgressTracking } from '../download/download-with-progress-tracking'; @Service() export class MakeStorageFileAvaliableOffline { @@ -13,6 +14,7 @@ export class MakeStorageFileAvaliableOffline { private readonly repository: StorageFilesRepository, private readonly virtualFileFinder: SingleFileMatchingFinder, private readonly downloader: StorageFileDownloader, + private readonly tracker: DownloadProgressTracker, ) {} async run(path: string) { @@ -24,22 +26,17 @@ export class MakeStorageFileAvaliableOffline { const id = new StorageFileId(virtual.contentsId); const alreadyExists = await this.repository.exists(id); + if (alreadyExists) return; - if (alreadyExists) { - return; - } - - const storage = StorageFile.from({ - id: virtual.contentsId, - virtualId: virtual.uuid, - size: virtual.size, + const storagedFile = await downloadWithProgressTracking({ + virtualFile: virtual, + tracker: this.tracker, + downloader: this.downloader, + repository: this.repository, }); - const readable = await this.downloader.run(storage, virtual); - await this.repository.store(storage, readable); - logger.debug({ - msg: `File "${virtual.nameWithExtension}" with ${storage.id.value} is now avaliable locally`, + msg: `File "${virtual.nameWithExtension}" with ${storagedFile.id.value} is now avaliable locally`, }); } } diff --git a/src/context/storage/StorageFiles/application/offline/download-lock.ts b/src/context/storage/StorageFiles/application/offline/download-lock.ts new file mode 100644 index 0000000000..2d7395e5c4 --- /dev/null +++ b/src/context/storage/StorageFiles/application/offline/download-lock.ts @@ -0,0 +1,13 @@ +const downloadingFiles: Map> = new Map(); + +export function getDownloadLock(fileId: string): Promise | undefined { + return downloadingFiles.get(fileId); +} + +export function setDownloadLock(fileId: string, promise: Promise): void { + downloadingFiles.set(fileId, promise); + + promise.finally(() => { + downloadingFiles.delete(fileId); + }); +} diff --git a/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.test.ts b/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.test.ts index 421e7cd17c..a9555d35b1 100644 --- a/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.test.ts +++ b/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.test.ts @@ -6,6 +6,7 @@ import { FileMother } from '../../../../virtual-drive/files/domain/__test-helper import { StorageFilesRepositoryMock } from '../../__mocks__/StorageFilesRepositoryMock'; import { StorageFileMother } from '../../../__test-helpers__/StorageFileMother'; import { StorageFileDownloaderTestClass } from '../download/__test-helpers__/StorageFileDownloaderTestClass'; +import { DownloadProgressTrackerMock } from '../../__mocks__/DownloadProgressTrackerMock'; describe('Storage Remote Changes Syncher', () => { let SUT: StorageRemoteChangesSyncher; @@ -13,13 +14,15 @@ describe('Storage Remote Changes Syncher', () => { let repository: StorageFilesRepositoryMock; let singleFileMatchingSearcher: SingleFileMatchingSearcherTestClass; let storageFileDownloader: StorageFileDownloaderTestClass; + let tracker: DownloadProgressTrackerMock; beforeAll(() => { repository = new StorageFilesRepositoryMock(); singleFileMatchingSearcher = new SingleFileMatchingSearcherTestClass(); storageFileDownloader = new StorageFileDownloaderTestClass(); + tracker = new DownloadProgressTrackerMock(); - SUT = new StorageRemoteChangesSyncher(repository, singleFileMatchingSearcher, storageFileDownloader); + SUT = new StorageRemoteChangesSyncher(repository, singleFileMatchingSearcher, storageFileDownloader, tracker); }); beforeEach(() => { @@ -49,6 +52,7 @@ describe('Storage Remote Changes Syncher', () => { repository.returnAll(expectFilesDeleted); singleFileMatchingSearcher.returnOneAtATime(expectFilesDeleted.map(() => FileMother.any())); + storageFileDownloader.returnsAReadable(); await SUT.run(); @@ -72,6 +76,7 @@ describe('Storage Remote Changes Syncher', () => { repository.returnAll(storageFilesFound); singleFileMatchingSearcher.returnOneAtATime(virtualFilesAssociated); + storageFileDownloader.returnsAReadable(); await SUT.run(); diff --git a/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.ts b/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.ts index 63d3cd4492..3242ac0630 100644 --- a/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.ts +++ b/src/context/storage/StorageFiles/application/sync/StorageRemoteChangesSyncher.ts @@ -5,6 +5,8 @@ import { StorageFile } from '../../domain/StorageFile'; import { StorageFilesRepository } from '../../domain/StorageFilesRepository'; import { StorageFileDownloader } from '../download/StorageFileDownloader/StorageFileDownloader'; import { logger } from '@internxt/drive-desktop-core/build/backend'; +import { DownloadProgressTracker } from '../../../../shared/domain/DownloadProgressTracker'; +import { downloadWithProgressTracking } from '../download/download-with-progress-tracking'; @Service() export class StorageRemoteChangesSyncher { @@ -12,6 +14,7 @@ export class StorageRemoteChangesSyncher { private readonly repository: StorageFilesRepository, private readonly fileSearcher: SingleFileMatchingSearcher, private readonly downloader: StorageFileDownloader, + private readonly tracker: DownloadProgressTracker, ) {} private async sync(storage: StorageFile): Promise { @@ -31,17 +34,15 @@ export class StorageRemoteChangesSyncher { await this.repository.delete(storage.id); - const newer = StorageFile.from({ - id: virtualFile.contentsId, - virtualId: storage.virtualId.value, - size: virtualFile.size, + const storagedFile = await downloadWithProgressTracking({ + virtualFile, + tracker: this.tracker, + downloader: this.downloader, + repository: this.repository, }); - const readable = await this.downloader.run(newer, virtualFile); - await this.repository.store(newer, readable); - logger.debug({ - msg: `File "${virtualFile.nameWithExtension}" with ${newer.id.value} is avaliable offline`, + msg: `File "${virtualFile.nameWithExtension}" with ${storagedFile.id.value} is avaliable offline`, }); } diff --git a/src/context/storage/StorageFiles/domain/StorageFileCache.ts b/src/context/storage/StorageFiles/domain/StorageFileCache.ts index 80de15c733..f18dfb086d 100644 --- a/src/context/storage/StorageFiles/domain/StorageFileCache.ts +++ b/src/context/storage/StorageFiles/domain/StorageFileCache.ts @@ -4,7 +4,7 @@ import { StorageFileId } from './StorageFileId'; export abstract class StorageFileCache { abstract has(id: StorageFileId): Promise; abstract store(id: StorageFileId, value: Buffer): Promise; - abstract pipe(id: StorageFileId, stream: Readable): Promise; + abstract pipe(id: StorageFileId, stream: Readable, onProgress: (bytesWritten: number) => void): Promise; abstract read(id: StorageFileId): Promise; abstract delete(id: StorageFileId): Promise; abstract clear(): Promise; diff --git a/src/context/storage/StorageFiles/domain/StorageFilesRepository.ts b/src/context/storage/StorageFiles/domain/StorageFilesRepository.ts index 7918ee1495..43bac2f83f 100644 --- a/src/context/storage/StorageFiles/domain/StorageFilesRepository.ts +++ b/src/context/storage/StorageFiles/domain/StorageFilesRepository.ts @@ -7,7 +7,7 @@ export abstract class StorageFilesRepository { abstract retrieve(id: StorageFileId): Promise; - abstract store(file: StorageFile, readable: Readable): Promise; + abstract store(file: StorageFile, readable: Readable, onProgress: (bytesWritten: number) => void): Promise; abstract read(id: StorageFileId): Promise; diff --git a/src/context/storage/StorageFiles/domain/download/__mocks__/DownloaderHandlerFactoryMock.ts b/src/context/storage/StorageFiles/domain/download/__mocks__/DownloaderHandlerFactoryMock.ts index 23d5bd2cd0..e935029472 100644 --- a/src/context/storage/StorageFiles/domain/download/__mocks__/DownloaderHandlerFactoryMock.ts +++ b/src/context/storage/StorageFiles/domain/download/__mocks__/DownloaderHandlerFactoryMock.ts @@ -2,5 +2,14 @@ import { DownloaderHandler } from '../DownloaderHandler'; import { DownloaderHandlerFactory } from '../DownloaderHandlerFactory'; export class DownloaderHandlerFactoryMock implements DownloaderHandlerFactory { - downloader = vi.fn<() => DownloaderHandler>(); + downloader = vi.fn<() => DownloaderHandler>( + () => + ({ + download: vi.fn(), + downloadById: vi.fn(), + forceStop: vi.fn(), + on: vi.fn(), + elapsedTime: vi.fn().mockReturnValue(0), + }) as unknown as DownloaderHandler, + ); } diff --git a/src/context/storage/StorageFiles/infrastructure/persistance/cache/InMemoryStorageFileCache.ts b/src/context/storage/StorageFiles/infrastructure/persistance/cache/InMemoryStorageFileCache.ts index 255a382302..efb7cd0ed8 100644 --- a/src/context/storage/StorageFiles/infrastructure/persistance/cache/InMemoryStorageFileCache.ts +++ b/src/context/storage/StorageFiles/infrastructure/persistance/cache/InMemoryStorageFileCache.ts @@ -2,7 +2,7 @@ import { Service } from 'diod'; import { StorageFileCache } from '../../../domain/StorageFileCache'; import { StorageFileId } from '../../../domain/StorageFileId'; import { Readable } from 'stream'; -import { ReadStreamToBuffer } from '../../../../../../apps/shared/fs/ReadStreamToBuffer'; +import { readStreamToBuffer } from '../../../../../../apps/shared/fs/read-stream-to-buffer'; @Service() export class InMemoryStorageFileCache implements StorageFileCache { @@ -16,8 +16,8 @@ export class InMemoryStorageFileCache implements StorageFileCache { this.buffers.set(id.value, value); } - async pipe(id: StorageFileId, stream: Readable): Promise { - const buffer = await ReadStreamToBuffer.read(stream); + async pipe(id: StorageFileId, stream: Readable, onProgress: (bytesWritten: number) => void): Promise { + const buffer = await readStreamToBuffer({ stream, onProgress }); await this.store(id, buffer); } diff --git a/src/context/storage/StorageFiles/infrastructure/persistance/repository/typeorm/TypeOrmAndNodeFsStorageFilesRepository.ts b/src/context/storage/StorageFiles/infrastructure/persistance/repository/typeorm/TypeOrmAndNodeFsStorageFilesRepository.ts index e4b291a2c6..c551ea7353 100644 --- a/src/context/storage/StorageFiles/infrastructure/persistance/repository/typeorm/TypeOrmAndNodeFsStorageFilesRepository.ts +++ b/src/context/storage/StorageFiles/infrastructure/persistance/repository/typeorm/TypeOrmAndNodeFsStorageFilesRepository.ts @@ -4,7 +4,7 @@ import { readFile, unlink } from 'fs/promises'; import path from 'path'; import { DataSource, Repository } from 'typeorm'; import { ensureFolderExists } from '../../../../../../../apps/shared/fs/ensure-folder-exists'; -import { WriteReadableToFile } from '../../../../../../../apps/shared/fs/write-readable-to-file'; +import { writeReadableToFile } from '../../../../../../../apps/shared/fs/write-readable-to-file'; import { StorageFile } from '../../../../domain/StorageFile'; import { StorageFileId } from '../../../../domain/StorageFileId'; import { StorageFilesRepository } from '../../../../domain/StorageFilesRepository'; @@ -25,10 +25,10 @@ export class TypeOrmAndNodeFsStorageFilesRepository implements StorageFilesRepos ensureFolderExists(this.baseFolder); } - async store(file: StorageFile, readable: Readable): Promise { + async store(file: StorageFile, readable: Readable, onProgress: (bytesWritten: number) => void): Promise { const where = path.join(this.baseFolder, file.id.value); - await WriteReadableToFile.write(readable, where); + await writeReadableToFile({ readable, path: where, onProgress }); await this.db.save(file.attributes()); } diff --git a/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnailRepository.test.ts b/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnailRepository.test.ts index 4c01fd3cc1..a57c388591 100644 --- a/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnailRepository.test.ts +++ b/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnailRepository.test.ts @@ -4,7 +4,7 @@ import { LocalThumbnailRepository } from './LocalThumbnsailsRepository'; import { SystemThumbnailNameCalculator } from './SystemThumbnailNameCalculator'; import { RelativePathToAbsoluteConverterTestClass } from '../../../../virtual-drive/shared/application/__test-helpers__/RelativePathToAbsoluteConverterTestClass'; import { FileMother } from '../../../../virtual-drive/files/domain/__test-helpers__/FileMother'; -import { WriteReadableToFile } from '../../../../../apps/shared/fs/write-readable-to-file'; +import * as WriteReadableToFile from '../../../../../apps/shared/fs/write-readable-to-file'; import { ThumbnailMother } from '../../../thumbnails/__test-helpers__/ThumbnailMother'; import { Readable } from 'node:stream'; @@ -79,9 +79,9 @@ describe('Local Thumbnail Repository', () => { }); const absolutePath = '/home/jens/photos/me.png'; - const writeSpy = vi.spyOn(WriteReadableToFile, 'write'); + const writeSpy = vi.spyOn(WriteReadableToFile, 'writeReadableToFile'); - writeSpy.mockImplementation((readable: Readable) => { + writeSpy.mockImplementation(({ readable }) => { return new Promise((resolve) => { readable.on('data', () => { /* Intentionally empty - just consuming the stream */ @@ -97,8 +97,10 @@ describe('Local Thumbnail Repository', () => { await SUT.push(file, readableStream); expect(writeSpy).toBeCalledWith( - expect.any(Readable), - path.join(thumbnailFolder, 'normal', 'c6ee772d9e49320e97ec29a7eb5b1697.png'), + expect.objectContaining({ + readable: expect.any(Readable), + path: path.join(thumbnailFolder, 'normal', 'c6ee772d9e49320e97ec29a7eb5b1697.png'), + }), ); }); }); diff --git a/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnsailsRepository.ts b/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnsailsRepository.ts index cd28390947..80b08397cc 100644 --- a/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnsailsRepository.ts +++ b/src/context/storage/thumbnails/infrastructrue/local/LocalThumbnsailsRepository.ts @@ -3,7 +3,7 @@ import { logger } from '@internxt/drive-desktop-core/build/backend'; import fs from 'fs'; import path from 'path'; import { Readable } from 'stream'; -import { WriteReadableToFile } from '../../../../../apps/shared/fs/write-readable-to-file'; +import { writeReadableToFile } from '../../../../../apps/shared/fs/write-readable-to-file'; import { File } from '../../../../virtual-drive/files/domain/File'; import { RelativePathToAbsoluteConverter } from '../../../../virtual-drive/shared/application/RelativePathToAbsoluteConverter'; import { Thumbnail } from '../../domain/Thumbnail'; @@ -82,7 +82,15 @@ export class LocalThumbnailRepository implements ThumbnailsRepository { path.join(this.systemThumbnailsFolder, 'large', name), ]; - await Promise.all(where.map((p) => WriteReadableToFile.write(stream, p))); + await Promise.all( + where.map((p) => + writeReadableToFile({ + readable: stream, + path: p, + onProgress: () => {}, + }), + ), + ); logger.debug({ msg: `Thumbnail Created for ${file.nameWithExtension} on ${where}`,