Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE

This commit is contained in:
Oran Agra 2019-11-03 17:35:35 +02:00
parent fdaea2a7a7
commit 87332ce524
4 changed files with 69 additions and 20 deletions

View File

@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) {
return C_OK;
}
/* Flushes the whole server data set. */
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyDb(-1,flags,NULL);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
@ -484,28 +507,9 @@ void flushdbCommand(client *c) {
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
server.dirty += emptyDb(-1,flags,NULL);
flushAllDataAndResetRDB(flags);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* This command implements DEL and LAZYDEL. */

View File

@ -1677,6 +1677,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
/* Publish a message to subscribers (see PUBLISH command). */
int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
UNUSED(ctx);
int receivers = pubsubPublishMessage(channel, message);
if (server.cluster_enabled)
clusterPropagatePublish(channel, message);
return receivers;
}
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@ -1964,6 +1973,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
* If restart_aof is true, you must make sure the command that triggered this call is not
* propagated to the AOF file.
* When async is set to true, db contents will be freed by a background thread. */
void RM_ResetDataset(int restart_aof, int async) {
if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
}
/* Returns the number of keys in the current db. */
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
return dictSize(ctx->client->db->dict);
}
/* Returns a name of a random key, or NULL if current db is empty. */
RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
robj *key = dbRandomKey(ctx->client->db);
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
return key;
}
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@ -6630,6 +6661,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
REGISTER_API(ResetDataset);
REGISTER_API(DbSize);
REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@ -6757,6 +6791,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldLongLong);
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);

View File

@ -416,6 +416,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
@ -435,6 +438,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
@ -619,6 +623,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
REDISMODULE_GET_API(ResetDataset);
REDISMODULE_GET_API(DbSize);
REDISMODULE_GET_API(RandomKey);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
@ -705,6 +712,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(InfoAddFieldLongLong);
REDISMODULE_GET_API(InfoAddFieldULongLong);
REDISMODULE_GET_API(GetClientInfoById);
REDISMODULE_GET_API(PublishMessage);
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);

View File

@ -1862,6 +1862,7 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
@ -2101,6 +2102,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);