Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Systest 9779 #137

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ If you would like to extend the functionality of the existing PubSub client or c

Here is an overview of the functions provided by the PubSubClient class:

- `initialize()` - Initializes a WebSocket connection to the specified URL.
- `initialize()` - Initializes a WebSocket connection to the specified URL. By default, it points to 'ws://localhost:8080'.

- `publish(topic, message)` - Publishes a message to the specified topic.

Expand Down
50 changes: 34 additions & 16 deletions src/pubSubClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*
* SPDX-License-Identifier: Apache-2.0
*/

const logger = require('../src/utils/Logger')('pubSubClient.js');
require('dotenv').config({ override: true });

const defaultWsUrl = 'ws://your-ws-url-here.com';
const defaultWsUrl = 'ws://localhost:8080';

class PubSubClient {
constructor() {
constructor(url = defaultWsUrl) {
this.ws = null;
this.url = defaultWsUrl;
this.url = url;
this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX = '_FCS';
this.PUBSUB_PUBLISH_TOPIC_SUFFIX = '_FCA';
}
Expand Down Expand Up @@ -56,7 +55,7 @@ class PubSubClient {

// Establish WS Connection
this.ws = new WebSocket(this.url);
logger.info('Establishing a WS connection...', 'initialize');
logger.info(`Establishing a WS connection to ${this.url}...`, 'initialize');

return new Promise((resolve, reject) => {
this.ws.addEventListener('open', (event) => {
Expand All @@ -65,13 +64,13 @@ class PubSubClient {
});

this.ws.addEventListener('error', (event) => {
if (this.url === defaultWsUrl) {
logger.error('WARNING: WebSocket connections will fail to initialize. The file has not been properly configured. Please update the URL to point to your WebSocket server for communication to work.');
} else {
logger.error('Failed to initialize a WS connection...', 'initialize');
}
logger.error('Failed to initialize a WS connection...', event);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave this as "info". It will only technically be an error if someone wants to connect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

this.ws = null; // Ensure ws is null if connection fails
reject(false);
});
}).catch((error) => {
logger.error('Continuing without PubSub due to WS connection failure.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above RE: "info"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

return false;
});
}

Expand All @@ -82,6 +81,11 @@ class PubSubClient {
return false;
}

if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot publish message.');
return false;
}

const publishMsg = {
operation: 'pub',
topic,
Expand All @@ -92,7 +96,7 @@ class PubSubClient {

// If headers are passed in, add them to the payload object
if (headers) {
payload.payload.headers = headers;
publishMsg.payload.headers = headers;
}

logger.info('Publishing message: ', JSON.stringify(publishMsg));
Expand All @@ -109,6 +113,11 @@ class PubSubClient {

// Subscribe to a topic
subscribe(topic, callback) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot subscribe to topic.');
return false;
}

const subscribeMsg = {
operation: 'sub',
topic,
Expand Down Expand Up @@ -146,6 +155,11 @@ class PubSubClient {

// Unsubscribe to a topic
unsubscribe(topic) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot unsubscribe from topic.');
return false;
}

const payload = {
operation: 'unsub',
topic,
Expand All @@ -164,17 +178,21 @@ class PubSubClient {
// Checks WebSocket connection status
isConnected() {
let status = false;
if (this.ws && this.ws.readyState == this.ws.OPEN) {
logger.info('WS connection already Established');
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
logger.info('WS connection already established');
status = true;
}
return status;
}
}

const getClient = async () => {
const pubSubClient = new PubSubClient();
await pubSubClient.initialize();
const getClient = async (url = defaultWsUrl) => {
const pubSubClient = new PubSubClient(url);
try {
await pubSubClient.initialize();
} catch (error) {
logger.error(error);
}
return pubSubClient;
};

Expand Down