diff --git a/src/server/tiering/disk_storage.cc b/src/server/tiering/disk_storage.cc index 5611bf2b4..215f2517e 100644 --- a/src/server/tiering/disk_storage.cc +++ b/src/server/tiering/disk_storage.cc @@ -41,7 +41,6 @@ UringBuf PrepareBuf(size_t size) { DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING); auto* up = static_cast(ProactorBase::me()); - UringBuf buf; if (auto borrowed = up->RequestBuffer(size); borrowed) return *borrowed; else @@ -77,6 +76,10 @@ std::error_code DiskStorage::Open(std::string_view path) { } void DiskStorage::Close() { + using namespace std::chrono_literals; + while (pending_ops_ > 0) + util::ThisFiber::SleepFor(10ms); + io_mgr_.Shutdown(); } @@ -85,13 +88,16 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) { DCHECK_EQ(segment.offset % kPageSize, 0u); UringBuf buf = PrepareBuf(segment.length); - auto io_cb = [cb = std::move(cb), buf, segment](int io_res) { + auto io_cb = [this, cb = std::move(cb), buf, segment](int io_res) { if (io_res < 0) cb("", std::error_code{-io_res, std::system_category()}); else cb(std::string_view{reinterpret_cast(buf.bytes.data()), segment.length}, {}); ReturnBuf(buf); + pending_ops_--; }; + + pending_ops_++; io_mgr_.ReadAsync(segment.offset, buf, std::move(io_cb)); } @@ -135,8 +141,10 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) { cb({size_t(offset), len}, {}); } ReturnBuf(buf); + pending_ops_--; }; + pending_ops_++; io_mgr_.WriteAsync(offset, buf, std::move(io_cb)); return {}; } diff --git a/src/server/tiering/disk_storage.h b/src/server/tiering/disk_storage.h index ea38ecb70..bb968e609 100644 --- a/src/server/tiering/disk_storage.h +++ b/src/server/tiering/disk_storage.h @@ -45,6 +45,7 @@ class DiskStorage { Stats GetStats() const; private: + size_t pending_ops_ = 0; size_t max_size_; IoMgr io_mgr_; ExternalAllocator alloc_;