mirror of
https://github.com/dragonflydb/dragonfly
synced 2024-11-21 23:19:53 +00:00
fix: add timeout for DFLYMIGRATE ACK to prevent deadlock (#3093)
* fix: add timeout for DFLYMIGRATE ACK to prevent deadlock
This commit is contained in:
parent
9360ee2768
commit
b02a789ebf
@ -771,7 +771,8 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
|
||||
|
||||
// First cancel socket, then flush slots, so that new entries won't arrive after we flush.
|
||||
migration->Cancel();
|
||||
migration->Join();
|
||||
while (!migration->Join())
|
||||
; // we need to join anycase
|
||||
jobs.erase(it);
|
||||
|
||||
if (!removed.Empty()) {
|
||||
@ -906,8 +907,9 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
|
||||
if (!migration)
|
||||
return cntx->SendError(kIdNotFound);
|
||||
|
||||
// TODO add timeout for join because it can fail
|
||||
migration->Join();
|
||||
if (!migration->Join()) {
|
||||
return cntx->SendError("Join timeout happened");
|
||||
}
|
||||
|
||||
VLOG(1) << "Migration is joined for " << source_id;
|
||||
|
||||
|
@ -12,12 +12,13 @@
|
||||
#include "server/journal/tx_executor.h"
|
||||
#include "server/main_service.h"
|
||||
|
||||
ABSL_DECLARE_FLAG(int, slot_migration_connection_timeout_ms);
|
||||
|
||||
namespace dfly::cluster {
|
||||
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace facade;
|
||||
using absl::GetFlag;
|
||||
|
||||
// ClusterShardMigration manage data receiving in slots migration process.
|
||||
// It is created per shard on the target node to initiate FLOW step.
|
||||
@ -120,11 +121,15 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
|
||||
IncomingSlotMigration::~IncomingSlotMigration() {
|
||||
}
|
||||
|
||||
void IncomingSlotMigration::Join() {
|
||||
// TODO add timeout
|
||||
bc_->Wait();
|
||||
state_.store(MigrationState::C_FINISHED);
|
||||
keys_number_ = cluster::GetKeyCount(slots_);
|
||||
bool IncomingSlotMigration::Join() {
|
||||
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
|
||||
if (bc_->WaitFor(timeout)) {
|
||||
state_.store(MigrationState::C_FINISHED);
|
||||
keys_number_ = cluster::GetKeyCount(slots_);
|
||||
return true;
|
||||
}
|
||||
LOG(WARNING) << "Can't join migration in time";
|
||||
return false;
|
||||
}
|
||||
|
||||
void IncomingSlotMigration::Cancel() {
|
||||
|
@ -28,10 +28,9 @@ class IncomingSlotMigration {
|
||||
void StartFlow(uint32_t shard, util::FiberSocketBase* source);
|
||||
|
||||
// Waits until all flows got FIN opcode.
|
||||
// Join can't be finished if after FIN opcode we get new data
|
||||
// Connection can be closed by another side, or using Cancel
|
||||
// returns true if we joined false if timeout is readed
|
||||
// After Join we still can get data due to error situation
|
||||
void Join();
|
||||
[[nodiscard]] bool Join();
|
||||
|
||||
void Cancel();
|
||||
|
||||
|
@ -1278,12 +1278,15 @@ async def test_cluster_fuzzymigration(
|
||||
|
||||
iterations = 0
|
||||
while True:
|
||||
is_all_finished = True
|
||||
for node in nodes:
|
||||
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
||||
print(states)
|
||||
if not all("FINISHED" in s for s in states) and not states == "NO_STATE":
|
||||
break
|
||||
else:
|
||||
is_all_finished = is_all_finished and (
|
||||
all("FINISHED" in s for s in states) or states == "NO_STATE"
|
||||
)
|
||||
|
||||
if is_all_finished:
|
||||
break
|
||||
|
||||
iterations += 1
|
||||
|
Loading…
Reference in New Issue
Block a user