Add ZSET serialization

This commit is contained in:
Roman Gershman 2022-04-06 17:17:33 +03:00
parent abbefd0bc4
commit 92ebb74500
3 changed files with 81 additions and 35 deletions

View File

@ -9,12 +9,14 @@
#include <absl/strings/str_format.h>
extern "C" {
#include "redis/endianconv.h"
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/rdb.h"
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
#include "redis/zset.h"
}
#include "base/logging.h"
@ -130,7 +132,7 @@ uint8_t RdbObjectType(const robj* o) {
break;
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return RDB_TYPE_ZSET_LISTPACK;
return RDB_TYPE_ZSET_ZIPLIST; // we save using the old ziplist encoding.
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return RDB_TYPE_ZSET_2;
break;
@ -213,6 +215,10 @@ error_code RdbSerializer::SaveObject(const robj* o) {
return SaveHSetObject(o);
}
if (o->type == OBJ_ZSET) {
return SaveZSetObject(o);
}
LOG(FATAL) << "Not implemented " << o->type;
return error_code{};
}
@ -268,27 +274,11 @@ error_code RdbSerializer::SaveListObject(const robj* obj) {
lp = decompressed;
}
// listpack, convert to ziplist first.
uint8_t* lpfield = lpFirst(lp);
int64_t entry_len;
uint8_t* entry;
uint8_t buf[32];
uint8_t* zl = ziplistNew();
while (lpfield) {
entry = lpGet(lpfield, &entry_len, buf);
zl = ziplistPush(zl, entry, entry_len, ZIPLIST_TAIL);
lpfield = lpNext(lp, lpfield);
}
size_t ziplen = ziplistBlobLen(zl);
auto cleanup = absl::MakeCleanup([=] {
zfree(zl);
if (decompressed)
zfree(decompressed);
});
RETURN_ON_ERR(SaveString(string_view{reinterpret_cast<char*>(zl), ziplen}));
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
}
node = node->next;
}
@ -339,29 +329,43 @@ error_code RdbSerializer::SaveHSetObject(const robj* obj) {
RETURN_ON_ERR(SaveString(string_view{key, sdslen(key)}));
RETURN_ON_ERR(SaveString(string_view{value, sdslen(value)}));
}
} else if (obj->encoding == OBJ_ENCODING_LISTPACK) {
// convert to ziplist first.
uint8_t* lp = (uint8_t*)obj->ptr;
} else {
CHECK_EQ(unsigned(OBJ_ENCODING_LISTPACK), obj->encoding);
uint8_t* lp = (uint8_t*)obj->ptr;
size_t lplen = lpLength(lp);
CHECK(lplen > 0 && lplen % 2 == 0); // has (key,value) pairs.
uint8_t* lpfield = lpFirst(lp);
uint8_t* zl = ziplistNew();
int64_t entry_len;
uint8_t* entry;
uint8_t buf[32];
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
}
while (lpfield) {
entry = lpGet(lpfield, &entry_len, buf);
zl = ziplistPush(zl, entry, entry_len, ZIPLIST_TAIL);
lpfield = lpNext(lp, lpfield);
return error_code{};
}
error_code RdbSerializer::SaveZSetObject(const robj* obj) {
DCHECK_EQ(OBJ_ZSET, obj->type);
if (obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset* zs = (zset*)obj->ptr;
zskiplist* zsl = zs->zsl;
RETURN_ON_ERR(SaveLen(zsl->length));
/* We save the skiplist elements from the greatest to the smallest
* (that's trivial since the elements are already ordered in the
* skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
zskiplistNode* zn = zsl->tail;
while (zn != NULL) {
RETURN_ON_ERR(SaveString(string_view{zn->ele, sdslen(zn->ele)}));
RETURN_ON_ERR(SaveBinaryDouble(zn->score));
zn = zn->backward;
}
size_t ziplen = ziplistBlobLen(zl);
auto cleanup = absl::MakeCleanup([zl] { zfree(zl); });
RETURN_ON_ERR(SaveString(string_view{reinterpret_cast<char*>(zl), ziplen}));
} else {
LOG(FATAL) << "Unknown jset encoding " << obj->encoding;
CHECK_EQ(obj->encoding, unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding";
uint8_t* lp = (uint8_t*)obj->ptr;
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
}
return error_code{};
@ -383,6 +387,40 @@ error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
return WriteRaw(Bytes{buf, enclen});
}
/* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed.
* We just make sure the integer is always stored in little endian, otherwise
* the value is copied verbatim from memory to disk.
*
* Return -1 on error, the size of the serialized value on success. */
error_code RdbSerializer::SaveBinaryDouble(double val) {
static_assert(sizeof(val) == 8);
uint8_t buf[8];
memcpy(buf, &val, sizeof(buf));
memrev64ifbe(buf);
return WriteRaw(Bytes{buf, sizeof(buf)});
}
error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) {
uint8_t* lpfield = lpFirst(lp);
int64_t entry_len;
uint8_t* entry;
uint8_t buf[32];
uint8_t* zl = ziplistNew();
while (lpfield) {
entry = lpGet(lpfield, &entry_len, buf);
zl = ziplistPush(zl, entry, entry_len, ZIPLIST_TAIL);
lpfield = lpNext(lp, lpfield);
}
size_t ziplen = ziplistBlobLen(zl);
error_code ec = SaveString(string_view{reinterpret_cast<char*>(zl), ziplen});
zfree(zl);
return ec;
}
// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {

View File

@ -50,7 +50,10 @@ class RdbSerializer {
std::error_code SaveListObject(const robj* obj);
std::error_code SaveSetObject(const robj* obj);
std::error_code SaveHSetObject(const robj* obj);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
::io::Sink* sink_ = nullptr;
std::unique_ptr<LZF_HSLOT[]> lzf_;

View File

@ -89,13 +89,18 @@ TEST_F(RdbTest, Save) {
FLAGS_list_max_listpack_size = 1; // limit listpack to a single element.
Run({"set", "string_key", "val"});
Run({"set", "large_key", string(511, 'L')});
Run({"set", "huge_key", string((1 << 17) - 10, 'H')});
Run({"sadd", "set_key1", "val1", "val2"});
Run({"sadd", "intset_key", "1", "2", "3"});
Run({"hset", "small_hset", "field1", "val1", "field2", "val2"});
Run({"hset", "large_hset", "field1", string(510, 'V'), string(120, 'F'), "val2"});
Run({"rpush", "list_key1", "val", "val2"});
Run({"rpush", "list_key2", "head", string(512, 'a'), string(512, 'b'), "tail"});
Run({"rpush", "list_key2", "head", string(511, 'a'), string(500, 'b'), "tail"});
Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
Run({"save"});
}