From 7ed1493cc8ae3370fc554fda1c110f5c121d89e0 Mon Sep 17 00:00:00 2001
From: tan
Date: Mon, 30 Jul 2018 14:42:57 +0530
Subject: [PATCH] use CoreLogging instead of custom macro

---
 src/Elly.jl               |  22 -------
 src/api_hdfs_base.jl      |   4 +-
 src/api_hdfs_io.jl        |  18 +++---
 src/api_yarn_appmaster.jl |  26 ++++----
 src/api_yarn_base.jl      |  10 +--
 src/api_yarn_client.jl    |   6 +-
 src/cluster_manager.jl    |  12 ++--
 src/rpc.jl                | 133 +++++++++++++++++++-------------------
 src/sasl.jl               |  24 +++----
 9 files changed, 116 insertions(+), 139 deletions(-)

diff --git a/src/Elly.jl b/src/Elly.jl
index 152ecac..81c8850 100644
--- a/src/Elly.jl
+++ b/src/Elly.jl
@@ -46,28 +46,6 @@ export YarnManager, launch, manage
 const Lock = Channel
 makelock() = Channel{Int}(1)
 
-#function tstr()
-#    t = time()
-#    string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
-#end
-
-# enable logging only during debugging
-#using Logging
-##const logger = Logging.configure(filename="elly.log", level=DEBUG)
-#const logger = Logging.configure(level=DEBUG)
-#macro logmsg(s)
-#    quote
-#        @info("[$(myid())-] " * $(esc(s)))
-#    end
-#end
-macro logmsg(s)
-end
-#macro logmsg(s)
-#    quote
-#        info(tstr(), " [", myid(), "-", "] ", $(esc(s)))
-#    end
-#end
-
 include("hadoop/hadoop.jl")
 using Elly.hadoop
 using Elly.hadoop.common
diff --git a/src/api_hdfs_base.jl b/src/api_hdfs_base.jl
index ef9f8e1..963d677 100644
--- a/src/api_hdfs_base.jl
+++ b/src/api_hdfs_base.jl
@@ -391,7 +391,7 @@ function _complete_file(client::HDFSClient, path::AbstractString, last::Union{No
                 :clientName => ELLY_CLIENTNAME))
     if last !== nothing
         set_field!(endinp, :last, last)
-        @logmsg("setting last block as $(last)")
+        @debug("setting last block as $(last)")
     end
 
     endresp = complete(client.nn_conn, endinp)
@@ -400,7 +400,7 @@ end
 
 function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:LocatedBlockProto
     (previous === nothing) && (return _add_block(ExtendedBlockProto, client, path))
-    @logmsg("adding next block to $(previous.b)")
+    @debug("adding next block to $(previous.b)")
     _add_block(ExtendedBlockProto, client, path, previous.b)
 end
 function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto
diff --git a/src/api_hdfs_io.jl b/src/api_hdfs_io.jl
index 199551f..75f0000 100644
--- a/src/api_hdfs_io.jl
+++ b/src/api_hdfs_io.jl
@@ -92,7 +92,7 @@ end
 isconnected(reader::HDFSFileReader) = (reader.blk_reader !== nothing)
 
 function disconnect(reader::HDFSFileReader, reuse::Bool)
-    @logmsg("disconnecting $(URI(reader,true)) reuse:$reuse")
+    @debug("disconnecting", uri=URI(reader,true), reuse=reuse)
     reader.blocks = nothing
     if reader.blk_reader !== nothing
         isconnected(reader.blk_reader) && disconnect(reader.blk_reader, reuse)
@@ -103,13 +103,13 @@
 end
 
 function connect(reader::HDFSFileReader)
-    @logmsg("connect if required $(URI(reader,true))")
+    @debug("connect if required", uri=URI(reader,true))
     eof(reader) && return
     isconnected(reader) && return
-    @logmsg("connecting $(URI(reader,true))")
+    @debug("connecting", uri=URI(reader,true))
     if !(reader.fptr in _current_window(reader))
-        @logmsg("moving block window for $(URI(reader,true))")
+        @debug("moving block window", uri=URI(reader,true))
         len = min(bytesavailable(reader), HDFS_READER_WINDOW_LENGTH)
         blocks = _get_block_locations(reader.client, reader.path, reader.fptr, len)
         (blocks === nothing) &&
             throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
