From 5eca7e1ab320cf662b4fcff5734ff2621f69c4fe Mon Sep 17 00:00:00 2001
From: Tobias Herber <22559657+herber@users.noreply.github.com>
Date: Sun, 28 Dec 2025 19:03:07 +0100
Subject: [PATCH] Persist bundles

---
 service/docker-compose.dev.yml |  2 ++
 service/prisma/schema.prisma   | 30 +++++++++++++++++
 service/src/env.ts             |  3 +-
 service/src/id.ts              |  3 +-
 service/src/queues/build.ts    | 60 +++++++++++++++++++++++++++++++++-
 service/src/queues/cleanup.ts  | 10 +++++-
 service/src/storage.ts         | 18 ++++++++++
 7 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/service/docker-compose.dev.yml b/service/docker-compose.dev.yml
index 9100a6c..9b2fb72 100644
--- a/service/docker-compose.dev.yml
+++ b/service/docker-compose.dev.yml
@@ -89,6 +89,8 @@ services:
       DEFAULT_PROVIDER: ${DEFAULT_PROVIDER}
 
       OBJECT_STORAGE_URL: http://object-storage:52010
+      BUNDLE_BUCKET_NAME: bundles
+
       FORGE_API_URL: http://forge:52020/metorial-forge
 
       LAMBDA_AWS_REGION: ${LAMBDA_AWS_REGION:-}
diff --git a/service/prisma/schema.prisma b/service/prisma/schema.prisma
index f55676a..bdec348 100644
--- a/service/prisma/schema.prisma
+++ b/service/prisma/schema.prisma
@@ -106,6 +106,7 @@ model Function {
   functionDeployments FunctionDeployment[]
   runtime             Runtime?             @relation(fields: [runtimeOid], references: [oid])
   runtimeOid          BigInt?
+  functionBundles     FunctionBundle[]
 
   @@unique([identifier, instanceOid])
 }
@@ -131,6 +132,9 @@ model FunctionDeployment {
   functionVersionOid BigInt?
   functionVersion    FunctionVersion? @relation(fields: [functionVersionOid], references: [oid])
 
+  functionBundleOid BigInt?
+  functionBundle    FunctionBundle? @relation(fields: [functionBundleOid], references: [oid])
+
   runtimeOid BigInt
   runtime    Runtime @relation(fields: [runtimeOid], references: [oid])
 
@@ -202,6 +206,9 @@ model FunctionVersion {
   runtimeOid BigInt
   runtime    Runtime @relation(fields: [runtimeOid], references: [oid])
 
+  functionBundleOid BigInt
+  functionBundle    FunctionBundle @relation(fields: [functionBundleOid], references: [oid])
+
   /// [FunctionConfiguration]
   configuration Json
 
@@ -243,3 +250,26 @@ model FunctionInvocation {
 
   createdAt DateTime @default(now())
 }
+
+enum FunctionBundleStatus {
+  uploading
+  available
+  failed
+}
+
+model FunctionBundle {
+  oid    BigInt @id
+  id     String @unique
+  status FunctionBundleStatus
+
+  functionOid BigInt
+  function    Function @relation(fields: [functionOid], references: [oid])
+
+  storageKey String?
+  bucket     String?
+
+  createdAt DateTime @default(now())
+
+  functionDeployments FunctionDeployment[]
+  functionVersions    FunctionVersion[]
+}
diff --git a/service/src/env.ts b/service/src/env.ts
index 4e7127f..b76a140 100644
--- a/service/src/env.ts
+++ b/service/src/env.ts
@@ -8,7 +8,8 @@ export let env = createValidatedEnv({
   },
 
   storage: {
-    OBJECT_STORAGE_URL: v.string()
+    OBJECT_STORAGE_URL: v.string(),
+    BUNDLE_BUCKET_NAME: v.string()
   },
 
   encryption: {
diff --git a/service/src/id.ts b/service/src/id.ts
index 4d501be..77d55f7 100644
--- a/service/src/id.ts
+++ b/service/src/id.ts
@@ -11,7 +11,8 @@ export let ID = createIdGenerator({
   functionVersion: idType.sorted('bfv_'),
   functionDeployment: idType.sorted('bfd_'),
   functionDeploymentStep: idType.sorted('bfds_'),
-  functionInvocation: idType.sorted('bfi_')
+  functionInvocation: idType.sorted('bfi_'),
+  functionBundle: idType.sorted('bfb_')
 });
 
 let workerIdBits = 12;
diff --git a/service/src/queues/build.ts b/service/src/queues/build.ts
index 3d33bc0..64a9fab 100644
--- a/service/src/queues/build.ts
+++ b/service/src/queues/build.ts
@@ -2,6 +2,7 @@ import { functionBayRuntimeConfig, type FunctionBayRuntimeConfig } from '@functi
 import { generatePlainId } from '@lowerdeck/id';
 import { combineQueueProcessors, createQueue, QueueRetryError } from '@lowerdeck/queue';
 import { v } from '@lowerdeck/validation';
+import { Readable } from 'stream';
 import { db } from '../db';
 import { encryption } from '../encryption';
 import { env } from '../env';
@@ -15,6 +16,7 @@ import {
   OUTPUT_ARTIFACT_NAME,
   OUTPUT_ZIP_PATH
 } from '../providers/const';
+import { storage } from '../storage';
 
 export let startBuildQueue = createQueue<{
   deploymentId: string;
@@ -277,6 +279,15 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d
   });
   if (!deployment) throw new QueueRetryError();
 
+  let bundle = await db.functionBundle.create({
+    data: {
+      oid: snowflake.nextId(),
+      status: 'uploading',
+      id: await ID.generateId('functionBundle'),
+      functionOid: deployment.functionOid
+    }
+  });
+
   let version = await db.functionVersion.create({
     data: {
       oid: snowflake.nextId(),
@@ -288,6 +299,7 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d
 
       functionOid: deployment.functionOid,
       runtimeOid: deployment.runtimeOid,
+      functionBundleOid: bundle.oid,
 
       configuration: deployment.configuration,
       providerData: data.providerData,
@@ -301,6 +313,12 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d
   });
 
   await succeededQueue.add({ deploymentId: deployment.id });
+
+  await uploadBundleQueue.add({
+    deploymentId: deployment.id,
+    bundleId: bundle.id,
+    outputUrl: data.outputUrl
+  });
 });
 
 let succeededQueue = createQueue<{
@@ -379,6 +397,45 @@ let errorQueueProcessor = errorQueue.process(async data => {
   await cleanupQueue.add({ deploymentId: deployment.id }, { delay: 60000 });
 });
 
+let uploadBundleQueue = createQueue<{
+  deploymentId: string;
+  bundleId: string;
+  outputUrl: string;
+}>({
+  name: 'fbay/build/upbndl',
+  redisUrl: env.service.REDIS_URL
+});
+
+let uploadBundleQueueProcessor = uploadBundleQueue.process(async data => {
+  let storageKey = `bundles/${data.bundleId}.zip`;
+  let bucket = env.storage.BUNDLE_BUCKET_NAME;
+
+  try {
+    await storage.putObject(
+      bucket,
+      storageKey,
+      await fetch(data.outputUrl).then(res => Readable.fromWeb(res.body!) as any),
+      'application/zip'
+    );
+
+    await db.functionBundle.updateMany({
+      where: { id: data.bundleId },
+      data: {
+        status: 'available',
+        storageKey,
+        bucket
+      }
+    });
+  } catch (err) {
+    await db.functionBundle.updateMany({
+      where: { id: data.bundleId },
+      data: { status: 'failed' }
+    });
+
+    throw err; // Throw to retry, but we mark the bundle as failed already
+  }
+});
+
 let cleanupQueue = createQueue<{
   deploymentId: string;
 }>({
@@ -401,5 +458,6 @@ export let buildProcessors = combineQueueProcessors([
   succeededQueueProcessor,
   errorQueueProcessor,
   cleanupQueueProcessor,
-  deployToFunctionBayQueueProcessor
+  deployToFunctionBayQueueProcessor,
+  uploadBundleQueueProcessor
 ]);
diff --git a/service/src/queues/cleanup.ts b/service/src/queues/cleanup.ts
index 009695e..b672f9b 100644
--- a/service/src/queues/cleanup.ts
+++ b/service/src/queues/cleanup.ts
@@ -1,5 +1,5 @@
 import { createCron } from '@lowerdeck/cron';
-import { subDays } from 'date-fns';
+import { subDays, subHours } from 'date-fns';
 
 import { db } from '../db';
 import { env } from '../env';
@@ -11,9 +11,17 @@ export let cleanupProcessor = createCron(
   },
   async () => {
     let threeDaysAgo = subDays(new Date(), 3);
+    let oneHourAgo = subHours(new Date(), 1);
 
     await db.functionInvocation.deleteMany({
       where: { createdAt: { lt: threeDaysAgo } }
     });
+
+    await db.functionBundle.deleteMany({
+      where: {
+        status: 'uploading',
+        createdAt: { lt: oneHourAgo }
+      }
+    });
   }
 );
diff --git a/service/src/storage.ts b/service/src/storage.ts
index 796721f..483476a 100644
--- a/service/src/storage.ts
+++ b/service/src/storage.ts
@@ -1,4 +1,22 @@
+import { delay } from '@lowerdeck/delay';
 import { ObjectStorageClient } from 'object-storage-client';
 import { env } from './env';
 
 export let storage = new ObjectStorageClient(env.storage.OBJECT_STORAGE_URL);
+
+let initBuckets = async () => {
+  await storage.upsertBucket(env.storage.BUNDLE_BUCKET_NAME);
+};
+
+(async () => {
+  while (true) {
+    try {
+      await initBuckets();
+      return;
+    } catch (err) {
+      console.error('Error initializing storage buckets, retrying...');
+    }
+
+    await delay(5000);
+  }
+})();
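
Note (not part of the patch): the sketch below shows how a consumer could stream a persisted bundle back out of object storage using the bucket/storageKey pair the upload queue stores on FunctionBundle. It assumes object-storage-client exposes a getObject(bucket, key) counterpart to the putObject call used above; only putObject and upsertBucket appear in this diff, so getObject and the helper name openBundleStream are hypothetical.

import { db } from '../db';
import { storage } from '../storage';

// Look up a bundle row and open a download stream for its stored zip.
export let openBundleStream = async (bundleId: string) => {
  let bundle = await db.functionBundle.findUnique({ where: { id: bundleId } });

  // Only bundles that finished uploading carry a bucket/storageKey pair.
  if (!bundle || bundle.status !== 'available' || !bundle.bucket || !bundle.storageKey) {
    throw new Error(`Bundle ${bundleId} is not available`);
  }

  // Hypothetical download call, mirroring the putObject(bucket, key, ...) shape above.
  return storage.getObject(bundle.bucket, bundle.storageKey);
};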