Skip to content

Commit ff92b1c

Browse files
committed
Child_processes: ensure stderr/stdout get flushed
1 parent d851082 commit ff92b1c

File tree

2 files changed

+35
-28
lines changed

2 files changed

+35
-28
lines changed

src/lib/child_processes/child_processes.ml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,7 @@ let start_custom :
270270
Deferred.Or_error.try_with ~here:[%here] (fun () -> Process.wait process)
271271
in
272272
[%log trace] "child process %s died" name ;
273-
don't_wait_for
274-
(let%bind () = after (Time.Span.of_sec 1.) in
275-
let%bind () = Writer.close @@ Process.stdin process in
276-
let%bind () = Reader.close @@ Process.stdout process in
277-
Reader.close @@ Process.stderr process ) ;
273+
don't_wait_for (Writer.close @@ Process.stdin process) ;
278274
let%bind () = Sys.remove lock_path in
279275
Ivar.fill terminated_ivar termination_status ;
280276
let log_bad_termination () =

src/lib/mina_net2/libp2p_helper.ml

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ type t =
122122
{ process : Child_processes.t
123123
; logger : Logger.t
124124
; mutable finished : bool
125+
; stderr_finished : unit Ivar.t
125126
; outstanding_requests :
126127
Libp2p_ipc.rpc_response_body Or_error.t Ivar.t
127128
Libp2p_ipc.Sequence_number.Table.t
@@ -141,13 +142,15 @@ let handle_libp2p_helper_termination t ~pids ~killed result =
141142
~metadata:
142143
[ ("exit_status", `String (Unix.Exit_or_signal.to_string_hum e)) ] ;
143144
t.finished <- true ;
145+
let%map () = Ivar.read t.stderr_finished in
144146
raise Libp2p_helper_died_unexpectedly
145147
| Error err ->
146148
[%log' fatal t.logger]
147149
!"Child processes library could not track libp2p_helper process: $err"
148150
~metadata:[ ("err", Error_json.error_to_yojson err) ] ;
149151
t.finished <- true ;
150-
let%map () = Deferred.ignore_m (Child_processes.kill t.process) in
152+
let%bind () = Deferred.ignore_m (Child_processes.kill t.process) in
153+
let%map () = Ivar.read t.stderr_finished in
151154
raise Libp2p_helper_died_unexpectedly
152155
| Ok (Ok ()) ->
153156
[%log' error t.logger]
@@ -249,33 +252,39 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir
249252
{ process
250253
; logger
251254
; finished = false
255+
; stderr_finished = Ivar.create ()
252256
; outstanding_requests = Libp2p_ipc.Sequence_number.Table.create ()
253257
}
254258
in
255259
termination_handler := handle_libp2p_helper_termination t ~pids ;
256260
O1trace.background_thread "handle_libp2p_helper_subprocess_logs"
257261
(fun () ->
258-
Child_processes.stderr process
259-
|> Strict_pipe.Reader.iter ~f:(fun line ->
260-
Mina_metrics.(
261-
Counter.inc_one Mina_metrics.Network.ipc_logs_received_total) ;
262-
let record_result =
263-
try
264-
Some
265-
(Go_log.record_of_yojson @@ Yojson.Safe.from_string line)
266-
with Yojson.Json_error _error -> None
267-
in
268-
( match record_result with
269-
| Some (Ok record) ->
270-
record |> Go_log.record_to_message |> Logger.raw logger
271-
| Some (Error error) ->
272-
[%log error]
273-
"failed to parse record over libp2p_helper stderr: \
274-
$error"
275-
~metadata:[ ("error", `String error) ]
276-
| None ->
277-
Core.print_endline line ) ;
278-
Deferred.unit ) ) ;
262+
let%map () =
263+
Child_processes.stderr process
264+
|> Strict_pipe.Reader.iter ~f:(fun line ->
265+
Mina_metrics.(
266+
Counter.inc_one
267+
Mina_metrics.Network.ipc_logs_received_total) ;
268+
let record_result =
269+
try
270+
Some
271+
( Go_log.record_of_yojson
272+
@@ Yojson.Safe.from_string line )
273+
with Yojson.Json_error _error -> None
274+
in
275+
( match record_result with
276+
| Some (Ok record) ->
277+
record |> Go_log.record_to_message |> Logger.raw logger
278+
| Some (Error error) ->
279+
[%log error]
280+
"failed to parse record over libp2p_helper stderr: \
281+
$error"
282+
~metadata:[ ("error", `String error) ]
283+
| None ->
284+
Core.print_endline line ) ;
285+
Deferred.unit )
286+
in
287+
Ivar.fill t.stderr_finished () ) ;
279288
O1trace.background_thread "handle_libp2p_ipc_incoming" (fun () ->
280289
Child_processes.stdout process
281290
|> Libp2p_ipc.read_incoming_messages
@@ -284,7 +293,9 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir
284293
let msg =
285294
Libp2p_ipc.Reader.DaemonInterface.Message.get msg
286295
in
287-
handle_incoming_message t msg ~handle_push_message
296+
if not t.finished then
297+
handle_incoming_message t msg ~handle_push_message
298+
else Deferred.unit
288299
| Error error ->
289300
[%log error]
290301
"failed to parse IPC message over libp2p_helper stdout: \

0 commit comments

Comments
 (0)