fix(tiering): wait for IO before test teardown (#3079)

* chore: wait for IO before test teardown

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-05-25 20:05:47 +03:00 committed by GitHub
parent 82d8b02348
commit 42500c3d1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 2 deletions

View File

@ -41,7 +41,6 @@ UringBuf PrepareBuf(size_t size) {
DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
auto* up = static_cast<UringProactor*>(ProactorBase::me());
UringBuf buf;
if (auto borrowed = up->RequestBuffer(size); borrowed)
return *borrowed;
else
@ -77,6 +76,10 @@ std::error_code DiskStorage::Open(std::string_view path) {
}
void DiskStorage::Close() {
using namespace std::chrono_literals;
while (pending_ops_ > 0)
util::ThisFiber::SleepFor(10ms);
io_mgr_.Shutdown();
}
@ -85,13 +88,16 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
DCHECK_EQ(segment.offset % kPageSize, 0u);
UringBuf buf = PrepareBuf(segment.length);
auto io_cb = [cb = std::move(cb), buf, segment](int io_res) {
auto io_cb = [this, cb = std::move(cb), buf, segment](int io_res) {
if (io_res < 0)
cb("", std::error_code{-io_res, std::system_category()});
else
cb(std::string_view{reinterpret_cast<char*>(buf.bytes.data()), segment.length}, {});
ReturnBuf(buf);
pending_ops_--;
};
pending_ops_++;
io_mgr_.ReadAsync(segment.offset, buf, std::move(io_cb));
}
@ -135,8 +141,10 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
cb({size_t(offset), len}, {});
}
ReturnBuf(buf);
pending_ops_--;
};
pending_ops_++;
io_mgr_.WriteAsync(offset, buf, std::move(io_cb));
return {};
}

View File

@ -45,6 +45,7 @@ class DiskStorage {
Stats GetStats() const;
private:
size_t pending_ops_ = 0;
size_t max_size_;
IoMgr io_mgr_;
ExternalAllocator alloc_;