Skip to content

Commit

Permalink
docs: sample otel
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilherme Ferreira committed Nov 27, 2023
1 parent 9c57625 commit d361097
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="OpenTelemetry" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.6.0" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;

namespace KafkaFlow.Sample.OpenTelemetry;

public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
public Task Handle(IMessageContext context, TestMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);

return Task.CompletedTask;
}
}
95 changes: 95 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.OpenTelemetry;
using KafkaFlow.Producers;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace KafkaFlow.Sample.OpenTelemetry;

public class Program
{
public static async Task Main()
{
var services = new ServiceCollection();

const string producerName = "PrintConsole";
const string topicName = "otel-sample-topic";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(
serviceName: "DemoApp",
serviceVersion: "1.0.0"))
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddConsoleExporter()
.AddOtlpExporter()
.Build();

services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddOpenTelemetryInstrumentation()
.AddCluster(
cluster => cluster
.WithBrokers(new[]
{
"localhost:9092"
})
.CreateTopicIfNotExists(topicName, 6, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
)
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId("print-console-handler")
.WithBufferSize(100)
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
)
);

var provider = services.BuildServiceProvider();

var bus = provider.CreateKafkaBus();

await bus.StartAsync();

var producer = provider
.GetRequiredService<IProducerAccessor>()
.GetProducer(producerName);

while (true)
{
Console.WriteLine("Type the message you want to send or 'exit' to quit:");
var input = Console.ReadLine();


if (input!.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
await bus.StopAsync();
break;
}

await producer.ProduceAsync(
topicName,
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {input}" });
}

await Task.Delay(3000);
}
}
52 changes: 52 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# KafkaFlow.Sample.OpenTelemetry

This is a simple sample that shows how to enable [OpenTelemetry](https://opentelemetry.io/) instrumentation when using KafkaFlow.

## How to run

### Requirements

- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)

### Start the cluster

Using your terminal of choice, start the cluster.
You can find a docker-compose file at the root of this repository.
Position the terminal in that folder and run the following command.

```bash
docker-compose up -d
```

### Start Jaeger

Using your terminal of choice, start the [Jaeger](https://www.jaegertracing.io/).

```
docker run --rm --name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 14250:14250 \
-p 14268:14268 \
-p 14269:14269 \
-p 9411:9411 \
jaegertracing/all-in-one:1.51
```

### Run the Sample

Using your terminal of choice, start the sample for the sample folder.

```bash
dotnet run
```

### See the traces collected

Go to http://localhost:16686/ and observe the traces collected for your application.
10 changes: 10 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Runtime.Serialization;

namespace KafkaFlow.Sample.OpenTelemetry;

[DataContract]
public class TestMessage
{
[DataMember(Order = 1)]
public string Text { get; set; }
}
7 changes: 7 additions & 0 deletions src/KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Telemetry", "Telemetry", "{
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{1557B135-4925-4FA2-80DA-8AD13155F3BD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "..\samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{C684DC85-69EE-4088-A564-B62B82B5CE17}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -233,6 +235,10 @@ Global
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.Build.0 = Release|Any CPU
{C684DC85-69EE-4088-A564-B62B82B5CE17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C684DC85-69EE-4088-A564-B62B82B5CE17}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C684DC85-69EE-4088-A564-B62B82B5CE17}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C684DC85-69EE-4088-A564-B62B82B5CE17}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -273,6 +279,7 @@ Global
{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{1557B135-4925-4FA2-80DA-8AD13155F3BD} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39}
{C684DC85-69EE-4088-A564-B62B82B5CE17} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
8 changes: 7 additions & 1 deletion website/docs/getting-started/samples.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ You can find the code here: [/samples/KafkaFlow.Sample.PauseConsumerOnError](htt

## Consumer Throttling

This is a sample that shows how to throttle a consumer based on others consumers lag
This is a sample that shows how to throttle a consumer based on others consumers lag.

You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.ConsumerThrottling)

## Open Telemetry

This is a sample that shows how to enable OpenTelemetry instrumentation when using KafkaFlow.

You can find the code here: [/samples/KafkaFlow.Sample.OpenTelemetry](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.OpenTelemetry)
18 changes: 16 additions & 2 deletions website/docs/guides/open-telemetry.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
---
sidebar_position: 10
sidebar_label: OpenTelemetry
---

# OpenTelemetry instrumentation

In this section, we will explore how to enable OpenTelemetry instrumentation when using KafkaFlow.

KafkaFlow includes support for [Traces](https://opentelemetry.io/docs/concepts/signals/traces/) and [Baggage](https://opentelemetry.io/docs/concepts/signals/baggage/) signals using [OpenTelemetry instrumentation](https://opentelemetry.io/docs/instrumentation/net/).

:::tip
You can find a sample on how to enable OpenTelemetry [here](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.OpenTelemetry).
:::

## Including OpenTelemetry instrumentation in your code

Add the package [KafkaFlow.OpenTelemetry](https://www.nuget.org/packages/KafkaFlow.OpenTelemetry/) to the project and add the extension method `AddOpenTelemetryInstrumentation` in your Startup:
Add the package [KafkaFlow.OpenTelemetry](https://www.nuget.org/packages/KafkaFlow.OpenTelemetry/) to the project and add the extension method `AddOpenTelemetryInstrumentation` in your [configuration](./configuration.md):

```csharp
services.AddKafka(
Expand All @@ -18,6 +25,13 @@ services.AddKafka(
);
```

## Using KafkaFlow instrumentation with .NET Automatic Instrumentation
Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), you just need to subscribe to the source `KafkaFlow.OpenTelemetry` that is accessible through a constant at `KafkaFlowInstrumentation.ActivitySourceName`.

## Using .NET Automatic Instrumentation

When using [.NET automatic instrumentation](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation), the KafkaFlow activity can be captured by including the ActivitySource name `KafkaFlow.OpenTelemetry` as a parameter to the variable `OTEL_DOTNET_AUTO_TRACES_ADDITIONAL_SOURCES`.

## Propagation

KafkaFlow supports [Propagation](https://opentelemetry.io/docs/specs/otel/context/api-propagators/), the mechanism that moves context information data between services and processes.
When a message is produced using a KafkaFlow producer and consumed by a KafkaFlow consumer, the context will automatically be propagated.
2 changes: 1 addition & 1 deletion website/docs/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ To do that, KafkaFlow gives you access to features like:
- [Schema Registry](guides/middlewares/serializer-middleware.md#adding-schema-registry-support) support.
- [Compression](guides/compression.md) using native Confluent Kafka client compression or compressor middleware.
- [Global Events Subcription](guides/global-events.md) for message production and consumption.
- [Open Telemetry Instrumentation](guides/open-telemetry.md) for traces and baggage signals.
- [OpenTelemetry Instrumentation](guides/open-telemetry.md) for traces and baggage signals.
- Graceful shutdown (wait to finish processing to shutdown).
- Store offset when processing ends, avoiding message loss.
- Supports .NET Core and .NET Framework.
Expand Down

0 comments on commit d361097

Please sign in to comment.