Fix BRPOPLPUSH behavior for all use cases.

This commit is contained in:
Michel Martens & Damian Janowski 2010-11-29 23:47:45 -03:00 committed by Michel Martens
parent 8987bf23bf
commit baa14ef913
2 changed files with 53 additions and 10 deletions

View File

@ -748,10 +748,6 @@ void unblockClientWaitingData(redisClient *c) {
decrRefCount(c->bpop.keys[j]); decrRefCount(c->bpop.keys[j]);
} }
if (c->bpop.target != NULL) {
decrRefCount(c->bpop.target);
}
/* Cleanup the client structure */ /* Cleanup the client structure */
zfree(c->bpop.keys); zfree(c->bpop.keys);
c->bpop.keys = NULL; c->bpop.keys = NULL;
@ -789,7 +785,11 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
redisAssert(ln != NULL); redisAssert(ln != NULL);
receiver = ln->value; receiver = ln->value;
if (receiver->bpop.target == NULL) { robj *target = receiver->bpop.target;
unblockClientWaitingData(receiver);
if (target == NULL) {
/* BRPOP/BLPOP return a multi-bulk with the name /* BRPOP/BLPOP return a multi-bulk with the name
* of the popped list */ * of the popped list */
addReplyMultiBulkLen(receiver,2); addReplyMultiBulkLen(receiver,2);
@ -797,23 +797,23 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyBulk(receiver,ele); addReplyBulk(receiver,ele);
} else { } else {
/* BRPOPLPUSH */ /* BRPOPLPUSH */
robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); robj *dobj = lookupKeyWrite(receiver->db,target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
addReplyBulk(receiver,ele); addReplyBulk(receiver,ele);
if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { if (!handleClientsWaitingListPush(receiver, target, ele)) {
/* Create the list if the key does not exist */ /* Create the list if the key does not exist */
if (!dobj) { if (!dobj) {
dobj = createZiplistObject(); dobj = createZiplistObject();
dbAdd(receiver->db, receiver->bpop.target, dobj); dbAdd(receiver->db, target, dobj);
} }
listTypePush(dobj, ele, REDIS_HEAD); listTypePush(dobj, ele, REDIS_HEAD);
} }
decrRefCount(target);
} }
unblockClientWaitingData(receiver);
return 1; return 1;
} }

View File

@ -191,6 +191,49 @@ start_server {
assert_equal {foo} [r lrange blist 0 -1] assert_equal {foo} [r lrange blist 0 -1]
} }
test "linked BRPOPLPUSH" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
r del list1 list2 list3
$rd1 brpoplpush list1 list2 0
$rd2 brpoplpush list2 list3 0
r rpush list1 foo
assert_equal {} [r lrange list1 0 -1]
assert_equal {} [r lrange list2 0 -1]
assert_equal {foo} [r lrange list3 0 -1]
}
test "circular BRPOPLPUSH" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
r del list1 list2
$rd1 brpoplpush list1 list2 0
$rd2 brpoplpush list2 list1 0
r rpush list1 foo
assert_equal {foo} [r lrange list1 0 -1]
assert_equal {} [r lrange list2 0 -1]
}
test "self-referential BRPOPLPUSH" {
set rd [redis_deferring_client]
r del blist
$rd brpoplpush blist blist 0
r rpush blist foo
assert_equal {foo} [r lrange blist 0 -1]
}
test "BRPOPLPUSH inside a transaction" { test "BRPOPLPUSH inside a transaction" {
r del xlist target r del xlist target
r lpush xlist foo r lpush xlist foo