diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 621ea6f51..e18af0456 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -18,7 +18,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc - zset_family.cc version.cc bitops_family.cc container_utils.cc + zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc serializer_commons.cc journal/serializer.cc journal/executor.cc) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index ecf7ceb61..6ac3483ea 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -12,6 +12,7 @@ #include "facade/dragonfly_connection.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/io_utils.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" #include "server/rdb_save.h" @@ -28,6 +29,7 @@ namespace dfly { using namespace facade; using namespace std; +using namespace util::fibers_ext; using util::ProactorBase; namespace { @@ -59,6 +61,62 @@ struct TransactionGuard { Transaction* t; }; +// Buffered single-shard journal streamer that listens for journal changes with a +// journal listener and writes them to a destination sink in a separate fiber. +class JournalStreamer : protected BufferedStreamerBase { + public: + JournalStreamer(journal::Journal* journal, Context* cntx) + : BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx}, + journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} { + } + + // Self referential. + JournalStreamer(const JournalStreamer& other) = delete; + JournalStreamer(JournalStreamer&& other) = delete; + + // Register journal listener and start writer in fiber. 
+ void Start(io::Sink* dest); + + // Must be called on context cancellation for unblocking + // and manual cleanup. + void Cancel(); + + private: + // Writer fiber that steals buffer contents and writes them to dest. + void WriterFb(io::Sink* dest); + + private: + Context* cntx_; + + uint32_t journal_cb_id_; + journal::Journal* journal_; + + Fiber write_fb_; + JournalWriter writer_; +}; + +void JournalStreamer::Start(io::Sink* dest) { + write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest); + journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) { + writer_.Write(entry); + NotifyWritten(); + }); +} + +void JournalStreamer::Cancel() { + journal_->UnregisterOnChange(journal_cb_id_); + Finalize(); + + if (write_fb_.IsJoinable()) + write_fb_.Join(); +} + +void JournalStreamer::WriterFb(io::Sink* dest) { + if (auto ec = ConsumeIntoSink(dest); ec) { + cntx_->ReportError(ec); + } +} + } // namespace DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, SyncState sync_state) @@ -371,25 +429,20 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) { } OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { - // Register journal listener and cleanup. - uint32_t cb_id = 0; - JournalWriter* writer = nullptr; + // Create streamer for shard flows. + JournalStreamer* streamer = nullptr; if (shard != nullptr) { - writer = new JournalWriter{}; - auto journal_cb = [flow, cntx, writer](const journal::Entry& je) mutable { - writer->Write(je); - if (auto ec = writer->Flush(flow->conn->socket()); ec) - cntx->ReportError(ec); - }; - cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb)); + streamer = new JournalStreamer{sf_->journal(), cntx}; + streamer->Start(flow->conn->socket()); } - flow->cleanup = [this, cb_id, writer, flow]() { - if (writer) - delete writer; - if (cb_id) - sf_->journal()->UnregisterOnChange(cb_id); + // Register cleanup. 
+ flow->cleanup = [this, streamer, flow]() { flow->TryShutdownSocket(); + if (streamer) { + streamer->Cancel(); + delete streamer; + } }; return OpStatus::OK; diff --git a/src/server/io_utils.cc b/src/server/io_utils.cc new file mode 100644 index 000000000..1b1c8e156 --- /dev/null +++ b/src/server/io_utils.cc @@ -0,0 +1,72 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/io_utils.h" + +#include "base/flags.h" +#include "base/logging.h" +#include "server/error.h" + +using namespace std; + +namespace dfly { + +io::Result BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t len) { + return io::BufSink{&producer_buf_}.WriteSome(vec, len); +} + +void BufferedStreamerBase::NotifyWritten() { + if (IsStopped()) + return; + buffered_++; + // Wake up the consumer. + waker_.notify(); + // Block if we're stalled because the consumer is not keeping up. + waker_.await([this]() { return !IsStalled() || IsStopped(); }); +} + +error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) { + while (!IsStopped()) { + // Wait for more data or stop signal. + waker_.await([this]() { return buffered_ > 0 || IsStopped(); }); + + // Break immediately on cancellation. + if (cll_->IsCancelled()) { + waker_.notifyAll(); // Wake producer if it missed it. + break; + } + + // Swap producer and consumer buffers + std::swap(producer_buf_, consumer_buf_); + buffered_ = 0; + + // If producer stalled, notify we consumed data and it can unblock. + waker_.notify(); + + // Write data and check for errors. + if (auto ec = dest->Write(consumer_buf_.InputBuffer()); ec) { + Finalize(); // Finalize on error to unblock producer immediately. + return ec; + } + + // TODO: shrink big stash. 
+ consumer_buf_.Clear(); + } + + return std::error_code{}; +} + +void BufferedStreamerBase::Finalize() { + producer_done_ = true; + waker_.notifyAll(); +} + +bool BufferedStreamerBase::IsStopped() { + return cll_->IsCancelled() || producer_done_; +} + +bool BufferedStreamerBase::IsStalled() { + return buffered_ > max_buffered_cnt_ || producer_buf_.InputLen() > max_buffered_mem_; +} +} // namespace dfly diff --git a/src/server/io_utils.h b/src/server/io_utils.h new file mode 100644 index 000000000..459f83b77 --- /dev/null +++ b/src/server/io_utils.h @@ -0,0 +1,72 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "base/io_buf.h" +#include "io/io.h" +#include "server/common.h" +#include "util/fibers/event_count.h" + +namespace dfly { + +// Base for constructing buffered byte streams with backpressure +// for single producer and consumer on the same thread. +// +// Use it as a io::Sink to write data from a producer fiber, +// and ConsumeIntoSink to extract this data in a consumer fiber. +// Use NotifyWritten to request the consumer to be woken up. +// +// Uses two base::IoBuf internally that are swapped in turns. +class BufferedStreamerBase : public io::Sink { + protected: + // Initialize with global cancellation and optional stall conditions. + BufferedStreamerBase(const Cancellation* cll, unsigned max_buffered_cnt = 5, + unsigned max_buffered_mem = 512) + : cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} { + } + + // Write some data into the internal buffer. + // + // Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small + // writes: + // + // while (should_write()) { + // bsb->WriteSome(...); <- Write some data + // bsb->WriteSome(...); + // ... 
+ // bsb->NotifyWritten(); <- Wake up consumer after writes + // } + // bsb->Finalize(); <- Finalize to unblock consumer + // + io::Result WriteSome(const iovec* vec, uint32_t len) override; + + // Report that a batch of data has been written and the consumer can be woken up. + // Blocks if the consumer is not keeping up. + void NotifyWritten(); + + // Report producer finished. + void Finalize(); + + // Consume whole stream to sink from the consumer fiber. Unblocks when cancelled or finalized. + std::error_code ConsumeIntoSink(io::Sink* dest); + + // Whether the consumer is not keeping up. + bool IsStalled(); + + // Whether the producer stopped or the context was cancelled. + bool IsStopped(); + + protected: + bool producer_done_ = false; // whether producer is done + unsigned buffered_ = 0; // how many entries are buffered + ::util::fibers_ext::EventCount waker_{}; // two sided waker + + const Cancellation* cll_; // global cancellation + + unsigned max_buffered_cnt_; // Max buffered entries before stall + unsigned max_buffered_mem_; // Max buffered mem before stall + + base::IoBuf producer_buf_, consumer_buf_; // Two buffers that are swapped in turns. 
+}; + +} // namespace dfly diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 732472f36..d934af25a 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -18,26 +18,18 @@ using namespace std; namespace dfly { -std::error_code JournalWriter::Flush(io::Sink* sink) { - if (auto ec = sink->Write(buf_.InputBuffer()); ec) - return ec; - buf_.Clear(); - return {}; -} - -base::IoBuf& JournalWriter::Accumulated() { - return buf_; +JournalWriter::JournalWriter(io::Sink* sink) : sink_{sink} { } void JournalWriter::Write(uint64_t v) { uint8_t buf[10]; unsigned len = WritePackedUInt(v, buf); - buf_.WriteAndCommit(buf, len); + sink_->Write(io::Bytes{buf}.first(len)); } void JournalWriter::Write(std::string_view sv) { Write(sv.size()); - buf_.WriteAndCommit(sv.data(), sv.size()); + sink_->Write(io::Buffer(sv)); } void JournalWriter::Write(CmdArgList args) { diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 880f07772..1b4fed1d8 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -18,15 +18,11 @@ namespace dfly { // It automatically keeps track of the current database index. class JournalWriter { public: - // Write single entry to internal buffer. + JournalWriter(io::Sink* sink); + + // Write single entry to sink. void Write(const journal::Entry& entry); - // Flush internal buffer to sink. - std::error_code Flush(io::Sink* sink_); - - // Return reference to internal buffer. - base::IoBuf& Accumulated(); - private: void Write(uint64_t v); // Write packed unsigned integer. void Write(std::string_view sv); // Write string. 
@@ -36,7 +32,7 @@ class JournalWriter { void Write(std::monostate); // Overload for empty std::variant private: - base::IoBuf buf_{}; + io::Sink* sink_; std::optional cur_dbid_{}; }; diff --git a/src/server/journal_test.cc b/src/server/journal_test.cc index 62a70e104..ba78962a1 100644 --- a/src/server/journal_test.cc +++ b/src/server/journal_test.cc @@ -108,15 +108,18 @@ TEST(Journal, WriteRead) { {6, journal::Op::MULTI_COMMAND, 2, 1, list("SET", "E", "2")}, {6, journal::Op::EXEC, 2, 1}}; - // Write all entries to string file. - JournalWriter writer{}; + // Write all entries to a buffer. + base::IoBuf buf; + io::BufSink sink{&buf}; + + JournalWriter writer{&sink}; for (const auto& entry : test_entries) { writer.Write(entry); } // Read them back. - io::BytesSource bs{writer.Accumulated().InputBuffer()}; - JournalReader reader{&bs, 0}; + io::BufSource source{&buf}; + JournalReader reader{&source, 0}; for (unsigned i = 0; i < test_entries.size(); i++) { auto& expected = test_entries[i]; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 242b0fd92..4ca48fef9 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -702,18 +702,17 @@ io::Bytes RdbSerializer::PrepareFlush() { } error_code RdbSerializer::WriteJournalEntries(absl::Span entries) { + io::BufSink buf_sink{&journal_mem_buf_}; + JournalWriter writer{&buf_sink}; for (const auto& entry : entries) { - journal_writer_.Write(entry); + writer.Write(entry); } RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(SaveLen(entries.size())); + RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer()))); - auto& buf = journal_writer_.Accumulated(); - auto bytes = buf.InputBuffer(); - RETURN_ON_ERR(SaveString(string_view{reinterpret_cast(bytes.data()), bytes.size()})); - buf.Clear(); - + journal_mem_buf_.Clear(); return error_code{}; } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 081618ec4..8c6d5d287 100644 --- a/src/server/rdb_save.h +++ 
b/src/server/rdb_save.h @@ -176,12 +176,12 @@ class RdbSerializer { void CompressBlob(); void AllocateCompressorOnce(); - JournalWriter journal_writer_; + base::IoBuf mem_buf_; + base::IoBuf journal_mem_buf_; + std::string tmp_str_; + base::PODArray tmp_buf_; std::unique_ptr lzf_; - base::IoBuf mem_buf_; - base::PODArray tmp_buf_; - std::string tmp_str_; CompressionMode compression_mode_; // TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.