Skip to content

Commit

Permalink
feat: Pipeline API (#108)
Browse files Browse the repository at this point in the history
* Removed packageAfter flag

* Added pipeline queue

* Find asset instead of URL head request

* Clean s3 script

* Added debugger

* Added watch mode
  • Loading branch information
matvp91 authored Nov 12, 2024
1 parent 4aaed05 commit a57db09
Show file tree
Hide file tree
Showing 25 changed files with 474 additions and 245 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ dist-ssr

# Editor directories and files
.vscode/*
!.vscode/settings.json
!.vscode/extensions.json
!.vscode/launch.json
.idea
.DS_Store
*.suo
Expand Down
17 changes: 17 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "bun",
"request": "attach",
"name": "Debug API",
"url": "ws://localhost:6499/sprs-api",
},
{
"type": "bun",
"request": "attach",
"name": "Debug Stitcher",
"url": "ws://localhost:6499/sprs-stitcher",
}
]
}
5 changes: 4 additions & 1 deletion packages/api/scripts/dev.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { $ } from "bun";

await Promise.all([$`tsup`, $`bun --watch ./src/index.ts`]);
await Promise.all([
$`bun run tsup --watch`,
$`bun --watch --inspect=ws://localhost:6499/sprs-api ./src/index.ts`,
]);
24 changes: 24 additions & 0 deletions packages/api/src/repositories/assets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,27 @@ export async function createPlayable(fields: PlayableInsert) {
.values(fields)
.executeTakeFirstOrThrow();
}

export async function getAsset(id: string) {
const asset = await db
.selectFrom("assets")
.leftJoin("playables", "playables.assetId", "assets.id")
.select(({ fn }) => [
"assets.id",
"assets.groupId",
"assets.createdAt",
fn.count<number>("playables.assetId").as("playables"),
])
.groupBy("assets.id")
.where("assets.id", "=", id)
.executeTakeFirst();

if (!asset) {
return null;
}

return {
...asset,
name: asset.id,
};
}
22 changes: 14 additions & 8 deletions packages/api/src/repositories/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
flowProducer,
outcomeQueue,
packageQueue,
pipelineQueue,
transcodeQueue,
} from "bolt";
import { Job as RawJob } from "bullmq";
Expand All @@ -12,6 +13,7 @@ import type { Job } from "../types";
import type { JobNode, JobState, Queue } from "bullmq";

const allQueus = [
pipelineQueue,
transcodeQueue,
packageQueue,
ffmpegQueue,
Expand Down Expand Up @@ -157,15 +159,18 @@ async function formatJobNode(node: JobNode): Promise<Job> {

const failedReason = state === "failed" ? job.failedReason : undefined;

const findParentSortIndex = (job: RawJob): number => {
const value = job.data?.parentSortIndex;
return typeof value === "number" ? value : 0;
};
(children ?? []).sort(
(a, b) => findParentSortIndex(a.job) - findParentSortIndex(b.job),
);
const jobChildren: Job[] = [];
if (children) {
children.sort((a, b) => a.job.timestamp - b.job.timestamp);

const jobChildren = await Promise.all((children ?? []).map(formatJobNode));
for (const child of children) {
if (!child) {
// Jobs can be auto removed. Skip them.
continue;
}
jobChildren.push(await formatJobNode(child));
}
}

let processedAt = job.processedOn;
if (processedAt) {
Expand Down Expand Up @@ -202,6 +207,7 @@ async function formatJobNode(node: JobNode): Promise<Job> {
inputData: JSON.stringify(job.data),
outputData: job.returnvalue ? JSON.stringify(job.returnvalue) : undefined,
failedReason,
stacktrace: job.stacktrace,
children: jobChildren,
};
}
Expand Down
25 changes: 25 additions & 0 deletions packages/api/src/routes/assets.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Elysia, t } from "elysia";
import { auth } from "../auth";
import { DeliberateError } from "../errors";
import {
assetsFilterSchema,
getAsset,
getAssets,
getGroups,
} from "../repositories/assets";
Expand Down Expand Up @@ -29,6 +31,29 @@ export const assets = new Elysia()
},
},
)
.get(
"/assets/:id",
async ({ params }) => {
const asset = await getAsset(params.id);
if (!asset) {
throw new DeliberateError({ type: "ERR_NOT_FOUND" });
}
return asset;
},
{
params: t.Object({
id: t.String(),
}),
detail: {
summary: "Get an asset by id",
tags: ["Assets"],
},
response: {
200: AssetSchema,
400: t.Never(),
},
},
)
.get(
"/groups",
async () => {
Expand Down
189 changes: 90 additions & 99 deletions packages/api/src/routes/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
DEFAULT_PACKAGE_NAME,
DEFAULT_SEGMENT_SIZE,
packageQueue,
pipelineQueue,
transcodeQueue,
} from "bolt";
import { AudioCodec, VideoCodec } from "bolt";
Expand All @@ -13,113 +14,119 @@ import { DeliberateError } from "../errors";
import { getJob, getJobLogs, getJobs } from "../repositories/jobs";
import { JobSchema } from "../types";

const InputSchema = t.Union([
t.Object({
type: t.Literal("video"),
path: t.String({
description: "The source path, starting with http(s):// or s3://",
}),
height: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("audio"),
path: t.String({
description: "The source path, starting with http(s):// or s3://",
}),
language: t.Optional(t.String()),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
path: t.String({
description: "The source path, starting with http(s):// or s3://",
}),
language: t.String(),
}),
]);

const StreamSchema = t.Union([
t.Object({
type: t.Literal("video"),
codec: t.Enum(VideoCodec),
height: t.Number(),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
framerate: t.Optional(t.Number({ description: "Frames per second" })),
}),
t.Object({
type: t.Literal("audio"),
codec: t.Enum(AudioCodec),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
language: t.Optional(t.String()),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
language: t.String(),
}),
]);

export const jobs = new Elysia()
.use(auth({ user: true, service: true }))
.post(
"/transcode",
"/pipeline",
async ({ body }) => {
const data = {
assetId: randomUUID(),
segmentSize: DEFAULT_SEGMENT_SIZE,
name: DEFAULT_PACKAGE_NAME,
...body,
};
const jobId = await addToQueue(transcodeQueue, data, {
const jobId = await addToQueue(pipelineQueue, data, {
id: data.assetId,
});
return { jobId };
},
{
detail: {
summary: "Create transcode job",
summary: "Create pipeline job",
tags: ["Jobs"],
},
body: t.Object({
inputs: t.Array(
t.Union([
t.Object({
type: t.Literal("video"),
path: t.String({
description:
"The source path, starting with http(s):// or s3://",
}),
height: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("audio"),
path: t.String({
description:
"The source path, starting with http(s):// or s3://",
}),
language: t.Optional(t.String()),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
path: t.String({
description:
"The source path, starting with http(s):// or s3://",
}),
language: t.String(),
}),
]),
{
description:
"Source input types. Can refer to the same file, eg: when an mp4 contains " +
"both audio and video, the same source can be added for both video and audio as type.",
},
),
streams: t.Array(
t.Union([
t.Object({
type: t.Literal("video"),
codec: t.Enum(VideoCodec),
height: t.Number(),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
framerate: t.Optional(
t.Number({ description: "Frames per second" }),
),
}),
t.Object({
type: t.Literal("audio"),
codec: t.Enum(AudioCodec),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
language: t.Optional(t.String()),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
language: t.String(),
}),
]),
{
description:
"Output types, the transcoder will match any given input and figure out if a particular output can be generated.",
},
),
segmentSize: t.Optional(
t.Number({
description: "In seconds, will result in proper GOP sizes.",
}),
),
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
assetId: t.Optional(
t.String({
format: "uuid",
description:
"Only provide if you wish to re-transcode an existing asset. When not provided, a unique UUID is created.",
}),
),
packageAfter: t.Optional(
t.Boolean({
description:
"Starts a default package job after a succesful transcode.",
}),
),
group: t.Optional(
group: t.Optional(t.String()),
language: t.Optional(t.String()),
}),
response: {
200: t.Object({
jobId: t.String(),
}),
},
},
)
.post(
"/transcode",
async ({ body }) => {
const data = {
assetId: randomUUID(),
segmentSize: DEFAULT_SEGMENT_SIZE,
...body,
};
const jobId = await addToQueue(transcodeQueue, data, {
id: data.assetId,
});
return { jobId };
},
{
detail: {
summary: "Create transcode job",
tags: ["Jobs"],
},
body: t.Object({
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
assetId: t.Optional(
t.String({
description:
'Groups the asset with an arbitrary value, such as "ad"',
format: "uuid",
}),
),
segmentSize: t.Optional(t.Number()),
group: t.Optional(t.String()),
}),
response: {
200: t.Object({
Expand Down Expand Up @@ -149,25 +156,9 @@ export const jobs = new Elysia()
assetId: t.String({
format: "uuid",
}),
name: t.Optional(t.String()),
segmentSize: t.Optional(t.Number()),
language: t.Optional(t.String()),
segmentSize: t.Optional(
t.Number({
description:
"In seconds, shall be the same or a multiple of the originally transcoded segment size.",
}),
),
tag: t.Optional(
t.String({
description:
'Tag a job for a particular purpose, such as "ad". Arbitrary value.',
}),
),
name: t.Optional(
t.String({
description:
'When provided, the package result will be stored under this name in S3. Mainly used to create multiple packaged results for a transcode result. We\'ll use "hls" when not provided.',
}),
),
}),
response: {
200: t.Object({
Expand Down
Loading

0 comments on commit a57db09

Please sign in to comment.