diff --git a/.gitignore b/.gitignore
index b89b9a2..44ac138 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,7 @@
deps
*.o
*.beam
-*.plt
\ No newline at end of file
+*.plt
+_build
+.rebar3
+*.[eh]rl~
\ No newline at end of file
diff --git a/examples/README.md b/examples/gdict/README.md
similarity index 100%
rename from examples/README.md
rename to examples/gdict/README.md
diff --git a/examples/doc/README.md b/examples/gdict/doc/README.md
similarity index 100%
rename from examples/doc/README.md
rename to examples/gdict/doc/README.md
diff --git a/examples/doc/edoc-info b/examples/gdict/doc/edoc-info
similarity index 100%
rename from examples/doc/edoc-info
rename to examples/gdict/doc/edoc-info
diff --git a/examples/doc/erlang.png b/examples/gdict/doc/erlang.png
similarity index 100%
rename from examples/doc/erlang.png
rename to examples/gdict/doc/erlang.png
diff --git a/examples/doc/gdict.md b/examples/gdict/doc/gdict.md
similarity index 100%
rename from examples/doc/gdict.md
rename to examples/gdict/doc/gdict.md
diff --git a/examples/doc/stylesheet.css b/examples/gdict/doc/stylesheet.css
similarity index 100%
rename from examples/doc/stylesheet.css
rename to examples/gdict/doc/stylesheet.css
diff --git a/examples/doc/test_cb.md b/examples/gdict/doc/test_cb.md
similarity index 100%
rename from examples/doc/test_cb.md
rename to examples/gdict/doc/test_cb.md
diff --git a/examples/rebar.config b/examples/gdict/rebar.config
similarity index 100%
rename from examples/rebar.config
rename to examples/gdict/rebar.config
diff --git a/examples/src/bench.erl b/examples/gdict/src/bench.erl
similarity index 100%
rename from examples/src/bench.erl
rename to examples/gdict/src/bench.erl
diff --git a/examples/gdict/src/gdict.app.src b/examples/gdict/src/gdict.app.src
new file mode 100644
index 0000000..a60bbe3
--- /dev/null
+++ b/examples/gdict/src/gdict.app.src
@@ -0,0 +1,13 @@
+{application, gdict,
+ [
+ {description, ""},
+ {vsn, "1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib,
+ locks
+ ]},
+ {mod, []},
+ {env, []}
+ ]}.
diff --git a/examples/src/examples.app.src b/examples/gdict/src/gdict.app.src~
similarity index 100%
rename from examples/src/examples.app.src
rename to examples/gdict/src/gdict.app.src~
diff --git a/examples/src/gdict.erl b/examples/gdict/src/gdict.erl
similarity index 61%
rename from examples/src/gdict.erl
rename to examples/gdict/src/gdict.erl
index ae11050..0b7a892 100644
--- a/examples/src/gdict.erl
+++ b/examples/gdict/src/gdict.erl
@@ -64,13 +64,13 @@ trace(F) when is_function(F, 0) ->
F().
new() ->
- locks_leader:start_link(test_cb, dict:new()).
+ locks_leader:start_link(test_cb, #{}).
new(Name) ->
- locks_leader:start_link(Name, test_cb, dict:new(), []).
+ locks_leader:start_link(Name, test_cb, #{}, []).
new_opt(Opts) ->
- locks_leader:start_link(test_cb, dict:new(), Opts).
+ locks_leader:start_link(test_cb, #{}, Opts).
-define(store(Dict,Expr,Legend),
locks_leader:leader_call(Dict, {store, fun(D) ->
@@ -84,27 +84,38 @@ new_opt(Opts) ->
%% dict functions that modify state:
append(Key, Value, Dict) ->
- ?store(Dict, dict:append(Key,Value,D), append).
+ ?store(Dict, maps_append(Key,Value,D), append).
append_list(Key, ValList, Dict) ->
- ?store(Dict, dict:append_list(Key,ValList,D), append_list).
+ ?store(Dict, maps_append_list(Key,ValList,D), append_list).
erase(Key, Dict) ->
- ?store(Dict, dict:erase(Key,D), erase).
+ ?store(Dict, maps:remove(Key,D), erase).
store(Key, Value, Dict) ->
- ?store(Dict, dict:store(Key,Value,D), store).
+ ?store(Dict, maps:put(Key,Value,D), store).
update(Key,Function,Dict) ->
- ?store(Dict, dict:update(Key,Function,D), update).
+ ?store(Dict, maps:update_with(Key,Function,D), update).
update(Key, Function, Initial, Dict) ->
- ?store(Dict, dict:update(Key,Function,Initial,D), update).
+ ?store(Dict, maps:update_with(Key,Function,Initial,D), update).
update_counter(Key, Incr, Dict) ->
- ?store(Dict, dict:update_counter(Key,Incr,D), update_counter).
+ ?store(Dict, maps_update_counter(Key,Incr,D), update_counter).
%% dict functions that do not modify state (lookup functions)
%%
-fetch(Key, Dict) -> ?lookup(Dict, dict:fetch(Key,D), fetch).
-fetch_keys(Dict) -> ?lookup(Dict, dict:fetch_keys(D), fetch_keys).
-filter(Pred, Dict) -> ?lookup(Dict, dict:filter(Pred,D), filter).
-find(Key, Dict) -> ?lookup(Dict, dict:find(Key,D), find).
-fold(Fun, Acc0, Dict) -> ?lookup(Dict, dict:fold(Fun,Acc0,D), fold).
-is_key(Key, Dict) -> ?lookup(Dict, dict:is_key(Key,D), is_key).
-map(Fun, Dict) -> ?lookup(Dict, dict:map(Fun,D), map).
-to_list(Dict) -> ?lookup(Dict, dict:to_list(D), to_list).
+fetch(Key, Dict) -> ?lookup(Dict, maps:get(Key,D), fetch).
+fetch_keys(Dict) -> ?lookup(Dict, maps:keys(D), fetch_keys).
+filter(Pred, Dict) -> ?lookup(Dict, maps:filter(Pred,D), filter).
+find(Key, Dict) -> ?lookup(Dict, maps:find(Key,D), find).
+fold(Fun, Acc0, Dict) -> ?lookup(Dict, maps:fold(Fun,Acc0,D), fold).
+is_key(Key, Dict) -> ?lookup(Dict, maps:is_key(Key,D), is_key).
+map(Fun, Dict) -> ?lookup(Dict, maps:map(Fun,D), map).
+to_list(Dict) -> ?lookup(Dict, maps:to_list(D), to_list).
+
+maps_append(Key, Value, D) ->
+ maps_append_list(Key, [Value], D).
+
+maps_append_list(Key, ValList, D) ->
+ L = maps:get(Key, D, []),
+ D#{Key => L ++ ValList}.
+
+maps_update_counter(Key, Incr, D) ->
+ Old = maps:get(Key, D, 0),
+ D#{Key => Old + Incr}.
diff --git a/examples/src/test_cb.erl b/examples/gdict/src/test_cb.erl
similarity index 99%
rename from examples/src/test_cb.erl
rename to examples/gdict/src/test_cb.erl
index 0d3fda2..7ee53f8 100644
--- a/examples/src/test_cb.erl
+++ b/examples/gdict/src/test_cb.erl
@@ -149,7 +149,7 @@ merge_dicts(D, I) ->
lists:foldl(
fun({C, {true, D2}}, Acc) ->
?event({merge_got, C, D2}),
- dict:merge(fun(_K,V1,_) -> V1 end, Acc, D2);
+ maps:merge(Acc, D2); %% our content takes priority
({C, false}, Acc) ->
?event({merge_got, C, false}),
Acc
diff --git a/include/locks.hrl b/include/locks.hrl
index a3f736c..85d05b6 100644
--- a/include/locks.hrl
+++ b/include/locks.hrl
@@ -19,7 +19,7 @@
-type agent() :: pid().
-type tid() :: any().
--type lock_id() :: {oid(), node()}.
+-type lock_id() :: {oid() | '_', node()}.
-type obj() :: {oid(), mode()}
| {oid(), mode(), where()}
@@ -27,10 +27,12 @@
-type objs() :: [obj()].
--type options() :: [{link, boolean()}
- | {client, pid()}
- | {abort_on_error, boolean()}
- | {abort_on_deadlock, boolean()}].
+-type options() :: [option()].
+-type option() :: {link, boolean()}
+ | {client, pid()}
+ | {abort_on_error, boolean()}
+ | {await_nodes, boolean()}
+ | {abort_on_deadlock, boolean()}.
-type deadlocks() :: [lock_id()].
@@ -60,7 +62,7 @@
version = 1 :: integer() | '_',
pid = self() :: pid() | '_',
queue = [] :: [#r{} | #w{}] | '_',
- watchers = [] :: [pid()]
+ watchers = [] :: [pid()] | '_'
}).
-record(locks_info, {
diff --git a/rebar.config b/rebar.config
index 9137aa0..5f66135 100644
--- a/rebar.config
+++ b/rebar.config
@@ -25,8 +25,7 @@
{app_default, "http://www.erlang.org/doc/man"},
{doc_path, []},
{top_level_readme,
- {"./README.md","https://github.com/uwiger/locks"}}]}]}
-
+ {"./README.md","https://github.com/uwiger/locks"}}]}
+ ]},
+ {test, [{project_app_dirs, [".", "examples/gdict"]}]}
]}.
-
-{ct_opts, [{dir, ["test", "examples/src"]}]}.
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..d283bc6
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,8 @@
+%% -*- mode:erlang; erlang-indent-level:4; indent-tabs-mode:nil -*-
+case os:getenv("LOCKS_DEBUG") of
+ false ->
+ CONFIG;
+ S when S =/= "false" ->
+ io:fwrite("Enabling runtime debugging~n", []),
+ [{erl_opts, [{d,'LOCKS_DEBUG'}]} | CONFIG]
+end.
diff --git a/rebar.lock b/rebar.lock
new file mode 100644
index 0000000..cbc55d9
--- /dev/null
+++ b/rebar.lock
@@ -0,0 +1,6 @@
+{"1.1.0",
+[{<<"plain_fsm">>,{pkg,<<"plain_fsm">>,<<"1.4.1">>},0}]}.
+[
+{pkg_hash,[
+ {<<"plain_fsm">>, <<"47E9BF6AC9322FC7586FB6DF8DE7198391E93764571C75165F2C45B27ACDE1D0">>}]}
+].
diff --git a/src/locks.erl b/src/locks.erl
index 212935d..b2275ae 100644
--- a/src/locks.erl
+++ b/src/locks.erl
@@ -39,6 +39,7 @@
lock_nowait/4,
lock_objects/2, %% (Agent, Objects)
await_all_locks/1, %% (Agent)
+ transaction_status/1, %% (Agent)
watch/2, %% (OID, Nodes)
unwatch/2, %% (OID, Nodes)
watchers/1, %% (OID)
@@ -191,6 +192,24 @@ lock_objects(Agent, Objects) ->
await_all_locks(Agent) ->
locks_agent:await_all_locks(Agent).
+%% @doc Inquire about the current status of the transaction.
+%% Return values:
+%%
+%% - `no_locks'
+%% - No locks have been requested
+%% - `{have_all_locks, Deadlocks}'
+%% - All requested locks have been claimed, `Deadlocks' indicates whether
+%% any deadlocks were resolved in the process.
+%% - `waiting'
+%% - Still waiting for some locks.
+%% - `{cannot_serve, Objs}'
+%% - Some lock requests cannot be served, e.g. because some nodes are
+%% unavailable.
+%%
+%% @end
+transaction_status(Agent) ->
+ locks_agent:transaction_status(Agent).
+
-spec watch(oid(), [node()]) -> ok.
%% @doc Subscribe to lock state changes.
%%
diff --git a/src/locks_agent.erl b/src/locks_agent.erl
index 3bb8e4a..0f1fc5b 100755
--- a/src/locks_agent.erl
+++ b/src/locks_agent.erl
@@ -1,7 +1,7 @@
%% -*- mode: erlang; indent-tabs-mode: nil; -*-
%%---- BEGIN COPYRIGHT -------------------------------------------------------
%%
-%% Copyright (C) 2013 Ulf Wiger. All rights reserved.
+%% Copyright (C) 2013-20 Ulf Wiger. All rights reserved.
%%
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -20,20 +20,8 @@
%% The agent runs as a separate process from the client, and monitors the
%% client, terminating immediately if it dies.
%% @end
+
-module(locks_agent).
-%% -*- mode: erlang; indent-tabs-mode: nil; -*-
-%%---- BEGIN COPYRIGHT -------------------------------------------------------
-%%
-%% Copyright (C) 2013 Ulf Wiger. All rights reserved.
-%%
-%% This Source Code Form is subject to the terms of the Mozilla Public
-%% License, v. 2.0. If a copy of the MPL was not distributed with this
-%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
-%%
-%%---- END COPYRIGHT ---------------------------------------------------------
-%% Key contributor: Thomas Arts
-%%
-%%=============================================================================
-behaviour(gen_server).
-compile({parse_transform, locks_watcher}).
@@ -70,16 +58,7 @@
-import(lists,[foreach/2,any/2,map/2,member/2]).
--include("locks.hrl").
-
--define(costly_event(E),
- case erlang:trace_info({?MODULE,event,3}, traced) of
- {_, false} -> ok;
- _ -> event(?LINE, E, none)
- end).
-
--define(event(E), event(?LINE, E, none)).
--define(event(E, S), event(?LINE, E, S)).
+-include("locks_agent.hrl").
-ifdef(DEBUG).
-define(dbg(Fmt, Args), io:fwrite(user, Fmt, Args)).
@@ -87,48 +66,6 @@
-define(dbg(F, A), ok).
-endif.
--type option() :: {client, pid()}
- | {abort_on_deadlock, boolean()}
- | {await_nodes, boolean()}
- | {notify, boolean()}.
-
--type transaction_status() :: no_locks
- | {have_all_locks, list()}
- | waiting
- | {cannot_serve, list()}.
-
--record(req, {object,
- mode,
- nodes,
- claim_no = 0,
- require = all}).
-
--type monitored_nodes() :: [{node(), reference()}].
--type down_nodes() :: [node()].
-
--record(state, {
- locks :: ets:tab(),
- agents :: ets:tab(),
- interesting = [] :: [lock_id()],
- claim_no = 0 :: integer(),
- requests :: ets:tab(),
- down = [] :: down_nodes(),
- monitored = [] :: monitored_nodes(),
- await_nodes = false :: boolean(),
- monitor_nodes = false :: boolean(),
- pending :: ets:tab(),
- sync = [] :: [#lock{}],
- client :: pid(),
- client_mref :: reference(),
- options = [] :: [option()],
- notify = [] :: [pid()],
- awaiting_all = []:: [{pid(), reference()}],
- answer :: locking | waiting | done,
- deadlocks = [] :: deadlocks(),
- have_all = false :: boolean(),
- status = no_locks :: transaction_status()
- }).
-
-define(myclient,State#state.client).
record_fields(state ) -> record_info(fields, state);
@@ -243,10 +180,10 @@ agent_init(Wait, Client, Options) ->
{reason, Error},
{trace, erlang:get_stacktrace()}]),
error(Error)
- end;
- Other ->
- ack(Wait, Client, {error, Other}),
- error(Other)
+ end
+ %% Other ->
+ %% ack(Wait, Client, {error, Other}),
+ %% error(Other)
end.
await_pid(false, Pid) ->
@@ -269,11 +206,21 @@ ack(true, Pid, Reply) ->
loop(#state{status = OldStatus} = St) ->
receive Msg ->
St1 = handle_msg(Msg, St),
- loop(maybe_status_change(OldStatus, St1))
+ St2 = next_msg(St1),
+ loop(maybe_status_change(OldStatus, St2))
+ end.
+
+next_msg(St) ->
+ receive
+ Msg ->
+ St1 = handle_msg(Msg, St),
+ next_msg(St1)
+ after 0 ->
+ maybe_check_if_done(St)
end.
handle_msg({'$gen_call', From, Req}, St) ->
- case handle_call(Req, From, St) of
+ case handle_call(Req, From, maybe_check_if_done(St)) of
{reply, Reply, St1} ->
gen_server:reply(From, Reply),
St1;
@@ -281,9 +228,9 @@ handle_msg({'$gen_call', From, Req}, St) ->
St1;
{stop, Reason, Reply, _} ->
gen_server:reply(From, Reply),
- exit(Reason);
- {stop, Reason, _} ->
exit(Reason)
+ %% {stop, Reason, _} ->
+ %% exit(Reason)
end;
handle_msg({'$gen_cast', Msg}, St) ->
case handle_cast(Msg, St) of
@@ -566,7 +513,7 @@ handle_cast({surrender, O, ToAgent, Nodes} = _Req, S) ->
fun(N) ->
#lock{queue = Q} = L = get_lock({O,N}, S),
lists:member(self(), lock_holders(L))
- andalso in_tail(ToAgent, tl(Q))
+ andalso locks_agent_lib:in_tail(ToAgent, tl(Q))
end, Nodes) of
true ->
NewS = lists:foldl(
@@ -611,57 +558,8 @@ list_store(E, L) ->
[E|L]
end.
-matching_request(Object, Mode, Nodes, Require,
- #state{requests = Requests,
- pending = Pending} = S) ->
- case any_matching_request_(ets_lookup(Pending, Object), Pending,
- Object, Mode, Nodes, Require, S) of
- {false, S1} ->
- any_matching_request_(ets_lookup(Requests, Object), Requests,
- Object, Mode, Nodes, Require, S1);
- True -> True
- end.
-
-any_matching_request_([R|Rs], Tab, Object, Mode, Nodes, Require, S) ->
- case matching_request_(R, Tab, Object, Mode, Nodes, Require, S) of
- {false, S1} ->
- any_matching_request_(Rs, Tab, Object, Mode, Nodes, Require, S1);
- True -> True
- end;
-any_matching_request_([], _, _, _, _, _, S) ->
- {false, S}.
-
-matching_request_(Req, Tab, Object, Mode, Nodes, Require, S) ->
- case Req of
- #req{nodes = Nodes1, require = Require, mode = Mode} ->
- ?event({repeated_request, Object}),
- %% Repeated request
- case Nodes -- Nodes1 of
- [] ->
- {true, S};
- [_|_] = New ->
- {true, add_nodes(New, Req, Tab, S)}
- end;
- #req{nodes = Nodes, require = Require, mode = write}
- when Mode==read ->
- ?event({found_old_write_request, Object}),
- %% The old request is sufficient
- {true, S};
- #req{nodes = Nodes, require = Require, mode = read} when Mode==write ->
- ?event({need_upgrade, Object}),
- {false, remove_locks(Object, S)};
- #req{nodes = PrevNodes} ->
- %% Different conditions from last time
- Reason = {conflicting_request,
- [Object, Nodes, PrevNodes]},
- error(Reason)
- end.
-
-remove_locks(Object, #state{locks = Locks, agents = As,
- interesting = I} = S) ->
- ets:match_delete(Locks, #lock{object = {Object,'_'}, _ = '_'}),
- ets:match_delete(As, {{self(),{Object,'_'}}}),
- S#state{interesting = prune_interesting(I, object, Object)}.
+matching_request(Object, Mode, Nodes, Require, S) ->
+ locks_agent_lib:matching_request(Object, Mode, Nodes, Require, S).
new_request(Object, Mode, Nodes, Require, #state{pending = Pending,
claim_no = Cl} = State) ->
@@ -678,41 +576,23 @@ new_request(Object, Mode, Nodes, Require, #state{pending = Pending,
OID, Mode, ensure_monitor(Node, Sx))
end, State, Nodes).
-
-
-add_nodes(Nodes, #req{object = Object, mode = Mode, nodes = OldNodes} = Req,
- Tab, #state{pending = Pending} = State) ->
- AllNodes = union(Nodes, OldNodes),
- Req1 = Req#req{nodes = AllNodes},
- %% replace request
- ets_delete_object(Tab, Req),
- ets_insert(Pending, Req1),
- lists:foldl(
- fun(Node, Sx) ->
- OID = {Object, Node},
- request_lock(
- OID, Mode, ensure_monitor(Node, Sx))
- end, State, Nodes).
-
-union(A, B) ->
- A ++ (B -- A).
-
%% @private
handle_info(#locks_info{lock = Lock0, where = Node, note = Note} = I, S0) ->
?event({handle_info, I}, S0),
LockID = {Lock0#lock.object, Node},
+ V = Lock0#lock.version,
Lock = Lock0#lock{object = LockID},
Prev = ets_lookup(S0#state.locks, LockID),
- State = check_note(Note, LockID, S0),
- case outdated(Prev, Lock) orelse waiting_for_ack(State, LockID) of
+ State = gc_sync(LockID, V, check_note(Note, LockID, S0)),
+ case outdated(Prev, Lock) of
true ->
- ?event({outdated_or_waiting_for_ack, [LockID]}),
+ ?event({outdated, [LockID]}),
{noreply,State};
false ->
NewState = i_add_lock(State, Lock, Prev),
{noreply, handle_locks(check_if_done(NewState, [I]))}
end;
-handle_info({surrendered, A, OID} = _Msg, State) ->
+handle_info({surrendered, A, OID, _V} = _Msg, State) ->
?event(_Msg),
{noreply, note_deadlock(A, OID, State)};
handle_info({'DOWN', Ref, _, _, _Rsn}, #state{client_mref = Ref} = State) ->
@@ -796,22 +676,8 @@ maybe_status_change(OldStatus, #state{status = Status} = S) ->
?event({status_change, OldStatus, Status}),
notify(Status, S).
-requests_for_obj_can_be_served(Obj, #state{pending = Pending} = S) ->
- lists:all(
- fun(R) ->
- request_can_be_served(R, S)
- end, ets:lookup(Pending, Obj)).
-
-request_can_be_served(_, #state{await_nodes = true}) ->
- true;
-request_can_be_served(#req{nodes = Ns, require = R}, #state{down = Down}) ->
- case R of
- all -> intersection(Down, Ns) == [];
- any -> Ns -- Down =/= [];
- majority -> length(Ns -- Down) > (length(Ns) div 2);
- majority_alive -> true;
- all_alive -> true
- end.
+request_can_be_served(R, S) ->
+ locks_agent_lib:request_can_be_served(R, S).
handle_nodedown(Node, #state{down = Down, requests = Reqs,
pending = Pending,
@@ -870,11 +736,8 @@ handle_monitor_on_down(Node, #state{monitor_nodes = true,
end,
ok.
-
-prune_interesting(I, node, Node) ->
- [OID || {_, N} = OID <- I, N =/= Node];
-prune_interesting(I, object, Object) ->
- [OID || {O, _} = OID <- I, O =/= Object].
+prune_interesting(I, Type, Node) ->
+ locks_agent_lib:prune_interesting(I, Type, Node).
watch_node(N) ->
{M, F, A} =
@@ -882,15 +745,18 @@ watch_node(N) ->
P = spawn(N, M, F, A),
erlang:monitor(process, P).
-check_note([], _, State) ->
+check_note({surrender, A, V}, LockID, State) when A == self() ->
+ ?event({surrender_ack, LockID, V}),
State;
-check_note({surrender, A}, LockID, State) when A == self() ->
- ?event({surrender_ack, LockID}),
- Sync = [L || #lock{object = Ox} = L <- State#state.sync,
- Ox =/= LockID],
- State#state{sync = Sync};
-check_note({surrender, A}, LockID, State) ->
- note_deadlock(A, LockID, State).
+check_note({surrender, A, _V}, LockID, State) ->
+ note_deadlock(A, LockID, State);
+check_note(_, _, State) ->
+ State.
+
+gc_sync(LockID, V, #state{sync = Sync} = State) ->
+ Sync1 = [L || #lock{object = Ox, version = Vx} = L <- Sync,
+ Ox =/= LockID, Vx =< V],
+ State#state{sync = Sync1}.
note_deadlock(A, LockID, #state{deadlocks = Deadlocks} = State) ->
Entry = {A, LockID},
@@ -901,39 +767,35 @@ note_deadlock(A, LockID, #state{deadlocks = Deadlocks} = State) ->
State
end.
-intersection(A, B) ->
- A -- (A -- B).
+ensure_monitor(Node, S) ->
+ locks_agent_lib:ensure_monitor(Node, S).
-ensure_monitor(Node, S) when Node == node() ->
- S;
-ensure_monitor(Node, #state{monitored = Mon} = S) ->
- Mon1 = ensure_monitor_(?LOCKER, Node, Mon),
- S#state{monitored = Mon1}.
+request_lock(LockId, Mode, S) ->
+ locks_agent_lib:request_lock(LockId, Mode, S).
-ensure_monitor_(Locker, Node, Mon) ->
- case orddict:is_key(Node, Mon) of
- true ->
- Mon;
- false ->
- Ref = erlang:monitor(process, {Locker, Node}),
- orddict:store(Node, Ref, Mon)
- end.
+request_surrender({OID, Node} = _LockID, Vsn, #state{}) ->
+ ?event({request_surrender, _LockID, Vsn}),
+ locks_server:surrender(OID, Node, Vsn).
-request_lock({OID, Node} = _LockID, Mode, #state{client = Client} = State) ->
- ?event({request_lock, _LockID}),
- P = {?LOCKER, Node},
- erlang:monitor(process, P),
- locks_server:lock(OID, [Node], Client, Mode),
- State.
+check_if_done(#state{check = {true,_}} = S) ->
+ S;
+check_if_done(#state{check = false} = S) ->
+ S#state{check = {true, []}}.
+
+check_if_done(#state{check = {true, Msgs}} = S, Msgs1) ->
+ S#state{check = {true, add_msgs(Msgs1, Msgs)}};
+check_if_done(#state{check = false} = S, Msgs) ->
+ S#state{check = {true, Msgs}}.
-request_surrender({OID, Node} = _LockID, #state{}) ->
- ?event({request_surrender, _LockID}),
- locks_server:surrender(OID, Node).
+add_msgs(Msgs, OldMsgs) ->
+ locks_agent_lib:add_msgs(Msgs, OldMsgs).
-check_if_done(S) ->
- check_if_done(S, []).
+maybe_check_if_done(#state{check = {true, Msgs}} = S) ->
+ maybe_handle_locks(do_check_if_done(S#state{check = false}, Msgs));
+maybe_check_if_done(S) ->
+ S.
-check_if_done(#state{} = State, Msgs) ->
+do_check_if_done(#state{} = State, Msgs) ->
Status = all_locks_status(State),
notify_msgs(Msgs, set_status(Status, State)).
@@ -948,8 +810,8 @@ notify_have_all(#state{awaiting_all = Aw, status = Status} = S) ->
[reply_await_(W, Status) || W <- Aw],
S#state{awaiting_all = []}.
-reply_await_({Pid, notify}, Status) ->
- notify_(Pid, Status);
+%% reply_await_({Pid, notify}, Status) ->
+%% notify_(Pid, Status);
reply_await_({Pid, async}, Status) ->
notify_(Pid, Status);
reply_await_(From, Status) ->
@@ -972,14 +834,21 @@ notify(Msg, #state{notify = Notify} = State) ->
notify_(P, Msg) ->
P ! {?MODULE, self(), Msg}.
+handle_locks(S) ->
+ S#state{handle_locks = true}.
+
+maybe_handle_locks(#state{handle_locks = true} = S) ->
+ do_handle_locks(S#state{handle_locks = false});
+maybe_handle_locks(S) ->
+ S.
-handle_locks(#state{have_all = true} = State) ->
+do_handle_locks(#state{have_all = true} = State) ->
%% If we have all locks we've asked for, no need to search for potential
%% deadlocks - reasonably, we cannot be involved in one.
State;
-handle_locks(#state{agents = As} = State) ->
+do_handle_locks(#state{agents = As} = State) ->
InvolvedAgents = involved_agents(As),
- case analyse(InvolvedAgents, State) of
+ case analyse(State) of
ok ->
%% possible indirect deadlocks
%% inefficient computation, optimizes easily
@@ -1006,52 +875,36 @@ handle_locks(#state{agents = As} = State) ->
do_surrender(ShouldSurrender, ToObject, InvolvedAgents, State)
end.
-do_surrender(ShouldSurrender, ToObject, InvolvedAgents,
- #state{deadlocks = Deadlocks} = State) ->
+do_surrender(ShouldSurrender, ToObject, InvolvedAgents, State) ->
OldLock = get_lock(ToObject, State),
State1 = delete_lock(OldLock, State),
- NewDeadlocks = [{ShouldSurrender, ToObject} | Deadlocks],
+ %% NewDeadlocks = [{ShouldSurrender, ToObject} | Deadlocks],
?costly_event({do_surrender,
[{should_surrender, ShouldSurrender},
{to_object, ToObject},
{involved_agents, lists:sort(InvolvedAgents)}]}),
+ State2 = note_deadlock(ShouldSurrender, ToObject, State1),
if ShouldSurrender == self() ->
- request_surrender(ToObject, State1),
+ request_surrender(ToObject, OldLock#lock.version, State1),
send_surrender_info(InvolvedAgents, OldLock),
Sync1 = [OldLock | State1#state.sync],
set_status(
waiting,
- State1#state{sync = Sync1,
- deadlocks = NewDeadlocks});
+ State2#state{sync = Sync1});
true ->
- State1#state{deadlocks = NewDeadlocks}
+ State2
end.
-
-send_indirects(#state{interesting = I, agents = As} = State) ->
- InvolvedAgents = involved_agents(As),
- Locks = [get_lock(OID, State) || OID <- I],
- [ send_lockinfo(Agent, L)
- || Agent <- compute_indirects(InvolvedAgents),
- #lock{queue = [_,_|_]} = L <- Locks,
- interesting(State, L, Agent) ].
-
+send_indirects(S) ->
+ locks_agent_lib:send_indirects(S).
involved_agents(As) ->
- involved_agents(ets:first(As), As).
-
-involved_agents({A,_}, As) ->
- [A | involved_agents(ets_next(As, {A,[]}), As)];
-involved_agents('$end_of_table', _) ->
- [].
+ locks_agent_lib:involved_agents(As).
-send_surrender_info(Agents, #lock{object = OID, queue = Q}) ->
- [ A ! {surrendered, self(), OID}
+send_surrender_info(Agents, #lock{object = OID, version = V, queue = Q}) ->
+ [ A ! {surrendered, self(), OID, V}
|| A <- Agents -- [E#entry.agent || E <- flatten_queue(Q)]].
-send_lockinfo(Agent, #lock{object = {OID, Node}} = L) ->
- Agent ! #locks_info{lock = L#lock{object = OID}, where = Node}.
-
object_claimed({Obj,_}, #state{claim_no = Cl, requests = Rs}) ->
Requests = ets:lookup(Rs, Obj),
any(fun(#req{claim_no = Cl1}) ->
@@ -1068,58 +921,8 @@ code_change(_FromVsn, State, _Extra) ->
%%%%%%%%%%%%%%%%%%% data manipulation %%%%%%%%%%%%%%%%%%%
--spec all_locks_status(#state{}) ->
- no_locks
- | waiting
- | {have_all_locks, deadlocks()}
- | {cannot_serve, list()}.
-%%
-all_locks_status(#state{pending = Pend, locks = Locks} = State) ->
- Status = all_locks_status_(State),
- ?costly_event({locks_diagnostics,
- [{pending, pp_pend(ets:tab2list(Pend))},
- {locks, pp_locks(ets:tab2list(Locks))}]}),
- ?event({all_locks_status, Status}),
- Status.
-
-pp_pend(Pend) ->
- [{O, M, Ns, Req} || #req{object = O, mode = M, nodes = Ns,
- require = Req} <- Pend].
-
-pp_locks(Locks) ->
- [{O,lock_holder(Q)} ||
- #lock{object = O, queue = Q} <- Locks].
-
-lock_holder([#w{entries = Es}|_]) ->
- [A || #entry{agent = A} <- Es];
-lock_holder([#r{entries = Es}|_]) ->
- [A || #entry{agent = A} <- Es].
-
-
-all_locks_status_(#state{locks = Locks, pending = Pend} = State) ->
- case ets_info(Locks, size) of
- 0 ->
- case ets_info(Pend, size) of
- 0 -> no_locks;
- _ ->
- waiting
- end;
- _ ->
- case waitingfor(State) of
- [] ->
- {have_all_locks, State#state.deadlocks};
- WF ->
- case [O || {O, _} <- WF,
- requests_for_obj_can_be_served(
- O, State) =:= false] of
- [] ->
- waiting;
- Os ->
- {cannot_serve, Os}
- end
- end
- end.
-
+all_locks_status(S) ->
+ locks_agent_lib:all_locks_status(S).
%% a lock is outdated if the version number is too old,
%% i.e. if one of the already received locks is newer
@@ -1129,69 +932,10 @@ outdated([], _) -> false;
outdated([#lock{version = V1}], #lock{version = V2}) ->
V1 >= V2.
--spec waiting_for_ack(#state{}, {_,_}) -> boolean().
-waiting_for_ack(#state{sync = Sync}, LockID) ->
- lists:member(LockID, Sync).
-
--spec waitingfor(#state{}) -> [lock_id()].
-waitingfor(#state{requests = Reqs,
- pending = Pending, down = Down} = S) ->
- %% HaveLocks = [{L#lock.object, l_mode(Q)} || #lock{queue = Q} = L <- Locks,
- %% in(self(), hd(Q))],
- {Served, PendingOIDs} =
- ets:foldl(
- fun(#req{object = OID, mode = M,
- require = majority, nodes = Ns} = R,
- {SAcc, PAcc}) ->
- NodesLocked = nodes_locked(OID, M, S),
- case length(NodesLocked) > (length(Ns) div 2) of
- false ->
- {SAcc, ordsets:add_element(OID, PAcc)};
- true ->
- {[R|SAcc], PAcc}
- end;
- (#req{object = OID, mode = M,
- require = majority_alive, nodes = Ns} = R,
- {SAcc, PAcc}) ->
- Alive = Ns -- Down,
- NodesLocked = nodes_locked(OID, M, S),
- case length(NodesLocked) > (length(Alive) div 2) of
- false ->
- {SAcc, ordsets:add_element(OID, PAcc)};
- true ->
- {[R|SAcc], PAcc}
- end;
- (#req{object = OID, mode = M,
- require = any, nodes = Ns} = R, {SAcc, PAcc}) ->
- case [N || N <- nodes_locked(OID, M, S),
- member(N, Ns)] of
- [_|_] -> {[R|SAcc], PAcc};
- [] ->
- {SAcc, ordsets:add_element(OID, PAcc)}
- end;
- (#req{object = OID, mode = M,
- require = all, nodes = Ns} = R, {SAcc, PAcc}) ->
- NodesLocked = nodes_locked(OID, M, S),
- case Ns -- NodesLocked of
- [] -> {[R|SAcc], PAcc};
- [_|_] ->
- {SAcc, ordsets:add_element(OID, PAcc)}
- end;
- (#req{object = OID, mode = M,
- require = all_alive, nodes = Ns} = R, {SAcc, PAcc}) ->
- Alive = Ns -- Down,
- NodesLocked = nodes_locked(OID, M, S),
- case Alive -- NodesLocked of
- [] -> {[R|SAcc], PAcc};
- [_|_] ->
- {SAcc, ordsets:add_element(OID, PAcc)}
- end;
- (_, Acc) ->
- Acc
- end, {[], ordsets:new()}, Pending),
- ?event([{served, Served}, {pending_oids, PendingOIDs}]),
- move_requests(Served, Pending, Reqs),
- PendingOIDs.
+%% -spec waiting_for_ack(#state{}, {_,_}, integer()) -> boolean().
+%% waiting_for_ack(#state{sync = Sync}, {OID, Node}, V) ->
+%% [] =/= [L || #lock{object = OIDx, where = Nx, version = Vx} = L <- Sync,
+%% OID =:= OIDx, Nx =:= Node, Vx >= V].
requests_with_node(Node, R) ->
ets:foldl(
@@ -1202,28 +946,8 @@ requests_with_node(Node, R) ->
end
end, [], R).
-move_requests([R|Rs], From, To) ->
- ets_delete_object(From, R),
- ets_insert(To, R),
- move_requests(Rs, From, To);
-move_requests([], _, _) ->
- ok.
-
-
-have_locks(Obj, #state{agents = As}) ->
- %% We need to use {element,1,{element,2,{element,1,'$_'}}} in the body,
- %% since Obj may not be a legal output pattern (e.g. [a, {x,1}]).
- ets_select(As, [{ {{self(),{Obj,'$1'}}}, [],
- [{{{element,1,
- {element,2,
- {element,1,'$_'}}},'$1'}}] }]).
-
-l_mode(#lock{queue = [#r{}|_]}) -> read;
-l_mode(#lock{queue = [#w{}|_]}) -> write.
-
-l_covers(read, write) -> true;
-l_covers(M , M ) -> true;
-l_covers(_ , _ ) -> false.
+move_requests(Rs, From, To) ->
+ locks_agent_lib:move_requests(Rs, From, To).
i_add_lock(#state{locks = Ls, sync = Sync} = State,
#lock{object = Obj} = Lock, Prev) ->
@@ -1233,16 +957,8 @@ i_add_lock(#state{locks = Ls, sync = Sync} = State,
Prev, Lock,
State#state{sync = lists:keydelete(Obj, #lock.object, Sync)}).
-store_lock_holders(Prev, #lock{object = Obj} = Lock,
- #state{agents = As}) ->
- PrevLockHolders = case Prev of
- [PrevLock] -> lock_holders(PrevLock);
- [] -> []
- end,
- LockHolders = lock_holders(Lock),
- [ets_delete(As, {A,Obj}) || A <- PrevLockHolders -- LockHolders],
- [ets_insert(As, {{A,Obj}}) || A <- LockHolders -- PrevLockHolders],
- ok.
+store_lock_holders(Prev, Lock, State) ->
+ locks_agent_lib:store_lock_holders(Prev, Lock, State).
delete_lock(#lock{object = OID, queue = Q} = L,
#state{locks = Ls, agents = As, interesting = I} = S) ->
@@ -1261,22 +977,10 @@ ets_new(T, Opts) -> ets:new(T, Opts).
ets_insert(T, Obj) -> ets:insert(T, Obj).
ets_lookup(T, K) -> ets:lookup(T, K).
ets_delete(T, K) -> ets:delete(T, K).
-ets_delete_object(T, O) -> ets:delete_object(T, O).
ets_match_delete(T, P) -> ets:match_delete(T, P).
-ets_select(T, P) -> ets:select(T, P).
-ets_next(T, K) -> ets:next(T, K).
-ets_info(T, I) -> ets:info(T, I).
-%% ets_select(T, P, L) -> ets:select(T, P, L).
-
-
-lock_holders(#lock{queue = [#r{entries = Es}|_]}) ->
- [A || #entry{agent = A} <- Es];
-lock_holders(#lock{queue = [#w{entries = [#entry{agent = A}]}|_]}) ->
- %% exclusive lock held
- [A];
-lock_holders(#lock{queue = [#w{entries = [_,_|_]}|_]}) ->
- %% contention for write lock; no-one actually holds the lock
- [].
+
+lock_holders(Lock) ->
+ locks_agent_lib:lock_holders(Lock).
log_interesting(Prev, #lock{object = OID, queue = Q},
#state{interesting = I} = S) ->
@@ -1297,41 +1001,6 @@ log_interesting(Prev, #lock{object = OID, queue = Q},
end
end.
-nodes_locked(OID, M, #state{} = S) ->
- [N || {_, N} = Obj <- have_locks(OID, S),
- l_covers(M, l_mode( get_lock(Obj,S) ))].
-
--spec compute_indirects([agent()]) -> [agent()].
-compute_indirects(InvolvedAgents) ->
- [ A || A<-InvolvedAgents, A>self()].
-%% here we impose a global
-%% ordering on pids !!
-%% Alternatively, we send to
-%% all pids
-
-%% -spec has_a_lock([#lock{}], pid()) -> boolean().
-%% has_a_lock(Locks, Agent) ->
-%% is_member(Agent, [hd(L#lock.queue) || L<-Locks]).
-has_a_lock(As, Agent) ->
- case ets_next(As, {Agent,0}) of % Lock IDs are {LockName,Node} tuples
- {Agent,_} -> true;
- _ -> false
- end.
-
-%% is this lock interesting for the agent?
-%%
--spec interesting(#state{}, #lock{}, agent()) -> boolean().
-interesting(#state{agents = As}, #lock{queue = Q}, Agent) ->
- (not is_member(Agent, Q)) andalso
- has_a_lock(As, Agent).
-
-is_member(A, [#r{entries = Es}|T]) ->
- lists:keymember(A, #entry.agent, Es) orelse is_member(A, T);
-is_member(A, [#w{entries = Es}|T]) ->
- lists:keymember(A, #entry.agent, Es) orelse is_member(A, T);
-is_member(_, []) ->
- false.
-
flatten_queue(Q) ->
flatten_queue(Q, []).
@@ -1347,74 +1016,15 @@ flatten_queue([], Acc) ->
%% uniq([_] = L) -> L;
%% uniq(L) -> ordsets:from_list(L).
+analyse(#state{interesting = I, locks = Ls}) ->
+ Locks = get_locks(I, Ls),
+ locks_agent_lib:analyse(Locks).
+
get_locks([H|T], Ls) ->
[L] = ets_lookup(Ls, H),
[L | get_locks(T, Ls)];
get_locks([], _) ->
[].
-get_lock(OID, #state{locks = Ls}) ->
- [L] = ets_lookup(Ls, OID),
- L.
-
-%% analyse computes whether a local deadlock can be detected,
-%% if not, 'ok' is returned, otherwise it returns the triple
-%% {deadlock,ToSurrender,ToObject} stating which agent should
-%% surrender which object.
-%%
-analyse(_Agents, #state{interesting = I, locks = Ls}) ->
- Locks = get_locks(I, Ls),
- Nodes =
- expand_agents([ {hd(L#lock.queue), L#lock.object} ||
- L <- Locks]),
- Connect =
- fun({A1, O1}, {A2, _}) when A1 =/= A2 ->
- lists:any(
- fun(#lock{object = Obj, queue = Queue}) ->
- (O1=:=Obj)
- andalso in(A1, hd(Queue))
- andalso in_tail(A2, tl(Queue))
- end, Locks);
- (_, _) ->
- false
- end,
- ?event({connect, Connect}),
- case locks_cycles:components(Nodes, Connect) of
- [] ->
- ok;
- [Comp|_] ->
- {ToSurrender, ToObject} = max_agent(Comp),
- {deadlock, ToSurrender, ToObject}
- end.
-
-expand_agents([{#w{entries = Es}, Id} | T]) ->
- expand_agents_(Es, Id, T);
-expand_agents([{#r{entries = Es}, Id} | T]) ->
- expand_agents_(Es, Id, T);
-expand_agents([]) ->
- [].
-
-expand_agents_([#entry{agent = A}|T], Id, Tail) ->
- [{A, Id}|expand_agents_(T, Id, Tail)];
-expand_agents_([], _, Tail) ->
- expand_agents(Tail). % return to higher-level recursion
-
-in(A, #r{entries = Entries}) ->
- lists:keymember(A, #entry.agent, Entries);
-in(A, #w{entries = Entries}) ->
- lists:keymember(A, #entry.agent, Entries).
-
-in_tail(A, Tail) ->
- lists:any(fun(X) ->
- in(A, X)
- end, Tail).
-
-max_agent([{A, O}]) ->
- {A, O};
-max_agent([{A1, O1}, {A2, _O2} | Rest]) when A1 > A2 ->
- max_agent([{A1, O1} | Rest]);
-max_agent([{_A1, _O1}, {A2, O2} | Rest]) ->
- max_agent([{A2, O2} | Rest]).
-
-event(_Line, _Event, _State) ->
- ok.
+get_lock(OID, S) ->
+ locks_agent_lib:get_lock(OID, S).
diff --git a/src/locks_agent.hrl b/src/locks_agent.hrl
new file mode 100644
index 0000000..1024a6b
--- /dev/null
+++ b/src/locks_agent.hrl
@@ -0,0 +1,52 @@
+-include("locks.hrl").
+
+-type transaction_status() :: no_locks
+ | {have_all_locks, list()}
+ | waiting
+ | {cannot_serve, list()}.
+
+-record(req, {object,
+ mode,
+ nodes,
+ claim_no = 0,
+ require = all}).
+
+-type monitored_nodes() :: [{node(), reference()}].
+-type down_nodes() :: [node()].
+
+-record(state, {
+ locks :: ets:tab(),
+ agents :: ets:tab(),
+ interesting = [] :: [lock_id()],
+ claim_no = 0 :: integer(),
+ requests :: ets:tab(),
+ down = [] :: down_nodes(),
+ monitored = [] :: monitored_nodes(),
+ await_nodes = false :: boolean(),
+ monitor_nodes = false :: boolean(),
+ pending :: ets:tab(),
+ sync = [] :: [#lock{}],
+ client :: pid(),
+ client_mref :: reference(),
+ options = [] :: [option()],
+ notify = [] :: [pid()],
+ awaiting_all = []:: [{pid(), reference() | async}],
+ answer :: locking | waiting | done,
+ deadlocks = [] :: deadlocks(),
+ have_all = false :: boolean(),
+ status = no_locks :: transaction_status(),
+ check = false :: false | {true, [any()]},
+ handle_locks = false :: boolean()
+ }).
+
+-define(costly_event(E),
+ case erlang:trace_info({?MODULE,event,3}, traced) of
+ {_, false} -> ok;
+ _ -> event(?LINE, E, none)
+ end).
+
+-define(event(E), event(?LINE, E, none)).
+-define(event(E, S), event(?LINE, E, S)).
+
+event(_Line, _Event, _State) ->
+ ok.
diff --git a/src/locks_agent_lib.erl b/src/locks_agent_lib.erl
new file mode 100644
index 0000000..de32bb3
--- /dev/null
+++ b/src/locks_agent_lib.erl
@@ -0,0 +1,447 @@
+%% -*- mode: erlang; indent-tabs-mode: nil; -*-
+%%---- BEGIN COPYRIGHT -------------------------------------------------------
+%%
+%% Copyright (C) 2013-20 Ulf Wiger. All rights reserved.
+%%
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at http://mozilla.org/MPL/2.0/.
+%%
+%%---- END COPYRIGHT ---------------------------------------------------------
+%% Key contributor: Thomas Arts
+%%
+%%=============================================================================
+%% @doc Support functions for locks_agent
+%% @end
+
+-module(locks_agent_lib).
+-export([ all_locks_status/1
+ , analyse/1 ]).
+
+-export([ get_lock/2
+ , matching_request/5
+ , send_indirects/1
+ , store_lock_holders/3
+ , lock_holders/1
+ , move_requests/3
+ , involved_agents/1
+ , prune_interesting/3
+ , request_lock/3
+ , ensure_monitor/2
+ , request_can_be_served/2
+ , in_tail/2
+ , add_msgs/2 ]).
+
+-import(lists,[foreach/2,any/2,map/2,member/2]).
+
+-include("locks_agent.hrl").
+
+
+-spec all_locks_status(#state{}) ->
+ no_locks
+ | waiting
+ | {have_all_locks, deadlocks()}
+ | {cannot_serve, list()}.
+%%
+all_locks_status(#state{pending = Pend, locks = Locks} = State) ->
+ Status = all_locks_status_(State),
+ ?costly_event({locks_diagnostics,
+ [{pending, pp_pend(ets:tab2list(Pend))},
+ {locks, pp_locks(ets:tab2list(Locks))}]}),
+ ?event({all_locks_status, Status}),
+ Status.
+
+pp_pend(Pend) ->
+ [{O, M, Ns, Req} || #req{object = O, mode = M, nodes = Ns,
+ require = Req} <- Pend].
+
+pp_locks(Locks) ->
+ [{O,lock_holder(Q)} ||
+ #lock{object = O, queue = Q} <- Locks].
+
+lock_holder([#w{entries = Es}|_]) ->
+ [A || #entry{agent = A} <- Es];
+lock_holder([#r{entries = Es}|_]) ->
+ [A || #entry{agent = A} <- Es].
+
+
+all_locks_status_(#state{locks = Locks, pending = Pend} = State) ->
+ case ets_info(Locks, size) of
+ 0 ->
+ case ets_info(Pend, size) of
+ 0 -> no_locks;
+ _ ->
+ waiting
+ end;
+ _ ->
+ case waitingfor(State) of
+ [] ->
+ {have_all_locks, State#state.deadlocks};
+ WF ->
+ case [O || {O, _} <- WF,
+ requests_for_obj_can_be_served(
+ O, State) =:= false] of
+ [] ->
+ waiting;
+ Os ->
+ {cannot_serve, Os}
+ end
+ end
+ end.
+
+requests_for_obj_can_be_served(Obj, #state{pending = Pending} = S) ->
+ lists:all(
+ fun(R) ->
+ request_can_be_served(R, S)
+ end, ets:lookup(Pending, Obj)).
+
+request_can_be_served(_, #state{await_nodes = true}) ->
+ true;
+request_can_be_served(#req{nodes = Ns, require = R}, #state{down = Down}) ->
+ case R of
+ all -> intersection(Down, Ns) == [];
+ any -> Ns -- Down =/= [];
+ majority -> length(Ns -- Down) > (length(Ns) div 2);
+ majority_alive -> true;
+ all_alive -> true
+ end.
+
+-spec waitingfor(#state{}) -> [lock_id()].
+waitingfor(#state{requests = Reqs,
+ pending = Pending, down = Down} = S) ->
+ %% HaveLocks = [{L#lock.object, l_mode(Q)} || #lock{queue = Q} = L <- Locks,
+ %% in(self(), hd(Q))],
+ {Served, PendingOIDs} =
+ ets:foldl(
+ fun(#req{object = OID, mode = M,
+ require = majority, nodes = Ns} = R,
+ {SAcc, PAcc}) ->
+ NodesLocked = nodes_locked(OID, M, S),
+ case length(NodesLocked) > (length(Ns) div 2) of
+ false ->
+ {SAcc, ordsets:add_element(OID, PAcc)};
+ true ->
+ {[R|SAcc], PAcc}
+ end;
+ (#req{object = OID, mode = M,
+ require = majority_alive, nodes = Ns} = R,
+ {SAcc, PAcc}) ->
+ Alive = Ns -- Down,
+ NodesLocked = nodes_locked(OID, M, S),
+ case length(NodesLocked) > (length(Alive) div 2) of
+ false ->
+ {SAcc, ordsets:add_element(OID, PAcc)};
+ true ->
+ {[R|SAcc], PAcc}
+ end;
+ (#req{object = OID, mode = M,
+ require = any, nodes = Ns} = R, {SAcc, PAcc}) ->
+ case [N || N <- nodes_locked(OID, M, S),
+ member(N, Ns)] of
+ [_|_] -> {[R|SAcc], PAcc};
+ [] ->
+ {SAcc, ordsets:add_element(OID, PAcc)}
+ end;
+ (#req{object = OID, mode = M,
+ require = all, nodes = Ns} = R, {SAcc, PAcc}) ->
+ NodesLocked = nodes_locked(OID, M, S),
+ case Ns -- NodesLocked of
+ [] -> {[R|SAcc], PAcc};
+ [_|_] ->
+ {SAcc, ordsets:add_element(OID, PAcc)}
+ end;
+ (#req{object = OID, mode = M,
+ require = all_alive, nodes = Ns} = R, {SAcc, PAcc}) ->
+ Alive = Ns -- Down,
+ NodesLocked = nodes_locked(OID, M, S),
+ case Alive -- NodesLocked of
+ [] -> {[R|SAcc], PAcc};
+ [_|_] ->
+ {SAcc, ordsets:add_element(OID, PAcc)}
+ end;
+ (_, Acc) ->
+ Acc
+ end, {[], ordsets:new()}, Pending),
+ ?event([{served, Served}, {pending_oids, PendingOIDs}]),
+ move_requests(Served, Pending, Reqs),
+ PendingOIDs.
+
+nodes_locked(OID, M, #state{} = S) ->
+ [N || {_, N} = Obj <- have_locks(OID, S),
+ l_covers(M, l_mode( get_lock(Obj,S) ))].
+
+l_mode(#lock{queue = [#r{}|_]}) -> read;
+l_mode(#lock{queue = [#w{}|_]}) -> write.
+
+l_covers(read, write) -> true;
+l_covers(M , M ) -> true;
+l_covers(_ , _ ) -> false.
+
+intersection(A, B) ->
+ A -- (A -- B).
+
+get_lock(OID, #state{locks = Ls}) ->
+ [L] = ets_lookup(Ls, OID),
+ L.
+
+move_requests([R|Rs], From, To) ->
+ ets_delete_object(From, R),
+ ets_insert(To, R),
+ move_requests(Rs, From, To);
+move_requests([], _, _) ->
+ ok.
+
+store_lock_holders(Prev, #lock{object = Obj} = Lock,
+ #state{agents = As}) ->
+ PrevLockHolders = case Prev of
+ [PrevLock] -> lock_holders(PrevLock);
+ [] -> []
+ end,
+ LockHolders = lock_holders(Lock),
+ [ets_delete(As, {A,Obj}) || A <- PrevLockHolders -- LockHolders],
+ [ets_insert(As, {{A,Obj}}) || A <- LockHolders -- PrevLockHolders],
+ ok.
+
+lock_holders(#lock{queue = [#r{entries = Es}|_]}) ->
+ [A || #entry{agent = A} <- Es];
+lock_holders(#lock{queue = [#w{entries = [#entry{agent = A}]}|_]}) ->
+ %% exclusive lock held
+ [A];
+lock_holders(#lock{queue = [#w{entries = [_,_|_]}|_]}) ->
+ %% contention for write lock; no-one actually holds the lock
+ [].
+
+prune_interesting(I, node, Node) ->
+ [OID || {_, N} = OID <- I, N =/= Node];
+prune_interesting(I, object, Object) ->
+ [OID || {O, _} = OID <- I, O =/= Object].
+
+send_indirects(#state{interesting = I, agents = As} = State) ->
+ InvolvedAgents = involved_agents(As),
+ Locks = [get_lock(OID, State) || OID <- I],
+ [ send_lockinfo(Agent, L)
+ || Agent <- compute_indirects(InvolvedAgents),
+ #lock{queue = [_,_|_]} = L <- Locks,
+ interesting(State, L, Agent) ].
+
+-spec compute_indirects([agent()]) -> [agent()].
+compute_indirects(InvolvedAgents) ->
+ [ A || A<-InvolvedAgents, A>self()].
+
+send_lockinfo(Agent, #lock{object = {OID, Node}} = L) ->
+ Agent ! #locks_info{lock = L#lock{object = OID}, where = Node}.
+
+involved_agents(As) ->
+ involved_agents(ets:first(As), As).
+
+involved_agents({A,_}, As) ->
+ [A | involved_agents(ets_next(As, {A,[]}), As)];
+involved_agents('$end_of_table', _) ->
+ [].
+
+%% is this lock interesting for the agent?
+%%
+-spec interesting(#state{}, #lock{}, agent()) -> boolean().
+interesting(#state{agents = As}, #lock{queue = Q}, Agent) ->
+ (not is_member(Agent, Q)) andalso
+ has_a_lock(As, Agent).
+
+is_member(A, [#r{entries = Es}|T]) ->
+ lists:keymember(A, #entry.agent, Es) orelse is_member(A, T);
+is_member(A, [#w{entries = Es}|T]) ->
+ lists:keymember(A, #entry.agent, Es) orelse is_member(A, T);
+is_member(_, []) ->
+ false.
+
+%% -spec has_a_lock([#lock{}], pid()) -> boolean().
+%% has_a_lock(Locks, Agent) ->
+%% is_member(Agent, [hd(L#lock.queue) || L<-Locks]).
+has_a_lock(As, Agent) ->
+ case ets_next(As, {Agent,0}) of % Lock IDs are {LockName,Node} tuples
+ {Agent,_} -> true;
+ _ -> false
+ end.
+
+have_locks(Obj, #state{agents = As}) ->
+ %% We need to use {element,1,{element,2,{element,1,'$_'}}} in the body,
+ %% since Obj may not be a legal output pattern (e.g. [a, {x,1}]).
+ ets_select(As, [{ {{self(),{Obj,'$1'}}}, [],
+ [{{{element,1,
+ {element,2,
+ {element,1,'$_'}}},'$1'}}] }]).
+
+ets_insert(T, Obj) -> ets:insert(T, Obj).
+ets_lookup(T, K) -> ets:lookup(T, K).
+ets_delete(T, K) -> ets:delete(T, K).
+ets_delete_object(T, O) -> ets:delete_object(T, O).
+ets_select(T, P) -> ets:select(T, P).
+ets_info(T, I) -> ets:info(T, I).
+ets_next(T, K) -> ets:next(T, K).
+
+matching_request(Object, Mode, Nodes, Require,
+ #state{requests = Requests,
+ pending = Pending} = S) ->
+ case any_matching_request_(ets_lookup(Pending, Object), Pending,
+ Object, Mode, Nodes, Require, S) of
+ {false, S1} ->
+ any_matching_request_(ets_lookup(Requests, Object), Requests,
+ Object, Mode, Nodes, Require, S1);
+ True -> True
+ end.
+
+any_matching_request_([R|Rs], Tab, Object, Mode, Nodes, Require, S) ->
+ case matching_request_(R, Tab, Object, Mode, Nodes, Require, S) of
+ {false, S1} ->
+ any_matching_request_(Rs, Tab, Object, Mode, Nodes, Require, S1);
+ True -> True
+ end;
+any_matching_request_([], _, _, _, _, _, S) ->
+ {false, S}.
+
+matching_request_(Req, Tab, Object, Mode, Nodes, Require, S) ->
+ case Req of
+ #req{nodes = Nodes1, require = Require, mode = Mode} ->
+ ?event({repeated_request, Object}),
+ %% Repeated request
+ case Nodes -- Nodes1 of
+ [] ->
+ {true, S};
+ [_|_] = New ->
+ {true, add_nodes(New, Req, Tab, S)}
+ end;
+ #req{nodes = Nodes, require = Require, mode = write}
+ when Mode==read ->
+ ?event({found_old_write_request, Object}),
+ %% The old request is sufficient
+ {true, S};
+ #req{nodes = Nodes, require = Require, mode = read} when Mode==write ->
+ ?event({need_upgrade, Object}),
+ {false, remove_locks(Object, S)};
+ #req{nodes = PrevNodes} ->
+ %% Different conditions from last time
+ Reason = {conflicting_request,
+ [Object, Nodes, PrevNodes]},
+ error(Reason)
+ end.
+
+add_nodes(Nodes, #req{object = Object, mode = Mode, nodes = OldNodes} = Req,
+ Tab, #state{pending = Pending} = State) ->
+ AllNodes = union(Nodes, OldNodes),
+ Req1 = Req#req{nodes = AllNodes},
+ %% replace request
+ ets_delete_object(Tab, Req),
+ ets_insert(Pending, Req1),
+ lists:foldl(
+ fun(Node, Sx) ->
+ OID = {Object, Node},
+ request_lock(
+ OID, Mode, ensure_monitor(Node, Sx))
+ end, State, Nodes).
+
+remove_locks(Object, #state{locks = Locks, agents = As,
+ interesting = I} = S) ->
+ ets:match_delete(Locks, #lock{object = {Object,'_'}, _ = '_'}),
+ ets:match_delete(As, {{self(),{Object,'_'}}}),
+ S#state{interesting = prune_interesting(I, object, Object)}.
+
+request_lock({OID, Node} = _LockID, Mode, #state{client = Client} = State) ->
+ ?event({request_lock, _LockID}),
+ P = {?LOCKER, Node},
+ erlang:monitor(process, P),
+ locks_server:lock(OID, [Node], Client, Mode),
+ State.
+
+ensure_monitor(Node, S) when Node == node() ->
+ S;
+ensure_monitor(Node, #state{monitored = Mon} = S) ->
+ Mon1 = ensure_monitor_(?LOCKER, Node, Mon),
+ S#state{monitored = Mon1}.
+
+ensure_monitor_(Locker, Node, Mon) ->
+ case orddict:is_key(Node, Mon) of
+ true ->
+ Mon;
+ false ->
+ Ref = erlang:monitor(process, {Locker, Node}),
+ orddict:store(Node, Ref, Mon)
+ end.
+
+union(A, B) ->
+ A ++ (B -- A).
+
+%% analyse computes whether a local deadlock can be detected,
+%% if not, 'ok' is returned, otherwise it returns the triple
+%% {deadlock,ToSurrender,ToObject} stating which agent should
+%% surrender which object.
+%%
+analyse(Locks) ->
+ Nodes =
+ expand_agents([ {hd(L#lock.queue), L#lock.object} ||
+ L <- Locks]),
+ Connect =
+ fun({A1, O1}, {A2, _}) when A1 =/= A2 ->
+ lists:any(
+ fun(#lock{object = Obj, queue = Queue}) ->
+ (O1=:=Obj)
+ andalso in(A1, hd(Queue))
+ andalso in_tail(A2, tl(Queue))
+ end, Locks);
+ (_, _) ->
+ false
+ end,
+ case locks_cycles:components(Nodes, Connect) of
+ [] ->
+ ok;
+ [Comp|_] ->
+ {ToSurrender, ToObject} = max_agent(Comp),
+ {deadlock, ToSurrender, ToObject}
+ end.
+
+expand_agents([{#w{entries = Es}, Id} | T]) ->
+ expand_agents_(Es, Id, T);
+expand_agents([{#r{entries = Es}, Id} | T]) ->
+ expand_agents_(Es, Id, T);
+expand_agents([]) ->
+ [].
+
+expand_agents_([#entry{agent = A}|T], Id, Tail) ->
+ [{A, Id}|expand_agents_(T, Id, Tail)];
+expand_agents_([], _, Tail) ->
+ expand_agents(Tail). % return to higher-level recursion
+
+in(A, #r{entries = Entries}) ->
+ lists:keymember(A, #entry.agent, Entries);
+in(A, #w{entries = Entries}) ->
+ lists:keymember(A, #entry.agent, Entries).
+
+in_tail(A, Tail) ->
+ lists:any(fun(X) ->
+ in(A, X)
+ end, Tail).
+
+max_agent([{A, O}]) ->
+ {A, O};
+max_agent([{A1, O1}, {A2, _O2} | Rest]) when A1 > A2 ->
+ max_agent([{A1, O1} | Rest]);
+max_agent([{_A1, _O1}, {A2, O2} | Rest]) ->
+ max_agent([{A2, O2} | Rest]).
+
+add_msgs([H|T], Msgs) ->
+ add_msgs(T, append_msg(H, Msgs));
+add_msgs([], Msgs) ->
+ Msgs.
+
+append_msg(#locks_info{lock = #lock{object = O, version = V}} = I, Msgs) ->
+ append(Msgs, O, V, I).
+
+append([#locks_info{lock = #lock{object = O, version = V1}}|T] = L, O, V, I) ->
+ if V > V1 ->
+ [I|T];
+ true ->
+ L
+ end;
+append([H|T], O, V, I) ->
+ [H|append(T, O, V, I)];
+append([], _, _, I) ->
+ [I].
+
diff --git a/src/locks_debug.hrl b/src/locks_debug.hrl
new file mode 100644
index 0000000..b713151
--- /dev/null
+++ b/src/locks_debug.hrl
@@ -0,0 +1,12 @@
+
+-ifdef(LOCKS_DEBUG).
+
+-ifndef(LOCKS_LOGSZ).
+-define(LOCKS_LOGSZ, 10).
+-endif.
+
+-define(log(X), locks_window:log(X, ?LOCKS_LOGSZ)).
+
+-else.
+-define(log(X), ok).
+-endif.
diff --git a/src/locks_leader.erl b/src/locks_leader.erl
index b7f3b34..333e786 100644
--- a/src/locks_leader.erl
+++ b/src/locks_leader.erl
@@ -95,9 +95,9 @@
-export_type([mod_state/0, msg/0, election/0]).
--type option() :: {role, candidate | worker}
- | {resource, any()}.
--type ldr_options() :: [option()].
+-type ldr_option() :: {role, candidate | worker}
+ | {resource, any()}.
+-type ldr_options() :: [ldr_option()].
-type mod_state() :: any().
-type msg() :: any().
-type reply() :: any().
@@ -120,6 +120,7 @@
%% mode = dynamic,
initial = true,
lock,
+ vector,
agent,
leader,
election_ref,
@@ -135,6 +136,13 @@
buffered = []}).
-include("locks.hrl").
+-include("locks_debug.hrl").
+
+-ifdef(LOCKS_DEBUG).
+-define(log(X, S), dbg_log(X, S)).
+-else.
+-define(log(X, S), ok).
+-endif.
-define(event(E), event(?LINE, E, none)).
-define(event(E, S), event(?LINE, E, S)).
@@ -210,6 +218,7 @@ leader_node(#st{}) ->
%% safe.
%% @end
reply(From, Reply) ->
+ ?log({'$reply', From, Reply}),
gen_server:reply(From, Reply).
-spec broadcast(any(), election()) -> ok.
@@ -379,7 +388,7 @@ leader_call(L, Request, Timeout) ->
end.
leader_reply(From, Reply) ->
- gen_server:reply(From, {'$locks_leader_reply', Reply}).
+ reply(From, {'$locks_leader_reply', Reply}).
-spec leader_cast(L::server_ref(), Msg::term()) -> ok.
%% @doc Make an asynchronous cast to the leader.
@@ -471,18 +480,19 @@ init_(Module, ModSt0, Options, Parent, Reg) ->
undefined
end,
proc_lib:init_ack(Parent, {ok, self()}),
- case safe_loop(#st{agent = Agent,
- role = Role,
- mod = Module,
- mod_state = ModSt,
- lock = Lock,
- %% mode = Mode,
- nodes = AllNodes,
- regname = Reg}) of
+ S1 = #st{agent = Agent,
+ role = Role,
+ mod = Module,
+ mod_state = ModSt,
+ lock = Lock,
+ %% mode = Mode,
+ nodes = AllNodes,
+ regname = Reg},
+ case safe_loop(S1) of
{stop, StopReason, _} ->
error(StopReason);
_ ->
- ok
+ {ok, S1} % we should never get here, but it makes dialyzer happy
end.
default_lock(Mod, undefined) -> Mod;
@@ -515,10 +525,12 @@ noreply(Stop) when element(1, Stop) == stop ->
%% We enter safe_loop/1 as soon as no leader is elected
safe_loop(#st{agent = A} = S) ->
receive
- {nodeup, N} ->
+ {nodeup, N} = _Msg ->
+ ?log(_Msg, S),
?event({nodeup, N, nodes()}, S),
noreply(nodeup(N, S));
{locks_agent, A, Info} = _Msg ->
+ ?log(_Msg),
?event(_Msg, S),
case Info of
#locks_info{} ->
@@ -530,33 +542,44 @@ safe_loop(#st{agent = A} = S) ->
noreply(S)
end;
#locks_info{} = I -> % if worker - direct from locks_server
+ ?log(I, S),
?event(I, S),
noreply(locks_info(I, S));
{?MODULE, am_leader, L, ERef, LeaderMsg} = _Msg ->
+ ?log(_Msg, S),
?event(_Msg, S),
noreply(leader_announced(L, ERef, LeaderMsg, S));
{?MODULE, from_leader, L, ERef, LeaderMsg} = _Msg ->
+ ?log(_Msg, S),
?event(_Msg, S),
noreply(from_leader(L, ERef, LeaderMsg, S));
{?MODULE, am_worker, W} = _Msg ->
+ ?log(_Msg, S),
?event(_Msg, S),
noreply(worker_announced(W, S));
{?MODULE, leader_uncertain, _L, _Synced, _SyncedWs} = _Msg ->
+ ?log(_Msg, S),
?event(_Msg, S),
noreply(S);
{?MODULE, affirm_leader, L, Ref} = _Msg ->
+ ?log(_Msg, S),
?event({in_safe_loop, _Msg}, S),
noreply(leader_affirmed(L, Ref, S));
{?MODULE, ensure_sync, _, _} = _Msg ->
+ ?log(_Msg, S),
?event({in_safe_loop, _Msg}, S),
noreply(S);
- {'$gen_call', From, '$locks_leader_debug'} ->
+ {'$gen_call', From, '$locks_leader_debug'} = _Msg ->
+ ?log(_Msg, S),
handle_call('$locks_leader_debug', From, S);
- {'$gen_call', From, '$info'} ->
+ {'$gen_call', From, '$info'} = _Msg ->
+ ?log(_Msg, S),
handle_call('$locks_leader_info', From, S);
- {'$gen_call', From, {'$locks_leader_info', Item}} ->
+ {'$gen_call', From, {'$locks_leader_info', Item}} = _Msg ->
+ ?log(_Msg, S),
handle_call({'$locks_leader_info', Item}, From, S);
- {'$gen_call', {_, {?MODULE, _Ref}} = From, Req} ->
+ {'$gen_call', {_, {?MODULE, _Ref}} = From, Req} = _Msg ->
+ ?log(_Msg, S),
%% locks_leader-tagged call; handle also in safe loop
?event({safe_call, Req}),
#st{mod = M, mod_state = MSt} = S,
@@ -564,6 +587,7 @@ safe_loop(#st{agent = A} = S) ->
callback_reply(M:handle_call(Req, From, MSt, opaque(S)),
From, fun unchanged/1, S));
{'DOWN',_,_,_,_} = DownMsg ->
+ ?log(DownMsg, S),
?event(DownMsg, S),
noreply(down(DownMsg, S))
end.
@@ -572,16 +596,20 @@ event(_Line, _Event, _State) ->
ok.
%% @private
-handle_info({nodeup, N}, #st{role = candidate} = S) ->
+handle_info(Msg, S) ->
+ ?log(Msg, S),
+ handle_info_(Msg, S).
+
+handle_info_({nodeup, N}, #st{role = candidate} = S) ->
?event({handle_info, {nodeup, N, nodes()}}, S),
noreply(nodeup(N, S));
-handle_info({nodedown, N}, #st{nodes = Nodes} =S) ->
+handle_info_({nodedown, N}, #st{nodes = Nodes} =S) ->
?event({nodedown, N}, S),
noreply(S#st{nodes = ordsets:del_element(N, Nodes)});
-handle_info({'DOWN', _, _, _, _} = Msg, S) ->
+handle_info_({'DOWN', _, _, _, _} = Msg, S) ->
?event({handle_info, Msg}, S),
noreply(down(Msg, S));
-handle_info({locks_agent, A, Info} = _Msg, #st{agent = A} = S) ->
+handle_info_({locks_agent, A, Info} = _Msg, #st{agent = A} = S) ->
?event({handle_info, _Msg}, S),
case Info of
#locks_info{} -> noreply(locks_info(Info, S));
@@ -591,17 +619,18 @@ handle_info({locks_agent, A, Info} = _Msg, #st{agent = A} = S) ->
_ ->
noreply(S)
end;
-handle_info({?MODULE, leader_uncertain, L, Synced, SyncedWs}, S) ->
+handle_info_({?MODULE, leader_uncertain, L, Synced, SyncedWs}, S) ->
?event({leader_uncertain, {{L, Synced, SyncedWs}, S#st.leader}}),
case S#st.leader of
MyL when MyL == self() ->
- lists:foldl(
- fun({Pid, Type}, Sx) ->
- maybe_announce_leader(
- Pid, Type, remove_synced(Pid, Type, Sx))
- end, S,
- [{P,candidate} || P <- [L|Synced]]
- ++ [{P,worker} || P <- SyncedWs]);
+ noreply(
+ lists:foldl(
+ fun({Pid, Type}, Sx) ->
+ maybe_announce_leader(
+ Pid, Type, remove_synced(Pid, Type, Sx))
+ end, S,
+ [{P,candidate} || P <- [L|Synced]]
+ ++ [{P,worker} || P <- SyncedWs]));
L ->
locks_agent:change_flag(S#st.agent, notify, true),
noreply(S#st{leader = undefined,
@@ -609,48 +638,52 @@ handle_info({?MODULE, leader_uncertain, L, Synced, SyncedWs}, S) ->
_OtherL ->
noreply(S)
end;
-handle_info({?MODULE, affirm_leader, L, ERef} = _Msg, #st{} = S) ->
+handle_info_({?MODULE, affirm_leader, L, ERef} = _Msg, #st{} = S) ->
?event(_Msg, S),
noreply(leader_affirmed(L, ERef, S));
-handle_info({?MODULE, ensure_sync, Pid, Type} = _Msg, #st{} = S) ->
+handle_info_({?MODULE, ensure_sync, Pid, Type} = _Msg, #st{} = S) ->
?event(_Msg, S),
S1 = case S#st.leader of
Me when Me == self() ->
- maybe_announce_leader(Pid, Type, S);
+ maybe_announce_leader(Pid, Type, remove_synced(Pid, Type, S));
_ ->
S
end,
noreply(S1);
-handle_info({?MODULE, am_worker, W} = _Msg, #st{} = S) ->
+handle_info_({?MODULE, am_worker, W} = _Msg, #st{} = S) ->
?event({handle_info, _Msg}, S),
noreply(worker_announced(W, S));
-handle_info(#locks_info{lock = #lock{object = Lock}} = I,
- #st{lock = Lock} = S) ->
- {noreply, locks_info(I, S)};
-handle_info({?MODULE, am_leader, L, ERef, LeaderMsg} = _M, S) ->
+handle_info_(#locks_info{lock = #lock{object = Lock}} = I,
+ #st{lock = Lock} = S) ->
+ noreply(locks_info(I, S));
+handle_info_({?MODULE, am_leader, L, ERef, LeaderMsg} = _M, S) ->
?event({handle_info, _M}, S),
noreply(leader_announced(L, ERef, LeaderMsg, S));
-handle_info({?MODULE, from_leader, L, ERef, LeaderMsg} = _M, S) ->
+handle_info_({?MODULE, from_leader, L, ERef, LeaderMsg} = _M, S) ->
?event({handle_info, _M}, S),
noreply(from_leader(L, ERef, LeaderMsg, S));
-handle_info({Ref, {'$locks_leader_reply', Reply}} = _M,
+handle_info_({Ref, {'$locks_leader_reply', Reply}} = _M,
#st{buffered = Buf} = S) ->
?event({handle_info, _M}, S),
case lists:keytake(Ref, 1, Buf) of
{value, {_, OrigRef}, Buf1} ->
- gen_server:reply(OrigRef, {'$locks_leader_reply', Reply}),
+ reply(OrigRef, {'$locks_leader_reply', Reply}),
noreply(S#st{buffered = Buf1});
false ->
noreply(S)
end;
-handle_info(Msg, #st{mod = M, mod_state = MSt} = S) ->
+handle_info_(Msg, #st{mod = M, mod_state = MSt} = S) ->
?event({handle_info, Msg}, S),
noreply(callback(M:handle_info(Msg, MSt, opaque(S)), S)).
%% @private
-handle_cast({'$locks_leader_cast', Msg} = Cast, #st{mod = M, mod_state = MSt,
- leader = L} = S) ->
+handle_cast(Msg, S) ->
+ ?log({'$cast', Msg}, S),
+ handle_cast_(Msg, S).
+
+handle_cast_({'$locks_leader_cast', Msg} = Cast, #st{mod = M, mod_state = MSt,
+ leader = L} = S) ->
if L == self() ->
noreply(callback(M:handle_leader_cast(Msg, MSt, opaque(S)), S));
is_pid(L) ->
@@ -659,17 +692,21 @@ handle_cast({'$locks_leader_cast', Msg} = Cast, #st{mod = M, mod_state = MSt,
true ->
noreply(S)
end;
-handle_cast(Msg, #st{mod = M, mod_state = MSt} = St) ->
+handle_cast_(Msg, #st{mod = M, mod_state = MSt} = St) ->
noreply(callback(M:handle_cast(Msg, MSt, opaque(St)), St)).
%% @private
-handle_call(Req, {_, {?MODULE, _Ref}} = From,
- #st{mod = M, mod_state = MSt} = S) ->
+handle_call(Req, From, S) ->
+ ?log({'$call', Req, From}, S),
+ handle_call_(Req, From, S).
+
+handle_call_(Req, {_, {?MODULE, _Ref}} = From,
+ #st{mod = M, mod_state = MSt} = S) ->
noreply(
callback_reply(M:handle_call(Req, From, MSt, opaque(S)), From,
fun unchanged/1, S));
-handle_call('$locks_leader_debug', From, S) ->
+handle_call_('$locks_leader_debug', From, S) ->
I = [{leader, leader(S)},
{leader_node, leader_node(S)},
{candidates, candidates(S)},
@@ -678,9 +715,9 @@ handle_call('$locks_leader_debug', From, S) ->
{module, S#st.mod},
{mod_state, S#st.mod_state},
{process_info, process_info(self())}],
- gen_server:reply(From, I),
+ reply(From, I),
noreply(S);
-handle_call('$locks_leader_info', From, S) ->
+handle_call_('$locks_leader_info', From, S) ->
I = [{leader, leader(S)},
{leader_node, leader_node(S)},
{candidates, candidates(S)},
@@ -688,9 +725,9 @@ handle_call('$locks_leader_info', From, S) ->
{workers, workers(S)},
{module, S#st.mod},
{mod_state, S#st.mod_state}],
- gen_server:reply(From, I),
+ reply(From, I),
noreply(S);
-handle_call({'$locks_leader_info', Item}, From, S) ->
+handle_call_({'$locks_leader_info', Item}, From, S) ->
I = case Item of
leader -> leader(S);
leader_node -> leader_node(S);
@@ -701,11 +738,11 @@ handle_call({'$locks_leader_info', Item}, From, S) ->
mod_state -> S#st.mod_state;
_ -> undefined
end,
- gen_server:reply(From, I),
+ reply(From, I),
noreply(S);
-handle_call({'$locks_leader_call', Req} = Msg, From,
- #st{mod = M, mod_state = MSt, leader = L,
- buffered = Buf} = S) ->
+handle_call_({'$locks_leader_call', Req} = Msg, From,
+ #st{mod = M, mod_state = MSt, leader = L,
+ buffered = Buf} = S) ->
if L == self() ->
noreply(
callback_reply(
@@ -717,11 +754,10 @@ handle_call({'$locks_leader_call', Req} = Msg, From,
catch erlang:send(L, {'$gen_call', NewFrom, Msg}, [noconnect]),
noreply(S#st{buffered = [{MyRef, From}|Buf]})
end;
-handle_call(R, F, #st{mod = M, mod_state = MSt} = S) ->
+handle_call_(R, F, #st{mod = M, mod_state = MSt} = S) ->
noreply(
callback_reply(M:handle_call(R, F, MSt, opaque(S)), F,
fun unchanged/1, S)).
- %% fun(R1) -> R1 end, S)).
unchanged(X) ->
X.
@@ -782,7 +818,7 @@ down({'DOWN', Ref, _, Pid, _} = Msg,
callback(M:handle_info(Msg, MSt, opaque(S)), S);
Type ->
S1 = if Pid == LPid ->
- [gen_server:reply(From,'$leader_died')
+ [reply(From,'$leader_died')
|| {_, From} <- S#st.buffered],
S#st{leader = undefined, buffered = [],
synced = [], synced_workers = []};
@@ -799,7 +835,7 @@ add_cand(Client, #st{candidates = Cands, role = Role} = S) ->
monitor_cand(Client),
S1 = S#st{candidates = [Client | Cands]},
if Role == worker ->
- Client ! {?MODULE, am_worker, self()},
+ snd(Client, {?MODULE, am_worker, self()}),
S1;
true ->
maybe_announce_leader(Client, candidate, S1)
@@ -813,21 +849,25 @@ monitor_cand(Client) ->
put({?MODULE, monitor, MRef}, candidate).
maybe_announce_leader(Pid, Type, #st{leader = L, mod = M,
- mod_state = MSt} = S) ->
- ?event({maybe_announce_leader, Pid, Type}, S),
- ERef = S#st.election_ref,
- IsSynced = is_synced(Pid, Type, S),
+ mod_state = MSt} = S0) ->
+ ?event({maybe_announce_leader, Pid, Type}, S0),
+ IsSynced = is_synced(Pid, Type, S0),
+ ?event({is_synced, Pid, IsSynced}),
if L == self(), IsSynced == false ->
- case M:elected(MSt, opaque(S), Pid) of
+ S = refresh_vector(S0),
+ ERef = S#st.election_ref,
+ ERes = M:elected(MSt, opaque(S), Pid),
+ ?event({elected_result, ERes}),
+ case ERes of
{reply, Msg, MSt1} ->
- Pid ! msg(am_leader, ERef, Msg),
+ snd(Pid, msg(am_leader, ERef, Msg)),
mark_as_synced(Pid, Type, S#st{mod_state = MSt1});
{ok, Msg, MSt1} ->
- Pid ! msg(am_leader, ERef, Msg),
+ snd(Pid, msg(am_leader, ERef, Msg)),
S1 = do_broadcast(S#st{mod_state = MSt1}, Msg),
mark_as_synced(Pid, Type, S1);
{ok, AmLdrMsg, FromLdrMsg, MSt1} ->
- Pid ! msg(am_leader, ERef, AmLdrMsg),
+ snd(Pid, msg(am_leader, ERef, AmLdrMsg)),
S1 = do_broadcast(S#st{mod_state = MSt1}, FromLdrMsg),
mark_as_synced(Pid, Type, S1);
{surrender, Other, MSt1} ->
@@ -841,7 +881,8 @@ maybe_announce_leader(Pid, Type, #st{leader = L, mod = M,
end
end;
true ->
- S
+ ?event({will_not_announce, L, IsSynced}),
+ S0
end.
set_leader_undefined(#st{} = S) ->
@@ -876,18 +917,25 @@ maybe_remove_cand(worker, Pid, #st{workers = Ws} = S) ->
S#st{workers = Ws -- [Pid]}.
become_leader(#st{agent = A} = S) ->
- {_, Locks} = locks_agent:lock_info(A),
- S1 = lists:foldl(
+ {_, Locks} = LockInfo = locks_agent:lock_info(A),
+ S1 = refresh_vector(LockInfo, S),
+ S2 = lists:foldl(
fun(#lock{object = {OID,Node}} = L, Sx) ->
lock_info(L#lock{object = OID}, Node, Sx)
- end, S, Locks),
- become_leader_(S1).
+ end, S1, Locks),
+ case S2#st.vector of
+ #{leader := Lv} when Lv =/= A ->
+ ?event(vector_questions_leader, S2),
+ set_leader_uncertain(S2);
+ _ ->
+ become_leader_(S2)
+ end.
-become_leader_(#st{election_ref = {L,_}, mod = M, mod_state = MSt,
+become_leader_(#st{election_ref = {L,_,_}, mod = M, mod_state = MSt,
candidates = Cands, synced = Synced,
workers = Ws, synced_workers = SyncedWs} = S0)
when L =:= self() ->
- S = S0#st{leader = self()},
+ S = S0#st{leader = self(), election_ref = new_election_ref(S0)},
?event(become_leader_again, S),
send_all(S, {?MODULE, affirm_leader, self(), S#st.election_ref}),
case {Cands -- Synced, Ws -- SyncedWs} of
@@ -909,7 +957,7 @@ become_leader_(#st{election_ref = {L,_}, mod = M, mod_state = MSt,
end
end;
become_leader_(#st{mod = M, mod_state = MSt} = S0) ->
- S = S0#st{election_ref = {self(),erlang:monotonic_time(microsecond)}},
+ S = S0#st{election_ref = new_election_ref(S0)},
?event(become_leader, S),
case M:elected(MSt, opaque(S), undefined) of
{ok, Msg, MSt1} ->
@@ -920,6 +968,9 @@ become_leader_(#st{mod = M, mod_state = MSt} = S0) ->
error(Reason)
end.
+new_election_ref(#st{vector = V}) ->
+ {self(), erlang:monotonic_time(microsecond), V}.
+
msg(from_leader, ERef, Msg) ->
{?MODULE, from_leader, self(), ERef, Msg};
msg(am_leader, ERef, Msg) ->
@@ -948,13 +999,13 @@ callback({stop, Reason, MSt}, S) ->
callback_reply({reply, Reply, MSt}, From, F, S) ->
- gen_server:reply(From, F(Reply)),
+ reply(From, F(Reply)),
S#st{mod_state = MSt};
callback_reply({reply, Reply, Msg, MSt}, From, F, S) ->
if S#st.leader == self() ->
S1 = S#st{mod_state = MSt},
do_broadcast(S1, Msg),
- gen_server:reply(From, F(Reply)),
+ reply(From, F(Reply)),
S1;
true ->
error(not_leader)
@@ -962,7 +1013,7 @@ callback_reply({reply, Reply, Msg, MSt}, From, F, S) ->
callback_reply({noreply, MSt}, _, _, S) ->
S#st{mod_state = MSt};
callback_reply({stop, Reason, Reply, MSt}, From, F, S) ->
- gen_server:reply(From, F(Reply)),
+ reply(From, F(Reply)),
{stop, Reason, S#st{mod_state = MSt}};
callback_reply({stop, Reason, MSt}, _, _, S) ->
{stop, Reason, S#st{mod_state = MSt}}.
@@ -987,35 +1038,59 @@ send_all(#st{candidates = Cands, workers = Ws}, Msg) ->
do_broadcast_(Ws, Msg).
do_broadcast_(Pids, Msg) when is_list(Pids) ->
+ ?log({'$bcast', Pids, Msg}),
[P ! Msg || P <- Pids],
ok.
+snd(Pid, Msg) ->
+ ?log({'$snd', Pid, Msg}),
+ Pid ! Msg.
+
from_leader(L, ERef, Msg, #st{leader = L, election_ref = ERef,
mod = M, mod_state = MSt} = S) ->
callback(M:from_leader(Msg, MSt, opaque(S)), S);
-from_leader(_OtherL, _ERef, _Msg, S) ->
- ?event({ignoring_from_leader, _OtherL, _Msg}, S),
- S.
+from_leader(OtherL, ERef, _Msg, S) ->
+ ?event({possible_leader_conflict, OtherL, _Msg}, S),
+ S1 = refresh_vector(S),
+ case S1#st.vector of
+ #{leader := Lv} when Lv =/= OtherL ->
+ set_leader_uncertain(S1);
+ _ ->
+ request_sync(OtherL, ERef, S)
+ end.
-leader_announced(L, ERef, Msg, #st{leader = L, election_ref = ERef,
+leader_announced(L, ERef, Msg, #st{election_ref = ERef,
mod = M, mod_state = MSt} = S) ->
callback(M:surrendered(MSt, Msg, opaque(S)),
- S#st{synced = [], synced_workers = []});
+ S#st{leader = L, synced = [], synced_workers = []});
leader_announced(L, ERef, Msg, #st{mod = M, mod_state = MSt} = S) ->
- %% Ref = erlang:monitor(process, L),
- %% put({?MODULE,monitor,Ref}, candidate),
- S1 = S#st{leader = L, election_ref = ERef,
- synced = [], synced_workers = []},
- callback(M:surrendered(MSt, Msg, opaque(S1)), S1).
-
-leader_affirmed(L, ERef, #st{election_ref = ERef} = S) ->
- S#st{leader = L};
+ #st{vector = V} = S1 = refresh_vector(S),
+ {_,_,Vl} = ERef,
+ case Vl == V of
+ true ->
+ ?event({vectors_same, V}),
+ S2 = S1#st{leader = L, election_ref = ERef,
+ synced = [], synced_workers = []},
+ callback(M:surrendered(MSt, Msg, opaque(S1)), S2);
+ false ->
+ ?event({vectors_differ, {Vl, V}}),
+ set_leader_uncertain(S1)
+ end.
+
+leader_affirmed(L, ERef, #st{leader = L, election_ref = ERef} = S) ->
+ ?event({leader_affirmed_known, L, ERef}),
+ S;
leader_affirmed(_L, _ERef, #st{leader = Me} = S) when Me == self() ->
+ ?event({leader_not_affirmed, _L, _ERef}),
set_leader_uncertain(S);
-leader_affirmed(L, _ERef, #st{} = S) ->
- %% don't set election_ref, since we are not yet synced
- L ! {?MODULE, ensure_sync, self(), S#st.role},
- S#st{leader = L}.
+leader_affirmed(L, ERef, #st{} = S) ->
+ %% don't set leader, since we are not yet synced (return to safe_loop)
+ ?event({ensuring_sync, L, S#st.role}),
+ request_sync(L, ERef, S).
+
+request_sync(L, ERef, S) ->
+ snd(L, {?MODULE, ensure_sync, self(), S#st.role}),
+ S#st{leader = undefined, election_ref = ERef}.
set_leader_uncertain(#st{agent = A} = S) ->
send_all(S, {?MODULE, leader_uncertain, self(),
@@ -1043,3 +1118,38 @@ get_opt(K, Opts, Default) ->
asynch_ping(N) ->
rpc:cast(N, erlang, is_atom, [true]).
+
+refresh_vector(#st{agent = A} = S) ->
+ refresh_vector(locks_agent:lock_info(A), S).
+
+refresh_vector(LockInfo, #st{lock = L} = S) ->
+ maybe_refresh_eref(S#st{vector = vector(L, LockInfo)}).
+
+maybe_refresh_eref(#st{election_ref = {Me,_,Ve}, vector = V} = S)
+ when Me == self(),
+ Ve =/= V ->
+ S#st{election_ref = new_election_ref(S)};
+maybe_refresh_eref(S) ->
+ S.
+
+vector(Lock, {_Pending, Locks}) ->
+ %% As a matter of implementation, the list of locks happens to be ordered,
+ %% but this is not a documented fact.
+ NewVector = lists:sort(
+ [{N, V} || #lock{object = {L, N}, version = V} <- Locks,
+ L =:= Lock]),
+ case length(lists:usort([lock_holder(Lx) || Lx <- Locks])) == 1 of
+ true ->
+ Leader = lock_holder(hd(Locks)),
+ #{leader => Leader, vector => NewVector};
+ false ->
+ #{leader => none, vector => NewVector}
+ end.
+
+lock_holder(#lock{queue = [#w{entries = [#entry{agent = A}]}|_]}) ->
+ A.
+
+-ifdef(LOCKS_DEBUG).
+dbg_log(X, #st{leader = L, vector = V}) ->
+ ?log(#{x => X, l => L, v => V}).
+-endif.
diff --git a/src/locks_server.erl b/src/locks_server.erl
index bd61b0e..c7fd9e0 100644
--- a/src/locks_server.erl
+++ b/src/locks_server.erl
@@ -15,7 +15,7 @@
-behavior(gen_server).
-export([lock/2, lock/3, lock/4,
- surrender/2, surrender/3,
+ surrender/3, surrender/4,
watch/2, unwatch/2,
remove_agent/1, remove_agent/2]).
@@ -33,21 +33,8 @@
-export([record_fields/1]).
--define(event(E), event(?LINE, E, none)).
--define(event(E, S), event(?LINE, E, S)).
-
-include("locks.hrl").
-
--define(LOCKS, locks_server_locks).
--define(AGENTS, locks_server_agents).
-
--record(st, {tabs = {ets:new(?LOCKS, [public, named_table,
- ordered_set, {keypos, 2}]),
- ets:new(?AGENTS, [public, named_table, ordered_set])},
- monitors = dict:new(),
- notify_as = self()}).
-
--record(surr, {id, mode, type, client, vsn, lock}).
+-include("locks_server.hrl").
record_fields(st) ->
record_info(fields, st);
@@ -109,14 +96,14 @@ unwatch(LockID, Nodes) ->
_ = [cast({?LOCKER, N}, Msg) || N <- Nodes],
ok.
-surrender(LockID, Node) ->
- surrender_(LockID, Node, self()).
+surrender(LockID, Node, Vsn) ->
+ surrender_(LockID, Node, Vsn, self()).
-surrender(LockID, Node, TID) ->
- surrender_(LockID, Node, {self(), TID}).
+surrender(LockID, Node, Vsn, TID) ->
+ surrender_(LockID, Node, Vsn, {self(), TID}).
-surrender_(LockID, Node, Agent) ->
- cast({?LOCKER, Node}, {surrender, LockID, Agent}).
+surrender_(LockID, Node, Vsn, Agent) ->
+ cast({?LOCKER, Node}, {surrender, LockID, Vsn, Agent}).
remove_agent(Nodes) when is_list(Nodes) ->
[cast({?LOCKER, N}, {remove_agent, self()}) || N <- Nodes],
@@ -140,10 +127,11 @@ handle_cast({lock, LockID, Agent, Client, Mode} = _Msg, #st{tabs = Tabs} = S) ->
?event({updated, Updated}),
notify(Updated, S#st.notify_as),
{noreply, monitor_agent(Agent, S)};
-handle_cast({surrender, LockID, Agent} = _Msg, #st{tabs = Tabs} = S) ->
+handle_cast({surrender, LockID, Vsn, Agent} = _Msg, #st{tabs = Tabs} = S) ->
?event({handle_cast, _Msg}),
- Updated = do_surrender(LockID, Agent, Tabs),
- notify(Updated, S#st.notify_as, {surrender, Agent}),
+ Updated = do_surrender(LockID, Vsn, Agent, Tabs),
+ ?event({updated, Updated}),
+ notify(Updated, S#st.notify_as, {surrender, Agent, Vsn}),
{noreply, S};
handle_cast({remove_agent, Agent} = _Msg, #st{tabs = Tabs} = S) ->
?event({handle_cast, _Msg}),
@@ -188,41 +176,16 @@ code_change(_FromVsn, S, _Extra) ->
%% ==== End Server callbacks =================================
-notify(Locks, Me) ->
- notify(Locks, Me, []).
+notify(Locks, As) ->
+ locks_server_lib:notify(Locks, As, []).
-notify([#lock{queue = Q, watchers = W} = H|T], Me, Note) ->
- ?event({notify, Q, W}),
- Msg = #locks_info{lock = H, note = Note},
- _ = [send(A, Msg) || #entry{agent = A} <- queue_entries(Q)],
- _ = [send(P, Msg) || P <- W],
- notify(T, Me, Note);
-notify([], _, _) ->
- ok.
+notify(Locks, As, Note) ->
+ locks_server_lib:notify(Locks, As, Note).
-send(Pid, Msg) when is_pid(Pid) ->
- ?event({send, Pid, Msg}),
- Pid ! Msg;
-send({Agent,_} = _A, Msg) when is_pid(Agent) ->
- ?event({send, Agent, Msg}),
- Agent ! Msg.
-
-queue_entries(Q) ->
- Res = queue_entries_(Q),
- ?event({queue_entries, Q, Res}),
- Res.
-
-queue_entries_([#r{entries = Es}|Q]) ->
- Es ++ queue_entries_(Q);
-queue_entries_([#w{entries = Es}|Q]) ->
- Es ++ queue_entries_(Q);
-queue_entries_([]) ->
- [].
-
-insert(ID, Agent, Client, Mode, {Locks, Tids})
+insert(ID, Agent, Client, Mode, {Locks, Tids, Admin})
when is_list(ID), Mode==read; Mode==write ->
Related = related_locks(ID, Locks),
- NewVsn = new_vsn(Related),
+ NewVsn = new_vsn(Admin),
{Check, Result} = insert_agent(Related, ID, Agent, Client, Mode, NewVsn),
?event({insert_agent, ID, Agent, {Check, Result}}),
ets_insert(Tids, [{{Agent,ID1}} || #lock{object = ID1} <- Result]),
@@ -230,7 +193,7 @@ insert(ID, Agent, Client, Mode, {Locks, Tids})
ets_insert(Locks, Result),
Result.
-insert_watcher(ID, Pid, {Locks, Tids}) ->
+insert_watcher(ID, Pid, {Locks, Tids, _Admin}) ->
case ets_lookup(Locks, ID) of
[#lock{queue = Q, watchers = Ws} = L] ->
L1 = L#lock{watchers = [Pid | Ws -- [Pid]]},
@@ -245,7 +208,7 @@ insert_watcher(ID, Pid, {Locks, Tids}) ->
end,
ets_insert(Tids, {{Pid, ID}}).
-delete_watcher(ID, Pid, {Locks, Tids}) ->
+delete_watcher(ID, Pid, {Locks, Tids, _Admin}) ->
case ets_lookup(Locks, ID) of
[#lock{watchers = Ws} = L] ->
ets_insert(Locks, L#lock{watchers = Ws -- [Pid]}),
@@ -310,21 +273,33 @@ fold_queue(_, Acc, []) ->
%% 5. Remove agent A from all locks in the found groups.
%% 6. Re-apply the direct locks.
%% 7. Increment versions of updated locks.
-do_surrender(ID, A, {Locks, Agents}) ->
- Related = related_locks(ID, A, Agents, Locks),
+do_surrender(ID, Vsn, A, {Locks, Agents, Admin}) ->
+ case find_lock(Locks, ID) of
+ {ok, #lock{version = Vsn}} ->
+ do_surrender(related_locks(ID, A, Agents, Locks),
+ ID, A, Locks, Agents, Admin);
+ Other ->
+ ?event({lock_not_found_or_wrong_vsn, ID, Other}),
+ []
+ end.
+
+do_surrender([], _, _, _, _, _) ->
+ %% Found no locks matching the parameters
+ [];
+do_surrender([_|_] = Related, ID, A, Locks, Agents, Admin) ->
+ ?event({related_locks, Related}),
Groups = group_related(Related, A),
AffectedGroups = affected_groups(Groups, ID), % list of lists
Reapply = [hd(G) || G <- AffectedGroups],
AllLocks = lists:ukeysort(#surr.id, lists:append(AffectedGroups)),
+ NewVsn = new_vsn(Admin),
AllLocks1 =
- [remove_agent_from_lock(A, S#surr.lock) || S <- AllLocks],
+ [remove_agent_from_lock(A, S#surr.lock, NewVsn) || S <- AllLocks],
{Check, Updated} =
lists:foldl(
- fun(#surr{id = L, mode = Mode,
- client = Client,
- lock = #lock{version = V}}, {Chk, Acc}) ->
+ fun(#surr{id = L, mode = Mode, client = Client}, {Chk, Acc}) ->
{Check, Upd} =
- insert_agent(Acc, L, A, Client, Mode, V),
+ insert_agent(Acc, L, A, Client, Mode, NewVsn),
{Check orelse Chk,
lists:foldl(fun(Lock, Acc2) ->
lists:keyreplace(
@@ -388,107 +363,44 @@ affected_groups(Groups, ID) ->
process_updated([#lock{object = ID,
queue = [],
- watchers = Ws,
- version = V} = H|T], A, Locks, Agents) ->
+ watchers = Ws} = H|T], A, Locks, Agents) ->
ets_delete(Agents, {A, ID}),
if Ws =:= [] ->
ets_delete(Locks, ID),
process_updated(T, A, Locks, Agents);
true ->
- H1 = H#lock{version = V+1},
- ets_insert(Locks, H1),
- [H1 | process_updated(T, A, Locks, Agents)]
+ ets_insert(Locks, H),
+ [H | process_updated(T, A, Locks, Agents)]
end;
-process_updated([#lock{version = V} = H|T], A, Locks, Agents) ->
- H1 = H#lock{version = V+1},
- ets_insert(Locks, H1),
- [H1 | process_updated(T, A, Locks, Agents)];
+process_updated([H|T], A, Locks, Agents) ->
+ ets:insert(Locks, H),
+ [H | process_updated(T, A, Locks, Agents)];
process_updated([], _, _, _) ->
[].
-do_remove_agent(A, {Locks, Agents}) ->
- Found = ets_select(Agents, [{ {{A,'$1'}}, [], [{{A,'$1'}}] }]),
- ?event([{removing_agent, A}, {found, Found}]),
- ets:select_delete(Agents, [{ {{A,'_'}}, [], [true] }]),
- do_remove_agent_(Found, Locks, []).
+do_remove_agent(A, Tabs) ->
+ locks_server_lib:remove_agent(A, Tabs).
monitor_agent(A, #st{monitors = Mons} = S) when is_pid(A) ->
- case dict:find(A, Mons) of
- {ok, _} ->
+ case maps:is_key(A, Mons) of
+ true ->
S;
- error ->
+ false ->
Ref = erlang:monitor(process, A),
- S#st{monitors = dict:store(A, Ref, Mons)}
+ S#st{monitors = Mons#{A => Ref}}
end.
demonitor_agent(A, #st{monitors = Mons} = S) when is_pid(A) ->
- case dict:find(A, Mons) of
+ case maps:find(A, Mons) of
{ok, Ref} ->
erlang:demonitor(Ref),
- S#st{monitors = dict:erase(A, Mons)};
+ S#st{monitors = maps:remove(A, Mons)};
error ->
S
end.
-
-do_remove_agent_([{A, ID}|T], Locks, Acc) ->
- case ets_lookup(Locks, ID) of
- [] ->
- do_remove_agent_(T, Locks, Acc);
- [#lock{} = L] ->
- case remove_agent_from_lock(A, L) of
- #lock{queue = [], watchers = []} ->
- ets_delete(Locks, ID),
- do_remove_agent_(T, Locks, Acc);
- #lock{} = L1 ->
- do_remove_agent_(T, Locks, [L1|Acc])
- end
- end;
-do_remove_agent_([], Locks, Acc) ->
- ets_insert(Locks, [L || #lock{queue = Q,
- watchers = Ws} = L <- Acc,
- Q =/= [] orelse Ws =/= []]),
- Acc.
-
-remove_agent_from_lock(A, #lock{version = V, queue = Q, watchers = Ws} = L) ->
- Q1 = trivial_lock_upgrade(
- lists:foldr(
- fun(#r{entries = [#entry{agent = Ax}]}, Acc1) when Ax == A ->
- Acc1;
- (#r{entries = Es}, Acc1) ->
- case lists:keydelete(A, #entry.agent, Es) of
- [] -> Acc1;
- Es1 ->
- [#r{entries = Es1} | Acc1]
- end;
- (#w{entries = Es}, Acc1) ->
- case lists:keydelete(A, #entry.agent, Es) of
- [] -> Acc1;
- Es1 ->
- [#w{entries = Es1} | Acc1]
- end;
- (E, Acc1) ->
- [E|Acc1]
- end, [], Q)),
- L#lock{version = V+1, queue = Q1,
- watchers = Ws -- [A]}.
-
-trivial_lock_upgrade([#r{entries = [#entry{agent = A}]} |
- [#w{entries = [#entry{agent = A}]} | _] = T]) ->
- T;
-trivial_lock_upgrade([#r{entries = Es}|[_|_] = T] = Q) ->
- %% Not so trivial, perhaps
- case lists:all(fun(#entry{agent = A}) ->
- in_queue(T, A, write)
- end, Es) of
- true ->
- %% All agents holding the read lock are also waiting for an upgrade
- trivial_lock_upgrade(T);
- false ->
- Q
- end;
-trivial_lock_upgrade(Q) ->
- Q.
+remove_agent_from_lock(Agent, Lock, NewVsn) ->
+ locks_server_lib:remove_agent_from_lock(Agent, Lock, NewVsn).
related_locks(ID, A, Agents, Locks) ->
Pats = agent_patterns(ID, A, []),
@@ -501,7 +413,7 @@ get_locks([ID|T], _A, ID, _Vis, _Agents, Locks) ->
get_locks([H|T], A, ID, Vis, Agents, Locks) ->
#lock{queue = Q} = L = get_lock(Locks, H),
case find_first_entry(Q, A) of
- #entry{type = indirect} ->
+ {_Mode, #entry{type = indirect}} ->
%% Must include possible direct-lock children that were not
%% in the original set (since they intersect in indirect-lock
%% parents). Don't fetch any lock twice.
@@ -527,29 +439,19 @@ get_lock(T, Id) ->
[L] = ets_lookup(T, Id),
L.
-related_locks(ID, T) ->
- Pats = make_patterns(ID),
- ets_select(T, Pats).
-
-make_patterns(ID) ->
- make_patterns(ID, []).
-
-make_patterns([H|T], Acc) ->
- ID = Acc ++ [H],
- [{ #lock{object = ID, _ = '_'}, [], ['$_'] }
- | make_patterns(T, ID)];
-make_patterns([], Acc) ->
- [{ #lock{object = Acc ++ '_', _ = '_'}, [], ['$_'] }].
-
+find_lock(T, Id) ->
+ case ets_lookup(T, Id) of
+ [] ->
+ error;
+ [L] ->
+ {ok, L}
+ end.
-new_vsn(Locks) ->
- Current =
- lists:foldl(
- fun(#lock{version = V}, Acc) ->
- erlang:max(V, Acc)
- end, 0, Locks),
- Current + 1.
+related_locks(ID, T) ->
+ locks_server_lib:related_locks(ID, T).
+new_vsn(Tab) ->
+ locks_server_lib:new_vsn(Tab).
insert_agent([], ID, A, Client, Mode, Vsn) ->
%% No related locks; easy case.
@@ -606,94 +508,11 @@ insert_agent([_|_] = Related, ID, A, Client, Mode, Vsn) ->
end
end.
-in_queue([H|T], A, Mode) ->
- case in_queue_(H, A, Mode) of
- false -> in_queue(T, A, Mode);
- true -> true
- end;
-in_queue([], _, _) ->
- false.
-
-in_queue_(#r{entries = Entries}, A, read) ->
- lists:keymember(A, #entry.agent, Entries);
-in_queue_(#w{entries = Es}, A, M) when M==read; M==write ->
- lists:keymember(A, #entry.agent, Es);
-in_queue_(_, _, _) ->
- false.
-
-into_queue(read, [#r{entries = Entries}] = Q, Entry) ->
- %% No pending write locks
- case lists:keymember(Entry#entry.agent, #entry.agent, Entries) of
- true -> Q;
- false ->
- [#r{entries = [Entry|Entries]}]
- end;
-into_queue(read, [], Entry) ->
- [#r{entries = [Entry]}];
-into_queue(write, [], Entry) ->
- [#w{entries = [Entry]}];
-into_queue(write, [#r{entries = [Er]} = H], Entry) ->
- if Entry#entry.agent == Er#entry.agent ->
- %% upgrade to write lock
- [#w{entries = [Entry]}];
- true ->
- [H, #w{entries = [Entry]}]
- end;
-into_queue(write, [#w{entries = [#entry{agent = A}]}],
- #entry{agent = A, type = direct} = Entry) ->
- %% Refresh and ensure it's a direct lock
- [#w{entries = [Entry]}];
-into_queue(Type, [H|T], #entry{agent = A, type = direct} = Entry) ->
- case H of
- #w{entries = [_,_|_]} ->
- %% This means that there are multiple exclusive write locks at
- %% a lower level; they can co-exist, but neither has an exclusive
- %% claim at this level. Queue request
- [H | into_queue(Type, T, Entry)];
- #w{entries = Es} when Type == write; Type == read ->
- %% If a matching entry exists, we set to new version and
- %% set type to direct. This means we might get a direct write
- %% lock even though we asked for a read lock.
- maybe_refresh(Es, H, T, Type, A, Entry);
- #r{entries = Es} when Type == read ->
- maybe_refresh(Es, H, T, Type, A, Entry);
- #r{entries = Es} when Type == write ->
- %% A special case is when all agents holding read entries have
- %% asked for an upgrade.
- case lists:all(fun(#entry{agent = A1}) when A1 == A -> true;
- (#entry{agent = A1}) -> in_queue(T, A1, write)
- end, Es) of
- true ->
- %% discard all read entries
- into_queue(write, T, Entry);
- false ->
- [H | into_queue(Type, T, Entry)]
- end;
- _ ->
- [H | into_queue(Type, T, Entry)]
- end;
-into_queue(Type, [H|T] = Q, Entry) ->
- case in_queue_(H, Entry#entry.agent, Type) of
- false ->
- [H|into_queue(Type, T, Entry)];
- true ->
- Q
- end.
-
-maybe_refresh(Es, H, T, Type, A, Entry) ->
- case lists:keyfind(A, #entry.agent, Es) of
- #entry{} = E ->
- Es1 = lists:keyreplace(A, #entry.agent, Es,
- E#entry{type = Entry#entry.type,
- version = Entry#entry.version}),
- case H of
- #w{} -> [H#w{entries = Es1} | T];
- #r{} -> [H#r{entries = Es1} | T]
- end;
- false ->
- [H | into_queue(Type, T, Entry)]
- end.
+in_queue(Q, A, Mode) ->
+ locks_server_lib:in_queue(Q, A, Mode).
+into_queue(Mode, Q, Entry) ->
+ locks_server_lib:into_queue(Mode, Q, Entry).
append_entry([#lock{queue = Q} = H|T], Entry, Mode, Vsn) ->
[H#lock{version = Vsn, queue = into_queue(Mode, Q, Entry)}
@@ -791,75 +610,8 @@ upgrade_entries([#entry{agent = A, version = Vsn} = E|Esr], Esw, Accr) ->
upgrade_entries(Esr, Esw, [E|Accr])
end.
-split_related(Related, ID) ->
- case length(ID) of
- 1 ->
- case Related of
- [#lock{object = [_]} = Mine|Ch] -> {[], Mine, Ch};
- Ch -> {[], [], Ch}
- end;
- 2 -> split2(Related, ID);
- 3 -> split3(Related, ID);
- 4 -> split4(Related, ID);
- L -> split(Related, L, ID)
- end.
-
-%%% =====
-
-%% Generic split, will call length/1 on each ID
-split(Locks, Len, ID) ->
- {Parents, Locks1} = split_ps(Locks, Len, []),
- split_(Locks1, ID, Parents).
-
-split_ps([#lock{object = I}=H|T], Len, Acc) when length(I) < Len ->
- split_ps(T, Len, [H|Acc]);
-split_ps(L, _, Acc) ->
- {lists:reverse(Acc), L}.
-
-split_([#lock{object = ID} = Mine|Ch], ID, Parents) ->
- {Parents, Mine, Ch};
-split_(Ch, ID, Parents) ->
- {Parents, #lock{object = ID}, Ch}.
-
-%% Optimized split for ID length 2
-split2([#lock{object = [_]} = H|T], ID) -> split2(T, [H], ID);
-split2(L, ID) -> split2(L, [], ID).
-
-split2([#lock{object = ID} = Mine|Ch], Ps, ID) -> {Ps, Mine, Ch};
-split2(Ch, Ps, _ID) -> {Ps, [], Ch}.
-
-%% Optimized split for ID length 3
-split3(L, ID) ->
- case L of
- [#lock{object = [_]} = P1, #lock{object = [_,_]} = P2|T] ->
- split3(T, [P1,P2], ID);
- [#lock{object = [_]} = P1|T] ->
- split3(T, [P1], ID);
- [#lock{object = [_,_]} = P2|T] ->
- split3(T, [P2], ID);
- [#lock{object = ID} = Mine|T] ->
- {[], Mine, T};
- [_|_] ->
- {[], [], L}
- end.
-
-split3([#lock{object = ID} = Mine|Ch], Ps, ID) -> {Ps, Mine, Ch};
-split3(Ch, Ps, _ID) -> {Ps, [], Ch}.
-
-split4(L, ID) ->
- {Ps, L1} = split4_ps(L, []),
- split4(L1, Ps, ID).
-
-split4_ps([#lock{object = [_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
-split4_ps([#lock{object = [_,_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
-split4_ps([#lock{object = [_,_,_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
-split4_ps(L, Acc) -> {lists:reverse(Acc), L}.
-
-split4([#lock{object = [_,_,_,_]} = Mine|Ch], Ps, _) -> {Ps, Mine, Ch};
-split4(Ch, Ps, _ID) -> {Ps, [], Ch}.
-
-event(_Line, _Msg, _St) ->
- ok.
+split_related(Related, Id) ->
+ locks_server_lib:split_related(Related, Id).
ets_insert(T, Data) -> ets:insert(T, Data).
ets_lookup(T, K) -> ets:lookup(T, K).
diff --git a/src/locks_server.hrl b/src/locks_server.hrl
new file mode 100644
index 0000000..8468071
--- /dev/null
+++ b/src/locks_server.hrl
@@ -0,0 +1,21 @@
+
+
+-define(LOCKS, locks_server_locks).
+-define(AGENTS, locks_server_agents).
+-define(ADMIN, locks_server_admin).
+
+-record(st, {tabs = {ets:new(?LOCKS, [public, named_table,
+ ordered_set, {keypos, 2}]),
+ ets:new(?AGENTS, [public, named_table, ordered_set]),
+ ets:new(?ADMIN, [public, named_table, set])},
+ monitors = #{},
+ notify_as = self()}).
+
+-record(surr, {id, mode, type, client, vsn, lock}).
+
+-define(event(E), event(?LINE, E, none)).
+-define(event(E, S), event(?LINE, E, S)).
+
+event(_Line, _Msg, _St) ->
+ ok.
+
diff --git a/src/locks_server_lib.erl b/src/locks_server_lib.erl
new file mode 100644
index 0000000..20c7d94
--- /dev/null
+++ b/src/locks_server_lib.erl
@@ -0,0 +1,296 @@
+-module(locks_server_lib).
+
+-export([
+ new_vsn/1
+ , remove_agent/2
+ , remove_agent_from_lock/3
+ , related_locks/2
+ , split_related/2
+ , in_queue/3
+ , into_queue/3
+ , notify/3
+ ]).
+-include("locks.hrl").
+-include("locks_server.hrl").
+
+new_vsn(Tab) ->
+ ets:update_counter(Tab, version, {2, 1}, {version, 1}).
+
+notify([#lock{queue = Q, watchers = W} = H|T], Me, Note) ->
+ ?event({notify, Q, W}),
+ Msg = #locks_info{lock = H, note = Note},
+ _ = [send(A, Msg) || A <- queue_agents(Q)],
+ _ = [send(P, Msg) || P <- W],
+ notify(T, Me, Note);
+notify([], _, _) ->
+ ok.
+
+send(Pid, Msg) when is_pid(Pid) ->
+ ?event({send, Pid, Msg}),
+ Pid ! Msg.
+%% send({Agent,_} = _A, Msg) when is_pid(Agent) ->
+%% ?event({send, Agent, Msg}),
+%% Agent ! Msg.
+
+%% In the case of agents waiting for lock upgrade, there may be
+%% more than one entry from a given agent. Since the agent performs
+%% considerable work on each lock info, it's much cheaper if we avoid
+%% sending duplicate notifications for a given lock.
+queue_agents(Q) ->
+ lists:usort([A || #entry{agent = A} <- queue_entries(Q)]).
+
+queue_entries(Q) ->
+ Res = queue_entries_(Q),
+ ?event({queue_entries, Q, Res}),
+ Res.
+
+queue_entries_([#r{entries = Es}|Q]) ->
+ Es ++ queue_entries_(Q);
+queue_entries_([#w{entries = Es}|Q]) ->
+ Es ++ queue_entries_(Q);
+queue_entries_([]) ->
+ [].
+
+in_queue([H|T], A, Mode) ->
+ case in_queue_(H, A, Mode) of
+ false -> in_queue(T, A, Mode);
+ true -> true
+ end;
+in_queue([], _, _) ->
+ false.
+
+in_queue_(#r{entries = Entries}, A, read) ->
+ lists:keymember(A, #entry.agent, Entries);
+in_queue_(#w{entries = Es}, A, M) when M==read; M==write ->
+ lists:keymember(A, #entry.agent, Es);
+in_queue_(_, _, _) ->
+ false.
+
+into_queue(read, [#r{entries = Entries}] = Q, Entry) ->
+ %% No pending write locks
+ case lists:keymember(Entry#entry.agent, #entry.agent, Entries) of
+ true -> Q;
+ false ->
+ [#r{entries = [Entry|Entries]}]
+ end;
+into_queue(read, [], Entry) ->
+ [#r{entries = [Entry]}];
+into_queue(write, [], Entry) ->
+ [#w{entries = [Entry]}];
+into_queue(write, [#r{entries = [Er]} = H], Entry) ->
+ if Entry#entry.agent == Er#entry.agent ->
+ %% upgrade to write lock
+ [#w{entries = [Entry]}];
+ true ->
+ [H, #w{entries = [Entry]}]
+ end;
+into_queue(write, [#w{entries = [#entry{agent = A}]}],
+ #entry{agent = A, type = direct} = Entry) ->
+ %% Refresh and ensure it's a direct lock
+ [#w{entries = [Entry]}];
+into_queue(Type, [H|T], #entry{agent = A, type = direct} = Entry) ->
+ case H of
+ #w{entries = [_,_|_]} ->
+ %% This means that there are multiple exclusive write locks at
+ %% a lower level; they can co-exist, but neither has an exclusive
+ %% claim at this level. Queue request
+ [H | into_queue(Type, T, Entry)];
+ #w{entries = Es} when Type == write; Type == read ->
+ %% If a matching entry exists, we set to new version and
+ %% set type to direct. This means we might get a direct write
+ %% lock even though we asked for a read lock.
+ maybe_refresh(Es, H, T, Type, A, Entry);
+ #r{entries = Es} when Type == read ->
+ maybe_refresh(Es, H, T, Type, A, Entry);
+ #r{entries = Es} when Type == write ->
+ %% A special case is when all agents holding read entries have
+ %% asked for an upgrade.
+ case lists:all(fun(#entry{agent = A1}) when A1 == A -> true;
+ (#entry{agent = A1}) -> in_queue(T, A1, write)
+ end, Es) of
+ true ->
+ %% discard all read entries
+ into_queue(write, T, Entry);
+ false ->
+ [H | into_queue(Type, T, Entry)]
+ end;
+ _ ->
+ [H | into_queue(Type, T, Entry)]
+ end;
+into_queue(Type, [H|T] = Q, Entry) ->
+ case in_queue_(H, Entry#entry.agent, Type) of
+ false ->
+ [H|into_queue(Type, T, Entry)];
+ true ->
+ Q
+ end.
+
+remove_agent(A, {Locks, Agents, Admin}) ->
+ case ets_select(Agents, [{ {{A,'$1'}}, [], [{{A,'$1'}}] }]) of
+ [] ->
+ [];
+ [_|_] = Found ->
+ ?event([{removing_agent, A}, {found, Found}]),
+ ets:select_delete(Agents, [{ {{A,'_'}}, [], [true] }]),
+ NewVsn = new_vsn(Admin),
+ remove_agent_(Found, Locks, NewVsn, [])
+ end.
+
+remove_agent_([{A, ID}|T], Locks, Vsn, Acc) ->
+ case ets_lookup(Locks, ID) of
+ [] ->
+ remove_agent_(T, Locks, Vsn, Acc);
+ [#lock{} = L] ->
+ case remove_agent_from_lock(A, L, Vsn) of
+ #lock{queue = [], watchers = []} ->
+ ets_delete(Locks, ID),
+ remove_agent_(T, Locks, Vsn, Acc);
+ #lock{} = L1 ->
+ remove_agent_(T, Locks, Vsn, [L1|Acc])
+ end
+ end;
+remove_agent_([], Locks, _Vsn, Acc) ->
+ ets_insert(Locks, [L || #lock{queue = Q,
+ watchers = Ws} = L <- Acc,
+ Q =/= [] orelse Ws =/= []]),
+ Acc.
+
+remove_agent_from_lock(A, #lock{queue = Q, watchers = Ws} = L, Vsn) ->
+ Q1 = trivial_lock_upgrade(
+ lists:foldr(
+ fun(#r{entries = [#entry{agent = Ax}]}, Acc1) when Ax == A ->
+ Acc1;
+ (#r{entries = Es}, Acc1) ->
+ case lists:keydelete(A, #entry.agent, Es) of
+ [] -> Acc1;
+ Es1 ->
+ [#r{entries = Es1} | Acc1]
+ end;
+ (#w{entries = Es}, Acc1) ->
+ case lists:keydelete(A, #entry.agent, Es) of
+ [] -> Acc1;
+ Es1 ->
+ [#w{entries = Es1} | Acc1]
+ end;
+ (E, Acc1) ->
+ [E|Acc1]
+ end, [], Q)),
+ L#lock{version = Vsn, queue = Q1,
+ watchers = Ws -- [A]}.
+
+trivial_lock_upgrade([#r{entries = [#entry{agent = A}]} |
+ [#w{entries = [#entry{agent = A}]} | _] = T]) ->
+ T;
+trivial_lock_upgrade([#r{entries = Es}|[_|_] = T] = Q) ->
+ %% Not so trivial, perhaps
+ case lists:all(fun(#entry{agent = A}) ->
+ in_queue(T, A, write)
+ end, Es) of
+ true ->
+ %% All agents holding the read lock are also waiting for an upgrade
+ trivial_lock_upgrade(T);
+ false ->
+ Q
+ end;
+trivial_lock_upgrade(Q) ->
+ Q.
+
+maybe_refresh(Es, H, T, Type, A, Entry) ->
+ case lists:keyfind(A, #entry.agent, Es) of
+ #entry{} = E ->
+ Es1 = lists:keyreplace(A, #entry.agent, Es,
+ E#entry{type = Entry#entry.type,
+ version = Entry#entry.version}),
+ case H of
+ #w{} -> [H#w{entries = Es1} | T];
+ #r{} -> [H#r{entries = Es1} | T]
+ end;
+ false ->
+ [H | into_queue(Type, T, Entry)]
+ end.
+
+related_locks(ID, T) ->
+ Pats = make_patterns(ID),
+ ets_select(T, Pats).
+
+make_patterns(ID) ->
+ make_patterns(ID, []).
+
+make_patterns([H|T], Acc) ->
+ ID = Acc ++ [H],
+ [{ #lock{object = ID, _ = '_'}, [], ['$_'] }
+ | make_patterns(T, ID)];
+make_patterns([], Acc) ->
+ [{ #lock{object = Acc ++ '_', _ = '_'}, [], ['$_'] }].
+
+split_related(Related, ID) ->
+ case length(ID) of
+ 1 ->
+ case Related of
+ [#lock{object = [_]} = Mine|Ch] -> {[], Mine, Ch};
+ Ch -> {[], [], Ch}
+ end;
+ 2 -> split2(Related, ID);
+ 3 -> split3(Related, ID);
+ 4 -> split4(Related, ID);
+ L -> split(Related, L, ID)
+ end.
+
+%%% =====
+
+%% Generic split, will call length/1 on each ID
+split(Locks, Len, ID) ->
+ {Parents, Locks1} = split_ps(Locks, Len, []),
+ split_(Locks1, ID, Parents).
+
+split_ps([#lock{object = I}=H|T], Len, Acc) when length(I) < Len ->
+ split_ps(T, Len, [H|Acc]);
+split_ps(L, _, Acc) ->
+ {lists:reverse(Acc), L}.
+
+split_([#lock{object = ID} = Mine|Ch], ID, Parents) ->
+ {Parents, Mine, Ch};
+split_(Ch, ID, Parents) ->
+ {Parents, #lock{object = ID}, Ch}.
+
+%% Optimized split for ID length 2
+split2([#lock{object = [_]} = H|T], ID) -> split2(T, [H], ID);
+split2(L, ID) -> split2(L, [], ID).
+
+split2([#lock{object = ID} = Mine|Ch], Ps, ID) -> {Ps, Mine, Ch};
+split2(Ch, Ps, _ID) -> {Ps, [], Ch}.
+
+%% Optimized split for ID length 3
+split3(L, ID) ->
+ case L of
+ [#lock{object = [_]} = P1, #lock{object = [_,_]} = P2|T] ->
+ split3(T, [P1,P2], ID);
+ [#lock{object = [_]} = P1|T] ->
+ split3(T, [P1], ID);
+ [#lock{object = [_,_]} = P2|T] ->
+ split3(T, [P2], ID);
+ [#lock{object = ID} = Mine|T] ->
+ {[], Mine, T};
+ [_|_] ->
+ {[], [], L}
+ end.
+
+split3([#lock{object = ID} = Mine|Ch], Ps, ID) -> {Ps, Mine, Ch};
+split3(Ch, Ps, _ID) -> {Ps, [], Ch}.
+
+split4(L, ID) ->
+ {Ps, L1} = split4_ps(L, []),
+ split4(L1, Ps, ID).
+
+split4_ps([#lock{object = [_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
+split4_ps([#lock{object = [_,_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
+split4_ps([#lock{object = [_,_,_]} = H|T], Acc) -> split4_ps(T, [H|Acc]);
+split4_ps(L, Acc) -> {lists:reverse(Acc), L}.
+
+split4([#lock{object = [_,_,_,_]} = Mine|Ch], Ps, _) -> {Ps, Mine, Ch};
+split4(Ch, Ps, _ID) -> {Ps, [], Ch}.
+
+ets_insert(T, Data) -> ets:insert(T, Data).
+ets_lookup(T, K) -> ets:lookup(T, K).
+ets_select(T, Pat) -> ets:select(T, Pat).
+ets_delete(T, K) -> ets:delete(T, K).
diff --git a/src/locks_ttb.erl b/src/locks_ttb.erl
index b76d98f..61dc305 100644
--- a/src/locks_ttb.erl
+++ b/src/locks_ttb.erl
@@ -1,7 +1,7 @@
-module(locks_ttb).
--compile(export_all).
-
+-compile([export_all, nowarn_export_all]).
+-dialyzer({nowarn_function, pp_term/1}).
%% This function is also traced. Can be used to insert markers in the trace
%% log.
@@ -19,6 +19,7 @@ trace_nodes(Ns, Patterns, Flags, Opts) ->
default_patterns() ->
[{locks_agent , event, 3, []},
+ {locks_server, event, 3, []},
{locks_leader, event, 3, []},
{?MODULE , event, 3, []}].
diff --git a/src/locks_window.erl b/src/locks_window.erl
new file mode 100644
index 0000000..b995b73
--- /dev/null
+++ b/src/locks_window.erl
@@ -0,0 +1,73 @@
+-module(locks_window).
+
+-export([new/1,
+ add/2,
+ to_list/1]).
+
+-export([log/2,
+ fetch_log/1]).
+
+-record(w, {n = 0,
+ max = 10,
+ a = [],
+ b = []}).
+
+new(#{n := N}) when is_integer(N), N > 0 ->
+ #w{max = N}.
+
+add(X, #w{n = N0, max = Max, a = A} = W) ->
+ A1 = [X|A],
+ case N0+1 of
+ N1 when N1 > Max ->
+ W#w{n = 0, a = [], b = A1};
+ N1 ->
+ W#w{n = N1, a = A1}
+ end.
+
+to_list(#w{a = A, b = B}) ->
+ lists:reverse(B) ++ lists:reverse(A).
+
+
+log(X, Sz) ->
+ W = add({erlang:system_time(millisecond), X}, get_log(Sz)),
+ put_log(W).
+
+-define(KEY, {?MODULE, w}).
+
+get_log(Sz) ->
+ case get(?KEY) of
+ undefined ->
+ new(#{n => Sz});
+ W ->
+ W
+ end.
+
+put_log(W) ->
+ put(?KEY, W).
+
+
+fetch_log(P) ->
+ PI = case w(P) of
+ undefined ->
+ undefined;
+ Pid when node(Pid) == node() ->
+ process_info(Pid, dictionary);
+ Pid when is_pid(Pid) ->
+ rpc:call(node(Pid), erlang, process_info, [Pid, dictionary])
+ end,
+ fetch_from_info(PI).
+
+fetch_from_info(undefined) ->
+ undefined;
+fetch_from_info({dictionary, D}) ->
+ case lists:keyfind(?KEY, 1, D) of
+ false ->
+ '$no_debug';
+ {_, W} ->
+ to_list(W)
+ end.
+
+w(P) when is_atom(P) ->
+ whereis(P);
+w(P) when is_pid(P) ->
+ P.
diff --git a/test/locks_leader_SUITE.erl b/test/locks_leader_SUITE.erl
index 35149c6..9332927 100644
--- a/test/locks_leader_SUITE.erl
+++ b/test/locks_leader_SUITE.erl
@@ -73,7 +73,6 @@ suite() ->
[].
init_per_suite(Config) ->
- compile_dict(),
application:start(sasl),
Config.
@@ -139,7 +138,13 @@ end_per_testcase(_Case, _Config) ->
%% Test cases
%% ============================================================
-local_dict(_Config) ->
+local_dict(Config) ->
+ with_trace(fun local_dict_/1, Config, "leader_test_local_dict").
+
+local_dict_(_Config) ->
+ dbg:tracer(),
+ dbg:tpl(locks_leader, x),
+ dbg:p(all,[c]),
Name = {gdict, ?LINE},
Dicts = lists:map(
fun(_) ->
@@ -191,20 +196,20 @@ gdict_simple_netsplit_(Config) ->
Dicts = lists:map(fun({ok,D}) -> D end, Results),
wait_for_dicts(Dicts),
[X, X] = [locks_leader:info(Dx, leader) || Dx <- Dicts],
- locks_ttb:event(initial_consensus),
+ locks_ttb:event({?LINE, initial_consensus}),
call_proxy(A, erlang, disconnect_node, [B]),
[] = call_proxy(A, erlang, nodes, []),
[] = call_proxy(B, erlang, nodes, []),
- locks_ttb:event(netsplit_ready),
+ locks_ttb:event({?LINE, netsplit_ready}),
wait_for_dicts(Dicts),
[L1,L2] = [locks_leader:info(Dx, leader) || Dx <- Dicts],
true = (L1 =/= L2),
- locks_ttb:event(reconnecting),
+ locks_ttb:event({?LINE, reconnecting}),
proxy_multicall(Ns, ?MODULE, unbar_nodes, []),
proxy_multicall(Ns, ?MODULE, connect_nodes, [Ns]),
[B] = call_proxy(A, erlang, nodes, []),
[Z,Z] = ?retry([Z,Z], call_proxy(A, ?MODULE, leader_nodes, [Dicts])),
- locks_ttb:event({leader_consensus, Ns, Z}),
+ locks_ttb:event({?LINE, leader_consensus, Ns, Z}),
proxy_multicall(Ns, application, stop, [locks]),
ok.
@@ -222,35 +227,35 @@ gdict_netsplit_(Config) ->
proxy_multicall([A,B], ?MODULE, disconnect_nodes, [Rest]),
[B] = call_proxy(A, erlang, nodes, []),
[A] = call_proxy(B, erlang, nodes, []),
- locks_ttb:event(netsplit_ready),
+ locks_ttb:event({?LINE, netsplit_ready}),
ok = lists:foreach(
fun(ok) -> ok end,
proxy_multicall(Ns, application, start, [locks])),
Results = proxy_multicall(Ns, gdict, new_opt, [[{resource, Name}]]),
[Da,Db|[Dc|_] = DRest] = Dicts = lists:map(fun({ok,Dx}) -> Dx end, Results),
- locks_ttb:event({dicts_created, lists:zip(Ns, Dicts)}),
+ locks_ttb:event({?LINE, dicts_created, lists:zip(Ns, Dicts)}),
ok = ?retry(ok, gdict:store(a, 1, Da)),
ok = gdict:store(b, 2, Dc),
{ok, 1} = ?retry({ok,1}, gdict:find(a, Db)),
error = gdict:find(a, Dc),
[X,X] = [locks_leader:info(Dx, leader) || Dx <- [Da,Db]],
- locks_ttb:event({leader_consensus, [Da,Db], X}),
+ locks_ttb:event({?LINE, leader_consensus, [Da,Db], X}),
RestLeaders = [locks_leader:info(Dx, leader) || Dx <- DRest],
[Y] = lists:usort(RestLeaders),
- locks_ttb:event({leader_consensus, DRest, Y}),
+ locks_ttb:event({?LINE, leader_consensus, DRest, Y}),
true = (X =/= Y),
lists:foreach(
fun(Dx) ->
{ok, 2} = ?retry({ok,2}, gdict:find(b, Dx))
end, DRest),
error = gdict:find(b, Da),
- locks_ttb:event(reconnecting),
+ locks_ttb:event({?LINE, reconnecting}),
proxy_multicall(Ns, ?MODULE, unbar_nodes, []),
proxy_multicall(Ns, ?MODULE, connect_nodes, [Ns]),
[B,C|_] = lists:sort(call_proxy(A, erlang, nodes, [])),
[Z] = ?retry([_],
lists:usort(call_proxy(A, ?MODULE, leader_nodes, [Dicts]))),
- locks_ttb:event({leader_consensus, Ns, Z}),
+ locks_ttb:event({?LINE, leader_consensus, Ns, Z}),
{ok, 1} = ?retry({ok,1}, gdict:find(a, Dc)),
{ok, 2} = ?retry({ok,2}, gdict:find(b, Da)),
[exit(Dx, kill) || Dx <- Dicts],
@@ -311,19 +316,14 @@ with_trace(F, Config, Name) ->
ttb_stop() ->
Dir = locks_ttb:stop(),
+ ct:log("Dir = ~p", [Dir]),
Out = filename:join(filename:dirname(Dir),
filename:basename(Dir) ++ ".txt"),
+ ct:log("Out = ~p", [Out]),
locks_ttb:format(Dir, Out),
ct:log("Formatted trace log in ~s~n", [Out]).
-compile_dict() ->
- Lib = filename:absname(code:lib_dir(locks)),
- Examples = filename:join(Lib, "examples"),
- _ = os:cmd(["cd ", Examples, " && rebar clean compile"]),
- _ = code:add_path(filename:join(Examples, "ebin")),
- ok.
-
maybe_connect(_, []) ->
ok;
maybe_connect(N, [{N1,_}|_]) ->
@@ -399,7 +399,7 @@ call_proxy(N, M, F, A) ->
end.
get_slave_nodes(Config) ->
- [N || {N,_} <- ?config(slaves, Config)].
+ [N || {N,_} <- proplists:get_value(slaves, Config, [])].
start_slaves(Ns) ->
Nodes = [start_slave(N) || N <- Ns],
@@ -473,7 +473,7 @@ patch_net_kernel() ->
{ok,net_kernel,Bin} = compile:forms(NewForms, [binary]),
code:unstick_dir(filename:dirname(NetKernel)),
{module, _Module} = Res = code:load_binary(net_kernel, NetKernel, Bin),
- locks_ttb:event({net_kernel, NewForms}),
+ locks_ttb:event({?LINE, net_kernel, NewForms}),
Res
catch
error:What ->
diff --git a/test/locks_tests.erl b/test/locks_tests.erl
index 1e7bdd7..948bc33 100755
--- a/test/locks_tests.erl
+++ b/test/locks_tests.erl
@@ -14,7 +14,7 @@
-module(locks_tests).
-include_lib("eunit/include/eunit.hrl").
--compile(export_all).
+-compile([export_all, nowarn_export_all]).
-import(lists,[map/2]).
@@ -31,6 +31,7 @@ run_test_() ->
?_test(simple_lock())
, ?_test(one_lock_two_clients())
, ?_test(one_lock_wrr_clients())
+ , ?_test(three_locks_three_clients())
, ?_test(lock_merge())
, ?_test(lock_upgrade1())
, ?_test(lock_upgrade2())
@@ -87,6 +88,50 @@ one_lock_two_clients() ->
{2, ?LINE, ?MODULE, client_result, [], match({ok, []})},
{2, ?LINE, ?MODULE, kill_client, [], match(ok)}]).
+three_locks_three_clients() ->
+ L1 = [?MODULE, ?LINE, l1],
+ L2 = [?MODULE, ?LINE, l2],
+ L3 = [?MODULE, ?LINE, l3],
+ Match = fun(normal, {have_all_locks,_}) -> ok;
+ (timeout, timeout ) -> ok
+ end,
+ SMatch = fun(normal, {have_all_locks,_}) -> ok;
+ (normal, waiting ) -> ok
+ end,
+ script([1,2,3],
+ [{1, ?LINE, locks, lock, ['$agent', L1, write], match({ok,[]})},
+ {1, ?LINE, locks, lock, ['$agent', L2, write], match({ok,[]})},
+ {2, ?LINE, locks, lock_nowait, ['$agent', L2, write], match(ok)},
+ {1, ?LINE, locks, lock_nowait, ['$agent', L3, write], match(ok)},
+ {3, ?LINE, locks, lock_nowait, ['$agent', L3, write], match(ok)},
+ {2, ?LINE, locks, lock_nowait, ['$agent', L3, write], match(ok)},
+ {3, ?LINE, locks, lock_nowait, ['$agent', L1, write], match(ok)},
+ {2, ?LINE, locks, lock_nowait, ['$agent', L1, write], match(ok)},
+ %%
+ {1, ?LINE, locks, await_all_locks, ['$agent'], 100, Match},
+ %% The next command must not get the same result as the previous.
+ {2, ?LINE, locks, transaction_status, ['$agent'], 100,
+ 'ALL'([SMatch, 'NEQ'('V'(-1))])},
+ %%
+ %% At this point, end the first two transactions
+ %%
+ {1, ?LINE, locks, end_transaction, ['$agent'], match(ok)},
+
+ {2, ?LINE, locks, end_transaction, ['$agent'], match(ok)},
+ %%
+ %% Now try to let 3 acquire last lock and finish
+ %%
+ {3, ?LINE, locks, await_all_locks, ['$agent'],
+ match({have_all_locks, []})},
+ {3, ?LINE, locks, lock, ['$agent', L2, write], match({ok,[]})},
+ {3, ?LINE, locks, await_all_locks, ['$agent'],
+ match({have_all_locks, []})},
+ %%
+ {1, ?LINE, ?MODULE, kill_client, [], match(ok)},
+ {2, ?LINE, ?MODULE, kill_client, [], match(ok)},
+ {3, ?LINE, ?MODULE, kill_client, [], match(ok)}
+ ]).
+
one_lock_wrr_clients() ->
L = [?MODULE, ?LINE],
script([1,2,3],
@@ -192,10 +237,10 @@ two_w_locks_parent_read_deadlock() ->
{1,?LINE, locks,lock, ['$agent',L0,read], 100, match(timeout,timeout)},
{2,?LINE, locks,lock, ['$agent',L0,read], 100, match(timeout,timeout)},
{1, ?LINE, ?MODULE, client_result, [],
- fun(normal, {ok, [_,_,_]}) -> ok end},
+ fun(normal, {ok, [_|_]}) -> ok end},
{1,?LINE, ?MODULE, kill_client, [], match(ok)},
{2, ?LINE, ?MODULE, client_result, [],
- fun(normal, {ok, [_,_,_]}) -> ok end},
+ fun(normal, {ok, [_|_]}) -> ok end},
{2,?LINE, ?MODULE, kill_client, [], match(ok)}
]).