Skip to content

Commit b8238d6

Browse files
committed
work in prog
1 parent f6d7723 commit b8238d6

9 files changed

+38
-185
lines changed

src-colladmin/actions/access_queue_action.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# Collection Admin Task class - see config/actions.yml for description
77
class AccessQueueAction < ZookeeperListAction
88
def initialize(config, action, path, myparams)
9-
super(config, action, path, myparams, 'admin/queues-acc')
9+
super(config, action, path, myparams)
1010
end
1111

1212
def perform_action

src-colladmin/actions/ingest_batch_action.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
require_relative 'forward_to_ingest_action'
44

55
# Collection Admin Task class - see config/actions.yml for description
6-
class IngestBatchAction < ForwardToIngestAction
6+
class IngestBatchAction < ZookeeperListAction
77
def initialize(config, action, path, myparams)
88
@batch = myparams.fetch('batch', 'no-batch-provided')
99
@batch_obj = Batch.new(@batch)
10-
super(config, action, path, myparams, 'admin/queues')
10+
super(config, action, path, myparams, {batch: @batch})
1111
end
1212

1313
def get_title
@@ -23,7 +23,7 @@ def table_types
2323
end
2424

2525
def table_rows(body)
26-
queue_list = QueueList.new(get_ingest_server, body, { batch: @batch })
26+
queue_list = QueueList.new(@zk, { batch: @batch })
2727
queue_list.jobs.each do |qe|
2828
@batch_obj.add_queue_job(qe)
2929
end

src-colladmin/actions/ingest_batch_folders_action.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ def initialize(config, action, path, myparams)
88
@days = myparams.fetch('days', '7').to_i
99
@days = 60 if @days > 60
1010
super(config, action, path, myparams, "admin/bids/#{@days}")
11+
@zk = ZK.new(get_zookeeper_conn)
12+
ZookeeperListAction.migration_level(@zk)
13+
end
14+
15+
def get_zookeeper_conn
16+
@config.fetch('zookeeper', '')
1117
end
1218

1319
def get_title
@@ -28,7 +34,7 @@ def init_status
2834

2935
def table_rows(body)
3036
bflist = BatchFolderList.new(body)
31-
bflist.apply_queue_list(QueueList.get_queue_list(get_ingest_server))
37+
bflist.apply_queue_list(QueueList.new(@zk))
3238
bflist.apply_recent_ingests(RecentIngests.new(@config, @days))
3339
bflist.to_table
3440
end

src-colladmin/actions/ingest_collection_locks_action.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@ def initialize(config, action, path, myparams)
1515
@collections = Collections.new(config)
1616

1717
super(config, action, path, myparams, endpoint)
18+
@zk = ZK.new(get_zookeeper_conn)
19+
ZookeeperListAction.migration_level(@zk)
1820
@held_counts = {}
19-
ql = QueueList.get_queue_list(get_ingest_server)
21+
ql = QueueList.new(@zk)
2022
ql.jobs.each do |qe|
2123
next if qe.qstatus != 'Held'
2224

2325
@held_counts[qe.profile] = @held_counts.fetch(qe.profile, 0) + 1
2426
end
2527
end
2628

29+
def get_zookeeper_conn
30+
@config.fetch('zookeeper', '')
31+
end
32+
2733
def specific_profile?
2834
@profile != ''
2935
end

src-colladmin/actions/inventory_queue_action.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# Collection Admin Task class - see config/actions.yml for description
77
class InventoryQueueAction < ZookeeperListAction
88
def initialize(config, action, path, myparams)
9-
super(config, action, path, myparams, 'admin/queues-inv')
9+
super(config, action, path, myparams)
1010
end
1111

1212
def perform_action

src-colladmin/actions/zookeeper_action.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,20 @@ def to_table
3333

3434
## Base class for actions that interact directly with Zookeeper using the mrt-zk library
3535
class ZookeeperListAction < AdminAction
36-
def initialize(config, action, path, myparams, _filters)
36+
def initialize(config, action, path, myparams, _filters = {})
3737
super(config, action, path, myparams)
3838
@filters = {}
3939
@zk = ZK.new(get_zookeeper_conn)
40+
ZookeeperListAction.migration_level(@zk)
4041
@items = ZkList.new
41-
migration_level
4242
end
4343

44-
def migration_level
44+
def self.migration_level(zk)
4545
return unless $migration.nil?
4646

4747
$migration = []
48-
$migration << :m1 if @zk.exists?('/migration/m1')
49-
$migration << :m3 if @zk.exists?('/migration/m3')
48+
$migration << :m1 if zk.exists?('/migration/m1')
49+
$migration << :m3 if zk.exists?('/migration/m3')
5050
end
5151

