not yet working BLPOP implementation

This commit is contained in:
antirez 2009-12-29 14:59:40 -05:00
parent 436b423e18
commit 4409877e19
4 changed files with 235 additions and 4 deletions

View File

@ -71,6 +71,8 @@ static struct redisCommand cmdTable[] = {
{"lpush",3,REDIS_CMD_BULK},
{"rpop",2,REDIS_CMD_INLINE},
{"lpop",2,REDIS_CMD_INLINE},
{"brpop",3,REDIS_CMD_INLINE},
{"blpop",3,REDIS_CMD_INLINE},
{"llen",2,REDIS_CMD_INLINE},
{"lindex",3,REDIS_CMD_INLINE},
{"lset",4,REDIS_CMD_BULK},

211
redis.c
View File

@ -158,6 +158,7 @@
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
@ -226,8 +227,9 @@ typedef struct redisObject {
} while(0);
typedef struct redisDb {
dict *dict;
dict *expires;
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
int id;
} redisDb;
@ -266,6 +268,10 @@ typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
robj *blockingkey; /* The key we waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
} redisClient;
struct saveparam {
@ -278,7 +284,7 @@ struct redisServer {
int port;
int fd;
redisDb *db;
dict *sharingpool;
dict *sharingpool; /* Poll used for object sharing */
unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
@ -437,6 +443,8 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
@ -513,6 +521,8 @@ static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
/*================================= Globals ================================= */
@ -531,6 +541,8 @@ static struct redisCommand cmdTable[] = {
{"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"rpop",rpopCommand,2,REDIS_CMD_INLINE},
{"lpop",lpopCommand,2,REDIS_CMD_INLINE},
{"brpop",brpopCommand,3,REDIS_CMD_INLINE},
{"blpop",blpopCommand,3,REDIS_CMD_INLINE},
{"llen",llenCommand,2,REDIS_CMD_INLINE},
{"lindex",lindexCommand,3,REDIS_CMD_INLINE},
{"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
@ -760,6 +772,12 @@ static void dictVanillaFree(void *privdata, void *val)
zfree(val);
}
static void dictListDestructor(void *privdata, void *val)
{
DICT_NOTUSED(privdata);
listRelease((list*)val);
}
static int sdsDictKeyCompare(void *privdata, const void *key1,
const void *key2)
{
@ -841,6 +859,17 @@ static dictType hashDictType = {
dictRedisObjectDestructor /* val destructor */
};
/* Keylist hash table type has unencoded redis objects as keys and
* lists as values. It's used for blocking operations (BLPOP) */
static dictType keylistDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
/* ========================= Random utility functions ======================= */
/* Redis generally does not try to recover from out of memory conditions
@ -1211,6 +1240,7 @@ static void initServer() {
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&hashDictType,NULL);
server.db[j].expires = dictCreate(&setDictType,NULL);
server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
server.cronloops = 0;
@ -1432,9 +1462,18 @@ static void freeClientArgv(redisClient *c) {
static void freeClient(redisClient *c) {
listNode *ln;
/* Note that if the client we are freeing is blocked into a blocking
* call, we have to set querybuf to NULL *before* to call unblockClient()
* to avoid processInputBuffer() will get called. Also it is important
* to remove the file events after this, because this call adds
* the READABLE event. */
sdsfree(c->querybuf);
c->querybuf = NULL;
if (c->flags & REDIS_BLOCKED)
unblockClient(c);
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
sdsfree(c->querybuf);
listRelease(c->reply);
freeClientArgv(c);
close(c->fd);
@ -1898,6 +1937,13 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
static void processInputBuffer(redisClient *c) {
again:
/* Before to process the input buffer, make sure the client is not
* waitig for a blocking operation such as BLPOP. Note that the first
* iteration the client is never blocked, otherwise the processInputBuffer
* would not be called at all, but after the execution of the first commands
* in the input buffer the client may be blocked, and the "goto again"
* will try to reiterate. The following line will make it return asap. */
if (c->flags & REDIS_BLOCKED) return;
if (c->bulklen == -1) {
/* Read the first line of the query */
char *p = strchr(c->querybuf,'\n');
@ -2034,6 +2080,7 @@ static redisClient *createClient(int fd) {
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
c->blockingkey = NULL;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
@ -3477,6 +3524,7 @@ static void pushGenericCommand(redisClient *c, int where) {
lobj = lookupKeyWrite(c->db,c->argv[1]);
if (lobj == NULL) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) return;
lobj = createListObject();
list = lobj->ptr;
if (where == REDIS_HEAD) {
@ -3492,6 +3540,7 @@ static void pushGenericCommand(redisClient *c, int where) {
addReply(c,shared.wrongtypeerr);
return;
}
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) return;
list = lobj->ptr;
if (where == REDIS_HEAD) {
listAddNodeHead(list,c->argv[2]);
@ -5393,6 +5442,160 @@ static void execCommand(redisClient *c) {
c->flags &= (~REDIS_MULTI);
}
/* =========================== Blocking Operations ========================= */
/* Currently Redis blocking operations support is limited to list POP ops,
* so the current implementation is not fully generic, but it is also not
* completely specific so it will not require a rewrite to support new
* kind of blocking operations in the future.
*
* Still it's important to note that list blocking operations can be already
* used as a notification mechanism in order to implement other blocking
* operations at application level, so there must be a very strong evidence
* of usefulness and generality before new blocking operations are implemented.
*
* This is how the current blocking POP works, we use BLPOP as example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
* if there is not to block.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
* in a dictionary (server.blockingkeys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we serve the first in the list: basically instead to push
* the new element inside the list we return it to the (first / oldest)
* blocking client, unblock the client, and remove it form the list.
*
* The above comment and the source code should be enough in order to understand
* the implementation and modify / fix it later.
*/
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
static void blockForKey(redisClient *c, robj *key, time_t timeout) {
dictEntry *de;
list *l;
c->blockingkey = key;
incrRefCount(key);
c->blockingto = timeout;
de = dictFind(c->db->blockingkeys,key);
if (de == NULL) {
int retval;
l = listCreate();
retval = dictAdd(c->db->blockingkeys,c,l);
assert(retval == DICT_OK);
} else {
l = dictGetEntryVal(de);
}
listAddNodeTail(l,c);
c->flags |= REDIS_BLOCKED;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
}
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
static void unblockClient(redisClient *c) {
dictEntry *de;
list *l;
/* Remove this client from the list of clients waiting for this key. */
assert(c->blockingkey != NULL);
de = dictFind(c->db->blockingkeys,c->blockingkey);
assert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
dictDelete(c->db->blockingkeys,c->blockingkey);
/* Finally set the right flags in the client structure */
decrRefCount(c->blockingkey);
c->blockingkey = NULL;
c->flags &= (~REDIS_BLOCKED);
/* Ok now we are ready to get read events from socket, note that we
* can't trap errors here as it's possible that unblockClients() is
* called from freeClient() itself, and the only thing we can do
* if we failed to register the READABLE event is to kill the client.
* Still the following function should never fail in the real world as
* we are sure the file descriptor is sane, and we exit on out of mem. */
aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
/* As a final step we want to process data if there is some command waiting
* in the input buffer. Note that this is safe even if unblockClient()
* gets called from freeClient() because freeClient() will be smart
* enough to call this function *after* c->querybuf was set to NULL. */
if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
}
/* This should be called from any function PUSHing into lists.
* 'c' is the "pushing client", 'key' is the key it is pushing data against,
* 'ele' is the element pushed.
*
* If the function returns 0 there was no client waiting for a list push
* against this key.
*
* If the function returns 1 there was a client waiting for a list push
* against this key, the element was passed to this client thus it's not
* needed to actually add it to the list and the caller should return asap. */
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
struct dictEntry *de;
redisClient *receiver;
list *l;
listNode *ln;
de = dictFind(c->db->blockingkeys,key);
if (de == NULL) return 0;
l = dictGetEntryVal(de);
ln = listFirst(l);
assert(ln != NULL);
receiver = ln->value;
listDelNode(l,ln);
if (listLength(l) == 0)
dictDelete(c->db->blockingkeys,key);
addReplyBulkLen(receiver,ele);
addReply(receiver,ele);
addReply(receiver,shared.crlf);
unblockClient(receiver);
return 1;
}
/* Blocking RPOP/LPOP */
static void blockingPopGenericCommand(redisClient *c, int where) {
robj *o;
time_t timeout;
o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL) {
if (o->type != REDIS_LIST) {
popGenericCommand(c,where);
return;
} else {
list *list = o->ptr;
if (listLength(list) != 0) {
/* If the list contains elements fall back to the usual
* non-blocking POP operation */
popGenericCommand(c,where);
return;
}
}
}
/* If the list is empty or the key does not exists we must block */
timeout = strtol(c->argv[2]->ptr,NULL,10);
if (timeout > 0) timeout += time(NULL);
blockForKey(c,c->argv[1],timeout);
}
static void blpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_HEAD);
}
static void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}
/* =============================== Replication ============================= */
static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {

View File

@ -10,6 +10,11 @@ static struct redisFunctionSym symsTable[] = {
{"authCommand",(unsigned long)authCommand},
{"bgrewriteaofCommand",(unsigned long)bgrewriteaofCommand},
{"bgsaveCommand",(unsigned long)bgsaveCommand},
{"blockForKey",(unsigned long)blockForKey},
{"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand},
{"blpopCommand",(unsigned long)blpopCommand},
{"brpopCommand",(unsigned long)brpopCommand},
{"call",(unsigned long)call},
{"closeTimedoutClients",(unsigned long)closeTimedoutClients},
{"compareStringObjects",(unsigned long)compareStringObjects},
{"createClient",(unsigned long)createClient},
@ -30,11 +35,13 @@ static struct redisFunctionSym symsTable[] = {
{"deleteIfVolatile",(unsigned long)deleteIfVolatile},
{"deleteKey",(unsigned long)deleteKey},
{"dictEncObjKeyCompare",(unsigned long)dictEncObjKeyCompare},
{"dictListDestructor",(unsigned long)dictListDestructor},
{"dictObjKeyCompare",(unsigned long)dictObjKeyCompare},
{"dictRedisObjectDestructor",(unsigned long)dictRedisObjectDestructor},
{"dictVanillaFree",(unsigned long)dictVanillaFree},
{"dupClientReplyValue",(unsigned long)dupClientReplyValue},
{"echoCommand",(unsigned long)echoCommand},
{"execCommand",(unsigned long)execCommand},
{"existsCommand",(unsigned long)existsCommand},
{"expireCommand",(unsigned long)expireCommand},
{"expireGenericCommand",(unsigned long)expireGenericCommand},
@ -46,6 +53,7 @@ static struct redisFunctionSym symsTable[] = {
{"flushdbCommand",(unsigned long)flushdbCommand},
{"freeClient",(unsigned long)freeClient},
{"freeClientArgv",(unsigned long)freeClientArgv},
{"freeClientMultiState",(unsigned long)freeClientMultiState},
{"freeFakeClient",(unsigned long)freeFakeClient},
{"freeHashObject",(unsigned long)freeHashObject},
{"freeListObject",(unsigned long)freeListObject},
@ -60,15 +68,18 @@ static struct redisFunctionSym symsTable[] = {
{"getCommand",(unsigned long)getCommand},
{"getDecodedObject",(unsigned long)getDecodedObject},
{"getExpire",(unsigned long)getExpire},
{"getGenericCommand",(unsigned long)getGenericCommand},
{"getMcontextEip",(unsigned long)getMcontextEip},
{"getsetCommand",(unsigned long)getsetCommand},
{"glueReplyBuffersIfNeeded",(unsigned long)glueReplyBuffersIfNeeded},
{"handleClientsWaitingListPush",(unsigned long)handleClientsWaitingListPush},
{"htNeedsResize",(unsigned long)htNeedsResize},
{"incrCommand",(unsigned long)incrCommand},
{"incrDecrCommand",(unsigned long)incrDecrCommand},
{"incrRefCount",(unsigned long)incrRefCount},
{"incrbyCommand",(unsigned long)incrbyCommand},
{"infoCommand",(unsigned long)infoCommand},
{"initClientMultiState",(unsigned long)initClientMultiState},
{"initServer",(unsigned long)initServer},
{"initServerConfig",(unsigned long)initServerConfig},
{"isStringRepresentableAsLong",(unsigned long)isStringRepresentableAsLong},
@ -93,6 +104,7 @@ static struct redisFunctionSym symsTable[] = {
{"msetCommand",(unsigned long)msetCommand},
{"msetGenericCommand",(unsigned long)msetGenericCommand},
{"msetnxCommand",(unsigned long)msetnxCommand},
{"multiCommand",(unsigned long)multiCommand},
{"oom",(unsigned long)oom},
{"pingCommand",(unsigned long)pingCommand},
{"popGenericCommand",(unsigned long)popGenericCommand},
@ -100,6 +112,7 @@ static struct redisFunctionSym symsTable[] = {
{"processInputBuffer",(unsigned long)processInputBuffer},
{"pushGenericCommand",(unsigned long)pushGenericCommand},
{"qsortCompareSetsByCardinality",(unsigned long)qsortCompareSetsByCardinality},
{"queueMultiCommand",(unsigned long)queueMultiCommand},
{"randomkeyCommand",(unsigned long)randomkeyCommand},
{"rdbLoad",(unsigned long)rdbLoad},
{"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
@ -178,6 +191,7 @@ static struct redisFunctionSym symsTable[] = {
{"tryResizeHashTables",(unsigned long)tryResizeHashTables},
{"ttlCommand",(unsigned long)ttlCommand},
{"typeCommand",(unsigned long)typeCommand},
{"unblockClient",(unsigned long)unblockClient},
{"updateSlavesWaitingBgsave",(unsigned long)updateSlavesWaitingBgsave},
{"yesnotoi",(unsigned long)yesnotoi},
{"zaddCommand",(unsigned long)zaddCommand},

View File

@ -1562,6 +1562,18 @@ proc main {server port} {
set _ 1
} {1}
test {MUTLI / EXEC basics} {
$r del mylist
$r rpush mylist a
$r rpush mylist b
$r rpush mylist c
$r multi
set v1 [$r lrange mylist 0 -1]
set v2 [$r ping]
set v3 [$r exec]
list $v1 $v2 $v3
} {QUEUED QUEUED {{a b c} PONG}}
# Leave the user with a clean DB before to exit
test {FLUSHDB} {
set aux {}