Skip to content

Commit

Permalink
Merge pull request #37 from JuliaParallel/tanmaykm
Browse files Browse the repository at this point in the history
port over to Julia 0.7, drop Julia 0.6 support
  • Loading branch information
tanmaykm authored Jul 31, 2018
2 parents 1b911e8 + 7ed1493 commit 46f64a8
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 387 deletions.
8 changes: 2 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ dist: trusty
os:
- linux
julia:
- 0.6
- 0.7
- nightly
matrix:
fast_finish: true
allow_failures:
- julia: nightly
notifications:
email: false
script:
- if [[ -a .git/shallow ]]; then git fetch --unshallow; fi
- git clone -b 'v0.0.3' --single-branch https://github.com/JuliaCI/PkgEvalHadoopEnv.git ./test/test_setup
- git clone -b 'v0.0.5' --single-branch https://github.com/JuliaCI/PkgEvalHadoopEnv.git ./test/test_setup
- ./test/test_setup/hadoop/setup_hdfs.sh
- julia --check-bounds=yes -e 'Pkg.clone(pwd()); Pkg.build("Elly"); Pkg.test("Elly"; coverage=true)'
after_success:
Expand Down
5 changes: 2 additions & 3 deletions REQUIRE
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
julia 0.6.0
Compat 0.17.0
julia 0.7.0-
Compat 0.70.0
ProtoBuf
URIParser
CRC
MbedTLS
2 changes: 1 addition & 1 deletion YARN.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ julia> function on_finish(cid)
end
on_finish (generic function with 1 method)
julia> callback(yarnam, Nullable(on_alloc), Nullable(on_finish))
julia> callback(yarnam, on_alloc, on_finish)
julia> yarnapp = submit(yarnclnt, yarnam)
YarnApp YARN (EllyApp/2): accepted-0.0
Expand Down
48 changes: 16 additions & 32 deletions src/Elly.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ module Elly
using Compat
using ProtoBuf
using URIParser
using CRC
using MbedTLS

import Base: connect, readdir, show, isfile, isdir, islink, stat, filesize, filemode, mtime, mkdir, mkpath,
mv, rm, abspath, cd, pwd, touch, open, nb_available, cp, joinpath, dirname,
eof, position, seek, seekend, seekstart, skip, read, write, read!, close,
launch, manage, convert
# using from stdlib
using CRC32c
using Random
using UUIDs
using Base64
using Dates
using Sockets
using Serialization
import Sockets: connect
using Distributed
import Distributed: launch, manage

import Base: readdir, show, isfile, isdir, islink, stat, filesize, filemode, mtime, mkdir, mkpath,
mv, rm, abspath, cd, pwd, touch, open, bytesavailable, cp, joinpath, dirname,
eof, position, seek, seekend, seekstart, skip, read, write, read!, close, convert
import ProtoBuf: write_bytes, read_bytes, call_method
import URIParser: URI

if VERSION >= v"0.7.0-"
using CRC32c
end

export show, convert, URI

export UserGroupInformation, add_token, find_tokens, username
Expand All @@ -28,7 +34,7 @@ export HDFSClient, HDFSFile, HDFSFileInfo,
hdfs_status, hdfs_capacity, hdfs_capacity_used, hdfs_capacity_remaining, hdfs_renewlease,
isfile, isdir, islink, stat, filesize, filemode, mtime, atime, du, exists, readdir,
mkdir, mkpath, touch, mv, rm, abspath, cd, pwd, joinpath, dirname,
eof, position, seek, seekend, seekstart, skip, nb_available,
eof, position, seek, seekend, seekstart, skip, bytesavailable,
read!, read, write, readbytes, readall, open, close, cp

export YarnClient, YarnNode, YarnApp, YarnAppStatus, YarnAppAttemptStatus, nodecount, nodes, launchcontext, submit, kill, status, attempts, am_rm_token
Expand All @@ -40,28 +46,6 @@ export YarnManager, launch, manage
const Lock = Channel
makelock() = Channel{Int}(1)

function tstr()
t = time()
string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
end

# enable logging only during debugging
#using Logging
##const logger = Logging.configure(filename="elly.log", level=DEBUG)
#const logger = Logging.configure(level=DEBUG)
#macro logmsg(s)
# quote
# debug("[", myid(), "-", "] ", $(esc(s)))
# end
#end
macro logmsg(s)
end
#macro logmsg(s)
# quote
# info(tstr(), " [", myid(), "-", "] ", $(esc(s)))
# end
#end

