diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index 8f69b2553..a8c84c8eb 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -51,28 +51,15 @@ final class CompilationDispatcher { /// This is used in outgoing messages. late Uint8List _compilationIdVarint; - /// Whether we detected a [ProtocolError] while parsing an incoming response. - /// - /// If we have, we don't want to send the final compilation result because - /// it'll just be a wrapper around the error. - var _requestError = false; - /// Creates a [CompilationDispatcher] that receives encoded protocol buffers /// through [_mailbox] and sends them through [_sendPort]. CompilationDispatcher(this._mailbox, this._sendPort); /// Listens for incoming `CompileRequests` and runs their compilations. void listen() { - do { - Uint8List packet; - try { - packet = _mailbox.take(); - } on StateError catch (_) { - break; - } - + while (true) { try { - var (compilationId, messageBuffer) = parsePacket(packet); + var (compilationId, messageBuffer) = parsePacket(_receive()); _compilationId = compilationId; _compilationIdVarint = serializeVarint(compilationId); @@ -88,9 +75,7 @@ final class CompilationDispatcher { case InboundMessage_Message.compileRequest: var request = message.compileRequest; var response = _compile(request); - if (!_requestError) { - _send(OutboundMessage()..compileResponse = response); - } + _send(OutboundMessage()..compileResponse = response); case InboundMessage_Message.versionRequest: throw paramsError("VersionRequest must have compilation ID 0."); @@ -113,7 +98,7 @@ final class CompilationDispatcher { } catch (error, stackTrace) { _handleError(error, stackTrace); } - } while (!_requestError); + } } OutboundMessage_CompileResponse _compile( @@ -287,20 +272,13 @@ final class CompilationDispatcher { void sendLog(OutboundMessage_LogEvent event) => _send(OutboundMessage()..logEvent = event); - /// Sends [error] to the host. + /// Sends [error] to the host and exit. /// /// This is used during compilation by other classes like host callable. - /// Therefore it must set _requestError = true to prevent sending a CompileFailure after - /// sending a ProtocolError. - void sendError(ProtocolError error) { - _sendError(error); - _requestError = true; + Never sendError(ProtocolError error) { + Isolate.exit(_sendPort, _serializePacket(OutboundMessage()..error = error)); } - /// Sends [error] to the host. - void _sendError(ProtocolError error) => - _send(OutboundMessage()..error = error); - InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( OutboundMessage_CanonicalizeRequest request) => _sendRequest( @@ -326,19 +304,9 @@ final class CompilationDispatcher { message.id = _outboundRequestId; _send(message); - Uint8List packet; - try { - packet = _mailbox.take(); - } on StateError catch (_) { - // Compiler is shutting down, throw without calling `_handleError` as we - // don't want to report this as an actual error. - _requestError = true; - rethrow; - } - try { var messageBuffer = - Uint8List.sublistView(packet, _compilationIdVarint.length); + Uint8List.sublistView(_receive(), _compilationIdVarint.length); InboundMessage message; try { @@ -376,8 +344,6 @@ final class CompilationDispatcher { return response; } catch (error, stackTrace) { _handleError(error, stackTrace); - _requestError = true; - rethrow; } } @@ -385,12 +351,17 @@ final class CompilationDispatcher { /// /// The [messageId] indicate the IDs of the message being responded to, if /// available. - void _handleError(Object error, StackTrace stackTrace, {int? messageId}) { - _sendError(handleError(error, stackTrace, messageId: messageId)); + Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) { + sendError(handleError(error, stackTrace, messageId: messageId)); } /// Sends [message] to the host with the given [wireId]. void _send(OutboundMessage message) { + _sendPort.send(_serializePacket(message)); + } + + /// Serialize [message] to [Uint8List]. + Uint8List _serializePacket(OutboundMessage message) { var protobufWriter = CodedBufferWriter(); message.writeToCodedBufferWriter(protobufWriter); @@ -407,6 +378,17 @@ final class CompilationDispatcher { }; packet.setAll(1, _compilationIdVarint); protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); - _sendPort.send(packet); + return packet; + } + + /// Receive a packet from the host. + Uint8List _receive() { + try { + return _mailbox.take(); + } on StateError catch (_) { + // The [_mailbox] has been closed, exit the current isolate immediately + // to avoid bubble the error up as [SassException] during [_sendRequest]. + Isolate.exit(); + } } } diff --git a/lib/src/embedded/host_callable.dart b/lib/src/embedded/host_callable.dart index 06c3acb66..95e15221a 100644 --- a/lib/src/embedded/host_callable.dart +++ b/lib/src/embedded/host_callable.dart @@ -53,7 +53,6 @@ Callable hostCallable( } } on ProtocolError catch (error, stackTrace) { dispatcher.sendError(handleError(error, stackTrace)); - throw error.message; } }); return callable; diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 1fd9877d9..fe79f034c 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -4,6 +4,7 @@ import 'dart:async'; import 'dart:ffi'; +import 'dart:io'; import 'dart:isolate'; import 'dart:typed_data'; @@ -27,7 +28,7 @@ class IsolateDispatcher { /// All isolates that have been spawned to dispatch to. /// /// Only used for cleaning up the process when the underlying channel closes. - final _allIsolates = StreamController(); + final _allIsolates = StreamController(sync: true); /// The isolates that aren't currently running compilations final _inactiveIsolates = {}; @@ -43,6 +44,9 @@ class IsolateDispatcher { /// See https://github.com/sass/dart-sass/pull/2019 final _isolatePool = Pool(sizeOf() <= 4 ? 7 : 15); + /// Whether [_channel] has been closed or not. + var _closed = false; + IsolateDispatcher(this._channel); void listen() { @@ -56,6 +60,10 @@ class IsolateDispatcher { if (compilationId != 0) { var isolate = await _activeIsolates.putIfAbsent( compilationId, () => _getIsolate(compilationId!)); + + // The shutdown may have started by the time the isolate is spawned + if (_closed) return; + try { isolate.send(packet); return; @@ -88,6 +96,7 @@ class IsolateDispatcher { }, onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); }, onDone: () { + _closed = true; _allIsolates.stream.listen((isolate) => isolate.kill()); }); } @@ -103,26 +112,40 @@ class IsolateDispatcher { isolate = _inactiveIsolates.first; _inactiveIsolates.remove(isolate); } else { - var future = ReusableIsolate.spawn(_isolateMain); + var future = ReusableIsolate.spawn(_isolateMain, + onError: (Object error, StackTrace stackTrace) { + _handleError(error, stackTrace); + }); isolate = await future; _allIsolates.add(isolate); } - isolate.checkOut().listen(_channel.sink.add, - onError: (Object error, StackTrace stackTrace) { - if (error is ProtocolError) { - // Protocol errors have already been through [_handleError] in the child - // isolate, so we just send them as-is and close out the underlying - // channel. - sendError(compilationId, error); - _channel.sink.close(); - } else { - _handleError(error, stackTrace); + isolate.borrow((message) { + var fullBuffer = message as Uint8List; + + // The first byte of messages from isolates indicates whether the entire + // compilation is finished (1) or if it encountered an error (2). Sending + // this as part of the message buffer rather than a separate message + // avoids a race condition where the host might send a new compilation + // request with the same ID as one that just finished before the + // [IsolateDispatcher] receives word that the isolate with that ID is + // done. See sass/dart-sass#2004. + var category = fullBuffer[0]; + var packet = Uint8List.sublistView(fullBuffer, 1); + + switch (category) { + case 0: + _channel.sink.add(packet); + case 1: + _activeIsolates.remove(compilationId); + isolate.release(); + _inactiveIsolates.add(isolate); + resource.release(); + _channel.sink.add(packet); + case 2: + _channel.sink.add(packet); + exit(exitCode); } - }, onDone: () { - _activeIsolates.remove(compilationId); - _inactiveIsolates.add(isolate); - resource.release(); }); return isolate; diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index 5cdd6bbd8..3e15bf978 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -8,8 +8,6 @@ import 'dart:typed_data'; import 'package:native_synchronization/mailbox.dart'; import 'package:native_synchronization/sendable.dart'; -import 'embedded_sass.pb.dart'; -import 'utils.dart'; /// The entrypoint for a [ReusableIsolate]. /// @@ -33,104 +31,70 @@ class ReusableIsolate { /// The [ReceivePort] that receives messages from the wrapped isolate. final ReceivePort _receivePort; - /// The subscription to [_port]. - final StreamSubscription _subscription; + /// The subscription to [_receivePort]. + final StreamSubscription _subscription; - /// Whether [checkOut] has been called and the returned stream has not yet - /// closed. - bool _checkedOut = false; + /// Whether the current isolate has been borrowed. + bool _borrowed = false; - ReusableIsolate._(this._isolate, this._mailbox, this._receivePort) - : _subscription = _receivePort.listen(_defaultOnData); + ReusableIsolate._(this._isolate, this._mailbox, this._receivePort, + {Function? onError}) + : _subscription = _receivePort.listen(_defaultOnData, onError: onError); /// Spawns a [ReusableIsolate] that runs the given [entryPoint]. - static Future spawn( - ReusableIsolateEntryPoint entryPoint) async { + static Future spawn(ReusableIsolateEntryPoint entryPoint, + {Function? onError}) async { var mailbox = Mailbox(); var receivePort = ReceivePort(); var isolate = await Isolate.spawn( _isolateMain, (entryPoint, mailbox.asSendable, receivePort.sendPort)); - return ReusableIsolate._(isolate, mailbox, receivePort); + return ReusableIsolate._(isolate, mailbox, receivePort, onError: onError); } - /// Checks out this isolate and returns a stream of messages from it. - /// - /// This isolate is considered "checked out" until the returned stream - /// completes. While checked out, messages may be sent to the isolate using - /// [send]. - /// - /// Throws a [StateError] if this is called while the isolate is already - /// checked out. - Stream checkOut() { - if (_checkedOut) { - throw StateError( - "Can't call ResuableIsolate.checkOut until the previous stream has " - "completed."); + /// Subscribe to messages from [_receivePort]. + void borrow(void onData(dynamic event)?) { + if (_borrowed) { + throw StateError('ReusableIsolate has already been borrowed.'); + } + _borrowed = true; + _subscription.onData(onData); + } + + /// Unsubscribe to messages from [_receivePort]. + void release() { + if (!_borrowed) { + throw StateError('ReusableIsolate has not been borrowed.'); } - _checkedOut = true; - - var controller = StreamController(sync: true); - - _subscription.onData((message) { - var fullBuffer = message as Uint8List; - - // The first byte of messages from isolates indicates whether the entire - // compilation is finished (1) or if it encountered an error (2). Sending - // this as part of the message buffer rather than a separate message - // avoids a race condition where the host might send a new compilation - // request with the same ID as one that just finished before the - // [IsolateDispatcher] receives word that the isolate with that ID is - // done. See sass/dart-sass#2004. - var category = fullBuffer[0]; - var packet = Uint8List.sublistView(fullBuffer, 1); - - if (category == 2) { - // Parse out the compilation ID and surface the [ProtocolError] as an - // error. This allows the [IsolateDispatcher] to notice that an error - // has occurred and close out the underlying channel. - var (_, buffer) = parsePacket(packet); - controller.addError(OutboundMessage.fromBuffer(buffer).error); - return; - } - - controller.sink.add(packet); - if (category == 1) { - _checkedOut = false; - _subscription.onData(_defaultOnData); - _subscription.onError(null); - controller.close(); - } - }); - - _subscription.onError(controller.addError); - - return controller.stream; + _borrowed = false; + _subscription.onData(_defaultOnData); } /// Sends [message] to the isolate. /// - /// Throws a [StateError] if this is called while the isolate isn't checked - /// out, or if a second message is sent before the isolate has processed the - /// first one. + /// Throws a [StateError] if this is called while the isolate isn't borrowed, + /// or if a second message is sent before the isolate has processed the first + /// one. void send(Uint8List message) { + if (!_borrowed) { + throw StateError('Cannot send a message before being borrowed.'); + } _mailbox.put(message); } /// Shuts down the isolate. void kill() { - _isolate.kill(); - _receivePort.close(); - // If the isolate is blocking on [Mailbox.take], it won't even process a // kill event, so we closed the mailbox to nofity and wake it up. _mailbox.close(); + _isolate.kill(priority: Isolate.immediate); + _receivePort.close(); } } /// The default handler for data events from the wrapped isolate when it's not -/// checked out. -void _defaultOnData(Object? _) { - throw StateError("Shouldn't receive a message before being checked out."); +/// borrowed. +void _defaultOnData(dynamic _) { + throw StateError("Shouldn't receive a message before being borrowed."); } void _isolateMain(