From bc5ceb497569371d394b38c3026df9c4659e2bfa Mon Sep 17 00:00:00 2001 From: Khuda Dad Nomani <32505158+KhudaDad414@users.noreply.github.com> Date: Mon, 11 Dec 2023 16:03:15 +0000 Subject: [PATCH] fix: fix some bugs regarding request/reply (#637) --- src/index.ts | 21 ++++++++++--------- src/lib/functions.ts | 32 ++++++++++------------------- src/lib/util.ts | 9 +++++++- src/middlewares/existsInAsyncAPI.ts | 18 +++++++++++----- src/middlewares/validate.ts | 6 +++--- 5 files changed, 46 insertions(+), 40 deletions(-) diff --git a/src/index.ts b/src/index.ts index 349e8c550..21e071443 100755 --- a/src/index.ts +++ b/src/index.ts @@ -32,6 +32,7 @@ import { getParsedAsyncAPI } from './lib/asyncapiFile.js' import { getSelectedServerNames } from './lib/servers.js' import { EnrichedEvent, AuthEvent } from './lib/adapter.js' import { ClusterEvent } from './lib/cluster.js' +import { getMessagesSchema } from './lib/util.js' dotenvExpand(dotenv.config()) @@ -99,11 +100,14 @@ export default async function GleeAppInitializer() { await generateDocs(parsedAsyncAPI, config, null) parsedAsyncAPI.operations().filterByReceive().forEach(operation => { const channel = operation.channels()[0] // operation can have only one channel. - const messagesSchemas = operation.messages().filterByReceive().map(m => m.payload().json()).filter(schema => !!schema) - const schema = { - oneOf: messagesSchemas - } as any - if (messagesSchemas.length > 0) app.use(channel.id(), validate(schema)) + const replyChannel = operation.reply()?.channel() + if (replyChannel) { + const replyMessagesSchemas = getMessagesSchema(operation.reply()) + if (replyMessagesSchemas.oneOf.length > 0) app.useOutbound(replyChannel.id(), validate(replyMessagesSchemas)) + app.useOutbound(replyChannel.id(), json2string) + } + const schema = getMessagesSchema(operation) + if (schema.oneOf.length > 0) app.use(channel.id(), validate(schema)) app.use(channel.id(), (event, next) => { triggerFunction({ app, @@ -115,11 +119,8 @@ export default async function GleeAppInitializer() { parsedAsyncAPI.operations().filterBySend().forEach(operation => { const channel = operation.channels()[0] // operation can have only one channel. - const messagesSchemas = operation.messages().filterBySend().map(m => m.payload().json()).filter(schema => !!schema) - const schema = { - oneOf: messagesSchemas - } as any - if (messagesSchemas.length > 0) app.useOutbound(channel.id(), validate(schema)) + const schema = getMessagesSchema(operation) + if (schema.oneOf.length > 0) app.useOutbound(channel.id(), validate(schema)) app.useOutbound(channel.id(), json2string) }) diff --git a/src/lib/functions.ts b/src/lib/functions.ts index 44e3caad0..1c89776bc 100644 --- a/src/lib/functions.ts +++ b/src/lib/functions.ts @@ -170,19 +170,17 @@ export async function trigger({ }) functionResult?.reply?.forEach((reply) => { - const replyMessages = createReplies(reply, message, parsedAsyncAPI) - const hasReplyMessages = replyMessages && replyMessages.length > 0 - if (!hasReplyMessages) { + const replyMessage = createReplies(reply, message, parsedAsyncAPI) + if (!replyMessage) { return } - replyMessages.forEach(replyMessage => { - const replyChannel = parsedAsyncAPI.channels().get(replyMessage.channel) - replyChannel.servers().forEach((server) => { - replyMessage.serverName = server.id() - app.send( - replyMessage - ) - }) + + const replyChannel = parsedAsyncAPI.channels().get(replyMessage.channel) + replyChannel.servers().forEach((server) => { + replyMessage.serverName = server.id() + app.send( + replyMessage + ) }) }) @@ -201,7 +199,7 @@ export async function trigger({ } } -function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMessage, parsedAsyncAPI: AsyncAPIDocumentInterface): GleeMessage[] { +function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMessage, parsedAsyncAPI: AsyncAPIDocumentInterface): GleeMessage { const operation = message.operation const reply = operation.reply() if (!reply) { @@ -224,13 +222,5 @@ function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMess replyChannel = channel } - const sendOperations = replyChannel.operations().filterBySend() - - if (!sendOperations || sendOperations.length === 0) { - const warningMsg = `No 'send' operations defined for channel '${replyChannel.id()}'. Ensure your AsyncAPI file defines a 'send' operation for this channel to enable message replies. As a result, no reply will be sent for the channel.` - logWarningMessage(warningMsg) - return [] - } - - return sendOperations.map(operation => new GleeMessage({ ...functionReply, channel: replyChannel.id(), request: message, operation, connection: message.connection })) + return new GleeMessage({ ...functionReply, channel: replyChannel.id(), request: message, operation, connection: message.connection }) } \ No newline at end of file diff --git a/src/lib/util.ts b/src/lib/util.ts index a1fdd0eb2..74af6beaf 100644 --- a/src/lib/util.ts +++ b/src/lib/util.ts @@ -1,4 +1,4 @@ -import { AsyncAPIDocumentInterface as AsyncAPIDocument, ChannelInterface, ChannelParameterInterface } from '@asyncapi/parser' +import { AsyncAPIDocumentInterface as AsyncAPIDocument, ChannelInterface, ChannelParameterInterface, MessagesInterface } from '@asyncapi/parser' import Ajv from 'ajv' import betterAjvErrors from 'better-ajv-errors' import { pathToRegexp } from 'path-to-regexp' @@ -237,3 +237,10 @@ function getParamFromLocation(location: string, message: GleeMessage) { return extractExpressionValueFromMessage(message, location) } } + +export function getMessagesSchema(operation: { messages: () => MessagesInterface }) { + const messagesSchemas = operation.messages().all().map(m => m.payload().json()).filter(schema => !!schema) + return { + oneOf: messagesSchemas + } +} \ No newline at end of file diff --git a/src/middlewares/existsInAsyncAPI.ts b/src/middlewares/existsInAsyncAPI.ts index 4c49f519c..5f5c64675 100644 --- a/src/middlewares/existsInAsyncAPI.ts +++ b/src/middlewares/existsInAsyncAPI.ts @@ -8,15 +8,23 @@ export default (asyncapi: AsyncAPIDocument) => if (!messageChannel) { return next(new Error(`Invalid or undefined channel: '${event.channel}'. Ensure that '${event.channel}' is both a valid name and defined in the AsyncAPI file.`)) } + const receiveOperations = messageChannel.operations().filterByReceive() const sendOperations = messageChannel.operations().filterBySend() - if (sendOperations.length === 0 && event.isOutbound()) { - return next(new Error(`Failed to send message: No 'send' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI file includes a 'send' operation for this channel.`)) + if (event.isInbound()) { + const hasReceiveOperations = receiveOperations.length > 0 + if (!hasReceiveOperations) { + return next(new Error(`Failed to receive message: No 'receive' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI specification file a 'receive' operation for this channel.`)) + } } - const receiveOperations = messageChannel.operations().filterByReceive() - if (receiveOperations.length === 0 && event.isInbound()) { - return next(new Error(`Failed to receive message: No 'receive' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI specification file a 'receive' operation for this channel.`)) + if (event.isOutbound()) { + const hasSendOperations = sendOperations.length > 0 + const hasReplyInOperation = receiveOperations.some(operation => operation.reply) + if (!hasSendOperations && !hasReplyInOperation) { + return next(new Error(`Failed to send message: No 'send' operation defined for channel "${messageChannel.id()}" or your 'receive' operation doesn't have a 'reply' field. Please verify that your AsyncAPI file includes a 'send' operation for this channel.`)) + } } + return next() } diff --git a/src/middlewares/validate.ts b/src/middlewares/validate.ts index de18f88f7..47d1c37b3 100644 --- a/src/middlewares/validate.ts +++ b/src/middlewares/validate.ts @@ -1,14 +1,14 @@ -import { SchemaV2 as Schema } from '@asyncapi/parser' +import { AsyncAPISchema } from '@asyncapi/parser' import GleeError from '../errors/glee-error.js' import GleeMessage from '../lib/message.js' import { validateData } from '../lib/util.js' import { MiddlewareCallback } from './index.js' -export default (schema: Schema) => +export default (schema: AsyncAPISchema) => (event: GleeMessage, next: MiddlewareCallback) => { const { humanReadableError, errors, isValid } = validateData( event.payload, - schema + schema as any ) if (!isValid) { return next(