Alexander Belopolsky edited this page Oct 1, 2017 · 14 revisions

Async Notes

Libuv sample code

The libuv source distribution includes a test, test-embed.c, that demonstrates how to embed a libuv event loop in another loop.

Overview

The sample app has the following components:

  1. An external loop;
  2. An embed_timer scheduled on the default loop;
  3. The embed thread; and
  4. The embed_async task.

The program sets up its components and then enters the external loop:

    uv_run(&external, UV_RUN_DEFAULT);

At the same time, the embed thread obtains the backend fd of the default loop

    fd = uv_backend_fd(uv_default_loop());

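and goes to sleep waiting for events to appear on fd (shown here with epoll_wait, as on Linux; on kqueue systems such as macOS, where the traces below were captured, the test calls kevent instead):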

    r = epoll_wait(fd, &ev, 1, timeout);

Once an event is received, the embed thread wakes up the external loop by calling uv_async_send on embed_async, then waits for embed_cb to post the embed_sem semaphore.

    uv_async_send(&embed_async);
    uv_sem_wait(&embed_sem);

Before raising the semaphore, the embed_async task processes events on the default loop:

static void embed_cb(uv_async_t* async) {
  uv_run(uv_default_loop(), UV_RUN_ONCE);
  uv_sem_post(&embed_sem);
}
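
The handshake described above can be sketched without libuv. This is only a minimal model, assuming a POSIX system: plain pipes, poll, a pthread and a sem_t are swapped in for the libuv loops, and every name in it (inner_pipe, wake_pipe, done_sem, watcher, run_handshake) is hypothetical rather than taken from test-embed.c.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

/* Hypothetical stand-ins for the pieces of test-embed.c. */
static int inner_pipe[2]; /* read end plays the role of the backend fd */
static int wake_pipe[2];  /* plays the role of embed_async */
static sem_t done_sem;    /* plays the role of embed_sem */

/* Like the embed thread: sleep on the inner fd, wake the outer side,
   then wait until the outer side has processed the event. */
static void* watcher(void* arg) {
  struct pollfd p;
  char token = 'w';
  ssize_t n;
  int r;
  (void) arg;
  p.fd = inner_pipe[0];
  p.events = POLLIN;
  do {
    r = poll(&p, 1, -1);
  } while (r == -1 && errno == EINTR);
  n = write(wake_pipe[1], &token, 1); /* cf. uv_async_send */
  assert(n == 1);
  r = sem_wait(&done_sem);            /* cf. uv_sem_wait */
  assert(r == 0);
  return NULL;
}

/* Runs one round of the handshake; returns the number of events handled. */
static int run_handshake(void) {
  pthread_t thread;
  char c;
  int handled = 0;
  int r;
  ssize_t n;

  r = pipe(inner_pipe);
  assert(r == 0);
  r = pipe(wake_pipe);
  assert(r == 0);
  r = sem_init(&done_sem, 0, 0);
  assert(r == 0);
  r = pthread_create(&thread, NULL, watcher, NULL);
  assert(r == 0);

  n = write(inner_pipe[1], "e", 1); /* an event appears on the inner fd */
  assert(n == 1);
  n = read(wake_pipe[0], &c, 1);    /* outer side blocks until it is woken */
  assert(n == 1);
  n = read(inner_pipe[0], &c, 1);   /* process it, cf. uv_run(..., UV_RUN_ONCE) */
  assert(n == 1);
  handled++;
  r = sem_post(&done_sem);          /* cf. uv_sem_post in embed_cb */
  assert(r == 0);

  r = pthread_join(thread, NULL);
  assert(r == 0);
  sem_destroy(&done_sem);
  close(inner_pipe[0]);
  close(inner_pipe[1]);
  close(wake_pipe[0]);
  close(wake_pipe[1]);
  return handled;
}
```

Calling run_handshake() from main performs one round. The semaphore is what keeps the watcher from polling the inner fd again before the outer side has drained it, which appears to be the purpose of embed_sem in the test as well.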

How is an event generated for the backend fd?

Let's run the embed test code with a breakpoint in kevent. The first break occurs in both threads:

* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 13.1
    frame #0: 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent
libsystem_kernel.dylib`kevent:
->  0x7fff94d9dd8c <+0>:  movl   $0x200016b, %eax          ; imm = 0x200016B
    0x7fff94d9dd91 <+5>:  movq   %rcx, %r10
    0x7fff94d9dd94 <+8>:  syscall
    0x7fff94d9dd96 <+10>: jae    0x7fff94d9dda0            ; <+20>
  thread #2, stop reason = breakpoint 13.1
    frame #0: 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent
libsystem_kernel.dylib`kevent:
->  0x7fff94d9dd8c <+0>:  movl   $0x200016b, %eax          ; imm = 0x200016B
    0x7fff94d9dd91 <+5>:  movq   %rcx, %r10
    0x7fff94d9dd94 <+8>:  syscall
    0x7fff94d9dd96 <+10>: jae    0x7fff94d9dda0            ; <+20>
Target 0: (run-tests) stopped.

Main thread

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 13.1
  * frame #0: 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent
    frame #1: 0x00000001005728eb libuv.1.dylib`uv__io_poll(loop=0x00007fff5fbff1b8, timeout=-1) at kqueue.c:215
    frame #2: 0x000000010055429f libuv.1.dylib`uv_run(loop=0x00007fff5fbff1b8, mode=UV_RUN_DEFAULT) at core.c:368
    frame #3: 0x000000010000b3bb run-tests`run_test_embed at test-embed.c:130
    frame #4: 0x0000000100004414 run-tests`run_test_part(test="embed", part="embed") at runner.c:355
    frame #5: 0x0000000100002b3a run-tests`main(argc=3, argv=0x00000001007001e0) at run-tests.c:59

We are inside uv_run of the external loop, which is polling a different fd from the one that the embed thread is waiting on.

Embed thread

* thread #2, stop reason = breakpoint 13.1
  * frame #0: 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent
    frame #1: 0x000000010000b575 run-tests`embed_thread_runner(arg=0x0000000000000000) at test-embed.c:80

The second kevent

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 13.1
    frame #0: 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent
  * frame #1: 0x00000001005728eb libuv.1.dylib`uv__io_poll(loop=0x00000001005777e0, timeout=0) at kqueue.c:215
    frame #2: 0x000000010055429f libuv.1.dylib`uv_run(loop=0x00000001005777e0, mode=UV_RUN_ONCE) at core.c:368
    frame #3: 0x000000010000b47e run-tests`embed_cb(async=0x000000010009c708) at test-embed.c:95
    frame #4: 0x0000000100553bc2 libuv.1.dylib`uv__async_io(loop=0x00007fff5fbff1b8, w=0x00007fff5fbff428, events=1) at async.c:118
    frame #5: 0x00000001005730fc libuv.1.dylib`uv__io_poll(loop=0x00007fff5fbff1b8, timeout=-1) at kqueue.c:344
    frame #6: 0x000000010055429f libuv.1.dylib`uv_run(loop=0x00007fff5fbff1b8, mode=UV_RUN_DEFAULT) at core.c:368
    frame #7: 0x000000010000b3bb run-tests`run_test_embed at test-embed.c:130
    frame #8: 0x0000000100004414 run-tests`run_test_part(test="embed", part="embed") at runner.c:355
    frame #9: 0x0000000100002b3a run-tests`main(argc=3, argv=0x00000001007001e0) at run-tests.c:59

The embed thread, meanwhile, is waiting on the semaphore:

(lldb) thread list
Process 4928 stopped
* thread #1: tid = 0x113773, 0x00007fff94d9dd8c libsystem_kernel.dylib`kevent, queue = 'com.apple.main-thread', stop reason = breakpoint 13.1
  thread #2: tid = 0x11378f, 0x00007fff94d95386 libsystem_kernel.dylib`semaphore_wait_trap + 10
(lldb) up
frame #2: 0x000000010000b5bd run-tests`embed_thread_runner(arg=0x0000000000000000) at test-embed.c:89
   86  	#endif
   87  	    } while (r == -1 && errno == EINTR);
   88  	    uv_async_send(&embed_async);
-> 89  	    uv_sem_wait(&embed_sem);
   90  	  }
   91  	}
   92

Debug

Listing handles in Julia

julia> ccall(:uv_print_all_handles, Void, (Ptr{Void}, Ptr{Void}), Base.eventloop(), ccall(:Q_stderr, Ptr{Void}, ()))
[--I] signal   0x1053b0830
[-AI] async    0x1053b05d8
[RA-] async    0x1052ff390
[R--] tty      0x11050b880
[R--] tty      0x100200360
[RA-] tty      0x100205350

The format is [flags] handle-type handle-address. For flags:

  • R is printed for a handle that is referenced
  • A is printed for a handle that is active
  • I is printed for a handle that is internal
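
The flag column can be reproduced with a few lines of C. This is a minimal sketch of the format only: format_handle_flags is a hypothetical helper, not a libuv function, and it takes the three properties as plain ints instead of reading them from a real handle.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: renders the flags in the order R, A, I,
   printing '-' for each flag that is not set. The output buffer
   holds "[xxx]" plus the terminating NUL, hence 6 bytes. */
static void format_handle_flags(int referenced, int active, int internal,
                                char out[6]) {
  snprintf(out, 6, "[%c%c%c]",
           referenced ? 'R' : '-',
           active ? 'A' : '-',
           internal ? 'I' : '-');
}
```

For example, the signal handle in the listing above, which is internal but neither referenced nor active, corresponds to format_handle_flags(0, 0, 1, buf), which fills buf with "[--I]".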

What is @async

The @async expr macro expands into

    thunk = ()->expr
    t = Task(thunk)
    sync_add(t)
    enq_work(t)
    t

Here sync_add is often a no-op and enq_work is the same as schedule.

The schedule method

The schedule method is defined as follows:

function enq_work(t::Task)
    t.state == :runnable || error("schedule: Task not runnable")
    ccall(:uv_stop, Void, (Ptr{Void},), eventloop())
    push!(Workqueue, t)
    t.state = :queued
    return t
end

schedule(t::Task) = enq_work(t)

In other words, it calls uv_stop on the default loop, pushes the given task onto the Workqueue, and changes its state from :runnable to :queued.
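
The same state transition can be modelled in C, the language of the libuv samples above. This is a rough sketch and not Julia's implementation: the types, the fixed-capacity queue, and the stop_requested flag (which stands in for the uv_stop call) are all hypothetical, and an error is reported with a return code where Julia raises an exception.

```c
#include <assert.h>
#include <stddef.h>

enum task_state { TASK_RUNNABLE, TASK_QUEUED };

struct task {
  enum task_state state;
};

#define QUEUE_CAP 16

struct workqueue {
  struct task* items[QUEUE_CAP];
  size_t len;
  int stop_requested; /* stands in for uv_stop on the event loop */
};

/* Mirrors the three steps of enq_work: request a loop stop, push the
   task, mark it queued. Returns 0 on success, or -1 if the task is
   not runnable or the queue is full. */
static int enq_work(struct workqueue* q, struct task* t) {
  if (t->state != TASK_RUNNABLE || q->len == QUEUE_CAP)
    return -1;
  q->stop_requested = 1;
  q->items[q->len++] = t;
  t->state = TASK_QUEUED;
  return 0;
}
```

Because the state check comes first, enqueueing the same task twice fails the second time, which matches the "Task not runnable" error in the Julia code.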