Skip to content

Commit

Permalink
Flush caches by key (#1914)
Browse files Browse the repository at this point in the history
* flush cache by key

* use scan instead of keys command

* docs

* docs

* docs

* cms cache controller test and other test stubs

* only need two of these

* tests

* cleaner test

* correct cursor

* more communicative tests

* scan on each node

* cms cache entries
  • Loading branch information
anthonyshull authored Mar 14, 2024
1 parent a41217c commit 8e40c33
Show file tree
Hide file tree
Showing 20 changed files with 413 additions and 78 deletions.
8 changes: 4 additions & 4 deletions .envrc.template
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export WIREMOCK_TRIP_PLAN_PROXY_URL=http://otp-local.mbtace.com
# export REDIS_HOST=
# export REDIS_PORT=

# These credentials control access to resetting cache entries for the CMS.
# You can set them to be whatever you want, but they'll need to match those on the Drupal side.
# export CMS_BASIC_AUTH_USERNAME=
# export CMS_BASIC_AUTH_PASSWORD=
# These credentials control access to resetting cache entries.
# You can set them to be whatever you want, but they'll need to match external users like Drupal.
# export BASIC_AUTH_USERNAME=
# export BASIC_AUTH_PASSWORD=
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import Config

config :elixir, ansi_enabled: true

config :dotcom, :redis, Dotcom.Cache.Multilevel.Redis
config :dotcom, :redix, Redix
config :dotcom, :redix_pub_sub, Redix.PubSub

for config_file <- Path.wildcard("config/{deps,dotcom}/*.exs") do
import_config("../#{config_file}")
end
Expand Down
10 changes: 5 additions & 5 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ redis_config = [
]

# This is used by PubSub, we only use the first node in the cluster
config :dotcom, :redis, redis_config[:redis_cluster][:configuration_endpoints][:conn_opts]
config :dotcom, :redis_config, redis_config[:redis_cluster][:configuration_endpoints][:conn_opts]

# Set caches that use the Redis cluster
config :dotcom, Dotcom.Cache.Multilevel,
Expand All @@ -95,15 +95,15 @@ config :dotcom, Dotcom.Cache.TripPlanFeedback.Cache, redis_config

if config_env() == :test do
config :dotcom, DotcomWeb.Router,
cms_basic_auth: [
basic_auth: [
username: "username",
password: "password"
]
else
config :dotcom, DotcomWeb.Router,
cms_basic_auth: [
username: System.get_env("CMS_BASIC_AUTH_USERNAME"),
password: System.get_env("CMS_BASIC_AUTH_PASSWORD")
basic_auth: [
username: System.get_env("BASIC_AUTH_USERNAME"),
password: System.get_env("BASIC_AUTH_PASSWORD")
]
end

Expand Down
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import Config

config :dotcom, :cache, Dotcom.Cache.TestCache
config :dotcom, :redis, Redis.Mock
config :dotcom, :redix, Redix.Mock
config :dotcom, :redix_pub_sub, Redix.PubSub.Mock
config :dotcom, :trip_plan_feedback_cache, Dotcom.Cache.TestCache
16 changes: 10 additions & 6 deletions lib/cms/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ defmodule CMS.Repo do

@decorate cacheable(
cache: @cache,
key: "/cms/whats-happening",
key: "cms.repo|whats-happening",
on_error: :nothing,
opts: [ttl: 60_000]
)
Expand All @@ -122,7 +122,7 @@ defmodule CMS.Repo do

@decorate cacheable(
cache: @cache,
key: "/cms/important-notices",
key: "cms.repo|important-notices",
on_error: :nothing,
opts: [ttl: 60_000]
)
Expand Down Expand Up @@ -161,7 +161,7 @@ defmodule CMS.Repo do

@decorate cacheable(
cache: @cache,
key: "/cms/schedules/#{route_id}",
key: "cms.repo|schedules|#{route_id}",
on_error: :nothing,
opts: [ttl: @ttl]
)
Expand Down Expand Up @@ -193,7 +193,7 @@ defmodule CMS.Repo do

@decorate cacheable(
cache: @cache,
key: "/cms/route_pdfs/#{route_id}",
key: "cms.repo|route_pdfs|#{route_id}",
on_error: :nothing,
opts: [ttl: @ttl]
)
Expand Down Expand Up @@ -221,11 +221,15 @@ defmodule CMS.Repo do

@impl true
def generate(_, _, [path, %Plug.Conn.Unfetched{aspect: :query_params}]) do
"/cms/#{String.trim(path, "/")}"
key = path |> String.trim("/") |> String.replace(~r/\//, "|")

"cms.repo|#{key}"
end

def generate(_, _, [path, params]) do
"/cms/#{String.trim(path, "/")}" <> params_to_string(params)
key = path |> String.trim("/") |> String.replace(~r/\//, "|")

"cms.repo|#{key}" <> params_to_string(params)
end

defp params_to_string(params) when params == %{}, do: ""
Expand Down
86 changes: 77 additions & 9 deletions lib/dotcom/cache/multilevel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,95 @@ defmodule Dotcom.Cache.Multilevel do
use Nebulex.Cache, otp_app: :dotcom, adapter: Dotcom.Cache.Publisher
end

@cache Application.compile_env!(:dotcom, :cache)
@redix Application.compile_env!(:dotcom, :redix)

@doc """
To flush the cache, we get all *shared* keys in Redis and delete them.
These deletes will be published to the Publisher, which will then delete the keys in the Local caches.
Delete all entries where the key matches the pattern.
First, we make sure we can get a connection to Redis.
Second, we get all of the nodes in the cluster.
For each node:
We get all the keys in Redis that match the pattern.
Then, we use a cursor to stream the keys in batches of 100 using the SCAN command.
Finally, we delete all the keys with the default delete/1 function.
That way we'll delete from the Local, Redis, and publish the delete on the Publisher.
"""
def flush_keys(pattern \\ "*") do
case Application.get_env(:dotcom, :redis) |> Redix.start_link() do
{:ok, conn} -> flush_redis_keys(conn, pattern)
case Application.get_env(:dotcom, :redis_config) |> @redix.start_link() do
{:ok, conn} -> (delete_from_nodes(conn, pattern) ++ [@redix.stop(conn)]) |> all_ok()
{:error, _} -> :error
end
end

defp flush_redis_keys(conn, pattern) do
case Redix.command(conn, ["KEYS", pattern]) do
{:ok, keys} -> delete_keys(conn, keys)
defp all_ok(list) do
if list
|> List.flatten()
|> Enum.all?(fn
:ok -> true
_ -> false
end) do
:ok
else
:error
end
end

defp delete_from_node([host, port], pattern) do
case @redix.start_link(host: host, port: port) do
{:ok, conn} -> delete_stream_keys(conn, pattern)
{:error, _} -> :error
end
end

defp delete_from_nodes(conn, pattern) do
case get_nodes(conn) do
[] -> :ok
nodes -> Enum.map(nodes, fn node -> delete_from_node(node, pattern) end)
end
end

defp delete_keys(conn, keys) do
Enum.each(keys, fn key -> __MODULE__.delete(key) end)
results = Enum.map(keys, fn key -> @cache.delete(key) end)

result = @redix.stop(conn)

[result | results]
end

defp delete_stream_keys(conn, pattern) do
case stream_keys(conn, pattern) |> Enum.to_list() |> List.flatten() do
[] -> :ok
keys -> delete_keys(conn, keys)
end
end

defp get_nodes(conn) do
case @redix.command(conn, ["CLUSTER", "SLOTS"]) do
{:ok, slots} ->
slots
|> Enum.flat_map(fn slots -> Enum.slice(slots, 2..99) end)
|> Enum.map(fn slot -> Enum.slice(slot, 0..1) end)
|> Enum.sort_by(fn [_, port] -> port end)

{:error, _} ->
[]
end
end

defp scan_for_keys(conn, pattern, cursor) do
case @redix.command(conn, ["SCAN", cursor, "MATCH", pattern, "COUNT", 100]) do
{:ok, [new_cursor, keys]} -> {keys, if(new_cursor == "0", do: :stop, else: new_cursor)}
{:error, _} -> {[], :stop}
end
end

Redix.stop(conn)
defp stream_keys(conn, pattern) do
Stream.unfold("0", fn
:stop -> nil
cursor -> scan_for_keys(conn, pattern, cursor)
end)
end
end
3 changes: 2 additions & 1 deletion lib/dotcom/cache/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Dotcom.Cache.Publisher do
alias Nebulex.Adapter.Stats

@channel "dotcom:cache:publisher"
@redis Application.compile_env!(:dotcom, :redis)

def channel, do: @channel

Expand Down Expand Up @@ -72,7 +73,7 @@ defmodule Dotcom.Cache.Publisher do
def delete(meta, key, _) do
command = "eviction"

Dotcom.Cache.Multilevel.Redis.command([
@redis.command([
"PUBLISH",
@channel,
"#{command}|#{meta.publisher_id}|#{key}"
Expand Down
7 changes: 4 additions & 3 deletions lib/dotcom/cache/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Dotcom.Cache.Subscriber do
@executions %{
"eviction" => :delete
}
@redix_pub_sub Application.compile_env!(:dotcom, :redix_pub_sub)

def start_link(uuid) do
GenServer.start_link(__MODULE__, uuid, [])
Expand All @@ -25,8 +26,8 @@ defmodule Dotcom.Cache.Subscriber do
Starts a Redix.PubSub process and subscribes to the channel given by the Publisher.
"""
def init(uuid) do
Application.get_env(:dotcom, :redis)
|> Redix.PubSub.start_link()
Application.get_env(:dotcom, :redis_config)
|> @redix_pub_sub.start_link()
|> subscribe(@channel)

{:ok, uuid}
Expand Down Expand Up @@ -67,6 +68,6 @@ defmodule Dotcom.Cache.Subscriber do
end

defp subscribe({:ok, pubsub}, channel) do
Redix.PubSub.subscribe(pubsub, channel, self())
@redix_pub_sub.subscribe(pubsub, channel, self())
end
end
13 changes: 13 additions & 0 deletions lib/dotcom/redis/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Dotcom.Redis.Behaviour do
@moduledoc """
A behaviour module for NebulexRedisAdapter.
"""

@callback command(Redix.command()) ::
{:ok, Redix.Protocol.redis_value()}
| {:error, atom() | Redix.Error.t() | Redix.ConnectionError.t()}

@implementation Application.compile_env!(:dotcom, :redis)

def command(cmd), do: @implementation.command(cmd)
end
17 changes: 17 additions & 0 deletions lib/dotcom/redix/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Dotcom.Redix.Behaviour do
@moduledoc """
A behaviour module for Redix.
"""

@callback command(Redix.connection(), Redix.command()) ::
{:ok, Redix.Protocol.redis_value()}
| {:error, atom() | Redix.Error.t() | Redix.ConnectionError.t()}
@callback start_link(String.t() | keyword()) :: {:ok, pid()} | :ignore | {:error, term()}
@callback stop(Redix.connection()) :: :ok

@implementation Application.compile_env!(:dotcom, :redix)

def command(conn, cmd), do: @implementation.command(conn, cmd)
def start_link(opts), do: @implementation.start_link(opts)
def stop(conn), do: @implementation.stop(conn)
end
19 changes: 19 additions & 0 deletions lib/dotcom/redix/pub_sub/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Dotcom.Redix.PubSub.Behaviour do
@moduledoc """
A behaviour module for Redix.PubSub.
"""

@callback start_link(String.t() | keyword()) :: {:ok, pid()} | :ignore | {:error, term()}
@callback subscribe(
Redix.PubSub.connection(),
String.t() | [String.t()],
Redix.PubSub.subscriber()
) :: {:ok, reference()}

@implementation Application.compile_env!(:dotcom, :redix_pub_sub)

def start_link(opts), do: @implementation.start_link(opts)

def subscribe(conn, channels, subscriber),
do: @implementation.subscribe(conn, channels, subscriber)
end
46 changes: 46 additions & 0 deletions lib/dotcom_web/controllers/cache_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule DotcomWeb.CacheController do
@moduledoc """
A controller that allows us to interact with the cache.
Currently, we only support deleting keys from the cache.
"""

require Logger

use DotcomWeb, :controller

@cache Application.compile_env!(:dotcom, :cache)

@doc """
Flushes the cache given a key in the path.
Simply use a / in the path where you would use a | in the key.
Wildcards are supported.
Examples:
/cache/stops.repo/stop/* -> stops.repo|stop|*
/cache/stops.repo/stop/1 -> stops.repo|stop|1
"""
def flush_cache_keys(conn, %{"path" => path}) do
key = Enum.join(path, "|")

try do
Kernel.apply(@cache, flush_cache_function(@cache), [key])
rescue
e in Redix.ConnectionError ->
Logger.warning("dotcom_web.cache_controller.error error=redis-#{e.reason}")

e in Redix.Error ->
Logger.warning("dotcom_web.cache_controller.error error=redis-#{e.message}")
end

send_resp(conn, 202, "") |> halt()
end

defp flush_cache_function(cache) do
if cache.__info__(:functions) |> Keyword.has_key?(:flush_keys) do
:flush_keys
else
:delete
end
end
end
24 changes: 0 additions & 24 deletions lib/dotcom_web/controllers/cms_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ defmodule DotcomWeb.CMSController do
Page.ProjectUpdate
]

@cache Application.compile_env!(:dotcom, :cache)

@spec page(Conn.t(), map) :: Conn.t()
def page(%Conn{request_path: path, query_params: query_params} = conn, _params) do
conn = Conn.assign(conn, :try_encoded_on_404?, Map.has_key?(query_params, "id"))
Expand All @@ -45,28 +43,6 @@ defmodule DotcomWeb.CMSController do
|> handle_page_response(conn)
end

@doc """
Resets a cache key based on the URL params.
The path after /cms is joined with / to form the cache key.
So, it can be of arbitrary length.
PATCH /cms/foo/bar/baz will reset the cache key /cms/foo/bar/baz.
This corresponds to the CMS page /foo/bar/baz.
"""
def reset_cache_key(conn, %{"path" => path}) do
joined_path = Enum.join(path, "/")

try do
@cache.delete("/cms/#{joined_path}")

Logger.notice("cms.cache.delete path=/cms/#{joined_path}")
rescue
e in Redix.ConnectionError -> Logger.warning("cms.cache.delete error=redis-#{e.reason}")
e in Redix.Error -> Logger.warning("cms.cache.delete error=redis-#{e.message}")
end

send_resp(conn, 202, "") |> halt()
end

@spec handle_page_response(Page.t() | {:error, API.error()}, Conn.t()) ::
Plug.Conn.t()
defp handle_page_response(%{__struct__: struct} = page, conn)
Expand Down
Loading

0 comments on commit 8e40c33

Please sign in to comment.