Skip to content

Commit

Permalink
zig threading typos
Browse files Browse the repository at this point in the history
  • Loading branch information
NoelM authored Sep 28, 2023
1 parent 71ba487 commit f112f40
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions content/posts/zig_threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ draft = false
## Prequel

With the release of Zig `0.11.0`, the language introduced [channels](https://ziglang.org/documentation/0.11.0/std/#A;std:event.Channel)
like in Go. However, in the same release the announced that the support of
like in Go. However, in the same release they announced that the support of
[async function is postponed](https://ziglang.org/documentation/0.11.0/#Async-Functions), and the related
[Github issue](https://github.com/ziglang/zig/issues/6025). Finally, the async support is expected for the release `0.13.0`.

Expand All @@ -21,26 +21,26 @@ Thus, I switched to the good old solution of _thread pools_.

## Introduction

I previously benchmarked the [Zig versus C](https://noelmrtn.fr/posts/zig_and_c/) using a prime-number algorithm. The implementation
was single threaded, so it sounds fun to port the algorithm to a multi-threaded implementation.
Using a prime-number algorithm, I previously benchmarked the [Zig versus C](https://noelmrtn.fr/posts/zig_and_c/).
It was single-threaded, so porting the algorithm to a multi-threaded implementation sounds fun.

The standard library of Zig has a type `std.Thread` that includes the basics of threading: `spawn`, `join`, `Mutex`... But also some
more advanced like [thread pool](https://ziglang.org/documentation/0.11.0/std/#A;std:Thread.Pool) (not documented yet).

The implementation consists in a pool of threads waiting for tasks to be executed. In our case the task is testing if a integer is
prime or not. I will present two possible implementations: one with manually spawned threads, the other one with the help
The implementation consists of a pool of threads waiting for a task to be executed. In our case, a task tests whether an integer is
prime or not. I will present two possible implementations: one with manually spawned threads and the other one with the help
of `std.Thread.Pool`.

The full code is published on [Github](https://github.com/NoelM/zig-playground/tree/main/prime_numbers_parallel).
The complete code is published on [Github](https://github.com/NoelM/zig-playground/tree/main/prime_numbers_parallel).

## Thread-safe Structures

### Allocator

In Zig there is no hidden allocation. So, every memory allocation is performed by an allocator of type `std.mem.Allocator`. This is
In Zig, there is no hidden allocation. So, every memory allocation is performed by an allocator of type `std.mem.Allocator`. It is
similar to the `malloc` function in `C` with more options: heap, fixed buffer, arena...

By default the allocators are single threaded and not suitable for concurrency, here is an example of thread-safe allocator:
By default, the allocators are single-threaded and not suitable for concurrency. Here is an example of thread-safe allocator:

```zig
var single_threaded_arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
Expand All @@ -54,8 +54,8 @@ const arena = thread_safe_arena.allocator();

### Queues

The standard library (_stdlib_) provides a type `std.atomic.Queue(T: type)`. It allows multiple producers and readers accessing to
it concurrently, with functions: `put`, `get`, `remove`... Every item of the queue is of type `Node` as declared as:
The standard library (_stdlib_) provides a type `std.atomic.Queue(T: type)`. It allows multiple producers and readers to access
it concurrently, with functions: `put`, `get`, `remove`... Every item of the queue is of type `Node`:

```zig
pub const Node = struct {
Expand All @@ -65,8 +65,8 @@ pub const Node = struct {
}
```

If a pointer is prefixed with `?` it is called [_optional pointer_](https://ziglang.org/documentation/0.11.0/#Optional-Pointers),
and this makes it nullable (by default a pointer in Zig cannot be `null`).
A `?`-prefixed pointer is a so-called [_optional pointer_](https://ziglang.org/documentation/0.11.0/#Optional-Pointers),
and it allows its memory address to be null (by default, a pointer in Zig is not nullable).

To deal with an optional pointer `?*T` you must check if it points to a memory location. An option is directly within an `if`:

Expand All @@ -79,15 +79,15 @@ if (optional_ptr) |ptr| {
}
```

Or directly when accessing to the value:
Or directly when accessing the value:

```zig
const val = optional_ptr.?.*;
```

#### Initialize and Fill Queues

This is pretty easy to setup a thread-safe queue, first of all initialize it. Then allocate nodes with the previously defined
It is pretty easy to set up a thread-safe queue. First of all, initialize it. Then, allocate nodes with the previously defined
thread-safe allocator.

```zig
Expand All @@ -108,8 +108,8 @@ As one can see, we let the values `.prev` and `.next` to `undefined`, then the f

#### Get Nodes from Queue

The elements in a `Queue` can be safely accessed from any thread of the program with the function `get`. It returns
an optional node pointer, because the queue could be empty. When getting a not, test the returned value:
The elements in a `Queue` can be safely accessed from any program thread with the function `get`. It returns
an optional node pointer because the queue might be empty. So, test the value of the returned `Node`:

```zig
if (int_to_test.get()) |node| {
Expand All @@ -122,12 +122,12 @@ if (int_to_test.get()) |node| {

## Thread Pool

A thread pool is designed to optimize the number of running threads. Instead of popping a new thread for each task, it maintains
A thread pool aims to optimize the number of running threads. Instead of popping a new thread for each task, it maintains
a pool of threads waiting for tasks without being stopped.

### Manual

First, I create a slice (because it size cannot be evaluated at compilation) of `std.Thread`.
First, I create a slice (a slice and not an array because its size cannot be evaluated at compilation) of `std.Thread`.

```zig
var pool = try arena.alloc(std.Thread, cpu_count);
Expand All @@ -138,7 +138,7 @@ for (pool) |*thread| {
}
```

Each thread runs a function `isPrimeRoutine`. The function reads a queue containing values to be tested, until the queue is empty.
Each thread runs a function `isPrimeRoutine`. The function reads a queue, tests the returned values, and returns when the queue is empty.

```zig
pub fn isPrimeRoutine(int_to_test: *U64Queue, int_prime: *U64Queue) void {
Expand Down Expand Up @@ -168,7 +168,7 @@ pub fn isPrimeRoutine(int_to_test: *U64Queue, int_prime: *U64Queue) void {
}
```

This example shows an interesting behavior of queues, as `int_to_test.get()` returns a pointer, that is not deallocated.
This example shows an interesting behavior of queues, as `int_to_test.get()` returns a pointer that is not deallocated.
So, you can reuse it as a new node of `is_prime` thanks to reset `.prev` and `.next`.

Finally, we use the classical `.join()` waiting for threads to end.
Expand All @@ -180,15 +180,15 @@ for (pool) |thread| {
```

{{<hint>}}
Previously, I presented the thread-safe allocator with the name `arena`. In this implementation, a thread-safe
allocator is not required.
Previously, I presented an `arena` thread-safe allocator. The current implementation does not require a thread-safe
allocator anymore.
{{</hint>}}

`prime_manual.zig`: [Github](https://github.com/NoelM/zig-playground/blob/main/prime_numbers_parallel/prime_manual.zig)

### Standard Library

Now I use the type `std.Thread.Pool` from the _stdlib_, as shown previously it maintains a pool of threads. But instead of
The type `std.Thread.Pool` from the _stdlib_ maintains a pool of threads. But instead of
putting integer to a queue, we spawn tasks (calls to a function) queued in a `RunQueue`.

```zig
Expand All @@ -203,13 +203,14 @@ defer thread_pool.deinit();
```

{{<hint>}}
Do not forget that the `try` in front of `thread_pool.init()` catches the function's error and returns it.
Whenever a function returns an error, you must [catch it](https://ziglearn.org/chapter-1/#errors). They
are denoted by a `!` prefixing the returned type.
Do not forget that the `try` in front of `thread_pool.init()` catches the function's error.
Whenever a function returns an error, you must [catch it](https://ziglearn.org/chapter-1/#errors). Those functions
return `!`-prefixed values.
{{</hint>}}

Here, instead of spawning a task per integer, I spawn them in batches. Actually, testing a single integer is a too small task
compared to the effort required to spawn a task.
Instead of spawning a task per integer, I spawn them in batches. Testing a single integer is too small
compared to the effort required to spawn a task. Otherwise, it would spoil more computation power into spawning a task
than testing the integer itself.

```zig
const value_max: u64 = 1000000;
Expand All @@ -222,12 +223,12 @@ while (value < value_max) : (value += shard_size) {
```

Each task will verify from `[value ; value + shard_size[` all the possible prime numbers. In contrast with
the previous example, here one requires a thread-safe allocator to create new nodes.
the previous example, now it requires a thread-safe allocator to create new nodes.

#### Wait Group

The thread pool does not have a public `join` member, but it gets a `waitAndWork` instead that requires a
wait group (the _stdlib_ got one). This sounds odd, but mind to `reset` the wait group before using it.
The thread pool does not have a public `join` member but gets a `waitAndWork` instead, which requires a
wait group (the _stdlib_ got one). It may sound odd, but mind to `reset` the wait group before using it.
Because it contains an `std.Thread.ResetEvent` not initialized by default.

```zig
Expand Down

0 comments on commit f112f40

Please sign in to comment.