diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 282dc5f..dd9aac1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: test: strategy: matrix: - os: [macos-14, ubuntu-22.04] + os: [macos-15, ubuntu-22.04] name: Test runs-on: ${{ matrix.os }} environment: ci diff --git a/Package.swift b/Package.swift index a37364c..e38648b 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.5 +// swift-tools-version:6.0 import PackageDescription #if os(macOS) diff --git a/unxip.swift b/unxip.swift index b52af6e..4ae09ae 100644 --- a/unxip.swift +++ b/unxip.swift @@ -1,3 +1,6 @@ +#if !canImport(Glibc) + @preconcurrency import unistd // optind +#endif import Foundation #if canImport(Compression) @@ -5,7 +8,7 @@ import Foundation #else import FoundationXML import GNUSource - import getopt + @preconcurrency import getopt // optind import lzma import zlib #endif @@ -112,13 +115,52 @@ struct Queue { } } -extension AsyncThrowingStream where Failure == Error { - actor PermissiveActionLink where S.Element == Element { +protocol ErasedIterator: AsyncIteratorProtocol, Sendable { +} + +public struct ErasedSequence: AsyncSequence { + struct ErasedButBarelyLikeWithThosePinkPearlThingsSequence: AsyncSequence where S.AsyncIterator: Sendable { + struct Iterator: ErasedIterator { + var iterator: S.AsyncIterator + + mutating func next() async throws -> S.Element? { + try await iterator.next() + } + } + + let sequence: S + + func makeAsyncIterator() -> Iterator { + .init(iterator: sequence.makeAsyncIterator()) + } + } + + public struct Iterator: AsyncIteratorProtocol, Sendable { + var iterator: any ErasedIterator + + public mutating func next() async throws -> Element? { + try await iterator.next() + } + } + + let iterator: any ErasedIterator + + init(sequence: S) where S.Element == Element, S.AsyncIterator: Sendable { + iterator = ErasedButBarelyLikeWithThosePinkPearlThingsSequence(sequence: sequence).makeAsyncIterator() + } + + public func makeAsyncIterator() -> Iterator { + .init(iterator: iterator) + } +} + +extension AsyncThrowingStream where Element: Sendable, Failure == Error { + actor PermissiveActionLink where S.Element == Element, S.AsyncIterator: Sendable { var iterator: S.AsyncIterator let count: Int var queued = [CheckedContinuation]() - init(iterator: S.AsyncIterator, count: Int) { + init(iterator: sending S.AsyncIterator, count: Int) { self.iterator = iterator self.count = count } @@ -150,13 +192,6 @@ extension AsyncThrowingStream where Failure == Error { queued.removeAll() } } - - init(erasing sequence: S) where S.Element == Element { - var iterator = sequence.makeAsyncIterator() - self.init { - try await iterator.next() - } - } } protocol BackpressureProvider { @@ -210,7 +245,7 @@ final class FileBackpressure: BackpressureProvider { } } -actor BackpressureStream: AsyncSequence where Backpressure.Element == Element { +actor BackpressureStream: AsyncSequence where Backpressure.Element == Element { struct Iterator: AsyncIteratorProtocol { let stream: BackpressureStream @@ -309,30 +344,11 @@ actor BackpressureStream: AsyncSequ } } -actor ConcurrentStream { - class Wrapper { - var stream: AsyncThrowingStream! - var continuation: AsyncThrowingStream.Continuation! - } +actor ConcurrentStream { + let results: AsyncThrowingStream + let continuation: AsyncThrowingStream.Continuation - let wrapper = Wrapper() let batchSize: Int - nonisolated var results: AsyncThrowingStream { - get { - wrapper.stream - } - set { - wrapper.stream = newValue - } - } - nonisolated var continuation: AsyncThrowingStream.Continuation { - get { - wrapper.continuation - } - set { - wrapper.continuation = newValue - } - } var index = -1 var finishedIndex = Int?.none var completedIndex = -1 @@ -341,9 +357,7 @@ actor ConcurrentStream { init(batchSize: Int = 2 * ProcessInfo.processInfo.activeProcessorCount, consumeResults: Bool = false) { self.batchSize = batchSize - results = AsyncThrowingStream { - continuation = $0 - } + (results, continuation) = AsyncThrowingStream.makeStream(of: Element.self, throwing: Error.self) if consumeResults { Task { for try await _ in results { @@ -531,7 +545,7 @@ public struct DataReader where S.Element: RandomAccessCollecti } } -extension DataReader where S == AsyncThrowingStream<[UInt8], Error> { +extension DataReader where S == ErasedSequence<[UInt8]> { public init(descriptor: CInt) { self.init(data: Self.data(readingFrom: descriptor)) } @@ -580,13 +594,12 @@ extension DataReader where S == AsyncThrowingStream<[UInt8], Error> { #if PROFILING os_signpost(.end, log: readLog, name: "Read", signpostID: id, "Ended read") #endif - let chunk = chunk + let chunk = [UInt8](unsafeUninitializedCapacity: chunk.count) { buffer, count in + _ = chunk.copyBytes(to: buffer, from: nil) + count = chunk.count + } Task { - await stream.yield( - [UInt8](unsafeUninitializedCapacity: chunk.count) { buffer, count in - _ = chunk.copyBytes(to: buffer, from: nil) - count = chunk.count - }) + await stream.yield(chunk) continuation.resume(returning: true) } } @@ -596,7 +609,7 @@ extension DataReader where S == AsyncThrowingStream<[UInt8], Error> { } } - return .init(erasing: stream) + return .init(sequence: stream) } } @@ -615,7 +628,7 @@ public struct Chunk: Sendable { } } -public struct File { +public struct File: Sendable { public let dev: Int public let ino: Int public let mode: Int @@ -819,9 +832,9 @@ public struct File { public protocol StreamAperture { associatedtype Input associatedtype Next: StreamAperture - associatedtype Options + associatedtype Options: Sendable - static func transform(_: Input, options: Options?) -> Next.Input + static func transform(_: sending Input, options: Options?) -> Next.Input } protocol Decompressor { @@ -871,9 +884,9 @@ public enum XIP: StreamAperture where S.Element: RandomAccessC public typealias Input = DataReader public typealias Next = Chunks - public struct Options { - let zlibDecompressor: ([UInt8], Int) throws -> [UInt8] - let lzmaDecompressor: ([UInt8], Int) throws -> [UInt8] + public struct Options: Sendable { + let zlibDecompressor: @Sendable ([UInt8], Int) throws -> [UInt8] + let lzmaDecompressor: @Sendable ([UInt8], Int) throws -> [UInt8] init(zlibDecompressor: Zlib.Type, lzmaDecompressor: LZMA.Type) { self.zlibDecompressor = Zlib.decompress @@ -953,7 +966,7 @@ public enum XIP: StreamAperture where S.Element: RandomAccessC file.cap = file.position + contentSize } - public static func transform(_ data: Input, options: Options?) -> Next.Input { + public static func transform(_ data: sending Input, options: Options?) -> Next.Input { let options = options ?? Self.defaultOptions let decompressionStream = ConcurrentStream(consumeResults: true) @@ -1007,17 +1020,17 @@ public enum XIP: StreamAperture where S.Element: RandomAccessC await decompressionStream.finish() } - return .init(erasing: chunkStream) + return .init(sequence: chunkStream) } } public enum Chunks: StreamAperture { - public typealias Input = AsyncThrowingStream + public typealias Input = ErasedSequence public typealias Next = Files public typealias Options = Never - public static func transform(_ chunks: Input, options: Options?) -> Next.Input { + public static func transform(_ chunks: sending Input, options: Options?) -> Next.Input { let fileStream = BackpressureStream(backpressure: FileBackpressure(maxSize: 1_000_000_000), of: File.self) Task { var iterator = chunks.makeAsyncIterator() @@ -1038,9 +1051,11 @@ public enum Chunks: StreamAperture { } func readOctal(from bytes: [UInt8]) throws -> Int { - try UnxipError.throw(.invalid, ifNil: String(data: Data(bytes), encoding: .utf8).map { - Int($0, radix: 8) - } ?? nil) + try UnxipError.throw( + .invalid, + ifNil: String(data: Data(bytes), encoding: .utf8).map { + Int($0, radix: 8) + } ?? nil) } while true { @@ -1059,7 +1074,7 @@ public enum Chunks: StreamAperture { var filesize = try readOctal(from: await read(size: 11)) let _name = try await read(size: namesize) try UnxipError.throw(.invalid, if: _name.last != 0) - let name = String(cString: _name) + let name = String(decoding: _name.dropLast(), as: UTF8.self) var file = File(dev: dev, ino: ino, mode: mode, name: name) while filesize > 0 { @@ -1095,15 +1110,15 @@ public enum Chunks: StreamAperture { await fileStream.yield(file) } } - return .init(erasing: fileStream) + return .init(sequence: fileStream) } } public enum Files: StreamAperture { - public typealias Input = AsyncThrowingStream + public typealias Input = ErasedSequence public typealias Next = Disk - public struct Options { + public struct Options: Sendable { public let compress: Bool public let dryRun: Bool @@ -1117,7 +1132,7 @@ public enum Files: StreamAperture { .init(compress: true, dryRun: false) } - public static func transform(_ files: Input, options: Options?) -> Next.Input { + public static func transform(_ files: sending Input, options: Options?) -> Next.Input { let options = options ?? Self.defaultOptions let taskStream = ConcurrentStream() @@ -1315,12 +1330,12 @@ public enum Files: StreamAperture { completion.completionStream.finish() } - return .init(erasing: completion.completionStream) + return .init(sequence: completion.completionStream) } } public enum Disk: StreamAperture { - public typealias Input = AsyncThrowingStream + public typealias Input = ErasedSequence public typealias Next = Disk // Irrelevant because this is never used public typealias Options = Never @@ -1353,15 +1368,16 @@ public struct UnxipStream { } public struct Unxip { - public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil, _ option3: Start.Next.Next.Options? = nil) -> End.Input where Start.Next.Next.Next == End { + public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: sending Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil, _ option3: Start.Next.Next.Options? = nil) -> End.Input where Start.Next.Next.Next == End { Start.Next.Next.transform(Start.Next.transform(Start.transform(input, options: option1), options: option2), options: option3) } - public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil) -> End.Input where Start.Next.Next == End { - Start.Next.transform(Start.transform(input, options: option1), options: option2) + public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: sending Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil) -> End.Input where Start.Next.Next == End { + let input = Start.transform(input, options: option1) + return Start.Next.transform(input, options: option2) } - public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: Start.Input, _ option1: Start.Options? = nil) -> End.Input where Start.Next == End { + public static func makeStream(from start: UnxipStream, to end: UnxipStream, input: sending Start.Input, _ option1: Start.Options? = nil) -> End.Input where Start.Next == End { Start.transform(input, options: option1) } @@ -1371,7 +1387,7 @@ public struct Unxip { } } -extension AsyncSequence { +extension AsyncSequence where Element: Sendable, AsyncIterator: Sendable { public func lockstepSplit() -> (AsyncThrowingStream, AsyncThrowingStream) { let pal = AsyncThrowingStream.PermissiveActionLink(iterator: makeAsyncIterator(), count: 2)