From b02a789ebfa97ca861cd0737cc51f6fa81799348 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 28 May 2024 17:41:51 +0300 Subject: [PATCH] fix: add timeout for DFLYMIGRATE ACK to prevent deadlock (#3093) * fix: add timeout for DFLYMIGRATE ACK to prevent deadlock --- src/server/cluster/cluster_family.cc | 8 +++++--- src/server/cluster/incoming_slot_migration.cc | 17 +++++++++++------ src/server/cluster/incoming_slot_migration.h | 5 ++--- tests/dragonfly/cluster_test.py | 9 ++++++--- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index b4732d253..583b88d11 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -771,7 +771,8 @@ bool RemoveIncomingMigrationImpl(std::vectorCancel(); - migration->Join(); + while (!migration->Join()) + ; // we need to join anycase jobs.erase(it); if (!removed.Empty()) { @@ -906,8 +907,9 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { if (!migration) return cntx->SendError(kIdNotFound); - // TODO add timeout for join because it can fail - migration->Join(); + if (!migration->Join()) { + return cntx->SendError("Join timeout happened"); + } VLOG(1) << "Migration is joined for " << source_id; diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 61cb5ce77..791d2b426 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -12,12 +12,13 @@ #include "server/journal/tx_executor.h" #include "server/main_service.h" +ABSL_DECLARE_FLAG(int, slot_migration_connection_timeout_ms); + namespace dfly::cluster { using namespace std; using namespace util; using namespace facade; -using absl::GetFlag; // ClusterShardMigration manage data receiving in slots migration process. // It is created per shard on the target node to initiate FLOW step. @@ -120,11 +121,15 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot IncomingSlotMigration::~IncomingSlotMigration() { } -void IncomingSlotMigration::Join() { - // TODO add timeout - bc_->Wait(); - state_.store(MigrationState::C_FINISHED); - keys_number_ = cluster::GetKeyCount(slots_); +bool IncomingSlotMigration::Join() { + auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms; + if (bc_->WaitFor(timeout)) { + state_.store(MigrationState::C_FINISHED); + keys_number_ = cluster::GetKeyCount(slots_); + return true; + } + LOG(WARNING) << "Can't join migration in time"; + return false; } void IncomingSlotMigration::Cancel() { diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index ab27c94e6..411c758cb 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -28,10 +28,9 @@ class IncomingSlotMigration { void StartFlow(uint32_t shard, util::FiberSocketBase* source); // Waits until all flows got FIN opcode. - // Join can't be finished if after FIN opcode we get new data - // Connection can be closed by another side, or using Cancel + // returns true if we joined false if timeout is readed // After Join we still can get data due to error situation - void Join(); + [[nodiscard]] bool Join(); void Cancel(); diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 51f3b35d9..ac2902dfb 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1278,12 +1278,15 @@ async def test_cluster_fuzzymigration( iterations = 0 while True: + is_all_finished = True for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") print(states) - if not all("FINISHED" in s for s in states) and not states == "NO_STATE": - break - else: + is_all_finished = is_all_finished and ( + all("FINISHED" in s for s in states) or states == "NO_STATE" + ) + + if is_all_finished: break iterations += 1