Skip to content

Commit 1d82c85

Browse files
feat(search_family): Add basic support for the FT.CONFIG
fixes #4352 Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent ce5c44b commit 1d82c85

File tree

5 files changed

+282
-28
lines changed

5 files changed

+282
-28
lines changed

src/server/main_service.cc

+3-2
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,8 @@ Service::Service(ProactorPool* pp)
714714
: pp_(*pp),
715715
acl_family_(&user_registry_, pp),
716716
server_family_(this),
717-
cluster_family_(&server_family_) {
717+
cluster_family_(&server_family_),
718+
search_family_(&server_family_) {
718719
CHECK(pp);
719720
CHECK(shard_set == NULL);
720721

@@ -2694,10 +2695,10 @@ void Service::RegisterCommands() {
26942695
JsonFamily::Register(&registry_);
26952696
BitOpsFamily::Register(&registry_);
26962697
HllFamily::Register(&registry_);
2697-
SearchFamily::Register(&registry_);
26982698
BloomFamily::Register(&registry_);
26992699
server_family_.Register(&registry_);
27002700
cluster_family_.Register(&registry_);
2701+
search_family_.Register(&registry_);
27012702

27022703
// AclFamily should always be registered last
27032704
// If we add a new familly, register that first above and *not* below

src/server/main_service.h

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "server/command_registry.h"
1717
#include "server/config_registry.h"
1818
#include "server/engine_shard_set.h"
19+
#include "server/search/search_family.h"
1920
#include "server/server_family.h"
2021

2122
namespace util {
@@ -185,6 +186,7 @@ class Service : public facade::ServiceInterface {
185186
acl::AclFamily acl_family_;
186187
ServerFamily server_family_;
187188
cluster::ClusterFamily cluster_family_;
189+
SearchFamily search_family_;
188190
CommandRegistry registry_;
189191
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;
190192

src/server/search/search_family.cc

+182-14
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
#include "server/conn_context.h"
2525
#include "server/container_utils.h"
2626
#include "server/engine_shard_set.h"
27+
#include "server/main_service.h"
2728
#include "server/search/aggregator.h"
2829
#include "server/search/doc_index.h"
30+
#include "server/server_family.h"
2931
#include "server/transaction.h"
3032
#include "src/core/overloaded.h"
3133

@@ -599,8 +601,107 @@ void SearchReply(const SearchParams& params, std::optional<search::AggregationIn
599601
}
600602
}
601603

604+
constexpr size_t kOptionsCount = 2;
605+
constexpr std::string_view kSearchLimit = "MAXSEARCHRESULTS"sv;
606+
constexpr std::string_view kAggregateLimit = "MAXAGGREGATERESULTS"sv;
607+
608+
template <typename V>
609+
using ConfigOptionsMap = std::array<std::pair<const std::string_view, V>, kOptionsCount>;
610+
using Config = ConfigOptionsMap<uint64_t>;
611+
612+
// Do not forget to update kConfigOptions after adding new option
613+
constexpr ConfigOptionsMap<const std::string_view> kConfigOptionsHelp = {{
614+
{kSearchLimit, "Maximum number of results from ft.search command"sv},
615+
{kAggregateLimit, "Maximum number of results from ft.aggregate command"sv},
616+
}};
617+
618+
// Do not forget to update kConfigOptionsHelp after adding new option
619+
thread_local Config kConfigOptions = {{
620+
{kSearchLimit, 10000},
621+
{kAggregateLimit, 10000},
622+
}};
623+
624+
static_assert(
625+
kConfigOptions.size() == kConfigOptionsHelp.size() && kConfigOptions.size() == kOptionsCount,
626+
"kConfigOptions and kConfigOptionsHelp must have the same size and equal to kOptionsCount.");
627+
628+
template <typename V>
629+
std::optional<V> FindOptionsMapValue(const ConfigOptionsMap<V>& options, std::string_view name) {
630+
auto it = std::find_if(options.begin(), options.end(), [name](const auto& opt) {
631+
return absl::EqualsIgnoreCase(opt.first, name);
632+
});
633+
return it != options.end() ? it->second : std::optional<V>{};
634+
}
635+
636+
void FtConfigHelp(CmdArgParser* parser, RedisReplyBuilder* rb) {
637+
string_view option = parser->Next();
638+
639+
auto send_value = [&](string_view option_name) {
640+
auto value = FindOptionsMapValue(kConfigOptions, option_name);
641+
DCHECK(value.has_value());
642+
rb->SendLong(value.value());
643+
};
644+
645+
if (option == "*"sv) {
646+
rb->StartArray(kOptionsCount);
647+
for (const auto& option_help : kConfigOptionsHelp) {
648+
rb->StartArray(5);
649+
rb->SendBulkString(option_help.first);
650+
rb->SendBulkString("Description"sv);
651+
rb->SendBulkString(option_help.second);
652+
rb->SendBulkString("Value"sv);
653+
send_value(option_help.first);
654+
}
655+
return;
656+
}
657+
658+
auto option_description = FindOptionsMapValue(kConfigOptionsHelp, option);
659+
if (option_description) {
660+
rb->StartArray(1);
661+
rb->StartArray(5);
662+
rb->SendBulkString(absl::AsciiStrToUpper(option));
663+
rb->SendBulkString("Description"sv);
664+
rb->SendBulkString(option_description.value());
665+
rb->SendBulkString("Value"sv);
666+
send_value(option);
667+
return;
668+
}
669+
670+
LOG(WARNING) << "Unknown configuration option: " << option;
671+
rb->SendEmptyArray();
672+
}
673+
674+
void FtConfigGet(CmdArgParser* parser, RedisReplyBuilder* rb) {
675+
string_view option = parser->Next();
676+
677+
if (option == "*"sv) {
678+
rb->StartArray(kOptionsCount);
679+
for (const auto& option_help : kConfigOptions) {
680+
rb->StartArray(2);
681+
rb->SendBulkString(option_help.first);
682+
rb->SendLong(option_help.second);
683+
}
684+
return;
685+
}
686+
687+
auto option_value = FindOptionsMapValue(kConfigOptions, option);
688+
if (option_value) {
689+
rb->StartArray(1);
690+
rb->StartArray(2);
691+
rb->SendBulkString(absl::AsciiStrToUpper(option));
692+
rb->SendLong(option_value.value());
693+
return;
694+
}
695+
696+
LOG(WARNING) << "Unknown configuration option: " << option;
697+
rb->SendEmptyArray();
698+
}
699+
602700
} // namespace
603701

702+
SearchFamily::SearchFamily(ServerFamily* server_family) : server_family_(server_family) {
703+
}
704+
604705
void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
605706
auto* builder = cmd_cntx.rb;
606707
if (cmd_cntx.conn_cntx->conn_state.db_index != 0) {
@@ -815,6 +916,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
815916
if (!search_algo.Init(query_str, &params->query_params, sort_opt))
816917
return builder->SendError("Query syntax error");
817918

919+
auto search_limit = FindOptionsMapValue(kConfigOptions, kSearchLimit);
920+
DCHECK(search_limit.has_value());
921+
params->limit_total = std::min(params->limit_total, search_limit.value());
922+
818923
// Because our coordinator thread may not have a shard, we can't check ahead if the index exists.
819924
atomic<bool> index_not_found{false};
820925
vector<SearchResult> docs(shard_set->size());
@@ -966,6 +1071,54 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) {
9661071
}
9671072
}
9681073

1074+
void SearchFamily::FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx) {
1075+
CmdArgParser parser{args};
1076+
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
1077+
1078+
string_view option = parser.Next();
1079+
uint64_t value = parser.Next<uint64_t>();
1080+
1081+
if (auto err = parser.Error(); err)
1082+
return rb->SendError(err->MakeReply());
1083+
1084+
auto it = std::find_if(kConfigOptions.begin(), kConfigOptions.end(),
1085+
[option_name = option](const auto& opt) {
1086+
return absl::EqualsIgnoreCase(opt.first, option_name);
1087+
});
1088+
1089+
if (it != kConfigOptions.end()) {
1090+
DVLOG(2) << "Setting " << option << " to " << value;
1091+
1092+
auto cb = [&](util::ProactorBase*) { it->second = value; };
1093+
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
1094+
1095+
// Schedule empty callback inorder to journal command via transaction framework.
1096+
cmd_cntx.tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
1097+
1098+
rb->SendOk();
1099+
} else {
1100+
rb->SendError("Invalid option"sv);
1101+
}
1102+
}
1103+
1104+
void SearchFamily::FtConfig(CmdArgList args, const CommandContext& cmd_cntx) {
1105+
CmdArgParser parser{args};
1106+
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
1107+
1108+
if (parser.Check("SET")) {
1109+
FtConfigSet(parser.Tail(), cmd_cntx);
1110+
return;
1111+
}
1112+
1113+
auto func = parser.TryMapNext("GET", &FtConfigGet, "HELP", &FtConfigHelp);
1114+
1115+
if (func) {
1116+
return func.value()(&parser, rb);
1117+
} else {
1118+
return rb->SendError("Unknown subcommand");
1119+
}
1120+
}
1121+
9691122
void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) {
9701123
string_view index_name = ArgS(args, 0);
9711124
string_view field_name = ArgS(args, 1);
@@ -1052,11 +1205,19 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
10521205
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
10531206
auto sortable_value_sender = SortableValueSender(rb);
10541207

1055-
const size_t result_size = agg_results.values.size();
1208+
auto aggregate_limit = FindOptionsMapValue(kConfigOptions, kAggregateLimit);
1209+
DCHECK(aggregate_limit.has_value());
1210+
size_t result_size = std::min(agg_results.values.size(), aggregate_limit.value());
1211+
10561212
rb->StartArray(result_size + 1);
10571213
rb->SendLong(result_size);
10581214

10591215
for (const auto& value : agg_results.values) {
1216+
if (result_size == 0) {
1217+
break;
1218+
}
1219+
result_size--;
1220+
10601221
size_t fields_count = 0;
10611222
for (const auto& field : agg_results.fields_to_print) {
10621223
if (value.find(field) != value.end()) {
@@ -1075,7 +1236,13 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
10751236
}
10761237
}
10771238

1078-
#define HFUNC(x) SetHandler(&SearchFamily::x)
1239+
using EngineFunc = void (SearchFamily::*)(CmdArgList args, const CommandContext& cmd_cntx);
1240+
1241+
inline CommandId::Handler3 HandlerFunc(SearchFamily* se, EngineFunc f) {
1242+
return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); };
1243+
}
1244+
1245+
#define HFUNC(x) SetHandler(HandlerFunc(this, &SearchFamily::x))
10791246

