diff --git a/lib/basket/application.ex b/lib/basket/application.ex index 0ef12e3..1334947 100644 --- a/lib/basket/application.ex +++ b/lib/basket/application.ex @@ -18,10 +18,16 @@ defmodule Basket.Application do # {Basket.Worker, arg}, # Start to serve requests, typically the last entry BasketWeb.Endpoint, - Basket.Websocket.Alpaca, {Cachex, name: :assets} ] + children = + unless Mix.env() == :test do + children ++ [Basket.Websocket.Alpaca] + else + children + end + # See https://hexdocs.pm/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Basket.Supervisor] diff --git a/lib/basket/websocket/adapter.ex b/lib/basket/websocket/adapter.ex deleted file mode 100644 index 6cbce49..0000000 --- a/lib/basket/websocket/adapter.ex +++ /dev/null @@ -1,20 +0,0 @@ -defmodule Basket.Websocket.Adapter do - @moduledoc """ - Abstract wrapper around WebSockex. - """ - - @doc """ - Handles the messages sent by the Alpaca websocket server, responding if necessary. - Besides processing messages as they arrive, this function will also set up the initial - subscription once the authorization acknowledgement method is received. - """ - @callback on_msg(WebSockex.Frame.frame(), bitstring()) :: :ok - - defmacro __using__(_) do - quote location: :keep do - use WebSockex - - @behaviour Basket.Websocket.Adapter - end - end -end diff --git a/lib/basket/websocket/alpaca.ex b/lib/basket/websocket/alpaca.ex index 77f7d70..3523166 100644 --- a/lib/basket/websocket/alpaca.ex +++ b/lib/basket/websocket/alpaca.ex @@ -4,7 +4,7 @@ defmodule Basket.Websocket.Alpaca do Currently only supports the "bars" feed on the minute. """ - use Basket.Websocket.Adapter + use WebSockex require Logger @@ -17,66 +17,16 @@ defmodule Basket.Websocket.Alpaca do @auth_success ~s([{\"T\":\"success\",\"msg\":\"authenticated\"}]) @connection_success ~s([{\"T\":\"success\",\"msg\":\"connected\"}]) @bars_topic "bars" - @subscribe_message %{ - action: :subscribe - } - @unsubscribe_message %{ - action: :unsubscribe - } - def start_link(state) do - Logger.info("Starting Alpaca websocket client.") + @callback start_link(term()) :: {:ok, pid()} | {:error, term()} + @callback subscribe(subscription_fields()) :: :ok + @callback unsubscribe(subscription_fields()) :: :ok - WebSockex.start_link(iex_feed(), __MODULE__, state, extra_headers: auth_headers()) - end - - @impl true - def on_msg(messages, _state) do - Enum.each(messages, fn message -> - case Map.get(message, "T") do - "b" -> - handle_bars(message) - - "d" -> - handle_daily_bars(message) - - "u" -> - handle_bar_updates(message) - - "error" -> - Logger.error("Error message from Alpaca websocket connection: #{message}") - - "subscription" -> - Logger.info("Subscription message from Alpaca websocket connection: #{message}") - - _ -> - Logger.info("Unhandled websocket message: #{message}") - end - end) - end - - @spec subscribe(subscription_fields()) :: :ok - def subscribe(tickers) do - decoded_message = Jason.encode!(build_message(@subscribe_message, tickers)) - - case WebSockex.send_frame(client_pid(), {:text, decoded_message}) do - :ok -> Logger.debug("Subscription message sent: #{inspect(decoded_message)}") - {:error, error} -> Logger.error("Error sending subscription message: #{inspect(error)}") - end - end + def start_link(state), do: impl().start_link(state) + def subscribe(tickers), do: impl().subscribe(tickers) + def unsubscribe(tickers), do: impl().unsubscribe(tickers) - @spec unsubscribe(subscription_fields()) :: :ok - def unsubscribe(tickers) do - decoded_message = Jason.encode!(build_message(@unsubscribe_message, tickers)) - - case WebSockex.send_frame(client_pid(), {:text, decoded_message}) do - :ok -> - Logger.debug("Subscription removal message sent: #{inspect(decoded_message)}") - - {:error, error} -> - Logger.error("Error sending subscription removal message: #{inspect(error)}") - end - end + def bars_topic, do: @bars_topic @impl true def handle_connect(_conn, state) do @@ -111,40 +61,37 @@ defmodule Basket.Websocket.Alpaca do @impl true def handle_frame({_tpe, msg}, state) do - Jason.decode!(msg) - |> on_msg(state) + case Jason.decode(msg) do + {:ok, decoded_message} -> + Enum.each(decoded_message, fn message -> + case Map.get(message, "T") do + "b" -> + handle_bars(message) - {:ok, state} - end + "d" -> + handle_daily_bars(message) - defp auth_headers, do: [{"APCA-API-KEY-ID", api_key()}, {"APCA-API-SECRET-KEY", api_secret()}] + "u" -> + handle_bar_updates(message) - defp iex_feed, do: "#{url()}/iex" + "error" -> + Logger.error("Error message from Alpaca websocket connection: #{inspect(message)}") - defp url, do: Application.fetch_env!(:basket, :alpaca)[:market_ws_url] + "subscription" -> + Logger.info( + "Subscription message from Alpaca websocket connection: #{inspect(message)}" + ) - defp api_key, do: Application.fetch_env!(:basket, :alpaca)[:api_key] + _ -> + Logger.info("Unhandled websocket message: #{inspect(message)}") + end + end) - defp api_secret, do: Application.fetch_env!(:basket, :alpaca)[:api_secret] - - defp client_pid, - do: - Supervisor.which_children(Basket.Supervisor) - |> Enum.find(fn c -> - case c do - {Basket.Websocket.Alpaca, _pid, :worker, [Basket.Websocket.Alpaca]} -> - true - - _ -> - false - end - end) - |> elem(1) + {:error, error} -> + Logger.error("Error decoding websocket message: #{inspect(error)}") + end - defp build_message(message, %{bars: bars, quotes: quotes, trades: trades}) do - message = if bars, do: Map.put(message, :bars, bars), else: message - message = if quotes, do: Map.put(message, :quotes, quotes), else: message - if trades, do: Map.put(message, :trades, trades), else: message + {:ok, state} end defp handle_bars( @@ -169,4 +116,6 @@ defmodule Basket.Websocket.Alpaca do defp handle_bar_updates(_message) do Logger.debug("Bar updates message received") end + + defp impl, do: Application.get_env(:basket, :alpaca_ws_client, Basket.Websocket.Alpaca.Impl) end diff --git a/lib/basket/websocket/alpaca/impl.ex b/lib/basket/websocket/alpaca/impl.ex new file mode 100644 index 0000000..c749e9b --- /dev/null +++ b/lib/basket/websocket/alpaca/impl.ex @@ -0,0 +1,67 @@ +defmodule Basket.Websocket.Alpaca.Impl do + require Logger + + @subscribe_message %{ + action: :subscribe + } + @unsubscribe_message %{ + action: :unsubscribe + } + + def start_link(state) do + Logger.info("Starting Alpaca websocket client.") + + WebSockex.start_link(iex_feed(), Basket.Websocket.Alpaca, state, extra_headers: auth_headers()) + end + + def subscribe(tickers) do + decoded_message = Jason.encode!(build_message(@subscribe_message, tickers)) + + case WebSockex.send_frame(client_pid(), {:text, decoded_message}) do + :ok -> Logger.debug("Subscription message sent: #{inspect(decoded_message)}") + {:error, error} -> Logger.error("Error sending subscription message: #{inspect(error)}") + end + end + + def unsubscribe(tickers) do + decoded_message = Jason.encode!(build_message(@unsubscribe_message, tickers)) + + case WebSockex.send_frame(client_pid(), {:text, decoded_message}) do + :ok -> + Logger.debug("Subscription removal message sent: #{inspect(decoded_message)}") + + {:error, error} -> + Logger.error("Error sending subscription removal message: #{inspect(error)}") + end + end + + defp auth_headers, do: [{"APCA-API-KEY-ID", api_key()}, {"APCA-API-SECRET-KEY", api_secret()}] + + defp api_key, do: Application.fetch_env!(:basket, :alpaca)[:api_key] + + defp api_secret, do: Application.fetch_env!(:basket, :alpaca)[:api_secret] + + defp iex_feed, do: "#{url()}/iex" + + defp url, do: Application.fetch_env!(:basket, :alpaca)[:market_ws_url] + + defp client_pid do + Supervisor.which_children(Basket.Supervisor) + |> Enum.find(fn c -> + case c do + {Basket.Websocket.Alpaca, _pid, :worker, [Basket.Websocket.Alpaca]} -> + true + + _ -> + false + end + end) + |> elem(1) + end + + defp build_message(message, %{bars: bars, quotes: quotes, trades: trades}) do + message = if bars, do: Map.put(message, :bars, bars), else: message + message = if quotes, do: Map.put(message, :quotes, quotes), else: message + if trades, do: Map.put(message, :trades, trades), else: message + end +end diff --git a/lib/basket_web/live/overview.ex b/lib/basket_web/live/overview.ex index cbf0178..79796d7 100644 --- a/lib/basket_web/live/overview.ex +++ b/lib/basket_web/live/overview.ex @@ -9,13 +9,13 @@ defmodule BasketWeb.Overview do require Logger alias Basket.Alpaca.HttpClient - alias Basket.Websocket.{Alpaca, Message} + alias Basket.Websocket.Alpaca alias BasketWeb.Components.{NavRow, SearchInput} prop tickers, :list, default: [] def mount(_, _, socket) do - BasketWeb.Endpoint.subscribe(Message.bars_topic()) + BasketWeb.Endpoint.subscribe(Alpaca.bars_topic()) socket = assign(socket, tickers: []) socket = assign(socket, basket: []) diff --git a/test/basket/websocket/alpaca_test.exs b/test/basket/websocket/alpaca_test.exs new file mode 100644 index 0000000..e69de29 diff --git a/test/basket_web/components/card_test.exs b/test/basket_web/components/card_test.exs index 05e6662..c3ddd5f 100644 --- a/test/basket_web/components/card_test.exs +++ b/test/basket_web/components/card_test.exs @@ -2,5 +2,5 @@ defmodule BasketWeb.Components.CardTest do use BasketWeb.ConnCase, async: true use Surface.LiveViewTest - catalogue_test BasketWeb.Card + catalogue_test BasketWeb.Components.Card end diff --git a/test/basket_web/live/overview_test.exs b/test/basket_web/live/overview_test.exs new file mode 100644 index 0000000..e500c3b --- /dev/null +++ b/test/basket_web/live/overview_test.exs @@ -0,0 +1,11 @@ +defmodule BasketWeb.OverviewTest do + use BasketWeb.ConnCase + + import Mox + + test "mount/3" do + Basket.Websocket.MockAlpaca |> expect(:start_link, fn state -> {:ok, 1} end) + + Basket.Websocket.Alpaca.start_link("statee") |> IO.inspect(label: "RESULT") + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 7102505..6b4588f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,5 +1,5 @@ -Mox.defmock(Basket.Alpaca.Websocket.MockClient, for: Basket.Alpaca.Websocket.Client) -Application.put_env(:basket, :alpaca_websocket_client, Basket.Alpaca.Websocket.MockClient) +Mox.defmock(Basket.Websocket.MockAlpaca, for: Basket.Websocket.Alpaca) +Application.put_env(:basket, :alpaca_ws_client, Basket.Websocket.MockAlpaca) ExUnit.start() Ecto.Adapters.SQL.Sandbox.mode(Basket.Repo, :manual)