Skip to content

Commit

Permalink
refactor: use PubSub for Realtime.Server
Browse files Browse the repository at this point in the history
  • Loading branch information
lemald committed Oct 26, 2023
1 parent 2d9dd0c commit 5b7be58
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
41 changes: 19 additions & 22 deletions lib/realtime/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ defmodule Realtime.Server do
@spec default_name() :: GenServer.name()
def default_name(), do: Realtime.Server

@spec pubsub_name() :: Phoenix.PubSub.t()
def pubsub_name(), do: Realtime.PubSub

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(start_link_opts) do
GenServer.start_link(__MODULE__, [], start_link_opts)
Expand Down Expand Up @@ -210,17 +213,20 @@ defmodule Realtime.Server do
@spec subscribe(GenServer.server(), {:run_ids, [Run.id()]}) :: [VehicleOrGhost.t()]
@spec subscribe(GenServer.server(), {:block_ids, [Block.id()]}) :: [VehicleOrGhost.t()]
@spec subscribe(GenServer.server(), {:alerts, Route.id()}) :: [String.t()]
defp subscribe(server, {:alerts, _route_id} = subscription_key) do
{pubsub, ets} = GenServer.call(server, :subscription_info)
Phoenix.PubSub.subscribe(pubsub, "realtime_alerts")
lookup({ets, subscription_key})
end

defp subscribe(server, subscription_key) do
{registry_key, ets} = GenServer.call(server, :subscription_info)
Registry.register(Realtime.Registry, registry_key, subscription_key)
{pubsub, ets} = GenServer.call(server, :subscription_info)
Phoenix.PubSub.subscribe(pubsub, "realtime_vehicles")
lookup({ets, subscription_key})
end

defp update_subscription(server, {:limited_search, _search_params} = subscription_key) do
{registry_key, ets} = GenServer.call(server, :subscription_info)
# Replace the old search subscription with the new one
Registry.unregister_match(Realtime.Registry, registry_key, {:limited_search, %{}})
Registry.register(Realtime.Registry, registry_key, subscription_key)
{_pubsub, ets} = GenServer.call(server, :subscription_info)

lookup({ets, subscription_key})
end
Expand Down Expand Up @@ -401,8 +407,7 @@ defmodule Realtime.Server do

@impl true
def handle_call(:subscription_info, _from, %__MODULE__{} = state) do
registry_key = self()
{:reply, {registry_key, state.ets}, state}
{:reply, {pubsub_name(), state.ets}, state}
end

def handle_call(:ets, _from, %__MODULE__{ets: ets} = state) do
Expand Down Expand Up @@ -541,21 +546,13 @@ defmodule Realtime.Server do

@spec broadcast(t(), :vehicles | :alerts) :: :ok
defp broadcast(state, data_type) do
registry_key = self()

Registry.dispatch(Realtime.Supervisor.registry_name(), registry_key, fn entries ->
Enum.each(entries, fn {pid, subscripition_key} ->
if (data_type == :alerts and match?({:alerts, _}, subscripition_key)) or
(data_type == :vehicles and !match?({:alerts, _}, subscripition_key)) do
send_data(pid, state)
end
end)
end)
end
topic =
case data_type do
:vehicles -> "realtime_vehicles"
:alerts -> "realtime_alerts"
end

@spec send_data(pid, t) :: broadcast_message
defp send_data(pid, state) do
send(pid, {:new_realtime_data, state.ets})
Phoenix.PubSub.broadcast(pubsub_name(), topic, {:new_realtime_data, state.ets})
end

@spec block_is_active?(VehicleOrGhost.t()) :: boolean
Expand Down
1 change: 1 addition & 0 deletions lib/realtime/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Realtime.Supervisor do
children = [
{Registry, keys: :duplicate, name: registry_name()},
{Realtime.BlockWaiverStore, name: Realtime.BlockWaiverStore.default_name()},
{Phoenix.PubSub, name: Realtime.Server.pubsub_name()},
{Realtime.Server, name: Realtime.Server.default_name()},
{Realtime.TrainVehiclesPubSub, name: Realtime.TrainVehiclesPubSub.default_name()},
{Realtime.DataStatusPubSub, name: Realtime.DataStatusPubSub.default_name()},
Expand Down

0 comments on commit 5b7be58

Please sign in to comment.