diff --git a/src/cluster_manager.jl b/src/cluster_manager.jl index a0c4960..f9d3a0a 100644 --- a/src/cluster_manager.jl +++ b/src/cluster_manager.jl @@ -1,33 +1,38 @@ +""" +The Yarn ClusterManager for Julia. + +ugi: UserGroupInformation for the user to represent (default is the currently logged-in user) +yarnhost: yarn host name +rmport: yarn resource manager port +schedport: yarn scheduler port +launch_timeout: number of seconds to wait until all workers are launched (default is 60, provide larger value for more workers) +keep_connected: if false, YarnManager will disconnect from the cluster once all workers are removed (default true, use false if you do not plan to add more workers after removing all existing workers) +""" struct YarnManager <: ClusterManager ugi::UserGroupInformation clnt::YarnClient am::YarnAppMaster app::YarnApp - stdout_ios::Vector{IO} launch_timeout::Integer keep_connected::Bool function YarnManager(; kwargs...) - params = Dict() - for (n,v) in kwargs - params[n] = v - end + params = Dict(kwargs) paramkeys = keys(params) @debug("YarnManager constructor", params) - user = (:user in paramkeys) ? params[:user] : "" + ugi = (:ugi in paramkeys) ? params[:ugi] : UserGroupInformation() rmport = (:rmport in paramkeys) ? params[:rmport] : 8032 yarnhost = (:yarnhost in paramkeys) ? params[:yarnhost] : "localhost" schedport = (:schedport in paramkeys) ? params[:schedport] : 8030 launch_timeout = (:launch_timeout in paramkeys) ? params[:launch_timeout] : 60 keep_connected = (:keep_connected in paramkeys) ? 
params[:keep_connected] : true - ugi = UserGroupInformation(user) clnt = YarnClient(yarnhost, rmport, ugi) am = YarnAppMaster(yarnhost, schedport, ugi) app = submit(clnt, am) - new(ugi, clnt, am, app, IO[], launch_timeout, keep_connected) + new(ugi, clnt, am, app, launch_timeout, keep_connected) end end @@ -40,40 +45,22 @@ function show(io::IO, yarncm::YarnManager) show(io, yarncm.clnt) end -function setup_worker(host, port, cookie) - @debug("YarnManager setup_worker", host, port, cookie, container=ENV[CONTAINER_ID]) +function setup_worker(host, port) + @debug("YarnManager setup_worker", host, port) + c = connect(IPv4(host), port) if :wait_connected in names(Base; all=true) # < Julia 1.3 Base.wait_connected(c) else Sockets.wait_connected(c) # >= Julia 1.3 end + # identify container id so that rmprocs can clean things up nicely if required + serialize(c, ENV["JULIA_YARN_CID"]) + redirect_stdout(c) redirect_stderr(c) - # identify container id so that rmprocs can clean things up nicely if required - serialize(c, ENV[CONTAINER_ID]) - if cookie == nothing - start_worker(c) - else - if isa(cookie, Symbol) - cookie = string(cookie)[8:end] # strip the leading "cookie_" - end - start_worker(c, cookie) - end -end -# used to reconstruct container id object from the environment string -function _container(cid::String) - parts = split(cid, '_') - (parts[1] == "container") || throw(YarnException("Invalid container id $cid")) - cluster_timestamp = parse(Int64, parts[2]) - app_id = parse(Int32, parts[3]) - attempt_id = parse(Int32, parts[4]) - container_id = parse(Int64, parts[5]) - - appid_proto = protobuild(ApplicationIdProto, Dict(:id => app_id, :cluster_timestamp => cluster_timestamp)) - app_attempt_id_proto = protobuild(ApplicationAttemptIdProto, Dict(:application_id => appid_proto, :attemptId => attempt_id)) - protobuild(ContainerIdProto, Dict(:app_id => appid_proto, :app_attempt_id => app_attempt_id_proto, :id => container_id)) + start_worker(c, ENV["JULIA_YARN_KUKI"]) end 
_envdict(envdict::Dict) = envdict @@ -94,6 +81,26 @@ function _currprocname() "julia" end +function container_start(manager::YarnManager, cmd::String, env::Dict{String,String}, ipaddr::IPv4, port::UInt16, cid::ContainerIdProto) + try + iob = IOBuffer() + serialize(iob, cid) + env["JULIA_YARN_CID"] = base64encode(take!(iob)) + env["JULIA_YARN_KUKI"] = cluster_cookie() + + initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port))" + + clc = launchcontext(cmd="$cmd -e '$initargs'", env=env) + # TODO: do we need to add tokens into clc? + + @debug("YarnManager container_start", initargs, clc) + container_start(manager.am, cid, clc) + catch ex + @error("error starting yarn container", exception=ex) + rethrow(ex) + end +end + function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition) @debug("YarnManager launch", params) @@ -106,53 +113,46 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con loc = (:loc in paramkeys) ? params[:loc] : YARN_CONTAINER_LOCATION_DEFAULT priority = (:priority in paramkeys) ? 
params[:priority] : YARN_CONTAINER_PRIORITY_DEFAULT - stdout_ios = manager.stdout_ios + acceptors = Task[] ipaddr = getipaddr() (port, server) = listenany(ipaddr, 11000) - rcv_stdouts = @async begin - while length(stdout_ios) < np + accept_task = @async begin + while length(acceptors) < np sock = accept(server) - push!(stdout_ios, sock) + push!(acceptors, @async begin + config = WorkerConfig() + config.io = sock + # keep the container id in userdata to be used with rmprocs if required + cidstr = deserialize(sock) + @debug("got container id string", cidstr) + cid = deserialize(IOBuffer(base64decode(cidstr))) + @debug("got container id", cid) + config.userdata = Dict(:container_id => cid) + push!(instances_arr, config) + end) end end - cookie = ":cookie_" * cluster_cookie() - initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port), $(cookie))" - clc = launchcontext(cmd="$cmd -e '$initargs'", env=appenv) - # TODO: do we need to add tokens into clc? - - @debug("YarnManager launch", initargs, context=clc) - on_alloc = (cid) -> container_start(manager.am, cid, clc) + on_alloc = (cid) -> container_start(manager, cmd, appenv, ipaddr, port, cid) callback(manager.am, on_alloc, nothing) container_allocate(manager.am, convert(Int, np); mem=mem, cpu=cpu, loc=loc, priority=priority) - tend = time() + manager.launch_timeout - while (time() < tend) && !istaskdone(rcv_stdouts) - sleep(1) - end - - (length(stdout_ios) == np) || throw(YarnException("Error starting all workers. 
$(length(stdout_ios)) of $(np) started.")) - - configs = WorkerConfig[] - @sync begin - for io in manager.stdout_ios - @async let io=io - config = WorkerConfig() - config.io = io - - # keep the container id in userdata to be used with rmprocs if required - cidstr = deserialize(io) - userdata = Dict(:container_id => _container(cidstr)) - config.userdata = userdata - - push!(configs, config) - end + timedwait(()->istaskdone(accept_task), Float64(manager.launch_timeout)) + nconnected = 0 + nsuccess = 0 + while !isempty(acceptors) + acceptor = pop!(acceptors) + nconnected += 1 + try + wait(acceptor) + nsuccess += 1 + catch ex + @error("Could not decipher worker connection", exception=ex) end - empty!(manager.stdout_ios) end - append!(instances_arr, configs) + (nconnected == np) || @warn("Not all workers connected back", nconnected, nsuccess, nrequested=np) notify(c) nothing end