From 832f0dd3b6a8ed42049b0f42115e785eb4b857dd Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Wed, 3 Jul 2024 20:40:29 +0900 Subject: [PATCH 1/8] Refactor: TopicStreamer.Run() method now blocking operation --- streamer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/streamer.go b/streamer.go index 14df73b..7128e05 100644 --- a/streamer.go +++ b/streamer.go @@ -89,10 +89,10 @@ func (ts *TopicStreamer) Run() { mss = append(mss, config.MessageSerializer()) } - ts.cancel = ts.run(dests, mss) + ts.run(dests, mss) } -func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageSerializer) context.CancelFunc { +func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageSerializer) { if dests == nil || len(dests) == 0 { panic("No dests") } @@ -102,9 +102,8 @@ func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageS } ctx, cancel := context.WithCancel(context.Background()) - go ts.consumer.StartAsGroupSelf(ctx) - - return cancel + ts.cancel = cancel + ts.consumer.StartAsGroupSelf(ctx) } func (ts *TopicStreamer) Stop() { From 858927ba5b9a6f8b953977b94767401f642f1bd6 Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Wed, 3 Jul 2024 20:46:51 +0900 Subject: [PATCH 2/8] Docs: update readme example and explanation --- README.md | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 69c0290..46778e4 100644 --- a/README.md +++ b/README.md @@ -29,41 +29,46 @@ import ( func main() { wg := &sync.WaitGroup{} - brokers := []string{"localhost:9092"} - origin := qstreamer.Topic("origin-topic", 3) // Topic name and partition - - // Serializer that converts the message to the message to be produced. + brokers := []string{"localhost:9092"} + + // Topic name and partition + origin := qstreamer.Topic("origin-topic", 3) + + // Create a topic streamer from the brokers and the origin topic. + streamer := qstreamer.NewTopicStreamer(brokers, origin) + + // Serializer that converts the message to the message to be produced. // In this case, the message is not converted, so it is a pass-through serializer. serializer := qstreamer.NewPassThroughSerializer() - destination1 := qstreamer.Topic("destination-topic-1", 5) // Topic name and partition + // Destination topic and partition + destination1 := qstreamer.Topic("destination-topic-1", 5) - streamer := qstreamer.NewTopicStreamer(brokers, origin) - cfg := qstreamer.NewStreamConfig(serializer, destination1) streamer.AddConfig(cfg) - streamer.Run() // Non-blocking + go streamer.Run() defer streamer.Stop() wg.Add(1) - + wg.Wait() } ``` ### Explanation -1. **Set Topics**: Use the `NewTopic()` to set the start and end topics. +1. **Set Topics**: Use the `Topic()` to set the start and end topics. + +2. **Create Streamer**: Create a new streamer with the `NewTopicStreamer()` function. This function takes the Kafka brokers and the origin topic as arguments. -2. **Use PassThroughSerializer**: Create a pass-through serializer using `NewPassThroughSerializer()` which does not manufacture the message. - * If you want to convert the message, you can create a custom serializer that implements the Serializer interface. +3. **Set Serializer**: Create a new serializer with the `NewPassThroughSerializer()` function. This function is used to convert the message to the message to be produced. In this case, the message is not converted, so it is a pass-through serializer. -3. **Set StreamConfig**: Use the `NewStreamConfig()` to configure the stream settings. +4. **Set Destination Topic**: Use the `Topic()` to set the destination topic and partition. -4. **Create and Configure TopicStreamer**: Use the `NewTopicStreamer()` to create the topic streamer and the `AddConfig()` method to add the stream configuration. +5. **Set Configuration**: Create a new configuration with the `NewStreamConfig()` function. This function takes the serializer and the destination topic as arguments. Add the configuration to the streamer with the `AddConfig()` function. -5. **Run and Stop Streamer**: Call the `Run()` method to start the streamer and the `Stop()` method to stop the streamer. +6. **Run Streamer**: Run the streamer with the `Run()` function. This function starts the streamer and processes the messages. ## Contribution From 71c91101ad4920381ecef46cf0e4ec8f5b42944b Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Wed, 3 Jul 2024 20:51:10 +0900 Subject: [PATCH 3/8] Ci: coverage badge cannot be generated because of no kafka broker --- .github/workflows/coverage-badge.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/coverage-badge.yml b/.github/workflows/coverage-badge.yml index 6c61e2a..cd044e1 100644 --- a/.github/workflows/coverage-badge.yml +++ b/.github/workflows/coverage-badge.yml @@ -16,6 +16,12 @@ jobs: persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal access token. fetch-depth: 0 # otherwise, there would be errors pushing refs to the destination repository. + - name: Run Kafka KRaft Broker + uses: spicyparrot/kafka-kraft-action@v1.1.0 + with: + kafka-version: "3.7.0" + kafka-topics: "test,3,test1,3,test2,3,test3,3" + - name: Setup go uses: actions/setup-go@v4 with: From ee3c694eca5ac78e7ae515923b67563fa1dde30f Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Thu, 4 Jul 2024 11:15:42 +0900 Subject: [PATCH 4/8] Fix: possibility nil pointer error when Stop() TopicStreamer --- streamer.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/streamer.go b/streamer.go index 7128e05..83028d7 100644 --- a/streamer.go +++ b/streamer.go @@ -2,6 +2,7 @@ package qstreamer import ( "context" + "errors" "github.com/IBM/sarama" "github.com/violetpay-org/queue-streamer/common" "github.com/violetpay-org/queue-streamer/internal" @@ -106,8 +107,13 @@ func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageS ts.consumer.StartAsGroupSelf(ctx) } -func (ts *TopicStreamer) Stop() { +func (ts *TopicStreamer) Stop() error { + if ts.cancel == nil { + return errors.New("no cancel function") + } + ts.cancel() + return nil } func Topic(name string, partition int32) common.Topic { From e6e09eded38550c87a96566464c610125c2aa9a0 Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Thu, 4 Jul 2024 11:17:33 +0900 Subject: [PATCH 5/8] Test: TopicStreamer.Run() method is not non-blocking operation, so that changed testing assertion --- streamer_test.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/streamer_test.go b/streamer_test.go index 6da33a5..41e8b77 100644 --- a/streamer_test.go +++ b/streamer_test.go @@ -6,6 +6,7 @@ import ( qstreamer "github.com/violetpay-org/queue-streamer" "github.com/violetpay-org/queue-streamer/common" "testing" + "time" ) var brokers = []string{"localhost:9093"} @@ -114,7 +115,8 @@ func TestTopicStreamer_Run(t *testing.T) { t.Run("Run", func(t *testing.T) { t.Cleanup(func() { - streamer.Stop() + err := streamer.Stop() + assert.Nil(t, err) streamer = nil }) @@ -123,23 +125,26 @@ func TestTopicStreamer_Run(t *testing.T) { streamer.AddConfig(config) assert.NotPanics(t, func() { - streamer.Run() + go streamer.Run() + time.Sleep(1 * time.Second) }) }) t.Run("Run with no dests", func(t *testing.T) { t.Cleanup(func() { + err := streamer.Stop() + assert.NotNil(t, err) streamer = nil }) streamer = qstreamer.NewTopicStreamer(brokers, topic) - assert.Panics(t, func() { - streamer.Run() - }) + assert.Panics(t, streamer.Run) }) t.Run("Run with no messageSerializer", func(t *testing.T) { t.Cleanup(func() { + err := streamer.Stop() + assert.NotNil(t, err) streamer = nil }) @@ -147,13 +152,13 @@ func TestTopicStreamer_Run(t *testing.T) { config := qstreamer.NewStreamConfig(nil, topic) streamer.AddConfig(config) - assert.Panics(t, func() { - streamer.Run() - }) + assert.Panics(t, streamer.Run) }) t.Run("Run with no topic", func(t *testing.T) { t.Cleanup(func() { + err := streamer.Stop() + assert.NotNil(t, err) streamer = nil }) @@ -161,13 +166,13 @@ func TestTopicStreamer_Run(t *testing.T) { config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{}) streamer.AddConfig(config) - assert.Panics(t, func() { - streamer.Run() - }) + assert.Panics(t, streamer.Run) }) t.Run("Run with no topic partition", func(t *testing.T) { t.Cleanup(func() { + err := streamer.Stop() + assert.NotNil(t, err) streamer = nil }) @@ -175,13 +180,13 @@ func TestTopicStreamer_Run(t *testing.T) { config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Name: "test1"}) streamer.AddConfig(config) - assert.Panics(t, func() { - streamer.Run() - }) + assert.Panics(t, streamer.Run) }) t.Run("Run with no topic name", func(t *testing.T) { t.Cleanup(func() { + err := streamer.Stop() + assert.NotNil(t, err) streamer = nil }) @@ -189,9 +194,7 @@ func TestTopicStreamer_Run(t *testing.T) { config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Partition: 1}) streamer.AddConfig(config) - assert.Panics(t, func() { - streamer.Run() - }) + assert.Panics(t, streamer.Run) }) } From 93d8c8c2ae16ea5014dd02f6c96bca72f8451f2c Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Thu, 4 Jul 2024 02:19:13 +0000 Subject: [PATCH 6/8] chore: Updated coverage badge. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 46778e4..1e23ec9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Queue Streamer -![Coverage](https://img.shields.io/badge/Coverage-91.6%25-brightgreen) +![Coverage](https://img.shields.io/badge/Coverage-91.7%25-brightgreen) Queue Streamer is a Go package that processes and transfers data between Kafka topics with exactly-once delivery guarantees. This package receives messages from Kafka brokers and transfers them to specified topics. This document explains how to install and use Queue Streamer. From a5c8d6ed35c1bf9929f156aa6066e4f37d4fc23e Mon Sep 17 00:00:00 2001 From: Jaewook Lee Date: Thu, 4 Jul 2024 11:22:56 +0900 Subject: [PATCH 7/8] Fix: TopicStreamer has possibility race condition when call cancelFunc --- streamer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/streamer.go b/streamer.go index 83028d7..9084eaf 100644 --- a/streamer.go +++ b/streamer.go @@ -6,12 +6,14 @@ import ( "github.com/IBM/sarama" "github.com/violetpay-org/queue-streamer/common" "github.com/violetpay-org/queue-streamer/internal" + "sync" ) type TopicStreamer struct { topic common.Topic configs []StreamConfig cancel context.CancelFunc + mutex *sync.Mutex consumer *internal.StreamConsumer } @@ -55,6 +57,7 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) configs: make([]StreamConfig, 0), cancel: nil, consumer: consumer, + mutex: &sync.Mutex{}, } } @@ -103,11 +106,15 @@ func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageS } ctx, cancel := context.WithCancel(context.Background()) + ts.mutex.Lock() ts.cancel = cancel + ts.mutex.Unlock() ts.consumer.StartAsGroupSelf(ctx) } func (ts *TopicStreamer) Stop() error { + ts.mutex.Lock() + defer ts.mutex.Unlock() if ts.cancel == nil { return errors.New("no cancel function") } From 23043bbc86d81f1d4a8fd6b9c3deb4b59f1d0319 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Thu, 4 Jul 2024 02:24:35 +0000 Subject: [PATCH 8/8] chore: Updated coverage badge. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e23ec9..401d8bf 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Queue Streamer -![Coverage](https://img.shields.io/badge/Coverage-91.7%25-brightgreen) +![Coverage](https://img.shields.io/badge/Coverage-91.8%25-brightgreen) Queue Streamer is a Go package that processes and transfers data between Kafka topics with exactly-once delivery guarantees. This package receives messages from Kafka brokers and transfers them to specified topics. This document explains how to install and use Queue Streamer.