Skip to content

Commit

Permalink
Introduced EventSourceActor, fixed all concurrency warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
Recouse committed Nov 17, 2024
1 parent 8d28360 commit 1567f7c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 39 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ Using EventSource is easy. Simply create a new task from an instance of EventSou
import EventSource

let eventSource = EventSource()
let dataTask = eventSource.dataTask(for: urlRequest)
let dataTask = await eventSource.dataTask(for: urlRequest)

for await event in dataTask.events() {
for await event in await dataTask.events() {
switch event {
case .open:
print("Connection was opened.")
Expand Down
60 changes: 33 additions & 27 deletions Sources/EventSource/EventSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import Foundation
import FoundationNetworking
#endif

/// The global actor used for isolating ``EventSource/EventSource/DataTask``.
@globalActor public actor EventSourceActor: GlobalActor {
public static let shared = EventSourceActor()
}

///
/// An `EventSource` instance opens a persistent connection to an HTTP server,
/// which sends events in `text/event-stream` format.
Expand All @@ -26,7 +31,7 @@ public struct EventSource {
}

/// Event type.
public enum EventType {
public enum EventType: Sendable {
case error(Error)
case event(EVEvent)
case open
Expand All @@ -44,7 +49,8 @@ public struct EventSource {
self.eventParser = eventParser
self.timeoutInterval = timeoutInterval
}


@EventSourceActor
public func dataTask(for urlRequest: URLRequest) -> DataTask {
DataTask(
urlRequest: urlRequest,
Expand All @@ -60,7 +66,7 @@ public extension EventSource {
/// Creation of a task is exclusively handled by ``EventSource``. A new task can be created by calling
/// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task,
/// it can be started by iterating event stream returned by ``DataTask/events()``.
final class DataTask {
@EventSourceActor final class DataTask {
/// A value representing the state of the connection.
public private(set) var readyState: ReadyState = .none

Expand All @@ -79,11 +85,7 @@ public extension EventSource {
private var continuation: AsyncStream<EventType>.Continuation?

private var urlSession: URLSession?

private var sessionDelegate = SessionDelegate()

private var sessionDelegateTask: Task<Void, Error>?


private var urlSessionDataTask: URLSessionDataTask?

private var httpResponseErrorStatusCode: Int?
Expand Down Expand Up @@ -113,31 +115,36 @@ public extension EventSource {
/// Creates and returns event stream.
public func events() -> AsyncStream<EventType> {
AsyncStream { continuation in
let sessionDelegate = SessionDelegate()
let sesstionDelegateTask = Task { [weak self] in
for await event in sessionDelegate.eventStream {
guard let self else { return }

switch event {
case let .didCompleteWithError(error):
handleSessionError(error)
case let .didReceiveResponse(response, completionHandler):
handleSessionResponse(response, completionHandler: completionHandler)
case let .didReceiveData(data):
parseMessages(from: data)
}
}
}

continuation.onTermination = { @Sendable [weak self] _ in
self?.close()
sesstionDelegateTask.cancel()
Task { await self?.close() }
}

self.continuation = continuation



urlSession = URLSession(
configuration: urlSessionConfiguration,
delegate: sessionDelegate,
delegateQueue: nil
)

sessionDelegate.onEvent = { [weak self] event in
guard let self else { return }

switch event {
case let .didCompleteWithError(error):
handleSessionError(error)
case let .didReceiveResponse(response, completionHandler):
handleSessionResponse(response, completionHandler: completionHandler)
case let .didReceiveData(data):
parseMessages(from: data)
}
}


urlSessionDataTask = urlSession!.dataTask(with: urlRequest)
urlSessionDataTask!.resume()
readyState = .connecting
Expand Down Expand Up @@ -194,7 +201,7 @@ public extension EventSource {
/// Closes the connection, if one was made,
/// and sets the `readyState` property to `.closed`.
/// - Returns: State before closing.
@Sendable private func close() {
private func close() {
let previousState = self.readyState
if previousState != .closed {
continuation?.yield(.closed)
Expand Down Expand Up @@ -242,7 +249,6 @@ public extension EventSource {
public func cancel() {
readyState = .closed
lastMessageId = ""
sessionDelegateTask?.cancel()
urlSessionDataTask?.cancel()
urlSession?.invalidateAndCancel()
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/EventSource/ServerEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Foundation

/// Protocol for defining a basic event structure. It is used by the ``EventParser``
/// and should be implemented as a custom type when a custom ``EventParser`` is required.
public protocol EVEvent {
public protocol EVEvent: Sendable {
var id: String? { get }
var event: String? { get }
var data: String? { get }
Expand Down
20 changes: 11 additions & 9 deletions Sources/EventSource/SessionDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,38 @@ import Foundation
#endif

final class SessionDelegate: NSObject, URLSessionDataDelegate {
enum Event {
enum Event: Sendable {
case didCompleteWithError(Error?)
case didReceiveResponse(URLResponse, (URLSession.ResponseDisposition) -> Void)
case didReceiveResponse(URLResponse, @Sendable (URLSession.ResponseDisposition) -> Void)
case didReceiveData(Data)
}

var onEvent: (Event) -> Void = { _ in }


private let internalStream = AsyncStream<Event>.makeStream()

var eventStream: AsyncStream<Event> { internalStream.stream }

func urlSession(
_ session: URLSession,
task: URLSessionTask,
didCompleteWithError error: Error?
) {
onEvent(.didCompleteWithError(error))
internalStream.continuation.yield(.didCompleteWithError(error))
}

func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
completionHandler: @Sendable @escaping (URLSession.ResponseDisposition) -> Void
) {
onEvent(.didReceiveResponse(response, completionHandler))
internalStream.continuation.yield(.didReceiveResponse(response, completionHandler))
}

func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive data: Data
) {
onEvent(.didReceiveData(data))
internalStream.continuation.yield(.didReceiveData(data))
}
}

0 comments on commit 1567f7c

Please sign in to comment.