24
24
#include " server/conn_context.h"
25
25
#include " server/container_utils.h"
26
26
#include " server/engine_shard_set.h"
27
+ #include " server/main_service.h"
27
28
#include " server/search/aggregator.h"
28
29
#include " server/search/doc_index.h"
30
+ #include " server/server_family.h"
29
31
#include " server/transaction.h"
30
32
#include " src/core/overloaded.h"
31
33
@@ -599,8 +601,116 @@ void SearchReply(const SearchParams& params, std::optional<search::AggregationIn
599
601
}
600
602
}
601
603
604
+ constexpr size_t kOptionsCount = 2 ;
605
+ constexpr std::string_view kSearchLimit = " MAXSEARCHRESULTS" sv;
606
+ constexpr std::string_view kAggregateLimit = " MAXAGGREGATERESULTS" sv;
607
+
608
+ template <typename V>
609
+ using ConfigOptionsMap = std::array<std::pair<const std::string_view, V>, kOptionsCount >;
610
+ using Config = ConfigOptionsMap<uint64_t >;
611
+
612
+ // Do not forget to update kConfigOptions after adding new option
613
+ constexpr ConfigOptionsMap<const std::string_view> kConfigOptionsHelp = {{
614
+ {kSearchLimit , " Maximum number of results from ft.search command" sv},
615
+ {kAggregateLimit , " Maximum number of results from ft.aggregate command" sv},
616
+ }};
617
+
618
+ // Do not forget to update kConfigOptionsHelp after adding new option
619
+ thread_local Config kConfigOptions = {{
620
+ {kSearchLimit , 10000 },
621
+ {kAggregateLimit , 10000 },
622
+ }};
623
+
624
+ static_assert (
625
+ kConfigOptions .size() == kConfigOptionsHelp .size() && kConfigOptions .size() == kOptionsCount ,
626
+ " kConfigOptions and kConfigOptionsHelp must have the same size and equal to kOptionsCount." );
627
+
628
+ template <typename V>
629
+ std::optional<V> FindOptionsMapValue (const ConfigOptionsMap<V>& options, std::string_view name) {
630
+ auto it = std::find_if (options.begin (), options.end (), [name](const auto & opt) {
631
+ return absl::EqualsIgnoreCase (opt.first , name);
632
+ });
633
+ return it != options.end () ? it->second : std::optional<V>{};
634
+ }
635
+
636
+ void UpdateConfigOption (std::string_view option_name, uint64_t value) {
637
+ auto it = std::find_if (
638
+ kConfigOptions .begin (), kConfigOptions .end (),
639
+ [option_name](const auto & opt) { return absl::EqualsIgnoreCase (opt.first , option_name); });
640
+
641
+ DCHECK (it != kConfigOptions .end ());
642
+ it->second = value;
643
+ }
644
+
645
+ void FtConfigHelp (CmdArgParser* parser, RedisReplyBuilder* rb) {
646
+ string_view option = parser->Next ();
647
+
648
+ auto send_value = [&](string_view option_name) {
649
+ auto value = FindOptionsMapValue (kConfigOptions , option_name);
650
+ DCHECK (value.has_value ());
651
+ rb->SendLong (value.value ());
652
+ };
653
+
654
+ if (option == " *" sv) {
655
+ rb->StartArray (kOptionsCount );
656
+ for (const auto & option_help : kConfigOptionsHelp ) {
657
+ rb->StartArray (5 );
658
+ rb->SendBulkString (option_help.first );
659
+ rb->SendBulkString (" Description" sv);
660
+ rb->SendBulkString (option_help.second );
661
+ rb->SendBulkString (" Value" sv);
662
+ send_value (option_help.first );
663
+ }
664
+ return ;
665
+ }
666
+
667
+ auto option_description = FindOptionsMapValue (kConfigOptionsHelp , option);
668
+ if (option_description) {
669
+ rb->StartArray (1 );
670
+ rb->StartArray (5 );
671
+ rb->SendBulkString (absl::AsciiStrToUpper (option));
672
+ rb->SendBulkString (" Description" sv);
673
+ rb->SendBulkString (option_description.value ());
674
+ rb->SendBulkString (" Value" sv);
675
+ send_value (option);
676
+ return ;
677
+ }
678
+
679
+ LOG (WARNING) << " Unknown configuration option: " << option;
680
+ rb->SendEmptyArray ();
681
+ }
682
+
683
+ void FtConfigGet (CmdArgParser* parser, RedisReplyBuilder* rb) {
684
+ string_view option = parser->Next ();
685
+
686
+ if (option == " *" sv) {
687
+ rb->StartArray (kOptionsCount );
688
+ for (const auto & option_help : kConfigOptions ) {
689
+ rb->StartArray (2 );
690
+ rb->SendBulkString (option_help.first );
691
+ rb->SendLong (option_help.second );
692
+ }
693
+ return ;
694
+ }
695
+
696
+ auto option_value = FindOptionsMapValue (kConfigOptions , option);
697
+ if (option_value) {
698
+ rb->StartArray (1 );
699
+ rb->StartArray (2 );
700
+ rb->SendBulkString (absl::AsciiStrToUpper (option));
701
+ rb->SendLong (option_value.value ());
702
+ return ;
703
+ }
704
+
705
+ LOG (WARNING) << " Unknown configuration option: " << option;
706
+ rb->SendEmptyArray ();
707
+ }
708
+
602
709
} // namespace
603
710
711
+ SearchFamily::SearchFamily (ServerFamily* server_family) : server_family_(server_family) {
712
+ }
713
+
604
714
void SearchFamily::FtCreate (CmdArgList args, const CommandContext& cmd_cntx) {
605
715
auto * builder = cmd_cntx.rb ;
606
716
if (cmd_cntx.conn_cntx ->conn_state .db_index != 0 ) {
@@ -815,6 +925,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
815
925
if (!search_algo.Init (query_str, ¶ms->query_params , sort_opt))
816
926
return builder->SendError (" Query syntax error" );
817
927
928
+ auto search_limit = FindOptionsMapValue (kConfigOptions , kSearchLimit );
929
+ DCHECK (search_limit.has_value ());
930
+ params->limit_total = std::min (params->limit_total , search_limit.value ());
931
+
818
932
// Because our coordinator thread may not have a shard, we can't check ahead if the index exists.
819
933
atomic<bool > index_not_found{false };
820
934
vector<SearchResult> docs (shard_set->size ());
@@ -966,6 +1080,50 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) {
966
1080
}
967
1081
}
968
1082
1083
+ void SearchFamily::FtConfigSet (CmdArgList args, const CommandContext& cmd_cntx) {
1084
+ CmdArgParser parser{args};
1085
+ auto * rb = static_cast <RedisReplyBuilder*>(cmd_cntx.rb );
1086
+
1087
+ string_view option = parser.Next ();
1088
+ uint64_t value = parser.Next <uint64_t >();
1089
+
1090
+ if (auto err = parser.Error (); err)
1091
+ return rb->SendError (err->MakeReply ());
1092
+
1093
+ auto option_exists = FindOptionsMapValue (kConfigOptions , option);
1094
+ if (option_exists) {
1095
+ DVLOG (2 ) << " Setting " << option << " to " << value;
1096
+
1097
+ auto cb = [option, value](util::ProactorBase*) { UpdateConfigOption (option, value); };
1098
+ server_family_->service ().proactor_pool ().AwaitFiberOnAll (std::move (cb));
1099
+
1100
+ // Schedule empty callback inorder to journal command via transaction framework.
1101
+ cmd_cntx.tx ->ScheduleSingleHop ([](auto * t, auto * shard) { return OpStatus::OK; });
1102
+
1103
+ rb->SendOk ();
1104
+ } else {
1105
+ rb->SendError (" Invalid option" sv);
1106
+ }
1107
+ }
1108
+
1109
+ void SearchFamily::FtConfig (CmdArgList args, const CommandContext& cmd_cntx) {
1110
+ CmdArgParser parser{args};
1111
+ auto * rb = static_cast <RedisReplyBuilder*>(cmd_cntx.rb );
1112
+
1113
+ if (parser.Check (" SET" )) {
1114
+ FtConfigSet (parser.Tail (), cmd_cntx);
1115
+ return ;
1116
+ }
1117
+
1118
+ auto func = parser.TryMapNext (" GET" , &FtConfigGet, " HELP" , &FtConfigHelp);
1119
+
1120
+ if (func) {
1121
+ return func.value ()(&parser, rb);
1122
+ } else {
1123
+ return rb->SendError (" Unknown subcommand" );
1124
+ }
1125
+ }
1126
+
969
1127
void SearchFamily::FtTagVals (CmdArgList args, const CommandContext& cmd_cntx) {
970
1128
string_view index_name = ArgS (args, 0 );
971
1129
string_view field_name = ArgS (args, 1 );
@@ -1052,11 +1210,19 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
1052
1210
auto * rb = static_cast <RedisReplyBuilder*>(cmd_cntx.rb );
1053
1211
auto sortable_value_sender = SortableValueSender (rb);
1054
1212
1055
- const size_t result_size = agg_results.values .size ();
1213
+ auto aggregate_limit = FindOptionsMapValue (kConfigOptions , kAggregateLimit );
1214
+ DCHECK (aggregate_limit.has_value ());
1215
+ size_t result_size = std::min (agg_results.values .size (), aggregate_limit.value ());
1216
+
1056
1217
rb->StartArray (result_size + 1 );
1057
1218
rb->SendLong (result_size);
1058
1219
1059
1220
for (const auto & value : agg_results.values ) {
1221
+ if (result_size == 0 ) {
1222
+ break ;
1223
+ }
1224
+ result_size--;
1225
+
1060
1226
size_t fields_count = 0 ;
1061
1227
for (const auto & field : agg_results.fields_to_print ) {
1062
1228
if (value.find (field) != value.end ()) {
@@ -1075,7 +1241,13 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
1075
1241
}
1076
1242
}
1077
1243
1078
- #define HFUNC (x ) SetHandler(&SearchFamily::x)
1244
+ using EngineFunc = void (SearchFamily::*)(CmdArgList args, const CommandContext& cmd_cntx);
1245
+
1246
+ inline CommandId::Handler3 HandlerFunc (SearchFamily* se, EngineFunc f) {
1247
+ return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); };
1248
+ }
1249
+
1250
+ #define HFUNC (x ) SetHandler(HandlerFunc(this , &SearchFamily::x))
1079
1251
1080
1252
// Redis search is a module. Therefore we introduce dragonfly extension search
1081
1253
// to set as the default for the search family of commands. More sensible defaults,
@@ -1089,18 +1261,19 @@ void SearchFamily::Register(CommandRegistry* registry) {
1089
1261
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL;
1090
1262
1091
1263
registry->StartFamily ();
1092
- *registry << CI{" FT.CREATE" , CO::WRITE | CO::GLOBAL_TRANS, -2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (
1093
- FtCreate)
1094
- << CI{" FT.ALTER" , CO::WRITE | CO::GLOBAL_TRANS, -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtAlter)
1095
- << CI{" FT.DROPINDEX" , CO::WRITE | CO::GLOBAL_TRANS, -2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (
1096
- FtDropIndex)
1097
- << CI{" FT.INFO" , kReadOnlyMask , 2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtInfo)
1098
- // Underscore same as in RediSearch because it's "temporary" (long time already)
1099
- << CI{" FT._LIST" , kReadOnlyMask , 1 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtList)
1100
- << CI{" FT.SEARCH" , kReadOnlyMask , -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtSearch)
1101
- << CI{" FT.AGGREGATE" , kReadOnlyMask , -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtAggregate)
1102
- << CI{" FT.PROFILE" , kReadOnlyMask , -4 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtProfile)
1103
- << CI{" FT.TAGVALS" , kReadOnlyMask , 3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtTagVals);
1264
+ *registry
1265
+ << CI{" FT.CREATE" , CO::WRITE | CO::GLOBAL_TRANS, -2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtCreate)
1266
+ << CI{" FT.ALTER" , CO::WRITE | CO::GLOBAL_TRANS, -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtAlter)
1267
+ << CI{" FT.DROPINDEX" , CO::WRITE | CO::GLOBAL_TRANS, -2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (
1268
+ FtDropIndex)
1269
+ << CI{" FT.CONFIG" , CO::WRITE | CO::GLOBAL_TRANS, -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtConfig)
1270
+ << CI{" FT.INFO" , kReadOnlyMask , 2 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtInfo)
1271
+ // Underscore same as in RediSearch because it's "temporary" (long time already)
1272
+ << CI{" FT._LIST" , kReadOnlyMask , 1 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtList)
1273
+ << CI{" FT.SEARCH" , kReadOnlyMask , -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtSearch)
1274
+ << CI{" FT.AGGREGATE" , kReadOnlyMask , -3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtAggregate)
1275
+ << CI{" FT.PROFILE" , kReadOnlyMask , -4 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtProfile)
1276
+ << CI{" FT.TAGVALS" , kReadOnlyMask , 3 , 0 , 0 , acl::FT_SEARCH}.HFUNC (FtTagVals);
1104
1277
}
1105
1278
1106
1279
} // namespace dfly
0 commit comments