Skip to content

Commit

Permalink
update stream protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
loganwright committed May 3, 2017
1 parent 8f097e4 commit 7d956ea
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Sources/Redis/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public final class Client<StreamType: DuplexStream> {
public func command(_ command: Command, _ params: [Bytes] = []) throws -> Data? {
let query = format(command, params)
try serializer.serialize(query)
try serializer.flush()

let res = try parser.parse()
if let data = res, case .error(let e) = data {
throw e
Expand Down
2 changes: 0 additions & 2 deletions Sources/Redis/Pipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ public class Pipeline<StreamType: DuplexStream> {
throw RedisError.pipelineCommandsRequired
}

try client.serializer.flush()

var responses: [Data?] = []
for _ in 0..<queuedCommands {
let data = try client.parser.parse()
Expand Down
6 changes: 1 addition & 5 deletions Sources/Redis/Serializer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ public final class Serializer<StreamType: DuplexStream> {
/// the Serializer's stream
public func serialize(_ r: Data) throws {
let bytes = makeBytes(from: r)
try stream.write(bytes)
}

public func flush() throws {
try stream.flush()
_ = try stream.write(bytes)
}

/// Convert the Redis Data into bytes
Expand Down
16 changes: 6 additions & 10 deletions Sources/Redis/StreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ final class StreamBuffer<Stream: DuplexStream>: DuplexStream {
private let size: Int

private var readIterator: IndexingIterator<[Byte]>
private var writeBuffer: Bytes

var isClosed: Bool {
return stream.isClosed
Expand All @@ -33,7 +32,6 @@ final class StreamBuffer<Stream: DuplexStream>: DuplexStream {
self.stream = stream

readIterator = Bytes().makeIterator()
writeBuffer = []
}

/// Reads the next byte from the buffer
Expand Down Expand Up @@ -89,14 +87,12 @@ final class StreamBuffer<Stream: DuplexStream>: DuplexStream {
}

/// write bytes to the buffer stream
func write(_ bytes: Bytes) throws {
writeBuffer += bytes
func write(_ bytes: Bytes) throws -> Int {
return try write(max: bytes.count, from: bytes)
}

func flush() throws {
guard !writeBuffer.isEmpty else { return }
try stream.write(writeBuffer)
try stream.flush()
writeBuffer = []

func write(max: Int, from buffer: Bytes) throws -> Int {
return try stream.write(max: max, from: buffer)
}

}

0 comments on commit 7d956ea

Please sign in to comment.