Skip to content

Commit

Permalink
Add a convenience object for expressing once-like / per-runtime patte…
Browse files Browse the repository at this point in the history
…rns (#55793)

This adds 3 new types, to conveniently express 3 common concurrent code
patterns:

 - `PerProcess`: an action that must be taken once per process
 - `PerThread`: an action that must be taken once per thread id
 - `PerTask`: an action that must be take once per task object

The PerProcess object should replace `__init__` or similar hand rolled
implementations of this.
The PerThread object should replace code that used to use `nthreads()`
to implement a much less correct version of this (though this is not
recommended in most new code, some foreign libraries may need this to
interact well with C).
The PerTask object is simply a thin wrapper over `task_local_storage()`.
  • Loading branch information
vtjnash authored Oct 17, 2024
2 parents eb3ed5e + 9d56856 commit d36417b
Show file tree
Hide file tree
Showing 12 changed files with 563 additions and 9 deletions.
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ variables. ([#53742]).
Multi-threading changes
-----------------------

* New types are defined to handle the pattern of code that must run once per process, called
a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once
the first time it is called, and then always return the same result value of type `T`
every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for
similar usage with threads or tasks. ([#TBD])

Build system changes
--------------------

Expand Down
2 changes: 2 additions & 0 deletions base/docs/basedocs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ runtime initialization functions of external C libraries and initializing global
that involve pointers returned by external libraries.
See the [manual section about modules](@ref modules) for more details.
See also: [`OncePerProcess`](@ref).
# Examples
```julia
const foo_data_ptr = Ref{Ptr{Cvoid}}(0)
Expand Down
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export
OrdinalRange,
Pair,
PartialQuickSort,
OncePerProcess,
OncePerTask,
OncePerThread,
PermutedDimsArray,
QuickSort,
Rational,
Expand Down
282 changes: 282 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

const ThreadSynchronizer = GenericCondition{Threads.SpinLock}

"""
current_task()
Get the currently running [`Task`](@ref).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

# Advisory reentrant lock
"""
ReentrantLock()
Expand Down Expand Up @@ -570,3 +577,278 @@ end
import .Base: Event
export Event
end

const PerStateInitial = 0x00
const PerStateHasrun = 0x01
const PerStateErrored = 0x02
const PerStateConcurrent = 0x03

"""
OncePerProcess{T}(init::Function)() -> T
Calling a `OncePerProcess` object returns a value of type `T` by running the
function `initializer` exactly once per process. All concurrent and future
calls in the same process will return exactly the same value. This is useful in
code that will be precompiled, as it allows setting up caches or other state
which won't get serialized.
## Example
```jldoctest
julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do
println("Making lazy global value...done.")
return [Libc.rand()]
end;
julia> (procstate = global_state()) |> typeof
Making lazy global value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> procstate === global_state()
true
julia> procstate === fetch(@async global_state())
true
```
"""
mutable struct OncePerProcess{T, F}
value::Union{Nothing,T}
@atomic state::UInt8 # 0=initial, 1=hasrun, 2=error
@atomic allow_compile_time::Bool
const initializer::F
const lock::ReentrantLock

function OncePerProcess{T,F}(initializer::F) where {T, F}
once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock())
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :value, nothing)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :state, PerStateInitial)
return once
end
end
OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer)
OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline function (once::OncePerProcess{T})() where T
state = (@atomic :acquire once.state)
if state != PerStateHasrun
(@noinline function init_perprocesss(once, state)
state == PerStateErrored && error("OncePerProcess initializer failed previously")
once.allow_compile_time || __precompile__(false)
lock(once.lock)
try
state = @atomic :monotonic once.state
if state == PerStateInitial
once.value = once.initializer()
elseif state == PerStateErrored
error("OncePerProcess initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerProcess")
end
catch
state == PerStateErrored || @atomic :release once.state = PerStateErrored
unlock(once.lock)
rethrow()
end
state == PerStateHasrun || @atomic :release once.state = PerStateHasrun
unlock(once.lock)
nothing
end)(once, state)
end
return once.value::T
end

function copyto_monotonic!(dest::AtomicMemory, src)
i = 1
for j in eachindex(src)
if isassigned(src, j)
@atomic :monotonic dest[i] = src[j]
#else
# _unsetindex_atomic!(dest, i, src[j], :monotonic)
end
i += 1
end
dest
end

function fill_monotonic!(dest::AtomicMemory, x)
for i = 1:length(dest)
@atomic :monotonic dest[i] = x
end
dest
end


# share a lock/condition, since we just need it briefly, so some contention is okay
const PerThreadLock = ThreadSynchronizer()
"""
OncePerThread{T}(init::Function)() -> T
Calling a `OncePerThread` object returns a value of type `T` by running the function
`initializer` exactly once per thread. All future calls in the same thread, and
concurrent or future calls with the same thread id, will return exactly the
same value. The object can also be indexed by the threadid for any existing
thread, to get (or initialize *on this thread*) the value stored for that
thread. Incorrect usage can lead to data-races or memory corruption so use only
if that behavior is correct within your library's threading-safety design.
!!! warning
It is not necessarily true that a Task only runs on one thread, therefore the value
returned here may alias other values or change in the middle of your program. This function
may get deprecated in the future. If initializer yields, the thread running the current
task after the call might not be the same as the one at the start of the call.
See also: [`OncePerTask`](@ref).
## Example
```jldoctest
julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do
println("Making lazy thread value...done.")
return [Libc.rand()]
end;
julia> (threadvec = thread_state()) |> typeof
Making lazy thread value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> threadvec === fetch(@async thread_state())
true
julia> threadvec === thread_state[Threads.threadid()]
true
```
"""
mutable struct OncePerThread{T, F}
@atomic xs::AtomicMemory{T} # values
@atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent
const initializer::F

function OncePerThread{T,F}(initializer::F) where {T, F}
xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}()
once = new{T,F}(xs, ss, initializer)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :xs, xs)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :ss, ss)
return once
end
end
OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer)
OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline (once::OncePerThread)() = once[Threads.threadid()]
@inline function getindex(once::OncePerThread, tid::Integer)
tid = Int(tid)
ss = @atomic :acquire once.ss
xs = @atomic :monotonic once.xs
# n.b. length(xs) >= length(ss)
if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun
(@noinline function init_perthread(once, tid)
local ss = @atomic :acquire once.ss
local xs = @atomic :monotonic once.xs
local len = length(ss)
# slow path to allocate it
nt = Threads.maxthreadid()
0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range"))
if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored
error("OncePerThread initializer failed previously")
end
newxs = xs
newss = ss
if tid > len
# attempt to do all allocations outside of PerThreadLock for better scaling
@assert length(xs) >= length(ss) "logical constraint violation"
newxs = typeof(xs)(undef, len + nt)
newss = typeof(ss)(undef, len + nt)
end
# uses state and locks to ensure this runs exactly once per tid argument
lock(PerThreadLock)
try
ss = @atomic :monotonic once.ss
xs = @atomic :monotonic once.xs
if tid > length(ss)
@assert len <= length(ss) <= length(newss) "logical constraint violation"
fill_monotonic!(newss, PerStateInitial)
xs = copyto_monotonic!(newxs, xs)
ss = copyto_monotonic!(newss, ss)
@atomic :release once.xs = xs
@atomic :release once.ss = ss
end
state = @atomic :monotonic ss[tid]
while state == PerStateConcurrent
# lost race, wait for notification this is done running elsewhere
wait(PerThreadLock) # wait for initializer to finish without releasing this thread
ss = @atomic :monotonic once.ss
state = @atomic :monotonic ss[tid]
end
if state == PerStateInitial
# won the race, drop lock in exchange for state, and run user initializer
@atomic :monotonic ss[tid] = PerStateConcurrent
result = try
unlock(PerThreadLock)
once.initializer()
catch
lock(PerThreadLock)
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = PerStateErrored
notify(PerThreadLock)
rethrow()
end
# store result and notify waiters
lock(PerThreadLock)
xs = @atomic :monotonic once.xs
@atomic :release xs[tid] = result
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = PerStateHasrun
notify(PerThreadLock)
elseif state == PerStateErrored
error("OncePerThread initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerThread")
end
finally
unlock(PerThreadLock)
end
nothing
end)(once, tid)
xs = @atomic :monotonic once.xs
end
return xs[tid]
end

"""
OncePerTask{T}(init::Function)() -> T
Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer`
exactly once per Task. All future calls in the same Task will return exactly the same value.
See also: [`task_local_storage`](@ref).
## Example
```jldoctest
julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do
println("Making lazy task value...done.")
return [Libc.rand()]
end;
julia> (taskvec = task_state()) |> typeof
Making lazy task value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> taskvec === task_state()
true
julia> taskvec === fetch(@async task_state())
Making lazy task value...done.
false
```
"""
mutable struct OncePerTask{T, F}
const initializer::F

OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
end
@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once)
7 changes: 0 additions & 7 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ macro task(ex)
:(Task($thunk))
end

"""
current_task()
Get the currently running [`Task`](@ref).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

# task states

const task_state_runnable = UInt8(0)
Expand Down
3 changes: 3 additions & 0 deletions doc/src/base/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Main.include
Base.include_string
Base.include_dependency
__init__
Base.OncePerProcess
Base.OncePerTask
Base.OncePerThread
Base.which(::Any, ::Any)
Base.methods
Base.@show
Expand Down
2 changes: 1 addition & 1 deletion src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ static inline size_t get_checked_fieldindex(const char *name, jl_datatype_t *st,
else {
jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type};
jl_value_t *t = jl_type_union(ts, 2);
jl_type_error("getfield", t, arg);
jl_type_error(name, t, arg);
}
if (mutabl && jl_field_isconst(st, idx)) {
jl_errorf("%s: const field .%s of type %s cannot be changed", name,
Expand Down
2 changes: 2 additions & 0 deletions src/gc-stock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,8 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq)
gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_list, "global_roots_list");
gc_try_claim_and_push(mq, jl_global_roots_keyset, NULL);
gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_keyset, "global_roots_keyset");
gc_try_claim_and_push(mq, precompile_field_replace, NULL);
gc_heap_snapshot_record_gc_roots((jl_value_t*)precompile_field_replace, "precompile_field_replace");
}

// find unmarked objects that need to be finalized from the finalizer list "list".
Expand Down
2 changes: 2 additions & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,8 @@ extern jl_genericmemory_t *jl_global_roots_keyset JL_GLOBALLY_ROOTED;
extern arraylist_t *jl_entrypoint_mis;
JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT;
JL_DLLEXPORT jl_value_t *jl_as_global_root(jl_value_t *val, int insert) JL_GLOBALLY_ROOTED;
extern jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED;
JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) JL_GLOBALLY_ROOTED;

jl_opaque_closure_t *jl_new_opaque_closure(jl_tupletype_t *argt, jl_value_t *rt_lb, jl_value_t *rt_ub,
jl_value_t *source, jl_value_t **env, size_t nenv, int do_compile);
Expand Down
Loading

0 comments on commit d36417b

Please sign in to comment.