Skip to content

Commit

Permalink
use CoreLogging instead of custom macro
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmaykm committed Jul 30, 2018
1 parent 6875347 commit 7ed1493
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 139 deletions.
22 changes: 0 additions & 22 deletions src/Elly.jl
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,6 @@ export YarnManager, launch, manage
const Lock = Channel
makelock() = Channel{Int}(1)

#function tstr()
# t = time()
# string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
#end

# enable logging only during debugging
#using Logging
##const logger = Logging.configure(filename="elly.log", level=DEBUG)
#const logger = Logging.configure(level=DEBUG)
#macro logmsg(s)
# quote
# @info("[$(myid())-] " * $(esc(s)))
# end
#end
macro logmsg(s)
end
#macro logmsg(s)
# quote
# info(tstr(), " [", myid(), "-", "] ", $(esc(s)))
# end
#end

include("hadoop/hadoop.jl")
using Elly.hadoop
using Elly.hadoop.common
Expand Down
4 changes: 2 additions & 2 deletions src/api_hdfs_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ function _complete_file(client::HDFSClient, path::AbstractString, last::Union{No
:clientName => ELLY_CLIENTNAME))
if last !== nothing
set_field!(endinp, :last, last)
@logmsg("setting last block as $(last)")
@debug("setting last block as $(last)")
end

endresp = complete(client.nn_conn, endinp)
Expand All @@ -400,7 +400,7 @@ end

function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:LocatedBlockProto
(previous === nothing) && (return _add_block(ExtendedBlockProto, client, path))
@logmsg("adding next block to $(previous.b)")
@debug("adding next block to $(previous.b)")
_add_block(ExtendedBlockProto, client, path, previous.b)
end
function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto
Expand Down
18 changes: 9 additions & 9 deletions src/api_hdfs_io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ end

isconnected(reader::HDFSFileReader) = (reader.blk_reader !== nothing)
function disconnect(reader::HDFSFileReader, reuse::Bool)
@logmsg("disconnecting $(URI(reader,true)) reuse:$reuse")
@debug("disconnecting", uri=URI(reader,true), reuse=reuse)
reader.blocks = nothing
if reader.blk_reader !== nothing
isconnected(reader.blk_reader) && disconnect(reader.blk_reader, reuse)
Expand All @@ -103,13 +103,13 @@ function disconnect(reader::HDFSFileReader, reuse::Bool)
end

function connect(reader::HDFSFileReader)
@logmsg("connect if required $(URI(reader,true))")
@debug("connect if required", uri=URI(reader,true))
eof(reader) && return
isconnected(reader) && return

