feat: implement GetRange/GetLexRange for bptree. (#1715)

Also, implement LexCount.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-08-19 17:22:19 +03:00 committed by GitHub
parent f4e1e1e1b3
commit 98c6aac4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 248 additions and 9 deletions

View File

@ -69,10 +69,10 @@ template <typename T, typename Policy = BPTreePolicy<T>> class BPTree {
return root_;
}
// NOTE(review): this is a diff hunk rendered without +/- markers. The two signature lines and
// the two return lines below look like old/new pairs (KeyT + `path.Terminal()` being replaced by
// BPTreePath + `path`) — confirm against the real header before relying on this text.
KeyT FromRank(uint32_t rank) const {
/// @brief Builds a path by calling ToRank(rank, &path) and hands the path back to the caller.
/// Presumably the path points at the item with the given rank — verify against ToRank.
BPTreePath FromRank(uint32_t rank) const {
BPTreePath path;
ToRank(rank, &path);
return path.Terminal();
return path;
}
/// @brief Iterates over all items in the range [rank_start, rank_end] by rank.

View File

@ -531,14 +531,134 @@ optional<unsigned> SortedMap::DfImpl::GetRank(sds ele, bool reverse) const {
// Collects (member, score) pairs from score_tree whose score lies inside `range`.
//   range   - score bounds; minex/maxex mark the corresponding bound as exclusive.
//   offset  - number of positions to step over from the starting element before collecting.
//   limit   - maximum number of pairs to return; 0 yields an empty result.
//   reverse - when true the walk starts at the upper bound and moves backwards (Prev),
//             otherwise it starts at the lower bound and moves forwards (Next).
// Returns the collected pairs in traversal order.
SortedMap::ScoredArray SortedMap::DfImpl::GetRange(const zrangespec& range, unsigned offset,
unsigned limit, bool reverse) const {
// NOTE(review): the next two lines look like the old "TBD" stub that this commit removes; the
// diff view shows them without a '-' marker — confirm before treating them as live code.
LOG(FATAL) << "TBD";
return {};
ScoredArray arr;
// Nothing to return when the tree holds no more than `offset` elements or no items were asked for.
if (score_tree->Size() <= offset || limit == 0)
return arr;
// Scratch storage handed to BuilScoredKey; presumably backs the temporary lookup key.
char buf[16];
if (reverse) {
// Lookup key built from the upper bound; LEQ presumably finds the last element <= key.
ScoreSds key = BuilScoredKey(range.max, !range.maxex, buf);
auto path = score_tree->LEQ(key);
if (path.Empty())
return arr;
// With an exclusive upper bound, an element sitting exactly on range.max costs one extra step.
if (range.maxex && range.max == GetObjScore(path.Terminal())) {
++offset;
}
DCHECK_LE(GetObjScore(path.Terminal()), range.max);
// Step backwards `offset` times; running off the front means there is nothing to return.
while (offset--) {
if (!path.Prev())
return arr;
}
// Collect up to `limit` elements, stopping at the first score that violates the lower bound.
while (limit--) {
ScoreSds ele = path.Terminal();
double score = GetObjScore(ele);
if (range.min > score || (range.min == score && range.minex))
break;
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
if (!path.Prev())
break;
}
} else {
// Lookup key built from the lower bound; GEQ presumably finds the first element >= key.
ScoreSds key = BuilScoredKey(range.min, range.minex, buf);
auto path = score_tree->GEQ(key);
if (path.Empty())
return arr;
// Step forwards `offset` times; running off the end means there is nothing to return.
while (offset--) {
if (!path.Next())
return arr;
}
// Collect up to `limit` elements, stopping at the first score that violates the upper bound.
while (limit--) {
ScoreSds ele = path.Terminal();
double score = GetObjScore(ele);
if (range.max < score || (range.max == score && range.maxex))
break;
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
if (!path.Next())
break;
}
}
return arr;
}
// Collects (member, score) pairs from score_tree whose member lies inside the lexicographic
// `range`, comparing members with sdscmp.
//   range   - member bounds; cminstring / cmaxstring are treated as "no lower / upper bound",
//             minex / maxex mark the corresponding bound as exclusive.
//   offset  - number of positions to step over from the starting element before collecting.
//   limit   - maximum number of pairs to return; 0 yields an empty result.
//   reverse - when true the walk starts at the upper bound and moves backwards (Prev),
//             otherwise it starts at the lower bound and moves forwards (Next).
// Returns the collected pairs in traversal order.
SortedMap::ScoredArray SortedMap::DfImpl::GetLexRange(const zlexrangespec& range, unsigned offset,
unsigned limit, bool reverse) const {
// NOTE(review): the next two lines look like the old "TBD" stub that this commit removes; the
// diff view shows them without a '-' marker — confirm before treating them as live code.
LOG(FATAL) << "TBD";
return {};
// Nothing to return when the tree holds no more than `offset` elements or no items were asked for.
if (score_tree->Size() <= offset || limit == 0)
return {};
detail::BPTreePath<ScoreSds> path;
ScoredArray arr;
if (reverse) {
if (range.max != cmaxstring) {
// Tag the bound's pointer with kIgnoreDoubleTag to form the lookup key; presumably this tells
// the tree comparator to compare by member only — TODO confirm.
ScoreSds range_key = (ScoreSds)(uint64_t(range.max) | kIgnoreDoubleTag);
path = score_tree->LEQ(range_key);
if (path.Empty())
return {};
// With an exclusive upper bound, an element equal to range.max costs one extra step.
if (range.maxex && sdscmp((sds)path.Terminal(), range.max) == 0) {
++offset;
}
// Step backwards `offset` times; running off the front means there is nothing to return.
while (offset--) {
if (!path.Prev())
return {};
}
} else {
// No upper bound: start `offset` positions before the last rank. The early return above
// guarantees offset < Size(), so the subtraction does not wrap.
path = score_tree->FromRank(score_tree->Size() - offset - 1);
}
// Collect up to `limit` elements, stopping at the first member that violates the lower bound.
while (limit--) {
ScoreSds ele = path.Terminal();
if (range.min != cminstring) {
int cmp = sdscmp((sds)ele, range.min);
if (cmp < 0 || (cmp == 0 && range.minex))
break;
}
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
if (!path.Prev())
break;
}
} else {
if (range.min != cminstring) {
// Same pointer-tagging trick as in the reverse branch, applied to the lower bound.
ScoreSds range_key = (ScoreSds)(uint64_t(range.min) | kIgnoreDoubleTag);
path = score_tree->GEQ(range_key);
if (path.Empty())
return {};
// With an exclusive lower bound, an element equal to range.min costs one extra step.
if (range.minex && sdscmp((sds)path.Terminal(), range.min) == 0) {
++offset;
}
// Step forwards `offset` times; running off the end means there is nothing to return.
while (offset--) {
if (!path.Next())
return {};
}
} else {
// No lower bound: start directly at rank `offset`.
path = score_tree->FromRank(offset);
}
// Collect up to `limit` elements, stopping at the first member that violates the upper bound.
while (limit--) {
ScoreSds ele = path.Terminal();
if (range.max != cmaxstring) {
int cmp = sdscmp((sds)ele, range.max);
if (cmp > 0 || (cmp == 0 && range.maxex))
break;
}
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
if (!path.Next())
break;
}
}
return arr;
}
uint8_t* SortedMap::DfImpl::ToListPack() const {
@ -660,8 +780,45 @@ size_t SortedMap::DfImpl::Count(const zrangespec& range) const {
}
// Counts the elements of score_tree whose member lies inside the lexicographic `range`.
// The count is derived from two ranks: min_rank (first element inside the range) and
// max_rank (last element inside the range); the result is max_rank - min_rank + 1, or 0 when
// the range is empty. cminstring / cmaxstring are treated as "no lower / upper bound".
size_t SortedMap::DfImpl::LexCount(const zlexrangespec& range) const {
// NOTE(review): the next two lines look like the old "TBD" stub that this commit removes; the
// diff view shows them without a '-' marker — confirm before treating them as live code.
LOG(FATAL) << "TBD";
return 0;
if (score_tree->Size() == 0)
return 0;
uint32_t min_rank = 0;
detail::BPTreePath<ScoreSds> path;
if (range.min != cminstring) {
// Tag the bound's pointer with kIgnoreDoubleTag to form the lookup key; presumably this tells
// the tree comparator to compare by member only — TODO confirm.
ScoreSds range_key = (ScoreSds)(uint64_t(range.min) | kIgnoreDoubleTag);
path = score_tree->GEQ(range_key);
// No element at or after the lower bound: the range is empty.
if (path.Empty())
return 0;
min_rank = path.Rank();
// Exclusive lower bound that matches an existing member: start one rank later.
if (range.minex && sdscmp((sds)path.Terminal(), range.min) == 0) {
++min_rank;
if (min_rank >= score_tree->Size())
return 0;
}
}
// Default upper rank is the last element; kept when there is no upper bound or when GEQ finds
// nothing at or after range.max.
uint32_t max_rank = score_tree->Size() - 1;
if (range.max != cmaxstring) {
ScoreSds range_key = (ScoreSds)(uint64_t(range.max) | kIgnoreDoubleTag);
path = score_tree->GEQ(range_key);
if (!path.Empty()) {
max_rank = path.Rank();
// fix the max rank, if needed.
int cmp = sdscmp((sds)path.Terminal(), range.max);
DCHECK_GE(cmp, 0);
// The found element is outside the range when it is greater than range.max, or equal to it
// with an exclusive bound; step back one rank, guarding against unsigned underflow.
if (cmp > 0 || range.maxex) {
if (max_rank <= min_rank)
return 0;
--max_rank;
}
}
}
return max_rank < min_rank ? 0 : max_rank - min_rank + 1;
}
bool SortedMap::DfImpl::Iterate(unsigned start_rank, unsigned len, bool reverse,

View File

@ -21,7 +21,6 @@ using testing::Pair;
using testing::StrEq;
namespace dfly {
using detail::SortedMap;
class SortedMapTest : public ::testing::Test {
@ -110,6 +109,58 @@ TEST_F(SortedMapTest, InsertPop) {
Pair(StrEq("a97"), 1000)));
}
// Exercises LexCount and GetLexRange on 100 members "a0".."a99" that all share score 1,
// toggling the inclusive/exclusive flags and the bounds of the lexicographic range.
TEST_F(SortedMapTest, LexRanges) {
  SortedMap smap(&mr_);

  unsigned idx = 0;
  while (idx < 100) {
    sds member = sdscatfmt(sdsempty(), "a%u", idx);
    ASSERT_TRUE(smap.Insert(1, member));
    ++idx;
  }

  // Closed interval ["a93", "a96"].
  zlexrangespec spec;
  spec.max = sdsnew("a96");
  spec.min = sdsnew("a93");
  spec.maxex = 0;
  spec.minex = 0;
  EXPECT_EQ(4, smap.LexCount(spec));
  auto items = smap.GetLexRange(spec, 1, 1000, false);
  ASSERT_EQ(3, items.size());
  EXPECT_THAT(items.front(), Pair("a94", 1));

  // Half-open interval ["a93", "a96"), walked in reverse.
  spec.maxex = 1;
  EXPECT_EQ(3, smap.LexCount(spec));
  items = smap.GetLexRange(spec, 1, 1000, true);
  ASSERT_EQ(2, items.size());
  EXPECT_THAT(items.front(), Pair("a94", 1));

  // Open interval ("a93", "a96").
  spec.minex = 1;
  EXPECT_EQ(2, smap.LexCount(spec));
  items = smap.GetLexRange(spec, 1, 1000, false);
  ASSERT_EQ(1, items.size());
  EXPECT_THAT(items.front(), Pair("a95", 1));

  // Both bounds alias the same string "a96"; the upper bound stays exclusive.
  sdsfree(spec.min);
  spec.min = spec.max;
  EXPECT_EQ(0, smap.LexCount(spec));
  spec.minex = 0;
  EXPECT_EQ(0, smap.LexCount(spec));
  sdsfree(spec.max);

  // Unbounded minimum with an inclusive maximum.
  spec.maxex = 0;
  spec.min = cminstring;
  spec.max = sdsnew("a");
  EXPECT_EQ(0, smap.LexCount(spec));
  sdsfree(spec.max);

  spec.max = sdsnew("a0");
  EXPECT_EQ(1, smap.LexCount(spec));
  spec.maxex = 1;
  EXPECT_EQ(0, smap.LexCount(spec));
  sdsfree(spec.max);
}
TEST_F(SortedMapTest, ScoreRanges) {
SortedMap sm(&mr_);
@ -133,17 +184,29 @@ TEST_F(SortedMapTest, ScoreRanges) {
range.maxex = 0;
range.minex = 0;
EXPECT_EQ(20, sm.Count(range));
detail::SortedMap::ScoredArray array = sm.GetRange(range, 0, 1000, false);
ASSERT_EQ(20, array.size());
EXPECT_THAT(array.front(), Pair("a0", 1));
EXPECT_THAT(array.back(), Pair("b9", 2));
range.minex = 1; // exclude all the "1" scores.
EXPECT_EQ(10, sm.Count(range));
array = sm.GetRange(range, 2, 1, false);
ASSERT_EQ(1, array.size());
EXPECT_THAT(array.front(), Pair("b2", 2));
range.max = 1;
range.minex = 0;
range.min = -HUGE_VAL;
EXPECT_EQ(10, sm.Count(range));
array = sm.GetRange(range, 2, 2, true);
ASSERT_EQ(2, array.size());
EXPECT_THAT(array.back(), Pair("a6", 1));
range.maxex = 1;
EXPECT_EQ(0, sm.Count(range));
array = sm.GetRange(range, 0, 2, true);
ASSERT_EQ(0, array.size());
}
} // namespace dfly

View File

@ -106,6 +106,15 @@ TEST_F(ZSetFamilyTest, ZRangeRank) {
EXPECT_THAT(Run({"zrank", "y", "c"}), ArgType(RespExpr::NIL));
}
// Adds 129 members ("element:<i>" with score i) to "key" and checks that range queries with an
// exclusive upper bound on the smallest score / member return an empty array.
TEST_F(ZSetFamilyTest, LargeSet) {
  int idx = 0;
  while (idx < 129) {
    auto reply = Run({"zadd", "key", absl::StrCat(idx), absl::StrCat("element:", idx)});
    EXPECT_THAT(reply, IntArg(1)) << idx;
    ++idx;
  }

  auto by_score = Run({"zrangebyscore", "key", "(-inf", "(0.0"});
  EXPECT_THAT(by_score, ArrLen(0));

  auto by_lex = Run({"zrangebylex", "key", "-", "(element:0"});
  EXPECT_THAT(by_lex, ArrLen(0));
}
TEST_F(ZSetFamilyTest, ZRemRangeRank) {
Run({"zadd", "x", "1.1", "a", "2.1", "b"});
EXPECT_THAT(Run({"ZREMRANGEBYRANK", "y", "0", "1"}), IntArg(0));
@ -645,6 +654,16 @@ TEST_F(ZSetFamilyTest, ZDiff) {
EXPECT_THAT(resp.GetVec(), ElementsAre("two", "2", "three", "3", "four", "4"));
}
// Adds 129 members ("element:<i>" with score i) to "key" and checks that both the score-based
// and the lexicographic count over the full range report all of them.
TEST_F(ZSetFamilyTest, Count) {
  int idx = 0;
  while (idx < 129) {
    auto reply = Run({"zadd", "key", absl::StrCat(idx), absl::StrCat("element:", idx)});
    EXPECT_THAT(reply, IntArg(1)) << idx;
    ++idx;
  }

  auto by_score = CheckedInt({"zcount", "key", "-inf", "+inf"});
  EXPECT_THAT(by_score, 129);

  auto by_lex = CheckedInt({"zlexcount", "key", "-", "+"});
  EXPECT_THAT(by_lex, 129);
}
TEST_F(ZSetFamilyTest, GeoAdd) {
EXPECT_EQ(2, CheckedInt({"geoadd", "Sicily", "13.361389", "38.115556", "Palermo", "15.087269",
"37.502669", "Catania"}));