Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/couch_index/src/couch_index_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

-export([root_dir/0, index_dir/2, index_file/3]).
-export([load_doc/3, sort_lib/1, hexsig/1]).
-export([get_purge_checkpoints/2, cleanup_purges/3]).

-include_lib("couch/include/couch_db.hrl").

Expand Down Expand Up @@ -72,3 +73,49 @@ sort_lib([{LName, LCode} | Rest], LAcc) ->

hexsig(Sig) ->
couch_util:to_hex(Sig).

% Collect purge checkpoint doc ids for one index type.
%
% Returns a map of `Sig => DocId` for every local doc whose id starts with
% `<?LOCAL_DOC_PREFIX>purge-<Type>-`; Sig is the id suffix after that prefix.
% Accepts either a db name (the db is opened first) or an open db handle.
%
get_purge_checkpoints(DbName, Type) when is_binary(DbName), is_binary(Type) ->
    Fun = fun(Db) -> get_purge_checkpoints(Db, Type) end,
    couch_util:with_db(DbName, Fun);
get_purge_checkpoints(Db, Type) when is_binary(Type) ->
    Prefix = <<?LOCAL_DOC_PREFIX, "purge-", Type/binary, "-">>,
    PrefixLen = byte_size(Prefix),
    FoldFun = fun(#doc{id = DocId}, Acc) ->
        case DocId of
            % Local docs are folded in order, so the first non-matching id
            % means we are past the checkpoint range and can stop.
            <<Prefix:PrefixLen/binary, Sig/binary>> -> {ok, Acc#{Sig => DocId}};
            _ -> {stop, Acc}
        end
    end,
    Opts = [{start_key, Prefix}],
    {ok, #{} = Checkpoints} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts),
    Checkpoints.

% Delete purge checkpoints whose signatures are no longer active.
%
% Sigs is a map keyed by active signatures; Checkpoints maps `Sig => DocId`
% as returned by get_purge_checkpoints/2. Every checkpoint whose signature
% is absent from Sigs is deleted. Accepts a db name or an open db handle.
%
cleanup_purges(DbName, #{} = Sigs, #{} = Checkpoints) when is_binary(DbName) ->
    Fun = fun(Db) -> cleanup_purges(Db, Sigs, Checkpoints) end,
    couch_util:with_db(DbName, Fun);
cleanup_purges(Db, #{} = Sigs, #{} = Checkpoints) ->
    IsInactive = fun(Sig, _DocId) -> not maps:is_key(Sig, Sigs) end,
    Inactive = maps:filter(IsInactive, Checkpoints),
    maps:foreach(fun(_Sig, DocId) -> delete_checkpoint(Db, DocId) end, Inactive).

% Delete a single purge checkpoint doc by writing a deleted tombstone.
%
% Any error is logged and swallowed so one bad checkpoint cannot abort the
% rest of the cleanup. Note: the whole open/update sequence lives in the
% `try` body — in `try Expr of ...` only Expr is protected, so the previous
% form let couch_db:update_doc/3 failures escape the catch clause and crash
% the caller's foreach, contrary to the error handler's intent.
delete_checkpoint(Db, DocId) ->
    DbName = couch_db:name(Db),
    LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
    couch_log:debug(LogMsg, [?MODULE, DbName, DocId]),
    try
        case couch_db:open_doc(Db, DocId, []) of
            {ok, #doc{} = Doc} ->
                Deleted = Doc#doc{deleted = true, body = {[]}},
                couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]),
                ok;
            {not_found, _} ->
                % Already gone; nothing to do
                ok
        end
    catch
        Tag:Error ->
            ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
            couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
            ok
    end.
51 changes: 21 additions & 30 deletions src/couch_mrview/src/couch_mrview_cleanup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@

-export([
run/1,
cleanup_purges/3,
cleanup_indices/2
cleanup/2
]).

-include_lib("couch/include/couch_db.hrl").

run(Db) ->
Indices = couch_mrview_util:get_index_files(Db),
Checkpoints = couch_mrview_util:get_purge_checkpoints(Db),
Expand All @@ -28,15 +25,26 @@ run(Db) ->
ok = cleanup_purges(Db1, Sigs, Checkpoints),
ok = cleanup_indices(Sigs, Indices).

cleanup_purges(DbName, Sigs, Checkpoints) when is_binary(DbName) ->
couch_util:with_db(DbName, fun(Db) ->
cleanup_purges(Db, Sigs, Checkpoints)
end);
cleanup_purges(Db, #{} = Sigs, #{} = CheckpointsMap) ->
InactiveMap = maps:without(maps:keys(Sigs), CheckpointsMap),
InactiveCheckpoints = maps:values(InactiveMap),
DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
lists:foreach(DeleteFun, InactiveCheckpoints).
% erpc endpoint for fabric_index_cleanup:cleanup_indexes/2
%
% For each shard db: remove index files and purge checkpoints whose
% signatures are not in the active Sigs map.
%
cleanup(Dbs, #{} = Sigs) ->
    CleanupDb = fun(Db) ->
        IndexFiles = couch_mrview_util:get_index_files(Db),
        PurgeCheckpoints = couch_mrview_util:get_purge_checkpoints(Db),
        ok = cleanup_purges(Db, Sigs, PurgeCheckpoints),
        ok = cleanup_indices(Sigs, IndexFiles)
    end,
    try
        lists:foreach(CleanupDb, Dbs)
    catch
        error:database_does_not_exist ->
            % The db may be deleted concurrently with cleanup; that's fine
            ok
    end.

% Thin delegation to the shared helper in couch_index_util; deletes purge
% checkpoints whose signatures are not present in Sigs.
cleanup_purges(Db, Sigs, Checkpoints) ->
    couch_index_util:cleanup_purges(Db, Sigs, Checkpoints).

cleanup_indices(#{} = Sigs, #{} = IndexMap) ->
Fun = fun(_, Files) -> lists:foreach(fun delete_file/1, Files) end,
Expand All @@ -54,20 +62,3 @@ delete_file(File) ->
couch_log:error(ErrLog, [?MODULE, File, Tag, Error]),
ok
end.

% Delete a single purge checkpoint doc by writing a deleted tombstone,
% logging and swallowing errors so cleanup can proceed past a bad checkpoint.
delete_checkpoint(Db, DocId) ->
    DbName = couch_db:name(Db),
    LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
    couch_log:debug(LogMsg, [?MODULE, DbName, DocId]),
    % NOTE(review): in `try Expr of ...` only Expr is protected by the catch;
    % the `of` branches (including update_doc/3) are not, so an update failure
    % propagates to the caller — confirm that is intended.
    try couch_db:open_doc(Db, DocId, []) of
        {ok, Doc = #doc{}} ->
            Deleted = Doc#doc{deleted = true, body = {[]}},
            couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]);
        {not_found, _} ->
            % Already gone; nothing to do
            ok
    catch
        Tag:Error ->
            ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
            couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
            ok
    end.
38 changes: 17 additions & 21 deletions src/couch_mrview/src/couch_mrview_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
-export([get_local_purge_doc_id/1, get_value_from_options/2]).
-export([verify_view_filename/1, get_signature_from_filename/1]).
-export([get_signatures/1, get_purge_checkpoints/1, get_index_files/1]).
-export([get_signatures_from_ddocs/2]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
Expand Down Expand Up @@ -94,40 +95,35 @@ get_signatures(DbName) when is_binary(DbName) ->
couch_util:with_db(DbName, fun get_signatures/1);
get_signatures(Db) ->
DbName = couch_db:name(Db),
% get_design_docs/1 returns ejson for clustered shards, and
% #full_doc_info{}'s for other cases.
{ok, DDocs} = couch_db:get_design_docs(Db),
% get_design_docs/1 returns ejson for clustered shards, and
% #full_doc_info{}'s for other cases. Both are transformed to #doc{} records
FoldFun = fun
({[_ | _]} = EJsonDoc, Acc) ->
Doc = couch_doc:from_json_obj(EJsonDoc),
{ok, Mrst} = ddoc_to_mrst(DbName, Doc),
Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
Acc#{Sig => true};
[Doc | Acc];
(#full_doc_info{} = FDI, Acc) ->
{ok, Doc} = couch_db:open_doc_int(Db, FDI, [ejson_body]),
{ok, Mrst} = ddoc_to_mrst(DbName, Doc),
Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
Acc#{Sig => true}
[Doc | Acc]
end,
DDocs1 = lists:foldl(FoldFun, [], DDocs),
get_signatures_from_ddocs(DbName, DDocs1).

% From a list of design #doc{} records returns a signatures map: #{Sig => true}.
% Sig is the hex-encoded mrview signature derived via ddoc_to_mrst/2.
%
get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) ->
    SigOf = fun(#doc{} = DDoc) ->
        {ok, Mrst} = ddoc_to_mrst(DbName, DDoc),
        couch_util:to_hex_bin(Mrst#mrst.sig)
    end,
    maps:from_list([{SigOf(DDoc), true} || DDoc <- DDocs]).

% Returns a map of `Sig => DocId` elements for all the purge view
% checkpoint docs. Sig is a hex-encoded binary.
%
get_purge_checkpoints(DbName) when is_binary(DbName) ->
couch_util:with_db(DbName, fun get_purge_checkpoints/1);
get_purge_checkpoints(Db) ->
FoldFun = fun(#doc{id = Id}, Acc) ->
case Id of
<<?LOCAL_DOC_PREFIX, "purge-mrview-", Sig/binary>> ->
{ok, Acc#{Sig => Id}};
_ ->
{stop, Acc}
end
end,
Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mrview-">>}],
{ok, Signatures = #{}} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts),
Signatures.
couch_index_util:get_purge_checkpoints(Db, <<"mrview">>).

% Returns a map of `Sig => [FilePath, ...]` elements. Sig is a hex-encoded
% binary and FilePaths are lists as they intended to be passed to couch_file
Expand Down
11 changes: 9 additions & 2 deletions src/dreyfus/src/clouseau_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,17 @@ rename(DbName) ->
%% and an analyzer represented in a Javascript function in a design document.
%% `Sig` is used to check if an index description is changed,
%% and the index needs to be reconstructed.
-spec cleanup(DbName :: string_as_binary(_), ActiveSigs :: [sig()]) ->
-spec cleanup(DbName :: string_as_binary(_), SigList :: list() | SigMap :: #{sig() => true}) ->
ok.

cleanup(DbName, ActiveSigs) ->
% Compatibility clause to help when running search index cleanup during
% a mixed cluster state. Remove after version 3.6
%
cleanup(DbName, SigList) when is_list(SigList) ->
SigMap = #{Sig => true || Sig <- SigList},
cleanup(DbName, SigMap);
cleanup(DbName, #{} = SigMap) ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a _search_cleanup during a cluster upgrade will crash then? I think that's fine as they are rare but might be worth noting somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add an upgrade clause as well. It's easy enough in this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an upgrade clause

ActiveSigs = maps:keys(SigMap),
gen_server:cast({cleanup, clouseau()}, {cleanup, DbName, ActiveSigs}).

%% a binary with value <<"tokens">>
Expand Down
129 changes: 40 additions & 89 deletions src/dreyfus/src/dreyfus_fabric_cleanup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,99 +14,50 @@

-module(dreyfus_fabric_cleanup).

-include("dreyfus.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-export([go/1]).
-export([go/1, go_local/3]).

go(DbName) ->
DesignDocs =
case fabric:design_docs(DbName) of
{ok, DDocs} when is_list(DDocs) ->
DDocs;
Else ->
couch_log:debug("Invalid design docs: ~p~n", [Else]),
[]
end,
ActiveSigs = lists:usort(
lists:flatmap(
fun active_sigs/1,
[couch_doc:from_json_obj(DD) || DD <- DesignDocs]
)
),
cleanup_local_purge_doc(DbName, ActiveSigs),
clouseau_rpc:cleanup(DbName, ActiveSigs),
ok.
case fabric_util:get_design_doc_records(DbName) of
{ok, DDocs} when is_list(DDocs) ->
Sigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs),
Shards = mem3:shards(DbName),
ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards),
Fun = fun(Node, Dbs, Acc) ->
erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs, Sigs], Node, Acc)
end,
Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode),
recv(DbName, Reqs, fabric_util:abs_request_timeout());
Error ->
couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]),
Error
end.

active_sigs(#doc{body = {Fields}} = Doc) ->
% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2
%
go_local(DbName, Dbs, #{} = Sigs) ->
try
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
{IndexNames, _} = lists:unzip(RawIndexes),
[
begin
{ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName),
Index#index.sig
end
|| IndexName <- IndexNames
]
lists:foreach(
fun(Db) ->
Checkpoints = dreyfus_util:get_purge_checkpoints(Db),
ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints)
end,
Dbs
),
clouseau_rpc:cleanup(DbName, Sigs),
ok
catch
error:{badmatch, _Error} ->
[]
error:database_does_not_exist ->
ok
end.

% Delete local purge checkpoint docs for search index directories on disk
% that no longer correspond to an active signature. Compares the wildcard
% listing of index dirs under the clouseau root against dirs derived from
% ActiveSigs, then tombstones the checkpoint doc for each dead dir in every
% local shard db.
cleanup_local_purge_doc(DbName, ActiveSigs) ->
    {ok, BaseDir} = clouseau_rpc:get_root_dir(),
    % Glob every index dir under any shard of this db:
    % <BaseDir>/shards/*/<DbName>.*/*
    DbNamePattern = <<DbName/binary, ".*">>,
    Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]),
    Pattern = binary_to_list(iolist_to_binary(Pattern0)),
    DirListStrs = filelib:wildcard(Pattern),
    DirList = [iolist_to_binary(DL) || DL <- DirListStrs],
    LocalShards = mem3:local_shards(DbName),
    % Build the expected directory list: one entry per
    % (local shard, active signature) pair.
    ActiveDirs = lists:foldl(
        fun(LS, AccOuter) ->
            lists:foldl(
                fun(Sig, AccInner) ->
                    DirName = filename:join([BaseDir, LS#shard.name, Sig]),
                    [DirName | AccInner]
                end,
                AccOuter,
                ActiveSigs
            )
        end,
        [],
        LocalShards
    ),

    % Anything on disk not accounted for by an active signature is dead.
    DeadDirs = DirList -- ActiveDirs,
    lists:foreach(
        fun(IdxDir) ->
            Sig = dreyfus_util:get_signature_from_idxdir(IdxDir),
            case Sig of
                undefined ->
                    % Not a recognizable index dir; skip it
                    ok;
                _ ->
                    DocId = dreyfus_util:get_local_purge_doc_id(Sig),
                    % NOTE(review): LocalShards is already bound in the outer
                    % scope, so this `=` is a match assertion against a fresh
                    % mem3:local_shards/1 call, not a rebinding — it crashes
                    % if the shard map changed meanwhile. Confirm intended.
                    LocalShards = mem3:local_shards(DbName),
                    lists:foreach(
                        fun(LS) ->
                            ShardDbName = LS#shard.name,
                            {ok, ShardDb} = couch_db:open_int(ShardDbName, []),
                            case couch_db:open_doc(ShardDb, DocId, []) of
                                {ok, LocalPurgeDoc} ->
                                    % Tombstone the checkpoint doc
                                    couch_db:update_doc(
                                        ShardDb,
                                        LocalPurgeDoc#doc{deleted = true},
                                        [?ADMIN_CTX]
                                    );
                                {not_found, _} ->
                                    ok
                            end,
                            couch_db:close(ShardDb)
                        end,
                        LocalShards
                    )
            end
        end,
        DeadDirs
    ).
% Drain erpc responses for all outstanding go_local/3 requests.
%
% Each request id was labeled with its target node when sent. Non-ok
% responses are logged per-label and draining continues; returns ok once the
% request id collection is empty. Fixes the misspelled `_Lable` variable.
%
% NOTE(review): erpc:receive_response/3 raises error:{erpc, _} for transport
% failures (e.g. noconnection) rather than returning them — those are not
% handled here and will propagate to the caller; confirm that's acceptable.
recv(DbName, Reqs, Timeout) ->
    % The `true` flag deletes the answered request from the collection.
    case erpc:receive_response(Reqs, Timeout, true) of
        {ok, _Label, Reqs1} ->
            recv(DbName, Reqs1, Timeout);
        {Error, Label, Reqs1} ->
            ErrMsg = "~p : error cleaning dreyfus indexes db:~p req:~p error:~p",
            couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]),
            recv(DbName, Reqs1, Timeout);
        no_request ->
            ok
    end.
Loading