Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
Abstract storage component, providing a shared API surface for file storage drivers written in Swift.

[
![Release: 1.0.0-beta.1](https://img.shields.io/badge/Release-1%2E0%2E0--beta%2E1-F05138)
![Release: 1.0.0-beta.2](https://img.shields.io/badge/Release-1%2E0%2E0--beta%2E2-F05138)
](
https://github.com/feather-framework/feather-storage/releases/tag/1.0.0-beta.1
https://github.com/feather-framework/feather-storage/releases/tag/1.0.0-beta.2
)

## Features
Expand Down Expand Up @@ -35,7 +35,7 @@ Abstract storage component, providing a shared API surface for file storage driv
Use Swift Package Manager; add the dependency to your `Package.swift` file:

```swift
.package(url: "https://github.com/feather-framework/feather-storage", exact: "1.0.0-beta.1"),
.package(url: "https://github.com/feather-framework/feather-storage", exact: "1.0.0-beta.2"),
```

Then add `FeatherStorage` to your target dependencies:
Expand Down
54 changes: 54 additions & 0 deletions Sources/FeatherStorage/ByteBufferSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// ByteBufferSequence.swift
// feather-storage
//
// Created by Tibor Bödecs on 2023. 01. 16.

import NIOCore

/// An async sequence that streams a `ByteBuffer` in fixed-size chunks.
public struct ByteBufferSequence: AsyncSequence, Sendable {
private let buffer: ByteBuffer
private let chunkSize: Int

/// Creates a chunked byte buffer async sequence.
///
/// - Parameters:
/// - buffer: The source buffer to stream from.
/// - chunkSize: The maximum number of bytes emitted per iteration.
public init(
buffer: ByteBuffer,
chunkSize: Int = 32 * 1024
) {
self.buffer = buffer
self.chunkSize = chunkSize
}

/// The async iterator for `ByteBufferSequence`.
public struct AsyncIterator: AsyncIteratorProtocol {
var buffer: ByteBuffer
let chunkSize: Int

/// Returns the next chunk from the underlying buffer.
///
/// - Returns: A buffer slice up to `chunkSize` bytes, or `nil` when the stream is exhausted.
public mutating func next() async -> ByteBuffer? {
guard buffer.readableBytes > 0 else {
return nil
}
return buffer.readSlice(
length: Swift.min(chunkSize, buffer.readableBytes)
)
}
}

/// Creates an async iterator over the byte buffer chunks.
///
/// - Returns: A new async iterator instance.
public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(
buffer: buffer,
chunkSize: chunkSize
)
}
}
18 changes: 18 additions & 0 deletions Sources/FeatherStorage/StorageSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ public struct StorageSequence: Sendable, AsyncSequence {
}
}

/// Creates a type-erased storage sequence from a byte buffer.
///
/// - Parameters:
/// - buffer: The underlying byte buffer.
/// - chunkSize: The maximum number of bytes emitted per iteration.
public init(
buffer: ByteBuffer,
chunkSize: Int = 32 * 1024
) {
self.init(
asyncSequence: ByteBufferSequence(
buffer: buffer,
chunkSize: chunkSize
),
length: UInt64(buffer.readableBytes)
)
}

/// Creates an async iterator for consuming the storage sequence.
///
/// - Returns: A new `AsyncIterator` instance.
Expand Down
52 changes: 52 additions & 0 deletions Tests/FeatherStorageTests/ByteBufferSequenceTestSuite.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// ByteBufferSequenceTestSuite.swift
// feather-storage
//
// Created by Tibor Bodecs on 2023. 01. 16.

import NIOCore
import Testing

@testable import FeatherStorage

