diff --git a/pkg/api/client/amqp/qeclients/factory.go b/pkg/api/client/amqp/qeclients/factory.go index 6a6b969..bd74551 100644 --- a/pkg/api/client/amqp/qeclients/factory.go +++ b/pkg/api/client/amqp/qeclients/factory.go @@ -5,6 +5,7 @@ type AmqpQEClientImpl int const ( Python AmqpQEClientImpl = iota + MultipleReceiversPython Java NodeJS Timeout int = 60 @@ -20,10 +21,16 @@ type AmqpQEClientImplInfo struct { var ( QEClientImageMap = map[AmqpQEClientImpl]AmqpQEClientImplInfo{ Python: { - Name: "cli-proton-python", - Image: "docker.io/rhmessagingqe/cli-proton-python:latest", - CommandSender: "cli-proton-python-sender", + Name: "cli-proton-python", + Image: "docker.io/rhmessagingqe/cli-proton-python:latest", + CommandSender: "cli-proton-python-sender", CommandReceiver: "cli-proton-python-receiver", }, + MultipleReceiversPython: { + Name: "multiple-receivers", + Image: "docker.io/nicob1987/multireceive:latest", + CommandSender: "NotImplemented", + CommandReceiver: "./multireceive.py", + }, } ) diff --git a/pkg/api/client/amqp/qeclients/factoryreceiver.go b/pkg/api/client/amqp/qeclients/factoryreceiver.go index 315c321..5ef1ba4 100644 --- a/pkg/api/client/amqp/qeclients/factoryreceiver.go +++ b/pkg/api/client/amqp/qeclients/factoryreceiver.go @@ -8,8 +8,8 @@ import ( ) type AmqpQEReceiverBuilder struct { - receiver *AmqpQEClientCommon - MessageCount int + receiver *AmqpQEClientCommon + MessageCount int } func NewReceiverBuilder(name string, impl AmqpQEClientImpl, data framework.ContextData, url string) *AmqpQEReceiverBuilder { @@ -38,6 +38,25 @@ func (a *AmqpQEReceiverBuilder) Messages(count int) *AmqpQEReceiverBuilder { return a } +func (a *AmqpQEReceiverBuilder) addSpecificImplementationOptions(cBuilder *framework.ContainerBuilder) { + switch a.receiver.Implementation { + // URL + case MultipleReceiversPython: + { + cBuilder.AddArgs("--address", a.receiver.Url) + cBuilder.AddArgs("--connections", "100") //total connections + cBuilder.AddArgs("--links", "500") //total links per connection + } + default: + { + cBuilder.AddArgs("--broker-url", a.receiver.Url) + cBuilder.AddArgs("--count", strconv.Itoa(a.MessageCount)) + cBuilder.AddArgs("--timeout", strconv.Itoa(a.receiver.Timeout)) + cBuilder.AddArgs("--log-msgs", "json") + } + } +} + func (a *AmqpQEReceiverBuilder) Build() (*AmqpQEClientCommon, error) { // Preparing Pod, Container (commands and args) and etc podBuilder := framework.NewPodBuilder(a.receiver.Name, a.receiver.Context.Namespace) @@ -50,21 +69,7 @@ func (a *AmqpQEReceiverBuilder) Build() (*AmqpQEClientCommon, error) { cBuilder := framework.NewContainerBuilder(a.receiver.Name, QEClientImageMap[a.receiver.Implementation].Image) cBuilder.WithCommands(QEClientImageMap[a.receiver.Implementation].CommandReceiver) - // - // Adds args (may vary from one implementation to another) - // - - // URL - cBuilder.AddArgs("--broker-url", a.receiver.Url) - - // Message count - cBuilder.AddArgs("--count", strconv.Itoa(a.MessageCount)) - - // Timeout - cBuilder.AddArgs("--timeout", strconv.Itoa(a.receiver.Timeout)) - - // Static options - cBuilder.AddArgs("--log-msgs", "json") + a.addSpecificImplementationOptions(cBuilder) // Retrieving container and adding to pod c := cBuilder.Build() diff --git a/pkg/framework/pod.go b/pkg/framework/pod.go index 12254dc..1750092 100644 --- a/pkg/framework/pod.go +++ b/pkg/framework/pod.go @@ -31,6 +31,7 @@ func NewContainerBuilder(name string, image string) *ContainerBuilder { cb.c = v1.Container{} cb.c.Name = name cb.c.Image = image + cb.c.TerminationMessagePolicy = v1.TerminationMessageFallbackToLogsOnError return cb }