From 73111db8ec87d1c2ed86fbb45e01e6e518e7f826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Fri, 16 Aug 2024 23:04:01 -0700 Subject: [PATCH] Fix flaky isolate shutdown --- lib/src/embedded/compilation_dispatcher.dart | 48 +++++++------------- lib/src/embedded/host_callable.dart | 1 - lib/src/embedded/reusable_isolate.dart | 5 +- 3 files changed, 19 insertions(+), 35 deletions(-) diff --git a/lib/src/embedded/compilation_dispatcher.dart b/lib/src/embedded/compilation_dispatcher.dart index 8f69b2553..aa51a90f1 100644 --- a/lib/src/embedded/compilation_dispatcher.dart +++ b/lib/src/embedded/compilation_dispatcher.dart @@ -51,24 +51,18 @@ final class CompilationDispatcher { /// This is used in outgoing messages. late Uint8List _compilationIdVarint; - /// Whether we detected a [ProtocolError] while parsing an incoming response. - /// - /// If we have, we don't want to send the final compilation result because - /// it'll just be a wrapper around the error. - var _requestError = false; - /// Creates a [CompilationDispatcher] that receives encoded protocol buffers /// through [_mailbox] and sends them through [_sendPort]. CompilationDispatcher(this._mailbox, this._sendPort); /// Listens for incoming `CompileRequests` and runs their compilations. void listen() { - do { + while (true) { Uint8List packet; try { packet = _mailbox.take(); } on StateError catch (_) { - break; + Isolate.exit(); } try { @@ -88,9 +82,7 @@ final class CompilationDispatcher { case InboundMessage_Message.compileRequest: var request = message.compileRequest; var response = _compile(request); - if (!_requestError) { - _send(OutboundMessage()..compileResponse = response); - } + _send(OutboundMessage()..compileResponse = response); case InboundMessage_Message.versionRequest: throw paramsError("VersionRequest must have compilation ID 0."); @@ -113,7 +105,7 @@ final class CompilationDispatcher { } catch (error, stackTrace) { _handleError(error, stackTrace); } - } while (!_requestError); + } } OutboundMessage_CompileResponse _compile( @@ -287,20 +279,14 @@ final class CompilationDispatcher { void sendLog(OutboundMessage_LogEvent event) => _send(OutboundMessage()..logEvent = event); - /// Sends [error] to the host. + /// Sends [error] to the host and exit. /// /// This is used during compilation by other classes like host callable. - /// Therefore it must set _requestError = true to prevent sending a CompileFailure after - /// sending a ProtocolError. - void sendError(ProtocolError error) { - _sendError(error); - _requestError = true; + Never sendError(ProtocolError error) { + Isolate.exit( + _sendPort, _serializePacket((OutboundMessage()..error = error))); } - /// Sends [error] to the host. - void _sendError(ProtocolError error) => - _send(OutboundMessage()..error = error); - InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( OutboundMessage_CanonicalizeRequest request) => _sendRequest( @@ -330,10 +316,7 @@ final class CompilationDispatcher { try { packet = _mailbox.take(); } on StateError catch (_) { - // Compiler is shutting down, throw without calling `_handleError` as we - // don't want to report this as an actual error. - _requestError = true; - rethrow; + Isolate.exit(); } try { @@ -376,8 +359,6 @@ final class CompilationDispatcher { return response; } catch (error, stackTrace) { _handleError(error, stackTrace); - _requestError = true; - rethrow; } } @@ -385,12 +366,17 @@ final class CompilationDispatcher { /// /// The [messageId] indicate the IDs of the message being responded to, if /// available. - void _handleError(Object error, StackTrace stackTrace, {int? messageId}) { - _sendError(handleError(error, stackTrace, messageId: messageId)); + Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) { + sendError(handleError(error, stackTrace, messageId: messageId)); } /// Sends [message] to the host with the given [wireId]. void _send(OutboundMessage message) { + _sendPort.send(_serializePacket(message)); + } + + /// Serialize [message] to [Uint8List]. + Uint8List _serializePacket(OutboundMessage message) { var protobufWriter = CodedBufferWriter(); message.writeToCodedBufferWriter(protobufWriter); @@ -407,6 +393,6 @@ final class CompilationDispatcher { }; packet.setAll(1, _compilationIdVarint); protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); - _sendPort.send(packet); + return packet; } } diff --git a/lib/src/embedded/host_callable.dart b/lib/src/embedded/host_callable.dart index 06c3acb66..95e15221a 100644 --- a/lib/src/embedded/host_callable.dart +++ b/lib/src/embedded/host_callable.dart @@ -53,7 +53,6 @@ Callable hostCallable( } } on ProtocolError catch (error, stackTrace) { dispatcher.sendError(handleError(error, stackTrace)); - throw error.message; } }); return callable; diff --git a/lib/src/embedded/reusable_isolate.dart b/lib/src/embedded/reusable_isolate.dart index 5cdd6bbd8..d3743fc4b 100644 --- a/lib/src/embedded/reusable_isolate.dart +++ b/lib/src/embedded/reusable_isolate.dart @@ -118,12 +118,11 @@ class ReusableIsolate { /// Shuts down the isolate. void kill() { - _isolate.kill(); - _receivePort.close(); - // If the isolate is blocking on [Mailbox.take], it won't even process a // kill event, so we closed the mailbox to nofity and wake it up. _mailbox.close(); + _isolate.kill(priority: Isolate.immediate); + _receivePort.close(); } }