Skip to content

Commit

Permalink
feat(ethereum-connector): support block monitoring with http only con…
Browse files Browse the repository at this point in the history
…nection

- Update web3.js to 4.4 (I was hoping to use requested by me `safeDisconnect`
    method, but it seem to be still unavailable in this version)
- Support monitoring without websocket ethereum node access by using
    http polling method. Method is chosed automatically during runtime, based on
    supplied ethereum node URL.
- Refactored `WatchBlocksV1Endpoint` into base class and child classes that
    implement monitoring strategies. `WatchBlocksV1Endpoint` is now instantiated
    with factory method. This will allow easier addition of
    new strategies in the future.
- Add monitoring tests to ethereum connector pacakge, remove overlaping tests
    from verifier-integration-with-ethereum-connector.test.

Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH committed Jan 19, 2024
1 parent 3291dcc commit fe98302
Show file tree
Hide file tree
Showing 14 changed files with 969 additions and 490 deletions.
43 changes: 17 additions & 26 deletions packages/cactus-plugin-ledger-connector-ethereum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,36 +181,27 @@ await apiClient.runTransactionV1({
});
```
### Offline signing utils
- Use `signTransaction` from this package to sign transactions payload locally (outside of connector process).
- Offline signed transaction can be send with `Web3SigningCredentialType.None` signing credetnial type in runTransactionV1 endpoint.
### watchBlocksV1
- ApiClient can be used to monitor for new blocks from the ledger with `watchBlocksV1` method.
- When etherum node supports subscription (e.g. websocket protocol is used), then blocks connector will subscribe to new block header event (recommended).
- If ethereum node supports HTTP access only, then polling method will be used.
#### Example
``` typescript
// Offline sign transaction
const { serializedTransactionHex } = signTransaction(
{
to: anotherAccount,
value: 10e6,
maxPriorityFeePerGas: 0,
maxFeePerGas: 0x40000000,
gasLimit: 21000,
type: 2
},
myPrivateKey,
{
networkId: 10,
chainId: 10,
defaultHardfork: "london",
},
);
const watchObservable = apiClient.watchBlocksV1({
getBlockData, // true - return transactions, false - return header only (default)
lastSeenBlock, // connector will push all blocks since lastSeenBlock (default - latest)
httpPollInterval // how often to poll the node (only for http-polling method)
});

