Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 69 additions & 15 deletions templates/lib/actions/action.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
*/

const spec = require("../spec.json");
const { mapFieldNames, getMetadata, mapFormDataBody, putAdditionalParamsInBody, executeCall, formatApiKey } = require("../utils/helpers");
const {
mapFieldNames,
getMetadata,
mapFormDataBody,
putAdditionalParamsInBody,
executeCall,
formatApiKey,
} = require("../utils/helpers");
const componentJson = require("../../component.json");

async function processAction(msg, cfg, snapshot, incomingMessageHeaders, tokenData) {
Expand All @@ -23,7 +30,6 @@ async function processAction(msg, cfg, snapshot, incomingMessageHeaders, tokenDa
if (cfg && cfg.nodeSettings && cfg.nodeSettings.continueOnError) continueOnError = true;

try {

if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) {
logger = this.logger.child({});
logger.level && logger.level(logLevel);
Expand Down Expand Up @@ -114,31 +120,79 @@ async function processAction(msg, cfg, snapshot, incomingMessageHeaders, tokenDa
delete callParams.requestBody;
}


const resp = await executeCall.call(this, callParams);

// Wait for rate limit if specified
const rateLimit = cfg.nodeSettings && cfg.nodeSettings.rateLimit ? parseInt(cfg.nodeSettings.rateLimit) : (Number.isInteger(componentJson.rateLimit) ? componentJson.rateLimit : 0);
const rateLimit =
cfg.nodeSettings && cfg.nodeSettings.rateLimit
? parseInt(cfg.nodeSettings.rateLimit)
: Number.isInteger(componentJson.rateLimit)
? componentJson.rateLimit
: 0;
if (rateLimit > 0) {
this.logger.info(`Waiting for rate limit: ${rateLimit} ms`);
await new Promise(resolve => setTimeout(resolve, rateLimit));
await new Promise((resolve) => setTimeout(resolve, rateLimit));
}

const newElement = {};
newElement.metadata = getMetadata(msg.metadata);
newElement.data = resp.body;
const responseBody = resp.body;
const { arraySplittingKey } = cfg.nodeSettings;

if (arraySplittingKey && !cfg.returnResult) {
if (Array.isArray(responseBody)) {
logger.info(`Response is an array with ${responseBody.length} items. Emitting each element separately.`);
responseBody.forEach((item, index) => {
this.emit("data", {
data: item,
metadata: getMetadata(msg.metadata),
});
logger.info(`Emitted array item at index ${index}`);
});
} else if (responseBody && typeof responseBody === "object") {
logger.info(`Response is an object. Resolving nested path "${arraySplittingKey}".`);
const splitArray = arraySplittingKey
.split(".")
.reduce((acc, key) => (acc && acc[key] !== undefined ? acc[key] : null), responseBody);

if (Array.isArray(splitArray)) {
logger.info(`Found array at "${arraySplittingKey}" with ${splitArray.length} items. Emitting each element.`);
splitArray.forEach((item, index) => {
this.emit("data", {
data: item,
metadata: getMetadata(msg.metadata),
});
logger.info(`Emitted nested array item at index ${index}`);
});
} else {
if (splitArray === null) {
logger.info(`Path "${arraySplittingKey}" not found in response object. Emitting full response instead.`);
} else {
logger.info(`Path "${arraySplittingKey}" resolved, but value is not an array (type: ${typeof splitArray}). Emitting full response instead.`);
}
this.emit("data", { data: responseBody, metadata: getMetadata(msg.metadata) });
}
} else {
logger.info(`Array splitting key "${arraySplittingKey}" was specified, but response is neither an array nor an object. Type: ${typeof responseBody}. Emitting full response.`);
this.emit("data", { data: responseBody, metadata: getMetadata(msg.metadata) });
}
} else {
const outputMessage = {
metadata: getMetadata(msg.metadata),
data: responseBody,
};

if (cfg.returnResult) {
logger.info(`returnResult flag is true. Returning output message instead of emitting.`);
return outputMessage;
}

if (cfg.returnResult) {
return newElement;
this.emit("data", outputMessage);
this.logger.info("Execution finished: emitted single message.");
}

this.emit("data", newElement);
this.logger.info("Execution finished");
} catch (e) {
if (continueOnError === true) {
this.emit('data', { data: {}, metadata: {} });
this.emit("data", { data: {}, metadata: {} });
} else {
this.emit('error', e);
this.emit("error", e);
}
logger.error(e);
}
Expand Down