Skip to content

Commit

Permalink
Generalized more tests to be common (#92)
Browse files Browse the repository at this point in the history
* Made jest play nice with multiple platforms/import paths

* Fixed long-standing TS bug between versions of mqtt/async-mqtt

* show output if tsc fails

* Fixed how jest is invoked to run tests/install puppeteer

* Added missing classes/args to match the HTTP APIs between browser and native

* es5 browser target, fixes to make that work

* Added polyfill for TextEncoder for browser

Co-authored-by: Michael Graeb <graebm@amazon.com>
  • Loading branch information
Justin Boswell and graebm authored Jun 4, 2020
1 parent cefb1cb commit 18dbbd3
Show file tree
Hide file tree
Showing 19 changed files with 2,607 additions and 3,055 deletions.
53 changes: 38 additions & 15 deletions lib/browser/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import { HttpHeader, HttpHeaders as CommonHttpHeaders, HttpProxyOptions, HttpProxyAuthenticationType } from '../common/http';
export { HttpHeader, HttpProxyOptions, HttpProxyAuthenticationType } from '../common/http';
import { BufferedEventEmitter } from '../common/event';
import { InputStream } from './io';
import { CrtError } from './error';
import * as axios from 'axios';
import axios = require('axios');
import { ClientBootstrap, InputStream, SocketOptions, TlsConnectionOptions } from '@awscrt/io';

require('./polyfills')

/**
* A collection of HTTP headers
Expand Down Expand Up @@ -173,15 +175,15 @@ export class HttpRequest {
export class HttpClientConnection extends BufferedEventEmitter {
readonly axios: any;
constructor(
protected boostrap: ClientBootstrap,
host_name: string,
port: number,
scheme?: string,
protected socket_options: SocketOptions,
protected tls_opts?: TlsConnectionOptions,
proxy_options?: HttpProxyOptions,
) {
super();
if (!scheme) {
scheme = (port == 443) ? 'https' : 'http'
}
let scheme = (tls_opts) ? 'https' : 'http'
let axios_options: axios.AxiosRequestConfig = {
baseURL: `${scheme}://${host_name}:${port}/`
};
Expand Down Expand Up @@ -236,6 +238,10 @@ export class HttpClientConnection extends BufferedEventEmitter {
_on_end(stream: HttpClientStream) {
this.emit('close');
}

close() {
this.emit('close');
}
}

function stream_request(connection: HttpClientConnection, request: HttpRequest) {
Expand Down Expand Up @@ -287,6 +293,18 @@ export class HttpClientStream extends BufferedEventEmitter {
return this.response_status_code;
}

/**
* Begin sending the request.
*
* The stream does nothing until this is called. Call activate() when you
* are ready for its callbacks and events to fire.
*/
activate() {
setTimeout(() => {
this.uncork();
}, 0);
}

/**
* Emitted when the header block arrives from the server.
*/
Expand All @@ -308,13 +326,7 @@ export class HttpClientStream extends BufferedEventEmitter {
on(event: 'end', listener: () => void): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
if (event == 'ready' || event == 'response') {
setTimeout(() => {
this.uncork();
}, 0);
}
return this;
return super.on(event, listener);
}

// Private helpers for stream_request()
Expand Down Expand Up @@ -373,9 +385,14 @@ export class HttpClientConnectionManager {
private pending_requests: PendingRequest[] = [];

constructor(
readonly bootstrap: ClientBootstrap,
readonly host: string,
readonly port: number,
readonly max_connections: number
readonly max_connections: number,
readonly initial_window_size: number,
readonly socket_options: SocketOptions,
readonly tls_opts?: TlsConnectionOptions,
readonly proxy_options?: HttpProxyOptions
) {

}
Expand Down Expand Up @@ -423,7 +440,13 @@ export class HttpClientConnectionManager {
}

// There's room, create a new connection
let connection = new HttpClientConnection(this.host, this.port);
let connection = new HttpClientConnection(
this.bootstrap,
this.host,
this.port,
this.socket_options,
this.tls_opts,
this.proxy_options);
this.pending_connections.add(connection);
const on_connect = () => {
this.pending_connections.delete(connection);
Expand Down
55 changes: 55 additions & 0 deletions lib/browser/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,61 @@ export class InputStream {
}
}

/**
* Represents resources required to bootstrap a client connection, provided as
* a stub for the browser API
*
* @module aws-crt
* @category I/O
*/
export class ClientBootstrap { };

/**
* Options for creating a {@link ClientTlsContext}. Provided as a stub for
* browser API.
*
* @module aws-crt
* @category TLS
*/
export type TlsContextOptions = any;

/**
* TLS options that are unique to a given connection using a shared TlsContext.
* Provided as a stub for browser API.
*
* @module aws-crt
* @category TLS
*/
export class TlsConnectionOptions {
constructor(readonly tls_ctx: TlsContext, readonly server_name?: string, readonly alpn_list: string[] = []) {

}
};

/**
* TLS context used for TLS communications over sockets. Provided as a
* stub for the browser API
*
* @module aws-crt
* @category TLS
*/
export abstract class TlsContext {

};

/**
* TLS context used for client TLS communications over sockets. Provided as a
* stub for the browser API
*
* @module aws-crt
* @category TLS
*/
export class ClientTlsContext extends TlsContext {
constructor(options?: TlsContextOptions) {
super();
}
};

/**
* Standard Berkeley socket style options.
*
Expand Down
78 changes: 37 additions & 41 deletions lib/browser/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
* permissions and limitations under the License.
*/

import { MqttClient as MqttClientInternal, IClientOptions, ISubscriptionGrant, IUnsubackPacket, IPublishPacket } from "mqtt";
import { AsyncClient } from "async-mqtt";
import { AsyncClient, IClientOptions, ISubscriptionGrant, IUnsubackPacket, IPublishPacket, IConnackPacket } from "async-mqtt";
import { MqttClient as _MqttClient } from "mqtt";
import * as WebsocketUtils from "./ws";
import * as trie from "./trie";

Expand Down Expand Up @@ -154,7 +154,7 @@ class TopicTrie extends trie.Trie<SubscriptionCallback | undefined> {
* @param payload The payload to convert
* @internal
*/
function normalize_payload(payload: Payload) {
function normalize_payload(payload: Payload): string {
let payload_data: string = payload.toString();
if (payload instanceof DataView) {
payload_data = new TextDecoder('utf8').decode(payload as DataView);
Expand Down Expand Up @@ -185,10 +185,19 @@ export class MqttClientConnection extends BufferedEventEmitter {
private config: MqttConnectionConfig) {
super();

const create_websocket_stream = (client: MqttClientInternal) => WebsocketUtils.create_websocket_stream(this.config);
const transform_websocket_url = (url: string, options: IClientOptions, client: MqttClientInternal) => WebsocketUtils.create_websocket_url(this.config);
const create_websocket_stream = (client: _MqttClient) => WebsocketUtils.create_websocket_stream(this.config);
const transform_websocket_url = (url: string, options: IClientOptions, client: _MqttClient) => WebsocketUtils.create_websocket_url(this.config);

this.connection = new AsyncClient(new MqttClientInternal(
const will = this.config.will ? {
topic: this.config.will.topic,
payload: normalize_payload(this.config.will.payload),
qos: this.config.will.qos,
retain: this.config.will.retain,
} : undefined;

const websocketXform = (config.websocket || {}).protocol != 'wss-custom-auth' ? transform_websocket_url : undefined;

this.connection = new AsyncClient(new _MqttClient(
create_websocket_stream,
{
// service default is 1200 seconds
Expand All @@ -199,15 +208,16 @@ export class MqttClientConnection extends BufferedEventEmitter {
username: this.config.username,
password: this.config.password,
reconnectPeriod: 0,
will: this.config.will ? {
topic: this.config.will.topic,
payload: normalize_payload(this.config.will.payload),
qos: this.config.will.qos,
retain: this.config.will.retain,
} : undefined,
transformWsUrl: (config.websocket || {}).protocol != 'wss-custom-auth' ? transform_websocket_url : undefined
will: will,
transformWsUrl: websocketXform,
}
));

this.connection.on('connect', this.on_connect);
this.connection.on('error', this.on_error);
this.connection.on('message', this.on_message);
this.connection.on('offline', this.on_offline);
this.connection.on('end', this.on_disconnected);
}

/** Emitted when the connection is ready and is about to start sending response data */
Expand Down Expand Up @@ -250,6 +260,10 @@ export class MqttClientConnection extends BufferedEventEmitter {
return this;
}

private on_connect = (connack: IConnackPacket) => {
this.on_online(connack.sessionPresent);
}

private on_online = (session_present: boolean) => {
if (++this.connection_count == 1) {
this.emit('connect', session_present);
Expand All @@ -266,6 +280,10 @@ export class MqttClientConnection extends BufferedEventEmitter {
this.emit('disconnect');
}

private on_error = (error: Error) => {
this.emit('error', new CrtError(error))
}

private on_message = (topic: string, payload: Buffer, packet: any) => {
const callback = this.subscriptions.find(topic);
if (callback) {
Expand All @@ -274,14 +292,6 @@ export class MqttClientConnection extends BufferedEventEmitter {
this.emit('message', topic, payload);
}

private _reject(reject: (reason: any) => void) {
return (reason: any) => {
reject(reason);
this.emit('error', new CrtError(reason));
}
}


/**
* Open the actual connection to the server (async).
* @returns A Promise which completes whether the connection succeeds or fails.
Expand All @@ -291,26 +301,12 @@ export class MqttClientConnection extends BufferedEventEmitter {
*/
async connect() {
return new Promise<boolean>((resolve, reject) => {
reject = this._reject(reject);

try {
this.connection.on('connect',
(connack: { sessionPresent: boolean, rc: number }) => {
resolve(connack.sessionPresent);
this.on_online(connack.sessionPresent);
}
);
this.connection.on('error',
(error: string) => {
reject(`Failed to connect: error=${error}`);
}
);
this.connection.on('message', this.on_message);
this.connection.on('offline', this.on_offline);
this.connection.on('end', this.on_disconnected);
} catch (e) {
reject(e);
}
this.connection.once('connect', (connack: IConnackPacket) => {
resolve(connack.sessionPresent);
});
this.connection.once('error', (error: Error) => {
reject(new CrtError(error));
});
});
}

Expand Down
77 changes: 77 additions & 0 deletions lib/browser/polyfills.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

/** This file contains polyfills for possibly missing browser features */

// Taken from https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder
if (typeof TextEncoder === "undefined") {
TextEncoder = function TextEncoder() { };
TextEncoder.prototype.encode = function encode(str) {
"use strict";
var Len = str.length, resPos = -1;
// The Uint8Array's length must be at least 3x the length of the string because an invalid UTF-16
// takes up the equivelent space of 3 UTF-8 characters to encode it properly. However, Array's
// have an auto expanding length and 1.5x should be just the right balance for most uses.
var resArr = typeof Uint8Array === "undefined" ? new Array(Len * 1.5) : new Uint8Array(Len * 3);
for (var point = 0, nextcode = 0, i = 0; i !== Len;) {
point = str.charCodeAt(i), i += 1;
if (point >= 0xD800 && point <= 0xDBFF) {
if (i === Len) {
resArr[resPos += 1] = 0xef/*0b11101111*/; resArr[resPos += 1] = 0xbf/*0b10111111*/;
resArr[resPos += 1] = 0xbd/*0b10111101*/; break;
}
// https://mathiasbynens.be/notes/javascript-encoding#surrogate-formulae
nextcode = str.charCodeAt(i);
if (nextcode >= 0xDC00 && nextcode <= 0xDFFF) {
point = (point - 0xD800) * 0x400 + nextcode - 0xDC00 + 0x10000;
i += 1;
if (point > 0xffff) {
resArr[resPos += 1] = (0x1e/*0b11110*/ << 3) | (point >>> 18);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | ((point >>> 12) & 0x3f/*0b00111111*/);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | ((point >>> 6) & 0x3f/*0b00111111*/);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | (point & 0x3f/*0b00111111*/);
continue;
}
} else {
resArr[resPos += 1] = 0xef/*0b11101111*/; resArr[resPos += 1] = 0xbf/*0b10111111*/;
resArr[resPos += 1] = 0xbd/*0b10111101*/; continue;
}
}
if (point <= 0x007f) {
resArr[resPos += 1] = (0x0/*0b0*/ << 7) | point;
} else if (point <= 0x07ff) {
resArr[resPos += 1] = (0x6/*0b110*/ << 5) | (point >>> 6);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | (point & 0x3f/*0b00111111*/);
} else {
resArr[resPos += 1] = (0xe/*0b1110*/ << 4) | (point >>> 12);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | ((point >>> 6) & 0x3f/*0b00111111*/);
resArr[resPos += 1] = (0x2/*0b10*/ << 6) | (point & 0x3f/*0b00111111*/);
}
}
if (typeof Uint8Array !== "undefined") return resArr.subarray(0, resPos + 1);
// else // IE 6-9
resArr.length = resPos + 1; // trim off extra weight
return resArr;
};
TextEncoder.prototype.toString = function () { return "[object TextEncoder]" };
try { // Object.defineProperty only works on DOM prototypes in IE8
Object.defineProperty(TextEncoder.prototype, "encoding", {
get: function () {
if (TextEncoder.prototype.isPrototypeOf(this)) return "utf-8";
else throw TypeError("Illegal invocation");
}
});
} catch (e) { /*IE6-8 fallback*/ TextEncoder.prototype.encoding = "utf-8"; }
if (typeof Symbol !== "undefined") TextEncoder.prototype[Symbol.toStringTag] = "TextEncoder";
}
1 change: 0 additions & 1 deletion lib/browser/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,3 @@ export function create_websocket_stream(config: MqttConnectionConfig) {
const url = create_websocket_url(config);
return WebsocketStream(url, ['mqttv3.1'], config.websocket);
}

Loading

0 comments on commit 18dbbd3

Please sign in to comment.