Skip to content

Commit

Permalink
Subscribe on each borrow
Browse files Browse the repository at this point in the history
  • Loading branch information
ntkme committed Aug 20, 2024
1 parent 924c916 commit ea383c7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 48 deletions.
60 changes: 26 additions & 34 deletions lib/src/embedded/isolate_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,35 @@ class IsolateDispatcher {
} else {
var future = ReusableIsolate.spawn(_isolateMain);
isolate = await future;
isolate.receivePort.listen((message) {
if (!isolate.borrowed) {
throw StateError(
"Shouldn't receive a message before being borrowed.");
}

var fullBuffer = message as Uint8List;

// The first byte of messages from isolates indicates whether the entire
// compilation is finished (1) or if it encountered an error (2). Sending
// this as part of the message buffer rather than a separate message
// avoids a race condition where the host might send a new compilation
// request with the same ID as one that just finished before the
// [IsolateDispatcher] receives word that the isolate with that ID is
// done. See sass/dart-sass#2004.
var category = fullBuffer[0];
var packet = Uint8List.sublistView(fullBuffer, 1);

switch (category) {
case 0:
_channel.sink.add(packet);
case 1:
_activeIsolates.remove(compilationId);
_inactiveIsolates.add(isolate);
_channel.sink.add(packet);
isolate.release();
case 2:
_channel.sink.add(packet);
exit(exitCode);
}
}, onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
});
_allIsolates.add(isolate);
}

isolate.borrow(resource);
isolate.borrow(resource, (message) {
var fullBuffer = message as Uint8List;

// The first byte of messages from isolates indicates whether the entire
// compilation is finished (1) or if it encountered an error (2). Sending
// this as part of the message buffer rather than a separate message
// avoids a race condition where the host might send a new compilation
// request with the same ID as one that just finished before the
// [IsolateDispatcher] receives word that the isolate with that ID is
// done. See sass/dart-sass#2004.
var category = fullBuffer[0];
var packet = Uint8List.sublistView(fullBuffer, 1);

switch (category) {
case 0:
_channel.sink.add(packet);
case 1:
_activeIsolates.remove(compilationId);
_inactiveIsolates.add(isolate);
_channel.sink.add(packet);
isolate.release();
case 2:
_channel.sink.add(packet);
exit(exitCode);
}
});

return isolate;
}
Expand Down
36 changes: 22 additions & 14 deletions lib/src/embedded/reusable_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ class ReusableIsolate {

/// The [ReceivePort] that receives messages from the wrapped isolate.
final ReceivePort _receivePort;
ReceivePort get receivePort => _receivePort;

/// The subscription to [_port].
final StreamSubscription<dynamic> _subscription;

/// The [PoolResource] used to track whether this isolate is being used.
PoolResource? _resource;

ReusableIsolate._(this._isolate, this._mailbox, this._receivePort);
ReusableIsolate._(this._isolate, this._mailbox, this._receivePort)
: _subscription = _receivePort.listen(_defaultOnData);

/// Spawns a [ReusableIsolate] that runs the given [entryPoint].
static Future<ReusableIsolate> spawn(
Expand All @@ -48,33 +51,32 @@ class ReusableIsolate {
return ReusableIsolate._(isolate, mailbox, receivePort);
}

/// Whether this isolate is in use
bool get borrowed => _resource != null;

/// Request this isolate as part of a pool and mark it as in use.
void borrow(PoolResource resource) {
if (borrowed) {
/// Request this isolate as part of a pool and subscribe to the [ReceivePort].
void borrow(PoolResource resource, void onData(dynamic event)?) {
if (_resource != null) {
throw StateError('ReusableIsolate has already been borrowed.');
}
_resource = resource;
_subscription.onData(onData);
}

/// Release this isolate from the pool.
/// Release this isolate from the pool and unsubscribe from the [ReceivePort]..
void release() {
if (!borrowed) {
if (_resource == null) {
throw StateError('ReusableIsolate has not been borrowed.');
}
_subscription.onData(_defaultOnData);
_resource!.release();
_resource = null;
}

/// Sends [message] to the isolate.
///
/// Throws a [StateError] if this is called while the isolate isn't checked
/// out, or if a second message is sent before the isolate has processed the
/// first one.
/// Throws a [StateError] if this is called while the isolate isn't borrowed,
/// or if a second message is sent before the isolate has processed the first
/// one.
void send(Uint8List message) {
if (!borrowed) {
if (_resource == null) {
throw StateError('Cannot send a message before being borrowed');
}
_mailbox.put(message);
Expand All @@ -90,6 +92,12 @@ class ReusableIsolate {
}
}

/// The default handler for data events from the wrapped isolate when it's not
/// borrowed.
void _defaultOnData(dynamic _) {
throw StateError("Shouldn't receive a message before being borrowed.");
}

void _isolateMain(
(ReusableIsolateEntryPoint, Sendable<Mailbox>, SendPort) message) {
var (entryPoint, sendableMailbox, sendPort) = message;
Expand Down

0 comments on commit ea383c7

Please sign in to comment.