Skip to content

Commit

Permalink
Nested transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed May 14, 2023
1 parent 8cd364e commit c5a74e4
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 11 deletions.
58 changes: 56 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ is distributed under the [ISC license](LICENSE.md).
- [Designing lock-free algorithms with k-CAS](#designing-lock-free-algorithms-with-k-cas)
- [Minimize accesses](#minimize-accesses)
- [Prefer compound accesses](#prefer-compound-accesses)
- [Log updates optimistically and abort](#log-updates-optimistically-and-abort)
- [Log updates optimistically](#log-updates-optimistically)
- [Postcompute](#postcompute)
- [Race to cooperate](#race-to-cooperate)
- [Understanding transactions](#understanding-transactions)
Expand Down Expand Up @@ -975,7 +975,7 @@ The above will likely perform slightly better.
> [`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-update),
> that only performs a single access to the transaction log.
#### Log updates optimistically and abort
#### Log updates optimistically

Transactional write accesses to shared memory locations are only attempted after
the transaction log construction finishes successfully. Therefore it is entirely
Expand Down Expand Up @@ -1031,6 +1031,60 @@ can see

the updates are only done in case of success.

A problem with the `transfer` function above is that it is not a composable
transaction. The transaction mechanism provided by **kcas** does not implicitly
perform rollbacks of changes made to locations, but it does offer low level
support for nested conditional transactions.

By explicitly calling
[`snapshot`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-snapshot)
and
[`rollback`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-rollback)
one can scope tentative changes and create a composable version of `transfer`:

```ocaml
# let transfer ~xt amount ~source ~target =
let snap = Xt.snapshot ~xt in
if Xt.fetch_and_add ~xt source (-amount) < amount then
Retry.later (Xt.rollback ~xt snap);
Xt.fetch_and_add ~xt target amount |> ignore
val transfer :
xt:'a Xt.t -> int -> source:int Loc.t -> target:int Loc.t -> unit = <fun>
```

Given a bunch of locations

```ocaml
let a = Loc.make 10
and b = Loc.make 20
and c = Loc.make 30
and d = Loc.make 27
```

we can now attempt `transfer`s and perform the
[`first`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-first)
of them that succeeds:

```ocaml
# Xt.commit {
tx = Xt.first [
transfer 15 ~source:a ~target:d;
transfer 15 ~source:b ~target:d;
transfer 15 ~source:c ~target:d;
]
}
- : unit = ()
```

A look at the locations

```ocaml
# List.map Loc.get [a; b; c; d]
- : int list = [10; 5; 30; 42]
```

confirms the expected result.

### Postcompute

The more time a transaction takes, the more likely it is to suffer from
Expand Down
47 changes: 46 additions & 1 deletion src/kcas/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ module Retry = struct

let later () = raise Later [@@inline never]
let unless condition = if not condition then later () [@@inline]

exception Invalid

let invalid () = raise Invalid [@@inline never]
end

let add_awaiter loc before awaiter =
Expand Down Expand Up @@ -506,7 +510,7 @@ module Xt = struct

let validate_one casn loc state =
let before = if is_cmp casn state then eval state else state.before in
if before != eval (fenceless_get (as_atomic loc)) then Retry.later ()
if before != eval (fenceless_get (as_atomic loc)) then Retry.invalid ()
[@@inline]

let rec validate_all casn = function
Expand Down Expand Up @@ -636,6 +640,45 @@ module Xt = struct
Obj.magic a == loc
| _, Miss, _ -> impossible ())

let rec rollback casn cass_snap cass =
if cass_snap == cass then cass
else
match cass with
| NIL -> NIL
| CASN { loc; state; lt; gt; _ } -> (
match splay ~hit_parent:false loc.id cass_snap with
| lt_mark, Miss, gt_mark ->
let lt = rollback casn lt_mark lt
and gt = rollback casn gt_mark gt in
let state =
if is_cmp casn state then state
else
let current = fenceless_get (as_atomic loc) in
if state.before != eval current then Retry.invalid ()
else current
in
CASN { loc; state; lt; gt; awaiters = [] }
| lt_mark, Hit (loc, state), gt_mark ->
let lt = rollback casn lt_mark lt
and gt = rollback casn gt_mark gt in
CASN { loc; state; lt; gt; awaiters = [] })

type 'x snap = cass

let snapshot ~xt = xt.cass
let rollback ~xt snap = xt.cass <- rollback xt.casn snap xt.cass

let rec first ~xt tx = function
| [] -> tx ~xt
| tx' :: txs -> (
match tx ~xt with
| value -> value
| exception Retry.Later -> first ~xt tx' txs)

let first ~xt = function
| [] -> Retry.later ()
| tx :: txs -> first ~xt tx txs

type 'a tx = { tx : 'x. xt:'x t -> 'a } [@@unboxed]

let call { tx } = tx [@@inline]
Expand Down Expand Up @@ -697,6 +740,8 @@ module Xt = struct
| exception Mode.Interference ->
commit (Backoff.once backoff) Mode.lock_free
(reset Mode.lock_free xt) tx))
| exception Retry.Invalid ->
commit (Backoff.once backoff) mode (reset_quick xt) tx
| exception Retry.Later -> (
if xt.cass == NIL then invalid_retry ();
let t = Domain_local_await.prepare_for_await () in
Expand Down
56 changes: 51 additions & 5 deletions src/kcas/kcas.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ module Retry : sig

val unless : bool -> unit
(** [unless condition] is equivalent to [if not condition then later ()]. *)

exception Invalid
(** Exception that may be raised to signal that the transaction log is no
longer valid, e.g. because shared memory locations have been changed
outside of the transaction, and the transaction should be retried. *)

val invalid : unit -> 'a
(** [invalid ()] is equivalent to [raise Invalid]. *)
end

(** Operating modes of the [k-CAS-n-CMP] algorithm. *)
Expand Down Expand Up @@ -260,6 +268,44 @@ module Xt : sig
(** [to_nonblocking ~xt tx] converts the blocking transaction [tx] to a
non-blocking transaction by returning [None] on retry. *)

(** {1 Nested transactions}
The transaction mechanism does not implicitly rollback changes recorded in
the transaction log. Using {!snapshot} and {!rollback} it is possible to
implement nested conditional transactions that may tentatively record
changes in the transaction log and then later discard those changes. *)

type 'x snap
(** Type of a {!snapshot} of a transaction log. *)

val snapshot : xt:'x t -> 'x snap
(** [snapshot ~xt] returns a snapshot of the transaction log.
Taking a snapshot is a fast constant time [O(1)] operation. *)

val rollback : xt:'x t -> 'x snap -> unit
(** [rollback ~xt snap] discards any changes of shared memory locations
recorded in the transaction log after the [snap] was taken by {!snapshot}.
Performing a rollback is potentially as expensive as linear time [O(n)] in
the number of locations accessed, but, depending on the exact access
patterns, may also be performed more quickly. The implementation is
optimized with the assumption that a rollback is performed at most once
per snapshot.
{b NOTE}: Only changes are discarded. Any location newly accessed after
the snapshot was taken will remain recorded in the log as a read-only
entry. *)

val first : xt:'x t -> (xt:'x t -> 'a) list -> 'a
(** [first ~xt txs] calls each transaction in the given list in turn and
either returns the value returned by the first transaction in the list or
raises {!Retry.Later} in case all of the transactions raised
{!Retry.Later}.
{b NOTE}: [first] does not automatically rollback changes made by the
transactions. *)

(** {1 Post commit actions} *)

val post_commit : xt:'x t -> (unit -> unit) -> unit
Expand All @@ -270,8 +316,8 @@ module Xt : sig

val validate : xt:'x t -> 'a Loc.t -> unit
(** [validate ~xt r] determines whether the shared memory location [r] has
been modified outside of the transaction and raises {!Retry.Later} in case
it has.
been modified outside of the transaction and raises {!Retry.Invalid} in
case it has.
Due to the possibility of read skew, in cases where some important
invariant should hold between two or more different shared memory
Expand All @@ -297,9 +343,9 @@ module Xt : sig
val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a
(** [commit tx] repeatedly calls [tx] to record a log of shared memory
accesses and attempts to perform them atomically until it succeeds and
then returns whatever [tx] returned. [tx] may raise {!Retry.Later} to
explicitly request a retry or any other exception to abort the
transaction.
then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or
{!Retry.Invalid} to explicitly request a retry or any other exception to
abort the transaction.
The default for [commit] is {!Mode.obstruction_free}. However, after
enough attempts have failed during the verification step, [commit]
Expand Down
6 changes: 3 additions & 3 deletions src/kcas_data/hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ module Xt = struct
let initial_state = Array.length old_buckets in
while true do
(* If state is modified outside our expensive tx would fail. *)
Retry.unless (Loc.fenceless_get state = initial_state);
if Loc.fenceless_get state != initial_state then Retry.invalid ();
rehash_a_few_buckets ~xt
done
else
Expand All @@ -217,7 +217,7 @@ module Xt = struct
assert (not must_be_done_in_this_tx);
let buckets = Xt.get ~xt t.buckets in
(* Check state to ensure that buckets have not been updated. *)
Retry.unless (0 <= Loc.fenceless_get state);
if Loc.fenceless_get state < 0 then Retry.invalid ();
let snapshot =
get_or_alloc snapshot @@ fun () ->
Array.make (Array.length buckets) []
Expand All @@ -239,7 +239,7 @@ module Xt = struct
assert (not must_be_done_in_this_tx);
let old_buckets = Xt.get ~xt t.buckets in
(* Check state to ensure that buckets have not been updated. *)
Retry.unless (0 <= Loc.fenceless_get state);
if Loc.fenceless_get state < 0 then Retry.invalid ();
let new_capacity = Array.length old_buckets in
let new_buckets =
get_or_alloc new_buckets @@ fun () -> Loc.make_array new_capacity []
Expand Down
37 changes: 37 additions & 0 deletions test/kcas/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,42 @@ let test_explicit_validation () =

(* *)

let test_rollback () =
let n_iter = 1_000 in

let n_locs = 20 in

let locs = Loc.make_array n_locs 0 in

let accum = ref 0 in

for _ = 1 to n_iter do
let n_permanent = Random.int n_locs in
let n_rollbacks = Random.int n_locs in

let tx ~xt =
in_place_shuffle locs;
for i = 0 to n_permanent - 1 do
Xt.incr ~xt locs.(i)
done;

let snap = Xt.snapshot ~xt in
in_place_shuffle locs;
for i = 0 to n_rollbacks - 1 do
Xt.incr ~xt locs.(i)
done;
Xt.rollback ~xt snap
in
Xt.commit { tx };

accum := n_permanent + !accum
done;

let sum = Array.map Loc.get locs |> Array.fold_left ( + ) 0 in
assert (!accum = sum)

(* *)

let test_mode () =
assert (Loc.get_mode (Loc.make ~mode:Mode.lock_free 0) == Mode.lock_free);
assert (
Expand Down Expand Up @@ -533,6 +569,7 @@ let () =
test_no_unnecessary_wakeups ();
test_periodic_validation ();
test_explicit_validation ();
test_rollback ();
test_mode ();
test_xt ();
Printf.printf "Test suite OK!\n%!"
Expand Down

0 comments on commit c5a74e4

Please sign in to comment.