feat(server): Buffered streamer + use on stable state (#639)

Vladislav 2023-01-15 12:17:04 +03:00 committed by GitHub
parent 7b7e3ea9d4
commit 7eff61c9ab
9 changed files with 236 additions and 49 deletions

View File

@@ -18,7 +18,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
set_family.cc stream_family.cc string_family.cc
- zset_family.cc version.cc bitops_family.cc container_utils.cc
+ zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib

View File

@@ -12,6 +12,7 @@
#include "facade/dragonfly_connection.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/io_utils.h"
#include "server/journal/journal.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"
@@ -28,6 +29,7 @@ namespace dfly {
using namespace facade;
using namespace std;
using namespace util::fibers_ext;
using util::ProactorBase;
namespace {
@@ -59,6 +61,62 @@ struct TransactionGuard {
Transaction* t;
};
// Buffered single-shard journal streamer that registers a journal change listener
// and writes the serialized entries to a destination sink from a separate fiber.
class JournalStreamer : protected BufferedStreamerBase {
public:
JournalStreamer(journal::Journal* journal, Context* cntx)
: BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx},
journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} {
}
// Not copyable or movable: the object is self-referential (writer_ points back to this).
JournalStreamer(const JournalStreamer& other) = delete;
JournalStreamer(JournalStreamer&& other) = delete;
// Register journal listener and start writer in fiber.
void Start(io::Sink* dest);
// Must be called on context cancellation to unblock the writer fiber
// and perform cleanup.
void Cancel();
private:
// Writer fiber that steals buffer contents and writes them to dest.
void WriterFb(io::Sink* dest);
private:
Context* cntx_;
uint32_t journal_cb_id_;
journal::Journal* journal_;
Fiber write_fb_;
JournalWriter writer_;
};
void JournalStreamer::Start(io::Sink* dest) {
write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
writer_.Write(entry);
NotifyWritten();
});
}
void JournalStreamer::Cancel() {
journal_->UnregisterOnChange(journal_cb_id_);
Finalize();
if (write_fb_.IsJoinable())
write_fb_.Join();
}
void JournalStreamer::WriterFb(io::Sink* dest) {
if (auto ec = ConsumeIntoSink(dest); ec) {
cntx_->ReportError(ec);
}
}
} // namespace
DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, SyncState sync_state)
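For orientation, a minimal lifecycle sketch of JournalStreamer follows. It is not part of the patch; journal, cntx and dest are placeholders for the shard journal, replication context and destination socket, mirroring how StartStableSyncInThread uses the class below:

// Hypothetical usage sketch, reusing only the API defined above.
auto* streamer = new JournalStreamer{journal, cntx};  // bind journal and cancellation context
streamer->Start(dest);   // register the journal callback and spawn the writer fiber
// ... journal entries are now buffered and streamed to dest in the background ...
streamer->Cancel();      // unregister the callback, finalize the buffer, join the writer fiber
delete streamer;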
@@ -371,25 +429,20 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
}
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
- // Register journal listener and cleanup.
- uint32_t cb_id = 0;
- JournalWriter* writer = nullptr;
+ // Create streamer for shard flows.
+ JournalStreamer* streamer = nullptr;
if (shard != nullptr) {
- writer = new JournalWriter{};
- auto journal_cb = [flow, cntx, writer](const journal::Entry& je) mutable {
- writer->Write(je);
- if (auto ec = writer->Flush(flow->conn->socket()); ec)
- cntx->ReportError(ec);
- };
- cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb));
+ streamer = new JournalStreamer{sf_->journal(), cntx};
+ streamer->Start(flow->conn->socket());
}
- flow->cleanup = [this, cb_id, writer, flow]() {
- if (writer)
- delete writer;
- if (cb_id)
- sf_->journal()->UnregisterOnChange(cb_id);
+ // Register cleanup.
+ flow->cleanup = [this, streamer, flow]() {
flow->TryShutdownSocket();
+ if (streamer) {
+ streamer->Cancel();
+ delete streamer;
+ }
};
return OpStatus::OK;

src/server/io_utils.cc (new file, 72 lines added)
View File

@@ -0,0 +1,72 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/io_utils.h"
#include "base/flags.h"
#include "base/logging.h"
#include "server/error.h"
using namespace std;
namespace dfly {
io::Result<size_t> BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t len) {
return io::BufSink{&producer_buf_}.WriteSome(vec, len);
}
void BufferedStreamerBase::NotifyWritten() {
if (IsStopped())
return;
buffered_++;
// Wake up the consumer.
waker_.notify();
// Block if we're stalled because the consumer is not keeping up.
waker_.await([this]() { return !IsStalled() || IsStopped(); });
}
error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) {
while (!IsStopped()) {
// Wait for more data or stop signal.
waker_.await([this]() { return buffered_ > 0 || IsStopped(); });
// Break immediately on cancellation.
if (cll_->IsCancelled()) {
waker_.notifyAll(); // Wake up the producer so it can notice the cancellation and unblock.
break;
}
// Swap producer and consumer buffers
std::swap(producer_buf_, consumer_buf_);
buffered_ = 0;
// If the producer is stalled, notify it that we consumed data so it can unblock.
waker_.notify();
// Write data and check for errors.
if (auto ec = dest->Write(consumer_buf_.InputBuffer()); ec) {
Finalize(); // Finalize on error to unblock the producer immediately.
return ec;
}
// TODO: shrink big stash.
consumer_buf_.Clear();
}
return std::error_code{};
}
void BufferedStreamerBase::Finalize() {
producer_done_ = true;
waker_.notifyAll();
}
bool BufferedStreamerBase::IsStopped() {
return cll_->IsCancelled() || producer_done_;
}
bool BufferedStreamerBase::IsStalled() {
return buffered_ > max_buffered_cnt_ || producer_buf_.InputLen() > max_buffered_mem_;
}
} // namespace dfly

src/server/io_utils.h (new file, 72 lines added)
View File

@@ -0,0 +1,72 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "base/io_buf.h"
#include "io/io.h"
#include "server/common.h"
#include "util/fibers/event_count.h"
namespace dfly {
// Base class for building buffered byte streams with backpressure
// for a single producer and a single consumer on the same thread.
//
// Use it as an io::Sink to write data from a producer fiber,
// and ConsumeIntoSink to extract this data in a consumer fiber.
// Use NotifyWritten to request the consumer to be woken up.
//
// Internally uses two base::IoBuf instances that are swapped in turn.
class BufferedStreamerBase : public io::Sink {
protected:
// Initialize with global cancellation and optional stall conditions.
BufferedStreamerBase(const Cancellation* cll, unsigned max_buffered_cnt = 5,
unsigned max_buffered_mem = 512)
: cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} {
}
// Write some data into the internal buffer.
//
// Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small
// writes:
//
// while (should_write()) {
// bsb->WriteSome(...); <- Write some data
// bsb->WriteSome(...);
// ...
// bsb->NotifyWritten(); <- Wake up consumer after writes
// }
// bsb->Finalize(); <- Finalize to unblock consumer
//
io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override;
// Report that a batch of data has been written and the consumer can be woken up.
// Blocks if the consumer is not keeping up.
void NotifyWritten();
// Report producer finished.
void Finalize();
// Consume whole stream to sink from the consumer fiber. Unblocks when cancelled or finalized.
std::error_code ConsumeIntoSink(io::Sink* dest);
// Whether the consumer is not keeping up.
bool IsStalled();
// Whether the producer stopped or the context was cancelled.
bool IsStopped();
protected:
bool producer_done_ = false; // whether producer is done
unsigned buffered_ = 0; // how many entries are buffered
::util::fibers_ext::EventCount waker_{}; // two sided waker
const Cancellation* cll_; // global cancellation
unsigned max_buffered_cnt_; // Max buffered entries before stall
unsigned max_buffered_mem_; // Max buffered mem before stall
base::IoBuf producer_buf_, consumer_buf_; // Two buffers that are swapped in turns.
};
} // namespace dfly
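To make the producer/consumer contract concrete, here is a small sketch of a derived streamer. It is not part of the patch: the names EchoStreamer, Run and batches are invented for illustration, and it reuses the same helio Fiber, io and logging primitives this commit already depends on.

// Sketch only: push a few byte batches through the double-buffered base class.
class EchoStreamer : public BufferedStreamerBase {
 public:
  explicit EchoStreamer(const Cancellation* cll) : BufferedStreamerBase{cll} {
  }

  void Run(io::Sink* dest, absl::Span<const std::string> batches) {
    // Consumer fiber: drains swapped buffers into dest until finalized or cancelled.
    auto consumer = util::fibers_ext::Fiber([this, dest]() {
      if (auto ec = ConsumeIntoSink(dest); ec)
        LOG(ERROR) << "consume error: " << ec;
    });

    // Producer side: buffer data, then wake the consumer once per batch.
    for (const auto& batch : batches) {
      Write(io::Buffer(batch));  // lands in producer_buf_ via WriteSome()
      NotifyWritten();           // wake the consumer; blocks while IsStalled()
    }

    Finalize();                  // signal end of stream so ConsumeIntoSink() unblocks and returns
    consumer.Join();
  }
};

The two internal buffers are what keep the producer moving: it keeps appending to producer_buf_ while the consumer writes out consumer_buf_, and only the buffer swap plus the EventCount waker coordinate the two fibers, both of which run on the same thread.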

View File

@@ -18,26 +18,18 @@ using namespace std;
namespace dfly {
- std::error_code JournalWriter::Flush(io::Sink* sink) {
- if (auto ec = sink->Write(buf_.InputBuffer()); ec)
- return ec;
- buf_.Clear();
- return {};
- }
- base::IoBuf& JournalWriter::Accumulated() {
- return buf_;
+ JournalWriter::JournalWriter(io::Sink* sink) : sink_{sink} {
}
void JournalWriter::Write(uint64_t v) {
uint8_t buf[10];
unsigned len = WritePackedUInt(v, buf);
- buf_.WriteAndCommit(buf, len);
+ sink_->Write(io::Bytes{buf}.first(len));
}
void JournalWriter::Write(std::string_view sv) {
Write(sv.size());
- buf_.WriteAndCommit(sv.data(), sv.size());
+ sink_->Write(io::Buffer(sv));
}
void JournalWriter::Write(CmdArgList args) {

View File

@@ -18,15 +18,11 @@ namespace dfly {
// It automatically keeps track of the current database index.
class JournalWriter {
public:
- // Write single entry to internal buffer.
+ JournalWriter(io::Sink* sink);
+ // Write single entry to sink.
void Write(const journal::Entry& entry);
- // Flush internal buffer to sink.
- std::error_code Flush(io::Sink* sink_);
- // Return reference to internal buffer.
- base::IoBuf& Accumulated();
private:
void Write(uint64_t v); // Write packed unsigned integer.
void Write(std::string_view sv); // Write string.
@@ -36,7 +32,7 @@ class JournalWriter {
void Write(std::monostate); // Overload for empty std::variant
private:
- base::IoBuf buf_{};
+ io::Sink* sink_;
std::optional<DbIndex> cur_dbid_{};
};

View File

@@ -108,15 +108,18 @@ TEST(Journal, WriteRead) {
{6, journal::Op::MULTI_COMMAND, 2, 1, list("SET", "E", "2")},
{6, journal::Op::EXEC, 2, 1}};
- // Write all entries to string file.
- JournalWriter writer{};
+ // Write all entries to a buffer.
+ base::IoBuf buf;
+ io::BufSink sink{&buf};
+ JournalWriter writer{&sink};
for (const auto& entry : test_entries) {
writer.Write(entry);
}
// Read them back.
- io::BytesSource bs{writer.Accumulated().InputBuffer()};
- JournalReader reader{&bs, 0};
+ io::BufSource source{&buf};
+ JournalReader reader{&source, 0};
for (unsigned i = 0; i < test_entries.size(); i++) {
auto& expected = test_entries[i];

View File

@@ -702,18 +702,17 @@ io::Bytes RdbSerializer::PrepareFlush() {
}
error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {
+ io::BufSink buf_sink{&journal_mem_buf_};
+ JournalWriter writer{&buf_sink};
for (const auto& entry : entries) {
- journal_writer_.Write(entry);
+ writer.Write(entry);
}
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
RETURN_ON_ERR(SaveLen(entries.size()));
+ RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer())));
- auto& buf = journal_writer_.Accumulated();
- auto bytes = buf.InputBuffer();
- RETURN_ON_ERR(SaveString(string_view{reinterpret_cast<const char*>(bytes.data()), bytes.size()}));
- buf.Clear();
+ journal_mem_buf_.Clear();
return error_code{};
}

View File

@@ -176,12 +176,12 @@ class RdbSerializer {
void CompressBlob();
void AllocateCompressorOnce();
JournalWriter journal_writer_;
base::IoBuf mem_buf_;
base::IoBuf journal_mem_buf_;
std::string tmp_str_;
base::PODArray<uint8_t> tmp_buf_;
std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
std::string tmp_str_;
CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.