Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix minute replication job script #348

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion images/replication-job/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ RUN apt-get update && \
nginx \
python3-pip \
python3-venv \
procps && \
procps \
curl && \
rm -rf /var/lib/apt/lists/*

RUN python3 -m venv /opt/venv
Expand Down
89 changes: 60 additions & 29 deletions images/replication-job/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ else
echo JAVACMD_OPTIONS=\"-server -Xmx$memory\" >~/.osmosis
fi

slack_message_count=0
max_slack_messages=2

workingDirectory="/mnt/data"
mkdir -p $workingDirectory

Expand Down Expand Up @@ -49,6 +52,10 @@ function get_current_state_file() {
--file $workingDirectory/state.txt --query="name"
fi
fi
else
echo "File $workingDirectory/state.txt exist in local storage"
echo "File $workingDirectory/state.txt content:"
cat $workingDirectory/state.txt
fi
}

Expand All @@ -70,30 +77,71 @@ function upload_file_cloud() {
fi
}

#######################################
# Post a best-effort notification to Slack through an incoming webhook.
# Globals:
#   ENABLE_SEND_SLACK_MESSAGE (read)  - must be exactly "true" to send
#   SLACK_WEBHOOK_URL         (read)  - webhook endpoint; empty or the
#                                       literal "null" counts as not set
#   max_slack_messages        (read)  - max messages per run (default 2)
#   slack_message_count       (write) - incremented after each delivery
# Arguments:
#   $1 - message text (plain text; it is JSON-escaped here)
# Returns:
#   1 when messaging is enabled but no webhook URL is configured,
#   0 otherwise (disabled, limit reached, delivered, or delivery failed).
#######################################
function send_slack_message() {
    # Check if Slack messaging is enabled
    if [ "${ENABLE_SEND_SLACK_MESSAGE:-}" != "true" ]; then
        echo "Slack messaging is disabled. Set ENABLE_SEND_SLACK_MESSAGE to true to enable."
        return
    fi

    # Check if the Slack webhook URL is set. "null" is treated as unset so
    # that a placeholder value is never handed to curl as a real URL.
    if [ -z "${SLACK_WEBHOOK_URL:-}" ] || [ "${SLACK_WEBHOOK_URL}" = "null" ]; then
        echo "SLACK_WEBHOOK_URL is not set. Unable to send message to Slack."
        return 1
    fi

    # Limit the number of Slack messages to max_slack_messages
    local sent_so_far="${slack_message_count:-0}"
    local limit="${max_slack_messages:-2}"
    if [ "$sent_so_far" -ge "$limit" ]; then
        echo "Max Slack messages limit reached. No further messages will be sent."
        return
    fi

    local message="${1-}"

    # Escape the characters that would otherwise break the JSON string
    # literal. Backslashes go first so later escapes are not doubled.
    local escaped="$message"
    escaped=${escaped//\\/\\\\}
    escaped=${escaped//\"/\\\"}
    escaped=${escaped//$'\n'/\\n}
    escaped=${escaped//$'\r'/\\r}
    escaped=${escaped//$'\t'/\\t}

    # --fail turns HTTP 4xx/5xx into a non-zero exit status; --max-time keeps
    # an unresponsive webhook from stalling the caller indefinitely.
    if curl --fail --silent --show-error --max-time 10 \
        -X POST -H 'Content-type: application/json' \
        --data "{\"text\": \"$escaped\"}" "$SLACK_WEBHOOK_URL"; then
        echo "Message sent to Slack: $message"
        # Only delivered messages count against the limit.
        slack_message_count=$((sent_so_far + 1))
    else
        # Notification is best-effort: report the failure but do not return
        # an error status, so callers are not aborted by a Slack outage.
        echo "Failed to send message to Slack: $message" >&2
    fi
}


function monitor_minute_replication() {
# Function to handle continuous monitoring, minutminutes replication and upload to cloud provider
# Directory to store a log of processed files
# Function to handle continuous monitoring, minute replication, and sequential upload to cloud provider
# Directory to store a log of the last processed file
processed_files_log="$workingDirectory/processed_files.log"
max_log_size_mb=1

while true; do
upload_file_cloud /mnt/data/state.txt
sleep 60s
done &

while true; do
if [ -e "$processed_files_log" ]; then
log_size=$(du -m "$processed_files_log" | cut -f1)
if [ "$log_size" -gt "$max_log_size_mb" ]; then
echo $(date +%F_%H:%M:%S)": Cleaning processed_files_log..." >"$processed_files_log"
fi
for local_minute_file in $(find $workingDirectory/ -cmin -1); do
# Find new .gz files created within the last minute
for local_minute_file in $(find $workingDirectory/ -name "*.gz" -cmin -1); do
if [ -f "$local_minute_file" ]; then
if grep -q "$local_minute_file" "$processed_files_log"; then
continue
echo "Processing $local_minute_file..."
# Ensure the file is uploaded only once
if ! grep -q "$local_minute_file: SUCCESS" "$processed_files_log" && ! grep -q "$local_minute_file: FAILURE" "$processed_files_log"; then
# Verify gz file integrity
if gzip -t "$local_minute_file" 2>/dev/null; then
# Upload the file sequentially
upload_file_cloud $local_minute_file
echo "$local_minute_file: SUCCESS" >>"$processed_files_log"
# Upload and update state.txt after successful upload
upload_file_cloud "$workingDirectory/state.txt"
else
echo $(date +%F_%H:%M:%S)": $local_minute_file is corrupted and will not be uploaded." >>"$processed_files_log"
echo "$local_minute_file: FAILURE" >>"$processed_files_log"
# Ensure state.txt maintains the current ID to regenerate the corrupted file
current_state_id=$(( $(echo "$local_minute_file" | sed 's/[^0-9]//g' | sed 's/^0*//') - 1 ))
sed -i "s/sequenceNumber=.*/sequenceNumber=$current_state_id/" "$workingDirectory/state.txt"
rm "$local_minute_file"
echo "Stopping any existing Osmosis processes..."
pkill -f "osmosis.*--replicate-apidb"
echo "Regenerating $local_minute_file..."
send_slack_message "${ENVIROMENT}: Corrupted file $local_minute_file detected. Regenerating the file..."
generate_replication
fi
fi
upload_file_cloud $local_minute_file
echo "$local_minute_file" >>"$processed_files_log"
fi
done
else
Expand All @@ -120,23 +168,6 @@ function generate_replication() {
workingDirectory=$workingDirectory
}

