Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge transport backends #17

Merged
merged 19 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 122 additions & 77 deletions lib/chorex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,75 +330,61 @@ defmodule Chorex do
Note the 2-tuple: the first element is the module to be proxied, and
the second element should be the PID of an already-running proxy.

### Manually setting up the shared-state choreography (deprecated)
### **Experimental** TCP transport setup

(*Note: we recommend using the `Chorex.start` mechanism now.*)
You can run choreographies over TCP. Instead of specifying the
implementing module's name in the actor ↦ module map, put a tuple
like `{:remote, local_port, remote_host, remote_port}`. A process
will begin listening on `local_port` and forward messages to the
proper actors on the current node. Messages going to a remote actor
will be buffered until a TCP connection is established, at which
point they'll be sent FIFO.

You need to be a little careful when setting up the shared state
choreography. Instead of setting up all the actors manually, you need
to set up *one* instance of each shared-state actor, then create
separate *sessions* for each instance of the choreography that you
want to run.
Example with hosts `alice.net` and `bob.net`:

Here is an example with two buyers trying to buy the same book:
Host `alice.net`:

```elixir
# Start up the buyers
b1 = spawn(MyBuyer, :init, [[]])
b2 = spawn(MyBuyer, :init, [[]])

# Start up the seller proxy with the initial shared
# state (the stock of books in this case)
{:ok, px} = GenServer.start(Chorex.Proxy, %{"Anathem" => 1})

# Start sessions: one for each buyer
Proxy.begin_session(px, [b1], MySellerBackend, :init, [])
config1 = %{Buyer => b1, Seller => px, :super => self()}
Chorex.start(BasicRemote.Chorex,
%{SockAlice => SockAliceImpl,
SockBob => {:remote, 4242, "bob.net", 4243}}, [])
```

Proxy.begin_session(px, [b2], MySellerBackend, :init, [])
config2 = %{Buyer => b2, Seller => px, :super => self()}
Host `bob.net`:

# Send everyone their configuration
send(b1, {:config, config1})
send(px, {:chorex, b1, {:config, config1}})
send(b2, {:config, config2})
send(px, {:chorex, b2, {:config, config2}})
```elixir
Chorex.start(BasicRemote.Chorex,
%{SockAlice => {:remote, 4243, "alice.net", 4242},
SockBob => SockBobImpl}, [])
```

The `Proxy.begin_sesion` function takes a proxy function, a list of
PIDs that partake in a given session, and a module, function, arglist
for the thing to proxy.

**Sessions**: PIDs belonging to a session will have their messages
routed to the corresponding proxied process. The GenServer looks up
which session a PID belongs to, finds the proxied process linked to
that session, then forwards the message to that process. The exact
mechanisms of how this works may change in the future to accommodate
restarts.

When you send the config information to a proxied process, you send it
through the proxy first, and you must wrap the message as shown above
with a process from the session you want to target as the second
element in the tuple; this just helps the proxy figure out the session
you want.

That's it! If you run the above choreography, the process that kicks
this all off will get *one* message like `{:chorex_return, Buyer, :book_get}`
and *one* message like `{:chorex_return, Buyer, :darn_missed_it}`,
indicating that exactly one of the buyers got the coveted book.
**WARNING** this transport is *experimental* and not guaranteed to
work. We've had issues with message delivery during testing. PRs welcome!
"""

import WriterMonad
import Utils
alias Chorex.Proxy

@typedoc """
A tuple describing where to find a remote host. The `Chorex.start/3`
function takes this and spins up proxies as needed to manage the connection.

```elixir
{:remote, listen_socket :: integer(), remote_host :: binary(), remote_port :: integer()}
```
"""
@type remote_actor_ref() :: {:remote, integer(), binary(), integer()}

@doc """
Start a choreography.

Takes a choreography module like `MyCoolThing.Chorex`, a map from
actor names to implementing modules, and a list of arguments to pass
to the `run` function.

Values in the map are either modules or `remote_actor_ref()` tuples.

## Example

