From af690668ca339dddee8838884a296a38e7974380 Mon Sep 17 00:00:00 2001
From: Boaz Sade
Date: Tue, 4 Oct 2022 11:10:47 +0300
Subject: [PATCH] fix(server): hscan command better support #347 (#350)

fix(server): hscan command better support - core review changes #347

Co-authored-by: Boaz Sade
---
 src/facade/CMakeLists.txt      |   2 +-
 src/facade/op_status.cc        |  46 ++++++++++++++
 src/facade/op_status.h         |   6 +-
 src/server/hset_family.cc      | 120 +++++++++++++++++++++++++++------
 src/server/hset_family_test.cc |  47 ++++++++++++++
 5 files changed, 202 insertions(+), 19 deletions(-)
 create mode 100644 src/facade/op_status.cc

diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt
index b8907a83a..528a4e5e6 100644
--- a/src/facade/CMakeLists.txt
+++ b/src/facade/CMakeLists.txt
@@ -1,5 +1,5 @@
 add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
-            memcache_parser.cc redis_parser.cc reply_builder.cc)
+            memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc)
 
 if (DF_USE_SSL)
   set(TLS_LIB tls_lib)
diff --git a/src/facade/op_status.cc b/src/facade/op_status.cc
new file mode 100644
index 000000000..e398bad00
--- /dev/null
+++ b/src/facade/op_status.cc
@@ -0,0 +1,46 @@
+#include "facade/op_status.h"
+
+namespace facade {
+
+const char* DebugString(OpStatus op) {
+  switch (op) {
+    case OpStatus::OK:
+      return "OK";
+    case OpStatus::KEY_EXISTS:
+      return "KEY EXISTS";
+    case OpStatus::KEY_NOTFOUND:
+      return "KEY NOTFOUND";
+    case OpStatus::SKIPPED:
+      return "SKIPPED";
+    case OpStatus::INVALID_VALUE:
+      return "INVALID VALUE";
+    case OpStatus::OUT_OF_RANGE:
+      return "OUT OF RANGE";
+    case OpStatus::WRONG_TYPE:
+      return "WRONG TYPE";
+    case OpStatus::TIMED_OUT:
+      return "TIMED OUT";
+    case OpStatus::OUT_OF_MEMORY:
+      return "OUT OF MEMORY";
+    case OpStatus::INVALID_FLOAT:
+      return "INVALID FLOAT";
+    case OpStatus::INVALID_INT:
+      return "INVALID INT";
+    case OpStatus::SYNTAX_ERR:
+      return "INVALID SYNTAX";
+    case OpStatus::BUSY_GROUP:
+      return "BUSY GROUP";
+    case OpStatus::STREAM_ID_SMALL:
+      return "STREAM ID TO SMALL";
+    case OpStatus::ENTRIES_ADDED_SMALL:
+      return "ENTRIES ADDED IS TO SMALL";
+    case OpStatus::INVALID_NUMERIC_RESULT:
+      return "INVALID NUMERIC RESULT";
+  }
+  return "Unknown Error Code";  // we should not be here, but this is how enums works in c++
+}
+const char* OpResultBase::DebugFormat() const {
+  return DebugString(st_);
+}
+
+}  // namespace facade
diff --git a/src/facade/op_status.h b/src/facade/op_status.h
index 0cd9ab73e..3dd70af2d 100644
--- a/src/facade/op_status.h
+++ b/src/facade/op_status.h
@@ -27,6 +27,8 @@ enum class OpStatus : uint16_t {
   INVALID_NUMERIC_RESULT,
 };
 
+const char* DebugString(OpStatus op);
+
 class OpResultBase {
  public:
   OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
@@ -48,6 +50,8 @@ class OpResultBase {
     return st_ == OpStatus::OK;
   }
 
+  const char* DebugFormat() const;
+
  private:
   OpStatus st_;
 };
