From bc92ace19c139c12177c03e60044e95366a5331d Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Thu, 14 Apr 2022 21:31:31 +0300
Subject: [PATCH] Add safe cursor API to dash table

---
 src/core/compact_object.cc      | 22 ++++++--------
 src/core/compact_object.h       |  2 ++
 src/core/dash.h                 | 29 +++++++++++++-----
 src/core/dash_internal.h        | 52 +++++++++++++++++++++++++++++----
 src/core/dash_test.cc           |  5 ++--
 src/core/external_alloc.cc      |  4 +++
 src/core/external_alloc.h       | 11 +++++++
 src/core/external_alloc_test.cc |  5 ++--
 src/server/debugcmd.cc          |  3 ++
 src/server/generic_family.cc    |  6 ++--
 src/server/snapshot.cc          |  8 ++---
 11 files changed, 109 insertions(+), 38 deletions(-)

diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index 4aa94c0d6..0de870fdf 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -45,7 +45,7 @@ size_t DictMallocSize(dict* d) {
   size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) +
                znallocx(sizeof(dict));
 
-  return res = dictSize(d) * 16;  // approximation.
+  return res + dictSize(d) * 16;  // approximation.
 }
 
 inline void FreeObjSet(unsigned encoding, void* ptr, pmr::memory_resource* mr) {
@@ -68,9 +68,9 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
       return 0;  // TODO
     case kEncodingIntSet:
       return intsetBlobLen((intset*)ptr);
-    default:
-      LOG(FATAL) << "Unknown set encoding type " << encoding;
   }
+
+  LOG(DFATAL) << "Unknown set encoding type " << encoding;
   return 0;
 }
 
@@ -80,9 +80,8 @@ size_t MallocUsedHSet(unsigned encoding, void* ptr) {
       return lpBytes(reinterpret_cast<uint8_t*>(ptr));
     case OBJ_ENCODING_HT:
       return DictMallocSize((dict*)ptr);
-    default:
-      LOG(FATAL) << "Unknown set encoding type " << encoding;
   }
+  LOG(DFATAL) << "Unknown set encoding type " << encoding;
   return 0;
 }
 
@@ -93,10 +92,9 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
     case OBJ_ENCODING_SKIPLIST: {
      zset* zs = (zset*)ptr;
       return DictMallocSize(zs->dict);
-    } break;
-    default:
-      LOG(FATAL) << "Unknown set encoding type " << encoding;
+    }
   }
+  LOG(DFATAL) << "Unknown set encoding type " << encoding;
   return 0;
 }
 
@@ -217,7 +215,7 @@ size_t RobjWrapper::MallocUsed() const {
       CHECK_EQ(OBJ_ENCODING_RAW, encoding_);
       return InnerObjMallocUsed();
     case OBJ_LIST:
-      CHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
+      DCHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
       return QlMAllocSize((quicklist*)inner_obj_);
     case OBJ_SET:
       return MallocUsedSet(encoding_, inner_obj_);
@@ -371,7 +369,6 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
 
   inner_obj_ = newp;
 }
 
-
 #pragma GCC push_options
 #pragma GCC optimize("Ofast")
@@ -694,8 +691,7 @@ void CompactObj::SetString(std::string_view str) {
     if (rev_len == str.size()) {
       mask |= ASCII2_ENC_BIT;  // str hits its highest bound.
     } else {
-      CHECK_EQ(str.size(), rev_len - 1)
-          << "Bad ascii encoding for len " << str.size();
+      CHECK_EQ(str.size(), rev_len - 1) << "Bad ascii encoding for len " << str.size();
 
       mask |= ASCII1_ENC_BIT;
     }
@@ -845,7 +841,7 @@ size_t CompactObj::MallocUsed() const {
     return u_.small_str.MallocUsed();
   }
 
-  LOG(FATAL) << "TBD";
+  LOG(DFATAL) << "should not reach";
   return 0;
 }
 
diff --git a/src/core/compact_object.h b/src/core/compact_object.h
index e6741bbad..cf2e8ebe4 100644
--- a/src/core/compact_object.h
+++ b/src/core/compact_object.h
@@ -225,6 +225,8 @@ class CompactObj {
   void SetString(std::string_view str);
   void GetString(std::string* res) const;
 
+  // In case this object owns a single blob, returns the number of bytes allocated on the heap
+  // for that blob. Otherwise returns 0.
   size_t MallocUsed() const;
 
   // Resets the object to empty state.
diff --git a/src/core/dash.h b/src/core/dash.h
index e176653f8..8f2695756 100644
--- a/src/core/dash.h
+++ b/src/core/dash.h
@@ -183,13 +183,21 @@ class DashTable : public detail::DashTableBase {
       return !(lhs == rhs);
     }
 
-    // debug accessors.
+    // Bucket resolution cursor that is safe to use with insertions/removals.
+    // It really serves as a hint to the placement of the original item, i.e. the item
+    // could have moved.
+    detail::DashCursor bucket_cursor() const {
+      return detail::DashCursor(owner_->global_depth_, seg_id_, bucket_id_);
+    }
+
     unsigned bucket_id() const {
       return bucket_id_;
     }
+
     unsigned slot_id() const {
       return slot_id_;
     }
+
     unsigned segment_id() const {
       return seg_id_;
     }
@@ -200,6 +208,7 @@ class DashTable : public detail::DashTableBase {
 
   using const_bucket_iterator = Iterator<true, true>;
   using bucket_iterator = Iterator<false, true>;
+  using cursor = detail::DashCursor;
 
   struct EvictionBuckets {
     bucket_iterator iter[2 + Policy::kStashBucketNum];
@@ -305,7 +314,7 @@ class DashTable : public detail::DashTableBase {
   // It guarantees that if key exists at the beginning of traversal, stays in the table during the
   // traversal, it will eventually reach it even when the table shrinks or grows.
   // Returns: cursor that is guaranteed to be less than 2^40.
-  template <typename Cb> uint64_t Traverse(uint64_t cursor, Cb&& cb);
+  template <typename Cb> cursor Traverse(cursor curs, Cb&& cb);
 
   // Takes an iterator pointing to an entry in a dash bucket and traverses all bucket's entries by
   // calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket.
@@ -315,6 +324,10 @@ class DashTable : public detail::DashTableBase {
     return const_bucket_iterator{it.owner_, it.seg_id_, it.bucket_id_, 0};
   }
 
+  const_bucket_iterator CursorToBucketIt(cursor c) const {
+    return const_bucket_iterator{this, c.segment_id(global_depth_), c.bucket_id(), 0};
+  }
+
   // Capture Version Change. Runs cb(it) on every bucket! (not entry) in the table whose version
   // would potentially change upon insertion of 'k'.
   // In practice traversal is limited to a single segment. The operation is read-only and
@@ -673,13 +686,13 @@ void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
 
 template <typename _Key, typename _Value, typename Policy>
 template <typename Cb>
-uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
-  unsigned bid = cursor & 0xFF;
-
-  if (bid >= kLogicalBucketNum)  // sanity.
+auto DashTable<_Key, _Value, Policy>::Traverse(cursor curs, Cb&& cb) -> cursor {
+  if (curs.bucket_id() >= kLogicalBucketNum)  // sanity.
     return 0;
 
-  uint32_t sid = cursor >> (40 - global_depth_);
+  uint32_t sid = curs.segment_id(global_depth_);
+  uint8_t bid = curs.bucket_id();
+
   auto hash_fun = [this](const auto& k) { return policy_.HashFn(k); };
 
   bool fetched = false;
@@ -700,7 +713,7 @@ uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
     }
   } while (!fetched);
 
-  return (uint64_t(sid) << (40 - global_depth_)) | bid;
+  return cursor{global_depth_, sid, bid};
 }
 
 template <typename _Key, typename _Value, typename Policy>
diff --git a/src/core/dash_internal.h b/src/core/dash_internal.h
index 1ea65de9e..ed24375c2 100644
--- a/src/core/dash_internal.h
+++ b/src/core/dash_internal.h
@@ -532,14 +532,14 @@ class DashTableBase {
 
  public:
   explicit DashTableBase(uint32_t gd)
-      : initial_depth_(gd), global_depth_(gd), unique_segments_(1 << gd) {
+      : unique_segments_(1 << gd), initial_depth_(gd), global_depth_(gd) {
   }
 
   uint32_t unique_segments() const {
     return unique_segments_;
   }
 
-  uint32_t depth() const {
+  uint16_t depth() const {
     return global_depth_;
   }
 
@@ -556,10 +556,10 @@ class DashTableBase {
     return 0;
   }
 
-  uint32_t initial_depth_;
-  uint32_t global_depth_;
-  uint32_t unique_segments_;
-  size_t size_ = 0;
+  size_t size_ = 0;
+  uint32_t unique_segments_;
+  uint8_t initial_depth_;
+  uint8_t global_depth_;
 };  // DashTableBase
 
 template <typename _Key, typename _Value> class IteratorPair {
@@ -579,6 +579,48 @@ template <typename _Key, typename _Value> class IteratorPair {
   _Value& second;
 };
 
+// Represents a cursor that points to a bucket in the dash table.
+// One major difference from an iterator is that the cursor survives dash table resizes and
+// will always point to the most appropriate segment for the same bucket.
+// It uses the 40 LSBs out of 64, assuming that the number of segments does not cross 4B.
+// It's a reasonable assumption in a shared-nothing architecture where we usually have no more
+// than 32GB per CPU. Each segment holds hundreds of entries, so we can not grow the segment table
+// to billions.
+class DashCursor {
+ public:
+  DashCursor(uint64_t val = 0) : val_(val) {
+  }
+
+  DashCursor(uint8_t depth, uint32_t seg_id, uint8_t bid)
+      : val_((uint64_t(seg_id) << (40 - depth)) | bid) {
+  }
+
+  uint8_t bucket_id() const {
+    return val_ & 0xFF;
+  }
+
+  // segment_id is padded to the left of the 32-bit region:
+  // | segment_id......| bucket_id |
+  // 40                8           0
+  // By using depth we take the most significant bits of segment_id if depth has decreased
+  // since the cursor was created, or extend the least significant bits with zeros if
+  // depth has increased.
+  uint32_t segment_id(uint8_t depth) {
+    return val_ >> (40 - depth);
+  }
+
+  uint64_t value() const {
+    return val_;
+  }
+
+  explicit operator bool() const {
+    return val_ != 0;
+  }
+
+ private:
+  uint64_t val_;
+};
+
 /***********************************************************
  * Implementation section.
  */
diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc
index 1f7f1026a..9fbead70b 100644
--- a/src/core/dash_test.cc
+++ b/src/core/dash_test.cc
@@ -425,7 +425,8 @@ TEST_F(DashTest, Traverse) {
   for (size_t i = 0; i < kNumItems; ++i) {
     dt_.Insert(i, i);
   }
-  uint64_t cursor = 0;
+
+  Dash64::cursor cursor;
   vector<uint64_t> nums;
   auto tr_cb = [&](Dash64::iterator it) {
     nums.push_back(it->first);
@@ -434,7 +435,7 @@ TEST_F(DashTest, Traverse) {
   do {
     cursor = dt_.Traverse(cursor, tr_cb);
-  } while (cursor != 0);
+  } while (cursor);
 
   sort(nums.begin(), nums.end());
   nums.resize(unique(nums.begin(), nums.end()) - nums.begin());
   ASSERT_EQ(kNumItems, nums.size());
diff --git a/src/core/external_alloc.cc b/src/core/external_alloc.cc
index 395af3e56..a9d67f3d2 100644
--- a/src/core/external_alloc.cc
+++ b/src/core/external_alloc.cc
@@ -265,6 +265,7 @@ int64_t ExternalAllocator::Malloc(size_t sz) {
   size_t pos = page->free_blocks._Find_first();
   page->free_blocks.flip(pos);
   --page->available;
+  allocated_bytes_ += ToBlockSize(page->block_size_bin);
 
   SegmentDescr* seg = ToSegDescr(page);
   return seg->BlockOffset(page, pos);
@@ -298,6 +299,7 @@ void ExternalAllocator::Free(size_t offset, size_t sz) {
   if (page->available == blocks_num) {
     FreePage(page, seg, block_size);
   }
+  allocated_bytes_ -= block_size;
 }
 
 void ExternalAllocator::AddStorage(size_t offset, size_t size) {
@@ -319,6 +321,8 @@ void ExternalAllocator::AddStorage(size_t offset, size_t size) {
   if (next != added_segs_.end()) {
     CHECK_LE(offset + size, next->first);
   }
+
+  capacity_ += size;
 }
 
 size_t ExternalAllocator::GoogSize(size_t sz) {
diff --git a/src/core/external_alloc.h b/src/core/external_alloc.h
index 46c7cc82d..c32ec9a43 100644
--- a/src/core/external_alloc.h
+++ b/src/core/external_alloc.h
@@ -69,6 +69,14 @@ class ExternalAllocator {
   // No allocation is done.
   static size_t GoogSize(size_t sz);
 
+  size_t capacity() const {
+    return capacity_;
+  }
+
+  size_t allocated_bytes() const {
+    return allocated_bytes_;
+  }
+
  private:
   class SegmentDescr;
   using Page = detail::Page;
@@ -88,6 +96,9 @@ class ExternalAllocator {
   // weird queue to support AddStorage interface. We can not instantiate segment
   // until we know its class and that we know only when a page is demanded.
   absl::btree_map added_segs_;
+
+  size_t capacity_ = 0;  // in bytes.
+  size_t allocated_bytes_ = 0;
 };
 
 }  // namespace dfly
diff --git a/src/core/external_alloc_test.cc b/src/core/external_alloc_test.cc
index 84506d007..53795de30 100644
--- a/src/core/external_alloc_test.cc
+++ b/src/core/external_alloc_test.cc
@@ -46,7 +46,6 @@ TEST_F(ExternalAllocatorTest, Invariants) {
   std::map ranges;
 
   int64_t res = 0;
-  size_t sum = 0;
   while (res >= 0) {
     for (unsigned j = 1; j < 5; ++j) {
       size_t sz = 4000 * j;
@@ -55,10 +54,10 @@ TEST_F(ExternalAllocatorTest, Invariants) {
         break;
       auto [it, added] = ranges.emplace(res, sz);
       ASSERT_TRUE(added);
-      sum += sz;
     }
   }
-  EXPECT_GT(sum, kSegSize / 2);
+
+  EXPECT_GT(ext_alloc_.allocated_bytes(), ext_alloc_.capacity() * 0.75);
 
   off_t last = 0;
   for (const auto& k_v : ranges) {
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 96b550f66..7f293e3ae 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -124,6 +124,7 @@ void DebugCmd::Reload(CmdArgList args) {
     CHECK_NOTNULL(cid);
     intrusive_ptr<Transaction> trans(new Transaction{cid, &ess});
     trans->InitByArgs(0, {});
+    VLOG(1) << "Performing save";
     ec = sf_.DoSave(trans.get(), &err_details);
     if (ec) {
       return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
     }
@@ -133,6 +134,7 @@ void DebugCmd::Reload(CmdArgList args) {
     const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
     intrusive_ptr<Transaction> flush_trans(new Transaction{cid, &ess});
     flush_trans->InitByArgs(0, {});
+    VLOG(1) << "Performing flush";
     ec = sf_.DoFlush(flush_trans.get(), DbSlice::kDbAll);
     if (ec) {
       LOG(ERROR) << "Error flushing db " << ec.message();
     }
@@ -154,6 +156,7 @@ void DebugCmd::Reload(CmdArgList args) {
     return;
   }
 
+  VLOG(1) << "Performing load";
   io::FileSource fs(*res);
 
   RdbLoader loader(&ess);
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index 94d71feb7..a8679567d 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -642,15 +642,15 @@ void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_vi
   VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
           << db_slice.DbSize(op_args.db_ind);
 
-  uint64_t cur = *cursor;
+  PrimeTable::cursor cur = *cursor;
   auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
   do {
     cur = prime_table->Traverse(
         cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, pattern, type_filter, vec); });
   } while (cur && cnt < limit);
 
-  VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur;
-  *cursor = cur;
+  VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
+  *cursor = cur.value();
 }
 
 bool GenericFamily::ScanCb(const OpArgs& op_args, PrimeIterator it, string_view pattern,
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 10f982deb..c8015ab3b 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -71,7 +71,6 @@ void SliceSnapshot::Join() {
 static_assert(sizeof(PrimeTable::const_iterator) == 16);
 
 void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
-  uint64_t expire_time = 0;
 
   if (it->second.HasExpire()) {
     auto eit = expire_tbl_->Find(it->first);
@@ -86,7 +85,7 @@ void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
 void SliceSnapshot::FiberFunc() {
   this_fiber::properties().set_name(
       absl::StrCat("SliceSnapshot", ProactorBase::GetIndex()));
-  uint64_t cursor = 0;
+  PrimeTable::cursor cursor;
 
   static_assert(PHYSICAL_LEN > PrimeTable::kPhysicalBucketNum);
   uint64_t last_yield = 0;
@@ -94,7 +93,8 @@ void SliceSnapshot::FiberFunc() {
     // Traverse a single logical bucket but do not update its versions.
     // we can not update a version because entries in the same bucket share part of the version.
     // Therefore we save first, and then update version in one atomic swipe.
-    uint64_t next = prime_table_->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
+    PrimeTable::cursor next =
+        prime_table_->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
     cursor = next;
 
     physical_mask_.reset();
@@ -109,7 +109,7 @@ void SliceSnapshot::FiberFunc() {
       // flush in case other fibers (writes commands that pushed previous values) filled the file.
       FlushSfile(false);
     }
-  } while (cursor > 0);
+  } while (cursor);
 
   DVLOG(1) << "after loop " << this_fiber::properties().name();
   FlushSfile(true);
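
The cursor-based Traverse API above is driven the same way at every call site the patch touches (the Traverse test in dash_test.cc, GenericFamily::OpScan, SliceSnapshot::FiberFunc): start from a value-initialized cursor, feed the returned cursor back in, and stop once it converts to false. Below is a minimal sketch of that loop, assuming the repository's include path; CountEntries is a hypothetical helper name, DT stands for any DashTable instantiation, and only members visible in this diff (cursor, iterator, Traverse) are used.

#include <cstddef>

#include "core/dash.h"  // assumed include path for DashTable

// Walks the whole table bucket by bucket. A value-initialized cursor means "start of
// the table"; Traverse returns the cursor of the next bucket to visit and a zero
// (false) cursor once the traversal has wrapped around. Because the cursor survives
// insertions, deletions and resizes, the loop may yield between iterations without
// losing its place, which is what SliceSnapshot::FiberFunc relies on.
template <typename DT> size_t CountEntries(DT* table) {
  size_t count = 0;
  typename DT::cursor cur;
  do {
    cur = table->Traverse(cur, [&](typename DT::iterator) { ++count; });
  } while (cur);
  return count;
}

Because DashCursor also converts to and from a plain uint64_t (via the single-argument constructor and value()), call sites such as OpScan can keep persisting the cursor as an integer between SCAN invocations, which is exactly what the generic_family.cc hunk does.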
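
The DashCursor comment explains how segment_id() rescales a stored segment id when the table's global depth changes. The following is a small standalone check of that bit arithmetic, under the assumption that DashCursor is reachable through the include path and namespace shown below (dfly::detail), which this diff does not state explicitly.

#include <cstdio>

#include "core/dash_internal.h"  // assumed include path for detail::DashCursor

int main() {
  using dfly::detail::DashCursor;  // assumed namespace

  // Cursor taken for segment 5, bucket 3 while the table had global depth 10:
  // val_ = (5 << 30) | 3.
  DashCursor c(10, 5, 3);

  std::printf("same depth: segment %u, bucket %u\n", c.segment_id(10), unsigned(c.bucket_id()));

  // Depth shrank to 9: only the high bits of the segment id survive, 5 >> 1 == 2,
  // i.e. the segment that now covers the original bucket.
  std::printf("depth 9:    segment %u\n", c.segment_id(9));

  // Depth grew to 11: the id is extended with zero bits, 5 << 1 == 10, the first of
  // the two segments the original segment was split into.
  std::printf("depth 11:   segment %u\n", c.segment_id(11));
  return 0;
}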