@@ -156,7 +156,7 @@ function _read_and_buffer(reader::HDFSFileReader, out::Vector{UInt8}, offset::UI
         if ret < 0
             pkt_len = len + UInt64(abs(ret))            # bytes in this packet
             buff = Vector{UInt8}(undef, pkt_len)        # allocate a temporary array
-            @logmsg("allocated temporary array of size $pkt_len, len:$len, ret:$ret, offset:$offset, bufflen:$(length(buff)), outlen:$(length(out))")
+            @debug("allocated temporary array", size=pkt_len, len=len, ret=ret, offset=offset, bufflen=length(buff), outlen=length(out))
             ret = read_packet!(blk_reader, buff, UInt64(1))             # read complete packet
             copyto!(out, offset, buff, 1, len)                          # copy len bytes to output
             Base.write_sub(reader.buffer, buff, len+1, pkt_len-len)     # copy remaining data to buffer
@@ -173,9 +173,9 @@
         if reader.blk_reader !== nothing
             blk_reader = reader.blk_reader
             channel = blk_reader.channel
-            @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+            @debug("exception receiving from $(channel.host):$(channel.port)", ex=ex)
         else
-            @logmsg("exception receiving: $ex")
+            @debug("exception receiving", ex=ex)
         end
         disconnect(reader, false)
         rethrow(ex)
@@ -216,7 +216,7 @@ function read!(reader::HDFSFileReader, a::Vector{UInt8})
             end
         else
             nbytes = min(navlb, remaining)
-            @logmsg("reading $nbytes from buffer. navlb:$navlb remaining:$remaining, offset:$offset")
+            @debug("reading $nbytes from buffer", nbytes=nbytes, navlb=navlb, remaining=remaining, offset=offset)
             Base.read_sub(reader.buffer, a, offset, nbytes)
             reader.fptr += nbytes
         end
@@ -384,7 +384,7 @@ function cp(frompath::Union{HDFSFile,AbstractString}, topath::Union{HDFSFile,Abs
         read!(fromfile, buff)
         write(tofile, buff)
         brem -= bread
-        @logmsg("remaining $brem/$btot")
+        @debug("remaining $brem/$btot")
     end
     close(fromfile)
     close(tofile)
diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl
index 55c0fe8..272f58a 100644
--- a/src/api_yarn_appmaster.jl
+++ b/src/api_yarn_appmaster.jl
@@ -77,7 +77,7 @@ callback(yam::YarnAppMaster, on_container_alloc::Union{Nothing,Function}, on_con
     callback(yam.containers, on_container_alloc, on_container_finish)
 
 function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
-    @logmsg("submitting unmanaged application")
+    @debug("submitting unmanaged application")
     clc = launchcontext()
     app = submit(client, clc, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT; unmanaged=true)
 
@@ -116,7 +116,7 @@ function register(yam::YarnAppMaster)
         yam.max_mem = resp.maximumCapability.memory
         yam.max_cores = resp.maximumCapability.virtual_cores
     end
-    @logmsg("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
+    @debug("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
     if isfilled(resp, :queue)
         yam.queue = resp.queue
    end
@@ -155,7 +155,7 @@ container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_relea
 container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
 function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
-    @logmsg("starting container $(container)")
+    @debug("starting container $(container)")
     req = protobuild(StartContainerRequestProto, Dict(:container_launch_context => container_spec,
                 :container_token => container.container_token))
     inp = protobuild(StartContainersRequestProto, Dict(:start_container_request => [req]))
@@ -181,7 +181,7 @@ end
 container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid])
 function container_stop(yam::YarnAppMaster, container::ContainerProto)
-    @logmsg("stopping container $container")
+    @debug("stopping container $container")
 
     inp = protobuild(StopContainersRequestProto, Dict(:container_id => [container.id]))
 
     nodeid = container.nodeId
@@ -205,13 +205,13 @@ function container_stop(yam::YarnAppMaster, container::ContainerProto)
 end
 
 function _update_rm(yam::YarnAppMaster)
-    @logmsg("started processing am-rm messages")
+    @debug("started processing am-rm messages")
     inp = AllocateRequestProto()
 
     # allocation and release requests
     (alloc_pending,release_pending) = torequest(yam.containers)
-    @logmsg("alloc pending: $alloc_pending")
-    @logmsg("release pending: $release_pending")
+    @debug("alloc pending: $alloc_pending")
+    @debug("release pending: $release_pending")
     !isempty(alloc_pending) && set_field!(inp, :ask, alloc_pending)
     !isempty(release_pending) && set_field!(inp, :release, release_pending)
 
@@ -223,11 +223,11 @@ function _update_rm(yam::YarnAppMaster)
     end
     set_field!(inp, :response_id, yam.response_id)
 
-    #@logmsg(inp)
+    #@debug(inp)
     resp = withlock(yam) do
         allocate(yam.amrm_conn, inp)
     end
-    #@logmsg(resp)
+    #@debug(resp)
 
     # store/update tokens
     channel = yam.amrm_conn.channel
@@ -248,15 +248,15 @@ function _update_rm(yam::YarnAppMaster)
     # update node and container status
     update(yam.nodes, resp)
     update(yam.containers, resp)
-    @logmsg("finished processing am-rm messages")
-    #@logmsg(yam)
+    @debug("finished processing am-rm messages")
+    #@debug(yam)
     nothing
 end
 
 haverequests(yam::YarnAppMaster) = haverequests(yam.containers)
 
 function process_am_rm(yam::YarnAppMaster)
-    @logmsg("started am-rm processor task")
+    @debug("started am-rm processor task")
     stopped = ()->(yam.registration === nothing)
     stopwaiting = ()->(haverequests(yam) || (yam.registration === nothing))
     waittime = 10.
@@ -272,6 +272,6 @@ function process_am_rm(yam::YarnAppMaster)
             (waittime < 10) && sleep(10-waittime)
         end
     end
-    @logmsg("stopped am-rm processor task")
+    @debug("stopped am-rm processor task")
     nothing
 end
diff --git a/src/api_yarn_base.jl b/src/api_yarn_base.jl
index 8b36d01..1d1d5d8 100644
--- a/src/api_yarn_base.jl
+++ b/src/api_yarn_base.jl
@@ -310,20 +310,20 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
             id = cont.id
             contlist[id] = cont
             push!(active, id)
-            @logmsg("calling callback for alloc")
+            @debug("calling callback for alloc")
             (cballoc === nothing) || @async cballoc(id)
         end
     end
     if isfilled(arp, :completed_container_statuses)
-        @logmsg("have completed containers")
+        @debug("have completed containers")
         for contst in arp.completed_container_statuses
             id = contst.container_id
-            @logmsg("container $id is finished")
+            @debug("container $id is finished")
             status[id] = contst
-            @logmsg("id in active: $(id in active)")
+            @debug("id in active: $(id in active)")
             (id in active) && pop!(active, id)
             (id in busy) && pop!(busy, id)
-            @logmsg("calling callback for finish")
+            @debug("calling callback for finish")
             (cbfinish === nothing) || @async cbfinish(id)
         end
     end
diff --git a/src/api_yarn_client.jl b/src/api_yarn_client.jl
index 6381c12..84ca5c0 100644
--- a/src/api_yarn_client.jl
+++ b/src/api_yarn_client.jl
@@ -203,7 +203,7 @@ function status(app::YarnApp, refresh::Bool=true)
 end
 
 function wait_for_state(app::YarnApp, state::Int32, timeout_secs::Int=60)
-    @logmsg("waiting for application to reach $(APP_STATES[state]) ($state) state")
+    @debug("waiting for application to reach $(APP_STATES[state]) ($state) state")
     t1 = time() + timeout_secs
     finalstates = (YarnApplicationStateProto.KILLED, YarnApplicationStateProto.FAILED, YarnApplicationStateProto.FINISHED)
     isfinalstate = state in finalstates
@@ -243,7 +243,7 @@ function attempts(app::YarnApp, refresh::Bool=true)
 end
 
 function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, timeout_secs::Int=60)
-    @logmsg("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
+    @debug("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
     t1 = time() + timeout_secs
     finalstates = (YarnApplicationAttemptStateProto.APP_ATTEMPT_KILLED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FAILED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FINISHED)
     isfinalstate = state in finalstates
@@ -255,7 +255,7 @@ function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, ti
             atmptstate = report.yarn_application_attempt_state
             (atmptstate == state) && (return true)
             isfinalstate || ((atmptstate in finalstates) && (return false))
-            @logmsg("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
+            @debug("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
             break
         end
     end
diff --git a/src/cluster_manager.jl b/src/cluster_manager.jl
index 9a2fa67..5d83773 100644
--- a/src/cluster_manager.jl
+++ b/src/cluster_manager.jl
@@ -13,7 +13,7 @@ struct YarnManager <: ClusterManager
             params[n] = v
         end
         paramkeys = keys(params)
-        @logmsg("YarnManager constructor: params: $params")
+        @debug("YarnManager constructor: params: $params")
 
         user = (:user in paramkeys) ? params[:user] : ""
         rmport = (:rmport in paramkeys) ? params[:rmport] : 8032
@@ -41,7 +41,7 @@ function show(io::IO, yarncm::YarnManager)
 end
 
 function setup_worker(host, port, cookie)
-    @logmsg("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
+    @debug("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
     c = connect(IPv4(host), port)
     Base.wait_connected(c)
     redirect_stdout(c)
@@ -91,7 +91,7 @@ function _currprocname()
 end
 
 function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
-    @logmsg("YarnManager launch: params: $params")
+    @debug("YarnManager launch: params: $params")
     paramkeys = keys(params)
 
     np = (:np in paramkeys) ? params[:np] : 1
@@ -116,8 +116,8 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
     initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port), $(cookie))"
     clc = launchcontext(cmd="$cmd -e '$initargs'", env=appenv)
 
-    @logmsg("YarnManager launch: initargs: $initargs")
-    @logmsg("YarnManager launch: context: $clc")
+    @debug("YarnManager launch: initargs: $initargs")
+    @debug("YarnManager launch: context: $clc")
 
     on_alloc = (cid) -> container_start(manager.am, cid, clc)
     callback(manager.am, on_alloc, nothing)
@@ -156,7 +156,7 @@ end
 function manage(manager::YarnManager, id::Integer, config::WorkerConfig, op::Symbol)
     # This function needs to exist, but so far we don't do anything
     if op == :deregister
-        @logmsg("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
+        @debug("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
         !manager.keep_connected && (1 == nprocs()) && (@async disconnect(manager))
     end
     nothing
diff --git a/src/rpc.jl b/src/rpc.jl
index 73b96dc..c2535b3 100644
--- a/src/rpc.jl
+++ b/src/rpc.jl
@@ -65,8 +65,7 @@ function buffer_size_delimited(channelbuff::IOBuffer, obj)
 
     data = take!(iob)
     len = write_bytes(channelbuff, data)
-    @logmsg("$(typeof(obj)) -> $data")
-    @logmsg("$(typeof(obj)) -> buffer. len $len")
+    @debug("$(typeof(obj)) -> buffer", len=len, data=data)
     len
 end
 
@@ -78,14 +77,14 @@ function send_buffered(buff::IOBuffer, sock::TCPSocket, delimited::Bool)
         len = write(sock, hton(datalen))
     end
     len += write(sock, data)
-    @logmsg("buffer -> sock. len $len")
+    @debug("buffer -> sock", len=len)
     len
 end
 
 function recv_blockop(sock::TCPSocket)
-    @logmsg("recv block read message")
+    @debug("recv block read message")
     data_bytes = read_bytes(sock)
-    @logmsg("block_resp <- sock. len $(length(data_bytes))")
+    @debug("block_resp <- sock", len=length(data_bytes))
 
     block_resp = BlockOpResponseProto()
     readproto(IOBuffer(data_bytes), block_resp)
@@ -184,7 +183,7 @@ end
 # Connect to the hadoop server and complete the handshake
 function connect(channel::HadoopRpcChannel)
     # open connection
-    @logmsg("connecting to HadoopRpcChannel $(channel.host):$(channel.port)")
+    @debug("connecting to HadoopRpcChannel $(channel.host):$(channel.port)")
     try
         sock = connect(channel.host, channel.port)
         channel.sock = sock
@@ -201,10 +200,10 @@ function connect(channel::HadoopRpcChannel)
         buffer_conctx_reqhdr(channel)
         buffer_connctx(channel)
         send_buffered(channel, true)
-        @logmsg("connected to HadoopRpcChannel $(channel.host):$(channel.port)")
+        @debug("connected to HadoopRpcChannel $(channel.host):$(channel.port)")
         channel.call_id = HRPC_CALL_ID_NORMAL
     catch ex
-        @logmsg("exception connecting to $(channel.host):$(channel.port): $ex")
+        @debug("exception connecting to $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -217,7 +216,7 @@ function disconnect(channel::HadoopRpcChannel)
     try
         isconnected(channel) && close(channel.sock)
     catch ex
-        @logmsg("exception while closing channel socket $ex")
+        @debug("exception while closing channel socket $ex")
     end
     channel.sock = nothing
     channel.call_id = HRPC_CALL_ID_CONNCTX
@@ -228,14 +227,14 @@ function send_rpc_message(channel::HadoopRpcChannel, controller::HadoopRpcContro
     isconnected(channel) || connect(channel)
 
     try
-        @logmsg("send rpc message. method: $(method.name)")
+        @debug("send rpc message. method: $(method.name)")
         begin_send(channel)
         buffer_rpc_reqhdr(channel)
         buffer_method_reqhdr(channel, method)
         buffer_size_delimited(channel.iob, params)
         send_buffered(channel, true)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -244,10 +243,10 @@ end
 
 function recv_rpc_message(channel::HadoopRpcChannel, resp)
     try
-        @logmsg("recv rpc message")
+        @debug("recv rpc message")
         msg_len = ntoh(read(channel.sock, UInt32))
         hdr_bytes = read_bytes(channel.sock)
-        @logmsg("hdr <- sock. len $(length(hdr_bytes))")
+        @debug("hdr <- sock", len=length(hdr_bytes))
 
         resp_hdr = RpcResponseHeaderProto()
         readproto(IOBuffer(hdr_bytes), resp_hdr)
@@ -260,14 +259,14 @@ function recv_rpc_message(channel::HadoopRpcChannel, resp)
             hdr_len += _len_uleb(hdr_len)
             if msg_len > hdr_len
                 data_bytes = read_bytes(channel.sock)
-                @logmsg("data <- sock. len $(length(data_bytes))")
+                @debug("data <- sock", len=length(data_bytes))
                 data_len = msg_len - hdr_len - _len_uleb(length(data_bytes))
                 (length(data_bytes) == data_len) || throw(HadoopRpcException("unexpected response data length. expected:$(data_len) read:$(length(data_bytes))"))
                 readproto(IOBuffer(data_bytes), resp)
             end
         end
     catch ex
-        @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+        @debug("exception receiving from $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -275,7 +274,7 @@
 end
 
 function call_method(channel::HadoopRpcChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::HadoopRpcController, params)
-    @logmsg("call_method $(method.name)")
+    @debug("call_method $(method.name)")
     send_rpc_message(channel, controller, method, params)
     resp_type = get_response_type(method)
     resp = resp_type()
@@ -311,15 +310,15 @@ mutable struct HadoopDataChannel
 end
 
 function connect(channel::HadoopDataChannel)
-    @logmsg("connecting to HadoopDataChannel $(channel.host):$(channel.port)")
+    @debug("connecting to HadoopDataChannel $(channel.host):$(channel.port)")
     try
         sock = connect(channel.host, channel.port)
         channel.sock = sock
     catch ex
-        @logmsg("exception connecting to HadoopDataChannel $(channel.host):$(channel.port): $ex")
+        @debug("exception connecting to HadoopDataChannel $(channel.host):$(channel.port): $ex")
         rethrow(ex)
     end
-    @logmsg("connected to HadoopDataChannel $(channel.host):$(channel.port)")
+    @debug("connected to HadoopDataChannel $(channel.host):$(channel.port)")
     nothing
 end
 
@@ -327,7 +326,7 @@ function disconnect(channel::HadoopDataChannel)
     try
         isconnected(channel) && close(channel.sock)
     catch ex
-        @logmsg("exception while closing HadoopDataChannel socket $ex")
+        @debug("exception while closing HadoopDataChannel socket $ex")
     end
     channel.sock = nothing
     nothing
@@ -377,13 +376,13 @@ function _get(pool::HadoopDataChannelPool, host::AbstractString, port::Integer)
     end
 
     (timediff < _dcpool.keepalivesecs) || (channel = HadoopDataChannel(host, port))
-    @logmsg("return channel: $channel connected: $(isconnected(channel))")
+    @debug("return channel: $channel connected: $(isconnected(channel))")
     channel
 end
 
 function _put(pool::HadoopDataChannelPool, channel::HadoopDataChannel, reuse::Bool)
     isconnected(channel) || return
-    @logmsg("keeping channel: $channel connected: $(isconnected(channel))")
+    @debug("keeping channel: $channel connected: $(isconnected(channel))")
     if !reuse
         try
             disconnect(channel)
@@ -437,7 +436,7 @@ mutable struct HDFSBlockReader
 
     function HDFSBlockReader(host::AbstractString, port::Integer, block::LocatedBlockProto, offset::UInt64, len::UInt64, chk_crc::Bool)
         channel = _get(_dcpool, host, port)
-        @logmsg("creating block reader for offset $offset at $host:$port for length $len")
+        @debug("creating block reader for offset $offset at $host:$port for length $len")
         new(channel, block, offset, len,
             nothing, 0,
             nothing, 0, 0,
@@ -483,7 +482,7 @@ function buffer_readblock(reader::HDFSBlockReader)
     basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token))
     hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME))
     readblock = protobuild(OpReadBlockProto, Dict(:offset => offset, :len => len, :header => hdr))
-    @logmsg("sending block read message for offset $offset len $len")
+    @debug("sending block read message for offset $offset len $len")
 
     buffer_size_delimited(channel.iob, readblock)
 end
@@ -499,13 +498,13 @@ function send_block_read(reader::HDFSBlockReader)
     isconnected(channel) || connect(channel)
 
     try
-        @logmsg("send block read message")
+        @debug("send block read message")
        begin_send(channel)
         buffer_opcode(channel, HDATA_READ_BLOCK)
         buffer_readblock(reader)
         send_buffered(channel, false)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(reader, false)
         rethrow(ex)
     end
@@ -515,11 +514,11 @@ end
 function send_read_status(reader::HDFSBlockReader, status::Int32=Status.SUCCESS)
     channel = reader.channel
     try
-        @logmsg("send read status $status")
+        @debug("send read status $status")
         buffer_client_read_status(reader, status)
         send_buffered(channel, false)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(reader, false)
         rethrow(ex)
     end
@@ -534,7 +533,7 @@ function recv_blockop(reader::HDFSBlockReader)
         isvalid_chksum(checksum_type) || throw(HadoopRpcException("Unknown checksum type $checksum_type"))
         reader.block_op_resp = block_resp
     catch ex
-        @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+        @debug("exception receiving from $(channel.host):$(channel.port): $ex")
         disconnect(reader, false)
         rethrow(ex)
     end
@@ -544,14 +543,14 @@ end
 function recv_packet_hdr(reader::HDFSBlockReader)
     channel = reader.channel
     try
-        @logmsg("recv block packet message")
+        @debug("recv block packet message")
         sock = channel.sock
 
         pkt_len = ntoh(read(sock, UInt32))
         hdr_len = ntoh(read(sock, UInt16))
         hdr_bytes = Vector{UInt8}(undef, hdr_len)
         read!(sock, hdr_bytes)
-        @logmsg("pkt_hdr <- sock. len $(hdr_len) (pkt_len: $pkt_len)")
+        @debug("pkt_hdr <- sock", len=hdr_len, pkt_len=pkt_len)
 
         pkt_hdr = PacketHeaderProto()
         readproto(IOBuffer(hdr_bytes), pkt_hdr)
@@ -562,12 +561,12 @@
         reader.chunk = Vector{UInt8}(undef, reader.chunk_len)
         reader.chunk_count = div(data_len + reader.chunk_len - 1, reader.chunk_len)    # chunk_len-1 to take care of chunks with partial data
         reader.chunks_read = 0
-        @logmsg("received packet with $(reader.chunk_count) chunks of $(reader.chunk_len) bytes each in $data_len bytes of data")
-        @logmsg("last packet: $(pkt_hdr.lastPacketInBlock)")
+        @debug("received packet with $(reader.chunk_count) chunks of $(reader.chunk_len) bytes each in $data_len bytes of data")
+        @debug("last packet: $(pkt_hdr.lastPacketInBlock)")
 
         checksums = Vector{UInt32}(undef, reader.chunk_count)
         read!(sock, checksums)
-        @logmsg("checksums <- sock. len $(sizeof(checksums))")
+        @debug("checksums <- sock", len=sizeof(checksums))
         for idx in 1:length(checksums)
             checksums[idx] = ntoh(checksums[idx])
         end
@@ -576,10 +575,10 @@ function recv_packet_hdr(reader::HDFSBlockReader)
         reader.packet_read = 4 + sizeof(checksums)
         reader.pkt_hdr = pkt_hdr
         reader.checksums = checksums
-        @logmsg("current read position pkt $(reader.packet_read)/$(reader.packet_len), block $(reader.total_read)/$(reader.len)")
+        @debug("current read position pkt $(reader.packet_read)/$(reader.packet_len), block $(reader.total_read)/$(reader.len)")
         nothing
     catch ex
-        @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+        @debug("exception receiving from $(channel.host):$(channel.port): $ex")
         disconnect(reader, false)
         rethrow(ex)
     end
@@ -590,7 +589,7 @@ eof(reader::HDFSBlockReader) = (reader.total_read >= reader.len)
 
 function verify_pkt_checksums(reader::HDFSBlockReader, buff::Vector{UInt8})
     data_len = length(buff)
-    @logmsg("verifying packet crc data_len: $data_len")
+    @debug("verifying packet crc data_len: $data_len")
     (data_len > 0) || return
 
     block_op_resp = reader.block_op_resp
@@ -623,7 +622,7 @@ function initiate(reader::HDFSBlockReader; retry=true)
         return
     catch ex
         if retry
-            @logmsg("retrying block reader initiation")
+            @debug("retrying block reader initiation")
            disconnect(reader.channel)
             connect(reader.channel)
             return initiate(reader; retry=false)
@@ -660,14 +659,14 @@ function read_packet!(reader::HDFSBlockReader, inbuff::Vector{UInt8}, offset::UI
             read!(sock, buff)
             reader.packet_read += packet_remaining
             reader.total_read += packet_remaining
-            @logmsg("recv $(packet_remaining) packet_read $(reader.packet_read), total_read $(reader.total_read)")
+            @debug("recv $(packet_remaining) packet_read $(reader.packet_read), total_read $(reader.total_read)")
 
             # verify crc only if required
             reader.chk_crc && verify_pkt_checksums(reader, buff)
 
             # send the success block read status
             if eof(reader)
-                @logmsg("recv last empty packet")
+                @debug("recv last empty packet")
                 recv_packet_hdr(reader)
                 send_read_status(reader)
             end
@@ -808,7 +807,7 @@ mutable struct HDFSBlockWriter
         port = node.xferPort
         channel = _get(_dcpool, host, port)
 
-        @logmsg("creating block writer for offset $(block.offset) at $host:$port")
+        @debug("creating block writer for offset $(block.offset) at $host:$port")
         writer = new(channel, defaults, block, node_info, PipeBuffer(), 0, 0, WriterPipeline(), false)
         writer.pipeline_task = @async process_pipeline(writer)
         writer
@@ -837,11 +836,11 @@ function check_write_buffer(writer::HDFSBlockWriter)
     max_buffer_bytes = defaults.writePacketSize * max_buffer_pkt
     buffered_bytes = bytesavailable(writer.buffer)
     if ((buffered_bytes >= max_buffer_bytes) || (pending(writer.pkt_pipeline) >= max_buffer_pkt))
-        @logmsg("slow pipeline. pending bytes:$(bytesavailable(writer.buffer)), packets:$(pending(writer.pkt_pipeline)). waiting...")
+        @debug("slow pipeline. pending bytes:$(bytesavailable(writer.buffer)), packets:$(pending(writer.pkt_pipeline)). waiting...")
         trigger_pkt(writer.pkt_pipeline)
         wait_pkt(writer.pkt_pipeline)
     elseif buffered_bytes >= defaults.writePacketSize
-        @logmsg("triggering pkt_pipeline. buffered:$(bytesavailable(writer.buffer)), pending packets:$(pending(writer.pkt_pipeline))")
+        @debug("triggering pkt_pipeline. buffered:$(bytesavailable(writer.buffer)), pending packets:$(pending(writer.pkt_pipeline))")
         trigger_pkt(writer.pkt_pipeline)
         yield()
     end
@@ -865,7 +864,7 @@ function write(writer::HDFSBlockWriter, buff::Vector{UInt8})
 end
 
 function flush(writer::HDFSBlockWriter)
-    @logmsg("flush of block writer invoked")
+    @debug("flush of block writer invoked")
     flush_and_wait(writer.pkt_pipeline)
 end
 
@@ -878,7 +877,7 @@ function initiate(writer::HDFSBlockWriter; retry=true)
         return
     catch ex
         if retry
-            @logmsg("retrying block write initiation")
+            @debug("retrying block write initiation")
             disconnect(writer.channel)
             connect(writer.channel)
             return initiate(writer; retry=false)
@@ -889,7 +888,7 @@
 end
 
 function process_pipeline(writer::HDFSBlockWriter)
-    @logmsg("process_pipeline: started")
+    @debug("process_pipeline: started")
     pipeline = writer.pkt_pipeline
     defaults = writer.server_defaults
     flushed = false
@@ -912,7 +911,7 @@ function process_pipeline(writer::HDFSBlockWriter)
             sock = channel.sock
             readable_bytes = bytesavailable(sock)
 
-            @logmsg("processing pipeline. pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)")
+            @debug("processing pipeline. pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)")
             while bytesavailable(writer.buffer) >= defaults.writePacketSize
                 prepare_packet(writer)
             end
@@ -921,38 +920,38 @@
 
             if flushed
                 while bytesavailable(writer.buffer) > 0
-                    @logmsg("flushing remaining data packet...")
+                    @debug("flushing remaining data packet...")
                     prepare_packet(writer)
                 end
-                @logmsg("preparing last empty packet...")
+                @debug("preparing last empty packet...")
                 prepare_packet(writer)
             end
 
             loop_state = 1
-            @logmsg("writing packets...")
+            @debug("writing packets...")
             while !isempty(pipeline.pkt_prepared)
                 pkt = pipeline.pkt_prepared[1]
                 write_packet(writer, pkt)
            end
 
-            @logmsg("reading acks...")
+            @debug("reading acks...")
             loop_state = 2
             while !failed && !isempty(pipeline.pkt_ackwait) && (flushed || (readable_bytes > 0))
                 read_packet_ack(writer)
                 failed = pipeline.failed
                 readable_bytes = bytesavailable(sock)
            end
-            @logmsg("processed pipeline. pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)")
+            @debug("processed pipeline. pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)")
             trigger_pkt(pipeline)
         end
     catch ex
         channel = writer.channel
         extype = (loop_state == 1) ? "send" : (loop_state == 2) ? "read" : "unknown"
-        @logmsg("pipeline for $(channel.host):$(channel.port) failed. stage $extype. $ex")
+        @debug("pipeline for $(channel.host):$(channel.port) failed. stage $extype. $ex")
         disconnect(writer, false)
         failed = pipeline.failed = true
     end
-    @logmsg("process_pipeline finished. failed: $(failed), flushed: $(flushed)")
+    @debug("process_pipeline finished. failed: $(failed), flushed: $(flushed)")
     trigger_pkt(pipeline)
     trigger_flushed(pipeline)
 end
@@ -991,7 +990,7 @@ function buffer_writeblock(writer::HDFSBlockWriter)
                 :maxBytesRcvd => 0,
                 :latestGenerationStamp => exblock.generationStamp,
                 :requestedChecksum => chksum))
-    @logmsg("sending block write message for offset $(block.offset)")
+    @debug("sending block write message for offset $(block.offset)")
 
     buffer_size_delimited(channel.iob, writeblock)
 end
@@ -1001,13 +1000,13 @@ function send_block_write(writer::HDFSBlockWriter)
     channel = writer.channel
 
     try
-        @logmsg("send block write message")
+        @debug("send block write message")
         begin_send(channel)
         buffer_opcode(channel, HDATA_WRITE_BLOCK)
         buffer_writeblock(writer)
         send_buffered(channel, false)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(writer, false)
         rethrow(ex)
     end
@@ -1019,7 +1018,7 @@ function recv_blockop(writer::HDFSBlockWriter)
     try
         recv_blockop(channel.sock)
     catch ex
-        @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+        @debug("exception receiving from $(channel.host):$(channel.port): $ex")
         disconnect(writer, false)
         rethrow(ex)
     end
@@ -1035,7 +1034,7 @@ function populate_checksums(bytes::Vector{UInt8}, chunk_len::UInt32, checksums::
         c_len = min(nbytes-c_offset+1, chunk_len)
         c_data = unsafe_wrap(Array, pointer(bytes, c_offset), c_len)
         checksums[idx] = hton(calc_chksum(checksum_type, c_data))
-        #@logmsg("chksum $(checksums[idx])")
+        #@debug("chksum $(checksums[idx])")
         c_offset += c_len
     end
     nothing
@@ -1045,12 +1044,12 @@ end
 
 function prepare_packet(writer::HDFSBlockWriter)
     defaults = writer.server_defaults
 
-    #@logmsg("prepare block packet")
+    #@debug("prepare block packet")
     bytes_in_packet = min(defaults.writePacketSize, bytesavailable(writer.buffer))
     last_pkt = (bytes_in_packet == 0)
     seq_no = Int64(writer.pkt_seq += 1)
-    #@logmsg("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt")
+    #@debug("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt")
 
     pkt_hdr = protobuild(PacketHeaderProto, Dict(:offsetInBlock => writer.total_written,
                 :seqno => seq_no,
@@ -1077,7 +1076,7 @@ end
 
 const hdr_iob = IOBuffer()
 function write_packet(writer::HDFSBlockWriter, pkt::PipelinedPacket)
-    #@logmsg("write block packet")
+    #@debug("write block packet")
     channel = writer.channel
     sock = channel.sock
 
@@ -1093,18 +1092,18 @@ function write_packet(writer::HDFSBlockWriter, pkt::PipelinedPacket)
         write(sock, pkt.bytes)
         ackwait(writer.pkt_pipeline, pkt.seqno)
 
-        #@logmsg("sent packet to $(channel.host):$(channel.port). total_written:$(writer.total_written)")
+        #@debug("sent packet to $(channel.host):$(channel.port). total_written:$(writer.total_written)")
     nothing
 end
 
 function read_packet_ack(writer::HDFSBlockWriter)
-    #@logmsg("reading packet ack")
+    #@debug("reading packet ack")
     channel = writer.channel
     sock = channel.sock
 
-    #@logmsg("recv packet ack message")
+    #@debug("recv packet ack message")
     data_bytes = read_bytes(sock)
-    #@logmsg("ack <- sock. len $(length(data_bytes))")
+    #@debug("ack <- sock. len $(length(data_bytes))")
 
     ack = PipelineAckProto()
     readproto(IOBuffer(data_bytes), ack)
@@ -1115,7 +1114,7 @@ function read_packet_ack(writer::HDFSBlockWriter)
 
     exblk = writer.block.b
     set_field!(exblk, :numBytes, pipeline.acked_bytes)
-    #@logmsg("received ack for seqno: $(ack.seqno), status: $(ack.reply) bytes acked: $(exblk.numBytes)")
+    #@debug("received ack for seqno: $(ack.seqno), status: $(ack.reply) bytes acked: $(exblk.numBytes)")
 
     nothing
 end
diff --git a/src/sasl.jl b/src/sasl.jl
index 84a45b4..a2802c2 100644
--- a/src/sasl.jl
+++ b/src/sasl.jl
@@ -15,7 +15,7 @@ function digmd5_decode_challenge(challenge::AbstractString)
         end
         comps[n] = v
     end
-    @logmsg("decoded challenge components: $comps")
+    @debug("decoded challenge components: $comps")
     comps
 end
 
@@ -79,7 +79,7 @@ end
 buffer_sasl_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channel.call_id = HRPC_CALL_ID_SASL; buffer_reqhdr(channel, HRPC_CALL_ID_SASL))
 
 function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vector{RpcSaslProto_SaslAuth}=RpcSaslProto_SaslAuth[], token::Vector{UInt8}=UInt8[])
-    @logmsg("buffer SASL message. state:$state, nauths:$(length(auths)), token:$(!isempty(token))")
+    @debug("buffer SASL message. state:$state, nauths:$(length(auths)), token:$(!isempty(token))")
     saslmsg = protobuild(RpcSaslProto, Dict(:version => 0, :state => state))
     isempty(auths) || set_field!(saslmsg, :auths, auths)
     isempty(token) || set_field!(saslmsg, :token, token)
@@ -87,10 +87,10 @@ function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vec
 end
 
 function recv_sasl_message(channel::HadoopRpcChannel)
-    @logmsg("recv SASL message")
+    @debug("recv SASL message")
     resp = RpcSaslProto()
     recv_rpc_message(channel, resp)
-    @logmsg("received SASL message $resp")
+    @debug("received SASL message $resp")
     resp
 end
 
@@ -103,10 +103,10 @@ function conditional_sasl_auth(channel::HadoopRpcChannel)
     tok_alias = token_alias(channel)
     tokens = find_tokens(channel.ugi, alias=tok_alias)
     if isempty(tokens)
-        @logmsg("no token available to authenticate to $tok_alias. skipping")
+        @debug("no token available to authenticate to $tok_alias. skipping")
         return false
     end
-    @logmsg("found $(length(tokens)) tokens available to authenticate to $tok_alias. authenticating...")
+    @debug("found $(length(tokens)) tokens available to authenticate to $tok_alias. authenticating...")
     token = tokens[1]
     sasl_auth(channel, token)
     true
@@ -127,16 +127,16 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto)
 
     # check if TOKEN/DIGEST-MD5 is one of the supported methods
     nauths = length(resp.auths)
-    @logmsg("server supports $nauths auths")
+    @debug("server supports $nauths auths")
     idx_auth = 0
     for idx in 1:length(resp.auths)
         auth = resp.auths[idx]
-        @logmsg(" $idx: $(auth.method)/$(auth.mechanism)")
+        @debug(" $idx: $(auth.method)/$(auth.mechanism)")
         ((auth.method == "TOKEN") && (auth.mechanism == "DIGEST-MD5")) || continue
         idx_auth = idx
     end
     if idx_auth == 0
-        @logmsg("no supported auth method found")
+        @debug("no supported auth method found")
         throw(HadoopRpcException("no supported authentication method found"))
     end
     auth = resp.auths[idx_auth]
@@ -145,9 +145,9 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto)
     protocol = auth.protocol
     serverId = auth.serverId
     challenge = auth.challenge
-    @logmsg("auth.protocol: $protocol, serverId: $serverId, challenge: $challenge")
+    @debug("auth.protocol: $protocol, serverId: $serverId, challenge: $challenge")
     response = digmd5_respond(token, auth.protocol, auth.serverId, auth.challenge)
-    @logmsg("response: $response")
+    @debug("response: $response")
 
     # send response as a sasl initiate request
     respauth = protobuild(RpcSaslProto_SaslAuth, Dict(:method => auth.method,
@@ -163,7 +163,7 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto)
 
     # expect a success response
     if resp.state != RpcSaslProto_SaslState.SUCCESS
-        @logmsg("error completing SASL auth. state: $(resp.state). expected state: $(RpcSaslProto_SaslState.SUCCESS)")
+        @debug("error completing SASL auth. state: $(resp.state). expected state: $(RpcSaslProto_SaslState.SUCCESS)")
        throw(HadoopRpcException("error completing SASL auth. state: $(resp.state). expected state: $(RpcSaslProto_SaslState.SUCCESS)"))
     end
     nothing