Skip to content

Commit

Permalink
fix: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vpbs2 committed Aug 27, 2024
1 parent 4dfb093 commit 3ca8b10
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 108 deletions.
5 changes: 2 additions & 3 deletions benchmarking/src/app/file-loader/file-loader.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { convertUint8ArrayToSharedArrayBuffer } from '@devrev/meerkat-dbm';
import axios from 'axios';
import { useState } from 'react';
import TAXI_JSON_DATA from '../../../public/data-sets/taxi.json';
Expand Down Expand Up @@ -25,9 +26,7 @@ export const FileLoader = ({
const fileBuffer = file.data;

if (bufferType === 'sharedArrayBuffer') {
const sharedBuffer = new SharedArrayBuffer(fileBuffer.byteLength);
const fileBufferView = new Uint8Array(sharedBuffer);
fileBufferView.set(new Uint8Array(fileBuffer));
const sharedBuffer = convertUint8ArrayToSharedArrayBuffer(fileBuffer);

await fileManager.registerFileBuffer({
tableName: 'taxi',
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion meerkat-dbm/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@devrev/meerkat-dbm",
"version": "0.0.170",
"version": "0.1.0",
"dependencies": {
"tslib": "^2.3.0",
"@duckdb/duckdb-wasm": "^1.28.0",
Expand Down
171 changes: 87 additions & 84 deletions meerkat-dbm/src/dbm/__test__/dbm-parallel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@ import { IFrameRunnerManager } from '../dbm-parallel/runner-manager';
import { MockFileManager } from './dbm.spec';
import { InstanceManager } from './mock';

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
const iFrameRunnerManager: IFrameRunnerManager = {
startRunners: jest.fn(),
stopRunners: jest.fn(),
getRunnerIds: jest.fn().mockReturnValue(['1', '2']),
isFrameRunnerReady: jest.fn().mockResolvedValue(true),
iFrameManagers: new Map(),
};

const loggerMock = {
debug: jest.fn(),
error: jest.fn(),
info: jest.fn(),
} as unknown as DBMLogger;

const iFrameRunnerManager = {
startRunners: jest.fn(),
stopRunners: jest.fn(),
getRunnerIds: jest.fn().mockReturnValue(['1', '2']),
isFrameRunnerReady: jest.fn().mockResolvedValue(true),
fetchTableFileBuffers: jest.fn().mockResolvedValue([]),
totalRunners: 2,
iFrameManagers: new Map(),
iFrameReadyMap: new Map(),
logger: loggerMock, // Adjusted to use the loggerMock
addIFrameManager: jest.fn(),
areRunnersRunning: jest.fn(),
messageListener: jest.fn(),
} as unknown as jest.Mocked<IFrameRunnerManager>;

const runnerMock = {
communication: {
sendRequest: jest.fn(),
Expand Down Expand Up @@ -55,102 +60,100 @@ describe('DBMParallel', () => {
jest.clearAllMocks();
});

describe('queryWithTables', () => {
it('should execute query with tables successfully', async () => {
runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});
it('should execute query with tables successfully', async () => {
runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});

const result = await dbmParallel.queryWithTables({
query: 'SELECT * FROM table',
tables: [],
});

const result = await dbmParallel.queryWithTables({
expect(result).toEqual([{ data: 1 }]);
expect(runnerMock.communication.sendRequest).toHaveBeenCalledWith({
type: BROWSER_RUNNER_TYPE.EXEC_QUERY,
payload: {
query: 'SELECT * FROM table',
tables: [],
});

expect(result).toEqual([{ data: 1 }]);
expect(runnerMock.communication.sendRequest).toHaveBeenCalledWith({
type: BROWSER_RUNNER_TYPE.EXEC_QUERY,
payload: {
query: 'SELECT * FROM table',
tables: [],
options: undefined,
},
});
options: undefined,
},
});
});

it('should handle query execution errors', async () => {
const response = {
message: { isError: true, error: 'Query failed', data: [] },
};
it('should handle query execution errors', async () => {
const response = {
message: { isError: true, error: 'Query failed', data: [] },
};

runnerMock.communication.sendRequest.mockResolvedValue(response);
runnerMock.communication.sendRequest.mockResolvedValue(response);

await expect(
dbmParallel.queryWithTables({
query: 'SELECT * FROM table',
tables: [],
})
).rejects.toThrow('Query failed');
});
await expect(
dbmParallel.queryWithTables({
query: 'SELECT * FROM table',
tables: [],
})
).rejects.toThrow('Query failed');
});

it('should shut down after all queries are complete and timeout elapses', async () => {
jest.useFakeTimers();
it('should shut down after all queries are complete and timeout elapses', async () => {
jest.useFakeTimers();

runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});
runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});

// Execute a query
await dbmParallel.queryWithTables({
query: 'SELECT * FROM table1',
tables: [],
});
// Execute a query
await dbmParallel.queryWithTables({
query: 'SELECT * FROM table1',
tables: [],
});

// Advance timer to time less than the shutdown time
jest.advanceTimersByTime(500);
// Advance timer to time less than the shutdown time
jest.advanceTimersByTime(500);

// Check if the shutdown is not triggered
expect(fileManager.onDBShutdownHandler).not.toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).not.toHaveBeenCalled();
// Check if the shutdown is not triggered
expect(fileManager.onDBShutdownHandler).not.toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).not.toHaveBeenCalled();

// Advance timer to time more than the shutdown time
jest.advanceTimersByTime(600);
// Advance timer to time more than the shutdown time
jest.advanceTimersByTime(600);

expect(fileManager.onDBShutdownHandler).toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).toHaveBeenCalled();
expect(fileManager.onDBShutdownHandler).toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).toHaveBeenCalled();

jest.useRealTimers();
});
jest.useRealTimers();
});

