diff --git a/bin/schedule.jl b/bin/schedule.jl index 20a3795..d5b8d01 100644 --- a/bin/schedule.jl +++ b/bin/schedule.jl @@ -27,6 +27,11 @@ function parse_args() "--dot-trace" help = "Output graphviz dot file for execution logs graph" arg_type = String + + "--fast" + help = "Execute algorithms immediately skipping algorithm runtime information and crunching" + action = :store_true + end return ArgParse.parse_args(s) @@ -35,16 +40,21 @@ end function main() args = parse_args() - event_count = args["event-count"] - max_concurrent = args["max-concurrent"] - if !isnothing(args["dot-trace"]) @info "Enabled logging" FrameworkDemo.configure_LocalEventLog() end graph = FrameworkDemo.parse_graphml(args["data-flow"]) - FrameworkDemo.run_events(graph, event_count, max_concurrent) + event_count=args["event-count"] + max_concurrent=args["max-concurrent"] + fast=args["fast"] + + @time "Pipeline execution" FrameworkDemo.run_events(graph; + event_count=event_count, + max_concurrent=max_concurrent, + fast=fast + ) if !isnothing(args["dot-trace"]) logs = Dagger.fetch_logs!() diff --git a/src/cpu_crunching.jl b/src/cpu_crunching.jl index 78bf83f..fd5c663 100644 --- a/src/cpu_crunching.jl +++ b/src/cpu_crunching.jl @@ -29,7 +29,7 @@ function benchmark_prime(n::Int) return Δt end -function calculate_coefficients() +function calculate_coefficients()::Vector{Float64} n_max = [1000,200_000] t_average = benchmark_prime.(n_max) diff --git a/src/scheduling.jl b/src/scheduling.jl index 72bfdd2..5cce408 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -22,9 +22,11 @@ end alg_default_runtime_s::Float64 = 0 -function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64}) +function (alg::MockupAlgorithm)(args...; coefficients::Union{Vector{Float64},Missing}) println("Executing $(alg.name)") - crunch_for_seconds(alg.runtime, coefficients) + if coefficients isa Vector{Float64} + crunch_for_seconds(alg.runtime, coefficients) + end return alg.name end @@ -45,13 +47,18 @@ function is_terminating_alg(graph::AbstractGraph, vertex_id::Int) all(is_terminating, successor_dataobjects) end -function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Dagger.Shard) +function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Union{Dagger.Shard,Nothing}) incoming_data = get_promises(graph, inneighbors(graph, vertex_id)) algorithm = MockupAlgorithm(graph, vertex_id) - Dagger.@spawn algorithm(incoming_data...; coefficients) + if isnothing(coefficients) + alg_helper(data...) = algorithm(data...; coefficients=missing) + return Dagger.@spawn alg_helper(incoming_data...) + else + return Dagger.@spawn algorithm(incoming_data...; coefficients=coefficients) + end end -function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard) +function schedule_graph(graph::MetaDiGraph, coefficients::Union{Dagger.Shard,Nothing}) alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm") sorted_vertices = MetaGraphs.topological_sort(graph) @@ -70,13 +77,20 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard) return terminating_results end -function run_events(graph::MetaDiGraph, - event_count::Int, - max_concurrent::Int) + + +function calibrate_crunch(; fast::Bool=false)::Union{Dagger.Shard,Nothing} + return fast ? nothing : Dagger.@shard calculate_coefficients() +end + +function run_events(graph::MetaDiGraph; + event_count::Int, + max_concurrent::Int, + fast::Bool=false) graphs_tasks = Dict{Int,Dagger.DTask}() notifications = RemoteChannel(()->Channel{Int}(32)) - coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients() + coefficients = FrameworkDemo.calibrate_crunch(;fast=fast) for idx in 1:event_count while length(graphs_tasks) >= max_concurrent diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..c2e585d --- /dev/null +++ b/test/README.md @@ -0,0 +1,32 @@ +# FrameworkDemo tests + +## Usage + +Run the tests with: + +- Pkg REPL (arguments not supported): + ```julia + ] test + ``` +- Pkg: + ```julia + import Pkg + Pkg.test("FrameworkDemo") + # or with arguments + Pkg.test("FrameworkDemo"; test_args=) + ``` +- Manually: + ``` + julia --project test/runtests.jl + ``` +- REPL: + ```julia + append!(ARGS, ) + include("test/runtests.jl") + ``` + +## Arguments + +The tests support arguments: + +- `no-fast` - don't skip algorithm CPU crunching diff --git a/test/runtests.jl b/test/runtests.jl index ef2a4c7..8f2792a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,6 +7,7 @@ using Test n_workers = Distributed.nworkers(), n_procs = Distributed.nprocs(), n_threads = Threads.nthreads(), + test_args = repr(ARGS) ) @testset verbose = true "FrameworkDemo.jl" begin diff --git a/test/scheduling.jl b/test/scheduling.jl index ecd65e4..223b134 100644 --- a/test/scheduling.jl +++ b/test/scheduling.jl @@ -41,7 +41,8 @@ end ilength(x) = sum(_ -> 1, x) # no standard length for MetaGraphs.filter_vertices iterator algorithms_count = ilength(MetaGraphs.filter_vertices(graph, :type, "Algorithm")) set_indexing_prop!(graph, :node_id) - coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients() + is_fast = "no-fast" ∉ ARGS + coefficients = FrameworkDemo.calibrate_crunch(;fast=is_fast) Dagger.enable_logging!(tasknames=true, taskdeps=true) _ = Dagger.fetch_logs!() # flush logs