Skip to content
This repository was archived by the owner on Sep 28, 2021. It is now read-only.

Commit 88d7a00

Browse files
committed
Add broadcast backend for empty heartbeat messages.
This backend uses an ETS table for storing all of the received messages, which are immutable, that are used to perform heartbeating and tree repair for the plumtree backend.
1 parent 02034af commit 88d7a00

File tree

3 files changed

+171
-1
lines changed

3 files changed

+171
-1
lines changed

config/sys.config

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[{lasp, [{storage_backend, lasp_ets_storage_backend}]},
22
{plumtree, [{broadcast_exchange_timer, 60000},
3-
{broadcast_mods, [lasp_default_broadcast_distribution_backend]}]}].
3+
{broadcast_mods, [lasp_plumtree_backend]}]}].

src/lasp_plumtree_backend.erl

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
2+
-module(lasp_plumtree_backend).
3+
-author("Christopher S. Meiklejohn <[email protected]>").
4+
-behaviour(plumtree_broadcast_handler).
5+
6+
%% plumtree_broadcast_handler callbacks
7+
-export([broadcast_data/1,
8+
merge/2,
9+
is_stale/1,
10+
graft/1,
11+
exchange/1]).
12+
13+
%% gen_server callbacks
14+
-export([init/1,
15+
handle_call/3,
16+
handle_cast/2,
17+
handle_info/2,
18+
terminate/2,
19+
code_change/3]).
20+
21+
%% State record.
22+
-record(state, {}).
23+
24+
%% Broadcast record.
25+
-record(broadcast, {timestamp}).
26+
27+
%%%===================================================================
28+
%%% plumtree_broadcast_handler callbacks
29+
%%%===================================================================
30+
31+
-type timestamp() :: non_neg_integer().
32+
33+
-type broadcast_message() :: #broadcast{}.
34+
-type broadcast_id() :: timestamp().
35+
-type broadcast_payload() :: timestamp().
36+
37+
%% @doc Returns from the broadcast message the identifier and the payload.
38+
-spec broadcast_data(broadcast_message()) ->
39+
{broadcast_id(), broadcast_payload()}.
40+
broadcast_data(#broadcast{timestamp=Timestamp}) ->
41+
{Timestamp, Timestamp}.
42+
43+
%% @doc Perform a merge of an incoming object with an object in the
44+
%% local datastore.
45+
-spec merge(broadcast_id(), broadcast_payload()) -> boolean().
46+
merge(Timestamp, Timestamp) ->
47+
case is_stale(Timestamp) of
48+
true ->
49+
false;
50+
false ->
51+
gen_server:call(?MODULE, {merge, Timestamp}, infinity),
52+
true
53+
end.
54+
55+
%% @doc Use the clock on the object to determine if this message is
56+
%% stale or not.
57+
-spec is_stale(broadcast_id()) -> boolean().
58+
is_stale(Timestamp) ->
59+
gen_server:call(?MODULE, {is_stale, Timestamp}, infinity).
60+
61+
%% @doc Given a message identifier and a clock, return a given message.
62+
-spec graft(broadcast_id()) ->
63+
stale | {ok, broadcast_payload()} | {error, term()}.
64+
graft(Timestamp) ->
65+
gen_server:call(?MODULE, {graft, Timestamp}, infinity).
66+
67+
%% @doc Anti-entropy mechanism.
68+
-spec exchange(node()) -> {ok, pid()}.
69+
exchange(_Peer) ->
70+
%% Ignore the standard anti-entropy mechanism from plumtree.
71+
%%
72+
%% Spawn a process that terminates immediately, because the
73+
%% broadcast exchange timer tracks the number of in progress
74+
%% exchanges and bounds it by that limit.
75+
%%
76+
Pid = spawn_link(fun() -> ok end),
77+
{ok, Pid}.
78+
79+
%%%===================================================================
80+
%%% gen_server callbacks
81+
%%%===================================================================
82+
83+
%% @private
84+
-spec init([]) -> {ok, #state{}}.
85+
init([]) ->
86+
%% Seed the process at initialization.
87+
rand_compat:seed(erlang:phash2([node()]),
88+
erlang:monotonic_time(),
89+
erlang:unique_integer()),
90+
91+
schedule_heartbeat(),
92+
93+
%% Open an ETS table for tracking heartbeat messages.
94+
ets:new(?MODULE, [named_table]),
95+
96+
{ok, #state{}}.
97+
98+
%% @private
99+
-spec handle_call(term(), {pid(), term()}, #state{}) ->
100+
{reply, term(), #state{}}.
101+
102+
%% @private
103+
handle_call({is_stale, Timestamp}, _From, State) ->
104+
Result = case ets:lookup(?MODULE, Timestamp) of
105+
[] ->
106+
false;
107+
_ ->
108+
true
109+
end,
110+
{reply, Result, State};
111+
handle_call({graft, Timestamp}, _From, State) ->
112+
Result = case ets:lookup(?MODULE, Timestamp) of
113+
[] ->
114+
{error, {not_found, Timestamp}};
115+
[Object] ->
116+
{ok, Object}
117+
end,
118+
{reply, Result, State};
119+
handle_call({merge, Timestamp}, _From, State) ->
120+
ok = ets:insert(?MODULE, [{Timestamp, true}]),
121+
{reply, ok, State};
122+
handle_call(Msg, _From, State) ->
123+
_ = lager:warning("Unhandled messages: ~p", [Msg]),
124+
{reply, ok, State}.
125+
126+
-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
127+
%% @private
128+
handle_cast(Msg, State) ->
129+
_ = lager:warning("Unhandled messages: ~p", [Msg]),
130+
{noreply, State}.
131+
132+
%% @private
133+
handle_info(heartbeat, State) ->
134+
%% Send message with monotonically increasing integer.
135+
Timestamp = time_compat:unique_integer([monotonic, positive]),
136+
ok = plumtree_broadcast:broadcast(Timestamp, ?MODULE),
137+
138+
%% Schedule report.
139+
schedule_heartbeat(),
140+
141+
{noreply, State};
142+
handle_info(Msg, State) ->
143+
_ = lager:warning("Unhandled messages: ~p", [Msg]),
144+
{noreply, State}.
145+
146+
%% @private
147+
-spec terminate(term(), #state{}) -> term().
148+
terminate(_Reason, _State) ->
149+
ok.
150+
151+
%% @private
152+
-spec code_change(term() | {down, term()}, #state{}, term()) ->
153+
{ok, #state{}}.
154+
code_change(_OldVsn, State, _Extra) ->
155+
{ok, State}.
156+
157+
%%%===================================================================
158+
%%% Internal functions
159+
%%%===================================================================
160+
161+
%% @private
162+
schedule_heartbeat() ->
163+
Interval = lasp_config:get(heartbeat_interval, 10000),
164+
timer:send_after(Interval, heartbeat).

src/lasp_sup.erl

+6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ init(_Args) ->
9494
permanent, 5000, worker,
9595
[lasp_default_broadcast_distribution_backend]},
9696

97+
PlumtreeBackend = {lasp_plumtree_backend,
98+
{lasp_plumtree_backend, start_link, []},
99+
permanent, 5000, worker,
100+
[lasp_plumtree_backend]},
101+
97102
Plumtree = {plumtree_sup,
98103
{plumtree_sup, start_link, []},
99104
permanent, infinity, supervisor, [plumtree_sup]},
@@ -113,6 +118,7 @@ init(_Args) ->
113118
BaseSpecs0 = [Unique,
114119
BroadcastBuffer,
115120
Partisan,
121+
PlumtreeBackend,
116122
DistributionBackend,
117123
Plumtree,
118124
Sprinter,

0 commit comments

Comments
 (0)