Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datadeps handling for algorithm tasks #19

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

Conversation

joott
Copy link
Collaborator

@joott joott commented Jul 2, 2024

BEGINRELEASENOTES

  • Algorithm functions now created to be aware of the graph state at schedule-time
  • Data objects no longer scheduled
  • Outputs are now passed as mutable DataObject structs via Dagger.spawn_datadeps()
  • Test graphml files now have size and runtime properties
  • DataObject size allocated according to graphml specification
  • Length of sleep in algorithm now dependent on graph properties

ENDRELEASENOTES

joott added 3 commits July 1, 2024 16:13
Data objects are no longer scheduled; algorithms take mutable inputs
rather than returning outputs in a Thunk
* added size_kb field to store size as a double
* adjusted scheduling.jl treatment of size to align with test graphs
@m-fila m-fila self-requested a review July 2, 2024 17:43
Changed size_kb to size_average_B
Copy link
Collaborator

@m-fila m-fila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome job @ottjk Thank you for a quick fix so the tests run fine
I put some minor suggestions and questions

The only thing that worries me is that it doesn't work anymore with multiple processes. I'll check if this can be fixed somehow now. If not we will have to focus on that later

The execution graph done by Dagger doesn't look like what we'd expect, but it seems it's on Dagger as they plot both In and Out data dependencies as inputs

src/scheduling.jl Outdated Show resolved Hide resolved
src/scheduling.jl Outdated Show resolved Hide resolved
Comment on lines +11 to +18
function populate_data_object!(object::DataObject, data)
proc = Dagger.thunk_processor()
scope = Dagger.scope(worker=myid())

chunk = Dagger.tochunk(data, proc, scope)

object.data = chunk
end
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about getting the size of data to create from the data object instead of having it as another argument?

Suggested change
function populate_data_object!(object::DataObject, data)
proc = Dagger.thunk_processor()
scope = Dagger.scope(worker=myid())
chunk = Dagger.tochunk(data, proc, scope)
object.data = chunk
end
function populate_data_object!(object::DataObject)
data = ' '^round(Int, object.size)
proc = Dagger.thunk_processor()
scope = Dagger.scope(worker=myid())
chunk = Dagger.tochunk(data, proc, scope)
object.data = chunk
end

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part with making a chunk seems to be problematic with multiple processes when addprocs is used (running with julia -p strangely seems to be fine)

Is it necessary to have data as a chunk if what the algorithm receives is a whole DataObject? If I understood correctly the motivation was to place the chunk on the local worker, but as is we have to transfer a whole DataObject as it may live somewhere else.

I tried removing explicit chunk creation and relay on Dagger to handle it, and ended up with something like:

mutable struct DataObject
    data
    size::Float64
end

function populate_data_object!(object::DataObject)
    object.data = " "^round(Int, object.size)
end

which seems to work without problems with addprocs and -p. I think we could aim now for minimal version and optimize transfers later

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, the Chunk acts as a reference to some data, so moving around a DataObject with a Chunk in it is just moving around a struct that contains a reference rather than moving around the data itself. The actual data should only get transferred when you collect the Chunk in an algorithm.

I agree we can do without it for now.

println("Gaudi algorithm for vertex $vertex_id !")

for output in outputs
bytes = round(Int, output.size * 1e3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 1e3 is not right since what we put in the size is measured in bytes.
In another comment I propose to move that part inside populate_data_object!

joott and others added 2 commits July 4, 2024 13:29
Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com>
Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com>
Comment on lines +143 to +145
transform = get_transform(G, vertex_id)
N_inputs = length(incoming_data)
res = Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the whole chain to execute algorithm is a bit too complicated.
In principle all the parameters from DAG could be extracted and "algorithm" created before anything is submitted to Dagger. I think at this point it may by clearer to replace some closures with callable structs
I'd replace get_transform, AVAILABLE_TRANSFORMS and _algorithm with something like this

struct MockupAlgorithm
    name::String
    runtime::Float64
    input_length::UInt
    MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) = begin
        runtime = get_prop(graph, vertex_id, :runtime_average_s)
        name = get_prop(graph, vertex_id, :node_id)
        inputs = length(inneighbors(graph, vertex_id))
        new(name, runtime, inputs)
    end
end

function (alg::MockupAlgorithm)(args...)

    function execute!(alg::MockupAlgorithm, inputs, outputs)
        println("Executing $(alg.name)")
        populate_data_object!.(outputs)
        sleep(alg.runtime)
    end

    inputs = args[1:alg.input_length]
    outputs = args[alg.input_length+1:end]
    execute!(alg, inputs, outputs)
end

function schedule_algorithm!(graph::MetaDiGraph, vertex_id::Int)
    incoming_data = get_in_promises(graph, vertex_id)
    outgoing_data = get_out_promises(graph, vertex_id)
    algorithm = MockupAlgorithm(graph, vertex_id)
    Dagger.@spawn algorithm(In.(incoming_data)..., Out.(outgoing_data)...)
end

and then

function schedule_graph(G::MetaDiGraph)
    data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject")
    sorted_vertices = MetaGraphs.topological_sort(G)

    for data_id in data_vertices
        size = get_prop(G, data_id, :size_average_B)
        set_prop!(G, data_id, :res_data, DataObject(nothing, size))
    end

    Dagger.spawn_datadeps() do
        for vertex_id in setdiff(sorted_vertices, data_vertices)
            res = schedule_algorithm!(G, vertex_id)
            set_prop!(G, vertex_id, :res_data, res)
        end
    end
end

Having the algorithms as immutable callable structs might be useful later when we revise the pipeline scheduling

@m-fila
Copy link
Collaborator

m-fila commented Jul 4, 2024

Unfortunately it seems we have another very big issue here. The Dagger.spawn_datadeps() is waiting for all the tasks spawned with it. This is not a problem for scheduling a single graph as we would wait for its completion anyway

But when having multiple independent graphs we'd like to spawn tasks for all of them (then probably spawn some more graphs) and then wait for all of them to finish.

With current mechanism of spawning multiple graphs we call spawn_graph in a loop. And each spawn_graph does spawn_datadeps and then blocks until all the tasks are done. So we end up with a single graph running at the time

I'm not sure how to approach this. Whether it'd be possible to adjust the way we schedule multiple tasks or find a non-blocking way to schedule a single graph with data deps.

PS. I couldn't find anything about this blocking behavior of spawn_datadeps in docs, but there is a docstring in the source code and (wait_all in the function):

https://github.com/JuliaParallel/Dagger.jl/blob/235b9683e6b02f4de344f43a46738067773c7f62/src/datadeps.jl#L918-L920

At the end of executing f, spawn_datadeps will wait for all launched tasks
to complete, rethrowing the first error, if any. The result of f will be
returned from spawn_datadeps.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants