diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index e28318ded..9de3976f4 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -144,71 +144,6 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { cntx->SendError(kSyntaxErr); } -#if 0 -void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) { - DCHECK_GE(args.size(), 2u); - ToUpper(&args[1]); - - std::string_view sub_cmd = ArgS(args, 1); - Transaction* trans = cntx->transaction; - DCHECK(trans); - RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); - - if (sub_cmd == "START") { - unique_lock lk(mu_); - journal::Journal* journal = ServerState::tlocal()->journal(); - if (!journal) { - string dir = absl::GetFlag(FLAGS_dir); - - atomic_uint32_t created{0}; - - auto open_cb = [&](EngineShard*) { - auto ec = sf_->journal()->OpenInThread(true, dir); - if (ec) { - LOG(ERROR) << "Could not create journal " << ec; - } else { - created.fetch_add(1, memory_order_relaxed); - } - }; - - shard_set->RunBlockingInParallel(open_cb); - if (created.load(memory_order_acquire) != shard_set->size()) { - LOG(FATAL) << "TBD / revert"; - } - - // We can not use transaction distribution mechanism because we must open journal for all - // threads and not only for shards. - trans->Schedule(); - auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->Execute(barrier_cb, true); - - // tx id starting from which we may reliably fetch journal records. - journal_txid_ = trans->txid(); - } - - return rb->SendLong(journal_txid_); - } - - if (sub_cmd == "STOP") { - unique_lock lk(mu_); - if (sf_->journal()->EnterLameDuck()) { - auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->ScheduleSingleHop(std::move(barrier_cb)); - - auto ec = sf_->journal()->Close(); - LOG_IF(ERROR, ec) << "Error closing journal " << ec; - journal_txid_ = trans->txid(); - } - - return rb->SendLong(journal_txid_); - } - - string reply = UnknownSubCmd(sub_cmd, "DFLY"); - return rb->SendError(reply, kSyntaxErrType); -} - -#endif - void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); util::ProactorPool* pool = shard_set->pool(); @@ -269,7 +204,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { return; unique_lock lk(replica_ptr->mu); - if (replica_ptr->state.load(memory_order_relaxed) != SyncState::PREPARATION) + if (replica_ptr->replica_state != SyncState::PREPARATION) return rb->SendError(kInvalidState); // Set meta info on connection. @@ -346,7 +281,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "Started sync with replica " << replica_ptr->address << ":" << replica_ptr->listening_port; - replica_ptr->state.store(SyncState::FULL_SYNC, memory_order_relaxed); + // protected by lk above. + replica_ptr->replica_state = SyncState::FULL_SYNC; return rb->SendOk(); } @@ -383,7 +319,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":" << replica_ptr->listening_port; - replica_ptr->state.store(SyncState::STABLE_SYNC, memory_order_relaxed); + replica_ptr->replica_state = SyncState::STABLE_SYNC; return rb->SendOk(); } @@ -658,7 +594,7 @@ void DflyCmd::StopReplication(uint32_t sync_id) { void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr replica_ptr) { lock_guard lk(replica_ptr->mu); - if (replica_ptr->state.load(memory_order_relaxed) == SyncState::CANCELLED) { + if (replica_ptr->replica_state == SyncState::CANCELLED) { return; } @@ -666,7 +602,7 @@ void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr replic << replica_ptr->listening_port; // Update replica_ptr state and cancel context. - replica_ptr->state.store(SyncState::CANCELLED, memory_order_release); + replica_ptr->replica_state = SyncState::CANCELLED; replica_ptr->cntx.Cancel(); // Wait for tasks to finish. @@ -690,7 +626,7 @@ void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr replic } shared_ptr DflyCmd::GetReplicaInfo(uint32_t sync_id) { - unique_lock lk(mu_); + lock_guard lk(mu_); auto it = replica_infos_.find(sync_id); if (it != replica_infos_.end()) @@ -700,14 +636,28 @@ shared_ptr DflyCmd::GetReplicaInfo(uint32_t sync_id) { std::vector DflyCmd::GetReplicasRoleInfo() const { std::vector vec; - unique_lock lk(mu_); + lock_guard lk(mu_); + vec.reserve(replica_infos_.size()); auto replication_lags = ReplicationLags(); for (const auto& [id, info] : replica_infos_) { - vec.push_back(ReplicaRoleInfo{info->address, info->listening_port, - SyncStateName(info->state.load(memory_order_relaxed)), - replication_lags[id]}); + LSN lag = replication_lags[id]; + SyncState state = SyncState::PREPARATION; + + // If the replica state being updated, its lag is undefined, + // the same applies of course if its state is not STABLE_SYNC. + if (info->mu.try_lock()) { + state = info->replica_state; + // If the replica is not in stable sync, its lag is undefined, so we set it as max. + if (state != SyncState::STABLE_SYNC) { + lag = std::numeric_limits::max(); + } + info->mu.unlock(); + } else { + lag = std::numeric_limits::max(); + } + vec.push_back(ReplicaRoleInfo{info->address, info->listening_port, SyncStateName(state), lag}); } return vec; } @@ -720,6 +670,9 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const { lock_guard lk{stats_mu}; for (const auto& [_, info] : replica_infos_) { lock_guard repl_lk{info->mu}; + + // flows should not be empty. + DCHECK(!info->flows.empty()); if (info->flows.empty()) continue; @@ -764,18 +717,15 @@ std::map DflyCmd::ReplicationLags() const { shard_set->RunBriefInParallel([&shard_lags, this](EngineShard* shard) { auto& lags = shard_lags[shard->shard_id()]; for (const auto& info : replica_infos_) { - if (info.second->state.load() != SyncState::STABLE_SYNC) { - lags[info.first] = std::numeric_limits::max(); - continue; + const ReplicaInfo* replica = info.second.get(); + if (shard->journal()) { + int64_t lag = shard->journal()->GetLsn() - replica->flows[shard->shard_id()].last_acked_lsn; + lags[info.first] = lag; } - DCHECK(shard->journal()); - int64_t lag = - shard->journal()->GetLsn() - info.second->flows[shard->shard_id()].last_acked_lsn; - DCHECK(lag >= 0); - lags[info.first] = lag; } }); + // Merge the maps from all shards and derive the maximum lag for each replica. std::map rv; for (const auto& lags : shard_lags) { for (auto [replica_id, lag] : lags) { @@ -793,16 +743,17 @@ void DflyCmd::SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version) replica_ptr->version = version; } -bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState expected, +// Must run under locked replica_info.mu. +bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& repl_info, SyncState expected, RedisReplyBuilder* rb) { - if (sync_info.state != expected) { + if (repl_info.replica_state != expected) { rb->SendError(kInvalidState); return false; } // Check all flows are connected. // This might happen if a flow abruptly disconnected before sending the SYNC request. - for (const FlowInfo& flow : sync_info.flows) { + for (const FlowInfo& flow : repl_info.flows) { if (!flow.conn) { rb->SendError(kInvalidState); return false; diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 494c65627..1e190a9d7 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -104,20 +104,22 @@ class DflyCmd { struct ReplicaInfo { ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port, Context::ErrHandler err_handler) - : state{SyncState::PREPARATION}, + : replica_state{SyncState::PREPARATION}, cntx{std::move(err_handler)}, address{std::move(address)}, listening_port(listening_port), flows{flow_count} { } - std::atomic state; + SyncState replica_state; // always guarded by ReplicaInfo::mu Context cntx; std::string address; uint32_t listening_port; DflyVersion version = DflyVersion::VER0; + // Flows describe the state of shard-local flow. + // They are always indexed by the shard index on the master. std::vector flows; Mutex mu; // See top of header for locking levels. }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f87425c88..961b277a3 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -14,7 +14,7 @@ #include "server/journal/journal.h" #include "server/server_state.h" -ABSL_FLAG(uint32_t, tx_queue_warning_len, 40, +ABSL_FLAG(uint32_t, tx_queue_warning_len, 96, "Length threshold for warning about long transaction queue"); namespace dfly {