@logmsg("connecting $(URI(reader,true))")
@debug("connecting", uri=URI(reader,true))
if !(reader.fptr in _current_window(reader))
@logmsg("moving block window for $(URI(reader,true))")
@debug("moving block window", uri=URI(reader,true))
len = min(bytesavailable(reader), HDFS_READER_WINDOW_LENGTH)
blocks = _get_block_locations(reader.client, reader.path, reader.fptr, len)
(blocks === nothing) && throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
Expand Down Expand Up @@ -156,7 +156,7 @@ function _read_and_buffer(reader::HDFSFileReader, out::Vector{UInt8}, offset::UI
if ret < 0
pkt_len = len + UInt64(abs(ret)) # bytes in this packet
buff = Vector{UInt8}(undef, pkt_len) # allocate a temporary array
@logmsg("allocated temporary array of size $pkt_len, len:$len, ret:$ret, offset:$offset, bufflen:$(length(buff)), outlen:$(length(out))")
@debug("allocated temporary array", size=pkt_len, len=len, ret=ret, offset=offset, bufflen=length(buff), outlen=length(out))
ret = read_packet!(blk_reader, buff, UInt64(1)) # read complete packet
copyto!(out, offset, buff, 1, len) # copy len bytes to output
Base.write_sub(reader.buffer, buff, len+1, pkt_len-len) # copy remaining data to buffer
Expand All @@ -173,9 +173,9 @@ function _read_and_buffer(reader::HDFSFileReader, out::Vector{UInt8}, offset::UI
if reader.blk_reader !== nothing
blk_reader = reader.blk_reader
channel = blk_reader.channel
@logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
@debug("exception receiving from $(channel.host):$(channel.port)", ex=ex)
else
@logmsg("exception receiving: $ex")
@debug("exception receiving", ex=ex)
end
disconnect(reader, false)
rethrow(ex)
Expand Down Expand Up @@ -216,7 +216,7 @@ function read!(reader::HDFSFileReader, a::Vector{UInt8})
end
else
nbytes = min(navlb, remaining)
@logmsg("reading $nbytes from buffer. navlb:$navlb remaining:$remaining, offset:$offset")
@debug("reading $nbytes from buffer", nbytes=nbytes, navlb=navlb, remaining=remaining, offset=offset)
Base.read_sub(reader.buffer, a, offset, nbytes)
reader.fptr += nbytes
end
Expand Down Expand Up @@ -384,7 +384,7 @@ function cp(frompath::Union{HDFSFile,AbstractString}, topath::Union{HDFSFile,Abs
read!(fromfile, buff)
write(tofile, buff)
brem -= bread
@logmsg("remaining $brem/$btot")
@debug("remaining $brem/$btot")
end
close(fromfile)
close(tofile)
Expand Down
26 changes: 13 additions & 13 deletions src/api_yarn_appmaster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ callback(yam::YarnAppMaster, on_container_alloc::Union{Nothing,Function}, on_con
callback(yam.containers, on_container_alloc, on_container_finish)

function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
@logmsg("submitting unmanaged application")
@debug("submitting unmanaged application")
clc = launchcontext()
app = submit(client, clc, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT; unmanaged=true)

Expand Down Expand Up @@ -116,7 +116,7 @@ function register(yam::YarnAppMaster)
yam.max_mem = resp.maximumCapability.memory
yam.max_cores = resp.maximumCapability.virtual_cores
end
@logmsg("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
@debug("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
if isfilled(resp, :queue)
yam.queue = resp.queue
end
Expand Down Expand Up @@ -155,7 +155,7 @@ container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_relea

container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
@logmsg("starting container $(container)")
@debug("starting container $(container)")
req = protobuild(StartContainerRequestProto, Dict(:container_launch_context => container_spec, :container_token => container.container_token))
inp = protobuild(StartContainersRequestProto, Dict(:start_container_request => [req]))

Expand All @@ -181,7 +181,7 @@ end

container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid])
function container_stop(yam::YarnAppMaster, container::ContainerProto)
@logmsg("stopping container $container")
@debug("stopping container $container")

inp = protobuild(StopContainersRequestProto, Dict(:container_id => [container.id]))
nodeid = container.nodeId
Expand All @@ -205,13 +205,13 @@ function container_stop(yam::YarnAppMaster, container::ContainerProto)
end

function _update_rm(yam::YarnAppMaster)
@logmsg("started processing am-rm messages")
@debug("started processing am-rm messages")
inp = AllocateRequestProto()

# allocation and release requests
(alloc_pending,release_pending) = torequest(yam.containers)
@logmsg("alloc pending: $alloc_pending")
@logmsg("release pending: $release_pending")
@debug("alloc pending: $alloc_pending")
@debug("release pending: $release_pending")
!isempty(alloc_pending) && set_field!(inp, :ask, alloc_pending)
!isempty(release_pending) && set_field!(inp, :release, release_pending)

Expand All @@ -223,11 +223,11 @@ function _update_rm(yam::YarnAppMaster)
end
set_field!(inp, :response_id, yam.response_id)

#@logmsg(inp)
#@debug(inp)
resp = withlock(yam) do
allocate(yam.amrm_conn, inp)
end
#@logmsg(resp)
#@debug(resp)

# store/update tokens
channel = yam.amrm_conn.channel
Expand All @@ -248,15 +248,15 @@ function _update_rm(yam::YarnAppMaster)
# update node and container status
update(yam.nodes, resp)
update(yam.containers, resp)
@logmsg("finished processing am-rm messages")
#@logmsg(yam)
@debug("finished processing am-rm messages")
#@debug(yam)
nothing
end

haverequests(yam::YarnAppMaster) = haverequests(yam.containers)

function process_am_rm(yam::YarnAppMaster)
@logmsg("started am-rm processor task")
@debug("started am-rm processor task")
stopped = ()->(yam.registration === nothing)
stopwaiting = ()->(haverequests(yam) || (yam.registration === nothing))
waittime = 10.
Expand All @@ -272,6 +272,6 @@ function process_am_rm(yam::YarnAppMaster)
(waittime < 10) && sleep(10-waittime)
end
end
@logmsg("stopped am-rm processor task")
@debug("stopped am-rm processor task")
nothing
end
10 changes: 5 additions & 5 deletions src/api_yarn_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,20 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
id = cont.id
contlist[id] = cont
push!(active, id)
@logmsg("calling callback for alloc")
@debug("calling callback for alloc")
(cballoc === nothing) || @async cballoc(id)
end
end
if isfilled(arp, :completed_container_statuses)
@logmsg("have completed containers")
@debug("have completed containers")
for contst in arp.completed_container_statuses
id = contst.container_id
@logmsg("container $id is finished")
@debug("container $id is finished")
status[id] = contst
@logmsg("id in active: $(id in active)")
@debug("id in active: $(id in active)")
(id in active) && pop!(active, id)
(id in busy) && pop!(busy, id)
@logmsg("calling callback for finish")
@debug("calling callback for finish")
(cbfinish === nothing) || @async cbfinish(id)
end
end
Expand Down
6 changes: 3 additions & 3 deletions src/api_yarn_client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ function status(app::YarnApp, refresh::Bool=true)
end

function wait_for_state(app::YarnApp, state::Int32, timeout_secs::Int=60)
@logmsg("waiting for application to reach $(APP_STATES[state]) ($state) state")
@debug("waiting for application to reach $(APP_STATES[state]) ($state) state")
t1 = time() + timeout_secs
finalstates = (YarnApplicationStateProto.KILLED, YarnApplicationStateProto.FAILED, YarnApplicationStateProto.FINISHED)
isfinalstate = state in finalstates
Expand Down Expand Up @@ -243,7 +243,7 @@ function attempts(app::YarnApp, refresh::Bool=true)
end

function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, timeout_secs::Int=60)
@logmsg("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
@debug("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
t1 = time() + timeout_secs
finalstates = (YarnApplicationAttemptStateProto.APP_ATTEMPT_KILLED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FAILED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FINISHED)
isfinalstate = state in finalstates
Expand All @@ -255,7 +255,7 @@ function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, ti
atmptstate = report.yarn_application_attempt_state
(atmptstate == state) && (return true)
isfinalstate || ((atmptstate in finalstates) && (return false))
@logmsg("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
@debug("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
break
end
end
Expand Down
12 changes: 6 additions & 6 deletions src/cluster_manager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct YarnManager <: ClusterManager
params[n] = v
end
paramkeys = keys(params)
@logmsg("YarnManager constructor: params: $params")
@debug("YarnManager constructor: params: $params")

user = (:user in paramkeys) ? params[:user] : ""
rmport = (:rmport in paramkeys) ? params[:rmport] : 8032
Expand Down Expand Up @@ -41,7 +41,7 @@ function show(io::IO, yarncm::YarnManager)
end

function setup_worker(host, port, cookie)
@logmsg("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
@debug("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
c = connect(IPv4(host), port)
Base.wait_connected(c)
redirect_stdout(c)
Expand Down Expand Up @@ -91,7 +91,7 @@ function _currprocname()
end

function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
@logmsg("YarnManager launch: params: $params")
@debug("YarnManager launch: params: $params")

paramkeys = keys(params)
np = (:np in paramkeys) ? params[:np] : 1
Expand All @@ -116,8 +116,8 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port), $(cookie))"
clc = launchcontext(cmd="$cmd -e '$initargs'", env=appenv)

@logmsg("YarnManager launch: initargs: $initargs")
@logmsg("YarnManager launch: context: $clc")
@debug("YarnManager launch: initargs: $initargs")
@debug("YarnManager launch: context: $clc")
on_alloc = (cid) -> container_start(manager.am, cid, clc)
callback(manager.am, on_alloc, nothing)

Expand Down Expand Up @@ -156,7 +156,7 @@ end
function manage(manager::YarnManager, id::Integer, config::WorkerConfig, op::Symbol)
# This function needs to exist, but so far we don't do anything
if op == :deregister
@logmsg("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
@debug("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
!manager.keep_connected && (1 == nprocs()) && (@async disconnect(manager))
end
nothing
Expand Down
Loading

0 comments on commit 7ed1493

Please sign in to comment.