Skip to content

Commit

Permalink
copying changes from 9779 branch (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbigel authored Jun 21, 2024
1 parent b865f47 commit fb9ad36
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 17 deletions.
2 changes: 1 addition & 1 deletion plugins/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ If you would like to extend the functionality of the existing PubSub client or c

Here is an overview of the functions provided by the PubSubClient class:

- `initialize()` - Initializes a WebSocket connection to the specified URL.
- `initialize()` - Initializes a WebSocket connection to the specified URL. By default, it points to 'ws://localhost:8080'.

- `publish(topic, message)` - Publishes a message to the specified topic.

Expand Down
9 changes: 9 additions & 0 deletions src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ export default class App extends Base {
process.env.REPORTINGID = reportingId;
process.env.STANDALONE = standalone;
process.env.STANDALONE_PREFIX = standalonePrefix;

// Set the pubSub URL if present
process.env.PUB_SUB_URL = new URLSearchParams(window.location.search).get('pubSubUrl');

if (platform) {
process.env.PLATFORM = platform;
} else {
Expand Down Expand Up @@ -410,6 +414,11 @@ export default class App extends Base {
logger.error('No Mac Address Found in Parameter Initialization response...', 'getParameterInitializationValues');
}

// Set the pubSub URL if present
if (query.params.pubSubUrl) {
process.env.PUB_SUB_URL = query.params.pubSubUrl;
}

if (query.task) {
setTimeout(() => {
const intentReader = new IntentReader();
Expand Down
49 changes: 33 additions & 16 deletions src/pubSubClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*
* SPDX-License-Identifier: Apache-2.0
*/

const logger = require('../src/utils/Logger')('pubSubClient.js');
require('dotenv').config({ override: true });

const defaultWsUrl = 'ws://your-ws-url-here.com';

class PubSubClient {
constructor() {
this.ws = null;
this.url = defaultWsUrl;
this.url = process.env.PUB_SUB_URL ? process.env.PUB_SUB_URL : 'ws://localhost:8080';
this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX = '_FCS';
this.PUBSUB_PUBLISH_TOPIC_SUFFIX = '_FCA';
}
Expand All @@ -35,6 +32,7 @@ class PubSubClient {
const appUrl = window.location;
const pubSubTopicUUID = new URLSearchParams(appUrl.search).get('pubsub_uuid');
const macAddress = process.env.MACADDRESS;
const appId = process.env.CURRENT_APPID;

// Priority #1: Use pubSubTopicUUID if it's available
if (pubSubTopicUUID) {
Expand All @@ -51,12 +49,12 @@ class PubSubClient {
console.warn(`WARNING: No pubsub_uuid query parameter or MAC address found. Using default value: ${pubSubTopic}`);
}

process.env.PUBSUB_SUBSCRIBE_TOPIC = pubSubTopic + this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX;
process.env.PUBSUB_PUBLISH_TOPIC = pubSubTopic + this.PUBSUB_PUBLISH_TOPIC_SUFFIX;
process.env.PUBSUB_SUBSCRIBE_TOPIC = pubSubTopic + '_' + appId + this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX;
process.env.PUBSUB_PUBLISH_TOPIC = pubSubTopic + '_' + appId + this.PUBSUB_PUBLISH_TOPIC_SUFFIX;

// Establish WS Connection
this.ws = new WebSocket(this.url);
logger.info('Establishing a WS connection...', 'initialize');
logger.info(`Establishing a WS connection to ${this.url}...`, 'initialize');

return new Promise((resolve, reject) => {
this.ws.addEventListener('open', (event) => {
Expand All @@ -65,13 +63,13 @@ class PubSubClient {
});

this.ws.addEventListener('error', (event) => {
if (this.url === defaultWsUrl) {
logger.error('WARNING: WebSocket connections will fail to initialize. The file has not been properly configured. Please update the URL to point to your WebSocket server for communication to work.');
} else {
logger.error('Failed to initialize a WS connection...', 'initialize');
}
logger.info('Failed to initialize a WS connection...', event);
this.ws = null; // Ensure ws is null if connection fails
reject(false);
});
}).catch((error) => {
logger.info('Continuing without PubSub due to WS connection failure.');
return false;
});
}

Expand All @@ -82,6 +80,11 @@ class PubSubClient {
return false;
}

if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot publish message.');
return false;
}

const publishMsg = {
operation: 'pub',
topic,
Expand All @@ -92,7 +95,7 @@ class PubSubClient {

// If headers are passed in, add them to the payload object
if (headers) {
payload.payload.headers = headers;
publishMsg.payload.headers = headers;
}

logger.info('Publishing message: ', JSON.stringify(publishMsg));
Expand All @@ -109,6 +112,11 @@ class PubSubClient {

// Subscribe to a topic
subscribe(topic, callback) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot subscribe to topic.');
return false;
}

const subscribeMsg = {
operation: 'sub',
topic,
Expand Down Expand Up @@ -146,6 +154,11 @@ class PubSubClient {

// Unsubscribe to a topic
unsubscribe(topic) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot unsubscribe from topic.');
return false;
}

const payload = {
operation: 'unsub',
topic,
Expand All @@ -164,8 +177,8 @@ class PubSubClient {
// Checks WebSocket connection status
isConnected() {
let status = false;
if (this.ws && this.ws.readyState == this.ws.OPEN) {
logger.info('WS connection already Established');
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
logger.info('WS connection already established');
status = true;
}
return status;
Expand All @@ -174,7 +187,11 @@ class PubSubClient {

const getClient = async () => {
const pubSubClient = new PubSubClient();
await pubSubClient.initialize();
try {
await pubSubClient.initialize();
} catch (error) {
logger.error(error);
}
return pubSubClient;
};

Expand Down

0 comments on commit fb9ad36

Please sign in to comment.