Skip to content

Commit

Permalink
fix(cli): correct data conversion for publishing messages with format
Browse files Browse the repository at this point in the history
  • Loading branch information
ysfscream authored and Red-Asuka committed Jun 14, 2024
1 parent 15dd0dd commit 6346d57
Showing 1 changed file with 41 additions and 22 deletions.
63 changes: 41 additions & 22 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,57 @@
import concat from 'concat-stream'
import * as Debug from 'debug'
import _ from 'lodash'
import * as mqtt from 'mqtt'
import { IClientOptions, IClientPublishOptions } from 'mqtt'
import pump from 'pump'
import concat from 'concat-stream'
import { Writable } from 'readable-stream'
import split2 from 'split2'
import { IClientOptions, IClientPublishOptions } from 'mqtt'
import logWrapper, { Signale, basicLog, benchLog, simulateLog, singaleConfig, signale } from '../utils/logWrapper'
import { parseConnectOptions, parsePublishOptions, checkTopicExists, checkScenarioExists } from '../utils/parse'
import convertPayload from '../utils/convertPayload'
import delay from '../utils/delay'
import { handleSaveOptions, handleLoadOptions } from '../utils/options'
import { loadSimulator } from '../utils/simulate'
import { fileDataSplitter, processPath, readFile } from '../utils/fileUtils'
import logWrapper, { basicLog, benchLog, Signale, signale, simulateLog, singaleConfig } from '../utils/logWrapper'
import { handleLoadOptions, handleSaveOptions } from '../utils/options'
import { checkScenarioExists, checkTopicExists, parseConnectOptions, parsePublishOptions } from '../utils/parse'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import { readFile, processPath, fileDataSplitter } from '../utils/fileUtils'
import convertPayload from '../utils/convertPayload'
import * as Debug from 'debug'
import _ from 'lodash'
import { loadSimulator } from '../utils/simulate'

/**
* Processes the outgoing message through two potential stages:
* 1. Format Conversion: If a format is specified, transform the message into that format.
* If no format is specified, the message retains its initial state.
* 2. Protobuf Serialization: If both protobuf path and message name are present,
* encapsulate the message into a protobuf format. If these settings are absent,
* the message remains unchanged.
* Flow:
* Input Message -> [Format Conversion] -> [Protobuf Serialization] -> Output Message
* @param {string | Buffer} message - The message to be processed.
* @param {string} [protobufPath] - The path to the protobuf definition.
* @param {string} [protobufMessageName] - The name of the protobuf message.
* @param {FormatType} [format] - The format to convert the message to.
* @returns {Buffer | string} - The processed message.
*/
const processPublishMessage = (
message: string | Buffer,
protobufPath?: string,
protobufMessageName?: string,
format?: FormatType,
): Buffer | string => {
/*
* Pipeline for processing outgoing messages in two potential stages:
* 1. Format Conversion --> Applied if a format is specified, transforming the message into that format; if absent, the message retains its initial state.
* 2. Protobuf Serialization --> Engaged if both protobuf path and message name are present, encapsulating the message into a protobuf format; without these settings, the message circulates unchanged.
*/
const pipeline = [
(msg: string | Buffer) => (format ? convertPayload(Buffer.from(msg.toString()), format, 'encode') : msg),
(msg: string | Buffer) =>
protobufPath && protobufMessageName
? serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
: msg,
]
const convertMessageFormat = (msg: string | Buffer): Buffer | string => {
if (!format) {
return msg
}
const bufferMsg = Buffer.isBuffer(msg) ? msg : Buffer.from(msg.toString())
return convertPayload(bufferMsg, format, 'encode')
}

const serializeProtobufMessage = (msg: string | Buffer): Buffer | string => {
if (protobufPath && protobufMessageName) {
return serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
}
return msg
}

const pipeline = [convertMessageFormat, serializeProtobufMessage]

return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer
}
Expand Down

0 comments on commit 6346d57

Please sign in to comment.