Mirror of https://github.com/dragonflydb/dragonfly (synced 2024-11-22 15:44:13 +00:00)
fix: MONITOR now works for multi transactions (#1675)
* fix: fix monitoring for multi transactions

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent d8ab016792
commit 71fa2f275e
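As a client-side sketch of the behavior this commit targets (hypothetical host, port, key names, and read count; redis-py asyncio is used, matching the test suite changed below): with a MONITOR client attached, commands executed inside a MULTI/EXEC pipeline, which Dragonfly may squash, should now show up in the monitor stream.

import asyncio
from redis import asyncio as aioredis


async def main():
    client = aioredis.Redis(host="localhost", port=6379)
    async with client.monitor() as mon:
        pipe = client.pipeline(transaction=True)
        pipe.set("k1", "v1")
        pipe.set("k2", "v2")
        await pipe.execute()
        # With this fix, the squashed SET commands are reported to MONITOR
        # clients as well; extra entries (e.g. SELECT) may precede them.
        for _ in range(4):
            print(await mon.next_command())


asyncio.run(main())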
@@ -28,6 +28,10 @@ class ConnectionContext {
     return owner_;
   }
 
+  const Connection* owner() const {
+    return owner_;
+  }
+
   Protocol protocol() const {
     return protocol_;
   }
 
@@ -110,13 +110,16 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
   if (owner) {
     protocol_ = owner->protocol();
   }
-  switch (protocol_) {
-    case Protocol::REDIS:
-      rbuilder_.reset(new RedisReplyBuilder(stream));
-      break;
-    case Protocol::MEMCACHE:
-      rbuilder_.reset(new MCReplyBuilder(stream));
-      break;
+
+  if (stream) {
+    switch (protocol_) {
+      case Protocol::REDIS:
+        rbuilder_.reset(new RedisReplyBuilder(stream));
+        break;
+      case Protocol::MEMCACHE:
+        rbuilder_.reset(new MCReplyBuilder(stream));
+        break;
+    }
   }
 
   conn_closing = false;
 
@@ -76,6 +76,18 @@ const CommandId* StoredCmd::Cid() const {
   return cid_;
 }
 
+ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction* tx,
+                                     facade::CapturingReplyBuilder* crb)
+    : facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
+  if (tx) {  // If we have a carrier transaction, this context is used for squashing
+    DCHECK(owner);
+    conn_state.db_index = owner->conn_state.db_index;
+    conn_state.squashing_info = {owner};
+  }
+  auto* prev_reply_builder = Inject(crb);
+  CHECK_EQ(prev_reply_builder, nullptr);
+}
+
 void ConnectionContext::ChangeMonitor(bool start) {
   // This will either remove or register a new connection
   // at the "top level" thread --> ServerState context
@@ -123,21 +123,31 @@ struct ConnectionState {
     DflyVersion repl_version = DflyVersion::VER0;
   };
 
+  struct SquashingInfo {
+    // Pointer to the original underlying context of the base command.
+    // Only const access is possible for reading from multiple threads,
+    // each squashing thread has its own proxy context that contains this info.
+    const ConnectionContext* owner = nullptr;
+  };
+
   enum MCGetMask {
     FETCH_CAS_VER = 1,
   };
 
  public:
   DbIndex db_index = 0;
 
   // used for memcache set/get commands.
   // For set op - it's the flag value we are storing along with the value.
   // For get op - we use it as a mask of MCGetMask values.
   uint32_t memcache_flag = 0;
 
   bool is_blocking = false;  // whether this connection is blocking on a command
 
   ExecInfo exec_info;
   ReplicationInfo replication_info;
 
+  std::optional<SquashingInfo> squashing_info;
   std::unique_ptr<ScriptInfo> script_info;
   std::unique_ptr<SubscribeInfo> subscribe_info;
 };
@@ -148,10 +158,8 @@ class ConnectionContext : public facade::ConnectionContext {
       : facade::ConnectionContext(stream, owner) {
   }
 
-  ConnectionContext(Transaction* tx, facade::CapturingReplyBuilder* crb)
-      : facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
-    delete Inject(crb);  // deletes the previous reply builder.
-  }
+  ConnectionContext(const ConnectionContext* owner, Transaction* tx,
+                    facade::CapturingReplyBuilder* crb);
 
   struct DebugInfo {
     uint32_t shards_count = 0;
 
@@ -38,7 +38,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
 }  // namespace
 
 JournalExecutor::JournalExecutor(Service* service)
-    : service_{service}, reply_builder_{facade::ReplyMode::NONE}, conn_context_{nullptr,
+    : service_{service}, reply_builder_{facade::ReplyMode::NONE}, conn_context_{nullptr, nullptr,
                                                                                 &reply_builder_} {
   conn_context_.is_replicating = true;
   conn_context_.journal_emulated = true;
@@ -193,27 +193,31 @@ auto CmdEntryToMonitorFormat(std::string_view str) -> std::string {
   return result;
 }
 
-std::string MakeMonitorMessage(const ConnectionState& conn_state,
-                               const facade::Connection* connection, CmdArgList args) {
-  std::string message = absl::StrCat(CreateMonitorTimestamp(), " [", conn_state.db_index);
+std::string MakeMonitorMessage(const ConnectionContext* cntx, const CommandId* cid,
+                               CmdArgList tail_args) {
+  std::string message = absl::StrCat(CreateMonitorTimestamp(), " [", cntx->conn_state.db_index);
+
-  if (conn_state.script_info) {
-    absl::StrAppend(&message, " lua] ");
+  if (cntx->conn_state.squashing_info)
+    cntx = cntx->conn_state.squashing_info->owner;
+
+  string endpoint;
+  if (cntx->conn_state.script_info) {
+    endpoint = "lua";
+  } else if (const auto* conn = cntx->owner(); conn != nullptr) {
+    endpoint = conn->RemoteEndpointStr();
   } else {
-    auto endpoint = connection == nullptr ? "REPLICATION:0" : connection->RemoteEndpointStr();
-    absl::StrAppend(&message, " ", endpoint, "] ");
-  }
-  if (args.empty()) {
-    absl::StrAppend(&message, "error - empty cmd list!");
-  } else if (auto cmd_name = std::string_view(args[0].data(), args[0].size());
-             cmd_name == "AUTH") {  // we cannot just send auth details in this case
-    absl::StrAppend(&message, "\"", cmd_name, "\"");
-  } else {
-    message = std::accumulate(args.begin(), args.end(), message, [](auto str, const auto& cmd) {
-      absl::StrAppend(&str, " ", CmdEntryToMonitorFormat(std::string_view(cmd.data(), cmd.size())));
-      return str;
-    });
+    endpoint = "REPLICATION:0";
   }
+  absl::StrAppend(&message, " ", endpoint, "] ");
+
+  absl::StrAppend(&message, "\"", cid->name(), "\"");
+
+  if (cid->name() == "AUTH")
+    return message;
+
+  for (auto arg : tail_args)
+    absl::StrAppend(&message, " ", CmdEntryToMonitorFormat(facade::ToSV(arg)));
 
   return message;
 }
 
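For illustration, a message assembled by the new MakeMonitorMessage has roughly the shape below (timestamp, db index, and client endpoint are made-up values; per the code above, scripted commands report the endpoint "lua" and replication reports "REPLICATION:0"):

1690891044.123456 [1 127.0.0.1:54321] "SET" "foo" "bar"
1690891044.123789 [0 lua] "GET" "A"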
@@ -231,9 +235,9 @@ void SendMonitor(const std::string& msg) {
   }
 }
 
-void DispatchMonitor(ConnectionContext* cntx, CmdArgList args) {
+void DispatchMonitor(ConnectionContext* cntx, const CommandId* cid, CmdArgList tail_args) {
   // We have connections waiting to get the info on the last command, send it to them
-  string monitor_msg = MakeMonitorMessage(cntx->conn_state, cntx->owner(), args);
+  string monitor_msg = MakeMonitorMessage(cntx, cid, tail_args);
 
   VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it";
 
@@ -909,14 +913,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
     return (*cntx)->SendSimpleString("QUEUED");
   }
 
-  // We are not sending any admin command in the monitor, and we do not want to
-  // do any processing if we don't have any waiting connections with monitor
-  // enabled on them - see https://redis.io/commands/monitor/
-  const MonitorsRepo& monitors = etl.Monitors();
-  if (!monitors.Empty() && (cid->opt_mask() & CO::ADMIN) == 0) {
-    DispatchMonitor(dfly_cntx, args);
-  }
-
   uint64_t start_ns = absl::GetCurrentTimeNanos();
 
   if (cid->opt_mask() & CO::DENYOOM) {
@@ -990,6 +986,15 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
     return true;  // return false only for internal error aborts
   }
 
+  // We are not sending any admin command in the monitor, and we do not want to
+  // do any processing if we don't have any waiting connections with monitor
+  // enabled on them - see https://redis.io/commands/monitor/
+  ServerState& etl = *ServerState::tlocal();
+  const MonitorsRepo& monitors = etl.Monitors();
+  if (!monitors.Empty() && (cid->opt_mask() & CO::ADMIN) == 0) {
+    DispatchMonitor(cntx, cid, tail_args);
+  }
+
   try {
     cid->Invoke(tail_args, cntx);
   } catch (std::exception& e) {
@@ -999,7 +1004,6 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
 
   if (record_stats) {
     DCHECK(cntx->transaction);
-    ServerState& etl = *ServerState::tlocal();
     bool is_ooo = cntx->transaction->IsOOO();
 
     cntx->last_command_debug.clock = cntx->transaction->txid();
 
@@ -130,9 +130,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
 
   auto* local_tx = sinfo.local_tx.get();
   facade::CapturingReplyBuilder crb;
-  ConnectionContext local_cntx{local_tx, &crb};
-  local_cntx.conn_state.db_index = cntx_->conn_state.db_index;
-
+  ConnectionContext local_cntx{cntx_, local_tx, &crb};
   absl::InlinedVector<MutableSlice, 4> arg_vec;
 
   for (auto* cmd : sinfo.cmds) {
 
@@ -112,9 +112,10 @@ class DflyInstance:
     def format_args(args):
         out = []
         for k, v in args.items():
-            out.append(f"--{k}")
             if v is not None:
-                out.append(str(v))
+                out.append(f"--{k}={v}")
+            else:
+                out.append(f"--{k}")
         return out
 
     async def metrics(self):
 
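The effect of the format_args change, shown as a standalone sketch (the helper is reproduced outside the class, and the flag names are made up): flags with a value are now emitted as a single "--key=value" token instead of two separate tokens, while valueless flags still become a bare "--key".

def format_args(args):
    out = []
    for k, v in args.items():
        if v is not None:
            out.append(f"--{k}={v}")
        else:
            out.append(f"--{k}")
    return out


print(format_args({"proactor_threads": 4, "vmodule": None}))
# new output: ['--proactor_threads=4', '--vmodule']
# the old code would have produced: ['--proactor_threads', '4', '--vmodule']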
@@ -5,147 +5,135 @@ from redis import asyncio as aioredis
 from redis.exceptions import ConnectionError as redis_conn_error
 import async_timeout
 
+from dataclasses import dataclass
+
 from . import DflyInstance, dfly_args
 
 BASE_PORT = 1111
 
 
-async def run_monitor_eval(monitor, expected):
-    async with monitor as mon:
-        count = 0
-        max = len(expected)
-        while count < max:
+@dataclass(frozen=True)
+class CollectedRedisMsg:
+    cmd: str
+    src: str = "tcp"
+
+    @staticmethod
+    def all_from_src(*args, src="tcp"):
+        return [CollectedRedisMsg(arg, src) for arg in args]
+
+
+class CollectingMonitor:
+    """Tracks all monitor messages between start() and stop()"""
+
+    def __init__(self, client):
+        self.client = client
+        self.messages = []
+        self._monitor_task = None
+
+    async def _monitor(self):
+        async with self.client.monitor() as monitor:
+            async for message in monitor.listen():
+                self.messages.append(CollectedRedisMsg(message["command"], message["client_type"]))
+
+    async def start(self):
+        if self._monitor_task is None:
+            self._monitor_task = asyncio.create_task(self._monitor())
+        await asyncio.sleep(0.1)
+
+    async def stop(self):
+        if self._monitor_task:
+            self._monitor_task.cancel()
             try:
-                async with async_timeout.timeout(1):
-                    response = await mon.next_command()
-                    if "select" not in response["command"].lower():
-                        cmd = expected[count]
-                        if cmd not in response["command"]:
-                            print(f"command {response['command']} != {cmd}")
-                            return False
-                        else:
-                            count = count + 1
-            except Exception as e:
-                print(f"failed to monitor: {e}")
-                return False
-    return True
+                await self._monitor_task
+            except asyncio.CancelledError:
+                pass
+            self._monitor_task = None
+
+        if len(self.messages) > 0 and self.messages[0].cmd == "SELECT 1":
+            self.messages = self.messages[1:]
+        return self.messages
 
 
 """
-Test issue https://github.com/dragonflydb/dragonfly/issues/756
-Monitor command do not return when we have lua script issue
-"""
-
-
-@pytest.mark.asyncio
-async def test_monitor_command_lua(async_pool):
-    expected = ["EVAL return redis", "EVAL return redis", "SET foo2"]
-
-    conn = aioredis.Redis(connection_pool=async_pool)
-    monitor = conn.monitor()
-
-    cmd1 = aioredis.Redis(connection_pool=async_pool)
-    future = asyncio.create_task(run_monitor_eval(monitor=monitor, expected=expected))
-    await asyncio.sleep(0.1)
-
-    try:
-        res = await cmd1.eval(r'return redis.call("GET", "bar")', 0)
-        assert False  # this will return an error
-    except Exception as e:
-        assert "script tried accessing undeclared key" in str(e)
-
-    try:
-        res = await cmd1.eval(r'return redis.call("SET", KEYS[1], ARGV[1])', 1, "foo2", "bar2")
-    except Exception as e:
-        print(f"EVAL error: {e}")
-        assert False
-
-    await asyncio.sleep(0.1)
-    await future
-    status = future.result()
-    assert status
-
-
-"""
-Test the monitor command.
-Open connection which is used for monitoring
-Then send on other connection commands to dragonfly instance
-Make sure that we are getting the commands in the monitor context
+Test MONITOR command with basic use case
 """
 
 
 @pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4})
 async def test_monitor_command(async_pool):
-    def generate(max):
-        for i in range(max):
-            yield f"key{i}", f"value={i}"
+    monitor = CollectingMonitor(aioredis.Redis(connection_pool=async_pool))
+    await monitor.start()
 
-    messages = {a: b for a, b in generate(5)}
-    assert await run_monitor(messages, async_pool)
+    c = aioredis.Redis(connection_pool=async_pool)
+    await c.set("a", 1)
+    await c.get("a")
+    await c.lpush("l", "V")
+    await c.lpop("l")
+
+    collected = await monitor.stop()
+    expected = CollectedRedisMsg.all_from_src("SET a 1", "GET a", "LPUSH l V", "LPOP l")
+
+    assert expected == collected
 
 
-def verify_response(monitor_response: dict, key: str, value: str) -> bool:
-    if monitor_response is None:
-        return False
-    if monitor_response["db"] == 1 and monitor_response["client_type"] == "tcp":
-        return key in monitor_response["command"] and value in monitor_response["command"]
-    else:
-        return False
+"""
+Test MONITOR command with MULTI/EXEC transaction with squashing
+"""
 
 
-async def process_cmd(monitor, key, value):
-    while True:
-        try:
-            async with async_timeout.timeout(1):
-                response = await monitor.next_command()
-                if "select" not in response["command"].lower():
-                    success = verify_response(response, key, value)
-                    if not success:
-                        print(f"failed to verify message {response} for {key}/{value}")
-                        return (
-                            False,
-                            f"failed on the verification of the message {response} at {key}: {value}",
-                        )
-                    else:
-                        return True, None
-        except asyncio.TimeoutError:
-            pass
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "multi_exec_squash": "true"})
+async def test_monitor_command_multi(async_pool):
+    monitor = CollectingMonitor(aioredis.Redis(connection_pool=async_pool))
+    await monitor.start()
+
+    c = aioredis.Redis(connection_pool=async_pool)
+    p = c.pipeline(transaction=True)
+
+    expected = []
+    for i in range(100):
+        p.lpush(str(i), "V")
+        expected.append(f"LPUSH {i} V")
+
+    await p.execute()
+
+    collected = await monitor.stop()
+    expected = CollectedRedisMsg.all_from_src(*expected)
+
+    # The order is random due to squashing
+    assert set(expected) == set(collected[2:])
 
 
-async def monitor_cmd(mon: aioredis.client.Monitor, messages: dict):
-    success = None
-    async with mon as monitor:
-        try:
-            for key, value in messages.items():
-                state, msg = await process_cmd(monitor, key, value)
-                if not state:
-                    return state, msg
-            return True, "monitor is successfully done"
-        except Exception as e:
-            return False, f"stopping monitor on {e}"
+"""
+Test MONITOR command with lua script
+https://github.com/dragonflydb/dragonfly/issues/756
+"""
+
+TEST_MONITOR_SCRIPT = """
+    redis.call('SET', 'A', 1)
+    redis.call('GET', 'A')
+    redis.call('SADD', 'S', 1, 2, 3)
+    redis.call('LPUSH', 'L', 1)
+    redis.call('LPOP', 'L')
+"""
 
 
-async def run_monitor(messages: dict, pool: aioredis.ConnectionPool):
-    cmd1 = aioredis.Redis(connection_pool=pool)
-    conn = aioredis.Redis(connection_pool=pool)
-    monitor = conn.monitor()
-    future = asyncio.create_task(monitor_cmd(monitor, messages))
-    success = True
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "lua_auto_async": "false"})
+async def test_monitor_command_lua(async_pool):
+    monitor = CollectingMonitor(aioredis.Redis(connection_pool=async_pool))
+    await monitor.start()
 
-    # make sure that the monitor task starts before we're sending anything else!
-    await asyncio.sleep(0.01)
-    for key, val in messages.items():
-        res = await cmd1.set(key, val)
-        if not res:
-            success = False
-            break
-        await asyncio.sleep(0.01)
-    await future
-    status, message = future.result()
-    if status and success:
-        return True, "successfully completed all"
-    else:
-        return False, f"monitor result: {status}: {message}, set command success {success}"
+    c = aioredis.Redis(connection_pool=async_pool)
+    await c.eval(TEST_MONITOR_SCRIPT, 3, "A", "S", "L")
+
+    collected = await monitor.stop()
+    expected = CollectedRedisMsg.all_from_src(
+        "SET A 1", "GET A", "SADD S 1 2 3", "LPUSH L 1", "LPOP L", src="lua"
+    )
+
+    assert expected == collected[1:]
 
 
 """
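A note on the new test helpers: CollectedRedisMsg is declared frozen=True, so it gets value-based equality and a hash, which is what lets test_monitor_command_multi compare expected and collected messages as sets (command order is not guaranteed under squashing). A small self-contained illustration:

from dataclasses import dataclass


@dataclass(frozen=True)
class CollectedRedisMsg:
    cmd: str
    src: str = "tcp"


a = CollectedRedisMsg("SET a 1")
b = CollectedRedisMsg("SET a 1", "tcp")
assert a == b      # value-based equality from the dataclass
assert {a} == {b}  # frozen=True makes instances hashable, so set comparison works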