From fdf909cf0a4b85786260bcdc92fccc919d9408cf Mon Sep 17 00:00:00 2001
From: Robert Newson
Date: Sun, 29 Dec 2024 00:59:28 +0000
Subject: [PATCH] reject write at leader if conflict

This should prevent spurious intra-cluster conflicts most of the time.
It is not true consistency, however: spurious conflicts are still
possible whenever the nodes in the cluster disagree on the current
live set of other nodes.
---
 src/fabric/src/fabric_doc_update.erl | 185 ++++++++++++++++++++-------
 1 file changed, 141 insertions(+), 44 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 1f5755de09e..36f08678613 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -22,7 +22,10 @@
     doc_count,
     w,
     grouped_docs,
-    reply
+    reply,
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -33,10 +36,8 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(
-        fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
-            Docs1 = untag_docs(Docs),
-            Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
-            {Shard#shard{ref = Ref}, Docs}
+        fun({#shard{} = Shard, Docs}) ->
+            {Shard#shard{ref = make_ref()}, Docs}
         end,
         group_docs_by_shard(DbName, AllDocs)
     ),
@@ -44,6 +45,7 @@ go(DbName, AllDocs0, Opts) ->
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = #acc{
+        update_options = Options,
         waiting_count = length(Workers),
         doc_count = length(AllDocs),
         w = list_to_integer(W),
@@ -51,7 +53,8 @@ go(DbName, AllDocs0, Opts) ->
         reply = dict:new()
     },
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
+    Acc1 = start_leaders(Acc0),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
            Health =:= ok; Health =:= accepted; Health =:= error
        ->
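The hunks above defer the worker casts: every worker is tagged with a
fresh ref up front, so rexi_utils:recv/6 can select on #shard.ref for all
of them, but nothing is sent until start_leaders/1 (added further down)
contacts the leaders. A minimal sketch of that idea, illustration only and
not part of the patch; the two-element tuples and node/range atoms below
are made-up stand-ins for real #shard{} records:

    %% Allocate a ref per worker without contacting any node, then
    %% contact only the subset picked by Pred; the rest keep their refs
    %% and can be started later, exactly once.
    -module(deferred_start_sketch).
    -export([demo/0]).

    tag_workers(Workers) ->
        [{make_ref(), W} || W <- Workers].

    start_matching(Tagged, Pred) ->
        [Ref || {Ref, W} <- Tagged, Pred(W)].

    demo() ->
        Tagged = tag_workers([{n1, r1}, {n2, r1}, {n3, r1}]),
        Started = start_matching(Tagged, fun({Node, _Range}) -> Node =:= n1 end),
        {length(Tagged), length(Started)}. %% -> {3, 1}
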
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
     NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, the hope at least one copy will return successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
     #acc{
-        waiting_count = WaitingCount,
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
-    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
-    case {WaitingCount, dict:size(DocReplyDict)} of
-        {1, _} ->
-            % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
-                fun force_reply/3,
-                {ok, W, []},
-                DocReplyDict
-            ),
-            {stop, {Health, Reply}};
-        {_, DocCount} ->
-            % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
-                continue ->
-                    {ok, Acc0#acc{
-                        waiting_count = WaitingCount - 1,
-                        grouped_docs = NewGrpDocs,
-                        reply = DocReplyDict
-                    }};
-                {stop, W, FinalReplies} ->
-                    {stop, {ok, FinalReplies}}
-            end;
-        _ ->
-            {ok, Acc0#acc{
-                waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
-            }}
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    IsLeader = lists:member(Worker#shard.ref, Acc0#acc.leaders),
+    DocReplyDict = append_update_replies(Docs, Replies, W, IsLeader, DocReplyDict0),
+    Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
+    Acc2 = remove_conflicts(Docs, Replies, Acc1),
+    NewGrpDocs = Acc2#acc.grouped_docs,
+    case skip_message(Acc2) of
+        {stop, Msg} ->
+            {stop, Msg};
+        {ok, Acc3} ->
+            Acc4 = start_followers(Worker, Acc3),
+            case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
+                {1, _} ->
+                    % last message has arrived, we need to conclude things
+                    {Health, W, Reply} = dict:fold(
+                        fun force_reply/3,
+                        {ok, W, []},
+                        DocReplyDict
+                    ),
+                    {stop, {Health, Reply}};
+                {_, DocCount} ->
+                    % we've got at least one reply for each document, let's take a look
+                    case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+                        continue ->
+                            {ok, Acc4#acc{
+                                waiting_count = Acc4#acc.waiting_count - 1,
+                                grouped_docs = NewGrpDocs
+                            }};
+                        {stop, W, FinalReplies} ->
+                            {stop, {ok, FinalReplies}}
+                    end;
+                _ ->
+                    {ok, Acc4#acc{
+                        waiting_count = Acc4#acc.waiting_count - 1,
+                        grouped_docs = NewGrpDocs
+                    }}
+            end
     end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
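Each failure clause above now calls start_followers/2 so that followers
held back behind a dead or errored leader still get started, and the
{ok, Replies} clause fans a leader conflict out (via append_update_replies/5
and remove_conflicts/3) before deciding whether to keep waiting. The leader
itself is chosen by is_leader/2 in the next hunk: the lowest node name
hosting a copy of the shard range. A sketch of that rule, illustration only,
with plain tuples instead of #shard{} records:

    %% Every coordinator that sees the same live membership computes the
    %% same minimum, hence the same leader per range; if membership views
    %% diverge, two coordinators may pick different leaders, which is the
    %% residual window for spurious conflicts noted in the commit message.
    -module(leader_rule_sketch).
    -export([demo/0]).

    leader(Range, Shards) ->
        lists:min([Node || {Node, R} <- Shards, R =:= Range]).

    demo() ->
        Shards = [{n3, r1}, {n1, r1}, {n2, r1}, {n2, r2}, {n3, r2}],
        [{r1, leader(r1, Shards)}, {r2, leader(r2, Shards)}]. %% -> [{r1,n1},{r2,n2}]
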
@@ -318,13 +338,90 @@ group_docs_by_shard(DbName, Docs) ->
         )
     ).
 
-append_update_replies([], [], DocReplyDict) ->
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(Worker, Workers) ->
+    Nodes = [W#shard.node || W <- Workers, W#shard.range == Worker#shard.range],
+    Worker#shard.node == lists:min(Nodes).
+
+start_leaders(#acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
+
+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        Worker#shard.range == Leader#shard.range,
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc0)
+        end,
+        Followers
+    ),
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc0,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for unit tests below.
+    ok.
+
+append_update_replies([], [], _W, _IsLeader, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, IsLeader, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+    append_update_replies(Rest, [], W, IsLeader, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, true, Dict0) ->
+    %% fake conflict replies from followers as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, true, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, IsLeader, Dict0) ->
+    append_update_replies(Rest1, Rest2, W, IsLeader, dict:append(Doc, Reply, Dict0)).
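The conflict clause of append_update_replies/5 above records W conflict
replies at once when the leader rejects a doc: the followers will never be
asked, so their votes are faked to keep the quorum arithmetic unchanged.
A sketch of the effect, illustration only:

    %% With w = 3, a single leader conflict fills the reply dict as if
    %% all three copies had answered conflict, so force_reply/maybe_reply
    %% see a full, unanimous result for that doc.
    -module(fake_replies_sketch).
    -export([demo/0]).

    demo() ->
        W = 3,
        D = dict:append_list(doc_a, lists:duplicate(W, conflict), dict:new()),
        dict:fetch(doc_a, D). %% -> [conflict, conflict, conflict]
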
+
+%% leader found a conflict, remove that doc from the other (follower) workers,
+%% removing the worker entirely if no docs remain.
+remove_conflicts([], [], #acc{} = Acc0) ->
+    Acc0;
+remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs0, waiting_count = WaitingCount0} = Acc0,
+    {GroupedDocs1, WaitingCount1} = lists:foldl(
+        fun({Worker, Docs}, {GrpAcc, WCAcc}) ->
+            case lists:delete(Doc, Docs) of
+                [] ->
+                    % nothing left for this worker, drop it and stop waiting for it
+                    {GrpAcc, WCAcc - 1};
+                Rest ->
+                    {[{Worker, Rest} | GrpAcc], WCAcc}
+            end
+        end,
+        {[], WaitingCount0},
+        GroupedDocs0
+    ),
+    Acc1 = Acc0#acc{grouped_docs = GroupedDocs1, waiting_count = WaitingCount1},
+    remove_conflicts(DocRest, ReplyRest, Acc1);
+remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, ReplyRest, Acc0);
+remove_conflicts([_Doc | DocRest], [], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, [], Acc0).
 
 skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
     {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
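remove_conflicts/3 walks the leader's replies doc by doc: a conflicted doc
is deleted from every remaining worker's batch, and a worker whose batch
becomes empty is dropped and no longer waited for. A standalone sketch of
that pruning step, illustration only, with atoms standing in for workers
and docs:

    -module(remove_conflicts_sketch).
    -export([demo/0]).

    %% Delete Doc from each worker's batch; a worker left with no docs
    %% is removed and the wait count drops accordingly.
    prune(Doc, Grouped, Waiting) ->
        lists:foldl(
            fun({Worker, Docs}, {GrpAcc, WAcc}) ->
                case lists:delete(Doc, Docs) of
                    [] -> {GrpAcc, WAcc - 1};
                    Rest -> {[{Worker, Rest} | GrpAcc], WAcc}
                end
            end,
            {[], Waiting},
            Grouped
        ).

    demo() ->
        Grouped = [{w1, [doc_a]}, {w2, [doc_a, doc_b]}],
        prune(doc_a, Grouped, 2). %% -> {[{w2, [doc_b]}], 1}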