Skip to content

Commit

Permalink
Merge pull request #229 from nhs-riak/gsets
Browse files Browse the repository at this point in the history
add gsets support
  • Loading branch information
russelldb authored Mar 22, 2018
2 parents 5a6bcbc + f172a07 commit 3f65a02
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 5 deletions.
16 changes: 14 additions & 2 deletions src/riak_dt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ message DtValue {
optional sint64 counter_value = 1;
repeated bytes set_value = 2;
repeated MapEntry map_value = 3;

/* We return an estimated cardinality of the Hyperloglog set
* on fetch.
*/
optional uint64 hll_value = 4;
repeated bytes gset_value = 5;
}


Expand All @@ -130,6 +130,7 @@ message DtFetchResp {
SET = 2;
MAP = 3;
HLL = 4;
GSET = 5;
}

optional bytes context = 1;
Expand Down Expand Up @@ -160,6 +161,15 @@ message SetOp {
repeated bytes removes = 2;
}

/*
* An operation to update a GSet, on its own.
* GSet members are opaque binary values, you can only add
* them to a Set.
*/
message GSetOp {
repeated bytes adds = 1;
}

/*
* An operation to update a Hyperloglog Set, a top-level DT.
* You can only add to a HllSet.
Expand Down Expand Up @@ -195,6 +205,7 @@ message MapUpdate {
optional bytes register_op = 4;
optional FlagOp flag_op = 5;
optional MapOp map_op = 6;

}

/*
Expand All @@ -219,11 +230,11 @@ message DtOp {
optional CounterOp counter_op = 1;
optional SetOp set_op = 2;
optional MapOp map_op = 3;

/* Adding values to a hyperloglog (set) is just like adding values
* to a set.
*/
optional HllOp hll_op = 4;
optional GSetOp gset_op = 5;
}

