Skip to content

Commit

Permalink
refactor: few sources migrated to v2 completely
Browse files Browse the repository at this point in the history
  • Loading branch information
vinayteki95 committed Feb 17, 2025
1 parent 05f4790 commit 72c146c
Show file tree
Hide file tree
Showing 27 changed files with 147 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/sources/adjust/core.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const path = require('path');
const fs = require('fs');
const Message = require('../../v0/sources/message');
const Message = require('../message');
const { excludedFieldList } = require('./config');
const { extractCustomFields, generateUUID } = require('../../v0/util');
const { convertToISODate } = require('./utils');
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
const path = require('path');
const fs = require('fs');
const { TransformationError } = require('@rudderstack/integrations-lib');
const utils = require('../../util');
const utils = require('../../v0/util');
const Message = require('../message');

const mappingJson = JSON.parse(fs.readFileSync(path.resolve(__dirname, './mapping.json'), 'utf-8'));

const { removeUndefinedAndNullValues } = require('../../util');

const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE } = require('../../v0/util/constant');

const processNormalEvent = (event) => {
const message = new Message(`APPCENTER`);
Expand Down Expand Up @@ -56,11 +54,12 @@ const processTestEvent = (event) => ({
statusCode: 200,
});

const process = (event) => {
const process = (payload) => {
const event = utils.getBodyFromV2SpecPayload(payload);
const response = isTestEvent(event) ? processTestEvent(event) : processNormalEvent(event);
// to bypass the unit testcases ( we may change this)
// response.anonymousId = "7e32188a4dab669f";
return removeUndefinedAndNullValues(response);
return utils.removeUndefinedAndNullValues(response);
};

exports.process = process;
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ const path = require('path');
const fs = require('fs');
const { TransformationError } = require('@rudderstack/integrations-lib');
const Message = require('../message');
const { generateUUID } = require('../../util');
const { generateUUID, getBodyFromV2SpecPayload } = require('../../v0/util');

const mappingJson = JSON.parse(fs.readFileSync(path.resolve(__dirname, './mapping.json'), 'utf-8'));

const { removeUndefinedAndNullValues, isObject, isAppleFamily } = require('../../util');
const { removeUndefinedAndNullValues, isObject, isAppleFamily } = require('../../v0/util');

function processEvent(event) {
const messageType = 'track';
Expand Down Expand Up @@ -74,7 +74,8 @@ function processEvent(event) {
throw new TransformationError('Unknwon event type from Appsflyer');
}

function process(event) {
function process(payload) {
const event = getBodyFromV2SpecPayload(payload);
const response = processEvent(event);
const returnValue = removeUndefinedAndNullValues(response);
return returnValue;
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const path = require('path');
const fs = require('fs');
const { removeUndefinedAndNullValues } = require('../../util');
const { removeUndefinedAndNullValues, getBodyFromV2SpecPayload } = require('../../v0/util');
const { getGroupId } = require('./util');
// import mapping json using JSON.parse to preserve object key order
const mapping = JSON.parse(fs.readFileSync(path.resolve(__dirname, './mapping.json'), 'utf-8'));
const Message = require('../message');
const { generateUUID } = require('../../util');
const { generateUUID } = require('../../v0/util');

// Ref: https://auth0.com/docs/logs/references/log-event-type-codes
const eventNameMap = JSON.parse(
Expand Down Expand Up @@ -69,10 +69,11 @@ function processEvents(eventList) {
return responses;
}

function process(events) {
let eventList = events;
if (!Array.isArray(events)) {
eventList = events.logs || [events];
function process(payload) {
const event = getBodyFromV2SpecPayload(payload);
let eventList = event;
if (!Array.isArray(event)) {
eventList = event.logs || [event];
}
return processEvents(eventList);
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ const {
formatTimeStamp,
removeUndefinedAndNullValues,
getHashFromArray,
} = require('../../../v0/util');
const Message = require('../../../v0/sources/message');
getBodyFromV2SpecPayload,
} = require('../../v0/util');
const Message = require('../message');

// import mapping json using JSON.parse to preserve object key order
const mapping = JSON.parse(fs.readFileSync(path.resolve(__dirname, './mapping.json'), 'utf-8'));
Expand Down Expand Up @@ -66,8 +67,9 @@ const processEvent = (event, eventMapping) => {
throw new TransformationError('Unknown event type from Braze');
};

const process = (inputEvent) => {
const { event, source } = inputEvent;
const process = (payload) => {
const event = getBodyFromV2SpecPayload(payload);
const { source } = payload;
const { customMapping } = source.Config;
const eventMapping = getHashFromArray(customMapping, 'from', 'to', false);
const responses = [];
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const sha256 = require('sha256');
const { TransformationError } = require('@rudderstack/integrations-lib');
const Message = require('../message');
const { voterMapping, authorMapping, checkForRequiredFields } = require('./util');
const logger = require('../../../logger');
const logger = require('../../logger');
const { getBodyFromV2SpecPayload } = require('../../v0/util');

const CannyOperation = {
VOTE_CREATED: 'vote.created',
Expand Down Expand Up @@ -73,7 +74,8 @@ function createMessage(event, typeOfUser) {
return finalMessage;
}

function process(event) {
function process(payload) {
const event = getBodyFromV2SpecPayload(payload);
let typeOfUser;

switch (event.type) {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ const {
removeUndefinedAndNullRecurse,
generateUUID,
formatTimeStamp,
} = require('../../../v0/util');
getBodyFromV2SpecPayload,
} = require('../../v0/util');
const { excludedFieldList } = require('./config');
const Message = require('../../../v0/sources/message');
const Message = require('../message');

function processEvent(inputEvent) {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand Down Expand Up @@ -48,8 +49,8 @@ function processEvent(inputEvent) {
return message;
}

function process(inputEvent) {
const { event } = inputEvent;
function process(payload) {
const event = getBodyFromV2SpecPayload(payload);
const response = processEvent(event);
return removeUndefinedAndNullValues(response);
}
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Message = require('../../../v0/sources/message');
const { CommonUtils } = require('../../../util/common');
const { generateUUID, isDefinedAndNotNull } = require('../../../v0/util');
const Message = require('../message');
const { CommonUtils } = require('../../util/common');
const { generateUUID, isDefinedAndNotNull, getBodyFromV2SpecPayload } = require('../../v0/util');
const { eventsMapping } = require('./config');

const mapping = require('./mapping.json');
Expand Down Expand Up @@ -43,8 +43,8 @@ const processEvent = (inputPaylaod) => {
};

const process = (inputEvent) => {
const { event: events } = inputEvent;
const eventsArray = CommonUtils.toArray(events);
const event = getBodyFromV2SpecPayload(inputEvent);
const eventsArray = CommonUtils.toArray(event);
return eventsArray.map(processEvent);
};

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ const { get } = require('lodash');
const Message = require('../message');

const { mappingConfig } = require('./config');
const { isDefinedAndNotNull } = require('../../util');
const { isDefinedAndNotNull, getBodyFromV2SpecPayload } = require('../../v0/util');

function process(event) {
function process(payload) {
const event = getBodyFromV2SpecPayload(payload);
const message = new Message(`Customer.io`);

// since customer, email, sms, push, slack, webhook
Expand Down
86 changes: 86 additions & 0 deletions src/sources/message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const set = require('set-value');
const get = require('get-value');

const { getValueFromMessage } = require('../v0/util');

const context = (integration) => ({
library: {
name: 'unknown',
version: 'unknown',
},
integration: {
name: integration,
},
});

class Message {
constructor(integration) {
this.context = context(integration);
this.integrations = {
[integration]: false,
};
}

setEventName(name) {
this.event = name;
}

setEventType(type) {
this.type = type;
}

setProperty(name, value) {
set(this, name, value);
}

setProperties(event, mapping) {
Object.keys(mapping).forEach((key) => {
const setVal = get(event, key);
let destKeys = mapping[key];
if (!Array.isArray(destKeys)) {
destKeys = [destKeys];
}
destKeys.forEach((destKey) => {
const existingVal = get(this, destKey);
// do not set if val setVal nil
// give higher pref to first key in mapping.json in case of same value
if (
setVal !== null &&
setVal !== undefined &&
(existingVal === null || existingVal === undefined)
) {
set(this, destKey, setVal);
}
});
});
}

setPropertiesV2(event, mappingJson) {
mappingJson.forEach((mapping) => {
const { sourceKeys } = mapping;
let { destKeys } = mapping;
const setVal = getValueFromMessage(event, sourceKeys);
if (!Array.isArray(destKeys)) {
destKeys = [destKeys];
}
destKeys.forEach((destKey) => {
const existingVal = get(this, destKey);
// do not set if val setVal nil
// give higher pref to first key in mapping.json in case of same value
if (
setVal !== null &&
setVal !== undefined &&
(existingVal === null || existingVal === undefined)
) {
set(this, destKey, setVal);
}
});
});
}

setTimestamp(timestamp) {
this.timestamp = timestamp;

Check warning on line 82 in src/sources/message.js

View check run for this annotation

Codecov / codecov/patch

src/sources/message.js#L81-L82

Added lines #L81 - L82 were not covered by tests
}
}

module.exports = Message;
6 changes: 6 additions & 0 deletions src/v0/util/constant.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ const HTTP_STATUS_CODES = {
NETWORK_AUTHENTICATION_REQUIRED: 511,
};

const ERROR_MESSAGES = {
MALFORMED_JSON_IN_REQUEST_BODY: 'Malformed JSON in request body',
REQUEST_BODY_NOT_PRESENT_IN_V2_SPEC_PAYLOAD: 'request.body is not present in V2 spec payload',
};

module.exports = {
API_CALL,
AUTH_CACHE_TTL,
Expand All @@ -94,4 +99,5 @@ module.exports = {
FEATURE_FILTER_CODE,
FEATURE_GZIP_SUPPORT,
VDM_V2_SCHEMA_VERSION,
ERROR_MESSAGES,
};
16 changes: 15 additions & 1 deletion src/v0/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const logger = require('../../logger');
const stats = require('../../util/stats');
const { DestCanonicalNames, DestHandlerMap } = require('../../constants/destinationCanonicalNames');
const { client: errNotificationClient } = require('../../util/errorNotifier');
const { HTTP_STATUS_CODES, VDM_V2_SCHEMA_VERSION } = require('./constant');
const { HTTP_STATUS_CODES, VDM_V2_SCHEMA_VERSION, ERROR_MESSAGES } = require('./constant');
const {
REFRESH_TOKEN,
AUTH_STATUS_INACTIVE,
Expand Down Expand Up @@ -2364,6 +2364,19 @@ const convertToUuid = (input) => {
throw new InstrumentationError(errorMessage);
}
};

const getBodyFromV2SpecPayload = ({ request }) => {
if (request?.body) {
try {
const parsedBody = JSON.parse(request.body);
return parsedBody;
} catch (error) {
throw new TransformationError(ERROR_MESSAGES.MALFORMED_JSON_IN_REQUEST_BODY);

Check warning on line 2374 in src/v0/util/index.js

View check run for this annotation

Codecov / codecov/patch

src/v0/util/index.js#L2374

Added line #L2374 was not covered by tests
}
}
throw new TransformationError(ERROR_MESSAGES.REQUEST_BODY_NOT_PRESENT_IN_V2_SPEC_PAYLOAD);

Check warning on line 2377 in src/v0/util/index.js

View check run for this annotation

Codecov / codecov/patch

src/v0/util/index.js#L2377

Added line #L2377 was not covered by tests
};

// ========================================================================
// EXPORTS
// ========================================================================
Expand Down Expand Up @@ -2397,6 +2410,7 @@ module.exports = {
generateErrorObject,
generateUUID,
getBrowserInfo,
getBodyFromV2SpecPayload,
getDateInFormat,
getDestinationExternalID,
getDestinationExternalIDInfoForRetl,
Expand Down

0 comments on commit 72c146c

Please sign in to comment.