-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ENH: Implement API for slow running tasks. Add end-points for request…
…ing status on a task
- Loading branch information
1 parent
099a45e
commit 6837dcc
Showing
11 changed files
with
338 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package com.github.statnett.loadflowservice | ||
|
||
import io.github.oshai.kotlinlogging.KotlinLogging | ||
import kotlinx.coroutines.CancellationException | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.launch | ||
import kotlinx.serialization.Serializable | ||
|
||
fun statusUrl(id: String): String { | ||
return "/status/$id" | ||
} | ||
|
||
fun resultUrl(id: String): String { | ||
return "/result/$id" | ||
} | ||
|
||
@Serializable | ||
data class TaskInfo( | ||
val statusUrl: String, | ||
val resultUrl: String, | ||
val id: String | ||
) | ||
|
||
private val logger = KotlinLogging.logger { } | ||
|
||
fun createTask(tm: TaskManager, calculate: () -> ComputationResult): TaskInfo { | ||
val task = Task() | ||
tm.register(task) | ||
CoroutineScope(Dispatchers.Default).launch { | ||
logger.info { "Running task ${task.id} on thread ${Thread.currentThread().name}" } | ||
try { | ||
task.status = TaskStatus.RUNNING | ||
task.result = calculate() | ||
task.status = TaskStatus.FINISHED | ||
} catch (e: CancellationException) { | ||
throw e | ||
} catch (e: Exception) { | ||
task.status = TaskStatus.FAILED | ||
task.exception = e | ||
} finally { | ||
tm.scheduleTaskDeletion(task.id) | ||
} | ||
} | ||
return TaskInfo( | ||
statusUrl = statusUrl(task.id), | ||
resultUrl = resultUrl(task.id), | ||
id = task.id | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package com.github.statnett.loadflowservice | ||
|
||
import io.ktor.http.* | ||
import io.ktor.server.application.* | ||
import io.ktor.server.response.* | ||
import kotlinx.coroutines.delay | ||
import kotlinx.serialization.Serializable | ||
import kotlinx.serialization.encodeToString | ||
import kotlinx.serialization.json.Json | ||
import java.util.* | ||
|
||
enum class TaskStatus { | ||
CREATED, RUNNING, FINISHED, FAILED | ||
} | ||
|
||
class Task { | ||
var status = TaskStatus.CREATED | ||
var result: ComputationResult? = null | ||
val id = UUID.randomUUID().toString() | ||
var exception: Exception? = null | ||
private val createdAt = System.currentTimeMillis() | ||
|
||
fun ageSeconds(): Int { | ||
return ((System.currentTimeMillis() - createdAt) / 1000).toInt() | ||
} | ||
} | ||
|
||
class TaskQueue { | ||
private val tasks = mutableListOf<Task>() | ||
|
||
fun get(id: String): Task? { | ||
return tasks.firstOrNull { task -> task.id == id } | ||
} | ||
|
||
fun size(): Int { | ||
return tasks.size | ||
} | ||
|
||
fun register(task: Task) { | ||
tasks.add(task) | ||
} | ||
|
||
fun remove(id: String) { | ||
tasks.removeIf { t -> t.id == id } | ||
} | ||
|
||
fun finished(id: String): Boolean { | ||
val task = get(id) ?: return false | ||
return task.status == TaskStatus.FINISHED | ||
} | ||
|
||
fun clearOlderThan(ageSeconds: Int) { | ||
tasks.removeIf { t -> (t.ageSeconds() > ageSeconds) and (t.status != TaskStatus.RUNNING)} | ||
} | ||
|
||
fun numRunning(): Int { | ||
return tasks.count { t -> t.status == TaskStatus.RUNNING } | ||
} | ||
} | ||
|
||
class TaskDoesNotExistException(message: String) : Exception(message) | ||
class FullBufferException(message: String) : Exception(message) | ||
|
||
@Serializable | ||
data class TaskStatusResponse(val status: String, val message: String) | ||
|
||
class TaskManager(private val maxRunningTasks: Int = 100, private val retention: Int = 10 * 60) { | ||
internal val queue = TaskQueue() | ||
|
||
fun status(id: String): TaskStatusResponse { | ||
val task = queue.get(id) ?: throw TaskDoesNotExistException("No task with id $id") | ||
val message = if (task.exception != null) "${task.exception}" else "" | ||
return TaskStatusResponse( | ||
task.status.name, | ||
message | ||
) | ||
} | ||
|
||
fun size(): Int { | ||
return queue.size() | ||
} | ||
|
||
fun numRunning(): Int { | ||
return queue.numRunning() | ||
} | ||
|
||
private fun clearOld() { | ||
queue.clearOlderThan(retention) | ||
} | ||
|
||
private fun raiseOnFull() { | ||
if (numRunning() >= maxRunningTasks) { | ||
throw FullBufferException("Could not add task because the buffer is full") | ||
} | ||
} | ||
|
||
fun register(task: Task) { | ||
raiseOnFull() | ||
queue.register(task) | ||
} | ||
|
||
|
||
suspend fun respondWithResult(call: ApplicationCall, id: String) { | ||
if (status(id).status != TaskStatus.FINISHED.name) { | ||
call.respondText("Task not finished", status = HttpStatusCode.NotFound) | ||
return | ||
} | ||
|
||
val result = queue.get(id) | ||
if (result != null) { | ||
val res = Json.encodeToString(result.result) | ||
call.respondText(res, contentType = ContentType.Application.Json) | ||
queue.remove(id) | ||
} | ||
|
||
throw TaskDoesNotExistException("No task with id $id") | ||
} | ||
|
||
suspend fun scheduleTaskDeletion(id: String) { | ||
delay((retention*1000).toLong()) | ||
queue.remove(id) | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.