mirror of
https://github.com/dragonflydb/dragonfly
synced 2024-11-21 23:19:53 +00:00
Fix consistency bug in watched code
CI improvements
This commit is contained in:
parent
fac4bf0354
commit
e88d995618
35
.github/workflows/ci.yml
vendored
35
.github/workflows/ci.yml
vendored
@ -2,7 +2,7 @@ name: ci-tests
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
# branches: [ main ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
@ -26,25 +26,42 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
uname -a
|
||||
sudo apt update
|
||||
sudo apt install -y autoconf-archive bison libunwind-dev libfl-dev ninja-build libtool curl gcc-10 g++-10
|
||||
sudo apt install -y libboost-all-dev libxml2-dev zip
|
||||
# sudo apt-fast update
|
||||
sudo apt-fast install -y autoconf-archive bison libunwind-dev libfl-dev ninja-build libtool curl gcc-10 g++-10
|
||||
sudo apt-fast install -y libboost-all-dev libxml2-dev zip ccache
|
||||
cmake --version
|
||||
mkdir -p ${{github.workspace}}/build
|
||||
- name: Cache build dependencies
|
||||
id: cache-deps
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: |
|
||||
${{github.workspace}}/build/_deps
|
||||
${{github.workspace}}/build/third_party
|
||||
key: ${{ runner.os }}-builddeps
|
||||
- name: Cache ccache
|
||||
id: cache-ccache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: |
|
||||
~/.ccache
|
||||
key: ${{ runner.os }}-ccache
|
||||
- name: Configure CMake
|
||||
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
|
||||
# See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type
|
||||
run: |
|
||||
cmake -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -GNinja
|
||||
cmake -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -GNinja -DCMAKE_CXX_COMPILER_LAUNCHER=ccache
|
||||
cd ${{github.workspace}}/build
|
||||
env:
|
||||
CC: gcc-10
|
||||
CXX: g++-10
|
||||
|
||||
- name: Build & Test
|
||||
working-directory: ${{github.workspace}}/build
|
||||
run: |
|
||||
ninja core/all server/all
|
||||
ccache --show-stats
|
||||
# GLOG_logtostderr=1 ctest -V -R list_family_test
|
||||
GLOG_logtostderr=1 ctest -V -L DFLY
|
||||
GLOG_logtostderr=1 GLOG_vmodule=transaction=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
||||
|
||||
echo Run ctest -V -L DFLY
|
||||
GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 ctest -V -L DFLY
|
||||
GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
||||
|
2
helio
2
helio
@ -1 +1 @@
|
||||
Subproject commit b3675ad8860a315fffbb14a1247cbfec5e5b8ea5
|
||||
Subproject commit de1c2b5578f835f1d659f437c0a7ea7d991faa24
|
@ -119,7 +119,7 @@ void Connection::HandleRequests() {
|
||||
tls_sock.reset(new tls::TlsSocket(socket_.get()));
|
||||
tls_sock->InitSSL(ctx_);
|
||||
|
||||
FiberSocketBase::accept_result aresult = tls_sock->Accept();
|
||||
FiberSocketBase::AcceptResult aresult = tls_sock->Accept();
|
||||
if (!aresult) {
|
||||
LOG(WARNING) << "Error handshaking " << aresult.error().message();
|
||||
return;
|
||||
|
@ -39,6 +39,15 @@ struct EngineShard::WatchQueue {
|
||||
}
|
||||
};
|
||||
|
||||
bool EngineShard::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
|
||||
DVLOG(1) << "Erasing watchqueue key " << it->first;
|
||||
|
||||
awakened_keys.erase(it->first);
|
||||
queue_map.erase(it);
|
||||
|
||||
return queue_map.empty();
|
||||
}
|
||||
|
||||
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time)
|
||||
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }),
|
||||
db_slice_(pb->GetIndex(), this) {
|
||||
@ -160,7 +169,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
|
||||
}
|
||||
|
||||
OnTxFinish();
|
||||
} // while(!txq_.Empty())
|
||||
} // while(!txq_.Empty())
|
||||
} else { // if (continuation_trans_ == nullptr && !has_awaked_trans)
|
||||
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
|
||||
}
|
||||
@ -222,6 +231,7 @@ Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
|
||||
|
||||
// Processes potentially awakened keys and verifies that these are indeed
|
||||
// awakened to eliminate false positives.
|
||||
// In addition it, optionally removes completed_t from watch queues.
|
||||
void EngineShard::ProcessAwakened(Transaction* completed_t) {
|
||||
for (DbIndex index : awakened_indices_) {
|
||||
DbWatchTable& wt = watched_dbs_[index];
|
||||
@ -247,8 +257,13 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
|
||||
if (!completed_t)
|
||||
return;
|
||||
|
||||
auto& wt = watched_dbs_[completed_t->db_index()];
|
||||
auto dbit = watched_dbs_.find(completed_t->db_index());
|
||||
if (dbit == watched_dbs_.end())
|
||||
return;
|
||||
|
||||
DbWatchTable& wt = dbit->second;
|
||||
KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id());
|
||||
|
||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||
string_view key = lock_args.args[i];
|
||||
auto w_it = wt.queue_map.find(key);
|
||||
@ -262,6 +277,7 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
|
||||
|
||||
auto& queue = wq.items;
|
||||
DCHECK(!queue.empty()); // since it's active
|
||||
|
||||
if (queue.front().trans == completed_t) {
|
||||
queue.pop_front();
|
||||
|
||||
@ -275,11 +291,14 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
|
||||
}
|
||||
|
||||
if (queue.empty()) {
|
||||
DVLOG(1) << "Erasing watchqueue key " << key;
|
||||
wt.queue_map.erase(w_it);
|
||||
wt.RemoveEntry(w_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (wt.queue_map.empty()) {
|
||||
watched_dbs_.erase(dbit);
|
||||
}
|
||||
awakened_transactions_.erase(completed_t);
|
||||
}
|
||||
|
||||
@ -295,7 +314,10 @@ void EngineShard::AddWatched(string_view key, Transaction* me) {
|
||||
|
||||
// Runs in O(N) complexity.
|
||||
bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
|
||||
DbWatchTable& wt = watched_dbs_[me->db_index()];
|
||||
auto dbit = watched_dbs_.find(me->db_index());
|
||||
CHECK(dbit != watched_dbs_.end());
|
||||
|
||||
DbWatchTable& wt = dbit->second;
|
||||
auto watch_it = wt.queue_map.find(key);
|
||||
CHECK(watch_it != wt.queue_map.end());
|
||||
|
||||
@ -304,8 +326,9 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
|
||||
if (j->trans == me) {
|
||||
wq.items.erase(j);
|
||||
if (wq.items.empty()) {
|
||||
DVLOG(1) << "Erasing watchqueue key " << key;
|
||||
wt.queue_map.erase(watch_it);
|
||||
if (wt.RemoveEntry(watch_it)) {
|
||||
watched_dbs_.erase(dbit);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -317,12 +340,16 @@ bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
|
||||
}
|
||||
|
||||
void EngineShard::GCWatched(const KeyLockArgs& largs) {
|
||||
auto& queue_map = watched_dbs_[largs.db_index].queue_map;
|
||||
auto dbit = watched_dbs_.find(largs.db_index);
|
||||
CHECK(dbit != watched_dbs_.end());
|
||||
|
||||
DbWatchTable& wt = dbit->second;
|
||||
|
||||
for (size_t i = 0; i < largs.args.size(); i += largs.key_step) {
|
||||
string_view key = largs.args[i];
|
||||
auto watch_it = queue_map.find(key);
|
||||
CHECK(watch_it != queue_map.end());
|
||||
auto watch_it = wt.queue_map.find(key);
|
||||
CHECK(watch_it != wt.queue_map.end());
|
||||
|
||||
WatchQueue& wq = *watch_it->second;
|
||||
DCHECK(!wq.items.empty());
|
||||
do {
|
||||
@ -334,25 +361,24 @@ void EngineShard::GCWatched(const KeyLockArgs& largs) {
|
||||
} while (!wq.items.empty());
|
||||
|
||||
if (wq.items.empty()) {
|
||||
DVLOG(1) << "Erasing watchqueue key " << key;
|
||||
queue_map.erase(watch_it);
|
||||
if (wt.RemoveEntry(watch_it)) {
|
||||
watched_dbs_.erase(dbit);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called from commands like lpush.
|
||||
void EngineShard::AwakeWatched(DbIndex db_index, const MainIterator& main_it) {
|
||||
void EngineShard::AwakeWatched(DbIndex db_index, std::string_view db_key) {
|
||||
auto it = watched_dbs_.find(db_index);
|
||||
if (it == watched_dbs_.end())
|
||||
return;
|
||||
|
||||
DbWatchTable& wt = it->second;
|
||||
if (wt.queue_map.empty()) { /// No blocked transactions.
|
||||
return;
|
||||
}
|
||||
DCHECK(!wt.queue_map.empty());
|
||||
|
||||
string tmp;
|
||||
string_view db_key = main_it->first;
|
||||
auto wit = wt.queue_map.find(db_key);
|
||||
|
||||
if (wit == wt.queue_map.end())
|
||||
|
@ -94,7 +94,7 @@ class EngineShard {
|
||||
bool RemovedWatched(std::string_view key, Transaction* me);
|
||||
void GCWatched(const KeyLockArgs& lock_args);
|
||||
|
||||
void AwakeWatched(DbIndex db_index, const MainIterator& it);
|
||||
void AwakeWatched(DbIndex db_index, std::string_view key);
|
||||
|
||||
bool HasAwakedTransaction() const {
|
||||
return !awakened_transactions_.empty();
|
||||
@ -128,13 +128,18 @@ class EngineShard {
|
||||
/// or null if all transactions in the queue have expired..
|
||||
Transaction* NotifyWatchQueue(WatchQueue* wq);
|
||||
|
||||
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
|
||||
// Watch state per db slice.
|
||||
struct DbWatchTable {
|
||||
absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>> queue_map;
|
||||
WatchQueueMap queue_map;
|
||||
|
||||
// awakened keys that point to blocked entries that can potentially be unblocked.
|
||||
// reference watched keys.
|
||||
// awakened keys point to blocked keys that can potentially be unblocked.
|
||||
// they reference key objects in queue_map.
|
||||
absl::flat_hash_set<base::string_view_sso> awakened_keys;
|
||||
|
||||
|
||||
// Returns true if queue_map is empty and DbWatchTable can be removed as well.
|
||||
bool RemoveEntry(WatchQueueMap::iterator it);
|
||||
};
|
||||
|
||||
absl::flat_hash_map<DbIndex, DbWatchTable> watched_dbs_;
|
||||
|
@ -348,7 +348,7 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
|
||||
}
|
||||
|
||||
if (new_key) {
|
||||
es->AwakeWatched(op_args.db_ind, it);
|
||||
es->AwakeWatched(op_args.db_ind, it->first);
|
||||
}
|
||||
return quicklistCount(ql);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user