Fix accidental deletion of sinterstore command when we meet wrong type error. (#9032)

SINTERSTORE would have deleted the dest key right away,
even when later on it is bound to fail on an (WRONGTYPE) error.

With this change it first picks up all the input keys, and only later
delete the dest key if one is empty.

Also add more tests for some commands.
Mainly focus on 
- `wrong type error`: 
	expand test case (base on sinter bug) in non-store variant
	add tests for store variant (although it exists in non-store variant, i think it would be better to have same tests)
- the dstkey result when we meet `non-exist key (empty set)` in *store

sdiff:
- improve test case about wrong type error (the one we found in sinter, although it is safe in sdiff)
- add test about using non-exist key (treat it like an empty set)
sdiffstore:
- according to sdiff test case, also add some tests about `wrong type error` and `non-exist key`
- the different is that in sdiffstore, we will consider the `dstkey` result

sunion/sunionstore add more tests (same as above)

sinter/sinterstore also same as above ...
This commit is contained in:
Binbin 2021-06-13 15:53:46 +08:00 committed by GitHub
parent 5517f3624d
commit b8a5da80c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 210 additions and 17 deletions

View File

@ -858,13 +858,28 @@ void sinterGenericCommand(client *c, robj **setkeys,
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding;
int encoding, empty = 0;
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
/* A NULL is considered an empty set */
empty += 1;
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Set intersection with an empty set always results in an empty set.
* Return ASAP if there is an empty set. */
if (empty > 0) {
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
@ -878,12 +893,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
}
return;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
@ -977,10 +987,12 @@ void sinterGenericCommand(client *c, robj **setkeys,
zfree(sets);
}
/* SINTER key [key ...] */
void sinterCommand(client *c) {
sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}
/* SINTERSTORE destination key [key ...] */
void sinterstoreCommand(client *c) {
sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}
@ -1150,18 +1162,22 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
zfree(sets);
}
/* SUNION key [key ...] */
void sunionCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
}
/* SUNIONSTORE destination key [key ...] */
void sunionstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION);
}
/* SDIFF key [key ...] */
void sdiffCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
}
/* SDIFFSTORE destination key [key ...] */
void sdiffstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF);
}

View File

