From 68c93b6aa69ea41bcc5a3dda3d53c61f00ec4f52 Mon Sep 17 00:00:00 2001 From: Jeroen Vermeulen Date: Thu, 6 Jul 2023 11:42:34 +0200 Subject: [PATCH] Support moving of `stream_to`. (#707) Fixes #706. Finally biting the bullet and supporting moves of transaction_focus, which requires dealing with that backpointer from the transaction back to the focus object. --- NEWS | 3 ++ include/pqxx/internal/stream_query.hxx | 3 ++ include/pqxx/internal/stream_query_impl.hxx | 2 +- include/pqxx/pipeline.hxx | 3 ++ include/pqxx/stream_from.hxx | 3 ++ include/pqxx/stream_to.hxx | 10 ++++ include/pqxx/transaction_focus.hxx | 53 +++++++++++++++------ src/pipeline.cxx | 16 +++---- src/stream_from.cxx | 2 +- src/stream_to.cxx | 4 +- src/transaction_base.cxx | 6 +-- test/unit/test_stream_to.cxx | 17 +++++++ 12 files changed, 93 insertions(+), 29 deletions(-) diff --git a/NEWS b/NEWS index 9c4be392e..198c053b8 100644 --- a/NEWS +++ b/NEWS @@ -31,6 +31,9 @@ - Support for `PQinitOpenSSL()`. (#678) - Slightly more helpful error for unsupported conversions. (#695) - Replace some C++ feature tests with C++20 feature macros. + - Give `stream_to` a move constructor. (#706) + - Support move in `stream_to`. (#706) +>>>>>>> 42a46214 (Support move assignment as well.) 7.7.4 - `transaction_base::for_each()` is now called `for_stream()`. (#580) - New `transaction_base::for_query()` is similar, but non-streaming. (#580) diff --git a/include/pqxx/internal/stream_query.hxx b/include/pqxx/internal/stream_query.hxx index b4f3ec1ed..a8f88db80 100644 --- a/include/pqxx/internal/stream_query.hxx +++ b/include/pqxx/internal/stream_query.hxx @@ -84,6 +84,9 @@ public: /// Execute `query` on `tx`, stream results. inline stream_query(transaction_base &tx, std::string_view query); + stream_query(stream_query &&) = delete; + stream_query &operator=(stream_query &&) = delete; + ~stream_query() noexcept { try diff --git a/include/pqxx/internal/stream_query_impl.hxx b/include/pqxx/internal/stream_query_impl.hxx index a56c1f56f..bc03ecdec 100644 --- a/include/pqxx/internal/stream_query_impl.hxx +++ b/include/pqxx/internal/stream_query_impl.hxx @@ -137,7 +137,7 @@ stream_query::read_line() & { assert(not done()); - internal::gate::connection_stream_from gate{m_trans.conn()}; + internal::gate::connection_stream_from gate{m_trans->conn()}; try { auto line{gate.read_copy_line()}; diff --git a/include/pqxx/pipeline.hxx b/include/pqxx/pipeline.hxx index 02fc38255..61a4d4f34 100644 --- a/include/pqxx/pipeline.hxx +++ b/include/pqxx/pipeline.hxx @@ -55,6 +55,9 @@ public: pipeline(pipeline const &) = delete; pipeline &operator=(pipeline const &) = delete; + pipeline(pipeline &&) = delete; + pipeline &operator=(pipeline &&) = delete; + /// Start a pipeline. explicit pipeline(transaction_base &t) : transaction_focus{t, s_classname} diff --git a/include/pqxx/stream_from.hxx b/include/pqxx/stream_from.hxx index 7996ce765..33dc51d67 100644 --- a/include/pqxx/stream_from.hxx +++ b/include/pqxx/stream_from.hxx @@ -81,6 +81,9 @@ public: using raw_line = std::pair, std::size_t>; + stream_from(stream_from &&) = delete; + stream_from &operator=(stream_from &&) = delete; + /// Factory: Execute query, and stream the results. /** The query can be a SELECT query or a VALUES query; or it can be an * UPDATE, INSERT, or DELETE with a RETURNING clause. diff --git a/include/pqxx/stream_to.hxx b/include/pqxx/stream_to.hxx index c5bcafcc3..85c2e6647 100644 --- a/include/pqxx/stream_to.hxx +++ b/include/pqxx/stream_to.hxx @@ -189,6 +189,16 @@ public: transaction_base &, std::string_view table_name, Iter columns_begin, Iter columns_end); + explicit stream_to(stream_to &&other) : + // (This first step only moves the transaction_focus base-class object.) + transaction_focus{std::move(other)}, + m_finished{other.m_finished}, + m_buffer{std::move(other.m_buffer)}, + m_field_buf{std::move(other.m_field_buf)}, + m_finder{other.m_finder} + { + other.m_finished = true; + } ~stream_to() noexcept; /// Does this stream still need to @ref complete()? diff --git a/include/pqxx/transaction_focus.hxx b/include/pqxx/transaction_focus.hxx index c5b798af0..ed8500324 100644 --- a/include/pqxx/transaction_focus.hxx +++ b/include/pqxx/transaction_focus.hxx @@ -30,16 +30,16 @@ class PQXX_LIBEXPORT transaction_focus public: transaction_focus( transaction_base &t, std::string_view cname, std::string_view oname) : - m_trans{t}, m_classname{cname}, m_name{oname} + m_trans{&t}, m_classname{cname}, m_name{oname} {} transaction_focus( transaction_base &t, std::string_view cname, std::string &&oname) : - m_trans{t}, m_classname{cname}, m_name{std::move(oname)} + m_trans{&t}, m_classname{cname}, m_name{std::move(oname)} {} transaction_focus(transaction_base &t, std::string_view cname) : - m_trans{t}, m_classname{cname} + m_trans{&t}, m_classname{cname} {} transaction_focus() = delete; @@ -60,17 +60,30 @@ public: return pqxx::internal::describe_object(m_classname, m_name); } - /// Can't move a transaction_focus. - /** Moving the transaction_focus would break the transaction's reference back - * to the object. - */ - transaction_focus(transaction_focus &&) = delete; + transaction_focus(transaction_focus &&other) : + m_trans{other.m_trans}, + m_registered{other.m_registered}, + m_classname{other.m_classname}, + // We can't move the name until later. + m_name{} + { + // This is a bit more complicated than you might expect. The transaction + // has a backpointer to the focus, and we need to transfer that to the new + // focus. + move_name_and_registration(other); + } - /// Can't move a transaction_focus. - /** Moving the transaction_focus would break the transaction's reference back - * to the object. - */ - transaction_focus &operator=(transaction_focus &&) = delete; + transaction_focus &operator=(transaction_focus &&other) + { + if (&other != this) + { + if (m_registered) unregister_me(); + m_trans = other.m_trans; + m_classname = other.m_classname; + move_name_and_registration(other); + } + return *this; + } protected: void register_me(); @@ -78,12 +91,24 @@ protected: void reg_pending_error(std::string const &) noexcept; bool registered() const noexcept { return m_registered; } - transaction_base &m_trans; + transaction_base *m_trans; private: bool m_registered = false; std::string_view m_classname; std::string m_name; + + /// Perform part of a move operation. + void move_name_and_registration(transaction_focus &other) + { + bool const reg{other.m_registered}; + // Unregister the original while it still owns its name. + if (reg) other.unregister_me(); + // Now! Quick! Steal that name. + m_name = std::move(other.m_name); + // Now that we own the name, register ourselves instead. + if (reg) this->register_me(); + } }; } // namespace pqxx #endif diff --git a/src/pipeline.cxx b/src/pipeline.cxx index 55662c078..4153bdf4c 100644 --- a/src/pipeline.cxx +++ b/src/pipeline.cxx @@ -35,7 +35,7 @@ std::string const theDummyQuery{"SELECT " + theDummyValue + theSeparator}; void pqxx::pipeline::init() { - m_encoding = internal::enc_group(m_trans.conn().encoding_id()); + m_encoding = internal::enc_group(m_trans->conn().encoding_id()); m_issuedrange = make_pair(std::end(m_queries), std::end(m_queries)); attach(); } @@ -125,7 +125,7 @@ void PQXX_COLD pqxx::pipeline::cancel() { while (have_pending()) { - pqxx::internal::gate::connection_pipeline(m_trans.conn()).cancel_query(); + pqxx::internal::gate::connection_pipeline(m_trans->conn()).cancel_query(); auto canceled_query{m_issuedrange.first}; ++m_issuedrange.first; m_queries.erase(canceled_query); @@ -211,7 +211,7 @@ void pqxx::pipeline::issue() if (prepend_dummy) cum = theDummyQuery + cum; - pqxx::internal::gate::connection_pipeline{m_trans.conn()}.start_exec( + pqxx::internal::gate::connection_pipeline{m_trans->conn()}.start_exec( cum.c_str()); // Since we managed to send out these queries, update state to reflect this. @@ -231,7 +231,7 @@ void PQXX_COLD pqxx::pipeline::internal_error(std::string const &err) bool pqxx::pipeline::obtain_result(bool expect_none) { - pqxx::internal::gate::connection_pipeline gate{m_trans.conn()}; + pqxx::internal::gate::connection_pipeline gate{m_trans->conn()}; auto const r{gate.get_result()}; if (r == nullptr) { @@ -273,7 +273,7 @@ void pqxx::pipeline::obtain_dummy() static auto const text{ std::make_shared("[DUMMY PIPELINE QUERY]")}; - pqxx::internal::gate::connection_pipeline gate{m_trans.conn()}; + pqxx::internal::gate::connection_pipeline gate{m_trans->conn()}; auto const r{gate.get_result()}; m_dummy_pending = false; @@ -339,7 +339,7 @@ void pqxx::pipeline::obtain_dummy() m_num_waiting--; auto const query{*m_issuedrange.first->second.query}; auto &holder{m_issuedrange.first->second}; - holder.res = m_trans.exec(query); + holder.res = m_trans->exec(query); pqxx::internal::gate::result_creation{holder.res}.check_status(); ++m_issuedrange.first; } while (m_issuedrange.first != stop); @@ -411,7 +411,7 @@ pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q) void pqxx::pipeline::get_further_available_results() { - pqxx::internal::gate::connection_pipeline gate{m_trans.conn()}; + pqxx::internal::gate::connection_pipeline gate{m_trans->conn()}; while (not gate.is_busy() and obtain_result()) if (not gate.consume_input()) throw broken_connection{}; @@ -420,7 +420,7 @@ void pqxx::pipeline::get_further_available_results() void pqxx::pipeline::receive_if_available() { - pqxx::internal::gate::connection_pipeline gate{m_trans.conn()}; + pqxx::internal::gate::connection_pipeline gate{m_trans->conn()}; if (not gate.consume_input()) throw broken_connection{}; if (gate.is_busy()) diff --git a/src/stream_from.cxx b/src/stream_from.cxx index 1b4af3673..82a401022 100644 --- a/src/stream_from.cxx +++ b/src/stream_from.cxx @@ -112,7 +112,7 @@ pqxx::stream_from::raw_line pqxx::stream_from::get_raw_line() { if (*this) { - internal::gate::connection_stream_from gate{m_trans.conn()}; + internal::gate::connection_stream_from gate{m_trans->conn()}; try { raw_line line{gate.read_copy_line()}; diff --git a/src/stream_to.cxx b/src/stream_to.cxx index f3878a6d2..68afb4c79 100644 --- a/src/stream_to.cxx +++ b/src/stream_to.cxx @@ -70,7 +70,7 @@ pqxx::stream_to::~stream_to() noexcept void pqxx::stream_to::write_raw_line(std::string_view text) { - internal::gate::connection_stream_to{m_trans.conn()}.write_copy_line(text); + internal::gate::connection_stream_to{m_trans->conn()}.write_copy_line(text); } @@ -119,7 +119,7 @@ void pqxx::stream_to::complete() { m_finished = true; unregister_me(); - internal::gate::connection_stream_to{m_trans.conn()}.end_copy_write(); + internal::gate::connection_stream_to{m_trans->conn()}.end_copy_write(); } } diff --git a/src/transaction_base.cxx b/src/transaction_base.cxx index 312cabc34..78e89264e 100644 --- a/src/transaction_base.cxx +++ b/src/transaction_base.cxx @@ -518,7 +518,7 @@ std::string pqxx::transaction_base::description() const void pqxx::transaction_focus::register_me() { - pqxx::internal::gate::transaction_transaction_focus{m_trans}.register_focus( + pqxx::internal::gate::transaction_transaction_focus{*m_trans}.register_focus( this); m_registered = true; } @@ -526,7 +526,7 @@ void pqxx::transaction_focus::register_me() void pqxx::transaction_focus::unregister_me() noexcept { - pqxx::internal::gate::transaction_transaction_focus{m_trans} + pqxx::internal::gate::transaction_transaction_focus{*m_trans} .unregister_focus(this); m_registered = false; } @@ -535,6 +535,6 @@ void pqxx::transaction_focus::unregister_me() noexcept void pqxx::transaction_focus::reg_pending_error( std::string const &err) noexcept { - pqxx::internal::gate::transaction_transaction_focus{m_trans} + pqxx::internal::gate::transaction_transaction_focus{*m_trans} .register_pending_error(err); } diff --git a/test/unit/test_stream_to.cxx b/test/unit/test_stream_to.cxx index 002857626..bb62a1418 100644 --- a/test/unit/test_stream_to.cxx +++ b/test/unit/test_stream_to.cxx @@ -526,6 +526,22 @@ void test_stream_to_escaping() } +void test_stream_to_moves_into_optional() +{ + pqxx::connection cx; + pqxx::transaction tx{cx}; + tx.exec0("CREATE TEMP TABLE foo (a integer)"); + std::optional org{std::in_place, pqxx::stream_to::table(tx, {"foo"}, {"a"})}; + org->write_values(1); + auto copy{std::move(org)}; + copy->write_values(2); + copy->complete(); + auto values{tx.exec_n(2, "SELECT a FROM foo ORDER BY a")}; + PQXX_CHECK_EQUAL(values[0][0].as(), 1, "Streaming results start off wrong."); + PQXX_CHECK_EQUAL(values[1][0].as(), 2, "Moved stream went wrong."); +} + + PQXX_REGISTER_TEST(test_stream_to); PQXX_REGISTER_TEST(test_container_stream_to); PQXX_REGISTER_TEST(test_stream_to_does_nonnull_optional); @@ -534,4 +550,5 @@ PQXX_REGISTER_TEST(test_stream_to_factory_with_dynamic_columns); PQXX_REGISTER_TEST(test_stream_to_quotes_arguments); PQXX_REGISTER_TEST(test_stream_to_optionals); PQXX_REGISTER_TEST(test_stream_to_escaping); +PQXX_REGISTER_TEST(test_stream_to_moves_into_optional); } // namespace