5252
def self.migration_m1?
@@ -85,6 +85,7 @@ class ZookeeperAction < AdminAction
8585
def initialize(config, action, path, myparams)
8686
super(config, action, path, myparams)
8787
@zk = ZK.new(get_zookeeper_conn)
88+
ZookeeperListAction.migration_level(@zk)
8889
@qpath = myparams.fetch('queue-path', '')
8990
end
9091

src-colladmin/config/actions.yml

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -904,20 +904,6 @@ createProfile/sla:
904904
documentation: |
905905
Ingest: POST /admin/profile/sla
906906
migration: m5
907-
queue-delete:
908-
link-title: Remove an item from a Zookeeper queue
909-
class: QueueAction
910-
params: ["admin/deleteq"]
911-
category: Queue Management
912-
sensitivity: irreversible change
913-
testing: manual
914-
description: |
915-
Remove an item from a Zookeeper queue.
916-
report-datatypes:
917-
- qdelete
918-
documentation: |
919-
Ingest: POST /admin/deleteq
920-
migration: m1, m2, m3
921907
queue-delete-mrtzk:
922908
link-title: Remove an item from a Zookeeper queue
923909
class: ZkDeleteM1Action
@@ -944,20 +930,6 @@ queue-delete-legacy:
944930
documentation: |
945931
Zk: tbd
946932
migration: m1
947-
requeue:
948-
link-title: Re-queue an item from a Zookeeper queue
949-
class: QueueAction
950-
params: ["admin/requeue"]
951-
category: Queue Management
952-
sensitivity: irreversible change
953-
testing: manual
954-
description: |
955-
Re-queue an item from a Zookeeper queue.
956-
report-datatypes:
957-
- requeue
958-
documentation: |
959-
Ingest: POST /admin/requeue
960-
migration: m1, m2, m3
961933
requeue-mrtzk:
962934
link-title: Re-queue an item from a Zookeeper queue
963935
class: ZkRequeueM1Action
@@ -984,20 +956,6 @@ requeue-legacy:
984956
documentation: |
985957
ZK tbd
986958
migration: m1
987-
hold-queue-item:
988-
link-title: Move a pending item from in Zookeeper queue to a Held status
989-
class: QueueAction
990-
params: ["admin/hold"]
991-
category: Queue Management
992-
sensitivity: irreversible change
993-
testing: manual
994-
description: |
995-
Move a pending item from in Zookeeper queue to a Held status.
996-
report-datatypes:
997-
- hold
998-
documentation: |
999-
Ingest: POST /admin/hold
1000-
migration: m1, m2
1001959
hold-queue-item-mrtzk:
1002960
link-title: Move a pending item from in Zookeeper queue to a Held status
1003961
class: ZkHoldM1Action
@@ -1024,20 +982,6 @@ hold-queue-item-legacy:
1024982
documentation: |
1025983
Zk: tbd
1026984
migration: m1
1027-
release-queue-item:
1028-
link-title: Release a held item from in Zookeeper queue to a Pending status
1029-
class: QueueAction
1030-
params: ["admin/release"]
1031-
category: Queue Management
1032-
sensitivity: irreversible change
1033-
testing: manual
1034-
description: |
1035-
Release a held item from in Zookeeper queue to a Pending status.
1036-
report-datatypes:
1037-
- release
1038-
documentation: |
1039-
Ingest: POST /admin/release
1040-
migration: m1, m2
1041985
release-queue-item-mrtzk:
1042986
link-title: Release a held item from in Zookeeper queue to a Pending status
1043987
class: ZkReleaseM1Action

