mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Release clients blocked on module commands in cluster resharding and down state (#9483)
Prevent clients from being blocked forever in cluster when they block with their own module command and the hash slot is migrated to another master at the same time. These will get a redirection message when unblocked. Also, release clients blocked on module commands when cluster is down (same as other blocked clients). This commit adds basic tests for the main (non-cluster) redis test infra that tests the cluster. This was done because the cluster test infra can't handle some common test features, but most importantly we only build the test modules with the non-cluster test suite. Note that rather than really supporting cluster operations by the test infra, it was added (as dup code) in two files, one for module tests and one for non-module tests; maybe in the future we'll refactor that. Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
c9fabc2ef0
commit
4962c5526d
@ -40,5 +40,6 @@ $TCLSH tests/test_helper.tcl \
|
||||
--single unit/moduleapi/list \
|
||||
--single unit/moduleapi/stream \
|
||||
--single unit/moduleapi/datatype2 \
|
||||
--single unit/moduleapi/cluster \
|
||||
--single unit/moduleapi/aclcheck \
|
||||
"${@}"
|
||||
|
@ -6077,7 +6077,8 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
|
||||
if (c->flags & CLIENT_BLOCKED &&
|
||||
(c->btype == BLOCKED_LIST ||
|
||||
c->btype == BLOCKED_ZSET ||
|
||||
c->btype == BLOCKED_STREAM))
|
||||
c->btype == BLOCKED_STREAM ||
|
||||
c->btype == BLOCKED_MODULE))
|
||||
{
|
||||
dictEntry *de;
|
||||
dictIterator *di;
|
||||
@ -6091,6 +6092,11 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* If the client is blocked on module, but not on a specific key,
|
||||
* don't unblock it (except for the CLUSTER_FAIL case above). */
|
||||
if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
|
||||
return 0;
|
||||
|
||||
/* All keys must belong to the same slot, so check first key only. */
|
||||
di = dictGetIterator(c->bpop.keys);
|
||||
if ((de = dictNext(di)) != NULL) {
|
||||
|
@ -470,19 +470,19 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (fsltype == NULL)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",1,2,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall,
|
||||
|
@ -82,6 +82,7 @@ set ::all_tests {
|
||||
unit/oom-score-adj
|
||||
unit/shutdown
|
||||
unit/networking
|
||||
unit/cluster
|
||||
unit/client-eviction
|
||||
}
|
||||
# Index to the next test to run in the ::all_tests list.
|
||||
|
151
tests/unit/cluster.tcl
Normal file
151
tests/unit/cluster.tcl
Normal file
@ -0,0 +1,151 @@
|
||||
# Primitive tests on cluster-enabled redis using redis-cli
|
||||
|
||||
source tests/support/cli.tcl
|
||||
|
||||
proc cluster_info {r field} {
|
||||
if {[regexp "^$field:(.*?)\r\n" [$r cluster info] _ value]} {
|
||||
set _ $value
|
||||
}
|
||||
}
|
||||
|
||||
# Provide easy access to CLUSTER INFO properties. Same semantic as "proc s".
|
||||
proc csi {args} {
|
||||
set level 0
|
||||
if {[string is integer [lindex $args 0]]} {
|
||||
set level [lindex $args 0]
|
||||
set args [lrange $args 1 end]
|
||||
}
|
||||
cluster_info [srv $level "client"] [lindex $args 0]
|
||||
}
|
||||
|
||||
# make sure the test infra won't use SELECT
|
||||
set ::singledb 1
|
||||
|
||||
# start three servers
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster"}} {
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster"}} {
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster"}} {
|
||||
|
||||
set node1 [srv 0 client]
|
||||
set node2 [srv -1 client]
|
||||
set node3 [srv -2 client]
|
||||
set node3_pid [srv -2 pid]
|
||||
|
||||
test {Create 3 node cluster} {
|
||||
exec src/redis-cli --cluster-yes --cluster create \
|
||||
127.0.0.1:[srv 0 port] \
|
||||
127.0.0.1:[srv -1 port] \
|
||||
127.0.0.1:[srv -2 port]
|
||||
|
||||
wait_for_condition 1000 50 {
|
||||
[csi 0 cluster_state] eq {ok} &&
|
||||
[csi -1 cluster_state] eq {ok} &&
|
||||
[csi -2 cluster_state] eq {ok}
|
||||
} else {
|
||||
fail "Cluster doesn't stabilize"
|
||||
}
|
||||
}
|
||||
|
||||
test "Run blocking command on cluster node3" {
|
||||
# key9184688 is mapped to slot 10923 (first slot of node 3)
|
||||
set node3_rd [redis_deferring_client -2]
|
||||
$node3_rd brpop key9184688 0
|
||||
$node3_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s -2 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
test "Perform a Resharding" {
|
||||
exec src/redis-cli --cluster-yes --cluster reshard 127.0.0.1:[srv -2 port] \
|
||||
--cluster-to [$node1 cluster myid] \
|
||||
--cluster-from [$node3 cluster myid] \
|
||||
--cluster-slots 1
|
||||
}
|
||||
|
||||
test "Verify command got unblocked after resharding" {
|
||||
# this (read) will wait for the node3 to realize the new topology
|
||||
assert_error {*MOVED*} {$node3_rd read}
|
||||
|
||||
# verify there are no blocked clients
|
||||
assert_equal [s 0 blocked_clients] {0}
|
||||
assert_equal [s -1 blocked_clients] {0}
|
||||
assert_equal [s -2 blocked_clients] {0}
|
||||
}
|
||||
|
||||
test "Wait for cluster to be stable" {
|
||||
wait_for_condition 1000 50 {
|
||||
[catch {exec src/redis-cli --cluster \
|
||||
check 127.0.0.1:[srv 0 port] \
|
||||
}] == 0
|
||||
} else {
|
||||
fail "Cluster doesn't stabilize"
|
||||
}
|
||||
}
|
||||
|
||||
test "Sanity test push cmd after resharding" {
|
||||
assert_error {*MOVED*} {$node3 lpush key9184688 v1}
|
||||
|
||||
set node1_rd [redis_deferring_client 0]
|
||||
$node1_rd brpop key9184688 0
|
||||
$node1_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
puts "Client not blocked"
|
||||
puts "read from blocked client: [$node1_rd read]"
|
||||
fail "Client not blocked"
|
||||
}
|
||||
|
||||
$node1 lpush key9184688 v2
|
||||
assert_equal {key9184688 v2} [$node1_rd read]
|
||||
}
|
||||
|
||||
$node1_rd close
|
||||
$node3_rd close
|
||||
|
||||
test "Run blocking command again on cluster node1" {
|
||||
$node1 del key9184688
|
||||
# key9184688 is mapped to slot 10923 which has been moved to node1
|
||||
set node1_rd [redis_deferring_client 0]
|
||||
$node1_rd brpop key9184688 0
|
||||
$node1_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
test "Kill a cluster node and wait for fail state" {
|
||||
# kill node3 in cluster
|
||||
exec kill -SIGSTOP $node3_pid
|
||||
|
||||
wait_for_condition 1000 50 {
|
||||
[csi 0 cluster_state] eq {fail} &&
|
||||
[csi -1 cluster_state] eq {fail}
|
||||
} else {
|
||||
fail "Cluster doesn't fail"
|
||||
}
|
||||
}
|
||||
|
||||
test "Verify command got unblocked after cluster failure" {
|
||||
assert_error {*CLUSTERDOWN*} {$node1_rd read}
|
||||
|
||||
# verify there are no blocked clients
|
||||
assert_equal [s 0 blocked_clients] {0}
|
||||
assert_equal [s -1 blocked_clients] {0}
|
||||
}
|
||||
|
||||
exec kill -SIGCONT $node3_pid
|
||||
$node1_rd close
|
||||
|
||||
# stop three servers
|
||||
}
|
||||
}
|
||||
}
|
202
tests/unit/moduleapi/cluster.tcl
Normal file
202
tests/unit/moduleapi/cluster.tcl
Normal file
@ -0,0 +1,202 @@
|
||||
# Primitive tests on cluster-enabled redis with modules using redis-cli
|
||||
|
||||
source tests/support/cli.tcl
|
||||
|
||||
proc cluster_info {r field} {
|
||||
if {[regexp "^$field:(.*?)\r\n" [$r cluster info] _ value]} {
|
||||
set _ $value
|
||||
}
|
||||
}
|
||||
|
||||
# Provide easy access to CLUSTER INFO properties. Same semantic as "proc s".
|
||||
proc csi {args} {
|
||||
set level 0
|
||||
if {[string is integer [lindex $args 0]]} {
|
||||
set level [lindex $args 0]
|
||||
set args [lrange $args 1 end]
|
||||
}
|
||||
cluster_info [srv $level "client"] [lindex $args 0]
|
||||
}
|
||||
|
||||
set testmodule [file normalize tests/modules/blockonkeys.so]
|
||||
set testmodule_nokey [file normalize tests/modules/blockonbackground.so]
|
||||
|
||||
# make sure the test infra won't use SELECT
|
||||
set ::singledb 1
|
||||
|
||||
# start three servers
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster modules"}} {
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster modules"}} {
|
||||
start_server {overrides {cluster-enabled yes cluster-node-timeout 1} tags {"external:skip cluster modules"}} {
|
||||
|
||||
set node1 [srv 0 client]
|
||||
set node2 [srv -1 client]
|
||||
set node3 [srv -2 client]
|
||||
set node3_pid [srv -2 pid]
|
||||
|
||||
$node1 module load $testmodule
|
||||
$node2 module load $testmodule
|
||||
$node3 module load $testmodule
|
||||
|
||||
$node1 module load $testmodule_nokey
|
||||
$node2 module load $testmodule_nokey
|
||||
$node3 module load $testmodule_nokey
|
||||
|
||||
test {Create 3 node cluster} {
|
||||
exec src/redis-cli --cluster-yes --cluster create \
|
||||
127.0.0.1:[srv 0 port] \
|
||||
127.0.0.1:[srv -1 port] \
|
||||
127.0.0.1:[srv -2 port]
|
||||
|
||||
wait_for_condition 1000 50 {
|
||||
[csi 0 cluster_state] eq {ok} &&
|
||||
[csi -1 cluster_state] eq {ok} &&
|
||||
[csi -2 cluster_state] eq {ok}
|
||||
} else {
|
||||
fail "Cluster doesn't stabilize"
|
||||
}
|
||||
}
|
||||
|
||||
test "Run blocking command (blocked on key) on cluster node3" {
|
||||
# key9184688 is mapped to slot 10923 (first slot of node 3)
|
||||
set node3_rd [redis_deferring_client -2]
|
||||
$node3_rd fsl.bpop key9184688 0
|
||||
$node3_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s -2 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client executing blocking command (blocked on key) not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
test "Run blocking command (no keys) on cluster node2" {
|
||||
set node2_rd [redis_deferring_client -1]
|
||||
$node2_rd block.block 0
|
||||
$node2_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s -1 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client executing blocking command (no keys) not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test "Perform a Resharding" {
|
||||
exec src/redis-cli --cluster-yes --cluster reshard 127.0.0.1:[srv -2 port] \
|
||||
--cluster-to [$node1 cluster myid] \
|
||||
--cluster-from [$node3 cluster myid] \
|
||||
--cluster-slots 1
|
||||
}
|
||||
|
||||
test "Verify command (no keys) is unaffected after resharding" {
|
||||
# verify there are blocked clients on node2
|
||||
assert_equal [s -1 blocked_clients] {1}
|
||||
|
||||
#release client
|
||||
$node2 block.release 0
|
||||
}
|
||||
|
||||
test "Verify command (blocked on key) got unblocked after resharding" {
|
||||
# this (read) will wait for the node3 to realize the new topology
|
||||
assert_error {*MOVED*} {$node3_rd read}
|
||||
|
||||
# verify there are no blocked clients
|
||||
assert_equal [s 0 blocked_clients] {0}
|
||||
assert_equal [s -1 blocked_clients] {0}
|
||||
assert_equal [s -2 blocked_clients] {0}
|
||||
}
|
||||
|
||||
test "Wait for cluster to be stable" {
|
||||
wait_for_condition 1000 50 {
|
||||
[catch {exec src/redis-cli --cluster \
|
||||
check 127.0.0.1:[srv 0 port] \
|
||||
}] == 0
|
||||
} else {
|
||||
fail "Cluster doesn't stabilize"
|
||||
}
|
||||
}
|
||||
|
||||
test "Sanity test push cmd after resharding" {
|
||||
assert_error {*MOVED*} {$node3 fsl.push key9184688 1}
|
||||
|
||||
set node1_rd [redis_deferring_client 0]
|
||||
$node1_rd fsl.bpop key9184688 0
|
||||
$node1_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
puts "Client not blocked"
|
||||
puts "read from blocked client: [$node1_rd read]"
|
||||
fail "Client not blocked"
|
||||
}
|
||||
|
||||
$node1 fsl.push key9184688 2
|
||||
assert_equal {2} [$node1_rd read]
|
||||
}
|
||||
|
||||
$node1_rd close
|
||||
$node2_rd close
|
||||
$node3_rd close
|
||||
|
||||
test "Run blocking command (blocked on key) again on cluster node1" {
|
||||
$node1 del key9184688
|
||||
# key9184688 is mapped to slot 10923 which has been moved to node1
|
||||
set node1_rd [redis_deferring_client 0]
|
||||
$node1_rd fsl.bpop key9184688 0
|
||||
$node1_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client executing blocking command (blocked on key) again not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
test "Run blocking command (no keys) again on cluster node2" {
|
||||
set node2_rd [redis_deferring_client -1]
|
||||
|
||||
$node2_rd block.block 0
|
||||
$node2_rd flush
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s -1 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Client executing blocking command (no keys) again not blocked"
|
||||
}
|
||||
}
|
||||
|
||||
test "Kill a cluster node and wait for fail state" {
|
||||
# kill node3 in cluster
|
||||
exec kill -SIGSTOP $node3_pid
|
||||
|
||||
wait_for_condition 1000 50 {
|
||||
[csi 0 cluster_state] eq {fail} &&
|
||||
[csi -1 cluster_state] eq {fail}
|
||||
} else {
|
||||
fail "Cluster doesn't fail"
|
||||
}
|
||||
}
|
||||
|
||||
test "Verify command (blocked on key) got unblocked after cluster failure" {
|
||||
assert_error {*CLUSTERDOWN*} {$node1_rd read}
|
||||
}
|
||||
|
||||
test "Verify command (no keys) got unblocked after cluster failure" {
|
||||
assert_error {*CLUSTERDOWN*} {$node2_rd read}
|
||||
|
||||
# verify there are no blocked clients
|
||||
assert_equal [s 0 blocked_clients] {0}
|
||||
assert_equal [s -1 blocked_clients] {0}
|
||||
}
|
||||
|
||||
exec kill -SIGCONT $node3_pid
|
||||
$node1_rd close
|
||||
$node2_rd close
|
||||
|
||||
# stop three servers
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user