Skip to content

Commit 7d7cce3

Browse files
committed
additional zk action handling
1 parent b8238d6 commit 7d7cce3

12 files changed

+67
-99
lines changed

src-colladmin/actions/access_queue_action.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55

66
# Collection Admin Task class - see config/actions.yml for description
77
class AccessQueueAction < ZookeeperListAction
8-
def initialize(config, action, path, myparams)
9-
super(config, action, path, myparams)
10-
end
11-
128
def perform_action
139
jobs = []
1410
if ZookeeperListAction.migration_m3?

src-colladmin/actions/forward_to_ingest_action.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ class ForwardToIngestAction < AdminAction
99
def initialize(config, action, path, myparams, endpoint)
1010
super(config, action, path, myparams)
1111
@endpoint = endpoint
12+
13+
@zk = ZK.new(get_zookeeper_conn)
14+
ZookeeperListAction.migration_level(@zk)
15+
end
16+
17+
def get_zookeeper_conn
18+
@config.fetch('zookeeper', '')
1219
end
1320

1421
def get_body

src-colladmin/actions/ingest_batch_action.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class IngestBatchAction < ZookeeperListAction
77
def initialize(config, action, path, myparams)
88
@batch = myparams.fetch('batch', 'no-batch-provided')
99
@batch_obj = Batch.new(@batch)
10-
super(config, action, path, myparams, {batch: @batch})
10+
super(config, action, path, myparams, { batch: @batch })
1111
end
1212

1313
def get_title
@@ -22,7 +22,7 @@ def table_types
2222
@batch_obj.table_types
2323
end
2424

25-
def table_rows(body)
25+
def table_rows(_body)
2626
queue_list = QueueList.new(@zk, { batch: @batch })
2727
queue_list.jobs.each do |qe|
2828
@batch_obj.add_queue_job(qe)

src-colladmin/actions/ingest_batch_folders_action.rb

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@ def initialize(config, action, path, myparams)
88
@days = myparams.fetch('days', '7').to_i
99
@days = 60 if @days > 60
1010
super(config, action, path, myparams, "admin/bids/#{@days}")
11-
@zk = ZK.new(get_zookeeper_conn)
12-
ZookeeperListAction.migration_level(@zk)
13-
end
14-
15-
def get_zookeeper_conn
16-
@config.fetch('zookeeper', '')
1711
end
1812

1913
def get_title

src-colladmin/actions/ingest_collection_locks_action.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ def initialize(config, action, path, myparams)
1515
@collections = Collections.new(config)
1616

1717
super(config, action, path, myparams, endpoint)
18-
@zk = ZK.new(get_zookeeper_conn)
19-
ZookeeperListAction.migration_level(@zk)
2018
@held_counts = {}
2119
ql = QueueList.new(@zk)
2220
ql.jobs.each do |qe|

src-colladmin/actions/inventory_queue_action.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55

66
# Collection Admin Task class - see config/actions.yml for description
77
class InventoryQueueAction < ZookeeperListAction
8-
def initialize(config, action, path, myparams)
9-
super(config, action, path, myparams)
10-
end
11-
128
def perform_action
139
jobs = ZookeeperListAction.migration_m1? ? [] : MerrittZK::LegacyInventoryJob.list_jobs(@zk)
1410
jobs.each do |po|

src-colladmin/actions/queue_action.rb

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@
44
require_relative 'forward_to_ingest_action'
55
require_relative '../lib/http_post_json'
66

7-
# Collection Admin Task class - see config/actions.yml for description
8-
class QueueAction < PostToIngestAction
9-
def initialize(config, action, path, myparams, endpoint)
10-
qp = CGI.unescape(myparams.fetch('queue-path', 'na'))
11-
super(config, action, path, myparams, "#{endpoint}#{qp}")
12-
end
13-
end
14-
157
# Collection Admin Task class - see config/actions.yml for description
168
class CollQueueAction < PostToIngestAction
179
def initialize(config, action, path, myparams, endpoint)
@@ -22,58 +14,74 @@ def initialize(config, action, path, myparams, endpoint)
2214
end
2315

