Skip to content

Commit

Permalink
feat: add createDynamicTagTimer and update MetricsFacade codes
Browse files Browse the repository at this point in the history
  • Loading branch information
jonesho committed Nov 29, 2024
1 parent c559e42 commit 69ad44d
Show file tree
Hide file tree
Showing 20 changed files with 189 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
private val meterRegistry: MeterRegistry = BackendRegistries.getDefaultNow()
private val micrometerMetricsFacade = MicrometerMetricsFacade(meterRegistry, "linea")
private val httpJsonRpcClientFactory = VertxHttpJsonRpcClientFactory(
vertx,
micrometerMetricsFacade,
vertx = vertx,
metricsFacade = MicrometerMetricsFacade(meterRegistry),
requestResponseLogLevel = Level.TRACE,
failuresLogLevel = Level.WARN
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ class L1DependentApp(
private val alreadySubmittedBlobsFilter =
L1ShnarfBasedAlreadySubmittedBlobsFilter(
lineaRollup = lineaSmartContractClientForDataSubmission,
acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker }
acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker(it) }
)

private val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ class BlobCompressionProofCoordinator(
private var timerId: Long? = null
private lateinit var blobPollingAction: Handler<Long>
private val blobsCounter = metricsFacade.createCounter(
LineaMetricsCategory.BLOB,
"counter",
"New blobs arriving to blob compression proof coordinator"
category = LineaMetricsCategory.BLOB,
name = "counter",
description = "New blobs arriving to blob compression proof coordinator"
)

init {
metricsFacade.createGauge(
LineaMetricsCategory.BLOB,
"compression.queue.size",
"Size of blob compression proving queue",
{ blobsToHandle.size }
category = LineaMetricsCategory.BLOB,
name = "compression.queue.size",
description = "Size of blob compression proving queue",
measurementSupplier = { blobsToHandle.size }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import net.consensys.encodeHex
import net.consensys.linea.blob.BlobCompressorVersion
import net.consensys.linea.blob.GoNativeBlobCompressor
import net.consensys.linea.blob.GoNativeBlobCompressorFactory
import net.consensys.linea.metrics.LineaMetricsCategory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.TimerCapture
import org.apache.logging.log4j.LogManager
Expand Down Expand Up @@ -74,12 +75,14 @@ class GoBackedBlobCompressor private constructor(
}

private val canAppendBlockTimer: TimerCapture<Boolean> = metricsFacade.createSimpleTimer(
name = "go.backed.blob.compressor.can.append.block",
description = "Time taken to run CanWrite method"
category = LineaMetricsCategory.BLOB,
name = "compressor.canappendblock",
description = "Time taken to check if block fits in current blob"
)
private val appendBlockTimer: TimerCapture<BlobCompressor.AppendResult> = metricsFacade.createSimpleTimer(
name = "go.backed.blob.compressor.append.block",
description = "Time taken to run AppendResult method"
private val appendBlockTimer: TimerCapture<Boolean> = metricsFacade.createSimpleTimer(
category = LineaMetricsCategory.BLOB,
name = "compressor.appendblock",
description = "Time taken to compress block into current blob"
)

private val log = LogManager.getLogger(GoBackedBlobCompressor::class.java)
Expand All @@ -92,7 +95,9 @@ class GoBackedBlobCompressor private constructor(

override fun appendBlock(blockRLPEncoded: ByteArray): BlobCompressor.AppendResult {
val compressionSizeBefore = goNativeBlobCompressor.Len()
val appended = goNativeBlobCompressor.Write(blockRLPEncoded, blockRLPEncoded.size)
val appended = appendBlockTimer.captureTime {
goNativeBlobCompressor.Write(blockRLPEncoded, blockRLPEncoded.size)
}
val compressedSizeAfter = goNativeBlobCompressor.Len()
val compressionRatio = 1.0 - ((compressedSizeAfter - compressionSizeBefore).toDouble() / blockRLPEncoded.size)
log.trace(
Expand All @@ -107,9 +112,7 @@ class GoBackedBlobCompressor private constructor(
log.error("Failure while writing the following RLP encoded block: {}", blockRLPEncoded.encodeHex())
throw BlobCompressionException(error)
}
return appendBlockTimer.captureTime {
BlobCompressor.AppendResult(appended, compressionSizeBefore, compressedSizeAfter)
}
return BlobCompressor.AppendResult(appended, compressionSizeBefore, compressedSizeAfter)
}

override fun startNewBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class ConflationServiceImpl(
internal val blocksToConflate = PriorityBlockingQueue<PayloadAndBlockCounters>()

private val blocksCounter = metricsFacade.createCounter(
LineaMetricsCategory.CONFLATION,
"blocks.imported",
"New blocks arriving to conflation service counter"
category = LineaMetricsCategory.CONFLATION,
name = "blocks.imported",
description = "New blocks arriving to conflation service counter"
)

init {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ class L1ShnarfBasedAlreadySubmittedBlobsFilter(
.maxOfOrNull { it }
}
.thenApply { highestBlobEndBlockNumberFoundInL1 ->
highestBlobEndBlockNumberFoundInL1?.also(acceptedBlobEndBlockNumberConsumer::accept)
?.let { blockNumber -> items.filter { it.startBlockNumber > blockNumber } }
?: items
highestBlobEndBlockNumberFoundInL1?.also {
acceptedBlobEndBlockNumberConsumer.accept(it)
}?.let { blockNumber ->
items.filter { it.startBlockNumber > blockNumber }
} ?: items
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import com.github.michaelbull.result.map
import com.github.michaelbull.result.merge
import com.github.michaelbull.result.recover
import com.github.michaelbull.result.unwrap
import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import io.vertx.core.AsyncResult
import io.vertx.core.CompositeFuture
import io.vertx.core.Future
Expand All @@ -24,7 +20,6 @@ import io.vertx.core.json.jackson.DatabindCodec
import io.vertx.ext.auth.User
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.Tag
import net.consensys.linea.metrics.micrometer.DynamicTagTimerCapture
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

Expand Down Expand Up @@ -52,42 +47,42 @@ private data class RequestContext(
class JsonRpcMessageProcessor(
private val requestsHandler: JsonRpcRequestHandler,
private val metricsFacade: MetricsFacade,
private val meterRegistry: MeterRegistry,
private val requestParser: JsonRpcRequestParser = Companion::parseRequest
) : JsonRpcMessageHandler {
init {
DatabindCodec.mapper().registerKotlinModule()
}

private val log: Logger = LogManager.getLogger(this.javaClass)
private val counterBuilder = Counter.builder("jsonrpc.counter")
override fun invoke(user: User?, messageJsonStr: String): Future<String> =
handleMessage(user, messageJsonStr)
override fun invoke(user: User?, messageJsonStr: String): Future<String> {
return measureRequestProcessing(user, messageJsonStr)
}

private fun handleMessage(user: User?, requestJsonStr: String): Future<String> {
val wholeRequestTimer =
Timer.builder("jsonrpc.processing.whole")
.description(
"Processing of JSON-RPC message: Deserialization + Business Logic + Serialization"
)
val timerSample = Timer.start(Clock.SYSTEM)
private fun handleMessage(user: User?, requestJsonStr: String): Future<Pair<String, String>> {
val json: Any =
when (val result = decodeMessage(requestJsonStr)) {
is Ok -> result.value
is Err -> {
return Future.succeededFuture(Json.encode(result.error))
return Future.succeededFuture(Pair("MESSAGE_DECODE_ERROR", Json.encode(result.error)))
}
}
log.trace(json)
val isBulkRequest: Boolean = json is JsonArray
val jsonArray = if (isBulkRequest) json as JsonArray else JsonArray().add(json)
val parsingResults: List<Result<Pair<JsonRpcRequest, JsonObject>, JsonRpcErrorResponse>> =
jsonArray.map(::measureRequestParsing)
val methodTag =
if (isBulkRequest) {
"bulk_request"
} else {
parsingResults.first()
.unwrap().first.method
}

// all or nothing: if any of the requests has a parsing error, return before execution
parsingResults.forEach {
when (it) {
is Err -> return Future.succeededFuture(Json.encode(it.error))
is Err -> return Future.succeededFuture(Pair(methodTag, Json.encode(it.error)))
is Ok -> Unit
}
}
Expand Down Expand Up @@ -129,15 +124,6 @@ class JsonRpcMessageProcessor(

return Future.all(serializedResponses)
.transform { ar: AsyncResult<CompositeFuture> ->
val methodTag =
if (isBulkRequest) {
"bulk_request"
} else {
parsingResults.first()
.unwrap().first.method
}
wholeRequestTimer.tag("method", methodTag)

val responses = ar.result().list<String>()
val finalResponseJsonStr =
if (responses.size == 1) {
Expand All @@ -148,27 +134,40 @@ class JsonRpcMessageProcessor(
description = "Time of bulk json response serialization"
).captureTime { responses.joinToString(",", "[", "]") }
}

timerSample.stop(wholeRequestTimer.register(meterRegistry))
logResponse(allSuccessful, finalResponseJsonStr, requestJsonStr)
Future.succeededFuture(finalResponseJsonStr)
Future.succeededFuture(Pair(methodTag, finalResponseJsonStr))
}
}

private fun measureRequestProcessing(user: User?, requestJsonStr: String): Future<String> {
return Future.fromCompletionStage(
metricsFacade.createDynamicTagTimer<Pair<String, String>>(
name = "jsonrpc.processing.whole",
description = "Processing of JSON-RPC message: Deserialization + Business Logic + Serialization",
tagKey = "method",
tagValueExtractorOnError = { "METHOD_PARSE_ERROR" }
) {
it.first
}
.captureTime(handleMessage(user, requestJsonStr).toCompletionStage().toCompletableFuture())
.thenApply {
it.second
}
)
}

private fun measureRequestParsing(
json: Any
): Result<Pair<JsonRpcRequest, JsonObject>, JsonRpcErrorResponse> {
return DynamicTagTimerCapture<Result<Pair<JsonRpcRequest, JsonObject>, JsonRpcErrorResponse>>(
meterRegistry,
"jsonrpc.serialization.request"
)
.setTagKey("method")
.setDescription("json-rpc method parsing")
.setTagValueExtractor { parsingResult: Result<Pair<JsonRpcRequest, JsonObject>, JsonRpcErrorResponse> ->
parsingResult.map { it.first.method }.recover { "METHOD_PARSE_ERROR" }.value
}
.setTagValueExtractorOnError { "METHOD_PARSE_ERROR" }
.captureTime { requestParser(json) }
return metricsFacade.createDynamicTagTimer(
name = "jsonrpc.serialization.request",
description = "json-rpc method parsing",
tagKey = "method",
tagValueExtractorOnError = { "METHOD_PARSE_ERROR" }
) {
parsingResult: Result<Pair<JsonRpcRequest, JsonObject>, JsonRpcErrorResponse> ->
parsingResult.map { it.first.method }.recover { "METHOD_PARSE_ERROR" }.value
}.captureTime { requestParser(json) }
}

private fun encodeAndMeasureResponse(requestContext: RequestContext): String {
Expand All @@ -186,18 +185,22 @@ class JsonRpcMessageProcessor(
jsonRpcRequest: JsonRpcRequest,
requestJson: JsonObject
): Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>> {
val timerCapture = metricsFacade.createSimpleTimer<Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>>(
return metricsFacade.createSimpleTimer<Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>>(
name = "jsonrpc.processing.logic",
description = "Processing of a particular JRPC method's logic without SerDes",
tags = listOf(Tag("method", jsonRpcRequest.method))
)
return timerCapture
.captureTime { callRequestHandlerAndCatchError(user, jsonRpcRequest, requestJson) }
.onComplete { result: AsyncResult<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>> ->
val success = (result.succeeded() && result.result() is Ok)
counterBuilder.tag("success", success.toString())
counterBuilder.tag("method", jsonRpcRequest.method)
counterBuilder.register(meterRegistry).increment()
metricsFacade.createCounter(
name = "jsonrpc.counter",
description = "Counting the JSON rpc request with result and method",
tags = listOf(
Tag("success", success.toString()),
Tag("method", jsonRpcRequest.method)
)
).increment()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ class VertxHttpJsonRpcClient(
}
}

metricsFacade.createSimpleTimer<Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>>(
name = "jsonrpc.request",
description = "Time of Upstream API JsonRpc Requests",
tags = listOf(
Tag("endpoint", endpoint.host),
Tag("method", request.method)
)
).captureTime { requestFuture }
Future.fromCompletionStage(
metricsFacade.createSimpleTimer<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>(
name = "jsonrpc.request",
description = "Time of Upstream API JsonRpc Requests",
tags = listOf(
Tag("endpoint", endpoint.host),
Tag("method", request.method)
)
).captureTime(requestFuture.toCompletionStage().toCompletableFuture())
)
}
.onFailure { th -> logRequestFailure(json, th) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class JsonRpcMessageProcessorTest {
Future.succeededFuture(Ok(JsonRpcSuccessResponse(jsonRpcRequest.id, JsonObject())))
}
meterRegistry = SimpleMeterRegistry()
metricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
processor = JsonRpcMessageProcessor(fakeRequestHandlerAlwaysSuccess, metricsFacade, meterRegistry)
metricsFacade = MicrometerMetricsFacade(registry = meterRegistry)
processor = JsonRpcMessageProcessor(fakeRequestHandlerAlwaysSuccess, metricsFacade)
}

@Test
Expand All @@ -43,8 +43,7 @@ class JsonRpcMessageProcessorTest {
val request = buildJsonRpcRequest(method = "eth_blockNumber")
val processor = JsonRpcMessageProcessor(
{ _, _, _ -> throw RuntimeException("Something went wrong") },
metricsFacade,
meterRegistry
metricsFacade
)
processor(null, request.toString())
.onComplete(
Expand Down Expand Up @@ -187,7 +186,7 @@ class JsonRpcMessageProcessorTest {

val jsonStr = Json.encode(JsonArray(requests))

processor = JsonRpcMessageProcessor(fakeRequestHandlerWithSomeFailures, metricsFacade, meterRegistry)
processor = JsonRpcMessageProcessor(fakeRequestHandlerWithSomeFailures, metricsFacade)

processor(null, jsonStr)
.onComplete(
Expand Down Expand Up @@ -237,7 +236,7 @@ class JsonRpcMessageProcessorTest {
)
val singleAsBulk = listOf(buildJsonRpcRequest(id = 10, "read_value"))

processor = JsonRpcMessageProcessor(fakeRequestHandlerWithSomeFailures, metricsFacade, meterRegistry)
processor = JsonRpcMessageProcessor(fakeRequestHandlerWithSomeFailures, metricsFacade)
processor(null, Json.encode(request1)).get()
processor(null, Json.encode(request2)).get()
processor(null, Json.encode(request3)).get()
Expand Down Expand Up @@ -281,10 +280,10 @@ class JsonRpcMessageProcessorTest {
)
.isEqualTo(3.0)

assertThat(meterRegistry.timer("linea.jsonrpc.processing.logic", "method", "read_value").count())
assertThat(meterRegistry.timer("jsonrpc.processing.logic", "method", "read_value").count())
.isEqualTo(6)

assertThat(meterRegistry.timer("linea.jsonrpc.processing.logic", "method", "update_value").count())
assertThat(meterRegistry.timer("jsonrpc.processing.logic", "method", "update_value").count())
.isEqualTo(8)

assertThat(meterRegistry.timer("jsonrpc.serialization.request", "method", "read_value").count())
Expand All @@ -294,15 +293,15 @@ class JsonRpcMessageProcessorTest {
)
.isEqualTo(8)
assertThat(
meterRegistry.timer("linea.jsonrpc.serialization.response", "method", "read_value").count()
meterRegistry.timer("jsonrpc.serialization.response", "method", "read_value").count()
)
.isEqualTo(6)
assertThat(
meterRegistry.timer("linea.jsonrpc.serialization.response", "method", "update_value").count()
meterRegistry.timer("jsonrpc.serialization.response", "method", "update_value").count()
)
.isEqualTo(8)

assertThat(meterRegistry.timer("linea.jsonrpc.serialization.response.bulk").count()).isEqualTo(2)
assertThat(meterRegistry.timer("jsonrpc.serialization.response.bulk").count()).isEqualTo(2)
}

private fun buildJsonRpcRequest(
Expand Down
Loading

0 comments on commit 69ad44d

Please sign in to comment.