Skip to content

Commit

Permalink
refactor: move lookup key to socket assign
Browse files Browse the repository at this point in the history
  • Loading branch information
lemald committed Oct 26, 2023
1 parent 1d752ba commit 2d9dd0c
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 175 deletions.
91 changes: 57 additions & 34 deletions lib/realtime/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,69 +93,92 @@ defmodule Realtime.Server do
```
Those `lookup_args` can be passed into `RealTime.Server.lookup(lookup_args)/1` to get the data.
"""
@spec subscribe_to_route(Route.id(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_route(Route.id(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_route(route_id, server \\ default_name()) do
subscribe(server, {:route_id, route_id})
subscription_key = {:route_id, route_id}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_all_shuttles(GenServer.server()) :: [Vehicle.t()]
@spec subscribe_to_all_shuttles(GenServer.server()) :: {subscription_key(), [Vehicle.t()]}
def subscribe_to_all_shuttles(server \\ default_name()) do
subscribe(server, :all_shuttles)
subscription_key = :all_shuttles
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_search(search_params(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_search(search_params(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_search(search_params, server \\ default_name()) do
subscribe(server, {:search, search_params})
subscription_key = {:search, search_params}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_limited_search(search_params(), GenServer.server()) ::
limited_search_result()
{subscription_key(), limited_search_result()}
def subscribe_to_limited_search(search_params, server \\ default_name()) do
subscribe(server, {:limited_search, search_params})
subscription_key = {:limited_search, search_params}
{subscription_key, subscribe(server, subscription_key)}
end

@spec update_limited_search_subscription(search_params(), GenServer.server()) ::
limited_search_result()
{subscription_key(), limited_search_result()}
def update_limited_search_subscription(search_params, server \\ default_name()) do
update_subscription(server, {:limited_search, search_params})
subscription_key = {:limited_search, search_params}
{subscription_key, update_subscription(server, subscription_key)}
end

@spec subscribe_to_vehicle(String.t(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_vehicle(String.t(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_vehicle(vehicle_id, server \\ default_name()) do
subscribe(
server,
{:vehicle, vehicle_id}
)
subscription_key = {:vehicle, vehicle_id}

{subscription_key,
subscribe(
server,
subscription_key
)}
end

@spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) :: [
VehicleOrGhost.t()
]
@spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) ::
{subscription_key(),
[
VehicleOrGhost.t()
]}
def subscribe_to_vehicle_with_logged_out(vehicle_id, server \\ default_name()) do
subscribe(
server,
{:vehicle_with_logged_out, vehicle_id}
)
subscription_key = {:vehicle_with_logged_out, vehicle_id}

{subscription_key,
subscribe(
server,
subscription_key
)}
end

@spec subscribe_to_run_ids([Run.id()], GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_run_ids([Run.id()], GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_run_ids(run_ids, server \\ default_name()) do
subscribe(server, {:run_ids, run_ids})
subscription_key = {:run_ids, run_ids}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_block_ids([Block.id()], GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_block_ids([Block.id()], GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_block_ids(block_ids, server \\ default_name()) do
subscribe(server, {:block_ids, block_ids})
subscription_key = {:block_ids, block_ids}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_all_pull_backs(GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_all_pull_backs(GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_all_pull_backs(server \\ default_name()) do
subscribe(server, :all_pull_backs)
subscription_key = :all_pull_backs
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_alerts(Route.id(), GenServer.server()) :: [String.t()]
@spec subscribe_to_alerts(Route.id(), GenServer.server()) :: {subscription_key(), [String.t()]}
def subscribe_to_alerts(route_id, server \\ default_name()) do
subscribe(server, {:alerts, route_id})
subscription_key = {:alerts, route_id}
{subscription_key, subscribe(server, subscription_key)}
end

def peek_at_vehicles_by_run_ids(run_ids, server \\ default_name()) do
Expand Down Expand Up @@ -524,15 +547,15 @@ defmodule Realtime.Server do
Enum.each(entries, fn {pid, subscripition_key} ->
if (data_type == :alerts and match?({:alerts, _}, subscripition_key)) or
(data_type == :vehicles and !match?({:alerts, _}, subscripition_key)) do
send_data({pid, subscripition_key}, state)
send_data(pid, state)
end
end)
end)
end

@spec send_data({pid, subscription_key}, t) :: broadcast_message
defp send_data({pid, subscription_key}, state) do
send(pid, {:new_realtime_data, {state.ets, subscription_key}})
@spec send_data(pid, t) :: broadcast_message
defp send_data(pid, state) do
send(pid, {:new_realtime_data, state.ets})
end

@spec block_is_active?(VehicleOrGhost.t()) :: boolean
Expand Down
13 changes: 9 additions & 4 deletions lib/skate_web/channels/alerts_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ defmodule SkateWeb.AlertsChannel do

@impl SkateWeb.AuthenticatedChannel
def join_authenticated("alerts:route:" <> route_id, _message, socket) do
alerts = Duration.log_duration(Server, :subscribe_to_alerts, [route_id])
{:ok, %{data: alerts}, socket}
{lookup_key, alerts} = Duration.log_duration(Server, :subscribe_to_alerts, [route_id])

{:ok, %{data: alerts},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do
data = Server.lookup(lookup_args)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = Map.get(socket.assigns, :lookup_key)

data = Server.lookup({ets, lookup_key})

:ok = push(socket, "alerts", %{data: data})
{:noreply, socket}
end
Expand Down
39 changes: 29 additions & 10 deletions lib/skate_web/channels/vehicle_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ defmodule SkateWeb.VehicleChannel do
alias Realtime.Server

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_params}, socket) do
vehicle_or_ghost = Realtime.Server.lookup(lookup_params)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = socket.assigns[:lookup_key]

vehicle_or_ghost = Realtime.Server.lookup({ets, lookup_key})

:ok = push(socket, "vehicle", %{data: List.first(vehicle_or_ghost)})

{:noreply, socket}
Expand All @@ -15,11 +18,17 @@ defmodule SkateWeb.VehicleChannel do
@impl SkateWeb.AuthenticatedChannel
def join_authenticated("vehicle:run_ids:" <> run_ids, _message, socket) do
run_ids = String.split(run_ids, ",")

vehicle_or_ghost = Realtime.Server.peek_at_vehicles_by_run_ids(run_ids) |> List.first()

if vehicle_or_ghost do
_ = Server.subscribe_to_vehicle(vehicle_or_ghost.id)
end
socket =
if vehicle_or_ghost do
{lookup_key, _vehicle_or_ghost} = Server.subscribe_to_vehicle(vehicle_or_ghost.id)

Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))
else
socket
end

{:ok, %{data: vehicle_or_ghost}, socket}
end
Expand All @@ -36,13 +45,23 @@ defmodule SkateWeb.VehicleChannel do
Realtime.Server.peek_at_vehicle_by_id(vehicle_or_ghost_id) |> List.first()
end

if vehicle_or_ghost do
if user_in_test_group? do
_ = Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id)
{lookup_key, _vehicle_or_ghost} =
if vehicle_or_ghost do
if user_in_test_group? do
Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id)
else
Server.subscribe_to_vehicle(vehicle_or_ghost.id)
end
else
{nil, nil}
end

socket =
if is_nil(lookup_key) do
socket
else
_ = Server.subscribe_to_vehicle(vehicle_or_ghost.id)
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))
end
end

{:ok, %{data: vehicle_or_ghost}, socket}
end
Expand Down
60 changes: 40 additions & 20 deletions lib/skate_web/channels/vehicles_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,45 @@ defmodule SkateWeb.VehiclesChannel do

@impl SkateWeb.AuthenticatedChannel
def join_authenticated("vehicles:shuttle:all", _message, socket) do
shuttles = Duration.log_duration(Server, :subscribe_to_all_shuttles, [])
{:ok, %{data: shuttles}, socket}
{lookup_key, shuttles} = Duration.log_duration(Server, :subscribe_to_all_shuttles, [])

{:ok, %{data: shuttles},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated("vehicles:pull_backs:all", _message, socket) do
pull_backs = Duration.log_duration(Server, :subscribe_to_all_pull_backs, [])
{:ok, %{data: pull_backs}, socket}
{lookup_key, pull_backs} = Duration.log_duration(Server, :subscribe_to_all_pull_backs, [])

{:ok, %{data: pull_backs},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated("vehicles:route:" <> route_id, _message, socket) do
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_route, [route_id])
{:ok, %{data: vehicles_and_ghosts}, socket}
{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_route, [route_id])

{:ok, %{data: vehicles_and_ghosts},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated("vehicles:run_ids:" <> run_ids, _message, socket) do
run_ids = String.split(run_ids, ",")
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids])
{:ok, %{data: vehicles_and_ghosts}, socket}

{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids])

{:ok, %{data: vehicles_and_ghosts},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated("vehicles:block_ids:" <> block_ids, _message, socket) do
block_ids = String.split(block_ids, ",")
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids])
{:ok, %{data: vehicles_and_ghosts}, socket}

{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids])

{:ok, %{data: vehicles_and_ghosts},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated(
Expand Down Expand Up @@ -76,26 +91,31 @@ defmodule SkateWeb.VehiclesChannel do
"User=#{username} searched for property=#{subscribe_args.property}, text=#{subscribe_args.text}"
end)

vehicles = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args])
{lookup_key, vehicles} = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args])

{:ok, %{data: vehicles}, socket}
{:ok, %{data: vehicles},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

def join_authenticated(topic, _message, _socket) do
{:error, %{message: "no such topic \"#{topic}\""}}
end

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do
event_name = event_name(lookup_args)
data = Server.lookup(lookup_args)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = socket.assigns[:lookup_key]

data = Server.lookup({ets, lookup_key})

event_name = event_name(lookup_key)
:ok = push(socket, event_name, %{data: data})

{:noreply, socket}
end

@spec event_name(Server.lookup_key()) :: String.t()
defp event_name({_ets, :all_shuttles}), do: "shuttles"
defp event_name({_ets, :all_pull_backs}), do: "pull_backs"
defp event_name({_ets, {:search, _}}), do: "search"
defp event_name({_ets, _}), do: "vehicles"
@spec event_name(Server.subscription_key()) :: String.t()
defp event_name(:all_shuttles), do: "shuttles"
defp event_name(:all_pull_backs), do: "pull_backs"
defp event_name({:search, _}), do: "search"
defp event_name(_), do: "vehicles"
end
24 changes: 15 additions & 9 deletions lib/skate_web/channels/vehicles_search_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ defmodule SkateWeb.VehiclesSearchChannel do
"#{__MODULE__} limited_search User=#{username} searched for property=#{subscribe_args.property}, text=#{subscribe_args.text}"
end)

result = Duration.log_duration(Server, :subscribe_to_limited_search, [subscribe_args])
{lookup_key, result} =
Duration.log_duration(Server, :subscribe_to_limited_search, [subscribe_args])

{:ok, %{data: result}, socket}
{:ok, %{data: result},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

@impl SkateWeb.AuthenticatedChannel
Expand All @@ -67,7 +69,7 @@ defmodule SkateWeb.VehiclesSearchChannel do

%{property: property, text: text} = search_params_from_subtopic(subtopic)

result =
{lookup_key, result} =
Duration.log_duration(Server, :update_limited_search_subscription, [
%{
property: property,
Expand All @@ -81,7 +83,8 @@ defmodule SkateWeb.VehiclesSearchChannel do
"#{__MODULE__} limited_search User=#{username} updated limit for property=#{property}limit=#{limit}"
end)

{:reply, {:ok, %{data: result}}, socket}
{:reply, {:ok, %{data: result}},
Phoenix.Socket.assign(socket, Map.merge(socket.assigns, %{lookup_key: lookup_key}))}
end

defp search_params_from_subtopic(subtopic) do
Expand All @@ -90,13 +93,16 @@ defmodule SkateWeb.VehiclesSearchChannel do
end

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do
event_name = event_name(lookup_args)
data = Server.lookup(lookup_args)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = socket.assigns[:lookup_key]

event_name = event_name(lookup_key)
data = Server.lookup({ets, lookup_key})

:ok = push(socket, event_name, %{data: data})
{:noreply, socket}
end

@spec event_name(Server.lookup_key()) :: String.t()
defp event_name({_ets, {:limited_search, _}}), do: "limited_search"
@spec event_name(Server.subscription_key()) :: String.t()
defp event_name({:limited_search, _}), do: "limited_search"
end
Loading

0 comments on commit 2d9dd0c

Please sign in to comment.