Skip to content

Commit

Permalink
Merge pull request #18 from JuliaParallel/vc/errors
Browse files Browse the repository at this point in the history
Add `check` macro
  • Loading branch information
vchuravy authored Feb 1, 2021
2 parents 889c2b6 + 7ad26dc commit 6a369d8
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 38 deletions.
57 changes: 23 additions & 34 deletions src/UCX.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ UCS_PTR_STATUS(ptr::Ptr{Cvoid}) = API.ucs_status_t(reinterpret(UInt, ptr))
UCS_PTR_IS_ERR(ptr::Ptr{Cvoid}) = uintptr_t(ptr) >= uintptr_t(API.UCS_ERR_LAST)
UCS_PTR_IS_PTR(ptr::Ptr{Cvoid}) = (uintptr_t(ptr) - 1) < (uintptr_t(API.UCS_ERR_LAST) - 1)

struct UCXError
ctx::String
struct UCXException <: Exception
status::API.ucs_status_t
end

macro check(ex)
quote
status = $(esc(ex))
if status != API.UCS_OK
throw(UCXException(status))
end
end
end

function version()
major = Ref{Cuint}()
minor = Ref{Cuint}()
Expand All @@ -57,8 +65,7 @@ mutable struct UCXConfig

function UCXConfig(;kwargs...)
r_handle = Ref{Ptr{API.ucp_config_t}}()
status = API.ucp_config_read(C_NULL, C_NULL, r_handle) # XXX: Prefix is broken
@assert status == API.UCS_OK
@check API.ucp_config_read(C_NULL, C_NULL, r_handle) # XXX: Prefix is broken

config = new(r_handle[])
finalizer(config) do config
Expand All @@ -74,8 +81,7 @@ mutable struct UCXConfig
end

function Base.setindex!(config::UCXConfig, value::String, key::Union{String, Symbol})
status = API.ucp_config_modify(config.handle, key, value)
@assert status == API.UCS_OK
@check API.ucp_config_modify(config.handle, key, value)
return value
end

Expand Down Expand Up @@ -128,9 +134,8 @@ mutable struct UCXContext

r_handle = Ref{API.ucp_context_h}()
# UCP.ucp_init is a header function so we call, UCP.ucp_init_version
status = API.ucp_init_version(API.UCP_API_MAJOR, API.UCP_API_MINOR,
params, config.handle, r_handle)
@assert status === API.UCS_OK
@check API.ucp_init_version(API.UCP_API_MAJOR, API.UCP_API_MINOR,
params, config.handle, r_handle)

context = new(r_handle[], parse(Dict, config))

Expand Down Expand Up @@ -176,8 +181,7 @@ mutable struct UCXWorker
set!(params, :thread_mode, thread_mode)

r_handle = Ref{API.ucp_worker_h}()
status = API.ucp_worker_create(context.handle, params, r_handle)
@assert status === API.UCS_OK
@check API.ucp_worker_create(context.handle, params, r_handle)

worker = new(r_handle[], context)
finalizer(worker) do worker
Expand All @@ -191,15 +195,6 @@ function progress(worker::UCXWorker)
API.ucp_worker_progress(worker.handle) !== 0
end

function arm(worker::UCXWorker)
status = API.ucp_worker_arm(worker.handle)
if status == API.UCS_ERR_BUSY
return false
end
@assert status == API.UCS_OK
return true
end

struct UCXConnectionRequest
handle::API.ucp_conn_request_h
end
Expand Down Expand Up @@ -241,8 +236,7 @@ function UCXEndpoint(worker::UCXWorker, ip::IPv4, port)

# TODO: Error callback

status = API.ucp_ep_create(worker.handle, params, r_handle)
@assert status == API.UCS_OK
@check API.ucp_ep_create(worker.handle, params, r_handle)
end

UCXEndpoint(worker, r_handle[])
Expand All @@ -262,8 +256,7 @@ function UCXEndpoint(worker::UCXWorker, conn_request::UCXConnectionRequest)
# TODO: Error callback

r_handle = Ref{API.ucp_ep_h}()
status = API.ucp_ep_create(worker.handle, params, r_handle)
@assert status == API.UCS_OK
@check API.ucp_ep_create(worker.handle, params, r_handle)

UCXEndpoint(worker, r_handle[])
end
Expand Down Expand Up @@ -303,17 +296,15 @@ mutable struct UCXListener
set!(params, :sockaddr, ucs_sockaddr)
set!(params, :conn_handler, conn_handler)

status = API.ucp_listener_create(worker.handle, params, r_handle)
@assert status === API.UCS_OK
@check API.ucp_listener_create(worker.handle, params, r_handle)
end

new(r_handle[], worker, port)
end
end

function reject(listener::UCXListener, conn_request::UCXConnectionRequest)
status = API.ucp_listener_reject(listener.handle, conn_request.handle)
@assert status === API.UCS_OK
@check API.ucp_listener_reject(listener.handle, conn_request.handle)
end

function ucp_dt_make_contig(elem_size)
Expand All @@ -335,20 +326,18 @@ end
# Current implementation is blocking
handle_request(ep::UCXEndpoint, ptr) = handle_request(ep.worker, ptr)
function handle_request(worker::UCXWorker, ptr)
if ptr === C_NULL
return API.UCS_OK
elseif UCS_PTR_IS_ERR(ptr)
return UCS_PTR_STATUS(ptr)
else
if UCS_PTR_IS_PTR(ptr)
status = API.ucp_request_check_status(ptr)
while(status === API.UCS_INPROGRESS)
progress(worker)
yield()
status = API.ucp_request_check_status(ptr)
end
API.ucp_request_free(ptr)
return status
else
status = UCS_PTR_STATUS(ptr)
end
@check status
end


Expand Down
8 changes: 4 additions & 4 deletions src/api.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module API
using UCX_jll
using CEnum
include("ctypes.jl")
include(joinpath("api", "ctypes.jl"))

# For now we only wrap UCP
include(joinpath(@__DIR__, "..", "gen", "libucs_common_minimal.jl"))
include(joinpath(@__DIR__, "..", "gen", "libucp_common.jl"))
include(joinpath(@__DIR__, "..", "gen", "libucp_api.jl"))
include(joinpath("api", "libucs_common_minimal.jl"))
include(joinpath("api", "libucp_common.jl"))
include(joinpath("api", "libucp_api.jl"))
end
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 6a369d8

Please sign in to comment.