From 5b7be585ad96ab8127716c3c2114c0ea913dc1bd Mon Sep 17 00:00:00 2001 From: Eddie Maldonado Date: Thu, 26 Oct 2023 15:50:15 -0400 Subject: [PATCH] refactor: use PubSub for Realtime.Server --- lib/realtime/server.ex | 41 ++++++++++++++++++-------------------- lib/realtime/supervisor.ex | 1 + 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/lib/realtime/server.ex b/lib/realtime/server.ex index 443757ef9c..fa749faa26 100644 --- a/lib/realtime/server.ex +++ b/lib/realtime/server.ex @@ -81,6 +81,9 @@ defmodule Realtime.Server do @spec default_name() :: GenServer.name() def default_name(), do: Realtime.Server + @spec pubsub_name() :: Phoenix.PubSub.t() + def pubsub_name(), do: Realtime.PubSub + @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(start_link_opts) do GenServer.start_link(__MODULE__, [], start_link_opts) @@ -210,17 +213,20 @@ defmodule Realtime.Server do @spec subscribe(GenServer.server(), {:run_ids, [Run.id()]}) :: [VehicleOrGhost.t()] @spec subscribe(GenServer.server(), {:block_ids, [Block.id()]}) :: [VehicleOrGhost.t()] @spec subscribe(GenServer.server(), {:alerts, Route.id()}) :: [String.t()] + defp subscribe(server, {:alerts, _route_id} = subscription_key) do + {pubsub, ets} = GenServer.call(server, :subscription_info) + Phoenix.PubSub.subscribe(pubsub, "realtime_alerts") + lookup({ets, subscription_key}) + end + defp subscribe(server, subscription_key) do - {registry_key, ets} = GenServer.call(server, :subscription_info) - Registry.register(Realtime.Registry, registry_key, subscription_key) + {pubsub, ets} = GenServer.call(server, :subscription_info) + Phoenix.PubSub.subscribe(pubsub, "realtime_vehicles") lookup({ets, subscription_key}) end defp update_subscription(server, {:limited_search, _search_params} = subscription_key) do - {registry_key, ets} = GenServer.call(server, :subscription_info) - # Replace the old search subscription with the new one - Registry.unregister_match(Realtime.Registry, registry_key, {:limited_search, %{}}) - Registry.register(Realtime.Registry, registry_key, subscription_key) + {_pubsub, ets} = GenServer.call(server, :subscription_info) lookup({ets, subscription_key}) end @@ -401,8 +407,7 @@ defmodule Realtime.Server do @impl true def handle_call(:subscription_info, _from, %__MODULE__{} = state) do - registry_key = self() - {:reply, {registry_key, state.ets}, state} + {:reply, {pubsub_name(), state.ets}, state} end def handle_call(:ets, _from, %__MODULE__{ets: ets} = state) do @@ -541,21 +546,13 @@ defmodule Realtime.Server do @spec broadcast(t(), :vehicles | :alerts) :: :ok defp broadcast(state, data_type) do - registry_key = self() - - Registry.dispatch(Realtime.Supervisor.registry_name(), registry_key, fn entries -> - Enum.each(entries, fn {pid, subscripition_key} -> - if (data_type == :alerts and match?({:alerts, _}, subscripition_key)) or - (data_type == :vehicles and !match?({:alerts, _}, subscripition_key)) do - send_data(pid, state) - end - end) - end) - end + topic = + case data_type do + :vehicles -> "realtime_vehicles" + :alerts -> "realtime_alerts" + end - @spec send_data(pid, t) :: broadcast_message - defp send_data(pid, state) do - send(pid, {:new_realtime_data, state.ets}) + Phoenix.PubSub.broadcast(pubsub_name(), topic, {:new_realtime_data, state.ets}) end @spec block_is_active?(VehicleOrGhost.t()) :: boolean diff --git a/lib/realtime/supervisor.ex b/lib/realtime/supervisor.ex index 5da03874c1..646057ea71 100644 --- a/lib/realtime/supervisor.ex +++ b/lib/realtime/supervisor.ex @@ -13,6 +13,7 @@ defmodule Realtime.Supervisor do children = [ {Registry, keys: :duplicate, name: registry_name()}, {Realtime.BlockWaiverStore, name: Realtime.BlockWaiverStore.default_name()}, + {Phoenix.PubSub, name: Realtime.Server.pubsub_name()}, {Realtime.Server, name: Realtime.Server.default_name()}, {Realtime.TrainVehiclesPubSub, name: Realtime.TrainVehiclesPubSub.default_name()}, {Realtime.DataStatusPubSub, name: Realtime.DataStatusPubSub.default_name()},