mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Fix broken protocol when PUBLISH emits local push inside MULTI (#12326)
When a connection that's subscribe to a channel emits PUBLISH inside MULTI-EXEC, the push notification messes up the EXEC response. e.g. MULTI, PING, PUSH foo bar, PING, EXEC the EXEC's response will contain: PONG, {message foo bar}, 1. and the second PONG will be delivered outside the EXEC's response. Additionally, this PR changes the order of responses in case of a plain PUBLISH (when the current client also subscribed to it), by delivering the push after the command's response instead of before it. This also affects modules calling RM_PublishMessage in a similar way, so that we don't run the risk of getting that push mixed together with the module command's response.
This commit is contained in:
parent
93708c7f6a
commit
8ad8f0f9d8
@ -345,8 +345,8 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
|
||||
/* Adds the reply to the reply linked list.
|
||||
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
|
||||
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
||||
listNode *ln = listLast(c->reply);
|
||||
void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
|
||||
listNode *ln = listLast(reply_list);
|
||||
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
|
||||
|
||||
/* Note that 'tail' may be NULL even if we have a tail node, because when
|
||||
@ -374,13 +374,23 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
||||
tail->size = usable_size - sizeof(clientReplyBlock);
|
||||
tail->used = len;
|
||||
memcpy(tail->buf, s, len);
|
||||
listAddNodeTail(c->reply, tail);
|
||||
listAddNodeTail(reply_list, tail);
|
||||
c->reply_bytes += tail->size;
|
||||
|
||||
closeClientOnOutputBufferLimitReached(c, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/* The subscribe / unsubscribe command family has a push as a reply,
|
||||
* or in other words, it responds with a push (or several of them
|
||||
* depending on how many arguments it got), and has no reply. */
|
||||
int cmdHasPushAsReply(struct redisCommand *cmd) {
|
||||
if (!cmd) return 0;
|
||||
return cmd->proc == subscribeCommand || cmd->proc == unsubscribeCommand ||
|
||||
cmd->proc == psubscribeCommand || cmd->proc == punsubscribeCommand ||
|
||||
cmd->proc == ssubscribeCommand || cmd->proc == sunsubscribeCommand;
|
||||
}
|
||||
|
||||
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||
|
||||
@ -399,8 +409,20 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
|
||||
* buffer offset (see function comment) */
|
||||
reqresSaveClientReplyOffset(c);
|
||||
|
||||
/* If we're processing a push message into the current client (i.e. executing PUBLISH
|
||||
* to a channel which we are subscribed to, then we wanna postpone that message to be added
|
||||
* after the command's reply (specifically important during multi-exec). the exception is
|
||||
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
|
||||
* The check for executing_client also avoids affecting push messages that are part of eviction. */
|
||||
if (c == server.current_client && (c->flags & CLIENT_PUSHING) &&
|
||||
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
|
||||
{
|
||||
_addReplyProtoToList(c,server.pending_push_messages,s,len);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t reply_len = _addReplyToBuffer(c,s,len);
|
||||
if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len);
|
||||
if (len > reply_len) _addReplyProtoToList(c,c->reply,s+reply_len,len-reply_len);
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
|
@ -1717,6 +1717,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
* we have to flush them after each command, so when we get here, the list
|
||||
* must be empty. */
|
||||
serverAssert(listLength(server.tracking_pending_keys) == 0);
|
||||
serverAssert(listLength(server.pending_push_messages) == 0);
|
||||
|
||||
/* Send the invalidation messages to clients participating to the
|
||||
* client side caching protocol in broadcasting (BCAST) mode. */
|
||||
@ -2611,6 +2612,7 @@ void initServer(void) {
|
||||
server.unblocked_clients = listCreate();
|
||||
server.ready_keys = listCreate();
|
||||
server.tracking_pending_keys = listCreate();
|
||||
server.pending_push_messages = listCreate();
|
||||
server.clients_waiting_acks = listCreate();
|
||||
server.get_ack_from_slaves = 0;
|
||||
server.paused_actions = 0;
|
||||
@ -3758,7 +3760,14 @@ void afterCommand(client *c) {
|
||||
/* Should be done before trackingHandlePendingKeyInvalidations so that we
|
||||
* reply to client before invalidating cache (makes more sense) */
|
||||
postExecutionUnitOperations();
|
||||
|
||||
/* Flush pending tracking invalidations. */
|
||||
trackingHandlePendingKeyInvalidations();
|
||||
|
||||
/* Flush other pending push messages. only when we are not in nested call.
|
||||
* So the messages are not interleaved with transaction response. */
|
||||
if (!server.execution_nesting)
|
||||
listJoin(c->reply, server.pending_push_messages);
|
||||
}
|
||||
|
||||
/* Check if c->cmd exists, fills `err` with details in case it doesn't.
|
||||
|
@ -1928,6 +1928,7 @@ struct redisServer {
|
||||
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
|
||||
size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
|
||||
list *tracking_pending_keys; /* tracking invalidation keys pending to flush */
|
||||
list *pending_push_messages; /* pending publish or other push messages to flush */
|
||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||
* have to take this state global, in order to pass it to sortCompare() */
|
||||
int sort_desc;
|
||||
|
@ -5,6 +5,18 @@
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
int cmd_publish_classic_multi(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc < 3)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
RedisModule_ReplyWithArray(ctx, argc-2);
|
||||
for (int i = 2; i < argc; i++) {
|
||||
int receivers = RedisModule_PublishMessage(ctx, argv[1], argv[i]);
|
||||
RedisModule_ReplyWithLongLong(ctx, receivers);
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int cmd_publish_classic(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 3)
|
||||
@ -35,6 +47,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx,"publish.classic",cmd_publish_classic,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"publish.classic_multi",cmd_publish_classic_multi,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"publish.shard",cmd_publish_shard,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
|
@ -148,6 +148,19 @@ start_server {overrides {save {900 1}} tags {"modules"}} {
|
||||
$rd_trk close
|
||||
}
|
||||
|
||||
test {publish to self inside rm_call} {
|
||||
r hello 3
|
||||
r subscribe foo
|
||||
|
||||
# published message comes after the response of the command that issued it.
|
||||
assert_equal [r test.rm_call publish foo bar] {1}
|
||||
assert_equal [r read] {message foo bar}
|
||||
|
||||
r unsubscribe foo
|
||||
r hello 2
|
||||
set _ ""
|
||||
} {} {resp3}
|
||||
|
||||
test {test module get/set client name by id api} {
|
||||
catch { r test.getname } e
|
||||
assert_equal "-ERR No name" $e
|
||||
|
@ -13,5 +13,22 @@ start_server {tags {"modules"}} {
|
||||
assert_equal 1 [r publish.classic chan1 world]
|
||||
assert_equal {smessage chan1 hello} [$rd1 read]
|
||||
assert_equal {message chan1 world} [$rd2 read]
|
||||
$rd1 close
|
||||
$rd2 close
|
||||
}
|
||||
|
||||
test {module publish to self with multi message} {
|
||||
r hello 3
|
||||
r subscribe foo
|
||||
|
||||
# published message comes after the response of the command that issued it.
|
||||
assert_equal [r publish.classic_multi foo bar vaz] {1 1}
|
||||
assert_equal [r read] {message foo bar}
|
||||
assert_equal [r read] {message foo vaz}
|
||||
|
||||
r unsubscribe foo
|
||||
r hello 2
|
||||
set _ ""
|
||||
} {} {resp3}
|
||||
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ start_server {tags {"pubsub network"}} {
|
||||
assert_equal {0} [punsubscribe $rd ch*]
|
||||
|
||||
$rd close
|
||||
}
|
||||
} {0} {resp3}
|
||||
|
||||
test "PUNSUBSCRIBE from non-subscribed channels" {
|
||||
set rd1 [redis_deferring_client]
|
||||
@ -451,4 +451,56 @@ start_server {tags {"pubsub network"}} {
|
||||
assert_equal "pmessage * __keyevent@${db}__:new bar" [$rd1 read]
|
||||
$rd1 close
|
||||
}
|
||||
|
||||
test "publish to self inside multi" {
|
||||
r hello 3
|
||||
r subscribe foo
|
||||
r multi
|
||||
r ping abc
|
||||
r publish foo bar
|
||||
r publish foo vaz
|
||||
r ping def
|
||||
assert_equal [r exec] {abc 1 1 def}
|
||||
assert_equal [r read] {message foo bar}
|
||||
assert_equal [r read] {message foo vaz}
|
||||
} {} {resp3}
|
||||
|
||||
test "publish to self inside script" {
|
||||
r hello 3
|
||||
r subscribe foo
|
||||
set res [r eval {
|
||||
redis.call("ping","abc")
|
||||
redis.call("publish","foo","bar")
|
||||
redis.call("publish","foo","vaz")
|
||||
redis.call("ping","def")
|
||||
return "bla"} 0]
|
||||
assert_equal $res {bla}
|
||||
assert_equal [r read] {message foo bar}
|
||||
assert_equal [r read] {message foo vaz}
|
||||
} {} {resp3}
|
||||
|
||||
test "unsubscribe inside multi, and publish to self" {
|
||||
r hello 3
|
||||
|
||||
# Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command,
|
||||
# breaks the multi response, see https://github.com/redis/redis/issues/12207
|
||||
# this is just a temporary sanity test to detect unintended breakage.
|
||||
|
||||
# subscribe for 3 channels actually emits 3 "responses"
|
||||
assert_equal "subscribe foo 1" [r subscribe foo bar baz]
|
||||
assert_equal "subscribe bar 2" [r read]
|
||||
assert_equal "subscribe baz 3" [r read]
|
||||
|
||||
r multi
|
||||
r ping abc
|
||||
r unsubscribe bar
|
||||
r unsubscribe baz
|
||||
r ping def
|
||||
assert_equal [r exec] {abc {unsubscribe bar 2} {unsubscribe baz 1} def}
|
||||
|
||||
# published message comes after the publish command's response.
|
||||
assert_equal [r publish foo vaz] {1}
|
||||
assert_equal [r read] {message foo vaz}
|
||||
} {} {resp3}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user