From f6220cbcaae8e86f777aab08ec0bddb2ff6d0bd1 Mon Sep 17 00:00:00 2001 From: Zoey de Souza Pessanha Date: Mon, 16 Sep 2024 12:12:02 -0300 Subject: [PATCH] feat: bot framework with polling strategy --- .formatter.exs | 1 + lib/proto_rune/atproto.ex | 13 ++ lib/proto_rune/bot.ex | 218 ++++++++++++++++++++++ lib/proto_rune/bot/poller.ex | 237 ++++++++++++++++++++++++ lib/proto_rune/bot/poller/state.ex | 46 +++++ lib/proto_rune/bot/server.ex | 283 +++++++++++++++++++++++++++++ lib/proto_rune/xrpc/client.ex | 13 ++ lib/proto_rune/xrpc/dsl.ex | 10 +- mix.exs | 2 +- mix.lock | 4 +- 10 files changed, 818 insertions(+), 9 deletions(-) create mode 100644 lib/proto_rune/atproto.ex create mode 100644 lib/proto_rune/bot.ex create mode 100644 lib/proto_rune/bot/poller.ex create mode 100644 lib/proto_rune/bot/poller/state.ex create mode 100644 lib/proto_rune/bot/server.ex diff --git a/.formatter.exs b/.formatter.exs index 3facc2a..44b0a38 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,5 +1,6 @@ # Used by "mix format" [ + import_deps: [:peri], inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], export: [ locals_without_parens: [ diff --git a/lib/proto_rune/atproto.ex b/lib/proto_rune/atproto.ex new file mode 100644 index 0000000..a82627d --- /dev/null +++ b/lib/proto_rune/atproto.ex @@ -0,0 +1,13 @@ +defmodule ProtoRune.Atproto do + @moduledoc false + + def parse_at_uri(<<"at://"::utf8, did::binary-size(32), "/"::utf8, rest::binary>>) do + case rest do + "app.bsky.feed.post" <> _ -> {:ok, {did, :post}} + "app.bsky.feed.generator" <> _ -> {:ok, {did, :generator}} + "app.bsky.labeler.service" <> _ -> {:ok, {did, :service}} + end + end + + def parse_at_uri(_), do: {:error, :invalid_uri} +end diff --git a/lib/proto_rune/bot.ex b/lib/proto_rune/bot.ex new file mode 100644 index 0000000..67cc23f --- /dev/null +++ b/lib/proto_rune/bot.ex @@ -0,0 +1,218 @@ +defmodule ProtoRune.Bot do + @moduledoc """ + The `ProtoRune.Bot` module provides the foundational behavior and macros for building bots + in the ProtoRune ecosystem. It defines the basic structure for bots and ensures that every bot + adheres to a consistent interface, with customizable event handling, identifier, and password + retrieval. + + The bot system integrates with the `ProtoRune.Bot.Server` to manage bot lifecycles, handle + events, and manage sessions. Bots can use different strategies for receiving notifications, + such as polling or firehose (currently under development). + + ## Usage + + To create a bot using `ProtoRune.Bot`, you need to define your bot module with the required + callbacks: `get_identifier/0`, `get_password/0`, and `handle_event/2`. + + Here is an example bot implementation: + + ```elixir + defmodule Walle do + use ProtoRune.Bot, + name: __MODULE__, + strategy: :polling + + require Logger + + @impl true + def get_identifier, do: System.get_env("IDENTIFIER") + + @impl true + def get_password, do: System.get_env("PASSWORD") + + @impl true + def handle_event(event, payload) do + Logger.info("Event: \#{event} with URI: \#{inspect(payload[:uri])}") + end + end + ``` + + In this example, `Walle` is a bot that uses the polling strategy to fetch notifications. + It retrieves its identifier and password from environment variables and logs any events it receives. + + ## Polling Strategy Events + + When using the polling strategy, the bot can receive various types of events triggered by + notifications from the Bluesky or ATProto services. Each event type corresponds to a specific + user action, and a payload containing relevant data is provided. Below is a list of possible + events and their associated payloads: + + ### Event Types and Payloads + + - **`:reply`** + - Triggered when someone replies to a post involving the bot. + - **Payload**: + - `:uri` - The URI of the post that was replied to. + - `:user` - The user who made the reply. + - `:content` - The content of the reply post. + + Example payload: + ```elixir + %{uri: "at://did:plc:1234", user: "user123", content: "Thanks for your post!"} + ``` + + - **`:quote`** + - Triggered when someone quotes the bot's post. + - **Payload**: + - `:uri` - The URI of the quoted post. + - `:user` - The user who quoted the post. + - `:content` - The content of the quote. + + Example payload: + ```elixir + %{uri: "at://did:plc:1234", user: "user456", content: "Great article!"} + ``` + + - **`:mention`** + - Triggered when the bot is mentioned in a post. + - **Payload**: + - `:uri` - The URI of the post mentioning the bot. + - `:user` - The user who mentioned the bot. + - `:content` - The content of the post where the bot was mentioned. + + Example payload: + ```elixir + %{uri: "at://did:plc:5678", user: "user789", content: "Check out @bot's post!"} + ``` + + - **`:like`** + - Triggered when someone likes a post by the bot. + - **Payload**: + - `:uri` - The URI of the liked post. + - `:user` - The user who liked the post. + - `:subject` - The subject of the post that was liked (full post data). + + Example payload: + ```elixir + %{uri: "at://did:plc:1234", user: "user123", subject: %{content: "Nice post!"}} + ``` + + - **`:repost`** + - Triggered when someone reposts content from the bot. + - **Payload**: + - `:uri` - The URI of the reposted content. + - `:user` - The user who reposted the content. + - `:post` - The post that was reposted (full post data). + + Example payload: + ```elixir + %{uri: "at://did:plc:5678", user: "user987", post: %{content: "Check this out!"}} + ``` + + - **`:follow`** + - Triggered when someone follows the bot. + - **Payload**: + - `:uri` - The URI of the follow event. + - `:user` - The user who followed the bot. + + Example payload: + ```elixir + %{uri: "at://did:plc:9876", user: "user123"} + ``` + + - **`:error`** + - Triggered when there is an error while processing an event (e.g., failed to fetch a post). + - **Payload**: + - `:reason` - An atom describing the error. + + Example payload: + ```elixir + %{reason: {:rate_limited, retry_adter :: integer}} + ``` + + ## Callbacks + + The following callbacks can be implemented by any bot module that uses `ProtoRune.Bot`: + + - `get_identifier/0`: Retrieves the bot's identifier (e.g., username or email). This is used + for logging into the service. + + - `get_password/0`: Retrieves the bot's password. This is used alongside the identifier + to authenticate the bot. + + - `handle_event/2`: Handles events that are dispatched to the bot. These events can include + mentions, replies, likes, and other interactions that the bot should process. + + The `handle_event/2` function receives: + - `event`: An atom that represents the type of event (e.g., `:mention`, `:like`, `:reply`). + - `payload`: A map containing the data related to the event, such as the URI of the post or the user who triggered the event. + + ## Optional Callbacks + + These callbacks are optional and can be overridden by the bot module: + + - `get_identifier/0`: If not implemented, a default error will be raised indicating the callback must be defined. + - `get_password/0`: Similar to `get_identifier/0`, this must be implemented by the bot if needed for authentication. + + ## Bot Lifecycle + + The bot is started using `start_link/0`, which initializes the bot server with the provided options. + The server handles the bot's session and dispatches messages or events to the bot's defined handlers. + + For instance, starting the bot would look like this: + + ```elixir + Walle.start_link() + ``` + + ## Customizing the Bot + + - **Authentication**: Bots must implement `get_identifier/0` and `get_password/0` to provide authentication details. + - **Event Handling**: The `handle_event/2` function allows bots to react to different types of events such as mentions, replies, and likes. + + ## Example Workflow + + When the bot receives a notification (for example, a new mention), the following happens: + + 1. The bot's `handle_event/2` callback is called with the event type and payload. + 2. The bot processes the event and can take actions such as replying, liking a post, or logging information. + + ## Notes + + - The current implementation supports the polling strategy for fetching notifications. Firehose-based notifications are not yet implemented. + - Bots should be designed to handle events and messages in a non-blocking manner for efficient performance. + """ + + alias ProtoRune.Bot.Server + + @callback get_identifier :: String.t() + @callback get_password :: String.t() + + @callback handle_event(event :: atom(), data :: map()) :: {:ok, term} | {:error, term} + + @optional_callbacks get_identifier: 0, get_password: 0 + + @spec __using__(Server.options_t()) :: Macro.t() + defmacro __using__(opts) do + quote do + @behaviour ProtoRune.Bot + + def start_link do + Server.start_link(unquote(opts)) + end + + # Default implementation for optional callbacks + @impl ProtoRune.Bot + def handle_event(_, _), do: :ok + + @impl ProtoRune.Bot + def get_identifier, do: raise("get_identifier/0 not implemented") + + @impl ProtoRune.Bot + def get_password, do: raise("get_password/0 not implemented") + + # Required callback + defoverridable handle_event: 2, get_identifier: 0, get_password: 0 + end + end +end diff --git a/lib/proto_rune/bot/poller.ex b/lib/proto_rune/bot/poller.ex new file mode 100644 index 0000000..bbd978b --- /dev/null +++ b/lib/proto_rune/bot/poller.ex @@ -0,0 +1,237 @@ +defmodule ProtoRune.Bot.Poller do + @moduledoc """ + A GenServer module that handles periodic polling of notifications for a bot, and dispatches + these notifications to the appropriate handler functions within the bot. + + The `Poller` connects to the ATProto or Bluesky notification systems and periodically polls + for new notifications, processes them, and dispatches them as events to the bot server. It + handles various types of notifications including replies, mentions, likes, reposts, and follows. + + ## Features + + - Periodic polling of notifications based on a customizable interval. + - Supports exponential backoff in case of rate limiting or errors. + - Handles session refresh when required. + - Dispatches notifications like replies, mentions, quotes, likes, reposts, and follows to the bot. + - Extensible to handle other types of notifications and custom behavior. + + ## Options + + - `:name` (required) - The name of the GenServer instance. + - `:interval` (required) - The polling interval in seconds for checking new notifications. + - `:process_from` - Start polling from a specific date/time. + - `:last_seen` - The last seen date of notifications. + - `:cursor` - The cursor to fetch subsequent notifications from the API. + - `:attempt` - Number of polling attempts, used for backoff. + - `:server_pid` (required) - The server process that handles events from the poller. + - `:session` (required) - The session information used to authenticate API requests. + + ## Functions + + - `start_link/1`: Starts the `Poller` process with the given options. + - `poll_notifications/1`: Fetches the latest notifications from the service and handles them. + - `handle_notifications/2`: Dispatches each notification to the appropriate event handler in the bot server. + - `handle_rate_limited/2`: Handles the rate-limiting case by applying exponential backoff before the next poll. + - `handle_error/2`: Sends error events to the bot server. + - `dispatch_notification/2`: Dispatches different types of notifications (e.g., replies, quotes, mentions, likes, reposts, follows) to the bot server. + + ## Example + + You can start the poller like this: + + ```elixir + ProtoRune.Bot.Poller.start_link([ + name: :my_bot_poller, + interval: 30, + session: my_session, + server_pid: self() + ]) + ``` + + The poller will then periodically fetch notifications and dispatch them to the bot server based on the event type. + + ## Backoff Strategy + + The poller implements an exponential backoff strategy when rate-limited or in case of errors. + The backoff starts with the defined `interval` and increases exponentially with each failed attempt, + up to a maximum of 5 minutes. + + ## Internal State + + The `State` struct is used to keep track of: + - `name`: The name of the poller process. + - `interval`: The polling interval in seconds. + - `last_seen`: The last notification timestamp. + - `cursor`: API cursor for fetching new notifications. + - `attempt`: The number of failed attempts. + - `session`: The current session for API requests. + - `server_pid`: The PID of the server handling the notifications. + """ + + use GenServer + + alias ProtoRune.Atproto + alias ProtoRune.Bot.Poller.State + alias ProtoRune.Bsky + + require Logger + + @type option :: + {:name, atom} + | {:interval, integer} + | {:process_from, NaiveDateTime.t()} + | {:last_seen, Date.t()} + | {:cursor, String.t()} + | {:attempt, integer} + | {:server_pid, pid} + | {:session, map} + @type kwargs :: nonempty_list(option) + + @spec start_link(kwargs) :: GenServer.on_start() + def start_link(opts) do + name = Access.fetch!(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @impl true + def init(opts) do + {:ok, state} = State.new(opts) + {:ok, state, {:continue, :poll}} + end + + @impl true + def handle_continue(:poll, %State{} = state) do + schedule_polling(state) + {:ok, state} = poll_notifications(state) + {:noreply, state} + end + + @impl true + def handle_info(:poll, %State{} = state) do + schedule_polling(state) + {:ok, state} = poll_notifications(state) + {:noreply, state} + end + + def handle_info({:refresh_session, session}, %State{} = state) do + {:noreply, %{state | session: session}} + end + + @impl true + def format_status({:state, state}) do + {:state, Map.take(state, [:name, :interval, :process_from, :last_seen, :cursor, :attempt])} + end + + def format_status(key), do: key + + @spec poll_notifications(State.t()) :: {:ok, State.t()} + defp poll_notifications(%State{} = state) do + case Bsky.Notification.list_notifications(state.session) do + {:ok, data} -> handle_notifications(state, data) + {:error, {:rate_limited, retry_after}} -> handle_rate_limited(state, retry_after) + {:error, reason} -> handle_error(state, reason) + end + end + + defp handle_notifications(state, %{notifications: []}), do: {:noreply, state} + + defp handle_notifications(%State{} = state, data) do + indexed_at = List.first(data[:notifications])[:indexed_at] + {:ok, indexed_at} = NaiveDateTime.from_iso8601(indexed_at) + last_seen = state.last_seen || indexed_at + + Task.start(fn -> + for notification <- data[:notifications], + NaiveDateTime.compare(indexed_at, last_seen) == :gt do + dispatch_notification(state, notification) + end + end) + + {:ok, %{state | last_seen: indexed_at, cursor: data[:cursor]}} + end + + defp handle_rate_limited(%State{} = state, retry_after) do + interval = retry_after || backoff(state) + Process.send_after(self(), :poll, interval) + {:ok, %{state | attempt: state.attempt + 1}} + end + + defp handle_error(%State{server_pid: pid} = state, reason) do + send(pid, {:handle_event, :error, %{reason: reason}}) + {:ok, state} + end + + defp dispatch_notification(%State{} = state, %{reason: "reply", uri: uri}) do + # TODO ignore replies that aren't to the bot + case Bsky.Feed.get_post_thread(state.session, uri: uri) do + {:ok, data} -> send(state.server_pid, {:handle_event, :reply, data}) + {:error, reason} -> send(state.server_pid, {:handle_event, :error, reason}) + end + end + + defp dispatch_notification(%State{} = state, %{reason: "quote", uri: uri}) do + case Bsky.Feed.get_post_thread(state.session, uri: uri) do + {:ok, data} -> send(state.server_pid, {:handle_event, :quote, data}) + {:error, reason} -> send(state.server_pid, {:handle_event, :error, reason}) + end + end + + defp dispatch_notification(%State{} = state, %{reason: "mention", uri: uri}) do + case Bsky.Feed.get_post_thread(state.session, uri: uri) do + {:ok, data} -> send(state.server_pid, {:handle_event, :mention, data}) + {:error, reason} -> send(state.server_pid, {:handle_event, :error, reason}) + end + end + + defp dispatch_notification(%State{} = state, %{reason: "repost"} = notf) do + case Bsky.Feed.get_post_thread(state.session, uri: notf.reason_subject) do + {:ok, data} -> + send( + state.server_pid, + {:handle_event, :repost, %{user: notf.author, post: data, uri: notf.uri}} + ) + + {:error, reason} -> + send(state.server_pid, {:handle_event, :error, reason}) + end + end + + defp dispatch_notification( + %State{} = state, + %{reason: "like", reason_subject: reason_subject} = notf + ) do + {:ok, subject} = Atproto.parse_at_uri(reason_subject) + + if match?({_, :post}, subject) do + case Bsky.Feed.get_post_thread(state.session, uri: reason_subject) do + {:ok, data} -> + send( + state.server_pid, + {:handle_event, :like, %{uri: notf.uri, user: notf.author, subject: data}} + ) + + {:error, reason} -> + send(state.server_pid, {:handle_event, :error, reason}) + end + end + end + + defp dispatch_notification(%State{} = state, %{reason: "follow"} = notf) do + send(state.server_pid, {:handle_event, :follow, %{user: notf.author, uri: notf.uri}}) + end + + defp dispatch_notification(_state, %{reason: reason}) do + Logger.warning("[#{__MODULE__}] ==> Unhandled notification reason: #{inspect(reason)}") + end + + @max_backoff :timer.minutes(5) + + # implement exponential backoff + defp backoff(%State{interval: interval, attempt: attempt}) do + min(@max_backoff, :timer.seconds(interval) ** attempt) + end + + defp schedule_polling(%State{interval: interval}) do + Process.send_after(self(), :poll, :timer.seconds(interval)) + end +end diff --git a/lib/proto_rune/bot/poller/state.ex b/lib/proto_rune/bot/poller/state.ex new file mode 100644 index 0000000..6fa0667 --- /dev/null +++ b/lib/proto_rune/bot/poller/state.ex @@ -0,0 +1,46 @@ +defmodule ProtoRune.Bot.Poller.State do + @moduledoc false + + import Peri + + @type t :: %__MODULE__{ + name: atom, + interval: integer, + process_from: NaiveDateTime.t(), + server_pid: pid, + session: map, + last_seen: NaiveDateTime.t() | nil, + cursor: String.t() | nil, + attempt: integer + } + + defschema(:state_t, %{ + name: {:required, :atom}, + interval: {:required, :integer}, + process_from: {:required, :naive_datetime}, + last_seen: :date, + cursor: :string, + attempt: {:integer, {:default, 0}}, + server_pid: {:required, :pid}, + session: {:required, :map} + }) + + @enforce_keys [:name, :interval, :process_from, :server_pid, :session] + defstruct [ + :name, + :interval, + :process_from, + :last_seen, + :cursor, + :attempt, + :server_pid, + :session + ] + + @spec new(Enumerable.t()) :: {:ok, t} | {:error, term} + def new(params) do + with {:ok, data} <- state_t(params) do + {:ok, struct(__MODULE__, data)} + end + end +end diff --git a/lib/proto_rune/bot/server.ex b/lib/proto_rune/bot/server.ex new file mode 100644 index 0000000..0103176 --- /dev/null +++ b/lib/proto_rune/bot/server.ex @@ -0,0 +1,283 @@ +defmodule ProtoRune.Bot.Server do + @moduledoc """ + The `ProtoRune.Bot.Server` module is responsible for managing bot processes in ProtoRune. + It handles bot initialization, session management, and event/message dispatching. This + module also integrates with the polling system to retrieve real-time notifications from + ATProto and Bluesky services. + + The bot server can operate in two modes: + - **Polling**: Periodically fetches notifications using the `ProtoRune.Bot.Poller` module. + - **Firehose**: (Not yet implemented) Stream real-time events using a websocket-like connection. + + ## Features + + - **Bot Lifecycle Management**: The server manages the entire bot lifecycle, from login + and session refresh to handling messages and events. + - **Polling Strategy**: Supports polling for notifications at regular intervals via the + `ProtoRune.Bot.Poller`. + - **Session Management**: Automatically handles session creation, refresh, and expiration. + - **Event and Message Handling**: Provides a unified interface for handling events and messages + via `handle_message/1` and `handle_event/2`. + + ## Options + + - `:name` (required) - The name of the bot process. + - `:lang` - A list of languages the bot supports (default: `["en"]`). + - `:service` - The service endpoint the bot will connect to (default: `"https://bsky.social"`). + - `:identifier` - The bot's login identifier (e.g., email or username). + - `:password` - The bot's password for login. + - `:polling` - Polling configuration (e.g., interval and process_from). + - `:firehose` - Firehose configuration (not implemented yet). + - `:strategy` - The bot's strategy for receiving notifications (`:polling` or `:firehose`). + + ## Polling Configuration + + Polling can be configured with the following options: + - `:interval` - How often (in seconds) the bot should poll for notifications (default: 5 seconds). + - `:process_from` - Start processing notifications from a specific timestamp (default: current time). + + Example: + ```elixir + ProtoRune.Bot.Server.start_link( + name: :my_bot, + strategy: :polling, + service: "https://bsky.social", + polling: %{interval: 10} + ) + ``` + + ## Firehose Configuration (Not Implemented) + + While not yet available, the firehose strategy will enable real-time notifications using a + websocket connection. Firehose configuration includes: + - `:relay_uri` - The WebSocket URI for the relay server. + - `:auto_reconnect` - Automatically reconnect if the connection drops (default: true). + - `:cursor` - The starting cursor for reading the stream. + + ## Functions + + - `start_link/1`: Starts the bot process with the given configuration options. + - `handle_message/2`: Handles incoming messages for the bot. + - `handle_event/3`: Handles events dispatched to the bot. + - `format_status/1`: Formats the bot's internal state for debugging. + + ## Session Management + + The bot manages its session by authenticating with the ATProto server upon startup. + It also refreshes the session token periodically. If the session expires or cannot be + refreshed, the bot will stop. + + ## Example + + ```elixir + ProtoRune.Bot.Server.start_link([ + name: :my_bot, + strategy: :polling, + service: "https://bsky.social", + identifier: "my-bot-id", + password: "super-secret-password" + ]) + ``` + + This will start a bot that uses the polling strategy to retrieve notifications from the + Bsky service every 5 seconds. + + The bot can handle messages and events like this: + + ```elixir + ProtoRune.Bot.Server.handle_message(:my_bot, "hello") + ProtoRune.Bot.Server.handle_event(:my_bot, :user_joined, %{user: "user123"}) + ``` + + ## Internal State + + The server maintains a state that includes: + - `name`: The bot's name. + - `service`: The endpoint to connect to. + - `session`: The session data for making authenticated requests. + - `poller`: The PID of the polling process (if using the polling strategy). + - `langs`: The languages the bot supports. + + ## Error Handling + + - The bot gracefully handles errors such as rate limits and API failures by retrying or + stopping the process when necessary. + - Errors are dispatched as events to the bot, allowing custom error handling. + """ + + use GenServer + + import Peri + + alias ProtoRune.Atproto + alias ProtoRune.Bot.Poller + alias ProtoRune.Bsky + + require Logger + + @type polling_t :: %{ + optional(:interval) => integer, + optional(:process_from) => NaveDateTime.t() + } + + @type firehose_t :: %{ + optional(:relay_uri) => String.t(), + optional(:auto_reconnect) => boolean, + optional(:cursor) => String.t() + } + + @type option :: + {:name, atom} + | {:lang, list(String.t())} + | {:service, String.t()} + | {:identifier, String.t() | nil} + | {:password, String.t() | nil} + | {:polling, polling_t | nil} + | {:firehose, firehose_t | nil} + | {:strategy, :polling | :firehose} + + @type kwargs :: nonempty_list(option) + + @type mapargs :: %{ + required(:name) => atom, + required(:strategy) => :polling | :firehose, + required(:service) => String.t(), + optional(:langs) => list(String.t()), + optional(:identifier) => String.t(), + optional(:password) => String.t(), + optional(:polling) => polling_t, + optional(:firehose) => firehose_t + } + + @type options_t :: kwargs | mapargs + + defschema(:options_t, %{ + name: {:required, :atom}, + langs: {{:list, :string}, {:default, ["en"]}}, + service: {:string, {:default, "https://bsky.social"}}, + identifier: :string, + password: :string, + strategy: {{:enum, [:polling, :firehose]}, {:default, :polling}}, + polling: {:cond, &(&1.strategy == :polling), get_schema(:polling_t), nil}, + firehose: {:cond, &(&1.strategy == :firehose), get_schema(:firehose_t), nil} + }) + + defschema(:polling_t, %{ + interval: {:integer, {:default, 5}}, + process_from: {:naive_datetime, {:default, &NaiveDateTime.utc_now/0}} + }) + + defschema(:firehose_t, %{ + relay_uri: {:string, {:default, "wss://bsky.network"}}, + auto_reconnect: {:boolean, {:default, true}}, + cursor: {:string, {:default, "latest"}} + }) + + @spec start_link(options_t) :: {:ok, pid} | {:error, term} + def start_link(opts) do + data = options_t!(opts) + + if data[:strategy] == :firehose do + raise "Firehose strategy not implemented yet." + end + + GenServer.start_link(__MODULE__, data, name: data[:name]) + end + + @spec handle_message(pid | atom, String.t()) :: :ok + def handle_message(name, message) do + GenServer.cast(name, {:handle_message, message}) + end + + @spec handle_event(pid | atom, atom, map) :: :ok + def handle_event(name, event, payload \\ %{}) do + GenServer.cast(name, {:handle_event, event, payload}) + end + + @impl true + def init(data) do + Logger.info("[#{__MODULE__}] ==> Starting bot #{data[:name]} at #{data[:service]}") + {:ok, data, {:continue, :fetch_bot_profile}} + end + + @impl true + def handle_continue(:fetch_bot_profile, state) do + identifier = state[:identifier] || state[:name].get_identifier() + password = state[:password] || state[:name].get_password() + + with {:ok, session} <- + Atproto.Server.create_session(identifier: identifier, password: password), + {:ok, profile} <- Bsky.Actor.get_profile(session, actor: session.did) do + schedule_refresh_session() + + {:noreply, + state + |> Map.put(:did, profile[:did]) + |> Map.put(:session, Map.take(session, [:access_jwt, :refresh_jwt])), + {:continue, :start_listener}} + else + err -> {:stop, err, state} + end + end + + def handle_continue(:start_listener, %{strategy: :polling} = state) do + interval = state[:polling][:interval] + process_from = state[:polling][:process_from] + name = :"#{state[:name]}_poller" + + {:ok, pid} = + Poller.start_link( + server_pid: self(), + name: name, + interval: interval, + process_from: process_from, + session: state[:session] + ) + + {:noreply, Map.put(state, :poller, pid)} + end + + @impl true + def handle_cast({:handle_message, message}, %{name: bot} = state) do + bot.handle_message(message) + {:noreply, state} + end + + @impl true + def handle_cast({:handle_event, event, payload}, %{name: bot} = state) do + bot.handle_event(event, payload) + {:noreply, state} + end + + @impl true + def handle_info({:handle_event, event, payload}, state) do + handle_event(state[:name], event, payload) + {:noreply, state} + end + + @impl true + def handle_info(:refresh_session, state) do + Logger.info("[#{__MODULE__}] ==> Refreshing session for bot #{state[:name]}") + + case Atproto.Server.refresh_session(state[:session]) do + {:ok, session} -> + send(state[:poller], {:refresh_session, session}) + schedule_refresh_session() + {:noreply, Map.put(state, :session, Map.take(session, [:access_jwt, :refresh_jwt]))} + + err -> + {:stop, err, state} + end + end + + @impl true + def format_status({:state, state}) do + {:state, Map.take(state, [:name, :service, :profile, :langs])} + end + + def format_status(key), do: key + + defp schedule_refresh_session do + Process.send_after(self(), :refresh_session, :timer.minutes(5)) + end +end diff --git a/lib/proto_rune/xrpc/client.ex b/lib/proto_rune/xrpc/client.ex index 1b5b3e5..f79e956 100644 --- a/lib/proto_rune/xrpc/client.ex +++ b/lib/proto_rune/xrpc/client.ex @@ -46,6 +46,19 @@ defmodule ProtoRune.XRPC.Client do defp parse_http({:error, err}), do: {:error, err} defp parse_http({:ok, %{status: 401}}), do: {:error, :unauthorized} defp parse_http({:ok, %{status: 404}}), do: {:error, :not_found} + defp parse_http({:ok, %{status: 403}}), do: {:error, :forbidden} + defp parse_http({:ok, %{status: 413}}), do: {:error, :payload_too_large} + defp parse_http({:ok, %{status: 501}}), do: {:error, :not_implemented} + defp parse_http({:ok, %{status: 502}}), do: {:error, :bad_gateway} + defp parse_http({:ok, %{status: 503}}), do: {:error, :service_unavailable} + defp parse_http({:ok, %{status: 504}}), do: {:error, :gateway_timeout} + + defp parse_http({:ok, %{status: 429} = resp}) do + retry_after = Req.Response.get_header(resp, "retry-after") + {:error, {:rate_limited, retry_after}} + end + + defp parse_http({:ok, %{status: 500, body: body}}), do: {:error, {:server_error, body}} defp parse_http({:ok, %{status: 400, body: error}}) do {:error, apply_case_map(error, &Case.snakelize/1)} diff --git a/lib/proto_rune/xrpc/dsl.ex b/lib/proto_rune/xrpc/dsl.ex index e0a3312..e33d148 100644 --- a/lib/proto_rune/xrpc/dsl.ex +++ b/lib/proto_rune/xrpc/dsl.ex @@ -156,14 +156,12 @@ defmodule ProtoRune.XRPC.DSL do end unquote(refresh) -> - def unquote(fun)(%{refresh_token: refresh}, params) do + def unquote(fun)(%{refresh_token: refresh}) do proc = Procedure.new(unquote(method)) - with {:ok, proc} <- Procedure.put_body(proc, params) do - proc - |> Procedure.put_header(:authorization, "Bearer #{refresh}") - |> Client.execute() - end + proc + |> Procedure.put_header(:authorization, "Bearer #{refresh}") + |> Client.execute() end true -> diff --git a/mix.exs b/mix.exs index fc7306d..c359a9f 100644 --- a/mix.exs +++ b/mix.exs @@ -51,7 +51,7 @@ defmodule ProtoRune.MixProject do defp docs do [ - main: "README.md", + main: "readme", extras: ["README.md"] ] end diff --git a/mix.lock b/mix.lock index 3c1dfd8..1b4a19a 100644 --- a/mix.lock +++ b/mix.lock @@ -7,7 +7,7 @@ "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, - "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, @@ -18,7 +18,7 @@ "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, - "peri": {:hex, :peri, "0.2.9", "1f83c04a2957d354221462468a0435ef0ed581505794be43c8cf39c7e9779158", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.1", [hex: :stream_data, repo: "hexpm", optional: true]}], "hexpm", "2929a65cc45d432dc13dc8348f270a6c5dcaec8f361f3236a0208b3976a78bc9"}, + "peri": {:hex, :peri, "0.2.10", "38cef84abda151d3cca958206307b340731a34e6a2d64f7cd78d5c533b0ae090", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.1", [hex: :stream_data, repo: "hexpm", optional: true]}], "hexpm", "bc495b2524da750eb2db9b47702279f813a1dabec1effb1219ee9046f436aadb"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, }