Replicate slot migration states via RDB aux fields (#586)

This commit is contained in:
Ping Xie 2024-06-07 20:32:27 -07:00 committed by GitHub
parent 54c9747935
commit aad6769a80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 145 additions and 46 deletions

1
.gitignore vendored
View File

@ -45,4 +45,5 @@ redis.code-workspace
.cache
.cscope*
.swp
nodes.conf
tests/cluster/tmp/*

View File

@ -103,7 +103,6 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);

View File

@ -113,6 +113,8 @@ int auxTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
int getNodeDefaultClientPort(clusterNode *n) {
return server.tls_cluster ? n->tls_port : n->tcp_port;
@ -1014,6 +1016,10 @@ void clusterInit(void) {
exit(1);
}
/* Register our own rdb aux fields */
serverAssert(rdbRegisterAuxField("cluster-slot-states", clusterEncodeOpenSlotsAuxField,
clusterDecodeOpenSlotsAuxField) == C_OK);
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
@ -6523,39 +6529,82 @@ int detectAndUpdateCachedNodeHealth(void) {
return overall_health_changed;
}
/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void) {
if (!server.cluster_enabled) return;
/* Encode open slot states into an sds string to be persisted as an aux field in RDB. */
sds clusterEncodeOpenSlotsAuxField(int rdbflags) {
if (!server.cluster_enabled) return NULL;
int argc = 5;
robj **argv = zmalloc(sizeof(robj *) * argc);
/* Open slots should not be persisted to an RDB file. This data is intended only for full sync. */
if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return NULL;
argv[0] = shared.cluster;
argv[1] = shared.setslot;
sds s = NULL;
for (int i = 0; i < 2; i++) {
clusterNode **nodes_ptr = NULL;
clusterNode **nodes_ptr;
if (i == 0) {
nodes_ptr = server.cluster->importing_slots_from;
argv[3] = shared.importing;
} else {
nodes_ptr = server.cluster->migrating_slots_to;
argv[3] = shared.migrating;
}
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (nodes_ptr[j] == NULL) continue;
argv[2] = createStringObjectFromLongLongForValue(j);
sds name = sdsnewlen(nodes_ptr[j]->name, sizeof(nodes_ptr[j]->name));
argv[4] = createObject(OBJ_STRING, name);
replicationFeedReplicas(0, argv, argc);
decrRefCount(argv[2]);
decrRefCount(argv[4]);
if (s == NULL) s = sdsempty();
s = sdscatfmt(s, "%i%s", j, (i == 0) ? "<" : ">");
s = sdscatlen(s, nodes_ptr[j]->name, CLUSTER_NAMELEN);
s = sdscatlen(s, ",", 1);
}
}
zfree(argv);
return s;
}
/* Decode the open slot aux field and restore the in-memory slot states. */
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {
if (!server.cluster_enabled || s == NULL) return C_OK;
/* Open slots should not be loaded from a persisted RDB file, but only from a full sync. */
if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return C_OK;
while (*s) {
/* Extract slot number */
int slot = atoi(s);
if (slot < 0 || slot >= CLUSTER_SLOTS) return C_ERR;
while (*s && *s != '<' && *s != '>') s++;
if (*s != '<' && *s != '>') return C_ERR;
/* Determine if it's an importing or migrating slot */
int is_importing = (*s == '<');
s++;
/* Extract the node name */
char node_name[CLUSTER_NAMELEN];
int k = 0;
while (*s && *s != ',' && k < CLUSTER_NAMELEN) {
node_name[k++] = *s++;
}
/* Ensure the node name is of the correct length */
if (k != CLUSTER_NAMELEN || *s != ',') return C_ERR;
/* Move to the next slot */
s++;
/* Find the corresponding node */
clusterNode *node = clusterLookupNode(node_name, CLUSTER_NAMELEN);
if (!node) {
/* Create a new node if not found */
node = createClusterNode(node_name, 0);
clusterAddNode(node);
}
/* Set the slot state */
if (is_importing) {
server.cluster->importing_slots_from[slot] = node;
} else {
server.cluster->migrating_slots_to[slot] = node;
}
}
return C_OK;
}

View File

