From 558780f6b02f5fa6342cc8161637f021dd1014f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Fri, 16 Aug 2024 23:04:01 -0700 Subject: [PATCH 1/8] Exit isolate on ProtocolError --- lib/src/embedded/compilation_dispatcher.dart | 48 +++++++------------- lib/src/embedded/host_callable.dart | 1 - lib/src/embedded/isolate_dispatcher.dart | 12 ++++- lib/src/embedded/reusable_isolate.dart | 5 +- 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index 8f69b2553..aa51a90f1 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -51,24 +51,18 @@ final class CompilationDispatcher { /// This is used in outgoing messages. late Uint8List _compilationIdVarint; - /// Whether we detected a [ProtocolError] while parsing an incoming response. - /// - /// If we have, we don't want to send the final compilation result because - /// it'll just be a wrapper around the error. - var _requestError = false; - /// Creates a [CompilationDispatcher] that receives encoded protocol buffers /// through [_mailbox] and sends them through [_sendPort]. CompilationDispatcher(this._mailbox, this._sendPort); /// Listens for incoming `CompileRequests` and runs their compilations. void listen() { - do { + while (true) { Uint8List packet; try { packet = _mailbox.take(); } on StateError catch (_) { - break; + Isolate.exit(); } try { @@ -88,9 +82,7 @@ final class CompilationDispatcher { case InboundMessage_Message.compileRequest: var request = message.compileRequest; var response = _compile(request); - if (!_requestError) { - _send(OutboundMessage()..compileResponse = response); - } + _send(OutboundMessage()..compileResponse = response); case InboundMessage_Message.versionRequest: throw paramsError("VersionRequest must have compilation ID 0."); @@ -113,7 +105,7 @@ final class CompilationDispatcher { } catch (error, stackTrace) { _handleError(error, stackTrace); } - } while (!_requestError); + } } OutboundMessage_CompileResponse _compile( @@ -287,20 +279,14 @@ final class CompilationDispatcher { void sendLog(OutboundMessage_LogEvent event) => _send(OutboundMessage()..logEvent = event); - /// Sends [error] to the host. + /// Sends [error] to the host and exit. /// /// This is used during compilation by other classes like host callable. - /// Therefore it must set _requestError = true to prevent sending a CompileFailure after - /// sending a ProtocolError. - void sendError(ProtocolError error) { - _sendError(error); - _requestError = true; + Never sendError(ProtocolError error) { + Isolate.exit( + _sendPort, _serializePacket((OutboundMessage()..error = error))); } - /// Sends [error] to the host. - void _sendError(ProtocolError error) => - _send(OutboundMessage()..error = error); - InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( OutboundMessage_CanonicalizeRequest request) => _sendRequest( @@ -330,10 +316,7 @@ final class CompilationDispatcher { try { packet = _mailbox.take(); } on StateError catch (_) { - // Compiler is shutting down, throw without calling `_handleError` as we - // don't want to report this as an actual error. - _requestError = true; - rethrow; + Isolate.exit(); } try { @@ -376,8 +359,6 @@ final class CompilationDispatcher { return response; } catch (error, stackTrace) { _handleError(error, stackTrace); - _requestError = true; - rethrow; } } @@ -385,12 +366,17 @@ final class CompilationDispatcher { /// /// The [messageId] indicate the IDs of the message being responded to, if /// available. - void _handleError(Object error, StackTrace stackTrace, {int? messageId}) { - _sendError(handleError(error, stackTrace, messageId: messageId)); + Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) { + sendError(handleError(error, stackTrace, messageId: messageId)); } /// Sends [message] to the host with the given [wireId]. void _send(OutboundMessage message) { + _sendPort.send(_serializePacket(message)); + } + + /// Serialize [message] to [Uint8List]. + Uint8List _serializePacket(OutboundMessage message) { var protobufWriter = CodedBufferWriter(); message.writeToCodedBufferWriter(protobufWriter); @@ -407,6 +393,6 @@ final class CompilationDispatcher { }; packet.setAll(1, _compilationIdVarint); protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); - _sendPort.send(packet); + return packet; } } diff --git a/lib/src/embedded/host_callable.dart b/lib/src/embedded/host_callable.dart index 06c3acb66..95e15221a 100644 --- a/lib/src/embedded/host_callable.dart +++ b/lib/src/embedded/host_callable.dart @@ -53,7 +53,6 @@ Callable hostCallable( } } on ProtocolError catch (error, stackTrace) { dispatcher.sendError(handleError(error, stackTrace)); - throw error.message; } }); return callable; diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 1fd9877d9..86726f740 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -27,7 +27,7 @@ class IsolateDispatcher { /// All isolates that have been spawned to dispatch to. /// /// Only used for cleaning up the process when the underlying channel closes. - final _allIsolates = StreamController(); + final _allIsolates = StreamController(sync: true); /// The isolates that aren't currently running compilations final _inactiveIsolates = {}; @@ -43,6 +43,9 @@ class IsolateDispatcher { /// See https://github.com/sass/dart-sass/pull/2019 final _isolatePool = Pool(sizeOf() <= 4 ? 7 : 15); + /// Whether the stdin has been closed or not. + bool _closed = false; + IsolateDispatcher(this._channel); void listen() { @@ -56,6 +59,12 @@ class IsolateDispatcher { if (compilationId != 0) { var isolate = await _activeIsolates.putIfAbsent( compilationId, () => _getIsolate(compilationId!)); + + // The shutdown may have started by the time the isolate is spawned + if (_closed) { + return; + } + try { isolate.send(packet); return; @@ -88,6 +97,7 @@ class IsolateDispatcher { }, onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); }, onDone: () { + _closed = true; _allIsolates.stream.listen((isolate) => isolate.kill()); }); } diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index 5cdd6bbd8..d3743fc4b 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -118,12 +118,11 @@ class ReusableIsolate { /// Shuts down the isolate. void kill() { - _isolate.kill(); - _receivePort.close(); - // If the isolate is blocking on [Mailbox.take], it won't even process a // kill event, so we closed the mailbox to nofity and wake it up. _mailbox.close(); + _isolate.kill(priority: Isolate.immediate); + _receivePort.close(); } } From 0fb5183a43150ff8660be3ea19b34bd349fe7b29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Sun, 18 Aug 2024 11:06:06 -0700 Subject: [PATCH 2/8] Remove intermediate StreamController --- lib/src/embedded/isolate_dispatcher.dart | 50 +++++++++----- lib/src/embedded/reusable_isolate.dart | 84 +++++------------------- 2 files changed, 52 insertions(+), 82 deletions(-) diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 86726f740..f0d508060 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -115,25 +115,43 @@ class IsolateDispatcher { } else { var future = ReusableIsolate.spawn(_isolateMain); isolate = await future; + isolate.receivePort.listen((message) { + assert(isolate.borrowed, + "Shouldn't receive a message before being borrowed."); + + var fullBuffer = message as Uint8List; + + // The first byte of messages from isolates indicates whether the entire + // compilation is finished (1) or if it encountered an error (2). Sending + // this as part of the message buffer rather than a separate message + // avoids a race condition where the host might send a new compilation + // request with the same ID as one that just finished before the + // [IsolateDispatcher] receives word that the isolate with that ID is + // done. See sass/dart-sass#2004. + var category = fullBuffer[0]; + var packet = Uint8List.sublistView(fullBuffer, 1); + + switch (category) { + case 0: + _channel.sink.add(packet); + case 1: + _activeIsolates.remove(compilationId); + _inactiveIsolates.add(isolate); + _channel.sink.add(packet); + isolate.release(); + case 2: + _activeIsolates.remove(compilationId); + _channel.sink.add(packet); + _channel.sink.close(); + isolate.release(); + } + }, onError: (Object error, StackTrace stackTrace) { + _handleError(error, stackTrace); + }); _allIsolates.add(isolate); } - isolate.checkOut().listen(_channel.sink.add, - onError: (Object error, StackTrace stackTrace) { - if (error is ProtocolError) { - // Protocol errors have already been through [_handleError] in the child - // isolate, so we just send them as-is and close out the underlying - // channel. - sendError(compilationId, error); - _channel.sink.close(); - } else { - _handleError(error, stackTrace); - } - }, onDone: () { - _activeIsolates.remove(compilationId); - _inactiveIsolates.add(isolate); - resource.release(); - }); + isolate.borrow(resource); return isolate; } diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index d3743fc4b..b15cea800 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -8,8 +8,7 @@ import 'dart:typed_data'; import 'package:native_synchronization/mailbox.dart'; import 'package:native_synchronization/sendable.dart'; -import 'embedded_sass.pb.dart'; -import 'utils.dart'; +import 'package:pool/pool.dart'; /// The entrypoint for a [ReusableIsolate]. /// @@ -32,16 +31,12 @@ class ReusableIsolate { /// The [ReceivePort] that receives messages from the wrapped isolate. final ReceivePort _receivePort; + ReceivePort get receivePort => _receivePort; - /// The subscription to [_port]. - final StreamSubscription _subscription; + /// The [PoolResource] used to track whether this isolate is being used. + PoolResource? _resource; - /// Whether [checkOut] has been called and the returned stream has not yet - /// closed. - bool _checkedOut = false; - - ReusableIsolate._(this._isolate, this._mailbox, this._receivePort) - : _subscription = _receivePort.listen(_defaultOnData); + ReusableIsolate._(this._isolate, this._mailbox, this._receivePort); /// Spawns a [ReusableIsolate] that runs the given [entryPoint]. static Future spawn( @@ -53,58 +48,20 @@ class ReusableIsolate { return ReusableIsolate._(isolate, mailbox, receivePort); } - /// Checks out this isolate and returns a stream of messages from it. - /// - /// This isolate is considered "checked out" until the returned stream - /// completes. While checked out, messages may be sent to the isolate using - /// [send]. - /// - /// Throws a [StateError] if this is called while the isolate is already - /// checked out. - Stream checkOut() { - if (_checkedOut) { - throw StateError( - "Can't call ResuableIsolate.checkOut until the previous stream has " - "completed."); - } - _checkedOut = true; - - var controller = StreamController(sync: true); - - _subscription.onData((message) { - var fullBuffer = message as Uint8List; - - // The first byte of messages from isolates indicates whether the entire - // compilation is finished (1) or if it encountered an error (2). Sending - // this as part of the message buffer rather than a separate message - // avoids a race condition where the host might send a new compilation - // request with the same ID as one that just finished before the - // [IsolateDispatcher] receives word that the isolate with that ID is - // done. See sass/dart-sass#2004. - var category = fullBuffer[0]; - var packet = Uint8List.sublistView(fullBuffer, 1); + /// Whether this isolate is in use + bool get borrowed => _resource != null; - if (category == 2) { - // Parse out the compilation ID and surface the [ProtocolError] as an - // error. This allows the [IsolateDispatcher] to notice that an error - // has occurred and close out the underlying channel. - var (_, buffer) = parsePacket(packet); - controller.addError(OutboundMessage.fromBuffer(buffer).error); - return; - } - - controller.sink.add(packet); - if (category == 1) { - _checkedOut = false; - _subscription.onData(_defaultOnData); - _subscription.onError(null); - controller.close(); - } - }); - - _subscription.onError(controller.addError); + /// Request this isolate as part of a pool and mark it as in use. + void borrow(PoolResource resource) { + assert(!borrowed, 'ReusableIsolate has already been borrowed.'); + _resource = resource; + } - return controller.stream; + /// Release this isolate from the pool. + void release() { + assert(borrowed, 'ReusableIsolate has not been borrowed.'); + _resource!.release(); + _resource = null; } /// Sends [message] to the isolate. @@ -113,6 +70,7 @@ class ReusableIsolate { /// out, or if a second message is sent before the isolate has processed the /// first one. void send(Uint8List message) { + assert(borrowed, 'Cannot send a message before being borrowed'); _mailbox.put(message); } @@ -126,12 +84,6 @@ class ReusableIsolate { } } -/// The default handler for data events from the wrapped isolate when it's not -/// checked out. -void _defaultOnData(Object? _) { - throw StateError("Shouldn't receive a message before being checked out."); -} - void _isolateMain( (ReusableIsolateEntryPoint, Sendable, SendPort) message) { var (entryPoint, sendableMailbox, sendPort) = message; From a044ace34035aee2f72aeaf577b98329f090aba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Sun, 18 Aug 2024 22:21:17 -0700 Subject: [PATCH 3/8] Aggressively shutdown on ProtocolError --- lib/src/embedded/isolate_dispatcher.dart | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index f0d508060..d9e046c16 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -4,6 +4,7 @@ import 'dart:async'; import 'dart:ffi'; +import 'dart:io'; import 'dart:isolate'; import 'dart:typed_data'; @@ -140,10 +141,8 @@ class IsolateDispatcher { _channel.sink.add(packet); isolate.release(); case 2: - _activeIsolates.remove(compilationId); _channel.sink.add(packet); - _channel.sink.close(); - isolate.release(); + exit(exitCode); } }, onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); From c3a937e90166838cb9917af606413e9430508cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 19 Aug 2024 14:18:45 -0700 Subject: [PATCH 4/8] Apply suggestions from code review Co-authored-by: Natalie Weizenbaum --- lib/src/embedded/compilation_dispatcher.dart | 3 +-- lib/src/embedded/isolate_dispatcher.dart | 8 +++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index aa51a90f1..6506bfa94 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -283,8 +283,7 @@ final class CompilationDispatcher { /// /// This is used during compilation by other classes like host callable. Never sendError(ProtocolError error) { - Isolate.exit( - _sendPort, _serializePacket((OutboundMessage()..error = error))); + Isolate.exit(_sendPort, _serializePacket(OutboundMessage()..error = error)); } InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index d9e046c16..927c99dc5 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -44,8 +44,8 @@ class IsolateDispatcher { /// See https://github.com/sass/dart-sass/pull/2019 final _isolatePool = Pool(sizeOf() <= 4 ? 7 : 15); - /// Whether the stdin has been closed or not. - bool _closed = false; + /// Whether [_channel] has been closed or not. + var _closed = false; IsolateDispatcher(this._channel); @@ -62,9 +62,7 @@ class IsolateDispatcher { compilationId, () => _getIsolate(compilationId!)); // The shutdown may have started by the time the isolate is spawned - if (_closed) { - return; - } + if (_closed) return; try { isolate.send(packet); From 71342048deed447dd38788973623ff7dca0b42fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 19 Aug 2024 14:27:54 -0700 Subject: [PATCH 5/8] Extract a common _receive() method and add comment for Isolate.exit() --- lib/src/embedded/compilation_dispatcher.dart | 28 +++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index 6506bfa94..da149e04d 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -58,15 +58,8 @@ final class CompilationDispatcher { /// Listens for incoming `CompileRequests` and runs their compilations. void listen() { while (true) { - Uint8List packet; try { - packet = _mailbox.take(); - } on StateError catch (_) { - Isolate.exit(); - } - - try { - var (compilationId, messageBuffer) = parsePacket(packet); + var (compilationId, messageBuffer) = parsePacket(_receive()); _compilationId = compilationId; _compilationIdVarint = serializeVarint(compilationId); @@ -311,16 +304,9 @@ final class CompilationDispatcher { message.id = _outboundRequestId; _send(message); - Uint8List packet; - try { - packet = _mailbox.take(); - } on StateError catch (_) { - Isolate.exit(); - } - try { var messageBuffer = - Uint8List.sublistView(packet, _compilationIdVarint.length); + Uint8List.sublistView(_receive(), _compilationIdVarint.length); InboundMessage message; try { @@ -394,4 +380,14 @@ final class CompilationDispatcher { protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); return packet; } + + /// Receive a packet from the host. + Uint8List _receive() { + try { + return _mailbox.take(); + } on StateError catch (_) { + // [_mailbox] has been closed, exit the current isolate + Isolate.exit(); + } + } } From e6239add9155a18cde52c70ff34d942d7171b460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 19 Aug 2024 14:37:38 -0700 Subject: [PATCH 6/8] Convert assertions to throw StateError --- lib/src/embedded/isolate_dispatcher.dart | 6 ++++-- lib/src/embedded/reusable_isolate.dart | 12 +++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 927c99dc5..eb89630bd 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -115,8 +115,10 @@ class IsolateDispatcher { var future = ReusableIsolate.spawn(_isolateMain); isolate = await future; isolate.receivePort.listen((message) { - assert(isolate.borrowed, - "Shouldn't receive a message before being borrowed."); + if (!isolate.borrowed) { + throw StateError( + "Shouldn't receive a message before being borrowed."); + } var fullBuffer = message as Uint8List; diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index b15cea800..1d824f956 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -53,13 +53,17 @@ class ReusableIsolate { /// Request this isolate as part of a pool and mark it as in use. void borrow(PoolResource resource) { - assert(!borrowed, 'ReusableIsolate has already been borrowed.'); + if (borrowed) { + throw StateError('ReusableIsolate has already been borrowed.'); + } _resource = resource; } /// Release this isolate from the pool. void release() { - assert(borrowed, 'ReusableIsolate has not been borrowed.'); + if (!borrowed) { + throw StateError('ReusableIsolate has not been borrowed.'); + } _resource!.release(); _resource = null; } @@ -70,7 +74,9 @@ class ReusableIsolate { /// out, or if a second message is sent before the isolate has processed the /// first one. void send(Uint8List message) { - assert(borrowed, 'Cannot send a message before being borrowed'); + if (!borrowed) { + throw StateError('Cannot send a message before being borrowed'); + } _mailbox.put(message); } From 31cd86f3f334faa730dbf38035f8e0b8320fecfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 19 Aug 2024 16:09:10 -0700 Subject: [PATCH 7/8] Update comment --- lib/src/embedded/compilation_dispatcher.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index da149e04d..a8c84c8eb 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -386,7 +386,8 @@ final class CompilationDispatcher { try { return _mailbox.take(); } on StateError catch (_) { - // [_mailbox] has been closed, exit the current isolate + // The [_mailbox] has been closed, exit the current isolate immediately + // to avoid bubble the error up as [SassException] during [_sendRequest]. Isolate.exit(); } } From f8c2ffdb72474dce7a6142a2a3cb34a624bd1c80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 19 Aug 2024 18:36:32 -0700 Subject: [PATCH 8/8] Subscribe on each borrow --- lib/src/embedded/isolate_dispatcher.dart | 64 +++++++++++------------- lib/src/embedded/reusable_isolate.dart | 55 +++++++++++--------- 2 files changed, 61 insertions(+), 58 deletions(-) diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index eb89630bd..fe79f034c 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -112,45 +112,41 @@ class IsolateDispatcher { isolate = _inactiveIsolates.first; _inactiveIsolates.remove(isolate); } else { - var future = ReusableIsolate.spawn(_isolateMain); - isolate = await future; - isolate.receivePort.listen((message) { - if (!isolate.borrowed) { - throw StateError( - "Shouldn't receive a message before being borrowed."); - } - - var fullBuffer = message as Uint8List; - - // The first byte of messages from isolates indicates whether the entire - // compilation is finished (1) or if it encountered an error (2). Sending - // this as part of the message buffer rather than a separate message - // avoids a race condition where the host might send a new compilation - // request with the same ID as one that just finished before the - // [IsolateDispatcher] receives word that the isolate with that ID is - // done. See sass/dart-sass#2004. - var category = fullBuffer[0]; - var packet = Uint8List.sublistView(fullBuffer, 1); - - switch (category) { - case 0: - _channel.sink.add(packet); - case 1: - _activeIsolates.remove(compilationId); - _inactiveIsolates.add(isolate); - _channel.sink.add(packet); - isolate.release(); - case 2: - _channel.sink.add(packet); - exit(exitCode); - } - }, onError: (Object error, StackTrace stackTrace) { + var future = ReusableIsolate.spawn(_isolateMain, + onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); }); + isolate = await future; _allIsolates.add(isolate); } - isolate.borrow(resource); + isolate.borrow((message) { + var fullBuffer = message as Uint8List; + + // The first byte of messages from isolates indicates whether the entire + // compilation is finished (1) or if it encountered an error (2). Sending + // this as part of the message buffer rather than a separate message + // avoids a race condition where the host might send a new compilation + // request with the same ID as one that just finished before the + // [IsolateDispatcher] receives word that the isolate with that ID is + // done. See sass/dart-sass#2004. + var category = fullBuffer[0]; + var packet = Uint8List.sublistView(fullBuffer, 1); + + switch (category) { + case 0: + _channel.sink.add(packet); + case 1: + _activeIsolates.remove(compilationId); + isolate.release(); + _inactiveIsolates.add(isolate); + resource.release(); + _channel.sink.add(packet); + case 2: + _channel.sink.add(packet); + exit(exitCode); + } + }); return isolate; } diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index 1d824f956..3e15bf978 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -8,7 +8,6 @@ import 'dart:typed_data'; import 'package:native_synchronization/mailbox.dart'; import 'package:native_synchronization/sendable.dart'; -import 'package:pool/pool.dart'; /// The entrypoint for a [ReusableIsolate]. /// @@ -31,51 +30,53 @@ class ReusableIsolate { /// The [ReceivePort] that receives messages from the wrapped isolate. final ReceivePort _receivePort; - ReceivePort get receivePort => _receivePort; - /// The [PoolResource] used to track whether this isolate is being used. - PoolResource? _resource; + /// The subscription to [_receivePort]. + final StreamSubscription _subscription; - ReusableIsolate._(this._isolate, this._mailbox, this._receivePort); + /// Whether the current isolate has been borrowed. + bool _borrowed = false; + + ReusableIsolate._(this._isolate, this._mailbox, this._receivePort, + {Function? onError}) + : _subscription = _receivePort.listen(_defaultOnData, onError: onError); /// Spawns a [ReusableIsolate] that runs the given [entryPoint]. - static Future spawn( - ReusableIsolateEntryPoint entryPoint) async { + static Future spawn(ReusableIsolateEntryPoint entryPoint, + {Function? onError}) async { var mailbox = Mailbox(); var receivePort = ReceivePort(); var isolate = await Isolate.spawn( _isolateMain, (entryPoint, mailbox.asSendable, receivePort.sendPort)); - return ReusableIsolate._(isolate, mailbox, receivePort); + return ReusableIsolate._(isolate, mailbox, receivePort, onError: onError); } - /// Whether this isolate is in use - bool get borrowed => _resource != null; - - /// Request this isolate as part of a pool and mark it as in use. - void borrow(PoolResource resource) { - if (borrowed) { + /// Subscribe to messages from [_receivePort]. + void borrow(void onData(dynamic event)?) { + if (_borrowed) { throw StateError('ReusableIsolate has already been borrowed.'); } - _resource = resource; + _borrowed = true; + _subscription.onData(onData); } - /// Release this isolate from the pool. + /// Unsubscribe to messages from [_receivePort]. void release() { - if (!borrowed) { + if (!_borrowed) { throw StateError('ReusableIsolate has not been borrowed.'); } - _resource!.release(); - _resource = null; + _borrowed = false; + _subscription.onData(_defaultOnData); } /// Sends [message] to the isolate. /// - /// Throws a [StateError] if this is called while the isolate isn't checked - /// out, or if a second message is sent before the isolate has processed the - /// first one. + /// Throws a [StateError] if this is called while the isolate isn't borrowed, + /// or if a second message is sent before the isolate has processed the first + /// one. void send(Uint8List message) { - if (!borrowed) { - throw StateError('Cannot send a message before being borrowed'); + if (!_borrowed) { + throw StateError('Cannot send a message before being borrowed.'); } _mailbox.put(message); } @@ -90,6 +91,12 @@ class ReusableIsolate { } } +/// The default handler for data events from the wrapped isolate when it's not +/// borrowed. +void _defaultOnData(dynamic _) { + throw StateError("Shouldn't receive a message before being borrowed."); +} + void _isolateMain( (ReusableIsolateEntryPoint, Sendable, SendPort) message) { var (entryPoint, sendableMailbox, sendPort) = message;