Skip to content

Commit 5b8a429

Browse files
committed
rotor: get rid of pg-config-store in favor of http based entity-store
1 parent 7ab8a7e commit 5b8a429

File tree

7 files changed

+176
-227
lines changed

7 files changed

+176
-227
lines changed

services/rotor/src/index.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { redis } from "./lib/redis";
2020
import { redisLogger } from "./lib/redis-logger";
2121
import { DummyMetrics, Metrics } from "./lib/metrics";
2222
import { isTruish } from "@jitsu-internal/console/lib/shared/chores";
23-
import { pgConfigStore } from "./lib/pg-config-store";
23+
import { connectionsStore, functionsStore } from "./lib/entity-store";
2424

2525
export const log = getLog("rotor");
2626

@@ -67,9 +67,14 @@ async function main() {
6767
await mongodb.waitInit();
6868
await redis.waitInit();
6969
await redisLogger.waitInit();
70-
const store = await pgConfigStore.get();
71-
if (!store.enabled) {
72-
log.atError().log("Postgres is not configured. Rotor will not work");
70+
const connStore = await connectionsStore.get();
71+
if (!connStore.enabled) {
72+
log.atError().log("Connection store is not configured. Rotor will not work");
73+
process.exit(1);
74+
}
75+
const funcStore = await functionsStore.get();
76+
if (!funcStore.enabled) {
77+
log.atError().log("Functions store is not configured. Rotor will not work");
7378
process.exit(1);
7479
}
7580
//kafka consumer mode
@@ -104,9 +109,14 @@ async function main() {
104109
await mongodb.waitInit();
105110
await redis.waitInit();
106111
await redisLogger.waitInit();
107-
const store = await pgConfigStore.get();
108-
if (!store.enabled) {
109-
log.atError().log("Postgres is not configured. Rotor will not work");
112+
const connStore = await connectionsStore.get();
113+
if (!connStore.enabled) {
114+
log.atError().log("Connection store is not configured. Rotor will not work");
115+
process.exit(1);
116+
}
117+
const funcStore = await functionsStore.get();
118+
if (!funcStore.enabled) {
119+
log.atError().log("Functions store is not configured. Rotor will not work");
110120
process.exit(1);
111121
}
112122
const geoResolver = await initMaxMindClient(process.env.MAXMIND_LICENSE_KEY || "");
@@ -118,11 +128,17 @@ function initHTTP(metrics: Metrics, geoResolver: GeoResolver) {
118128
http.get("/health", (req, res) => {
119129
res.json({
120130
status: "pass",
121-
configStore: {
122-
enabled: pgConfigStore.getCurrent()?.enabled || "loading",
123-
status: pgConfigStore.status(),
124-
lastUpdated: pgConfigStore.lastRefresh(),
125-
lastModified: pgConfigStore.lastModified(),
131+
connectionsStore: {
132+
enabled: connectionsStore.getCurrent()?.enabled || "loading",
133+
status: connectionsStore.status(),
134+
lastUpdated: connectionsStore.lastRefresh(),
135+
lastModified: connectionsStore.lastModified(),
136+
},
137+
functionsStore: {
138+
enabled: functionsStore.getCurrent()?.enabled || "loading",
139+
status: functionsStore.status(),
140+
lastUpdated: functionsStore.lastRefresh(),
141+
lastModified: functionsStore.lastModified(),
126142
},
127143
});
128144
});
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { createInMemoryStore } from "./inmem-store";
2+
import { getLog } from "juava";
3+
import { httpAgent, httpsAgent } from "@jitsu/core-functions";
4+
import fetch from "node-fetch-commonjs";
5+
6+
const log = getLog("entity-store");
7+
8+
export type EntityStore = {
9+
getObject: (id: string) => any;
10+
toJSON: () => string;
11+
enabled: boolean;
12+
};
13+
14+
const DisabledStore: EntityStore = {
15+
enabled: false,
16+
getObject: () => undefined,
17+
toJSON: () => "disabled",
18+
};
19+
20+
const EmptyStore: EntityStore = {
21+
enabled: true,
22+
getObject: () => undefined,
23+
toJSON: () => "",
24+
};
25+
26+
const refreshFunc =
27+
(storeId: string) =>
28+
async (ifModifiedSince?: Date): Promise<{ lastModified: Date | undefined; store: EntityStore } | "not_modified"> => {
29+
const repositoryBase = process.env.REPOSITORY_BASE_URL;
30+
if (repositoryBase) {
31+
const objs: Record<string, any> = {};
32+
const headers: Record<string, string> = {};
33+
let lastModified: Date | undefined = undefined;
34+
if (process.env.REPOSITORY_AUTH_TOKEN) {
35+
headers["Authorization"] = `Bearer ${process.env.REPOSITORY_AUTH_TOKEN}`;
36+
}
37+
if (ifModifiedSince) {
38+
headers["If-Modified-Since"] = ifModifiedSince.toUTCString();
39+
}
40+
try {
41+
const res = await fetch(`${repositoryBase}/${storeId}?timeoutMs=10000&listen=1`, {
42+
method: "GET",
43+
headers: headers,
44+
agent: await (repositoryBase.startsWith("https://") ? httpsAgent : httpAgent).waitInit(),
45+
});
46+
if (res.status === 304) {
47+
log.atInfo().log(`${storeId} nod modified: ${ifModifiedSince}`);
48+
return "not_modified";
49+
}
50+
if (!res.ok) {
51+
throw new Error(`Failed to load ${storeId} from repository: ${res.status} response: ${await res.text()}`);
52+
}
53+
const json: any = await res.json();
54+
for (const fn of json) {
55+
objs[fn.id] = fn;
56+
}
57+
const lmString = res.headers.get("Last-Modified");
58+
if (lmString) {
59+
lastModified = new Date(lmString);
60+
}
61+
log.atInfo().log(`${storeId} updated: ${lastModified} previous update date: ${ifModifiedSince}`);
62+
return {
63+
store: {
64+
enabled: true,
65+
getObject: (key: string) => {
66+
return objs[key];
67+
},
68+
toJSON: () => {
69+
return JSON.stringify(objs);
70+
},
71+
},
72+
lastModified: lastModified,
73+
};
74+
} catch (e) {
75+
throw new Error(`Failed to load ${storeId} from repository: ${e}`);
76+
}
77+
} else {
78+
return { store: DisabledStore, lastModified: new Date() };
79+
}
80+
};
81+
82+
const storeFunc = (storeId: string) =>
83+
createInMemoryStore({
84+
refreshIntervalMillis: process.env.REPOSITORY_REFRESH_PERIOD_SEC
85+
? parseInt(process.env.REPOSITORY_REFRESH_PERIOD_SEC) * 1000
86+
: 2000,
87+
name: `${storeId}-store`,
88+
localDir: process.env.REPOSITORY_CACHE_DIR,
89+
serializer: (store: EntityStore) => (store.enabled ? store.toJSON() : ""),
90+
deserializer: (serialized: string) => {
91+
if (serialized) {
92+
if (serialized === "disabled") {
93+
return DisabledStore;
94+
}
95+
const store = JSON.parse(serialized);
96+
return {
97+
enabled: true,
98+
getObject: (key: string): any => {
99+
return store?.[key];
100+
},
101+
toJSON: () => {
102+
return store ? JSON.stringify(store) : "";
103+
},
104+
};
105+
} else {
106+
return EmptyStore;
107+
}
108+
},
109+
refresh: refreshFunc(storeId),
110+
});
111+
112+
export const functionsStore = storeFunc("functions");
113+
export const connectionsStore = storeFunc("rotor-connections");

services/rotor/src/lib/functions-chain.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ import { RetryErrorName, DropRetryErrorName } from "@jitsu/functions-lib";
1919

2020
import { getLog, newError, requireDefined, stopwatch } from "juava";
2121
import { retryObject } from "./retries";
22-
import { ConfigStore } from "./pg-config-store";
2322
import NodeCache from "node-cache";
2423
import pick from "lodash/pick";
2524
import { EnrichedConnectionConfig } from "./config-types";
25+
import { EntityStore } from "./entity-store";
2626

2727
export type Func = {
2828
id: string;
@@ -87,7 +87,7 @@ const getCachedOrLoad = (cache: NodeCache, key: string, loader: (key: string) =>
8787

8888
export function buildFunctionChain(
8989
connection: EnrichedConnectionConfig,
90-
pgStore: ConfigStore,
90+
func: EntityStore,
9191
functionsFilter?: (id: string) => boolean
9292
) {
9393
let mainFunction;
@@ -123,7 +123,7 @@ export function buildFunctionChain(
123123
.filter(f => f.functionId.startsWith("udf."))
124124
.map(f => {
125125
const functionId = f.functionId.substring(4);
126-
const userFunctionObj = pgStore.getConfig("function", functionId);
126+
const userFunctionObj = func.getObject(functionId);
127127
if (!userFunctionObj || userFunctionObj.workspaceId !== connection.workspaceId) {
128128
return {
129129
id: f.functionId as string,

services/rotor/src/lib/inmem-store.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,7 @@ export const createInMemoryStore = <T>(definition: StoreDefinition<T>): InMemory
6262
let intervalToClear: NodeJS.Timeout | undefined = undefined;
6363

6464
function scheduleStoreRefresh() {
65-
intervalToClear = setInterval(async () => {
66-
if (stopping) {
67-
return;
68-
}
65+
const refresh = async () => {
6966
try {
7067
const newDef = await definition.refresh(lastModified);
7168
if (newDef !== "not_modified") {
@@ -79,7 +76,25 @@ export const createInMemoryStore = <T>(definition: StoreDefinition<T>): InMemory
7976
log.atWarn().withCause(e).log(`Failed to refresh store ${definition.name}. Using an old value`);
8077
status = "outdated";
8178
}
82-
}, definition.refreshIntervalMillis);
79+
};
80+
if (definition.refreshIntervalMillis > 0) {
81+
intervalToClear = setInterval(async () => {
82+
if (stopping) {
83+
return;
84+
}
85+
await refresh();
86+
}, definition.refreshIntervalMillis);
87+
} else {
88+
(function loop() {
89+
if (stopping) {
90+
return;
91+
}
92+
setTimeout(async () => {
93+
await refresh();
94+
loop();
95+
}, 1);
96+
})();
97+
}
8398
}
8499

85100
let instancePromise = new Promise<T>((resolve, reject) => {
@@ -149,8 +164,6 @@ export const createInMemoryStore = <T>(definition: StoreDefinition<T>): InMemory
149164
stopping = true;
150165
if (intervalToClear) {
151166
clearInterval(intervalToClear);
152-
} else {
153-
log.atError().log(`There's no interval for ${definition.name}`);
154167
}
155168
status = "stopped";
156169
},

services/rotor/src/lib/message-handler.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { Metrics } from "./metrics";
33
import { GeoResolver } from "./maxmind";
44
import { IngestMessage } from "@jitsu/protocols/async-request";
55
import { CONNECTION_IDS_HEADER } from "./rotor";
6-
import { pgConfigStore } from "./pg-config-store";
6+
import { connectionsStore, functionsStore } from "./entity-store";
7+
78
import { AnalyticsServerEvent } from "@jitsu/protocols/analytics";
89
import { EventContext } from "@jitsu/protocols/functions";
910
import {
@@ -36,17 +37,21 @@ export async function rotorMessageHandler(
3637
if (!_message) {
3738
return;
3839
}
39-
const pgStore = pgConfigStore.getCurrent();
40-
if (!pgStore || !pgStore.enabled) {
41-
throw newError("Config store is not enabled");
40+
const connStore = connectionsStore.getCurrent();
41+
if (!connStore || !connStore.enabled) {
42+
throw newError("Connection store is not enabled");
43+
}
44+
const funcStore = functionsStore.getCurrent();
45+
if (!funcStore || !funcStore.enabled) {
46+
throw newError("Functions store is not enabled");
4247
}
4348
const eventStore = redisLogger();
4449

4550
const message = (typeof _message === "string" ? JSON.parse(_message) : _message) as IngestMessage;
4651
const connectionId =
4752
headers && headers[CONNECTION_IDS_HEADER] ? headers[CONNECTION_IDS_HEADER].toString() : message.connectionId;
4853
const connection: EnrichedConnectionConfig = requireDefined(
49-
pgStore.getEnrichedConnection(connectionId),
54+
connStore.getObject(connectionId),
5055
`Unknown connection: ${connectionId}`
5156
);
5257

@@ -107,7 +112,7 @@ export async function rotorMessageHandler(
107112
},
108113
};
109114

110-
const funcChain = buildFunctionChain(connection, pgStore, functionsFilter);
115+
const funcChain = buildFunctionChain(connection, funcStore, functionsFilter);
111116

112117
const chainRes = await runChain(funcChain, event, eventStore, store, ctx, systemContext);
113118
chainRes.connectionId = connectionId;

0 commit comments

Comments
 (0)