2416
# Collection Admin Task class - see config/actions.yml for description
25-
class CollIterateQueueAction < PostToIngestAction
26-
def initialize(config, action, path, myparams, endpoint)
27-
coll = myparams.fetch('coll', '')
28-
@it_endpoint = endpoint.gsub(/coll$/, coll) unless coll.empty?
29-
super(config, action, path, myparams, 'admin/queues')
17+
class CollIterateQueueAction < ZookeeperAction
18+
def initialize(config, action, path, myparams)
19+
super(config, action, path, myparams)
20+
@coll = myparams.fetch('coll', '')
3021
end
3122

3223
def perform_action
33-
resp = { status: 500 }
34-
data = JSON.parse(get_body)
35-
data = data.fetch('ingq:ingestQueueNameState', {})
36-
data = data.fetch('ingq:ingestQueueName', {})
37-
data.fetch('ingq:ingestQueue', []).each do |qjson|
38-
node = qjson.fetch('ingq:node', '')
39-
next if node.empty?
40-
41-
begin
42-
resp = HttpPostJson.new(get_ingest_server, @it_endpoint.gsub('queue', node))
43-
rescue StandardError => e
44-
log(e.message)
45-
log(e.backtrace)
24+
if ZookeeperListAction.migration_m1?
25+
ql = QueueList.new(@zk, { deletable: true })
26+
puts "PROF: #{ql.profiles.keys}"
27+
else
28+
MerrittZK::LegacyIngestJob.list_jobs(@zk) do |job|
29+
puts "TEST COLL #{@coll}: #{job}"
4630
end
4731
end
4832
{ message: 'queue release submitted' }.to_json
4933
end
5034
end
5135

5236
# Collection Admin Task class - see config/actions.yml for description
53-
class IterateQueueAction < PostToIngestAction
54-
def initialize(config, action, path, myparams, endpoint)
37+
class IterateQueueAction < ZookeeperAction
38+
def initialize(config, action, path, myparams)
39+
super(config, action, path, myparams)
5540
@queue = myparams.fetch('queue', 'na')
5641
@reload_path = myparams.fetch('reload_path', 'na')
57-
@it_endpoint = endpoint
58-
super(config, action, path, myparams, "admin/#{@queue}")
42+
end
43+
44+
def legacy_delete(job)
45+
status = job.fetch(:status, '')
46+
path = job.fetch(:path, '')
47+
return unless %w[Completed Deleted].include?(status)
48+
return if path.empty?
49+
50+
@zk.delete(path)
5951
end
6052

6153
def perform_action
62-
resp = { status: 500 }
63-
data = JSON.parse(get_body)
64-
data = data.fetch('ingq:ingestQueueNameState', {})
65-
data = data.fetch('ingq:ingestQueueName', {})
66-
MerrittJson.json_fetch_array_val(data, 'ingq:ingestQueue').each do |qjson|
67-
node = qjson.fetch('ingq:node', '')
68-
next if node.empty?
54+
if @queue == 'queues-acc' && ZookeeperListAction.migration_m3?
55+
MerrittZK::Access.list_jobs(@zk).each do |job|
56+
qn = job.fetch(:queueNode, MerrittZK::Access::SMALL).gsub(%r{^/access/}, '')
57+
j = MerrittZK::Access.new(qn, job.fetch(:id, ''))
58+
j.load(@zk)
59+
next unless j.status.deletable?
60+
61+
j.delete(@zk)
62+
end
63+
elsif @queue == 'queues-acc'
64+
MerrittZK::LegacyAccessJob.list_jobs(@zk).each do |job|
65+
legacy_delete(job)
66+
end
67+
elsif @queue == 'queues-inv' && ZookeeperListAction.migration_m1?
68+
# no action
69+
elsif @queue == 'queues-inv'
70+
MerrittZK::LegacyInventoryJob.list_jobs(@zk).each do |job|
71+
legacy_delete(job)
72+
end
73+
elsif ZookeeperListAction.migration_m1?
74+
ql = QueueList.new(@zk, { deletable: true })
75+
ql.batches.each_key do |bid|
76+
batch = MerrittZK::Batch.find_batch_by_uuid(@zk, bid)
77+
batch.load(@zk)
78+
next unless batch.status.deletable?
6979

