Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky shutdown #2312

Merged
merged 8 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 27 additions & 45 deletions lib/src/embedded/compilation_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,15 @@ final class CompilationDispatcher {
/// This is used in outgoing messages.
late Uint8List _compilationIdVarint;

/// Whether we detected a [ProtocolError] while parsing an incoming response.
///
/// If we have, we don't want to send the final compilation result because
/// it'll just be a wrapper around the error.
var _requestError = false;

/// Creates a [CompilationDispatcher] that receives encoded protocol buffers
/// through [_mailbox] and sends them through [_sendPort].
CompilationDispatcher(this._mailbox, this._sendPort);

/// Listens for incoming `CompileRequests` and runs their compilations.
void listen() {
do {
Uint8List packet;
try {
packet = _mailbox.take();
} on StateError catch (_) {
break;
}

while (true) {
try {
var (compilationId, messageBuffer) = parsePacket(packet);
var (compilationId, messageBuffer) = parsePacket(_receive());

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);
Expand All @@ -88,9 +75,7 @@ final class CompilationDispatcher {
case InboundMessage_Message.compileRequest:
var request = message.compileRequest;
var response = _compile(request);
if (!_requestError) {
_send(OutboundMessage()..compileResponse = response);
}
_send(OutboundMessage()..compileResponse = response);

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");
Expand All @@ -113,7 +98,7 @@ final class CompilationDispatcher {
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
} while (!_requestError);
}
}

OutboundMessage_CompileResponse _compile(
Expand Down Expand Up @@ -287,20 +272,13 @@ final class CompilationDispatcher {
void sendLog(OutboundMessage_LogEvent event) =>
_send(OutboundMessage()..logEvent = event);

/// Sends [error] to the host.
/// Sends [error] to the host and exit.
///
/// This is used during compilation by other classes like host callable.
/// Therefore it must set _requestError = true to prevent sending a CompileFailure after
/// sending a ProtocolError.
void sendError(ProtocolError error) {
_sendError(error);
_requestError = true;
Never sendError(ProtocolError error) {
Isolate.exit(_sendPort, _serializePacket(OutboundMessage()..error = error));
}

/// Sends [error] to the host.
void _sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
Expand All @@ -326,19 +304,9 @@ final class CompilationDispatcher {
message.id = _outboundRequestId;
_send(message);

Uint8List packet;
try {
packet = _mailbox.take();
} on StateError catch (_) {
// Compiler is shutting down, throw without calling `_handleError` as we
// don't want to report this as an actual error.
_requestError = true;
rethrow;
}

try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);
Uint8List.sublistView(_receive(), _compilationIdVarint.length);

InboundMessage message;
try {
Expand Down Expand Up @@ -376,21 +344,24 @@ final class CompilationDispatcher {
return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_requestError = true;
rethrow;
}
}

/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if
/// available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
_sendError(handleError(error, stackTrace, messageId: messageId));
Never _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
}

/// Sends [message] to the host with the given [wireId].
void _send(OutboundMessage message) {
_sendPort.send(_serializePacket(message));
}

/// Serialize [message] to [Uint8List].
Uint8List _serializePacket(OutboundMessage message) {
var protobufWriter = CodedBufferWriter();
message.writeToCodedBufferWriter(protobufWriter);

Expand All @@ -407,6 +378,17 @@ final class CompilationDispatcher {
};
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_sendPort.send(packet);
return packet;
}

/// Receive a packet from the host.
Uint8List _receive() {
try {
return _mailbox.take();
} on StateError catch (_) {
// The [_mailbox] has been closed, exit the current isolate immediately
// to avoid bubble the error up as [SassException] during [_sendRequest].
Isolate.exit();
}
}
}
1 change: 0 additions & 1 deletion lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ Callable hostCallable(
}
} on ProtocolError catch (error, stackTrace) {
dispatcher.sendError(handleError(error, stackTrace));
throw error.message;
}
});
return callable;
Expand Down
55 changes: 39 additions & 16 deletions lib/src/embedded/isolate_dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:async';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';

Expand All @@ -27,7 +28,7 @@ class IsolateDispatcher {
/// All isolates that have been spawned to dispatch to.
///
/// Only used for cleaning up the process when the underlying channel closes.
final _allIsolates = StreamController<ReusableIsolate>();
final _allIsolates = StreamController<ReusableIsolate>(sync: true);

/// The isolates that aren't currently running compilations
final _inactiveIsolates = <ReusableIsolate>{};
Expand All @@ -43,6 +44,9 @@ class IsolateDispatcher {
/// See https://github.com/sass/dart-sass/pull/2019
final _isolatePool = Pool(sizeOf<IntPtr>() <= 4 ? 7 : 15);

/// Whether [_channel] has been closed or not.
var _closed = false;

IsolateDispatcher(this._channel);

void listen() {
Expand All @@ -56,6 +60,10 @@ class IsolateDispatcher {
if (compilationId != 0) {
var isolate = await _activeIsolates.putIfAbsent(
compilationId, () => _getIsolate(compilationId!));

// The shutdown may have started by the time the isolate is spawned
if (_closed) return;

try {
isolate.send(packet);
return;
Expand Down Expand Up @@ -88,6 +96,7 @@ class IsolateDispatcher {
}, onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
}, onDone: () {
_closed = true;
_allIsolates.stream.listen((isolate) => isolate.kill());
});
}
Expand All @@ -103,26 +112,40 @@ class IsolateDispatcher {
isolate = _inactiveIsolates.first;
_inactiveIsolates.remove(isolate);
} else {
var future = ReusableIsolate.spawn(_isolateMain);
var future = ReusableIsolate.spawn(_isolateMain,
onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace);
});
isolate = await future;
_allIsolates.add(isolate);
}

isolate.checkOut().listen(_channel.sink.add,
onError: (Object error, StackTrace stackTrace) {
if (error is ProtocolError) {
// Protocol errors have already been through [_handleError] in the child
// isolate, so we just send them as-is and close out the underlying
// channel.
sendError(compilationId, error);
_channel.sink.close();
} else {
_handleError(error, stackTrace);
isolate.borrow((message) {
var fullBuffer = message as Uint8List;

// The first byte of messages from isolates indicates whether the entire
// compilation is finished (1) or if it encountered an error (2). Sending
// this as part of the message buffer rather than a separate message
// avoids a race condition where the host might send a new compilation
// request with the same ID as one that just finished before the
// [IsolateDispatcher] receives word that the isolate with that ID is
// done. See sass/dart-sass#2004.
var category = fullBuffer[0];
var packet = Uint8List.sublistView(fullBuffer, 1);

switch (category) {
case 0:
_channel.sink.add(packet);
case 1:
_activeIsolates.remove(compilationId);
isolate.release();
_inactiveIsolates.add(isolate);
resource.release();
_channel.sink.add(packet);
case 2:
_channel.sink.add(packet);
exit(exitCode);
}
}, onDone: () {
_activeIsolates.remove(compilationId);
_inactiveIsolates.add(isolate);
resource.release();
});

return isolate;
Expand Down
Loading