@Suite
struct ByteBufferSequenceTestSuite {

@Test
func yieldsChunksUsingConfiguredChunkSize() async {
let allocator = ByteBufferAllocator()
var buffer = allocator.buffer(capacity: 10)
buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

let sequence = ByteBufferSequence(buffer: buffer, chunkSize: 4)
var iterator = sequence.makeAsyncIterator()

let first = await iterator.next()
let second = await iterator.next()
let third = await iterator.next()
let end = await iterator.next()

#expect(Self.readBytes(first) == [1, 2, 3, 4])
#expect(Self.readBytes(second) == [5, 6, 7, 8])
#expect(Self.readBytes(third) == [9, 10])
#expect(end == nil)
}

@Test
func emptyBufferReturnsNilImmediately() async {
let allocator = ByteBufferAllocator()
let buffer = allocator.buffer(capacity: 0)

let sequence = ByteBufferSequence(buffer: buffer)
var iterator = sequence.makeAsyncIterator()

#expect(await iterator.next() == nil)
}

private static func readBytes(_ buffer: ByteBuffer?) -> [UInt8] {
guard var value = buffer else {
return []
}
return value.readBytes(length: value.readableBytes) ?? []
}
}
111 changes: 111 additions & 0 deletions Tests/FeatherStorageTests/StorageSequenceTestSuite.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//
// StorageSequenceTestSuite.swift
// feather-storage
//
// Created by Tibor Bodecs on 2023. 01. 16.

import NIOCore
import Testing

@testable import FeatherStorage

@Suite
struct StorageSequenceTestSuite {

enum TestError: Error {
case failed
}

@Test
func initFromAsyncSequencePreservesElementsAndLength() async throws {
let allocator = ByteBufferAllocator()
let sequence = StorageSequence(
asyncSequence: AsyncStream { continuation in
continuation.yield(
Self.makeBuffer([1, 2], allocator: allocator)
)
continuation.yield(Self.makeBuffer([3], allocator: allocator))
continuation.finish()
},
length: 3
)

var iterator = sequence.makeAsyncIterator()
let first = try await iterator.next()
let second = try await iterator.next()
let end = try await iterator.next()

#expect(sequence.length == 3)
#expect(Self.readBytes(first) == [1, 2])
#expect(Self.readBytes(second) == [3])
#expect(end == nil)
}

@Test
func initFromAsyncSequenceUsesNilLengthByDefault() {
let sequence = StorageSequence(
asyncSequence: AsyncStream<ByteBuffer> { continuation in
continuation.finish()
}
)

#expect(sequence.length == nil)
}

@Test
func initFromBufferSetsLengthAndStreamsAllBytes() async throws {
let allocator = ByteBufferAllocator()
let sequence = StorageSequence(
buffer: Self.makeBuffer([9, 8, 7, 6], allocator: allocator)
)

var iterator = sequence.makeAsyncIterator()
let first = try await iterator.next()
let end = try await iterator.next()

#expect(sequence.length == 4)
#expect(Self.readBytes(first) == [9, 8, 7, 6])
#expect(end == nil)
}

@Test
func initFromThrowingSequencePropagatesErrors() async {
let allocator = ByteBufferAllocator()
let sequence = StorageSequence(
asyncSequence: AsyncThrowingStream<ByteBuffer, Error> {
continuation in
continuation.yield(Self.makeBuffer([1], allocator: allocator))
continuation.finish(throwing: TestError.failed)
}
)

var iterator = sequence.makeAsyncIterator()
do {
_ = try await iterator.next()
_ = try await iterator.next()
Issue.record("Expected TestError.failed")
}
catch TestError.failed {
// expected
}
catch {
Issue.record("Unexpected error: \(error)")
}
}

private static func makeBuffer(
_ bytes: [UInt8],
allocator: ByteBufferAllocator
) -> ByteBuffer {
var buffer = allocator.buffer(capacity: bytes.count)
buffer.writeBytes(bytes)
return buffer
}

private static func readBytes(_ buffer: ByteBuffer?) -> [UInt8] {
guard var value = buffer else {
return []
}
return value.readBytes(length: value.readableBytes) ?? []
}
}