Skip to content

Commit

Permalink
fix: fix some bugs regarding request/reply (#637)
Browse files Browse the repository at this point in the history
  • Loading branch information
KhudaDad414 authored Dec 11, 2023
1 parent 0356229 commit bc5ceb4
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 40 deletions.
21 changes: 11 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { getParsedAsyncAPI } from './lib/asyncapiFile.js'
import { getSelectedServerNames } from './lib/servers.js'
import { EnrichedEvent, AuthEvent } from './lib/adapter.js'
import { ClusterEvent } from './lib/cluster.js'
import { getMessagesSchema } from './lib/util.js'

dotenvExpand(dotenv.config())

Expand Down Expand Up @@ -99,11 +100,14 @@ export default async function GleeAppInitializer() {
await generateDocs(parsedAsyncAPI, config, null)
parsedAsyncAPI.operations().filterByReceive().forEach(operation => {
const channel = operation.channels()[0] // operation can have only one channel.
const messagesSchemas = operation.messages().filterByReceive().map(m => m.payload().json()).filter(schema => !!schema)
const schema = {
oneOf: messagesSchemas
} as any
if (messagesSchemas.length > 0) app.use(channel.id(), validate(schema))
const replyChannel = operation.reply()?.channel()
if (replyChannel) {
const replyMessagesSchemas = getMessagesSchema(operation.reply())
if (replyMessagesSchemas.oneOf.length > 0) app.useOutbound(replyChannel.id(), validate(replyMessagesSchemas))
app.useOutbound(replyChannel.id(), json2string)
}
const schema = getMessagesSchema(operation)
if (schema.oneOf.length > 0) app.use(channel.id(), validate(schema))
app.use(channel.id(), (event, next) => {
triggerFunction({
app,
Expand All @@ -115,11 +119,8 @@ export default async function GleeAppInitializer() {

parsedAsyncAPI.operations().filterBySend().forEach(operation => {
const channel = operation.channels()[0] // operation can have only one channel.
const messagesSchemas = operation.messages().filterBySend().map(m => m.payload().json()).filter(schema => !!schema)
const schema = {
oneOf: messagesSchemas
} as any
if (messagesSchemas.length > 0) app.useOutbound(channel.id(), validate(schema))
const schema = getMessagesSchema(operation)
if (schema.oneOf.length > 0) app.useOutbound(channel.id(), validate(schema))
app.useOutbound(channel.id(), json2string)
})

Expand Down
32 changes: 11 additions & 21 deletions src/lib/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,17 @@ export async function trigger({
})

functionResult?.reply?.forEach((reply) => {
const replyMessages = createReplies(reply, message, parsedAsyncAPI)
const hasReplyMessages = replyMessages && replyMessages.length > 0
if (!hasReplyMessages) {
const replyMessage = createReplies(reply, message, parsedAsyncAPI)
if (!replyMessage) {
return
}
replyMessages.forEach(replyMessage => {
const replyChannel = parsedAsyncAPI.channels().get(replyMessage.channel)
replyChannel.servers().forEach((server) => {
replyMessage.serverName = server.id()
app.send(
replyMessage
)
})

const replyChannel = parsedAsyncAPI.channels().get(replyMessage.channel)
replyChannel.servers().forEach((server) => {
replyMessage.serverName = server.id()
app.send(
replyMessage
)
})

})
Expand All @@ -201,7 +199,7 @@ export async function trigger({
}
}

function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMessage, parsedAsyncAPI: AsyncAPIDocumentInterface): GleeMessage[] {
function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMessage, parsedAsyncAPI: AsyncAPIDocumentInterface): GleeMessage {
const operation = message.operation
const reply = operation.reply()
if (!reply) {
Expand All @@ -224,13 +222,5 @@ function createReplies(functionReply: GleeFunctionReturnReply, message: GleeMess
replyChannel = channel
}

const sendOperations = replyChannel.operations().filterBySend()

if (!sendOperations || sendOperations.length === 0) {
const warningMsg = `No 'send' operations defined for channel '${replyChannel.id()}'. Ensure your AsyncAPI file defines a 'send' operation for this channel to enable message replies. As a result, no reply will be sent for the channel.`
logWarningMessage(warningMsg)
return []
}

return sendOperations.map(operation => new GleeMessage({ ...functionReply, channel: replyChannel.id(), request: message, operation, connection: message.connection }))
return new GleeMessage({ ...functionReply, channel: replyChannel.id(), request: message, operation, connection: message.connection })
}
9 changes: 8 additions & 1 deletion src/lib/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AsyncAPIDocumentInterface as AsyncAPIDocument, ChannelInterface, ChannelParameterInterface } from '@asyncapi/parser'
import { AsyncAPIDocumentInterface as AsyncAPIDocument, ChannelInterface, ChannelParameterInterface, MessagesInterface } from '@asyncapi/parser'
import Ajv from 'ajv'
import betterAjvErrors from 'better-ajv-errors'
import { pathToRegexp } from 'path-to-regexp'
Expand Down Expand Up @@ -237,3 +237,10 @@ function getParamFromLocation(location: string, message: GleeMessage) {
return extractExpressionValueFromMessage(message, location)
}
}

export function getMessagesSchema(operation: { messages: () => MessagesInterface }) {
const messagesSchemas = operation.messages().all().map(m => m.payload().json()).filter(schema => !!schema)
return {
oneOf: messagesSchemas
}
}
18 changes: 13 additions & 5 deletions src/middlewares/existsInAsyncAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@ export default (asyncapi: AsyncAPIDocument) =>
if (!messageChannel) {
return next(new Error(`Invalid or undefined channel: '${event.channel}'. Ensure that '${event.channel}' is both a valid name and defined in the AsyncAPI file.`))
}
const receiveOperations = messageChannel.operations().filterByReceive()
const sendOperations = messageChannel.operations().filterBySend()
if (sendOperations.length === 0 && event.isOutbound()) {
return next(new Error(`Failed to send message: No 'send' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI file includes a 'send' operation for this channel.`))
if (event.isInbound()) {
const hasReceiveOperations = receiveOperations.length > 0
if (!hasReceiveOperations) {
return next(new Error(`Failed to receive message: No 'receive' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI specification file a 'receive' operation for this channel.`))
}
}

const receiveOperations = messageChannel.operations().filterByReceive()
if (receiveOperations.length === 0 && event.isInbound()) {
return next(new Error(`Failed to receive message: No 'receive' operation defined for channel "${messageChannel.id()}". Please verify that your AsyncAPI specification file a 'receive' operation for this channel.`))
if (event.isOutbound()) {
const hasSendOperations = sendOperations.length > 0
const hasReplyInOperation = receiveOperations.some(operation => operation.reply)
if (!hasSendOperations && !hasReplyInOperation) {
return next(new Error(`Failed to send message: No 'send' operation defined for channel "${messageChannel.id()}" or your 'receive' operation doesn't have a 'reply' field. Please verify that your AsyncAPI file includes a 'send' operation for this channel.`))
}
}


return next()
}
6 changes: 3 additions & 3 deletions src/middlewares/validate.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { SchemaV2 as Schema } from '@asyncapi/parser'
import { AsyncAPISchema } from '@asyncapi/parser'
import GleeError from '../errors/glee-error.js'
import GleeMessage from '../lib/message.js'
import { validateData } from '../lib/util.js'
import { MiddlewareCallback } from './index.js'

export default (schema: Schema) =>
export default (schema: AsyncAPISchema) =>
(event: GleeMessage, next: MiddlewareCallback) => {
const { humanReadableError, errors, isValid } = validateData(
event.payload,
schema
schema as any
)
if (!isValid) {
return next(
Expand Down

0 comments on commit bc5ceb4

Please sign in to comment.