mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 09:17:20 +00:00
Handle output buffer limits for Module blocked clients (#8141)
Module blocked clients cache the response in a temporary client, the reply list in this client would be affected by the recent fix in #7202, but when the reply is later copied into the real client, it would have bypassed all the checks for output buffer limit, which would have resulted in both: responding with a partial response to the client, and also not disconnecting it at all.
This commit is contained in:
parent
a102b21d17
commit
48efc25f74
@ -270,6 +270,9 @@ int prepareClientToWrite(client *c) {
|
||||
* Low level functions to add more data to output buffers.
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Attempts to add the reply to the static buffer in the client struct.
|
||||
* Returns C_ERR if the buffer is full, or the reply list is not empty,
|
||||
* in which case the reply must be added to the reply list. */
|
||||
int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
size_t available = sizeof(c->buf)-c->bufpos;
|
||||
|
||||
@ -287,6 +290,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Adds the reply to the reply linked list.
|
||||
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
|
||||
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||
|
||||
@ -848,14 +853,40 @@ void addReplySubcommandSyntaxError(client *c) {
|
||||
/* Append 'src' client output buffers into 'dst' client output buffers.
|
||||
* This function clears the output buffers of 'src' */
|
||||
void AddReplyFromClient(client *dst, client *src) {
|
||||
/* If the source client contains a partial response due to client output
|
||||
* buffer limits, propagate that to the dest rather than copy a partial
|
||||
* reply. We don't wanna run the risk of copying partial response in case
|
||||
* for some reason the output limits don't reach the same decision (maybe
|
||||
* they changed) */
|
||||
if (src->flags & CLIENT_CLOSE_ASAP) {
|
||||
sds client = catClientInfoString(sdsempty(),dst);
|
||||
freeClientAsync(dst);
|
||||
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
|
||||
sdsfree(client);
|
||||
return;
|
||||
}
|
||||
|
||||
/* First add the static buffer (either into the static buffer or reply list) */
|
||||
addReplyProto(dst,src->buf, src->bufpos);
|
||||
|
||||
/* We need to check with prepareClientToWrite again (after addReplyProto)
|
||||
* since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
|
||||
if (prepareClientToWrite(dst) != C_OK)
|
||||
return;
|
||||
addReplyProto(dst,src->buf, src->bufpos);
|
||||
|
||||
/* We're bypassing _addReplyProtoToList, so we need to add the pre/post
|
||||
* checks in it. */
|
||||
if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||
|
||||
/* Concatenate the reply list into the dest */
|
||||
if (listLength(src->reply))
|
||||
listJoin(dst->reply,src->reply);
|
||||
dst->reply_bytes += src->reply_bytes;
|
||||
src->reply_bytes = 0;
|
||||
src->bufpos = 0;
|
||||
|
||||
/* Check output buffer limits */
|
||||
asyncCloseClientOnOutputBufferLimitReached(dst);
|
||||
}
|
||||
|
||||
/* Copy 'src' client output buffers into 'dst' client output buffers.
|
||||
|
@ -85,6 +85,88 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
RedisModuleString **argv;
|
||||
int argc;
|
||||
RedisModuleBlockedClient *bc;
|
||||
} bg_call_data;
|
||||
|
||||
void *bg_call_worker(void *arg) {
|
||||
bg_call_data *bg = arg;
|
||||
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
|
||||
|
||||
// Acquire GIL
|
||||
RedisModule_ThreadSafeContextLock(ctx);
|
||||
|
||||
// Call the command
|
||||
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
|
||||
|
||||
// Release GIL
|
||||
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||
|
||||
// Reply to client
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
|
||||
// Unblock client
|
||||
RedisModule_UnblockClient(bg->bc, NULL);
|
||||
|
||||
/* Free the arguments */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
|
||||
// Free the Redis module context
|
||||
RedisModule_FreeThreadSafeContext(ctx);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
|
||||
/* Make sure we're not trying to block a client when we shouldn't */
|
||||
int flags = RedisModule_GetContextFlags(ctx);
|
||||
int allFlags = RedisModule_GetContextFlagsAll();
|
||||
if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
|
||||
(flags & REDISMODULE_CTX_FLAGS_MULTI)) {
|
||||
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
|
||||
(flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
|
||||
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Make a copy of the arguments and pass them to the thread. */
|
||||
bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
|
||||
bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
|
||||
bg->argc = argc;
|
||||
for (int i=0; i<argc; i++)
|
||||
bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
|
||||
|
||||
/* Block the client */
|
||||
bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||
|
||||
/* Start a thread to handle the request */
|
||||
pthread_t tid;
|
||||
int res = pthread_create(&tid, NULL, bg_call_worker, bg);
|
||||
assert(res == 0);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
|
||||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
@ -120,5 +202,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -72,4 +72,22 @@ start_server {tags {"modules"}} {
|
||||
} e
|
||||
set e
|
||||
} {*ERR*DENY BLOCKING*}
|
||||
|
||||
test {RM_Call from blocked client} {
|
||||
r hset hash foo bar
|
||||
r do_bg_rm_call hgetall hash
|
||||
} {foo bar}
|
||||
|
||||
test {blocked client reaches client output buffer limit} {
|
||||
r hset hash big [string repeat x 50000]
|
||||
r hset hash bada [string repeat x 50000]
|
||||
r hset hash boom [string repeat x 50000]
|
||||
r config set client-output-buffer-limit {normal 100000 0 0}
|
||||
r client setname myclient
|
||||
catch {r do_bg_rm_call hgetall hash} e
|
||||
assert_match "*I/O error*" $e
|
||||
reconnect
|
||||
set clients [r client list]
|
||||
assert_no_match "*name=myclient*" $clients
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user