Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test-stack/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ services:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: admin
GF_USERS_ALLOW_SIGN_UP: "false"
# Enable anonymous access for annotation API (test-stack only)
GF_AUTH_ANONYMOUS_ENABLED: "true"
GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
volumes:
- ./grafana/provisioning:/etc/grafana/provisioning:ro
- grafana-data:/var/lib/grafana
Expand Down
43 changes: 40 additions & 3 deletions test-stack/grafana/provisioning/dashboards/kafka-lag.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,43 @@
{
"annotations": {
"list": []
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": false,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Producer Phases",
"target": {
"limit": 100,
"matchAny": true,
"tags": ["producer"],
"type": "tags"
},
"type": "dashboard"
},
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": false,
"iconColor": "rgba(255, 96, 96, 1)",
"name": "Consumer Phases",
"target": {
"limit": 100,
"matchAny": true,
"tags": ["consumer"],
"type": "tags"
},
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
Expand Down Expand Up @@ -1300,8 +1337,8 @@
},
"timepicker": {},
"timezone": "",
"title": "Kafka Consumer Lag",
"uid": "kafka-lag-dashboard",
"title": "Kafka Consumer Lag (with Phase Annotations)",
"uid": "kafka-lag-dashboard-test",
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the dashboard UID from "kafka-lag-dashboard" to "kafka-lag-dashboard-test" may break existing references, bookmarks, or links to this dashboard. If this is intentional to create a separate test dashboard while keeping the original, consider documenting this change. If this is meant to replace the existing dashboard, the UID should remain unchanged to preserve continuity.

Suggested change
"uid": "kafka-lag-dashboard-test",
"uid": "kafka-lag-dashboard",

Copilot uses AI. Check for mistakes.
"version": 1,
"weekStart": ""
}
18 changes: 18 additions & 0 deletions test-stack/scripts/consumer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@ BOOTSTRAP_SERVER="kafka:29092"
TOPIC1="test-topic"
TOPIC2="high-volume-topic"
GROUP_ID="test-consumer-group"
GRAFANA_URL="http://grafana:3000"

# Message counter
MSG_COUNT=0

