Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate eventstream per namespace #140

Merged
merged 13 commits into from
Feb 28, 2024
Merged

Conversation

nguyer
Copy link
Contributor

@nguyer nguyer commented Nov 3, 2023

This PR is designed to be compatible with changes in FireFly Core in this PR hyperledger/firefly#1388

Overview of changes:

  • The token connector does not automatically create event streams on startup anymore
  • When a websocket client connects, it must now send a start command for a specific namespace like FireFly Core
  • When a namespace is started, the connector will create a websocket connection to the blockchain connector and set up or reuse the existing token event stream for that namespace
  • Multiple clients can listen on the same namespace
  • Websocket messages will be randomized across all listening clients for a namespace
  • When one client disconnects from a namespace, all in-flight messages for that namesake will be nack'd back to the blockchain connector
  • If all clients for a given namespace disconnect, the token connector will disconnect its websocket to the blockchain connector

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer mentioned this pull request Feb 20, 2024
src/main.ts Show resolved Hide resolved
@@ -42,7 +42,7 @@ export class TokensController {
@HttpCode(204)
@ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' })
init(@RequestContext() ctx: Context) {
return this.service.init(ctx);
// Do nothing. Endpoint retained for backwards compatibility with older tooling.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will something important (like the FireFly CLI or older FireFly core) crash if we just plain remove this endpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think FireFly CLI calls this, but that call could be removed in new versions

if (stream !== undefined) {
return stream;
}
private async getStream(ctx: Context, namespace: string) {
await this.migrationCheck(ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this migration check? I'm not sure how much value it's adding... might be worth evaluating if it can just be stripped out in this next release.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm really not sure. I tried to not touch/break as much of the existing "migration" code as possible because I don't fully understand all the iterations it has gone through. But I'm happy to delete more code if it's safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was there to look for old event streams/subscriptions and give you a nudge in the logs - "hey looks like you may want to delete and clean up old streams".

Since we've now decided to forcefully delete old streams, it doesn't seem useful anymore. It's worth deciding if we need to look for and delete any of the older spellings, in addition to the current stream though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought some more about this, and I think we should definitely remove it.

I believe it should be part of the release notes - ie if you have an old token connector instance, check its startup logs to be sure there are no warnings logged at startup. If there are warnings logged about any event streams or subscriptions, you should address those warnings before upgrading. The warnings will not be logged after upgrading to this new version.

this.logger.error(`Error initializing event stream proxy: ${err}`);
});
}
client.on('message', async (message: string) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're now using a combination of Nest websocket message handlers (ie with @SubscribeMessage('ack')) and raw websocket message handlers (with on('message')). Maybe should choose one or the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I'm not sure how nest works 😆

this.currentClient.send(JSON.stringify(message));
}
}

@SubscribeMessage('ack')
handleAck(@MessageBody() data: AckMessageData) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be separating things per websocket client? ie does the client that acks the message need to be the one that we think it was sent to, or is it ok for any connected client to ack any inflight message?

I left a note above about whether we want to use on('message') or SubscribeMessage() in general - if we do opt to keep this message handler, I'll just note you can get the client object with a @ConnectedSocket() client: Socket param.

this.currentClient = undefined;
// Iterate over all the namespaces this client was subscribed to
this.namespaceClients.forEach((clientSet, namespace) => {
clientSet.delete(client);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be checking the return value to see if this set included this client? Seems like the current behavior will nack all messages on all namespaces, regardless of whether the disconnect client was actually subscribed to them.

Copy link
Contributor

@awrichar awrichar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall - I think there are just a few decisions to be made about spelling and handling of websocket events, and possibly some minor bugs there.

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@@ -58,6 +58,10 @@ export interface WebSocketStart extends WebSocketActionBase {
namespace: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably delete the prior WebSocketMessage type above if we're not using it anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still used for processing things that come from EVMConnect

if (!this.awaitingAck.get(client.id)) {
this.awaitingAck.set(client.id, []);
}

client.on('message', async (message: string) => {
const action = JSON.parse(message) as WebSocketActionBase;
switch (action.type) {
case 'start':
const startAction = action as WebSocketStart;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you create local vars in a case, you may want to wrap with {}

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer merged commit 2915844 into hyperledger:main Feb 28, 2024
4 checks passed
@nguyer nguyer deleted the eventstrams branch February 28, 2024 20:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants