diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index bf260540d..c2348fb31 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,7 +21,7 @@ cd build-dbg && ninja dragonfly ```sh cd dragonfly # project root -# Make sure you have 'pre-commit' e 'clang-format' installed +# Make sure you have 'pre-commit' and 'clang-format' installed pip install pre-commit clang-format # IMPORTANT! Enable our pre-commit message hooks diff --git a/README.md b/README.md index 14836fc61..8dcf0e212 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,8 @@ In addition, it has Dragonfly specific arguments options: * `primary_port_http_enabled` - If true allows accessing http console on main TCP port, default: true * `admin_port` - If set, would enable admin access to console on the assigned port. This supports both HTTP and RESP protocols. default disabled * `admin_bind` - If set, the admin consol TCP connection would be bind the given address. This supports both HTTP and RESP protocols. default any + * `cluster_mode` - cluster mode supported. Currently supports only `emulated`. default: "" + * `cluster_announce_ip` - ip that cluster commands announce to the client. ### Example Start Script, with popular options: diff --git a/docs/api_status.md b/docs/api_status.md index 5de962ee3..7ebd17abe 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -210,6 +210,9 @@ with respect to Memcached and Redis APIs. - [ ] CLIENT REPLY - [X] REPLCONF - [ ] WAIT + - [X] CLUSTER SLOTS + - [X] CLUSTER NODES + - [X] CLUSTER INFO ### API 4 - [X] Generic Family @@ -258,6 +261,10 @@ with respect to Memcached and Redis APIs. - [X] Sorted Set Family - [X] ZUNION +### API 7 +- [ ] Server Family + - [ ] CLUSTER SHARDS + ## Notes Some commands were implemented as decorators along the way: diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 533fbaebb..6fbedbe06 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -458,6 +458,12 @@ void Connection::SendMsgVecAsync(const PubMessage& pub_msg) { } } +std::string Connection::LocalBindAddress() const { + LinuxSocketBase* lsb = static_cast(socket_.get()); + auto le = lsb->LocalEndpoint(); + return le.address().to_string(); +} + string Connection::GetClientInfo() const { LinuxSocketBase* lsb = static_cast(socket_.get()); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 837588108..2fcfd55e6 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -81,6 +81,7 @@ class Connection : public util::Connection { std::string GetClientInfo() const; std::string RemoteEndpointStr() const; std::string RemoteEndpointAddress() const; + std::string LocalBindAddress() const; uint32 GetClientId() const; void ShutdownSelf(); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cdb26fe4f..97f4bb69e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -573,7 +573,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) ConnectionContext* dfly_cntx = static_cast(cntx); bool under_script = dfly_cntx->conn_state.script_info.has_value(); - if (VLOG_IS_ON(2)) { + if (VLOG_IS_ON(2) && + cntx->owner()) { // owner may not exists in case of this being called from replica context const char* lua = under_script ? "LUA " : ""; LOG(INFO) << "Got (" << cntx->owner()->GetClientId() << "): " << lua << args; } diff --git a/src/server/replica.cc b/src/server/replica.cc index 3a5271b30..4f6574c88 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -101,7 +101,8 @@ vector> Partition(unsigned num_flows) { } // namespace -Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) { +Replica::Replica(string host, uint16_t port, Service* se, std::string_view id) + : service_(*se), id_{id} { master_context_.host = std::move(host); master_context_.port = port; } @@ -287,7 +288,7 @@ error_code Replica::Greet() { base::IoBuf io_buf{128}; ReqSerializer serializer{sock_.get()}; uint32_t consumed = 0; - + VLOG(1) << "greeting message handling"; // Corresponds to server.repl_state == REPL_STATE_CONNECTING state in redis RETURN_ON_ERR(SendCommand("PING", &serializer)); // optional. RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed)); @@ -334,7 +335,7 @@ error_code Replica::Greet() { return make_error_code(errc::bad_message); } } else if (resp_args_.size() == 3) { // it's dragonfly master. - // Reponse is: + // Response is: if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING, RespExpr::INT64}) || resp_args_[0].GetBuf().size() != CONFIG_RUN_ID_SIZE) { LOG(ERROR) << "Unexpected response " << ToSV(io_buf.InputBuffer()); @@ -356,7 +357,14 @@ error_code Replica::Greet() { master_context_.master_repl_id = param0; master_context_.dfly_session_id = param1; num_df_flows_ = param2; - + // We need to send this because we may require to use this for cluster commands. + // this reason to send this here is that in other context we can get an error reply + // since we are budy with the replication + RETURN_ON_ERR(SendCommand(StrCat("REPLCONF CLIENT-ID ", id_), &serializer)); + RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed)); + if (!CheckRespIsSimpleReply("OK")) { + LOG(WARNING) << "master did not return OK on id message"; + } VLOG(1) << "Master id: " << param0 << ", sync id: " << param1 << ", num journals " << num_df_flows_; } else { @@ -367,7 +375,6 @@ error_code Replica::Greet() { io_buf.ConsumeInput(consumed); state_mask_ |= R_GREETED; - return error_code{}; } diff --git a/src/server/replica.h b/src/server/replica.h index 42867490e..6ec63d48a 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -98,7 +98,7 @@ class Replica { }; public: - Replica(std::string master_host, uint16_t port, Service* se); + Replica(std::string master_host, uint16_t port, Service* se, std::string_view id); ~Replica(); // Spawns a fiber that runs until link with master is broken or the replication is stopped. @@ -110,6 +110,10 @@ class Replica { void Pause(bool pause); + std::string_view MasterId() const { + return master_context_.master_repl_id; + } + private: /* Main standalone mode functions */ // Coordinate state transitions. Spawned by start. void MainReplicationFb(); @@ -253,6 +257,7 @@ class Replica { unsigned num_df_flows_ = 0; bool is_paused_ = false; + std::string id_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index dcded575b..213c1b649 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -58,6 +58,10 @@ ABSL_FLAG(string, save_schedule, "", "glob spec for the UTC time to save a snapshot which matches HH:MM 24h time"); ABSL_FLAG(bool, df_snapshot_format, true, "if true, save in dragonfly-specific snapshotting format"); +ABSL_FLAG(string, cluster_mode, "", + "Cluster mode supported. Currently supports only `emulated`. " + "default: \"\""); +ABSL_FLAG(string, cluster_announce_ip, "", "ip that cluster commands announce to the client"); ABSL_DECLARE_FLAG(uint32_t, port); ABSL_DECLARE_FLAG(bool, cache_mode); @@ -286,6 +290,16 @@ void SlowLog(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); } +void BuildClusterSlotNetworkInfo(ConnectionContext* cntx, std::string_view host, uint32_t port, + std::string_view id) { + constexpr unsigned int kNetworkInfoSize = 3; + + (*cntx)->StartArray(kNetworkInfoSize); + (*cntx)->SendBulkString(host); + (*cntx)->SendLong(port); + (*cntx)->SendBulkString(id); +} + } // namespace std::optional ParseSaveSchedule(string_view time) { @@ -349,6 +363,15 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE); DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size()); } + + string cluster_mode = GetFlag(FLAGS_cluster_mode); + if (cluster_mode.empty()) { + is_emulated_cluster_ = false; + } else if (cluster_mode == "emulated") { + is_emulated_cluster_ = true; + } else { + LOG(FATAL) << "invalid cluster_mode. Exiting..."; + } } ServerFamily::~ServerFamily() { @@ -1144,6 +1167,143 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType); } +void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { + // This command supports 2 sub options: + // 1. HELP + // 2. SLOTS: the slots are a mapping between sharding and hosts in the cluster. + // Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across multiple + // hosts), as a results all shards are map to the same host (i.e. range is between and kEndSlot) + // and number of cluster sharding is thus == 1 (kClustersShardingCount). + // For more details https://redis.io/commands/cluster-slots/ + constexpr unsigned int kEndSlot = 16383; // see redis code (cluster.c CLUSTER_SLOTS). + constexpr unsigned int kStartSlot = 0; + constexpr unsigned int kClustersShardingCount = 1; + constexpr unsigned int kNoReplicaInfoSize = 3; + constexpr unsigned int kWithReplicaInfoSize = 4; + + ToUpper(&args[1]); + string_view sub_cmd = ArgS(args, 1); + + if (!is_emulated_cluster_) { + return (*cntx)->SendError("CLUSTER commands demands the `cluster_mode` flag set to `emulated`"); + } + + if (sub_cmd == "HELP") { + string_view help_arr[] = { + "CLUSTER [ [value] [opt] ...]. Subcommands are:", + "SLOTS", + " Return information about slots range mappings. Each range is made of:", + " start, end, master and replicas IP addresses, ports and ids.", + "NODES", + " Return cluster configuration seen by node. Output format:", + " ...", + "INFO", + +" Return information about the cluster", + "HELP", + " Prints this help.", + }; + return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr)); + } + + if (sub_cmd == "SLOTS") { + /* Format: 1) 1) start slot + * 2) end slot + * 3) 1) master IP + * 2) master port + * 3) node ID + * 4) 1) replica IP (optional) + * 2) replica port + * 3) node ID + * ... note that in this case, only 1 slot + */ + ServerState& etl = *ServerState::tlocal(); + // we have 3 cases here + // 1. This is a stand alone, in this case we only sending local information + // 2. We are the master, and we have replica, in this case send us as master + // 3. We are replica to a master, sends the information about us as replica + (*cntx)->StartArray(kClustersShardingCount); + if (etl.is_master) { + std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip); + std::string preferred_endpoint = + cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip; + auto vec = dfly_cmd_->GetReplicasRoleInfo(); + unsigned int info_len = vec.empty() ? kNoReplicaInfoSize : kWithReplicaInfoSize; + (*cntx)->StartArray(info_len); + (*cntx)->SendLong(kStartSlot); // start sharding range + (*cntx)->SendLong(kEndSlot); // end sharding range + BuildClusterSlotNetworkInfo(cntx, preferred_endpoint, GetFlag(FLAGS_port), master_id()); + if (!vec.empty()) { // info about the replica + const auto& info = vec[0]; + BuildClusterSlotNetworkInfo(cntx, info.address, info.listening_port, etl.remote_client_id_); + } + } else { + unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive! + auto replica_ptr = replica_; + CHECK(replica_ptr); + Replica::Info info = replica_ptr->GetInfo(); + (*cntx)->StartArray(kWithReplicaInfoSize); + (*cntx)->SendLong(kStartSlot); // start sharding range + (*cntx)->SendLong(kEndSlot); // end sharding range + BuildClusterSlotNetworkInfo(cntx, info.host, info.port, replica_ptr->MasterId()); + BuildClusterSlotNetworkInfo(cntx, cntx->owner()->LocalBindAddress(), GetFlag(FLAGS_port), + master_id()); + } + + return; + } else if (sub_cmd == "NODES") { + // Support for NODES commands can help in case we are working in cluster mode + // In this case, we can save information about the cluster + // In case this is the master, it can save the information about the replica from this command + std::string msg = BuildClusterNodeReply(cntx); + (*cntx)->SendBulkString(msg); + return; + } else if (sub_cmd == "INFO") { + std::string msg; + auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) { + absl::StrAppend(&msg, a1, ":", a2, "\r\n"); + }; + // info command just return some stats about this instance + int known_nodes = 1; + long epoch = 1; + ServerState& etl = *ServerState::tlocal(); + if (etl.is_master) { + auto vec = dfly_cmd_->GetReplicasRoleInfo(); + if (!vec.empty()) { + known_nodes = 2; + } + } else { + if (replica_) { + known_nodes = 2; + unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive! + auto replica_ptr = replica_; + CHECK(replica_ptr); + epoch = replica_ptr->GetInfo().master_last_io_sec; + } + } + int cluster_size = known_nodes - 1; + append("cluster_state", "ok"); + append("cluster_slots_assigned", kEndSlot); + append("cluster_slots_ok", kEndSlot); + append("cluster_slots_pfail", 0); + append("cluster_slots_fail", 0); + append("cluster_known_nodes", known_nodes); + append("cluster_size", cluster_size); + append("cluster_current_epoch", epoch); + append("cluster_my_epoch", 1); + append("cluster_stats_messages_ping_sent", 1); + append("cluster_stats_messages_pong_sent", 1); + append("cluster_stats_messages_sent", 1); + append("cluster_stats_messages_ping_received", 1); + append("cluster_stats_messages_pong_received", 1); + append("cluster_stats_messages_meet_received", 0); + append("cluster_stats_messages_received", 1); + (*cntx)->SendBulkString(msg); + return; + } + + return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLUSTER"), kSyntaxErrType); +} + void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[1]); string_view sub_cmd = ArgS(args, 1); @@ -1509,6 +1669,11 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("used_cpu_user_main_thread", StrCat(tu.ru_utime.tv_sec, ".", tu.ru_utime.tv_usec)); } + if (should_enter("CLUSTER")) { + ADD_HEADER("# Cluster"); + append("cluster_enabled", is_emulated_cluster_); + } + (*cntx)->SendBulkString(info); } @@ -1542,6 +1707,42 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave"); } +std::string ServerFamily::BuildClusterNodeReply(ConnectionContext* cntx) const { + ServerState& etl = *ServerState::tlocal(); + auto epoch_master_time = std::time(nullptr) * 1000; + if (etl.is_master) { + std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip); + std::string preferred_endpoint = + cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip; + auto vec = dfly_cmd_->GetReplicasRoleInfo(); + auto my_port = GetFlag(FLAGS_port); + const char* connect_state = vec.empty() ? "disconnected" : "connected"; + std::string msg = absl::StrCat(master_id(), " ", preferred_endpoint, ":", my_port, "@", my_port, + " myself,master - 0 ", epoch_master_time, " 1 ", connect_state, + " 0-16383\r\n"); + if (!vec.empty()) { // info about the replica + const auto& info = vec[0]; + absl::StrAppend(&msg, etl.remote_client_id_, " ", info.address, ":", info.listening_port, "@", + info.listening_port, " slave 0 ", master_id(), " 1 ", connect_state, "\r\n"); + } + return msg; + } else { + unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive! + auto replica_ptr = replica_; + Replica::Info info = replica_ptr->GetInfo(); + auto my_ip = cntx->owner()->LocalBindAddress(); + auto my_port = GetFlag(FLAGS_port); + const char* connect_state = + replica_ptr->GetInfo().master_link_established ? "connected" : "disconnected"; + std::string msg = + absl::StrCat(master_id(), " ", my_ip, ":", my_port, "@", my_port, " myself,slave ", + master_id(), " 0 ", epoch_master_time, " 1 ", connect_state, "\r\n"); + absl::StrAppend(&msg, replica_ptr->MasterId(), " ", info.host, ":", info.port, "@", info.port, + " master - 0 ", epoch_master_time, " 1 ", connect_state, " 0-16383\r\n"); + return msg; + } +} + void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { std::string_view host = ArgS(args, 1); std::string_view port_s = ArgS(args, 2); @@ -1571,7 +1772,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { return; } - auto new_replica = make_shared(string(host), port, &service_); + auto new_replica = make_shared(string(host), port, &service_, master_id()); unique_lock lk(replicaof_mu_); if (replica_) { @@ -1615,7 +1816,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { if (args.size() % 2 == 0) goto err; - for (unsigned i = 1; i < args.size(); i += 2) { DCHECK_LT(i + 1, args.size()); ToUpper(&args[i]); @@ -1649,6 +1849,11 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { return; } cntx->conn_state.replicaiton_info.repl_listening_port = replica_listening_port; + } else if (cmd == "CLIENT-ID" && args.size() == 3) { + std::string client_id{arg}; + auto& pool = service_.proactor_pool(); + pool.AwaitFiberOnAll( + [&](util::ProactorBase* pb) { ServerState::tlocal()->remote_client_id_ = arg; }); } else { VLOG(1) << cmd << " " << arg; } @@ -1759,6 +1964,7 @@ void ServerFamily::Register(CommandRegistry* registry) { *registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth) << CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) << CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client) + << CI{"CLUSTER", CO::READONLY, 2, 1, 1, 1}.HFUNC(Cluster) << CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config) << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize) << CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug) diff --git a/src/server/server_family.h b/src/server/server_family.h index 895c29204..d09519e07 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -130,8 +130,11 @@ class ServerFamily { return shard_set->size(); } + std::string BuildClusterNodeReply(ConnectionContext* cntx) const; + void Auth(CmdArgList args, ConnectionContext* cntx); void Client(CmdArgList args, ConnectionContext* cntx); + void Cluster(CmdArgList args, ConnectionContext* cntx); void Config(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); void Debug(CmdArgList args, ConnectionContext* cntx); @@ -178,6 +181,7 @@ class ServerFamily { std::string master_id_; time_t start_time_ = 0; // in seconds, epoch time. + bool is_emulated_cluster_ = false; std::shared_ptr last_save_info_; // protected by save_mu_; std::atomic_bool is_saving_{false}; diff --git a/src/server/server_state.h b/src/server/server_state.h index bb482540e..f596d114b 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -106,6 +106,7 @@ class ServerState { // public struct - to allow initialization. void Shutdown(); bool is_master = true; + std::string remote_client_id_; // for cluster support facade::ConnectionStats connection_stats; diff --git a/tests/README.md b/tests/README.md index 288478df7..e3db30f6e 100644 --- a/tests/README.md +++ b/tests/README.md @@ -111,19 +111,19 @@ Integration tests for ioredis client. It contains a very extensive test coverage for Redis. Currently not all features are supported by Dragonfly. As such please use the scripts for running the test successfully - **[run_ioredis_on_docker.sh](./integration/run_ioredis_on_docker.sh)**: to run the supported tests on a docker image - Please note that you can run this script in tow forms: + Please note that you can run this script in two forms: If the image is already build: ``` - ./integration/run_ioredis_on_docker + ./integration/run_ioredis_on_docker.sh ``` A more safe way is to build the image (or ensure that it is up to date), and then execute the tests: ``` - ./integration/run_ioredis_on_docker --build + ./integration/run_ioredis_on_docker.sh --build ``` The the "--build" first build the image and then execute the tests. - Please do not try to run out of docker image as this brigs the correct version and patch some tests. + Please do not try to run out of docker image as this brings the correct version and patch some tests. Please note that the script only run tests that are currently supported You can just build the image with diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py new file mode 100644 index 000000000..dca380b88 --- /dev/null +++ b/tests/dragonfly/cluster_test.py @@ -0,0 +1,149 @@ +import pytest +import redis +from . import dfly_args +import aioredis +import asyncio + +BASE_PORT = 30001 + + +@dfly_args({}) +class TestNotEmulated: + def test_cluster_commands_fails_when_not_emulate(self, client: redis.Redis): + with pytest.raises(redis.ResponseError) as respErr: + client.execute_command("CLUSTER HELP") + assert "cluster_mode" in str(respErr.value) + + with pytest.raises(redis.ResponseError) as respErr: + client.execute_command("CLUSTER SLOTS") + assert "emulated" in str(respErr.value) + + +@dfly_args({"cluster_mode": "emulated"}) +class TestEmulated: + def test_cluster_slots_command(self, cluster_client: redis.RedisCluster): + expected = {(0, 16383): {'primary': ( + '127.0.0.1', 6379), 'replicas': []}} + res = cluster_client.execute_command("CLUSTER SLOTS") + assert expected == res + + def test_cluster_help_command(self, cluster_client: redis.RedisCluster): + # `target_nodes` is necessary because CLUSTER HELP is not mapped on redis-py + res = cluster_client.execute_command( + "CLUSTER HELP", target_nodes=redis.RedisCluster.RANDOM) + assert "HELP" in res + assert "SLOTS" in res + + def test_cluster_pipeline(self, cluster_client: redis.RedisCluster): + pipeline = cluster_client.pipeline() + pipeline.set("foo", "bar") + pipeline.get("foo") + val = pipeline.execute() + assert val == [True, "bar"] + + +@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) +class TestEmulatedWithAnnounceIp: + def test_cluster_slots_command(self, cluster_client: redis.RedisCluster): + expected = {(0, 16383): {'primary': ( + '127.0.0.2', 6379), 'replicas': []}} + res = cluster_client.execute_command("CLUSTER SLOTS") + assert expected == res + + +def verify_slots_result(ip: str, port: int, answer: list, rep_ip: str = None, rep_port: int = None) -> bool: + def is_local_host(ip: str) -> bool: + return ip == '127.0.0.1' or ip == 'localhost' + + assert answer[0] == 0 # start shard + assert answer[1] == 16383 # last shard + if rep_ip is not None: + assert len(answer) == 4 # the network info + rep_info = answer[3] + assert len(rep_info) == 3 + ip_addr = str(rep_info[0], 'utf-8') + assert ip_addr == rep_ip or ( + is_local_host(ip_addr) and is_local_host(ip)) + assert rep_info[1] == rep_port + else: + assert len(answer) == 3 + info = answer[2] + assert len(info) == 3 + ip_addr = str(info[0], 'utf-8') + assert ip_addr == ip or (is_local_host(ip_addr) and is_local_host(ip)) + assert info[1] == port + return True + + +@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"}) +@pytest.mark.asyncio +async def test_cluster_slots_in_replicas(df_local_factory): + master = df_local_factory.create(port=BASE_PORT) + replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True) + + df_local_factory.start_all([master, replica]) + + c_master = aioredis.Redis(port=master.port) + c_replica = aioredis.Redis(port=replica.port) + + res = await c_replica.execute_command("CLUSTER SLOTS") + assert len(res) == 1 + assert verify_slots_result( + ip="127.0.0.1", port=replica.port, answer=res[0]) + res = await c_master.execute_command("CLUSTER SLOTS") + assert verify_slots_result( + ip="127.0.0.1", port=master.port, answer=res[0]) + + # Connect replica to master + rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + assert str(rc, 'utf-8') == "OK" + await asyncio.sleep(0.5) + res = await c_replica.execute_command("CLUSTER SLOTS") + assert verify_slots_result( + ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port) + res = await c_master.execute_command("CLUSTER SLOTS") + assert verify_slots_result( + ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port) + + +@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) +@pytest.mark.asyncio +async def test_cluster_info(async_pool): + conn = aioredis.Redis(connection_pool=async_pool) + + res = await conn.execute_command("CLUSTER INFO") + assert len(res) == 16 + assert res == {'cluster_current_epoch': '1', + 'cluster_known_nodes': '1', + 'cluster_my_epoch': '1', + 'cluster_size': '0', + 'cluster_slots_assigned': '16383', + 'cluster_slots_fail': '0', + 'cluster_slots_ok': '16383', + 'cluster_slots_pfail': '0', + 'cluster_state': 'ok', + 'cluster_stats_messages_meet_received': '0', + 'cluster_stats_messages_ping_received': '1', + 'cluster_stats_messages_ping_sent': '1', + 'cluster_stats_messages_pong_received': '1', + 'cluster_stats_messages_pong_sent': '1', + 'cluster_stats_messages_received': '1', + 'cluster_stats_messages_sent': '1' + } + + +@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) +@pytest.mark.asyncio +async def test_cluster_nodes(async_pool): + conn = aioredis.Redis(connection_pool=async_pool) + + res = await conn.execute_command("CLUSTER NODES") + assert len(res) == 1 + info = res['127.0.0.2:6379@6379'] + assert res is not None + assert info['connected'] == False + assert info['epoch'] == '1' + assert info['flags'] == 'myself,master' + assert info['last_ping_sent'] == '0' + assert info['slots'] == [['0', '16383']] + assert info['master_id'] == "-" diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py index 636be36d3..e1777ab00 100644 --- a/tests/dragonfly/conftest.py +++ b/tests/dragonfly/conftest.py @@ -119,6 +119,19 @@ def client(sync_pool): return client +@pytest.fixture(scope="function") +def cluster_client(df_server): + """ + Return a cluster client to the default instance with all entries flushed. + """ + client = redis.RedisCluster(decode_responses=True, host="localhost", + port=df_server.port) + client.flushall() + + yield client + client.disconnect_connection_pools() + + @pytest_asyncio.fixture(scope="function") async def async_pool(df_server: DflyInstance): pool = aioredis.ConnectionPool(host="localhost", port=df_server.port, diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 72f4bdc75..4fbf520a4 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -4,6 +4,7 @@ import asyncio import aioredis import async_timeout + async def run_monitor_eval(monitor, expected): async with monitor as mon: count = 0 @@ -28,16 +29,20 @@ async def run_monitor_eval(monitor, expected): Test issue https://github.com/dragonflydb/dragonfly/issues/756 Monitor command do not return when we have lua script issue ''' + + @pytest.mark.asyncio async def test_monitor_command_lua(async_pool): - expected = ["EVAL return redis", "GET bar", "EVAL return redis", "SET foo2"] + expected = ["EVAL return redis", "GET bar", + "EVAL return redis", "SET foo2"] conn = aioredis.Redis(connection_pool=async_pool) monitor = conn.monitor() cmd1 = aioredis.Redis(connection_pool=async_pool) - future = asyncio.create_task(run_monitor_eval(monitor=monitor, expected=expected)) - await asyncio.sleep(1) + future = asyncio.create_task(run_monitor_eval( + monitor=monitor, expected=expected)) + await asyncio.sleep(0.1) try: res = await cmd1.eval(r'return redis.call("GET", "bar")', 0) assert False # this will return an error @@ -60,6 +65,8 @@ Open connection which is used for monitoring Then send on other connection commands to dragonfly instance Make sure that we are getting the commands in the monitor context ''' + + @pytest.mark.asyncio async def test_monitor_command(async_pool): def generate(max): @@ -87,7 +94,8 @@ async def process_cmd(monitor, key, value): if "select" not in response["command"].lower(): success = verify_response(response, key, value) if not success: - print(f"failed to verify message {response} for {key}/{value}") + print( + f"failed to verify message {response} for {key}/{value}") return False, f"failed on the verification of the message {response} at {key}: {value}" else: return True, None diff --git a/tests/integration/.run_ioredis_valid_test.sh b/tests/integration/.run_ioredis_valid_test.sh index 6c4bc77a1..3d8f490ad 100755 --- a/tests/integration/.run_ioredis_valid_test.sh +++ b/tests/integration/.run_ioredis_valid_test.sh @@ -28,5 +28,5 @@ TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \ "test/helpers/*.ts" "test/unit/**/*.ts" "test/functional/**/*.ts" \ --g "should reload scripts on redis restart|should reconnect if reconnectOnError|should be supported in transaction blocks|rejects when monitor is disabled|should resend unfulfilled commands to the correct|should set the name before any subscribe|should name the connection if options|scanStream|does not fallback to EVAL|should try to use EVALSHA and fallback to EVAL|should use evalsha when script|should affect the old way|should support Map|should support object|should batch all commands before ready event|should support key prefixing for sort|should be sent on the connect event" \ +-g "should reload scripts on redis restart|should reconnect if reconnectOnError|should be supported in transaction blocks|rejects when monitor is disabled|should resend unfulfilled commands to the correct|should set the name before any subscribe|should name the connection if options|scanStream|does not fallback to EVAL|should try to use EVALSHA and fallback to EVAL|should use evalsha when script|should affect the old way|should support Map|should support object|should batch all commands before ready event|should support key prefixing for sort|should be sent on the connect event|spub|ssub|should support parallel script execution|works for moved" \ --invert diff --git a/tests/integration/run_ioredis_on_docker.sh b/tests/integration/run_ioredis_on_docker.sh index fcaa6276b..dd237d447 100755 --- a/tests/integration/run_ioredis_on_docker.sh +++ b/tests/integration/run_ioredis_on_docker.sh @@ -8,7 +8,7 @@ if [ "$1" = "--build" ]; then fi # run the tests -echo "runniing ioredis tess" +echo "running ioredis tests" docker run --rm -i --network=host ioredis-test ./run_tests.sh if [ $? -ne 0 ];then echo "some tests failed - please look at the output from this run"