Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: janus POC #346

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"bugs:": "https://github.com/team-telnyx/webrtc/issues",
"license": "MIT",
"dependencies": {
"eventemitter3": "^5.0.1",
"loglevel": "^1.6.8",
"uuid": "^7.0.3"
},
Expand Down
69 changes: 69 additions & 0 deletions packages/js/src/Modules/Janus/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Establishes a connection to the Janus server
* through a WebSocket connection.
*/
import EventEmitter from 'eventemitter3';
import { DEV_HOST } from './util/constants';

export default class JanusConnection extends EventEmitter {
public static PROTOCOL = 'janus-protocol';
private _socket: WebSocket | null = null;

public connect = () => {
// TODO - use PROD_HOST in production.
this._socket = new WebSocket(DEV_HOST, JanusConnection.PROTOCOL);
this._socket.addEventListener('open', this._onStateChange);
this._socket.addEventListener('close', this._onStateChange);
this._socket.addEventListener('message', this._onMessage);
this._socket.addEventListener('error', this._onError);
};

private _onError = (error: Event) => {
this.emit('error', error);
};
private _onMessage = (ev: MessageEvent) => {
this.emit('message', ev.data);
};

private _onStateChange = () => {
this.emit('stateChange', this._socket.readyState);
};

get connected(): boolean {
return this._socket && this._socket.readyState === WebSocket.OPEN;
}

get connecting(): boolean {
return this._socket && this._socket.readyState === WebSocket.CONNECTING;
}

get closing(): boolean {
return this._socket && this._socket.readyState === WebSocket.CLOSING;
}

get closed(): boolean {
return this._socket && this._socket.readyState === WebSocket.CLOSED;
}

get isAlive(): boolean {
return this.connecting || this.connected;
}

get isDead(): boolean {
return this.closing || this.closed;
}

public sendRaw(data: string | ArrayBuffer | Blob | ArrayBufferView): void {
{
this._socket.send(data);
}
}

public close() {
this._socket.close();
this._socket.removeEventListener('open', this._onStateChange);
this._socket.removeEventListener('message', this._onMessage);
this._socket.removeEventListener('close', this._onStateChange);
this._socket.removeEventListener('error', this._onError);
}
}
141 changes: 141 additions & 0 deletions packages/js/src/Modules/Janus/Handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
export const isFunction = (variable: any): boolean =>
variable instanceof Function || typeof variable === 'function';

type QueueMap = { [key: string]: Function[] };

const GLOBAL = 'GLOBAL';
const queue: QueueMap = {};
const _buildEventName = (event: string, uniqueId: string) =>
`${event}|${uniqueId}`;

const isQueued = (event: string, uniqueId: string = GLOBAL) => {
const eventName = _buildEventName(event, uniqueId);
return eventName in queue;
};

const queueLength = (event: string, uniqueId: string = GLOBAL): number => {
const eventName = _buildEventName(event, uniqueId);
return eventName in queue ? queue[eventName].length : 0;
};

/**
* Subscribes the callback to the passed event. Use uniqueId to render unique the event.
*/
const register = (
event: string,
callback: Function,
uniqueId: string = GLOBAL
) => {
const eventName = _buildEventName(event, uniqueId);
if (!(eventName in queue)) {
queue[eventName] = [];
}
queue[eventName].push(callback);
};

/**
* Subscribes the callback to the passed event only once. Use uniqueId to render unique the event.
*/
const registerOnce = (
event: string,
callback: Function,
uniqueId: string = GLOBAL
) => {
/* tslint:disable-next-line */
const cb = function (data) {
deRegister(event, cb, uniqueId);
callback(data);
};
cb.prototype.targetRef = callback;
return register(event, cb, uniqueId);
};

/**
* Remove subscription by callback. If not callback is passed in, all subscription will be removed.
*/
const deRegister = (
event: string,
callback?: Function | null,
uniqueId: string = GLOBAL
) => {
if (!isQueued(event, uniqueId)) {
return false;
}
const eventName = _buildEventName(event, uniqueId);
if (isFunction(callback)) {
const len = queue[eventName].length;
for (let i = len - 1; i >= 0; i--) {
const fn = queue[eventName][i];
if (
callback === fn ||
(fn.prototype && callback === fn.prototype.targetRef)
) {
queue[eventName].splice(i, 1);
}
}
} else {
queue[eventName] = [];
}
if (queue[eventName].length === 0) {
// Cleanup
delete queue[eventName];
}
return true;
};

