mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Don't write replies if close the client ASAP (#7202)
Before this commit, we would have continued to add replies to the reply buffer even if client output buffer limit is reached, so the used memory would keep increasing over the configured limit. What's more, we shouldn’t write any reply to the client if it is set 'CLIENT_CLOSE_ASAP' flag because that doesn't conform to its definition and we will close all clients flagged with 'CLIENT_CLOSE_ASAP' in ‘beforeSleep’. Because of code execution order, before this, we may firstly write to part of the replies to the socket before disconnecting it, but in fact, we may can’t send the full replies to clients since OS socket buffer is limited. But this unexpected behavior makes some commands work well, for instance ACL DELUSER, if the client deletes the current user, we need to send reply to client and close the connection, but before, we close the client firstly and write the reply to reply buffer. secondly, we shouldn't do this despite the fact it works well in most cases. We add a flag 'CLIENT_CLOSE_AFTER_COMMAND' to mark clients, this flag means we will close the client after executing commands and send all entire replies, so that we can write replies to reply buffer during executing commands, send replies to clients, and close them later. We also fix some implicit problems. If client output buffer limit is enforced in 'multi/exec', all commands will be executed completely in redis and clients will not read any reply instead of partial replies. Even more, if the client executes 'ACL deluser' the using user in 'multi/exec', it will not read the replies after 'ACL deluser' just like before executing 'client kill' itself in 'multi/exec'. We added some tests for output buffer limit breach during multi-exec and using a pipeline of many small commands rather than one with big response. Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
b464afb9e2
commit
57709c4bc6
@ -297,7 +297,13 @@ void ACLFreeUserAndKillClients(user *u) {
|
||||
* it in non authenticated mode. */
|
||||
c->user = DefaultUser;
|
||||
c->authenticated = 0;
|
||||
freeClientAsync(c);
|
||||
/* We will write replies to this client later, so we can't
|
||||
* close it directly even if async. */
|
||||
if (c == server.current_client) {
|
||||
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
|
||||
} else {
|
||||
freeClientAsync(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
ACLFreeUser(u);
|
||||
|
@ -5538,7 +5538,13 @@ void revokeClientAuthentication(client *c) {
|
||||
|
||||
c->user = DefaultUser;
|
||||
c->authenticated = 0;
|
||||
freeClientAsync(c);
|
||||
/* We will write replies to this client later, so we can't close it
|
||||
* directly even if async. */
|
||||
if (c == server.current_client) {
|
||||
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
|
||||
} else {
|
||||
freeClientAsync(c);
|
||||
}
|
||||
}
|
||||
|
||||
/* Cleanup all clients that have been authenticated with this module. This
|
||||
|
@ -224,6 +224,9 @@ int prepareClientToWrite(client *c) {
|
||||
* handler since there is no socket at all. */
|
||||
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
|
||||
|
||||
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
|
||||
|
||||
/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
|
||||
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
|
||||
|
||||
@ -1471,6 +1474,9 @@ int handleClientsWithPendingWrites(void) {
|
||||
* that may trigger write error or recreate handler. */
|
||||
if (c->flags & CLIENT_PROTECTED) continue;
|
||||
|
||||
/* Don't write to clients that are going to be closed anyway. */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
|
||||
/* Try to write buffers to the client socket. */
|
||||
if (writeToClient(c,0) == C_ERR) continue;
|
||||
|
||||
@ -3160,6 +3166,14 @@ int handleClientsWithPendingWritesUsingThreads(void) {
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = listNodeValue(ln);
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
|
||||
/* Remove clients from the list of pending writes since
|
||||
* they are going to be closed ASAP. */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) {
|
||||
listDelNode(server.clients_pending_write, ln);
|
||||
continue;
|
||||
}
|
||||
|
||||
int target_id = item_id % server.io_threads_num;
|
||||
listAddNodeTail(io_threads_list[target_id],c);
|
||||
item_id++;
|
||||
|
@ -3427,6 +3427,13 @@ void call(client *c, int flags) {
|
||||
dirty = server.dirty-dirty;
|
||||
if (dirty < 0) dirty = 0;
|
||||
|
||||
/* After executing command, we will close the client after writing entire
|
||||
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
|
||||
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
|
||||
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
|
||||
}
|
||||
|
||||
/* When EVAL is called loading the AOF we don't want commands called
|
||||
* from Lua to go into the slowlog or to populate statistics. */
|
||||
if (server.loading && c->flags & CLIENT_LUA)
|
||||
|
@ -265,6 +265,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
about writes performed by myself.*/
|
||||
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
|
||||
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
|
||||
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
|
||||
* and writing entire reply. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
|
@ -260,6 +260,23 @@ start_server {tags {"acl"}} {
|
||||
catch {r ACL help xxx} e
|
||||
assert_match "*Unknown subcommand or wrong number of arguments*" $e
|
||||
}
|
||||
|
||||
test {Delete a user that the client doesn't use} {
|
||||
r ACL setuser not_used on >passwd
|
||||
assert {[r ACL deluser not_used] == 1}
|
||||
# The client is not closed
|
||||
assert {[r ping] eq {PONG}}
|
||||
}
|
||||
|
||||
test {Delete a user that the client is using} {
|
||||
r ACL setuser using on +acl >passwd
|
||||
r AUTH using passwd
|
||||
# The client will receive reply normally
|
||||
assert {[r ACL deluser using] == 1}
|
||||
# The client is closed
|
||||
catch {[r ping]} e
|
||||
assert_match "*I/O error*" $e
|
||||
}
|
||||
}
|
||||
|
||||
set server_path [tmpdir "server.acl"]
|
||||
|
@ -70,4 +70,94 @@ start_server {tags {"obuf-limits"}} {
|
||||
assert {$omem >= 100000 && $time_elapsed < 6}
|
||||
$rd1 close
|
||||
}
|
||||
|
||||
test {No response for single command if client output buffer hard limit is enforced} {
|
||||
r config set client-output-buffer-limit {normal 100000 0 0}
|
||||
# Total size of all items must be more than 100k
|
||||
set item [string repeat "x" 1000]
|
||||
for {set i 0} {$i < 150} {incr i} {
|
||||
r lpush mylist $item
|
||||
}
|
||||
set orig_mem [s used_memory]
|
||||
# Set client name and get all items
|
||||
set rd [redis_deferring_client]
|
||||
$rd client setname mybiglist
|
||||
assert {[$rd read] eq "OK"}
|
||||
$rd lrange mylist 0 -1
|
||||
$rd flush
|
||||
after 100
|
||||
|
||||
# Before we read reply, redis will close this client.
|
||||
set clients [r client list]
|
||||
assert_no_match "*name=mybiglist*" $clients
|
||||
set cur_mem [s used_memory]
|
||||
# 10k just is a deviation threshold
|
||||
assert {$cur_mem < 10000 + $orig_mem}
|
||||
|
||||
# Read nothing
|
||||
set fd [$rd channel]
|
||||
assert_equal {} [read $fd]
|
||||
}
|
||||
|
||||
test {No response for multi commands in pipeline if client output buffer limit is enforced} {
|
||||
r config set client-output-buffer-limit {normal 100000 0 0}
|
||||
set value [string repeat "x" 10000]
|
||||
r set bigkey $value
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
$rd2 client setname multicommands
|
||||
assert_equal "OK" [$rd2 read]
|
||||
# Let redis sleep 2s firstly
|
||||
$rd1 debug sleep 2
|
||||
$rd1 flush
|
||||
after 100
|
||||
|
||||
# Total size should be less than OS socket buffer, redis can
|
||||
# execute all commands in this pipeline when it wakes up.
|
||||
for {set i 0} {$i < 15} {incr i} {
|
||||
$rd2 set $i $i
|
||||
$rd2 get $i
|
||||
$rd2 del $i
|
||||
# One bigkey is 10k, total response size must be more than 100k
|
||||
$rd2 get bigkey
|
||||
}
|
||||
$rd2 flush
|
||||
after 100
|
||||
|
||||
# Reds must wake up if it can send reply
|
||||
assert_equal "PONG" [r ping]
|
||||
set clients [r client list]
|
||||
assert_no_match "*name=multicommands*" $clients
|
||||
set fd [$rd2 channel]
|
||||
assert_equal {} [read $fd]
|
||||
}
|
||||
|
||||
test {Execute transactions completely even if client output buffer limit is enforced} {
|
||||
r config set client-output-buffer-limit {normal 100000 0 0}
|
||||
# Total size of all items must be more than 100k
|
||||
set item [string repeat "x" 1000]
|
||||
for {set i 0} {$i < 150} {incr i} {
|
||||
r lpush mylist2 $item
|
||||
}
|
||||
|
||||
# Output buffer limit is enforced during executing transaction
|
||||
r client setname transactionclient
|
||||
r set k1 v1
|
||||
r multi
|
||||
r set k2 v2
|
||||
r get k2
|
||||
r lrange mylist2 0 -1
|
||||
r set k3 v3
|
||||
r del k1
|
||||
catch {[r exec]} e
|
||||
assert_match "*I/O error*" $e
|
||||
reconnect
|
||||
set clients [r client list]
|
||||
assert_no_match "*name=transactionclient*" $clients
|
||||
|
||||
# Transactions should be executed completely
|
||||
assert_equal {} [r get k1]
|
||||
assert_equal "v2" [r get k2]
|
||||
assert_equal "v3" [r get k3]
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user