# function start_nginx() {
# if [ "$STAR_NGINX_SERVER" = "true" ]; then
# echo 'server {
# listen 8080;
# server_name localhost;

# location / {
# root /mnt/data;
# index index.html;
# }
# }' >/etc/nginx/nginx.conf
# service nginx restart
# else
# echo "STAR_NGINX_SERVER is either not set or not set to true."
# fi
# }

######################## Start minutes replication process ########################
get_current_state_file
flag=true
Expand Down
10 changes: 8 additions & 2 deletions osm-seed/templates/jobs/replication-job-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ spec:
- /bin/bash
- -c
- /liveness.sh
initialDelaySeconds: 10
initialDelaySeconds: 60
timeoutSeconds: 5
periodSeconds: 10
periodSeconds: 30
failureThreshold: 3
{{- if .Values.replicationJob.resources.enabled }}
resources:
Expand Down Expand Up @@ -78,6 +78,12 @@ spec:
- name: MEMORY_JAVACMD_OPTIONS
value: {{ .Values.replicationJob.resources.requests.memory | default "2Gi" | quote}}
{{- end }}
- name: ENABLE_SEND_SLACK_MESSAGE
value: {{ .Values.replicationJob.env.ENABLE_SEND_SLACK_MESSAGE | quote}}
- name: SLACK_WEBHOOK_URL
value: {{ .Values.replicationJob.env.SLACK_WEBHOOK_URL | quote}}
- name: ENVIROMENT
value: {{ .Values.environment | quote}}
{{- if .Values.replicationJob.nodeSelector.enabled }}
nodeSelector:
{{ .Values.replicationJob.nodeSelector.label_key }} : {{ .Values.replicationJob.nodeSelector.label_value }}
Expand Down
3 changes: 3 additions & 0 deletions osm-seed/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ replicationJob:
image:
name: ""
tag: ""
env:
ENABLE_SEND_SLACK_MESSAGE: "false"
SLACK_WEBHOOK_URL: "null"
resources:
enabled: false
requests:
Expand Down
Loading