mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 09:17:20 +00:00
Check other blocked clients when value could not be pushed
This commit is contained in:
parent
ac06fc011d
commit
8a88c368ed
44
src/t_list.c
44
src/t_list.c
@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) {
|
||||
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
||||
struct dictEntry *de;
|
||||
redisClient *receiver;
|
||||
list *l;
|
||||
int numclients;
|
||||
list *clients;
|
||||
listNode *ln;
|
||||
robj *dstkey, *dstobj;
|
||||
|
||||
de = dictFind(c->db->blocking_keys,key);
|
||||
if (de == NULL) return 0;
|
||||
l = dictGetEntryVal(de);
|
||||
ln = listFirst(l);
|
||||
clients = dictGetEntryVal(de);
|
||||
numclients = listLength(clients);
|
||||
|
||||
/* Try to handle the push as long as there are clients waiting for a push.
|
||||
* Note that "numclients" is used because the list of clients waiting for a
|
||||
* push on "key" is deleted by unblockClient() when empty.
|
||||
*
|
||||
* This loop will have more than 1 iteration when there is a BRPOPLPUSH
|
||||
* that cannot push the target list because it does not contain a list. If
|
||||
* this happens, it simply tries the next client waiting for a push. */
|
||||
while (numclients--) {
|
||||
ln = listFirst(clients);
|
||||
redisAssert(ln != NULL);
|
||||
receiver = ln->value;
|
||||
dstkey = receiver->bpop.target;
|
||||
|
||||
robj *target = receiver->bpop.target;
|
||||
|
||||
/* This should remove the first element of the "clients" list. */
|
||||
unblockClientWaitingData(receiver);
|
||||
redisAssert(ln != listFirst(clients));
|
||||
|
||||
if (target == NULL) {
|
||||
/* BRPOP/BLPOP return a multi-bulk with the name
|
||||
* of the popped list */
|
||||
if (dstkey == NULL) {
|
||||
/* BRPOP/BLPOP */
|
||||
addReplyMultiBulkLen(receiver,2);
|
||||
addReplyBulk(receiver,key);
|
||||
addReplyBulk(receiver,ele);
|
||||
return 1;
|
||||
} else {
|
||||
/* BRPOPLPUSH */
|
||||
robj *dobj = lookupKeyWrite(receiver->db,target);
|
||||
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
|
||||
rpoplpushHandlePush(receiver,target,dobj,ele);
|
||||
decrRefCount(target);
|
||||
dstobj = lookupKeyWrite(receiver->db,dstkey);
|
||||
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
|
||||
decrRefCount(dstkey);
|
||||
} else {
|
||||
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
|
||||
decrRefCount(dstkey);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
|
||||
|
@ -191,6 +191,20 @@ start_server {
|
||||
assert_equal {foo} [r lrange blist 0 -1]
|
||||
}
|
||||
|
||||
test "BRPOPLPUSH with multiple blocked clients" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r del blist target1 target2
|
||||
r set target1 nolist
|
||||
$rd1 brpoplpush blist target1 0
|
||||
$rd2 brpoplpush blist target2 0
|
||||
r lpush blist foo
|
||||
|
||||
assert_error "ERR*wrong kind*" {$rd1 read}
|
||||
assert_equal {foo} [$rd2 read]
|
||||
assert_equal {foo} [r lrange target2 0 -1]
|
||||
}
|
||||
|
||||
test "linked BRPOPLPUSH" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
|
Loading…
Reference in New Issue
Block a user