Skip to content

Commit

Permalink
Merge pull request #51 from JuliaParallel/tan/protoupgrade
Browse files Browse the repository at this point in the history
switch to use ProtoBuf 0.9
  • Loading branch information
tanmaykm authored Oct 21, 2020
2 parents b61ad27 + 72116ed commit 9f043f7
Show file tree
Hide file tree
Showing 29 changed files with 16,506 additions and 2,997 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ dist: trusty
os:
- linux
julia:
- 1.0
- 1.1
- 1.2
- 1.3
- 1.4
- 1.5
- nightly
notifications:
email: false
Expand Down
6 changes: 3 additions & 3 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ uuid = "0bb624de-12df-571d-ad84-47aef8b93290"
keywords = ["hadoop", "hdfs", "yarn", "client"]
license = "MIT"
desc = "Hadoop HDFS and Yarn client"
version = "0.4.0"
version = "0.5.0"

[deps]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Expand All @@ -21,8 +21,8 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
julia = "1"
MbedTLS = "0.6.8, 0.7"
ProtoBuf = "0.7"
MbedTLS = "0.6.8, 0.7, 1"
ProtoBuf = "0.9"
URIParser = "0.4"

[extras]
Expand Down
18 changes: 10 additions & 8 deletions src/api_hdfs_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ function show(io::IO, st::HDFSFileInfo)
end