10801247
// Redis search is a module. Therefore we introduce dragonfly extension search
10811248
// to set as the default for the search family of commands. More sensible defaults,
@@ -1089,18 +1256,19 @@ void SearchFamily::Register(CommandRegistry* registry) {
10891256
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL;
10901257

10911258
registry->StartFamily();
1092-
*registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
1093-
FtCreate)
1094-
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter)
1095-
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
1096-
FtDropIndex)
1097-
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
1098-
// Underscore same as in RediSearch because it's "temporary" (long time already)
1099-
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
1100-
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
1101-
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate)
1102-
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile)
1103-
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals);
1259+
*registry
1260+
<< CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate)
1261+
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter)
1262+
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
1263+
FtDropIndex)
1264+
<< CI{"FT.CONFIG", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtConfig)
1265+
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
1266+
// Underscore same as in RediSearch because it's "temporary" (long time already)
1267+
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
1268+
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
1269+
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate)
1270+
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile)
1271+
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals);
11041272
}
11051273

11061274
} // namespace dfly

src/server/search/search_family.h

+23-12
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,35 @@ class SinkReplyBuilder;
1414
} // namespace facade
1515

1616
namespace dfly {
17+
class ServerFamily;
1718
class CommandRegistry;
1819
struct CommandContext;
1920

2021
class SearchFamily {
21-
using SinkReplyBuilder = facade::SinkReplyBuilder;
22+
public:
23+
explicit SearchFamily(ServerFamily* server_family);
2224

23-
static void FtCreate(CmdArgList args, const CommandContext& cmd_cntx);
24-
static void FtAlter(CmdArgList args, const CommandContext& cmd_cntx);
25-
static void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx);
26-
static void FtInfo(CmdArgList args, const CommandContext& cmd_cntx);
27-
static void FtList(CmdArgList args, const CommandContext& cmd_cntx);
28-
static void FtSearch(CmdArgList args, const CommandContext& cmd_cntx);
29-
static void FtProfile(CmdArgList args, const CommandContext& cmd_cntx);
30-
static void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx);
31-
static void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx);
25+
void Register(CommandRegistry* registry);
3226

33-
public:
34-
static void Register(CommandRegistry* registry);
27+
private:
28+
using SinkReplyBuilder = facade::SinkReplyBuilder;
29+
30+
void FtCreate(CmdArgList args, const CommandContext& cmd_cntx);
31+
void FtAlter(CmdArgList args, const CommandContext& cmd_cntx);
32+
void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx);
33+
void FtInfo(CmdArgList args, const CommandContext& cmd_cntx);
34+
void FtList(CmdArgList args, const CommandContext& cmd_cntx);
35+
void FtSearch(CmdArgList args, const CommandContext& cmd_cntx);
36+
void FtProfile(CmdArgList args, const CommandContext& cmd_cntx);
37+
void FtConfig(CmdArgList args, const CommandContext& cmd_cntx);
38+
void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx);
39+
void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx);
40+
41+
// Uses server_family_ to synchronize config changes
42+
// Should not be registered
43+
void FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx);
44+
45+
ServerFamily* server_family_;
3546
};
3647

3748
} // namespace dfly

0 commit comments

Comments
 (0)