Skip to content

Commit

Permalink
Merge pull request #5 from tikivn/feature/batch-producer-config
Browse files Browse the repository at this point in the history
Add feature config for batch processing
  • Loading branch information
thunguyen-tiki authored May 8, 2020
2 parents e0ad321 + 8d8ed70 commit 41d46d1
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
*.out

vendor/
.idea/
11 changes: 11 additions & 0 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ func NewEventBus(
return NewEventBusWithConfig(ctx, client, producerTopicFunc, consumerTopicsFunc, opts...)
}

func NewEventBusWithBatchProducer(
ctx context.Context,
brokers []string,
producerTopicFunc TopicProducer,
consumerTopicsFunc TopicsConsumer,
opts ...Option,
) (*EventBus, error) {
client := NewClientWithConfig(brokers, NewConfigWithBatchProducer())
return NewEventBusWithConfig(ctx, client, producerTopicFunc, consumerTopicsFunc, opts...)
}

// NewEventBus creates a EventBus.
func NewEventBusWithConfig(
ctx context.Context,
Expand Down
42 changes: 42 additions & 0 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,45 @@ func TestEventBus(t *testing.T) {

eventbus.AcceptanceTest(t, bus1, bus2, timeout)
}

func TestBatchProducerEventBus(t *testing.T) {
defer goleak.VerifyNoLeaks(t)
logger := kitlog.NewLogfmtLogger(os.Stdout)
logger = kitlog.With(logger, "TestEventBus", os.Getegid())

kfbus.Logger = logger
brokers := strings.Split("uat-kafka-1.svr.tiki.services:9092", ",")
topic := uuid.New()

timeout := time.Second * 60

ctx := context.Background()

bus1, err := kfbus.NewEventBus(
ctx,
brokers,
kfbus.DefaultTopicProducer(topic.String()),
kfbus.DefaultTopicsConsumer(topic.String()),
kfbus.WithTimeout(timeout),
)
if err != nil {
t.Fatal("there should be no error:", err)
}
bus1.SetWaitConsumer(true)
defer bus1.Close()

bus2, err := kfbus.NewEventBusWithBatchProducer(
ctx,
brokers,
kfbus.DefaultTopicProducer(topic.String()),
kfbus.DefaultTopicsConsumer(topic.String()),
kfbus.WithTimeout(timeout),
)
if err != nil {
t.Fatal("there should be no error:", err)
}
bus2.SetWaitConsumer(true)
defer bus2.Close()

eventbus.AcceptanceTest(t, bus1, bus2, timeout)
}
12 changes: 12 additions & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ func NewConfig() *sarama.Config {
cfg.Producer.Retry.Max = 10
cfg.Producer.Retry.Backoff = time.Second

cfg.Producer.RequiredAcks = sarama.WaitForAll

return cfg
}

func NewConfigWithBatchProducer() *sarama.Config {
cfg := NewConfig()

cfg.Producer.Flush.Messages = 1000
cfg.Producer.Flush.MaxMessages = 10000
cfg.Producer.Flush.Frequency = 100 * time.Millisecond

return cfg
}

Expand Down

0 comments on commit 41d46d1

Please sign in to comment.