Skip to content
5 changes: 4 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[compat]
Dates = "1"
IOCapture = "0.2.5, 1"
Malt = "1.3.0"
Malt = "1.4.0"
Printf = "1"
Random = "1"
Scratch = "1.3.0"
Serialization = "1"
Statistics = "1"
Test = "1"
julia = "1.10"

[sources]
Malt = {url = "https://github.com/giordano/Malt.jl.git", rev = "mg/worker-stdout-stderr"}
78 changes: 44 additions & 34 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ abstract type AbstractTestRecord end

struct TestRecord <: AbstractTestRecord
value::DefaultTestSet
output::String # captured stdout/stderr

# stats
time::Float64
Expand Down Expand Up @@ -201,6 +200,25 @@ function print_test_crashed(wrkr, test, ctx::TestIOContext)
end
end

# Adapted from `Malt._stdio_loop`
function stdio_loop(worker::Malt.Worker)
@async while !eof(worker.stdout_pipe) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stdout_pipe)
write(worker.collected_stdout, bytes)
catch
break
end
end
@async while !eof(worker.stderr_pipe) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stderr_pipe)
write(worker.collected_stderr, bytes)
catch
break
end
end
end

#
# entry point
Expand Down Expand Up @@ -236,7 +254,7 @@ function Test.finish(ts::WorkerTestSet)
return ts.wrapped_ts
end

function runtest(f, name, init_code, color)
function runtest(f, name, init_code)
function inner()
# generate a temporary module to execute the tests in
mod = @eval(Main, module $(gensym(name)) end)
Expand All @@ -252,28 +270,15 @@ function runtest(f, name, init_code, color)
GC.gc(true)
Random.seed!(1)

pipe = Pipe()
pipe_initialized = Channel{Nothing}(1)
reader = @async begin
take!(pipe_initialized)
read(pipe, String)
end
io = IOContext(pipe, :color=>$(color))
stats = redirect_stdio(; stdout=io, stderr=io) do
put!(pipe_initialized, nothing)

# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
@timed @testset WorkerTestSet "placeholder" begin
@testset DefaultTestSet $name begin
$f
end
# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
stats = @timed @testset WorkerTestSet "placeholder" begin
@testset DefaultTestSet $name begin
$f
end
end
close(pipe.in)
output = fetch(reader)
(; testset=stats.value, output, stats.time, stats.bytes, stats.gctime)
(; testset=stats.value, stats.time, stats.bytes, stats.gctime)
end

# process results
Expand Down Expand Up @@ -392,7 +397,7 @@ function save_test_history(mod::Module, history::Dict{String, Float64})
end
end

function test_exe()
function test_exe(color::Bool=false)
test_exeflags = Base.julia_cmd()
filter!(test_exeflags.exec) do c
!(startswith(c, "--depwarn") || startswith(c, "--check-bounds"))
Expand All @@ -401,6 +406,7 @@ function test_exe()
push!(test_exeflags.exec, "--startup-file=no")
push!(test_exeflags.exec, "--depwarn=yes")
push!(test_exeflags.exec, "--project=$(Base.active_project())")
push!(test_exeflags.exec, "--color=$(color ? "yes" : "no")")
return test_exeflags
end

Expand Down Expand Up @@ -435,9 +441,11 @@ To add multiple workers, use [`addworkers`](@ref).
"""
function addworker(;
env = Vector{Pair{String, String}}(),
exename = nothing, exeflags = nothing
exename = nothing,
exeflags = nothing,
color::Bool = false,
)
exe = test_exe()
exe = test_exe(color)
if exename === nothing
exename = exe[1]
end
Expand All @@ -451,7 +459,8 @@ function addworker(;
# Malt already sets OPENBLAS_NUM_THREADS to 1
push!(env, "OPENBLAS_NUM_THREADS" => "1")

wrkr = Malt.Worker(; exename, exeflags, env)
io = IOBuffer()
wrkr = Malt.Worker(; exename, exeflags, env, stdio_loop, stdout=io, stderr=io)
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
return wrkr
end
Expand Down Expand Up @@ -840,7 +849,7 @@ function runtests(mod::Module, args::ParsedArgs;
line3 = "Progress: $completed/$total tests completed"
if completed > 0
# estimate per-test time (slightly pessimistic)
durations_done = [end_time - start_time for (_, _, start_time, end_time) in results]
durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results]
μ = mean(durations_done)
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
est_per_test = μ + 0.5σ
Expand Down Expand Up @@ -970,15 +979,15 @@ function runtests(mod::Module, args::ParsedArgs;
wrkr = p
end
if wrkr === nothing || !Malt.isrunning(wrkr)
wrkr = p = addworker()
wrkr = p = addworker(; io_ctx.color)
end

# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr, runtest,
testsuite[test], test, init_code, io_ctx.color)
testsuite[test], test, init_code)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
Expand All @@ -989,7 +998,8 @@ function runtests(mod::Module, args::ParsedArgs;
ex
end
test_t1 = time()
push!(results, (test, result, test_t0, test_t1))
output = String(take!(wrkr.collected_stdout))
push!(results, (test, result, output, test_t0, test_t1))

# act on the results
if result isa AbstractTestRecord
Expand Down Expand Up @@ -1070,10 +1080,10 @@ function runtests(mod::Module, args::ParsedArgs;
@async rmprocs(; waitfor=0)

# print the output generated by each testset
for (testname, result, start, stop) in results
if isa(result, AbstractTestRecord) && !isempty(result.output)
for (testname, result, output, start, stop) in results
if !isempty(output)
println(io_ctx.stdout, "\nOutput generated during execution of '$testname':")
lines = collect(eachline(IOBuffer(result.output)))
lines = collect(eachline(IOBuffer(output)))

for (i,line) in enumerate(lines)
prefix = if length(lines) == 1
Expand Down Expand Up @@ -1122,7 +1132,7 @@ function runtests(mod::Module, args::ParsedArgs;
function collect_results()
with_testset(o_ts) do
completed_tests = Set{String}()
for (testname, result, start, stop) in results
for (testname, result, output, start, stop) in results
push!(completed_tests, testname)

if result isa AbstractTestRecord
Expand Down
40 changes: 38 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ end
end

@testset "crashing test" begin
msg = "This test will crash"
testsuite = Dict(
"abort" => quote
println($(msg))
abort() = ccall(:abort, Nothing, ())
abort()
end
Expand All @@ -192,6 +194,13 @@ end
end

str = String(take!(io))
# Make sure we can capture the output generated by the crashed process, see
# issue <https://github.com/JuliaTesting/ParallelTestRunner.jl/issues/83>.
@test contains(str, msg)
# "in expression starting at" comes from the abort trap, make sure we
# captured that as well.
@test contains(str, "in expression starting at")
# Following are messages printed by ParallelTestRunner.
@test contains(str, r"abort .+ started at")
@test contains(str, r"abort .+ crashed at")
@test contains(str, "FAILURE")
Expand All @@ -200,9 +209,10 @@ end
end

@testset "test output" begin
msg = "This is some output from the test"
testsuite = Dict(
"output" => quote
println("This is some output from the test")
println($(msg))
end
)

Expand All @@ -211,7 +221,33 @@ end

str = String(take!(io))
@test contains(str, r"output .+ started at")
@test contains(str, r"This is some output from the test")
@test contains(str, msg)
@test contains(str, "SUCCESS")

msg2 = "More output"
testsuite = Dict(
"verbose-1" => quote
print($(msg))
end,
"verbose-2" => quote
println($(msg2))
end,
"silent" => quote
@test true
end,
)
io = IOBuffer()
# Run all tests on the same worker, makre sure all the output is captured
# and attributed to the correct test set.
runtests(ParallelTestRunner, ["--verbose", "--jobs=1"]; testsuite, stdout=io, stderr=io)

str = String(take!(io))
@test contains(str, r"verbose-1 .+ started at")
@test contains(str, r"verbose-2 .+ started at")
@test contains(str, r"silent .+ started at")
@test contains(str, "Output generated during execution of 'verbose-1':\n[ $(msg)")
@test contains(str, "Output generated during execution of 'verbose-2':\n[ $(msg2)")
@test !contains(str, "Output generated during execution of 'silent':")
@test contains(str, "SUCCESS")
end

Expand Down
Loading