Skip to content

Commit

Permalink
feat!: add Sender field to Message
Browse files Browse the repository at this point in the history
  • Loading branch information
mdawar committed Jan 22, 2025
1 parent 537e40f commit b96b101
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 176 deletions.
46 changes: 35 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import "github.com/mdawar/pubsub"
#### Create a Broker

```go
// Create a broker and specify the topics type and the message payloads type.
broker := pubsub.NewBroker[string, string]()
// Create a message broker.
// The type params are for the topic, payload and sender respectively.
broker := pubsub.NewBroker[string, string, string]()

// The message payload type can be any type.
events := pubsub.NewBroker[string, Event]()
// Any types can be used.
events := pubsub.NewBroker[string, Event, int]()
```

#### Subscriptions
Expand All @@ -54,13 +55,24 @@ broker.Unsubscribe(sub2, "actions", "testing")

// Unsubscribe from all topics.
// The channel will not be closed, it will only stop receiving messages.
// Note: Specifying all the topics is much more efficient if performance is critical.
// NOTE: Specifying all the topics is much more efficient if performance is critical.
broker.Unsubscribe(sub2)
```

#### Publishing Messages

```go
// A message is composed of a topic, payload and an optional sender.
// The type params are the same types used when creating the broker.
var msg = pubsub.Message[string, string, string]{
Topic: "events",
Payload: "Sample message",
Sender: "sender-id",
}

// You can specify an alias for the generic message type.
type Message = pubsub.Message[string, string, string]

