Skip to content

Commit

Permalink
Indirect support for mqtt request response
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Aug 26, 2024
1 parent e14deae commit 50cac1e
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 6 deletions.
32 changes: 32 additions & 0 deletions lib/browser/mqtt_request_response/protocol_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import * as mqtt311 from "../mqtt";
import * as mqtt5 from "../mqtt5";
import * as mqtt_request_response from "../../common/mqtt_request_response";
import {BufferedEventEmitter} from "../../common/event";
import {QoS} from "../mqtt";


const MS_PER_SECOND : number = 1000;
Expand Down Expand Up @@ -71,6 +72,13 @@ export interface ConnectionStatusEvent {

export type ConnectionStatusEventListener = (event: ConnectionStatusEvent) => void;

export interface IncomingPublishEvent {
topic: string,
payload?: ArrayBuffer
}

export type IncomingPublishEventListener = (event: IncomingPublishEvent) => void;

/*
* Provides a client-agnostic wrapper around the MQTT functionality needed by the browser request-response client.
*
Expand Down Expand Up @@ -101,6 +109,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener5 : mqtt5.MessageReceivedEventListener = (event: mqtt5.MessageReceivedEvent) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: event.message.topicName,
payload: event.message.payload
})});
};

private connectionSuccessListener311 : mqtt311.MqttConnectionSuccess = (event : mqtt311.OnConnectionSuccessResult) => {
this.connectionState = ConnectionState.Connected;
setImmediate(() => { this.emit(ProtocolClientAdapter.CONNECTION_STATUS, {
Expand All @@ -116,6 +131,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener311 : mqtt311.OnMessageCallback = (topic: string, payload: ArrayBuffer, dup: boolean, qos: QoS, retain: boolean) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: topic,
payload: payload
})});
};

private constructor() {
super();

Expand All @@ -130,6 +152,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, adapter.connectionSuccessListener5);
client.addListener(mqtt5.Mqtt5Client.DISCONNECTION, adapter.disconnectionListener5);
client.addListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, adapter.incomingPublishListener5);

adapter.connectionState = client.isConnected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -142,6 +166,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, adapter.connectionSuccessListener311);
client.addListener(mqtt311.MqttClientConnection.DISCONNECT, adapter.disconnectionListener311);
client.addListener(mqtt311.MqttClientConnection.MESSAGE, adapter.incomingPublishListener311);

adapter.connectionState = client.is_connected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -157,12 +183,14 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
if (this.client5) {
this.client5.removeListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, this.connectionSuccessListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.DISCONNECTION, this.disconnectionListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, this.incomingPublishListener5);
this.client5 = undefined;
}

if (this.client311) {
this.client311.removeListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, this.connectionSuccessListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.DISCONNECT, this.disconnectionListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.MESSAGE, this.incomingPublishListener311);
this.client311 = undefined;
}
}
Expand Down Expand Up @@ -434,6 +462,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

static CONNECTION_STATUS : string = 'connectionStatus';

static INCOMING_PUBLISH : string = 'incomingPublish';

on(event: 'publishCompletion', listener: PublishCompletionEventListener): this;

on(event: 'subscribeCompletion', listener: SubscribeCompletionEventListener): this;
Expand All @@ -442,6 +472,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

on(event: 'connectionStatus', listener: ConnectionStatusEventListener): this;

on(event: 'incomingPublish', listener: IncomingPublishEventListener): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
Expand Down
42 changes: 38 additions & 4 deletions lib/browser/mqtt_request_response/protocol_adapter_mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as protocol_adapter from "./protocol_adapter";
import {BufferedEventEmitter} from "../../common/event";
import {ICrtError} from "../../common/error";
import * as subscription_manager from "./subscription_manager";
import {IncomingPublishEventListener} from "./protocol_adapter";


export interface ProtocolAdapterApiCall {
Expand All @@ -16,8 +17,12 @@ export interface ProtocolAdapterApiCall {
}

export interface MockProtocolAdapterOptions {
subscribeHandler?: (subscribeOptions: protocol_adapter.SubscribeOptions) => void,
unsubscribeHandler?: (unsubscribeOptions: protocol_adapter.UnsubscribeOptions) => void,
subscribeHandler?: (adapter: MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) => void,
subscribeHandlerContext?: any,
unsubscribeHandler?: (adapter: MockProtocolAdapter, unsubscribeOptions: protocol_adapter.UnsubscribeOptions, context?: any) => void,
unsubscribeHandlerContext?: any,
publishHandler?: (adapter: MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) => void,
publishHandlerContext?: any,
}

export class MockProtocolAdapter extends BufferedEventEmitter {
Expand All @@ -39,6 +44,10 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
methodName: "publish",
args: publishOptions
});

if (this.options && this.options.publishHandler) {
this.options.publishHandler(this, publishOptions, this.options.publishHandlerContext);
}
}

subscribe(subscribeOptions : protocol_adapter.SubscribeOptions) : void {
Expand All @@ -48,7 +57,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
});

if (this.options && this.options.subscribeHandler) {
this.options.subscribeHandler(subscribeOptions);
this.options.subscribeHandler(this, subscribeOptions, this.options.subscribeHandlerContext);
}
}

Expand All @@ -59,7 +68,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
});

if (this.options && this.options.unsubscribeHandler) {
this.options.unsubscribeHandler(unsubscribeOptions);
this.options.unsubscribeHandler(this, unsubscribeOptions,this.options.unsubscribeHandlerContext);
}
}

Expand Down Expand Up @@ -104,6 +113,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
event.retryable = retryable;
}

// TODO - rework tests to pass with deferred event emission
this.emit(protocol_adapter.ProtocolClientAdapter.SUBSCRIBE_COMPLETION, event);
}

Expand All @@ -118,9 +128,31 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
event.retryable = retryable;
}

// TODO - rework tests to pass with deferred event emission
this.emit(protocol_adapter.ProtocolClientAdapter.UNSUBSCRIBE_COMPLETION, event);
}

completePublish(completionData: any, err?: ICrtError) : void {
let event : protocol_adapter.PublishCompletionEvent = {
completionData: completionData
};

if (err) {
event.err = err;
}

this.emit(protocol_adapter.ProtocolClientAdapter.PUBLISH_COMPLETION, event);
}

triggerIncomingPublish(topic: string, payload: ArrayBuffer) : void {
let event : protocol_adapter.IncomingPublishEvent = {
topic : topic,
payload: payload
};

this.emit(protocol_adapter.ProtocolClientAdapter.INCOMING_PUBLISH, event);
}

// Events
on(event: 'publishCompletion', listener: protocol_adapter.PublishCompletionEventListener): this;

Expand All @@ -130,6 +162,8 @@ export class MockProtocolAdapter extends BufferedEventEmitter {

on(event: 'connectionStatus', listener: protocol_adapter.ConnectionStatusEventListener): this;

on(event: 'incomingPublish', listener: IncomingPublishEventListener): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
Expand Down
17 changes: 17 additions & 0 deletions lib/browser/mqtt_request_response/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ export enum AcquireSubscriptionResult {
Failure,
}

export function acquireSubscriptionResultToString(result: AcquireSubscriptionResult) : string {
switch (result) {
case AcquireSubscriptionResult.Subscribed:
return "Subscribed";
case AcquireSubscriptionResult.Subscribing:
return "Subscribing";
case AcquireSubscriptionResult.Blocked:
return "Blocked";
case AcquireSubscriptionResult.NoCapacity:
return "NoCapacity";
case AcquireSubscriptionResult.Failure:
return "Failure";
default:
return "Unknown";
}
}

export interface SubscriptionManagerConfig {
maxRequestResponseSubscriptions: number,
maxStreamingSubscriptions: number,
Expand Down
20 changes: 20 additions & 0 deletions lib/common/mqtt_request_response_internal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

/**
* @packageDocumentation
* @module mqtt_request_response
*/

export enum StreamingOperationState {
None,
Open,
Closed,
}

export enum RequestResponseClientState {
Ready,
Closed
}
62 changes: 62 additions & 0 deletions lib/common/mqtt_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,65 @@ export function normalize_payload(payload: any): Buffer | string {

/** @internal */
export const DEFAULT_KEEP_ALIVE : number = 1200;


function isValidTopicInternal(topic: string, isFilter: boolean) : boolean {
if (topic.length === 0 || topic.length > 65535) {
return false;
}

let sawHash : boolean = false;
for (let segment of topic.split('/')) {
if (sawHash) {
return false;
}

if (segment.length === 0) {
continue;
}

if (segment.includes("+")) {
if (!isFilter) {
return false;
}

if (segment.length > 1) {
return false;
}
}

if (segment.includes("#")) {
if (!isFilter) {
return false;
}

if (segment.length > 1) {
return false;
}

sawHash = true;
}
}

return true;
}

export function isValidTopicFilter(topicFilter: any) : boolean {
if (typeof(topicFilter) !== 'string') {
return false;
}

let topicFilterAsString = topicFilter as string;

return isValidTopicInternal(topicFilterAsString, true);
}

export function isValidTopic(topic: any) : boolean {
if (typeof(topic) !== 'string') {
return false;
}

let topicAsString = topic as string;

return isValidTopicInternal(topicAsString, false);
}
3 changes: 1 addition & 2 deletions test/test_env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ export class AWS_IOT_ENV {
return AWS_IOT_ENV.MQTT5_HOST !== "" &&
AWS_IOT_ENV.MQTT5_REGION !== "" &&
AWS_IOT_ENV.MQTT5_CRED_ACCESS_KEY !== "" &&
AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== "" &&
AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN !== "";
AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== "";
}

public static mqtt5_is_valid_cognito() {
Expand Down

0 comments on commit 50cac1e

Please sign in to comment.