From 6f1036ee5af22cd3b566ce5e471f834040c8b829 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 22 Jan 2016 10:01:00 +0100 Subject: [PATCH 01/23] first version of antidote_db with eleveldb as backend --- rebar.config | 3 ++ src/antidote_db.app.src | 13 +++++++ src/antidote_db.erl | 79 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 rebar.config create mode 100644 src/antidote_db.app.src create mode 100644 src/antidote_db.erl diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..2ee9798 --- /dev/null +++ b/rebar.config @@ -0,0 +1,3 @@ +{deps, [ + {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "antidote_comparator"}}} +]}. diff --git a/src/antidote_db.app.src b/src/antidote_db.app.src new file mode 100644 index 0000000..8d0eb98 --- /dev/null +++ b/src/antidote_db.app.src @@ -0,0 +1,13 @@ +{application, antidote_db, + [ + {description, "Antidote DataBase application"}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + eleveldb + ]}, + {mod, { antidote_db, []}}, + {env, []} + ]}. diff --git a/src/antidote_db.erl b/src/antidote_db.erl new file mode 100644 index 0000000..3584c91 --- /dev/null +++ b/src/antidote_db.erl @@ -0,0 +1,79 @@ +-module(antidote_db). + +-export([ + new/1, + get/2, + put/3, + close_and_destroy/2, + close/1, + fold/3, + fold_keys/3, + is_empty/1, + delete/2, + repair/1]). + +-type antidote_db() :: eleveldb:db_ref(). + +-export_type([antidote_db/0]). + +%% Given a name, returns a new AntidoteDB (ElevelDB) +%% OpenOptions are set to use Antidote special comparator +-spec new(atom()) -> {ok, antidote_db()} | {error, any()}. +new(Name) -> + eleveldb:open(Name, [{create_if_missing, true}, {antidote, true}]). + +%% Closes and destroys the given base +-spec close_and_destroy(antidote_db(), atom()) -> ok | {error, any()}. +close_and_destroy(AntidoteDB, Name) -> + eleveldb:close(AntidoteDB), + eleveldb:destroy(Name, []). + +-spec close(antidote_db()) -> ok | {error, any()}. +close(AntidoteDB) -> + eleveldb:close(AntidoteDB). + +%% @doc returns the value of Key, in the antidote_db +-spec get(antidote_db(), atom() | binary()) -> term() | not_found. +get(AntidoteDB, Key) -> + AKey = case is_binary(Key) of + true -> Key; + false -> atom_to_binary(Key, utf8) + end, + case eleveldb:get(AntidoteDB, AKey, []) of + {ok, Res} -> + binary_to_term(Res); + not_found -> + not_found + end. + +%% @doc puts the Value associated to Key in eleveldb AntidoteDB +-spec put(antidote_db(), atom() | binary(), any()) -> ok | {error, any()}. +put(AntidoteDB, Key, Value) -> + AKey = case is_binary(Key) of + true -> Key; + false -> atom_to_binary(Key, utf8) + end, + ATerm = case is_binary(Value) of + true -> Value; + false -> term_to_binary(Value) + end, + eleveldb:put(AntidoteDB, AKey, ATerm, []). + +-spec fold(antidote_db(), eleveldb:fold_fun(), any()) -> any(). +fold(AntidoteDB, Fun, Acc0) -> + eleveldb:fold(AntidoteDB, Fun, Acc0, []). + +-spec fold_keys(antidote_db(), eleveldb:fold_keys_fun(), any()) -> any(). +fold_keys(AntidoteDB, Fun, Acc0) -> + eleveldb:fold_keys(AntidoteDB, Fun, Acc0, []). + +-spec is_empty(antidote_db()) -> boolean(). +is_empty(AntidoteDB) -> + eleveldb:is_empty(AntidoteDB). + +repair(Name) -> + eleveldb:repair(Name, []). + +-spec delete(antidote_db(), binary()) -> ok | {error, any()}. +delete(AntidoteDB, Key) -> + eleveldb:delete(AntidoteDB, Key, []). From 2e3fd2612245bf1ccf70b1614119e989470abe16 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 22 Jan 2016 13:02:43 +0100 Subject: [PATCH 02/23] added options to fold methods --- src/antidote_db.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index 3584c91..7164c00 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -59,13 +59,13 @@ put(AntidoteDB, Key, Value) -> end, eleveldb:put(AntidoteDB, AKey, ATerm, []). --spec fold(antidote_db(), eleveldb:fold_fun(), any()) -> any(). -fold(AntidoteDB, Fun, Acc0) -> - eleveldb:fold(AntidoteDB, Fun, Acc0, []). +-spec fold(antidote_db(), eleveldb:fold_fun(), any(), eleveldb:read_options()) -> any(). +fold(AntidoteDB, Fun, Acc0, Opts) -> + eleveldb:fold(AntidoteDB, Fun, Acc0, Opts). --spec fold_keys(antidote_db(), eleveldb:fold_keys_fun(), any()) -> any(). -fold_keys(AntidoteDB, Fun, Acc0) -> - eleveldb:fold_keys(AntidoteDB, Fun, Acc0, []). +-spec fold_keys(antidote_db(), eleveldb:fold_keys_fun(), any(), eleveldb:read_options()) -> any(). +fold_keys(AntidoteDB, Fun, Acc0, Opts) -> + eleveldb:fold_keys(AntidoteDB, Fun, Acc0, Opts). -spec is_empty(antidote_db()) -> boolean(). is_empty(AntidoteDB) -> From deac50b8afc1ac47637a18fdab477f6c3a382d3e Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 22 Jan 2016 13:25:02 +0100 Subject: [PATCH 03/23] fix esport parameters for fold methods --- src/antidote_db.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index 7164c00..9cf5677 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -6,8 +6,8 @@ put/3, close_and_destroy/2, close/1, - fold/3, - fold_keys/3, + fold/4, + fold_keys/4, is_empty/1, delete/2, repair/1]). From e37bad9a1f34e39cce703b41b9e9e947b8a7d47e Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 22 Jan 2016 19:13:57 +0100 Subject: [PATCH 04/23] antidote db sup first version --- src/antidote_db_sup.erl | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 src/antidote_db_sup.erl diff --git a/src/antidote_db_sup.erl b/src/antidote_db_sup.erl new file mode 100644 index 0000000..0e86e87 --- /dev/null +++ b/src/antidote_db_sup.erl @@ -0,0 +1,26 @@ +-module(antidote_db_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. From 1f4c0e2f13be04ac1ec8ebaf1d1475ec15848834 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 22 Jan 2016 19:26:31 +0100 Subject: [PATCH 05/23] fix bug while passing keys to binary --- src/antidote_db.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index 9cf5677..df12f17 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -33,11 +33,11 @@ close(AntidoteDB) -> eleveldb:close(AntidoteDB). %% @doc returns the value of Key, in the antidote_db --spec get(antidote_db(), atom() | binary()) -> term() | not_found. +-spec get(antidote_db(), any()) -> term() | not_found. get(AntidoteDB, Key) -> AKey = case is_binary(Key) of true -> Key; - false -> atom_to_binary(Key, utf8) + false -> term_to_binary(Key) end, case eleveldb:get(AntidoteDB, AKey, []) of {ok, Res} -> @@ -47,11 +47,11 @@ get(AntidoteDB, Key) -> end. %% @doc puts the Value associated to Key in eleveldb AntidoteDB --spec put(antidote_db(), atom() | binary(), any()) -> ok | {error, any()}. +-spec put(antidote_db(), any(), any()) -> ok | {error, any()}. put(AntidoteDB, Key, Value) -> AKey = case is_binary(Key) of true -> Key; - false -> atom_to_binary(Key, utf8) + false -> term_to_binary(Key) end, ATerm = case is_binary(Value) of true -> Value; From 1336486576ad8eb7bda3717784ef143711ff131f Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 22 Aug 2016 16:36:35 -0300 Subject: [PATCH 06/23] Added antidote_db_wrapper, originally on the main Antidote repo. First attempt of using antidote_utils, not working 100% yet. --- rebar.config | 3 +- src/antidote_db_wrapper.erl | 325 ++++++++++++++++++++++++++++++++++++ 2 files changed, 327 insertions(+), 1 deletion(-) create mode 100644 src/antidote_db_wrapper.erl diff --git a/rebar.config b/rebar.config index 2ee9798..7833a89 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,4 @@ {deps, [ - {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "antidote_comparator"}}} + {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "antidote_comparator"}}}, + {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "master"}}} ]}. diff --git a/src/antidote_db_wrapper.erl b/src/antidote_db_wrapper.erl new file mode 100644 index 0000000..73338ea --- /dev/null +++ b/src/antidote_db_wrapper.erl @@ -0,0 +1,325 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(antidote_db_wrapper). + +-include_lib("antidote_utils/include/antidote_utils.hrl"). + +-export([get_snapshot/3, + put_snapshot/4, + get_ops/4, + put_op/4]). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% Gets the most suitable snapshot for Key that has been committed +%% before CommitTime. If its nothing is found, returns {error, not_found} +-spec get_snapshot(antidote_db:antidote_db(), key(), + snapshot_time()) -> {ok, snapshot(), snapshot_time()} | {error, not_found}. +get_snapshot(AntidoteDB, Key, CommitTime) -> + try + antidote_db:fold(AntidoteDB, + fun({K, V}, AccIn) -> + {Key1, VC, SNAP} = binary_to_term(K), + case (Key1 == Key) of %% check same key + true -> + %% check its a snapshot and its time is less than the one required + case (SNAP == snap) and + vectorclock:le(vectorclock:from_list(VC), CommitTime) of + true -> + Snapshot = binary_to_term(V), + throw({break, Snapshot, VC}); + _ -> + AccIn + end; + false -> + throw({break}) + end + end, + [], + [{first_key, term_to_binary({Key})}]), + {error, not_found} + catch + {break, SNAP, VC} -> + {ok, SNAP, VC}; + _ -> + {error, not_found} + end. + +%% Saves the snapshot into AntidoteDB +-spec put_snapshot(antidote_db:antidote_db(), key(), snapshot_time(), + snapshot()) -> ok | error. +put_snapshot(AntidoteDB, Key, SnapshotTime, Snapshot) -> + SnapshotTimeList = vectorclock_to_sorted_list(SnapshotTime), + antidote_db:put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). + +%% Returns a list of operations that have commit time in the range [VCFrom, VCTo] +-spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> list(). +get_ops(AntidoteDB, Key, VCFrom, VCTo) -> + VCFromDict = vectorclock_to_dict(VCFrom), + VCToDict = vectorclock_to_dict(VCTo), + try + antidote_db:fold(AntidoteDB, + fun({K, V}, AccIn) -> + {Key1, VC1, OP} = binary_to_term(K), + VC1Dict = vectorclock:from_list(VC1), + case Key == Key1 of %% check same key + true -> + %% if its greater, continue + case vectorclock:gt(VC1Dict, VCToDict) of + true -> + AccIn; + false -> + %% check its an op and its commit time is in the required range + case not vectorclock:lt(VC1Dict, VCFromDict) of + true -> + case (OP == op) of + true -> + AccIn ++ [binary_to_term(V)]; + false -> + AccIn + end; + false -> + throw({break, AccIn}) + end + end; + false -> + throw({break, AccIn}) + end + end, + [], + [{first_key, term_to_binary({Key})}]) + catch + {break, OPS} -> + OPS; + _ -> + [] + end. + +%% Saves the operation into AntidoteDB +-spec put_op(antidote_db:antidote_db(), key(), vectorclock(), operation()) -> ok | error. +put_op(AntidoteDB, Key, VC, Op) -> + VCList = vectorclock_to_sorted_list(VC), + antidote_db:put(AntidoteDB, {binary_to_atom(Key), VCList, op}, Op). + +vectorclock_to_dict(VC) -> + case is_list(VC) of + true -> vectorclock:from_list(VC); + false -> VC + end. + +%% Sort the resulting list, for easier comparison and parsing +vectorclock_to_sorted_list(VC) -> + case is_list(VC) of + true -> lists:sort(VC); + false -> lists:sort(vectorclock:to_list(VC)) + end. + +%% Workaround for basho bench +%% TODO find a better solution to this +binary_to_atom(Key) -> + case is_binary(Key) of + true -> list_to_atom(integer_to_list(binary_to_integer(Key))); + false -> Key + end. + +-ifdef(TEST). + +%% This test ensures vectorclock_to_list method +%% sorts VCs the correct way +vectorclock_to_sorted_list_test() -> + Sorted = vectorclock_to_sorted_list([{e, 5}, {c, 3}, {a, 1}, {b, 2}, {d, 4}]), + ?assertEqual([{a, 1}, {b, 2}, {c, 3}, {d, 4}, {e, 5}], Sorted). + +get_snapshot_not_found_test() -> + eleveldb:destroy("get_snapshot_not_found_test", []), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_found_test"), + + Key = key, + Key1 = key1, + Key2 = key2, + %% No snapshot in the DB + NotFound = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + ?assertEqual({error, not_found}, NotFound), + + %% Put 10 snapshots for Key and check there is no snapshot with time 0 in both DCs + put_n_snapshots(AntidoteDB, Key, 10), + NotFound1 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + ?assertEqual({error, not_found}, NotFound1), + + %% Look for a snapshot for Key1 + S1 = get_snapshot(AntidoteDB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + ?assertEqual({error, not_found}, S1), + + %% Put snapshots for Key2 and look for a snapshot for Key1 + put_n_snapshots(AntidoteDB, Key2, 10), + S2 = get_snapshot(AntidoteDB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + ?assertEqual({error, not_found}, S2), + + antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_found_test"). + +get_snapshot_matching_vc_test() -> + eleveldb:destroy("get_snapshot_matching_vc_test", []), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_matching_vc_test"), + + Key = key, + put_n_snapshots(AntidoteDB, Key, 10), + + %% get some of the snapshots inserted (matches VC) + S1 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), + S2 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), + S3 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), + ?assertEqual({ok, 1, [{local, 1}, {remote, 1}]}, S1), + ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S2), + ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S3), + + antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_matching_vc_test"). + + +get_snapshot_not_matching_vc_test() -> + eleveldb:destroy("get_snapshot_not_matching_vc_test", []), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_matching_vc_test"), + + Key = key, + put_n_snapshots(AntidoteDB, Key, 10), + + %% get snapshots with different times in their DCs + S4 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), + S5 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), + S6 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + ?assertEqual({error, not_found}, S4), + ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S5), + ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S6), + + antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_matching_vc_test"). + +get_operations_empty_result_test() -> + eleveldb:destroy("get_operations_not_found_test", []), + {ok, AntidoteDB} = antidote_db:new("get_operations_not_found_test"), + Key = key, + Key1 = key1, + %% Nothing in the DB yet return empty list + O1 = get_ops(AntidoteDB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([], O1), + + put_n_operations(AntidoteDB, Key, 10), + %% Getting something out of range returns an empty list + O2 = get_ops(AntidoteDB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), + ?assertEqual([], O2), + + %% Getting a key not present, returns an empty list + O3 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), + ?assertEqual([], O3), + + %% Searching for the same range returns an empty list + O4 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), + ?assertEqual([], O4), + + antidote_db:close_and_destroy(AntidoteDB, "get_operations_not_found_test"). + + +get_operations_non_empty_test() -> + eleveldb:destroy("get_operations_non_empty_test", []), + {ok, AntidoteDB} = antidote_db:new("get_operations_non_empty_test"), + + %% Fill the DB with values + Key = key, + Key1 = key1, + Key2 = key2, + put_n_operations(AntidoteDB, Key, 100), + put_n_operations(AntidoteDB, Key1, 10), + put_n_operations(AntidoteDB, Key2, 25), + + %% concurrent operations are present in the result + O1 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + O2 = get_ops(AntidoteDB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), + ?assertEqual([9, 8, 7, 6, 5, 4, 3, 2], O1), + ?assertEqual([7, 6, 5, 4], O2), + + antidote_db:close_and_destroy(AntidoteDB, "get_operations_non_empty_test"). + +operations_and_snapshots_mixed_test() -> + eleveldb:destroy("operations_and_snapshots_mixed_test", []), + {ok, AntidoteDB} = antidote_db:new("operations_and_snapshots_mixed_test"), + + Key = key, + Key1 = key1, + Key2 = key2, + VCTo = [{local, 7}, {remote, 8}], + put_n_operations(AntidoteDB, Key, 10), + put_n_operations(AntidoteDB, Key1, 20), + put_snapshot(AntidoteDB, Key1, [{local, 2}, {remote, 3}], 5), + put_n_operations(AntidoteDB, Key2, 8), + + %% We want all ops for Key1 that are between the snapshot and + %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. + {ok, Value, VCFrom} = get_snapshot(AntidoteDB, Key1, vectorclock:from_list(VCTo)), + ?assertEqual({ok, 5, [{local, 2}, {remote, 3}]}, {ok, Value, VCFrom}), + + O1 = get_ops(AntidoteDB, Key1, VCFrom, VCTo), + ?assertEqual([8, 7, 6, 5, 4, 3, 2], O1), + + antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). + +%% This test is used to check that compare function for VCs is working OK +%% with VCs containing != lengths and values +length_of_vc_test() -> + eleveldb:destroy("length_of_vc_test", []), + {ok, AntidoteDB} = antidote_db:new("length_of_vc_test"), + + %% Same key, and same value for the local DC + %% OP2 should be newer than op1 since it contains 1 more DC in its VC + Key = key, + put_op(AntidoteDB, Key, [{local, 2}], 1), + put_op(AntidoteDB, Key, [{local, 2}, {remote, 3}], 2), + O1 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + ?assertEqual([2, 1], O1), + + %% Insert OP3, with no remote DC value and check it´s newer than 1 and 2 + put_op(AntidoteDB, Key, [{local, 3}], 3), + O2 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + ?assertEqual([3, 2, 1], O2), + + %% OP3 is still returned if the local value we look for is lower + %% This is the expected outcome for vectorclock gt and lt methods + O3 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), + ?assertEqual([3, 2, 1], O3), + + %% Insert remote operation not containing local clock and check is the oldest one + put_op(AntidoteDB, Key, [{remote, 1}], 4), + O4 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + ?assertEqual([3, 2, 1, 4], O4), + + antidote_db:close_and_destroy(AntidoteDB, "length_of_vc_test"). + +put_n_snapshots(_AntidoteDB, _Key, 0) -> + ok; +put_n_snapshots(AntidoteDB, Key, N) -> + put_snapshot(AntidoteDB, Key, [{local, N}, {remote, N}], N), + put_n_snapshots(AntidoteDB, Key, N - 1). + +put_n_operations(_AntidoteDB, _Key, 0) -> + ok; +put_n_operations(AntidoteDB, Key, N) -> + put_op(AntidoteDB, Key, [{local, N}, {remote, N}], N), + put_n_operations(AntidoteDB, Key, N - 1). + +-endif. From f8d7ba33cb23e6f67a7307a4d30bcc4a81770148 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Tue, 23 Aug 2016 19:26:21 -0300 Subject: [PATCH 07/23] Changed dependency for antidote utils branch --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 7833a89..bb1379a 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "antidote_comparator"}}}, - {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "master"}}} + {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "add_op_snap"}}} ]}. From eaf5db96aa75d2e382344416f463516b3b5ea5b5 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Wed, 24 Aug 2016 16:27:39 -0300 Subject: [PATCH 08/23] Fixed branch in rebar config for antidote utils dependency --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index bb1379a..e511595 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ {eleveldb, ".*", {git, "git://github.com/SyncFree/eleveldb", {branch, "antidote_comparator"}}}, - {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "add_op_snap"}}} + {antidote_utils, ".*", {git, "git://github.com/SyncFree/antidote_utils", {branch, "additions_for_antidote_db"}}} ]}. From ca2eb9fed64a733adfa73a36a76614af19efb5d3 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 26 Aug 2016 15:01:59 -0300 Subject: [PATCH 09/23] Added the licence to missing files --- src/antidote_db.erl | 20 ++++++++++++++++++++ src/antidote_db_sup.erl | 19 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index df12f17..9cb9431 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -1,3 +1,23 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + -module(antidote_db). -export([ diff --git a/src/antidote_db_sup.erl b/src/antidote_db_sup.erl index 0e86e87..afb94da 100644 --- a/src/antidote_db_sup.erl +++ b/src/antidote_db_sup.erl @@ -1,3 +1,22 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 SyncFree Consortium. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- -module(antidote_db_sup). -behaviour(supervisor). From af40b72f592d6e1ac6c113a933531e7313106f3f Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 29 Aug 2016 11:39:03 -0300 Subject: [PATCH 10/23] Switched branch order in get_ops to make code more readable --- src/antidote_db_wrapper.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/antidote_db_wrapper.erl b/src/antidote_db_wrapper.erl index 73338ea..5c300d6 100644 --- a/src/antidote_db_wrapper.erl +++ b/src/antidote_db_wrapper.erl @@ -89,16 +89,16 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> AccIn; false -> %% check its an op and its commit time is in the required range - case not vectorclock:lt(VC1Dict, VCFromDict) of + case vectorclock:lt(VC1Dict, VCFromDict) of true -> + throw({break, AccIn}); + false -> case (OP == op) of true -> AccIn ++ [binary_to_term(V)]; false -> AccIn - end; - false -> - throw({break, AccIn}) + end end end; false -> From 7b87efd87a4512bdd9c75af0da51036ca90a50cf Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 29 Aug 2016 14:33:16 -0300 Subject: [PATCH 11/23] Fixed accumulation in the resulting list of the get_ops method to make it more efficient --- src/antidote_db_wrapper.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/antidote_db_wrapper.erl b/src/antidote_db_wrapper.erl index 5c300d6..afb8d02 100644 --- a/src/antidote_db_wrapper.erl +++ b/src/antidote_db_wrapper.erl @@ -77,7 +77,7 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> VCFromDict = vectorclock_to_dict(VCFrom), VCToDict = vectorclock_to_dict(VCTo), try - antidote_db:fold(AntidoteDB, + Res = antidote_db:fold(AntidoteDB, fun({K, V}, AccIn) -> {Key1, VC1, OP} = binary_to_term(K), VC1Dict = vectorclock:from_list(VC1), @@ -95,7 +95,7 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> false -> case (OP == op) of true -> - AccIn ++ [binary_to_term(V)]; + [binary_to_term(V) | AccIn]; false -> AccIn end @@ -106,10 +106,13 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> end end, [], - [{first_key, term_to_binary({Key})}]) + [{first_key, term_to_binary({Key})}]), + %% If the fold returned without throwing a break (it iterated all + %% keys and ended up normally) reverse the resulting list + lists:reverse(Res) catch {break, OPS} -> - OPS; + lists:reverse(OPS); _ -> [] end. From 03fbcdcfd192d744427fc66cc47e1748c1696112 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Wed, 7 Sep 2016 14:46:35 -0300 Subject: [PATCH 12/23] Updated the wrapper with to use the log_record provided in antidote_utils --- src/antidote_db_wrapper.erl | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/antidote_db_wrapper.erl b/src/antidote_db_wrapper.erl index afb8d02..c895c56 100644 --- a/src/antidote_db_wrapper.erl +++ b/src/antidote_db_wrapper.erl @@ -118,10 +118,10 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> end. %% Saves the operation into AntidoteDB --spec put_op(antidote_db:antidote_db(), key(), vectorclock(), operation()) -> ok | error. -put_op(AntidoteDB, Key, VC, Op) -> +-spec put_op(antidote_db:antidote_db(), key(), vectorclock(), #log_record{}) -> ok | error. +put_op(AntidoteDB, Key, VC, Record) -> VCList = vectorclock_to_sorted_list(VC), - antidote_db:put(AntidoteDB, {binary_to_atom(Key), VCList, op}, Op). + antidote_db:put(AntidoteDB, {binary_to_atom(Key), VCList, op}, Record). vectorclock_to_dict(VC) -> case is_list(VC) of @@ -254,8 +254,8 @@ get_operations_non_empty_test() -> %% concurrent operations are present in the result O1 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), O2 = get_ops(AntidoteDB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), - ?assertEqual([9, 8, 7, 6, 5, 4, 3, 2], O1), - ?assertEqual([7, 6, 5, 4], O2), + ?assertEqual([9, 8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), + ?assertEqual([7, 6, 5, 4], filter_records_into_numbers(O2)), antidote_db:close_and_destroy(AntidoteDB, "get_operations_non_empty_test"). @@ -278,7 +278,7 @@ operations_and_snapshots_mixed_test() -> ?assertEqual({ok, 5, [{local, 2}, {remote, 3}]}, {ok, Value, VCFrom}), O1 = get_ops(AntidoteDB, Key1, VCFrom, VCTo), - ?assertEqual([8, 7, 6, 5, 4, 3, 2], O1), + ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). @@ -291,25 +291,25 @@ length_of_vc_test() -> %% Same key, and same value for the local DC %% OP2 should be newer than op1 since it contains 1 more DC in its VC Key = key, - put_op(AntidoteDB, Key, [{local, 2}], 1), - put_op(AntidoteDB, Key, [{local, 2}, {remote, 3}], 2), - O1 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + put_op(AntidoteDB, Key, [{local, 2}], #log_record{version = 1}), + put_op(AntidoteDB, Key, [{local, 2}, {remote, 3}], #log_record{version = 2}), + O1 = filter_records_into_numbers(get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}])), ?assertEqual([2, 1], O1), %% Insert OP3, with no remote DC value and check it´s newer than 1 and 2 - put_op(AntidoteDB, Key, [{local, 3}], 3), + put_op(AntidoteDB, Key, [{local, 3}], #log_record{version = 3}), O2 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), - ?assertEqual([3, 2, 1], O2), + ?assertEqual([3, 2, 1], filter_records_into_numbers(O2)), %% OP3 is still returned if the local value we look for is lower %% This is the expected outcome for vectorclock gt and lt methods O3 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), - ?assertEqual([3, 2, 1], O3), + ?assertEqual([3, 2, 1], filter_records_into_numbers(O3)), %% Insert remote operation not containing local clock and check is the oldest one - put_op(AntidoteDB, Key, [{remote, 1}], 4), + put_op(AntidoteDB, Key, [{remote, 1}], #log_record{version = 4}), O4 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), - ?assertEqual([3, 2, 1, 4], O4), + ?assertEqual([3, 2, 1, 4], filter_records_into_numbers(O4)), antidote_db:close_and_destroy(AntidoteDB, "length_of_vc_test"). @@ -322,7 +322,13 @@ put_n_snapshots(AntidoteDB, Key, N) -> put_n_operations(_AntidoteDB, _Key, 0) -> ok; put_n_operations(AntidoteDB, Key, N) -> - put_op(AntidoteDB, Key, [{local, N}, {remote, N}], N), + %% For testing purposes, we use only the version in the record to identify + %% the different ops, since it's easier than reproducing the whole record + put_op(AntidoteDB, Key, [{local, N}, {remote, N}], + #log_record{version = N}), put_n_operations(AntidoteDB, Key, N - 1). +filter_records_into_numbers(List) -> + lists:foldr(fun(Record, Acum) -> [Record#log_record.version | Acum] end, [], List). + -endif. From 20148aba92e9083fefa3d01b79ad61598347d4d7 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Wed, 7 Sep 2016 15:56:41 -0300 Subject: [PATCH 13/23] Updated spec return type in get_ops --- src/antidote_db_wrapper.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/antidote_db_wrapper.erl b/src/antidote_db_wrapper.erl index c895c56..73bf479 100644 --- a/src/antidote_db_wrapper.erl +++ b/src/antidote_db_wrapper.erl @@ -72,7 +72,7 @@ put_snapshot(AntidoteDB, Key, SnapshotTime, Snapshot) -> antidote_db:put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). %% Returns a list of operations that have commit time in the range [VCFrom, VCTo] --spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> list(). +-spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. get_ops(AntidoteDB, Key, VCFrom, VCTo) -> VCFromDict = vectorclock_to_dict(VCFrom), VCToDict = vectorclock_to_dict(VCTo), From e3bff37cf14393d678debd200a3becb215790e8c Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Tue, 20 Sep 2016 20:14:05 -0300 Subject: [PATCH 14/23] Renamed and refactored the antidote_db and the leveldb_wrapper to match the new architecture proposed, making it ready to support othr backends. --- src/antidote_db.erl | 146 +++++++++-------- ...ote_db_wrapper.erl => leveldb_wrapper.erl} | 150 ++++++++++-------- 2 files changed, 167 insertions(+), 129 deletions(-) rename src/{antidote_db_wrapper.erl => leveldb_wrapper.erl} (69%) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index 9cb9431..f0bc8fb 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -20,80 +20,98 @@ -module(antidote_db). +-include_lib("antidote_utils/include/antidote_utils.hrl"). + -export([ - new/1, - get/2, - put/3, + new/2, close_and_destroy/2, close/1, - fold/4, - fold_keys/4, - is_empty/1, - delete/2, - repair/1]). + get_snapshot/3, + put_snapshot/4, + get_ops/4, + put_op/4]). + +-type antidote_db() :: {leveldb, eleveldb:db_ref()}. --type antidote_db() :: eleveldb:db_ref(). +-type antidote_db_type() :: leveldb. --export_type([antidote_db/0]). +-export_type([antidote_db/0, antidote_db_type/0]). + +%% Given a name, returns a new AntidoteDB (for now, only ElevelDB is supported) +%% OpenOptions are set to use Antidote special comparator in the case of Eleveldb +-spec new(atom(), antidote_db_type()) -> {ok, antidote_db()} | {error, any()}. +new(Name, Type) -> + case Type of + leveldb -> + {ok, Ref} = eleveldb:open(Name, [{create_if_missing, true}, {antidote, true}]), + {ok, {leveldb, Ref}}; + _ -> + {error, type_not_supported} + end. -%% Given a name, returns a new AntidoteDB (ElevelDB) -%% OpenOptions are set to use Antidote special comparator --spec new(atom()) -> {ok, antidote_db()} | {error, any()}. -new(Name) -> - eleveldb:open(Name, [{create_if_missing, true}, {antidote, true}]). %% Closes and destroys the given base -spec close_and_destroy(antidote_db(), atom()) -> ok | {error, any()}. -close_and_destroy(AntidoteDB, Name) -> - eleveldb:close(AntidoteDB), - eleveldb:destroy(Name, []). +close_and_destroy({Type, DB}, Name) -> + case Type of + leveldb -> + eleveldb:close(DB), + eleveldb:destroy(Name, []); + _ -> + {error, type_not_supported} + end. -spec close(antidote_db()) -> ok | {error, any()}. -close(AntidoteDB) -> - eleveldb:close(AntidoteDB). - -%% @doc returns the value of Key, in the antidote_db --spec get(antidote_db(), any()) -> term() | not_found. -get(AntidoteDB, Key) -> - AKey = case is_binary(Key) of - true -> Key; - false -> term_to_binary(Key) - end, - case eleveldb:get(AntidoteDB, AKey, []) of - {ok, Res} -> - binary_to_term(Res); - not_found -> - not_found +close({Type, DB}) -> + case Type of + leveldb -> + eleveldb:close(DB); + _ -> + {error, type_not_supported} + end. + +%% Gets the most suitable snapshot for Key that has been committed +%% before CommitTime. If its nothing is found, returns {error, not_found} +-spec get_snapshot(antidote_db:antidote_db(), key(), + snapshot_time()) -> {ok, snapshot(), snapshot_time()} | {error, not_found}. +get_snapshot({Type, DB}, Key, CommitTime) -> + case Type of + leveldb -> + leveldb_wrapper:get_snapshot(DB, Key, CommitTime); + _ -> + {error, type_not_supported} end. -%% @doc puts the Value associated to Key in eleveldb AntidoteDB --spec put(antidote_db(), any(), any()) -> ok | {error, any()}. -put(AntidoteDB, Key, Value) -> - AKey = case is_binary(Key) of - true -> Key; - false -> term_to_binary(Key) - end, - ATerm = case is_binary(Value) of - true -> Value; - false -> term_to_binary(Value) - end, - eleveldb:put(AntidoteDB, AKey, ATerm, []). - --spec fold(antidote_db(), eleveldb:fold_fun(), any(), eleveldb:read_options()) -> any(). -fold(AntidoteDB, Fun, Acc0, Opts) -> - eleveldb:fold(AntidoteDB, Fun, Acc0, Opts). - --spec fold_keys(antidote_db(), eleveldb:fold_keys_fun(), any(), eleveldb:read_options()) -> any(). -fold_keys(AntidoteDB, Fun, Acc0, Opts) -> - eleveldb:fold_keys(AntidoteDB, Fun, Acc0, Opts). - --spec is_empty(antidote_db()) -> boolean(). -is_empty(AntidoteDB) -> - eleveldb:is_empty(AntidoteDB). - -repair(Name) -> - eleveldb:repair(Name, []). - --spec delete(antidote_db(), binary()) -> ok | {error, any()}. -delete(AntidoteDB, Key) -> - eleveldb:delete(AntidoteDB, Key, []). +%% Saves the snapshot into AntidoteDB +-spec put_snapshot(antidote_db:antidote_db(), key(), snapshot_time(), + snapshot()) -> ok | error. +put_snapshot({Type, DB}, Key, SnapshotTime, Snapshot) -> + case Type of + leveldb -> + leveldb_wrapper:put_snapshot(DB, Key, SnapshotTime, Snapshot); + _ -> + {error, type_not_supported} + end. + +%% Returns a list of operations that have commit time in the range [VCFrom, VCTo] +-spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. +get_ops({Type, DB}, Key, VCFrom, VCTo) -> + case Type of + leveldb -> + leveldb_wrapper:get_ops(DB, Key, VCFrom, VCTo); + _ -> + {error, type_not_supported} + end. + + +%% Saves the operation into AntidoteDB +-spec put_op(antidote_db:antidote_db(), key(), vectorclock(), #log_record{}) -> ok | error. +put_op({Type, DB}, Key, VC, Record) -> + case Type of + leveldb -> + leveldb_wrapper:put_op(DB, Key, VC, Record); + _ -> + {error, type_not_supported} + end. + + diff --git a/src/antidote_db_wrapper.erl b/src/leveldb_wrapper.erl similarity index 69% rename from src/antidote_db_wrapper.erl rename to src/leveldb_wrapper.erl index 73bf479..30505d0 100644 --- a/src/antidote_db_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -17,7 +17,7 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(antidote_db_wrapper). +-module(leveldb_wrapper). -include_lib("antidote_utils/include/antidote_utils.hrl"). @@ -32,11 +32,11 @@ %% Gets the most suitable snapshot for Key that has been committed %% before CommitTime. If its nothing is found, returns {error, not_found} --spec get_snapshot(antidote_db:antidote_db(), key(), - snapshot_time()) -> {ok, snapshot(), snapshot_time()} | {error, not_found}. -get_snapshot(AntidoteDB, Key, CommitTime) -> +-spec get_snapshot(eleveldb:db_ref(), key(), snapshot_time()) -> + {ok, snapshot(), snapshot_time()} | {error, not_found}. +get_snapshot(DB, Key, CommitTime) -> try - antidote_db:fold(AntidoteDB, + eleveldb:fold(DB, fun({K, V}, AccIn) -> {Key1, VC, SNAP} = binary_to_term(K), case (Key1 == Key) of %% check same key @@ -69,15 +69,15 @@ get_snapshot(AntidoteDB, Key, CommitTime) -> snapshot()) -> ok | error. put_snapshot(AntidoteDB, Key, SnapshotTime, Snapshot) -> SnapshotTimeList = vectorclock_to_sorted_list(SnapshotTime), - antidote_db:put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). + put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). %% Returns a list of operations that have commit time in the range [VCFrom, VCTo] --spec get_ops(antidote_db:antidote_db(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. -get_ops(AntidoteDB, Key, VCFrom, VCTo) -> +-spec get_ops(eleveldb:db_ref(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. +get_ops(DB, Key, VCFrom, VCTo) -> VCFromDict = vectorclock_to_dict(VCFrom), VCToDict = vectorclock_to_dict(VCTo), try - Res = antidote_db:fold(AntidoteDB, + Res = eleveldb:fold(DB, fun({K, V}, AccIn) -> {Key1, VC1, OP} = binary_to_term(K), VC1Dict = vectorclock:from_list(VC1), @@ -118,10 +118,10 @@ get_ops(AntidoteDB, Key, VCFrom, VCTo) -> end. %% Saves the operation into AntidoteDB --spec put_op(antidote_db:antidote_db(), key(), vectorclock(), #log_record{}) -> ok | error. -put_op(AntidoteDB, Key, VC, Record) -> +-spec put_op(eleveldb:db_ref(), key(), vectorclock(), #log_record{}) -> ok | error. +put_op(DB, Key, VC, Record) -> VCList = vectorclock_to_sorted_list(VC), - antidote_db:put(AntidoteDB, {binary_to_atom(Key), VCList, op}, Record). + put(DB, {binary_to_atom(Key), VCList, op}, Record). vectorclock_to_dict(VC) -> case is_list(VC) of @@ -144,6 +144,19 @@ binary_to_atom(Key) -> false -> Key end. +%% @doc puts the Value associated to Key in eleveldb AntidoteDB +-spec put(eleveldb:db_ref(), any(), any()) -> ok | {error, any()}. +put(DB, Key, Value) -> + AKey = case is_binary(Key) of + true -> Key; + false -> term_to_binary(Key) + end, + ATerm = case is_binary(Value) of + true -> Value; + false -> term_to_binary(Value) + end, + eleveldb:put(DB, AKey, ATerm, []). + -ifdef(TEST). %% This test ensures vectorclock_to_list method @@ -154,42 +167,44 @@ vectorclock_to_sorted_list_test() -> get_snapshot_not_found_test() -> eleveldb:destroy("get_snapshot_not_found_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_found_test"), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_found_test", leveldb), + {leveldb, DB} = AntidoteDB, Key = key, Key1 = key1, Key2 = key2, %% No snapshot in the DB - NotFound = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + NotFound = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), ?assertEqual({error, not_found}, NotFound), %% Put 10 snapshots for Key and check there is no snapshot with time 0 in both DCs - put_n_snapshots(AntidoteDB, Key, 10), - NotFound1 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + put_n_snapshots(DB, Key, 10), + NotFound1 = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), ?assertEqual({error, not_found}, NotFound1), %% Look for a snapshot for Key1 - S1 = get_snapshot(AntidoteDB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + S1 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), ?assertEqual({error, not_found}, S1), %% Put snapshots for Key2 and look for a snapshot for Key1 - put_n_snapshots(AntidoteDB, Key2, 10), - S2 = get_snapshot(AntidoteDB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + put_n_snapshots(DB, Key2, 10), + S2 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), ?assertEqual({error, not_found}, S2), antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_found_test"). get_snapshot_matching_vc_test() -> eleveldb:destroy("get_snapshot_matching_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_matching_vc_test"), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_matching_vc_test", leveldb), + {leveldb, DB} = AntidoteDB, Key = key, - put_n_snapshots(AntidoteDB, Key, 10), + put_n_snapshots(DB, Key, 10), %% get some of the snapshots inserted (matches VC) - S1 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), - S2 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), - S3 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), + S1 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), + S2 = get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), + S3 = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), ?assertEqual({ok, 1, [{local, 1}, {remote, 1}]}, S1), ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S2), ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S3), @@ -199,15 +214,16 @@ get_snapshot_matching_vc_test() -> get_snapshot_not_matching_vc_test() -> eleveldb:destroy("get_snapshot_not_matching_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_matching_vc_test"), + {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_matching_vc_test", leveldb), + {leveldb, DB} = AntidoteDB, Key = key, - put_n_snapshots(AntidoteDB, Key, 10), + put_n_snapshots(DB, Key, 10), %% get snapshots with different times in their DCs - S4 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), - S5 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), - S6 = get_snapshot(AntidoteDB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + S4 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), + S5 = get_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), + S6 = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), ?assertEqual({error, not_found}, S4), ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S5), ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S6), @@ -216,24 +232,25 @@ get_snapshot_not_matching_vc_test() -> get_operations_empty_result_test() -> eleveldb:destroy("get_operations_not_found_test", []), - {ok, AntidoteDB} = antidote_db:new("get_operations_not_found_test"), + {ok, AntidoteDB} = antidote_db:new("get_operations_not_found_test", leveldb), + {leveldb, DB} = AntidoteDB, Key = key, Key1 = key1, %% Nothing in the DB yet return empty list - O1 = get_ops(AntidoteDB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + O1 = get_ops(DB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), ?assertEqual([], O1), - put_n_operations(AntidoteDB, Key, 10), + put_n_operations(DB, Key, 10), %% Getting something out of range returns an empty list - O2 = get_ops(AntidoteDB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), + O2 = get_ops(DB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), ?assertEqual([], O2), %% Getting a key not present, returns an empty list - O3 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), + O3 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), ?assertEqual([], O3), %% Searching for the same range returns an empty list - O4 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), + O4 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), ?assertEqual([], O4), antidote_db:close_and_destroy(AntidoteDB, "get_operations_not_found_test"). @@ -241,19 +258,20 @@ get_operations_empty_result_test() -> get_operations_non_empty_test() -> eleveldb:destroy("get_operations_non_empty_test", []), - {ok, AntidoteDB} = antidote_db:new("get_operations_non_empty_test"), + {ok, AntidoteDB} = antidote_db:new("get_operations_non_empty_test", leveldb), + {leveldb, DB} = AntidoteDB, %% Fill the DB with values Key = key, Key1 = key1, Key2 = key2, - put_n_operations(AntidoteDB, Key, 100), - put_n_operations(AntidoteDB, Key1, 10), - put_n_operations(AntidoteDB, Key2, 25), + put_n_operations(DB, Key, 100), + put_n_operations(DB, Key1, 10), + put_n_operations(DB, Key2, 25), %% concurrent operations are present in the result - O1 = get_ops(AntidoteDB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), - O2 = get_ops(AntidoteDB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), + O1 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + O2 = get_ops(DB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), ?assertEqual([9, 8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), ?assertEqual([7, 6, 5, 4], filter_records_into_numbers(O2)), @@ -261,23 +279,24 @@ get_operations_non_empty_test() -> operations_and_snapshots_mixed_test() -> eleveldb:destroy("operations_and_snapshots_mixed_test", []), - {ok, AntidoteDB} = antidote_db:new("operations_and_snapshots_mixed_test"), + {ok, AntidoteDB} = antidote_db:new("operations_and_snapshots_mixed_test", leveldb), + {leveldb, DB} = AntidoteDB, Key = key, Key1 = key1, Key2 = key2, VCTo = [{local, 7}, {remote, 8}], - put_n_operations(AntidoteDB, Key, 10), - put_n_operations(AntidoteDB, Key1, 20), - put_snapshot(AntidoteDB, Key1, [{local, 2}, {remote, 3}], 5), - put_n_operations(AntidoteDB, Key2, 8), + put_n_operations(DB, Key, 10), + put_n_operations(DB, Key1, 20), + put_snapshot(DB, Key1, [{local, 2}, {remote, 3}], 5), + put_n_operations(DB, Key2, 8), %% We want all ops for Key1 that are between the snapshot and %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. - {ok, Value, VCFrom} = get_snapshot(AntidoteDB, Key1, vectorclock:from_list(VCTo)), + {ok, Value, VCFrom} = get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), ?assertEqual({ok, 5, [{local, 2}, {remote, 3}]}, {ok, Value, VCFrom}), - O1 = get_ops(AntidoteDB, Key1, VCFrom, VCTo), + O1 = get_ops(DB, Key1, VCFrom, VCTo), ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). @@ -286,47 +305,48 @@ operations_and_snapshots_mixed_test() -> %% with VCs containing != lengths and values length_of_vc_test() -> eleveldb:destroy("length_of_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("length_of_vc_test"), + {ok, AntidoteDB} = antidote_db:new("length_of_vc_test", leveldb), + {leveldb, DB} = AntidoteDB, %% Same key, and same value for the local DC %% OP2 should be newer than op1 since it contains 1 more DC in its VC Key = key, - put_op(AntidoteDB, Key, [{local, 2}], #log_record{version = 1}), - put_op(AntidoteDB, Key, [{local, 2}, {remote, 3}], #log_record{version = 2}), - O1 = filter_records_into_numbers(get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}])), + put_op(DB, Key, [{local, 2}], #log_record{version = 1}), + put_op(DB, Key, [{local, 2}, {remote, 3}], #log_record{version = 2}), + O1 = filter_records_into_numbers(get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}])), ?assertEqual([2, 1], O1), %% Insert OP3, with no remote DC value and check it´s newer than 1 and 2 - put_op(AntidoteDB, Key, [{local, 3}], #log_record{version = 3}), - O2 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + put_op(DB, Key, [{local, 3}], #log_record{version = 3}), + O2 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), ?assertEqual([3, 2, 1], filter_records_into_numbers(O2)), %% OP3 is still returned if the local value we look for is lower %% This is the expected outcome for vectorclock gt and lt methods - O3 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), + O3 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), ?assertEqual([3, 2, 1], filter_records_into_numbers(O3)), %% Insert remote operation not containing local clock and check is the oldest one - put_op(AntidoteDB, Key, [{remote, 1}], #log_record{version = 4}), - O4 = get_ops(AntidoteDB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + put_op(DB, Key, [{remote, 1}], #log_record{version = 4}), + O4 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), ?assertEqual([3, 2, 1, 4], filter_records_into_numbers(O4)), antidote_db:close_and_destroy(AntidoteDB, "length_of_vc_test"). -put_n_snapshots(_AntidoteDB, _Key, 0) -> +put_n_snapshots(_DB, _Key, 0) -> ok; -put_n_snapshots(AntidoteDB, Key, N) -> - put_snapshot(AntidoteDB, Key, [{local, N}, {remote, N}], N), - put_n_snapshots(AntidoteDB, Key, N - 1). +put_n_snapshots(DB, Key, N) -> + put_snapshot(DB, Key, [{local, N}, {remote, N}], N), + put_n_snapshots(DB, Key, N - 1). -put_n_operations(_AntidoteDB, _Key, 0) -> +put_n_operations(_DB, _Key, 0) -> ok; -put_n_operations(AntidoteDB, Key, N) -> +put_n_operations(DB, Key, N) -> %% For testing purposes, we use only the version in the record to identify %% the different ops, since it's easier than reproducing the whole record - put_op(AntidoteDB, Key, [{local, N}, {remote, N}], + put_op(DB, Key, [{local, N}, {remote, N}], #log_record{version = N}), - put_n_operations(AntidoteDB, Key, N - 1). + put_n_operations(DB, Key, N - 1). filter_records_into_numbers(List) -> lists:foldr(fun(Record, Acum) -> [Record#log_record.version | Acum] end, [], List). From 17377dc481651e7e56c2ac81d7caa259d2e1520b Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Tue, 20 Sep 2016 20:38:52 -0300 Subject: [PATCH 15/23] Added a test for to check the reutnr type in all methods of AntidoteDB, when a non existent type is passed. --- src/antidote_db.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index f0bc8fb..7a72c13 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -37,6 +37,10 @@ -export_type([antidote_db/0, antidote_db_type/0]). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + %% Given a name, returns a new AntidoteDB (for now, only ElevelDB is supported) %% OpenOptions are set to use Antidote special comparator in the case of Eleveldb -spec new(atom(), antidote_db_type()) -> {ok, antidote_db()} | {error, any()}. @@ -115,3 +119,15 @@ put_op({Type, DB}, Key, VC, Record) -> end. +-ifdef(TEST). + +wrong_types_test() -> + ?assertEqual({error, type_not_supported}, new("TEST", type)), + ?assertEqual({error, type_not_supported}, close_and_destroy({type, db}, name)), + ?assertEqual({error, type_not_supported}, close({type, db})), + ?assertEqual({error, type_not_supported}, get_snapshot({type, db}, key, [])), + ?assertEqual({error, type_not_supported}, put_snapshot({type, db}, key, [], [])), + ?assertEqual({error, type_not_supported}, get_ops({type, db}, key, [], [])), + ?assertEqual({error, type_not_supported}, put_op({type, db}, key, [], [])). + +-endif. From 0cba2b8c64c9a5674b8f8f4718a153702a34d575 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Wed, 21 Sep 2016 14:44:24 -0300 Subject: [PATCH 16/23] Changes to use the materealized_snapshot in antidote_uti.s --- src/antidote_db.erl | 13 +++++----- src/leveldb_wrapper.erl | 56 +++++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index 7a72c13..e95861d 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -27,7 +27,7 @@ close_and_destroy/2, close/1, get_snapshot/3, - put_snapshot/4, + put_snapshot/3, get_ops/4, put_op/4]). @@ -77,7 +77,7 @@ close({Type, DB}) -> %% Gets the most suitable snapshot for Key that has been committed %% before CommitTime. If its nothing is found, returns {error, not_found} -spec get_snapshot(antidote_db:antidote_db(), key(), - snapshot_time()) -> {ok, snapshot(), snapshot_time()} | {error, not_found}. + snapshot_time()) -> {ok, #materialized_snapshot{}} | {error, not_found}. get_snapshot({Type, DB}, Key, CommitTime) -> case Type of leveldb -> @@ -87,12 +87,11 @@ get_snapshot({Type, DB}, Key, CommitTime) -> end. %% Saves the snapshot into AntidoteDB --spec put_snapshot(antidote_db:antidote_db(), key(), snapshot_time(), - snapshot()) -> ok | error. -put_snapshot({Type, DB}, Key, SnapshotTime, Snapshot) -> +-spec put_snapshot(antidote_db:antidote_db(), key(), #materialized_snapshot{}) -> ok | error. +put_snapshot({Type, DB}, Key, Snapshot) -> case Type of leveldb -> - leveldb_wrapper:put_snapshot(DB, Key, SnapshotTime, Snapshot); + leveldb_wrapper:put_snapshot(DB, Key, Snapshot); _ -> {error, type_not_supported} end. @@ -126,7 +125,7 @@ wrong_types_test() -> ?assertEqual({error, type_not_supported}, close_and_destroy({type, db}, name)), ?assertEqual({error, type_not_supported}, close({type, db})), ?assertEqual({error, type_not_supported}, get_snapshot({type, db}, key, [])), - ?assertEqual({error, type_not_supported}, put_snapshot({type, db}, key, [], [])), + ?assertEqual({error, type_not_supported}, put_snapshot({type, db}, key, [])), ?assertEqual({error, type_not_supported}, get_ops({type, db}, key, [], [])), ?assertEqual({error, type_not_supported}, put_op({type, db}, key, [], [])). diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index 30505d0..3353db3 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -22,7 +22,7 @@ -include_lib("antidote_utils/include/antidote_utils.hrl"). -export([get_snapshot/3, - put_snapshot/4, + put_snapshot/3, get_ops/4, put_op/4]). @@ -33,7 +33,7 @@ %% Gets the most suitable snapshot for Key that has been committed %% before CommitTime. If its nothing is found, returns {error, not_found} -spec get_snapshot(eleveldb:db_ref(), key(), snapshot_time()) -> - {ok, snapshot(), snapshot_time()} | {error, not_found}. + {ok, #materialized_snapshot{}} | {error, not_found}. get_snapshot(DB, Key, CommitTime) -> try eleveldb:fold(DB, @@ -46,7 +46,7 @@ get_snapshot(DB, Key, CommitTime) -> vectorclock:le(vectorclock:from_list(VC), CommitTime) of true -> Snapshot = binary_to_term(V), - throw({break, Snapshot, VC}); + throw({break, Snapshot}); _ -> AccIn end; @@ -58,17 +58,16 @@ get_snapshot(DB, Key, CommitTime) -> [{first_key, term_to_binary({Key})}]), {error, not_found} catch - {break, SNAP, VC} -> - {ok, SNAP, VC}; + {break, SNAP} -> + {ok, SNAP}; _ -> {error, not_found} end. %% Saves the snapshot into AntidoteDB --spec put_snapshot(antidote_db:antidote_db(), key(), snapshot_time(), - snapshot()) -> ok | error. -put_snapshot(AntidoteDB, Key, SnapshotTime, Snapshot) -> - SnapshotTimeList = vectorclock_to_sorted_list(SnapshotTime), +-spec put_snapshot(antidote_db:antidote_db(), key(), #materialized_snapshot{}) -> ok | error. +put_snapshot(AntidoteDB, Key, Snapshot) -> + SnapshotTimeList = vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time), put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). %% Returns a list of operations that have commit time in the range [VCFrom, VCTo] @@ -133,7 +132,7 @@ vectorclock_to_dict(VC) -> vectorclock_to_sorted_list(VC) -> case is_list(VC) of true -> lists:sort(VC); - false -> lists:sort(vectorclock:to_list(VC)) + false -> lists:sort(dict:to_list(VC)) end. %% Workaround for basho bench @@ -202,12 +201,15 @@ get_snapshot_matching_vc_test() -> put_n_snapshots(DB, Key, 10), %% get some of the snapshots inserted (matches VC) - S1 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), - S2 = get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), - S3 = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), - ?assertEqual({ok, 1, [{local, 1}, {remote, 1}]}, S1), - ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S2), - ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S3), + {ok, S1} = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), + {ok, S2} = get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), + {ok, S3} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), + ?assertEqual([{local, 1}, {remote, 1}], vectorclock_to_sorted_list(S1#materialized_snapshot.snapshot_time)), + ?assertEqual(1, S1#materialized_snapshot.value), + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S2#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S2#materialized_snapshot.value), + ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S3#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S3#materialized_snapshot.value), antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_matching_vc_test"). @@ -222,11 +224,13 @@ get_snapshot_not_matching_vc_test() -> %% get snapshots with different times in their DCs S4 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), - S5 = get_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), - S6 = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + {ok, S5} = get_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), + {ok, S6} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), ?assertEqual({error, not_found}, S4), - ?assertEqual({ok, 4, [{local, 4}, {remote, 4}]}, S5), - ?assertEqual({ok, 8, [{local, 8}, {remote, 8}]}, S6), + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S5#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S5#materialized_snapshot.value), + ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S6#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S6#materialized_snapshot.value), antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_matching_vc_test"). @@ -288,15 +292,16 @@ operations_and_snapshots_mixed_test() -> VCTo = [{local, 7}, {remote, 8}], put_n_operations(DB, Key, 10), put_n_operations(DB, Key1, 20), - put_snapshot(DB, Key1, [{local, 2}, {remote, 3}], 5), + put_snapshot(DB, Key1, #materialized_snapshot{snapshot_time = [{local, 2}, {remote, 3}], value = 5}), put_n_operations(DB, Key2, 8), %% We want all ops for Key1 that are between the snapshot and %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. - {ok, Value, VCFrom} = get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), - ?assertEqual({ok, 5, [{local, 2}, {remote, 3}]}, {ok, Value, VCFrom}), + {ok, Snapshot} = get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), + ?assertEqual([{local, 2}, {remote, 3}], vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time)), + ?assertEqual(5, Snapshot#materialized_snapshot.value), - O1 = get_ops(DB, Key1, VCFrom, VCTo), + O1 = get_ops(DB, Key1, Snapshot#materialized_snapshot.snapshot_time, VCTo), ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). @@ -336,7 +341,8 @@ length_of_vc_test() -> put_n_snapshots(_DB, _Key, 0) -> ok; put_n_snapshots(DB, Key, N) -> - put_snapshot(DB, Key, [{local, N}, {remote, N}], N), + VC = vectorclock:from_list([{local, N}, {remote, N}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = N}), put_n_snapshots(DB, Key, N - 1). put_n_operations(_DB, _Key, 0) -> From ffb1b101532ef301217c96a67814548595a60add Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 26 Sep 2016 16:32:05 -0300 Subject: [PATCH 17/23] Fixed upper bound in get_ops method --- src/leveldb_wrapper.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index 3353db3..de49be0 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -70,7 +70,10 @@ put_snapshot(AntidoteDB, Key, Snapshot) -> SnapshotTimeList = vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time), put(AntidoteDB, {binary_to_atom(Key), SnapshotTimeList, snap}, Snapshot). -%% Returns a list of operations that have commit time in the range [VCFrom, VCTo] +%% Returns a list of operations that have commit time in the range [VCFrom, VCTo). +%% In other words, it returns all ops which have a VectorClock concurrent or larger than VCFrom, +%% and smaller or equal (for all entries) than VCTo. +%% An example on what this method returns can be seen in the test get_operations_non_empty_test. -spec get_ops(eleveldb:db_ref(), key(), vectorclock(), vectorclock()) -> [#log_record{}]. get_ops(DB, Key, VCFrom, VCTo) -> VCFromDict = vectorclock_to_dict(VCFrom), @@ -83,7 +86,7 @@ get_ops(DB, Key, VCFrom, VCTo) -> case Key == Key1 of %% check same key true -> %% if its greater, continue - case vectorclock:gt(VC1Dict, VCToDict) of + case vectorclock:strict_ge(VC1Dict, VCToDict) of true -> AccIn; false -> @@ -275,8 +278,9 @@ get_operations_non_empty_test() -> %% concurrent operations are present in the result O1 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), + O2 = get_ops(DB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), - ?assertEqual([9, 8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), ?assertEqual([7, 6, 5, 4], filter_records_into_numbers(O2)), antidote_db:close_and_destroy(AntidoteDB, "get_operations_non_empty_test"). @@ -302,7 +306,7 @@ operations_and_snapshots_mixed_test() -> ?assertEqual(5, Snapshot#materialized_snapshot.value), O1 = get_ops(DB, Key1, Snapshot#materialized_snapshot.snapshot_time, VCTo), - ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), + ?assertEqual([7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). From 992109e9e87b0f90409472992ca80fb59b364111 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 26 Sep 2016 19:07:11 -0300 Subject: [PATCH 18/23] Added the get_ops_applicable_to_snapshot method and it's tests. --- src/leveldb_wrapper.erl | 88 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index de49be0..e27ac6a 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -21,7 +21,9 @@ -include_lib("antidote_utils/include/antidote_utils.hrl"). --export([get_snapshot/3, +-export([ + get_ops_applicable_to_snapshot/3, + get_snapshot/3, put_snapshot/3, get_ops/4, put_op/4]). @@ -30,6 +32,48 @@ -include_lib("eunit/include/eunit.hrl"). -endif. + +%% Given a key and a VC, this method returns the most suitable snapshot committed before the VC +%% with a list of operations in the range [SnapshotCommitTime, VC) to be applied to the mentioned +%% snapshot to generate a new one. +-spec get_ops_applicable_to_snapshot(eleveldb:db_ref(), key(), vectorclock()) -> + {ok, #materialized_snapshot{} | not_found, [#log_record{}]}. +get_ops_applicable_to_snapshot(DB, Key, VectorClock) -> + try + Res = eleveldb:fold(DB, + fun({K, V}, AccIn) -> + {Key1, VC1, OP} = binary_to_term(K), + VC1Dict = vectorclock:from_list(VC1), + case Key == Key1 of %% check same key + true -> + %% if its greater, continue + case vectorclock:strict_ge(VC1Dict, VectorClock) of + true -> + AccIn; + false -> + case (OP == op) of + true -> + [binary_to_term(V) | AccIn]; + false -> + throw({break, binary_to_term(V), AccIn}) + end + end; + false -> + throw({break, not_found, AccIn}) + end + end, + [], + [{first_key, term_to_binary({Key})}]), + %% If the fold returned without throwing a break (it iterated all + %% keys and ended up normally) reverse the resulting list + {ok, not_found, lists:reverse(Res)} + catch + {break, Snapshot, OPS} -> + {ok, Snapshot, lists:reverse(OPS)}; + _ -> + {error, not_found} + end. + %% Gets the most suitable snapshot for Key that has been committed %% before CommitTime. If its nothing is found, returns {error, not_found} -spec get_snapshot(eleveldb:db_ref(), key(), snapshot_time()) -> @@ -167,6 +211,48 @@ vectorclock_to_sorted_list_test() -> Sorted = vectorclock_to_sorted_list([{e, 5}, {c, 3}, {a, 1}, {b, 2}, {d, 4}]), ?assertEqual([{a, 1}, {b, 2}, {c, 3}, {d, 4}, {e, 5}], Sorted). +get_ops_applicable_to_snapshot_empty_result_test() -> + eleveldb:destroy("get_ops_applicable_to_snapshot_empty_result_test", []), + {ok, AntidoteDB} = antidote_db:new("get_ops_applicable_to_snapshot_empty_result_test", leveldb), + {leveldb, DB} = AntidoteDB, + + Key = key, + + NotFound = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 2}])), + ?assertEqual({ok, not_found, []}, NotFound), + + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), + S1 = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 3}])), + ?assertEqual({ok, not_found, []}, S1), + + antidote_db:close_and_destroy(AntidoteDB, "get_ops_applicable_to_snapshot_empty_result_test"). + +get_ops_applicable_to_snapshot_non_empty_result_test() -> + eleveldb:destroy("get_ops_applicable_to_snapshot_non_empty_result_test", []), + {ok, AntidoteDB} = antidote_db:new("get_ops_applicable_to_snapshot_non_empty_result_test", leveldb), + {leveldb, DB} = AntidoteDB, + + Key = key, + Key1 = key1, + + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), + VC1 = vectorclock:from_list([{local, 2}, {remote, 2}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), + {ok, S1, Ops1} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 6}])), + ?assertEqual(1, S1#materialized_snapshot.value), + ?assertEqual([], Ops1), + + put_n_operations(DB, Key, 10), + put_n_operations(DB, Key1, 5), + + {ok, S2, Ops2} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + ?assertEqual(1, S2#materialized_snapshot.value), + ?assertEqual([8, 7, 6, 5, 4], filter_records_into_numbers(Ops2)), + + antidote_db:close_and_destroy(AntidoteDB, "get_ops_applicable_to_snapshot_non_empty_result_test"). + get_snapshot_not_found_test() -> eleveldb:destroy("get_snapshot_not_found_test", []), {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_found_test", leveldb), From eeb1968aa8c25c322a157c07d76f637643dad5b7 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 26 Sep 2016 19:50:18 -0300 Subject: [PATCH 19/23] Refactored tests and added comments --- src/leveldb_wrapper.erl | 355 ++++++++++++++++++++-------------------- 1 file changed, 181 insertions(+), 174 deletions(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index e27ac6a..6330047 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -205,6 +205,17 @@ put(DB, Key, Value) -> -ifdef(TEST). +withFreshDb(F) -> + %% Destroy the test DB to prevent having dirty DBs if a test fails + eleveldb:destroy("test_db", []), + {ok, AntidoteDB} = antidote_db:new("test_db", leveldb), + {leveldb, Db} = AntidoteDB, + try + F(Db) + after + antidote_db:close_and_destroy(AntidoteDB, "test_db") + end. + %% This test ensures vectorclock_to_list method %% sorts VCs the correct way vectorclock_to_sorted_list_test() -> @@ -212,221 +223,217 @@ vectorclock_to_sorted_list_test() -> ?assertEqual([{a, 1}, {b, 2}, {c, 3}, {d, 4}, {e, 5}], Sorted). get_ops_applicable_to_snapshot_empty_result_test() -> - eleveldb:destroy("get_ops_applicable_to_snapshot_empty_result_test", []), - {ok, AntidoteDB} = antidote_db:new("get_ops_applicable_to_snapshot_empty_result_test", leveldb), - {leveldb, DB} = AntidoteDB, - - Key = key, + withFreshDb(fun(DB) -> + Key = key, - NotFound = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 2}])), - ?assertEqual({ok, not_found, []}, NotFound), + %% There are no ops nor snapshots in the DB + NotFound = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 2}])), + ?assertEqual({ok, not_found, []}, NotFound), - VC = vectorclock:from_list([{local, 4}, {remote, 4}]), - put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), - S1 = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 3}])), - ?assertEqual({ok, not_found, []}, S1), + %% Add a new Snapshot, but it's not in the range searched, so the result is still empty + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), - antidote_db:close_and_destroy(AntidoteDB, "get_ops_applicable_to_snapshot_empty_result_test"). + S1 = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 3}, {remote, 3}])), + ?assertEqual({ok, not_found, []}, S1) + end). get_ops_applicable_to_snapshot_non_empty_result_test() -> - eleveldb:destroy("get_ops_applicable_to_snapshot_non_empty_result_test", []), - {ok, AntidoteDB} = antidote_db:new("get_ops_applicable_to_snapshot_non_empty_result_test", leveldb), - {leveldb, DB} = AntidoteDB, + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, + + %% Save two snapshots + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), + VC1 = vectorclock:from_list([{local, 2}, {remote, 2}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), + + %% There is one [4, 4] snapshot matches best so it's returned with no ops since there aren't any + {ok, S1, Ops1} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + ?assertEqual(1, S1#materialized_snapshot.value), + ?assertEqual([], Ops1), + + %% Add some ops, and try again with the same VC. Now ops to be applied are returned + put_n_operations(DB, Key, 10), + put_n_operations(DB, Key1, 5), + + {ok, S2, Ops2} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), + ?assertEqual(1, S2#materialized_snapshot.value), + ?assertEqual([8, 7, 6, 5, 4], filter_records_into_numbers(Ops2)) + end). - Key = key, - Key1 = key1, - - VC = vectorclock:from_list([{local, 4}, {remote, 4}]), - put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 1}), - VC1 = vectorclock:from_list([{local, 2}, {remote, 2}]), - put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), - {ok, S1, Ops1} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 6}])), - ?assertEqual(1, S1#materialized_snapshot.value), - ?assertEqual([], Ops1), - - put_n_operations(DB, Key, 10), - put_n_operations(DB, Key1, 5), +get_snapshot_not_found_test() -> + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, + Key2 = key2, + + %% No snapshot in the DB + NotFound = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + ?assertEqual({error, not_found}, NotFound), + + %% Put 10 snapshots for Key and check there is no snapshot with time 0 in both DCs + put_n_snapshots(DB, Key, 10), + NotFound1 = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), + ?assertEqual({error, not_found}, NotFound1), + + %% Look for a snapshot for Key1 + S1 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + ?assertEqual({error, not_found}, S1), + + %% Put snapshots for Key2 and look for a snapshot for Key1 + put_n_snapshots(DB, Key2, 10), + S2 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), + ?assertEqual({error, not_found}, S2) + end). - {ok, S2, Ops2} = get_ops_applicable_to_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), - ?assertEqual(1, S2#materialized_snapshot.value), - ?assertEqual([8, 7, 6, 5, 4], filter_records_into_numbers(Ops2)), +get_snapshot_matching_vc_test() -> + withFreshDb(fun(DB) -> - antidote_db:close_and_destroy(AntidoteDB, "get_ops_applicable_to_snapshot_non_empty_result_test"). + Key = key, + put_n_snapshots(DB, Key, 10), -get_snapshot_not_found_test() -> - eleveldb:destroy("get_snapshot_not_found_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_found_test", leveldb), - {leveldb, DB} = AntidoteDB, + %% Get some of the snapshots inserted (matching VC) + {ok, S1} = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), + {ok, S2} = get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), + {ok, S3} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), - Key = key, - Key1 = key1, - Key2 = key2, - %% No snapshot in the DB - NotFound = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), - ?assertEqual({error, not_found}, NotFound), + ?assertEqual([{local, 1}, {remote, 1}], vectorclock_to_sorted_list(S1#materialized_snapshot.snapshot_time)), + ?assertEqual(1, S1#materialized_snapshot.value), - %% Put 10 snapshots for Key and check there is no snapshot with time 0 in both DCs - put_n_snapshots(DB, Key, 10), - NotFound1 = get_snapshot(DB, Key, vectorclock:from_list([{local, 0}, {remote, 0}])), - ?assertEqual({error, not_found}, NotFound1), + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S2#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S2#materialized_snapshot.value), - %% Look for a snapshot for Key1 - S1 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), - ?assertEqual({error, not_found}, S1), + ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S3#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S3#materialized_snapshot.value) + end). - %% Put snapshots for Key2 and look for a snapshot for Key1 - put_n_snapshots(DB, Key2, 10), - S2 = get_snapshot(DB, Key1, vectorclock:from_list([{local, 5}, {remote, 4}])), - ?assertEqual({error, not_found}, S2), - antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_found_test"). +get_snapshot_not_matching_vc_test() -> + withFreshDb(fun(DB) -> + Key = key, -get_snapshot_matching_vc_test() -> - eleveldb:destroy("get_snapshot_matching_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_matching_vc_test", leveldb), - {leveldb, DB} = AntidoteDB, + %% Add 3 snapshots + VC = vectorclock:from_list([{local, 4}, {remote, 4}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 4}), - Key = key, - put_n_snapshots(DB, Key, 10), - %% get some of the snapshots inserted (matches VC) - {ok, S1} = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 1}])), - {ok, S2} = get_snapshot(DB, Key, vectorclock:from_list([{local, 4}, {remote, 4}])), - {ok, S3} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 8}])), - ?assertEqual([{local, 1}, {remote, 1}], vectorclock_to_sorted_list(S1#materialized_snapshot.snapshot_time)), - ?assertEqual(1, S1#materialized_snapshot.value), - ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S2#materialized_snapshot.snapshot_time)), - ?assertEqual(4, S2#materialized_snapshot.value), - ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S3#materialized_snapshot.snapshot_time)), - ?assertEqual(8, S3#materialized_snapshot.value), + VC1 = vectorclock:from_list([{local, 2}, {remote, 3}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), - antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_matching_vc_test"). + VC2 = vectorclock:from_list([{local, 8}, {remote, 7}]), + put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC2, value = 8}), -get_snapshot_not_matching_vc_test() -> - eleveldb:destroy("get_snapshot_not_matching_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("get_snapshot_not_matching_vc_test", leveldb), - {leveldb, DB} = AntidoteDB, + %% Request the snapshots using a VC different than the one used to insert them + S4 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), + {ok, S5} = get_snapshot(DB, Key, vectorclock:from_list([{local, 6}, {remote, 5}])), + {ok, S6} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), - Key = key, - put_n_snapshots(DB, Key, 10), + ?assertEqual({error, not_found}, S4), - %% get snapshots with different times in their DCs - S4 = get_snapshot(DB, Key, vectorclock:from_list([{local, 1}, {remote, 0}])), - {ok, S5} = get_snapshot(DB, Key, vectorclock:from_list([{local, 5}, {remote, 4}])), - {ok, S6} = get_snapshot(DB, Key, vectorclock:from_list([{local, 8}, {remote, 9}])), - ?assertEqual({error, not_found}, S4), - ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S5#materialized_snapshot.snapshot_time)), - ?assertEqual(4, S5#materialized_snapshot.value), - ?assertEqual([{local, 8}, {remote, 8}], vectorclock_to_sorted_list(S6#materialized_snapshot.snapshot_time)), - ?assertEqual(8, S6#materialized_snapshot.value), + ?assertEqual([{local, 4}, {remote, 4}], vectorclock_to_sorted_list(S5#materialized_snapshot.snapshot_time)), + ?assertEqual(4, S5#materialized_snapshot.value), - antidote_db:close_and_destroy(AntidoteDB, "get_snapshot_not_matching_vc_test"). + ?assertEqual([{local, 8}, {remote, 7}], vectorclock_to_sorted_list(S6#materialized_snapshot.snapshot_time)), + ?assertEqual(8, S6#materialized_snapshot.value) + end). get_operations_empty_result_test() -> - eleveldb:destroy("get_operations_not_found_test", []), - {ok, AntidoteDB} = antidote_db:new("get_operations_not_found_test", leveldb), - {leveldb, DB} = AntidoteDB, - Key = key, - Key1 = key1, - %% Nothing in the DB yet return empty list - O1 = get_ops(DB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), - ?assertEqual([], O1), + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, - put_n_operations(DB, Key, 10), - %% Getting something out of range returns an empty list - O2 = get_ops(DB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), - ?assertEqual([], O2), + %% Nothing in the DB returns an empty list + O1 = get_ops(DB, Key, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([], O1), - %% Getting a key not present, returns an empty list - O3 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), - ?assertEqual([], O3), + %% Insert some operations + put_n_operations(DB, Key, 10), - %% Searching for the same range returns an empty list - O4 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), - ?assertEqual([], O4), + %% Requesting for ops in a range with noting, returns an empty list + O2 = get_ops(DB, Key, [{local, 123}, {remote, 100}], [{local, 200}, {remote, 124}]), + ?assertEqual([], O2), - antidote_db:close_and_destroy(AntidoteDB, "get_operations_not_found_test"). + %% Getting a key not present, returns an empty list + O3 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 4}, {remote, 5}]), + ?assertEqual([], O3), + %% Searching for the same range returns an empty list + O4 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 2}, {remote, 2}]), + ?assertEqual([], O4) + end). -get_operations_non_empty_test() -> - eleveldb:destroy("get_operations_non_empty_test", []), - {ok, AntidoteDB} = antidote_db:new("get_operations_non_empty_test", leveldb), - {leveldb, DB} = AntidoteDB, - %% Fill the DB with values - Key = key, - Key1 = key1, - Key2 = key2, - put_n_operations(DB, Key, 100), - put_n_operations(DB, Key1, 10), - put_n_operations(DB, Key2, 25), +get_operations_non_empty_test() -> + withFreshDb(fun(DB) -> + %% Fill the DB with values + Key = key, + Key1 = key1, + Key2 = key2, - %% concurrent operations are present in the result - O1 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), - ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), + put_n_operations(DB, Key, 100), + put_n_operations(DB, Key1, 10), + put_n_operations(DB, Key2, 25), - O2 = get_ops(DB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), - ?assertEqual([7, 6, 5, 4], filter_records_into_numbers(O2)), + %% Concurrent operations int the lower bound are present in the result + O1 = get_ops(DB, Key1, [{local, 2}, {remote, 2}], [{local, 8}, {remote, 9}]), + ?assertEqual([8, 7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), - antidote_db:close_and_destroy(AntidoteDB, "get_operations_non_empty_test"). + O2 = get_ops(DB, Key1, [{local, 4}, {remote, 5}], [{local, 7}, {remote, 7}]), + ?assertEqual([7, 6, 5, 4], filter_records_into_numbers(O2)) + end). operations_and_snapshots_mixed_test() -> - eleveldb:destroy("operations_and_snapshots_mixed_test", []), - {ok, AntidoteDB} = antidote_db:new("operations_and_snapshots_mixed_test", leveldb), - {leveldb, DB} = AntidoteDB, - - Key = key, - Key1 = key1, - Key2 = key2, - VCTo = [{local, 7}, {remote, 8}], - put_n_operations(DB, Key, 10), - put_n_operations(DB, Key1, 20), - put_snapshot(DB, Key1, #materialized_snapshot{snapshot_time = [{local, 2}, {remote, 3}], value = 5}), - put_n_operations(DB, Key2, 8), - - %% We want all ops for Key1 that are between the snapshot and - %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. - {ok, Snapshot} = get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), - ?assertEqual([{local, 2}, {remote, 3}], vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time)), - ?assertEqual(5, Snapshot#materialized_snapshot.value), - - O1 = get_ops(DB, Key1, Snapshot#materialized_snapshot.snapshot_time, VCTo), - ?assertEqual([7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)), - - antidote_db:close_and_destroy(AntidoteDB, "operations_and_snapshots_mixed_test"). + withFreshDb(fun(DB) -> + Key = key, + Key1 = key1, + Key2 = key2, + + VCTo = [{local, 7}, {remote, 8}], + put_n_operations(DB, Key, 10), + put_n_operations(DB, Key1, 20), + put_snapshot(DB, Key1, #materialized_snapshot{snapshot_time = [{local, 2}, {remote, 3}], value = 5}), + put_n_operations(DB, Key2, 8), + + %% We want all ops for Key1 that are between the snapshot and + %% [{local, 7}, {remote, 8}]. First get the snapshot, then OPS. + {ok, Snapshot} = get_snapshot(DB, Key1, vectorclock:from_list(VCTo)), + ?assertEqual([{local, 2}, {remote, 3}], vectorclock_to_sorted_list(Snapshot#materialized_snapshot.snapshot_time)), + ?assertEqual(5, Snapshot#materialized_snapshot.value), + + O1 = get_ops(DB, Key1, Snapshot#materialized_snapshot.snapshot_time, VCTo), + ?assertEqual([7, 6, 5, 4, 3, 2], filter_records_into_numbers(O1)) + end). %% This test is used to check that compare function for VCs is working OK %% with VCs containing != lengths and values length_of_vc_test() -> - eleveldb:destroy("length_of_vc_test", []), - {ok, AntidoteDB} = antidote_db:new("length_of_vc_test", leveldb), - {leveldb, DB} = AntidoteDB, - - %% Same key, and same value for the local DC - %% OP2 should be newer than op1 since it contains 1 more DC in its VC - Key = key, - put_op(DB, Key, [{local, 2}], #log_record{version = 1}), - put_op(DB, Key, [{local, 2}, {remote, 3}], #log_record{version = 2}), - O1 = filter_records_into_numbers(get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}])), - ?assertEqual([2, 1], O1), - - %% Insert OP3, with no remote DC value and check it´s newer than 1 and 2 - put_op(DB, Key, [{local, 3}], #log_record{version = 3}), - O2 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), - ?assertEqual([3, 2, 1], filter_records_into_numbers(O2)), - - %% OP3 is still returned if the local value we look for is lower - %% This is the expected outcome for vectorclock gt and lt methods - O3 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), - ?assertEqual([3, 2, 1], filter_records_into_numbers(O3)), - - %% Insert remote operation not containing local clock and check is the oldest one - put_op(DB, Key, [{remote, 1}], #log_record{version = 4}), - O4 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), - ?assertEqual([3, 2, 1, 4], filter_records_into_numbers(O4)), - - antidote_db:close_and_destroy(AntidoteDB, "length_of_vc_test"). + withFreshDb(fun(DB) -> + %% Same key, and same value for the local DC + %% OP2 should be newer than op1 since it contains 1 more DC in its VC + Key = key, + put_op(DB, Key, [{local, 2}], #log_record{version = 1}), + put_op(DB, Key, [{local, 2}, {remote, 3}], #log_record{version = 2}), + O1 = filter_records_into_numbers(get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}])), + ?assertEqual([2, 1], O1), + + %% Insert OP3, with no remote DC value and check it´s newer than 1 and 2 + put_op(DB, Key, [{local, 3}], #log_record{version = 3}), + O2 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + ?assertEqual([3, 2, 1], filter_records_into_numbers(O2)), + + %% OP3 is still returned if the local value we look for is lower + %% This is the expected outcome for vectorclock gt and lt methods + O3 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), + ?assertEqual([3, 2, 1], filter_records_into_numbers(O3)), + + %% Insert remote operation not containing local clock and check is the oldest one + put_op(DB, Key, [{remote, 1}], #log_record{version = 4}), + O4 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), + ?assertEqual([3, 2, 1, 4], filter_records_into_numbers(O4)) + end). put_n_snapshots(_DB, _Key, 0) -> ok; From 17fbcfc12fb9f929b311078dc08b4651bc643e4a Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Mon, 26 Sep 2016 19:55:16 -0300 Subject: [PATCH 20/23] Added the get_ops_applicable_to_snapshot method to the antidote_db interface --- src/antidote_db.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/antidote_db.erl b/src/antidote_db.erl index e95861d..a447e3c 100644 --- a/src/antidote_db.erl +++ b/src/antidote_db.erl @@ -26,6 +26,7 @@ new/2, close_and_destroy/2, close/1, + get_ops_applicable_to_snapshot/3, get_snapshot/3, put_snapshot/3, get_ops/4, @@ -74,6 +75,16 @@ close({Type, DB}) -> {error, type_not_supported} end. +-spec get_ops_applicable_to_snapshot(antidote_db:antidote_db(), key(), vectorclock()) -> + {ok, #materialized_snapshot{} | not_found, [#log_record{}]}. +get_ops_applicable_to_snapshot({Type, DB}, Key, VectorClock) -> + case Type of + leveldb -> + leveldb_wrapper:get_ops_applicable_to_snapshot(DB, Key, VectorClock); + _ -> + {error, type_not_supported} + end. + %% Gets the most suitable snapshot for Key that has been committed %% before CommitTime. If its nothing is found, returns {error, not_found} -spec get_snapshot(antidote_db:antidote_db(), key(), From 28b0d8b65713d37c50600253dc2566d4716ad4b4 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Tue, 27 Sep 2016 22:55:06 -0300 Subject: [PATCH 21/23] Fixed lower bound in get_ops range --- src/leveldb_wrapper.erl | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index 6330047..7418202 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -47,16 +47,16 @@ get_ops_applicable_to_snapshot(DB, Key, VectorClock) -> case Key == Key1 of %% check same key true -> %% if its greater, continue - case vectorclock:strict_ge(VC1Dict, VectorClock) of + case vectorclock:le(VC1Dict, VectorClock) of true -> - AccIn; - false -> case (OP == op) of true -> [binary_to_term(V) | AccIn]; false -> throw({break, binary_to_term(V), AccIn}) - end + end; + false -> + AccIn end; false -> throw({break, not_found, AccIn}) @@ -127,17 +127,16 @@ get_ops(DB, Key, VCFrom, VCTo) -> fun({K, V}, AccIn) -> {Key1, VC1, OP} = binary_to_term(K), VC1Dict = vectorclock:from_list(VC1), + io:format("~p : ~p ~n", [binary_to_term(K), binary_to_term(V)]), case Key == Key1 of %% check same key true -> %% if its greater, continue - case vectorclock:strict_ge(VC1Dict, VCToDict) of + case vectorclock:le(VC1Dict, VCToDict) of true -> - AccIn; - false -> %% check its an op and its commit time is in the required range case vectorclock:lt(VC1Dict, VCFromDict) of true -> - throw({break, AccIn}); + AccIn; false -> case (OP == op) of true -> @@ -145,7 +144,9 @@ get_ops(DB, Key, VCFrom, VCTo) -> false -> AccIn end - end + end; + false -> + AccIn end; false -> throw({break, AccIn}) @@ -309,7 +310,6 @@ get_snapshot_matching_vc_test() -> ?assertEqual(8, S3#materialized_snapshot.value) end). - get_snapshot_not_matching_vc_test() -> withFreshDb(fun(DB) -> Key = key, @@ -318,11 +318,9 @@ get_snapshot_not_matching_vc_test() -> VC = vectorclock:from_list([{local, 4}, {remote, 4}]), put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC, value = 4}), - VC1 = vectorclock:from_list([{local, 2}, {remote, 3}]), put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC1, value = 2}), - VC2 = vectorclock:from_list([{local, 8}, {remote, 7}]), put_snapshot(DB, Key, #materialized_snapshot{snapshot_time = VC2, value = 8}), @@ -365,7 +363,6 @@ get_operations_empty_result_test() -> ?assertEqual([], O4) end). - get_operations_non_empty_test() -> withFreshDb(fun(DB) -> %% Fill the DB with values @@ -424,10 +421,9 @@ length_of_vc_test() -> O2 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 7}, {remote, 8}]), ?assertEqual([3, 2, 1], filter_records_into_numbers(O2)), - %% OP3 is still returned if the local value we look for is lower - %% This is the expected outcome for vectorclock gt and lt methods + %% OP3 is not returned if the local value we look for is lower O3 = get_ops(DB, Key, [{local, 1}, {remote, 1}], [{local, 2}, {remote, 8}]), - ?assertEqual([3, 2, 1], filter_records_into_numbers(O3)), + ?assertEqual([2, 1], filter_records_into_numbers(O3)), %% Insert remote operation not containing local clock and check is the oldest one put_op(DB, Key, [{remote, 1}], #log_record{version = 4}), From 26f66a81ebd65edde463f05d8db3f6d8faf10489 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Wed, 28 Sep 2016 19:13:40 -0300 Subject: [PATCH 22/23] Fixed bug with concurrent ops --- src/leveldb_wrapper.erl | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index 7418202..798339f 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -28,11 +28,12 @@ get_ops/4, put_op/4]). +-record(ops_record, {ops_list :: list(), last_vc :: vectorclock()}). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. - %% Given a key and a VC, this method returns the most suitable snapshot committed before the VC %% with a list of operations in the range [SnapshotCommitTime, VC) to be applied to the mentioned %% snapshot to generate a new one. @@ -127,7 +128,6 @@ get_ops(DB, Key, VCFrom, VCTo) -> fun({K, V}, AccIn) -> {Key1, VC1, OP} = binary_to_term(K), VC1Dict = vectorclock:from_list(VC1), - io:format("~p : ~p ~n", [binary_to_term(K), binary_to_term(V)]), case Key == Key1 of %% check same key true -> %% if its greater, continue @@ -136,27 +136,34 @@ get_ops(DB, Key, VCFrom, VCTo) -> %% check its an op and its commit time is in the required range case vectorclock:lt(VC1Dict, VCFromDict) of true -> - AccIn; + %% Check if last two VCs are concurrent or we should break the fold + %% If it's concurrent, save the last vc for the next iteration + case concurrent_or_empty_vc(VC1Dict, AccIn#ops_record.last_vc) of + true -> + #ops_record{ops_list = AccIn#ops_record.ops_list, last_vc = VC1Dict}; + false -> + throw({break, AccIn#ops_record.ops_list}) + end; false -> case (OP == op) of true -> - [binary_to_term(V) | AccIn]; + #ops_record{ops_list = [binary_to_term(V) | AccIn#ops_record.ops_list], last_vc = vectorclock:new()}; false -> AccIn end end; false -> - AccIn + #ops_record{ops_list = AccIn#ops_record.ops_list, last_vc = vectorclock:new()} end; false -> - throw({break, AccIn}) + throw({break, AccIn#ops_record.ops_list}) end end, - [], + #ops_record{ops_list = [], last_vc = vectorclock:new()}, [{first_key, term_to_binary({Key})}]), %% If the fold returned without throwing a break (it iterated all %% keys and ended up normally) reverse the resulting list - lists:reverse(Res) + lists:reverse(Res#ops_record.ops_list) catch {break, OPS} -> lists:reverse(OPS); @@ -164,6 +171,11 @@ get_ops(DB, Key, VCFrom, VCTo) -> [] end. +%% Returns true if any of the DCs are empty or if the VCs are concurrent +concurrent_or_empty_vc(VC1, VC2) -> + vectorclock:eq(VC1, vectorclock:new()) or vectorclock:eq(VC2, vectorclock:new()) + or not (vectorclock:ge(VC1, VC2) or vectorclock:le(VC1, VC2)). + %% Saves the operation into AntidoteDB -spec put_op(eleveldb:db_ref(), key(), vectorclock(), #log_record{}) -> ok | error. put_op(DB, Key, VC, Record) -> @@ -431,6 +443,19 @@ length_of_vc_test() -> ?assertEqual([3, 2, 1, 4], filter_records_into_numbers(O4)) end). +%% This test inserts 5 ops, 4 of them are concurrent, and checks that only the first and two of the concurrent are +%% returned, since they are the only ones that match the requested ranged passed in +concurrent_ops_test() -> + withFreshDb(fun(DB) -> + ok = put_op(DB, d, [{dc1, 10}, {dc2, 14}, {dc3, 3}], #log_record{version = 1}), + ok = put_op(DB, d, [{dc1, 9}, {dc2, 12}, {dc3, 1}], #log_record{version = 2}), + ok = put_op(DB, d, [{dc1, 7}, {dc2, 2}, {dc3, 12}], #log_record{version = 3}), + ok = put_op(DB, d, [{dc1, 5}, {dc2, 2}, {dc3, 10}], #log_record{version = 4}), + ok = put_op(DB, d, [{dc1, 1}, {dc2, 1}, {dc3, 1}], #log_record{version = 5}), + OPS = get_ops(DB, d, [{dc1, 10}, {dc2, 17}, {dc3, 2}], [{dc1, 12}, {dc2, 20}, {dc3, 18}]), + ?assertEqual([1, 3, 4], filter_records_into_numbers(OPS)) + end). + put_n_snapshots(_DB, _Key, 0) -> ok; put_n_snapshots(DB, Key, N) -> From f965f2b34e07b4b0291d1887e81ffb95ce85e527 Mon Sep 17 00:00:00 2001 From: santialvarezcolombo Date: Fri, 30 Sep 2016 15:17:49 -0300 Subject: [PATCH 23/23] Fixed begening key to start looking in fold --- src/leveldb_wrapper.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/leveldb_wrapper.erl b/src/leveldb_wrapper.erl index 798339f..d6bfc3e 100644 --- a/src/leveldb_wrapper.erl +++ b/src/leveldb_wrapper.erl @@ -64,7 +64,7 @@ get_ops_applicable_to_snapshot(DB, Key, VectorClock) -> end end, [], - [{first_key, term_to_binary({Key})}]), + [{first_key, term_to_binary({Key, vectorclock_to_sorted_list(VectorClock)})}]), %% If the fold returned without throwing a break (it iterated all %% keys and ended up normally) reverse the resulting list {ok, not_found, lists:reverse(Res)} @@ -100,7 +100,7 @@ get_snapshot(DB, Key, CommitTime) -> end end, [], - [{first_key, term_to_binary({Key})}]), + [{first_key, term_to_binary({Key, vectorclock_to_sorted_list(CommitTime)})}]), {error, not_found} catch {break, SNAP} -> @@ -160,7 +160,7 @@ get_ops(DB, Key, VCFrom, VCTo) -> end end, #ops_record{ops_list = [], last_vc = vectorclock:new()}, - [{first_key, term_to_binary({Key})}]), + [{first_key, term_to_binary({Key, vectorclock_to_sorted_list(VCTo)})}]), %% If the fold returned without throwing a break (it iterated all %% keys and ended up normally) reverse the resulting list lists:reverse(Res#ops_record.ops_list)