|
| 1 | +using FrameworkDemo |
| 2 | +using Dagger |
| 3 | +using Graphs |
| 4 | +using MetaGraphs |
| 5 | + |
| 6 | +function get_alg_timeline(logs::Dict) |
| 7 | + timeline = Dict{Int,Any}() |
| 8 | + Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx |
| 9 | + category = logs[w][:core][start_idx].category |
| 10 | + if category == :compute |
| 11 | + tid = logs[w][:id][start_idx].thunk_id |
| 12 | + t_start = logs[w][:core][start_idx].timestamp |
| 13 | + t_stop = logs[w][:core][finish_idx].timestamp |
| 14 | + timeline[tid] = (start=t_start, stop=t_stop) |
| 15 | + end |
| 16 | + end |
| 17 | + return timeline |
| 18 | +end |
| 19 | + |
| 20 | +function get_alg_deps(logs::Dict) |
| 21 | + task_deps = Dict{Int,Set{Int}}() |
| 22 | + for w in keys(logs) |
| 23 | + for idx in 1:length(logs[w][:core]) |
| 24 | + category = logs[w][:core][idx].category |
| 25 | + kind = logs[w][:core][idx].kind |
| 26 | + if category == :add_thunk && kind == :start |
| 27 | + (tid, deps) = logs[w][:taskdeps][idx] |
| 28 | + if isa(deps, Vector{Int}) && !isempty(deps) |
| 29 | + task_deps[tid] = Set{Int}(deps) |
| 30 | + end |
| 31 | + end |
| 32 | + end |
| 33 | + end |
| 34 | + return task_deps |
| 35 | +end |
| 36 | + |
| 37 | + |
| 38 | +@testset verbose = true "Scheduling" begin |
| 39 | + graph = FrameworkDemo.parse_graphml(["../data/demo/datadeps/df.graphml"]) |
| 40 | + algorithms_count = 7 |
| 41 | + set_indexing_prop!(graph, :node_id) |
| 42 | + |
| 43 | + Dagger.enable_logging!(timeline=true, |
| 44 | + tasknames=true, |
| 45 | + taskdeps=true, |
| 46 | + taskargs=true, |
| 47 | + taskargmoves=true, |
| 48 | + ) |
| 49 | + _ = Dagger.fetch_logs!() # flush logs |
| 50 | + |
| 51 | + FrameworkDemo.schedule_graph(graph) |
| 52 | + for v in vertices(graph) |
| 53 | + wait(get_prop(graph, v, :res_data)) |
| 54 | + end |
| 55 | + logs = Dagger.fetch_logs!() |
| 56 | + @test !isnothing(logs) |
| 57 | + |
| 58 | + task_to_tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map |
| 59 | + return deepcopy(id_map) |
| 60 | + end |
| 61 | + |
| 62 | + function get_tid(node_id::String)::Int |
| 63 | + task = get_prop(graph, graph[node_id, :node_id], :res_data) |
| 64 | + return task_to_tid[task.uid] |
| 65 | + end |
| 66 | + |
| 67 | + @testset "Timeline" begin |
| 68 | + timeline = get_alg_timeline(logs) |
| 69 | + @test length(timeline) == algorithms_count |
| 70 | + |
| 71 | + get_time = (node_id) -> timeline[get_tid(node_id)] |
| 72 | + |
| 73 | + @test get_time("ProducerA").stop < get_time("TransformerAB").start |
| 74 | + @test get_time("ProducerBC").stop < get_time("TransformerAB").start |
| 75 | + @test get_time("ProducerBC").stop < get_time("ConsumerBC").start |
| 76 | + @test get_time("ProducerBC").stop < get_time("TransformerC").start |
| 77 | + @test get_time("ProducerBC").stop < get_time("ConsumerCD").start |
| 78 | + @test get_time("TransformerAB").stop < get_time("ConsumerE").start |
| 79 | + @test get_time("TransformerAB").stop < get_time("ConsumerCD").start |
| 80 | + end |
| 81 | + |
| 82 | + @testset "Dependencies" begin |
| 83 | + deps = get_alg_deps(logs) |
| 84 | + get_deps = node_id -> deps[get_tid(node_id)] |
| 85 | + |
| 86 | + @test get_tid("ProducerA") ∈ get_deps("TransformerAB") |
| 87 | + @test get_tid("ProducerBC") ∈ get_deps("TransformerAB") |
| 88 | + @test get_tid("ProducerBC") ∈ get_deps("ConsumerBC") |
| 89 | + @test get_tid("ProducerBC") ∈ get_deps("TransformerC") |
| 90 | + @test get_tid("ProducerBC") ∈ get_deps("ConsumerCD") |
| 91 | + @test get_tid("TransformerAB") ∈ get_deps("ConsumerE") |
| 92 | + @test get_tid("TransformerAB") ∈ get_deps("ConsumerCD") |
| 93 | + end |
| 94 | +end |
0 commit comments