From 8a18f27e24b3fe83815f81fdbe97d6e77f1802df Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 26 Aug 2024 12:59:45 -0400 Subject: [PATCH 1/4] add objects to concisely specify initialization PerProcess: once per process PerThread: once per thread id PerTask: once per task object --- NEWS.md | 6 + base/docs/basedocs.jl | 2 + base/exports.jl | 3 + base/lock.jl | 252 +++++++++++++++++++++++++++++++++++++++++- doc/src/base/base.md | 3 + test/precompile.jl | 21 ++++ test/threads.jl | 66 +++++++++++ 7 files changed, 352 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 9aebf5d42d954..724d0793e67cd 100644 --- a/NEWS.md +++ b/NEWS.md @@ -68,6 +68,12 @@ variables. ([#53742]). Multi-threading changes ----------------------- +* New types are defined to handle the pattern of code that must run once per process, called + a `PerProcess{T}` type, which allows defining a function that should be run exactly once + the first time it is called, and then always return the same result value of type `T` + every subsequent time afterwards. There are also `PerThread{T}` and `PerTask{T}` types for + similar usage with threads or tasks. ([#TBD]) + Build system changes -------------------- diff --git a/base/docs/basedocs.jl b/base/docs/basedocs.jl index a142ecffdb732..f93d9a5ba0647 100644 --- a/base/docs/basedocs.jl +++ b/base/docs/basedocs.jl @@ -153,6 +153,8 @@ runtime initialization functions of external C libraries and initializing global that involve pointers returned by external libraries. See the [manual section about modules](@ref modules) for more details. +See also: [`PerProcess`](@ref). 
+ # Examples ```julia const foo_data_ptr = Ref{Ptr{Cvoid}}(0) diff --git a/base/exports.jl b/base/exports.jl index daba9a010a9e6..66de141c228b6 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -70,6 +70,9 @@ export OrdinalRange, Pair, PartialQuickSort, + PerProcess, + PerTask, + PerThread, PermutedDimsArray, QuickSort, Rational, diff --git a/base/lock.jl b/base/lock.jl index b473045e5809d..80ab2b3bb9b42 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -500,7 +500,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an After `notify` is called, the `Event` remains in a signaled state and tasks will no longer block when waiting for it, until `reset` is called. -If `autoreset` is true, at most one task will be released from `wait` for +If `autoreset` is true, at most one task will be released from `wait` for) each call to `notify`. This provides an acquire & release memory ordering on notify/wait. @@ -570,3 +570,253 @@ end import .Base: Event export Event end + + +""" + PerProcess{T} + +Calling a `PerProcess` object returns a value of type `T` by running the +function `initializer` exactly once per process. All concurrent and future +calls in the same process will return exactly the same value. This is useful in +code that will be precompiled, as it allows setting up caches or other state +which won't get serialized. + +## Example + +```jldoctest +julia> const global_state = Base.PerProcess{Vector{UInt32}}() do + println("Making lazy global value...done.") + return [Libc.rand()] + end; + +julia> procstate = global_state(); +Making lazy global value...done. 
+ +julia> procstate === global_state() +true + +julia> procstate === fetch(@async global_state()) +true +``` +""" +mutable struct PerProcess{T, F} + x::Union{Nothing,T} + @atomic state::UInt8 # 0=initial, 1=hasrun, 2=error + @atomic allow_compile_time::Bool + const initializer::F + const lock::ReentrantLock + + PerProcess{T}(initializer::F) where {T, F} = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) + PerProcess{T,F}(initializer::F) where {T, F} = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) + PerProcess(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(nothing, 0x00, true, initializer, ReentrantLock()) +end +@inline function (once::PerProcess{T})() where T + state = (@atomic :acquire once.state) + if state != 0x01 + (@noinline function init_perprocesss(once, state) + state == 0x02 && error("PerProcess initializer failed previously") + Base.__precompile__(once.allow_compile_time) + lock(once.lock) + try + state = @atomic :monotonic once.state + if state == 0x00 + once.x = once.initializer() + elseif state == 0x02 + error("PerProcess initializer failed previously") + elseif state != 0x01 + error("invalid state for PerProcess") + end + catch + state == 0x02 || @atomic :release once.state = 0x02 + unlock(once.lock) + rethrow() + end + state == 0x01 || @atomic :release once.state = 0x01 + unlock(once.lock) + nothing + end)(once, state) + end + return once.x::T +end + +function copyto_monotonic!(dest::AtomicMemory, src) + i = 1 + for j in eachindex(src) + if isassigned(src, j) + @atomic :monotonic dest[i] = src[j] + end + i += 1 + end + dest +end + +function fill_monotonic!(dest::AtomicMemory, x) + for i = 1:length(dest) + @atomic :monotonic dest[i] = x + end + dest +end + + +# share a lock, since we just need it briefly, so some contention is okay +const PerThreadLock = ThreadSynchronizer() +""" + PerThread{T} + +Calling a `PerThread` object returns a value of type `T` by running the function +`initializer` exactly once 
per thread. All future calls in the same thread, and +concurrent or future calls with the same thread id, will return exactly the +same value. The object can also be indexed by the threadid for any existing +thread, to get (or initialize *on this thread*) the value stored for that +thread. Incorrect usage can lead to data-races or memory corruption so use only +if that behavior is correct within your library's threading-safety design. + +Warning: it is not necessarily true that a Task only runs on one thread, therefore the value +returned here may alias other values or change in the middle of your program. This type may +get deprecated in the future. If initializer yields, the thread running the current task +after the call might not be the same as the one at the start of the call. + +See also: [`PerTask`](@ref). + +## Example + +```jldoctest +julia> const thread_state = Base.PerThread{Vector{UInt32}}() do + println("Making lazy thread value...done.") + return [Libc.rand()] + end; + +julia> threadvec = thread_state(); +Making lazy thread value...done. + +julia> threadvec === fetch(@async thread_state()) +true + +julia> threadvec === thread_state[Threads.threadid()] +true +``` +""" +mutable struct PerThread{T, F} + @atomic xs::AtomicMemory{T} # values + @atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent + const initializer::F + + PerThread{T}(initializer::F) where {T, F} = new{T,F}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer) + PerThread{T,F}(initializer::F) where {T, F} = new{T,F}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer) + PerThread(initializer) = (T = Base.promote_op(initializer); new{T, typeof(initializer)}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer)) +end +@inline function getindex(once::PerThread, tid::Integer) + tid = Int(tid) + ss = @atomic :acquire once.ss + xs = @atomic :monotonic once.xs + # n.b. 
length(xs) >= length(ss) + if tid > length(ss) || (@atomic :acquire ss[tid]) != 0x01 + (@noinline function init_perthread(once, tid) + local xs = @atomic :acquire once.xs + local ss = @atomic :monotonic once.ss + local len = length(ss) + # slow path to allocate it + nt = Threads.maxthreadid() + 0 < tid <= nt || ArgumentError("thread id outside of allocated range") + if tid <= length(ss) && (@atomic :acquire ss[tid]) == 0x02 + error("PerThread initializer failed previously") + end + newxs = xs + newss = ss + if tid > len + # attempt to do all allocations outside of PerThreadLock for better scaling + @assert length(xs) == length(ss) "logical constraint violation" + newxs = typeof(xs)(undef, len + nt) + newss = typeof(ss)(undef, len + nt) + end + # uses state and locks to ensure this runs exactly once per tid argument + lock(PerThreadLock) + try + ss = @atomic :monotonic once.ss + xs = @atomic :monotonic once.xs + if tid > length(ss) + @assert length(ss) >= len && newxs !== xs && newss != ss "logical constraint violation" + fill_monotonic!(newss, 0x00) + xs = copyto_monotonic!(newxs, xs) + ss = copyto_monotonic!(newss, ss) + @atomic :release once.xs = xs + @atomic :release once.ss = ss + end + state = @atomic :monotonic ss[tid] + while state == 0x04 + # lost race, wait for notification this is done running elsewhere + wait(PerThreadLock) # wait for initializer to finish without releasing this thread + ss = @atomic :monotonic once.ss + state = @atomic :monotonic ss[tid] == 0x04 + end + if state == 0x00 + # won the race, drop lock in exchange for state, and run user initializer + @atomic :monotonic ss[tid] = 0x04 + result = try + unlock(PerThreadLock) + once.initializer() + catch + lock(PerThreadLock) + ss = @atomic :monotonic once.ss + @atomic :release ss[tid] = 0x02 + notify(PerThreadLock) + rethrow() + end + # store result and notify waiters + lock(PerThreadLock) + xs = @atomic :monotonic once.xs + @atomic :release xs[tid] = result + ss = @atomic :monotonic once.ss + 
@atomic :release ss[tid] = 0x01 + notify(PerThreadLock) + elseif state == 0x02 + error("PerThread initializer failed previously") + elseif state != 0x01 + error("invalid state for PerThread") + end + finally + unlock(PerThreadLock) + end + nothing + end)(once, tid) + xs = @atomic :monotonic once.xs + end + return xs[tid] +end +@inline (once::PerThread)() = once[Threads.threadid()] + +""" + PerTask{T} + +Calling a `PerTask` object returns a value of type `T` by running the function `initializer` +exactly once per Task. All future calls in the same Task will return exactly the same value. + +See also: [`task_local_storage`](@ref). + +## Example + +```jldoctest +julia> const task_state = Base.PerTask{Vector{UInt32}}() do + println("Making lazy task value...done.") + return [Libc.rand()] + end; + +julia> taskvec = task_state(); +Making lazy task value...done. + +julia> taskvec === task_state() +true + +julia> taskvec === fetch(@async task_state()) +Making lazy task value...done. +false +``` +""" +mutable struct PerTask{T, F} + const initializer::F + + PerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer) + PerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer) + PerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer) +end +@inline (once::PerTask)() = get!(once.initializer, task_local_storage(), once) diff --git a/doc/src/base/base.md b/doc/src/base/base.md index b5d50a846ce89..b11e985782709 100644 --- a/doc/src/base/base.md +++ b/doc/src/base/base.md @@ -34,6 +34,9 @@ Main.include Base.include_string Base.include_dependency __init__ +Base.PerProcess +Base.PerTask +Base.PerThread Base.which(::Any, ::Any) Base.methods Base.@show diff --git a/test/precompile.jl b/test/precompile.jl index 7a6e41061f9b1..e44771fb6a86f 100644 --- a/test/precompile.jl +++ b/test/precompile.jl @@ -94,6 +94,17 @@ precompile_test_harness(false) do dir end abstract type AbstractAlgebraMap{A} end struct GAPGroupHomomorphism{A, B} <: 
AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end + + global process_state_calls::Int = 0 + const process_state = Base.PerProcess{typeof(getpid())}() do + @assert (global process_state_calls += 1) == 1 + return getpid() + end + const mypid = process_state() + @assert process_state_calls === 1 + process_state_calls = 0 + @assert process_state() === process_state() + @assert process_state_calls === 0 end """) write(Foo2_file, @@ -272,6 +283,9 @@ precompile_test_harness(false) do dir oid_vec_int = objectid(a_vec_int) oid_mat_int = objectid(a_mat_int) + + using $FooBase_module: process_state, mypid as FooBase_pid, process_state_calls + const mypid = process_state() end """) # Issue #52063 @@ -333,6 +347,13 @@ precompile_test_harness(false) do dir @test isready(Foo.ch2) @test take!(Foo.ch2) === 2 @test !isready(Foo.ch2) + + @test Foo.process_state_calls === 0 + @test Foo.process_state() === getpid() + @test Foo.mypid !== getpid() + @test Foo.FooBase_pid !== getpid() + @test Foo.mypid !== Foo.FooBase_pid + @test Foo.process_state_calls === 1 end let diff --git a/test/threads.jl b/test/threads.jl index 6265368c2ac79..f1a8aba418412 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -374,3 +374,69 @@ end end end end + +let once = PerProcess(() -> return [nothing]) + @test typeof(once) <: PerProcess{Vector{Nothing}} + x = once() + @test x === once() + @atomic once.state = 0xff + @test_throws ErrorException("invalid state for PerProcess") once() + @test_throws ErrorException("PerProcess initializer failed previously") once() + @atomic once.state = 0x01 + @test x === once() +end +let once = PerProcess{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("PerProcess initializer failed previously") once() +end + +let once = PerThread(() -> return [nothing]) + @test typeof(once) <: PerThread{Vector{Nothing}} + x = once() + @test x === once() === fetch(@async once()) + tids = zeros(UInt, 50) + onces = 
Vector{Vector{Nothing}}(undef, length(tids)) + for i = 1:length(tids) + function cl() + local y = once() + onces[i] = y + @test x !== y === once() + nothing + end + function threadcallclosure(cl::F) where {F} # create sparam so we can reference the type of cl in the ccall type + threadwork = @cfunction cl -> cl() Cvoid (Ref{F},) # create a cfunction that specializes on cl as an argument and calls it + err = @ccall uv_thread_create(Ref(tids, i)::Ptr{UInt}, threadwork::Ptr{Cvoid}, cl::Ref{F})::Cint # call that on a thread + err == 0 || Base.uv_error("uv_thread_create", err) + end + threadcallclosure(cl) + end + @noinline function waitallthreads(tids) + for i = 1:length(tids) + tid = Ref(tids, i) + tidp = Base.unsafe_convert(Ptr{UInt}, tid)::Ptr{UInt} + gc_state = @ccall jl_gc_safe_enter()::Int8 + GC.@preserve tid err = @ccall uv_thread_join(tidp::Ptr{UInt})::Cint + @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid + err == 0 || Base.uv_error("uv_thread_join", err) + end + end + waitallthreads(tids) + @test length(IdSet{eltype(onces)}(onces)) == length(onces) # make sure every object is unique + +end +let once = PerThread{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("PerThread initializer failed previously") once() +end + +let once = PerTask(() -> return [nothing]) + @test typeof(once) <: PerTask{Vector{Nothing}} + x = once() + @test x === once() !== fetch(@async once()) + delete!(task_local_storage(), once) + @test x !== once() === once() +end +let once = PerTask{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("expected") once() +end From a66733f0e9ea547eb43a2c7637afb3429471e10b Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 26 Aug 2024 15:46:12 -0400 Subject: [PATCH 2/4] add precompile support for recording fields to change Somewhat generalizes our support for changing Ptr to C_NULL. 
Not particularly fast, since it is just using the builtins implementation of setfield, and delaying the actual stores, but it should suffice. --- base/lock.jl | 38 ++++++++++++--- base/task.jl | 7 --- src/builtins.c | 2 +- src/gc-stock.c | 2 + src/julia_internal.h | 2 + src/staticdata.c | 111 ++++++++++++++++++++++++++++++++++++++++++- test/threads.jl | 22 ++++++++- 7 files changed, 167 insertions(+), 17 deletions(-) diff --git a/base/lock.jl b/base/lock.jl index 80ab2b3bb9b42..b53607af05a3a 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -2,6 +2,13 @@ const ThreadSynchronizer = GenericCondition{Threads.SpinLock} +""" + current_task() + +Get the currently running [`Task`](@ref). +""" +current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) + # Advisory reentrant lock """ ReentrantLock() @@ -606,16 +613,23 @@ mutable struct PerProcess{T, F} const initializer::F const lock::ReentrantLock - PerProcess{T}(initializer::F) where {T, F} = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) - PerProcess{T,F}(initializer::F) where {T, F} = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) - PerProcess(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(nothing, 0x00, true, initializer, ReentrantLock()) + function PerProcess{T,F}(initializer::F) where {T, F} + once = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :x, nothing) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :state, 0x00) + return once + end end +PerProcess{T}(initializer::F) where {T, F} = PerProcess{T, F}(initializer) +PerProcess(initializer) = PerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer) @inline function (once::PerProcess{T})() where T state = (@atomic :acquire once.state) if state != 0x01 (@noinline function init_perprocesss(once, state) state == 0x02 && error("PerProcess initializer failed previously") - 
Base.__precompile__(once.allow_compile_time) + once.allow_compile_time || __precompile__(false) lock(once.lock) try state = @atomic :monotonic once.state @@ -644,6 +658,8 @@ function copyto_monotonic!(dest::AtomicMemory, src) for j in eachindex(src) if isassigned(src, j) @atomic :monotonic dest[i] = src[j] + #else + # _unsafeindex_atomic!(dest, i, src[j], :monotonic) end i += 1 end @@ -701,10 +717,18 @@ mutable struct PerThread{T, F} @atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent const initializer::F - PerThread{T}(initializer::F) where {T, F} = new{T,F}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer) - PerThread{T,F}(initializer::F) where {T, F} = new{T,F}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer) - PerThread(initializer) = (T = Base.promote_op(initializer); new{T, typeof(initializer)}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer)) + function PerThread{T,F}(initializer::F) where {T, F} + xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}() + once = new{T,F}(xs, ss, initializer) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :xs, xs) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :ss, ss) + return once + end end +PerThread{T}(initializer::F) where {T, F} = PerThread{T,F}(initializer) +PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initializer)}(initializer) @inline function getindex(once::PerThread, tid::Integer) tid = Int(tid) ss = @atomic :acquire once.ss diff --git a/base/task.jl b/base/task.jl index 6cb1ff785eeee..f3a134f374421 100644 --- a/base/task.jl +++ b/base/task.jl @@ -143,13 +143,6 @@ macro task(ex) :(Task($thunk)) end -""" - current_task() - -Get the currently running [`Task`](@ref). 
-""" -current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) - # task states const task_state_runnable = UInt8(0) diff --git a/src/builtins.c b/src/builtins.c index 96c4cec0f5087..b129cca0ee71d 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -1008,7 +1008,7 @@ static inline size_t get_checked_fieldindex(const char *name, jl_datatype_t *st, else { jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type}; jl_value_t *t = jl_type_union(ts, 2); - jl_type_error("getfield", t, arg); + jl_type_error(name, t, arg); } if (mutabl && jl_field_isconst(st, idx)) { jl_errorf("%s: const field .%s of type %s cannot be changed", name, diff --git a/src/gc-stock.c b/src/gc-stock.c index 6b97881909bbd..37c7b4df48218 100644 --- a/src/gc-stock.c +++ b/src/gc-stock.c @@ -2741,6 +2741,8 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq) gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_list, "global_roots_list"); gc_try_claim_and_push(mq, jl_global_roots_keyset, NULL); gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_keyset, "global_roots_keyset"); + gc_try_claim_and_push(mq, precompile_field_replace, NULL); + gc_heap_snapshot_record_gc_roots((jl_value_t*)precompile_field_replace, "precompile_field_replace"); } // find unmarked objects that need to be finalized from the finalizer list "list". 
diff --git a/src/julia_internal.h b/src/julia_internal.h index 20d90fede3d5e..3c93d9fd0963d 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -858,6 +858,8 @@ extern jl_genericmemory_t *jl_global_roots_keyset JL_GLOBALLY_ROOTED; extern arraylist_t *jl_entrypoint_mis; JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_value_t *jl_as_global_root(jl_value_t *val, int insert) JL_GLOBALLY_ROOTED; +extern jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED; +JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) JL_GLOBALLY_ROOTED; jl_opaque_closure_t *jl_new_opaque_closure(jl_tupletype_t *argt, jl_value_t *rt_lb, jl_value_t *rt_ub, jl_value_t *source, jl_value_t **env, size_t nenv, int do_compile); diff --git a/src/staticdata.c b/src/staticdata.c index 0a8cbe6db7c67..5188659d8618d 100644 --- a/src/staticdata.c +++ b/src/staticdata.c @@ -495,6 +495,7 @@ void *native_functions; // opaque jl_native_code_desc_t blob used for fetching // table of struct field addresses to rewrite during saving static htable_t field_replace; +static htable_t bits_replace; static htable_t relocatable_ext_cis; // array of definitions for the predefined function pointers @@ -1649,7 +1650,23 @@ static void jl_write_values(jl_serializer_state *s) JL_GC_DISABLED write_padding(f, offset - tot); tot = offset; size_t fsz = jl_field_size(t, i); - if (t->name->mutabl && jl_is_cpointer_type(jl_field_type_concrete(t, i)) && *(intptr_t*)slot != -1) { + jl_value_t *replace = (jl_value_t*)ptrhash_get(&bits_replace, (void*)slot); + if (replace != HT_NOTFOUND) { + assert(t->name->mutabl && !jl_field_isptr(t, i)); + jl_value_t *rty = jl_typeof(replace); + size_t sz = jl_datatype_size(rty); + ios_write(f, (const char*)replace, sz); + jl_value_t *ft = jl_field_type_concrete(t, i); + int isunion = jl_is_uniontype(ft); + unsigned nth = 0; + if (!jl_find_union_component(ft, rty, &nth)) + 
assert(0 && "invalid field assignment to isbits union"); + assert(sz <= fsz - isunion); + write_padding(f, fsz - sz - isunion); + if (isunion) + write_uint8(f, nth); + } + else if (t->name->mutabl && jl_is_cpointer_type(jl_field_type_concrete(t, i)) && *(intptr_t*)slot != -1) { // reset Ptr fields to C_NULL (but keep MAP_FAILED / INVALID_HANDLE) assert(!jl_field_isptr(t, i)); write_pointer(f); @@ -2643,6 +2660,65 @@ jl_mutex_t global_roots_lock; extern jl_mutex_t world_counter_lock; extern size_t jl_require_world; +jl_mutex_t precompile_field_replace_lock; +jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED; + +static inline jl_value_t *get_checked_fieldindex(const char *name, jl_datatype_t *st, jl_value_t *v, jl_value_t *arg, int mutabl) +{ + if (mutabl) { + if (st == jl_module_type) + jl_error("cannot assign variables in other modules"); + if (!st->name->mutabl) + jl_errorf("%s: immutable struct of type %s cannot be changed", name, jl_symbol_name(st->name->name)); + } + size_t idx; + if (jl_is_long(arg)) { + idx = jl_unbox_long(arg) - 1; + if (idx >= jl_datatype_nfields(st)) + jl_bounds_error(v, arg); + } + else if (jl_is_symbol(arg)) { + idx = jl_field_index(st, (jl_sym_t*)arg, 1); + arg = jl_box_long(idx); + } + else { + jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type}; + jl_value_t *t = jl_type_union(ts, 2); + jl_type_error(name, t, arg); + } + if (mutabl && jl_field_isconst(st, idx)) { + jl_errorf("%s: const field .%s of type %s cannot be changed", name, + jl_symbol_name((jl_sym_t*)jl_svecref(jl_field_names(st), idx)), jl_symbol_name(st->name->name)); + } + return arg; +} + +JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) +{ + if (!jl_generating_output()) + return; + jl_datatype_t *st = (jl_datatype_t*)jl_typeof(val); + jl_value_t *idx = get_checked_fieldindex("setfield!", st, val, field, 1); + JL_GC_PUSH1(&idx); + size_t idxval = jl_unbox_long(idx); + jl_value_t *ft = 
jl_field_type_concrete(st, idxval); + if (!jl_isa(newval, ft)) + jl_type_error("setfield!", ft, newval); + JL_LOCK(&precompile_field_replace_lock); + if (precompile_field_replace == NULL) { + precompile_field_replace = jl_alloc_svec(3); + jl_svecset(precompile_field_replace, 0, jl_alloc_vec_any(0)); + jl_svecset(precompile_field_replace, 1, jl_alloc_vec_any(0)); + jl_svecset(precompile_field_replace, 2, jl_alloc_vec_any(0)); + } + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 0), val); + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 1), idx); + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 2), newval); + JL_GC_POP(); + JL_UNLOCK(&precompile_field_replace_lock); +} + + JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT { if (jl_is_datatype(val)) { @@ -2762,6 +2838,7 @@ static void jl_save_system_image_to_stream(ios_t *f, jl_array_t *mod_array, jl_array_t *ext_targets, jl_array_t *edges) JL_GC_DISABLED { htable_new(&field_replace, 0); + htable_new(&bits_replace, 0); // strip metadata and IR when requested if (jl_options.strip_metadata || jl_options.strip_ir) jl_strip_all_codeinfos(); @@ -2773,6 +2850,37 @@ static void jl_save_system_image_to_stream(ios_t *f, jl_array_t *mod_array, arraylist_new(&gvars, 0); arraylist_t external_fns; arraylist_new(&external_fns, 0); + // prepare hash table with any fields the user wanted us to rewrite during serialization + if (precompile_field_replace) { + jl_array_t *vals = (jl_array_t*)jl_svecref(precompile_field_replace, 0); + jl_array_t *fields = (jl_array_t*)jl_svecref(precompile_field_replace, 1); + jl_array_t *newvals = (jl_array_t*)jl_svecref(precompile_field_replace, 2); + size_t i, l = jl_array_nrows(vals); + assert(jl_array_nrows(fields) == l && jl_array_nrows(newvals) == l); + for (i = 0; i < l; i++) { + jl_value_t *val = jl_array_ptr_ref(vals, i); + size_t field = jl_unbox_long(jl_array_ptr_ref(fields, i)); + 
jl_value_t *newval = jl_array_ptr_ref(newvals, i); + jl_datatype_t *st = (jl_datatype_t*)jl_typeof(val); + size_t offs = jl_field_offset(st, field); + char *fldaddr = (char*)val + offs; + if (jl_field_isptr(st, field)) { + record_field_change((jl_value_t**)fldaddr, newval); + } + else { + // replace the bits + ptrhash_put(&bits_replace, (void*)fldaddr, newval); + // and any pointers inside + jl_datatype_t *rty = (jl_datatype_t*)jl_typeof(newval); + const jl_datatype_layout_t *layout = rty->layout; + size_t j, np = layout->npointers; + for (j = 0; j < np; j++) { + uint32_t ptr = jl_ptr_offset(rty, j); + record_field_change((jl_value_t**)fldaddr + ptr, *(((jl_value_t**)newval) + ptr)); + } + } + } + } int en = jl_gc_enable(0); if (native_functions) { @@ -3113,6 +3221,7 @@ static void jl_save_system_image_to_stream(ios_t *f, jl_array_t *mod_array, arraylist_free(&gvars); arraylist_free(&external_fns); htable_free(&field_replace); + htable_free(&bits_replace); htable_free(&serialization_order); htable_free(&nullptrs); htable_free(&symbol_table); diff --git a/test/threads.jl b/test/threads.jl index f1a8aba418412..d8e9fd4ce2901 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -390,10 +390,19 @@ let once = PerProcess{Int}(() -> error("expected")) @test_throws ErrorException("PerProcess initializer failed previously") once() end -let once = PerThread(() -> return [nothing]) +let e = Base.Event(true), + started = Channel{Int16}(Inf), + once = PerThread() do + push!(started, threadid()) + wait(e) + return [nothing] + end @test typeof(once) <: PerThread{Vector{Nothing}} + notify(e) x = once() @test x === once() === fetch(@async once()) + @test take!(started) == threadid() + @test isempty(started) tids = zeros(UInt, 50) onces = Vector{Vector{Nothing}}(undef, length(tids)) for i = 1:length(tids) @@ -420,7 +429,18 @@ let once = PerThread(() -> return [nothing]) err == 0 || Base.uv_error("uv_thread_join", err) end end + # let them finish in 5 batches of 10 + for i = 
1:length(tids) ÷ 10 + for i = 1:10 + @test take!(started) != threadid() + end + for i = 1:10 + notify(e) + end + end + @test isempty(started) waitallthreads(tids) + @test isempty(started) @test length(IdSet{eltype(onces)}(onces)) == length(onces) # make sure every object is unique end From dbbd4d96fd560b41db2324ee31fd37eb734fa39e Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Tue, 17 Sep 2024 10:03:19 -0400 Subject: [PATCH 3/4] improve OncePer implementation Address reviewer feedback, add more fixes and more tests, rename to add Once prefix. --- NEWS.md | 4 +- base/docs/basedocs.jl | 2 +- base/exports.jl | 6 +- base/lock.jl | 148 ++++++++++++++++++++++-------------------- doc/src/base/base.md | 6 +- test/precompile.jl | 2 +- test/threads.jl | 92 ++++++++++++++++++-------- 7 files changed, 153 insertions(+), 107 deletions(-) diff --git a/NEWS.md b/NEWS.md index 724d0793e67cd..e304c78f8ad66 100644 --- a/NEWS.md +++ b/NEWS.md @@ -69,9 +69,9 @@ Multi-threading changes ----------------------- * New types are defined to handle the pattern of code that must run once per process, called - a `PerProcess{T}` type, which allows defining a function that should be run exactly once + a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once the first time it is called, and then always return the same result value of type `T` - every subsequent time afterwards. There are also `PerThread{T}` and `PerTask{T}` types for + every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for similar usage with threads or tasks. ([#TBD]) Build system changes diff --git a/base/docs/basedocs.jl b/base/docs/basedocs.jl index f93d9a5ba0647..0d5d5ac00e8d0 100644 --- a/base/docs/basedocs.jl +++ b/base/docs/basedocs.jl @@ -153,7 +153,7 @@ runtime initialization functions of external C libraries and initializing global that involve pointers returned by external libraries. 
See the [manual section about modules](@ref modules) for more details. -See also: [`PerProcess`](@ref). +See also: [`OncePerProcess`](@ref). # Examples ```julia diff --git a/base/exports.jl b/base/exports.jl index 66de141c228b6..56cd58ce269e7 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -70,9 +70,9 @@ export OrdinalRange, Pair, PartialQuickSort, - PerProcess, - PerTask, - PerThread, + OncePerProcess, + OncePerTask, + OncePerThread, PermutedDimsArray, QuickSort, Rational, diff --git a/base/lock.jl b/base/lock.jl index b53607af05a3a..a44cd4c0d63cf 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -507,7 +507,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an After `notify` is called, the `Event` remains in a signaled state and tasks will no longer block when waiting for it, until `reset` is called. -If `autoreset` is true, at most one task will be released from `wait` for) +If `autoreset` is true, at most one task will be released from `wait` for each call to `notify`. This provides an acquire & release memory ordering on notify/wait. @@ -578,11 +578,15 @@ end export Event end +const PerStateInitial = 0x00 +const PerStateHasrun = 0x01 +const PerStateErrored = 0x02 +const PerStateConcurrent = 0x03 """ - PerProcess{T} + OncePerProcess{T}(init::Function)() -> T -Calling a `PerProcess` object returns a value of type `T` by running the +Calling a `OncePerProcess` object returns a value of type `T` by running the function `initializer` exactly once per process. All concurrent and future calls in the same process will return exactly the same value. This is useful in code that will be precompiled, as it allows setting up caches or other state @@ -591,13 +595,14 @@ which won't get serialized. 
## Example ```jldoctest -julia> const global_state = Base.PerProcess{Vector{UInt32}}() do +julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do println("Making lazy global value...done.") return [Libc.rand()] end; -julia> procstate = global_state(); +julia> (procstate = global_state()) |> typeof Making lazy global value...done. +Vector{UInt32} (alias for Array{UInt32, 1}) julia> procstate === global_state() true @@ -606,51 +611,51 @@ julia> procstate === fetch(@async global_state()) true ``` """ -mutable struct PerProcess{T, F} - x::Union{Nothing,T} +mutable struct OncePerProcess{T, F} + value::Union{Nothing,T} @atomic state::UInt8 # 0=initial, 1=hasrun, 2=error @atomic allow_compile_time::Bool const initializer::F const lock::ReentrantLock - function PerProcess{T,F}(initializer::F) where {T, F} - once = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock()) + function OncePerProcess{T,F}(initializer::F) where {T, F} + once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock()) ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), - once, :x, nothing) + once, :value, nothing) ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), - once, :state, 0x00) + once, :state, PerStateInitial) return once end end -PerProcess{T}(initializer::F) where {T, F} = PerProcess{T, F}(initializer) -PerProcess(initializer) = PerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer) -@inline function (once::PerProcess{T})() where T +OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer) +OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer) +@inline function (once::OncePerProcess{T})() where T state = (@atomic :acquire once.state) - if state != 0x01 + if state != PerStateHasrun (@noinline function init_perprocesss(once, state) - state == 0x02 && error("PerProcess initializer failed previously") + state == PerStateErrored && 
error("OncePerProcess initializer failed previously") once.allow_compile_time || __precompile__(false) lock(once.lock) try state = @atomic :monotonic once.state - if state == 0x00 - once.x = once.initializer() - elseif state == 0x02 - error("PerProcess initializer failed previously") - elseif state != 0x01 - error("invalid state for PerProcess") + if state == PerStateInitial + once.value = once.initializer() + elseif state == PerStateErrored + error("OncePerProcess initializer failed previously") + elseif state != PerStateHasrun + error("invalid state for OncePerProcess") end catch - state == 0x02 || @atomic :release once.state = 0x02 + state == PerStateErrored || @atomic :release once.state = PerStateErrored unlock(once.lock) rethrow() end - state == 0x01 || @atomic :release once.state = 0x01 + state == PerStateHasrun || @atomic :release once.state = PerStateHasrun unlock(once.lock) nothing end)(once, state) end - return once.x::T + return once.value::T end function copyto_monotonic!(dest::AtomicMemory, src) @@ -659,7 +664,7 @@ function copyto_monotonic!(dest::AtomicMemory, src) if isassigned(src, j) @atomic :monotonic dest[i] = src[j] #else - # _unsafeindex_atomic!(dest, i, src[j], :monotonic) + # _unsetindex_atomic!(dest, i, src[j], :monotonic) end i += 1 end @@ -674,12 +679,12 @@ function fill_monotonic!(dest::AtomicMemory, x) end -# share a lock, since we just need it briefly, so some contention is okay +# share a lock/condition, since we just need it briefly, so some contention is okay const PerThreadLock = ThreadSynchronizer() """ - PerThread{T} + OncePerThread{T}(initializer::Function)() -> T -Calling a `PerThread` object returns a value of type `T` by running the function +Calling a `OncePerThread` object returns a value of type `T` by running the function `initializer` exactly once per thread. All future calls in the same thread, and concurrent or future calls with the same thread id, will return exactly the same value. 
The object can also be indexed by the threadid for any existing @@ -687,23 +692,25 @@ thread, to get (or initialize *on this thread*) the value stored for that thread. Incorrect usage can lead to data-races or memory corruption so use only if that behavior is correct within your library's threading-safety design. -Warning: it is not necessarily true that a Task only runs on one thread, therefore the value -returned here may alias other values or change in the middle of your program. This type may -get deprecated in the future. If initializer yields, the thread running the current task -after the call might not be the same as the one at the start of the call. +!!! warning + It is not necessarily true that a Task only runs on one thread, therefore the value + returned here may alias other values or change in the middle of your program. This function + may get deprecated in the future. If `initializer` yields, the thread running the current + task after the call might not be the same as the one at the start of the call. -See also: [`PerTask`](@ref). +See also: [`OncePerTask`](@ref). ## Example ```jldoctest -julia> const thread_state = Base.PerThread{Vector{UInt32}}() do +julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do println("Making lazy thread value...done.") return [Libc.rand()] end; -julia> threadvec = thread_state(); +julia> (threadvec = thread_state()) |> typeof Making lazy thread value...done. 
+Vector{UInt32} (alias for Array{UInt32, 1}) julia> threadvec === fetch(@async thread_state()) true @@ -712,12 +719,12 @@ julia> threadvec === thread_state[Threads.threadid()] true ``` """ -mutable struct PerThread{T, F} +mutable struct OncePerThread{T, F} @atomic xs::AtomicMemory{T} # values @atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent const initializer::F - function PerThread{T,F}(initializer::F) where {T, F} + function OncePerThread{T,F}(initializer::F) where {T, F} xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}() once = new{T,F}(xs, ss, initializer) ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), @@ -727,29 +734,30 @@ mutable struct PerThread{T, F} return once end end -PerThread{T}(initializer::F) where {T, F} = PerThread{T,F}(initializer) -PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initializer)}(initializer) -@inline function getindex(once::PerThread, tid::Integer) +OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer) +OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer) +@inline (once::OncePerThread)() = once[Threads.threadid()] +@inline function getindex(once::OncePerThread, tid::Integer) tid = Int(tid) ss = @atomic :acquire once.ss xs = @atomic :monotonic once.xs # n.b. 
length(xs) >= length(ss) - if tid > length(ss) || (@atomic :acquire ss[tid]) != 0x01 + if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun (@noinline function init_perthread(once, tid) - local xs = @atomic :acquire once.xs - local ss = @atomic :monotonic once.ss + local ss = @atomic :acquire once.ss + local xs = @atomic :monotonic once.xs local len = length(ss) # slow path to allocate it nt = Threads.maxthreadid() - 0 < tid <= nt || ArgumentError("thread id outside of allocated range") - if tid <= length(ss) && (@atomic :acquire ss[tid]) == 0x02 - error("PerThread initializer failed previously") + 0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range")) + if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored + error("OncePerThread initializer failed previously") end newxs = xs newss = ss if tid > len # attempt to do all allocations outside of PerThreadLock for better scaling - @assert length(xs) == length(ss) "logical constraint violation" + @assert length(xs) >= length(ss) "logical constraint violation" newxs = typeof(xs)(undef, len + nt) newss = typeof(ss)(undef, len + nt) end @@ -759,30 +767,30 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali ss = @atomic :monotonic once.ss xs = @atomic :monotonic once.xs if tid > length(ss) - @assert length(ss) >= len && newxs !== xs && newss != ss "logical constraint violation" - fill_monotonic!(newss, 0x00) + @assert len <= length(ss) <= length(newss) "logical constraint violation" + fill_monotonic!(newss, PerStateInitial) xs = copyto_monotonic!(newxs, xs) ss = copyto_monotonic!(newss, ss) @atomic :release once.xs = xs @atomic :release once.ss = ss end state = @atomic :monotonic ss[tid] - while state == 0x04 + while state == PerStateConcurrent # lost race, wait for notification this is done running elsewhere wait(PerThreadLock) # wait for initializer to finish without releasing this thread ss = @atomic :monotonic once.ss - 
state = @atomic :monotonic ss[tid] end - if state == 0x00 + if state == PerStateInitial # won the race, drop lock in exchange for state, and run user initializer - @atomic :monotonic ss[tid] = 0x04 + @atomic :monotonic ss[tid] = PerStateConcurrent result = try unlock(PerThreadLock) once.initializer() catch lock(PerThreadLock) ss = @atomic :monotonic once.ss - @atomic :release ss[tid] = 0x02 + @atomic :release ss[tid] = PerStateErrored notify(PerThreadLock) rethrow() end @@ -791,12 +799,12 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali xs = @atomic :monotonic once.xs @atomic :release xs[tid] = result ss = @atomic :monotonic once.ss - @atomic :release ss[tid] = 0x01 + @atomic :release ss[tid] = PerStateHasrun notify(PerThreadLock) - elseif state == 0x02 - error("PerThread initializer failed previously") - elseif state != 0x01 - error("invalid state for PerThread") + elseif state == PerStateErrored + error("OncePerThread initializer failed previously") + elseif state != PerStateHasrun + error("invalid state for OncePerThread") end finally unlock(PerThreadLock) @@ -807,12 +815,11 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali end return xs[tid] end -@inline (once::PerThread)() = once[Threads.threadid()] """ - PerTask{T} + OncePerTask{T}(initializer::Function)() -> T -Calling a `PerTask` object returns a value of type `T` by running the function `initializer` +Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer` exactly once per Task. All future calls in the same Task will return exactly the same value. See also: [`task_local_storage`](@ref). @@ -820,13 +827,14 @@ See also: [`task_local_storage`](@ref). 
## Example ```jldoctest -julia> const task_state = Base.PerTask{Vector{UInt32}}() do +julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do println("Making lazy task value...done.") return [Libc.rand()] end; -julia> taskvec = task_state(); +julia> (taskvec = task_state()) |> typeof Making lazy task value...done. +Vector{UInt32} (alias for Array{UInt32, 1}) julia> taskvec === task_state() true @@ -836,11 +844,11 @@ Making lazy task value...done. false ``` """ -mutable struct PerTask{T, F} +mutable struct OncePerTask{T, F} const initializer::F - PerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer) - PerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer) - PerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer) + OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer) + OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer) + OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer) end -@inline (once::PerTask)() = get!(once.initializer, task_local_storage(), once) +@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once) diff --git a/doc/src/base/base.md b/doc/src/base/base.md index b11e985782709..7181965d9aa81 100644 --- a/doc/src/base/base.md +++ b/doc/src/base/base.md @@ -34,9 +34,9 @@ Main.include Base.include_string Base.include_dependency __init__ -Base.PerProcess -Base.PerTask -Base.PerThread +Base.OncePerProcess +Base.OncePerTask +Base.OncePerThread Base.which(::Any, ::Any) Base.methods Base.@show diff --git a/test/precompile.jl b/test/precompile.jl index e44771fb6a86f..adf10363298ba 100644 --- a/test/precompile.jl +++ b/test/precompile.jl @@ -96,7 +96,7 @@ precompile_test_harness(false) do dir struct GAPGroupHomomorphism{A, B} <: AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end global process_state_calls::Int = 0 - const process_state = Base.PerProcess{typeof(getpid())}() do + const process_state = 
Base.OncePerProcess{typeof(getpid())}() do @assert (global process_state_calls += 1) == 1 return getpid() end diff --git a/test/threads.jl b/test/threads.jl index d8e9fd4ce2901..2dbdcf99eae28 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -375,41 +375,63 @@ end end end -let once = PerProcess(() -> return [nothing]) - @test typeof(once) <: PerProcess{Vector{Nothing}} +let once = OncePerProcess(() -> return [nothing]) + @test typeof(once) <: OncePerProcess{Vector{Nothing}} x = once() @test x === once() @atomic once.state = 0xff - @test_throws ErrorException("invalid state for PerProcess") once() - @test_throws ErrorException("PerProcess initializer failed previously") once() + @test_throws ErrorException("invalid state for OncePerProcess") once() + @test_throws ErrorException("OncePerProcess initializer failed previously") once() @atomic once.state = 0x01 @test x === once() end -let once = PerProcess{Int}(() -> error("expected")) +let once = OncePerProcess{Int}(() -> error("expected")) @test_throws ErrorException("expected") once() - @test_throws ErrorException("PerProcess initializer failed previously") once() + @test_throws ErrorException("OncePerProcess initializer failed previously") once() end let e = Base.Event(true), started = Channel{Int16}(Inf), - once = PerThread() do + finish = Channel{Nothing}(Inf), + exiting = Channel{Nothing}(Inf), + starttest2 = Event(), + once = OncePerThread() do push!(started, threadid()) - wait(e) + take!(finish) + return [nothing] + end + alls = OncePerThread() do return [nothing] end - @test typeof(once) <: PerThread{Vector{Nothing}} - notify(e) + @test typeof(once) <: OncePerThread{Vector{Nothing}} + push!(finish, nothing) + @test_throws ArgumentError once[0] x = once() - @test x === once() === fetch(@async once()) + @test_throws ArgumentError once[0] + @test x === once() === fetch(@async once()) === once[threadid()] @test take!(started) == threadid() @test isempty(started) tids = zeros(UInt, 50) + newthreads = 
zeros(Int16, length(tids)) onces = Vector{Vector{Nothing}}(undef, length(tids)) + allonces = Vector{Vector{Vector{Nothing}}}(undef, length(tids)) for i = 1:length(tids) function cl() - local y = once() - onces[i] = y - @test x !== y === once() + GC.gc(false) # stress test the GC-safepoint mechanics of jl_adopt_thread + try + newthreads[i] = threadid() + local y = once() + onces[i] = y + @test x !== y === once() === once[threadid()] + wait(starttest2) + allonces[i] = Vector{Nothing}[alls[tid] for tid in newthreads] + catch ex + close(started, ErrorException("failed")) + close(finish, ErrorException("failed")) + @lock stderr Base.display_error(current_exceptions()) + end + push!(exiting, nothing) + GC.gc(false) # stress test the GC-safepoint mechanics of jl_delete_thread nothing end function threadcallclosure(cl::F) where {F} # create sparam so we can reference the type of cl in the ccall type @@ -429,34 +451,50 @@ let e = Base.Event(true), err == 0 || Base.uv_error("uv_thread_join", err) end end - # let them finish in 5 batches of 10 - for i = 1:length(tids) ÷ 10 - for i = 1:10 - @test take!(started) != threadid() + try + # let them finish in batches of 10 + for i = 1:length(tids) ÷ 10 + for i = 1:10 + newid = take!(started) + @test newid != threadid() + end + for i = 1:10 + push!(finish, nothing) + end end - for i = 1:10 - notify(e) + @test isempty(started) + # now run the second part of the test where they all try to access the other threads elements + notify(starttest2) + finally + for _ = 1:length(tids) + # run IO loop until all threads are close to exiting + take!(exiting) end + waitallthreads(tids) end @test isempty(started) - waitallthreads(tids) - @test isempty(started) + @test isempty(finish) @test length(IdSet{eltype(onces)}(onces)) == length(onces) # make sure every object is unique + allexpected = Vector{Nothing}[alls[tid] for tid in newthreads] + @test length(IdSet{eltype(allexpected)}(allexpected)) == length(allexpected) # make sure every object is 
unique + @test all(i -> allonces[i] !== allexpected && all(j -> allonces[i][j] === allexpected[j], eachindex(allexpected)), eachindex(allonces)) # make sure every thread saw the same elements + @test_throws ArgumentError once[Threads.maxthreadid() + 1] + @test_throws ArgumentError once[-1] end -let once = PerThread{Int}(() -> error("expected")) +let once = OncePerThread{Int}(() -> error("expected")) @test_throws ErrorException("expected") once() - @test_throws ErrorException("PerThread initializer failed previously") once() + @test_throws ErrorException("OncePerThread initializer failed previously") once() end -let once = PerTask(() -> return [nothing]) - @test typeof(once) <: PerTask{Vector{Nothing}} +let once = OncePerTask(() -> return [nothing]) + @test typeof(once) <: OncePerTask{Vector{Nothing}} x = once() @test x === once() !== fetch(@async once()) delete!(task_local_storage(), once) @test x !== once() === once() end -let once = PerTask{Int}(() -> error("expected")) +let once = OncePerTask{Int}(() -> error("expected")) @test_throws ErrorException("expected") once() @test_throws ErrorException("expected") once() end From 9d56856c74cd83bbf3258d14a096de208d89ee5e Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Tue, 15 Oct 2024 17:35:17 +0000 Subject: [PATCH 4/4] fix use-after-free in test (detected in win32 CI) --- test/threads.jl | 75 +++++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/test/threads.jl b/test/threads.jl index 2dbdcf99eae28..d5a801c1a6a1c 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -415,8 +415,8 @@ let e = Base.Event(true), newthreads = zeros(Int16, length(tids)) onces = Vector{Vector{Nothing}}(undef, length(tids)) allonces = Vector{Vector{Vector{Nothing}}}(undef, length(tids)) - for i = 1:length(tids) - function cl() + # allocate closure memory to last until all threads are started + cls = [function cl() GC.gc(false) # stress test the GC-safepoint mechanics of 
jl_adopt_thread try newthreads[i] = threadid() @@ -434,43 +434,50 @@ let e = Base.Event(true), GC.gc(false) # stress test the GC-safepoint mechanics of jl_delete_thread nothing end - function threadcallclosure(cl::F) where {F} # create sparam so we can reference the type of cl in the ccall type - threadwork = @cfunction cl -> cl() Cvoid (Ref{F},) # create a cfunction that specializes on cl as an argument and calls it - err = @ccall uv_thread_create(Ref(tids, i)::Ptr{UInt}, threadwork::Ptr{Cvoid}, cl::Ref{F})::Cint # call that on a thread - err == 0 || Base.uv_error("uv_thread_create", err) - end - threadcallclosure(cl) - end - @noinline function waitallthreads(tids) + for i = 1:length(tids)] + GC.@preserve cls begin # this memory must survive until each corresponding thread exits (waitallthreads / uv_thread_join) + Base.preserve_handle(cls) for i = 1:length(tids) - tid = Ref(tids, i) - tidp = Base.unsafe_convert(Ptr{UInt}, tid)::Ptr{UInt} - gc_state = @ccall jl_gc_safe_enter()::Int8 - GC.@preserve tid err = @ccall uv_thread_join(tidp::Ptr{UInt})::Cint - @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid - err == 0 || Base.uv_error("uv_thread_join", err) - end - end - try - # let them finish in batches of 10 - for i = 1:length(tids) ÷ 10 - for i = 1:10 - newid = take!(started) - @test newid != threadid() + function threadcallclosure(tid::Ref{UInt}, cl::Ref{F}) where {F} # create sparam so we can reference the type of cl in the ccall type + threadwork = @cfunction cl -> cl() Cvoid (Ref{F},) # create a cfunction that specializes on cl as an argument and calls it + err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, cl::Ref{F})::Cint # call that on a thread + err == 0 || Base.uv_error("uv_thread_create", err) + nothing end - for i = 1:10 - push!(finish, nothing) + threadcallclosure(Ref(tids, i), Ref(cls, i)) + end + @noinline function waitallthreads(tids, cls) + for i = 1:length(tids) + tid = Ref(tids, i) + tidp = Base.unsafe_convert(Ptr{UInt}, 
tid)::Ptr{UInt} + gc_state = @ccall jl_gc_safe_enter()::Int8 + GC.@preserve tid err = @ccall uv_thread_join(tidp::Ptr{UInt})::Cint + @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid + err == 0 || Base.uv_error("uv_thread_join", err) end + Base.unpreserve_handle(cls) end - @test isempty(started) - # now run the second part of the test where they all try to access the other threads elements - notify(starttest2) - finally - for _ = 1:length(tids) - # run IO loop until all threads are close to exiting - take!(exiting) + try + # let them finish in batches of 10 + for i = 1:length(tids) ÷ 10 + for i = 1:10 + newid = take!(started) + @test newid != threadid() + end + for i = 1:10 + push!(finish, nothing) + end + end + @test isempty(started) + # now run the second part of the test where they all try to access the other threads elements + notify(starttest2) + finally + for _ = 1:length(tids) + # run IO loop until all threads are close to exiting + take!(exiting) + end + waitallthreads(tids, cls) end - waitallthreads(tids) end @test isempty(started) @test isempty(finish)