Real-time event streaming for Nebulex caches
Nebulex Streams provides real-time event streaming capabilities for Nebulex caches. Subscribe to cache events like insertions, updates, and deletions as they happen, enabling reactive applications and real-time monitoring.
- 🚀 Real-time Events - React to cache operations instantly.
- 🔄 Partitioned Streams - Scale event processing across multiple processes.
- 🎯 Event Filtering - Subscribe to specific event types.
- 🌐 Distributed - Built on Phoenix.PubSub for cluster-wide events.
- 📊 Telemetry - Comprehensive observability with telemetry events.
Add nebulex_streams to your list of dependencies in mix.exs:
def deps do
  [
    {:nebulex_streams, "~> 0.1.0"}
  ]
end
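Then run mix deps.get to fetch the dependency.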
Add streaming support to your Nebulex cache:
defmodule MyApp.Cache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Local

  use Nebulex.Streams
end
Add the cache and stream to your supervision tree:
# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.Cache,
    {Nebulex.Streams, cache: MyApp.Cache}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end
Create an event handler:
defmodule MyApp.EventHandler do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [])
  end

  def init(_) do
    # Subscribe to all cache events
    :ok = MyApp.Cache.subscribe()
    {:ok, %{}}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{} = event, state) do
    IO.puts("Cache event: #{event.type} for key #{inspect(event.target)}")
    {:noreply, state}
  end
end
# Start your event handler
{:ok, _pid} = MyApp.EventHandler.start_link([])
# Perform cache operations
MyApp.Cache.put("user:123", %{name: "Alice", age: 30})
#=> Cache event: inserted for key {:key, "user:123"}
MyApp.Cache.put("user:123", %{name: "Alice", age: 31})
#=> Cache event: updated for key {:key, "user:123"}
MyApp.Cache.delete("user:123")
#=> Cache event: deleted for key {:key, "user:123"}
Subscribe to specific event types:
# Only listen for insertions and deletions
MyApp.Cache.subscribe(events: [:inserted, :deleted])
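For example, an audit process could subscribe with this option and pattern-match on the two event types it cares about. This is a minimal sketch (MyApp.AuditLogger is a hypothetical module name) built only on the subscribe option and event struct shown above:

defmodule MyApp.AuditLogger do
  use GenServer

  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [])
  end

  def init(_) do
    # Only :inserted and :deleted events are delivered to this process
    :ok = MyApp.Cache.subscribe(events: [:inserted, :deleted])
    {:ok, %{}}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{type: :inserted, target: target}, state) do
    Logger.info("New entry: #{inspect(target)}")
    {:noreply, state}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{type: :deleted, target: target}, state) do
    Logger.info("Entry removed: #{inspect(target)}")
    {:noreply, state}
  end
end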
Scale event processing across multiple processes:
# In your application.ex
def start(_type, _args) do
  partitions = System.schedulers_online()

  children = [
    MyApp.Cache,
    {Nebulex.Streams, cache: MyApp.Cache, partitions: partitions},
    {MyApp.EventHandler.Supervisor, partitions}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
# Event handler supervisor
defmodule MyApp.EventHandler.Supervisor do
  use Supervisor

  def start_link(partitions) do
    Supervisor.start_link(__MODULE__, partitions, name: __MODULE__)
  end

  def init(partitions) do
    children =
      for partition <- 0..(partitions - 1) do
        Supervisor.child_spec(
          {MyApp.EventHandler, partition},
          id: {MyApp.EventHandler, partition}
        )
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end
# Partition-aware event handler
defmodule MyApp.EventHandler do
  use GenServer

  def start_link(partition) do
    GenServer.start_link(__MODULE__, partition)
  end

  def init(partition) do
    # Subscribe to a specific partition
    :ok = MyApp.Cache.subscribe(partition: partition)
    {:ok, %{partition: partition}}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{} = event, state) do
    IO.puts("Partition #{state.partition}: #{event.type} for #{inspect(event.target)}")
    {:noreply, state}
  end
end
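Each handler subscribes to exactly one partition, and the supervisor starts one handler per partition, so every partition has a dedicated consumer and events routed to different partitions are processed concurrently.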
Route events to specific partitions based on your logic:
def custom_hash(%Nebulex.Event.CacheEntryEvent{target: {:key, key}}) do
  cond do
    String.starts_with?(key, "user:") -> 0     # User events to partition 0
    String.starts_with?(key, "session:") -> 1  # Session events to partition 1
    true -> 2                                  # Everything else to partition 2
  end
end
# Configure with custom hash
{Nebulex.Streams,
 cache: MyApp.Cache,
 partitions: 3,
 hash: &MyApp.custom_hash/1}
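With this hash, a process that subscribes with partition: 0 receives only the user:* events, while session keys and everything else land on partitions 1 and 2. Note that the sketch above assumes string keys; non-string keys would need an additional clause or guard.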
Collect and analyze event patterns:
defmodule MyApp.EventAggregator do
  use GenServer

  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    :ok = MyApp.Cache.subscribe()

    # Send stats every 10 seconds
    :timer.send_interval(10_000, :report_stats)

    {:ok, %{inserted: 0, updated: 0, deleted: 0}}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{type: type}, state) do
    new_state = Map.update(state, type, 1, &(&1 + 1))
    {:noreply, new_state}
  end

  def handle_info(:report_stats, state) do
    Logger.info("Cache activity: #{inspect(state)}")
    {:noreply, %{inserted: 0, updated: 0, deleted: 0}}
  end
end
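Because the :report_stats clause resets the counters after logging, each report reflects only the activity of the last 10-second window.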
Nebulex Streams emits telemetry events for monitoring:
# Listen for broadcast errors
require Logger

:telemetry.attach(
  "stream-errors",
  [:nebulex, :streams, :broadcast_error],
  fn _event, _measurements, metadata, _config ->
    Logger.error("Stream broadcast failed: #{inspect(metadata.reason)}")
  end,
  nil
)
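If you already use Telemetry.Metrics with a reporter (for example TelemetryMetricsPrometheus or LiveDashboard), the same event can be exposed as a counter. A minimal sketch, assuming the telemetry_metrics package is available; MyApp.Telemetry is a hypothetical module:

defmodule MyApp.Telemetry do
  import Telemetry.Metrics

  def metrics do
    [
      # Increments once for every [:nebulex, :streams, :broadcast_error] event
      counter("nebulex.streams.broadcast_error.count")
    ]
  end
end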
Enable debug logging to see stream activity:
config :logger, level: :debug
Nebulex Streams supports dynamic caches:
# Start a dynamic cache
{:ok, _pid} = MyApp.Cache.start_link(name: :my_dynamic_cache)

# Start streams for the dynamic cache
{:ok, _pid} =
  Nebulex.Streams.start_link(
    cache: MyApp.Cache,
    name: :my_dynamic_cache
  )

# Subscribe to the dynamic cache
MyApp.Cache.subscribe(:my_dynamic_cache, [])
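A handler for the dynamic cache only differs in how it subscribes. A minimal sketch reusing the handler pattern from the quick start (MyApp.DynamicCacheHandler is a hypothetical module; the subscribe call is the one shown above):

defmodule MyApp.DynamicCacheHandler do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [])
  end

  def init(_) do
    # Subscribe to events from the dynamic cache instance
    :ok = MyApp.Cache.subscribe(:my_dynamic_cache, [])
    {:ok, %{}}
  end

  def handle_info(%Nebulex.Event.CacheEntryEvent{} = event, state) do
    IO.puts("Dynamic cache event: #{event.type} for #{inspect(event.target)}")
    {:noreply, state}
  end
end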
Contributions to Nebulex are very welcome and appreciated!
Use the issue tracker for bug reports or feature requests. Open a pull request when you are ready to contribute.
When submitting a pull request, do not update CHANGELOG.md. Make sure you test your changes thoroughly and include unit tests alongside new or changed code.
Before submitting a PR, it is highly recommended to run mix test.ci and ensure all checks pass.
Copyright (c) 2025, Carlos Bolaños.
Nebulex.Streams source code is licensed under the MIT License.