Skip to content

Commit

Permalink
test(node): Use PayloadFormat in messaging plugin tests (#2803)
Browse files Browse the repository at this point in the history
Use `PayloadFormat` helper class to do the `mqtt` / `websocket` payload
object transformation. Previously we did equal transformation by calling
`JSON.parse` / `JSON.stringify` but the new approach is better as it
encapsulates the transformation details.

This is a cherry-pick from
#2774.
  • Loading branch information
teogeb authored Oct 14, 2024
1 parent 9325a50 commit 4f3b7c8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
9 changes: 5 additions & 4 deletions packages/node/test/integration/createMessagingPluginTest.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Wallet } from 'ethers'
import { MessageMetadata, Stream, StreamrClient } from '@streamr/sdk'
import { fetchPrivateKeyWithGas, Queue } from '@streamr/test-utils'
import { merge, wait } from '@streamr/utils'
import { Wallet } from 'ethers'
import { Broker } from '../../src/broker'
import { Message } from '../../src/helpers/PayloadFormat'
import { createClient, startBroker, createTestStream } from '../utils'
import { wait, merge } from '@streamr/utils'
import { Message, MetadataPayloadFormat } from '../../src/helpers/PayloadFormat'
import { createClient, createTestStream, startBroker } from '../utils'

interface MessagingPluginApi<T> {
createClient: (action: 'publish' | 'subscribe', streamId: string, apiKey?: string) => Promise<T>
Expand All @@ -21,6 +21,7 @@ interface Ports {
plugin: number
}

export const PAYLOAD_FORMAT = new MetadataPayloadFormat()
const MOCK_MESSAGE = {
content: {
foo: 'bar'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import mqtt, { AsyncMqttClient } from 'async-mqtt'
import { Queue } from '@streamr/test-utils'
import mqtt, { AsyncMqttClient } from 'async-mqtt'
import { Message } from '../../../../src/helpers/PayloadFormat'
import { createMessagingPluginTest } from '../../createMessagingPluginTest'
import { createMessagingPluginTest, PAYLOAD_FORMAT } from '../../createMessagingPluginTest'

const MQTT_PORT = 12430

Expand All @@ -20,12 +20,12 @@ createMessagingPluginTest('mqtt',
await client.end(true)
},
publish: async (msg: Message, streamId: string, client: AsyncMqttClient): Promise<void> => {
await client.publish(streamId, JSON.stringify(msg))
await client.publish(streamId, PAYLOAD_FORMAT.createPayload(msg.content, msg.metadata))
},
subscribe: async (messageQueue: Queue<Message>, streamId: string, client: AsyncMqttClient): Promise<void> => {
client.once('message', (topic: string, message: Buffer) => {
if (topic === streamId) {
messageQueue.push(JSON.parse(message.toString()))
messageQueue.push(PAYLOAD_FORMAT.createMessage(message.toString()))
}
})
await client.subscribe(streamId)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import WebSocket from 'ws'
import { Queue } from '@streamr/test-utils'
import { waitForEvent } from '@streamr/utils'
import WebSocket from 'ws'
import { Message } from '../../../../src/helpers/PayloadFormat'
import { createMessagingPluginTest } from '../../createMessagingPluginTest'
import { createMessagingPluginTest, PAYLOAD_FORMAT } from '../../createMessagingPluginTest'

jest.setTimeout(30000)

Expand All @@ -28,12 +28,12 @@ createMessagingPluginTest('websocket',
client.close()
},
publish: async (msg: Message, _streamId: string, client: WebSocket): Promise<void> => {
client.send(JSON.stringify(msg))
client.send(PAYLOAD_FORMAT.createPayload(msg.content, msg.metadata))
},
subscribe: async (messageQueue: Queue<Message>, _streamId: string, client: WebSocket): Promise<void> => {
client.on('message', (data: WebSocket.RawData) => {
const payload = data.toString()
messageQueue.push(JSON.parse(payload))
messageQueue.push(PAYLOAD_FORMAT.createMessage(payload))
})
},
errors: {
Expand Down

0 comments on commit 4f3b7c8

Please sign in to comment.