From 3f2c0a04ed03c97fbd86d1e0d82fc46402f99b0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?= Date: Fri, 31 Jan 2025 22:13:15 +0100 Subject: [PATCH 1/8] feat: add sqlite --- .env.template | 21 ++ .github/workflows/ci-workflow.yml | 2 + .vscode/settings.json | 27 ++ docs/common-api.md | 128 +++++++++ docs/sqlite-api.md | 250 ++++++++++++++++++ jest.config.js | 2 + package.json | 57 +++- .../AbstractBatchEntityReaderStream.ts | 90 +++++++ .../AbstractBatchEntityWriterStream.ts | 98 +++++++ src/common/classes/_index.ts | 2 + src/common/index.ts | 1 + src/sqlite/classes/SqliteBatchEntityReader.ts | 139 ++++++++++ src/sqlite/classes/SqliteBatchEntityWriter.ts | 155 +++++++++++ src/sqlite/classes/_index.ts | 2 + src/sqlite/index.ts | 1 + .../AbstractBatchEntityReaderStream.test.ts | 134 ++++++++++ .../AbstractBatchEntityWriterStream.test.ts | 124 +++++++++ test/global-setup.ts | 5 + .../classes/SqliteBatchEntityReader.test.ts | 115 ++++++++ .../classes/SqliteBatchEntityWriter.test.ts | 59 +++++ test/sqlite/mocks/UserBatchReader.ts | 19 ++ test/sqlite/mocks/UserBatchWriter.ts | 18 ++ test/sqlite/mocks/UserDTO.ts | 8 + test/sqlite/mocks/UserDatabase.ts | 33 +++ test/sqlite/tmp/.gitignore | 2 + test/sqlite/utils/db.ts | 6 + tsconfig.json | 3 +- 27 files changed, 1488 insertions(+), 13 deletions(-) create mode 100644 .env.template create mode 100644 .vscode/settings.json create mode 100644 docs/common-api.md create mode 100644 docs/sqlite-api.md create mode 100644 src/common/classes/AbstractBatchEntityReaderStream.ts create mode 100644 src/common/classes/AbstractBatchEntityWriterStream.ts create mode 100644 src/common/classes/_index.ts create mode 100644 src/common/index.ts create mode 100644 src/sqlite/classes/SqliteBatchEntityReader.ts create mode 100644 src/sqlite/classes/SqliteBatchEntityWriter.ts create mode 100644 src/sqlite/classes/_index.ts create mode 100644 src/sqlite/index.ts create mode 100644 test/common/classes/AbstractBatchEntityReaderStream.test.ts create mode 100644 test/common/classes/AbstractBatchEntityWriterStream.test.ts create mode 100644 test/global-setup.ts create mode 100644 test/sqlite/classes/SqliteBatchEntityReader.test.ts create mode 100644 test/sqlite/classes/SqliteBatchEntityWriter.test.ts create mode 100644 test/sqlite/mocks/UserBatchReader.ts create mode 100644 test/sqlite/mocks/UserBatchWriter.ts create mode 100644 test/sqlite/mocks/UserDTO.ts create mode 100644 test/sqlite/mocks/UserDatabase.ts create mode 100644 test/sqlite/tmp/.gitignore create mode 100644 test/sqlite/utils/db.ts diff --git a/.env.template b/.env.template new file mode 100644 index 0000000..f9aac07 --- /dev/null +++ b/.env.template @@ -0,0 +1,21 @@ +# Environment variables for testing + +CI=true + +MARIADB_HOST= +MARIADB_PORT= +MARIADB_USER= +MARIADB_PASSWORD= +MARIADB_DATABASE= + +MYSQL_HOST= +MYSQL_PORT= +MYSQL_USER= +MYSQL_PASSWORD= +MYSQL_DATABASE= + +POSTGRESQL_HOST= +POSTGRESQL_PORT= +POSTGRESQL_USER= +POSTGRESQL_PASSWORD= +POSTGRESQL_DATABASE= diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 51425be..32f369a 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -42,6 +42,8 @@ jobs: - name: Run tests run: npm run test + env: + CI: true - name: Upload coverage report if: always() diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..8409a4e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,27 @@ +{ + /** + ----- Linting configurations + */ + // 
Prevent editor formatting on save for certain file types + "[javascript]": { + "editor.formatOnSave": false, + "editor.formatOnPaste": false + }, + "[typescript]": { + "editor.formatOnSave": false, + "editor.formatOnPaste": false + }, + // Configure eslint to report + fix errors for correct file types + "eslint.validate": ["javascript", "typescript"], + "editor.codeActionsOnSave": { + "source.fixAll.eslint": "always" + }, + // Configuration cspell + "cSpell.language": "en", + "cSpell.caseSensitive": false, + "cSpell.languageSettings": [ + { "languageId": "typescript", "caseSensitive": false } + ], + "cSpell.words": ["Alcaraz", "batchjs", "Martínez"], + "liveServer.settings.port": 5501 +} diff --git a/docs/common-api.md b/docs/common-api.md new file mode 100644 index 0000000..196f2c6 --- /dev/null +++ b/docs/common-api.md @@ -0,0 +1,128 @@ +# Common API + +In this documentation, we will focus on the common API. This module includes the core of BatchJS. This API allows you to create your own custom jobs and steps using the common interface. + +------------ + + + +## Table of Contents + + - [AbstractBatchEntityReaderStream](#abstractbatchentityreaderstream) + - [AbstractBatchEntityWriterStream](#abstractbatchentitywriterstream) + +## AbstractBatchEntityReaderStream + +`extends ObjectReadable` + +Class that enable to implement classes to read data in batches of a specified size in different types of data storage. + + + + ### Constructor + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **options** | The options for the AbstractBatchEntityReaderStream. | AbstractBatchEntityReaderStreamOptions | + | **options.batchSize** | The maximum number of elements in a batch. | number | + + + +### _read (function) + + + +Reads a batch of data from the data storage and pushes it to the consumer stream. +If the size parameter is not specified, it reads the number of entities specified in the batchSize option. +If the size parameter is specified, it reads the minimum of the size and the batchSize option. +If no data is available, it pushes null to the consumer stream to signal that the end of the stream has been reached. +If an error occurs while reading data, it emits an error event to the stream. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **size** | The size parameter for controlling the read operation. | number | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the data has been read and pushed to the consumer stream. | + + +### _flush (function) + +`private` + +Flushes the buffer by pushing its content to the consumer stream. If the consumer stream is not ready to receive data, it waits for the drain event and flushes the buffer again when it is emitted. +This function is recursive and will keep flushing the buffer until it is empty. + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the buffer is flushed. | + + +## AbstractBatchEntityWriterStream + +`extends ObjectWritable` + +Class that enable to implement classes to write data in batches of a specified size in different types of data storage. 
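For example, a minimal subclass only needs to implement `batchWrite`. The sketch below is illustrative (the `ConsoleWriter` name is hypothetical, and the import assumes this package's main export as configured in `package.json`):

```typescript
import { AbstractBatchEntityWriterStream } from "batchjs-data";

// Writes batches of strings to the console instead of a real data store.
class ConsoleWriter extends AbstractBatchEntityWriterStream<string> {
    protected async batchWrite(chunk: string[]): Promise<void> {
        // chunk holds at most batchSize elements, buffered by the base class.
        console.log(`writing batch of ${chunk.length}:`, chunk);
    }
}

const writer = new ConsoleWriter({ batchSize: 2 });
["A", "B", "C"].forEach((item) => writer.write(item));
writer.end(); // flushes the remaining partial batch
```
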
### Constructor
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **options** | The options for the AbstractBatchEntityWriterStream. | AbstractBatchEntityWriterStreamOptions |
 | **options.batchSize** | The maximum number of elements in a batch. | number |



### _write (function)



A method to write data to the stream, push the chunk to the buffer, and execute the callback.

 #### Parameters
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **chunk** | The data chunk to write to the stream. | T |
 | **encoding** | The encoding of the data. | BufferEncoding |
 | **callback** | The callback function to be executed after writing the data. | WriteCallback |

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<void> | A promise that resolves once the chunk has been buffered (and flushed, when the batch is full). |


### _final (function)



Finalizes the stream by pushing remaining data batches, handling errors,
and executing the final callback.

 #### Parameters
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **callback** | The callback function to be executed after finalizing the stream. | WriteCallback |

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<void> | A promise that resolves when the remaining data has been flushed. |


### _flush (function)

`private`

Creates a batch of data from the buffer and flushes it to the storage.

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<void> | A promise that resolves when the buffer has been flushed to the storage. |


diff --git a/docs/sqlite-api.md b/docs/sqlite-api.md
new file mode 100644
index 0000000..71e1e8e
--- /dev/null
+++ b/docs/sqlite-api.md
@@ -0,0 +1,250 @@
# SQLite API

In this documentation, we will focus on the SQLite API. This module includes SQLite implementations of the common batch reader and writer streams, so you can read and write entities in SQLite databases from your own custom jobs and steps.

------------



## Table of Contents

 - [SqliteBatchEntityReader](#sqlitebatchentityreader)
 - [SqliteBatchEntityWriter](#sqlitebatchentitywriter)

## SqliteBatchEntityReader

`extends AbstractBatchEntityReaderStream`

Class that reads data in batches of a specified size from SQLite databases.



 ### Constructor
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **options** | The options for the SqliteBatchEntityReader. | SqliteBatchEntityReaderOptions |
 | **options.dbConnectionFactory** | Function that creates a database connection. | function |
 | **options.query** | The SQL query to execute; a LIMIT/OFFSET clause is appended automatically. | string |



### fetch (function)

`protected`

Fetches a batch of data from the database.

 #### Parameters
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **size** | The size of the batch to fetch. | number |

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<BatchData.<T>> | A promise that resolves with the batch of data. |


### _destroy (function)

`private`

Destroys the reader by finalizing the statement used to read entities and
closing the database connection. This method should be called when the
reader is no longer needed to free up resources.
+ + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **error** | The error that caused the destruction. | Error, null | + | **callback** | The callback function to be executed after destroying the reader. | ReadCallback | + + +### connectDatabase (function) + +`private` + +Connects to the database by creating a new database connection if none +already exists, or by reusing an existing connection. + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<sqlite.Database> | A promise that resolves with the database connection. | + + +### disconnectDatabase (function) + + + +Disconnects from the database by closing the active database connection +and setting the connection reference to null. This method should be called +when the reader is no longer needed to free up resources. + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the database connection +is successfully closed. | + + +### prepareStatement (function) + +`private` + +Prepares a statement for fetching entities. If the statement has already been +prepared, it is reused. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **db** | The database connection. | sqlite.Database | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<sqlite.Statement> | The prepared statement. | + + +### finalizeStatement (function) + +`private` + +Finalizes the statement used to fetch entities. This method should be +called when the reader is no longer needed to free up resources. + + +## SqliteBatchEntityWriter + +`extends AbstractBatchEntityWriterStream` + +Class that write data in batches of a specified size in SQLite databases. + + + + ### Constructor + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **options** | The options for the SqliteBatchEntityWriter. | SqliteBatchEntityWriterOptions | + | **options.dbConnectionFactory** | Function that creates a database connection. | function | + + + +### batchWrite (function) + +`protected` + +Writes a batch of data to the storage. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **chunk** | The batch of data to write to the storage. | BatchData.<T> | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the batch is successfully written. +The promise should be rejected if there is an error during writing. | + + +### executeBatch (function) + +`private` + +Executes a batch of data in the database, commits the transaction if all +promises are resolved, or rolls back the transaction if any of the promises +are rejected. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **db** | The database connection. | sqlite3.Database | + | **chunk** | The batch of data to write to the storage. 
| BatchData.<T> | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<Array.<void>> | A promise that resolves when all promises are resolved. | + + +### commitTransaction (function) + +`private` + +Commits the transaction if no errors occurred during the batch execution. +If an error occurred, the transaction is rolled back and the error is propagated. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **db** | The database connection. | sqlite.Database | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the transaction is committed. | + + +### rollbackTransaction (function) + +`private` + +Rolls back the transaction if an error occurred during the batch execution. +The error is propagated to the caller. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **db** | The database connection. | sqlite.Database | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | A promise that resolves when the transaction is rolled back. | + + +### prepareStatement (function) + +`private` + +Prepares a statement for saving entities. If the statement has already been +prepared, it is reused. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **db** | The database connection. | sqlite.Database | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<sqlite.Statement> | The prepared statement. | + + +### finalizeStatement (function) + +`private` + +Finalizes the statement used to save entities. This method should be +called when the writer is no longer needed to free up resources. + + +### _final (function) + +`private` + +Finalizes the writer by calling the _final method of the superclass and +finalizing the statement used to save entities. This method should be +called when the writer is no longer needed to free up resources. + + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **callback** | The callback function to be executed after finalizing the writer. 
| WriteCallback | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | | + + diff --git a/jest.config.js b/jest.config.js index b332273..e330579 100644 --- a/jest.config.js +++ b/jest.config.js @@ -23,6 +23,8 @@ module.exports = { lines: 80, }, }, + globalSetup: "/test/global-setup.ts", coverageDirectory: "coverage", reporters: [["github-actions", {silent: false}], "summary"], + maxConcurrency: 1 }; \ No newline at end of file diff --git a/package.json b/package.json index b46beab..9f7307f 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "batchjs-sql", + "name": "batchjs-data", "version": "1.0.0", "author": { "name": "Pablo Alcaraz Martínez", @@ -9,7 +9,7 @@ "type": "GitHub Sponsors", "url": "https://github.com/sponsors/palcarazm" }, - "description": "Extension for BatchJS adding data storage support for SQL databases", + "description": "Extension for BatchJS adding data storage support for databases", "main": "dist/cjs/index.js", "module": "dist/esm/index.js", "types": "dist/@types", @@ -23,28 +23,32 @@ "build:esm": "tsc --project tsconfig.esm.json", "build": "npm run build:cjs && npm run build:esm", "build:watch": "tsc --build tsconfig.cjs.json --watch", - "docs": "npm run docs:sqlite && npm run docs:streams", + "docs": "npm run docs:common && npm run docs:sqlite && npm run docs:postgresql && npm run docs:mariadb && npm run docs:mysql", + "docs:common": "jsdoc2md --files ./src/common/**/*.ts --helper ./scripts/jsdoc2md/helpers.js --configure ./scripts/jsdoc2md/jsdoc2md.rc --template ./scripts/jsdoc2md/template-common.hbs > ./docs/common-api.md", "docs:sqlite": "jsdoc2md --files ./src/sqlite/**/*.ts --helper ./scripts/jsdoc2md/helpers.js --configure ./scripts/jsdoc2md/jsdoc2md.rc --template ./scripts/jsdoc2md/template-common.hbs > ./docs/sqlite-api.md", + "docs:postgresql": "jsdoc2md --files ./src/postgresql/**/*.ts --helper ./scripts/jsdoc2md/helpers.js --configure ./scripts/jsdoc2md/jsdoc2md.rc --template ./scripts/jsdoc2md/template-common.hbs > ./docs/postgresql-api.md", + "docs:mariadb": "jsdoc2md --files ./src/mariadb/**/*.ts --helper ./scripts/jsdoc2md/helpers.js --configure ./scripts/jsdoc2md/jsdoc2md.rc --template ./scripts/jsdoc2md/template-common.hbs > ./docs/mariadb-api.md", + "docs:mysql": "jsdoc2md --files ./src/mysql/**/*.ts --helper ./scripts/jsdoc2md/helpers.js --configure ./scripts/jsdoc2md/jsdoc2md.rc --template ./scripts/jsdoc2md/template-common.hbs > ./docs/mysql-api.md", "test": "jest --coverage", "lint": "eslint ./src", "prepare": "husky" }, "keywords": [ - "SQL batch", - "SQL", + "database batch framework", + "database", "batch", "framework", "streams" ], "repository": { "type": "git", - "url": "git+https://github.com/palcarazm/batchjs-sql.git" + "url": "git+https://github.com/palcarazm/batchjs-data.git" }, "license": "MIT", "bugs": { - "url": "https://github.com/palcarazm/batchjs-sql/issues" + "url": "https://github.com/palcarazm/batchjs-data/issues" }, - "homepage": "https://github.com/palcarazm/batchjs-sql#readme", + "homepage": "https://github.com/palcarazm/batchjs-data#readme", "devDependencies": { "@babel/core": "^7.24.9", "@babel/preset-env": "^7.24.8", @@ -55,6 +59,7 @@ "@typescript-eslint/eslint-plugin": "^8.20.0", "@typescript-eslint/parser": "^8.20.0", "commitlint": "^19.2.1", + "dotenv": "^16.4.7", "eslint": "^9.8.0", "globals": "^15.9.0", "husky": "^9.0.11", @@ -65,17 +70,45 @@ "ts-jest": "^29.2.3", "typescript": "^5.5.4" }, + "optionalDependencies": { + 
"@types/pg": "^8.11.11", + "mariadb": "^3.4.0", + "mysql2": "^3.12.0", + "pg": "^8.13.1", + "sqlite": "^5.1.1", + "sqlite3": "^5.1.7" + }, "dependencies": { - "batchjs": "^1.1.0" + "batchjs": "^1.0.0" }, "commitlint": { "extends": "@commitlint/config-conventional" }, "exports": { ".": { - "types": "./dist/@types/index.d.ts", - "import": "./dist/esm/index.js", - "require": "./dist/cjs/index.js" + "types": "./dist/@types/common/index.d.ts", + "import": "./dist/esm/common/index.js", + "require": "./dist/cjs/common/index.js" + }, + "./sqlite": { + "types": "./dist/@types/sqlite/index.d.ts", + "import": "./dist/esm/sqlite/index.js", + "require": "./dist/cjs/sqlite/index.js" + }, + "./postgresql": { + "types": "./dist/@types/postgresql/index.d.ts", + "import": "./dist/esm/postgresql/index.js", + "require": "./dist/cjs/postgresql/index.js" + }, + "./mariadb": { + "types": "./dist/@types/mariadb/index.d.ts", + "import": "./dist/esm/mariadb/index.js", + "require": "./dist/cjs/mariadb/index.js" + }, + "./mysql": { + "types": "./dist/@types/mysql/index.d.ts", + "import": "./dist/esm/mysql/index.js", + "require": "./dist/cjs/mysql/index.js" } } } diff --git a/src/common/classes/AbstractBatchEntityReaderStream.ts b/src/common/classes/AbstractBatchEntityReaderStream.ts new file mode 100644 index 0000000..846215c --- /dev/null +++ b/src/common/classes/AbstractBatchEntityReaderStream.ts @@ -0,0 +1,90 @@ +import { ObjectReadable, ObjectReadableOptions, BatchData } from "batchjs/streams"; + +/** + * @interface + * Options for the AbstractBatchEntityReaderStream. + * @extends ObjectReadableOptions + */ +export interface AbstractBatchEntityReaderStreamOptions extends ObjectReadableOptions { + batchSize: number; +} + +/** + * @class + * Class that enable to implement classes to read data in batches of a specified size in different types of data storage. + * @extends ObjectReadable + * @template T + */ +export abstract class AbstractBatchEntityReaderStream extends ObjectReadable { + private reading: boolean = false; + protected buffer: BatchData = []; + private readonly batchSize: number; + + /** + * @constructor + * @param {AbstractBatchEntityReaderStreamOptions} options - The options for the AbstractBatchEntityReaderStream. + * @param [options.batchSize] {number} - The maximum number of elements in a batch. + */ + constructor(options: AbstractBatchEntityReaderStreamOptions) { + super(options); + this.batchSize = options.batchSize; + } + + /** + * Reads a batch of data from the data storage and pushes it to the consumer stream. + * If the size parameter is not specified, it reads the number of entities specified in the `batchSize` option. + * If the size parameter is specified, it reads the minimum of the size and the `batchSize` option. + * If no data is available, it pushes null to the consumer stream to signal that the end of the stream has been reached. + * If an error occurs while reading data, it emits an error event to the stream. + * + * @param {number} [size] - The size parameter for controlling the read operation. + * @returns {Promise} A promise that resolves when the data has been read and pushed to the consumer stream. 
+ */ + async _read(size: number): Promise { + if (this.reading) return; + this.reading = true; + try { + const entities: T[] = await this.fetch(Math.min(size, this.batchSize)); + if (entities.length === 0) { + this.push(null); + }else{ + this.buffer.push(...entities); + await this._flush(); + } + } catch (error) { + this.emit("error", error as Error); + }finally{ + this.reading = false; + } + } + + /** + * Flushes the buffer by pushing its content to the consumer stream. If the consumer stream is not ready to receive data, it waits for the drain event and flushes the buffer again when it is emitted. + * This function is recursive and will keep flushing the buffer until it is empty. + * + * @private + * @returns {Promise} A promise that resolves when the buffer is flushed. + */ + private async _flush():Promise{ + while (this.buffer.length > 0) { + const chunk = this.buffer.shift() as T; + if (!this.push(chunk)) { + this.buffer.unshift(chunk); + await new Promise((resolve) => this.once("drain", resolve)); + } + } + return Promise.resolve(); + }; + + /** + * Abstract method for fetching data from the data storage. This method should be implemented + * by subclasses to define the specific logic for reading a batch of data. + * + * @protected + * @abstract + * @param size {number} - The size parameter for controlling the read operation. + * @returns {Promise} A promise that resolves with an array of entities. + */ + protected abstract fetch(size: number): Promise; + +} \ No newline at end of file diff --git a/src/common/classes/AbstractBatchEntityWriterStream.ts b/src/common/classes/AbstractBatchEntityWriterStream.ts new file mode 100644 index 0000000..c26d62f --- /dev/null +++ b/src/common/classes/AbstractBatchEntityWriterStream.ts @@ -0,0 +1,98 @@ +import { ObjectWritable, ObjectWritableOptions, WriteCallback, BatchData } from "batchjs/streams"; + +/** + * @interface + * Options for the AbstractBatchEntityWriterStream. + * @extends ObjectWritableOptions + */ +export interface AbstractBatchEntityWriterStreamOptions extends ObjectWritableOptions { + batchSize: number; +} + +/** + * @class + * Class that enable to implement classes to write data in batches of a specified size in different types of data storage. + * @extends ObjectWritable + * @template T + */ +export abstract class AbstractBatchEntityWriterStream extends ObjectWritable { + protected buffer: BatchData = []; + private readonly batchSize: number; + + /** + * @constructor + * @param {AbstractBatchEntityWriterStreamOptions} options - The options for the AbstractBatchEntityWriterStream. + * @param [options.batchSize] {number} - The maximum number of elements in a batch. + */ + constructor(options: AbstractBatchEntityWriterStreamOptions) { + super(options); + this.batchSize = options.batchSize; + } + + /** + * A method to write data to the stream, push the chunk to the buffer, and execute the callback. + * + * @param {T} chunk - The data chunk to write to the stream. + * @param {BufferEncoding} encoding - The encoding of the data. + * @param {WriteCallback} callback - The callback function to be executed after writing the data. + * @return {Promise} This function does not return anything. 
+ */ + async _write(chunk: T, encoding: BufferEncoding, callback: WriteCallback): Promise { + try { + this.buffer.push(chunk); + if (this.buffer.length >= this.batchSize) { + await this._flush(); + } + callback(); + } catch (error) { + callback(error as Error); + } + } + + /** + * Finalizes the stream by pushing remaining data batches, handling errors, + * and executing the final callback. + * + * @param {WriteCallback} callback - The callback function to be executed after finalizing the stream. + * @return {Promise} This function does not return anything. + */ + async _final(callback: WriteCallback): Promise { + try { + await this._flush(); + callback(); + } catch (error) { + callback(error as Error); + } + } + + /** + * Creates a batch of data from the buffer and flushes it to the storage. + * + * @private + * @returns {Promise} + */ + private async _flush(): Promise { + if (this.buffer.length === 0) return Promise.resolve(); + const batch = [...this.buffer]; + this.buffer = []; + try { + await this.batchWrite(batch); + } catch (error) { + this.buffer.unshift(...batch); + return Promise.reject(error as Error); + } + } + + /** + * Writes a batch of data to the storage. This method should be implemented + * by subclasses to define the specific logic for writing a batch of data. + * + * @protected + * @abstract + * @param {BatchData} chunk - The batch of data to write to the storage. + * @returns {Promise} A promise that resolves when the batch is successfully written. + * The promise should be rejected if there is an error during writing. + */ + protected abstract batchWrite(chunk: BatchData): Promise; + +} \ No newline at end of file diff --git a/src/common/classes/_index.ts b/src/common/classes/_index.ts new file mode 100644 index 0000000..5b80505 --- /dev/null +++ b/src/common/classes/_index.ts @@ -0,0 +1,2 @@ +export * from "./AbstractBatchEntityReaderStream"; +export * from "./AbstractBatchEntityWriterStream"; \ No newline at end of file diff --git a/src/common/index.ts b/src/common/index.ts new file mode 100644 index 0000000..e2a1bf3 --- /dev/null +++ b/src/common/index.ts @@ -0,0 +1 @@ +export * from "./classes/_index"; \ No newline at end of file diff --git a/src/sqlite/classes/SqliteBatchEntityReader.ts b/src/sqlite/classes/SqliteBatchEntityReader.ts new file mode 100644 index 0000000..b5bb2b0 --- /dev/null +++ b/src/sqlite/classes/SqliteBatchEntityReader.ts @@ -0,0 +1,139 @@ +import sqlite from "sqlite"; +import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; +import { BatchData, ReadCallback } from "batchjs/streams"; + +/** + * @interface + * Options for the SqliteBatchEntityReader. + * @extends AbstractBatchEntityReaderStreamOptions + */ +export interface SqliteBatchEntityReaderOptions extends AbstractBatchEntityReaderStreamOptions { + dbConnectionFactory: ()=>Promise; + query: string; +} + +/** + * @class + * Class that read data in batches of a specified size in SQLite databases. + * @extends AbstractBatchEntityReaderStream + * @template T + */ +export abstract class SqliteBatchEntityReader extends AbstractBatchEntityReaderStream { + private readonly dbConnectionFactory: ()=>Promise; + private dbConnection: sqlite.Database | null = null; + private readonly query: string; + private fetchEntityStatement: sqlite.Statement | null = null; + private entitiesRead : number = 0; + + /** + * @constructor + * @param {SqliteBatchEntityReaderOptions} options - The options for the SqliteBatchEntityReader. 
     * @param [options.dbConnectionFactory] {Function} - Function that creates a database connection.
     */
    constructor(options: SqliteBatchEntityReaderOptions) {
        super(options);
        this.dbConnectionFactory = options.dbConnectionFactory;
        this.query = options.query;
    }

    /**
     * Fetches a batch of data from the database.
     *
     * @protected
     * @param {number} size - The size of the batch to fetch.
     * @returns {Promise<BatchData<T>>} A promise that resolves with the batch of data.
     */
    protected async fetch(size: number): Promise<BatchData<T>> {
        return this.connectDatabase()
            .then((db) => this.prepareStatement(db))
            .then((statement) => statement.all({ "@limit": size, "@offset": this.entitiesRead }))
            .then((results: unknown[]) => {
                this.entitiesRead += size;
                return results.map(this.rowToEntity);
            });
    }

    /**
     * Destroys the reader by finalizing the statement used to read entities and
     * closing the database connection. This method should be called when the
     * reader is no longer needed to free up resources.
     * @see AbstractBatchEntityReaderStream._destroy
     * @private
     * @param error {Error|null} - The error that caused the destruction.
     * @param callback {ReadCallback} - The callback function to be executed after destroying the reader.
     */
    _destroy(error: Error | null, callback: ReadCallback): void {
        let destroyError = error;

        this.finalizeStatement()
            .then(() => this.disconnectDatabase())
            .catch((error) => {destroyError = error;})
            .finally(() => super._destroy(destroyError, callback));
    }


    /**
     * Connects to the database by creating a new database connection if none
     * already exists, or by reusing an existing connection.
     * @private
     * @returns {Promise<sqlite.Database>} A promise that resolves with the database connection.
     */
    private async connectDatabase(): Promise<sqlite.Database> {
        if (!this.dbConnection) {
            this.dbConnection = await this.dbConnectionFactory();
        }
        return this.dbConnection;
    }

    /**
     * Disconnects from the database by closing the active database connection
     * and setting the connection reference to null. This method should be called
     * when the reader is no longer needed to free up resources.
     * @private
     * @returns {Promise<void>} A promise that resolves when the database connection
     * is successfully closed.
     */
    private async disconnectDatabase(): Promise<void> {
        if (this.dbConnection) {
            await this.dbConnection.close();
            this.dbConnection = null;
        }
    }

    /**
     * Prepares a statement for fetching entities. If the statement has already been
     * prepared, it is reused.
     * @private
     * @param {sqlite.Database} db - The database connection.
     * @returns {Promise<sqlite.Statement>} The prepared statement.
     */
    private async prepareStatement(db: sqlite.Database): Promise<sqlite.Statement> {
        if (!this.fetchEntityStatement) {
            this.fetchEntityStatement = await db.prepare(`${this.query} LIMIT @limit OFFSET @offset`);
        }
        return this.fetchEntityStatement;
    }

    /**
     * Finalizes the statement used to fetch entities. This method should be
     * called when the reader is no longer needed to free up resources.
     *
     * @private
     */
    private async finalizeStatement(): Promise<void> {
        if (this.fetchEntityStatement) {
            await this.fetchEntityStatement.finalize();
            this.fetchEntityStatement = null;
        }
    }


    /**
     * Abstract method to convert a row to an entity. This method should be implemented
     * by subclasses to define the specific logic for reading a batch of data.
     *
     * @abstract
     * @protected
     * @param row {unknown} - The row to be converted to an entity.
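     * @returns {T} The entity corresponding to the row.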
+ */ + protected abstract rowToEntity(row: unknown): T +} \ No newline at end of file diff --git a/src/sqlite/classes/SqliteBatchEntityWriter.ts b/src/sqlite/classes/SqliteBatchEntityWriter.ts new file mode 100644 index 0000000..f21bdaf --- /dev/null +++ b/src/sqlite/classes/SqliteBatchEntityWriter.ts @@ -0,0 +1,155 @@ +import sqlite from "sqlite"; +import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; +import { BatchData, WriteCallback } from "batchjs/streams"; + +/** + * @interface + * Options for the SqliteBatchEntityWriter. + * @extends AbstractBatchEntityWriterStreamOptions + */ +export interface SqliteBatchEntityWriterOptions extends AbstractBatchEntityWriterStreamOptions { + dbConnectionFactory: ()=>Promise; + prepareStatement: string; +} + +/** + * @class + * Class that write data in batches of a specified size in SQLite databases. + * @extends AbstractBatchEntityWriterStream + * @template T + */ +export abstract class SqliteBatchEntityWriter extends AbstractBatchEntityWriterStream { + private readonly dbConnectionFactory: ()=>Promise; + private readonly prepareStatementString: string; + private saveEntityStatement: sqlite.Statement | null = null; + + /** + * @constructor + * @param {SqliteBatchEntityWriterOptions} options - The options for the SqliteBatchEntityWriter. + * @param [options.dbConnectionFactory] {Function} - Function that creates a database connection. + */ + constructor(options: SqliteBatchEntityWriterOptions) { + super(options); + this.dbConnectionFactory = options.dbConnectionFactory; + this.prepareStatementString = options.prepareStatement; + } + + /** + * Writes a batch of data to the storage. + * + * @protected + * @param {BatchData} chunk - The batch of data to write to the storage. + * @returns {Promise} A promise that resolves when the batch is successfully written. + * The promise should be rejected if there is an error during writing. + */ + protected async batchWrite(chunk: BatchData): Promise{ + const db = await this.dbConnectionFactory(); + return db.exec("BEGIN TRANSACTION") + .then(()=>this.executeBatch(db, chunk)) + .then(()=>this.commitTransaction(db)) + .catch((error) => { + this.rollbackTransaction(db); + return Promise.reject(error as Error); + }).finally(async()=>{ + await this.finalizeStatement(); + await db.close(); + }); + } + + /** + * Executes a batch of data in the database, commits the transaction if all + * promises are resolved, or rolls back the transaction if any of the promises + * are rejected. + * + * @private + * @param {sqlite3.Database} db - The database connection. + * @param {BatchData} chunk - The batch of data to write to the storage. + * @returns {Promise} A promise that resolves when all promises are resolved. + */ + private async executeBatch(db: sqlite.Database, chunk: BatchData) : Promise { + const stmt = await this.prepareStatement(db); + const promises = chunk.map((entity) => this.saveEntity(entity, stmt)); + return Promise.all(promises); + } + + /** + * Commits the transaction if no errors occurred during the batch execution. + * If an error occurred, the transaction is rolled back and the error is propagated. + * + * @private + * @param {sqlite.Database} db - The database connection. + * @returns {Promise} A promise that resolves when the transaction is committed. + */ + private async commitTransaction(db: sqlite.Database):Promise { + return db.exec("COMMIT"); + } + + /** + * Rolls back the transaction if an error occurred during the batch execution. 
+ * The error is propagated to the caller. + * + * @private + * @param {sqlite.Database} db - The database connection. + * @returns {Promise} A promise that resolves when the transaction is rolled back. + */ + private async rollbackTransaction(db: sqlite.Database):Promise { + return db.exec("ROLLBACK"); + } + + /** + * Prepares a statement for saving entities. If the statement has already been + * prepared, it is reused. + * @private + * @param {sqlite.Database} db - The database connection. + * @returns {Promise} The prepared statement. + */ + private async prepareStatement(db: sqlite.Database): Promise { + if (!this.saveEntityStatement) { + this.saveEntityStatement = await db.prepare(this.prepareStatementString); + } + return this.saveEntityStatement; + } + + /** + * Finalizes the statement used to save entities. This method should be + * called when the writer is no longer needed to free up resources. + * + * @private + */ + private async finalizeStatement(): Promise { + if (this.saveEntityStatement) { + await this.saveEntityStatement.finalize(); + this.saveEntityStatement = null; + } + } + + /** + * Finalizes the writer by calling the _final method of the superclass and + * finalizing the statement used to save entities. This method should be + * called when the writer is no longer needed to free up resources. + * @see AbstractBatchEntityWriterStream._final + * @private + * @param callback {WriteCallback} - The callback function to be executed after finalizing the writer. + * @returns {Promise} + */ + async _final(callback: WriteCallback): Promise { + try { + await super._final(callback); + } finally { + await this.finalizeStatement(); + } + } + + /** + * Save or update an entity in the database.This method should be implemented + * by subclasses to define the specific logic for writing a batch of data. 
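     *
     * @example
     * // Illustrative sketch (hypothetical subclass): bind entity fields to the
     * // statement prepared from options.prepareStatement.
     * // protected saveEntity(user: UserDTO, stmt: sqlite.Statement): Promise<void> {
     * //     return stmt.run({ "@id": user.id, "@username": user.username }).then(() => undefined);
     * // }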
+ * + * @protected + * @abstract + * @param entity {T} - Entity to be added or updated in the database + * @param stmt {sqlite.Statement} - Database Statement + * @returns {Promise} + */ + protected abstract saveEntity(entity: T, stmt: sqlite.Statement): Promise; + +} \ No newline at end of file diff --git a/src/sqlite/classes/_index.ts b/src/sqlite/classes/_index.ts new file mode 100644 index 0000000..66e710b --- /dev/null +++ b/src/sqlite/classes/_index.ts @@ -0,0 +1,2 @@ +export * from "./SqliteBatchEntityReader"; +export * from "./SqliteBatchEntityWriter"; \ No newline at end of file diff --git a/src/sqlite/index.ts b/src/sqlite/index.ts new file mode 100644 index 0000000..e2a1bf3 --- /dev/null +++ b/src/sqlite/index.ts @@ -0,0 +1 @@ +export * from "./classes/_index"; \ No newline at end of file diff --git a/test/common/classes/AbstractBatchEntityReaderStream.test.ts b/test/common/classes/AbstractBatchEntityReaderStream.test.ts new file mode 100644 index 0000000..af15606 --- /dev/null +++ b/test/common/classes/AbstractBatchEntityReaderStream.test.ts @@ -0,0 +1,134 @@ +import { AbstractBatchEntityReaderStream } from "../../../src/common/index"; + +describe("AbstractBatchEntityReaderStream", () => { + class AbstractBatchEntityReaderStreamImplementation extends AbstractBatchEntityReaderStream { + private chunks: string[]; + + constructor(chunks: string[], batchSize: number) { + super({ batchSize }); + this.chunks = chunks; + } + + protected async fetch(size: number): Promise { + const batch = this.chunks.splice(0, size); + return Promise.resolve(batch); + } + } + + let reader: AbstractBatchEntityReaderStreamImplementation; + let chunks: Array; + const data: Array = ["A", "B", "C", "D", "E", "F"]; + Object.freeze(data); + + beforeEach(() => { + chunks = Object.assign([], data); + reader = new AbstractBatchEntityReaderStreamImplementation(chunks, 2); + }); + + test('should emit end event when all data is read', (done) => { + reader = new AbstractBatchEntityReaderStreamImplementation([], 2); + const result: string[] = []; + + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("end", () => { + expect(result).toEqual([]); + done(); + }); + + reader.read(); + }); + + test('should emit data in batches', (done) => { + const result: string[] = []; + + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("end", () => { + expect(result).toEqual(data); + done(); + }); + }); + + + test('should handle backpressure', (done) => { + const result: string[] = []; + const spy = jest.spyOn(reader, "push"); + + reader.on("data", (chunk) => { + result.push(chunk); + + if (result.length === 2) { + spy.mockImplementationOnce(() => false); + setTimeout(()=>{ + reader.emit("drain"); + },50); + } + }); + + reader.on("end", () => { + expect(result).toEqual(data); + expect(spy).toHaveBeenCalledTimes(data.length + 2); + done(); + }); + + reader.read(); + }); + + test('should emit error if fetch fails', (done) => { + const faultyReader = new AbstractBatchEntityReaderStreamImplementation([], 2); + + jest.spyOn(faultyReader as any, "fetch").mockImplementation(() => { + return Promise.reject(new Error("Simulated fetch error")); + }); + + faultyReader.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect(err.message).toBe("Simulated fetch error"); + done(); + }); + + faultyReader.read(); + }); + + + test('should emit all data even if multiple reads are made', (done) => { + const result: string[] = []; + + const processNextBatch = () => { + let chunk; + while ((chunk = 
reader.read()) !== null) { + result.push(chunk); + } + if (result.length === 6) { + expect(result).toEqual(data); + done(); + } + }; + + reader.on("readable", processNextBatch); + }); + + test('should handle with slow fetch', (done) => { + const result: string[] = []; + + jest.spyOn(reader as any, "fetch").mockImplementation(async(size) => { + reader._read(size as number); + await new Promise((resolve) => setTimeout(resolve, 50)); + return chunks.splice(0, size as number); + }); + + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("end", () => { + expect(result).toEqual(data); + done(); + }); + }); +}); diff --git a/test/common/classes/AbstractBatchEntityWriterStream.test.ts b/test/common/classes/AbstractBatchEntityWriterStream.test.ts new file mode 100644 index 0000000..5b2c835 --- /dev/null +++ b/test/common/classes/AbstractBatchEntityWriterStream.test.ts @@ -0,0 +1,124 @@ +import { AbstractBatchEntityWriterStream } from "../../../src/common/index"; + +describe("AbstractBatchEntityWriterStream", () => { + class AbstractBatchEntityWriterStreamImplementation extends AbstractBatchEntityWriterStream { + protected async batchWrite(chunk: number[]): Promise { + return new Promise((resolve) => setTimeout(resolve, 10)); + } + } + + let writer: AbstractBatchEntityWriterStreamImplementation; + let chunks: Array; + + beforeEach(() => { + writer = new AbstractBatchEntityWriterStreamImplementation({ batchSize: 3 }); + }); + + test('should accumulate data in buffer when writing chunks', async () => { + expect(writer['buffer']).toHaveLength(0); + + await writer.write(1); + await writer.write(2); + + expect(writer['buffer']).toHaveLength(2); + }); + + test('should automatically flush the buffer when batchSize is reached', async () => { + const flushSpy = jest.spyOn(writer as any, '_flush'); + + await writer.write(1); + await writer.write(2); + await writer.write(3); + + expect(flushSpy).toHaveBeenCalledTimes(1); + }); + + test('should flush remaining data in buffer on final', (done) => { + const finalSpy = jest.spyOn(writer as any, '_final'); + const flushSpy = jest.spyOn(writer as any, '_flush'); + + writer.once("finish", () => { + expect(writer['buffer']).toHaveLength(0); + expect(flushSpy).toHaveBeenCalledTimes(2); + expect(finalSpy).toHaveBeenCalledTimes(1); + done() + }) + + writer.write(1); + writer.write(2); + writer.write(3); + writer.write(4); + writer.end(); + + }); + + test('should not call batchWrite when there is no remaining data in buffer', (done) => { + const finalSpy = jest.spyOn(writer as any, '_final'); + const flushSpy = jest.spyOn(writer as any, '_flush'); + const batchWriteSpy = jest.spyOn(writer as any, 'batchWrite'); + + writer.once("finish", () => { + expect(writer['buffer']).toHaveLength(0); + expect(flushSpy).toHaveBeenCalledTimes(2); + expect(batchWriteSpy).toHaveBeenCalledTimes(1); + expect(finalSpy).toHaveBeenCalledTimes(1); + done() + }) + + writer.write(1); + writer.write(2); + writer.write(3); + writer.end(); + + }); + + + test('should handle errors in batchWrite correctly', (done) => { + const errorWriter = new AbstractBatchEntityWriterStreamImplementation({ + batchSize: 1 + }); + errorWriter['batchWrite'] = jest.fn(() => Promise.reject(new Error('Write failed'))); + + errorWriter.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect(err.message).toBe('Write failed'); + done(); + }); + + errorWriter.write(1); + }); + + test('should handle errors on final in batchWrite correctly', (done) => { + const errorWriter = new 
AbstractBatchEntityWriterStreamImplementation({ + batchSize: 2 + }); + errorWriter['batchWrite'] = jest.fn(() => Promise.reject(new Error('Write failed'))); + + errorWriter.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect(err.message).toBe('Write failed'); + done(); + }); + + errorWriter.write(1); + errorWriter.end(); + }); + + test('should handle high load and concurrent writes correctly', async () => { + const startTime = Date.now(); + const numWrites = 1000; + const writePromises : boolean[] = []; + + for (let i = 0; i < numWrites; i++) { + writePromises.push(writer.write(i)); + } + + await Promise.all(writePromises); + + const endTime = Date.now(); + const duration = endTime - startTime; + + expect(duration).toBeLessThan(5000); + expect(writer['buffer']).toHaveLength(0); + }); +}); diff --git a/test/global-setup.ts b/test/global-setup.ts new file mode 100644 index 0000000..5f26b7d --- /dev/null +++ b/test/global-setup.ts @@ -0,0 +1,5 @@ +module.exports = () => { + if (process.env.CI !== "true") { + require("dotenv").config(); + } +}; diff --git a/test/sqlite/classes/SqliteBatchEntityReader.test.ts b/test/sqlite/classes/SqliteBatchEntityReader.test.ts new file mode 100644 index 0000000..ebe4851 --- /dev/null +++ b/test/sqlite/classes/SqliteBatchEntityReader.test.ts @@ -0,0 +1,115 @@ +import { UserBatchReader } from "../mocks/UserBatchReader"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +describe("SqliteBatchEntityReader", () => { + const data = [ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]; + Object.freeze(data); + + beforeAll((done) => { + UserDatabase.teardown().then(() => done()); + }); + + beforeEach((done) => { + UserDatabase.setup() + .then(() => UserDatabase.mockData(data)) + .then(() => done()); + }); + + afterEach((done) => { + UserDatabase.teardown().then(() => done()); + }); + + test("should read users in batches", (done) => { + const reader = new UserBatchReader({ + batchSize: 2, + }); + + const result: UserDTO[] = []; + + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("error", (error) => { + fail(error); + }); + + reader.on("end", () => { + expect(result).toEqual(data); + done(); + }); + + reader.read(); + }); + + test("should handle empty result", (done) => { + const reader = new UserBatchReader({ + batchSize: 2, + query: "SELECT * FROM users WHERE 1=0", + }); + + const result: UserDTO[] = []; + + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("error", (error) => { + done(error); + }); + + reader.on("end", () => { + expect(result).toEqual([]); + done(); + }); + + reader.read(); + }); + + test("should destroy the connection properly", (done) => { + const reader = new UserBatchReader({ + batchSize: 2, + }); + + const spyFinalizeStatement = jest.spyOn(reader as any, "finalizeStatement"); + + reader.on("data", () => {}); + + reader.on("error", (error) => { + fail(error); + }); + + reader.on("close", () => { + expect(reader["dbConnection"]).toBeNull(); + expect(spyFinalizeStatement).toHaveBeenCalled(); + done(); + }); + + reader.read(); + }); + + test("should handle error on close", (done) => { + const reader = new UserBatchReader({ + batchSize: 2, + }); + + jest.spyOn(reader as any, "finalizeStatement").mockImplementation(() => { + return Promise.reject(new Error("Error on close")); + }); + + reader.on("data", () => {}); + + reader.on("error", (error) => { + expect(error).toBeInstanceOf(Error); 
+ expect(error.message).toBe("Error on close"); + done(); + }); + + reader.read(); + }); +}); diff --git a/test/sqlite/classes/SqliteBatchEntityWriter.test.ts b/test/sqlite/classes/SqliteBatchEntityWriter.test.ts new file mode 100644 index 0000000..4675b47 --- /dev/null +++ b/test/sqlite/classes/SqliteBatchEntityWriter.test.ts @@ -0,0 +1,59 @@ +import { UserBatchWriter } from "../mocks/UserBatchWriter"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +describe("SqliteBatchEntityWriter", () => { + beforeAll((done) => { + UserDatabase.teardown().then(() => done()); + }); + + beforeEach((done) => { + UserDatabase.setup().then(() => done()); + }); + + afterEach((done) => { + UserDatabase.teardown().then(() => done()); + }); + + test("should save entities in database", (done) => { + const writer = new UserBatchWriter({ + batchSize: 3, + }); + + writer.once("finish", () => { + UserDatabase.db + .then((db) => db.all("SELECT * FROM users")) + .then((rows) => { + expect(rows).toEqual([ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]); + }).finally(()=>done()); + }); + + writer.write(new UserDTO(1, "Alice")); + writer.write(new UserDTO(2, "Bob")); + writer.write(new UserDTO(3, "Charlie")); + writer.end(); + }); + + test("should rollback on error (Duplicated ID)", (done) => { + const writer = new UserBatchWriter({ + batchSize: 3, + }); + + writer.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + UserDatabase.db + .then((db) => db.all("SELECT * FROM users")) + .then((rows) => { + expect(rows).toEqual([]); + }).finally(()=>done()); + }); + + writer.write(new UserDTO(1, "Alice")); + writer.write(new UserDTO(1, "DUPLICATED ID")); + writer.end(); + }); +}); \ No newline at end of file diff --git a/test/sqlite/mocks/UserBatchReader.ts b/test/sqlite/mocks/UserBatchReader.ts new file mode 100644 index 0000000..cdf8e80 --- /dev/null +++ b/test/sqlite/mocks/UserBatchReader.ts @@ -0,0 +1,19 @@ +import sqlite3 from "sqlite3"; +import {open} from "sqlite"; +import { SqliteBatchEntityReader } from "../../../src/sqlite"; +import { UserDTO } from "./UserDTO"; +import { DB } from "../utils/db"; + +export class UserBatchReader extends SqliteBatchEntityReader { + constructor(options:{batchSize:number,query?:string}) { + super({ + batchSize: options.batchSize, + dbConnectionFactory: () => { return open({filename: DB.dbPath, driver: sqlite3.Database});}, + query: options.query || "SELECT id, username FROM users" + }); + } + + protected rowToEntity(row: unknown): UserDTO { + return row as UserDTO; + } +} \ No newline at end of file diff --git a/test/sqlite/mocks/UserBatchWriter.ts b/test/sqlite/mocks/UserBatchWriter.ts new file mode 100644 index 0000000..8373aa6 --- /dev/null +++ b/test/sqlite/mocks/UserBatchWriter.ts @@ -0,0 +1,18 @@ +import sqlite3 from "sqlite3"; +import sqlite, {open} from "sqlite"; +import { SqliteBatchEntityWriter } from "../../../src/sqlite"; +import { UserDTO } from "./UserDTO"; +import { DB } from "../utils/db"; + +export class UserBatchWriter extends SqliteBatchEntityWriter { + constructor(options:{batchSize:number}){ + super({ + batchSize: options.batchSize, + dbConnectionFactory: () => { return open({filename: DB.dbPath, driver: sqlite3.Database});}, + prepareStatement: "INSERT INTO users (id, username) VALUES (@id, @username)" + }); + } + protected saveEntity(entity: UserDTO, stmt: sqlite.Statement): Promise { + return stmt.all({'@id': entity.id, '@username': 
entity.username}); + } +} \ No newline at end of file diff --git a/test/sqlite/mocks/UserDTO.ts b/test/sqlite/mocks/UserDTO.ts new file mode 100644 index 0000000..40ed3ec --- /dev/null +++ b/test/sqlite/mocks/UserDTO.ts @@ -0,0 +1,8 @@ +export class UserDTO { + id: number; + username: string; + constructor(id: number, username: string) { + this.id = id; + this.username = username; + } +} \ No newline at end of file diff --git a/test/sqlite/mocks/UserDatabase.ts b/test/sqlite/mocks/UserDatabase.ts new file mode 100644 index 0000000..1e913e9 --- /dev/null +++ b/test/sqlite/mocks/UserDatabase.ts @@ -0,0 +1,33 @@ +import sqlite3 from "sqlite3"; +import {open} from "sqlite"; +import { DB } from "../utils/db"; +import { UserDTO } from "./UserDTO"; + +export class UserDatabase{ + static readonly db = open({filename: DB.dbPath, driver: sqlite3.Database}); + + static async setup(): Promise { + const db = await UserDatabase.db; + return db.exec( + `CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + username TEXT NOT NULL + )`) + .then(() => db.exec(`DELETE FROM users`)); + } + + static async mockData(data:UserDTO[]): Promise { + const db = await UserDatabase.db; + return Promise.all( + data.map((user) => { + return db.exec(`INSERT INTO users (id, username) VALUES (${user.id}, '${user.username}')`); + }) + ); + } + + + static async teardown(): Promise { + const db = await UserDatabase.db; + return db.exec("DROP TABLE IF EXISTS users"); + } +} \ No newline at end of file diff --git a/test/sqlite/tmp/.gitignore b/test/sqlite/tmp/.gitignore new file mode 100644 index 0000000..d6b7ef3 --- /dev/null +++ b/test/sqlite/tmp/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/test/sqlite/utils/db.ts b/test/sqlite/utils/db.ts new file mode 100644 index 0000000..92ca6bf --- /dev/null +++ b/test/sqlite/utils/db.ts @@ -0,0 +1,6 @@ +import path from "path"; + +export class DB{ + static readonly folder = path.resolve(__dirname, "../tmp"); + static readonly dbPath = path.join(DB.folder, "test.db"); +} \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index 614944d..7001a93 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,8 @@ { "compilerOptions": { "target": "ES2020", - "module": "ESNext", + "module": "NodeNext", + "moduleResolution": "NodeNext", "declaration": true, "outDir": "./dist/esm", "declarationDir": "./dist/@types", From 2f87f4b8a95ab3c00a288ba947ec8f2a7dfe9ca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?= Date: Fri, 31 Jan 2025 22:14:40 +0100 Subject: [PATCH 2/8] feat: add postgreSQL --- docs/postgresql-api.md | 117 +++++++++++ .../classes/PostgresBatchEntityReader.ts | 122 ++++++++++++ .../classes/PostgresBatchEntityWriter.ts | 66 +++++++ src/postgresql/classes/_index.ts | 2 + src/postgresql/index.ts | 1 + .../classes/PostgresBatchEntityReader.test.ts | 127 ++++++++++++ .../classes/PostgresBatchEntityWriter.test.ts | 71 +++++++ test/postgresql/mocks/UserBatchReader.ts | 16 ++ test/postgresql/mocks/UserBatchWriter.ts | 19 ++ test/postgresql/mocks/UserDTO.ts | 8 + test/postgresql/mocks/UserDatabase.ts | 183 ++++++++++++++++++ 11 files changed, 732 insertions(+) create mode 100644 docs/postgresql-api.md create mode 100644 src/postgresql/classes/PostgresBatchEntityReader.ts create mode 100644 src/postgresql/classes/PostgresBatchEntityWriter.ts create mode 100644 src/postgresql/classes/_index.ts create mode 100644 src/postgresql/index.ts create mode 100644 test/postgresql/classes/PostgresBatchEntityReader.test.ts create mode 100644 
test/postgresql/classes/PostgresBatchEntityWriter.test.ts
 create mode 100644 test/postgresql/mocks/UserBatchReader.ts
 create mode 100644 test/postgresql/mocks/UserBatchWriter.ts
 create mode 100644 test/postgresql/mocks/UserDTO.ts
 create mode 100644 test/postgresql/mocks/UserDatabase.ts

diff --git a/docs/postgresql-api.md b/docs/postgresql-api.md
new file mode 100644
index 0000000..1719d8f
--- /dev/null
+++ b/docs/postgresql-api.md
@@ -0,0 +1,117 @@
# PostgreSQL API

In this documentation, we will focus on the PostgreSQL API. This module includes PostgreSQL implementations of the common batch reader and writer streams, so you can read and write entities in PostgreSQL databases from your own custom jobs and steps.

------------



## Table of Contents

 - [PostgresBatchEntityReader](#postgresbatchentityreader)
 - [PostgresBatchEntityWriter](#postgresbatchentitywriter)

## PostgresBatchEntityReader

`extends AbstractBatchEntityReaderStream`

Class that reads data in batches of a specified size using PostgreSQL cursors.



 ### Constructor
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **options** | The options for the PostgresBatchEntityReader. | PostgresBatchEntityReaderOptions |
 | **options.pool** | The PostgreSQL connection pool. | Pool |
 | **options.query** | SQL query to be executed (without LIMIT and OFFSET). | string |



### fetch (function)

`protected`

Fetches a batch of data using a PostgreSQL cursor.

 #### Parameters
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **size** | The size of the batch to fetch. | number |

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<BatchData.<T>> | A promise that resolves with the batch of data. |


### initializeCursor (function)

`private`

Initializes the cursor for the query if not already initialized.

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<{cursorName:string, client:PoolClient}> | A promise that resolves when the cursor is initialized. |


### closeCursor (function)

`private`

Closes the cursor and releases the client connection.

 #### Returns
 | Type | Description |
 |------------|-----------------------------------------|
 | Promise.<void> | A promise that resolves when the cursor is closed. |


### _destroy (function)



Destroys the reader by closing the cursor and releasing resources.

 #### Parameters
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **error** | The error that caused the destruction. | Error, null |
 | **callback** | The callback function to be executed after destruction. | function |


## PostgresBatchEntityWriter

`extends AbstractBatchEntityWriterStream`

Class that writes data in batches of a specified size in PostgreSQL databases.



 ### Constructor
 | Name | Description | Type |
 |------------|-----------------------------------------|------------------------------|
 | **options** | The options for the PostgresBatchEntityWriter. | PostgresBatchEntityWriterOptions |
 | **options.pool** | The PostgreSQL connection pool. | Pool |



### batchWrite (function)

`protected`

Writes a batch of data to the storage.
+ + #### Parameters + | Name | Description | Type | + |------------|-----------------------------------------|------------------------------| + | **chunk** | The batch of data to write to the storage. | BatchData.<T> | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | | + + diff --git a/src/postgresql/classes/PostgresBatchEntityReader.ts b/src/postgresql/classes/PostgresBatchEntityReader.ts new file mode 100644 index 0000000..1f39f54 --- /dev/null +++ b/src/postgresql/classes/PostgresBatchEntityReader.ts @@ -0,0 +1,122 @@ +import { Pool, PoolClient } from "pg"; +import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; +import { BatchData } from "batchjs/streams"; + +/** + * @interface + * Options for the PostgresBatchEntityReader. + * @extends AbstractBatchEntityReaderStreamOptions + */ +export interface PostgresBatchEntityReaderOptions extends AbstractBatchEntityReaderStreamOptions { + pool: Pool; // The PostgreSQL connection pool + query: string; // SQL query (without LIMIT and OFFSET) +} + +/** + * @class + * Class that reads data in batches of a specified size using PostgreSQL cursors. + * @extends AbstractBatchEntityReaderStream + * @template T + */ +export abstract class PostgresBatchEntityReader extends AbstractBatchEntityReaderStream { + private readonly pool: Pool; + private readonly query: string; + private client: PoolClient | null = null; + private cursorName: string | null = null; + private cursorOpened: boolean = false; + + /** + * @constructor + * @param {PostgresBatchEntityReaderOptions} options - The options for the PostgresBatchEntityReader. + * @param [options.pool] {Pool} - The PostgreSQL connection pool. + * @param [options.query] {string} - SQL query to be executed (without LIMIT and OFFSET). + */ + constructor(options: PostgresBatchEntityReaderOptions) { + super(options); + this.pool = options.pool; + this.query = options.query; + } + + /** + * Fetches a batch of data using a PostgreSQL cursor. + * + * @protected + * @param {number} size - The size of the batch to fetch. + * @returns {Promise>} A promise that resolves with the batch of data. + */ + protected fetch(size: number): Promise> { + return this.initializeCursor() + .then(({cursorName,client})=>client.query({ + text: `FETCH ${size} FROM ${cursorName};`, + })) + .then((result) => result.rows.map(this.rowToEntity)); + } + + /** + * Initializes the cursor for the query if not already initialized. + * + * @private + * @returns {Promise<{cursorName:string,client:PoolClient}>} A promise that resolves when the cursor is initialized. + */ + private async initializeCursor(): Promise<{cursorName:string,client:PoolClient}> { + if (!this.cursorOpened ) { + this.cursorName = `cursor_${Date.now()}`; + return this.pool.connect() + .then((client) => { + this.client = client; + return this.client.query("BEGIN;"); + }) + .then(() => (this.client as PoolClient).query(`DECLARE ${this.cursorName} CURSOR FOR ${this.query};`)) + .then(() => { + this.cursorOpened = true; + }) + .then(() => ({cursorName:this.cursorName as string,client:this.client as PoolClient})); + } + return Promise.resolve({cursorName:this.cursorName as string,client:this.client as PoolClient}); + } + + /** + * Closes the cursor and releases the client connection. + * + * @private + * @returns {Promise} A promise that resolves when the cursor is closed. 
+ */ + private async closeCursor(): Promise { + if (this.cursorOpened && this.cursorName && this.client) { + try { + await this.client.query(`CLOSE ${this.cursorName};`); + await this.client.query("COMMIT;"); + } finally { + this.client.release(); + this.client = null; + this.cursorName = null; + this.cursorOpened = false; + } + } + } + + /** + * Destroys the reader by closing the cursor and releasing resources. + * + * @param {Error|null} error - The error that caused the destruction. + * @param {Function} callback - The callback function to be executed after destruction. + */ + _destroy(error: Error | null, callback: (error?: Error | null) => void): void { + let destroyError = error; + + this.closeCursor() + .catch((error) => {destroyError = error;}) + .finally(() => super._destroy(destroyError, callback)); + } + + /** + * Abstract method to convert a row to an entity. + * This method must be implemented by subclasses. + * + * @abstract + * @protected + * @param {unknown} row - A row from the database. + * @returns {T} The entity corresponding to the row. + */ + protected abstract rowToEntity(row: unknown): T; +} diff --git a/src/postgresql/classes/PostgresBatchEntityWriter.ts b/src/postgresql/classes/PostgresBatchEntityWriter.ts new file mode 100644 index 0000000..36f1190 --- /dev/null +++ b/src/postgresql/classes/PostgresBatchEntityWriter.ts @@ -0,0 +1,66 @@ +import { Pool, PoolClient } from "pg"; +import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; +import { BatchData } from "batchjs/streams"; + +/** + * @interface + * Options for the PostgresBatchEntityWriter. + * @extends AbstractBatchEntityWriterStreamOptions + */ +export interface PostgresBatchEntityWriterOptions extends AbstractBatchEntityWriterStreamOptions { + pool: Pool; +} + +/** + * @class + * Class that writes data in batches of a specified size in PostgreSQL databases. + * @extends AbstractBatchEntityWriterStream + * @template T + */ +export abstract class PostgresBatchEntityWriter extends AbstractBatchEntityWriterStream { + private readonly pool: Pool; + + /** + * @constructor + * @param {PostgresBatchEntityWriterOptions} options - The options for the PostgresBatchEntityWriter. + * @param [options.pool] {Pool} - The PostgreSQL connection pool. + */ + constructor(options: PostgresBatchEntityWriterOptions) { + super(options); + this.pool = options.pool; + } + + /** + * Writes a batch of data to the storage. + * + * @protected + * @param {BatchData} chunk - The batch of data to write to the storage. + * @returns {Promise} + */ + protected async batchWrite(chunk: BatchData): Promise { + const client: PoolClient = await this.pool.connect(); + try { + await client.query("BEGIN"); + for (const entity of chunk) { + await this.saveEntity(entity, client); + } + await client.query("COMMIT"); + } catch (error) { + await client.query("ROLLBACK"); + return Promise.reject(error as Error); + } finally { + client.release(); + } + } + + /** + * Save or update an entity in the database. + * + * @protected + * @abstract + * @param {T} entity - Entity to be saved or updated. + * @param {PoolClient} client - The database client. 
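+     * @example
+     * // A minimal sketch of a concrete implementation; UserDTO and the users
+     * // table are illustrative and mirror the test mocks in this patch:
+     * protected saveEntity(entity: UserDTO, client: PoolClient): Promise<void> {
+     *     return client
+     *         .query("INSERT INTO users (id, username) VALUES ($1, $2)", [entity.id, entity.username])
+     *         .then(() => undefined);
+     * }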
+ * @returns {Promise} + */ + protected abstract saveEntity(entity: T, client: PoolClient): Promise; +} diff --git a/src/postgresql/classes/_index.ts b/src/postgresql/classes/_index.ts new file mode 100644 index 0000000..4ba7ce5 --- /dev/null +++ b/src/postgresql/classes/_index.ts @@ -0,0 +1,2 @@ +export * from "./PostgresBatchEntityReader"; +export * from "./PostgresBatchEntityWriter"; \ No newline at end of file diff --git a/src/postgresql/index.ts b/src/postgresql/index.ts new file mode 100644 index 0000000..e2a1bf3 --- /dev/null +++ b/src/postgresql/index.ts @@ -0,0 +1 @@ +export * from "./classes/_index"; \ No newline at end of file diff --git a/test/postgresql/classes/PostgresBatchEntityReader.test.ts b/test/postgresql/classes/PostgresBatchEntityReader.test.ts new file mode 100644 index 0000000..45d0ace --- /dev/null +++ b/test/postgresql/classes/PostgresBatchEntityReader.test.ts @@ -0,0 +1,127 @@ +import { UserBatchReader } from "../mocks/UserBatchReader"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +jest.mock("pg", () => { + if (process.env.CI === "true") { + console.info("Setup mocked PostgreSQL database"); + return { Pool: jest.fn(() => UserDatabase.mPool) }; + } + console.info("Setup real PostgreSQL database"); + return jest.requireActual("pg"); +}); + +describe("PostgresBatchEntityReader", () => { + const data = [ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]; + Object.freeze(data); + + beforeAll((done) => { + UserDatabase.teardown() + .then(() => done()) + .catch((error) => done(error)); + }); + + beforeEach((done) => { + UserDatabase.setup() + .then(() => done()) + .catch((error) => done(error)); + }); + + afterEach((done) => { + UserDatabase.teardown() + .then(() => done()) + .catch((error) => done(error)); + }); + + test("should read users in batches", (done) => { + UserDatabase.load(Object.assign([], data)).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + const result: UserDTO[] = []; + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("error", (error) => { + done(error); + }); + + reader.on("end", () => { + expect(result).toEqual(data); + done(); + }); + + reader.read(); + }); + }); + + test("should handle empty result", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + const result: UserDTO[] = []; + reader.on("data", (chunk) => { + result.push(chunk); + }); + + reader.on("error", (error) => { + done(error); + }); + + reader.on("end", () => { + expect(result).toEqual([]); + done(); + }); + + reader.read(); + }); + }); + + test("should destroy the connection properly", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + const spyCloseCursor = jest.spyOn(reader as any, "closeCursor"); + + reader.on("data", () => {}); + + reader.on("error", (error) => { + done(error); + }); + + reader.on("close", () => { + expect(reader["client"]).toBeNull(); + expect(reader["cursorName"]).toBeNull(); + expect(reader["cursorOpened"]).toBeFalsy(); + expect(spyCloseCursor).toHaveBeenCalled(); + done(); + }); + + reader.read(); + }); + }); + + test("should handle error on close", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + jest.spyOn(reader as any, "closeCursor").mockImplementation(() => { + return Promise.reject(new Error("Error on close")); + }); + + 
reader.on("data", () => {}); + + reader.on("error", (error) => { + expect(error).toBeInstanceOf(Error); + expect(error.message).toBe("Error on close"); + done(); + }); + + reader.read(); + }); + }); +}); diff --git a/test/postgresql/classes/PostgresBatchEntityWriter.test.ts b/test/postgresql/classes/PostgresBatchEntityWriter.test.ts new file mode 100644 index 0000000..c8f3a55 --- /dev/null +++ b/test/postgresql/classes/PostgresBatchEntityWriter.test.ts @@ -0,0 +1,71 @@ +import { UserBatchWriter } from "../mocks/UserBatchWriter"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +jest.mock("pg", () => { + if (process.env.CI === "true") { + console.info("Setup mocked PostgreSQL database"); + return { Pool: jest.fn(() => UserDatabase.mPool) }; + } + console.info("Setup real PostgreSQL database"); + return jest.requireActual("pg"); +}); + +describe("PostgresBatchEntityWriter", () => { + const data = [ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]; + Object.freeze(data); + + beforeAll((done) => { + UserDatabase.teardown() + .then(() => done()) + .catch((error) => done(error)); + }); + + beforeEach((done) => { + UserDatabase.setup() + .then(() => done()) + .catch((error) => done(error)); + }); + + afterEach((done) => { + UserDatabase.teardown() + .then(() => done()) + .catch((error) => done(error)); + }); + + test("should save entities in database", (done) => { + const writer = new UserBatchWriter({ + batchSize: data.length, + }); + + writer.once("finish", () => { + UserDatabase.fetch().then((results) => { + expect(results).toHaveLength(data.length); + expect(results).toEqual(data); + done(); + }); + }); + + data.forEach((user) => writer.write(user)); + writer.end(); + }); + + test("should rollback on error", (done) => { + const writer = new UserBatchWriter({ + batchSize: 3, + }); + + writer.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + done(); + }); + + writer.write(new UserDTO(1, "Alice")); + writer.write(new UserDTO(1, "DUPLICATED ID")); + writer.end(); + }); +}); diff --git a/test/postgresql/mocks/UserBatchReader.ts b/test/postgresql/mocks/UserBatchReader.ts new file mode 100644 index 0000000..e5ac182 --- /dev/null +++ b/test/postgresql/mocks/UserBatchReader.ts @@ -0,0 +1,16 @@ +import { PostgresBatchEntityReader } from "../../../src/postgresql"; +import { UserDatabase } from "./UserDatabase"; +import { UserDTO } from "./UserDTO"; + + export class UserBatchReader extends PostgresBatchEntityReader { + constructor(options:{batchSize:number,query?:string}) { + super({ + batchSize: options.batchSize, + pool: UserDatabase.getPool(), + query: options.query || "SELECT id, username FROM users" + }); + } + protected rowToEntity(row: unknown): UserDTO { + return row as UserDTO; + } + } \ No newline at end of file diff --git a/test/postgresql/mocks/UserBatchWriter.ts b/test/postgresql/mocks/UserBatchWriter.ts new file mode 100644 index 0000000..7704512 --- /dev/null +++ b/test/postgresql/mocks/UserBatchWriter.ts @@ -0,0 +1,19 @@ +import { PoolClient } from "pg"; +import { PostgresBatchEntityWriter } from "../../../src/postgresql"; +import { UserDTO } from "./UserDTO"; +import { UserDatabase } from "./UserDatabase"; + +export class UserBatchWriter extends PostgresBatchEntityWriter { + constructor(options: { batchSize: number }) { + super({ + batchSize: options.batchSize, + pool: UserDatabase.getPool(), + }); + } + protected saveEntity(entity: UserDTO, client: PoolClient): Promise { + 
return client.query({ + text: "INSERT INTO users (id, username) VALUES ($1, $2)", + values: [entity.id, entity.username], + }).then(() => {}); + } +} diff --git a/test/postgresql/mocks/UserDTO.ts b/test/postgresql/mocks/UserDTO.ts new file mode 100644 index 0000000..40ed3ec --- /dev/null +++ b/test/postgresql/mocks/UserDTO.ts @@ -0,0 +1,8 @@ +export class UserDTO { + id: number; + username: string; + constructor(id: number, username: string) { + this.id = id; + this.username = username; + } +} \ No newline at end of file diff --git a/test/postgresql/mocks/UserDatabase.ts b/test/postgresql/mocks/UserDatabase.ts new file mode 100644 index 0000000..98d50ec --- /dev/null +++ b/test/postgresql/mocks/UserDatabase.ts @@ -0,0 +1,183 @@ +import { ClientConfig, Pool } from "pg"; +import { UserDTO } from "./UserDTO"; + +export class UserDatabase { + private static data: UserDTO[] = []; + private static pool: Pool | null = null; + + static readonly mPoolClient = { + query: jest.fn(), + release: jest.fn(), + }; + + static readonly mPool = { + query: jest.fn(), + connect: jest.fn(() => Promise.resolve(UserDatabase.mPoolClient)), + end: jest.fn(), + }; + + private static getConnectionOptions(): ClientConfig { + if (process.env.CI !== "true") { + return { + user: process.env.POSTGRESQL_USER, + password: process.env.POSTGRESQL_PASSWORD, + host: process.env.POSTGRESQL_HOST, + port: parseInt(process.env.POSTGRESQL_PORT as string), + database: process.env.POSTGRESQL_DATABASE, + }; + } + return {}; + } + + static getPool(): Pool { + if (!UserDatabase.pool) { + UserDatabase.pool = new Pool(UserDatabase.getConnectionOptions()); + } + return UserDatabase.pool; + } + + static mockWriter() { + const idSet: Set = new Set(); + + UserDatabase.mPoolClient.query.mockImplementation( + (query: { text: string; values: any[] } | string) => { + if (typeof query === "string") { + return Promise.resolve({}); + } + + if (query.text === "INSERT INTO users (id, username) VALUES ($1, $2)") { + const id = query.values[0] as number; + const username = query.values[1] as string; + + if (idSet.has(id)) { + return Promise.reject(new Error("Duplicated ID")); + } + idSet.add(id); + + UserDatabase.data.push(new UserDTO(id, username)); + return Promise.resolve({}); + } + + return Promise.resolve({}); + } + ); + } + + static load(data: UserDTO[]): Promise { + if (process.env.CI !== "true") { + return UserDatabase.getPool() + .connect() + .then((client) => { + return client + .query("BEGIN") + .then(() => + Promise.all( + data + .map((user) => [user.id, user.username]) + .map((row) => + client.query( + `INSERT INTO users (id, username) VALUES ($1, $2)`, + row + ) + ) + ) + ) + .then(() => {client.query("COMMIT")}) + .finally(() => client.release()); + }); + } + + UserDatabase.mPoolClient.query.mockImplementation((query: any) => { + if (typeof query === "string") { + return Promise.resolve({ rows: [] }); + } + + const fetchMatch = query.text.match(/FETCH (\d+) FROM/); + if (fetchMatch) { + const fetchSize = parseInt(fetchMatch[1], 10); + return Promise.resolve({ rows: data.splice(0, fetchSize) }); + } + + return Promise.resolve({ rows: [] }); + }); + return Promise.resolve(); + } + + static async fetch(): Promise { + if (process.env.CI !== "true") { + return UserDatabase.getPool() + .connect() + .then((connection) => { + return connection + .query("SELECT * FROM users") + .finally(() => connection.release()); + }) + .then(({ rows }) => { + return rows.map((row: unknown) => { + return row as UserDTO; + }); + }); + } + return 
Promise.resolve(UserDatabase.data);
+    }
+
+    static async setup(): Promise<void> {
+        if (process.env.CI === "true") {
+            UserDatabase.data = [];
+            UserDatabase.mockWriter();
+            return Promise.resolve();
+        }
+
+        return UserDatabase.getPool()
+            .connect()
+            .then((connection) => {
+                return connection
+                    .query("BEGIN")
+                    .then(() =>
+                        connection.query(
+                            `CREATE TABLE IF NOT EXISTS users (
+                                id INTEGER PRIMARY KEY,
+                                username TEXT NOT NULL
+                            )`
+                        )
+                    )
+                    .then(() => connection.query(`DELETE FROM users`))
+                    .then(() => connection.query("COMMIT"))
+                    .catch(() => connection.query("ROLLBACK"))
+                    .then(() => undefined)
+                    .finally(() => connection.release());
+            });
+    }
+
+    static async teardown(): Promise<void> {
+        if (process.env.CI === "true") {
+            UserDatabase.data = [];
+            UserDatabase.pool = null;
+            return Promise.resolve();
+        }
+
+        return UserDatabase.getPool()
+            .connect()
+            .then((connection) => {
+                return connection
+                    .query("BEGIN")
+                    .then(() => connection.query(`DROP TABLE IF EXISTS users`))
+                    .then(() => connection.query("COMMIT"))
+                    .catch(() => connection.query("ROLLBACK"))
+                    .then(() => undefined)
+                    .finally(async () => {
+                        connection.release();
+                        await UserDatabase.pool?.end();
+                        UserDatabase.pool = null;
+                    });
+            });
+    }
+}

From 4d0719beecfb016ef13ba73107cbdc7756c4c3f7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?=
Date: Fri, 31 Jan 2025 22:15:23 +0100
Subject: [PATCH 3/8] feat: add MySQL

---
 docs/mysql-api.md                             | 124 +++++++++++++++
 src/mysql/classes/MysqlBatchEntityReader.ts   | 107 +++++++++++++
 src/mysql/classes/MysqlBatchEntityWriter.ts   |  70 +++++++++
 src/mysql/classes/_index.ts                   |   2 +
 src/mysql/index.ts                            |   1 +
 .../classes/MysqlBatchEntityReader.test.ts    | 110 +++++++++++++
 .../classes/MysqlBatchEntityWriter.test.ts    |  62 ++++++++
 test/mysql/mocks/UserBatchReader.ts           |  18 +++
 test/mysql/mocks/UserBatchWriter.ts           |  18 +++
 test/mysql/mocks/UserDTO.ts                   |  10 ++
 test/mysql/mocks/UserDatabase.ts              | 144 ++++++++++++++++++
 test/mysql/mocks/UserEntity.ts                |   1 +
 12 files changed, 667 insertions(+)
 create mode 100644 docs/mysql-api.md
 create mode 100644 src/mysql/classes/MysqlBatchEntityReader.ts
 create mode 100644 src/mysql/classes/MysqlBatchEntityWriter.ts
 create mode 100644 src/mysql/classes/_index.ts
 create mode 100644 src/mysql/index.ts
 create mode 100644 test/mysql/classes/MysqlBatchEntityReader.test.ts
 create mode 100644 test/mysql/classes/MysqlBatchEntityWriter.test.ts
 create mode 100644 test/mysql/mocks/UserBatchReader.ts
 create mode 100644 test/mysql/mocks/UserBatchWriter.ts
 create mode 100644 test/mysql/mocks/UserDTO.ts
 create mode 100644 test/mysql/mocks/UserDatabase.ts
 create mode 100644 test/mysql/mocks/UserEntity.ts

diff --git a/docs/mysql-api.md b/docs/mysql-api.md
new file mode 100644
index 0000000..ba96575
--- /dev/null
+++ b/docs/mysql-api.md
@@ -0,0 +1,124 @@
+# MySQL API
+
+In this documentation, we will focus on the MySQL API. This module builds on the BatchJS common interface and allows you to read and write data in batches in MySQL databases.
+
+------------
+
+
+
+## Table of Contents
+
+ - [MysqlBatchEntityReader](#mysqlbatchentityreader)
+ - [MysqlBatchEntityWriter](#mysqlbatchentitywriter)
+
+## MysqlBatchEntityReader
+
+`extends AbstractBatchEntityReaderStream`
+
+Class that reads data in batches of a specified size from a MySQL database.
+
+
+
+ ### Constructor
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **options** | The options for the MysqlBatchEntityReader. | MysqlBatchEntityReaderOptions |
+ | **options.pool** | The MySQL connection pool. | Pool |
+ | **options.query** | SQL query to be executed (without LIMIT and OFFSET). | string |
+
+
+
+### fetch (function)
+
+`protected`
+
+Fetches a batch of data from the database.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **size** | The size of the batch to fetch. | number |
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<BatchData.<T>> | A promise that resolves with the batch of data. |
+
+
+### _destroy (function)
+
+
+
+Destroys the reader by closing the database connection. This method should be called when the
+reader is no longer needed to free up resources.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **error** | The error that caused the destruction. | Error, null |
+ | **callback** | The callback function to be executed after destroying the reader. | ReadCallback |
+
+
+### connectDatabase (function)
+
+`private`
+
+Connects to the database by creating a new database connection if none
+already exists, or by reusing an existing connection.
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<PoolConnection> | A promise that resolves with the database connection. |
+
+
+### disconnectDatabase (function)
+
+`private`
+
+Disconnects from the database by releasing the active database connection
+and setting the client reference to null. This method should be called
+when the reader is no longer needed to free up resources.
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<void> | A promise that resolves when the database connection
+is successfully released. |
+
+
+## MysqlBatchEntityWriter
+
+`extends AbstractBatchEntityWriterStream`
+
+Class that writes data in batches of a specified size into a MySQL database.
+
+
+
+ ### Constructor
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **options** | The options for the MysqlBatchEntityWriter. | MysqlBatchEntityWriterOptions |
+ | **options.pool** | The MySQL connection pool. | Pool |
+ | **options.prepareStatement** | Insert SQL prepared statement to be executed. | String |
+
+
+
+### batchWrite (function)
+
+`protected`
+
+Writes a batch of data to the storage.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **chunk** | The batch of data to write to the storage. | BatchData.<T> |
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<void> | A promise that resolves when the batch is successfully written.
+The promise should be rejected if there is an error during writing.
| + + diff --git a/src/mysql/classes/MysqlBatchEntityReader.ts b/src/mysql/classes/MysqlBatchEntityReader.ts new file mode 100644 index 0000000..6def77a --- /dev/null +++ b/src/mysql/classes/MysqlBatchEntityReader.ts @@ -0,0 +1,107 @@ +import { Pool, PoolConnection, RowDataPacket } from "mysql2/promise"; +import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; +import { BatchData, ReadCallback } from "batchjs/streams"; + +/** + * @interface + * Options for the MysqlBatchEntityReader. + * @extends AbstractBatchEntityReaderStreamOptions + */ +export interface MysqlBatchEntityReaderOptions extends AbstractBatchEntityReaderStreamOptions { + pool: Pool; + query: string; +} + +/** + * @class + * Class that reads data in batches of a specified size from a MySQL database. + * @extends AbstractBatchEntityReaderStream + * @template T input chunk + * @template E row entity + */ +export abstract class MysqlBatchEntityReader extends AbstractBatchEntityReaderStream { + private readonly pool: Pool; + private readonly query: string; + private client: PoolConnection | null = null; + private entitiesRead: number = 0; + + /** + * @constructor + * @param {MysqlBatchEntityReaderOptions} options - The options for the MysqlBatchEntityReader. + * @param [options.pool] {Pool} - The MySQL connection pool. + * @param [options.query] {string} - SQL query to be executed (without LIMIT and OFFSET). + */ + constructor(options: MysqlBatchEntityReaderOptions) { + super(options); + this.pool = options.pool; + this.query = options.query; + } + + /** + * Fetches a batch of data from the database. + * + * @private + * @param {number} size - The size of the batch to fetch. + * @returns {Promise>} A promise that resolves with the batch of data. + */ + protected async fetch(size: number): Promise> { + return this.connectDatabase() + .then((client) => client.query(`${this.query} LIMIT ${size} OFFSET ${this.entitiesRead}`)) + .then(([results]) => { + this.entitiesRead += size; + return (results as E[]).map(this.rowToEntity); + }); + } + + + /** + * Destroys the writer by closing the database connection. This method should be called when the + * writer is no longer needed to free up resources. + * @see AbstractBatchEntityReaderStream._destroy + * @private + * @param error {Error|null} - The error that caused the destruction. + * @param callback {ReadCallback} - The callback function to be executed after destroying the reader. + */ + _destroy(error: Error | null, callback: ReadCallback): void { + let destroyError = error; + this.disconnectDatabase() + .catch((err) => (destroyError = err)) + .finally(() => super._destroy(destroyError, callback)); + } + + /** + * Connects to the database by creating a new database connection if none + * already exists, or by reusing an existing connection. + * @private + * @returns {Promise} A promise that resolves with the database connection. + */ + private async connectDatabase(): Promise { + if (!this.client) { + this.client = await this.pool.getConnection(); + } + return this.client; + } + + /** + * Disconnects from the database by releasing the active database connection + * and setting the client reference to null. This method should be called + * when the reader is no longer needed to free up resources. + * @returns {Promise} A promise that resolves when the database connection + * is successfully released. 
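+     *
+     * Note: release() only returns the connection to the pool; the pool itself
+     * stays open, and the owner of the pool remains responsible for calling
+     * pool.end() at shutdown (as the test teardown in this patch does).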
+ */ + private disconnectDatabase(): Promise { + if (this.client) { + this.client.release(); + this.client = null; + } + return Promise.resolve(); + } + + /** + * Abstract method to convert a row to an entity. + * @abstract + * @param row E - The row to be converted to an entity. + * @returns T - The converted entity. + */ + protected abstract rowToEntity(row: E): T; +} \ No newline at end of file diff --git a/src/mysql/classes/MysqlBatchEntityWriter.ts b/src/mysql/classes/MysqlBatchEntityWriter.ts new file mode 100644 index 0000000..e855527 --- /dev/null +++ b/src/mysql/classes/MysqlBatchEntityWriter.ts @@ -0,0 +1,70 @@ +import { Pool, PoolConnection } from "mysql2/promise"; +import { BatchData } from "batchjs/streams"; +import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common"; + +/** + * @interface + * Options for the MysqlBatchEntityWriter. + * @extends AbstractBatchEntityWriterStreamOptions + */ +export interface MysqlBatchEntityWriterOptions extends AbstractBatchEntityWriterStreamOptions { + pool: Pool; + prepareStatement: string; +} + +/** + * @class + * Class that writes data in batches of a specified size into a MySQL database. + * @extends AbstractBatchEntityWriterStream + * @template T input chunk + * @template E row entity + */ +export abstract class MysqlBatchEntityWriter extends AbstractBatchEntityWriterStream { + private readonly pool: Pool; + private readonly prepareStatement: string; + + /** + * @constructor + * @param {MysqlBatchEntityWriterOptions} options - The options for the MysqlBatchEntityWriter. + * @param [options.pool] {Pool} - The MySQL connection pool. + * @param [options.prepareStatement] {String} - Insert SQL prepared statement to be executed. + */ + constructor(options: MysqlBatchEntityWriterOptions) { + super(options); + this.pool = options.pool; + this.prepareStatement = options.prepareStatement; + } + + /** + * Writes a batch of data to the storage. + * + * @protected + * @param {BatchData} chunk - The batch of data to write to the storage. + * @returns {Promise} A promise that resolves when the batch is successfully written. + * The promise should be rejected if there is an error during writing. + */ + protected async batchWrite(chunk: BatchData): Promise { + const client: PoolConnection = await this.pool.getConnection(); + try { + await client.beginTransaction(); + await Promise.all(chunk.map((entity) => client.query(this.prepareStatement, this.entityToRow(entity)))); + await client.commit(); + } catch (error) { + await client.rollback(); + throw error; + } finally { + client.release(); + } + } + + /** + * Abstract method to convert an entity to a row. + * This method should be implemented by subclasses to define the specific logic for writing a batch of data. + * + * @abstract + * @protected + * @param {T} entity - The entity to be converted to a row. + * @returns {E} The row to be inserted or updated. 
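+     * @example
+     * // A minimal sketch; UserDTO and UserEntity mirror the test mocks in this patch:
+     * protected entityToRow(entity: UserDTO): UserEntity {
+     *     return [entity.id, entity.username];
+     * }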
+ */ + protected abstract entityToRow(entity: T): E; +} diff --git a/src/mysql/classes/_index.ts b/src/mysql/classes/_index.ts new file mode 100644 index 0000000..39db608 --- /dev/null +++ b/src/mysql/classes/_index.ts @@ -0,0 +1,2 @@ +export * from "./MysqlBatchEntityReader"; +export * from "./MysqlBatchEntityWriter"; \ No newline at end of file diff --git a/src/mysql/index.ts b/src/mysql/index.ts new file mode 100644 index 0000000..e2a1bf3 --- /dev/null +++ b/src/mysql/index.ts @@ -0,0 +1 @@ +export * from "./classes/_index"; \ No newline at end of file diff --git a/test/mysql/classes/MysqlBatchEntityReader.test.ts b/test/mysql/classes/MysqlBatchEntityReader.test.ts new file mode 100644 index 0000000..796ec97 --- /dev/null +++ b/test/mysql/classes/MysqlBatchEntityReader.test.ts @@ -0,0 +1,110 @@ +import { UserBatchReader } from "../mocks/UserBatchReader"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +jest.mock("mysql2/promise", () => { + if (process.env.CI === "true") { + console.info("Setup mocked MySQL database"); + return { createPool: jest.fn(() => UserDatabase.mPool) }; + } + console.info("Setup real MySQL database"); + return jest.requireActual("mysql2/promise"); +}); + +describe("MysqlBatchEntityReader", () => { + const data = [ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]; + Object.freeze(data); + + beforeAll((done) => { + UserDatabase.teardown().then(() => done()).catch(done); + }); + + beforeEach((done) => { + UserDatabase.setup().then(() => done()).catch(done); + }); + + afterEach((done) => { + UserDatabase.teardown().then(() => done()).catch(done); + }); + + test("should read users in batches", (done) => { + UserDatabase.load([...data]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + const result: UserDTO[] = []; + + reader.on("data", (chunk) => result.push(chunk)); + reader.on("error", done); + reader.on("end", () => { + expect(result).toEqual(data); + done(); + }); + + reader.read(); + }); + }); + + test("should handle empty result", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + const result: UserDTO[] = []; + + reader.on("data", (chunk) => result.push(chunk)); + reader.on("error", done); + reader.on("end", () => { + expect(result).toEqual([]); + done(); + }); + + reader.read(); + }); + }); + + test("should destroy the connection properly", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + const spyDisconnectDatabase = jest.spyOn( + reader as any, + "disconnectDatabase" + ); + + reader.on("data", () => {}); + + reader.on("error", (error) => { + done(error); + }); + + reader.on("close", () => { + expect(reader["client"]).toBeNull(); + expect(spyDisconnectDatabase).toHaveBeenCalled(); + done(); + }); + + reader.read(); + }); + }); + + test("should handle error on close", (done) => { + UserDatabase.load([]).then(() => { + const reader = new UserBatchReader({ batchSize: 2 }); + + jest.spyOn(reader as any, "disconnectDatabase").mockImplementation(() => { + return Promise.reject(new Error("Error on close")); + }); + + reader.on("data", () => {}); + + reader.on("error", (error) => { + expect(error).toBeInstanceOf(Error); + expect(error.message).toBe("Error on close"); + done(); + }); + + reader.read(); + }); + }); +}); \ No newline at end of file diff --git a/test/mysql/classes/MysqlBatchEntityWriter.test.ts 
b/test/mysql/classes/MysqlBatchEntityWriter.test.ts new file mode 100644 index 0000000..5e86692 --- /dev/null +++ b/test/mysql/classes/MysqlBatchEntityWriter.test.ts @@ -0,0 +1,62 @@ +import { UserBatchWriter } from "../mocks/UserBatchWriter"; +import { UserDTO } from "../mocks/UserDTO"; +import { UserDatabase } from "../mocks/UserDatabase"; + +jest.mock("mysql2/promise", () => { + if (process.env.CI === "true") { + console.info("Setup mocked MySQL database"); + return { createPool: jest.fn(() => UserDatabase.mPool) }; + } + console.info("Setup real MySQL database"); + return jest.requireActual("mysql2/promise"); +}); + +describe("MysqlBatchEntityWriter", () => { + const data = [ + { id: 1, username: "Alice" }, + { id: 2, username: "Bob" }, + { id: 3, username: "Charlie" }, + ]; + Object.freeze(data); + + beforeAll((done) => { + UserDatabase.teardown().then(() => done()).catch(done); + }); + + beforeEach((done) => { + UserDatabase.setup().then(() => done()).catch(done); + }); + + afterEach((done) => { + UserDatabase.teardown().then(() => done()).catch(done); + }); + + test("should save entities in database", (done) => { + const writer = new UserBatchWriter({ batchSize: data.length }); + + writer.on("error", done); + writer.once("finish", () => { + UserDatabase.fetch().then((results) => { + expect(results).toHaveLength(data.length); + expect(results).toEqual(data); + done(); + }); + }); + + data.forEach((user) => writer.write(user)); + writer.end(); + }); + + test("should rollback on error", (done) => { + const writer = new UserBatchWriter({ batchSize: 3 }); + + writer.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + done(); + }); + + writer.write(new UserDTO(1, "Alice")); + writer.write(new UserDTO(1, "DUPLICATED ID")); + writer.end(); + }); +}); \ No newline at end of file diff --git a/test/mysql/mocks/UserBatchReader.ts b/test/mysql/mocks/UserBatchReader.ts new file mode 100644 index 0000000..14be5fe --- /dev/null +++ b/test/mysql/mocks/UserBatchReader.ts @@ -0,0 +1,18 @@ +import { RowDataPacket } from "mysql2"; +import { MysqlBatchEntityReader } from "../../../src/mysql"; +import { UserDTO } from "./UserDTO"; +import { UserDatabase } from "./UserDatabase"; + +export class UserBatchReader extends MysqlBatchEntityReader { + constructor(options: { batchSize: number; query?: string }) { + super({ + batchSize: options.batchSize, + pool: UserDatabase.getPool(), + query: options.query || "SELECT id, username FROM users", + }); + } + + protected rowToEntity(row: UserDTO): UserDTO { + return row; + } +} diff --git a/test/mysql/mocks/UserBatchWriter.ts b/test/mysql/mocks/UserBatchWriter.ts new file mode 100644 index 0000000..e3f6dfc --- /dev/null +++ b/test/mysql/mocks/UserBatchWriter.ts @@ -0,0 +1,18 @@ +import { MysqlBatchEntityWriter } from "../../../src/mysql"; +import { UserDTO } from "./UserDTO"; +import { UserDatabase } from "./UserDatabase"; +import { UserEntity } from "./UserEntity"; + +export class UserBatchWriter extends MysqlBatchEntityWriter { + constructor(options: { batchSize: number }) { + super({ + batchSize: options.batchSize, + pool: UserDatabase.getPool(), + prepareStatement: "INSERT INTO users (id, username) VALUES (?, ?)", + }); + } + + protected entityToRow(entity: UserDTO): UserEntity { + return [entity.id, entity.username]; + } +} diff --git a/test/mysql/mocks/UserDTO.ts b/test/mysql/mocks/UserDTO.ts new file mode 100644 index 0000000..749da1b --- /dev/null +++ b/test/mysql/mocks/UserDTO.ts @@ -0,0 +1,10 @@ +export class UserDTO { + id: number; + username: 
string; + + constructor(id: number, username: string) { + this.id = id; + this.username = username; + } + } + \ No newline at end of file diff --git a/test/mysql/mocks/UserDatabase.ts b/test/mysql/mocks/UserDatabase.ts new file mode 100644 index 0000000..29ae806 --- /dev/null +++ b/test/mysql/mocks/UserDatabase.ts @@ -0,0 +1,144 @@ +import { Pool, createPool } from "mysql2/promise"; +import { UserDTO } from "./UserDTO"; + +export class UserDatabase { + private static data: UserDTO[] = []; + private static pool: Pool | null = null; + + static readonly mPoolClient = { + query: jest.fn, [string, any[]]>(), + beginTransaction: jest.fn(), + commit: jest.fn(), + rollback: jest.fn(), + release: jest.fn(), + }; + + static readonly mPool = { + getConnection: jest.fn(() => Promise.resolve(UserDatabase.mPoolClient)), + }; + + private static getConnectionOptions() { + if (process.env.CI !== "true") { + return { + user: process.env.MYSQL_USER, + password: process.env.MYSQL_PASSWORD, + host: process.env.MYSQL_HOST, + port: parseInt(process.env.MYSQL_PORT as string), + database: process.env.MYSQL_DATABASE, + waitForConnections: true, + connectionLimit: 10, + }; + } + return {}; + } + + static getPool(): Pool { + if (!UserDatabase.pool) { + UserDatabase.pool = createPool(UserDatabase.getConnectionOptions()); + } + return UserDatabase.pool; + } + + static mockQuery() { + const idSet: Set = new Set(); + UserDatabase.mPoolClient.query.mockImplementation((sql: string, values: any[]) => { + if (sql.startsWith("INSERT")) { + const [id, username] = values + if (idSet.has(id)) { + return Promise.reject(new Error("Duplicated ID")); + } + idSet.add(id); + UserDatabase.data.push(new UserDTO(id, username)); + return Promise.resolve(); + } + if (sql.startsWith("SELECT")) { + const fetchMatch = sql.match(/LIMIT (\d+)/); + if(fetchMatch) { + const fetchSize = parseInt(fetchMatch[1], 10); + return Promise.resolve( [UserDatabase.data.splice(0, fetchSize)]); + } + } + return Promise.reject(new Error("Unknown query")); + }); + } + + static async load(data: UserDTO[]): Promise { + if (process.env.CI !== "true") { + if (data.length === 0) return; + const pool = UserDatabase.getPool(); + const connection = await pool.getConnection(); + + try { + await connection.beginTransaction(); + await connection.query("INSERT INTO users (id, username) VALUES ?", [ + data.map((user) => [user.id, user.username]), + ]); + await connection.commit(); + } catch (error) { + await connection.rollback(); + throw error; + } finally { + connection.release(); + } + } else { + UserDatabase.data = Object.assign([], data); + } + } + + static async fetch(): Promise { + if (process.env.CI !== "true") { + const pool = UserDatabase.getPool(); + const connection = await pool.getConnection(); + try { + const [rows] = await connection.query("SELECT * FROM users"); + return (rows as any[]).map((row) => new UserDTO(row.id, row.username)); + } finally { + connection.release(); + } + } + return Promise.resolve(UserDatabase.data); + } + + static async setup(): Promise { + if (process.env.CI === "true") { + UserDatabase.data = []; + UserDatabase.mockQuery(); + return; + } + + const pool = UserDatabase.getPool(); + const connection = await pool.getConnection(); + try { + await connection.beginTransaction(); + await connection.query(` + CREATE TABLE IF NOT EXISTS users ( + id INT PRIMARY KEY, + username VARCHAR(255) NOT NULL + )`); + await connection.query("DELETE FROM users"); + await connection.commit(); + } finally { + connection.release(); + } + } + + static async 
teardown(): Promise<void> {
+        if (process.env.CI === "true") {
+            UserDatabase.data = [];
+            UserDatabase.pool = null;
+            return;
+        }
+
+        const pool = UserDatabase.getPool();
+        const connection = await pool.getConnection();
+        try {
+            await connection.beginTransaction();
+            await connection.query("DROP TABLE IF EXISTS users");
+            await connection.commit();
+        } finally {
+            connection.release();
+            await pool.end();
+            UserDatabase.pool = null;
+        }
+    }
+}

diff --git a/test/mysql/mocks/UserEntity.ts b/test/mysql/mocks/UserEntity.ts
new file mode 100644
index 0000000..4267a4d
--- /dev/null
+++ b/test/mysql/mocks/UserEntity.ts
@@ -0,0 +1 @@
+export type UserEntity = Array<number | string>;

From c637637d1ed9f09021e9e5f0d08c3c92e8265dea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?=
Date: Fri, 31 Jan 2025 22:15:34 +0100
Subject: [PATCH 4/8] feat: add MariaDB

---
 docs/mariadb-api.md                           | 150 +++++++++++++++++
 .../classes/MariadbBatchEntityReader.ts       | 142 ++++++++++++++++
 .../classes/MariadbBatchEntityWriter.ts       |  69 ++++++++
 src/mariadb/classes/_index.ts                 |   2 +
 src/mariadb/index.ts                          |   1 +
 .../classes/MariadbBatchEntityReader.test.ts  | 129 ++++++++++++++
 .../classes/MariadbBatchEntityWriter.test.ts  |  75 +++++++++
 test/mariadb/mocks/UserBatchReader.ts         |  16 ++
 test/mariadb/mocks/UserBatchWriter.ts         |  18 ++
 test/mariadb/mocks/UserDTO.ts                 |   8 +
 test/mariadb/mocks/UserDatabase.ts            | 159 ++++++++++++++++++
 test/mariadb/mocks/UserEntity.ts              |   1 +
 12 files changed, 770 insertions(+)
 create mode 100644 docs/mariadb-api.md
 create mode 100644 src/mariadb/classes/MariadbBatchEntityReader.ts
 create mode 100644 src/mariadb/classes/MariadbBatchEntityWriter.ts
 create mode 100644 src/mariadb/classes/_index.ts
 create mode 100644 src/mariadb/index.ts
 create mode 100644 test/mariadb/classes/MariadbBatchEntityReader.test.ts
 create mode 100644 test/mariadb/classes/MariadbBatchEntityWriter.test.ts
 create mode 100644 test/mariadb/mocks/UserBatchReader.ts
 create mode 100644 test/mariadb/mocks/UserBatchWriter.ts
 create mode 100644 test/mariadb/mocks/UserDTO.ts
 create mode 100644 test/mariadb/mocks/UserDatabase.ts
 create mode 100644 test/mariadb/mocks/UserEntity.ts

diff --git a/docs/mariadb-api.md b/docs/mariadb-api.md
new file mode 100644
index 0000000..7c60a2f
--- /dev/null
+++ b/docs/mariadb-api.md
@@ -0,0 +1,150 @@
+# MariaDB API
+
+In this documentation, we will focus on the MariaDB API. This module builds on the BatchJS common interface and allows you to read and write data in batches in MariaDB databases.
+
+------------
+
+
+
+## Table of Contents
+
+ - [MariadbBatchEntityReader](#mariadbbatchentityreader)
+ - [MariadbBatchEntityWriter](#mariadbbatchentitywriter)
+
+## MariadbBatchEntityReader
+
+`extends AbstractBatchEntityReaderStream`
+
+Class that reads data in batches of a specified size from MariaDB databases.
+
+
+
+ ### Constructor
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **options** | The options for the MariadbBatchEntityReader. | MariadbBatchEntityReaderOptions |
+ | **options.pool** | The MariaDB connection pool. | Pool |
+ | **options.query** | SQL query to be executed (without LIMIT and OFFSET). | string |
+
+
+
+### fetch (function)
+
+`protected`
+
+Fetches a batch of data from the database.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **size** | The size of the batch to fetch. | number |
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<BatchData.<T>> | A promise that resolves with the batch of data. |
+
+
+### _destroy (function)
+
+
+
+Destroys the reader by finalizing the statement used to read entities and
+closing the database connection. This method should be called when the
+reader is no longer needed to free up resources.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **error** | The error that caused the destruction. | Error, null |
+ | **callback** | The callback function to be executed after destroying the reader. | ReadCallback |
+
+
+### connectDatabase (function)
+
+`private`
+
+Connects to the database by creating a new database connection if none
+already exists, or by reusing an existing connection.
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<PoolConnection> | A promise that resolves with the database connection. |
+
+
+### disconnectDatabase (function)
+
+`private`
+
+Disconnects from the database by closing the active database connection
+and setting the connection reference to null. This method should be called
+when the reader is no longer needed to free up resources.
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<void> | A promise that resolves when the database connection
+is successfully closed. |
+
+
+### prepareStatement (function)
+
+`private`
+
+Prepares a statement for fetching entities. If the statement has already been
+prepared, it is reused.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **client** | The database connection. | PoolConnection |
+
+ #### Returns
+ | Type | Description |
+ |------------|-----------------------------------------|
+ | Promise.<Prepare> | The prepared statement. |
+
+
+### finalizeStatement (function)
+
+`private`
+
+Finalizes the statement used to fetch entities. This method should be
+called when the reader is no longer needed to free up resources.
+
+
+## MariadbBatchEntityWriter
+
+`extends AbstractBatchEntityWriterStream`
+
+Class that writes data in batches of a specified size in MariaDB databases.
+
+
+
+ ### Constructor
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **options** | The options for the MariadbBatchEntityWriter. | MariadbBatchEntityWriterOptions |
+ | **options.pool** | The MariaDB connection pool. | Pool |
+ | **options.prepareStatement** | Insert SQL prepared statement to be executed. | String |
+
+
+
+### batchWrite (function)
+
+`protected`
+
+Writes a batch of data to the storage.
+
+ #### Parameters
+ | Name | Description | Type |
+ |------------|-----------------------------------------|------------------------------|
+ | **chunk** | The batch of data to write to the storage.
| BatchData.<T> | + + #### Returns + | Type | Description | + |------------|-----------------------------------------| + | Promise.<void> | | + + diff --git a/src/mariadb/classes/MariadbBatchEntityReader.ts b/src/mariadb/classes/MariadbBatchEntityReader.ts new file mode 100644 index 0000000..10e4f7c --- /dev/null +++ b/src/mariadb/classes/MariadbBatchEntityReader.ts @@ -0,0 +1,142 @@ +import { Pool, PoolConnection, Prepare } from "mariadb"; +import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; +import { BatchData, ReadCallback } from "batchjs/streams"; + +/** + * @interface + * Options for the MariadbBatchEntityReader. + * @extends AbstractBatchEntityReaderStreamOptions + */ +export interface MariadbBatchEntityReaderOptions extends AbstractBatchEntityReaderStreamOptions { + pool: Pool; + query: string; +} + +/** + * @class + * Class that read data in batches of a specified size in Mariadb databases. + * @extends AbstractBatchEntityReaderStream + * @template T input chunk + * @template E row entity + */ +export abstract class MariadbBatchEntityReader extends AbstractBatchEntityReaderStream { + private readonly pool: Pool; + private readonly query: string; + private client: PoolConnection | null = null; + private fetchEntityStatement: Prepare | null = null; + private entitiesRead : number = 0; + + /** + * @constructor + * @param {MariadbBatchEntityReaderOptions} options - The options for the MariadbBatchEntityReader. + * @param [options.pool] {Pool} - The MariadbQL connection pool. + * @param [options.query] {string} - SQL query to be executed (without LIMIT and OFFSET). + */ + constructor(options: MariadbBatchEntityReaderOptions) { + super(options); + this.pool = options.pool; + this.query = options.query; + } + + /** + * Fetches a batch of data from the database. + * + * @private + * @param {number} size - The size of the batch to fetch. + * @returns {Promise>} A promise that resolves with the batch of data. + */ + protected async fetch(size:number): Promise> { + return this.connectDatabase() + .then((client)=>this.prepareStatement(client)) + .then((statement) => statement.execute([size,this.entitiesRead ])) + .then((results: E[]) => { + this.entitiesRead += size; + return results.map(this.rowToEntity); + }); + } + + /** + * Destroys the writer by finalizing the statement used to read entities and + * closing the database connection. This method should be called when the + * writer is no longer needed to free up resources. + * @see AbstractBatchEntityReaderStream._destroy + * @private + * @param error {Error|null} - The error that caused the destruction. + * @param callback {ReadCallback} - The callback function to be executed after destroying the reader. + */ + _destroy(error: Error | null, callback: ReadCallback):void { + let destroyError = error; + + this.finalizeStatement() + .then(()=>this.disconnectDatabase()) + .catch((error) => {destroyError = error;}) + .finally(() => super._destroy(destroyError, callback)); + } + + + + /** + * Connects to the database by creating a new database connection if none + * already exists, or by reusing an existing connection. + * @private + * @returns {Promise} A promise that resolves with the database connection. + */ + private async connectDatabase(): Promise { + if (!this.client) { + this.client = await this.pool.getConnection(); + } + return this.client; + } + + /** + * Disconnects from the database by closing the active database connection + * and setting the connection reference to null. 
This method should be called + * when the reader is no longer needed to free up resources. + * @returns {Promise} A promise that resolves when the database connection + * is successfully closed. + */ + private async disconnectDatabase(): Promise { + if (this.client) { + await this.client.release(); + this.client = null; + } + } + + /** + * Prepares a statement for fetching entities. If the statement has already been + * prepared, it is reused. + * @private + * @param {Mariadb.Database} db - The database connection. + * @returns {Promise} The prepared statement. + */ + private async prepareStatement(client: PoolConnection): Promise { + if (!this.fetchEntityStatement) { + this.fetchEntityStatement = await client.prepare( `${this.query} LIMIT ? OFFSET ?`); + } + return this.fetchEntityStatement; + } + + /** + * Finalizes the statement used to fetch entities. This method should be + * called when the reader is no longer needed to free up resources. + * + * @private + */ + private async finalizeStatement(): Promise { + if (this.fetchEntityStatement) { + this.fetchEntityStatement.close(); + this.fetchEntityStatement = null; + } + } + + + /** + * Abstract method to convert a row to an entity. This method should be implemented + * by subclasses to define the specific logic for reading a batch of data. + * + * @abstract + * @protected + * @param row {E} - The row to be converted to an entity. + */ + protected abstract rowToEntity(row: E): T +} \ No newline at end of file diff --git a/src/mariadb/classes/MariadbBatchEntityWriter.ts b/src/mariadb/classes/MariadbBatchEntityWriter.ts new file mode 100644 index 0000000..938e8a2 --- /dev/null +++ b/src/mariadb/classes/MariadbBatchEntityWriter.ts @@ -0,0 +1,69 @@ +import {Pool, PoolConnection} from "mariadb"; +import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; +import { BatchData } from "batchjs/streams"; + +/** + * @interface + * Options for the MariadbBatchEntityWriter. + * @extends AbstractBatchEntityWriterStreamOptions + */ +export interface MariadbBatchEntityWriterOptions extends AbstractBatchEntityWriterStreamOptions { + pool: Pool; + prepareStatement: string; +} + +/** + * @class + * Class that writes data in batches of a specified size in MariadbQL databases. + * @extends AbstractBatchEntityWriterStream + * @template T input chunk + * @template E row entity + */ +export abstract class MariadbBatchEntityWriter extends AbstractBatchEntityWriterStream { + private readonly pool: Pool; + private readonly prepareStatement: string; + + /** + * @constructor + * @param {MariadbBatchEntityWriterOptions} options - The options for the MariadbBatchEntityWriter. + * @param [options.pool] {Pool} - The MariadbQL connection pool. + * @param [options.prepareStatement] {String} - Insert SQL prepared statement to be executed. + */ + constructor(options: MariadbBatchEntityWriterOptions) { + super(options); + this.pool = options.pool; + this.prepareStatement = options.prepareStatement; + } + + /** + * Writes a batch of data to the storage. + * + * @protected + * @param {BatchData} chunk - The batch of data to write to the storage. 
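+     *
+     * Note: client.batch() executes the prepared statement once per mapped row
+     * inside the surrounding transaction, so a single failing row rolls back
+     * the whole batch.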
+     * @returns {Promise<void>} A promise that resolves when the batch has been written.
+     */
+    protected async batchWrite(chunk: BatchData<T>): Promise<void> {
+        const client: PoolConnection = await this.pool.getConnection();
+        try {
+            await client.beginTransaction();
+            await client.batch(this.prepareStatement, chunk.map((entity) => this.entityToRow(entity)));
+            await client.commit();
+        } catch (error) {
+            await client.rollback();
+            return Promise.reject(error as Error);
+        } finally {
+            client.release();
+        }
+    }
+
+    /**
+     * Abstract method to convert an entity to a row.
+     * This method should be implemented by subclasses to define the specific logic for converting an entity to a row.
+     *
+     * @abstract
+     * @protected
+     * @param {T} entity - The entity to be converted to a row.
+     * @returns {E} The row to be inserted or updated.
+     */
+    protected abstract entityToRow(entity: T): E;
+}
diff --git a/src/mariadb/classes/_index.ts b/src/mariadb/classes/_index.ts
new file mode 100644
index 0000000..6cec29e
--- /dev/null
+++ b/src/mariadb/classes/_index.ts
@@ -0,0 +1,2 @@
+export * from "./MariadbBatchEntityReader";
+export * from "./MariadbBatchEntityWriter";
\ No newline at end of file
diff --git a/src/mariadb/index.ts b/src/mariadb/index.ts
new file mode 100644
index 0000000..e2a1bf3
--- /dev/null
+++ b/src/mariadb/index.ts
@@ -0,0 +1 @@
+export * from "./classes/_index";
\ No newline at end of file
diff --git a/test/mariadb/classes/MariadbBatchEntityReader.test.ts b/test/mariadb/classes/MariadbBatchEntityReader.test.ts
new file mode 100644
index 0000000..a93f78c
--- /dev/null
+++ b/test/mariadb/classes/MariadbBatchEntityReader.test.ts
@@ -0,0 +1,129 @@
+import { UserBatchReader } from "../mocks/UserBatchReader";
+import { UserDTO } from "../mocks/UserDTO";
+import { UserDatabase } from "../mocks/UserDatabase";
+
+jest.mock("mariadb", () => {
+    if (process.env.CI === "true") {
+        console.info("Setting up mocked MariaDB database");
+        return { createPool: jest.fn(() => UserDatabase.mPool) };
+    }
+    console.info("Setting up real MariaDB database");
+    return jest.requireActual("mariadb");
+});
+
+describe("MariadbBatchEntityReader", () => {
+    const data = [
+        { id: 1, username: "Alice" },
+        { id: 2, username: "Bob" },
+        { id: 3, username: "Charlie" },
+    ];
+    Object.freeze(data);
+
+    beforeAll((done) => {
+        UserDatabase.teardown()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    beforeEach((done) => {
+        UserDatabase.setup()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    afterEach((done) => {
+        UserDatabase.teardown()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    test("should read users in batches", (done) => {
+        UserDatabase.load(Object.assign([], data)).then(() => {
+            const reader = new UserBatchReader({ batchSize: 2 });
+
+            const result: UserDTO[] = [];
+            reader.on("data", (chunk) => {
+                result.push(chunk);
+            });
+
+            reader.on("error", (error) => {
+                done(error);
+            });
+
+            reader.on("end", () => {
+                expect(result).toEqual(data);
+                done();
+            });
+
+            reader.read();
+        });
+    });
+
+    test("should handle empty result", (done) => {
+        UserDatabase.load([]).then(() => {
+            const reader = new UserBatchReader({ batchSize: 2 });
+
+            const result: UserDTO[] = [];
+            reader.on("data", (chunk) => {
+                result.push(chunk);
+            });
+
+            reader.on("error", (error) => {
+                done(error);
+            });
+
+            reader.on("end", () => {
+                expect(result).toEqual([]);
+                done();
+            });
+
+            reader.read();
+        });
+    });
+
+    test("should destroy the connection properly", (done) => {
+        UserDatabase.load([]).then(() => {
+            const reader = new UserBatchReader({ batchSize: 2 });
+
+            const spyFinalizeStatement = jest.spyOn(
+                reader as any,
+                "finalizeStatement"
+            );
+
+            reader.on("data", () => {});
+
+            reader.on("error", (error) => {
+                done(error);
+            });
+
+            reader.on("close", () => {
+                expect(reader["client"]).toBeNull();
+                expect(reader["fetchEntityStatement"]).toBeNull();
+                expect(spyFinalizeStatement).toHaveBeenCalled();
+                done();
+            });
+
+            reader.read();
+        });
+    });
+
+    test("should handle error on close", (done) => {
+        UserDatabase.load([]).then(() => {
+            const reader = new UserBatchReader({ batchSize: 2 });
+
+            jest.spyOn(reader as any, "finalizeStatement").mockImplementation(() => {
+                return Promise.reject(new Error("Error on close"));
+            });
+
+            reader.on("data", () => {});
+
+            reader.on("error", (error) => {
+                expect(error).toBeInstanceOf(Error);
+                expect(error.message).toBe("Error on close");
+                done();
+            });
+
+            reader.read();
+        });
+    });
+});
diff --git a/test/mariadb/classes/MariadbBatchEntityWriter.test.ts b/test/mariadb/classes/MariadbBatchEntityWriter.test.ts
new file mode 100644
index 0000000..b4f5eb2
--- /dev/null
+++ b/test/mariadb/classes/MariadbBatchEntityWriter.test.ts
@@ -0,0 +1,75 @@
+import { UserBatchWriter } from "../mocks/UserBatchWriter";
+import { UserDTO } from "../mocks/UserDTO";
+import { UserDatabase } from "../mocks/UserDatabase";
+
+jest.mock("mariadb", () => {
+    if (process.env.CI === "true") {
+        console.info("Setting up mocked MariaDB database");
+        return { createPool: jest.fn(() => UserDatabase.mPool) };
+    }
+    console.info("Setting up real MariaDB database");
+    return jest.requireActual("mariadb");
+});
+
+describe("MariadbBatchEntityWriter", () => {
+    const data = [
+        { id: 1, username: "Alice" },
+        { id: 2, username: "Bob" },
+        { id: 3, username: "Charlie" },
+    ];
+    Object.freeze(data);
+
+    beforeAll((done) => {
+        UserDatabase.teardown()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    beforeEach((done) => {
+        UserDatabase.setup()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    afterEach((done) => {
+        UserDatabase.teardown()
+            .then(() => done())
+            .catch((error) => done(error));
+    });
+
+    test("should save entities in database", (done) => {
+        const writer = new UserBatchWriter({
+            batchSize: data.length,
+        });
+
+        writer.on("error", (err) => {
+            done(err);
+        });
+
+        writer.once("finish", () => {
+            UserDatabase.fetch().then((results) => {
+                expect(results).toHaveLength(data.length);
+                expect(results).toEqual(data);
+                done();
+            });
+        });
+
+        data.forEach((user) => writer.write(user));
+        writer.end();
+    });
+
+    test("should rollback on error", (done) => {
+        const writer = new UserBatchWriter({
+            batchSize: 3,
+        });
+
+        writer.on("error", (err) => {
+            expect(err).toBeInstanceOf(Error);
+            done();
+        });
+
+        writer.write(new UserDTO(1, "Alice"));
+        writer.write(new UserDTO(1, "DUPLICATED ID"));
+        writer.end();
+    });
+});
diff --git a/test/mariadb/mocks/UserBatchReader.ts b/test/mariadb/mocks/UserBatchReader.ts
new file mode 100644
index 0000000..74b1732
--- /dev/null
+++ b/test/mariadb/mocks/UserBatchReader.ts
@@ -0,0 +1,16 @@
+import { MariadbBatchEntityReader } from "../../../src/mariadb";
+import { UserDTO } from "./UserDTO";
+import { UserDatabase } from "./UserDatabase";
+
+export class UserBatchReader extends MariadbBatchEntityReader<UserDTO> {
+    constructor(options: { batchSize: number; query?: string }) {
+        super({
+            batchSize: options.batchSize,
+            pool: UserDatabase.getPool(),
+            query: options.query || "SELECT id, username FROM users"
+        });
+    }
+    protected rowToEntity(row: UserDTO): UserDTO {
+        return row;
+    }
+}
\ No newline at end of file
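For orientation before the writer-side mocks below, a subclass such as UserBatchReader is consumed like any object-mode Readable: rows are fetched from MariaDB in batches of batchSize, but delivered entity by entity on "data" events, exactly as the tests above exercise it. A minimal consumer sketch, assuming the test mocks from this series (the import paths and batch size are illustrative, not part of the patch):

import { UserBatchReader } from "./test/mariadb/mocks/UserBatchReader";
import { UserDTO } from "./test/mariadb/mocks/UserDTO";

// Entities arrive one by one on "data", even though they are fetched
// from the database in batches of 100 under the hood.
const reader = new UserBatchReader({ batchSize: 100 });
const users: UserDTO[] = [];

reader.on("data", (user: UserDTO) => users.push(user));
reader.on("error", (error: Error) => console.error("read failed:", error));
reader.on("end", () => console.info(`read ${users.length} users`));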
diff --git a/test/mariadb/mocks/UserBatchWriter.ts b/test/mariadb/mocks/UserBatchWriter.ts
new file mode 100644
index 0000000..b14336e
--- /dev/null
+++ b/test/mariadb/mocks/UserBatchWriter.ts
@@ -0,0 +1,18 @@
+import { MariadbBatchEntityWriter } from "../../../src/mariadb";
+import { UserDTO } from "./UserDTO";
+import { UserDatabase } from "./UserDatabase";
+import { UserEntity } from "./UserEntity";
+
+export class UserBatchWriter extends MariadbBatchEntityWriter<UserDTO, UserEntity> {
+    constructor(options: { batchSize: number }) {
+        super({
+            batchSize: options.batchSize,
+            pool: UserDatabase.getPool(),
+            prepareStatement: "INSERT INTO users (id, username) VALUES (?, ?)",
+        });
+    }
+
+    protected entityToRow(entity: UserDTO): UserEntity {
+        return [entity.id, entity.username];
+    }
+}
diff --git a/test/mariadb/mocks/UserDTO.ts b/test/mariadb/mocks/UserDTO.ts
new file mode 100644
index 0000000..40ed3ec
--- /dev/null
+++ b/test/mariadb/mocks/UserDTO.ts
@@ -0,0 +1,8 @@
+export class UserDTO {
+    id: number;
+    username: string;
+    constructor(id: number, username: string) {
+        this.id = id;
+        this.username = username;
+    }
+}
\ No newline at end of file
diff --git a/test/mariadb/mocks/UserDatabase.ts b/test/mariadb/mocks/UserDatabase.ts
new file mode 100644
index 0000000..299ebd3
--- /dev/null
+++ b/test/mariadb/mocks/UserDatabase.ts
@@ -0,0 +1,159 @@
+import { ConnectionConfig, createConnection, createPool, Pool } from "mariadb";
+import { UserDTO } from "./UserDTO";
+
+export class UserDatabase {
+    private static data: UserDTO[] = [];
+    private static pool: Pool | null = null;
+
+    static readonly mPrepareStatement = {
+        execute: jest.fn(),
+        close: jest.fn(),
+    };
+    static readonly mPoolClient = {
+        prepare: jest.fn(() => Promise.resolve(UserDatabase.mPrepareStatement)),
+        beginTransaction: jest.fn(),
+        batch: jest.fn(),
+        commit: jest.fn(),
+        rollback: jest.fn(),
+        release: jest.fn(),
+    };
+
+    static readonly mPool = {
+        getConnection: jest.fn(() => Promise.resolve(UserDatabase.mPoolClient)),
+    };
+
+    private static getConnectionOptions(): ConnectionConfig {
+        if (process.env.CI !== "true") {
+            return {
+                user: process.env.MARIADB_USER,
+                password: process.env.MARIADB_PASSWORD,
+                host: process.env.MARIADB_HOST,
+                port: parseInt(process.env.MARIADB_PORT as string, 10),
+                database: process.env.MARIADB_DATABASE,
+            };
+        }
+        return {};
+    }
+    static getPool(): Pool {
+        if (!UserDatabase.pool) {
+            UserDatabase.pool = createPool(UserDatabase.getConnectionOptions());
+        }
+        return UserDatabase.pool;
+    }
+
+    static mockWriter() {
+        const idSet: Set<number> = new Set();
+        UserDatabase.mPoolClient.batch.mockImplementation(
+            (sql: string, values: any[]) => {
+                return Promise.all(
+                    values.map((value) => {
+                        const id = value[0] as number;
+                        const username = value[1] as string;
+                        if (idSet.has(id)) {
+                            return Promise.reject(new Error("Duplicated ID"));
+                        }
+                        idSet.add(id);
+
+                        UserDatabase.data.push(new UserDTO(id, username));
+                        return Promise.resolve();
+                    })
+                );
+            }
+        );
+    }
+
+    static load(data: UserDTO[]): Promise<void> {
+        if (process.env.CI !== "true") {
+            if (data.length === 0) return Promise.resolve();
+            return createConnection(UserDatabase.getConnectionOptions()).then(
+                (connection) => {
+                    return connection
+                        .beginTransaction()
+                        .then(() =>
+                            connection.batch(
+                                `INSERT INTO users (id, username) VALUES (?, ?)`,
+                                data.map((user) => [user.id, user.username])
+                            )
+                        )
+                        .then(() => connection.commit())
+                        .finally(() => connection.end());
+                }
+            );
+        }
+
+        UserDatabase.mPrepareStatement.execute.mockImplementation(
+            ([size, offset]: number[]) => {
+                return data.splice(0, size);
+            }
+        );
+
+        return Promise.resolve();
+    }
+
+    static async fetch(): Promise<UserDTO[]> {
+        if (process.env.CI !== "true") {
+            return createConnection(UserDatabase.getConnectionOptions())
+                .then((connection) => {
+                    return connection
+                        .query("SELECT * FROM users")
+                        .finally(() => connection.end());
+                })
+                .then((rows) => {
+                    return rows.map((row: unknown) => {
+                        return row as UserDTO;
+                    });
+                });
+        }
+        return Promise.resolve(UserDatabase.data);
+    }
+
+    static async setup(): Promise<void> {
+        if (process.env.CI === "true") {
+            UserDatabase.data = [];
+            UserDatabase.mockWriter();
+            return Promise.resolve();
+        }
+
+        return createConnection(UserDatabase.getConnectionOptions()).then(
+            (connection) => {
+                return connection
+                    .beginTransaction()
+                    .then(() =>
+                        connection.query(
+                            `CREATE TABLE IF NOT EXISTS users (
+                                id INTEGER PRIMARY KEY,
+                                username TEXT NOT NULL
+                            )`
+                        )
+                    )
+                    .then(() => connection.query(`DELETE FROM users`))
+                    .then(() => connection.commit())
+                    .finally(() => connection.end());
+            }
+        );
+    }
+
+    static async teardown(): Promise<void> {
+        if (process.env.CI === "true") {
+            UserDatabase.data = [];
+            UserDatabase.pool = null;
+            return Promise.resolve();
+        }
+
+        return createConnection(UserDatabase.getConnectionOptions()).then(
+            (connection) => {
+                return connection
+                    .beginTransaction()
+                    .then(() => connection.query(`DROP TABLE IF EXISTS users`))
+                    .then(() => connection.commit())
+                    .finally(() => {
+                        connection.end();
+                        UserDatabase.pool?.end();
+                        UserDatabase.pool = null;
+                    });
+            }
+        );
+    }
+}
diff --git a/test/mariadb/mocks/UserEntity.ts b/test/mariadb/mocks/UserEntity.ts
new file mode 100644
index 0000000..ec8cdbb
--- /dev/null
+++ b/test/mariadb/mocks/UserEntity.ts
@@ -0,0 +1 @@
+export type UserEntity = Array<number | string>;
\ No newline at end of file

From fd93c741ca81766f4df31357245a81982942e872 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?=
Date: Fri, 31 Jan 2025 23:16:57 +0100
Subject: [PATCH 5/8] fix: sqlite test

---
 test/sqlite/classes/SqliteBatchEntityReader.test.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/sqlite/classes/SqliteBatchEntityReader.test.ts b/test/sqlite/classes/SqliteBatchEntityReader.test.ts
index ebe4851..e4681e6 100644
--- a/test/sqlite/classes/SqliteBatchEntityReader.test.ts
+++ b/test/sqlite/classes/SqliteBatchEntityReader.test.ts
@@ -36,7 +36,7 @@ describe("SqliteBatchEntityReader", () => {
         });
 
         reader.on("error", (error) => {
-            fail(error);
+            done(error);
         });
 
         reader.on("end", () => {

From eda5df38b3cf620522a0e1fc84b74676bf4a8cfd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?=
Date: Fri, 31 Jan 2025 23:17:59 +0100
Subject: [PATCH 6/8] chore: bump batchjs version

---
 package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/package.json b/package.json
index 9f7307f..debc21b 100644
--- a/package.json
+++ b/package.json
@@ -79,7 +79,7 @@
     "sqlite3": "^5.1.7"
   },
   "dependencies": {
-    "batchjs": "^1.0.0"
+    "batchjs": "^1.1.0"
   },
   "commitlint": {
     "extends": "@commitlint/config-conventional"

From 78dd990399e9eb069259abad150adbc442839313 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?=
Date: Fri, 31 Jan 2025 23:21:02 +0100
Subject: [PATCH 7/8] chore: add global export

---
 package.json | 5 +++++
 src/index.ts | 5 +++++
 2 files changed, 10 insertions(+)
 create mode 100644 src/index.ts

diff --git a/package.json b/package.json
index debc21b..120e157 100644
--- a/package.json
+++ b/package.json
@@ -86,6 +86,11 @@
}, "exports": { ".": { + "types": "./dist/@types/index.d.ts", + "import": "./dist/esm/index.js", + "require": "./dist/cjs/index.js" + }, + "./common": { "types": "./dist/@types/common/index.d.ts", "import": "./dist/esm/common/index.js", "require": "./dist/cjs/common/index.js" diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..faa452b --- /dev/null +++ b/src/index.ts @@ -0,0 +1,5 @@ +export * from "./common/index"; +export * from "./mariadb/index"; +export * from "./mysql/index"; +export * from "./postgresql/index"; +export * from "./sqlite/index"; \ No newline at end of file From 1b68835f4870a6459daea293eeb6d2cca3960f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?= Date: Fri, 31 Jan 2025 23:21:40 +0100 Subject: [PATCH 8/8] chore: fix build --- src/common/classes/AbstractBatchEntityReaderStream.ts | 2 +- src/common/classes/AbstractBatchEntityWriterStream.ts | 2 +- src/mariadb/classes/MariadbBatchEntityReader.ts | 2 +- src/mariadb/classes/MariadbBatchEntityWriter.ts | 2 +- src/mysql/classes/MysqlBatchEntityReader.ts | 2 +- src/mysql/classes/MysqlBatchEntityWriter.ts | 2 +- src/postgresql/classes/PostgresBatchEntityReader.ts | 2 +- src/postgresql/classes/PostgresBatchEntityWriter.ts | 2 +- src/sqlite/classes/SqliteBatchEntityReader.ts | 2 +- src/sqlite/classes/SqliteBatchEntityWriter.ts | 2 +- tsconfig.cjs.json | 4 ++++ tsconfig.esm.json | 4 ++++ 12 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/common/classes/AbstractBatchEntityReaderStream.ts b/src/common/classes/AbstractBatchEntityReaderStream.ts index 846215c..b640d3b 100644 --- a/src/common/classes/AbstractBatchEntityReaderStream.ts +++ b/src/common/classes/AbstractBatchEntityReaderStream.ts @@ -1,4 +1,4 @@ -import { ObjectReadable, ObjectReadableOptions, BatchData } from "batchjs/streams"; +import { ObjectReadable, ObjectReadableOptions, BatchData } from "batchjs"; /** * @interface diff --git a/src/common/classes/AbstractBatchEntityWriterStream.ts b/src/common/classes/AbstractBatchEntityWriterStream.ts index c26d62f..dc6afdd 100644 --- a/src/common/classes/AbstractBatchEntityWriterStream.ts +++ b/src/common/classes/AbstractBatchEntityWriterStream.ts @@ -1,4 +1,4 @@ -import { ObjectWritable, ObjectWritableOptions, WriteCallback, BatchData } from "batchjs/streams"; +import { ObjectWritable, ObjectWritableOptions, WriteCallback, BatchData } from "batchjs"; /** * @interface diff --git a/src/mariadb/classes/MariadbBatchEntityReader.ts b/src/mariadb/classes/MariadbBatchEntityReader.ts index 10e4f7c..d1327e9 100644 --- a/src/mariadb/classes/MariadbBatchEntityReader.ts +++ b/src/mariadb/classes/MariadbBatchEntityReader.ts @@ -1,6 +1,6 @@ import { Pool, PoolConnection, Prepare } from "mariadb"; import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; -import { BatchData, ReadCallback } from "batchjs/streams"; +import { BatchData, ReadCallback } from "batchjs"; /** * @interface diff --git a/src/mariadb/classes/MariadbBatchEntityWriter.ts b/src/mariadb/classes/MariadbBatchEntityWriter.ts index 938e8a2..476d1ce 100644 --- a/src/mariadb/classes/MariadbBatchEntityWriter.ts +++ b/src/mariadb/classes/MariadbBatchEntityWriter.ts @@ -1,6 +1,6 @@ import {Pool, PoolConnection} from "mariadb"; import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; -import { BatchData } from "batchjs/streams"; +import { BatchData } from "batchjs"; /** * @interface diff --git 
a/src/mysql/classes/MysqlBatchEntityReader.ts b/src/mysql/classes/MysqlBatchEntityReader.ts index 6def77a..29c877f 100644 --- a/src/mysql/classes/MysqlBatchEntityReader.ts +++ b/src/mysql/classes/MysqlBatchEntityReader.ts @@ -1,6 +1,6 @@ import { Pool, PoolConnection, RowDataPacket } from "mysql2/promise"; import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; -import { BatchData, ReadCallback } from "batchjs/streams"; +import { BatchData, ReadCallback } from "batchjs"; /** * @interface diff --git a/src/mysql/classes/MysqlBatchEntityWriter.ts b/src/mysql/classes/MysqlBatchEntityWriter.ts index e855527..71fd3d3 100644 --- a/src/mysql/classes/MysqlBatchEntityWriter.ts +++ b/src/mysql/classes/MysqlBatchEntityWriter.ts @@ -1,5 +1,5 @@ import { Pool, PoolConnection } from "mysql2/promise"; -import { BatchData } from "batchjs/streams"; +import { BatchData } from "batchjs"; import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common"; /** diff --git a/src/postgresql/classes/PostgresBatchEntityReader.ts b/src/postgresql/classes/PostgresBatchEntityReader.ts index 1f39f54..3037974 100644 --- a/src/postgresql/classes/PostgresBatchEntityReader.ts +++ b/src/postgresql/classes/PostgresBatchEntityReader.ts @@ -1,6 +1,6 @@ import { Pool, PoolClient } from "pg"; import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; -import { BatchData } from "batchjs/streams"; +import { BatchData } from "batchjs"; /** * @interface diff --git a/src/postgresql/classes/PostgresBatchEntityWriter.ts b/src/postgresql/classes/PostgresBatchEntityWriter.ts index 36f1190..7095d3c 100644 --- a/src/postgresql/classes/PostgresBatchEntityWriter.ts +++ b/src/postgresql/classes/PostgresBatchEntityWriter.ts @@ -1,6 +1,6 @@ import { Pool, PoolClient } from "pg"; import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; -import { BatchData } from "batchjs/streams"; +import { BatchData } from "batchjs"; /** * @interface diff --git a/src/sqlite/classes/SqliteBatchEntityReader.ts b/src/sqlite/classes/SqliteBatchEntityReader.ts index b5bb2b0..1bc1341 100644 --- a/src/sqlite/classes/SqliteBatchEntityReader.ts +++ b/src/sqlite/classes/SqliteBatchEntityReader.ts @@ -1,6 +1,6 @@ import sqlite from "sqlite"; import { AbstractBatchEntityReaderStream, AbstractBatchEntityReaderStreamOptions } from "../../common/index"; -import { BatchData, ReadCallback } from "batchjs/streams"; +import { BatchData, ReadCallback } from "batchjs"; /** * @interface diff --git a/src/sqlite/classes/SqliteBatchEntityWriter.ts b/src/sqlite/classes/SqliteBatchEntityWriter.ts index f21bdaf..2f94dc0 100644 --- a/src/sqlite/classes/SqliteBatchEntityWriter.ts +++ b/src/sqlite/classes/SqliteBatchEntityWriter.ts @@ -1,6 +1,6 @@ import sqlite from "sqlite"; import { AbstractBatchEntityWriterStream, AbstractBatchEntityWriterStreamOptions } from "../../common/index"; -import { BatchData, WriteCallback } from "batchjs/streams"; +import { BatchData, WriteCallback } from "batchjs"; /** * @interface diff --git a/tsconfig.cjs.json b/tsconfig.cjs.json index 8ab4792..58969a6 100644 --- a/tsconfig.cjs.json +++ b/tsconfig.cjs.json @@ -2,6 +2,10 @@ "extends": "./tsconfig.json", "compilerOptions": { "module": "CommonJS", + "moduleResolution": "node", + "esModuleInterop": true, + "allowSyntheticDefaultImports": true, + "resolveJsonModule": true, "outDir": "./dist/cjs" } } diff --git 
a/tsconfig.esm.json b/tsconfig.esm.json index 48e68b0..4eeedce 100644 --- a/tsconfig.esm.json +++ b/tsconfig.esm.json @@ -2,6 +2,10 @@ "extends": "./tsconfig.json", "compilerOptions": { "module": "ESNext", + "moduleResolution": "node", + "esModuleInterop": true, + "allowSyntheticDefaultImports": true, + "resolveJsonModule": true, "outDir": "./dist/esm" } }
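
Taken together, the series leaves the readers and writers composable with Node's standard stream utilities: a reader can be piped straight into a writer, with each side issuing its own batched SQL and the writer opening one transaction per batch (see batchWrite above). A closing sketch, assuming an ESM context with top-level await and the test mocks from this series (import paths and batch sizes are illustrative, not part of the patch):

import { pipeline } from "node:stream/promises";
import { UserBatchReader } from "./test/mariadb/mocks/UserBatchReader";
import { UserBatchWriter } from "./test/mariadb/mocks/UserBatchWriter";

// Stream every user out of the source table and batch-insert them back:
// 50 rows per read on the reader side, one transaction per 50-row batch
// on the writer side.
await pipeline(
    new UserBatchReader({ batchSize: 50 }),
    new UserBatchWriter({ batchSize: 50 })
);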