Skip to content
76 changes: 70 additions & 6 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@

#include <math.h>
#include <algorithm>
#include <sstream>
#include <arpa/inet.h>

#include "client.h"
#include "cluster_client.h"
#include "config_types.h"


bool client::setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *objgen)
Expand Down Expand Up @@ -284,7 +286,47 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval
get_key_response res = get_key_for_conn(command_index, conn_id, &key_index);
/* If key not available for this connection, we have a bug of sending partial request */
assert(res == available_for_conn);
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len());

//when we have static data mixed with the key placeholder
if (arg->has_key_affixes) {
// Pre-calculate total length to avoid reallocations
const char* key = m_obj_gen->get_key();
unsigned int key_len = m_obj_gen->get_key_len();
size_t prefix_len = arg->data_prefix.length();
size_t suffix_len = arg->data_suffix.length();
size_t total_len = prefix_len + key_len + suffix_len;

// Optimization: use stack buffer for small keys to avoid heap allocation
constexpr size_t STACK_BUFFER_SIZE = 512;
if (total_len < STACK_BUFFER_SIZE) {
char stack_buffer[STACK_BUFFER_SIZE];
char* pos = stack_buffer;

// Manual copy for better performance
if (prefix_len > 0) {
memcpy(pos, arg->data_prefix.data(), prefix_len);
pos += prefix_len;
}
memcpy(pos, key, key_len);
pos += key_len;
if (suffix_len > 0) {
memcpy(pos, arg->data_suffix.data(), suffix_len);
}

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, stack_buffer, total_len);
} else {
// Fallback to string for large keys
std::string combined_key;
combined_key.reserve(total_len);
combined_key.append(arg->data_prefix);
combined_key.append(key, key_len);
combined_key.append(arg->data_suffix);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, combined_key.c_str(), combined_key.length());
}
} else{
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len());
}
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);
Expand Down Expand Up @@ -462,11 +504,33 @@ void client::handle_response(unsigned int conn_id, struct timeval timestamp,
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
m_stats.update_arbitrary_op(&timestamp,
response->get_total_len(),
request->m_size,
ts_diff(request->m_sent_time, timestamp),
ar->index);

// Detect cache misses for arbitrary commands based on protocol response
const arbitrary_command& cmd = get_arbitrary_command(ar->index);
unsigned int hits = 0;
unsigned int misses = 0;

if (cmd.keys_count > 0) { // Only check for commands that access keys
if (response->is_cache_miss()) {
misses = cmd.keys_count;
} else {
hits = cmd.keys_count;
}

// Use the new method that tracks hits/misses
m_stats.update_arbitrary_op(&timestamp,
response->get_total_len(),
request->m_size,
ts_diff(request->m_sent_time, timestamp),
ar->index, hits, misses);
} else {
// For commands without keys, use the original method
m_stats.update_arbitrary_op(&timestamp,
response->get_total_len(),
request->m_size,
ts_diff(request->m_sent_time, timestamp),
ar->index);
}
break;
}
default:
Expand Down
7 changes: 6 additions & 1 deletion config_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,14 @@ enum command_arg_type {
};

struct command_arg {
command_arg(const char* arg, unsigned int arg_len) : type(undefined_type), data(arg, arg_len) {;}
command_arg(const char* arg, unsigned int arg_len) : type(undefined_type), data(arg, arg_len), has_key_affixes(false) {;}
command_arg_type type;
std::string data;
// the prefix and suffix strings are used for mixed key placeholder storing of substrings
std::string data_prefix;
std::string data_suffix;
// optimization flag to avoid runtime checks
bool has_key_affixes;
};

