Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate eventstream per namespace #140

Merged
merged 13 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .env
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
PORT=3000
ETHCONNECT_URL=http://127.0.0.1:5102
ETHCONNECT_TOPIC=token
FACTORY_CONTRACT_ADDRESS=
AUTO_INIT=true
ETHCONNECT_TOPIC=tokens_0_0
FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e"
USE_LEGACY_ERC20_SAMPLE=false
USE_LEGACY_ERC721_SAMPLE=false
2 changes: 1 addition & 1 deletion .github/workflows/docker_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Set build tag
id: build_tag_generator
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/docker_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
with:
fetch-depth: 0

Expand All @@ -30,7 +30,7 @@ jobs:
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly-tokens-erc20-erc721:${GITHUB_REF##*/}

- name: Push head tag
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
Expand Down
16 changes: 8 additions & 8 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Use Node.js
uses: actions/setup-node@v2
uses: actions/setup-node@v3
with:
node-version: '16.x'
node-version: '20.9.0'
- run: npm ci
- run: npm run test
- run: npm run test:e2e
solidity-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Use Node.js
uses: actions/setup-node@v2
uses: actions/setup-node@v3
with:
node-version: '16.x'
node-version: '20.9.0'
- run: npm ci
working-directory: ./samples/solidity
- run: npm run compile
Expand All @@ -33,6 +33,6 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Docker build
run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 .
run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 .
33 changes: 33 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Tests",
"runtimeExecutable": "npm",
"args": ["run", "test"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"name": "Run E2E Tests",
"runtimeExecutable": "npm",
"args": ["run", "test:e2e"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${file}",
"preLaunchTask": "tsc: build - tsconfig.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
]
}
6 changes: 2 additions & 4 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
{
"solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765",
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"eslint.validate": ["javascript"],
"solidity.defaultCompiler": "remote",
"cSpell.words": [
"fftm"
]
"cSpell.words": ["eventstream", "fftm"]
}
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM node:16-alpine3.15 as build
FROM node:20-alpine3.17 as build
USER node
WORKDIR /home/node
ADD --chown=node:node package*.json ./
RUN npm install
ADD --chown=node:node . .
RUN npm run build

FROM node:16-alpine3.15 as solidity-build
FROM node:20-alpine3.17 as solidity-build
RUN apk add python3 alpine-sdk
USER node
WORKDIR /home/node
Expand All @@ -15,7 +15,7 @@ RUN npm install
ADD --chown=node:node ./samples/solidity .
RUN npx hardhat compile

FROM node:16-alpine3.15
FROM node:20-alpine3.17
RUN apk add curl jq
RUN mkdir -p /app/contracts/source \
&& chgrp -R 0 /app/ \
Expand All @@ -31,6 +31,8 @@ COPY --from=solidity-build --chown=1001:0 /home/node/contracts /home/node/packag
RUN npm install --production
WORKDIR /app/contracts
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json ./
# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json /home/node/contracts/
WORKDIR /app
COPY --from=build --chown=1001:0 /home/node/dist ./dist
COPY --from=build --chown=1001:0 /home/node/package.json /home/node/package-lock.json ./
Expand Down
2 changes: 1 addition & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
39 changes: 32 additions & 7 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,9 +20,9 @@ import { AxiosRequestConfig } from 'axios';
import { lastValueFrom } from 'rxjs';
import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context } from '../request-context/request-context.decorator';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils';
import {
Event,
EventBatch,
Expand All @@ -46,6 +46,7 @@ export class EventStreamSocket {
constructor(
private url: string,
private topic: string,
private namespace: string,
private username: string,
private password: string,
private handleEvents: (events: EventBatch) => void,
Expand All @@ -67,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: this.topic });
this.produce({ type: 'listen', topic: eventStreamName(this.topic, this.namespace) });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand All @@ -83,6 +84,7 @@ export class EventStreamSocket {
}
})
.on('message', (message: string) => {
this.logger.verbose(`WS => ${message}`);
this.handleMessage(JSON.parse(message));
})
.on('pong', () => {
Expand All @@ -109,7 +111,11 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: this.topic, batchNumber });
this.produce({ type: 'ack', topic: eventStreamName(this.topic, this.namespace), batchNumber });
}

nack(batchNumber: number | undefined) {
this.produce({ type: 'nack', topic: eventStreamName(this.topic, this.namespace), batchNumber });
}

close() {
Expand Down Expand Up @@ -193,13 +199,27 @@ export class EventStreamService {
batchSize: 50,
batchTimeoutMS: 500,
type: 'websocket',
websocket: { topic },
websocket: { topic: name },
blockedReryDelaySec: 30, // intentional due to spelling error in ethconnect
inputs: true,
timestamps: true,
};

const existingStreams = await this.getStreams(ctx);

// Check to see if there is a deprecated stream that we should remove
this.logger.debug(`Checking for deprecated event steam with topic '${topic}'`);
const deprecatedStream = existingStreams.find(s => s.name === topic);
if (deprecatedStream) {
this.logger.log(`Purging deprecated eventstream '${deprecatedStream.id}'`);
await lastValueFrom(
this.http.delete(
new URL(`/eventstreams/${deprecatedStream.id}`, this.baseUrl).href,
this.requestOptions(ctx),
),
);
}

const stream = existingStreams.find(s => s.name === streamDetails.name);
if (stream) {
const patchedStreamRes = await lastValueFrom(
Expand Down Expand Up @@ -331,15 +351,20 @@ export class EventStreamService {
return true;
}

connect(
async connect(
url: string,
topic: string,
namespace: string,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = eventStreamName(topic, namespace);
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
url,
topic,
namespace,
this.username,
this.password,
handleEvents,
Expand Down
Loading
Loading