Skip to content

Commit c64e345

Browse files
committed
Add reconnect on errors in read loop
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
1 parent a6c986c commit c64e345

File tree

6 files changed

+31
-15
lines changed

6 files changed

+31
-15
lines changed

Sources/NatsSwift/Extensions/Data+Parser.swift

+2-10
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ extension Data {
6767
self = other + self
6868
}
6969

70-
internal func parseOutMessages() -> (ops: [ServerOp], remainder: Data?) {
70+
internal func parseOutMessages() throws -> (ops: [ServerOp], remainder: Data?) {
7171
var serverOps = [ServerOp]()
7272
var startIndex = self.startIndex
7373
var remainder: Data?
@@ -88,15 +88,7 @@ extension Data {
8888
continue
8989
}
9090

91-
let serverOp: ServerOp
92-
do {
93-
serverOp = try ServerOp.parse(from: lineData)
94-
} catch {
95-
// TODO(pp): handle this error properly (maybe surface in throw)
96-
logger.error("Error parsing message: \(error)")
97-
startIndex = nextLineStartIndex
98-
continue
99-
}
91+
let serverOp = try ServerOp.parse(from: lineData)
10092

10193
// if it's a message, get the full payload and add to returned data
10294
if case .Message(var msg) = serverOp {

Sources/NatsSwift/Extensions/String+Utilities.swift

+5
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@
66
import Foundation
77

88
extension String {
9+
private static let charactersToTrim: CharacterSet = .whitespacesAndNewlines.union(CharacterSet(charactersIn: "'"))
910

1011
static func hash() -> String {
1112
let uuid = String.uuid()
1213
return uuid[0...7]
1314
}
15+
16+
func trimWhitespacesAndApostrophes() -> String {
17+
return self.trimmingCharacters(in: String.charactersToTrim)
18+
}
1419

1520
static func uuid() -> String {
1621
return UUID().uuidString.trimmingCharacters(in: .punctuationCharacters)

Sources/NatsSwift/NatsConnection.swift

+17-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,15 @@ class ConnectionHandler: ChannelInboundHandler {
4949
}
5050

5151
self.parseRemainder = nil
52-
let parseResult = inputChunk.parseOutMessages()
52+
let parseResult: (ops: [ServerOp], remainder: Data?)
53+
do {
54+
parseResult = try inputChunk.parseOutMessages()
55+
} catch {
56+
// if parsing throws an error, return and reconnect
57+
inputBuffer.clear()
58+
context.fireErrorCaught(error)
59+
return
60+
}
5361
if let remainder = parseResult.remainder {
5462
self.parseRemainder = remainder
5563
}
@@ -96,6 +104,14 @@ class ConnectionHandler: ChannelInboundHandler {
96104
self.outstandingPings.store(0, ordering: AtomicStoreOrdering.relaxed)
97105
case let .Error(err):
98106
logger.debug("error \(err)")
107+
108+
let normalizedError = err.normalizedError
109+
// on some errors, force reconnect
110+
if normalizedError == "stale connection" || normalizedError == "maximum connections exceeded" {
111+
inputBuffer.clear()
112+
context.fireErrorCaught(err)
113+
}
114+
// TODO(pp): handle auth errors here
99115
case let .Message(msg):
100116
self.handleIncomingMessage(msg)
101117
case let .HMessage(msg):

Sources/NatsSwift/NatsError.swift

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ protocol NatsError: Error {
1010

1111
struct NatsConnectionError: NatsError {
1212
var description: String
13+
var normalizedError: String {
14+
return description.trimWhitespacesAndApostrophes().lowercased()
15+
}
1316
init(_ description: String) {
1417
self.description = description
1518
}

Sources/NatsSwift/NatsProto.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ enum ServerOp {
3434
case Info(ServerInfo)
3535
case Ping
3636
case Pong
37-
case Error(NatsError)
37+
case Error(NatsConnectionError)
3838
case Message(MessageInbound)
3939
case HMessage(HMessageInbound)
40-
40+
4141
static func parse(from message: Data) throws -> ServerOp {
4242
guard message.count > 2 else {
4343
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "unable to parse inbound message: \(message)"])
@@ -53,7 +53,7 @@ enum ServerOp {
5353
case .ok:
5454
return Ok
5555
case .error:
56-
if let errMsg = message.toString() {
56+
if let errMsg = message.removePrefix(Data(NatsOperation.error.rawBytes)).toString() {
5757
return Error(NatsConnectionError(errMsg))
5858
}
5959
return Error(NatsConnectionError("unexpected error"))

Tests/NatsSwiftTests/Unit/ParserTests.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class ParserTests: XCTestCase {
9696
if let prevRemainder {
9797
chunkData.prepend(prevRemainder)
9898
}
99-
let res = chunkData.parseOutMessages()
99+
let res = try! chunkData.parseOutMessages()
100100
prevRemainder = res.remainder
101101
ops.append(contentsOf: res.ops)
102102
}

0 commit comments

Comments
 (0)