Skip to content

Commit

Permalink
allow skipping cpu crunching (#32)
Browse files Browse the repository at this point in the history
* allow skipping cpu-crunching and executing algs immediately

* allow executing fast in runner

* use arguments to skip crunching in tests
  • Loading branch information
m-fila authored Aug 12, 2024
1 parent f72b4c7 commit 0148de7
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 15 deletions.
18 changes: 14 additions & 4 deletions bin/schedule.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ function parse_args()
"--dot-trace"
help = "Output graphviz dot file for execution logs graph"
arg_type = String

"--fast"
help = "Execute algorithms immediately skipping algorithm runtime information and crunching"
action = :store_true

end

return ArgParse.parse_args(s)
Expand All @@ -35,16 +40,21 @@ end
function main()
args = parse_args()

event_count = args["event-count"]
max_concurrent = args["max-concurrent"]

if !isnothing(args["dot-trace"])
@info "Enabled logging"
FrameworkDemo.configure_LocalEventLog()
end

graph = FrameworkDemo.parse_graphml(args["data-flow"])
FrameworkDemo.run_events(graph, event_count, max_concurrent)
event_count=args["event-count"]
max_concurrent=args["max-concurrent"]
fast=args["fast"]

@time "Pipeline execution" FrameworkDemo.run_events(graph;
event_count=event_count,
max_concurrent=max_concurrent,
fast=fast
)

if !isnothing(args["dot-trace"])
logs = Dagger.fetch_logs!()
Expand Down
2 changes: 1 addition & 1 deletion src/cpu_crunching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function benchmark_prime(n::Int)
return Δt
end

function calculate_coefficients()
function calculate_coefficients()::Vector{Float64}
n_max = [1000,200_000]
t_average = benchmark_prime.(n_max)

Expand Down
32 changes: 23 additions & 9 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ end

alg_default_runtime_s::Float64 = 0

function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64})
function (alg::MockupAlgorithm)(args...; coefficients::Union{Vector{Float64},Missing})
println("Executing $(alg.name)")
crunch_for_seconds(alg.runtime, coefficients)
if coefficients isa Vector{Float64}
crunch_for_seconds(alg.runtime, coefficients)
end

return alg.name
end
Expand All @@ -45,13 +47,18 @@ function is_terminating_alg(graph::AbstractGraph, vertex_id::Int)
all(is_terminating, successor_dataobjects)
end

function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Dagger.Shard)
function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Union{Dagger.Shard,Nothing})
incoming_data = get_promises(graph, inneighbors(graph, vertex_id))
algorithm = MockupAlgorithm(graph, vertex_id)
Dagger.@spawn algorithm(incoming_data...; coefficients)
if isnothing(coefficients)
alg_helper(data...) = algorithm(data...; coefficients=missing)
return Dagger.@spawn alg_helper(incoming_data...)
else
return Dagger.@spawn algorithm(incoming_data...; coefficients=coefficients)
end
end

function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard)
function schedule_graph(graph::MetaDiGraph, coefficients::Union{Dagger.Shard,Nothing})
alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm")
sorted_vertices = MetaGraphs.topological_sort(graph)

Expand All @@ -70,13 +77,20 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard)
return terminating_results
end

function run_events(graph::MetaDiGraph,
event_count::Int,
max_concurrent::Int)


function calibrate_crunch(; fast::Bool=false)::Union{Dagger.Shard,Nothing}
return fast ? nothing : Dagger.@shard calculate_coefficients()
end

function run_events(graph::MetaDiGraph;
event_count::Int,
max_concurrent::Int,
fast::Bool=false)

graphs_tasks = Dict{Int,Dagger.DTask}()
notifications = RemoteChannel(()->Channel{Int}(32))
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()
coefficients = FrameworkDemo.calibrate_crunch(;fast=fast)

for idx in 1:event_count
while length(graphs_tasks) >= max_concurrent
Expand Down
32 changes: 32 additions & 0 deletions test/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# FrameworkDemo tests

## Usage

Run the tests with:

- Pkg REPL (arguments not supported):
```julia
] test
```
- Pkg:
```julia
import Pkg
Pkg.test("FrameworkDemo")
# or with arguments
Pkg.test("FrameworkDemo"; test_args=<list of args>)
```
- Manually:
```
julia --project test/runtests.jl <list of args>
```
- REPL:
```julia
append!(ARGS, <list of args>)
include("test/runtests.jl")
```

## Arguments

The tests support arguments:

- `no-fast` - don't skip algorithm CPU crunching
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using Test
n_workers = Distributed.nworkers(),
n_procs = Distributed.nprocs(),
n_threads = Threads.nthreads(),
test_args = repr(ARGS)
)

@testset verbose = true "FrameworkDemo.jl" begin
Expand Down
3 changes: 2 additions & 1 deletion test/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ end
ilength(x) = sum(_ -> 1, x) # no standard length for MetaGraphs.filter_vertices iterator
algorithms_count = ilength(MetaGraphs.filter_vertices(graph, :type, "Algorithm"))
set_indexing_prop!(graph, :node_id)
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()
is_fast = "no-fast" ARGS
coefficients = FrameworkDemo.calibrate_crunch(;fast=is_fast)

Dagger.enable_logging!(tasknames=true, taskdeps=true)
_ = Dagger.fetch_logs!() # flush logs
Expand Down

0 comments on commit 0148de7

Please sign in to comment.