-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathbatch-concurrency-cleaner.sh
More file actions
executable file
·180 lines (155 loc) · 4.68 KB
/
batch-concurrency-cleaner.sh
File metadata and controls
executable file
·180 lines (155 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/bin/bash
#
# Batch Queue Concurrency Cleaner
#
# Detects and cleans up stale concurrency entries that block batch processing.
# This is a workaround for a bug where visibility timeout reclaims don't release concurrency.
#
# Uses a Lua script for ATOMIC detection of stale entries - no race conditions.
#
# Usage:
# ./batch-concurrency-cleaner.sh --read-redis <url> --write-redis <url> [--delay <seconds>] [--dry-run]
#
set -e
# Defaults
DELAY=10
DRY_RUN=false
READ_REDIS=""
WRITE_REDIS=""
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--read-redis)
READ_REDIS="$2"
shift 2
;;
--write-redis)
WRITE_REDIS="$2"
shift 2
;;
--delay)
DELAY="$2"
shift 2
;;
--dry-run)
DRY_RUN=true
shift
;;
-h|--help)
echo "Usage: $0 --read-redis <url> --write-redis <url> [--delay <seconds>] [--dry-run]"
exit 0
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done
if [[ -z "$READ_REDIS" ]] || [[ -z "$WRITE_REDIS" ]]; then
echo "Error: --read-redis and --write-redis are required"
exit 1
fi
echo "Batch Queue Concurrency Cleaner (Atomic Version)"
echo "================================================="
echo "Read Redis: ${READ_REDIS:0:30}..."
echo "Write Redis: ${WRITE_REDIS:0:30}..."
echo "Delay: ${DELAY}s"
echo "Dry run: $DRY_RUN"
echo ""
rcli_read() {
redis-cli -u "$READ_REDIS" --no-auth-warning "$@" 2>/dev/null
}
rcli_write() {
redis-cli -u "$WRITE_REDIS" --no-auth-warning "$@" 2>/dev/null
}
# Lua script that ATOMICALLY checks for stale concurrency entries
# KEYS[1] = concurrency key to check
# KEYS[2-13] = in-flight data hash keys for shards 0-11
# Returns: list of stale messageIds (not in any in-flight hash)
FIND_STALE_LUA='
local concurrency_key = KEYS[1]
local stale = {}
-- Get all members of the concurrency set
local members = redis.call("SMEMBERS", concurrency_key)
for _, msg_id in ipairs(members) do
local found = false
-- Check each in-flight shard (KEYS[2] through KEYS[13])
for i = 2, 13 do
if redis.call("HEXISTS", KEYS[i], msg_id) == 1 then
found = true
break
end
end
if not found then
table.insert(stale, msg_id)
end
end
return stale
'
# Build the in-flight keys array (used in every Lua call)
INFLIGHT_KEYS="engine:batch:inflight:0:data"
for shard in 1 2 3 4 5 6 7 8 9 10 11; do
INFLIGHT_KEYS="$INFLIGHT_KEYS engine:batch:inflight:$shard:data"
done
# Main loop
while true; do
ts=$(date '+%H:%M:%S')
# Get master queue total and in-flight count for status display
master_total=0
for i in 0 1 2 3 4 5 6 7 8 9 10 11; do
count=$(rcli_read ZCARD "engine:batch:master:$i")
master_total=$((master_total + count))
done
inflight_total=0
for i in 0 1 2 3 4 5 6 7 8 9 10 11; do
count=$(rcli_read HLEN "engine:batch:inflight:$i:data")
inflight_total=$((inflight_total + count))
done
# Scan for concurrency keys
cursor=0
total_stale=0
cleaned_tenants=0
while true; do
scan_output=$(rcli_read SCAN $cursor MATCH 'engine:batch:concurrency:tenant:*' COUNT 1000)
cursor=$(echo "$scan_output" | head -1)
keys=$(echo "$scan_output" | tail -n +2)
while IFS= read -r conc_key; do
[[ -z "$conc_key" ]] && continue
# ATOMIC check: Run Lua script to find stale entries
# 13 keys total: 1 concurrency key + 12 in-flight keys
stale_ids=$(rcli_read EVAL "$FIND_STALE_LUA" 13 "$conc_key" $INFLIGHT_KEYS)
# Count stale entries
stale_count=0
stale_array=()
while IFS= read -r stale_id; do
[[ -z "$stale_id" ]] && continue
stale_array+=("$stale_id")
stale_count=$((stale_count + 1))
done <<< "$stale_ids"
if [[ $stale_count -gt 0 ]]; then
tenant="${conc_key#engine:batch:concurrency:tenant:}"
total_stale=$((total_stale + stale_count))
if [[ "$DRY_RUN" == "true" ]]; then
echo "[$ts] STALE (dry-run): $tenant ($stale_count entries)"
for sid in "${stale_array[@]}"; do
echo " - $sid"
done
else
# Remove each stale entry individually with SREM (idempotent, safe)
for sid in "${stale_array[@]}"; do
rcli_write SREM "$conc_key" "$sid" >/dev/null
done
echo "[$ts] CLEANED: $tenant ($stale_count stale entries removed)"
cleaned_tenants=$((cleaned_tenants + 1))
fi
fi
done <<< "$keys"
[[ "$cursor" == "0" ]] && break
done
if [[ "$DRY_RUN" == "true" ]]; then
echo "[$ts] in-flight=$inflight_total master-queue=$master_total stale-found=$total_stale"
else
echo "[$ts] in-flight=$inflight_total master-queue=$master_total cleaned=$cleaned_tenants"
fi
sleep "$DELAY"
done