/*
Expand Down Expand Up @@ -272,4 +283,5 @@ message DtUpdateResp {
repeated bytes set_value = 4;
repeated MapEntry map_value = 5;
optional uint64 hll_value = 6;
repeated bytes gset_value = 7;
}
43 changes: 40 additions & 3 deletions src/riak_pb_dt_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

-include("riak_dt_pb.hrl").

-include_lib("eunit/include/eunit.hrl").

-export([
encode_fetch_request/2,
encode_fetch_request/3,
Expand All @@ -49,33 +51,36 @@
-type counter_value() :: integer().
-type set_value() :: [ binary() ].
-type hll_value() :: number().
-type gset_value() :: [ binary() ].
-type register_value() :: binary().
-type flag_value() :: boolean().
-type map_entry() :: {map_field(), embedded_value()}.
-type map_field() :: {binary(), embedded_type()}.
-type map_value() :: [ map_entry() ].
-type embedded_value() :: counter_value() | set_value() | register_value()
| flag_value() | map_value().
-type toplevel_value() :: counter_value() | set_value() | map_value()
-type toplevel_value() :: counter_value() | gset_value() | set_value() | map_value()
| hll_value() | undefined.
-type fetch_response() :: {toplevel_type(), toplevel_value(), context()}.

%% Type names as atoms
-type embedded_type() :: counter | set | register | flag | map.
-type toplevel_type() :: counter | set | map | hll.
-type toplevel_type() :: counter | gset | set | map | hll.
-type all_type() :: toplevel_type() | embedded_type().

%% Operations
-type counter_op() :: increment | decrement | {increment | decrement, integer()}.
-type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}.
-type set_op() :: simple_set_op() | {update, [simple_set_op()]}.
-type hll_op() :: {add, binary()} | {add_all, [binary()]}.
-type simple_gset_op() :: {add, binary()} | {add_all, [binary()]}.
-type gset_op() :: simple_gset_op().
-type flag_op() :: enable | disable.
-type register_op() :: {assign, binary()}.
-type simple_map_op() :: {remove, map_field()} | {update, map_field(), embedded_type_op()}.
-type map_op() :: simple_map_op() | {update, [simple_map_op()]}.
-type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op().
-type toplevel_op() :: counter_op() | set_op() | map_op() | hll_op().
-type toplevel_op() :: counter_op() | gset_op() | set_op() | map_op() | hll_op().
-type update() :: {toplevel_type(), toplevel_op(), context()}.

%% Request options
Expand Down Expand Up @@ -175,6 +180,7 @@ decode_type(PBType, Mods) ->
decode_type('COUNTER') -> counter;
decode_type('SET') -> set;
decode_type('HLL') -> hll;
decode_type('GSET') -> gset;
decode_type('REGISTER') -> register;
decode_type('FLAG') -> flag;
decode_type('MAP') -> map.
Expand All @@ -195,6 +201,7 @@ encode_type(TypeOrMod, Mods) ->
encode_type(counter) -> 'COUNTER';
encode_type(set) -> 'SET';
encode_type(hll) -> 'HLL';
encode_type(gset) -> 'GSET';
encode_type(register) -> 'REGISTER';
encode_type(flag) -> 'FLAG';
encode_type(map) -> 'MAP'.
Expand Down Expand Up @@ -263,6 +270,9 @@ decode_fetch_response(#dtfetchresp{context=Context, type='SET',
decode_fetch_response(#dtfetchresp{context=Context, type='HLL',
value=#dtvalue{hll_value=Val}}) ->
{hll, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='GSET',
value=#dtvalue{gset_value=Val}}) ->
{gset, Val, Context};
decode_fetch_response(#dtfetchresp{context=Context, type='MAP',
value=#dtvalue{map_value=Val}}) ->
{map, [ decode_map_entry(Entry) || Entry <- Val ], Context}.
Expand All @@ -286,6 +296,8 @@ encode_fetch_response(Type, Value, Context, Mods) ->
Response#dtfetchresp{value=#dtvalue{set_value=Value}};
hll ->
Response#dtfetchresp{value=#dtvalue{hll_value=Value}};
gset ->
Response#dtfetchresp{value=#dtvalue{gset_value=Value}};
map ->
Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}}
end.
Expand Down Expand Up @@ -341,6 +353,25 @@ encode_set_update({remove_all, Members}, #setop{removes=R}=S) when is_list(Membe
S#setop{removes=Members++R}.


%% @doc Decodes a GSetOp message into a gset operation.
-spec decode_gset_op(#setop{}) -> gset_op().
decode_gset_op(#gsetop{adds=A}) ->
{add_all, A}.

%% @doc Encodes a set operation into a SetOp message.
-spec encode_gset_op(gset_op()) -> #gsetop{}.
encode_gset_op({update, Ops}) when is_list(Ops) ->
lists:foldr(fun encode_gset_update/2, #gsetop{}, Ops);
encode_gset_op({C, _}=Op) when add == C; add_all == C ->
encode_gset_op({update, [Op]}).

%% @doc Folds a set update into the SetOp message.
-spec encode_gset_update(simple_gset_op(), #gsetop{}) -> #gsetop{}.
encode_gset_update({add, Member}, #gsetop{adds=A}=S) when is_binary(Member) ->
S#gsetop{adds=[Member|A]};
encode_gset_update({add_all, Members}, #gsetop{adds=A}=S) when is_list(Members) ->
S#gsetop{adds=Members++A}.

%% @doc Decodes a operation name from a PB message into an atom.
-spec decode_flag_op(atom()) -> atom().

Expand Down Expand Up @@ -440,6 +471,8 @@ decode_operation(#dtop{set_op=#setop{}=Op}, _) ->
decode_set_op(Op);
decode_operation(#dtop{hll_op=#hllop{}=Op}, _) ->
decode_hll_op(Op);
decode_operation(#dtop{gset_op=#gsetop{}=Op}, _) ->
decode_gset_op(Op);
decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) ->
decode_map_op(Op, Mods).

Expand All @@ -451,6 +484,8 @@ encode_operation(Op, set) ->
#dtop{set_op=encode_set_op(Op)};
encode_operation(Op, hll) ->
#dtop{hll_op=encode_hll_op(Op)};
encode_operation(Op, gset) ->
#dtop{gset_op=encode_gset_op(Op)};
encode_operation(Op, map) ->
#dtop{map_op=encode_map_op(Op)}.

Expand All @@ -463,6 +498,8 @@ operation_type(#dtop{set_op=#setop{}}) ->
set;
operation_type(#dtop{hll_op=#hllop{}}) ->
hll;
operation_type(#dtop{gset_op=#gsetop{}}) ->
gset;
operation_type(#dtop{map_op=#mapop{}}) ->
map.

Expand Down
78 changes: 78 additions & 0 deletions test/riak_pb_dt_codec_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
%% -------------------------------------------------------------------
%%
%% test cases for riak_pb_dt_codec: Protocol Buffers utility functions for Riak DT types
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_pb_dt_codec_tests).

-include_lib("eunit/include/eunit.hrl").

-include("riak_dt_pb.hrl").

-import(riak_pb_dt_codec, [decode_operation/1,
operation_type/1,
decode_fetch_response/1,
encode_fetch_response/4,
encode_update_request/4,
decode_update_response/3
]).

-define(CONTEXT, undefined_context).
-define(SET_VALUE, [<<"binarytemple">>]).

operation_type_gset_test() ->
OpType = operation_type(#dtop{gset_op = #gsetop{}}),
?assertEqual(OpType, gset).

decode_operation_gset_test() ->
Op = #dtop{gset_op = #gsetop{adds = ?SET_VALUE}},
OpDecode = decode_operation(Op),
?assertEqual(OpDecode, {add_all, ?SET_VALUE}).

decode_fetch_response_gset_test() ->
Res = decode_fetch_response(#dtfetchresp{context = ?CONTEXT, type = 'GSET', value = #dtvalue{gset_value = ?SET_VALUE}}),
?assertEqual({gset, ?SET_VALUE, ?CONTEXT}, Res).

decode_update_response_test() ->
Res = decode_update_response(
#dtupdateresp{set_value = ?SET_VALUE, context = ?CONTEXT}, set, true
),
?assertEqual({set, ?SET_VALUE, undefined_context}, Res).

encode_fetch_response_gset_test() ->
Resp = encode_fetch_response(gset, ?SET_VALUE, ?CONTEXT, []),
?assertMatch(#dtfetchresp{context= ?CONTEXT, type= 'GSET', value= #dtvalue{gset_value= ?SET_VALUE}}, Resp).

encode_update_request_gset_test() ->
Res = encode_update_request(
{<<"btype">>, <<"bucket">>},
<<"key">>,
{gset, {update, [{add_all, ?SET_VALUE}]}, ?CONTEXT},
[]
),
?assertMatch(#dtupdatereq{
bucket = <<"bucket">>,
type = <<"btype">>,
key = <<"key">>,
op = #dtop{
gset_op = #gsetop{adds = ?SET_VALUE}
}
}, Res),
ok
.

0 comments on commit 3f65a02

Please sign in to comment.