@@ -97,7 +101,7 @@ inline bool operator==(OpStatus st, const OpResultBase& ob) {
 namespace std {
 
 template <typename T> std::ostream& operator<<(std::ostream& os, const facade::OpResult<T>& res) {
-  os << int(res.status());
+  os << res.status();
   return os;
 }
 
diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc
index a8fcc870e..97beedc27 100644
--- a/src/server/hset_family.cc
+++ b/src/server/hset_family.cc
@@ -26,6 +26,51 @@ using namespace facade;
 
 namespace {
 
+// Parsed optional arguments of HSCAN: [MATCH pattern] [COUNT count].
+struct ScanOpts {
+  string_view pattern;  // empty pattern matches every field
+  size_t limit = 10;    // COUNT hint; TryFrom clamps it to [1, 4096]
+
+  constexpr bool Matches(std::string_view val_name) const {
+    return pattern.empty() ? true
+                           : stringmatchlen(pattern.data(), pattern.size(), val_name.data(),
+                                            val_name.size(), 0);
+  }
+
+  static OpResult<ScanOpts> TryFrom(CmdArgList args);
+};
+
+// Parses the options that follow "HSCAN key cursor", i.e. args[3] onwards.
+// Yields SYNTAX_ERR for an unknown option or one without a value, INVALID_INT for a bad COUNT.
+OpResult<ScanOpts> ScanOpts::TryFrom(CmdArgList args) {
+  ScanOpts scan_opts;
+
+  for (unsigned i = 3; i < args.size(); i += 2) {
+    ToUpper(&args[i]);
+    string_view opt = ArgS(args, i);
+    if (i + 1 == args.size()) {
+      return OpStatus::SYNTAX_ERR;
+    }
+
+    if (opt == "COUNT") {
+      if (!absl::SimpleAtoi(ArgS(args, i + 1), &scan_opts.limit)) {
+        return OpStatus::INVALID_INT;
+      }
+      if (scan_opts.limit == 0)
+        scan_opts.limit = 1;
+      else if (scan_opts.limit > 4096)
+        scan_opts.limit = 4096;
+    } else if (opt == "MATCH") {
+      scan_opts.pattern = ArgS(args, i + 1);
+      if (scan_opts.pattern == "*")
+        scan_opts.pattern = string_view{};
+    } else {
+      return OpStatus::SYNTAX_ERR;
+    }
+  }
+  return scan_opts;
+}
+
 constexpr size_t kMaxListPackLen = 1024;
 using IncrByParam = std::variant<double, int64_t>;
 using OptStr = std::optional<std::string>;
@@ -215,15 +260,27 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
   return OpStatus::OK;
 }
 
-OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) {
+OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
+                           const ScanOpts& scan_op) {
+  using PrivateDataRef = std::tuple<StringVec&, const ScanOpts&>;
+  constexpr size_t HASH_TABLE_ENTRIES_FACTOR = 2;  // return key/value
+
+  /* We set the max number of iterations to ten times the specified
+   * COUNT, so if the hash table is in a pathological state (very
+   * sparsely populated) we avoid to block too much time at the cost
+   * of returning no or very few elements. (taken from redis code at db.c line 904) */
+  constexpr size_t INTERATION_FACTOR = 10;
+
   OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH);
 
-  if (!find_res)
+  if (!find_res) {
+    DVLOG(1) << "ScanOp: find failed: " << find_res << ", baling out";
     return find_res.status();
+  }
 
   PrimeIterator it = find_res.value();
   StringVec res;
-  uint32_t count = 20;
+  uint32_t count = scan_op.limit * HASH_TABLE_ENTRIES_FACTOR;
   robj* hset = it->second.AsRObj();
 
   if (hset->encoding == OBJ_ENCODING_LISTPACK) {
@@ -235,29 +292,48 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
     int64_t ele_len;
     unsigned char intbuf[LP_INTBUF_SIZE];
 
-    // We do single pass on listpack for this operation.
+    // We do single pass on listpack for this operation - ignore any limits.
+    // Entries are laid out as field, value, field, value, ...; MATCH is applied
+    // to the field name only and a matching field is emitted with its value.
     do {
       uint8_t* elem = lpGet(lp_elem, &ele_len, intbuf);
       DCHECK(elem);
-      res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
+      const bool matched = scan_op.Matches({reinterpret_cast<char*>(elem), size_t(ele_len)});
+      if (matched) {
+        res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
+      }
       lp_elem = lpNext(lp, lp_elem);  // switch to value
+      DCHECK(lp_elem);
+      if (matched) {
+        elem = lpGet(lp_elem, &ele_len, intbuf);
+        DCHECK(elem);
+        res.emplace_back(reinterpret_cast<char*>(elem), size_t(ele_len));
+      }
+      lp_elem = lpNext(lp, lp_elem);  // switch to the next field
     } while (lp_elem);
     *cursor = 0;
   } else {
     dict* ht = (dict*)hset->ptr;
-    long maxiterations = count * 10;
-    void* privdata = &res;
-    auto scanCb = [](void* privdata, const dictEntry* de) {
-      StringVec* res = (StringVec*)privdata;
+    long max_iterations = count * INTERATION_FACTOR;
+    PrivateDataRef private_data_ref(res, scan_op);
+    void* private_data = &private_data_ref;
+
+    // note about this lambda - don't capture here! it should be convertible to C function!
+    auto scanCb = [](void* private_data, const dictEntry* de) {
+      StringVec& res = std::get<0>(*(PrivateDataRef*)private_data);
+      const ScanOpts& scan_op = std::get<1>(*(PrivateDataRef*)private_data);
       sds val = (sds)de->key;
-      res->emplace_back(val, sdslen(val));
-      val = (sds)de->v.val;
-      res->emplace_back(val, sdslen(val));
+      auto len = sdslen(val);
+      if (scan_op.Matches(std::string_view(val, len))) {
+        res.emplace_back(val, len);
+        val = (sds)de->v.val;
+        res.emplace_back(val, sdslen(val));
+      }
     };
 
     do {
-      *cursor = dictScan(ht, *cursor, scanCb, NULL, privdata);
-    } while (*cursor && maxiterations-- && res.size() < count);
+      *cursor = dictScan(ht, *cursor, scanCb, NULL, private_data);
+    } while (*cursor && max_iterations-- && res.size() < count);
   }
 
   return res;
@@ -801,12 +877,22 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
     return (*cntx)->SendError("invalid cursor");
   }
 
