Skip to content

Commit

Permalink
Skip flakey test, document TCP transport, elixir-format
Browse files Browse the repository at this point in the history
  • Loading branch information
ashton314 committed Nov 6, 2024
1 parent 281d624 commit 340f549
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 52 deletions.
88 changes: 36 additions & 52 deletions lib/chorex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,62 +330,36 @@ defmodule Chorex do
Note the 2-tuple: the first element is the module to be proxied, and
the second element should be the PID of an already-running proxy.
### Manually setting up the shared-state choreography (deprecated)
### **Experimental** TCP transport setup
(*Note: we recommend using the `Chorex.start` mechanism now.*)
You can run choreographies over TCP. Instead of specifying the
implementing module's name in the actor ↦ module map, put a tuple
like `{:remote, local_port, remote_host, remote_port}`. A process
will begin listening on `local_port` and forward messages to the
proper actors on the current node. Messages going to a remote actor
will be buffered until a TCP connection is established, at which
point they'll be sent FIFO.
You need to be a little careful when setting up the shared state
choreography. Instead of setting up all the actors manually, you need
to set up *one* instance of each shared-state actor, then create
separate *sessions* for each instance of the choreography that you
want to run.
Example with hosts `alice.net` and `bob.net`:
Here is an example with two buyers trying to buy the same book:
Host `alice.net`:
```elixir
# Start up the buyers
b1 = spawn(MyBuyer, :init, [[]])
b2 = spawn(MyBuyer, :init, [[]])
# Start up the seller proxy with the initial shared
# state (the stock of books in this case)
{:ok, px} = GenServer.start(Chorex.Proxy, %{"Anathem" => 1})
# Start sessions: one for each buyer
Proxy.begin_session(px, [b1], MySellerBackend, :init, [])
config1 = %{Buyer => b1, Seller => px, :super => self()}
Chorex.start(BasicRemote.Chorex,
%{SockAlice => SockAliceImpl,
SockBob => {:remote, 4242, "bob.net", 4243}}, [])
```
Proxy.begin_session(px, [b2], MySellerBackend, :init, [])
config2 = %{Buyer => b2, Seller => px, :super => self()}
Host `bob.net`:
# Send everyone their configuration
send(b1, {:config, config1})
send(px, {:chorex, b1, {:config, config1}})
send(b2, {:config, config2})
send(px, {:chorex, b2, {:config, config2}})
```elixir
Chorex.start(BasicRemote.Chorex,
%{SockAlice => {:remote, 4243, "alice.net", 4242},
SockBob => SockBobImpl}, [])
```
The `Proxy.begin_sesion` function takes a proxy function, a list of
PIDs that partake in a given session, and a module, function, arglist
for the thing to proxy.
**Sessions**: PIDs belonging to a session will have their messages
routed to the corresponding proxied process. The GenServer looks up
which session a PID belongs to, finds the proxied process linked to
that session, then forwards the message to that process. The exact
mechanisms of how this works may change in the future to accommodate
restarts.
When you send the config information to a proxied process, you send it
through the proxy first, and you must wrap the message as shown above
with a process from the session you want to target as the second
element in the tuple; this just helps the proxy figure out the session
you want.
That's it! If you run the above choreography, the process that kicks
this all off will get *one* message like `{:chorex_return, Buyer, :book_get}`
and *one* message like `{:chorex_return, Buyer, :darn_missed_it}`,
indicating that exactly one of the buyers got the coveted book.
**WARNING** this transport is *experimental* and not guaranteed to
work. We've had issues with message delivery during testing. PRs welcome!
"""

import WriterMonad
Expand Down Expand Up @@ -714,9 +688,11 @@ defmodule Chorex do

unquote(recver_exp) =
receive do
{:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> msg
{:chorex, ^tok, unquote(actor1), unquote(actor2), msg} ->
msg

m ->
IO.inspect(m, label: "#{inspect self()} got unexpected message")
IO.inspect(m, label: "#{inspect(self())} got unexpected message")
IO.inspect(tok, label: "tok")
42
end
Expand Down Expand Up @@ -1288,8 +1264,15 @@ defmodule Chorex do
end

def merge_step(
{:__block__, _, [{:=, _, [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = tok_get, {:receive, _, _} = lhs_rcv]},
{:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}) do
{:__block__, _,
[
{:=, _,
[{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} =
tok_get,
{:receive, _, _} = lhs_rcv
]},
{:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}
) do
quote do
unquote(tok_get)
unquote(merge_step(lhs_rcv, rhs_rcv))
Expand Down Expand Up @@ -1318,7 +1301,8 @@ defmodule Chorex do

# merge same branch
def merge_step(
{:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]},
{:receive, m1,
[[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, dir]}], branch2]}]]]}
) do
{:receive, m1,
Expand Down
1 change: 1 addition & 0 deletions test/chorex/socket_proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Chorex.SocketProxyTest do
use BasicRemote.Chorex, :sockbob
end

@tag :skip
test "basic proxy works" do
# Spin up two tasks to collect responses
alice_receiver = Task.async(fn ->
Expand Down

0 comments on commit 340f549

Please sign in to comment.