From 1c6ddc4d778910d6dcb09b322f71e2cb32b10349 Mon Sep 17 00:00:00 2001
From: tan
Date: Tue, 7 Jan 2020 15:22:40 +0530
Subject: [PATCH 1/5] make mem and cpu spec optional for yarn submit
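
Memory and core requirements can now be omitted when submitting an
application; `YARN_CONTAINER_MEM_DEFAULT` and `YARN_CONTAINER_CPU_DEFAULT`
apply unless overridden through the new `mem` and `cores` keyword arguments.
A sketch of the calling convention (the resource values here are arbitrary
examples):

    # before: mem and cores were positional arguments
    app = submit(client, cmd, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT, env)

    # after: the defaults apply, and can be overridden via keywords
    app = submit(client, cmd, env)
    app = submit(client, cmd, env; mem=2048, cores=2)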
---
 src/api_yarn_appmaster.jl | 2 +-
 src/api_yarn_client.jl    | 6 +++---
 test/yarntests.jl         | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl
index 5a3ed8c..e1e30c5 100644
--- a/src/api_yarn_appmaster.jl
+++ b/src/api_yarn_appmaster.jl
@@ -111,7 +111,7 @@ callback(yam::YarnAppMaster, on_container_alloc::Union{Nothing,Function}, on_con
 function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
     @debug("submitting unmanaged application")
     clc = launchcontext()
-    app = submit(client, clc, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT; unmanaged=true)
+    app = submit(client, clc; unmanaged=true)
 
     # keep the am_rm token
     tok = am_rm_token(app)
diff --git a/src/api_yarn_client.jl b/src/api_yarn_client.jl
index 640a1a3..0767eb1 100644
--- a/src/api_yarn_client.jl
+++ b/src/api_yarn_client.jl
@@ -173,12 +173,12 @@ function _new_app(client::YarnClient)
     resp.application_id, resp.maximumCapability.memory, resp.maximumCapability.virtual_cores
 end
 
-function submit(client::YarnClient, cmd::Union{AbstractString,Vector}, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT, env::Dict{String,String}=Dict{String,String}(); kwargs...)
+function submit(client::YarnClient, cmd::Union{AbstractString,Vector}, env::Dict{String,String}=Dict{String,String}(); kwargs...)
     container_spec = launchcontext(cmd=cmd, env=env)
-    submit(client, container_spec, mem, cores; kwargs...)
+    submit(client, container_spec; kwargs...)
 end
 
-function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT;
+function submit(client::YarnClient, container_spec::ContainerLaunchContextProto; mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT,
         priority::Int32=one(Int32), appname::AbstractString="EllyApp", queue::AbstractString="default", apptype::AbstractString="YARN",
         reuse::Bool=false, unmanaged::Bool=false, schedaddr::String="")
     @debug("submitting application", unmanaged=unmanaged, cmd=container_spec.command)
diff --git a/test/yarntests.jl b/test/yarntests.jl
index 95438e7..40020be 100644
--- a/test/yarntests.jl
+++ b/test/yarntests.jl
@@ -93,7 +93,7 @@ function test_managed_yarn_clustermanager(host="localhost", rmport=8032, schedpo
     @info("starting managed julia with environment", env)
     testscript = joinpath(@__DIR__, "yarnmanagedcm.jl")
 
-    app = submit(clnt, [Elly._currprocname(), testscript], Elly.YARN_CONTAINER_MEM_DEFAULT, Elly.YARN_CONTAINER_CPU_DEFAULT, env; schedaddr="$(host):$(schedport)")
+    app = submit(clnt, [Elly._currprocname(), testscript], env; schedaddr="$(host):$(schedport)")
     Elly.wait_for_state(app, Elly.YarnApplicationStateProto.FINISHED)
     @info("app complete", status=status(app))
     @test isfile("/tmp/ellytest.log")

From 4c75f7d5a5910d81b7f9ffff9ad1fe16bcc7a70b Mon Sep 17 00:00:00 2001
From: tan
Date: Tue, 7 Jan 2020 16:02:44 +0530
Subject: [PATCH 2/5] update docs [skip ci]

---
 HDFS.md   | 42 ++++++++++++++++++++++++--
 YARN.md   | 89 ++++++++++++++++++++++++++++++++++++++++++++-----------
 YARNCM.md | 33 +++++++++++++++++----
 3 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/HDFS.md b/HDFS.md
index 47b16dd..153e875 100644
--- a/HDFS.md
+++ b/HDFS.md
@@ -25,6 +25,9 @@ The DFS can be navigated using the same Julia APIs as used for a traditional fil
 julia> pwd(dfs)
 "/"
 
+julia> du(dfs)
+0x0000000000000017
+
 julia> readdir(dfs)
 5-element Array{AbstractString,1}:
  "testdir"
@@ -65,6 +68,28 @@ HDFSFileInfo: bar
 julia> isfile(bar_file)
 true
+
+julia> isdir(bar_file)
+false
+
+julia> islink(bar_file)
+false
+
+julia> filemode(bar_file)
+0x000001a4
+
+julia> mtime(bar_file)
+0x0000016f7f71aa30
+
+julia> atime(bar_file)
+0x0000016f7f71a980
+
+julia> dirname(bar_file)
+HDFSFile: hdfs://userid@localhost:9000/tmp/foo/
+
+julia> joinpath(dirname(bar_file), "baz_file")
+HDFSFile: hdfs://userid@localhost:9000/tmp/foo/baz_file
+
 ...
 ````
@@ -90,13 +115,24 @@ HDFSFileInfo: baz.txt
 julia> open(bar_file, "w") do f
            write(f, b"hello world")
        end
-0x000000000000000b
+11
 
 julia> open(bar_file, "r") do f
-           bytes = Array(UInt8, filesize(f))
+           bytes = Vector{UInt8}(undef, filesize(f))
            read!(f, bytes)
-           println(bytestring(bytes))
+           println(String(bytes))
        end
 hello world
 ````
+
+Elly also supports block-level access to files to enable distributed processing.
+
+````
+julia> hdfs_blocks(huge_file)
+3-element Array{Tuple{UInt64,Array},1}:
+ (0x0000000000000000, AbstractString["node1"])
+ (0x0000000007d00000, AbstractString["node2"])
+ (0x000000000fa00000, AbstractString["node3"])
+
+````
+
diff --git a/YARN.md b/YARN.md
index 287ae08..d3fea77 100644
--- a/YARN.md
+++ b/YARN.md
@@ -32,37 +32,48 @@ YarnNodes: 1 (connected to 0)
 YarnNode: /default-rack/tanlt:36080 running, Used mem: 0/8192, cores: 0/8
 ````
 
-### YarnAppMaster
+### Yarn Applications (The YarnAppMaster)
 
-Elly supports only unmanaged application masters. A `YarnAppMaster` can be constructed by providing the address of the Yarn Scheduler and a
-`UserGroupInformation` object. It is then registered with Yarn using the `submit` API to get an application as a `YarnApp` instance.
+An ApplicationMaster, in Yarn terminology, is the part of an application that negotiates resources from the Yarn ResourceManager and works
+with the Yarn NodeManager to execute and monitor the granted resources (bundled as containers) for a given application. Application masters
+can either be:
+
+- Managed: managed by Yarn and run inside the cluster; resources are allocated inside the Yarn cluster and Yarn instantiates the process
+- Unmanaged: not managed by Yarn and run outside the cluster; it is the application writer's responsibility to ensure that it has the resources it needs and is kept running throughout the course of the application
+
+Elly supports both managed and unmanaged application masters.
+
+#### Unmanaged YarnAppMaster
+
+An unmanaged `YarnAppMaster` can be constructed by providing the address of the Yarn Scheduler and a
+`UserGroupInformation` object. It then needs to be registered with Yarn using the `submit` API to get an application as a `YarnApp` instance.
 
 ````
 julia> yarnam = YarnAppMaster("localhost", 8030, ugi)
 YarnAppMaster: tan@localhost:8030/
     id: 6c215ce3-0070-4b
     connected: false
-    Memory: available:0, max:0, can schecule:false
-    Cores: available:0, max:0, can schedule:false
-    Queue:
-YarnNodes: 0 (connected to 0)
-Containers: 0/0 active, 0 in use
 ````
 
-Applications may register callbacks for container allocation and finish events to be able to start a task on the allocated container or
-read the results.
+Once registered, the application master can then allocate one or more containers in the cluster.
+But before it can do that, the application should register callbacks for container allocation and
+finish events, so that it can start a task on the allocated container and read the results after
+termination.
 
 ````
-julia> cids = Dict()
-Dict{Any,Any} with 0 entries
+julia> cids = Set()
+Set{Any}()
 
 julia> function on_alloc(cid)
            # start container process
           println("allocated $cid")
-           env = Dict("JULIA_PKGDIR" => "/home/tan/.julia");
+           env = Dict(
+               "JULIA_LOAD_PATH" => join([Base.LOAD_PATH..., "/home/tan/.julia/dev", "/home/tan/.julia/packages"], ':'),
+               "JULIA_DEPOT_PATH" => join(Base.DEPOT_PATH, ':')
+           );
            clc = launchcontext(cmd="/bin/julia /tmp/simplecontainer.jl 1>/tmp/stdout 2>/tmp/stderr", env=env);
            container_start(yarnam, cid, clc)
-           cids[cid] = "some identifier"
+           push!(cids, cid)
            nothing
        end
 on_alloc (generic function with 1 method)
 
 julia> function on_finish(cid)
            # release the container (or can start a new process here also)
            println("finished $cid")
            container_release(yarnam, cid)
-           delete!(cids, cid)
+           pop!(cids, cid)
            nothing
        end
 on_finish (generic function with 1 method)
 
 YarnApp YARN (EllyApp/2): accepted-0.0
     location: tan@N/A:0/default
 ````
 
-Containers can then be allocated/de-allocated and started/stopped as required.
+With event handlers registered, containers can then be allocated/de-allocated and started/stopped as required.
 
 ````
 julia> container_allocate(yarnam, 1);
 allocated Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
 
-julia> cid = collect(keys(cids))[1]
+julia> cid = collect(cids)[1]
 Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
 
 julia> container_stop(yarnam, cid);
-
-julia> container_release(yarnam, cid);
 finished Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
 ````
 
 Finally, the application master can be terminated with a call to `unregister`:
 
 ````
 julia> unregister(yarnam, true)
 true
 ````
+
+#### Managed YarnAppMaster
+
+A managed `YarnAppMaster` can be deployed simply by submitting a command to the `YarnClient` with the `unmanaged` flag set to `false`.
+
+```julia
+ugi = UserGroupInformation()
+clnt = YarnClient(host, rmport, ugi)
+yarn_host = "yarnhost"
+yarn_scheduler_port = 8030
+
+env = Dict(
+    "JULIA_LOAD_PATH"=>join([Base.LOAD_PATH..., "/usr/local/julia/packages"], ':'),
+    "JULIA_DEPOT_PATH"=>join([Base.DEPOT_PATH..., "/usr/local/julia"], ':')
+)
+
+testscript = "/application/masterprocess.jl"
+app = submit(clnt, ["/usr/local/julia/bin/julia", testscript], env; schedaddr="$(yarn_host):$(yarn_scheduler_port)", unmanaged=false)
+```
+
+Once submitted, the submitting process can exit, leaving the application master running inside the cluster. The submitting process can also stay around and monitor the application if desired:
+
+```julia
+@info("status", status(app))
+```
+
+And wait for the application to reach a certain state:
+
+```julia
+Elly.wait_for_state(app, Elly.YarnApplicationStateProto.FINISHED)
+```
+
+The master process submitted this way can create an instance of `YarnAppMaster` and use it to manage itself, and also to allocate and launch more containers into the cluster.
+
+For example, the `/application/masterprocess.jl` script launched above can instantiate a `YarnAppMaster` and register itself.
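+
+Putting the pieces together, a fuller sketch of what such a script might contain follows, reusing
+the same allocation/finish handler pattern as the unmanaged walkthrough above (the container
+command and paths are placeholders, not a prescribed layout):
+
+```julia
+using Elly
+
+ugi = UserGroupInformation()
+am = YarnAppMaster(ugi)
+
+cids = Set()
+
+function on_alloc(cid)
+    # launch a task on each container as it is granted
+    clc = launchcontext(cmd="/bin/julia /tmp/simplecontainer.jl")
+    container_start(am, cid, clc)
+    push!(cids, cid)
+    nothing
+end
+
+function on_finish(cid)
+    # give the container back to the cluster once it terminates
+    container_release(am, cid)
+    pop!(cids, cid)
+    nothing
+end
+
+callback(am, on_alloc, on_finish)
+register(am)
+container_allocate(am, 1)
+```
+
+At its core, though, the registration alone is just: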
+
+```julia
+ugi = UserGroupInformation()
+am = YarnAppMaster(ugi)
+register(am)
+```
+
+From there it can allocate and execute more containers exactly as we did with the unmanaged `YarnAppMaster`.
diff --git a/YARNCM.md b/YARNCM.md
index ef14cf6..b43ce08 100644
--- a/YARNCM.md
+++ b/YARNCM.md
@@ -2,7 +2,9 @@
 `YarnManager` provides a Julia [ClusterManager](http://docs.julialang.org/en/latest/manual/parallel-computing/#clustermanagers) interface for working with Yarn. It does not
 have the full functionality of the direct Yarn APIs, but it provides the familiar `addprocs`, `rmprocs` methods
 for starting and stopping containers.
+
+`YarnManager` works in both managed mode (both master and workers launched inside the cluster) and unmanaged mode (only workers launched inside the cluster). See the section on ["Yarn Applications using Elly"](YARN.md) for details.
 
 It can also be used to get a distributed Julia shell in the Yarn cluster.
 
 The example below walks through using Julia on a Yarn cluster.
@@ -10,14 +12,25 @@
 You can find the YARN manager parameters in the `$HADOOP_CONFIG_DIR/yarn-site.xml` file [Hadoop Docs](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html).
 
-Bring up the Julia processes on the Yarn cluster:
+Bring up the Julia processes on the Yarn cluster. Note that Julia should be installed on all nodes of the cluster at the same path for this to work.
 
 ````
 julia> using Elly
 
-julia> yarncm = YarnManager(yarnhost="localhost", rmport=8032, schedport=8030, launch_timeout=60);
+julia> yarncm = YarnManager(
+           yarnhost="localhost",
+           rmport=8032,
+           schedport=8030,
+           launch_timeout=60,
+           unmanaged=false     # pass true when running in unmanaged mode
+       );
+
+julia> env = Dict(
+           "JULIA_LOAD_PATH"=>join([Base.LOAD_PATH..., "/usr/local/julia/packages"], ':'),
+           "JULIA_DEPOT_PATH"=>join([Base.DEPOT_PATH..., "/usr/local/julia"], ':')
+       );
 
-julia> addprocs(yarncm; np=8, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
+julia> addprocs(yarncm; np=8, env=env);
 
 julia> @everywhere println(myid())
 1
 ...
 ````
 
 Next, we try some trivial computation on all nodes. We use a file `dart.jl` that contains some code to
-arrive at an approximate value of pi using a Monte Carlo method:
+arrive at an approximate value of pi using a Monte Carlo method. Note that `dart.jl` should be
+available on all nodes of the cluster at the same path.
 
 ````
 # dart.jl
 ...
 
@@ -88,3 +102,12 @@ julia> @everywhere println(myid())
 
 julia> Elly.disconnect(yarncm);
 ````
+
+`YarnManager` can also be used with the familiar Julia `do`-block syntax, by passing it a function to execute in the context of the manager, e.g.:
+
+````
+YarnManager(launch_timeout=60, unmanaged=false) do yarncm
+    # use yarncm here...
+    ...
+end
+````

From de0410543dd5041eb4b0392b00360eae52c3dc7c Mon Sep 17 00:00:00 2001
From: tan
Date: Wed, 8 Jan 2020 08:26:33 +0530
Subject: [PATCH 3/5] use ReentrantLock instead of Channel for locking

Use the simpler `ReentrantLock` instead of `Channel` for locking in
`YarnAppMaster`.
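
The pattern, as a sketch (mirroring the new code in `api_yarn_appmaster.jl`):

    lck = ReentrantLock()
    resp = lock(lck) do
        # serialize AM-RM protocol calls: one request-response at a time
        registerApplicationMaster(yam.amrm_conn, inp)
    end

`lock(f, l)` acquires the lock, runs `f` and releases the lock even if `f`
throws, which is exactly what the `Channel`-based `withlock` helper did by
hand with `take!`/`put!` in a try/finally block.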
---
 src/Elly.jl               |  3 ---
 src/api_yarn_appmaster.jl | 23 +++++------------------
 2 files changed, 5 insertions(+), 21 deletions(-)

diff --git a/src/Elly.jl b/src/Elly.jl
index 39f54e1..4e2164b 100644
--- a/src/Elly.jl
+++ b/src/Elly.jl
@@ -42,9 +42,6 @@ export YarnAppMaster, register, unregister, kill, can_schedule_mem, can_schedule
 
 export YarnManager, launch, manage
 
-const Lock = Channel
-makelock() = Channel{Int}(1)
-
 include("hadoop/hadoop.jl")
 using Elly.hadoop
 using Elly.hadoop.common
diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl
index e1e30c5..6855ac7 100644
--- a/src/api_yarn_appmaster.jl
+++ b/src/api_yarn_appmaster.jl
@@ -44,19 +44,17 @@ mutable struct YarnAppMaster
     response_id::Int32              # initial value must be 0, update with response_id sent from server on every response
     registration::Union{Nothing,RegisterApplicationMasterResponseProto}
     am_rm_task::Union{Nothing,Task}
-    lck::Lock
+    lck::ReentrantLock
 
     function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(),
                 amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="")
         amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi)
-        lck = makelock()
-        put!(lck, 1)
 
         new(amrm_conn, amhost, amport, amurl,
             Int32(0), Int32(0), Int32(0), Int32(0),
             YarnNodes(ugi), YarnContainers(),
             "", 0,
-            nothing, nothing, lck)
+            nothing, nothing, ReentrantLock())
     end
 
     function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation())
@@ -83,17 +81,6 @@ function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformat
     end
 end
 
-function withlock(fn, yam)
-    try
-        take!(yam.lck)
-        return fn()
-    catch ex
-        rethrow(ex)
-    finally
-        put!(yam.lck, 1)
-    end
-end
-
 function show(io::IO, yam::YarnAppMaster)
     show(io, yam.amrm_conn)
     if yam.registration !== nothing
@@ -137,7 +124,7 @@ function register(yam::YarnAppMaster)
     end
     !isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)
 
-    resp = withlock(yam) do
+    resp = lock(yam.lck) do
         registerApplicationMaster(yam.amrm_conn, inp)
     end
     yam.registration = resp
@@ -170,7 +157,7 @@ function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::Abstra
     !isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)
     !isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics)
 
-    resp = withlock(yam) do
+    resp = lock(yam.lck) do
         finishApplicationMaster(yam.amrm_conn, inp)
     end
     resp.isUnregistered && (yam.registration = nothing)
@@ -249,7 +236,7 @@ function _update_rm(yam::YarnAppMaster)
     setproperty!(inp, :response_id, yam.response_id)
     #@debug(inp)
 
-    resp = withlock(yam) do
+    resp = lock(yam.lck) do
         allocate_resp = allocate(yam.amrm_conn, inp)
         yam.response_id = allocate_resp.response_id        # next response id must match this
         allocate_resp

From ed3296fe72bb12fb7c2edb3d6bf6a6a460ad9c05 Mon Sep 17 00:00:00 2001
From: tan
Date: Wed, 8 Jan 2020 12:14:15 +0530
Subject: [PATCH 4/5] containerid string representation
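
The string form follows the Hadoop convention
container_(e<epoch>_)<cluster_timestamp>_<appid>_<attempt>_<container>, where
a non-zero epoch is packed into the high bits (above bit 40) of the numeric
container id. A round-trip sketch using values from the tests below:

    julia> cid = Elly.parse_container_id("container_e17_1410901177871_0001_01_000005");

    julia> cid.id    # epoch 17 in the high bits, container 5 in the low bits
    18691697672197

    julia> Elly.container_id_string(cid)
    "container_e17_1410901177871_0001_01_000005"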
---
 src/containerid.jl | 16 ++++++++++++++++
 test/yarntests.jl  | 16 ++++++++++++++--
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/src/containerid.jl b/src/containerid.jl
index 543aef6..11dca7d 100644
--- a/src/containerid.jl
+++ b/src/containerid.jl
@@ -35,3 +35,19 @@ function parse_container_id(cidstr::String)
     attemptid_proto = ApplicationAttemptIdProto(; application_id=appid_proto, attemptId=attemptid)
     ContainerIdProto(; app_id=appid_proto, app_attempt_id=attemptid_proto, id=cid)
 end
+
+function container_id_string(cid::ContainerIdProto)
+    app_id = isdefined(cid, :app_id) ? cid.app_id : cid.app_attempt_id.application_id
+    attempt_id = isdefined(cid, :app_attempt_id) ? cid.app_attempt_id.attemptId : 0
+    id = cid.id
+    epoch = id >> 40
+    id = CONTAINER_ID_BITMASK & id
+
+    parts = [CONTAINER_PREFIX]
+    (epoch > 0) && push!(parts, EPOCH_PREFIX * lpad(epoch, 2, "0"))
+    push!(parts, string(app_id.cluster_timestamp))
+    push!(parts, lpad(app_id.id, 4, "0"))
+    push!(parts, lpad(attempt_id, 2, "0"))
+    push!(parts, lpad(id, 6, "0"))
+    join(parts, CONTAINER_ID_SPLITTER)
+end
diff --git a/test/yarntests.jl b/test/yarntests.jl
index 40020be..4f531f8 100644
--- a/test/yarntests.jl
+++ b/test/yarntests.jl
@@ -15,17 +15,29 @@ end
 function test_container_id()
     @info("testing container ids")
 
-    cid = Elly.parse_container_id("container_1577681661884_0005_01_000001")
+    cidstr = "container_1577681661884_0005_01_000001"
+    cid = Elly.parse_container_id(cidstr)
     @test cid.id == 1
     @test cid.app_id.cluster_timestamp == 1577681661884
     @test cid.app_id.id == 5
     @test cid.app_attempt_id.attemptId == 1
+    @test Elly.container_id_string(cid) == cidstr
 
-    cid = Elly.parse_container_id("container_e17_1410901177871_0001_01_000005")
+    cidstr = "container_e17_1410901177871_0001_01_000005"
+    cid = Elly.parse_container_id(cidstr)
     @test cid.id == 18691697672197
     @test cid.app_id.cluster_timestamp == 1410901177871
     @test cid.app_id.id == 1
     @test cid.app_attempt_id.attemptId == 1
+    @test Elly.container_id_string(cid) == cidstr
+
+    cidstr = "container_e03_1465095377475_0007_02_000001"
+    cid = Elly.parse_container_id(cidstr)
+    @test cid.id == 3298534883329
+    @test cid.app_id.cluster_timestamp == 1465095377475
+    @test cid.app_id.id == 7
+    @test cid.app_attempt_id.attemptId == 2
+    @test Elly.container_id_string(cid) == cidstr
 
     nothing
 end

From 58aa882d2f2ed9956ab5e304c09464f7203777f7 Mon Sep 17 00:00:00 2001
From: tan
Date: Wed, 8 Jan 2020 14:16:06 +0530
Subject: [PATCH 5/5] clustermanager: implement worker termination

- handle interrupt, finalize and deregister of workers
- release yarn containers and resources on worker termination
---
 src/api_yarn_appmaster.jl |  6 ++++--
 src/api_yarn_base.jl      | 31 +++++++++++++++++--------------
 src/cluster_manager.jl    | 17 ++++++++++++++---
 test/yarntests.jl         |  4 ++--
 4 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl
index 6855ac7..5075387 100644
--- a/src/api_yarn_appmaster.jl
+++ b/src/api_yarn_appmaster.jl
@@ -171,7 +171,8 @@ kill(yam::YarnAppMaster, diagnostics::AbstractString="") = _unregister(yam, Fina
 
 container_allocate(yam::YarnAppMaster, numcontainers::Int; opts...) = request_alloc(yam.containers, numcontainers; opts...)
 container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_release(yam.containers, cids...)
-container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec) +container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, container_id_string(cid), container_spec) +container_start(yam::YarnAppMaster, cidstr::String, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cidstr], container_spec) function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto) @debug("starting", container) req = StartContainerRequestProto(container_launch_context=container_spec, container_token=container.container_token) @@ -197,7 +198,8 @@ function container_start(yam::YarnAppMaster, container::ContainerProto, containe cid end -container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid]) +container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, container_id_string(cid)) +container_stop(yam::YarnAppMaster, cidstr::String) = container_stop(yam, yam.containers.containers[cidstr]) function container_stop(yam::YarnAppMaster, container::ContainerProto) @debug("stopping", container) diff --git a/src/api_yarn_base.jl b/src/api_yarn_base.jl index 960ffc8..44569d5 100644 --- a/src/api_yarn_base.jl +++ b/src/api_yarn_base.jl @@ -273,10 +273,10 @@ It also holds the allocation and release pipelines that are used by application Also schedules callbacks as tasks when containers are allocated or terminated. """ mutable struct YarnContainers - containers::Dict{ContainerIdProto,ContainerProto} - status::Dict{ContainerIdProto,ContainerStatusProto} - active::Set{ContainerIdProto} - busy::Set{ContainerIdProto} + containers::Dict{String,ContainerProto} + status::Dict{String,ContainerStatusProto} + active::Set{String} + busy::Set{String} alloc_pipeline::RequestPipeline{ResourceRequestProto} release_pipeline::RequestPipeline{ContainerIdProto} @@ -286,7 +286,7 @@ mutable struct YarnContainers on_container_finish::Union{Nothing,Function} function YarnContainers() - new(Dict{ContainerIdProto,ContainerProto}(), Dict{ContainerIdProto,ContainerStatusProto}(), Set{ContainerIdProto}(), Set{ContainerIdProto}(), + new(Dict{String,ContainerProto}(), Dict{String,ContainerStatusProto}(), Set{String}(), Set{String}(), RequestPipeline{ResourceRequestProto}(), RequestPipeline{ContainerIdProto}(), 0, nothing, nothing) end end @@ -313,8 +313,9 @@ function update(containers::YarnContainers, arp::AllocateResponseProto) if isfilled(arp, :allocated_containers) for cont in arp.allocated_containers id = cont.id - contlist[id] = cont - push!(active, id) + idstr = container_id_string(id) + contlist[idstr] = cont + push!(active, idstr) @debug("calling callback for alloc") (cballoc === nothing) || @async cballoc(id) end @@ -323,11 +324,12 @@ function update(containers::YarnContainers, arp::AllocateResponseProto) @debug("have completed containers") for contst in arp.completed_container_statuses id = contst.container_id - status[id] = contst - @debug("container finished", id, active=(id in active)) - (id in active) && pop!(active, id) - (id in busy) && pop!(busy, id) - @debug("calling callback for finish") + idstr = container_id_string(id) + status[idstr] = contst + @debug("container finished", idstr, isactive=(idstr in active), isbusy=(idstr in busy)) + (idstr in active) && pop!(active, idstr) + (idstr in busy) && 
pop!(busy, idstr)
+            @debug("calling callback for finish", id)
             (cbfinish === nothing) || @async cbfinish(id)
         end
     end
@@ -359,7 +361,7 @@ end
 function set_busy(containers::YarnContainers, cids::ContainerIdProto...)
     busy = containers.busy
     for cid in cids
-        push!(busy, cid)
+        push!(busy, container_id_string(cid))
     end
     nothing
 end
@@ -367,7 +369,8 @@ end
 function set_free(containers::YarnContainers, cids::ContainerIdProto...)
     busy = containers.busy
     for cid in cids
-        pop!(busy, cid)
+        cidstr = container_id_string(cid)
+        (cidstr in busy) && pop!(busy, cidstr)
     end
     nothing
 end
diff --git a/src/cluster_manager.jl b/src/cluster_manager.jl
index 370a0bb..dbc4b4b 100644
--- a/src/cluster_manager.jl
+++ b/src/cluster_manager.jl
@@ -122,6 +122,9 @@ function container_start(manager::YarnManager, cmd::String, env::Dict{String,Str
     end
 end
 
+container_stop(manager::YarnManager, cid::ContainerIdProto) = container_stop(manager.am, cid)
+container_release(manager::YarnManager, cid::ContainerIdProto) = container_release(manager.am, cid)
+
 function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
     @debug("YarnManager launch", params)
 
@@ -179,10 +182,18 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
 end
 
 function manage(manager::YarnManager, id::Integer, config::WorkerConfig, op::Symbol)
+    @debug("YarnManager manage", id, op, container=config.userdata, nprocs=nprocs())
-    # This function needs to exist, but so far we don't do anything
-    if op == :deregister
-        @debug("YarnManager manage", id, op, nprocs=nprocs())
-        !manager.keep_connected && (1 == nprocs()) && (@async disconnect(manager))
+    # stop and release the worker's yarn container when the worker terminates
+    if op in [:deregister, :finalize, :interrupt]
+        if (config.userdata !== nothing) && haskey(config.userdata, :container_id)
+            cid = config.userdata[:container_id]
+            cidstr = container_id_string(cid)
+            if cidstr in manager.am.containers.active
+                container_stop(manager, cid)
+                container_release(manager, cid)
+            end
+        end
+        !manager.keep_connected && (1 == nprocs()) && disconnect(manager)
     end
     nothing
 end
diff --git a/test/yarntests.jl b/test/yarntests.jl
index 4f531f8..f3f5581 100644
--- a/test/yarntests.jl
+++ b/test/yarntests.jl
@@ -60,8 +60,7 @@ function make_julia_env()
         end
     end
     if !("JULIA_LOAD_PATH" in keys(env))
-        home = ENV["HOME"]
-        env["JULIA_LOAD_PATH"] = join(Base.LOAD_PATH, ':') * ":$(home)/.julia/dev:$(home)/.julia/packages"
+        env["JULIA_LOAD_PATH"] = join([Base.LOAD_PATH..., joinpath(Base.homedir(), ".julia", "dev"), joinpath(Base.homedir(), ".julia", "packages")], ':')
     end
     if !("JULIA_DEPOT_PATH" in keys(env))
         env["JULIA_DEPOT_PATH"] = join(Base.DEPOT_PATH, ':')
@@ -84,6 +83,7 @@ function test_yarn_clustermanager(yarncm::YarnManager, limitedtestenv::Bool)
     @everywhere println("hi")
     rmprocs(workers())
     @test nprocs() == 1
+    @test yarncm.am.registration !== nothing    # because keep_connected is true by default
 
     nothing
 end