include("hadoop/hadoop.jl")
using Elly.hadoop
using Elly.hadoop.common
Expand Down
67 changes: 33 additions & 34 deletions src/api_hdfs_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ It also stores the folder context for using relative paths in APIs that use the
mutable struct HDFSClient
nn_conn::HDFSProtocol
wd::AbstractString
server_defaults::Nullable{FsServerDefaultsProto}
server_defaults::Union{Nothing,FsServerDefaultsProto}

function HDFSClient(host::AbstractString, port::Integer, ugi::UserGroupInformation=UserGroupInformation())
nn_conn = HDFSProtocol(host, port, ugi)
new(nn_conn, "/", Nullable{FsServerDefaultsProto}())
new(nn_conn, "/", nothing)
end
end

Expand Down Expand Up @@ -98,7 +98,7 @@ mutable struct HDFSFileInfo
last_mod::UInt64
last_access::UInt64

HDFSFileInfo(fs::HdfsFileStatusProto) = new(fs.fileType, String(fs.path),
HDFSFileInfo(fs::HdfsFileStatusProto) = new(fs.fileType, String(copy(fs.path)),
fs.length, fs.block_replication, fs.blocksize,
fs.owner, fs.group, fs.permission.perm,
fs.modification_time, fs.access_time)
Expand Down Expand Up @@ -163,7 +163,7 @@ function _get_file_info(client::HDFSClient, path::AbstractString)
path = abspath(client, path)
inp = protobuild(GetFileInfoRequestProto, Dict(:src => path))
resp = getFileInfo(client.nn_conn, inp)
isfilled(resp, :fs) ? Nullable{HdfsFileStatusProto}(resp.fs) : Nullable{HdfsFileStatusProto}()
isfilled(resp, :fs) ? resp.fs : nothing
end

function _get_block_locations(client::HDFSClient, path::AbstractString, offset::UInt64=zero(UInt64), length::UInt64=zero(UInt64))
Expand All @@ -175,18 +175,18 @@ function _get_block_locations(client::HDFSClient, path::AbstractString, offset::
:offset => offset,
:length => length))
resp = getBlockLocations(client.nn_conn, inp)
isfilled(resp, :locations) ? Nullable{LocatedBlocksProto}(resp.locations) : Nullable{LocatedBlocksProto}()
isfilled(resp, :locations) ? resp.locations : nothing
end

#
# Server defaults
function _get_server_defaults(client::HDFSClient)
if isnull(client.server_defaults)
if client.server_defaults === nothing
inp = GetServerDefaultsRequestProto()
resp = getServerDefaults(client.nn_conn, inp)
client.server_defaults = Nullable(resp.serverDefaults)
client.server_defaults = resp.serverDefaults
end
get(client.server_defaults)
client.server_defaults
end

hdfs_server_defaults(client::HDFSClient) = _as_dict(_get_server_defaults(client))
Expand All @@ -208,14 +208,14 @@ hdfs_capacity_remaining(client::HDFSClient) = _get_fs_status(client).remaining
stat(file::HDFSFile) = stat(file.client, file.path)
function stat(client::HDFSClient, path::AbstractString)
fileinfo = _get_file_info(client, path)
isnull(fileinfo) && throw(HDFSException("Path not found $path"))
hdfs_file_info = HDFSFileInfo(get(fileinfo))
(fileinfo === nothing) && throw(HDFSException("Path not found $path"))
hdfs_file_info = HDFSFileInfo(fileinfo)
hdfs_file_info.name = path
hdfs_file_info
end

exists(file::HDFSFile) = exists(file.client, file.path)
exists(client::HDFSClient, path::AbstractString) = !isnull(_get_file_info(client, path))
exists(client::HDFSClient, path::AbstractString) = (_get_file_info(client, path) !== nothing)

isdir(file::HDFSFile) = isdir(file.client, file.path)
isdir(client::HDFSClient, path::AbstractString) = isdir(stat(client, path))
Expand Down Expand Up @@ -248,9 +248,8 @@ atime(fileinfo::HDFSFileInfo) = fileinfo.last_access
hdfs_blocks(file::HDFSFile, offset::Integer=0, length::Integer=typemax(Int)) = hdfs_blocks(file.client, file.path, offset, length)
function hdfs_blocks(client::HDFSClient, path::AbstractString, offset::Integer=0, length::Integer=typemax(Int))
blocks = Tuple{UInt64,Array}[]
_locations = _get_block_locations(client, path, UInt64(offset), UInt64(length))
isnull(_locations) && (return blocks)
locations = get(_locations)
locations = _get_block_locations(client, path, UInt64(offset), UInt64(length))
(locations === nothing) && (return blocks)
for block in locations.blocks
block.corrupt && throw(HDFSException("Corrupt block found at offset $(block.offset)"))
node_ips = AbstractString[]
Expand Down Expand Up @@ -283,20 +282,20 @@ function _get_content_summary(client::HDFSClient, path::AbstractString)
resp.summary
end