it('should clear previous shutdown timer if a new query arrives before shutdown', async () => {
jest.useFakeTimers();
it('should clear previous shutdown timer if a new query arrives before shutdown', async () => {
jest.useFakeTimers();

// Simulate a query execution
runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});
// Simulate a query execution
runnerMock.communication.sendRequest.mockResolvedValue({
message: { isError: false, data: [{ data: 1 }] },
});

await dbmParallel.queryWithTables({
query: 'SELECT * FROM table1',
tables: [],
});
await dbmParallel.queryWithTables({
query: 'SELECT * FROM table1',
tables: [],
});

// Advance timer to time 100 less than the shutdown time
jest.advanceTimersByTime(900);
// Advance timer to time 100 less than the shutdown time
jest.advanceTimersByTime(900);

// Execute another query
await dbmParallel.queryWithTables({
query: 'SELECT * FROM table2',
tables: [],
});
// Execute another query
await dbmParallel.queryWithTables({
query: 'SELECT * FROM table2',
tables: [],
});

// Shutdown timer should be reset due to the new query
jest.advanceTimersByTime(900);
// Shutdown timer should be reset due to the new query
jest.advanceTimersByTime(900);

// Shutdown should not have been triggered
expect(fileManager.onDBShutdownHandler).not.toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).not.toHaveBeenCalled();
// Shutdown should not have been triggered
expect(fileManager.onDBShutdownHandler).not.toHaveBeenCalled();
expect(iFrameRunnerManager.stopRunners).not.toHaveBeenCalled();

jest.useRealTimers();
});
jest.useRealTimers();
});
});
17 changes: 4 additions & 13 deletions meerkat-dbm/src/dbm/dbm-parallel/dbm-parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,12 @@ import { DBMConstructorOptions, QueryOptions, TableConfig } from '../types';
import { IFrameRunnerManager } from './runner-manager';

