Skip to content

Commit

Permalink
Fix: TopicStreamer cancellation function
Browse files Browse the repository at this point in the history
  • Loading branch information
asheswook committed May 31, 2024
1 parent 0713175 commit d0102f3
Showing 1 changed file with 5 additions and 18 deletions.
23 changes: 5 additions & 18 deletions streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package qstreamer

import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/violetpay-org/queue-streamer/internal"
"github.com/violetpay-org/queue-streamer/shared"
Expand All @@ -11,7 +10,7 @@ import (
type TopicStreamer struct {
topic shared.Topic
configs []StreamConfig
cancels map[StreamConfig]context.CancelFunc
cancel context.CancelFunc

consumer *internal.StreamConsumer
}
Expand Down Expand Up @@ -55,7 +54,7 @@ func NewTopicStreamer(brokers []string, topic shared.Topic, args ...interface{})
return &TopicStreamer{
topic: topic,
configs: make([]StreamConfig, 0),
cancels: make(map[StreamConfig]context.CancelFunc),
cancel: nil,
consumer: consumer,
}
}
Expand All @@ -72,7 +71,7 @@ func (ts *TopicStreamer) Run() {
mss = append(mss, config.MessageSerializer())
}

ts.run(dests, mss)
ts.cancel = ts.run(dests, mss)
}

func (ts *TopicStreamer) run(dests []shared.Topic, serializers []shared.MessageSerializer) context.CancelFunc {
Expand All @@ -98,20 +97,8 @@ func (ts *TopicStreamer) run(dests []shared.Topic, serializers []shared.MessageS
return cancel
}

func (ts *TopicStreamer) StopAll() {
for _, cancel := range ts.cancels {
cancel()
}
}

func (ts *TopicStreamer) Stop(spec StreamConfig) {
if cancel, ok := ts.cancels[spec]; ok {
cancel()
fmt.Println("Spec stopped")
return
}

fmt.Println("Spec not found")
func (ts *TopicStreamer) Stop() {
ts.cancel()
}

func NewTopic(name string, partition int32) shared.Topic {
Expand Down

0 comments on commit d0102f3

Please sign in to comment.