diff --git a/build.sbt b/build.sbt index 7c2f4cac7..f520304a3 100644 --- a/build.sbt +++ b/build.sbt @@ -126,6 +126,7 @@ val instrumentationProjects = Seq[ProjectReference]( `kamon-mongo-legacy`, `kamon-cassandra`, `kamon-elasticsearch`, + `kamon-opensearch`, `kamon-spring`, `kamon-annotation`, `kamon-annotation-api`, @@ -404,6 +405,22 @@ lazy val `kamon-elasticsearch` = (project in file("instrumentation/kamon-elastic ) ).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test") +lazy val `kamon-opensearch` = (project in file("instrumentation/kamon-opensearch")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + Test / run / fork := true, + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "org.opensearch.client" % "opensearch-rest-client" % "1.3.14" % "provided", + "org.opensearch.client" % "opensearch-rest-high-level-client" % "1.3.14" % "provided", + scalatest % "test", + logbackClassic % "test", + "com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test", + ) + ).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test") + lazy val `kamon-spring` = (project in file("instrumentation/kamon-spring")) .disablePlugins(AssemblyPlugin) .enablePlugins(JavaAgent) @@ -1058,6 +1075,7 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle `kamon-mongo-legacy`, `kamon-cassandra`, `kamon-elasticsearch`, + `kamon-opensearch`, `kamon-spring`, `kamon-annotation`, `kamon-annotation-api`, @@ -1117,6 +1135,7 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d `kamon-jdbc`, `kamon-kafka`, `kamon-elasticsearch`, + `kamon-opensearch`, `kamon-spring`, `kamon-annotation`, `kamon-annotation-api`, diff --git a/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/AsyncOpensearchRestClientInstrumentation.java b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/AsyncOpensearchRestClientInstrumentation.java new file mode 100644 index 000000000..6abebf9b3 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/AsyncOpensearchRestClientInstrumentation.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.opensearch; + +import kamon.Kamon; +import kamon.trace.Span; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import org.opensearch.client.Request; +import org.opensearch.client.ResponseListener; + +public class AsyncOpensearchRestClientInstrumentation { + + @Advice.OnMethodEnter + public static void enter( + @Advice.Argument(0) Request request, + @Advice.Argument(value = 1, readOnly = false) ResponseListener responseListener) { + final String operationName = + RequestNameConverter.convert( + HighLevelOpensearchClientInstrumentation.requestClassName.get(), + "AsyncRequest"); + + Span span = Kamon.clientSpanBuilder(operationName, "opensearch.client") + .tag("opensearch.http.endpoint", request.getEndpoint()) + .tag("opensearch.http.method", request.getMethod()) + .start(); + + RequestSizeHistogram.record(request.getEntity()); + responseListener = new InstrumentedListener(responseListener, span); + } +} diff --git a/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/HighLevelOpensearchClientInstrumentation.java b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/HighLevelOpensearchClientInstrumentation.java new file mode 100644 index 000000000..cfffab0d0 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/HighLevelOpensearchClientInstrumentation.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.opensearch; + +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +public class HighLevelOpensearchClientInstrumentation { + public static final ThreadLocal requestClassName = new ThreadLocal<>(); + + @Advice.OnMethodEnter + public static void enter( + @Advice.Argument(0) Req request) { + requestClassName.set(request.getClass().getSimpleName()); + } + + @Advice.OnMethodExit + public static void exit() { + requestClassName.remove(); + } +} diff --git a/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/SyncOpensearchRestClientInstrumentation.java b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/SyncOpensearchRestClientInstrumentation.java new file mode 100644 index 000000000..960a73b08 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/java/kamon/instrumentation/opensearch/SyncOpensearchRestClientInstrumentation.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.opensearch; + +import kamon.Kamon; +import kamon.trace.Span; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import org.opensearch.client.Request; + +public class SyncOpensearchRestClientInstrumentation { + + @Advice.OnMethodEnter + public static void enter( + @Advice.Argument(0) Request request, + @Advice.Local("span") Span span) { + final String operationName = RequestNameConverter.convert( + HighLevelOpensearchClientInstrumentation.requestClassName.get(), + "SyncRequest"); + + RequestSizeHistogram.record(request.getEntity()); + span = Kamon.clientSpanBuilder(operationName, "opensearch.client") + .tag("opensearch.http.endpoint", request.getEndpoint()) + .tag("opensearch.http.method", request.getMethod()) + .start(); + } + + @Advice.OnMethodExit + public static void exit( + @Advice.Local("span") Span span) { + span.finish(); + } +} diff --git a/instrumentation/kamon-opensearch/src/main/resources/reference.conf b/instrumentation/kamon-opensearch/src/main/resources/reference.conf new file mode 100644 index 000000000..11f839324 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/resources/reference.conf @@ -0,0 +1,23 @@ +# ================================================== # +# kamon Opensearch client reference configuration # +# ================================================== # + +kamon.instrumentation.opensearch { +} + +kanela { + modules { + opensearch-driver { + + name = "Opensearch Client" + description = "Provides tracing of client calls made with the official Opensearch Client library." + instrumentations = [ + "kamon.instrumentation.opensearch.OSInstrumentation" + ] + + within = [ + "org.opensearch.client..*" + ] + } + } +} diff --git a/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/OSInstrumentation.scala b/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/OSInstrumentation.scala new file mode 100644 index 000000000..b8f762e7f --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/OSInstrumentation.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.opensearch + +import kamon.Kamon +import kamon.trace.Span +import kanela.agent.api.instrumentation.InstrumentationBuilder +import org.apache.http.HttpEntity +import org.opensearch.client.{Response, ResponseListener} + +class OSInstrumentation extends InstrumentationBuilder { + onType("org.opensearch.client.RestClient") + .advise(method("performRequestAsync").and(takesArguments(2)), classOf[AsyncOpensearchRestClientInstrumentation]) + .advise(method("performRequest").and(takesArguments(1)), classOf[SyncOpensearchRestClientInstrumentation]) + + onType("org.opensearch.client.RestHighLevelClient") + .advise(method("internalPerformRequest").and(takesArguments(5)), classOf[HighLevelOpensearchClientInstrumentation]) + .advise(method("internalPerformRequestAsync").and(takesArguments(6)), classOf[HighLevelOpensearchClientInstrumentation]) +} + +class InstrumentedListener(inner: ResponseListener, span: Span) extends ResponseListener { + override def onSuccess(response: Response): Unit = { + span.finish() + inner.onSuccess(response) + } + + override def onFailure(exception: Exception): Unit = { + span.fail(exception) + inner.onFailure(exception) + } +} + +object RequestSizeHistogram { + private val histogram = Kamon.histogram("opensearch.request.size").withoutTags() + + def record(entity: HttpEntity): Unit = { + Option(entity) + .map(_.getContentLength) + .filter(_ >= 0) + .foreach(histogram.record) + } +} diff --git a/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/RequestNameConverter.scala b/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/RequestNameConverter.scala new file mode 100644 index 000000000..c7c09b585 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/main/scala/kamon/instrumentation/opensearch/RequestNameConverter.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.opensearch + +object RequestNameConverter { + def convert(className: String, fallback: String): String = { + val requestType = Option(className).getOrElse(fallback) + s"opensearch/$requestType" + } +} diff --git a/instrumentation/kamon-opensearch/src/test/resources/application.conf b/instrumentation/kamon-opensearch/src/test/resources/application.conf new file mode 100644 index 000000000..63cec7034 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/test/resources/application.conf @@ -0,0 +1,6 @@ +kamon.instrumentation.opensearch { +} +kanela { +# debug-mode = true +# log-level = "DEBUG" +} diff --git a/instrumentation/kamon-opensearch/src/test/resources/logback.xml b/instrumentation/kamon-opensearch/src/test/resources/logback.xml new file mode 100644 index 000000000..abf859055 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + diff --git a/instrumentation/kamon-opensearch/src/test/scala/kamon/instrumentation/OpenSearchInstrumentationTest.scala b/instrumentation/kamon-opensearch/src/test/scala/kamon/instrumentation/OpenSearchInstrumentationTest.scala new file mode 100644 index 000000000..67a0ecd09 --- /dev/null +++ b/instrumentation/kamon-opensearch/src/test/scala/kamon/instrumentation/OpenSearchInstrumentationTest.scala @@ -0,0 +1,114 @@ +package kamon.instrumentation + +import com.dimafeng.testcontainers.{GenericContainer, ForAllTestContainer} +import kamon.tag.Lookups.plain +import kamon.testkit.{InitAndStopKamonAfterAll, Reconfigure, TestSpanReporter} +import org.apache.http.HttpHost +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.cluster.node.tasks.list.{ListTasksRequest, ListTasksResponse} +import org.opensearch.client._ +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.SpanSugar +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.OptionValues +import org.testcontainers.containers.wait.strategy.Wait + + +class OpenSearchInstrumentationTest + extends AnyWordSpec + with Matchers + with Eventually + with SpanSugar + with Reconfigure + with OptionValues + with TestSpanReporter + with InitAndStopKamonAfterAll + with ForAllTestContainer { + + val endpointTag = "opensearch.http.endpoint" + val methodTag = "opensearch.http.method" + + "The opensearch client" should { + "records a span for a basic sync request" in { + client.performRequest(new Request("GET", "/_cluster/health")) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName should be("opensearch/SyncRequest") + span.tags.get(plain(endpointTag)) shouldBe "/_cluster/health" + span.tags.get(plain(methodTag)) shouldBe "GET" + } + } + + "records a span for a basic async request" in { + client.performRequestAsync(new Request("GET", "/_cluster/health"), + new ResponseListener() { + override def onSuccess(response: Response): Unit = () + override def onFailure(exception: Exception): Unit = () + }) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName should be("opensearch/AsyncRequest") + span.tags.get(plain(endpointTag)) shouldBe "/_cluster/health" + span.tags.get(plain(methodTag)) shouldBe "GET" + } + } + + "records a span for a high level sync request" in { + highLevelClient.tasks().list(new ListTasksRequest(), RequestOptions.DEFAULT) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName should be("opensearch/ListTasksRequest") + span.tags.get(plain(endpointTag)) shouldBe "/_tasks" + span.tags.get(plain(methodTag)) shouldBe "GET" + } + } + + "records a span for a high level async request" in { + val request = new ListTasksRequest() + val listener = new ActionListener[ListTasksResponse] { + override def onResponse(response: ListTasksResponse): Unit = () + override def onFailure(e: Exception): Unit = () + } + + highLevelClient.tasks().listAsync(request, RequestOptions.DEFAULT, listener) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName should be("opensearch/ListTasksRequest") + span.tags.get(plain(endpointTag)) shouldBe "/_tasks" + span.tags.get(plain(methodTag)) shouldBe "GET" + } + } + } + + + override val container: GenericContainer = GenericContainer( + "opensearchproject/opensearch:1.3.14", + exposedPorts = Seq(9200), + env = Map("discovery.type" -> "single-node", "plugins.security.disabled" -> "true"), + waitStrategy = Wait.forHttp("/_cluster/health") + ) + var client: RestClient = _ + var highLevelClient: RestHighLevelClient = _ + + override protected def beforeAll(): Unit = { + super.beforeAll() + container.start() + + client = RestClient + .builder(HttpHost.create(s"${container.host}:${container.mappedPort(9200)}")) + .build() + + highLevelClient = new RestHighLevelClient( + RestClient.builder(HttpHost.create(s"${container.host}:${container.mappedPort(9200)}"))) + } + + override protected def afterAll(): Unit = { + container.stop() + super.afterAll() + } +}