Skip to content

Commit

Permalink
As per @KayEss's suggestion: notification struct.
Browse files Browse the repository at this point in the history
Yes, that's better.

Still a few details to be ironed out: Should we pass the `notification`
by value, by `const` reference, or by rvalue reference?  There's things
to be said for each option, but I like the simplicity of passing by
value.  Ultimately I doubt it matters much.

Another one is the name for the `pqxx::connection &` field.  We can't
call it `connection` because that's a type name.  (Probably technically
allowed but some compilers will complain.)  I've been using "cx" as the
standard abbreviation for "connection" — but I don't really feel like
breaking consistency with other class members today.  Perhaps it makes
sense to settle on "conn" for class members and "cx" for local
variables.
  • Loading branch information
jtv committed Dec 18, 2024
1 parent bfdee69 commit 4265818
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 30 deletions.
83 changes: 65 additions & 18 deletions include/pqxx/connection.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,54 @@ class const_connection_largeobject;

namespace pqxx
{
/// An incoming notification.
/** PostgreSQL extends SQL with a "message bus" using the `LISTEN` and `NOTIFY`
* commands. In libpqxx you use @ref connection::listen() and (optionally)
* @ref transaction_base::notify().
*
* When you receive a notification for which you have been listening, your
* handler receives it in the form of a `notification` object.
*
* @warning These structs are meant for extremely short lifespans: the fields
* reference memory that may become invalid as soon as your handler has been
* called.
*/
struct notification
{
/// The connection which received the notification.
/** There will be no _backend_ transaction active on the connection when your
* handler gets called, but there may be a @ref nontransaction. (This is a
* special transaction type in libpqxx which does not start a transaction on
* the backend.)
*/
connection &conn;

/// Channel name.
/** The notification logic will only pass the notification to a handler which
* was registered to listen on this exact name.
*/
zview channel;

/// Optional payload text.
/** If the notification did not carry a payload, the string will be empty.
*/
zview payload;

/// Process ID of the backend that sent the notification.
/** This can be useful in situations where a multiple clients are listening
* on the same channel, and also send notifications on it.
*
* In those situations, it often makes sense for a client to ignore its own
* incoming notifications, but handle all others on the same channel in some
* way.
*
* To check for that, compare this process ID to the return value of the
* connection's `backendpid()`.
*/
int backend_pid;
};


/// Flags for skipping initialisation of SSL-related libraries.
/** When a running process makes its first SSL connection to a database through
* libpqxx, libpq automatically initialises the OpenSSL and libcrypto
Expand Down Expand Up @@ -623,29 +671,28 @@ public:
*/
int await_notification(std::time_t seconds, long microseconds);

/// A handler callback for notifications.
/** A notification handler takes 3 arguments: the _channel name_ to which
* the notification was sent; the process ID of the backend that sent the
* notification; and an optional "payload" text. If there was no payload,
* you will receive an empty string there.
*
* Why the backend process ID? Because sometimes you may want to send out
* a notification for every client listening on a channel, except yourself.
* If you sent out the notification yourself in the same session, then the
* handler will receive a process ID that's identical to the one that your
* connection's @ref backendpid() returns.
/// A handler callback for incoming notifications on a given channel.
/** Your callback must accept a @ref notification object. This object can
* and will exist only for the duration of the handling of that one incoming
* notification.
*
* The handler can be "empty," i.e. contain no code. Setting an empty
* handler on a channel disables listening on that channel.
*/
using notification_handler = std::function<void(zview, int, zview)>;
using notification_handler = std::function<void(notification)>;

/// Attach a handler to a notification channel.
/** Issues a `LISTEN` SQL command for channel `name`, and stores `handler`
/** Issues a `LISTEN` SQL command for channel `channel`, and stores `handler`
* as the callback for when a notification comes in on that channel.
*
* The handler is a `std::function` (see @ref notification_handler), but you
* can simply pass in a lambda with the right parameters. Your handler
* probably needs to interact with your application's data; the simple way to
* get that working is to pass a lambda with a closure referencing the data
* items you need.
* can simply pass in a lambda with the right parameters, or a function, or
* an object of a type you define that happens to implemnt the right function
* call operator.
*
* Your handler probably needs to interact with your application's data; the
* simple way to get that working is to pass a lambda with a closure
* referencing the data items you need.
*
* If the handler is empty (the default), then that stops the connection
* listening on the channel. It cancels your subscription, so to speak.
Expand All @@ -656,7 +703,7 @@ public:
* different handlers on the same channel, then the second overwrites the
* first.
*/
void listen(std::string_view name, notification_handler handler = {});
void listen(std::string_view channel, notification_handler handler = {});

//@}

Expand Down
7 changes: 4 additions & 3 deletions src/connection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ int pqxx::connection::get_notifs()
{
notifs++;

std::string const name{N->relname};
std::string const channel{N->relname};

auto const Hit{m_receivers.equal_range(name)};
auto const Hit{m_receivers.equal_range(channel)};
if (Hit.second != Hit.first)
{
std::string const payload{N->extra};
Expand Down Expand Up @@ -654,8 +654,9 @@ int pqxx::connection::get_notifs()
}

auto const handler{m_notification_handlers.find(N->relname)};
// C++20: Use "dot notation" to initialise struct fields.
if (handler != std::end(m_notification_handlers))
(handler->second)(name, N->be_pid, N->extra);
(handler->second)(notification{*this, channel, N->extra, N->be_pid});

N.reset();
}
Expand Down
4 changes: 2 additions & 2 deletions test/test04.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ void test_004()
int backend_pid{0};
cx.listen(
channel,
[&backend_pid](pqxx::zview, int pid, pqxx::zview) noexcept
{ backend_pid = pid; });
[&backend_pid](pqxx::notification n) noexcept
{ backend_pid = n.backend_pid; });

// Trigger our notification receiver.
pqxx::perform([&cx, &channel] {
Expand Down
4 changes: 1 addition & 3 deletions test/test78.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ void test_078()
bool done{false};

std::string const channel{"my listener"};
cx.listen(
channel,
[&done](pqxx::zview, int, pqxx::zview) noexcept { done = true; });
cx.listen(channel, [&done](pqxx::notification) noexcept { done = true; });

pqxx::perform([&cx, &channel] {
pqxx::nontransaction tx{cx};
Expand Down
4 changes: 2 additions & 2 deletions test/test79.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ void test_079()

cx.listen(
channel,
[&backend_pid](pqxx::zview, int pid, pqxx::zview) noexcept
{ backend_pid = pid; });
[&backend_pid](pqxx::notification n) noexcept
{ backend_pid = n.backend_pid; });

// First see if the timeout really works: we're not expecting any notifs
int notifs{cx.await_notification(0, 1)};
Expand Down
4 changes: 2 additions & 2 deletions test/test87.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ void test_087()

cx.listen(
channel,
[&backend_pid](pqxx::zview, int pid, pqxx::zview) noexcept
{ backend_pid = pid; });
[&backend_pid](pqxx::notification n) noexcept
{ backend_pid = n.backend_pid; });

pqxx::perform([&cx, &channel] {
pqxx::work tx{cx};
Expand Down

0 comments on commit 4265818

Please sign in to comment.