diff --git a/services/rotor/src/lib/message-handler.ts b/services/rotor/src/lib/message-handler.ts index cc27d34a0..3f5bf7f58 100644 --- a/services/rotor/src/lib/message-handler.ts +++ b/services/rotor/src/lib/message-handler.ts @@ -19,6 +19,7 @@ import { import { redisLogger } from "./redis-logger"; import { buildFunctionChain, checkError, runChain } from "./functions-chain"; import { redis } from "./redis"; +import { EnrichedConnectionConfig } from "./config-types"; export const log = getLog("rotor"); const anonymousEventsStore = mongoAnonymousEventsStore(); @@ -44,7 +45,10 @@ export async function rotorMessageHandler( const message = (typeof _message === "string" ? JSON.parse(_message) : _message) as IngestMessage; const connectionId = headers && headers[CONNECTION_IDS_HEADER] ? headers[CONNECTION_IDS_HEADER].toString() : message.connectionId; - const connection = requireDefined(pgStore.getEnrichedConnection(connectionId), `Unknown connection: ${connectionId}`); + const connection: EnrichedConnectionConfig = requireDefined( + pgStore.getEnrichedConnection(connectionId), + `Unknown connection: ${connectionId}` + ); log .atDebug() diff --git a/webapps/console/pages/api/admin/export/[name]/index.ts b/webapps/console/pages/api/admin/export/[name]/index.ts index 8fcc26b14..674fae08b 100644 --- a/webapps/console/pages/api/admin/export/[name]/index.ts +++ b/webapps/console/pages/api/admin/export/[name]/index.ts @@ -3,10 +3,10 @@ import { db } from "../../../../../lib/server/db"; import { getErrorMessage, getLog, requireDefined, rpc } from "juava"; import { z } from "zod"; import { getCoreDestinationTypeNonStrict } from "../../../../../lib/schema/destinations"; -import pick from "lodash/pick"; import { createJwt, getEeConnection, isEEAvailable } from "../../../../../lib/server/ee"; import omit from "lodash/omit"; import { NextApiRequest } from "next"; +import hash from "object-hash"; interface Writer { write(data: string): void; @@ -38,7 +38,7 @@ async function getLastUpdated(): Promise { const exports: Export[] = [ { - name: "bulker", + name: "bulker-connections", lastModified: getLastUpdated, data: async writer => { writer.write("["); @@ -47,7 +47,7 @@ const exports: Export[] = [ let needComma = false; while (true) { const objects = await db.prisma().configurationObjectLink.findMany({ - where: { deleted: false }, + where: { deleted: false, workspace: { deleted: false }, from: { deleted: false }, to: { deleted: false } }, include: { from: true, to: true, workspace: true }, take: batchSize, cursor: lastId ? { id: lastId } : undefined, @@ -60,7 +60,6 @@ const exports: Export[] = [ lastId = objects[objects.length - 1].id; for (const { data, from, id, to, updatedAt, workspace } of objects) { const destinationType = to.config.destinationType; - console.log(to.config); if (getCoreDestinationTypeNonStrict(destinationType)?.usesBulker) { if (needComma) { writer.write(","); @@ -71,14 +70,10 @@ const exports: Export[] = [ workspace: { id: workspace.id, name: workspace.slug }, }, id: id, - type: destinationType, - options: - from.config.type === "stream" - ? pick(data, "mode", "frequency", "primaryKey", "timestampColumn") - : undefined, - updatedAt: dateMax(updatedAt, from.updatedAt, to.updatedAt), - credentials: to.config, + options: data, + updatedAt: dateMax(updatedAt, to.updatedAt), + credentials: omit(to.config, "destinationType", "type", "name"), }) ); needComma = true; @@ -115,6 +110,111 @@ const exports: Export[] = [ writer.write("]"); }, }, + { + name: "rotor-connections", + lastModified: getLastUpdated, + data: async writer => { + writer.write("["); + + let lastId: string | undefined = undefined; + let needComma = false; + while (true) { + const objects = await db.prisma().configurationObjectLink.findMany({ + where: { + deleted: false, + NOT: { type: "sync" }, + workspace: { deleted: false }, + from: { deleted: false }, + to: { deleted: false }, + }, + include: { from: true, to: true, workspace: true }, + take: batchSize, + cursor: lastId ? { id: lastId } : undefined, + orderBy: { id: "asc" }, + }); + if (objects.length == 0) { + break; + } + getLog().atDebug().log(`Got batch of ${objects.length} objects for bulker export`); + lastId = objects[objects.length - 1].id; + for (const { data, from, id, to, updatedAt, workspace } of objects) { + const destinationType = to.config.destinationType; + const coreDestinationType = getCoreDestinationTypeNonStrict(destinationType); + if (!coreDestinationType) { + getLog().atError().log(`Unknown destination type: ${destinationType} for connection ${id}`); + } + if (needComma) { + writer.write(","); + } + writer.write( + JSON.stringify({ + __debug: { + workspace: { id: workspace.id, name: workspace.slug }, + }, + id: id, + type: destinationType, + workspaceId: workspace.id, + streamId: from.id, + destinationId: to.id, + usesBulker: !!coreDestinationType?.usesBulker, + options: data, + updatedAt: dateMax(updatedAt, to.updatedAt), + credentials: omit(to.config, "destinationType", "type", "name"), + }) + ); + needComma = true; + } + if (objects.length < batchSize) { + break; + } + } + writer.write("]"); + }, + }, + { + name: "functions", + lastModified: getLastUpdated, + data: async writer => { + writer.write("["); + + let lastId: string | undefined = undefined; + let needComma = false; + while (true) { + const objects = await db.prisma().configurationObject.findMany({ + where: { + deleted: false, + type: "function", + workspace: { deleted: false }, + }, + take: batchSize, + cursor: lastId ? { id: lastId } : undefined, + orderBy: { id: "asc" }, + }); + if (objects.length == 0) { + break; + } + getLog().atDebug().log(`Got batch of ${objects.length} objects for bulker export`); + lastId = objects[objects.length - 1].id; + for (const row of objects) { + if (needComma) { + writer.write(","); + } + writer.write( + JSON.stringify({ + ...omit(row, "deleted", "config"), + ...row.config, + codeHash: hash(row.config?.code), + }) + ); + needComma = true; + } + if (objects.length < batchSize) { + break; + } + } + writer.write("]"); + }, + }, { name: "streams-with-destinations", lastModified: getLastUpdated, @@ -124,7 +224,7 @@ const exports: Export[] = [ let needComma = false; while (true) { const objects = await db.prisma().configurationObject.findMany({ - where: { deleted: false, type: "stream" }, + where: { deleted: false, type: "stream", workspace: { deleted: false } }, include: { toLinks: { include: { to: true } }, workspace: true }, take: batchSize, cursor: lastId ? { id: lastId } : undefined, @@ -159,6 +259,7 @@ const exports: Export[] = [ ), ...obj.config, }, + backupEnabled: !(obj.workspace.featuresEnabled || []).includes("nobackup"), destinations: obj.toLinks .filter(l => !l.deleted && l.type === "push") .map(l => ({ @@ -205,12 +306,18 @@ export function getIfModifiedSince(req: NextApiRequest): Date | undefined { export const ExportQueryParams = z.object({ name: z.string(), listen: z.string().optional(), - timeoutMs: z.number().optional().default(10_000), + timeoutMs: z.coerce.number().optional().default(10_000), dateOnly: z.coerce.boolean().optional().default(false), }); export function notModified(ifModifiedSince: Date | undefined, lastModified: Date | undefined) { - return ifModifiedSince && lastModified && ifModifiedSince.getTime() >= lastModified.getTime(); + if (!ifModifiedSince || !lastModified) { + return false; + } + const lastModifiedCopy = new Date(lastModified.getTime()); + // Last-Modified and If-Modified-Since headers are not precise enough, so we need to round it to seconds + lastModifiedCopy.setMilliseconds(0); + return ifModifiedSince.getTime() >= lastModifiedCopy.getTime(); } export default createRoute() diff --git a/webapps/console/scripts/password-hash.ts b/webapps/console/scripts/password-hash.ts index 592b13da2..858cbe35b 100644 --- a/webapps/console/scripts/password-hash.ts +++ b/webapps/console/scripts/password-hash.ts @@ -17,7 +17,7 @@ async function main(): Promise { process.env.GLOBAL_HASH_SECRET ? "custom GLOBAL_HASH_SECRET" : "default hash secret" }` ); - log.atInfo().log(`Hashing ${secret} → ${createHash(args._[0])}`); + log.atInfo().log(`Hashing ${secret} → ${createHash(secret)}`); } main(); diff --git a/webapps/ee-api/pages/api/s3-connections.ts b/webapps/ee-api/pages/api/s3-connections.ts index cd21f1277..8ecce5bc4 100644 --- a/webapps/ee-api/pages/api/s3-connections.ts +++ b/webapps/ee-api/pages/api/s3-connections.ts @@ -33,6 +33,7 @@ const handler = async function handler(req: NextApiRequest, res: NextApiResponse }, id: `${w.id}_backup`, type: "s3", + special: "backup", options: { dataLayout: "passthrough", deduplicate: false, @@ -43,7 +44,7 @@ const handler = async function handler(req: NextApiRequest, res: NextApiResponse region: process.env.S3_REGION, accessKeyId: process.env.S3_ACCESS_KEY_ID, secretAccessKey: process.env.S3_SECRET_ACCESS_KEY, - bucket: `${w.slug}.data.use.jitsu.com`, + bucket: `${w.id}.data.use.jitsu.com`, compression: "gzip", format: "ndjson", folder: "[DATE]",