Skip to content

Commit

Permalink
Respect nested-transaction-rule in subsequent transactions (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
camsaul committed Oct 3, 2023
1 parent 7f2df92 commit 75a4147
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 209 deletions.
2 changes: 2 additions & 0 deletions .clj-kondo/config.edn
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
toucan2.insert insert
toucan2.instance instance
toucan2.jdbc jdbc
toucan2.jdbc.connection jdbc.conn
toucan2.jdbc.options jdbc.options
toucan2.jdbc.pipeline jdbc.pipeline
toucan2.jdbc.query jdbc.query
toucan2.jdbc.read jdbc.read
toucan2.jdbc.result-set jdbc.rs
Expand Down
42 changes: 6 additions & 36 deletions src/toucan2/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
with no default connection available and execute it later with one bound. (This also means
that [[toucan2.execute/reducible-query]] does not capture dynamic bindings such
as [[toucan2.connection/*current-connectable*]] -- you probably wouldn't want it to, anyway, since we have no
guarantees and open connection will be around when we go to use the reducible query later.)"
guarantees and open connection will be around when we go to use the reducible query later.
The default JDBC implementations for methods here live in [[toucan2.jdbc.connection]]."
(:require
[clojure.spec.alpha :as s]
[methodical.core :as m]
[next.jdbc :as next.jdbc]
[next.jdbc.transaction :as next.jdbc.transaction]
[pretty.core :as pretty]
[toucan2.log :as log]
[toucan2.protocols :as protocols]
Expand Down Expand Up @@ -203,24 +203,6 @@
*current-connectable*)]
(do-with-connection current-connectable f)))

(m/defmethod do-with-connection java.sql.Connection
[conn f]
(f conn))

(m/defmethod do-with-connection javax.sql.DataSource
[^javax.sql.DataSource data-source f]
(with-open [conn (.getConnection data-source)]
(f conn)))

(m/defmethod do-with-connection clojure.lang.IPersistentMap
"Implementation for map connectables. Treats them as a `clojure.java.jdbc`-style connection spec map, converting them to
a `java.sql.DataSource` with [[next.jdbc/get-datasource]]."
[m f]
(do-with-connection (next.jdbc/get-datasource m) f))

;;; for record types that implement `DataSource`, prefer the `DataSource` impl over the map impl.
(m/prefer-method! #'do-with-connection javax.sql.DataSource clojure.lang.IPersistentMap)

;;;; connection string support

(defn connection-string-protocol
Expand All @@ -247,12 +229,7 @@
[connection-string f]
(do-with-connection-string connection-string f))

(m/defmethod do-with-connection-string "jdbc"
"Implementation of `do-with-connection-string` (and thus [[do-with-connection]]) for all strings starting with `jdbc:`.
Calls `java.sql.DriverManager/getConnection` on the connection string."
[^String connection-string f]
(with-open [conn (java.sql.DriverManager/getConnection connection-string)]
(f conn)))
;;; JDBC implementations live in [[toucan2.jdbc.connection]]

(m/defmulti do-with-transaction
"`options` are options for determining what type of transaction we'll get. See dox for [[with-transaction]] for more
Expand All @@ -267,15 +244,6 @@
(log/debugf "do with transaction %s %s" options (some-> connection class .getCanonicalName symbol))
(next-method connection options (bind-current-connectable-fn f)))

(m/defmethod do-with-transaction java.sql.Connection
[^java.sql.Connection conn options f]
(let [nested-tx-rule (get options :nested-transaction-rule :allow)
options (dissoc options :nested-transaction-rule)]
(log/debugf "do with JDBC transaction (nested rule: %s) with options %s" nested-tx-rule options)
(binding [next.jdbc.transaction/*nested-tx* nested-tx-rule]
(next.jdbc/with-transaction [t-conn conn options]
(f t-conn)))))

(defmacro with-transaction
"Gets a connection with [[with-connection]], and executes `body` within that transaction.
Expand All @@ -302,3 +270,5 @@
:options (s/? ::with-transaction-options)))
:body (s/+ any?))
:ret any?)

;;; JDBC implementation lives in [[toucan2.jdbc.connection]]
91 changes: 22 additions & 69 deletions src/toucan2/jdbc.clj
Original file line number Diff line number Diff line change
@@ -1,85 +1,38 @@
(ns toucan2.jdbc
(:require
[methodical.core :as m]
[toucan2.jdbc.query :as jdbc.query]
[toucan2.model :as model]
[toucan2.pipeline :as pipeline]
[toucan2.types :as types]))
[toucan2.jdbc.connection :as jdbc.conn]
[toucan2.jdbc.pipeline :as jdbc.pipeline]
[toucan2.protocols :as protocols]))

(set! *warn-on-reflection* true)

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type :default
#_model :default]
"Default impl for the JDBC query execution backend."
[rf conn query-type model sql-args]
{:pre [(sequential? sql-args) (string? (first sql-args))]}
;; `:return-keys` is passed in this way instead of binding a dynamic var because we don't want any additional queries
;; happening inside of the `rf` to return keys or whatever.
(let [extra-options (when (isa? query-type :toucan.result-type/pks)
{:return-keys true})
result (jdbc.query/reduce-jdbc-query rf (rf) conn model sql-args extra-options)]
(rf result)))

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type :toucan.result-type/pks
#_model :default]
"JDBC query execution backend for executing queries that return PKs (`:toucan.result-type/pks`).
Applies transducer to call [[toucan2.model/select-pks-fn]] on each result row."
[rf conn query-type model sql-args]
(let [xform (map (model/select-pks-fn model))]
(next-method (xform rf) conn query-type model sql-args)))

(defn- transduce-instances-from-pks
[rf model columns pks]
;; make sure [[toucan2.select]] is loaded so we get the impls for `:toucan.query-type/select.instances`
(when-not (contains? (loaded-libs) 'toucan2.select)
(locking clojure.lang.RT/REQUIRE_LOCK
(require 'toucan2.select)))
(if (empty? pks)
[]
(let [kv-args {:toucan/pk [:in pks]}
parsed-args {:columns columns
:kv-args kv-args}]
(pipeline/transduce-query rf :toucan.query-type/select.instances-from-pks model parsed-args {}))))
(comment jdbc.conn/keep-me
jdbc.pipeline/keep-me)

(derive ::DML-queries-returning-instances :toucan.result-type/instances)

(doseq [query-type [:toucan.query-type/delete.instances
:toucan.query-type/update.instances
:toucan.query-type/insert.instances]]
(derive query-type ::DML-queries-returning-instances))

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type ::DML-queries-returning-instances
#_model :default]
"DML queries like `UPDATE` or `INSERT` don't usually support returning instances, at least not with JDBC. So for these
situations we'll fake it by first running an equivalent query returning inserted/affected PKs, and then do a
subsequent SELECT to get those rows. Then we'll reduce the rows with the original reducing function."
[rf conn query-type model sql-args]
;; We're using `conj` here instead of `rf` so no row-transform nonsense or whatever is done. We will pass the
;; actual instances to the original `rf` once we get them.
(let [pk-query-type (types/similar-query-type-returning query-type :toucan.result-type/pks)
pks (pipeline/transduce-execute-with-connection conj conn pk-query-type model sql-args)
;; this is sort of a hack but I don't know of any other way to pass along `:columns` information with the
;; original parsed args
columns (:columns pipeline/*parsed-args*)]
;; once we have a sequence of PKs then get instances as with `select` and do our magic on them using the
;; ORIGINAL `rf`.
(transduce-instances-from-pks rf model columns pks)))
(set! *warn-on-reflection* true)

;;;; load the miscellaneous integrations

(defn- class-exists? [^String class-name]
(defn- class-for-name ^Class [^String class-name]
(try
(Class/forName class-name)
(catch Throwable _)))

(when (class-exists? "org.postgresql.jdbc.PgConnection")
(when (class-for-name "org.postgresql.jdbc.PgConnection")
(require 'toucan2.jdbc.postgres))

(when (some class-exists? ["org.mariadb.jdbc.Connection"
(when (some class-for-name ["org.mariadb.jdbc.Connection"
"org.mariadb.jdbc.MariaDbConnection"
"com.mysql.cj.MysqlConnection"])
(require 'toucan2.jdbc.mysql-mariadb))

;;; c3p0 and Hikari integration: when we encounter a wrapped connection pool connection, dispatch off of the class of
;;; connection it wraps
(doseq [pool-connection-class-name ["com.mchange.v2.c3p0.impl.NewProxyConnection"
"com.zaxxer.hikari.pool.HikariProxyConnection"]]
(when-let [pool-connection-class (class-for-name pool-connection-class-name)]
(extend pool-connection-class
protocols/IDispatchValue
{:dispatch-value (fn [^java.sql.Wrapper conn]
(try
(protocols/dispatch-value (.unwrap conn java.sql.Connection))

Check warning on line 36 in src/toucan2/jdbc.clj

View check run for this annotation

Codecov / codecov/patch

src/toucan2/jdbc.clj#L32-L36

Added lines #L32 - L36 were not covered by tests
(catch Throwable _
pool-connection-class)))})))

Check warning on line 38 in src/toucan2/jdbc.clj

View check run for this annotation

Codecov / codecov/patch

src/toucan2/jdbc.clj#L38

Added line #L38 was not covered by tests
43 changes: 43 additions & 0 deletions src/toucan2/jdbc/connection.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
(ns toucan2.jdbc.connection
(:require
[methodical.core :as m]
[next.jdbc]
[next.jdbc.transaction]
[toucan2.connection :as conn]
[toucan2.log :as log]))

(set! *warn-on-reflection* true)

(m/defmethod conn/do-with-connection java.sql.Connection
[conn f]
(f conn))

(m/defmethod conn/do-with-connection javax.sql.DataSource
[^javax.sql.DataSource data-source f]
(with-open [conn (.getConnection data-source)]
(f conn)))

(m/defmethod conn/do-with-connection clojure.lang.IPersistentMap
"Implementation for map connectables. Treats them as a `clojure.java.jdbc`-style connection spec map, converting them to
a `java.sql.DataSource` with [[next.jdbc/get-datasource]]."
[m f]
(conn/do-with-connection (next.jdbc/get-datasource m) f))

;;; for record types that implement `DataSource`, prefer the `DataSource` impl over the map impl.
(m/prefer-method! #'conn/do-with-connection javax.sql.DataSource clojure.lang.IPersistentMap)

(m/defmethod conn/do-with-connection-string "jdbc"
"Implementation of `do-with-connection-string` (and thus [[do-with-connection]]) for all strings starting with `jdbc:`.
Calls `java.sql.DriverManager/getConnection` on the connection string."
[^String connection-string f]
(with-open [conn (java.sql.DriverManager/getConnection connection-string)]
(f conn)))

(m/defmethod conn/do-with-transaction java.sql.Connection
[^java.sql.Connection conn options f]
(let [nested-tx-rule (get options :nested-transaction-rule next.jdbc.transaction/*nested-tx*)
options (dissoc options :nested-transaction-rule)]
(log/debugf "do with JDBC transaction (nested rule: %s) with options %s" nested-tx-rule options)
(binding [next.jdbc.transaction/*nested-tx* nested-tx-rule]
(next.jdbc/with-transaction [t-conn conn options]
(f t-conn)))))
68 changes: 68 additions & 0 deletions src/toucan2/jdbc/pipeline.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
(ns toucan2.jdbc.pipeline
(:require
[methodical.core :as m]
[toucan2.jdbc.query :as jdbc.query]
[toucan2.model :as model]
[toucan2.pipeline :as pipeline]
[toucan2.types :as types]))

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type :default
#_model :default]
"Default impl for the JDBC query execution backend."
[rf conn query-type model sql-args]
{:pre [(sequential? sql-args) (string? (first sql-args))]}
;; `:return-keys` is passed in this way instead of binding a dynamic var because we don't want any additional queries
;; happening inside of the `rf` to return keys or whatever.
(let [extra-options (when (isa? query-type :toucan.result-type/pks)
{:return-keys true})
result (jdbc.query/reduce-jdbc-query rf (rf) conn model sql-args extra-options)]
(rf result)))

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type :toucan.result-type/pks
#_model :default]
"JDBC query execution backend for executing queries that return PKs (`:toucan.result-type/pks`).
Applies transducer to call [[toucan2.model/select-pks-fn]] on each result row."
[rf conn query-type model sql-args]
(let [xform (map (model/select-pks-fn model))]
(next-method (xform rf) conn query-type model sql-args)))

(defn- transduce-instances-from-pks
[rf model columns pks]
;; make sure [[toucan2.select]] is loaded so we get the impls for `:toucan.query-type/select.instances`
(when-not (contains? (loaded-libs) 'toucan2.select)
(locking clojure.lang.RT/REQUIRE_LOCK
(require 'toucan2.select)))

Check warning on line 37 in src/toucan2/jdbc/pipeline.clj

View check run for this annotation

Codecov / codecov/patch

src/toucan2/jdbc/pipeline.clj#L36-L37

Added lines #L36 - L37 were not covered by tests
(if (empty? pks)
[]
(let [kv-args {:toucan/pk [:in pks]}
parsed-args {:columns columns
:kv-args kv-args}]
(pipeline/transduce-query rf :toucan.query-type/select.instances-from-pks model parsed-args {}))))

(derive ::DML-queries-returning-instances :toucan.result-type/instances)

(doseq [query-type [:toucan.query-type/delete.instances
:toucan.query-type/update.instances
:toucan.query-type/insert.instances]]
(derive query-type ::DML-queries-returning-instances))

(m/defmethod pipeline/transduce-execute-with-connection [#_connection java.sql.Connection
#_query-type ::DML-queries-returning-instances
#_model :default]
"DML queries like `UPDATE` or `INSERT` don't usually support returning instances, at least not with JDBC. So for these
situations we'll fake it by first running an equivalent query returning inserted/affected PKs, and then do a
subsequent SELECT to get those rows. Then we'll reduce the rows with the original reducing function."
[rf conn query-type model sql-args]
;; We're using `conj` here instead of `rf` so no row-transform nonsense or whatever is done. We will pass the
;; actual instances to the original `rf` once we get them.
(let [pk-query-type (types/similar-query-type-returning query-type :toucan.result-type/pks)
pks (pipeline/transduce-execute-with-connection conj conn pk-query-type model sql-args)
;; this is sort of a hack but I don't know of any other way to pass along `:columns` information with the
;; original parsed args
columns (:columns pipeline/*parsed-args*)]
;; once we have a sequence of PKs then get instances as with `select` and do our magic on them using the
;; ORIGINAL `rf`.
(transduce-instances-from-pks rf model columns pks)))
2 changes: 1 addition & 1 deletion src/toucan2/jdbc/query.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns ^:no-doc toucan2.jdbc.query
(:require
[next.jdbc :as next.jdbc]
[next.jdbc]
[toucan2.jdbc.options :as jdbc.options]
[toucan2.jdbc.result-set :as jdbc.rs]
[toucan2.log :as log]
Expand Down
19 changes: 2 additions & 17 deletions src/toucan2/protocols.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns toucan2.protocols
(:require [potemkin :as p]))
(:require
[potemkin :as p]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -104,22 +105,6 @@
(dispatch-value [k]
k))

;;; c3p0 and Hikari integration: when we encounter a wrapped connection pool connection, dispatch off of the class of
;;; connection it wraps
(doseq [^String pool-connection-class-name ["com.mchange.v2.c3p0.impl.NewProxyConnection"
"com.zaxxer.hikari.pool.HikariProxyConnection"]]
(when-let [pool-connection-class (try
(Class/forName pool-connection-class-name)
(catch Throwable _
nil))]
(extend pool-connection-class
IDispatchValue
{:dispatch-value (fn [^java.sql.Wrapper conn]
(try
(dispatch-value (.unwrap conn java.sql.Connection))
(catch Throwable _
pool-connection-class)))})))

(p/defprotocol+ IDeferrableUpdate
(deferrable-update [this k f]
"Like [[clojure.core/update]], but this update can be deferred until later. For things like transient rows where you
Expand Down
Loading

0 comments on commit 75a4147

Please sign in to comment.