Skip to content

Commit

Permalink
stream documentation and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
turbocrime committed Mar 4, 2025
1 parent 0646432 commit e681747
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 98 deletions.
108 changes: 108 additions & 0 deletions packages/transport-chrome/docs/streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Streams

The classes `PortStreamSource` and `PortStreamSink` provide a way to stream jsonifiable values between documents through the chrome runtime.

In [the stream spec's model](https://streams.spec.whatwg.org/#model):

- the message channel is a Push source
- the Producer is the document which names the channel and listens for `onConnect`
- the Consumer is the document which recieves the channel name and calls `connect`

## Stream Lifecycle

The Producer must convey the channel name to the Consumer out-of-band.

All steps must be completely synchronous until an 'async' step, or task ordering may violate expectations.

### Initialization

1. Producer names the channel.
2. Producer attaches an `onConnect` handler for that name.
3. **async then Producer**
4. Producer transmits channel name to Consumer.
5. **async then Consumer**
6. Consumer receives the channel name.
7. **async then Consumer**
8. Consumer calls `connect` and obtains a port
9. Consumer constructs a `new PortStreamSource`
10. the `Source` attaches `onMessage`, `onDisconnect` handlers
11. _timeout clock begins for Source_
12. **async then Consumer**
13. Producer's `onConnect` activates and provides a port
14. Producer constructs a `new PortStreamSink`
15. the Sink attaches an `onDisconnect` handler
16. _timeout clock begins for Sink_

Enter any other phase.

### Streaming

1. Producer writes a chunk to Sink
2. _timeout clock is reset for Sink_
3. Sink posts a `StreamValue` message
4. **async then Consumer**
5. Source receives the `StreamValue` message
6. _timeout clock is reset for Source_
7. Source enqueues the chunk

Enter Streaming again or any Termination phase.

### Termination

#### Success: Producer finished

1. Producer finishes writing, and calls `PortStreamSink.close`.
2. Sink removes its `onDisconnect` handler
3. Sink posts a `StreamEnd` message
4. **async then Consumer**
5. Source receives the `StreamEnd` control
6. Source closes its controller
7. Source disconnects the channel

Complete.

#### Success: Consumer cancel

1. Consumer finishes reading, and calls `PortStreamSource.cancel`
2. Source disconnects the channel

Enter phase Success: Consumer disconnect

#### Success: Consumer disconnect

The consumer cancelled or was destroyed.

1. Producer Sink's `onDisconnect` handler is activated
2. Sink errors its controller with `Code.Canceled`

Complete.

#### Failure: Producer abort

1. Producer calls `PortStreamSink.abort`
2. Sink detaches its `onDisconnect` handler
3. Sink posts a `StreamAbort` message
4. **async then Consumer**
5. Consumer's Source recieves the `StreamAbort` control
6. Source disconnects the channel
7. Source errors its controller with `Code.Aborted`

Complete.

#### Failure: Producer disconnect

1. Consumer Source's `onDisconnect` handler is activated
2. Source errors its controller with `Code.Unavailable`

Complete.

#### Failure: Timeout

Either counterpart may time out.

1. a timeout clock expires
2. the stream controller is errored with `Code.DeadlineExceeded`
3. the channel is disconnected
4. **async then** counterpart
- **Producer** enters Success: Consumer disconnect
- **Consumer** enters Failure: Producer disconnect
3 changes: 2 additions & 1 deletion packages/transport-chrome/src/session-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
} from '@penumbra-zone/transport-dom/messages';
import { ChannelLabel, nameConnection } from './channel-names.js';
import { isTransportInitChannel, TransportInitChannel } from './message.js';
import { PortStreamSink, PortStreamSource } from './stream.js';
import { PortStreamSink } from './stream/sink.js';
import { PortStreamSource } from './stream/source.js';

