Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harmony 1996: Add 'empty-result' status to WorkItem class and work-items table #683

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
13 changes: 12 additions & 1 deletion db/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ CREATE TABLE `job_errors` (
`jobID` char(36) not null,
`url` varchar(4096) not null,
`message` varchar(4096) not null,
`level` varchar(255) check (
`level` in (
'error',
'warning'
)
) not null default 'error',
`category` varchar(255) not null default 'generic',
`createdAt` datetime not null,
`updatedAt` datetime not null,
FOREIGN KEY(jobID) REFERENCES jobs(jobID)
Expand Down Expand Up @@ -80,7 +87,8 @@ CREATE TABLE `work_items` (
`workflowStepIndex` integer not null,
`scrollID` varchar(4096),
`serviceID` varchar(255) not null,
`status` text check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled')) not null,
`status` varchar(255) check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')) not null,
`sub_status` varchar(255) check (`sub_status` in ('no-data')),
`stacCatalogLocation` varchar(255),
`totalItemsSize` double precision not null default 0,
`outputItemSizesJson` text,
Expand Down Expand Up @@ -183,9 +191,12 @@ CREATE INDEX jobs_username_idx ON jobs(jobID, username);
CREATE INDEX job_links_jobID_idx ON job_links(jobID);
CREATE INDEX job_links_jobID_id_idx ON job_links(jobID, id);
CREATE INDEX job_errors_jobID_idx ON job_errors(jobID);
CREATE INDEX job_errors_level_idx ON job_errors(level);
CREATE INDEX job_errors_category_idx ON job_errors(category);
CREATE INDEX work_items_jobID_idx ON work_items(jobID);
CREATE INDEX work_items_serviceID_idx ON work_items(serviceID);
CREATE INDEX work_items_status_idx ON work_items(status);
CREATE INDEX work_items_sub_status_idx ON work_items(sub_status);
CREATE INDEX workflow_steps_jobID_idx ON workflow_steps(jobID);
CREATE INDEX workflow_steps_jobID_StepIndex_idx ON workflow_steps(jobID, stepIndex);
CREATE INDEX workflow_steps_serviceID_idx ON workflow_steps(serviceID);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex, Promise) {
return knex.schema.raw(`
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_status_check",
Copy link
Collaborator

@vinnyinverso vinnyinverso Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to drop work_items_status_check before adding it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question for the "down" migration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because there is already a constraint with that name. We can't update it, we have to drop it then add it.

ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')),
ADD COLUMN "sub_status" VARCHAR(255),
ADD CONSTRAINT "work_items_sub_status_check"
CHECK (sub_status IN (null, 'no-data'));
CREATE INDEX work_items_sub_status ON work_items (sub_status)
`);
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.raw(`
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_sub_status_check",
DROP COLUMN "sub_status"),
DROP CONSTRAINT "work_items_status_check",
ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled'))
`);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex, Promise) {
return knex.schema.raw(`
ALTER TABLE "job_errors"
ADD COLUMN "level" VARCHAR(255) DEFAULT 'error' NOT NULL,
ADD CONSTRAINT "job_errors_level_check"
CHECK (level IN ('error', 'warning')),
ADD COLUMN "category" VARCHAR(255) DEFAULT 'generic' NOT NULL;

CREATE INDEX job_errors_level ON job_errors (level);
CREATE INDEX job_errors_category ON job_errors (category)
`);
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.raw(`
DROP INDEX job_errors_category;
DROP_INDEX job_errors_level;
ALTER TABLE "job_errors"
DROP CONSTRAINT "job_errors_category_check",
DROP COLUMN "category",
DROP CONSTRAINT "job_errors_level_check",
DROP COLUMN "level"
`);
};
5 changes: 4 additions & 1 deletion packages/util/tsconfig.build.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
"allowJs": true,
"noImplicitAny": false,
"sourceMap": true,
"outDir": "built"
"outDir": "built",
"typeRoots": [
"node_modules/@types"
]
},
"include": ["."],
"exclude": ["./node_modules", "coverage"]
Expand Down
5 changes: 4 additions & 1 deletion packages/util/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
"allowJs": true,
"noImplicitAny": false,
"sourceMap": true,
"outDir": "built"
"outDir": "built",
"typeRoots": [
"node_modules/@types"
],
},
"include": ["."],
"exclude": ["./node_modules", "coverage"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ export async function preprocessWorkItem(
let outputItemSizes;
let catalogItems;
try {
if (status === WorkItemStatus.SUCCESSFUL && !nextWorkflowStep) {
// TODO fix this in HARMONY-1995
if ([WorkItemStatus.SUCCESSFUL, WorkItemStatus.WARNING].includes(status) && !nextWorkflowStep) {
// if we are CREATING STAC CATALOGS for the last step in the chain we should read the catalog items
// since they are needed for generating the output links we will save
catalogItems = await readCatalogsItems(results);
Expand Down Expand Up @@ -608,13 +609,13 @@ export async function processWorkItem(
): Promise<void> {
const { jobID } = job;
const { status, errorMessage, catalogItems, outputItemSizes } = preprocessResult;
const { workItemID, hits, results, scrollID } = update;
const { workItemID, hits, results, scrollID, sub_status } = update;
const startTime = new Date().getTime();
let durationMs;
let jobSaveStartTime;
let didCreateWorkItem = false;
if (status === WorkItemStatus.SUCCESSFUL) {
logger.info(`Updating work item ${workItemID} to ${status}`);
if (status === WorkItemStatus.SUCCESSFUL || status === WorkItemStatus.WARNING) {
logger.info(`Updating work item ${workItemID} to ${status} | ${sub_status}`);
}

try {
Expand Down Expand Up @@ -700,6 +701,7 @@ export async function processWorkItem(
tx,
workItemID,
status,
sub_status,
duration,
totalItemsSize,
outputItemSizes);
Expand All @@ -711,6 +713,7 @@ export async function processWorkItem(
logger.info(`Updated work item. Duration (ms) was: ${duration}`);

workItem.status = status;
workItem.sub_status = sub_status;

let allWorkItemsForStepComplete = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const { id } = req.params;
const {
status,
sub_status,
hits,
results,
scrollID,
Expand All @@ -116,6 +117,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const update = {
workItemID: parseInt(id),
status,
sub_status,
hits,
results,
scrollID,
Expand Down
10 changes: 10 additions & 0 deletions services/harmony/app/models/work-item-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ export enum WorkItemStatus {
SUCCESSFUL = 'successful',
FAILED = 'failed',
CANCELED = 'canceled',
WARNING = 'warning',
}

// additional information about the status - currently only relevant for WARNING status
export enum WorkItemSubStatus {
NO_DATA = 'no-data', // the service responded with no data
}

export const COMPLETED_WORK_ITEM_STATUSES = [
WorkItemStatus.SUCCESSFUL,
WorkItemStatus.FAILED,
WorkItemStatus.CANCELED,
WorkItemStatus.WARNING,
];

export interface WorkItemRecord {
Expand All @@ -36,6 +43,9 @@ export interface WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
sub_status?: WorkItemSubStatus;

// error message if status === FAILED
errorMessage?: string;

Expand Down
5 changes: 4 additions & 1 deletion services/harmony/app/models/work-item-update.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _ from 'lodash';
import { WorkItemStatus } from './work-item-interface';
import { WorkItemStatus, WorkItemSubStatus } from './work-item-interface';

/**
*
Expand All @@ -14,6 +14,9 @@ export default interface WorkItemUpdate {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
sub_status?: WorkItemSubStatus;

// The ID of the scroll session (only used for the query cmr service)
scrollID?: string;

Expand Down
19 changes: 13 additions & 6 deletions services/harmony/app/models/work-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import env from '../util/env';
import { Job, JobStatus } from './job';
import Record from './record';
import WorkflowStep from './workflow-steps';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery } from './work-item-interface';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery, WorkItemSubStatus } from './work-item-interface';
import { eventEmitter } from '../events';
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';

Expand Down Expand Up @@ -51,6 +51,9 @@ export default class WorkItem extends Record implements WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
sub_status?: WorkItemSubStatus;

// error message if status === FAILED
errorMessage?: string;

Expand Down Expand Up @@ -256,7 +259,7 @@ export async function getNextWorkItems(
.whereIn('id', workItemData.map((w) => w.id));
}
} catch (e) {
logger.error(`Error getting next work item for service [${serviceID}] and job [${jobID}]`);
logger.error(`Error getting next work items for service [${serviceID}] and job [${jobID}]`);
logger.error(e);
throw e;
}
Expand Down Expand Up @@ -297,6 +300,7 @@ export async function getWorkItemStatus(
* @param tx - the transaction to use for querying
* @param id - the id of the WorkItem
* @param status - the status to set for the WorkItem
* @param sub_status - the sub-status to set for the WorkItem
* @param duration - how long the work item took to process
* @param totalItemsSize - the combined sizes of all the input granules for this work item
* @param outputItemSizes - the separate size of each granule in the output for this work item
Expand All @@ -305,6 +309,7 @@ export async function updateWorkItemStatus(
tx: Transaction,
id: number,
status: WorkItemStatus,
sub_status: WorkItemSubStatus,
duration: number,
totalItemsSize: number,
outputItemSizes: number[],
Expand All @@ -313,11 +318,11 @@ export async function updateWorkItemStatus(
const outputItemSizesJson = JSON.stringify(outputItemSizes);
try {
await tx(WorkItem.table)
.update({ status, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.update({ status, sub_status, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.where({ id });
logger.debug(`Status for work item ${id} set to ${status}`);
logger.debug(`Status for work item ${id} set to ${status} | ${sub_status}`);
} catch (e) {
logger.error(`Failed to update work item ${id} status to ${status}`);
logger.error(`Failed to update work item ${id} status to ${status} | ${sub_status}`);
logger.error(e);
throw e;
}
Expand All @@ -328,14 +333,16 @@ export async function updateWorkItemStatus(
* @param tx - the transaction to use for querying
* @param ids - the ids of the WorkItems
* @param status - the status to set for the WorkItems
* @param sub_status - the sub-status to set for the WorkItems
*/
export async function updateWorkItemStatuses(
tx: Transaction,
ids: number[],
status: WorkItemStatus,
sub_status?: WorkItemSubStatus,
): Promise<void> {
const now = new Date();
let update = { status, updatedAt: now };
let update = { status, sub_status, updatedAt: now };
// if we are setting the status to running, also set the startedAt time
if (status === WorkItemStatus.RUNNING) {
update = { ...update, ...{ startedAt: now } };
Expand Down
Loading
Loading