From b36c16b314226d469636de8ef51b40d264d28905 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 28 Apr 2022 19:05:51 +0300 Subject: [PATCH] Implement single shard use-case for rpoplpush. Some BLPOP related refactoring --- TODO.md | 19 +++- src/server/db_slice.cc | 2 +- src/server/db_slice.h | 4 +- src/server/list_family.cc | 182 ++++++++++++++++++++++++++++++-------- src/server/list_family.h | 3 +- src/server/transaction.cc | 74 ---------------- src/server/transaction.h | 13 --- 7 files changed, 167 insertions(+), 130 deletions(-) diff --git a/TODO.md b/TODO.md index ee616d205..4d2e29881 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,18 @@ -1. To move lua_project to dragonfly from helio +1. To move lua_project to dragonfly from helio (DONE) 2. To limit lua stack to something reasonable like 4096. -3. To inject our own allocator to lua to track its memory. \ No newline at end of file +3. To inject our own allocator to lua to track its memory. + + +## Object lifecycle and thread-safety. + +Currently our transactional and locking model is based on an assumption that any READ or WRITE +access to objects must be performed in a shard where they belong. + +However, this assumption can be relaxed to get significant gains for read-only queries. + +### Explanation +Our transactional framework prevents READ-locked objects from being mutated. It does not prevent their PrimaryTable from growing or changing, of course. These objects can move to different entries inside the table. However, our CompactObject maintains the following property - its reference CompactObject.AsRef() is valid no matter where the master object moves and it's valid and safe for reading even from other threads. The exception regarding thread safety is SmallString which uses a translation table for its pointers. 
+ +If we change the SmallString translation table to be global and thread-safe (it should not have lots of write contention anyway) we may access primetable keys and values from another thread and write them directly to sockets. + +Use-case: large strings that need to be copied. Sets that need to be serialized for SMEMBERS/HGETALL commands etc. Additional complexity - we will need to lock those variables even for single hop transactions and unlock them afterwards. The unlocking hop does not need to increase user-visible latency since it can be done after we send reply to the socket. \ No newline at end of file diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index bbcc6fcd2..afedaa96d 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -202,7 +202,7 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view return make_pair(it, ExpireIterator{}); } -OpResult> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) { +OpResult> DbSlice::FindFirst(DbIndex db_index, ArgSlice args) { DCHECK(!args.empty()); for (unsigned i = 0; i < args.size(); ++i) { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 53ae8c472..3f6d01c69 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -124,9 +124,9 @@ class DbSlice { // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired. std::pair FindExt(DbIndex db_ind, std::string_view key) const; - // Returns dictEntry, args-index if found, KEY_NOTFOUND otherwise. + // Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise. // If multiple keys are found, returns the first index in the ArgSlice. - OpResult> FindFirst(DbIndex db_index, const ArgSlice& args); + OpResult> FindFirst(DbIndex db_index, ArgSlice args); // Return .second=true if insertion ocurred, false if we return the existing key. 
std::pair AddOrFind(DbIndex db_ind, std::string_view key); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 1563cf984..261d8919b 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -114,6 +114,75 @@ bool ElemCompare(const quicklistEntry& entry, string_view elem) { return elem == an.Piece(); } +using FFResult = pair; // key, argument index. + +struct ShardFFResult { + PrimeKey key; + ShardId sid = kInvalidSid; +}; + +OpResult FindFirst(Transaction* trans) { + VLOG(2) << "FindFirst::Find " << trans->DebugId(); + + // Holds Find results: (iterator to a found key, and its index in the passed arguments). + // See DbSlice::FindFirst for more details. + // spans all the shards for now. + std::vector> find_res(trans->shard_set()->size()); + fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); + + auto cb = [&find_res](auto* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + OpResult> ff_res = + shard->db_slice().FindFirst(t->db_index(), args); + + if (ff_res) { + FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); + find_res[shard->shard_id()] = move(ff_result); + } else { + find_res[shard->shard_id()] = ff_res.status(); + } + return OpStatus::OK; + }; + + trans->Execute(move(cb), false); + + uint32_t min_arg_indx = UINT32_MAX; + + ShardFFResult shard_result; + + for (size_t sid = 0; sid < find_res.size(); ++sid) { + const auto& fr = find_res[sid]; + auto status = fr.status(); + if (status == OpStatus::KEY_NOTFOUND) + continue; + + if (status == OpStatus::WRONG_TYPE) { + return status; + } + + CHECK(fr); + + const auto& it_pos = fr.value(); + + size_t arg_indx = trans->ReverseArgIndex(sid, it_pos.second); + if (arg_indx < min_arg_indx) { + min_arg_indx = arg_indx; + shard_result.sid = sid; + + // we do not dereference the key, do not extract the string value, so it is
We can not dereference it due to limitations of SmallString + // that rely on thread-local data-structure for pointer translation. + shard_result.key = it_pos.first.AsRef(); + } + } + + if (shard_result.sid == kInvalidSid) { + return OpStatus::KEY_NOTFOUND; + } + + return OpResult{move(shard_result)}; +} + class BPopper { public: explicit BPopper(ListDir dir); @@ -122,22 +191,18 @@ class BPopper { // If OK is returned then use result() to fetch the value. OpStatus Run(Transaction* t, unsigned msec); + + // returns (key, value) pair. auto result() const { return make_pair(key_, value_); } - bool found() const { - return found_; - } - private: OpStatus Pop(Transaction* t, EngineShard* shard); ListDir dir_; - bool found_ = false; - PrimeIterator find_it_; - ShardId find_sid_ = std::numeric_limits::max(); + ShardFFResult ff_result_; string key_; string value_; @@ -158,7 +223,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { auto* stats = ServerState::tl_connection_stats(); - OpResult result = t->FindFirst(); + OpResult result = FindFirst(t); if (result.status() == OpStatus::KEY_NOTFOUND) { if (is_multi) { @@ -169,6 +234,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { return OpStatus::TIMED_OUT; } + // Block ++stats->num_blocked_clients; bool wait_succeeded = t->WaitOnWatch(tp); --stats->num_blocked_clients; @@ -176,7 +242,8 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { if (!wait_succeeded) return OpStatus::TIMED_OUT; - result = t->FindFirst(); // retry - must find something. + // Now we have something for sure. + result = FindFirst(t); // retry - must find something. 
} if (!result) { @@ -185,9 +252,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { } VLOG(1) << "Popping an element"; - find_sid_ = result->sid; - find_it_ = result->find_res; - found_ = true; + ff_result_ = move(result.value()); auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); }; t->Execute(std::move(cb), true); @@ -196,18 +261,20 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { } OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { - DCHECK(found()); + if (shard->shard_id() == ff_result_.sid) { + ff_result_.key.GetString(&key_); - if (shard->shard_id() == find_sid_) { - find_it_->first.GetString(&key_); - - quicklist* ql = GetQL(find_it_->second); + auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST); + CHECK(it_res); // must exist and must be ok. + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); value_ = ListPop(dir_, ql); if (quicklistCount(ql) == 0) { - CHECK(shard->db_slice().Del(t->db_index(), find_it_)); + CHECK(shard->db_slice().Del(t->db_index(), it)); } } + return OpStatus::OK; } @@ -242,9 +309,10 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { string_view dest = ArgS(args, 2); OpResult result; - if (dest == src) { + + if (cntx->transaction->unique_shard_cnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRPopLPushSingleKey(OpArgs{shard, t->db_index()}, src); + return OpRPopLPushSingleShard(OpArgs{shard, t->db_index()}, src, dest); }; result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -446,11 +514,12 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn OpStatus result = popper.Run(transaction, unsigned(timeout * 1000)); if (result == OpStatus::OK) { - CHECK(popper.found()); - VLOG(1) << "BLPop returned "; - auto res = popper.result(); + + VLOG(1) << "BLPop returned from " << res.first; // key. 
+ std::string_view str_arr[2] = {res.first, res.second}; + return (*cntx)->SendStringArr(str_arr); } @@ -550,7 +619,7 @@ OpResult ListFamily::OpPush(const OpArgs& op_args, std::string_view ke } else { tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key); } - quicklist* ql; + quicklist* ql = nullptr; if (new_key) { robj* o = createQuicklistObject(); @@ -572,10 +641,12 @@ OpResult ListFamily::OpPush(const OpArgs& op_args, std::string_view ke quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); } - if (new_key && es->blocking_controller()) { - string tmp; - string_view key = it->first.GetSlice(&tmp); - es->blocking_controller()->AwakeWatched(op_args.db_ind, key); + if (new_key) { + if (es->blocking_controller()) { + string tmp; + string_view key = it->first.GetSlice(&tmp); + es->blocking_controller()->AwakeWatched(op_args.db_ind, key); + } } else { es->db_slice().PostUpdate(op_args.db_ind, it); } @@ -811,17 +882,54 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view return str_vec; } -OpResult ListFamily::OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key) { +OpResult ListFamily::OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, + string_view dest) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); - if (!it_res) - return it_res.status(); + auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST); + if (!src_res) + return src_res.status(); + + PrimeIterator src_it = *src_res; + quicklist* src_ql = GetQL(src_it->second); + + if (src == dest) { // simple case. 
+ db_slice.PreUpdate(op_args.db_ind, src_it); + string val = ListPop(ListDir::RIGHT, src_ql); + + quicklistPushHead(src_ql, val.data(), val.size()); + db_slice.PostUpdate(op_args.db_ind, src_it); + + return val; + } + + quicklist* dest_ql = nullptr; + auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest); + if (created) { + robj* obj = createQuicklistObject(); + dest_ql = (quicklist*)obj->ptr; + quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth); + dest_it->second.ImportRObj(obj); + + // Insertion of dest could invalidate src_it. Find it again. + src_it = db_slice.GetTables(op_args.db_ind).first->Find(src); + } else { + if (dest_it->second.ObjType() != OBJ_LIST) + return OpStatus::WRONG_TYPE; + + dest_ql = GetQL(dest_it->second); + db_slice.PreUpdate(op_args.db_ind, dest_it); + } + + db_slice.PreUpdate(op_args.db_ind, src_it); + string val = ListPop(ListDir::RIGHT, src_ql); + quicklistPushHead(dest_ql, val.data(), val.size()); + db_slice.PostUpdate(op_args.db_ind, src_it); + db_slice.PostUpdate(op_args.db_ind, dest_it); + + if (quicklistCount(src_ql) == 0) { + CHECK(db_slice.Del(op_args.db_ind, src_it)); + } - PrimeIterator it = *it_res; - quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); - string val = ListPop(ListDir::RIGHT, ql); - quicklistPushHead(ql, val.data(), val.size()); return val; } diff --git a/src/server/list_family.h b/src/server/list_family.h index 25aaf15fa..c32f75a77 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -63,7 +63,8 @@ class ListFamily { static OpResult OpRange(const OpArgs& op_args, std::string_view key, long start, long end); - static OpResult OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key); + static OpResult OpRPopLPushSingleShard(const OpArgs& op_args, std::string_view src, + std::string_view dest); }; } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 8a206585f..ceb244049 
100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -27,72 +27,6 @@ std::atomic_uint64_t op_seq{1}; } // namespace -struct Transaction::FindFirstProcessor { - public: - FindFirstProcessor(TxId notify, unsigned size) - : find_res_(size, OpStatus::KEY_NOTFOUND), notify_txid_(notify) { - } - - void Find(Transaction* t); - - OpResult Process(Transaction* t); - - private: - OpStatus RunInShard(Transaction* t, EngineShard* shard); - - // Holds Find results: (iterator to a found key, and its index in the passed arguments). - // See DbSlice::FindFirst for more details. - // spans all the shards for now. - std::vector>> find_res_; - TxId notify_txid_; -}; - -void Transaction::FindFirstProcessor::Find(Transaction* t) { - VLOG(2) << "FindFirst::Find " << t->DebugId(); - - t->Execute([this](auto* t, auto* s) { return RunInShard(t, s); }, false); -} - -OpStatus Transaction::FindFirstProcessor::RunInShard(Transaction* t, EngineShard* shard) { - if (notify_txid_ == kuint64max || shard->committed_txid() == notify_txid_) { - // TODO: to add timestamp logic that provides consistency guarantees for blocking transactions. 
- auto args = t->ShardArgsInShard(shard->shard_id()); - find_res_[shard->shard_id()] = shard->db_slice().FindFirst(t->db_index(), args); - } - return OpStatus::OK; -} - -OpResult Transaction::FindFirstProcessor::Process(Transaction* t) { - uint32_t min_arg_indx = UINT32_MAX; - - FindFirstResult result; - for (size_t sid = 0; sid < find_res_.size(); ++sid) { - const auto& fr = find_res_[sid]; - auto status = fr.status(); - if (status == OpStatus::KEY_NOTFOUND) - continue; - - if (status == OpStatus::WRONG_TYPE) { - return status; - } - - DCHECK(fr && IsValid(fr->first)); - const auto& it_pos = fr.value(); - - size_t arg_indx = t->ReverseArgIndex(sid, it_pos.second); - if (arg_indx < min_arg_indx) { - min_arg_indx = arg_indx; - result.sid = sid; - result.find_res = it_pos.first; - } - } - - if (result.sid == kInvalidSid) { - return OpStatus::KEY_NOTFOUND; - } - return result; -} - IntentLock::Mode Transaction::Mode() const { return (cid_->opt_mask() & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE; } @@ -1194,12 +1128,4 @@ void Transaction::BreakOnClose() { } } -auto Transaction::FindFirst() -> OpResult { - FindFirstProcessor processor(notify_txid_.load(memory_order_relaxed), ess_->size()); - - processor.Find(this); - - return processor.Process(this); -} - } // namespace dfly diff --git a/src/server/transaction.h b/src/server/transaction.h index e31fd4974..4ffb84177 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -186,17 +186,6 @@ class Transaction { //! Runs in the shard thread. KeyLockArgs GetLockArgs(ShardId sid) const; - // TODO: iterators do not survive between hops. - // It could happen that FindFirst returns a result but then a different transaction - // grows the table and invalidates find_res. We should return a key, unfortunately, - // and not the iterator. 
- struct FindFirstResult { - PrimeIterator find_res; - ShardId sid = kInvalidSid; - }; - - OpResult FindFirst(); - private: unsigned SidToId(ShardId sid) const { return sid < shard_data_.size() ? sid : 0; @@ -243,8 +232,6 @@ class Transaction { return use_count_.load(std::memory_order_relaxed); } - struct FindFirstProcessor; - struct PerShardData { uint32_t arg_start = 0; // Indices into args_ array. uint16_t arg_count = 0;