diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f1d3b878c..69af65f1e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6084,6 +6084,9 @@ void removeChannelsInSlot(unsigned int slot) { unsigned int delKeysInSlot(unsigned int hashslot) { if (!countKeysInSlot(hashslot)) return 0; + /* We may lose a slot during the pause. We need to track this + * state so that we don't assert in propagateNow(). */ + server.server_del_keys_in_slot = 1; unsigned int j = 0; kvstoreDictIterator *kvs_di = NULL; @@ -6108,6 +6111,8 @@ unsigned int delKeysInSlot(unsigned int hashslot) { } kvstoreReleaseDictIterator(kvs_di); + server.server_del_keys_in_slot = 0; + serverAssert(server.execution_nesting == 0); return j; } diff --git a/src/networking.c b/src/networking.c index 0db1fda8d..4791055b5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4571,7 +4571,7 @@ static void pauseClientsByClient(mstime_t endTime, int isPauseClientAll) { } /* Pause actions up to the specified unixtime (in ms) for a given type of - * commands. + * purpose. * * A main use case of this function is to allow pausing replication traffic * so that a failover without data loss to occur. Replicas will continue to receive diff --git a/src/server.c b/src/server.c index 884121969..12691df8e 100644 --- a/src/server.c +++ b/src/server.c @@ -3315,8 +3315,28 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { if (!shouldPropagate(target)) return; /* This needs to be unreachable since the dataset should be fixed during - * replica pause (otherwise data may be lost during a failover) */ - serverAssert(!(isPausedActions(PAUSE_ACTION_REPLICA) && (!server.client_pause_in_transaction))); + * replica pause (otherwise data may be lost during a failover). + * + * Though, there are exceptions: + * + * 1. We allow write commands that were queued up before and after to + * execute, if a CLIENT PAUSE executed during a transaction, we will + * track the state, the CLIENT PAUSE takes effect only after a transaction + * has finished. + * 2. Primary loses a slot during the pause, deletes all keys and replicates + * DEL to its replicas. In this case, we will track the state, the dirty + * slots will be deleted in the end without affecting the data consistency. + * + * Note that case 2 can happen in one of the following scenarios: + * 1) The primary waits for the replica to replicate before exiting, see + * shutdown-timeout in conf for more details. In this case, primary lost + * a slot during the SIGTERM waiting. + * 2) The primary waits for the replica to replicate during a manual failover. + * In this case, primary lost a slot during the pausing. + * 3) The primary was paused by CLIENT PAUSE, and lost a slot during the + * pausing. */ + serverAssert(!isPausedActions(PAUSE_ACTION_REPLICA) || server.client_pause_in_transaction || + server.server_del_keys_in_slot); if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid, argv, argc); if (target & PROPAGATE_REPL) replicationFeedReplicas(dbid, argv, argc); diff --git a/src/server.h b/src/server.h index c7a9806ca..5ef04a908 100644 --- a/src/server.h +++ b/src/server.h @@ -1701,6 +1701,7 @@ struct valkeyServer { const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */ char *ignore_warnings; /* Config: warnings that should be ignored. */ int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ + int server_del_keys_in_slot; /* The server is deleting the keys in the dirty slot. */ int thp_enabled; /* If true, THP is enabled. */ size_t page_size; /* The page size of OS. */ /* Modules */ @@ -2863,7 +2864,7 @@ void flushReplicasOutputBuffers(void); void disconnectReplicas(void); void evictClients(void); int listenToPort(connListener *fds); -void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions_bitmask); +void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions); void unpauseActions(pause_purpose purpose); uint32_t isPausedActions(uint32_t action_bitmask); uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask); diff --git a/tests/unit/cluster/slot-ownership.tcl b/tests/unit/cluster/slot-ownership.tcl index 0f3e3cc4f..0073c2904 100644 --- a/tests/unit/cluster/slot-ownership.tcl +++ b/tests/unit/cluster/slot-ownership.tcl @@ -59,3 +59,88 @@ start_cluster 2 2 {tags {external:skip cluster}} { } } } + +start_cluster 3 1 {tags {external:skip cluster} overrides {shutdown-timeout 100}} { + test "Primary lost a slot during the shutdown waiting" { + R 0 set FOO 0 + + # Pause the replica. + pause_process [srv -3 pid] + + # Incr the key and immediately shutdown the primary. + # The primary waits for the replica to replicate before exiting. + R 0 incr FOO + exec kill -SIGTERM [srv 0 pid] + wait_for_condition 50 100 { + [s 0 shutdown_in_milliseconds] > 0 + } else { + fail "Primary not indicating ongoing shutdown." + } + + # Move the slot to other primary + R 1 cluster bumpepoch + R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid] + + # Waiting for dirty slot update. + wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10 + + # Resume the replica and make sure primary exits normally instead of crashing. + resume_process [srv -3 pid] + wait_for_log_messages 0 {"*Valkey is now ready to exit, bye bye*"} 0 1000 10 + + # Make sure that the replica will become the new primary and does not own the key. + wait_for_condition 1000 50 { + [s -3 role] eq {master} + } else { + fail "The replica was not converted into primary" + } + assert_error {ERR no such key} {R 3 debug object foo} + } +} + +start_cluster 3 1 {tags {external:skip cluster}} { + test "Primary lost a slot during the manual failover pausing" { + R 0 set FOO 0 + + # Set primaries to drop the FAILOVER_AUTH_REQUEST packets, so that + # primary 0 will pause until the failover times out. + R 1 debug drop-cluster-packet-filter 5 + R 2 debug drop-cluster-packet-filter 5 + + # Replica doing the manual failover. + R 3 cluster failover + + # Move the slot to other primary + R 1 cluster bumpepoch + R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid] + + # Waiting for dirty slot update. + wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10 + + # Make sure primary doesn't crash when deleting the keys. + R 0 ping + + R 1 debug drop-cluster-packet-filter -1 + R 2 debug drop-cluster-packet-filter -1 + } +} + +start_cluster 3 1 {tags {external:skip cluster}} { + test "Primary lost a slot during the client pause command" { + R 0 set FOO 0 + + R 0 client pause 1000000000 write + + # Move the slot to other primary + R 1 cluster bumpepoch + R 1 cluster setslot [R 1 cluster keyslot FOO] node [R 1 cluster myid] + + # Waiting for dirty slot update. + wait_for_log_messages 0 {"*Deleting keys in dirty slot*"} 0 1000 10 + + # Make sure primary doesn't crash when deleting the keys. + R 0 ping + + R 0 client unpause + } +} diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index 38c13afc4..b18a32d48 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -260,6 +260,33 @@ start_server {tags {"pause network"}} { r client unpause } + test "Test eviction is skipped during client pause" { + r flushall + set evicted_keys [s 0 evicted_keys] + + r multi + r set foo{t} bar + r config set maxmemory-policy allkeys-random + r config set maxmemory 1 + r client PAUSE 50000 WRITE + r exec + + # No keys should actually have been evicted. + assert_match $evicted_keys [s 0 evicted_keys] + + # The previous config set triggers a time event, but due to the pause, + # no eviction has been made. After the unpause, a eviction will happen. + r client unpause + wait_for_condition 1000 10 { + [expr $evicted_keys + 1] eq [s 0 evicted_keys] + } else { + fail "Key is not evicted" + } + + r config set maxmemory 0 + r config set maxmemory-policy noeviction + } + test "Test both active and passive expires are skipped during client pause" { set expired_keys [s 0 expired_keys] r multi