From 9d7e1d1afe7db359795d1e9c1ca3aae276f82f94 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Mon, 21 Oct 2024 11:59:56 -0600 Subject: [PATCH 01/19] Start work on HTTP messaging backend --- lib/chorex/transport.ex | 28 ++++++++++++++++++++++++++++ lib/chorex/transport/http.ex | 5 +++++ 2 files changed, 33 insertions(+) create mode 100644 lib/chorex/transport.ex create mode 100644 lib/chorex/transport/http.ex diff --git a/lib/chorex/transport.ex b/lib/chorex/transport.ex new file mode 100644 index 0000000..7a8d822 --- /dev/null +++ b/lib/chorex/transport.ex @@ -0,0 +1,28 @@ +defmodule Chorex.Transport do + @moduledoc """ + Generalized message sending backend. + """ + + # Idea: generalize the message sending/receiving and have an API + # that different backends can implement. + + use GenServer + + def init(_) do + # HELP! I need queue semantics, not stack semantics! + {:ok, %{inbox: [], outbox: []}} + end + + def handle_call({{:send, msg}, _sender, %{outbox: ob} = state}) do + send(self(), :process_outbox) + {:reply, :ok, %{state | outbox: ob ++ [msg]}} + end + + def handle_info(:process_outbox, %{outbox: []} = state), + do: {:noreply, state} + + def handle_info(:process_outbox, %{outbox: ob} = state) do + # FIXME: send everything in `ob` + {:noreply, %{state | outbox: []}} + end +end diff --git a/lib/chorex/transport/http.ex b/lib/chorex/transport/http.ex new file mode 100644 index 0000000..224297e --- /dev/null +++ b/lib/chorex/transport/http.ex @@ -0,0 +1,5 @@ +defmodule Chorex.Transport.Http do + @moduledoc """ + HTTP message transport for choreographies. + """ +end From 06a634b230b438100490a704e341acfbcec6d2cc Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Thu, 24 Oct 2024 11:52:47 -0600 Subject: [PATCH 02/19] WIP building TCP communication proxy --- lib/chorex/socket_listener.ex | 25 +++++++++++++++ lib/chorex/socket_proxy.ex | 46 ++++++++++++++++++++++++++++ lib/chorex/tcp/accepter_pool_impl.ex | 16 ++++++++++ lib/chorex/tcp/client_impl.ex | 16 ++++++++++ lib/chorex/tcp/handler_chor.ex | 15 +++++++++ lib/chorex/tcp/handler_impl.ex | 7 +++++ lib/chorex/tcp/listener_chor.ex | 15 +++++++++ lib/chorex/tcp/listener_impl.ex | 28 +++++++++++++++++ lib/chorex/transport.ex | 20 +++++------- lib/chorex/transport/backend.ex | 4 +++ lib/chorex/transport/http.ex | 8 +++++ 11 files changed, 188 insertions(+), 12 deletions(-) create mode 100644 lib/chorex/socket_listener.ex create mode 100644 lib/chorex/socket_proxy.ex create mode 100644 lib/chorex/tcp/accepter_pool_impl.ex create mode 100644 lib/chorex/tcp/client_impl.ex create mode 100644 lib/chorex/tcp/handler_chor.ex create mode 100644 lib/chorex/tcp/handler_impl.ex create mode 100644 lib/chorex/tcp/listener_chor.ex create mode 100644 lib/chorex/tcp/listener_impl.ex create mode 100644 lib/chorex/transport/backend.ex diff --git a/lib/chorex/socket_listener.ex b/lib/chorex/socket_listener.ex new file mode 100644 index 0000000..75ff80e --- /dev/null +++ b/lib/chorex/socket_listener.ex @@ -0,0 +1,25 @@ +defmodule Chorex.SocketListener do + use GenServer + + def init(%{listen_port: port, notify: parent}) do + GenServer.cast(self(), {:listen, port}) + {:ok, %{parent: parent}} + end + + def handle_cast({:listen, port}, %{parent: parent}) do + {:ok, socket} = listen(port) + GenServer.cast(parent, {:got_knock, socket}) + {:noreply, %{}} + end + + def listen(port) do + default_options = [ + backlog: 1024, + nodelay: true, + send_timeout: 30_000, + send_timeout_close: true, + reuseaddr: true + ] + :gen_tcp.listen(port, default_options) + end +end diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex new file mode 100644 index 0000000..fc4bcb4 --- /dev/null +++ b/lib/chorex/socket_proxy.ex @@ -0,0 +1,46 @@ +defmodule Chorex.SocketProxy do + @moduledoc """ + Socket proxy + """ + use GenServer + + def init(%{listen_port: lport, remote_host: rhost, remote_port: rport}) do + child = GenServer.start_link(Chorex.SocketListener, %{listen_port: lport, notify: self()}) + timer_ref = Process.send_after(self(), {:try_connect, rhost, rport}, 10) + {:ok, {:not_ready, %{listener: child, knock_timer: timer_ref}}} + end + + def handle_call({:new_session, _}, _caller, {:not_ready, _} = state), + do: {:reply, {:error, :waiting}, state} + + def handle_call({:new_session, token}, _caller, state) do + {:reply, :ok, state} + end + + def handle_cast({:tcp_recv, msg}, state) do + # TODO: notify listeners for this session + {:noreply, state} + end + + def handle_cast({:got_knock, socket}, {:not_ready, st}) do + Process.cancel_timer(st.knock_timer) + Process.exit(st.listener, :connected) + {:noreply, {:ready, socket}} + end + + def handle_cast({:got_knock, socket}, {:ready, st}) do + # uh oh... race condition + end + + def handle_info({:try_connect, host, port}, {:not_ready, st}) do + case :gen_tcp.connect(host, port, [], 1_000) do + {:ok, socket} -> + Process.exit(st.listener, :connected) + {:noreply, {:ready, socket}} + + {:error, _} -> + new_timer = Process.send_after(self(), {:try_connect, host, port}, 10) + {:noreply, {:not_ready, %{st | knock_timer: new_timer}}} + end + end +end diff --git a/lib/chorex/tcp/accepter_pool_impl.ex b/lib/chorex/tcp/accepter_pool_impl.ex new file mode 100644 index 0000000..2808072 --- /dev/null +++ b/lib/chorex/tcp/accepter_pool_impl.ex @@ -0,0 +1,16 @@ +defmodule Chorex.Tcp.AccepterPoolImpl do + use Chorex.Tcp.ListenerChor.Chorex, :accepterpool + + def accept_and_handle_connection(listen_socket) do + IO.inspect(listen_socket, label: "[accepter_pool] socket") + + {:ok, socket} = :gen_tcp.accept(listen_socket) + + # startup instance of the handler choreography + Chorex.start( + Tcp.HandlerChor.Chorex, + %{Handler => Tcp.HandlerImpl, TcpClient => Tcp.ClientImpl}, + [%{}, socket] + ) + end +end diff --git a/lib/chorex/tcp/client_impl.ex b/lib/chorex/tcp/client_impl.ex new file mode 100644 index 0000000..a70f53c --- /dev/null +++ b/lib/chorex/tcp/client_impl.ex @@ -0,0 +1,16 @@ +defmodule Chorex.Tcp.ClientImpl do + use Chorex.Tcp.HandlerChor.Chorex, :tcpclient + + def read(sock) do + :gen_tcp.recv(sock, 0) # 0 = all available bytes + end + + def send_over_socket(sock, msg) do + IO.inspect(msg, label: "[client] msg") + :gen_tcp.send(sock, msg) + end + + def shutdown(sock) do + :gen_tcp.close(sock) + end +end diff --git a/lib/chorex/tcp/handler_chor.ex b/lib/chorex/tcp/handler_chor.ex new file mode 100644 index 0000000..252f872 --- /dev/null +++ b/lib/chorex/tcp/handler_chor.ex @@ -0,0 +1,15 @@ +defmodule Chorex.Tcp.HandlerChor do + import Chorex + + defchor [Handler, TcpClient] do + def run(Handler.(config), TcpClient.(sock)) do + loop(Handler.(config), TcpClient.(sock)) + end + + def loop(Handler.(config), TcpClient.(sock)) do + TcpClient.read(sock) ~> Handler.(msg) + Handler.process(msg, config) + loop(Handler.(config), TcpClient.(sock)) + end + end +end diff --git a/lib/chorex/tcp/handler_impl.ex b/lib/chorex/tcp/handler_impl.ex new file mode 100644 index 0000000..7a8322c --- /dev/null +++ b/lib/chorex/tcp/handler_impl.ex @@ -0,0 +1,7 @@ +defmodule Chorex.Tcp.HandlerImpl do + use Chorex.Tcp.HandlerChor.Chorex, :handler + + def process(msg, config) do + :ok + end +end diff --git a/lib/chorex/tcp/listener_chor.ex b/lib/chorex/tcp/listener_chor.ex new file mode 100644 index 0000000..3f5bf4c --- /dev/null +++ b/lib/chorex/tcp/listener_chor.ex @@ -0,0 +1,15 @@ +defmodule Chorex.Tcp.ListenerChor do + import Chorex + + defchor [Listener, AccepterPool] do + def run(Listener.(config)) do + Listener.get_listener_socket(config) ~> AccepterPool.({:ok, socket}) + loop(AccepterPool.(socket)) + end + + def loop(AccepterPool.(listen_socket)) do + AccepterPool.accept_and_handle_connection(listen_socket) + loop(AccepterPool.(listen_socket)) + end + end +end diff --git a/lib/chorex/tcp/listener_impl.ex b/lib/chorex/tcp/listener_impl.ex new file mode 100644 index 0000000..0c2d31d --- /dev/null +++ b/lib/chorex/tcp/listener_impl.ex @@ -0,0 +1,28 @@ +defmodule Chorex.Tcp.ListenerImpl do + use Chorex.Tcp.ListenerChor.Chorex, :listener + + @hardcoded_options [mode: :binary, active: false] + + def get_listener_socket(config) do + default_options = [ + backlog: 1024, + nodelay: true, + send_timeout: 30_000, + send_timeout_close: true, + reuseaddr: true + ] + + opts = + Enum.uniq_by( + @hardcoded_options ++ config[:user_options] ++ default_options, + fn + {key, _} when is_atom(key) -> key + key when is_atom(key) -> key + end + ) + + # Hopefully returns {:ok, :inet.socket()} + :gen_tcp.listen(config[:port], opts) + |> IO.inspect(label: "listener socket") + end +end diff --git a/lib/chorex/transport.ex b/lib/chorex/transport.ex index 7a8d822..7e22c85 100644 --- a/lib/chorex/transport.ex +++ b/lib/chorex/transport.ex @@ -3,26 +3,22 @@ defmodule Chorex.Transport do Generalized message sending backend. """ - # Idea: generalize the message sending/receiving and have an API - # that different backends can implement. + alias __MODULE__ use GenServer - def init(_) do - # HELP! I need queue semantics, not stack semantics! - {:ok, %{inbox: [], outbox: []}} + @spec init(backend :: Transport.Backend.t()) :: {:ok, any()} + def init(backend) do + {:ok, %{backend: backend, inbox: :queue.new(), outbox: :queue.new()}} end def handle_call({{:send, msg}, _sender, %{outbox: ob} = state}) do send(self(), :process_outbox) - {:reply, :ok, %{state | outbox: ob ++ [msg]}} + {:reply, :ok, %{state | outbox: :queue.snoc(ob, msg)}} end - def handle_info(:process_outbox, %{outbox: []} = state), - do: {:noreply, state} - - def handle_info(:process_outbox, %{outbox: ob} = state) do - # FIXME: send everything in `ob` - {:noreply, %{state | outbox: []}} + def handle_info(:process_outbox, %{backend: backend, outbox: ob} = state) do + leftovers = Transport.Backend.send_msg(backend, :queue.to_list(ob)) + {:noreply, %{state | outbox: :queue.from_list(leftovers)}} end end diff --git a/lib/chorex/transport/backend.ex b/lib/chorex/transport/backend.ex new file mode 100644 index 0000000..dfe710b --- /dev/null +++ b/lib/chorex/transport/backend.ex @@ -0,0 +1,4 @@ +defprotocol Chorex.Transport.Backend do + @spec send_msg(t, any()) :: [any()] + def send_msg(t, msg) +end diff --git a/lib/chorex/transport/http.ex b/lib/chorex/transport/http.ex index 224297e..4199b22 100644 --- a/lib/chorex/transport/http.ex +++ b/lib/chorex/transport/http.ex @@ -2,4 +2,12 @@ defmodule Chorex.Transport.Http do @moduledoc """ HTTP message transport for choreographies. """ + + defstruct [:host, :port, :socket] + + defimpl Chorex.Transport.Backend, for: __MODULE__ do + def send_msg(self, msgs) do + + end + end end From 9e705b8edb1a464bd867c5077d60a1fe25319187 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Thu, 24 Oct 2024 14:03:45 -0600 Subject: [PATCH 03/19] New draft of the SocketProxy module --- lib/chorex/socket_listener.ex | 13 ++++-- lib/chorex/socket_proxy.ex | 84 +++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/lib/chorex/socket_listener.ex b/lib/chorex/socket_listener.ex index 75ff80e..8092369 100644 --- a/lib/chorex/socket_listener.ex +++ b/lib/chorex/socket_listener.ex @@ -6,10 +6,17 @@ defmodule Chorex.SocketListener do {:ok, %{parent: parent}} end - def handle_cast({:listen, port}, %{parent: parent}) do + def handle_cast({:listen, port}, state) do {:ok, socket} = listen(port) - GenServer.cast(parent, {:got_knock, socket}) - {:noreply, %{}} + GenServer.cast(self(), :listen_loop) + {:noreply, %{notify: state.notify, socket: socket}} + end + + def handle_cast(:listen_loop, state) do + {:ok, bytes} = :gen_tcp.recv(state.socket, 0) # 0 = all bytes + term = :erlang.binary_to_term(bytes) + GenServer.cast(state.notify, {:tcp_recv, term}) + {:noreply, state} end def listen(port) do diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index fc4bcb4..a7dc16b 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -2,45 +2,81 @@ defmodule Chorex.SocketProxy do @moduledoc """ Socket proxy """ + require Logger use GenServer - def init(%{listen_port: lport, remote_host: rhost, remote_port: rport}) do - child = GenServer.start_link(Chorex.SocketListener, %{listen_port: lport, notify: self()}) - timer_ref = Process.send_after(self(), {:try_connect, rhost, rport}, 10) - {:ok, {:not_ready, %{listener: child, knock_timer: timer_ref}}} + @type config_map :: %{ + listen_port: integer(), + remote_host: binary(), + remote_port: integer() + } + + @type state :: %{ + out_socket: nil | :inet.socket(), + out_queue: :queue.queue(), + in_listener: pid(), + config: config_map() + } + + @spec init(config_map()) :: {:ok, state()} + def init(%{listen_port: lport, remote_host: _rhost, remote_port: _rport} = config) do + {:ok, in_listener} = + GenServer.start_link(Chorex.SocketListener, %{listen_port: lport, notify: self()}) + + send(self(), :try_connect) + {:ok, %{out_socket: nil, out_queue: :queue.new(), in_listener: in_listener, config: config}} end - def handle_call({:new_session, _}, _caller, {:not_ready, _} = state), - do: {:reply, {:error, :waiting}, state} + def handle_info(:try_connect, %{out_socket: nil} = state) do + # 500 = timeout in milliseconds + case :gen_tcp.connect(state.config.host, state.config.port, [], 500) do + {:ok, socket} -> + send(self(), :flush_queue) + {:noreply, %{state | out_socket: socket}} + + {:error, _} -> + send(self(), :try_connect) + {:noreply, state} + end + end - def handle_call({:new_session, token}, _caller, state) do - {:reply, :ok, state} + def handle_info(:flush_queue, state) do + Process.send_after(self(), :flush_queue, 1_000) # reschedule send + if :queue.is_empty(state.out_queue) do + {:noreply, state} + else + {:noreply, %{state | out_queue: send_until_empty(state)}} + end end def handle_cast({:tcp_recv, msg}, state) do - # TODO: notify listeners for this session {:noreply, state} end - def handle_cast({:got_knock, socket}, {:not_ready, st}) do - Process.cancel_timer(st.knock_timer) - Process.exit(st.listener, :connected) - {:noreply, {:ready, socket}} + def handle_cast({:tcp_send, msg}, state) do + bytes = :erlang.term_to_binary(msg) + send(self(), :flush_queue) + {:noreply, %{state | out_queue: :queue.snoc(state.out_queue, bytes)}} end - def handle_cast({:got_knock, socket}, {:ready, st}) do - # uh oh... race condition + @spec send_until_empty(state()) :: :queue.queue() + def send_until_empty(%{out_queue: q, out_socket: nil}) do + # No connection; don't do anything + q end - def handle_info({:try_connect, host, port}, {:not_ready, st}) do - case :gen_tcp.connect(host, port, [], 1_000) do - {:ok, socket} -> - Process.exit(st.listener, :connected) - {:noreply, {:ready, socket}} - - {:error, _} -> - new_timer = Process.send_after(self(), {:try_connect, host, port}, 10) - {:noreply, {:not_ready, %{st | knock_timer: new_timer}}} + def send_until_empty(%{out_queue: q, out_socket: socket} = state) do + case :queue.out(q) do + {{:value, m}, new_queue} -> + with :ok <- :gen_tcp.send(socket, m) do + send_until_empty(%{state | out_queue: new_queue}) + else + {:error, e} -> + Logger.warning("[Chorex.SocketProxy] failed sending packet: #{inspect e}") + q + end + {:empty, mt_q} -> + mt_q end end end From c005f6977eab2468682aeaf75be1c419989de772 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Thu, 24 Oct 2024 15:59:40 -0600 Subject: [PATCH 04/19] Rudimentary two-way communication working --- lib/chorex/socket_listener.ex | 14 ++++++++------ lib/chorex/socket_proxy.ex | 30 +++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/lib/chorex/socket_listener.ex b/lib/chorex/socket_listener.ex index 8092369..31539eb 100644 --- a/lib/chorex/socket_listener.ex +++ b/lib/chorex/socket_listener.ex @@ -3,18 +3,20 @@ defmodule Chorex.SocketListener do def init(%{listen_port: port, notify: parent}) do GenServer.cast(self(), {:listen, port}) - {:ok, %{parent: parent}} + {:ok, %{notify: parent}} end def handle_cast({:listen, port}, state) do - {:ok, socket} = listen(port) - GenServer.cast(self(), :listen_loop) + {:ok, lsocket} = listen(port) + {:ok, socket} = :gen_tcp.accept(lsocket) {:noreply, %{notify: state.notify, socket: socket}} end - def handle_cast(:listen_loop, state) do - {:ok, bytes} = :gen_tcp.recv(state.socket, 0) # 0 = all bytes - term = :erlang.binary_to_term(bytes) + # Messages get sent here after the :gen_tcp.accept() + def handle_info({:tcp, _socket, data}, state) do + term = data + |> :erlang.list_to_binary() + |> :erlang.binary_to_term() GenServer.cast(state.notify, {:tcp_recv, term}) {:noreply, state} end diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index a7dc16b..d08f830 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -6,10 +6,10 @@ defmodule Chorex.SocketProxy do use GenServer @type config_map :: %{ - listen_port: integer(), - remote_host: binary(), - remote_port: integer() - } + listen_port: integer(), + remote_host: binary(), + remote_port: integer() + } @type state :: %{ out_socket: nil | :inet.socket(), @@ -28,10 +28,11 @@ defmodule Chorex.SocketProxy do end def handle_info(:try_connect, %{out_socket: nil} = state) do + host = if is_binary(state.config.remote_host), do: String.to_charlist(state.config.remote_host), else: state.config.remote_host # 500 = timeout in milliseconds - case :gen_tcp.connect(state.config.host, state.config.port, [], 500) do + case :gen_tcp.connect(host, state.config.remote_port, [], 500) do {:ok, socket} -> - send(self(), :flush_queue) + schedule_send() {:noreply, %{state | out_socket: socket}} {:error, _} -> @@ -41,7 +42,8 @@ defmodule Chorex.SocketProxy do end def handle_info(:flush_queue, state) do - Process.send_after(self(), :flush_queue, 1_000) # reschedule send + schedule_send(1_000) + if :queue.is_empty(state.out_queue) do {:noreply, state} else @@ -50,15 +52,24 @@ defmodule Chorex.SocketProxy do end def handle_cast({:tcp_recv, msg}, state) do + IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") {:noreply, state} end def handle_cast({:tcp_send, msg}, state) do bytes = :erlang.term_to_binary(msg) - send(self(), :flush_queue) + schedule_send() {:noreply, %{state | out_queue: :queue.snoc(state.out_queue, bytes)}} end + def schedule_send() do + send(self(), :flush_queue) + end + + def schedule_send(timeout) do + Process.send_after(self(), :flush_queue, timeout) + end + @spec send_until_empty(state()) :: :queue.queue() def send_until_empty(%{out_queue: q, out_socket: nil}) do # No connection; don't do anything @@ -72,9 +83,10 @@ defmodule Chorex.SocketProxy do send_until_empty(%{state | out_queue: new_queue}) else {:error, e} -> - Logger.warning("[Chorex.SocketProxy] failed sending packet: #{inspect e}") + Logger.warning("[Chorex.SocketProxy] failed sending packet: #{inspect(e)}") q end + {:empty, mt_q} -> mt_q end From 844ca07dae3931336e2647ae245c2e83933bf12a Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Thu, 24 Oct 2024 16:40:06 -0600 Subject: [PATCH 05/19] Cleanup old TCP modules --- lib/chorex/tcp/accepter_pool_impl.ex | 16 ---------------- lib/chorex/tcp/client_impl.ex | 16 ---------------- lib/chorex/tcp/handler_chor.ex | 15 --------------- lib/chorex/tcp/handler_impl.ex | 7 ------- lib/chorex/tcp/listener_chor.ex | 15 --------------- lib/chorex/tcp/listener_impl.ex | 28 ---------------------------- lib/chorex/transport.ex | 24 ------------------------ lib/chorex/transport/backend.ex | 4 ---- lib/chorex/transport/http.ex | 13 ------------- 9 files changed, 138 deletions(-) delete mode 100644 lib/chorex/tcp/accepter_pool_impl.ex delete mode 100644 lib/chorex/tcp/client_impl.ex delete mode 100644 lib/chorex/tcp/handler_chor.ex delete mode 100644 lib/chorex/tcp/handler_impl.ex delete mode 100644 lib/chorex/tcp/listener_chor.ex delete mode 100644 lib/chorex/tcp/listener_impl.ex delete mode 100644 lib/chorex/transport.ex delete mode 100644 lib/chorex/transport/backend.ex delete mode 100644 lib/chorex/transport/http.ex diff --git a/lib/chorex/tcp/accepter_pool_impl.ex b/lib/chorex/tcp/accepter_pool_impl.ex deleted file mode 100644 index 2808072..0000000 --- a/lib/chorex/tcp/accepter_pool_impl.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule Chorex.Tcp.AccepterPoolImpl do - use Chorex.Tcp.ListenerChor.Chorex, :accepterpool - - def accept_and_handle_connection(listen_socket) do - IO.inspect(listen_socket, label: "[accepter_pool] socket") - - {:ok, socket} = :gen_tcp.accept(listen_socket) - - # startup instance of the handler choreography - Chorex.start( - Tcp.HandlerChor.Chorex, - %{Handler => Tcp.HandlerImpl, TcpClient => Tcp.ClientImpl}, - [%{}, socket] - ) - end -end diff --git a/lib/chorex/tcp/client_impl.ex b/lib/chorex/tcp/client_impl.ex deleted file mode 100644 index a70f53c..0000000 --- a/lib/chorex/tcp/client_impl.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule Chorex.Tcp.ClientImpl do - use Chorex.Tcp.HandlerChor.Chorex, :tcpclient - - def read(sock) do - :gen_tcp.recv(sock, 0) # 0 = all available bytes - end - - def send_over_socket(sock, msg) do - IO.inspect(msg, label: "[client] msg") - :gen_tcp.send(sock, msg) - end - - def shutdown(sock) do - :gen_tcp.close(sock) - end -end diff --git a/lib/chorex/tcp/handler_chor.ex b/lib/chorex/tcp/handler_chor.ex deleted file mode 100644 index 252f872..0000000 --- a/lib/chorex/tcp/handler_chor.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule Chorex.Tcp.HandlerChor do - import Chorex - - defchor [Handler, TcpClient] do - def run(Handler.(config), TcpClient.(sock)) do - loop(Handler.(config), TcpClient.(sock)) - end - - def loop(Handler.(config), TcpClient.(sock)) do - TcpClient.read(sock) ~> Handler.(msg) - Handler.process(msg, config) - loop(Handler.(config), TcpClient.(sock)) - end - end -end diff --git a/lib/chorex/tcp/handler_impl.ex b/lib/chorex/tcp/handler_impl.ex deleted file mode 100644 index 7a8322c..0000000 --- a/lib/chorex/tcp/handler_impl.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule Chorex.Tcp.HandlerImpl do - use Chorex.Tcp.HandlerChor.Chorex, :handler - - def process(msg, config) do - :ok - end -end diff --git a/lib/chorex/tcp/listener_chor.ex b/lib/chorex/tcp/listener_chor.ex deleted file mode 100644 index 3f5bf4c..0000000 --- a/lib/chorex/tcp/listener_chor.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule Chorex.Tcp.ListenerChor do - import Chorex - - defchor [Listener, AccepterPool] do - def run(Listener.(config)) do - Listener.get_listener_socket(config) ~> AccepterPool.({:ok, socket}) - loop(AccepterPool.(socket)) - end - - def loop(AccepterPool.(listen_socket)) do - AccepterPool.accept_and_handle_connection(listen_socket) - loop(AccepterPool.(listen_socket)) - end - end -end diff --git a/lib/chorex/tcp/listener_impl.ex b/lib/chorex/tcp/listener_impl.ex deleted file mode 100644 index 0c2d31d..0000000 --- a/lib/chorex/tcp/listener_impl.ex +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Chorex.Tcp.ListenerImpl do - use Chorex.Tcp.ListenerChor.Chorex, :listener - - @hardcoded_options [mode: :binary, active: false] - - def get_listener_socket(config) do - default_options = [ - backlog: 1024, - nodelay: true, - send_timeout: 30_000, - send_timeout_close: true, - reuseaddr: true - ] - - opts = - Enum.uniq_by( - @hardcoded_options ++ config[:user_options] ++ default_options, - fn - {key, _} when is_atom(key) -> key - key when is_atom(key) -> key - end - ) - - # Hopefully returns {:ok, :inet.socket()} - :gen_tcp.listen(config[:port], opts) - |> IO.inspect(label: "listener socket") - end -end diff --git a/lib/chorex/transport.ex b/lib/chorex/transport.ex deleted file mode 100644 index 7e22c85..0000000 --- a/lib/chorex/transport.ex +++ /dev/null @@ -1,24 +0,0 @@ -defmodule Chorex.Transport do - @moduledoc """ - Generalized message sending backend. - """ - - alias __MODULE__ - - use GenServer - - @spec init(backend :: Transport.Backend.t()) :: {:ok, any()} - def init(backend) do - {:ok, %{backend: backend, inbox: :queue.new(), outbox: :queue.new()}} - end - - def handle_call({{:send, msg}, _sender, %{outbox: ob} = state}) do - send(self(), :process_outbox) - {:reply, :ok, %{state | outbox: :queue.snoc(ob, msg)}} - end - - def handle_info(:process_outbox, %{backend: backend, outbox: ob} = state) do - leftovers = Transport.Backend.send_msg(backend, :queue.to_list(ob)) - {:noreply, %{state | outbox: :queue.from_list(leftovers)}} - end -end diff --git a/lib/chorex/transport/backend.ex b/lib/chorex/transport/backend.ex deleted file mode 100644 index dfe710b..0000000 --- a/lib/chorex/transport/backend.ex +++ /dev/null @@ -1,4 +0,0 @@ -defprotocol Chorex.Transport.Backend do - @spec send_msg(t, any()) :: [any()] - def send_msg(t, msg) -end diff --git a/lib/chorex/transport/http.ex b/lib/chorex/transport/http.ex deleted file mode 100644 index 4199b22..0000000 --- a/lib/chorex/transport/http.ex +++ /dev/null @@ -1,13 +0,0 @@ -defmodule Chorex.Transport.Http do - @moduledoc """ - HTTP message transport for choreographies. - """ - - defstruct [:host, :port, :socket] - - defimpl Chorex.Transport.Backend, for: __MODULE__ do - def send_msg(self, msgs) do - - end - end -end From bd268ac9e47e7d93940d6213b09ba7df6771bb46 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Fri, 25 Oct 2024 17:00:28 -0600 Subject: [PATCH 06/19] Wrote test for remote nodes that I would *like* to work --- lib/chorex/socket_proxy.ex | 5 ++- test/chorex/socket_proxy_test.exs | 53 +++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 test/chorex/socket_proxy_test.exs diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index d08f830..d1176ef 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -52,7 +52,7 @@ defmodule Chorex.SocketProxy do end def handle_cast({:tcp_recv, msg}, state) do - IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") + # IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") {:noreply, state} end @@ -79,11 +79,14 @@ defmodule Chorex.SocketProxy do def send_until_empty(%{out_queue: q, out_socket: socket} = state) do case :queue.out(q) do {{:value, m}, new_queue} -> + # IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} trying to send packet") with :ok <- :gen_tcp.send(socket, m) do + # IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} sent packet") send_until_empty(%{state | out_queue: new_queue}) else {:error, e} -> Logger.warning("[Chorex.SocketProxy] failed sending packet: #{inspect(e)}") + # IO.inspect(m, label: "sending") q end diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs new file mode 100644 index 0000000..64b7923 --- /dev/null +++ b/test/chorex/socket_proxy_test.exs @@ -0,0 +1,53 @@ +defmodule Chorex.SocketProxyTest do + use ExUnit.Case + + defmodule BasicRemote do + import Chorex + + defchor [Alice, Bob] do + def run(Alice.(report), Bob.(report)) do + Alice.("hello") ~> Bob.(m1) + Alice.("there") ~> Bob.(m2) + Alice.("bob") ~> Bob.(m3) + Bob.([m1, m2, m3]) ~> Alice.(message) + Alice.(send(report, {:done, message})) + Bob.(send(report, {:done, message})) # this should be an error + end + end + end + + defmodule AliceImpl do + use BasicRemote.Chorex, :alice + end + + defmodule BobImpl do + use BasicRemote.Chorex, :bob + end + + test "basic proxy works" do + alice_receiver = Task.async(fn -> + m = receive do + x -> x + end + m + end) + + bob_receiver = Task.async(fn -> + m = receive do + x -> x + end + m + end) + + Chorex.start(BasicRemote.Chorex, + %{Alice => AliceImpl, + Bob => {:remote, 4242, "localhost", 4243}}, [alice_receiver, nil]) + + Chorex.start(BasicRemote.Chorex, + %{Alice => {:remote, 4243, "localhost", 4242}, + Bob => BobImpl}, [nil, bob_receiver]) + + assert Task.await(alice_receiver) == ["hello", "there", "bob"] + assert Task.await(bob_receiver) == "whatever" + end +end From 98483b032dd8a2b25509f5f81057b56d496cae85 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Fri, 25 Oct 2024 17:08:43 -0600 Subject: [PATCH 07/19] Tweaks to tests; start on documentation --- lib/chorex.ex | 8 ++++++++ test/chorex/socket_proxy_test.exs | 7 ++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index f8a2a77..c64b8f1 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -399,6 +399,14 @@ defmodule Chorex do actor names to implementing modules, and a list of arguments to pass to the `run` function. + Values in the map are either modules or a tuple like + + ```elixir + {:remote, listen_socket :: integer(), remote_host :: binary(), remote_port :: integer()} + ``` + + # FIXME: document the proxy setup as well + ## Example ```elixir diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 64b7923..6a2248d 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -11,7 +11,7 @@ defmodule Chorex.SocketProxyTest do Alice.("bob") ~> Bob.(m3) Bob.([m1, m2, m3]) ~> Alice.(message) Alice.(send(report, {:done, message})) - Bob.(send(report, {:done, message})) # this should be an error + Bob.(send(report, {:done, "whatever"})) end end end @@ -25,6 +25,7 @@ defmodule Chorex.SocketProxyTest do end test "basic proxy works" do + # Spin up two tasks to collect responses alice_receiver = Task.async(fn -> m = receive do x -> x @@ -47,7 +48,7 @@ defmodule Chorex.SocketProxyTest do %{Alice => {:remote, 4243, "localhost", 4242}, Bob => BobImpl}, [nil, bob_receiver]) - assert Task.await(alice_receiver) == ["hello", "there", "bob"] - assert Task.await(bob_receiver) == "whatever" + assert {:done, ["hello", "there", "bob"]} = Task.await(alice_receiver) + assert {:done, "whatever"} = Task.await(bob_receiver) end end From 18b36207e4102bc9f9e0a97b21813e281ae996c0 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Mon, 28 Oct 2024 12:37:44 -0600 Subject: [PATCH 08/19] Some docs --- lib/chorex.ex | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index c64b8f1..0546a18 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -392,6 +392,16 @@ defmodule Chorex do import Utils alias Chorex.Proxy + @typedoc """ + A tuple describing where to find a remote host. The `Chorex.start/3` + function takes this and spins up proxies as needed to manage the connection. + + ```elixir + {:remote, listen_socket :: integer(), remote_host :: binary(), remote_port :: integer()} + ``` + """ + @type remote_actor_ref() :: {:remote, integer(), binary(), integer()} + @doc """ Start a choreography. @@ -399,13 +409,7 @@ defmodule Chorex do actor names to implementing modules, and a list of arguments to pass to the `run` function. - Values in the map are either modules or a tuple like - - ```elixir - {:remote, listen_socket :: integer(), remote_host :: binary(), remote_port :: integer()} - ``` - - # FIXME: document the proxy setup as well + Values in the map are either modules or `remote_actor_ref()` tuples. ## Example @@ -415,6 +419,9 @@ defmodule Chorex do []) ``` """ + @spec start(module(), %{atom() => module() | remote_actor_ref()}, [ + any() + ]) :: any() def start(chorex_module, actor_impl_map, init_args) do actor_list = chorex_module.get_actors() From bb32ed1ea41569f6404d65ffcf115eea29667a3e Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Mon, 28 Oct 2024 13:09:11 -0600 Subject: [PATCH 09/19] First draft of spinning up remote proxies --- lib/chorex.ex | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index 0546a18..20b19e8 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -433,16 +433,39 @@ defmodule Chorex do {a, {backend_module, proxy_pid}} a when is_atom(a) -> - pid = spawn(actor_impl_map[a], :init, [init_args]) - {a, pid} + case actor_impl_map[a] do + {:remote, lport, rhost, rport} -> + {a, {:remote, lport, rhost, rport}} + + m when is_atom(a) -> + pid = spawn(m, :init, [init_args]) + {a, pid} + end end end |> Enum.into(%{}) + # Gather up actors that need remote proxies + remotes = + pre_config + |> Enum.flat_map(fn + {_k, {:remote, _, _, _} = r} -> [r] + _ -> [] + end) + |> Enum.into(MapSet) + + remote_proxies = + for {:remote, lport, rhost, rport} = proxy_desc <- remotes do + {:ok, proxy_pid} = GenServer.start(Chorex.SocketProxy, %{listen_port: lport, remote_host: rhost, remote_port: rport}) + {proxy_desc, proxy_pid} + end + |> Enum.into(%{}) + config = pre_config |> Enum.map(fn {a, {_backend_module, proxy_pid}} -> {a, proxy_pid} + {a, {:remote, _, _, _} = r_desc} -> {a, remote_proxies[r_desc]} {a, pid} -> {a, pid} end) |> Enum.into(%{}) From e04e96009838c184607d83838406a96e6df5439d Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Mon, 28 Oct 2024 15:48:28 -0600 Subject: [PATCH 10/19] New sending schema in place; basic Chorex tests pass --- lib/chorex.ex | 79 +++++++++++++++++++++++++++----------------- lib/chorex/proxy.ex | 48 +++++++++------------------ mix.exs | 3 +- mix.lock | 1 + test/chorex_test.exs | 26 ++++++++++----- 5 files changed, 86 insertions(+), 71 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index 20b19e8..d7fc540 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -456,11 +456,19 @@ defmodule Chorex do remote_proxies = for {:remote, lport, rhost, rport} = proxy_desc <- remotes do - {:ok, proxy_pid} = GenServer.start(Chorex.SocketProxy, %{listen_port: lport, remote_host: rhost, remote_port: rport}) + {:ok, proxy_pid} = + GenServer.start(Chorex.SocketProxy, %{ + listen_port: lport, + remote_host: rhost, + remote_port: rport + }) + {proxy_desc, proxy_pid} end |> Enum.into(%{}) + session_token = UUID.uuid4() + config = pre_config |> Enum.map(fn @@ -470,17 +478,17 @@ defmodule Chorex do end) |> Enum.into(%{}) |> Map.put(:super, self()) + |> Map.put(:session_token, session_token) for actor_desc <- actor_list do case actor_desc do {a, :singleton} -> {backend_module, px} = pre_config[a] - session_pids = Map.values(config) - Proxy.begin_session(px, session_pids, backend_module, :init, [init_args]) - send(px, {:chorex, Enum.at(session_pids, 0), {:config, config}}) + Proxy.begin_session(px, session_token, backend_module, :init, [init_args]) + send(px, {:chorex, session_token, :meta, {:config, config}}) a when is_atom(a) -> - send(config[a], {:config, config}) + send(config[a], {:chorex, session_token, :meta, {:config, config}}) end end end @@ -576,13 +584,12 @@ defmodule Chorex do defmodule unquote(actor) do unquote_splicing(callbacks) - import unquote(Chorex.Proxy), only: [send_proxied: 2] # impl is the name of a module implementing this behavior # args whatever was passed as the third arg to Chorex.start def init(impl, args) do receive do - {:config, config} -> + {:chorex, session_token, :meta, {:config, config}} -> ret = apply(__MODULE__, :run, [impl, config | args]) send(config[:super], {:chorex_return, unquote(actor), ret}) end @@ -685,11 +692,15 @@ defmodule Chorex do {^label, _} -> # check: is this a singleton I'm talking to? - send_func = if Enum.member?(ctx.singletons, actor2), do: :send_proxied, else: :send return( quote do - unquote(send_func)(config[unquote(actor2)], unquote(sender_exp)) + tok = config[:session_token] + + send( + config[unquote(actor2)], + {:chorex, tok, unquote(actor1), unquote(actor2), unquote(sender_exp)} + ) end ) @@ -699,9 +710,11 @@ defmodule Chorex do # nil when I'm expanding the real thing. return( quote do + tok = config[:session_token] + unquote(recver_exp) = receive do - msg -> msg + {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> msg end end ) @@ -901,13 +914,13 @@ defmodule Chorex do case {sender, dest} do {^label, _} -> - send_func = if Enum.member?(ctx.singletons, dest), do: :send_proxied, else: :send - return( quote do - unquote(send_func)( + tok = config[:session_token] + + send( config[unquote(dest)], - {:choice, unquote(sender), unquote(choice)} + {:choice, tok, unquote(sender), unquote(dest), unquote(choice)} ) unquote(cont_) @@ -917,8 +930,10 @@ defmodule Chorex do {_, ^label} -> return( quote do + tok = config[:session_token] + receive do - {:choice, unquote(sender), unquote(choice)} -> + {:choice, ^tok, unquote(sender), unquote(dest), unquote(choice)} -> unquote(cont_) end end @@ -1269,37 +1284,41 @@ defmodule Chorex do end def merge_step( - {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, L]}], l_branch]}]]]}, - {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, R]}], r_branch]}]]]} - ) do + {:__block__, _, [{:=, _, [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = tok_get, {:receive, _, _} = lhs_rcv]}, + {:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}) do quote do - receive do - {:choice, unquote(agent), L} -> unquote(l_branch) - {:choice, unquote(agent), R} -> unquote(r_branch) - end + unquote(tok_get) + unquote(merge_step(lhs_rcv, rhs_rcv)) end end - # flip order of branches def merge_step( - {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, R]}], r_branch]}]]]}, - {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, L]}], l_branch]}]]]} + {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, L]}], l_branch]}]]]}, + {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, R]}], r_branch]}]]]} ) do quote do receive do - {:choice, unquote(agent), L} -> unquote(l_branch) - {:choice, unquote(agent), R} -> unquote(r_branch) + {:choice, unquote(tok), unquote(agent), unquote(dest), L} -> unquote(l_branch) + {:choice, unquote(tok), unquote(agent), unquote(dest), R} -> unquote(r_branch) end end end + # flip order of branches + def merge_step( + {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, _, _, _, R]}], _]}]]]} = rhs, + {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, _, _, _, L]}], _]}]]]} = lhs + ) do + merge_step(lhs, rhs) + end + # merge same branch def merge_step( - {:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, agent, dir]}], branch1]}]]]}, - {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, agent, dir]}], branch2]}]]]} + {:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]}, + {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, dir]}], branch2]}]]]} ) do {:receive, m1, - [[do: [{:->, m2, [[{:{}, m3, [:choice, agent, dir]}], merge(branch1, branch2)]}]]]} + [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], merge(branch1, branch2)]}]]]} end def merge_step(x, y) do diff --git a/lib/chorex/proxy.ex b/lib/chorex/proxy.ex index 4f30769..4a0b685 100644 --- a/lib/chorex/proxy.ex +++ b/lib/chorex/proxy.ex @@ -8,30 +8,26 @@ defmodule Chorex.Proxy do @type session_key :: term() @type session_state :: any() @type state :: %{ - pid_session: %{pid() => session_key()}, # This is the shared state session_data: any(), session_handler: %{session_key() => pid()} } def init(init_state) do - {:ok, %{pid_session: %{}, session_data: init_state, session_handler: %{}}} + {:ok, %{session_data: init_state, session_handler: %{}}} end def handle_call( - {:begin_session, pids, backend, backend_func, backend_args}, + {:begin_session, session_token, backend, backend_func, backend_args}, _caller, state ) do - # could replace with a UUID - session_key = :erlang.monotonic_time() + + # Create a backend handler for this session and remember {child, _child_ref} = spawn_monitor(backend, backend_func, backend_args) + new_state = put_in(state, [:session_handler, session_token], child) - pids - |> Enum.reduce(%{}, fn p, acc -> Map.put(acc, p, session_key) end) - |> then(&Map.update!(state, :pid_session, fn old -> Map.merge(old, &1) end)) - |> put_in([:session_handler, session_key], child) - |> then(&{:reply, :ok, &1}) + {:reply, :ok, new_state} end def handle_call({:set_state, new_state}, _caller, state) do @@ -53,16 +49,16 @@ defmodule Chorex.Proxy do end # Inject key :proxy into config for all proxied modules - def handle_info({:chorex, sender, {:config, config}}, state) when is_pid(sender) do - with {:ok, _key, session_handler} <- fetch_session(state, sender) do + def handle_info({:chorex, session_key, {:config, config}}, state) do + with {:ok, session_handler} <- fetch_session(state, session_key) do send(session_handler, {:config, Map.put(config, :proxy, self())}) end {:noreply, state} end - def handle_info({:chorex, sender, msg}, state) when is_pid(sender) do - with {:ok, _key, session_handler} <- fetch_session(state, sender) do + def handle_info({:chorex, session_key, msg}, state) do + with {:ok, session_handler} <- fetch_session(state, session_key) do # Forward to proxy send(session_handler, msg) end @@ -74,11 +70,10 @@ defmodule Chorex.Proxy do def handle_info({:DOWN, _, _, _, _}, state), do: {:noreply, state} # Fetch all session data for the associated PID - @spec fetch_session(state(), pid()) :: {:ok, session_key(), pid()} | :error - defp fetch_session(state, pid) do - with {:ok, session_key} <- Map.fetch(state[:pid_session], pid), - {:ok, handler} <- Map.fetch(state[:session_handler], session_key) do - {:ok, session_key, handler} + @spec fetch_session(state(), binary) :: {:ok, pid()} | :error + defp fetch_session(state, session_key) do + with {:ok, handler} <- Map.fetch(state[:session_handler], session_key) do + {:ok, handler} end end @@ -90,10 +85,10 @@ defmodule Chorex.Proxy do GenServer.call(proxy, {:set_state, new_state}) end - def begin_session(proxy, session_pids, proxy_module, start_func, start_args) do + def begin_session(proxy, session_token, proxy_module, start_func, start_args) do GenServer.call( proxy, - {:begin_session, session_pids, proxy_module, start_func, start_args} + {:begin_session, session_token, proxy_module, start_func, start_args} ) end @@ -110,15 +105,4 @@ defmodule Chorex.Proxy do def fetch_state(config) do GenServer.call(config[:proxy], :fetch_state) end - - @doc """ - Send a message to a proxied service. - - Handles the wrapping of the message with the `{:chorex, self(), ...}` - tuple so that the proxy knows which session to send the message on to. - """ - @spec send_proxied(pid(), any()) :: any() - def send_proxied(proxy_pid, msg) do - send(proxy_pid, {:chorex, self(), msg}) - end end diff --git a/mix.exs b/mix.exs index 3370569..3345080 100644 --- a/mix.exs +++ b/mix.exs @@ -25,7 +25,8 @@ defmodule Chorex.MixProject do defp deps do [ - {:ex_doc, "~> 0.31", only: :dev, runtime: false} + {:ex_doc, "~> 0.31", only: :dev, runtime: false}, + {:elixir_uuid, "~> 1.2"} ] end diff --git a/mix.lock b/mix.lock index ae885bb..80df3a4 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,6 @@ %{ "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, diff --git a/test/chorex_test.exs b/test/chorex_test.exs index 1431076..958e5cb 100644 --- a/test/chorex_test.exs +++ b/test/chorex_test.exs @@ -8,6 +8,7 @@ defmodule ChorexTest do def run() do Buyer.get_book_title() ~> Seller.(b) Seller.get_price("book:" <> b) ~> Buyer.(p) + Seller.(:done) Buyer.(p + 2) end end @@ -36,12 +37,13 @@ defmodule ChorexTest do ps = spawn(MySeller, :init, [[]]) pb = spawn(MyBuyer, :init, [[]]) - config = %{Seller => ps, Buyer => pb, :super => self()} + tok = UUID.uuid4() + config = %{Seller => ps, Buyer => pb, :session_token => tok, :super => self()} - send(ps, {:config, config}) - send(pb, {:config, config}) + send(ps, {:chorex, tok, :meta, {:config, config}}) + send(pb, {:chorex, tok, :meta, {:config, config}}) - assert_receive {:chorex_return, Seller, 40} + assert_receive {:chorex_return, Seller, :done} assert_receive {:chorex_return, Buyer, 42} end @@ -104,11 +106,19 @@ defmodule ChorexTest do pb1 = spawn(MyBuyer1, :init, [[]]) pb2 = spawn(MyBuyer2, :init, [[]]) - config = %{Seller1 => ps1, Buyer1 => pb1, Buyer2 => pb2, :super => self()} + tok = UUID.uuid4() - send(ps1, {:config, config}) - send(pb1, {:config, config}) - send(pb2, {:config, config}) + config = %{ + Seller1 => ps1, + Buyer1 => pb1, + Buyer2 => pb2, + :session_token => tok, + :super => self() + } + + send(ps1, {:chorex, tok, :meta, {:config, config}}) + send(pb1, {:chorex, tok, :meta, {:config, config}}) + send(pb2, {:chorex, tok, :meta, {:config, config}}) assert_receive {:chorex_return, Buyer1, ~D[2024-05-13]} end From 79b1829199b747a2b4c4f0b984fce6d3a8b2c1a1 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Tue, 29 Oct 2024 17:09:50 -0600 Subject: [PATCH 11/19] Get some of the proxy tests working --- lib/chorex/proxy.ex | 9 +- test/chorex/proxy_test.exs | 266 +++++++++++++++++++------------------ 2 files changed, 140 insertions(+), 135 deletions(-) diff --git a/lib/chorex/proxy.ex b/lib/chorex/proxy.ex index 4a0b685..f575c73 100644 --- a/lib/chorex/proxy.ex +++ b/lib/chorex/proxy.ex @@ -49,17 +49,18 @@ defmodule Chorex.Proxy do end # Inject key :proxy into config for all proxied modules - def handle_info({:chorex, session_key, {:config, config}}, state) do + def handle_info({:chorex, session_key, :meta, {:config, config}}, state) do with {:ok, session_handler} <- fetch_session(state, session_key) do - send(session_handler, {:config, Map.put(config, :proxy, self())}) + send(session_handler, {:chorex, session_key, :meta, {:config, Map.put(config, :proxy, self())}}) end {:noreply, state} end - def handle_info({:chorex, session_key, msg}, state) do + # Normal messages going to the proxy + def handle_info({:chorex, session_key, _sender, _receiver, _msg} = msg, state) do with {:ok, session_handler} <- fetch_session(state, session_key) do - # Forward to proxy + # Forward to handler send(session_handler, msg) end diff --git a/test/chorex/proxy_test.exs b/test/chorex/proxy_test.exs index 5e46003..0c47d43 100644 --- a/test/chorex/proxy_test.exs +++ b/test/chorex/proxy_test.exs @@ -8,75 +8,79 @@ defmodule Chorex.ProxyTest do # driver of the test driver_pid = receive do - m -> m + {:chorex, _tok, :test, Worker, pid} -> pid end send(driver_pid, :worker_here) m2 = receive do - m -> m + {:chorex, _tok, :test, Worker, m} -> m end send(driver_pid, {:got, m2}) end - def test_actor_comm(:start) do + def test_actor_comm(tok) do receive do - {:config, config} -> - test_actor_comm(config) + {:chorex, ^tok, :meta, {:config, config}} -> + test_actor_comm_go(config) end end - def test_actor_comm(config) do + def test_actor_comm_go(config) do + tok = config.session_token send(config[:super], {:chorex, Worker, config}) - send(config[Chorex.ProxyTest.Actor], {:from_worker, config[:proxy]}) + send(config[Chorex.ProxyTest.Actor], {:chorex, tok, Worker, Chorex.ProxyTest.Actor, {:from_worker, config[:proxy]}}) receive do - {"hello there", actor_pid} -> + {:chorex, ^tok, Chorex.ProxyTest.Actor, Worker, {"hello there", actor_pid}} -> send(config[:super], {:chorex, Worker, {:found_actor, actor_pid}}) end end end defmodule Actor do - def test_actor_comm(:start) do + def test_actor_comm(tok) do receive do - {:config, config} -> - test_actor_comm(config) + {:chorex, ^tok, :meta, {:config, config}} -> + test_actor_comm_go(config) end end - def test_actor_comm(config) do + def test_actor_comm_go(config) do + tok = config.session_token send(config[:super], {:chorex, Actor, config}) receive do - {:from_worker, the_proxy_pid} -> + {:chorex, ^tok, Chorex.ProxyTest.Worker, Actor, {:from_worker, the_proxy_pid}} -> send(config[:super], {:chorex, Actor, {:got_worker_proxy, the_proxy_pid}}) end - send_proxied(config[Chorex.ProxyTest.Worker], {"hello there", self()}) + send(config[Chorex.ProxyTest.Worker], {:chorex, tok, Actor, Worker, {"hello there", self()}}) end end test "proxy forwards messages" do {:ok, proxy} = GenServer.start(Chorex.Proxy, []) assert is_pid(proxy) - begin_session(proxy, [self()], Worker, :test_pingpong, []) - send_proxied(proxy, self()) + tok = UUID.uuid4() + begin_session(proxy, tok, Worker, :test_pingpong, []) + send(proxy, {:chorex, tok, :test, Worker, self()}) assert_receive :worker_here i = :rand.uniform(1_000_000) - send_proxied(proxy, i) + send(proxy, {:chorex, tok, :test, Worker, i}) assert_receive {:got, ^i} end test "proxy injects self into config" do {:ok, proxy} = GenServer.start(Chorex.Proxy, []) - a1 = spawn(Actor, :test_actor_comm, [:start]) - begin_session(proxy, [a1, self()], Worker, :test_actor_comm, [:start]) - config = %{Actor => a1, Worker => proxy, :super => self()} - send(a1, {:config, config}) - send_proxied(proxy, {:config, config}) + tok = UUID.uuid4() + a1 = spawn(Actor, :test_actor_comm, [tok]) + begin_session(proxy, tok, Worker, :test_actor_comm, [tok]) + config = %{Actor => a1, Worker => proxy, :super => self(), :session_token => tok} + send(a1, {:chorex, tok, :meta, {:config, config}}) + send(proxy, {:chorex, tok, :meta, {:config, config}}) assert_receive {:chorex, Actor, actor_config} refute Map.has_key?(actor_config, :proxy) @@ -85,113 +89,113 @@ defmodule Chorex.ProxyTest do assert_receive {:chorex, Worker, {:found_actor, ^a1}} end - test "sessions kept separate" do - {:ok, proxy} = GenServer.start(Chorex.Proxy, []) - a1 = spawn(Actor, :test_actor_comm, [:start]) - begin_session(proxy, [a1, self()], Worker, :test_actor_comm, [:start]) - config1 = %{Actor => a1, Worker => proxy, :super => self()} - send(a1, {:config, config1}) - send_proxied(proxy, {:config, config1}) - - assert_receive {:chorex, Actor, actor_config} - refute Map.has_key?(actor_config, :proxy) - assert_receive {:chorex, Worker, %{:proxy => ^proxy}} - assert_receive {:chorex, Actor, {:got_worker_proxy, ^proxy}} - assert_receive {:chorex, Worker, {:found_actor, ^a1}} - - a2 = spawn(Actor, :test_actor_comm, [:start]) - begin_session(proxy, [a2], Worker, :test_actor_comm, [:start]) - config2 = %{Actor => a2, Worker => proxy, :super => self()} - send(a2, {:config, config2}) - send(proxy, {:chorex, a2, {:config, config2}}) - - assert_receive {:chorex, Actor, actor_config} - refute Map.has_key?(actor_config, :proxy) - assert_receive {:chorex, Worker, worker_config} - assert %{proxy: ^proxy} = worker_config - # make sure we have the right actor here - assert %{Actor => ^a2} = worker_config - assert_receive {:chorex, Actor, {:got_worker_proxy, ^proxy}} - assert_receive {:chorex, Worker, {:found_actor, ^a2}} - end - - defmodule StateWorker do - def test_state(:start) do - receive do - {:config, config} -> test_state(config) - end - end - - def test_state(config) do - receive do - :incr -> - update_state(config, fn x -> {x + 1, x + 1} end) - test_state(config) - - :fetch -> - send(config[Chorex.ProxyTest.StateClient], {:final_count, fetch_state(config)}) - end - end - end - - defmodule StateClient do - def test_state(:start) do - receive do - {:config, config} -> test_state(config) - end - end - - def test_state(config) do - bump_times = - receive do - {:bump, n} -> n - end - - for _i <- 1..bump_times do - send_proxied(config[Chorex.ProxyTest.StateWorker], :incr) - end - - send_proxied(config[Chorex.ProxyTest.StateWorker], :fetch) - - receive do - {:final_count, c} -> - send(config[:super], {:got_count, c}) - end - end - end - - test "state shared" do - {:ok, proxy} = GenServer.start(Chorex.Proxy, 0) - - # First session - a1 = spawn(StateClient, :test_state, [:start]) - begin_session(proxy, [a1], StateWorker, :test_state, [:start]) - config1 = %{StateWorker => proxy, StateClient => a1, :super => self()} - send(a1, {:config, config1}) - send(proxy, {:chorex, a1, {:config, config1}}) - - # Second session - a2 = spawn(StateClient, :test_state, [:start]) - begin_session(proxy, [a2], StateWorker, :test_state, [:start]) - config2 = %{StateWorker => proxy, StateClient => a2, :super => self()} - send(a2, {:config, config2}) - send(proxy, {:chorex, a2, {:config, config2}}) - - send(a2, {:bump, 21}) - Process.sleep(1) - send(a1, {:bump, 21}) - - final1 = - receive do - {:got_count, n} -> n - end - - final2 = - receive do - {:got_count, n} -> n - end - - # WARNING: this is a little brittle but it's working - assert {21, 42} = {final1, final2} - end + # test "sessions kept separate" do + # {:ok, proxy} = GenServer.start(Chorex.Proxy, []) + # a1 = spawn(Actor, :test_actor_comm, [:start]) + # begin_session(proxy, [a1, self()], Worker, :test_actor_comm, [:start]) + # config1 = %{Actor => a1, Worker => proxy, :super => self()} + # send(a1, {:config, config1}) + # send_proxied(proxy, {:config, config1}) + + # assert_receive {:chorex, Actor, actor_config} + # refute Map.has_key?(actor_config, :proxy) + # assert_receive {:chorex, Worker, %{:proxy => ^proxy}} + # assert_receive {:chorex, Actor, {:got_worker_proxy, ^proxy}} + # assert_receive {:chorex, Worker, {:found_actor, ^a1}} + + # a2 = spawn(Actor, :test_actor_comm, [:start]) + # begin_session(proxy, [a2], Worker, :test_actor_comm, [:start]) + # config2 = %{Actor => a2, Worker => proxy, :super => self()} + # send(a2, {:config, config2}) + # send(proxy, {:chorex, a2, {:config, config2}}) + + # assert_receive {:chorex, Actor, actor_config} + # refute Map.has_key?(actor_config, :proxy) + # assert_receive {:chorex, Worker, worker_config} + # assert %{proxy: ^proxy} = worker_config + # # make sure we have the right actor here + # assert %{Actor => ^a2} = worker_config + # assert_receive {:chorex, Actor, {:got_worker_proxy, ^proxy}} + # assert_receive {:chorex, Worker, {:found_actor, ^a2}} + # end + + # defmodule StateWorker do + # def test_state(:start) do + # receive do + # {:config, config} -> test_state(config) + # end + # end + + # def test_state(config) do + # receive do + # :incr -> + # update_state(config, fn x -> {x + 1, x + 1} end) + # test_state(config) + + # :fetch -> + # send(config[Chorex.ProxyTest.StateClient], {:final_count, fetch_state(config)}) + # end + # end + # end + + # defmodule StateClient do + # def test_state(:start) do + # receive do + # {:config, config} -> test_state(config) + # end + # end + + # def test_state(config) do + # bump_times = + # receive do + # {:bump, n} -> n + # end + + # for _i <- 1..bump_times do + # send_proxied(config[Chorex.ProxyTest.StateWorker], :incr) + # end + + # send_proxied(config[Chorex.ProxyTest.StateWorker], :fetch) + + # receive do + # {:final_count, c} -> + # send(config[:super], {:got_count, c}) + # end + # end + # end + + # test "state shared" do + # {:ok, proxy} = GenServer.start(Chorex.Proxy, 0) + + # # First session + # a1 = spawn(StateClient, :test_state, [:start]) + # begin_session(proxy, [a1], StateWorker, :test_state, [:start]) + # config1 = %{StateWorker => proxy, StateClient => a1, :super => self()} + # send(a1, {:config, config1}) + # send(proxy, {:chorex, a1, {:config, config1}}) + + # # Second session + # a2 = spawn(StateClient, :test_state, [:start]) + # begin_session(proxy, [a2], StateWorker, :test_state, [:start]) + # config2 = %{StateWorker => proxy, StateClient => a2, :super => self()} + # send(a2, {:config, config2}) + # send(proxy, {:chorex, a2, {:config, config2}}) + + # send(a2, {:bump, 21}) + # Process.sleep(1) + # send(a1, {:bump, 21}) + + # final1 = + # receive do + # {:got_count, n} -> n + # end + + # final2 = + # receive do + # {:got_count, n} -> n + # end + + # # WARNING: this is a little brittle but it's working + # assert {21, 42} = {final1, final2} + # end end From a70782d23c0eb2b8e4c970b5d50b8f748e168d8b Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 17:30:03 -0600 Subject: [PATCH 12/19] Substantially more tests passing now --- lib/chorex.ex | 2 +- test/function_test.exs | 8 +------- test/higher_order_test.exs | 12 +++++++----- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index d7fc540..aa0beae 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -452,7 +452,7 @@ defmodule Chorex do {_k, {:remote, _, _, _} = r} -> [r] _ -> [] end) - |> Enum.into(MapSet) + |> Enum.into(MapSet.new()) remote_proxies = for {:remote, lport, rhost, rport} = proxy_desc <- remotes do diff --git a/test/function_test.exs b/test/function_test.exs index 6edf815..ec5af34 100644 --- a/test/function_test.exs +++ b/test/function_test.exs @@ -72,13 +72,7 @@ defmodule FunctionTest do end test "looping increment test" do - cs = spawn(MyCounterServer, :init, [[]]) - cc = spawn(MyCounterClient, :init, [[]]) - - config = %{CounterServer => cs, CounterClient => cc, :super => self()} - - send(cs, {:config, config}) - send(cc, {:config, config}) + Chorex.start(CounterTest.Chorex, %{CounterServer => MyCounterServer, CounterClient => MyCounterClient}, []) assert_receive {:chorex_return, CounterClient, 55} end diff --git a/test/higher_order_test.exs b/test/higher_order_test.exs index 88b7051..4663412 100644 --- a/test/higher_order_test.exs +++ b/test/higher_order_test.exs @@ -160,13 +160,15 @@ defmodule HigherOrderTest do end test "big hoc test" do - alice = spawn(MyAlice4, :init, [[false]]) - bob = spawn(MyBob4, :init, [[false]]) + # alice = spawn(MyAlice4, :init, [[false]]) + # bob = spawn(MyBob4, :init, [[false]]) - config = %{Alice => alice, Bob => bob, :super => self()} + # config = %{Alice => alice, Bob => bob, :super => self()} - send(alice, {:config, config}) - send(bob, {:config, config}) + # send(alice, {:config, config}) + # send(bob, {:config, config}) + + Chorex.start(TestChor4.Chorex, %{Alice => MyAlice4, Bob => MyBob4}, [false]) assert_receive {:chorex_return, Alice, [ From a0a9d017620feb31ed1f984033888c572ce8b85d Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 17:36:21 -0600 Subject: [PATCH 13/19] More tests --- lib/chorex/proxy.ex | 3 ++- test/higher_order_test.exs | 8 -------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/lib/chorex/proxy.ex b/lib/chorex/proxy.ex index f575c73..1418362 100644 --- a/lib/chorex/proxy.ex +++ b/lib/chorex/proxy.ex @@ -58,7 +58,8 @@ defmodule Chorex.Proxy do end # Normal messages going to the proxy - def handle_info({:chorex, session_key, _sender, _receiver, _msg} = msg, state) do + def handle_info({signal, session_key, _sender, _receiver, _msg} = msg, state) + when signal in [:chorex, :choice] do with {:ok, session_handler} <- fetch_session(state, session_key) do # Forward to handler send(session_handler, msg) diff --git a/test/higher_order_test.exs b/test/higher_order_test.exs index 4663412..47e2200 100644 --- a/test/higher_order_test.exs +++ b/test/higher_order_test.exs @@ -160,14 +160,6 @@ defmodule HigherOrderTest do end test "big hoc test" do - # alice = spawn(MyAlice4, :init, [[false]]) - # bob = spawn(MyBob4, :init, [[false]]) - - # config = %{Alice => alice, Bob => bob, :super => self()} - - # send(alice, {:config, config}) - # send(bob, {:config, config}) - Chorex.start(TestChor4.Chorex, %{Alice => MyAlice4, Bob => MyBob4}, [false]) assert_receive {:chorex_return, Alice, From fdcdfa11f8c5114fba33303e320d110fbee084c9 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 20:43:03 -0600 Subject: [PATCH 14/19] All old tests passing; working on socket tests --- test/chorex/socket_proxy_test.exs | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 6a2248d..c6b5bfe 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -4,26 +4,27 @@ defmodule Chorex.SocketProxyTest do defmodule BasicRemote do import Chorex - defchor [Alice, Bob] do - def run(Alice.(report), Bob.(report)) do - Alice.("hello") ~> Bob.(m1) - Alice.("there") ~> Bob.(m2) - Alice.("bob") ~> Bob.(m3) - Bob.([m1, m2, m3]) ~> Alice.(message) - Alice.(send(report, {:done, message})) - Bob.(send(report, {:done, "whatever"})) + defchor [SockAlice, SockBob] do + def run(SockAlice.(report), SockBob.(report)) do + SockAlice.("hello") ~> SockBob.(m1) + SockAlice.("there") ~> SockBob.(m2) + SockAlice.("bob") ~> SockBob.(m3) + SockBob.([m1, m2, m3]) ~> SockAlice.(message) + SockAlice.(send(report, {:done, message})) + SockBob.(send(report, {:done, "whatever"})) end end end - defmodule AliceImpl do - use BasicRemote.Chorex, :alice + defmodule SockAliceImpl do + use BasicRemote.Chorex, :sockalice end - defmodule BobImpl do - use BasicRemote.Chorex, :bob + defmodule SockBobImpl do + use BasicRemote.Chorex, :sockbob end + @tag :skip test "basic proxy works" do # Spin up two tasks to collect responses alice_receiver = Task.async(fn -> @@ -41,12 +42,12 @@ defmodule Chorex.SocketProxyTest do end) Chorex.start(BasicRemote.Chorex, - %{Alice => AliceImpl, - Bob => {:remote, 4242, "localhost", 4243}}, [alice_receiver, nil]) + %{SockAlice => SockAliceImpl, + SockBob => {:remote, 4242, "localhost", 4243}}, [alice_receiver, nil]) Chorex.start(BasicRemote.Chorex, - %{Alice => {:remote, 4243, "localhost", 4242}, - Bob => BobImpl}, [nil, bob_receiver]) + %{SockAlice => {:remote, 4243, "localhost", 4242}, + SockBob => SockBobImpl}, [nil, bob_receiver]) assert {:done, ["hello", "there", "bob"]} = Task.await(alice_receiver) assert {:done, "whatever"} = Task.await(bob_receiver) From 0914577eacb464b43094eac57e16dec32b966913 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 21:08:51 -0600 Subject: [PATCH 15/19] Draft sending logic for proxy --- lib/chorex/proxy.ex | 2 +- lib/chorex/socket_proxy.ex | 41 ++++++++++++++++++++++++++----- test/chorex/socket_proxy_test.exs | 1 - 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/lib/chorex/proxy.ex b/lib/chorex/proxy.ex index 1418362..ca66035 100644 --- a/lib/chorex/proxy.ex +++ b/lib/chorex/proxy.ex @@ -71,7 +71,7 @@ defmodule Chorex.Proxy do # TEMPORARY FIX: Swallow DOWN messages def handle_info({:DOWN, _, _, _, _}, state), do: {:noreply, state} - # Fetch all session data for the associated PID + # Fetch all session data for the associated session key @spec fetch_session(state(), binary) :: {:ok, pid()} | :error defp fetch_session(state, session_key) do with {:ok, handler} <- Map.fetch(state[:session_handler], session_key) do diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index d1176ef..3cf1ba6 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -15,7 +15,9 @@ defmodule Chorex.SocketProxy do out_socket: nil | :inet.socket(), out_queue: :queue.queue(), in_listener: pid(), - config: config_map() + net_config: config_map(), + session_key: binary(), + config: map() } @spec init(config_map()) :: {:ok, state()} @@ -24,13 +26,26 @@ defmodule Chorex.SocketProxy do GenServer.start_link(Chorex.SocketListener, %{listen_port: lport, notify: self()}) send(self(), :try_connect) - {:ok, %{out_socket: nil, out_queue: :queue.new(), in_listener: in_listener, config: config}} + + {:ok, + %{ + out_socket: nil, + out_queue: :queue.new(), + in_listener: in_listener, + net_config: config, + session_key: nil, + config: nil + }} end def handle_info(:try_connect, %{out_socket: nil} = state) do - host = if is_binary(state.config.remote_host), do: String.to_charlist(state.config.remote_host), else: state.config.remote_host + host = + if is_binary(state.net_config.remote_host), + do: String.to_charlist(state.net_config.remote_host), + else: state.net_config.remote_host + # 500 = timeout in milliseconds - case :gen_tcp.connect(host, state.config.remote_port, [], 500) do + case :gen_tcp.connect(host, state.net_config.remote_port, [], 500) do {:ok, socket} -> schedule_send() {:noreply, %{state | out_socket: socket}} @@ -51,8 +66,22 @@ defmodule Chorex.SocketProxy do end end - def handle_cast({:tcp_recv, msg}, state) do - # IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") + def handle_info({:chorex, session_key, :meta, {:config, config}}, state) do + # This message doesn't get forwarded + {:noreply, %{state | config: config, session_key: session_key}} + end + + def handle_info({signal, _session_key, _sender, _receiver, _msg} = msg, state) + when signal in [:chorex, :choice] do + bytes = :erlang.term_to_binary(msg) + schedule_send() + {:noreply, %{state | out_queue: :queue.snoc(state.out_queue, bytes)}} + end + + def handle_cast({:tcp_recv, {signal, _session_key, _sender, receiver, _body} = msg}, state) + when signal in [:chorex, :choice] do + IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") + send(state.config[receiver], msg) {:noreply, state} end diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index c6b5bfe..2a9bf97 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -24,7 +24,6 @@ defmodule Chorex.SocketProxyTest do use BasicRemote.Chorex, :sockbob end - @tag :skip test "basic proxy works" do # Spin up two tasks to collect responses alice_receiver = Task.async(fn -> From 8980a553d7bcd8bc1a16dd0ea5b8214ddc0059fb Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 21:19:50 -0600 Subject: [PATCH 16/19] Debugging socket proxy test I need to do some session token translation in the proxies; each node will have its own session token; I should do a little bit of signaling that overwrites the session tokens soundly. --- lib/chorex.ex | 4 ++++ lib/chorex/socket_proxy.ex | 6 ++++-- test/chorex/socket_proxy_test.exs | 8 ++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index aa0beae..937eb91 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -715,6 +715,10 @@ defmodule Chorex do unquote(recver_exp) = receive do {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> msg + m -> + IO.inspect(m, label: "#{inspect self()} got unexpected message") + IO.inspect(tok, label: "tok") + 42 end end ) diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index 3cf1ba6..e792a2a 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -47,6 +47,7 @@ defmodule Chorex.SocketProxy do # 500 = timeout in milliseconds case :gen_tcp.connect(host, state.net_config.remote_port, [], 500) do {:ok, socket} -> + IO.inspect(self(), label: "connected to remote #{host}:#{state.net_config.remote_port}; I am PID: ") schedule_send() {:noreply, %{state | out_socket: socket}} @@ -73,6 +74,7 @@ defmodule Chorex.SocketProxy do def handle_info({signal, _session_key, _sender, _receiver, _msg} = msg, state) when signal in [:chorex, :choice] do + IO.inspect(msg, label: "#{inspect self()} [SocketProxy] sending msg") bytes = :erlang.term_to_binary(msg) schedule_send() {:noreply, %{state | out_queue: :queue.snoc(state.out_queue, bytes)}} @@ -81,7 +83,7 @@ defmodule Chorex.SocketProxy do def handle_cast({:tcp_recv, {signal, _session_key, _sender, receiver, _body} = msg}, state) when signal in [:chorex, :choice] do IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") - send(state.config[receiver], msg) + send(IO.inspect(state.config[receiver], label: "receiver"), msg) {:noreply, state} end @@ -110,7 +112,7 @@ defmodule Chorex.SocketProxy do {{:value, m}, new_queue} -> # IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} trying to send packet") with :ok <- :gen_tcp.send(socket, m) do - # IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} sent packet") + IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} sent packet") send_until_empty(%{state | out_queue: new_queue}) else {:error, e} -> diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 2a9bf97..93f4591 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -7,9 +7,17 @@ defmodule Chorex.SocketProxyTest do defchor [SockAlice, SockBob] do def run(SockAlice.(report), SockBob.(report)) do SockAlice.("hello") ~> SockBob.(m1) + SockAlice.(IO.puts("here1")) + SockBob.(IO.puts("here1b")) SockAlice.("there") ~> SockBob.(m2) + SockAlice.(IO.puts("here2")) + SockBob.(IO.puts("here2b")) SockAlice.("bob") ~> SockBob.(m3) + SockAlice.(IO.puts("here3")) + SockBob.(IO.puts("here3b")) SockBob.([m1, m2, m3]) ~> SockAlice.(message) + SockAlice.(IO.puts("here4")) + SockBob.(IO.puts("here4b")) SockAlice.(send(report, {:done, message})) SockBob.(send(report, {:done, "whatever"})) end From 2df64bc7ca7b6b29e48dbf4491ce26ed2412f262 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 30 Oct 2024 21:24:06 -0600 Subject: [PATCH 17/19] =?UTF-8?q?Socket=20proxy=20test=20works=20intermitt?= =?UTF-8?q?ently=E2=80=94some=20messages=20not=20received?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/chorex/socket_proxy.ex | 5 +++-- test/chorex/socket_proxy_test.exs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index e792a2a..ae6d43c 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -80,10 +80,11 @@ defmodule Chorex.SocketProxy do {:noreply, %{state | out_queue: :queue.snoc(state.out_queue, bytes)}} end - def handle_cast({:tcp_recv, {signal, _session_key, _sender, receiver, _body} = msg}, state) + def handle_cast({:tcp_recv, {signal, _session_key, sender, receiver, body} = msg}, state) when signal in [:chorex, :choice] do IO.inspect(msg, label: "#{inspect self()} [SocketProxy] msg received") - send(IO.inspect(state.config[receiver], label: "receiver"), msg) + # FIXME: do a careful mapping between session keys + send(state.config[receiver], {signal, state.session_key, sender, receiver, body}) {:noreply, state} end diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 93f4591..00a22ca 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -50,11 +50,11 @@ defmodule Chorex.SocketProxyTest do Chorex.start(BasicRemote.Chorex, %{SockAlice => SockAliceImpl, - SockBob => {:remote, 4242, "localhost", 4243}}, [alice_receiver, nil]) + SockBob => {:remote, 4242, "localhost", 4243}}, [alice_receiver.pid, nil]) Chorex.start(BasicRemote.Chorex, %{SockAlice => {:remote, 4243, "localhost", 4242}, - SockBob => SockBobImpl}, [nil, bob_receiver]) + SockBob => SockBobImpl}, [nil, bob_receiver.pid]) assert {:done, ["hello", "there", "bob"]} = Task.await(alice_receiver) assert {:done, "whatever"} = Task.await(bob_receiver) From 281d6247156f55393fedec09ed3d52446d656fb7 Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Mon, 4 Nov 2024 12:45:51 -0700 Subject: [PATCH 18/19] Attempting to fix intermittent failures --- lib/chorex/socket_listener.ex | 9 +++++++++ lib/chorex/socket_proxy.ex | 2 +- test/chorex/socket_proxy_test.exs | 8 -------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/chorex/socket_listener.ex b/lib/chorex/socket_listener.ex index 31539eb..6fba09e 100644 --- a/lib/chorex/socket_listener.ex +++ b/lib/chorex/socket_listener.ex @@ -17,14 +17,23 @@ defmodule Chorex.SocketListener do term = data |> :erlang.list_to_binary() |> :erlang.binary_to_term() + IO.inspect(term, label: "[SocketListener #{inspect self()} → #{inspect state.notify}] got term") GenServer.cast(state.notify, {:tcp_recv, term}) {:noreply, state} end + def handle_info(m, state) do + IO.inspect(m, label: "unhandled message") + {:noreply, state} + end + def listen(port) do default_options = [ backlog: 1024, + active: true, nodelay: true, + # binary: true, + # packet: :line, send_timeout: 30_000, send_timeout_close: true, reuseaddr: true diff --git a/lib/chorex/socket_proxy.ex b/lib/chorex/socket_proxy.ex index ae6d43c..f51bb4c 100644 --- a/lib/chorex/socket_proxy.ex +++ b/lib/chorex/socket_proxy.ex @@ -112,7 +112,7 @@ defmodule Chorex.SocketProxy do case :queue.out(q) do {{:value, m}, new_queue} -> # IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} trying to send packet") - with :ok <- :gen_tcp.send(socket, m) do + with :ok <- :gen_tcp.send(socket, m <> "\n") do IO.inspect(:erlang.binary_to_term(m), label: "#{inspect self()} sent packet") send_until_empty(%{state | out_queue: new_queue}) else diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 00a22ca..576caec 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -7,17 +7,9 @@ defmodule Chorex.SocketProxyTest do defchor [SockAlice, SockBob] do def run(SockAlice.(report), SockBob.(report)) do SockAlice.("hello") ~> SockBob.(m1) - SockAlice.(IO.puts("here1")) - SockBob.(IO.puts("here1b")) SockAlice.("there") ~> SockBob.(m2) - SockAlice.(IO.puts("here2")) - SockBob.(IO.puts("here2b")) SockAlice.("bob") ~> SockBob.(m3) - SockAlice.(IO.puts("here3")) - SockBob.(IO.puts("here3b")) SockBob.([m1, m2, m3]) ~> SockAlice.(message) - SockAlice.(IO.puts("here4")) - SockBob.(IO.puts("here4b")) SockAlice.(send(report, {:done, message})) SockBob.(send(report, {:done, "whatever"})) end From 340f54904f812799e68eb20f2b0709064cf97b5b Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 6 Nov 2024 13:31:46 -0700 Subject: [PATCH 19/19] Skip flakey test, document TCP transport, elixir-format --- lib/chorex.ex | 88 +++++++++++++------------------ test/chorex/socket_proxy_test.exs | 1 + 2 files changed, 37 insertions(+), 52 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index 937eb91..06fe49c 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -330,62 +330,36 @@ defmodule Chorex do Note the 2-tuple: the first element is the module to be proxied, and the second element should be the PID of an already-running proxy. - ### Manually setting up the shared-state choreography (deprecated) + ### **Experimental** TCP transport setup - (*Note: we recommend using the `Chorex.start` mechanism now.*) + You can run choreographies over TCP. Instead of specifying the + implementing module's name in the actor ↦ module map, put a tuple + like `{:remote, local_port, remote_host, remote_port}`. A process + will begin listening on `local_port` and forward messages to the + proper actors on the current node. Messages going to a remote actor + will be buffered until a TCP connection is established, at which + point they'll be sent FIFO. - You need to be a little careful when setting up the shared state - choreography. Instead of setting up all the actors manually, you need - to set up *one* instance of each shared-state actor, then create - separate *sessions* for each instance of the choreography that you - want to run. + Example with hosts `alice.net` and `bob.net`: - Here is an example with two buyers trying to buy the same book: + Host `alice.net`: ```elixir - # Start up the buyers - b1 = spawn(MyBuyer, :init, [[]]) - b2 = spawn(MyBuyer, :init, [[]]) - - # Start up the seller proxy with the initial shared - # state (the stock of books in this case) - {:ok, px} = GenServer.start(Chorex.Proxy, %{"Anathem" => 1}) - - # Start sessions: one for each buyer - Proxy.begin_session(px, [b1], MySellerBackend, :init, []) - config1 = %{Buyer => b1, Seller => px, :super => self()} + Chorex.start(BasicRemote.Chorex, + %{SockAlice => SockAliceImpl, + SockBob => {:remote, 4242, "bob.net", 4243}}, []) + ``` - Proxy.begin_session(px, [b2], MySellerBackend, :init, []) - config2 = %{Buyer => b2, Seller => px, :super => self()} + Host `bob.net`: - # Send everyone their configuration - send(b1, {:config, config1}) - send(px, {:chorex, b1, {:config, config1}}) - send(b2, {:config, config2}) - send(px, {:chorex, b2, {:config, config2}}) + ```elixir + Chorex.start(BasicRemote.Chorex, + %{SockAlice => {:remote, 4243, "alice.net", 4242}, + SockBob => SockBobImpl}, []) ``` - The `Proxy.begin_sesion` function takes a proxy function, a list of - PIDs that partake in a given session, and a module, function, arglist - for the thing to proxy. - - **Sessions**: PIDs belonging to a session will have their messages - routed to the corresponding proxied process. The GenServer looks up - which session a PID belongs to, finds the proxied process linked to - that session, then forwards the message to that process. The exact - mechanisms of how this works may change in the future to accommodate - restarts. - - When you send the config information to a proxied process, you send it - through the proxy first, and you must wrap the message as shown above - with a process from the session you want to target as the second - element in the tuple; this just helps the proxy figure out the session - you want. - - That's it! If you run the above choreography, the process that kicks - this all off will get *one* message like `{:chorex_return, Buyer, :book_get}` - and *one* message like `{:chorex_return, Buyer, :darn_missed_it}`, - indicating that exactly one of the buyers got the coveted book. + **WARNING** this transport is *experimental* and not guaranteed to + work. We've had issues with message delivery during testing. PRs welcome! """ import WriterMonad @@ -714,9 +688,11 @@ defmodule Chorex do unquote(recver_exp) = receive do - {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> msg + {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> + msg + m -> - IO.inspect(m, label: "#{inspect self()} got unexpected message") + IO.inspect(m, label: "#{inspect(self())} got unexpected message") IO.inspect(tok, label: "tok") 42 end @@ -1288,8 +1264,15 @@ defmodule Chorex do end def merge_step( - {:__block__, _, [{:=, _, [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = tok_get, {:receive, _, _} = lhs_rcv]}, - {:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}) do + {:__block__, _, + [ + {:=, _, + [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = + tok_get, + {:receive, _, _} = lhs_rcv + ]}, + {:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]} + ) do quote do unquote(tok_get) unquote(merge_step(lhs_rcv, rhs_rcv)) @@ -1318,7 +1301,8 @@ defmodule Chorex do # merge same branch def merge_step( - {:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]}, + {:receive, m1, + [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]}, {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, dir]}], branch2]}]]]} ) do {:receive, m1, diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 576caec..4230344 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -24,6 +24,7 @@ defmodule Chorex.SocketProxyTest do use BasicRemote.Chorex, :sockbob end + @tag :skip test "basic proxy works" do # Spin up two tasks to collect responses alice_receiver = Task.async(fn ->