From 3649568ff22dbb6ea3e26c722d115ba5635547ae Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 30 Oct 2019 10:11:58 +0100 Subject: [PATCH 01/12] Modules: block on keys functions layout and mechanism. --- src/module.c | 153 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 120 insertions(+), 33 deletions(-) diff --git a/src/module.c b/src/module.c index 971bf5c08..bdafc1590 100644 --- a/src/module.c +++ b/src/module.c @@ -245,6 +245,15 @@ typedef struct RedisModuleBlockedClient { client *reply_client; /* Fake client used to accumulate replies in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ + int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ + int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, + void *privdata); /* When blocking on keys, even if the + key is signaled as ready, maybe it + was modified afterward before the + client unblocks. So we always + need a callback that tells us if + the key is ready in order to serve + the next blocked client. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -3989,6 +3998,68 @@ void unblockClientFromModule(client *c) { resetClient(c); } +/* Block a client in the context of a module: this function implements both + * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the + * keys are passed or not. + * + * When not blocking for keys, the keys, numkeys, is_key_ready callback + * and privdata parameters are not needed. The privdata in that case must + * be NULL, since later is RM_UnblockClient() that will provide some private + * data that the reply callback will receive. + * + * Instead when blocking for keys, normally RM_UnblockClient() will not be + * called (because the client will unblock when the key is modified), so + * 'privdata' should be provided in that case, so that once the client is + * unlocked and the reply callback is called, it will receive its associated + * private data. + * + * Even when blocking on keys, RM_UnblockClient() can be called however, but + * in that case the privdata argument is disregarded, because we pass the + * reply callback the privdata that is set here while blocking. + */ +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) { + client *c = ctx->client; + int islua = c->flags & CLIENT_LUA; + int ismulti = c->flags & CLIENT_MULTI; + + c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + ctx->module->blocked_clients++; + + /* We need to handle the invalid operation of calling modules blocking + * commands from Lua or MULTI. We actually create an already aborted + * (client set to NULL) blocked client handle, and actually reply with + * an error. */ + mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0; + bc->client = (islua || ismulti) ? NULL : c; + bc->module = ctx->module; + bc->reply_callback = reply_callback; + bc->timeout_callback = timeout_callback; + bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ + bc->free_privdata = free_privdata; + bc->privdata = privdata; + bc->reply_client = createClient(NULL); + bc->reply_client->flags |= CLIENT_MODULE; + bc->dbid = c->db->id; + bc->is_key_ready = is_key_ready; + bc->blocked_on_keys = keys != NULL; + c->bpop.timeout = timeout; + + if (islua || ismulti) { + c->bpop.module_blocked_handle = NULL; + addReplyError(c, islua ? + "Blocking module command called from Lua script" : + "Blocking module command called from transaction"); + } else { + if (keys) { + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL); + } else { + blockClient(c,BLOCKED_MODULE); + } + } + return bc; +} + /* Block a client in the context of a blocking command, returning an handle * which will be used, later, in order to unblock the client with a call to * RedisModule_UnblockClient(). The arguments specify callback functions @@ -4006,39 +4077,55 @@ void unblockClientFromModule(client *c) { * by RedisModule_UnblockClient() call. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { - client *c = ctx->client; - int islua = c->flags & CLIENT_LUA; - int ismulti = c->flags & CLIENT_MULTI; + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL); +} - c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; - ctx->module->blocked_clients++; - - /* We need to handle the invalid operation of calling modules blocking - * commands from Lua or MULTI. We actually create an already aborted - * (client set to NULL) blocked client handle, and actually reply with - * an error. */ - bc->client = (islua || ismulti) ? NULL : c; - bc->module = ctx->module; - bc->reply_callback = reply_callback; - bc->timeout_callback = timeout_callback; - bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ - bc->free_privdata = free_privdata; - bc->privdata = NULL; - bc->reply_client = createClient(NULL); - bc->reply_client->flags |= CLIENT_MODULE; - bc->dbid = c->db->id; - c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; - - if (islua || ismulti) { - c->bpop.module_blocked_handle = NULL; - addReplyError(c, islua ? - "Blocking module command called from Lua script" : - "Blocking module command called from transaction"); - } else { - blockClient(c,BLOCKED_MODULE); - } - return bc; +/* This call is similar to RedisModule_BlockClient(), however in this case we + * don't just block the client, but also ask Redis to unblock it automatically + * once certain keys become "ready", that is, contain more data. + * + * Basically this is similar to what a typical Redis command usually does, + * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP, + * and later when the key receives new data (a list push for instance), the + * client is unblocked and served. + * + * However in the case of this module API, when the client is unblocked? + * + * 1. If you block ok a key of a type that has blocking operations associated, + * like a list, a sorted set, a stream, and so forth, the client may be + * unblocked once the relevant key is targeted by an operation that normally + * unblocks the native blocking operations for that type. So if we block + * on a list key, an RPUSH command may unblock our client and so forth. + * 2. If you are implementing your native data type, or if you want to add new + * unblocking conditions in addition to "1", you can call the modules API + * RedisModule_SignalKeyAsReady(). + * + * Anyway we can't be sure if the client should be unblocked just because the + * key is signaled as ready: for instance a successive operation may change the + * key, or a client in queue before this one can be served, modifying the key + * as well and making it empty again. So when blocking for keys, we need to + * register a callback called is_key_ready. This callback gets called with + * a context, selected with the right database, and the key name: if it + * returns 1, then we proceed calling the reply callback, otherwise if the + * is_key_ready callback returns 0 the client is not unblocked, since the + * key is yet not ready. + * + * Thanks to this system we can setup complex blocking scenarios, like + * unblocking a client only if a list contains at least 5 items or other + * more fancy logics. + * + * Note that another difference with RedisModule_BlockClient(), is that here + * we pass the private data directly when blocking the client: such private + * data will be later provided both to the is_key_ready callback, and to the + * reply callback. Normally in RedisModule_BlockClient() the private data + * to reply to the client is passed when calling RedisModule_UnblockClient() + * but here the unblocking is performed by Redis itself, so we need to have + * some private data before hand. The private data is used to store any + * information about the specific unblocking operation that you are + * implementing. Such information will be freed using the free_privdata + * callback provided by the user. */ +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata); } /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger @@ -4054,7 +4141,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * Note: this function can be called from threads spawned by the module. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { pthread_mutex_lock(&moduleUnblockedClientsMutex); - bc->privdata = privdata; + if (!bc->blocked_on_keys) bc->privdata = privdata; listAddNodeTail(moduleUnblockedClients,bc); if (write(server.module_blocked_pipe[1],"A",1) != 1) { /* Ignore the error, this is best-effort. */ From fb6110ac2062a548aa79c9bb5f11e3fa10e71ba2 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 30 Oct 2019 10:20:28 +0100 Subject: [PATCH 02/12] Modules: block on keys: export APIs. --- src/module.c | 11 +++++++++++ src/redismodule.h | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/src/module.c b/src/module.c index bdafc1590..113375e75 100644 --- a/src/module.c +++ b/src/module.c @@ -4128,6 +4128,15 @@ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleC return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata); } +/* This function is used in order to potentially unblock a client blocked + * on keys with RedisModule_BlockClientOnKeys(). When this function is called, + * all the clients blocked for this key will get their is_key_ready callback, + * and if the callback returns non-zero the client will be unblocked and the + * reply callback will be called in order to reply to the client. */ +void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { + signalKeyAsReady(ctx->client->db, key); +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -6634,4 +6643,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); REGISTER_API(SubscribeToServerEvent); + REGISTER_API(BlockClientOnKeys); + REGISTER_API(SignalKeyAsReady); } diff --git a/src/redismodule.h b/src/redismodule.h index 4b63a227c..5b4c31b19 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -492,6 +492,8 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); +RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata); +void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -688,6 +690,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(SubscribeToServerEvent); + REDISMODULE_GET_API(BlockClientOnKeys); + REDISMODULE_GET_API(SignalKeyAsReady); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); From 215b72c0ba1f42d15dcfe6fa60abb414275296ba Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 30 Oct 2019 10:57:44 +0100 Subject: [PATCH 03/12] Modules: block on keys: implement the internals. --- src/blocked.c | 30 ++++++++++++++++++++++++++++ src/module.c | 54 +++++++++++++++++++++++++++++++++++++++++++-------- src/server.h | 2 ++ 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 867f03de6..2b91c1b44 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -430,6 +430,32 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { } } +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * in order to check if we can serve clients blocked by modules using + * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: + * our goal here is to call the is_key_ready() callback to see if the key + * is really able to serve the client, and in that case, unblock it. */ +void serveClientsBlockedOnKeyByModule(readyList *rl) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (!moduleIsKeyReady(receiver, rl->key)) continue; + + moduleUnblockClient(receiver); + } + } +} + /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. It handles serving clients blocked in @@ -480,6 +506,10 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnSortedSetKey(o,rl); else if (o->type == OBJ_STREAM) serveClientsBlockedOnStreamKey(o,rl); + /* We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. */ + serveClientsBlockedOnKeyByModule(rl); } /* Free this item. */ diff --git a/src/module.c b/src/module.c index 113375e75..85bc9e808 100644 --- a/src/module.c +++ b/src/module.c @@ -4060,6 +4060,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF return bc; } +/* This function is called from module.c in order to check if a module + * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) + * can really be unblocked since the key is ready. */ +int moduleIsKeyReady(client *c, robj *key) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = bc->module; + ctx.client = bc->client; + ctx.blocked_privdata = bc->privdata; /* In case the callback uses the + API to get the pointer to the + privdata, even if we provide it + as argument. */ + selectDb(ctx.client, bc->dbid); + int ready = bc->is_key_ready(&ctx, key, bc->privdata); + moduleFreeContext(&ctx); + return ready; +} + /* Block a client in the context of a blocking command, returning an handle * which will be used, later, in order to unblock the client with a call to * RedisModule_UnblockClient(). The arguments specify callback functions @@ -4137,6 +4155,25 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { signalKeyAsReady(ctx->client->db, key); } +/* Implements RM_UnblockClient() and moduleUnblockClient(). */ +int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { + pthread_mutex_lock(&moduleUnblockedClientsMutex); + if (!bc->blocked_on_keys) bc->privdata = privdata; + listAddNodeTail(moduleUnblockedClients,bc); + if (write(server.module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + pthread_mutex_unlock(&moduleUnblockedClientsMutex); + return REDISMODULE_OK; +} + +/* This API is used by the Redis core to unblock a client that was blocked + * by a module. */ +void moduleUnblockClient(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + moduleUnblockClientByHandle(bc,NULL); +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -4147,15 +4184,16 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { * needs to be passed to the client, included but not limited some slow * to compute reply or some reply obtained via networking. * - * Note: this function can be called from threads spawned by the module. */ + * Note: this function can be called from threads spawned by the module. + * + * Note: when we unblock a client that is blocked for keys using + * the API RedisModule_BlockClientOnKeys(), the privdata argument here is + * not used, and the reply callback is called with the privdata pointer that + * was passed when blocking the client. Also note if you unblock clients + * blocked on keys in this way, the reply callback should be ready to handle + * the fact the key may not be ready at all. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { - pthread_mutex_lock(&moduleUnblockedClientsMutex); - if (!bc->blocked_on_keys) bc->privdata = privdata; - listAddNodeTail(moduleUnblockedClients,bc); - if (write(server.module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ - } - pthread_mutex_unlock(&moduleUnblockedClientsMutex); + moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; } diff --git a/src/server.h b/src/server.h index 97672d727..3a2bb1c7b 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,8 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +int moduleIsKeyReady(client *c, robj *key); +void moduleUnblockClient(client *c); /* Utils */ long long ustime(void); From 228bc89ecbeeb61ee2a37ba7bd5f565cb5a78f63 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 10:30:54 +0100 Subject: [PATCH 04/12] Modules: block on keys: example on hellotype.c. --- src/modules/hellotype.c | 68 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c index ba634c4a1..084408798 100644 --- a/src/modules/hellotype.c +++ b/src/modules/hellotype.c @@ -129,6 +129,7 @@ int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, /* Insert the new element. */ HelloTypeInsert(hto,value); + RedisModule_SignalKeyAsReady(ctx,argv[1]); RedisModule_ReplyWithLongLong(ctx,hto->len); RedisModule_ReplicateVerbatim(ctx); @@ -190,6 +191,69 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return REDISMODULE_OK; } +/* ====================== Example of a blocking command ==================== */ + +/* Is_key_ready callback for blocking command HELLOTYPE.BRANGE */ +int HelloBlock_IsKeyReady(RedisModuleCtx *ctx, RedisModuleString *keyname, void *privdata) { + REDISMODULE_NOT_USED(privdata); + + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_MODULE || + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return 0; + } else { + return 1; + } +} + +/* Reply callback for blocking command HELLOTYPE.BRANGE */ +int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return HelloTypeRange_RedisCommand(ctx,argv,argc-1); +} + +/* Timeout callback for blocking command HELLOTYPE.BRANGE */ +int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); +} + +/* Private data freeing callback for HELLOTYPE.BRANGE command. */ +void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { + REDISMODULE_NOT_USED(ctx); + RedisModule_Free(privdata); +} + +/* HELLOTYPE.BRANGE key first count timeout -- This is a blocking verison of + * the RANGE operation, in order to show how to use the API + * RedisModule_BlockClientOnKeys(). */ +int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) return RedisModule_WrongArity(ctx); + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + long long timeout; + if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, + "ERR invalid timeout parameter"); + } + + void *privdata = RedisModule_Alloc(100); + RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,HelloBlock_IsKeyReady,privdata); + return REDISMODULE_OK; +} /* ========================== "hellotype" type methods ======================= */ @@ -282,5 +346,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellotype.brange", + HelloTypeBRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } From 4534960b293d41fd5193c1d8a51a19453c2aabf6 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 10:32:59 +0100 Subject: [PATCH 05/12] Modules: remove spurious call from moduleHandleBlockedClients(). Now we handle propagation when we free the context. --- src/module.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/module.c b/src/module.c index 85bc9e808..a11d3a306 100644 --- a/src/module.c +++ b/src/module.c @@ -4262,7 +4262,6 @@ void moduleHandleBlockedClients(void) { ctx.client = bc->client; ctx.blocked_client = bc; bc->reply_callback(&ctx,(void**)c->argv,c->argc); - moduleHandlePropagationAfterCommandCallback(&ctx); moduleFreeContext(&ctx); } From 91f4bdc9f9d80ba7431f093cb58b6c49f4021d0b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 11:35:05 +0100 Subject: [PATCH 06/12] Modules: block on keys: use a better interface. Using the is_key_ready() callback plus the reply callback later, creates different issues AFAIK: 1. More complex API. 2. We need to call the reply callback() ASAP if the is_key_ready() interface returned success, however the internals do not work in that way, so when the reply callback is called the setup could be different. To fix that, there is to break the current design that handles the unblocked clients asyncrhonously, and run the list ASAP. --- src/blocked.c | 3 +- src/module.c | 74 +++++++++++++++++++++++++---------------- src/modules/hellotype.c | 26 +++++++-------- src/redismodule.h | 4 ++- src/server.h | 3 +- 5 files changed, 65 insertions(+), 45 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 2b91c1b44..fb58f850b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -174,6 +174,7 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { + if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); } else { serverPanic("Unknown btype in unblockClient()."); @@ -449,7 +450,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; - if (!moduleIsKeyReady(receiver, rl->key)) continue; + if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; moduleUnblockClient(receiver); } diff --git a/src/module.c b/src/module.c index a11d3a306..8837ae017 100644 --- a/src/module.c +++ b/src/module.c @@ -140,6 +140,9 @@ struct RedisModuleCtx { void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ void *blocked_privdata; /* Privdata set when unblocking a client. */ + RedisModuleString *blocked_ready_key; /* Key ready when the reply callback + gets called for clients blocked + on keys. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -153,7 +156,7 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) @@ -246,14 +249,6 @@ typedef struct RedisModuleBlockedClient { in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ - int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, - void *privdata); /* When blocking on keys, even if the - key is signaled as ready, maybe it - was modified afterward before the - client unblocks. So we always - need a callback that tells us if - the key is ready in order to serve - the next blocked client. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -4002,10 +3997,10 @@ void unblockClientFromModule(client *c) { * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the * keys are passed or not. * - * When not blocking for keys, the keys, numkeys, is_key_ready callback - * and privdata parameters are not needed. The privdata in that case must - * be NULL, since later is RM_UnblockClient() that will provide some private - * data that the reply callback will receive. + * When not blocking for keys, the keys, numkeys, and privdata parameters are + * not needed. The privdata in that case must be NULL, since later is + * RM_UnblockClient() that will provide some private data that the reply + * callback will receive. * * Instead when blocking for keys, normally RM_UnblockClient() will not be * called (because the client will unblock when the key is modified), so @@ -4017,7 +4012,7 @@ void unblockClientFromModule(client *c) { * in that case the privdata argument is disregarded, because we pass the * reply callback the privdata that is set here while blocking. */ -RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) { +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { client *c = ctx->client; int islua = c->flags & CLIENT_LUA; int ismulti = c->flags & CLIENT_MULTI; @@ -4041,7 +4036,6 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->reply_client = createClient(NULL); bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; - bc->is_key_ready = is_key_ready; bc->blocked_on_keys = keys != NULL; c->bpop.timeout = timeout; @@ -4062,20 +4056,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF /* This function is called from module.c in order to check if a module * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) - * can really be unblocked since the key is ready. */ -int moduleIsKeyReady(client *c, robj *key) { + * can really be unblocked, since the module was able to serve the client. + * If the callback returns REDISMODULE_OK, then the client can be unblocked, + * otherwise the client remains blocked and we'll retry again when one of + * the keys it blocked for becomes "ready" again. */ +int moduleTryServeClientBlockedOnKey(client *c, robj *key) { + int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + ctx.blocked_ready_key = key; + ctx.blocked_privdata = bc->privdata; ctx.module = bc->module; ctx.client = bc->client; - ctx.blocked_privdata = bc->privdata; /* In case the callback uses the - API to get the pointer to the - privdata, even if we provide it - as argument. */ - selectDb(ctx.client, bc->dbid); - int ready = bc->is_key_ready(&ctx, key, bc->privdata); + ctx.blocked_client = bc; + if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK) + served = 1; moduleFreeContext(&ctx); - return ready; + return served; } /* Block a client in the context of a blocking command, returning an handle @@ -4095,7 +4093,7 @@ int moduleIsKeyReady(client *c, robj *key) { * by RedisModule_UnblockClient() call. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL); + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); } /* This call is similar to RedisModule_BlockClient(), however in this case we @@ -4142,8 +4140,8 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * information about the specific unblocking operation that you are * implementing. Such information will be freed using the free_privdata * callback provided by the user. */ -RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata); +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } /* This function is used in order to potentially unblock a client blocked @@ -4174,6 +4172,13 @@ void moduleUnblockClient(client *c) { moduleUnblockClientByHandle(bc,NULL); } +/* Return true if the client 'c' was blocked by a module using + * RM_BlockClientOnKeys(). */ +int moduleClientIsBlockedOnKeys(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + return bc->blocked_on_keys; +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -4253,11 +4258,15 @@ void moduleHandleBlockedClients(void) { * touch the shared list. */ /* Call the reply callback if the client is valid and we have - * any callback. */ - if (c && bc->reply_callback) { + * any callback. However the callback is not called if the client + * was blocked on keys (RM_BlockClientOnKeys()), because we already + * called such callback in moduleTryServeClientBlockedOnKey() when + * the key was signaled as ready. */ + if (c && !bc->blocked_on_keys && bc->reply_callback) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_privdata = bc->privdata; + ctx.blocked_ready_key = NULL; ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; @@ -4349,6 +4358,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { return ctx->blocked_privdata; } +/* Get the key that is ready when the reply callback is called in the context + * of a client blocked by RedisModule_BlockClientOnKeys(). */ +RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) { + return ctx->blocked_ready_key; +} + /* Get the blocked client associated with a given context. * This is useful in the reply and timeout callbacks of blocked clients, * before sometimes the module has the blocked client handle references @@ -6682,4 +6697,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SubscribeToServerEvent); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); + REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c index 084408798..dafbadbe5 100644 --- a/src/modules/hellotype.c +++ b/src/modules/hellotype.c @@ -193,26 +193,26 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int /* ====================== Example of a blocking command ==================== */ -/* Is_key_ready callback for blocking command HELLOTYPE.BRANGE */ -int HelloBlock_IsKeyReady(RedisModuleCtx *ctx, RedisModuleString *keyname, void *privdata) { - REDISMODULE_NOT_USED(privdata); +/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get + * called when the key we blocked for is ready: we need to check if we + * can really serve the client, and reply OK or ERR accordingly. */ +int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); - RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ); int type = RedisModule_KeyType(key); if (type != REDISMODULE_KEYTYPE_MODULE || RedisModule_ModuleTypeGetType(key) != HelloType) { - return 0; - } else { - return 1; + RedisModule_CloseKey(key); + return REDISMODULE_ERR; } -} -/* Reply callback for blocking command HELLOTYPE.BRANGE */ -int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - REDISMODULE_NOT_USED(argv); - REDISMODULE_NOT_USED(argc); + /* In case the key is able to serve our blocked client, let's directly + * use our original command implementation to make this example simpler. */ + RedisModule_CloseKey(key); return HelloTypeRange_RedisCommand(ctx,argv,argc-1); } @@ -251,7 +251,7 @@ int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, } void *privdata = RedisModule_Alloc(100); - RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,HelloBlock_IsKeyReady,privdata); + RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata); return REDISMODULE_OK; } diff --git a/src/redismodule.h b/src/redismodule.h index 5b4c31b19..1b284770b 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -492,8 +492,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); -RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata); +RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -692,6 +693,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); + REDISMODULE_GET_API(GetBlockedClientReadyKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/server.h b/src/server.h index 3a2bb1c7b..f724f7d64 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,8 +1602,9 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); -int moduleIsKeyReady(client *c, robj *key); +int moduleTryServeClientBlockedOnKey(client *c, robj *key); void moduleUnblockClient(client *c); +int moduleClientIsBlockedOnKeys(client *c); /* Utils */ long long ustime(void); From 629081f839498734d3426d71653619d4f9e93dc9 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 11:43:45 +0100 Subject: [PATCH 07/12] Modules: block on keys: fix the top comments. --- src/module.c | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/module.c b/src/module.c index 8837ae017..353b6f426 100644 --- a/src/module.c +++ b/src/module.c @@ -4119,36 +4119,41 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * Anyway we can't be sure if the client should be unblocked just because the * key is signaled as ready: for instance a successive operation may change the * key, or a client in queue before this one can be served, modifying the key - * as well and making it empty again. So when blocking for keys, we need to - * register a callback called is_key_ready. This callback gets called with - * a context, selected with the right database, and the key name: if it - * returns 1, then we proceed calling the reply callback, otherwise if the - * is_key_ready callback returns 0 the client is not unblocked, since the - * key is yet not ready. + * as well and making it empty again. So when a client is blocked with + * RedisModule_BlockClientOnKeys() the reply callback is not called after + * RM_UnblockCLient() is called, but every time a key is signaled as ready: + * if the reply callback can serve the client, it returns REDISMODULE_OK + * and the client is unblocked, otherwise it will return REDISMODULE_ERR + * and we'll try again later. + * + * The reply callback can access the key that was signaled as ready by + * calling the API RedisModule_GetBlockedClientReadyKey(), that returns + * just the string name of the key as a RedisModuleString object. * * Thanks to this system we can setup complex blocking scenarios, like * unblocking a client only if a list contains at least 5 items or other * more fancy logics. * * Note that another difference with RedisModule_BlockClient(), is that here - * we pass the private data directly when blocking the client: such private - * data will be later provided both to the is_key_ready callback, and to the - * reply callback. Normally in RedisModule_BlockClient() the private data - * to reply to the client is passed when calling RedisModule_UnblockClient() - * but here the unblocking is performed by Redis itself, so we need to have - * some private data before hand. The private data is used to store any - * information about the specific unblocking operation that you are - * implementing. Such information will be freed using the free_privdata - * callback provided by the user. */ + * we pass the private data directly when blocking the client: it will + * be accessible later in the reply callback. Normally when blocking with + * RedisModule_BlockClient() the private data to reply to the client is + * passed when calling RedisModule_UnblockClient() but here the unblocking + * is performed by Redis itself, so we need to have some private data before + * hand. The private data is used to store any information about the specific + * unblocking operation that you are implementing. Such information will be + * freed using the free_privdata callback provided by the user. + * + * However the reply callback will be able to access the argument vector of + * the command, so the private data is often not needed. */ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } /* This function is used in order to potentially unblock a client blocked * on keys with RedisModule_BlockClientOnKeys(). When this function is called, - * all the clients blocked for this key will get their is_key_ready callback, - * and if the callback returns non-zero the client will be unblocked and the - * reply callback will be called in order to reply to the client. */ + * all the clients blocked for this key will get their reply callback called, + * and if the callback returns REDISMODULE_OK the client will be unblocked. */ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { signalKeyAsReady(ctx->client->db, key); } From 66f55bc5c15d72542983f37c6c1b48b0c1618917 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 12:23:55 +0100 Subject: [PATCH 08/12] Modules: block on keys: fix bugs in processing order. --- src/blocked.c | 16 ++++++++++++++++ src/module.c | 7 +++++++ 2 files changed, 23 insertions(+) diff --git a/src/blocked.c b/src/blocked.c index fb58f850b..3110c00fc 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -450,6 +450,22 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; + /* Put at the tail, so that at the next call + * we'll not run into it again: clients here may not be + * ready to be served, so they'll remain in the list + * sometimes. We want also be able to skip clients that are + * not blocked for the MODULE type safely. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + + if (receiver->btype != BLOCKED_MODULE) continue; + + /* Note that if *this* client cannot be served by this key, + * it does not mean that another client that is next into the + * list cannot be served as well: they may be blocked by + * different modules with different triggers to consider if a key + * is ready or not. This means we can't exit the loop but need + * to continue after the first failure. */ if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; moduleUnblockClient(receiver); diff --git a/src/module.c b/src/module.c index 353b6f426..248a55e62 100644 --- a/src/module.c +++ b/src/module.c @@ -249,6 +249,7 @@ typedef struct RedisModuleBlockedClient { in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ + int unblocked; /* Already on the moduleUnblocked list. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -4037,6 +4038,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; bc->blocked_on_keys = keys != NULL; + bc->unblocked = 0; c->bpop.timeout = timeout; if (islua || ismulti) { @@ -4063,6 +4065,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already + * in the unblocking list for any reason (including RM_UnblockClient() + * explicit call). */ + if (bc->unblocked) return REDISMODULE_ERR; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; @@ -4162,6 +4168,7 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { pthread_mutex_lock(&moduleUnblockedClientsMutex); if (!bc->blocked_on_keys) bc->privdata = privdata; + bc->unblocked = 1; listAddNodeTail(moduleUnblockedClients,bc); if (write(server.module_blocked_pipe[1],"A",1) != 1) { /* Ignore the error, this is best-effort. */ From 37bf3e18cbc4db5538940ac4a6006627122e1c17 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 12:31:22 +0100 Subject: [PATCH 09/12] Modules: block ok keys: improve example. --- src/modules/hellotype.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c index dafbadbe5..4f2d1d730 100644 --- a/src/modules/hellotype.c +++ b/src/modules/hellotype.c @@ -244,12 +244,20 @@ int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); } + /* Parse the timeout before even trying to serve the client synchronously, + * so that we always fail ASAP on syntax errors. */ long long timeout; if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) { return RedisModule_ReplyWithError(ctx, "ERR invalid timeout parameter"); } + /* Can we serve the reply synchronously? */ + if (type != REDISMODULE_KEYTYPE_EMPTY) { + return HelloTypeRange_RedisCommand(ctx,argv,argc-1); + } + + /* Otherwise let's block on the key. */ void *privdata = RedisModule_Alloc(100); RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata); return REDISMODULE_OK; From 825adcf3f5775271583ebe4df0ba92fd8f9c821b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 17:38:58 +0100 Subject: [PATCH 10/12] Modules: block on keys: finish implementing RM_UnblockClient(). --- src/module.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/module.c b/src/module.c index 248a55e62..a4384163a 100644 --- a/src/module.c +++ b/src/module.c @@ -4201,15 +4201,24 @@ int moduleClientIsBlockedOnKeys(client *c) { * needs to be passed to the client, included but not limited some slow * to compute reply or some reply obtained via networking. * - * Note: this function can be called from threads spawned by the module. + * Note 1: this function can be called from threads spawned by the module. * - * Note: when we unblock a client that is blocked for keys using + * Note 2: when we unblock a client that is blocked for keys using * the API RedisModule_BlockClientOnKeys(), the privdata argument here is * not used, and the reply callback is called with the privdata pointer that - * was passed when blocking the client. Also note if you unblock clients - * blocked on keys in this way, the reply callback should be ready to handle - * the fact the key may not be ready at all. */ + * was passed when blocking the client. + * + * Unblocking a client that was blocked for keys using this API will still + * require the client to get some reply, so the function will use the + * "timeout" handler in order to do so. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { + if (bc->blocked_on_keys) { + /* In theory the user should always pass the timeout handler as an + * argument, but better to be safe than sorry. */ + if (bc->timeout_callback == NULL) return REDISMODULE_ERR; + if (bc->unblocked) return REDISMODULE_OK; + if (bc->client) moduleBlockedClientTimedOut(bc->client); + } moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; } From dd5feec5e81288950784740ad23d9dc13b7df9a2 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 17:45:07 +0100 Subject: [PATCH 11/12] Modules: block on keys: fix stale comment. --- src/blocked.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 3110c00fc..14c2ff830 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -434,8 +434,9 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { /* Helper function for handleClientsBlockedOnKeys(). This function is called * in order to check if we can serve clients blocked by modules using * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: - * our goal here is to call the is_key_ready() callback to see if the key - * is really able to serve the client, and in that case, unblock it. */ + * our goal here is to call the RedisModuleBlockedClient reply() callback to + * see if the key is really able to serve the client, and in that case, + * unblock it. */ void serveClientsBlockedOnKeyByModule(readyList *rl) { dictEntry *de; From fdaea2a7a7eed1499f46bb98552f8d8bb8dc7e9d Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 18:07:33 +0100 Subject: [PATCH 12/12] Modules: fix thread safe context creation crash. See #6525, this likely creates a NULL deference if the client was terminated by Redis between the creation of the blocked client and the creation of the thread safe context. --- src/module.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index f298ec760..f9f654b42 100644 --- a/src/module.c +++ b/src/module.c @@ -4463,7 +4463,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { ctx->client = createClient(NULL); if (bc) { selectDb(ctx->client,bc->dbid); - ctx->client->id = bc->client->id; + if (bc->client) ctx->client->id = bc->client->id; } return ctx; }