handleClientsBlockedOnKeys() refactoring.

This commit is contained in:
antirez 2019-09-06 12:24:26 +02:00
parent 89ad0ca566
commit a092f20d87

View File

@ -229,6 +229,207 @@ void disconnectAllBlockedClients(void) {
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
continue;
}
robj *dstkey = receiver->bpop.target;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
LIST_HEAD : LIST_TAIL;
robj *value = listTypePop(o,where);
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == C_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
}
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a sorted set key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
unsigned long zcard = zsetLength(o);
while(numclients-- && zcard) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
continue;
}
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
zcard--;
/* Replicate the command. */
robj *argv[2];
struct redisCommand *cmd = where == ZSET_MIN ?
server.zpopminCommand :
server.zpopmaxCommand;
argv[0] = createStringObject(cmd->name,strlen(cmd->name));
argv[1] = rl->key;
incrRefCount(rl->key);
propagate(cmd,receiver->db->id,
argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a stream key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
/* We need to provide the new data arrived on the stream
* to all the clients that are waiting for an offset smaller
* than the current top item. */
if (de) {
list *clients = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM) continue;
streamID *gt = dictFetchValue(receiver->bpop.keys,
rl->key);
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
* last ID the client is blocked for: this is needed
* because serving other clients in the same consumer
* group will alter the "last ID" of the consumer
* group, and clients blocked in a consumer group are
* always blocked for the ">" ID: we need to deliver
* only new messages and avoid unblocking the client
* otherwise. */
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* If the group was not found, send an error
* to the consumer. */
if (!group) {
addReplyError(receiver,
"-NOGROUP the consumer group this client "
"was blocked on no longer exists");
unblockClient(receiver);
continue;
} else {
*gt = group->last_id;
}
}
if (streamCompareID(&s->last_id, gt) > 0) {
streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */
/* Lookup the consumer for the group, if any. */
streamConsumer *consumer = NULL;
int noack = 0;
if (group) {
consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr,
1);
noack = receiver->bpop.xread_group_noack;
}
/* Emit the two elements sub-array consisting of
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
* array, since we have just one key. */
if (receiver->resp == 2) {
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
} else {
addReplyMapLen(receiver,1);
}
addReplyBulk(receiver,rl->key);
streamPropInfo pi = {
rl->key,
receiver->bpop.xread_group
};
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
}
}
}
}
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@ -271,202 +472,14 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == OBJ_LIST) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
continue;
}
robj *dstkey = receiver->bpop.target;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
LIST_HEAD : LIST_TAIL;
robj *value = listTypePop(o,where);
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == C_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
}
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* Serve clients blocked on sorted set key. */
else if (o != NULL && o->type == OBJ_ZSET) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
unsigned long zcard = zsetLength(o);
while(numclients-- && zcard) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
continue;
}
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
zcard--;
/* Replicate the command. */
robj *argv[2];
struct redisCommand *cmd = where == ZSET_MIN ?
server.zpopminCommand :
server.zpopmaxCommand;
argv[0] = createStringObject(cmd->name,strlen(cmd->name));
argv[1] = rl->key;
incrRefCount(rl->key);
propagate(cmd,receiver->db->id,
argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
}
}
/* Serve clients blocked on stream key. */
else if (o != NULL && o->type == OBJ_STREAM) {
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
/* We need to provide the new data arrived on the stream
* to all the clients that are waiting for an offset smaller
* than the current top item. */
if (de) {
list *clients = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM) continue;
streamID *gt = dictFetchValue(receiver->bpop.keys,
rl->key);
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
* last ID the client is blocked for: this is needed
* because serving other clients in the same consumer
* group will alter the "last ID" of the consumer
* group, and clients blocked in a consumer group are
* always blocked for the ">" ID: we need to deliver
* only new messages and avoid unblocking the client
* otherwise. */
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* If the group was not found, send an error
* to the consumer. */
if (!group) {
addReplyError(receiver,
"-NOGROUP the consumer group this client "
"was blocked on no longer exists");
unblockClient(receiver);
continue;
} else {
*gt = group->last_id;
}
}
if (streamCompareID(&s->last_id, gt) > 0) {
streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */
/* Lookup the consumer for the group, if any. */
streamConsumer *consumer = NULL;
int noack = 0;
if (group) {
consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr,
1);
noack = receiver->bpop.xread_group_noack;
}
/* Emit the two elements sub-array consisting of
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
* array, since we have just one key. */
if (receiver->resp == 2) {
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
} else {
addReplyMapLen(receiver,1);
}
addReplyBulk(receiver,rl->key);
streamPropInfo pi = {
rl->key,
receiver->bpop.xread_group
};
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
}
}
}
if (o != NULL) {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
}
/* Free this item. */