From a292217cddfbb650f4e039eaf1a09f5d629d51bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 15:43:41 +0000 Subject: [PATCH 1/9] Get process output from `Malt`, instead of capturing it ourselves This enables us to get the output generated by crashes jobs. --- Project.toml | 2 +- src/ParallelTestRunner.jl | 65 ++++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/Project.toml b/Project.toml index e0c0753..373fb2d 100644 --- a/Project.toml +++ b/Project.toml @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Dates = "1" IOCapture = "0.2.5, 1" -Malt = "1.3.0" +Malt = "1.4.0" Printf = "1" Random = "1" Scratch = "1.3.0" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 6d0c82e..d3ba025 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -57,7 +57,6 @@ abstract type AbstractTestRecord end struct TestRecord <: AbstractTestRecord value::DefaultTestSet - output::String # captured stdout/stderr # stats time::Float64 @@ -201,6 +200,27 @@ function print_test_crashed(wrkr, test, ctx::TestIOContext) end end +# Adapted from `Malt._stdio_loop` +function stdio_loop(worker::Malt.Worker) + @async while isopen(worker.stdout_pipe) && Malt.isrunning(worker) + try + bytes = readline(worker.stdout_pipe) + write(worker.collected_stdout, bytes, '\n') + flush(worker.collected_stdout) + catch + break + end + end + @async while isopen(worker.stderr_pipe) && Malt.isrunning(worker) + try + bytes = readline(worker.stderr_pipe) + write(worker.collected_stderr, bytes, '\n') + flush(worker.collected_stderr) + catch + break + end + end +end # # entry point @@ -252,28 +272,15 @@ function runtest(f, name, init_code, color) GC.gc(true) Random.seed!(1) - pipe = Pipe() - pipe_initialized = Channel{Nothing}(1) - reader = @async begin - take!(pipe_initialized) - read(pipe, String) - end - io = IOContext(pipe, :color=>$(color)) - stats = redirect_stdio(; stdout=io, stderr=io) do - put!(pipe_initialized, nothing) - - # @testset CustomTestRecord switches the all lower-level testset to our custom testset, - # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. - # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. - @timed @testset WorkerTestSet "placeholder" begin - @testset DefaultTestSet $name begin - $f - end + # @testset CustomTestRecord switches the all lower-level testset to our custom testset, + # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. + # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. + stats = @timed @testset WorkerTestSet "placeholder" begin + @testset DefaultTestSet $name begin + $f end end - close(pipe.in) - output = fetch(reader) - (; testset=stats.value, output, stats.time, stats.bytes, stats.gctime) + (; testset=stats.value, stats.time, stats.bytes, stats.gctime) end # process results @@ -451,7 +458,8 @@ function addworker(; # Malt already sets OPENBLAS_NUM_THREADS to 1 push!(env, "OPENBLAS_NUM_THREADS" => "1") - wrkr = Malt.Worker(; exename, exeflags, env) + io = IOBuffer() + wrkr = Malt.Worker(; exename, exeflags, env, stdio_loop, stdout=io, stderr=io) WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 return wrkr end @@ -840,7 +848,7 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = [end_time - start_time for (_, _, start_time, end_time) in results] + durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ @@ -989,7 +997,8 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - push!(results, (test, result, test_t0, test_t1)) + output = String(take!(wrkr.collected_stdout)) + push!(results, (test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord @@ -1070,10 +1079,10 @@ function runtests(mod::Module, args::ParsedArgs; @async rmprocs(; waitfor=0) # print the output generated by each testset - for (testname, result, start, stop) in results - if isa(result, AbstractTestRecord) && !isempty(result.output) + for (testname, result, output, start, stop) in results + if !isempty(output) println(io_ctx.stdout, "\nOutput generated during execution of '$testname':") - lines = collect(eachline(IOBuffer(result.output))) + lines = collect(eachline(IOBuffer(output))) for (i,line) in enumerate(lines) prefix = if length(lines) == 1 @@ -1122,7 +1131,7 @@ function runtests(mod::Module, args::ParsedArgs; function collect_results() with_testset(o_ts) do completed_tests = Set{String}() - for (testname, result, start, stop) in results + for (testname, result, output, start, stop) in results push!(completed_tests, testname) if result isa AbstractTestRecord From eaa38d846bc7cfcec78f54b2084b9440d18c5a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 15:57:18 +0000 Subject: [PATCH 2/9] DROP ME: temporarily point to my fork of Malt --- Project.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Project.toml b/Project.toml index 373fb2d..f834dcb 100644 --- a/Project.toml +++ b/Project.toml @@ -25,3 +25,6 @@ Serialization = "1" Statistics = "1" Test = "1" julia = "1.10" + +[sources] +Malt = {url = "https://github.com/giordano/Malt.jl.git", rev = "mg/worker-stdout-stderr"} From e562950bb310d09c54af99c96f798db1cf597854 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 16:28:49 +0000 Subject: [PATCH 3/9] Almost completely fix handling of colours --- src/ParallelTestRunner.jl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index d3ba025..0b81ca0 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -256,7 +256,7 @@ function Test.finish(ts::WorkerTestSet) return ts.wrapped_ts end -function runtest(f, name, init_code, color) +function runtest(f, name, init_code) function inner() # generate a temporary module to execute the tests in mod = @eval(Main, module $(gensym(name)) end) @@ -442,7 +442,9 @@ To add multiple workers, use [`addworkers`](@ref). """ function addworker(; env = Vector{Pair{String, String}}(), - exename = nothing, exeflags = nothing + exename = nothing, + exeflags = nothing, + color::Bool = false, ) exe = test_exe() if exename === nothing @@ -458,7 +460,7 @@ function addworker(; # Malt already sets OPENBLAS_NUM_THREADS to 1 push!(env, "OPENBLAS_NUM_THREADS" => "1") - io = IOBuffer() + io = IOContext(IOBuffer(), :color => color) wrkr = Malt.Worker(; exename, exeflags, env, stdio_loop, stdout=io, stderr=io) WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 return wrkr @@ -978,7 +980,7 @@ function runtests(mod::Module, args::ParsedArgs; wrkr = p end if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker() + wrkr = p = addworker(; io_ctx.color) end # run the test @@ -986,7 +988,7 @@ function runtests(mod::Module, args::ParsedArgs; result = try Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner)) Malt.remote_call_fetch(invokelatest, wrkr, runtest, - testsuite[test], test, init_code, io_ctx.color) + testsuite[test], test, init_code) catch ex if isa(ex, InterruptException) # the worker got interrupted, signal other tasks to stop @@ -997,7 +999,7 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - output = String(take!(wrkr.collected_stdout)) + output = String(take!(wrkr.collected_stdout.io)) push!(results, (test, result, output, test_t0, test_t1)) # act on the results From 6227e30c4f45116c5373313c3a29743ee0820cef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 17:01:07 +0000 Subject: [PATCH 4/9] Pass `--color` option directly to workers to completely fix colour issues --- src/ParallelTestRunner.jl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 0b81ca0..70b1938 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -399,7 +399,7 @@ function save_test_history(mod::Module, history::Dict{String, Float64}) end end -function test_exe() +function test_exe(color::Bool=false) test_exeflags = Base.julia_cmd() filter!(test_exeflags.exec) do c !(startswith(c, "--depwarn") || startswith(c, "--check-bounds")) @@ -408,6 +408,7 @@ function test_exe() push!(test_exeflags.exec, "--startup-file=no") push!(test_exeflags.exec, "--depwarn=yes") push!(test_exeflags.exec, "--project=$(Base.active_project())") + push!(test_exeflags.exec, "--color=$(color ? "yes" : "no")") return test_exeflags end @@ -446,7 +447,7 @@ function addworker(; exeflags = nothing, color::Bool = false, ) - exe = test_exe() + exe = test_exe(color) if exename === nothing exename = exe[1] end @@ -460,7 +461,7 @@ function addworker(; # Malt already sets OPENBLAS_NUM_THREADS to 1 push!(env, "OPENBLAS_NUM_THREADS" => "1") - io = IOContext(IOBuffer(), :color => color) + io = IOBuffer() wrkr = Malt.Worker(; exename, exeflags, env, stdio_loop, stdout=io, stderr=io) WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 return wrkr @@ -999,7 +1000,7 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - output = String(take!(wrkr.collected_stdout.io)) + output = String(take!(wrkr.collected_stdout)) push!(results, (test, result, output, test_t0, test_t1)) # act on the results From e550df51bb16e65f9ecf36c453a5a35416be417e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 17:14:55 +0000 Subject: [PATCH 5/9] Append newlines to flush streams in colorful output tests --- test/runtests.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/runtests.jl b/test/runtests.jl index 1188e00..18a6102 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -235,6 +235,7 @@ end testsuite = Dict( "color" => quote printstyled("Roses Are Red"; color=:red) + println() end ) io = IOBuffer() @@ -248,6 +249,7 @@ end "no color" => quote print("Violets are ") printstyled("blue"; color=:blue) + println() end ) io = IOBuffer() From 02219b71115395454ad430d2a08537740f38f492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Mon, 19 Jan 2026 17:38:00 +0000 Subject: [PATCH 6/9] Add tests about capturing output of crashed workers --- test/runtests.jl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/runtests.jl b/test/runtests.jl index 18a6102..7ec30d7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -179,8 +179,10 @@ end end @testset "crashing test" begin + msg = "This test will crash" testsuite = Dict( "abort" => quote + println($(msg)) abort() = ccall(:abort, Nothing, ()) abort() end @@ -192,6 +194,13 @@ end end str = String(take!(io)) + # Make sure we can capture the output generated by the crashed process, see + # issue . + @test contains(str, msg) + # "in expression starting at" comes from the abort trap, make sure we + # captured that as well. + @test contains(str, "in expression starting at") + # Following are messages printed by ParallelTestRunner. @test contains(str, r"abort .+ started at") @test contains(str, r"abort .+ crashed at") @test contains(str, "FAILURE") From a33635eee3d1f6e55de44e25a3bbf4d8f38bef25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Tue, 20 Jan 2026 10:51:53 +0000 Subject: [PATCH 7/9] Use `readavailable` instead of `readline` --- src/ParallelTestRunner.jl | 8 ++++---- test/runtests.jl | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 70b1938..94bccd5 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -204,8 +204,8 @@ end function stdio_loop(worker::Malt.Worker) @async while isopen(worker.stdout_pipe) && Malt.isrunning(worker) try - bytes = readline(worker.stdout_pipe) - write(worker.collected_stdout, bytes, '\n') + bytes = readavailable(worker.stdout_pipe) + write(worker.collected_stdout, bytes) flush(worker.collected_stdout) catch break @@ -213,8 +213,8 @@ function stdio_loop(worker::Malt.Worker) end @async while isopen(worker.stderr_pipe) && Malt.isrunning(worker) try - bytes = readline(worker.stderr_pipe) - write(worker.collected_stderr, bytes, '\n') + bytes = readavailable(worker.stderr_pipe) + write(worker.collected_stderr, bytes) flush(worker.collected_stderr) catch break diff --git a/test/runtests.jl b/test/runtests.jl index 7ec30d7..0914690 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -244,7 +244,6 @@ end testsuite = Dict( "color" => quote printstyled("Roses Are Red"; color=:red) - println() end ) io = IOBuffer() @@ -258,7 +257,6 @@ end "no color" => quote print("Violets are ") printstyled("blue"; color=:blue) - println() end ) io = IOBuffer() From dbdf9d696e59695fef983286610500014ed7694e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Tue, 20 Jan 2026 11:29:29 +0000 Subject: [PATCH 8/9] Add more tests about output capturing --- test/runtests.jl | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 0914690..ca884ec 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -209,9 +209,10 @@ end end @testset "test output" begin + msg = "This is some output from the test" testsuite = Dict( "output" => quote - println("This is some output from the test") + println($(msg)) end ) @@ -220,7 +221,33 @@ end str = String(take!(io)) @test contains(str, r"output .+ started at") - @test contains(str, r"This is some output from the test") + @test contains(str, msg) + @test contains(str, "SUCCESS") + + msg2 = "More output" + testsuite = Dict( + "verbose-1" => quote + print($(msg)) + end, + "verbose-2" => quote + println($(msg2)) + end, + "silent" => quote + @test true + end, + ) + io = IOBuffer() + # Run all tests on the same worker, makre sure all the output is captured + # and attributed to the correct test set. + runtests(ParallelTestRunner, ["--verbose", "--jobs=1"]; testsuite, stdout=io, stderr=io) + + str = String(take!(io)) + @test contains(str, r"verbose-1 .+ started at") + @test contains(str, r"verbose-2 .+ started at") + @test contains(str, r"silent .+ started at") + @test contains(str, "Output generated during execution of 'verbose-1':\n[ $(msg)") + @test contains(str, "Output generated during execution of 'verbose-2':\n[ $(msg2)") + @test !contains(str, "Output generated during execution of 'silent':") @test contains(str, "SUCCESS") end From c295ac5cc67232e529863f215afb4bef3335cd5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Tue, 20 Jan 2026 14:40:38 +0000 Subject: [PATCH 9/9] Replace `isopen` -> `!eof` --- src/ParallelTestRunner.jl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 94bccd5..24a22bb 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -202,20 +202,18 @@ end # Adapted from `Malt._stdio_loop` function stdio_loop(worker::Malt.Worker) - @async while isopen(worker.stdout_pipe) && Malt.isrunning(worker) + @async while !eof(worker.stdout_pipe) && Malt.isrunning(worker) try bytes = readavailable(worker.stdout_pipe) write(worker.collected_stdout, bytes) - flush(worker.collected_stdout) catch break end end - @async while isopen(worker.stderr_pipe) && Malt.isrunning(worker) + @async while !eof(worker.stderr_pipe) && Malt.isrunning(worker) try bytes = readavailable(worker.stderr_pipe) write(worker.collected_stderr, bytes) - flush(worker.collected_stderr) catch break end