-  if (args.size() > 3) {
-    return (*cntx)->SendError("scan options are not supported yet");
+  // HSCAN key cursor [MATCH pattern] [COUNT count]
+  if (args.size() > 7) {
+    DVLOG(1) << "got " << args.size() << " this is more than it should be";
+    return (*cntx)->SendError(kSyntaxErr);
   }
 
+  OpResult<ScanOpts> ops = ScanOpts::TryFrom(args);
+  if (!ops) {
+    DVLOG(1) << "HScan invalid args - return " << ops << " to the user";
+    return (*cntx)->SendError(ops.status());
+  }
+
+  ScanOpts scan_op = ops.value();
+
   auto cb = [&](Transaction* t, EngineShard* shard) {
-    return OpScan(t->GetOpArgs(shard), key, &cursor);
+    return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
   };
 
   OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
diff --git a/src/server/hset_family_test.cc b/src/server/hset_family_test.cc
index c1778be50..2084bb6de 100644
--- a/src/server/hset_family_test.cc
+++ b/src/server/hset_family_test.cc
@@ -118,4 +118,51 @@ TEST_F(HSetFamilyTest, HIncr) {
   EXPECT_THAT(resp, ErrArg("hash value is not an integer"));
 }
 
+TEST_F(HSetFamilyTest, HScan) {
+  for (int i = 0; i < 10; i++) {
+    Run({"HSET", "myhash", absl::StrCat("Field-", i), absl::StrCat("Value-", i)});
+  }
+
+  // Note that even though this limit by 4, it would return more because
+  // all fields are on listpack
+  auto resp = Run({"hscan", "myhash", "0", "count", "4"});
+  EXPECT_THAT(resp, ArrLen(2));
+  auto vec = StrArray(resp.GetVec()[1]);
+  EXPECT_EQ(vec.size(), 20);
+  EXPECT_THAT(vec, Each(AnyOf(StartsWith("Field"), StartsWith("Value"))));
+
+  // Now run with filter on the results - we are expecting to not getting
+  // any result at this point
+  resp = Run({"hscan", "myhash", "0", "match", "*x*"});  // nothing should match this
+  EXPECT_THAT(resp, ArrLen(2));
+  vec = StrArray(resp.GetVec()[1]);
+  EXPECT_EQ(vec.size(), 0);
+
+  // now we will do a positive match - anything that has 1 on it
+  resp = Run({"hscan", "myhash", "0", "match", "*1*"});
+  EXPECT_THAT(resp, ArrLen(2));
+  vec = StrArray(resp.GetVec()[1]);
+  EXPECT_EQ(vec.size(), 2);  // key/value = 2
+
+  // MATCH applies to field names only: the field "plain" does not match, so its
+  // value must not be returned even though the value itself contains a 1
+  Run({"HSET", "myhash", "plain", "has-1-inside"});
+  resp = Run({"hscan", "myhash", "0", "match", "*1*"});
+  EXPECT_THAT(resp, ArrLen(2));
+  vec = StrArray(resp.GetVec()[1]);
+  EXPECT_EQ(vec.size(), 2);  // still only Field-1/Value-1
+
+  // Test with large hash to see that count limit the number of entries
+  for (int i = 0; i < 200; i++) {
+    Run({"HSET", "largehash", absl::StrCat("KeyNum-", i), absl::StrCat("KeyValue-", i)});
+  }
+  resp = Run({"hscan", "largehash", "0", "count", "20"});
+  EXPECT_THAT(resp, ArrLen(2));
+  vec = StrArray(resp.GetVec()[1]);
+
+  // See https://redis.io/commands/scan/ --> "The COUNT option", for why this cannot be exact
+  EXPECT_GT(vec.size(), 40);  // This should be larger than (20 * 2) and less than about 50
+  EXPECT_LT(vec.size(), 60);
+}
+
 }  // namespace dfly