Skip to content

Commit

Permalink
Renamed WorkerTasks to WorkerPool.
Browse files Browse the repository at this point in the history
  • Loading branch information
postmodern committed May 28, 2024
1 parent 7bc937e commit 9a40186
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 40 deletions.
40 changes: 20 additions & 20 deletions lib/ronin/recon/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# along with ronin-recon. If not, see <https://www.gnu.org/licenses/>.
#

require 'ronin/recon/worker_tasks'
require 'ronin/recon/worker_pool'
require 'ronin/recon/value_status'
require 'ronin/recon/graph'
require 'ronin/recon/scope'
Expand Down Expand Up @@ -88,8 +88,8 @@ def initialize(values, workers: WorkerSet.default,
@scope = Scope.new(values, ignore: ignore)

@worker_classes = {}
@worker_tasks = {}
@worker_task_count = 0
@worker_pools = {}
@worker_pool_count = 0

@value_status = ValueStatus.new
@graph = Graph.new
Expand Down Expand Up @@ -186,9 +186,9 @@ def run(task=Async::Task.current)
end

# start all work groups
@worker_tasks.each_value do |worker_tasks|
worker_tasks.each do |worker_task|
worker_task.start(task)
@worker_pools.each_value do |worker_pools|
worker_pools.each do |worker_pool|
worker_pool.start(task)
end
end
end
Expand All @@ -207,13 +207,13 @@ def run(task=Async::Task.current)
def add_worker(worker_class, concurrency: worker_class.concurrency,
params: nil)
worker = worker_class.new(params: params)
worker_tasks = WorkerTasks.new(worker, concurrency: concurrency,
output_queue: @output_queue,
logger: @logger)
worker_pools = WorkerPool.new(worker, concurrency: concurrency,
output_queue: @output_queue,
logger: @logger)

worker_class.accepts.each do |value_class|
(@worker_classes[value_class] ||= []) << worker_class
(@worker_tasks[value_class] ||= []) << worker_tasks
(@worker_pools[value_class] ||= []) << worker_pools
end
end

Expand Down Expand Up @@ -405,7 +405,7 @@ def process(mesg)
#
def process_worker_started(mesg)
@logger.debug("Worker started: #{mesg.worker}")
@worker_task_count += 1
@worker_pool_count += 1
end

#
Expand All @@ -418,7 +418,7 @@ def process_worker_started(mesg)
#
def process_worker_stopped(mesg)
@logger.debug("Worker shutdown: #{mesg.worker}")
@worker_task_count -= 1
@worker_pool_count -= 1
end

#
Expand Down Expand Up @@ -537,18 +537,18 @@ def enqueue_mesg(mesg)
@value_status.value_enqueued(worker_class,value)
end

@worker_tasks[value.class].each do |worker_task|
worker_task.enqueue_mesg(mesg)
@worker_pools[value.class].each do |worker_pool|
worker_pool.enqueue_mesg(mesg)
end
end
when Message::SHUTDOWN
@logger.debug("Shutting down ...")

@worker_tasks.each_value do |worker_tasks|
worker_tasks.each do |worker_task|
@logger.debug("Shutting down worker: #{worker_task.worker} ...")
@worker_pools.each_value do |worker_pools|
worker_pools.each do |worker_pool|
@logger.debug("Shutting down worker: #{worker_pool.worker} ...")

worker_task.enqueue_mesg(mesg)
worker_pool.enqueue_mesg(mesg)
end
end
else
Expand All @@ -570,15 +570,15 @@ def enqueue_value(value)
end

#
# Sends the shutdown message and waits for all worker tasks to shutdown.
# Sends the shutdown message and waits for all worker pools to shutdown.
#
# @api private
#
def shutdown!
enqueue_mesg(Message::SHUTDOWN)

# wait until all workers report that they have exited
until @worker_task_count == 0
until @worker_pool_count == 0
process(@output_queue.dequeue)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
module Ronin
module Recon
#
# Contains the `Async::Task` objects that process messages from the input
# queue and sends messages to the output queue.
# Contains the `Async::Task` objects for a worker, that process messages
# from the input queue and sends messages to the output queue.
#
# @api private
#
class WorkerTasks
class WorkerPool

# The recon worker's ID.
#
Expand Down Expand Up @@ -70,7 +70,7 @@ class WorkerTasks
attr_reader :logger

#
# Initializes the worker tasks.
# Initializes the worker pool.
#
# @param [Worker] worker
# The initialized worker object.
Expand Down Expand Up @@ -154,7 +154,7 @@ def run
end

#
# Starts the worker.
# Starts the worker pool.
#
# @param [Async::Task] task
# The optional async task to register the worker under.
Expand All @@ -171,18 +171,18 @@ def start(task=Async::Task.current)
end

#
# Marks the worker as running.
# Marks the worker pool as running.
#
def started!
# send a message to the engine that the worker task has started
# send a message to the engine that the worker pool has started
enqueue(Message::WorkerStarted.new(@worker))
end

#
# Marks the worker as stopped.
# Marks the worker pool as stopped.
#
def stopped!
# send a message to the engine that the worker task has stopped
# send a message to the engine that the worker pool has stopped
enqueue(Message::WorkerStopped.new(@worker))
end

Expand Down
22 changes: 11 additions & 11 deletions spec/worker_tasks_spec.rb → spec/worker_pool_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
require 'spec_helper'
require 'ronin/recon/worker_tasks'
require 'ronin/recon/worker_pool'
require 'ronin/recon/worker'
require 'ronin/recon/message/value'

describe Ronin::Recon::WorkerTasks do
describe Ronin::Recon::WorkerPool do
subject { described_class.new(worker, output_queue: Async::Queue.new) }

module TestWorkerTasks
module TestWorkerPool
class TestWorker < Ronin::Recon::Worker
def process(value)
yield Ronin::Recon::Values::Domain.new('example.com')
Expand All @@ -20,7 +20,7 @@ def process(value); end
end

describe "#initialize" do
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }

it "must initialize #worker worker object" do
expect(subject.worker).to be(worker)
Expand All @@ -46,7 +46,7 @@ def process(value); end
describe "#enqueue_mesg" do
context "for Message::SHUTDOWN" do
let(:mesg_value) { Ronin::Recon::Message::SHUTDOWN }
let(:worker) { TestWorkerTasks::TestWorkerWithConcurrency.new }
let(:worker) { TestWorkerPool::TestWorkerWithConcurrency.new }

it "must enqueue Message::Shutdown into #input_queue 2 times" do
Async { subject.enqueue_mesg(mesg_value) }
Expand All @@ -58,7 +58,7 @@ def process(value); end

context "for other Message's" do
let(:mesg_value) { Ronin::Recon::Message::Value }
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }

it "must enqueue Message into #input_queue" do
Async { subject.enqueue_mesg(mesg_value) }
Expand All @@ -69,7 +69,7 @@ def process(value); end
end

describe "#run" do
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }
let(:shutdown_mesg) { Ronin::Recon::Message::SHUTDOWN }
let(:value_mesg) { Ronin::Recon::Message::Value.new("value") }

Expand Down Expand Up @@ -102,7 +102,7 @@ def process(value); end
end

describe "#start" do
let(:worker) { TestWorkerTasks::TestWorkerWithConcurrency.new }
let(:worker) { TestWorkerPool::TestWorkerWithConcurrency.new }
let(:shutdown_mesg) { Ronin::Recon::Message::SHUTDOWN }
let(:value_mesg) { Ronin::Recon::Message::Value.new("value") }

Expand All @@ -117,7 +117,7 @@ def process(value); end
end

describe "#started!" do
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }

it "must enqueue Message::WorkerStarted instance into #output_queue" do
Async { subject.started! }
Expand All @@ -128,7 +128,7 @@ def process(value); end
end

describe "#stopped!" do
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }

it "must enqueue Message::WorkerStopped instance into #output_queue" do
Async { subject.stopped! }
Expand All @@ -139,7 +139,7 @@ def process(value); end
end

describe "#enqueue" do
let(:worker) { TestWorkerTasks::TestWorker.new }
let(:worker) { TestWorkerPool::TestWorker.new }
let(:mesg) { Ronin::Recon::Message::JobFailed }

it "must enqueue Message into #output_queue" do
Expand Down

0 comments on commit 9a40186

Please sign in to comment.