From 8f097e4f339fdafc31fa07def6d6cdcb8ac19c18 Mon Sep 17 00:00:00 2001 From: tanner0101 Date: Mon, 1 May 2017 19:32:35 +0100 Subject: [PATCH] engine updates --- Sources/Redis/Parser.swift | 2 +- Sources/Redis/Serializer.swift | 2 +- Sources/Redis/StreamBuffer.swift | 102 +++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 Sources/Redis/StreamBuffer.swift diff --git a/Sources/Redis/Parser.swift b/Sources/Redis/Parser.swift index 40886c2..f14ff0a 100644 --- a/Sources/Redis/Parser.swift +++ b/Sources/Redis/Parser.swift @@ -2,7 +2,7 @@ import Transport /// Parses Redis Data from a Stream public final class Parser { - public let stream: StreamBuffer + let stream: StreamBuffer public init(_ stream: StreamType) { self.stream = StreamBuffer(stream) } diff --git a/Sources/Redis/Serializer.swift b/Sources/Redis/Serializer.swift index 74d7b46..06f0a57 100644 --- a/Sources/Redis/Serializer.swift +++ b/Sources/Redis/Serializer.swift @@ -2,7 +2,7 @@ import Transport /// Serializes Redis Data to a Stream public final class Serializer { - public let stream: StreamBuffer + let stream: StreamBuffer public init(_ stream: StreamType) { self.stream = StreamBuffer(stream) } diff --git a/Sources/Redis/StreamBuffer.swift b/Sources/Redis/StreamBuffer.swift new file mode 100644 index 0000000..98cb562 --- /dev/null +++ b/Sources/Redis/StreamBuffer.swift @@ -0,0 +1,102 @@ +import Core +import Dispatch +import Transport + +/// Buffers receive and send calls to a Stream. +/// +/// Receive calls are buffered by the size used to initialize +/// the buffer. +/// +/// Send calls are buffered until `flush()` is called. +final class StreamBuffer: DuplexStream { + private let stream: Stream + private let size: Int + + private var readIterator: IndexingIterator<[Byte]> + private var writeBuffer: Bytes + + var isClosed: Bool { + return stream.isClosed + } + + func setTimeout(_ timeout: Double) throws { + try stream.setTimeout(timeout) + } + + func close() throws { + try stream.close() + } + + /// create a buffer steam with a chunk size + init(_ stream: Stream, size: Int = 2048) { + self.size = size + self.stream = stream + + readIterator = Bytes().makeIterator() + writeBuffer = [] + } + + /// Reads the next byte from the buffer + func readByte() throws -> Byte? { + guard let next = readIterator.next() else { + readIterator = try stream.read(max: size).makeIterator() + return readIterator.next() + } + return next + } + + /// reads a chunk of bytes from the buffer + /// less than max + func read(max: Int, into buffer: inout Bytes) throws -> Int { + var bytes = readIterator.array + + // while the byte count is below max + // continue fetching, until the stream is empty + while bytes.count < max { + let more = max - bytes.count + let new = try stream.read(max: more < size ? size : more) + bytes += new + if new.count == 0 { + break + } + } + + // if byte count is below max, + // set that as the cap + let cap = bytes.count > max + ? max + : bytes.count + + // pull out the result array + let result = bytes[0..= bytes.count { + // if returning all bytes, + // create empty iterator + readIterator = [].makeIterator() + } else { + // if not returning all bytes, + // create an iterator with remaining + let remaining = bytes[cap..