Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Port message loss before attachFS is called #93

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/backends/port/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import { File } from '../../file.js';
import { Async, FileSystem, type FileSystemMetadata } from '../../filesystem.js';
import { Stats, type FileType } from '../../stats.js';
import { InMemory } from '../memory.js';
import type { Backend } from '../backend.js';
import type { Backend, FilesystemOf } from '../backend.js';
import * as RPC from './rpc.js';
import { type MountConfiguration, resolveMountConfig } from '../../config.js';

type FileMethods = Omit<ExtractProperties<File, (...args: any[]) => Promise<any>>, typeof Symbol.asyncDispose>;
type FileMethod = keyof FileMethods;
Expand Down Expand Up @@ -223,9 +224,15 @@ let nextFd = 0;

const descriptors: Map<number, File> = new Map();

type FileOrFSRequest = FSRequest | FileRequest;
/**
* @internal
*/
export type FileOrFSRequest = FSRequest | FileRequest;
james-pre marked this conversation as resolved.
Show resolved Hide resolved

async function handleRequest(port: RPC.Port, fs: FileSystem, request: FileOrFSRequest): Promise<void> {
/**
* @internal
*/
export async function handleRequest(port: RPC.Port, fs: FileSystem, request: FileOrFSRequest): Promise<void> {
james-pre marked this conversation as resolved.
Show resolved Hide resolved
if (!RPC.isMessage(request)) {
return;
}
Expand Down Expand Up @@ -308,3 +315,11 @@ export const Port = {
return new PortFS(options);
},
} satisfies Backend<PortFS, RPC.Options>;

export async function resolveRemoteMount<T extends Backend>(port: RPC.Port, config: MountConfiguration<T>, _depth = 0): Promise<FilesystemOf<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_depth is an argument used internally by resolveMountConfig to track recursive calls and avoid infinite loops. I don't think it should be used here, though it could be helpful for debugging.

const stopAndReplay = RPC.catchMessages(port);
const fs = await resolveMountConfig(config, _depth);
attachFS(port, fs);
stopAndReplay(fs);
return fs;
}
19 changes: 7 additions & 12 deletions src/backends/port/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,29 @@ await configure({
Worker:

```ts
import { InMemory, resolveMountConfig } from '@zenfs/core';
import { attachFS } from '@zenfs/port';
import { InMemory, resolveRemoteMount, attachFS } from '@zenfs/core';
import { parentPort } from 'node:worker_threads';

const tmpfs = await resolveMountConfig({ backend: InMemory, name: 'tmp' });
attachFS(parentPort, tmpfs);
await resolveRemoteMount(parentPort, { backend: InMemory, name: 'tmp' });
```

If you are using using web workers, you would use `self` instead of importing `parentPort` in the worker, and would not need to import `Worker` in the main thread.

#### Using with multiple ports on the same thread

```ts
import { InMemory, fs, resolveMountConfig } from '@zenfs/core';
import { Port, attachFS } from '@zenfs/port';
import { InMemory, fs, resolveMountConfig, resolveRemoteMount, Port } from '@zenfs/core';
import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
const { port1: localPort, port2: remotePort } = new MessageChannel();

const tmpfs = await resolveMountConfig({ backend: InMemory, name: 'tmp' });
attachFS(port2, tmpfs);
fs.mount('/port', await resolveMountConfig({ backend: Port, port: port1 }));
console.log('/port');
fs.mount('/remote', await resolveRemoteMount(remotePort, { backend: InMemory, name: 'tmp' }));
fs.mount('/port', await resolveMountConfig({ backend: Port, port: localPort }));

const content = 'FS is in a port';

await fs.promises.writeFile('/port/test', content);

fs.readFileSync('/tmp/test', 'utf8'); // FS is in a port
fs.readFileSync('/remote/test', 'utf8'); // FS is in a port
await fs.promises.readFile('/port/test', 'utf8'); // FS is in a port
```
17 changes: 16 additions & 1 deletion src/backends/port/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Errno, ErrnoError, type ErrnoErrorJSON } from '../../error.js';
import { PortFile, type PortFS } from './fs.js';
import type { Backend, FilesystemOf } from '../backend.js';
import { handleRequest, PortFile, type PortFS } from './fs.js';
import type { FileOrFSRequest } from './fs.js';

type _MessageEvent<T = any> = T | { data: T };

Expand Down Expand Up @@ -148,3 +150,16 @@ export function detach<T extends Message>(port: Port, handler: (message: T) => u
handler('data' in message ? message.data : message);
});
}

export function catchMessages<T extends Backend>(port: Port): (fs: FilesystemOf<T>) => void {
const events: _MessageEvent[] = [];
const handler = events.push.bind(events);
attach(port, handler);
return function (fs: any) {
detach(port, handler);
for (const event of events) {
const request: FileOrFSRequest = 'data' in event ? event.data : event;
handleRequest(port, fs, request);
}
};
}
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export async function resolveMountConfig<T extends Backend>(config: MountConfigu
return mount;
}

type ConfigMounts = { [K in AbsolutePath]: Backend };
export type ConfigMounts = { [K in AbsolutePath]: Backend };

/**
* Configuration
Expand Down
40 changes: 40 additions & 0 deletions tests/port/config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Worker } from 'node:worker_threads';
import { Port } from '../../src/backends/port/fs.js';
import { configureSingle, fs } from '../../src/index.js';

const dir = dirname(fileURLToPath(import.meta.url));

let port: Worker;

try {
port = new Worker(dir + '/config.worker.js');
} catch (e) {
/* nothing */
}

describe('Remote FS with resolveRemoteMount', () => {
const content = 'FS is in a port';

test('Build exists for worker', () => {
expect(port).toBeDefined();
});

(port ? test : test.skip)('Configuration', async () => {
await configureSingle({ backend: Port, port, timeout: 300 });
});

(port ? test : test.skip)('Write', async () => {
await fs.promises.writeFile('/test', content);
});

(port ? test : test.skip)('Read', async () => {
expect(await fs.promises.readFile('/test', 'utf8')).toBe(content);
});

(port ? test : test.skip)('Cleanup', async () => {
await port.terminate();
port.unref();
});
});
5 changes: 5 additions & 0 deletions tests/port/config.worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { parentPort } from 'node:worker_threads';
import { resolveRemoteMount } from '../../dist/backends/port/fs.js';
import { InMemory } from '../../dist/backends/memory.js';

await resolveRemoteMount(parentPort, { backend: InMemory });
Loading