Skip to content

Commit

Permalink
Separate server and stomp subscriber logic (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall authored Oct 4, 2024
1 parent d7cd6f7 commit 92079f7
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 305 deletions.
298 changes: 5 additions & 293 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,311 +1,23 @@
package main

import (
"bytes"
"encoding/base64"
"fmt"
"log/slog"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

stomp "github.com/go-stomp/stomp/v3"
scyllaridae "github.com/lehigh-university-libraries/scyllaridae/internal/config"
"github.com/lehigh-university-libraries/scyllaridae/pkg/api"
)

var (
config *scyllaridae.ServerConfig
)

func init() {
var err error

config, err = scyllaridae.ReadConfig("scyllaridae.yml")
func main() {
config, err := scyllaridae.ReadConfig("scyllaridae.yml")
if err != nil {
slog.Error("Could not read YML", "err", err)
os.Exit(1)
}
}

func main() {
if len(config.QueueMiddlewares) > 0 {
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)

var wg sync.WaitGroup

for _, middleware := range config.QueueMiddlewares {
wg.Add(1)
go func(middleware scyllaridae.QueueMiddleware) {
defer wg.Done()
messageChan := make(chan *stomp.Message, middleware.Consumers)

// Start the specified number of worker goroutines
for i := 0; i < middleware.Consumers; i++ {
slog.Info("Adding consumer", "consumer", i)
go worker(messageChan, middleware)
}

RecvStompMessages(middleware.QueueName, messageChan)
}(middleware)
}

<-stopChan
slog.Info("Shutting down message listener")
} else {
// or make this an available API ala crayfish
http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, "OK")
})
http.HandleFunc("/", MessageHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}

slog.Info("Server listening", "port", port)
if err := http.ListenAndServe(":"+port, nil); err != nil {
panic(err)
}
}
}

