Fix flush when migrating connection (#2407)

fix: don't miss flush for control messages
Vladislav 2024-01-13 09:57:33 +03:00 committed by GitHub
parent b3e36b0caa
commit 484b4de216
3 changed files with 45 additions and 1 deletion

@@ -373,6 +373,10 @@ bool Connection::MessageHandle::IsPubMsg() const {
  return holds_alternative<PubMessagePtr>(handle);
}

bool Connection::MessageHandle::IsReplying() const {
  return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle);
}

void Connection::DispatchOperations::operator()(const MonitorMessage& msg) {
  RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
  rbuilder->SendSimpleString(msg);
@@ -1257,6 +1261,13 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
    MessageHandle msg = std::move(dispatch_q_.front());
    dispatch_q_.pop_front();

    // We keep the batch mode enabled as long as the dispatch queue is not empty, relying on the
    // last command to reply and flush. If it doesn't reply (i.e. is a control message like
    // migrate), we have to flush manually.
    if (dispatch_q_.empty() && !msg.IsReplying()) {
      builder->FlushBatch();
    }

    if (ShouldEndDispatchFiber(msg)) {
      RecycleMessage(std::move(msg));

      CHECK(dispatch_q_.empty()) << DebugInfo();
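
To see why the manual flush matters, here is a minimal, self-contained sketch of the idea. The names below (ReplyBuilder, SetBatchMode, FlushBatch, Message) are simplified stand-ins, not the actual facade API: replies are batched while more work is queued, so if the last queued item is a control message that never replies, someone has to flush the batch explicitly, which is what the added branch does.

#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct ReplyBuilder {
  bool batch = false;
  std::vector<std::string> pending;

  void SetBatchMode(bool on) { batch = on; }

  void Send(const std::string& reply) {
    pending.push_back(reply);
    if (!batch)
      FlushBatch();
  }

  void FlushBatch() {
    for (const auto& r : pending)
      std::cout << r;  // the real builder writes to the client socket here
    pending.clear();
  }
};

struct Message {
  std::string payload;
  bool replies;  // control messages (e.g. a migration request) produce no reply
};

int main() {
  ReplyBuilder builder;
  std::deque<Message> dispatch_q = {
      {":1\r\n", true}, {":2\r\n", true}, {/*payload=*/"", /*replies=*/false}};

  while (!dispatch_q.empty()) {
    builder.SetBatchMode(dispatch_q.size() > 1);  // batch while more work is queued
    Message msg = std::move(dispatch_q.front());
    dispatch_q.pop_front();

    if (msg.replies)
      builder.Send(msg.payload);

    // The fix: the last queued message is a control message, so nothing replies
    // and flushes for us; flush manually or the earlier replies stay buffered.
    if (dispatch_q.empty() && !msg.replies)
      builder.FlushBatch();
  }
  return 0;
}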

@@ -136,11 +136,12 @@ class Connection : public util::Connection {
    size_t UsedMemory() const;  // How many bytes this handle takes up in total.

    // Intrusive messages put themselves at the front of the queue, but only after all other
    // intrusive ones. Used for quick transfer or control / update messages.
    // intrusive ones. Used for quick transfer of control / update messages.
    bool IsIntrusive() const;

    bool IsPipelineMsg() const;
    bool IsPubMsg() const;
    bool IsReplying() const;  // control messages don't reply, messages carrying data do

    std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, AclUpdateMessagePtr,
                 MigrationRequestMessage, CheckpointMessage, InvalidationMessage>
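
For context, a trimmed-down sketch of how a variant-based message handle can answer IsReplying() with holds_alternative. The struct and message types below are simplified placeholders rather than the real definitions, which hold smart pointers to full message payloads:

#include <cassert>
#include <variant>

struct MonitorMessage {};
struct PubMessage {};
struct PipelineMessage {};
struct MigrationRequestMessage {};  // control: asks the connection to migrate, no reply
struct CheckpointMessage {};        // control: used for synchronization, no reply

struct MessageHandle {
  std::variant<MonitorMessage, PubMessage, PipelineMessage, MigrationRequestMessage,
               CheckpointMessage>
      handle;

  // Only messages that carry data back to the client count as replying.
  bool IsReplying() const {
    return std::holds_alternative<PipelineMessage>(handle) ||
           std::holds_alternative<PubMessage>(handle) ||
           std::holds_alternative<MonitorMessage>(handle);
  }
};

int main() {
  assert(MessageHandle{PipelineMessage{}}.IsReplying());
  assert(!MessageHandle{MigrationRequestMessage{}}.IsReplying());
  return 0;
}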

@@ -510,6 +510,38 @@ async def test_parser_while_script_running(async_client: aioredis.Redis, df_ser
    await writer.wait_closed()


"""
This test makes sure that we can migrate while handling pipelined commands and that we don't
keep replies batched even if the stream suddenly stops.
"""


@dfly_args({"proactor_threads": "4", "pipeline_squash": 0})
async def test_pipeline_batching_while_migrating(
    async_client: aioredis.Redis, df_server: DflyInstance
):
    sha = await async_client.script_load("return redis.call('GET', KEYS[1])")

    reader, writer = await asyncio.open_connection("localhost", df_server.port)

    # First, write an EVALSHA that will ask for migration (75% chance it's on the wrong shard)
    # and some more pipelined commands that will keep Dragonfly busy
    incrs = "".join("INCR a\r\n" for _ in range(50))
    writer.write((f"EVALSHA {sha} 1 a\r\n" + incrs).encode())
    await writer.drain()

    # We migrate only when the socket wakes up, so send another batch to trigger migration
    writer.write("INCR a\r\n".encode())
    await writer.drain()

    # Make sure we received all replies
    reply = await reader.read(520)
    assert reply.decode().strip().endswith("51")

    writer.close()
    await writer.wait_closed()


@dfly_args({"proactor_threads": 1})
async def test_large_cmd(async_client: aioredis.Redis):
    MAX_ARR_SIZE = 65535