From fe67267038bdb1c41a0ecdef03eea2f37fc94077 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Fri, 2 Jan 2026 22:53:45 +0000 Subject: [PATCH] Add support for using Redis MONITOR inputs files as the commands used during memtier execution --- README.md | 21 +++ client.cpp | 58 +++++++ config_types.cpp | 85 ++++++++++ config_types.h | 26 ++- memtier_benchmark.cpp | 148 +++++++++++++++- memtier_benchmark.h | 3 + protocol.cpp | 27 +++ tests/test_monitor_input.py | 327 ++++++++++++++++++++++++++++++++++++ 8 files changed, 684 insertions(+), 11 deletions(-) create mode 100644 tests/test_monitor_input.py diff --git a/README.md b/README.md index 0f506ee..710112e 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,27 @@ $ memtier_benchmark --help for command line options. +### Using monitor input files + +You can replay real command streams by pointing memtier_benchmark to a monitor log file with the `--monitor-input=/path/to/file` option. Special commands such as `__monitor_c1__` pick a specific entry from the file, while `__monitor_c@__` selects commands at runtime (optionally combined with `--monitor-pattern` and `--command-ratio`). For example, the following command replays the first command from the file on each request: + +``` +$ memtier_benchmark --monitor-input=monitor.txt --command=__monitor_c1__ +``` + +This lets you mix synthetic workloads with realistic captured traffic in both standalone and Redis Cluster deployments. + +To generate monitor logs, you can use the Redis `MONITOR` command from `redis-cli`, which prints all commands received by the server. For example: + +``` +$ redis-cli MONITOR +OK +1460100081.165665 [0 127.0.0.1:51706] "set" "shipment:8000736522714:status" "sorting" +1460100083.053365 [0 127.0.0.1:51707] "get" "shipment:8000736522714:status" +``` + +You can pipe this output and filter specific patterns with tools such as `grep`, then save it to a file and use it as a `--monitor-input` source. For more details, see the official Redis documentation on [monitoring commands executed in Redis](https://redis.io/docs/latest/develop/tools/cli/#monitor-commands-executed-in-redis). + ## Crash Reporting memtier_benchmark includes built-in crash handling that automatically generates detailed bug reports when the program crashes. If you encounter a crash, the tool will print a comprehensive report including: diff --git a/client.cpp b/client.cpp index 27400e1..89e1d2e 100755 --- a/client.cpp +++ b/client.cpp @@ -279,6 +279,64 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval benchmark_debug_log("%s: %s:\n", m_connections[conn_id]->get_readable_id(), cmd.command.c_str()); + // Check if this is a monitor command placeholder - handle it specially + if (cmd.command_args.size() == 1 && cmd.command_args[0].type == monitor_random_type) { + // Select a command from the monitor file at runtime based on the monitor pattern + size_t selected_index = 0; + const std::string* monitor_cmd_ptr = NULL; + if (m_config->monitor_pattern == 'R') { + monitor_cmd_ptr = &m_config->monitor_commands->get_random_command(&selected_index); + benchmark_debug_log("%s: random monitor command selected (q%zu): %s\n", + m_connections[conn_id]->get_readable_id(), + selected_index + 1, // 1-based index for user display + monitor_cmd_ptr->c_str()); + } else { + monitor_cmd_ptr = &m_config->monitor_commands->get_next_sequential_command(&selected_index); + benchmark_debug_log("%s: sequential monitor command selected (q%zu): %s\n", + m_connections[conn_id]->get_readable_id(), + selected_index + 1, // 1-based index for user display + monitor_cmd_ptr->c_str()); + } + + const std::string& monitor_cmd = *monitor_cmd_ptr; + + // Parse and format the monitor command into a temporary arbitrary_command + arbitrary_command temp_cmd(monitor_cmd.c_str()); + if (!temp_cmd.split_command_to_args()) { + fprintf(stderr, "error: failed to parse random monitor command at runtime: %s\n", monitor_cmd.c_str()); + return false; + } + + // Format the command for the protocol (adds RESP headers) + if (!m_connections[conn_id]->get_protocol()->format_arbitrary_command(temp_cmd)) { + fprintf(stderr, "error: failed to format random monitor command at runtime: %s\n", monitor_cmd.c_str()); + return false; + } + + // Send the randomly selected command + for (unsigned int i = 0; i < temp_cmd.command_args.size(); i++) { + const command_arg* arg = &temp_cmd.command_args[i]; + if (arg->type == const_type) { + cmd_size += m_connections[conn_id]->send_arbitrary_command(arg); + } else if (arg->type == key_type) { + unsigned long long key_index; + get_key_response res = get_key_for_conn(command_index, conn_id, &key_index); + assert(res == available_for_conn); + cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len()); + } else if (arg->type == data_type) { + unsigned int value_len; + const char *value = m_obj_gen->get_value(0, &value_len); + assert(value != NULL); + assert(value_len > 0); + cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len); + } + } + + m_connections[conn_id]->send_arbitrary_command_end(command_index, ×tamp, cmd_size); + return true; + } + + // Normal arbitrary command handling for (unsigned int i = 0; i < cmd.command_args.size(); i++) { const command_arg* arg = &cmd.command_args[i]; if (arg->type == const_type) { diff --git a/config_types.cpp b/config_types.cpp index 2dcef1f..33cb308 100644 --- a/config_types.cpp +++ b/config_types.cpp @@ -489,3 +489,88 @@ bool arbitrary_command::split_command_to_args() { err: return false; } + +// Monitor command list implementation +bool monitor_command_list::load_from_file(const char* filename) { + FILE* file = fopen(filename, "r"); + if (!file) { + fprintf(stderr, "error: failed to open monitor input file: %s\n", filename); + return false; + } + + char line[65536]; // Large buffer for monitor lines + size_t total_lines = 0; + while (fgets(line, sizeof(line), file)) { + total_lines++; + // Find the first quote - this is where the command starts + char* first_quote = strchr(line, '"'); + if (!first_quote) { + continue; // Skip lines without commands + } + + // Extract everything from first quote to end of line + // We keep the quotes as-is to avoid re-parsing + std::string command_str(first_quote); + + // Remove trailing newline if present + if (!command_str.empty() && command_str[command_str.length() - 1] == '\n') { + command_str.erase(command_str.length() - 1); + } + if (!command_str.empty() && command_str[command_str.length() - 1] == '\r') { + command_str.erase(command_str.length() - 1); + } + + commands.push_back(command_str); + } + + fclose(file); + + if (commands.empty()) { + fprintf(stderr, "error: no commands found in monitor input file: %s\n", filename); + return false; + } + + fprintf(stderr, "Loaded %zu monitor commands from %zu total lines\n", commands.size(), total_lines); + return true; +} + +const std::string& monitor_command_list::get_command(size_t index) const { + if (index >= commands.size()) { + static std::string empty; + return empty; + } + return commands[index]; +} + +const std::string& monitor_command_list::get_random_command() const { + if (commands.empty()) { + static std::string empty; + return empty; + } + size_t random_index = rand() % commands.size(); + return commands[random_index]; +} + +const std::string& monitor_command_list::get_random_command(size_t* out_index) const { + if (commands.empty()) { + static std::string empty; + if (out_index) *out_index = 0; + return empty; + } + size_t random_index = rand() % commands.size(); + if (out_index) *out_index = random_index; + return commands[random_index]; +} + + const std::string& monitor_command_list::get_next_sequential_command(size_t* out_index) { + if (commands.empty()) { + static std::string empty; + if (out_index) *out_index = 0; + return empty; + } + // Use a global sequential index across all clients/threads. + size_t index = next_index.fetch_add(1, std::memory_order_relaxed); + index = index % commands.size(); + if (out_index) *out_index = index; + return commands[index]; + } diff --git a/config_types.h b/config_types.h index b8ad862..330bc6b 100644 --- a/config_types.h +++ b/config_types.h @@ -106,18 +106,24 @@ struct server_addr { #define KEY_PLACEHOLDER "__key__" #define DATA_PLACEHOLDER "__data__" +#define MONITOR_PLACEHOLDER_PREFIX "__monitor_c" +#define MONITOR_RANDOM_PLACEHOLDER "__monitor_c@__" enum command_arg_type { const_type = 0, key_type = 1, data_type = 2, - undefined_type = 3 + monitor_type = 3, + monitor_random_type = 4, + undefined_type = 5 }; struct command_arg { - command_arg(const char* arg, unsigned int arg_len) : type(undefined_type), data(arg, arg_len), has_key_affixes(false) {;} + command_arg(const char* arg, unsigned int arg_len) : type(undefined_type), data(arg, arg_len), monitor_index(0), has_key_affixes(false) {;} command_arg_type type; std::string data; + // For monitor_type, stores the index (1-based) + size_t monitor_index; // the prefix and suffix strings are used for mixed key placeholder storing of substrings std::string data_prefix; std::string data_suffix; @@ -183,4 +189,20 @@ struct arbitrary_command_list { } }; +struct monitor_command_list { +private: + std::vector commands; + std::atomic next_index; + +public: + monitor_command_list() : next_index(0) {;} + + bool load_from_file(const char* filename); + const std::string& get_command(size_t index) const; + const std::string& get_random_command() const; + const std::string& get_random_command(size_t* out_index) const; + const std::string& get_next_sequential_command(size_t* out_index); + size_t size() const { return commands.size(); } +}; + #endif /* _CONFIG_TYPES_H */ diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index 8afdb98..bc4fa79 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -72,6 +73,7 @@ #include #include #include +#include #include "client.h" #include "JSON_handler.h" @@ -79,7 +81,7 @@ #include "memtier_benchmark.h" -static int log_level = 0; +int log_level = 0; // Global flag for signal handling static volatile sig_atomic_t g_interrupted = 0; @@ -587,6 +589,8 @@ static void config_init_defaults(struct benchmark_config *cfg) cfg->hdr_prefix = ""; if (!cfg->print_percentiles.is_defined()) cfg->print_percentiles = config_quantiles("50,99,99.9"); + if (!cfg->monitor_pattern) + cfg->monitor_pattern = 'S'; #ifdef USE_TLS if (!cfg->tls_protocols) @@ -702,6 +706,8 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_command, o_command_key_pattern, o_command_ratio, + o_monitor_input, + o_monitor_pattern, o_tls, o_tls_cert, o_tls_key, @@ -788,6 +794,8 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "command", 1, 0, o_command }, { "command-key-pattern", 1, 0, o_command_key_pattern }, { "command-ratio", 1, 0, o_command_ratio }, + { "monitor-input", 1, 0, o_monitor_input }, + { "monitor-pattern", 1, 0, o_monitor_pattern }, { "rate-limiting", 1, 0, o_rate_limiting }, { "uri", 1, 0, o_uri }, { NULL, 0, 0, 0 } @@ -1208,14 +1216,23 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf cfg->cluster_mode = true; break; case o_command: { - // add new arbitrary command - arbitrary_command cmd(optarg); - - if (cmd.split_command_to_args()) { + // Check if this is a monitor placeholder + const char* cmd_str = optarg; + if (strcmp(cmd_str, MONITOR_RANDOM_PLACEHOLDER) == 0 || + strncmp(cmd_str, MONITOR_PLACEHOLDER_PREFIX, strlen(MONITOR_PLACEHOLDER_PREFIX)) == 0) { + // This is a monitor placeholder, we'll expand it later after loading the file + arbitrary_command cmd(cmd_str); + cmd.split_command_to_args(); cfg->arbitrary_commands->add_command(cmd); } else { - fprintf(stderr, "error: failed to parse arbitrary command.\n"); - return -1; + // Regular arbitrary command + arbitrary_command cmd(cmd_str); + if (cmd.split_command_to_args()) { + cfg->arbitrary_commands->add_command(cmd); + } else { + fprintf(stderr, "error: failed to parse arbitrary command.\n"); + return -1; + } } break; } @@ -1247,6 +1264,22 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf } break; } + case o_monitor_input: + cfg->monitor_input = optarg; + break; + case o_monitor_pattern: { + if (!optarg || strlen(optarg) != 1) { + fprintf(stderr, "error: monitor-pattern must be a single character: 'S' (sequential) or 'R' (random).\n"); + return -1; + } + char pattern = toupper((unsigned char)optarg[0]); + if (pattern != 'S' && pattern != 'R') { + fprintf(stderr, "error: monitor-pattern must be 'S' (sequential) or 'R' (random).\n"); + return -1; + } + cfg->monitor_pattern = pattern; + break; + } case o_rate_limiting: { endptr = NULL; cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10); @@ -1400,6 +1433,11 @@ void usage() { " Z for zipf distribution (will limit keys to positive).\n" " S for Sequential.\n" " P for Parallel (Sequential were each client has a subset of the key-range).\n" + " --monitor-input=FILE Read commands from Redis MONITOR output file.\n" + " Commands can be referenced as __monitor_c1__, __monitor_c2__, etc.\n" + " Use __monitor_c@__ to select commands from the file.\n" + " By default, selection is sequential; use --monitor-pattern=R for random.\n" + " For example: --monitor-input=monitor.txt --command=\"__monitor_c1__\"\n" "\n" "Object Options:\n" " -d --data-size=SIZE Object data size in bytes (default: 32)\n" @@ -1927,13 +1965,85 @@ int main(int argc, char *argv[]) benchmark_config cfg = benchmark_config(); cfg.arbitrary_commands = new arbitrary_command_list(); + cfg.monitor_commands = new monitor_command_list(); if (config_parse_args(argc, argv, &cfg) < 0) { usage(); } + // Load monitor input file if specified + if (cfg.monitor_input) { + if (!cfg.monitor_commands->load_from_file(cfg.monitor_input)) { + exit(1); + } + + // Seed random for monitor random selection + srand(time(NULL) ^ getpid()); + + // Expand monitor placeholders in commands + for (unsigned int i = 0; i < cfg.arbitrary_commands->size(); i++) { + arbitrary_command& cmd = cfg.arbitrary_commands->at(i); + + // Check if command is a random monitor placeholder + if (strcmp(cmd.command.c_str(), MONITOR_RANDOM_PLACEHOLDER) == 0) { + // Mark this command as random monitor type - will be expanded at runtime + cmd.command_args.clear(); + command_arg arg(MONITOR_RANDOM_PLACEHOLDER, strlen(MONITOR_RANDOM_PLACEHOLDER)); + arg.type = monitor_random_type; + arg.monitor_index = 0; // 0 means random + cmd.command_args.push_back(arg); + cmd.command_name = "MONITOR_RANDOM"; + continue; + } + + // Check if command is a specific monitor placeholder + if (strncmp(cmd.command.c_str(), MONITOR_PLACEHOLDER_PREFIX, strlen(MONITOR_PLACEHOLDER_PREFIX)) == 0) { + // Extract the index from __monitor_cN__ + const char* num_start = cmd.command.c_str() + strlen(MONITOR_PLACEHOLDER_PREFIX); + char* endptr; + long index = strtol(num_start, &endptr, 10); + + if (endptr == num_start || index < 1 || (size_t)index > cfg.monitor_commands->size()) { + fprintf(stderr, "error: invalid monitor placeholder '%s' (valid range: q1-q%zu or q@)\n", + cmd.command.c_str(), cfg.monitor_commands->size()); + exit(1); + } + + // Replace command with the one from monitor file (0-based index) + const std::string& monitor_cmd = cfg.monitor_commands->get_command(index - 1); + cmd.command = monitor_cmd; + cmd.command_args.clear(); + + // Re-parse the command + if (!cmd.split_command_to_args()) { + fprintf(stderr, "error: failed to parse monitor command: %s\n", monitor_cmd.c_str()); + exit(1); + } + + // Update command name (first word of the command) + size_t pos = cmd.command.find(" "); + if (pos == std::string::npos) { + pos = cmd.command.size(); + } + cmd.command_name.assign(cmd.command.c_str(), pos); + // Remove quotes if present + if (cmd.command_name.length() > 0 && cmd.command_name[0] == '"') { + cmd.command_name = cmd.command_name.substr(1); + } + if (cmd.command_name.length() > 0 && cmd.command_name[cmd.command_name.length()-1] == '"') { + cmd.command_name = cmd.command_name.substr(0, cmd.command_name.length()-1); + } + std::transform(cmd.command_name.begin(), cmd.command_name.end(), cmd.command_name.begin(), ::toupper); + } + } + } + // Process URI if provided if (cfg.uri) { + // Save original values before URI parsing + const char *orig_server = cfg.server; + const char *orig_authenticate = cfg.authenticate; + // Check for conflicts with individual connection parameters if (cfg.server && strcmp(cfg.server, "localhost") != 0) { fprintf(stderr, "warning: both URI and --host/--server specified, URI takes precedence.\n"); @@ -1957,6 +2067,15 @@ int main(int argc, char *argv[]) exit(1); } + // If URI parsing didn't change these values, clear them so we don't try to free them later + // (they point to optarg or static strings, not malloc'd memory) + if (cfg.server == orig_server) { + cfg.server = NULL; + } + if (cfg.authenticate == orig_authenticate) { + cfg.authenticate = NULL; + } + // Validate cluster mode constraints if (cfg.cluster_mode && cfg.select_db > 0) { fprintf(stderr, "error: database selection not supported in cluster mode. Redis Cluster only supports database 0.\n"); @@ -1982,15 +2101,22 @@ int main(int argc, char *argv[]) // if user configure arbitrary commands, format and prepare it for (unsigned int i=0; isize(); i++) { + arbitrary_command& cmd = cfg.arbitrary_commands->at(i); + + // Skip formatting for random monitor commands - they will be formatted at runtime + if (cmd.command_args.size() == 1 && cmd.command_args[0].type == monitor_random_type) { + continue; + } + abstract_protocol* tmp_protocol = protocol_factory(cfg.protocol); assert(tmp_protocol != NULL); - if (!tmp_protocol->format_arbitrary_command(cfg.arbitrary_commands->at(i))) { + if (!tmp_protocol->format_arbitrary_command(cmd)) { exit(1); } // Cluster mode supports only a single key commands - if (cfg.cluster_mode && cfg.arbitrary_commands->at(i).keys_count > 1) { + if (cfg.cluster_mode && cmd.keys_count > 1) { benchmark_error_log("error: Cluster mode supports only a single key commands\n"); exit(1); } @@ -2400,6 +2526,10 @@ int main(int argc, char *argv[]) delete cfg.arbitrary_commands; } + if (cfg.monitor_commands != NULL) { + delete cfg.monitor_commands; + } + // Clean up dynamically allocated strings from URI parsing if (cfg.uri) { if (cfg.server) { diff --git a/memtier_benchmark.h b/memtier_benchmark.h index 231c8db..8b0bf0a 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -112,6 +112,9 @@ struct benchmark_config { const char *json_out_file; bool cluster_mode; struct arbitrary_command_list* arbitrary_commands; + const char *monitor_input; + struct monitor_command_list* monitor_commands; + char monitor_pattern; const char *hdr_prefix; unsigned int request_rate; unsigned int request_per_interval; diff --git a/protocol.cpp b/protocol.cpp index cb6e3c7..e48f534 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -31,6 +31,9 @@ #include "memtier_benchmark.h" #include "libmemcached_protocol/binary.h" +// External log level for debug output +extern int log_level; + ///////////////////////////////////////////////////////////////////////// abstract_protocol::abstract_protocol() : @@ -476,6 +479,30 @@ int redis_protocol::parse_response(void) while (true) { switch (m_response_state) { case rs_initial: + // Debug: dump raw response buffer + if (log_level > 0) { + size_t buf_len = evbuffer_get_length(m_read_buf); + if (buf_len > 0) { + unsigned char *buf_data = evbuffer_pullup(m_read_buf, buf_len); + fprintf(stderr, "RAW RESPONSE [%zu bytes]: ", buf_len); + for (size_t i = 0; i < buf_len && i < 512; i++) { + if (buf_data[i] >= 32 && buf_data[i] <= 126) { + fprintf(stderr, "%c", buf_data[i]); + } else if (buf_data[i] == '\r') { + fprintf(stderr, "\\r"); + } else if (buf_data[i] == '\n') { + fprintf(stderr, "\\n"); + } else { + fprintf(stderr, "\\x%02x", buf_data[i]); + } + } + if (buf_len > 512) { + fprintf(stderr, "... [truncated]"); + } + fprintf(stderr, "\n"); + } + } + // clear last response m_last_response.clear(); m_response_len = 0; diff --git a/tests/test_monitor_input.py b/tests/test_monitor_input.py new file mode 100644 index 0000000..6325cf7 --- /dev/null +++ b/tests/test_monitor_input.py @@ -0,0 +1,327 @@ +import tempfile +import os +from include import * +from mb import Benchmark, RunConfig + + +def test_monitor_input_specific_command(env): + """ + Test that memtier_benchmark can use specific commands from a monitor input file. + + This test: + 1. Creates a monitor input file with multiple commands + 2. Uses __monitor_c1__ to select the first command (SET) + 3. Verifies the command executes correctly + """ + # Create monitor input file + test_dir = tempfile.mkdtemp() + monitor_file = os.path.join(test_dir, "monitor.txt") + with open(monitor_file, "w") as f: + f.write( + '[ proxy49 ] 1764031576.604009 [0 172.16.10.147:51682] "SET" "key1" "value1"\n' + ) + f.write('[ proxy47 ] 1764031576.603223 [0 172.16.10.147:39564] "GET" "key1"\n') + f.write( + '[ proxy48 ] 1764031576.605123 [0 172.16.10.147:41234] "HSET" "myhash" "field1" "value1"\n' + ) + f.write( + '[ proxy50 ] 1764031576.606456 [0 172.16.10.147:42567] "LPUSH" "mylist" "item1"\n' + ) + f.write( + '[ proxy51 ] 1764031576.607789 [0 172.16.10.147:43890] "SADD" "myset" "member1"\n' + ) + + # Configure memtier to use the first command from monitor file + benchmark_specs = { + "name": env.testName, + "args": [ + "--monitor-input={}".format(monitor_file), + "--command=__monitor_c1__", # Use first command (SET) + ], + } + addTLSArgs(benchmark_specs, env) + + config = get_default_memtier_config(threads=1, clients=1, requests=100) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # Run memtier_benchmark + memtier_ok = benchmark.run() + + # Verify success + debugPrintMemtierOnError(config, env) + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile("{0}/mb.stdout".format(config.results_dir))) + env.assertTrue(os.path.isfile("{0}/mb.stderr".format(config.results_dir))) + + # Check that stderr shows the monitor file was loaded + with open("{0}/mb.stderr".format(config.results_dir)) as stderr: + stderr_content = stderr.read() + env.assertTrue("Loaded 5 monitor commands from 5 total lines" in stderr_content) + + + # Verify the key was created in Redis (standalone and OSS cluster-safe) + master_nodes_connections = env.getOSSMasterNodesConnectionList() + found = False + for master_connection in master_nodes_connections: + try: + result = master_connection.execute_command("GET", "key1") + except Exception: + # In cluster mode, non-owner shards may reply MOVED/ASK; ignore and continue + continue + if result == b"value1": + found = True + break + env.assertTrue(found) + + +def test_monitor_input_random_runtime(env): + """ + Test that __monitor_c@__ picks random commands at runtime. + + This test: + 1. Creates a monitor input file with multiple different command types + 2. Uses __monitor_c@__ to randomly select commands at runtime + 3. Verifies that multiple different command types were executed + """ + # Create monitor input file with diverse commands + test_dir = tempfile.mkdtemp() + monitor_file = os.path.join(test_dir, "monitor.txt") + with open(monitor_file, "w") as f: + f.write( + '[ proxy49 ] 1764031576.604009 [0 172.16.10.147:51682] "SET" "key1" "value1"\n' + ) + f.write('[ proxy47 ] 1764031576.603223 [0 172.16.10.147:39564] "GET" "key1"\n') + f.write( + '[ proxy48 ] 1764031576.605123 [0 172.16.10.147:41234] "HSET" "myhash" "field1" "value1"\n' + ) + f.write( + '[ proxy50 ] 1764031576.606456 [0 172.16.10.147:42567] "LPUSH" "mylist" "item1"\n' + ) + f.write( + '[ proxy51 ] 1764031576.607789 [0 172.16.10.147:43890] "SADD" "myset" "member1"\n' + ) + + # Configure memtier to use random commands from monitor file + benchmark_specs = { + "name": env.testName, + "args": [ + "--monitor-input={}".format(monitor_file), + "--command=__monitor_c@__", # Command selection at runtime + "--monitor-pattern=R", # Random selection + ], + } + addTLSArgs(benchmark_specs, env) + + config = get_default_memtier_config(threads=2, clients=2, requests=100) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # Run memtier_benchmark + memtier_ok = benchmark.run() + + # Verify success + debugPrintMemtierOnError(config, env) + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile("{0}/mb.stdout".format(config.results_dir))) + env.assertTrue(os.path.isfile("{0}/mb.stderr".format(config.results_dir))) + + # Check that stderr shows the monitor file was loaded + with open("{0}/mb.stderr".format(config.results_dir)) as stderr: + stderr_content = stderr.read() + env.assertTrue("Loaded 5 monitor commands from 5 total lines" in stderr_content) + + # Verify that multiple different data types were created in Redis + # This proves that different commands were executed + master_nodes_connections = env.getOSSMasterNodesConnectionList() + types_found = set() + + for master_connection in master_nodes_connections: + # Check for different key types + keys_to_check = [ + ("key1", "string"), + ("myhash", "hash"), + ("mylist", "list"), + ("myset", "set"), + ] + + for key, expected_type in keys_to_check: + try: + key_type = master_connection.execute_command("TYPE", key) + if isinstance(key_type, bytes): + key_type = key_type.decode("utf-8") + if key_type == expected_type: + types_found.add(expected_type) + except: + pass + + # We should have at least 2 different types, proving randomization worked + env.debugPrint("Types found: {}".format(types_found), True) + env.assertTrue(len(types_found) >= 2) + if len(types_found) < 2: + env.debugPrint( + "Expected at least 2 different data types, found: {}".format(types_found), + True, + ) + + +def test_monitor_input_sequential_default(env): + """ + Test that __monitor_c@__ picks commands sequentially when monitor-pattern is explicitly set to S. + + This test: + 1. Creates a monitor input file with multiple SET commands for the same key but different values + 2. Uses __monitor_c@__ with --monitor-pattern=S (sequential pattern, which is also the default) + 3. Verifies that the commands are applied in sequential order (with wrap-around) + """ + # Create monitor input file with sequential SET commands + test_dir = tempfile.mkdtemp() + monitor_file = os.path.join(test_dir, "monitor.txt") + with open(monitor_file, "w") as f: + f.write( + '[ proxy60 ] 1764031576.604009 [0 172.16.10.147:51682] "SET" "seq_key" "v1"\n' + ) + f.write( + '[ proxy61 ] 1764031576.605123 [0 172.16.10.147:41234] "SET" "seq_key" "v2"\n' + ) + f.write( + '[ proxy62 ] 1764031576.606456 [0 172.16.10.147:42567] "SET" "seq_key" "v3"\n' + ) + + # Configure memtier to use sequential commands from monitor file with explicit pattern S + benchmark_specs = { + "name": env.testName, + "args": [ + "--monitor-input={}".format(monitor_file), + "--command=__monitor_c@__", # Sequential selection + "--monitor-pattern=S", # Explicit sequential pattern + ], + } + addTLSArgs(benchmark_specs, env) + + # 4 requests: expect sequence q1, q2, q3, q1 + config = get_default_memtier_config(threads=1, clients=1, requests=4) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # Run memtier_benchmark + memtier_ok = benchmark.run() + + # Verify success + debugPrintMemtierOnError(config, env) + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile("{0}/mb.stdout".format(config.results_dir))) + env.assertTrue(os.path.isfile("{0}/mb.stderr".format(config.results_dir))) + + # Verify the final value for seq_key corresponds to the expected sequence with wrap-around (v1) + # This must work both on standalone and OSS cluster deployments. + master_nodes_connections = env.getOSSMasterNodesConnectionList() + found = False + for master_connection in master_nodes_connections: + try: + result = master_connection.execute_command("GET", "seq_key") + except Exception: + # In cluster mode, non-owner shards may reply MOVED/ASK; ignore and continue + continue + if result == b"v1": + found = True + break + env.assertTrue(found) + + +def test_monitor_input_mixed_commands(env): + """ + Test mixing specific and random monitor commands with command ratios. + + This test: + 1. Creates a monitor input file + 2. Uses 30% __monitor_c1__ (specific SET command) and 70% __monitor_c@__ (random) + 3. Verifies both command types execute correctly + """ + # Create monitor input file + test_dir = tempfile.mkdtemp() + monitor_file = os.path.join(test_dir, "monitor.txt") + with open(monitor_file, "w") as f: + f.write( + '[ proxy49 ] 1764031576.604009 [0 172.16.10.147:51682] "SET" "key1" "value1"\n' + ) + f.write('[ proxy47 ] 1764031576.603223 [0 172.16.10.147:39564] "GET" "key1"\n') + f.write( + '[ proxy48 ] 1764031576.605123 [0 172.16.10.147:41234] "HSET" "myhash" "field1" "value1"\n' + ) + f.write( + '[ proxy50 ] 1764031576.606456 [0 172.16.10.147:42567] "LPUSH" "mylist" "item1"\n' + ) + f.write( + '[ proxy51 ] 1764031576.607789 [0 172.16.10.147:43890] "SADD" "myset" "member1"\n' + ) + + # Configure memtier with mixed commands + benchmark_specs = { + "name": env.testName, + "args": [ + "--monitor-input={}".format(monitor_file), + "--command=__monitor_c1__", + "--command-ratio=30", + "--command=__monitor_c@__", + "--command-ratio=70", + ], + } + addTLSArgs(benchmark_specs, env) + + config = get_default_memtier_config(threads=1, clients=1, requests=100) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # Run memtier_benchmark + memtier_ok = benchmark.run() + + # Verify success + debugPrintMemtierOnError(config, env) + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile("{0}/mb.stdout".format(config.results_dir))) + env.assertTrue(os.path.isfile("{0}/mb.stderr".format(config.results_dir))) + + # Check that stderr shows the monitor file was loaded + with open("{0}/mb.stderr".format(config.results_dir)) as stderr: + stderr_content = stderr.read() + env.assertTrue("Loaded 5 monitor commands from 5 total lines" in stderr_content) + + + # Verify key1 exists (from the specific SET command) in a cluster-safe way + master_nodes_connections = env.getOSSMasterNodesConnectionList() + found = False + for master_connection in master_nodes_connections: + try: + result = master_connection.execute_command("GET", "key1") + except Exception: + # In cluster mode, non-owner shards may reply MOVED/ASK; ignore and continue + continue + if result == b"value1": + found = True + break + env.assertTrue(found)