Modules: block on keys: implement the internals.

This commit is contained in:
antirez 2019-10-30 10:57:44 +01:00
parent fb6110ac20
commit 215b72c0ba
3 changed files with 78 additions and 8 deletions

View File

@ -430,6 +430,32 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* in order to check if we can serve clients blocked by modules using
* RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
* our goal here is to call the is_key_ready() callback to see if the key
* is really able to serve the client, and in that case, unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (!moduleIsKeyReady(receiver, rl->key)) continue;
moduleUnblockClient(receiver);
}
}
}
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@ -480,6 +506,10 @@ void handleClientsBlockedOnKeys(void) {
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
}
/* Free this item. */

View File

@ -4060,6 +4060,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
return bc;
}
/* This function is called from module.c in order to check if a module
* blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
* can really be unblocked since the key is ready. */
int moduleIsKeyReady(client *c, robj *key) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_privdata = bc->privdata; /* In case the callback uses the
API to get the pointer to the
privdata, even if we provide it
as argument. */
selectDb(ctx.client, bc->dbid);
int ready = bc->is_key_ready(&ctx, key, bc->privdata);
moduleFreeContext(&ctx);
return ready;
}
/* Block a client in the context of a blocking command, returning an handle
* which will be used, later, in order to unblock the client with a call to
* RedisModule_UnblockClient(). The arguments specify callback functions
@ -4137,6 +4155,25 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
signalKeyAsReady(ctx->client->db, key);
}
/* Implements RM_UnblockClient() and moduleUnblockClient(). */
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
if (!bc->blocked_on_keys) bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK;
}
/* This API is used by the Redis core to unblock a client that was blocked
* by a module. */
void moduleUnblockClient(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
moduleUnblockClientByHandle(bc,NULL);
}
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@ -4147,15 +4184,16 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
* Note: this function can be called from threads spawned by the module. */
* Note: this function can be called from threads spawned by the module.
*
* Note: when we unblock a client that is blocked for keys using
* the API RedisModule_BlockClientOnKeys(), the privdata argument here is
* not used, and the reply callback is called with the privdata pointer that
* was passed when blocking the client. Also note if you unblock clients
* blocked on keys in this way, the reply callback should be ready to handle
* the fact the key may not be ready at all. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
if (!bc->blocked_on_keys) bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}

View File

@ -1602,6 +1602,8 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
int moduleIsKeyReady(client *c, robj *key);
void moduleUnblockClient(client *c);
/* Utils */
long long ustime(void);