Skip to content

Commit

Permalink
Allow multiple consumers (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall authored Dec 16, 2024
1 parent 39cebc5 commit 2b04619
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 60 deletions.
2 changes: 0 additions & 2 deletions examples/coverpage/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ FROM ${DOCKER_REPOSITORY}/scyllaridae-pandoc:${TAG}

RUN apt-get update \
&& apt-get install -y --no-install-recommends \
curl=8.5.0-2ubuntu10.5 \
ghostscript=10.02.1~dfsg1-0ubuntu7.4 \
jq=1.7.1-3build1 \
&& rm -rf /var/lib/apt/lists/*
COPY . /app
5 changes: 4 additions & 1 deletion examples/pandoc/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
ARG TAG=main
ARG DOCKER_REPOSITORY=lehighlts
FROM ${DOCKER_REPOSITORY}/scyllaridae:${TAG} AS scyllaridae
FROM pandoc/latex:3.5.0-ubuntu AS pandoc
FROM pandoc/latex:3.6-ubuntu AS pandoc

# hadolint ignore=DL3008
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
curl \
bash=5.2.21-2ubuntu4 \
gosu=1.17-1ubuntu0.24.04.2 \
jq=1.7.1-3build1 \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /app
Expand Down
128 changes: 71 additions & 57 deletions stomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,32 @@ func runStompSubscribers(config *scyllaridae.ServerConfig) {
var wg sync.WaitGroup

for _, middleware := range config.QueueMiddlewares {
wg.Add(1)
go func(middleware scyllaridae.QueueMiddleware) {
defer wg.Done()

for {
select {
case <-stopChan:
slog.Info("Stopping subscriber for queue", "queue", middleware.QueueName)
return
default:
// Process one message at a time
err := RecvAndProcessMessage(middleware.QueueName, middleware)
if err != nil {
slog.Error("Error processing message", "queue", middleware.QueueName, "error", err)
numConsumers := middleware.Consumers
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go func(middleware scyllaridae.QueueMiddleware, consumerID int) {
defer wg.Done()

slog.Info("Starting subscriber", "queue", middleware.QueueName, "consumer", consumerID)

for {
select {
case <-stopChan:
slog.Info("Stopping subscriber", "queue", middleware.QueueName, "consumer", consumerID)
return
default:
err := RecvAndProcessMessage(middleware.QueueName, middleware)
if err != nil {
slog.Error("Error processing message", "queue", middleware.QueueName, "consumer", consumerID, "error", err)
}
}
}
}
}(middleware)
}(middleware, i)
}
}

// Wait for a termination signal
<-stopChan
slog.Info("Shutting down message listeners")

// Wait for all subscribers to gracefully stop
slog.Info("Shutting down all message listeners")
wg.Wait()
}

Expand Down Expand Up @@ -76,53 +77,66 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew
slog.Error("Cannot set keepalive period", "err", err.Error())
return err
}

conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 0*time.Second))
if err != nil {
slog.Error("Cannot connect to STOMP server", "err", err.Error())
return err
}
defer func() {
err := conn.Disconnect()
for {
conn, err := stomp.Connect(tcpConn,
stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second),
stomp.ConnOpt.HeartBeatGracePeriodMultiplier(1.5),
stomp.ConnOpt.HeartBeatError(60*time.Second),
)
if err != nil {
slog.Error("Problem disconnecting from STOMP server", "err", err)
}
}()
sub, err := conn.Subscribe(queueName, stomp.AckClient)
if err != nil {
slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}
defer func() {
if !sub.Active() {
return
slog.Error("Cannot connect to STOMP server", "err", err.Error())
return err
}
err := sub.Unsubscribe()
defer func() {
err := conn.Disconnect()
if err != nil {
slog.Error("Problem disconnecting from STOMP server", "err", err)
}
}()
sub, err := conn.Subscribe(queueName, stomp.AckClient)
if err != nil {
slog.Error("Problem unsubscribing", "err", err)
slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}
}()
slog.Info("Subscribed to queue", "queue", queueName)

// Process one message at a time
for {
msg := <-sub.C // Blocking read for one message
if msg == nil || len(msg.Body) == 0 {
defer func() {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
return
}
err := sub.Unsubscribe()
if err != nil {
slog.Error("Problem unsubscribing", "err", err)
}
}()
slog.Info("Subscribed to queue", "queue", queueName)

// Process one message at a time
for {
// Wait for the next message (blocks if the channel is empty)
msg, ok := <-sub.C
if !ok {
// Subscription is no longer active
return fmt.Errorf("subscription to %s is closed", queueName)
}
continue
}

// Process the message
handleMessage(msg, middleware)
// Check for an empty or nil message
if msg == nil || len(msg.Body) == 0 {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
continue
}

// Acknowledge the message after successful processing
err := msg.Conn.Ack(msg)
if err != nil {
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
// Process the message synchronously
handleMessage(msg, middleware)

// Acknowledge the message after successful processing
err := msg.Conn.Ack(msg)
if err != nil {
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
}
}
}

}

func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {
Expand Down

0 comments on commit 2b04619

Please sign in to comment.