/**
* Trigger the event, passing the data to it's subscribers. Use uniqueId to identify unique events.
*/
const trigger = (
event: string,
data: any,
uniqueId: string = GLOBAL,
globalPropagation: boolean = true
): boolean => {
const _propagate: boolean = globalPropagation && uniqueId !== GLOBAL;
if (!isQueued(event, uniqueId)) {
if (_propagate) {
trigger(event, data);
}
return false;
}
const eventName = _buildEventName(event, uniqueId);
const len = queue[eventName].length;
if (!len) {
if (_propagate) {
trigger(event, data);
}
return false;
}
for (let i = len - 1; i >= 0; i--) {
queue[eventName][i](data);
}
if (_propagate) {
trigger(event, data);
}
return true;
};

/**
* Remove all subscriptions
*/
const deRegisterAll = (event: string) => {
const eventName = _buildEventName(event, '');
Object.keys(queue)
.filter((name) => name.indexOf(eventName) === 0)
.forEach((event) => delete queue[event]);
};

const clearQueue = () =>
Object.keys(queue).forEach((event) => delete queue[event]);

export {
trigger,
register,
registerOnce,
deRegister,
deRegisterAll,
isQueued,
queueLength,
clearQueue,
};
6 changes: 6 additions & 0 deletions packages/js/src/Modules/Janus/KeepAliveAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import JanusConnection from "./Connection";

export class KeepAliveAgent {
private connection: JanusConnection
// TODO - Implement Keep Alive Agent
}
55 changes: 55 additions & 0 deletions packages/js/src/Modules/Janus/Request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
export enum Janus {
create = 'create',
success = 'success',
error = 'error',
attach = 'attach',
message = 'message',
event = 'event',
ack = 'ack'
}

type JanusCreateSessionRequest = {
janus: Janus.create;
transaction?: string;
};

type JanusAttachPluginRequest = {
janus: Janus.attach;
plugin: string;
session_id: number;
transaction?: string;
};

type JanusSIPRegisterRequest = {
janus: Janus.message;
body: {
request: 'register';
type?: string; //"<if guest or helper, no SIP REGISTER is actually sent; optional>",
send_register?: boolean; // <true|false; if false, no SIP REGISTER is actually sent; optional>,
force_udp?: boolean; // <true|false; if true, forces UDP for the SIP messaging; optional>,
force_tcp?: boolean; // <true|false; if true, forces TCP for the SIP messaging; optional>,
sips?: boolean; // <true|false; if true, configures a SIPS URI too when registering; optional>,
rfc2543_cancel?: boolean; // <true|false; if true, configures sip client to CANCEL pending INVITEs without having received a provisional response first; optional>,
username: string; // "<SIP URI to register; mandatory>",
secret?: string; // "<password to use to register; optional>",
ha1_secret?: string; // "<prehashed password to use to register; optional>",
authuser?: string; // "<username to use to authenticate (overrides the one in the SIP URI); optional>",
display_name?: string; // "<display name to use when sending SIP REGISTER; optional>",
user_agent?: string; //"<user agent to use when sending SIP REGISTER; optional>",
proxy?: string; //"<server to register at; optional, as won't be needed in case the REGISTER is not goint to be sent (e.g., guests)>";
outbound_proxy?: string; // '<outbound proxy to use, if any; optional>';
headers?: Record<string, string>; // '<object with key/value mappings (header name/value), to specify custom headers to add to the SIP REGISTER; optional>';
contact_params?: Record<string, string>[]; // "<array of key/value objects, to specify custom Contact URI params to add to the SIP REGISTER; optional>",
incoming_header_prefixes?: string[]; // "<array of strings, to specify custom (non-standard) headers to read on incoming SIP events; optional>",
refresh?: boolean; // "<true|false; if true, only uses the SIP REGISTER as an update and not a new registration; optional>",
master_id?: number; //"<ID of an already registered account, if this is an helper for multiple calls (more on that later); optional>",
register_ttl?: number; //: "<integer; number of seconds after which the registration should expire; optional>"
};
transaction?: string;
handle_id: number;
session_id: number;
};
export type JanusRequest =
| JanusCreateSessionRequest
| JanusAttachPluginRequest
| JanusSIPRegisterRequest;
Loading
Loading