-
Notifications
You must be signed in to change notification settings - Fork 650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enforce Sendability in EventLoopFuture
and EventLoopPromise
methods
#2501
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,25 +41,12 @@ extension EventLoopFuture { | |
/// - returns: A future that will receive the eventual value. | ||
@inlinable | ||
@preconcurrency | ||
public func flatMapWithEventLoop<NewValue>(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue> { | ||
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self.eventLoop) | ||
self._whenComplete { [eventLoop = self.eventLoop] in | ||
switch self._value! { | ||
case .success(let t): | ||
let futureU = callback(t, eventLoop) | ||
if futureU.eventLoop.inEventLoop { | ||
return futureU._addCallback { | ||
next._setValue(value: futureU._value!) | ||
} | ||
} else { | ||
futureU.cascade(to: next) | ||
return CallbackList() | ||
} | ||
case .failure(let error): | ||
return next._setValue(value: .failure(error)) | ||
} | ||
} | ||
return next.futureResult | ||
public func flatMapWithEventLoop<NewValue: Sendable>(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue> { | ||
// Is this the same thing and still fast? | ||
let eventLoop = self.eventLoop | ||
return self.flatMap { | ||
callback($0, eventLoop) | ||
}.hop(to: self.eventLoop) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can delete the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and the previous implementation probably also allocates less, depends on the optimiser a bit. Why not just revert? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. Well this method became a lot easier then :D |
||
} | ||
|
||
/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which | ||
|
@@ -75,20 +62,22 @@ extension EventLoopFuture { | |
/// - returns: A future that will receive the recovered value. | ||
@inlinable | ||
@preconcurrency | ||
public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture<Value>) -> EventLoopFuture<Value> { | ||
public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture<Value>) -> EventLoopFuture<Value> where Value: Sendable { | ||
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self.eventLoop) | ||
let unsafeSelf = UnsafeTransfer(self) | ||
let unsafeNext = UnsafeTransfer(next) | ||
self._whenComplete { [eventLoop = self.eventLoop] in | ||
switch self._value! { | ||
switch unsafeSelf.wrappedValue._value! { | ||
case .success(let t): | ||
return next._setValue(value: .success(t)) | ||
return unsafeNext.wrappedValue._setValue(value: .success(t)) | ||
case .failure(let e): | ||
let t = callback(e, eventLoop) | ||
if t.eventLoop.inEventLoop { | ||
return t._addCallback { | ||
next._setValue(value: t._value!) | ||
unsafeNext.wrappedValue._setValue(value: t._value!) | ||
} | ||
} else { | ||
t.cascade(to: next) | ||
t.cascade(to: unsafeNext.wrappedValue) | ||
return CallbackList() | ||
} | ||
} | ||
|
@@ -113,16 +102,15 @@ extension EventLoopFuture { | |
/// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. | ||
/// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. | ||
@inlinable | ||
@preconcurrency | ||
public func foldWithEventLoop<OtherValue>( | ||
_ futures: [EventLoopFuture<OtherValue>], | ||
with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture<Value> | ||
) -> EventLoopFuture<Value> { | ||
func fold0(eventLoop: EventLoop) -> EventLoopFuture<Value> { | ||
) -> EventLoopFuture<Value> where Value: Sendable, OtherValue: Sendable { // This is a breaking change | ||
let fold0: @Sendable (EventLoop) -> EventLoopFuture<Value> = { (eventLoop: EventLoop) in | ||
let body = futures.reduce(self) { (f1: EventLoopFuture<Value>, f2: EventLoopFuture<OtherValue>) -> EventLoopFuture<Value> in | ||
let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture<Value> in | ||
let (f1Value, f2Value) = args | ||
self.eventLoop.assertInEventLoop() | ||
eventLoop.assertInEventLoop() | ||
return combiningFunction(f1Value, f2Value, eventLoop) | ||
} | ||
assert(newFuture.eventLoop === self.eventLoop) | ||
|
@@ -132,11 +120,11 @@ extension EventLoopFuture { | |
} | ||
|
||
if self.eventLoop.inEventLoop { | ||
return fold0(eventLoop: self.eventLoop) | ||
return fold0(self.eventLoop) | ||
} else { | ||
let promise = self.eventLoop.makePromise(of: Value.self) | ||
self.eventLoop.execute { [eventLoop = self.eventLoop] in | ||
fold0(eventLoop: eventLoop).cascade(to: promise) | ||
fold0(eventLoop).cascade(to: promise) | ||
} | ||
return promise.futureResult | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about this one. Probably the previous implementation was faster but I wasn't really sure here