diff --git a/examples/callbacks/notify-delete.callback.ts b/examples/callbacks/notify-delete.callback.ts index 450bf396..a5e379a0 100644 --- a/examples/callbacks/notify-delete.callback.ts +++ b/examples/callbacks/notify-delete.callback.ts @@ -1,6 +1,6 @@ import { logger } from "@/logger"; -export const onDeleteCallback = (fileId: string, callback: (response: boolean) => void) => { +export const notifyDeleteCallback = (fileId: string, callback: (response: boolean) => void) => { logger.info({ event: "onDelete", fileId }); callback(true); }; diff --git a/examples/callbacks/notify-fetch-data.callback.ts b/examples/callbacks/notify-fetch-data.callback.ts index dc61e534..fa075951 100644 --- a/examples/callbacks/notify-fetch-data.callback.ts +++ b/examples/callbacks/notify-fetch-data.callback.ts @@ -4,7 +4,7 @@ import { sleep } from "@/utils"; type CallbackResponse = (data: boolean, path: string, errorHandler?: () => void) => Promise<{ finished: boolean; progress: number }>; -export const onFetchDataCallback = async (id: string, callback: CallbackResponse) => { +export const fetchDataCallback = async (id: string, callback: CallbackResponse) => { const path = await getInfoItem(id); let finish = false; diff --git a/examples/config.json b/examples/config.json deleted file mode 100644 index 544ad457..00000000 --- a/examples/config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "driveName": "Internxt", - "driveVersion": "0.0.1", - "syncRootPath": "C:\\Users\\jonat\\Desktop\\carpeta3", - "fileName": "prueba1.txt", - "folderName": "newfolder" -} diff --git a/examples/queueManager.ts b/examples/queueManager.ts deleted file mode 100644 index bbe4af8a..00000000 --- a/examples/queueManager.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { HandleAction, HandleActions } from "src/queue/queueManager"; - -import { logger } from "@/logger"; -import { sleep } from "@/utils"; - -import { IQueueManager, QueueItem } from "../index"; - -export type QueueHandler = { - handleAdd: HandleAction; - handleHydrate: HandleAction; - handleDehydrate: HandleAction; - handleChange?: HandleAction; - handleChangeSize: HandleAction; -}; - -export class QueueManager implements IQueueManager { - private _queue: QueueItem[] = []; - - private isProcessing = false; - - actions: HandleActions; - - constructor(handlers: QueueHandler) { - this.actions = { - add: handlers.handleAdd, - hydrate: handlers.handleHydrate, - dehydrate: handlers.handleDehydrate, - change: handlers.handleChange || (() => Promise.resolve()), - changeSize: handlers.handleChangeSize, - }; - } - - public enqueue(task: QueueItem): void { - logger.debug({ fn: "enqueue", task }); - this._queue.push(task); - this.sortQueue(); - if (!this.isProcessing) { - this.processAll(); - } - } - - private sortQueue(): void { - this._queue.sort((a, b) => { - if (a.isFolder && b.isFolder) { - return 0; - } - if (a.isFolder) { - return -1; - } - if (b.isFolder) { - return 1; - } - return 0; - }); - } - - public async processNext(): Promise { - const task = this._queue.shift(); - if (!task) return; - - logger.debug({ fn: "processNext", task }); - - switch (task.type) { - case "add": - return await this.actions.add(task); - case "hydrate": - return await this.actions.hydrate(task); - case "dehydrate": - return await this.actions.dehydrate(task); - case "change": - return await this.actions.change(task); - case "changeSize": - return await this.actions.changeSize(task); - default: - console.debug("Unknown task type."); - break; - } - } - - public async processAll(): Promise { - logger.debug({ fn: "processAll", queueLength: this._queue.length }); - - this.isProcessing = true; - while (this._queue.length > 0) { - await this.processNext(); - await sleep(200); - } - - this.isProcessing = false; - } -} diff --git a/examples/register.ts b/examples/register.ts index 583cfdfa..e7a10c9d 100644 --- a/examples/register.ts +++ b/examples/register.ts @@ -1,13 +1,13 @@ -import { QueueItem, VirtualDrive } from "src"; - import { logger } from "@/logger"; +import { QueueManager } from "@/queue/queue-manager"; +import { QueueItem } from "@/queue/queueManager"; +import VirtualDrive from "@/virtual-drive"; import { onCancelFetchDataCallback, onMessageCallback, onRenameCallbackWithCallback } from "./callbacks"; -import { onDeleteCallback } from "./callbacks/notify-delete.callback"; -import { onFetchDataCallback } from "./callbacks/notify-fetch-data.callback"; +import { notifyDeleteCallback } from "./callbacks/notify-delete.callback"; +import { fetchDataCallback } from "./callbacks/notify-fetch-data.callback"; import { drive } from "./drive"; import { addInfoItem, initInfoItems } from "./info-items-manager"; -import { QueueManager } from "./queueManager"; import settings from "./settings"; import { generateRandomFilesAndFolders } from "./utils/generate-random-file-tree"; @@ -18,13 +18,13 @@ drive.registerSyncRoot( settings.driveVersion, "{12345678-1234-1234-1234-123456789012}", { - notifyDeleteCallback: onDeleteCallback, + notifyDeleteCallback, notifyRenameCallback: onRenameCallbackWithCallback, - fetchDataCallback: onFetchDataCallback, + fetchDataCallback, cancelFetchDataCallback: onCancelFetchDataCallback, notifyMessageCallback: onMessageCallback, }, - settings.defaultIconPath, + settings.iconPath, ); const handleAdd = async (task: QueueItem) => { @@ -32,7 +32,6 @@ const handleAdd = async (task: QueueItem) => { logger.info({ fn: "handleAdd", path: task.path }); const id = await addInfoItem(task.path); drive.convertToPlaceholder(task.path, id); - // await drive.updateSyncStatus(task.path, task.isFolder, true); } catch (error) { logger.error(error, "handleAdd"); } @@ -69,12 +68,19 @@ const handleChangeSize = async (task: QueueItem) => { } }; -const queueManager = new QueueManager({ +const handlers = { handleAdd, handleHydrate, handleDehydrate, handleChangeSize, -}); +}; + +const notify = { + onTaskSuccess: async () => logger.info({ fn: "onTaskSuccess" }), + onTaskProcessing: async () => logger.info({ fn: "onTaskProcessing" }), +}; + +const queueManager = new QueueManager(handlers, notify, settings.queuePersistPath); drive.connectSyncRoot(); @@ -98,5 +104,3 @@ const fileGenerationOptions = { console.log("[EXAMPLE] error: " + error); } })(); - -export default drive; diff --git a/examples/settings.ts b/examples/settings.ts index 80069bc6..76b1d2a8 100644 --- a/examples/settings.ts +++ b/examples/settings.ts @@ -9,9 +9,10 @@ const settings = { driveName: "Internxt", driveVersion: "2.0.4", syncRootPath: join(TMP_PATH, "sync-root"), + iconPath: join(cwd(), "assets", "icon.ico"), defaultLogPath: join(TMP_PATH, "drive.log"), - defaultIconPath: join(cwd(), "assets", "icon.ico"), watcherLogPath: join(TMP_PATH, "watcher.log"), + queuePersistPath: join(TMP_PATH, "queue-manager.json"), }; export default settings; diff --git a/examples/utils/generate-random-file-tree.ts b/examples/utils/generate-random-file-tree.ts index ddf2a431..f5cebba8 100644 --- a/examples/utils/generate-random-file-tree.ts +++ b/examples/utils/generate-random-file-tree.ts @@ -1,4 +1,4 @@ -import { VirtualDrive } from 'src'; +import VirtualDrive from '@/virtual-drive'; import { v4 as uuidv4 } from 'uuid'; interface GenerateOptions { diff --git a/index.ts b/index.ts index be9c36de..328152c6 100644 --- a/index.ts +++ b/index.ts @@ -1,17 +1,5 @@ +import { QueueManager } from "./src/queue/queue-manager"; +import { IQueueManager, QueueItem, typeQueue, HandleAction, HandleActions } from "./src/queue/queueManager"; import VirtualDrive from "./src/virtual-drive"; -import { - IQueueManager, - QueueItem, - typeQueue, - HandleAction, - HandleActions, -} from "./src/queue/queueManager"; -export { - VirtualDrive, - QueueItem, - typeQueue, - IQueueManager, - HandleAction, - HandleActions, -}; +export { VirtualDrive, QueueItem, typeQueue, IQueueManager, HandleAction, HandleActions, QueueManager }; diff --git a/package.json b/package.json index b8338c79..6a38f2e9 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "devDependencies": { "@trivago/prettier-plugin-sort-imports": "^5.2.1", "@types/jest": "^29.5.12", + "@types/lodash.chunk": "^4.2.9", "@types/node": "^20.5.0", "@types/yargs": "^17.0.32", "jest": "^29.7.0", @@ -49,6 +50,7 @@ }, "dependencies": { "chokidar": "^3.6.0", + "lodash.chunk": "^4.2.0", "pino": "^9.6.0", "pino-pretty": "^13.0.0", "tsconfig-paths": "^4.2.0", diff --git a/src/index.ts b/src/index.ts deleted file mode 100644 index 9872d17a..00000000 --- a/src/index.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { - IQueueManager, - QueueItem, - typeQueue, - HandleAction, -} from "./queue/queueManager"; -import VirtualDrive from "./virtual-drive"; - -export { VirtualDrive, QueueItem, typeQueue, IQueueManager, HandleAction }; diff --git a/src/queue/queue-manager.ts b/src/queue/queue-manager.ts new file mode 100644 index 00000000..656d77f2 --- /dev/null +++ b/src/queue/queue-manager.ts @@ -0,0 +1,199 @@ +import fs from "fs"; +import lodashChunk from "lodash.chunk"; + +// import { logger } from "@/logger"; + +import { HandleAction, HandleActions, IQueueManager, QueueItem, typeQueue } from "./queueManager"; + +export type QueueHandler = { + handleAdd: HandleAction; + handleHydrate: HandleAction; + handleDehydrate: HandleAction; + handleChange?: HandleAction; + handleChangeSize: HandleAction; +}; + +export type QueueManagerCallback = { + onTaskSuccess: () => Promise; + onTaskProcessing: () => Promise; +}; + +export class QueueManager implements IQueueManager { + private queues: { [key: string]: QueueItem[] } = { + add: [], + hydrate: [], + dehydrate: [], + change: [], + changeSize: [], + }; + + private isProcessing: { [key: string]: boolean } = { + add: false, + hydrate: false, + dehydrate: false, + change: false, + changeSize: false, + }; + + private enqueueTimeout: NodeJS.Timeout | null = null; + private enqueueDelay = 2000; + + private readonly notify: QueueManagerCallback; + private readonly persistPath: string; + + actions: HandleActions; + + constructor(handlers: QueueHandler, notify: QueueManagerCallback, persistPath: string) { + this.actions = { + add: handlers.handleAdd, + hydrate: handlers.handleHydrate, + dehydrate: handlers.handleDehydrate, + changeSize: handlers.handleChangeSize, + change: handlers.handleChange || (() => Promise.resolve()), + }; + this.notify = notify; + this.persistPath = persistPath; + if (!fs.existsSync(this.persistPath)) { + fs.writeFileSync(this.persistPath, JSON.stringify(this.queues)); + } else { + this.loadQueueStateFromFile(); + } + } + private saveQueueStateToFile(): void { + if (!this.persistPath) return; + + fs.writeFileSync( + this.persistPath, + JSON.stringify( + { + add: [], + hydrate: this.queues.hydrate, + dehydrate: this.queues.dehydrate, + change: [], + changeSize: [], + }, + null, + 2, + ), + ); + } + + private loadQueueStateFromFile(): void { + console.debug("Loading queue state from file:" + this.persistPath); + if (this.persistPath) { + if (!fs.existsSync(this.persistPath)) { + this.saveQueueStateToFile(); + } + + const data = fs.readFileSync(this.persistPath, "utf-8"); + if (!data) { + return; + } + this.queues = JSON.parse(data); + } + } + + public clearQueue(): void { + this.queues = { + add: [], + hydrate: [], + dehydrate: [], + change: [], + changeSize: [], + }; + this.saveQueueStateToFile(); + } + + public enqueue(task: QueueItem): void { + console.debug(`Task enqueued: ${JSON.stringify(task)}`); + const existingTask = this.queues[task.type].find((item) => item.path === task.path && item.type === task.type); + if (existingTask) { + console.debug("Task already exists in queue. Skipping."); + return; + } + + this.queues[task.type].push(task); + this.sortQueue(task.type); + this.saveQueueStateToFile(); + this.resetEnqueueTimeout(); + } + + private resetEnqueueTimeout(): void { + if (this.enqueueTimeout) { + clearTimeout(this.enqueueTimeout); + } + + // Inicia el temporizador de espera + this.enqueueTimeout = setTimeout(() => { + console.debug("Processing all tasks"); + this.processAll(); + }, this.enqueueDelay); + } + + private sortQueue(type: typeQueue): void { + this.queues[type].sort((a, b) => { + if (a.isFolder && b.isFolder) { + return 0; + } + if (a.isFolder) { + return -1; + } + if (b.isFolder) { + return 1; + } + return 0; + }); + } + + private async processQueue(type: typeQueue): Promise { + if (this.isProcessing[type]) return; + + this.isProcessing[type] = true; + + if (type === typeQueue.add) { + await this.processInChunks(type, 7); + } else { + await this.processSequentially(type); + } + + this.isProcessing[type] = false; + } + + private async processInChunks(type: typeQueue, chunkSize: number): Promise { + const chunks = lodashChunk(this.queues[type], chunkSize); + + for (const chunk of chunks) { + await this.notify.onTaskProcessing(); + + await Promise.all(chunk.map((task) => this.processTask(type, task))); + this.queues[type] = this.queues[type].slice(chunk.length); + } + } + + private async processSequentially(type: typeQueue): Promise { + while (this.queues[type].length > 0) { + await this.notify.onTaskProcessing(); + + const task = this.queues[type].shift(); + this.saveQueueStateToFile(); + + if (task) await this.processTask(type, task); + } + } + + private async processTask(type: typeQueue, task: QueueItem): Promise { + console.debug(`Processing ${type} task: ${JSON.stringify(task)}`); + try { + await this.actions[task.type](task); + } catch (error) { + console.error(`Failed to process ${type} task:`, task, error); + } + } + + public async processAll(): Promise { + const taskTypes = Object.keys(this.queues) as typeQueue[]; + await this.notify.onTaskProcessing(); + await Promise.all(taskTypes.map((type: typeQueue) => this.processQueue(type))); + await this.notify.onTaskSuccess(); + } +} diff --git a/src/watcher/watcher.unit.test.ts b/src/watcher/watcher.unit.test.ts index 5da07070..7523219c 100644 --- a/src/watcher/watcher.unit.test.ts +++ b/src/watcher/watcher.unit.test.ts @@ -1,5 +1,4 @@ import { execSync } from "child_process"; -import { QueueManager } from "examples/queueManager"; import { existsSync } from "fs"; import { mkdir, writeFile } from "fs/promises"; import { join } from "path"; @@ -9,6 +8,8 @@ import { v4 } from "uuid"; import { beforeEach } from "vitest"; import { mockDeep } from "vitest-mock-extended"; +import { QueueManager } from "@/queue/queue-manager"; + import { OnAddDirService } from "./events/on-add-dir.service"; import { OnAddService } from "./events/on-add.service"; import { OnAllService } from "./events/on-all.service"; diff --git a/tsconfig.build.json b/tsconfig.build.json index 1b1f6761..9f3031e8 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -1,8 +1,11 @@ { "compilerOptions": { + "declaration": true, + "noEmit": false, + "outDir": "./dist", "types": [] }, + "exclude": ["**/*.test.ts"], "extends": "./tsconfig.json", - "include": ["src", "examples"], - "exclude": ["**/*.test.ts"] + "include": ["index.ts", "src", "examples"] } diff --git a/tsconfig.json b/tsconfig.json index c153ad34..6408cdd1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,19 +1,18 @@ { "compilerOptions": { - "target": "ES2020", - "module": "CommonJS", - "strict": true, - "lib": ["ES2020", "DOM"], - "esModuleInterop": true, - "declaration": true, - "outDir": "./dist", "baseUrl": ".", - "resolveJsonModule": true, - "skipLibCheck": true, - "types": ["vitest/globals"], + "esModuleInterop": true, + "lib": ["ES2023"], + "module": "Node16", + "moduleResolution": "node16", + "noEmit": true, "paths": { - "@/*": ["./src/*"], - } + "@/*": ["./src/*"] + }, + "skipLibCheck": true, + "strict": true, + "target": "ES2022", + "types": ["vitest/globals"] }, - "include": ["index.ts", "src/**/*.ts", "*.d.ts", "examples/**/*.ts", "test/**/*.ts"], + "exclude": ["node_modules", "dist"] } diff --git a/yarn.lock b/yarn.lock index a653629b..caf60fdb 100644 --- a/yarn.lock +++ b/yarn.lock @@ -892,6 +892,18 @@ expect "^29.0.0" pretty-format "^29.0.0" +"@types/lodash.chunk@^4.2.9": + version "4.2.9" + resolved "https://registry.yarnpkg.com/@types/lodash.chunk/-/lodash.chunk-4.2.9.tgz#60da44c404dfa8b01b426034c1183e5eb9b09727" + integrity sha512-Z9VtFUSnmT0No/QymqfG9AGbfOA4O5qB/uyP89xeZBqDAsKsB4gQFTqt7d0pHjbsTwtQ4yZObQVHuKlSOhIJ5Q== + dependencies: + "@types/lodash" "*" + +"@types/lodash@*": + version "4.17.15" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.17.15.tgz#12d4af0ed17cc7600ce1f9980cec48fc17ad1e89" + integrity sha512-w/P33JFeySuhN6JLkysYUK2gEmy9kHHFN7E8ro0tkfmlDOgxBDzWEZ/J8cWA+fHqFevpswDTFZnDx+R9lbL6xw== + "@types/node@*": version "22.10.10" resolved "https://registry.yarnpkg.com/@types/node/-/node-22.10.10.tgz#85fe89f8bf459dc57dfef1689bd5b52ad1af07e6" @@ -2278,6 +2290,11 @@ locate-path@^5.0.0: dependencies: p-locate "^4.1.0" +lodash.chunk@^4.2.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/lodash.chunk/-/lodash.chunk-4.2.0.tgz#66e5ce1f76ed27b4303d8c6512e8d1216e8106bc" + integrity sha512-ZzydJKfUHJwHa+hF5X66zLFCBrWn5GeF28OHEr4WVWtNDXlQ/IjWKPBiikqKo2ne0+v6JgCgJ0GzJp8k8bHC7w== + lodash.memoize@^4.1.2: version "4.1.2" resolved "https://registry.yarnpkg.com/lodash.memoize/-/lodash.memoize-4.1.2.tgz#bcc6c49a42a2840ed997f323eada5ecd182e0bfe"