diff --git a/CHANGES.md b/CHANGES.md index 46e2ebec..5974e7e0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. +## 0.2.2 + +* New explicit transaction log passing API based on idea by @gasche (@polytypic, review: @samoht and @lyrm) + ## 0.2.1 * New k-CAS-n-CMP algorithm extending the GKMZ algorithm (@polytypic, review: @bartoszmodelski) diff --git a/README.md b/README.md index ab93ba1d..4006a20d 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,8 @@ is distributed under the [ISC license](LICENSE.md). - [A transactional lock-free queue](#a-transactional-lock-free-queue) - [Composing transactions](#composing-transactions) - [About transactions](#about-transactions) + - [Programming with explicit transaction log passing](#programming-with-explicit-transaction-log-passing) + - [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap) - [Development](#development) ## A quick tour @@ -96,6 +98,14 @@ Perform transactions over them: - : unit = () ``` +Explicitly pass a transaction log through a computation: + +```ocaml +# Xt.commit { tx = fun ~xt -> + Xt.set ~xt a (Xt.get ~xt b) } +- : unit = () +``` + And get the answer: ```ocaml @@ -112,10 +122,13 @@ The API of **kcas** is divided into submodules. The main modules are - [`Op`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Op/index.html), providing an interface for _primitive operations_ over multiple shared memory - locations, and + locations, - [`Tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html), - providing _composable transactions_ over shared memory locations. + providing _composable transactions_ over shared memory locations, and + +- [`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html), + providing _explicit transaction log passing_ over shared memory locations. The following sections discuss each of the above in turn. @@ -132,9 +145,10 @@ In other words, an application that uses [`Atomic`](https://v2.ocaml.org/api/Atomic.html), but then needs to perform atomic operations over multiple atomic locations, could theoretically just rebind `module Atomic = Loc` and then use the -[`Op`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Op/index.html) +[`Op`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Op/index.html), +[`Tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html), and/or -[`Tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html) APIs +[`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html) APIs to perform operations over multiple locations. This should not be done just-in-case, however, as, even though **kcas** is efficient, it does naturally have higher overhead than the Stdlib @@ -448,7 +462,8 @@ val a_queue : int queue = {front = ; back = } > can cause > [starvation]() as > producers may then be able to always complete their transactions before -> consumers and the back of the queue might grow without bound. +> consumers and the back of the queue might grow without bound. _Can you see a +> way to avoid this problem?_ #### Composing transactions @@ -511,6 +526,205 @@ will essentially perform a [busy-wait](https://en.wikipedia.org/wiki/Busy_waiting), which should usually be avoided. +### Programming with explicit transaction log passing + +The [`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html) +module provides an API that allows transactions to be implemented by explicitly +passing a mutable transaction log, which allows convenient use of all the +ordinary sequential control flow structures of OCaml. + +> At the moment it is unclear whether both of the +> [`Xt`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html) and +> the [`Tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Tx/index.html) +> APIs will be supported in the future. They provide roughly the same expressive +> power as witnessed by the conversions +> [`of_tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-of_tx) +> and +> [`to_tx`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-to_tx). +> Feedback on this question is welcome! + +#### A transactional lock-free leftist heap + +Let's implement something a bit more complicated, +[a leftist heap](https://en.wikipedia.org/wiki/Leftist_tree), which is a kind of +priority queue. + +> The implementation here is adapted from the book _Data Structures and +> Algorithm Analysis in C (2nd ed.)_ by Mark Allen Weiss. + +First we define a data type to represent the spine of a leftist heap: + +```ocaml +# type 'v leftist = + [ `Null + | `Node of 'v leftist Loc.t + * int Loc.t + * 'v + * 'v leftist Loc.t ] +type 'v leftist = + [ `Node of 'v leftist Loc.t * int Loc.t * 'v * 'v leftist Loc.t | `Null ] +``` + +To create a leftist heap we +[`make`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-make) +a location with an empty spine: + +```ocaml +# let leftist () : _ leftist Loc.t = Loc.make `Null +val leftist : unit -> 'a leftist Loc.t = +``` + +We then define an auxiliary function `npl_of` to get the null path length of a +leftist heap: + +```ocaml +# let npl_of ~xt : _ leftist -> int = function + | `Null -> 0 + | `Node (_, npl, _, _) -> Xt.get ~xt npl +val npl_of : xt:'a Xt.t -> 'b leftist -> int = +``` + +Notice the `~xt` parameter. It refers to the transaction log being passed +explicitly. Above we pass it to +[`get`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-get) +to record an operation in the log. + +The core operation of leftist heaps is that of merging two leftist heaps: + +```ocaml +# let rec merge ~xt ~lt h1 h2 = + match h1, h2 with + | `Null, h2 -> h2 + | h1, `Null -> h1 + | (`Node (_, _, v1, _) as h1), + (`Node (_, _, v2, _) as h2) -> + let (`Node (h1l, npl, _, h1r) as h1), h2 = + if lt v1 v2 then h1, h2 else h2, h1 in + let l = Xt.get ~xt h1l in + if l == `Null then + Xt.set ~xt h1l h2 + else begin + let r = merge ~xt ~lt (Xt.get ~xt h1r) h2 in + match npl_of ~xt l, npl_of ~xt r with + | l_npl, r_npl when l_npl < r_npl -> + Xt.set ~xt h1l r; + Xt.set ~xt h1r l; + Xt.set ~xt npl (l_npl + 1) + | _, r_npl -> + Xt.set ~xt h1r r; + Xt.set ~xt npl (r_npl + 1) + end; + h1 +val merge : + xt:'a Xt.t -> + lt:('b -> 'b -> bool) -> 'b leftist -> 'b leftist -> 'b leftist = +``` + +Again, `merge` passes the `~xt` parameter explicitly to the +[`get`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-get) +and +[`set`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-set) +operations to record them in the log. + +The `merge` operation can be used to implement both insertion to + +```ocaml +# let insert ~xt ~lt h x = + let h1 = `Node ( + Loc.make `Null, + Loc.make 1, + x, + Loc.make `Null + ) in + Xt.set ~xt h (merge ~xt ~lt h1 (Xt.get ~xt h)) +val insert : + xt:'a Xt.t -> lt:('b -> 'b -> bool) -> 'b leftist Loc.t -> 'b -> unit = + +``` + +and deletion from + +```ocaml +# let delete_min_opt ~xt ~lt h = + match Xt.get ~xt h with + | `Null -> None + | `Node (h1, _, x, h2) -> + Xt.set ~xt h + (merge ~xt ~lt (Xt.get ~xt h1) (Xt.get ~xt h2)); + Some x +val delete_min_opt : + xt:'a Xt.t -> lt:('b -> 'b -> bool) -> 'b leftist Loc.t -> 'b option = + +``` + +a leftist heap. + +Let's then first pick an ordering + +```ocaml +# let lt = (>) +val lt : 'a -> 'a -> bool = +``` + +and create a leftist heap: + +```ocaml +# let a_heap : int leftist Loc.t = leftist () +val a_heap : int leftist Loc.t = +``` + +To populate the heap we need to define a transaction passing function and pass +it to +[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-commit): + +```ocaml +# Xt.commit { tx = fun ~xt -> + List.iter (insert ~xt ~lt a_heap) [3; 1; 4; 1; 5] } +- : unit = () +``` + +Notice that we could simply use `List.iter` from the Stdlib to iterate over a +list of elements. + +> The +> [`{ tx = ... }`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#type-tx) +> wrapper is used to ensure that the transaction function is polymorphic with +> respect to the log. This way the type system makes it difficult to +> accidentally leak the log as described in the paper +> [Lazy Functional State Threads](https://dl.acm.org/doi/10.1145/178243.178246). + +Let's then define a transaction passing function to remove all elements from a +heap + +```ocaml +# let remove_all ~xt ~lt h = + let xs = ref [] in + while match delete_min_opt ~xt ~lt h with + | None -> false + | Some x -> xs := x :: !xs; true do + () + done; + List.rev !xs +val remove_all : + xt:'a Xt.t -> lt:('b -> 'b -> bool) -> 'b leftist Loc.t -> 'b list = +``` + +and use it + +```ocaml +# Xt.commit { tx = remove_all ~lt a_heap } +- : int list = [5; 4; 3; 1; 1] +``` + +on the heap we populated earlier. + +Notice how we were able to use a `while` loop, rather than recursion, in +`remove_all`. + +> This leftist tree implementation is unlikely to be the best performing +> lock-free heap implementation, but it was pretty straightforward to implement +> using k-CAS based on a textbook imperative implementation. + ## Development ### Formatting diff --git a/src/kcas.ml b/src/kcas.ml index c8a7bd05..11382d9d 100644 --- a/src/kcas.ml +++ b/src/kcas.ml @@ -35,7 +35,7 @@ let casn_before = Atomic.make `Before let rec release_after casn = function | NIL -> true | CASN (_, state, lt, gt) -> - release_after casn lt |> ignore; + if lt != NIL then release_after casn lt |> ignore; if not (is_cmp casn state) then ( state.before <- state.after; state.casn <- casn_after); @@ -44,7 +44,7 @@ let rec release_after casn = function let rec release_before casn = function | NIL -> false | CASN (_, state, lt, gt) -> - release_before casn lt |> ignore; + if lt != NIL then release_before casn lt |> ignore; if not (is_cmp casn state) then ( state.after <- state.before; state.casn <- casn_before); @@ -57,12 +57,16 @@ let release casn cass = function let rec verify casn = function | NIL -> `After | CASN (atom, desired, lt, gt) -> ( - match verify casn lt with - | `After -> - if is_cmp casn desired && Atomic.get atom.state != desired then - `Before - else verify casn gt - | `Before -> `Before) + if lt == NIL then + if is_cmp casn desired && Atomic.get atom.state != desired then `Before + else verify casn gt + else + match verify casn lt with + | `After -> + if is_cmp casn desired && Atomic.get atom.state != desired then + `Before + else verify casn gt + | `Before -> `Before) let finish casn (`Undetermined cass as undetermined) (status : determined) = if Atomic.compare_and_set casn (undetermined :> status) (status :> status) @@ -74,7 +78,7 @@ let exit _ = raise Exit [@@inline never] let rec determine casn action = function | NIL -> action | CASN (loc, desired, lt, gt) as eq -> ( - match determine casn action lt with + match if lt != NIL then determine casn action lt else action with | `Before -> `Before | (`After | `Verify) as action -> let current = Atomic.get loc.state in @@ -294,12 +298,17 @@ let update_as g loc f casn state' l r = [@@inline] let update_as g loc f (casn, cass) = + let x = loc.id in match cass with | NIL -> update_as0 g loc f casn NIL NIL + | CASN (a, _, NIL, _) as cass when x < a.id -> + update_as0 g loc f casn NIL cass + | CASN (a, _, _, NIL) as cass when a.id < x -> + update_as0 g loc f casn cass NIL | CASN (loc', state', l, r) when Obj.magic loc' == loc -> update_as g loc f casn state' l r | _ -> ( - match splay loc.id cass with + match splay x cass with | l, Miss, r -> update_as0 g loc f casn l r | l, Hit (_loc', state'), r -> update_as g loc f casn state' l r) [@@inline] @@ -378,3 +387,105 @@ module Tx = struct let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx = commit backoff mode tx end + +module Xt = struct + type 'x t = { casn : casn; mutable cass : cass } + + let update0 loc f xt l r = + let state = Atomic.get loc.state in + let before = eval state in + let after = f before in + let state = + if before == after && is_obstruction_free xt.casn then state + else { before; after; casn = xt.casn } + in + xt.cass <- CASN (loc, state, l, r); + before + [@@inline] + + let update loc f xt state' l r = + let state = Obj.magic state' in + if is_cmp xt.casn state then ( + let before = eval state in + let after = f before in + let state = + if before == after then state else { before; after; casn = xt.casn } + in + xt.cass <- CASN (loc, state, l, r); + before) + else + let current = state.after in + xt.cass <- CASN (loc, { state with after = f current }, l, r); + current + [@@inline] + + let update loc f xt = + let x = loc.id in + match xt.cass with + | NIL -> update0 loc f xt NIL NIL + | CASN (a, _, NIL, _) as cass when x < a.id -> update0 loc f xt NIL cass + | CASN (a, _, _, NIL) as cass when a.id < x -> update0 loc f xt cass NIL + | CASN (loc', state', l, r) when Obj.magic loc' == loc -> + update loc f xt state' l r + | cass -> ( + match splay x cass with + | l, Miss, r -> update0 loc f xt l r + | l, Hit (_loc', state'), r -> update loc f xt state' l r) + [@@inline] + + let protect xt f x = + let cass = xt.cass in + let y = f x in + assert (xt.cass == cass); + y + [@@inline] + + let get ~xt loc = update loc Fun.id xt + let set ~xt loc after = update loc (fun _ -> after) xt |> ignore + let modify ~xt loc f = update loc (protect xt f) xt |> ignore + let exchange ~xt loc after = update loc (fun _ -> after) xt + let fetch_and_add ~xt loc n = update loc (( + ) n) xt + let incr ~xt loc = fetch_and_add ~xt loc 1 |> ignore + let decr ~xt loc = fetch_and_add ~xt loc (-1) |> ignore + let update ~xt loc f = update loc (protect xt f) xt + + type 'a tx = { tx : 'x. xt:'x t -> 'a } [@@unboxed] + + let call { tx } = tx [@@inline] + + let attempt (mode : Mode.t) tx = + let xt = { casn = Atomic.make (mode :> status); cass = NIL } in + let result = tx ~xt in + match xt.cass with + | NIL -> result + | CASN (loc, state, NIL, NIL) -> + if is_cmp xt.casn state then result + else + let before = state.before in + state.before <- state.after; + if cas loc before state then result else exit () + | cass -> if determine_for_owner xt.casn cass then result else exit () + + let rec commit backoff mode tx = + match attempt mode tx with + | result -> result + | exception Mode.Interference -> + commit (Backoff.once backoff) Mode.lock_free tx + | exception Exit -> commit (Backoff.once backoff) mode tx + + let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx = + commit backoff mode tx.tx + [@@inline] + + let attempt ?(mode = Mode.lock_free) tx = attempt mode tx.tx [@@inline] + + let of_tx tx ~xt = + let (_, cass), x = tx (xt.casn, xt.cass) in + xt.cass <- cass; + x + + let to_tx tx (casn, cass) = + let xt = { casn; cass } in + let x = tx.tx ~xt in + ((xt.casn, xt.cass), x) +end diff --git a/src/kcas.mli b/src/kcas.mli index e40b7192..72d669d3 100644 --- a/src/kcas.mli +++ b/src/kcas.mli @@ -1,6 +1,13 @@ +(** {1 Auxiliary modules} *) + module Backoff : module type of Backoff (** Randomized exponential backoff mechanism. *) +(** {1 Individual locations} + + Individual shared memory locations are manipulated through the {!Loc} module + that is essentially compatible with the Stdlib [Atomic] module. *) + (** Shared memory locations. *) module Loc : sig type 'a t @@ -45,6 +52,30 @@ module Loc : sig (** [decr r] atomically decrements [r]. *) end +(** {1 Manipulating multiple locations atomically} + + Multiple shared memory locations can be manipulated atomically using either + + - {!Op}, to specify a list of primitive operations to perform, + - {!Tx}, to specify a composable transaction, or + - {!Xt}, to explicitly pass a transaction log to record accesses. + + Atomic operations over multiple shared memory locations are performed in two + or three phases: + + 1. The first phase essentially records a list or log of operations to access + shared memory locations. + + 2. The second phase attempts to perform the operations atomically. + + 3. In {!Mode.obstruction_free} a third phase verifies all read-only + operations. + + Each phase may fail. In particular, in the first phase, as no changes to + shared memory have yet been attempted, it is safe, for example, to raise + exceptions to signal failure. Failure on the third phase raises + {!Mode.Interference}. *) + (** Operating modes of the [k-CAS-n-CMP] algorithm. *) module Mode : sig type t @@ -52,12 +83,13 @@ module Mode : sig val lock_free : t (** In [lock_free] mode the algorithm makes sure that at least one domain will - be able to make progress. *) + be able to make progress by performing read-only operations as read-write + operations. *) val obstruction_free : t (** In [obstruction_free] mode the algorithm proceeds optimistically and - allows operations to fail due to {!Interference} from other domains that - might have been prevented in the {!lock_free} mode. *) + allows read-only operations to fail due to {!Interference} from other + domains that might have been prevented in the {!lock_free} mode. *) exception Interference (** Exception raised when interference from other domains is detected in the @@ -119,20 +151,6 @@ module Tx : sig Transactions can be composed both sequentially (see {!let*}) and conditionally (see {!(<|>)} and {!forget}). - Transactions are performed in two or three phases: - - 1. The first phase essentially records a log of operations based on the - {!get} and {!set} accesses of shared memory locations. - - 2. The second phase attempts to perform the operations atomically. - - 3. In {!Mode.obstruction_free} a third phase verifies all read-only - operations. - - Each phase may fail. In particular, the first phase is allowed to raise - exceptions to signal failure. Failure on the third phase raises - {!Mode.Interference}. - Here is an example of unconditionally {!commit}ting a transaction that swaps the values of the two shared memory locations [x_loc] and [y_loc] and returns their sum: @@ -198,6 +216,8 @@ module Tx : sig choice of whether accesses should, should not, or may be safely forgotten depends on what the desired semantics are for the transaction. *) + (** {1 Access combinators} *) + val get : 'a Loc.t -> 'a t (** [get r] accesses the shared memory location [r] inside the transaction and results in the current value of [r] inside the transaction. *) @@ -225,6 +245,8 @@ module Tx : sig val exchange_as : ('a -> 'b) -> 'a Loc.t -> 'a -> 'b t (** [exchange_as g r v] is equivalent to [update r (fun _ -> v) |> map g]. *) + (** {1 Sequencing combinators} *) + val return : 'a -> 'a t (** [return v] is a transactional operation whose result is the value [v]. *) @@ -283,6 +305,8 @@ module Tx : sig (* continue successfully *) ]} *) + (** {1 Conditional transactions} *) + val ( <|> ) : 'a t -> 'a t -> 'a t (** [l_tx <|> r_tx] is a left-biased choice between the two transactions [l_tx] and [r_tx]. @@ -303,6 +327,8 @@ module Tx : sig transaction to forget any accesses made during the forgotten part of a transaction. *) + (** {1 Performing transactions} *) + val attempt : ?mode:Mode.t -> 'a t -> 'a (** [attempt tx] attempts to atomically perform the given transaction over shared memory locations. If used in {!Mode.obstruction_free} may raise @@ -323,3 +349,92 @@ module Tx : sig transaction mechanism has no way to intelligently wait until shared memory locations are modified by other domains. *) end + +(** Explicit transaction log passing on shared memory locations. *) +module Xt : sig + type 'x t + (** Type of an explicit transaction log on shared memory locations. + + Note that a transaction log itself is not domain-safe and should generally + only be used by a single domain. If a new domain is spawned inside a + function recording shared memory accesses to a log and the new domain also + records accesses to the log it may become inconsistent. *) + + (** {1 Recording accesses} + + Accesses of shared memory locations using an explicit transaction log + first ensure that the initial value of the shared memory location is + recorded in the log and then act on the current value of the shared memory + location as recorded in the log. *) + + val get : xt:'x t -> 'a Loc.t -> 'a + (** [get ~xt r] returns the current value of the shared memory location [r] in + the explicit transaction log [xt]. *) + + val set : xt:'x t -> 'a Loc.t -> 'a -> unit + (** [set ~xt r v] records the current value of the shared memory location [r] + to be the given value [v] in the explicit transaction log [xt]. *) + + val update : xt:'x t -> 'a Loc.t -> ('a -> 'a) -> 'a + (** [update ~xt r f] is equivalent to [let x = get ~xt r in set ~xt r (f x); + x] with the limitation that [f] must not and is not allowed to record + accesses to the transaction log. *) + + val modify : xt:'x t -> 'a Loc.t -> ('a -> 'a) -> unit + (** [modify ~xt r f] is equivalent to [let x = get ~xt r in set ~xt r (f x)] + with the limitation that [f] must not and is not allowed to record + accesses to the transaction log. *) + + val exchange : xt:'x t -> 'a Loc.t -> 'a -> 'a + (** [exchange ~xt r v] is equivalent to [update ~xt r (fun _ -> v)]. *) + + val fetch_and_add : xt:'c t -> int Loc.t -> int -> int + (** [fetch_and_add ~xt r n] is equivalent to [update ~xt r ((+) n)]. *) + + val incr : xt:'x t -> int Loc.t -> unit + (** [incr ~xt r] is equivalent to [fetch_and_add ~xt r 1 |> ignore]. *) + + val decr : xt:'x t -> int Loc.t -> unit + (** [decr ~xt r] is equivalent to [fetch_and_add ~xt r (-1) |> ignore]. *) + + (** {1 Performing accesses} *) + + type 'a tx = { tx : 'x. xt:'x t -> 'a } [@@unboxed] + (** Type of a transaction function that is polymorphic with respect to an + explicit transaction log. The universal quantification helps to ensure + that the transaction log cannot accidentally escape. *) + + val call : 'a tx -> xt:'x t -> 'a + (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) + + val attempt : ?mode:Mode.t -> 'a tx -> 'a + (** [attempt tx] attempts to atomically perform the transaction over shared + memory locations recorded by calling [tx] with a fresh explicit + transaction log. If used in {!Mode.obstruction_free} may raise + {!Mode.Interference}. Otherwise either raises [Exit] on failure to commit + the transaction or returns the result of the transaction. The default for + [attempt] is {!Mode.lock_free}. *) + + val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + (** [commit tx] repeats [attempt tx] until it does not raise [Exit] or + {!Mode.Interference} and then either returns or raises whatever attempt + returned or raised. + + The default for [commit] is {!Mode.obstruction_free}. However, after + enough attempts have failed during the verification step, [commit] + switches to {!Mode.lock_free}. + + Note that, aside from using exponential backoff to reduce contention, the + transaction mechanism has no way to intelligently wait until shared memory + locations are modified by other domains. *) + + (** {1 Conversions} *) + + val of_tx : 'a Tx.t -> xt:'x t -> 'a + (** [of_tx tx] converts the given {!Tx} transaction [tx] to an explicit log + passing function. *) + + val to_tx : 'a tx -> 'a Tx.t + (** [to_tx tx] converts the given explicit log passing function [tx] to a + {!Tx} transaction. *) +end diff --git a/test/dune b/test/dune index fcb1fe0c..e3c3f765 100644 --- a/test/dune +++ b/test/dune @@ -2,6 +2,16 @@ (name barrier) (modules barrier)) +(library + (name tx_linked_queue) + (libraries kcas) + (modules tx_linked_queue)) + +(library + (name xt_three_stack_queue) + (libraries kcas) + (modules xt_three_stack_queue)) + (test (name test) (libraries kcas barrier) @@ -9,8 +19,13 @@ (test (name tx_test) - (libraries kcas) - (modules tx_test tx_linked_queue tx_two_stack_queue tx_stack)) + (libraries kcas tx_linked_queue xt_three_stack_queue) + (modules tx_test tx_stack)) + +(test + (name xt_test) + (libraries kcas tx_linked_queue xt_three_stack_queue) + (modules xt_test xt_stack)) (test (name test_overlapping_loc) @@ -32,6 +47,11 @@ (libraries kcas barrier) (modules tx_parallel_cmp_bench)) +(test + (name xt_parallel_cmp_bench) + (libraries kcas barrier) + (modules xt_parallel_cmp_bench)) + (test (name example) (libraries kcas) diff --git a/test/tx_test.ml b/test/tx_test.ml index 46e0de64..673e368f 100644 --- a/test/tx_test.ml +++ b/test/tx_test.ml @@ -1,5 +1,17 @@ -module Tx = Kcas.Tx -module Q = Tx_two_stack_queue +open Kcas + +module Q = struct + include Xt_three_stack_queue + + let is_empty q = Xt.to_tx { tx = is_empty q } + let push_back q x = Xt.to_tx { tx = push_back q x } + let push_front q x = Xt.to_tx { tx = push_front q x } + + let pop_front q = + Xt.to_tx { tx = pop_front_opt q } + |> Tx.map @@ function None -> raise Exit | Some x -> x +end + module P = Tx_linked_queue module S = Tx_stack diff --git a/test/tx_two_stack_queue.ml b/test/tx_two_stack_queue.ml deleted file mode 100644 index 16c3d98a..00000000 --- a/test/tx_two_stack_queue.ml +++ /dev/null @@ -1,25 +0,0 @@ -module Loc = Kcas.Loc -module Tx = Kcas.Tx - -type 'a t = { front : 'a list Loc.t; back : 'a list Loc.t } - -let create () = { front = Loc.make []; back = Loc.make [] } - -let is_empty q = - Tx.( - get q.front >>= function - | _ :: _ -> return false - | [] -> q.back |> get_as @@ ( == ) []) - -let push_front q x = Tx.modify q.front @@ List.cons x -let push_back q x = Tx.modify q.back @@ List.cons x -let tl_safe = function [] -> [] | _ :: xs -> xs - -let pop_front q = - Tx.( - update q.front tl_safe >>= function - | x :: _ -> return x - | [] -> ( - exchange_as List.rev q.back [] >>= function - | [] -> forget - | x :: xs -> set q.front xs >>. x)) diff --git a/test/tx_two_stack_queue.mli b/test/tx_two_stack_queue.mli deleted file mode 100644 index 7170d958..00000000 --- a/test/tx_two_stack_queue.mli +++ /dev/null @@ -1,7 +0,0 @@ -type 'a t - -val create : unit -> 'a t -val is_empty : 'a t -> bool Kcas.Tx.t -val push_front : 'a t -> 'a -> unit Kcas.Tx.t -val push_back : 'a t -> 'a -> unit Kcas.Tx.t -val pop_front : 'a t -> 'a Kcas.Tx.t diff --git a/test/xt_parallel_cmp_bench.ml b/test/xt_parallel_cmp_bench.ml new file mode 100644 index 00000000..6587df84 --- /dev/null +++ b/test/xt_parallel_cmp_bench.ml @@ -0,0 +1,55 @@ +module Loc = Kcas.Loc +module Xt = Kcas.Xt + +let parallel_cmp_benchmark n_iter = + let barrier = Barrier.make 2 in + + let a = Loc.make 10 and b = Loc.make 52 in + + let thread1 () = + let x = Loc.make 0 in + let tx1 ~xt = + let a = Xt.get ~xt a and b = Xt.get ~xt b in + Xt.set ~xt x (b - a) + and tx2 ~xt = + let a = Xt.get ~xt a and b = Xt.get ~xt b in + Xt.set ~xt x (a + b) + in + let tx1 = { Xt.tx = tx1 } and tx2 = { Xt.tx = tx2 } in + Barrier.await barrier; + let start = Unix.gettimeofday () in + for _ = 1 to n_iter do + Xt.commit tx1; + Xt.commit tx2 + done; + Unix.gettimeofday () -. start + and thread2 () = + let y = Loc.make 0 in + let tx1 ~xt = + let a = Xt.get ~xt a and b = Xt.get ~xt b in + Xt.set ~xt y (b - a) + and tx2 ~xt = + let a = Xt.get ~xt a and b = Xt.get ~xt b in + Xt.set ~xt y (a + b) + in + let tx1 = { Xt.tx = tx1 } and tx2 = { Xt.tx = tx2 } in + Barrier.await barrier; + let start = Unix.gettimeofday () in + for _ = 1 to n_iter do + Xt.commit tx1; + Xt.commit tx2 + done; + Unix.gettimeofday () -. start + in + + let total = + [ thread1; thread2 ] |> List.map Domain.spawn |> List.map Domain.join + |> List.fold_left ( +. ) 0.0 + in + + Printf.printf "%f ns/tx\n" + (1_000_000_000.0 *. total /. Float.of_int (4 * n_iter)) + +let () = + let n_iter = try int_of_string Sys.argv.(1) with _ -> 1_000_000 in + parallel_cmp_benchmark n_iter diff --git a/test/xt_stack.ml b/test/xt_stack.ml new file mode 100644 index 00000000..de1fe075 --- /dev/null +++ b/test/xt_stack.ml @@ -0,0 +1,13 @@ +module Loc = Kcas.Loc +module Xt = Kcas.Xt + +type 'a t = 'a list Loc.t + +let create () = Loc.make [] +let is_empty ~xt s = Xt.get ~xt s == [] +let push ~xt s x = Xt.modify ~xt s @@ List.cons x + +let pop_opt ~xt s = + match Xt.update ~xt s @@ function [] -> [] | _ :: xs -> xs with + | [] -> None + | x :: _ -> Some x diff --git a/test/xt_stack.mli b/test/xt_stack.mli new file mode 100644 index 00000000..faac78c3 --- /dev/null +++ b/test/xt_stack.mli @@ -0,0 +1,6 @@ +type 'a t + +val create : unit -> 'a t +val is_empty : xt:'x Kcas.Xt.t -> 'a t -> bool +val push : xt:'x Kcas.Xt.t -> 'a t -> 'a -> unit +val pop_opt : xt:'x Kcas.Xt.t -> 'a t -> 'a option diff --git a/test/xt_test.ml b/test/xt_test.ml new file mode 100644 index 00000000..8ea75c8d --- /dev/null +++ b/test/xt_test.ml @@ -0,0 +1,47 @@ +open Kcas + +module Q = struct + include Tx_linked_queue + + let is_empty ~xt q = Xt.of_tx ~xt @@ is_empty q + let push_back ~xt q x = Xt.of_tx ~xt @@ push_back q x + let push_front ~xt q x = Xt.of_tx ~xt @@ push_front q x + + let pop_front_opt ~xt q = + Xt.of_tx ~xt Tx.(pop_front q |> map Option.some <|> return None) +end + +module P = Xt_three_stack_queue +module S = Xt_stack + +let () = + let p = P.create () and q = Q.create () and s = S.create () in + + (* Populate [p] with two items atomically *) + let tx ~xt = + P.push_front ~xt p 4; + P.push_back ~xt p 1 + in + Xt.commit { tx }; + + Xt.commit { tx = P.push_back p 3 }; + + assert (not (Xt.commit { tx = P.is_empty p })); + + (* Transfer item from [p] queue to [q] queue atomically *) + let tx ~xt = P.pop_front_opt ~xt p |> Option.iter @@ Q.push_back ~xt q in + Xt.commit { tx }; + + assert (Xt.commit { tx = Q.pop_front_opt q } = Some 4); + assert (Xt.commit { tx = Q.is_empty q }); + + (* Transfer item from queue [p] to stack [s] atomically *) + Xt.commit + { tx = (fun ~xt -> P.pop_front_opt ~xt p |> Option.iter @@ S.push ~xt s) }; + + assert (Xt.commit { tx = S.pop_opt s } = Some 1); + assert (Xt.commit { tx = P.pop_front_opt p } = Some 3); + assert (Xt.commit { tx = P.is_empty p }); + + Xt.commit { tx = Q.push_front q 101 }; + assert (not (Xt.commit { tx = Q.is_empty q })) diff --git a/test/xt_three_stack_queue.ml b/test/xt_three_stack_queue.ml new file mode 100644 index 00000000..eeb2e20e --- /dev/null +++ b/test/xt_three_stack_queue.ml @@ -0,0 +1,44 @@ +module Loc = Kcas.Loc +module Xt = Kcas.Xt + +type 'a t = { + back : 'a list Loc.t; + middle : 'a list Loc.t; + front : 'a list Loc.t; +} + +let create () = + let back = Loc.make [] and middle = Loc.make [] and front = Loc.make [] in + { back; middle; front } + +let is_empty ~xt q = + Xt.get ~xt q.back == [] + && Xt.get ~xt q.middle == [] + && Xt.get ~xt q.front == [] + +let push_front ~xt q x = Xt.modify ~xt q.front @@ List.cons x +let push_back ~xt q x = Xt.modify ~xt q.back @@ List.cons x + +let back_to_middle q = + let tx ~xt = + match Xt.exchange ~xt q.back [] with + | [] -> raise Not_found + | xs -> if Xt.exchange ~xt q.middle xs != [] then raise Not_found + in + try Xt.commit { tx } with Not_found -> () + +let pop_front_opt ~xt q = + match Xt.update ~xt q.front @@ function [] -> [] | _ :: xs -> xs with + | x :: _ -> Some x + | [] -> ( + back_to_middle q; + match Xt.exchange ~xt q.middle [] |> List.rev with + | x :: xs -> + Xt.set ~xt q.front xs; + Some x + | [] -> ( + match Xt.exchange ~xt q.back [] |> List.rev with + | x :: xs -> + Xt.set ~xt q.front xs; + Some x + | [] -> None)) diff --git a/test/xt_three_stack_queue.mli b/test/xt_three_stack_queue.mli new file mode 100644 index 00000000..6d706d87 --- /dev/null +++ b/test/xt_three_stack_queue.mli @@ -0,0 +1,7 @@ +type 'a t + +val create : unit -> 'a t +val is_empty : xt:'x Kcas.Xt.t -> 'a t -> bool +val push_front : xt:'x Kcas.Xt.t -> 'a t -> 'a -> unit +val push_back : xt:'x Kcas.Xt.t -> 'a t -> 'a -> unit +val pop_front_opt : xt:'x Kcas.Xt.t -> 'a t -> 'a option