Delete empty key if fails after moduleCreateEmptyKey() in module (#12129)

When `RM_ZsetAdd()`/`RM_ZsetIncrby()`/`RM_StreamAdd()` fails, if a new key happens to 
be created using `moduleCreateEmptyKey()`, we should clean up the empty key.

## Test
1) Add new module commands(`zset.add` and `zset.incrby`) to cover  `RM_ZsetAdd()`/`RM_ZsetIncrby()`.
2) Add a large-memory test to cover `RM_StreamAdd()`.
This commit is contained in:
sundb 2023-05-07 15:13:19 +08:00 committed by GitHub
parent b0dd7b3245
commit ce5f4ea3a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 6 deletions

View File

@ -4695,6 +4695,7 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f
if (flagsptr) in_flags = moduleZsetAddFlagsToCoreFlags(*flagsptr); if (flagsptr) in_flags = moduleZsetAddFlagsToCoreFlags(*flagsptr);
if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,NULL) == 0) { if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,NULL) == 0) {
if (flagsptr) *flagsptr = 0; if (flagsptr) *flagsptr = 0;
moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
@ -4723,6 +4724,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int
in_flags |= ZADD_IN_INCR; in_flags |= ZADD_IN_INCR;
if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,newscore) == 0) { if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,newscore) == 0) {
if (flagsptr) *flagsptr = 0; if (flagsptr) *flagsptr = 0;
moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
@ -5405,6 +5407,7 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM
/* Either the ID not greater than all existing IDs in the stream, or /* Either the ID not greater than all existing IDs in the stream, or
* the elements are too large to be stored. either way, errno is already * the elements are too large to be stored. either way, errno is already
* set by streamAppendItem. */ * set by streamAppendItem. */
if (created) moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
/* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey() /* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey()

View File

@ -1,4 +1,6 @@
#include "redismodule.h" #include "redismodule.h"
#include <math.h>
#include <errno.h>
/* ZSET.REM key element /* ZSET.REM key element
* *
@ -17,14 +19,73 @@ int zset_rem(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_ReplyWithError(ctx, "ERR ZsetRem failed"); return RedisModule_ReplyWithError(ctx, "ERR ZsetRem failed");
} }
/* ZSET.ADD key score member
*
* Adds a specified member with the specified score to the sorted
* set stored at key.
*/
int zset_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 4) return RedisModule_WrongArity(ctx);
RedisModule_AutoMemory(ctx);
int keymode = REDISMODULE_READ | REDISMODULE_WRITE;
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], keymode);
size_t len;
double score;
char *endptr;
const char *str = RedisModule_StringPtrLen(argv[2], &len);
score = strtod(str, &endptr);
if (*endptr != '\0' || errno == ERANGE)
return RedisModule_ReplyWithError(ctx, "value is not a valid float");
if (RedisModule_ZsetAdd(key, score, argv[3], NULL) == REDISMODULE_OK)
return RedisModule_ReplyWithSimpleString(ctx, "OK");
else
return RedisModule_ReplyWithError(ctx, "ERR ZsetAdd failed");
}
/* ZSET.INCRBY key member increment
*
* Increments the score stored at member in the sorted set stored at key by increment.
* Replies with the new score of this element.
*/
int zset_incrby(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 4) return RedisModule_WrongArity(ctx);
RedisModule_AutoMemory(ctx);
int keymode = REDISMODULE_READ | REDISMODULE_WRITE;
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], keymode);
size_t len;
double score, newscore;
char *endptr;
const char *str = RedisModule_StringPtrLen(argv[3], &len);
score = strtod(str, &endptr);
if (*endptr != '\0' || errno == ERANGE)
return RedisModule_ReplyWithError(ctx, "value is not a valid float");
if (RedisModule_ZsetIncrby(key, score, argv[2], NULL, &newscore) == REDISMODULE_OK)
return RedisModule_ReplyWithDouble(ctx, newscore);
else
return RedisModule_ReplyWithError(ctx, "ERR ZsetIncrby failed");
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx, "zset", 1, REDISMODULE_APIVER_1) == if (RedisModule_Init(ctx, "zset", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
REDISMODULE_OK &&
RedisModule_CreateCommand(ctx, "zset.rem", zset_rem, "write",
1, 1, 1) == REDISMODULE_OK)
return REDISMODULE_OK;
else
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "zset.rem", zset_rem, "write",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "zset.add", zset_add, "write",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "zset.incrby", zset_incrby, "write",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
} }

View File

@ -61,6 +61,23 @@ start_server {tags {"modules"}} {
assert_equal $result $n assert_equal $result $n
} }
test {Module stream XADD big fields doesn't create empty key} {
set original_proto [config_get_set proto-max-bulk-len 2147483647] ;#2gb
set original_query [config_get_set client-query-buffer-limit 2147483647] ;#2gb
r del mystream
r write "*4\r\n\$10\r\nstream.add\r\n\$8\r\nmystream\r\n\$5\r\nfield\r\n"
catch {
write_big_bulk 1073741824 ;#1gb
} err
assert {$err eq "ERR StreamAdd failed"}
assert_equal 0 [r exists mystream]
# restore defaults
r config set proto-max-bulk-len $original_proto
r config set client-query-buffer-limit $original_query
} {OK} {large-memory}
test {Module stream iterator} { test {Module stream iterator} {
r del mystream r del mystream
set streamid1 [r xadd mystream * item 1 value a] set streamid1 [r xadd mystream * item 1 value a]

View File

@ -14,6 +14,26 @@ start_server {tags {"modules"}} {
assert_equal 0 [r exists k] assert_equal 0 [r exists k]
} }
test {Module zset add} {
r del k
# Check that failure does not create empty key
assert_error "ERR ZsetAdd failed" {r zset.add k nan hello}
assert_equal 0 [r exists k]
r zset.add k 100 hello
assert_equal {hello 100} [r zrange k 0 -1 withscores]
}
test {Module zset incrby} {
r del k
# Check that failure does not create empty key
assert_error "ERR ZsetIncrby failed" {r zset.incrby k hello nan}
assert_equal 0 [r exists k]
r zset.incrby k hello 100
assert_equal {hello 100} [r zrange k 0 -1 withscores]
}
test "Unload the module - zset" { test "Unload the module - zset" {
assert_equal {OK} [r module unload zset] assert_equal {OK} [r module unload zset]
} }