# Function to create Grafana annotation for phase visibility
annotate() {
local text="$1"
local tags="$2"
curl -s -X POST "$GRAFANA_URL/api/annotations" \
-H "Content-Type: application/json" \
-d "{\"text\": \"$text\", \"tags\": [$tags]}" \
Comment on lines +15 to +21
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The annotate function is vulnerable to JSON injection. The text parameter is directly interpolated into the JSON payload without proper escaping. If the text contains special characters like double quotes, backslashes, or newlines, it will break the JSON structure or could be exploited to inject arbitrary JSON.

Consider using a tool like jq to properly construct the JSON payload, or at minimum, escape special characters in the text parameter before interpolation.

Suggested change
# Function to create Grafana annotation for phase visibility
annotate() {
local text="$1"
local tags="$2"
curl -s -X POST "$GRAFANA_URL/api/annotations" \
-H "Content-Type: application/json" \
-d "{\"text\": \"$text\", \"tags\": [$tags]}" \
# Helper to escape strings for JSON context
json_escape() {
local str="$1"
# Escape backslashes and double quotes
str=${str//\\/\\\\}
str=${str//\"/\\\"}
# Escape newlines and carriage returns
str=${str//$'\n'/\\n}
str=${str//$'\r'/\\r}
printf '%s' "$str"
}
# Function to create Grafana annotation for phase visibility
annotate() {
local text="$1"
local tags="$2"
local escaped_text
escaped_text=$(json_escape "$text")
curl -s -X POST "$GRAFANA_URL/api/annotations" \
-H "Content-Type: application/json" \
-d "{\"text\": \"$escaped_text\", \"tags\": [$tags]}" \

Copilot uses AI. Check for mistakes.
2>/dev/null || true # Don't fail if Grafana isn't ready
}

echo "Starting consumer with consumer group: $GROUP_ID"
echo "This consumer runs slower than producer to create observable lag"

Expand Down Expand Up @@ -41,30 +52,37 @@ consume_with_rate() {

while true; do
# Phase 1: Slow consumption (creates lag buildup)
annotate "Consumer Phase 1: Slow consumption (lag building)" "\"consumer\", \"phase1\", \"slow\""
echo "[$(date)] Phase 1: Slow consumption - 5 msg with 0.5s delay each"
consume_with_rate $TOPIC1 $GROUP_ID 5 0.5

# Phase 2: Medium consumption
annotate "Consumer Phase 2: Medium consumption" "\"consumer\", \"phase2\""
echo "[$(date)] Phase 2: Medium consumption - 20 msg with 0.2s delay each"
consume_with_rate $TOPIC1 $GROUP_ID 20 0.2

# Phase 3: Fast catch-up
annotate "Consumer Phase 3: Fast catch-up" "\"consumer\", \"phase3\", \"catchup\""
echo "[$(date)] Phase 3: Fast catch-up - 50 msg with 0.05s delay each"
consume_with_rate $TOPIC1 $GROUP_ID 50 0.05

# Phase 4: Consume from high-volume topic (different consumer group)
annotate "Consumer Phase 4: High-volume topic consumption" "\"consumer\", \"phase4\""
echo "[$(date)] Phase 4: High-volume topic consumption"
consume_with_rate $TOPIC2 "high-volume-consumer" 30 0.1

# Phase 5: Pause (lag builds up)
annotate "Consumer Phase 5: Pause for 30s (lag building)" "\"consumer\", \"phase5\", \"pause\""
echo "[$(date)] Phase 5: Consumer pause for 30s (lag building)"
sleep 30

# Phase 6: Burst consumption
annotate "Consumer Phase 6: Burst consumption" "\"consumer\", \"phase6\", \"burst\""
echo "[$(date)] Phase 6: Burst consumption - 100 msg with minimal delay"
consume_with_rate $TOPIC1 $GROUP_ID 100 0.01

# Phase 7: Both topics
annotate "Consumer Phase 7: Both topics consumption" "\"consumer\", \"phase7\""
echo "[$(date)] Phase 7: Both topics consumption"
consume_with_rate $TOPIC1 $GROUP_ID 20 0.1 &
consume_with_rate $TOPIC2 "high-volume-consumer" 40 0.05 &
Expand Down
17 changes: 17 additions & 0 deletions test-stack/scripts/producer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,21 @@
BOOTSTRAP_SERVER="kafka:29092"
TOPIC1="test-topic"
TOPIC2="high-volume-topic"
GRAFANA_URL="http://grafana:3000"

# Message counter
MSG_COUNT=0

# Function to create Grafana annotation for phase visibility
annotate() {
local text="$1"
local tags="$2"
curl -s -X POST "$GRAFANA_URL/api/annotations" \
-H "Content-Type: application/json" \
-d "{\"text\": \"$text\", \"tags\": [$tags]}" \
2>/dev/null || true # Don't fail if Grafana isn't ready
}
Comment on lines +15 to +22
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The annotate function is vulnerable to JSON injection. The text parameter is directly interpolated into the JSON payload without proper escaping. If the text contains special characters like double quotes, backslashes, or newlines, it will break the JSON structure or could be exploited to inject arbitrary JSON.

Consider using a tool like jq to properly construct the JSON payload, or at minimum, escape special characters in the text parameter before interpolation.

Copilot uses AI. Check for mistakes.

# Function to produce messages
produce_messages() {
local topic=$1
Expand All @@ -34,30 +45,36 @@ echo "This will create lag patterns observable in Grafana"

while true; do
# Phase 1: Steady low rate (10 msg/sec for 30 seconds)
annotate "Producer Phase 1: Low rate 10 msg/sec for 30s" "\"producer\", \"phase1\""
echo "[$(date)] Phase 1: Low rate - 10 msg/sec for 30s"
for ((j=1; j<=30; j++)); do
produce_messages $TOPIC1 10 0.1
done

# Phase 2: Burst - high volume (100 messages quickly)
annotate "Producer Phase 2: Burst 100 messages to test-topic" "\"producer\", \"phase2\", \"burst\""
echo "[$(date)] Phase 2: Burst - 100 messages to $TOPIC1"
produce_messages $TOPIC1 100 0.01

# Phase 3: High volume topic burst
annotate "Producer Phase 3: High volume burst 200 messages" "\"producer\", \"phase3\", \"burst\""
echo "[$(date)] Phase 3: High volume burst - 200 messages to $TOPIC2"
produce_messages $TOPIC2 200 0.005

# Phase 4: Steady medium rate (20 msg/sec for 30 seconds)
annotate "Producer Phase 4: Medium rate 20 msg/sec for 30s" "\"producer\", \"phase4\""
echo "[$(date)] Phase 4: Medium rate - 20 msg/sec for 30s"
for ((j=1; j<=30; j++)); do
produce_messages $TOPIC1 20 0.05
done

# Phase 5: Pause (let consumer catch up)
annotate "Producer Phase 5: Pause for 20s (consumer catch-up)" "\"producer\", \"phase5\", \"pause\""
echo "[$(date)] Phase 5: Pause for 20s (consumer catch-up)"
sleep 20

# Phase 6: Multi-topic burst
annotate "Producer Phase 6: Multi-topic burst" "\"producer\", \"phase6\", \"burst\""
echo "[$(date)] Phase 6: Multi-topic burst"
produce_messages $TOPIC1 50 0.02 &
produce_messages $TOPIC2 100 0.01 &
Expand Down