Skip to content

Commit

Permalink
Support presence message extras
Browse files Browse the repository at this point in the history
Signed-off-by: Lewis Marshall <lewis.marshall@ably.com>
  • Loading branch information
lmars committed Aug 2, 2023
1 parent 53ad2ec commit eac58f4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 34 deletions.
4 changes: 4 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3208,6 +3208,10 @@ declare namespace Types {
* This will typically be empty as all presence messages received from Ably are automatically decoded client-side using this value. However, if the message encoding cannot be processed, this attribute will contain the remaining transformations not applied to the data payload.
*/
encoding: string;
/**
* A JSON object of arbitrary key-value pairs that may contain metadata, and/or ancillary payloads. Valid payloads include `headers`.
*/
extras: any;
/**
* A unique ID assigned to each `PresenceMessage` by Ably.
*/
Expand Down
78 changes: 44 additions & 34 deletions src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,41 +104,56 @@ class RealtimePresence extends Presence {
if (isAnonymousOrWildcard(this)) {
throw new ErrorInfo('clientId must be specified to enter a presence channel', 40012, 400);
}
return this._enterOrUpdateClient(undefined, undefined, data, 'enter', callback);
if (typeof data === 'function') {
callback = data as ErrCallback;
data = null;
}
return this.enterMessage(PresenceMessage.fromValues({ data }), callback);
}

update(data: unknown, callback: ErrCallback): void | Promise<void> {
if (isAnonymousOrWildcard(this)) {
throw new ErrorInfo('clientId must be specified to update presence data', 40012, 400);
}
return this._enterOrUpdateClient(undefined, undefined, data, 'update', callback);
if (typeof data === 'function') {
callback = data as ErrCallback;
data = null;
}
return this.updateMessage(PresenceMessage.fromValues({ data }), callback);
}

enterClient(clientId: string, data: unknown, callback: ErrCallback): void | Promise<void> {
return this._enterOrUpdateClient(undefined, clientId, data, 'enter', callback);
if (typeof data === 'function') {
callback = data as ErrCallback;
data = null;
}
return this.enterMessage(PresenceMessage.fromValues({ clientId, data }), callback);
}

updateClient(clientId: string, data: unknown, callback: ErrCallback): void | Promise<void> {
return this._enterOrUpdateClient(undefined, clientId, data, 'update', callback);
if (typeof data === 'function') {
callback = data as ErrCallback;
data = null;
}
return this.updateMessage(PresenceMessage.fromValues({ clientId, data }), callback);
}

_enterOrUpdateClient(
id: string | undefined,
clientId: string | undefined,
data: unknown,
action: string,
callback: ErrCallback
): void | Promise<void> {
enterMessage(msg: PresenceMessage, callback: ErrCallback): void | Promise<void> {
msg.action = 'enter';
return this._enterOrUpdateClient(msg, callback);
}

updateMessage(msg: PresenceMessage, callback: ErrCallback): void | Promise<void> {
msg.action = 'update';
return this._enterOrUpdateClient(msg, callback);
}

_enterOrUpdateClient(presence: PresenceMessage, callback: ErrCallback): void | Promise<void> {
if (!callback) {
if (typeof data === 'function') {
callback = data as ErrCallback;
data = null;
} else {
if (this.channel.realtime.options.promises) {
return Utils.promisify(this, '_enterOrUpdateClient', [id, clientId, data, action]);
}
callback = noop;
if (this.channel.realtime.options.promises) {
return Utils.promisify(this, '_enterOrUpdateClient', [presence]);
}
callback = noop;
}

const channel = this.channel;
Expand All @@ -149,21 +164,10 @@ class RealtimePresence extends Presence {

Logger.logAction(
Logger.LOG_MICRO,
'RealtimePresence.' + action + 'Client()',
'channel = ' + channel.name + ', id = ' + id + ', client = ' + (clientId || '(implicit) ' + getClientId(this))
'RealtimePresence.' + presence.action + 'Client()',
'channel = ' + channel.name + ', id = ' + presence.id + ', client = ' + (presence.clientId || '(implicit) ' + getClientId(this))
);

const presence = PresenceMessage.fromValues({
action: action,
data: data,
});
if (id) {
presence.id = id;
}
if (clientId) {
presence.clientId = clientId;
}

PresenceMessage.encode(presence, channel.channelOptions as CipherOptions, (err: IPartialErrorInfo) => {
if (err) {
callback(err);
Expand All @@ -185,7 +189,7 @@ class RealtimePresence extends Presence {
break;
default:
err = new PartialErrorInfo(
'Unable to ' + action + ' presence channel while in ' + channel.state + ' state',
'Unable to ' + presence.action + ' presence channel while in ' + channel.state + ' state',
90001
);
err.code = 90001;
Expand Down Expand Up @@ -491,7 +495,13 @@ class RealtimePresence extends Presence {
);
// RTP17g: Send ENTER containing the member id, clientId and data
// attributes.
this._enterOrUpdateClient(entry.id, entry.clientId, entry.data, 'enter', reenterCb);
const msg = PresenceMessage.fromValues({
id: entry.id,
action: 'enter',
clientId: entry.clientId,
data: entry.data,
});
this._enterOrUpdateClient(msg, reenterCb);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/lib/types/presencemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class PresenceMessage {
connectionId?: string;
data?: string | Buffer | Uint8Array;
encoding?: string;
extras?: any;
size?: number;

static Actions = ['absent', 'present', 'enter', 'leave', 'update'];
Expand Down Expand Up @@ -53,6 +54,7 @@ class PresenceMessage {
action: number;
data: string | Buffer | Uint8Array;
encoding?: string;
extras?: any;
} {
/* encode data to base64 if present and we're returning real JSON;
* although msgpack calls toJSON(), we know it is a stringify()
Expand All @@ -78,6 +80,7 @@ class PresenceMessage {
action: toActionValue(this.action as string),
data: data,
encoding: encoding,
extras: this.extras,
};
}

Expand All @@ -95,6 +98,9 @@ class PresenceMessage {
result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data);
else result += '; data (json)=' + JSON.stringify(this.data);
}
if (this.extras) {
result += '; extras=' + JSON.stringify(this.extras);
}
result += ']';
return result;
}
Expand Down
33 changes: 33 additions & 0 deletions test/realtime/presence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
var createPM = Ably.Realtime.ProtocolMessage.fromDeserialized;
var closeAndFinish = helper.closeAndFinish;
var monitorConnection = helper.monitorConnection;
var PresenceMessage = Ably.Realtime.PresenceMessage;

function extractClientIds(presenceSet) {
return utils
Expand Down Expand Up @@ -370,6 +371,38 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
monitorConnection(done, clientRealtime);
});

/*
* Attach to channel, enter presence channel with extras and check received
* PresenceMessage has extras.
*/
it('presenceMessageExtras', function (done) {
var clientRealtime = helper.AblyRealtime({ clientId: testClientId, tokenDetails: authToken });
var channelName = 'presenceMessageExtras';
var clientChannel = clientRealtime.channels.get(channelName);
var presence = clientChannel.presence;
presence.subscribe(
function (presenceMessage) {
try {
expect(presenceMessage.extras).to.deep.equal({ headers: { key: 'value' } }, 'extras should have headers "key=value"');
} catch (err) {
closeAndFinish(done, clientRealtime, err);
return;
}
closeAndFinish(done, clientRealtime);
},
function onPresenceSubscribe(err) {
if (err) {
closeAndFinish(done, clientRealtime, err);
return;
}
clientChannel.presence.enterMessage(PresenceMessage.fromValues({
extras: { headers: { key: 'value' } },
}));
}
);
monitorConnection(done, clientRealtime);
});

/*
* Enter presence channel (without attaching), detach, then enter again to reattach
*/
Expand Down

0 comments on commit eac58f4

Please sign in to comment.