```elixir
Expand All @@ -407,6 +393,9 @@ defmodule Chorex do
[])
```
"""
@spec start(module(), %{atom() => module() | remote_actor_ref()}, [
any()
]) :: any()
def start(chorex_module, actor_impl_map, init_args) do
actor_list = chorex_module.get_actors()

Expand All @@ -418,31 +407,62 @@ defmodule Chorex do
{a, {backend_module, proxy_pid}}

a when is_atom(a) ->
pid = spawn(actor_impl_map[a], :init, [init_args])
{a, pid}
case actor_impl_map[a] do
{:remote, lport, rhost, rport} ->
{a, {:remote, lport, rhost, rport}}

m when is_atom(a) ->
pid = spawn(m, :init, [init_args])
{a, pid}
end
end
end
|> Enum.into(%{})

# Gather up actors that need remote proxies
remotes =
pre_config
|> Enum.flat_map(fn
{_k, {:remote, _, _, _} = r} -> [r]
_ -> []
end)
|> Enum.into(MapSet.new())

remote_proxies =
for {:remote, lport, rhost, rport} = proxy_desc <- remotes do
{:ok, proxy_pid} =
GenServer.start(Chorex.SocketProxy, %{
listen_port: lport,
remote_host: rhost,
remote_port: rport
})

{proxy_desc, proxy_pid}
end
|> Enum.into(%{})

session_token = UUID.uuid4()

config =
pre_config
|> Enum.map(fn
{a, {_backend_module, proxy_pid}} -> {a, proxy_pid}
{a, {:remote, _, _, _} = r_desc} -> {a, remote_proxies[r_desc]}
{a, pid} -> {a, pid}
end)
|> Enum.into(%{})
|> Map.put(:super, self())
|> Map.put(:session_token, session_token)

for actor_desc <- actor_list do
case actor_desc do
{a, :singleton} ->
{backend_module, px} = pre_config[a]
session_pids = Map.values(config)
Proxy.begin_session(px, session_pids, backend_module, :init, [init_args])
send(px, {:chorex, Enum.at(session_pids, 0), {:config, config}})
Proxy.begin_session(px, session_token, backend_module, :init, [init_args])
send(px, {:chorex, session_token, :meta, {:config, config}})

a when is_atom(a) ->
send(config[a], {:config, config})
send(config[a], {:chorex, session_token, :meta, {:config, config}})
end
end
end
Expand Down Expand Up @@ -538,13 +558,12 @@ defmodule Chorex do

defmodule unquote(actor) do
unquote_splicing(callbacks)
import unquote(Chorex.Proxy), only: [send_proxied: 2]

# impl is the name of a module implementing this behavior
# args whatever was passed as the third arg to Chorex.start
def init(impl, args) do
receive do
{:config, config} ->
{:chorex, session_token, :meta, {:config, config}} ->
ret = apply(__MODULE__, :run, [impl, config | args])
send(config[:super], {:chorex_return, unquote(actor), ret})
end
Expand Down Expand Up @@ -647,11 +666,15 @@ defmodule Chorex do

{^label, _} ->
# check: is this a singleton I'm talking to?
send_func = if Enum.member?(ctx.singletons, actor2), do: :send_proxied, else: :send

return(
quote do
unquote(send_func)(config[unquote(actor2)], unquote(sender_exp))
tok = config[:session_token]

send(
config[unquote(actor2)],
{:chorex, tok, unquote(actor1), unquote(actor2), unquote(sender_exp)}
)
end
)

Expand All @@ -661,9 +684,17 @@ defmodule Chorex do
# nil when I'm expanding the real thing.
return(
quote do
tok = config[:session_token]

unquote(recver_exp) =
receive do
msg -> msg
{:chorex, ^tok, unquote(actor1), unquote(actor2), msg} ->
msg

m ->
IO.inspect(m, label: "#{inspect(self())} got unexpected message")
IO.inspect(tok, label: "tok")
42
end
end
)
Expand Down Expand Up @@ -863,13 +894,13 @@ defmodule Chorex do

case {sender, dest} do
{^label, _} ->
send_func = if Enum.member?(ctx.singletons, dest), do: :send_proxied, else: :send

return(
quote do
unquote(send_func)(
tok = config[:session_token]

send(
config[unquote(dest)],
{:choice, unquote(sender), unquote(choice)}
{:choice, tok, unquote(sender), unquote(dest), unquote(choice)}
)

unquote(cont_)
Expand All @@ -879,8 +910,10 @@ defmodule Chorex do
{_, ^label} ->
return(
quote do
tok = config[:session_token]

receive do
{:choice, unquote(sender), unquote(choice)} ->
{:choice, ^tok, unquote(sender), unquote(dest), unquote(choice)} ->
unquote(cont_)
end
end
Expand Down Expand Up @@ -1231,37 +1264,49 @@ defmodule Chorex do
end

def merge_step(
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, L]}], l_branch]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, R]}], r_branch]}]]]}
{:__block__, _,
[
{:=, _,
[{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} =
tok_get,
{:receive, _, _} = lhs_rcv
]},
{:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}
) do
quote do
receive do
{:choice, unquote(agent), L} -> unquote(l_branch)
{:choice, unquote(agent), R} -> unquote(r_branch)
end
unquote(tok_get)
unquote(merge_step(lhs_rcv, rhs_rcv))
end
end

# flip order of branches
def merge_step(
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, R]}], r_branch]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, L]}], l_branch]}]]]}
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, L]}], l_branch]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, R]}], r_branch]}]]]}
) do
quote do
receive do
{:choice, unquote(agent), L} -> unquote(l_branch)
{:choice, unquote(agent), R} -> unquote(r_branch)
{:choice, unquote(tok), unquote(agent), unquote(dest), L} -> unquote(l_branch)
{:choice, unquote(tok), unquote(agent), unquote(dest), R} -> unquote(r_branch)
end
end
end

# flip order of branches
def merge_step(
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, _, _, _, R]}], _]}]]]} = rhs,
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, _, _, _, L]}], _]}]]]} = lhs
) do
merge_step(lhs, rhs)
end

# merge same branch
def merge_step(
{:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, agent, dir]}], branch1]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, dir]}], branch2]}]]]}
{:receive, m1,
[[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]},
{:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, dir]}], branch2]}]]]}
) do
{:receive, m1,
[[do: [{:->, m2, [[{:{}, m3, [:choice, agent, dir]}], merge(branch1, branch2)]}]]]}
[[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], merge(branch1, branch2)]}]]]}
end

def merge_step(x, y) do
Expand Down
Loading
Loading