chore: remove atomic<> from ReplicaInfo::state (#2409)

* chore: remove atomic<> from ReplicaInfo::state

This field is protected by ReplicaInfo::mu so non-protected access to it shows a design problem.
Indeed, it was done for being able to access this field without a mutex inside ReplicationLags() function.

I moved the access to this field to GetReplicasRoleInfo where we need to lock ReplicaRoleInfo anyways.
Also, done some cleanups in the file.

Finally, raised a threshold for "tx queue too long" warnings.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-01-13 18:03:29 +02:00 committed by GitHub
parent 484b4de216
commit 7054fc56b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 89 deletions

View File

@ -144,71 +144,6 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
}
#if 0
void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 2u);
ToUpper(&args[1]);
std::string_view sub_cmd = ArgS(args, 1);
Transaction* trans = cntx->transaction;
DCHECK(trans);
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (sub_cmd == "START") {
unique_lock lk(mu_);
journal::Journal* journal = ServerState::tlocal()->journal();
if (!journal) {
string dir = absl::GetFlag(FLAGS_dir);
atomic_uint32_t created{0};
auto open_cb = [&](EngineShard*) {
auto ec = sf_->journal()->OpenInThread(true, dir);
if (ec) {
LOG(ERROR) << "Could not create journal " << ec;
} else {
created.fetch_add(1, memory_order_relaxed);
}
};
shard_set->RunBlockingInParallel(open_cb);
if (created.load(memory_order_acquire) != shard_set->size()) {
LOG(FATAL) << "TBD / revert";
}
// We can not use transaction distribution mechanism because we must open journal for all
// threads and not only for shards.
trans->Schedule();
auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(barrier_cb, true);
// tx id starting from which we may reliably fetch journal records.
journal_txid_ = trans->txid();
}
return rb->SendLong(journal_txid_);
}
if (sub_cmd == "STOP") {
unique_lock lk(mu_);
if (sf_->journal()->EnterLameDuck()) {
auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->ScheduleSingleHop(std::move(barrier_cb));
auto ec = sf_->journal()->Close();
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
journal_txid_ = trans->txid();
}
return rb->SendLong(journal_txid_);
}
string reply = UnknownSubCmd(sub_cmd, "DFLY");
return rb->SendError(reply, kSyntaxErrType);
}
#endif
void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
util::ProactorPool* pool = shard_set->pool();
@ -269,7 +204,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
return;
unique_lock lk(replica_ptr->mu);
if (replica_ptr->state.load(memory_order_relaxed) != SyncState::PREPARATION)
if (replica_ptr->replica_state != SyncState::PREPARATION)
return rb->SendError(kInvalidState);
// Set meta info on connection.
@ -346,7 +281,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
LOG(INFO) << "Started sync with replica " << replica_ptr->address << ":"
<< replica_ptr->listening_port;
replica_ptr->state.store(SyncState::FULL_SYNC, memory_order_relaxed);
// protected by lk above.
replica_ptr->replica_state = SyncState::FULL_SYNC;
return rb->SendOk();
}
@ -383,7 +319,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":"
<< replica_ptr->listening_port;
replica_ptr->state.store(SyncState::STABLE_SYNC, memory_order_relaxed);
replica_ptr->replica_state = SyncState::STABLE_SYNC;
return rb->SendOk();
}
@ -658,7 +594,7 @@ void DflyCmd::StopReplication(uint32_t sync_id) {
void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replica_ptr) {
lock_guard lk(replica_ptr->mu);
if (replica_ptr->state.load(memory_order_relaxed) == SyncState::CANCELLED) {
if (replica_ptr->replica_state == SyncState::CANCELLED) {
return;
}
@ -666,7 +602,7 @@ void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replic
<< replica_ptr->listening_port;
// Update replica_ptr state and cancel context.
replica_ptr->state.store(SyncState::CANCELLED, memory_order_release);
replica_ptr->replica_state = SyncState::CANCELLED;
replica_ptr->cntx.Cancel();
// Wait for tasks to finish.
@ -690,7 +626,7 @@ void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replic
}
shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
unique_lock lk(mu_);
lock_guard lk(mu_);
auto it = replica_infos_.find(sync_id);
if (it != replica_infos_.end())
@ -700,14 +636,28 @@ shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() const {
std::vector<ReplicaRoleInfo> vec;
unique_lock lk(mu_);
lock_guard lk(mu_);
vec.reserve(replica_infos_.size());
auto replication_lags = ReplicationLags();
for (const auto& [id, info] : replica_infos_) {
vec.push_back(ReplicaRoleInfo{info->address, info->listening_port,
SyncStateName(info->state.load(memory_order_relaxed)),
replication_lags[id]});
LSN lag = replication_lags[id];
SyncState state = SyncState::PREPARATION;
// If the replica state being updated, its lag is undefined,
// the same applies of course if its state is not STABLE_SYNC.
if (info->mu.try_lock()) {
state = info->replica_state;
// If the replica is not in stable sync, its lag is undefined, so we set it as max.
if (state != SyncState::STABLE_SYNC) {
lag = std::numeric_limits<LSN>::max();
}
info->mu.unlock();
} else {
lag = std::numeric_limits<LSN>::max();
}
vec.push_back(ReplicaRoleInfo{info->address, info->listening_port, SyncStateName(state), lag});
}
return vec;
}
@ -720,6 +670,9 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
lock_guard lk{stats_mu};
for (const auto& [_, info] : replica_infos_) {
lock_guard repl_lk{info->mu};
// flows should not be empty.
DCHECK(!info->flows.empty());
if (info->flows.empty())
continue;
@ -764,18 +717,15 @@ std::map<uint32_t, LSN> DflyCmd::ReplicationLags() const {
shard_set->RunBriefInParallel([&shard_lags, this](EngineShard* shard) {
auto& lags = shard_lags[shard->shard_id()];
for (const auto& info : replica_infos_) {
if (info.second->state.load() != SyncState::STABLE_SYNC) {
lags[info.first] = std::numeric_limits<LSN>::max();
continue;
}
DCHECK(shard->journal());
int64_t lag =
shard->journal()->GetLsn() - info.second->flows[shard->shard_id()].last_acked_lsn;
DCHECK(lag >= 0);
const ReplicaInfo* replica = info.second.get();
if (shard->journal()) {
int64_t lag = shard->journal()->GetLsn() - replica->flows[shard->shard_id()].last_acked_lsn;
lags[info.first] = lag;
}
}
});
// Merge the maps from all shards and derive the maximum lag for each replica.
std::map<uint32_t, LSN> rv;
for (const auto& lags : shard_lags) {
for (auto [replica_id, lag] : lags) {
@ -793,16 +743,17 @@ void DflyCmd::SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version)
replica_ptr->version = version;
}
bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState expected,
// Must run under locked replica_info.mu.
bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& repl_info, SyncState expected,
RedisReplyBuilder* rb) {
if (sync_info.state != expected) {
if (repl_info.replica_state != expected) {
rb->SendError(kInvalidState);
return false;
}
// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info.flows) {
for (const FlowInfo& flow : repl_info.flows) {
if (!flow.conn) {
rb->SendError(kInvalidState);
return false;

View File

@ -104,20 +104,22 @@ class DflyCmd {
struct ReplicaInfo {
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
Context::ErrHandler err_handler)
: state{SyncState::PREPARATION},
: replica_state{SyncState::PREPARATION},
cntx{std::move(err_handler)},
address{std::move(address)},
listening_port(listening_port),
flows{flow_count} {
}
std::atomic<SyncState> state;
SyncState replica_state; // always guarded by ReplicaInfo::mu
Context cntx;
std::string address;
uint32_t listening_port;
DflyVersion version = DflyVersion::VER0;
// Flows describe the state of shard-local flow.
// They are always indexed by the shard index on the master.
std::vector<FlowInfo> flows;
Mutex mu; // See top of header for locking levels.
};

View File

@ -14,7 +14,7 @@
#include "server/journal/journal.h"
#include "server/server_state.h"
ABSL_FLAG(uint32_t, tx_queue_warning_len, 40,
ABSL_FLAG(uint32_t, tx_queue_warning_len, 96,
"Length threshold for warning about long transaction queue");
namespace dfly {