Handle BRPOPLPUSH inside a transaction.

This commit is contained in:
Damian Janowski & Michel Martens 2010-11-09 10:31:02 -03:00 committed by Michel Martens
parent ba3b474111
commit 7c25a43adc
2 changed files with 51 additions and 26 deletions

View File

@ -790,23 +790,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
receiver = ln->value; receiver = ln->value;
if (receiver->bstate.target == NULL) { if (receiver->bstate.target == NULL) {
addReplyMultiBulkLen(receiver,2); /* BRPOP/BLPOP return a multi-bulk with the name
addReplyBulk(receiver,key); * of the popped list */
addReplyBulk(receiver,ele); addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
} }
else { else {
robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); /* BRPOPLPUSH */
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
addReplyBulk(receiver,ele); addReplyBulk(receiver,ele);
/* Create the list if the key does not exist */ if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) {
if (!dobj) { /* Create the list if the key does not exist */
dobj = createZiplistObject(); if (!dobj) {
dbAdd(receiver->db,receiver->bstate.target,dobj); dobj = createZiplistObject();
} dbAdd(receiver->db, receiver->bstate.target, dobj);
}
listTypePush(dobj,ele,REDIS_HEAD); listTypePush(dobj, ele, REDIS_HEAD);
}
} }
unblockClientWaitingData(receiver); unblockClientWaitingData(receiver);
@ -880,13 +885,13 @@ int checkTimeout(redisClient *c, robj *object, time_t *timeout) {
long long lltimeout; long long lltimeout;
if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) {
addReplyError(c, "timeout is not an integer"); addReplyError(c, "timeout is not an integer");
return REDIS_ERR; return REDIS_ERR;
} }
if (lltimeout < 0) { if (lltimeout < 0) {
addReplyError(c, "timeout is negative"); addReplyError(c, "timeout is negative");
return REDIS_ERR; return REDIS_ERR;
} }
*timeout = lltimeout; *timeout = lltimeout;
@ -911,20 +916,27 @@ void brpoplpushCommand(redisClient *c) {
robj *key = lookupKeyWrite(c->db, c->argv[1]); robj *key = lookupKeyWrite(c->db, c->argv[1]);
if (key == NULL) { if (key == NULL) {
// block
if (c->flags & REDIS_MULTI) { if (c->flags & REDIS_MULTI) {
addReply(c,shared.nullmultibulk);
/* Blocking against an empty list in a multi state
* returns immediately. */
addReply(c, shared.nullmultibulk);
} else { } else {
if (timeout > 0) timeout += time(NULL); if (timeout > 0) timeout += time(NULL);
/* The list is empty and the client blocks. */
blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
} }
} else if (key->type != REDIS_LIST) {
addReply(c, shared.wrongtypeerr);
} else { } else {
// The list exists and has elements. if (key->type != REDIS_LIST) {
redisAssert(listTypeLength(key) > 0); addReply(c, shared.wrongtypeerr);
rpoplpushCommand(c); } else {
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
redisAssert(listTypeLength(key) > 0);
rpoplpushCommand(c);
}
} }
} }

View File

@ -152,6 +152,19 @@ start_server {
assert_equal {foo} [r lrange target 0 -1] assert_equal {foo} [r lrange target 0 -1]
} }
test "BRPOPLPUSH with a client BLPOPing the target list" {
set rd [redis_deferring_client]
set rd2 [redis_deferring_client]
r del blist target
$rd2 blpop target 0
$rd brpoplpush blist target 0
after 1000
r rpush blist foo
assert_equal foo [$rd read]
assert_equal {target foo} [$rd2 read]
assert_equal 0 [r exists target]
}
test "BRPOPLPUSH with wrong source type" { test "BRPOPLPUSH with wrong source type" {
set rd [redis_deferring_client] set rd [redis_deferring_client]
r del blist target r del blist target
@ -178,7 +191,7 @@ start_server {
assert_equal {foo} [r lrange blist 0 -1] assert_equal {foo} [r lrange blist 0 -1]
} }
test {BRPOPLPUSH inside a transaction} { test "BRPOPLPUSH inside a transaction" {
r del xlist target r del xlist target
r lpush xlist foo r lpush xlist foo
r lpush xlist bar r lpush xlist bar