Skip to content

Commit

Permalink
Fix flaky shutdown (#2312)
Browse files Browse the repository at this point in the history
Co-authored-by: Natalie Weizenbaum <nweiz@google.com>
  • Loading branch information
ntkme and nex3 authored Aug 21, 2024
1 parent 9379d85 commit a236460
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 134 deletions.
72 changes: 27 additions & 45 deletions lib/src/embedded/compilation_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,15 @@ final class CompilationDispatcher {
/// This is used in outgoing messages.
late Uint8List _compilationIdVarint;

/// Whether we detected a [ProtocolError] while parsing an incoming response.
///
/// If we have, we don't want to send the final compilation result because
/// it'll just be a wrapper around the error.
var _requestError = false;

/// Creates a [CompilationDispatcher] that receives encoded protocol buffers
/// through [_mailbox] and sends them through [_sendPort].
CompilationDispatcher(this._mailbox, this._sendPort);

/// Listens for incoming `CompileRequests` and runs their compilations.
void listen() {
do {
Uint8List packet;
try {
packet = _mailbox.take();
} on StateError catch (_) {
break;
}

while (true) {
try {
var (compilationId, messageBuffer) = parsePacket(packet);
var (compilationId, messageBuffer) = parsePacket(_receive());

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);
Expand All @@ -88,9 +75,7 @@ final class CompilationDispatcher {
case InboundMessage_Message.compileRequest:
var request = message.compileRequest;
var response = _compile(request);
if (!_requestError) {
_send(OutboundMessage()..compileResponse = response);
}
_send(OutboundMessage()..compileResponse = response);

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");
Expand All @@ -113,7 +98,7 @@ final class CompilationDispatcher {
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
} while (!_requestError);
}
}

OutboundMessage_CompileResponse _compile(
Expand Down Expand Up @@ -287,20 +272,13 @@ final class CompilationDispatcher {
void sendLog(OutboundMessage_LogEvent event) =>
_send(OutboundMessage()..logEvent = event);

/// Sends [error] to the host.
/// Sends [error] to the host and exit.
///
/// This is used during compilation by other classes like host callable.
/// Therefore it must set _requestError = true to prevent sending a CompileFailure after
/// sending a ProtocolError.
void sendError(ProtocolError error) {
_sendError(error);
_requestError = true;
Never sendError(ProtocolError error) {
Isolate.exit(_sendPort, _serializePacket(OutboundMessage()..error = error));
}

/// Sends [error] to the host.
void _sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
Expand All @@ -326,19 +304,9 @@ final class CompilationDispatcher {
message.id = _outboundRequestId;
_send(message);

Uint8List packet;
try {
packet = _mailbox.take();
} on StateError catch (_) {
// Compiler is shutting down, throw without calling `_handleError` as we
// don't want to report this as an actual error.
_requestError = true;
rethrow;
}

try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);
Uint8List.sublistView(_receive(), _compilationIdVarint.length);

InboundMessage message;
try {
Expand Down Expand Up @@ -376,21 +344,24 @@ final class CompilationDispatcher {
return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_requestError = true;
rethrow;
}
}

/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if
/// available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
_sendError(handleError(error, stackTrace, messageId: messageId));
Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
}

/// Sends [message] to the host with the given [wireId].
void _send(OutboundMessage message) {
_sendPort.send(_serializePacket(message));
}

/// Serialize [message] to [Uint8List].
Uint8List _serializePacket(OutboundMessage message) {
var protobufWriter = CodedBufferWriter();
message.writeToCodedBufferWriter(protobufWriter);

Expand All @@ -407,6 +378,17 @@ final class CompilationDispatcher {
};
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_sendPort.send(packet);
return packet;
}

/// Receive a packet from the host.
Uint8List _receive() {
try {
return _mailbox.take();
} on StateError catch (_) {
// The [_mailbox] has been closed, exit the current isolate immediately
// to avoid bubble the error up as [SassException] during [_sendRequest].
Isolate.exit();
}
}
}
1 change: 0 additions & 1 deletion lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ Callable hostCallable(
}
} on ProtocolError catch (error, stackTrace) {
dispatcher.sendError(handleError(error, stackTrace));
throw error.message;
}
});
return callable;
Expand Down
55 changes: 39 additions & 16 deletions lib/src/embedded/isolate_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:async';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';

Expand All @@ -27,7 +28,7 @@ class IsolateDispatcher {
/// All isolates that have been spawned to dispatch to.
///
/// Only used for cleaning up the process when the underlying channel closes.
final _allIsolates = StreamController<ReusableIsolate>();
final _allIsolates = StreamController<ReusableIsolate>(sync: true);

/// The isolates that aren't currently running compilations
final _inactiveIsolates = <ReusableIsolate>{};
Expand All @@ -43,6 +44,9 @@ class IsolateDispatcher {
/// See https://github.com/sass/dart-sass/pull/2019
final _isolatePool = Pool(sizeOf<IntPtr>() <= 4 ? 7 : 15);

/// Whether [_channel] has been closed or not.
var _closed = false;

IsolateDispatcher(this._channel);

void listen() {
Expand All @@ -56,6 +60,10 @@ class IsolateDispatcher {
if (compilationId != 0) {
var isolate = await _activeIsolates.putIfAbsent(
compilationId, () => _getIsolate(compilationId!));

// The shutdown may have started by the time the isolate is spawned
if (_closed) return;

try {
isolate.send(packet);
return;
Expand Down Expand Up @@ -88,6 +96,7 @@ class IsolateDispatcher {
}, onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
}, onDone: () {
_closed = true;
_allIsolates.stream.listen((isolate) => isolate.kill());
});
}
Expand All @@ -103,26 +112,40 @@ class IsolateDispatcher {
isolate = _inactiveIsolates.first;
_inactiveIsolates.remove(isolate);
} else {
var future = ReusableIsolate.spawn(_isolateMain);
var future = ReusableIsolate.spawn(_isolateMain,
onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
});
isolate = await future;
_allIsolates.add(isolate);
}

isolate.checkOut().listen(_channel.sink.add,
onError: (Object error, StackTrace stackTrace) {
if (error is ProtocolError) {
// Protocol errors have already been through [_handleError] in the child
// isolate, so we just send them as-is and close out the underlying
// channel.
sendError(compilationId, error);
_channel.sink.close();
} else {
_handleError(error, stackTrace);
isolate.borrow((message) {
var fullBuffer = message as Uint8List;

// The first byte of messages from isolates indicates whether the entire
// compilation is finished (1) or if it encountered an error (2). Sending
// this as part of the message buffer rather than a separate message
// avoids a race condition where the host might send a new compilation
// request with the same ID as one that just finished before the
// [IsolateDispatcher] receives word that the isolate with that ID is
// done. See sass/dart-sass#2004.
var category = fullBuffer[0];
var packet = Uint8List.sublistView(fullBuffer, 1);

switch (category) {
case 0:
_channel.sink.add(packet);
case 1:
_activeIsolates.remove(compilationId);
isolate.release();
_inactiveIsolates.add(isolate);
resource.release();
_channel.sink.add(packet);
case 2:
_channel.sink.add(packet);
exit(exitCode);
}
}, onDone: () {
_activeIsolates.remove(compilationId);
_inactiveIsolates.add(isolate);
resource.release();
});

return isolate;
Expand Down
Loading

0 comments on commit a236460

Please sign in to comment.