Skip to content

Commit

Permalink
Merge pull request #266 from peczenyj/small-fixes-and-refactors
Browse files Browse the repository at this point in the history
Small fixes and refactors
  • Loading branch information
Zerpet authored Jun 6, 2024
2 parents 2d7a4f4 + a212a69 commit 6867443
Show file tree
Hide file tree
Showing 22 changed files with 102 additions and 211 deletions.
6 changes: 0 additions & 6 deletions _examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (client *Client) handleReconnect(addr string) {
client.infolog.Println("attempting to connect")

conn, err := client.connect(addr)

if err != nil {
client.errlog.Println("failed to connect. Retrying...")

Expand All @@ -201,7 +200,6 @@ func (client *Client) handleReconnect(addr string) {
// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)

if err != nil {
return nil, err
}
Expand All @@ -220,7 +218,6 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
client.m.Unlock()

err := client.init(conn)

if err != nil {
client.errlog.Println("failed to initialize channel, retrying...")

Expand Down Expand Up @@ -250,13 +247,11 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()

if err != nil {
return err
}

err = ch.Confirm(false)

if err != nil {
return err
}
Expand All @@ -268,7 +263,6 @@ func (client *Client) init(conn *amqp.Connection) error {
false, // No-wait
nil, // Arguments
)

if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion _examples/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package main

import (
"flag"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"os"
"os/signal"
"syscall"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"crypto/sha1"
"flag"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"io"
"log"
"os"

amqp "github.com/rabbitmq/amqp091-go"
)

var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber")
Expand Down
1 change: 0 additions & 1 deletion allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestAllocatorShouldNotReuseEarly(t *testing.T) {
if want, got := first, third; want != got {
t.Fatalf("expected third allocation to be %d, got: %d", want, got)
}

}

func TestAllocatorReleasesKeepUpWithAllocationsForAllSizes(t *testing.T) {
Expand Down
4 changes: 1 addition & 3 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (auth *AMQPlainAuth) Response() string {
}

// ExternalAuth for RabbitMQ-auth-mechanism-ssl.
type ExternalAuth struct {
}
type ExternalAuth struct{}

// Mechanism returns "EXTERNAL"
func (*ExternalAuth) Mechanism() string {
Expand All @@ -70,7 +69,6 @@ func (*ExternalAuth) Response() string {

// Finds the first mechanism preferred by the client that the server supports.
func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) {

for _, auth = range client {
for _, mech := range serverMechanisms {
if auth.Mechanism() == mech {
Expand Down
2 changes: 1 addition & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ it must be redelivered or dropped.
See also Delivery.Nack
*/
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error {
ch.m.Lock()
defer ch.m.Unlock()

Expand Down
25 changes: 10 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ type server struct {
tune connectionTuneOk
}

var defaultLogin = "guest"
var defaultPassword = "guest"
var defaultPlainAuth = &PlainAuth{defaultLogin, defaultPassword}
var defaultAMQPlainAuth = &AMQPlainAuth{defaultLogin, defaultPassword}
var (
defaultLogin = "guest"
defaultPassword = "guest"
defaultPlainAuth = &PlainAuth{defaultLogin, defaultPassword}
defaultAMQPlainAuth = &AMQPlainAuth{defaultLogin, defaultPassword}
)

func defaultConfigWithAuth(auth Authentication) Config {
return Config{
Expand All @@ -48,6 +50,8 @@ func amqplainConfig() Config {
}

func newServer(t *testing.T, serverIO, clientIO io.ReadWriteCloser) *server {
t.Helper()

return &server{
T: t,
r: reader{serverIO},
Expand All @@ -58,6 +62,8 @@ func newServer(t *testing.T, serverIO, clientIO io.ReadWriteCloser) *server {
}

func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
t.Helper()

rs, wc := io.Pipe()
rc, ws := io.Pipe()

Expand Down Expand Up @@ -228,7 +234,6 @@ func TestDefaultClientProperties(t *testing.T) {

go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, defaultConfig()); err != nil {
Expand All @@ -246,7 +251,6 @@ func TestDefaultClientProperties(t *testing.T) {
if want, got := defaultLocale, srv.start.Locale; want != got {
t.Errorf("expected locale %s got: %s", want, got)
}

}

func TestCustomClientProperties(t *testing.T) {
Expand All @@ -261,7 +265,6 @@ func TestCustomClientProperties(t *testing.T) {

go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, config); err != nil {
Expand All @@ -275,15 +278,13 @@ func TestCustomClientProperties(t *testing.T) {
if want, got := config.Properties["version"], srv.start.ClientProperties["version"]; want != got {
t.Errorf("expected version %s got: %s", want, got)
}

}

func TestOpen(t *testing.T) {
rwc, srv := newSession(t)
t.Cleanup(func() { rwc.Close() })
go func() {
srv.connectionOpen()

}()

if c, err := Open(rwc, defaultConfig()); err != nil {
Expand Down Expand Up @@ -333,7 +334,6 @@ func TestChannelOpen(t *testing.T) {
go func() {
srv.connectionOpen()
srv.channelOpen(1)

}()

c, err := Open(rwc, defaultConfig())
Expand All @@ -345,7 +345,6 @@ func TestChannelOpen(t *testing.T) {
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}

}

func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
Expand All @@ -361,7 +360,6 @@ func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
if err != ErrSASL {
t.Fatalf("expected ErrSASL got: %+v on %+v", err, c)
}

}

func TestOpenAMQPlainAuth(t *testing.T) {
Expand Down Expand Up @@ -393,7 +391,6 @@ func TestOpenAMQPlainAuth(t *testing.T) {
if table["PASSWORD"] != defaultPassword {
t.Fatalf("unexpected password: want: %s, got: %s", defaultPassword, table["PASSWORD"])
}

}

func TestOpenFailedCredentials(t *testing.T) {
Expand All @@ -410,7 +407,6 @@ func TestOpenFailedCredentials(t *testing.T) {
if err != ErrCredentials {
t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c)
}

}

func TestOpenFailedVhost(t *testing.T) {
Expand Down Expand Up @@ -527,7 +523,6 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack.DeliveryTag)
}
}

}

func TestDeferredConfirmations(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@ func (c *Connection) dispatch0(f frame) {
return
case c.rpc <- m:
}

}
case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this
Expand Down Expand Up @@ -755,7 +754,6 @@ func (c *Connection) reader(r io.Reader) {

for {
frame, err := frames.ReadFrame()

if err != nil {
c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
return
Expand Down
2 changes: 2 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) {
}

func rabbitmqctl(t *testing.T, args ...string) error {
t.Helper()

rabbitmqctlPath, found := os.LookupEnv(rabbitmqctlEnvKey)
if !found {
t.Skipf("variable for %s for rabbitmqctl not found, skipping", rabbitmqctlEnvKey)
Expand Down
2 changes: 1 addition & 1 deletion consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
defer close(out)
defer subs.Done()

var inflight = in
inflight := in
var queue []*Delivery

for delivery := range in {
Expand Down
2 changes: 1 addition & 1 deletion delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized")
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple bool, requeue bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
}

Expand Down
6 changes: 0 additions & 6 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func (client *Client) handleReconnect(addr string) {
client.logger.Println("Attempting to connect")

conn, err := client.connect(addr)

if err != nil {
client.logger.Println("Failed to connect. Retrying...")

Expand All @@ -195,7 +194,6 @@ func (client *Client) handleReconnect(addr string) {
// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)

if err != nil {
return nil, err
}
Expand All @@ -214,7 +212,6 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
client.m.Unlock()

err := client.init(conn)

if err != nil {
client.logger.Println("Failed to initialize channel. Retrying...")

Expand Down Expand Up @@ -244,13 +241,11 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()

if err != nil {
return err
}

err = ch.Confirm(false)

if err != nil {
return err
}
Expand All @@ -262,7 +257,6 @@ func (client *Client) init(conn *amqp.Connection) error {
false, // No-wait
nil, // Arguments
)

if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func ExampleChannel_Confirm_bridge() {
// And the body
Body: msg.Body,
})

if err != nil {
if e := msg.Nack(false, false); e != nil {
log.Printf("nack error: %+v", e)
Expand Down
Loading

0 comments on commit 6867443

Please sign in to comment.