Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions service/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ services:
DEFAULT_PROVIDER: ${DEFAULT_PROVIDER}

OBJECT_STORAGE_URL: http://object-storage:52010
BUNDLE_BUCKET_NAME: bundles

FORGE_API_URL: http://forge:52020/metorial-forge

LAMBDA_AWS_REGION: ${LAMBDA_AWS_REGION:-}
Expand Down
30 changes: 30 additions & 0 deletions service/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ model Function {
functionDeployments FunctionDeployment[]
runtime Runtime? @relation(fields: [runtimeOid], references: [oid])
runtimeOid BigInt?
functionBundles FunctionBundle[]

@@unique([identifier, instanceOid])
}
Expand All @@ -131,6 +132,9 @@ model FunctionDeployment {
functionVersionOid BigInt?
functionVersion FunctionVersion? @relation(fields: [functionVersionOid], references: [oid])

functionBundleOid BigInt?
functionBundle FunctionBundle? @relation(fields: [functionBundleOid], references: [oid])

runtimeOid BigInt
runtime Runtime @relation(fields: [runtimeOid], references: [oid])

Expand Down Expand Up @@ -202,6 +206,9 @@ model FunctionVersion {
runtimeOid BigInt
runtime Runtime @relation(fields: [runtimeOid], references: [oid])

functionBundleOid BigInt
functionBundle FunctionBundle @relation(fields: [functionBundleOid], references: [oid])

/// [FunctionConfiguration]
configuration Json

Expand Down Expand Up @@ -243,3 +250,26 @@ model FunctionInvocation {

createdAt DateTime @default(now())
}

enum FunctionBundleStatus {
uploading
available
failed
}

model FunctionBundle {
oid BigInt @id
id String @unique
status FunctionBundleStatus

functionOid BigInt
function Function @relation(fields: [functionOid], references: [oid])

storageKey String?
bucket String?

createdAt DateTime @default(now())

functionDeployments FunctionDeployment[]
functionVersions FunctionVersion[]
}
3 changes: 2 additions & 1 deletion service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ export let env = createValidatedEnv({
},

storage: {
OBJECT_STORAGE_URL: v.string()
OBJECT_STORAGE_URL: v.string(),
BUNDLE_BUCKET_NAME: v.string()
},

encryption: {
Expand Down
3 changes: 2 additions & 1 deletion service/src/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export let ID = createIdGenerator({
functionVersion: idType.sorted('bfv_'),
functionDeployment: idType.sorted('bfd_'),
functionDeploymentStep: idType.sorted('bfds_'),
functionInvocation: idType.sorted('bfi_')
functionInvocation: idType.sorted('bfi_'),
functionBundle: idType.sorted('bfb_')
});

let workerIdBits = 12;
Expand Down
60 changes: 59 additions & 1 deletion service/src/queues/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { functionBayRuntimeConfig, type FunctionBayRuntimeConfig } from '@functi
import { generatePlainId } from '@lowerdeck/id';
import { combineQueueProcessors, createQueue, QueueRetryError } from '@lowerdeck/queue';
import { v } from '@lowerdeck/validation';
import { Readable } from 'stream';
import { db } from '../db';
import { encryption } from '../encryption';
import { env } from '../env';
Expand All @@ -15,6 +16,7 @@ import {
OUTPUT_ARTIFACT_NAME,
OUTPUT_ZIP_PATH
} from '../providers/const';
import { storage } from '../storage';

export let startBuildQueue = createQueue<{
deploymentId: string;
Expand Down Expand Up @@ -277,6 +279,15 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d
});
if (!deployment) throw new QueueRetryError();

let bundle = await db.functionBundle.create({
data: {
oid: snowflake.nextId(),
status: 'uploading',
id: await ID.generateId('functionBundle'),
functionOid: deployment.functionOid
}
});

let version = await db.functionVersion.create({
data: {
oid: snowflake.nextId(),
Expand All @@ -288,6 +299,7 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d

functionOid: deployment.functionOid,
runtimeOid: deployment.runtimeOid,
functionBundleOid: bundle.oid,

configuration: deployment.configuration,
providerData: data.providerData,
Expand All @@ -301,6 +313,12 @@ let deployToFunctionBayQueueProcessor = deployToFunctionBayQueue.process(async d
});

await succeededQueue.add({ deploymentId: deployment.id });

await uploadBundleQueue.add({
deploymentId: deployment.id,
bundleId: bundle.id,
outputUrl: data.outputUrl
});
});

let succeededQueue = createQueue<{
Expand Down Expand Up @@ -379,6 +397,45 @@ let errorQueueProcessor = errorQueue.process(async data => {
await cleanupQueue.add({ deploymentId: deployment.id }, { delay: 60000 });
});

let uploadBundleQueue = createQueue<{
deploymentId: string;
bundleId: string;
outputUrl: string;
}>({
name: 'fbay/build/upbndl',
redisUrl: env.service.REDIS_URL
});

let uploadBundleQueueProcessor = uploadBundleQueue.process(async data => {
let storageKey = `bundles/${data.bundleId}.zip`;
let bucket = env.storage.BUNDLE_BUCKET_NAME;

try {
await storage.putObject(
bucket,
storageKey,
await fetch(data.outputUrl).then(res => Readable.fromWeb(res.body!) as any),
'application/zip'
);

await db.functionBundle.updateMany({
where: { id: data.bundleId },
data: {
status: 'available',
storageKey,
bucket
}
});
} catch (err) {
await db.functionBundle.updateMany({
where: { id: data.bundleId },
data: { status: 'failed' }
});

throw err; // Throw to retry, but we mark the bundle as failed already
}
});

let cleanupQueue = createQueue<{
deploymentId: string;
}>({
Expand All @@ -401,5 +458,6 @@ export let buildProcessors = combineQueueProcessors([
succeededQueueProcessor,
errorQueueProcessor,
cleanupQueueProcessor,
deployToFunctionBayQueueProcessor
deployToFunctionBayQueueProcessor,
uploadBundleQueueProcessor
]);
10 changes: 9 additions & 1 deletion service/src/queues/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createCron } from '@lowerdeck/cron';
import { subDays } from 'date-fns';
import { subDays, subHours } from 'date-fns';
import { db } from '../db';
import { env } from '../env';

Expand All @@ -11,9 +11,17 @@ export let cleanupProcessor = createCron(
},
async () => {
let threeDaysAgo = subDays(new Date(), 3);
let oneHourAgo = subHours(new Date(), 1);

await db.functionInvocation.deleteMany({
where: { createdAt: { lt: threeDaysAgo } }
});

await db.functionBundle.deleteMany({
where: {
status: 'uploading',
createdAt: { lt: oneHourAgo }
}
});
}
);
18 changes: 18 additions & 0 deletions service/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
import { delay } from '@lowerdeck/delay';
import { ObjectStorageClient } from 'object-storage-client';
import { env } from './env';

export let storage = new ObjectStorageClient(env.storage.OBJECT_STORAGE_URL);

let initBuckets = async () => {
await storage.upsertBucket(env.storage.BUNDLE_BUCKET_NAME);
};

(async () => {
while (true) {
try {
await initBuckets();
return;
} catch (err) {
console.error('Error initializing storage buckets, retrying...');
}

await delay(5000);
}
})();