Skip to content

Commit

Permalink
Don't retain receiver of Completable.andThen beyond its completion (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolaykasyanov authored Oct 4, 2024
1 parent 11c2d30 commit 98f63d1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
16 changes: 8 additions & 8 deletions RxSwift/Traits/PrimitiveSequence/Completable+AndThen.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ final private class ConcatCompletable<Element>: Producer<Element> {
}

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ConcatCompletableSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
let sink = ConcatCompletableSink(second: second, observer: observer, cancel: cancel)
let subscription = sink.run(completable: completable)
return (sink: sink, subscription: subscription)
}
}
Expand All @@ -82,11 +82,11 @@ final private class ConcatCompletableSink<Observer: ObserverType>
typealias Element = Never
typealias Parent = ConcatCompletable<Observer.Element>

private let parent: Parent
private let second: Observable<Observer.Element>
private let subscription = SerialDisposable()

init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
init(second: Observable<Observer.Element>, observer: Observer, cancel: Cancelable) {
self.second = second
super.init(observer: observer, cancel: cancel)
}

Expand All @@ -99,14 +99,14 @@ final private class ConcatCompletableSink<Observer: ObserverType>
break
case .completed:
let otherSink = ConcatCompletableSinkOther(parent: self)
self.subscription.disposable = self.parent.second.subscribe(otherSink)
self.subscription.disposable = self.second.subscribe(otherSink)
}
}

func run() -> Disposable {
func run(completable: Observable<Never>) -> Disposable {
let subscription = SingleAssignmentDisposable()
self.subscription.disposable = subscription
subscription.setDisposable(self.parent.completable.subscribe(self))
subscription.setDisposable(completable.subscribe(self))
return self.subscription
}
}
Expand Down
2 changes: 2 additions & 0 deletions Sources/AllTestz/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ final class CompletableAndThenTest_ : CompletableAndThenTest, RxTestCase {
("testCompletableCompleted_CompletableCompleted", CompletableAndThenTest.testCompletableCompleted_CompletableCompleted),
("testCompletableError_CompletableCompleted", CompletableAndThenTest.testCompletableError_CompletableCompleted),
("testCompletableCompleted_CompletableError", CompletableAndThenTest.testCompletableCompleted_CompletableError),
("testCompletable_FirstCompletableNotRetainedBeyondCompletion", CompletableAndThenTest.testCompletable_FirstCompletableNotRetainedBeyondCompletion),
("testCompletable_FirstCompletableNotRetainedBeyondFailure", CompletableAndThenTest.testCompletable_FirstCompletableNotRetainedBeyondFailure),
("testCompletableEmpty_SingleCompleted", CompletableAndThenTest.testCompletableEmpty_SingleCompleted),
("testCompletableCompleted_SingleNormal", CompletableAndThenTest.testCompletableCompleted_SingleNormal),
("testCompletableError_SingleNormal", CompletableAndThenTest.testCompletableError_SingleNormal),
Expand Down
67 changes: 67 additions & 0 deletions Tests/RxSwiftTests/Completable+AndThen.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,70 @@ extension CompletableAndThenTest {
])
}

func testCompletable_FirstCompletableNotRetainedBeyondCompletion() {
let scheduler = TestScheduler(initialClock: 0)

let x: TestableObservable<Never> = scheduler.createHotObservable([
.completed(210),
])

var object = Optional.some(TestObject())

var completable = x.asCompletable()
.do(onCompleted: { [object] in
_ = object
})

let disposable = completable
.andThen(.never())
.subscribe()

defer {
disposable.dispose()
}

// completable has completed by now
scheduler.advanceTo(300)

weak var weakObject = object
object = nil
completable = .never()

XCTAssertNil(weakObject)
}

func testCompletable_FirstCompletableNotRetainedBeyondFailure() {
let scheduler = TestScheduler(initialClock: 0)

let x: TestableObservable<Never> = scheduler.createHotObservable([
.error(210, TestError.dummyError),
])

var object = Optional.some(TestObject())

var completable = x.asCompletable()
.do(onCompleted: { [object] in
_ = object
})

let disposable = completable
.andThen(.never())
.subscribe()

defer {
disposable.dispose()
}

// completable has terminated with error by now
scheduler.advanceTo(300)

weak var weakObject = object
object = nil
completable = .never()

XCTAssertNil(weakObject)
}

#if TRACE_RESOURCES
func testAndThenCompletableReleasesResourcesOnComplete() {
_ = Completable.empty().andThen(Completable.empty()).subscribe()
Expand Down Expand Up @@ -575,3 +639,6 @@ extension CompletableAndThenTest {
}
#endif
}

private class TestObject {
}

0 comments on commit 98f63d1

Please sign in to comment.