Skip to content

Commit 07436ee

Browse files
authored
add kamon-pekko-connectors-kafka module (#1367)
1 parent 28e2310 commit 07436ee

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

build.sbt

+16
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,21 @@ lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc
606606
)
607607
)).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test")
608608

609+
lazy val `kamon-pekko-connectors-kafka` = (project in file("instrumentation/kamon-pekko-connectors-kafka"))
610+
.disablePlugins(AssemblyPlugin)
611+
.enablePlugins(JavaAgent)
612+
.settings(instrumentationSettings)
613+
.settings(
614+
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
615+
libraryDependencies ++= Seq(
616+
kanelaAgent % "provided",
617+
"org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % "provided",
618+
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided",
619+
scalatest % "test",
620+
logbackClassic % "test"
621+
)
622+
).dependsOn(`kamon-core`, `kamon-pekko`, `kamon-testkit` % "test")
623+
609624
lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
610625
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
611626
.disablePlugins(AssemblyPlugin)
@@ -1156,6 +1171,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
11561171
`kamon-pekko`,
11571172
`kamon-pekko-http`,
11581173
`kamon-pekko-grpc`,
1174+
`kamon-pekko-connectors-kafka`,
11591175
`kamon-tapir`,
11601176
`kamon-alpakka-kafka`
11611177
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# ==================================================== #
2+
# Kamon Pekko Connectors Kafka Reference Configuration #
3+
# ==================================================== #
4+
5+
kanela {
6+
modules {
7+
pekko-connectors-kafka {
8+
9+
name = "Apache Pekko Connectors Kafka Instrumentation"
10+
description = "PREVIEW. Provides context propagation for Apache Pekko Connectors Kafka applications"
11+
instrumentations = [
12+
"kamon.instrumentation.pekko.connectors.kafka.ProducerMessageInstrumentation"
13+
]
14+
15+
within = [
16+
"org.apache.pekko.kafka.ProducerMessage\\$Message",
17+
"org.apache.pekko.kafka.ProducerMessage\\$MultiMessage",
18+
"org.apache.pekko.kafka.internal.DefaultProducerStageLogic"
19+
]
20+
}
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* ==========================================================================================
3+
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
6+
* except in compliance with the License. You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the specific language governing permissions
13+
* and limitations under the License.
14+
* ==========================================================================================
15+
*/
16+
17+
package kamon
18+
package instrumentation
19+
package pekko
20+
package connectors
21+
package kafka
22+
23+
import kamon.Kamon
24+
import kamon.context.Storage
25+
import kamon.context.Storage.Scope
26+
import kamon.instrumentation.context.HasContext
27+
import kanela.agent.api.instrumentation.InstrumentationBuilder
28+
import kanela.agent.libs.net.bytebuddy.asm.Advice
29+
30+
class ProducerMessageInstrumentation extends InstrumentationBuilder {
31+
32+
/**
33+
* Captures the current context the a Message or MultiMessage is created and restores it while
34+
* the ProducerLogic is running, so the proper context gets propagated to the Kafka Producer.
35+
*/
36+
onTypes("org.apache.pekko.kafka.ProducerMessage$Message", "org.apache.pekko.kafka.ProducerMessage$MultiMessage")
37+
.mixin(classOf[HasContext.MixinWithInitializer])
38+
39+
onTypes(
40+
"org.apache.pekko.kafka.internal.DefaultProducerStageLogic",
41+
"org.apache.pekko.kafka.internal.CommittingProducerSinkStageLogic"
42+
)
43+
.advise(method("produce"), ProduceWithEnvelopeContext)
44+
}
45+
46+
object ProduceWithEnvelopeContext {
47+
48+
@Advice.OnMethodEnter
49+
def enter(@Advice.Argument(0) envelope: Any): Storage.Scope = {
50+
envelope match {
51+
case hasContext: HasContext => Kamon.storeContext(hasContext.context)
52+
case _ => Scope.Empty
53+
}
54+
}
55+
56+
@Advice.OnMethodExit(onThrowable = classOf[Throwable])
57+
def exit(@Advice.Enter scope: Storage.Scope): Unit =
58+
scope.close()
59+
}

0 commit comments

Comments
 (0)