feat(stream): implement rdb save support for streams

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-18 20:08:09 +03:00
parent 2d9370c6b2
commit da3ae760d5
11 changed files with 196 additions and 26 deletions

View File

@ -33,7 +33,7 @@ uint32_t CommandId::OptCount(uint32_t mask) {
}
CommandRegistry::CommandRegistry() {
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0);
CommandId cd("COMMAND", CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0);
cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });
@ -101,8 +101,6 @@ const char* OptName(CO::CommandOpt fl) {
return "fast";
case LOADING:
return "loading";
case RANDOM:
return "random";
case ADMIN:
return "admin";
case NOSCRIPT:

View File

@ -25,14 +25,14 @@ enum CommandOpt : uint32_t {
LOADING = 8,
DENYOOM = 0x10, // use-memory in redis.
REVERSE_MAPPING = 0x20,
RANDOM = 0x40,
// arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
VARIADIC_KEYS = 0x40,
ADMIN = 0x80, // implies NOSCRIPT,
NOSCRIPT = 0x100,
BLOCKING = 0x200, // implies REVERSE_MAPPING
GLOBAL_TRANS = 0x1000,
// arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
VARIADIC_KEYS = 0x2000,
};
const char* OptName(CommandOpt fl);

View File

@ -740,8 +740,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST | CO::RANDOM, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl)
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del);
}

View File

@ -945,7 +945,7 @@ void HSetFamily::Register(CommandRegistry* registry) {
// TODO: add options support
<< CI{"HRANDFIELD", CO::READONLY, 2, 1, 1, 1}.HFUNC(HRandField)
<< CI{"HSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(HScan)
<< CI{"HSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(HScan)
<< CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
<< CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx)
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen)

View File

@ -12,6 +12,7 @@ extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/rdb.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
@ -236,6 +237,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
return SaveZSetObject(pv.AsRObj());
}
if (obj_type == OBJ_STREAM) {
return SaveStreamObject(pv.AsRObj());
}
LOG(ERROR) << "Not implemented " << obj_type;
return make_error_code(errc::function_not_supported);
}
@ -375,6 +380,83 @@ error_code RdbSerializer::SaveZSetObject(const robj* obj) {
return error_code{};
}
error_code RdbSerializer::SaveStreamObject(const robj* obj) {
/* Store how many listpacks we have inside the radix tree. */
stream* s = (stream*)obj->ptr;
rax* rax = s->rax_tree;
RETURN_ON_ERR(SaveLen(raxSize(rax)));
/* Serialize all the listpacks inside the radix tree as they are,
* when loading back, we'll use the first entry of each listpack
* to insert it back into the radix tree. */
raxIterator ri;
raxStart(&ri, rax);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
uint8_t* lp = (uint8_t*)ri.data;
size_t lp_bytes = lpBytes(lp);
error_code ec = SaveString((uint8_t*)ri.key, ri.key_len);
if (ec) {
raxStop(&ri);
return ec;
}
ec = SaveString(lp, lp_bytes);
if (ec) {
raxStop(&ri);
return ec;
}
}
raxStop(&ri);
/* Save the number of elements inside the stream. We cannot obtain
* this easily later, since our macro nodes should be checked for
* number of items: not a great CPU / space tradeoff. */
RETURN_ON_ERR(SaveLen(s->length));
/* Save the last entry ID. */
RETURN_ON_ERR(SaveLen(s->last_id.ms));
RETURN_ON_ERR(SaveLen(s->last_id.seq));
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
/* Save the number of groups. */
size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
RETURN_ON_ERR(SaveLen(num_cgroups));
if (num_cgroups) {
/* Serialize each consumer group. */
raxStart(&ri, s->cgroups);
raxSeek(&ri, "^", NULL, 0);
auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); });
while (raxNext(&ri)) {
streamCG* cg = (streamCG*)ri.data;
/* Save the group name. */
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
/* Last ID. */
RETURN_ON_ERR(SaveLen(s->last_id.ms));
RETURN_ON_ERR(SaveLen(s->last_id.seq));
/* Save the global PEL. */
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
/* Save the consumers of this group. */
RETURN_ON_ERR(SaveStreamConsumers(cg));
}
}
return error_code{};
}
/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];
@ -424,6 +506,71 @@ error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) {
return ec;
}
error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
/* Number of entries in the PEL. */
RETURN_ON_ERR(SaveLen(raxSize(pel)));
/* Save each entry. */
raxIterator ri;
raxStart(&ri, pel);
raxSeek(&ri, "^", NULL, 0);
auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); });
while (raxNext(&ri)) {
/* We store IDs in raw form as 128 big big endian numbers, like
* they are inside the radix tree key. */
RETURN_ON_ERR(WriteRaw(Bytes{ri.key, sizeof(streamID)}));
if (nacks) {
streamNACK* nack = (streamNACK*)ri.data;
uint8_t buf[8];
absl::little_endian::Store64(buf, nack->delivery_time);
RETURN_ON_ERR(WriteRaw(buf));
RETURN_ON_ERR(SaveLen(nack->delivery_count));
/* We don't save the consumer name: we'll save the pending IDs
* for each consumer in the consumer PEL, and resolve the consumer
* at loading time. */
}
}
return error_code{};
}
error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
/* Number of consumers in this consumer group. */
RETURN_ON_ERR(SaveLen(raxSize(cg->consumers)));
/* Save each consumer. */
raxIterator ri;
raxStart(&ri, cg->consumers);
raxSeek(&ri, "^", NULL, 0);
auto cleanup = absl::MakeCleanup([&] { raxStop(&ri); });
uint8_t buf[8];
while (raxNext(&ri)) {
streamConsumer* consumer = (streamConsumer*)ri.data;
/* Consumer name. */
RETURN_ON_ERR(SaveString(ri.key, ri.key_len));
/* Last seen time. */
absl::little_endian::Store64(buf, consumer->seen_time);
RETURN_ON_ERR(WriteRaw(buf));
/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
* consumer local PEL. */
RETURN_ON_ERR(SaveStreamPEL(consumer->pel, false));
}
return error_code{};
}
// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {

