diff --git a/README.md b/README.md index 57b391779..2fe3ceb61 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,7 @@ a distributed log format. API 2.0 - [ ] List Family - [X] BLPOP - - [ ] BRPOP + - [X] BRPOP - [ ] BRPOPLPUSH - [ ] BLMOVE - [ ] LINSERT @@ -192,10 +192,10 @@ API 2.0 - [ ] PUBSUB CHANNELS - [X] SUBSCRIBE - [X] UNSUBSCRIBE -- [ ] Server Family +- [X] Server Family - [ ] WATCH - [ ] UNWATCH - - [ ] DISCARD + - [X] DISCARD - [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO - [X] COMMAND - [ ] COMMAND COUNT/GETKEYS/INFO diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 2e94ab77f..db0ebde17 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -102,7 +102,7 @@ string ListPop(ListDir dir, quicklist* ql) { class BPopper { public: - explicit BPopper(); + explicit BPopper(ListDir dir); // Returns WRONG_TYPE, OK. // If OK is returned then use result() to fetch the value. @@ -119,6 +119,8 @@ class BPopper { private: OpStatus Pop(Transaction* t, EngineShard* shard); + ListDir dir_; + bool found_ = false; PrimeIterator find_it_; ShardId find_sid_ = std::numeric_limits::max(); @@ -127,7 +129,7 @@ class BPopper { string value_; }; -BPopper::BPopper() { +BPopper::BPopper(ListDir dir) : dir_(dir) { } OpStatus BPopper::Run(Transaction* t, unsigned msec) { @@ -186,7 +188,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { find_it_->first.GetString(&key_); quicklist* ql = GetQL(find_it_->second); - value_ = ListPop(ListDir::LEFT, ql); + value_ = ListPop(dir_, ql); if (quicklistCount(ql) == 0) { CHECK(shard->db_slice().Del(t->db_index(), find_it_)); @@ -343,6 +345,14 @@ void ListFamily::LSet(CmdArgList args, ConnectionContext* cntx) { } void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) { + BPopGeneric(ListDir::LEFT, std::move(args), cntx); +} + +void ListFamily::BRPop(CmdArgList args, ConnectionContext* cntx) { + BPopGeneric(ListDir::RIGHT, std::move(args), cntx); +} + +void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) { DCHECK_GE(args.size(), 3u); float timeout; @@ -356,7 +366,7 @@ void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "BLPop start " << timeout; Transaction* transaction = cntx->transaction; - BPopper popper; + BPopper popper(dir); OpStatus result = popper.Run(transaction, unsigned(timeout * 1000)); switch (result) { @@ -704,6 +714,7 @@ void ListFamily::Register(CommandRegistry* registry) { << CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX) << CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop) << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) + << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex) << CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange) diff --git a/src/server/list_family.h b/src/server/list_family.h index c3e48efaa..b8d47e812 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -27,6 +27,7 @@ class ListFamily { static void LPop(CmdArgList args, ConnectionContext* cntx); static void RPop(CmdArgList args, ConnectionContext* cntx); static void BLPop(CmdArgList args, ConnectionContext* cntx); + static void BRPop(CmdArgList args, ConnectionContext* cntx); static void LLen(CmdArgList args, ConnectionContext* cntx); static void LIndex(CmdArgList args, ConnectionContext* cntx); static void LTrim(CmdArgList args, ConnectionContext* cntx); @@ -38,6 +39,8 @@ class ListFamily { static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args, ConnectionContext* cntx); + static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); + static OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, bool skip_notexist, absl::Span vals); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index fe21ce547..10c301658 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -360,7 +360,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) VLOG(2) << "Got: " << args; string_view cmd_str = ArgS(args, 0); - bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI"); + bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI" || cmd_str == "DISCARD"); const CommandId* cid = registry_.Find(cmd_str); ServerState& etl = *ServerState::tlocal(); @@ -813,6 +813,19 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, interpreter->ResetStack(); } +void Service::Discard(CmdArgList args, ConnectionContext* cntx) { + RedisReplyBuilder* rb = (*cntx).operator->(); + + if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) { + return rb->SendError("DISCARD without MULTI"); + } + + cntx->conn_state.exec_state = ConnectionState::EXEC_INACTIVE; + cntx->conn_state.exec_body.clear(); + + rb->SendOk(); +} + void Service::Exec(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = (*cntx).operator->(); @@ -940,6 +953,7 @@ void Service::RegisterCommands() { registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi) + << CI{"DISCARD", CO::NOSCRIPT | CO::FAST| CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard) << CI{"EVAL", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(Eval).SetValidator(&EvalValidator) << CI{"EVALSHA", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(EvalSha).SetValidator(&EvalValidator) << CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec) diff --git a/src/server/main_service.h b/src/server/main_service.h index 4e68f4ba2..1d083bf1b 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -71,6 +71,7 @@ class Service : public facade::ServiceInterface { static void Quit(CmdArgList args, ConnectionContext* cntx); static void Multi(CmdArgList args, ConnectionContext* cntx); + void Discard(CmdArgList args, ConnectionContext* cntx); void Eval(CmdArgList args, ConnectionContext* cntx); void EvalSha(CmdArgList args, ConnectionContext* cntx); void Exec(CmdArgList args, ConnectionContext* cntx);