Skip to content

Commit

Permalink
Merge pull request #93 from UnCor3/main
Browse files Browse the repository at this point in the history
  • Loading branch information
james-pre authored Jul 29, 2024
2 parents baf1311 + 8c214dd commit 0f01299
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 17 deletions.
21 changes: 18 additions & 3 deletions src/backends/port/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import { File } from '../../file.js';
import { Async, FileSystem, type FileSystemMetadata } from '../../filesystem.js';
import { Stats, type FileType } from '../../stats.js';
import { InMemory } from '../memory.js';
import type { Backend } from '../backend.js';
import type { Backend, FilesystemOf } from '../backend.js';
import * as RPC from './rpc.js';
import { type MountConfiguration, resolveMountConfig } from '../../config.js';

type FileMethods = Omit<ExtractProperties<File, (...args: any[]) => Promise<any>>, typeof Symbol.asyncDispose>;
type FileMethod = keyof FileMethods;
Expand Down Expand Up @@ -223,9 +224,15 @@ let nextFd = 0;

const descriptors: Map<number, File> = new Map();

type FileOrFSRequest = FSRequest | FileRequest;
/**
* @internal
*/
export type FileOrFSRequest = FSRequest | FileRequest;

async function handleRequest(port: RPC.Port, fs: FileSystem, request: FileOrFSRequest): Promise<void> {
/**
* @internal
*/
export async function handleRequest(port: RPC.Port, fs: FileSystem, request: FileOrFSRequest): Promise<void> {
if (!RPC.isMessage(request)) {
return;
}
Expand Down Expand Up @@ -308,3 +315,11 @@ export const Port = {
return new PortFS(options);
},
} satisfies Backend<PortFS, RPC.Options>;

export async function resolveRemoteMount<T extends Backend>(port: RPC.Port, config: MountConfiguration<T>, _depth = 0): Promise<FilesystemOf<T>> {
const stopAndReplay = RPC.catchMessages(port);
const fs = await resolveMountConfig(config, _depth);
attachFS(port, fs);
stopAndReplay(fs);
return fs;
}
19 changes: 7 additions & 12 deletions src/backends/port/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,29 @@ await configure({
Worker:

```ts
import { InMemory, resolveMountConfig } from '@zenfs/core';
import { attachFS } from '@zenfs/port';
import { InMemory, resolveRemoteMount, attachFS } from '@zenfs/core';
import { parentPort } from 'node:worker_threads';

const tmpfs = await resolveMountConfig({ backend: InMemory, name: 'tmp' });
attachFS(parentPort, tmpfs);
await resolveRemoteMount(parentPort, { backend: InMemory, name: 'tmp' });
```

If you are using using web workers, you would use `self` instead of importing `parentPort` in the worker, and would not need to import `Worker` in the main thread.

#### Using with multiple ports on the same thread

```ts
import { InMemory, fs, resolveMountConfig } from '@zenfs/core';
import { Port, attachFS } from '@zenfs/port';
import { InMemory, fs, resolveMountConfig, resolveRemoteMount, Port } from '@zenfs/core';
import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
const { port1: localPort, port2: remotePort } = new MessageChannel();

const tmpfs = await resolveMountConfig({ backend: InMemory, name: 'tmp' });
attachFS(port2, tmpfs);
fs.mount('/port', await resolveMountConfig({ backend: Port, port: port1 }));
console.log('/port');
fs.mount('/remote', await resolveRemoteMount(remotePort, { backend: InMemory, name: 'tmp' }));
fs.mount('/port', await resolveMountConfig({ backend: Port, port: localPort }));

const content = 'FS is in a port';

await fs.promises.writeFile('/port/test', content);

fs.readFileSync('/tmp/test', 'utf8'); // FS is in a port
fs.readFileSync('/remote/test', 'utf8'); // FS is in a port
await fs.promises.readFile('/port/test', 'utf8'); // FS is in a port
```
17 changes: 16 additions & 1 deletion src/backends/port/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Errno, ErrnoError, type ErrnoErrorJSON } from '../../error.js';
import { PortFile, type PortFS } from './fs.js';
import type { Backend, FilesystemOf } from '../backend.js';
import { handleRequest, PortFile, type PortFS } from './fs.js';
import type { FileOrFSRequest } from './fs.js';

type _MessageEvent<T = any> = T | { data: T };

Expand Down Expand Up @@ -148,3 +150,16 @@ export function detach<T extends Message>(port: Port, handler: (message: T) => u
handler('data' in message ? message.data : message);
});
}

export function catchMessages<T extends Backend>(port: Port): (fs: FilesystemOf<T>) => void {
const events: _MessageEvent[] = [];
const handler = events.push.bind(events);
attach(port, handler);
return function (fs: any) {
detach(port, handler);
for (const event of events) {
const request: FileOrFSRequest = 'data' in event ? event.data : event;
handleRequest(port, fs, request);
}
};
}
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export async function resolveMountConfig<T extends Backend>(config: MountConfigu
return mount;
}

type ConfigMounts = { [K in AbsolutePath]: Backend };
export type ConfigMounts = { [K in AbsolutePath]: Backend };

/**
* Configuration
Expand Down
40 changes: 40 additions & 0 deletions tests/port/config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Worker } from 'node:worker_threads';
import { Port } from '../../src/backends/port/fs.js';
import { configureSingle, fs } from '../../src/index.js';

const dir = dirname(fileURLToPath(import.meta.url));

let port: Worker;

try {
port = new Worker(dir + '/config.worker.js');
} catch (e) {
/* nothing */
}

describe('Remote FS with resolveRemoteMount', () => {
const content = 'FS is in a port';

test('Build exists for worker', () => {
expect(port).toBeDefined();
});

(port ? test : test.skip)('Configuration', async () => {
await configureSingle({ backend: Port, port, timeout: 300 });
});

(port ? test : test.skip)('Write', async () => {
await fs.promises.writeFile('/test', content);
});

(port ? test : test.skip)('Read', async () => {
expect(await fs.promises.readFile('/test', 'utf8')).toBe(content);
});

(port ? test : test.skip)('Cleanup', async () => {
await port.terminate();
port.unref();
});
});
5 changes: 5 additions & 0 deletions tests/port/config.worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { parentPort } from 'node:worker_threads';
import { resolveRemoteMount } from '../../dist/backends/port/fs.js';
import { InMemory } from '../../dist/backends/memory.js';

await resolveRemoteMount(parentPort, { backend: InMemory });

0 comments on commit 0f01299

Please sign in to comment.