diff --git a/lib/browser/mqtt_request_response.spec.ts b/lib/browser/mqtt_request_response.spec.ts index 51bf7523..c173928c 100644 --- a/lib/browser/mqtt_request_response.spec.ts +++ b/lib/browser/mqtt_request_response.spec.ts @@ -3,847 +3,599 @@ * SPDX-License-Identifier: Apache-2.0. */ -import * as protocol_adapter_mock from "./mqtt_request_response/protocol_adapter_mock"; -import * as mqtt_request_response from "./mqtt_request_response"; -import * as protocol_adapter from "./mqtt_request_response/protocol_adapter"; -import { CrtError } from "./error"; -import {MockProtocolAdapter} from "./mqtt_request_response/protocol_adapter_mock"; - -jest.setTimeout(1000000); - -interface TestContextOptions { - clientOptions?: mqtt_request_response.RequestResponseClientOptions, - adapterOptions?: protocol_adapter_mock.MockProtocolAdapterOptions -} -interface TestContext { - client : mqtt_request_response.RequestResponseClient, - adapter: protocol_adapter_mock.MockProtocolAdapter -} - -function createTestContext(options? : TestContextOptions) : TestContext { - let adapter = new protocol_adapter_mock.MockProtocolAdapter(options?.adapterOptions); - - var clientOptions : mqtt_request_response.RequestResponseClientOptions = options?.clientOptions ?? { - maxRequestResponseSubscriptions: 4, - maxStreamingSubscriptions: 2, - operationTimeoutInSeconds: 600, +import * as auth from "./auth"; +import * as test_env from "@test/test_env" +import * as aws_iot_311 from "./aws_iot"; +import * as aws_iot_5 from "./aws_iot_mqtt5"; +//import * as mqtt5 from "./mqtt5"; +import * as mqtt_request_response from "./mqtt_request_response"; +//import {v4 as uuid} from "uuid"; +//import {once} from "events"; +import * as mrr_test from "@test/mqtt_request_response"; +import {v4 as uuid} from "uuid"; + +jest.setTimeout(10000); + +function createClientBuilder5() : aws_iot_5.AwsIotMqtt5ClientConfigBuilder { + let credentials : auth.AWSCredentials = { + aws_region: test_env.AWS_IOT_ENV.MQTT5_REGION, + aws_access_id: test_env.AWS_IOT_ENV.MQTT5_CRED_ACCESS_KEY, + aws_secret_key: test_env.AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY, }; - // @ts-ignore - let client = new mqtt_request_response.RequestResponseClient(adapter, clientOptions); - - return { - client: client, - adapter: adapter - }; -} + if (test_env.AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN) { + credentials.aws_sts_token = test_env.AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN; + } -function cleanupTestContext(context: TestContext) { - context.client.close(); -} + let provider = new auth.StaticCredentialProvider(credentials); -test('create/destroy', async () => { - let context = createTestContext(); - cleanupTestContext(context); -}); + let builder = aws_iot_5.AwsIotMqtt5ClientConfigBuilder.newWebsocketMqttBuilderWithSigv4Auth(test_env.AWS_IOT_ENV.MQTT5_HOST, { + credentialsProvider: provider, + region: test_env.AWS_IOT_ENV.MQTT5_REGION + }); -async function doRequestResponseValidationFailureTest(request: mqtt_request_response.RequestResponseOperationOptions, errorSubstring: string) { - let context = createTestContext(); + return builder; +} - context.adapter.connect(); +function createClientBuilder311() : aws_iot_311.AwsIotMqttConnectionConfigBuilder { + let builder = aws_iot_311.AwsIotMqttConnectionConfigBuilder.new_with_websockets(); - try { - await context.client.submitRequest(request); - expect(false); - } catch (err: any) { - expect(err.message).toContain(errorSubstring); - } + builder.with_endpoint(test_env.AWS_IOT_ENV.MQTT5_HOST); + builder.with_client_id(`node-mqtt-unit-test-${uuid()}`) + builder.with_credentials( + test_env.AWS_IOT_ENV.MQTT5_REGION, + test_env.AWS_IOT_ENV.MQTT5_CRED_ACCESS_KEY, + test_env.AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY, + test_env.AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN + ); - cleanupTestContext(context); + return builder; } -const DEFAULT_ACCEPTED_PATH = "a/b/accepted"; -const DEFAULT_REJECTED_PATH = "a/b/rejected"; -const DEFAULT_CORRELATION_TOKEN_PATH = "token"; -const DEFAULT_CORRELATION_TOKEN = "abcd"; - -function makeGoodRequest() : mqtt_request_response.RequestResponseOperationOptions { - var encoder = new TextEncoder(); - - return { - subscriptionTopicFilters : new Array("a/b/+"), - responsePaths: new Array({ - topic: DEFAULT_ACCEPTED_PATH, - correlationTokenJsonPath: DEFAULT_CORRELATION_TOKEN_PATH - }, { - topic: DEFAULT_REJECTED_PATH, - correlationTokenJsonPath: DEFAULT_CORRELATION_TOKEN_PATH - }), - publishTopic: "a/b/derp", - payload: encoder.encode(JSON.stringify({ - token: DEFAULT_CORRELATION_TOKEN - })), - correlationToken: DEFAULT_CORRELATION_TOKEN - }; +function initClientBuilderFactories() { + // @ts-ignore + mrr_test.setClientBuilderFactories(createClientBuilder5, createClientBuilder311); } -test('request-response validation failure - null options', async () => { - // @ts-ignore - let requestOptions : mqtt_request_response.RequestResponseOperationOptions = null; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Create Destroy Mqtt5', async () => { + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 + }); + await context.open(); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); + await context.close(); }); -test('request-response validation failure - null response paths', async () => { - let requestOptions = makeGoodRequest(); - - // @ts-ignore - requestOptions.responsePaths = null; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Create Destroy Mqtt311', async () => { + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt311 + }); + await context.open(); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); + await context.close(); }); -test('request-response validation failure - no response paths', async () => { - let requestOptions = makeGoodRequest(); - requestOptions.responsePaths = new Array(); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Success Rejected Mqtt5', async () => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt5, true); }); -test('request-response validation failure - null response topic', async () => { - let requestOptions = makeGoodRequest(); - - // @ts-ignore - requestOptions.responsePaths[0].topic = null; - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Success Rejected Mqtt311', async () => { + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt311, true); }); -test('request-response validation failure - response topic bad type', async () => { - let requestOptions = makeGoodRequest(); - - // @ts-ignore - requestOptions.responsePaths[0].topic = 5; - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Success Rejected No CorrelationToken Mqtt5', async () => { + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt5, false); }); -test('request-response validation failure - empty response topic', async () => { - let requestOptions = makeGoodRequest(); - - requestOptions.responsePaths[0].topic = ""; - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Success Rejected No CorrelationToken Mqtt311', async () => { + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt311, false); }); -test('request-response validation failure - invalid response topic', async () => { - let requestOptions = makeGoodRequest(); - - requestOptions.responsePaths[0].topic = "a/#/b"; - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('UpdateNamedShadow Success Accepted Mqtt5', async () => { + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt5, true); }); -test('request-response validation failure - correlation token path bad type', async () => { - let requestOptions = makeGoodRequest(); - - // @ts-ignore - requestOptions.responsePaths[0].correlationTokenJsonPath = 5; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('UpdateNamedShadow Success Accepted Mqtt311', async () => { + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt311, true); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('UpdateNamedShadow Success Accepted No CorrelationToken Mqtt5', async () => { + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt5, false); }); -test('request-response validation failure - null publish topic', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('UpdateNamedShadow Success Accepted No CorrelationToken Mqtt311', async () => { + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt311, false); +}); - // @ts-ignore - requestOptions.publishTopic = null; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Timeout Mqtt5', async () => { + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt5, true); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Timeout Mqtt311', async () => { + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt311, true); }); -test('request-response validation failure - publish topic bad type', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Timeout No CorrelationToken Mqtt5', async () => { + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt5, false); +}); - // @ts-ignore - requestOptions.publishTopic = 5; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Timeout No CorrelationToken Mqtt311', async () => { + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt311, false); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure On Close Mqtt5', async () => { + await mrr_test.do_get_named_shadow_failure_on_close_test(mrr_test.ProtocolVersion.Mqtt5, "closed"); }); -test('request-response validation failure - empty publish topic', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure On Close Mqtt311', async () => { + await mrr_test.do_get_named_shadow_failure_on_close_test(mrr_test.ProtocolVersion.Mqtt311, "closed"); +}); - requestOptions.publishTopic = ""; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure zero max request response subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_no_max_request_response_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure zero max request response subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_no_max_request_response_subscriptions, "Invalid client options"); }); -test('request-response validation failure - invalid publish topic', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure invalid max request response subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_invalid_max_request_response_subscriptions, "Invalid client options"); +}); - requestOptions.publishTopic = "a/+"; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure invalid max request response subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_invalid_max_request_response_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined config mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_config, "Invalid client options"); }); -test('request-response validation failure - null subscription topic filters', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined config mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_config, "Invalid client options"); +}); - // @ts-ignore - requestOptions.subscriptionTopicFilters = null; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined max request response subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_max_request_response_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined max request response subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_max_request_response_subscriptions, "Invalid client options"); }); -test('request-response validation failure - no subscription topic filters', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure null max request response subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_null_max_request_response_subscriptions, "Invalid client options"); +}); - requestOptions.subscriptionTopicFilters = new Array(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure null max request response subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_null_max_request_response_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max request response subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_missing_max_request_response_subscriptions, "Invalid client options"); }); -test('request-response validation failure - null subscription topic filter', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max request response subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_missing_max_request_response_subscriptions, "Invalid client options"); +}); - // @ts-ignore - requestOptions.subscriptionTopicFilters[0] = null; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined max streaming subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_max_streaming_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure undefined max streaming subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_max_streaming_subscriptions, "Invalid client options"); }); -test('request-response validation failure - subscription topic filter bad type', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure null max streaming subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_null_max_streaming_subscriptions, "Invalid client options"); +}); - // @ts-ignore - requestOptions.subscriptionTopicFilters[0] = 5; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure null max streaming subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_null_max_streaming_subscriptions, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max streaming subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_missing_max_streaming_subscriptions, "Invalid client options"); }); -test('request-response validation failure - empty subscription topic filter', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max streaming subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_missing_max_streaming_subscriptions, "Invalid client options"); +}); - requestOptions.subscriptionTopicFilters[0] = ""; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max streaming subscriptions mqtt5', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_invalid_operation_timeout, "Invalid client options"); +}); - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Client creation failure missing max streaming subscriptions mqtt311', async() => { + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_invalid_operation_timeout, "Invalid client options"); }); -test('request-response validation failure - invalid subscription topic filter', async () => { - let requestOptions = makeGoodRequest(); - requestOptions.subscriptionTopicFilters[0] = "#/a/b"; +test('Client creation failure null protocol client mqtt311', async() => { + let config : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions: 2, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : 5, + }; - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); + // @ts-ignore + expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt311(null, config)}).toThrow("protocol client is null"); }); -test('request-response validation failure - null payload', async () => { - let requestOptions = makeGoodRequest(); +test('Client creation failure null protocol client mqtt5', async() => { + let config : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions: 2, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : 5, + }; // @ts-ignore - requestOptions.payload = null; - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); + expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt5(null, config)}).toThrow("protocol client is null"); }); -test('request-response validation failure - empty payload', async () => { - let requestOptions = makeGoodRequest(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure No Subscription Topic Filters', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + delete new_options.subscriptionTopicFilters; - let encoder = new TextEncoder(); - requestOptions.payload = encoder.encode(""); - - await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); + return new_options; + }); }); -test('request-response failure - interrupted by close', async () => { - let context = createTestContext(); - - context.adapter.connect(); - - let responsePromise = context.client.submitRequest(makeGoodRequest()); - - context.client.close(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Subscription Topic Filters', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.subscriptionTopicFilters = null; - try { - await responsePromise; - expect(false); - } catch (err: any) { - expect(err.message).toContain("client closed"); - } - - cleanupTestContext(context); + return new_options; + }); }); -test('request-response failure - client closed', async () => { - let context = createTestContext(); - - context.adapter.connect(); - context.client.close(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Subscription Topic Filters Not An Array', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.subscriptionTopicFilters = "null"; - try { - await context.client.submitRequest(makeGoodRequest()); - expect(false); - } catch (err: any) { - expect(err.message).toContain("already been closed"); - } - - cleanupTestContext(context); + return new_options; + }); }); -test('request-response failure - timeout', async () => { - let clientOptions = { - maxRequestResponseSubscriptions: 4, - maxStreamingSubscriptions: 2, - operationTimeoutInSeconds: 2 - }; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Subscription Topic Filters Empty', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.subscriptionTopicFilters = []; - let context = createTestContext({ - clientOptions: clientOptions + return new_options; }); - - context.adapter.connect(); - - try { - await context.client.submitRequest(makeGoodRequest()); - expect(false); - } catch (err: any) { - expect(err.message).toContain("timeout"); - } - - cleanupTestContext(context); }); -function mockSubscribeSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) { - setImmediate(() => { adapter.completeSubscribe(subscribeOptions.topicFilter); }); -} - -function mockUnsubscribeSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, unsubscribeOptions: protocol_adapter.UnsubscribeOptions, context?: any) { - setImmediate(() => { adapter.completeUnsubscribe(unsubscribeOptions.topicFilter); }); -} +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure No Response Paths', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + delete new_options.responsePaths; -interface PublishHandlerContext { - responseTopic: string, - responsePayload: any -} + return new_options; + }); +}); -function mockPublishSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as PublishHandlerContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Response Paths', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths = null; - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(publishOptions.payload); - let payloadAsObject: any = JSON.parse(payloadAsString); + return new_options; + }); +}); - publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Paths Not An Array', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths = "null"; - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); - adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); + return new_options; }); -} - -async function do_request_response_single_success_test(responsePath: string, multiSubscribe: boolean) { - let publishHandlerContext : PublishHandlerContext = { - responseTopic: responsePath, - responsePayload: {} - } +}); - let adapterOptions : protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeSuccessHandler, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - publishHandler: mockPublishSuccessHandler, - publishHandlerContext: publishHandlerContext - }; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Paths Empty', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths = []; - let context = createTestContext({ - adapterOptions: adapterOptions, + return new_options; }); +}); - context.adapter.connect(); - - let request = makeGoodRequest(); - if (multiSubscribe) { - request.subscriptionTopicFilters = new Array(DEFAULT_ACCEPTED_PATH, DEFAULT_REJECTED_PATH); - } +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Path No Topic', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + delete new_options.responsePaths[0].topic; - let responsePromise = context.client.submitRequest(request); - let response = await responsePromise; + return new_options; + }); +}); - expect(response.topic).toEqual(responsePath); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Path Null Topic', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths[0].topic = null; - let decoder = new TextDecoder(); - expect(decoder.decode(response.payload)).toEqual(JSON.stringify({token:DEFAULT_CORRELATION_TOKEN})); + return new_options; + }); +}); - cleanupTestContext(context); -} +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Path Bad Topic Type', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths[0].topic = 5; -test('request-response success - accepted response path', async () => { - await do_request_response_single_success_test(DEFAULT_ACCEPTED_PATH, false); + return new_options; + }); }); -test('request-response success - multi-sub accepted response path', async () => { - await do_request_response_single_success_test(DEFAULT_ACCEPTED_PATH, true); -}); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Path Null Correlation Token Json Path', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths[0].correlationTokenJsonPath = null; -test('request-response success - rejected response path', async () => { - await do_request_response_single_success_test(DEFAULT_REJECTED_PATH, false); + return new_options; + }); }); -test('request-response success - multi-sub rejected response path', async () => { - await do_request_response_single_success_test(DEFAULT_REJECTED_PATH, true); -}); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Response Path Bad Correlation Token Json Path Type', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.responsePaths[0].correlationTokenJsonPath = {}; -function mockPublishSuccessHandlerNoToken(responseTopic: string, responsePayload: any, adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); - adapter.triggerIncomingPublish(responseTopic, publishOptions.payload); + return new_options; }); -} +}); -async function do_request_response_success_empty_correlation_token(responsePath: string, count: number) { - let adapterOptions : protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeSuccessHandler, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - publishHandler: (adapter, publishOptions, context) => { mockPublishSuccessHandlerNoToken(responsePath, {}, adapter, publishOptions, context); }, - }; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure No Publish Topic', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + delete new_options.publishTopic; - let context = createTestContext({ - adapterOptions: adapterOptions, + return new_options; }); +}); - context.adapter.connect(); - - let encoder = new TextEncoder(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Publish Topic', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.publishTopic = null; - let promises = new Array>(); - for (let i = 0; i < count; i++) { - let request = makeGoodRequest(); - delete request.correlationToken; - delete request.responsePaths[0].correlationTokenJsonPath; - delete request.responsePaths[1].correlationTokenJsonPath; + return new_options; + }); +}); - request.payload = encoder.encode(JSON.stringify({ - requestNumber: `${i}` - })); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Bad Publish Topic Type', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.publishTopic = {someValue: null}; - promises.push(context.client.submitRequest(request)); - } + return new_options; + }); +}); - for (const [i, promise] of promises.entries()) { - let response = await promise; - expect(response.topic).toEqual(responsePath); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure No Payload', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + delete new_options.payload; - let decoder = new TextDecoder(); - expect(decoder.decode(response.payload)).toEqual(JSON.stringify({requestNumber:`${i}`})); - } + return new_options; + }); +}); - cleanupTestContext(context); -} +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Payload', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.payload = null; -test('request-response success - accepted response path no correlation token', async () => { - await do_request_response_success_empty_correlation_token(DEFAULT_ACCEPTED_PATH, 1); + return new_options; + }); }); -test('request-response success - accepted response path no correlation token sequence', async () => { - await do_request_response_success_empty_correlation_token(DEFAULT_ACCEPTED_PATH, 5); -}); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Correlation Token', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.correlationToken = null; -test('request-response success - rejected response path no correlation token', async () => { - await do_request_response_success_empty_correlation_token(DEFAULT_REJECTED_PATH, 1); + return new_options; + }); }); -test('request-response success - rejected response path no correlation token sequence', async () => { - await do_request_response_success_empty_correlation_token(DEFAULT_REJECTED_PATH, 5); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Bad Correlation Token Type', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + // @ts-ignore + new_options.correlationToken = ["something"]; + + return new_options; + }); }); -interface FailingSubscribeContext { - startFailingIndex: number, - subscribesSeen: number -} +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Protocol Invalid Topic', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + let new_options = options; + new_options.publishTopic = "#/illegal/#/topic"; -function mockSubscribeFailureHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) { - let subscribeContext = context as FailingSubscribeContext; + return new_options; + }); +}); - if (subscribeContext.subscribesSeen >= subscribeContext.startFailingIndex) { - setImmediate(() => { - adapter.completeSubscribe(subscribeOptions.topicFilter, new CrtError("Nope")); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Null Options', async () => { + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "Invalid request options", + // @ts-ignore + (options : mqtt_request_response.RequestResponseOperationOptions) => { + return null; }); - } else { - setImmediate(() => { - adapter.completeSubscribe(subscribeOptions.topicFilter); - }); - } - - subscribeContext.subscribesSeen++; -} - -async function do_request_response_failure_subscribe(failSecondSubscribe: boolean) { - - let subscribeContext : FailingSubscribeContext = { - startFailingIndex : failSecondSubscribe ? 1 : 0, - subscribesSeen : 0, - }; - - let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeFailureHandler, - subscribeHandlerContext: subscribeContext, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - }; +}); - let context = createTestContext({ - adapterOptions: adapterOptions, +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('GetNamedShadow Failure Submit After Close', async () => { + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); - context.adapter.connect(); - - let request = makeGoodRequest(); - if (failSecondSubscribe) { - request.subscriptionTopicFilters = new Array(DEFAULT_ACCEPTED_PATH, DEFAULT_REJECTED_PATH); - } + await context.open(); + await context.close(); + let requestOptions = mrr_test.createRejectedGetNamedShadowRequest(true); try { - await context.client.submitRequest(request); + await context.client.submitRequest(requestOptions); expect(false); - } catch (e) { - let err = e as Error; - expect(err.message).toContain("Subscribe failure"); + } catch (err: any) { + expect(err.message).toContain("already been closed"); } - - cleanupTestContext(context); -} - - -test('request-response failure - subscribe failure', async () => { - await do_request_response_failure_subscribe(false); }); -test('request-response failure - second subscribe failure', async () => { - await do_request_response_failure_subscribe(true); +////////////////////////////////////////////// +// Streaming Ops NYI +/* +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Open/Close MQTT5', async () => { + await mrr_test.do_streaming_operation_new_open_close_test(mrr_test.ProtocolVersion.Mqtt5); }); -function mockPublishFailureHandlerAck(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - setImmediate(() => { - adapter.completePublish(publishOptions.completionData, new CrtError("Publish failure - No can do")); - }); -} - -test('request-response failure - publish failure', async () => { - let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeSuccessHandler, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - publishHandler: mockPublishFailureHandlerAck, - }; - - let context = createTestContext({ - adapterOptions: adapterOptions, - }); - - context.adapter.connect(); - - let request = makeGoodRequest(); - - try { - await context.client.submitRequest(request); - expect(false); - } catch (e) { - let err = e as Error; - expect(err.message).toContain("Publish failure"); - } +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Open/Close MQTT311', async () => { + await mrr_test.do_streaming_operation_new_open_close_test(mrr_test.ProtocolVersion.Mqtt311); +}); - cleanupTestContext(context); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT5', async () => { + await mrr_test.do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt5); }); -async function doRequestResponseFailureByTimeoutDueToResponseTest(publishHandler: (adapter: MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) => void) { - let publishHandlerContext : PublishHandlerContext = { - responseTopic: DEFAULT_ACCEPTED_PATH, - responsePayload: {} - } +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT311', async () => { + await mrr_test.do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt311); +}); - let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeSuccessHandler, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - publishHandler: publishHandler, - publishHandlerContext: publishHandlerContext - }; +// We only have a 5-based test because there's no way to stop the 311 client without destroying it in the process. +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Subscription Events MQTT5', async () => { - let context = createTestContext({ - adapterOptions: adapterOptions, - clientOptions: { - maxRequestResponseSubscriptions: 4, - maxStreamingSubscriptions: 2, - operationTimeoutInSeconds: 2, // need a quick timeout + await mrr_test.do_streaming_operation_subscription_events_test({ + version: mrr_test.ProtocolVersion.Mqtt5, + builder_mutator5: (builder) => { + builder.withSessionBehavior(mqtt5.ClientSessionBehavior.Clean); + return builder; } }); +}); - context.adapter.connect(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Failure Reopen', async () => { + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 + }); - let request = makeGoodRequest(); + await context.open(); - try { - await context.client.submitRequest(request); - expect(false); - } catch (e) { - let err = e as Error; - expect(err.message).toContain("timeout"); + let topic_filter = `not/a/real/shadow/${uuid()}`; + let streaming_options : mqtt_request_response.StreamingOperationOptions = { + subscriptionTopicFilter : topic_filter, } - cleanupTestContext(context); -} + let stream = context.client.createStream(streaming_options); -function mockPublishFailureHandlerInvalidResponse(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as PublishHandlerContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); + let initialSubscriptionComplete = once(stream, mqtt_request_response.StreamingOperationBase.SUBSCRIPTION_STATUS); - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(publishOptions.payload); - let payloadAsObject: any = JSON.parse(payloadAsString); + stream.open(); - publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; + await initialSubscriptionComplete; - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); - // drop the closing bracket to create a JSON deserialization error - adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString.slice(0, responsePayloadAsString.length - 1))); - }); -} + stream.open(); -test('request-response failure - invalid response payload', async () => { - await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerInvalidResponse); -}); + stream.close(); -function mockPublishFailureHandlerMissingCorrelationToken(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as PublishHandlerContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); + // multi-opening or multi-closing are fine, but opening after a close is not + expect(() => {stream.open()}).toThrow(); - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); - adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); - }); -} + stream.close(); -test('request-response failure - missing correlation token', async () => { - await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerMissingCorrelationToken); + await context.close(); }); -function mockPublishFailureHandlerInvalidCorrelationTokenType(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as PublishHandlerContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); - - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(publishOptions.payload); - let payloadAsObject: any = JSON.parse(payloadAsString); - let tokenAsString = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH] as string; - publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = parseInt(tokenAsString, 10); - - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); - adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Auto Close', async () => { + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); -} - -test('request-response failure - invalid correlation token type', async () => { - await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerInvalidCorrelationTokenType); -}); - -function mockPublishFailureHandlerNonMatchingCorrelationToken(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as PublishHandlerContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); - - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(publishOptions.payload); - let payloadAsObject: any = JSON.parse(payloadAsString); - let token = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH] as string; - publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = token.substring(1); // skip the first character - - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); - adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); - }); -} - -test('request-response failure - non-matching correlation token', async () => { - await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerNonMatchingCorrelationToken); -}); - -interface TestOperationDefinition { - topicPrefix: string, - uniqueRequestPayload: string, - correlationToken?: string, -} - -interface RequestSequenceContext { - responseMap: Map -} -function makeTestRequest(definition: TestOperationDefinition): mqtt_request_response.RequestResponseOperationOptions { - let encoder = new TextEncoder(); + await context.open(); - let baseResponseAsObject : any = {}; - baseResponseAsObject["requestPayload"] = definition.uniqueRequestPayload; - if (definition.correlationToken) { - baseResponseAsObject[DEFAULT_CORRELATION_TOKEN_PATH] = definition.correlationToken; + let topic_filter = `not/a/real/shadow/${uuid()}`; + let streaming_options : mqtt_request_response.StreamingOperationOptions = { + subscriptionTopicFilter : topic_filter, } - let options : mqtt_request_response.RequestResponseOperationOptions = { - subscriptionTopicFilters : new Array(`${definition.topicPrefix}/+`), - responsePaths: new Array({ - topic: `${definition.topicPrefix}/accepted` - }, { - topic: `${definition.topicPrefix}/rejected` - }), - publishTopic: `${definition.topicPrefix}/operation`, - payload: encoder.encode(JSON.stringify(baseResponseAsObject)), - }; - - if (definition.correlationToken) { - options.responsePaths[0].correlationTokenJsonPath = DEFAULT_CORRELATION_TOKEN_PATH; - options.responsePaths[1].correlationTokenJsonPath = DEFAULT_CORRELATION_TOKEN_PATH; - options.correlationToken = definition.correlationToken; - } - - return options; -} - -function mockPublishSuccessHandlerSequence(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { - let publishHandlerContext = context as RequestSequenceContext; - setImmediate(() => { - adapter.completePublish(publishOptions.completionData); + let stream = context.client.createStream(streaming_options); - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(publishOptions.payload); + let initialSubscriptionComplete = once(stream, mqtt_request_response.StreamingOperationBase.SUBSCRIPTION_STATUS); - let payloadAsObject: any = JSON.parse(payloadAsString); - let token : string | undefined = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; + stream.open(); - let uniquenessValue = payloadAsObject["requestPayload"] as string; - let definition = publishHandlerContext.responseMap.get(uniquenessValue); - if (!definition) { - return; - } + await initialSubscriptionComplete; - let responsePayload : any = { - requestPayload: uniquenessValue - }; - if (token) { - responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = token; // skip the first character - } + stream.open(); - let encoder = new TextEncoder(); - let responsePayloadAsString = JSON.stringify(responsePayload); - adapter.triggerIncomingPublish(`${definition.topicPrefix}/accepted`, encoder.encode(responsePayloadAsString)); - }); -} - -test('request-response success - multi operation sequence', async () => { - let operations : Array = new Array( - { - topicPrefix: "test", - uniqueRequestPayload: "1", - correlationToken: "token1", - }, - { - topicPrefix: "test", - uniqueRequestPayload: "2", - correlationToken: "token2", - }, - { - topicPrefix: "test2", - uniqueRequestPayload: "3", - correlationToken: "token3", - }, - { - topicPrefix: "interrupting/cow", - uniqueRequestPayload: "4", - correlationToken: "moo", - }, - { - topicPrefix: "test", - uniqueRequestPayload: "5", - correlationToken: "token4", - }, - { - topicPrefix: "test2", - uniqueRequestPayload: "6", - correlationToken: "token5", - }, - { - topicPrefix: "provision", - uniqueRequestPayload: "7", - }, - { - topicPrefix: "provision", - uniqueRequestPayload: "8", - }, - { - topicPrefix: "create-keys-and-cert", - uniqueRequestPayload: "9", - }, - { - topicPrefix: "test", - uniqueRequestPayload: "10", - correlationToken: "token6", - }, - { - topicPrefix: "test2", - uniqueRequestPayload: "11", - correlationToken: "token7", - }, - { - topicPrefix: "provision", - uniqueRequestPayload: "12", - }, - ); - - let responseMap = operations.reduce(function(map, def) { - map.set(def.uniqueRequestPayload, def); - return map; - }, new Map()); - - let publishHandlerContext : RequestSequenceContext = { - responseMap: responseMap - } + await context.close(); - let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { - subscribeHandler: mockSubscribeSuccessHandler, - unsubscribeHandler: mockUnsubscribeSuccessHandler, - publishHandler: mockPublishSuccessHandlerSequence, - publishHandlerContext: publishHandlerContext - }; - - let context = createTestContext({ - adapterOptions: adapterOptions - }); + // Closing the client should close the operation automatically; verify that by verifying that open now generates + // an exception + expect(() => {stream.open()}).toThrow(); +}); - context.adapter.connect(); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Options', async () => { + // @ts-ignore + await mrr_test.do_invalid_streaming_operation_config_test(null, "invalid configuration"); +}); - let promises = new Array>(); - for (let operation of operations) { - let request = makeTestRequest(operation); - promises.push(context.client.submitRequest(request)); - } +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Undefined Options', async () => { + // @ts-ignore + await mrr_test.do_invalid_streaming_operation_config_test(undefined, "invalid configuration"); +}); - for (const [i, promise] of promises.entries()) { - let definition = operations[i]; - let response = await promise; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Filter', async () => { + await mrr_test.do_invalid_streaming_operation_config_test({ + // @ts-ignore + subscriptionTopicFilter : null, + }, "invalid configuration"); +}); - expect(response.topic).toEqual(`${definition.topicPrefix}/accepted`); +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Type', async () => { + await mrr_test.do_invalid_streaming_operation_config_test({ + // @ts-ignore + subscriptionTopicFilter : 5, + }, "invalid configuration"); +}); - let decoder = new TextDecoder(); - let payloadAsString = decoder.decode(response.payload); - let payloadAsObject = JSON.parse(payloadAsString); - let originalRequestPayload = payloadAsObject["requestPayload"] as string; +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Value', async () => { + await mrr_test.do_invalid_streaming_operation_config_test({ + subscriptionTopicFilter : "#/hello/#", + }, "Failed to create"); +}); - expect(definition.uniqueRequestPayload).toEqual(originalRequestPayload); - } - cleanupTestContext(context); -}); + */ \ No newline at end of file diff --git a/lib/browser/mqtt_request_response.ts b/lib/browser/mqtt_request_response.ts index 63b2fd9d..65523b28 100644 --- a/lib/browser/mqtt_request_response.ts +++ b/lib/browser/mqtt_request_response.ts @@ -102,6 +102,44 @@ interface ServiceTaskWrapper { nextServiceTime : number; } +function areClientOptionsValid(options: mqtt_request_response.RequestResponseClientOptions) : boolean { + if (!options) { + return false; + } + + if (!options.maxRequestResponseSubscriptions) { + return false; + } + + if (!Number.isInteger(options.maxRequestResponseSubscriptions)) { + return false; + } + + if (options.maxRequestResponseSubscriptions < 2) { + return false; + } + + if (!options.maxStreamingSubscriptions) { + return false; + } + + if (!Number.isInteger(options.maxStreamingSubscriptions)) { + return false; + } + + if (options.operationTimeoutInSeconds) { + if (!Number.isInteger(options.operationTimeoutInSeconds)) { + return false; + } + + if (options.operationTimeoutInSeconds <= 0) { + return false; + } + } + + return true; +} + /** * Native implementation of an MQTT-based request-response client tuned for AWS MQTT services. * @@ -128,6 +166,10 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ private operationQueue : Array = new Array; constructor(protocolClientAdapter: protocol_client_adapter.ProtocolClientAdapter, options: mqtt_request_response.RequestResponseClientOptions) { + if (!areClientOptionsValid(options)) { + throw new CrtError("Invalid client options passed to RequestResponseClient constructor"); + } + super(); this.operationTimeoutInSeconds = options.operationTimeoutInSeconds ?? 60; @@ -860,5 +902,7 @@ function validateRequestOptions(requestOptions: mqtt_request_response.RequestRes if (typeof(requestOptions.correlationToken) !== 'string') { throw new CrtError("Invalid request options - correlationToken is not a string"); } + } else if (requestOptions.correlationToken === null) { + throw new CrtError("Invalid request options - correlationToken null"); } } diff --git a/lib/browser/mqtt_request_response_impl.spec.ts b/lib/browser/mqtt_request_response_impl.spec.ts new file mode 100644 index 00000000..51bf7523 --- /dev/null +++ b/lib/browser/mqtt_request_response_impl.spec.ts @@ -0,0 +1,849 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import * as protocol_adapter_mock from "./mqtt_request_response/protocol_adapter_mock"; +import * as mqtt_request_response from "./mqtt_request_response"; +import * as protocol_adapter from "./mqtt_request_response/protocol_adapter"; +import { CrtError } from "./error"; +import {MockProtocolAdapter} from "./mqtt_request_response/protocol_adapter_mock"; + +jest.setTimeout(1000000); + +interface TestContextOptions { + clientOptions?: mqtt_request_response.RequestResponseClientOptions, + adapterOptions?: protocol_adapter_mock.MockProtocolAdapterOptions +} + +interface TestContext { + client : mqtt_request_response.RequestResponseClient, + adapter: protocol_adapter_mock.MockProtocolAdapter +} + +function createTestContext(options? : TestContextOptions) : TestContext { + let adapter = new protocol_adapter_mock.MockProtocolAdapter(options?.adapterOptions); + + var clientOptions : mqtt_request_response.RequestResponseClientOptions = options?.clientOptions ?? { + maxRequestResponseSubscriptions: 4, + maxStreamingSubscriptions: 2, + operationTimeoutInSeconds: 600, + }; + + // @ts-ignore + let client = new mqtt_request_response.RequestResponseClient(adapter, clientOptions); + + return { + client: client, + adapter: adapter + }; +} + +function cleanupTestContext(context: TestContext) { + context.client.close(); +} + +test('create/destroy', async () => { + let context = createTestContext(); + cleanupTestContext(context); +}); + +async function doRequestResponseValidationFailureTest(request: mqtt_request_response.RequestResponseOperationOptions, errorSubstring: string) { + let context = createTestContext(); + + context.adapter.connect(); + + try { + await context.client.submitRequest(request); + expect(false); + } catch (err: any) { + expect(err.message).toContain(errorSubstring); + } + + cleanupTestContext(context); +} + +const DEFAULT_ACCEPTED_PATH = "a/b/accepted"; +const DEFAULT_REJECTED_PATH = "a/b/rejected"; +const DEFAULT_CORRELATION_TOKEN_PATH = "token"; +const DEFAULT_CORRELATION_TOKEN = "abcd"; + +function makeGoodRequest() : mqtt_request_response.RequestResponseOperationOptions { + var encoder = new TextEncoder(); + + return { + subscriptionTopicFilters : new Array("a/b/+"), + responsePaths: new Array({ + topic: DEFAULT_ACCEPTED_PATH, + correlationTokenJsonPath: DEFAULT_CORRELATION_TOKEN_PATH + }, { + topic: DEFAULT_REJECTED_PATH, + correlationTokenJsonPath: DEFAULT_CORRELATION_TOKEN_PATH + }), + publishTopic: "a/b/derp", + payload: encoder.encode(JSON.stringify({ + token: DEFAULT_CORRELATION_TOKEN + })), + correlationToken: DEFAULT_CORRELATION_TOKEN + }; +} + +test('request-response validation failure - null options', async () => { + // @ts-ignore + let requestOptions : mqtt_request_response.RequestResponseOperationOptions = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null response paths', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.responsePaths = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - no response paths', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.responsePaths = new Array(); + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null response topic', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.responsePaths[0].topic = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - response topic bad type', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.responsePaths[0].topic = 5; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - empty response topic', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.responsePaths[0].topic = ""; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - invalid response topic', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.responsePaths[0].topic = "a/#/b"; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - correlation token path bad type', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.responsePaths[0].correlationTokenJsonPath = 5; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null publish topic', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.publishTopic = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - publish topic bad type', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.publishTopic = 5; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - empty publish topic', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.publishTopic = ""; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - invalid publish topic', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.publishTopic = "a/+"; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null subscription topic filters', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.subscriptionTopicFilters = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - no subscription topic filters', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.subscriptionTopicFilters = new Array(); + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null subscription topic filter', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.subscriptionTopicFilters[0] = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - subscription topic filter bad type', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.subscriptionTopicFilters[0] = 5; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - empty subscription topic filter', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.subscriptionTopicFilters[0] = ""; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - invalid subscription topic filter', async () => { + let requestOptions = makeGoodRequest(); + + requestOptions.subscriptionTopicFilters[0] = "#/a/b"; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - null payload', async () => { + let requestOptions = makeGoodRequest(); + + // @ts-ignore + requestOptions.payload = null; + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response validation failure - empty payload', async () => { + let requestOptions = makeGoodRequest(); + + let encoder = new TextEncoder(); + requestOptions.payload = encoder.encode(""); + + await doRequestResponseValidationFailureTest(requestOptions, "Invalid request options"); +}); + +test('request-response failure - interrupted by close', async () => { + let context = createTestContext(); + + context.adapter.connect(); + + let responsePromise = context.client.submitRequest(makeGoodRequest()); + + context.client.close(); + + try { + await responsePromise; + expect(false); + } catch (err: any) { + expect(err.message).toContain("client closed"); + } + + cleanupTestContext(context); +}); + +test('request-response failure - client closed', async () => { + let context = createTestContext(); + + context.adapter.connect(); + context.client.close(); + + try { + await context.client.submitRequest(makeGoodRequest()); + expect(false); + } catch (err: any) { + expect(err.message).toContain("already been closed"); + } + + cleanupTestContext(context); +}); + +test('request-response failure - timeout', async () => { + let clientOptions = { + maxRequestResponseSubscriptions: 4, + maxStreamingSubscriptions: 2, + operationTimeoutInSeconds: 2 + }; + + let context = createTestContext({ + clientOptions: clientOptions + }); + + context.adapter.connect(); + + try { + await context.client.submitRequest(makeGoodRequest()); + expect(false); + } catch (err: any) { + expect(err.message).toContain("timeout"); + } + + cleanupTestContext(context); +}); + +function mockSubscribeSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) { + setImmediate(() => { adapter.completeSubscribe(subscribeOptions.topicFilter); }); +} + +function mockUnsubscribeSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, unsubscribeOptions: protocol_adapter.UnsubscribeOptions, context?: any) { + setImmediate(() => { adapter.completeUnsubscribe(unsubscribeOptions.topicFilter); }); +} + +interface PublishHandlerContext { + responseTopic: string, + responsePayload: any +} + +function mockPublishSuccessHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as PublishHandlerContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(publishOptions.payload); + let payloadAsObject: any = JSON.parse(payloadAsString); + + publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); + adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); + }); +} + +async function do_request_response_single_success_test(responsePath: string, multiSubscribe: boolean) { + let publishHandlerContext : PublishHandlerContext = { + responseTopic: responsePath, + responsePayload: {} + } + + let adapterOptions : protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeSuccessHandler, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + publishHandler: mockPublishSuccessHandler, + publishHandlerContext: publishHandlerContext + }; + + let context = createTestContext({ + adapterOptions: adapterOptions, + }); + + context.adapter.connect(); + + let request = makeGoodRequest(); + if (multiSubscribe) { + request.subscriptionTopicFilters = new Array(DEFAULT_ACCEPTED_PATH, DEFAULT_REJECTED_PATH); + } + + let responsePromise = context.client.submitRequest(request); + let response = await responsePromise; + + expect(response.topic).toEqual(responsePath); + + let decoder = new TextDecoder(); + expect(decoder.decode(response.payload)).toEqual(JSON.stringify({token:DEFAULT_CORRELATION_TOKEN})); + + cleanupTestContext(context); +} + +test('request-response success - accepted response path', async () => { + await do_request_response_single_success_test(DEFAULT_ACCEPTED_PATH, false); +}); + +test('request-response success - multi-sub accepted response path', async () => { + await do_request_response_single_success_test(DEFAULT_ACCEPTED_PATH, true); +}); + +test('request-response success - rejected response path', async () => { + await do_request_response_single_success_test(DEFAULT_REJECTED_PATH, false); +}); + +test('request-response success - multi-sub rejected response path', async () => { + await do_request_response_single_success_test(DEFAULT_REJECTED_PATH, true); +}); + +function mockPublishSuccessHandlerNoToken(responseTopic: string, responsePayload: any, adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + adapter.triggerIncomingPublish(responseTopic, publishOptions.payload); + }); +} + +async function do_request_response_success_empty_correlation_token(responsePath: string, count: number) { + let adapterOptions : protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeSuccessHandler, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + publishHandler: (adapter, publishOptions, context) => { mockPublishSuccessHandlerNoToken(responsePath, {}, adapter, publishOptions, context); }, + }; + + let context = createTestContext({ + adapterOptions: adapterOptions, + }); + + context.adapter.connect(); + + let encoder = new TextEncoder(); + + let promises = new Array>(); + for (let i = 0; i < count; i++) { + let request = makeGoodRequest(); + delete request.correlationToken; + delete request.responsePaths[0].correlationTokenJsonPath; + delete request.responsePaths[1].correlationTokenJsonPath; + + request.payload = encoder.encode(JSON.stringify({ + requestNumber: `${i}` + })); + + promises.push(context.client.submitRequest(request)); + } + + for (const [i, promise] of promises.entries()) { + let response = await promise; + + expect(response.topic).toEqual(responsePath); + + let decoder = new TextDecoder(); + expect(decoder.decode(response.payload)).toEqual(JSON.stringify({requestNumber:`${i}`})); + } + + cleanupTestContext(context); +} + +test('request-response success - accepted response path no correlation token', async () => { + await do_request_response_success_empty_correlation_token(DEFAULT_ACCEPTED_PATH, 1); +}); + +test('request-response success - accepted response path no correlation token sequence', async () => { + await do_request_response_success_empty_correlation_token(DEFAULT_ACCEPTED_PATH, 5); +}); + +test('request-response success - rejected response path no correlation token', async () => { + await do_request_response_success_empty_correlation_token(DEFAULT_REJECTED_PATH, 1); +}); + +test('request-response success - rejected response path no correlation token sequence', async () => { + await do_request_response_success_empty_correlation_token(DEFAULT_REJECTED_PATH, 5); +}); + +interface FailingSubscribeContext { + startFailingIndex: number, + subscribesSeen: number +} + +function mockSubscribeFailureHandler(adapter: protocol_adapter_mock.MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) { + let subscribeContext = context as FailingSubscribeContext; + + if (subscribeContext.subscribesSeen >= subscribeContext.startFailingIndex) { + setImmediate(() => { + adapter.completeSubscribe(subscribeOptions.topicFilter, new CrtError("Nope")); + }); + } else { + setImmediate(() => { + adapter.completeSubscribe(subscribeOptions.topicFilter); + }); + } + + subscribeContext.subscribesSeen++; +} + +async function do_request_response_failure_subscribe(failSecondSubscribe: boolean) { + + let subscribeContext : FailingSubscribeContext = { + startFailingIndex : failSecondSubscribe ? 1 : 0, + subscribesSeen : 0, + }; + + let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeFailureHandler, + subscribeHandlerContext: subscribeContext, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + }; + + let context = createTestContext({ + adapterOptions: adapterOptions, + }); + + context.adapter.connect(); + + let request = makeGoodRequest(); + if (failSecondSubscribe) { + request.subscriptionTopicFilters = new Array(DEFAULT_ACCEPTED_PATH, DEFAULT_REJECTED_PATH); + } + + try { + await context.client.submitRequest(request); + expect(false); + } catch (e) { + let err = e as Error; + expect(err.message).toContain("Subscribe failure"); + } + + cleanupTestContext(context); +} + + +test('request-response failure - subscribe failure', async () => { + await do_request_response_failure_subscribe(false); +}); + +test('request-response failure - second subscribe failure', async () => { + await do_request_response_failure_subscribe(true); +}); + +function mockPublishFailureHandlerAck(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + setImmediate(() => { + adapter.completePublish(publishOptions.completionData, new CrtError("Publish failure - No can do")); + }); +} + +test('request-response failure - publish failure', async () => { + let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeSuccessHandler, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + publishHandler: mockPublishFailureHandlerAck, + }; + + let context = createTestContext({ + adapterOptions: adapterOptions, + }); + + context.adapter.connect(); + + let request = makeGoodRequest(); + + try { + await context.client.submitRequest(request); + expect(false); + } catch (e) { + let err = e as Error; + expect(err.message).toContain("Publish failure"); + } + + cleanupTestContext(context); +}); + +async function doRequestResponseFailureByTimeoutDueToResponseTest(publishHandler: (adapter: MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) => void) { + let publishHandlerContext : PublishHandlerContext = { + responseTopic: DEFAULT_ACCEPTED_PATH, + responsePayload: {} + } + + let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeSuccessHandler, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + publishHandler: publishHandler, + publishHandlerContext: publishHandlerContext + }; + + let context = createTestContext({ + adapterOptions: adapterOptions, + clientOptions: { + maxRequestResponseSubscriptions: 4, + maxStreamingSubscriptions: 2, + operationTimeoutInSeconds: 2, // need a quick timeout + } + }); + + context.adapter.connect(); + + let request = makeGoodRequest(); + + try { + await context.client.submitRequest(request); + expect(false); + } catch (e) { + let err = e as Error; + expect(err.message).toContain("timeout"); + } + + cleanupTestContext(context); +} + +function mockPublishFailureHandlerInvalidResponse(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as PublishHandlerContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(publishOptions.payload); + let payloadAsObject: any = JSON.parse(payloadAsString); + + publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); + // drop the closing bracket to create a JSON deserialization error + adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString.slice(0, responsePayloadAsString.length - 1))); + }); +} + +test('request-response failure - invalid response payload', async () => { + await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerInvalidResponse); +}); + +function mockPublishFailureHandlerMissingCorrelationToken(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as PublishHandlerContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); + adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); + }); +} + +test('request-response failure - missing correlation token', async () => { + await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerMissingCorrelationToken); +}); + +function mockPublishFailureHandlerInvalidCorrelationTokenType(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as PublishHandlerContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(publishOptions.payload); + let payloadAsObject: any = JSON.parse(payloadAsString); + let tokenAsString = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH] as string; + publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = parseInt(tokenAsString, 10); + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); + adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); + }); +} + +test('request-response failure - invalid correlation token type', async () => { + await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerInvalidCorrelationTokenType); +}); + +function mockPublishFailureHandlerNonMatchingCorrelationToken(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as PublishHandlerContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(publishOptions.payload); + let payloadAsObject: any = JSON.parse(payloadAsString); + let token = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH] as string; + publishHandlerContext.responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = token.substring(1); // skip the first character + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(publishHandlerContext.responsePayload); + adapter.triggerIncomingPublish(publishHandlerContext.responseTopic, encoder.encode(responsePayloadAsString)); + }); +} + +test('request-response failure - non-matching correlation token', async () => { + await doRequestResponseFailureByTimeoutDueToResponseTest(mockPublishFailureHandlerNonMatchingCorrelationToken); +}); + +interface TestOperationDefinition { + topicPrefix: string, + uniqueRequestPayload: string, + correlationToken?: string, +} + +interface RequestSequenceContext { + responseMap: Map +} + +function makeTestRequest(definition: TestOperationDefinition): mqtt_request_response.RequestResponseOperationOptions { + let encoder = new TextEncoder(); + + let baseResponseAsObject : any = {}; + baseResponseAsObject["requestPayload"] = definition.uniqueRequestPayload; + if (definition.correlationToken) { + baseResponseAsObject[DEFAULT_CORRELATION_TOKEN_PATH] = definition.correlationToken; + } + + let options : mqtt_request_response.RequestResponseOperationOptions = { + subscriptionTopicFilters : new Array(`${definition.topicPrefix}/+`), + responsePaths: new Array({ + topic: `${definition.topicPrefix}/accepted` + }, { + topic: `${definition.topicPrefix}/rejected` + }), + publishTopic: `${definition.topicPrefix}/operation`, + payload: encoder.encode(JSON.stringify(baseResponseAsObject)), + }; + + if (definition.correlationToken) { + options.responsePaths[0].correlationTokenJsonPath = DEFAULT_CORRELATION_TOKEN_PATH; + options.responsePaths[1].correlationTokenJsonPath = DEFAULT_CORRELATION_TOKEN_PATH; + options.correlationToken = definition.correlationToken; + } + + return options; +} + +function mockPublishSuccessHandlerSequence(adapter: protocol_adapter_mock.MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) { + let publishHandlerContext = context as RequestSequenceContext; + setImmediate(() => { + adapter.completePublish(publishOptions.completionData); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(publishOptions.payload); + + let payloadAsObject: any = JSON.parse(payloadAsString); + let token : string | undefined = payloadAsObject[DEFAULT_CORRELATION_TOKEN_PATH]; + + let uniquenessValue = payloadAsObject["requestPayload"] as string; + let definition = publishHandlerContext.responseMap.get(uniquenessValue); + if (!definition) { + return; + } + + let responsePayload : any = { + requestPayload: uniquenessValue + }; + if (token) { + responsePayload[DEFAULT_CORRELATION_TOKEN_PATH] = token; // skip the first character + } + + let encoder = new TextEncoder(); + let responsePayloadAsString = JSON.stringify(responsePayload); + adapter.triggerIncomingPublish(`${definition.topicPrefix}/accepted`, encoder.encode(responsePayloadAsString)); + }); +} + +test('request-response success - multi operation sequence', async () => { + let operations : Array = new Array( + { + topicPrefix: "test", + uniqueRequestPayload: "1", + correlationToken: "token1", + }, + { + topicPrefix: "test", + uniqueRequestPayload: "2", + correlationToken: "token2", + }, + { + topicPrefix: "test2", + uniqueRequestPayload: "3", + correlationToken: "token3", + }, + { + topicPrefix: "interrupting/cow", + uniqueRequestPayload: "4", + correlationToken: "moo", + }, + { + topicPrefix: "test", + uniqueRequestPayload: "5", + correlationToken: "token4", + }, + { + topicPrefix: "test2", + uniqueRequestPayload: "6", + correlationToken: "token5", + }, + { + topicPrefix: "provision", + uniqueRequestPayload: "7", + }, + { + topicPrefix: "provision", + uniqueRequestPayload: "8", + }, + { + topicPrefix: "create-keys-and-cert", + uniqueRequestPayload: "9", + }, + { + topicPrefix: "test", + uniqueRequestPayload: "10", + correlationToken: "token6", + }, + { + topicPrefix: "test2", + uniqueRequestPayload: "11", + correlationToken: "token7", + }, + { + topicPrefix: "provision", + uniqueRequestPayload: "12", + }, + ); + + let responseMap = operations.reduce(function(map, def) { + map.set(def.uniqueRequestPayload, def); + return map; + }, new Map()); + + let publishHandlerContext : RequestSequenceContext = { + responseMap: responseMap + } + + let adapterOptions: protocol_adapter_mock.MockProtocolAdapterOptions = { + subscribeHandler: mockSubscribeSuccessHandler, + unsubscribeHandler: mockUnsubscribeSuccessHandler, + publishHandler: mockPublishSuccessHandlerSequence, + publishHandlerContext: publishHandlerContext + }; + + let context = createTestContext({ + adapterOptions: adapterOptions + }); + + context.adapter.connect(); + + let promises = new Array>(); + for (let operation of operations) { + let request = makeTestRequest(operation); + promises.push(context.client.submitRequest(request)); + } + + for (const [i, promise] of promises.entries()) { + let definition = operations[i]; + let response = await promise; + + expect(response.topic).toEqual(`${definition.topicPrefix}/accepted`); + + let decoder = new TextDecoder(); + let payloadAsString = decoder.decode(response.payload); + let payloadAsObject = JSON.parse(payloadAsString); + let originalRequestPayload = payloadAsObject["requestPayload"] as string; + + expect(definition.uniqueRequestPayload).toEqual(originalRequestPayload); + } + + cleanupTestContext(context); +}); diff --git a/lib/native/mqtt_request_response.spec.ts b/lib/native/mqtt_request_response.spec.ts index 6caf9ccf..c486521a 100644 --- a/lib/native/mqtt_request_response.spec.ts +++ b/lib/native/mqtt_request_response.spec.ts @@ -5,156 +5,46 @@ import * as test_env from "@test/test_env" -import * as mqtt311 from "./mqtt"; import * as mqtt5 from "./mqtt5"; import * as mqtt_request_response from "./mqtt_request_response"; import {v4 as uuid} from "uuid"; import {once} from "events"; +import * as mrr_test from "@test/mqtt_request_response"; +import * as aws_iot_5 from "./aws_iot_mqtt5"; +import * as aws_iot_311 from "./aws_iot"; import * as iot from "./iot"; -import {toUtf8} from "@aws-sdk/util-utf8-browser"; import {StreamingOperationOptions, SubscriptionStatusEvent} from "./mqtt_request_response"; import {newLiftedPromise} from "../common/promise"; +import {ProtocolVersion, TestingContext, TestingOptions} from "@test/mqtt_request_response"; jest.setTimeout(10000); -enum ProtocolVersion { - Mqtt311, - Mqtt5 -} - -interface TestingOptions { - version: ProtocolVersion, - timeoutSeconds?: number, - startOffline?: boolean, - builder_mutator5?: (builder: iot.AwsIotMqtt5ClientConfigBuilder) => iot.AwsIotMqtt5ClientConfigBuilder, - builder_mutator311?: (builder: iot.AwsIotMqttConnectionConfigBuilder) => iot.AwsIotMqttConnectionConfigBuilder, -} - -function build_protocol_client_mqtt5(builder_mutator?: (builder: iot.AwsIotMqtt5ClientConfigBuilder) => iot.AwsIotMqtt5ClientConfigBuilder) : mqtt5.Mqtt5Client { +function createClientBuilder5() : aws_iot_5.AwsIotMqtt5ClientConfigBuilder { let builder = iot.AwsIotMqtt5ClientConfigBuilder.newDirectMqttBuilderWithMtlsFromPath( test_env.AWS_IOT_ENV.MQTT5_HOST, test_env.AWS_IOT_ENV.MQTT5_RSA_CERT, test_env.AWS_IOT_ENV.MQTT5_RSA_KEY ); - builder.withConnectProperties({ - clientId : uuid(), - keepAliveIntervalSeconds: 1200, - }); - - if (builder_mutator) { - builder = builder_mutator(builder); - } - - return new mqtt5.Mqtt5Client(builder.build()); + return builder; } -function build_protocol_client_mqtt311(builder_mutator?: (builder: iot.AwsIotMqttConnectionConfigBuilder) => iot.AwsIotMqttConnectionConfigBuilder) : mqtt311.MqttClientConnection { +function createClientBuilder311() : aws_iot_311.AwsIotMqttConnectionConfigBuilder { let builder = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder_from_path(test_env.AWS_IOT_ENV.MQTT5_RSA_CERT, test_env.AWS_IOT_ENV.MQTT5_RSA_KEY); builder.with_endpoint(test_env.AWS_IOT_ENV.MQTT5_HOST); // yes, 5 not 3 - builder.with_client_id(uuid()); - if (builder_mutator) { - builder = builder_mutator(builder); - } - - let client = new mqtt311.MqttClient(); - return client.new_connection(builder.build()); + return builder; } -class TestingContext { - - mqtt311Client?: mqtt311.MqttClientConnection; - mqtt5Client?: mqtt5.Mqtt5Client; - - client: mqtt_request_response.RequestResponseClient; - - private protocolStarted : boolean = false; - - async startProtocolClient() { - if (!this.protocolStarted) { - this.protocolStarted = true; - if (this.mqtt5Client) { - let connected = once(this.mqtt5Client, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); - this.mqtt5Client.start(); - - await connected; - } - - if (this.mqtt311Client) { - await this.mqtt311Client.connect(); - } - } - } - - async stopProtocolClient() { - if (this.protocolStarted) { - this.protocolStarted = false; - if (this.mqtt5Client) { - let stopped = once(this.mqtt5Client, mqtt5.Mqtt5Client.STOPPED); - this.mqtt5Client.stop(); - await stopped; - - this.mqtt5Client.close(); - } - - if (this.mqtt311Client) { - await this.mqtt311Client.disconnect(); - } - } - } - - async publishProtocolClient(topic: string, payload: ArrayBuffer) { - if (this.mqtt5Client) { - await this.mqtt5Client.publish({ - topicName: topic, - qos: mqtt5.QoS.AtLeastOnce, - payload: payload, - }); - } - - if (this.mqtt311Client) { - await this.mqtt311Client.publish(topic, payload, mqtt311.QoS.AtLeastOnce); - } - } - - constructor(options: TestingOptions) { - if (options.version == ProtocolVersion.Mqtt5) { - this.mqtt5Client = build_protocol_client_mqtt5(options.builder_mutator5); - - let rrOptions : mqtt_request_response.RequestResponseClientOptions = { - maxRequestResponseSubscriptions : 6, - maxStreamingSubscriptions : 2, - operationTimeoutInSeconds : options.timeoutSeconds ?? 60, - } - - this.client = mqtt_request_response.RequestResponseClient.newFromMqtt5(this.mqtt5Client, rrOptions); - } else { - this.mqtt311Client = build_protocol_client_mqtt311(options.builder_mutator311); - - let rrOptions : mqtt_request_response.RequestResponseClientOptions = { - maxRequestResponseSubscriptions : 6, - maxStreamingSubscriptions : 2, - operationTimeoutInSeconds : options.timeoutSeconds ?? 60, - } - - this.client = mqtt_request_response.RequestResponseClient.newFromMqtt311(this.mqtt311Client, rrOptions); - } - } - - async open() { - await this.startProtocolClient(); - } - - async close() { - this.client.close(); - await this.stopProtocolClient(); - } +function initClientBuilderFactories() { + // @ts-ignore + mrr_test.setClientBuilderFactories(createClientBuilder5, createClientBuilder311); } test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Create Destroy Mqtt5', async () => { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); await context.open(); @@ -162,396 +52,188 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Creat }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Create Destroy Mqtt311', async () => { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt311 + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt311 }); await context.open(); await context.close(); }); -function createRejectedGetNamedShadowRequest(addCorelationToken: boolean) : mqtt_request_response.RequestResponseOperationOptions { - let requestOptions : mqtt_request_response.RequestResponseOperationOptions = { - subscriptionTopicFilters: [ "$aws/things/NoSuchThing/shadow/name/Derp/get/+" ], - responsePaths: [{ - topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/accepted", - }, { - topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/rejected", - }], - publishTopic: "$aws/things/NoSuchThing/shadow/name/Derp/get", - payload: Buffer.from("{}", "utf-8"), - } - - if (addCorelationToken) { - let correlationToken = uuid(); - - requestOptions.responsePaths = [{ - topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/accepted", - correlationTokenJsonPath: "clientToken", - }, { - topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/rejected", - correlationTokenJsonPath: "clientToken", - }]; - requestOptions.payload = Buffer.from(`{\"clientToken\":\"${correlationToken}\"}`); - requestOptions.correlationToken = correlationToken; - } - - return requestOptions; -} - -async function do_get_named_shadow_success_rejected_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { - let context = new TestingContext({ - version: version - }); - - await context.open(); - - let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); - - let response = await context.client.submitRequest(requestOptions); - expect(response.topic).toEqual(requestOptions.responsePaths[1].topic); - expect(response.payload.byteLength).toBeGreaterThan(0); - - let response_string = toUtf8(new Uint8Array(response.payload)); - expect(response_string).toContain("No shadow exists with name"); - - await context.close(); -} test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Success Rejected Mqtt5', async () => { - await do_get_named_shadow_success_rejected_test(ProtocolVersion.Mqtt5, true); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt5, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Success Rejected Mqtt311', async () => { - await do_get_named_shadow_success_rejected_test(ProtocolVersion.Mqtt311, true); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt311, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Success Rejected No CorrelationToken Mqtt5', async () => { - await do_get_named_shadow_success_rejected_test(ProtocolVersion.Mqtt5, false); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt5, false); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Success Rejected No CorrelationToken Mqtt311', async () => { - await do_get_named_shadow_success_rejected_test(ProtocolVersion.Mqtt311, false); -}); - -function createAcceptedUpdateNamedShadowRequest(addCorelationToken: boolean) : mqtt_request_response.RequestResponseOperationOptions { - let requestOptions : mqtt_request_response.RequestResponseOperationOptions = { - subscriptionTopicFilters: [ - "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/accepted", - "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/rejected" - ], - responsePaths: [{ - topic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/accepted", - }, { - topic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/rejected", - }], - publishTopic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update", - payload: Buffer.from("", "utf-8"), - } - - let desired_state = `{\"magic\":\"${uuid()}\"}`; - - if (addCorelationToken) { - let correlationToken = uuid(); - - requestOptions.responsePaths[0].correlationTokenJsonPath = "clientToken"; - requestOptions.responsePaths[1].correlationTokenJsonPath = "clientToken"; - requestOptions.correlationToken = correlationToken; - requestOptions.payload = Buffer.from(`{\"clientToken\":\"${correlationToken}\",\"state\":{\"desired\":${desired_state}}}`); - } else { - requestOptions.payload = Buffer.from(`{\"state\":{\"desired\":${desired_state}}}`); - } - - return requestOptions; -} - -async function do_update_named_shadow_success_accepted_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { - let context = new TestingContext({ - version: version - }); - - await context.open(); - - let requestOptions = createAcceptedUpdateNamedShadowRequest(useCorrelationToken); - - let response = await context.client.submitRequest(requestOptions); - expect(response.topic).toEqual(requestOptions.responsePaths[0].topic); - expect(response.payload.byteLength).toBeGreaterThan(0); - - await context.close(); -} + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_success_rejected_test(mrr_test.ProtocolVersion.Mqtt311, false); +}); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('UpdateNamedShadow Success Accepted Mqtt5', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt5, true); + initClientBuilderFactories(); + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt5, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('UpdateNamedShadow Success Accepted Mqtt311', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt311, true); + initClientBuilderFactories(); + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt311, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('UpdateNamedShadow Success Accepted No CorrelationToken Mqtt5', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt5, false); + initClientBuilderFactories(); + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt5, false); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('UpdateNamedShadow Success Accepted No CorrelationToken Mqtt311', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt311, false); + initClientBuilderFactories(); + await mrr_test.do_update_named_shadow_success_accepted_test(mrr_test.ProtocolVersion.Mqtt311, false); }); -async function do_get_named_shadow_failure_timeout_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { - let context = new TestingContext({ - version: version, - timeoutSeconds: 4, - }); - - await context.open(); - - let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); - requestOptions.publishTopic = "not/the/right/topic"; - - try { - await context.client.submitRequest(requestOptions); - expect(false); - } catch (e: any) { - expect(e).toContain("timeout"); - } - - await context.close(); -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Timeout Mqtt5', async () => { - await do_get_named_shadow_failure_timeout_test(ProtocolVersion.Mqtt5, true); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt5, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Timeout Mqtt311', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt311, true); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt311, true); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Timeout No CorrelationToken Mqtt5', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt5, false); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt5, false); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Timeout No CorrelationToken Mqtt311', async () => { - await do_update_named_shadow_success_accepted_test(ProtocolVersion.Mqtt311, false); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_timeout_test(mrr_test.ProtocolVersion.Mqtt311, false); }); -async function do_get_named_shadow_failure_on_close_test(version: ProtocolVersion) : Promise { - let context = new TestingContext({ - version: version, - }); - - await context.open(); - - let requestOptions = createRejectedGetNamedShadowRequest(true); - - try { - let resultPromise = context.client.submitRequest(requestOptions); - context.client.close(); - await resultPromise; - expect(false); - } catch (e: any) { - expect(e).toContain("timeout"); - } - - await context.close(); -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure On Close Mqtt5', async () => { - await do_get_named_shadow_failure_on_close_test(ProtocolVersion.Mqtt5); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_on_close_test(mrr_test.ProtocolVersion.Mqtt5, "timeout"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure On Close Mqtt311', async () => { - await do_get_named_shadow_failure_on_close_test(ProtocolVersion.Mqtt311); + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_on_close_test(mrr_test.ProtocolVersion.Mqtt311, "timeout"); }); -function do_client_creation_failure_test(version: ProtocolVersion, configMutator: (config: mqtt_request_response.RequestResponseClientOptions) => mqtt_request_response.RequestResponseClientOptions | undefined, expected_error_text: string) { - if (version == ProtocolVersion.Mqtt311) { - let protocolClient = build_protocol_client_mqtt311(); - let goodConfig : mqtt_request_response.RequestResponseClientOptions = { - maxRequestResponseSubscriptions: 2, - maxStreamingSubscriptions : 2, - operationTimeoutInSeconds : 5, - }; - let badConfig = configMutator(goodConfig); - - // @ts-ignore - expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt311(protocolClient, badConfig)}).toThrow(expected_error_text); - } else { - let protocolClient = build_protocol_client_mqtt5(); - let goodConfig : mqtt_request_response.RequestResponseClientOptions = { - maxRequestResponseSubscriptions: 2, - maxStreamingSubscriptions : 2, - operationTimeoutInSeconds : 5, - }; - let badConfig = configMutator(goodConfig); - - // @ts-ignore - expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt5(protocolClient, badConfig)}).toThrow(expected_error_text); - } -} - -function create_bad_config_no_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - maxRequestResponseSubscriptions: 0, - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure zero max request response subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_no_max_request_response_subscriptions, "An invalid argument was passed to a function"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_no_max_request_response_subscriptions, "An invalid argument was passed to a function"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure zero max request response subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_no_max_request_response_subscriptions, "An invalid argument was passed to a function"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_no_max_request_response_subscriptions, "An invalid argument was passed to a function"); }); -function create_bad_config_invalid_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - // @ts-ignore - maxRequestResponseSubscriptions: "help", - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure invalid max request response subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_invalid_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_invalid_max_request_response_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure invalid max request response subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_invalid_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_invalid_max_request_response_subscriptions, "invalid configuration options"); }); -function create_bad_config_undefined_config(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return undefined -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined config mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_undefined_config, "required configuration parameter is null"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_config, "required configuration parameter is null"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined config mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_undefined_config, "required configuration parameter is null"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_config, "required configuration parameter is null"); }); -function create_bad_config_undefined_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - // @ts-ignore - maxRequestResponseSubscriptions: undefined, - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined max request response subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_undefined_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_max_request_response_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined max request response subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_undefined_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_max_request_response_subscriptions, "invalid configuration options"); }); -function create_bad_config_null_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - // @ts-ignore - maxRequestResponseSubscriptions: null, - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure null max request response subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_null_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_null_max_request_response_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure null max request response subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_null_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_null_max_request_response_subscriptions, "invalid configuration options"); }); -function create_bad_config_missing_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - // @ts-ignore - return { - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max request response subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_missing_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_missing_max_request_response_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max request response subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_missing_max_request_response_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_missing_max_request_response_subscriptions, "invalid configuration options"); }); -function create_bad_config_undefined_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - maxRequestResponseSubscriptions: config.maxRequestResponseSubscriptions, - // @ts-ignore - maxStreamingSubscriptions : undefined, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined max streaming subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_undefined_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_undefined_max_streaming_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure undefined max streaming subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_undefined_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_undefined_max_streaming_subscriptions, "invalid configuration options"); }); -function create_bad_config_null_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - maxRequestResponseSubscriptions: config.maxRequestResponseSubscriptions, - // @ts-ignore - maxStreamingSubscriptions : null, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure null max streaming subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_null_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_null_max_streaming_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure null max streaming subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_null_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_null_max_streaming_subscriptions, "invalid configuration options"); }); -function create_bad_config_missing_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - // @ts-ignore - return { - maxRequestResponseSubscriptions : config.maxRequestResponseSubscriptions, - operationTimeoutInSeconds : config.operationTimeoutInSeconds - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max streaming subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_missing_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_missing_max_streaming_subscriptions, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max streaming subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_missing_max_streaming_subscriptions, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_missing_max_streaming_subscriptions, "invalid configuration options"); }); -function create_bad_config_invalid_operation_timeout(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { - return { - maxRequestResponseSubscriptions : config.maxRequestResponseSubscriptions, - maxStreamingSubscriptions : config.maxStreamingSubscriptions, - // @ts-ignore - operationTimeoutInSeconds : "no" - } -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max streaming subscriptions mqtt5', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt5, create_bad_config_invalid_operation_timeout, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt5, mrr_test.create_bad_config_invalid_operation_timeout, "invalid configuration options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Client creation failure missing max streaming subscriptions mqtt311', async() => { - do_client_creation_failure_test(ProtocolVersion.Mqtt311, create_bad_config_invalid_operation_timeout, "invalid configuration options"); + initClientBuilderFactories(); + mrr_test.do_client_creation_failure_test(mrr_test.ProtocolVersion.Mqtt311, mrr_test.create_bad_config_invalid_operation_timeout, "invalid configuration options"); }); test('Client creation failure null protocol client mqtt311', async() => { + initClientBuilderFactories(); let config : mqtt_request_response.RequestResponseClientOptions = { maxRequestResponseSubscriptions: 2, maxStreamingSubscriptions : 2, @@ -563,6 +245,7 @@ test('Client creation failure null protocol client mqtt311', async() => { }); test('Client creation failure null protocol client mqtt5', async() => { + initClientBuilderFactories(); let config : mqtt_request_response.RequestResponseClientOptions = { maxRequestResponseSubscriptions: 2, maxStreamingSubscriptions : 2, @@ -573,28 +256,9 @@ test('Client creation failure null protocol client mqtt5', async() => { expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt5(null, config)}).toThrow("protocol client is null"); }); -async function do_get_named_shadow_failure_invalid_test(useCorrelationToken: boolean, expected_error_substring: string, options_mutator: (options: mqtt_request_response.RequestResponseOperationOptions) => mqtt_request_response.RequestResponseOperationOptions) : Promise { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 - }); - - await context.open(); - - let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); - - let responsePromise = context.client.submitRequest(options_mutator(requestOptions)); - try { - await responsePromise; - expect(false); - } catch (err: any) { - expect(err.message).toContain(expected_error_substring); - } - - await context.close(); -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure No Subscription Topic Filters', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore delete new_options.subscriptionTopicFilters; @@ -604,7 +268,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Subscription Topic Filters', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.subscriptionTopicFilters = null; @@ -614,7 +279,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Subscription Topic Filters Not An Array', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.subscriptionTopicFilters = "null"; @@ -624,7 +290,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Subscription Topic Filters Empty', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.subscriptionTopicFilters = []; @@ -634,7 +301,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure No Response Paths', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore delete new_options.responsePaths; @@ -644,7 +312,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Response Paths', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths = null; @@ -654,7 +323,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Paths Not An Array', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths = "null"; @@ -664,7 +334,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Paths Empty', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths = []; @@ -674,7 +345,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Path No Topic', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore delete new_options.responsePaths[0].topic; @@ -684,7 +356,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Path Null Topic', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths[0].topic = null; @@ -694,7 +367,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Path Bad Topic Type', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths[0].topic = 5; @@ -704,7 +378,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Path Null Correlation Token Json Path', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths[0].correlationTokenJsonPath = null; @@ -714,7 +389,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Response Path Bad Correlation Token Json Path Type', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.responsePaths[0].correlationTokenJsonPath = {}; @@ -724,7 +400,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure No Publish Topic', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore delete new_options.publishTopic; @@ -734,7 +411,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Publish Topic', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.publishTopic = null; @@ -744,7 +422,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Bad Publish Topic Type', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.publishTopic = {someValue: null}; @@ -755,7 +434,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure No Payload', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore delete new_options.payload; @@ -765,7 +445,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Payload', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.payload = null; @@ -775,7 +456,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Bad Payload Type', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.payload = {notAStringOrBuffer: 21}; @@ -785,7 +467,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Correlation Token', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.correlationToken = null; @@ -795,7 +478,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Bad Correlation Token Type', async () => { - await do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "invalid request options", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; // @ts-ignore new_options.correlationToken = ["something"]; @@ -805,7 +489,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Protocol Invalid Topic', async () => { - await do_get_named_shadow_failure_invalid_test(true, "failure invoking native client submit_request", (options : mqtt_request_response.RequestResponseOperationOptions) => { + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "failure invoking native client submit_request", (options : mqtt_request_response.RequestResponseOperationOptions) => { let new_options = options; new_options.publishTopic = "#/illegal/#/topic"; @@ -814,7 +499,8 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Null Options', async () => { - await do_get_named_shadow_failure_invalid_test(true, "null request options", + initClientBuilderFactories(); + await mrr_test.do_get_named_shadow_failure_invalid_test(true, "null request options", // @ts-ignore (options : mqtt_request_response.RequestResponseOperationOptions) => { return null; @@ -822,14 +508,15 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNamedShadow Failure Submit After Close', async () => { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); await context.open(); await context.close(); - let requestOptions = createRejectedGetNamedShadowRequest(true); + let requestOptions = mrr_test.createRejectedGetNamedShadowRequest(true); try { await context.client.submitRequest(requestOptions); expect(false); @@ -838,33 +525,17 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('GetNa } }); -async function do_streaming_operation_new_open_close_test(version: ProtocolVersion) { - let context = new TestingContext({ - version: version - }); - - await context.open(); - - let streaming_options : StreamingOperationOptions = { - subscriptionTopicFilter : "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/delta" - } - - let stream = context.client.createStream(streaming_options); - stream.open(); - stream.close(); - - await context.close(); -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Open/Close MQTT5', async () => { - await do_streaming_operation_new_open_close_test(ProtocolVersion.Mqtt5); + initClientBuilderFactories(); + await mrr_test.do_streaming_operation_new_open_close_test(mrr_test.ProtocolVersion.Mqtt5); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Open/Close MQTT311', async () => { - await do_streaming_operation_new_open_close_test(ProtocolVersion.Mqtt311); + initClientBuilderFactories(); + await mrr_test.do_streaming_operation_new_open_close_test(mrr_test.ProtocolVersion.Mqtt311); }); -async function do_streaming_operation_incoming_publish_test(version: ProtocolVersion) { +export async function do_streaming_operation_incoming_publish_test(version: ProtocolVersion) { let context = new TestingContext({ version: version }); @@ -896,16 +567,7 @@ async function do_streaming_operation_incoming_publish_test(version: ProtocolVer await context.close(); } -test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT5', async () => { - await do_streaming_operation_incoming_publish_test(ProtocolVersion.Mqtt5); -}); - - -test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT311', async () => { - await do_streaming_operation_incoming_publish_test(ProtocolVersion.Mqtt311); -}); - -async function do_streaming_operation_subscription_events_test(options: TestingOptions) { +export async function do_streaming_operation_subscription_events_test(options: TestingOptions) { let context = new TestingContext(options); await context.open(); @@ -957,11 +619,37 @@ async function do_streaming_operation_subscription_events_test(options: TestingO await context.close(); } +export async function do_invalid_streaming_operation_config_test(config: StreamingOperationOptions, expected_error: string) { + let context = new TestingContext({ + version: ProtocolVersion.Mqtt5 + }); + + await context.open(); + + expect(() => { + // @ts-ignore + context.client.createStream(config) + }).toThrow(expected_error); + + await context.close(); +} + +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT5', async () => { + initClientBuilderFactories(); + await do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt5); +}); + +test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Incoming Publish MQTT311', async () => { + initClientBuilderFactories(); + await do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt311); +}); + // We only have a 5-based test because there's no way to stop the 311 client without destroying it in the process. test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('ShadowUpdated Streaming Operation Success Subscription Events MQTT5', async () => { + initClientBuilderFactories(); await do_streaming_operation_subscription_events_test({ - version: ProtocolVersion.Mqtt5, + version: mrr_test.ProtocolVersion.Mqtt5, builder_mutator5: (builder) => { builder.withSessionBehavior(mqtt5.ClientSessionBehavior.Clean); return builder; @@ -970,14 +658,15 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Shado }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Failure Reopen', async () => { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); await context.open(); let topic_filter = `not/a/real/shadow/${uuid()}`; - let streaming_options : StreamingOperationOptions = { + let streaming_options : mqtt_request_response.StreamingOperationOptions = { subscriptionTopicFilter : topic_filter, } @@ -1002,14 +691,15 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Strea }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Auto Close', async () => { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 + initClientBuilderFactories(); + let context = new mrr_test.TestingContext({ + version: mrr_test.ProtocolVersion.Mqtt5 }); await context.open(); let topic_filter = `not/a/real/shadow/${uuid()}`; - let streaming_options : StreamingOperationOptions = { + let streaming_options : mqtt_request_response.StreamingOperationOptions = { subscriptionTopicFilter : topic_filter, } @@ -1030,32 +720,20 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Strea expect(() => {stream.open()}).toThrow(); }); -async function do_invalid_streaming_operation_config_test(config: StreamingOperationOptions, expected_error: string) { - let context = new TestingContext({ - version: ProtocolVersion.Mqtt5 - }); - - await context.open(); - - expect(() => { - // @ts-ignore - context.client.createStream(config) - }).toThrow(expected_error); - - await context.close(); -} - test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Creation Failure Null Options', async () => { + initClientBuilderFactories(); // @ts-ignore await do_invalid_streaming_operation_config_test(null, "invalid configuration"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Creation Failure Undefined Options', async () => { + initClientBuilderFactories(); // @ts-ignore await do_invalid_streaming_operation_config_test(undefined, "invalid configuration"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Creation Failure Null Filter', async () => { + initClientBuilderFactories(); await do_invalid_streaming_operation_config_test({ // @ts-ignore subscriptionTopicFilter : null, @@ -1063,6 +741,7 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Strea }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Creation Failure Invalid Filter Type', async () => { + initClientBuilderFactories(); await do_invalid_streaming_operation_config_test({ // @ts-ignore subscriptionTopicFilter : 5, @@ -1070,6 +749,7 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Strea }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_mtls_rsa())('Streaming Operation Creation Failure Invalid Filter Value', async () => { + initClientBuilderFactories(); await do_invalid_streaming_operation_config_test({ subscriptionTopicFilter : "#/hello/#", }, "Failed to create"); diff --git a/lib/native/mqtt_request_response.ts b/lib/native/mqtt_request_response.ts index 3e6cbfb9..ecf62f8f 100644 --- a/lib/native/mqtt_request_response.ts +++ b/lib/native/mqtt_request_response.ts @@ -227,15 +227,17 @@ export class RequestResponseClient extends NativeResourceMixin(BufferedEventEmit * client, one layer up), such a payload may actually indicate a failure. */ async submitRequest(requestOptions: mqtt_request_response.RequestResponseOperationOptions): Promise { - if (this.state == mqtt_request_response_internal.RequestResponseClientState.Closed) { - throw new CrtError("MQTT request-response client has already been closed"); - } + return new Promise((resolve, reject) => { + if (this.state == mqtt_request_response_internal.RequestResponseClientState.Closed) { + reject(new CrtError("MQTT request-response client has already been closed")); + return; + } - if (!requestOptions) { - throw new CrtError("null request options"); - } + if (!requestOptions) { + reject(new CrtError("null request options")); + return; + } - return new Promise((resolve, reject) => { function curriedPromiseCallback(errorCode: number, topic?: string, response?: ArrayBuffer){ return RequestResponseClient._s_on_request_completion(resolve, reject, errorCode, topic, response); } @@ -298,7 +300,7 @@ export class RequestResponseClient extends NativeResourceMixin(BufferedEventEmit } resolve(response); } else { - reject(error_code_to_string(errorCode)); + reject(new CrtError(error_code_to_string(errorCode))); } } } diff --git a/test/mqtt_request_response.ts b/test/mqtt_request_response.ts new file mode 100644 index 00000000..b2ad5051 --- /dev/null +++ b/test/mqtt_request_response.ts @@ -0,0 +1,443 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import * as iot from "@awscrt/iot"; +import * as mqtt5 from "@awscrt/mqtt5"; +import * as test_env from "./test_env"; +import {v4 as uuid} from "uuid"; +import * as mqtt311 from "@awscrt/mqtt"; +import * as mqtt_request_response from "@awscrt/mqtt_request_response"; +import {once} from "events"; +import {toUtf8} from "@aws-sdk/util-utf8-browser"; +import {StreamingOperationOptions} from "@awscrt/mqtt_request_response"; + +export type ClientBuilderFactory5 = () => iot.AwsIotMqtt5ClientConfigBuilder; +export type ClientBuilderFactory311 = () => iot.AwsIotMqttConnectionConfigBuilder; + +var testBuilderFactory5 : ClientBuilderFactory5 | undefined = undefined; +var testBuilderFactory311 : ClientBuilderFactory311 | undefined = undefined; + +export function setClientBuilderFactories(factory5: ClientBuilderFactory5, factory311: ClientBuilderFactory311) { + testBuilderFactory5 = factory5; + testBuilderFactory311 = factory311; +} + +export enum ProtocolVersion { + Mqtt311, + Mqtt5 +} + +export interface TestingOptions { + version: ProtocolVersion, + timeoutSeconds?: number, + startOffline?: boolean, + builder_mutator5?: (builder: iot.AwsIotMqtt5ClientConfigBuilder) => iot.AwsIotMqtt5ClientConfigBuilder, + builder_mutator311?: (builder: iot.AwsIotMqttConnectionConfigBuilder) => iot.AwsIotMqttConnectionConfigBuilder, +} + +export function build_protocol_client_mqtt5(builder: iot.AwsIotMqtt5ClientConfigBuilder, builder_mutator?: (builder: iot.AwsIotMqtt5ClientConfigBuilder) => iot.AwsIotMqtt5ClientConfigBuilder) : mqtt5.Mqtt5Client { + builder.withConnectProperties({ + clientId : uuid(), + keepAliveIntervalSeconds: 1200, + }); + + if (builder_mutator) { + builder = builder_mutator(builder); + } + + return new mqtt5.Mqtt5Client(builder.build()); +} + +export function build_protocol_client_mqtt311(builder: iot.AwsIotMqttConnectionConfigBuilder, builder_mutator?: (builder: iot.AwsIotMqttConnectionConfigBuilder) => iot.AwsIotMqttConnectionConfigBuilder) : mqtt311.MqttClientConnection { + builder.with_endpoint(test_env.AWS_IOT_ENV.MQTT5_HOST); // yes, 5 not 3 + builder.with_client_id(uuid()); + + if (builder_mutator) { + builder = builder_mutator(builder); + } + + let client = new mqtt311.MqttClient(); + return client.new_connection(builder.build()); +} + +export class TestingContext { + + mqtt311Client?: mqtt311.MqttClientConnection; + mqtt5Client?: mqtt5.Mqtt5Client; + + client: mqtt_request_response.RequestResponseClient; + + private protocolStarted : boolean = false; + + async startProtocolClient() { + if (!this.protocolStarted) { + this.protocolStarted = true; + if (this.mqtt5Client) { + let connected = once(this.mqtt5Client, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + this.mqtt5Client.start(); + + await connected; + } + + if (this.mqtt311Client) { + await this.mqtt311Client.connect(); + } + } + } + + async stopProtocolClient() { + if (this.protocolStarted) { + this.protocolStarted = false; + if (this.mqtt5Client) { + let stopped = once(this.mqtt5Client, mqtt5.Mqtt5Client.STOPPED); + this.mqtt5Client.stop(); + await stopped; + + this.mqtt5Client.close(); + } + + if (this.mqtt311Client) { + await this.mqtt311Client.disconnect(); + } + } + } + + async publishProtocolClient(topic: string, payload: ArrayBuffer) { + if (this.mqtt5Client) { + await this.mqtt5Client.publish({ + topicName: topic, + qos: mqtt5.QoS.AtLeastOnce, + payload: payload, + }); + } + + if (this.mqtt311Client) { + await this.mqtt311Client.publish(topic, payload, mqtt311.QoS.AtLeastOnce); + } + } + + constructor(options: TestingOptions) { + if (options.version == ProtocolVersion.Mqtt5) { + // @ts-ignore + this.mqtt5Client = build_protocol_client_mqtt5(testBuilderFactory5(), options.builder_mutator5); + + let rrOptions : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions : 6, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : options.timeoutSeconds ?? 60, + } + + this.client = mqtt_request_response.RequestResponseClient.newFromMqtt5(this.mqtt5Client, rrOptions); + } else { + // @ts-ignore + this.mqtt311Client = build_protocol_client_mqtt311(testBuilderFactory311(), options.builder_mutator311); + + let rrOptions : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions : 6, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : options.timeoutSeconds ?? 60, + } + + this.client = mqtt_request_response.RequestResponseClient.newFromMqtt311(this.mqtt311Client, rrOptions); + } + } + + async open() { + await this.startProtocolClient(); + } + + async close() { + this.client.close(); + await this.stopProtocolClient(); + } +} + +export function createRejectedGetNamedShadowRequest(addCorelationToken: boolean) : mqtt_request_response.RequestResponseOperationOptions { + let requestOptions : mqtt_request_response.RequestResponseOperationOptions = { + subscriptionTopicFilters: [ "$aws/things/NoSuchThing/shadow/name/Derp/get/+" ], + responsePaths: [{ + topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/accepted", + }, { + topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/rejected", + }], + publishTopic: "$aws/things/NoSuchThing/shadow/name/Derp/get", + payload: Buffer.from("{}", "utf-8"), + } + + if (addCorelationToken) { + let correlationToken = uuid(); + + requestOptions.responsePaths = [{ + topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/accepted", + correlationTokenJsonPath: "clientToken", + }, { + topic: "$aws/things/NoSuchThing/shadow/name/Derp/get/rejected", + correlationTokenJsonPath: "clientToken", + }]; + requestOptions.payload = Buffer.from(`{\"clientToken\":\"${correlationToken}\"}`); + requestOptions.correlationToken = correlationToken; + } + + return requestOptions; +} + +export async function do_get_named_shadow_success_rejected_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { + let context = new TestingContext({ + version: version + }); + + await context.open(); + + let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); + + let response = await context.client.submitRequest(requestOptions); + expect(response.topic).toEqual(requestOptions.responsePaths[1].topic); + expect(response.payload.byteLength).toBeGreaterThan(0); + + let response_string = toUtf8(new Uint8Array(response.payload)); + expect(response_string).toContain("No shadow exists with name"); + + await context.close(); +} + +export function createAcceptedUpdateNamedShadowRequest(addCorelationToken: boolean) : mqtt_request_response.RequestResponseOperationOptions { + let requestOptions : mqtt_request_response.RequestResponseOperationOptions = { + subscriptionTopicFilters: [ + "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/accepted", + "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/rejected" + ], + responsePaths: [{ + topic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/accepted", + }, { + topic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/rejected", + }], + publishTopic: "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update", + payload: Buffer.from("", "utf-8"), + } + + let desired_state = `{\"magic\":\"${uuid()}\"}`; + + if (addCorelationToken) { + let correlationToken = uuid(); + + requestOptions.responsePaths[0].correlationTokenJsonPath = "clientToken"; + requestOptions.responsePaths[1].correlationTokenJsonPath = "clientToken"; + requestOptions.correlationToken = correlationToken; + requestOptions.payload = Buffer.from(`{\"clientToken\":\"${correlationToken}\",\"state\":{\"desired\":${desired_state}}}`); + } else { + requestOptions.payload = Buffer.from(`{\"state\":{\"desired\":${desired_state}}}`); + } + + return requestOptions; +} + +export async function do_update_named_shadow_success_accepted_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { + let context = new TestingContext({ + version: version + }); + + await context.open(); + + let requestOptions = createAcceptedUpdateNamedShadowRequest(useCorrelationToken); + + let response = await context.client.submitRequest(requestOptions); + expect(response.topic).toEqual(requestOptions.responsePaths[0].topic); + expect(response.payload.byteLength).toBeGreaterThan(0); + + await context.close(); +} + +export async function do_get_named_shadow_failure_timeout_test(version: ProtocolVersion, useCorrelationToken: boolean) : Promise { + let context = new TestingContext({ + version: version, + timeoutSeconds: 4, + }); + + await context.open(); + + let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); + requestOptions.publishTopic = "not/the/right/topic"; + + try { + await context.client.submitRequest(requestOptions); + expect(false); + } catch (e) { + let err = e as Error; + expect(err.message).toContain("timeout"); + } + + await context.close(); +} + +export async function do_get_named_shadow_failure_on_close_test(version: ProtocolVersion, expectedFailureSubstring: string) : Promise { + let context = new TestingContext({ + version: version, + }); + + await context.open(); + + let requestOptions = createRejectedGetNamedShadowRequest(true); + + try { + let resultPromise = context.client.submitRequest(requestOptions); + context.client.close(); + await resultPromise; + expect(false); + } catch (e) { + let err = e as Error; + expect(err.message).toContain(expectedFailureSubstring); + } + + await context.close(); +} + +export function do_client_creation_failure_test(version: ProtocolVersion, configMutator: (config: mqtt_request_response.RequestResponseClientOptions) => mqtt_request_response.RequestResponseClientOptions | undefined, expected_error_text: string) { + if (version == ProtocolVersion.Mqtt311) { + // @ts-ignore + let protocolClient = build_protocol_client_mqtt311(testBuilderFactory311()); + let goodConfig : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions: 2, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : 5, + }; + let badConfig = configMutator(goodConfig); + + // @ts-ignore + expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt311(protocolClient, badConfig)}).toThrow(expected_error_text); + } else { + // @ts-ignore + let protocolClient = build_protocol_client_mqtt5(testBuilderFactory5()); + let goodConfig : mqtt_request_response.RequestResponseClientOptions = { + maxRequestResponseSubscriptions: 2, + maxStreamingSubscriptions : 2, + operationTimeoutInSeconds : 5, + }; + let badConfig = configMutator(goodConfig); + + // @ts-ignore + expect(() => {mqtt_request_response.RequestResponseClient.newFromMqtt5(protocolClient, badConfig)}).toThrow(expected_error_text); + } +} + +export function create_bad_config_no_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + maxRequestResponseSubscriptions: 0, + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_invalid_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + // @ts-ignore + maxRequestResponseSubscriptions: "help", + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_undefined_config(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return undefined +} + +export function create_bad_config_undefined_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + // @ts-ignore + maxRequestResponseSubscriptions: undefined, + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_null_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + // @ts-ignore + maxRequestResponseSubscriptions: null, + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_missing_max_request_response_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + // @ts-ignore + return { + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_undefined_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + maxRequestResponseSubscriptions: config.maxRequestResponseSubscriptions, + // @ts-ignore + maxStreamingSubscriptions : undefined, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_null_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + maxRequestResponseSubscriptions: config.maxRequestResponseSubscriptions, + // @ts-ignore + maxStreamingSubscriptions : null, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_missing_max_streaming_subscriptions(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + // @ts-ignore + return { + maxRequestResponseSubscriptions : config.maxRequestResponseSubscriptions, + operationTimeoutInSeconds : config.operationTimeoutInSeconds + } +} + +export function create_bad_config_invalid_operation_timeout(config: mqtt_request_response.RequestResponseClientOptions) : mqtt_request_response.RequestResponseClientOptions | undefined { + return { + maxRequestResponseSubscriptions : config.maxRequestResponseSubscriptions, + maxStreamingSubscriptions : config.maxStreamingSubscriptions, + // @ts-ignore + operationTimeoutInSeconds : "no" + } +} + +export async function do_get_named_shadow_failure_invalid_test(useCorrelationToken: boolean, expected_error_substring: string, options_mutator: (options: mqtt_request_response.RequestResponseOperationOptions) => mqtt_request_response.RequestResponseOperationOptions) : Promise { + let context = new TestingContext({ + version: ProtocolVersion.Mqtt5 + }); + + await context.open(); + + let requestOptions = createRejectedGetNamedShadowRequest(useCorrelationToken); + + let responsePromise = context.client.submitRequest(options_mutator(requestOptions)); + try { + await responsePromise; + expect(false); + } catch (err: any) { + expect(err.message).toContain(expected_error_substring); + } + + await context.close(); +} + +export async function do_streaming_operation_new_open_close_test(version: ProtocolVersion) { + let context = new TestingContext({ + version: version + }); + + await context.open(); + + let streaming_options : StreamingOperationOptions = { + subscriptionTopicFilter : "$aws/things/NoSuchThing/shadow/name/UpdateShadowCITest/update/delta" + } + + let stream = context.client.createStream(streaming_options); + stream.open(); + stream.close(); + + await context.close(); +} + diff --git a/test/test_env.ts b/test/test_env.ts index 690e0a0e..1d8d1647 100644 --- a/test/test_env.ts +++ b/test/test_env.ts @@ -108,8 +108,7 @@ export class AWS_IOT_ENV { return AWS_IOT_ENV.MQTT5_HOST !== "" && AWS_IOT_ENV.MQTT5_REGION !== "" && AWS_IOT_ENV.MQTT5_CRED_ACCESS_KEY !== "" && - AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== "" && - AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN !== ""; + AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== ""; } public static mqtt5_is_valid_cognito() {