feat: support cluster mode emulation (#492)

Signed-off-by: Leonardo Mello <lsvmello@gmail.com>
This commit is contained in:
Leonardo Mello 2023-03-01 03:43:40 -03:00 committed by GitHub
parent 60d22eba47
commit abe3b3cb91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 430 additions and 20 deletions

View File

@ -21,7 +21,7 @@ cd build-dbg && ninja dragonfly
```sh
cd dragonfly # project root
# Make sure you have 'pre-commit' e 'clang-format' installed
# Make sure you have 'pre-commit' and 'clang-format' installed
pip install pre-commit clang-format
# IMPORTANT! Enable our pre-commit message hooks

View File

@ -111,6 +111,8 @@ In addition, it has Dragonfly specific arguments options:
* `primary_port_http_enabled` - If true allows accessing http console on main TCP port, default: true
* `admin_port` - If set, would enable admin access to console on the assigned port. This supports both HTTP and RESP protocols. default disabled
* `admin_bind` - If set, the admin consol TCP connection would be bind the given address. This supports both HTTP and RESP protocols. default any
* `cluster_mode` - cluster mode supported. Currently supports only `emulated`. default: ""
* `cluster_announce_ip` - ip that cluster commands announce to the client.
### Example Start Script, with popular options:

View File

@ -210,6 +210,9 @@ with respect to Memcached and Redis APIs.
- [ ] CLIENT REPLY
- [X] REPLCONF
- [ ] WAIT
- [X] CLUSTER SLOTS
- [X] CLUSTER NODES
- [X] CLUSTER INFO
### API 4
- [X] Generic Family
@ -258,6 +261,10 @@ with respect to Memcached and Redis APIs.
- [X] Sorted Set Family
- [X] ZUNION
### API 7
- [ ] Server Family
- [ ] CLUSTER SHARDS
## Notes
Some commands were implemented as decorators along the way:

View File

@ -458,6 +458,12 @@ void Connection::SendMsgVecAsync(const PubMessage& pub_msg) {
}
}
std::string Connection::LocalBindAddress() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
auto le = lsb->LocalEndpoint();
return le.address().to_string();
}
string Connection::GetClientInfo() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());

View File

@ -81,6 +81,7 @@ class Connection : public util::Connection {
std::string GetClientInfo() const;
std::string RemoteEndpointStr() const;
std::string RemoteEndpointAddress() const;
std::string LocalBindAddress() const;
uint32 GetClientId() const;
void ShutdownSelf();

View File

@ -573,7 +573,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
bool under_script = dfly_cntx->conn_state.script_info.has_value();
if (VLOG_IS_ON(2)) {
if (VLOG_IS_ON(2) &&
cntx->owner()) { // owner may not exists in case of this being called from replica context
const char* lua = under_script ? "LUA " : "";
LOG(INFO) << "Got (" << cntx->owner()->GetClientId() << "): " << lua << args;
}

View File

@ -101,7 +101,8 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
} // namespace
Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) {
Replica::Replica(string host, uint16_t port, Service* se, std::string_view id)
: service_(*se), id_{id} {
master_context_.host = std::move(host);
master_context_.port = port;
}
@ -287,7 +288,7 @@ error_code Replica::Greet() {
base::IoBuf io_buf{128};
ReqSerializer serializer{sock_.get()};
uint32_t consumed = 0;
VLOG(1) << "greeting message handling";
// Corresponds to server.repl_state == REPL_STATE_CONNECTING state in redis
RETURN_ON_ERR(SendCommand("PING", &serializer)); // optional.
RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
@ -334,7 +335,7 @@ error_code Replica::Greet() {
return make_error_code(errc::bad_message);
}
} else if (resp_args_.size() == 3) { // it's dragonfly master.
// Reponse is: <master_repl_id, syncid, num_shards>
// Response is: <master_repl_id, syncid, num_shards>
if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING, RespExpr::INT64}) ||
resp_args_[0].GetBuf().size() != CONFIG_RUN_ID_SIZE) {
LOG(ERROR) << "Unexpected response " << ToSV(io_buf.InputBuffer());
@ -356,7 +357,14 @@ error_code Replica::Greet() {
master_context_.master_repl_id = param0;
master_context_.dfly_session_id = param1;
num_df_flows_ = param2;
// We need to send this because we may require to use this for cluster commands.
// this reason to send this here is that in other context we can get an error reply
// since we are budy with the replication
RETURN_ON_ERR(SendCommand(StrCat("REPLCONF CLIENT-ID ", id_), &serializer));
RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
if (!CheckRespIsSimpleReply("OK")) {
LOG(WARNING) << "master did not return OK on id message";
}
VLOG(1) << "Master id: " << param0 << ", sync id: " << param1 << ", num journals "
<< num_df_flows_;
} else {
@ -367,7 +375,6 @@ error_code Replica::Greet() {
io_buf.ConsumeInput(consumed);
state_mask_ |= R_GREETED;
return error_code{};
}

View File

@ -98,7 +98,7 @@ class Replica {
};
public:
Replica(std::string master_host, uint16_t port, Service* se);
Replica(std::string master_host, uint16_t port, Service* se, std::string_view id);
~Replica();
// Spawns a fiber that runs until link with master is broken or the replication is stopped.
@ -110,6 +110,10 @@ class Replica {
void Pause(bool pause);
std::string_view MasterId() const {
return master_context_.master_repl_id;
}
private: /* Main standalone mode functions */
// Coordinate state transitions. Spawned by start.
void MainReplicationFb();
@ -253,6 +257,7 @@ class Replica {
unsigned num_df_flows_ = 0;
bool is_paused_ = false;
std::string id_;
};
} // namespace dfly

View File

@ -58,6 +58,10 @@ ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(bool, df_snapshot_format, true,
"if true, save in dragonfly-specific snapshotting format");
ABSL_FLAG(string, cluster_mode, "",
"Cluster mode supported. Currently supports only `emulated`. "
"default: \"\"");
ABSL_FLAG(string, cluster_announce_ip, "", "ip that cluster commands announce to the client");
ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
@ -286,6 +290,16 @@ void SlowLog(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}
void BuildClusterSlotNetworkInfo(ConnectionContext* cntx, std::string_view host, uint32_t port,
std::string_view id) {
constexpr unsigned int kNetworkInfoSize = 3;
(*cntx)->StartArray(kNetworkInfoSize);
(*cntx)->SendBulkString(host);
(*cntx)->SendLong(port);
(*cntx)->SendBulkString(id);
}
} // namespace
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
@ -349,6 +363,15 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE);
DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size());
}
string cluster_mode = GetFlag(FLAGS_cluster_mode);
if (cluster_mode.empty()) {
is_emulated_cluster_ = false;
} else if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else {
LOG(FATAL) << "invalid cluster_mode. Exiting...";
}
}
ServerFamily::~ServerFamily() {
@ -1144,6 +1167,143 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
}
void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
// This command supports 2 sub options:
// 1. HELP
// 2. SLOTS: the slots are a mapping between sharding and hosts in the cluster.
// Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across multiple
// hosts), as a results all shards are map to the same host (i.e. range is between and kEndSlot)
// and number of cluster sharding is thus == 1 (kClustersShardingCount).
// For more details https://redis.io/commands/cluster-slots/
constexpr unsigned int kEndSlot = 16383; // see redis code (cluster.c CLUSTER_SLOTS).
constexpr unsigned int kStartSlot = 0;
constexpr unsigned int kClustersShardingCount = 1;
constexpr unsigned int kNoReplicaInfoSize = 3;
constexpr unsigned int kWithReplicaInfoSize = 4;
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (!is_emulated_cluster_) {
return (*cntx)->SendError("CLUSTER commands demands the `cluster_mode` flag set to `emulated`");
}
if (sub_cmd == "HELP") {
string_view help_arr[] = {
"CLUSTER <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"INFO",
+" Return information about the cluster",
"HELP",
" Prints this help.",
};
return (*cntx)->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr));
}
if (sub_cmd == "SLOTS") {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP (optional)
* 2) replica port
* 3) node ID
* ... note that in this case, only 1 slot
*/
ServerState& etl = *ServerState::tlocal();
// we have 3 cases here
// 1. This is a stand alone, in this case we only sending local information
// 2. We are the master, and we have replica, in this case send us as master
// 3. We are replica to a master, sends the information about us as replica
(*cntx)->StartArray(kClustersShardingCount);
if (etl.is_master) {
std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = dfly_cmd_->GetReplicasRoleInfo();
unsigned int info_len = vec.empty() ? kNoReplicaInfoSize : kWithReplicaInfoSize;
(*cntx)->StartArray(info_len);
(*cntx)->SendLong(kStartSlot); // start sharding range
(*cntx)->SendLong(kEndSlot); // end sharding range
BuildClusterSlotNetworkInfo(cntx, preferred_endpoint, GetFlag(FLAGS_port), master_id());
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
BuildClusterSlotNetworkInfo(cntx, info.address, info.listening_port, etl.remote_client_id_);
}
} else {
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
CHECK(replica_ptr);
Replica::Info info = replica_ptr->GetInfo();
(*cntx)->StartArray(kWithReplicaInfoSize);
(*cntx)->SendLong(kStartSlot); // start sharding range
(*cntx)->SendLong(kEndSlot); // end sharding range
BuildClusterSlotNetworkInfo(cntx, info.host, info.port, replica_ptr->MasterId());
BuildClusterSlotNetworkInfo(cntx, cntx->owner()->LocalBindAddress(), GetFlag(FLAGS_port),
master_id());
}
return;
} else if (sub_cmd == "NODES") {
// Support for NODES commands can help in case we are working in cluster mode
// In this case, we can save information about the cluster
// In case this is the master, it can save the information about the replica from this command
std::string msg = BuildClusterNodeReply(cntx);
(*cntx)->SendBulkString(msg);
return;
} else if (sub_cmd == "INFO") {
std::string msg;
auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) {
absl::StrAppend(&msg, a1, ":", a2, "\r\n");
};
// info command just return some stats about this instance
int known_nodes = 1;
long epoch = 1;
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
auto vec = dfly_cmd_->GetReplicasRoleInfo();
if (!vec.empty()) {
known_nodes = 2;
}
} else {
if (replica_) {
known_nodes = 2;
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
CHECK(replica_ptr);
epoch = replica_ptr->GetInfo().master_last_io_sec;
}
}
int cluster_size = known_nodes - 1;
append("cluster_state", "ok");
append("cluster_slots_assigned", kEndSlot);
append("cluster_slots_ok", kEndSlot);
append("cluster_slots_pfail", 0);
append("cluster_slots_fail", 0);
append("cluster_known_nodes", known_nodes);
append("cluster_size", cluster_size);
append("cluster_current_epoch", epoch);
append("cluster_my_epoch", 1);
append("cluster_stats_messages_ping_sent", 1);
append("cluster_stats_messages_pong_sent", 1);
append("cluster_stats_messages_sent", 1);
append("cluster_stats_messages_ping_received", 1);
append("cluster_stats_messages_pong_received", 1);
append("cluster_stats_messages_meet_received", 0);
append("cluster_stats_messages_received", 1);
(*cntx)->SendBulkString(msg);
return;
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLUSTER"), kSyntaxErrType);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
@ -1509,6 +1669,11 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("used_cpu_user_main_thread", StrCat(tu.ru_utime.tv_sec, ".", tu.ru_utime.tv_usec));
}
if (should_enter("CLUSTER")) {
ADD_HEADER("# Cluster");
append("cluster_enabled", is_emulated_cluster_);
}
(*cntx)->SendBulkString(info);
}
@ -1542,6 +1707,42 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
std::string ServerFamily::BuildClusterNodeReply(ConnectionContext* cntx) const {
ServerState& etl = *ServerState::tlocal();
auto epoch_master_time = std::time(nullptr) * 1000;
if (etl.is_master) {
std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = dfly_cmd_->GetReplicasRoleInfo();
auto my_port = GetFlag(FLAGS_port);
const char* connect_state = vec.empty() ? "disconnected" : "connected";
std::string msg = absl::StrCat(master_id(), " ", preferred_endpoint, ":", my_port, "@", my_port,
" myself,master - 0 ", epoch_master_time, " 1 ", connect_state,
" 0-16383\r\n");
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
absl::StrAppend(&msg, etl.remote_client_id_, " ", info.address, ":", info.listening_port, "@",
info.listening_port, " slave 0 ", master_id(), " 1 ", connect_state, "\r\n");
}
return msg;
} else {
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
Replica::Info info = replica_ptr->GetInfo();
auto my_ip = cntx->owner()->LocalBindAddress();
auto my_port = GetFlag(FLAGS_port);
const char* connect_state =
replica_ptr->GetInfo().master_link_established ? "connected" : "disconnected";
std::string msg =
absl::StrCat(master_id(), " ", my_ip, ":", my_port, "@", my_port, " myself,slave ",
master_id(), " 0 ", epoch_master_time, " 1 ", connect_state, "\r\n");
absl::StrAppend(&msg, replica_ptr->MasterId(), " ", info.host, ":", info.port, "@", info.port,
" master - 0 ", epoch_master_time, " 1 ", connect_state, " 0-16383\r\n");
return msg;
}
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 1);
std::string_view port_s = ArgS(args, 2);
@ -1571,7 +1772,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}
auto new_replica = make_shared<Replica>(string(host), port, &service_);
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
unique_lock lk(replicaof_mu_);
if (replica_) {
@ -1615,7 +1816,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
if (args.size() % 2 == 0)
goto err;
for (unsigned i = 1; i < args.size(); i += 2) {
DCHECK_LT(i + 1, args.size());
ToUpper(&args[i]);
@ -1649,6 +1849,11 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
return;
}
cntx->conn_state.replicaiton_info.repl_listening_port = replica_listening_port;
} else if (cmd == "CLIENT-ID" && args.size() == 3) {
std::string client_id{arg};
auto& pool = service_.proactor_pool();
pool.AwaitFiberOnAll(
[&](util::ProactorBase* pb) { ServerState::tlocal()->remote_client_id_ = arg; });
} else {
VLOG(1) << cmd << " " << arg;
}
@ -1759,6 +1964,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
*registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth)
<< CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client)
<< CI{"CLUSTER", CO::READONLY, 2, 1, 1, 1}.HFUNC(Cluster)
<< CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)

View File

@ -130,8 +130,11 @@ class ServerFamily {
return shard_set->size();
}
std::string BuildClusterNodeReply(ConnectionContext* cntx) const;
void Auth(CmdArgList args, ConnectionContext* cntx);
void Client(CmdArgList args, ConnectionContext* cntx);
void Cluster(CmdArgList args, ConnectionContext* cntx);
void Config(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void Debug(CmdArgList args, ConnectionContext* cntx);
@ -178,6 +181,7 @@ class ServerFamily {
std::string master_id_;
time_t start_time_ = 0; // in seconds, epoch time.
bool is_emulated_cluster_ = false;
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_;
std::atomic_bool is_saving_{false};

View File

@ -106,6 +106,7 @@ class ServerState { // public struct - to allow initialization.
void Shutdown();
bool is_master = true;
std::string remote_client_id_; // for cluster support
facade::ConnectionStats connection_stats;

View File

@ -111,19 +111,19 @@ Integration tests for ioredis client.
It contains a very extensive test coverage for Redis. Currently not all features are supported by Dragonfly.
As such please use the scripts for running the test successfully -
**[run_ioredis_on_docker.sh](./integration/run_ioredis_on_docker.sh)**: to run the supported tests on a docker image
Please note that you can run this script in tow forms:
Please note that you can run this script in two forms:
If the image is already build:
```
./integration/run_ioredis_on_docker
./integration/run_ioredis_on_docker.sh
```
A more safe way is to build the image (or ensure that it is up to date), and then execute the tests:
```
./integration/run_ioredis_on_docker --build
./integration/run_ioredis_on_docker.sh --build
```
The the "--build" first build the image and then execute the tests.
Please do not try to run out of docker image as this brigs the correct version and patch some tests.
Please do not try to run out of docker image as this brings the correct version and patch some tests.
Please note that the script only run tests that are currently supported
You can just build the image with

View File

@ -0,0 +1,149 @@
import pytest
import redis
from . import dfly_args
import aioredis
import asyncio
BASE_PORT = 30001
@dfly_args({})
class TestNotEmulated:
def test_cluster_commands_fails_when_not_emulate(self, client: redis.Redis):
with pytest.raises(redis.ResponseError) as respErr:
client.execute_command("CLUSTER HELP")
assert "cluster_mode" in str(respErr.value)
with pytest.raises(redis.ResponseError) as respErr:
client.execute_command("CLUSTER SLOTS")
assert "emulated" in str(respErr.value)
@dfly_args({"cluster_mode": "emulated"})
class TestEmulated:
def test_cluster_slots_command(self, cluster_client: redis.RedisCluster):
expected = {(0, 16383): {'primary': (
'127.0.0.1', 6379), 'replicas': []}}
res = cluster_client.execute_command("CLUSTER SLOTS")
assert expected == res
def test_cluster_help_command(self, cluster_client: redis.RedisCluster):
# `target_nodes` is necessary because CLUSTER HELP is not mapped on redis-py
res = cluster_client.execute_command(
"CLUSTER HELP", target_nodes=redis.RedisCluster.RANDOM)
assert "HELP" in res
assert "SLOTS" in res
def test_cluster_pipeline(self, cluster_client: redis.RedisCluster):
pipeline = cluster_client.pipeline()
pipeline.set("foo", "bar")
pipeline.get("foo")
val = pipeline.execute()
assert val == [True, "bar"]
@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"})
class TestEmulatedWithAnnounceIp:
def test_cluster_slots_command(self, cluster_client: redis.RedisCluster):
expected = {(0, 16383): {'primary': (
'127.0.0.2', 6379), 'replicas': []}}
res = cluster_client.execute_command("CLUSTER SLOTS")
assert expected == res
def verify_slots_result(ip: str, port: int, answer: list, rep_ip: str = None, rep_port: int = None) -> bool:
def is_local_host(ip: str) -> bool:
return ip == '127.0.0.1' or ip == 'localhost'
assert answer[0] == 0 # start shard
assert answer[1] == 16383 # last shard
if rep_ip is not None:
assert len(answer) == 4 # the network info
rep_info = answer[3]
assert len(rep_info) == 3
ip_addr = str(rep_info[0], 'utf-8')
assert ip_addr == rep_ip or (
is_local_host(ip_addr) and is_local_host(ip))
assert rep_info[1] == rep_port
else:
assert len(answer) == 3
info = answer[2]
assert len(info) == 3
ip_addr = str(info[0], 'utf-8')
assert ip_addr == ip or (is_local_host(ip_addr) and is_local_host(ip))
assert info[1] == port
return True
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
@pytest.mark.asyncio
async def test_cluster_slots_in_replicas(df_local_factory):
master = df_local_factory.create(port=BASE_PORT)
replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True)
df_local_factory.start_all([master, replica])
c_master = aioredis.Redis(port=master.port)
c_replica = aioredis.Redis(port=replica.port)
res = await c_replica.execute_command("CLUSTER SLOTS")
assert len(res) == 1
assert verify_slots_result(
ip="127.0.0.1", port=replica.port, answer=res[0])
res = await c_master.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
ip="127.0.0.1", port=master.port, answer=res[0])
# Connect replica to master
rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert str(rc, 'utf-8') == "OK"
await asyncio.sleep(0.5)
res = await c_replica.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port)
res = await c_master.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port)
@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"})
@pytest.mark.asyncio
async def test_cluster_info(async_pool):
conn = aioredis.Redis(connection_pool=async_pool)
res = await conn.execute_command("CLUSTER INFO")
assert len(res) == 16
assert res == {'cluster_current_epoch': '1',
'cluster_known_nodes': '1',
'cluster_my_epoch': '1',
'cluster_size': '0',
'cluster_slots_assigned': '16383',
'cluster_slots_fail': '0',
'cluster_slots_ok': '16383',
'cluster_slots_pfail': '0',
'cluster_state': 'ok',
'cluster_stats_messages_meet_received': '0',
'cluster_stats_messages_ping_received': '1',
'cluster_stats_messages_ping_sent': '1',
'cluster_stats_messages_pong_received': '1',
'cluster_stats_messages_pong_sent': '1',
'cluster_stats_messages_received': '1',
'cluster_stats_messages_sent': '1'
}
@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"})
@pytest.mark.asyncio
async def test_cluster_nodes(async_pool):
conn = aioredis.Redis(connection_pool=async_pool)
res = await conn.execute_command("CLUSTER NODES")
assert len(res) == 1
info = res['127.0.0.2:6379@6379']
assert res is not None
assert info['connected'] == False
assert info['epoch'] == '1'
assert info['flags'] == 'myself,master'
assert info['last_ping_sent'] == '0'
assert info['slots'] == [['0', '16383']]
assert info['master_id'] == "-"

View File

@ -119,6 +119,19 @@ def client(sync_pool):
return client
@pytest.fixture(scope="function")
def cluster_client(df_server):
"""
Return a cluster client to the default instance with all entries flushed.
"""
client = redis.RedisCluster(decode_responses=True, host="localhost",
port=df_server.port)
client.flushall()
yield client
client.disconnect_connection_pools()
@pytest_asyncio.fixture(scope="function")
async def async_pool(df_server: DflyInstance):
pool = aioredis.ConnectionPool(host="localhost", port=df_server.port,

View File

@ -4,6 +4,7 @@ import asyncio
import aioredis
import async_timeout
async def run_monitor_eval(monitor, expected):
async with monitor as mon:
count = 0
@ -28,16 +29,20 @@ async def run_monitor_eval(monitor, expected):
Test issue https://github.com/dragonflydb/dragonfly/issues/756
Monitor command do not return when we have lua script issue
'''
@pytest.mark.asyncio
async def test_monitor_command_lua(async_pool):
expected = ["EVAL return redis", "GET bar", "EVAL return redis", "SET foo2"]
expected = ["EVAL return redis", "GET bar",
"EVAL return redis", "SET foo2"]
conn = aioredis.Redis(connection_pool=async_pool)
monitor = conn.monitor()
cmd1 = aioredis.Redis(connection_pool=async_pool)
future = asyncio.create_task(run_monitor_eval(monitor=monitor, expected=expected))
await asyncio.sleep(1)
future = asyncio.create_task(run_monitor_eval(
monitor=monitor, expected=expected))
await asyncio.sleep(0.1)
try:
res = await cmd1.eval(r'return redis.call("GET", "bar")', 0)
assert False # this will return an error
@ -60,6 +65,8 @@ Open connection which is used for monitoring
Then send on other connection commands to dragonfly instance
Make sure that we are getting the commands in the monitor context
'''
@pytest.mark.asyncio
async def test_monitor_command(async_pool):
def generate(max):
@ -87,7 +94,8 @@ async def process_cmd(monitor, key, value):
if "select" not in response["command"].lower():
success = verify_response(response, key, value)
if not success:
print(f"failed to verify message {response} for {key}/{value}")
print(
f"failed to verify message {response} for {key}/{value}")
return False, f"failed on the verification of the message {response} at {key}: {value}"
else:
return True, None

View File

@ -28,5 +28,5 @@
TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \
"test/helpers/*.ts" "test/unit/**/*.ts" "test/functional/**/*.ts" \
-g "should reload scripts on redis restart|should reconnect if reconnectOnError|should be supported in transaction blocks|rejects when monitor is disabled|should resend unfulfilled commands to the correct|should set the name before any subscribe|should name the connection if options|scanStream|does not fallback to EVAL|should try to use EVALSHA and fallback to EVAL|should use evalsha when script|should affect the old way|should support Map|should support object|should batch all commands before ready event|should support key prefixing for sort|should be sent on the connect event" \
-g "should reload scripts on redis restart|should reconnect if reconnectOnError|should be supported in transaction blocks|rejects when monitor is disabled|should resend unfulfilled commands to the correct|should set the name before any subscribe|should name the connection if options|scanStream|does not fallback to EVAL|should try to use EVALSHA and fallback to EVAL|should use evalsha when script|should affect the old way|should support Map|should support object|should batch all commands before ready event|should support key prefixing for sort|should be sent on the connect event|spub|ssub|should support parallel script execution|works for moved" \
--invert

View File

@ -8,7 +8,7 @@ if [ "$1" = "--build" ]; then
fi
# run the tests
echo "runniing ioredis tess"
echo "running ioredis tests"
docker run --rm -i --network=host ioredis-test ./run_tests.sh
if [ $? -ne 0 ];then
echo "some tests failed - please look at the output from this run"