feat(streams): support LASTID option for XCLAIM command. (#1968)

fixes #1898

here's the spec of last_id option from Redis

 *  * 6. LASTID <id>:
 *      Update the consumer group last ID with the specified ID if the
 *      current last ID is smaller than the provided one.
 *      This is used for replication / AOF, so that when we read from a
 *      consumer group, the XCLAIM that gets propagated to give ownership
 *      to the consumer, is also used in order to update the group current
 *      ID.
This commit is contained in:
Yue Li 2023-10-03 12:50:41 -07:00 committed by GitHub
parent 8b7a43d214
commit 06dc497bf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 1 deletions

View File

@ -944,6 +944,7 @@ OpResult<pair<stream*, streamCG*>> FindGroup(const OpArgs& op_args, string_view
constexpr uint8_t kClaimForce = 1 << 0; constexpr uint8_t kClaimForce = 1 << 0;
constexpr uint8_t kClaimJustID = 1 << 1; constexpr uint8_t kClaimJustID = 1 << 1;
constexpr uint8_t kClaimLastID = 1 << 2;
struct ClaimOpts { struct ClaimOpts {
string_view group; string_view group;
@ -952,6 +953,7 @@ struct ClaimOpts {
int64 delivery_time = -1; int64 delivery_time = -1;
int retry = -1; int retry = -1;
uint8_t flags = 0; uint8_t flags = 0;
streamID last_id;
}; };
struct ClaimInfo { struct ClaimInfo {
@ -1005,6 +1007,13 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
ClaimInfo result; ClaimInfo result;
result.justid = (opts.flags & kClaimJustID); result.justid = (opts.flags & kClaimJustID);
streamID last_id = opts.last_id;
if (opts.flags & kClaimLastID) {
if (streamCompareID(&last_id, &scg->last_id) > 0) {
scg->last_id = last_id;
}
}
for (streamID id : ids) { for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf; std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id); StreamEncodeID(buf.begin(), &id);
@ -1753,8 +1762,14 @@ void ParseXclaimOptions(CmdArgList& args, ClaimOpts& opts, ConnectionContext* cn
} }
continue; continue;
} else if (arg == "LASTID") { } else if (arg == "LASTID") {
opts.flags |= kClaimLastID;
arg = ArgS(args, ++i); arg = ArgS(args, ++i);
// TODO: implement lastID ParsedStreamId parsed_id;
if (ParseID(arg, true, 0, &parsed_id)) {
opts.last_id = parsed_id.val;
} else {
return (*cntx)->SendError(kInvalidStreamId, kSyntaxErrType);
}
continue; continue;
} }
} }

View File

@ -510,6 +510,16 @@ TEST_F(StreamFamilyTest, Xclaim) {
EXPECT_THAT(resp.GetString(), "1-6"); EXPECT_THAT(resp.GetString(), "1-6");
resp = Run({"xpending", "foo", "group", "1-6", "1-6", "1"}); resp = Run({"xpending", "foo", "group", "1-6", "1-6", "1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("1-6", "bob", ArgType(RespExpr::INT64), IntArg(5))); EXPECT_THAT(resp.GetVec(), ElementsAre("1-6", "bob", ArgType(RespExpr::INT64), IntArg(5)));
// test LASTID
Run({"xreadgroup", "group", "group", "bob", "count", "2", "streams", "foo", ">"});
Run({"xclaim", "foo", "group", "alice", "0", "1-6", "LASTID", "1-4"});
resp = Run({"xinfo", "groups", "foo"});
EXPECT_EQ(resp.GetVec()[7], "1-6");
Run({"xclaim", "foo", "group", "bob", "0", "1-6", "LASTID", "1-9"});
resp = Run({"xinfo", "groups", "foo"});
EXPECT_EQ(resp.GetVec()[7], "1-9");
} }
TEST_F(StreamFamilyTest, XTrim) { TEST_F(StreamFamilyTest, XTrim) {