Skip to content

Commit

Permalink
specialize zklists
Browse files Browse the repository at this point in the history
  • Loading branch information
terrywbrady committed Mar 9, 2024
1 parent f4b9c84 commit 7cd02b4
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src-colladmin/actions/ingest_queue_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require_relative 'zookeeper_action'

# Collection Admin Task class - see config/actions.yml for description
class IngestQueueAction < ZookeeperAction
class IngestQueueAction < IngestQueueZookeeperAction
def initialize(config, action, path, myparams)
@batch = myparams.fetch('batch', '')
@profile = myparams.fetch('profile', '')
Expand Down
120 changes: 90 additions & 30 deletions src-colladmin/actions/zookeeper_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
# Collection Admin Task class - see config/actions.yml for description
class ZkList
def initialize
@jobs = []
@items = []
end

def add_job(job)
@jobs.push(job)
def add_item(item)
@items.push(item)
end

def to_table
table = []
js = @jobs.sort do |a, b|
js = @items.sort do |a, b|
if a.status == b.status
b.date <=> a.date
else
Expand All @@ -30,49 +30,109 @@ def to_table
end
end

class QueueItemReader
@@na = "NA"
def initialize(zk_action, id, payload)
@bytes = payload.nil? ? [] : payload.bytes
@is_json = zk_action.is_json
@status_vals = zk_action.status_vals
@queue_node = zk_action.zk_path
@id = id
end

def status_byte
@bytes.empty? ? 0 : @bytes[0]
end

def status
return @@na if status_byte > @status_vals.length
@status_vals[status_byte]
end

def time
return nil if @bytes.length < 9
# https://stackoverflow.com/a/68855488/3846548
t = @bytes[1..8].inject(0) {|m, b| (m << 8) + b }
Time.at(t/1000)
end

def payload_text
return "" if @bytes.length < 10
@bytes[9..].pack('c*')
end

def payload_object
if @is_json
json = JSON.parse(payload_text)
else
json = {
payload: payload_text
}
end
json['queueNode'] = @queue_node
json['id'] = @id
json['date'] = time
json['status'] = status
json
end

end


class ZookeeperAction < AdminAction
@@status_vals = ['Pending', 'Consumed', 'Deleted', 'Completed', 'Failed', 'Resolved']
def initialize(config, action, path, myparams, filters)
super(config, action, path, myparams)
@filters = {}
@path = '/ingest'
@zk = ZK.new(get_zookeeper_conn)
@jobs = ZkList.new
@items = ZkList.new
end

def zk_path
'/tbd'
end

def status_vals
[]
end

def is_json
false
end

def items
@items
end

def perform_action
@zk.children(@path).each do |cp|
@zk.children(zk_path).each do |cp|
puts cp
arr = @zk.get("#{@path}/#{cp}")
next if arr[0].nil?
data = arr[0].bytes
return if data.length < 9
status = data[0]
# https://stackoverflow.com/a/68855488/3846548
t = data[1..8].inject(0) {|m, b| (m << 8) + b }
time = Time.at(t/1000)
payload=data[9..].pack('c*')
begin
json = JSON.parse(payload)
json['queueNode'] = 'ingest'
json['id'] = cp
json['date'] = time
json['status'] = @@status_vals[status]
puts json.to_json
@jobs.add_job(QueueEntry.new(json))
#puts JSON.pretty_generate(json)
rescue => exception
#puts exception
end
arr = @zk.get("#{zk_path}/#{cp}")
po = QueueItemReader.new(self, cp, arr[0]).payload_object
puts po.to_json
@items.add_item(QueueEntry.new(po))
end
convert_json_to_table('')
end

def table_rows(_body)
@jobs.to_table
items.to_table
end

def get_zookeeper_conn
@config.fetch('zookeeper', '').split(',').first
end
end

class IngestQueueZookeeperAction < ZookeeperAction
def zk_path
'/ingest'
end

def status_vals
['Pending', 'Consumed', 'Deleted', 'Completed', 'Failed', 'Resolved']
end

def is_json
true
end
end

0 comments on commit 7cd02b4

Please sign in to comment.