// Send transaction payload to connector
await apiClient.runTransactionV1({
web3SigningCredential: {
type: Web3SigningCredentialType.None,
const subscription = watchObservable.subscribe({
next(event) {
// process block data
},
transactionConfig: {
rawTransaction: serializedTransactionHex,
error(err) {
// handle error
subscription.unsubscribe();
},
});
```
Expand Down
8 changes: 4 additions & 4 deletions packages/cactus-plugin-ledger-connector-ethereum/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
"sanitize-html": "2.7.0",
"socket.io-client-fixed-types": "4.5.4",
"typescript-optional": "2.0.1",
"web3": "4.1.2",
"web3-eth": "4.2.0",
"web3-eth-contract": "4.1.0"
"web3": "4.4.0",
"web3-eth": "4.4.0",
"web3-eth-contract": "4.2.0"
},
"devDependencies": {
"@hyperledger/cactus-plugin-keychain-memory": "2.0.0-alpha.2",
Expand All @@ -99,7 +99,7 @@
"js-yaml": "4.1.0",
"socket.io": "4.5.4",
"uuid": "9.0.1",
"web3-eth-accounts": "4.0.6"
"web3-eth-accounts": "4.1.1"
},
"engines": {
"node": ">=18",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,19 @@
"type": "object",
"properties": {
"getBlockData": {
"type": "boolean"
"type": "boolean",
"description": "Include entire block data if flag is true, otherwise just a header is returned (default)",
"default": "false"
},
"lastSeenBlock": {
"type": "number",
"description": "Block from which we want to start the monitoring process.",
"default": "latest"
},
"httpPollInterval": {
"type": "number",
"description": "How often to poll ethereum node for new blocks. Not used if the node supports subscription based monitoring (i.e. WebSocket).",
"default": "5 seconds"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,11 +756,23 @@ export type WatchBlocksV1BlockDataTimestamp = number | string;
*/
export interface WatchBlocksV1Options {
/**
*
* Include entire block data if flag is true, otherwise just a header is returned (default)
* @type {boolean}
* @memberof WatchBlocksV1Options
*/
'getBlockData'?: boolean;
/**
* Block from which we want to start the monitoring process.
* @type {number}
* @memberof WatchBlocksV1Options
*/
'lastSeenBlock'?: number;
/**
* How often to poll ethereum node for new blocks. Not used if the node supports subscription based monitoring (i.e. WebSocket).
* @type {number}
* @memberof WatchBlocksV1Options
*/
'httpPollInterval'?: number;
}
/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import Web3, {
TransactionReceiptBase,
WebSocketProvider,
} from "web3";
import { NewHeadsSubscription } from "web3-eth";
import { PayableMethodObject } from "web3-eth-contract";

import OAS from "../json/openapi.json";
Expand Down Expand Up @@ -63,7 +62,10 @@ import {

import { RunTransactionEndpoint } from "./web-services/run-transaction-v1-endpoint";
import { InvokeContractEndpoint } from "./web-services/invoke-contract-v1-endpoint";
import { WatchBlocksV1Endpoint } from "./web-services/watch-blocks-v1-endpoint";
import {
createWatchBlocksV1Endpoint,
WatchBlocksV1Endpoint,
} from "./web-services/watch-blocks-v1-endpoint";
import { GetPrometheusExporterMetricsEndpointV1 } from "./web-services/get-prometheus-exporter-metrics-v1-endpoint";
import { InvokeRawWeb3EthMethodEndpoint } from "./web-services/invoke-raw-web3eth-method-v1-endpoint";
import { InvokeRawWeb3EthContractEndpoint } from "./web-services/invoke-raw-web3eth-contract-v1-endpoint";
Expand Down Expand Up @@ -140,11 +142,10 @@ export class PluginLedgerConnectorEthereum
public prometheusExporter: PrometheusExporter;
private readonly instanceId: string;
private readonly log: Logger;
private readonly web3: Web3;
private readonly web3WatchBlock?: Web3;
private readonly web3: InstanceType<typeof Web3>;
private endpoints: IWebServiceEndpoint[] | undefined;
public static readonly CLASS_NAME = "PluginLedgerConnectorEthereum";
private watchBlocksSubscriptions: Map<string, NewHeadsSubscription> =
private watchBlocksSubscriptions: Map<string, WatchBlocksV1Endpoint> =
new Map();

public get className(): string {
Expand Down Expand Up @@ -196,9 +197,6 @@ export class PluginLedgerConnectorEthereum
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });
this.web3 = new Web3(this.createWeb3Provider());
if (this.options.rpcApiWsHost) {
this.web3WatchBlock = new Web3(this.createWeb3WsProvider());
}

this.instanceId = options.instanceId;
this.pluginRegistry = options.pluginRegistry as PluginRegistry;
Expand Down Expand Up @@ -292,10 +290,7 @@ export class PluginLedgerConnectorEthereum
}

await this.closeWeb3jsConnection(
this.web3.currentProvider as WebSocketProvider,
);
await this.closeWeb3jsConnection(
this.web3WatchBlock?.currentProvider as WebSocketProvider,
this.web3.currentProvider as unknown as WebSocketProvider,
);
}

Expand All @@ -311,35 +306,31 @@ export class PluginLedgerConnectorEthereum
const webServices = await this.getOrCreateWebServices();
await Promise.all(webServices.map((ws) => ws.registerExpress(app)));

if (this.web3WatchBlock) {
this.log.debug(`WebSocketProvider created for socketio endpoints`);
wsApi.on("connection", (socket: SocketIoSocket) => {
this.log.info(`New Socket connected. ID=${socket.id}`);

socket.on(WatchBlocksV1.Subscribe, (options?: WatchBlocksV1Options) => {
new WatchBlocksV1Endpoint({
web3: this.web3WatchBlock as typeof this.web3WatchBlock,
socket,
logLevel,
options,
}).subscribe();
});
});
} else {
this.log.info(
`WebSocketProvider was NOT created for socketio endpoints! Socket.IO will not be handled!`,
wsApi.on("connection", (socket: SocketIoSocket) => {
this.log.info(`New socket connection id ${socket.id}`);

// WatchBlocksV1Endpoint
socket.on(
WatchBlocksV1.Subscribe,
async (options?: WatchBlocksV1Options) => {
try {
const endpoint = createWatchBlocksV1Endpoint({
web3: this.web3,
socket,
logLevel,
options,
});
this.watchBlocksSubscriptions.set(
socket.id,
await endpoint.subscribe(),
);
this.log.debug(`${endpoint.className} created for ${socket.id}`);
} catch (error) {
this.log.error("Error when creating WatchBlocksV1Endpoint:", error);
}
},
);
wsApi.on("connection", (socket: SocketIoSocket) => {
this.log.info(
"Socket connected but no async endpoint is supported - disconnecting...",
);
socket.emit(
WatchBlocksV1.Error,
"Missing rpcApiWsHost - can't listen for new blocks on HTTP provider",
);
socket.disconnect();
});
}
});

// Register JSON-RPC proxy to pass requests directly to ethereum node
if (this.options.rpcApiHttpHost) {
Expand All @@ -355,7 +346,7 @@ export class PluginLedgerConnectorEthereum
[".*"]: "",
},
onProxyReq: fixRequestBody,
logLevel: "error",
logLevel: "warn",
}),
);
this.log.info(`Registered proxy from ${proxyUrl} to ${targetUrl}`);
Expand Down
Loading

0 comments on commit fe98302

Please sign in to comment.