Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): correct data conversion for publishing messages with format #1695

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 41 additions & 22 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,57 @@
import concat from 'concat-stream'
import * as Debug from 'debug'
import _ from 'lodash'
import * as mqtt from 'mqtt'
import { IClientOptions, IClientPublishOptions } from 'mqtt'
import pump from 'pump'
import concat from 'concat-stream'
import { Writable } from 'readable-stream'
import split2 from 'split2'
import { IClientOptions, IClientPublishOptions } from 'mqtt'
import logWrapper, { Signale, basicLog, benchLog, simulateLog, singaleConfig, signale } from '../utils/logWrapper'
import { parseConnectOptions, parsePublishOptions, checkTopicExists, checkScenarioExists } from '../utils/parse'
import convertPayload from '../utils/convertPayload'
import delay from '../utils/delay'
import { handleSaveOptions, handleLoadOptions } from '../utils/options'
import { loadSimulator } from '../utils/simulate'
import { fileDataSplitter, processPath, readFile } from '../utils/fileUtils'
import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleConfig } from '../utils/logWrapper'
import { handleLoadOptions, handleSaveOptions } from '../utils/options'
import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import { readFile, processPath, fileDataSplitter } from '../utils/fileUtils'
import convertPayload from '../utils/convertPayload'
import * as Debug from 'debug'
import _ from 'lodash'
import { loadSimulator } from '../utils/simulate'

/**
* Processes the outgoing message through two potential stages:
* 1. Format Conversion: If a format is specified, transform the message into that format.
* If no format is specified, the message retains its initial state.
* 2. Protobuf Serialization: If both protobuf path and message name are present,
* encapsulate the message into a protobuf format. If these settings are absent,
* the message remains unchanged.
* Flow:
* Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message
* @param {string | Buffer} message - The message to be processed.
* @param {string} [protobufPath] - The path to the protobuf definition.
* @param {string} [protobufMessageName] - The name of the protobuf message.
* @param {FormatType} [format] - The format to convert the message to.
* @returns {Buffer | string} - The processed message.
*/
const processPublishMessage = (
message: string | Buffer,
protobufPath?: string,
protobufMessageName?: string,
format?: FormatType,
): Buffer | string => {
/*
* Pipeline for processing outgoing messages in two potential stages:
* 1. Format Conversion --> Applied if a format is specified, transforming the message into that format; if absent, the message retains its initial state.
* 2. Protobuf Serialization --> Engaged if both protobuf path and message name are present, encapsulating the message into a protobuf format; without these settings, the message circulates unchanged.
*/
const pipeline = [
(msg: string | Buffer) => (format ? convertPayload(Buffer.from(msg.toString()), format, 'encode') : msg),
(msg: string | Buffer) =>
protobufPath && protobufMessageName
? serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
: msg,
]
const convertMessageFormat = (msg: string | Buffer): Buffer | string => {
if (!format) {
return msg
}
const bufferMsg = Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString())
return convertPayload(bufferMsg, format, 'encode')
}

const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => {
if (protobufPath && protobufMessageName) {
return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
}
return msg
}

const pipeline = [convertMessageFormat, serializeProtobufMessage]

return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer
}
Expand Down
Loading