feat(streams): support entries_read and lag for XINFO GROUPS (#1952)

entries_read and lag have been added to the output of XINFO GROUPS since Redis 7.0. This patch supports both for Dragonfly. This patch also fixes a bug that incorrectly sets the initial value of entries_read when a consumer group is created.

fixes #1948
This commit is contained in:
Yue Li 2023-09-28 01:35:19 -07:00 committed by GitHub
parent 5c9c9255d2
commit a2457e3410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 61 additions and 5 deletions

View File

@ -138,6 +138,7 @@ typedef struct {
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
#define SCG_INVALID_ENTRIES_READ -1
#define SCG_INVALID_LAG -1
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
@ -181,5 +182,6 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid);
int streamIDEqZero(streamID *id);
int streamRangeHasTombstones(stream *s, streamID *start, streamID *end);
long long streamCGLag(stream *s, streamCG *cg);
#endif

View File

@ -1490,6 +1490,34 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
return SCG_INVALID_ENTRIES_READ;
}
long long streamCGLag(stream *s, streamCG *cg) {
int valid = 0;
long long lag = 0;
if (!s->entries_added) {
/* The lag of a newly-initialized stream is 0. */
lag = 0;
valid = 1;
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
/* No fragmentation ahead means that the group's logical reads counter
* is valid for performing the lag calculation. */
lag = (long long)s->entries_added - cg->entries_read;
valid = 1;
} else {
/* Attempt to retrieve the group's last ID logical read counter. */
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
}
}
if (valid) {
return lag;
}
return SCG_INVALID_LAG;
}
#if ROMAN_ENABLE
/* Replies with a consumer group's current lag, that is the number of messages

View File

@ -157,9 +157,9 @@ TEST_F(RdbTest, Stream) {
EXPECT_THAT(resp, ArrLen(2));
resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1
EXPECT_THAT(resp, ArrLen(8));
EXPECT_THAT(resp.GetVec(), ElementsAre("name", "g2", "consumers", IntArg(0), "pending", IntArg(0),
"last-delivered-id", "1655444851523-1"));
EXPECT_THAT(resp, RespArray(ElementsAre("name", "g2", "consumers", IntArg(0), "pending",
IntArg(0), "last-delivered-id", "1655444851523-1",
"entries-read", IntArg(0), "lag", IntArg(0))));
resp = Run({"xinfo", "groups", "key:2"});
EXPECT_THAT(resp, ArrLen(0));

View File

@ -78,6 +78,8 @@ struct GroupInfo {
size_t consumer_size;
size_t pending_size;
streamID last_id;
int64_t entries_read;
int64_t lag;
};
struct RangeOpts {
@ -864,6 +866,8 @@ OpResult<vector<GroupInfo>> OpListGroups(const DbContext& db_cntx, string_view k
ginfo.consumer_size = raxSize(cg->consumers);
ginfo.pending_size = raxSize(cg->pel);
ginfo.last_id = cg->last_id;
ginfo.entries_read = cg->entries_read;
ginfo.lag = streamCGLag(s, cg);
result.push_back(std::move(ginfo));
}
raxStop(&ri);
@ -884,6 +888,7 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
auto* shard = op_args.shard;
auto& db_slice = shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
int64_t entries_read = SCG_INVALID_ENTRIES_READ;
if (!res_it) {
if (opts.flags & kCreateOptMkstream) {
// MKSTREAM is enabled, so create the stream
@ -913,7 +918,7 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
}
}
streamCG* cg = streamCreateCG(s, opts.gname.data(), opts.gname.size(), &id, 0);
streamCG* cg = streamCreateCG(s, opts.gname.data(), opts.gname.size(), &id, entries_read);
if (cg) {
return OpStatus::OK;
}
@ -1915,7 +1920,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
for (const auto& ginfo : *result) {
string last_id = StreamIdRepr(ginfo.last_id);
(*cntx)->StartCollection(4, RedisReplyBuilder::MAP);
(*cntx)->StartCollection(6, RedisReplyBuilder::MAP);
(*cntx)->SendBulkString("name");
(*cntx)->SendBulkString(ginfo.name);
(*cntx)->SendBulkString("consumers");
@ -1924,6 +1929,18 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(ginfo.pending_size);
(*cntx)->SendBulkString("last-delivered-id");
(*cntx)->SendBulkString(last_id);
(*cntx)->SendBulkString("entries-read");
if (ginfo.entries_read != SCG_INVALID_ENTRIES_READ) {
(*cntx)->SendLong(ginfo.entries_read);
} else {
(*cntx)->SendNull();
}
(*cntx)->SendBulkString("lag");
if (ginfo.lag != SCG_INVALID_LAG) {
(*cntx)->SendLong(ginfo.lag);
} else {
(*cntx)->SendNull();
}
}
return;
}

View File

@ -725,4 +725,13 @@ TEST_F(StreamFamilyTest, XAck) {
resp = Run({"xreadgroup", "group", "cgroup", "consumer", "streams", "foo", "0"});
EXPECT_THAT(resp, ArrLen(0));
}
TEST_F(StreamFamilyTest, XInfo) {
Run({"xgroup", "create", "foo", "cgroup", "0", "mkstream"});
auto resp = Run({"xinfo", "groups", "foo"});
EXPECT_THAT(resp, RespArray(ElementsAre("name", "cgroup", "consumers", IntArg(0), "pending",
IntArg(0), "last-delivered-id", "0-0", "entries-read",
ArgType(RespExpr::NIL), "lag", IntArg(0))));
}
} // namespace dfly