From 436ca4a8cbbaeaeb5d21c14e23647d194774e6da Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 31 Oct 2024 08:09:34 +0100 Subject: [PATCH] Add fetching messages from JetStream Signed-off-by: Piotr Piotrowski Co-authored-by: Tomasz Pietrek --- Sources/JetStream/Consumer+Pull.swift | 220 ++++++++++ Sources/JetStream/Consumer.swift | 2 +- .../JetStream/JetStreamContext+Stream.swift | 4 +- Sources/JetStream/JetStreamContext.swift | 10 +- Sources/JetStream/JetStreamError.swift | 68 +++ Sources/JetStream/JetStreamMessage.swift | 193 ++++++++ Sources/Nats/NatsMessage.swift | 18 +- .../Integration/ConsumerTests.swift | 415 ++++++++++++++++++ Tests/JetStreamTests/Unit/MessageTests.swift | 119 +++++ .../Integration/ConnectionTests.swift | 1 + 10 files changed, 1038 insertions(+), 12 deletions(-) create mode 100644 Sources/JetStream/Consumer+Pull.swift create mode 100644 Sources/JetStream/JetStreamMessage.swift create mode 100644 Tests/JetStreamTests/Integration/ConsumerTests.swift create mode 100644 Tests/JetStreamTests/Unit/MessageTests.swift diff --git a/Sources/JetStream/Consumer+Pull.swift b/Sources/JetStream/Consumer+Pull.swift new file mode 100644 index 0000000..c9a9596 --- /dev/null +++ b/Sources/JetStream/Consumer+Pull.swift @@ -0,0 +1,220 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation +import Nats +import Nuid + +/// Extension to ``Consumer`` adding pull consumer capabilities. +extension Consumer { + + /// Retrieves up to a provided number of messages from a stream. + /// This method will send a single request and deliver requested messages unless time out is met earlier. + /// + /// - Parameters: + /// - batch: maximum number of messages to be retrieved + /// - expires: timeout of a pull request + /// - idleHeartbeat: interval in which server should send heartbeat messages (if no user messages are available). + /// + /// - Returns: ``FetchResult`` which implements ``AsyncSequence`` allowing iteration over messages. + /// + /// - Throws: + /// - ``JetStreamError/FetchError`` if there was an error while fetching messages + public func fetch( + batch: Int, expires: TimeInterval = 30, idleHeartbeat: TimeInterval? = nil + ) async throws -> FetchResult { + var request: PullRequest + if let idleHeartbeat { + request = PullRequest( + batch: batch, expires: NanoTimeInterval(expires), + heartbeat: NanoTimeInterval(idleHeartbeat)) + } else { + request = PullRequest(batch: batch, expires: NanoTimeInterval(expires)) + } + + let subject = ctx.apiSubject("CONSUMER.MSG.NEXT.\(info.stream).\(info.name)") + let inbox = "_INBOX.\(nextNuid())" + let sub = try await ctx.client.subscribe(subject: inbox) + try await self.ctx.client.publish( + JSONEncoder().encode(request), subject: subject, reply: inbox) + return FetchResult(ctx: ctx, sub: sub, idleHeartbeat: idleHeartbeat, batch: batch) + } +} + +/// Used to iterate over results of ``Consumer/fetch(batch:expires:idleHeartbeat:)`` +public class FetchResult: AsyncSequence { + public typealias Element = JetStreamMessage + public typealias AsyncIterator = FetchIterator + + private let ctx: JetStreamContext + private let sub: NatsSubscription + private let idleHeartbeat: TimeInterval? + private let batch: Int + + init(ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?, batch: Int) { + self.ctx = ctx + self.sub = sub + self.idleHeartbeat = idleHeartbeat + self.batch = batch + } + + public func makeAsyncIterator() -> FetchIterator { + return FetchIterator( + ctx: ctx, + sub: self.sub, idleHeartbeat: self.idleHeartbeat, remainingMessages: self.batch) + } + + public struct FetchIterator: AsyncIteratorProtocol { + private let ctx: JetStreamContext + private let sub: NatsSubscription + private let idleHeartbeat: TimeInterval? + private var remainingMessages: Int + private var subIterator: NatsSubscription.AsyncIterator + + init( + ctx: JetStreamContext, sub: NatsSubscription, idleHeartbeat: TimeInterval?, + remainingMessages: Int + ) { + self.ctx = ctx + self.sub = sub + self.idleHeartbeat = idleHeartbeat + self.remainingMessages = remainingMessages + self.subIterator = sub.makeAsyncIterator() + } + + public mutating func next() async throws -> JetStreamMessage? { + if remainingMessages <= 0 { + try await sub.unsubscribe() + return nil + } + + while true { + let message: NatsMessage? + + if let idleHeartbeat = idleHeartbeat { + let timeout = idleHeartbeat * 2 + message = try await nextWithTimeout(timeout, subIterator) + } else { + message = try await subIterator.next() + } + + guard let message else { + // the subscription has ended + try await sub.unsubscribe() + return nil + } + + let status = message.status ?? .ok + + switch status { + case .timeout: + try await sub.unsubscribe() + return nil + case .idleHeartbeat: + // in case of idle heartbeat error, we want to + // wait for next message on subscription + continue + case .notFound: + try await sub.unsubscribe() + return nil + case .ok: + remainingMessages -= 1 + return JetStreamMessage(message: message, client: ctx.client) + case .badRequest: + try await sub.unsubscribe() + throw JetStreamError.FetchError.badRequest + case .noResponders: + try await sub.unsubscribe() + throw JetStreamError.FetchError.noResponders + case .requestTerminated: + try await sub.unsubscribe() + guard let description = message.description else { + throw JetStreamError.FetchError.invalidResponse + } + + let descLower = description.lowercased() + if descLower.contains("message size exceeds maxbytes") { + return nil + } else if descLower.contains("leadership changed") { + throw JetStreamError.FetchError.leadershipChanged + } else if descLower.contains("consumer deleted") { + throw JetStreamError.FetchError.consumerDeleted + } else if descLower.contains("consumer is push based") { + throw JetStreamError.FetchError.consumerIsPush + } + default: + throw JetStreamError.FetchError.unknownStatus(status, message.description) + } + + if remainingMessages == 0 { + try await sub.unsubscribe() + } + + } + } + + func nextWithTimeout( + _ timeout: TimeInterval, _ subIterator: NatsSubscription.AsyncIterator + ) async throws -> NatsMessage? { + try await withThrowingTaskGroup(of: NatsMessage?.self) { group in + group.addTask { + return try await subIterator.next() + } + group.addTask { + try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000)) + try await sub.unsubscribe() + return nil + } + defer { + group.cancelAll() + } + for try await result in group { + if let msg = result { + return msg + } else { + throw JetStreamError.FetchError.noHeartbeatReceived + } + } + // this should not be reachable + throw JetStreamError.FetchError.noHeartbeatReceived + } + } + } +} + +internal struct PullRequest: Codable { + let batch: Int + let expires: NanoTimeInterval + let maxBytes: Int? + let noWait: Bool? + let heartbeat: NanoTimeInterval? + + internal init( + batch: Int, expires: NanoTimeInterval, maxBytes: Int? = nil, noWait: Bool? = nil, + heartbeat: NanoTimeInterval? = nil + ) { + self.batch = batch + self.expires = expires + self.maxBytes = maxBytes + self.noWait = noWait + self.heartbeat = heartbeat + } + + enum CodingKeys: String, CodingKey { + case batch + case expires + case maxBytes = "max_bytes" + case noWait = "no_wait" + case heartbeat = "idle_heartbeat" + } +} diff --git a/Sources/JetStream/Consumer.swift b/Sources/JetStream/Consumer.swift index 8fe8db2..3e13bc5 100644 --- a/Sources/JetStream/Consumer.swift +++ b/Sources/JetStream/Consumer.swift @@ -40,7 +40,7 @@ public class Consumer { /// > - ``JetStreamRequestError`` if the request was unsuccessful. /// > - ``JetStreamError`` if the server responded with an API error. public func info() async throws -> ConsumerInfo { - let subj = "CONSUMER.INFO.\(info.stream).\(info.config.name!)" + let subj = "CONSUMER.INFO.\(info.stream).\(info.name)" let info: Response = try await ctx.request(subj) switch info { case .success(let info): diff --git a/Sources/JetStream/JetStreamContext+Stream.swift b/Sources/JetStream/JetStreamContext+Stream.swift index 313bafe..377cd47 100644 --- a/Sources/JetStream/JetStreamContext+Stream.swift +++ b/Sources/JetStream/JetStreamContext+Stream.swift @@ -137,7 +137,7 @@ extension JetStreamContext { /// Used to list stream infos. /// - /// - Returns an ``Streams`` which implements AsyncSequence allowing iteration over streams. + /// - Returns ``Streams`` which implements AsyncSequence allowing iteration over streams. /// /// - Parameter subject: if provided will be used to filter out returned streams public func streams(subject: String? = nil) async -> Streams { @@ -146,7 +146,7 @@ extension JetStreamContext { /// Used to list stream names. /// - /// - Returns an ``StreamNames`` which implements AsyncSequence allowing iteration over stream names. + /// - Returns ``StreamNames`` which implements AsyncSequence allowing iteration over stream names. /// /// - Parameter subject: if provided will be used to filter out returned stream names public func streamNames(subject: String? = nil) async -> StreamNames { diff --git a/Sources/JetStream/JetStreamContext.swift b/Sources/JetStream/JetStreamContext.swift index dcb75d1..2490c6a 100644 --- a/Sources/JetStream/JetStreamContext.swift +++ b/Sources/JetStream/JetStreamContext.swift @@ -18,7 +18,7 @@ import Nuid /// A context which can perform jetstream scoped requests. public class JetStreamContext { - private var client: NatsClient + internal var client: NatsClient private var prefix: String = "$JS.API" private var timeout: TimeInterval = 5.0 @@ -86,7 +86,7 @@ extension JetStreamContext { let data = message ?? Data() do { let response = try await self.client.request( - data, subject: "\(self.prefix).\(subject)", timeout: self.timeout) + data, subject: apiSubject(subject), timeout: self.timeout) let decoder = JSONDecoder() guard let payload = response.payload else { throw JetStreamError.RequestError.emptyResponsePayload @@ -108,7 +108,7 @@ extension JetStreamContext { let data = message ?? Data() do { return try await self.client.request( - data, subject: "\(self.prefix).\(subject)", timeout: self.timeout) + data, subject: apiSubject(subject), timeout: self.timeout) } catch let err as NatsError.RequestError { switch err { case .noResponders: @@ -120,6 +120,10 @@ extension JetStreamContext { } } } + + internal func apiSubject(_ subject: String) -> String { + return "\(self.prefix).\(subject)" + } } public struct JetStreamAPIResponse: Codable { diff --git a/Sources/JetStream/JetStreamError.swift b/Sources/JetStream/JetStreamError.swift index 7627b6c..00182a5 100644 --- a/Sources/JetStream/JetStreamError.swift +++ b/Sources/JetStream/JetStreamError.swift @@ -49,6 +49,74 @@ public enum JetStreamError { } } + public enum MessageMetadataError: JetStreamErrorProtocol { + case noReplyInMessage + case invalidPrefix + case invalidTokenNum + case invalidTokenValue + + public var description: String { + switch self { + case .noReplyInMessage: + return "nats: did not fund reply subject in message" + case .invalidPrefix: + return "nats: invalid reply subject prefix" + case .invalidTokenNum: + return "nats: invalid token count" + case .invalidTokenValue: + return "nats: invalid token value" + } + } + } + + public enum FetchError: JetStreamErrorProtocol { + case noHeartbeatReceived + case consumerDeleted + case badRequest + case noResponders + case consumerIsPush + case invalidResponse + case leadershipChanged + case unknownStatus(StatusCode, String?) + + public var description: String { + switch self { + case .noHeartbeatReceived: + return "nats: no heartbeat received" + case .consumerDeleted: + return "nats: consumer deleted" + case .badRequest: + return "nats: bad request" + case .noResponders: + return "nats: no responders" + case .consumerIsPush: + return "nats: consumer is push based" + case .invalidResponse: + return "nats: no description in status response" + case .leadershipChanged: + return "nats: leadership changed" + case .unknownStatus(let status, let description): + if let description { + return "nats: unknown response status: \(status): \(description)" + } else { + return "nats: unknown response status: \(status)" + } + } + } + + } + + public enum AckError: JetStreamErrorProtocol { + case noReplyInMessage + + public var description: String { + switch self { + case .noReplyInMessage: + return "nats: did not fund reply subject in message" + } + } + } + public enum StreamError: JetStreamErrorProtocol { case nameRequired case invalidStreamName(String) diff --git a/Sources/JetStream/JetStreamMessage.swift b/Sources/JetStream/JetStreamMessage.swift new file mode 100644 index 0000000..4673e63 --- /dev/null +++ b/Sources/JetStream/JetStreamMessage.swift @@ -0,0 +1,193 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation +import Nats + +/// Representation of NATS message in the context of JetStream. +/// It exposes message properties (payload, headers etc.) and various methods for acknowledging delivery. +/// It also allows for checking message metadata. +public struct JetStreamMessage { + private let message: NatsMessage + + /// Message payload. + public var payload: Data? { message.payload } + + /// Message headers. + public var headers: NatsHeaderMap? { message.headers } + + /// The subject the message was published on. + public var subject: String { message.subject } + + /// Reply subject used for acking a message. + public var reply: String? { message.replySubject } + + internal let client: NatsClient + + private let emptyPayload = "".data(using: .utf8)! + + internal init(message: NatsMessage, client: NatsClient) { + self.message = message + self.client = client + } + + /// Sends an acknowledgement of given kind to the server. + /// + /// - Parameter ackType: the type of acknowledgement being sent (defaults to ``AckKind/ack``. For details, see ``AckKind``. + /// - Throws: + /// - ``JetStreamError/AckError`` if there was an error sending the acknowledgement. + public func ack(ackType: AckKind = .ack) async throws { + guard let subject = message.replySubject else { + throw JetStreamError.AckError.noReplyInMessage + } + try await client.publish(ackType.payload(), subject: subject) + } + + /// Parses the reply subject of the message, exposing JetStream message metadata. + /// + /// - Returns ``MessageMetadata`` + /// + /// - Throws: + /// - ``JetStreamError/MessageMetadataError`` when there is an error parsing metadata. + public func metadata() throws -> MessageMetadata { + let prefix = "$JS.ACK." + guard let subject = message.replySubject else { + throw JetStreamError.MessageMetadataError.noReplyInMessage + } + if !subject.starts(with: prefix) { + throw JetStreamError.MessageMetadataError.invalidPrefix + } + + let startIndex = subject.index(subject.startIndex, offsetBy: prefix.count) + let parts = subject[startIndex...].split(separator: ".") + + return try MessageMetadata(tokens: parts) + } +} + +/// Represents various types of JetStream message acknowledgement. +public enum AckKind { + /// Normal acknowledgemnt + case ack + /// Negative ack, message will be redelivered (immediately or after given delay) + case nak(delay: TimeInterval? = nil) + /// Marks the message as being processed, resets ack wait timer delaying evential redelivery. + case inProgress + /// Marks the message as terminated, it will never be redelivered. + case term(reason: String? = nil) + + func payload() -> Data { + switch self { + case .ack: + return "+ACK".data(using: .utf8)! + case .nak(let delay): + if let delay { + let delayStr = String(Int64(delay * 1_000_000_000)) + return "-NAK {\"delay\":\(delayStr)}".data(using: .utf8)! + } else { + return "-NAK".data(using: .utf8)! + } + case .inProgress: + return "+WPI".data(using: .utf8)! + case .term(let reason): + if let reason { + return "+TERM \(reason)".data(using: .utf8)! + } else { + return "+TERM".data(using: .utf8)! + } + } + } +} + +/// Metadata of a JetStream message. +public struct MessageMetadata { + /// The domain this message was received on. + public let domain: String? + + /// Optional account hash, present in servers post-ADR-15. + public let accountHash: String? + + /// Name of the stream the message is delivered from. + public let stream: String + + /// Name of the consumer the mesasge is delivered from. + public let consumer: String + + /// Number of delivery attempts of this message. + public let delivered: UInt64 + + /// Stream sequence associated with this message. + public let streamSequence: UInt64 + + /// Consumer sequence associated with this message. + public let consumerSequence: UInt64 + + /// The time this message was received by the server from the publisher. + public let timestamp: String + + /// The number of messages known by the server to be pending to this consumer. + public let pending: UInt64 + + private let v1TokenCount = 7 + private let v2TokenCount = 9 + + init(tokens: [Substring]) throws { + if tokens.count >= v2TokenCount { + self.domain = String(tokens[0]) + self.accountHash = String(tokens[1]) + self.stream = String(tokens[2]) + self.consumer = String(tokens[3]) + guard let delivered = UInt64(tokens[4]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.delivered = delivered + guard let sseq = UInt64(tokens[5]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.streamSequence = sseq + guard let cseq = UInt64(tokens[6]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.consumerSequence = cseq + self.timestamp = String(tokens[7]) + guard let pending = UInt64(tokens[8]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.pending = pending + } else if tokens.count == v1TokenCount { + self.domain = nil + self.accountHash = nil + self.stream = String(tokens[0]) + self.consumer = String(tokens[1]) + guard let delivered = UInt64(tokens[2]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.delivered = delivered + guard let sseq = UInt64(tokens[3]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.streamSequence = sseq + guard let cseq = UInt64(tokens[4]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.consumerSequence = cseq + self.timestamp = String(tokens[5]) + guard let pending = UInt64(tokens[6]) else { + throw JetStreamError.MessageMetadataError.invalidTokenValue + } + self.pending = pending + } else { + throw JetStreamError.MessageMetadataError.invalidTokenNum + } + } +} diff --git a/Sources/Nats/NatsMessage.swift b/Sources/Nats/NatsMessage.swift index 3f66df5..d8ec1fe 100644 --- a/Sources/Nats/NatsMessage.swift +++ b/Sources/Nats/NatsMessage.swift @@ -24,15 +24,21 @@ public struct NatsMessage { } public struct StatusCode: Equatable { - public static let idleHeartbeat = StatusCode(100) - public static let ok = StatusCode(200) - public static let notFound = StatusCode(404) - public static let timeout = StatusCode(408) - public static let noResponders = StatusCode(503) - public static let requestTerminated = StatusCode(409) + public static let idleHeartbeat = StatusCode(value: 100) + public static let ok = StatusCode(value: 200) + public static let badRequest = StatusCode(value: 400) + public static let notFound = StatusCode(value: 404) + public static let timeout = StatusCode(value: 408) + public static let noResponders = StatusCode(value: 503) + public static let requestTerminated = StatusCode(value: 409) let value: UInt16 + // non-optional initializer for static status codes + private init(value: UInt16) { + self.value = value + } + init?(_ value: UInt16) { if !(100..<1000 ~= value) { return nil diff --git a/Tests/JetStreamTests/Integration/ConsumerTests.swift b/Tests/JetStreamTests/Integration/ConsumerTests.swift new file mode 100644 index 0000000..4350679 --- /dev/null +++ b/Tests/JetStreamTests/Integration/ConsumerTests.swift @@ -0,0 +1,415 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import JetStream +import Logging +import Nats +import NatsServer +import XCTest + +class ConsumerTests: XCTestCase { + + static var allTests = [ + ("testFetchWithDefaultOptions", testFetchWithDefaultOptions), + ("testFetchConsumerDeleted", testFetchConsumerDeleted), + ("testFetchExpires", testFetchExpires), + ("testFetchInvalidIdleHeartbeat", testFetchInvalidIdleHeartbeat), + ("testAck", testAck), + ("testNak", testNak), + ("testNakWithDelay", testNakWithDelay), + ("testTerm", testTerm), + ] + + var natsServer = NatsServer() + + override func tearDown() { + super.tearDown() + natsServer.stop() + } + + func testFetchWithDefaultOptions() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + let payload = "hello".data(using: .utf8)! + // publish some messages on stream + for _ in 1...100 { + let ack = try await ctx.publish("foo.A", message: payload) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 100) + + let batch = try await consumer.fetch(batch: 30) + + var i = 0 + for try await msg in batch { + try await msg.ack() + XCTAssertEqual(msg.payload, payload) + i += 1 + } + XCTAssertEqual(i, 30) + } + + func testFetchConsumerDeleted() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + let payload = "hello".data(using: .utf8)! + // publish some messages on stream + for _ in 1...10 { + let ack = try await ctx.publish("foo.A", message: payload) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + let batch = try await consumer.fetch(batch: 30) + + sleep(1) + try await stream.deleteConsumer(name: "cons") + var i = 0 + do { + for try await msg in batch { + try await msg.ack() + XCTAssertEqual(msg.payload, payload) + i += 1 + } + } catch JetStreamError.FetchError.consumerDeleted { + XCTAssertEqual(i, 10) + return + } + XCTFail("should get consumer deleted") + } + + func testFetchExpires() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + let payload = "hello".data(using: .utf8)! + // publish some messages on stream + for _ in 1...10 { + let ack = try await ctx.publish("foo.A", message: payload) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + let batch = try await consumer.fetch(batch: 30, expires: 1) + + var i = 0 + for try await msg in batch { + try await msg.ack() + XCTAssertEqual(msg.payload, payload) + i += 1 + } + XCTAssertEqual(i, 10) + } + + func testFetchInvalidIdleHeartbeat() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + let batch = try await consumer.fetch(batch: 30, expires: 1, idleHeartbeat: 2) + + do { + for try await _ in batch {} + } catch JetStreamError.FetchError.badRequest { + // success + return + } + XCTFail("should get bad request") + } + + func testFetchMissingHeartbeat() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + let payload = "hello".data(using: .utf8)! + // publish some messages on stream + for _ in 1...10 { + let ack = try await ctx.publish("foo.A", message: payload) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + try await stream.deleteConsumer(name: "cons") + + let batch = try await consumer.fetch(batch: 30, idleHeartbeat: 1) + + do { + for try await _ in batch {} + } catch JetStreamError.FetchError.noHeartbeatReceived { + return + } + XCTFail("should get missing heartbeats") + } + + func testAck() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + // create a consumer with 500ms ack wait + let consumer = try await stream.createConsumer( + cfg: ConsumerConfig(name: "cons", ackWait: NanoTimeInterval(0.5))) + + // publish some messages on stream + for i in 0..<100 { + let ack = try await ctx.publish("foo.A", message: "\(i)".data(using: .utf8)!) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 100) + + var batch = try await consumer.fetch(batch: 10) + + var i = 0 + for try await msg in batch { + try await msg.ack() + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(i)") + i += 1 + } + XCTAssertEqual(i, 10) + + // now wait 1 second and make sure the messages are not re-delivered + sleep(1) + + batch = try await consumer.fetch(batch: 10) + + for try await msg in batch { + try await msg.ack() + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(i)") + i += 1 + } + XCTAssertEqual(i, 20) + } + + func testNak() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + // create a consumer with 500ms ack wait + let consumer = try await stream.createConsumer( + cfg: ConsumerConfig(name: "cons", ackWait: NanoTimeInterval(0.5))) + + // publish some messages on stream + for i in 0..<10 { + let ack = try await ctx.publish("foo.A", message: "\(i)".data(using: .utf8)!) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + var batch = try await consumer.fetch(batch: 1) + var iter = batch.makeAsyncIterator() + var msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(0)") + var meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 1) + XCTAssertEqual(meta.consumerSequence, 1) + try await msg.ack(ackType: .nak()) + + // now fetch the message again, it should be redelivered + batch = try await consumer.fetch(batch: 1) + iter = batch.makeAsyncIterator() + msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(0)") + meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 1) + XCTAssertEqual(meta.consumerSequence, 2) + try await msg.ack() + } + + func testNakWithDelay() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + // create a consumer with 500ms ack wait + let consumer = try await stream.createConsumer(cfg: ConsumerConfig(name: "cons")) + + // publish some messages on stream + for i in 0..<10 { + let ack = try await ctx.publish("foo.A", message: "\(i)".data(using: .utf8)!) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + var batch = try await consumer.fetch(batch: 1) + var iter = batch.makeAsyncIterator() + var msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(0)") + var meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 1) + XCTAssertEqual(meta.consumerSequence, 1) + try await msg.ack(ackType: .nak(delay: 0.5)) + + // now fetch the next message immediately, it should be the next message + batch = try await consumer.fetch(batch: 1) + iter = batch.makeAsyncIterator() + msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(1)") + meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 2) + XCTAssertEqual(meta.consumerSequence, 2) + try await msg.ack() + + // wait a second, the first message should be redelivered at this point + sleep(1) + batch = try await consumer.fetch(batch: 1) + iter = batch.makeAsyncIterator() + msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(0)") + meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 1) + XCTAssertEqual(meta.consumerSequence, 3) + } + + func testTerm() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let streamCfg = StreamConfig(name: "test", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: streamCfg) + + // create a consumer with 500ms ack wait + let consumer = try await stream.createConsumer( + cfg: ConsumerConfig(name: "cons", ackWait: NanoTimeInterval(0.5))) + + // publish some messages on stream + for i in 0..<10 { + let ack = try await ctx.publish("foo.A", message: "\(i)".data(using: .utf8)!) + _ = try await ack.wait() + } + let info = try await stream.info() + XCTAssertEqual(info.state.messages, 10) + + var batch = try await consumer.fetch(batch: 1) + var iter = batch.makeAsyncIterator() + var msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(0)") + var meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 1) + XCTAssertEqual(meta.consumerSequence, 1) + try await msg.ack(ackType: .term()) + + // wait 1s, the first message should not be redelivered (even though we are past ack wait) + sleep(1) + batch = try await consumer.fetch(batch: 1) + iter = batch.makeAsyncIterator() + msg = try await iter.next()! + XCTAssertEqual(String(decoding: msg.payload!, as: UTF8.self), "\(1)") + meta = try msg.metadata() + XCTAssertEqual(meta.streamSequence, 2) + XCTAssertEqual(meta.consumerSequence, 2) + try await msg.ack() + } +} diff --git a/Tests/JetStreamTests/Unit/MessageTests.swift b/Tests/JetStreamTests/Unit/MessageTests.swift new file mode 100644 index 0000000..e24d5ba --- /dev/null +++ b/Tests/JetStreamTests/Unit/MessageTests.swift @@ -0,0 +1,119 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import XCTest + +@testable import JetStream +@testable import Nats + +class JetStreamMessageTests: XCTestCase { + + static var allTests = [ + ("testValidOldFormatMessage", testValidOldFormatMessage), + ("testValidNewFormatMessage", testValidNewFormatMessage), + ("testMissingTokens", testMissingTokens), + ("testInvalidTokenValues", testInvalidTokenValues), + ("testInvalidPrefix", testInvalidPrefix), + ("testNoReplySubject", testNoReplySubject), + ] + + func testValidOldFormatMessage() async throws { + let replySubject = "$JS.ACK.myStream.myConsumer.10.20.30.1234567890.5" + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil, + status: nil, description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + + let metadata = try jetStreamMessage.metadata() + + XCTAssertNil(metadata.domain) + XCTAssertNil(metadata.accountHash) + XCTAssertEqual(metadata.stream, "myStream") + XCTAssertEqual(metadata.consumer, "myConsumer") + XCTAssertEqual(metadata.delivered, 10) + XCTAssertEqual(metadata.streamSequence, 20) + XCTAssertEqual(metadata.consumerSequence, 30) + XCTAssertEqual(metadata.timestamp, "1234567890") + XCTAssertEqual(metadata.pending, 5) + } + + func testValidNewFormatMessage() async throws { + let replySubject = "$JS.ACK.domain.accountHash123.myStream.myConsumer.10.20.30.1234567890.5" + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil, + status: nil, description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + let metadata = try jetStreamMessage.metadata() + + XCTAssertEqual(metadata.domain, "domain") + XCTAssertEqual(metadata.accountHash, "accountHash123") + XCTAssertEqual(metadata.stream, "myStream") + XCTAssertEqual(metadata.consumer, "myConsumer") + XCTAssertEqual(metadata.delivered, 10) + XCTAssertEqual(metadata.streamSequence, 20) + XCTAssertEqual(metadata.consumerSequence, 30) + XCTAssertEqual(metadata.timestamp, "1234567890") + XCTAssertEqual(metadata.pending, 5) + } + + func testMissingTokens() async throws { + let replySubject = "$JS.ACK.myStream.myConsumer" + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil, + status: nil, description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + do { + _ = try jetStreamMessage.metadata() + } catch JetStreamError.MessageMetadataError.invalidTokenNum { + return + } + } + + func testInvalidTokenValues() async throws { + let replySubject = "$JS.ACK.myStream.myConsumer.invalid.20.30.1234567890.5" + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil, + status: nil, description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + do { + _ = try jetStreamMessage.metadata() + } catch JetStreamError.MessageMetadataError.invalidTokenValue { + return + } + } + + func testInvalidPrefix() async throws { + let replySubject = "$JS.WRONG.myStream.myConsumer.10.20.30.1234567890.5" + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: replySubject, length: 0, headers: nil, + status: nil, description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + do { + _ = try jetStreamMessage.metadata() + } catch JetStreamError.MessageMetadataError.invalidPrefix { + return + } + } + + func testNoReplySubject() async throws { + let natsMessage = NatsMessage( + payload: nil, subject: "", replySubject: nil, length: 0, headers: nil, status: nil, + description: nil) + let jetStreamMessage = JetStreamMessage(message: natsMessage, client: NatsClient()) + do { + _ = try jetStreamMessage.metadata() + } catch JetStreamError.MessageMetadataError.noReplyInMessage { + return + } + } +} diff --git a/Tests/NatsTests/Integration/ConnectionTests.swift b/Tests/NatsTests/Integration/ConnectionTests.swift index 1524110..6857899 100755 --- a/Tests/NatsTests/Integration/ConnectionTests.swift +++ b/Tests/NatsTests/Integration/ConnectionTests.swift @@ -48,6 +48,7 @@ class CoreNatsTests: XCTestCase { ("testRequest", testRequest), ("testRequest_noResponders", testRequest_noResponders), ("testRequest_permissionDenied", testRequest_permissionDenied), + ("testRequest_timeout", testRequest_timeout), ("testPublishOnClosedConnection", testPublishOnClosedConnection), ("testCloseClosedConnection", testCloseClosedConnection), ("testSuspendClosedConnection", testSuspendClosedConnection),