-
Notifications
You must be signed in to change notification settings - Fork 0
/
12-task_queue.Rmd
43 lines (30 loc) · 4.26 KB
/
12-task_queue.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# The Task Queue System
The Task Queue System maintains a list of outstanding analysis tasks in the database and is responsible for assigning tasks to analysis processes.
The database implementation is designed so that it can be utilised by either audioBlastAnalyse or in the future by an analysis package in another language (e.g. a Python analysis suite).
## Database Implementation
The system is implemented as two tables in the database, `tasks` and `tasks-progress`. Tasks are assigned using the stored procedure `get-tasks()` for local files, or `get-tasks-by-file()` for web files. The `get-tasks()` procedure will assign `n` random tasks to the analysis process, whereas `get-tasks-by-file()` will return all outstanding tasks for a given file. The latter procedure removes the need for repeated downloads of the same file.
### `tasks` table
The table `tasks` is a list of outstanding analysis tasks. It is populated automatically by triggers on the `recordings` and `recordings-calcuated` table. Analysis tasks are grouped into sections (recordings_calculated, sounscapes_minute and soundscpaes_second). The `recordings-calculaed` table has a TINYINT column for each of these, that is set to `1` once the task is completed.
### `tasks-progress` table
The table `tasks-progress` is populated by the stored procedure `get-tasks()` when an analysis process requests outstanding tasks. It is used to prevent the same task being analysed to multiple analysis processes. Tasks from an unresponsive process (e.g. a process which has crashed) can be reassigned after an hour of inactivity by that process.
### `get-tasks()` stored procedure
The stored procedure `get-tasks(process_id, n, source)` is used to assign `n` tasks to the analysis process `process_id`. Initially a call to `_quickMaintain()` is performed to deallocate time-expired tasks. It inserts `n` random unassigned tasks from the `tasks` table into the `tasks-progress` table along with the process_id and the current timestamp. It returns via a SELECT statement these tasks.
### `get-tasks-by-file()` stored procedure
Initially a call to `_quickMaintain()` is performed to deallocate time-expired tasks. The stored procedure `get-tasks(process_id, source)` then assigns all outstanding tasks for a randomly chosen file to the analysis process `process_id`. It inserts these tasks from the `tasks` table into the `tasks-progress` table along with the process_id and the current timestamp. It returns via a SELECT statement these tasks.
### `delete-task()` stored proecure
The stored procedure `delete-task(process_id, source, id, task)` marks a task as complete by deleting it from the `tasks` table. A trigger in the `tasks` table will delete the matching row from `tasks-progress`. The started time of any remaining tasks assigned to the same `process_id` in the `tasks-progess` table will be updated to the current time. A call is made to the stored procedure `_quickMaintain()` which will perform routine maintenance tasks, including removing any expired tasks in the `tasks-progress` table.
## audioBlastAnalyse Implementation
The analysis suite audioBlastAnalyse has several functions that implement this task queue in the R language.
All these functions require a database connector (`db`) as a parameter, and a unique `process_id` for the analysis process. In the majority of cases these functions will be called automatically from within a call to the main `analyse()` function, where the database connector will already be configured and a process identifier is automatically generated.
### Legacy mode
On some older Linux operating systems the RMariaDB package and libraries it builds against have issues dealing with stored procedures. For the time being a legacy mode is provided that implements the stored procedures as a sequence of queries. Setting `legacy=TRUE` as a parameter to either of the fetch queries will activate this mode.
### Getting tasks
The `get-tasks()` stored procedure is accessed by using `fetchDownloadableRecordings()`, and `get-tasks-by-file()` by using `fetchDownloadableRecordings()`.
````{R, eval=FALSE}
fetchDownloadableRecordings(db, source, process_id, legacy=FALSE)
fetchUnanalysedRecordings(db, source, process_id, legacy=FALSE)
````
### Removing completed tasks
````{R, eval=FALSE}
deleteToDo(db, source, id, task, process_id)
````