const localErrorJson = (err: unknown, relevantMessage?: unknown) =>
err instanceof Error
Expand Down
3 changes: 2 additions & 1 deletion packages/transport-chrome/src/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { ConnectError } from '@connectrpc/connect';
import { errorToJson } from '@connectrpc/connect/protocol-connect';
import { ChannelLabel, nameConnection, parseConnectionName } from './channel-names.js';
import { isTransportInitChannel, TransportInitChannel } from './message.js';
import { PortStreamSink, PortStreamSource } from './stream.js';
import { PortStreamSink } from './stream/sink.js';
import { PortStreamSource } from './stream/source.js';
import { ChannelHandlerFn } from '@penumbra-zone/transport-dom/adapter';
import {
isTransportAbort,
Expand Down
95 changes: 0 additions & 95 deletions packages/transport-chrome/src/stream.ts

This file was deleted.

24 changes: 24 additions & 0 deletions packages/transport-chrome/src/stream/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { JsonValue } from '@bufbuild/protobuf';

// control message types below

export interface StreamValue {
value: JsonValue;
}

export interface StreamEnd {
done: true;
}

export interface StreamAbort {
abort: JsonValue;
}

export const isStreamValue = (s: unknown): s is StreamValue =>
s != null && typeof s === 'object' && 'value' in s;

export const isStreamEnd = (s: unknown): s is StreamEnd =>
s != null && typeof s === 'object' && 'done' in s && s.done === true;

export const isStreamAbort = (s: unknown): s is StreamAbort =>
s != null && typeof s === 'object' && 'abort' in s;
76 changes: 76 additions & 0 deletions packages/transport-chrome/src/stream/sink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import type { JsonValue } from '@bufbuild/protobuf';
import { ConnectError, Code } from '@connectrpc/connect';
import { errorToJson } from '@connectrpc/connect/protocol-connect';
import { shouldDisconnect } from '../util/should-disconnect.js';
import { StreamValue, StreamEnd, StreamAbort } from './message.js';

/**
* Implements an UnderlyingSink that encapsulates an outgoing stream within a
* Chrome extension messaging channel.
*
* In the stream spec's model, this is used by the Producer that writes stream
* data to a Consumer. It handles the conversion of standard stream write
* operations into extension message activity.
*/
export class PortStreamSink implements UnderlyingSink<JsonValue> {
/** Abort controller for port disconnect */
private ac = new AbortController();

/** Typed method to post valued chunks through the port. */
private postChunk: (item: StreamValue) => void;

/** Typed method to post terminal chunks through the port. */
private postFinal: (item: StreamEnd | StreamAbort) => Promise<void>;

constructor(outgoing: chrome.runtime.Port) {
this.postChunk = item => outgoing.postMessage(item);

this.postFinal = async item => {
// disconnect is no longer a cancellation
outgoing.onDisconnect.removeListener(this.onDisconnect);
// set up expectation that the counterpart will disconnect
const termination = shouldDisconnect(outgoing);
// post the final message
outgoing.postMessage(item);
await termination.finally(
// ensure port disconnected
() => outgoing.disconnect(),
);

outgoing.onDisconnect.addListener(this.onDisconnect);
};

this.ac.signal.addEventListener('abort', () => {
if (globalThis.__DEV__) {
console.debug('PortStreamSink signal', this.ac.signal.reason);
}
outgoing.disconnect();
});
}

private onDisconnect = () => this.ac.abort(ConnectError.from('Sink disconnected', Code.Canceled));

/** This is part of UnderlyingSink. */
start(cont: WritableStreamDefaultController) {
this.ac.signal.throwIfAborted();
this.ac.signal.addEventListener('abort', () => cont.error(this.ac.signal.reason));
}

/** This is part of UnderlyingSink. */
write(value: JsonValue) {
this.ac.signal.throwIfAborted();
this.postChunk({ value });
}

/** This is part of UnderlyingSink. */
async close() {
await this.postFinal({ done: true });
}

/** This is part of UnderlyingSink. */
async abort(reason?: unknown) {
await this.postFinal({
abort: errorToJson(ConnectError.from(reason), undefined),
});
}
}
Loading

0 comments on commit e681747

Please sign in to comment.