diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a1a3ed233..e0e2b04df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -142,7 +142,7 @@ jobs: EOF gdb -ix ./init.gdb --batch -ex r --args ./dragonfly_test --force_epoll - FLAGS_force_epoll=true GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 timeout 20m ctest -V -L DFLY + DFLY_use_new_io=true FLAGS_force_epoll=true GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 timeout 20m ctest -V -L DFLY echo "Finished running tests with --force_epoll" diff --git a/src/facade/conn_context.cc b/src/facade/conn_context.cc index ef0c9df2c..6049231af 100644 --- a/src/facade/conn_context.cc +++ b/src/facade/conn_context.cc @@ -4,7 +4,12 @@ #include "facade/conn_context.h" +#include "absl/flags/internal/flag.h" +#include "base/flags.h" #include "facade/dragonfly_connection.h" +#include "facade/reply_builder.h" + +ABSL_FLAG(bool, use_new_io, false, "Use new IO by default"); namespace facade { @@ -15,9 +20,12 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow if (stream) { switch (protocol_) { - case Protocol::REDIS: - rbuilder_.reset(new RedisReplyBuilder(stream)); + case Protocol::REDIS: { + RedisReplyBuilder* rb = absl::GetFlag(FLAGS_use_new_io) ? new RedisReplyBuilder2(stream) + : new RedisReplyBuilder(stream); + rbuilder_.reset(rb); break; + } case Protocol::MEMCACHE: rbuilder_.reset(new MCReplyBuilder(stream)); break; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 1f49a72cf..49d6d8a1d 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -140,10 +140,6 @@ void SinkReplyBuilder::ExpectReply() { has_replied_ = false; } -bool SinkReplyBuilder::HasReplied() const { - return has_replied_; -} - void SinkReplyBuilder::SendError(ErrorReply error) { if (error.status) return SendError(*error.status); @@ -215,7 +211,8 @@ void SinkReplyBuilder2::SendError(ErrorReply error) { void SinkReplyBuilder2::SendError(OpStatus status) { if (status == OpStatus::OK) - return SendOk(); + return SendSimpleString("OK"); + // return SendOk(); SendError(StatusToMsg(status)); } @@ -294,7 +291,7 @@ void SinkReplyBuilder2::FinishScope() { if (vecs_[i].iov_base >= ib.data() && vecs_[i].iov_base <= ib.data() + ib.size()) continue; // this is a piece - DCHECK_LE(buffer_.AppendLen(), vecs_[i].iov_len); + DCHECK_LE(vecs_[i].iov_len, buffer_.AppendLen()); void* dest = buffer_.AppendBuffer().data(); memcpy(dest, vecs_[i].iov_base, vecs_[i].iov_len); buffer_.CommitWrite(vecs_[i].iov_len); @@ -529,7 +526,7 @@ void RedisReplyBuilder::SendLong(long num) { SendRaw(str); } -void RedisReplyBuilder::SendScoredArray(const std::vector>& arr, +void RedisReplyBuilder::SendScoredArray(absl::Span> arr, bool with_scores) { ReplyAggregator agg(this); if (!with_scores) { @@ -919,7 +916,7 @@ void RedisReplyBuilder2Base::SendVerbatimString(std::string_view str, VerbatimFo WritePiece(kCRLF); } -void RedisReplyBuilder2::SendSimpleStrArr(const facade::ArgRange& strs) { +void RedisReplyBuilder2::SendSimpleStrArr2(const facade::ArgRange& strs) { ReplyScope scope(this); StartArray(strs.Size()); for (std::string_view str : strs) @@ -962,4 +959,15 @@ void RedisReplyBuilder2::SendEmptyArray() { StartArray(0); } +void RedisReplyBuilder2::SendMGetResponse(SinkReplyBuilder::MGetResponse resp) { + ReplyScope scope(this); + StartArray(resp.resp_arr.size()); + for (const auto& entry : resp.resp_arr) { + if (entry) + SendBulkString(entry->value); + else + SendNull(); + } +} + } // namespace facade diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 81f5489a8..ad38f459b 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -99,19 +99,19 @@ class SinkReplyBuilder { // In order to reduce interrupt rate we allow coalescing responses together using // Batch mode. It is controlled by Connection state machine because it makes sense only // when pipelined requests are arriving. - void SetBatchMode(bool batch); + virtual void SetBatchMode(bool batch); - void FlushBatch(); + virtual void FlushBatch(); // Used for QUIT - > should move to conn_context? - void CloseConnection(); + virtual void CloseConnection(); - std::error_code GetError() const { + virtual std::error_code GetError() const { return ec_; } bool IsSendActive() const { - return send_active_; + return send_active_; // BROKEN } struct ReplyAggregator { @@ -137,7 +137,9 @@ class SinkReplyBuilder { }; void ExpectReply(); - bool HasReplied() const; + bool HasReplied() const { + return true; // WE break it for now + } virtual size_t UsedMemory() const; @@ -147,14 +149,14 @@ class SinkReplyBuilder { static void ResetThreadLocalStats(); + virtual void StartAggregate(); + virtual void StopAggregate(); + protected: void SendRaw(std::string_view str); // Sends raw without any formatting. void Send(const iovec* v, uint32_t len); - void StartAggregate(); - void StopAggregate(); - std::string batch_; ::io::Sink* sink_; std::error_code ec_; @@ -359,7 +361,7 @@ class RedisReplyBuilder : public SinkReplyBuilder { RedisReplyBuilder(::io::Sink* stream); - void SetResp3(bool is_resp3); + virtual void SetResp3(bool is_resp3); bool IsResp3() const { return is_resp3_; } @@ -385,7 +387,7 @@ class RedisReplyBuilder : public SinkReplyBuilder { virtual void SendBulkString(std::string_view str); virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT); - virtual void SendScoredArray(const std::vector>& arr, + virtual void SendScoredArray(absl::Span> arr, bool with_scores); void StartArray(unsigned len); // StartCollection(len, ARRAY) @@ -402,45 +404,75 @@ class RedisReplyBuilder : public SinkReplyBuilder { }; // Redis reply builder interface for sending RESP data. -class RedisReplyBuilder2Base : public SinkReplyBuilder2 { +class RedisReplyBuilder2Base : public SinkReplyBuilder2, public RedisReplyBuilder { public: - enum CollectionType { ARRAY, SET, MAP, PUSH }; + using CollectionType = RedisReplyBuilder::CollectionType; + using VerbatimFormat = RedisReplyBuilder::VerbatimFormat; - enum VerbatimFormat { TXT, MARKDOWN }; - - explicit RedisReplyBuilder2Base(io::Sink* sink) : SinkReplyBuilder2(sink) { + explicit RedisReplyBuilder2Base(io::Sink* sink) + : SinkReplyBuilder2(sink), RedisReplyBuilder(nullptr) { } ~RedisReplyBuilder2Base() override = default; - virtual void SendNull(); + void SendNull() override; + void SendSimpleString(std::string_view str) override; - virtual void SendBulkString(std::string_view str); // RESP: Blob String + void SendBulkString(std::string_view str) override; // RESP: Blob String void SendLong(long val) override; - virtual void SendDouble(double val); // RESP: Number + void SendDouble(double val) override; // RESP: Number - virtual void SendNullArray(); - virtual void StartCollection(unsigned len, CollectionType ct); + void SendNullArray() override; + void StartCollection(unsigned len, CollectionType ct) override; using SinkReplyBuilder2::SendError; void SendError(std::string_view str, std::string_view type = {}) override; void SendProtocolError(std::string_view str) override; static char* FormatDouble(double d, char* dest, unsigned len); - virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT); + virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT) override; bool IsResp3() const { return resp3_; } - void SetResp3(bool resp3) { + // REMOVE THIS override + void SetResp3(bool resp3) override { resp3_ = resp3; } + // REMOVE THIS + void SetBatchMode(bool mode) override { + SinkReplyBuilder2::SetBatchMode(mode); + } + + void StartAggregate() override { + aggregators_.emplace_back(SinkReplyBuilder2::ReplyAggregator(this)); + } + + void StopAggregate() override { + aggregators_.pop_back(); + } + + void FlushBatch() override { + SinkReplyBuilder2::Flush(); + } + + // REMOVE THIS + + void CloseConnection() override { + SinkReplyBuilder2::CloseConnection(); + } + + std::error_code GetError() const override { + return SinkReplyBuilder2::GetError(); + } + private: void WriteIntWithPrefix(char prefix, int64_t val); // FastIntToBuffer directly into ReservePiece + std::vector aggregators_; bool resp3_ = false; }; @@ -452,15 +484,27 @@ class RedisReplyBuilder2 : public RedisReplyBuilder2Base { ~RedisReplyBuilder2() override = default; - void SendSimpleStrArr(const facade::ArgRange& strs); + void SendSimpleStrArr2(const facade::ArgRange& strs); + void SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct = ARRAY); - void SendScoredArray(absl::Span> arr, bool with_scores); + void SendScoredArray(absl::Span> arr, + bool with_scores) override; + + void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) { + SendSimpleStrArr2(arr); + } + void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override { + SendBulkStrArr(arr, type); + } void SendStored() final; void SendSetSkipped() final; void StartArray(unsigned len); - void SendEmptyArray(); + void SendEmptyArray() override; + + // TODO: Remove + void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) override; static std::string SerializeCommmand(std::string_view cmd); }; diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc index 60a915536..e71aef84b 100644 --- a/src/facade/reply_builder_test.cc +++ b/src/facade/reply_builder_test.cc @@ -205,7 +205,7 @@ RedisReplyBuilderTest::ParsingResults RedisReplyBuilderTest::Parse() { TEST_F(RedisReplyBuilderTest, MessageSend) { // Test each message that is "sent" to the sink - builder_->SendOk(); + builder_->SinkReplyBuilder2::SendOk(); ASSERT_EQ(TakePayload(), kOKMessage); builder_->StartArray(10); diff --git a/src/facade/reply_capture.cc b/src/facade/reply_capture.cc index 9f0163368..5589b1c33 100644 --- a/src/facade/reply_capture.cc +++ b/src/facade/reply_capture.cc @@ -3,6 +3,7 @@ // #include "facade/reply_capture.h" +#include "absl/types/span.h" #include "base/logging.h" #include "reply_capture.h" @@ -81,10 +82,11 @@ void CapturingReplyBuilder::SendBulkString(std::string_view str) { Capture(BulkString{string{str}}); } -void CapturingReplyBuilder::SendScoredArray(const std::vector>& arr, +void CapturingReplyBuilder::SendScoredArray(absl::Span> arr, bool with_scores) { SKIP_LESS(ReplyMode::FULL); - Capture(ScoredArray{arr, with_scores}); + std::vector> values(arr.begin(), arr.end()); + Capture(ScoredArray{std::move(values), with_scores}); } void CapturingReplyBuilder::StartCollection(unsigned len, CollectionType type) { @@ -219,7 +221,8 @@ void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) { current_ = monostate{}; } -optional CapturingReplyBuilder::GetError(const Payload& pl) { +optional CapturingReplyBuilder::TryExtractError( + const Payload& pl) { if (auto* err = get_if(&pl); err != nullptr) { return ErrorRef{err->first, err->second}; } diff --git a/src/facade/reply_capture.h b/src/facade/reply_capture.h index d0d05d804..44eb708ce 100644 --- a/src/facade/reply_capture.h +++ b/src/facade/reply_capture.h @@ -42,7 +42,7 @@ class CapturingReplyBuilder : public RedisReplyBuilder { void SendSimpleString(std::string_view str) override; void SendBulkString(std::string_view str) override; - void SendScoredArray(const std::vector>& arr, + void SendScoredArray(absl::Span> arr, bool with_scores) override; void StartCollection(unsigned len, CollectionType type) override; @@ -87,7 +87,7 @@ class CapturingReplyBuilder : public RedisReplyBuilder { static void Apply(Payload&& pl, RedisReplyBuilder* builder); // If an error is stored inside payload, get a reference to it. - static std::optional GetError(const Payload& pl); + static std::optional TryExtractError(const Payload& pl); struct CollectionPayload { CollectionPayload(unsigned len, CollectionType type); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 29f0a9af9..eba7a43a2 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -297,7 +297,7 @@ class InterpreterReplier : public RedisReplyBuilder { void SendBulkString(std::string_view str) final; void StartCollection(unsigned len, CollectionType type) final; - void SendScoredArray(const std::vector>& arr, + void SendScoredArray(absl::Span> arr, bool with_scores) final; private: @@ -472,7 +472,7 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType) { } } -void InterpreterReplier::SendScoredArray(const std::vector>& arr, +void InterpreterReplier::SendScoredArray(absl::Span> arr, bool with_scores) { if (with_scores) { if (IsResp3()) { @@ -1727,7 +1727,7 @@ optional Service::FlushEvalAsyncCmds(ConnectionC info->async_cmds.clear(); auto reply = crb.Take(); - return CapturingReplyBuilder::GetError(reply) ? make_optional(std::move(reply)) : nullopt; + return CapturingReplyBuilder::TryExtractError(reply) ? make_optional(std::move(reply)) : nullopt; } void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) { @@ -2030,7 +2030,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret result = interpreter->RunFunction(eval_args.sha, &error); if (auto err = FlushEvalAsyncCmds(cntx, true); err) { - auto err_ref = CapturingReplyBuilder::GetError(*err); + auto err_ref = CapturingReplyBuilder::TryExtractError(*err); result = Interpreter::RUN_ERR; error = absl::StrCat(err_ref->first); } diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 814db9ca9..5872252d3 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -239,7 +239,7 @@ bool MultiCommandSquasher::ExecuteSquashed() { auto& replies = sharded_[idx].replies; CHECK(!replies.empty()); - aborted |= error_abort_ && CapturingReplyBuilder::GetError(replies.back()); + aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back()); CapturingReplyBuilder::Apply(std::move(replies.back()), rb); replies.pop_back(); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index ad1f0a859..a4b25b31c 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2606,7 +2606,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { service_->DispatchCommand(absl::MakeSpan(arg_vec), &cntx); auto response = crb.Take(); - if (auto err = facade::CapturingReplyBuilder::GetError(response); err) { + if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) { LOG(ERROR) << "Bad index definition: " << def << " " << err->first; } } diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 000b60bd2..852983159 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -87,9 +87,6 @@ class DflyInstance: if threads > 1: self.args["num_shards"] = threads - 1 - # Add 1 byte limit for big values - self.args["serialization_max_chunk_size"] = 1 - def __del__(self): assert self.proc == None @@ -326,9 +323,6 @@ class DflyInstance: mem_info = process.memory_info() return mem_info.rss - def clear_max_chunk_flag(self): - del self.args["serialization_max_chunk_size"] - class DflyInstanceFactory: """ @@ -340,7 +334,7 @@ class DflyInstanceFactory: self.params = params self.instances = [] - def create(self, existing_port=None, path=None, **kwargs) -> DflyInstance: + def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyInstance: args = {**self.args, **kwargs} args.setdefault("dbfilename", "") args.setdefault("noversion_check", None) @@ -351,6 +345,13 @@ class DflyInstanceFactory: args.setdefault("jsonpathv2") args.setdefault("log_dir", self.params.log_dir) + if version >= 1.21: + # Add 1 byte limit for big values + args.setdefault("serialization_max_chunk_size", 1) + + if version >= 1.21: + args.setdefault("use_new_io") + for k, v in args.items(): args[k] = v.format(**self.params.env) if isinstance(v, str) else v diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 52aed51e2..763ab6a46 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2386,8 +2386,7 @@ async def test_replicate_old_master( dfly_version = "v1.19.2" released_dfly_path = download_dragonfly_release(dfly_version) - master = df_factory.create(path=released_dfly_path, cluster_mode=cluster_mode) - master.clear_max_chunk_flag() + master = df_factory.create(version=1.19, path=released_dfly_path, cluster_mode=cluster_mode) replica = df_factory.create( cluster_mode=cluster_mode, announce_ip=announce_ip, announce_port=announce_port )