// Publish a message with the specified payload.
// The payload can be of any type that is specified when creating the broker.
//
Expand All @@ -71,7 +83,11 @@ broker.Unsubscribe(sub2)
// or until the context is canceled.
//
// A nil return value indicates that all the subscribers received the message.
broker.Publish(context.TODO(), "events", "Sample message")
broker.Publish(context.TODO(), Message{
Topic: "events",
Payload: "Sample message",
Sender: "sender-id", // Optional.
})
```

```go
Expand All @@ -80,7 +96,7 @@ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// In this case, Publish will deliver the message to subscribers that are
// ready and will wait for the others for up to the timeout duration.
err := broker.Publish(ctx, "events", "Sample message")
err := broker.Publish(ctx, Message{Topic: "events", Payload: "Sample message"})
// The error is not nil if the context was canceled or the deadline exceeded.
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
Expand All @@ -97,15 +113,21 @@ if err != nil {
// The message is sent sequentially to the subscribers that are ready to receive it
// and the others are skipped.
//
// Note: Message delivery is not guaranteed.
broker.TryPublish("events", "A message that may not be delivered")
// NOTE: Message delivery is not guaranteed.
broker.TryPublish(Message{
Topic: "events",
Payload: "A message that may not be delivered",
})

// Buffered subscriptions can be used for guaranteed delivery with a non-blocking publish.
//
// Publish will still block if any subscription's channel buffer is full, or any of the
// subscriptions is an unbuffered channel.
sub := broker.SubscribeWithCapacity(1, "events")
broker.Publish(context.TODO(), "events", "Guaranteed delivery message")
broker.Publish(context.TODO(), Message{
Topic: "events",
Payload: "Guaranteed delivery message",
})
```

#### Messages
Expand All @@ -120,13 +142,15 @@ msg := <-sub
msg.Topic
// The payload that was published using Publish() or TryPublish().
msg.Payload
// The message sender.
msg.Sender
```

#### Topics

```go
// Get a slice of all the topics registered on the broker.
// Note: The order of the topics is not guaranteed.
// NOTE: The order of the topics is not guaranteed.
topics := broker.Topics()

// Get the total number of topics.
Expand Down
16 changes: 8 additions & 8 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func BenchmarkBrokerSubscribe(b *testing.B) {
broker := pubsub.NewBroker[string, string]()
broker := pubsub.NewBroker[string, string, string]()

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -19,7 +19,7 @@ func BenchmarkBrokerSubscribe(b *testing.B) {
}

func BenchmarkBrokerSubscribeWithCapacity(b *testing.B) {
broker := pubsub.NewBroker[string, string]()
broker := pubsub.NewBroker[string, string, string]()

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -28,9 +28,9 @@ func BenchmarkBrokerSubscribeWithCapacity(b *testing.B) {
}

func BenchmarkBrokerUnsubscribe(b *testing.B) {
broker := pubsub.NewBroker[string, string]()
broker := pubsub.NewBroker[string, string, string]()

subs := make([]<-chan pubsub.Message[string, string], 0, b.N)
subs := make([]<-chan pubsub.Message[string, string, string], 0, b.N)

for i := 0; i < b.N; i++ {
sub := broker.Subscribe(strconv.Itoa(i))
Expand All @@ -49,7 +49,7 @@ func BenchmarkBrokerPublish(b *testing.B) {

for _, count := range cases {
b.Run(strconv.Itoa(count), func(b *testing.B) {
broker := pubsub.NewBroker[string, string]()
broker := pubsub.NewBroker[string, string, string]()
topic := "testing"

done := make(chan struct{})
Expand Down Expand Up @@ -79,7 +79,7 @@ func BenchmarkBrokerPublish(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
broker.Publish(ctx, topic, strconv.Itoa(i))
broker.Publish(ctx, Message{Topic: topic, Payload: strconv.Itoa(i)})
}
b.StopTimer()

Expand All @@ -94,7 +94,7 @@ func BenchmarkBrokerTryPublish(b *testing.B) {

for _, count := range cases {
b.Run(strconv.Itoa(count), func(b *testing.B) {
broker := pubsub.NewBroker[string, string]()
broker := pubsub.NewBroker[string, string, string]()
topic := "testing"

done := make(chan struct{})
Expand Down Expand Up @@ -122,7 +122,7 @@ func BenchmarkBrokerTryPublish(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
broker.TryPublish(topic, strconv.Itoa(i))
broker.TryPublish(Message{Topic: topic, Payload: strconv.Itoa(i)})
}
b.StopTimer()

Expand Down
48 changes: 25 additions & 23 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
)

// Message represents a message delivered by the broker to a subscriber.
type Message[T any, P any] struct {
type Message[T any, P any, S any] struct {
// Topic is the topic on which the message is published.
Topic T
// Payload holds the published value.
Payload P
// Sender is an identifier for the message's sender.
Sender S
}

// Broker represents a message broker.
Expand All @@ -21,26 +23,26 @@ type Message[T any, P any] struct {
// of messages to specific topics.
//
// The Broker supports concurrent operations.
type Broker[T comparable, P any] struct {
type Broker[T comparable, P any, S any] struct {
// Mutex to protect the subs map.
mu sync.RWMutex
// subs holds the topics and their subscriptions as a slice.
subs map[T][]chan Message[T, P]
subs map[T][]chan Message[T, P, S]
}

// NewBroker creates a new message [Broker] instance.
func NewBroker[T comparable, P any]() *Broker[T, P] {
return &Broker[T, P]{
subs: make(map[T][]chan Message[T, P]),
func NewBroker[T comparable, P any, S any]() *Broker[T, P, S] {
return &Broker[T, P, S]{
subs: make(map[T][]chan Message[T, P, S]),
}
}

// Topics returns a slice of all the topics registered on the [Broker].
//
// A nil slice is returned if there are no topics.
//
// Note: The order of the topics is not guaranteed.
func (b *Broker[T, P]) Topics() []T {
// NOTE: The order of the topics is not guaranteed.
func (b *Broker[T, P, S]) Topics() []T {
b.mu.RLock()
defer b.mu.RUnlock()

Expand All @@ -54,14 +56,14 @@ func (b *Broker[T, P]) Topics() []T {
}

// NumTopics returns the total number of topics registered on the [Broker].
func (b *Broker[T, P]) NumTopics() int {
func (b *Broker[T, P, S]) NumTopics() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.subs)
}

// Subscribers returns the number of subscriptions on the specified topic.
func (b *Broker[T, P]) Subscribers(topic T) int {
func (b *Broker[T, P, S]) Subscribers(topic T) int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.subs[topic])
Expand All @@ -70,18 +72,18 @@ func (b *Broker[T, P]) Subscribers(topic T) int {
// Subscribe creates a subscription for the specified topics.
//
// The created subscription channel is unbuffered (capacity = 0).
func (b *Broker[T, P]) Subscribe(topics ...T) <-chan Message[T, P] {
func (b *Broker[T, P, S]) Subscribe(topics ...T) <-chan Message[T, P, S] {
return b.SubscribeWithCapacity(0, topics...)
}

// Subscribe creates a subscription for the specified topics with the specified capacity.
//
// The capacity specifies the subscription channel's buffer capacity.
func (b *Broker[T, P]) SubscribeWithCapacity(capacity int, topics ...T) <-chan Message[T, P] {
func (b *Broker[T, P, S]) SubscribeWithCapacity(capacity int, topics ...T) <-chan Message[T, P, S] {
b.mu.Lock()
defer b.mu.Unlock()

sub := make(chan Message[T, P], capacity)
sub := make(chan Message[T, P, S], capacity)

for _, topic := range topics {
b.subs[topic] = append(b.subs[topic], sub)
Expand All @@ -96,8 +98,8 @@ func (b *Broker[T, P]) SubscribeWithCapacity(capacity int, topics ...T) <-chan M
//
// The channel will not be closed, it will only stop receiving messages.
//
// Note: Specifying the topics to unsubscribe from can be more efficient.
func (b *Broker[T, P]) Unsubscribe(sub <-chan Message[T, P], topics ...T) {
// NOTE: Specifying the topics to unsubscribe from can be more efficient.
func (b *Broker[T, P, S]) Unsubscribe(sub <-chan Message[T, P, S], topics ...T) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -118,7 +120,7 @@ func (b *Broker[T, P]) Unsubscribe(sub <-chan Message[T, P], topics ...T) {
// removeSubscription removes a subscription channel from a topic.
//
// The topic will be removed if there are no other subscriptions.
func (b *Broker[T, P]) removeSubscription(sub <-chan Message[T, P], topic T) {
func (b *Broker[T, P, S]) removeSubscription(sub <-chan Message[T, P, S], topic T) {
subscribers := b.subs[topic]
for i, s := range subscribers {
if s == sub {
Expand Down Expand Up @@ -146,12 +148,12 @@ func (b *Broker[T, P]) removeSubscription(sub <-chan Message[T, P], topic T) {
// A nil return value indicates that all the subscribers received the message.
//
// If there are no subscribers to the topic, the message will be discarded.
func (b *Broker[T, P]) Publish(ctx context.Context, topic T, payload P) error {
func (b *Broker[T, P, S]) Publish(ctx context.Context, msg Message[T, P, S]) error {
b.mu.RLock()
defer b.mu.RUnlock()

subs := b.subs[topic]
msg := Message[T, P]{Topic: topic, Payload: payload}
// TODO: add test for empty topic.
subs := b.subs[msg.Topic]

switch len(subs) {
case 0:
Expand Down Expand Up @@ -189,14 +191,14 @@ func (b *Broker[T, P]) Publish(ctx context.Context, topic T, payload P) error {
// The message is sent sequentially to the subscribers that are ready to receive it and the others
// are skipped.
//
// Note: Use the [Broker.Publish] method for guaranteed delivery.
func (b *Broker[T, P]) TryPublish(topic T, payload P) {
// NOTE: Use the [Broker.Publish] method for guaranteed delivery.
func (b *Broker[T, P, S]) TryPublish(msg Message[T, P, S]) {
b.mu.RLock()
defer b.mu.RUnlock()

for _, sub := range b.subs[topic] {
for _, sub := range b.subs[msg.Topic] {
select {
case sub <- Message[T, P]{Topic: topic, Payload: payload}:
case sub <- msg:
default:
}
}
Expand Down
Loading

0 comments on commit b96b101

Please sign in to comment.