-
Notifications
You must be signed in to change notification settings - Fork 247
Expand file tree
/
Copy pathcluster_client.h
More file actions
113 lines (100 loc) · 5.47 KB
/
cluster_client.h
File metadata and controls
113 lines (100 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* Copyright (C) 2011-2026 Redis Labs Ltd.
*
* This file is part of memtier_benchmark.
*
* memtier_benchmark is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* memtier_benchmark is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with memtier_benchmark. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MEMTIER_BENCHMARK_CLUSTER_CLIENT_H
#define MEMTIER_BENCHMARK_CLUSTER_CLIENT_H
#include <set>
#include <queue>
#include <string>
#include <vector>
#include "client.h"
// FIFO queue of unsigned long long values used as a pool of key indices.
// cluster_client keeps a vector of pointers to these pools
// (m_key_index_pools).
// NOTE(review): a comment on cluster_client refers to a
// "(command_index, key_index) pair invariant" for the pool, so entries may be
// pushed/popped in pairs rather than as bare key indices -- confirm in
// cluster_client.cpp.
typedef std::queue<unsigned long long> key_index_pool;
// A monitor-file command that has been selected and parsed but not yet
// formatted/sent ("drained"). cluster_client queues these in
// m_staged_monitor_commands.
// NOTE(review): presumably one queue per shard connection -- confirm.
struct staged_monitor_cmd
{
arbitrary_command parsed_cmd; // already split (split_command_to_args done); format at drain
// NOTE(review): presumably the index of the stats bucket this command is
// accounted under -- confirm against the code that fills this struct.
size_t stats_index;
struct timeval enqueue_time; // latency measured from selection, not drain
size_t source_line; // original monitor file line (1-based) for error messages
};
// forward declaration
class shard_connection;
// Cluster-aware specialization of `client`.
//
// On top of the base client it declares: a slot -> shard routing table,
// key index pools, staged monitor commands, MOVED/ASK redirect handling,
// connection pinning for --transaction rotations, and a per-slot key cache
// used for cluster MGET.
class cluster_client : public client
{
protected:
    // Key index pools, held by pointer.
    // NOTE(review): presumably one pool per shard connection, owned by this
    // object -- confirm in cluster_client.cpp.
    std::vector<key_index_pool *> m_key_index_pools;

    // Queues of monitor commands that were selected/parsed but not yet sent.
    // NOTE(review): presumably indexed by connection id -- confirm.
    std::vector<std::queue<staged_monitor_cmd> > m_staged_monitor_commands;

    // Routing table with one entry per hash slot (16384 entries).
    // NOTE(review): the stored value is presumably the id of the shard
    // connection that owns the slot -- confirm.
    unsigned int m_slot_to_shard[16384];

    // --transaction: shard connection that owns the in-flight rotation of
    // --command entries. -1 = no pin (rotation has not started or just ended).
    // The pin is established when the first command of a fresh rotation is
    // processed and cleared when m_arbitrary_command_rotation_seq advances
    // past m_txn_observed_rotation_seq (i.e. the previous rotation wrapped).
    int m_txn_pinned_conn_id;
    unsigned long long m_txn_observed_rotation_seq;

    // Key index pre-generated by the lookahead for the first keyed command of
    // the pinned rotation. Stored here (not in the shared pool) to avoid
    // corrupting the pool's (command_index, key_index) pair invariant.
    unsigned long long m_txn_staged_key_index;
    bool m_txn_has_staged_key;

    // Set when we emit the "pin connection lost mid-rotation" warning so we
    // don't spam it on every hold_pipeline() call during the outage.
    bool m_txn_pin_lost_warned;

    // Per-slot key index cache for cluster MGET. The slot->key table
    // (slot_keys) is shared across threads via m_config->mget_cache and is
    // read-only after the first thread builds it. Only the per-slot
    // round-robin cursor (m_mget_slot_cursor) and the per-connection slot
    // list (m_mget_conn_slots) are per-thread.
    std::vector<size_t> m_mget_slot_cursor;                    // [slot] -> per-thread round-robin cursor
    std::vector<std::vector<unsigned int> > m_mget_conn_slots; // [conn] -> owned slot list
    std::vector<size_t> m_mget_conn_slot_cursor;               // [conn] -> slot round-robin cursor

    // Builds the cluster MGET slot cache described above.
    void build_mget_slot_cache();

    virtual int connect(void);
    virtual void disconnect(void);

    // Shard connection helpers.
    // NOTE(review): ownership of the returned shard_connection and the
    // meaning of the bool result are not visible from this header -- confirm.
    shard_connection *create_shard_connection(abstract_protocol *abs_protocol);
    bool connect_shard_connection(shard_connection *sc, char *address, char *port);

    // Handlers for MOVED / ASK redirect responses received on `conn_id`.
    void handle_moved(unsigned int conn_id, struct timeval timestamp, request *request, protocol_response *response);
    void handle_ask(unsigned int conn_id, struct timeval timestamp, request *request, protocol_response *response);

    // Cluster variant of monitor request creation; `timestamp` is taken by
    // reference.
    bool create_monitor_request_cluster(unsigned int command_index, struct timeval &timestamp, unsigned int conn_id);

    // Processes a staged monitor command for `conn_id`
    // (see m_staged_monitor_commands).
    void process_staged_monitor_command(struct timeval timestamp, unsigned int conn_id);

    // Resend the request on the slot-owning connection (or same connection if
    // routing info isn't available yet). Returns true if ownership of `req`
    // transferred to a retry queue.
    bool retry_after_redirect(unsigned int conn_id, request *req);

    // Terminal accounting for a MOVED/ASK request whose retry was refused
    // (e.g. max_retries exhausted, retry queue full, no captured bytes). Logs
    // to --failed-keys-file if configured and increments the error counter,
    // so the request doesn't silently disappear from the stats.
    void finalize_dropped_redirect(struct timeval timestamp, request *req, protocol_response *response);

    // Release the transaction pin and return any staged key to the pool so the
    // key index is not silently burned when a mid-rotation MOVED or disconnect
    // forces the rotation to restart.
    void txn_release_pin();

public:
    cluster_client(client_group *group);
    virtual ~cluster_client();

    // NOTE(review): `key_index` looks like an out-parameter receiving the
    // selected key index -- confirm in cluster_client.cpp.
    virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id,
                                              unsigned long long *key_index);

    // Request builders; `timestamp` is taken by reference.
    virtual bool create_arbitrary_request(unsigned int command_index, struct timeval &timestamp, unsigned int conn_id);
    virtual bool create_mget_request(struct timeval &timestamp, unsigned int conn_id);

    // client manager api's
    virtual void handle_cluster_slots(protocol_response *r);
    virtual void create_request(struct timeval timestamp, unsigned int conn_id);
    virtual bool hold_pipeline(unsigned int conn_id);
    virtual void handle_response(unsigned int conn_id, struct timeval timestamp, request *request,
                                 protocol_response *response);
};
#endif // MEMTIER_BENCHMARK_CLUSTER_CLIENT_H