Skip to content

Commit 69220c3

Browse files
authored
kvutils/app: Add timing metrics for read/write/index services. (#5176)
* participant-state{,-index}: Move Timed*Service classes from Sandbox. CHANGELOG_BEGIN - [Ledger Integration Kit] Metrics for the various read, write, and index services. CHANGELOG_END * kvutils/app: Add timing metrics for read/write/index services. * participant-state: Move metrics-related code to another Bazel package. * participant-state-metrics: Add to artifacts.yml. * participant-state-metrics: Move TimedIndexService back into Sandbox. Cuts down on dependencies like nobody's business.
1 parent d7d45d1 commit 69220c3

File tree

13 files changed

+122
-45
lines changed

13 files changed

+122
-45
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright (c) 2020 The DAML Authors. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
load(
5+
"//bazel_tools:scala.bzl",
6+
"da_scala_library",
7+
)
8+
9+
da_scala_library(
10+
name = "participant-state-metrics",
11+
srcs = glob(["src/main/scala/**/*.scala"]),
12+
resources = glob(["src/main/resources/**/*"]),
13+
tags = ["maven_coordinates=com.daml.ledger:participant-state-metrics:__VERSION__"],
14+
visibility = [
15+
"//visibility:public",
16+
],
17+
runtime_deps = [],
18+
deps = [
19+
"//daml-lf/archive:daml_lf_dev_archive_java_proto",
20+
"//daml-lf/data",
21+
"//daml-lf/transaction",
22+
"//ledger/ledger-api-health",
23+
"//ledger/participant-state",
24+
"//libs-scala/direct-execution-context",
25+
"@maven//:com_google_protobuf_protobuf_java",
26+
"@maven//:com_typesafe_akka_akka_actor_2_12",
27+
"@maven//:com_typesafe_akka_akka_stream_2_12",
28+
"@maven//:io_dropwizard_metrics_metrics_core",
29+
],
30+
)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2020 The DAML Authors. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.daml.ledger.participant.state.metrics
5+
6+
import java.util.concurrent.CompletionStage
7+
8+
import akka.Done
9+
import akka.stream.scaladsl.{Keep, Source}
10+
import com.codahale.metrics.Timer
11+
import com.digitalasset.dec.DirectExecutionContext
12+
13+
import scala.concurrent.Future
14+
15+
object Metrics {
16+
17+
def timedCompletionStage[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
18+
val ctx = timer.time()
19+
future.whenComplete { (_, _) =>
20+
ctx.stop()
21+
()
22+
}
23+
}
24+
25+
def timedFuture[T](timer: Timer, future: => Future[T]): Future[T] = {
26+
val ctx = timer.time()
27+
val result = future
28+
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
29+
result
30+
}
31+
32+
def timedSource[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
33+
val ctx = timer.time()
34+
source
35+
.watchTermination()(Keep.both[Mat, Future[Done]])
36+
.mapMaterializedValue {
37+
case (mat, done) =>
38+
done.onComplete(_ => ctx.stop())(DirectExecutionContext)
39+
mat
40+
}
41+
}
42+
43+
}

ledger/sandbox/src/main/scala/com/digitalasset/platform/state/TimedReadService.scala renamed to ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v1/metrics/TimedReadService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
// Copyright (c) 2020 The DAML Authors. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package com.digitalasset.platform.state
4+
package com.daml.ledger.participant.state.v1.metrics
55

66
import akka.NotUsed
77
import akka.stream.scaladsl.Source
88
import com.codahale.metrics.MetricRegistry
9+
import com.daml.ledger.participant.state.metrics.Metrics
910
import com.daml.ledger.participant.state.v1.{LedgerInitialConditions, Offset, ReadService, Update}
1011
import com.digitalasset.ledger.api.health.HealthStatus
11-
import com.digitalasset.platform.metrics.timedSource
1212

1313
final class TimedReadService(delegate: ReadService, metrics: MetricRegistry, prefix: String)
1414
extends ReadService {
@@ -22,5 +22,5 @@ final class TimedReadService(delegate: ReadService, metrics: MetricRegistry, pre
2222
delegate.currentHealth()
2323

2424
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
25-
timedSource(metrics.timer(s"$prefix.$name"), source)
25+
Metrics.timedSource(metrics.timer(s"$prefix.$name"), source)
2626
}

ledger/sandbox/src/main/scala/com/digitalasset/platform/state/TimedWriteService.scala renamed to ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v1/metrics/TimedWriteService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Copyright (c) 2020 The DAML Authors. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package com.digitalasset.platform.state
4+
package com.daml.ledger.participant.state.v1.metrics
55

66
import java.util.concurrent.CompletionStage
77

88
import com.codahale.metrics.MetricRegistry
9+
import com.daml.ledger.participant.state.metrics.Metrics
910
import com.daml.ledger.participant.state.v1.{
1011
Configuration,
1112
Party,
@@ -19,7 +20,6 @@ import com.daml.ledger.participant.state.v1.{
1920
import com.digitalasset.daml.lf.data.Time
2021
import com.digitalasset.daml_lf_dev.DamlLf
2122
import com.digitalasset.ledger.api.health.HealthStatus
22-
import com.digitalasset.platform.metrics.timedFuture
2323

2424
final class TimedWriteService(delegate: WriteService, metrics: MetricRegistry, prefix: String)
2525
extends WriteService {
@@ -57,5 +57,5 @@ final class TimedWriteService(delegate: WriteService, metrics: MetricRegistry, p
5757
delegate.currentHealth()
5858

5959
private def time[T](name: String, future: => CompletionStage[T]): CompletionStage[T] =
60-
timedFuture(metrics.timer(s"$prefix.$name"), future)
60+
Metrics.timedCompletionStage(metrics.timer(s"$prefix.$name"), future)
6161
}

ledger/participant-state/kvutils/app/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ da_scala_library(
3232
"//ledger/ledger-api-common",
3333
"//ledger/ledger-api-health",
3434
"//ledger/participant-state",
35+
"//ledger/participant-state-index",
36+
"//ledger/participant-state-metrics",
3537
"//ledger/participant-state/kvutils",
3638
"//ledger/sandbox",
3739
"//libs-scala/contextualized-logging",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright (c) 2020 The DAML Authors. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.daml.ledger.participant.state.kvutils.app
5+
6+
private[app] object Metrics {
7+
8+
val ReadServicePrefix = "daml.services.read"
9+
val IndexServicePrefix = "daml.services.index"
10+
val WriteServicePrefix = "daml.services.write"
11+
12+
}

ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import java.util.UUID
88

99
import akka.actor.ActorSystem
1010
import akka.stream.Materializer
11-
import com.daml.ledger.participant.state.v1.SubmissionId
11+
import com.daml.ledger.participant.state.kvutils.app.Metrics._
12+
import com.daml.ledger.participant.state.v1.metrics.{TimedReadService, TimedWriteService}
13+
import com.daml.ledger.participant.state.v1.{SubmissionId, WritePackagesService}
1214
import com.digitalasset.daml.lf.archive.DarReader
1315
import com.digitalasset.daml_lf_dev.DamlLf.Archive
1416
import com.digitalasset.logging.LoggingContext.newLoggingContext
15-
import com.digitalasset.platform.apiserver.StandaloneApiServer
17+
import com.digitalasset.platform.apiserver.{StandaloneApiServer, TimedIndexService}
1618
import com.digitalasset.platform.indexer.StandaloneIndexerServer
1719
import com.digitalasset.resources.akka.AkkaResourceOwner
1820
import com.digitalasset.resources.{Resource, ResourceOwner}
@@ -49,10 +51,12 @@ class Runner[T <: ReadWriteService, Extra](
4951
val metricRegistry = factory.metricRegistry(participantConfig, config)
5052
for {
5153
ledger <- factory.readWriteServiceOwner(config, participantConfig).acquire()
54+
readService = new TimedReadService(ledger, metricRegistry, ReadServicePrefix)
55+
writeService = new TimedWriteService(ledger, metricRegistry, WriteServicePrefix)
5256
_ <- Resource.fromFuture(
53-
Future.sequence(config.archiveFiles.map(uploadDar(_, ledger))))
57+
Future.sequence(config.archiveFiles.map(uploadDar(_, writeService))))
5458
_ <- new StandaloneIndexerServer(
55-
readService = ledger,
59+
readService = readService,
5660
config = factory.indexerConfig(participantConfig, config),
5761
metrics = metricRegistry,
5862
).acquire()
@@ -61,8 +65,10 @@ class Runner[T <: ReadWriteService, Extra](
6165
commandConfig = factory.commandConfig(config),
6266
partyConfig = factory.partyConfig(config),
6367
submissionConfig = factory.submissionConfig(config),
64-
readService = ledger,
65-
writeService = ledger,
68+
readService = readService,
69+
writeService = writeService,
70+
transformIndexService =
71+
service => new TimedIndexService(service, metricRegistry, IndexServicePrefix),
6672
authService = factory.authService(config),
6773
metrics = metricRegistry,
6874
timeServiceBackend = factory.timeServiceBackend(config),
@@ -75,7 +81,7 @@ class Runner[T <: ReadWriteService, Extra](
7581
}
7682
}
7783

78-
private def uploadDar(from: Path, to: ReadWriteService)(
84+
private def uploadDar(from: Path, to: WritePackagesService)(
7985
implicit executionContext: ExecutionContext
8086
): Future[Unit] = {
8187
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)

ledger/sandbox/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ compile_deps = [
3434
"//ledger/ledger-on-sql",
3535
"//ledger/participant-state",
3636
"//ledger/participant-state-index",
37+
"//ledger/participant-state-metrics",
3738
"//ledger/participant-state/kvutils",
3839
"//libs-scala/build-info",
3940
"//libs-scala/contextualized-logging",

ledger/sandbox/src/main/scala/com/digitalasset/platform/state/TimedIndexService.scala renamed to ledger/sandbox/src/main/scala/com/digitalasset/platform/apiserver/TimedIndexService.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) 2020 The DAML Authors. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package com.digitalasset.platform.state
4+
package com.digitalasset.platform.apiserver
55

66
import java.time.Instant
77

@@ -10,6 +10,7 @@ import akka.stream.scaladsl.Source
1010
import com.codahale.metrics.MetricRegistry
1111
import com.daml.ledger.participant.state.index.v2
1212
import com.daml.ledger.participant.state.index.v2.IndexService
13+
import com.daml.ledger.participant.state.metrics.Metrics
1314
import com.daml.ledger.participant.state.v1.{Configuration, PackageId, ParticipantId, Party}
1415
import com.digitalasset.daml.lf.language.Ast
1516
import com.digitalasset.daml.lf.transaction.Node
@@ -25,7 +26,6 @@ import com.digitalasset.ledger.api.v1.transaction_service.{
2526
GetTransactionTreesResponse,
2627
GetTransactionsResponse
2728
}
28-
import com.digitalasset.platform.metrics.{timedFuture, timedSource}
2929

3030
import scala.concurrent.Future
3131

@@ -146,8 +146,8 @@ final class TimedIndexService(delegate: IndexService, metrics: MetricRegistry, p
146146
delegate.currentHealth()
147147

148148
private def time[T](name: String, future: => Future[T]): Future[T] =
149-
timedFuture(metrics.timer(s"$prefix.$name"), future)
149+
Metrics.timedFuture(metrics.timer(s"$prefix.$name"), future)
150150

151151
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
152-
timedSource(metrics.timer(s"$prefix.$name"), source)
152+
Metrics.timedSource(metrics.timer(s"$prefix.$name"), source)
153153
}

ledger/sandbox/src/main/scala/com/digitalasset/platform/metrics/package.scala

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,12 @@
33

44
package com.digitalasset.platform
55

6-
import java.util.concurrent.CompletionStage
7-
8-
import akka.Done
9-
import akka.stream.scaladsl.{Keep, Source}
106
import com.codahale.metrics.Timer
117
import com.digitalasset.dec.DirectExecutionContext
128

139
import scala.concurrent.Future
1410

1511
package object metrics {
16-
def timedFuture[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
17-
val ctx = timer.time()
18-
future.whenComplete { (_, _) =>
19-
ctx.stop()
20-
()
21-
}
22-
}
2312

2413
def timedFuture[T](timer: Timer, future: => Future[T]): Future[T] = {
2514
val ctx = timer.time()
@@ -28,14 +17,4 @@ package object metrics {
2817
result
2918
}
3019

31-
def timedSource[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
32-
val ctx = timer.time()
33-
source
34-
.watchTermination()(Keep.both[Mat, Future[Done]])
35-
.mapMaterializedValue {
36-
case (mat, done) =>
37-
done.onComplete(_ => ctx.stop())(DirectExecutionContext)
38-
mat
39-
}
40-
}
4120
}

ledger/sandbox/src/main/scala/com/digitalasset/platform/sandbox/SandboxServer.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import akka.actor.ActorSystem
1111
import akka.stream.Materializer
1212
import akka.stream.scaladsl.Sink
1313
import com.codahale.metrics.MetricRegistry
14+
import com.daml.ledger.participant.state.v1.metrics.TimedWriteService
1415
import com.daml.ledger.participant.state.v1.{ParticipantId, SeedService}
1516
import com.daml.ledger.participant.state.{v1 => ParticipantState}
1617
import com.digitalasset.api.util.TimeProvider
@@ -29,7 +30,8 @@ import com.digitalasset.platform.apiserver.{
2930
ApiServer,
3031
ApiServices,
3132
LedgerApiServer,
32-
TimeServiceBackend
33+
TimeServiceBackend,
34+
TimedIndexService
3335
}
3436
import com.digitalasset.platform.packages.InMemoryPackageStore
3537
import com.digitalasset.platform.sandbox.SandboxServer._
@@ -45,7 +47,6 @@ import com.digitalasset.platform.sandbox.stores.{
4547
SandboxIndexAndWriteService
4648
}
4749
import com.digitalasset.platform.services.time.TimeProviderType
48-
import com.digitalasset.platform.state.{TimedIndexService, TimedWriteService}
4950
import com.digitalasset.ports.Port
5051
import com.digitalasset.resources.akka.AkkaResourceOwner
5152
import com.digitalasset.resources.{Resource, ResourceOwner}

ledger/sandbox/src/main/scala/com/digitalasset/platform/sandboxnext/Runner.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import com.daml.ledger.on.sql.Database.InvalidDatabaseException
1414
import com.daml.ledger.on.sql.SqlLedgerReaderWriter
1515
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
1616
import com.daml.ledger.participant.state.v1
17-
import com.daml.ledger.participant.state.v1.{SeedService, WriteService}
17+
import com.daml.ledger.participant.state.v1.metrics.{TimedReadService, TimedWriteService}
18+
import com.daml.ledger.participant.state.v1.{SeedService, WritePackagesService}
1819
import com.digitalasset.api.util.TimeProvider
1920
import com.digitalasset.buildinfo.BuildInfo
2021
import com.digitalasset.daml.lf.archive.DarReader
@@ -28,7 +29,8 @@ import com.digitalasset.platform.apiserver.{
2829
ApiServer,
2930
ApiServerConfig,
3031
StandaloneApiServer,
31-
TimeServiceBackend
32+
TimeServiceBackend,
33+
TimedIndexService
3234
}
3335
import com.digitalasset.platform.common.LedgerIdMode
3436
import com.digitalasset.platform.indexer.{
@@ -42,7 +44,6 @@ import com.digitalasset.platform.sandbox.metrics.MetricsReporting
4244
import com.digitalasset.platform.sandbox.services.SandboxResetService
4345
import com.digitalasset.platform.sandboxnext.Runner._
4446
import com.digitalasset.platform.services.time.TimeProviderType
45-
import com.digitalasset.platform.state.{TimedIndexService, TimedReadService, TimedWriteService}
4647
import com.digitalasset.platform.store.FlywayMigrations
4748
import com.digitalasset.ports.Port
4849
import com.digitalasset.resources.akka.AkkaResourceOwner
@@ -243,7 +244,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
243244
owner.acquire()
244245
}
245246

246-
private def uploadDar(from: File, to: WriteService)(
247+
private def uploadDar(from: File, to: WritePackagesService)(
247248
implicit executionContext: ExecutionContext
248249
): Future[Unit] = {
249250
val submissionId = v1.SubmissionId.assertFromString(UUID.randomUUID().toString)
@@ -267,8 +268,8 @@ object Runner {
267268
private val InMemoryIndexJdbcUrl =
268269
"jdbc:h2:mem:index;db_close_delay=-1;db_close_on_exit=false"
269270

270-
private val ReadServicePrefix = "daml.services.read"
271271
private val IndexServicePrefix = "daml.services.index"
272+
private val ReadServicePrefix = "daml.services.read"
272273
private val WriteServicePrefix = "daml.services.write"
273274

274275
private val HeartbeatInterval: FiniteDuration = 1.second

release/artifacts.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@
174174
type: jar-proto
175175
- target: //ledger/participant-state-index:participant-state-index
176176
type: jar-scala
177+
- target: //ledger/participant-state-metrics:participant-state-metrics
178+
type: jar-scala
177179
- target: //ledger/sandbox:ledger-api-server
178180
type: jar-scala
179181
- target: //ledger/sandbox:sandbox

0 commit comments

Comments
 (0)