diff --git a/src/blocked.c b/src/blocked.c index 867f03de6..2b91c1b44 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -430,6 +430,32 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { } } +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * in order to check if we can serve clients blocked by modules using + * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: + * our goal here is to call the is_key_ready() callback to see if the key + * is really able to serve the client, and in that case, unblock it. */ +void serveClientsBlockedOnKeyByModule(readyList *rl) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (!moduleIsKeyReady(receiver, rl->key)) continue; + + moduleUnblockClient(receiver); + } + } +} + /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. It handles serving clients blocked in @@ -480,6 +506,10 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnSortedSetKey(o,rl); else if (o->type == OBJ_STREAM) serveClientsBlockedOnStreamKey(o,rl); + /* We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. */ + serveClientsBlockedOnKeyByModule(rl); } /* Free this item. */ diff --git a/src/module.c b/src/module.c index 113375e75..85bc9e808 100644 --- a/src/module.c +++ b/src/module.c @@ -4060,6 +4060,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF return bc; } +/* This function is called from module.c in order to check if a module + * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) + * can really be unblocked since the key is ready. */ +int moduleIsKeyReady(client *c, robj *key) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = bc->module; + ctx.client = bc->client; + ctx.blocked_privdata = bc->privdata; /* In case the callback uses the + API to get the pointer to the + privdata, even if we provide it + as argument. */ + selectDb(ctx.client, bc->dbid); + int ready = bc->is_key_ready(&ctx, key, bc->privdata); + moduleFreeContext(&ctx); + return ready; +} + /* Block a client in the context of a blocking command, returning an handle * which will be used, later, in order to unblock the client with a call to * RedisModule_UnblockClient(). The arguments specify callback functions @@ -4137,6 +4155,25 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { signalKeyAsReady(ctx->client->db, key); } +/* Implements RM_UnblockClient() and moduleUnblockClient(). */ +int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { + pthread_mutex_lock(&moduleUnblockedClientsMutex); + if (!bc->blocked_on_keys) bc->privdata = privdata; + listAddNodeTail(moduleUnblockedClients,bc); + if (write(server.module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + pthread_mutex_unlock(&moduleUnblockedClientsMutex); + return REDISMODULE_OK; +} + +/* This API is used by the Redis core to unblock a client that was blocked + * by a module. */ +void moduleUnblockClient(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + moduleUnblockClientByHandle(bc,NULL); +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -4147,15 +4184,16 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { * needs to be passed to the client, included but not limited some slow * to compute reply or some reply obtained via networking. * - * Note: this function can be called from threads spawned by the module. */ + * Note: this function can be called from threads spawned by the module. + * + * Note: when we unblock a client that is blocked for keys using + * the API RedisModule_BlockClientOnKeys(), the privdata argument here is + * not used, and the reply callback is called with the privdata pointer that + * was passed when blocking the client. Also note if you unblock clients + * blocked on keys in this way, the reply callback should be ready to handle + * the fact the key may not be ready at all. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { - pthread_mutex_lock(&moduleUnblockedClientsMutex); - if (!bc->blocked_on_keys) bc->privdata = privdata; - listAddNodeTail(moduleUnblockedClients,bc); - if (write(server.module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ - } - pthread_mutex_unlock(&moduleUnblockedClientsMutex); + moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; } diff --git a/src/server.h b/src/server.h index 97672d727..3a2bb1c7b 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,8 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +int moduleIsKeyReady(client *c, robj *key); +void moduleUnblockClient(client *c); /* Utils */ long long ustime(void);