Skip to content

Commit

Permalink
feat: add tests for the cmq package
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Sep 6, 2024
1 parent ba637e6 commit 2857ae0
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
21 changes: 19 additions & 2 deletions pkg/cmq/cmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmq

import (
"errors"
"fmt"
)

var (
Expand All @@ -10,6 +11,19 @@ var (
ErrSubscriberNotFound = errors.New("subscriber doesn't exist")
)

type SubscriberFullError struct {
Subscriber string
Topic string
}

func (err SubscriberFullError) Error() string {
return fmt.Sprintf(
"cannot publish into subscriber named %s of topic %s because its channel is already full",
err.Subscriber,
err.Topic,
)
}

type SubscriberContext[T any] struct {
ch chan T
exit chan struct{}
Expand Down Expand Up @@ -46,16 +60,19 @@ func (mmq MockMessageQueue[T]) Publish(topic string, message T) error {
mmq.queues[topic] = make(map[string]chan T)
}

for _, channel := range mmq.queues[topic] {
var err error

for name, channel := range mmq.queues[topic] {
select {
case channel <- message:
default:
// just like NATS we are ignoring consumers that can not consume as fast as
// producer.
err = errors.Join(err, SubscriberFullError{name, topic})
}
}

return nil
return err
}

// Register a subscribe group on a topic. You need to register subscriber group before using it.
Expand Down
78 changes: 78 additions & 0 deletions pkg/cmq/cmq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cmq_test

import (
"errors"
"sync"
"testing"

"github.com/1995parham-learning/cmq-1/pkg/cmq"
)

func TestFullSubscriber(t *testing.T) {
mmq := cmq.NewMockMessageQueue[int]()

// create a subscribe groip on "numbers" topic which is named "s1"
if err := mmq.Register("s1", "numbers", 1); err != nil {
t.Fatalf("failed to create subscriber group named s1 on numbers topic %s", err)
}

if err := mmq.Publish("numbers", 78); err != nil {
t.Fatalf("failed to publish on numbers topic %s", err)
}

err := mmq.Publish("numbers", 78)
if err == nil {
t.Fatalf("publish on full subscriber should fail")
}

var subErr cmq.SubscriberFullError
if !errors.As(err, &subErr) {
t.Fatalf("publish on full subscriber should fail with subscribe error but failed with %s", err)
}
if subErr.Topic != "numbers" {
t.Fatalf("the name of topic is %s instead of numbers", subErr.Topic)
}
if subErr.Subscriber != "s1" {
t.Fatalf("the name of subscriber is %s instead of s1", subErr.Subscriber)
}
}

func TestPublishAndSubscribe(t *testing.T) {
mmq := cmq.NewMockMessageQueue[int]()

// create a subscribe groip on "numbers" topic which is named "s1"
if err := mmq.Register("s1", "numbers", 10); err != nil {
t.Fatalf("failed to create subscriber group named s1 on numbers topic %s", err)
}

sub, err := mmq.Subscribe("s1", "numbers")
if err != nil {
t.Fatalf("failed to subscribe on group named s1 and numbers topic %s", err)
}

var wg sync.WaitGroup

wg.Add(1)
go func() {
i := <-sub.Channel()
if i != 78 {
t.Errorf("read %d from subscribe instead of 78", i)
t.Fail()
}
sub.Close()
wg.Done()
}()

// subscriber has 10 empty place, so we can insert
// 10 numbers without any error.
if err := mmq.Publish("numbers", 78); err != nil {
t.Fatalf("failed to publish on numbers topic %s", err)
}
for i := range 9 {
if err := mmq.Publish("numbers", i); err != nil {
t.Fatalf("failed to publish on numbers topic %s", err)
}
}

wg.Wait()
}

0 comments on commit 2857ae0

Please sign in to comment.