From 87332ce524e30d4949d2144a4da15b5ae17e5051 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 Nov 2019 17:35:35 +0200 Subject: [PATCH] Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE --- src/db.c | 44 ++++++++++++++++++++++++-------------------- src/module.c | 35 +++++++++++++++++++++++++++++++++++ src/redismodule.h | 8 ++++++++ src/server.h | 2 ++ 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..ad19b42dd 100644 --- a/src/db.c +++ b/src/db.c @@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) { return C_OK; } +/* Flushes the whole server data set. */ +void flushAllDataAndResetRDB(int flags) { + server.dirty += emptyDb(-1,flags,NULL); + if (server.rdb_child_pid != -1) killRDBChild(); + if (server.saveparamslen > 0) { + /* Normally rdbSave() will reset dirty, but we don't want this here + * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ + int saved_dirty = server.dirty; + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSave(server.rdb_filename,rsiptr); + server.dirty = saved_dirty; + } + server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif +} + /* FLUSHDB [ASYNC] * * Flushes the currently SELECTed Redis DB. */ @@ -484,28 +507,9 @@ void flushdbCommand(client *c) { * Flushes the whole server data set. */ void flushallCommand(client *c) { int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - server.dirty += emptyDb(-1,flags,NULL); + flushAllDataAndResetRDB(flags); addReply(c,shared.ok); - if (server.rdb_child_pid != -1) killRDBChild(); - if (server.saveparamslen > 0) { - /* Normally rdbSave() will reset dirty, but we don't want this here - * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ - int saved_dirty = server.dirty; - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(server.rdb_filename,rsiptr); - server.dirty = saved_dirty; - } - server.dirty++; -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchroneus. */ - if (!(flags & EMPTYDB_ASYNC)) - jemalloc_purge(); -#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/module.c b/src/module.c index f9f654b42..5453aed47 100644 --- a/src/module.c +++ b/src/module.c @@ -1677,6 +1677,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { return modulePopulateClientInfoStructure(ci,client,structver); } +/* Publish a message to subscribers (see PUBLISH command). */ +int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + int receivers = pubsubPublishMessage(channel, message); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message); + return receivers; +} + /* Return the currently selected DB. */ int RM_GetSelectedDb(RedisModuleCtx *ctx) { return ctx->client->db->id; @@ -1964,6 +1973,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_OK; } +/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled) + * If restart_aof is true, you must make sure the command that triggered this call is not + * propagated to the AOF file. + * When async is set to true, db contents will be freed by a background thread. */ +void RM_ResetDataset(int restart_aof, int async) { + if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); + flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); +} + +/* Returns the number of keys in the current db. */ +unsigned long long RM_DbSize(RedisModuleCtx *ctx) { + return dictSize(ctx->client->db->dict); +} + +/* Returns a name of a random key, or NULL if current db is empty. */ +RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { + robj *key = dbRandomKey(ctx->client->db); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key); + return key; +} + /* -------------------------------------------------------------------------- * Key API for String type * -------------------------------------------------------------------------- */ @@ -6630,6 +6661,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringTruncate); REGISTER_API(SetExpire); REGISTER_API(GetExpire); + REGISTER_API(ResetDataset); + REGISTER_API(DbSize); + REGISTER_API(RandomKey); REGISTER_API(ZsetAdd); REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); @@ -6757,6 +6791,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldLongLong); REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); + REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..77019f89e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -416,6 +416,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen); mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire); +void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async); +unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr); int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); @@ -435,6 +438,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id); +int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message); int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); @@ -619,6 +623,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringTruncate); REDISMODULE_GET_API(GetExpire); REDISMODULE_GET_API(SetExpire); + REDISMODULE_GET_API(ResetDataset); + REDISMODULE_GET_API(DbSize); + REDISMODULE_GET_API(RandomKey); REDISMODULE_GET_API(ZsetAdd); REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); @@ -705,6 +712,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldLongLong); REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); + REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); diff --git a/src/server.h b/src/server.h index f724f7d64..4287d1adf 100644 --- a/src/server.h +++ b/src/server.h @@ -1862,6 +1862,7 @@ void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); ssize_t aofReadDiffFromParent(void); void killAppendOnlyChild(void); +void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); @@ -2101,6 +2102,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); int selectDb(client *c, int id);