Skip to content

Commit

Permalink
Support moving of stream_to. (#707)
Browse files Browse the repository at this point in the history
Fixes #706.

Finally biting the bullet and supporting moves of transaction_focus, which requires dealing with that backpointer from the transaction back to the focus object.
  • Loading branch information
jtv authored Jul 6, 2023
1 parent 8c8977f commit 68c93b6
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 29 deletions.
3 changes: 3 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
- Support for `PQinitOpenSSL()`. (#678)
- Slightly more helpful error for unsupported conversions. (#695)
- Replace some C++ feature tests with C++20 feature macros.
- Give `stream_to` a move constructor. (#706)
- Support move in `stream_to`. (#706)
>>>>>>> 42a46214 (Support move assignment as well.)
7.7.4
- `transaction_base::for_each()` is now called `for_stream()`. (#580)
- New `transaction_base::for_query()` is similar, but non-streaming. (#580)
Expand Down
3 changes: 3 additions & 0 deletions include/pqxx/internal/stream_query.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public:
/// Execute `query` on `tx`, stream results.
inline stream_query(transaction_base &tx, std::string_view query);

stream_query(stream_query &&) = delete;
stream_query &operator=(stream_query &&) = delete;

~stream_query() noexcept
{
try
Expand Down
2 changes: 1 addition & 1 deletion include/pqxx/internal/stream_query_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ stream_query<TYPE...>::read_line() &
{
assert(not done());

internal::gate::connection_stream_from gate{m_trans.conn()};
internal::gate::connection_stream_from gate{m_trans->conn()};
try
{
auto line{gate.read_copy_line()};
Expand Down
3 changes: 3 additions & 0 deletions include/pqxx/pipeline.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public:

pipeline(pipeline const &) = delete;
pipeline &operator=(pipeline const &) = delete;
pipeline(pipeline &&) = delete;
pipeline &operator=(pipeline &&) = delete;


/// Start a pipeline.
explicit pipeline(transaction_base &t) : transaction_focus{t, s_classname}
Expand Down
3 changes: 3 additions & 0 deletions include/pqxx/stream_from.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public:
using raw_line =
std::pair<std::unique_ptr<char, void(*)(void const *)>, std::size_t>;

stream_from(stream_from &&) = delete;
stream_from &operator=(stream_from &&) = delete;

/// Factory: Execute query, and stream the results.
/** The query can be a SELECT query or a VALUES query; or it can be an
* UPDATE, INSERT, or DELETE with a RETURNING clause.
Expand Down
10 changes: 10 additions & 0 deletions include/pqxx/stream_to.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ public:
transaction_base &, std::string_view table_name, Iter columns_begin,
Iter columns_end);

explicit stream_to(stream_to &&other) :
// (This first step only moves the transaction_focus base-class object.)
transaction_focus{std::move(other)},
m_finished{other.m_finished},
m_buffer{std::move(other.m_buffer)},
m_field_buf{std::move(other.m_field_buf)},
m_finder{other.m_finder}
{
other.m_finished = true;
}
~stream_to() noexcept;

/// Does this stream still need to @ref complete()?
Expand Down
53 changes: 39 additions & 14 deletions include/pqxx/transaction_focus.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ class PQXX_LIBEXPORT transaction_focus
public:
transaction_focus(
transaction_base &t, std::string_view cname, std::string_view oname) :
m_trans{t}, m_classname{cname}, m_name{oname}
m_trans{&t}, m_classname{cname}, m_name{oname}
{}

transaction_focus(
transaction_base &t, std::string_view cname, std::string &&oname) :
m_trans{t}, m_classname{cname}, m_name{std::move(oname)}
m_trans{&t}, m_classname{cname}, m_name{std::move(oname)}
{}

transaction_focus(transaction_base &t, std::string_view cname) :
m_trans{t}, m_classname{cname}
m_trans{&t}, m_classname{cname}
{}

transaction_focus() = delete;
Expand All @@ -60,30 +60,55 @@ public:
return pqxx::internal::describe_object(m_classname, m_name);
}

/// Can't move a transaction_focus.
/** Moving the transaction_focus would break the transaction's reference back
* to the object.
*/
transaction_focus(transaction_focus &&) = delete;
transaction_focus(transaction_focus &&other) :
m_trans{other.m_trans},
m_registered{other.m_registered},
m_classname{other.m_classname},
// We can't move the name until later.
m_name{}
{
// This is a bit more complicated than you might expect. The transaction
// has a backpointer to the focus, and we need to transfer that to the new
// focus.
move_name_and_registration(other);
}

/// Can't move a transaction_focus.
/** Moving the transaction_focus would break the transaction's reference back
* to the object.
*/
transaction_focus &operator=(transaction_focus &&) = delete;
transaction_focus &operator=(transaction_focus &&other)
{
if (&other != this)
{
if (m_registered) unregister_me();
m_trans = other.m_trans;
m_classname = other.m_classname;
move_name_and_registration(other);
}
return *this;
}

protected:
void register_me();
void unregister_me() noexcept;
void reg_pending_error(std::string const &) noexcept;
bool registered() const noexcept { return m_registered; }

transaction_base &m_trans;
transaction_base *m_trans;

private:
bool m_registered = false;
std::string_view m_classname;
std::string m_name;

/// Perform part of a move operation.
void move_name_and_registration(transaction_focus &other)
{
bool const reg{other.m_registered};
// Unregister the original while it still owns its name.
if (reg) other.unregister_me();
// Now! Quick! Steal that name.
m_name = std::move(other.m_name);
// Now that we own the name, register ourselves instead.
if (reg) this->register_me();
}
};
} // namespace pqxx
#endif
16 changes: 8 additions & 8 deletions src/pipeline.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ std::string const theDummyQuery{"SELECT " + theDummyValue + theSeparator};

void pqxx::pipeline::init()
{
m_encoding = internal::enc_group(m_trans.conn().encoding_id());
m_encoding = internal::enc_group(m_trans->conn().encoding_id());
m_issuedrange = make_pair(std::end(m_queries), std::end(m_queries));
attach();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ void PQXX_COLD pqxx::pipeline::cancel()
{
while (have_pending())
{
pqxx::internal::gate::connection_pipeline(m_trans.conn()).cancel_query();
pqxx::internal::gate::connection_pipeline(m_trans->conn()).cancel_query();
auto canceled_query{m_issuedrange.first};
++m_issuedrange.first;
m_queries.erase(canceled_query);
Expand Down Expand Up @@ -211,7 +211,7 @@ void pqxx::pipeline::issue()
if (prepend_dummy)
cum = theDummyQuery + cum;

pqxx::internal::gate::connection_pipeline{m_trans.conn()}.start_exec(
pqxx::internal::gate::connection_pipeline{m_trans->conn()}.start_exec(
cum.c_str());

// Since we managed to send out these queries, update state to reflect this.
Expand All @@ -231,7 +231,7 @@ void PQXX_COLD pqxx::pipeline::internal_error(std::string const &err)

bool pqxx::pipeline::obtain_result(bool expect_none)
{
pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
pqxx::internal::gate::connection_pipeline gate{m_trans->conn()};
auto const r{gate.get_result()};
if (r == nullptr)
{
Expand Down Expand Up @@ -273,7 +273,7 @@ void pqxx::pipeline::obtain_dummy()
static auto const text{
std::make_shared<std::string>("[DUMMY PIPELINE QUERY]")};

pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
pqxx::internal::gate::connection_pipeline gate{m_trans->conn()};
auto const r{gate.get_result()};
m_dummy_pending = false;

Expand Down Expand Up @@ -339,7 +339,7 @@ void pqxx::pipeline::obtain_dummy()
m_num_waiting--;
auto const query{*m_issuedrange.first->second.query};
auto &holder{m_issuedrange.first->second};
holder.res = m_trans.exec(query);
holder.res = m_trans->exec(query);
pqxx::internal::gate::result_creation{holder.res}.check_status();
++m_issuedrange.first;
} while (m_issuedrange.first != stop);
Expand Down Expand Up @@ -411,7 +411,7 @@ pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)

void pqxx::pipeline::get_further_available_results()
{
pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
pqxx::internal::gate::connection_pipeline gate{m_trans->conn()};
while (not gate.is_busy() and obtain_result())
if (not gate.consume_input())
throw broken_connection{};
Expand All @@ -420,7 +420,7 @@ void pqxx::pipeline::get_further_available_results()

void pqxx::pipeline::receive_if_available()
{
pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
pqxx::internal::gate::connection_pipeline gate{m_trans->conn()};
if (not gate.consume_input())
throw broken_connection{};
if (gate.is_busy())
Expand Down
2 changes: 1 addition & 1 deletion src/stream_from.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pqxx::stream_from::raw_line pqxx::stream_from::get_raw_line()
{
if (*this)
{
internal::gate::connection_stream_from gate{m_trans.conn()};
internal::gate::connection_stream_from gate{m_trans->conn()};
try
{
raw_line line{gate.read_copy_line()};
Expand Down
4 changes: 2 additions & 2 deletions src/stream_to.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pqxx::stream_to::~stream_to() noexcept

void pqxx::stream_to::write_raw_line(std::string_view text)
{
internal::gate::connection_stream_to{m_trans.conn()}.write_copy_line(text);
internal::gate::connection_stream_to{m_trans->conn()}.write_copy_line(text);
}


Expand Down Expand Up @@ -119,7 +119,7 @@ void pqxx::stream_to::complete()
{
m_finished = true;
unregister_me();
internal::gate::connection_stream_to{m_trans.conn()}.end_copy_write();
internal::gate::connection_stream_to{m_trans->conn()}.end_copy_write();
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/transaction_base.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,15 @@ std::string pqxx::transaction_base::description() const

void pqxx::transaction_focus::register_me()
{
pqxx::internal::gate::transaction_transaction_focus{m_trans}.register_focus(
pqxx::internal::gate::transaction_transaction_focus{*m_trans}.register_focus(
this);
m_registered = true;
}


void pqxx::transaction_focus::unregister_me() noexcept
{
pqxx::internal::gate::transaction_transaction_focus{m_trans}
pqxx::internal::gate::transaction_transaction_focus{*m_trans}
.unregister_focus(this);
m_registered = false;
}
Expand All @@ -535,6 +535,6 @@ void pqxx::transaction_focus::unregister_me() noexcept
void pqxx::transaction_focus::reg_pending_error(
std::string const &err) noexcept
{
pqxx::internal::gate::transaction_transaction_focus{m_trans}
pqxx::internal::gate::transaction_transaction_focus{*m_trans}
.register_pending_error(err);
}
17 changes: 17 additions & 0 deletions test/unit/test_stream_to.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,22 @@ void test_stream_to_escaping()
}


void test_stream_to_moves_into_optional()
{
pqxx::connection cx;
pqxx::transaction tx{cx};
tx.exec0("CREATE TEMP TABLE foo (a integer)");
std::optional<pqxx::stream_to> org{std::in_place, pqxx::stream_to::table(tx, {"foo"}, {"a"})};
org->write_values(1);
auto copy{std::move(org)};
copy->write_values(2);
copy->complete();
auto values{tx.exec_n(2, "SELECT a FROM foo ORDER BY a")};
PQXX_CHECK_EQUAL(values[0][0].as<int>(), 1, "Streaming results start off wrong.");
PQXX_CHECK_EQUAL(values[1][0].as<int>(), 2, "Moved stream went wrong.");
}


PQXX_REGISTER_TEST(test_stream_to);
PQXX_REGISTER_TEST(test_container_stream_to);
PQXX_REGISTER_TEST(test_stream_to_does_nonnull_optional);
Expand All @@ -534,4 +550,5 @@ PQXX_REGISTER_TEST(test_stream_to_factory_with_dynamic_columns);
PQXX_REGISTER_TEST(test_stream_to_quotes_arguments);
PQXX_REGISTER_TEST(test_stream_to_optionals);
PQXX_REGISTER_TEST(test_stream_to_escaping);
PQXX_REGISTER_TEST(test_stream_to_moves_into_optional);
} // namespace

0 comments on commit 68c93b6

Please sign in to comment.