Skip to content

Commit

Permalink
update: parallel indexeddb
Browse files Browse the repository at this point in the history
  • Loading branch information
vpbs2 committed Sep 4, 2024
1 parent 8e6f4a7 commit 3e491b3
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 411 deletions.
28 changes: 14 additions & 14 deletions benchmarking/public/runner/index.html

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const ParallelIndexedDBMProvider = ({
origin: 'http://localhost:4200',
totalRunners: 4,
fetchTableFileBuffers: async (table) => {
return fileManagerRef.current.getTableBufferData(table);
return [];
},
fetchPreQuery: () => {
return [];
Expand Down
4 changes: 2 additions & 2 deletions meerkat-dbm/src/dbm/dbm-parallel/dbm-parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const roundRobin = (counter: number, maxValue: number): number => {
};

export class DBMParallel {
private fileManager: FileManagerType<SharedArrayBuffer>;
private fileManager: FileManagerType<SharedArrayBuffer | Uint8Array>;
private logger: DBMLogger;
private tableLockRegistry: Record<string, TableLock> = {};

Expand All @@ -42,7 +42,7 @@ export class DBMParallel {
instanceManager,
onDuckDBShutdown,
iFrameRunnerManager,
}: DBMConstructorOptions<SharedArrayBuffer> & {
}: DBMConstructorOptions<SharedArrayBuffer | Uint8Array> & {
iFrameRunnerManager: IFrameRunnerManager;
}) {
this.fileManager = fileManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export class IndexedDBFileManager implements FileManagerType {

async mountFileBufferByTables(tables: TableConfig[]): Promise<void> {
const tableData = await this.getFilesNameForTables(tables);

console.log('tableData', tableData);
/**
* Check if the file registered size is not more than the limit
* If it is more than the limit, then remove the files which are not needed while mounting the tables
Expand All @@ -245,7 +245,7 @@ export class IndexedDBFileManager implements FileManagerType {
const filesList = _filesList.filter(
(fileName) => !this.fileRegisterer.isFileRegisteredInDB(fileName)
);

console.log('filesList', filesList);
const uniqueFileNames = Array.from(new Set(filesList));

const filesData = await this.indexedDB?.files.bulkGet(uniqueFileNames);
Expand All @@ -267,12 +267,12 @@ export class IndexedDBFileManager implements FileManagerType {
/**
 * Fetches the stored entry for `table.name` from the `tablesKey` store and
 * returns it with its `files` replaced by the result of
 * `getFilesByPartition(files, table.partitions)`.
 *
 * @param table - Table config; `name` is the lookup key and `partitions` is
 *   forwarded to `getFilesByPartition`.
 * @returns The stored entry with the recomputed `files` list, or `undefined`
 *   when no entry exists for `table.name`.
 */
async getTableData(table: TableConfig): Promise<Table | undefined> {
  const tableData = await this.indexedDB.tablesKey.get(table.name);

  // Nothing stored under this table name.
  if (!tableData) return undefined;

  // The guard and return used to appear twice; the second copies were
  // unreachable, so a single guard/return pair is kept.
  return {
    ...tableData,
    files: getFilesByPartition(tableData.files ?? [], table.partitions),
  };
}

async setTableMetadata(tableName: string, metadata: object): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,247 +1,19 @@
import { InstanceManagerType } from '../../dbm/instance-manager';
import { TableConfig } from '../../dbm/types';
import { DBMEvent, DBMLogger } from '../../logger';
import { Table, TableWiseFiles } from '../../types';
import {
getBufferFromJSON,
isDefined,
mergeFileBufferStoreIntoTable,
} from '../../utils';
import {
FileBufferStore,
FileJsonStore,
FileManagerConstructorOptions,
FileManagerType,
} from '../file-manager-type';
import { FileRegisterer } from '../file-registerer';
import { getFilesByPartition } from '../partition';
import { MeerkatDatabase } from './meerkat-database';
import { IndexedDBFileManager } from './indexed-db-file-manager';

/**
 * File manager that reuses `IndexedDBFileManager` for everything except
 * mounting: registration, lookup, metadata and drop operations are inherited
 * from the base class, while `mountFileBufferByTables` is overridden to do
 * nothing.
 *
 * The class previously carried its own copies of the base-class members
 * (including `private` fields with the same names as the base's, which
 * TypeScript rejects in a subclass) and declared `mountFileBufferByTables`
 * twice. Only the subclass-specific surface is kept here.
 */
export class ParallelIndexedDBFileManager
  extends IndexedDBFileManager
  implements FileManagerType<Uint8Array>
{
  /**
   * @param options - Forwarded unchanged to the `IndexedDBFileManager`
   *   constructor.
   */
  constructor(options: FileManagerConstructorOptions) {
    super(options);
  }

  /**
   * Deliberately does nothing and resolves immediately.
   *
   * NOTE(review): presumably files are mounted elsewhere in the parallel
   * setup (e.g. by the runners) rather than by this manager — confirm
   * against the callers.
   *
   * @param tables - Ignored.
   */
  override async mountFileBufferByTables(tables: TableConfig[]): Promise<void> {
    // no-op
  }
}
Loading

0 comments on commit 3e491b3

Please sign in to comment.