View File

@ -16,6 +16,10 @@ extern "C" {
#include "server/common.h"
#include "server/table.h"
typedef struct rax rax;
typedef struct streamCG streamCG;
namespace dfly {
class EngineShard;
@ -114,9 +118,12 @@ class RdbSerializer {
std::error_code SaveSetObject(const PrimeValue& pv);
std::error_code SaveHSetObject(const robj* obj);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveStreamObject(const robj* obj);
std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg);
::io::Sink* sink_ = nullptr;
AlignedBuffer* aligned_buf_ = nullptr;

View File

@ -115,7 +115,7 @@ TEST_F(RdbTest, LoadSmall6) {
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1)));
}
TEST_F(RdbTest, LoadStream) {
TEST_F(RdbTest, Stream) {
io::FileSource fs = GetSource("redis6_stream.rdb");
RdbLoader loader(service_->script_mgr());
@ -124,6 +124,21 @@ TEST_F(RdbTest, LoadStream) {
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
ASSERT_FALSE(ec) << ec.message();
auto resp = Run({"type", "key:10"});
EXPECT_EQ(resp, "stream");
resp = Run({"xinfo", "groups", "key:0"});
EXPECT_THAT(resp, ArrLen(2));
resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1
EXPECT_THAT(resp, ArrLen(8));
EXPECT_THAT(resp.GetVec(), ElementsAre("name", "g2", "consumers", "0", "pending", "0",
"last-delivered-id", "1655444851523-1"));
resp = Run({"xinfo", "groups", "key:2"});
EXPECT_THAT(resp, ArrLen(0));
Run({"save"});
}
TEST_F(RdbTest, Reload) {
@ -233,7 +248,7 @@ TEST_F(RdbTest, SaveManyDbs) {
Run({"select", "1"});
resp = Run({"scan", "0", "match", "ab*"});
StringVec vec = StrArray(resp.GetVec()[1]);
for (const auto& s: vec) {
for (const auto& s : vec) {
LOG(ERROR) << "Bad key: " << s;
}
}

View File

@ -1131,13 +1131,13 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client)
<< CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)
<< CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info)
<< CI{"HELLO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Hello)
<< CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave)
<< CI{"LATENCY", CO::NOSCRIPT | CO::LOADING | CO::RANDOM | CO::FAST, -2, 0, 0, 0}.HFUNC(
<< CI{"LASTSAVE", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave)
<< CI{"LATENCY", CO::NOSCRIPT | CO::LOADING | CO::FAST, -2, 0, 0, 0}.HFUNC(
Latency)
<< CI{"MEMORY", kMemOpts, -2, 0, 0, 0}.HFUNC(Memory)
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)

View File

@ -1172,10 +1172,10 @@ void SetFamily::Register(CommandRegistry* registry) {
<< CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove)
<< CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem)
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
<< CI{"SPOP", CO::WRITE | CO::RANDOM | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SPOP", CO::WRITE | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
<< CI{"SSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(SScan);
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);
}
uint32_t SetFamily::MaxIntsetEntries() {

View File

@ -255,9 +255,9 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
return s->length;
}
OpResult<vector<GroupInfo>> OpListGroups(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM);
OpResult<vector<GroupInfo>> OpListGroups(DbIndex db_index, string_view key, EngineShard* shard) {
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(db_index, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
@ -719,13 +719,16 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
if (args.size() >= 3) {
string_view key = ArgS(args, 2);
ShardId sid = Shard(key, shard_set->size());
if (sub_cmd == "GROUPS") {
auto cb = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args{shard, t->db_index()};
return OpListGroups(op_args, key);
// We do not use transactional xemantics for xinfo since it's informational command.
auto cb = [&]() {
EngineShard* shard = EngineShard::tlocal();
return OpListGroups(cntx->db_index(), key, shard);
};
OpResult<vector<GroupInfo>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult<vector<GroupInfo>> result = shard_set->Await(sid, std::move(cb));
if (result) {
(*cntx)->StartArray(result->size());
for (const auto& ginfo : *result) {
@ -865,7 +868,7 @@ void StreamFamily::Register(CommandRegistry* registry) {
*registry << CI{"XADD", CO::WRITE | CO::FAST, -5, 1, 1, 1}.HFUNC(XAdd)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, 1}.HFUNC(XDel)
<< CI{"XGROUP", CO::WRITE | CO::DENYOOM, -2, 2, 2, 1}.HFUNC(XGroup)
<< CI{"XINFO", CO::READONLY, -2, 2, 2, 1}.HFUNC(XInfo)
<< CI{"XINFO", CO::READONLY | CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(XInfo)
<< CI{"XLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(XLen)
<< CI{"XRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRange)
<< CI{"XREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(XRevRange)

View File

@ -1895,7 +1895,7 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRange)
<< CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore)
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank)
<< CI{"ZSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(ZScan)
<< CI{"ZSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(ZScan)
<< CI{"ZUNIONSTORE", kUnionMask, -4, 3, 3, 1}.HFUNC(ZUnionStore);
}