Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHRAS-4084 : workers - cancel a workers jobs if error is "record not found" #4531

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ protected function doExecute(InputInterface $input, OutputInterface $output)
if ($body === false) {
$output->writeln(sprintf('<error>Unable to read payload file %s</error>', $input->getArgument('body')));

return;
return 0;
}

$body = json_decode($body, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$output->writeln('<error>Invalid message body</error>');

return;
return 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not returning a error code ? (1 ?) see: https://hpc-discourse.usc.edu/t/exit-codes-and-their-meanings/414

same thing todo for every failure point (catch's)

}
}

Expand All @@ -58,5 +58,6 @@ protected function doExecute(InputInterface $input, OutputInterface $output)
@unlink($input->getArgument('body'));
}

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function process(array $payload)
$count
));

return;
return 0;
}

$body = json_decode($body,true);
Expand Down Expand Up @@ -111,6 +111,8 @@ public function process(array $payload)
unset($workerRunningJob);
}
$em->flush();

return 0;
}

private function createStory(array $body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public function process(array $payload)
$workerRunningJob->getId()
));

return;
return 0;
}


Expand All @@ -134,7 +134,7 @@ public function process(array $payload)
$workerRunningJob->getId()
));

return;
return 0;
}

if ($workerRunningJob != null) {
Expand Down Expand Up @@ -181,7 +181,7 @@ public function process(array $payload)
if (!isset($body['formData']['collection_destination'])) {
$this->messagePublisher->pushLog("The collection_destination is not defined");

return ;
return 0;
}

$base_id = $body['formData']['collection_destination'];
Expand Down Expand Up @@ -257,6 +257,8 @@ public function process(array $payload)
]
]
);

return 0;
}

