Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
Dispose on deinit
Browse files Browse the repository at this point in the history
Use NSRecursiveLock for subscriber reads/writes
  • Loading branch information
Tim Chamberlin committed Oct 7, 2021
1 parent 1ed8cc9 commit ed2f2d7
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 25 deletions.
9 changes: 8 additions & 1 deletion Snail/Disposer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@ public protocol DisposableType {
}

public class Disposer {
private(set) public var disposables: [DisposableType] = []
private(set) var disposables: [DisposableType] = []
private let recursiveLock = NSRecursiveLock()

public init() {}

deinit {
disposeAll()
}

public func disposeAll() {
recursiveLock.lock(); defer { recursiveLock.unlock() }
disposables.forEach { $0.dispose() }
disposables.removeAll()
}

public func add(disposable: DisposableType) {
recursiveLock.lock(); defer { recursiveLock.unlock() }
disposables.append(disposable)
}
}
41 changes: 17 additions & 24 deletions Snail/Observable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import Dispatch
public class Observable<T>: ObservableType {
private var isStopped: Int32 = 0
private var stoppedEvent: Event<T>?
private var subscribers: [Subscriber<T>] = []
private let subscribersQueue = DispatchQueue(label: "snail-observable-queue", attributes: .concurrent)
private(set) var subscribers: [Subscriber<T>] = []
private let recursiveLock = NSRecursiveLock()

public init() {}

Expand All @@ -28,31 +28,27 @@ public class Observable<T>: ObservableType {
return subscriber
}

subscribersQueue.async(flags: .barrier) {
self.subscribers.append(subscriber)
}
recursiveLock.lock(); defer { recursiveLock.unlock() }
subscribers.append(subscriber)

return subscriber
}

public func on(_ event: Event<T>) {
recursiveLock.lock(); defer { recursiveLock.unlock() }
switch event {
case .next:
guard isStopped == 0 else {
return
}

subscribersQueue.sync {
self.subscribers.forEach {
notify(subscriber: $0, event: event)
}
subscribers.forEach {
notify(subscriber: $0, event: event)
}
case .error, .done:
if OSAtomicCompareAndSwap32Barrier(0, 1, &isStopped) {
subscribersQueue.sync {
self.subscribers.forEach {
notify(subscriber: $0, event: event)
}
subscribers.forEach {
notify(subscriber: $0, event: event)
}
stoppedEvent = event
}
Expand All @@ -69,21 +65,18 @@ public class Observable<T>: ObservableType {
}

public func removeSubscribers() {
subscribersQueue.async(flags: .barrier) {
self.subscribers.removeAll()
}
recursiveLock.lock(); defer { recursiveLock.unlock() }
subscribers.removeAll()
}

public func removeSubscriber(subscriber: Subscriber<T>) {
subscribersQueue.sync {
guard let index = self.subscribers.enumerated().first(where: { $0.element === subscriber })?.offset else {
return
}

subscribersQueue.async(flags: .barrier) {
self.subscribers.remove(at: index)
}
recursiveLock.lock(); defer { recursiveLock.unlock() }

guard let index = subscribers.enumerated().first(where: { $0.element === subscriber })?.offset else {
return
}

subscribers.remove(at: index)
}

public func map<U>(_ transform: @escaping (T) -> U) -> Observable<U> {
Expand Down
24 changes: 24 additions & 0 deletions SnailTests/DisposerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,28 @@ class DisposerTests: XCTestCase {
subject.disposeAll()
XCTAssertTrue(subject.disposables.count == 0)
}

func testDisposeOnDeinitRemovesSubscribers() {
var subject: Disposer? = Disposer()
let exp = expectation(description: "test")
var sum = 0

let obs = Observable<Void>()
for _ in 0..<10 {
if subject == nil {
subject = Disposer()
}
obs.subscribe(onNext: {
sum += 1
}).add(to: subject!)

}
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 0.5) {
subject = nil
obs.on(.next(()))
XCTAssertEqual(sum, 0)
exp.fulfill()
}
waitForExpectations(timeout: 2, handler: nil)
}
}
59 changes: 59 additions & 0 deletions SnailTests/ObservableTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1054,4 +1054,63 @@ class ObservableTests: XCTestCase {

subject.value = true
}

func testRemovingFirstSubscriberWithinSubscription_NoDeadlock() {
class TestObject {
let disposer = Disposer()
private let variable: Variable<Bool>

init(variable: Variable<Bool>) {
self.variable = variable

self.variable.asObservable().subscribe(onNext: { _ in
if let firstSubscriber = variable.subject.subscribers.first {
self.variable.subject.removeSubscriber(subscriber: firstSubscriber)
}
})
.add(to: disposer)
}
}

let subject = Variable(true)
let disposer = Disposer()

subject.asObservable().subscribe(onNext: { _ in
_ = TestObject(variable: subject)
})
.add(to: disposer)

subject.value = true
}

func testDisposerRemovesSubscriptionFromObserverOnDeinit() {
class TestOwner {
let disposer = Disposer()

func bind(observable: Observable<Int>) {
observable
.subscribe(queue: .main, onNext: { _ in })
.add(to: disposer)
}
}

let observable = Observable<Int>()

var owner: TestOwner! = TestOwner()
owner.bind(observable: observable)

let isDone = expectation(description: "Disposes")

DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
owner = nil
}

DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) {
isDone.fulfill()
}

waitForExpectations(timeout: 1) { _ in
XCTAssertEqual(observable.subscribers.count, 0)
}
}
}

0 comments on commit ed2f2d7

Please sign in to comment.