mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Fix primary crash when processing dirty slots during shutdown wait / failover wait / client pause (#1131)
We have an assert in propagateNow. If the primary node receives a CLUSTER UPDATE (for example, one that results in dirty slots) while it is waiting after SIGTERM, while it is paused for a manual failover, or during a client pause, the delKeysInSlot call will trigger this assert and crash the primary. To handle this case, we added a new server_del_keys_in_slot state, analogous to client_pause_in_transaction, which tracks that the deletion is in progress so that the assert in propagateNow is not triggered. The dirty slots are still deleted in the end, without affecting data consistency. Signed-off-by: Binbin <binloveplay1314@qq.com> Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
This commit is contained in:
parent
4e2493e5c9
commit
92181b6797
@ -6084,6 +6084,9 @@ void removeChannelsInSlot(unsigned int slot) {
|
|||||||
unsigned int delKeysInSlot(unsigned int hashslot) {
|
unsigned int delKeysInSlot(unsigned int hashslot) {
|
||||||
if (!countKeysInSlot(hashslot)) return 0;
|
if (!countKeysInSlot(hashslot)) return 0;
|
||||||
|
|
||||||
|
/* We may lose a slot during the pause. We need to track this
|
||||||
|
* state so that we don't assert in propagateNow(). */
|
||||||
|
server.server_del_keys_in_slot = 1;
|
||||||
unsigned int j = 0;
|
unsigned int j = 0;
|
||||||
|
|
||||||
kvstoreDictIterator *kvs_di = NULL;
|
kvstoreDictIterator *kvs_di = NULL;
|
||||||
@ -6108,6 +6111,8 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
|
|||||||
}
|
}
|
||||||
kvstoreReleaseDictIterator(kvs_di);
|
kvstoreReleaseDictIterator(kvs_di);
|
||||||
|
|
||||||
|
server.server_del_keys_in_slot = 0;
|
||||||
|
serverAssert(server.execution_nesting == 0);
|
||||||
return j;
|
return j;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4571,7 +4571,7 @@ static void pauseClientsByClient(mstime_t endTime, int isPauseClientAll) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Pause actions up to the specified unixtime (in ms) for a given type of
|
/* Pause actions up to the specified unixtime (in ms) for a given type of
|
||||||
* commands.
|
* purpose.
|
||||||
*
|
*
|
||||||
* A main use case of this function is to allow pausing replication traffic
|
* A main use case of this function is to allow pausing replication traffic
|
||||||
* so that a failover without data loss can occur. Replicas will continue to receive
|
* so that a failover without data loss can occur. Replicas will continue to receive
|
||||||
|
24
src/server.c
24
src/server.c
@ -3315,8 +3315,28 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
|
|||||||
if (!shouldPropagate(target)) return;
|
if (!shouldPropagate(target)) return;
|
||||||
|
|
||||||
/* This needs to be unreachable since the dataset should be fixed during
|
/* This needs to be unreachable since the dataset should be fixed during
|
||||||
* replica pause (otherwise data may be lost during a failover) */
|
* replica pause (otherwise data may be lost during a failover).
|
||||||
serverAssert(!(isPausedActions(PAUSE_ACTION_REPLICA) && (!server.client_pause_in_transaction)));
|
*
|
||||||
|
* Though, there are exceptions:
|
||||||
|
*
|
||||||
|
* 1. We allow write commands that were queued up before and after to
|
||||||
|
* execute, if a CLIENT PAUSE executed during a transaction, we will
|
||||||
|
* track the state, the CLIENT PAUSE takes effect only after a transaction
|
||||||
|
* has finished.
|
||||||
|
* 2. Primary loses a slot during the pause, deletes all keys and replicates
|
||||||
|
* DEL to its replicas. In this case, we will track the state, the dirty
|
||||||
|
* slots will be deleted in the end without affecting the data consistency.
|
||||||
|
*
|
||||||
|
* Note that case 2 can happen in one of the following scenarios:
|
||||||
|
* 1) The primary waits for the replica to replicate before exiting, see
|
||||||
|
* shutdown-timeout in conf for more details. In this case, primary lost
|
||||||
|
* a slot during the SIGTERM waiting.
|
||||||
|
* 2) The primary waits for the replica to replicate during a manual failover.
|
||||||
|
* In this case, primary lost a slot during the pausing.
|
||||||
|
* 3) The primary was paused by CLIENT PAUSE, and lost a slot during the
|
||||||
|
* pausing. */
|
||||||
|
serverAssert(!isPausedActions(PAUSE_ACTION_REPLICA) || server.client_pause_in_transaction ||
|
||||||
|
server.server_del_keys_in_slot);
|
||||||
|
|
||||||
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid, argv, argc);
|
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid, argv, argc);
|
||||||
if (target & PROPAGATE_REPL) replicationFeedReplicas(dbid, argv, argc);
|
if (target & PROPAGATE_REPL) replicationFeedReplicas(dbid, argv, argc);
|
||||||
|
@ -1701,6 +1701,7 @@ struct valkeyServer {
|
|||||||
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
|
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
|
||||||
char *ignore_warnings; /* Config: warnings that should be ignored. */
|
char *ignore_warnings; /* Config: warnings that should be ignored. */
|
||||||
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
|
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
|
||||||
|
int server_del_keys_in_slot; /* The server is deleting the keys in the dirty slot. */
|
||||||
int thp_enabled; /* If true, THP is enabled. */
|
int thp_enabled; /* If true, THP is enabled. */
|
||||||
size_t page_size; /* The page size of OS. */
|
size_t page_size; /* The page size of OS. */
|
||||||
/* Modules */
|
/* Modules */
|
||||||
@ -2863,7 +2864,7 @@ void flushReplicasOutputBuffers(void);
|
|||||||
void disconnectReplicas(void);
|
void disconnectReplicas(void);
|
||||||
void evictClients(void);
|
void evictClients(void);
|
||||||
int listenToPort(connListener *fds);
|
int listenToPort(connListener *fds);
|
||||||
void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions_bitmask);
|
void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions);
|
||||||
void unpauseActions(pause_purpose purpose);
|
void unpauseActions(pause_purpose purpose);
|
||||||
uint32_t isPausedActions(uint32_t action_bitmask);
|
uint32_t isPausedActions(uint32_t action_bitmask);
|
||||||
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
|
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
|
||||||
|
@ -59,3 +59,88 @@ start_cluster 2 2 {tags {external:skip cluster}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start_cluster 3 1 {tags {external:skip cluster} overrides {shutdown-timeout 100}} {
|
||||||
|
test "Primary lost a slot during the shutdown waiting" {
|
||||||
|
R 0 set FOO 0
|
||||||
|
|
||||||
|
# Pause the replica.
|
||||||
|
pause_process [srv -3 pid]
|
||||||
|
|
||||||
|
# Incr the key and immediately shutdown the primary.
|
||||||
|
# The primary waits for the replica to replicate before exiting.
|
||||||
|
R 0 incr FOO
|
||||||
|
exec kill -SIGTERM [srv 0 pid]
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[s 0 shutdown_in_milliseconds] > 0
|
||||||
|
} else {
|
||||||
|
fail "Primary not indicating ongoing shutdown."
|
||||||
|
}
|
||||||
|
|
||||||
|
# Move the slot to other primary
|
||||||
|
R 1 cluster bumpepoch
|
||||||
|
R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid]
|
||||||
|
|
||||||
|
# Waiting for dirty slot update.
|
||||||
|
wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10
|
||||||
|
|
||||||
|
# Resume the replica and make sure primary exits normally instead of crashing.
|
||||||
|
resume_process [srv -3 pid]
|
||||||
|
wait_for_log_messages 0 {"*Valkey is now ready to exit, bye bye*"} 0 1000 10
|
||||||
|
|
||||||
|
# Make sure that the replica will become the new primary and does not own the key.
|
||||||
|
wait_for_condition 1000 50 {
|
||||||
|
[s -3 role] eq {master}
|
||||||
|
} else {
|
||||||
|
fail "The replica was not converted into primary"
|
||||||
|
}
|
||||||
|
assert_error {ERR no such key} {R 3 debug object foo}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start_cluster 3 1 {tags {external:skip cluster}} {
|
||||||
|
test "Primary lost a slot during the manual failover pausing" {
|
||||||
|
R 0 set FOO 0
|
||||||
|
|
||||||
|
# Set primaries to drop the FAILOVER_AUTH_REQUEST packets, so that
|
||||||
|
# primary 0 will pause until the failover times out.
|
||||||
|
R 1 debug drop-cluster-packet-filter 5
|
||||||
|
R 2 debug drop-cluster-packet-filter 5
|
||||||
|
|
||||||
|
# Replica doing the manual failover.
|
||||||
|
R 3 cluster failover
|
||||||
|
|
||||||
|
# Move the slot to other primary
|
||||||
|
R 1 cluster bumpepoch
|
||||||
|
R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid]
|
||||||
|
|
||||||
|
# Waiting for dirty slot update.
|
||||||
|
wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10
|
||||||
|
|
||||||
|
# Make sure primary doesn't crash when deleting the keys.
|
||||||
|
R 0 ping
|
||||||
|
|
||||||
|
R 1 debug drop-cluster-packet-filter -1
|
||||||
|
R 2 debug drop-cluster-packet-filter -1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start_cluster 3 1 {tags {external:skip cluster}} {
|
||||||
|
test "Primary lost a slot during the client pause command" {
|
||||||
|
R 0 set FOO 0
|
||||||
|
|
||||||
|
R 0 client pause 1000000000 write
|
||||||
|
|
||||||
|
# Move the slot to other primary
|
||||||
|
R 1 cluster bumpepoch
|
||||||
|
R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid]
|
||||||
|
|
||||||
|
# Waiting for dirty slot update.
|
||||||
|
wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10
|
||||||
|
|
||||||
|
# Make sure primary doesn't crash when deleting the keys.
|
||||||
|
R 0 ping
|
||||||
|
|
||||||
|
R 0 client unpause
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -260,6 +260,33 @@ start_server {tags {"pause network"}} {
|
|||||||
r client unpause
|
r client unpause
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "Test eviction is skipped during client pause" {
|
||||||
|
r flushall
|
||||||
|
set evicted_keys [s 0 evicted_keys]
|
||||||
|
|
||||||
|
r multi
|
||||||
|
r set foo{t} bar
|
||||||
|
r config set maxmemory-policy allkeys-random
|
||||||
|
r config set maxmemory 1
|
||||||
|
r client PAUSE 50000 WRITE
|
||||||
|
r exec
|
||||||
|
|
||||||
|
# No keys should actually have been evicted.
|
||||||
|
assert_match $evicted_keys [s 0 evicted_keys]
|
||||||
|
|
||||||
|
# The previous config set triggers a time event, but due to the pause,
|
||||||
|
# no eviction has been made. After the unpause, an eviction will happen.
|
||||||
|
r client unpause
|
||||||
|
wait_for_condition 1000 10 {
|
||||||
|
[expr $evicted_keys + 1] eq [s 0 evicted_keys]
|
||||||
|
} else {
|
||||||
|
fail "Key is not evicted"
|
||||||
|
}
|
||||||
|
|
||||||
|
r config set maxmemory 0
|
||||||
|
r config set maxmemory-policy noeviction
|
||||||
|
}
|
||||||
|
|
||||||
test "Test both active and passive expires are skipped during client pause" {
|
test "Test both active and passive expires are skipped during client pause" {
|
||||||
set expired_keys [s 0 expired_keys]
|
set expired_keys [s 0 expired_keys]
|
||||||
r multi
|
r multi
|
||||||
|
Loading…
Reference in New Issue
Block a user