Skip to content

Commit

Permalink
engine updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tanner0101 committed May 1, 2017
1 parent d2057fa commit 8f097e4
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Sources/Redis/Parser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Transport

/// Parses Redis Data from a Stream
public final class Parser<StreamType: DuplexStream> {
public let stream: StreamBuffer<StreamType>
let stream: StreamBuffer<StreamType>
public init(_ stream: StreamType) {
self.stream = StreamBuffer(stream)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Redis/Serializer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Transport

/// Serializes Redis Data to a Stream
public final class Serializer<StreamType: DuplexStream> {
public let stream: StreamBuffer<StreamType>
let stream: StreamBuffer<StreamType>
public init(_ stream: StreamType) {
self.stream = StreamBuffer(stream)
}
Expand Down
102 changes: 102 additions & 0 deletions Sources/Redis/StreamBuffer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import Core
import Dispatch
import Transport

/// Buffers receive and send calls to a Stream.
///
/// Receive calls are buffered by the size used to initialize
/// the buffer.
///
/// Send calls are buffered until `flush()` is called.
final class StreamBuffer<Stream: DuplexStream>: DuplexStream {
private let stream: Stream
private let size: Int

private var readIterator: IndexingIterator<[Byte]>
private var writeBuffer: Bytes

var isClosed: Bool {
return stream.isClosed
}

func setTimeout(_ timeout: Double) throws {
try stream.setTimeout(timeout)
}

func close() throws {
try stream.close()
}

/// create a buffer steam with a chunk size
init(_ stream: Stream, size: Int = 2048) {
self.size = size
self.stream = stream

readIterator = Bytes().makeIterator()
writeBuffer = []
}

/// Reads the next byte from the buffer
func readByte() throws -> Byte? {
guard let next = readIterator.next() else {
readIterator = try stream.read(max: size).makeIterator()
return readIterator.next()
}
return next
}

/// reads a chunk of bytes from the buffer
/// less than max
func read(max: Int, into buffer: inout Bytes) throws -> Int {
var bytes = readIterator.array

// while the byte count is below max
// continue fetching, until the stream is empty
while bytes.count < max {
let more = max - bytes.count
let new = try stream.read(max: more < size ? size : more)
bytes += new
if new.count == 0 {
break
}
}

// if byte count is below max,
// set that as the cap
let cap = bytes.count > max
? max
: bytes.count

// pull out the result array
let result = bytes[0..<cap].array

if cap >= bytes.count {
// if returning all bytes,
// create empty iterator
readIterator = [].makeIterator()
} else {
// if not returning all bytes,
// create an iterator with remaining
let remaining = bytes[cap..<bytes.count]
readIterator = remaining
.array
.makeIterator()
}

// return requested bytes
buffer = result
return result.count
}

/// write bytes to the buffer stream
func write(_ bytes: Bytes) throws {
writeBuffer += bytes
}

func flush() throws {
guard !writeBuffer.isEmpty else { return }
try stream.write(writeBuffer)
try stream.flush()
writeBuffer = []
}
}

0 comments on commit 8f097e4

Please sign in to comment.