struct arbitrary_command {
Expand Down
15 changes: 14 additions & 1 deletion memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,20 @@ int main(int argc, char *argv[])
obj_gen->set_key_range(cfg.key_minimum, cfg.key_maximum);
}
if (cfg.key_stddev>0 || cfg.key_median>0) {
if (cfg.key_pattern[key_pattern_set]!='G' && cfg.key_pattern[key_pattern_get]!='G') {
// Check if Gaussian distribution is used in global key patterns or arbitrary commands
bool has_gaussian = (cfg.key_pattern[key_pattern_set]=='G' || cfg.key_pattern[key_pattern_get]=='G');

// Also check if any arbitrary command uses Gaussian distribution
if (!has_gaussian && cfg.arbitrary_commands->is_defined()) {
for (size_t i = 0; i < cfg.arbitrary_commands->size(); i++) {
if (cfg.arbitrary_commands->at(i).key_pattern == 'G') {
has_gaussian = true;
break;
}
}
}

if (!has_gaussian) {
fprintf(stderr, "error: key-stddev and key-median are only allowed together with key-pattern set to G.\n");
usage();
}
Expand Down
61 changes: 56 additions & 5 deletions protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,39 @@ unsigned int protocol_response::get_hits(void)
return m_hits;
}

bool protocol_response::is_cache_miss()
{
// Protocol-based miss detection - works for ALL Redis commands
// Based on Redis protocol response patterns, not specific commands

if (!m_status) return false; // No status means no response yet

// Check Redis protocol response types that indicate "empty" or "not found"
switch (m_status[0]) {
case '$': // Bulk string response
// $-1 = null bulk string (key doesn't exist)
if (m_status[1] == '-') return true;
// $0 = empty string (key exists but empty) - this is a HIT, not a miss
return false;

case '*': // Array response
// *-1 = null array (shouldn't happen in practice)
// *0 = empty array (no elements found) - typically a miss
if (m_status[1] == '-' || m_status[1] == '0') return true;
return false;

case ':': // Integer response
// :0 = zero (count/length is zero) - typically a miss
if (m_status[1] == '0') return true;
return false;

case '+': // Simple string (OK, etc.) - not a miss
case '-': // Error response - not a miss
default:
return false;
}
}

void protocol_response::clear(void)
{
if (m_status != NULL) {
Expand Down Expand Up @@ -711,13 +744,31 @@ bool redis_protocol::format_arbitrary_command(arbitrary_command &cmd) {
current_arg->type = const_type;

// check arg type
if (current_arg->data.find(KEY_PLACEHOLDER) != std::string::npos) {
if (current_arg->data.length() != strlen(KEY_PLACEHOLDER)) {
benchmark_error_log("error: key placeholder can't combined with other data\n");
return false;
}
const std::size_t key_placeholder_start = current_arg->data.find(KEY_PLACEHOLDER);
if (key_placeholder_start != std::string::npos) {
cmd.keys_count++;
current_arg->type = key_type;

// Optimize: avoid substr() calls and use string_view-like approach
constexpr size_t key_placeholder_len = sizeof(KEY_PLACEHOLDER) - 1; // compile-time constant
const std::size_t suffix_start = key_placeholder_start + key_placeholder_len;

// Only create strings if there's actually prefix/suffix data
bool has_prefix = (key_placeholder_start > 0);
bool has_suffix = (suffix_start < current_arg->data.length());
current_arg->has_key_affixes = has_prefix || has_suffix;

if (has_prefix) {
current_arg->data_prefix.assign(current_arg->data, 0, key_placeholder_start);
} else {
current_arg->data_prefix.clear();
}

if (has_suffix) {
current_arg->data_suffix.assign(current_arg->data, suffix_start, current_arg->data.length() - suffix_start);
} else {
current_arg->data_suffix.clear();
}
} else if (current_arg->data.find(DATA_PLACEHOLDER) != std::string::npos) {
if (current_arg->data.length() != strlen(DATA_PLACEHOLDER)) {
benchmark_error_log("error: data placeholder can't combined with other data\n");
Expand Down
3 changes: 3 additions & 0 deletions protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ struct protocol_response {
void incr_hits(void);
unsigned int get_hits(void);

// Check if response indicates a cache miss based on Redis protocol patterns
bool is_cache_miss();

void clear();

void set_mbulk_value(mbulk_size_el* element);
Expand Down
68 changes: 62 additions & 6 deletions run_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,22 @@ void run_stats::update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, u
hdr_record_value_capped(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t request_index, unsigned int hits, unsigned int misses) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_op(bytes_rx, bytes_tx, latency, hits, misses);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency, hits, misses);
m_totals.update_op(bytes_rx, bytes_tx, latency, hits, misses);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
struct hdr_histogram* inst_hist = inst_m_ar_commands_latency_histograms.at(request_index);
hdr_record_value_capped(hist,latency);
hdr_record_value_capped(inst_hist,latency);
hdr_record_value_capped(m_totals_latency_histogram,latency);
hdr_record_value_capped(inst_m_totals_latency_histogram,latency);
}

unsigned int run_stats::get_duration(void)
{
return m_cur_stats.m_second;
Expand Down Expand Up @@ -926,6 +942,8 @@ void result_print_to_json(json_handler * jsonhandler, const char * type, double
jsonhandler->write_obj("Bytes RX","%lld", cmd_stats.m_bytes_rx);
jsonhandler->write_obj("Bytes TX","%lld", cmd_stats.m_bytes_tx);
jsonhandler->write_obj("Count","%lld", cmd_stats.m_ops);
jsonhandler->write_obj("Hits","%lld", cmd_stats.m_hits);
jsonhandler->write_obj("Misses","%lld", cmd_stats.m_misses);
if (sec_has_samples){
jsonhandler->write_obj("Average Latency","%.3f", cmd_stats.m_avg_latency);
jsonhandler->write_obj("Accumulated Latency","%lld", cmd_stats.m_total_latency / LATENCY_HDR_RESULTS_MULTIPLIER);
Expand Down Expand Up @@ -1065,6 +1083,40 @@ void run_stats::print_missess_sec_column(output_table &table) {
table.add_column(column);
}

void run_stats::print_arbitrary_hits_sec_column(output_table &table) {
table_el el;
table_column column(12);

column.elements.push_back(*el.init_str("%12s ", "Hits/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));

if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_hits_sec));
}
}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_hits_sec));

table.add_column(column);
}

void run_stats::print_arbitrary_misses_sec_column(output_table &table) {
table_el el;
table_column column(12);

column.elements.push_back(*el.init_str("%12s ", "Misses/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));

if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_misses_sec));
}
}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_misses_sec));

table.add_column(column);
}

void run_stats::print_moved_sec_column(output_table &table) {
table_el el;
table_column column(12);
Expand Down Expand Up @@ -1237,8 +1289,8 @@ void run_stats::print_json(json_handler *jsonhandler, arbitrary_command_list& co
std::vector<one_sec_cmd_stats> arbitrary_command_stats = get_one_sec_cmd_stats_arbitrary_command(i);

result_print_to_json(jsonhandler, command_name.c_str(), m_totals.m_ar_commands[i].m_ops_sec,
0.0,
0.0,
m_totals.m_ar_commands[i].m_hits_sec,
m_totals.m_ar_commands[i].m_misses_sec,
cluster_mode ? m_totals.m_ar_commands[i].m_moved_sec : -1,
cluster_mode ? m_totals.m_ar_commands[i].m_ask_sec : -1,
m_totals.m_ar_commands[i].m_bytes_sec,
Expand Down Expand Up @@ -1410,13 +1462,17 @@ void run_stats::print(FILE *out, benchmark_config *config,
// Ops/sec column
print_ops_sec_column(table);

// Hits/sec column (not relevant for arbitrary commands)
if (!print_arbitrary_commands_results()) {
// Hits/sec column
if (print_arbitrary_commands_results()) {
print_arbitrary_hits_sec_column(table);
} else {
print_hits_sec_column(table);
}

// Misses/sec column (not relevant for arbitrary commands)
if (!print_arbitrary_commands_results()) {
// Misses/sec column
if (print_arbitrary_commands_results()) {
print_arbitrary_misses_sec_column(table);
} else {
print_missess_sec_column(table);
}

Expand Down
4 changes: 4 additions & 0 deletions run_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class run_stats {
void update_wait_op(struct timeval* ts, unsigned int latency);
void update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t arbitrary_index);
void update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t arbitrary_index, unsigned int hits, unsigned int misses);

void aggregate_average(const std::vector<run_stats>& all_stats);
void summarize(totals& result) const;
Expand Down Expand Up @@ -176,6 +178,8 @@ class run_stats {
void print_ops_sec_column(output_table &table);
void print_hits_sec_column(output_table &table);
void print_missess_sec_column(output_table &table);
void print_arbitrary_hits_sec_column(output_table &table);
void print_arbitrary_misses_sec_column(output_table &table);
void print_moved_sec_column(output_table &table);
void print_ask_sec_column(output_table &table);
void print_avg_latency_column(output_table &table);
Expand Down
13 changes: 13 additions & 0 deletions run_stats_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ totals_cmd::totals_cmd() :
m_bytes_sec(0),
m_bytes_sec_rx(0),
m_bytes_sec_tx(0),
m_hits_sec(0),
m_misses_sec(0),
m_moved_sec(0),
m_ask_sec(0),
m_latency(0),
Expand All @@ -211,6 +213,8 @@ totals_cmd::totals_cmd() :

void totals_cmd::add(const totals_cmd& other) {
m_ops_sec += other.m_ops_sec;
m_hits_sec += other.m_hits_sec;
m_misses_sec += other.m_misses_sec;
m_moved_sec += other.m_moved_sec;
m_ask_sec += other.m_ask_sec;
m_bytes_sec += other.m_bytes_sec;
Expand All @@ -223,6 +227,8 @@ void totals_cmd::add(const totals_cmd& other) {

void totals_cmd::aggregate_average(size_t stats_size) {
m_ops_sec /= stats_size;
m_hits_sec /= stats_size;
m_misses_sec /= stats_size;
m_moved_sec /= stats_size;
m_ask_sec /= stats_size;
m_bytes_sec /= stats_size;
Expand All @@ -244,6 +250,8 @@ void totals_cmd::summarize(const one_sec_cmd_stats& other, unsigned long test_du
m_bytes_sec = ((other.m_bytes_rx + other.m_bytes_tx) / 1024.0) / test_duration_usec * 1000000;
m_bytes_sec_rx = (other.m_bytes_rx / 1024.0) / test_duration_usec * 1000000;
m_bytes_sec_tx = (other.m_bytes_tx / 1024.0) / test_duration_usec * 1000000;
m_hits_sec = (double) other.m_hits / test_duration_usec * 1000000;
m_misses_sec = (double) other.m_misses / test_duration_usec * 1000000;
m_moved_sec = (double) other.m_moved / test_duration_usec * 1000000;
m_ask_sec = (double) other.m_ask / test_duration_usec * 1000000;
}
Expand Down Expand Up @@ -330,3 +338,8 @@ void totals::update_op(unsigned long int bytes_rx, unsigned long int bytes_tx, u
m_total_latency += latency;
hdr_record_value_capped(latency_histogram,latency);
}

void totals::update_op(unsigned long int bytes_rx, unsigned long int bytes_tx, unsigned int latency, unsigned int hits, unsigned int misses) {
update_op(bytes_rx, bytes_tx, latency);
// Note: totals class doesn't track individual hits/misses, they're aggregated at higher levels
}
Loading
Loading