diff --git a/lib/ronin/recon/engine.rb b/lib/ronin/recon/engine.rb index e121dd9..28c1c55 100644 --- a/lib/ronin/recon/engine.rb +++ b/lib/ronin/recon/engine.rb @@ -378,63 +378,6 @@ def on_connection(worker,value,parent) end end - public - - # - # Enqueues a message for processing. - # - # @param [Message::Value, Message::SHUTDOWN] mesg - # The message object. - # - # @raise [NotImplementedError] - # An unsupported message type was given. - # - # @api private - # - def enqueue_mesg(mesg) - case mesg - when Message::Value - value = mesg.value - - if (worker_classes = @worker_classes[value.class]) - worker_classes.each do |worker_class| - @logger.debug("Value enqueued: #{worker_class} #{value.inspect}") - - @value_status.value_enqueued(worker_class,value) - end - - @worker_tasks[value.class].each do |worker_task| - worker_task.enqueue_mesg(mesg) - end - end - when Message::SHUTDOWN - @logger.debug("Shutting down ...") - - @worker_tasks.each_value do |worker_tasks| - worker_tasks.each do |worker_task| - @logger.debug("Shutting down worker: #{worker_task.worker} ...") - - worker_task.enqueue_mesg(mesg) - end - end - else - raise(NotImplementedError,"unable to handle message: #{mesg.inspect}") - end - end - - # - # Sends a new value into the recon engine for processing. - # - # @param [Values::Value] value - # The value object to enqueue. - # - # @api private - # - def enqueue_value(value) - @graph.add_node(value) - enqueue_mesg(Message::Value.new(value)) - end - # # Processes a message. # @@ -459,8 +402,6 @@ def process(mesg) end end - private - # # Handles when a worker has started. # @@ -580,7 +521,60 @@ def process_value(mesg) end end - public + # + # Enqueues a message for processing. + # + # @param [Message::Value, Message::SHUTDOWN] mesg + # The message object. + # + # @raise [NotImplementedError] + # An unsupported message type was given. + # + # @api private + # + def enqueue_mesg(mesg) + case mesg + when Message::Value + value = mesg.value + + if (worker_classes = @worker_classes[value.class]) + worker_classes.each do |worker_class| + @logger.debug("Value enqueued: #{worker_class} #{value.inspect}") + + @value_status.value_enqueued(worker_class,value) + end + + @worker_tasks[value.class].each do |worker_task| + worker_task.enqueue_mesg(mesg) + end + end + when Message::SHUTDOWN + @logger.debug("Shutting down ...") + + @worker_tasks.each_value do |worker_tasks| + worker_tasks.each do |worker_task| + @logger.debug("Shutting down worker: #{worker_task.worker} ...") + + worker_task.enqueue_mesg(mesg) + end + end + else + raise(NotImplementedError,"unable to handle message: #{mesg.inspect}") + end + end + + # + # Sends a new value into the recon engine for processing. + # + # @param [Values::Value] value + # The value object to enqueue. + # + # @api private + # + def enqueue_value(value) + @graph.add_node(value) + enqueue_mesg(Message::Value.new(value)) + end # # Sends the shutdown message and waits for all worker tasks to shutdown.