Replies: 3 comments
-
Sorry for the late response, I'm currently a bit too busy to take a deeper look into this but I took a look at the Google Pub/Sub event bus implementation of another event-sourcing library here, and they are implementing it as you described (using the Ordering Key). One thing to consider is the "wildcard" feature that the NATS and In-Memory event buses support. I found this link that claims it should be possible but couldn't find a deeper guide for now. Wildcard examplepackage example
func example(bus event.Bus) {
// Subscribe to all events.
events, errs, err := bus.Subscribe(context.TODO(), "*")
} To test the event bus implementation, you can use the eventbustest package and simply call the test suites: // backend/gcp/eventbus_test.go
package gcp_test
import (
"testing"
"github.com/modernice/goes/backend/testing/eventbustest"
"github.com/modernice/goes/codec"
"github.com/modernice/goes/event"
"github.com/modernice/goes/event/eventbus"
)
func TestEvenBus(t *testing.T) {
eventbustest.RunCore(t, newBus)
eventbustest.RunWildcard(t, newBus)
}
func newBus(codec.Encoding) event.Bus {
// Return Pub/Sub event bus.
} Hope this helps, and if you have any questions, I'm happy to help 🙂 |
Beta Was this translation helpful? Give feedback.
-
Hi @bounoable , I have a doubt regarding the Bus interface. The consumer will receive a channel that will provide the events to consume. This schema does offer no warranty regarding the events are processed. If the bus deliver an event to a handler, and the handler is stopped in any way (the node goes down, a processing error, etc.) the event will be lost. Are you using any mechanism to avoid loosing events? |
Beta Was this translation helpful? Give feedback.
-
Hey @totemcaf, yes you're right that you definitely cannot rely on the event bus to "confirm" processing of events. If your node goes down, or if your consumer is too slow you can "lose" events. There are solutions to this issue, but it's hard giving a single answer to cover all possible scenarios. Most important is that you persist events in an event store if you need to provide this kind of guarantee. In most cases, the easiest solution will be to create a projection that implements the ProgressAware API, and use the ExampleGiven an application that publishes a package example
import (
"context"
"github.com/google/uuid"
"github.com/modernice/goes/projection"
"github.com/modernice/goes/projection/schedule"
"github.com/modernice/goes/persistence/model"
)
type WelcomeMailer struct {
*projection.Progressor
id uuid.UUID
}
func NewMailer(id uuid.UUID) *WelcomeMailer {
return &WelcomeMailer{
Progressor: projection.NewProgressor(),
id: id,
}
}
// ModelID implements model.Model (for the WelcomeMailerRepository).
func (wm *WelcomeMailer) ModelID() uuid.UUID {
return wm.id
}
func (wm *WelcomeMailer) ApplyEvent(evt event.Event) {
// Send welcome mail to user ...
}
type WelcomeMailerRepository = model.Repository[*WelcomeMailer, uuid.UUID]
// Defining a fixed UUID for the mailer because there will only ever be a single mailer.
var WelcomeMailerID = uuid.MustParse("949189d7-6de5-4c27-b8ea-91c3e3b8e33f")
func RunMailer(
ctx context.Context,
bus event.Bus,
store event.Store,
repo WelcomeMailerRepository,
) (<-chan error, error) {
// Define a schedule that is triggered by "user.registered" events.
s := schedule.Continuously(bus, store, []string{"user.registered"})
return s.Subscribe(
ctx,
func(ctx projection.Job) error {
// Fetch the mailer state from the repository.
wm, err := repo.Fetch(ctx, WelcomeMailerID)
if err != nil {
return fmt.Errorf("welcome mailer not found: %w", err)
}
// Apply the events from the job to the WelcomeMailer.
// In this example, it will only ever apply "user.registered" events.
if err := ctx.Apply(ctx, wm); err != nil {
return err
}
// Save the updated mailer back to the repository.
return repo.Save(ctx, wm)
},
// The Startup() option fetches the "user.registered" events from the event store
// and immediately creates a projection job for these events. Because WelcomeMailer
// embeds *Progressor, only the events that haven't already been applied to the WelcomeMailer
// will be fetched and applied.
projection.Startup(),
)
} package main
import (
"example"
"github.com/google/uuid"
"github.com/modernice/goes/backend/memory"
)
func main() {
// Using in-memory repository, event bus, and event store for brevity.
// In your application you should use production-ready backends (MongoDB, NATS etc.)
repo := memory.NewModelRepository[*example.WelcomeMailer, uuid.UUID](memory.ModelFactory(example.NewMailer))
bus := eventbus.New()
store := eventstore.New()
mailerErrors, err := example.RunMailer(context.TODO(), bus, store, repo)
if err != nil {
log.Panic(err)
}
for err := range mailerErrors {
log.Println(err)
}
} This example might not be the best because "welcome mails" are arguably something that doesn't need this kind of guarantee in most applications 😄 But if you really need to give this guarantee then it's certainly possible to do this using the |
Beta Was this translation helpful? Give feedback.
-
Summary
Provide an Event Bus backend implementation backed by Google PubSub event broker.
Motivation
We are deploying our application GCE and we need an event bus to publish events so the application can use an event driven architecture. One of the uses is as for Event Sourcing and to allow the projectors to see events from aggregates.
We evaluated Google PubSub and saw that it supports Event Ordering. An event can provide an Ordering Key and the infrastructure will warrant the events for the same Ordering Key will be provided in order. Our intent is to use the Aggregate ID as the ordering key.
Proposal
The idea is to implement the
event.Bus
interface using thecloud.google.com/go/pubsub
package. This implementation will take thenats.bus
implementation as a base.Using
topicFunc
(like subjectFunc in nats implementation), all events of a given aggregate type can be published on the same PubSub topic.Also the Aggregate ID in the event can be used as an Ordering Key.
Note
I'm using this implementation to learn about goes and its capabilities. Any suggestion or guidence us wellcome.
Beta Was this translation helpful? Give feedback.
All reactions