From 6f78ae5073e57af24cf89e68825c90df83a884e4 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Wed, 21 Jun 2023 10:26:22 +0300 Subject: [PATCH] fix: call NotifyPending only from tx queue invocations (#1439) * fix: call NotifyPending only from tx queue invocations --------- Signed-off-by: Vladislav Oleshko --- src/server/blocking_controller.cc | 3 +++ src/server/engine_shard_set.cc | 4 ++-- src/server/engine_shard_set.h | 8 +++++-- src/server/list_family_test.cc | 35 +++++++++++++++++++++++++++++++ src/server/transaction.cc | 22 ++++++++++++------- 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index d7ead7b8f..2bbbf97f0 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -196,6 +196,9 @@ void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) { } void BlockingController::NotifyPending() { + const Transaction* tx = owner_->GetContTx(); + CHECK(tx == nullptr) << tx->DebugId(); + DbContext context; context.time_now_ms = GetCurrentTimeMs(); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index ed98853be..6f73e012c 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -402,8 +402,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } -void EngineShard::ShutdownMulti(Transaction* multi) { - if (continuation_trans_ == multi) { +void EngineShard::RemoveContTx(Transaction* tx) { + if (continuation_trans_ == tx) { continuation_trans_ = nullptr; } } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index f35c196db..174991d57 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -98,8 +98,8 @@ class EngineShard { return &shard_lock_; } - // TODO: Awkward interface. I should solve it somehow. - void ShutdownMulti(Transaction* multi); + // Remove current continuation trans if it's equal to tx. 
+ void RemoveContTx(Transaction* tx); void IncQuickRun() { stats_.quick_runs++; @@ -153,6 +153,10 @@ class EngineShard { return is_replica_; } + const Transaction* GetContTx() const { + return continuation_trans_; + } + void TEST_EnableHeartbeat(); private: diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 0151dc15a..4303e2316 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -827,4 +827,39 @@ TEST_F(ListFamilyTest, BLPopUnwakesInScript) { f2.Join(); } +TEST_F(ListFamilyTest, OtherMultiWakesBLpop) { + const string_view SCRIPT = R"( + redis.call('LPUSH', 'l', 'bad') + for i = 1, 1000 do + redis.call('MGET', 'a', 'b', 'c', 'd') + end + redis.call('LPUSH', 'l', 'good') + )"; + + const string_view SCRIPT_SHORT = R"( + redis.call('GET', KEYS[1]) + )"; + + // Start BLPOP with infinite timeout + auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&]() { + auto resp = Run("blpop", {"BLPOP", "l", "0"}); + // blpop should only be awakened after the script has completed, so the + // last element added in the script should be returned. 
+ EXPECT_THAT(resp, ArgType(RespExpr::ARRAY)); + EXPECT_THAT(resp.GetVec(), ElementsAre("l", "good")); + }); + + // Start long running script that accesses the list, but should wake up blpop only after it + // has finished + auto f2 = pp_->at(2)->LaunchFiber(Launch::dispatch, [&]() { + Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"}); + }); + + // Run quick multi transaction that concludes after one hop + Run({"EVAL", SCRIPT_SHORT, "1", "y"}); + + f1.Join(); + f2.Join(); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 29a54ea78..e6cc64371 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -522,16 +522,24 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { sd.local_mask &= ~OUT_OF_ORDER; } + // This is the last hop, so clear cont_trans if it's held by the current tx + shard->RemoveContTx(this); + // It has 2 responsibilities. // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head // of the queue and notify the next one. // RunStep is also called for global transactions because of commands like MOVE. 
- if (shard->blocking_controller()) { + if (auto* bcontroller = shard->blocking_controller(); bcontroller) { if (awaked_prerun || was_suspended) { - shard->blocking_controller()->FinalizeWatched(largs, this); + bcontroller->FinalizeWatched(largs, this); + } + + // Wake only if no tx queue head is currently running + // Note: RemoveContTx might have no effect above if this tx had no continuations + if (shard->GetContTx() == nullptr) { + bcontroller->NotifyPending(); } - shard->blocking_controller()->NotifyPending(); } } @@ -1232,12 +1240,12 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E sd.pq_pos = TxQueue::kEnd; } - shard->ShutdownMulti(this); + shard->RemoveContTx(this); - // notify awakened transactions, not sure we need it here because it's done after - // each operation - if (shard->blocking_controller()) + // Wake only if no tx queue head is currently running + if (shard->blocking_controller() && shard->GetContTx() == nullptr) shard->blocking_controller()->NotifyPending(); + shard->PollExecution("unlockmulti", nullptr); this->DecreaseRunCnt();