feat(stream): add support for xclaim command (#1372)

* feat(stream): add support for xclaim command

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* add retrycount test

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

---------

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>
This commit is contained in:
Abhradeep Chakraborty 2023-09-07 17:05:28 +05:30 committed by GitHub
parent 30b8bcad4e
commit 16c9502d02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 329 additions and 0 deletions

View File

@ -159,6 +159,7 @@ streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
int streamEntryExists(stream *s, streamID *id);
void streamFreeNACK(streamNACK *na);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);

View File

@ -920,6 +920,143 @@ OpResult<pair<stream*, streamCG*>> FindGroup(const OpArgs& op_args, string_view
return res;
}
constexpr uint8_t kClaimForce = 1 << 0;
constexpr uint8_t kClaimJustID = 1 << 1;
struct ClaimOpts {
string_view group;
string_view consumer;
int64 min_idle_time;
int64 delivery_time = -1;
int retry = -1;
uint8_t flags = 0;
};
struct ClaimInfo {
bool justid = false;
vector<streamID> ids;
RecordVec records;
};
void AppendClaimResultItem(ClaimInfo& result, stream* s, streamID id) {
int64_t numfields;
if (result.justid) {
result.ids.push_back(id);
return;
}
streamIterator it;
streamID cid;
streamIteratorStart(&it, s, &id, &id, 0);
while (streamIteratorGetID(&it, &cid, &numfields)) {
Record rec;
rec.id = cid;
rec.kv_arr.reserve(numfields);
/* Emit the field-value pairs. */
while (numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&it, &key, &value, &key_len, &value_len);
string skey(reinterpret_cast<char*>(key), key_len);
string sval(reinterpret_cast<char*>(value), value_len);
rec.kv_arr.emplace_back(std::move(skey), std::move(sval));
}
result.records.push_back(std::move(rec));
}
streamIteratorStop(&it);
}
// XCLAIM key group consumer min-idle-time id
OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts,
absl::Span<streamID> ids) {
OpResult<pair<stream*, streamCG*>> cgr_res = FindGroup(op_args, key, opts.group);
if (!cgr_res)
return cgr_res.status();
stream* s = cgr_res->first;
streamCG* scg = cgr_res->second;
if (!scg) {
return OpStatus::SKIPPED;
}
streamConsumer* consumer = nullptr;
auto now = GetCurrentTimeMs();
ClaimInfo result;
result.justid = (opts.flags & kClaimJustID);
for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id);
streamNACK* nack = (streamNACK*)raxFind(scg->pel, buf.begin(), sizeof(buf));
if (!streamEntryExists(s, &id)) {
if (nack != raxNotFound) {
/* Release the NACK */
raxRemove(scg->pel, buf.begin(), sizeof(buf), nullptr);
raxRemove(nack->consumer->pel, buf.begin(), sizeof(buf), nullptr);
streamFreeNACK(nack);
}
continue;
}
// We didn't find a nack but the FORCE option is given.
// Create the NACK forcefully.
if ((opts.flags & kClaimForce) && nack == raxNotFound) {
/* Create the NACK. */
nack = streamCreateNACK(nullptr);
raxInsert(scg->pel, buf.begin(), sizeof(buf), nack, nullptr);
}
// We found the nack, continue.
if (nack != raxNotFound) {
// First check if the entry id exceeds the `min_idle_time`.
if (nack->consumer && opts.min_idle_time) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < opts.min_idle_time) {
continue;
}
}
// Try to get the consumer. If not found, create a new one.
op_args.shard->tmp_str1 =
sdscpylen(op_args.shard->tmp_str1, opts.consumer.data(), opts.consumer.size());
if ((consumer = streamLookupConsumer(scg, op_args.shard->tmp_str1, SLC_NO_REFRESH)) ==
nullptr) {
consumer = streamCreateConsumer(scg, op_args.shard->tmp_str1, nullptr, 0,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
// If the entry belongs to the same consumer, we don't have to
// do anything. Else remove the entry from the old consumer.
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
* NACK above because of the FORCE option. */
if (nack->consumer) {
raxRemove(nack->consumer->pel, buf.begin(), sizeof(buf), nullptr);
}
}
// Set the delivery time for the entry.
nack->delivery_time = opts.delivery_time;
/* Set the delivery attempts counter if given, otherwise
* autoincrement unless JUSTID option provided */
if (opts.retry >= 0) {
nack->delivery_count = opts.retry;
} else if (!(opts.flags & kClaimJustID)) {
nack->delivery_count++;
}
if (nack->consumer != consumer) {
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr);
nack->consumer = consumer;
}
/* Send the reply for this entry. */
AppendClaimResultItem(result, s, id);
}
}
return result;
}
// XGROUP DESTROY key groupname
OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) {
OpResult<pair<stream*, streamCG*>> cgr_res = FindGroup(op_args, key, gname);
@ -1518,6 +1655,119 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(add_result.status());
}
absl::InlinedVector<streamID, 8> GetXclaimIds(CmdArgList& args) {
size_t i;
absl::InlinedVector<streamID, 8> ids;
for (i = 0; i < args.size(); ++i) {
ParsedStreamId parsed_id;
string_view str_id = ArgS(args, i);
if (!ParseID(str_id, true, 0, &parsed_id)) {
if (i > 0) {
break;
}
return ids;
}
ids.push_back(parsed_id.val);
}
args.remove_prefix(i);
return ids;
}
void ParseXclaimOptions(CmdArgList& args, ClaimOpts& opts, ConnectionContext* cntx) {
for (size_t i = 0; i < args.size(); ++i) {
ToUpper(&args[i]);
string_view arg = ArgS(args, i);
bool remaining_args = args.size() - i - 1 > 0;
if (remaining_args) {
if (arg == "IDLE") {
arg = ArgS(args, ++i);
if (!absl::SimpleAtoi(arg, &opts.delivery_time)) {
return (*cntx)->SendError(kInvalidIntErr);
}
continue;
} else if (arg == "TIME") {
arg = ArgS(args, ++i);
if (!absl::SimpleAtoi(arg, &opts.delivery_time)) {
return (*cntx)->SendError(kInvalidIntErr);
}
continue;
} else if (arg == "RETRYCOUNT") {
arg = ArgS(args, ++i);
if (!absl::SimpleAtoi(arg, &opts.retry)) {
return (*cntx)->SendError(kInvalidIntErr);
}
continue;
} else if (arg == "LASTID") {
arg = ArgS(args, ++i);
// TODO: implement lastID
continue;
}
}
if (arg == "FORCE") {
opts.flags |= kClaimForce;
} else if (arg == "JUSTID") {
opts.flags |= kClaimJustID;
} else {
return (*cntx)->SendError("Unknown argument given for XCLAIM command", kSyntaxErr);
}
}
}
void StreamFamily::XClaim(CmdArgList args, ConnectionContext* cntx) {
ClaimOpts opts;
string_view key = ArgS(args, 0);
opts.group = ArgS(args, 1);
opts.consumer = ArgS(args, 2);
if (!absl::SimpleAtoi(ArgS(args, 3), &opts.min_idle_time)) {
return (*cntx)->SendError(kSyntaxErr);
}
// Ignore negative min-idle-time
opts.min_idle_time = std::max(opts.min_idle_time, static_cast<int64>(0));
args.remove_prefix(4);
auto ids = GetXclaimIds(args);
if (ids.empty()) {
// No ids given.
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
// parse the options
ParseXclaimOptions(args, opts, cntx);
if (auto now = GetCurrentTimeMs();
opts.delivery_time < 0 || static_cast<uint64_t>(opts.delivery_time) > now)
opts.delivery_time = now;
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpClaim(t->GetOpArgs(shard), key, opts, absl::Span{ids.data(), ids.size()});
};
OpResult<ClaimInfo> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (!result) {
(*cntx)->SendError(result.status());
return;
}
ClaimInfo cresult = result.value();
if (cresult.justid) {
(*cntx)->StartArray(cresult.ids.size());
for (auto id : cresult.ids) {
(*cntx)->SendBulkString(StreamIdRepr(id));
}
} else {
const RecordVec& crec = cresult.records;
(*cntx)->StartArray(crec.size());
for (const auto& item : crec) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(StreamIdRepr(item.id));
(*cntx)->StartArray(item.kv_arr.size() * 2);
for (const auto& [k, v] : item.kv_arr) {
(*cntx)->SendBulkString(k);
(*cntx)->SendBulkString(v);
}
}
}
}
void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
@ -2274,6 +2524,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext
namespace acl {
constexpr uint32_t kXAdd = WRITE | STREAM | FAST;
constexpr uint32_t kXClaim = WRITE | FAST;
constexpr uint32_t kXDel = WRITE | STREAM | FAST;
constexpr uint32_t kXGroup = SLOW;
constexpr uint32_t kXInfo = SLOW;
@ -2293,6 +2544,7 @@ void StreamFamily::Register(CommandRegistry* registry) {
*registry
<< CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, 1, acl::kXDel}.HFUNC(XDel)
<< CI{"XGROUP", CO::WRITE | CO::DENYOOM, -3, 2, 2, 1, acl::kXGroup}.HFUNC(XGroup)
<< CI{"XINFO", CO::READONLY | CO::NOSCRIPT, -2, 0, 0, 0, acl::kXInfo}.HFUNC(XInfo)

View File

@ -17,6 +17,7 @@ class StreamFamily {
private:
static void XAdd(CmdArgList args, ConnectionContext* cntx);
static void XClaim(CmdArgList args, ConnectionContext* cntx);
static void XDel(CmdArgList args, ConnectionContext* cntx);
static void XGroup(CmdArgList args, ConnectionContext* cntx);
static void XInfo(CmdArgList args, ConnectionContext* cntx);

View File

@ -416,6 +416,81 @@ TEST_F(StreamFamilyTest, XGroupConsumer) {
EXPECT_THAT(resp, ErrArg("NOGROUP"));
}
TEST_F(StreamFamilyTest, Xclaim) {
Run({"xadd", "foo", "1-0", "k1", "v1"});
Run({"xadd", "foo", "1-1", "k2", "v2"});
Run({"xadd", "foo", "1-2", "k3", "v3"});
Run({"xadd", "foo", "1-3", "k4", "v4"});
// create a group for foo stream
Run({"xgroup", "create", "foo", "group", "0"});
// alice consume all the stream entries
Run({"xreadgroup", "group", "group", "alice", "streams", "foo", ">"});
// bob claims alice's two pending stream entries
auto resp = Run({"xclaim", "foo", "group", "bob", "0", "1-2", "1-3"});
EXPECT_THAT(resp, RespArray(ElementsAre(
RespArray(ElementsAre("1-2", RespArray(ElementsAre("k3", "v3")))),
RespArray(ElementsAre("1-3", RespArray(ElementsAre("k4", "v4")))))));
// bob really have these claimed entries
resp = Run({"xreadgroup", "group", "group", "bob", "streams", "foo", "0"});
EXPECT_THAT(resp,
RespArray(ElementsAre(
"foo", RespArray(ElementsAre(
RespArray(ElementsAre("1-2", RespArray(ElementsAre("k3", "v3")))),
RespArray(ElementsAre("1-3", RespArray(ElementsAre("k4", "v4")))))))));
// alice no longer have those entries
resp = Run({"xreadgroup", "group", "group", "alice", "streams", "foo", "0"});
EXPECT_THAT(resp,
RespArray(ElementsAre(
"foo", RespArray(ElementsAre(
RespArray(ElementsAre("1-0", RespArray(ElementsAre("k1", "v1")))),
RespArray(ElementsAre("1-1", RespArray(ElementsAre("k2", "v2")))))))));
// xclaim ensures that entries before the min-idle-time are not claimed by bob
resp = Run({"xclaim", "foo", "group", "bob", "3600000", "1-0"});
EXPECT_THAT(resp, ArrLen(0));
resp = Run({"xreadgroup", "group", "group", "alice", "streams", "foo", "0"});
EXPECT_THAT(resp,
RespArray(ElementsAre(
"foo", RespArray(ElementsAre(
RespArray(ElementsAre("1-0", RespArray(ElementsAre("k1", "v1")))),
RespArray(ElementsAre("1-1", RespArray(ElementsAre("k2", "v2")))))))));
Run({"xadd", "foo", "1-4", "k5", "v5"});
Run({"xreadgroup", "group", "group", "alice", "streams", "foo", ">"});
// xclaim returns only claimed ids when justid is set
resp = Run({"xclaim", "foo", "group", "bob", "0", "1-0", "1-4", "justid"});
EXPECT_THAT(resp.GetVec(), ElementsAre("1-0", "1-4"));
Run({"xadd", "foo", "1-5", "k6", "v6"});
// bob should claim the id forcefully even if it is not yet present in group pel
resp = Run({"xclaim", "foo", "group", "bob", "0", "1-5", "force", "justid"});
EXPECT_THAT(resp.GetString(), "1-5");
resp = Run({"xreadgroup", "group", "group", "bob", "streams", "foo", "0"});
EXPECT_THAT(resp.GetVec()[1].GetVec()[4].GetVec(),
ElementsAre("1-5", RespArray(ElementsAre("k6", "v6"))));
TEST_current_time_ms += 2000;
resp = Run({"xclaim", "foo", "group", "alice", "0", "1-4", "TIME",
absl::StrCat(TEST_current_time_ms - 500), "justid"});
EXPECT_THAT(resp.GetString(), "1-4");
// min idle time is exceeded for this entry
resp = Run({"xclaim", "foo", "group", "bob", "600", "1-4"});
EXPECT_THAT(resp, ArrLen(0));
resp = Run({"xclaim", "foo", "group", "bob", "400", "1-4", "justid"});
EXPECT_THAT(resp.GetString(), "1-4");
// test RETRYCOUNT
Run({"xadd", "foo", "1-6", "k7", "v7"});
resp = Run({"xclaim", "foo", "group", "bob", "0", "1-6", "force", "justid", "retrycount", "5"});
EXPECT_THAT(resp.GetString(), "1-6");
resp = Run({"xpending", "foo", "group", "1-6", "1-6", "1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("1-6", "bob", ArgType(RespExpr::INT64), IntArg(5)));
}
TEST_F(StreamFamilyTest, XTrim) {
Run({"xadd", "foo", "1-*", "k", "v"});
Run({"xadd", "foo", "1-*", "k", "v"});