Skip to content

Commit

Permalink
Merge pull request #5 from violetpay-org/refactor/blocking-run-method
Browse files Browse the repository at this point in the history
Refactor/blocking run method
  • Loading branch information
asheswook authored Jul 4, 2024
2 parents 3575ac6 + 23043bb commit 31fba14
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 39 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/coverage-badge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ jobs:
persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal access token.
fetch-depth: 0 # otherwise, there would be errors pushing refs to the destination repository.

- name: Run Kafka KRaft Broker
uses: spicyparrot/kafka-kraft-action@v1.1.0
with:
kafka-version: "3.7.0"
kafka-topics: "test,3,test1,3,test2,3,test3,3"

- name: Setup go
uses: actions/setup-go@v4
with:
Expand Down
37 changes: 21 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Queue Streamer
![Coverage](https://img.shields.io/badge/Coverage-91.6%25-brightgreen)
![Coverage](https://img.shields.io/badge/Coverage-91.8%25-brightgreen)

Queue Streamer is a Go package that processes and transfers data between Kafka topics with exactly-once delivery guarantees. This package receives messages from Kafka brokers and transfers them to specified topics. This document explains how to install and use Queue Streamer.

Expand Down Expand Up @@ -29,41 +29,46 @@ import (
func main() {
wg := &sync.WaitGroup{}

brokers := []string{"localhost:9092"}
origin := qstreamer.Topic("origin-topic", 3) // Topic name and partition

// Serializer that converts the message to the message to be produced.
brokers := []string{"localhost:9092"}

// Topic name and partition
origin := qstreamer.Topic("origin-topic", 3)

// Create a topic streamer from the brokers and the origin topic.
streamer := qstreamer.NewTopicStreamer(brokers, origin)

// Serializer that converts the message to the message to be produced.
// In this case, the message is not converted, so it is a pass-through serializer.
serializer := qstreamer.NewPassThroughSerializer()

destination1 := qstreamer.Topic("destination-topic-1", 5) // Topic name and partition
// Destination topic and partition
destination1 := qstreamer.Topic("destination-topic-1", 5)

streamer := qstreamer.NewTopicStreamer(brokers, origin)

cfg := qstreamer.NewStreamConfig(serializer, destination1)
streamer.AddConfig(cfg)


streamer.Run() // Non-blocking
go streamer.Run()
defer streamer.Stop()
wg.Add(1)

wg.Wait()
}
```

### Explanation

1. **Set Topics**: Use the `NewTopic()` to set the start and end topics.
1. **Set Topics**: Use the `Topic()` to set the start and end topics.

2. **Create Streamer**: Create a new streamer with the `NewTopicStreamer()` function. This function takes the Kafka brokers and the origin topic as arguments.

2. **Use PassThroughSerializer**: Create a pass-through serializer using `NewPassThroughSerializer()` which does not manufacture the message.
* If you want to convert the message, you can create a custom serializer that implements the Serializer interface.
3. **Set Serializer**: Create a new serializer with the `NewPassThroughSerializer()` function. This function is used to convert the message to the message to be produced. In this case, the message is not converted, so it is a pass-through serializer.

3. **Set StreamConfig**: Use the `NewStreamConfig()` to configure the stream settings.
4. **Set Destination Topic**: Use the `Topic()` to set the destination topic and partition.

4. **Create and Configure TopicStreamer**: Use the `NewTopicStreamer()` to create the topic streamer and the `AddConfig()` method to add the stream configuration.
5. **Set Configuration**: Create a new configuration with the `NewStreamConfig()` function. This function takes the serializer and the destination topic as arguments. Add the configuration to the streamer with the `AddConfig()` function.

5. **Run and Stop Streamer**: Call the `Run()` method to start the streamer and the `Stop()` method to stop the streamer.
6. **Run Streamer**: Run the streamer with the `Run()` function. This function starts the streamer and processes the messages.

## Contribution

Expand Down
24 changes: 18 additions & 6 deletions streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package qstreamer

import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/violetpay-org/queue-streamer/common"
"github.com/violetpay-org/queue-streamer/internal"
"sync"
)

type TopicStreamer struct {
topic common.Topic
configs []StreamConfig
cancel context.CancelFunc
mutex *sync.Mutex

consumer *internal.StreamConsumer
}
Expand Down Expand Up @@ -54,6 +57,7 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{})
configs: make([]StreamConfig, 0),
cancel: nil,
consumer: consumer,
mutex: &sync.Mutex{},
}
}

Expand Down Expand Up @@ -89,10 +93,10 @@ func (ts *TopicStreamer) Run() {
mss = append(mss, config.MessageSerializer())
}

ts.cancel = ts.run(dests, mss)
ts.run(dests, mss)
}

func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageSerializer) context.CancelFunc {
func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageSerializer) {
if dests == nil || len(dests) == 0 {
panic("No dests")
}
Expand All @@ -102,13 +106,21 @@ func (ts *TopicStreamer) run(dests []common.Topic, serializers []common.MessageS
}

ctx, cancel := context.WithCancel(context.Background())
go ts.consumer.StartAsGroupSelf(ctx)

return cancel
ts.mutex.Lock()
ts.cancel = cancel
ts.mutex.Unlock()
ts.consumer.StartAsGroupSelf(ctx)
}

func (ts *TopicStreamer) Stop() {
func (ts *TopicStreamer) Stop() error {
ts.mutex.Lock()
defer ts.mutex.Unlock()
if ts.cancel == nil {
return errors.New("no cancel function")
}

ts.cancel()
return nil
}

func Topic(name string, partition int32) common.Topic {
Expand Down
37 changes: 20 additions & 17 deletions streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
qstreamer "github.com/violetpay-org/queue-streamer"
"github.com/violetpay-org/queue-streamer/common"
"testing"
"time"
)

var brokers = []string{"localhost:9093"}
Expand Down Expand Up @@ -114,7 +115,8 @@ func TestTopicStreamer_Run(t *testing.T) {

t.Run("Run", func(t *testing.T) {
t.Cleanup(func() {
streamer.Stop()
err := streamer.Stop()
assert.Nil(t, err)
streamer = nil
})

Expand All @@ -123,75 +125,76 @@ func TestTopicStreamer_Run(t *testing.T) {
streamer.AddConfig(config)

assert.NotPanics(t, func() {
streamer.Run()
go streamer.Run()
time.Sleep(1 * time.Second)
})
})

t.Run("Run with no dests", func(t *testing.T) {
t.Cleanup(func() {
err := streamer.Stop()
assert.NotNil(t, err)
streamer = nil
})
streamer = qstreamer.NewTopicStreamer(brokers, topic)

assert.Panics(t, func() {
streamer.Run()
})
assert.Panics(t, streamer.Run)
})

t.Run("Run with no messageSerializer", func(t *testing.T) {
t.Cleanup(func() {
err := streamer.Stop()
assert.NotNil(t, err)
streamer = nil
})

streamer = qstreamer.NewTopicStreamer(brokers, topic)
config := qstreamer.NewStreamConfig(nil, topic)
streamer.AddConfig(config)

assert.Panics(t, func() {
streamer.Run()
})
assert.Panics(t, streamer.Run)
})

t.Run("Run with no topic", func(t *testing.T) {
t.Cleanup(func() {
err := streamer.Stop()
assert.NotNil(t, err)
streamer = nil
})

streamer = qstreamer.NewTopicStreamer(brokers, topic)
config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{})
streamer.AddConfig(config)

assert.Panics(t, func() {
streamer.Run()
})
assert.Panics(t, streamer.Run)
})

t.Run("Run with no topic partition", func(t *testing.T) {
t.Cleanup(func() {
err := streamer.Stop()
assert.NotNil(t, err)
streamer = nil
})

streamer = qstreamer.NewTopicStreamer(brokers, topic)
config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Name: "test1"})
streamer.AddConfig(config)

assert.Panics(t, func() {
streamer.Run()
})
assert.Panics(t, streamer.Run)
})

t.Run("Run with no topic name", func(t *testing.T) {
t.Cleanup(func() {
err := streamer.Stop()
assert.NotNil(t, err)
streamer = nil
})

streamer = qstreamer.NewTopicStreamer(brokers, topic)
config := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), common.Topic{Partition: 1})
streamer.AddConfig(config)

assert.Panics(t, func() {
streamer.Run()
})
assert.Panics(t, streamer.Run)
})
}

Expand Down

0 comments on commit 31fba14

Please sign in to comment.