@ -278,14 +278,86 @@ start_server {
}
}
test "SDIFF against non-set should throw error" {
# with an empty set
r set key1{t} x
assert_error "WRONGTYPE*" {r sdiff key1{t} noset{t}}
# different order
assert_error "WRONGTYPE*" {r sdiff noset{t} key1{t}}
# with a legal set
r del set1{t}
r sadd set1{t} a b c
assert_error "WRONGTYPE*" {r sdiff key1{t} set1{t}}
# different order
assert_error "WRONGTYPE*" {r sdiff set1{t} key1{t}}
}
test "SDIFF should handle non existing key as empty" {
r del set1{t} set2{t} set3{t}
r sadd set1{t} a b c
r sadd set2{t} b c d
assert_equal {a} [lsort [r sdiff set1{t} set2{t} set3{t}]]
assert_equal {} [lsort [r sdiff set3{t} set2{t} set1{t}]]
}
test "SDIFFSTORE against non-set should throw error" {
r del set1{t} set2{t} set3{t} key1{t}
r set key1{t} x
# with en empty dstkey
assert_error "WRONGTYPE*" {r SDIFFSTORE set3{t} key1{t} noset{t}}
assert_equal 0 [r exists set3{t}]
assert_error "WRONGTYPE*" {r SDIFFSTORE set3{t} noset{t} key1{t}}
assert_equal 0 [r exists set3{t}]
# with a legal dstkey
r sadd set1{t} a b c
r sadd set2{t} b c d
r sadd set3{t} e
assert_error "WRONGTYPE*" {r SDIFFSTORE set3{t} key1{t} set1{t} noset{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
assert_error "WRONGTYPE*" {r SDIFFSTORE set3{t} set1{t} key1{t} set2{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
}
test "SDIFFSTORE should handle non existing key as empty" {
r del set1{t} set2{t} set3{t}
r set setres{t} xxx
assert_equal 0 [r sdiffstore setres{t} foo111{t} bar222{t}]
assert_equal 0 [r exists setres{t}]
# with a legal dstkey, should delete dstkey
r sadd set3{t} a b c
assert_equal 0 [r sdiffstore set3{t} set1{t} set2{t}]
assert_equal 0 [r exists set3{t}]
r sadd set1{t} a b c
assert_equal 3 [r sdiffstore set3{t} set1{t} set2{t}]
assert_equal 1 [r exists set3{t}]
assert_equal {a b c} [lsort [r smembers set3{t}]]
# with a legal dstkey and empty set2, should delete the dstkey
r sadd set3{t} a b c
assert_equal 0 [r sdiffstore set3{t} set2{t} set1{t}]
assert_equal 0 [r exists set3{t}]
}
test "SINTER against non-set should throw error" {
r set key1{t} x
assert_error "WRONGTYPE*" {r sinter key1{t} noset{t}}
}
# different order
assert_error "WRONGTYPE*" {r sinter noset{t} key1{t}}
test "SUNION against non-set should throw error" {
r set key1{t} x
assert_error "WRONGTYPE*" {r sunion key1{t} noset{t}}
r sadd set1{t} a b c
assert_error "WRONGTYPE*" {r sinter key1{t} set1{t}}
# different order
assert_error "WRONGTYPE*" {r sinter set1{t} key1{t}}
}
test "SINTER should handle non existing key as empty" {
@ -305,10 +377,115 @@ start_server {
lsort [r sinter set1{t} set2{t}]
} {1 2 3}
test "SINTERSTORE against non-set should throw error" {
r del set1{t} set2{t} set3{t} key1{t}
r set key1{t} x
# with en empty dstkey
assert_error "WRONGTYPE*" {r sinterstore set3{t} key1{t} noset{t}}
assert_equal 0 [r exists set3{t}]
assert_error "WRONGTYPE*" {r sinterstore set3{t} noset{t} key1{t}}
assert_equal 0 [r exists set3{t}]
# with a legal dstkey
r sadd set1{t} a b c
r sadd set2{t} b c d
r sadd set3{t} e
assert_error "WRONGTYPE*" {r sinterstore set3{t} key1{t} set2{t} noset{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
assert_error "WRONGTYPE*" {r sinterstore set3{t} noset{t} key1{t} set2{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
}
test "SINTERSTORE against non existing keys should delete dstkey" {
r del set1{t} set2{t} set3{t}
r set setres{t} xxx
assert_equal 0 [r sinterstore setres{t} foo111{t} bar222{t}]
assert_equal 0 [r exists setres{t}]
# with a legal dstkey
r sadd set3{t} a b c
assert_equal 0 [r sinterstore set3{t} set1{t} set2{t}]
assert_equal 0 [r exists set3{t}]
r sadd set1{t} a b c
assert_equal 0 [r sinterstore set3{t} set1{t} set2{t}]
assert_equal 0 [r exists set3{t}]
assert_equal 0 [r sinterstore set3{t} set2{t} set1{t}]
assert_equal 0 [r exists set3{t}]
}
test "SUNION against non-set should throw error" {
r set key1{t} x
assert_error "WRONGTYPE*" {r sunion key1{t} noset{t}}
# different order
assert_error "WRONGTYPE*" {r sunion noset{t} key1{t}}
r del set1{t}
r sadd set1{t} a b c
assert_error "WRONGTYPE*" {r sunion key1{t} set1{t}}
# different order
assert_error "WRONGTYPE*" {r sunion set1{t} key1{t}}
}
test "SUNION should handle non existing key as empty" {
r del set1{t} set2{t} set3{t}
r sadd set1{t} a b c
r sadd set2{t} b c d
assert_equal {a b c d} [lsort [r sunion set1{t} set2{t} set3{t}]]
}
test "SUNIONSTORE against non-set should throw error" {
r del set1{t} set2{t} set3{t} key1{t}
r set key1{t} x
# with en empty dstkey
assert_error "WRONGTYPE*" {r sunionstore set3{t} key1{t} noset{t}}
assert_equal 0 [r exists set3{t}]
assert_error "WRONGTYPE*" {r sunionstore set3{t} noset{t} key1{t}}
assert_equal 0 [r exists set3{t}]
# with a legal dstkey
r sadd set1{t} a b c
r sadd set2{t} b c d
r sadd set3{t} e
assert_error "WRONGTYPE*" {r sunionstore set3{t} key1{t} key2{t} noset{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
assert_error "WRONGTYPE*" {r sunionstore set3{t} noset{t} key1{t} key2{t}}
assert_equal 1 [r exists set3{t}]
assert_equal {e} [lsort [r smembers set3{t}]]
}
test "SUNIONSTORE should handle non existing key as empty" {
r del set1{t} set2{t} set3{t}
r set setres{t} xxx
assert_equal 0 [r sunionstore setres{t} foo111{t} bar222{t}]
assert_equal 0 [r exists setres{t}]
# set1 set2 both empty, should delete the dstkey
r sadd set3{t} a b c
assert_equal 0 [r sunionstore set3{t} set1{t} set2{t}]
assert_equal 0 [r exists set3{t}]
r sadd set1{t} a b c
r sadd set3{t} e f
assert_equal 3 [r sunionstore set3{t} set1{t} set2{t}]
assert_equal 1 [r exists set3{t}]
assert_equal {a b c} [lsort [r smembers set3{t}]]
r sadd set3{t} d
assert_equal 3 [r sunionstore set3{t} set2{t} set1{t}]
assert_equal 1 [r exists set3{t}]
assert_equal {a b c} [lsort [r smembers set3{t}]]
}
test "SUNIONSTORE against non existing keys should delete dstkey" {