fix(server): Fix #207 (#208)

1. Erase expiry data from the expire table in case of evictions.
2. Add test that covers the bug.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-07-15 15:14:20 +03:00 committed by GitHub
parent d2b7987ac3
commit 05eb323a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 52 additions and 5 deletions

View File

@ -1175,7 +1175,7 @@ sds getMemoryDoctorReport(void) {
return s;
}
#endif
/* Set the object LRU/LFU depending on server.maxmemory_policy.
* The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU.
@ -1210,3 +1210,4 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
return 0;
}
#endif

View File

@ -124,6 +124,10 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
auto last_slot_it = bucket_it;
last_slot_it += (PrimeTable::kBucketWidth - 1);
if (!last_slot_it.is_done()) {
if (last_slot_it->second.HasExpire()) {
ExpireTable* expire_tbl = db_slice_->GetTables(db_indx_).second;
CHECK_EQ(1u, expire_tbl->Erase(last_slot_it->first));
}
UpdateStatsOnDeletion(last_slot_it, db_slice_->MutableStats(db_indx_));
}
CHECK(me->ShiftRight(bucket_it));
@ -446,7 +450,6 @@ bool DbSlice::UpdateExpire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
if (!it->second.HasExpire() && at) {
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
it->second.SetExpire(true);

View File

@ -237,6 +237,10 @@ class DbSlice {
return db_arr_;
}
void TEST_EnableCacheMode() {
caching_mode_ = 1;
}
private:
void CreateDb(DbIndex index);

View File

@ -7,6 +7,7 @@ extern "C" {
#include "redis/zmalloc.h"
}
#include <absl/flags/reflection.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_join.h>
#include <absl/strings/strip.h>
@ -444,7 +445,7 @@ TEST_F(DflyEngineTest, OOM) {
max_memory_limit = 0;
size_t i = 0;
RespExpr resp;
for (; i < 10000; i += 3) {
for (; i < 5000; i += 3) {
resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2),
"bar"});
if (resp != "OK")
@ -464,7 +465,7 @@ TEST_F(DflyEngineTest, OOM) {
}
run_args.push_back("bar");
for (unsigned i = 0; i < 10000; ++i) {
for (unsigned i = 0; i < 5000; ++i) {
run_args[1] = StrCat("key", cmd, i);
resp = Run(run_args);
@ -477,6 +478,27 @@ TEST_F(DflyEngineTest, OOM) {
}
}
/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
shard_set->TEST_EnableHeartBeat();
shard_set->TEST_EnableCacheMode();
max_memory_limit = 0;
ssize_t i = 0;
RespExpr resp;
for (; i < 5000; ++i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
// we evict some items because 5000 is too much when max_memory_limit is zero.
ASSERT_EQ(resp, "OK");
}
for (; i > 0; --i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
}
}
TEST_F(DflyEngineTest, PSubscribe) {
single_response_ = false;
auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); });

View File

@ -381,4 +381,8 @@ void EngineShardSet::TEST_EnableHeartBeat() {
RunBriefInParallel([](EngineShard* shard) { shard->TEST_EnableHeartbeat(); });
}
void EngineShardSet::TEST_EnableCacheMode() {
RunBriefInParallel([](EngineShard* shard) { shard->db_slice().TEST_EnableCacheMode(); });
}
} // namespace dfly

View File

@ -226,6 +226,7 @@ class EngineShardSet {
// Used in tests
void TEST_EnableHeartBeat();
void TEST_EnableCacheMode();
private:
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);

View File

@ -501,6 +501,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
builder->SendSetSkipped();
}
/// (P)SETEX key seconds value
void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view ex = ArgS(args, 2);

View File

@ -10,6 +10,9 @@ namespace dfly {
#define ADD(x) (x) += o.x
// It should be const, but we override this variable in our tests so that they run faster.
unsigned kInitSegmentLog = 3;
DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
constexpr size_t kDbSz = sizeof(DbTableStats);
static_assert(kDbSz == 56);
@ -26,7 +29,7 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
}
DbTable::DbTable(std::pmr::memory_resource* mr)
: prime(2, detail::PrimeTablePolicy{}, mr),
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr) {
}

View File

@ -71,6 +71,9 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
void Release(IntentLock::Mode mode, std::string_view key, unsigned count);
};
// We use reference counting semantics of DbTable when doing snapshotting.
// There we need to preserve the copy of the table in case someone flushes it during
// the snapshot process. We copy the pointers in StartSnapshotInShard function.
using DbTableArray = std::vector<boost::intrusive_ptr<DbTable>>;
} // namespace dfly

View File

@ -23,6 +23,9 @@ using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);
namespace dfly {
extern unsigned kInitSegmentLog;
using MP = MemcacheParser;
using namespace util;
using namespace testing;
@ -113,6 +116,8 @@ BaseFamilyTest::~BaseFamilyTest() {
}
void BaseFamilyTest::SetUpTestSuite() {
kInitSegmentLog = 1;
absl::SetFlag(&FLAGS_dbfilename, "");
init_zmalloc_threadlocal(mi_heap_get_backing());
}