-
Notifications
You must be signed in to change notification settings - Fork 285
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(plugin-keychain-memory): add observability via RxJS ReplaySubjects
1. This is an example of how to add observability to a plugin such as if you had to somehow expose the stream of transaction execution requests flowing through a connector plugin but did not feel like setting up Kafka or RabbitMQ just for this and instead opted to do it with an in-process, purely NodeJS/Javascript based solution. 2. The downside of this is of course that this doesn't work well in a distributed computing environment just by itself, since if you were to host a fleet of servers running the same connector plugin with horizontal scaling, then this wouldn't be able to observe all the invocations across the server fleet, but it would still make it easier to implement a functionality like that. 3. The main purpose of this pull request is educational. The keychain memory plugin is only used for testing and demonstration purposes and I wanted to show to a few other contributors what I meant when I was explaining that they could just use RxJS subjects to allow consumers of the connector plugins to observe the stream of transactions flowing through said connector plugin instance. Signed-off-by: Peter Somogyvari <peter.somogyvari@accenture.com>
- Loading branch information
Showing
4 changed files
with
158 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
...gin-keychain-memory/src/test/typescript/unit/plugin-keychain-memory-observability.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import "jest-extended"; | ||
import { v4 as uuidV4 } from "uuid"; | ||
|
||
import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; | ||
|
||
import { PluginKeychainMemory } from "../../../main/typescript/public-api"; | ||
|
||
const logLevel: LogLevelDesc = "INFO"; | ||
|
||
describe("PluginKeychainMemory", () => { | ||
const log = LoggerProvider.getOrCreate({ | ||
label: "plugin-keychain-memory-observability.test.ts", | ||
level: logLevel, | ||
}); | ||
|
||
test("can observe set operations", async () => { | ||
const keychain = new PluginKeychainMemory({ | ||
instanceId: uuidV4(), | ||
keychainId: uuidV4(), | ||
logLevel, | ||
}); | ||
|
||
let getCount = 0; | ||
const stratedAt = new Date(); | ||
|
||
const taskPromise = new Promise<void>((resolve) => { | ||
keychain.observeSet().subscribe({ | ||
next: (value) => { | ||
getCount++; | ||
log.debug("NEXT_SET: startedAt=%o value=%o", stratedAt, value); | ||
if (getCount >= 5) { | ||
resolve(); | ||
} | ||
}, | ||
}); | ||
|
||
keychain.set("some-key-that-does-not-matter-1", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-2", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-3", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-4", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-5", uuidV4()); | ||
}); | ||
await expect(taskPromise).toResolve(); | ||
}, 500); | ||
|
||
test("can observe set operations with buffer", async () => { | ||
const keychain = new PluginKeychainMemory({ | ||
instanceId: uuidV4(), | ||
keychainId: uuidV4(), | ||
logLevel, | ||
observabilityBufferSize: 5, | ||
observabilityTtlSeconds: 1000, | ||
}); | ||
|
||
let getCount = 0; | ||
const stratedAt = new Date(); | ||
|
||
keychain.set("some-key-that-does-not-matter-1", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-2", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-3", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-4", uuidV4()); | ||
keychain.set("some-key-that-does-not-matter-5", uuidV4()); | ||
|
||
const taskPromise = new Promise<void>((resolve) => { | ||
keychain.observeSet().subscribe({ | ||
next: (value) => { | ||
getCount++; | ||
log.debug("NEXT_SET_1: startedAt=%o value=%o", stratedAt, value); | ||
}, | ||
}); | ||
keychain.observeSet().subscribe({ | ||
next: (value) => { | ||
getCount++; | ||
log.debug("NEXT_SET_2: startedAt=%o value=%o", stratedAt, value); | ||
if (getCount >= 10) { | ||
resolve(); | ||
} | ||
}, | ||
}); | ||
}); | ||
|
||
await expect(taskPromise).toResolve(); | ||
}, 500); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
9b41377
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold
0.05
.plugin-ledger-connector-besu_HTTP_GET_getOpenApiSpecV1
725
ops/sec (±3.57%
)779
ops/sec (±2.52%
)1.07
This comment was automatically generated by workflow using github-action-benchmark.
CC: @petermetz