function _as_dict(obj, d=Dict{Symbol,Any}())
for name in fieldnames(typeof(obj))
d[name] = getfield(obj, name)
for name in propertynames(obj)
if hasproperty(obj, name)
d[name] = getproperty(obj, name)
end
end
d
end
Expand All @@ -138,17 +140,17 @@ function _walkdir(client::HDFSClient, path::AbstractString, process_entry::Funct
inp = GetListingRequestProto(src=path, startAfter=start_after, needLocation=false)
resp = getListing(client.nn_conn, inp)

if isfilled(resp, :dirList)
if hasproperty(resp, :dirList)
dir_list = resp.dirList
if isfilled(dir_list, :partialListing)
if hasproperty(dir_list, :partialListing)
partial_listing = dir_list.partialListing
for filestatus in partial_listing
cont = process_entry(filestatus)
(cont == false) && break
end
(cont == false) && break
end
if isfilled(dir_list, :remainingEntries) && (dir_list.remainingEntries > 0)
if hasproperty(dir_list, :remainingEntries) && (dir_list.remainingEntries > 0)
start_after = dir_list.partialListing[end].path
else
cont = false
Expand All @@ -164,7 +166,7 @@ function _get_file_info(client::HDFSClient, path::AbstractString)
path = abspath(client, path)
inp = GetFileInfoRequestProto(src=path)
resp = getFileInfo(client.nn_conn, inp)
isfilled(resp, :fs) ? resp.fs : nothing
hasproperty(resp, :fs) ? resp.fs : nothing
end

function _get_block_locations(client::HDFSClient, path::AbstractString, offset::UInt64=zero(UInt64), length::UInt64=zero(UInt64))
Expand All @@ -174,7 +176,7 @@ function _get_block_locations(client::HDFSClient, path::AbstractString, offset::
end
inp = GetBlockLocationsRequestProto(src=path, offset=offset, length=length)
resp = getBlockLocations(client.nn_conn, inp)
isfilled(resp, :locations) ? resp.locations : nothing
hasproperty(resp, :locations) ? resp.locations : nothing
end

#
Expand Down Expand Up @@ -362,7 +364,7 @@ function _create_file(client::HDFSClient, path::AbstractString, overwrite::Bool=
inp = CreateRequestProto(src=path, masked=perms, clientName=ELLY_CLIENTNAME, createFlag=createFlag, createParent=false, replication=replication, blockSize=blocksz)

resp = create(client.nn_conn, inp)
isfilled(resp, :fs) || (return nothing)
hasproperty(resp, :fs) || (return nothing)

if docomplete
_complete_file(client, path) || (return nothing)
Expand Down
2 changes: 1 addition & 1 deletion src/api_hdfs_io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ skip(reader::HDFSFileReader, n::Integer) = seek(reader, UInt64(n+position(reader
_block_has_offset(block::LocatedBlockProto, offset::UInt64, block_sz::UInt64) = (block.offset <= offset < (block.offset + block_sz))

function _find_block(blocks::LocatedBlocksProto, offset::UInt64, block_sz::UInt64)
if isfilled(blocks, :blocks)
if hasproperty(blocks, :blocks)
blockarr = blocks.blocks
for block in blockarr
if _block_has_offset(block, offset, block_sz)
Expand Down
16 changes: 8 additions & 8 deletions src/api_yarn_appmaster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ function register(yam::YarnAppMaster)
end
yam.registration = resp

if isfilled(resp, :maximumCapability)
if hasproperty(resp, :maximumCapability)
yam.max_mem = resp.maximumCapability.memory
yam.max_cores = resp.maximumCapability.virtual_cores
end
@debug("max capability", mem=yam.max_mem, cores=yam.max_cores)
if isfilled(resp, :queue)
if hasproperty(resp, :queue)
yam.queue = resp.queue
end

Expand All @@ -148,7 +148,7 @@ can_schedule_mem(yam::YarnAppMaster) = can_schedule(yam, SchedulerResourceTypes.
can_schedule_cores(yam::YarnAppMaster) = can_schedule(yam, SchedulerResourceTypes.CPU)
function can_schedule(yam::YarnAppMaster, restype::Int32)
reg = yam.registration
!isfilled(reg, :scheduler_resource_types) && (return false)
!hasproperty(reg, :scheduler_resource_types) && (return false)
restype in reg.scheduler_resource_types
end

Expand Down Expand Up @@ -191,7 +191,7 @@ function container_start(yam::YarnAppMaster, container::ContainerProto, containe
release_connection(yam.nodes, nodeid, nm_conn, success)
end
success || throw(YarnException("Error starting container"))
isfilled(resp, :succeeded_requests) || throw(YarnException(isfilled(resp,:failed_requests) ? resp.failed_requests[1] : "Error starting container"))
hasproperty(resp, :succeeded_requests) || throw(YarnException(hasproperty(resp,:failed_requests) ? resp.failed_requests[1] : "Error starting container"))
cid = resp.succeeded_requests[1]
(cid == container.id) || throw(YarnException("Unexpected container id mismatch"))
set_busy(yam.containers, cid)
Expand All @@ -217,7 +217,7 @@ function container_stop(yam::YarnAppMaster, container::ContainerProto)
release_connection(yam.nodes, nodeid, nm_conn, success)
end
success || throw(YarnException("Error stopping container"))
isfilled(resp, :succeeded_requests) || throw(YarnException(isfilled(resp,:failed_requests) ? resp.failed_requests[1] : "Error stopping container"))
hasproperty(resp, :succeeded_requests) || throw(YarnException(hasproperty(resp,:failed_requests) ? resp.failed_requests[1] : "Error stopping container"))
cid = resp.succeeded_requests[1]
(cid == container.id) || throw(YarnException("Unexpected container id mismatch"))
set_free(yam.containers, cid)
Expand Down Expand Up @@ -248,15 +248,15 @@ function _update_rm(yam::YarnAppMaster)
# store/update tokens
channel = yam.amrm_conn.channel
ugi = channel.ugi
isfilled(resp, :am_rm_token) && add_token!(ugi, token_alias(channel), resp.am_rm_token)
if isfilled(resp, :nm_tokens)
hasproperty(resp, :am_rm_token) && add_token!(ugi, token_alias(channel), resp.am_rm_token)
if hasproperty(resp, :nm_tokens)
for nmtok in resp.nm_tokens
add_token!(ugi, token_alias(nmtok.nodeId), nmtok.token)
end
end

# update available headroom
if isfilled(resp, :limit)
if hasproperty(resp, :limit)
yam.available_mem = resp.limit.memory
yam.available_cores = resp.limit.virtual_cores
end
Expand Down
18 changes: 9 additions & 9 deletions src/api_yarn_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ end

function show(io::IO, serex::SerializedExceptionProto)
print(io, "Exception: ")
isfilled(serex, :class_name) && print(io, serex.class_name, ": ")
isfilled(serex, :message) && print(io, serex.message)
isfilled(serex, :trace) && print(io, '\n', serex.trace, '\n')
hasproperty(serex, :class_name) && print(io, serex.class_name, ": ")
hasproperty(serex, :message) && print(io, serex.message)
hasproperty(serex, :trace) && print(io, '\n', serex.trace, '\n')

if isfilled(serex, :cause)
if hasproperty(serex, :cause)
println(io, "Caused by:")
show(io, serex.cause)
end
Expand Down Expand Up @@ -146,8 +146,8 @@ function show(io::IO, nodes::YarnNodes)
end

function update(nodes::YarnNodes, arp::AllocateResponseProto)
isfilled(arp, :num_cluster_nodes) && (nodes.count = arp.num_cluster_nodes)
if isfilled(arp, :updated_nodes)
hasproperty(arp, :num_cluster_nodes) && (nodes.count = arp.num_cluster_nodes)
if hasproperty(arp, :updated_nodes)
for nrep in arp.updated_nodes
@debug("updating node status", nodeid=nrep.nodeId)
nodes.status[nrep.nodeId] = YarnNode(nrep)
Expand All @@ -157,7 +157,7 @@ function update(nodes::YarnNodes, arp::AllocateResponseProto)
end

function update(nodes::YarnNodes, gcnrp::GetClusterNodesResponseProto)
isfilled(gcnrp, :nodeReports) || return
hasproperty(gcnrp, :nodeReports) || return

nlist = gcnrp.nodeReports
nodes.count = length(nlist)
Expand Down Expand Up @@ -310,7 +310,7 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
cballoc = containers.on_container_alloc
cbfinish = containers.on_container_finish

if isfilled(arp, :allocated_containers)
if hasproperty(arp, :allocated_containers)
for cont in arp.allocated_containers
id = cont.id
idstr = container_id_string(id)
Expand All @@ -320,7 +320,7 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
(cballoc === nothing) || @async cballoc(id)
end
end
if isfilled(arp, :completed_container_statuses)
if hasproperty(arp, :completed_container_statuses)
@debug("have completed containers")
for contst in arp.completed_container_statuses
id = contst.container_id
Expand Down
20 changes: 10 additions & 10 deletions src/api_yarn_client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ show(io::IO, client::YarnClient) = show(io, client.rm_conn)
function nodecount(client::YarnClient)
inp = GetClusterMetricsRequestProto()
resp = getClusterMetrics(client.rm_conn, inp)
isfilled(resp, :cluster_metrics) ? resp.cluster_metrics.num_node_managers : 0
hasproperty(resp, :cluster_metrics) ? resp.cluster_metrics.num_node_managers : 0
end

function nodes(client::YarnClient; all::Bool=false, nodelist::YarnNodes=YarnNodes(client.rm_conn.channel.ugi))
Expand Down Expand Up @@ -65,27 +65,27 @@ end
function show(io::IO, status::YarnAppStatus)
report = status.report

if isfilled(report, :final_application_status) && report.final_application_status > 0
if hasproperty(report, :final_application_status) && report.final_application_status > 0
final_state = "-$(FINAL_APP_STATES[report.final_application_status])"
else
final_state = ""
end
if isfilled(report, :progress)
if hasproperty(report, :progress)
final_state *= "-$(report.progress)"
end
println(io, "YarnApp $(report.applicationType) ($(report.name)/$(report.applicationId.id)): $(APP_STATES[report.yarn_application_state])$(final_state)")
println(io, " location: $(report.user)@$(report.host):$(report.rpc_port)/$(report.queue)")
if report.yarn_application_state > YarnApplicationStateProto.RUNNING
println(io, " time: $(report.startTime) to $(report.finishTime)")
if isfilled(report, :app_resource_Usage)
if hasproperty(report, :app_resource_Usage)
rusage = report.app_resource_Usage
println(io, " rusage:")
println(io, " mem,vcore seconds: $(rusage.memory_seconds), $(rusage.vcore_seconds)")
println(io, " containers: used $(rusage.num_used_containers), reserved $(rusage.num_reserved_containers)")
println(io, " mem: used $(rusage.used_resources.memory), reserved $(rusage.reserved_resources.memory), needed $(rusage.needed_resources.memory)")
println(io, " vcores: used $(rusage.used_resources.virtual_cores), reserved $(rusage.reserved_resources.virtual_cores), needed $(rusage.needed_resources.virtual_cores)")
end
if isfilled(report, :diagnostics)
if hasproperty(report, :diagnostics)
println(io, " diagnostics: $(report.diagnostics)")
end
elseif report.yarn_application_state == YarnApplicationStateProto.RUNNING
Expand Down Expand Up @@ -114,7 +114,7 @@ function show(io::IO, status::YarnAppAttemptStatus)
atmpt_id = report.application_attempt_id

atmpt_str = "$(atmpt_id.application_id.id)"
if isfilled(report, :am_container_id)
if hasproperty(report, :am_container_id)
atmpt_str *= "/$(report.am_container_id.id)"
else
atmpt_str *= "/-"
Expand All @@ -123,7 +123,7 @@ function show(io::IO, status::YarnAppAttemptStatus)

println(io, "YarnAppAttempt $(atmpt_str): $(ATTEMPT_STATES[report.yarn_application_attempt_state])")
println(io, " location: $(report.host):$(report.rpc_port)")
if isfilled(report, :diagnostics)
if hasproperty(report, :diagnostics)
println(io, " diagnostics: $(report.diagnostics)")
end
nothing
Expand Down Expand Up @@ -187,7 +187,7 @@ function submit(client::YarnClient, container_spec::ContainerLaunchContextProto;
# the application master would need these environment variables to initialize itself
# TODO: we need ability to read hadoop configuration to avoid this
rm_chan = client.rm_conn.channel
isdefined(container_spec, :environment) || (container_spec.environment = StringStringMapProto[])
hasproperty(container_spec, :environment) || (container_spec.environment = StringStringMapProto[])
push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_ADDRESS", value="$(rm_chan.host):$(rm_chan.port)"))
push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS", value=schedaddr))
end
Expand Down Expand Up @@ -227,7 +227,7 @@ function status(app::YarnApp, refresh::Bool=true)
inp = GetApplicationReportRequestProto(application_id=app.appid)

resp = getApplicationReport(client.rm_conn, inp)
app.status = isfilled(resp.application_report) ? YarnAppStatus(resp.application_report) : nothing
app.status = hasproperty(resp, :application_report) ? YarnAppStatus(resp.application_report) : nothing
end
app.status
end
Expand Down Expand Up @@ -263,7 +263,7 @@ function attempts(app::YarnApp, refresh::Bool=true)
resp = getApplicationAttempts(client.rm_conn, inp)
atmptlist = app.attempts
empty!(atmptlist)
if isfilled(resp.application_attempts)
if hasproperty(resp, :application_attempts)
for atmpt in resp.application_attempts
push!(atmptlist, YarnAppAttemptStatus(atmpt))
end
Expand Down
4 changes: 2 additions & 2 deletions src/containerid.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ function parse_container_id(cidstr::String)
end

function container_id_string(cid::ContainerIdProto)
app_id = isdefined(cid, :app_id) ? cid.app_id : cid.app_attempt_id.application_id
attempt_id = isdefined(cid, :app_attempt_id) ? cid.app_attempt_id.attemptId : 0
app_id = hasproperty(cid, :app_id) ? cid.app_id : cid.app_attempt_id.application_id
attempt_id = hasproperty(cid, :app_attempt_id) ? cid.app_attempt_id.attemptId : 0
id = cid.id
epoch = id >> 40
id = CONTAINER_ID_BITMASK & id
Expand Down
Loading

2 comments on commit 9f043f7

@tanmaykm
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/23389

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.5.0 -m "<description of version>" 9f043f7d3d11e474ceaf29f308e5e3086fed9bc1
git push origin v0.5.0

Please sign in to comment.