From eea4e9954c2f7d21da86e7e4b0d2fa4445a4e5a9 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Mon, 13 Jan 2025 18:28:35 +0100
Subject: [PATCH 01/15] chore: add async and @types/async dependencies to package.json

---
 package.json | 2 ++
 yarn.lock    | 7 ++++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/package.json b/package.json
index 0adee561..94af63d0 100644
--- a/package.json
+++ b/package.json
@@ -43,6 +43,7 @@
     "@internxt/sdk": "1.7.0",
     "@oclif/core": "4.2.0",
     "@types/validator": "13.12.2",
+    "async": "^3.2.6",
     "axios": "1.7.9",
     "bip39": "3.1.0",
     "body-parser": "1.20.3",
@@ -70,6 +71,7 @@
     "@internxt/prettier-config": "internxt/prettier-config#v1.0.2",
     "@oclif/test": "4.1.4",
     "@openpgp/web-stream-tools": "0.0.11-patch-0",
+    "@types/async": "^3.2.24",
     "@types/cli-progress": "3.11.6",
     "@types/express": "5.0.0",
     "@types/mime-types": "2.1.4",
diff --git a/yarn.lock b/yarn.lock
index a2da60fb..06fb1f9e 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2826,6 +2826,11 @@
   resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9"
   integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==
 
+"@types/async@^3.2.24":
+  version "3.2.24"
+  resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.24.tgz#3a96351047575bbcf2340541b2d955a35339608f"
+  integrity sha512-8iHVLHsCCOBKjCF2KwFe0p9Z3rfM9mL+sSP8btyR5vTjJRAqpBYD28/ZLgXPf0pjG1VxOvtCV/BgXkQbpSe8Hw==
+
 "@types/body-parser@*":
   version "1.19.5"
   resolved "https://registry.yarnpkg.com/@types/body-parser/-/body-parser-1.19.5.tgz#04ce9a3b677dc8bd681a17da1ab9835dc9d3ede4"
@@ -3397,7 +3402,7 @@ async@^2.6.3, async@~2.6.1:
   dependencies:
     lodash "^4.17.14"
 
-async@^3.2.0, async@^3.2.3, async@~3.2.0:
+async@^3.2.0, async@^3.2.3, async@^3.2.6, async@~3.2.0:
   version "3.2.6"
   resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce"
   integrity sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==

From 59f278b5d4c0aecd1b0d05cac791648eed987ee5 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Mon, 13 Jan 2025 20:06:57 +0100
Subject: [PATCH 02/15] feat: add streamReadableIntoChunks method to StreamUtils for chunking readable streams

---
 src/utils/stream.utils.ts | 44 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 43 insertions(+), 1 deletion(-)

diff --git a/src/utils/stream.utils.ts b/src/utils/stream.utils.ts
index 2075a455..769d49ed 100644
--- a/src/utils/stream.utils.ts
+++ b/src/utils/stream.utils.ts
@@ -1,5 +1,5 @@
 import { ReadStream, WriteStream } from 'node:fs';
-import { Transform, TransformCallback } from 'node:stream';
+import { Readable, Transform, TransformCallback } from 'node:stream';
 
 export class StreamUtils {
   static readStreamToReadableStream(readStream: ReadStream): ReadableStream {
@@ -64,6 +64,48 @@
 
     return stream;
   }
+
+  /**
+   * Splits a readable stream into fixed-size chunks, emitting them as the source is read
+   * @param readable The source readable stream
+   * @param chunkSize The size in bytes of each emitted chunk
+   * @returns A readable stream that emits chunks of chunkSize bytes (the final chunk may be smaller)
+   */
+  static streamReadableIntoChunks(readable: Readable, chunkSize: number): Readable {
+    let buffer = Buffer.alloc(0);
+
+    const mergeBuffers = (buffer1: Buffer, buffer2: Buffer): Buffer => {
+      return Buffer.concat([buffer1, buffer2]);
+    };
+
+    const outputStream = new Readable({
+      read() {
+        // noop
+      },
+    });
+
+    
readable.on('data', (chunk: Buffer) => {
+      buffer = mergeBuffers(buffer, chunk);
+
+      while (buffer.length >= chunkSize) {
+        outputStream.push(buffer.subarray(0, chunkSize));
+        buffer = buffer.subarray(chunkSize);
+      }
+    });
+
+    readable.on('end', () => {
+      if (buffer.length > 0) {
+        outputStream.push(buffer);
+      }
+      outputStream.push(null); // Signal the end of the stream
+    });
+
+    readable.on('error', (err) => {
+      outputStream.destroy(err);
+    });
+
+    return outputStream;
+  }
 }
 
 export class ProgressTransform extends Transform {

From e6e046d8d24e6f8709d240c8385ef7e88cb32032 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Mon, 13 Jan 2025 20:08:32 +0100
Subject: [PATCH 03/15] feat: implement encryptStreamInParts method for chunked encryption of readable streams

---
 src/services/crypto.service.ts         | 22 +++++++++++++++----
 .../network/network-facade.service.ts  |  2 +-
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/services/crypto.service.ts b/src/services/crypto.service.ts
index b43dd6f7..c71febbe 100644
--- a/src/services/crypto.service.ts
+++ b/src/services/crypto.service.ts
@@ -1,7 +1,7 @@
 import { CryptoProvider } from '@internxt/sdk';
 import { Keys, Password } from '@internxt/sdk/dist/auth';
 import { createCipheriv, createDecipheriv, createHash, Decipher, pbkdf2Sync, randomBytes } from 'node:crypto';
-import { Transform } from 'node:stream';
+import { Readable, Transform } from 'node:stream';
 import { KeysService } from './keys.service';
 import { ConfigService } from '../services/config.service';
 import { StreamUtils } from '../utils/stream.utils';
@@ -116,12 +116,26 @@
     return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8');
   };
 
-  public async decryptStream(
+  public encryptStreamInParts = (
+    readable: Readable,
+    cipher: Transform,
+    size: number,
+    parts: number,
+  ): Transform => {
+    // Add a small margin to the computed part size: splitting at exactly size / parts could leave one extra trailing chunk, which would mismatch the number of upload URLs provided
+    const marginChunkSize = 1024;
+    const chunkSize = size / parts + marginChunkSize;
+    const readableChunks = StreamUtils.streamReadableIntoChunks(readable, chunkSize);
+
+    return readableChunks.pipe(cipher);
+  };
+
+  public decryptStream = (
     inputSlices: ReadableStream[],
     key: Buffer,
     iv: Buffer,
     startOffsetByte?: number,
-  ) {
+  ) => {
     let decipher: Decipher;
     if (startOffsetByte) {
       const aesBlockSize = 16;
@@ -164,7 +178,7 @@
     });
 
     return decryptedStream;
-  }
+  };
 
   public getEncryptionTransform = (key: Buffer, iv: Buffer): Transform => {
     const cipher = createCipheriv('aes-256-ctr', key, iv);
diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts
index 04b4bb10..aa657a50 100644
--- a/src/services/network/network-facade.service.ts
+++ b/src/services/network/network-facade.service.ts
@@ -73,7 +73,7 @@
       if (rangeOptions) {
         startOffsetByte = rangeOptions.parsed.start;
       }
-      fileStream = await this.cryptoService.decryptStream(
+      fileStream = this.cryptoService.decryptStream(
         encryptedContentStreams,
         Buffer.from(key as ArrayBuffer),
         Buffer.from(iv as ArrayBuffer),

From dc5e27d4ef07892b1d560a4c0a06a5be38ea79bf Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Mon, 13 Jan 2025 20:10:12 +0100
Subject: [PATCH 04/15] feat: update uploadFile method to accept Buffer in addition to Readable stream and add new types for upload tasks and multipart options

---
 
src/services/network/upload.service.ts | 2 +- src/types/network.types.ts | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/services/network/upload.service.ts b/src/services/network/upload.service.ts index f11a2a4e..c1ce0fb1 100644 --- a/src/services/network/upload.service.ts +++ b/src/services/network/upload.service.ts @@ -5,7 +5,7 @@ import { UploadOptions } from '../../types/network.types'; export class UploadService { public static readonly instance: UploadService = new UploadService(); - async uploadFile(url: string, from: Readable, options: UploadOptions): Promise<{ etag: string }> { + async uploadFile(url: string, from: Readable | Buffer, options: UploadOptions): Promise<{ etag: string }> { const response = await axios.put(url, from, { signal: options.abortController?.signal, onUploadProgress: (progressEvent) => { diff --git a/src/types/network.types.ts b/src/types/network.types.ts index 800fe09e..72c3869d 100644 --- a/src/types/network.types.ts +++ b/src/types/network.types.ts @@ -17,3 +17,13 @@ export interface SelfsignedCert { cert: string | Buffer; key: string | Buffer; } + +export interface UploadTask { + contentToUpload: Buffer; + urlToUpload: string; + index: number; +} + +export interface UploadMultipartOptions extends UploadOptions { + parts: number; +} From 0219dd78ca27f98d4f9e8e7c321dfa84a8352aa6 Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 10:16:30 +0100 Subject: [PATCH 05/15] feat: add uploadMultipartFromStream method for multi-part encrypted uploads from a Readable stream --- .../network/network-facade.service.ts | 145 +++++++++++++++++- 1 file changed, 144 insertions(+), 1 deletion(-) diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts index aa657a50..6d3ac044 100644 --- a/src/services/network/network-facade.service.ts +++ b/src/services/network/network-facade.service.ts @@ -6,17 +6,26 @@ import { DownloadFileFunction, EncryptFileFunction, UploadFileFunction, + UploadFileMultipartFunction, } from '@internxt/sdk/dist/network'; import { Environment } from '@internxt/inxt-js'; import { randomBytes } from 'node:crypto'; import { Readable, Transform } from 'node:stream'; -import { DownloadOptions, UploadOptions, UploadProgressCallback, DownloadProgressCallback } from '../../types/network.types'; +import { + DownloadOptions, + UploadOptions, + UploadProgressCallback, + DownloadProgressCallback, + UploadMultipartOptions, + UploadTask, +} from '../../types/network.types'; import { CryptoService } from '../crypto.service'; import { UploadService } from './upload.service'; import { DownloadService } from './download.service'; import { ValidationService } from '../validation.service'; import { HashStream } from '../../utils/hash.utils'; import { RangeOptions } from '../../utils/network.utils'; +import { queue, QueueObject } from 'async'; export class NetworkFacade { private readonly cryptoLib: Network.Crypto; @@ -183,4 +192,138 @@ export class NetworkFacade { return [uploadOperation(), abortable]; } + + /** + * Performs a multi-part upload encrypting the stream content + * + * @param bucketId The bucket where the file will be uploaded + * @param mnemonic The plain mnemonic of the user + * @param size The total size of the stream content + * @param from The source ReadStream to upload from + * @param options The upload options + * @returns A promise to execute the upload and an abort controller to cancel the upload + */ + async uploadMultipartFromStream( + bucketId: 
string,
+    mnemonic: string,
+    size: number,
+    from: Readable,
+    options: UploadMultipartOptions,
+  ): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> {
+    const hashStream = new HashStream();
+    const abortable = options?.abortController ?? new AbortController();
+    let encryptionTransform: Transform;
+    let hash: Buffer;
+
+    const partsUploadedBytes: Record<number, number> = {};
+    const fileParts: { PartNumber: number; ETag: string }[] = [];
+
+    const onProgress = (partId: number, loadedBytes: number) => {
+      if (!options?.progressCallback) return;
+      partsUploadedBytes[partId] = loadedBytes;
+      const currentTotalLoadedBytes = Object.values(partsUploadedBytes).reduce((a, p) => a + p, 0);
+      const reportedProgress = Math.round((currentTotalLoadedBytes / size) * 100);
+      options.progressCallback(reportedProgress);
+    };
+
+    const encryptFile: EncryptFileFunction = async (_, key, iv) => {
+      const encryptionCipher = this.cryptoService.getEncryptionTransform(
+        Buffer.from(key as ArrayBuffer),
+        Buffer.from(iv as ArrayBuffer),
+      );
+      const streamInParts = this.cryptoService.encryptStreamInParts(from, encryptionCipher, size, options.parts);
+      encryptionTransform = streamInParts.pipe(hashStream);
+    };
+
+    const uploadFileMultipart: UploadFileMultipartFunction = async (urls: string[]) => {
+      let partIndex = 0;
+      const limitConcurrency = 6;
+
+      const worker = async (upload: UploadTask) => {
+        const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, {
+          abortController: abortable,
+          progressCallback: (loadedBytes: number) => {
+            onProgress(upload.index, loadedBytes);
+          },
+        });
+
+        fileParts.push({
+          ETag: etag,
+          PartNumber: upload.index + 1,
+        });
+      };
+
+      const uploadQueue: QueueObject<UploadTask> = queue(function (task, callback) {
+        worker(task)
+          .then(() => {
+            callback();
+          })
+          .catch((e) => {
+            callback(e);
+          });
+      }, limitConcurrency);
+
+      for await (const chunk of encryptionTransform) {
+        const part: Buffer = chunk;
+
+        if (uploadQueue.running() === limitConcurrency) {
+          await uploadQueue.unsaturated();
+        }
+
+        if (abortable.signal.aborted) {
+          throw new Error('Upload cancelled by user');
+        }
+
+        let errorAlreadyThrown = false;
+
+        uploadQueue
+          .pushAsync({
+            contentToUpload: part,
+            urlToUpload: urls[partIndex],
+            index: partIndex++,
+          })
+          .catch((err: Error) => {
+            if (errorAlreadyThrown) return;
+
+            errorAlreadyThrown = true;
+            if (err) {
+              uploadQueue.kill();
+              if (!abortable?.signal.aborted) {
+                abortable.abort();
+              }
+            }
+          });
+      }
+
+      while (uploadQueue.running() > 0 || uploadQueue.length() > 0) {
+        await uploadQueue.drain();
+      }
+
+      hash = hashStream.getHash();
+      return {
+        hash: hash.toString('hex'),
+        parts: fileParts.sort((pA, pB) => pA.PartNumber - pB.PartNumber),
+      };
+    };
+
+    const uploadOperation = async () => {
+      const uploadResult = await NetworkUpload.uploadMultipartFile(
+        this.network,
+        this.cryptoLib,
+        bucketId,
+        mnemonic,
+        size,
+        encryptFile,
+        uploadFileMultipart,
+        options.parts,
+      );
+
+      return {
+        fileId: uploadResult,
+        hash: hash,
+      };
+    };
+
+    return [uploadOperation(), abortable];
+  }
 }

From c735f5453ed6e5697429e271c069022a74dfd270 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Tue, 14 Jan 2025 10:18:08 +0100
Subject: [PATCH 06/15] feat: implement multipart upload based on file size

---
 src/commands/upload-file.ts | 37 +++++++++++++++++++++++++++--------
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git a/src/commands/upload-file.ts b/src/commands/upload-file.ts
index b63f86a4..feb85ae2 100644
--- 
a/src/commands/upload-file.ts +++ b/src/commands/upload-file.ts @@ -82,17 +82,38 @@ export default class UploadFile extends Command { linewrap: true, }); progressBar.start(100, 0); - const [uploadPromise, abortable] = await networkFacade.uploadFromStream( - user.bucket, - user.mnemonic, - stats.size, - fileStream, - { + + const minimumMultipartThreshold = 100 * 1024 * 1024; + const useMultipart = stats.size > minimumMultipartThreshold; + const partSize = 30 * 1024 * 1024; + const parts = Math.ceil(stats.size / partSize); + + let uploadOperation: Promise< + [ + Promise<{ + fileId: string; + hash: Buffer; + }>, + AbortController, + ] + >; + + if (useMultipart) { + uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, stats.size, fileStream, { + parts, progressCallback: (progress) => { progressBar.update(progress * 0.99); }, - }, - ); + }); + } else { + uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, stats.size, fileStream, { + progressCallback: (progress) => { + progressBar.update(progress * 0.99); + }, + }); + } + + const [uploadPromise, abortable] = await uploadOperation; process.on('SIGINT', () => { abortable.abort('SIGINT received'); From 0a89592c941d334d8c65d0feb3ccc81ed225f5dd Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 12:11:00 +0100 Subject: [PATCH 07/15] feat: add uncaughtException handler to log errors in WebDAV server --- src/webdav/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/webdav/index.ts b/src/webdav/index.ts index 39067c0d..4760f139 100644 --- a/src/webdav/index.ts +++ b/src/webdav/index.ts @@ -40,4 +40,8 @@ const init = async () => { .catch((err) => webdavLogger.error('Failed to start WebDAV server', err)); }; +process.on('uncaughtException', (err) => { + webdavLogger.error('Unhandled exception:', err); +}); + init(); From 1a80eeb13bbd60377afd2aea9b479d9c6d20558f Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 12:17:24 +0100 Subject: [PATCH 08/15] feat: enhance PUT request handler to support multipart uploads --- src/webdav/handlers/PUT.handler.ts | 53 ++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/src/webdav/handlers/PUT.handler.ts b/src/webdav/handlers/PUT.handler.ts index 3e9ae149..8e5d43aa 100644 --- a/src/webdav/handlers/PUT.handler.ts +++ b/src/webdav/handlers/PUT.handler.ts @@ -11,6 +11,7 @@ import { DriveFileItem, DriveFolderItem } from '../../types/drive.types'; import { DriveFolderService } from '../../services/drive/drive-folder.service'; import { TrashService } from '../../services/drive/trash.service'; import { EncryptionVersion } from '@internxt/sdk/dist/drive/storage/types'; +import { CLIUtils } from '../../utils/cli.utils'; export class PUTRequestHandler implements WebDavMethodHandler { constructor( @@ -68,19 +69,50 @@ export class PUTRequestHandler implements WebDavMethodHandler { const { user } = await authService.getAuthDetails(); - let lastLoggedProgress = 0; - const [uploadPromise] = await networkFacade.uploadFromStream(user.bucket, user.mnemonic, contentLength, req, { - progressCallback: (progress) => { - const percentage = Math.floor(100 * progress); + const timer = CLIUtils.timer(); + + const minimumMultipartThreshold = 100 * 1024 * 1024; + const useMultipart = contentLength > minimumMultipartThreshold; + const partSize = 30 * 1024 * 1024; + const parts = Math.ceil(contentLength / partSize); + + let uploadOperation: Promise< + [ + Promise<{ + fileId: string; + hash: Buffer; + 
}>, + AbortController, + ] + >; + + const progressCallback = (progress: number) => { + webdavLogger.info(`[PUT] Upload progress for file ${resource.name}: ${progress}%`); + }; + + if (useMultipart) { + uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, contentLength, req, { + parts, + progressCallback, + }); + } else { + uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, contentLength, req, { + progressCallback, + }); + } + + const [uploadPromise, abortable] = await uploadOperation; - if (percentage >= lastLoggedProgress + 1) { - lastLoggedProgress = percentage; - webdavLogger.info(`[PUT] Upload progress for file ${resource.name}: ${percentage}%`); - } - }, + let uploaded = false; + res.on('close', () => { + if (!uploaded) { + webdavLogger.info('[PUT] ❌ HTTP Client has been disconnected, res has been closed.'); + abortable.abort('HTTP Client has been disconnected.'); + } }); const uploadResult = await uploadPromise; + uploaded = true; webdavLogger.info('[PUT] ✅ File uploaded to network'); @@ -95,7 +127,8 @@ export class PUTRequestHandler implements WebDavMethodHandler { name: '', }); - webdavLogger.info('[PUT] ✅ File uploaded to internxt drive'); + const uploadTime = timer.stop(); + webdavLogger.info(`[PUT] ✅ File uploaded in ${uploadTime}ms to Internxt Drive`); await driveDatabaseManager.createFile(file, resource.path.dir + '/'); From b2e326493f7b4564f85715eb2d8ebc7860142bc4 Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 13:48:17 +0100 Subject: [PATCH 09/15] test: add progress reporting for file uploads in network facade --- .../network/network-facade.service.test.ts | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/test/services/network/network-facade.service.test.ts b/test/services/network/network-facade.service.test.ts index 94ba3f24..69fad07c 100644 --- a/test/services/network/network-facade.service.test.ts +++ b/test/services/network/network-facade.service.test.ts @@ -10,6 +10,8 @@ import { DownloadService } from '../../../src/services/network/download.service' import { Readable } from 'node:stream'; import axios from 'axios'; import { fail } from 'node:assert'; +import crypto from 'node:crypto'; +import { HashStream } from '../../../src/utils/hash.utils'; describe('Network Facade Service', () => { beforeEach(() => { @@ -77,6 +79,68 @@ describe('Network Facade Service', () => { expect(uploadResult.fileId).to.be.equal('uploaded_file_id'); }); + it('When a file is uploaded, then it should report progress', async () => { + const bucket = 'f1858bc9675f9e4f7ab29429'; + const networkMock = getNetworkMock(); + + const sut = new NetworkFacade( + networkMock, + UploadService.instance, + DownloadService.instance, + CryptoService.instance, + ); + const file = crypto.randomBytes(16).toString('hex'); + const readStream = new Readable({ + read() { + this.push(file); + this.push(null); + }, + }); + const options = { + progressCallback: vi.fn(), + abortController: new AbortController(), + }; + + vi.spyOn(HashStream.prototype, 'getHash').mockImplementation(() => Buffer.from('')); + + vi.spyOn(axios, 'put').mockImplementation((_, __, config) => { + config?.onUploadProgress?.({ + loaded: file.length, + total: file.length, + bytes: file.length, + lengthComputable: true, + }); + return Promise.resolve({ + data: readStream, + headers: { + etag: 'any-etag', + }, + }); + }); + + vi.spyOn(networkMock, 'startUpload').mockResolvedValue({ + uploads: [{ index: 0, url: 'any-url', uuid: 
'any-uuid', urls: [] }], + }); + + vi.spyOn(networkMock, 'finishUpload') + // @ts-expect-error - We only mock the properties we need + .mockResolvedValue({ + id: 'any-id', + }); + + const [executeUpload] = await sut.uploadFromStream( + bucket, + 'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text', + file.length, + readStream, + options, + ); + + await executeUpload; + + expect(options.progressCallback).toHaveBeenCalledWith(100); + }); + it('When a file is downloaded, should write it to a stream', async () => { const encryptedContent = Buffer.from('b6ccfa381c150f3a4b65245bffa4d84087', 'hex'); const bucket = 'cd8abd7e8b13081660b58dbe'; @@ -220,7 +284,7 @@ describe('Network Facade Service', () => { loaded: encryptedContent.length, total: encryptedContent.length, bytes: encryptedContent.length, - lengthComputable: true + lengthComputable: true, }); return Promise.resolve({ data: readableContent }); }); From 74a0d495965d387035a7724ea5143610a1d1e854 Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 15:31:27 +0100 Subject: [PATCH 10/15] test: add multipart file uploads in network facade service --- .../network/network-facade.service.test.ts | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/test/services/network/network-facade.service.test.ts b/test/services/network/network-facade.service.test.ts index 69fad07c..9fe4ed2c 100644 --- a/test/services/network/network-facade.service.test.ts +++ b/test/services/network/network-facade.service.test.ts @@ -12,6 +12,7 @@ import axios from 'axios'; import { fail } from 'node:assert'; import crypto from 'node:crypto'; import { HashStream } from '../../../src/utils/hash.utils'; +import { UploadMultipartOptions } from '../../../src/types/network.types'; describe('Network Facade Service', () => { beforeEach(() => { @@ -304,4 +305,76 @@ describe('Network Facade Service', () => { expect(options.progressCallback).toHaveBeenCalledWith(100); }); + + it('When a file is uploaded via multipart, then it should report progress', async () => { + const bucket = 'f1858bc9675f9e4f7ab29429'; + const networkMock = getNetworkMock(); + + const sut = new NetworkFacade( + networkMock, + UploadService.instance, + DownloadService.instance, + CryptoService.instance, + ); + const file = crypto.randomBytes(16).toString('hex'); + const readStream = new Readable({ + read() { + this.push(file); + this.push(null); + }, + }); + const options: UploadMultipartOptions = { + progressCallback: vi.fn(), + abortController: new AbortController(), + parts: 2, + }; + + vi.spyOn(HashStream.prototype, 'getHash').mockImplementation(() => Buffer.from('')); + + vi.spyOn(axios, 'put').mockImplementation((_, __, config) => { + config?.onUploadProgress?.({ + loaded: file.length, + total: file.length, + bytes: file.length, + lengthComputable: true, + }); + return Promise.resolve({ + data: readStream, + headers: { + etag: 'any-etag', + }, + }); + }); + + vi.spyOn(networkMock, 'startUpload').mockResolvedValue({ + uploads: [ + { + index: 0, + url: 'any-url', + uuid: 'any-uuid', + UploadId: 'any-UploadId', + urls: ['url_1', 'url_2'], + }, + ], + }); + + vi.spyOn(networkMock, 'finishUpload') + // @ts-expect-error - We only mock the properties we need + .mockResolvedValue({ + id: 'uploaded_file_id', + }); + + const [executeUpload] = await sut.uploadMultipartFromStream( + bucket, + 'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text', + file.length, + readStream, + options, + ); + + 
const uploadResult = await executeUpload; + + expect(uploadResult.fileId).to.be.equal('uploaded_file_id'); + expect(options.progressCallback).toHaveBeenCalledWith(100); + }); }); From eb5eb59e502dbf47632a2f2122684e6b3b5b4db2 Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 15:41:10 +0100 Subject: [PATCH 11/15] refactor: fix maintainability issue --- src/services/network/network-facade.service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts index 6d3ac044..7bb20623 100644 --- a/src/services/network/network-facade.service.ts +++ b/src/services/network/network-facade.service.ts @@ -300,9 +300,10 @@ export class NetworkFacade { } hash = hashStream.getHash(); + const sortedParts = fileParts.sort((pA, pB) => pA.PartNumber - pB.PartNumber); return { hash: hash.toString('hex'), - parts: fileParts.sort((pA, pB) => pA.PartNumber - pB.PartNumber), + parts: sortedParts, }; }; From d0241dede5e812f41496243c5f166c1dcfeac05b Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Tue, 14 Jan 2025 15:47:54 +0100 Subject: [PATCH 12/15] test: add multipart file upload handling in PUT request handler --- test/webdav/handlers/PUT.handler.test.ts | 72 ++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/test/webdav/handlers/PUT.handler.test.ts b/test/webdav/handlers/PUT.handler.test.ts index e2791c82..8a201157 100644 --- a/test/webdav/handlers/PUT.handler.test.ts +++ b/test/webdav/handlers/PUT.handler.test.ts @@ -207,4 +207,76 @@ describe('PUT request handler', () => { expect(deleteDBFileStub).toHaveBeenCalledOnce(); expect(deleteDriveFileStub).toHaveBeenCalledOnce(); }); + + it('When the Drive destination folder is found, then it should upload the multipart file', async () => { + const driveDatabaseManager = getDriveDatabaseManager(); + const downloadService = DownloadService.instance; + const uploadService = UploadService.instance; + const cryptoService = CryptoService.instance; + const authService = AuthService.instance; + const networkFacade = new NetworkFacade(getNetworkMock(), uploadService, downloadService, cryptoService); + const sut = new PUTRequestHandler({ + driveFileService: DriveFileService.instance, + driveFolderService: DriveFolderService.instance, + authService: AuthService.instance, + trashService: TrashService.instance, + networkFacade, + driveDatabaseManager, + }); + + const multipartFileSize = 105 * 1024 * 1024; + + const requestedFileResource: WebDavRequestedResource = getRequestedFileResource(); + const requestedParentFolderResource: WebDavRequestedResource = getRequestedFolderResource({ + parentFolder: '/', + folderName: '', + }); + const folderFixture = newFolderItem({ name: requestedParentFolderResource.name }); + const fileFixture = newDriveFile({ + folderId: folderFixture.id, + folderUuid: folderFixture.uuid, + size: multipartFileSize, + }); + + const request = createWebDavRequestFixture({ + method: 'PUT', + url: requestedFileResource.url, + headers: { + 'content-length': multipartFileSize.toString(), + }, + }); + + const response = createWebDavResponseFixture({ + status: vi.fn().mockReturnValue({ send: vi.fn() }), + }); + + const getRequestedResourceStub = vi + .spyOn(WebDavUtils, 'getRequestedResource') + .mockResolvedValueOnce(requestedFileResource) + .mockResolvedValueOnce(requestedParentFolderResource); + const getAndSearchItemFromResourceStub = vi + .spyOn(WebDavUtils, 'getAndSearchItemFromResource') + 
.mockResolvedValueOnce(folderFixture)
+      .mockRejectedValue(new Error());
+    const getAuthDetailsStub = vi.spyOn(authService, 'getAuthDetails').mockResolvedValue(UserCredentialsFixture);
+    const uploadMultipartFromStreamStub = vi
+      .spyOn(networkFacade, 'uploadMultipartFromStream')
+      .mockResolvedValue([
+        Promise.resolve({ fileId: '09218313209', hash: Buffer.from('test') }),
+        new AbortController(),
+      ]);
+    const createDriveFileStub = vi
+      .spyOn(DriveFileService.instance, 'createFile')
+      .mockResolvedValue(fileFixture.toItem());
+    const createDBFileStub = vi.spyOn(driveDatabaseManager, 'createFile').mockResolvedValue(fileFixture);
+
+    await sut.handle(request, response);
+    expect(response.status).toHaveBeenCalledWith(200);
+    expect(getRequestedResourceStub).toHaveBeenCalledTimes(2);
+    expect(getAndSearchItemFromResourceStub).toHaveBeenCalledTimes(2);
+    expect(getAuthDetailsStub).toHaveBeenCalledOnce();
+    expect(uploadMultipartFromStreamStub).toHaveBeenCalledOnce();
+    expect(createDriveFileStub).toHaveBeenCalledOnce();
+    expect(createDBFileStub).toHaveBeenCalledOnce();
+  });
 });

From 141b10b9971f47179f84995d94c0dc602e886e18 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Tue, 14 Jan 2025 16:18:24 +0100
Subject: [PATCH 13/15] refactor: fix maintainability issue

---
 src/services/network/network-facade.service.ts | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts
index 7bb20623..28ac00e8 100644
--- a/src/services/network/network-facade.service.ts
+++ b/src/services/network/network-facade.service.ts
@@ -216,7 +216,11 @@
     let hash: Buffer;
 
     const partsUploadedBytes: Record<number, number> = {};
-    const fileParts: { PartNumber: number; ETag: string }[] = [];
+    type Part = {
+      PartNumber: number;
+      ETag: string;
+    };
+    const fileParts: Part[] = [];
 
     const onProgress = (partId: number, loadedBytes: number) => {
       if (!options?.progressCallback) return;
@@ -300,9 +304,10 @@
     }
 
     hash = hashStream.getHash();
-    const sortedParts = fileParts.sort((pA, pB) => pA.PartNumber - pB.PartNumber);
+    const compareParts = (pA: Part, pB: Part) => pA.PartNumber - pB.PartNumber;
+    const sortedParts = fileParts.sort(compareParts);
     return {
       hash: hash.toString('hex'),
       parts: sortedParts,

From 0dc9d4a6dcf3e54ef18d6d3e87be95c1c3f8fed8 Mon Sep 17 00:00:00 2001
From: larry-internxt
Date: Tue, 14 Jan 2025 16:23:13 +0100
Subject: [PATCH 14/15] refactor: rename worker function to uploadPart for clarity

---
 src/services/network/network-facade.service.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts
index 28ac00e8..b2f56277 100644
--- a/src/services/network/network-facade.service.ts
+++ b/src/services/network/network-facade.service.ts
@@ -243,7 +243,7 @@
       let partIndex = 0;
       const limitConcurrency = 6;
 
-      const worker = async (upload: UploadTask) => {
+      const uploadPart = async (upload: UploadTask) => {
         const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, {
           abortController: abortable,
           progressCallback: (loadedBytes: number) => {
@@ -258,7 +258,7 @@
       };
 
      const uploadQueue: QueueObject<UploadTask> = queue(function (task, callback) {
-        worker(task)
+        uploadPart(task)
          .then(() => {
            callback();
          })

From 8d8575a1dd54b9c1d55038e725939fc34e920b03 Mon Sep 17 00:00:00 2001
From: 
larry-internxt Date: Wed, 15 Jan 2025 15:38:14 +0100 Subject: [PATCH 15/15] chore(deps): pin async and @types/async to specific versions --- package.json | 4 ++-- yarn.lock | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index a5a9c08f..6cecad9d 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "@internxt/sdk": "1.7.0", "@oclif/core": "4.2.3", "@types/validator": "13.12.2", - "async": "^3.2.6", + "async": "3.2.6", "axios": "1.7.9", "bip39": "3.1.0", "body-parser": "1.20.3", @@ -71,7 +71,7 @@ "@internxt/prettier-config": "internxt/prettier-config#v1.0.2", "@oclif/test": "4.1.7", "@openpgp/web-stream-tools": "0.0.11-patch-1", - "@types/async": "^3.2.24", + "@types/async": "3.2.24", "@types/cli-progress": "3.11.6", "@types/express": "5.0.0", "@types/mime-types": "2.1.4", diff --git a/yarn.lock b/yarn.lock index ea9c93ae..82141fcf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2192,7 +2192,7 @@ resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9" integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA== -"@types/async@^3.2.24": +"@types/async@3.2.24": version "3.2.24" resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.24.tgz#3a96351047575bbcf2340541b2d955a35339608f" integrity sha512-8iHVLHsCCOBKjCF2KwFe0p9Z3rfM9mL+sSP8btyR5vTjJRAqpBYD28/ZLgXPf0pjG1VxOvtCV/BgXkQbpSe8Hw== @@ -2766,6 +2766,11 @@ async-retry@^1.3.3: dependencies: retry "0.13.1" +async@3.2.6, async@^3.2.0, async@^3.2.3, async@~3.2.0: + version "3.2.6" + resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce" + integrity sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA== + async@^2.6.3, async@~2.6.1: version "2.6.4" resolved "https://registry.yarnpkg.com/async/-/async-2.6.4.tgz#706b7ff6084664cd7eae713f6f965433b5504221" @@ -2773,11 +2778,6 @@ async@^2.6.3, async@~2.6.1: dependencies: lodash "^4.17.14" -async@^3.2.0, async@^3.2.3, async@^3.2.6, async@~3.2.0: - version "3.2.6" - resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce" - integrity sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA== - asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
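
---
Notes on the series. The sketches below are illustrative TypeScript, not code from the patches; import paths, byte sizes, and helper names are assumptions unless a patch above defines them.

First, the part-size arithmetic from PATCH 03: this sketch drives the streamReadableIntoChunks helper of PATCH 02 with the 1024-byte margin and shows why the chunk count never exceeds the number of presigned upload URLs.

import { Readable } from 'node:stream';
import { StreamUtils } from './src/utils/stream.utils'; // path assumed for the example

const size = 10 * 1024 * 1024; // hypothetical 10 MiB payload
const parts = 4;

// Without the margin, any remainder of size / parts would spill into a
// (parts + 1)-th chunk, with no presigned URL left to PUT it to. With the
// margin, chunkSize > size / parts, so ceil(size / chunkSize) <= parts.
const marginChunkSize = 1024;
const chunkSize = size / parts + marginChunkSize;

const source = Readable.from([Buffer.alloc(size)]);
const chunked = StreamUtils.streamReadableIntoChunks(source, chunkSize);

let index = 0;
chunked.on('data', (chunk: Buffer) => {
  // Prints three chunks of 2622464 bytes and a final chunk of 2618368 bytes.
  console.log(`part ${index++}: ${chunk.length} bytes`);
});

Four chunks for four URLs; the cipher pipe in encryptStreamInParts preserves those sizes because AES-256-CTR is length-preserving.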
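
Second, the bounded-concurrency pattern at the heart of uploadMultipartFromStream (PATCH 05): each encrypted part is pushed into an async queue and the producer awaits unsaturated() before reading further, so at most six parts sit in memory or in flight at once. A minimal sketch of that back-pressure loop, with a timer standing in for the real uploadService.uploadFile call:

import { queue, QueueObject } from 'async';

interface PartTask {
  index: number;
  payload: Buffer;
}

const limitConcurrency = 6; // same cap as PATCH 05

// Worker: PATCH 05's uploadPart() would PUT the part here; a timer stands in.
const uploadQueue: QueueObject<PartTask> = queue<PartTask>(async (task) => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  console.log(`part ${task.index + 1} uploaded (${task.payload.length} bytes)`);
}, limitConcurrency);

async function produce(partsToUpload: Buffer[]): Promise<void> {
  for (const [index, payload] of partsToUpload.entries()) {
    if (uploadQueue.running() === limitConcurrency) {
      // Back-pressure: stop consuming parts until a worker slot frees up,
      // so at most limitConcurrency parts are buffered or in flight.
      await uploadQueue.unsaturated();
    }
    void uploadQueue.push({ index, payload });
  }
  await uploadQueue.drain(); // resolves once every queued part has finished
}

produce(Array.from({ length: 12 }, () => Buffer.alloc(1024))).catch(console.error);

The real worker also records each part's ETag and PartNumber; PATCH 11 and PATCH 13 sort those parts before returning them to the SDK.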
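
Finally, the dispatch that PATCH 06 and PATCH 08 both implement: uploads above 100 MiB take the multipart path in 30 MiB parts, everything else keeps the existing single-stream path. A condensed sketch under the assumption that networkFacade and user are already in scope; the import path is illustrative:

import { Readable } from 'node:stream';
import { NetworkFacade } from './src/services/network/network-facade.service'; // path assumed

declare const networkFacade: NetworkFacade; // assumed to be constructed elsewhere
declare const user: { bucket: string; mnemonic: string }; // assumed auth details

const MULTIPART_THRESHOLD = 100 * 1024 * 1024; // 100 MiB, as in PATCH 06/08
const PART_SIZE = 30 * 1024 * 1024; // 30 MiB per part

async function uploadBySize(size: number, stream: Readable): Promise<{ fileId: string; hash: Buffer }> {
  const progressCallback = (progress: number) => console.log(`upload progress: ${progress}%`);

  // Both facade methods resolve to [uploadPromise, abortController], so the
  // caller can abort on SIGINT (PATCH 06) or on client disconnect (PATCH 08).
  const [uploadPromise, abortable] =
    size > MULTIPART_THRESHOLD
      ? await networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, size, stream, {
          parts: Math.ceil(size / PART_SIZE),
          progressCallback,
        })
      : await networkFacade.uploadFromStream(user.bucket, user.mnemonic, size, stream, { progressCallback });

  process.once('SIGINT', () => abortable.abort('SIGINT received'));
  return uploadPromise;
}

uploadPromise resolves to { fileId, hash }, which the WebDAV PUT handler passes on to DriveFileService.createFile (PATCH 08).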