Skip to content

Commit

Permalink
Fix flaky isolate shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
ntkme committed Aug 17, 2024
1 parent 9d45fad commit 73111db
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 35 deletions.
48 changes: 17 additions & 31 deletions lib/src/embedded/compilation_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,18 @@ final class CompilationDispatcher {
/// This is used in outgoing messages.
late Uint8List _compilationIdVarint;

/// Whether we detected a [ProtocolError] while parsing an incoming response.
///
/// If we have, we don't want to send the final compilation result because
/// it'll just be a wrapper around the error.
var _requestError = false;

/// Creates a [CompilationDispatcher] that receives encoded protocol buffers
/// through [_mailbox] and sends them through [_sendPort].
CompilationDispatcher(this._mailbox, this._sendPort);

/// Listens for incoming `CompileRequests` and runs their compilations.
void listen() {
do {
while (true) {
Uint8List packet;
try {
packet = _mailbox.take();
} on StateError catch (_) {
break;
Isolate.exit();
}

try {
Expand All @@ -88,9 +82,7 @@ final class CompilationDispatcher {
case InboundMessage_Message.compileRequest:
var request = message.compileRequest;
var response = _compile(request);
if (!_requestError) {
_send(OutboundMessage()..compileResponse = response);
}
_send(OutboundMessage()..compileResponse = response);

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");
Expand All @@ -113,7 +105,7 @@ final class CompilationDispatcher {
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
} while (!_requestError);
}
}

OutboundMessage_CompileResponse _compile(
Expand Down Expand Up @@ -287,20 +279,14 @@ final class CompilationDispatcher {
void sendLog(OutboundMessage_LogEvent event) =>
_send(OutboundMessage()..logEvent = event);

/// Sends [error] to the host.
/// Sends [error] to the host and exit.
///
/// This is used during compilation by other classes like host callable.
/// Therefore it must set _requestError = true to prevent sending a CompileFailure after
/// sending a ProtocolError.
void sendError(ProtocolError error) {
_sendError(error);
_requestError = true;
Never sendError(ProtocolError error) {
Isolate.exit(
_sendPort, _serializePacket((OutboundMessage()..error = error)));
}

/// Sends [error] to the host.
void _sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
Expand Down Expand Up @@ -330,10 +316,7 @@ final class CompilationDispatcher {
try {
packet = _mailbox.take();
} on StateError catch (_) {
// Compiler is shutting down, throw without calling `_handleError` as we
// don't want to report this as an actual error.
_requestError = true;
rethrow;
Isolate.exit();
}

try {
Expand Down Expand Up @@ -376,21 +359,24 @@ final class CompilationDispatcher {
return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_requestError = true;
rethrow;
}
}

/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if
/// available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
_sendError(handleError(error, stackTrace, messageId: messageId));
Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
}

/// Sends [message] to the host with the given [wireId].
void _send(OutboundMessage message) {
_sendPort.send(_serializePacket(message));
}

/// Serialize [message] to [Uint8List].
Uint8List _serializePacket(OutboundMessage message) {
var protobufWriter = CodedBufferWriter();
message.writeToCodedBufferWriter(protobufWriter);

Expand All @@ -407,6 +393,6 @@ final class CompilationDispatcher {
};
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_sendPort.send(packet);
return packet;
}
}
1 change: 0 additions & 1 deletion lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ Callable hostCallable(
}
} on ProtocolError catch (error, stackTrace) {
dispatcher.sendError(handleError(error, stackTrace));
throw error.message;
}
});
return callable;
Expand Down
5 changes: 2 additions & 3 deletions lib/src/embedded/reusable_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ class ReusableIsolate {

/// Shuts down the isolate.
void kill() {
_isolate.kill();
_receivePort.close();

// If the isolate is blocking on [Mailbox.take], it won't even process a
// kill event, so we closed the mailbox to nofity and wake it up.
_mailbox.close();
_isolate.kill(priority: Isolate.immediate);
_receivePort.close();
}
}

Expand Down

0 comments on commit 73111db

Please sign in to comment.