feat: implement DeleteRange functions in DfImpl (#1716)

Remove DeleteRange function from bptree_set API and
implement them externally inside sorted_map.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-08-21 13:53:55 +03:00 committed by GitHub
parent afb39287ce
commit eb40ff8ed9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 200 additions and 232 deletions

View File

@ -87,21 +87,6 @@ template <typename T, typename Policy = BPTreePolicy<T>> class BPTree {
/// @param cb
bool IterateReverse(uint32_t rank_start, uint32_t rank_end, std::function<bool(KeyT)> cb) const;
/// @brief Deletes all items in the range [start, end] by rank.
/// @param start
/// @param end - inclusive. must be >= start.
/// @param cb - callback to be called for each deleted item.
/// @return number of deleted items.
size_t DeleteRangeByRank(uint32_t start, uint32_t end, std::function<void(KeyT)> cb);
/// @brief Deletes items in [first, last] range.
/// @param first
/// @param last
/// @param cb
/// @return
size_t DeleteRange(KeyT first, KeyT last, std::function<void(KeyT)> cb, bool inclusive_first,
bool inclusive_last);
/// @brief Returns the path to the first item in the tree that is greater or equal to key.
/// @param item
/// @return the path if such item exists, empty path otherwise.
@ -113,10 +98,13 @@ template <typename T, typename Policy = BPTreePolicy<T>> class BPTree {
/// @return the path if such item exists, empty path otherwise.
detail::BPTreePath<T> LEQ(KeyT key) const;
/// @brief Deletes the element pointed by path.
/// @param path
void Delete(BPTreePath path);
private:
BPTreeNode* CreateNode(bool leaf);
void Delete(BPTreePath path);
void DestroyNode(BPTreeNode* node);
void InsertToFullLeaf(KeyT item, const BPTreePath& path);
@ -409,7 +397,10 @@ void BPTree<T, Policy>::IncreaseSubtreeCounts(const BPTreePath& path, unsigned d
template <typename T, typename Policy>
bool BPTree<T, Policy>::Iterate(uint32_t rank_start, uint32_t rank_end,
std::function<bool(KeyT)> cb) const {
assert(rank_start <= rank_end && rank_end < count_);
if (rank_start >= Size())
return true;
assert(rank_start <= rank_end);
BPTreePath path;
ToRank(rank_start, &path);
@ -417,7 +408,8 @@ bool BPTree<T, Policy>::Iterate(uint32_t rank_start, uint32_t rank_end,
if (!cb(path.Terminal()))
return false;
path.Next();
if (!path.Next())
return true;
}
return true;
}
@ -466,74 +458,11 @@ void BPTree<T, Policy>::ToRank(uint32_t rank, BPTreePath* path) const {
path->Push(node, rank);
}
template <typename T, typename Policy>
size_t BPTree<T, Policy>::DeleteRangeByRank(uint32_t start, uint32_t end,
std::function<void(KeyT)> cb) {
assert(start <= end && end < count_);
BPTreePath path;
size_t deleted = 0;
for (uint32_t i = start; i <= end; ++i) {
/* Ideally, we would want to advance path to the next item and delete the previous one.
* However, we can not do that because the path is invalidated after the
* deletion. So we have to recreate the path for each item using the same rank.
* Note, it is probably could be improved, but it's much more complicated.
*/
ToRank(start, &path);
cb(path.Terminal());
Delete(path);
path.Clear();
++deleted;
}
return deleted;
}
template <typename T, typename Policy>
size_t BPTree<T, Policy>::DeleteRange(KeyT first, KeyT last, std::function<void(KeyT)> cb,
bool inclusive_first, bool inclusive_last) {
using Comp = typename Policy::KeyCompareTo;
if (!root_)
return 0;
BPTreePath path;
bool res = Locate(first, &path);
Comp comp;
if (res && !inclusive_first) {
if (!path.Next())
return 0;
}
if (!path.HasValidTerminal())
return 0;
// Set the low bound for deletion.
first = path.Terminal();
int last_bound = inclusive_last ? 0 : -1;
size_t deleted = 0;
while (comp(path.Terminal(), last) <= last_bound) {
cb(path.Terminal());
++deleted;
Delete(path);
if (!root_)
break;
path.Clear();
Locate(first, &path);
if (!path.HasValidTerminal())
break;
}
return deleted;
}
template <typename T, typename Policy>
detail::BPTreePath<T> BPTree<T, Policy>::GEQ(KeyT item) const {
BPTreePath path;
Locate(item, &path);
if (path.Last().second >= path.Last().first->NumItems())
if (!Locate(item, &path) && path.Last().second >= path.Last().first->NumItems())
path.Clear();
return path;

View File

@ -301,71 +301,6 @@ TEST_F(BPTreeSetTest, Ranges) {
EXPECT_TRUE(path.Empty());
}
TEST_F(BPTreeSetTest, DeleteRangeRank) {
FillTree(2);
unsigned cnt = 0;
unsigned from = 5950; //
unsigned to = 6513;
bptree_.DeleteRangeByRank(from, to, [&](uint64_t val) {
ASSERT_TRUE(Validate()) << val;
ASSERT_EQ((from + cnt) * 2, val) << from << " " << to << " " << cnt;
++cnt;
});
ASSERT_EQ(to - from + 1, cnt);
return;
for (unsigned j = 0; j < 10; ++j) {
if (bptree_.Size() == 0)
break;
cnt = 0;
from = generator_() % bptree_.Size();
to = from + generator_() % (bptree_.Size() - from);
bptree_.DeleteRangeByRank(from, to, [&](uint64_t val) {
ASSERT_EQ((from + cnt) * 2, val) << from << " " << to << " " << cnt;
++cnt;
});
ASSERT_EQ(to - from + 1, cnt);
}
}
TEST_F(BPTreeSetTest, DeleteRange) {
FillTree(2);
unsigned cnt = 0;
EXPECT_EQ(0, bptree_.DeleteRange(
14000, 14000, [&](uint64_t val) { ++cnt; }, true, true));
EXPECT_EQ(0, cnt);
EXPECT_EQ(0, bptree_.DeleteRange(
13999, 14000, [&](uint64_t val) { ++cnt; }, true, true));
EXPECT_EQ(0, cnt);
EXPECT_EQ(0, bptree_.DeleteRange(
13998, 14000, [&](uint64_t val) { ++cnt; }, false, true));
EXPECT_EQ(1, bptree_.DeleteRange(
13998, 13999, [&](uint64_t val) { ++cnt; }, true, true));
EXPECT_EQ(1, cnt);
EXPECT_EQ(2, bptree_.DeleteRange(
13993, 13997, [&](uint64_t val) { ++cnt; }, true, true));
EXPECT_EQ(3, cnt);
ASSERT_TRUE(Validate());
constexpr unsigned kMaxElem = kNumElems * 2;
for (unsigned j = 0; j < 10; ++j) {
unsigned from = generator_() % kMaxElem;
unsigned to = from + generator_() % (kMaxElem - from);
bptree_.DeleteRange(
from, to,
[&](uint64_t val) {
ASSERT_LT(val, to);
ASSERT_GT(val, from);
},
false, false);
}
}
TEST_F(BPTreeSetTest, MemoryUsage) {
zskiplist* zsl = zslCreate();
std::vector<sds> sds_vec;

View File

@ -21,6 +21,9 @@ using namespace std;
ABSL_FLAG(bool, use_zset_tree, false, "If true use b+tree for zset implementation");
extern "C" unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele,
double score);
namespace dfly {
namespace detail {
@ -41,25 +44,6 @@ size_t DictMallocSize(dict* d) {
return res + dictSize(d) * 16; // approximation.
}
unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) {
unsigned char* sptr;
char scorebuf[128];
int scorelen;
scorelen = d2string(scorebuf, sizeof(scorebuf), score);
if (eptr == NULL) {
zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele));
zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr);
/* Insert score after the member. */
zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL);
}
return zl;
}
inline zskiplistNode* Next(bool reverse, zskiplistNode* ln) {
return reverse ? ln->backward : ln->level[0].forward;
}
@ -82,7 +66,7 @@ void SetObjScore(void* obj, double score) {
// buf must be at least 10 chars long.
// Builds a tagged key that can be used for querying open/closed bounds.
void* BuilScoredKey(double score, bool is_str_inf, char buf[]) {
void* BuildScoredKey(double score, bool is_str_inf, char buf[]) {
buf[0] = SDS_TYPE_5; // length 0.
buf[1] = 0;
absl::little_endian::Store64(buf + 2, absl::bit_cast<uint64_t>(score));
@ -314,12 +298,14 @@ SortedMap::ScoredArray SortedMap::RdImpl::PopTopScores(unsigned count, bool reve
ScoredArray result;
while (ln && count--) {
result.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score);
sds ele = ln->ele;
result.emplace_back(string{ele, sdslen(ele)}, ln->score);
// Switch to next before deleting the element.
ln = Next(reverse, ln);
/* we can delete the element now */
CHECK(Delete(ln->ele));
ln = Next(reverse, ln);
CHECK(Delete(ele));
}
return result;
}
@ -537,7 +523,7 @@ SortedMap::ScoredArray SortedMap::DfImpl::GetRange(const zrangespec& range, unsi
char buf[16];
if (reverse) {
ScoreSds key = BuilScoredKey(range.max, !range.maxex, buf);
ScoreSds key = BuildScoredKey(range.max, !range.maxex, buf);
auto path = score_tree->LEQ(key);
if (path.Empty())
return arr;
@ -563,7 +549,7 @@ SortedMap::ScoredArray SortedMap::DfImpl::GetRange(const zrangespec& range, unsi
break;
}
} else {
ScoreSds key = BuilScoredKey(range.min, range.minex, buf);
ScoreSds key = BuildScoredKey(range.min, range.minex, buf);
auto path = score_tree->GEQ(key);
if (path.Empty())
return arr;
@ -662,8 +648,14 @@ SortedMap::ScoredArray SortedMap::DfImpl::GetLexRange(const zlexrangespec& range
}
uint8_t* SortedMap::DfImpl::ToListPack() const {
LOG(FATAL) << "TBD";
return nullptr;
uint8_t* lp = lpNew(0);
score_tree->Iterate(0, UINT32_MAX, [&](ScoreSds ele) {
lp = zzlInsertAt(lp, NULL, (sds)ele, GetObjScore(ele));
return true;
});
return lp;
}
bool SortedMap::DfImpl::Delete(sds ele) {
@ -687,18 +679,87 @@ bool SortedMap::DfImpl::Reserve(size_t sz) {
}
size_t SortedMap::DfImpl::DeleteRangeByRank(unsigned start, unsigned end) {
LOG(FATAL) << "TBD";
return 0;
DCHECK_LE(start, end);
DCHECK_LT(end, score_tree->Size());
for (uint32_t i = start; i <= end; ++i) {
/* Ideally, we would want to advance path to the next item and delete the previous one.
* However, we can not do that because the path is invalidated after the
* deletion. So we have to recreate the path for each item using the same rank.
* Note, it is probably could be improved, but it's much more complicated.
*/
auto path = score_tree->FromRank(start);
sds ele = (sds)path.Terminal();
score_tree->Delete(path);
score_map->Erase(ele);
}
return end - start + 1;
}
size_t SortedMap::DfImpl::DeleteRangeByScore(const zrangespec& range) {
LOG(FATAL) << "TBD";
return 0;
char buf[16] = {0};
size_t deleted = 0;
while (score_tree->Size() > 0) {
ScoreSds min_key = BuildScoredKey(range.min, range.minex, buf);
auto path = score_tree->GEQ(min_key);
if (path.Empty())
break;
ScoreSds item = path.Terminal();
double score = GetObjScore(item);
if (range.minex) {
DCHECK_GT(score, range.min);
} else {
DCHECK_GE(score, range.min);
}
if (score > range.max || (range.maxex && score == range.max))
break;
score_tree->Delete(item);
++deleted;
score_map->Erase((sds)item);
}
return deleted;
}
size_t SortedMap::DfImpl::DeleteRangeByLex(const zlexrangespec& range) {
LOG(FATAL) << "TBD";
return 0;
if (score_tree->Size() == 0)
return 0;
size_t deleted = 0;
uint32_t rank = 0;
if (range.min != cminstring) {
ScoreSds range_key = (ScoreSds)(uint64_t(range.min) | kIgnoreDoubleTag);
auto path = score_tree->GEQ(range_key);
if (path.Empty())
return {};
rank = path.Rank();
if (range.minex && sdscmp((sds)path.Terminal(), range.min) == 0) {
++rank;
}
}
while (rank < score_tree->Size()) {
auto path = score_tree->FromRank(rank);
ScoreSds item = path.Terminal();
if (range.max != cmaxstring) {
int cmp = sdscmp((sds)item, range.max);
if (cmp > 0 || (cmp == 0 && range.maxex))
break;
}
++deleted;
score_tree->Delete(path);
score_map->Erase((sds)item);
}
return deleted;
}
SortedMap::ScoredArray SortedMap::DfImpl::PopTopScores(unsigned count, bool reverse) {
@ -723,10 +784,12 @@ SortedMap::ScoredArray SortedMap::DfImpl::PopTopScores(unsigned count, bool reve
}
for (unsigned i = 0; i < count; ++i) {
score_tree->DeleteRangeByRank(rank, rank, [&](ScoreSds obj) {
res.emplace_back(string{(sds)obj, sdslen((sds)obj)}, GetObjScore(obj));
});
score_map->Erase(res.back().first);
auto path = score_tree->FromRank(rank);
ScoreSds obj = path.Terminal();
res.emplace_back(string{(sds)obj, sdslen((sds)obj)}, GetObjScore(obj));
score_tree->Delete(path);
score_map->Erase((sds)obj);
rank -= step;
}
@ -742,7 +805,7 @@ size_t SortedMap::DfImpl::Count(const zrangespec& range) const {
// build min key.
char buf[16];
ScoreSds range_key = BuilScoredKey(range.min, range.minex, buf);
ScoreSds range_key = BuildScoredKey(range.min, range.minex, buf);
auto path = score_tree->GEQ(range_key);
if (path.Empty())
return 0;
@ -760,7 +823,7 @@ size_t SortedMap::DfImpl::Count(const zrangespec& range) const {
// Now build the max key.
// If we need to exclude the maximum score, set the key'sstring part to empty string,
// otherwise set it to infinity.
range_key = BuilScoredKey(range.max, !range.maxex, buf);
range_key = BuildScoredKey(range.max, !range.maxex, buf);
path = score_tree->GEQ(range_key);
if (path.Empty()) {

View File

@ -25,7 +25,7 @@ using detail::SortedMap;
class SortedMapTest : public ::testing::Test {
protected:
SortedMapTest() : mr_(mi_heap_get_backing()) {
SortedMapTest() : mr_(mi_heap_get_backing()), sm_(&mr_) {
}
static void SetUpTestSuite() {
@ -39,57 +39,53 @@ class SortedMapTest : public ::testing::Test {
}
MiMemoryResource mr_;
SortedMap sm_;
};
TEST_F(SortedMapTest, Add) {
SortedMap sm(&mr_);
int out_flags;
double new_score;
sds ele = sdsnew("a");
int res = sm.Add(1.0, ele, 0, &out_flags, &new_score);
int res = sm_.Add(1.0, ele, 0, &out_flags, &new_score);
EXPECT_EQ(1, res);
EXPECT_EQ(ZADD_OUT_ADDED, out_flags);
EXPECT_EQ(1, new_score);
res = sm.Add(2.0, ele, ZADD_IN_NX, &out_flags, &new_score);
res = sm_.Add(2.0, ele, ZADD_IN_NX, &out_flags, &new_score);
EXPECT_EQ(1, res);
EXPECT_EQ(ZADD_OUT_NOP, out_flags);
res = sm.Add(2.0, ele, ZADD_IN_INCR, &out_flags, &new_score);
res = sm_.Add(2.0, ele, ZADD_IN_INCR, &out_flags, &new_score);
EXPECT_EQ(1, res);
EXPECT_EQ(ZADD_OUT_UPDATED, out_flags);
EXPECT_EQ(3, new_score);
EXPECT_EQ(3, sm.GetScore(ele));
EXPECT_EQ(3, sm_.GetScore(ele));
}
TEST_F(SortedMapTest, Scan) {
SortedMap sm(&mr_);
for (unsigned i = 0; i < 972; ++i) {
sm.Insert(i, sdsfromlonglong(i));
sm_.Insert(i, sdsfromlonglong(i));
}
uint64_t cursor = 0;
unsigned cnt = 0;
do {
cursor = sm.Scan(cursor, [&](string_view str, double score) { ++cnt; });
cursor = sm_.Scan(cursor, [&](string_view str, double score) { ++cnt; });
} while (cursor != 0);
EXPECT_EQ(972, cnt);
}
TEST_F(SortedMapTest, InsertPop) {
SortedMap sm(&mr_);
for (unsigned i = 0; i < 256; ++i) {
sds s = sdsempty();
s = sdscatfmt(s, "a%u", i);
ASSERT_TRUE(sm.Insert(1000, s));
ASSERT_TRUE(sm_.Insert(1000, s));
}
vector<sds> vec;
bool res = sm.Iterate(1, 2, false, [&](sds ele, double score) {
bool res = sm_.Iterate(1, 2, false, [&](sds ele, double score) {
vec.push_back(ele);
return true;
});
@ -97,26 +93,24 @@ TEST_F(SortedMapTest, InsertPop) {
EXPECT_THAT(vec, ElementsAre(StrEq("a1"), StrEq("a10")));
sds s = sdsnew("a1");
EXPECT_EQ(1, sm.GetRank(s, false));
EXPECT_EQ(254, sm.GetRank(s, true));
EXPECT_EQ(1, sm_.GetRank(s, false));
EXPECT_EQ(254, sm_.GetRank(s, true));
sdsfree(s);
auto top_scores = sm.PopTopScores(3, false);
auto top_scores = sm_.PopTopScores(3, false);
EXPECT_THAT(top_scores, ElementsAre(Pair(StrEq("a0"), 1000), Pair(StrEq("a1"), 1000),
Pair(StrEq("a10"), 1000)));
top_scores = sm.PopTopScores(3, true);
top_scores = sm_.PopTopScores(3, true);
EXPECT_THAT(top_scores, ElementsAre(Pair(StrEq("a99"), 1000), Pair(StrEq("a98"), 1000),
Pair(StrEq("a97"), 1000)));
}
TEST_F(SortedMapTest, LexRanges) {
SortedMap sm(&mr_);
for (unsigned i = 0; i < 100; ++i) {
sds s = sdsempty();
s = sdscatfmt(s, "a%u", i);
ASSERT_TRUE(sm.Insert(1, s));
ASSERT_TRUE(sm_.Insert(1, s));
}
zlexrangespec range;
@ -124,58 +118,56 @@ TEST_F(SortedMapTest, LexRanges) {
range.min = sdsnew("a93");
range.maxex = 0;
range.minex = 0;
EXPECT_EQ(4, sm.LexCount(range));
auto array = sm.GetLexRange(range, 1, 1000, false);
EXPECT_EQ(4, sm_.LexCount(range));
auto array = sm_.GetLexRange(range, 1, 1000, false);
ASSERT_EQ(3, array.size());
EXPECT_THAT(array.front(), Pair("a94", 1));
range.maxex = 1;
EXPECT_EQ(3, sm.LexCount(range));
array = sm.GetLexRange(range, 1, 1000, true);
EXPECT_EQ(3, sm_.LexCount(range));
array = sm_.GetLexRange(range, 1, 1000, true);
ASSERT_EQ(2, array.size());
EXPECT_THAT(array.front(), Pair("a94", 1));
range.minex = 1;
EXPECT_EQ(2, sm.LexCount(range));
array = sm.GetLexRange(range, 1, 1000, false);
EXPECT_EQ(2, sm_.LexCount(range));
array = sm_.GetLexRange(range, 1, 1000, false);
ASSERT_EQ(1, array.size());
EXPECT_THAT(array.front(), Pair("a95", 1));
sdsfree(range.min);
range.min = range.max;
EXPECT_EQ(0, sm.LexCount(range));
EXPECT_EQ(0, sm_.LexCount(range));
range.minex = 0;
EXPECT_EQ(0, sm.LexCount(range));
EXPECT_EQ(0, sm_.LexCount(range));
sdsfree(range.max);
range.maxex = 0;
range.min = cminstring;
range.max = sdsnew("a");
EXPECT_EQ(0, sm.LexCount(range));
EXPECT_EQ(0, sm_.LexCount(range));
sdsfree(range.max);
range.max = sdsnew("a0");
EXPECT_EQ(1, sm.LexCount(range));
EXPECT_EQ(1, sm_.LexCount(range));
range.maxex = 1;
EXPECT_EQ(0, sm.LexCount(range));
EXPECT_EQ(0, sm_.LexCount(range));
sdsfree(range.max);
}
TEST_F(SortedMapTest, ScoreRanges) {
SortedMap sm(&mr_);
for (unsigned i = 0; i < 10; ++i) {
sds s = sdsempty();
s = sdscatfmt(s, "a%u", i);
ASSERT_TRUE(sm.Insert(1, s));
ASSERT_TRUE(sm_.Insert(1, s));
}
for (unsigned i = 0; i < 10; ++i) {
sds s = sdsempty();
s = sdscatfmt(s, "b%u", i);
ASSERT_TRUE(sm.Insert(2, s));
ASSERT_TRUE(sm_.Insert(2, s));
}
zrangespec range;
@ -183,30 +175,75 @@ TEST_F(SortedMapTest, ScoreRanges) {
range.min = 1;
range.maxex = 0;
range.minex = 0;
EXPECT_EQ(20, sm.Count(range));
detail::SortedMap::ScoredArray array = sm.GetRange(range, 0, 1000, false);
EXPECT_EQ(20, sm_.Count(range));
detail::SortedMap::ScoredArray array = sm_.GetRange(range, 0, 1000, false);
ASSERT_EQ(20, array.size());
EXPECT_THAT(array.front(), Pair("a0", 1));
EXPECT_THAT(array.back(), Pair("b9", 2));
range.minex = 1; // exclude all the "1" scores.
EXPECT_EQ(10, sm.Count(range));
array = sm.GetRange(range, 2, 1, false);
EXPECT_EQ(10, sm_.Count(range));
array = sm_.GetRange(range, 2, 1, false);
ASSERT_EQ(1, array.size());
EXPECT_THAT(array.front(), Pair("b2", 2));
range.max = 1;
range.minex = 0;
range.min = -HUGE_VAL;
EXPECT_EQ(10, sm.Count(range));
array = sm.GetRange(range, 2, 2, true);
EXPECT_EQ(10, sm_.Count(range));
array = sm_.GetRange(range, 2, 2, true);
ASSERT_EQ(2, array.size());
EXPECT_THAT(array.back(), Pair("a6", 1));
range.maxex = 1;
EXPECT_EQ(0, sm.Count(range));
array = sm.GetRange(range, 0, 2, true);
EXPECT_EQ(0, sm_.Count(range));
array = sm_.GetRange(range, 0, 2, true);
ASSERT_EQ(0, array.size());
range.min = 3;
array = sm_.GetRange(range, 0, 2, true);
ASSERT_EQ(0, array.size());
}
TEST_F(SortedMapTest, DeleteRange) {
for (unsigned i = 0; i <= 100; ++i) {
sds s = sdsempty();
s = sdscatfmt(s, "a%u", i);
ASSERT_TRUE(sm_.Insert(i * 2, s));
}
zrangespec range;
range.min = range.max = 200;
range.minex = range.maxex = 1;
EXPECT_EQ(0, sm_.DeleteRangeByScore(range));
range.min = 199;
EXPECT_EQ(0, sm_.DeleteRangeByScore(range));
range.minex = 0;
EXPECT_EQ(0, sm_.DeleteRangeByScore(range));
range.max = 199;
range.min = 198;
EXPECT_EQ(1, sm_.DeleteRangeByScore(range));
range.max = 197;
range.min = 193;
EXPECT_EQ(2, sm_.DeleteRangeByScore(range));
EXPECT_EQ(2, sm_.DeleteRangeByRank(0, 1));
zlexrangespec lex_range;
lex_range.min = sdsnew("b");
lex_range.max = sdsnew("c");
EXPECT_EQ(0, sm_.DeleteRangeByLex(lex_range));
sdsfree(lex_range.min);
sdsfree(lex_range.max);
lex_range.min = cminstring;
lex_range.max = cmaxstring;
EXPECT_EQ(96, sm_.DeleteRangeByLex(lex_range));
}
} // namespace dfly

View File

@ -111,8 +111,12 @@ TEST_F(ZSetFamilyTest, LargeSet) {
auto resp = Run({"zadd", "key", absl::StrCat(i), absl::StrCat("element:", i)});
EXPECT_THAT(resp, IntArg(1)) << i;
}
Run({"zadd", "key", "129", ""});
EXPECT_THAT(Run({"zrangebyscore", "key", "(-inf", "(0.0"}), ArrLen(0));
EXPECT_THAT(Run({"zrangebyscore", "key", "(5", "0.0"}), ArrLen(0));
EXPECT_THAT(Run({"zrangebylex", "key", "-", "(element:0"}), ArrLen(0));
EXPECT_EQ(2, CheckedInt({"zremrangebyscore", "key", "127", "(129"}));
}
TEST_F(ZSetFamilyTest, ZRemRangeRank) {