Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions pkg/api/client/amqp/qeclients/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type AmqpQEClientImpl int

const (
Python AmqpQEClientImpl = iota
MultipleReceiversPython
Java
NodeJS
Timeout int = 60
Expand All @@ -20,10 +21,16 @@ type AmqpQEClientImplInfo struct {
var (
QEClientImageMap = map[AmqpQEClientImpl]AmqpQEClientImplInfo{
Python: {
Name: "cli-proton-python",
Image: "docker.io/rhmessagingqe/cli-proton-python:latest",
CommandSender: "cli-proton-python-sender",
Name: "cli-proton-python",
Image: "docker.io/rhmessagingqe/cli-proton-python:latest",
CommandSender: "cli-proton-python-sender",
CommandReceiver: "cli-proton-python-receiver",
},
MultipleReceiversPython: {
Name: "multiple-receivers",
Image: "docker.io/nicob1987/multireceive:latest",
CommandSender: "NotImplemented",
CommandReceiver: "./multireceive.py",
},
}
)
39 changes: 22 additions & 17 deletions pkg/api/client/amqp/qeclients/factoryreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

type AmqpQEReceiverBuilder struct {
receiver *AmqpQEClientCommon
MessageCount int
receiver *AmqpQEClientCommon
MessageCount int
}

func NewReceiverBuilder(name string, impl AmqpQEClientImpl, data framework.ContextData, url string) *AmqpQEReceiverBuilder {
Expand Down Expand Up @@ -38,6 +38,25 @@ func (a *AmqpQEReceiverBuilder) Messages(count int) *AmqpQEReceiverBuilder {
return a
}

func (a *AmqpQEReceiverBuilder) addSpecificImplementationOptions(cBuilder *framework.ContainerBuilder) {
switch a.receiver.Implementation {
// URL
case MultipleReceiversPython:
{
cBuilder.AddArgs("--address", a.receiver.Url)
cBuilder.AddArgs("--connections", "100") //total connections
cBuilder.AddArgs("--links", "500") //total links per connection
}
default:
{
cBuilder.AddArgs("--broker-url", a.receiver.Url)
cBuilder.AddArgs("--count", strconv.Itoa(a.MessageCount))
cBuilder.AddArgs("--timeout", strconv.Itoa(a.receiver.Timeout))
cBuilder.AddArgs("--log-msgs", "json")
}
}
}

func (a *AmqpQEReceiverBuilder) Build() (*AmqpQEClientCommon, error) {
// Preparing Pod, Container (commands and args) and etc
podBuilder := framework.NewPodBuilder(a.receiver.Name, a.receiver.Context.Namespace)
Expand All @@ -50,21 +69,7 @@ func (a *AmqpQEReceiverBuilder) Build() (*AmqpQEClientCommon, error) {
cBuilder := framework.NewContainerBuilder(a.receiver.Name, QEClientImageMap[a.receiver.Implementation].Image)
cBuilder.WithCommands(QEClientImageMap[a.receiver.Implementation].CommandReceiver)

//
// Adds args (may vary from one implementation to another)
//

// URL
cBuilder.AddArgs("--broker-url", a.receiver.Url)

// Message count
cBuilder.AddArgs("--count", strconv.Itoa(a.MessageCount))

// Timeout
cBuilder.AddArgs("--timeout", strconv.Itoa(a.receiver.Timeout))

// Static options
cBuilder.AddArgs("--log-msgs", "json")
a.addSpecificImplementationOptions(cBuilder)

// Retrieving container and adding to pod
c := cBuilder.Build()
Expand Down
1 change: 1 addition & 0 deletions pkg/framework/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewContainerBuilder(name string, image string) *ContainerBuilder {
cb.c = v1.Container{}
cb.c.Name = name
cb.c.Image = image
cb.c.TerminationMessagePolicy = v1.TerminationMessageFallbackToLogsOnError
return cb
}

Expand Down