@@ -31,55 +31,6 @@ def to_table
31
31
end
32
32
end
33
33
34
- class QueueItemReader
35
- @@na = "NA"
36
- def initialize ( zk_action , id , payload )
37
- @bytes = payload . nil? ? [ ] : payload . bytes
38
- @is_json = zk_action . is_json
39
- @status_vals = zk_action . status_vals
40
- @queue_node = zk_action . zk_path
41
- @id = id
42
- end
43
-
44
- def status_byte
45
- @bytes . empty? ? 0 : @bytes [ 0 ]
46
- end
47
-
48
- def status
49
- return @@na if status_byte > @status_vals . length
50
- @status_vals [ status_byte ]
51
- end
52
-
53
- def time
54
- return nil if @bytes . length < 9
55
- # https://stackoverflow.com/a/68855488/3846548
56
- t = @bytes [ 1 ..8 ] . inject ( 0 ) { |m , b | ( m << 8 ) + b }
57
- Time . at ( t /1000 )
58
- end
59
-
60
- def payload_text
61
- return "" if @bytes . length < 10
62
- @bytes [ 9 ..] . pack ( 'c*' )
63
- end
64
-
65
- def payload_object
66
- if @is_json
67
- json = JSON . parse ( payload_text )
68
- else
69
- json = {
70
- payload : payload_text
71
- }
72
- end
73
- json [ 'queueNode' ] = @queue_node
74
- json [ 'id' ] = @id
75
- json [ 'date' ] = time
76
- json [ 'status' ] = status
77
- json
78
- end
79
-
80
- end
81
-
82
-
83
34
class ZookeeperAction < AdminAction
84
35
def initialize ( config , action , path , myparams , filters )
85
36
super ( config , action , path , myparams )
@@ -88,6 +39,15 @@ def initialize(config, action, path, myparams, filters)
88
39
@items = ZkList . new
89
40
end
90
41
42
+ def migration_level
43
+ return :m1 if @zk . exists? ( "/migration/m1" )
44
+ :none
45
+ end
46
+
47
+ def migration_m1?
48
+ migration_level == :m1
49
+ end
50
+
91
51
def zk_path
92
52
'/tbd'
93
53
end
@@ -109,17 +69,10 @@ def register_item(item)
109
69
end
110
70
111
71
def perform_action
112
- jobs = MerrittZK ::LegacyIngestJob . list_jobs ( @zk )
72
+ jobs = migration_m1? ? MerrittZK :: Job . list_jobs ( @zk ) : MerrittZK ::LegacyIngestJob . list_jobs ( @zk )
113
73
jobs . each do |po |
114
74
register_item ( QueueEntry . new ( po ) )
115
75
end
116
- if false
117
- @zk . children ( zk_path ) . each do |cp |
118
- arr = @zk . get ( "#{ zk_path } /#{ cp } " )
119
- po = QueueItemReader . new ( self , cp , arr [ 0 ] ) . payload_object
120
- register_item ( QueueEntry . new ( po ) )
121
- end
122
- end
123
76
convert_json_to_table ( '' )
124
77
end
125
78
0 commit comments