Optimize __ziplistCascadeUpdate algorithm (#6886)

The previous algorithm is of O(n^2) time complexity.
It would have run through the ziplist entries one by one, each time doing a `realloc` and a
`memmove` (moving the entire tail of the ziplist).

The new algorithm is O(n), it runs over all the records once, computing the size of the `realloc`
needed, then does one `realloc`, and run thought the records again doing many smaller `memmove`s,
each time moving just one record.

So this change reduces many reallocs, and moves each record just once.

Co-authored-by: zhumaohua <zhumaohua@megvii.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
maohuazhu 2020-08-28 09:22:35 -05:00 committed by GitHub
parent c01e94a431
commit ee4a15aae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -614,62 +614,99 @@ unsigned char *ziplistResize(unsigned char *zl, unsigned int len) {
* The pointer "p" points to the first entry that does NOT need to be
* updated, i.e. consecutive fields MAY need an update. */
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;
zlentry cur;
size_t prevlen, prevlensize, prevoffset; /* Informat of the last changed entry. */
size_t firstentrylen; /* Used to handle insert at head. */
size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl));
size_t extra = 0, cnt = 0, offset;
size_t delta = 4; /* Extra bytes needed to update a entry's prevlen (5-1). */
unsigned char *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));
/* Empty ziplist */
if (p[0] == ZIP_END) return zl;
zipEntry(p, &cur);
firstentrylen = prevlen = cur.headersize + cur.len;
prevlensize = zipStorePrevEntryLength(NULL, prevlen);
prevoffset = p - zl;
p += prevlen;
/* Iterate ziplist to find out how many extra bytes do we need to update it. */
while (p[0] != ZIP_END) {
zipEntry(p, &cur);
rawlen = cur.headersize + cur.len;
rawlensize = zipStorePrevEntryLength(NULL,rawlen);
/* Abort if there is no next entry. */
if (p[rawlen] == ZIP_END) break;
zipEntry(p+rawlen, &next);
/* Abort when "prevlen" has not changed. */
if (next.prevrawlen == rawlen) break;
if (cur.prevrawlen == prevlen) break;
if (next.prevrawlensize < rawlensize) {
/* The "prevlen" field of "next" needs more bytes to hold
* the raw length of "cur". */
offset = p-zl;
extra = rawlensize-next.prevrawlensize;
zl = ziplistResize(zl,curlen+extra);
p = zl+offset;
/* Current pointer and offset for next element. */
np = p+rawlen;
noffset = np-zl;
/* Update tail offset when next element is not the tail element. */
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
/* Move the tail to the back. */
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipStorePrevEntryLength(np,rawlen);
/* Advance the cursor */
p += rawlen;
curlen += extra;
} else {
if (next.prevrawlensize > rawlensize) {
/* This would result in shrinking, which we want to avoid.
* So, set "rawlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
/* Abort when entry's "prevlensize" is big enough. */
if (cur.prevrawlensize >= prevlensize) {
if (cur.prevrawlensize == prevlensize) {
zipStorePrevEntryLength(p, prevlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
/* This would result in shrinking, which we want to avoid.
* So, set "prevlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p, prevlen);
}
/* Stop here, as the raw length of "next" has not changed. */
break;
}
/* cur.prevrawlen means cur is the former head entry. */
assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);
/* Update prev entry's info and advance the cursor. */
rawlen = cur.headersize + cur.len;
prevlen = rawlen + delta;
prevlensize = zipStorePrevEntryLength(NULL, prevlen);
prevoffset = p - zl;
p += rawlen;
extra += delta;
cnt++;
}
/* Extra bytes is zero all update has been done(or no need to update). */
if (extra == 0) return zl;
/* Update tail offset after loop. */
if (tail == zl + prevoffset) {
/* When the the last entry we need to update is also the tail, update tail offset
* unless this is the only entry that was updated (so the tail offset didn't change). */
if (extra - delta != 0) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta);
}
} else {
/* Update the tail offset in cases where the last entry we updated is not the tail. */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
/* Now "p" points at the first unchanged byte in original ziplist,
* move data after that to new ziplist. */
offset = p - zl;
zl = ziplistResize(zl, curlen + extra);
p = zl + offset;
memmove(p + extra, p, curlen - offset - 1);
p += extra;
/* Iterate all entries that need to be updated tail to head. */
while (cnt) {
zipEntry(zl + prevoffset, &cur);
rawlen = cur.headersize + cur.len;
/* Move entry to tail and reset prevlen. */
memmove(p - (rawlen - cur.prevrawlensize),
zl + prevoffset + cur.prevrawlensize,
rawlen - cur.prevrawlensize);
p -= (rawlen + delta);
if (cur.prevrawlen == 0) {
/* "cur" is the previous head entry, update its prevlen with firstentrylen. */
zipStorePrevEntryLength(p, firstentrylen);
} else {
/* An entry's prevlen can only increment 4 bytes. */
zipStorePrevEntryLength(p, cur.prevrawlen+delta);
}
/* Foward to previous entry. */
prevoffset -= cur.prevrawlen;
cnt--;
}
return zl;
}
@ -1379,6 +1416,30 @@ static void verify(unsigned char *zl, zlentry *e) {
}
}
static unsigned char *insertHelper(unsigned char *zl, char ch, size_t len, unsigned char *pos) {
assert(len <= ZIP_BIG_PREVLEN);
unsigned char data[ZIP_BIG_PREVLEN] = {0};
memset(data, ch, len);
return ziplistInsert(zl, pos, data, len);
}
static int compareHelper(unsigned char *zl, char ch, size_t len, int index) {
assert(len <= ZIP_BIG_PREVLEN);
unsigned char data[ZIP_BIG_PREVLEN] = {0};
memset(data, ch, len);
unsigned char *p = ziplistIndex(zl, index);
assert(p != NULL);
return ziplistCompare(p, data, len);
}
static size_t strEntryBytesSmall(size_t slen) {
return slen + zipStorePrevEntryLength(NULL, 0) + zipStoreEntryEncoding(NULL, 0, slen);
}
static size_t strEntryBytesLarge(size_t slen) {
return slen + zipStorePrevEntryLength(NULL, ZIP_BIG_PREVLEN) + zipStoreEntryEncoding(NULL, 0, slen);
}
int ziplistTest(int argc, char **argv) {
unsigned char *zl, *p;
unsigned char *entry;
@ -1927,6 +1988,100 @@ int ziplistTest(int argc, char **argv) {
stress(ZIPLIST_TAIL,100000,16384,256);
}
printf("Stress __ziplistCascadeUpdate:\n");
{
char data[ZIP_BIG_PREVLEN];
zl = ziplistNew();
for (int i = 0; i < 100000; i++) {
zl = ziplistPush(zl, (unsigned char*)data, ZIP_BIG_PREVLEN-4, ZIPLIST_TAIL);
}
unsigned long long start = usec();
zl = ziplistPush(zl, (unsigned char*)data, ZIP_BIG_PREVLEN-3, ZIPLIST_HEAD);
printf("Done. usec=%lld\n\n", usec()-start);
zfree(zl);
}
printf("Edge cases of __ziplistCascadeUpdate:\n");
{
/* Inserting a entry with data length greater than ZIP_BIG_PREVLEN-4
* will leads to cascade update. */
size_t s1 = ZIP_BIG_PREVLEN-4, s2 = ZIP_BIG_PREVLEN-3;
zl = ziplistNew();
zlentry e[4] = {{.prevrawlensize = 0, .prevrawlen = 0, .lensize = 0,
.len = 0, .headersize = 0, .encoding = 0, .p = NULL}};
zl = insertHelper(zl, 'a', s1, ZIPLIST_ENTRY_HEAD(zl));
verify(zl, e);
assert(e[0].prevrawlensize == 1 && e[0].prevrawlen == 0);
assert(compareHelper(zl, 'a', s1, 0));
ziplistRepr(zl);
/* No expand. */
zl = insertHelper(zl, 'b', s1, ZIPLIST_ENTRY_HEAD(zl));
verify(zl, e);
assert(e[0].prevrawlensize == 1 && e[0].prevrawlen == 0);
assert(compareHelper(zl, 'b', s1, 0));
assert(e[1].prevrawlensize == 1 && e[1].prevrawlen == strEntryBytesSmall(s1));
assert(compareHelper(zl, 'a', s1, 1));
ziplistRepr(zl);
/* Expand(tail included). */
zl = insertHelper(zl, 'c', s2, ZIPLIST_ENTRY_HEAD(zl));
verify(zl, e);
assert(e[0].prevrawlensize == 1 && e[0].prevrawlen == 0);
assert(compareHelper(zl, 'c', s2, 0));
assert(e[1].prevrawlensize == 5 && e[1].prevrawlen == strEntryBytesSmall(s2));
assert(compareHelper(zl, 'b', s1, 1));
assert(e[2].prevrawlensize == 5 && e[2].prevrawlen == strEntryBytesLarge(s1));
assert(compareHelper(zl, 'a', s1, 2));
ziplistRepr(zl);
/* Expand(only previous head entry). */
zl = insertHelper(zl, 'd', s2, ZIPLIST_ENTRY_HEAD(zl));
verify(zl, e);
assert(e[0].prevrawlensize == 1 && e[0].prevrawlen == 0);
assert(compareHelper(zl, 'd', s2, 0));
assert(e[1].prevrawlensize == 5 && e[1].prevrawlen == strEntryBytesSmall(s2));
assert(compareHelper(zl, 'c', s2, 1));
assert(e[2].prevrawlensize == 5 && e[2].prevrawlen == strEntryBytesLarge(s2));
assert(compareHelper(zl, 'b', s1, 2));
assert(e[3].prevrawlensize == 5 && e[3].prevrawlen == strEntryBytesLarge(s1));
assert(compareHelper(zl, 'a', s1, 3));
ziplistRepr(zl);
/* Delete from mid. */
unsigned char *p = ziplistIndex(zl, 2);
zl = ziplistDelete(zl, &p);
verify(zl, e);
assert(e[0].prevrawlensize == 1 && e[0].prevrawlen == 0);
assert(compareHelper(zl, 'd', s2, 0));
assert(e[1].prevrawlensize == 5 && e[1].prevrawlen == strEntryBytesSmall(s2));
assert(compareHelper(zl, 'c', s2, 1));
assert(e[2].prevrawlensize == 5 && e[2].prevrawlen == strEntryBytesLarge(s2));
assert(compareHelper(zl, 'a', s1, 2));
ziplistRepr(zl);
zfree(zl);
}
return 0;
}
#endif