Skip to content

Commit

Permalink
Merge pull request #8 from violetpay-org/feature/direct-streamer
Browse files Browse the repository at this point in the history
Feature/direct streamer
  • Loading branch information
asheswook authored Jul 6, 2024
2 parents 4894a96 + daa0ebb commit c136b2e
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 23 deletions.
60 changes: 60 additions & 0 deletions direct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package qstreamer

import (
"github.com/violetpay-org/queue-streamer/common"
"github.com/violetpay-org/queue-streamer/internal"
)

// DirectStreamer is a streamer that streams messages from a topic to a topic.
type DirectStreamer struct {
ts TopicStreamer
}

// NewDirectStreamer creates a new topic streamer that streams messages from a topic to a topic.
// The streamer is configured with a list of brokers, a topic to stream from and a consumer group id .
// If you want to override the default configuration of the sarama consumer and producer, you can pass additional arguments.
// - ds := NewDirectStreamer(brokers, topic, groupId)
// - ds := NewDirectStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
// - ds := NewDirectStreamer(brokers, topic, groupId, nil, producerConfig)
func NewDirectStreamer(brokers []string, src common.Topic, groupId string, args ...interface{}) *DirectStreamer {
ts := NewTopicStreamer(brokers, src, groupId, args...)
return &DirectStreamer{
ts: *ts,
}
}

func (ds *DirectStreamer) Config() (bool, StreamConfig) {
if len(ds.ts.configs) == 0 {
return false, StreamConfig{}
}

return true, ds.ts.configs[0]
}

func (ds *DirectStreamer) SetConfig(config StreamConfig) {
if len(ds.ts.configs) == 0 {
ds.ts.configs = append(ds.ts.configs, config)
} else {
ds.ts.configs[0] = config
}
}

func (ds *DirectStreamer) Topic() common.Topic {
return ds.ts.Topic()
}

func (ds *DirectStreamer) Consumer() *internal.StreamConsumer {
return ds.ts.Consumer()
}

func (ds *DirectStreamer) GroupId() string {
return ds.ts.GroupId()
}

func (ds *DirectStreamer) Run() {
ds.ts.Run()
}

func (ds *DirectStreamer) Stop() error {
return ds.ts.Stop()
}
147 changes: 147 additions & 0 deletions direct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package qstreamer_test

import (
"github.com/stretchr/testify/assert"
qstreamer "github.com/violetpay-org/queue-streamer"
"testing"
"time"
)

var dbrokers = []string{"localhost:9093"}
var dTopic = qstreamer.Topic("test", 1)
var dTopic2 = qstreamer.Topic("test2", 2)

func TestNewDirectStreamer(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("NewDirectStreamer", func(t *testing.T) {
ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
assert.NotNil(t, ds)
})
}

func TestDirectStreamer_Config(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("No Config set", func(t *testing.T) {
t.Cleanup(func() {
ds = nil
})

ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
ok, _ := ds.Config()

assert.False(t, ok)
})

t.Run("Set Config", func(t *testing.T) {
t.Cleanup(func() {
ds = nil
})

ms := qstreamer.NewPassThroughSerializer()
ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
cfg := qstreamer.NewStreamConfig(ms, dTopic)
ds.SetConfig(cfg)

ok, cfg := ds.Config()
assert.True(t, ok)

assert.Equal(t, dTopic, cfg.Topic())
assert.Equal(t, ms, cfg.MessageSerializer())
})

t.Run("Set Config multiple times", func(t *testing.T) {
t.Cleanup(func() {
ds = nil
})

ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")

ms1 := qstreamer.NewPassThroughSerializer()
ms2 := qstreamer.NewPassThroughSerializer()

cfg1 := qstreamer.NewStreamConfig(ms1, dTopic)
cfg2 := qstreamer.NewStreamConfig(ms2, dTopic2)

ds.SetConfig(cfg1)
ds.SetConfig(cfg2)

ok, dsCfg := ds.Config()
assert.True(t, ok)
assert.Equal(t, dTopic2, dsCfg.Topic())
assert.Equal(t, ms2, dsCfg.MessageSerializer())
})
}

func TestDirectStreamer_Consumer(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("Consumer", func(t *testing.T) {
ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
assert.NotNil(t, ds.Consumer())
})
}

func TestDirectStreamer_Topic(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("Topic", func(t *testing.T) {
ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
assert.Equal(t, dTopic, ds.Topic())
})
}

func TestDirectStreamer_GroupId(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("GroupID", func(t *testing.T) {
ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "testGroupId")
assert.Equal(t, "testGroupId", ds.GroupId())
})
}

func TestDirectStreamer_Run(t *testing.T) {
var ds *qstreamer.DirectStreamer

t.Cleanup(func() {
ds = nil
})

t.Run("Run", func(t *testing.T) {
t.Cleanup(func() {
err := ds.Stop()
assert.Nil(t, err)
ds = nil
})

ds = qstreamer.NewDirectStreamer(dbrokers, dTopic, "")
cfg := qstreamer.NewStreamConfig(qstreamer.NewPassThroughSerializer(), dTopic)
ds.SetConfig(cfg)

assert.NotPanics(t, func() {
go ds.Run()
time.Sleep(1 * time.Second)
})
})
}
2 changes: 0 additions & 2 deletions internal/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

func TestCopy(t *testing.T) {
t.Parallel()

t.Run("Copy pointer", func(t *testing.T) {
type TestStruct struct {
Name string
Expand Down
23 changes: 17 additions & 6 deletions streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,24 @@ import (
"sync"
)

// TopicStreamer is a streamer that streams messages from a topic to other topics.
type TopicStreamer struct {
topic common.Topic
configs []StreamConfig
cancel context.CancelFunc
mutex *sync.Mutex
groupId string

consumer *internal.StreamConsumer
}

// NewTopicStreamer creates a new topic streamer that streams messages from a topic to other topics.
// The streamer is configured with a list of brokers and a topic to stream from.
// The streamer is configured with a list of brokers, a topic to stream from and a consumer group id .
// If you want to override the default configuration of the sarama consumer and producer, you can pass additional arguments.
// - ts := NewTopicStreamer(brokers, topic)
// - ts := NewTopicStreamer(brokers, topic, consumerConfig, producerConfig)
// - ts := NewTopicStreamer(brokers, topic, nil, producerConfig)
func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) *TopicStreamer {
// - ts := NewTopicStreamer(brokers, topic, groupId)
// - ts := NewTopicStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
// - ts := NewTopicStreamer(brokers, topic, groupId, nil, producerConfig)
func NewTopicStreamer(brokers []string, topic common.Topic, groupId string, args ...interface{}) *TopicStreamer {
var ccfg *sarama.Config
var pcfg *sarama.Config

Expand All @@ -44,9 +46,13 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{})
panic("Invalid number of arguments")
}

if groupId == "" {
groupId = "queue-streamer-default-group"
}

consumer := internal.NewStreamConsumer(
topic,
"groupId",
groupId,
brokers,
ccfg,
pcfg,
Expand All @@ -58,6 +64,7 @@ func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{})
cancel: nil,
consumer: consumer,
mutex: &sync.Mutex{},
groupId: groupId,
}
}

Expand All @@ -77,6 +84,10 @@ func (ts *TopicStreamer) AddConfig(config StreamConfig) {
ts.configs = append(ts.configs, config)
}

func (ts *TopicStreamer) GroupId() string {
return ts.groupId
}

func (ts *TopicStreamer) Run() {
dests := make([]common.Topic, 0)
mss := make([]common.MessageSerializer, 0)
Expand Down
Loading

0 comments on commit c136b2e

Please sign in to comment.