Skip to content

Commit

Permalink
Configure algorithm runtime, don't schedule data object nodes as algo…
Browse files Browse the repository at this point in the history
…rtihms (#21)

* New treatment of MockupAlgorithms

based on Mateusz's ideas without the datadeps updates

* Added size and runtime properties to test graphs

* added size_kb field to store size as a double
* adjusted scheduling.jl treatment of size to align with test graphs

* Made graph property consistent with tests

Changed size_kb to size_average_B

* only check for results in data nodes

* Cleaned up struct function

* Collect terminating results in schedule_graph

* removed obsolete unused functions

* removed ! as schedule algorithm is not mutating

---------

Co-authored-by: Mateusz Jakub Fila <mateusz.jakub.fila@cern.ch>
  • Loading branch information
joott and m-fila authored Jul 10, 2024
1 parent 06c132f commit 41a471b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 143 deletions.
5 changes: 3 additions & 2 deletions data/datadeps_demo/df.graphml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version='1.0' encoding='utf-8'?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd"><key id="d4" for="node" attr.name="size_average_B" attr.type="double"/>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
<key id="d4" for="node" attr.name="size_average_B" attr.type="double"/>
<key id="d3" for="node" attr.name="class" attr.type="string"/>
<key id="d2" for="node" attr.name="node_id" attr.type="string"/>
<key id="d1" for="node" attr.name="runtime_average_s" attr.type="double"/>
Expand Down Expand Up @@ -96,4 +97,4 @@
<edge source="9" target="4"/>
<edge source="11" target="5"/>
<edge source="12" target="6"/>
</graph></graphml>
</graph></graphml>
120 changes: 69 additions & 51 deletions data/sequencer_demo/another_test_graph.graphml
Original file line number Diff line number Diff line change
@@ -1,88 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
<key id="key0" for="node" attr.name="class" attr.type="string" />
<key id="key1" for="node" attr.name="node_id" attr.type="string" />
<key id="key2" for="node" attr.name="type" attr.type="string" />
<key id="d0" for="node" attr.name="class" attr.type="string" />
<key id="d1" for="node" attr.name="node_id" attr.type="string" />
<key id="d2" for="node" attr.name="type" attr.type="string" />
<key id="d3" for="node" attr.name="runtime_average_s" attr.type="double" />
<key id="d4" for="node" attr.name="size_average_B" attr.type="double" />
<graph id="G" edgedefault="directed" parse.nodeids="free" parse.edgeids="canonical" parse.order="nodesfirst">
<node id="n0">
<data key="key0">MicroProducer</data>
<data key="key1">ProducerA</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroProducer</data>
<data key="d1">ProducerA</data>
<data key="d2">Algorithm</data>
<data key="d3">9.3027e-05</data>
</node>
<node id="n1">
<data key="key0"></data>
<data key="key1">A</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">A</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n2">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerB</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerB</data>
<data key="d2">Algorithm</data>
<data key="d3">4.2463e-05</data>
</node>
<node id="n3">
<data key="key0"></data>
<data key="key1">C</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">C</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n4">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerD</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerD</data>
<data key="d2">Algorithm</data>
<data key="d3">8.241000000000001e-06</data>
</node>
<node id="n5">
<data key="key0"></data>
<data key="key1">E</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">E</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n6">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerF</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerF</data>
<data key="d2">Algorithm</data>
<data key="d3">7.264e-06</data>
</node>
<node id="n7">
<data key="key0"></data>
<data key="key1">G</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">G</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n8">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerH</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerH</data>
<data key="d2">Algorithm</data>
<data key="d3">6.705e-06</data>
</node>
<node id="n9">
<data key="key0"></data>
<data key="key1">I</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">I</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n10">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerJ</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerJ</data>
<data key="d2">Algorithm</data>
<data key="d3">8.59e-06</data>
</node>
<node id="n11">
<data key="key0"></data>
<data key="key1">K</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">K</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n12">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerL</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerL</data>
<data key="d2">Algorithm</data>
<data key="d3">8.94e-06</data>
</node>
<node id="n13">
<data key="key0"></data>
<data key="key1">M</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">M</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n14">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerN</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerN</data>
<data key="d2">Algorithm</data>
<data key="d3">8.94e-06</data>
</node>
<node id="n15">
<data key="key0"></data>
<data key="key1">O</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">O</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>

<edge id="e0" source="n0" target="n1">
Expand Down
3 changes: 2 additions & 1 deletion data/sequencer_demo/df_sequencer_demo.graphml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version='1.0' encoding='utf-8'?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd"><key id="d4" for="node" attr.name="size_average_B" attr.type="double"/>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
<key id="d4" for="node" attr.name="size_average_B" attr.type="double"/>
<key id="d3" for="node" attr.name="class" attr.type="string"/>
<key id="d2" for="node" attr.name="node_id" attr.type="string"/>
<key id="d1" for="node" attr.name="runtime_average_s" attr.type="double"/>
Expand Down
7 changes: 2 additions & 5 deletions examples/schedule.jl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ function execution(graphs_map)
results = []
for (g_name, g) in graphs
g_map = Dict{Int, Any}()
for vertex_id in Graphs.vertices(g)
data_vertices = MetaGraphs.filter_vertices(g, :type, "DataObject")
for vertex_id in data_vertices
future = get_prop(g, vertex_id, :res_data)
g_map[vertex_id] = fetch(future)
end
Expand All @@ -64,10 +65,6 @@ end

function main(graphs_map)
FrameworkDemo.configure_LocalEventLog()
#
# OR
#
# configure_webdash_multievent()

@time execution(graphs_map)

Expand Down
128 changes: 44 additions & 84 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ using Distributed
using MetaGraphs

# Algorithms
function mock_Gaudi_algorithm(graph_name, graph_id, vertex_id, data...)
println("Graph: $graph_name, Gaudi algorithm for vertex $vertex_id !")
sleep(1)
return vertex_id
struct MockupAlgorithm
name::String
runtime::Float64
input_length::UInt
MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) = begin
runtime = get_prop(graph, vertex_id, :runtime_average_s)
name = get_prop(graph, vertex_id, :node_id)
inputs = length(inneighbors(graph, vertex_id))
new(name, runtime, inputs)
end
end

