From 7cd02b40d0d1cb98cefd21e1488c9c9014dab1fe Mon Sep 17 00:00:00 2001 From: Terry Brady Date: Fri, 8 Mar 2024 17:41:45 -0800 Subject: [PATCH] specialize zklists --- src-colladmin/actions/ingest_queue_action.rb | 2 +- src-colladmin/actions/zookeeper_action.rb | 120 ++++++++++++++----- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src-colladmin/actions/ingest_queue_action.rb b/src-colladmin/actions/ingest_queue_action.rb index 43cdbf2e..0fe595fd 100644 --- a/src-colladmin/actions/ingest_queue_action.rb +++ b/src-colladmin/actions/ingest_queue_action.rb @@ -3,7 +3,7 @@ require_relative 'zookeeper_action' # Collection Admin Task class - see config/actions.yml for description -class IngestQueueAction < ZookeeperAction +class IngestQueueAction < IngestQueueZookeeperAction def initialize(config, action, path, myparams) @batch = myparams.fetch('batch', '') @profile = myparams.fetch('profile', '') diff --git a/src-colladmin/actions/zookeeper_action.rb b/src-colladmin/actions/zookeeper_action.rb index da85cc50..36a2a380 100644 --- a/src-colladmin/actions/zookeeper_action.rb +++ b/src-colladmin/actions/zookeeper_action.rb @@ -7,16 +7,16 @@ # Collection Admin Task class - see config/actions.yml for description class ZkList def initialize - @jobs = [] + @items = [] end - def add_job(job) - @jobs.push(job) + def add_item(item) + @items.push(item) end def to_table table = [] - js = @jobs.sort do |a, b| + js = @items.sort do |a, b| if a.status == b.status b.date <=> a.date else @@ -30,49 +30,109 @@ def to_table end end +class QueueItemReader + @@na = "NA" + def initialize(zk_action, id, payload) + @bytes = payload.nil? ? [] : payload.bytes + @is_json = zk_action.is_json + @status_vals = zk_action.status_vals + @queue_node = zk_action.zk_path + @id = id + end + + def status_byte + @bytes.empty? ? 0 : @bytes[0] + end + + def status + return @@na if status_byte > @status_vals.length + @status_vals[status_byte] + end + + def time + return nil if @bytes.length < 9 + # https://stackoverflow.com/a/68855488/3846548 + t = @bytes[1..8].inject(0) {|m, b| (m << 8) + b } + Time.at(t/1000) + end + + def payload_text + return "" if @bytes.length < 10 + @bytes[9..].pack('c*') + end + + def payload_object + if @is_json + json = JSON.parse(payload_text) + else + json = { + payload: payload_text + } + end + json['queueNode'] = @queue_node + json['id'] = @id + json['date'] = time + json['status'] = status + json + end + +end + + class ZookeeperAction < AdminAction - @@status_vals = ['Pending', 'Consumed', 'Deleted', 'Completed', 'Failed', 'Resolved'] def initialize(config, action, path, myparams, filters) super(config, action, path, myparams) @filters = {} - @path = '/ingest' @zk = ZK.new(get_zookeeper_conn) - @jobs = ZkList.new + @items = ZkList.new + end + + def zk_path + '/tbd' + end + + def status_vals + [] + end + + def is_json + false + end + + def items + @items end def perform_action - @zk.children(@path).each do |cp| + @zk.children(zk_path).each do |cp| puts cp - arr = @zk.get("#{@path}/#{cp}") - next if arr[0].nil? - data = arr[0].bytes - return if data.length < 9 - status = data[0] - # https://stackoverflow.com/a/68855488/3846548 - t = data[1..8].inject(0) {|m, b| (m << 8) + b } - time = Time.at(t/1000) - payload=data[9..].pack('c*') - begin - json = JSON.parse(payload) - json['queueNode'] = 'ingest' - json['id'] = cp - json['date'] = time - json['status'] = @@status_vals[status] - puts json.to_json - @jobs.add_job(QueueEntry.new(json)) - #puts JSON.pretty_generate(json) - rescue => exception - #puts exception - end + arr = @zk.get("#{zk_path}/#{cp}") + po = QueueItemReader.new(self, cp, arr[0]).payload_object + puts po.to_json + @items.add_item(QueueEntry.new(po)) end convert_json_to_table('') end def table_rows(_body) - @jobs.to_table + items.to_table end def get_zookeeper_conn @config.fetch('zookeeper', '').split(',').first end end + +class IngestQueueZookeeperAction < ZookeeperAction + def zk_path + '/ingest' + end + + def status_vals + ['Pending', 'Consumed', 'Deleted', 'Completed', 'Failed', 'Resolved'] + end + + def is_json + true + end +end \ No newline at end of file