diff --git a/helio b/helio index 382695d92..33673ea29 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 382695d925d79372e2cb53769ffaaf2f86d04365 +Subproject commit 33673ea2952529c4e8a9dca9903e38a3dbafef58 diff --git a/src/core/string_set.h b/src/core/string_set.h index 51ee9e881..d250c219d 100644 --- a/src/core/string_set.h +++ b/src/core/string_set.h @@ -98,6 +98,7 @@ class StringSet : public DenseSet { } uint32_t Scan(uint32_t, const std::function&) const; + iterator Find(std::string_view member) { return iterator{FindIt(&member, 1)}; } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index f0fe055e6..f567ccb18 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -12,6 +12,7 @@ #include #include +#include "base/iterator.h" #include "facade/op_status.h" namespace facade { @@ -37,6 +38,8 @@ enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 }; using MutableSlice = absl::Span; using CmdArgList = absl::Span; using CmdArgVec = std::vector; +using ArgSlice = absl::Span; +using OwnedArgSlice = absl::Span; inline std::string_view ToSV(MutableSlice slice) { return std::string_view{slice.data(), slice.size()}; @@ -46,6 +49,50 @@ inline std::string_view ToSV(std::string_view slice) { return slice; } +inline std::string_view ToSV(const std::string& slice) { + return slice; +} + +inline std::string_view ToSV(std::string&& slice) = delete; + +constexpr auto kToSV = [](auto&& v) { return ToSV(std::forward(v)); }; + +inline std::string_view ArgS(CmdArgList args, size_t i) { + auto arg = args[i]; + return {arg.data(), arg.size()}; +} + +inline auto ArgS(CmdArgList args) { + return base::it::Transform(kToSV, base::it::Range{args.begin(), args.end()}); +} + +struct ArgRange { + ArgRange(ArgRange&&) = default; + ArgRange(const ArgRange&) = default; + ArgRange(ArgRange& range) : ArgRange((const ArgRange&)range) { + } + + template ArgRange(T&& span) : span(std::forward(span)) { + } + + size_t Size() const { + return std::visit([](const auto& span) { return span.size(); }, span); + } + + auto Range() const { + return base::it::Wrap(kToSV, span); + } + + auto begin() const { + return Range().first; + } + + auto end() const { + return Range().second; + } + + std::variant span; +}; struct ConnectionStats { size_t read_buf_capacity = 0; // total capacity of input buffers uint64_t dispatch_queue_entries = 0; // total number of dispatch queue entries @@ -120,7 +167,7 @@ struct ErrorReply { } std::string_view ToSv() const { - return std::visit([](auto& str) { return std::string_view(str); }, message); + return std::visit(kToSV, message); } std::variant message; @@ -132,11 +179,6 @@ inline MutableSlice ToMSS(absl::Span span) { return MutableSlice{reinterpret_cast(span.data()), span.size()}; } -inline std::string_view ArgS(CmdArgList args, size_t i) { - auto arg = args[i]; - return std::string_view(arg.data(), arg.size()); -} - constexpr inline unsigned long long operator""_MB(unsigned long long x) { return 1024L * 1024L * x; } diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index b39d79463..d0f3b9df7 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -140,8 +140,7 @@ void SinkReplyBuilder::SendError(ErrorReply error) { if (error.status) return SendError(*error.status); - string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message); - SendError(message_sv, error.kind); + SendError(error.ToSv(), error.kind); } void SinkReplyBuilder::SendError(OpStatus status) { @@ -264,14 +263,6 @@ void MCReplyBuilder::SendNotFound() { SendSimpleString("NOT_FOUND"); } -size_t RedisReplyBuilder::WrappedStrSpan::Size() const { - return visit([](auto arr) { return arr.size(); }, (const StrSpan&)*this); -} - -string_view RedisReplyBuilder::WrappedStrSpan::operator[](size_t i) const { - return visit([i](auto arr) { return string_view{arr[i]}; }, (const StrSpan&)*this); -} - char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len) { StringBuilder sb(dest, dest_len); CHECK(dfly_conv.ToShortest(val, &sb)); @@ -504,12 +495,9 @@ void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) { } void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) { - WrappedStrSpan warr{arr}; - - string res = absl::StrCat("*", warr.Size(), kCRLF); - - for (unsigned i = 0; i < warr.Size(); i++) - StrAppend(&res, "+", warr[i], kCRLF); + string res = absl::StrCat("*", arr.Size(), kCRLF); + for (std::string_view str : arr) + StrAppend(&res, "+", str, kCRLF); SendRaw(res); } @@ -523,16 +511,15 @@ void RedisReplyBuilder::SendEmptyArray() { } void RedisReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) { - WrappedStrSpan warr{arr}; - - if (type == ARRAY && warr.Size() == 0) { + if (type == ARRAY && arr.Size() == 0) { SendRaw("*0\r\n"); return; } - auto cb = [&](size_t i) { return warr[i]; }; - - SendStringArrInternal(warr.Size(), std::move(cb), type); + auto cb = [&](size_t i) { + return visit([i](auto& span) { return facade::ToSV(span[i]); }, arr.span); + }; + SendStringArrInternal(arr.Size(), std::move(cb), type); } void RedisReplyBuilder::StartArray(unsigned len) { diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 8024eb4c1..9337c7d87 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -203,7 +203,7 @@ class RedisReplyBuilder : public SinkReplyBuilder { enum VerbatimFormat { TXT, MARKDOWN }; - using StrSpan = std::variant, absl::Span>; + using StrSpan = facade::ArgRange; RedisReplyBuilder(::io::Sink* stream); @@ -242,12 +242,6 @@ class RedisReplyBuilder : public SinkReplyBuilder { static char* FormatDouble(double val, char* dest, unsigned dest_len); - protected: - struct WrappedStrSpan : public StrSpan { - size_t Size() const; - std::string_view operator[](size_t index) const; - }; - private: void SendStringArrInternal(size_t size, absl::FunctionRef producer, CollectionType type); diff --git a/src/facade/reply_capture.cc b/src/facade/reply_capture.cc index 7a8b0c30a..c81e9f5ec 100644 --- a/src/facade/reply_capture.cc +++ b/src/facade/reply_capture.cc @@ -23,10 +23,7 @@ void CapturingReplyBuilder::SendError(std::string_view str, std::string_view typ void CapturingReplyBuilder::SendError(ErrorReply error) { SKIP_LESS(ReplyMode::ONLY_ERR); - - string message = - visit([](auto&& str) -> string { return string{std::move(str)}; }, error.message); - Capture(Error{std::move(message), error.kind}); + Capture(Error{error.ToSv(), error.kind}); } void CapturingReplyBuilder::SendMGetResponse(MGetResponse resp) { @@ -53,12 +50,7 @@ void CapturingReplyBuilder::SendSimpleStrArr(StrSpan arr) { SKIP_LESS(ReplyMode::FULL); DCHECK_EQ(current_.index(), 0u); - WrappedStrSpan warr{arr}; - vector sarr(warr.Size()); - for (unsigned i = 0; i < warr.Size(); i++) - sarr[i] = warr[i]; - - Capture(StrArrPayload{true, ARRAY, std::move(sarr)}); + Capture(StrArrPayload{true, ARRAY, {arr.begin(), arr.end()}}); } void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) { @@ -66,12 +58,7 @@ void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) { DCHECK_EQ(current_.index(), 0u); // TODO: 1. Allocate all strings at once 2. Allow movable types - WrappedStrSpan warr{arr}; - vector sarr(warr.Size()); - for (unsigned i = 0; i < warr.Size(); i++) - sarr[i] = warr[i]; - - Capture(StrArrPayload{false, type, std::move(sarr)}); + Capture(StrArrPayload{false, type, {arr.begin(), arr.end()}}); } void CapturingReplyBuilder::SendNull() { diff --git a/src/server/acl/acl_family.cc b/src/server/acl/acl_family.cc index 62c4290db..3a58e77fb 100644 --- a/src/server/acl/acl_family.cc +++ b/src/server/acl/acl_family.cc @@ -281,7 +281,7 @@ GenericError AclFamily::LoadToRegistryFromFile(std::string_view full_path, std::vector requests; for (auto& cmds : *materialized) { - auto req = ParseAclSetUser&>(cmds, *cmd_registry_, true); + auto req = ParseAclSetUser(cmds, *cmd_registry_, true); if (std::holds_alternative(req)) { auto error = std::move(std::get(req)); LOG(WARNING) << "Error while parsing aclfile: " << error.ToSv(); diff --git a/src/server/acl/helpers.cc b/src/server/acl/helpers.cc index d54f4980b..4493eb334 100644 --- a/src/server/acl/helpers.cc +++ b/src/server/acl/helpers.cc @@ -254,13 +254,12 @@ MaterializedContents MaterializeFileContents(std::vector* usernames using facade::ErrorReply; -template -std::variant ParseAclSetUser(T args, +std::variant ParseAclSetUser(facade::ArgRange args, const CommandRegistry& registry, bool hashed, bool has_all_keys) { User::UpdateRequest req; - for (auto& arg : args) { + for (std::string_view arg : args) { if (auto pass = MaybeParsePassword(facade::ToSV(arg), hashed); pass) { if (req.password) { return ErrorReply("Only one password is allowed"); @@ -291,18 +290,7 @@ std::variant ParseAclSetUser(T args, continue; } - std::string buffer; - std::string_view command; - if constexpr (std::is_same_v) { - ToUpper(&arg); - command = facade::ToSV(arg); - } else { - // Guaranteed SSO because commands are small - buffer = arg; - absl::Span view{buffer.data(), buffer.size()}; - ToUpper(&view); - command = buffer; - } + std::string command = absl::AsciiStrToUpper(arg); if (auto status = MaybeParseStatus(command); status) { if (req.is_active) { @@ -338,14 +326,6 @@ std::variant ParseAclSetUser(T args, using facade::CmdArgList; -template std::variant -ParseAclSetUser&>(std::vector&, - const CommandRegistry& registry, bool hashed, - bool has_all_keys); - -template std::variant ParseAclSetUser( - CmdArgList args, const CommandRegistry& registry, bool hashed, bool has_all_keys); - std::string AclKeysToString(const AclKeys& keys) { if (keys.all_keys) { return "~*"; diff --git a/src/server/acl/helpers.h b/src/server/acl/helpers.h index 85483f40a..0840ab817 100644 --- a/src/server/acl/helpers.h +++ b/src/server/acl/helpers.h @@ -36,9 +36,9 @@ using OptCommand = std::optional>; std::pair MaybeParseAclCommand(std::string_view command, const CommandRegistry& registry); -template std::variant ParseAclSetUser( - T args, const CommandRegistry& registry, bool hashed = false, bool has_all_keys = false); + facade::ArgRange args, const CommandRegistry& registry, bool hashed = false, + bool has_all_keys = false); using MaterializedContents = std::optional>>; diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 7eca07b43..307e60b41 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -118,7 +118,7 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) { } // Removes tx from its watch queues if tx appears there. -void BlockingController::FinalizeWatched(const ShardArgs& args, Transaction* tx) { +void BlockingController::FinalizeWatched(Keys keys, Transaction* tx) { DCHECK(tx); VLOG(1) << "FinalizeBlocking [" << owner_->shard_id() << "]" << tx->DebugId(); @@ -135,7 +135,7 @@ void BlockingController::FinalizeWatched(const ShardArgs& args, Transaction* tx) // Add keys of processed transaction so we could awake the next one in the queue // in case those keys still exist. - for (string_view key : args) { + for (string_view key : base::it::Wrap(facade::kToSV, keys)) { bool removed_awakened = wt.UnwatchTx(key, tx); CHECK(!removed_awakened || removed) << tx->DebugId() << " " << key << " " << tx->DEBUG_GetLocalMask(owner_->shard_id()); @@ -197,8 +197,7 @@ void BlockingController::NotifyPending() { awakened_indices_.clear(); } -void BlockingController::AddWatched(const ShardArgs& watch_keys, KeyReadyChecker krc, - Transaction* trans) { +void BlockingController::AddWatched(Keys watch_keys, KeyReadyChecker krc, Transaction* trans) { auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr); if (added) { dbit->second.reset(new DbWatchTable); @@ -206,7 +205,7 @@ void BlockingController::AddWatched(const ShardArgs& watch_keys, KeyReadyChecker DbWatchTable& wt = *dbit->second; - for (auto key : watch_keys) { + for (auto key : base::it::Wrap(facade::kToSV, watch_keys)) { auto [res, inserted] = wt.queue_map.emplace(key, nullptr); if (inserted) { res->second.reset(new WatchQueue); diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index 251811f4a..94375e7ab 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -21,6 +21,8 @@ class BlockingController { explicit BlockingController(EngineShard* owner); ~BlockingController(); + using Keys = std::variant; + bool HasAwakedTransaction() const { return !awakened_transactions_.empty(); } @@ -29,7 +31,7 @@ class BlockingController { return awakened_transactions_; } - void FinalizeWatched(const ShardArgs& args, Transaction* tx); + void FinalizeWatched(Keys keys, Transaction* tx); // go over potential wakened keys, verify them and activate watch queues. void NotifyPending(); @@ -38,7 +40,7 @@ class BlockingController { // TODO: consider moving all watched functions to // EngineShard with separate per db map. //! AddWatched adds a transaction to the blocking queue. - void AddWatched(const ShardArgs& watch_keys, KeyReadyChecker krc, Transaction* me); + void AddWatched(Keys watch_keys, KeyReadyChecker krc, Transaction* me); // Called from operations that create keys like lpush, rename etc. void AwakeWatched(DbIndex db_index, std::string_view db_key); diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index e521e150d..6619e38a3 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -157,15 +157,15 @@ vector ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, ChannelStoreUpdater csu{pattern, to_add, conn, uint32_t(tid)}; // Gather all the channels we need to subscribe to / remove. - for (size_t i = 0; i < args.size(); ++i) { - string_view channel = ArgS(args, i); + size_t i = 0; + for (string_view channel : ArgS(args)) { if (to_add && local_store.emplace(channel).second) csu.Record(channel); else if (!to_add && local_store.erase(channel) > 0) csu.Record(channel); if (to_reply) - result[i] = sinfo.SubscriptionCount(); + result[i++] = sinfo.SubscriptionCount(); } csu.Apply(); diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 977fe4658..4938421ef 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -35,22 +35,15 @@ void JournalWriter::Write(std::string_view sv) { sink_->Write(io::Buffer(sv)); } -// element count, total size -template pair SliceSize(const C& list) { - size_t res = 0, count = 0; - for (auto a : list) { - res += a.size(); - ++count; - } - return {count, res}; -} - void JournalWriter::Write(const journal::Entry::Payload& payload) { if (payload.cmd.empty()) return; - auto [num_elems, size] = - std::visit([](const auto& list) { return SliceSize(list); }, payload.args); + size_t num_elems = 0, size = 0; + for (string_view str : base::it::Wrap(facade::kToSV, payload.args)) { + num_elems++; + size += str.size(); + }; Write(1 + num_elems); @@ -58,13 +51,8 @@ void JournalWriter::Write(const journal::Entry::Payload& payload) { Write(cmd_size); Write(payload.cmd); - std::visit( - [this](const auto& list) { - for (auto v : list) { - this->Write(v); - } - }, - payload.args); + for (string_view str : base::it::Wrap(facade::kToSV, payload.args)) + this->Write(str); } void JournalWriter::Write(const journal::Entry& entry) { diff --git a/src/server/journal/types.cc b/src/server/journal/types.cc index 6b8d7f414..4c0e273f7 100644 --- a/src/server/journal/types.cc +++ b/src/server/journal/types.cc @@ -9,7 +9,6 @@ namespace dfly::journal { using namespace std; -using facade::ToSV; void AppendPrefix(string_view cmd, string* dest) { absl::StrAppend(dest, ", cmd='"); @@ -23,22 +22,13 @@ void AppendSuffix(string* dest) { absl::StrAppend(dest, "]"); } -template string Concat(const C& list) { - string res; - for (auto arg : list) { - absl::StrAppend(&res, "'"); - absl::StrAppend(&res, ToSV(arg)); - absl::StrAppend(&res, "',"); - } - return res; -} - string Entry::ToString() const { string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid); if (HasPayload()) { AppendPrefix(payload.cmd, &rv); - rv += visit([](const auto& list) { return Concat(list); }, payload.args); + for (string_view arg : base::it::Wrap(facade::kToSV, payload.args)) + absl::StrAppend(&rv, "'", facade::ToSV(arg), "',"); AppendSuffix(&rv); } else { absl::StrAppend(&rv, ", empty"); diff --git a/src/server/json_family.cc b/src/server/json_family.cc index f12b4a723..5b765839f 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -775,13 +775,13 @@ OpResult> OpObjKeys(const OpArgs& op_args, string_view key, // Retruns array of string lengths after a successful operation. OpResult> OpStrAppend(const OpArgs& op_args, string_view key, string_view path, - JsonPathV2 expression, const vector& strs) { + JsonPathV2 expression, facade::ArgRange strs) { vector vec; OpStatus status; auto cb = [&](const auto&, JsonType* val) { if (val->is_string()) { string new_val = val->as_string(); - for (auto& str : strs) { + for (string_view str : strs) { new_val += str; } @@ -1822,14 +1822,11 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) { string_view path = ArgS(args, 1); JsonPathV2 expression = PARSE_PATHV2(path); - - vector strs; - for (size_t i = 2; i < args.size(); ++i) { - strs.emplace_back(ArgS(args, i)); - } + auto strs = args.subspan(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpStrAppend(t->GetOpArgs(shard), key, path, std::move(expression), strs); + return OpStrAppend(t->GetOpArgs(shard), key, path, std::move(expression), + facade::ArgRange{strs}); }; Transaction* trans = cntx->transaction; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 9515c7239..8f9db6d20 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -158,22 +158,6 @@ struct CircularMessages { // Used to recover logs for BLPOP failures. See OpBPop. thread_local CircularMessages debugMessages{50}; -// A bit awkward translation from a single key to ShardArgs. -// We create a mutable slice (which will never be mutated) from the key, then we create -// a CmdArgList of size 1 that references mslice and finally -// we reference the first element in the CmdArgList via islice. -struct SingleArg { - MutableSlice mslice; - IndexSlice islice{0, 1}; - - SingleArg(string_view arg) : mslice(const_cast(arg.data()), arg.size()) { - } - - ShardArgs Get() { - return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)}; - } -}; - class BPopPusher { public: BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir); @@ -318,7 +302,7 @@ OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool } OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, - bool skip_notexist, ArgSlice vals, bool journal_rewrite) { + bool skip_notexist, facade::ArgRange vals, bool journal_rewrite) { EngineShard* es = op_args.shard; DbSlice::AddOrFindResult res; @@ -350,7 +334,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d // Left push is LIST_HEAD. int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; - for (auto v : vals) { + for (string_view v : vals) { es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size()); quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); } @@ -368,7 +352,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d if (journal_rewrite && op_args.shard->journal()) { string command = dir == ListDir::LEFT ? "LPUSH" : "RPUSH"; - vector mapped(vals.size() + 1); + vector mapped(vals.Size() + 1); mapped[0] = key; std::copy(vals.begin(), vals.end(), mapped.begin() + 1); RecordJournal(op_args, command, mapped, 2); @@ -456,17 +440,14 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view string_view val{find_res[0].value()}; DVLOG(1) << "Pushing value: " << val << " to list: " << dest; - ArgSlice span{&val, 1}; - OpPush(op_args, key, dest_dir, false, span, true); + OpPush(op_args, key, dest_dir, false, ArgSlice{val}, true); // blocking_controller does not have to be set with non-blocking transactions. if (shard->blocking_controller()) { // hack, again. since we hacked which queue we are waiting on (see RunPair) // we must clean-up src key here manually. See RunPair why we do this. // in short- we suspended on "src" on both shards. - - SingleArg single_arg{src}; - shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t); + shard->blocking_controller()->FinalizeWatched(ArgSlice({src}), t); } } else { DVLOG(1) << "Popping value from list: " << key; @@ -891,8 +872,7 @@ OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { return op_res; } - SingleArg single_arg{pop_key_}; - auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice(&pop_key_, 1); }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { @@ -919,13 +899,11 @@ OpResult BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) { return op_res; } - SingleArg single_arg(this->pop_key_); - // a hack: we watch in both shards for pop_key but only in the source shard it's relevant. // Therefore we follow the regular flow of watching the key but for the destination shard it // will never be triggerred. // This allows us to run Transaction::Execute on watched transactions in both shards. - auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice(&this->pop_key_, 1); }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { @@ -1255,13 +1233,9 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 0); - vector vals(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - vals[i - 1] = ArgS(args, i); - } - absl::Span span{vals.data(), vals.size()}; + auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span, false); + return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, args.subspan(1), false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 7f0a4d92e..399d25a05 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -407,24 +407,21 @@ void InterpreterReplier::SendMGetResponse(MGetResponse resp) { } void InterpreterReplier::SendSimpleStrArr(StrSpan arr) { - WrappedStrSpan warr{arr}; - explr_->OnArrayStart(warr.Size()); - for (unsigned i = 0; i < warr.Size(); i++) - explr_->OnString(warr[i]); + explr_->OnArrayStart(arr.Size()); + for (string_view str : arr) + explr_->OnString(str); explr_->OnArrayEnd(); } void InterpreterReplier::SendNullArray() { - SendSimpleStrArr({}); + SendSimpleStrArr(ArgSlice{}); PostItem(); } void InterpreterReplier::SendStringArr(StrSpan arr, CollectionType) { - WrappedStrSpan warr{arr}; - size_t size = warr.Size(); - explr_->OnArrayStart(size); - for (size_t i = 0; i < size; i++) - explr_->OnString(warr[i]); + explr_->OnArrayStart(arr.Size()); + for (string_view str : arr) + explr_->OnString(str); explr_->OnArrayEnd(); PostItem(); } @@ -1662,8 +1659,8 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) { // Duplicate keys are stored to keep correct count. exec_info.watched_existed += keys_existed.load(memory_order_relaxed); - for (size_t i = 0; i < args.size(); i++) { - exec_info.watched_keys.emplace_back(cntx->db_index(), ArgS(args, i)); + for (std::string_view key : ArgS(args)) { + exec_info.watched_keys.emplace_back(cntx->db_index(), key); } return cntx->SendOk(); @@ -2341,12 +2338,10 @@ void Service::PubsubPatterns(ConnectionContext* cntx) { } void Service::PubsubNumSub(CmdArgList args, ConnectionContext* cntx) { - int channels_size = args.size(); auto* rb = static_cast(cntx->reply_builder()); - rb->StartArray(channels_size * 2); + rb->StartArray(args.size() * 2); - for (auto i = 0; i < channels_size; i++) { - auto channel = ArgS(args, i); + for (string_view channel : ArgS(args)) { rb->SendBulkString(channel); rb->SendLong(ServerState::tlocal()->channel_store()->FetchSubscribers(channel).size()); } diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 561cd32f7..a584457b2 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -40,6 +40,13 @@ using SetType = pair; namespace { +// Possible sources of new set entries +using NewEntries = std::variant>; + +auto EntriesRange(const NewEntries& entries) { + return base::it::Wrap(facade::kToSV, entries); +} + constexpr uint32_t kMaxIntSetEntries = 256; bool IsDenseEncoding(const CompactObj& co) { @@ -67,7 +74,7 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) { return is; } -pair RemoveStrSet(uint32_t now_sec, ArgSlice vals, CompactObj* set) { +pair RemoveStrSet(uint32_t now_sec, facade::ArgRange vals, CompactObj* set) { unsigned removed = 0; bool isempty = false; DCHECK(IsDenseEncoding(*set)); @@ -76,7 +83,7 @@ pair RemoveStrSet(uint32_t now_sec, ArgSlice vals, CompactObj* s StringSet* ss = ((StringSet*)set->RObjPtr()); ss->set_time(now_sec); - for (auto member : vals) { + for (string_view member : vals) { removed += ss->Erase(member); } @@ -86,7 +93,8 @@ pair RemoveStrSet(uint32_t now_sec, ArgSlice vals, CompactObj* s return make_pair(removed, isempty); } -unsigned AddStrSet(const DbContext& db_context, ArgSlice vals, uint32_t ttl_sec, CompactObj* dest) { +unsigned AddStrSet(const DbContext& db_context, const NewEntries& vals, uint32_t ttl_sec, + CompactObj* dest) { unsigned res = 0; DCHECK(IsDenseEncoding(*dest)); @@ -96,7 +104,7 @@ unsigned AddStrSet(const DbContext& db_context, ArgSlice vals, uint32_t ttl_sec, ss->set_time(time_now); - for (auto member : vals) { + for (auto member : EntriesRange(vals)) { res += ss->Add(member, ttl_sec); } } @@ -109,7 +117,8 @@ void InitStrSet(CompactObj* set) { } // returns (removed, isempty) -pair RemoveSet(const DbContext& db_context, ArgSlice vals, CompactObj* set) { +pair RemoveSet(const DbContext& db_context, facade::ArgRange vals, + CompactObj* set) { bool isempty = false; unsigned removed = 0; @@ -117,7 +126,7 @@ pair RemoveSet(const DbContext& db_context, ArgSlice vals, Compa intset* is = (intset*)set->RObjPtr(); long long llval; - for (auto val : vals) { + for (string_view val : vals) { if (!string2ll(val.data(), val.size(), &llval)) { continue; } @@ -134,11 +143,11 @@ pair RemoveSet(const DbContext& db_context, ArgSlice vals, Compa return make_pair(removed, isempty); } -void InitSet(ArgSlice vals, CompactObj* set) { +void InitSet(const NewEntries& vals, CompactObj* set) { bool int_set = true; long long intv; - for (auto v : vals) { + for (string_view v : EntriesRange(vals)) { if (!string2ll(v.data(), v.size(), &intv)) { int_set = false; break; @@ -245,8 +254,8 @@ int32_t GetExpiry(const DbContext& db_context, const SetType& st, string_view me } void FindInSet(StringVec& memberships, const DbContext& db_context, const SetType& st, - const vector& members) { - for (const auto& member : members) { + facade::ArgRange members) { + for (string_view member : members) { bool status = IsInSet(db_context, st, member); memberships.emplace_back(to_string(status)); } @@ -458,17 +467,18 @@ SvArray ToSvArray(const absl::flat_hash_set& set) { } // if overwrite is true then OpAdd writes vals into the key and discards its previous value. -OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite, - bool journal_update) { +OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewEntries& vals, + bool overwrite, bool journal_update) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); + auto vals_it = EntriesRange(vals); VLOG(2) << "OpAdd(" << key << ")"; // overwrite - meaning we run in the context of 2-hop operation and we want // to overwrite the key. However, if the set is empty it means we should delete the // key if it exists. - if (overwrite && vals.empty()) { + if (overwrite && (vals_it.begin() == vals_it.end())) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately db_slice.Del(op_args.db_cntx.db_index, it); if (journal_update && op_args.shard->journal()) { @@ -502,7 +512,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v intset* is = (intset*)inner_obj; bool success = true; - for (auto val : vals) { + for (auto val : vals_it) { bool added = false; is = IntsetAddSafe(val, is, &success, &added); res += added; @@ -534,16 +544,17 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v if (overwrite) { RecordJournal(op_args, "DEL"sv, ArgSlice{key}); } - vector mapped(vals.size() + 1); + size_t size = visit([](auto& c) { return c.size(); }, vals); + vector mapped(size + 1); mapped[0] = key; - std::copy(vals.begin(), vals.end(), mapped.begin() + 1); + std::copy(vals_it.begin(), vals_it.end(), mapped.begin() + 1); RecordJournal(op_args, "SADD"sv, mapped); } return res; } OpResult OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_sec, - ArgSlice vals) { + const NewEntries& vals) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); @@ -573,12 +584,12 @@ OpResult OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_ CHECK(IsDenseEncoding(co)); } - uint32_t res = AddStrSet(op_args.db_cntx, std::move(vals), ttl_sec, &co); + uint32_t res = AddStrSet(op_args.db_cntx, vals, ttl_sec, &co); return res; } -OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals, +OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals, bool journal_rewrite) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); @@ -596,7 +607,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, const ArgSlice& CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res->it)); } if (journal_rewrite && op_args.shard->journal()) { - vector mapped(vals.size() + 1); + vector mapped(vals.Size() + 1); mapped[0] = key; std::copy(vals.begin(), vals.end(), mapped.begin() + 1); RecordJournal(op_args, "SREM"sv, mapped); @@ -655,10 +666,11 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) { OpArgs op_args = t->GetOpArgs(es); for (auto k : largs) { if (k == src_) { - CHECK_EQ(1u, OpRem(op_args, k, {member_}, journal_rewrite_).value()); // must succeed. + CHECK_EQ(1u, + OpRem(op_args, k, ArgSlice{member_}, journal_rewrite_).value()); // must succeed. } else { DCHECK_EQ(k, dest_); - OpAdd(op_args, k, {member_}, false, journal_rewrite_); + OpAdd(op_args, k, ArgSlice(&member_, 1), false, journal_rewrite_); } } @@ -933,16 +945,17 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count StringVec result = RandMemberSet(db_cntx, co, generator, picks_count); // Remove selected members - std::vector members_to_remove{result.begin(), result.end()}; - bool is_empty = RemoveSet(db_cntx, members_to_remove, &co).second; + bool is_empty = RemoveSet(db_cntx, result, &co).second; find_res->post_updater.Run(); CHECK(!is_empty); // Replicate as SREM with removed keys, because SPOP is not deterministic. if (op_args.shard->journal()) { - members_to_remove.insert(members_to_remove.begin(), key); - RecordJournal(op_args, "SREM"sv, members_to_remove); + vector mapped(result.size() + 1); + mapped[0] = key; + copy(result.begin(), result.end(), mapped.begin() + 1); + RecordJournal(op_args, "SREM"sv, mapped); } return result; @@ -978,14 +991,10 @@ OpResult OpScan(const OpArgs& op_args, string_view key, uint64_t* cur void SAdd(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); - vector vals(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - vals[i - 1] = ArgS(args, i); - } - ArgSlice arg_slice{vals.data(), vals.size()}; + auto values = args.subspan(1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpAdd(t->GetOpArgs(shard), key, arg_slice, false, false); + auto cb = [key, values](Transaction* t, EngineShard* shard) { + return OpAdd(t->GetOpArgs(shard), key, values, false, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1022,11 +1031,7 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) { void SMIsMember(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); - - vector vals(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - vals[i - 1] = ArgS(args, i); - } + auto vals = args.subspan(1); StringVec memberships; memberships.reserve(vals.size()); @@ -1071,14 +1076,10 @@ void SMove(CmdArgList args, ConnectionContext* cntx) { void SRem(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); - vector vals(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - vals[i - 1] = ArgS(args, i); - } - ArgSlice span{vals.data(), vals.size()}; + auto vals = args.subspan(1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(t->GetOpArgs(shard), key, span, false); + auto cb = [key, vals](Transaction* t, EngineShard* shard) { + return OpRem(t->GetOpArgs(shard), key, vals, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1223,17 +1224,17 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) { return; } - SvArray result = ToSvArray(rsv.value()); + size_t result_size = rsv.value().size(); auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(t->GetOpArgs(shard), dest_key, result, true, true); + OpAdd(t->GetOpArgs(shard), dest_key, std::move(rsv.value()), true, true); } return OpStatus::OK; }; cntx->transaction->Execute(std::move(store_cb), true); - cntx->SendLong(result.size()); + cntx->SendLong(result_size); } void SMembers(CmdArgList args, ConnectionContext* cntx) { @@ -1424,18 +1425,17 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) { return; } - SvArray result = ToSvArray(unionset.value()); - + size_t result_size = unionset.value().size(); auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(t->GetOpArgs(shard), dest_key, result, true, true); + OpAdd(t->GetOpArgs(shard), dest_key, std::move(unionset.value()), true, true); } return OpStatus::OK; }; cntx->transaction->Execute(std::move(store_cb), true); - cntx->SendLong(result.size()); + cntx->SendLong(result_size); } void SScan(CmdArgList args, ConnectionContext* cntx) { @@ -1491,15 +1491,9 @@ void SAddEx(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(kInvalidIntErr); } - vector vals(args.size() - 2); - for (size_t i = 2; i < args.size(); ++i) { - vals[i - 2] = ArgS(args, i); - } - - ArgSlice arg_slice{vals.data(), vals.size()}; - + auto vals = args.subspan(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, arg_slice); + return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, vals); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 91eb33394..1a17ca2db 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -257,10 +257,9 @@ void Transaction::PrepareMultiFps(CmdArgList keys) { auto& tag_fps = multi_->tag_fps; tag_fps.reserve(keys.size()); - for (MutableSlice key : keys) { - string_view sv = facade::ToSV(key); - ShardId sid = Shard(sv, shard_set->size()); - tag_fps.emplace(sid, LockTag(sv).Fingerprint()); + for (string_view str : ArgS(keys)) { + ShardId sid = Shard(str, shard_set->size()); + tag_fps.emplace(sid, LockTag(str).Fingerprint()); } } @@ -1213,7 +1212,8 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p return result; } -OpStatus Transaction::WatchInShard(const ShardArgs& keys, EngineShard* shard, KeyReadyChecker krc) { +OpStatus Transaction::WatchInShard(BlockingController::Keys keys, EngineShard* shard, + KeyReadyChecker krc) { auto& sd = shard_data_[SidToId(shard->shard_id())]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); @@ -1221,12 +1221,12 @@ OpStatus Transaction::WatchInShard(const ShardArgs& keys, EngineShard* shard, Ke sd.local_mask &= ~OUT_OF_ORDER; shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this); - DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.Front(); + DVLOG(2) << "WatchInShard " << DebugId(); return OpStatus::OK; } -void Transaction::ExpireShardCb(const ShardArgs& wkeys, EngineShard* shard) { +void Transaction::ExpireShardCb(BlockingController::Keys keys, EngineShard* shard) { // Blocking transactions don't release keys when suspending, release them now. auto lock_args = GetLockArgs(shard->shard_id()); shard->db_slice().Release(LockMode(), lock_args); @@ -1234,7 +1234,7 @@ void Transaction::ExpireShardCb(const ShardArgs& wkeys, EngineShard* shard) { auto& sd = shard_data_[SidToId(shard->shard_id())]; sd.local_mask &= ~KEYLOCK_ACQUIRED; - shard->blocking_controller()->FinalizeWatched(wkeys, this); + shard->blocking_controller()->FinalizeWatched(keys, this); DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this)); // Resume processing of transaction queue diff --git a/src/server/transaction.h b/src/server/transaction.h index 5a0427352..2d36d40f5 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -132,7 +132,8 @@ class Transaction { using RunnableType = absl::FunctionRef; // Provides keys to block on for specific shard. - using WaitKeysProvider = std::function; + using WaitKeysProvider = + std::function(Transaction*, EngineShard* shard)>; // Modes in which a multi transaction can run. enum MultiMode { @@ -525,12 +526,13 @@ class Transaction { void RunCallback(EngineShard* shard); // Adds itself to watched queue in the shard. Must run in that shard thread. - OpStatus WatchInShard(const ShardArgs& keys, EngineShard* shard, KeyReadyChecker krc); + OpStatus WatchInShard(std::variant keys, EngineShard* shard, + KeyReadyChecker krc); // Expire blocking transaction, unlock keys and unregister it from the blocking controller void ExpireBlocking(WaitKeysProvider wcb); - void ExpireShardCb(const ShardArgs& wkeys, EngineShard* shard); + void ExpireShardCb(std::variant keys, EngineShard* shard); // Returns true if we need to follow up with PollExecution on this shard. bool CancelShardCb(EngineShard* shard); diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index a6de88e95..7d6f06e6d 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -5,6 +5,7 @@ #include "server/tx_base.h" #include "base/logging.h" +#include "facade/facade_types.h" #include "server/cluster/cluster_defs.h" #include "server/engine_shard_set.h" #include "server/journal/journal.h" @@ -29,8 +30,8 @@ void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args false); } -void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, uint32_t shard_cnt, - bool multi_commands) { +void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args, + uint32_t shard_cnt, bool multi_commands) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, false); diff --git a/src/server/tx_base.h b/src/server/tx_base.h index 5410221f2..9373bdb1d 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -19,7 +19,7 @@ using DbIndex = uint16_t; using ShardId = uint16_t; using LockFp = uint64_t; // a key fingerprint used by the LockTable. -using ArgSlice = absl::Span; +using facade::ArgSlice; constexpr DbIndex kInvalidDbId = DbIndex(-1); constexpr ShardId kInvalidSid = ShardId(-1); @@ -164,6 +164,8 @@ class ShardArgs { } }; + using const_iterator = Iterator; + ShardArgs(facade::CmdArgList fa, absl::Span s) : slice_(ArgsIndexPair(fa, s)) { } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index d1a28546b..84823b79a 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1604,7 +1604,7 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, return count; } -OpResult OpRem(const OpArgs& op_args, string_view key, ArgSlice members) { +OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRange members) { auto& db_slice = op_args.shard->db_slice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) @@ -1649,21 +1649,21 @@ OpResult OpScore(const OpArgs& op_args, string_view key, string_view mem return *res; } -OpResult OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) { +OpResult OpMScore(const OpArgs& op_args, string_view key, + facade::ArgRange members) { auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); - MScoreResponse scores(members.size()); + MScoreResponse scores(members.Size()); const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); sds& tmp_str = op_args.shard->tmp_str1; - for (size_t i = 0; i < members.size(); i++) { - const auto& m = members[i]; - - tmp_str = sdscpylen(tmp_str, m.data(), m.size()); - scores[i] = GetZsetScore(robj_wrapper, tmp_str); + size_t i = 0; + for (string_view member : members.Range()) { + tmp_str = sdscpylen(tmp_str, member.data(), member.size()); + scores[i++] = GetZsetScore(robj_wrapper, tmp_str); } return scores; @@ -2362,12 +2362,7 @@ void ZSetFamily::ZRemRangeByLex(CmdArgList args, ConnectionContext* cntx) { void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); - - absl::InlinedVector members(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - members[i - 1] = ArgS(args, i); - } - + auto members = args.subspan(1); auto cb = [&](Transaction* t, EngineShard* shard) { return OpRem(t->GetOpArgs(shard), key, members); }; @@ -2636,13 +2631,8 @@ void ZSetFamily::ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cn OpResult ZSetFamily::ZGetMembers(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); - - absl::InlinedVector members(args.size() - 1); - for (size_t i = 1; i < args.size(); ++i) { - members[i - 1] = ArgS(args, i); - } - - auto cb = [&](Transaction* t, EngineShard* shard) { + auto members = args.subspan(1); + auto cb = [key, members](Transaction* t, EngineShard* shard) { return OpMScore(t->GetOpArgs(shard), key, members); };