Skip to content

Commit

Permalink
change: implement onCanceled without dispatchGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
0xWOF committed Apr 16, 2024
1 parent 21203ed commit c85286b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 63 deletions.
75 changes: 41 additions & 34 deletions Source/Concurrency/Contract/Contract.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,18 @@ public final class Contract<Value, Failure: Error> {

let queue: DispatchQueue
var executeGroup: DispatchGroup
var cancelGroup: DispatchGroup

var subscribers: [ContractSubscriber<Value, Failure>]
var cancelSubscribers: [ContractCancelSubscriber]

init(queue: DispatchQueue = .global()) {
self.state = Atomic(.executing)

self.queue = queue
self.executeGroup = DispatchGroup()
self.cancelGroup = DispatchGroup()

self.subscribers = []

cancelGroup.enter()
}

deinit {
state.capture { state in
if case .canceled = state {} else {
cancelGroup.leave()
}
}
self.cancelSubscribers = []
}
}

Expand Down Expand Up @@ -80,8 +70,14 @@ extension Contract {

func cancel() {
state.mutate { state in
if case .canceled = state {} else {
cancelGroup.leave()
if case .executing = state {
for cancelSubscriber in self.cancelSubscribers {
executeGroup.notify(queue: queue) {
cancelSubscriber.onCanceled()
}
}
subscribers = []
cancelSubscribers = []
return .canceled
}

Expand All @@ -96,19 +92,21 @@ extension Contract {
onCanceled: @escaping () -> Void
) {
state.capture { state in
subscribers.append(ContractSubscriber(
queue: queue,
onResolved: onResolved,
onRejected: onRejected
))
}

let state = self.state
cancelGroup.notify(queue: queue) {
let state = state.capture { $0 }

if case .canceled = state {
onCanceled()
if case .executing = state {
subscribers.append(ContractSubscriber(
queue: queue,
onResolved: onResolved,
onRejected: onRejected
))
cancelSubscribers.append(ContractCancelSubscriber(
queue: queue,
onCanceled: onCanceled
))
}
else if case .canceled = state {
executeGroup.notify(queue: queue) {
onCanceled()
}
}
}
}
Expand All @@ -117,12 +115,17 @@ extension Contract {
queue: DispatchQueue,
onCanceled: @escaping () -> Void
) {
let state = self.state
cancelGroup.notify(queue: queue) {
let state = state.capture { $0 }

if case .canceled = state {
onCanceled()
state.capture { state in
if case .executing = state {
cancelSubscribers.append(ContractCancelSubscriber(
queue: queue,
onCanceled: onCanceled
))
}
else if case .canceled = state {
executeGroup.notify(queue: queue) {
onCanceled()
}
}
}
}
Expand Down Expand Up @@ -190,7 +193,6 @@ extension Contract {
) -> Contract<Value, Failure> {
let contract = Contract(queue: queue)
contract.state = Atomic(.canceled)
contract.cancelGroup.leave()

return contract
}
Expand Down Expand Up @@ -266,6 +268,11 @@ struct ContractSubscriber<Value, Failure: Error> {
let onRejected: (Failure) -> Void
}

struct ContractCancelSubscriber {
let queue: DispatchQueue
let onCanceled: () -> Void
}

public final class ContractSchedule {
private let mode: Mode

Expand Down
76 changes: 47 additions & 29 deletions Source/Concurrency/Promise/Promise.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@ public final class Promise<Value, Failure: Error> {
var state: Atomic<PromiseState<Value, Failure>>

let queue: DispatchQueue
let executeGroup: DispatchGroup
let cancelGroup: DispatchGroup
let pendingGroup: DispatchGroup

var cancelSubscribers: [PromiseCancelSubscriber]

init(queue: DispatchQueue = .global()) {
self.state = Atomic(.pending)

self.queue = queue
self.executeGroup = DispatchGroup()
self.cancelGroup = DispatchGroup()
self.pendingGroup = DispatchGroup()

self.cancelSubscribers = []

executeGroup.enter()
cancelGroup.enter()
pendingGroup.enter()
}

deinit {
state.capture { state in
if case .canceled = state {} else {
cancelGroup.leave()
if case .pending = state {
executeGroup.leave()
}
if case .pending = state {
pendingGroup.leave()
}
}
}
Expand Down Expand Up @@ -222,7 +220,7 @@ extension Promise {
func resolve(_ value: Value) {
state.mutate { state in
if case .pending = state {
executeGroup.leave()
pendingGroup.leave()
return .resolved(value)
}

Expand All @@ -233,7 +231,7 @@ extension Promise {
func reject(_ error: Failure) {
state.mutate { state in
if case .pending = state {
executeGroup.leave()
pendingGroup.leave()
return .rejected(error)
}

Expand All @@ -244,10 +242,15 @@ extension Promise {
func cancel() {
state.mutate { state in
if case .canceled = state {} else {
cancelGroup.leave()
if case .pending = state {
executeGroup.leave()
pendingGroup.leave()
}
for cancelSubscriber in self.cancelSubscribers {
pendingGroup.notify(queue: queue) {
cancelSubscriber.onCanceled()
}
}
cancelSubscribers = []
return .canceled
}

Expand All @@ -262,7 +265,7 @@ extension Promise {
onCanceled: @escaping () -> Void
) {
let state = self.state
executeGroup.notify(queue: queue) {
pendingGroup.notify(queue: queue) {
let state = state.capture { $0 }

if case .resolved(let value) = state {
Expand All @@ -272,11 +275,17 @@ extension Promise {
onRejected(error)
}
}
cancelGroup.notify(queue: queue) {
let state = state.capture { $0 }

state.capture { state in
if case .canceled = state {
onCanceled()
pendingGroup.notify(queue: queue) {
onCanceled()
}
}
else {
cancelSubscribers.append(PromiseCancelSubscriber(
queue: queue,
onCanceled: onCanceled
))
}
}
}
Expand All @@ -285,12 +294,17 @@ extension Promise {
queue: DispatchQueue,
onCanceled: @escaping () -> Void
) {
let state = self.state
cancelGroup.notify(queue: queue) {
let state = state.capture { $0 }

state.capture { state in
if case .canceled = state {
onCanceled()
pendingGroup.notify(queue: queue) {
onCanceled()
}
}
else {
cancelSubscribers.append(PromiseCancelSubscriber(
queue: queue,
onCanceled: onCanceled
))
}
}
}
Expand Down Expand Up @@ -373,7 +387,7 @@ extension Promise {
) -> Promise<Value, Failure> {
let promise = Promise(queue: queue)
promise.state = Atomic(.resolved(value))
promise.executeGroup.leave()
promise.pendingGroup.leave()

return promise
}
Expand All @@ -384,7 +398,7 @@ extension Promise {
) -> Promise<Value, Error> where Failure == Error {
let promise = Promise<Value, Error>(queue: queue)
promise.state = Atomic(.rejected(error))
promise.executeGroup.leave()
promise.pendingGroup.leave()

return promise
}
Expand All @@ -394,8 +408,7 @@ extension Promise {
) -> Promise<Value, Failure> {
let promise = Promise(queue: queue)
promise.state = Atomic(.canceled)
promise.executeGroup.leave()
promise.cancelGroup.leave()
promise.pendingGroup.leave()

return promise
}
Expand Down Expand Up @@ -444,6 +457,11 @@ enum PromiseState<Value, Failure: Error> {
case canceled
}

struct PromiseCancelSubscriber {
let queue: DispatchQueue
let onCanceled: () -> Void
}

public enum PromiseError: Error {
case timeout
}

0 comments on commit c85286b

Please sign in to comment.