src-colladmin/lib/acc_queue.rb

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -121,70 +121,4 @@ def status
121121
def date
122122
get_value(:date)
123123
end
124-
end
125-
126-
# access queue
127-
class AccQueue < MerrittJson
128-
def initialize(queue_list, body)
129-
data = JSON.parse(body)
130-
data = fetch_hash_val(data, 'que:queueState')
131-
data = fetch_hash_val(data, 'que:queueEntries')
132-
list = fetch_array_val(data, 'que:queueEntryState')
133-
list.each do |obj|
134-
q = AccQueueEntry.new(obj)
135-
queue_list.tokens.append(q)
136-
end
137-
super()
138-
end
139-
end
140-
141-
# list of available access queues
142-
class AccQueueList < MerrittJson
143-
def initialize(ingest_server, body, _filter = {})
144-
super()
145-
@ingest_server = ingest_server
146-
@body = body
147-
@tokens = []
148-
retrieve_queues
149-
end
150-
151-
def self.get_queue_list(ingest_server, filter = {})
152-
qjson = HttpGetJson.new(ingest_server, 'admin/queues-acc')
153-
AccQueueList.new(ingest_server, qjson.body, filter)
154-
end
155-
156-
def retrieve_queues
157-
data = JSON.parse(@body)
158-
data = fetch_hash_val(data, 'ingq:ingestQueueNameState')
159-
data = fetch_hash_val(data, 'ingq:ingestQueueName')
160-
fetch_array_val(data, 'ingq:ingestQueue').each do |qjson|
161-
node = fetch_hash_val(qjson, 'ingq:node')
162-
begin
163-
qjson = HttpGetJson.new(@ingest_server, "admin/queue-acc#{node}")
164-
next unless qjson.status == 200
165-
166-
AccQueue.new(self, qjson.body)
167-
rescue StandardError => e
168-
LambdaBase.log(e.message)
169-
LambdaBase.log(e.backtrace)
170-
end
171-
end
172-
end
173-
174-
attr_reader :tokens, :body
175-
176-
def to_table
177-
table = []
178-
ts = @tokens.sort do |a, b|
179-
if a.status == b.status
180-
b.date <=> a.date
181-
else
182-
AdminTask.status_sort_val(a.status) <=> AdminTask.status_sort_val(b.status)
183-
end
184-
end
185-
ts.each_with_index do |q, _i|
186-
table.append(q.to_table_row)
187-
end
188-
table
189-
end
190-
end
124+
end

src-colladmin/lib/queue.rb

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -216,64 +216,26 @@ def num_jobs
216216
end
217217
end
218218

219-
# representation of the ingest queue
220-
class IngestQueue < MerrittJson
221-
def initialize(queue_list, body)
222-
data = JSON.parse(body)
223-
data = fetch_hash_val(data, 'que:queueState')
224-
data = fetch_hash_val(data, 'que:queueEntries')
225-
list = fetch_array_val(data, 'que:queueEntryState')
226-
list.each do |obj|
227-
q = QueueEntry.new(obj)
228-
next unless q.check_filter(queue_list.filter)
229-
230-
qenrtylist = queue_list.batches.fetch(q.bid, QueueBatch.new(q.bid, q.user))
231-
qenrtylist.add_job(q)
232-
queue_list.batches[q.bid] = qenrtylist
233-
queue_list.jobs.append(q)
234-
235-
# next if q.qstatus == "Completed" || q.qstatus == "Deleted"
236-
k = "#{q.profile},#{q.qstatus}"
237-
queue_list.profiles[k] = queue_list.profiles.fetch(k, [])
238-
queue_list.profiles[k].append(q)
239-
super()
240-
end
241-
end
242-
end
243-
244219
# List of queues of a particular type - ingest once had separate queues for each worker
245220
class QueueList < MerrittJson
246-
def initialize(ingest_server, body, filter = {})
221+
def initialize(zk, filter = {})
247222
super()
248-
@ingest_server = ingest_server
249-
@body = body
250223
@batches = {}
251224
@jobs = []
252225
@profiles = {}
253226
@filter = filter
254-
retrieve_queues
255-
end
256-
257-
def self.get_queue_list(ingest_server, filter = {})
258-
qjson = HttpGetJson.new(ingest_server, 'admin/queues')
259-
QueueList.new(ingest_server, qjson.body, filter)
260-
end
261227

262-
def retrieve_queues
263-
data = JSON.parse(@body)
264-
data = fetch_hash_val(data, 'ingq:ingestQueueNameState')
265-
data = fetch_hash_val(data, 'ingq:ingestQueueName')
266-
fetch_array_val(data, 'ingq:ingestQueue').each do |qjson|
267-
node = fetch_hash_val(qjson, 'ingq:node').gsub(%r{^/}, '')
268-
begin
269-
qjson = HttpGetJson.new(@ingest_server, "admin/queue/#{node}")
270-
next unless qjson.status == 200
271-
272-
IngestQueue.new(self, qjson.body)
273-
rescue StandardError => e
274-
LambdaBase.log(e.message)
275-
LambdaBase.log(e.backtrace)
276-
end
228+
jobs = ZookeeperListAction.migration_m1? ? MerrittZK::Job.list_jobs(zk) : MerrittZK::LegacyIngestJob.list_jobs(zk)
229+
jobs.each do |j|
230+
job = QueueEntry.new(j)
231+
@jobs << job
232+
qb = @batches.fetch(job.bid, QueueBatch.new(job.bid, job.user))
233+
qb.add_job(job)
234+
@batches[job.bid] = qb
235+
236+
k = "#{job.profile},#{job.qstatus}"
237+
profiles[k] = profiles.fetch(k, [])
238+
profiles[k].append(job)
277239
end
278240
end
279241

0 commit comments

Comments
 (0)