Skip to content

Commit

Permalink
Merge pull request #45 from JuliaParallel/tan/yarn
Browse files Browse the repository at this point in the history
Don't rely on the container id string format; pass cookies as plain strings.
  • Loading branch information
tanmaykm authored Dec 18, 2019
2 parents 7125367 + 916cd86 commit 45491a8
Showing 1 changed file with 67 additions and 67 deletions.
134 changes: 67 additions & 67 deletions src/cluster_manager.jl
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
"""
The Yarn ClusterManager for Julia.
ugi: UserGroupInformation representing the user to represent (default is current logged in user)
yarnhost: yarn host name
rmport: yarn resource manager port
schedport: yarn scheduler port
launch_timeout: number of seconds to wait until all workers are launched (default is 60, provide larger value for more workers)
keep_connected: if false, YarnManager will disconnect from the cluster once all workers are removed (default true, use false if you do not plan to add more workers after removing all existing workers)
"""
struct YarnManager <: ClusterManager
ugi::UserGroupInformation
clnt::YarnClient
am::YarnAppMaster
app::YarnApp
stdout_ios::Vector{IO}
launch_timeout::Integer
keep_connected::Bool

function YarnManager(; kwargs...)
params = Dict()
for (n,v) in kwargs
params[n] = v
end
params = Dict(kwargs)
paramkeys = keys(params)
@debug("YarnManager constructor", params)

user = (:user in paramkeys) ? params[:user] : ""
ugi = (:ugi in paramkeys) ? params[:ugi] : UserGroupInformation()
rmport = (:rmport in paramkeys) ? params[:rmport] : 8032
yarnhost = (:yarnhost in paramkeys) ? params[:yarnhost] : "localhost"
schedport = (:schedport in paramkeys) ? params[:schedport] : 8030
launch_timeout = (:launch_timeout in paramkeys) ? params[:launch_timeout] : 60
keep_connected = (:keep_connected in paramkeys) ? params[:keep_connected] : true

ugi = UserGroupInformation(user)
clnt = YarnClient(yarnhost, rmport, ugi)
am = YarnAppMaster(yarnhost, schedport, ugi)
app = submit(clnt, am)

new(ugi, clnt, am, app, IO[], launch_timeout, keep_connected)
new(ugi, clnt, am, app, launch_timeout, keep_connected)
end
end

Expand All @@ -40,40 +45,22 @@ function show(io::IO, yarncm::YarnManager)
show(io, yarncm.clnt)
end

# NOTE(review): diff rendering interleaves two versions of setup_worker here.
# Old signature took an explicit cookie argument; new version reads both the
# container id and the cookie from environment variables set by the launcher.
function setup_worker(host, port, cookie)
@debug("YarnManager setup_worker", host, port, cookie, container=ENV[CONTAINER_ID])
function setup_worker(host, port)
@debug("YarnManager setup_worker", host, port)

# connect back to the master process that launched this worker
c = connect(IPv4(host), port)
if :wait_connected in names(Base; all=true) # < Julia 1.3
Base.wait_connected(c)
else
Sockets.wait_connected(c) # >= Julia 1.3
end
# identify container id so that rmprocs can clean things up nicely if required
# (new version: sends the opaque JULIA_YARN_CID string set by container_start)
serialize(c, ENV["JULIA_YARN_CID"])

# route this worker's stdout/stderr back over the master connection
redirect_stdout(c)
redirect_stderr(c)
# identify container id so that rmprocs can clean things up nicely if required
# (old version: sent the raw YARN CONTAINER_ID environment value)
serialize(c, ENV[CONTAINER_ID])
# old version: cookie could be nothing, a Symbol prefixed "cookie_", or a string
if cookie == nothing
start_worker(c)
else
if isa(cookie, Symbol)
cookie = string(cookie)[8:end] # strip the leading "cookie_"
end
start_worker(c, cookie)
end
end

# used to reconstruct container id object from the environment string
# NOTE(review): this helper is REMOVED by the commit shown -- it parsed the
# "container_<ts>_<app>_<attempt>_<id>" string format, which the change replaces
# with a serialized/base64 round-trip of the ContainerIdProto itself.
function _container(cid::String)
parts = split(cid, '_')
(parts[1] == "container") || throw(YarnException("Invalid container id $cid"))
cluster_timestamp = parse(Int64, parts[2])
app_id = parse(Int32, parts[3])
attempt_id = parse(Int32, parts[4])
container_id = parse(Int64, parts[5])

appid_proto = protobuild(ApplicationIdProto, Dict(:id => app_id, :cluster_timestamp => cluster_timestamp))
app_attempt_id_proto = protobuild(ApplicationAttemptIdProto, Dict(:application_id => appid_proto, :attemptId => attempt_id))
protobuild(ContainerIdProto, Dict(:app_id => appid_proto, :app_attempt_id => app_attempt_id_proto, :id => container_id))
# NOTE(review): the next line is the tail of the NEW setup_worker (diff artifact):
# it starts the worker with the cluster cookie taken from the environment.
start_worker(c, ENV["JULIA_YARN_KUKI"])
end

# Normalize an environment specification: a `Dict` is already in the desired
# form, so it is returned unchanged.
function _envdict(envdict::Dict)
    return envdict
