chore: Implement list Pop/Erase functionality with QList (#4099)

Adjusted OpRem/OpBPop/OpPop

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-10 12:37:42 +02:00 committed by GitHub
parent c43ba5f2cc
commit 9366c67464
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 154 additions and 46 deletions

View File

@ -355,6 +355,40 @@ void QList::Push(string_view value, Where where) {
}
}
string QList::Pop(Where where) {
DCHECK_GT(count_, 0u);
quicklistNode* node;
if (where == HEAD) {
node = head_;
} else {
DCHECK_EQ(TAIL, where);
node = tail_;
}
/* The head and tail should never be compressed */
DCHECK(node->encoding != QUICKLIST_NODE_ENCODING_LZF);
string res;
if (ABSL_PREDICT_FALSE(QL_NODE_IS_PLAIN(node))) {
// TODO: We could avoid this copy by returning the pointer of the plain node.
// But the higher level APIs should support this.
res.assign(reinterpret_cast<char*>(node->entry), node->sz);
DelNode(node);
} else {
uint8_t* pos = where == HEAD ? lpFirst(node->entry) : lpLast(node->entry);
unsigned int vlen;
long long vlong;
uint8_t* vstr = lpGetValue(pos, &vlen, &vlong);
if (vstr) {
res.assign(reinterpret_cast<char*>(vstr), vlen);
} else {
res = absl::StrCat(vlong);
}
DelPackedIndex(node, &pos);
}
return res;
}
void QList::AppendListpack(unsigned char* zl) {
quicklistNode* node = CreateNode();
node->entry = zl;

View File

@ -8,7 +8,8 @@ extern "C" {
#include "redis/quicklist.h"
}
#include <functional>
#include <absl/functional/function_ref.h>
#include <optional>
#include <string>
#include <variant>
@ -35,6 +36,10 @@ class QList {
return std::get<std::string_view>(value_);
}
bool is_int() const {
return std::holds_alternative<int64_t>(value_);
}
int64_t ival() const {
return std::get<int64_t>(value_);
}
@ -66,7 +71,7 @@ class QList {
friend class QList;
};
using IterateFunc = std::function<bool(Entry)>;
using IterateFunc = absl::FunctionRef<bool(Entry)>;
enum InsertOpt { BEFORE, AFTER };
QList();
@ -85,6 +90,10 @@ class QList {
void Clear();
void Push(std::string_view value, Where where);
// Returns the popped value. Precondition: list is not empty.
std::string Pop(Where where);
void AppendListpack(unsigned char* zl);
void AppendPlain(unsigned char* zl, size_t sz);

View File

@ -156,13 +156,25 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis
CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok.
auto it = it_res->it;
quicklist* ql = GetQL(it->second);
std::string value;
size_t len;
if (it->second.Encoding() == OBJ_ENCODING_QUICKLIST) {
quicklist* ql = GetQL(it->second);
value = ListPop(dir, ql);
len = quicklistCount(ql);
} else {
QList* ql = GetQLV2(it->second);
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
value = ql->Pop(where);
len = ql->Size();
}
std::string value = ListPop(dir, ql);
it_res->post_updater.Run();
OpArgs op_args = t->GetOpArgs(shard);
if (quicklistCount(ql) == 0) {
if (len == 0) {
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, it));
}
@ -346,24 +358,46 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
return StringVec{};
auto it = it_res->it;
quicklist* ql = GetQL(it->second);
auto prev_len = quicklistCount(ql);
size_t prev_len = 0;
StringVec res;
if (prev_len < count) {
count = prev_len;
}
if (return_results) {
res.reserve(count);
}
if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
prev_len = ql->Size();
if (prev_len < count) {
count = prev_len;
}
for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
res.reserve(count);
}
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
for (unsigned i = 0; i < count; ++i) {
if (return_results) {
res.push_back(ql->Pop(where));
}
}
} else {
quicklist* ql = GetQL(it->second);
prev_len = quicklistCount(ql);
if (prev_len < count) {
count = prev_len;
}
if (return_results) {
res.reserve(count);
}
for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
}
}
}
it_res->post_updater.Run();
if (count == prev_len) {
@ -572,46 +606,77 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, string_view ele
return it_res.status();
auto it = it_res->it;
quicklist* ql = GetQL(it->second);
int iter_direction = AL_START_HEAD;
long long index = 0;
if (count < 0) {
count = -count;
iter_direction = AL_START_TAIL;
index = -1;
}
quicklistIter qiter;
quicklistInitIterator(&qiter, ql, iter_direction, index);
quicklistEntry entry;
size_t len = 0;
unsigned removed = 0;
int64_t ival;
// try parsing the element into an integer.
int is_int = lpStringToInt64(elem.data(), elem.size(), &ival);
auto is_match = [&](const quicklistEntry& entry) {
if (is_int != (entry.value == nullptr))
return false;
if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
QList::Where where = QList::HEAD;
return is_int ? entry.longval == ival : ElemCompare(entry, elem);
};
while (quicklistNext(&qiter, &entry)) {
if (is_match(entry)) {
quicklistDelEntry(&qiter, &entry);
removed++;
if (count && removed == count)
break;
if (count < 0) {
count = -count;
where = QList::TAIL;
}
}
auto it = ql->GetIterator(where);
auto is_match = [&](const QList::Entry& entry) {
if (is_int) {
return entry.is_int() && entry.ival() == ival;
}
return entry == elem;
};
while (it.Next()) {
QList::Entry entry = it.Get();
if (is_match(entry)) {
it = ql->Erase(it);
removed++;
if (count && removed == count)
break;
}
}
len = ql->Size();
} else {
quicklist* ql = GetQL(it->second);
int iter_direction = AL_START_HEAD;
long long index = 0;
if (count < 0) {
count = -count;
iter_direction = AL_START_TAIL;
index = -1;
}
quicklistIter qiter;
quicklistInitIterator(&qiter, ql, iter_direction, index);
quicklistEntry entry;
auto is_match = [&](const quicklistEntry& entry) {
if (is_int != (entry.value == nullptr))
return false;
return is_int ? entry.longval == ival : ElemCompare(entry, elem);
};
while (quicklistNext(&qiter, &entry)) {
if (is_match(entry)) {
quicklistDelEntry(&qiter, &entry);
removed++;
if (count && removed == count)
break;
}
}
quicklistCompressIterator(&qiter);
len = quicklistCount(ql);
}
it_res->post_updater.Run();
quicklistCompressIterator(&qiter);
if (quicklistCount(ql) == 0) {
if (len == 0) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}