Skip to content

Commit

Permalink
Make Engine#process, #enqueue_mesg, and #enqueue_value private.
Browse files Browse the repository at this point in the history
  • Loading branch information
postmodern committed May 28, 2024
1 parent 8db38dd commit 47a8203
Showing 1 changed file with 54 additions and 60 deletions.
114 changes: 54 additions & 60 deletions lib/ronin/recon/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -378,63 +378,6 @@ def on_connection(worker,value,parent)
end
end

public

#
# Enqueues a message for processing.
#
# @param [Message::Value, Message::SHUTDOWN] mesg
# The message object.
#
# @raise [NotImplementedError]
# An unsupported message type was given.
#
# @api private
#
def enqueue_mesg(mesg)
case mesg
when Message::Value
value = mesg.value

if (worker_classes = @worker_classes[value.class])
worker_classes.each do |worker_class|
@logger.debug("Value enqueued: #{worker_class} #{value.inspect}")

@value_status.value_enqueued(worker_class,value)
end

@worker_tasks[value.class].each do |worker_task|
worker_task.enqueue_mesg(mesg)
end
end
when Message::SHUTDOWN
@logger.debug("Shutting down ...")

@worker_tasks.each_value do |worker_tasks|
worker_tasks.each do |worker_task|
@logger.debug("Shutting down worker: #{worker_task.worker} ...")

worker_task.enqueue_mesg(mesg)
end
end
else
raise(NotImplementedError,"unable to handle message: #{mesg.inspect}")
end
end

#
# Sends a new value into the recon engine for processing.
#
# @param [Values::Value] value
# The value object to enqueue.
#
# @api private
#
def enqueue_value(value)
@graph.add_node(value)
enqueue_mesg(Message::Value.new(value))
end

#
# Processes a message.
#
Expand All @@ -459,8 +402,6 @@ def process(mesg)
end
end

private

#
# Handles when a worker has started.
#
Expand Down Expand Up @@ -580,7 +521,60 @@ def process_value(mesg)
end
end

public
#
# Enqueues a message for processing.
#
# @param [Message::Value, Message::SHUTDOWN] mesg
# The message object.
#
# @raise [NotImplementedError]
# An unsupported message type was given.
#
# @api private
#
def enqueue_mesg(mesg)
case mesg
when Message::Value
value = mesg.value

if (worker_classes = @worker_classes[value.class])
worker_classes.each do |worker_class|
@logger.debug("Value enqueued: #{worker_class} #{value.inspect}")

@value_status.value_enqueued(worker_class,value)
end

@worker_tasks[value.class].each do |worker_task|
worker_task.enqueue_mesg(mesg)
end
end
when Message::SHUTDOWN
@logger.debug("Shutting down ...")

@worker_tasks.each_value do |worker_tasks|
worker_tasks.each do |worker_task|
@logger.debug("Shutting down worker: #{worker_task.worker} ...")

worker_task.enqueue_mesg(mesg)
end
end
else
raise(NotImplementedError,"unable to handle message: #{mesg.inspect}")
end
end

#
# Sends a new value into the recon engine for processing.
#
# @param [Values::Value] value
# The value object to enqueue.
#
# @api private
#
def enqueue_value(value)
@graph.add_node(value)
enqueue_mesg(Message::Value.new(value))
end

#
# Sends the shutdown message and waits for all worker tasks to shutdown.
Expand Down

0 comments on commit 47a8203

Please sign in to comment.