end
Expand All @@ -94,6 +81,26 @@ function _currprocname()
"julia"
end

"""
    container_start(manager, cmd, env, ipaddr, port, cid)

Start a Julia worker inside the allocated YARN container `cid`. The container
id (serialized and base64 encoded) and the cluster cookie are handed to the
worker through environment variables, and the worker is instructed to call
back to the master at `ipaddr:port` via `Elly.setup_worker`.
"""
function container_start(manager::YarnManager, cmd::String, env::Dict{String,String}, ipaddr::IPv4, port::UInt16, cid::ContainerIdProto)
    try
        # the cluster cookie travels to the worker via the environment
        env["JULIA_YARN_KUKI"] = cluster_cookie()

        # ship the container id as an opaque serialized blob rather than
        # depending on its string representation
        cidbuf = IOBuffer()
        serialize(cidbuf, cid)
        env["JULIA_YARN_CID"] = base64encode(take!(cidbuf))

        initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port))"
        launchcmd = "$cmd -e '$initargs'"
        clc = launchcontext(cmd=launchcmd, env=env)
        # TODO: do we need to add tokens into clc?

        @debug("YarnManager container_start", initargs, clc)
        container_start(manager.am, cid, clc)
    catch ex
        @error("error starting yarn container", exception=ex)
        rethrow(ex)
    end
end

# NOTE(review): diff rendering -- this function interleaves the OLD implementation
# (manager.stdout_ios + _container string parsing) with the NEW one (local
# `acceptors` tasks + serialized/base64 container ids) without +/- markers, and
# even contains a fused diff-chrome line; it is not valid Julia as shown.
function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
@debug("YarnManager launch", params)

Expand All @@ -106,53 +113,46 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
loc = (:loc in paramkeys) ? params[:loc] : YARN_CONTAINER_LOCATION_DEFAULT
priority = (:priority in paramkeys) ? params[:priority] : YARN_CONTAINER_PRIORITY_DEFAULT

# old version: collected worker sockets on the manager itself
stdout_ios = manager.stdout_ios
# new version: per-connection handler tasks, kept local to this call
acceptors = Task[]
ipaddr = getipaddr()
# listen on some port at/above 11000 for workers to call back
(port, server) = listenany(ipaddr, 11000)
rcv_stdouts = @async begin
while length(stdout_ios) < np
accept_task = @async begin
while length(acceptors) < np
sock = accept(server)
push!(stdout_ios, sock)
# new version: build the WorkerConfig as soon as the worker connects
push!(acceptors, @async begin
config = WorkerConfig()
config.io = sock
# keep the container id in userdata to be used with rmprocs if required
cidstr = deserialize(sock)
@debug("got container id string", cidstr)
# decode the serialized ContainerIdProto sent by setup_worker
cid = deserialize(IOBuffer(base64decode(cidstr)))
@debug("got container id", cid)
config.userdata = Dict(:container_id => cid)
push!(instances_arr, config)
end)
end
end

# old version: cookie was embedded into the worker command line...
cookie = ":cookie_" * cluster_cookie()
initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port), $(cookie))"
clc = launchcontext(cmd="$cmd -e '$initargs'", env=appenv)
# TODO: do we need to add tokens into clc?

@debug("YarnManager launch", initargs, context=clc)
on_alloc = (cid) -> container_start(manager.am, cid, clc)
# ...new version: defer command/env construction to container_start per container
on_alloc = (cid) -> container_start(manager, cmd, appenv, ipaddr, port, cid)
callback(manager.am, on_alloc, nothing)

# ask the app master for np containers with the requested resources
container_allocate(manager.am, convert(Int, np); mem=mem, cpu=cpu, loc=loc, priority=priority)

# old version: poll until all workers connected or launch_timeout elapsed
tend = time() + manager.launch_timeout
while (time() < tend) && !istaskdone(rcv_stdouts)
sleep(1)
end

(length(stdout_ios) == np) || throw(YarnException("Error starting all workers. $(length(stdout_ios)) of $(np) started."))

# old version: second pass over collected sockets to build WorkerConfigs
configs = WorkerConfig[]
@sync begin
for io in manager.stdout_ios
@async let io=io
config = WorkerConfig()
config.io = io

# keep the container id in userdata to be used with rmprocs if required
cidstr = deserialize(io)
userdata = Dict(:container_id => _container(cidstr))
config.userdata = userdata

push!(configs, config)
end
# new version: bounded wait, then drain handler tasks and tally outcomes
timedwait(()->istaskdone(accept_task), Float64(manager.launch_timeout))
nconnected = 0
nsuccess = 0
while !isempty(acceptors)
acceptor = pop!(acceptors)
nconnected += 1
try
wait(acceptor)
nsuccess += 1
catch ex
@error("Could not decipher worker connection", exception=ex)
end
empty!(manager.stdout_ios)
end

append!(instances_arr, configs)
# new version warns (instead of throwing) when some workers never connected
(nconnected == np) || @warn("Not all workers connected back", nconnected, nsuccess, nrequested=np)
notify(c)
nothing
end
Expand Down

2 comments on commit 45491a8

@tanmaykm
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error while trying to register: "Tag with name 0.3.0 already exists and points to a different commit"

Please sign in to comment.