du(file::HDFSFile, details::Nullable{Dict{Symbol,Any}}=Nullable{Dict{Symbol,Any}}()) = du(file.client, file.path, details)
function du(client::HDFSClient, path::AbstractString=".", details::Nullable{Dict{Symbol,Any}}=Nullable{Dict{Symbol,Any}}())
du(file::HDFSFile, details::Union{Nothing,Dict{Symbol,Any}}=nothing) = du(file.client, file.path, details)
function du(client::HDFSClient, path::AbstractString=".", details::Union{Nothing,Dict{Symbol,Any}}=nothing)
summary = _get_content_summary(client, path)
isnull(details) || _as_dict(summary, get(details))
(details === nothing) || _as_dict(summary, details)
summary.length
end

#
# File create, delete, list
readdir(file::HDFSFile, limit::Int=typemax(Int)) = readdir(file.client, file.path, limit)
function readdir(client::HDFSClient, path::AbstractString=".", limit::Int=typemax(Int))
result = AbstractString[]
result = String[]
_walkdir(client, path, (filestatus)->begin
push!(result, String(filestatus.path))
push!(result, String(copy(filestatus.path)))
(length(result) < limit)
end)
result
Expand Down Expand Up @@ -376,40 +375,40 @@ function _create_file(client::HDFSClient, path::AbstractString, overwrite::Bool=
:blockSize => blocksz))

resp = create(client.nn_conn, inp)
isfilled(resp, :fs) || (return Nullable{HdfsFileStatusProto}())
isfilled(resp, :fs) || (return nothing)

if docomplete
_complete_file(client, path) || (return Nullable{HdfsFileStatusProto}())
_complete_file(client, path) || (return nothing)
end

return Nullable(resp.fs)
return resp.fs
end

function _complete_file(client::HDFSClient, path::AbstractString, last::Nullable{ExtendedBlockProto}=Nullable{ExtendedBlockProto}())
function _complete_file(client::HDFSClient, path::AbstractString, last::Union{Nothing,ExtendedBlockProto}=nothing)
path = abspath(client, path)

endinp = protobuild(CompleteRequestProto, Dict(:src => path,
:clientName => ELLY_CLIENTNAME))
if !isnull(last)
set_field!(endinp, :last, get(last))
@logmsg("setting last block as $(get(last))")
if last !== nothing
set_field!(endinp, :last, last)
@debug("setting last block as $(last)")
end

endresp = complete(client.nn_conn, endinp)
endresp.result
end

function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Nullable{T}=Nullable{T}()) where T<:LocatedBlockProto
isnull(previous) && (return _add_block(ExtendedBlockProto, client, path))
@logmsg("adding next block to $(get(previous).b)")
_add_block(ExtendedBlockProto, client, path, Nullable(get(previous).b))
function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:LocatedBlockProto
(previous === nothing) && (return _add_block(ExtendedBlockProto, client, path))
@debug("adding next block to $(previous.b)")
_add_block(ExtendedBlockProto, client, path, previous.b)
end
function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Nullable{T}=Nullable{T}()) where T<:ExtendedBlockProto
function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto
path = abspath(client, path)

inp = protobuild(AddBlockRequestProto, Dict(:src => path,
:clientName => ELLY_CLIENTNAME))
isnull(previous) || set_field!(inp, :previous, get(previous))
(previous === nothing) || set_field!(inp, :previous, previous)

resp = addBlock(client.nn_conn, inp)
return resp.block
Expand All @@ -430,15 +429,15 @@ touch(file::HDFSFile, replication::UInt32=zero(UInt32), blocksz::UInt64=zero(UIn
function touch(client::HDFSClient, path::AbstractString, replication::UInt32=zero(UInt32), blocksz::UInt64=zero(UInt64), mode::UInt32=DEFAULT_FILE_MODE)
if exists(client, path)
path = abspath(client, path)
t = UInt64(Base.Dates.datetime2unix(now(Base.Dates.UTC))*1000)
t = UInt64(datetime2unix(now(UTC))*1000)
inp = protobuild(SetTimesRequestProto, Dict(:src => path,
:mtime => t,
:atime => t))

setTimes(client.nn_conn, inp)
else
fs = _create_file(client, path, false, replication, blocksz, mode)
isnull(fs) && throw(HDFSException("Could not create file $path"))
(fs === nothing) && throw(HDFSException("Could not create file $path"))
end
nothing
end
Loading

0 comments on commit 46f64a8

Please sign in to comment.