70-
begin
71-
endpt = @it_endpoint.gsub('queue', node).gsub(%r{//}, '/')
72-
resp = HttpPostJson.new(get_ingest_server, endpt)
73-
return { message: "Status #{resp.status} for #{endpt}" }.to_json unless resp.status == 200
74-
rescue StandardError => e
75-
log(e.message)
76-
log(e.backtrace)
80+
batch.delete(@zk)
81+
end
82+
else
83+
MerrittZK::LegacyIngestJob.list_jobs(@zk).each do |job|
84+
legacy_delete(job)
7785
end
7886
end
7987
{

src-colladmin/actions/zookeeper_action.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ def initialize(config, action, path, myparams, _filters = {})
4242
end
4343

4444
def self.migration_level(zk)
45-
return unless $migration.nil?
46-
4745
$migration = []
4846
$migration << :m1 if zk.exists?('/migration/m1')
4947
$migration << :m3 if zk.exists?('/migration/m3')

src-colladmin/config/actions.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,6 @@ release-queue-item-legacy:
10111011
cleanup-queue:
10121012
link-title: Cleanup Deleted and Completed items from a Zookeeper Queue
10131013
class: IterateQueueAction
1014-
params: ["admin/cleanupq/queue"]
10151014
category: Queue Management
10161015
sensitivity: irreversible change
10171016
testing: manual

src-colladmin/lib/acc_queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,4 @@ def status
121121
def date
122122
get_value(:date)
123123
end
124-
end
124+
end

src-colladmin/lib/inv_queue.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def self.table_types
8181
type = 'status' if sym == :status
8282
type = 'datetime' if sym == :date
8383
if ZookeeperListAction.migration_m1?
84+
# under m1 migration, there is no INV queue
8485
type = 'qdelete-mrtzk' if sym == :qdelete
8586
type = 'requeue-mrtzk' if sym == :requeue
8687
else

src-colladmin/lib/queue_json.rb

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,13 @@ def path
1212
"#{get_queue_node}/#{get_value(:queueId, '')}"
1313
end
1414

15-
def get_queue_path(requeue: false)
15+
def get_del_queue_path_m1
1616
st = get_value(:qstatus, '')
17-
case st
18-
when 'Consumed'
19-
st = 'consume'
20-
when 'Held'
21-
return '' if requeue
22-
23-
st = 'held'
24-
when 'Completed'
25-
return '' if requeue
26-
27-
st = 'complete'
28-
when 'Failed'
29-
st = 'fail'
17+
if ZookeeperListAction.migration_m1?
18+
return '' unless %w[Failed Held].include?(st)
3019
else
31-
return ''
20+
return '' unless %w[Failed Completed Held].include?(st)
3221
end
33-
"#{get_queue_node}/#{get_value(:queueId, '')}/#{st}"
34-
end
35-
36-
def get_del_queue_path_m1
37-
st = get_value(:qstatus, '')
38-
return '' unless %w[Failed Completed Held].include?(st)
3922

4023
path
4124
end
@@ -47,18 +30,6 @@ def get_requeue_path_m1
4730
path
4831
end
4932

50-
def get_hold_path(release: false)
51-
st = get_value(:qstatus, '')
52-
if st == 'Held' && release
53-
'release'
54-
elsif st == 'Pending' && !release
55-
'hold'
56-
else
57-
return ''
58-
end
59-
path
60-
end
61-
6233
def get_hold_path_m1
6334
st = get_value(:qstatus, '')
6435
return '' unless %w[Pending].include?(st)

0 commit comments

Comments
 (0)