Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request response browser3 #576

Closed
wants to merge 13 commits into from
601 changes: 601 additions & 0 deletions lib/browser/mqtt_request_response.spec.ts

Large diffs are not rendered by default.

894 changes: 894 additions & 0 deletions lib/browser/mqtt_request_response.ts

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions lib/browser/mqtt_request_response/protocol_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import * as mqtt311 from "../mqtt";
import * as mqtt5 from "../mqtt5";
import * as mqtt_request_response from "../../common/mqtt_request_response";
import {BufferedEventEmitter} from "../../common/event";
import {QoS} from "../mqtt";


const MS_PER_SECOND : number = 1000;
Expand Down Expand Up @@ -71,6 +72,13 @@ export interface ConnectionStatusEvent {

export type ConnectionStatusEventListener = (event: ConnectionStatusEvent) => void;

export interface IncomingPublishEvent {
topic: string,
payload?: ArrayBuffer
}

export type IncomingPublishEventListener = (event: IncomingPublishEvent) => void;

/*
* Provides a client-agnostic wrapper around the MQTT functionality needed by the browser request-response client.
*
Expand Down Expand Up @@ -101,6 +109,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener5 : mqtt5.MessageReceivedEventListener = (event: mqtt5.MessageReceivedEvent) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: event.message.topicName,
payload: event.message.payload
})});
};

private connectionSuccessListener311 : mqtt311.MqttConnectionSuccess = (event : mqtt311.OnConnectionSuccessResult) => {
this.connectionState = ConnectionState.Connected;
setImmediate(() => { this.emit(ProtocolClientAdapter.CONNECTION_STATUS, {
Expand All @@ -116,6 +131,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener311 : mqtt311.OnMessageCallback = (topic: string, payload: ArrayBuffer, dup: boolean, qos: QoS, retain: boolean) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: topic,
payload: payload
})});
};

private constructor() {
super();

Expand All @@ -130,6 +152,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, adapter.connectionSuccessListener5);
client.addListener(mqtt5.Mqtt5Client.DISCONNECTION, adapter.disconnectionListener5);
client.addListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, adapter.incomingPublishListener5);

adapter.connectionState = client.isConnected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -142,6 +166,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, adapter.connectionSuccessListener311);
client.addListener(mqtt311.MqttClientConnection.DISCONNECT, adapter.disconnectionListener311);
client.addListener(mqtt311.MqttClientConnection.MESSAGE, adapter.incomingPublishListener311);

adapter.connectionState = client.is_connected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -157,12 +183,14 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
if (this.client5) {
this.client5.removeListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, this.connectionSuccessListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.DISCONNECTION, this.disconnectionListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, this.incomingPublishListener5);
this.client5 = undefined;
}

if (this.client311) {
this.client311.removeListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, this.connectionSuccessListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.DISCONNECT, this.disconnectionListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.MESSAGE, this.incomingPublishListener311);
this.client311 = undefined;
}
}
Expand Down Expand Up @@ -434,6 +462,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

static CONNECTION_STATUS : string = 'connectionStatus';

static INCOMING_PUBLISH : string = 'incomingPublish';

on(event: 'publishCompletion', listener: PublishCompletionEventListener): this;

on(event: 'subscribeCompletion', listener: SubscribeCompletionEventListener): this;
Expand All @@ -442,6 +472,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

on(event: 'connectionStatus', listener: ConnectionStatusEventListener): this;

on(event: 'incomingPublish', listener: IncomingPublishEventListener): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
Expand Down
42 changes: 38 additions & 4 deletions lib/browser/mqtt_request_response/protocol_adapter_mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as protocol_adapter from "./protocol_adapter";
import {BufferedEventEmitter} from "../../common/event";
import {ICrtError} from "../../common/error";
import * as subscription_manager from "./subscription_manager";
import {IncomingPublishEventListener} from "./protocol_adapter";


export interface ProtocolAdapterApiCall {
Expand All @@ -16,8 +17,12 @@ export interface ProtocolAdapterApiCall {
}

export interface MockProtocolAdapterOptions {
subscribeHandler?: (subscribeOptions: protocol_adapter.SubscribeOptions) => void,
unsubscribeHandler?: (unsubscribeOptions: protocol_adapter.UnsubscribeOptions) => void,
subscribeHandler?: (adapter: MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) => void,
subscribeHandlerContext?: any,
unsubscribeHandler?: (adapter: MockProtocolAdapter, unsubscribeOptions: protocol_adapter.UnsubscribeOptions, context?: any) => void,
unsubscribeHandlerContext?: any,
publishHandler?: (adapter: MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) => void,
publishHandlerContext?: any,
}

export class MockProtocolAdapter extends BufferedEventEmitter {
Expand All @@ -39,6 +44,10 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
methodName: "publish",
args: publishOptions
});

if (this.options && this.options.publishHandler) {
this.options.publishHandler(this, publishOptions, this.options.publishHandlerContext);
}
}

subscribe(subscribeOptions : protocol_adapter.SubscribeOptions) : void {
Expand All @@ -48,7 +57,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
});

if (this.options && this.options.subscribeHandler) {
this.options.subscribeHandler(subscribeOptions);
this.options.subscribeHandler(this, subscribeOptions, this.options.subscribeHandlerContext);
}
}

Expand All @@ -59,7 +68,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
});

if (this.options && this.options.unsubscribeHandler) {
this.options.unsubscribeHandler(unsubscribeOptions);
this.options.unsubscribeHandler(this, unsubscribeOptions,this.options.unsubscribeHandlerContext);
}
}

Expand Down Expand Up @@ -104,6 +113,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
event.retryable = retryable;
}

// TODO - rework tests to pass with deferred event emission
this.emit(protocol_adapter.ProtocolClientAdapter.SUBSCRIBE_COMPLETION, event);
}

Expand All @@ -118,9 +128,31 @@ export class MockProtocolAdapter extends BufferedEventEmitter {
event.retryable = retryable;
}

// TODO - rework tests to pass with deferred event emission
this.emit(protocol_adapter.ProtocolClientAdapter.UNSUBSCRIBE_COMPLETION, event);
}

completePublish(completionData: any, err?: ICrtError) : void {
let event : protocol_adapter.PublishCompletionEvent = {
completionData: completionData
};

if (err) {
event.err = err;
}

this.emit(protocol_adapter.ProtocolClientAdapter.PUBLISH_COMPLETION, event);
}

triggerIncomingPublish(topic: string, payload: ArrayBuffer) : void {
let event : protocol_adapter.IncomingPublishEvent = {
topic : topic,
payload: payload
};

this.emit(protocol_adapter.ProtocolClientAdapter.INCOMING_PUBLISH, event);
}

// Events
on(event: 'publishCompletion', listener: protocol_adapter.PublishCompletionEventListener): this;

Expand All @@ -130,6 +162,8 @@ export class MockProtocolAdapter extends BufferedEventEmitter {

on(event: 'connectionStatus', listener: protocol_adapter.ConnectionStatusEventListener): this;

on(event: 'incomingPublish', listener: IncomingPublishEventListener): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
Expand Down
17 changes: 17 additions & 0 deletions lib/browser/mqtt_request_response/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ export enum AcquireSubscriptionResult {
Failure,
}

export function acquireSubscriptionResultToString(result: AcquireSubscriptionResult) : string {
switch (result) {
case AcquireSubscriptionResult.Subscribed:
return "Subscribed";
case AcquireSubscriptionResult.Subscribing:
return "Subscribing";
case AcquireSubscriptionResult.Blocked:
return "Blocked";
case AcquireSubscriptionResult.NoCapacity:
return "NoCapacity";
case AcquireSubscriptionResult.Failure:
return "Failure";
default:
return "Unknown";
}
}

export interface SubscriptionManagerConfig {
maxRequestResponseSubscriptions: number,
maxStreamingSubscriptions: number,
Expand Down
Loading
Loading