Skip to content

Commit

Permalink
fix the race condition of mqtt
Browse files Browse the repository at this point in the history
Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed Sep 4, 2024
1 parent 1aecb20 commit 460cc6d
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions protocol/mqtt_paho/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

type Protocol struct {
client *paho.Client
config *paho.ClientConfig
connOption *paho.Connect
publishOption *paho.Publish
subscribeOption *paho.Subscribe
Expand Down Expand Up @@ -89,7 +88,7 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
var err error
defer m.Finish(err)

msg := p.publishOption
msg := p.publishMsg()
if cecontext.TopicFrom(ctx) != "" {
msg.Topic = cecontext.TopicFrom(ctx)
cecontext.WithTopic(ctx, "")
Expand All @@ -107,6 +106,16 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
return err
}

// publishMsg generate a new paho.Publish message from the p.publishOption
func (p *Protocol) publishMsg() *paho.Publish {
return &paho.Publish{
QoS: p.publishOption.QoS,
Retain: p.publishOption.Retain,
Topic: p.publishOption.Topic,
Properties: p.publishOption.Properties,
}
}

func (p *Protocol) OpenInbound(ctx context.Context) error {
if p.subscribeOption == nil {
return fmt.Errorf("the paho.Subscribe option must not be nil")
Expand Down

0 comments on commit 460cc6d

Please sign in to comment.