//Round Robin for multiple runners like 10
const roundRobin = (
counter: number,
maxValue: number
): {
counter: number;
} => {
const roundRobin = (counter: number, maxValue: number): number => {
if (counter === maxValue) {
return {
counter: 0,
};
return 0;
}

return {
counter: counter + 1,
};
return counter + 1;
};

export class DBMParallel {
Expand Down Expand Up @@ -117,7 +108,7 @@ export class DBMParallel {
* A simple round-robin to select the runner
*/
const runners = this.iFrameRunnerManager.getRunnerIds();
this.counter = roundRobin(this.counter, runners.length - 1).counter;
this.counter = roundRobin(this.counter, runners.length - 1);

const runner = this.iFrameRunnerManager.iFrameManagers.get(
runners[this.counter]
Expand Down
8 changes: 7 additions & 1 deletion meerkat-dbm/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
export * from './dbm';
export * from './file-manager';
export * from './logger';
export * from './utils/duck-type-convertor';
export {
convertArrowTableToJSON,
convertSharedArrayBufferToUint8Array,
convertUint8ArrayToSharedArrayBuffer,
getMainAppName,
getRunnerAppName,
} from './utils';
export * from './utils/parallel-dbm-utils/get-app-name';
export * from './window-communication/runner-types';
export * from './window-communication/window-communication';
34 changes: 34 additions & 0 deletions meerkat-dbm/src/utils/__tests__/array-convertor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import {
convertSharedArrayBufferToUint8Array,
convertUint8ArrayToSharedArrayBuffer,
} from '../array-convertor';

describe('Buffer Conversion Functions', () => {
test('convertUint8ArrayToSharedArrayBuffer should correctly convert Uint8Array to SharedArrayBuffer', () => {
// Input Uint8Array
const inputArray = new Uint8Array([1, 2, 3, 4, 5]);

// Convert to SharedArrayBuffer
const sharedBuffer = convertUint8ArrayToSharedArrayBuffer(inputArray);

// Create a view of the SharedArrayBuffer
const sharedArrayView = new Uint8Array(sharedBuffer);

// Assert that the SharedArrayBuffer has the same length and data as the input Uint8Array
expect(sharedArrayView.byteLength).toBe(inputArray.length);
expect(sharedArrayView).toEqual(inputArray);
});

test('convertSharedArrayBufferToUint8Array should correctly convert SharedArrayBuffer to Uint8Array', () => {
// Input SharedArrayBuffer
const inputArray = new Uint8Array([10, 20, 30, 40, 50]);
const sharedBuffer = convertUint8ArrayToSharedArrayBuffer(inputArray);

// Convert back to Uint8Array
const resultArray = convertSharedArrayBufferToUint8Array(sharedBuffer);

// Assert that the Uint8Array has the same length and data as the original input
expect(resultArray.length).toBe(inputArray.length);
expect(resultArray).toEqual(inputArray);
});
});
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Field, Type, vectorFromArray } from 'apache-arrow';

import { convertArrowValueToJS } from './duck-type-convertor';
import { convertArrowValueToJS } from '../duck-type-convertor';

const FIELD = {
metadata: {},
name: 'null',
nullable: true,
type: {
children: [{ metadata: {}, name: 'null', nullable: true, typeId: Type.Utf8 }],
children: [
{ metadata: {}, name: 'null', nullable: true, typeId: Type.Utf8 },
],
},
};

Expand Down Expand Up @@ -44,15 +46,19 @@ const DECIMAL_FIELD = {
const LIST_INT_FIELD = {
...FIELD,
type: {
children: [{ metadata: {}, name: 'null', nullable: true, typeId: Type.Int }],
children: [
{ metadata: {}, name: 'null', nullable: true, typeId: Type.Int },
],
},
typeId: Type.List,
} as Field;

const LIST_UTF8_FIELD = {
...FIELD,
type: {
children: [{ metadata: {}, name: 'null', nullable: true, typeId: Type.Utf8 }],
children: [
{ metadata: {}, name: 'null', nullable: true, typeId: Type.Utf8 },
],
},
typeId: Type.List,
} as Field;
Expand Down Expand Up @@ -111,13 +117,17 @@ const duckDBComplexTypeConvertorArray = [
describe('DuckDBTypeConvertor', () => {
it('should convert to duckdb to check for simple types', () => {
duckDbSimpleTypeConvertorArray.forEach((item) => {
expect(convertArrowValueToJS(item.field, item.input)).toStrictEqual(item.output);
expect(convertArrowValueToJS(item.field, item.input)).toStrictEqual(
item.output
);
});
});

it('should convert to duckdb to check for complex types', () => {
duckDBComplexTypeConvertorArray.forEach((item) => {
expect(convertArrowValueToJS(item.field, item.input)).toStrictEqual(item.output);
expect(convertArrowValueToJS(item.field, item.input)).toStrictEqual(
item.output
);
});
});
});
2 changes: 2 additions & 0 deletions meerkat-dbm/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export * from './array-convertor';
export * from './duck-type-convertor';
export * from './get-buffer-from-json';
export * from './is-defined';
export * from './merge-file-buffer-store-into-table';
export * from './parallel-dbm-utils/get-app-name';

0 comments on commit 3ca8b10

Please sign in to comment.