
Commit

Merge pull request #48 from JuliaParallel/tan/yarn
improve api and cluster manager, update docs
tanmaykm authored Jan 8, 2020
2 parents 0b9e19d + 58aa882 commit f096096
Showing 10 changed files with 215 additions and 75 deletions.
42 changes: 39 additions & 3 deletions HDFS.md
@@ -25,6 +25,9 @@ The DFS can be navigated using the same Julia APIs as used for a traditional filesystem

````
julia> pwd(dfs)
"/"
julia> du(dfs)
0x0000000000000017
julia> readdir(dfs)
5-element Array{AbstractString,1}:
"testdir"
@@ -65,6 +68,28 @@ HDFSFileInfo: bar

````
julia> isfile(bar_file)
true
julia> isdir(bar_file)
false
julia> islink(bar_file)
false
julia> filemode(bar_file)
0x000001a4
julia> mtime(bar_file)
0x0000016f7f71aa30
julia> atime(bar_file)
0x0000016f7f71a980
julia> dirname(bar_file)
HDFSFile: hdfs://userid@localhost:9000/tmp/foo/
julia> joinpath(dirname(bar_file), "baz_file")
HDFSFile: hdfs://userid@localhost:9000/tmp/foo/baz_file
...
````

@@ -90,13 +115,24 @@ HDFSFileInfo: baz.txt

````
julia> open(bar_file, "w") do f
write(f, b"hello world")
end
-0x000000000000000b
+11
julia> open(bar_file, "r") do f
-bytes = Vector{UInt8}(filesize(f))
+bytes = Vector{UInt8}(undef, filesize(f))
read!(f, bytes)
-println(bytestring(bytes))
+println(String(bytes))
end
hello world
````

Elly also supports block-level access to files, to enable distributed processing.

```
julia> hdfs_blocks(huge_file)
3-element Array{Tuple{UInt64,Array},1}:
(0x0000000000000000, AbstractString["node1"])
(0x0000000007d00000, AbstractString["node2"])
(0x000000000fa00000, AbstractString["node3"])
```
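
The offsets and host lists returned above are enough to route work to the nodes that hold each block. Below is a minimal sketch of iterating over a file block by block; it assumes that `hdfs_blocks` returns `(offset, hosts)` tuples as shown, that `filesize` works on an `HDFSFile`, and that readers support `seek` as in the read example earlier. `process_block` is a hypothetical user-supplied function, not part of Elly.

```julia
using Elly

# Sketch only: hand each block's bytes (and the hosts storing it) to a
# user-supplied function. A real distributed job would instead schedule
# each read on a worker running near one of `hosts`.
function foreach_block(process_block::Function, file::HDFSFile)
    blocks = hdfs_blocks(file)          # [(offset, hosts), ...]
    total = filesize(file)
    for (idx, (offset, hosts)) in enumerate(blocks)
        # a block extends to the next block's offset, or to EOF for the last one
        stop = idx < length(blocks) ? blocks[idx + 1][1] : total
        open(file, "r") do io
            seek(io, offset)
            bytes = Vector{UInt8}(undef, stop - offset)
            read!(io, bytes)
            process_block(offset, hosts, bytes)
        end
    end
end
```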

89 changes: 71 additions & 18 deletions YARN.md
@@ -32,37 +32,48 @@ YarnNodes: 1 (connected to 0)

````
YarnNode: /default-rack/tanlt:36080 running, Used mem: 0/8192, cores: 0/8
````

-### YarnAppMaster
+### Yarn Applications (The YarnAppMaster)

-Elly supports only unmanaged application masters. A `YarnAppMaster` can be constructed by providing the address of the Yarn Scheduler and a
-`UserGroupInformation` object. It is then registered with Yarn using the `submit` API to get an application as a `YarnApp` instance.
An ApplicationMaster, in Yarn terminology, is the part of an application that negotiates resources from the Yarn ResourceManager and works
with the Yarn NodeManager to execute and monitor the granted resources (bundled as containers) for a given application. Application masters
can either be:

- Managed: managed by Yarn and run inside the cluster; resources are allocated inside the Yarn cluster and Yarn instantiates the process
- Unmanaged: not managed by Yarn and run outside the cluster; it is the application writer's responsibility to ensure that it has the resources it needs and is kept running throughout the course of the application

Elly supports both managed and unmanaged application masters.

#### Unmanaged YarnAppMaster

An unmanaged `YarnAppMaster` can be constructed by providing the address of the Yarn Scheduler and a
`UserGroupInformation` object. It then needs to be registered with Yarn using the `submit` API to get an application as a `YarnApp` instance.

````
julia> yarnam = YarnAppMaster("localhost", 8030, ugi)
YarnAppMaster: tan@localhost:8030/
id: 6c215ce3-0070-4b
connected: false
Memory: available:0, max:0, can schedule:false
Cores: available:0, max:0, can schedule:false
Queue:
YarnNodes: 0 (connected to 0)
Containers: 0/0 active, 0 in use
````

-Applications may register callbacks for container allocation and finish events to be able to start a task on the allocated container or
-read the results.
Once registered, the application master can then allocate one or more containers in the cluster.
But before they can do that, applications should register callbacks for container allocation and
finish events, so that they can start a task on the allocated container or read the results after
termination.

````
-julia> cids = Dict()
+julia> cids = Set()
Dict{Any,Any} with 0 entries
julia> function on_alloc(cid)
# start container process
println("allocated $cid")
env = Dict("JULIA_PKGDIR" => "/home/tan/.julia");
env = Dict(
"JULIA_LOAD_PATH" => join([Base.LOAD_PATH..., "/home/tan/.julia/dev", "/home/tan/.julia/packages"], ':'),
"JULIA_DEPOT_PATH" => join(Base.DEPOT_PATH, ':')
);
clc = launchcontext(cmd="/bin/julia /tmp/simplecontainer.jl 1>/tmp/stdout 2>/tmp/stderr", env=env);
container_start(yarnam, cid, clc)
-cids[cid] = "some identifier"
+push!(cids, cid)
nothing
end
on_alloc (generic function with 1 method)
@@ -71,7 +82,7 @@ julia> function on_finish(cid)
# release the container (or can start a new process here also)
println("finished $cid")
container_release(yarnam, cid)
-delete!(cids, cid)
+pop!(cids, cid)
nothing
end
on_finish (generic function with 1 method)
@@ -83,18 +94,16 @@ YarnApp YARN (EllyApp/2): accepted-0.0
location: tan@N/A:0/default
````

-Containers can then be allocated/de-allocated and started/stopped as required.
+With event handlers registered, containers can then be allocated/de-allocated and started/stopped as required.

````
julia> container_allocate(yarnam, 1);
allocated Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
-julia> cid = collect(keys(cids))[1]
+julia> cid = collect(cids)[1]
Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
julia> container_stop(yarnam, cid);
julia> container_release(yarnam, cid);
finished Elly.hadoop.yarn.ContainerIdProto(#undef,Elly.hadoop.yarn.ApplicationAttemptIdProto(Elly.hadoop.yarn.ApplicationIdProto(2,1461548151454),1),1)
````

@@ -104,3 +113,47 @@ Finally the application master can be terminated with a call to `unregister`:

````
julia> unregister(yarnam, true)
true
````

#### Managed YarnAppMaster

A managed `YarnAppMaster` can be deployed simply by submitting a command to the YarnClient with the `unmanaged` flag set to `false`.

```julia
ugi = UserGroupInformation()
clnt = YarnClient(host, rmport, ugi)
yarn_host = "yarnhost"
yarn_scheduler_port = 8030

env = Dict(
"JULIA_LOAD_PATH"=>join([Base.LOAD_PATH..., "/usr/local/julia/packages"], ':'),
"JULIA_DEPOT_PATH"=>join([Base.DEPOT_PATH..., "/usr/local/julia"], ':')
)

testscript = "/application/masterprocess.jl"
app = submit(clnt, ["/usr/local/julia/bin/julia", testscript], env; schedaddr="$(yarn_host):$(yarn_scheduler_port)", unmanaged=false)
```

Once submitted, the submitting process can exit, leaving the application master running inside the cluster. It can also stay connected and monitor the application if so desired.

```julia
@info("status", status(app))
```

And wait for the application to reach a certain state.

```julia
Elly.wait_for_state(app, Elly.YarnApplicationStateProto.FINISHED)
```

The Yarn master process thus submitted can create an instance of `YarnAppMaster` and use it to manage itself, and to allocate and launch more containers into the cluster.

For example, the `/application/masterprocess.jl` script launched above can instantiate a `YarnAppMaster` and register itself:

```julia
ugi = UserGroupInformation()
am = YarnAppMaster(ugi)
register(am)
```

And then it can proceed to allocate and execute more containers exactly as we did with the unmanaged `YarnAppMaster`, as sketched below.
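
For illustration, here is a sketch of what such a self-managing master script could look like. It only uses calls already demonstrated above (`callback`, `register`, `container_allocate`, `container_start`, `container_release`, `unregister`); the container command and the point at which the application decides to exit are placeholders.

```julia
# Sketch of a managed master process (e.g. /application/masterprocess.jl).
using Elly

ugi = UserGroupInformation()
am = YarnAppMaster(ugi)

cids = Set()

on_alloc = cid -> begin
    # start a (placeholder) container process on each allocated container
    clc = launchcontext(cmd="/bin/julia /tmp/simplecontainer.jl")
    container_start(am, cid, clc)
    push!(cids, cid)
    nothing
end

on_finish = cid -> begin
    # release containers as they finish
    container_release(am, cid)
    pop!(cids, cid)
    nothing
end

callback(am, on_alloc, on_finish)
register(am)

container_allocate(am, 1)   # the callbacks above do the rest
# ... application-specific work and termination logic goes here ...
unregister(am, true)
```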
33 changes: 28 additions & 5 deletions YARNCM.md
@@ -2,22 +2,35 @@

`YarnManager` provides a Julia [ClusterManager](http://docs.julialang.org/en/latest/manual/parallel-computing/#clustermanagers) interface for
working with Yarn. It does not have the full functionality of the direct Yarn APIs, but it provides the familiar `addprocs`, `rmprocs` methods
for starting and stopping containers.

`YarnManager` works in both managed mode (both master and workers launched inside the cluster) and unmanaged mode (only workers launched inside the cluster). See section on ["Yarn Applications using Elly"](YARN.md) for details.

It can be also used to get a distributed Julia shell in the Yarn cluster.

The example below walks through a simple Julia session on a Yarn cluster.
You can find the YARN manager parameters in the `$HADOOP_CONFIG_DIR/yarn-site.xml` file; see the
[Hadoop Docs](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html) for details.

-Bring up the Julia processes on the Yarn cluster:
+Bring up the Julia processes on the Yarn cluster. Note that Julia should be installed on all nodes of the cluster at the same path for this to work.

````
julia> using Elly
-julia> yarncm = YarnManager(yarnhost="localhost", rmport=8032, schedport=8030, launch_timeout=60);
+julia> yarncm = YarnManager(
+           yarnhost="localhost",
+           rmport=8032,
+           schedport=8030,
+           launch_timeout=60,
+           unmanaged=false  # pass true when running in unmanaged mode
+       );
julia> env = Dict(
"JULIA_LOAD_PATH"=>join([Base.LOAD_PATH..., "/usr/local/julia/packages"], ':'),
"JULIA_DEPOT_PATH"=>join([Base.DEPOT_PATH..., "/usr/local/julia"], ':')
);
-julia> addprocs(yarncm; np=8, env=Dict("JULIA_PKGDIR"=>Pkg.dir()));
+julia> addprocs(yarncm; np=8, env=env);
julia> @everywhere println(myid())
1
@@ -32,7 +45,8 @@ julia> @everywhere println(myid())
````

Next, we try some trivial computation on all nodes. We use a file `dart.jl` that contains some code to
-arrive at an approximate value of pi using a Monte Carlo method:
+arrive at an approximate value of pi using a Monte Carlo method. Note that `dart.jl` should be made
+available throughout the cluster on all nodes at the same path.

````
# dart.jl
...
````
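
The body of `dart.jl` is collapsed in this diff. For reference, a minimal Monte Carlo estimator along the lines the text describes might look like the sketch below; the function names are illustrative and not necessarily those used in the repository.

```julia
# Illustrative sketch: estimate pi by sampling points in the unit square
# and counting how many fall inside the quarter circle.
function darts_in_circle(n)
    hits = 0
    for _ in 1:n
        x, y = rand(), rand()
        hits += (x^2 + y^2 <= 1)
    end
    hits
end

estimate_pi(n) = 4 * darts_in_circle(n) / n
```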
@@ -88,3 +102,12 @@ julia> @everywhere println(myid())

````
julia> Elly.disconnect(yarncm);
````

`YarnManager` can also be used in the familiar Julia `do`-block form, by passing it a function to execute in the context of the manager, e.g.:

```julia
YarnManager(launch_timeout=60, unmanaged=false) do yarncm
# use yarncm here...
...
end
```
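
For instance, a hypothetical session might look like the following; it assumes the same cluster setup as above and that the manager cleans up its Yarn application when the block returns.

```julia
using Elly
using Distributed

YarnManager(launch_timeout=60, unmanaged=false) do yarncm
    addprocs(yarncm; np=2)               # bring up two workers in the cluster
    @everywhere println("hello from process ", myid())
    rmprocs(workers())                   # release the containers
end
```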
3 changes: 0 additions & 3 deletions src/Elly.jl
@@ -42,9 +42,6 @@ export YarnAppMaster, register, unregister, kill, can_schedule_mem, can_schedule

export YarnManager, launch, manage

-const Lock = Channel
-makelock() = Channel{Int}(1)

include("hadoop/hadoop.jl")
using Elly.hadoop
using Elly.hadoop.common
31 changes: 10 additions & 21 deletions src/api_yarn_appmaster.jl
@@ -44,19 +44,17 @@ mutable struct YarnAppMaster
response_id::Int32 # initial value must be 0, update with response_id sent from server on every response
registration::Union{Nothing,RegisterApplicationMasterResponseProto}
am_rm_task::Union{Nothing,Task}
-lck::Lock
+lck::ReentrantLock

function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(),
amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="")
amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi)
-lck = makelock()
-put!(lck, 1)

new(amrm_conn, amhost, amport, amurl,
Int32(0), Int32(0), Int32(0), Int32(0),
YarnNodes(ugi), YarnContainers(),
"", 0,
-nothing, nothing, lck)
+nothing, nothing, ReentrantLock())
end

function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation())
@@ -83,17 +81,6 @@ function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformat
end
end

-function withlock(fn, yam)
-    try
-        take!(yam.lck)
-        return fn()
-    catch ex
-        rethrow(ex)
-    finally
-        put!(yam.lck, 1)
-    end
-end

function show(io::IO, yam::YarnAppMaster)
show(io, yam.amrm_conn)
if yam.registration !== nothing
@@ -111,7 +98,7 @@ callback(yam::YarnAppMaster, on_container_alloc::Union{Nothing,Function}, on_con
function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
@debug("submitting unmanaged application")
clc = launchcontext()
-app = submit(client, clc, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT; unmanaged=true)
+app = submit(client, clc; unmanaged=true)

# keep the am_rm token
tok = am_rm_token(app)
Expand All @@ -137,7 +124,7 @@ function register(yam::YarnAppMaster)
end
!isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)

-resp = withlock(yam) do
+resp = lock(yam.lck) do
registerApplicationMaster(yam.amrm_conn, inp)
end
yam.registration = resp
@@ -170,7 +157,7 @@ function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::Abstra
!isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)
!isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics)

-resp = withlock(yam) do
+resp = lock(yam.lck) do
finishApplicationMaster(yam.amrm_conn, inp)
end
resp.isUnregistered && (yam.registration = nothing)
@@ -184,7 +171,8 @@ kill(yam::YarnAppMaster, diagnostics::AbstractString="") = _unregister(yam, Fina
container_allocate(yam::YarnAppMaster, numcontainers::Int; opts...) = request_alloc(yam.containers, numcontainers; opts...)
container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_release(yam.containers, cids...)

-container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
+container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, container_id_string(cid), container_spec)
+container_start(yam::YarnAppMaster, cidstr::String, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cidstr], container_spec)
function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
@debug("starting", container)
req = StartContainerRequestProto(container_launch_context=container_spec, container_token=container.container_token)
@@ -210,7 +198,8 @@ function container_start(yam::YarnAppMaster, container::ContainerProto, containe
cid
end

-container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid])
+container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, container_id_string(cid))
+container_stop(yam::YarnAppMaster, cidstr::String) = container_stop(yam, yam.containers.containers[cidstr])
function container_stop(yam::YarnAppMaster, container::ContainerProto)
@debug("stopping", container)

@@ -249,7 +238,7 @@ function _update_rm(yam::YarnAppMaster)
setproperty!(inp, :response_id, yam.response_id)

#@debug(inp)
-resp = withlock(yam) do
+resp = lock(yam.lck) do
allocate_resp = allocate(yam.amrm_conn, inp)
yam.response_id = allocate_resp.response_id # next response id must match this
allocate_resp

2 comments on commit f096096

@tanmaykm (Member Author)
@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/7652

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if Julia TagBot is installed, or can be done manually through the GitHub interface, or via:

git tag -a v0.4.0 -m "<description of version>" f096096d75a2776e7467cdfee9614681222b0c0e
git push origin v0.4.0
