mirror of
https://github.com/dragonflydb/dragonfly
synced 2024-11-21 15:11:20 +00:00
fix(server): hscan command better support - core review changes #347 Co-authored-by: Boaz Sade <boaz@dragonflydb.io>
This commit is contained in:
parent
881e8a1b0c
commit
af690668ca
@ -1,5 +1,5 @@
|
||||
add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
|
||||
memcache_parser.cc redis_parser.cc reply_builder.cc)
|
||||
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc)
|
||||
|
||||
if (DF_USE_SSL)
|
||||
set(TLS_LIB tls_lib)
|
||||
|
46
src/facade/op_status.cc
Normal file
46
src/facade/op_status.cc
Normal file
@ -0,0 +1,46 @@
|
||||
#include "facade/op_status.h"
|
||||
|
||||
namespace facade {
|
||||
|
||||
// Returns a static, human-readable name for `op`, meant for logs and debug output.
//
// The returned pointer refers to a string literal, so the caller must not free it.
// The switch has no `default:` label; a value that matches none of the listed
// enumerators falls through to the final return below.
const char* DebugString(OpStatus op) {
  switch (op) {
    case OpStatus::OK:
      return "OK";
    case OpStatus::KEY_EXISTS:
      return "KEY EXISTS";
    case OpStatus::KEY_NOTFOUND:
      return "KEY NOTFOUND";
    case OpStatus::SKIPPED:
      return "SKIPPED";
    case OpStatus::INVALID_VALUE:
      return "INVALID VALUE";
    case OpStatus::OUT_OF_RANGE:
      return "OUT OF RANGE";
    case OpStatus::WRONG_TYPE:
      return "WRONG TYPE";
    case OpStatus::TIMED_OUT:
      return "TIMED OUT";
    case OpStatus::OUT_OF_MEMORY:
      return "OUT OF MEMORY";
    case OpStatus::INVALID_FLOAT:
      return "INVALID FLOAT";
    case OpStatus::INVALID_INT:
      return "INVALID INT";
    case OpStatus::SYNTAX_ERR:
      return "INVALID SYNTAX";
    case OpStatus::BUSY_GROUP:
      return "BUSY GROUP";
    case OpStatus::STREAM_ID_SMALL:
      return "STREAM ID TOO SMALL";  // was misspelled "TO SMALL"
    case OpStatus::ENTRIES_ADDED_SMALL:
      return "ENTRIES ADDED IS TOO SMALL";  // was misspelled "TO SMALL"
    case OpStatus::INVALID_NUMERIC_RESULT:
      return "INVALID NUMERIC RESULT";
  }
  // Reached only when `op` holds a value that none of the cases above name;
  // an enum class object can carry such a value, so a fallback is still required.
  return "Unknown Error Code";
}
|
||||
const char* OpResultBase::DebugFormat() const {
|
||||
return DebugString(st_);
|
||||
}
|
||||
|
||||
} // namespace facade
|
@ -27,6 +27,8 @@ enum class OpStatus : uint16_t {
|
||||
INVALID_NUMERIC_RESULT,
|
||||
};
|
||||
|
||||
const char* DebugString(OpStatus op);
|
||||
|
||||
class OpResultBase {
|
||||
public:
|
||||
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
|
||||
@ -48,6 +50,8 @@ class OpResultBase {
|
||||
return st_ == OpStatus::OK;
|
||||
}
|
||||
|
||||
const char* DebugFormat() const;
|
||||
|
||||
private:
|
||||
OpStatus st_;
|
||||
};
|
||||
@ -97,7 +101,7 @@ inline bool operator==(OpStatus st, const OpResultBase& ob) {
|
||||
namespace std {
|
||||
|
||||
template <typename T> std::ostream& operator<<(std::ostream& os, const facade::OpResult<T>& res) {
|
||||
os << int(res.status());
|
||||
os << res.status();
|
||||
return os;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,48 @@ using namespace facade;
|
||||
|
||||
namespace {
|
||||
|
||||
struct ScanOpts {
|
||||
string_view pattern;
|
||||
size_t limit = 10;
|
||||
|
||||
constexpr bool Matches(std::string_view val_name) const {
|
||||
return pattern.empty() ? true
|
||||
: stringmatchlen(pattern.data(), pattern.size(), val_name.data(),
|
||||
val_name.size(), 0);
|
||||
}
|
||||
|
||||
static OpResult<ScanOpts> TryFrom(CmdArgList args);
|
||||
};
|
||||
|
||||
// Parses the option tail of a scan command into a ScanOpts.
//
// Options are read as (name, value) pairs starting at args[3]; the name is
// upper-cased in place before it is compared. Recognised names:
//   COUNT n  - stored in `limit`, clamped into [1, 4096]
//   MATCH p  - stored in `pattern`; a lone "*" is stored as an empty pattern
//
// Returns SYNTAX_ERR for an unknown name or a name without a value, and
// INVALID_INT when the COUNT value cannot be parsed.
OpResult<ScanOpts> ScanOpts::TryFrom(CmdArgList args) {
  constexpr size_t kLowestLimit = 1;
  constexpr size_t kHighestLimit = 4096;

  ScanOpts parsed;

  for (unsigned pos = 3; pos < args.size(); pos += 2) {
    ToUpper(&args[pos]);
    string_view name = ArgS(args, pos);

    // Every option needs a value right after it.
    if (pos + 1 == args.size()) {
      return OpStatus::SYNTAX_ERR;
    }

    if (name == "COUNT") {
      if (!absl::SimpleAtoi(ArgS(args, pos + 1), &parsed.limit)) {
        return OpStatus::INVALID_INT;
      }
      if (parsed.limit < kLowestLimit) {
        parsed.limit = kLowestLimit;
      } else if (parsed.limit > kHighestLimit) {
        parsed.limit = kHighestLimit;
      }
      continue;
    }

    if (name == "MATCH") {
      parsed.pattern = ArgS(args, pos + 1);
      // "*" matches everything, so store it as "no filter".
      if (parsed.pattern == "*") {
        parsed.pattern = string_view{};
      }
      continue;
    }

    return OpStatus::SYNTAX_ERR;
  }

  return parsed;
}
|
||||
|
||||
constexpr size_t kMaxListPackLen = 1024;
|
||||
using IncrByParam = std::variant<double, int64_t>;
|
||||
using OptStr = std::optional<std::string>;
|
||||
@ -215,15 +257,27 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) {
|
||||
OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
|
||||
const ScanOpts& scan_op) {
|
||||
using PrivateDataRef = std::tuple<StringVec&, const ScanOpts&>;
|
||||
constexpr size_t HASH_TABLE_ENTRIES_FACTOR = 2; // return key/value
|
||||
|
||||
/* We set the max number of iterations to ten times the specified
|
||||
* COUNT, so if the hash table is in a pathological state (very
|
||||
* sparsely populated) we avoid blocking for too long, at the cost
|
||||
* of returning no or very few elements (taken from redis code at db.c, line 904). */
|
||||
constexpr size_t INTERATION_FACTOR = 10;
|
||||
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH);
|
||||
|
||||
if (!find_res)
|
||||
if (!find_res) {
|
||||
DVLOG(1) << "ScanOp: find failed: " << find_res << ", baling out";
|
||||
return find_res.status();
|
||||
}
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 20;
|
||||
uint32_t count = scan_op.limit * HASH_TABLE_ENTRIES_FACTOR;
|
||||
robj* hset = it->second.AsRObj();
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
@ -235,29 +289,38 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
|
||||
int64_t ele_len;
|
||||
unsigned char intbuf[LP_INTBUF_SIZE];
|
||||
|
||||
// We do single pass on listpack for this operation.
|
||||
// We do single pass on listpack for this operation - ignore any limits.
|
||||
do {
|
||||
uint8_t* elem = lpGet(lp_elem, &ele_len, intbuf);
|
||||
DCHECK(elem);
|
||||
res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
|
||||
if (scan_op.Matches({reinterpret_cast<char*>(elem), size_t(ele_len)})) {
|
||||
res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
|
||||
}
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to value
|
||||
} while (lp_elem);
|
||||
*cursor = 0;
|
||||
} else {
|
||||
dict* ht = (dict*)hset->ptr;
|
||||
long maxiterations = count * 10;
|
||||
void* privdata = &res;
|
||||
auto scanCb = [](void* privdata, const dictEntry* de) {
|
||||
StringVec* res = (StringVec*)privdata;
|
||||
long max_iterations = count * INTERATION_FACTOR;
|
||||
PrivateDataRef private_data_ref(res, scan_op);
|
||||
void* private_data = &private_data_ref;
|
||||
|
||||
// note about this lambda - don't capture here! it should be convertible to C function!
|
||||
auto scanCb = [](void* private_data, const dictEntry* de) {
|
||||
StringVec& res = std::get<0>(*(PrivateDataRef*)private_data);
|
||||
const ScanOpts& scan_op = std::get<1>(*(PrivateDataRef*)private_data);
|
||||
sds val = (sds)de->key;
|
||||
res->emplace_back(val, sdslen(val));
|
||||
val = (sds)de->v.val;
|
||||
res->emplace_back(val, sdslen(val));
|
||||
auto len = sdslen(val);
|
||||
if (scan_op.Matches(std::string_view(val, len))) {
|
||||
res.emplace_back(val, len);
|
||||
val = (sds)de->v.val;
|
||||
res.emplace_back(val, sdslen(val));
|
||||
}
|
||||
};
|
||||
|
||||
do {
|
||||
*cursor = dictScan(ht, *cursor, scanCb, NULL, privdata);
|
||||
} while (*cursor && maxiterations-- && res.size() < count);
|
||||
*cursor = dictScan(ht, *cursor, scanCb, NULL, private_data);
|
||||
} while (*cursor && max_iterations-- && res.size() < count);
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -801,12 +864,22 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
return (*cntx)->SendError("invalid cursor");
|
||||
}
|
||||
|
||||
if (args.size() > 3) {
|
||||
return (*cntx)->SendError("scan options are not supported yet");
|
||||
// HSCAN key cursor [MATCH pattern] [COUNT count]
|
||||
if (args.size() > 7) {
|
||||
DVLOG(1) << "got " << args.size() << " this is more than it should be";
|
||||
return (*cntx)->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args);
|
||||
if (!ops) {
|
||||
DVLOG(1) << "HScan invalid args - return " << ops << " to the user";
|
||||
return (*cntx)->SendError(ops.status());
|
||||
}
|
||||
|
||||
ScanOpts scan_op = ops.value();
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(t->GetOpArgs(shard), key, &cursor);
|
||||
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -118,4 +118,43 @@ TEST_F(HSetFamilyTest, HIncr) {
|
||||
EXPECT_THAT(resp, ErrArg("hash value is not an integer"));
|
||||
}
|
||||
|
||||
// Exercises HSCAN with the COUNT and MATCH options on a small and a large hash.
TEST_F(HSetFamilyTest, HScan) {
  // Small hash: ten Field-N / Value-N pairs.
  for (int n = 0; n < 10; ++n) {
    Run({"HSET", "myhash", absl::StrCat("Field-", n), absl::StrCat("Value-", n)});
  }

  {
    // COUNT 4 is requested, yet all ten pairs (20 strings) are expected back:
    // the fields are on a listpack, which is scanned in one pass regardless of the limit.
    auto reply = Run({"hscan", "myhash", "0", "count", "4"});
    EXPECT_THAT(reply, ArrLen(2));
    auto items = StrArray(reply.GetVec()[1]);
    EXPECT_EQ(items.size(), 20);
    EXPECT_THAT(items, Each(AnyOf(StartsWith("Field"), StartsWith("Value"))));
  }

  {
    // No field name contains an 'x', so the filtered scan must come back empty.
    auto reply = Run({"hscan", "myhash", "0", "match", "*x*"});
    EXPECT_THAT(reply, ArrLen(2));
    auto items = StrArray(reply.GetVec()[1]);
    EXPECT_EQ(items.size(), 0);
  }

  {
    // Positive match: only "Field-1" contains a '1', giving one key plus its value.
    auto reply = Run({"hscan", "myhash", "0", "match", "*1*"});
    EXPECT_THAT(reply, ArrLen(2));
    auto items = StrArray(reply.GetVec()[1]);
    EXPECT_EQ(items.size(), 2);
  }

  // Large hash: 200 pairs, used to check that COUNT actually bounds the reply.
  for (int n = 0; n < 200; ++n) {
    Run({"HSET", "largehash", absl::StrCat("KeyNum-", n), absl::StrCat("KeyValue-", n)});
  }

  {
    auto reply = Run({"hscan", "largehash", "0", "count", "20"});
    EXPECT_THAT(reply, ArrLen(2));
    auto items = StrArray(reply.GetVec()[1]);

    // COUNT is only a hint (see https://redis.io/commands/scan/ --> "The COUNT option"),
    // so the size is not exact: more than 20 * 2 strings, but fewer than 60.
    EXPECT_GT(items.size(), 40);
    EXPECT_LT(items.size(), 60);
  }
}
|
||||
|
||||
} // namespace dfly
|
||||
|
Loading…
Reference in New Issue
Block a user