Skip to content

Commit

Permalink
Collect terminating results in schedule_graph
Browse files Browse the repository at this point in the history
  • Loading branch information
joott committed Jul 10, 2024
1 parent c6baaec commit cbc9678
Showing 1 changed file with 26 additions and 33 deletions.
59 changes: 26 additions & 33 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,53 +80,46 @@ function get_oute_map(G)
return outgoing_edges_destinations_map
end

function get_vertices_promises(vertices::Vector, G::MetaDiGraph)
promises = []
for vertex in vertices
push!(promises, get_prop(G, vertex, :res_data))
end
return promises
function get_promises(graph::MetaDiGraph, vertices::Vector)
return [get_prop(graph, v, :res_data) for v in vertices]
end

function get_in_promises(graph::MetaDiGraph, vertex_id::Int)
return [get_prop(graph, src, :res_data) for src in inneighbors(graph, vertex_id)]
function is_terminating_alg(graph::AbstractGraph, vertex_id::Int)
successor_dataobjects = outneighbors(graph, vertex_id)
is_terminating(vertex) = isempty(outneighbors(graph, vertex))
all(is_terminating, successor_dataobjects)
end

function schedule_algorithm!(graph::MetaDiGraph, vertex_id::Int)
incoming_data = get_in_promises(graph, vertex_id)
incoming_data = get_promises(graph, inneighbors(graph, vertex_id))
algorithm = MockupAlgorithm(graph, vertex_id)
Dagger.@spawn algorithm(incoming_data...)
end

function schedule_graph(G::MetaDiGraph)
alg_vertices = MetaGraphs.filter_vertices(G, :type, "Algorithm")
sorted_vertices = MetaGraphs.topological_sort(G)
function schedule_graph(graph::MetaDiGraph)
alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm")
sorted_vertices = MetaGraphs.topological_sort(graph)

terminating_results = []

for vertex_id in intersect(sorted_vertices, alg_vertices)
res = schedule_algorithm!(G, vertex_id)
for v in outneighbors(G, vertex_id)
set_prop!(G, v, :res_data, res)
res = schedule_algorithm!(graph, vertex_id)
set_prop!(graph, vertex_id, :res_data, res)
for v in outneighbors(graph, vertex_id)
set_prop!(graph, v, :res_data, res)
end
end
end

function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int)
schedule_graph(G::MetaDiGraph)

final_vertices = []

out_e_src_map = get_oute_map(G)
for vertex_id in MetaGraphs.vertices(G)
if !haskey(out_e_src_map, vertex_id)
out_e_src_map[vertex_id] = []
end
is_terminating_alg(graph, vertex_id) && push!(terminating_results, res)
end

for vertex_id in keys(out_e_src_map)
if isempty(out_e_src_map[vertex_id])
push!(final_vertices, vertex_id)
end
end
return terminating_results
end

function schedule_graph_with_notify(graph::MetaDiGraph,
notifications::RemoteChannel,
graph_name::String,
graph_id::Int)
terminating_results = schedule_graph(graph)

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...)
Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...)
end

0 comments on commit cbc9678

Please sign in to comment.