diff --git a/build.sbt b/build.sbt index 2cfb248..0d03c75 100644 --- a/build.sbt +++ b/build.sbt @@ -12,22 +12,24 @@ lazy val bitcoinrpc = (project in file(".")). publishTo := Some(Resolver.url("TA-S3", url("s3://ivy-jar-repository-ta/"))(Resolver.ivyStylePatterns)) ). settings( - libraryDependencies ++= http4s ++ json ++ zmq ++ cats + libraryDependencies ++= http4s ++ json ++ zmq ++ cats ) -val http4sVersion = "0.20.11" +val circeVersion = "0.12.3" +val http4sVersion = "0.21.0-M5" lazy val http4s = Seq( "org.http4s" %% "http4s-dsl" % http4sVersion, "org.http4s" %% "http4s-blaze-server" % http4sVersion, - "org.http4s" %% "http4s-blaze-client" % http4sVersion + "org.http4s" %% "http4s-blaze-client" % http4sVersion, + "org.http4s" %% "http4s-prometheus-metrics" % http4sVersion, + "org.http4s" %% "http4s-circe" % http4sVersion, ) lazy val json = Seq( - "org.http4s" %% "http4s-circe" % http4sVersion, - "io.circe" %% "circe-generic" % "0.11.1", - "io.circe" %% "circe-literal" % "0.11.1", - "io.circe" %% "circe-parser" % "0.11.1" + "io.circe" %% "circe-generic" % circeVersion, + "io.circe" %% "circe-literal" % circeVersion, + "io.circe" %% "circe-parser" % circeVersion, ) lazy val zmq = Seq ( diff --git a/src/main/scala/RPCClient.scala b/src/main/scala/RPCClient.scala index 2e3e0e6..ff0e7c1 100644 --- a/src/main/scala/RPCClient.scala +++ b/src/main/scala/RPCClient.scala @@ -16,7 +16,7 @@ */ package io.tokenanalyst.bitcoinrpc -import cats.effect.{ContextShift, IO, Resource} +import cats.effect.{ContextShift, IO, Resource, Sync, Clock} import io.circe.{Decoder, Encoder, Json} import org.http4s.circe.CirceEntityDecoder._ import org.http4s.circe.CirceEntityEncoder._ @@ -31,6 +31,10 @@ import java.net.{ConnectException, SocketTimeoutException} import scala.concurrent.ExecutionContext import scala.concurrent.duration._ +import io.prometheus.client.CollectorRegistry +import org.http4s.client.middleware.Metrics +import org.http4s.metrics.prometheus.Prometheus + object RPCClient { def bitcoin( @@ -42,7 +46,8 @@ object RPCClient { onErrorRetry: (Int, Throwable) => IO[Unit] = (_,_) => IO.unit )( implicit ec: ExecutionContext, - cs: ContextShift[IO] + cs: ContextShift[IO], + clock: Clock[IO] ): Resource[IO, Bitcoin] = { val config = Config(hosts, port, username, password, zmqPort) for (client <- make(config, onErrorRetry)) yield Bitcoin(client) @@ -57,33 +62,48 @@ object RPCClient { onErrorRetry: (Int, Throwable) => IO[Unit] = (_,_) => IO.unit )( implicit ec: ExecutionContext, - cs: ContextShift[IO] + cs: ContextShift[IO], + clock: Clock[IO] ): Resource[IO, Omni] = { val config = Config(hosts, port, username, password, zmqPort) for (client <- make(config, onErrorRetry)) yield Omni(client) } + def wrap(registry: CollectorRegistry, client: Client[IO]) + (implicit c: Clock[IO], s: Sync[IO]): Resource[IO, Client[IO]] = { + val classifierFunc = (r: Request[IO]) => Some(r.method.toString.toLowerCase) + Resource.make { for { + ops <- Prometheus[IO](registry, "client") + } yield Metrics[IO](ops, classifierFunc)(client) + }{ _ => IO.unit} +} + def make(config: Config, onErrorRetry: (Int, Throwable) => IO[Unit])( implicit ec: ExecutionContext, - cs: ContextShift[IO] - ): Resource[IO, RPCClient] = { - for { + cs: ContextShift[IO], + sync: Sync[IO], + clock: Clock[IO] + ): Resource[IO, RPCClient] = for { client <- BlazeClientBuilder[IO](ec) - .withConnectTimeout(5.seconds) - .withRequestTimeout(2.minutes) - .resource + .withConnectTimeout(5.seconds) + .withRequestTimeout(2.minutes) + .resource + registry = CollectorRegistry.defaultRegistry + metered <- wrap(registry, client) socket <- ZeroMQ.socket( config.hosts.head, config.zmqPort.getOrElse(28332) ) - } yield new RPCClient(client, socket, config, onErrorRetry) - } + } yield { + new RPCClient(metered, socket, config, registry, onErrorRetry) + } } -class RPCClient( +case class RPCClient( client: Client[IO], zmq: ZeroMQ.Socket, config: Config, + metrics: CollectorRegistry, onErrorRetry: (Int, Throwable) => IO[Unit] ) extends Http4sClientDsl[IO] { diff --git a/src/main/scala/examples/MetricsExport.scala b/src/main/scala/examples/MetricsExport.scala new file mode 100644 index 0000000..16466ec --- /dev/null +++ b/src/main/scala/examples/MetricsExport.scala @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.tokenanalyst.bitcoinrpc.examples + +import cats.effect._ +import cats.effect.{ExitCode, IO, IOApp} +import scala.concurrent.ExecutionContext.global + +import io.tokenanalyst.bitcoinrpc.Bitcoin +import io.tokenanalyst.bitcoinrpc.{RPCClient, Config} +import io.tokenanalyst.bitcoinrpc.bitcoin.Syntax._ +import org.http4s.implicits._ + +import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.server.{Router, Server} +import org.http4s.metrics.prometheus.PrometheusExportService + +object MetricsExport extends IOApp { + + def loop(rpc: Bitcoin, current: Long = 0L, until: Long = 10L): IO[Unit] = + for { + block <- rpc.getBlockByHeight(current) + _ <- IO { println(block) } + l <- if (current + 1 < until) loop(rpc, current + 1, until) else IO.unit + } yield l + + def run(args: List[String]): IO[ExitCode] = { + implicit val ec = global + implicit val config = Config.fromEnv + val res = for { + rpc <- RPCClient + .bitcoin(config.hosts, config.port, config.username, config.password) + server <- BlazeServerBuilder[IO] + .bindHttp(8080) + .withHttpApp( + Router( + "/" -> PrometheusExportService[IO](rpc.client.metrics).routes + ).orNotFound + ) + .resource + } yield (rpc, server) + res.use { + case (bitcoin: Bitcoin, _: Server[IO]) => + for { + _ <- loop(bitcoin) + } yield ExitCode(0) + } + } +}