Skip to content

Commit c1d1757

Browse files
authored
Combine RequestBody delegate, source state to fix hang (#666)
* Combine RequestBody delegate produceMore and waiting continuations into single state Fixes issue with hang where yield was told to stop producing and but then was told it could continue producing before the next call to yield. * Attempt to remove withCheckedContinuation for produceMore state * Avoid allocating deque if only one continuation is created * Update codecov token * Resume all continuations on produceMore
1 parent b1bc64f commit c1d1757

File tree

2 files changed

+138
-20
lines changed

2 files changed

+138
-20
lines changed

Sources/HummingbirdCore/Request/RequestBody.swift

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -132,38 +132,116 @@ extension RequestBody {
132132
>
133133

134134
/// Delegate for NIOThrowingAsyncSequenceProducer
135+
///
136+
/// This can be a struct as the state is stored inside a NIOLockedValueBox which
137+
/// turns it into a reference value
135138
@usableFromInline
136-
final class Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
137-
let checkedContinuations: NIOLockedValueBox<Deque<CheckedContinuation<Void, Never>>>
139+
struct Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
140+
enum State {
141+
case produceMore
142+
case waitingForProduceMore(CheckedContinuation<Void, Never>?)
143+
case multipleWaitingForProduceMore(Deque<CheckedContinuation<Void, Never>>)
144+
case terminated
145+
}
146+
let state: NIOLockedValueBox<State>
138147

139148
@usableFromInline
140149
init() {
141-
self.checkedContinuations = .init([])
150+
self.state = .init(.produceMore)
142151
}
143152

144153
@usableFromInline
145154
func produceMore() {
146-
self.checkedContinuations.withLockedValue {
147-
if let cont = $0.popFirst() {
148-
cont.resume()
155+
self.state.withLockedValue { state in
156+
switch state {
157+
case .produceMore:
158+
break
159+
case .waitingForProduceMore(let continuation):
160+
if let continuation {
161+
continuation.resume()
162+
}
163+
state = .produceMore
164+
165+
case .multipleWaitingForProduceMore(var continuations):
166+
// this isnt exactly correct as the number of continuations
167+
// resumed can overflow the back pressure
168+
while let cont = continuations.popFirst() {
169+
cont.resume()
170+
}
171+
state = .produceMore
172+
173+
case .terminated:
174+
preconditionFailure("Unexpected state")
149175
}
150176
}
151177
}
152178

153179
@usableFromInline
154180
func didTerminate() {
155-
self.checkedContinuations.withLockedValue {
156-
while let cont = $0.popFirst() {
157-
cont.resume()
181+
self.state.withLockedValue { state in
182+
switch state {
183+
case .produceMore:
184+
break
185+
case .waitingForProduceMore(let continuation):
186+
if let continuation {
187+
continuation.resume()
188+
}
189+
state = .terminated
190+
case .multipleWaitingForProduceMore(var continuations):
191+
while let cont = continuations.popFirst() {
192+
cont.resume()
193+
}
194+
state = .terminated
195+
case .terminated:
196+
preconditionFailure("Unexpected state")
158197
}
159198
}
160199
}
161200

162201
@usableFromInline
163202
func waitForProduceMore() async {
164-
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
165-
self.checkedContinuations.withLockedValue {
166-
$0.append(cont)
203+
switch self.state.withLockedValue({ $0 }) {
204+
case .produceMore, .terminated:
205+
break
206+
case .waitingForProduceMore, .multipleWaitingForProduceMore:
207+
await withCheckedContinuation { (newContinuation: CheckedContinuation<Void, Never>) in
208+
self.state.withLockedValue { state in
209+
switch state {
210+
case .produceMore:
211+
newContinuation.resume()
212+
case .waitingForProduceMore(let firstContinuation):
213+
if let firstContinuation {
214+
var continuations = Deque<CheckedContinuation<Void, Never>>()
215+
continuations.reserveCapacity(2)
216+
continuations.append(firstContinuation)
217+
continuations.append(newContinuation)
218+
state = .multipleWaitingForProduceMore(continuations)
219+
} else {
220+
state = .waitingForProduceMore(newContinuation)
221+
}
222+
case .multipleWaitingForProduceMore(var continuations):
223+
continuations.append(newContinuation)
224+
state = .multipleWaitingForProduceMore(continuations)
225+
case .terminated:
226+
newContinuation.resume()
227+
}
228+
}
229+
}
230+
}
231+
}
232+
233+
@usableFromInline
234+
func stopProducing() {
235+
self.state.withLockedValue { state in
236+
switch state {
237+
case .produceMore:
238+
state = .waitingForProduceMore(nil)
239+
case .waitingForProduceMore:
240+
break
241+
case .multipleWaitingForProduceMore:
242+
break
243+
case .terminated:
244+
break
167245
}
168246
}
169247
}
@@ -175,14 +253,11 @@ extension RequestBody {
175253
let source: Producer.Source
176254
@usableFromInline
177255
let delegate: Delegate
178-
@usableFromInline
179-
let waitForProduceMore: NIOLockedValueBox<Bool>
180256

181257
@usableFromInline
182258
init(source: Producer.Source, delegate: Delegate) {
183259
self.source = source
184260
self.delegate = delegate
185-
self.waitForProduceMore = .init(false)
186261
}
187262

188263
/// Yields the element to the inbound stream.
@@ -195,13 +270,10 @@ extension RequestBody {
195270
public func yield(_ element: ByteBuffer) async throws {
196271
// if previous call indicated we should stop producing wait until the delegate
197272
// says we can start producing again
198-
if self.waitForProduceMore.withLockedValue({ $0 }) {
199-
await self.delegate.waitForProduceMore()
200-
self.waitForProduceMore.withLockedValue { $0 = false }
201-
}
273+
await self.delegate.waitForProduceMore()
202274
let result = self.source.yield(element)
203275
if result == .stopProducing {
204-
self.waitForProduceMore.withLockedValue { $0 = true }
276+
self.delegate.stopProducing()
205277
}
206278
}
207279

Tests/HummingbirdTests/ApplicationTests.swift

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,52 @@ final class ApplicationTests: XCTestCase {
867867
}
868868
}
869869

870+
/// Test AsyncSequence returned by RequestBody.makeStream() and feeding it data from multiple processes
871+
func testMakeStreamMultipleSources() async throws {
872+
let router = Router()
873+
router.get("numbers") { request, context -> Response in
874+
let body = try await withThrowingTaskGroup(of: Void.self) { group in
875+
let (requestBody, source) = RequestBody.makeStream()
876+
group.addTask {
877+
// Add three tasks feeding the source
878+
await withThrowingTaskGroup(of: Void.self) { group in
879+
group.addTask {
880+
for value in 0..<100 {
881+
try await source.yield(ByteBuffer(string: String(describing: value)))
882+
}
883+
}
884+
group.addTask {
885+
for value in 0..<100 {
886+
try await source.yield(ByteBuffer(string: String(describing: value)))
887+
}
888+
}
889+
group.addTask {
890+
for value in 0..<100 {
891+
try await source.yield(ByteBuffer(string: String(describing: value)))
892+
}
893+
}
894+
}
895+
source.finish()
896+
}
897+
var body = ByteBuffer()
898+
for try await buffer in requestBody {
899+
var buffer = buffer
900+
body.writeBuffer(&buffer)
901+
try await Task.sleep(for: .milliseconds(1))
902+
}
903+
return body
904+
}
905+
return Response(status: .ok, body: .init(byteBuffer: body))
906+
}
907+
let app = Application(responder: router.buildResponder())
908+
909+
try await app.test(.router) { client in
910+
try await client.execute(uri: "/numbers", method: .get) { response in
911+
XCTAssertEqual(response.status, .ok)
912+
}
913+
}
914+
}
915+
870916
#if compiler(>=6.0)
871917
/// Test consumeWithInboundCloseHandler
872918
func testConsumeWithInboundHandler() async throws {

0 commit comments

Comments
 (0)