func MessageHandler(w http.ResponseWriter, r *http.Request) {
slog.Info(r.RequestURI, "method", r.Method, "ip", r.RemoteAddr, "proto", r.Proto)

if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
defer r.Body.Close()

if r.Header.Get("Apix-Ldp-Resource") == "" && r.Header.Get("X-Islandora-Event") == "" {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

// Read the Alpaca message payload
auth := ""
if config.ForwardAuth {
auth = r.Header.Get("Authorization")
}
message, err := api.DecodeAlpacaMessage(r, auth)
if err != nil {
slog.Error("Error decoding alpaca message", "err", err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}

// Stream the file contents from the source URL
req, err := http.NewRequest("GET", message.Attachment.Content.SourceURI, nil)
if err != nil {
slog.Error("Error creating request to source", "source", message.Attachment.Content.SourceURI, "err", err)
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
if config.ForwardAuth {
req.Header.Set("Authorization", auth)
}
sourceResp, err := http.DefaultClient.Do(req)
if err != nil {
slog.Error("Error fetching source file contents", "err", err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
defer sourceResp.Body.Close()
if sourceResp.StatusCode != http.StatusOK {
slog.Error("SourceURI sent a bad status code", "code", sourceResp.StatusCode, "uri", message.Attachment.Content.SourceURI)
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

cmd, err := scyllaridae.BuildExecCommand(message, config)
if err != nil {
slog.Error("Error building command", "err", err)
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
cmd.Stdin = sourceResp.Body

// Create a buffer to stream the output of the command
var stdErr bytes.Buffer
cmd.Stderr = &stdErr

// send stdout to the ResponseWriter stream
cmd.Stdout = w

slog.Info("Running command", "cmd", cmd.String())
if err := cmd.Run(); err != nil {
slog.Error("Error running command", "cmd", cmd.String(), "err", stdErr.String())
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
}

func worker(messageChan <-chan *stomp.Message, middleware scyllaridae.QueueMiddleware) {
for msg := range messageChan {
handleMessage(msg, middleware)
}
}

func RecvStompMessages(queueName string, messageChan chan<- *stomp.Message) {
attempt := 0
maxAttempts := 30
for attempt = 0; attempt < maxAttempts; attempt += 1 {
if err := connectAndSubscribe(queueName, messageChan); err != nil {
slog.Error("resubscribing", "queue", queueName, "error", err)
if err := retryWithExponentialBackoff(attempt, maxAttempts); err != nil {
slog.Error("Failed subscribing after too many failed attempts", "queue", queueName, "attempts", attempt)
return
}
} else {
// Subscription was successful
break
}
}
}

func connectAndSubscribe(queueName string, messageChan chan<- *stomp.Message) error {
addr := os.Getenv("STOMP_SERVER_ADDR")
if addr == "" {
addr = "activemq:61613"
}

c, err := net.Dial("tcp", addr)
if err != nil {
slog.Error("cannot connect to port", "err", err.Error())
return err
}
tcpConn := c.(*net.TCPConn)

err = tcpConn.SetKeepAlive(true)
if err != nil {
slog.Error("cannot set keepalive", "err", err.Error())
return err
}

err = tcpConn.SetKeepAlivePeriod(10 * time.Second)
if err != nil {
slog.Error("cannot set keepalive period", "err", err.Error())
return err
}

conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 0*time.Second))
if err != nil {
slog.Error("cannot connect to stomp server", "err", err.Error())
return err
}
defer func() {
err := conn.Disconnect()
if err != nil {
slog.Error("problem disconnecting from stomp server", "err", err)
}
}()

sub, err := conn.Subscribe(queueName, stomp.AckAuto)
if err != nil {
slog.Error("cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}
defer func() {
if !sub.Active() {
return
}
err := sub.Unsubscribe()
if err != nil {
slog.Error("problem unsubscribing", "err", err)
}
}()
slog.Info("Server subscribed to", "queue", queueName)

for msg := range sub.C {
if msg == nil || len(msg.Body) == 0 {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
continue
}
messageChan <- msg // Send the message to the channel
}

return nil
}

func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {
req, err := http.NewRequest("GET", middleware.Url, nil)
if err != nil {
slog.Error("Error creating HTTP request", "url", middleware.Url, "err", err)
return
}

req.Header.Set("X-Islandora-Event", base64.StdEncoding.EncodeToString(msg.Body))
islandoraMessage, err := api.DecodeEventMessage(msg.Body)
if err != nil {
slog.Error("Unable to decode event message", "err", err)
return
}

if middleware.ForwardAuth {
auth := msg.Header.Get("Authorization")
if auth != "" {
req.Header.Set("Authorization", auth)
}
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
slog.Error("Error sending HTTP GET request", "url", middleware.Url, "err", err)
return
}
defer resp.Body.Close()

if resp.StatusCode >= 299 {
slog.Error("Failed to deliver message", "url", middleware.Url, "status", resp.StatusCode)
return
}

if middleware.NoPut {
return
}

putReq, err := http.NewRequest("PUT", islandoraMessage.Attachment.Content.DestinationURI, resp.Body)
if err != nil {
slog.Error("Error creating HTTP PUT request", "url", islandoraMessage.Attachment.Content.DestinationURI, "err", err)
return
}

putReq.Header.Set("Authorization", msg.Header.Get("Authorization"))
putReq.Header.Set("Content-Type", islandoraMessage.Attachment.Content.DestinationMimeType)
putReq.Header.Set("Content-Location", islandoraMessage.Attachment.Content.FileUploadURI)

// Send the PUT request
putResp, err := client.Do(putReq)
if err != nil {
slog.Error("Error sending HTTP PUT request", "url", islandoraMessage.Attachment.Content.DestinationURI, "err", err)
return
}
defer putResp.Body.Close()

if putResp.StatusCode >= 299 {
slog.Error("Failed to PUT data", "url", islandoraMessage.Attachment.Content.DestinationURI, "status", putResp.StatusCode)
runStompSubscribers(config)
} else {
slog.Info("Successfully PUT data to", "url", islandoraMessage.Attachment.Content.DestinationURI, "status", putResp.StatusCode)
}
}

func retryWithExponentialBackoff(attempt int, maxAttempts int) error {
if attempt >= maxAttempts {
return fmt.Errorf("maximum retry attempts reached")
server := &Server{Config: config}
runHTTPServer(server)
}
wait := time.Duration(rand.Intn(1<<attempt)) * time.Second
time.Sleep(wait)
return nil
}
27 changes: 15 additions & 12 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ type Test struct {
}

func TestMessageHandler_MethodNotAllowed(t *testing.T) {
testConfig := &scyllaridae.ServerConfig{}
server := &Server{Config: testConfig}

req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

rr := httptest.NewRecorder()
handler := http.HandlerFunc(MessageHandler)

handler := http.HandlerFunc(server.MessageHandler)
handler.ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusMethodNotAllowed {
t.Errorf("handler returned wrong status code: got %v want %v",
t.Errorf("Handler returned wrong status code: got %v want %v",
status, http.StatusMethodNotAllowed)
}
}
Expand Down Expand Up @@ -216,19 +217,20 @@ cmdByMimeType:
destinationServer := createMockDestinationServer(t, tt.returnedBody)
defer destinationServer.Close()

sourceServer := createMockSourceServer(t, tt.mimetype, tt.authHeader, destinationServer.URL)
defer sourceServer.Close()

os.Setenv("SCYLLARIDAE_YML", tt.yml)
// set the config based on tt.yml
config, err = scyllaridae.ReadConfig("")
config, err := scyllaridae.ReadConfig("")

sourceServer := createMockSourceServer(t, config, tt.mimetype, tt.authHeader, destinationServer.URL)
defer sourceServer.Close()
if err != nil {
t.Fatalf("Could not read YML: %v", err)
os.Exit(1)
}

// Create a Server instance with the test config
server := &Server{Config: config}

// Configure and start the main server
setupServer := httptest.NewServer(http.HandlerFunc(MessageHandler))
setupServer := httptest.NewServer(http.HandlerFunc(server.MessageHandler))
defer setupServer.Close()

// Send the mock message to the main server
Expand Down Expand Up @@ -260,6 +262,7 @@ cmdByMimeType:
}
})
}

}

func createMockDestinationServer(t *testing.T, content string) *httptest.Server {
Expand All @@ -270,7 +273,7 @@ func createMockDestinationServer(t *testing.T, content string) *httptest.Server
}))
}

func createMockSourceServer(t *testing.T, mimetype, auth, content string) *httptest.Server {
func createMockSourceServer(t *testing.T, config *scyllaridae.ServerConfig, mimetype, auth, content string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if config.ForwardAuth && r.Header.Get("Authorization") != auth {
w.WriteHeader(http.StatusUnauthorized)
Expand Down
Loading

0 comments on commit 92079f7

Please sign in to comment.