From 766fce69e947de20e2ed99b7e298b15338df5534 Mon Sep 17 00:00:00 2001 From: Jeff King Date: Tue, 8 Oct 2024 04:33:47 -0400 Subject: [PATCH 1/2] simple-ipc: split async server initialization and running To start an async ipc server, you call ipc_server_run_async(). That initializes the ipc_server_data object, and starts all of the threads running, which may immediately start serving clients. This can create some awkward timing problems, though. In the fsmonitor daemon (the sole user of the simple-ipc system), we want to create the ipc server early in the process, which means we may start serving clients before the rest of the daemon is fully initialized. To solve this, let's break run_async() into two parts: an initialization which allocates all data and spawns the threads (without letting them run), and a start function which actually lets them begin work. Since we have two simple-ipc implementations, we have to handle this twice: - in ipc-unix-socket.c, we have a central listener thread which hands connections off to worker threads using a work_available mutex. We can hold that mutex after init, and release it when we're ready to start. We do need an extra "started" flag so that we know whether the main thread is holding the mutex or not (e.g., if we prematurely stop the server, we want to make sure all of the worker threads are released to hear about the shutdown). - in ipc-win32.c, we don't have a central mutex. So we'll introduce a new startup_barrier mutex, which we'll similarly hold until we're ready to let the threads proceed. We again need a "started" flag here to make sure that we release the barrier mutex when shutting down, so that the sub-threads can proceed to the finish. I've renamed the run_async() function to init_async() to make sure we catch all callers, since they'll now need to call the matching start_async(). We could leave run_async() as a wrapper that does both, but there's not much point. There are only two callers, one of which is fsmonitor, which will want to actually do work between the two calls. And the other is just a test-tool wrapper. For now I've added the start_async() calls in fsmonitor where they would otherwise have happened, so there should be no behavior change with this patch. Signed-off-by: Jeff King Acked-by: Koji Nakamaru Signed-off-by: Junio C Hamano --- builtin/fsmonitor--daemon.c | 8 +++-- compat/simple-ipc/ipc-shared.c | 5 +-- compat/simple-ipc/ipc-unix-socket.c | 28 ++++++++++++++--- compat/simple-ipc/ipc-win32.c | 48 ++++++++++++++++++++++++++--- simple-ipc.h | 17 +++++++--- 5 files changed, 88 insertions(+), 18 deletions(-) diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c index 1593713f4cb29f..0c54cf13715765 100644 --- a/builtin/fsmonitor--daemon.c +++ b/builtin/fsmonitor--daemon.c @@ -1208,13 +1208,15 @@ static int fsmonitor_run_daemon_1(struct fsmonitor_daemon_state *state) * system event listener thread so that we have the IPC handle * before we need it. */ - if (ipc_server_run_async(&state->ipc_server_data, - state->path_ipc.buf, &ipc_opts, - handle_client, state)) + if (ipc_server_init_async(&state->ipc_server_data, + state->path_ipc.buf, &ipc_opts, + handle_client, state)) return error_errno( _("could not start IPC thread pool on '%s'"), state->path_ipc.buf); + ipc_server_start_async(&state->ipc_server_data); + /* * Start the fsmonitor listener thread to collect filesystem * events. diff --git a/compat/simple-ipc/ipc-shared.c b/compat/simple-ipc/ipc-shared.c index cb176d966f287d..d1c21b49bdba8d 100644 --- a/compat/simple-ipc/ipc-shared.c +++ b/compat/simple-ipc/ipc-shared.c @@ -16,11 +16,12 @@ int ipc_server_run(const char *path, const struct ipc_server_opts *opts, struct ipc_server_data *server_data = NULL; int ret; - ret = ipc_server_run_async(&server_data, path, opts, - application_cb, application_data); + ret = ipc_server_init_async(&server_data, path, opts, + application_cb, application_data); if (ret) return ret; + ipc_server_start_async(server_data); ret = ipc_server_await(server_data); ipc_server_free(server_data); diff --git a/compat/simple-ipc/ipc-unix-socket.c b/compat/simple-ipc/ipc-unix-socket.c index 9b3f2cdf8c9608..57d919c6b44932 100644 --- a/compat/simple-ipc/ipc-unix-socket.c +++ b/compat/simple-ipc/ipc-unix-socket.c @@ -328,6 +328,7 @@ struct ipc_server_data { int back_pos; int front_pos; + int started; int shutdown_requested; int is_stopped; }; @@ -824,10 +825,10 @@ static int setup_listener_socket( /* * Start IPC server in a pool of background threads. */ -int ipc_server_run_async(struct ipc_server_data **returned_server_data, - const char *path, const struct ipc_server_opts *opts, - ipc_server_application_cb *application_cb, - void *application_data) +int ipc_server_init_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) { struct unix_ss_socket *server_socket = NULL; struct ipc_server_data *server_data; @@ -888,6 +889,12 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data, server_data->accept_thread->fd_send_shutdown = sv[0]; server_data->accept_thread->fd_wait_shutdown = sv[1]; + /* + * Hold work-available mutex so that no work can start until + * we unlock it. + */ + pthread_mutex_lock(&server_data->work_available_mutex); + if (pthread_create(&server_data->accept_thread->pthread_id, NULL, accept_thread_proc, server_data->accept_thread)) die_errno(_("could not start accept_thread '%s'"), path); @@ -918,6 +925,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data, return 0; } +void ipc_server_start_async(struct ipc_server_data *server_data) +{ + if (!server_data || server_data->started) + return; + + server_data->started = 1; + pthread_mutex_unlock(&server_data->work_available_mutex); +} + /* * Gently tell the IPC server treads to shutdown. * Can be run on any thread. @@ -933,7 +949,9 @@ int ipc_server_stop_async(struct ipc_server_data *server_data) trace2_region_enter("ipc-server", "server-stop-async", NULL); - pthread_mutex_lock(&server_data->work_available_mutex); + /* If we haven't started yet, we are already holding lock. */ + if (server_data->started) + pthread_mutex_lock(&server_data->work_available_mutex); server_data->shutdown_requested = 1; diff --git a/compat/simple-ipc/ipc-win32.c b/compat/simple-ipc/ipc-win32.c index 8bfe51248e552e..a8fc812adfcbd3 100644 --- a/compat/simple-ipc/ipc-win32.c +++ b/compat/simple-ipc/ipc-win32.c @@ -371,6 +371,9 @@ struct ipc_server_data { HANDLE hEventStopRequested; struct ipc_server_thread_data *thread_list; int is_stopped; + + pthread_mutex_t startup_barrier; + int started; }; enum connect_result { @@ -526,6 +529,16 @@ static int use_connection(struct ipc_server_thread_data *server_thread_data) return ret; } +static void wait_for_startup_barrier(struct ipc_server_data *server_data) +{ + /* + * Temporarily hold the startup_barrier mutex before starting, + * which lets us know that it's OK to start serving requests. + */ + pthread_mutex_lock(&server_data->startup_barrier); + pthread_mutex_unlock(&server_data->startup_barrier); +} + /* * Thread proc for an IPC server worker thread. It handles a series of * connections from clients. It cleans and reuses the hPipe between each @@ -550,6 +563,8 @@ static void *server_thread_proc(void *_server_thread_data) memset(&oConnect, 0, sizeof(oConnect)); oConnect.hEvent = hEventConnected; + wait_for_startup_barrier(server_thread_data->server_data); + for (;;) { cr = wait_for_connection(server_thread_data, &oConnect); @@ -752,10 +767,10 @@ static HANDLE create_new_pipe(wchar_t *wpath, int is_first) return hPipe; } -int ipc_server_run_async(struct ipc_server_data **returned_server_data, - const char *path, const struct ipc_server_opts *opts, - ipc_server_application_cb *application_cb, - void *application_data) +int ipc_server_init_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) { struct ipc_server_data *server_data; wchar_t wpath[MAX_PATH]; @@ -787,6 +802,13 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data, strbuf_addstr(&server_data->buf_path, path); wcscpy(server_data->wpath, wpath); + /* + * Hold the startup_barrier lock so that no threads will progress + * until ipc_server_start_async() is called. + */ + pthread_mutex_init(&server_data->startup_barrier, NULL); + pthread_mutex_lock(&server_data->startup_barrier); + if (nr_threads < 1) nr_threads = 1; @@ -837,6 +859,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data, return 0; } +void ipc_server_start_async(struct ipc_server_data *server_data) +{ + if (!server_data || server_data->started) + return; + + server_data->started = 1; + pthread_mutex_unlock(&server_data->startup_barrier); +} + int ipc_server_stop_async(struct ipc_server_data *server_data) { if (!server_data) @@ -850,6 +881,13 @@ int ipc_server_stop_async(struct ipc_server_data *server_data) * We DO NOT attempt to force them to drop an active connection. */ SetEvent(server_data->hEventStopRequested); + + /* + * If we haven't yet told the threads they are allowed to run, + * do so now, so they can receive the shutdown event. + */ + ipc_server_start_async(server_data); + return 0; } @@ -900,5 +938,7 @@ void ipc_server_free(struct ipc_server_data *server_data) free(std); } + pthread_mutex_destroy(&server_data->startup_barrier); + free(server_data); } diff --git a/simple-ipc.h b/simple-ipc.h index a849d9f8411fdb..3916eaf70d9863 100644 --- a/simple-ipc.h +++ b/simple-ipc.h @@ -179,11 +179,20 @@ struct ipc_server_opts * When a client IPC message is received, the `application_cb` will be * called (possibly on a random thread) to handle the message and * optionally compose a reply message. + * + * This initializes all threads but no actual work will be done until + * ipc_server_start_async() is called. + */ +int ipc_server_init_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data); + +/* + * Let an async server start running. This needs to be called only once + * after initialization. */ -int ipc_server_run_async(struct ipc_server_data **returned_server_data, - const char *path, const struct ipc_server_opts *opts, - ipc_server_application_cb *application_cb, - void *application_data); +void ipc_server_start_async(struct ipc_server_data *server_data); /* * Gently signal the IPC server pool to shutdown. No new client From 51907f8feeae9ae4af1722e973f44ff10aa168dc Mon Sep 17 00:00:00 2001 From: Jeff King Date: Tue, 8 Oct 2024 04:36:13 -0400 Subject: [PATCH 2/2] fsmonitor: initialize fs event listener before accepting clients There's a racy hang in fsmonitor on macOS that we sometimes see in CI. When we serve a client, what's supposed to happen is: 1. The client thread calls with_lock__wait_for_cookie() in which we create a cookie file and then wait for a pthread_cond event 2. The filesystem event listener sees the cookie file creation, does some internal book-keeping, and then triggers the pthread_cond. But there's a problem: we start the listener that accepts client threads before we start the fs event thread. So it's possible for us to accept a client which creates the cookie file and starts waiting before the fs event thread is initialized, and we miss those filesystem events entirely. That leaves the client thread hanging forever. In CI, the symptom is that t9210 (which is testing scalar, which always enables fsmonitor under the hood) may hang forever in "scalar clone". It is waiting on "git fetch" which is waiting on the fsmonitor daemon. The race happens more frequently under load, but you can trigger it predictably with a sleep like this, which delays the start of the fs event thread: --- a/compat/fsmonitor/fsm-listen-darwin.c +++ b/compat/fsmonitor/fsm-listen-darwin.c @@ -510,6 +510,7 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state) FSEventStreamSetDispatchQueue(data->stream, data->dq); data->stream_scheduled = 1; + sleep(1); if (!FSEventStreamStart(data->stream)) { error(_("Failed to start the FSEventStream")); goto force_error_stop_without_loop; One solution might be to reverse the order of initialization: start the fs event thread before we start the thread listening for clients. But the fsmonitor code explicitly does it in the opposite direction. The fs event thread wants to refer to the ipc_server_data struct, so we need it to be initialized first. A further complication is that we need a signal from the fs event thread that it is actually ready and listening. And those details happen within backend-specific fsmonitor code, whereas the initialization is in the shared code. So instead, let's use the ipc_server init/start split added in the previous commit. The generic fsmonitor code will init the ipc_server but _not_ start it, leaving that to the backend specific code, which now needs to call ipc_server_start_async() at the right time. For macOS, that is right after we start the FSEventStream that you can see in the diff above. It's not clear to me if Windows suffers from the same problem (and we simply don't trigger it in CI), or if it is immune. Regardless, the obvious place to start accepting clients there is right after we've established the ReadDirectoryChanges watch. This makes the hangs go away in our macOS CI environment, even when compiled with the sleep() above. Helped-by: Koji Nakamaru Signed-off-by: Jeff King Acked-by: Koji Nakamaru Signed-off-by: Junio C Hamano --- builtin/fsmonitor--daemon.c | 2 -- compat/fsmonitor/fsm-listen-darwin.c | 6 ++++++ compat/fsmonitor/fsm-listen-win32.c | 6 ++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c index 0c54cf13715765..97421d73a200d2 100644 --- a/builtin/fsmonitor--daemon.c +++ b/builtin/fsmonitor--daemon.c @@ -1215,8 +1215,6 @@ static int fsmonitor_run_daemon_1(struct fsmonitor_daemon_state *state) _("could not start IPC thread pool on '%s'"), state->path_ipc.buf); - ipc_server_start_async(&state->ipc_server_data); - /* * Start the fsmonitor listener thread to collect filesystem * events. diff --git a/compat/fsmonitor/fsm-listen-darwin.c b/compat/fsmonitor/fsm-listen-darwin.c index 2fc67442eb5e87..dfa551459d2d1b 100644 --- a/compat/fsmonitor/fsm-listen-darwin.c +++ b/compat/fsmonitor/fsm-listen-darwin.c @@ -516,6 +516,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state) } data->stream_started = 1; + /* + * Our fs event listener is now running, so it's safe to start + * serving client requests. + */ + ipc_server_start_async(state->ipc_server_data); + pthread_mutex_lock(&data->dq_lock); pthread_cond_wait(&data->dq_finished, &data->dq_lock); pthread_mutex_unlock(&data->dq_lock); diff --git a/compat/fsmonitor/fsm-listen-win32.c b/compat/fsmonitor/fsm-listen-win32.c index 5a21dade7b8659..80e092b511c973 100644 --- a/compat/fsmonitor/fsm-listen-win32.c +++ b/compat/fsmonitor/fsm-listen-win32.c @@ -741,6 +741,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state) start_rdcw_watch(data->watch_gitdir) == -1) goto force_error_stop; + /* + * Now that we've established the rdcw watches, we can start + * serving clients. + */ + ipc_server_start_async(state->ipc_server_data); + for (;;) { dwWait = WaitForMultipleObjects(data->nr_listener_handles, data->hListener,