Skip to content

Commit d90d9f6

Browse files
committed
Implement asyncio.queues (FIFO and LIFO Queues)
1 parent 8f869a1 commit d90d9f6

File tree

6 files changed

+610
-1
lines changed

6 files changed

+610
-1
lines changed

docgen.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,8 @@ local list = {
331331
"futures.lua",
332332
"tasks.lua",
333333
"timer_list.lua",
334-
"synchronization.lua"
334+
"synchronization.lua",
335+
"queues.lua"
335336
}
336337
for file = 1, #list do
337338
local f = io.open(list[file], 'r')

docs/Queues.md

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
# Methods
2+
>### Queue.new ( loop, maxsize, obj )
3+
>| Parameter | Type | Required | Description |
4+
>| :-: | :-: | :-: | - |
5+
>| loop | `EventLoop` || The EventLoop the Queue belongs to |
6+
>| maxsize | `int` || The queue max size. <sub>(default = 0)</sub> |
7+
>| obj | `table` || The table to turn into a Queue. |
8+
>
9+
>Creates a new instance of Queue<br>
10+
>This is a FIFO Queue (first in, first out). If maxsize is less than or equal to zero, the queue size is infinite.<br>
11+
>![/!\\](https://i.imgur.com/HQ188PK.png) Queue.size might be an approximated value some times (on purpose for internal reasons). Use Queue.real_size if you want to get the real queue size.
12+
>
13+
>**Returns**:
14+
>
15+
>| Type | Description |
16+
>| :-: | - |
17+
>| `Queue` | The Queue object |
18+
>
19+
>**Table structure**:
20+
>```Lua
21+
>{
22+
> loop = loop, -- The EventLoop the Queue belongs to
23+
> maxsize = maxsize, -- The queue max size
24+
> waiting_free = {}, -- The sleeping tasks that are waiting for a free spot in the queue
25+
> waiting_free_append = 0, -- The "waiting_free append pointer"
26+
> waiting_free_give = 0, -- The "waiting_free give pointer"
27+
> waiting_item = {}, -- The sleeping tasks that are waiting for an item in the queue
28+
> waiting_item_append = 0, -- The "waiting_item append pointer"
29+
> waiting_item_give = 0, -- The "waiting_item give pointer"
30+
> size = 0, -- The queue approximated size (±1)
31+
> real_size = 0 -- The queue real size
32+
>}
33+
>```
34+
---
35+
>### Queue:full ( )
36+
>
37+
>Checks if the queue is full or not
38+
>
39+
>**Returns**:
40+
>
41+
>| Type | Description |
42+
>| :-: | - |
43+
>| `boolean` | Whether the queue is full or not |
44+
>
45+
---
46+
>### Queue:empty ( )
47+
>
48+
>Checks if the queue is empty or not
49+
>
50+
>**Returns**:
51+
>
52+
>| Type | Description |
53+
>| :-: | - |
54+
>| `boolean` | Whether the queue is empty or not |
55+
>
56+
---
57+
>### Queue:trigger_add ( )
58+
>
59+
>Wakes up a Queue:get task that is waiting for an item to be added.<br>
60+
>![/!\\](https://i.imgur.com/HQ188PK.png) This method should never be called by the user code.
61+
>
62+
>**Returns**:
63+
>
64+
>| Type | Description |
65+
>| :-: | - |
66+
>| `boolean` | Whether a task was triggered or not |
67+
>
68+
---
69+
>### Queue:trigger_remove ( )
70+
>
71+
>Wakes up a Queue:add task that is waiting for an item to be removed.<br>
72+
>![/!\\](https://i.imgur.com/HQ188PK.png) This method should never be called by the user code.
73+
>
74+
>**Returns**:
75+
>
76+
>| Type | Description |
77+
>| :-: | - |
78+
>| `boolean` | Whether a task was triggered or not |
79+
>
80+
---
81+
>### Queue:add_nowait ( item, safe )
82+
>| Parameter | Type | Required | Description |
83+
>| :-: | :-: | :-: | - |
84+
>| item | `table` | ✔ | The item to add |
85+
>| safe | `boolean` | ✕ | Whether to cancel throwing an error if the queue is full <sub>(default = false)</sub> |
86+
>
87+
>Adds an item to the queue without blocking.
88+
>
89+
>**Returns**:
90+
>
91+
>| Type | Description |
92+
>| :-: | - |
93+
>| `boolean` | Whether the item was added or not (if safe is false, this will always be true) |
94+
>
95+
---
96+
>### Queue.add ( self, item )
97+
>| Parameter | Type | Required | Description |
98+
>| :-: | :-: | :-: | - |
99+
>| item | `table` | ✔ | The item to add |
100+
>
101+
>Returns a task that, when awaited, will try to add the item to the queue, and if it cant, it will block until it can.
102+
>
103+
>**Returns**:
104+
>
105+
>| Type | Description |
106+
>| :-: | - |
107+
>| `Task` | The task. |
108+
>
109+
---
110+
>### Queue:get_nowait ( safe )
111+
>| Parameter | Type | Required | Description |
112+
>| :-: | :-: | :-: | - |
113+
>| safe | `boolean` | ✕ | Whether to cancel throwing an error if the queue is empty <sub>(default = false)</sub> |
114+
>
115+
>Gets an item from the queue without blocking.
116+
>
117+
>**Returns**:
118+
>
119+
>| Type | Description |
120+
>| :-: | - |
121+
>| `boolean`, `table` | `false` if the queue is empty and `safe` is `false`, the item (`table`) otherwise. |
122+
>
123+
---
124+
>### Queue.get ( self )
125+
>
126+
>Returns a task that, when awaited, will try to get an item from the queue, and if it cant, it will block until it can.<br>
127+
>The task always returns a `table`, which is the item.
128+
>
129+
>**Returns**:
130+
>
131+
>| Type | Description |
132+
>| :-: | - |
133+
>| `Task` | The task |
134+
>
135+
---
136+
>### LifoQueue.new ( loop, maxsize, obj )
137+
>| Parameter | Type | Required | Description |
138+
>| :-: | :-: | :-: | - |
139+
>| loop | `EventLoop` | ✔ | The EventLoop the Queue belongs to |
140+
>| maxsize | `int` | ✕ | The queue max size. <sub>(default = 0)</sub> |
141+
>| obj | `table` | ✕ | The table to turn into a Queue. |
142+
>
143+
>Creates a new instance of LifoQueue (which inherits from Queue)<br>
144+
>This is a LIFO Queue (last in, first out). If maxsize is less than or equal to zero, the queue size is infinite.<br>
145+
>![/!\\](https://i.imgur.com/HQ188PK.png) Queue.size might be an approximated value some times (on purpose for internal reasons). Use Queue.real_size if you want to get the real queue size.
146+
>
147+
>**Returns**:
148+
>
149+
>| Type | Description |
150+
>| :-: | - |
151+
>| `Queue` | The Queue object |
152+
>
153+
>**Table structure**:
154+
>```Lua
155+
>{
156+
> loop = loop, -- The EventLoop the Queue belongs to
157+
> maxsize = maxsize, -- The queue max size
158+
> waiting_free = {}, -- The sleeping tasks that are waiting for a free spot in the queue
159+
> waiting_free_append = 0, -- The "waiting_free append pointer"
160+
> waiting_free_give = 0, -- The "waiting_free give pointer"
161+
> waiting_item = {}, -- The sleeping tasks that are waiting for an item in the queue
162+
> waiting_item_append = 0, -- The "waiting_item append pointer"
163+
> waiting_item_give = 0, -- The "waiting_item give pointer"
164+
> size = 0, -- The queue approximated size (±1)
165+
> real_size = 0 -- The queue real size
166+
>}
167+
>```
168+
---
169+
>### LifoQueue:add_nowait ( item, safe )
170+
>| Parameter | Type | Required | Description |
171+
>| :-: | :-: | :-: | - |
172+
>| item | `QueueItem` | ✔ | The item to add |
173+
>| safe | `boolean` | ✕ | Whether to cancel throwing an error if the queue is full <sub>(default = false)</sub> |
174+
>
175+
>Adds an item to the queue without blocking.
176+
>
177+
>**Returns**:
178+
>
179+
>| Type | Description |
180+
>| :-: | - |
181+
>| `boolean` | Whether the item was added or not (if safe is false, this will always be true) |
182+
>
183+
---
184+
>### LifoQueue.add ( self, item )
185+
>| Parameter | Type | Required | Description |
186+
>| :-: | :-: | :-: | - |
187+
>| item | `QueueItem` | ✔ | The item to add |
188+
>
189+
>Returns a task that, when awaited, will try to add the item to the queue, and if it cant, it will block until then.
190+
>
191+
>**Returns**:
192+
>
193+
>| Type | Description |
194+
>| :-: | - |
195+
>| `Task` | The task. |
196+
>

docs/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- [Event_loop](Event_loop.md)
66
- [Futures](Futures.md)
7+
- [Queues](Queues.md)
78
- [Synchronization](Synchronization.md)
89
- [Tasks](Tasks.md)
910
- [Timer_list](Timer_list.md)
@@ -47,6 +48,19 @@
4748
- [FutureSemaphore.new](Futures.md#futuresemaphorenew--loop-quantity-obj-)
4849
- [FutureSemaphore:set_error](Futures.md#futuresemaphoreset_error--result-safe-index-)
4950
- [FutureSemaphore:set_result](Futures.md#futuresemaphoreset_result--result-safe-index-)
51+
- [Queues](Queues.md)
52+
- [LifoQueue.add](Queues.md#lifoqueueadd--self-item-)
53+
- [LifoQueue.new](Queues.md#lifoqueuenew--loop-maxsize-obj-)
54+
- [LifoQueue:add_nowait](Queues.md#lifoqueueadd_nowait--item-safe-)
55+
- [Queue.add](Queues.md#queueadd--self-item-)
56+
- [Queue.get](Queues.md#queueget--self-)
57+
- [Queue.new](Queues.md#queuenew--loop-maxsize-obj-)
58+
- [Queue:add_nowait](Queues.md#queueadd_nowait--item-safe-)
59+
- [Queue:empty](Queues.md#queueempty---)
60+
- [Queue:full](Queues.md#queuefull---)
61+
- [Queue:get_nowait](Queues.md#queueget_nowait--safe-)
62+
- [Queue:trigger_add](Queues.md#queuetrigger_add---)
63+
- [Queue:trigger_remove](Queues.md#queuetrigger_remove---)
5064
- [Synchronization](Synchronization.md)
5165
- [Event.new](Synchronization.md#eventnew--loop-obj-)
5266
- [Event.wait](Synchronization.md#eventwait--self-)

examples/queues.lua

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
local asyncio = require "lua-asyncio"
2+
local async = asyncio.async
3+
4+
local loop = asyncio.loops.EventLoop.new()
5+
local queue = loop:new_object(asyncio.queues.Queue)
6+
7+
local giver = async(function()
8+
print("Start giving")
9+
for x = 1, 12 do
10+
print("Giving", x)
11+
loop:await(queue:add({val = x}))
12+
end
13+
print("End giving")
14+
end)
15+
16+
local receiver = async(function(id)
17+
print("Starting receiver", id)
18+
19+
while true do
20+
local data = loop:await(queue:get())
21+
22+
print("Receiver", id, data.val)
23+
end
24+
end)
25+
26+
loop:add_task(giver())
27+
loop:add_task(receiver(1))
28+
loop:add_task(receiver(2))
29+
loop:add_task(receiver(3))
30+
31+
while true do
32+
loop:run()
33+
end
34+
35+
-- As you know, EventLoop is not ordered just to make it faster
36+
-- So, in this example, 3rd receiver will receive first, then 2nd and then 1st.
37+
-- If you added more tasks, this order will probably be different and change over time.
38+
-- However, if you need to make it an ordered process, you must use OrderedEventLoop instead.
39+
--[[ OUTPUT OF THE PROGRAM:
40+
Start giving
41+
Giving 1
42+
Starting receiver 1
43+
Starting receiver 2
44+
Starting receiver 3
45+
Giving 2
46+
Receiver 3 1
47+
Giving 3
48+
Receiver 2 2
49+
Giving 4
50+
Receiver 1 3
51+
Giving 5
52+
Receiver 3 4
53+
Giving 6
54+
Receiver 2 5
55+
Giving 7
56+
Receiver 1 6
57+
Giving 8
58+
Receiver 3 7
59+
Giving 9
60+
Receiver 2 8
61+
Giving 10
62+
Receiver 1 9
63+
Giving 11
64+
Receiver 3 10
65+
Giving 12
66+
Receiver 2 11
67+
End giving
68+
Receiver 1 12
69+
]]

init.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ local futures = require "lua-asyncio/futures"
33
local tasks = require "lua-asyncio/tasks"
44
local timers = require "lua-asyncio/timer_list"
55
local sync = require "lua-asyncio/synchronization"
6+
local queues = require "lua-asyncio/queues"
67

78
return {
89
loops = loops,
910
futures = futures,
1011
sync = sync,
12+
queues = queues,
1113
Task = tasks.Task,
1214
async = tasks.async,
1315
TimerList = timers.TimerList,

0 commit comments

Comments
 (0)