From f172a07383d0b0b1935ced0d92297aae1fdb7c38 Mon Sep 17 00:00:00 2001 From: Bryan Hunt Date: Tue, 12 Jul 2016 16:45:29 +0100 Subject: [PATCH] add gsets support --- src/riak_dt.proto | 16 ++++++- src/riak_pb_dt_codec.erl | 43 ++++++++++++++++-- test/riak_pb_dt_codec_tests.erl | 78 +++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 test/riak_pb_dt_codec_tests.erl diff --git a/src/riak_dt.proto b/src/riak_dt.proto index 06f19fba..f5e48164 100644 --- a/src/riak_dt.proto +++ b/src/riak_dt.proto @@ -107,11 +107,11 @@ message DtValue { optional sint64 counter_value = 1; repeated bytes set_value = 2; repeated MapEntry map_value = 3; - /* We return an estimated cardinality of the Hyperloglog set * on fetch. */ optional uint64 hll_value = 4; + repeated bytes gset_value = 5; } @@ -130,6 +130,7 @@ message DtFetchResp { SET = 2; MAP = 3; HLL = 4; + GSET = 5; } optional bytes context = 1; @@ -160,6 +161,15 @@ message SetOp { repeated bytes removes = 2; } +/* + * An operation to update a GSet, on its own. + * GSet members are opaque binary values; you can only add + * them to a GSet. + */ +message GSetOp { + repeated bytes adds = 1; +} + /* * An operation to update a Hyperloglog Set, a top-level DT. * You can only add to a HllSet. @@ -195,6 +205,7 @@ message MapUpdate { optional bytes register_op = 4; optional FlagOp flag_op = 5; optional MapOp map_op = 6; + } /* @@ -219,11 +230,11 @@ message DtOp { optional CounterOp counter_op = 1; optional SetOp set_op = 2; optional MapOp map_op = 3; - /* Adding values to a hyperloglog (set) is just like adding values * to a set.
*/ optional HllOp hll_op = 4; + optional GSetOp gset_op = 5; } /* @@ -271,4 +282,5 @@ message DtUpdateResp { repeated bytes set_value = 4; repeated MapEntry map_value = 5; optional uint64 hll_value = 6; + repeated bytes gset_value = 7; } diff --git a/src/riak_pb_dt_codec.erl b/src/riak_pb_dt_codec.erl index 444a30f2..06dfbbdb 100644 --- a/src/riak_pb_dt_codec.erl +++ b/src/riak_pb_dt_codec.erl @@ -23,6 +23,8 @@ -include("riak_dt_pb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + -export([ encode_fetch_request/2, encode_fetch_request/3, @@ -49,6 +51,7 @@ -type counter_value() :: integer(). -type set_value() :: [ binary() ]. -type hll_value() :: number(). +-type gset_value() :: [ binary() ]. -type register_value() :: binary(). -type flag_value() :: boolean(). -type map_entry() :: {map_field(), embedded_value()}. @@ -56,13 +59,13 @@ -type map_value() :: [ map_entry() ]. -type embedded_value() :: counter_value() | set_value() | register_value() | flag_value() | map_value(). --type toplevel_value() :: counter_value() | set_value() | map_value() +-type toplevel_value() :: counter_value() | gset_value() | set_value() | map_value() | hll_value() | undefined. -type fetch_response() :: {toplevel_type(), toplevel_value(), context()}. %% Type names as atoms -type embedded_type() :: counter | set | register | flag | map. --type toplevel_type() :: counter | set | map | hll. +-type toplevel_type() :: counter | gset | set | map | hll. -type all_type() :: toplevel_type() | embedded_type(). %% Operations @@ -70,12 +73,14 @@ -type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}. -type set_op() :: simple_set_op() | {update, [simple_set_op()]}. -type hll_op() :: {add, binary()} | {add_all, [binary()]}. +-type simple_gset_op() :: {add, binary()} | {add_all, [binary()]}. +-type gset_op() :: simple_gset_op() | {update, [simple_gset_op()]}. -type flag_op() :: enable | disable. -type register_op() :: {assign, binary()}.
-type simple_map_op() :: {remove, map_field()} | {update, map_field(), embedded_type_op()}. -type map_op() :: simple_map_op() | {update, [simple_map_op()]}. -type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op(). --type toplevel_op() :: counter_op() | set_op() | map_op() | hll_op(). +-type toplevel_op() :: counter_op() | gset_op() | set_op() | map_op() | hll_op(). -type update() :: {toplevel_type(), toplevel_op(), context()}. %% Request options @@ -175,6 +180,7 @@ decode_type(PBType, Mods) -> decode_type('COUNTER') -> counter; decode_type('SET') -> set; decode_type('HLL') -> hll; +decode_type('GSET') -> gset; decode_type('REGISTER') -> register; decode_type('FLAG') -> flag; decode_type('MAP') -> map. @@ -195,6 +201,7 @@ encode_type(TypeOrMod, Mods) -> encode_type(counter) -> 'COUNTER'; encode_type(set) -> 'SET'; encode_type(hll) -> 'HLL'; +encode_type(gset) -> 'GSET'; encode_type(register) -> 'REGISTER'; encode_type(flag) -> 'FLAG'; encode_type(map) -> 'MAP'. @@ -263,6 +270,9 @@ decode_fetch_response(#dtfetchresp{context=Context, type='SET', decode_fetch_response(#dtfetchresp{context=Context, type='HLL', value=#dtvalue{hll_value=Val}}) -> {hll, Val, Context}; +decode_fetch_response(#dtfetchresp{context=Context, type='GSET', + value=#dtvalue{gset_value=Val}}) -> + {gset, Val, Context}; decode_fetch_response(#dtfetchresp{context=Context, type='MAP', value=#dtvalue{map_value=Val}}) -> {map, [ decode_map_entry(Entry) || Entry <- Val ], Context}. @@ -286,6 +296,8 @@ encode_fetch_response(Type, Value, Context, Mods) -> Response#dtfetchresp{value=#dtvalue{set_value=Value}}; hll -> Response#dtfetchresp{value=#dtvalue{hll_value=Value}}; + gset -> + Response#dtfetchresp{value=#dtvalue{gset_value=Value}}; map -> Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}} end. 
@@ -341,6 +353,25 @@ encode_set_update({remove_all, Members}, #setop{removes=R}=S) when is_list(Membe S#setop{removes=Members++R}. +%% @doc Decodes a GSetOp message into a gset operation. +-spec decode_gset_op(#gsetop{}) -> gset_op(). +decode_gset_op(#gsetop{adds=A}) -> + {add_all, A}. + +%% @doc Encodes a gset operation into a GSetOp message. +-spec encode_gset_op(gset_op()) -> #gsetop{}. +encode_gset_op({update, Ops}) when is_list(Ops) -> + lists:foldr(fun encode_gset_update/2, #gsetop{}, Ops); +encode_gset_op({C, _}=Op) when add == C; add_all == C -> + encode_gset_op({update, [Op]}). + +%% @doc Folds a gset update into the GSetOp message. +-spec encode_gset_update(simple_gset_op(), #gsetop{}) -> #gsetop{}. +encode_gset_update({add, Member}, #gsetop{adds=A}=S) when is_binary(Member) -> + S#gsetop{adds=[Member|A]}; +encode_gset_update({add_all, Members}, #gsetop{adds=A}=S) when is_list(Members) -> + S#gsetop{adds=Members++A}. + %% @doc Decodes a operation name from a PB message into an atom. -spec decode_flag_op(atom()) -> atom(). @@ -440,6 +471,8 @@ decode_operation(#dtop{set_op=#setop{}=Op}, _) -> decode_set_op(Op); decode_operation(#dtop{hll_op=#hllop{}=Op}, _) -> decode_hll_op(Op); +decode_operation(#dtop{gset_op=#gsetop{}=Op}, _) -> + decode_gset_op(Op); decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) -> decode_map_op(Op, Mods). @@ -451,6 +484,8 @@ encode_operation(Op, set) -> #dtop{set_op=encode_set_op(Op)}; encode_operation(Op, hll) -> #dtop{hll_op=encode_hll_op(Op)}; +encode_operation(Op, gset) -> + #dtop{gset_op=encode_gset_op(Op)}; encode_operation(Op, map) -> #dtop{map_op=encode_map_op(Op)}. @@ -463,6 +498,8 @@ operation_type(#dtop{set_op=#setop{}}) -> set; operation_type(#dtop{hll_op=#hllop{}}) -> hll; +operation_type(#dtop{gset_op=#gsetop{}}) -> + gset; operation_type(#dtop{map_op=#mapop{}}) -> map.
diff --git a/test/riak_pb_dt_codec_tests.erl b/test/riak_pb_dt_codec_tests.erl new file mode 100644 index 00000000..26ff2552 --- /dev/null +++ b/test/riak_pb_dt_codec_tests.erl @@ -0,0 +1,78 @@ +%% ------------------------------------------------------------------- +%% +%% test cases for riak_pb_dt_codec: Protocol Buffers utility functions for Riak DT types +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_pb_dt_codec_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-include("riak_dt_pb.hrl"). + +-import(riak_pb_dt_codec, [decode_operation/1, +operation_type/1, +decode_fetch_response/1, +encode_fetch_response/4, +encode_update_request/4, +decode_update_response/3 +]). + +-define(CONTEXT, undefined_context). +-define(SET_VALUE, [<<"binarytemple">>]). + +operation_type_gset_test() -> + OpType = operation_type(#dtop{gset_op = #gsetop{}}), + ?assertEqual(gset, OpType). + +decode_operation_gset_test() -> + Op = #dtop{gset_op = #gsetop{adds = ?SET_VALUE}}, + OpDecode = decode_operation(Op), + ?assertEqual({add_all, ?SET_VALUE}, OpDecode). + +decode_fetch_response_gset_test() -> + Res = decode_fetch_response(#dtfetchresp{context = ?CONTEXT, type = 'GSET', value = #dtvalue{gset_value = ?SET_VALUE}}), + ?assertEqual({gset, ?SET_VALUE, ?CONTEXT}, Res).
+ +decode_update_response_test() -> + Res = decode_update_response( + #dtupdateresp{set_value = ?SET_VALUE, context = ?CONTEXT}, set, true + ), + ?assertEqual({set, ?SET_VALUE, ?CONTEXT}, Res). + +encode_fetch_response_gset_test() -> + Resp = encode_fetch_response(gset, ?SET_VALUE, ?CONTEXT, []), + ?assertMatch(#dtfetchresp{context= ?CONTEXT, type= 'GSET', value= #dtvalue{gset_value= ?SET_VALUE}}, Resp). + +encode_update_request_gset_test() -> + Res = encode_update_request( + {<<"btype">>, <<"bucket">>}, + <<"key">>, + {gset, {update, [{add_all, ?SET_VALUE}]}, ?CONTEXT}, + [] + ), + ?assertMatch(#dtupdatereq{ + bucket = <<"bucket">>, + type = <<"btype">>, + key = <<"key">>, + op = #dtop{ + gset_op = #gsetop{adds = ?SET_VALUE} + } + }, Res), + ok +.