Fix crash due to delete entry from compress quicklistNode and wrongly split quicklistNode (#11242)

This PR mainly deals with 2 crashes introduced in #9357,
and fix the QUICKLIST-PACKED-THRESHOLD mess in external test mode.

1. Fix crash due to deleting an entry from a compress quicklistNode
   When inserting a large element, we need to create a new quicklistNode first,
   and then delete its previous element, if the node where the deleted element is
   located is compressed, it will cause a crash.
   Now add `dont_compress` to quicklistNode, if we want to use a quicklistNode
   after some operation, we can use this flag like following:

    ```c
    node->dont_compress = 1; /* Prevent to be compressed */
    some_operation(node); /* This operation might try to compress this node */
    some_other_operation(node); /* We can use this node without decompress it */
    node->dont_compress = 0; /* Re-able compression */
    quicklistCompressNode(node);
    ```

   Perhaps in the future, we could just disable the current entry from being
   compressed during the iterator loop, but that would require more work.

2. Fix crash due to wrongly split quicklist
   before #9357, the offset param of _quicklistSplitNode() will not negative.
   For now, when offset is negative, the split extent will be wrong.
   following example:
    ```c
    int orig_start = after ? offset + 1 : 0;
    int orig_extent = after ? -1 : offset;
    int new_start = after ? 0 : offset;
    int new_extent = after ? offset + 1 : -1;
    # offset: -2, after: 1, node->count: 2
    # current wrong range: [-1,-1] [0,-1]
    # correct range: [1,-1] [0, 1]
    ```

   Because only `_quicklistInsert()` splits the quicklistNode and only
   `quicklistInsertAfter()`, `quicklistInsertBefore()` call _quicklistInsert(), 
   so `quicklistReplaceEntry()` and `listTypeInsert()` might occur this crash.
   But the iterator of `listTypeInsert()` is alway from head to tail(iter->offset is
   always positive), so it is not affected.
   The final conclusion is this crash only occur when we insert a large element
   with negative index into a list, that affects `LSET` command and `RM_ListSet`
   module api.
     
3. In external test mode, we need to restore quicklist packed threshold after
   when the end of test.
4. Show `node->count` in quicklistRepr().
5. Add new tcl proc `config_get_set` to support restoring config in tests.
This commit is contained in:
sundb 2022-09-19 14:47:52 +08:00 committed by GitHub
parent 464aa04188
commit 13d25dd95e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 17 deletions

View File

@ -466,7 +466,7 @@ void debugCommand(client *c) {
" default.",
"QUICKLIST-PACKED-THRESHOLD <size>",
" Sets the threshold for elements to be inserted as plain vs packed nodes",
" Default value is 1GB, allows values up to 4GB",
" Default value is 1GB, allows values up to 4GB. Setting to 0 restores to default.",
"SET-SKIP-CHECKSUM-VALIDATION <0|1>",
" Enables or disables checksum checks for RDB files and RESTORE's payload.",
"SLEEP <seconds>",

View File

@ -57,6 +57,8 @@ int quicklistisSetPackedThreshold(size_t sz) {
/* Don't allow threshold to be set above or even slightly below 4GB */
if (sz > (1ull<<32) - (1<<20)) {
return 0;
} else if (sz == 0) { /* 0 means restore threshold */
sz = (1 << 30);
}
packed_threshold = sz;
return 1;
@ -177,6 +179,7 @@ REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
node->encoding = QUICKLIST_NODE_ENCODING_RAW;
node->container = QUICKLIST_NODE_CONTAINER_PACKED;
node->recompress = 0;
node->dont_compress = 0;
return node;
}
@ -212,6 +215,7 @@ REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
node->attempted_compress = 1;
#endif
if (node->dont_compress) return 0;
/* validate that the node is neither
* tail nor head (it has prev and next)*/
@ -748,12 +752,15 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
__quicklistDelNode(quicklist, entry->node);
}
} else {
entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */
quicklistInsertAfter(iter, entry, data, sz);
if (entry->node->count == 1) {
__quicklistDelNode(quicklist, entry->node);
} else {
unsigned char *p = lpSeek(entry->node->entry, -1);
quicklistDelIndex(quicklist, entry->node, &p);
entry->node->dont_compress = 0; /* Re-enable compression */
quicklistCompress(quicklist, entry->node);
quicklistCompress(quicklist, entry->node->next);
}
}
@ -905,6 +912,9 @@ REDIS_STATIC quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset,
/* Copy original listpack so we can split it */
memcpy(new_node->entry, node->entry, zl_sz);
/* Need positive offset for calculating extent below. */
if (offset < 0) offset = node->count + offset;
/* Ranges to be trimmed: -1 here means "continue deleting until the list ends" */
int orig_start = after ? offset + 1 : 0;
int orig_extent = after ? -1 : offset;
@ -1608,10 +1618,11 @@ void quicklistRepr(unsigned char *ql, int full) {
while(node != NULL) {
printf("{quicklist node(%d)\n", i++);
printf("{container : %s, encoding: %s, size: %zu, recompress: %d, attempted_compress: %d}\n",
printf("{container : %s, encoding: %s, size: %zu, count: %d, recompress: %d, attempted_compress: %d}\n",
QL_NODE_IS_PLAIN(node) ? "PLAIN": "PACKED",
(node->encoding == QUICKLIST_NODE_ENCODING_RAW) ? "RAW": "LZF",
node->sz,
node->count,
node->recompress,
node->attempted_compress);

View File

@ -53,7 +53,8 @@ typedef struct quicklistNode {
unsigned int container : 2; /* PLAIN==1 or PACKED==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
unsigned int dont_compress : 1; /* prevent compression of entry that will be used later */
unsigned int extra : 9; /* more bits to steal for future usage */
} quicklistNode;
/* quicklistLZF is a 8+N byte struct holding 'sz' followed by 'compressed'.

View File

@ -928,6 +928,12 @@ proc config_set {param value {options {}}} {
}
}
proc config_get_set {param value {options {}}} {
set config [lindex [r config get $param] 1]
config_set $param $value $options
return $config
}
proc delete_lines_with_pattern {filename tmpfilename pattern} {
set fh_in [open $filename r]
set fh_out [open $tmpfilename w]

View File

@ -66,7 +66,8 @@ start_server [list overrides [list save ""] ] {
assert_equal [r lpop list4] [string repeat c 500]
assert_equal [r lpop list4] [string repeat b 500]
assert_equal [r lpop list4] [string repeat a 500]
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
test {plain node check compression with ltrim} {
r debug quicklist-packed-threshold 1b
@ -75,8 +76,9 @@ start_server [list overrides [list save ""] ] {
r rpush list5 [string repeat c 500]
assert_equal [string repeat b 500] [r lindex list5 1]
r LTRIM list5 1 -1
r llen list5
} {2} {needs:debug}
assert_equal [r llen list5] 2
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
test {plain node check compression using lset} {
r debug quicklist-packed-threshold 1b
@ -86,7 +88,8 @@ start_server [list overrides [list save ""] ] {
r lpush list6 [string repeat c 500]
r LSET list6 0 [string repeat d 500]
assert_equal [string repeat d 500] [r lindex list6 0]
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# revert config for external mode tests.
r config set list-compress-depth 0
@ -115,7 +118,8 @@ start_server [list overrides [list save ""] ] {
r lpush lst bb
r debug reload
assert_equal [r rpop lst] "xxxxxxxxxx"
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# basic command check for plain nodes - "LINDEX & LINSERT"
test {Test LINDEX and LINSERT on plain nodes} {
@ -129,7 +133,8 @@ start_server [list overrides [list save ""] ] {
r linsert lst BEFORE "9" "7"
r linsert lst BEFORE "9" "xxxxxxxxxxx"
assert {[r lindex lst 3] eq "xxxxxxxxxxx"}
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# basic command check for plain nodes - "LTRIM"
test {Test LTRIM on plain nodes} {
@ -140,7 +145,8 @@ start_server [list overrides [list save ""] ] {
r lpush lst1 9
r LTRIM lst1 1 -1
assert_equal [r llen lst1] 2
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# basic command check for plain nodes - "LREM"
test {Test LREM on plain nodes} {
@ -153,7 +159,8 @@ start_server [list overrides [list save ""] ] {
r lpush lst 9
r LREM lst -2 "one"
assert_equal [r llen lst] 2
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# basic command check for plain nodes - "LPOS"
test {Test LPOS on plain nodes} {
@ -164,7 +171,8 @@ start_server [list overrides [list save ""] ] {
r RPUSH lst "cc"
r LSET lst 0 "xxxxxxxxxxx"
assert_equal [r LPOS lst "xxxxxxxxxxx"] 0
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# basic command check for plain nodes - "LMOVE"
test {Test LMOVE on plain nodes} {
@ -183,7 +191,8 @@ start_server [list overrides [list save ""] ] {
assert_equal [r lpop lst2{t}] "cc"
assert_equal [r lpop lst{t}] "dd"
assert_equal [r lpop lst{t}] "xxxxxxxxxxx"
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# testing LSET with combinations of node types
# plain->packed , packed->plain, plain->plain, packed->packed
@ -206,7 +215,8 @@ start_server [list overrides [list save ""] ] {
r lset lst 0 "cc"
set s1 [r lpop lst]
assert_equal $s1 "cc"
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# checking LSET in case ziplist needs to be split
test {Test LSET with packed is split in the middle} {
@ -223,14 +233,15 @@ start_server [list overrides [list save ""] ] {
assert_equal [r lpop lst] [string repeat e 10]
assert_equal [r lpop lst] "dd"
assert_equal [r lpop lst] "ee"
} {} {needs:debug}
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
# repeating "plain check LSET with combinations"
# but now with single item in each ziplist
test {Test LSET with packed consist only one item} {
r flushdb
r config set list-max-ziplist-size 1
set original_config [config_get_set list-max-ziplist-size 1]
r debug quicklist-packed-threshold 1b
r RPUSH lst "aa"
r RPUSH lst "bb"
@ -249,7 +260,46 @@ start_server [list overrides [list save ""] ] {
r lset lst 0 "cc"
set s1 [r lpop lst]
assert_equal $s1 "cc"
} {} {needs:debug}
r debug quicklist-packed-threshold 0
r config set list-max-ziplist-size $original_config
} {OK} {needs:debug}
test {Crash due to delete entry from a compress quicklist node} {
r flushdb
r debug quicklist-packed-threshold 100b
set original_config [config_get_set list-compress-depth 1]
set small_ele [string repeat x 32]
set large_ele [string repeat x 100]
# Push a large element
r RPUSH lst $large_ele
# Insert two elements and keep them in the same node
r RPUSH lst $small_ele
r RPUSH lst $small_ele
# When setting the position of -1 to a large element, we first insert
# a large element at the end and then delete its previous element.
r LSET lst -1 $large_ele
assert_equal "$large_ele $small_ele $large_ele" [r LRANGE lst 0 -1]
r debug quicklist-packed-threshold 0
r config set list-compress-depth $original_config
} {OK} {needs:debug}
test {Crash due to split quicklist node wrongly} {
r flushdb
r debug quicklist-packed-threshold 10b
r LPUSH lst "aa"
r LPUSH lst "bb"
r LSET lst -2 [string repeat x 10]
r RPOP lst
assert_equal [string repeat x 10] [r LRANGE lst 0 -1]
r debug quicklist-packed-threshold 0
} {OK} {needs:debug}
}
run_solo {list-large-memory} {