From 7c25a43adc67c4a8d08e930aa92f1c5575ec3646 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 10:31:02 -0300 Subject: [PATCH] Handle BRPOPLPUSH inside a transaction. --- src/t_list.c | 62 ++++++++++++++++++++++++---------------- tests/unit/type/list.tcl | 15 +++++++++- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 2dedf4811..d14de708f 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -790,23 +790,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { receiver = ln->value; if (receiver->bstate.target == NULL) { - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + /* BRPOP/BLPOP return a multi-bulk with the name + * of the popped list */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); } else { - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + /* BRPOPLPUSH */ + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - addReplyBulk(receiver,ele); + addReplyBulk(receiver,ele); - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->bstate.target,dobj); - } + if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db, receiver->bstate.target, dobj); + } - listTypePush(dobj,ele,REDIS_HEAD); + listTypePush(dobj, ele, REDIS_HEAD); + } } unblockClientWaitingData(receiver); @@ -880,13 +885,13 @@ int checkTimeout(redisClient *c, robj *object, time_t *timeout) { long long lltimeout; if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); - return REDIS_ERR; + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; } - + if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); - return REDIS_ERR; + addReplyError(c, "timeout is negative"); + return REDIS_ERR; } *timeout = lltimeout; @@ -911,20 +916,27 @@ void brpoplpushCommand(redisClient *c) { robj *key = lookupKeyWrite(c->db, c->argv[1]); - if (key == NULL) { - // block if (c->flags & REDIS_MULTI) { - addReply(c,shared.nullmultibulk); + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); } else { if (timeout > 0) timeout += time(NULL); + + /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } - } else if (key->type != REDIS_LIST) { - addReply(c, shared.wrongtypeerr); } else { - // The list exists and has elements. - redisAssert(listTypeLength(key) > 0); - rpoplpushCommand(c); + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index a2d0edf6d..62ea159dd 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -152,6 +152,19 @@ start_server { assert_equal {foo} [r lrange target 0 -1] } + test "BRPOPLPUSH with a client BLPOPing the target list" { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + r del blist target + $rd2 blpop target 0 + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_equal foo [$rd read] + assert_equal {target foo} [$rd2 read] + assert_equal 0 [r exists target] + } + test "BRPOPLPUSH with wrong source type" { set rd [redis_deferring_client] r del blist target @@ -178,7 +191,7 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } - test {BRPOPLPUSH inside a transaction} { + test "BRPOPLPUSH inside a transaction" { r del xlist target r lpush xlist foo r lpush xlist bar