function dataobject_algorithm(graph_name, graph_id, vertex_id, data...)
println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !")
sleep(0.1)
return vertex_id
function (alg::MockupAlgorithm)(args...)
println("Executing $(alg.name)")

return alg.name
end

function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...)
Expand All @@ -28,7 +34,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = parse_graphml([graph_path])

open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
Expand All @@ -38,92 +44,46 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_
return graphs
end

# Function to get the map of incoming edges to a vertex (i.e. the sources of the incoming edges)
function get_ine_map(G)
incoming_edges_sources_map = Dict{eltype(G), Vector{eltype(G)}}()

for edge in Graphs.edges(G)
src_vertex = src(edge)
dest_vertex = dst(edge)

if haskey(incoming_edges_sources_map, dest_vertex)
push!(incoming_edges_sources_map[dest_vertex], src_vertex)
else
incoming_edges_sources_map[dest_vertex] = [src_vertex]
end
end

return incoming_edges_sources_map
end

# Function to get the map of outgoing edges from a vertex (i.e. the destinations of the outgoing edges)
function get_oute_map(G)
outgoing_edges_destinations_map = Dict{eltype(G), Vector{eltype(G)}}()

for edge in Graphs.edges(G)
src_vertex = src(edge)
dest_vertex = dst(edge)

if haskey(outgoing_edges_destinations_map, src_vertex)
push!(outgoing_edges_destinations_map[src_vertex], dest_vertex)
else
outgoing_edges_destinations_map[src_vertex] = [dest_vertex]
end
end

return outgoing_edges_destinations_map
end

function get_vertices_promises(vertices::Vector, G::MetaDiGraph)
promises = []
for vertex in vertices
push!(promises, get_prop(G, vertex, :res_data))
end
return promises
function get_promises(graph::MetaDiGraph, vertices::Vector)
return [get_prop(graph, v, :res_data) for v in vertices]
end

function get_deps_promises(vertex_id, map, G)
incoming_data = []
if haskey(map, vertex_id)
for src in map[vertex_id]
push!(incoming_data, get_prop(G, src, :res_data))
end
end
return incoming_data
function is_terminating_alg(graph::AbstractGraph, vertex_id::Int)
successor_dataobjects = outneighbors(graph, vertex_id)
is_terminating(vertex) = isempty(outneighbors(graph, vertex))
all(is_terminating, successor_dataobjects)
end

function schedule_graph(G::MetaDiGraph)
inc_e_src_map = get_ine_map(G)

for vertex_id in MetaGraphs.topological_sort(G)
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](name, graph_id, vertex_id, incoming_data...))
end
function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int)
incoming_data = get_promises(graph, inneighbors(graph, vertex_id))
algorithm = MockupAlgorithm(graph, vertex_id)
Dagger.@spawn algorithm(incoming_data...)
end

function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int)
final_vertices = []
inc_e_src_map = get_ine_map(G)
function schedule_graph(graph::MetaDiGraph)
alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm")
sorted_vertices = MetaGraphs.topological_sort(graph)

for vertex_id in MetaGraphs.topological_sort(G)
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...))
end
terminating_results = []

out_e_src_map = get_oute_map(G)
for vertex_id in MetaGraphs.vertices(G)
if !haskey(out_e_src_map, vertex_id)
out_e_src_map[vertex_id] = []
for vertex_id in intersect(sorted_vertices, alg_vertices)
res = schedule_algorithm(graph, vertex_id)
set_prop!(graph, vertex_id, :res_data, res)
for v in outneighbors(graph, vertex_id)
set_prop!(graph, v, :res_data, res)
end
end

for vertex_id in keys(out_e_src_map)
if out_e_src_map[vertex_id] == [] # TODO: a native method to check for emptiness should exist
push!(final_vertices, vertex_id)
end
is_terminating_alg(graph, vertex_id) && push!(terminating_results, res)
end

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...)
return terminating_results
end

AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm)
function schedule_graph_with_notify(graph::MetaDiGraph,
notifications::RemoteChannel,
graph_name::String,
graph_id::Int)
terminating_results = schedule_graph(graph)

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...)
end

0 comments on commit 41a471b

Please sign in to comment.