Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ let package = Package(
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.2"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.17.0"),
.package(url: "https://github.com/pointfreeco/xctest-dynamic-overlay", from: "1.2.2"),
.package(url: "https://github.com/cwalo/TUSKit.git", branch: "cwalo/response-body"),
// TODO: point to TUSKit once branch is merged
// .package(url: "https://github.com/tus/TUSKit.git", from: "3.6.0"),
// .package(path: "../TUSKit"), /
.package(url: "https://github.com/WeTransfer/Mocker", from: "3.0.0"),
],
targets: [
Expand Down Expand Up @@ -147,7 +151,10 @@ let package = Package(
.target(
name: "Storage",
dependencies: [
"Helpers"
"Helpers",
"TUSKit",
// Local TUSKit testing
// .product(name: "TUSKit", package: "TUSKit"),
]
),
.testTarget(
Expand Down
31 changes: 31 additions & 0 deletions Sources/Storage/Resumable/ResumableClientStore.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import ConcurrencyExtras
import Foundation

/// Creates and stores ResumableUploadClient instances by bucketId
actor ResumableClientStore {
private let configuration: StorageClientConfiguration

var clients = LockIsolated<[String: ResumableUploadClient]>([:])

init(configuration: StorageClientConfiguration) {
self.configuration = configuration
}

func getOrCreateClient(for bucketId: String) throws -> ResumableUploadClient {
if let client = clients.value[bucketId] {
return client
} else {
let client = try ResumableUploadClient(bucketId: bucketId, configuration: configuration)
clients.withValue { $0[bucketId] = client }
return client
}
}

func removeClient(for bucketId: String) {
clients.withValue { _ = $0.removeValue(forKey: bucketId) }
}

func removeAllClients() {
clients.setValue([:])
}
}
36 changes: 36 additions & 0 deletions Sources/Storage/Resumable/ResumableUpload+Status.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import Foundation

extension ResumableUpload {
public enum Status: Sendable, Equatable {
case queued(UUID)
case started(UUID)
case progress(UUID, uploaded: Int, total: Int)
case finished(UUID)
case cancelled(UUID)
case failed(UUID, any Error)
case fileError(any Error)
case clientError(any Error)

// TODO: more robust equatable implementation
public static func == (lhs: Status, rhs: Status) -> Bool {
switch (lhs, rhs) {
case (.queued(let lhsId), .queued(let rhsId)):
return lhsId == rhsId
case (.started(let lhsId), .started(let rhsId)):
return lhsId == rhsId
case (.progress(let lhsId, _, _), .progress(let rhsId, _, _)):
return lhsId == rhsId
case (.finished(let lhsId), .finished(let rhsId)):
return lhsId == rhsId
case (.cancelled(let lhsId), .cancelled(let rhsId)):
return lhsId == rhsId
case (.fileError(let lhsError), .fileError(let rhsError)):
return lhsError.localizedDescription == rhsError.localizedDescription
case (.clientError(let lhsError), .clientError(let rhsError)):
return lhsError.localizedDescription == rhsError.localizedDescription
default:
return false
}
}
}
}
62 changes: 62 additions & 0 deletions Sources/Storage/Resumable/ResumableUpload.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import ConcurrencyExtras
import Foundation

/// An instance of a ResumableUpload
///
/// Consumers can maintain a reference to the upload to observe its status, pause, and resume the upload
public final class ResumableUpload: @unchecked Sendable {
public let id: UUID
public let context: [String: String]?

weak var client: ResumableUploadClient?
var statuses = LockIsolated<[Status]>([])
var continuations = LockIsolated<[UUID: AsyncStream<Status>.Continuation]>([:])

init(id: UUID, context: [String: String]?, client: ResumableUploadClient) {
self.id = id
self.context = context
self.client = client
self.statuses.setValue([.queued(id)])
}

func send(_ status: Status) {
statuses.withValue { $0.append(status) }
let currentContinuations = continuations.value
currentContinuations.values.forEach { $0.yield(status) }
}

func finish() {
let currentContinuations = continuations.value
continuations.setValue([:])
currentContinuations.values.forEach { $0.finish() }
}

public func currentStatus() -> Status {
statuses.value.last ?? .queued(id)
}

public func status() -> AsyncStream<Status> {
AsyncStream { continuation in
let streamID = UUID()

// Replay the last status
if let status = self.statuses.value.last {
continuation.yield(status)
}

continuations.withValue { $0[streamID] = continuation }
continuation.onTermination = { @Sendable _ in
self.continuations.withValue { _ = $0.removeValue(forKey: streamID) }
}
}
}

public func pause() throws {
try client?.pause(id: id)
}

public func resume() throws -> Bool {
guard let client else { return false }
return try client.resume(id: id)
}
}
73 changes: 73 additions & 0 deletions Sources/Storage/Resumable/ResumableUploadApi.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import ConcurrencyExtras
import Foundation
import HTTPTypes
import TUSKit

/// Supabase Resumable Upload API
public class ResumableUploadApi: StorageApi, @unchecked Sendable {
let bucketId: String
let clientStore: ResumableClientStore

init(bucketId: String, configuration: StorageClientConfiguration, clientStore: ResumableClientStore) {
self.bucketId = bucketId
self.clientStore = clientStore
super.init(configuration: configuration)
}

public func upload(file: URL, to path: String, options: FileOptions = .init()) async throws -> ResumableUpload {
let client = try await clientStore.getOrCreateClient(for: bucketId)
let upload = try client.uploadFile(filePath: file, path: path, options: options)
return upload
}

public func upload(data: Data, to path: String, pathExtension: String? = nil, options: FileOptions = .init()) async throws -> ResumableUpload {
let client = try await clientStore.getOrCreateClient(for: bucketId)
let upload = try client.upload(data: data, path: path, pathExtension: pathExtension, options: options)
return upload
}

public func pauseUpload(id: UUID) async throws {
let client = try await clientStore.getOrCreateClient(for: bucketId)
try client.pause(id: id)
}

public func pauseAllUploads() async throws {
let client = try await clientStore.getOrCreateClient(for: bucketId)
try client.pauseAllUploads()
}

public func resumeUpload(id: UUID) async throws -> Bool {
let client = try await clientStore.getOrCreateClient(for: bucketId)
return try client.resume(id: id)
}

public func retryUpload(id: UUID) async throws -> Bool {
let client = try await clientStore.getOrCreateClient(for: bucketId)
return try client.retry(id: id)
}

public func resumeAllUploads() async throws {
let client = try await clientStore.getOrCreateClient(for: bucketId)
try client.resumeAllUploads()
}

public func cancelUpload(id: UUID) async throws {
let client = try await clientStore.getOrCreateClient(for: bucketId)
try client.cancel(id: id)
}

public func cancelAllUploads() async throws {
let client = try await clientStore.getOrCreateClient(for: bucketId)
try client.cancelAllUploads()
}

public func getUploadStatus(id: UUID) async throws -> ResumableUpload.Status? {
let client = try await clientStore.getOrCreateClient(for: bucketId)
return client.status(id: id)
}

public func getUpload(id: UUID) async throws -> ResumableUpload? {
let client = try await clientStore.getOrCreateClient(for: bucketId)
return client.upload(for: id)
}
}
Loading