From c5a74e4d601319259c2edb67d2bff37dfe4eae09 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Thu, 11 May 2023 00:43:22 +0300 Subject: [PATCH] Nested transactions --- README.md | 58 ++++++++++++++++++++++++++++++++++++++-- src/kcas/kcas.ml | 47 +++++++++++++++++++++++++++++++- src/kcas/kcas.mli | 56 ++++++++++++++++++++++++++++++++++---- src/kcas_data/hashtbl.ml | 6 ++--- test/kcas/test.ml | 37 +++++++++++++++++++++++++ 5 files changed, 193 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 5988c4d5..f784edc6 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ is distributed under the [ISC license](LICENSE.md). - [Designing lock-free algorithms with k-CAS](#designing-lock-free-algorithms-with-k-cas) - [Minimize accesses](#minimize-accesses) - [Prefer compound accesses](#prefer-compound-accesses) - - [Log updates optimistically and abort](#log-updates-optimistically-and-abort) + - [Log updates optimistically](#log-updates-optimistically) - [Postcompute](#postcompute) - [Race to cooperate](#race-to-cooperate) - [Understanding transactions](#understanding-transactions) @@ -975,7 +975,7 @@ The above will likely perform slightly better. > [`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-update), > that only performs a single access to the transaction log. -#### Log updates optimistically and abort +#### Log updates optimistically Transactional write accesses to shared memory locations are only attempted after the transaction log construction finishes successfully. Therefore it is entirely @@ -1031,6 +1031,60 @@ can see the updates are only done in case of success. +A problem with the `transfer` function above is that it is not a composable +transaction. The transaction mechanism provided by **kcas** does not implicitly +perform rollbacks of changes made to locations, but it does offer low level +support for nested conditional transactions. 
+ +By explicitly calling +[`snapshot`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-snapshot) +and +[`rollback`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-rollback) +one can scope tentative changes and create a composable version of `transfer`: + +```ocaml +# let transfer ~xt amount ~source ~target = + let snap = Xt.snapshot ~xt in + if Xt.fetch_and_add ~xt source (-amount) < amount then + Retry.later (Xt.rollback ~xt snap); + Xt.fetch_and_add ~xt target amount |> ignore +val transfer : + xt:'a Xt.t -> int -> source:int Loc.t -> target:int Loc.t -> unit = <fun> +``` + +Given a bunch of locations + +```ocaml +let a = Loc.make 10 +and b = Loc.make 20 +and c = Loc.make 30 +and d = Loc.make 27 +``` + +we can now attempt `transfer`s and perform the +[`first`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-first) +of them that succeeds: + +```ocaml +# Xt.commit { + tx = Xt.first [ + transfer 15 ~source:a ~target:d; + transfer 15 ~source:b ~target:d; + transfer 15 ~source:c ~target:d; + ] + } +- : unit = () +``` + +A look at the locations + +```ocaml +# List.map Loc.get [a; b; c; d] +- : int list = [10; 5; 30; 42] +``` + +confirms the expected result. 
+ ### Postcompute The more time a transaction takes, the more likely it is to suffer from diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index e6932587..aa509f37 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -293,6 +293,10 @@ module Retry = struct let later () = raise Later [@@inline never] let unless condition = if not condition then later () [@@inline] + + exception Invalid + + let invalid () = raise Invalid [@@inline never] end let add_awaiter loc before awaiter = @@ -506,7 +510,7 @@ module Xt = struct let validate_one casn loc state = let before = if is_cmp casn state then eval state else state.before in - if before != eval (fenceless_get (as_atomic loc)) then Retry.later () + if before != eval (fenceless_get (as_atomic loc)) then Retry.invalid () [@@inline] let rec validate_all casn = function @@ -636,6 +640,45 @@ module Xt = struct Obj.magic a == loc | _, Miss, _ -> impossible ()) + let rec rollback casn cass_snap cass = + if cass_snap == cass then cass + else + match cass with + | NIL -> NIL + | CASN { loc; state; lt; gt; _ } -> ( + match splay ~hit_parent:false loc.id cass_snap with + | lt_mark, Miss, gt_mark -> + let lt = rollback casn lt_mark lt + and gt = rollback casn gt_mark gt in + let state = + if is_cmp casn state then state + else + let current = fenceless_get (as_atomic loc) in + if state.before != eval current then Retry.invalid () + else current + in + CASN { loc; state; lt; gt; awaiters = [] } + | lt_mark, Hit (loc, state), gt_mark -> + let lt = rollback casn lt_mark lt + and gt = rollback casn gt_mark gt in + CASN { loc; state; lt; gt; awaiters = [] }) + + type 'x snap = cass + + let snapshot ~xt = xt.cass + let rollback ~xt snap = xt.cass <- rollback xt.casn snap xt.cass + + let rec first ~xt tx = function + | [] -> tx ~xt + | tx' :: txs -> ( + match tx ~xt with + | value -> value + | exception Retry.Later -> first ~xt tx' txs) + + let first ~xt = function + | [] -> Retry.later () + | tx :: txs -> first ~xt tx txs + type 'a tx = { 
tx : 'x. xt:'x t -> 'a } [@@unboxed] let call { tx } = tx [@@inline] @@ -697,6 +740,8 @@ module Xt = struct | exception Mode.Interference -> commit (Backoff.once backoff) Mode.lock_free (reset Mode.lock_free xt) tx)) + | exception Retry.Invalid -> + commit (Backoff.once backoff) mode (reset_quick xt) tx | exception Retry.Later -> ( if xt.cass == NIL then invalid_retry (); let t = Domain_local_await.prepare_for_await () in diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 89ea1869..67a9aed5 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -19,6 +19,14 @@ module Retry : sig val unless : bool -> unit (** [unless condition] is equivalent to [if not condition then later ()]. *) + + exception Invalid + (** Exception that may be raised to signal that the transaction log is no + longer valid, e.g. because shared memory locations have been changed + outside of the transaction, and the transaction should be retried. *) + + val invalid : unit -> 'a + (** [invalid ()] is equivalent to [raise Invalid]. *) end (** Operating modes of the [k-CAS-n-CMP] algorithm. *) @@ -260,6 +268,44 @@ module Xt : sig (** [to_nonblocking ~xt tx] converts the blocking transaction [tx] to a non-blocking transaction by returning [None] on retry. *) + (** {1 Nested transactions} + + The transaction mechanism does not implicitly rollback changes recorded in + the transaction log. Using {!snapshot} and {!rollback} it is possible to + implement nested conditional transactions that may tentatively record + changes in the transaction log and then later discard those changes. *) + + type 'x snap + (** Type of a {!snapshot} of a transaction log. *) + + val snapshot : xt:'x t -> 'x snap + (** [snapshot ~xt] returns a snapshot of the transaction log. + + Taking a snapshot is a fast constant time [O(1)] operation. 
*) + + val rollback : xt:'x t -> 'x snap -> unit + (** [rollback ~xt snap] discards any changes of shared memory locations + recorded in the transaction log after the [snap] was taken by {!snapshot}. + + Performing a rollback is potentially as expensive as linear time [O(n)] in + the number of locations accessed, but, depending on the exact access + patterns, may also be performed more quickly. The implementation is + optimized with the assumption that a rollback is performed at most once + per snapshot. + + {b NOTE}: Only changes are discarded. Any location newly accessed after + the snapshot was taken will remain recorded in the log as a read-only + entry. *) + + val first : xt:'x t -> (xt:'x t -> 'a) list -> 'a + (** [first ~xt txs] calls each transaction in the given list in turn and + either returns the value returned by the first transaction in the list + that does not raise {!Retry.Later} or raises {!Retry.Later} in case all + of the transactions raised {!Retry.Later}. + + {b NOTE}: [first] does not automatically roll back changes made by the + transactions. *) + (** {1 Post commit actions} *) val post_commit : xt:'x t -> (unit -> unit) -> unit @@ -270,8 +316,8 @@ module Xt : sig val validate : xt:'x t -> 'a Loc.t -> unit (** [validate ~xt r] determines whether the shared memory location [r] has - been modified outside of the transaction and raises {!Retry.Later} in case - it has. + been modified outside of the transaction and raises {!Retry.Invalid} in + case it has. Due to the possibility of read skew, in cases where some important invariant should hold between two or more different shared memory @@ -297,9 +343,9 @@ module Xt : sig val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and - then returns whatever [tx] returned. [tx] may raise {!Retry.Later} to - explicitly request a retry or any other exception to abort the - transaction. 
+ then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or + {!Retry.Invalid} to explicitly request a retry or any other exception to + abort the transaction. The default for [commit] is {!Mode.obstruction_free}. However, after enough attempts have failed during the verification step, [commit] diff --git a/src/kcas_data/hashtbl.ml b/src/kcas_data/hashtbl.ml index 3f7cdf49..e45a2e91 100644 --- a/src/kcas_data/hashtbl.ml +++ b/src/kcas_data/hashtbl.ml @@ -203,7 +203,7 @@ module Xt = struct let initial_state = Array.length old_buckets in while true do (* If state is modified outside our expensive tx would fail. *) - Retry.unless (Loc.fenceless_get state = initial_state); + if Loc.fenceless_get state != initial_state then Retry.invalid (); rehash_a_few_buckets ~xt done else @@ -217,7 +217,7 @@ module Xt = struct assert (not must_be_done_in_this_tx); let buckets = Xt.get ~xt t.buckets in (* Check state to ensure that buckets have not been updated. *) - Retry.unless (0 <= Loc.fenceless_get state); + if Loc.fenceless_get state < 0 then Retry.invalid (); let snapshot = get_or_alloc snapshot @@ fun () -> Array.make (Array.length buckets) [] @@ -239,7 +239,7 @@ module Xt = struct assert (not must_be_done_in_this_tx); let old_buckets = Xt.get ~xt t.buckets in (* Check state to ensure that buckets have not been updated. 
*) - Retry.unless (0 <= Loc.fenceless_get state); + if Loc.fenceless_get state < 0 then Retry.invalid (); let new_capacity = Array.length old_buckets in let new_buckets = get_or_alloc new_buckets @@ fun () -> Loc.make_array new_capacity [] diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 7c839b4f..312ec767 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -491,6 +491,42 @@ let test_explicit_validation () = (* *) +let test_rollback () = + let n_iter = 1_000 in + + let n_locs = 20 in + + let locs = Loc.make_array n_locs 0 in + + let accum = ref 0 in + + for _ = 1 to n_iter do + let n_permanent = Random.int n_locs in + let n_rollbacks = Random.int n_locs in + + let tx ~xt = + in_place_shuffle locs; + for i = 0 to n_permanent - 1 do + Xt.incr ~xt locs.(i) + done; + + let snap = Xt.snapshot ~xt in + in_place_shuffle locs; + for i = 0 to n_rollbacks - 1 do + Xt.incr ~xt locs.(i) + done; + Xt.rollback ~xt snap + in + Xt.commit { tx }; + + accum := n_permanent + !accum + done; + + let sum = Array.map Loc.get locs |> Array.fold_left ( + ) 0 in + assert (!accum = sum) + +(* *) + let test_mode () = assert (Loc.get_mode (Loc.make ~mode:Mode.lock_free 0) == Mode.lock_free); assert ( @@ -533,6 +569,7 @@ let () = test_no_unnecessary_wakeups (); test_periodic_validation (); test_explicit_validation (); + test_rollback (); test_mode (); test_xt (); Printf.printf "Test suite OK!\n%!"