Skip to content

Commit

Permalink
console/api/admin/export added rotor-connections and functions
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 19, 2024
1 parent cddf11e commit 7ab8a7e
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 17 deletions.
6 changes: 5 additions & 1 deletion services/rotor/src/lib/message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { redisLogger } from "./redis-logger";
import { buildFunctionChain, checkError, runChain } from "./functions-chain";
import { redis } from "./redis";
import { EnrichedConnectionConfig } from "./config-types";
export const log = getLog("rotor");

const anonymousEventsStore = mongoAnonymousEventsStore();
Expand All @@ -44,7 +45,10 @@ export async function rotorMessageHandler(
const message = (typeof _message === "string" ? JSON.parse(_message) : _message) as IngestMessage;
const connectionId =
headers && headers[CONNECTION_IDS_HEADER] ? headers[CONNECTION_IDS_HEADER].toString() : message.connectionId;
const connection = requireDefined(pgStore.getEnrichedConnection(connectionId), `Unknown connection: ${connectionId}`);
const connection: EnrichedConnectionConfig = requireDefined(
pgStore.getEnrichedConnection(connectionId),
`Unknown connection: ${connectionId}`
);

log
.atDebug()
Expand Down
135 changes: 121 additions & 14 deletions webapps/console/pages/api/admin/export/[name]/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { db } from "../../../../../lib/server/db";
import { getErrorMessage, getLog, requireDefined, rpc } from "juava";
import { z } from "zod";
import { getCoreDestinationTypeNonStrict } from "../../../../../lib/schema/destinations";
import pick from "lodash/pick";
import { createJwt, getEeConnection, isEEAvailable } from "../../../../../lib/server/ee";
import omit from "lodash/omit";
import { NextApiRequest } from "next";
import hash from "object-hash";

interface Writer {
write(data: string): void;
Expand Down Expand Up @@ -38,7 +38,7 @@ async function getLastUpdated(): Promise<Date | undefined> {

const exports: Export[] = [
{
name: "bulker",
name: "bulker-connections",
lastModified: getLastUpdated,
data: async writer => {
writer.write("[");
Expand All @@ -47,7 +47,7 @@ const exports: Export[] = [
let needComma = false;
while (true) {
const objects = await db.prisma().configurationObjectLink.findMany({
where: { deleted: false },
where: { deleted: false, workspace: { deleted: false }, from: { deleted: false }, to: { deleted: false } },
include: { from: true, to: true, workspace: true },
take: batchSize,
cursor: lastId ? { id: lastId } : undefined,
Expand All @@ -60,7 +60,6 @@ const exports: Export[] = [
lastId = objects[objects.length - 1].id;
for (const { data, from, id, to, updatedAt, workspace } of objects) {
const destinationType = to.config.destinationType;
console.log(to.config);
if (getCoreDestinationTypeNonStrict(destinationType)?.usesBulker) {
if (needComma) {
writer.write(",");
Expand All @@ -71,14 +70,10 @@ const exports: Export[] = [
workspace: { id: workspace.id, name: workspace.slug },
},
id: id,

type: destinationType,
options:
from.config.type === "stream"
? pick(data, "mode", "frequency", "primaryKey", "timestampColumn")
: undefined,
updatedAt: dateMax(updatedAt, from.updatedAt, to.updatedAt),
credentials: to.config,
options: data,
updatedAt: dateMax(updatedAt, to.updatedAt),
credentials: omit(to.config, "destinationType", "type", "name"),
})
);
needComma = true;
Expand Down Expand Up @@ -115,6 +110,111 @@ const exports: Export[] = [
writer.write("]");
},
},
{
name: "rotor-connections",
lastModified: getLastUpdated,
data: async writer => {
writer.write("[");

let lastId: string | undefined = undefined;
let needComma = false;
while (true) {
const objects = await db.prisma().configurationObjectLink.findMany({
where: {
deleted: false,
NOT: { type: "sync" },
workspace: { deleted: false },
from: { deleted: false },
to: { deleted: false },
},
include: { from: true, to: true, workspace: true },
take: batchSize,
cursor: lastId ? { id: lastId } : undefined,
orderBy: { id: "asc" },
});
if (objects.length == 0) {
break;
}
getLog().atDebug().log(`Got batch of ${objects.length} objects for bulker export`);
lastId = objects[objects.length - 1].id;
for (const { data, from, id, to, updatedAt, workspace } of objects) {
const destinationType = to.config.destinationType;
const coreDestinationType = getCoreDestinationTypeNonStrict(destinationType);
if (!coreDestinationType) {
getLog().atError().log(`Unknown destination type: ${destinationType} for connection ${id}`);
}
if (needComma) {
writer.write(",");
}
writer.write(
JSON.stringify({
__debug: {
workspace: { id: workspace.id, name: workspace.slug },
},
id: id,
type: destinationType,
workspaceId: workspace.id,
streamId: from.id,
destinationId: to.id,
usesBulker: !!coreDestinationType?.usesBulker,
options: data,
updatedAt: dateMax(updatedAt, to.updatedAt),
credentials: omit(to.config, "destinationType", "type", "name"),
})
);
needComma = true;
}
if (objects.length < batchSize) {
break;
}
}
writer.write("]");
},
},
{
name: "functions",
lastModified: getLastUpdated,
data: async writer => {
writer.write("[");

let lastId: string | undefined = undefined;
let needComma = false;
while (true) {
const objects = await db.prisma().configurationObject.findMany({
where: {
deleted: false,
type: "function",
workspace: { deleted: false },
},
take: batchSize,
cursor: lastId ? { id: lastId } : undefined,
orderBy: { id: "asc" },
});
if (objects.length == 0) {
break;
}
getLog().atDebug().log(`Got batch of ${objects.length} objects for bulker export`);
lastId = objects[objects.length - 1].id;
for (const row of objects) {
if (needComma) {
writer.write(",");
}
writer.write(
JSON.stringify({
...omit(row, "deleted", "config"),
...row.config,
codeHash: hash(row.config?.code),
})
);
needComma = true;
}
if (objects.length < batchSize) {
break;
}
}
writer.write("]");
},
},
{
name: "streams-with-destinations",
lastModified: getLastUpdated,
Expand All @@ -124,7 +224,7 @@ const exports: Export[] = [
let needComma = false;
while (true) {
const objects = await db.prisma().configurationObject.findMany({
where: { deleted: false, type: "stream" },
where: { deleted: false, type: "stream", workspace: { deleted: false } },
include: { toLinks: { include: { to: true } }, workspace: true },
take: batchSize,
cursor: lastId ? { id: lastId } : undefined,
Expand Down Expand Up @@ -159,6 +259,7 @@ const exports: Export[] = [
),
...obj.config,
},
backupEnabled: !(obj.workspace.featuresEnabled || []).includes("nobackup"),
destinations: obj.toLinks
.filter(l => !l.deleted && l.type === "push")
.map(l => ({
Expand Down Expand Up @@ -205,12 +306,18 @@ export function getIfModifiedSince(req: NextApiRequest): Date | undefined {
export const ExportQueryParams = z.object({
name: z.string(),
listen: z.string().optional(),
timeoutMs: z.number().optional().default(10_000),
timeoutMs: z.coerce.number().optional().default(10_000),
dateOnly: z.coerce.boolean().optional().default(false),
});

export function notModified(ifModifiedSince: Date | undefined, lastModified: Date | undefined) {
return ifModifiedSince && lastModified && ifModifiedSince.getTime() >= lastModified.getTime();
if (!ifModifiedSince || !lastModified) {
return false;
}
const lastModifiedCopy = new Date(lastModified.getTime());
// Last-Modified and If-Modified-Since headers are not precise enough, so we need to round it to seconds
lastModifiedCopy.setMilliseconds(0);
return ifModifiedSince.getTime() >= lastModifiedCopy.getTime();
}

export default createRoute()
Expand Down
2 changes: 1 addition & 1 deletion webapps/console/scripts/password-hash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async function main(): Promise<void> {
process.env.GLOBAL_HASH_SECRET ? "custom GLOBAL_HASH_SECRET" : "default hash secret"
}`
);
log.atInfo().log(`Hashing ${secret}${createHash(args._[0])}`);
log.atInfo().log(`Hashing ${secret}${createHash(secret)}`);
}

main();
3 changes: 2 additions & 1 deletion webapps/ee-api/pages/api/s3-connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const handler = async function handler(req: NextApiRequest, res: NextApiResponse
},
id: `${w.id}_backup`,
type: "s3",
special: "backup",
options: {
dataLayout: "passthrough",
deduplicate: false,
Expand All @@ -43,7 +44,7 @@ const handler = async function handler(req: NextApiRequest, res: NextApiResponse
region: process.env.S3_REGION,
accessKeyId: process.env.S3_ACCESS_KEY_ID,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
bucket: `${w.slug}.data.use.jitsu.com`,
bucket: `${w.id}.data.use.jitsu.com`,
compression: "gzip",
format: "ndjson",
folder: "[DATE]",
Expand Down

0 comments on commit 7ab8a7e

Please sign in to comment.