diff --git a/.travis.yml b/.travis.yml
index 64677ee..c5399d5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,17 +4,13 @@ dist: trusty
 os:
   - linux
 julia:
-  - 0.6
+  - 0.7
   - nightly
-matrix:
-  fast_finish: true
-  allow_failures:
-    - julia: nightly
 notifications:
   email: false
 script:
   - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi
-  - git clone -b 'v0.0.3' --single-branch https://github.com/JuliaCI/PkgEvalHadoopEnv.git ./test/test_setup
+  - git clone -b 'v0.0.5' --single-branch https://github.com/JuliaCI/PkgEvalHadoopEnv.git ./test/test_setup
  - ./test/test_setup/hadoop/setup_hdfs.sh
  - julia --check-bounds=yes -e 'Pkg.clone(pwd()); Pkg.build("Elly"); Pkg.test("Elly"; coverage=true)'
 after_success:
diff --git a/REQUIRE b/REQUIRE
index fbbdf8a..d907971 100644
--- a/REQUIRE
+++ b/REQUIRE
@@ -1,6 +1,5 @@
-julia 0.6.0
-Compat 0.17.0
+julia 0.7.0-
+Compat 0.70.0
 ProtoBuf
 URIParser
-CRC
 MbedTLS
diff --git a/YARN.md b/YARN.md
index ed8a495..287ae08 100644
--- a/YARN.md
+++ b/YARN.md
@@ -76,7 +76,7 @@ julia> function on_finish(cid)
        end
 on_finish (generic function with 1 method)

-julia> callback(yarnam, Nullable(on_alloc), Nullable(on_finish))
+julia> callback(yarnam, on_alloc, on_finish)

 julia> yarnapp = submit(yarnclnt, yarnam)
 YarnApp YARN (EllyApp/2): accepted-0.0
diff --git a/src/Elly.jl b/src/Elly.jl
index 532f9bd..81c8850 100644
--- a/src/Elly.jl
+++ b/src/Elly.jl
@@ -5,20 +5,26 @@ module Elly
 using Compat
 using ProtoBuf
 using URIParser
-using CRC
 using MbedTLS

-import Base: connect, readdir, show, isfile, isdir, islink, stat, filesize, filemode, mtime, mkdir, mkpath,
-        mv, rm, abspath, cd, pwd, touch, open, nb_available, cp, joinpath, dirname,
-        eof, position, seek, seekend, seekstart, skip, read, write, read!, close,
-        launch, manage, convert
+# using from stdlib
+using CRC32c
+using Random
+using UUIDs
+using Base64
+using Dates
+using Sockets
+using Serialization
+import Sockets: connect
+using Distributed
+import Distributed: launch, manage
+
+import Base: readdir, show, isfile, isdir, islink, stat, filesize, filemode, mtime, mkdir, mkpath,
+        mv, rm, abspath, cd, pwd, touch, open, bytesavailable, cp, joinpath, dirname,
+        eof, position, seek, seekend, seekstart, skip, read, write, read!, close, convert

 import ProtoBuf: write_bytes, read_bytes, call_method
 import URIParser: URI

-if VERSION >= v"0.7.0-"
-    using CRC32c
-end
-
 export show, convert, URI

 export UserGroupInformation, add_token, find_tokens, username
@@ -28,7 +34,7 @@ export HDFSClient, HDFSFile, HDFSFileInfo,
     hdfs_status, hdfs_capacity, hdfs_capacity_used, hdfs_capacity_remaining, hdfs_renewlease,
     isfile, isdir, islink, stat, filesize, filemode, mtime, atime, du, exists, readdir, mkdir, mkpath, touch,
     mv, rm, abspath, cd, pwd, joinpath, dirname,
-    eof, position, seek, seekend, seekstart, skip, nb_available,
+    eof, position, seek, seekend, seekstart, skip, bytesavailable,
     read!, read, write, readbytes, readall, open, close, cp

 export YarnClient, YarnNode, YarnApp, YarnAppStatus, YarnAppAttemptStatus, nodecount, nodes, launchcontext, submit,
     kill, status, attempts, am_rm_token
@@ -40,28 +46,6 @@ export YarnManager, launch, manage
 const Lock = Channel
 makelock() = Channel{Int}(1)

-function tstr()
-    t = time()
-    string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
-end
-
-# enable logging only during debugging
-#using Logging
-##const logger = Logging.configure(filename="elly.log", level=DEBUG)
-#const logger = Logging.configure(level=DEBUG)
-#macro logmsg(s)
-#    quote
-#        debug("[", myid(), "-", "] ", $(esc(s)))
-#    end
-#end
-macro logmsg(s)
-end
-#macro logmsg(s)
-#    quote
-#        info(tstr(), " [", myid(), "-", "] ", $(esc(s)))
-#    end
-#end
-
 include("hadoop/hadoop.jl")
 using Elly.hadoop
 using Elly.hadoop.common
diff --git a/src/api_hdfs_base.jl b/src/api_hdfs_base.jl
index 30d422d..963d677 100644
--- a/src/api_hdfs_base.jl
+++ b/src/api_hdfs_base.jl
@@ -27,11 +27,11 @@ It also stores the folder context for using relative paths in APIs that use the
 mutable struct HDFSClient
     nn_conn::HDFSProtocol
     wd::AbstractString
-    server_defaults::Nullable{FsServerDefaultsProto}
+    server_defaults::Union{Nothing,FsServerDefaultsProto}

     function HDFSClient(host::AbstractString, port::Integer, ugi::UserGroupInformation=UserGroupInformation())
         nn_conn = HDFSProtocol(host, port, ugi)
-        new(nn_conn, "/", Nullable{FsServerDefaultsProto}())
+        new(nn_conn, "/", nothing)
     end
 end
@@ -98,7 +98,7 @@ mutable struct HDFSFileInfo
     last_mod::UInt64
     last_access::UInt64

-    HDFSFileInfo(fs::HdfsFileStatusProto) = new(fs.fileType, String(fs.path),
+    HDFSFileInfo(fs::HdfsFileStatusProto) = new(fs.fileType, String(copy(fs.path)),
                 fs.length, fs.block_replication, fs.blocksize,
                 fs.owner, fs.group, fs.permission.perm,
                 fs.modification_time, fs.access_time)
@@ -163,7 +163,7 @@ function _get_file_info(client::HDFSClient, path::AbstractString)
     path = abspath(client, path)
     inp = protobuild(GetFileInfoRequestProto, Dict(:src => path))
     resp = getFileInfo(client.nn_conn, inp)
-    isfilled(resp, :fs) ? Nullable{HdfsFileStatusProto}(resp.fs) : Nullable{HdfsFileStatusProto}()
+    isfilled(resp, :fs) ? resp.fs : nothing
 end

 function _get_block_locations(client::HDFSClient, path::AbstractString, offset::UInt64=zero(UInt64), length::UInt64=zero(UInt64))
@@ -175,18 +175,18 @@ function _get_block_locations(client::HDFSClient, path::AbstractString, offset::
             :offset => offset,
             :length => length))
     resp = getBlockLocations(client.nn_conn, inp)
-    isfilled(resp, :locations) ? Nullable{LocatedBlocksProto}(resp.locations) : Nullable{LocatedBlocksProto}()
+    isfilled(resp, :locations) ? resp.locations : nothing
 end

 #
 # Server defaults
 function _get_server_defaults(client::HDFSClient)
-    if isnull(client.server_defaults)
+    if client.server_defaults === nothing
         inp = GetServerDefaultsRequestProto()
         resp = getServerDefaults(client.nn_conn, inp)
-        client.server_defaults = Nullable(resp.serverDefaults)
+        client.server_defaults = resp.serverDefaults
     end
-    get(client.server_defaults)
+    client.server_defaults
 end

 hdfs_server_defaults(client::HDFSClient) = _as_dict(_get_server_defaults(client))
@@ -208,14 +208,14 @@ hdfs_capacity_remaining(client::HDFSClient) = _get_fs_status(client).remaining
 stat(file::HDFSFile) = stat(file.client, file.path)
 function stat(client::HDFSClient, path::AbstractString)
     fileinfo = _get_file_info(client, path)
-    isnull(fileinfo) && throw(HDFSException("Path not found $path"))
-    hdfs_file_info = HDFSFileInfo(get(fileinfo))
+    (fileinfo === nothing) && throw(HDFSException("Path not found $path"))
+    hdfs_file_info = HDFSFileInfo(fileinfo)
     hdfs_file_info.name = path
     hdfs_file_info
 end

 exists(file::HDFSFile) = exists(file.client, file.path)
-exists(client::HDFSClient, path::AbstractString) = !isnull(_get_file_info(client, path))
+exists(client::HDFSClient, path::AbstractString) = (_get_file_info(client, path) !== nothing)

 isdir(file::HDFSFile) = isdir(file.client, file.path)
 isdir(client::HDFSClient, path::AbstractString) = isdir(stat(client, path))
@@ -248,9 +248,8 @@ atime(fileinfo::HDFSFileInfo) = fileinfo.last_access
 hdfs_blocks(file::HDFSFile, offset::Integer=0, length::Integer=typemax(Int)) = hdfs_blocks(file.client, file.path, offset, length)
 function hdfs_blocks(client::HDFSClient, path::AbstractString, offset::Integer=0, length::Integer=typemax(Int))
     blocks = Tuple{UInt64,Array}[]
-    _locations = _get_block_locations(client, path, UInt64(offset), UInt64(length))
-    isnull(_locations) && (return blocks)
-    locations = get(_locations)
+    locations = _get_block_locations(client, path, UInt64(offset), UInt64(length))
+    (locations === nothing) && (return blocks)
     for block in locations.blocks
         block.corrupt && throw(HDFSException("Corrupt block found at offset $(block.offset)"))
         node_ips = AbstractString[]
@@ -283,10 +282,10 @@ function _get_content_summary(client::HDFSClient, path::AbstractString)
     resp.summary
 end

-du(file::HDFSFile, details::Nullable{Dict{Symbol,Any}}=Nullable{Dict{Symbol,Any}}()) = du(file.client, file.path, details)
-function du(client::HDFSClient, path::AbstractString=".", details::Nullable{Dict{Symbol,Any}}=Nullable{Dict{Symbol,Any}}())
+du(file::HDFSFile, details::Union{Nothing,Dict{Symbol,Any}}=nothing) = du(file.client, file.path, details)
+function du(client::HDFSClient, path::AbstractString=".", details::Union{Nothing,Dict{Symbol,Any}}=nothing)
     summary = _get_content_summary(client, path)
-    isnull(details) || _as_dict(summary, get(details))
+    (details === nothing) || _as_dict(summary, details)
     summary.length
 end

@@ -294,9 +293,9 @@ end
 # File create, delete, list
 readdir(file::HDFSFile, limit::Int=typemax(Int)) = readdir(file.client, file.path, limit)
 function readdir(client::HDFSClient, path::AbstractString=".", limit::Int=typemax(Int))
-    result = AbstractString[]
+    result = String[]
     _walkdir(client, path, (filestatus)->begin
-            push!(result, String(filestatus.path))
+            push!(result, String(copy(filestatus.path)))
             (length(result) < limit)
         end)
     result
@@ -376,40 +375,40 @@ function _create_file(client::HDFSClient, path::AbstractString, overwrite::Bool=
             :blockSize => blocksz))

     resp = create(client.nn_conn, inp)
-    isfilled(resp, :fs) || (return Nullable{HdfsFileStatusProto}())
+    isfilled(resp, :fs) || (return nothing)

     if docomplete
-        _complete_file(client, path) || (return Nullable{HdfsFileStatusProto}())
+        _complete_file(client, path) || (return nothing)
     end

-    return Nullable(resp.fs)
+    return resp.fs
 end

-function _complete_file(client::HDFSClient, path::AbstractString, last::Nullable{ExtendedBlockProto}=Nullable{ExtendedBlockProto}())
+function _complete_file(client::HDFSClient, path::AbstractString, last::Union{Nothing,ExtendedBlockProto}=nothing)
     path = abspath(client, path)

     endinp = protobuild(CompleteRequestProto, Dict(:src => path,
                 :clientName => ELLY_CLIENTNAME))
-    if !isnull(last)
-        set_field!(endinp, :last, get(last))
-        @logmsg("setting last block as $(get(last))")
+    if last !== nothing
+        set_field!(endinp, :last, last)
+        @debug("setting last block as $(last)")
     end

     endresp = complete(client.nn_conn, endinp)
     endresp.result
 end

-function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Nullable{T}=Nullable{T}()) where T<:LocatedBlockProto
-    isnull(previous) && (return _add_block(ExtendedBlockProto, client, path))
-    @logmsg("adding next block to $(get(previous).b)")
-    _add_block(ExtendedBlockProto, client, path, Nullable(get(previous).b))
+function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:LocatedBlockProto
+    (previous === nothing) && (return _add_block(ExtendedBlockProto, client, path))
+    @debug("adding next block to $(previous.b)")
+    _add_block(ExtendedBlockProto, client, path, previous.b)
 end

-function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Nullable{T}=Nullable{T}()) where T<:ExtendedBlockProto
+function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto
     path = abspath(client, path)

     inp = protobuild(AddBlockRequestProto, Dict(:src => path,
                 :clientName => ELLY_CLIENTNAME))
-    isnull(previous) || set_field!(inp, :previous, get(previous))
+    (previous === nothing) || set_field!(inp, :previous, previous)

     resp = addBlock(client.nn_conn, inp)
     return resp.block
@@ -430,7 +429,7 @@ touch(file::HDFSFile, replication::UInt32=zero(UInt32), blocksz::UInt64=zero(UIn
 function touch(client::HDFSClient, path::AbstractString, replication::UInt32=zero(UInt32), blocksz::UInt64=zero(UInt64), mode::UInt32=DEFAULT_FILE_MODE)
     if exists(client, path)
         path = abspath(client, path)
-        t = UInt64(Base.Dates.datetime2unix(now(Base.Dates.UTC))*1000)
+        t = UInt64(datetime2unix(now(UTC))*1000)
         inp = protobuild(SetTimesRequestProto, Dict(:src => path,
                     :mtime => t,
                     :atime => t))
@@ -438,7 +437,7 @@ function touch(client::HDFSClient, path::AbstractString, replication::UInt32=zer
         setTimes(client.nn_conn, inp)
     else
         fs = _create_file(client, path, false, replication, blocksz, mode)
-        isnull(fs) && throw(HDFSException("Could not create file $path"))
+        (fs === nothing) && throw(HDFSException("Could not create file $path"))
     end
     nothing
 end
diff --git a/src/api_hdfs_io.jl b/src/api_hdfs_io.jl
index e7578b4..75f0000 100644
--- a/src/api_hdfs_io.jl
+++ b/src/api_hdfs_io.jl
@@ -15,23 +15,22 @@ mutable struct HDFSFileReader <: IO
     path::AbstractString
     size::UInt64
     block_sz::UInt64
-    crc_check::Bool                         # whether to check CRC with file operations
-    buffer::IOBuffer                        # to hold excess data read from HDFSBlockReader (should use a bucket brigade)
-    fptr::UInt64                            # absolute offset in file we start (0:filesize. filesize => eof)
-    blocks::Nullable{LocatedBlocksProto}    # block metadata around the current fptr (if connected)
-    blk_reader::Nullable{HDFSBlockReader}   # current block reader (if connected)
+    crc_check::Bool                             # whether to check CRC with file operations
+    buffer::IOBuffer                            # to hold excess data read from HDFSBlockReader (should use a bucket brigade)
+    fptr::UInt64                                # absolute offset in file we start (0:filesize. filesize => eof)
+    blocks::Union{Nothing,LocatedBlocksProto}   # block metadata around the current fptr (if connected)
+    blk_reader::Union{Nothing,HDFSBlockReader}  # current block reader (if connected)

     function HDFSFileReader(client::HDFSClient, path::AbstractString, offset::UInt64=zero(UInt64), check_crc::Bool=false)
         path = abspath(client, path)
-        nfile_status = _get_file_info(client, path)
-        isnull(nfile_status) && throw(HDFSException("File not found: $path"))
-        file_status = get(nfile_status)
+        file_status = _get_file_info(client, path)
+        (file_status === nothing) && throw(HDFSException("File not found: $path"))
         (file_status.fileType == HdfsFileStatusProto_FileType.IS_FILE) || throw(HDFSException("Not a regular file: $path"))

         size = file_status.length
         block_sz = file_status.blocksize
-        new(client, path, size, block_sz, check_crc, PipeBuffer(), offset, Nullable{LocatedBlocksProto}(), Nullable{HDFSBlockReader}())
+        new(client, path, size, block_sz, check_crc, PipeBuffer(), offset, nothing, nothing)
     end
 end
@@ -50,7 +49,7 @@ show(io::IO, reader::HDFSFileReader) = println(io, "HDFSFileReader: $(URI(reader
 eof(reader::HDFSFileReader) = (reader.fptr+1 > reader.size)
 position(reader::HDFSFileReader) = reader.fptr
 filesize(reader::HDFSFileReader) = reader.size
-nb_available(reader::HDFSFileReader) = filesize(reader) - position(reader)
+bytesavailable(reader::HDFSFileReader) = filesize(reader) - position(reader)

 function seek(reader::HDFSFileReader, n::Integer)
     (n == reader.fptr) && return
@@ -58,11 +57,7 @@ function seek(reader::HDFSFileReader, n::Integer)
     reader.fptr = n
     # TODO: do not disconnect if new fptr is still within the reach
     if isconnected(reader)
-        reuse = false
-        if !isnull(reader.blk_reader)
-            blk_reader = get(reader.blk_reader)
-            reuse = eof(blk_reader)
-        end
+        reuse = (reader.blk_reader !== nothing) ? eof(reader.blk_reader) : false
         disconnect(reader, reuse)
     end
     nothing
@@ -79,68 +74,62 @@ function _find_block(blocks::LocatedBlocksProto, offset::UInt64, block_sz::UInt6
     for block in blockarr
         if _block_has_offset(block, offset, block_sz)
             block.corrupt && throw(HDFSException("Corrupt block found at offset $(block.offset)"))
-            return Nullable(block)
+            return block
         end
     end
-    Nullable{LocatedBlockProto}()
+    nothing
 end

 function _current_window(reader::HDFSFileReader)
-    isnull(reader.blocks) && (return 0:-1)
-    blocks = get(reader.blocks)
-
+    (reader.blocks === nothing) && (return 0:-1)
+    blocks = reader.blocks
     block = blocks.blocks[end]
     window_max = block.offset + block.b.numBytes
     window_min = blocks.blocks[1].offset
     window_min:(window_max-1)
 end

-isconnected(reader::HDFSFileReader) = !isnull(reader.blk_reader)
+isconnected(reader::HDFSFileReader) = (reader.blk_reader !== nothing)

 function disconnect(reader::HDFSFileReader, reuse::Bool)
-    @logmsg("disconnecting $(URI(reader,true)) reuse:$reuse")
-    if !isnull(reader.blocks)
-        reader.blocks = Nullable{LocatedBlocksProto}()
-    end
-    if !isnull(reader.blk_reader)
-        blk_reader = get(reader.blk_reader)
-        isconnected(blk_reader) && disconnect(blk_reader, reuse)
-        reader.blk_reader = Nullable{HDFSBlockReader}()
+    @debug("disconnecting", uri=URI(reader,true), reuse=reuse)
+    reader.blocks = nothing
+    if reader.blk_reader !== nothing
+        isconnected(reader.blk_reader) && disconnect(reader.blk_reader, reuse)
+        reader.blk_reader = nothing
     end
     reader.buffer = PipeBuffer()
     nothing
 end

 function connect(reader::HDFSFileReader)
-    @logmsg("connect if required $(URI(reader,true))")
+    @debug("connect if required", uri=URI(reader,true))
     eof(reader) && return
     isconnected(reader) && return

-    @logmsg("connecting $(URI(reader,true))")
+    @debug("connecting", uri=URI(reader,true))
     if !(reader.fptr in _current_window(reader))
-        @logmsg("moving block window for $(URI(reader,true))")
-        len = min(nb_available(reader), HDFS_READER_WINDOW_LENGTH)
-        nblocks = _get_block_locations(reader.client, reader.path, reader.fptr, len)
-        isnull(nblocks) && throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
-        blocks = get(nblocks)
+        @debug("moving block window", uri=URI(reader,true))
+        len = min(bytesavailable(reader), HDFS_READER_WINDOW_LENGTH)
+        blocks = _get_block_locations(reader.client, reader.path, reader.fptr, len)
+        (blocks === nothing) && throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
         reader.blocks = blocks
     else
-        blocks = get(reader.blocks)
+        blocks = reader.blocks
     end
-    nblock = _find_block(blocks, reader.fptr, reader.block_sz)
-    isnull(nblock) && throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
-    block = get(nblock)
+    block = _find_block(blocks, reader.fptr, reader.block_sz)
+    (block === nothing) && throw(HDFSException("Could not get block metadata for $(URI(reader,true))"))
     block.corrupt && throw(HDFSException("Corrupt block found for $(URI(reader,true))"))
-    nnode_info = _select_node_for_block(block)
-    isnull(nnode_info) && throw(HDFSException("Could not get a valid datanode for $(URI(reader,true))"))
-    node = get(nnode_info).id
+    node_info = _select_node_for_block(block)
+    (node_info === nothing) && throw(HDFSException("Could not get a valid datanode for $(URI(reader,true))"))
+    node = node_info.id

     block_len = block.b.numBytes
     offset_into_block = (reader.fptr % reader.block_sz)
     len = block_len - offset_into_block
-    reader.blk_reader = Nullable(HDFSBlockReader(node.ipAddr, node.xferPort, block, offset_into_block, len, reader.crc_check))
+    reader.blk_reader = HDFSBlockReader(node.ipAddr, node.xferPort, block, offset_into_block, len, reader.crc_check)
     nothing
 end
@@ -150,26 +139,26 @@ function _read_and_buffer(reader::HDFSFileReader, out::Vector{UInt8}, offset::UI
         # while data remains to be copied
         while len > 0
             # set/reset block reader if required
-            if !isnull(reader.blk_reader)
-                blk_reader = get(reader.blk_reader)
+            if reader.blk_reader !== nothing
+                blk_reader = reader.blk_reader
                 if eof(blk_reader)
                     disconnect(blk_reader, true)
-                    reader.blk_reader = Nullable{HDFSBlockReader}()
+                    reader.blk_reader = nothing
                     connect(reader)
                 end
             else
                 connect(reader)
             end
-            blk_reader = get(reader.blk_reader)
+            blk_reader = reader.blk_reader

             # read packet from reader
             ret = read_packet!(blk_reader, out, offset)     # try to read directly into output
             if ret < 0
                 pkt_len = len + UInt64(abs(ret))            # bytes in this packet
-                buff = Vector{UInt8}(pkt_len)               # allocate a temporary array
-                @logmsg("allocated temporary array of size $pkt_len, len:$len, ret:$ret, offset:$offset, bufflen:$(length(buff)), outlen:$(length(out))")
+                buff = Vector{UInt8}(undef, pkt_len)        # allocate a temporary array
+                @debug("allocated temporary array", size=pkt_len, len=len, ret=ret, offset=offset, bufflen=length(buff), outlen=length(out))
                 ret = read_packet!(blk_reader, buff, UInt64(1))             # read complete packet
-                copy!(out, offset, buff, 1, len)                            # copy len bytes to output
+                copyto!(out, offset, buff, 1, len)                          # copy len bytes to output
                 Base.write_sub(reader.buffer, buff, len+1, pkt_len-len)     # copy remaining data to buffer
                 copylen = len
                 len = 0                                     # we definitely do not need to read any more
@@ -181,12 +170,12 @@ function _read_and_buffer(reader::HDFSFileReader, out::Vector{UInt8}, offset::UI
             reader.fptr += copylen
         end
     catch ex
-        if !isnull(reader.blk_reader)
-            blk_reader = get(reader.blk_reader)
+        if reader.blk_reader !== nothing
+            blk_reader = reader.blk_reader
             channel = blk_reader.channel
-            @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+            @debug("exception receiving from $(channel.host):$(channel.port)", ex=ex)
         else
-            @logmsg("exception receiving: $ex")
+            @debug("exception receiving", ex=ex)
         end
         disconnect(reader, false)
         rethrow(ex)
@@ -196,8 +185,8 @@ end

 function close(reader::HDFSFileReader)
     reuse = false
-    if !isnull(reader.blk_reader)
-        blk_reader = get(reader.blk_reader)
+    if reader.blk_reader !== nothing
+        blk_reader = reader.blk_reader
         reuse = eof(blk_reader)
     end
     disconnect(reader, reuse)
@@ -214,20 +203,20 @@ function read!(reader::HDFSFileReader, a::Vector{UInt8})
                 reader.fptr += 1
                 break
             end
-            navlb = nb_available(reader.buffer)
+            navlb = bytesavailable(reader.buffer)
             # if buffer empty, fill buffer
             if navlb == 0
                 if (reader.fptr + remaining) > reader.size
                     canread = reader.size - reader.fptr
-                    tb = Vector{UInt8}(Int(canread/sizeof(UInt8)))
+                    tb = Vector{UInt8}(undef, Int(canread/sizeof(UInt8)))
                     nbytes = _read_and_buffer(reader, tb, UInt64(1), canread)
-                    copy!(a, offset, tb, 1, length(tb))
+                    copyto!(a, offset, tb, 1, length(tb))
                 else
                     nbytes = _read_and_buffer(reader, a, offset, remaining)
                 end
             else
                 nbytes = min(navlb, remaining)
-                @logmsg("reading $nbytes from buffer. navlb:$navlb remaining:$remaining, offset:$offset")
+                @debug("reading $nbytes from buffer", nbytes=nbytes, navlb=navlb, remaining=remaining, offset=offset)
                 Base.read_sub(reader.buffer, a, offset, nbytes)
                 reader.fptr += nbytes
             end
@@ -239,10 +228,10 @@ function read!(reader::HDFSFileReader, a::Vector{UInt8})
     (remaining > 0) && throw(EOFError())
     a
 end
-const _a = Vector{UInt8}(1)
+const _a = Vector{UInt8}(undef, 1)
 read(reader::HDFSFileReader, ::Type{UInt8}) = (read!(reader, _a); _a[1])
-readbytes(reader::HDFSFileReader, nb::Integer) = read!(reader, Vector{UInt8}(nb))
-readall(reader::HDFSFileReader) = readbytes(reader, nb_available(reader))
+readbytes(reader::HDFSFileReader, nb::Integer) = read!(reader, Vector{UInt8}(undef, nb))
+readall(reader::HDFSFileReader) = readbytes(reader, bytesavailable(reader))

 """
 # HDFSFileWriter
@@ -266,15 +255,15 @@ mutable struct HDFSFileWriter <: IO
     client::HDFSClient
     path::AbstractString
     fptr::UInt64
-    blk::Nullable{LocatedBlockProto}
-    blk_writer::Nullable{HDFSBlockWriter}
+    blk::Union{Nothing,LocatedBlockProto}
+    blk_writer::Union{Nothing,HDFSBlockWriter}

     function HDFSFileWriter(client::HDFSClient, path::AbstractString)
         path = abspath(client, path)
         path_exists = exists(client, path)
         fs = _create_file(client, path, path_exists, zero(UInt32), zero(UInt64), DEFAULT_FILE_MODE, false)
-        isnull(fs) && throw(HDFSException("Error openeing $path for write"))
-        new(client, path, 0, Nullable{LocatedBlockProto}(), Nullable{HDFSBlockWriter}())
+        (fs === nothing) && throw(HDFSException("Error opening $path for write"))
+        new(client, path, 0, nothing, nothing)
     end
 end
@@ -297,20 +286,20 @@ function _write(writer::HDFSFileWriter, data::T) where T<:Union{UInt8,Vector{UIn
     L = rem_len = length(data)

     while rem_len > 0
-        if isnull(writer.blk_writer)
+        if writer.blk_writer === nothing
             blk = _add_block(LocatedBlockProto, writer.client, writer.path, writer.blk)
             blk_writer = HDFSBlockWriter(blk, _get_server_defaults(writer.client))
-            writer.blk_writer = Nullable(blk_writer)
-            writer.blk = Nullable(blk)
+            writer.blk_writer = blk_writer
+            writer.blk = blk
         else
-            blk_writer = get(writer.blk_writer)
+            blk_writer = writer.blk_writer
         end

         rem_len -= write(blk_writer, rem_data)
         if rem_len > 0
             flush(blk_writer)
             disconnect(blk_writer, true)
-            writer.blk_writer = Nullable{HDFSBlockWriter}()
+            writer.blk_writer = nothing
             (T <: UInt8) || (rem_data = unsafe_wrap(Array, pointer(data, L-rem_len+1), rem_len))
         end
     end
@@ -320,12 +309,12 @@ write(writer::HDFSFileWriter, data::UInt8) = _write(writer, data)
 write(writer::HDFSFileWriter, data::Vector{UInt8}) = _write(writer, data)

 function close(writer::HDFSFileWriter)
-    if !isnull(writer.blk_writer)
-        blk_writer = get(writer.blk_writer)
+    if writer.blk_writer !== nothing
+        blk_writer = writer.blk_writer
         flush(blk_writer)
         disconnect(blk_writer, true)
-        _complete_file(writer.client, writer.path, Nullable(blk_writer.block.b))
-        writer.blk_writer = Nullable{HDFSBlockWriter}()
+        _complete_file(writer.client, writer.path, blk_writer.block.b)
+        writer.blk_writer = nothing
     end
     nothing
 end
@@ -363,6 +352,8 @@ function open(fn::Function, file::Union{HDFSFile,HDFSClient}, args...; kwargs...
     try
         io = open(file, args...; kwargs...)
         return fn(io)
+    catch ex
+        rethrow(ex)
     finally
         close(io)
     end
@@ -385,7 +376,7 @@ function cp(frompath::Union{HDFSFile,AbstractString}, topath::Union{HDFSFile,Abs
     end

     buff_sz = 64*1024*1024
-    buff = Vector{UInt8}(buff_sz)
+    buff = Vector{UInt8}(undef, buff_sz)
     brem = btot = (len == 0) ? (filesize(fromfile)-offset) : len
     while brem > 0
         bread = min(brem, buff_sz)
@@ -393,7 +384,7 @@ function cp(frompath::Union{HDFSFile,AbstractString}, topath::Union{HDFSFile,Abs
         read!(fromfile, buff)
         write(tofile, buff)
         brem -= bread
-        @logmsg("remaining $brem/$btot")
+        @debug("remaining $brem/$btot")
     end
     close(fromfile)
     close(tofile)
diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl
index 183fd18..272f58a 100644
--- a/src/api_yarn_appmaster.jl
+++ b/src/api_yarn_appmaster.jl
@@ -36,7 +36,7 @@ mutable struct YarnAppMaster
     response_id::Int32

-    registration::Nullable{RegisterApplicationMasterResponseProto}
+    registration::Union{Nothing,RegisterApplicationMasterResponseProto}
     lck::Lock

     function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation,
@@ -49,7 +49,7 @@ mutable struct YarnAppMaster
             Int32(0), Int32(0), Int32(0), Int32(0),
             YarnNodes(ugi), YarnContainers(),
             "", true, 1,
-            Nullable{RegisterApplicationMasterResponseProto}(), lck)
+            nothing, lck)
     end
 end

@@ -57,6 +57,8 @@ function withlock(fn, yam)
     try
         take!(yam.lck)
         return fn()
+    catch ex
+        rethrow(ex)
     finally
         put!(yam.lck, 1)
     end
@@ -71,11 +73,11 @@ function show(io::IO, yam::YarnAppMaster)
     show(io, yam.containers)
 end

-callback(yam::YarnAppMaster, on_container_alloc::Nullable, on_container_finish::Nullable) =
+callback(yam::YarnAppMaster, on_container_alloc::Union{Nothing,Function}, on_container_finish::Union{Nothing,Function}) =
     callback(yam.containers, on_container_alloc, on_container_finish)

 function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
-    @logmsg("submitting unmanaged application")
+    @debug("submitting unmanaged application")
     clc = launchcontext()
     app = submit(client, clc, YARN_CONTAINER_MEM_DEFAULT, YARN_CONTAINER_CPU_DEFAULT; unmanaged=true)

@@ -108,13 +110,13 @@ function register(yam::YarnAppMaster)
     resp = withlock(yam) do
         registerApplicationMaster(yam.amrm_conn, inp)
     end
-    yam.registration = Nullable(resp)
+    yam.registration = resp

     if isfilled(resp, :maximumCapability)
         yam.max_mem = resp.maximumCapability.memory
         yam.max_cores = resp.maximumCapability.virtual_cores
     end
-    @logmsg("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
+    @debug("max capability: mem:$(yam.max_mem), cores:$(yam.max_cores)")
     if isfilled(resp, :queue)
         yam.queue = resp.queue
     end
@@ -140,7 +142,7 @@ function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::Abstra
     resp = withlock(yam) do
         finishApplicationMaster(yam.amrm_conn, inp)
     end
-    resp.isUnregistered && (yam.registration = Nullable{RegisterApplicationMasterResponseProto}())
+    resp.isUnregistered && (yam.registration = nothing)
     resp.isUnregistered
 end

@@ -153,7 +155,7 @@ container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_relea
 container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
 function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
-    @logmsg("starting container $(container)")
+    @debug("starting container $(container)")
     req = protobuild(StartContainerRequestProto, Dict(:container_launch_context => container_spec,
             :container_token => container.container_token))
     inp = protobuild(StartContainersRequestProto, Dict(:start_container_request => [req]))
@@ -164,6 +166,8 @@ function container_start(yam::YarnAppMaster, container::ContainerProto, containe
     try
         resp = startContainers(nm_conn, inp)
         success = true
+    catch ex
+        rethrow(ex)
     finally
         release_connection(yam.nodes, nodeid, nm_conn, success)
     end
@@ -177,7 +181,7 @@ end

 container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid])
 function container_stop(yam::YarnAppMaster, container::ContainerProto)
-    @logmsg("stopping container $container")
+    @debug("stopping container $container")

     inp = protobuild(StopContainersRequestProto, Dict(:container_id => [container.id]))

     nodeid = container.nodeId
@@ -187,6 +191,8 @@ function container_stop(yam::YarnAppMaster, container::ContainerProto)
     try
         resp = stopContainers(nm_conn, inp)
         success = true
+    catch ex
+        rethrow(ex)
     finally
         release_connection(yam.nodes, nodeid, nm_conn, success)
     end
@@ -199,13 +205,13 @@ function container_stop(yam::YarnAppMaster, container::ContainerProto)
 end

 function _update_rm(yam::YarnAppMaster)
-    @logmsg("started processing am-rm messages")
+    @debug("started processing am-rm messages")
     inp = AllocateRequestProto()

     # allocation and release requests
     (alloc_pending,release_pending) = torequest(yam.containers)
-    @logmsg("alloc pending: $alloc_pending")
-    @logmsg("release pending: $release_pending")
+    @debug("alloc pending: $alloc_pending")
+    @debug("release pending: $release_pending")
     !isempty(alloc_pending) && set_field!(inp, :ask, alloc_pending)
     !isempty(release_pending) && set_field!(inp, :release, release_pending)

@@ -217,11 +223,11 @@ function _update_rm(yam::YarnAppMaster)
     end
     set_field!(inp, :response_id, yam.response_id)

-    @logmsg(inp)
+    #@debug(inp)
     resp = withlock(yam) do
         allocate(yam.amrm_conn, inp)
     end
-    @logmsg(resp)
+    #@debug(resp)

     # store/update tokens
     channel = yam.amrm_conn.channel
@@ -242,17 +248,17 @@ function _update_rm(yam::YarnAppMaster)
     # update node and container status
     update(yam.nodes, resp)
     update(yam.containers, resp)
-    @logmsg("finished processing am-rm messages")
-    @logmsg(yam)
+    @debug("finished processing am-rm messages")
+    #@debug(yam)
     nothing
 end

 haverequests(yam::YarnAppMaster) = haverequests(yam.containers)

 function process_am_rm(yam::YarnAppMaster)
-    @logmsg("started am-rm processor task")
-    stopped = ()->isnull(yam.registration)
-    stopwaiting = ()->(haverequests(yam) || isnull(yam.registration))
+    @debug("started am-rm processor task")
+    stopped = ()->(yam.registration === nothing)
+    stopwaiting = ()->(haverequests(yam) || (yam.registration === nothing))
     waittime = 10.
     while !stopped()
         t1 = time()
@@ -266,6 +272,6 @@ function process_am_rm(yam::YarnAppMaster)
             (waittime < 10) && sleep(10-waittime)
         end
     end
-    @logmsg("stopped am-rm processor task")
+    @debug("stopped am-rm processor task")
     nothing
 end
diff --git a/src/api_yarn_base.jl b/src/api_yarn_base.jl
index 04478c2..1d1d5d8 100644
--- a/src/api_yarn_base.jl
+++ b/src/api_yarn_base.jl
@@ -174,6 +174,8 @@ function _new_or_existing_conn(nodes::YarnNodes, nodeid::NodeIdProto)
             if t > (lastusetime + nodes.keepalivesecs)
                 try
                     disconnect(conn.channel)
+                catch
+                    # ignore exception
                 finally
                     conn = YarnAMNMProtocol(nodeid.host, nodeid.port, nodes.ugi)
                     lastusetime = t
@@ -181,6 +183,8 @@ function _new_or_existing_conn(nodes::YarnNodes, nodeid::NodeIdProto)
         end
         nodes.conn[nodeid] = (conn, lastusetime, lck)
         return (conn, lastusetime, lck)
+    catch ex
+        rethrow(ex)
     finally
         unlock(nodes.lck)
     end
@@ -204,6 +208,8 @@ function release_connection(nodes::YarnNodes, nodeid::NodeIdProto, conn::YarnAMN
         if !reuse
             try
                 disconnect(conn.channel)
+            catch
+                # ignore exceptions
             end
         end
@@ -222,6 +228,8 @@ function release_connection(nodes::YarnNodes, nodeid::NodeIdProto, conn::YarnAMN
                 nodes.conn[nodeid] = (conn, time(), ReentrantLock())
             end
         end
+    catch ex
+        rethrow(ex)
     finally
         unlock(nodes.lck)
     end
@@ -269,13 +277,12 @@ mutable struct YarnContainers
     release_pipeline::RequestPipeline{ContainerIdProto}
     ndesired::Int

-    on_container_alloc::Nullable{Function}
-    on_container_finish::Nullable{Function}
+    on_container_alloc::Union{Nothing,Function}
+    on_container_finish::Union{Nothing,Function}

     function YarnContainers()
         new(Dict{ContainerIdProto,ContainerProto}(), Dict{ContainerIdProto,ContainerStatusProto}(),
             Set{ContainerIdProto}(), Set{ContainerIdProto}(),
-            RequestPipeline{ResourceRequestProto}(), RequestPipeline{ContainerIdProto}(), 0,
-            Nullable{Function}(), Nullable{Function}())
+            RequestPipeline{ResourceRequestProto}(), RequestPipeline{ContainerIdProto}(), 0, nothing, nothing)
     end
 end
@@ -284,7 +291,7 @@ function show(io::IO, containers::YarnContainers)
     nothing
 end

-function callback(containers::YarnContainers, on_container_alloc::Nullable, on_container_finish::Nullable)
+function callback(containers::YarnContainers, on_container_alloc::Union{Nothing,Function}, on_container_finish::Union{Nothing,Function})
     containers.on_container_alloc = on_container_alloc
     containers.on_container_finish = on_container_finish
     nothing
@@ -303,23 +310,21 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
             id = cont.id
             contlist[id] = cont
             push!(active, id)
-            @logmsg("calling callback for alloc")
-            isnull(cballoc) || @async(get(cballoc)(id))
-            #isnull(cballoc) || get(cballoc)(id)
+            @debug("calling callback for alloc")
+            (cballoc === nothing) || @async cballoc(id)
         end
     end
     if isfilled(arp, :completed_container_statuses)
-        @logmsg("have completed containers")
+        @debug("have completed containers")
         for contst in arp.completed_container_statuses
             id = contst.container_id
-            @logmsg("container $id is finished")
+            @debug("container $id is finished")
             status[id] = contst
-            @logmsg("id in active: $(id in active)")
+            @debug("id in active: $(id in active)")
             (id in active) && pop!(active, id)
             (id in busy) && pop!(busy, id)
-            @logmsg("calling callback for finish")
-            isnull(cbfinish) || @async(get(cbfinish)(id))
-            #isnull(cbfinish) || get(cbfinish)(id)
+            @debug("calling callback for finish")
+            (cbfinish === nothing) || @async cbfinish(id)
         end
     end
     nothing
diff --git a/src/api_yarn_client.jl b/src/api_yarn_client.jl
index 60aef48..84ca5c0 100644
--- a/src/api_yarn_client.jl
+++ b/src/api_yarn_client.jl
@@ -121,11 +121,11 @@ YarnApp represents one instance of application running on the yarn cluster
 mutable struct YarnApp
     client::YarnClient
     appid::ApplicationIdProto
-    status::Nullable{YarnAppStatus}
+    status::Union{Nothing,YarnAppStatus}
     attempts::Array{YarnAppAttemptStatus}

     function YarnApp(client::YarnClient, appid::ApplicationIdProto)
-        new(client, appid, Nullable{YarnAppStatus}(), YarnAppAttemptStatus[])
+        new(client, appid, nothing, YarnAppAttemptStatus[])
     end
 end

@@ -145,10 +145,10 @@ ATTEMPT_STATES: enum value to state map. Used for converting state for display.
 const ATTEMPT_STATES = [:new, :submitted, :scheduled, :scheduled, :allocated_saving, :allocated, :launched, :failed, :running, :finishing, :finished, :killed]

 function show(io::IO, app::YarnApp)
-    if isnull(app.status)
+    if app.status === nothing
         println(io, "YarnApp: $(app.appid.id)")
     else
-        show(io, get(app.status))
+        show(io, app.status)
     end
     nothing
 end
@@ -192,25 +192,25 @@ function kill(app::YarnApp)
 end

 function status(app::YarnApp, refresh::Bool=true)
-    if refresh || isnull(app.status)
+    if refresh || (app.status === nothing)
         client = app.client
         inp = protobuild(GetApplicationReportRequestProto, Dict(:application_id => app.appid))

         resp = getApplicationReport(client.rm_conn, inp)
-        app.status = isfilled(resp.application_report) ? Nullable(YarnAppStatus(resp.application_report)) : Nullable{YarnAppStatus}()
+        app.status = isfilled(resp.application_report) ? YarnAppStatus(resp.application_report) : nothing
     end
     app.status
 end

 function wait_for_state(app::YarnApp, state::Int32, timeout_secs::Int=60)
-    @logmsg("waiting for application to reach $(APP_STATES[state]) ($state) state")
+    @debug("waiting for application to reach $(APP_STATES[state]) ($state) state")
     t1 = time() + timeout_secs
     finalstates = (YarnApplicationStateProto.KILLED, YarnApplicationStateProto.FAILED, YarnApplicationStateProto.FINISHED)
     isfinalstate = state in finalstates
     while time() < t1
         nst = status(app)
-        if !isnull(nst)
-            appstate = get(nst).report.yarn_application_state
+        if nst !== nothing
+            appstate = nst.report.yarn_application_state
             (appstate == state) && (return true)
             isfinalstate || ((appstate in finalstates) && (return false))
         end
@@ -222,11 +222,11 @@ end
 # get am-rm token for an unmanaged app
 function am_rm_token(app::YarnApp)
     wait_for_state(app, YarnApplicationStateProto.ACCEPTED) || throw(YarnException("Application was not accepted"))
-    am_rm_token(get(status(app)))
+    am_rm_token(status(app))
 end

 function attempts(app::YarnApp, refresh::Bool=true)
-    if refresh || isnull(app.attempts)
+    if refresh || isempty(app.attempts)
         client = app.client
         inp = protobuild(GetApplicationAttemptsRequestProto, Dict(:application_id => app.appid))

@@ -243,7 +243,7 @@ function attempts(app::YarnApp, refresh::Bool=true)
 end

 function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, timeout_secs::Int=60)
-    @logmsg("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
+    @debug("waiting for application attempt $attemptid to reach $(ATTEMPT_STATES[state]) ($state) state")
     t1 = time() + timeout_secs
     finalstates = (YarnApplicationAttemptStateProto.APP_ATTEMPT_KILLED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FAILED, YarnApplicationAttemptStateProto.APP_ATTEMPT_FINISHED)
     isfinalstate = state in finalstates
@@ -255,7 +255,7 @@ function wait_for_attempt_state(app::YarnApp, attemptid::Int32, state::Int32, ti
                 atmptstate = report.yarn_application_attempt_state
                 (atmptstate == state) && (return true)
                 isfinalstate || ((atmptstate in finalstates) && (return false))
-                @logmsg("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
+                @debug("application attempt $attemptid is in state $(ATTEMPT_STATES[atmptstate]) ($atmptstate) state")
                 break
             end
         end
diff --git a/src/cluster_manager.jl b/src/cluster_manager.jl
index 5893562..5d83773 100644
--- a/src/cluster_manager.jl
+++ b/src/cluster_manager.jl
@@ -13,7 +13,7 @@ struct YarnManager <: ClusterManager
             params[n] = v
         end
         paramkeys = keys(params)
-        @logmsg("YarnManager constructor: params: $params")
+        @debug("YarnManager constructor: params: $params")

         user = (:user in paramkeys) ? params[:user] : ""
         rmport = (:rmport in paramkeys) ? params[:rmport] : 8032
@@ -41,20 +41,20 @@ function show(io::IO, yarncm::YarnManager)
 end

 function setup_worker(host, port, cookie)
-    @logmsg("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
+    @debug("YarnManager setup_worker: host:$host port:$port cookie:$cookie for container $(ENV[CONTAINER_ID])")
     c = connect(IPv4(host), port)
     Base.wait_connected(c)
     redirect_stdout(c)
     redirect_stderr(c)

     # identify container id so that rmprocs can clean things up nicely if required
-    Base.serialize(c, ENV[CONTAINER_ID])
+    serialize(c, ENV[CONTAINER_ID])

     if cookie == nothing
-        Base.start_worker(c)
+        start_worker(c)
     else
         if isa(cookie, Symbol)
             cookie = string(cookie)[8:end]  # strip the leading "cookie_"
         end
-        Base.start_worker(c, cookie)
+        start_worker(c, cookie)
     end
 end

@@ -73,7 +73,7 @@ function _container(cid::String)
 end

 _envdict(envdict::Dict) = envdict
-function _envdict(envhash::Base.EnvHash)
+function _envdict(envhash::Base.EnvDict)
     envdict = Dict{String,String}()
     for (n,v) in envhash
         envdict[n] = v
@@ -82,7 +82,7 @@ function _envdict(envhash::Base.EnvHash)
 end

 function _currprocname()
-    p = joinpath(Base.JULIA_HOME, Sys.get_process_title())
+    p = joinpath(Sys.BINDIR, Sys.get_process_title())
     exists(p) && (return p)

     ("_" in keys(ENV)) && contains(ENV["_"], "julia") && (return ENV["_"])
@@ -91,7 +91,7 @@ function _currprocname()
 end

 function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
-    @logmsg("YarnManager launch: params: $params")
+    @debug("YarnManager launch: params: $params")
     paramkeys = keys(params)

     np = (:np in paramkeys) ? params[:np] : 1
@@ -103,27 +103,28 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
     priority = (:priority in paramkeys) ? params[:priority] : YARN_CONTAINER_PRIORITY_DEFAULT

     stdout_ios = manager.stdout_ios
-    (port, server) = listenany(11000)
-    rcv_stdouts = @schedule begin
+    ipaddr = getipaddr()
+    (port, server) = listenany(ipaddr, 11000)
+    rcv_stdouts = @async begin
         while length(stdout_ios) < np
             sock = accept(server)
             push!(stdout_ios, sock)
         end
     end

-    cookie = string(":cookie_", Base.cluster_cookie())
-    initargs = "using Elly; Elly.setup_worker($(getipaddr().host), $(port), $(cookie))"
+    cookie = ":cookie_" * cluster_cookie()
+    initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port), $(cookie))"
     clc = launchcontext(cmd="$cmd -e '$initargs'", env=appenv)

-    @logmsg("YarnManager launch: initargs: $initargs")
-    @logmsg("YarnManager launch: context: $clc")
+    @debug("YarnManager launch: initargs: $initargs")
+    @debug("YarnManager launch: context: $clc")

     on_alloc = (cid) -> container_start(manager.am, cid, clc)
-    callback(manager.am, Nullable(on_alloc), Nullable())
+    callback(manager.am, on_alloc, nothing)

     container_allocate(manager.am, convert(Int, np); mem=mem, cpu=cpu, loc=loc, priority=priority)

     tend = time() + manager.launch_timeout
-    while (time() < tend) && !Base.istaskdone(rcv_stdouts)
+    while (time() < tend) && !istaskdone(rcv_stdouts)
         sleep(1)
     end

@@ -137,9 +138,9 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
         config.io = io

         # keep the container id in userdata to be used with rmprocs if required
-        cidstr = Base.deserialize(io)
+        cidstr = deserialize(io)
         userdata = Dict(:container_id => _container(cidstr))
-        config.userdata = Nullable(userdata)
+        config.userdata = userdata
         push!(configs, config)
     end

@@ -155,7 +156,7 @@ end
 function manage(manager::YarnManager, id::Integer, config::WorkerConfig, op::Symbol)
     # This function needs to exist, but so far we don't do anything
     if op == :deregister
-        @logmsg("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
+        @debug("YarnManager manage: id:$id, op:$op, nprocs:$(nprocs())")
         !manager.keep_connected && (1 == nprocs()) && (@async disconnect(manager))
     end
     nothing
diff --git a/src/rpc.jl b/src/rpc.jl
index bfea5ea..c2535b3 100644
--- a/src/rpc.jl
+++ b/src/rpc.jl
@@ -3,7 +3,7 @@
 # - Client - Datanode
 # ref: https://wiki.apache.org/hadoop/HadoopRpc

-const HRPC_HEADER = convert(Vector{UInt8}, "hrpc")
+const HRPC_HEADER = convert(Vector{UInt8}, codeunits("hrpc"))
 const HRPC_VERSION = 0x09
 const HRPC_SERVICE_CLASS = 0x00

@@ -48,16 +48,7 @@ mutable struct HadoopRpcException <: Exception
 end

 # Utility methods
-function resolveCRC32c()
-    try
-        return eval(Base, :crc32c)
-    end
-    crc(CRC_32_C)
-end
-
-const crc32 = crc(CRC_32)
-const crc32c = resolveCRC32c()
-calc_chksum(typ::Int32, c_data::Vector{UInt8}) = (typ === ChecksumTypeProto.CHECKSUM_CRC32) ? crc32(c_data) : (typ === ChecksumTypeProto.CHECKSUM_CRC32C) ? crc32c(c_data) : throw(HadoopRpcException("Unknown CRC type $typ"))
+calc_chksum(typ::Int32, c_data::Vector{UInt8}) = (typ === ChecksumTypeProto.CHECKSUM_CRC32C) ? crc32c(c_data) : throw(HadoopRpcException("Unknown CRC type $typ"))
 isvalid_chksum(typ::Int32) = (typ === ChecksumTypeProto.CHECKSUM_CRC32) || (typ === ChecksumTypeProto.CHECKSUM_CRC32C)

 function _len_uleb(x::T) where T <: Integer
@@ -74,8 +65,7 @@ function buffer_size_delimited(channelbuff::IOBuffer, obj)
     data = take!(iob)

     len = write_bytes(channelbuff, data)
-    @logmsg("$(typeof(obj)) -> $data")
-    @logmsg("$(typeof(obj)) -> buffer. len $len")
+    @debug("$(typeof(obj)) -> buffer", len=len, data=data)
     len
 end

@@ -87,14 +77,14 @@ function send_buffered(buff::IOBuffer, sock::TCPSocket, delimited::Bool)
         len = write(sock, hton(datalen))
     end
     len += write(sock, data)
-    @logmsg("buffer -> sock. len $len")
+    @debug("buffer -> sock", len=len)
     len
 end

 function recv_blockop(sock::TCPSocket)
-    @logmsg("recv block read message")
+    @debug("recv block read message")
     data_bytes = read_bytes(sock)
-    @logmsg("block_resp <- sock. len $(length(data_bytes))")
+    @debug("block_resp <- sock", len=length(data_bytes))

     block_resp = BlockOpResponseProto()
     readproto(IOBuffer(data_bytes), block_resp)
@@ -115,16 +105,16 @@ mutable struct HadoopRpcChannel <: ProtoRpcChannel
     protocol_attribs::Dict
     call_id::Int32          # see: RpcRequestHeaderProto.callId
     sent_call_id::Int32     # set to the last call id sent for verification purpose
-    clnt_id::AbstractString # string(Base.Random.uuid4())
+    clnt_id::AbstractString # string(uuid4())
     ugi::UserGroupInformation
     iob::IOBuffer
-    sock::Nullable{TCPSocket}
+    sock::Union{Nothing,TCPSocket}

     function HadoopRpcChannel(host::AbstractString, port::Integer, ugi::UserGroupInformation, protocol::DataType)
         protocol = HRPC_PROTOCOLS[protocol]
         call_id = HRPC_CALL_ID_CONNCTX
-        clnt_id = string(Base.Random.uuid4())[1:16]
-        new(host, port, protocol, call_id, call_id, clnt_id, ugi, IOBuffer(), Nullable{TCPSocket}())
+        clnt_id = string(uuid4())[1:16]
+        new(host, port, protocol, call_id, call_id, clnt_id, ugi, IOBuffer(), nothing)
     end
 end

@@ -137,9 +127,9 @@ function show(io::IO, ch::HadoopRpcChannel)
     nothing
 end

-isconnected(channel::HadoopRpcChannel) = !isnull(channel.sock) && isopen(get(channel.sock))
+isconnected(channel::HadoopRpcChannel) = (channel.sock !== nothing) ? isopen(channel.sock) : false
 begin_send(channel::HadoopRpcChannel) = Base.truncate(channel.iob, 0)
-send_buffered(channel::HadoopRpcChannel, delimited::Bool) = send_buffered(channel.iob, get(channel.sock), delimited::Bool)
+send_buffered(channel::HadoopRpcChannel, delimited::Bool) = send_buffered(channel.iob, channel.sock, delimited::Bool)

 function next_call_id(channel::HadoopRpcChannel)
     id = channel.sent_call_id = channel.call_id
@@ -157,7 +147,7 @@ function next_call_id(channel::HadoopRpcChannel)
 end

 function buffer_handshake(channel::HadoopRpcChannel, authprotocol::UInt8)
-    write(channel.iob, [HRPC_HEADER, HRPC_VERSION, HRPC_SERVICE_CLASS, authprotocol;])
+    write(channel.iob, HRPC_HEADER, HRPC_VERSION, HRPC_SERVICE_CLASS, authprotocol)
 end

 function buffer_connctx(channel::HadoopRpcChannel)
@@ -172,7 +162,7 @@ function buffer_reqhdr(channel::HadoopRpcChannel, call_id::Int32)
             :rpcOp => HRPC_FINAL_PACKET,
             :callId => call_id,
             #:retryCount => -1,
-            :clientId => convert(Vector{UInt8}, channel.clnt_id)))
+            :clientId => convert(Vector{UInt8}, codeunits(channel.clnt_id))))

     buffer_size_delimited(channel.iob, hdr)
 end
@@ -193,10 +183,10 @@ end
 # Connect to the hadoop server and complete the handshake
 function connect(channel::HadoopRpcChannel)
     # open connection
-    @logmsg("connecting to HadoopRpcChannel $(channel.host):$(channel.port)")
+    @debug("connecting to HadoopRpcChannel $(channel.host):$(channel.port)")
     try
         sock = connect(channel.host, channel.port)
-        channel.sock = Nullable{TCPSocket}(sock)
+        channel.sock = sock

         # negotiate sasl authentication if ugi has appropriate tokens
         if !conditional_sasl_auth(channel)
@@ -210,10 +200,10 @@ function connect(channel::HadoopRpcChannel)
         buffer_conctx_reqhdr(channel)
         buffer_connctx(channel)
         send_buffered(channel, true)
-        @logmsg("connected to HadoopRpcChannel $(channel.host):$(channel.port)")
+        @debug("connected to HadoopRpcChannel $(channel.host):$(channel.port)")
         channel.call_id = HRPC_CALL_ID_NORMAL
     catch ex
-        @logmsg("exception connecting to $(channel.host):$(channel.port): $ex")
+        @debug("exception connecting to $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -224,27 +214,27 @@ end
 # Client Id is also reset and so this is in essence an entirely new channel.
 function disconnect(channel::HadoopRpcChannel)
     try
-        isconnected(channel) && close(get(channel.sock))
+        isconnected(channel) && close(channel.sock)
     catch ex
-        @logmsg("exception while closing channel socket $ex")
+        @debug("exception while closing channel socket $ex")
     end
-    channel.sock = Nullable{TCPSocket}()
+    channel.sock = nothing
     channel.call_id = HRPC_CALL_ID_CONNCTX
-    channel.clnt_id = string(Base.Random.uuid4())[1:16]
+    channel.clnt_id = string(uuid4())[1:16]
 end

 function send_rpc_message(channel::HadoopRpcChannel, controller::HadoopRpcController, method::MethodDescriptor, params)
     isconnected(channel) || connect(channel)

     try
-        @logmsg("send rpc message. method: $(method.name)")
+        @debug("send rpc message. method: $(method.name)")
         begin_send(channel)
         buffer_rpc_reqhdr(channel)
         buffer_method_reqhdr(channel, method)
         buffer_size_delimited(channel.iob, params)
         send_buffered(channel, true)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -253,10 +243,10 @@ end

 function recv_rpc_message(channel::HadoopRpcChannel, resp)
     try
-        @logmsg("recv rpc message")
-        msg_len = ntoh(read(get(channel.sock), UInt32))
-        hdr_bytes = read_bytes(get(channel.sock))
-        @logmsg("hdr <- sock. len $(length(hdr_bytes))")
+        @debug("recv rpc message")
+        msg_len = ntoh(read(channel.sock, UInt32))
+        hdr_bytes = read_bytes(channel.sock)
+        @debug("hdr <- sock", len=length(hdr_bytes))

         resp_hdr = RpcResponseHeaderProto()
         readproto(IOBuffer(hdr_bytes), resp_hdr)
@@ -268,15 +258,15 @@ function recv_rpc_message(channel::HadoopRpcChannel, resp)
             hdr_len = UInt32(length(hdr_bytes))
             hdr_len += _len_uleb(hdr_len)
             if msg_len > hdr_len
-                data_bytes = read_bytes(get(channel.sock))
-                @logmsg("data <- sock. len $(length(data_bytes))")
+                data_bytes = read_bytes(channel.sock)
+                @debug("data <- sock", len=length(data_bytes))
                 data_len = msg_len - hdr_len - _len_uleb(length(data_bytes))
                 (length(data_bytes) == data_len) || throw(HadoopRpcException("unexpected response data length. expected:$(data_len) read:$(length(data_bytes))"))
                 readproto(IOBuffer(data_bytes), resp)
             end
         end
     catch ex
-        @logmsg("exception receiving from $(channel.host):$(channel.port): $ex")
+        @debug("exception receiving from $(channel.host):$(channel.port): $ex")
         disconnect(channel)
         rethrow(ex)
     end
@@ -284,7 +274,7 @@ end

 function call_method(channel::HadoopRpcChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::HadoopRpcController, params)
-    @logmsg("call_method $(method.name)")
+    @debug("call_method $(method.name)")
     send_rpc_message(channel, controller, method, params)
     resp_type = get_response_type(method)
     resp = resp_type()
@@ -314,37 +304,37 @@ mutable struct HadoopDataChannel
     host::AbstractString
     port::Integer
     iob::IOBuffer
-    sock::Nullable{TCPSocket}
+    sock::Union{Nothing,TCPSocket}

-    HadoopDataChannel(host::AbstractString, port::Integer) = new(host, port, IOBuffer(), Nullable{TCPSocket}())
+    HadoopDataChannel(host::AbstractString, port::Integer) = new(host, port, IOBuffer(), nothing)
 end

 function connect(channel::HadoopDataChannel)
-    @logmsg("connecting to HadoopDataChannel $(channel.host):$(channel.port)")
+    @debug("connecting to HadoopDataChannel $(channel.host):$(channel.port)")
     try
         sock = connect(channel.host, channel.port)
-        channel.sock = Nullable(sock)
+        channel.sock = sock
     catch ex
-        @logmsg("exception connecting to HadoopDataChannel $(channel.host):$(channel.port): $ex")
+        @debug("exception connecting to HadoopDataChannel $(channel.host):$(channel.port): $ex")
         rethrow(ex)
     end
-    @logmsg("connected to HadoopDataChannel $(channel.host):$(channel.port)")
+    @debug("connected to HadoopDataChannel $(channel.host):$(channel.port)")
     nothing
 end

 function disconnect(channel::HadoopDataChannel)
     try
-        isconnected(channel) && close(get(channel.sock))
+        isconnected(channel) && close(channel.sock)
     catch ex
-        @logmsg("exception while closing HadoopDataChannel socket $ex")
+        @debug("exception while closing HadoopDataChannel socket $ex")
     end
-    channel.sock = Nullable{TCPSocket}()
+    channel.sock = nothing
     nothing
 end

-isconnected(channel::HadoopDataChannel) = !isnull(channel.sock) && isopen(get(channel.sock))
+isconnected(channel::HadoopDataChannel) = (channel.sock !== nothing) ? isopen(channel.sock) : false
 begin_send(channel::HadoopDataChannel) = Base.truncate(channel.iob, 0)
-send_buffered(channel::HadoopDataChannel, delimited::Bool) = send_buffered(channel.iob, get(channel.sock), delimited::Bool)
+send_buffered(channel::HadoopDataChannel, delimited::Bool) = send_buffered(channel.iob, channel.sock, delimited::Bool)

 function buffer_opcode(channel::HadoopDataChannel, opcode::Int8)
     hver = UInt16(HDATA_VERSION)
@@ -357,9 +347,8 @@ function _select_node_for_block(block::LocatedBlockProto)
     for loc in block.locs
         (loc.adminState == DatanodeInfoProto_AdminState.NORMAL) && push!(node_infos, loc)
     end
-    isempty(node_infos) && (return Nullable{DatanodeInfoProto}())
     # TODO: algo to select best node
-    return Nullable(node_infos[1])
+    isempty(node_infos) ? nothing : node_infos[1]
 end
@@ -382,21 +371,23 @@ function _get(pool::HadoopDataChannelPool, host::AbstractString, port::Integer)

     timediff = _dcpool.keepalivesecs
     while !isempty(free) && (timediff >= _dcpool.keepalivesecs)
-        channel,lastusetime = shift!(free)
+        channel,lastusetime = popfirst!(free)
         timediff = round(UInt64, time() - lastusetime)
     end
     (timediff < _dcpool.keepalivesecs) || (channel = HadoopDataChannel(host, port))
-    @logmsg("return channel: $channel connected: $(isconnected(channel))")
+    @debug("return channel: $channel connected: $(isconnected(channel))")
     channel
 end

 function _put(pool::HadoopDataChannelPool, channel::HadoopDataChannel, reuse::Bool)
     isconnected(channel) || return

-    @logmsg("keeping channel: $channel connected: $(isconnected(channel))")
+    @debug("keeping channel: $channel connected: $(isconnected(channel))")
     if !reuse
         try
             disconnect(channel)
+        catch
+            # ignore exception
         finally
             return
         end
@@ -426,10 +417,10 @@ mutable struct HDFSBlockReader
     len::UInt64

     # protocol data
-    block_op_resp::Nullable{BlockOpResponseProto}
+    block_op_resp::Union{Nothing,BlockOpResponseProto}
     total_read::UInt64

-    pkt_hdr::Nullable{PacketHeaderProto}
+    pkt_hdr::Union{Nothing,PacketHeaderProto}
     packet_len::UInt64
     packet_read::UInt64

@@ -445,10 +436,10 @@ mutable struct HDFSBlockReader
     function HDFSBlockReader(host::AbstractString, port::Integer, block::LocatedBlockProto, offset::UInt64, len::UInt64, chk_crc::Bool)
         channel = _get(_dcpool, host, port)
-        @logmsg("creating block reader for offset $offset at $host:$port for length $len")
+        @debug("creating block reader for offset $offset at $host:$port for length $len")
         new(channel, block, offset, len,
-            Nullable{BlockOpResponseProto}(), 0,
-            Nullable{PacketHeaderProto}(), 0, 0,
+            nothing, 0,
+            nothing, 0, 0,
             0, 0, 0, UInt8[],
             UInt32[], chk_crc, false)
     end
@@ -458,9 +449,9 @@ isconnected(reader::HDFSBlockReader) = isconnected(reader.channel)

 function disconnect(reader::HDFSBlockReader, reuse::Bool)
     isconnected(reader.channel) && _put(_dcpool, reader.channel, reuse)
-    reader.block_op_resp = Nullable{BlockOpResponseProto}()
+    reader.block_op_resp = nothing
     reader.total_read = 0
-    reader.pkt_hdr = Nullable{PacketHeaderProto}()
+    reader.pkt_hdr = nothing
     reader.packet_len = 0
     reader.packet_read = 0
     reader.chunk_len = 0
@@ -491,7 +482,7 @@ function buffer_readblock(reader::HDFSBlockReader)
     basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token))
     hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME))
     readblock = protobuild(OpReadBlockProto, Dict(:offset => offset, :len => len, :header => hdr))
-    @logmsg("sending block read message for offset $offset len $len")
+    @debug("sending block read message for offset $offset len $len")

     buffer_size_delimited(channel.iob, readblock)
 end
@@ -507,13 +498,13 @@ function send_block_read(reader::HDFSBlockReader)
     isconnected(channel) || connect(channel)

     try
-        @logmsg("send block read message")
+        @debug("send block read message")
         begin_send(channel)
         buffer_opcode(channel, HDATA_READ_BLOCK)
         buffer_readblock(reader)
         send_buffered(channel, false)
     catch ex
-        @logmsg("exception sending to $(channel.host):$(channel.port): $ex")
+        @debug("exception sending to $(channel.host):$(channel.port): $ex")
         disconnect(reader, false)
         rethrow(ex)
     end
@@ -523,11 +514,11 @@ end
 function send_read_status(reader::HDFSBlockReader, status::Int32=Status.SUCCESS)
     channel = reader.channel
     try
-        @logmsg("send read status $status")
+        @debug("send read status $status")
buffer_client_read_status(reader, status) send_buffered(channel, false) catch ex - @logmsg("exception sending to $(channel.host):$(channel.port): $ex") + @debug("exception sending to $(channel.host):$(channel.port): $ex") disconnect(reader, false) rethrow(ex) end @@ -537,12 +528,12 @@ end function recv_blockop(reader::HDFSBlockReader) channel = reader.channel try - block_resp = recv_blockop(get(channel.sock)) + block_resp = recv_blockop(channel.sock) checksum_type = block_resp.readOpChecksumInfo.checksum._type isvalid_chksum(checksum_type) || throw(HadoopRpcException("Unknown checksum type $checksum_type")) - reader.block_op_resp = Nullable(block_resp) + reader.block_op_resp = block_resp catch ex - @logmsg("exception receiving from $(channel.host):$(channel.port): $ex") + @debug("exception receiving from $(channel.host):$(channel.port): $ex") disconnect(reader, false) rethrow(ex) end @@ -552,42 +543,42 @@ end function recv_packet_hdr(reader::HDFSBlockReader) channel = reader.channel try - @logmsg("recv block packet message") - sock = get(channel.sock) + @debug("recv block packet message") + sock = channel.sock pkt_len = ntoh(read(sock, UInt32)) hdr_len = ntoh(read(sock, UInt16)) - hdr_bytes = Vector{UInt8}(hdr_len) + hdr_bytes = Vector{UInt8}(undef, hdr_len) read!(sock, hdr_bytes) - @logmsg("pkt_hdr <- sock. len $(hdr_len) (pkt_len: $pkt_len)") + @debug("pkt_hdr <- sock", len=hdr_len, pkt_len=pkt_len) pkt_hdr = PacketHeaderProto() readproto(IOBuffer(hdr_bytes), pkt_hdr) data_len = pkt_hdr.dataLen - block_op_resp = get(reader.block_op_resp) + block_op_resp = reader.block_op_resp reader.chunk_len = block_op_resp.readOpChecksumInfo.checksum.bytesPerChecksum - reader.chunk = Vector{UInt8}(reader.chunk_len) + reader.chunk = Vector{UInt8}(undef, reader.chunk_len) reader.chunk_count = div(data_len + reader.chunk_len - 1, reader.chunk_len) # chunk_len-1 to take care of chunks with partial data reader.chunks_read = 0 - @logmsg("received packet with $(reader.chunk_count) chunks of $(reader.chunk_len) bytes each in $data_len bytes of data") - @logmsg("last packet: $(pkt_hdr.lastPacketInBlock)") + @debug("received packet with $(reader.chunk_count) chunks of $(reader.chunk_len) bytes each in $data_len bytes of data") + @debug("last packet: $(pkt_hdr.lastPacketInBlock)") - checksums = Vector{UInt32}(reader.chunk_count) + checksums = Vector{UInt32}(undef, reader.chunk_count) read!(sock, checksums) - @logmsg("checksums <- sock. 
len $(sizeof(checksums))") + @debug("checksums <- sock", len=sizeof(checksums)) for idx in 1:length(checksums) checksums[idx] = ntoh(checksums[idx]) end reader.packet_len = pkt_len #reader.packet_read = 4 + 2 + hdr_len + sizeof(checksums) reader.packet_read = 4 + sizeof(checksums) - reader.pkt_hdr = Nullable(pkt_hdr) + reader.pkt_hdr = pkt_hdr reader.checksums = checksums - @logmsg("current read position pkt $(reader.packet_read)/$(reader.packet_len), block $(reader.total_read)/$(reader.len)") + @debug("current read position pkt $(reader.packet_read)/$(reader.packet_len), block $(reader.total_read)/$(reader.len)") nothing catch ex - @logmsg("exception receiving from $(channel.host):$(channel.port): $ex") + @debug("exception receiving from $(channel.host):$(channel.port): $ex") disconnect(reader, false) rethrow(ex) end @@ -598,10 +589,10 @@ eof(reader::HDFSBlockReader) = (reader.total_read >= reader.len) function verify_pkt_checksums(reader::HDFSBlockReader, buff::Vector{UInt8}) data_len = length(buff) - @logmsg("verifying packet crc data_len: $data_len") + @debug("verifying packet crc data_len: $data_len") (data_len > 0) || return - block_op_resp = get(reader.block_op_resp) + block_op_resp = reader.block_op_resp checksum_type = block_op_resp.readOpChecksumInfo.checksum._type if !isvalid_chksum(checksum_type) disconnect(reader, false) @@ -631,7 +622,7 @@ function initiate(reader::HDFSBlockReader; retry=true) return catch ex if retry - @logmsg("retrying block reader initiation") + @debug("retrying block reader initiation") disconnect(reader.channel) connect(reader.channel) return initiate(reader; retry=false) @@ -663,19 +654,19 @@ function read_packet!(reader::HDFSBlockReader, inbuff::Vector{UInt8}, offset::UI buff = unsafe_wrap(Array, pointer(inbuff, offset), packet_remaining) channel = reader.channel - sock = get(channel.sock) + sock = channel.sock read!(sock, buff) reader.packet_read += packet_remaining reader.total_read += packet_remaining - @logmsg("recv $(packet_remaining) packet_read $(reader.packet_read), total_read $(reader.total_read)") + @debug("recv $(packet_remaining) packet_read $(reader.packet_read), total_read $(reader.total_read)") # verify crc only if required reader.chk_crc && verify_pkt_checksums(reader, buff) # send the success block read status if eof(reader) - @logmsg("recv last empty packet") + @debug("recv last empty packet") recv_packet_hdr(reader) send_read_status(reader) end @@ -808,16 +799,15 @@ mutable struct HDFSBlockWriter pipeline_task::Task function HDFSBlockWriter(block::LocatedBlockProto, defaults::FsServerDefaultsProto) - nnode_info = _select_node_for_block(block) - isnull(nnode_info) && throw(HadoopRpcException("Could not get a valid datanode to write")) + node_info = _select_node_for_block(block) + (node_info === nothing) && throw(HadoopRpcException("Could not get a valid datanode to write")) - node_info = get(nnode_info) node = node_info.id host = node.ipAddr port = node.xferPort channel = _get(_dcpool, host, port) - @logmsg("creating block writer for offset $(block.offset) at $host:$port") + @debug("creating block writer for offset $(block.offset) at $host:$port") writer = new(channel, defaults, block, node_info, PipeBuffer(), 0, 0, WriterPipeline(), false) writer.pipeline_task = @async process_pipeline(writer) writer @@ -844,20 +834,20 @@ function check_write_buffer(writer::HDFSBlockWriter) defaults = writer.server_defaults max_buffer_pkt = div(defaults.blockSize, defaults.writePacketSize) + 1 max_buffer_bytes = defaults.writePacketSize * 
max_buffer_pkt - buffered_bytes = nb_available(writer.buffer) + buffered_bytes = bytesavailable(writer.buffer) if ((buffered_bytes >= max_buffer_bytes) || (pending(writer.pkt_pipeline) >= max_buffer_pkt)) - @logmsg("slow pipeline. pending bytes:$(nb_available(writer.buffer)), packets:$(pending(writer.pkt_pipeline)). waiting...") + @debug("slow pipeline. pending bytes:$(bytesavailable(writer.buffer)), packets:$(pending(writer.pkt_pipeline)). waiting...") trigger_pkt(writer.pkt_pipeline) wait_pkt(writer.pkt_pipeline) elseif buffered_bytes >= defaults.writePacketSize - @logmsg("triggering pkt_pipeline. buffered:$(nb_available(writer.buffer)), pending packets:$(pending(writer.pkt_pipeline))") + @debug("triggering pkt_pipeline. buffered:$(bytesavailable(writer.buffer)), pending packets:$(pending(writer.pkt_pipeline))") trigger_pkt(writer.pkt_pipeline) yield() end end # write only enough till blockSize -_can_write(writer::HDFSBlockWriter, n::Int) = min(writer.server_defaults.blockSize - writer.total_written - nb_available(writer.buffer), n) +_can_write(writer::HDFSBlockWriter, n::Int) = min(writer.server_defaults.blockSize - writer.total_written - bytesavailable(writer.buffer), n) function write(writer::HDFSBlockWriter, x::UInt8) nbytes = _can_write(writer, 1) @@ -874,7 +864,7 @@ function write(writer::HDFSBlockWriter, buff::Vector{UInt8}) end function flush(writer::HDFSBlockWriter) - @logmsg("flush of block writer invoked") + @debug("flush of block writer invoked") flush_and_wait(writer.pkt_pipeline) end @@ -887,7 +877,7 @@ function initiate(writer::HDFSBlockWriter; retry=true) return catch ex if retry - @logmsg("retrying block write initiation") + @debug("retrying block write initiation") disconnect(writer.channel) connect(writer.channel) return initiate(writer; retry=false) @@ -898,7 +888,7 @@ function initiate(writer::HDFSBlockWriter; retry=true) end function process_pipeline(writer::HDFSBlockWriter) - @logmsg("process_pipeline: started") + @debug("process_pipeline: started") pipeline = writer.pkt_pipeline defaults = writer.server_defaults flushed = false @@ -909,7 +899,7 @@ function process_pipeline(writer::HDFSBlockWriter) readable_bytes = 0 while !(failed || flushed) loop_state = 0 - if (nb_available(writer.buffer) < defaults.writePacketSize) && + if (bytesavailable(writer.buffer) < defaults.writePacketSize) && isempty(pipeline.pkt_prepared) && (isempty(pipeline.pkt_ackwait) || (readable_bytes == 0)) wait_pkt(pipeline) @@ -918,50 +908,50 @@ function process_pipeline(writer::HDFSBlockWriter) writer.initiated || initiate(writer) channel = writer.channel - sock = get(channel.sock) - readable_bytes = nb_available(sock) + sock = channel.sock + readable_bytes = bytesavailable(sock) - @logmsg("processing pipeline. pending $(nb_available(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)") - while nb_available(writer.buffer) >= defaults.writePacketSize + @debug("processing pipeline. 
pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)") + while bytesavailable(writer.buffer) >= defaults.writePacketSize prepare_packet(writer) end flushed = pipeline.flush if flushed - while nb_available(writer.buffer) > 0 - @logmsg("flushing remaining data packet...") + while bytesavailable(writer.buffer) > 0 + @debug("flushing remaining data packet...") prepare_packet(writer) end - @logmsg("preparing last empty packet...") + @debug("preparing last empty packet...") prepare_packet(writer) end loop_state = 1 - @logmsg("writing packets...") + @debug("writing packets...") while !isempty(pipeline.pkt_prepared) pkt = pipeline.pkt_prepared[1] write_packet(writer, pkt) end - @logmsg("reading acks...") + @debug("reading acks...") loop_state = 2 while !failed && !isempty(pipeline.pkt_ackwait) && (flushed || (readable_bytes > 0)) read_packet_ack(writer) failed = pipeline.failed - readable_bytes = nb_available(sock) + readable_bytes = bytesavailable(sock) end - @logmsg("processed pipeline. pending $(nb_available(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)") + @debug("processed pipeline. pending $(bytesavailable(writer.buffer))bytes -> [$(length(pipeline.pkt_prepared)) -> $(length(pipeline.pkt_ackwait)) -> $(length(pipeline.pkt_ackrcvd))]pkts <- $(readable_bytes)bytes flush:$(flushed)") trigger_pkt(pipeline) end catch ex channel = writer.channel extype = (loop_state == 1) ? "send" : (loop_state == 2) ? "read" : "unknown" - @logmsg("pipeline for $(channel.host):$(channel.port) failed. stage $extype. $ex") + @debug("pipeline for $(channel.host):$(channel.port) failed. stage $extype. $ex") disconnect(writer, false) failed = pipeline.failed = true end - @logmsg("process_pipeline finished. failed: $(failed), flushed: $(flushed)") + @debug("process_pipeline finished. 
failed: $(failed), flushed: $(flushed)") trigger_pkt(pipeline) trigger_flushed(pipeline) end @@ -1000,7 +990,7 @@ function buffer_writeblock(writer::HDFSBlockWriter) :maxBytesRcvd => 0, :latestGenerationStamp => exblock.generationStamp, :requestedChecksum => chksum)) - @logmsg("sending block write message for offset $(block.offset)") + @debug("sending block write message for offset $(block.offset)") buffer_size_delimited(channel.iob, writeblock) end @@ -1010,13 +1000,13 @@ function send_block_write(writer::HDFSBlockWriter) channel = writer.channel try - @logmsg("send block write message") + @debug("send block write message") begin_send(channel) buffer_opcode(channel, HDATA_WRITE_BLOCK) buffer_writeblock(writer) send_buffered(channel, false) catch ex - @logmsg("exception sending to $(channel.host):$(channel.port): $ex") + @debug("exception sending to $(channel.host):$(channel.port): $ex") disconnect(writer, false) rethrow(ex) end @@ -1026,9 +1016,9 @@ end function recv_blockop(writer::HDFSBlockWriter) channel = writer.channel try - recv_blockop(get(channel.sock)) + recv_blockop(channel.sock) catch ex - @logmsg("exception receiving from $(channel.host):$(channel.port): $ex") + @debug("exception receiving from $(channel.host):$(channel.port): $ex") disconnect(writer, false) rethrow(ex) end @@ -1044,7 +1034,7 @@ function populate_checksums(bytes::Vector{UInt8}, chunk_len::UInt32, checksums:: c_len = min(nbytes-c_offset+1, chunk_len) c_data = unsafe_wrap(Array, pointer(bytes, c_offset), c_len) checksums[idx] = hton(calc_chksum(checksum_type, c_data)) - #@logmsg("chksum $(checksums[idx])") + #@debug("chksum $(checksums[idx])") c_offset += c_len end nothing @@ -1054,12 +1044,12 @@ end function prepare_packet(writer::HDFSBlockWriter) defaults = writer.server_defaults - #@logmsg("prepare block packet") - bytes_in_packet = min(defaults.writePacketSize, nb_available(writer.buffer)) + #@debug("prepare block packet") + bytes_in_packet = min(defaults.writePacketSize, bytesavailable(writer.buffer)) last_pkt = (bytes_in_packet == 0) seq_no = Int64(writer.pkt_seq += 1) - #@logmsg("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt") + #@debug("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt") pkt_hdr = protobuild(PacketHeaderProto, Dict(:offsetInBlock => writer.total_written, :seqno => seq_no, @@ -1068,16 +1058,16 @@ function prepare_packet(writer::HDFSBlockWriter) writer.total_written += bytes_in_packet - if bytes_in_packet == nb_available(writer.buffer) + if bytes_in_packet == bytesavailable(writer.buffer) bytes = take!(writer.buffer) else - bytes = Vector{UInt8}(bytes_in_packet) + bytes = Vector{UInt8}(undef, bytes_in_packet) read!(writer.buffer, bytes) end chunk_len = defaults.bytesPerChecksum chunk_count = div(bytes_in_packet + chunk_len - 1, chunk_len) - checksums = Vector{UInt32}(chunk_count) + checksums = Vector{UInt32}(undef, chunk_count) populate_checksums(bytes, chunk_len, checksums, defaults.checksumType) enqueue(writer.pkt_pipeline, PipelinedPacket(seq_no, pkt_hdr, bytes, checksums, Int32[])) @@ -1086,10 +1076,10 @@ end const hdr_iob = IOBuffer() function write_packet(writer::HDFSBlockWriter, pkt::PipelinedPacket) - #@logmsg("write block packet") + #@debug("write block packet") channel = writer.channel - sock = get(channel.sock) + sock = channel.sock pkt_len = UInt32(4 + sizeof(pkt.checksums) + sizeof(pkt.bytes)) seek(hdr_iob, 0) @@ -1102,18 +1092,18 @@ function 
write_packet(writer::HDFSBlockWriter, pkt::PipelinedPacket) write(sock, pkt.bytes) ackwait(writer.pkt_pipeline, pkt.seqno) - #@logmsg("sent packet to $(channel.host):$(channel.port). total_written:$(writer.total_written)") + #@debug("sent packet to $(channel.host):$(channel.port). total_written:$(writer.total_written)") nothing end function read_packet_ack(writer::HDFSBlockWriter) - #@logmsg("reading packet ack") + #@debug("reading packet ack") channel = writer.channel - sock = get(channel.sock) - #@logmsg("recv packet ack message") + sock = channel.sock + #@debug("recv packet ack message") data_bytes = read_bytes(sock) - #@logmsg("ack <- sock. len $(length(data_bytes))") + #@debug("ack <- sock. len $(length(data_bytes))") ack = PipelineAckProto() readproto(IOBuffer(data_bytes), ack) @@ -1124,7 +1114,7 @@ function read_packet_ack(writer::HDFSBlockWriter) exblk = writer.block.b set_field!(exblk, :numBytes, pipeline.acked_bytes) - #@logmsg("received ack for seqno: $(ack.seqno), status: $(ack.reply) bytes acked: $(exblk.numBytes)") + #@debug("received ack for seqno: $(ack.seqno), status: $(ack.reply) bytes acked: $(exblk.numBytes)") nothing end diff --git a/src/sasl.jl b/src/sasl.jl index 355e4dd..a2802c2 100644 --- a/src/sasl.jl +++ b/src/sasl.jl @@ -15,7 +15,7 @@ function digmd5_decode_challenge(challenge::AbstractString) end comps[n] = v end - @logmsg("decoded challenge components: $comps") + @debug("decoded challenge components: $comps") comps end @@ -79,7 +79,7 @@ end buffer_sasl_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channel.call_id = HRPC_CALL_ID_SASL; buffer_reqhdr(channel, HRPC_CALL_ID_SASL)) function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vector{RpcSaslProto_SaslAuth}=RpcSaslProto_SaslAuth[], token::Vector{UInt8}=UInt8[]) - @logmsg("buffer SASL message. state:$state, nauths:$(length(auths)), token:$(!isempty(token))") + @debug("buffer SASL message. state:$state, nauths:$(length(auths)), token:$(!isempty(token))") saslmsg = protobuild(RpcSaslProto, Dict(:version => 0, :state => state)) isempty(auths) || set_field!(saslmsg, :auths, auths) isempty(token) || set_field!(saslmsg, :token, token) @@ -87,10 +87,10 @@ function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vec end function recv_sasl_message(channel::HadoopRpcChannel) - @logmsg("recv SASL message") + @debug("recv SASL message") resp = RpcSaslProto() recv_rpc_message(channel, resp) - @logmsg("received SASL message $resp") + @debug("received SASL message $resp") resp end @@ -103,10 +103,10 @@ function conditional_sasl_auth(channel::HadoopRpcChannel) tok_alias = token_alias(channel) tokens = find_tokens(channel.ugi, alias=tok_alias) if isempty(tokens) - @logmsg("no token available to authenticate to $tok_alias. skipping") + @debug("no token available to authenticate to $tok_alias. skipping") return false end - @logmsg("found $(length(tokens)) tokens available to authenticate to $tok_alias. authenticating...") + @debug("found $(length(tokens)) tokens available to authenticate to $tok_alias. 
authenticating...") token = tokens[1] sasl_auth(channel, token) true @@ -127,16 +127,16 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto) # check if TOKEN/DIGEST-MD5 is one of the supported methods nauths = length(resp.auths) - @logmsg("server supports $nauths auths") + @debug("server supports $nauths auths") idx_auth = 0 for idx in 1:length(resp.auths) auth = resp.auths[idx] - @logmsg(" $idx: $(auth.method)/$(auth.mechanism)") + @debug(" $idx: $(auth.method)/$(auth.mechanism)") ((auth.method == "TOKEN") && (auth.mechanism == "DIGEST-MD5")) || continue idx_auth = idx end if idx_auth == 0 - @logmsg("no supported auth method found") + @debug("no supported auth method found") throw(HadoopRpcException("no supported authentication method found")) end auth = resp.auths[idx_auth] @@ -145,9 +145,9 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto) protocol = auth.protocol serverId = auth.serverId challenge = auth.challenge - @logmsg("auth.protocol: $protocol, serverId: $serverId, challenge: $challenge") + @debug("auth.protocol: $protocol, serverId: $serverId, challenge: $challenge") response = digmd5_respond(token, auth.protocol, auth.serverId, auth.challenge) - @logmsg("response: $response") + @debug("response: $response") # send response as a sasl initiate request respauth = protobuild(RpcSaslProto_SaslAuth, Dict(:method => auth.method, @@ -157,13 +157,13 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto) begin_send(channel) buffer_sasl_reqhdr(channel) - buffer_sasl_message(channel, RpcSaslProto_SaslState.INITIATE, [respauth;], convert(Vector{UInt8}, response)) + buffer_sasl_message(channel, RpcSaslProto_SaslState.INITIATE, [respauth], convert(Vector{UInt8}, codeunits(response))) send_buffered(channel, true) resp = recv_sasl_message(channel) # expect a success response if resp.state != RpcSaslProto_SaslState.SUCCESS - @logmsg("error completing SASL auth. state: $(resp.state). expected state: $(RpcSaslProto_SaslState.SUCCESS)") + @debug("error completing SASL auth. state: $(resp.state). expected state: $(RpcSaslProto_SaslState.SUCCESS)") throw(HadoopRpcException("error completing SASL auth. state: $(resp.state). 
expected state: $(RpcSaslProto_SaslState.SUCCESS)")) end nothing diff --git a/test/hdfstests.jl b/test/hdfstests.jl index 0e995ef..da07b8d 100644 --- a/test/hdfstests.jl +++ b/test/hdfstests.jl @@ -1,5 +1,6 @@ using Elly -using Base.Test +using Test +using Random function test_hdfs(host="localhost", port=9000) limitedtestenv = (get(ENV, "CI", "false") == "true") @@ -49,13 +50,13 @@ function test_hdfs(host="localhost", port=9000) bar_file = HDFSFile(hdfsclnt, "bar") teststr = "hello world\n" nloops = 1000 - tic() + t1 = time() open(bar_file, "w") do f for idx in 1:nloops write(f, teststr) end end - println("...done in $(toc()) secs") + println("...done in $(time() - t1) secs") println("verify file size to be $(length(teststr)*nloops)...") @test filesize(bar_file) == length(teststr) * nloops @@ -82,32 +83,32 @@ function test_hdfs(host="localhost", port=9000) A = rand(UInt8, size_bytes) open(bar_file, "w"; blocksz=UInt64(size_bytes)) do f for idx in 1:nloops - tic() + t1 = time() write(f, A) - println("...block written in $(toc()) secs") + println("...block written in $(time() - t1) secs") end end println("verify file size to be $(sizeof(A) * nloops)...") @test filesize(bar_file) == sizeof(A) * nloops println("read and verify...") - B = Array{UInt8}(size_bytes) + B = Array{UInt8}(undef, size_bytes) open(bar_file, "r") do f for idx in 1:nloops - tic() + t1 = time() read!(f, B) - println("...block read in $(toc()) secs") + println("...block read in $(time() - t1) secs") @test A == B end end println("read and verify with crc...") - B = Array{UInt8}(size_bytes) + B = Array{UInt8}(undef, size_bytes) open(bar_file, "r"; crc=true) do f for idx in 1:nloops - tic() + t1 = time() read!(f, B) - println("...block read in $(toc()) secs") + println("...block read in $(time() - t1) secs") @test A == B end end @@ -120,7 +121,7 @@ function test_hdfs(host="localhost", port=9000) @test hdfs_set_replication(bar_file, 2) cd(hdfsclnt, "/tmp/foo") - NFILES = limitedtestenv ? 100 : 1000 + NFILES = limitedtestenv ? 
10 : 1000 println("create many ($NFILES) files...") for fidx in 1:NFILES bar_file = HDFSFile(hdfsclnt, "bar$fidx") @@ -131,6 +132,7 @@ function test_hdfs(host="localhost", port=9000) end ((fidx % 10) == 0) && println("...created file #$fidx") end + println("reading directory...") allfiles = readdir(hdfsclnt, "/tmp/foo") println("delete many ($NFILES) files...") for idx in 1:NFILES diff --git a/test/yarntests.jl b/test/yarntests.jl index 32219a1..fc7ff13 100644 --- a/test/yarntests.jl +++ b/test/yarntests.jl @@ -1,5 +1,6 @@ using Elly -using Base.Test +using Test +using Distributed function test_yarn(host="localhost", rmport=8032, schedport=8030) limitedtestenv = (get(ENV, "CI", "false") == "true") @@ -10,15 +11,33 @@ function test_yarn(host="localhost", rmport=8032, schedport=8030) println("number of yarn nodes: $nnodes") nlist = nodes(yarnclnt) - show(STDOUT, nlist) + show(stdout, nlist) yarncm = YarnManager(yarnhost=host, rmport=rmport, schedport=schedport, launch_timeout=60); - addprocs(yarncm; np=1, env=Dict("JULIA_PKGDIR"=>Pkg.dir())); + env = Dict{String,String}() + for envname in ("USER", "LD_LIBRARY_PATH", "USERNAME", "HOME", "PATH", "LOGNAME", "JULIA_LOAD_PATH", "LIBDIR") + if envname in keys(ENV) + env[envname] = ENV[envname] + end + end + if !("JULIA_LOAD_PATH" in keys(env)) + home = ENV["HOME"] + env["JULIA_LOAD_PATH"] = join(Base.LOAD_PATH, ':') * ":$(home)/.julia/dev:$(home)/.julia/packages" + end + if !("JULIA_DEPOT_PATH" in keys(env)) + env["JULIA_DEPOT_PATH"] = join(Base.DEPOT_PATH, ':') + end + println("starting workers with environment:") + for (n,v) in env + println(" - $n => $v") + end + + addprocs(yarncm; np=1, env=env); @test nprocs() == 2 if !limitedtestenv - addprocs(yarncm; np=2, env=Dict("JULIA_PKGDIR"=>Pkg.dir())); + addprocs(yarncm; np=2, env=env); @test nprocs() == 4 end
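
Note on the Nullable removal above: Julia 0.7 deprecates Nullable{T} in favor of Union{Nothing,T} fields with `nothing` as the sentinel, which is the pattern this patch applies to HadoopDataChannel, HDFSBlockReader and HDFSBlockWriter. A minimal sketch of the pattern follows; the DemoChannel type is illustrative only and not part of Elly:

    using Sockets

    mutable struct DemoChannel
        host::String
        port::Int
        sock::Union{Nothing,TCPSocket}   # was sock::Nullable{TCPSocket}

        DemoChannel(host, port) = new(host, port, nothing)
    end

    # presence test replaces isnull(); no get() unwrapping is needed
    isconnected(c::DemoChannel) = (c.sock !== nothing) && isopen(c.sock)

    function connect!(c::DemoChannel)
        c.sock = Sockets.connect(c.host, c.port)  # assign the socket directly
        nothing
    end

    function disconnect!(c::DemoChannel)
        isconnected(c) && close(c.sock)
        c.sock = nothing
        nothing
    end

The same convention covers return values: _select_node_for_block now returns either a node or `nothing`, and callers test with `=== nothing` instead of isnull()/get().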
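
The @logmsg macro removed from src/Elly.jl is replaced by the stdlib Logging frontend. @debug records cost almost nothing unless a logger enables the Debug level, which suits the per-packet messages in the read/write paths. A sketch of both call forms used in this patch, with illustrative values, and how to surface the records while debugging:

    using Logging

    hdr_len, pkt_len = 25, 1024    # illustrative values

    # interpolated-string form, as at most call sites
    @debug("pkt_hdr <- sock. len $(hdr_len) (pkt_len: $pkt_len)")

    # structured key=value form, as in recv_packet_hdr
    @debug("pkt_hdr <- sock", len=hdr_len, pkt_len=pkt_len)

    # to actually see Debug-level records, raise the global logger level:
    global_logger(ConsoleLogger(stderr, Logging.Debug))

    # or scope it to this package via the environment before starting Julia:
    #   JULIA_DEBUG=Elly julia ...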
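
The remaining edits are mechanical 0.7 renames: nb_available becomes bytesavailable, shift! becomes popfirst!, uninitialized arrays need an explicit `undef`, string-to-bytes conversion goes through codeunits, and tic()/toc() give way to time() deltas. A compact sketch of each, runnable on Julia 0.7 or later:

    io = PipeBuffer()
    write(io, "hello")
    n = bytesavailable(io)                 # was nb_available(io)

    buf = Vector{UInt8}(undef, n)          # was Vector{UInt8}(n)
    read!(io, buf)

    free = [("ch1", time()), ("ch2", time())]
    channel, lastusetime = popfirst!(free) # was shift!(free)

    bytes = convert(Vector{UInt8}, codeunits("response"))  # String -> bytes

    t1 = time()                            # was tic()
    sum(rand(10^6))
    println("...done in $(time() - t1) secs")  # was toc()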
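
Finally, the test/yarntests.jl hunk passes an explicit environment to YARN-launched workers because JULIA_PKGDIR no longer exists in 0.7: workers must inherit JULIA_LOAD_PATH and JULIA_DEPOT_PATH to resolve the same packages as the master. A hedged sketch of the minimal call, assuming a reachable resource manager at the test-default host and ports:

    using Elly, Distributed

    yarncm = YarnManager(yarnhost="localhost", rmport=8032, schedport=8030,
                         launch_timeout=60)

    # propagate depot and load paths so workers find the same packages
    env = Dict(
        "JULIA_LOAD_PATH"  => join(Base.LOAD_PATH, ':'),
        "JULIA_DEPOT_PATH" => join(Base.DEPOT_PATH, ':'),
    )
    addprocs(yarncm; np=1, env=env)
    @assert nprocs() == 2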