From 751809084768a65603fa1a1e1262dd4c7ee8a6e7 Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Fri, 14 Jun 2024 11:23:52 +0800 Subject: [PATCH] fix(cli): correct data conversion for publishing messages with format --- cli/src/lib/pub.ts | 63 ++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index bc5f39808..b63def111 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -1,38 +1,57 @@ +import concat from 'concat-stream' +import * as Debug from 'debug' +import _ from 'lodash' import * as mqtt from 'mqtt' +import { IClientOptions, IClientPublishOptions } from 'mqtt' import pump from 'pump' -import concat from 'concat-stream' import { Writable } from 'readable-stream' import split2 from 'split2' -import { IClientOptions, IClientPublishOptions } from 'mqtt' -import logWrapper, { Signale, basicLog, benchLog, simulateLog, singaleConfig, signale } from '../utils/logWrapper' -import { parseConnectOptions, parsePublishOptions, checkTopicExists, checkScenarioExists } from '../utils/parse' +import convertPayload from '../utils/convertPayload' import delay from '../utils/delay' -import { handleSaveOptions, handleLoadOptions } from '../utils/options' -import { loadSimulator } from '../utils/simulate' +import { fileDataSplitter, processPath, readFile } from '../utils/fileUtils' +import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleConfig } from '../utils/logWrapper' +import { handleLoadOptions, handleSaveOptions } from '../utils/options' +import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse' import { serializeProtobufToBuffer } from '../utils/protobuf' -import { readFile, processPath, fileDataSplitter } from '../utils/fileUtils' -import convertPayload from '../utils/convertPayload' -import * as Debug from 'debug' -import _ from 'lodash' +import { loadSimulator } from '../utils/simulate' +/** + * Processes the outgoing message through two potential stages: + * 1. Format Conversion: If a format is specified, transform the message into that format. + * If no format is specified, the message retains its initial state. + * 2. Protobuf Serialization: If both protobuf path and message name are present, + * encapsulate the message into a protobuf format. If these settings are absent, + * the message remains unchanged. + * Flow: + * Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message + * @param {string | Buffer} message - The message to be processed. + * @param {string} [protobufPath] - The path to the protobuf definition. + * @param {string} [protobufMessageName] - The name of the protobuf message. + * @param {FormatType} [format] - The format to convert the message to. + * @returns {Buffer | string} - The processed message. + */ const processPublishMessage = ( message: string | Buffer, protobufPath?: string, protobufMessageName?: string, format?: FormatType, ): Buffer | string => { - /* - * Pipeline for processing outgoing messages in two potential stages: - * 1. Format Conversion --> Applied if a format is specified, transforming the message into that format; if absent, the message retains its initial state. - * 2. Protobuf Serialization --> Engaged if both protobuf path and message name are present, encapsulating the message into a protobuf format; without these settings, the message circulates unchanged. - */ - const pipeline = [ - (msg: string | Buffer) => (format ? convertPayload(Buffer.from(msg.toString()), format, 'encode') : msg), - (msg: string | Buffer) => - protobufPath && protobufMessageName - ? serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName) - : msg, - ] + const convertMessageFormat = (msg: string | Buffer): Buffer | string => { + if (!format) { + return msg + } + const bufferMsg = Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString()) + return convertPayload(bufferMsg, format, 'encode') + } + + const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => { + if (protobufPath && protobufMessageName) { + return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName) + } + return msg + } + + const pipeline = [convertMessageFormat, serializeProtobufMessage] return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer }