mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Add support for plaintext clients in TLS cluster (#8587)
The cluster bus is established over TLS or non-TLS depending on the configuration tls-cluster. The client ports distributed in the cluster and sent to clients are assumed to be TLS or non-TLS also depending on tls-cluster. The cluster bus is now extended to also contain the non-TLS port of clients in a TLS cluster, when available. The non-TLS port of a cluster node, when available, is sent to clients connected without TLS in responses to CLUSTER SLOTS, CLUSTER NODES, CLUSTER SLAVES and MOVED and ASK redirects, instead of the TLS port. The user was able to override the client port by defining cluster-announce-port. Now cluster-announce-tls-port is added, so the user can define an alternative announce port for both TLS and non-TLS clients. Fixes #8134
This commit is contained in:
parent
8cbd858d45
commit
5629dbe715
18
redis.conf
18
redis.conf
@ -1500,16 +1500,21 @@ lua-time-limit 5000
|
||||
#
|
||||
# In order to make Redis Cluster working in such environments, a static
|
||||
# configuration where each node knows its public address is needed. The
|
||||
# following two options are used for this scope, and are:
|
||||
# following four options are used for this scope, and are:
|
||||
#
|
||||
# * cluster-announce-ip
|
||||
# * cluster-announce-port
|
||||
# * cluster-announce-tls-port
|
||||
# * cluster-announce-bus-port
|
||||
#
|
||||
# Each instructs the node about its address, client port, and cluster message
|
||||
# bus port. The information is then published in the header of the bus packets
|
||||
# so that other nodes will be able to correctly map the address of the node
|
||||
# publishing the information.
|
||||
# Each instructs the node about its address, client ports (for connections
|
||||
# without and with TLS) and cluster message bus port. The information is then
|
||||
# published in the header of the bus packets so that other nodes will be able to
|
||||
# correctly map the address of the node publishing the information.
|
||||
#
|
||||
# If cluster-tls is set to yes and cluster-announce-tls-port is omitted or set
|
||||
# to zero, then cluster-announce-port refers to the TLS port. Note also that
|
||||
# cluster-announce-tls-port has no effect if cluster-tls is set to no.
|
||||
#
|
||||
# If the above options are not used, the normal Redis Cluster auto-detection
|
||||
# will be used instead.
|
||||
@ -1522,7 +1527,8 @@ lua-time-limit 5000
|
||||
# Example:
|
||||
#
|
||||
# cluster-announce-ip 10.1.1.5
|
||||
# cluster-announce-port 6379
|
||||
# cluster-announce-tls-port 6379
|
||||
# cluster-announce-port 0
|
||||
# cluster-announce-bus-port 6380
|
||||
|
||||
################################## SLOW LOG ###################################
|
||||
|
@ -55,7 +55,7 @@ void clusterSendFail(char *nodename);
|
||||
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
|
||||
void clusterUpdateState(void);
|
||||
int clusterNodeGetSlotBit(clusterNode *n, int slot);
|
||||
sds clusterGenNodesDescription(int filter);
|
||||
sds clusterGenNodesDescription(int filter, int use_pport);
|
||||
clusterNode *clusterLookupNode(const char *name);
|
||||
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
|
||||
int clusterAddSlot(clusterNode *n, int slot);
|
||||
@ -190,6 +190,9 @@ int clusterLoadConfig(char *filename) {
|
||||
* base port. */
|
||||
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
|
||||
|
||||
/* The plaintext port for client in a TLS cluster (n->pport) is not
|
||||
* stored in nodes.conf. It is received later over the bus protocol. */
|
||||
|
||||
/* Parse flags */
|
||||
p = s = argv[2];
|
||||
while(p) {
|
||||
@ -336,7 +339,7 @@ int clusterSaveConfig(int do_fsync) {
|
||||
|
||||
/* Get the nodes description and concatenate our "vars" directive to
|
||||
* save currentEpoch and lastVoteEpoch. */
|
||||
ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
|
||||
ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
|
||||
ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
|
||||
(unsigned long long) server.cluster->currentEpoch,
|
||||
(unsigned long long) server.cluster->lastVoteEpoch);
|
||||
@ -437,6 +440,26 @@ int clusterLockConfig(char *filename) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Derives our ports to be announced in the cluster bus. */
|
||||
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
|
||||
int *announced_cport) {
|
||||
int port = server.tls_cluster ? server.tls_port : server.port;
|
||||
/* Default announced ports. */
|
||||
*announced_port = port;
|
||||
*announced_pport = server.tls_cluster ? server.port : 0;
|
||||
*announced_cport = port + CLUSTER_PORT_INCR;
|
||||
/* Config overriding announced ports. */
|
||||
if (server.tls_cluster && server.cluster_announce_tls_port) {
|
||||
*announced_port = server.cluster_announce_tls_port;
|
||||
*announced_pport = server.cluster_announce_port;
|
||||
} else if (server.cluster_announce_port) {
|
||||
*announced_port = server.cluster_announce_port;
|
||||
}
|
||||
if (server.cluster_announce_bus_port) {
|
||||
*announced_cport = server.cluster_announce_bus_port;
|
||||
}
|
||||
}
|
||||
|
||||
/* Some flags (currently just the NOFAILOVER flag) may need to be updated
|
||||
* in the "myself" node based on the current configuration of the node,
|
||||
* that may change at runtime via CONFIG SET. This function changes the
|
||||
@ -524,14 +547,9 @@ void clusterInit(void) {
|
||||
memset(server.cluster->slots_keys_count,0,
|
||||
sizeof(server.cluster->slots_keys_count));
|
||||
|
||||
/* Set myself->port / cport to my listening ports, we'll just need to
|
||||
/* Set myself->port/cport/pport to my listening ports, we'll just need to
|
||||
* discover the IP address via MEET messages. */
|
||||
myself->port = port;
|
||||
myself->cport = port+CLUSTER_PORT_INCR;
|
||||
if (server.cluster_announce_port)
|
||||
myself->port = server.cluster_announce_port;
|
||||
if (server.cluster_announce_bus_port)
|
||||
myself->cport = server.cluster_announce_bus_port;
|
||||
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
|
||||
|
||||
server.cluster->mf_end = 0;
|
||||
resetManualFailover();
|
||||
@ -782,6 +800,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
|
||||
memset(node->ip,0,sizeof(node->ip));
|
||||
node->port = 0;
|
||||
node->cport = 0;
|
||||
node->pport = 0;
|
||||
node->fail_reports = listCreate();
|
||||
node->voted_time = 0;
|
||||
node->orphaned_time = 0;
|
||||
@ -1488,6 +1507,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
||||
if (node->link) freeClusterLink(node->link);
|
||||
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
||||
node->port = ntohs(g->port);
|
||||
node->pport = ntohs(g->pport);
|
||||
node->cport = ntohs(g->cport);
|
||||
node->flags &= ~CLUSTER_NODE_NOADDR;
|
||||
}
|
||||
@ -1509,6 +1529,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
||||
node = createClusterNode(g->nodename, flags);
|
||||
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
||||
node->port = ntohs(g->port);
|
||||
node->pport = ntohs(g->pport);
|
||||
node->cport = ntohs(g->cport);
|
||||
clusterAddNode(node);
|
||||
}
|
||||
@ -1548,6 +1569,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
|
||||
{
|
||||
char ip[NET_IP_STR_LEN] = {0};
|
||||
int port = ntohs(hdr->port);
|
||||
int pport = ntohs(hdr->pport);
|
||||
int cport = ntohs(hdr->cport);
|
||||
|
||||
/* We don't proceed if the link is the same as the sender link, as this
|
||||
@ -1559,12 +1581,13 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
|
||||
if (link == node->link) return 0;
|
||||
|
||||
nodeIp2String(ip,link,hdr->myip);
|
||||
if (node->port == port && node->cport == cport &&
|
||||
if (node->port == port && node->cport == cport && node->pport == pport &&
|
||||
strcmp(ip,node->ip) == 0) return 0;
|
||||
|
||||
/* IP / port is different, update it. */
|
||||
memcpy(node->ip,ip,sizeof(ip));
|
||||
node->port = port;
|
||||
node->pport = pport;
|
||||
node->cport = cport;
|
||||
if (node->link) freeClusterLink(node->link);
|
||||
node->flags &= ~CLUSTER_NODE_NOADDR;
|
||||
@ -1862,6 +1885,7 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
|
||||
nodeIp2String(node->ip,link,hdr->myip);
|
||||
node->port = ntohs(hdr->port);
|
||||
node->pport = ntohs(hdr->pport);
|
||||
node->cport = ntohs(hdr->cport);
|
||||
clusterAddNode(node);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||
@ -1924,6 +1948,7 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
link->node->flags |= CLUSTER_NODE_NOADDR;
|
||||
link->node->ip[0] = '\0';
|
||||
link->node->port = 0;
|
||||
link->node->pport = 0;
|
||||
link->node->cport = 0;
|
||||
freeClusterLink(link);
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||
@ -2423,19 +2448,16 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
|
||||
hdr->myip[NET_IP_STR_LEN-1] = '\0';
|
||||
}
|
||||
|
||||
/* Handle cluster-announce-port as well. */
|
||||
int port = server.tls_cluster ? server.tls_port : server.port;
|
||||
int announced_port = server.cluster_announce_port ?
|
||||
server.cluster_announce_port : port;
|
||||
int announced_cport = server.cluster_announce_bus_port ?
|
||||
server.cluster_announce_bus_port :
|
||||
(port + CLUSTER_PORT_INCR);
|
||||
/* Handle cluster-announce-[tls-|bus-]port. */
|
||||
int announced_port, announced_pport, announced_cport;
|
||||
deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);
|
||||
|
||||
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
|
||||
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
|
||||
if (myself->slaveof != NULL)
|
||||
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
|
||||
hdr->port = htons(announced_port);
|
||||
hdr->pport = htons(announced_pport);
|
||||
hdr->cport = htons(announced_cport);
|
||||
hdr->flags = htons(myself->flags);
|
||||
hdr->state = server.cluster->state;
|
||||
@ -2492,6 +2514,7 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
|
||||
gossip->port = htons(n->port);
|
||||
gossip->cport = htons(n->cport);
|
||||
gossip->flags = htons(n->flags);
|
||||
gossip->pport = htons(n->pport);
|
||||
gossip->notused1 = 0;
|
||||
}
|
||||
|
||||
@ -4130,15 +4153,16 @@ sds representClusterNodeFlags(sds ci, uint16_t flags) {
|
||||
* See clusterGenNodesDescription() top comment for more information.
|
||||
*
|
||||
* The function returns the string representation as an SDS string. */
|
||||
sds clusterGenNodeDescription(clusterNode *node) {
|
||||
sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
|
||||
int j, start;
|
||||
sds ci;
|
||||
int port = use_pport && node->pport ? node->pport : node->port;
|
||||
|
||||
/* Node coordinates */
|
||||
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
|
||||
ci = sdscatfmt(ci," %s:%i@%i ",
|
||||
node->ip,
|
||||
node->port,
|
||||
port,
|
||||
node->cport);
|
||||
|
||||
/* Flags */
|
||||
@ -4249,10 +4273,13 @@ void clusterGenNodesSlotsInfo(int filter) {
|
||||
* include all the known nodes in the representation, including nodes in
|
||||
* the HANDSHAKE state.
|
||||
*
|
||||
* Setting use_pport to 1 in a TLS cluster makes the result contain the
|
||||
* plaintext client port rather then the TLS client port of each node.
|
||||
*
|
||||
* The representation obtained using this function is used for the output
|
||||
* of the CLUSTER NODES function, and as format for the cluster
|
||||
* configuration file (nodes.conf) for a given node. */
|
||||
sds clusterGenNodesDescription(int filter) {
|
||||
sds clusterGenNodesDescription(int filter, int use_pport) {
|
||||
sds ci = sdsempty(), ni;
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
@ -4265,7 +4292,7 @@ sds clusterGenNodesDescription(int filter) {
|
||||
clusterNode *node = dictGetVal(de);
|
||||
|
||||
if (node->flags & filter) continue;
|
||||
ni = clusterGenNodeDescription(node);
|
||||
ni = clusterGenNodeDescription(node, use_pport);
|
||||
ci = sdscatsds(ci,ni);
|
||||
sdsfree(ni);
|
||||
ci = sdscatlen(ci,"\n",1);
|
||||
@ -4319,7 +4346,10 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
|
||||
addReplyLongLong(c, end_slot);
|
||||
addReplyArrayLen(c, 3);
|
||||
addReplyBulkCString(c, node->ip);
|
||||
addReplyLongLong(c, node->port);
|
||||
/* Report non-TLS ports to non-TLS client in TLS cluster if available. */
|
||||
int use_pport = (server.tls_cluster &&
|
||||
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
|
||||
addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
|
||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||
|
||||
/* Remaining nodes in reply are replicas for slot range */
|
||||
@ -4329,7 +4359,10 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
|
||||
if (nodeFailed(node->slaves[i])) continue;
|
||||
addReplyArrayLen(c, 3);
|
||||
addReplyBulkCString(c, node->slaves[i]->ip);
|
||||
addReplyLongLong(c, node->slaves[i]->port);
|
||||
/* Report slave's non-TLS port to non-TLS client in TLS cluster */
|
||||
addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
|
||||
node->slaves[i]->pport :
|
||||
node->slaves[i]->port));
|
||||
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
|
||||
nested_elements++;
|
||||
}
|
||||
@ -4458,7 +4491,11 @@ NULL
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
|
||||
/* CLUSTER NODES */
|
||||
sds nodes = clusterGenNodesDescription(0);
|
||||
/* Report plaintext ports, only if cluster is TLS but client is known to
|
||||
* be non-TLS). */
|
||||
int use_pport = (server.tls_cluster &&
|
||||
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
|
||||
sds nodes = clusterGenNodesDescription(0, use_pport);
|
||||
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
|
||||
sdsfree(nodes);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
|
||||
@ -4834,9 +4871,12 @@ NULL
|
||||
return;
|
||||
}
|
||||
|
||||
/* Use plaintext port if cluster is TLS but client is non-TLS. */
|
||||
int use_pport = (server.tls_cluster &&
|
||||
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
|
||||
addReplyArrayLen(c,n->numslaves);
|
||||
for (j = 0; j < n->numslaves; j++) {
|
||||
sds ni = clusterGenNodeDescription(n->slaves[j]);
|
||||
sds ni = clusterGenNodeDescription(n->slaves[j], use_pport);
|
||||
addReplyBulkCString(c,ni);
|
||||
sdsfree(ni);
|
||||
}
|
||||
@ -5892,10 +5932,15 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
|
||||
} else if (error_code == CLUSTER_REDIR_MOVED ||
|
||||
error_code == CLUSTER_REDIR_ASK)
|
||||
{
|
||||
/* Redirect to IP:port. Include plaintext port if cluster is TLS but
|
||||
* client is non-TLS. */
|
||||
int use_pport = (server.tls_cluster &&
|
||||
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
|
||||
int port = use_pport && n->pport ? n->pport : n->port;
|
||||
addReplyErrorSds(c,sdscatprintf(sdsempty(),
|
||||
"-%s %d %s:%d",
|
||||
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
|
||||
hashslot,n->ip,n->port));
|
||||
hashslot, n->ip, port));
|
||||
} else {
|
||||
serverPanic("getNodeByQuery() unknown error.");
|
||||
}
|
||||
|
@ -135,7 +135,9 @@ typedef struct clusterNode {
|
||||
mstime_t orphaned_time; /* Starting time of orphaned master condition */
|
||||
long long repl_offset; /* Last known repl offset for this node. */
|
||||
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
||||
int port; /* Latest known clients port of this node */
|
||||
int port; /* Latest known clients port (TLS or plain). */
|
||||
int pport; /* Latest known clients plaintext port. Only used
|
||||
if the main clients port is for TLS. */
|
||||
int cport; /* Latest known cluster port of this node. */
|
||||
clusterLink *link; /* TCP/IP link with this node */
|
||||
list *fail_reports; /* List of nodes signaling this as failing */
|
||||
@ -194,7 +196,8 @@ typedef struct {
|
||||
uint16_t port; /* base port last time it was seen */
|
||||
uint16_t cport; /* cluster port last time it was seen */
|
||||
uint16_t flags; /* node->flags copy */
|
||||
uint32_t notused1;
|
||||
uint16_t pport; /* plaintext-port, when base port is TLS */
|
||||
uint16_t notused1;
|
||||
} clusterMsgDataGossip;
|
||||
|
||||
typedef struct {
|
||||
@ -267,7 +270,8 @@ typedef struct {
|
||||
unsigned char myslots[CLUSTER_SLOTS/8];
|
||||
char slaveof[CLUSTER_NAMELEN];
|
||||
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
|
||||
char notused1[34]; /* 34 bytes reserved for future usage. */
|
||||
char notused1[32]; /* 32 bytes reserved for future usage. */
|
||||
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
|
||||
uint16_t cport; /* Sender TCP cluster bus port */
|
||||
uint16_t flags; /* Sender node flags */
|
||||
unsigned char state; /* Cluster state from the POV of the sender */
|
||||
|
@ -2495,6 +2495,7 @@ standardConfig configs[] = {
|
||||
createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
|
||||
createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, NULL), /* Default: Use +10000 offset. */
|
||||
createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.port */
|
||||
createIntConfig("cluster-announce-tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.tls_port */
|
||||
createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_slave_period, 10, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
|
@ -1548,6 +1548,7 @@ struct redisServer {
|
||||
if the master is in failure state. */
|
||||
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
|
||||
int cluster_announce_port; /* base port to announce on cluster bus. */
|
||||
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
|
||||
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
|
||||
int cluster_module_flags; /* Set of flags that Redis modules are able
|
||||
to set in order to suppress certain
|
||||
|
@ -54,7 +54,17 @@ proc process_is_running {pid} {
|
||||
|
||||
set numkeys 50000
|
||||
set numops 200000
|
||||
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
|
||||
set start_node_port [get_instance_attrib redis 0 port]
|
||||
set cluster [redis_cluster 127.0.0.1:$start_node_port]
|
||||
if {$::tls} {
|
||||
# setup a non-TLS cluster client to the TLS cluster
|
||||
set plaintext_port [get_instance_attrib redis 0 plaintext-port]
|
||||
set cluster_plaintext [redis_cluster 127.0.0.1:$plaintext_port 0]
|
||||
puts "Testing TLS cluster on start node 127.0.0.1:$start_node_port, plaintext port $plaintext_port"
|
||||
} else {
|
||||
set cluster_plaintext $cluster
|
||||
puts "Testing using non-TLS cluster"
|
||||
}
|
||||
catch {unset content}
|
||||
array set content {}
|
||||
set tribpid {}
|
||||
@ -94,8 +104,11 @@ test "Cluster consistency during live resharding" {
|
||||
# This way we are able to stress Lua -> Redis command invocation
|
||||
# as well, that has tests to prevent Lua to write into wrong
|
||||
# hash slots.
|
||||
if {$listid % 2} {
|
||||
# We also use both TLS and plaintext connections.
|
||||
if {$listid % 3 == 0} {
|
||||
$cluster rpush $key $ele
|
||||
} elseif {$listid % 3 == 1} {
|
||||
$cluster_plaintext rpush $key $ele
|
||||
} else {
|
||||
$cluster eval {redis.call("rpush",KEYS[1],ARGV[1])} 1 $key $ele
|
||||
}
|
||||
|
@ -48,3 +48,16 @@ test "client can handle keys with hash tag" {
|
||||
$cluster set foo{tag} bar
|
||||
$cluster close
|
||||
}
|
||||
|
||||
if {$::tls} {
|
||||
test {CLUSTER SLOTS from non-TLS client in TLS cluster} {
|
||||
set slots_tls [R 0 cluster slots]
|
||||
set host [get_instance_attrib redis 0 host]
|
||||
set plaintext_port [get_instance_attrib redis 0 plaintext-port]
|
||||
set client_plain [redis $host $plaintext_port 0 0]
|
||||
set slots_plain [$client_plain cluster slots]
|
||||
$client_plain close
|
||||
# Compare the ports in the first row
|
||||
assert_no_match [lindex $slots_tls 0 3 1] [lindex $slots_plain 0 3 1]
|
||||
}
|
||||
}
|
||||
|
@ -64,6 +64,8 @@ proc exec_instance {type dirname cfgfile} {
|
||||
proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
||||
for {set j 0} {$j < $count} {incr j} {
|
||||
set port [find_available_port $base_port $::redis_port_count]
|
||||
# plaintext port (only used for TLS cluster)
|
||||
set pport 0
|
||||
# Create a directory for this instance.
|
||||
set dirname "${type}_${j}"
|
||||
lappend ::dirs $dirname
|
||||
@ -83,7 +85,9 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
||||
puts $cfg "tls-port $port"
|
||||
puts $cfg "tls-replication yes"
|
||||
puts $cfg "tls-cluster yes"
|
||||
puts $cfg "port 0"
|
||||
# plaintext port, only used by plaintext clients in a TLS cluster
|
||||
set pport [find_available_port $base_port $::redis_port_count]
|
||||
puts $cfg "port $pport"
|
||||
puts $cfg [format "tls-cert-file %s/../../tls/server.crt" [pwd]]
|
||||
puts $cfg [format "tls-key-file %s/../../tls/server.key" [pwd]]
|
||||
puts $cfg [format "tls-client-cert-file %s/../../tls/client.crt" [pwd]]
|
||||
@ -118,6 +122,8 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
||||
set cfg [open $cfgfile a+]
|
||||
if {$::tls} {
|
||||
puts $cfg "tls-port $port"
|
||||
set pport [find_available_port $base_port $::redis_port_count]
|
||||
puts $cfg "port $pport"
|
||||
} else {
|
||||
puts $cfg "port $port"
|
||||
}
|
||||
@ -143,6 +149,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
||||
pid $pid \
|
||||
host $::host \
|
||||
port $port \
|
||||
plaintext-port $pport \
|
||||
link $link \
|
||||
]
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
#
|
||||
# Example usage:
|
||||
#
|
||||
# set c [redis_cluster 127.0.0.1 6379 127.0.0.1 6380]
|
||||
# set c [redis_cluster {127.0.0.1:6379 127.0.0.1:6380}]
|
||||
# $c set foo
|
||||
# $c get foo
|
||||
# $c close
|
||||
@ -17,6 +17,7 @@ set ::redis_cluster::id 0
|
||||
array set ::redis_cluster::startup_nodes {}
|
||||
array set ::redis_cluster::nodes {}
|
||||
array set ::redis_cluster::slots {}
|
||||
array set ::redis_cluster::tls {}
|
||||
|
||||
# List of "plain" commands, which are commands where the sole key is always
|
||||
# the first argument.
|
||||
@ -34,11 +35,14 @@ set ::redis_cluster::plain_commands {
|
||||
dump bitcount bitpos pfadd pfcount
|
||||
}
|
||||
|
||||
proc redis_cluster {nodes} {
|
||||
# Create a cluster client. The nodes are given as a list of host:port. The TLS
|
||||
# parameter (1 or 0) is optional and defaults to the global $::tls.
|
||||
proc redis_cluster {nodes {tls -1}} {
|
||||
set id [incr ::redis_cluster::id]
|
||||
set ::redis_cluster::startup_nodes($id) $nodes
|
||||
set ::redis_cluster::nodes($id) {}
|
||||
set ::redis_cluster::slots($id) {}
|
||||
set ::redis_cluster::tls($id) [expr $tls == -1 ? $::tls : $tls]
|
||||
set handle [interp alias {} ::redis_cluster::instance$id {} ::redis_cluster::__dispatch__ $id]
|
||||
$handle refresh_nodes_map
|
||||
return $handle
|
||||
@ -60,9 +64,10 @@ proc ::redis_cluster::__method__refresh_nodes_map {id} {
|
||||
foreach start_node $::redis_cluster::startup_nodes($id) {
|
||||
set ip_port [lindex [split $start_node @] 0]
|
||||
lassign [split $ip_port :] start_host start_port
|
||||
set tls $::redis_cluster::tls($id)
|
||||
if {[catch {
|
||||
set r {}
|
||||
set r [redis $start_host $start_port 0 $::tls]
|
||||
set r [redis $start_host $start_port 0 $tls]
|
||||
set nodes_descr [$r cluster nodes]
|
||||
$r close
|
||||
} e]} {
|
||||
@ -107,7 +112,8 @@ proc ::redis_cluster::__method__refresh_nodes_map {id} {
|
||||
|
||||
# Connect to the node
|
||||
set link {}
|
||||
catch {set link [redis $host $port 0 $::tls]}
|
||||
set tls $::redis_cluster::tls($id)
|
||||
catch {set link [redis $host $port 0 $tls]}
|
||||
|
||||
# Build this node description as an hash.
|
||||
set node [dict create \
|
||||
@ -161,6 +167,7 @@ proc ::redis_cluster::__method__close {id} {
|
||||
catch {unset ::redis_cluster::startup_nodes($id)}
|
||||
catch {unset ::redis_cluster::nodes($id)}
|
||||
catch {unset ::redis_cluster::slots($id)}
|
||||
catch {unset ::redis_cluster::tls($id)}
|
||||
catch {interp alias {} ::redis_cluster::instance$id {}}
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ array set ::redis::addr {}
|
||||
array set ::redis::blocking {}
|
||||
array set ::redis::deferred {}
|
||||
array set ::redis::reconnect {}
|
||||
array set ::redis::tls {}
|
||||
array set ::redis::callback {}
|
||||
array set ::redis::state {} ;# State in non-blocking reply reading
|
||||
array set ::redis::statestack {} ;# Stack of states, for nested mbulks
|
||||
@ -58,7 +59,7 @@ proc redis {{server 127.0.0.1} {port 6379} {defer 0} {tls 0} {tlsoptions {}}} {
|
||||
set ::redis::blocking($id) 1
|
||||
set ::redis::deferred($id) $defer
|
||||
set ::redis::reconnect($id) 0
|
||||
set ::redis::tls $tls
|
||||
set ::redis::tls($id) $tls
|
||||
::redis::redis_reset_state $id
|
||||
interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
|
||||
}
|
||||
@ -83,7 +84,7 @@ proc ::redis::__dispatch__raw__ {id method argv} {
|
||||
# Reconnect the link if needed.
|
||||
if {$fd eq {}} {
|
||||
lassign $::redis::addr($id) host port
|
||||
if {$::redis::tls} {
|
||||
if {$::redis::tls($id)} {
|
||||
set ::redis::fd($id) [::tls::socket $host $port]
|
||||
} else {
|
||||
set ::redis::fd($id) [socket $host $port]
|
||||
@ -158,6 +159,7 @@ proc ::redis::__method__close {id fd} {
|
||||
catch {unset ::redis::blocking($id)}
|
||||
catch {unset ::redis::deferred($id)}
|
||||
catch {unset ::redis::reconnect($id)}
|
||||
catch {unset ::redis::tls($id)}
|
||||
catch {unset ::redis::state($id)}
|
||||
catch {unset ::redis::statestack($id)}
|
||||
catch {unset ::redis::callback($id)}
|
||||
|
Loading…
Reference in New Issue
Block a user