@ -46,8 +46,6 @@ uint64_t dictStringHash(const void *key) {
return dictGenHashFunction(key, strlen(key));
}
void dictVanillaFree(dict *d, void *val);
dictType latencyTimeSeriesDictType = {
dictStringHash, /* hash function */
NULL, /* key dup */

View File

@ -36,6 +36,7 @@
#include "functions.h"
#include "intset.h" /* Compact integer set structure */
#include "bio.h"
#include "zmalloc.h"
#include <math.h>
#include <fcntl.h>
@ -107,6 +108,30 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
exit(1);
}
typedef struct {
rdbAuxFieldEncoder encoder;
rdbAuxFieldDecoder decoder;
} rdbAuxFieldCodec;
dictType rdbAuxFieldDictType = {
dictSdsCaseHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCaseCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictVanillaFree, /* val destructor */
NULL /* allow to expand */
};
dict *rdbAuxFields = NULL;
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder) {
if (rdbAuxFields == NULL) rdbAuxFields = dictCreate(&rdbAuxFieldDictType);
rdbAuxFieldCodec *codec = zmalloc(sizeof(rdbAuxFieldCodec));
codec->encoder = encoder;
codec->decoder = decoder;
return dictAdd(rdbAuxFields, sdsnew(auxfield), (void *)codec) == DICT_OK ? C_OK : C_ERR;
}
ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
if (rdb && rioWrite(rdb, p, len) == 0) return -1;
return len;
@ -1186,6 +1211,24 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveAuxFieldStrInt(rdb, "repl-offset", server.primary_repl_offset) == -1) return -1;
}
if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1;
/* Handle additional dynamic aux fields */
if (rdbAuxFields != NULL) {
dictIterator di;
dictInitIterator(&di, rdbAuxFields);
dictEntry *de;
while ((de = dictNext(&di)) != NULL) {
rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de);
sds s = codec->encoder(rdbflags);
if (s == NULL) continue;
if (rdbSaveAuxFieldStrStr(rdb, dictGetKey(de), s) == -1) {
sdsfree(s);
return -1;
}
sdsfree(s);
}
}
return 1;
}
@ -3099,9 +3142,22 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (!strcasecmp(auxkey->ptr, "redis-bits")) {
/* Just ignored. */
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr);
/* Check if this is a dynamic aux field */
int handled = 0;
if (rdbAuxFields != NULL) {
dictEntry *de = dictFind(rdbAuxFields, auxkey->ptr);
if (de != NULL) {
handled = 1;
rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de);
if (codec->decoder(rdbflags, auxval->ptr) < 0) goto eoferr;
}
}
if (!handled) {
/* We ignore fields we don't understand, as by AUX field
* contract. */
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr);
}
}
decrRefCount(auxkey);

View File

@ -1273,8 +1273,6 @@ int replicaPutOnline(client *replica) {
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL);
serverLog(LL_NOTICE, "Synchronization with replica %s succeeded", replicationGetReplicaName(replica));
/* Replicate slot being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}

View File

@ -1914,10 +1914,6 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*", 1);
shared.special_equals = createStringObject("=", 1);
shared.redacted = makeObjectShared(createStringObject("(redacted)", 10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] = makeObjectShared(createObject(OBJ_STRING, (void *)(long)j));

View File

@ -1032,6 +1032,9 @@ typedef struct rdbLoadingCtx {
functionsLibCtx *functions_lib_ctx;
} rdbLoadingCtx;
typedef sds (*rdbAuxFieldEncoder)(int flags);
typedef int (*rdbAuxFieldDecoder)(int flags, sds s);
/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv;
@ -1368,11 +1371,11 @@ struct sharedObjectsStruct {
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
*retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack,
*special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk,
*smessagebulk, *cluster, *setslot, *importing, *migrating, *select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
*smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
sds minstring, maxstring;
};
@ -2603,6 +2606,7 @@ int serverSetProcTitle(char *title);
int validateProcTitleTemplate(const char *template);
int serverCommunicateSystemd(const char *sd_notify_msg);
void serverSetCpuAffinity(const char *cpulist);
void dictVanillaFree(dict *d, void *val);
/* afterErrorReply flags */
#define ERR_REPLY_FLAG_NO_STATS_UPDATE \
@ -2904,6 +2908,7 @@ void rebaseReplicationBuffer(long long base_repl_offset);
void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder);
void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(const char *err);

View File

@ -371,3 +371,7 @@ proc check_cluster_node_mark {flag ref_node_index instance_id_to_check} {
}
fail "Unable to find instance id in cluster nodes. ID: $instance_id_to_check"
}
proc get_slot_field {slot_output shard_id node_id attrib_id} {
return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
}

View File

@ -1,7 +1,3 @@
proc get_slot_field {slot_output shard_id node_id attrib_id} {
return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
}
# Start a cluster with 3 masters and 4 replicas.
# These tests rely on specific node ordering, so make sure no node fails over.
start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} {

View File

@ -15,9 +15,6 @@ proc get_cluster_role {srv_idx} {
}
proc wait_for_role {srv_idx role} {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
# wait for a gossip cycle for states to be propagated throughout the cluster
after $node_timeout
wait_for_condition 100 100 {
[lindex [split [R $srv_idx ROLE] " "] 0] eq $role
} else {
@ -198,7 +195,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 3 CLUSTER REPLICATE $R0_id]
wait_for_role 3 slave
# Validate that R3 now sees slot 609 open
assert_equal [get_open_slots 3] "\[609->-$R1_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
}
test "New replica inherits importing slot" {
@ -212,7 +209,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 4 CLUSTER REPLICATE $R1_id]
wait_for_role 4 slave
# Validate that R4 now sees slot 609 open
assert_equal [get_open_slots 4] "\[609-<-$R0_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
}
}