Skip to content

Commit

Permalink
rotor: fill source name in function's EventContext
Browse files Browse the repository at this point in the history
rotor: don't count events produced by user recognition as multiplied events
  • Loading branch information
absorbb committed Jan 28, 2025
1 parent 0395779 commit e3cea99
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 15 deletions.
3 changes: 3 additions & 0 deletions libs/core-functions/__tests__/user-recognition.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const expectedEvents: AnalyticsServerEvent[] = [
},
{
messageId: "1",
_JITSU_UR_MESSAGE_ID: "4",
type: "page",
anonymousId: "anon1",
userId: "user1",
Expand All @@ -67,6 +68,7 @@ const expectedEvents: AnalyticsServerEvent[] = [
},
{
messageId: "2",
_JITSU_UR_MESSAGE_ID: "4",
type: "page",
anonymousId: "anon1",
userId: "user1",
Expand All @@ -79,6 +81,7 @@ const expectedEvents: AnalyticsServerEvent[] = [
},
{
messageId: "3",
_JITSU_UR_MESSAGE_ID: "4",
type: "page",
anonymousId: "anon1",
userId: "user1",
Expand Down
32 changes: 21 additions & 11 deletions libs/core-functions/src/functions/bulker-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import { AnalyticsServerEvent, DataLayoutType } from "@jitsu/protocols/analytics

import omit from "lodash/omit";
import { MetricsMeta } from "./lib";
import { UserRecognitionParameter } from "./user-recognition";

const JitsuInternalProperties = [TableNameParameter, UserRecognitionParameter];

export type MappedEvent = {
event: any;
Expand All @@ -25,7 +28,7 @@ export type DataLayoutImpl<T> = (

export function jitsuLegacy(event: AnalyticsServerEvent, ctx: FullContext<BulkerDestinationConfig>): MappedEvent {
const flat = toJitsuClassic(event, ctx);
return { event: omit(flat, TableNameParameter), table: event[TableNameParameter] ?? "events" };
return { event: omit(flat, JitsuInternalProperties), table: event[TableNameParameter] ?? "events" };
}

export function segmentLayout(
Expand Down Expand Up @@ -56,7 +59,7 @@ export function segmentLayout(
event.context?.groupId || event.traits?.groupId || event.context?.traits?.groupId
);
transferFunc(transformed, event.properties);
transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]);
transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]);
} else {
transformed = {
context: {},
Expand All @@ -65,7 +68,7 @@ export function segmentLayout(
transferFunc(transformed, event.properties);
transferFunc(transformed, event.context?.traits);
transferFunc(transformed, event.traits);
transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]);
transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]);
}
break;
case "group":
Expand All @@ -79,15 +82,22 @@ export function segmentLayout(
transferFunc(transformed.context.group, event.traits);
transferValueFunc(transformed.context, "group_id", event.groupId);
transferFunc(transformed, event.properties);
transferFunc(transformed, event, ["context", "properties", "traits", "type", "groupId", TableNameParameter]);
transferFunc(transformed, event, [
"context",
"properties",
"traits",
"type",
"groupId",
...JitsuInternalProperties,
]);
} else {
transformed = {
context: {},
};
transferFunc(transformed.context, event.context, ["traits"]);
transferFunc(transformed, event.properties);
transferFunc(transformed, event.traits);
transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]);
transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]);
}
break;
case "track":
Expand All @@ -102,13 +112,13 @@ export function segmentLayout(
transferFunc(transformed.context.traits, event.properties?.traits, ["groupId"]);
transferValueFunc(transformed.context, "group_id", event.context?.groupId || event.context?.traits?.groupId);
transferFunc(transformed, event.properties, ["traits"]);
transferFunc(transformed, event, ["context", "properties", "type", TableNameParameter]);
transferFunc(transformed, event, ["context", "properties", "type", ...JitsuInternalProperties]);
} else {
baseTrackFlat = {};
transferFunc(baseTrackFlat, event, ["properties", "type", TableNameParameter]);
transferFunc(baseTrackFlat, event, ["properties", "type", ...JitsuInternalProperties]);
transformed = {};
transferFunc(transformed, event.properties);
transferFunc(transformed, event, ["properties", "type", TableNameParameter]);
transferFunc(transformed, event, ["properties", "type", ...JitsuInternalProperties]);
}
break;
default:
Expand All @@ -122,11 +132,11 @@ export function segmentLayout(
transferFunc(transformed.context.traits, event.context?.traits, ["groupId"]);
transferValueFunc(transformed.context, "group_id", event.context?.groupId || event.context?.traits?.groupId);
transferFunc(transformed, event.properties);
transferFunc(transformed, event, ["context", "properties", TableNameParameter]);
transferFunc(transformed, event, ["context", "properties", ...JitsuInternalProperties]);
} else {
transformed = {};
transferFunc(transformed, event.properties);
transferFunc(transformed, event, ["properties", TableNameParameter]);
transferFunc(transformed, event, ["properties", ...JitsuInternalProperties]);
}
}
if (event[TableNameParameter]) {
Expand Down Expand Up @@ -167,7 +177,7 @@ export const dataLayouts: Record<DataLayoutType, DataLayoutImpl<any>> = {
segment: (event, ctx) => segmentLayout(event, false, ctx),
"segment-single-table": (event, ctx) => segmentLayout(event, true, ctx),
"jitsu-legacy": jitsuLegacy,
passthrough: event => ({ event: omit(event, TableNameParameter), table: event[TableNameParameter] ?? "events" }),
passthrough: event => ({ event: omit(event, JitsuInternalProperties), table: event[TableNameParameter] ?? "events" }),
};

export type BulkerDestinationConfig = {
Expand Down
1 change: 1 addition & 0 deletions libs/core-functions/src/functions/lib/udf_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ export async function UDFTestRun({
headers: {},
source: {
id: "functionsDebugger-streamId",
name: "Functions Debugger Stream",
type: "browser",
},
destination: {
Expand Down
6 changes: 5 additions & 1 deletion libs/core-functions/src/functions/user-recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import get from "lodash/get";
import set from "lodash/set";
import merge from "lodash/merge";

export const UserRecognitionParameter = "_JITSU_UR_MESSAGE_ID";

export const UserRecognitionConfig = z.object({
/**
* Where to look for anonymous id, an array of JSON paths
Expand Down Expand Up @@ -61,7 +63,9 @@ const UserRecognitionFunction: JitsuFunction<AnalyticsServerEvent, UserRecogniti
const res = await anonEvStore.evictEvents(collectionName, anonId).then(evs => {
return evs.map(anonEvent => {
//merge anonymous event with identified fields
return merge(anonEvent, identifiedFields);
const merged = merge(anonEvent, identifiedFields);
merged[UserRecognitionParameter] = event.messageId;
return merged;
});
});
if (res.length === 0) {
Expand Down
1 change: 1 addition & 0 deletions libs/core-functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export function isDropResult(result: FuncReturn): boolean {
}

export * as bulkerDestination from "./functions/bulker-destination";
export { UserRecognitionParameter } from "./functions/user-recognition";
export { UDFWrapper, UDFTestRun } from "./functions/lib/udf_wrapper";
export type { UDFTestRequest, UDFTestResponse, logType } from "./functions/lib/udf_wrapper";
export { ProfileUDFWrapper, ProfileUDFTestRun, mergeUserTraits } from "./functions/lib/profiles-udf-wrapper";
Expand Down
1 change: 1 addition & 0 deletions libs/core-functions/src/lib/config-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export type EnrichedConnectionConfig = {
updatedAt?: Date;
destinationId: string;
streamId: string;
streamName?: string;
metricsKeyPrefix: string;
usesBulker: boolean;
//destinationType
Expand Down
4 changes: 3 additions & 1 deletion services/rotor/src/lib/functions-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
MetricsMeta,
mongodb,
UDFWrapper,
UserRecognitionParameter,
wrapperFunction,
} from "@jitsu/core-functions";
import Prometheus from "prom-client";
Expand Down Expand Up @@ -290,7 +291,8 @@ export async function runChain(
const sw = stopwatch();
const rat = new Date(event.receivedAt);
const execLogMeta = {
eventIndex: i,
// we don't multiply active incoming metrics for events produced by user recognition
eventIndex: event[UserRecognitionParameter] ? 0 : i,
receivedAt: !isNaN(rat.getTime()) ? rat : new Date(),
functionId: f.id,
metricsMeta: metricsMeta,
Expand Down
1 change: 1 addition & 0 deletions services/rotor/src/lib/message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export async function rotorMessageHandler(
source: {
type: message.ingestType,
id: connection.streamId,
name: connection.streamName,
domain: message.origin?.domain,
},
destination: {
Expand Down
4 changes: 2 additions & 2 deletions services/rotor/src/lib/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ export function createMetrics(
JSON.stringify({
timestamp: e.timestamp,
workspaceId: e.workspaceId,
messageId: e.messageId,
messageId: e.key,
})
);
streamOld.push("\n");
}
stream.push(JSON.stringify(omit(e, "retries", "messageId")));
stream.push(JSON.stringify(omit(e, "retries", "messageId", "key")));
stream.push("\n");
});
//close stream
Expand Down
1 change: 1 addition & 0 deletions webapps/console/pages/api/admin/export/[name]/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ const exports: Export[] = [
type: destinationType,
workspaceId: workspace.id,
streamId: from.id,
streamName: from.config?.name,
destinationId: to.id,
usesBulker: !!coreDestinationType?.usesBulker,
options: {
Expand Down

0 comments on commit e3cea99

Please sign in to comment.