diff --git a/packages/node/test/integration/createMessagingPluginTest.ts b/packages/node/test/integration/createMessagingPluginTest.ts index 2ac514304f..0378e0726a 100644 --- a/packages/node/test/integration/createMessagingPluginTest.ts +++ b/packages/node/test/integration/createMessagingPluginTest.ts @@ -1,10 +1,10 @@ -import { Wallet } from 'ethers' import { MessageMetadata, Stream, StreamrClient } from '@streamr/sdk' import { fetchPrivateKeyWithGas, Queue } from '@streamr/test-utils' +import { merge, wait } from '@streamr/utils' +import { Wallet } from 'ethers' import { Broker } from '../../src/broker' -import { Message } from '../../src/helpers/PayloadFormat' -import { createClient, startBroker, createTestStream } from '../utils' -import { wait, merge } from '@streamr/utils' +import { Message, MetadataPayloadFormat } from '../../src/helpers/PayloadFormat' +import { createClient, createTestStream, startBroker } from '../utils' interface MessagingPluginApi { createClient: (action: 'publish' | 'subscribe', streamId: string, apiKey?: string) => Promise @@ -21,6 +21,7 @@ interface Ports { plugin: number } +export const PAYLOAD_FORMAT = new MetadataPayloadFormat() const MOCK_MESSAGE = { content: { foo: 'bar' diff --git a/packages/node/test/integration/plugins/mqtt/MqttPlugin.test.ts b/packages/node/test/integration/plugins/mqtt/MqttPlugin.test.ts index 373d14df18..44d267f38c 100644 --- a/packages/node/test/integration/plugins/mqtt/MqttPlugin.test.ts +++ b/packages/node/test/integration/plugins/mqtt/MqttPlugin.test.ts @@ -1,7 +1,7 @@ -import mqtt, { AsyncMqttClient } from 'async-mqtt' import { Queue } from '@streamr/test-utils' +import mqtt, { AsyncMqttClient } from 'async-mqtt' import { Message } from '../../../../src/helpers/PayloadFormat' -import { createMessagingPluginTest } from '../../createMessagingPluginTest' +import { createMessagingPluginTest, PAYLOAD_FORMAT } from '../../createMessagingPluginTest' const MQTT_PORT = 12430 @@ -20,12 +20,12 @@ createMessagingPluginTest('mqtt', await client.end(true) }, publish: async (msg: Message, streamId: string, client: AsyncMqttClient): Promise => { - await client.publish(streamId, JSON.stringify(msg)) + await client.publish(streamId, PAYLOAD_FORMAT.createPayload(msg.content, msg.metadata)) }, subscribe: async (messageQueue: Queue, streamId: string, client: AsyncMqttClient): Promise => { client.once('message', (topic: string, message: Buffer) => { if (topic === streamId) { - messageQueue.push(JSON.parse(message.toString())) + messageQueue.push(PAYLOAD_FORMAT.createMessage(message.toString())) } }) await client.subscribe(streamId) diff --git a/packages/node/test/integration/plugins/websocket/WebsocketPlugin.test.ts b/packages/node/test/integration/plugins/websocket/WebsocketPlugin.test.ts index e25e847b9d..59dc2cfb04 100644 --- a/packages/node/test/integration/plugins/websocket/WebsocketPlugin.test.ts +++ b/packages/node/test/integration/plugins/websocket/WebsocketPlugin.test.ts @@ -1,8 +1,8 @@ -import WebSocket from 'ws' import { Queue } from '@streamr/test-utils' import { waitForEvent } from '@streamr/utils' +import WebSocket from 'ws' import { Message } from '../../../../src/helpers/PayloadFormat' -import { createMessagingPluginTest } from '../../createMessagingPluginTest' +import { createMessagingPluginTest, PAYLOAD_FORMAT } from '../../createMessagingPluginTest' jest.setTimeout(30000) @@ -28,12 +28,12 @@ createMessagingPluginTest('websocket', client.close() }, publish: async (msg: Message, _streamId: string, client: WebSocket): Promise => { - client.send(JSON.stringify(msg)) + client.send(PAYLOAD_FORMAT.createPayload(msg.content, msg.metadata)) }, subscribe: async (messageQueue: Queue, _streamId: string, client: WebSocket): Promise => { client.on('message', (data: WebSocket.RawData) => { const payload = data.toString() - messageQueue.push(JSON.parse(payload)) + messageQueue.push(PAYLOAD_FORMAT.createMessage(payload)) }) }, errors: {