From 030e1fd7409afbb7f49507eacf29ea6aaa095390 Mon Sep 17 00:00:00 2001 From: g41797 Date: Sat, 21 Sep 2024 09:55:58 +0300 Subject: [PATCH] Improve description --- README.md | 124 ++++++++++++++++++++++++++++-------------------- src/mailbox.zig | 47 ++++++++---------- 2 files changed, 93 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index a31d9a2..5edb75e 100644 --- a/README.md +++ b/README.md @@ -4,34 +4,19 @@ [![CI](https://github.com/g41797/yazq/actions/workflows/ci.yml/badge.svg)](https://github.com/g41797/yazq/actions/workflows/ci.yml) -> Mailboxes are one of the fundamental parts of the [actor model originated in **1973**](https://en.wikipedia.org/wiki/Actor_model). -> -> Through the mailbox mechanism, actors can decouple the reception of a message from its elaboration. -> +## A bit of history, a bit of theory + +Mailboxes are one of the fundamental parts of the [actor model originated in **1973**](https://en.wikipedia.org/wiki/Actor_model): > An actor is an object that carries out its actions in response to communications it receives. -> +> Through the mailbox mechanism, actors can decouple the reception of a message from its elaboration. > A mailbox is nothing more than the data structure (FIFO) that holds messages. -> -> If you send 3 messages to the same actor, it will just execute one at a time. - -Useful links for interested: -- [Huan Mailbox](https://github.com/huan/mailbox) -- [Typed Mailboxes in Scala](https://www.baeldung.com/scala/typed-mailboxes) -- [Actors have mailboxes](https://www.brianstorti.com/the-actor-model/) - I first encountered MailBox in the late 80s while working om a real-time system: - -> "A **mailbox** is one of two types of objects that can be used for intertask +> "A **mailbox** is object that can be used for inter-task communication. When task A wants to send an object to task B, task A must send the object to the mailbox, and task B must visit the mailbox, -where, if an object isn't there, it has the option of waiting for any -desired length of time. Sending an object in this manner can achieve -various purposes. The object might be a segment that contains data -needed by the waiting task. On the other hand, the segment might be -blank, and sending it might constitute a signal to the waiting task. -Another reason to send an object might be to point out the object to the -receiving task." +where, if an object isn't there, it has the option of *waiting for any +desired length of time*..." > **iRMX 86™ NUCLEUS REFERENCE MANUAL** _Copyright @ 1980, 1981 Intel Corporation. Since than I have used it in: @@ -40,18 +25,28 @@ Since than I have used it in: - Windows - *C++/C#* - Linux - *Golang* -**Now it's time for Zig** +**Now it's Zig time** + +## Why? +If your thread runs in "Fire and Forget" mode, you don't need Mailbox. +But in the real multithreaded application, threads communicate with each other as +members of work team. -## Example of Echo actor(Thread + Mailbox) +**Mailbox** provides convenient and simple communication mechanism. +Just try: +- without it +- with it +## Example of usage - 'Echo' ```zig // Mbx is Mailbox with usize letter(data) const Mbx = MailBox(usize); - // Echo actor - runs on own thread + // Echo - runs on own thread + // It has two mailboxes + // "TO" and "FROM" - from the client point of the view // Receives letter via 'TO' mailbox - // Send letter without change (echo) to "FROM" mailbox - // "TO"/"FROM" - from the client point of the view + // Replies letter without change (echo) to "FROM" mailbox const Echo = struct { const Self = @This(); @@ -60,19 +55,22 @@ Since than I have used it in: thread: Thread = undefined, // Mailboxes creation and start of the thread + // Pay attention, that client code does not use + // any thread "API" - all embedded within Echo pub fn start(echo: *Self) void { - echo.to = Mbx.open(); - echo.from = Mbx.open(); + echo.to = .{}; + echo.from = .{}; echo.thread = std.Thread.spawn(.{}, run, .{echo}) catch unreachable; } - // Thread function + // Echo thread function fn run(echo: *Self) void { // Main loop: while (true) { // Receive - exit from the thread if mailbox was closed const envelope = echo.to.receive(100000000) catch break; - // Send - exit from the thread if mailbox was closed + // Reply to the client + // Exit from the thread if mailbox was closed _ = echo.from.send(envelope) catch break; } } @@ -83,7 +81,7 @@ Since than I have used it in: } // Close mailboxes - // As result Echo actor should stop processing + // As result Echo should stop processing // and exit from the thread. pub fn stop(echo: *Self) !void { _ = try echo.to.close(); @@ -91,14 +89,13 @@ Since than I have used it in: } }; - // Echo "client" code: var echo = try std.testing.allocator.create(Echo); - // Start Echo actor on own thread + // Start Echo(on own thread) echo.start(); defer { - // Wait finish of the thread + // Wait finish of Echo echo.waitFinish(); std.testing.allocator.destroy(echo); } @@ -130,37 +127,60 @@ Since than I have used it in: } } - // Stop Echo actor + // Stop Echo try echo.stop(); -} -// defered: -// Wait finish of the thread -// echo.waitFinish(); -// Free allocated memory: -// std.testing.allocator.destroy(echo); -// std.testing.allocator.destroy(envl); ``` -## Installation +## Boring details -Add dependency to build.zig.zon: -```bash -zig fetch --save-exact https://github.com/g41797/mailbox/archive/master.tar.gz +Mailbox of *[]const u8* 'Letters': +```zig +const Rumors = MailBox([]const u8); +const rmrsMbx : Rumors = .{}; ``` -Add to build.zig: +**Envelope** is wrapper of actual user defined type **Letter**. ```zig -exe.addModule("mailbox", b.dependency("mailbox", .{}).module("mailbox")); + pub const Envelope = struct { + prev: ?*Envelope = null, + next: ?*Envelope = null, + letter: Letter, + }; ``` +In fact Mailbox is queue(FIFO) of Envelope(s). -## Contributing +MailBox supports following operations: +- send *Envelope* to MailBox (*enqueue*) and wakeup waiting receivers +- receive *Envelope* from Mailbox (*dequeue*) with time-out +- close Mailbox: + - disables further operations + - first close returns List of non-processed *Envelope(s)* for free/reuse etc. -Feel free to report bugs and suggest improvements. +Feel free to suggest improvements in doc and code. -## License + +## License [MIT](LICENSE) +## Installation +You finally got to installation: +- add dependency to build.zig.zon +```bash +zig fetch --save-exact https://github.com/g41797/mailbox/archive/master.tar.gz +``` +- add to build.zig: +```zig +exe.addModule("mailbox", b.dependency("mailbox", .{}).module("mailbox")); +``` + +**Stop reading and start playing!** + +## Last warning +First rule of multithreading: +>**If you can do without multithreading - do without.** + + diff --git a/src/mailbox.zig b/src/mailbox.zig index 39f247b..d1c7ee8 100644 --- a/src/mailbox.zig +++ b/src/mailbox.zig @@ -11,7 +11,7 @@ const Mutex = std.Thread.Mutex; const Condition = std.Thread.Condition; const Thread = std.Thread; -pub fn MailBox(comptime T: type) type { +pub fn MailBox(comptime Letter: type) type { return struct { const Self = @This(); @@ -19,23 +19,16 @@ pub fn MailBox(comptime T: type) type { pub const Envelope = struct { prev: ?*Envelope = null, next: ?*Envelope = null, - letter: T, + letter: Letter, }; first: ?*Envelope = null, last: ?*Envelope = null, len: usize = 0, - closed: bool = true, + closed: bool = false, mutex: Mutex = .{}, cond: Condition = .{}, - /// Set mailbox to ready mode - pub fn open() Self { // Add alloc: std.mem.Allocator - return Self{ - .closed = false, - }; - } - /// Append a new Envelope to the tail /// and wake-up waiting on receive threads. /// Arguments: @@ -65,7 +58,6 @@ pub fn MailBox(comptime T: type) type { defer mbox.mutex.unlock(); while (mbox.len == 0) { - if (mbox.closed) { return error.Closed; } @@ -172,7 +164,7 @@ test { //----------------------------- test "basic MailBox test" { const Mbx = MailBox(u32); - var mbox = Mbx.open(); + var mbox: Mbx = .{}; try testing.expectError(error.Timeout, mbox.receive(10)); @@ -208,15 +200,16 @@ test "basic MailBox test" { //----------------------------- //----------------------------- -test "Echo actor based on mailboxes test" { +test "Echo mailboxes test" { // Mbx is Mailbox with usize letter(data) const Mbx = MailBox(usize); - // Echo actor - runs on own thread + // Echo - runs on own thread + // It has two mailboxes + // "TO" and "FROM" - from the client point of the view // Receives letter via 'TO' mailbox - // Send letter without change (echo) to "FROM" mailbox - // "TO"/"FROM" - from the client point of the view + // Replies letter without change (echo) to "FROM" mailbox const Echo = struct { const Self = @This(); @@ -225,19 +218,22 @@ test "Echo actor based on mailboxes test" { thread: Thread = undefined, // Mailboxes creation and start of the thread + // Pay attention, that client code does not use + // any thread "API" - all embedded within Echo pub fn start(echo: *Self) void { - echo.to = Mbx.open(); - echo.from = Mbx.open(); + echo.to = .{}; + echo.from = .{}; echo.thread = std.Thread.spawn(.{}, run, .{echo}) catch unreachable; } - // Thread function + // Echo thread function fn run(echo: *Self) void { // Main loop: while (true) { // Receive - exit from the thread if mailbox was closed const envelope = echo.to.receive(100000000) catch break; - // Send - exit from the thread if mailbox was closed + // Reply to the client + // Exit from the thread if mailbox was closed _ = echo.from.send(envelope) catch break; } } @@ -248,7 +244,7 @@ test "Echo actor based on mailboxes test" { } // Close mailboxes - // As result Echo actor should stop processing + // As result Echo should stop processing // and exit from the thread. pub fn stop(echo: *Self) !void { _ = try echo.to.close(); @@ -256,14 +252,13 @@ test "Echo actor based on mailboxes test" { } }; - var echo = try std.testing.allocator.create(Echo); - // Start Echo actor on own thread + // Start Echo(on own thread) echo.start(); defer { - // Wait finish of the thread + // Wait finish of Echo echo.waitFinish(); std.testing.allocator.destroy(echo); } @@ -295,11 +290,11 @@ test "Echo actor based on mailboxes test" { } } - // Stop Echo actor + // Stop Echo try echo.stop(); } // defered: -// Wait finish of the thread +// Wait finish of Echo // echo.waitFinish(); // Free allocated memory: // std.testing.allocator.destroy(echo);