/**
Expand Down
8 changes: 5 additions & 3 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function process(array $payload)
try {
$databox = $this->findDataboxById($payload['databoxId']);
} catch(\Exception $e) {
return;
return 0;
}

$recordIds = [];
Expand All @@ -49,7 +49,7 @@ public function process(array $payload)
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", 'error');

return ;
return 0;
}

$workerRunningJob
Expand Down Expand Up @@ -191,7 +191,7 @@ public function process(array $payload)
$workerMessage
);

return;
return 0;
}

// order to write metas for those records
Expand All @@ -213,5 +213,7 @@ public function process(array $payload)
}

$this->messagePublisher->pushLog(sprintf("record edited databoxname=%s databoxid=%d recordid=%d", $databox->get_viewname(), $payload['databoxId'], $payload['record_id']));

return 0;
}
}
10 changes: 6 additions & 4 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function process(array $payload)
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", 'error');

return ;
return 0;
}

$workerRunningJob
Expand Down Expand Up @@ -243,7 +243,7 @@ public function process(array $payload)
$this->messagePublisher->pushLog(sprintf("subdefinition %s or file as document mapping not found", $phraseanetSubdefAsDocument));
$this->finishedJob($workerRunningJob, $em, WorkerRunningJob::ERROR);

return ;
return 0;
}

if ($lat !== null) {
Expand Down Expand Up @@ -271,7 +271,7 @@ public function process(array $payload)
$this->messagePublisher->pushLog("An error occurred when creating asset: status-code " . $response->getStatusCode());
$this->finishedJob($workerRunningJob, $em, WorkerRunningJob::ERROR);

return ;
return 0;
}

$assetsResponse = json_decode($response->getBody(),true);
Expand Down Expand Up @@ -369,11 +369,13 @@ public function process(array $payload)
$this->messagePublisher->pushLog("An error occurred when creating asset!: ". $e->getMessage());
$this->finishedJob($workerRunningJob, $em, WorkerRunningJob::ERROR);

return;
return 0;
}

// tell that the upload is finished
$this->finishedJob($workerRunningJob, $em);

return 0;
}

private function getClientAnnotationProfile(Client $exposeClient, $publicationId)
Expand Down
8 changes: 6 additions & 2 deletions lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ public function process(array $payload)
$export = $repoFtpExport->find($payload['ftpExportId']);

if ($export !== null) {
$this->doExport($export, $payload);
return $this->doExport($export, $payload);
}

return 0;
}

private function doExport(FtpExport $export, array $payload)
Expand All @@ -73,7 +75,7 @@ private function doExport(FtpExport $export, array $payload)
if ($workerRunningJob == null) {
$this->logger->error("Given workerJobId not found !");

return ;
return 0;
}

$workerRunningJob
Expand Down Expand Up @@ -401,6 +403,8 @@ private function doExport(FtpExport $export, array $payload)
$workerMessage
);
}

return 0;
}

private function finalize(Application $app, FtpExport $export)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function process(array $payload)
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", "error");

return ;
return 0;
}

$workerRunningJob
Expand Down Expand Up @@ -155,6 +155,8 @@ public function process(array $payload)

$em->flush();
}

return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public function process(array $payload)
if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) {
// bad payload
$this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__));
return;
return 0;
}

$recordId = $payload['recordId'];
Expand All @@ -74,11 +74,11 @@ public function process(array $payload)
} catch (\Exception $e) {
$this->logger->error(sprintf("%s (%s) : record not found %s", __FILE__, __LINE__, $e->getMessage()));

return;
return 0;
}

if ($record->isStory()) {
return;
return 0;
}

$oldLogger = $this->subdefGenerator->getLogger();
Expand All @@ -96,7 +96,7 @@ public function process(array $payload)
MessagePublisher::SUBDEF_CREATION_TYPE
);

return ;
return 0;
}

// here we can work
Expand Down Expand Up @@ -130,7 +130,7 @@ public function process(array $payload)
));

// the subscriber will "unlock" the row, no need to do it here
return ;
return 0;
}

// begin to check if the subdef is successfully generated
Expand Down Expand Up @@ -158,7 +158,7 @@ public function process(array $payload)
$this->subdefGenerator->setLogger($oldLogger);

// the subscriber will "unlock" the row, no need to do it here
return ;
return 0;
}

// checking ended
Expand Down Expand Up @@ -202,6 +202,8 @@ public function process(array $payload)
$this->repoWorker->markFinished($workerRunningJobId);

$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_SUBDEFCREATION, new \DateTime('now'), WorkerRunningJob::FINISHED);

return 0;
}

public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public function process(array $payload)
if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) {
// bad payload
$this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__));
return;
return 0;
}

$databoxId = $payload['databoxId'];
Expand All @@ -88,7 +88,7 @@ public function process(array $payload)
MessagePublisher::WRITE_METADATAS_TYPE
);

return ;
return 0;
}

// here we can work
Expand All @@ -98,7 +98,7 @@ public function process(array $payload)
} catch (\Exception $e) {
$this->repoWorker->markFinished($workerRunningJobId, "error " . $e->getMessage());

return;
return 0;
}

/** @var WorkerRunningJob $workerRunningJob */
Expand All @@ -116,7 +116,7 @@ public function process(array $payload)

$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::ERROR);

return;
return 0;
}

$this->repoWorker->reconnect();
Expand All @@ -141,7 +141,7 @@ public function process(array $payload)
));

// the subscriber will mark the job as errored, no need to do it here
return ;
return 0;
}

if (!$subdef->is_physically_present()) {
Expand All @@ -158,7 +158,7 @@ public function process(array $payload)
));

// the subscriber will mark the job as errored, no need to do it here
return ;
return 0;
}

// here we can try to do the job
Expand Down Expand Up @@ -326,7 +326,7 @@ public function process(array $payload)
$this->repoWorker->markFinished($workerRunningJobId, $stopInfo);
$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::ERROR);
}
return ;
return 0;
}

// mark write metas finished
Expand All @@ -336,6 +336,8 @@ public function process(array $payload)
$this->repoWorker->markFinished($workerRunningJobId);

$this->getDataboxLogger($databox)->initOrUpdateLogDocsFromWorker($record, $databox, $workerRunningJob, $subdefName, \Session_Logger::EVENT_WRITEMETADATAS, new \DateTime('now'), WorkerRunningJob::FINISHED);

return 0;
}

private function removeNulChar($value)
Expand Down
Loading