Fix bugs related to concurrency when saving multiple databases under write load (#82)

Fix bugs related to concurrency when saving multiple databases under write load
This commit is contained in:
Roman Gershman 2022-06-03 00:52:38 +03:00 committed by GitHub
parent ec9754150f
commit 89baa5bfa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 90 additions and 64 deletions

View File

@@ -32,6 +32,9 @@ using facade::ArgS;
using ArgSlice = absl::Span<const std::string_view>;
using StringVec = std::vector<std::string>;
// keys are RDB_TYPE_xxx constants.
using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
constexpr DbIndex kInvalidDbId = DbIndex(-1);
constexpr ShardId kInvalidSid = ShardId(-1);
constexpr DbIndex kMaxDbId = 1024; // Reasonable starting point.

View File

@@ -172,14 +172,17 @@ error_code RdbSerializer::SelectDb(uint32_t dbid) {
}
// Called by snapshot
error_code RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms) {
io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv,
uint64_t expire_ms) {
uint8_t buf[16];
error_code ec;
/* Save the expire time */
if (expire_ms > 0) {
buf[0] = RDB_OPCODE_EXPIRETIME_MS;
absl::little_endian::Store64(buf + 1, expire_ms);
RETURN_ON_ERR(WriteRaw(Bytes{buf, 9}));
ec = WriteRaw(Bytes{buf, 9});
if (ec)
return make_unexpected(ec);
}
string_view key = pk.GetSlice(&tmp_str_);
@@ -189,20 +192,28 @@ error_code RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv, ui
DVLOG(3) << "Saving keyval start " << key;
++type_freq_map_[rdb_type];
RETURN_ON_ERR(WriteOpcode(rdb_type));
ec = WriteOpcode(rdb_type);
if (ec)
return make_unexpected(ec);
RETURN_ON_ERR(SaveString(key));
ec = SaveString(key);
if (ec)
return make_unexpected(ec);
if (obj_type == OBJ_STRING) {
auto opt_int = pv.TryGetInt();
if (opt_int) {
return SaveLongLongAsString(*opt_int);
ec = SaveLongLongAsString(*opt_int);
} else {
ec = SaveString(pv.GetSlice(&tmp_str_));
}
return SaveString(pv.GetSlice(&tmp_str_));
} else {
ec = SaveObject(pv);
}
return SaveObject(pv);
if (ec)
return make_unexpected(ec);
return rdb_type;
}
error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
@@ -662,7 +673,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
if (freq_map) {
freq_map->clear();
for (auto& ptr : impl_->shard_snapshots) {
const RdbTypeFreqMap& src_map = ptr->serializer()->type_freq_map();
const RdbTypeFreqMap& src_map = ptr->freq_map();
for (const auto& k_v : src_map)
(*freq_map)[k_v.first] += k_v.second;
}

View File

@@ -20,9 +20,6 @@ namespace dfly {
class EngineShard;
// keys are RDB_TYPE_xxx constants.
using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class AlignedBuffer {
public:
AlignedBuffer(size_t cap, ::io::Sink* upstream);
@@ -72,7 +69,6 @@ class RdbSaver {
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);
AlignedBuffer aligned_buf_;
std::unique_ptr<Impl> impl_;
};
@@ -98,7 +94,8 @@ class RdbSerializer {
std::error_code SelectDb(uint32_t dbid);
// Must be called in the thread to which `it` belongs.
std::error_code SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
// Returns the serialized rdb_type or the error.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val);
@@ -110,10 +107,6 @@ class RdbSerializer {
std::error_code FlushMem();
const RdbTypeFreqMap& type_freq_map() const {
return type_freq_map_;
}
private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const PrimeValue& pv);
@@ -132,7 +125,6 @@ class RdbSerializer {
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
std::string tmp_str_;
RdbTypeFreqMap type_freq_map_;
};
} // namespace dfly

View File

@@ -55,8 +55,10 @@ void SliceSnapshot::Join() {
fb_.join();
}
void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
const PrimeValue& pv) {
// This function should not block and should not preempt because it's called
// from SerializePhysicalBucket which should execute atomically.
void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
RdbSerializer* serializer) {
time_t expire_time = 0;
if (pv.HasExpire()) {
@@ -64,29 +66,9 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk,
expire_time = db_slice_->ExpireTime(eit);
}
if (db_indx != savecb_current_db_) {
FlushSfile(true);
}
error_code ec = rdb_serializer_->SaveEntry(pk, pv, expire_time);
CHECK(!ec); // we write to StringFile.
++num_records_in_blob_;
if (db_indx != savecb_current_db_) {
ec = rdb_serializer_->FlushMem();
CHECK(!ec && !sfile_->val.empty());
string tmp = std::move(sfile_->val);
channel_bytes_ += tmp.size();
DCHECK(!dest_->IsClosing());
DbRecord rec{.db_index = db_indx,
.id = rec_id_,
.num_records = num_records_in_blob_,
.value = std::move(tmp)};
DVLOG(2) << "Pushed " << rec_id_;
++rec_id_;
num_records_in_blob_ = 0;
dest_->Push(std::move(rec));
}
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time);
CHECK(res); // we write to StringFile.
++type_freq_map_[*res];
}
// Serializes all the entries with version less than snapshot_version_.
@@ -103,12 +85,11 @@ void SliceSnapshot::FiberFunc() {
VLOG(1) << "Start traversing " << pt->size() << " items";
uint64_t last_yield = 0;
mu_.lock();
savecb_current_db_ = db_indx;
mu_.unlock();
do {
// Traverse a single logical bucket but do not update its versions.
// we can not update a version because entries in the same bucket share part of the version.
// Therefore we save first, and then update version in one atomic swipe.
PrimeTable::cursor next = pt->Traverse(cursor, [this](auto it) { this->SaveCb(move(it)); });
cursor = next;
@@ -131,6 +112,10 @@ void SliceSnapshot::FiberFunc() {
FlushSfile(true);
} // for (dbindex)
// stupid barrier to make sure that SerializePhysicalBucket finished.
// Can not think of anything more elegant.
mu_.lock();
mu_.unlock();
dest_->StartClosing();
VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
@@ -158,9 +143,9 @@ bool SliceSnapshot::FlushSfile(bool force) {
string tmp = std::move(sfile_->val); // important to move before pushing!
channel_bytes_ += tmp.size();
DbRecord rec{.db_index = savecb_current_db_,
.id = rec_id_,
.num_records = num_records_in_blob_,
.value = std::move(tmp)};
.id = rec_id_,
.num_records = num_records_in_blob_,
.value = std::move(tmp)};
DVLOG(2) << "Pushed " << rec_id_;
++rec_id_;
num_records_in_blob_ = 0;
@@ -193,7 +178,7 @@ bool SliceSnapshot::SaveCb(PrimeIterator it) {
uint64_t v = it.GetVersion();
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(2) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++skipped_;
return false;
@@ -226,13 +211,38 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu
// traverse physical bucket and write it into string file.
it.SetVersion(snapshot_version_);
unsigned result = 0;
string tmp;
while (!it.is_done()) {
++result;
SerializeSingleEntry(db_index, it->first, it->second);
++it;
}
lock_guard lk(mu_);
if (db_index == savecb_current_db_) {
while (!it.is_done()) {
++result;
SerializeSingleEntry(db_index, it->first, it->second, rdb_serializer_.get());
++it;
}
num_records_in_blob_ += result;
} else {
io::StringFile sfile;
RdbSerializer tmp_serializer(&sfile);
while (!it.is_done()) {
++result;
SerializeSingleEntry(db_index, it->first, it->second, &tmp_serializer);
++it;
}
error_code ec = tmp_serializer.FlushMem();
CHECK(!ec && !sfile.val.empty());
string tmp = std::move(sfile.val);
channel_bytes_ += tmp.size();
DbRecord rec{
.db_index = db_index, .id = rec_id_, .num_records = result, .value = std::move(tmp)};
DVLOG(2) << "Pushed " << rec_id_;
++rec_id_;
dest_->Push(std::move(rec));
}
return result;
}

View File

@@ -7,8 +7,8 @@
#include <bitset>
#include "io/file.h"
#include "server/table.h"
#include "server/db_slice.h"
#include "server/table.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
@@ -39,14 +39,23 @@ class SliceSnapshot {
return snapshot_version_;
}
RdbSerializer* serializer() { return rdb_serializer_.get(); }
RdbSerializer* serializer() {
return rdb_serializer_.get();
}
size_t channel_bytes() const { return channel_bytes_;}
size_t channel_bytes() const {
return channel_bytes_;
}
const RdbTypeFreqMap& freq_map() const {
return type_freq_map_;
}
private:
void FiberFunc();
bool FlushSfile(bool force);
void SerializeSingleEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);
void SerializeSingleEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
RdbSerializer* serializer);
bool SaveCb(PrimeIterator it);
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
@@ -58,17 +67,18 @@
::boost::fibers::fiber fb_;
DbTableArray db_array_;
RdbTypeFreqMap type_freq_map_;
std::unique_ptr<io::StringFile> sfile_;
std::unique_ptr<RdbSerializer> rdb_serializer_;
boost::fibers::mutex mu_;
// version upper bound for entries that should be saved (not included).
uint64_t snapshot_version_ = 0;
DbSlice* db_slice_;
DbIndex savecb_current_db_; // used by SaveCb
RecordChannel* dest_;
size_t channel_bytes_ = 0;
::size_t channel_bytes_ = 0;
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
uint64_t rec_id_ = 0;
uint32_t num_records_in_blob_ = 0;