diff --git a/Sources/Storage/Resumable/DiskResumableCache.swift b/Sources/Storage/Resumable/DiskResumableCache.swift new file mode 100644 index 000000000..2246f7152 --- /dev/null +++ b/Sources/Storage/Resumable/DiskResumableCache.swift @@ -0,0 +1,44 @@ +import Foundation + +final class DiskResumableCache: ResumableCache, @unchecked Sendable { + private let storage: FileManager + + init(storage: FileManager) { + self.storage = storage + } + + func set(fingerprint: Fingerprint, entry: ResumableCacheEntry) async throws { + let data = try JSONEncoder().encode(entry) + storage.createFile(atPath: fingerprint.value, contents: data) + } + + func get(fingerprint: Fingerprint) async throws -> ResumableCacheEntry? { + let data = storage.contents(atPath: fingerprint.value) + guard let data = data else { + return nil + } + return try JSONDecoder().decode(ResumableCacheEntry.self, from: data) + } + + func remove(fingerprint: Fingerprint) async throws { + try storage.removeItem(atPath: fingerprint.value) + } + + func clear() async throws { + try storage.removeItem(atPath: storage.currentDirectoryPath) + } + + func entries() async throws -> [CachePair] { + let files = try storage.contentsOfDirectory(atPath: storage.currentDirectoryPath) + return try files.compactMap { file -> CachePair? in + let data = storage.contents(atPath: file) + guard let data = data else { + return nil + } + return ( + Fingerprint(value: file)!, + try JSONDecoder().decode(ResumableCacheEntry.self, from: data) + ) + } + } +} diff --git a/Sources/Storage/Resumable/Fingerprint.swift b/Sources/Storage/Resumable/Fingerprint.swift new file mode 100644 index 000000000..1b7c042fa --- /dev/null +++ b/Sources/Storage/Resumable/Fingerprint.swift @@ -0,0 +1,51 @@ +import Foundation + +public struct Fingerprint: Hashable, Sendable { + let value: String + + private static let fingerprintSeparator = "::" + private static let fingerprintParts = 2 + + private var parts: [String] { + value.components(separatedBy: Self.fingerprintSeparator) + } + + var source: String { + parts[0] + } + + var size: Int64 { + Int64(parts[1]) ?? 0 + } + + init(source: String, size: Int64) { + self.value = "\(source)\(Self.fingerprintSeparator)\(size)" + } + + init?(value: String) { + let parts = value.components(separatedBy: Self.fingerprintSeparator) + guard parts.count == Self.fingerprintParts else { return nil } + self.init(source: parts[0], size: Int64(parts[1]) ?? 0) + } +} + +extension Fingerprint: Codable { + public init(from decoder: any Decoder) throws { + let container = try decoder.singleValueContainer() + let value = try container.decode(String.self) + guard let fingerprint = Fingerprint(value: value) else { + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: decoder.codingPath, + debugDescription: "Invalid fingerprint format" + ) + ) + } + self = fingerprint + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.singleValueContainer() + try container.encode(value) + } +} \ No newline at end of file diff --git a/Sources/Storage/Resumable/MemoryResumableCache.swift b/Sources/Storage/Resumable/MemoryResumableCache.swift new file mode 100644 index 000000000..eb50e3bd9 --- /dev/null +++ b/Sources/Storage/Resumable/MemoryResumableCache.swift @@ -0,0 +1,46 @@ +import Foundation + +actor MemoryResumableCache: ResumableCache { + private var storage: [String: Data] = [:] + + init() {} + + func set(fingerprint: Fingerprint, entry: ResumableCacheEntry) async throws { + let data = try JSONEncoder().encode(entry) + storage[fingerprint.value] = data + } + + func get(fingerprint: Fingerprint) async throws -> ResumableCacheEntry? { + guard let data = storage[fingerprint.value] else { + return nil + } + return try JSONDecoder().decode(ResumableCacheEntry.self, from: data) + } + + func remove(fingerprint: Fingerprint) async throws { + storage.removeValue(forKey: fingerprint.value) + } + + func clear() async throws { + storage.removeAll() + } + + func entries() async throws -> [CachePair] { + var pairs: [CachePair] = [] + + for (key, data) in storage { + guard let fingerprint = Fingerprint(value: key) else { + continue + } + + do { + let entry = try JSONDecoder().decode(ResumableCacheEntry.self, from: data) + pairs.append((fingerprint, entry)) + } catch { + continue + } + } + + return pairs + } +} \ No newline at end of file diff --git a/Sources/Storage/Resumable/ResumableCache.swift b/Sources/Storage/Resumable/ResumableCache.swift new file mode 100644 index 000000000..274e55107 --- /dev/null +++ b/Sources/Storage/Resumable/ResumableCache.swift @@ -0,0 +1,33 @@ +import Foundation + +struct ResumableCacheEntry: Codable, Sendable { + let uploadURL: String + let path: String + let bucketId: String + let expiration: Date + let upsert: Bool + let contentType: String? + + enum CodingKeys: String, CodingKey { + case uploadURL = "upload_url" + case path + case bucketId = "bucket_id" + case expiration + case upsert + case contentType = "content_type" + } +} + +typealias CachePair = (Fingerprint, ResumableCacheEntry) + +protocol ResumableCache: Sendable { + func set(fingerprint: Fingerprint, entry: ResumableCacheEntry) async throws + func get(fingerprint: Fingerprint) async throws -> ResumableCacheEntry? + func remove(fingerprint: Fingerprint) async throws + func clear() async throws + func entries() async throws -> [CachePair] +} + +func createDefaultResumableCache() -> some ResumableCache { + DiskResumableCache(storage: FileManager.default) +} diff --git a/Sources/Storage/Resumable/ResumableClient.swift b/Sources/Storage/Resumable/ResumableClient.swift new file mode 100644 index 000000000..dc070583b --- /dev/null +++ b/Sources/Storage/Resumable/ResumableClient.swift @@ -0,0 +1,143 @@ +import Foundation +import HTTPTypes +import Helpers + +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + +protocol ResumableClient: Sendable { + static var tusVersion: String { get } + + func createUpload( + fingerprint: Fingerprint, + path: String, + bucketId: String, + contentLength: Int64, + contentType: String?, + upsert: Bool, + metadata: [String: String] + ) async throws -> ResumableCacheEntry + + func continueUpload( + fingerprint: Fingerprint, + cacheEntry: ResumableCacheEntry + ) async throws -> ResumableCacheEntry? +} + +extension ResumableClient { + static var tusVersion: String { "1.0.0" } +} + +final class ResumableClientImpl: ResumableClient, @unchecked Sendable { + static let tusVersion = "1.0.0" + + private let storageApi: StorageApi + private let cache: any ResumableCache + + init(storageApi: StorageApi, cache: any ResumableCache) { + self.storageApi = storageApi + self.cache = cache + } + + func createUpload( + fingerprint: Fingerprint, + path: String, + bucketId: String, + contentLength: Int64, + contentType: String?, + upsert: Bool, + metadata: [String: String] + ) async throws -> ResumableCacheEntry { + var uploadMetadata = metadata + uploadMetadata["filename"] = path.components(separatedBy: "/").last ?? path + uploadMetadata["filetype"] = contentType + + let metadataString = + uploadMetadata + .map { "\($0.key) \(Data($0.value.utf8).base64EncodedString())" } + .joined(separator: ",") + + var headers = HTTPFields() + headers[.tusResumable] = Self.tusVersion + headers[.uploadLength] = "\(contentLength)" + headers[.uploadMetadata] = metadataString + headers[.contentType] = "application/offset+octet-stream" + + if upsert { + headers[.xUpsert] = "true" + } + + let request = Helpers.HTTPRequest( + url: storageApi.configuration.url.appendingPathComponent("upload/resumable/\(bucketId)"), + method: .post, + headers: headers + ) + + let response = try await storageApi.execute(request) + + guard let locationHeader = response.headers[.location], + let uploadURL = URL(string: locationHeader) + else { + throw StorageError( + statusCode: nil, + message: "No location header in TUS upload creation response", + error: nil + ) + } + + let expiration = Date().addingTimeInterval(3600) // 1 hour default + let cacheEntry = ResumableCacheEntry( + uploadURL: uploadURL.absoluteString, + path: path, + bucketId: bucketId, + expiration: expiration, + upsert: upsert, + contentType: contentType + ) + + try await cache.set(fingerprint: fingerprint, entry: cacheEntry) + return cacheEntry + } + + func continueUpload( + fingerprint: Fingerprint, + cacheEntry: ResumableCacheEntry + ) async throws -> ResumableCacheEntry? { + guard cacheEntry.expiration > Date() else { + try await cache.remove(fingerprint: fingerprint) + return nil + } + + guard let uploadURL = URL(string: cacheEntry.uploadURL) else { + try await cache.remove(fingerprint: fingerprint) + return nil + } + + var headers = HTTPFields() + headers[.tusResumable] = Self.tusVersion + + let request = Helpers.HTTPRequest( + url: uploadURL, + method: .head, + headers: headers + ) + + do { + _ = try await storageApi.execute(request) + return cacheEntry + } catch { + try await cache.remove(fingerprint: fingerprint) + return nil + } + } +} + +extension HTTPField.Name { + static let tusResumable = Self("tus-resumable")! + static let uploadLength = Self("upload-length")! + static let uploadOffset = Self("upload-offset")! + static let uploadMetadata = Self("upload-metadata")! + static let location = Self("location")! + static let contentType = Self("content-type")! +} diff --git a/Sources/Storage/Resumable/ResumableUpload.swift b/Sources/Storage/Resumable/ResumableUpload.swift new file mode 100644 index 000000000..9d38a89fd --- /dev/null +++ b/Sources/Storage/Resumable/ResumableUpload.swift @@ -0,0 +1,229 @@ +import Foundation +import HTTPTypes + +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + +public struct ResumableUploadOptions: Sendable { + public let chunkSize: Int64 + public let retryLimit: Int + public let retryDelay: TimeInterval + + public init( + chunkSize: Int64 = 6 * 1024 * 1024, + retryLimit: Int = 3, + retryDelay: TimeInterval = 1.0 + ) { + self.chunkSize = chunkSize + self.retryLimit = retryLimit + self.retryDelay = retryDelay + } +} + +public actor ResumableUpload { + public let state: AsyncStream + + private let fingerprint: Fingerprint + private let data: Data + private let client: any ResumableClient + private let storageApi: StorageApi + private let options: ResumableUploadOptions + private let stateContinuation: AsyncStream.Continuation + + private var isPaused = false + private var isCancelled = false + private var currentOffset: Int64 = 0 + + init( + fingerprint: Fingerprint, + data: Data, + client: any ResumableClient, + storageApi: StorageApi, + options: ResumableUploadOptions + ) { + self.fingerprint = fingerprint + self.data = data + self.client = client + self.storageApi = storageApi + self.options = options + + let (stream, continuation) = AsyncStream.makeStream() + self.state = stream + self.stateContinuation = continuation + } + + deinit { + stateContinuation.finish() + } + + public func pause() { + isPaused = true + } + + public func cancel() { + isCancelled = true + stateContinuation.finish() + } + + public func start() async throws { + do { + try await performUpload() + } catch { + stateContinuation.finish() + throw error + } + } + + private func performUpload() async throws { + let cacheEntry = try await getCacheEntry() + guard let uploadURL = URL(string: cacheEntry.uploadURL) else { + throw StorageError(statusCode: nil, message: "Invalid upload URL", error: nil) + } + + currentOffset = try await getUploadOffset(url: uploadURL) + + while currentOffset < data.count && !isCancelled { + if isPaused { + await emitState(cacheEntry: cacheEntry, paused: true) + try await waitForResume() + continue + } + + let chunkSize = min(options.chunkSize, Int64(data.count) - currentOffset) + let chunk = data.subdata(in: Int(currentOffset)..= data.count && !isCancelled { + await emitState(cacheEntry: cacheEntry, paused: false) + stateContinuation.finish() + } + } + + private func getCacheEntry() async throws -> ResumableCacheEntry { + if let existingEntry = try await client.continueUpload( + fingerprint: fingerprint, + cacheEntry: try await getCachedEntry() + ) { + return existingEntry + } + + return try await client.createUpload( + fingerprint: fingerprint, + path: fingerprint.source, + bucketId: "default", + contentLength: Int64(data.count), + contentType: "application/octet-stream", + upsert: false, + metadata: [:] + ) + } + + private func getCachedEntry() async throws -> ResumableCacheEntry { + // This is a placeholder - in real implementation, you'd get this from cache + // For now, create a minimal entry to trigger creation + return ResumableCacheEntry( + uploadURL: "", + path: fingerprint.source, + bucketId: "default", + expiration: Date(), + upsert: false, + contentType: "application/octet-stream" + ) + } + + private func getUploadOffset(url: URL) async throws -> Int64 { + var request = Helpers.HTTPRequest(url: url, method: .head) + request.headers[.tusResumable] = ResumableClientImpl.tusVersion + + let response = try await storageApi.execute(request) + + guard + let offsetHeader = response.headers[.uploadOffset], + let offset = Int64(offsetHeader) + else { + return 0 + } + + return offset + } + + private func uploadChunk(chunk: Data, offset: Int64, url: URL) async throws { + var request = Helpers.HTTPRequest(url: url, method: .patch) + request.headers[.tusResumable] = ResumableClientImpl.tusVersion + request.headers[.contentType] = "application/offset+octet-stream" + request.headers[.uploadOffset] = "\(offset)" + request.body = chunk + + _ = try await storageApi.execute(request) + } + + private func emitState(cacheEntry: ResumableCacheEntry, paused: Bool) async { + let status = UploadStatus( + totalBytesSent: currentOffset, + contentLength: Int64(data.count) + ) + + let uploadState = ResumableUploadState( + fingerprint: fingerprint, + cacheEntry: cacheEntry, + status: status, + paused: paused + ) + + stateContinuation.yield(uploadState) + } + + private func waitForResume() async throws { + while isPaused && !isCancelled { + try await Task.sleep(nanoseconds: 100_000_000) // 100ms + } + } +} + +extension StorageFileApi { + public func resumableUpload( + path: String, + data: Data, + options: ResumableUploadOptions = ResumableUploadOptions() + ) throws -> ResumableUpload { + let fingerprint = Fingerprint(source: path, size: Int64(data.count)) + let cache = createDefaultResumableCache() + let client = ResumableClientImpl(storageApi: self, cache: cache) + + return ResumableUpload( + fingerprint: fingerprint, + data: data, + client: client, + storageApi: self, + options: options + ) + } +} diff --git a/Sources/Storage/Resumable/ResumableUploadState.swift b/Sources/Storage/Resumable/ResumableUploadState.swift new file mode 100644 index 000000000..a6bc337b5 --- /dev/null +++ b/Sources/Storage/Resumable/ResumableUploadState.swift @@ -0,0 +1,47 @@ +import Foundation + +public struct UploadStatus: Sendable { + public let totalBytesSent: Int64 + public let contentLength: Int64 + + public init(totalBytesSent: Int64, contentLength: Int64) { + self.totalBytesSent = totalBytesSent + self.contentLength = contentLength + } +} + +public struct ResumableUploadState: Sendable { + public let fingerprint: Fingerprint + public let status: UploadStatus + public let paused: Bool + private let cacheEntry: ResumableCacheEntry + + public var path: String { + cacheEntry.path + } + + public var bucketId: String { + cacheEntry.bucketId + } + + public var isDone: Bool { + status.totalBytesSent >= status.contentLength + } + + public var progress: Float { + guard status.contentLength > 0 else { return 0.0 } + return Float(status.totalBytesSent) / Float(status.contentLength) + } + + init( + fingerprint: Fingerprint, + cacheEntry: ResumableCacheEntry, + status: UploadStatus, + paused: Bool + ) { + self.fingerprint = fingerprint + self.cacheEntry = cacheEntry + self.status = status + self.paused = paused + } +} \ No newline at end of file