Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: opfs support issues #1973

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ if(EMSCRIPTEN)
_malloc, \
_calloc, \
_free, \
stringToUTF8, \
lengthBytesUTF8, \
stackAlloc, \
_duckdb_web_clear_response, \
_duckdb_web_collect_file_stats, \
_duckdb_web_connect, \
Expand Down
23 changes: 20 additions & 3 deletions lib/src/webdb_api.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <cstring>
#include <exception>
#include <iostream>
#include <stdexcept>

Expand Down Expand Up @@ -93,10 +95,25 @@ void duckdb_web_fs_drop_file(WASMResponse* packed, const char* file_name) {
GET_WEBDB(*packed);
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(file_name));
}
/// Drop a file
void duckdb_web_fs_drop_files(WASMResponse* packed) {
/// Drop a files
void duckdb_web_fs_drop_files(WASMResponse* packed, const char** names, int name_count) {
GET_WEBDB(*packed);
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
if (name_count == 0 || names == NULL) {
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
} else {
for (int i = 0; i < name_count; i++) {
const char* name = names[i];
if (name == NULL) {
std::cerr << "Error: NULL pointer detected at index " << i << std::endl;
continue;
}
if (std::strlen(name) == 0) {
std::cerr << "Error: Empty string detected at index " << i << std::endl;
continue;
}
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(name));
}
}
}
/// Glob file infos
void duckdb_web_fs_glob_file_infos(WASMResponse* packed, const char* file_name) {
Expand Down
62 changes: 51 additions & 11 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
}
}
return handle;
return handle;
}
/** Register a file object URL async */
public async registerFileHandleAsync<HandleType>(
Expand Down Expand Up @@ -583,12 +583,52 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
dropResponseBuffers(this.mod);
}
/** Drop files */
public dropFiles(): void {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_fs_drop_files', [], []);
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
public dropFiles(names?:string[]): void {
const pointers:number[] = [];
let pointerOfArray:number = -1;
try {
for (const str of (names ?? [])) {
if (str !== null && str !== undefined && str.length > 0) {
const size = this.mod.lengthBytesUTF8(str) + 1;
const ret = this.mod._malloc(size);
if (!ret) {
throw new Error(`Failed to allocate memory for string: ${str}`);
}
this.mod.stringToUTF8(str, ret, size);
pointers.push(ret);
}
}
pointerOfArray = this.mod._malloc(pointers.length * 4);
if (!pointerOfArray) {
throw new Error(`Failed to allocate memory for pointers array`);
}
for (let i = 0; i < pointers.length; i++) {
this.mod.HEAP32[(pointerOfArray >> 2) + i] = pointers[i];
}
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_fs_drop_files',
[
'number',
'number'
],
[
pointerOfArray,
pointers.length
]
);
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
}
dropResponseBuffers(this.mod);
} finally {
for (const pointer of pointers) {
this.mod._free(pointer);
}
if( pointerOfArray > 0 ){
this.mod._free(pointerOfArray);
}
}
dropResponseBuffers(this.mod);
}
/** Flush all files */
public flushFiles(): void {
Expand All @@ -615,11 +655,11 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
return copy;
}
/** Enable tracking of file statistics */
public registerOPFSFileName(file: string): Promise<void> {
if (file.startsWith("opfs://")) {
return this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
} else {
throw new Error("Not an OPFS file name: " + file);
public async registerOPFSFileName(file: string): Promise<void> {
if (file.startsWith("opfs://")) {
return await this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
} else {
throw new Error("Not an OPFS file name: " + file);
}
}
public collectFileStatistics(file: string, enable: boolean): void {
Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ export interface DuckDBBindings {
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
globFiles(path: string): WebFile[];
dropFile(name: string): void;
dropFiles(): void;
dropFiles(names?: string[]): void;
flushFiles(): void;
copyFileToPath(name: string, path: string): void;
copyFileToBuffer(name: string): Uint8Array;
registerOPFSFileName(file: string): void;
registerOPFSFileName(file: string): Promise<void>;
collectFileStatistics(file: string, enable: boolean): void;
exportFileStatistics(file: string): FileStatistics;
}
13 changes: 13 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ export interface DuckDBFilesystemConfig {
allowFullHTTPReads?: boolean;
}

export interface DuckDBOPFSConfig {
/**
* Defines how `opfs://` files are handled during SQL execution.
* - "auto": Automatically register `opfs://` files and drop them after execution.
* - "manual": Files must be manually registered and dropped.
*/
fileHandling?: "auto" | "manual";
}

export enum DuckDBAccessMode {
UNDEFINED = 0,
AUTOMATIC = 1,
Expand Down Expand Up @@ -70,4 +79,8 @@ export interface DuckDBConfig {
* Custom user agent string
*/
customUserAgent?: string;
/**
* opfs string
*/
opfs?: DuckDBOPFSConfig;
}
2 changes: 2 additions & 0 deletions packages/duckdb-wasm/src/bindings/duckdb_module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export interface DuckDBModule extends EmscriptenModule {
stackSave: typeof stackSave;
stackAlloc: typeof stackAlloc;
stackRestore: typeof stackRestore;
lengthBytesUTF8: typeof lengthBytesUTF8;
stringToUTF8: typeof stringToUTF8;

ccall: typeof ccall;
PThread: PThread;
Expand Down
9 changes: 6 additions & 3 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
let fileName = opfsPath;
if (PATH_SEP_REGEX.test(opfsPath)) {
const folders = opfsPath.split(PATH_SEP_REGEX);
fileName = folders.pop()!;
if (folders.length === 0) {
throw new Error(`Invalid path ${opfsPath}`);
}
fileName = folders[folders.length - 1];
if (!fileName) {
throw new Error(`Invalid path ${path}`);
throw new Error(`Invalid path ${opfsPath}. File Not Found.`);
}
// mkdir -p
folders.pop();
for (const folder of folders) {
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
}
Expand Down
72 changes: 69 additions & 3 deletions packages/duckdb-wasm/src/parallel/async_bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { InstantiationProgress } from '../bindings/progress';
import { arrowToSQLField } from '../json_typedef';
import { WebFile } from '../bindings/web_file';
import { DuckDBDataProtocol } from '../bindings';
import { searchOPFSFiles, isOPFSProtocol } from "../utils/opfs_util";

const TEXT_ENCODER = new TextEncoder();

Expand Down Expand Up @@ -45,6 +46,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
protected _nextMessageId = 0;
/** The pending requests */
protected _pendingRequests: Map<number, WorkerTaskVariant> = new Map();
/** The DuckDBConfig */
protected _config: DuckDBConfig = {};

constructor(logger: Logger, worker: Worker | null = null) {
this._logger = logger;
Expand All @@ -59,6 +62,11 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
return this._logger;
}

/** Get the logger */
public get config(): DuckDBConfig {
return this._config;
}

/** Attach to worker */
protected attach(worker: Worker): void {
this._worker = worker;
Expand Down Expand Up @@ -100,7 +108,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
transfer: ArrayBuffer[] = [],
): Promise<WorkerTaskReturnType<W>> {
if (!this._worker) {
console.error('cannot send a message since the worker is not set!');
console.error('cannot send a message since the worker is not set!:' + task.type+"," + task.data);
return undefined as any;
}
const mid = this._nextMessageId++;
Expand Down Expand Up @@ -317,8 +325,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
return await this.postTask(task);
}
/** Try to drop files */
public async dropFiles(): Promise<null> {
const task = new WorkerTask<WorkerRequestType.DROP_FILES, null, null>(WorkerRequestType.DROP_FILES, null);
public async dropFiles(names?: string[]): Promise<null> {
const task = new WorkerTask<WorkerRequestType.DROP_FILES, string[] | undefined, null>(WorkerRequestType.DROP_FILES, names);
return await this.postTask(task);
}
/** Flush all files */
Expand Down Expand Up @@ -360,6 +368,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {

/** Open a new database */
public async open(config: DuckDBConfig): Promise<void> {
this._config = config;
const task = new WorkerTask<WorkerRequestType.OPEN, DuckDBConfig, null>(WorkerRequestType.OPEN, config);
await this.postTask(task);
}
Expand Down Expand Up @@ -394,6 +403,21 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {

/** Run a query */
public async runQuery(conn: ConnectionID, text: string): Promise<Uint8Array> {
if( this.shouldOPFSFileHandling() ){
const files = await this.registerOPFSFileFromSQL(text);
try {
return await this._runQueryAsync(conn, text);
} finally {
if( files.length > 0 ){
await this.dropFiles(files);
}
}
} else {
return await this._runQueryAsync(conn, text);
}
}

private async _runQueryAsync(conn: ConnectionID, text: string): Promise<Uint8Array> {
const task = new WorkerTask<WorkerRequestType.RUN_QUERY, [ConnectionID, string], Uint8Array>(
WorkerRequestType.RUN_QUERY,
[conn, text],
Expand All @@ -406,6 +430,25 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
conn: ConnectionID,
text: string,
allowStreamResult: boolean = false,
): Promise<Uint8Array | null> {
if( this.shouldOPFSFileHandling() ){
const files = await this.registerOPFSFileFromSQL(text);
try {
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
} finally {
if( files.length > 0 ){
await this.dropFiles(files);
}
}
} else {
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
}
}

private async _startPendingQueryAsync(
conn: ConnectionID,
text: string,
allowStreamResult: boolean = false,
): Promise<Uint8Array | null> {
const task = new WorkerTask<
WorkerRequestType.START_PENDING_QUERY,
Expand All @@ -414,6 +457,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
>(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]);
return await this.postTask(task);
}

/** Poll a pending query */
public async pollPendingQuery(conn: ConnectionID): Promise<Uint8Array | null> {
const task = new WorkerTask<WorkerRequestType.POLL_PENDING_QUERY, ConnectionID, Uint8Array | null>(
Expand Down Expand Up @@ -647,4 +691,26 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
);
await this.postTask(task);
}

private shouldOPFSFileHandling():boolean {
if( isOPFSProtocol(this.config.path ?? "")){
return this.config.opfs?.fileHandling == "auto";
}
return false;
}

private async registerOPFSFileFromSQL(text: string) {
const files = searchOPFSFiles(text);
const result: string[] = [];
for (const file of files) {
try {
await this.registerOPFSFileName(file);
result.push(file);
} catch (e) {
console.error(e);
throw new Error("File Not found:" + file);
}
}
return result;
}
}
3 changes: 3 additions & 0 deletions packages/duckdb-wasm/src/parallel/async_bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export interface AsyncDuckDBBindings {
insertArrowFromIPCStream(conn: number, buffer: Uint8Array, options?: CSVInsertOptions): Promise<void>;
insertCSVFromPath(conn: number, path: string, options: CSVInsertOptions): Promise<void>;
insertJSONFromPath(conn: number, path: string, options: JSONInsertOptions): Promise<void>;

dropFile(name: string):Promise<null>;
dropFiles(names?: string[]):Promise<null>;
}
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/parallel/worker_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
this.sendOK(request);
break;
case WorkerRequestType.DROP_FILES:
this._bindings.dropFiles();
this._bindings.dropFiles(request.data);
this.sendOK(request);
break;
case WorkerRequestType.FLUSH_FILES:
Expand Down Expand Up @@ -361,7 +361,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
break;

case WorkerRequestType.REGISTER_OPFS_FILE_NAME:
this._bindings.registerOPFSFileName(request.data[0]);
await this._bindings.registerOPFSFileName(request.data[0]);
this.sendOK(request);
break;

Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/parallel/worker_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export type WorkerRequestVariant =
| WorkerRequest<WorkerRequestType.CREATE_PREPARED, [ConnectionID, string]>
| WorkerRequest<WorkerRequestType.DISCONNECT, number>
| WorkerRequest<WorkerRequestType.DROP_FILE, string>
| WorkerRequest<WorkerRequestType.DROP_FILES, null>
| WorkerRequest<WorkerRequestType.DROP_FILES, string[] | undefined>
| WorkerRequest<WorkerRequestType.EXPORT_FILE_STATISTICS, string>
| WorkerRequest<WorkerRequestType.FETCH_QUERY_RESULTS, number>
| WorkerRequest<WorkerRequestType.FLUSH_FILES, null>
Expand Down Expand Up @@ -176,7 +176,7 @@ export type WorkerTaskVariant =
| WorkerTask<WorkerRequestType.CREATE_PREPARED, [number, string], number>
| WorkerTask<WorkerRequestType.DISCONNECT, ConnectionID, null>
| WorkerTask<WorkerRequestType.DROP_FILE, string, null>
| WorkerTask<WorkerRequestType.DROP_FILES, null, null>
| WorkerTask<WorkerRequestType.DROP_FILES, string[] | undefined, null>
| WorkerTask<WorkerRequestType.EXPORT_FILE_STATISTICS, string, FileStatistics>
| WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array>
| WorkerTask<WorkerRequestType.FLUSH_FILES, null, null>
Expand Down
10 changes: 10 additions & 0 deletions packages/duckdb-wasm/src/utils/opfs_util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const REGEX_OPFS_FILE = /'(opfs:\/\/\S*?)'/g;
export const REGEX_OPFS_PROTOCOL = /(opfs:\/\/\S*?)/g;

export function isOPFSProtocol(path: string): boolean {
return path.search(REGEX_OPFS_PROTOCOL) > -1;
}

export function searchOPFSFiles(text: string) {
return [...text.matchAll(REGEX_OPFS_FILE)].map(match => match[1]);
}
Loading
Loading