Skip to content

Commit

Permalink
Add task options
Browse files Browse the repository at this point in the history
  • Loading branch information
siiptuo committed Oct 3, 2024
1 parent abc39bf commit 10f8bd8
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 28 deletions.
5 changes: 4 additions & 1 deletion backend/src/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function isTaskStatus(x: any): x is TaskStatus {
@Entity()
// Added "NULLS NOT DISTINCT" manually to the migration, see
// https://github.com/typeorm/typeorm/issues/9827
@Unique(["type", "site", "measurementDate", "product", "instrumentInfo", "model"])
@Unique(["type", "site", "measurementDate", "product", "instrumentInfo", "model", "options"])
export class Task {
@PrimaryGeneratedColumn()
id!: number;
Expand Down Expand Up @@ -84,4 +84,7 @@ export class Task {

@Column("text", { nullable: true })
batchId!: string | null;

@Column({ type: "jsonb", nullable: true })
options!: any | null;
}
41 changes: 36 additions & 5 deletions backend/src/lib/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DataSource, FindOptionsWhere, In, Not, Repository, LessThan, MoreThanOrEqual } from "typeorm";
import { Task, TaskStatus } from "../entity/Task";
import { DataSource, In, Repository, LessThan } from "typeorm";
import { Task, TaskStatus, TaskType } from "../entity/Task";

export class QueueService {
private taskRepo: Repository<Task>;
Expand All @@ -11,7 +11,8 @@ export class QueueService {
}

async publish(task: Task) {
await this.publishSql("VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", [
task.options = this.validateTaskOptions(task.type, task.options);
await this.publishSql("VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)", [
task.type,
task.site ? task.site.id : task.siteId,
task.measurementDate,
Expand All @@ -22,6 +23,7 @@ export class QueueService {
task.priority,
task.scheduledAt.toISOString(),
task.batchId || null,
task.options,
]);
}

Expand All @@ -37,7 +39,8 @@ export class QueueService {
"status",
"priority",
"scheduledAt",
"batchId"
"batchId",
"options"
)
${valuesSql}
ON CONFLICT (
Expand All @@ -46,7 +49,8 @@ export class QueueService {
"measurementDate",
"productId",
"instrumentInfoUuid",
"modelId"
"modelId",
"options"
)
DO UPDATE SET
priority = LEAST(task.priority, EXCLUDED.priority),
Expand Down Expand Up @@ -203,4 +207,31 @@ export class QueueService {
const limit = new Date(time.getTime() - 60 * 60 * 1000);
await this.taskRepo.delete({ status: TaskStatus.DONE, doneAt: LessThan(limit) });
}

validateTaskOptions(type: TaskType, options: any): any {
if (type != TaskType.PROCESS) {
if (options != null) {
throw new Error(`Options are not supported for ${type}`);
}
return;
}
if (options != null && options.constructor !== Object) {
throw new Error(`Options should be null or object`);
}
const output = options || {};
const allowedKeys = ["derivedProducts"];
for (const key in output) {
if (!allowedKeys.includes(key)) {
throw new Error(`Unknown key ${key}`);
}
}
if (typeof output.derivedProducts !== "undefined") {
if (typeof output.derivedProducts !== "boolean") {
throw new Error("derivedProducts should be boolean");
}
} else {
output.derivedProducts = true;
}
return output;
}
}
21 changes: 21 additions & 0 deletions backend/src/migration/1727950103203-AddTaskOptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class AddTaskOptions1727950103203 implements MigrationInterface {
name = "AddTaskOptions1727950103203";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "task" DROP CONSTRAINT "UQ_26f9b0a2a92830a8706c5817204"`);
await queryRunner.query(`ALTER TABLE "task" ADD "options" jsonb`);
await queryRunner.query(
`ALTER TABLE "task" ADD CONSTRAINT "UQ_ad0af11c960ed4273cec74078c0" UNIQUE NULLS NOT DISTINCT ("type", "siteId", "measurementDate", "productId", "instrumentInfoUuid", "modelId", "options")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "task" DROP CONSTRAINT "UQ_ad0af11c960ed4273cec74078c0"`);
await queryRunner.query(`ALTER TABLE "task" DROP COLUMN "options"`);
await queryRunner.query(
`ALTER TABLE "task" ADD CONSTRAINT "UQ_26f9b0a2a92830a8706c5817204" UNIQUE NULLS NOT DISTINCT ("type", "siteId", "measurementDate", "productId", "instrumentInfoUuid", "modelId")`,
);
}
}
31 changes: 17 additions & 14 deletions backend/src/routes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export class QueueRoutes {
}
}

searchParams.options = this.queueService.validateTaskOptions(searchParams.type, searchParams.options);
const batchId = randomName();
const batches = [];
batches.push(this.submitInstrumentBatch(searchParams, batchId));
Expand Down Expand Up @@ -97,6 +98,7 @@ export class QueueRoutes {
}
task.scheduledAt = "scheduledAt" in body ? new Date(body.scheduledAt) : new Date();
task.priority = "priority" in body ? body.priority : 50;
task.options = body.options;

await this.queueService.publish(task);
res.send(task);
Expand Down Expand Up @@ -209,18 +211,18 @@ export class QueueRoutes {

/// Submit batch for non-instrument products. Tasks are created only for days
/// that contain instrument uploads.
private async submitProductBatch(filters: Record<string, any>, batchId: string, productId: string) {
private async submitProductBatch(searchParams: Record<string, any>, batchId: string, productId: string) {
const where: string[] = [];
const parameters = [productId];
return this.batchQuery(filters, where, parameters, {
return this.batchQuery(searchParams, where, parameters, {
table: "instrument_upload",
batchId,
productId: `$${parameters.length}::text`,
});
}

private async batchQuery(
filters: Record<string, any>,
searchParams: Record<string, any>,
where: string[],
parameters: string[],
options: {
Expand All @@ -232,21 +234,21 @@ export class QueueRoutes {
join?: string;
},
) {
if (filters.siteIds) {
if (searchParams.siteIds) {
where.push(`upload."siteId" = ANY ($${parameters.length + 1})`);
parameters.push(filters.siteIds);
parameters.push(searchParams.siteIds);
}
if (filters.date) {
if (searchParams.date) {
where.push(`upload."measurementDate" = $${parameters.length + 1}`);
parameters.push(filters.date);
parameters.push(searchParams.date);
}
if (filters.dateFrom) {
if (searchParams.dateFrom) {
where.push(`upload."measurementDate" >= $${parameters.length + 1}`);
parameters.push(filters.dateFrom);
parameters.push(searchParams.dateFrom);
}
if (filters.dateTo) {
if (searchParams.dateTo) {
where.push(`upload."measurementDate" <= $${parameters.length + 1}`);
parameters.push(filters.dateTo);
parameters.push(searchParams.dateTo);
}
const columns = [
`$${parameters.length + 1}::task_type_enum`, // type
Expand All @@ -259,17 +261,18 @@ export class QueueRoutes {
`50`, // priority
`now() AT TIME ZONE 'utc'`, // scheduledAt
`$${parameters.length + 2}::text`, // batchId
`$${parameters.length + 3}::jsonb`, // options
].join(", ");
const select = filters.dryRun ? `COUNT(DISTINCT (${columns})) AS "taskCount"` : `DISTINCT ${columns}`;
const select = searchParams.dryRun ? `COUNT(DISTINCT (${columns})) AS "taskCount"` : `DISTINCT ${columns}`;
let query = `SELECT ${select} FROM ${options.table} upload`;
if (options.join) {
query += ` ${options.join}`;
}
if (where.length > 0) {
query += " WHERE " + where.join(" AND ");
}
parameters.push(filters.type, options.batchId);
if (filters.dryRun) {
parameters.push(searchParams.type, options.batchId, searchParams.options);
if (searchParams.dryRun) {
const result = await this.dataSource.query(query, parameters);
return parseInt(result[0].taskCount);
} else {
Expand Down
77 changes: 69 additions & 8 deletions backend/tests/integration/sequential/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import axios from "axios";
import { InstrumentUpload, ModelUpload } from "../../../src/entity/Upload";
import { Permission } from "../../../src/entity/Permission";
import { UserAccount } from "../../../src/entity/UserAccount";
import { execPath } from "process";

let dataSource: DataSource;
let queueService: QueueService;
Expand All @@ -20,7 +21,7 @@ describe("QueueService", () => {
now.setMinutes(now.getMinutes() + minutes);
}

function makeTask(options: {
function makeTask(params: {
type?: TaskType;
siteId: string;
productId: string;
Expand All @@ -29,20 +30,22 @@ describe("QueueService", () => {
priority?: number;
delayMinutes?: number;
batchId?: string;
options?: object;
}) {
const { type = TaskType.PROCESS, priority = 0, delayMinutes = 0 } = options;
const { type = TaskType.PROCESS, priority = 0, delayMinutes = 0 } = params;
const task = new Task();
task.type = type;
task.status = TaskStatus.CREATED;
task.siteId = options.siteId;
task.productId = options.productId;
if (options.instrumentInfoUuid) {
task.instrumentInfoUuid = options.instrumentInfoUuid;
task.siteId = params.siteId;
task.productId = params.productId;
if (params.instrumentInfoUuid) {
task.instrumentInfoUuid = params.instrumentInfoUuid;
}
task.measurementDate = new Date(options.measurementDate);
task.measurementDate = new Date(params.measurementDate);
task.scheduledAt = new Date(now.getTime() + delayMinutes * 60 * 1000);
task.priority = priority;
task.batchId = options.batchId || null;
task.batchId = params.batchId || null;
task.options = params.options;
return task;
}

Expand Down Expand Up @@ -514,6 +517,43 @@ describe("QueueService", () => {
await queueService.cleanOldTasks(now);
expect(await queueService.count()).toBe(0);
});

it("defaults to derivedProducts: true", async () => {
await queueService.publish(
makeTask({
siteId: "hyytiala",
productId: "lidar",
instrumentInfoUuid: "c43e9f54-c94d-45f7-8596-223b1c2b14c0",
measurementDate: "2024-01-10",
}),
);
expect(await queueService.count()).toBe(1);
const taskRes = await queueService.receive({ now });
expect(taskRes).not.toBeNull();
expect(taskRes!.options).toEqual({ derivedProducts: true });
});

it("published two tasks with different options", async () => {
await queueService.publish(
makeTask({
siteId: "hyytiala",
productId: "lidar",
instrumentInfoUuid: "c43e9f54-c94d-45f7-8596-223b1c2b14c0",
measurementDate: "2024-01-10",
options: { derivedProducts: true },
}),
);
await queueService.publish(
makeTask({
siteId: "hyytiala",
productId: "lidar",
instrumentInfoUuid: "c43e9f54-c94d-45f7-8596-223b1c2b14c0",
measurementDate: "2024-01-10",
options: { derivedProducts: false },
}),
);
expect(await queueService.count()).toBe(2);
});
});

describe("/api/queue/batch", () => {
Expand Down Expand Up @@ -587,6 +627,7 @@ describe("/api/queue/batch", () => {
siteId: "bucharest",
productId: "lidar",
instrumentInfoUuid: "c43e9f54-c94d-45f7-8596-223b1c2b14c0",
options: { derivedProducts: true },
}),
).toBeTruthy();
});
Expand All @@ -600,6 +641,7 @@ describe("/api/queue/batch", () => {
siteId: "granada",
productId: "radar",
instrumentInfoUuid: "9e0f4b27-d5f3-40ad-8b73-2ae5dabbf81f",
options: { derivedProducts: true },
}),
).toBeTruthy();
expect(
Expand All @@ -608,6 +650,7 @@ describe("/api/queue/batch", () => {
siteId: "bucharest",
productId: "radar",
instrumentInfoUuid: "0b3a7fa0-4812-4964-af23-1162e8b3a665",
options: { derivedProducts: true },
}),
).toBeTruthy();
});
Expand Down Expand Up @@ -703,4 +746,22 @@ describe("/api/queue/batch", () => {
await axios.delete(`${batchUrl}/${res.data.batchId}`, { auth });
expect(await taskRepo.count()).toBe(0);
});

it("creates tasks with options", async () => {
await axios.post(
batchUrl,
{ type: "process", productIds: ["lidar"], dryRun: false, options: { derivedProducts: false } },
{ auth },
);
expect(await taskRepo.count()).toBe(1);
expect(
await taskRepo.existsBy({
measurementDate: new Date("2020-08-12"),
siteId: "bucharest",
productId: "lidar",
instrumentInfoUuid: "c43e9f54-c94d-45f7-8596-223b1c2b14c0",
options: { derivedProducts: false },
}),
).toBeTruthy();
});
});

0 comments on commit 10f8bd8

Please sign in to comment.