Skip to content

Commit

Permalink
[minor] Add a cache warmer script (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall authored May 7, 2024
1 parent b55cc37 commit 49a14d9
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 56 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
scyllaridae
scyllaridae.yml
!examples/*/scyllaridae.yml
9 changes: 9 additions & 0 deletions examples/cache-warmer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Build on top of the scyllaridae base image; the repository and tag can be
# overridden at build time.
ARG TAG=main
ARG DOCKER_REPOSITORY=local
FROM ${DOCKER_REPOSITORY}/scyllaridae:${TAG}

# cmd.sh uses jq to read the paged-content JSON listing.
# `apk add --no-cache` fetches a fresh package index on its own, so a separate
# `apk update` (which would leave the index cached in the layer) is not needed.
RUN apk add --no-cache jq==1.7.1-r0

COPY cmd.sh /app/
COPY scyllaridae.yml /app/scyllaridae.yml
117 changes: 117 additions & 0 deletions examples/cache-warmer/cmd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env bash

set -eou pipefail

export LOCK_FILE="/tmp/scyllaridae-cache.lock"

# ERR trap handler: drop the lock file, then terminate with a failing status.
# NOTE(review): the trap is installed before the single-node branch, so it is
# also active for runs that never create the lock file.
handle_error() {
  rm -f "$LOCK_FILE"
  exit 1
}
trap handle_error ERR

# curl wrapper function so on 302 we can forward the cache-warmer parameter.
#
# Requests the given URL and then follows the redirect chain by hand,
# re-attaching cache-warmer=1 to every redirect target.
#
# Arguments:
#   $1 - URL to request (callers append the cache-warmer parameter themselves)
# Outputs:
#   Prints "Crawling: <url>" to stdout; response bodies are discarded.
process_url() {
  local URL="$1"
  local COUNT=0
  local REDIRECT_URL SEPARATOR
  echo "Crawling: $URL"
  REDIRECT_URL=$(curl -w "%{redirect_url}" --silent -o /dev/null "$URL")
  while [ "$REDIRECT_URL" != "" ]; do
    # A redirect target may already carry a query string; adding a second "?"
    # would corrupt it, so choose the separator accordingly.
    case "$REDIRECT_URL" in
      *\?*) SEPARATOR="&" ;;
      *) SEPARATOR="?" ;;
    esac
    REDIRECT_URL=$(curl -w "%{redirect_url}" --silent -o /dev/null "${REDIRECT_URL}${SEPARATOR}cache-warmer=1")
    COUNT=$((COUNT + 1))
    # Stop following after a handful of hops so a redirect loop cannot spin forever.
    if [ "$COUNT" -gt 5 ]; then
      break
    fi
  done
}

# Single-node mode: exactly one argument that is not the literal "all".
# Warm that URL plus the browse and collections listings, then stop before
# any of the full-site logic runs.
if [[ $# -eq 1 ]] && [[ $1 != "all" ]]; then
  process_url "${1}?cache-warmer=1"
  process_url "${DRUPAL_URL}/browse?cache-warmer=1"
  process_url "${DRUPAL_URL}/collections?cache-warmer=1"

  exit 0
fi

# otherwise we're warming the entire site's cache

# TODO: we need a lock mechanism in scyllaridae that can kill running processes
# but for now we can just gate it here.
#
# The lock file is created with noclobber so that "does it exist?" and
# "create it" happen in one atomic step; a separate test followed by touch
# lets two runs that start together both pass the test.
if ! (set -o noclobber; : > "$LOCK_FILE") 2> /dev/null; then
  if [ -e "$LOCK_FILE" ]; then
    echo "Cache warming is already taking place"
    exit 0
  fi
  # The file is absent yet could not be created: a real failure, not a held lock.
  echo "Unable to create lock file $LOCK_FILE" >&2
  exit 1
fi

# how many cURL commands to run in parallel
PARALLEL_EXECUTIONS=3

# Warm everything in the sitemap, one sitemap page at a time.
BASE_URL="$DRUPAL_URL/sitemap.xml"
PAGE=1

while true; do
  NEXT_PAGE_URL="$BASE_URL?page=$PAGE"
  STATUS=$(curl -w '%{http_code}' \
    --silent \
    -o links.xml \
    "${NEXT_PAGE_URL}")

  # Any response other than 200 is treated as the end of the sitemap.
  if ! [ "${STATUS}" -eq 200 ]; then
    break
  fi

  # Pull every <loc> entry out of the downloaded page.
  mapfile -t URLS < <(grep -oP '<loc>\K[^<]+' links.xml)
  while [ "${#URLS[@]}" -gt 0 ]; do
    # Start every batch with an empty PID list; otherwise PIDs from earlier
    # batches are waited on again after they have already been reaped.
    job_ids=()
    for ((i = 0; i < PARALLEL_EXECUTIONS; i++)); do
      array_length=${#URLS[@]}
      if [ "$array_length" -eq 0 ]; then
        break
      fi
      # Pop the last URL off the list and crawl it in the background.
      URL="${URLS[$((array_length-1))]}"
      unset "URLS[$((array_length-1))]"
      process_url "$URL?cache-warmer=1" &
      job_ids+=($!)
    done

    # Let the whole batch finish before launching the next one.
    for job_id in "${job_ids[@]}"; do
      wait "$job_id" || echo "One job failed, but continuing anyway"
    done
  done

  PAGE=$((PAGE + 1))
  # Hard upper bound on the number of sitemap pages requested.
  if [ "$PAGE" -gt 100 ]; then
    break
  fi
done

rm -f links.xml

# now that the sitemap is warm, get all the IIIF paged content manifests warm
curl -s "$DRUPAL_URL/api/v1/paged-content" > pc.json
mapfile -t NIDS < <(jq -r '.[]' pc.json)
# Loop until the list is drained. The inner loop pops up to
# PARALLEL_EXECUTIONS IDs per pass, so iterating once per original element
# would leave many empty passes that only re-wait on finished jobs.
while [ "${#NIDS[@]}" -gt 0 ]; do
  # Fresh PID list per batch, so jobs from earlier batches (or from the
  # sitemap phase) are not waited on a second time.
  job_ids=()
  for ((i = 0; i < PARALLEL_EXECUTIONS; i++)); do
    array_length=${#NIDS[@]}
    if [ "$array_length" -eq 0 ]; then
      break
    fi
    # Pop the last node ID off the list and request its manifest in the background.
    NID="${NIDS[$((array_length-1))]}"
    unset "NIDS[$((array_length-1))]"
    echo "Crawling: $DRUPAL_URL/node/$NID/book-manifest"
    curl -s -o /dev/null "$DRUPAL_URL/node/$NID/book-manifest?cache-warmer=1" &
    job_ids+=($!)
  done

  # Let the whole batch finish before launching the next one.
  for job_id in "${job_ids[@]}"; do
    wait "$job_id" || echo "One job failed, but continuing anyway"
  done
done

rm -f pc.json

# Full-site warm finished: release the lock so the next run can start.
rm "$LOCK_FILE"
8 changes: 8 additions & 0 deletions examples/cache-warmer/scyllaridae.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
queueName: islandora-cache-warmer
allowedMimeTypes:
- "*"
cmdByMimeType:
default:
cmd: "/app/cmd.sh"
args:
- "%target"
181 changes: 129 additions & 52 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ package main
import (
"bufio"
"bytes"
"fmt"
"log/slog"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"syscall"
"time"

stomp "github.com/go-stomp/stomp/v3"
scyllaridae "github.com/lehigh-university-libraries/scyllaridae/internal/config"
Expand All @@ -14,7 +21,6 @@ import (

var (
config *scyllaridae.ServerConfig
stop = make(chan bool)
)

func init() {
Expand All @@ -31,8 +37,21 @@ func main() {
// either subscribe to activemq directly
if config.QueueName != "" {
subscribed := make(chan bool)
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)

go RecvStompMessages(config.QueueName, subscribed)
<-subscribed

select {
case <-subscribed:
slog.Info("Subscription to queue successful")
case <-stopChan:
slog.Info("Received stop signal, exiting")
return
}

<-stopChan
slog.Info("Shutting down message listener")
} else {
// or make this an available API ala crayfish
http.HandleFunc("/", MessageHandler)
Expand Down Expand Up @@ -121,82 +140,140 @@ func MessageHandler(w http.ResponseWriter, r *http.Request) {
}

func RecvStompMessages(queueName string, subscribed chan bool) {
defer func() {
stop <- true
}()

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
// stomp.ConnOpt.Login("guest", "guest"),
stomp.ConnOpt.Host("/"),
defer close(subscribed)
attempt := 0
maxAttempts := 30
for attempt = 0; attempt < maxAttempts; attempt += 1 {
if err := connectAndSubscribe(queueName, subscribed); err != nil {
slog.Error("resubscribing", "error", err)
if err := retryWithExponentialBackoff(attempt, maxAttempts); err != nil {
slog.Error("Failed subscribing after too many failed attempts", "attempts", attempt)
return
}
}
}
}

func connectAndSubscribe(queueName string, subscribed chan bool) error {
addr := os.Getenv("STOMP_SERVER_ADDR")
if addr == "" {
addr = "activemq:61613"
}
conn, err := stomp.Dial("tcp", addr, options...)

c, err := net.Dial("tcp", addr)
if err != nil {
slog.Error("cannot connect to server", "err", err.Error())
return
slog.Error("cannot connect to port", "err", err.Error())
return err
}
tcpConn := c.(*net.TCPConn)

sub, err := conn.Subscribe(queueName, stomp.AckAuto)
err = tcpConn.SetKeepAlive(true)
if err != nil {
slog.Error("cannot subscribe to", queueName, err.Error())
return
slog.Error("cannot set keepalive", "err", err.Error())
return err
}

err = tcpConn.SetKeepAlivePeriod(10 * time.Second)
if err != nil {
slog.Error("cannot set keepalive period", "err", err.Error())
return err
}
close(subscribed)
slog.Info("Server subscriber to", "queue", queueName)

for i := 1; i <= 10; i++ {
msg := <-sub.C
message, err := api.DecodeEventMessage(msg.Body)
conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 0*time.Second))
if err != nil {
slog.Error("cannot connect to stomp server", "err", err.Error())
return err
}
defer func() {
err := conn.Disconnect()
if err != nil {
slog.Error("could not read the event message", "err", err, "msg", string(msg.Body))
continue
}
cmdArgs := map[string]string{
"sourceMimeType": message.Attachment.Content.SourceMimeType,
"destinationMimeType": message.Attachment.Content.DestinationMimeType,
"addtlArgs": message.Attachment.Content.Args,
"target": message.Target,
slog.Error("problem disconnecting from stomp server", "err", err)
}
}()

cmd, err := scyllaridae.BuildExecCommand(cmdArgs, config)
sub, err := conn.Subscribe(queueName, stomp.AckAuto)
if err != nil {
slog.Error("cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}
defer func() {
if !sub.Active() {
return
}
err := sub.Unsubscribe()
if err != nil {
slog.Error("Error building command", "err", err)
continue
slog.Error("problem unsubscribing", "err", err)
}
}()
slog.Info("Server subscribed to", "queue", queueName)
subscribed <- true

// log stdout for the command as it prints
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("error creating stdout pipe", "err", err)
for msg := range sub.C {
if msg == nil || len(msg.Body) == 0 {
// if the subscription isn't active return so we can try reconnecting
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
// else just try reading again. There's probably just no messages in the queue
continue
}
handleStompMessage(msg)
}

// Create a buffer to stream the error output of the command
var stdErr bytes.Buffer
cmd.Stderr = &stdErr
return nil
}

slog.Info("Running command", "cmd", cmd.String())
if err := cmd.Start(); err != nil {
slog.Error("Error starting command", "cmd", cmd.String(), "err", stdErr.String())
continue
}
func handleStompMessage(msg *stomp.Message) {
message, err := api.DecodeEventMessage(msg.Body)
if err != nil {
slog.Error("could not read the event message", "err", err, "msg", string(msg.Body))
return
}

go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
slog.Info("Cmd output", "stdout", scanner.Text())
}
}()
cmdArgs := map[string]string{
"sourceMimeType": message.Attachment.Content.SourceMimeType,
"destinationMimeType": message.Attachment.Content.DestinationMimeType,
"addtlArgs": message.Attachment.Content.Args,
"target": message.Target,
}
cmd, err := scyllaridae.BuildExecCommand(cmdArgs, config)
if err != nil {
slog.Error("Error building command", "err", err)
return
}

runCommand(cmd)
}

if err := cmd.Wait(); err != nil {
slog.Error("command finished with error", "err", stdErr.String())
// runCommand starts cmd, logs every line the command writes to stdout as it
// is produced, and then waits for the command to exit. Failures are logged
// rather than returned. The command's stderr is buffered and included in the
// log entry when the command exits with an error.
func runCommand(cmd *exec.Cmd) {
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		slog.Error("error creating stdout pipe", "err", err)
		return
	}

	var stdErr bytes.Buffer
	cmd.Stderr = &stdErr
	if err := cmd.Start(); err != nil {
		// The command never ran, so stdErr is empty here; the start error
		// itself is what explains the failure.
		slog.Error("Error starting command", "cmd", cmd.String(), "err", err)
		return
	}

	// Wait closes the stdout pipe once the command exits, so every read from
	// the pipe has to finish before Wait is called. Reading here, on the
	// calling goroutine, guarantees that ordering and that no output is lost.
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		slog.Info("cmd output", "stdout", scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		slog.Error("error reading command output", "err", err)
	}

	if err := cmd.Wait(); err != nil {
		slog.Error("command finished with error", "err", stdErr.String())
	}
}

slog.Info("Great success!")
// retryWithExponentialBackoff pauses before the next retry. It sleeps for a
// random whole number of seconds in [0, 2^attempt), with the exponent capped
// so the wait stays bounded, and then returns nil. When attempt has reached
// maxAttempts it returns an error immediately without sleeping.
func retryWithExponentialBackoff(attempt int, maxAttempts int) error {
	if attempt >= maxAttempts {
		return fmt.Errorf("maximum retry attempts reached")
	}

	// Cap the exponent: without a bound the window doubles on every attempt
	// (2^20 seconds is already about twelve days), and a large enough shift
	// overflows into a non-positive argument that makes rand.Intn panic.
	const maxExponent = 8 // at most 256 seconds
	exponent := attempt
	if exponent < 0 {
		// A negative shift count panics; treat it like the first attempt.
		exponent = 0
	}
	if exponent > maxExponent {
		exponent = maxExponent
	}

	wait := time.Duration(rand.Intn(1<<exponent)) * time.Second
	time.Sleep(wait)
	return nil
}
5 changes: 1 addition & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ cmdByMimeType:

// make sure the command ran
f := "/tmp/stomp.success"
_, err = os.Stat(f)
if err != nil && os.IsNotExist(err) {
t.Errorf("The stomp subscriber not create the expected file %s", f)
}
assert.FileExists(t, f)
}

type Test struct {
Expand Down

0 comments on commit 49a14d9

Please sign in to comment.