Skip to content

Commit 659752c

Browse files
Merge pull request #397 from filipecosta90/feat/mget-redis-protocol
MGET support for Redis protocol with OSS Cluster slot-aware routing
1 parent 41f8166 commit 659752c

12 files changed

Lines changed: 2343 additions & 29 deletions

.clang-format

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
BasedOnStyle: LLVM
55

6+
# C++ standard: keep Cpp03 so nested templates use "> >" (not ">>"),
7+
# which is required for the macOS C++03-compatible build.
8+
Standard: Cpp03
9+
610
# Indentation
711
IndentWidth: 4
812
TabWidth: 4
@@ -53,10 +57,6 @@ AllowShortFunctionsOnASingleLine: Inline
5357
AllowShortIfStatementsOnASingleLine: WithoutElse
5458
AllowShortLoopsOnASingleLine: false
5559

56-
# C++ standard: keep Cpp03 so nested templates use "> >" (not ">>"),
57-
# which is required for the macOS C++03-compatible build.
58-
Standard: Cpp03
59-
6060
# Other
6161
AlignTrailingComments: true
6262
AlwaysBreakAfterReturnType: None

client.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -585,12 +585,13 @@ bool client::create_mget_request(struct timeval &timestamp, unsigned int conn_id
585585
m_keylist->clear();
586586
for (unsigned int i = 0; i < keys_count; i++) {
587587
get_key_response res = get_key_for_conn(GET_CMD_IDX, conn_id, &key_index);
588-
/* Not supported in cluster mode */
589-
assert(res == available_for_conn);
588+
if (res != available_for_conn) continue;
590589

591590
m_keylist->add_key(m_obj_gen->get_key(), m_obj_gen->get_key_len());
592591
}
593592

593+
if (m_keylist->get_keys_count() == 0) return false;
594+
594595
m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
595596
return true;
596597
}
@@ -655,9 +656,16 @@ void client::create_request(struct timeval timestamp, unsigned int conn_id)
655656
}
656657

657658
// MGET command
658-
if (!create_mget_request(timestamp, conn_id)) return;
659+
if (!create_mget_request(timestamp, conn_id)) {
660+
// No MGET could be sent (e.g. this cluster connection owns no
661+
// slots that map to the configured key range). Force the ratio
662+
// counter past the threshold so the next create_request() call
663+
// resets both counters instead of busy-spinning here forever.
664+
m_get_ratio_count = m_config->ratio.b;
665+
return;
666+
}
659667

660-
m_get_ratio_count += m_config->multi_key_get;
668+
m_get_ratio_count += m_keylist->get_keys_count();
661669
m_reqs_generated++;
662670
} else {
663671
// overlap counters

cluster_client.cpp

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,68 @@ bool cluster_client::connect_shard_connection(shard_connection *sc, char *addres
279279
return res == 0;
280280
}
281281

282+
void cluster_client::build_mget_slot_cache()
283+
{
284+
if (!m_config->multi_key_get) return;
285+
286+
mget_slot_cache *cache = m_config->mget_cache;
287+
assert(cache != NULL);
288+
289+
unsigned int num_conns = (unsigned int) m_connections.size();
290+
291+
// Slot→key mapping is topology-independent: build it once across all threads.
292+
pthread_mutex_lock(&cache->mutex);
293+
if (!cache->built.load(std::memory_order_relaxed)) {
294+
unsigned long long key_min = m_config->key_minimum;
295+
unsigned long long key_max = m_config->key_maximum;
296+
297+
// Cap per-slot storage: multi_key_get * 4, bounded to [multi_key_get, 4096].
298+
// This bounds both memory and scan time regardless of key range size.
299+
unsigned int cap = (unsigned int) m_config->multi_key_get * 4;
300+
if (cap > 4096) cap = 4096;
301+
if (cap < (unsigned int) m_config->multi_key_get) cap = (unsigned int) m_config->multi_key_get;
302+
303+
benchmark_error_log("Building MGET slot cache for key range [%llu, %llu] "
304+
"(cap %u keys/slot)...\n",
305+
key_min, key_max, cap);
306+
307+
cache->slot_keys.assign(MAX_CLUSTER_HSLOT + 1, std::vector<unsigned long long>());
308+
309+
unsigned int filled_slots = 0;
310+
for (unsigned long long idx = key_min; idx <= key_max && filled_slots < MAX_CLUSTER_HSLOT + 1; idx++) {
311+
m_obj_gen->generate_key(idx);
312+
unsigned int slot = calc_hslot_crc16_with_hash_tag(m_obj_gen->get_key(), m_obj_gen->get_key_len());
313+
if (cache->slot_keys[slot].size() < cap) {
314+
cache->slot_keys[slot].push_back(idx);
315+
if (cache->slot_keys[slot].size() == cap) filled_slots++;
316+
}
317+
}
318+
319+
cache->built.store(true, std::memory_order_release);
320+
321+
// Count slots that ended up with at least one key (informational).
322+
unsigned int populated = 0;
323+
for (unsigned int s = 0; s <= MAX_CLUSTER_HSLOT; s++) {
324+
if (!cache->slot_keys[s].empty()) populated++;
325+
}
326+
benchmark_error_log("MGET slot cache built: %u/%u slots populated.\n", populated, MAX_CLUSTER_HSLOT + 1);
327+
}
328+
pthread_mutex_unlock(&cache->mutex);
329+
330+
// Per-thread cursor: one entry per slot, sized to match the shared table.
331+
m_mget_slot_cursor.assign(MAX_CLUSTER_HSLOT + 1, 0);
332+
333+
// Conn→slot mapping depends on topology: rebuild on every refresh.
334+
m_mget_conn_slots.assign(num_conns, std::vector<unsigned int>());
335+
m_mget_conn_slot_cursor.assign(num_conns, 0);
336+
337+
for (unsigned int slot = 0; slot <= MAX_CLUSTER_HSLOT; slot++) {
338+
if (cache->slot_keys[slot].empty()) continue;
339+
unsigned int cid = m_slot_to_shard[slot];
340+
if (cid < num_conns) m_mget_conn_slots[cid].push_back(slot);
341+
}
342+
}
343+
282344
void cluster_client::handle_cluster_slots(protocol_response *r)
283345
{
284346
/*
@@ -362,6 +424,19 @@ void cluster_client::handle_cluster_slots(protocol_response *r)
362424
}
363425
}
364426
}
427+
428+
// Rebuild same-slot key index cache for MGET if enabled.
429+
build_mget_slot_cache();
430+
431+
// Wake all connected shard connections so each one re-evaluates hold_pipeline()
432+
// with the freshly-built m_mget_conn_slots. Without this, a connection that
433+
// was bufferevent_disable()'d before the cache existed would never re-run
434+
// fill_pipeline() and would stay permanently idle.
435+
if (m_config->multi_key_get > 0) {
436+
for (size_t i = 0; i < m_connections.size(); i++) {
437+
if (m_connections[i]->get_connection_state() != conn_disconnected) m_connections[i]->schedule_fill();
438+
}
439+
}
365440
}
366441

367442
bool cluster_client::hold_pipeline(unsigned int conn_id)
@@ -392,6 +467,17 @@ bool cluster_client::hold_pipeline(unsigned int conn_id)
392467
}
393468
}
394469

470+
/* In GET-only MGET mode, a connection whose slots own no keys in the
471+
* configured key range can never generate a request. Returning true here
472+
* breaks the fill_pipeline while-loop for that connection so it does not
473+
* spin consuming CPU. Other connections (which do have eligible slots)
474+
* continue to operate normally. */
475+
if (m_config->multi_key_get > 0 && m_config->ratio.a == 0 && m_config->mget_cache != NULL &&
476+
m_config->mget_cache->built.load(std::memory_order_acquire) && conn_id < m_mget_conn_slots.size() &&
477+
m_mget_conn_slots[conn_id].empty() && m_staged_monitor_commands[conn_id].empty()) {
478+
return true;
479+
}
480+
395481
/* In transaction mode the pin connection drives the entire rotation.
396482
* Non-pin connections must not spin in fill_pipeline; they will be
397483
* rescheduled via schedule_fill() when the pin is cleared. If the pin
@@ -649,6 +735,42 @@ bool cluster_client::create_arbitrary_request(unsigned int command_index, struct
649735
return true;
650736
}
651737

738+
bool cluster_client::create_mget_request(struct timeval &timestamp, unsigned int conn_id)
739+
{
740+
// Only reached when --multi-key-get is set.
741+
// Use the pre-built slot cache so all N keys in this MGET share one hash
742+
// slot — Redis requires exact same-slot (not just same-node) for MGET in
743+
// cluster mode. Cache is rebuilt on every topology change via
744+
// build_mget_slot_cache() at the end of handle_cluster_slots().
745+
unsigned int keys_count = m_config->ratio.b - m_get_ratio_count;
746+
if ((int) keys_count > m_config->multi_key_get) keys_count = m_config->multi_key_get;
747+
if (keys_count == 0) return false;
748+
749+
if (conn_id >= m_mget_conn_slots.size() || m_mget_conn_slots[conn_id].empty()) {
750+
// Cache not ready or no key in the configured range maps to this shard.
751+
return false;
752+
}
753+
754+
// Round-robin over the slots owned by this connection.
755+
size_t &sc = m_mget_conn_slot_cursor[conn_id];
756+
unsigned int target_slot = m_mget_conn_slots[conn_id][sc % m_mget_conn_slots[conn_id].size()];
757+
sc++;
758+
759+
std::vector<unsigned long long> &slot_keys = m_config->mget_cache->slot_keys[target_slot];
760+
size_t &kc = m_mget_slot_cursor[target_slot];
761+
762+
m_keylist->clear();
763+
for (unsigned int i = 0; i < keys_count; i++) {
764+
unsigned long long idx = slot_keys[kc % slot_keys.size()];
765+
kc++;
766+
m_obj_gen->generate_key(idx);
767+
m_keylist->add_key(m_obj_gen->get_key(), m_obj_gen->get_key_len());
768+
}
769+
770+
m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
771+
return true;
772+
}
773+
652774
void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id)
653775
{
654776
/* Drain staged monitor commands that were routed here from another shard connection. */

cluster_client.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <set>
2323
#include <queue>
2424
#include <string>
25+
#include <vector>
2526
#include "client.h"
2627

2728
typedef std::queue<unsigned long long> key_index_pool;
@@ -58,6 +59,15 @@ class cluster_client : public client
5859
// Set when we emit the "pin connection lost mid-rotation" warning so we
5960
// don't spam it on every hold_pipeline() call during the outage.
6061
bool m_txn_pin_lost_warned;
62+
// Per-slot key index cache for cluster MGET. The slot→key table
63+
// (slot_keys) is shared across threads via m_config->mget_cache and is
64+
// read-only after the first thread builds it. Only the per-slot
65+
// round-robin cursor (m_mget_slot_cursor) and the per-connection slot
66+
// list (m_mget_conn_slots) are per-thread.
67+
std::vector<size_t> m_mget_slot_cursor; // [slot] → per-thread round-robin cursor
68+
std::vector<std::vector<unsigned int> > m_mget_conn_slots; // [conn] → owned slot list
69+
std::vector<size_t> m_mget_conn_slot_cursor; // [conn] → slot round-robin cursor
70+
void build_mget_slot_cache();
6171

6272
virtual int connect(void);
6373
virtual void disconnect(void);
@@ -89,6 +99,7 @@ class cluster_client : public client
8999
virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id,
90100
unsigned long long *key_index);
91101
virtual bool create_arbitrary_request(unsigned int command_index, struct timeval &timestamp, unsigned int conn_id);
102+
virtual bool create_mget_request(struct timeval &timestamp, unsigned int conn_id);
92103

93104
// client manager api's
94105
virtual void handle_cluster_slots(protocol_response *r);

memtier_benchmark.cpp

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ static int parse_uri(const char *uri, struct benchmark_config *cfg)
561561

562562
static void config_init_defaults(struct benchmark_config *cfg)
563563
{
564+
cfg->mget_cache = NULL;
564565
if (!cfg->server && !cfg->unix_socket) cfg->server = "localhost";
565566
if (!cfg->port && !cfg->unix_socket) cfg->port = 6379;
566567
if (!cfg->resolution) cfg->resolution = AF_UNSPEC;
@@ -616,9 +617,6 @@ static bool verify_cluster_option(struct benchmark_config *cfg)
616617
if (cfg->reconnect_interval) {
617618
fprintf(stderr, "error: cluster mode dose not support reconnect-interval option.\n");
618619
return false;
619-
} else if (cfg->multi_key_get) {
620-
fprintf(stderr, "error: cluster mode dose not support multi-key-get option.\n");
621-
return false;
622620
} else if (cfg->wait_ratio.is_defined()) {
623621
fprintf(stderr, "error: cluster mode dose not support wait-ratio option.\n");
624622
return false;
@@ -1252,7 +1250,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
12521250
case o_multi_key_get:
12531251
endptr = NULL;
12541252
cfg->multi_key_get = (unsigned int) strtoul(optarg, &endptr, 10);
1255-
if (cfg->multi_key_get <= 0 || !endptr || *endptr != '\0') {
1253+
if (cfg->multi_key_get < 1 || !endptr || *endptr != '\0') {
12561254
fprintf(stderr, "error: multi-key-get must be greater than zero.\n");
12571255
return -1;
12581256
}
@@ -1724,7 +1722,9 @@ void usage()
17241722
"(default: 0)\n"
17251723
" --thread-conn-start-max-jitter-micros=NUM Maximum jitter in microseconds between connection creation "
17261724
"(default: 0)\n"
1727-
" --multi-key-get=NUM Enable multi-key get commands, up to NUM keys (default: 0)\n"
1725+
" --multi-key-get=NUM Enable multi-key get commands, up to NUM keys (default: 0).\n"
1726+
" In cluster mode, keys are probed from the key space so that all\n"
1727+
" keys in one batch route to the same shard (no hash-tag prefix).\n"
17281728
" --select-db=DB DB number to select, when testing a redis server\n"
17291729
" --distinct-client-seed Use a different random seed for each client\n"
17301730
" --randomize random seed based on timestamp (default is constant value)\n"
@@ -2117,6 +2117,13 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
21172117
{
21182118
fprintf(stderr, "[RUN #%u] Preparing benchmark client...\n", run_id);
21192119

2120+
// Shared MGET slot cache: allocate fresh for this run so the lazy build
2121+
// inside build_mget_slot_cache() fires again (topology may have changed).
2122+
if (cfg->cluster_mode && cfg->multi_key_get > 0) {
2123+
delete cfg->mget_cache;
2124+
cfg->mget_cache = new mget_slot_cache();
2125+
}
2126+
21202127
// prepare threads data
21212128
std::vector<cg_thread *> threads;
21222129
g_threads = &threads; // Set global pointer for crash handler
@@ -2127,6 +2134,8 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
21272134

21282135
if (t->prepare() < 0) {
21292136
benchmark_error_log("error: failed to prepare thread %u for test.\n", i);
2137+
delete cfg->mget_cache;
2138+
cfg->mget_cache = NULL;
21302139
exit(1);
21312140
}
21322141
threads.push_back(t);
@@ -2562,6 +2571,9 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
25622571

25632572
g_threads = NULL; // Clear global pointer
25642573

2574+
delete cfg->mget_cache;
2575+
cfg->mget_cache = NULL;
2576+
25652577
return stats;
25662578
}
25672579

@@ -3196,6 +3208,14 @@ int main(int argc, char *argv[])
31963208
fprintf(stderr, "error: select-db can only be used with redis protocol.\n");
31973209
usage();
31983210
}
3211+
if (cfg.multi_key_get > 0 && cfg.protocol == PROTOCOL_MEMCACHE_BINARY) {
3212+
fprintf(stderr, "error: --multi-key-get is not supported with memcache_binary.\n");
3213+
usage();
3214+
}
3215+
if (cfg.multi_key_get > 0 && cfg.arbitrary_commands->is_defined()) {
3216+
fprintf(stderr, "error: --multi-key-get cannot be combined with --command.\n");
3217+
usage();
3218+
}
31993219
if (cfg.data_offset > 0) {
32003220
if (cfg.data_offset > (1 << 29) - 1) {
32013221
fprintf(stderr, "error: data-offset too long\n");

memtier_benchmark.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
#ifndef _MEMTIER_BENCHMARK_H
2020
#define _MEMTIER_BENCHMARK_H
2121

22+
#include <atomic>
2223
#include <vector>
2324
#include <sys/time.h>
25+
#include <pthread.h>
2426
#include "config_types.h"
2527

2628
#ifdef USE_TLS
@@ -53,6 +55,23 @@ enum PROTOCOL_TYPE
5355
PROTOCOL_MEMCACHE_BINARY,
5456
};
5557

58+
// Shared MGET slot cache: built once (lazily, on first topology load) and
59+
// read concurrently by all cluster_client threads. m_mget_slot_keys is
60+
// identical for every thread — only the per-slot round-robin cursors differ.
61+
struct mget_slot_cache
62+
{
63+
std::vector<std::vector<unsigned long long> > slot_keys; // [slot] → key indices; read-only after built
64+
std::atomic<bool> built;
65+
pthread_mutex_t mutex;
66+
67+
mget_slot_cache() : built(false) { pthread_mutex_init(&mutex, NULL); }
68+
~mget_slot_cache() { pthread_mutex_destroy(&mutex); }
69+
70+
private:
71+
mget_slot_cache(const mget_slot_cache &);
72+
mget_slot_cache &operator=(const mget_slot_cache &);
73+
};
74+
5675
struct benchmark_config
5776
{
5877
const char *server;
@@ -124,6 +143,7 @@ struct benchmark_config
124143
unsigned int thread_conn_start_min_jitter_micros;
125144
unsigned int thread_conn_start_max_jitter_micros;
126145
int multi_key_get;
146+
struct mget_slot_cache *mget_cache; // NULL unless cluster_mode && multi_key_get > 0
127147
const char *authenticate;
128148
int select_db;
129149
const char *uri;

0 commit comments

Comments
 (0)