Skip to content

Commit

Permalink
Revert "optimised overview data api"
Browse files Browse the repository at this point in the history
This reverts commit fefaaf1.
  • Loading branch information
lmenezes committed Oct 16, 2017
1 parent d55787f commit 57ae79f
Show file tree
Hide file tree
Showing 38 changed files with 3,412 additions and 1,118 deletions.
39 changes: 32 additions & 7 deletions app/controllers/ClusterOverviewController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,45 @@ import javax.inject.Inject

import controllers.auth.AuthenticationModule
import elastic.{ElasticClient, Error}
import models.overview.ClusterOverview
import models.{CerebroResponse, Hosts, ShardStats}
import services.overview.DataService

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class ClusterOverviewController @Inject()(val authentication: AuthenticationModule,
val hosts: Hosts,
client: ElasticClient,
data: DataService) extends BaseController {
client: ElasticClient) extends BaseController {

def index = process { request =>
data.getOverviewData(request.target).map { overview =>
CerebroResponse(200, overview)
Future.sequence(
Seq(
client.clusterState(request.target),
client.nodesStats(Seq("jvm","fs","os","process"), request.target),
client.indicesStats(request.target),
client.clusterSettings(request.target),
client.aliases(request.target),
client.clusterHealth(request.target),
client.nodes(Seq("os","jvm"), request.target),
client.main(request.target)
)
).map { responses =>
val failed = responses.find(_.isInstanceOf[Error])
failed match {
case Some(f) => CerebroResponse(f.status, f.body)
case None =>
val overview = ClusterOverview(
responses(0).body,
responses(1).body,
responses(2).body,
responses(3).body,
responses(4).body,
responses(5).body,
responses(6).body,
responses(7).body
)
CerebroResponse(200, overview)
}
}
}

Expand Down Expand Up @@ -77,7 +102,7 @@ class ClusterOverviewController @Inject()(val authentication: AuthenticationModu

def getShardStats = process { request =>
val index = request.get("index")
val shard = request.get("shard").toInt // TODO ES return as Int?
val shard = request.getInt("shard")
val node = request.get("node")
Future.sequence(
Seq(
Expand All @@ -99,7 +124,7 @@ class ClusterOverviewController @Inject()(val authentication: AuthenticationModu

def relocateShard = process { request =>
val index = request.get("index")
val shard = request.get("shard").toInt // TODO ES return as Int?
val shard = request.getInt("shard")
val from = request.get("from")
val to = request.get("to")
val server = request.target
Expand Down
14 changes: 7 additions & 7 deletions app/models/ShardStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ object ShardStats {
shardStats.getOrElse(getShardRecovery(index, node, shard, recovery).getOrElse(JsNull))
}

private def matchesNode(node: String, shardNode: String): Boolean = {
shardNode.equals(node) || shardNode.startsWith(node)
}

private def getShardStats(index: String, node: String, shard: Int, stats: JsValue): Option[JsValue] =
(stats \ "indices" \ index \ "shards" \ shard.toString).asOpt[JsArray] match {
case Some(JsArray(shards)) => shards.find(s => matchesNode(node, (s \ "routing" \ "node").as[String]))
case Some(JsArray(shards)) => shards.collectFirst {
case s if (s \ "routing" \ "node").as[String].equals(node) => s
}
case _ => None
}


private def getShardRecovery(index: String, node: String, shard: Int, recovery: JsValue): Option[JsValue] =
(recovery \ index \ "shards").asOpt[JsArray] match {
case Some(JsArray(recoveries)) =>
recoveries.find(r => matchesNode(node, (r \ "target" \ "id").as[String]) && (r \ "id").as[Int] == shard)
case Some(JsArray(recoveries)) => recoveries.collectFirst {
case r if (r \ "target" \ "id").as[String].equals(node) && (r \ "id").as[Int].equals(shard) => r
}
case _ => None
}

Expand Down
4 changes: 0 additions & 4 deletions app/models/commons/NodeRoles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,4 @@ object NodeRoles {
)
}
}

def apply(roles: String): NodeRoles =
NodeRoles(roles.contains("m"), roles.contains("d"), roles.contains("i"))

}
18 changes: 18 additions & 0 deletions app/models/overview/ClosedIndices.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package models.overview

import play.api.libs.json._

object ClosedIndices {

def apply(clusterState: JsValue) = {
val blocks = (clusterState \ "blocks" \ "indices").asOpt[JsObject].getOrElse(Json.obj())
blocks.keys.collect {
case index if (blocks \ index \ "4").asOpt[JsObject].isDefined =>
Json.obj(
"name" -> JsString(index),
"closed" -> JsBoolean(true),
"special" -> JsBoolean(index.startsWith("."))
)
}
}
}
64 changes: 64 additions & 0 deletions app/models/overview/ClusterOverview.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package models.overview

import play.api.libs.json._

object ClusterOverview {

def apply(clusterState: JsValue, nodesStats: JsValue, indicesStats: JsValue,
clusterSettings: JsValue, aliases: JsValue, clusterHealth: JsValue,
nodesInfo: JsValue, main: JsValue): JsValue = {

val indices = buildIndices(clusterState, indicesStats, aliases)

val masterNodeId = (clusterState \ "master_node").as[String]

val persistentAllocation = (clusterSettings \ "persistent" \ "cluster" \ "routing" \ "allocation" \ "enable").asOpt[String].getOrElse("all")
val transientAllocation = (clusterSettings \ "transient" \ "cluster" \ "routing" \ "allocation" \ "enable").asOpt[String]
val shardAllocation = transientAllocation.getOrElse(persistentAllocation).equals("all")

JsObject(Seq(
// clusterHealth
"cluster_name" -> (clusterHealth \ "cluster_name").as[JsString],
"status" -> (clusterHealth \ "status").as[JsString],
"number_of_nodes" -> (clusterHealth \ "number_of_nodes").as[JsNumber],
"active_primary_shards" -> (clusterHealth \ "active_primary_shards").as[JsNumber],
"active_shards" -> (clusterHealth \ "active_shards").as[JsNumber],
"relocating_shards" -> (clusterHealth \ "relocating_shards").as[JsNumber],
"initializing_shards" -> (clusterHealth \ "initializing_shards").as[JsNumber],
"unassigned_shards" -> (clusterHealth \ "unassigned_shards").as[JsNumber],
// indicesStats
"docs_count" -> (indicesStats \ "_all" \ "primaries" \ "docs" \ "count").asOpt[JsNumber].getOrElse(JsNumber(0)),
"size_in_bytes" -> (indicesStats \ "_all" \ "total" \ "store" \ "size_in_bytes").asOpt[JsNumber].getOrElse(JsNumber(0)),
"total_indices" -> JsNumber(indices.size),
"closed_indices" -> JsNumber(indices.count { idx => (idx \ "closed").as[Boolean] }),
"special_indices" -> JsNumber(indices.count { idx => (idx \ "special").as[Boolean] }),
"indices" -> JsArray(indices),
"nodes" -> buildNodes(masterNodeId, nodesInfo, nodesStats),
"shard_allocation" -> JsBoolean(shardAllocation)
))
}

def buildNodes(masterNodeId: String, nodesInfo: JsValue, nodesStats: JsValue): JsArray =
JsArray(
(nodesInfo \ "nodes").as[JsObject].value.map {
case (id, info) =>
val stats = (nodesStats \ "nodes" \ id).as[JsObject]
Node(id, info, stats, masterNodeId)
}.toSeq
)

def buildIndices(clusterState: JsValue, indicesStats: JsValue, aliases: JsValue): Seq[JsValue] = {
val routingTable = (clusterState \ "routing_table" \ "indices").as[JsObject]
val routingNodes = (clusterState \ "routing_nodes" \ "nodes").as[JsObject]
val openIndices = routingTable.value.map { case (index, shards) =>
val indexStats = (indicesStats \ "indices" \ index).asOpt[JsObject].getOrElse(Json.obj())
val indexAliases = (aliases \ index \ "aliases").asOpt[JsObject].getOrElse(Json.obj()) // 1.4 < does not return aliases obj

Index(index, indexStats, shards, indexAliases, routingNodes)
}.toSeq

val closedIndices = ClosedIndices(clusterState)
openIndices ++ closedIndices
}

}
49 changes: 49 additions & 0 deletions app/models/overview/Index.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package models.overview

import play.api.libs.json._

object Index {

def apply(name: String, stats: JsValue, shards: JsValue, aliases: JsObject, routingNodes: JsValue): JsValue = {

val unassignedShards = (shards \ "shards").as[JsObject].values.flatMap {
case JsArray(shards) =>
shards.filter { shard =>
(shard \ "node").asOpt[String].isEmpty
}
case _ => Nil
}

val shardsAllocation = routingNodes.as[JsObject].value.mapValues {
case JsArray(shards) => JsArray(shards.filter { shard => (shard \ "index").as[String].equals(name) })
case _ => JsArray()
}.toSeq ++ Seq("unassigned" -> JsArray(unassignedShards.toSeq))

val numShards = (shards \ "shards").as[JsObject].keys.size
val numReplicas = (shards \ "shards" \ "0").as[JsArray].value.size - 1

val special = name.startsWith(".")

JsObject(Seq(
"name" -> JsString(name),
"closed" -> JsBoolean(false),
"special" -> JsBoolean(special),
"unhealthy" -> JsBoolean(unhealthyIndex(shardsAllocation)),
"doc_count" -> (stats \ "primaries" \ "docs" \ "count").asOpt[JsNumber].getOrElse(JsNumber(0)),
"deleted_docs" -> (stats \ "primaries" \ "docs" \ "deleted").asOpt[JsNumber].getOrElse(JsNumber(0)),
"size_in_bytes" -> (stats \ "primaries" \ "store" \ "size_in_bytes").asOpt[JsNumber].getOrElse(JsNumber(0)),
"total_size_in_bytes" -> (stats \ "total" \ "store" \ "size_in_bytes").asOpt[JsNumber].getOrElse(JsNumber(0)),
"aliases" -> JsArray(aliases.keys.map(JsString(_)).toSeq), // 1.4 < does not return aliases obj
"num_shards" -> JsNumber(numShards),
"num_replicas" -> JsNumber(numReplicas),
"shards" -> JsObject(shardsAllocation)
))
}

private def unhealthyIndex(shardAllocation: Seq[(String, JsArray)]): Boolean =
shardAllocation.exists {
case ("unassigned", _) => true
case (_, JsArray(shards)) => shards.exists(!_.\("state").as[String].equals("STARTED"))
}

}
67 changes: 67 additions & 0 deletions app/models/overview/Node.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package models.overview

import models.commons.NodeRoles
import play.api.libs.json._

object Node {

def apply(id: String, info: JsValue, stats: JsValue, masterNodeId: String) = {
val nodeRoles = NodeRoles(info)


// AWS nodes return no host/ip info
val host = (info \ "host").asOpt[JsString].getOrElse(JsNull)
val ip = (info \ "ip").asOpt[JsString].getOrElse(JsNull)
val jvmVersion = (info \ "jvm" \ "version").asOpt[JsString].getOrElse(JsNull)

Json.obj(
"id" -> JsString(id),
"current_master" -> JsBoolean(id.equals(masterNodeId)),
"name" -> (info \ "name").as[JsString],
"host" -> host,
"ip" -> ip,
"es_version" -> (info \ "version").as[JsString],
"jvm_version" -> jvmVersion,
"load_average" -> loadAverage(stats),
"available_processors" -> (info \ "os" \ "available_processors").as[JsNumber],
"cpu_percent" -> cpuPercent(stats),
"master" -> JsBoolean(nodeRoles.master),
"data" -> JsBoolean(nodeRoles.data),
"coordinating" -> JsBoolean(nodeRoles.coordinating),
"ingest" -> JsBoolean(nodeRoles.ingest),
"heap" -> Json.obj(
"used" -> (stats \ "jvm" \ "mem" \ "heap_used_in_bytes").as[JsNumber],
"committed" -> (stats \ "jvm" \ "mem" \ "heap_committed_in_bytes").as[JsNumber],
"used_percent" -> (stats \ "jvm" \ "mem" \ "heap_used_percent").as[JsNumber],
"max" -> (stats \ "jvm" \ "mem" \ "heap_max_in_bytes").as[JsNumber]
),
"disk" -> disk(stats)
)
}

def disk(stats: JsValue): JsObject = {
val totalInBytes = (stats \ "fs" \ "total" \ "total_in_bytes").asOpt[Long].getOrElse(0l)
val freeInBytes = (stats \ "fs" \ "total" \ "free_in_bytes").asOpt[Long].getOrElse(0l)
val usedPercent = 100 - (100 * (freeInBytes.toFloat / totalInBytes.toFloat)).toInt
Json.obj(
"total" -> JsNumber(totalInBytes),
"free" -> JsNumber(freeInBytes),
"used_percent" -> JsNumber(usedPercent)
)
}

def loadAverage(nodeStats: JsValue): JsNumber = {
val load = (nodeStats \ "os" \ "cpu" \ "load_average" \ "1m").asOpt[Float].getOrElse(// 5.X
(nodeStats \ "os" \ "load_average").asOpt[Float].getOrElse(0f) // FIXME: 2.X
)
JsNumber(BigDecimal(load.toDouble))
}

def cpuPercent(nodeStats: JsValue): JsNumber = {
val cpu = (nodeStats \ "os" \ "cpu" \ "percent").asOpt[Int].getOrElse(// 5.X
(nodeStats \ "os" \ "cpu_percent").asOpt[Int].getOrElse(0) // FIXME 2.X
)
JsNumber(BigDecimal(cpu))
}

}
Loading

0 comments on commit 57ae79f

Please sign in to comment.