mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 18:54:58 +00:00
Avoid redundant calls to signalKeyAsReady (#7625)
signalKeyAsReady has some overhead (namely dictFind) so we should only call it when there are clients blocked on the relevant type (BLOCKED_*)
This commit is contained in:
parent
efe92ee546
commit
229327ad8b
@ -634,6 +634,16 @@ void unblockClientWaitingData(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
static int getBlockedTypeByType(int type) {
|
||||
switch (type) {
|
||||
case OBJ_LIST: return BLOCKED_LIST;
|
||||
case OBJ_ZSET: return BLOCKED_ZSET;
|
||||
case OBJ_MODULE: return BLOCKED_MODULE;
|
||||
case OBJ_STREAM: return BLOCKED_STREAM;
|
||||
default: return BLOCKED_NONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* If the specified key has clients blocked waiting for list pushes, this
|
||||
* function will put the key reference into the server.ready_keys list.
|
||||
* Note that db->ready_keys is a hash table that allows us to avoid putting
|
||||
@ -641,9 +651,14 @@ void unblockClientWaitingData(client *c) {
|
||||
* made by a script or in the context of MULTI/EXEC.
|
||||
*
|
||||
* The list will be finally processed by handleClientsBlockedOnKeys() */
|
||||
void signalKeyAsReady(redisDb *db, robj *key) {
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
readyList *rl;
|
||||
|
||||
/* If no clients are blocked on this type, just return */
|
||||
int btype = getBlockedTypeByType(type);
|
||||
if (btype == BLOCKED_NONE || !server.blocked_clients_by_type[btype])
|
||||
return;
|
||||
|
||||
/* No clients blocking for this key? No need to queue it. */
|
||||
if (dictFind(db->blocking_keys,key) == NULL) return;
|
||||
|
||||
@ -664,4 +679,3 @@ void signalKeyAsReady(redisDb *db, robj *key) {
|
||||
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
|
||||
}
|
||||
|
||||
|
||||
|
10
src/db.c
10
src/db.c
@ -181,10 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
|
||||
int retval = dictAdd(db->dict, copy, val);
|
||||
|
||||
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
||||
if (val->type == OBJ_LIST ||
|
||||
val->type == OBJ_ZSET ||
|
||||
val->type == OBJ_STREAM)
|
||||
signalKeyAsReady(db, key);
|
||||
signalKeyAsReady(db, key, val->type);
|
||||
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
|
||||
}
|
||||
|
||||
@ -1089,10 +1086,7 @@ void scanDatabaseForReadyLists(redisDb *db) {
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *key = dictGetKey(de);
|
||||
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
|
||||
if (value && (value->type == OBJ_LIST ||
|
||||
value->type == OBJ_STREAM ||
|
||||
value->type == OBJ_ZSET))
|
||||
signalKeyAsReady(db, key);
|
||||
if (value) signalKeyAsReady(db, key, value->type);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
|
@ -4579,7 +4579,7 @@ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleC
|
||||
* all the clients blocked for this key will get their reply callback called,
|
||||
* and if the callback returns REDISMODULE_OK the client will be unblocked. */
|
||||
void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
|
||||
signalKeyAsReady(ctx->client->db, key);
|
||||
signalKeyAsReady(ctx->client->db, key, OBJ_MODULE);
|
||||
}
|
||||
|
||||
/* Implements RM_UnblockClient() and moduleUnblockClient(). */
|
||||
|
@ -2189,7 +2189,7 @@ void replyToBlockedClientTimedOut(client *c);
|
||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
|
||||
void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key);
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
|
@ -1299,8 +1299,7 @@ void xaddCommand(client *c) {
|
||||
|
||||
/* We need to signal to blocked clients that there is new data on this
|
||||
* stream. */
|
||||
if (server.blocked_clients_by_type[BLOCKED_STREAM])
|
||||
signalKeyAsReady(c->db, c->argv[1]);
|
||||
signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
|
||||
}
|
||||
|
||||
/* XRANGE/XREVRANGE actual implementation. */
|
||||
@ -1876,7 +1875,7 @@ NULL
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
|
||||
c->argv[2],c->db->id);
|
||||
/* We want to unblock any XREADGROUP consumers with -NOGROUP. */
|
||||
signalKeyAsReady(c->db,c->argv[2]);
|
||||
signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
|
||||
} else {
|
||||
addReply(c,shared.czero);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user